上传文件至「/」

正式版1.0
This commit is contained in:
2025-12-18 06:08:48 +01:00
parent 35cc0d1dfd
commit 5801ad39bc
+756
View File
@@ -0,0 +1,756 @@
import socket
import threading
import time
import os
import sys
import json
# 终端颜色定义
class Color:
R = '\033[0m' # 重置
C = '\033[96m' # 青色
Y = '\033[93m' # 黄色
R_ = '\033[91m' # 红色
G = '\033[92m' # 绿色
# 聊天工具核心类
class ChatRoom:
# 广播配置
ROOM_BROADCAST_PORT = 5000
ROOM_BROADCAST_INTERVAL = 3
ROOM_TIMEOUT = 10
def __init__(self):
# 基础信息
self.username = ""
self.client_socket = None
self.is_host = False
self.room_ip = ""
self.room_port = 0
self.user_color = Color.Y
self.running = True
self.connected = False
# 房间管理
self.clients = {}
self.discovered_rooms = {}
self.broadcast_thread = None
self.discovery_thread = None
self.room_locked = False
self.room_password = ""
# 权限控制
self.muted_users = set()
self.global_mute = False
# 用户注册
def register(self):
os.system('cls' if os.name == 'nt' else 'clear')
while True:
name = input(f"{Color.C}[系统提示] 请输入用户名:{Color.R}".strip())
if name and not ' ' in name and len(name) <= 10:
self.username = name
break
print(f"{Color.C}[系统提示] 用户名无效 需1-10个字符且不含空格{Color.R}")
# 清屏
def clear_screen(self):
os.system('cls' if os.name == 'nt' else 'clear')
# 主菜单
def show_main_menu(self):
while self.running and not self.connected:
self.clear_screen()
print(f"{Color.C}{'='*40}{Color.R}")
print(f"{Color.C}{'[Python简易聊天工具]':^40}{Color.R}")
print(f"{Color.C}{'='*40}{Color.R}")
print(f"{Color.C}{'1. 创建房间':<38}{'[ 创建专属房间 ]'}{Color.R}")
print(f"{Color.C}{'2. 加入房间':<38}{'[ 输入地址加入 ]'}{Color.R}")
print(f"{Color.C}{'3. 查找房间':<38}{'[ 自动搜索局域网 ]'}{Color.R}")
print(f"{Color.C}{'4. 退出脚本':<38}{'[ 关闭聊天工具 ]'}{Color.R}")
print(f"{Color.C}{'='*40}{Color.R}")
choice = input(f"{Color.C}[请选择功能(1-4)]{Color.R}".strip())
if choice == "1":
self.create_room()
elif choice == "2":
self.join_room_manual()
elif choice == "3":
self.discover_rooms()
elif choice == "4":
self.exit_script()
else:
print(f"{Color.C}[系统提示] 无效选择 请输入1-4之间的数字{Color.R}")
time.sleep(1)
# 获取本地IP
def get_local_ip(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
except Exception:
ip = '127.0.0.1'
finally:
s.close()
return ip
# 房间信息广播
def broadcast_room_info(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
while self.connected and self.is_host:
try:
room_info = json.dumps({
'ip': self.room_ip,
'port': self.room_port,
'host': self.username,
'online': len(self.clients) + 1,
'locked': self.room_locked,
'timestamp': time.time()
})
sock.sendto(room_info.encode('utf-8'), ('<broadcast>', self.ROOM_BROADCAST_PORT))
time.sleep(self.ROOM_BROADCAST_INTERVAL)
except Exception:
break
sock.close()
# 查找房间
def discover_rooms(self):
self.clear_screen()
print(f"{Color.C}{'='*40}{Color.R}")
print(f"{Color.C}{'[ 房间查找 ]':^40}{Color.R}")
print(f"{Color.C}{'='*40}{Color.R}")
print(f"{Color.C}[系统提示] 正在查找局域网房间 按Ctrl+C停止{Color.R}")
print(f"{Color.C}{'-'*40}{Color.R}")
self.discovered_rooms = {}
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', self.ROOM_BROADCAST_PORT))
sock.settimeout(1.0)
def discovery():
start_time = time.time()
while time.time() - start_time < 5 and self.running:
try:
data, addr = sock.recvfrom(1024)
info = json.loads(data.decode('utf-8'))
if time.time() - info['timestamp'] < self.ROOM_TIMEOUT:
self.discovered_rooms[f"{info['ip']}:{info['port']}"] = info
except socket.timeout:
continue
except Exception:
break
self.discovery_thread = threading.Thread(target=discovery, daemon=True)
self.discovery_thread.start()
self.discovery_thread.join()
sock.close()
self.clear_screen()
print(f"{Color.C}{'='*40}{Color.R}")
print(f"{Color.C}{'[ 查找结果 ]':^40}{Color.R}")
print(f"{Color.C}{'='*40}{Color.R}")
if not self.discovered_rooms:
print(f"{Color.C}[系统提示] 未找到任何房间{Color.R}")
print(f"{Color.C}{'-'*40}{Color.R}")
input(f"{Color.C}[系统提示] 按回车键返回主菜单{Color.R}")
return
print(f"{Color.C}{'序号':<5}{'房间地址':<20}{'房主':<10}{'在线人数'}{'是否锁定'}{Color.R}")
print(f"{Color.C}{'-'*40}{Color.R}")
rooms = list(self.discovered_rooms.values())
for i, room in enumerate(rooms, 1):
addr = f"{room['ip']}:{room['port']}"
locked = "" if room['locked'] else ""
print(f"{Color.C}{i:<5}{addr:<20}{room['host']:<10}{room['online']:<8}{locked}{Color.R}")
print(f"{Color.C}{'-'*40}{Color.R}")
while True:
choice = input(f"{Color.C}[请选择房间序号加入 0返回主菜单]{Color.R}".strip())
if choice == "0":
return
if choice.isdigit() and 1 <= int(choice) <= len(rooms):
room = rooms[int(choice)-1]
self.room_ip = room['ip']
self.room_port = room['port']
self.room_locked = room['locked']
self.join_room()
break
print(f"{Color.C}[系统提示] 无效选择 请输入正确序号{Color.R}")
# 创建房间
def create_room(self):
self.clear_screen()
print(f"{Color.C}{'='*40}{Color.R}")
print(f"{Color.C}{'[ 创建房间 ]':^40}{Color.R}")
print(f"{Color.C}{'='*40}{Color.R}")
self.is_host = True
self.room_ip = self.get_local_ip()
while True:
port_input = input(f"{Color.C}[请输入端口号 1024-65535 默认8888]{Color.R}".strip())
self.room_port = 8888 if not port_input else int(port_input)
if 1024 <= self.room_port <= 65535:
break
print(f"{Color.C}[系统提示] 端口号无效 需在1024-65535之间{Color.R}")
while True:
lock_choice = input(f"{Color.C}[是否开启房间锁 1=开启 0=关闭 默认0]:{Color.R}".strip())
lock_choice = lock_choice if lock_choice else "0"
if lock_choice in ["0", "1"]:
self.room_locked = (lock_choice == "1")
break
print(f"{Color.C}[系统提示] 无效选择 请输入0或1{Color.R}")
if self.room_locked:
while True:
pwd = input(f"{Color.C}[请设置房间密码 4-8位字符]{Color.R}".strip())
if 4 <= len(pwd) <= 8 and not ' ' in pwd:
self.room_password = pwd
break
print(f"{Color.C}[系统提示] 密码无效 需4-8位字符且不含空格{Color.R}")
print(f"{Color.C}{'-'*40}{Color.R}")
print(f"{Color.C}[系统提示] 正在创建房间{Color.R}")
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
server.bind((self.room_ip, self.room_port))
server.listen(5)
self.connected = True
self.broadcast_thread = threading.Thread(target=self.broadcast_room_info, daemon=True)
self.broadcast_thread.start()
print(f"{Color.C}[系统提示] 房间创建成功{Color.R}")
print(f"{Color.C}[房间地址] {self.room_ip}:{self.room_port}{Color.R}")
print(f"{Color.C}[房间状态] {'已锁定 需密码加入' if self.room_locked else '未锁定 公开加入'}{Color.R}")
print(f"{Color.C}[操作提示] 输入/房主菜单 查看专属功能{Color.R}")
print(f"{Color.Y}[系统提示] {self.username} 创建并加入房间{Color.R}")
print(f"{Color.C}{'-'*40}{Color.R}")
threading.Thread(target=self.accept_clients, args=(server,), daemon=True).start()
self.client_chat_loop()
except PermissionError:
print(f"{Color.C}[系统提示] 创建房间失败 该端口已被占用 请更换端口{Color.R}")
self.is_host = False
time.sleep(2)
except Exception as e:
print(f"{Color.C}[系统提示] 创建房间失败 原因:{str(e)}{Color.R}")
self.is_host = False
time.sleep(2)
# 手动加入房间
def join_room_manual(self):
self.clear_screen()
print(f"{Color.C}{'='*40}{Color.R}")
print(f"{Color.C}{'[ 手动加入 ]':^40}{Color.R}")
print(f"{Color.C}{'='*40}{Color.R}")
self.room_ip = input(f"{Color.C}[请输入房间IP]{Color.R}".strip())
while True:
port_input = input(f"{Color.C}[请输入房间端口 默认8888]{Color.R}".strip())
self.room_port = 8888 if not port_input else int(port_input)
if 1024 <= self.room_port <= 65535:
break
print(f"{Color.C}[系统提示] 端口号无效 需在1024-65535之间{Color.R}")
self.room_locked = False
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
test_socket.settimeout(2)
try:
test_socket.connect((self.room_ip, self.room_port))
test_socket.send(b"CHECK_LOCK")
lock_response = test_socket.recv(1024).decode('utf-8')
if lock_response == "LOCKED":
self.room_locked = True
test_socket.close()
except Exception:
test_socket.close()
self.room_password = ""
if self.room_locked:
self.room_password = input(f"{Color.C}[该房间已锁定 请输入密码]{Color.R}".strip())
print(f"{Color.C}{'-'*40}{Color.R}")
self.join_room()
# 加入房间处理
def join_room(self):
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.client_socket.connect((self.room_ip, self.room_port))
if self.room_locked:
self.client_socket.send(f"JOIN_WITH_PWD:{self.username}:{self.room_password}".encode('utf-8'))
else:
self.client_socket.send(f"JOIN:{self.username}".encode('utf-8'))
response = self.client_socket.recv(1024).decode('utf-8')
if response.startswith("SUCCESS"):
self.connected = True
print(f"{Color.Y}[系统提示] 成功加入房间{Color.R}")
print(f"{Color.C}{'-'*40}{Color.R}")
threading.Thread(target=self.receive_messages, daemon=True).start()
self.client_chat_loop()
elif response.startswith("FAIL:用户名已存在"):
print(f"{Color.C}[系统提示] 加入失败 该用户名已被占用 请重新注册{Color.R}")
self.client_socket.close()
self.register()
time.sleep(2)
elif response == "FAIL:密码错误":
print(f"{Color.C}[系统提示] 加入失败 房间密码错误{Color.R}")
self.client_socket.close()
time.sleep(2)
else:
print(f"{Color.C}[系统提示] 加入失败 原因:{response.split(':', 1)[1] if ':' in response else response}{Color.R}")
self.client_socket.close()
time.sleep(2)
except ConnectionRefusedError:
print(f"{Color.C}[系统提示] 加入失败 目标房间未开启或地址错误{Color.R}")
self.client_socket.close()
time.sleep(2)
except socket.timeout:
print(f"{Color.C}[系统提示] 加入失败 连接超时{Color.R}")
self.client_socket.close()
time.sleep(2)
except Exception as e:
print(f"{Color.C}[系统提示] 加入失败 原因:{str(e)}{Color.R}")
self.client_socket.close()
time.sleep(2)
# 接收客户端连接
def accept_clients(self, server):
while self.connected and self.is_host:
try:
client_socket, addr = server.accept()
data = client_socket.recv(1024).decode('utf-8')
if data == "CHECK_LOCK":
client_socket.send(b"LOCKED" if self.room_locked else b"UNLOCKED")
client_socket.close()
continue
username = ""
password = ""
if data.startswith("JOIN_WITH_PWD:"):
parts = data.split(':', 2)
if len(parts) != 3:
client_socket.send("FAIL:无效请求".encode('utf-8'))
client_socket.close()
continue
username = parts[1]
password = parts[2]
if password != self.room_password:
client_socket.send("FAIL:密码错误".encode('utf-8'))
client_socket.close()
continue
elif data.startswith("JOIN:"):
username = data.split(':', 1)[1]
else:
client_socket.send("FAIL:无效请求".encode('utf-8'))
client_socket.close()
continue
if username in self.clients or username == self.username:
client_socket.send("FAIL:用户名已存在".encode('utf-8'))
client_socket.close()
continue
client_socket.send("SUCCESS:加入成功".encode('utf-8'))
self.clients[username] = client_socket
print(f"{Color.Y}[系统提示] {username} 加入房间{Color.R}")
self.broadcast(f"{Color.Y}[系统提示] {username} 加入房间{Color.R}", exclude=None)
threading.Thread(target=self.handle_client, args=(client_socket, username), daemon=True).start()
except Exception:
break
# 客户端消息处理
def handle_client(self, client_socket, username):
while self.connected and username in self.clients:
try:
data = client_socket.recv(1024).decode('utf-8')
if not data:
self.remove_client(username)
break
if data == "/退出房间":
self.remove_client(username)
break
if self.global_mute or username in self.muted_users:
client_socket.send(f"{Color.C}[系统提示] 你已被禁言 无法发送消息{Color.R}".encode('utf-8'))
continue
if data.startswith("/"):
self.handle_command(data, username, client_socket)
else:
self.process_message(username, data)
except Exception:
self.remove_client(username)
break
# 消息处理
def process_message(self, username, content):
message = f"{Color.C}[{username}]{Color.R}: {self.user_color}{content}{Color.R}"
print(message)
if "@全体" in content:
at_msg = f"{Color.R_}[提醒] {Color.C}[{username}]{Color.R} @全体成员:{self.user_color}{content}{Color.R}"
self.broadcast(at_msg, exclude=username)
else:
at_users = [user for user in self.clients if f"@{user}" in content]
if self.username in content:
at_users.append(self.username)
self.broadcast(message, exclude=username)
for user in at_users:
at_msg = f"{Color.R_}[提醒] {Color.C}[{user}]{Color.R} 你被@了:{self.user_color}{content}{Color.R}"
if user == self.username:
print(at_msg)
else:
try:
self.clients[user].send(at_msg.encode('utf-8'))
except Exception:
pass
# 消息广播
def broadcast(self, message, exclude):
if self.is_host:
for username, sock in self.clients.items():
if username != exclude:
try:
sock.send(message.encode('utf-8'))
except Exception:
self.remove_client(username)
else:
try:
self.client_socket.send(message.encode('utf-8'))
except Exception:
self.connected = False
# 移除客户端
def remove_client(self, username):
if username in self.clients:
try:
self.clients[username].send(f"{Color.C}[系统提示] 你已被移出房间{Color.R}".encode('utf-8'))
except Exception:
pass
self.clients[username].close()
del self.clients[username]
if username in self.muted_users:
self.muted_users.remove(username)
print(f"{Color.Y}[系统提示] {username} 退出房间{Color.R}")
self.broadcast(f"{Color.Y}[系统提示] {username} 退出房间{Color.R}", exclude=None)
# 接收消息
def receive_messages(self):
while self.connected:
try:
data = self.client_socket.recv(1024).decode('utf-8')
if not data:
print(f"{Color.C}[系统提示] 与房间断开连接{Color.R}")
self.connected = False
break
print(data)
except Exception:
print(f"{Color.C}[系统提示] 接收消息失败 已断开连接{Color.R}")
self.connected = False
break
# 聊天循环
def client_chat_loop(self):
while self.connected:
message = input().strip()
if message == "/清屏":
self.clear_screen()
elif message == "/退出脚本":
self.exit_script()
elif message == "/退出房间":
self.exit_room()
elif message == "/更换聊天颜色":
self.change_color()
elif message == "/房主菜单" and self.is_host:
self.show_host_menu()
elif message.startswith("/"):
if self.is_host:
self.handle_command(message, self.username, None)
else:
print(f"{Color.C}[系统提示] 无效指令{Color.R}")
else:
if self.is_host:
self.process_message(self.username, message)
else:
if message:
self.client_socket.send(message.encode('utf-8'))
self.process_message(self.username, message)
# 指令处理
def handle_command(self, cmd, username, client_socket):
if not self.is_host:
if client_socket:
client_socket.send(f"{Color.C}[系统提示] 无权限使用该指令{Color.R}".encode('utf-8'))
else:
print(f"{Color.C}[系统提示] 无权限使用该指令{Color.R}")
return
if cmd == "/房主菜单":
self.show_host_menu()
elif cmd.startswith("/踢人 "):
parts = cmd.split(' ', 1)
if len(parts) < 2 or not parts[1].strip():
print(f"{Color.C}[系统提示] 请输入要踢的用户名 格式:/踢人 用户名{Color.R}")
return
target = parts[1].strip()
if target == self.username:
print(f"{Color.C}[系统提示] 无法踢自己{Color.R}")
elif target in self.clients:
self.remove_client(target)
print(f"{Color.C}[系统提示] 已将{target} 踢出房间{Color.R}")
else:
print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}")
elif cmd.startswith("/禁言 "):
parts = cmd.split(' ', 1)
if len(parts) < 2 or not parts[1].strip():
print(f"{Color.C}[系统提示] 请输入要禁言的对象 格式:/禁言 用户名/全体{Color.R}")
return
target = parts[1].strip()
if target == "全体":
self.global_mute = True
print(f"{Color.C}[系统提示] 已开启全体禁言{Color.R}")
self.broadcast(f"{Color.C}[系统提示] 房主已开启全体禁言{Color.R}", exclude=None)
elif target == self.username:
print(f"{Color.C}[系统提示] 无法禁言自己{Color.R}")
elif target in self.clients:
self.muted_users.add(target)
print(f"{Color.C}[系统提示] 已禁言{target}{Color.R}")
try:
self.clients[target].send(f"{Color.C}[系统提示] 你已被房主禁言{Color.R}".encode('utf-8'))
except Exception:
pass
else:
print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}")
elif cmd.startswith("/解禁 "):
parts = cmd.split(' ', 1)
if len(parts) < 2 or not parts[1].strip():
print(f"{Color.C}[系统提示] 请输入要解禁的对象 格式:/解禁 用户名/全体{Color.R}")
return
target = parts[1].strip()
if target == "全体":
self.global_mute = False
self.muted_users.clear()
print(f"{Color.C}[系统提示] 已解除全体禁言{Color.R}")
self.broadcast(f"{Color.C}[系统提示] 房主已解除全体禁言{Color.R}", exclude=None)
elif target in self.muted_users:
self.muted_users.remove(target)
print(f"{Color.C}[系统提示] 已解禁{target}{Color.R}")
try:
self.clients[target].send(f"{Color.C}[系统提示] 你已被房主解禁{Color.R}".encode('utf-8'))
except Exception:
pass
else:
print(f"{Color.C}[系统提示] 该用户未被禁言或不在房间内{Color.R}")
else:
if client_socket:
client_socket.send(f"{Color.C}[系统提示] 无效指令{Color.R}".encode('utf-8'))
else:
print(f"{Color.C}[系统提示] 无效指令{Color.R}")
# 房主菜单
def show_host_menu(self):
self.clear_screen()
print(f"{Color.C}{'='*50}{Color.R}")
print(f"{Color.C}{'[ 房主专属菜单 ]':^50}{Color.R}")
print(f"{Color.C}{'='*50}{Color.R}")
print(f"{Color.C}{'1. 踢人功能':<20}{'格式:/踢人 用户名':<30}{Color.R}")
print(f"{Color.C}{'2. 禁言功能':<20}{'格式:/禁言 用户名/全体':<30}{Color.R}")
print(f"{Color.C}{'3. 解禁功能':<20}{'格式:/解禁 用户名/全体':<30}{Color.R}")
print(f"{Color.C}{'4. 房间锁管理':<20}{'开启/关闭房间锁':<30}{Color.R}")
print(f"{Color.C}{'5. 查看在线用户':<20}{'显示当前房间所有成员':<30}{Color.R}")
print(f"{Color.C}{'6. 解散房间':<20}{'强制关闭房间 全员退出':<30}{Color.R}")
print(f"{Color.C}{'7. 关闭菜单':<20}{'返回聊天界面':<30}{Color.R}")
print(f"{Color.C}{'='*50}{Color.R}")
choice = input(f"{Color.C}[请选择功能(1-7)]{Color.R}".strip())
if choice == "1":
target = input(f"{Color.C}[请输入要踢的用户名]{Color.R}".strip())
if not target:
print(f"{Color.C}[系统提示] 用户名不能为空{Color.R}")
elif target == self.username:
print(f"{Color.C}[系统提示] 无法踢自己{Color.R}")
elif target in self.clients:
self.remove_client(target)
print(f"{Color.C}[系统提示] 已将{target} 踢出房间{Color.R}")
else:
print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}")
time.sleep(1)
elif choice == "2":
target = input(f"{Color.C}[请输入要禁言的用户名 输入全体开启全体禁言]:{Color.R}".strip())
if not target:
print(f"{Color.C}[系统提示] 请输入有效对象{Color.R}")
elif target == "全体":
self.global_mute = True
print(f"{Color.C}[系统提示] 已开启全体禁言{Color.R}")
self.broadcast(f"{Color.C}[系统提示] 房主已开启全体禁言{Color.R}", exclude=None)
elif target == self.username:
print(f"{Color.C}[系统提示] 无法禁言自己{Color.R}")
elif target in self.clients:
self.muted_users.add(target)
print(f"{Color.C}[系统提示] 已禁言{target}{Color.R}")
try:
self.clients[target].send(f"{Color.C}[系统提示] 你已被房主禁言{Color.R}".encode('utf-8'))
except Exception:
pass
else:
print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}")
time.sleep(1)
elif choice == "3":
target = input(f"{Color.C}[请输入要解禁的用户名 输入全体解除全体禁言]:{Color.R}".strip())
if not target:
print(f"{Color.C}[系统提示] 请输入有效对象{Color.R}")
elif target == "全体":
self.global_mute = False
self.muted_users.clear()
print(f"{Color.C}[系统提示] 已解除全体禁言{Color.R}")
self.broadcast(f"{Color.C}[系统提示] 房主已解除全体禁言{Color.R}", exclude=None)
elif target in self.muted_users:
self.muted_users.remove(target)
print(f"{Color.C}[系统提示] 已解禁{target}{Color.R}")
try:
self.clients[target].send(f"{Color.C}[系统提示] 你已被房主解禁{Color.R}".encode('utf-8'))
except Exception:
pass
else:
print(f"{Color.C}[系统提示] 该用户未被禁言或不在房间内{Color.R}")
time.sleep(1)
elif choice == "4":
print(f"{Color.C}{'-'*50}{Color.R}")
print(f"{Color.C}[当前房间锁状态] {'已开启' if self.room_locked else '已关闭'}{Color.R}")
while True:
lock_choice = input(f"{Color.C}[请选择 1=开启 0=关闭]{Color.R}".strip())
if lock_choice in ["0", "1"]:
new_locked = (lock_choice == "1")
break
print(f"{Color.C}[系统提示] 无效选择 请输入0或1{Color.R}")
if new_locked == self.room_locked:
print(f"{Color.C}[系统提示] 房间锁状态未改变{Color.R}")
else:
if new_locked:
while True:
pwd = input(f"{Color.C}[请设置房间密码 4-8位字符]{Color.R}".strip())
if 4 <= len(pwd) <= 8 and not ' ' in pwd:
self.room_password = pwd
break
print(f"{Color.C}[系统提示] 密码无效 需4-8位字符且不含空格{Color.R}")
self.room_locked = new_locked
print(f"{Color.C}[系统提示] 房间锁状态已更新为{'开启' if new_locked else '关闭'}{Color.R}")
self.broadcast(f"{Color.C}[系统提示] 房主已{'开启' if new_locked else '关闭'}房间锁{Color.R}", exclude=None)
print(f"{Color.C}{'-'*50}{Color.R}")
input(f"{Color.C}[系统提示] 按回车键返回{Color.R}")
elif choice == "5":
print(f"{Color.C}{'-'*50}{Color.R}")
print(f"{Color.C}[在线用户统计] 共{len(self.clients)+1}{Color.R}")
print(f"{Color.C}[房主] {self.username}{Color.R}")
if self.clients:
muted_list = [user for user in self.clients if user in self.muted_users]
normal_list = [user for user in self.clients if user not in self.muted_users]
print(f"{Color.C}[成员] {' | '.join(normal_list)}{Color.R}")
if muted_list:
print(f"{Color.C}[禁言成员] {' | '.join(muted_list)}{Color.R}")
else:
print(f"{Color.C}[成员] 暂无其他成员{Color.R}")
print(f"{Color.C}[全体禁言状态] {'开启' if self.global_mute else '关闭'}{Color.R}")
print(f"{Color.C}{'-'*50}{Color.R}")
input(f"{Color.C}[系统提示] 按回车键返回{Color.R}")
elif choice == "6":
print(f"{Color.C}{'-'*50}{Color.R}")
confirm = input(f"{Color.C}[确认解散房间 y/n]{Color.R}".strip())
if confirm.lower() == "y":
self.dismiss_room()
elif choice != "7":
print(f"{Color.C}[系统提示] 无效选择{Color.R}")
time.sleep(1)
# 更换聊天颜色
def change_color(self):
print(f"{Color.C}{'='*40}{Color.R}")
print(f"{Color.C}{'[ 更换聊天颜色 ]':^40}{Color.R}")
print(f"{Color.C}{'='*40}{Color.R}")
colors = [Color.Y, '\033[91m', '\033[92m', '\033[95m', '\033[94m']
color_names = ['黄色', '红色', '绿色', '紫色', '蓝色']
for i, name in enumerate(color_names, 1):
print(f"{Color.C}{i}. {colors[i-1]}{name}{Color.R}")
print(f"{Color.C}{'-'*40}{Color.R}")
choice = input(f"{Color.C}[请选择颜色(1-5)]{Color.R}".strip())
if choice.isdigit() and 1 <= int(choice) <= 5:
self.user_color = colors[int(choice)-1]
print(f"{Color.C}[系统提示] 聊天颜色已更换为{color_names[int(choice)-1]}{Color.R}")
else:
print(f"{Color.C}[系统提示] 无效选择 颜色未更改{Color.R}")
time.sleep(1)
# 解散房间
def dismiss_room(self):
print(f"{Color.C}[系统提示] 正在解散房间{Color.R}")
self.connected = False
for sock in self.clients.values():
try:
sock.send(f"{Color.C}[系统提示] 房主已解散房间{Color.R}".encode('utf-8'))
sock.close()
except Exception:
pass
self.clients.clear()
self.muted_users.clear()
self.global_mute = False
print(f"{Color.Y}[系统提示] 房间已解散{Color.R}")
time.sleep(2)
self.is_host = False
self.room_ip = ""
self.room_port = 0
self.room_locked = False
self.room_password = ""
self.show_main_menu()
# 退出房间
def exit_room(self):
print(f"{Color.C}[系统提示] 正在退出房间{Color.R}")
self.connected = False
if self.is_host:
for sock in self.clients.values():
try:
sock.send(f"{Color.C}[系统提示] 房主已退出房间{Color.R}".encode('utf-8'))
sock.close()
except Exception:
pass
self.clients.clear()
self.muted_users.clear()
self.global_mute = False
else:
try:
self.client_socket.send("/退出房间".encode('utf-8'))
self.client_socket.close()
except Exception:
pass
print(f"{Color.Y}[系统提示] 已退出房间{Color.R}")
self.is_host = False
self.room_ip = ""
self.room_port = 0
self.room_locked = False
self.room_password = ""
time.sleep(1)
self.show_main_menu()
# 退出脚本
def exit_script(self):
print(f"{Color.C}[系统提示] 正在退出脚本{Color.R}")
self.running = False
self.connected = False
if self.client_socket:
try:
self.client_socket.send("/退出脚本".encode('utf-8'))
self.client_socket.close()
except Exception:
pass
if self.is_host:
for sock in self.clients.values():
try:
sock.send(f"{Color.C}[系统提示] 房主已退出 脚本关闭{Color.R}".encode('utf-8'))
sock.close()
except Exception:
pass
self.clients.clear()
self.muted_users.clear()
self.global_mute = False
sys.exit(0)
if __name__ == "__main__":
chat = ChatRoom()
chat.register()
chat.show_main_menu()