update networkmanager dmenu

This commit is contained in:
mintycube 2024-10-05 07:32:04 +05:00
parent b25f389121
commit 4599a4fe84
2 changed files with 406 additions and 247 deletions

View File

@ -1,28 +1,9 @@
[dmenu] [dmenu]
dmenu_command = dmenu -i -bw 2 -W 250 -X 1101 -Y 15 -l 15 dmenu_command = dmenu -i -bw 2 -W 350 -X 1008 -Y 558 -l 8
# # Note that dmenu_command can contain arguments as well like: active_chars = 󱘖
# # `dmenu_command = rofi -dmenu -i -theme nmdm`
# # `dmenu_command = rofi -dmenu -width 30 -i`
# # `dmenu_command = dmenu -i -l 25 -b -nb #909090 -nf #303030`
# rofi_highlight = <True or False> # (Default: False) use rofi highlighting instead of '=='
rofi_highlight = True
# compact = <True or False> # (Default: False). Remove extra spacing from display
compact = True compact = True
# pinentry = <Pinentry command> # (Default: None) e.g. `pinentry-gtk` wifi_icons = 󰤯󰤟󰤢󰤥󰤨
# wifi_chars = <string of 4 unicode characters representing 1-4 bars strength> format = {icon} {name}
# wifi_chars = ▂▄▆█
# list_saved = <True or False> # (Default: False) list saved connections
[dmenu_passphrase] [dmenu_passphrase]
# # Uses the -password flag for Rofi, -x for bemenu. For dmenu, sets -nb and
# # -nf to the same color or uses -P if the dmenu password patch is applied
# # https://tools.suckless.org/dmenu/patches/password/
obscure = True obscure = True
# obscure_color = #222222
[editor]
# terminal = <name of terminal program>
# gui_if_available = <True or False> (Default: True)
[nmdm]
# rescan_delay = <seconds> # (seconds to wait after a wifi rescan before redisplaying the results)

View File

@ -12,22 +12,24 @@ Add dmenu options and default terminal if desired to
~/.config/networkmanager-dmenu/config.ini ~/.config/networkmanager-dmenu/config.ini
""" """
import pathlib
import struct
import configparser import configparser
import locale import locale
import os import os
from os.path import basename, expanduser import pathlib
import shlex import shlex
from shutil import which import struct
import sys
from time import sleep
import uuid
import subprocess import subprocess
import sys
import uuid
from os.path import basename, expanduser
from shutil import which
from time import sleep
# pylint: disable=import-error # pylint: disable=import-error
import gi import gi
gi.require_version('NM', '1.0')
gi.require_version("NM", "1.0")
from gi.repository import GLib, NM # noqa pylint: disable=wrong-import-position from gi.repository import GLib, NM # noqa pylint: disable=wrong-import-position
# pylint: enable=import-error # pylint: enable=import-error
@ -39,7 +41,7 @@ CONF.read(expanduser("~/.config/networkmanager-dmenu/config.ini"))
def cli_args(): def cli_args():
""" Don't override dmenu_cmd function arguments with CLI args. Removes -l """Don't override dmenu_cmd function arguments with CLI args. Removes -l
and -p if those are passed on the command line. and -p if those are passed on the command line.
Exception: if -l is passed and dmenu_command is not defined, assume that the Exception: if -l is passed and dmenu_command is not defined, assume that the
@ -49,9 +51,9 @@ def cli_args():
""" """
args = sys.argv[1:] args = sys.argv[1:]
cmd = CONF.get('dmenu', 'dmenu_command', fallback=False) cmd = CONF.get("dmenu", "dmenu_command", fallback=False)
if "-l" in args or "-p" in args: if "-l" in args or "-p" in args:
for nope in ['-l', '-p'] if cmd is not False else ['-p']: for nope in ["-l", "-p"] if cmd is not False else ["-p"]:
try: try:
nope_idx = args.index(nope) nope_idx = args.index(nope)
del args[nope_idx] del args[nope_idx]
@ -70,13 +72,14 @@ def dmenu_pass(command, color):
Returns: list or None Returns: list or None
""" """
if command != 'dmenu': if command != "dmenu":
return None return None
try: try:
# Check for dmenu password patch # Check for dmenu password patch
dm_patch = b'P' in subprocess.run(["dmenu", "-h"], dm_patch = (
capture_output=True, b"P"
check=False).stderr in subprocess.run(["dmenu", "-h"], capture_output=True, check=False).stderr
)
except FileNotFoundError: except FileNotFoundError:
dm_patch = False dm_patch = False
return ["-P"] if dm_patch else ["-nb", color, "-nf", color] return ["-P"] if dm_patch else ["-nb", color, "-nf", color]
@ -93,36 +96,46 @@ def dmenu_cmd(num_lines, prompt="Networks", active_lines=None):
""" """
# Create command string # Create command string
commands = {"dmenu": ["-p", str(prompt)], commands = {
"rofi": ["-dmenu", "-p", str(prompt)], "dmenu": ["-p", str(prompt)],
"bemenu": ["-p", str(prompt)], "rofi": ["-dmenu", "-p", str(prompt)],
"wofi": ["-p", str(prompt)], "bemenu": ["-p", str(prompt)],
"fuzzel": ["-p", str(prompt), "-l", str(num_lines), "--log-level", "none"]} "wofi": ["-p", str(prompt)],
command = shlex.split(CONF.get('dmenu', 'dmenu_command', fallback="dmenu")) "fuzzel": ["-p", str(prompt), "--log-level", "none"],
}
command = shlex.split(CONF.get("dmenu", "dmenu_command", fallback="dmenu"))
cmd_base = basename(command[0]) cmd_base = basename(command[0])
command.extend(cli_args()) command.extend(cli_args())
command.extend(commands.get(cmd_base, [])) command.extend(commands.get(cmd_base, []))
# Rofi Highlighting # Highlighting
rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False) highlight = CONF.getboolean("dmenu", "highlight", fallback=False)
if rofi_highlight is True and cmd_base == "rofi" and active_lines: if highlight is True:
command.extend(["-a", ",".join([str(num) for num in active_lines])]) # Rofi
if cmd_base == "rofi" and active_lines:
command.extend(["-a", ",".join([str(num) for num in active_lines])])
# Wofi
if cmd_base == "wofi" and active_lines:
# add '-q' to prevent tag name and properties of pango markup from searchable
command.extend(["-m", "-q"])
# Passphrase prompts # Passphrase prompts
obscure = CONF.getboolean('dmenu_passphrase', 'obscure', fallback=False) obscure = CONF.getboolean("dmenu_passphrase", "obscure", fallback=False)
if prompt == "Passphrase" and obscure is True: if prompt == "Passphrase" and obscure is True:
obscure_color = CONF.get('dmenu_passphrase', 'obscure_color', fallback='#222222') obscure_color = CONF.get(
pass_prompts = {"dmenu": dmenu_pass(cmd_base, obscure_color), "dmenu_passphrase", "obscure_color", fallback="#222222"
"rofi": ['-password'], )
"bemenu": ['-x'], pass_prompts = {
"wofi": ['-P'], "dmenu": dmenu_pass(cmd_base, obscure_color),
"fuzzel": ['--password']} "rofi": ["-password"],
"bemenu": ["-x"],
"wofi": ["-P"],
"fuzzel": ["--password"],
}
command.extend(pass_prompts.get(cmd_base, [])) command.extend(pass_prompts.get(cmd_base, []))
return command return command
def choose_adapter(client): def choose_adapter(client):
"""If there is more than one wifi adapter installed, ask which one to use """If there is more than one wifi adapter installed, ask which one to use"""
"""
devices = client.get_devices() devices = client.get_devices()
devices = [i for i in devices if i.get_device_type() == NM.DeviceType.WIFI] devices = [i for i in devices if i.get_device_type() == NM.DeviceType.WIFI]
if not devices: if not devices:
@ -130,12 +143,14 @@ def choose_adapter(client):
if len(devices) == 1: if len(devices) == 1:
return devices[0] return devices[0]
device_names = "\n".join([d.get_iface() for d in devices]) device_names = "\n".join([d.get_iface() for d in devices])
sel = subprocess.run(dmenu_cmd(len(devices), "CHOOSE ADAPTER:"), sel = subprocess.run(
capture_output=True, dmenu_cmd(len(devices), "CHOOSE ADAPTER:"),
check=False, capture_output=True,
env=ENV, check=False,
input=device_names, env=ENV,
encoding=ENC).stdout input=device_names,
encoding=ENC,
).stdout
if not sel.strip(): if not sel.strip():
sys.exit() sys.exit()
devices = [i for i in devices if i.get_iface() == sel.strip()] devices = [i for i in devices if i.get_iface() == sel.strip()]
@ -149,22 +164,39 @@ def is_installed(cmd):
return which(cmd) is not None return which(cmd) is not None
def is_running(cmd):
try:
subprocess.check_output(["pidof", cmd])
except subprocess.CalledProcessError:
return False
else:
return True
def bluetooth_get_enabled(): def bluetooth_get_enabled():
"""Check if bluetooth is enabled via rfkill. """Check if bluetooth is enabled. Try bluetoothctl first, then rfkill.
Returns None if no bluetooth device was found. Returns None if no bluetooth device was found.
""" """
if is_installed("bluetoothctl") and is_running("bluetoothd"):
# Times out in 2 seconds, otherwise bluetoothctl will hang if bluetooth
# service isn't running.
try:
res = subprocess.run(
["bluetoothctl", "show"], timeout=2, capture_output=True, text=True
)
return "Powered: yes" in res.stdout
except subprocess.TimeoutExpired:
pass
# See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-rfkill # See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-rfkill
for path in pathlib.Path('/sys/class/rfkill/').glob('rfkill*'): for path in pathlib.Path("/sys/class/rfkill/").glob("rfkill*"):
if (path / 'type').read_text().strip() == 'bluetooth': if (path / "type").read_text().strip() == "bluetooth":
return (path / 'soft').read_text().strip() == '0' return (path / "soft").read_text().strip() == "0"
return None return None
def create_other_actions(client): def create_other_actions(client):
"""Return list of other actions that can be taken """Return list of other actions that can be taken"""
"""
networking_enabled = client.networking_get_enabled() networking_enabled = client.networking_get_enabled()
networking_action = "Disable" if networking_enabled else "Enable" networking_action = "Disable" if networking_enabled else "Enable"
@ -174,15 +206,22 @@ def create_other_actions(client):
bluetooth_enabled = bluetooth_get_enabled() bluetooth_enabled = bluetooth_get_enabled()
bluetooth_action = "Disable" if bluetooth_enabled else "Enable" bluetooth_action = "Disable" if bluetooth_enabled else "Enable"
actions = [Action(f"{wifi_action} Wifi", toggle_wifi, actions = [
not wifi_enabled), Action(f"{wifi_action} Wifi", toggle_wifi, not wifi_enabled),
Action(f"{networking_action} Networking", Action(
toggle_networking, not networking_enabled)] f"{networking_action} Networking", toggle_networking, not networking_enabled
),
]
if bluetooth_enabled is not None: if bluetooth_enabled is not None:
actions.append(Action(f"{bluetooth_action} Bluetooth", actions.append(
toggle_bluetooth, not bluetooth_enabled)) Action(
actions += [Action("Launch Connection Manager", launch_connection_editor), f"{bluetooth_action} Bluetooth", toggle_bluetooth, not bluetooth_enabled
Action("Delete a Connection", delete_connection)] )
)
actions += [
Action("Launch Connection Manager", launch_connection_editor),
Action("Delete a Connection", delete_connection),
]
if wifi_enabled: if wifi_enabled:
actions.append(Action("Rescan Wifi Networks", rescan_wifi)) actions.append(Action("Rescan Wifi Networks", rescan_wifi))
return actions return actions
@ -192,7 +231,7 @@ def rescan_wifi():
""" """
Rescan Wifi Access Points Rescan Wifi Access Points
""" """
delay = CONF.getint('nmdm', 'rescan_delay', fallback=5) delay = CONF.getint("nmdm", "rescan_delay", fallback=5)
for dev in CLIENT.get_devices(): for dev in CLIENT.get_devices():
if gi.repository.NM.DeviceWifi == type(dev): if gi.repository.NM.DeviceWifi == type(dev):
try: try:
@ -209,9 +248,7 @@ def rescan_wifi():
def rescan_cb(dev, res, data): def rescan_cb(dev, res, data):
"""Callback for rescan_wifi. Just for notifications """Callback for rescan_wifi. Just for notifications"""
"""
if dev.request_scan_finish(res) is True: if dev.request_scan_finish(res) is True:
notify("Wifi scan running...") notify("Wifi scan running...")
else: else:
@ -220,7 +257,7 @@ def rescan_cb(dev, res, data):
def ssid_to_utf8(nm_ap): def ssid_to_utf8(nm_ap):
""" Convert binary ssid to utf-8 """ """Convert binary ssid to utf-8"""
ssid = nm_ap.get_ssid() ssid = nm_ap.get_ssid()
if not ssid: if not ssid:
return "" return ""
@ -236,25 +273,30 @@ def prompt_saved(saved_cons):
def ap_security(nm_ap): def ap_security(nm_ap):
"""Parse the security flags to return a string with 'WPA2', etc. """ """Parse the security flags to return a string with 'WPA2', etc."""
flags = nm_ap.get_flags() flags = nm_ap.get_flags()
wpa_flags = nm_ap.get_wpa_flags() wpa_flags = nm_ap.get_wpa_flags()
rsn_flags = nm_ap.get_rsn_flags() rsn_flags = nm_ap.get_rsn_flags()
sec_str = "" sec_str = ""
if ((flags & getattr(NM, '80211ApFlags').PRIVACY) and if (
(wpa_flags == 0) and (rsn_flags == 0)): (flags & getattr(NM, "80211ApFlags").PRIVACY)
and (wpa_flags == 0)
and (rsn_flags == 0)
):
sec_str = " WEP" sec_str = " WEP"
if wpa_flags: if wpa_flags:
sec_str = " WPA1" sec_str = " WPA1"
if rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_PSK: if rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_PSK:
sec_str += " WPA2" sec_str += " WPA2"
if rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_SAE: if rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_SAE:
sec_str += " WPA3" sec_str += " WPA3"
if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X) or if (wpa_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_802_1X) or (
(rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X)): rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_802_1X
):
sec_str += " 802.1X" sec_str += " 802.1X"
if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_OWE) or if (wpa_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_OWE) or (
(rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_OWE)): rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_OWE
):
sec_str += " OWE" sec_str += " OWE"
# If there is no security use "--" # If there is no security use "--"
@ -263,13 +305,10 @@ def ap_security(nm_ap):
return sec_str.lstrip() return sec_str.lstrip()
class Action(): # pylint: disable=too-few-public-methods class Action: # pylint: disable=too-few-public-methods
"""Helper class to execute functions from a string variable""" """Helper class to execute functions from a string variable"""
def __init__(self,
name, def __init__(self, name, func, args=None, active=False):
func,
args=None,
active=False):
self.name = name self.name = name
self.func = func self.func = func
self.is_active = active self.is_active = active
@ -324,16 +363,19 @@ def process_ap(nm_ap, is_active, adapter):
CLIENT.deactivate_connection_async(nm_ap, None, deactivate_cb, nm_ap) CLIENT.deactivate_connection_async(nm_ap, None, deactivate_cb, nm_ap)
LOOP.run() LOOP.run()
else: else:
conns_cur = [i for i in CONNS if conns_cur = [
i.get_setting_wireless() is not None and i
conn_matches_adapter(i, adapter)] for i in CONNS
if i.get_setting_wireless() is not None and conn_matches_adapter(i, adapter)
]
con = nm_ap.filter_connections(conns_cur) con = nm_ap.filter_connections(conns_cur)
if len(con) > 1: if len(con) > 1:
raise ValueError("There are multiple connections possible") raise ValueError("There are multiple connections possible")
if len(con) == 1: if len(con) == 1:
CLIENT.activate_connection_async(con[0], adapter, nm_ap.get_path(), CLIENT.activate_connection_async(
None, activate_cb, nm_ap) con[0], adapter, nm_ap.get_path(), None, activate_cb, nm_ap
)
LOOP.run() LOOP.run()
else: else:
if ap_security(nm_ap) != "--": if ap_security(nm_ap) != "--":
@ -344,9 +386,7 @@ def process_ap(nm_ap, is_active, adapter):
def activate_cb(dev, res, data): def activate_cb(dev, res, data):
"""Notification if activate connection completed successfully """Notification if activate connection completed successfully"""
"""
try: try:
conn = dev.activate_connection_finish(res) conn = dev.activate_connection_finish(res)
except GLib.Error: except GLib.Error:
@ -359,9 +399,7 @@ def activate_cb(dev, res, data):
def deactivate_cb(dev, res, data): def deactivate_cb(dev, res, data):
"""Notification if deactivate connection completed successfully """Notification if deactivate connection completed successfully"""
"""
if dev.deactivate_connection_finish(res) is True: if dev.deactivate_connection_finish(res) is True:
notify(f"Deactivated {data.get_id()}") notify(f"Deactivated {data.get_id()}")
else: else:
@ -372,17 +410,17 @@ def deactivate_cb(dev, res, data):
def process_vpngsm(con, activate): def process_vpngsm(con, activate):
"""Activate/deactive VPN or GSM connections""" """Activate/deactive VPN or GSM connections"""
if activate: if activate:
CLIENT.activate_connection_async(con, None, None, CLIENT.activate_connection_async(con, None, None, None, activate_cb, con)
None, activate_cb, con)
else: else:
CLIENT.deactivate_connection_async(con, None, deactivate_cb, con) CLIENT.deactivate_connection_async(con, None, deactivate_cb, con)
LOOP.run() LOOP.run()
def strength_bars(signal_strength): def strength_bars(signal_strength):
bars = NM.utils_wifi_strength_bars(signal_strength) bars = NM.utils_wifi_strength_bars(signal_strength)
wifi_chars = CONF.get("dmenu", "wifi_chars", fallback=False) wifi_chars = CONF.get("dmenu", "wifi_chars", fallback=False)
if wifi_chars: if wifi_chars:
bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == '*']) bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == "*"])
return bars return bars
@ -410,22 +448,37 @@ def create_ap_actions(aps, active_ap, active_connection, adapter): # noqa pylin
if CONF.getboolean("dmenu", "compact", fallback=False): if CONF.getboolean("dmenu", "compact", fallback=False):
format = CONF.get("dmenu", "format", fallback="{name} {sec} {bars}") format = CONF.get("dmenu", "format", fallback="{name} {sec} {bars}")
else: else:
format = CONF.get("dmenu", "format", fallback="{name:<{max_len_name}s} {sec:<{max_len_sec}s} {bars:>4}") format = CONF.get(
"dmenu",
"format",
fallback="{name:<{max_len_name}s} {sec:<{max_len_sec}s} {bars:>4}",
)
for nm_ap, name, sec in zip(aps, names, secs): for nm_ap, name, sec in zip(aps, names, secs):
is_active = nm_ap.get_bssid() == active_ap_bssid is_active = nm_ap.get_bssid() == active_ap_bssid
signal_strength = nm_ap.get_strength() signal_strength = nm_ap.get_strength()
bars = strength_bars(signal_strength) bars = strength_bars(signal_strength)
icon = strength_icon(signal_strength) icon = strength_icon(signal_strength)
action_name = format.format(name=name, sec=sec, signal=signal_strength, bars=bars, icon=icon, action_name = format.format(
max_len_name=max_len_name, max_len_sec=max_len_sec) name=name,
sec=sec,
signal=signal_strength,
bars=bars,
icon=icon,
max_len_name=max_len_name,
max_len_sec=max_len_sec,
)
if is_active: if is_active:
ap_actions.append(Action(action_name, process_ap, ap_actions.append(
[active_connection, True, adapter], Action(
active=True)) action_name,
process_ap,
[active_connection, True, adapter],
active=True,
)
)
else: else:
ap_actions.append(Action(action_name, process_ap, ap_actions.append(Action(action_name, process_ap, [nm_ap, False, adapter]))
[nm_ap, False, adapter]))
return ap_actions return ap_actions
@ -438,6 +491,15 @@ def create_vpn_actions(vpns, active):
return _create_vpngsm_actions(vpns, active_vpns, "VPN") return _create_vpngsm_actions(vpns, active_vpns, "VPN")
def create_vlan_actions(vlans, active):
"""Create the list of strings to display with associated function
(activate/deactivate) for VLAN connections.
"""
active_vlans = [i for i in active if "vlan" == i.get_connection_type()]
return _create_vpngsm_actions(vlans, active_vlans, "VLAN")
def create_wireguard_actions(wgs, active): def create_wireguard_actions(wgs, active):
"""Create the list of strings to display with associated function """Create the list of strings to display with associated function
(activate/deactivate) for Wireguard connections. (activate/deactivate) for Wireguard connections.
@ -452,25 +514,31 @@ def create_eth_actions(eths, active):
(activate/deactivate) for Ethernet connections. (activate/deactivate) for Ethernet connections.
""" """
active_eths = [i for i in active if 'ethernet' in i.get_connection_type()] active_eths = [i for i in active if "ethernet" in i.get_connection_type()]
return _create_vpngsm_actions(eths, active_eths, "Eth") return _create_vpngsm_actions(eths, active_eths, "Eth")
def create_gsm_actions(gsms, active): def create_gsm_actions(gsms, active):
"""Create the list of strings to display with associated function """Create the list of strings to display with associated function
(activate/deactivate) GSM connections.""" (activate/deactivate) GSM connections."""
active_gsms = [i for i in active if active_gsms = [
i.get_connection() is not None and i
i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)] for i in active
if i.get_connection() is not None
and i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)
]
return _create_vpngsm_actions(gsms, active_gsms, "GSM") return _create_vpngsm_actions(gsms, active_gsms, "GSM")
def create_blue_actions(blues, active): def create_blue_actions(blues, active):
"""Create the list of strings to display with associated function """Create the list of strings to display with associated function
(activate/deactivate) Bluetooth connections.""" (activate/deactivate) Bluetooth connections."""
active_blues = [i for i in active if active_blues = [
i.get_connection() is not None and i
i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)] for i in active
if i.get_connection() is not None
and i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)
]
return _create_vpngsm_actions(blues, active_blues, "Bluetooth") return _create_vpngsm_actions(blues, active_blues, "Bluetooth")
@ -489,30 +557,29 @@ def _create_vpngsm_actions(cons, active_cons, label):
is_active = con.get_id() in active_con_ids is_active = con.get_id() in active_con_ids
action_name = f"{con.get_id()}:{label}" action_name = f"{con.get_id()}:{label}"
if is_active: if is_active:
active_connection = [a for a in active_cons active_connection = [a for a in active_cons if a.get_id() == con.get_id()]
if a.get_id() == con.get_id()]
if len(active_connection) != 1: if len(active_connection) != 1:
raise ValueError(f"Multiple active connections match {con.get_id()}") raise ValueError(f"Multiple active connections match {con.get_id()}")
active_connection = active_connection[0] active_connection = active_connection[0]
actions.append(Action(action_name, process_vpngsm, actions.append(
[active_connection, False], active=True)) Action(
action_name, process_vpngsm, [active_connection, False], active=True
)
)
else: else:
actions.append(Action(action_name, process_vpngsm, actions.append(Action(action_name, process_vpngsm, [con, True]))
[con, True]))
return actions return actions
def create_wwan_actions(client): def create_wwan_actions(client):
"""Create WWWAN actions """Create WWWAN actions"""
"""
wwan_enabled = client.wwan_get_enabled() wwan_enabled = client.wwan_get_enabled()
wwan_action = "Disable" if wwan_enabled else "Enable" wwan_action = "Disable" if wwan_enabled else "Enable"
return [Action(f"{wwan_action} WWAN", toggle_wwan, not wwan_enabled)] return [Action(f"{wwan_action} WWAN", toggle_wwan, not wwan_enabled)]
def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved): def combine_actions(eths, aps, vlans, vpns, wgs, gsms, blues, wwan, others, saved):
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
"""Combine all given actions into a list of actions. """Combine all given actions into a list of actions.
@ -525,10 +592,11 @@ def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved):
others: list of Actions others: list of Actions
""" """
compact = CONF.getboolean("dmenu", "compact", fallback=False) compact = CONF.getboolean("dmenu", "compact", fallback=False)
empty_action = [Action('', None)] if not compact else [] empty_action = [Action("", None)] if not compact else []
all_actions = [] all_actions = []
all_actions += eths + empty_action if eths else [] all_actions += eths + empty_action if eths else []
all_actions += aps + empty_action if aps else [] all_actions += aps + empty_action if aps else []
all_actions += vlans + empty_action if vlans else []
all_actions += vpns + empty_action if vpns else [] all_actions += vpns + empty_action if vpns else []
all_actions += wgs + empty_action if wgs else [] all_actions += wgs + empty_action if wgs else []
all_actions += gsms + empty_action if (gsms and wwan) else [] all_actions += gsms + empty_action if (gsms and wwan) else []
@ -539,38 +607,81 @@ def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved):
return all_actions return all_actions
def get_wofi_highlight_markup(action):
highlight_fg = CONF.get("dmenu", "highlight_fg", fallback=None)
highlight_bg = CONF.get("dmenu", "highlight_bg", fallback=None)
highlight_bold = CONF.getboolean("dmenu", "highlight_bold", fallback=True)
style = ""
if highlight_fg:
style += f'foreground="{highlight_fg}" '
if highlight_bg:
style += f'background="{highlight_bg}" '
if highlight_bold:
style += 'weight="bold" '
return f"<span {style}>" + str(action) + "</span>"
def get_selection(all_actions): def get_selection(all_actions):
"""Spawn dmenu for selection and execute the associated action.""" """Spawn dmenu for selection and execute the associated action."""
rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False) command = shlex.split(CONF.get("dmenu", "dmenu_command", fallback="dmenu"))
cmd_base = basename(command[0])
active_chars = CONF.get("dmenu", "active_chars", fallback="==")
highlight = CONF.getboolean("dmenu", "highlight", fallback=False)
inp = [] inp = []
if rofi_highlight is True: if highlight is True and cmd_base == "rofi":
inp = [str(action) for action in all_actions] inp = [str(action) for action in all_actions]
elif highlight is True and cmd_base == "wofi":
inp = [
get_wofi_highlight_markup(action) if action.is_active else str(action)
for action in all_actions
]
else: else:
inp = [('== ' if action.is_active else ' ') + str(action) inp = [
for action in all_actions] (active_chars if action.is_active else " " * len(active_chars))
active_lines = [index for index, action in enumerate(all_actions) + " "
if action.is_active] + str(action)
for action in all_actions
]
active_lines = [
index for index, action in enumerate(all_actions) if action.is_active
]
command = dmenu_cmd(len(inp), active_lines=active_lines) command = dmenu_cmd(len(inp), active_lines=active_lines)
sel = subprocess.run(command, sel = subprocess.run(
capture_output=True, command,
check=False, capture_output=True,
input="\n".join(inp), check=False,
encoding=ENC, input="\n".join(inp),
env=ENV).stdout encoding=ENC,
env=ENV,
).stdout
if not sel.rstrip(): if not sel.rstrip():
sys.exit() sys.exit()
if rofi_highlight is False: if highlight is True and cmd_base == "rofi":
action = [i for i in all_actions
if ((str(i).strip() == str(sel.strip())
and not i.is_active) or
('== ' + str(i) == str(sel.rstrip('\n'))
and i.is_active))]
else:
action = [i for i in all_actions if str(i).strip() == sel.strip()] action = [i for i in all_actions if str(i).strip() == sel.strip()]
elif highlight is True and cmd_base == "wofi":
action = [
i
for i in all_actions
if str(i).strip() == sel.strip()
or get_wofi_highlight_markup(i) == sel.strip()
]
else:
action = [
i
for i in all_actions
if (
(str(i).strip() == str(sel.strip()) and not i.is_active)
or (
active_chars + " " + str(i) == str(sel.rstrip("\n")) and i.is_active
)
)
]
if len(action) != 1: if len(action) != 1:
raise ValueError(f"Selection was ambiguous: '{str(sel.strip())}'") raise ValueError(f"Selection was ambiguous: '{str(sel.strip())}'")
return action[0] return action[0]
@ -584,8 +695,17 @@ def toggle_networking(enable):
""" """
toggle = GLib.Variant.new_tuple(GLib.Variant.new_boolean(enable)) toggle = GLib.Variant.new_tuple(GLib.Variant.new_boolean(enable))
try: try:
CLIENT.dbus_call(NM.DBUS_PATH, NM.DBUS_INTERFACE, "Enable", toggle, CLIENT.dbus_call(
None, -1, None, None, None) NM.DBUS_PATH,
NM.DBUS_INTERFACE,
"Enable",
toggle,
None,
-1,
None,
None,
None,
)
except AttributeError: except AttributeError:
# Workaround for older versions of python-gobject # Workaround for older versions of python-gobject
CLIENT.networking_set_enabled(enable) CLIENT.networking_set_enabled(enable)
@ -600,8 +720,16 @@ def toggle_wifi(enable):
""" """
toggle = GLib.Variant.new_boolean(enable) toggle = GLib.Variant.new_boolean(enable)
try: try:
CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WirelessEnabled", toggle, CLIENT.dbus_set_property(
-1, None, None, None) NM.DBUS_PATH,
NM.DBUS_INTERFACE,
"WirelessEnabled",
toggle,
-1,
None,
None,
None,
)
except AttributeError: except AttributeError:
# Workaround for older versions of python-gobject # Workaround for older versions of python-gobject
CLIENT.wireless_set_enabled(enable) CLIENT.wireless_set_enabled(enable)
@ -616,8 +744,9 @@ def toggle_wwan(enable):
""" """
toggle = GLib.Variant.new_boolean(enable) toggle = GLib.Variant.new_boolean(enable)
try: try:
CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle, CLIENT.dbus_set_property(
-1, None, None, None) NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle, -1, None, None, None
)
except AttributeError: except AttributeError:
# Workaround for older versions of python-gobject # Workaround for older versions of python-gobject
CLIENT.wwan_set_enabled(enable) CLIENT.wwan_set_enabled(enable)
@ -627,6 +756,9 @@ def toggle_wwan(enable):
def toggle_bluetooth(enable): def toggle_bluetooth(enable):
"""Enable/disable Bluetooth """Enable/disable Bluetooth
Try bluetoothctl first, then drop to rfkill if it's not installed or
bluetooth service isn't running.
Args: enable - boolean Args: enable - boolean
References: References:
@ -635,40 +767,61 @@ def toggle_bluetooth(enable):
https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/rfkill.h?h=v5.8.9 https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/rfkill.h?h=v5.8.9
""" """
if is_installed("bluetoothctl") and is_running("bluetoothd"):
# Times out in 2 seconds, otherwise bluetoothctl will hang if bluetooth
# service isn't running.
try:
res = subprocess.run(
["bluetoothctl", "power", "on" if enable is True else "off"],
timeout=2,
capture_output=True,
)
except subprocess.TimeoutExpired:
pass
try:
res = subprocess.run(
["bluetoothctl", "show"], timeout=2, capture_output=True, text=True
)
if "Powered: yes" in res.stdout:
notify("Bluetooth enabled")
return
except subprocess.TimeoutExpired:
pass
# Now try using rfkill
type_bluetooth = 2 type_bluetooth = 2
op_change_all = 3 op_change_all = 3
idx = 0 idx = 0
soft_state = 0 if enable else 1 soft_state = 0 if enable else 1
hard_state = 0 hard_state = 0
data = struct.pack("IBBBB", idx, type_bluetooth, op_change_all, data = struct.pack(
soft_state, hard_state) "IBBBB", idx, type_bluetooth, op_change_all, soft_state, hard_state
)
try: try:
with open('/dev/rfkill', 'r+b', buffering=0) as rff: with open("/dev/rfkill", "r+b", buffering=0) as rff:
rff.write(data) rff.write(data)
except PermissionError: except PermissionError:
notify("Lacking permission to write to /dev/rfkill.", notify(
"Check README for configuration options.", "Lacking permission to write to /dev/rfkill.",
urgency="critical") "Check README for configuration options.",
urgency="critical",
)
else: else:
notify(f"Bluetooth {'enabled' if enable else 'disabled'}") notify(f"Bluetooth {'enabled' if enable else 'disabled'}")
def launch_connection_editor(): def launch_connection_editor():
"""Launch nmtui or the gui nm-connection-editor """Launch nmtui or the gui nm-connection-editor"""
terminal = shlex.split(CONF.get("editor", "terminal", fallback="xterm"))
"""
terminal = CONF.get("editor", "terminal", fallback="xterm")
gui_if_available = CONF.getboolean("editor", "gui_if_available", fallback=True) gui_if_available = CONF.getboolean("editor", "gui_if_available", fallback=True)
guis = ["gnome-control-center", "nm-connection-editor"] gui = CONF.get("editor", "gui", fallback="nm-connection-editor")
if gui_if_available is True: if gui_if_available is True:
for gui in guis: if is_installed(gui):
if is_installed(gui): subprocess.run(gui, check=False)
subprocess.run(gui, check=False) return
return
if is_installed("nmtui"): if is_installed("nmtui"):
subprocess.run([terminal, "-e", "nmtui"], check=False) subprocess.run(terminal + ["-e", "nmtui"], check=False)
return return
notify("No network connection editor installed", urgency="critical") notify("No network connection editor installed", urgency="critical")
@ -681,38 +834,46 @@ def get_passphrase():
""" """
pinentry = CONF.get("dmenu", "pinentry", fallback=None) pinentry = CONF.get("dmenu", "pinentry", fallback=None)
if pinentry: if pinentry:
description = CONF.get("pinentry", "description", fallback="Get network password") description = CONF.get(
"pinentry", "description", fallback="Get network password"
)
prompt = CONF.get("pinentry", "prompt", fallback="Password: ") prompt = CONF.get("pinentry", "prompt", fallback="Password: ")
pin = "" pin = ""
out = subprocess.run(pinentry, out = subprocess.run(
capture_output=True, pinentry,
check=False, capture_output=True,
encoding=ENC, check=False,
input=f"setdesc {description}\nsetprompt {prompt}\ngetpin\n").stdout encoding=ENC,
input=f"setdesc {description}\nsetprompt {prompt}\ngetpin\n",
).stdout
if out: if out:
res = [i for i in out.split("\n") if i.startswith("D ")] res = [i for i in out.split("\n") if i.startswith("D ")]
if res and res[0].startswith("D "): if res and res[0].startswith("D "):
pin = res[0].split("D ")[1] pin = res[0].split("D ")[1]
return pin return pin
return subprocess.run(dmenu_cmd(0, "Passphrase"), return subprocess.run(
stdin=subprocess.DEVNULL, dmenu_cmd(0, "Passphrase"),
capture_output=True, stdin=subprocess.DEVNULL,
check=False, capture_output=True,
encoding=ENC).stdout check=False,
encoding=ENC,
).stdout
def delete_connection(): def delete_connection():
"""Display list of NM connections and delete the selected one """Display list of NM connections and delete the selected one"""
conn_acts = [
""" Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS
conn_acts = [Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS] ]
conn_names = "\n".join([str(i) for i in conn_acts]) conn_names = "\n".join([str(i) for i in conn_acts])
sel = subprocess.run(dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"), sel = subprocess.run(
capture_output=True, dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"),
check=False, capture_output=True,
input=conn_names, check=False,
encoding=ENC, input=conn_names,
env=ENV).stdout encoding=ENC,
env=ENV,
).stdout
if not sel.strip(): if not sel.strip():
sys.exit() sys.exit()
action = [i for i in conn_acts if str(i) == sel.rstrip("\n")] action = [i for i in conn_acts if str(i) == sel.rstrip("\n")]
@ -723,9 +884,7 @@ def delete_connection():
def delete_cb(dev, res, data): def delete_cb(dev, res, data):
"""Notification if delete completed successfully """Notification if delete completed successfully"""
"""
if dev.delete_finish(res) is True: if dev.delete_finish(res) is True:
notify(f"Deleted {dev.get_id()}") notify(f"Deleted {dev.get_id()}")
else: else:
@ -742,8 +901,9 @@ def set_new_connection(nm_ap, nm_pw, adapter):
""" """
nm_pw = str(nm_pw).strip() nm_pw = str(nm_pw).strip()
profile = create_wifi_profile(nm_ap, nm_pw, adapter) profile = create_wifi_profile(nm_ap, nm_pw, adapter)
CLIENT.add_and_activate_connection_async(profile, adapter, nm_ap.get_path(), CLIENT.add_and_activate_connection_async(
None, verify_conn, profile) profile, adapter, nm_ap.get_path(), None, verify_conn, profile
)
LOOP.run() LOOP.run()
@ -764,8 +924,10 @@ def create_wifi_profile(nm_ap, password, adapter):
s_wifi = NM.SettingWireless.new() s_wifi = NM.SettingWireless.new()
s_wifi.set_property(NM.SETTING_WIRELESS_SSID, nm_ap.get_ssid()) s_wifi.set_property(NM.SETTING_WIRELESS_SSID, nm_ap.get_ssid())
s_wifi.set_property(NM.SETTING_WIRELESS_MODE, 'infrastructure') s_wifi.set_property(NM.SETTING_WIRELESS_MODE, "infrastructure")
s_wifi.set_property(NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address()) s_wifi.set_property(
NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address()
)
profile.add_setting(s_wifi) profile.add_setting(s_wifi)
s_ip4 = NM.SettingIP4Config.new() s_ip4 = NM.SettingIP4Config.new()
@ -780,19 +942,18 @@ def create_wifi_profile(nm_ap, password, adapter):
s_wifi_sec = NM.SettingWirelessSecurity.new() s_wifi_sec = NM.SettingWirelessSecurity.new()
if "WPA" in ap_sec: if "WPA" in ap_sec:
if "WPA3" in ap_sec: if "WPA3" in ap_sec:
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "sae")
"sae")
else: else:
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, s_wifi_sec.set_property(
"wpa-psk") NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "wpa-psk"
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG, )
"open") s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG, "open")
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, password) s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, password)
elif "WEP" in ap_sec: elif "WEP" in ap_sec:
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "None")
"None") s_wifi_sec.set_property(
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE, NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE, NM.WepKeyType.PASSPHRASE
NM.WepKeyType.PASSPHRASE) )
s_wifi_sec.set_wep_key(0, password) s_wifi_sec.set_wep_key(0, password)
profile.add_setting(s_wifi_sec) profile.add_setting(s_wifi_sec)
@ -809,16 +970,14 @@ def verify_conn(client, result, data):
try: try:
act_conn = client.add_and_activate_connection_finish(result) act_conn = client.add_and_activate_connection_finish(result)
conn = act_conn.get_connection() conn = act_conn.get_connection()
if not all([conn.verify(), if not all(
conn.verify_secrets(), [conn.verify(), conn.verify_secrets(), data.verify(), data.verify_secrets()]
data.verify(), ):
data.verify_secrets()]):
raise GLib.Error raise GLib.Error
notify(f"Added {conn.get_id()}") notify(f"Added {conn.get_id()}")
except GLib.Error: except GLib.Error:
try: try:
notify(f"Connection to {conn.get_id()} failed", notify(f"Connection to {conn.get_id()} failed", urgency="critical")
urgency="critical")
conn.delete_async(None, None, None) conn.delete_async(None, None, None)
except UnboundLocalError: except UnboundLocalError:
pass pass
@ -841,22 +1000,27 @@ def create_ap_list(adapter, active_connections):
aps = [] aps = []
ap_names = [] ap_names = []
active_ap = adapter.get_active_access_point() active_ap = adapter.get_active_access_point()
aps_all = sorted(adapter.get_access_points(), aps_all = sorted(
key=lambda a: a.get_strength(), reverse=True) adapter.get_access_points(), key=lambda a: a.get_strength(), reverse=True
conns_cur = [i for i in CONNS if )
i.get_setting_wireless() is not None and conns_cur = [
conn_matches_adapter(i, adapter)] i
for i in CONNS
if i.get_setting_wireless() is not None and conn_matches_adapter(i, adapter)
]
try: try:
ap_conns = active_ap.filter_connections(conns_cur) ap_conns = active_ap.filter_connections(conns_cur)
active_ap_name = ssid_to_utf8(active_ap) active_ap_name = ssid_to_utf8(active_ap)
active_ap_con = [active_conn for active_conn in active_connections active_ap_con = [
if active_conn.get_connection() in ap_conns] active_conn
for active_conn in active_connections
if active_conn.get_connection() in ap_conns
]
except AttributeError: except AttributeError:
active_ap_name = None active_ap_name = None
active_ap_con = [] active_ap_con = []
if len(active_ap_con) > 1: if len(active_ap_con) > 1:
raise ValueError("Multiple connection profiles match" raise ValueError("Multiple connection profiles match" " the wireless AP")
" the wireless AP")
active_ap_con = active_ap_con[0] if active_ap_con else None active_ap_con = active_ap_con[0] if active_ap_con else None
for nm_ap in aps_all: for nm_ap in aps_all:
ap_name = ssid_to_utf8(nm_ap) ap_name = ssid_to_utf8(nm_ap)
@ -870,12 +1034,17 @@ def create_ap_list(adapter, active_connections):
def notify(message, details=None, urgency="low"): def notify(message, details=None, urgency="low"):
"""Use notify-send if available for notifications """Use notify-send if available for notifications"""
delay = CONF.getint("nmdm", "rescan_delay", fallback=5)
""" args = [
delay = CONF.getint('nmdm', 'rescan_delay', fallback=5) "-u",
args = ["-u", urgency, "-a", "networkmanager-dmenu", urgency,
"-t", str(delay * 1000), message] "-a",
"networkmanager-dmenu",
"-t",
str(delay * 1000),
message,
]
if details is not None: if details is not None:
args.append(details) args.append(details)
if is_installed("notify-send"): if is_installed("notify-send"):
@ -903,11 +1072,13 @@ def run(): # pylint: disable=too-many-locals
# Workaround for older versions of python-gobject with no wireguard support # Workaround for older versions of python-gobject with no wireguard support
wgs = [] wgs = []
eths = [i for i in CONNS if i.is_type(NM.SETTING_WIRED_SETTING_NAME)] eths = [i for i in CONNS if i.is_type(NM.SETTING_WIRED_SETTING_NAME)]
vlans = [i for i in CONNS if i.is_type(NM.SETTING_VLAN_SETTING_NAME)]
blues = [i for i in CONNS if i.is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)] blues = [i for i in CONNS if i.is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]
vpn_actions = create_vpn_actions(vpns, active) vpn_actions = create_vpn_actions(vpns, active)
wg_actions = create_wireguard_actions(wgs, active) wg_actions = create_wireguard_actions(wgs, active)
eth_actions = create_eth_actions(eths, active) eth_actions = create_eth_actions(eths, active)
vlan_actions = create_vlan_actions(vlans, active)
blue_actions = create_blue_actions(blues, active) blue_actions = create_blue_actions(blues, active)
other_actions = create_other_actions(CLIENT) other_actions = create_other_actions(CLIENT)
wwan_installed = is_installed("ModemManager") wwan_installed = is_installed("ModemManager")
@ -919,24 +1090,31 @@ def run(): # pylint: disable=too-many-locals
gsm_actions = [] gsm_actions = []
wwan_actions = [] wwan_actions = []
list_saved = CONF.getboolean('dmenu', 'list_saved', fallback=False) list_saved = CONF.getboolean("dmenu", "list_saved", fallback=False)
saved_cons = [i for i in CONNS if i not in vpns + wgs + eths + blues] saved_cons = [i for i in CONNS if i not in vpns + wgs + eths + blues]
if list_saved: if list_saved:
saved_actions = create_saved_actions(saved_cons) saved_actions = create_saved_actions(saved_cons)
else: else:
saved_actions = [Action("Saved connections", prompt_saved, [saved_cons])] saved_actions = [Action("Saved connections", prompt_saved, [saved_cons])]
actions = combine_actions(eth_actions, ap_actions, vpn_actions, wg_actions, actions = combine_actions(
gsm_actions, blue_actions, wwan_actions, eth_actions,
other_actions, saved_actions) ap_actions,
vlan_actions,
vpn_actions,
wg_actions,
gsm_actions,
blue_actions,
wwan_actions,
other_actions,
saved_actions,
)
sel = get_selection(actions) sel = get_selection(actions)
sel() sel()
def main(): def main():
"""Main. Enables script to be re-run after a wifi rescan """Main. Enables script to be re-run after a wifi rescan"""
"""
global CLIENT, CONNS, LOOP # noqa pylint: disable=global-variable-undefined global CLIENT, CONNS, LOOP # noqa pylint: disable=global-variable-undefined
CLIENT = NM.Client.new(None) CLIENT = NM.Client.new(None)
LOOP = GLib.MainLoop() LOOP = GLib.MainLoop()
@ -945,7 +1123,7 @@ def main():
run() run()
if __name__ == '__main__': if __name__ == "__main__":
main() main()
# vim: set et ts=4 sw=4 : # vim: set et ts=4 sw=4 :