forked from niannian1145/python-chat-box
删除 Python聊天工具.py
This commit is contained in:
-727
@@ -1,727 +0,0 @@
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
|
||||
class Color:
|
||||
R = '\033[0m'
|
||||
C = '\033[96m'
|
||||
Y = '\033[93m'
|
||||
R_ = '\033[91m'
|
||||
G = '\033[92m'
|
||||
|
||||
class ChatRoom:
|
||||
ROOM_BROADCAST_PORT = 5000
|
||||
ROOM_BROADCAST_INTERVAL = 3
|
||||
ROOM_TIMEOUT = 10
|
||||
|
||||
def __init__(self):
|
||||
self.username = ""
|
||||
self.client_socket = None
|
||||
self.is_host = False
|
||||
self.room_ip = ""
|
||||
self.room_port = 0
|
||||
self.user_color = Color.Y
|
||||
self.running = True
|
||||
self.connected = False
|
||||
self.clients = {}
|
||||
self.room_info = {}
|
||||
self.discovered_rooms = {}
|
||||
self.broadcast_thread = None
|
||||
self.discovery_thread = None
|
||||
self.room_locked = False
|
||||
self.room_password = ""
|
||||
self.muted_users = set()
|
||||
self.global_mute = False
|
||||
|
||||
def register(self):
|
||||
os.system('cls' if os.name == 'nt' else 'clear')
|
||||
while True:
|
||||
name = input(f"{Color.C}[系统提示] 请输入用户名:{Color.R}".strip())
|
||||
if name and not ' ' in name and len(name) <= 10:
|
||||
self.username = name
|
||||
break
|
||||
print(f"{Color.C}[系统提示] 用户名无效!需1-10个字符且不含空格{Color.R}")
|
||||
|
||||
def clear_screen(self):
|
||||
os.system('cls' if os.name == 'nt' else 'clear')
|
||||
|
||||
def show_main_menu(self):
|
||||
while self.running and not self.connected:
|
||||
self.clear_screen()
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
print(f"{Color.C}{'[Python简易聊天工具]':^40}{Color.R}")
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
print(f"{Color.C}{'1. 创建房间':<38}{'[ 创建属于你的房间 ]'}{Color.R}")
|
||||
print(f"{Color.C}{'2. 加入房间':<38}{'[ 加入你想要的房间 ]'}{Color.R}")
|
||||
print(f"{Color.C}{'3. 查找房间':<38}{'[ 自动搜索房间 ]'}{Color.R}")
|
||||
print(f"{Color.C}{'4. 退出脚本':<38}{'[ 关闭Python脚本 ]'}{Color.R}")
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
choice = input(f"{Color.C}[请选择功能(1-4)]:{Color.R}".strip())
|
||||
if choice == "1":
|
||||
self.create_room()
|
||||
elif choice == "2":
|
||||
self.join_room_manual()
|
||||
elif choice == "3":
|
||||
self.discover_rooms()
|
||||
elif choice == "4":
|
||||
self.exit_script()
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 无效选择,请输入1-4之间的数字{Color.R}")
|
||||
time.sleep(1)
|
||||
|
||||
def get_local_ip(self):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
try:
|
||||
s.connect(('8.8.8.8', 80))
|
||||
ip = s.getsockname()[0]
|
||||
except Exception:
|
||||
ip = '127.0.0.1'
|
||||
finally:
|
||||
s.close()
|
||||
return ip
|
||||
|
||||
def broadcast_room_info(self):
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
while self.connected and self.is_host:
|
||||
try:
|
||||
room_info = json.dumps({
|
||||
'ip': self.room_ip,
|
||||
'port': self.room_port,
|
||||
'host': self.username,
|
||||
'online': len(self.clients) + 1,
|
||||
'locked': self.room_locked,
|
||||
'timestamp': time.time()
|
||||
})
|
||||
sock.sendto(room_info.encode('utf-8'), ('<broadcast>', self.ROOM_BROADCAST_PORT))
|
||||
time.sleep(self.ROOM_BROADCAST_INTERVAL)
|
||||
except Exception:
|
||||
break
|
||||
sock.close()
|
||||
|
||||
def discover_rooms(self):
|
||||
self.clear_screen()
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
print(f"{Color.C}{'[ 房间查找 ]':^40}{Color.R}")
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
print(f"{Color.C}[系统提示] 正在局域网内查找房间...(按Ctrl+C停止){Color.R}")
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
self.discovered_rooms = {}
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind(('', self.ROOM_BROADCAST_PORT))
|
||||
sock.settimeout(1.0)
|
||||
|
||||
def discovery():
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 5 and self.running:
|
||||
try:
|
||||
data, addr = sock.recvfrom(1024)
|
||||
info = json.loads(data.decode('utf-8'))
|
||||
if time.time() - info['timestamp'] < self.ROOM_TIMEOUT:
|
||||
self.discovered_rooms[f"{info['ip']}:{info['port']}"] = info
|
||||
except socket.timeout:
|
||||
continue
|
||||
except Exception:
|
||||
break
|
||||
|
||||
self.discovery_thread = threading.Thread(target=discovery, daemon=True)
|
||||
self.discovery_thread.start()
|
||||
self.discovery_thread.join()
|
||||
sock.close()
|
||||
|
||||
self.clear_screen()
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
print(f"{Color.C}{'[ 查找结果 ]':^40}{Color.R}")
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
if not self.discovered_rooms:
|
||||
print(f"{Color.C}[系统提示] 未找到任何房间{Color.R}")
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
input(f"{Color.C}[系统提示] 按回车键返回主菜单...{Color.R}")
|
||||
return
|
||||
print(f"{Color.C}{'序号':<5}{'房间地址':<20}{'房主':<10}{'在线人数'}{'是否锁定'}{Color.R}")
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
rooms = list(self.discovered_rooms.values())
|
||||
for i, room in enumerate(rooms, 1):
|
||||
addr = f"{room['ip']}:{room['port']}"
|
||||
locked = "是" if room['locked'] else "否"
|
||||
print(f"{Color.C}{i:<5}{addr:<20}{room['host']:<10}{room['online']:<8}{locked}{Color.R}")
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
while True:
|
||||
choice = input(f"{Color.C}[请选择房间序号加入(0返回主菜单)]:{Color.R}".strip())
|
||||
if choice == "0":
|
||||
return
|
||||
if choice.isdigit() and 1 <= int(choice) <= len(rooms):
|
||||
room = rooms[int(choice)-1]
|
||||
self.room_ip = room['ip']
|
||||
self.room_port = room['port']
|
||||
self.room_locked = room['locked']
|
||||
self.join_room()
|
||||
break
|
||||
print(f"{Color.C}[系统提示] 无效选择,请输入正确的序号{Color.R}")
|
||||
|
||||
def create_room(self):
|
||||
self.clear_screen()
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
print(f"{Color.C}{'[ 创建房间 ]':^40}{Color.R}")
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
self.is_host = True
|
||||
self.room_ip = self.get_local_ip()
|
||||
|
||||
while True:
|
||||
port_input = input(f"{Color.C}[请输入端口号(1024-65535,默认8888)]:{Color.R}".strip())
|
||||
self.room_port = 8888 if not port_input else int(port_input)
|
||||
if 1024 <= self.room_port <= 65535:
|
||||
break
|
||||
print(f"{Color.C}[系统提示] 端口号无效!需在1024-65535之间{Color.R}")
|
||||
|
||||
while True:
|
||||
lock_choice = input(f"{Color.C}[是否开启房间锁(1=开启,0=关闭,默认0)]:{Color.R}".strip())
|
||||
lock_choice = lock_choice if lock_choice else "0"
|
||||
if lock_choice in ["0", "1"]:
|
||||
self.room_locked = (lock_choice == "1")
|
||||
break
|
||||
print(f"{Color.C}[系统提示] 无效选择!请输入0或1{Color.R}")
|
||||
|
||||
if self.room_locked:
|
||||
while True:
|
||||
pwd = input(f"{Color.C}[请设置房间密码(4-8位字符)]:{Color.R}".strip())
|
||||
if 4 <= len(pwd) <= 8 and not ' ' in pwd:
|
||||
self.room_password = pwd
|
||||
break
|
||||
print(f"{Color.C}[系统提示] 密码无效!需4-8位字符且不含空格{Color.R}")
|
||||
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
print(f"{Color.C}[系统提示] 正在创建房间...{Color.R}")
|
||||
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
try:
|
||||
server.bind((self.room_ip, self.room_port))
|
||||
server.listen(5)
|
||||
self.connected = True
|
||||
self.broadcast_thread = threading.Thread(target=self.broadcast_room_info, daemon=True)
|
||||
self.broadcast_thread.start()
|
||||
print(f"{Color.C}[系统提示] 房间创建成功!{Color.R}")
|
||||
print(f"{Color.C}[房间地址] {self.room_ip}:{self.room_port}{Color.R}")
|
||||
print(f"{Color.C}[房间状态] {'已锁定(需密码加入)' if self.room_locked else '未锁定(公开加入)'}{Color.R}")
|
||||
print(f"{Color.C}[操作提示] 输入/房主菜单 查看专属功能{Color.R}")
|
||||
print(f"{Color.Y}[系统提示] {self.username} 创建并加入了房间{Color.R}")
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
threading.Thread(target=self.accept_clients, args=(server,), daemon=True).start()
|
||||
self.client_chat_loop()
|
||||
except PermissionError:
|
||||
print(f"{Color.C}[系统提示] 创建房间失败!该端口已被占用,请更换端口{Color.R}")
|
||||
self.is_host = False
|
||||
time.sleep(2)
|
||||
except Exception as e:
|
||||
print(f"{Color.C}[系统提示] 创建房间失败!原因:{str(e)}{Color.R}")
|
||||
self.is_host = False
|
||||
time.sleep(2)
|
||||
|
||||
def join_room_manual(self):
|
||||
self.clear_screen()
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
print(f"{Color.C}{'[ 手动加入 ]':^40}{Color.R}")
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
self.room_ip = input(f"{Color.C}[请输入房间IP]:{Color.R}".strip())
|
||||
while True:
|
||||
port_input = input(f"{Color.C}[请输入房间端口(默认8888)]:{Color.R}".strip())
|
||||
self.room_port = 8888 if not port_input else int(port_input)
|
||||
if 1024 <= self.room_port <= 65535:
|
||||
break
|
||||
print(f"{Color.C}[系统提示] 端口号无效!需在1024-65535之间{Color.R}")
|
||||
|
||||
self.room_locked = False
|
||||
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
test_socket.settimeout(2)
|
||||
try:
|
||||
test_socket.connect((self.room_ip, self.room_port))
|
||||
test_socket.send(b"CHECK_LOCK")
|
||||
lock_response = test_socket.recv(1024).decode('utf-8')
|
||||
if lock_response == "LOCKED":
|
||||
self.room_locked = True
|
||||
test_socket.close()
|
||||
except Exception:
|
||||
test_socket.close()
|
||||
|
||||
self.room_password = ""
|
||||
if self.room_locked:
|
||||
self.room_password = input(f"{Color.C}[该房间已锁定,请输入密码]:{Color.R}".strip())
|
||||
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
self.join_room()
|
||||
|
||||
def join_room(self):
|
||||
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
self.client_socket.connect((self.room_ip, self.room_port))
|
||||
if self.room_locked:
|
||||
self.client_socket.send(f"JOIN_WITH_PWD:{self.username}:{self.room_password}".encode('utf-8'))
|
||||
else:
|
||||
self.client_socket.send(f"JOIN:{self.username}".encode('utf-8'))
|
||||
|
||||
response = self.client_socket.recv(1024).decode('utf-8')
|
||||
if response.startswith("SUCCESS"):
|
||||
self.connected = True
|
||||
print(f"{Color.Y}[系统提示] 成功加入房间!{Color.R}")
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
threading.Thread(target=self.receive_messages, daemon=True).start()
|
||||
self.client_chat_loop()
|
||||
elif response.startswith("FAIL:用户名已存在"):
|
||||
print(f"{Color.C}[系统提示] 加入失败!该用户名已被占用,请重新注册{Color.R}")
|
||||
self.client_socket.close()
|
||||
self.register()
|
||||
time.sleep(2)
|
||||
elif response == "FAIL:密码错误":
|
||||
print(f"{Color.C}[系统提示] 加入失败!房间密码错误{Color.R}")
|
||||
self.client_socket.close()
|
||||
time.sleep(2)
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 加入失败!原因:{response.split(':', 1)[1] if ':' in response else response}{Color.R}")
|
||||
self.client_socket.close()
|
||||
time.sleep(2)
|
||||
except ConnectionRefusedError:
|
||||
print(f"{Color.C}[系统提示] 加入失败!目标房间未开启或地址错误{Color.R}")
|
||||
self.client_socket.close()
|
||||
time.sleep(2)
|
||||
except socket.timeout:
|
||||
print(f"{Color.C}[系统提示] 加入失败!连接超时{Color.R}")
|
||||
self.client_socket.close()
|
||||
time.sleep(2)
|
||||
except Exception as e:
|
||||
print(f"{Color.C}[系统提示] 加入失败!原因:{str(e)}{Color.R}")
|
||||
self.client_socket.close()
|
||||
time.sleep(2)
|
||||
|
||||
def accept_clients(self, server):
|
||||
while self.connected and self.is_host:
|
||||
try:
|
||||
client_socket, addr = server.accept()
|
||||
data = client_socket.recv(1024).decode('utf-8')
|
||||
if data == "CHECK_LOCK":
|
||||
client_socket.send(b"LOCKED" if self.room_locked else b"UNLOCKED")
|
||||
client_socket.close()
|
||||
continue
|
||||
|
||||
username = ""
|
||||
password = ""
|
||||
if data.startswith("JOIN_WITH_PWD:"):
|
||||
parts = data.split(':', 2)
|
||||
if len(parts) != 3:
|
||||
client_socket.send("FAIL:无效请求".encode('utf-8'))
|
||||
client_socket.close()
|
||||
continue
|
||||
username = parts[1]
|
||||
password = parts[2]
|
||||
if password != self.room_password:
|
||||
client_socket.send("FAIL:密码错误".encode('utf-8'))
|
||||
client_socket.close()
|
||||
continue
|
||||
elif data.startswith("JOIN:"):
|
||||
username = data.split(':', 1)[1]
|
||||
else:
|
||||
client_socket.send("FAIL:无效请求".encode('utf-8'))
|
||||
client_socket.close()
|
||||
continue
|
||||
|
||||
if username in self.clients or username == self.username:
|
||||
client_socket.send("FAIL:用户名已存在".encode('utf-8'))
|
||||
client_socket.close()
|
||||
continue
|
||||
|
||||
client_socket.send("SUCCESS:加入成功".encode('utf-8'))
|
||||
self.clients[username] = client_socket
|
||||
print(f"{Color.Y}[系统提示] {username} 加入了房间{Color.R}")
|
||||
self.broadcast(f"{Color.Y}[系统提示] {username} 加入了房间{Color.R}", exclude=None)
|
||||
threading.Thread(target=self.handle_client, args=(client_socket, username), daemon=True).start()
|
||||
except Exception:
|
||||
break
|
||||
|
||||
def handle_client(self, client_socket, username):
|
||||
while self.connected and username in self.clients:
|
||||
try:
|
||||
data = client_socket.recv(1024).decode('utf-8')
|
||||
if not data:
|
||||
self.remove_client(username)
|
||||
break
|
||||
if data == "/退出房间":
|
||||
self.remove_client(username)
|
||||
break
|
||||
if self.global_mute or username in self.muted_users:
|
||||
client_socket.send(f"{Color.C}[系统提示] 你已被禁言,无法发送消息{Color.R}".encode('utf-8'))
|
||||
continue
|
||||
if data.startswith("/"):
|
||||
self.handle_command(data, username, client_socket)
|
||||
else:
|
||||
self.process_message(username, data)
|
||||
except Exception:
|
||||
self.remove_client(username)
|
||||
break
|
||||
|
||||
def process_message(self, username, content):
|
||||
message = f"{Color.C}[{username}]{Color.R}: {self.user_color}{content}{Color.R}"
|
||||
print(message)
|
||||
|
||||
if "@全体" in content:
|
||||
at_msg = f"{Color.R_}[提醒] {Color.C}[{username}]{Color.R} @全体成员:{self.user_color}{content}{Color.R}"
|
||||
self.broadcast(at_msg, exclude=username)
|
||||
else:
|
||||
at_users = [user for user in self.clients if f"@{user}" in content]
|
||||
if self.username in content:
|
||||
at_users.append(self.username)
|
||||
self.broadcast(message, exclude=username)
|
||||
for user in at_users:
|
||||
at_msg = f"{Color.R_}[提醒] {Color.C}[{user}]{Color.R} 你被 @ 了:{self.user_color}{content}{Color.R}"
|
||||
if user == self.username:
|
||||
print(at_msg)
|
||||
else:
|
||||
try:
|
||||
self.clients[user].send(at_msg.encode('utf-8'))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def broadcast(self, message, exclude):
|
||||
if self.is_host:
|
||||
for username, sock in self.clients.items():
|
||||
if username != exclude:
|
||||
try:
|
||||
sock.send(message.encode('utf-8'))
|
||||
except Exception:
|
||||
self.remove_client(username)
|
||||
else:
|
||||
try:
|
||||
self.client_socket.send(message.encode('utf-8'))
|
||||
except Exception:
|
||||
self.connected = False
|
||||
|
||||
def remove_client(self, username):
|
||||
if username in self.clients:
|
||||
try:
|
||||
self.clients[username].send(f"{Color.C}[系统提示] 你已被移出房间{Color.R}".encode('utf-8'))
|
||||
except Exception:
|
||||
pass
|
||||
self.clients[username].close()
|
||||
del self.clients[username]
|
||||
if username in self.muted_users:
|
||||
self.muted_users.remove(username)
|
||||
print(f"{Color.Y}[系统提示] {username} 退出了房间{Color.R}")
|
||||
self.broadcast(f"{Color.Y}[系统提示] {username} 退出了房间{Color.R}", exclude=None)
|
||||
|
||||
def receive_messages(self):
|
||||
while self.connected:
|
||||
try:
|
||||
data = self.client_socket.recv(1024).decode('utf-8')
|
||||
if not data:
|
||||
print(f"{Color.C}[系统提示] 与房间断开连接{Color.R}")
|
||||
self.connected = False
|
||||
break
|
||||
print(data)
|
||||
except Exception:
|
||||
print(f"{Color.C}[系统提示] 接收消息失败,已断开连接{Color.R}")
|
||||
self.connected = False
|
||||
break
|
||||
|
||||
def client_chat_loop(self):
|
||||
while self.connected:
|
||||
message = input().strip()
|
||||
if message == "/清屏":
|
||||
self.clear_screen()
|
||||
elif message == "/退出脚本":
|
||||
self.exit_script()
|
||||
elif message == "/退出房间":
|
||||
self.exit_room()
|
||||
elif message == "/更换聊天颜色":
|
||||
self.change_color()
|
||||
elif message == "/房主菜单" and self.is_host:
|
||||
self.show_host_menu()
|
||||
elif message.startswith("/"):
|
||||
if self.is_host:
|
||||
self.handle_command(message, self.username, None)
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 无效指令{Color.R}")
|
||||
else:
|
||||
if self.is_host:
|
||||
self.process_message(self.username, message)
|
||||
else:
|
||||
if message:
|
||||
self.client_socket.send(message.encode('utf-8'))
|
||||
self.process_message(self.username, message)
|
||||
|
||||
def handle_command(self, cmd, username, client_socket):
|
||||
if not self.is_host:
|
||||
if client_socket:
|
||||
client_socket.send(f"{Color.C}[系统提示] 无权限使用该指令{Color.R}".encode('utf-8'))
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 无权限使用该指令{Color.R}")
|
||||
return
|
||||
|
||||
if cmd == "/房主菜单":
|
||||
self.show_host_menu()
|
||||
elif cmd.startswith("/踢人 "):
|
||||
parts = cmd.split(' ', 1)
|
||||
if len(parts) < 2 or not parts[1].strip():
|
||||
print(f"{Color.C}[系统提示] 请输入要踢的用户名(格式:/踢人 用户名){Color.R}")
|
||||
return
|
||||
target = parts[1].strip()
|
||||
if target == self.username:
|
||||
print(f"{Color.C}[系统提示] 无法踢自己{Color.R}")
|
||||
elif target in self.clients:
|
||||
self.remove_client(target)
|
||||
print(f"{Color.C}[系统提示] 已将{target} 踢出房间{Color.R}")
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}")
|
||||
elif cmd.startswith("/禁言 "):
|
||||
parts = cmd.split(' ', 1)
|
||||
if len(parts) < 2 or not parts[1].strip():
|
||||
print(f"{Color.C}[系统提示] 请输入要禁言的对象(格式:/禁言 用户名/全体){Color.R}")
|
||||
return
|
||||
target = parts[1].strip()
|
||||
if target == "全体":
|
||||
self.global_mute = True
|
||||
print(f"{Color.C}[系统提示] 已开启全体禁言{Color.R}")
|
||||
self.broadcast(f"{Color.C}[系统提示] 房主已开启全体禁言{Color.R}", exclude=None)
|
||||
elif target == self.username:
|
||||
print(f"{Color.C}[系统提示] 无法禁言自己{Color.R}")
|
||||
elif target in self.clients:
|
||||
self.muted_users.add(target)
|
||||
print(f"{Color.C}[系统提示] 已禁言{target}{Color.R}")
|
||||
try:
|
||||
self.clients[target].send(f"{Color.C}[系统提示] 你已被房主禁言{Color.R}".encode('utf-8'))
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}")
|
||||
elif cmd.startswith("/解禁 "):
|
||||
parts = cmd.split(' ', 1)
|
||||
if len(parts) < 2 or not parts[1].strip():
|
||||
print(f"{Color.C}[系统提示] 请输入要解禁的对象(格式:/解禁 用户名/全体){Color.R}")
|
||||
return
|
||||
target = parts[1].strip()
|
||||
if target == "全体":
|
||||
self.global_mute = False
|
||||
self.muted_users.clear()
|
||||
print(f"{Color.C}[系统提示] 已解除全体禁言{Color.R}")
|
||||
self.broadcast(f"{Color.C}[系统提示] 房主已解除全体禁言{Color.R}", exclude=None)
|
||||
elif target in self.muted_users:
|
||||
self.muted_users.remove(target)
|
||||
print(f"{Color.C}[系统提示] 已解禁{target}{Color.R}")
|
||||
try:
|
||||
self.clients[target].send(f"{Color.C}[系统提示] 你已被房主解禁{Color.R}".encode('utf-8'))
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 该用户未被禁言或不在房间内{Color.R}")
|
||||
else:
|
||||
if client_socket:
|
||||
client_socket.send(f"{Color.C}[系统提示] 无效指令{Color.R}".encode('utf-8'))
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 无效指令{Color.R}")
|
||||
|
||||
def show_host_menu(self):
|
||||
self.clear_screen()
|
||||
print(f"{Color.C}{'='*50}{Color.R}")
|
||||
print(f"{Color.C}{'[ 房主专属菜单 ]':^50}{Color.R}")
|
||||
print(f"{Color.C}{'='*50}{Color.R}")
|
||||
print(f"{Color.C}{'1. 踢人功能':<20}{'格式:/踢人 用户名':<30}{Color.R}")
|
||||
print(f"{Color.C}{'2. 禁言功能':<20}{'格式:/禁言 用户名/全体':<30}{Color.R}")
|
||||
print(f"{Color.C}{'3. 解禁功能':<20}{'格式:/解禁 用户名/全体':<30}{Color.R}")
|
||||
print(f"{Color.C}{'4. 房间锁管理':<20}{'开启/关闭房间锁':<30}{Color.R}")
|
||||
print(f"{Color.C}{'5. 查看在线用户':<20}{'显示当前房间所有成员':<30}{Color.R}")
|
||||
print(f"{Color.C}{'6. 解散房间':<20}{'强制关闭房间,全员退出':<30}{Color.R}")
|
||||
print(f"{Color.C}{'7. 关闭菜单':<20}{'返回聊天界面':<30}{Color.R}")
|
||||
print(f"{Color.C}{'='*50}{Color.R}")
|
||||
choice = input(f"{Color.C}[请选择功能(1-7)]:{Color.R}".strip())
|
||||
if choice == "1":
|
||||
target = input(f"{Color.C}[请输入要踢的用户名]:{Color.R}".strip())
|
||||
if not target:
|
||||
print(f"{Color.C}[系统提示] 用户名不能为空{Color.R}")
|
||||
elif target == self.username:
|
||||
print(f"{Color.C}[系统提示] 无法踢自己{Color.R}")
|
||||
elif target in self.clients:
|
||||
self.remove_client(target)
|
||||
print(f"{Color.C}[系统提示] 已将{target} 踢出房间{Color.R}")
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}")
|
||||
time.sleep(1)
|
||||
elif choice == "2":
|
||||
target = input(f"{Color.C}[请输入要禁言的用户名(输入'全体'开启全体禁言)]:{Color.R}".strip())
|
||||
if not target:
|
||||
print(f"{Color.C}[系统提示] 请输入有效对象{Color.R}")
|
||||
elif target == "全体":
|
||||
self.global_mute = True
|
||||
print(f"{Color.C}[系统提示] 已开启全体禁言{Color.R}")
|
||||
self.broadcast(f"{Color.C}[系统提示] 房主已开启全体禁言{Color.R}", exclude=None)
|
||||
elif target == self.username:
|
||||
print(f"{Color.C}[系统提示] 无法禁言自己{Color.R}")
|
||||
elif target in self.clients:
|
||||
self.muted_users.add(target)
|
||||
print(f"{Color.C}[系统提示] 已禁言{target}{Color.R}")
|
||||
try:
|
||||
self.clients[target].send(f"{Color.C}[系统提示] 你已被房主禁言{Color.R}".encode('utf-8'))
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 该用户不在房间内{Color.R}")
|
||||
time.sleep(1)
|
||||
elif choice == "3":
|
||||
target = input(f"{Color.C}[请输入要解禁的用户名(输入'全体'解除全体禁言)]:{Color.R}".strip())
|
||||
if not target:
|
||||
print(f"{Color.C}[系统提示] 请输入有效对象{Color.R}")
|
||||
elif target == "全体":
|
||||
self.global_mute = False
|
||||
self.muted_users.clear()
|
||||
print(f"{Color.C}[系统提示] 已解除全体禁言{Color.R}")
|
||||
self.broadcast(f"{Color.C}[系统提示] 房主已解除全体禁言{Color.R}", exclude=None)
|
||||
elif target in self.muted_users:
|
||||
self.muted_users.remove(target)
|
||||
print(f"{Color.C}[系统提示] 已解禁{target}{Color.R}")
|
||||
try:
|
||||
self.clients[target].send(f"{Color.C}[系统提示] 你已被房主解禁{Color.R}".encode('utf-8'))
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 该用户未被禁言或不在房间内{Color.R}")
|
||||
time.sleep(1)
|
||||
elif choice == "4":
|
||||
print(f"{Color.C}{'-'*50}{Color.R}")
|
||||
print(f"{Color.C}[当前房间锁状态] {'已开启' if self.room_locked else '已关闭'}{Color.R}")
|
||||
while True:
|
||||
lock_choice = input(f"{Color.C}[请选择(1=开启,0=关闭)]:{Color.R}".strip())
|
||||
if lock_choice in ["0", "1"]:
|
||||
new_locked = (lock_choice == "1")
|
||||
break
|
||||
print(f"{Color.C}[系统提示] 无效选择!请输入0或1{Color.R}")
|
||||
if new_locked == self.room_locked:
|
||||
print(f"{Color.C}[系统提示] 房间锁状态未改变{Color.R}")
|
||||
else:
|
||||
if new_locked:
|
||||
while True:
|
||||
pwd = input(f"{Color.C}[请设置房间密码(4-8位字符)]:{Color.R}".strip())
|
||||
if 4 <= len(pwd) <= 8 and not ' ' in pwd:
|
||||
self.room_password = pwd
|
||||
break
|
||||
print(f"{Color.C}[系统提示] 密码无效!需4-8位字符且不含空格{Color.R}")
|
||||
self.room_locked = new_locked
|
||||
print(f"{Color.C}[系统提示] 房间锁状态已更新为{'开启' if new_locked else '关闭'}{Color.R}")
|
||||
self.broadcast(f"{Color.C}[系统提示] 房主已{'开启' if new_locked else '关闭'}房间锁{Color.R}", exclude=None)
|
||||
print(f"{Color.C}{'-'*50}{Color.R}")
|
||||
input(f"{Color.C}[系统提示] 按回车键返回...{Color.R}")
|
||||
elif choice == "5":
|
||||
print(f"{Color.C}{'-'*50}{Color.R}")
|
||||
print(f"{Color.C}[在线用户统计] 共{len(self.clients)+1}人{Color.R}")
|
||||
print(f"{Color.C}[房主] {self.username}{Color.R}")
|
||||
if self.clients:
|
||||
muted_list = [user for user in self.clients if user in self.muted_users]
|
||||
normal_list = [user for user in self.clients if user not in self.muted_users]
|
||||
print(f"{Color.C}[成员] {' | '.join(normal_list)}{Color.R}")
|
||||
if muted_list:
|
||||
print(f"{Color.C}[禁言成员] {' | '.join(muted_list)}{Color.R}")
|
||||
else:
|
||||
print(f"{Color.C}[成员] 暂无其他成员{Color.R}")
|
||||
print(f"{Color.C}[全体禁言状态] {'开启' if self.global_mute else '关闭'}{Color.R}")
|
||||
print(f"{Color.C}{'-'*50}{Color.R}")
|
||||
input(f"{Color.C}[系统提示] 按回车键返回...{Color.R}")
|
||||
elif choice == "6":
|
||||
print(f"{Color.C}{'-'*50}{Color.R}")
|
||||
confirm = input(f"{Color.C}[确认解散房间?(y/n)]:{Color.R}".strip())
|
||||
if confirm.lower() == "y":
|
||||
self.dismiss_room()
|
||||
elif choice != "7":
|
||||
print(f"{Color.C}[系统提示] 无效选择{Color.R}")
|
||||
time.sleep(1)
|
||||
|
||||
def change_color(self):
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
print(f"{Color.C}{'[ 更 换 聊 天 颜 色 ]':^40}{Color.R}")
|
||||
print(f"{Color.C}{'='*40}{Color.R}")
|
||||
colors = [Color.Y, '\033[91m', '\033[92m', '\033[95m', '\033[94m']
|
||||
color_names = ['黄色', '红色', '绿色', '紫色', '蓝色']
|
||||
for i, name in enumerate(color_names, 1):
|
||||
print(f"{Color.C}{i}. {colors[i-1]}{name}{Color.R}")
|
||||
print(f"{Color.C}{'-'*40}{Color.R}")
|
||||
choice = input(f"{Color.C}[请选择颜色(1-5)]:{Color.R}".strip())
|
||||
if choice.isdigit() and 1 <= int(choice) <= 5:
|
||||
self.user_color = colors[int(choice)-1]
|
||||
print(f"{Color.C}[系统提示] 聊天颜色已更换为{color_names[int(choice)-1]}{Color.R}")
|
||||
else:
|
||||
print(f"{Color.C}[系统提示] 无效选择,颜色未更改{Color.R}")
|
||||
time.sleep(1)
|
||||
|
||||
def dismiss_room(self):
|
||||
print(f"{Color.C}[系统提示] 正在解散房间...{Color.R}")
|
||||
self.connected = False
|
||||
for sock in self.clients.values():
|
||||
try:
|
||||
sock.send(f"{Color.C}[系统提示] 房主已解散房间{Color.R}".encode('utf-8'))
|
||||
sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.clients.clear()
|
||||
self.muted_users.clear()
|
||||
self.global_mute = False
|
||||
print(f"{Color.Y}[系统提示] 房间已解散{Color.R}")
|
||||
time.sleep(2)
|
||||
self.is_host = False
|
||||
self.room_ip = ""
|
||||
self.room_port = 0
|
||||
self.room_locked = False
|
||||
self.room_password = ""
|
||||
self.show_main_menu()
|
||||
|
||||
def exit_room(self):
|
||||
print(f"{Color.C}[系统提示] 正在退出房间...{Color.R}")
|
||||
self.connected = False
|
||||
if self.is_host:
|
||||
for sock in self.clients.values():
|
||||
try:
|
||||
sock.send(f"{Color.C}[系统提示] 房主已退出房间{Color.R}".encode('utf-8'))
|
||||
sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.clients.clear()
|
||||
self.muted_users.clear()
|
||||
self.global_mute = False
|
||||
else:
|
||||
try:
|
||||
self.client_socket.send("/退出房间".encode('utf-8'))
|
||||
self.client_socket.close()
|
||||
except Exception:
|
||||
pass
|
||||
print(f"{Color.Y}[系统提示] 已退出房间{Color.R}")
|
||||
self.is_host = False
|
||||
self.room_ip = ""
|
||||
self.room_port = 0
|
||||
self.room_locked = False
|
||||
self.room_password = ""
|
||||
time.sleep(1)
|
||||
self.show_main_menu()
|
||||
|
||||
def exit_script(self):
|
||||
print(f"{Color.C}[系统提示] 正在退出脚本...{Color.R}")
|
||||
self.running = False
|
||||
self.connected = False
|
||||
if self.client_socket:
|
||||
try:
|
||||
self.client_socket.send("/退出脚本".encode('utf-8'))
|
||||
self.client_socket.close()
|
||||
except Exception:
|
||||
pass
|
||||
if self.is_host:
|
||||
for sock in self.clients.values():
|
||||
try:
|
||||
sock.send(f"{Color.C}[系统提示] 房主已退出,脚本关闭{Color.R}".encode('utf-8'))
|
||||
sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.clients.clear()
|
||||
self.muted_users.clear()
|
||||
self.global_mute = False
|
||||
sys.exit(0)
|
||||
|
||||
if __name__ == "__main__":
|
||||
chat = ChatRoom()
|
||||
chat.register()
|
||||
chat.show_main_menu()
|
||||
Reference in New Issue
Block a user