From 35cc0d1dfd69af7606697a4d14079bd1469556de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=80=E5=8F=AA=E5=B0=8F=E8=8D=94=E6=9E=9D?= Date: Thu, 18 Dec 2025 06:06:37 +0100 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=20Python=E8=81=8A=E5=A4=A9?= =?UTF-8?q?=E5=B7=A5=E5=85=B7.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Python聊天工具.py | 727 ---------------------------------------------- 1 file changed, 727 deletions(-) delete mode 100644 Python聊天工具.py diff --git a/Python聊天工具.py b/Python聊天工具.py deleted file mode 100644 index 709d750..0000000 --- a/Python聊天工具.py +++ /dev/null @@ -1,727 +0,0 @@ -import socket -import threading -import time -import os -import sys -import json - -class Color: - R = '\033[0m' - C = '\033[96m' - Y = '\033[93m' - R_ = '\033[91m' - G = '\033[92m' - -class ChatRoom: - ROOM_BROADCAST_PORT = 5000 - ROOM_BROADCAST_INTERVAL = 3 - ROOM_TIMEOUT = 10 - - def __init__(self): - self.username = "" - self.client_socket = None - self.is_host = False - self.room_ip = "" - self.room_port = 0 - self.user_color = Color.Y - self.running = True - self.connected = False - self.clients = {} - self.room_info = {} - self.discovered_rooms = {} - self.broadcast_thread = None - self.discovery_thread = None - self.room_locked = False - self.room_password = "" - self.muted_users = set() - self.global_mute = False - - def register(self): - os.system('cls' if os.name == 'nt' else 'clear') - while True: - name = input(f"{Color.C}[系统提示] 请输入用户名:{Color.R}".strip()) - if name and not ' ' in name and len(name) <= 10: - self.username = name - break - print(f"{Color.C}[系统提示] 用户名无效!需1-10个字符且不含空格{Color.R}") - - def clear_screen(self): - os.system('cls' if os.name == 'nt' else 'clear') - - def show_main_menu(self): - while self.running and not self.connected: - self.clear_screen() - print(f"{Color.C}{'='*40}{Color.R}") - print(f"{Color.C}{'[Python简易聊天工具]':^40}{Color.R}") - print(f"{Color.C}{'='*40}{Color.R}") - print(f"{Color.C}{'1. 创建房间':<38}{'[ 创建属于你的房间 ]'}{Color.R}") - print(f"{Color.C}{'2. 加入房间':<38}{'[ 加入你想要的房间 ]'}{Color.R}") - print(f"{Color.C}{'3. 查找房间':<38}{'[ 自动搜索房间 ]'}{Color.R}") - print(f"{Color.C}{'4. 退出脚本':<38}{'[ 关闭Python脚本 ]'}{Color.R}") - print(f"{Color.C}{'='*40}{Color.R}") - choice = input(f"{Color.C}[请选择功能(1-4)]:{Color.R}".strip()) - if choice == "1": - self.create_room() - elif choice == "2": - self.join_room_manual() - elif choice == "3": - self.discover_rooms() - elif choice == "4": - self.exit_script() - else: - print(f"{Color.C}[系统提示] 无效选择,请输入1-4之间的数字{Color.R}") - time.sleep(1) - - def get_local_ip(self): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - s.connect(('8.8.8.8', 80)) - ip = s.getsockname()[0] - except Exception: - ip = '127.0.0.1' - finally: - s.close() - return ip - - def broadcast_room_info(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - while self.connected and self.is_host: - try: - room_info = json.dumps({ - 'ip': self.room_ip, - 'port': self.room_port, - 'host': self.username, - 'online': len(self.clients) + 1, - 'locked': self.room_locked, - 'timestamp': time.time() - }) - sock.sendto(room_info.encode('utf-8'), ('', self.ROOM_BROADCAST_PORT)) - time.sleep(self.ROOM_BROADCAST_INTERVAL) - except Exception: - break - sock.close() - - def discover_rooms(self): - self.clear_screen() - print(f"{Color.C}{'='*40}{Color.R}") - print(f"{Color.C}{'[ 房间查找 ]':^40}{Color.R}") - print(f"{Color.C}{'='*40}{Color.R}") - print(f"{Color.C}[系统提示] 正在局域网内查找房间...(按Ctrl+C停止){Color.R}") - print(f"{Color.C}{'-'*40}{Color.R}") - self.discovered_rooms = {} - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('', self.ROOM_BROADCAST_PORT)) - sock.settimeout(1.0) - - def discovery(): - start_time = time.time() - while time.time() - start_time < 5 and self.running: - try: - data, addr = sock.recvfrom(1024) - info = json.loads(data.decode('utf-8')) - if time.time() - info['timestamp'] < self.ROOM_TIMEOUT: - self.discovered_rooms[f"{info['ip']}:{info['port']}"] = info - except socket.timeout: - continue - except Exception: - break - - self.discovery_thread = threading.Thread(target=discovery, daemon=True) - self.discovery_thread.start() - self.discovery_thread.join() - sock.close() - - self.clear_screen() - print(f"{Color.C}{'='*40}{Color.R}") - print(f"{Color.C}{'[ 查找结果 ]':^40}{Color.R}") - print(f"{Color.C}{'='*40}{Color.R}") - if not self.discovered_rooms: - print(f"{Color.C}[系统提示] 未找到任何房间{Color.R}") - print(f"{Color.C}{'-'*40}{Color.R}") - input(f"{Color.C}[系统提示] 按回车键返回主菜单...{Color.R}") - return - print(f"{Color.C}{'序号':<5}{'房间地址':<20}{'房主':<10}{'在线人数'}{'是否锁定'}{Color.R}") - print(f"{Color.C}{'-'*40}{Color.R}") - rooms = list(self.discovered_rooms.values()) - for i, room in enumerate(rooms, 1): - addr = f"{room['ip']}:{room['port']}" - locked = "是" if room['locked'] else "否" - print(f"{Color.C}{i:<5}{addr:<20}{room['host']:<10}{room['online']:<8}{locked}{Color.R}") - print(f"{Color.C}{'-'*40}{Color.R}") - while True: - choice = input(f"{Color.C}[请选择房间序号加入(0返回主菜单)]:{Color.R}".strip()) - if choice == "0": - return - if choice.isdigit() and 1 <= int(choice) <= len(rooms): - room = rooms[int(choice)-1] - self.room_ip = room['ip'] - self.room_port = room['port'] - self.room_locked = room['locked'] - self.join_room() - break - print(f"{Color.C}[系统提示] 无效选择,请输入正确的序号{Color.R}") - - def create_room(self): - self.clear_screen() - print(f"{Color.C}{'='*40}{Color.R}") - print(f"{Color.C}{'[ 创建房间 ]':^40}{Color.R}") - print(f"{Color.C}{'='*40}{Color.R}") - self.is_host = True - self.room_ip = self.get_local_ip() - - while True: - port_input = input(f"{Color.C}[请输入端口号(1024-65535,默认8888)]:{Color.R}".strip()) - self.room_port = 8888 if not port_input else int(port_input) - if 1024 <= self.room_port <= 65535: - break - print(f"{Color.C}[系统提示] 端口号无效!需在1024-65535之间{Color.R}") - - while True: - lock_choice = input(f"{Color.C}[是否开启房间锁(1=开启,0=关闭,默认0)]:{Color.R}".strip()) - lock_choice = lock_choice if lock_choice else "0" - if lock_choice in ["0", "1"]: - self.room_locked = (lock_choice == "1") - break - print(f"{Color.C}[系统提示] 无效选择!请输入0或1{Color.R}") - - if self.room_locked: - while True: - pwd = input(f"{Color.C}[请设置房间密码(4-8位字符)]:{Color.R}".strip()) - if 4 <= len(pwd) <= 8 and not ' ' in pwd: - self.room_password = pwd - break - print(f"{Color.C}[系统提示] 密码无效!需4-8位字符且不含空格{Color.R}") - - print(f"{Color.C}{'-'*40}{Color.R}") - print(f"{Color.C}[系统提示] 正在创建房间...{Color.R}") - server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - server.bind((self.room_ip, self.room_port)) - server.listen(5) - self.connected = True - self.broadcast_thread = threading.Thread(target=self.broadcast_room_info, daemon=True) - self.broadcast_thread.start() - print(f"{Color.C}[系统提示] 房间创建成功!{Color.R}") - print(f"{Color.C}[房间地址] {self.room_ip}:{self.room_port}{Color.R}") - print(f"{Color.C}[房间状态] {'已锁定(需密码加入)' if self.room_locked else '未锁定(公开加入)'}{Color.R}") - print(f"{Color.C}[操作提示] 输入/房主菜单 查看专属功能{Color.R}") - print(f"{Color.Y}[系统提示] {self.username} 创建并加入了房间{Color.R}") - print(f"{Color.C}{'-'*40}{Color.R}") - threading.Thread(target=self.accept_clients, args=(server,), daemon=True).start() - self.client_chat_loop() - except PermissionError: - print(f"{Color.C}[系统提示] 创建房间失败!该端口已被占用,请更换端口{Color.R}") - self.is_host = False - time.sleep(2) - except Exception as e: - print(f"{Color.C}[系统提示] 创建房间失败!原因:{str(e)}{Color.R}") - self.is_host = False - time.sleep(2) - - def join_room_manual(self): - self.clear_screen() - print(f"{Color.C}{'='*40}{Color.R}") - print(f"{Color.C}{'[ 手动加入 ]':^40}{Color.R}") - print(f"{Color.C}{'='*40}{Color.R}") - self.room_ip = input(f"{Color.C}[请输入房间IP]:{Color.R}".strip()) - while True: - port_input = input(f"{Color.C}[请输入房间端口(默认8888)]:{Color.R}".strip()) - self.room_port = 8888 if not port_input else int(port_input) - if 1024 <= self.room_port <= 65535: - break - print(f"{Color.C}[系统提示] 端口号无效!需在1024-65535之间{Color.R}") - - self.room_locked = False - test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - test_socket.settimeout(2) - try: - test_socket.connect((self.room_ip, self.room_port)) - test_socket.send(b"CHECK_LOCK") - lock_response = test_socket.recv(1024).decode('utf-8') - if lock_response == "LOCKED": - self.room_locked = True - test_socket.close() - except Exception: - test_socket.close() - - self.room_password = "" - if self.room_locked: - self.room_password = input(f"{Color.C}[该房间已锁定,请输入密码]:{Color.R}".strip()) - - print(f"{Color.C}{'-'*40}{Color.R}") - self.join_room() - - def join_room(self): - self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - try: - self.client_socket.connect((self.room_ip, self.room_port)) - if self.room_locked: - self.client_socket.send(f"JOIN_WITH_PWD:{self.username}:{self.room_password}".encode('utf-8')) - else: - self.client_socket.send(f"JOIN:{self.username}".encode('utf-8')) - - response = self.client_socket.recv(1024).decode('utf-8') - if response.startswith("SUCCESS"): - self.connected = True - print(f"{Color.Y}[系统提示] 成功加入房间!{Color.R}") - print(f"{Color.C}{'-'*40}{Color.R}") - threading.Thread(target=self.receive_messages, daemon=True).start() - self.client_chat_loop() - elif response.startswith("FAIL:用户名已存在"): - print(f"{Color.C}[系统提示] 加入失败!该用户名已被占用,请重新注册{Color.R}") - self.client_socket.close() - self.register() - time.sleep(2) - elif response == "FAIL:密码错误": - print(f"{Color.C}[系统提示] 加入失败!房间密码错误{Color.R}") - self.client_socket.close() - time.sleep(2) - else: - print(f"{Color.C}[系统提示] 加入失败!原因:{response.split(':', 1)[1] if ':' in response else response}{Color.R}") - self.client_socket.close() - time.sleep(2) - except ConnectionRefusedError: - print(f"{Color.C}[系统提示] 加入失败!目标房间未开启或地址错误{Color.R}") - self.client_socket.close() - time.sleep(2) - except socket.timeout: - print(f"{Color.C}[系统提示] 加入失败!连接超时{Color.R}") - self.client_socket.close() - time.sleep(2) - except Exception as e: - print(f"{Color.C}[系统提示] 加入失败!原因:{str(e)}{Color.R}") - self.client_socket.close() - time.sleep(2) - - def accept_clients(self, server): - while self.connected and self.is_host: - try: - client_socket, addr = server.accept() - data = client_socket.recv(1024).decode('utf-8') - if data == "CHECK_LOCK": - client_socket.send(b"LOCKED" if self.room_locked else b"UNLOCKED") - client_socket.close() - continue - - username = "" - password = "" - if data.startswith("JOIN_WITH_PWD:"): - parts = data.split(':', 2) - if len(parts) != 3: - client_socket.send("FAIL:无效请求".encode('utf-8')) - client_socket.close() - continue - username = parts[1] - password = parts[2] - if password != self.room_password: - client_socket.send("FAIL:密码错误".encode('utf-8')) - client_socket.close() - continue - elif data.startswith("JOIN:"): - username = data.split(':', 1)[1] - else: - client_socket.send("FAIL:无效请求".encode('utf-8')) - client_socket.close() - continue - - if username in self.clients or username == self.username: - client_socket.send("FAIL:用户名已存在".encode('utf-8')) - client_socket.close() - continue - - client_socket.send("SUCCESS:加入成功".encode('utf-8')) - self.clients[username] = client_socket - print(f"{Color.Y}[系统提示] {username} 加入了房间{Color.R}") - self.broadcast(f"{Color.Y}[系统提示] {username} 加入了房间{Color.R}", exclude=None) - threading.Thread(target=self.handle_client, args=(client_socket, username), daemon=True).start() - except Exception: - break - - def handle_client(self, client_socket, username): - while self.connected and username in self.clients: - try: - data = client_socket.recv(1024).decode('utf-8') - if not data: - self.remove_client(username) - break - if data == "/退出房间": - self.remove_client(username) - break - if self.global_mute or username in self.muted_users: - client_socket.send(f"{Color.C}[系统提示] 你已被禁言,无法发送消息{Color.R}".encode('utf-8')) - continue - if data.startswith("/"): - self.handle_command(data, username, client_socket) - else: - self.process_message(username, data) - except Exception: - self.remove_client(username) - break - - def process_message(self, username, content): - message = f"{Color.C}[{username}]{Color.R}: {self.user_color}{content}{Color.R}" - print(message) - - if "@全体" in content: - at_msg = f"{Color.R_}[提醒] {Color.C}[{username}]{Color.R} @全体成员:{self.user_color}{content}{Color.R}" - self.broadcast(at_msg, exclude=username) - else: - at_users = [user for user in self.clients if f"@{user}" in content] - if self.username in content: - at_users.append(self.username) - self.broadcast(message, exclude=username) - for user in at_users: - at_msg = f"{Color.R_}[提醒] {Color.C}[{user}]{Color.R} 你被 @ 了:{self.user_color}{content}{Color.R}" - if user == self.username: - print(at_msg) - else: - try: - self.clients[user].send(at_msg.encode('utf-8')) - except Exception: - pass - - def broadcast(self, message, exclude): - if self.is_host: - for username, sock in self.clients.items(): - if username != exclude: - try: - sock.send(message.encode('utf-8')) - except Exception: - self.remove_client(username) - else: - try: - self.client_socket.send(message.encode('utf-8')) - except Exception: - self.connected = False - - def remove_client(self, username): - if username in self.clients: - try: - self.clients[username].send(f"{Color.C}[系统提示] 你已被移出房间{Color.R}".encode('utf-8')) - except Exception: - pass - self.clients[username].close() - del self.clients[username] - if username in self.muted_users: - self.muted_users.remove(username) - print(f"{Color.Y}[系统提示] {username} 退出了房间{Color.R}") - self.broadcast(f"{Color.Y}[系统提示] {username} 退出了房间{Color.R}", exclude=None) - - def receive_messages(self): - while self.connected: - try: - data = self.client_socket.recv(1024).decode('utf-8') - if not data: - print(f"{Color.C}[系统提示] 与房间断开连接{Color.R}") - self.connected = False - break - print(data) - except Exception: - print(f"{Color.C}[系统提示] 接收消息失败,已断开连接{Color.R}") - self.connected = False - break - - def client_chat_loop(self): - while self.connected: - message = input().strip() - if message == "/清屏": - self.clear_screen() - elif message == "/退出脚本": - self.exit_script() - elif message == "/退出房间": - self.exit_room() - elif message == "/更换聊天颜色": - self.change_color() - elif message == "/房主菜单" and self.is_host: - self.show_host_menu() - elif message.startswith("/"): - if self.is_host: - self.handle_command(message, self.username, None) - else: - print(f"{Color.C}[系统提示] 无效指令{Color.R}") - else: - if self.is_host: - self.process_message(self.username, message) - else: - if message: - self.client_socket.send(message.encode('utf-8')) - self.process_message(self.username, message) - - def handle_command(self, cmd, username, client_socket): - if not self.is_host: - if client_socket: - client_socket.send(f"{Color.C}[系统提示] 无权限使用该指令{Color.R}".encode('utf-8')) - else: - print(f"{Color.C}[系统提示] 无权限使用该指令{Color.R}") - return - - if cmd == "/房主菜单": - self.show_host_menu() - elif cmd.startswith("/踢人 "): - parts = cmd.split(' ', 1) - if len(parts) < 2 or not parts[1].strip(): - print(f"{Color.C}[系统提示] 请输入要踢的用户名(格式:/踢人 用户名){Color.R}") - return - target = parts[1].strip() - if target == self.username: - print(f"{Color.C}[系统提示] 无法踢自己{Color.R}") - elif target in self.clients: - self.remove_client(target) - print(f"{Color.C}[系统提示] 已将{target} 踢出房间{Color.R}") - else: - print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}") - elif cmd.startswith("/禁言 "): - parts = cmd.split(' ', 1) - if len(parts) < 2 or not parts[1].strip(): - print(f"{Color.C}[系统提示] 请输入要禁言的对象(格式:/禁言 用户名/全体){Color.R}") - return - target = parts[1].strip() - if target == "全体": - self.global_mute = True - print(f"{Color.C}[系统提示] 已开启全体禁言{Color.R}") - self.broadcast(f"{Color.C}[系统提示] 房主已开启全体禁言{Color.R}", exclude=None) - elif target == self.username: - print(f"{Color.C}[系统提示] 无法禁言自己{Color.R}") - elif target in self.clients: - self.muted_users.add(target) - print(f"{Color.C}[系统提示] 已禁言{target}{Color.R}") - try: - self.clients[target].send(f"{Color.C}[系统提示] 你已被房主禁言{Color.R}".encode('utf-8')) - except Exception: - pass - else: - print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}") - elif cmd.startswith("/解禁 "): - parts = cmd.split(' ', 1) - if len(parts) < 2 or not parts[1].strip(): - print(f"{Color.C}[系统提示] 请输入要解禁的对象(格式:/解禁 用户名/全体){Color.R}") - return - target = parts[1].strip() - if target == "全体": - self.global_mute = False - self.muted_users.clear() - print(f"{Color.C}[系统提示] 已解除全体禁言{Color.R}") - self.broadcast(f"{Color.C}[系统提示] 房主已解除全体禁言{Color.R}", exclude=None) - elif target in self.muted_users: - self.muted_users.remove(target) - print(f"{Color.C}[系统提示] 已解禁{target}{Color.R}") - try: - self.clients[target].send(f"{Color.C}[系统提示] 你已被房主解禁{Color.R}".encode('utf-8')) - except Exception: - pass - else: - print(f"{Color.C}[系统提示] 该用户未被禁言或不在房间内{Color.R}") - else: - if client_socket: - client_socket.send(f"{Color.C}[系统提示] 无效指令{Color.R}".encode('utf-8')) - else: - print(f"{Color.C}[系统提示] 无效指令{Color.R}") - - def show_host_menu(self): - self.clear_screen() - print(f"{Color.C}{'='*50}{Color.R}") - print(f"{Color.C}{'[ 房主专属菜单 ]':^50}{Color.R}") - print(f"{Color.C}{'='*50}{Color.R}") - print(f"{Color.C}{'1. 踢人功能':<20}{'格式:/踢人 用户名':<30}{Color.R}") - print(f"{Color.C}{'2. 禁言功能':<20}{'格式:/禁言 用户名/全体':<30}{Color.R}") - print(f"{Color.C}{'3. 解禁功能':<20}{'格式:/解禁 用户名/全体':<30}{Color.R}") - print(f"{Color.C}{'4. 房间锁管理':<20}{'开启/关闭房间锁':<30}{Color.R}") - print(f"{Color.C}{'5. 查看在线用户':<20}{'显示当前房间所有成员':<30}{Color.R}") - print(f"{Color.C}{'6. 解散房间':<20}{'强制关闭房间,全员退出':<30}{Color.R}") - print(f"{Color.C}{'7. 关闭菜单':<20}{'返回聊天界面':<30}{Color.R}") - print(f"{Color.C}{'='*50}{Color.R}") - choice = input(f"{Color.C}[请选择功能(1-7)]:{Color.R}".strip()) - if choice == "1": - target = input(f"{Color.C}[请输入要踢的用户名]:{Color.R}".strip()) - if not target: - print(f"{Color.C}[系统提示] 用户名不能为空{Color.R}") - elif target == self.username: - print(f"{Color.C}[系统提示] 无法踢自己{Color.R}") - elif target in self.clients: - self.remove_client(target) - print(f"{Color.C}[系统提示] 已将{target} 踢出房间{Color.R}") - else: - print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}") - time.sleep(1) - elif choice == "2": - target = input(f"{Color.C}[请输入要禁言的用户名(输入'全体'开启全体禁言)]:{Color.R}".strip()) - if not target: - print(f"{Color.C}[系统提示] 请输入有效对象{Color.R}") - elif target == "全体": - self.global_mute = True - print(f"{Color.C}[系统提示] 已开启全体禁言{Color.R}") - self.broadcast(f"{Color.C}[系统提示] 房主已开启全体禁言{Color.R}", exclude=None) - elif target == self.username: - print(f"{Color.C}[系统提示] 无法禁言自己{Color.R}") - elif target in self.clients: - self.muted_users.add(target) - print(f"{Color.C}[系统提示] 已禁言{target}{Color.R}") - try: - self.clients[target].send(f"{Color.C}[系统提示] 你已被房主禁言{Color.R}".encode('utf-8')) - except Exception: - pass - else: - print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}") - time.sleep(1) - elif choice == "3": - target = input(f"{Color.C}[请输入要解禁的用户名(输入'全体'解除全体禁言)]:{Color.R}".strip()) - if not target: - print(f"{Color.C}[系统提示] 请输入有效对象{Color.R}") - elif target == "全体": - self.global_mute = False - self.muted_users.clear() - print(f"{Color.C}[系统提示] 已解除全体禁言{Color.R}") - self.broadcast(f"{Color.C}[系统提示] 房主已解除全体禁言{Color.R}", exclude=None) - elif target in self.muted_users: - self.muted_users.remove(target) - print(f"{Color.C}[系统提示] 已解禁{target}{Color.R}") - try: - self.clients[target].send(f"{Color.C}[系统提示] 你已被房主解禁{Color.R}".encode('utf-8')) - except Exception: - pass - else: - print(f"{Color.C}[系统提示] 该用户未被禁言或不在房间内{Color.R}") - time.sleep(1) - elif choice == "4": - print(f"{Color.C}{'-'*50}{Color.R}") - print(f"{Color.C}[当前房间锁状态] {'已开启' if self.room_locked else '已关闭'}{Color.R}") - while True: - lock_choice = input(f"{Color.C}[请选择(1=开启,0=关闭)]:{Color.R}".strip()) - if lock_choice in ["0", "1"]: - new_locked = (lock_choice == "1") - break - print(f"{Color.C}[系统提示] 无效选择!请输入0或1{Color.R}") - if new_locked == self.room_locked: - print(f"{Color.C}[系统提示] 房间锁状态未改变{Color.R}") - else: - if new_locked: - while True: - pwd = input(f"{Color.C}[请设置房间密码(4-8位字符)]:{Color.R}".strip()) - if 4 <= len(pwd) <= 8 and not ' ' in pwd: - self.room_password = pwd - break - print(f"{Color.C}[系统提示] 密码无效!需4-8位字符且不含空格{Color.R}") - self.room_locked = new_locked - print(f"{Color.C}[系统提示] 房间锁状态已更新为{'开启' if new_locked else '关闭'}{Color.R}") - self.broadcast(f"{Color.C}[系统提示] 房主已{'开启' if new_locked else '关闭'}房间锁{Color.R}", exclude=None) - print(f"{Color.C}{'-'*50}{Color.R}") - input(f"{Color.C}[系统提示] 按回车键返回...{Color.R}") - elif choice == "5": - print(f"{Color.C}{'-'*50}{Color.R}") - print(f"{Color.C}[在线用户统计] 共{len(self.clients)+1}人{Color.R}") - print(f"{Color.C}[房主] {self.username}{Color.R}") - if self.clients: - muted_list = [user for user in self.clients if user in self.muted_users] - normal_list = [user for user in self.clients if user not in self.muted_users] - print(f"{Color.C}[成员] {' | '.join(normal_list)}{Color.R}") - if muted_list: - print(f"{Color.C}[禁言成员] {' | '.join(muted_list)}{Color.R}") - else: - print(f"{Color.C}[成员] 暂无其他成员{Color.R}") - print(f"{Color.C}[全体禁言状态] {'开启' if self.global_mute else '关闭'}{Color.R}") - print(f"{Color.C}{'-'*50}{Color.R}") - input(f"{Color.C}[系统提示] 按回车键返回...{Color.R}") - elif choice == "6": - print(f"{Color.C}{'-'*50}{Color.R}") - confirm = input(f"{Color.C}[确认解散房间?(y/n)]:{Color.R}".strip()) - if confirm.lower() == "y": - self.dismiss_room() - elif choice != "7": - print(f"{Color.C}[系统提示] 无效选择{Color.R}") - time.sleep(1) - - def change_color(self): - print(f"{Color.C}{'='*40}{Color.R}") - print(f"{Color.C}{'[ 更 换 聊 天 颜 色 ]':^40}{Color.R}") - print(f"{Color.C}{'='*40}{Color.R}") - colors = [Color.Y, '\033[91m', '\033[92m', '\033[95m', '\033[94m'] - color_names = ['黄色', '红色', '绿色', '紫色', '蓝色'] - for i, name in enumerate(color_names, 1): - print(f"{Color.C}{i}. {colors[i-1]}{name}{Color.R}") - print(f"{Color.C}{'-'*40}{Color.R}") - choice = input(f"{Color.C}[请选择颜色(1-5)]:{Color.R}".strip()) - if choice.isdigit() and 1 <= int(choice) <= 5: - self.user_color = colors[int(choice)-1] - print(f"{Color.C}[系统提示] 聊天颜色已更换为{color_names[int(choice)-1]}{Color.R}") - else: - print(f"{Color.C}[系统提示] 无效选择,颜色未更改{Color.R}") - time.sleep(1) - - def dismiss_room(self): - print(f"{Color.C}[系统提示] 正在解散房间...{Color.R}") - self.connected = False - for sock in self.clients.values(): - try: - sock.send(f"{Color.C}[系统提示] 房主已解散房间{Color.R}".encode('utf-8')) - sock.close() - except Exception: - pass - self.clients.clear() - self.muted_users.clear() - self.global_mute = False - print(f"{Color.Y}[系统提示] 房间已解散{Color.R}") - time.sleep(2) - self.is_host = False - self.room_ip = "" - self.room_port = 0 - self.room_locked = False - self.room_password = "" - self.show_main_menu() - - def exit_room(self): - print(f"{Color.C}[系统提示] 正在退出房间...{Color.R}") - self.connected = False - if self.is_host: - for sock in self.clients.values(): - try: - sock.send(f"{Color.C}[系统提示] 房主已退出房间{Color.R}".encode('utf-8')) - sock.close() - except Exception: - pass - self.clients.clear() - self.muted_users.clear() - self.global_mute = False - else: - try: - self.client_socket.send("/退出房间".encode('utf-8')) - self.client_socket.close() - except Exception: - pass - print(f"{Color.Y}[系统提示] 已退出房间{Color.R}") - self.is_host = False - self.room_ip = "" - self.room_port = 0 - self.room_locked = False - self.room_password = "" - time.sleep(1) - self.show_main_menu() - - def exit_script(self): - print(f"{Color.C}[系统提示] 正在退出脚本...{Color.R}") - self.running = False - self.connected = False - if self.client_socket: - try: - self.client_socket.send("/退出脚本".encode('utf-8')) - self.client_socket.close() - except Exception: - pass - if self.is_host: - for sock in self.clients.values(): - try: - sock.send(f"{Color.C}[系统提示] 房主已退出,脚本关闭{Color.R}".encode('utf-8')) - sock.close() - except Exception: - pass - self.clients.clear() - self.muted_users.clear() - self.global_mute = False - sys.exit(0) - -if __name__ == "__main__": - chat = ChatRoom() - chat.register() - chat.show_main_menu()