415 lines
18 KiB
Python
415 lines
18 KiB
Python
import requests
|
|
import re
|
|
import cv2 # 导入OpenCV库
|
|
|
|
|
|
#本段代码更新文件方式-----追加写入
|
|
#同省份多城市查询IP
|
|
|
|
# 更新四川电信组播定义fofa链接列表 #///////////////////////////////////////
|
|
fofa_urls = [
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJjaGVuZ2R1Ig%3D%3D', # 成都 #///////////////////////////////////////
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJYaWNoYW5nIg%3D%3D', # 西昌
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJtaWFueWFuZyI%3D', # 绵阳
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJsdXpob3Ui', # 泸州
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJkYXpob3Ui', # 达州
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJuYW5jaG9uZyI%3D' # 南充
|
|
]
|
|
|
|
# 尝试从fofa链接提取IP地址和端口号,并去除重复项
|
|
def extract_unique_ip_ports(fofa_urls):
|
|
all_unique_ips_ports = set()
|
|
for fofa_url in fofa_urls:
|
|
try:
|
|
response = requests.get(fofa_url, timeout=10)
|
|
html_content = response.text
|
|
# 使用正则表达式匹配IP地址和端口号
|
|
ips_ports = re.findall(r'(\d+\.\d+\.\d+\.\d+:\d+)', html_content)
|
|
all_unique_ips_ports.update(ips_ports) # 将IP和端口号添加到集合中
|
|
except requests.RequestException as e:
|
|
print(f"请求 {fofa_url} 时发生错误: {e}")
|
|
return list(all_unique_ips_ports)
|
|
# 检查视频流的可达性
|
|
def check_video_stream_connectivity(ip_port, urls_udp):
|
|
video_url = f"http://{ip_port}{urls_udp}"
|
|
cap = cv2.VideoCapture(video_url)
|
|
if cap.isOpened():
|
|
cap.release()
|
|
return ip_port # 如果打开成功,返回IP和端口号
|
|
else:
|
|
cap.release()
|
|
return None
|
|
# 从本地文件读取内容
|
|
def read_file_content(file_path):
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
return file.read()
|
|
except FileNotFoundError:
|
|
print(f"文件 {file_path} 未找到。")
|
|
return None
|
|
# 更新文件中的IP地址并将每个IP追加写入
|
|
def update_and_write_ips(file_path, valid_ips_ports):
|
|
try:
|
|
original_content = read_file_content(file_path)
|
|
if original_content:
|
|
# 假设原文件中的IP地址格式为 http://IP:PORT
|
|
ip_port_pattern = r'http://(\d+\.\d+\.\d+:\d+)'
|
|
new_lines = []
|
|
for line in original_content.splitlines():
|
|
match = re.search(ip_port_pattern, line)
|
|
if match:
|
|
# 找到一个IP地址,替换为有效的IP地址列表
|
|
for valid_ip_port in valid_ips_ports:
|
|
new_line = re.sub(ip_port_pattern, f'http://{valid_ip_port}', line)
|
|
new_lines.append(new_line)
|
|
# 将更新后的内容追加到文件末尾
|
|
with open(file_path, 'a', encoding='utf-8') as file:
|
|
file.write("\n".join(new_lines) + "\n")
|
|
print(f"文件 {file_path} 已更新并追加保存。")
|
|
except Exception as e:
|
|
print(f"更新文件 {file_path} 时发生错误: {e}")
|
|
# 定义组播地址和端口
|
|
urls_udp = "/rtp/239.93.0.58:5140" #///////////////////////////////////////
|
|
# 提取唯一的IP地址和端口号
|
|
unique_ips_ports = extract_unique_ip_ports(fofa_urls)
|
|
# 存储所有有效的IP地址和端口号
|
|
valid_ips_ports = [ip_port for ip_port in unique_ips_ports if check_video_stream_connectivity(ip_port, urls_udp)]
|
|
if valid_ips_ports:
|
|
print("找到的可访问视频流服务的IP地址和端口号:")
|
|
for valid_ip_port in valid_ips_ports:
|
|
print(valid_ip_port)
|
|
# 指定需要更新的本地文件路径
|
|
local_file_path = 'playlist/四川电信.txt' #///////////////////////////////////////
|
|
# 更新文件中的IP地址并将每个IP写入新行
|
|
update_and_write_ips(local_file_path, valid_ips_ports)
|
|
else:
|
|
print("没有找到可访问的视频流服务或者没有提取到IP地址和端口号。")
|
|
|
|
|
|
|
|
|
|
# 更新湖北电信组播定义fofa链接列表 #///////////////////////////////////////
|
|
fofa_urls = [
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJ3dWhhbiI%3D', # 武汉 #///////////////////////////////////////
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJodWFuZ3NoaSI%3D', # 黄石
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJ4aWFuZ3lhbmci', # 襄阳
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJqaW5nemhvdSI%3D', # 荆州
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJodWFuZ2dhbmci', # 黄冈
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJ4aWFvZ2FuIg%3D%3D', # 孝感
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJKaW5nbWVuIg%3D%3D', # 荆门
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiBjaXR5PSJ4aWFubmluZyI%3D' # 咸宁
|
|
]
|
|
|
|
# 尝试从fofa链接提取IP地址和端口号,并去除重复项
|
|
def extract_unique_ip_ports(fofa_urls):
|
|
all_unique_ips_ports = set()
|
|
for fofa_url in fofa_urls:
|
|
try:
|
|
response = requests.get(fofa_url, timeout=10)
|
|
html_content = response.text
|
|
# 使用正则表达式匹配IP地址和端口号
|
|
ips_ports = re.findall(r'(\d+\.\d+\.\d+\.\d+:\d+)', html_content)
|
|
all_unique_ips_ports.update(ips_ports) # 将IP和端口号添加到集合中
|
|
except requests.RequestException as e:
|
|
print(f"请求 {fofa_url} 时发生错误: {e}")
|
|
return list(all_unique_ips_ports)
|
|
# 检查视频流的可达性
|
|
def check_video_stream_connectivity(ip_port, urls_udp):
|
|
video_url = f"http://{ip_port}{urls_udp}"
|
|
cap = cv2.VideoCapture(video_url)
|
|
if cap.isOpened():
|
|
cap.release()
|
|
return ip_port # 如果打开成功,返回IP和端口号
|
|
else:
|
|
cap.release()
|
|
return None
|
|
# 从本地文件读取内容
|
|
def read_file_content(file_path):
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
return file.read()
|
|
except FileNotFoundError:
|
|
print(f"文件 {file_path} 未找到。")
|
|
return None
|
|
# 更新文件中的IP地址并将每个IP追加写入
|
|
def update_and_write_ips(file_path, valid_ips_ports):
|
|
try:
|
|
original_content = read_file_content(file_path)
|
|
if original_content:
|
|
# 假设原文件中的IP地址格式为 http://IP:PORT
|
|
ip_port_pattern = r'http://(\d+\.\d+\.\d+:\d+)'
|
|
new_lines = []
|
|
for line in original_content.splitlines():
|
|
match = re.search(ip_port_pattern, line)
|
|
if match:
|
|
# 找到一个IP地址,替换为有效的IP地址列表
|
|
for valid_ip_port in valid_ips_ports:
|
|
new_line = re.sub(ip_port_pattern, f'http://{valid_ip_port}', line)
|
|
new_lines.append(new_line)
|
|
# 将更新后的内容追加到文件末尾
|
|
with open(file_path, 'a', encoding='utf-8') as file:
|
|
file.write("\n".join(new_lines) + "\n")
|
|
print(f"文件 {file_path} 已更新并追加保存。")
|
|
except Exception as e:
|
|
print(f"更新文件 {file_path} 时发生错误: {e}")
|
|
# 定义组播地址和端口
|
|
urls_udp = "/rtp/239.254.96.96:8550" #///////////////////////////////////////
|
|
# 提取唯一的IP地址和端口号
|
|
unique_ips_ports = extract_unique_ip_ports(fofa_urls)
|
|
# 存储所有有效的IP地址和端口号
|
|
valid_ips_ports = [ip_port for ip_port in unique_ips_ports if check_video_stream_connectivity(ip_port, urls_udp)]
|
|
if valid_ips_ports:
|
|
print("找到的可访问视频流服务的IP地址和端口号:")
|
|
for valid_ip_port in valid_ips_ports:
|
|
print(valid_ip_port)
|
|
# 指定需要更新的本地文件路径
|
|
local_file_path = 'playlist/湖北电信.txt' #///////////////////////////////////////
|
|
# 更新文件中的IP地址并将每个IP写入新行
|
|
update_and_write_ips(local_file_path, valid_ips_ports)
|
|
else:
|
|
print("没有找到可访问的视频流服务或者没有提取到IP地址和端口号。")
|
|
|
|
|
|
|
|
# 更新北京联通组播定义fofa链接列表//////////////////////////////////////////////////////////////
|
|
fofa_urls = [
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiByZWdpb249ImJlaWppbmci' # 北京
|
|
]
|
|
|
|
# 尝试从fofa链接提取IP地址和端口号,并去除重复项
|
|
def extract_unique_ip_ports(fofa_urls):
|
|
all_unique_ips_ports = set()
|
|
for fofa_url in fofa_urls:
|
|
try:
|
|
response = requests.get(fofa_url, timeout=10)
|
|
html_content = response.text
|
|
# 使用正则表达式匹配IP地址和端口号
|
|
ips_ports = re.findall(r'(\d+\.\d+\.\d+\.\d+:\d+)', html_content)
|
|
all_unique_ips_ports.update(ips_ports) # 将IP和端口号添加到集合中
|
|
except requests.RequestException as e:
|
|
print(f"请求 {fofa_url} 时发生错误: {e}")
|
|
return list(all_unique_ips_ports)
|
|
# 检查视频流的可达性
|
|
def check_video_stream_connectivity(ip_port, urls_udp):
|
|
video_url = f"http://{ip_port}{urls_udp}"
|
|
cap = cv2.VideoCapture(video_url)
|
|
if cap.isOpened():
|
|
cap.release()
|
|
return ip_port # 如果打开成功,返回IP和端口号
|
|
else:
|
|
cap.release()
|
|
return None
|
|
# 从本地文件读取内容
|
|
def read_file_content(file_path):
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
return file.read()
|
|
except FileNotFoundError:
|
|
print(f"文件 {file_path} 未找到。")
|
|
return None
|
|
# 更新文件中的IP地址并将每个IP追加写入
|
|
def update_and_write_ips(file_path, valid_ips_ports):
|
|
try:
|
|
original_content = read_file_content(file_path)
|
|
if original_content:
|
|
# 假设原文件中的IP地址格式为 http://IP:PORT
|
|
ip_port_pattern = r'http://(\d+\.\d+\.\d+:\d+)'
|
|
new_lines = []
|
|
for line in original_content.splitlines():
|
|
match = re.search(ip_port_pattern, line)
|
|
if match:
|
|
# 找到一个IP地址,替换为有效的IP地址列表
|
|
for valid_ip_port in valid_ips_ports:
|
|
new_line = re.sub(ip_port_pattern, f'http://{valid_ip_port}', line)
|
|
new_lines.append(new_line)
|
|
# 将更新后的内容追加到文件末尾
|
|
with open(file_path, 'a', encoding='utf-8') as file:
|
|
file.write("\n".join(new_lines) + "\n")
|
|
print(f"文件 {file_path} 已更新并追加保存。")
|
|
except Exception as e:
|
|
print(f"更新文件 {file_path} 时发生错误: {e}")
|
|
# 定义组播地址和端口
|
|
urls_udp = "/rtp/239.3.1.129:8008" #/////////////////////////////////////////////////////////
|
|
# 提取唯一的IP地址和端口号
|
|
unique_ips_ports = extract_unique_ip_ports(fofa_urls)
|
|
# 存储所有有效的IP地址和端口号
|
|
valid_ips_ports = [ip_port for ip_port in unique_ips_ports if check_video_stream_connectivity(ip_port, urls_udp)]
|
|
if valid_ips_ports:
|
|
print("找到的可访问视频流服务的IP地址和端口号:")
|
|
for valid_ip_port in valid_ips_ports:
|
|
print(valid_ip_port)
|
|
# 指定需要更新的本地文件路径
|
|
local_file_path = 'playlist/北京联通.txt' #///////////////////////////////////////
|
|
# 更新文件中的IP地址并将每个IP写入新行
|
|
update_and_write_ips(local_file_path, valid_ips_ports)
|
|
else:
|
|
print("没有找到可访问的视频流服务或者没有提取到IP地址和端口号。")
|
|
|
|
|
|
|
|
|
|
|
|
# 更新江苏电信组播定义fofa链接列表 #///////////////////////////////////////
|
|
fofa_urls = [
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiByZWdpb249IkppYW5nc3UiICYmIGNpdHk9InN1emhvdSI%3D', # 苏州 #///////////////////////////////////////
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiByZWdpb249IkppYW5nc3UiICYmIGNpdHk9Im5hbmppbmci', # nanjing
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiByZWdpb249IkppYW5nc3UiICYmIGNpdHk9Im5hbnRvbmci', # 南通
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiByZWdpb249IkppYW5nc3UiICYmIGNpdHk9Inlhbmd6aG91Ig%3D%3D', # 扬州
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiByZWdpb249IkppYW5nc3UiICYmIGNpdHk9ImNoYW5nemhvdSI=', # 常州
|
|
'https://fofa.info/result?qbase64=InVkcHh5IiAmJiByZWdpb249IkppYW5nc3UiICYmIGNpdHk9Ind1eGki', # 无锡
|
|
'InVkcHh5IiAmJiByZWdpb249IkppYW5nc3UiICYmIGNpdHk9Inh1emhvdSI=' # 徐州
|
|
]
|
|
|
|
# 尝试从fofa链接提取IP地址和端口号,并去除重复项
|
|
def extract_unique_ip_ports(fofa_urls):
|
|
all_unique_ips_ports = set()
|
|
for fofa_url in fofa_urls:
|
|
try:
|
|
response = requests.get(fofa_url, timeout=10)
|
|
html_content = response.text
|
|
# 使用正则表达式匹配IP地址和端口号
|
|
ips_ports = re.findall(r'(\d+\.\d+\.\d+\.\d+:\d+)', html_content)
|
|
all_unique_ips_ports.update(ips_ports) # 将IP和端口号添加到集合中
|
|
except requests.RequestException as e:
|
|
print(f"请求 {fofa_url} 时发生错误: {e}")
|
|
return list(all_unique_ips_ports)
|
|
# 检查视频流的可达性
|
|
def check_video_stream_connectivity(ip_port, urls_udp):
|
|
video_url = f"http://{ip_port}{urls_udp}"
|
|
cap = cv2.VideoCapture(video_url)
|
|
if cap.isOpened():
|
|
cap.release()
|
|
return ip_port # 如果打开成功,返回IP和端口号
|
|
else:
|
|
cap.release()
|
|
return None
|
|
# 从本地文件读取内容
|
|
def read_file_content(file_path):
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
return file.read()
|
|
except FileNotFoundError:
|
|
print(f"文件 {file_path} 未找到。")
|
|
return None
|
|
# 更新文件中的IP地址并将每个IP追加写入
|
|
def update_and_write_ips(file_path, valid_ips_ports):
|
|
try:
|
|
original_content = read_file_content(file_path)
|
|
if original_content:
|
|
# 假设原文件中的IP地址格式为 http://IP:PORT
|
|
ip_port_pattern = r'http://(\d+\.\d+\.\d+:\d+)'
|
|
new_lines = []
|
|
for line in original_content.splitlines():
|
|
match = re.search(ip_port_pattern, line)
|
|
if match:
|
|
# 找到一个IP地址,替换为有效的IP地址列表
|
|
for valid_ip_port in valid_ips_ports:
|
|
new_line = re.sub(ip_port_pattern, f'http://{valid_ip_port}', line)
|
|
new_lines.append(new_line)
|
|
# 将更新后的内容追加到文件末尾
|
|
with open(file_path, 'a', encoding='utf-8') as file:
|
|
file.write("\n".join(new_lines) + "\n")
|
|
print(f"文件 {file_path} 已更新并追加保存。")
|
|
except Exception as e:
|
|
print(f"更新文件 {file_path} 时发生错误: {e}")
|
|
# 定义组播地址和端口
|
|
urls_udp = "/rtp/239.49.8.19:9614" #///////////////////////////////////////
|
|
# 提取唯一的IP地址和端口号
|
|
unique_ips_ports = extract_unique_ip_ports(fofa_urls)
|
|
# 存储所有有效的IP地址和端口号
|
|
valid_ips_ports = [ip_port for ip_port in unique_ips_ports if check_video_stream_connectivity(ip_port, urls_udp)]
|
|
if valid_ips_ports:
|
|
print("找到的可访问视频流服务的IP地址和端口号:")
|
|
for valid_ip_port in valid_ips_ports:
|
|
print(valid_ip_port)
|
|
# 指定需要更新的本地文件路径
|
|
local_file_path = 'playlist/江苏电信.txt' #///////////////////////////////////////
|
|
# 更新文件中的IP地址并将每个IP写入新行
|
|
update_and_write_ips(local_file_path, valid_ips_ports)
|
|
else:
|
|
print("没有找到可访问的视频流服务或者没有提取到IP地址和端口号。")
|
|
|
|
|
|
###############检测playlist文件夹内所有txt文件内的组播
|
|
###############检测playlist文件夹内所有txt文件内的组播
|
|
###############检测playlist文件夹内所有txt文件内的组播
|
|
|
|
import os
|
|
import cv2
|
|
import time
|
|
from tqdm import tqdm
|
|
import sys
|
|
|
|
# 初始化字典以存储IP检测结果
|
|
detected_ips = {}
|
|
|
|
def get_ip_key(url):
|
|
"""从URL中提取IP地址,并构造一个唯一的键"""
|
|
start = url.find('://') + 3
|
|
end = url.find('/', start)
|
|
if end == -1:
|
|
end = len(url)
|
|
return url[start:end].strip()
|
|
|
|
# 设置固定的文件夹路径
|
|
folder_path = 'playlist'
|
|
|
|
# 确保文件夹路径存在
|
|
if not os.path.isdir(folder_path):
|
|
print("指定的文件夹不存在。")
|
|
sys.exit()
|
|
|
|
# 遍历文件夹中的所有.txt文件
|
|
for filename in os.listdir(folder_path):
|
|
if filename.endswith('.txt'):
|
|
file_path = os.path.join(folder_path, filename)
|
|
# 读取文件内容
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
lines = file.readlines()
|
|
|
|
# 准备写回文件
|
|
with open(file_path, 'w', encoding='utf-8') as output_file:
|
|
# 使用 tqdm 显示进度条
|
|
for line in tqdm(lines, total=len(lines), desc=f"Processing {filename}"):
|
|
parts = line.split(',', 1)
|
|
if len(parts) >= 2:
|
|
channel_name, url = parts
|
|
channel_name = channel_name.strip()
|
|
url = url.strip()
|
|
ip_key = get_ip_key(url)
|
|
|
|
# 检查IP是否已经被检测过
|
|
if ip_key in detected_ips:
|
|
# 如果之前检测成功,则写入该行
|
|
if detected_ips[ip_key]['status'] == 'ok':
|
|
output_file.write(line)
|
|
continue # 无论之前检测结果如何,都不重新检测
|
|
|
|
# 初始化帧计数器和成功标志
|
|
frame_count = 0
|
|
success = False
|
|
# 尝试打开视频流
|
|
cap = cv2.VideoCapture(url)
|
|
start_time = time.time()
|
|
while (time.time() - start_time) < 5:
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
frame_count += 1
|
|
# 如果在3秒内读取到60帧以上,设置成功标志
|
|
if frame_count >= 5:
|
|
success = True
|
|
break
|
|
cap.release()
|
|
|
|
# 根据检测结果更新字典
|
|
if success:
|
|
detected_ips[ip_key] = {'status': 'ok'}
|
|
output_file.write(line)
|
|
else:
|
|
detected_ips[ip_key] = {'status': 'fail'}
|
|
|
|
# 打印检测结果
|
|
for ip_key, result in detected_ips.items():
|
|
print(f"IP Key: {ip_key}, Status: {result['status']}")
|