|
|
|
@ -12,24 +12,22 @@ Add dmenu options and default terminal if desired to
|
|
|
|
|
~/.config/networkmanager-dmenu/config.ini
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import pathlib
|
|
|
|
|
import struct
|
|
|
|
|
import configparser
|
|
|
|
|
import locale
|
|
|
|
|
import os
|
|
|
|
|
import pathlib
|
|
|
|
|
import shlex
|
|
|
|
|
import struct
|
|
|
|
|
import subprocess
|
|
|
|
|
import sys
|
|
|
|
|
import uuid
|
|
|
|
|
from os.path import basename, expanduser
|
|
|
|
|
import shlex
|
|
|
|
|
from shutil import which
|
|
|
|
|
import sys
|
|
|
|
|
from time import sleep
|
|
|
|
|
import uuid
|
|
|
|
|
import subprocess
|
|
|
|
|
|
|
|
|
|
# pylint: disable=import-error
|
|
|
|
|
import gi
|
|
|
|
|
|
|
|
|
|
gi.require_version("NM", "1.0")
|
|
|
|
|
gi.require_version('NM', '1.0')
|
|
|
|
|
from gi.repository import GLib, NM # noqa pylint: disable=wrong-import-position
|
|
|
|
|
# pylint: enable=import-error
|
|
|
|
|
|
|
|
|
@ -41,7 +39,7 @@ CONF.read(expanduser("~/.config/networkmanager-dmenu/config.ini"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def cli_args():
|
|
|
|
|
"""Don't override dmenu_cmd function arguments with CLI args. Removes -l
|
|
|
|
|
""" Don't override dmenu_cmd function arguments with CLI args. Removes -l
|
|
|
|
|
and -p if those are passed on the command line.
|
|
|
|
|
|
|
|
|
|
Exception: if -l is passed and dmenu_command is not defined, assume that the
|
|
|
|
@ -51,9 +49,9 @@ def cli_args():
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
args = sys.argv[1:]
|
|
|
|
|
cmd = CONF.get("dmenu", "dmenu_command", fallback=False)
|
|
|
|
|
cmd = CONF.get('dmenu', 'dmenu_command', fallback=False)
|
|
|
|
|
if "-l" in args or "-p" in args:
|
|
|
|
|
for nope in ["-l", "-p"] if cmd is not False else ["-p"]:
|
|
|
|
|
for nope in ['-l', '-p'] if cmd is not False else ['-p']:
|
|
|
|
|
try:
|
|
|
|
|
nope_idx = args.index(nope)
|
|
|
|
|
del args[nope_idx]
|
|
|
|
@ -72,14 +70,13 @@ def dmenu_pass(command, color):
|
|
|
|
|
Returns: list or None
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if command != "dmenu":
|
|
|
|
|
if command != 'dmenu':
|
|
|
|
|
return None
|
|
|
|
|
try:
|
|
|
|
|
# Check for dmenu password patch
|
|
|
|
|
dm_patch = (
|
|
|
|
|
b"P"
|
|
|
|
|
in subprocess.run(["dmenu", "-h"], capture_output=True, check=False).stderr
|
|
|
|
|
)
|
|
|
|
|
dm_patch = b'P' in subprocess.run(["dmenu", "-h"],
|
|
|
|
|
capture_output=True,
|
|
|
|
|
check=False).stderr
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
dm_patch = False
|
|
|
|
|
return ["-P"] if dm_patch else ["-nb", color, "-nf", color]
|
|
|
|
@ -96,46 +93,36 @@ def dmenu_cmd(num_lines, prompt="Networks", active_lines=None):
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
# Create command string
|
|
|
|
|
commands = {
|
|
|
|
|
"dmenu": ["-p", str(prompt)],
|
|
|
|
|
commands = {"dmenu": ["-p", str(prompt)],
|
|
|
|
|
"rofi": ["-dmenu", "-p", str(prompt)],
|
|
|
|
|
"bemenu": ["-p", str(prompt)],
|
|
|
|
|
"wofi": ["-p", str(prompt)],
|
|
|
|
|
"fuzzel": ["-p", str(prompt), "--log-level", "none"],
|
|
|
|
|
}
|
|
|
|
|
command = shlex.split(CONF.get("dmenu", "dmenu_command", fallback="dmenu"))
|
|
|
|
|
"fuzzel": ["-p", str(prompt), "-l", str(num_lines), "--log-level", "none"]}
|
|
|
|
|
command = shlex.split(CONF.get('dmenu', 'dmenu_command', fallback="dmenu"))
|
|
|
|
|
cmd_base = basename(command[0])
|
|
|
|
|
command.extend(cli_args())
|
|
|
|
|
command.extend(commands.get(cmd_base, []))
|
|
|
|
|
# Highlighting
|
|
|
|
|
highlight = CONF.getboolean("dmenu", "highlight", fallback=False)
|
|
|
|
|
if highlight is True:
|
|
|
|
|
# Rofi
|
|
|
|
|
if cmd_base == "rofi" and active_lines:
|
|
|
|
|
# Rofi Highlighting
|
|
|
|
|
rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False)
|
|
|
|
|
if rofi_highlight is True and cmd_base == "rofi" and active_lines:
|
|
|
|
|
command.extend(["-a", ",".join([str(num) for num in active_lines])])
|
|
|
|
|
# Wofi
|
|
|
|
|
if cmd_base == "wofi" and active_lines:
|
|
|
|
|
# add '-q' to prevent tag name and properties of pango markup from searchable
|
|
|
|
|
command.extend(["-m", "-q"])
|
|
|
|
|
# Passphrase prompts
|
|
|
|
|
obscure = CONF.getboolean("dmenu_passphrase", "obscure", fallback=False)
|
|
|
|
|
obscure = CONF.getboolean('dmenu_passphrase', 'obscure', fallback=False)
|
|
|
|
|
if prompt == "Passphrase" and obscure is True:
|
|
|
|
|
obscure_color = CONF.get(
|
|
|
|
|
"dmenu_passphrase", "obscure_color", fallback="#222222"
|
|
|
|
|
)
|
|
|
|
|
pass_prompts = {
|
|
|
|
|
"dmenu": dmenu_pass(cmd_base, obscure_color),
|
|
|
|
|
"rofi": ["-password"],
|
|
|
|
|
"bemenu": ["-x"],
|
|
|
|
|
"wofi": ["-P"],
|
|
|
|
|
"fuzzel": ["--password"],
|
|
|
|
|
}
|
|
|
|
|
obscure_color = CONF.get('dmenu_passphrase', 'obscure_color', fallback='#222222')
|
|
|
|
|
pass_prompts = {"dmenu": dmenu_pass(cmd_base, obscure_color),
|
|
|
|
|
"rofi": ['-password'],
|
|
|
|
|
"bemenu": ['-x'],
|
|
|
|
|
"wofi": ['-P'],
|
|
|
|
|
"fuzzel": ['--password']}
|
|
|
|
|
command.extend(pass_prompts.get(cmd_base, []))
|
|
|
|
|
return command
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def choose_adapter(client):
|
|
|
|
|
"""If there is more than one wifi adapter installed, ask which one to use"""
|
|
|
|
|
"""If there is more than one wifi adapter installed, ask which one to use
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
devices = client.get_devices()
|
|
|
|
|
devices = [i for i in devices if i.get_device_type() == NM.DeviceType.WIFI]
|
|
|
|
|
if not devices:
|
|
|
|
@ -143,14 +130,12 @@ def choose_adapter(client):
|
|
|
|
|
if len(devices) == 1:
|
|
|
|
|
return devices[0]
|
|
|
|
|
device_names = "\n".join([d.get_iface() for d in devices])
|
|
|
|
|
sel = subprocess.run(
|
|
|
|
|
dmenu_cmd(len(devices), "CHOOSE ADAPTER:"),
|
|
|
|
|
sel = subprocess.run(dmenu_cmd(len(devices), "CHOOSE ADAPTER:"),
|
|
|
|
|
capture_output=True,
|
|
|
|
|
check=False,
|
|
|
|
|
env=ENV,
|
|
|
|
|
input=device_names,
|
|
|
|
|
encoding=ENC,
|
|
|
|
|
).stdout
|
|
|
|
|
encoding=ENC).stdout
|
|
|
|
|
if not sel.strip():
|
|
|
|
|
sys.exit()
|
|
|
|
|
devices = [i for i in devices if i.get_iface() == sel.strip()]
|
|
|
|
@ -164,39 +149,22 @@ def is_installed(cmd):
|
|
|
|
|
return which(cmd) is not None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_running(cmd):
|
|
|
|
|
try:
|
|
|
|
|
subprocess.check_output(["pidof", cmd])
|
|
|
|
|
except subprocess.CalledProcessError:
|
|
|
|
|
return False
|
|
|
|
|
else:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def bluetooth_get_enabled():
|
|
|
|
|
"""Check if bluetooth is enabled. Try bluetoothctl first, then rfkill.
|
|
|
|
|
"""Check if bluetooth is enabled via rfkill.
|
|
|
|
|
|
|
|
|
|
Returns None if no bluetooth device was found.
|
|
|
|
|
"""
|
|
|
|
|
if is_installed("bluetoothctl") and is_running("bluetoothd"):
|
|
|
|
|
# Times out in 2 seconds, otherwise bluetoothctl will hang if bluetooth
|
|
|
|
|
# service isn't running.
|
|
|
|
|
try:
|
|
|
|
|
res = subprocess.run(
|
|
|
|
|
["bluetoothctl", "show"], timeout=2, capture_output=True, text=True
|
|
|
|
|
)
|
|
|
|
|
return "Powered: yes" in res.stdout
|
|
|
|
|
except subprocess.TimeoutExpired:
|
|
|
|
|
pass
|
|
|
|
|
# See https://www.kernel.org/doc/Documentation/ABI/stable/sysfs-class-rfkill
|
|
|
|
|
for path in pathlib.Path("/sys/class/rfkill/").glob("rfkill*"):
|
|
|
|
|
if (path / "type").read_text().strip() == "bluetooth":
|
|
|
|
|
return (path / "soft").read_text().strip() == "0"
|
|
|
|
|
for path in pathlib.Path('/sys/class/rfkill/').glob('rfkill*'):
|
|
|
|
|
if (path / 'type').read_text().strip() == 'bluetooth':
|
|
|
|
|
return (path / 'soft').read_text().strip() == '0'
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_other_actions(client):
|
|
|
|
|
"""Return list of other actions that can be taken"""
|
|
|
|
|
"""Return list of other actions that can be taken
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
networking_enabled = client.networking_get_enabled()
|
|
|
|
|
networking_action = "Disable" if networking_enabled else "Enable"
|
|
|
|
|
|
|
|
|
@ -206,22 +174,15 @@ def create_other_actions(client):
|
|
|
|
|
bluetooth_enabled = bluetooth_get_enabled()
|
|
|
|
|
bluetooth_action = "Disable" if bluetooth_enabled else "Enable"
|
|
|
|
|
|
|
|
|
|
actions = [
|
|
|
|
|
Action(f"{wifi_action} Wifi", toggle_wifi, not wifi_enabled),
|
|
|
|
|
Action(
|
|
|
|
|
f"{networking_action} Networking", toggle_networking, not networking_enabled
|
|
|
|
|
),
|
|
|
|
|
]
|
|
|
|
|
actions = [Action(f"{wifi_action} Wifi", toggle_wifi,
|
|
|
|
|
not wifi_enabled),
|
|
|
|
|
Action(f"{networking_action} Networking",
|
|
|
|
|
toggle_networking, not networking_enabled)]
|
|
|
|
|
if bluetooth_enabled is not None:
|
|
|
|
|
actions.append(
|
|
|
|
|
Action(
|
|
|
|
|
f"{bluetooth_action} Bluetooth", toggle_bluetooth, not bluetooth_enabled
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
actions += [
|
|
|
|
|
Action("Launch Connection Manager", launch_connection_editor),
|
|
|
|
|
Action("Delete a Connection", delete_connection),
|
|
|
|
|
]
|
|
|
|
|
actions.append(Action(f"{bluetooth_action} Bluetooth",
|
|
|
|
|
toggle_bluetooth, not bluetooth_enabled))
|
|
|
|
|
actions += [Action("Launch Connection Manager", launch_connection_editor),
|
|
|
|
|
Action("Delete a Connection", delete_connection)]
|
|
|
|
|
if wifi_enabled:
|
|
|
|
|
actions.append(Action("Rescan Wifi Networks", rescan_wifi))
|
|
|
|
|
return actions
|
|
|
|
@ -231,7 +192,7 @@ def rescan_wifi():
|
|
|
|
|
"""
|
|
|
|
|
Rescan Wifi Access Points
|
|
|
|
|
"""
|
|
|
|
|
delay = CONF.getint("nmdm", "rescan_delay", fallback=5)
|
|
|
|
|
delay = CONF.getint('nmdm', 'rescan_delay', fallback=5)
|
|
|
|
|
for dev in CLIENT.get_devices():
|
|
|
|
|
if gi.repository.NM.DeviceWifi == type(dev):
|
|
|
|
|
try:
|
|
|
|
@ -248,7 +209,9 @@ def rescan_wifi():
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rescan_cb(dev, res, data):
|
|
|
|
|
"""Callback for rescan_wifi. Just for notifications"""
|
|
|
|
|
"""Callback for rescan_wifi. Just for notifications
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if dev.request_scan_finish(res) is True:
|
|
|
|
|
notify("Wifi scan running...")
|
|
|
|
|
else:
|
|
|
|
@ -257,7 +220,7 @@ def rescan_cb(dev, res, data):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ssid_to_utf8(nm_ap):
|
|
|
|
|
"""Convert binary ssid to utf-8"""
|
|
|
|
|
""" Convert binary ssid to utf-8 """
|
|
|
|
|
ssid = nm_ap.get_ssid()
|
|
|
|
|
if not ssid:
|
|
|
|
|
return ""
|
|
|
|
@ -273,30 +236,25 @@ def prompt_saved(saved_cons):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ap_security(nm_ap):
|
|
|
|
|
"""Parse the security flags to return a string with 'WPA2', etc."""
|
|
|
|
|
"""Parse the security flags to return a string with 'WPA2', etc. """
|
|
|
|
|
flags = nm_ap.get_flags()
|
|
|
|
|
wpa_flags = nm_ap.get_wpa_flags()
|
|
|
|
|
rsn_flags = nm_ap.get_rsn_flags()
|
|
|
|
|
sec_str = ""
|
|
|
|
|
if (
|
|
|
|
|
(flags & getattr(NM, "80211ApFlags").PRIVACY)
|
|
|
|
|
and (wpa_flags == 0)
|
|
|
|
|
and (rsn_flags == 0)
|
|
|
|
|
):
|
|
|
|
|
if ((flags & getattr(NM, '80211ApFlags').PRIVACY) and
|
|
|
|
|
(wpa_flags == 0) and (rsn_flags == 0)):
|
|
|
|
|
sec_str = " WEP"
|
|
|
|
|
if wpa_flags:
|
|
|
|
|
sec_str = " WPA1"
|
|
|
|
|
if rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_PSK:
|
|
|
|
|
if rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_PSK:
|
|
|
|
|
sec_str += " WPA2"
|
|
|
|
|
if rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_SAE:
|
|
|
|
|
if rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_SAE:
|
|
|
|
|
sec_str += " WPA3"
|
|
|
|
|
if (wpa_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_802_1X) or (
|
|
|
|
|
rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_802_1X
|
|
|
|
|
):
|
|
|
|
|
if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X) or
|
|
|
|
|
(rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_802_1X)):
|
|
|
|
|
sec_str += " 802.1X"
|
|
|
|
|
if (wpa_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_OWE) or (
|
|
|
|
|
rsn_flags & getattr(NM, "80211ApSecurityFlags").KEY_MGMT_OWE
|
|
|
|
|
):
|
|
|
|
|
if ((wpa_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_OWE) or
|
|
|
|
|
(rsn_flags & getattr(NM, '80211ApSecurityFlags').KEY_MGMT_OWE)):
|
|
|
|
|
sec_str += " OWE"
|
|
|
|
|
|
|
|
|
|
# If there is no security use "--"
|
|
|
|
@ -305,10 +263,13 @@ def ap_security(nm_ap):
|
|
|
|
|
return sec_str.lstrip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Action: # pylint: disable=too-few-public-methods
|
|
|
|
|
class Action(): # pylint: disable=too-few-public-methods
|
|
|
|
|
"""Helper class to execute functions from a string variable"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, name, func, args=None, active=False):
|
|
|
|
|
def __init__(self,
|
|
|
|
|
name,
|
|
|
|
|
func,
|
|
|
|
|
args=None,
|
|
|
|
|
active=False):
|
|
|
|
|
self.name = name
|
|
|
|
|
self.func = func
|
|
|
|
|
self.is_active = active
|
|
|
|
@ -363,19 +324,16 @@ def process_ap(nm_ap, is_active, adapter):
|
|
|
|
|
CLIENT.deactivate_connection_async(nm_ap, None, deactivate_cb, nm_ap)
|
|
|
|
|
LOOP.run()
|
|
|
|
|
else:
|
|
|
|
|
conns_cur = [
|
|
|
|
|
i
|
|
|
|
|
for i in CONNS
|
|
|
|
|
if i.get_setting_wireless() is not None and conn_matches_adapter(i, adapter)
|
|
|
|
|
]
|
|
|
|
|
conns_cur = [i for i in CONNS if
|
|
|
|
|
i.get_setting_wireless() is not None and
|
|
|
|
|
conn_matches_adapter(i, adapter)]
|
|
|
|
|
con = nm_ap.filter_connections(conns_cur)
|
|
|
|
|
if len(con) > 1:
|
|
|
|
|
raise ValueError("There are multiple connections possible")
|
|
|
|
|
|
|
|
|
|
if len(con) == 1:
|
|
|
|
|
CLIENT.activate_connection_async(
|
|
|
|
|
con[0], adapter, nm_ap.get_path(), None, activate_cb, nm_ap
|
|
|
|
|
)
|
|
|
|
|
CLIENT.activate_connection_async(con[0], adapter, nm_ap.get_path(),
|
|
|
|
|
None, activate_cb, nm_ap)
|
|
|
|
|
LOOP.run()
|
|
|
|
|
else:
|
|
|
|
|
if ap_security(nm_ap) != "--":
|
|
|
|
@ -386,7 +344,9 @@ def process_ap(nm_ap, is_active, adapter):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def activate_cb(dev, res, data):
|
|
|
|
|
"""Notification if activate connection completed successfully"""
|
|
|
|
|
"""Notification if activate connection completed successfully
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
conn = dev.activate_connection_finish(res)
|
|
|
|
|
except GLib.Error:
|
|
|
|
@ -399,7 +359,9 @@ def activate_cb(dev, res, data):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def deactivate_cb(dev, res, data):
|
|
|
|
|
"""Notification if deactivate connection completed successfully"""
|
|
|
|
|
"""Notification if deactivate connection completed successfully
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if dev.deactivate_connection_finish(res) is True:
|
|
|
|
|
notify(f"Deactivated {data.get_id()}")
|
|
|
|
|
else:
|
|
|
|
@ -410,17 +372,17 @@ def deactivate_cb(dev, res, data):
|
|
|
|
|
def process_vpngsm(con, activate):
|
|
|
|
|
"""Activate/deactive VPN or GSM connections"""
|
|
|
|
|
if activate:
|
|
|
|
|
CLIENT.activate_connection_async(con, None, None, None, activate_cb, con)
|
|
|
|
|
CLIENT.activate_connection_async(con, None, None,
|
|
|
|
|
None, activate_cb, con)
|
|
|
|
|
else:
|
|
|
|
|
CLIENT.deactivate_connection_async(con, None, deactivate_cb, con)
|
|
|
|
|
LOOP.run()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def strength_bars(signal_strength):
|
|
|
|
|
bars = NM.utils_wifi_strength_bars(signal_strength)
|
|
|
|
|
wifi_chars = CONF.get("dmenu", "wifi_chars", fallback=False)
|
|
|
|
|
if wifi_chars:
|
|
|
|
|
bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == "*"])
|
|
|
|
|
bars = "".join([wifi_chars[i] for i, j in enumerate(bars) if j == '*'])
|
|
|
|
|
return bars
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -448,37 +410,22 @@ def create_ap_actions(aps, active_ap, active_connection, adapter): # noqa pylin
|
|
|
|
|
if CONF.getboolean("dmenu", "compact", fallback=False):
|
|
|
|
|
format = CONF.get("dmenu", "format", fallback="{name} {sec} {bars}")
|
|
|
|
|
else:
|
|
|
|
|
format = CONF.get(
|
|
|
|
|
"dmenu",
|
|
|
|
|
"format",
|
|
|
|
|
fallback="{name:<{max_len_name}s} {sec:<{max_len_sec}s} {bars:>4}",
|
|
|
|
|
)
|
|
|
|
|
format = CONF.get("dmenu", "format", fallback="{name:<{max_len_name}s} {sec:<{max_len_sec}s} {bars:>4}")
|
|
|
|
|
|
|
|
|
|
for nm_ap, name, sec in zip(aps, names, secs):
|
|
|
|
|
is_active = nm_ap.get_bssid() == active_ap_bssid
|
|
|
|
|
signal_strength = nm_ap.get_strength()
|
|
|
|
|
bars = strength_bars(signal_strength)
|
|
|
|
|
icon = strength_icon(signal_strength)
|
|
|
|
|
action_name = format.format(
|
|
|
|
|
name=name,
|
|
|
|
|
sec=sec,
|
|
|
|
|
signal=signal_strength,
|
|
|
|
|
bars=bars,
|
|
|
|
|
icon=icon,
|
|
|
|
|
max_len_name=max_len_name,
|
|
|
|
|
max_len_sec=max_len_sec,
|
|
|
|
|
)
|
|
|
|
|
action_name = format.format(name=name, sec=sec, signal=signal_strength, bars=bars, icon=icon,
|
|
|
|
|
max_len_name=max_len_name, max_len_sec=max_len_sec)
|
|
|
|
|
if is_active:
|
|
|
|
|
ap_actions.append(
|
|
|
|
|
Action(
|
|
|
|
|
action_name,
|
|
|
|
|
process_ap,
|
|
|
|
|
ap_actions.append(Action(action_name, process_ap,
|
|
|
|
|
[active_connection, True, adapter],
|
|
|
|
|
active=True,
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
active=True))
|
|
|
|
|
else:
|
|
|
|
|
ap_actions.append(Action(action_name, process_ap, [nm_ap, False, adapter]))
|
|
|
|
|
ap_actions.append(Action(action_name, process_ap,
|
|
|
|
|
[nm_ap, False, adapter]))
|
|
|
|
|
return ap_actions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -491,15 +438,6 @@ def create_vpn_actions(vpns, active):
|
|
|
|
|
return _create_vpngsm_actions(vpns, active_vpns, "VPN")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_vlan_actions(vlans, active):
|
|
|
|
|
"""Create the list of strings to display with associated function
|
|
|
|
|
(activate/deactivate) for VLAN connections.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
active_vlans = [i for i in active if "vlan" == i.get_connection_type()]
|
|
|
|
|
return _create_vpngsm_actions(vlans, active_vlans, "VLAN")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_wireguard_actions(wgs, active):
|
|
|
|
|
"""Create the list of strings to display with associated function
|
|
|
|
|
(activate/deactivate) for Wireguard connections.
|
|
|
|
@ -514,31 +452,25 @@ def create_eth_actions(eths, active):
|
|
|
|
|
(activate/deactivate) for Ethernet connections.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
active_eths = [i for i in active if "ethernet" in i.get_connection_type()]
|
|
|
|
|
active_eths = [i for i in active if 'ethernet' in i.get_connection_type()]
|
|
|
|
|
return _create_vpngsm_actions(eths, active_eths, "Eth")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_gsm_actions(gsms, active):
|
|
|
|
|
"""Create the list of strings to display with associated function
|
|
|
|
|
(activate/deactivate) GSM connections."""
|
|
|
|
|
active_gsms = [
|
|
|
|
|
i
|
|
|
|
|
for i in active
|
|
|
|
|
if i.get_connection() is not None
|
|
|
|
|
and i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)
|
|
|
|
|
]
|
|
|
|
|
active_gsms = [i for i in active if
|
|
|
|
|
i.get_connection() is not None and
|
|
|
|
|
i.get_connection().is_type(NM.SETTING_GSM_SETTING_NAME)]
|
|
|
|
|
return _create_vpngsm_actions(gsms, active_gsms, "GSM")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_blue_actions(blues, active):
|
|
|
|
|
"""Create the list of strings to display with associated function
|
|
|
|
|
(activate/deactivate) Bluetooth connections."""
|
|
|
|
|
active_blues = [
|
|
|
|
|
i
|
|
|
|
|
for i in active
|
|
|
|
|
if i.get_connection() is not None
|
|
|
|
|
and i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)
|
|
|
|
|
]
|
|
|
|
|
active_blues = [i for i in active if
|
|
|
|
|
i.get_connection() is not None and
|
|
|
|
|
i.get_connection().is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]
|
|
|
|
|
return _create_vpngsm_actions(blues, active_blues, "Bluetooth")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -557,29 +489,30 @@ def _create_vpngsm_actions(cons, active_cons, label):
|
|
|
|
|
is_active = con.get_id() in active_con_ids
|
|
|
|
|
action_name = f"{con.get_id()}:{label}"
|
|
|
|
|
if is_active:
|
|
|
|
|
active_connection = [a for a in active_cons if a.get_id() == con.get_id()]
|
|
|
|
|
active_connection = [a for a in active_cons
|
|
|
|
|
if a.get_id() == con.get_id()]
|
|
|
|
|
if len(active_connection) != 1:
|
|
|
|
|
raise ValueError(f"Multiple active connections match {con.get_id()}")
|
|
|
|
|
active_connection = active_connection[0]
|
|
|
|
|
|
|
|
|
|
actions.append(
|
|
|
|
|
Action(
|
|
|
|
|
action_name, process_vpngsm, [active_connection, False], active=True
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
actions.append(Action(action_name, process_vpngsm,
|
|
|
|
|
[active_connection, False], active=True))
|
|
|
|
|
else:
|
|
|
|
|
actions.append(Action(action_name, process_vpngsm, [con, True]))
|
|
|
|
|
actions.append(Action(action_name, process_vpngsm,
|
|
|
|
|
[con, True]))
|
|
|
|
|
return actions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_wwan_actions(client):
|
|
|
|
|
"""Create WWWAN actions"""
|
|
|
|
|
"""Create WWWAN actions
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
wwan_enabled = client.wwan_get_enabled()
|
|
|
|
|
wwan_action = "Disable" if wwan_enabled else "Enable"
|
|
|
|
|
return [Action(f"{wwan_action} WWAN", toggle_wwan, not wwan_enabled)]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def combine_actions(eths, aps, vlans, vpns, wgs, gsms, blues, wwan, others, saved):
|
|
|
|
|
def combine_actions(eths, aps, vpns, wgs, gsms, blues, wwan, others, saved):
|
|
|
|
|
# pylint: disable=too-many-arguments
|
|
|
|
|
"""Combine all given actions into a list of actions.
|
|
|
|
|
|
|
|
|
@ -592,11 +525,10 @@ def combine_actions(eths, aps, vlans, vpns, wgs, gsms, blues, wwan, others, save
|
|
|
|
|
others: list of Actions
|
|
|
|
|
"""
|
|
|
|
|
compact = CONF.getboolean("dmenu", "compact", fallback=False)
|
|
|
|
|
empty_action = [Action("", None)] if not compact else []
|
|
|
|
|
empty_action = [Action('', None)] if not compact else []
|
|
|
|
|
all_actions = []
|
|
|
|
|
all_actions += eths + empty_action if eths else []
|
|
|
|
|
all_actions += aps + empty_action if aps else []
|
|
|
|
|
all_actions += vlans + empty_action if vlans else []
|
|
|
|
|
all_actions += vpns + empty_action if vpns else []
|
|
|
|
|
all_actions += wgs + empty_action if wgs else []
|
|
|
|
|
all_actions += gsms + empty_action if (gsms and wwan) else []
|
|
|
|
@ -607,81 +539,38 @@ def combine_actions(eths, aps, vlans, vpns, wgs, gsms, blues, wwan, others, save
|
|
|
|
|
return all_actions
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_wofi_highlight_markup(action):
|
|
|
|
|
highlight_fg = CONF.get("dmenu", "highlight_fg", fallback=None)
|
|
|
|
|
highlight_bg = CONF.get("dmenu", "highlight_bg", fallback=None)
|
|
|
|
|
highlight_bold = CONF.getboolean("dmenu", "highlight_bold", fallback=True)
|
|
|
|
|
|
|
|
|
|
style = ""
|
|
|
|
|
if highlight_fg:
|
|
|
|
|
style += f'foreground="{highlight_fg}" '
|
|
|
|
|
if highlight_bg:
|
|
|
|
|
style += f'background="{highlight_bg}" '
|
|
|
|
|
if highlight_bold:
|
|
|
|
|
style += 'weight="bold" '
|
|
|
|
|
|
|
|
|
|
return f"<span {style}>" + str(action) + "</span>"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_selection(all_actions):
|
|
|
|
|
"""Spawn dmenu for selection and execute the associated action."""
|
|
|
|
|
command = shlex.split(CONF.get("dmenu", "dmenu_command", fallback="dmenu"))
|
|
|
|
|
cmd_base = basename(command[0])
|
|
|
|
|
active_chars = CONF.get("dmenu", "active_chars", fallback="==")
|
|
|
|
|
highlight = CONF.getboolean("dmenu", "highlight", fallback=False)
|
|
|
|
|
rofi_highlight = CONF.getboolean('dmenu', 'rofi_highlight', fallback=False)
|
|
|
|
|
inp = []
|
|
|
|
|
|
|
|
|
|
if highlight is True and cmd_base == "rofi":
|
|
|
|
|
if rofi_highlight is True:
|
|
|
|
|
inp = [str(action) for action in all_actions]
|
|
|
|
|
elif highlight is True and cmd_base == "wofi":
|
|
|
|
|
inp = [
|
|
|
|
|
get_wofi_highlight_markup(action) if action.is_active else str(action)
|
|
|
|
|
for action in all_actions
|
|
|
|
|
]
|
|
|
|
|
else:
|
|
|
|
|
inp = [
|
|
|
|
|
(active_chars if action.is_active else " " * len(active_chars))
|
|
|
|
|
+ " "
|
|
|
|
|
+ str(action)
|
|
|
|
|
for action in all_actions
|
|
|
|
|
]
|
|
|
|
|
active_lines = [
|
|
|
|
|
index for index, action in enumerate(all_actions) if action.is_active
|
|
|
|
|
]
|
|
|
|
|
inp = [('== ' if action.is_active else ' ') + str(action)
|
|
|
|
|
for action in all_actions]
|
|
|
|
|
active_lines = [index for index, action in enumerate(all_actions)
|
|
|
|
|
if action.is_active]
|
|
|
|
|
|
|
|
|
|
command = dmenu_cmd(len(inp), active_lines=active_lines)
|
|
|
|
|
sel = subprocess.run(
|
|
|
|
|
command,
|
|
|
|
|
sel = subprocess.run(command,
|
|
|
|
|
capture_output=True,
|
|
|
|
|
check=False,
|
|
|
|
|
input="\n".join(inp),
|
|
|
|
|
encoding=ENC,
|
|
|
|
|
env=ENV,
|
|
|
|
|
).stdout
|
|
|
|
|
env=ENV).stdout
|
|
|
|
|
|
|
|
|
|
if not sel.rstrip():
|
|
|
|
|
sys.exit()
|
|
|
|
|
|
|
|
|
|
if highlight is True and cmd_base == "rofi":
|
|
|
|
|
action = [i for i in all_actions if str(i).strip() == sel.strip()]
|
|
|
|
|
elif highlight is True and cmd_base == "wofi":
|
|
|
|
|
action = [
|
|
|
|
|
i
|
|
|
|
|
for i in all_actions
|
|
|
|
|
if str(i).strip() == sel.strip()
|
|
|
|
|
or get_wofi_highlight_markup(i) == sel.strip()
|
|
|
|
|
]
|
|
|
|
|
if rofi_highlight is False:
|
|
|
|
|
action = [i for i in all_actions
|
|
|
|
|
if ((str(i).strip() == str(sel.strip())
|
|
|
|
|
and not i.is_active) or
|
|
|
|
|
('== ' + str(i) == str(sel.rstrip('\n'))
|
|
|
|
|
and i.is_active))]
|
|
|
|
|
else:
|
|
|
|
|
action = [
|
|
|
|
|
i
|
|
|
|
|
for i in all_actions
|
|
|
|
|
if (
|
|
|
|
|
(str(i).strip() == str(sel.strip()) and not i.is_active)
|
|
|
|
|
or (
|
|
|
|
|
active_chars + " " + str(i) == str(sel.rstrip("\n")) and i.is_active
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
]
|
|
|
|
|
action = [i for i in all_actions if str(i).strip() == sel.strip()]
|
|
|
|
|
if len(action) != 1:
|
|
|
|
|
raise ValueError(f"Selection was ambiguous: '{str(sel.strip())}'")
|
|
|
|
|
return action[0]
|
|
|
|
@ -695,17 +584,8 @@ def toggle_networking(enable):
|
|
|
|
|
"""
|
|
|
|
|
toggle = GLib.Variant.new_tuple(GLib.Variant.new_boolean(enable))
|
|
|
|
|
try:
|
|
|
|
|
CLIENT.dbus_call(
|
|
|
|
|
NM.DBUS_PATH,
|
|
|
|
|
NM.DBUS_INTERFACE,
|
|
|
|
|
"Enable",
|
|
|
|
|
toggle,
|
|
|
|
|
None,
|
|
|
|
|
-1,
|
|
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
CLIENT.dbus_call(NM.DBUS_PATH, NM.DBUS_INTERFACE, "Enable", toggle,
|
|
|
|
|
None, -1, None, None, None)
|
|
|
|
|
except AttributeError:
|
|
|
|
|
# Workaround for older versions of python-gobject
|
|
|
|
|
CLIENT.networking_set_enabled(enable)
|
|
|
|
@ -720,16 +600,8 @@ def toggle_wifi(enable):
|
|
|
|
|
"""
|
|
|
|
|
toggle = GLib.Variant.new_boolean(enable)
|
|
|
|
|
try:
|
|
|
|
|
CLIENT.dbus_set_property(
|
|
|
|
|
NM.DBUS_PATH,
|
|
|
|
|
NM.DBUS_INTERFACE,
|
|
|
|
|
"WirelessEnabled",
|
|
|
|
|
toggle,
|
|
|
|
|
-1,
|
|
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WirelessEnabled", toggle,
|
|
|
|
|
-1, None, None, None)
|
|
|
|
|
except AttributeError:
|
|
|
|
|
# Workaround for older versions of python-gobject
|
|
|
|
|
CLIENT.wireless_set_enabled(enable)
|
|
|
|
@ -744,9 +616,8 @@ def toggle_wwan(enable):
|
|
|
|
|
"""
|
|
|
|
|
toggle = GLib.Variant.new_boolean(enable)
|
|
|
|
|
try:
|
|
|
|
|
CLIENT.dbus_set_property(
|
|
|
|
|
NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle, -1, None, None, None
|
|
|
|
|
)
|
|
|
|
|
CLIENT.dbus_set_property(NM.DBUS_PATH, NM.DBUS_INTERFACE, "WwanEnabled", toggle,
|
|
|
|
|
-1, None, None, None)
|
|
|
|
|
except AttributeError:
|
|
|
|
|
# Workaround for older versions of python-gobject
|
|
|
|
|
CLIENT.wwan_set_enabled(enable)
|
|
|
|
@ -756,9 +627,6 @@ def toggle_wwan(enable):
|
|
|
|
|
def toggle_bluetooth(enable):
|
|
|
|
|
"""Enable/disable Bluetooth
|
|
|
|
|
|
|
|
|
|
Try bluetoothctl first, then drop to rfkill if it's not installed or
|
|
|
|
|
bluetooth service isn't running.
|
|
|
|
|
|
|
|
|
|
Args: enable - boolean
|
|
|
|
|
|
|
|
|
|
References:
|
|
|
|
@ -767,61 +635,40 @@ def toggle_bluetooth(enable):
|
|
|
|
|
https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/uapi/linux/rfkill.h?h=v5.8.9
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if is_installed("bluetoothctl") and is_running("bluetoothd"):
|
|
|
|
|
# Times out in 2 seconds, otherwise bluetoothctl will hang if bluetooth
|
|
|
|
|
# service isn't running.
|
|
|
|
|
try:
|
|
|
|
|
res = subprocess.run(
|
|
|
|
|
["bluetoothctl", "power", "on" if enable is True else "off"],
|
|
|
|
|
timeout=2,
|
|
|
|
|
capture_output=True,
|
|
|
|
|
)
|
|
|
|
|
except subprocess.TimeoutExpired:
|
|
|
|
|
pass
|
|
|
|
|
try:
|
|
|
|
|
res = subprocess.run(
|
|
|
|
|
["bluetoothctl", "show"], timeout=2, capture_output=True, text=True
|
|
|
|
|
)
|
|
|
|
|
if "Powered: yes" in res.stdout:
|
|
|
|
|
notify("Bluetooth enabled")
|
|
|
|
|
return
|
|
|
|
|
except subprocess.TimeoutExpired:
|
|
|
|
|
pass
|
|
|
|
|
# Now try using rfkill
|
|
|
|
|
type_bluetooth = 2
|
|
|
|
|
op_change_all = 3
|
|
|
|
|
idx = 0
|
|
|
|
|
soft_state = 0 if enable else 1
|
|
|
|
|
hard_state = 0
|
|
|
|
|
|
|
|
|
|
data = struct.pack(
|
|
|
|
|
"IBBBB", idx, type_bluetooth, op_change_all, soft_state, hard_state
|
|
|
|
|
)
|
|
|
|
|
data = struct.pack("IBBBB", idx, type_bluetooth, op_change_all,
|
|
|
|
|
soft_state, hard_state)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
with open("/dev/rfkill", "r+b", buffering=0) as rff:
|
|
|
|
|
with open('/dev/rfkill', 'r+b', buffering=0) as rff:
|
|
|
|
|
rff.write(data)
|
|
|
|
|
except PermissionError:
|
|
|
|
|
notify(
|
|
|
|
|
"Lacking permission to write to /dev/rfkill.",
|
|
|
|
|
notify("Lacking permission to write to /dev/rfkill.",
|
|
|
|
|
"Check README for configuration options.",
|
|
|
|
|
urgency="critical",
|
|
|
|
|
)
|
|
|
|
|
urgency="critical")
|
|
|
|
|
else:
|
|
|
|
|
notify(f"Bluetooth {'enabled' if enable else 'disabled'}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def launch_connection_editor():
|
|
|
|
|
"""Launch nmtui or the gui nm-connection-editor"""
|
|
|
|
|
terminal = shlex.split(CONF.get("editor", "terminal", fallback="xterm"))
|
|
|
|
|
"""Launch nmtui or the gui nm-connection-editor
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
terminal = CONF.get("editor", "terminal", fallback="xterm")
|
|
|
|
|
gui_if_available = CONF.getboolean("editor", "gui_if_available", fallback=True)
|
|
|
|
|
gui = CONF.get("editor", "gui", fallback="nm-connection-editor")
|
|
|
|
|
guis = ["gnome-control-center", "nm-connection-editor"]
|
|
|
|
|
if gui_if_available is True:
|
|
|
|
|
for gui in guis:
|
|
|
|
|
if is_installed(gui):
|
|
|
|
|
subprocess.run(gui, check=False)
|
|
|
|
|
return
|
|
|
|
|
if is_installed("nmtui"):
|
|
|
|
|
subprocess.run(terminal + ["-e", "nmtui"], check=False)
|
|
|
|
|
subprocess.run([terminal, "-e", "nmtui"], check=False)
|
|
|
|
|
return
|
|
|
|
|
notify("No network connection editor installed", urgency="critical")
|
|
|
|
|
|
|
|
|
@ -834,46 +681,38 @@ def get_passphrase():
|
|
|
|
|
"""
|
|
|
|
|
pinentry = CONF.get("dmenu", "pinentry", fallback=None)
|
|
|
|
|
if pinentry:
|
|
|
|
|
description = CONF.get(
|
|
|
|
|
"pinentry", "description", fallback="Get network password"
|
|
|
|
|
)
|
|
|
|
|
description = CONF.get("pinentry", "description", fallback="Get network password")
|
|
|
|
|
prompt = CONF.get("pinentry", "prompt", fallback="Password: ")
|
|
|
|
|
pin = ""
|
|
|
|
|
out = subprocess.run(
|
|
|
|
|
pinentry,
|
|
|
|
|
out = subprocess.run(pinentry,
|
|
|
|
|
capture_output=True,
|
|
|
|
|
check=False,
|
|
|
|
|
encoding=ENC,
|
|
|
|
|
input=f"setdesc {description}\nsetprompt {prompt}\ngetpin\n",
|
|
|
|
|
).stdout
|
|
|
|
|
input=f"setdesc {description}\nsetprompt {prompt}\ngetpin\n").stdout
|
|
|
|
|
if out:
|
|
|
|
|
res = [i for i in out.split("\n") if i.startswith("D ")]
|
|
|
|
|
if res and res[0].startswith("D "):
|
|
|
|
|
pin = res[0].split("D ")[1]
|
|
|
|
|
return pin
|
|
|
|
|
return subprocess.run(
|
|
|
|
|
dmenu_cmd(0, "Passphrase"),
|
|
|
|
|
return subprocess.run(dmenu_cmd(0, "Passphrase"),
|
|
|
|
|
stdin=subprocess.DEVNULL,
|
|
|
|
|
capture_output=True,
|
|
|
|
|
check=False,
|
|
|
|
|
encoding=ENC,
|
|
|
|
|
).stdout
|
|
|
|
|
encoding=ENC).stdout
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def delete_connection():
|
|
|
|
|
"""Display list of NM connections and delete the selected one"""
|
|
|
|
|
conn_acts = [
|
|
|
|
|
Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS
|
|
|
|
|
]
|
|
|
|
|
"""Display list of NM connections and delete the selected one
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
conn_acts = [Action(i.get_id(), i.delete_async, args=[None, delete_cb, None]) for i in CONNS]
|
|
|
|
|
conn_names = "\n".join([str(i) for i in conn_acts])
|
|
|
|
|
sel = subprocess.run(
|
|
|
|
|
dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"),
|
|
|
|
|
sel = subprocess.run(dmenu_cmd(len(conn_acts), "CHOOSE CONNECTION TO DELETE:"),
|
|
|
|
|
capture_output=True,
|
|
|
|
|
check=False,
|
|
|
|
|
input=conn_names,
|
|
|
|
|
encoding=ENC,
|
|
|
|
|
env=ENV,
|
|
|
|
|
).stdout
|
|
|
|
|
env=ENV).stdout
|
|
|
|
|
if not sel.strip():
|
|
|
|
|
sys.exit()
|
|
|
|
|
action = [i for i in conn_acts if str(i) == sel.rstrip("\n")]
|
|
|
|
@ -884,7 +723,9 @@ def delete_connection():
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def delete_cb(dev, res, data):
|
|
|
|
|
"""Notification if delete completed successfully"""
|
|
|
|
|
"""Notification if delete completed successfully
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
if dev.delete_finish(res) is True:
|
|
|
|
|
notify(f"Deleted {dev.get_id()}")
|
|
|
|
|
else:
|
|
|
|
@ -901,9 +742,8 @@ def set_new_connection(nm_ap, nm_pw, adapter):
|
|
|
|
|
"""
|
|
|
|
|
nm_pw = str(nm_pw).strip()
|
|
|
|
|
profile = create_wifi_profile(nm_ap, nm_pw, adapter)
|
|
|
|
|
CLIENT.add_and_activate_connection_async(
|
|
|
|
|
profile, adapter, nm_ap.get_path(), None, verify_conn, profile
|
|
|
|
|
)
|
|
|
|
|
CLIENT.add_and_activate_connection_async(profile, adapter, nm_ap.get_path(),
|
|
|
|
|
None, verify_conn, profile)
|
|
|
|
|
LOOP.run()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -924,10 +764,8 @@ def create_wifi_profile(nm_ap, password, adapter):
|
|
|
|
|
|
|
|
|
|
s_wifi = NM.SettingWireless.new()
|
|
|
|
|
s_wifi.set_property(NM.SETTING_WIRELESS_SSID, nm_ap.get_ssid())
|
|
|
|
|
s_wifi.set_property(NM.SETTING_WIRELESS_MODE, "infrastructure")
|
|
|
|
|
s_wifi.set_property(
|
|
|
|
|
NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address()
|
|
|
|
|
)
|
|
|
|
|
s_wifi.set_property(NM.SETTING_WIRELESS_MODE, 'infrastructure')
|
|
|
|
|
s_wifi.set_property(NM.SETTING_WIRELESS_MAC_ADDRESS, adapter.get_permanent_hw_address())
|
|
|
|
|
profile.add_setting(s_wifi)
|
|
|
|
|
|
|
|
|
|
s_ip4 = NM.SettingIP4Config.new()
|
|
|
|
@ -942,18 +780,19 @@ def create_wifi_profile(nm_ap, password, adapter):
|
|
|
|
|
s_wifi_sec = NM.SettingWirelessSecurity.new()
|
|
|
|
|
if "WPA" in ap_sec:
|
|
|
|
|
if "WPA3" in ap_sec:
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "sae")
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
|
|
|
|
|
"sae")
|
|
|
|
|
else:
|
|
|
|
|
s_wifi_sec.set_property(
|
|
|
|
|
NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "wpa-psk"
|
|
|
|
|
)
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG, "open")
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
|
|
|
|
|
"wpa-psk")
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_AUTH_ALG,
|
|
|
|
|
"open")
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_PSK, password)
|
|
|
|
|
elif "WEP" in ap_sec:
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, "None")
|
|
|
|
|
s_wifi_sec.set_property(
|
|
|
|
|
NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE, NM.WepKeyType.PASSPHRASE
|
|
|
|
|
)
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT,
|
|
|
|
|
"None")
|
|
|
|
|
s_wifi_sec.set_property(NM.SETTING_WIRELESS_SECURITY_WEP_KEY_TYPE,
|
|
|
|
|
NM.WepKeyType.PASSPHRASE)
|
|
|
|
|
s_wifi_sec.set_wep_key(0, password)
|
|
|
|
|
profile.add_setting(s_wifi_sec)
|
|
|
|
|
|
|
|
|
@ -970,14 +809,16 @@ def verify_conn(client, result, data):
|
|
|
|
|
try:
|
|
|
|
|
act_conn = client.add_and_activate_connection_finish(result)
|
|
|
|
|
conn = act_conn.get_connection()
|
|
|
|
|
if not all(
|
|
|
|
|
[conn.verify(), conn.verify_secrets(), data.verify(), data.verify_secrets()]
|
|
|
|
|
):
|
|
|
|
|
if not all([conn.verify(),
|
|
|
|
|
conn.verify_secrets(),
|
|
|
|
|
data.verify(),
|
|
|
|
|
data.verify_secrets()]):
|
|
|
|
|
raise GLib.Error
|
|
|
|
|
notify(f"Added {conn.get_id()}")
|
|
|
|
|
except GLib.Error:
|
|
|
|
|
try:
|
|
|
|
|
notify(f"Connection to {conn.get_id()} failed", urgency="critical")
|
|
|
|
|
notify(f"Connection to {conn.get_id()} failed",
|
|
|
|
|
urgency="critical")
|
|
|
|
|
conn.delete_async(None, None, None)
|
|
|
|
|
except UnboundLocalError:
|
|
|
|
|
pass
|
|
|
|
@ -1000,27 +841,22 @@ def create_ap_list(adapter, active_connections):
|
|
|
|
|
aps = []
|
|
|
|
|
ap_names = []
|
|
|
|
|
active_ap = adapter.get_active_access_point()
|
|
|
|
|
aps_all = sorted(
|
|
|
|
|
adapter.get_access_points(), key=lambda a: a.get_strength(), reverse=True
|
|
|
|
|
)
|
|
|
|
|
conns_cur = [
|
|
|
|
|
i
|
|
|
|
|
for i in CONNS
|
|
|
|
|
if i.get_setting_wireless() is not None and conn_matches_adapter(i, adapter)
|
|
|
|
|
]
|
|
|
|
|
aps_all = sorted(adapter.get_access_points(),
|
|
|
|
|
key=lambda a: a.get_strength(), reverse=True)
|
|
|
|
|
conns_cur = [i for i in CONNS if
|
|
|
|
|
i.get_setting_wireless() is not None and
|
|
|
|
|
conn_matches_adapter(i, adapter)]
|
|
|
|
|
try:
|
|
|
|
|
ap_conns = active_ap.filter_connections(conns_cur)
|
|
|
|
|
active_ap_name = ssid_to_utf8(active_ap)
|
|
|
|
|
active_ap_con = [
|
|
|
|
|
active_conn
|
|
|
|
|
for active_conn in active_connections
|
|
|
|
|
if active_conn.get_connection() in ap_conns
|
|
|
|
|
]
|
|
|
|
|
active_ap_con = [active_conn for active_conn in active_connections
|
|
|
|
|
if active_conn.get_connection() in ap_conns]
|
|
|
|
|
except AttributeError:
|
|
|
|
|
active_ap_name = None
|
|
|
|
|
active_ap_con = []
|
|
|
|
|
if len(active_ap_con) > 1:
|
|
|
|
|
raise ValueError("Multiple connection profiles match" " the wireless AP")
|
|
|
|
|
raise ValueError("Multiple connection profiles match"
|
|
|
|
|
" the wireless AP")
|
|
|
|
|
active_ap_con = active_ap_con[0] if active_ap_con else None
|
|
|
|
|
for nm_ap in aps_all:
|
|
|
|
|
ap_name = ssid_to_utf8(nm_ap)
|
|
|
|
@ -1034,17 +870,12 @@ def create_ap_list(adapter, active_connections):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def notify(message, details=None, urgency="low"):
|
|
|
|
|
"""Use notify-send if available for notifications"""
|
|
|
|
|
delay = CONF.getint("nmdm", "rescan_delay", fallback=5)
|
|
|
|
|
args = [
|
|
|
|
|
"-u",
|
|
|
|
|
urgency,
|
|
|
|
|
"-a",
|
|
|
|
|
"networkmanager-dmenu",
|
|
|
|
|
"-t",
|
|
|
|
|
str(delay * 1000),
|
|
|
|
|
message,
|
|
|
|
|
]
|
|
|
|
|
"""Use notify-send if available for notifications
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
delay = CONF.getint('nmdm', 'rescan_delay', fallback=5)
|
|
|
|
|
args = ["-u", urgency, "-a", "networkmanager-dmenu",
|
|
|
|
|
"-t", str(delay * 1000), message]
|
|
|
|
|
if details is not None:
|
|
|
|
|
args.append(details)
|
|
|
|
|
if is_installed("notify-send"):
|
|
|
|
@ -1072,13 +903,11 @@ def run(): # pylint: disable=too-many-locals
|
|
|
|
|
# Workaround for older versions of python-gobject with no wireguard support
|
|
|
|
|
wgs = []
|
|
|
|
|
eths = [i for i in CONNS if i.is_type(NM.SETTING_WIRED_SETTING_NAME)]
|
|
|
|
|
vlans = [i for i in CONNS if i.is_type(NM.SETTING_VLAN_SETTING_NAME)]
|
|
|
|
|
blues = [i for i in CONNS if i.is_type(NM.SETTING_BLUETOOTH_SETTING_NAME)]
|
|
|
|
|
|
|
|
|
|
vpn_actions = create_vpn_actions(vpns, active)
|
|
|
|
|
wg_actions = create_wireguard_actions(wgs, active)
|
|
|
|
|
eth_actions = create_eth_actions(eths, active)
|
|
|
|
|
vlan_actions = create_vlan_actions(vlans, active)
|
|
|
|
|
blue_actions = create_blue_actions(blues, active)
|
|
|
|
|
other_actions = create_other_actions(CLIENT)
|
|
|
|
|
wwan_installed = is_installed("ModemManager")
|
|
|
|
@ -1090,31 +919,24 @@ def run(): # pylint: disable=too-many-locals
|
|
|
|
|
gsm_actions = []
|
|
|
|
|
wwan_actions = []
|
|
|
|
|
|
|
|
|
|
list_saved = CONF.getboolean("dmenu", "list_saved", fallback=False)
|
|
|
|
|
list_saved = CONF.getboolean('dmenu', 'list_saved', fallback=False)
|
|
|
|
|
saved_cons = [i for i in CONNS if i not in vpns + wgs + eths + blues]
|
|
|
|
|
if list_saved:
|
|
|
|
|
saved_actions = create_saved_actions(saved_cons)
|
|
|
|
|
else:
|
|
|
|
|
saved_actions = [Action("Saved connections", prompt_saved, [saved_cons])]
|
|
|
|
|
|
|
|
|
|
actions = combine_actions(
|
|
|
|
|
eth_actions,
|
|
|
|
|
ap_actions,
|
|
|
|
|
vlan_actions,
|
|
|
|
|
vpn_actions,
|
|
|
|
|
wg_actions,
|
|
|
|
|
gsm_actions,
|
|
|
|
|
blue_actions,
|
|
|
|
|
wwan_actions,
|
|
|
|
|
other_actions,
|
|
|
|
|
saved_actions,
|
|
|
|
|
)
|
|
|
|
|
actions = combine_actions(eth_actions, ap_actions, vpn_actions, wg_actions,
|
|
|
|
|
gsm_actions, blue_actions, wwan_actions,
|
|
|
|
|
other_actions, saved_actions)
|
|
|
|
|
sel = get_selection(actions)
|
|
|
|
|
sel()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
"""Main. Enables script to be re-run after a wifi rescan"""
|
|
|
|
|
"""Main. Enables script to be re-run after a wifi rescan
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
global CLIENT, CONNS, LOOP # noqa pylint: disable=global-variable-undefined
|
|
|
|
|
CLIENT = NM.Client.new(None)
|
|
|
|
|
LOOP = GLib.MainLoop()
|
|
|
|
@ -1123,7 +945,7 @@ def main():
|
|
|
|
|
run()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
main()
|
|
|
|
|
|
|
|
|
|
# vim: set et ts=4 sw=4 :
|
|
|
|
|