diff --git a/.config/networkmanager-dmenu/config.ini b/.config/networkmanager-dmenu/config.ini index cc27551..cb245ed 100644 --- a/.config/networkmanager-dmenu/config.ini +++ b/.config/networkmanager-dmenu/config.ini @@ -1,28 +1,9 @@ [dmenu] -dmenu_command = dmenu -i -bw 2 -W 250 -X 1101 -Y 15 -l 15 -# # Note that dmenu_command can contain arguments as well like: -# # `dmenu_command = rofi -dmenu -i -theme nmdm` -# # `dmenu_command = rofi -dmenu -width 30 -i` -# # `dmenu_command = dmenu -i -l 25 -b -nb #909090 -nf #303030` -# rofi_highlight = # (Default: False) use rofi highlighting instead of '==' -rofi_highlight = True -# compact = # (Default: False). Remove extra spacing from display +dmenu_command = dmenu -i -bw 2 -W 350 -X 1008 -Y 558 -l 8 +active_chars = 󱘖 compact = True -# pinentry = # (Default: None) e.g. `pinentry-gtk` -# wifi_chars = -# wifi_chars = ▂▄▆█ -# list_saved = # (Default: False) list saved connections +wifi_icons = 󰤯󰤟󰤢󰤥󰤨 +format = {icon} {name} [dmenu_passphrase] -# # Uses the -password flag for Rofi, -x for bemenu. For dmenu, sets -nb and -# # -nf to the same color or uses -P if the dmenu password patch is applied -# # https://tools.suckless.org/dmenu/patches/password/ obscure = True -# obscure_color = #222222 - -[editor] -# terminal = -# gui_if_available = (Default: True) - -[nmdm] -# rescan_delay = # (seconds to wait after a wifi rescan before redisplaying the results) diff --git a/.local/bin/networkmanager_dmenu b/.local/bin/networkmanager_dmenu index 8ba393d..73874f4 100755 --- a/.local/bin/networkmanager_dmenu +++ b/.local/bin/networkmanager_dmenu @@ -12,22 +12,24 @@ Add dmenu options and default terminal if desired to ~/.config/networkmanager-dmenu/config.ini """ -import pathlib -import struct + import configparser import locale import os -from os.path import basename, expanduser +import pathlib import shlex -from shutil import which -import sys -from time import sleep -import uuid +import struct import subprocess +import sys +import uuid +from os.path import basename, expanduser +from shutil import which +from time import sleep # pylint: disable=import-error import gi -gi.require_version('NM', '1.0') + +gi.require_version("NM", "1.0") from gi.repository import GLib, NM # noqa pylint: disable=wrong-import-position # pylint: enable=import-error @@ -39,7 +41,7 @@ CONF.read(expanduser("~/.config/networkmanager-dmenu/config.ini")) def cli_args(): - """ Don't override dmenu_cmd function arguments with CLI args. Removes -l + """Don't override dmenu_cmd function arguments with CLI args. Removes -l and -p if those are passed on the command line. Exception: if -l is passed and dmenu_command is not defined, assume that the @@ -49,9 +51,9 @@ def cli_args(): """ args = sys.argv[1:] - cmd = CONF.get('dmenu', 'dmenu_command', fallback=False) + cmd = CONF.get("dmenu", "dmenu_command", fallback=False) if "-l" in args or "-p" in args: - for nope in ['-l', '-p'] if cmd is not False else ['-p']: + for nope in ["-l", "-p"] if cmd is not False else ["-p"]: try: nope_idx = args.index(nope) del args[nope_idx] @@ -70,13 +72,14 @@ def dmenu_pass(command, color): Returns: list or None """ - if command != 'dmenu': + if command != "dmenu": return None try: # Check for dmenu password patch - dm_patch = b'P' in subprocess.run(["dmenu", "-h"], - capture_output=True, - check=False).stderr + dm_patch = ( + b"P" + in subprocess.run(["dmenu", "-h"], capture_output=True, check=False).stderr + ) except FileNotFoundError: dm_patch = False return ["-P"] if dm_patch else ["-nb", color, "-nf", color] @@ -93,36 +96,46 @@ def dmenu_cmd(num_lines, prompt="Networks", active_lines=None): """ # Create command string - commands = {"dmenu": ["-p", str(prompt)], - "rofi": ["-dmenu", "-p", str(prompt)], - "bemenu": ["-p", str(prompt)], - "wofi": ["-p", str(prompt)], - "fuzzel": ["-p", str(prompt), "-l", str(num_lines), "--log-level", "none"]} - command = shlex.split(CONF.get('dmenu', 'dmenu_command', fallback="dmenu")) + commands = { + "dmenu": ["-p", str(prompt)], + "rofi": ["-dmenu", "-p", str(prompt)], + "bemenu": ["-p", str(prompt)], + "wofi": ["-p", str(prompt)], + "fuzzel": ["-p", str(prompt), "--log-level", "none"], + } + command = shlex.split(CONF.get("dmenu", "dmenu_command", fallback="dmenu")) cmd_base = basename(command[0]) command.extend(cli_args()) command.extend(commands.get(cmd_base, [])) - # Rofi Highlighting - rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False) - if rofi_highlight is True and cmd_base == "rofi" and active_lines: - command.extend(["-a", ",".join([str(num) for num in active_lines])]) + # Highlighting + highlight = CONF.getboolean("dmenu", "highlight", fallback=False) + if highlight is True: + # Rofi + if cmd_base == "rofi" and active_lines: + command.extend(["-a", ",".join([str(num) for num in active_lines])]) + # Wofi + if cmd_base == "wofi" and active_lines: + # add '-q' to prevent tag name and properties of pango markup from searchable + command.extend(["-m", "-q"]) # Passphrase prompts - obscure = CONF.getboolean('dmenu_passphrase', 'obscure', fallback=False) + obscure = CONF.getboolean("dmenu_passphrase", "obscure", fallback=False) if prompt == "Passphrase" and obscure is True: - obscure_color = CONF.get('dmenu_passphrase', 'obscure_color', fallback='#222222') - pass_prompts = {"dmenu": dmenu_pass(cmd_base, obscure_color), - "rofi": ['-password'], - "bemenu": ['-x'], - "wofi": ['-P'], - "fuzzel": ['--password']} + obscure_color = CONF.get( + "dmenu_passphrase", "obscure_color", fallback="#222222" + ) + pass_prompts = { + "dmenu": dmenu_pass(cmd_base, obscure_color), + "rofi": ["-password"], + "bemenu": ["-x"], + "wofi": ["-P"], + "fuzzel": ["--password"], + } command.extend(pass_prompts.get(cmd_base, [])) return command def choose_adapter(client): - """If there is more than one wifi adapter installed, ask which one to use - - """ + """If there is more than one wifi adapter installed, ask which one to use""" devices = client.get_devices() devices = [i for i in devices if i.get_device_type() == NM.DeviceType.WIFI] if not devices: @@ -130,12 +143,14 @@ def choose_adapter(client): if len(devices) == 1: return devices[0] device_names = "\n".join([d.get_iface() for d in devices]) - sel = subprocess.run(dmenu_cmd(len(devices), "CHOOSE ADAPTER:"), - capture_output=True, - check=False, - env=ENV, - input=device_names, - encoding=ENC).stdout + sel = subprocess.run( + dmenu_cmd(len(devices), "CHOOSE ADAPTER:"), + capture_output=True, + check=False, + env=ENV, + input=device_names, + encoding=ENC, + ).stdout if not sel.strip(): sys.exit() devices = [i for i in devices if i.get_iface() == sel.strip()] @@ -149,22 +164,39 @@ def is_installed(cmd): return which(cmd) is not None +def is_running(cmd): + try: + subprocess.check_output(["pidof", cmd]) + except subprocess.CalledProcessError: + return False + else: + return True + + def bluetooth_get_enabled(): - """Check if bluetooth is enabled via rfkill. + """Check if bluetooth is enabled. Try bluetoothctl first, then rfkill. Returns None if no bluetooth device was found. """ + if is_installed("bluetoothctl") and is_running("bluetoothd"): + # Times out in 2 seconds, otherwise bluetoothctl will hang if bluetooth + # service isn't running. + try: + res = subprocess.run( + ["bluetoothctl", "show"], timeout=2, capture_output=True, text=True + ) + return "Powered: yes" in res.stdout + except subprocess.TimeoutExpired: + pass # See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-rfkill - for path in pathlib.Path('/sys/class/rfkill/').glob('rfkill*'): - if (path / 'type').read_text().strip() == 'bluetooth': - return (path / 'soft').read_text().strip() == '0' + for path in pathlib.Path("/sys/class/rfkill/").glob("rfkill*"): + if (path / "type").read_text().strip() == "bluetooth": + return (path / "soft").read_text().strip() == "0" return None def create_other_actions(client): - """Return list of other actions that can be taken - - """ + """Return list of other actions that can be taken""" networking_enabled = client.networking_get_enabled() networking_action = "Disable" if networking_enabled else "Enable" @@ -174,15 +206,22 @@ def create_other_actions(client): bluetooth_enabled = bluetooth_get_enabled() bluetooth_action = "Disable" if bluetooth_enabled else "Enable" - actions = [Action(f"{wifi_action} Wifi", toggle_wifi, - not wifi_enabled), - Action(f"{networking_action} Networking", - toggle_networking, not networking_enabled)] + actions = [ + Action(f"{wifi_action} Wifi", toggle_wifi, not wifi_enabled), + Action( + f"{networking_action} Networking", toggle_networking, not networking_enabled + ), + ] if bluetooth_enabled is not None: - actions.append(Action(f"{bluetooth_action} Bluetooth", - toggle_bluetooth, not bluetooth_enabled)) - actions += [Action("Launch Connection Manager", launch_connection_editor), - Action("Delete a Connection", delete_connection)] + actions.append( + Action( + f"{bluetooth_action} Bluetooth", toggle_bluetooth, not bluetooth_enabled + ) + ) + actions += [ + Action("Launch Connection Manager", launch_connection_editor), + Action("Delete a Connection", delete_connection), + ] if wifi_enabled: actions.append(Action("Rescan Wifi Networks", rescan_wifi)) return actions @@ -192,7 +231,7 @@ def rescan_wifi(): """ Rescan Wifi Access Points """ - delay = CONF.getint('nmdm', 'rescan_delay', fallback=5) + delay = CONF.getint("nmdm", "rescan_delay", fallback=5) for dev in CLIENT.get_devices(): if gi.repository.NM.DeviceWifi == type(dev): try: @@ -209,9 +248,7 @@ def rescan_wifi(): def rescan_cb(dev, res, data): - """Callback for rescan_wifi. Just for notifications - - """ + """Callback for rescan_wifi. Just for notifications""" if dev.request_scan_finish(res) is True: notify("Wifi scan running...") else: @@ -220,7 +257,7 @@ def rescan_cb(dev, res, data): def ssid_to_utf8(nm_ap): - """ Convert binary ssid to utf-8 """ + """Convert binary ssid to utf-8""" ssid = nm_ap.get_ssid() if not ssid: return "" @@ -236,25 +273,30 @@ def prompt_saved(saved_cons): def ap_security(nm_ap): - """Parse the security flags to return a string with 'WPA2', etc. """ + """Parse the security flags to return a string with 'WPA2', etc.""" flags = nm_ap.get_flags() wpa_flags = nm_ap.get_wpa_flags() rsn_flags = nm_ap.get_rsn_flags() sec_str = "" - if ((flags & getattr(NM, '80211ApFlags').PRIVACY) and - (wpa_flags == 0) and (rsn_flags == 0)): + if ( + (flags & getattr(NM, "80211ApFlags").PRIVACY) + and (wpa_flags == 0) + and (rsn_flags == 0) + ): sec_str = " WEP" if wpa_flags: sec_str = " WPA1" - if rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_PSK: + if rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_PSK: sec_str += " WPA2" - if rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_SAE: + if rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_SAE: sec_str += " WPA3" - if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X) or - (rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X)): + if (wpa_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_802_1X) or ( + rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_802_1X + ): sec_str += " 802.1X" - if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_OWE) or - (rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_OWE)): + if (wpa_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_OWE) or ( + rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_OWE + ): sec_str += " OWE" # If there is no security use "--" @@ -263,13 +305,10 @@ def ap_security(nm_ap): return sec_str.lstrip() -class Action(): # pylint: disable=too-few-public-methods +class Action: # pylint: disable=too-few-public-methods """Helper class to execute functions from a string variable""" - def __init__(self, - name, - func, - args=None, - active=False): + + def __init__(self, name, func, args=None, active=False): self.name = name self.func = func self.is_active = active @@ -324,16 +363,19 @@ def process_ap(nm_ap, is_active, adapter): CLIENT.deactivate_connection_async(nm_ap, None, deactivate_cb, nm_ap) LOOP.run() else: - conns_cur = [i for i in CONNS if - i.get_setting_wireless() is not None and - conn_matches_adapter(i, adapter)] + conns_cur = [ + i + for i in CONNS + if i.get_setting_wireless() is not None and conn_matches_adapter(i, adapter) + ] con = nm_ap.filter_connections(conns_cur) if len(con) > 1: raise ValueError("There are multiple connections possible") if len(con) == 1: - CLIENT.activate_connection_async(con[0], adapter, nm_ap.get_path(), - None, activate_cb, nm_ap) + CLIENT.activate_connection_async( + con[0], adapter, nm_ap.get_path(), None, activate_cb, nm_ap + ) LOOP.run() else: if ap_security(nm_ap) != "--": @@ -344,9 +386,7 @@ def process_ap(nm_ap, is_active, adapter): def activate_cb(dev, res, data): - """Notification if activate connection completed successfully - - """ + """Notification if activate connection completed successfully""" try: conn = dev.activate_connection_finish(res) except GLib.Error: @@ -359,9 +399,7 @@ def activate_cb(dev, res, data): def deactivate_cb(dev, res, data): - """Notification if deactivate connection completed successfully - - """ + """Notification if deactivate connection completed successfully""" if dev.deactivate_connection_finish(res) is True: notify(f"Deactivated {data.get_id()}") else: @@ -372,17 +410,17 @@ def deactivate_cb(dev, res, data): def process_vpngsm(con, activate): """Activate/deactive VPN or GSM connections""" if activate: - CLIENT.activate_connection_async(con, None, None, - None, activate_cb, con) + CLIENT.activate_connection_async(con, None, None, None, activate_cb, con) else: CLIENT.deactivate_connection_async(con, None, deactivate_cb, con) LOOP.run() + def strength_bars(signal_strength): bars = NM.utils_wifi_strength_bars(signal_strength) wifi_chars = CONF.get("dmenu", "wifi_chars", fallback=False) if wifi_chars: - bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == '*']) + bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == "*"]) return bars @@ -410,22 +448,37 @@ def create_ap_actions(aps, active_ap, active_connection, adapter): # noqa pylin if CONF.getboolean("dmenu", "compact", fallback=False): format = CONF.get("dmenu", "format", fallback="{name} {sec} {bars}") else: - format = CONF.get("dmenu", "format", fallback="{name:<{max_len_name}s} {sec:<{max_len_sec}s} {bars:>4}") + format = CONF.get( + "dmenu", + "format", + fallback="{name:<{max_len_name}s} {sec:<{max_len_sec}s} {bars:>4}", + ) for nm_ap, name, sec in zip(aps, names, secs): is_active = nm_ap.get_bssid() == active_ap_bssid signal_strength = nm_ap.get_strength() bars = strength_bars(signal_strength) icon = strength_icon(signal_strength) - action_name = format.format(name=name, sec=sec, signal=signal_strength, bars=bars, icon=icon, - max_len_name=max_len_name, max_len_sec=max_len_sec) + action_name = format.format( + name=name, + sec=sec, + signal=signal_strength, + bars=bars, + icon=icon, + max_len_name=max_len_name, + max_len_sec=max_len_sec, + ) if is_active: - ap_actions.append(Action(action_name, process_ap, - [active_connection, True, adapter], - active=True)) + ap_actions.append( + Action( + action_name, + process_ap, + [active_connection, True, adapter], + active=True, + ) + ) else: - ap_actions.append(Action(action_name, process_ap, - [nm_ap, False, adapter])) + ap_actions.append(Action(action_name, process_ap, [nm_ap, False, adapter])) return ap_actions @@ -438,6 +491,15 @@ def create_vpn_actions(vpns, active): return _create_vpngsm_actions(vpns, active_vpns, "VPN") +def create_vlan_actions(vlans, active): + """Create the list of strings to display with associated function + (activate/deactivate) for VLAN connections. + + """ + active_vlans = [i for i in active if "vlan" == i.get_connection_type()] + return _create_vpngsm_actions(vlans, active_vlans, "VLAN") + + def create_wireguard_actions(wgs, active): """Create the list of strings to display with associated function (activate/deactivate) for Wireguard connections. @@ -452,25 +514,31 @@ def create_eth_actions(eths, active): (activate/deactivate) for Ethernet connections. """ - active_eths = [i for i in active if 'ethernet' in i.get_connection_type()] + active_eths = [i for i in active if "ethernet" in i.get_connection_type()] return _create_vpngsm_actions(eths, active_eths, "Eth") def create_gsm_actions(gsms, active): """Create the list of strings to display with associated function (activate/deactivate) GSM connections.""" - active_gsms = [i for i in active if - i.get_connection() is not None and - i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)] + active_gsms = [ + i + for i in active + if i.get_connection() is not None + and i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME) + ] return _create_vpngsm_actions(gsms, active_gsms, "GSM") def create_blue_actions(blues, active): """Create the list of strings to display with associated function (activate/deactivate) Bluetooth connections.""" - active_blues = [i for i in active if - i.get_connection() is not None and - i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)] + active_blues = [ + i + for i in active + if i.get_connection() is not None + and i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME) + ] return _create_vpngsm_actions(blues, active_blues, "Bluetooth") @@ -489,30 +557,29 @@ def _create_vpngsm_actions(cons, active_cons, label): is_active = con.get_id() in active_con_ids action_name = f"{con.get_id()}:{label}" if is_active: - active_connection = [a for a in active_cons - if a.get_id() == con.get_id()] + active_connection = [a for a in active_cons if a.get_id() == con.get_id()] if len(active_connection) != 1: raise ValueError(f"Multiple active connections match {con.get_id()}") active_connection = active_connection[0] - actions.append(Action(action_name, process_vpngsm, - [active_connection, False], active=True)) + actions.append( + Action( + action_name, process_vpngsm, [active_connection, False], active=True + ) + ) else: - actions.append(Action(action_name, process_vpngsm, - [con, True])) + actions.append(Action(action_name, process_vpngsm, [con, True])) return actions def create_wwan_actions(client): - """Create WWWAN actions - - """ + """Create WWWAN actions""" wwan_enabled = client.wwan_get_enabled() wwan_action = "Disable" if wwan_enabled else "Enable" return [Action(f"{wwan_action} WWAN", toggle_wwan, not wwan_enabled)] -def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved): +def combine_actions(eths, aps, vlans, vpns, wgs, gsms, blues, wwan, others, saved): # pylint: disable=too-many-arguments """Combine all given actions into a list of actions. @@ -525,10 +592,11 @@ def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved): others: list of Actions """ compact = CONF.getboolean("dmenu", "compact", fallback=False) - empty_action = [Action('', None)] if not compact else [] + empty_action = [Action("", None)] if not compact else [] all_actions = [] all_actions += eths + empty_action if eths else [] all_actions += aps + empty_action if aps else [] + all_actions += vlans + empty_action if vlans else [] all_actions += vpns + empty_action if vpns else [] all_actions += wgs + empty_action if wgs else [] all_actions += gsms + empty_action if (gsms and wwan) else [] @@ -539,38 +607,81 @@ def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved): return all_actions +def get_wofi_highlight_markup(action): + highlight_fg = CONF.get("dmenu", "highlight_fg", fallback=None) + highlight_bg = CONF.get("dmenu", "highlight_bg", fallback=None) + highlight_bold = CONF.getboolean("dmenu", "highlight_bold", fallback=True) + + style = "" + if highlight_fg: + style += f'foreground="{highlight_fg}" ' + if highlight_bg: + style += f'background="{highlight_bg}" ' + if highlight_bold: + style += 'weight="bold" ' + + return f"" + str(action) + "" + + def get_selection(all_actions): """Spawn dmenu for selection and execute the associated action.""" - rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False) + command = shlex.split(CONF.get("dmenu", "dmenu_command", fallback="dmenu")) + cmd_base = basename(command[0]) + active_chars = CONF.get("dmenu", "active_chars", fallback="==") + highlight = CONF.getboolean("dmenu", "highlight", fallback=False) inp = [] - if rofi_highlight is True: + if highlight is True and cmd_base == "rofi": inp = [str(action) for action in all_actions] + elif highlight is True and cmd_base == "wofi": + inp = [ + get_wofi_highlight_markup(action) if action.is_active else str(action) + for action in all_actions + ] else: - inp = [('== ' if action.is_active else ' ') + str(action) - for action in all_actions] - active_lines = [index for index, action in enumerate(all_actions) - if action.is_active] + inp = [ + (active_chars if action.is_active else " " * len(active_chars)) + + " " + + str(action) + for action in all_actions + ] + active_lines = [ + index for index, action in enumerate(all_actions) if action.is_active + ] command = dmenu_cmd(len(inp), active_lines=active_lines) - sel = subprocess.run(command, - capture_output=True, - check=False, - input="\n".join(inp), - encoding=ENC, - env=ENV).stdout + sel = subprocess.run( + command, + capture_output=True, + check=False, + input="\n".join(inp), + encoding=ENC, + env=ENV, + ).stdout if not sel.rstrip(): sys.exit() - if rofi_highlight is False: - action = [i for i in all_actions - if ((str(i).strip() == str(sel.strip()) - and not i.is_active) or - ('== ' + str(i) == str(sel.rstrip('\n')) - and i.is_active))] - else: + if highlight is True and cmd_base == "rofi": action = [i for i in all_actions if str(i).strip() == sel.strip()] + elif highlight is True and cmd_base == "wofi": + action = [ + i + for i in all_actions + if str(i).strip() == sel.strip() + or get_wofi_highlight_markup(i) == sel.strip() + ] + else: + action = [ + i + for i in all_actions + if ( + (str(i).strip() == str(sel.strip()) and not i.is_active) + or ( + active_chars + " " + str(i) == str(sel.rstrip("\n")) and i.is_active + ) + ) + ] if len(action) != 1: raise ValueError(f"Selection was ambiguous: '{str(sel.strip())}'") return action[0] @@ -584,8 +695,17 @@ def toggle_networking(enable): """ toggle = GLib.Variant.new_tuple(GLib.Variant.new_boolean(enable)) try: - CLIENT.dbus_call(NM.DBUS_PATH, NM.DBUS_INTERFACE, "Enable", toggle, - None, -1, None, None, None) + CLIENT.dbus_call( + NM.DBUS_PATH, + NM.DBUS_INTERFACE, + "Enable", + toggle, + None, + -1, + None, + None, + None, + ) except AttributeError: # Workaround for older versions of python-gobject CLIENT.networking_set_enabled(enable) @@ -600,8 +720,16 @@ def toggle_wifi(enable): """ toggle = GLib.Variant.new_boolean(enable) try: - CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WirelessEnabled", toggle, - -1, None, None, None) + CLIENT.dbus_set_property( + NM.DBUS_PATH, + NM.DBUS_INTERFACE, + "WirelessEnabled", + toggle, + -1, + None, + None, + None, + ) except AttributeError: # Workaround for older versions of python-gobject CLIENT.wireless_set_enabled(enable) @@ -616,8 +744,9 @@ def toggle_wwan(enable): """ toggle = GLib.Variant.new_boolean(enable) try: - CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle, - -1, None, None, None) + CLIENT.dbus_set_property( + NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle, -1, None, None, None + ) except AttributeError: # Workaround for older versions of python-gobject CLIENT.wwan_set_enabled(enable) @@ -627,6 +756,9 @@ def toggle_wwan(enable): def toggle_bluetooth(enable): """Enable/disable Bluetooth + Try bluetoothctl first, then drop to rfkill if it's not installed or + bluetooth service isn't running. + Args: enable - boolean References: @@ -635,40 +767,61 @@ def toggle_bluetooth(enable): https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/rfkill.h?h=v5.8.9 """ + if is_installed("bluetoothctl") and is_running("bluetoothd"): + # Times out in 2 seconds, otherwise bluetoothctl will hang if bluetooth + # service isn't running. + try: + res = subprocess.run( + ["bluetoothctl", "power", "on" if enable is True else "off"], + timeout=2, + capture_output=True, + ) + except subprocess.TimeoutExpired: + pass + try: + res = subprocess.run( + ["bluetoothctl", "show"], timeout=2, capture_output=True, text=True + ) + if "Powered: yes" in res.stdout: + notify("Bluetooth enabled") + return + except subprocess.TimeoutExpired: + pass + # Now try using rfkill type_bluetooth = 2 op_change_all = 3 idx = 0 soft_state = 0 if enable else 1 hard_state = 0 - data = struct.pack("IBBBB", idx, type_bluetooth, op_change_all, - soft_state, hard_state) + data = struct.pack( + "IBBBB", idx, type_bluetooth, op_change_all, soft_state, hard_state + ) try: - with open('/dev/rfkill', 'r+b', buffering=0) as rff: + with open("/dev/rfkill", "r+b", buffering=0) as rff: rff.write(data) except PermissionError: - notify("Lacking permission to write to /dev/rfkill.", - "Check README for configuration options.", - urgency="critical") + notify( + "Lacking permission to write to /dev/rfkill.", + "Check README for configuration options.", + urgency="critical", + ) else: notify(f"Bluetooth {'enabled' if enable else 'disabled'}") def launch_connection_editor(): - """Launch nmtui or the gui nm-connection-editor - - """ - terminal = CONF.get("editor", "terminal", fallback="xterm") + """Launch nmtui or the gui nm-connection-editor""" + terminal = shlex.split(CONF.get("editor", "terminal", fallback="xterm")) gui_if_available = CONF.getboolean("editor", "gui_if_available", fallback=True) - guis = ["gnome-control-center", "nm-connection-editor"] + gui = CONF.get("editor", "gui", fallback="nm-connection-editor") if gui_if_available is True: - for gui in guis: - if is_installed(gui): - subprocess.run(gui, check=False) - return + if is_installed(gui): + subprocess.run(gui, check=False) + return if is_installed("nmtui"): - subprocess.run([terminal, "-e", "nmtui"], check=False) + subprocess.run(terminal + ["-e", "nmtui"], check=False) return notify("No network connection editor installed", urgency="critical") @@ -681,38 +834,46 @@ def get_passphrase(): """ pinentry = CONF.get("dmenu", "pinentry", fallback=None) if pinentry: - description = CONF.get("pinentry", "description", fallback="Get network password") + description = CONF.get( + "pinentry", "description", fallback="Get network password" + ) prompt = CONF.get("pinentry", "prompt", fallback="Password: ") pin = "" - out = subprocess.run(pinentry, - capture_output=True, - check=False, - encoding=ENC, - input=f"setdesc {description}\nsetprompt {prompt}\ngetpin\n").stdout + out = subprocess.run( + pinentry, + capture_output=True, + check=False, + encoding=ENC, + input=f"setdesc {description}\nsetprompt {prompt}\ngetpin\n", + ).stdout if out: res = [i for i in out.split("\n") if i.startswith("D ")] if res and res[0].startswith("D "): pin = res[0].split("D ")[1] return pin - return subprocess.run(dmenu_cmd(0, "Passphrase"), - stdin=subprocess.DEVNULL, - capture_output=True, - check=False, - encoding=ENC).stdout + return subprocess.run( + dmenu_cmd(0, "Passphrase"), + stdin=subprocess.DEVNULL, + capture_output=True, + check=False, + encoding=ENC, + ).stdout def delete_connection(): - """Display list of NM connections and delete the selected one - - """ - conn_acts = [Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS] + """Display list of NM connections and delete the selected one""" + conn_acts = [ + Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS + ] conn_names = "\n".join([str(i) for i in conn_acts]) - sel = subprocess.run(dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"), - capture_output=True, - check=False, - input=conn_names, - encoding=ENC, - env=ENV).stdout + sel = subprocess.run( + dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"), + capture_output=True, + check=False, + input=conn_names, + encoding=ENC, + env=ENV, + ).stdout if not sel.strip(): sys.exit() action = [i for i in conn_acts if str(i) == sel.rstrip("\n")] @@ -723,9 +884,7 @@ def delete_connection(): def delete_cb(dev, res, data): - """Notification if delete completed successfully - - """ + """Notification if delete completed successfully""" if dev.delete_finish(res) is True: notify(f"Deleted {dev.get_id()}") else: @@ -742,8 +901,9 @@ def set_new_connection(nm_ap, nm_pw, adapter): """ nm_pw = str(nm_pw).strip() profile = create_wifi_profile(nm_ap, nm_pw, adapter) - CLIENT.add_and_activate_connection_async(profile, adapter, nm_ap.get_path(), - None, verify_conn, profile) + CLIENT.add_and_activate_connection_async( + profile, adapter, nm_ap.get_path(), None, verify_conn, profile + ) LOOP.run() @@ -764,8 +924,10 @@ def create_wifi_profile(nm_ap, password, adapter): s_wifi = NM.SettingWireless.new() s_wifi.set_property(NM.SETTING_WIRELESS_SSID, nm_ap.get_ssid()) - s_wifi.set_property(NM.SETTING_WIRELESS_MODE, 'infrastructure') - s_wifi.set_property(NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address()) + s_wifi.set_property(NM.SETTING_WIRELESS_MODE, "infrastructure") + s_wifi.set_property( + NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address() + ) profile.add_setting(s_wifi) s_ip4 = NM.SettingIP4Config.new() @@ -780,19 +942,18 @@ def create_wifi_profile(nm_ap, password, adapter): s_wifi_sec = NM.SettingWirelessSecurity.new() if "WPA" in ap_sec: if "WPA3" in ap_sec: - s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, - "sae") + s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "sae") else: - s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, - "wpa-psk") - s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG, - "open") + s_wifi_sec.set_property( + NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "wpa-psk" + ) + s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG, "open") s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, password) elif "WEP" in ap_sec: - s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, - "None") - s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE, - NM.WepKeyType.PASSPHRASE) + s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "None") + s_wifi_sec.set_property( + NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE, NM.WepKeyType.PASSPHRASE + ) s_wifi_sec.set_wep_key(0, password) profile.add_setting(s_wifi_sec) @@ -809,16 +970,14 @@ def verify_conn(client, result, data): try: act_conn = client.add_and_activate_connection_finish(result) conn = act_conn.get_connection() - if not all([conn.verify(), - conn.verify_secrets(), - data.verify(), - data.verify_secrets()]): + if not all( + [conn.verify(), conn.verify_secrets(), data.verify(), data.verify_secrets()] + ): raise GLib.Error notify(f"Added {conn.get_id()}") except GLib.Error: try: - notify(f"Connection to {conn.get_id()} failed", - urgency="critical") + notify(f"Connection to {conn.get_id()} failed", urgency="critical") conn.delete_async(None, None, None) except UnboundLocalError: pass @@ -841,22 +1000,27 @@ def create_ap_list(adapter, active_connections): aps = [] ap_names = [] active_ap = adapter.get_active_access_point() - aps_all = sorted(adapter.get_access_points(), - key=lambda a: a.get_strength(), reverse=True) - conns_cur = [i for i in CONNS if - i.get_setting_wireless() is not None and - conn_matches_adapter(i, adapter)] + aps_all = sorted( + adapter.get_access_points(), key=lambda a: a.get_strength(), reverse=True + ) + conns_cur = [ + i + for i in CONNS + if i.get_setting_wireless() is not None and conn_matches_adapter(i, adapter) + ] try: ap_conns = active_ap.filter_connections(conns_cur) active_ap_name = ssid_to_utf8(active_ap) - active_ap_con = [active_conn for active_conn in active_connections - if active_conn.get_connection() in ap_conns] + active_ap_con = [ + active_conn + for active_conn in active_connections + if active_conn.get_connection() in ap_conns + ] except AttributeError: active_ap_name = None active_ap_con = [] if len(active_ap_con) > 1: - raise ValueError("Multiple connection profiles match" - " the wireless AP") + raise ValueError("Multiple connection profiles match" " the wireless AP") active_ap_con = active_ap_con[0] if active_ap_con else None for nm_ap in aps_all: ap_name = ssid_to_utf8(nm_ap) @@ -870,12 +1034,17 @@ def create_ap_list(adapter, active_connections): def notify(message, details=None, urgency="low"): - """Use notify-send if available for notifications - - """ - delay = CONF.getint('nmdm', 'rescan_delay', fallback=5) - args = ["-u", urgency, "-a", "networkmanager-dmenu", - "-t", str(delay * 1000), message] + """Use notify-send if available for notifications""" + delay = CONF.getint("nmdm", "rescan_delay", fallback=5) + args = [ + "-u", + urgency, + "-a", + "networkmanager-dmenu", + "-t", + str(delay * 1000), + message, + ] if details is not None: args.append(details) if is_installed("notify-send"): @@ -903,11 +1072,13 @@ def run(): # pylint: disable=too-many-locals # Workaround for older versions of python-gobject with no wireguard support wgs = [] eths = [i for i in CONNS if i.is_type(NM.SETTING_WIRED_SETTING_NAME)] + vlans = [i for i in CONNS if i.is_type(NM.SETTING_VLAN_SETTING_NAME)] blues = [i for i in CONNS if i.is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)] vpn_actions = create_vpn_actions(vpns, active) wg_actions = create_wireguard_actions(wgs, active) eth_actions = create_eth_actions(eths, active) + vlan_actions = create_vlan_actions(vlans, active) blue_actions = create_blue_actions(blues, active) other_actions = create_other_actions(CLIENT) wwan_installed = is_installed("ModemManager") @@ -919,24 +1090,31 @@ def run(): # pylint: disable=too-many-locals gsm_actions = [] wwan_actions = [] - list_saved = CONF.getboolean('dmenu', 'list_saved', fallback=False) + list_saved = CONF.getboolean("dmenu", "list_saved", fallback=False) saved_cons = [i for i in CONNS if i not in vpns + wgs + eths + blues] if list_saved: saved_actions = create_saved_actions(saved_cons) else: saved_actions = [Action("Saved connections", prompt_saved, [saved_cons])] - actions = combine_actions(eth_actions, ap_actions, vpn_actions, wg_actions, - gsm_actions, blue_actions, wwan_actions, - other_actions, saved_actions) + actions = combine_actions( + eth_actions, + ap_actions, + vlan_actions, + vpn_actions, + wg_actions, + gsm_actions, + blue_actions, + wwan_actions, + other_actions, + saved_actions, + ) sel = get_selection(actions) sel() def main(): - """Main. Enables script to be re-run after a wifi rescan - - """ + """Main. Enables script to be re-run after a wifi rescan""" global CLIENT, CONNS, LOOP # noqa pylint: disable=global-variable-undefined CLIENT = NM.Client.new(None) LOOP = GLib.MainLoop() @@ -945,7 +1123,7 @@ def main(): run() -if __name__ == '__main__': +if __name__ == "__main__": main() # vim: set et ts=4 sw=4 :