Update 一锅烩.py

This commit is contained in:
frxz751113
2024-08-26 10:53:06 +08:00
committed by GitHub
parent d27f277fea
commit e78e150618
+57 -219
View File
@@ -253,239 +253,77 @@ if __name__ == "__main__":
write_filtered_lines(output_file_path, filtered_lines)
print(f"文件已过滤完成")
########################################################################################################
#################################################### 对整理好的频道列表测试HTTP连接
def test_connectivity(url, max_attempts=2): #定义测试HTTP连接的次数
# 尝试连接指定次数
for _ in range(max_attempts):
try:
response = requests.head(url, timeout=0.5) # 发送HEAD请求,仅支持V4,修改此行数字可定义链接超时##////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#response = requests.get(url, timeout=1) # 发送get请求,支持V6,修改此行数字可定义链接超时##############################//////////////////////////////////////////////////////////////////////////////////////
return response.status_code == 200 # 返回True如果状态码为200
except requests.RequestException: # 捕获requests引发的异常
pass # 发生异常时忽略
#return False # 如果所有尝试都失败,返回False
pass
# 使用队列来收集结果的函数
def process_line(line, result_queue):
parts = line.strip().split(",") # 去除行首尾空白并按逗号分割
if len(parts) == 2 and parts[1]: # 确保有URL,并且URL不为空
channel_name, channel_url = parts # 分别赋值频道名称和URL
if test_connectivity(channel_url): # 测试URL是否有效
result_queue.put((channel_name, channel_url, "有效")) # 将结果放入队列
else:
result_queue.put((channel_name, channel_url, "无效")) # 将结果放入队列
else:
# 格式不正确的行不放入队列
pass
# 主函数
def main(source_file_path, output_file_path):
with open(source_file_path, "r", encoding="utf-8") as source_file: # 打开源文件
lines = source_file.readlines() # 读取所有行s
result_queue = queue.Queue() # 创建队列
threads = [] # 初始化线程列表
for line in tqdm(lines, desc="检测进行中"): # 显示进度条
thread = threading.Thread(target=process_line, args=(line, result_queue)) # 创建线程
thread.start() # 启动线程
threads.append(thread) # 将线程加入线程列表
for thread in threads: # 等待所有线程完成
thread.join()
# 初始化计数器
valid_count = 0
invalid_count = 0
with open(output_file_path, "w", encoding="utf-8") as output_file: # 打开输出文件
for _ in range(result_queue.qsize()): # 使用队列的大小来循环
item = result_queue.get() # 获取队列中的项目
# 只有在队列中存在有效的项目时才写入文件
if item[0] and item[1]: # 确保channel_name和channel_url都不为None
output_file.write(f"{item[0]},{item[1]},{item[2]}\n") # 写入文件
if item[2] == "有效": # 统计有效源数量
valid_count += 1
else: # 统计无效源数量
invalid_count += 1
print(f"任务完成, 有效源数量: {valid_count}, 无效源数量: {invalid_count}") # 打印结果
if __name__ == "__main__":
try:
source_file_path = "2.txt" # 输入源文件路径
output_file_path = "2.txt" # 设置输出文件路径
main(source_file_path, output_file_path) # 调用main函数
except Exception as e:
print(f"程序发生错误: {e}") # 打印错误信息
#########################################################################提取2中的有效行
def filter_lines(file_path):
with open(file_path, 'r', encoding='utf-8') as file: # 打开文件
lines = file.readlines() # 读取所有行
filtered_lines = [] # 初始化过滤后的行列表
for line in lines: # 遍历所有行
if 'genre' in line or '有效' in line: # 如果行中包含'genre'或'有效'
filtered_lines.append(line) # 将行添加到过滤后的行列表
return filtered_lines # 返回过滤后的行列表
def write_filtered_lines(output_file_path, filtered_lines):
with open(output_file_path, 'w', encoding='utf-8') as output_file: # 打开输出文件
output_file.writelines(filtered_lines) # 写入过滤后的行
if __name__ == "__main__":
input_file_path = "2.txt" # 设置输入文件路径
output_file_path = "2.txt" # 设置输出文件路径
filtered_lines = filter_lines(input_file_path) # 调用filter_lines函数
write_filtered_lines(output_file_path, filtered_lines) # 调用write_filtered_lines函数
###################################################################################定义替换规则的字典,对整行内的内容进行替换
replacements = {
",有效": "", # 将",有效"替换为空字符串
"#genre#,无效": "#genre#", # 将"#genre#,无效"替换为"#genre#"
}
#################文本排序
# 打开原始文件读取内容,并写入新文件
with open('2.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 创建新文件并写入替换后的内容
with open('2.txt', 'w', encoding='utf-8') as new_file:
for line in lines:
for old, new in replacements.items(): # 遍历替换规则字典
line = line.replace(old, new) # 替换行中的内容
new_file.write(line) # 写入新文件
print("新文件已保存。") # 打印完成信息
# 定义一个函数,用于提取每行的第一个数字
def extract_first_number(line):
match = re.search(r'\d+', line)
return int(match.group()) if match else float('inf')
# 对列表中的行进行排序
# 按照第一个数字的大小排列,如果不存在数字则按中文拼音排序
sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip())))
# 将排序后的行写入新的utf-8编码的文本文件,文件名基于原文件名
output_file_path = "sorted_" + os.path.basename(file_path)
# 写入新文件
with open('2.txt', "w", encoding="utf-8") as file:
for line in sorted_lines:
file.write(line)
print(f"文件已排序并保存为: {output_file_path}")
########################################################################################################################################################################################
################################################################定义关键词分割规则
def check_and_write_file(input_file, output_file, keywords):
# 使用 split(', ') 来分割关键词
keywords_list = keywords.split(', ')
pattern = '|'.join(re.escape(keyword) for keyword in keywords_list)
# 读取输入文件并提取包含关键词的行
extracted_lines = []
with open(input_file, 'r', encoding='utf-8') as file:
import re
def parse_file(input_file_path, output_file_name):
# 正则表达式匹配从'//'开始到第一个'/'或第一个'::'结束的部分
ip_or_domain_pattern = re.compile(r'//([^/:]*:[^/:]*::[^/:]*|[^/]*)')
# 用于存储每个IP或域名及其对应的行列表
ip_or_domain_to_lines = {}
# 读取原始文件内容
with open(input_file_path, 'r', encoding='utf-8') as file:
for line in file:
if "genre" not in line:
if re.search(pattern, line):
extracted_lines.append(line)
# 如果至少提取到一行,写入头部信息和提取的行到输出文件
if extracted_lines:
with open(output_file, 'w', encoding='utf-8') as out_file:
out_file.write(f"{keywords_list[0]},#genre#\n") # 写入头部信息
out_file.writelines(extracted_lines) # 写入提取的行
# 获取头部信息的大小
header_size = len(f"{keywords_list[0]},#genre#\n")
# 检查文件的总大小
file_size = os.path.getsize(output_file)
# 如果文件大小小于30字节(假设的最小文件大小),删除文件
if file_size < 800:
os.remove(output_file)
print(f"文件只包含头部信息,{output_file} 已被删除。")
else:
print(f"文件已提取关键词并保存为: {output_file}")
else:
print(f"未提取到关键词,不创建输出文件 {output_file}")
# 按类别提取关键词并写入文件
check_and_write_file('2.txt', 'a0.txt', keywords="央视频道, 8K, 4K, 4k")
check_and_write_file('2.txt', 'a.txt', keywords="央视频道, CCTV")
check_and_write_file('2.txt', 'a1.txt', keywords="央视频道, 第一剧场, 风云剧场, 怀旧剧场, 家庭影院, 高清电影, 影迷, 动作电影, 峨眉电影, 世界地理")
check_and_write_file('2.txt', 'b0.txt', keywords="卫视频道, 湖北卫视, 湖南卫视, 江苏卫视, 安徽卫视, 凤凰卫视, 辽宁卫视")
check_and_write_file('2.txt', 'b.txt', keywords="卫视频道, 卫视")
check_and_write_file('2.txt', 'c.txt', keywords="影视频道, 爱情喜剧, 爱喜喜剧, 惊嫊悬疑, 东北热剧, 动作电影, 无名, 都市剧场, iHOT, 海外剧场, 欢笑剧场, 重温经典, 明星大片, 中国功夫, 军旅, 东北热剧, 中国功夫, 军旅剧场, 古装剧场, \
家庭剧场, 惊悚悬疑, 欢乐剧场, 潮妈辣婆, 爱情喜剧, 精品大剧, 超级影视, 超级电影, 黑莓动画, 黑莓电影, 海外剧场, 精彩影视, 无名影视, 潮婆辣妈, 超级剧, 热播精选")
check_and_write_file('2.txt', 'c1.txt', keywords="影视频道, 求索动物, 求索, 求索科学, 求索记录, 爱谍战, 爱动漫, 爱科幻, 爱青春, 爱自然, 爱科学, 爱浪漫, 爱历史, 爱旅行, 爱奇谈, 爱怀旧, 爱赛车, 爱都市, 爱体育, 爱经典, \
爱玩具, 爱喜剧, 爱悬疑, 爱幼教, 爱院线")
check_and_write_file('2.txt', 'e.txt', keywords="港澳频道, TVB, 澳门, 龙华, 民视, 中视, 华视, AXN, AMC, MOMO, 采昌, 耀才, 靖天, 镜新闻, 靖洋, 莲花, 年代, 爱尔达, 好莱坞, 华丽, 非凡, 公视, 寰宇, 无线, EVEN, MoMo, 爆谷, 面包, momo, 唐人, \
中华小, 三立, CNA, FOX, RTHK, Movie, 八大, 中天, 中视, 东森, 凤凰, 天映, 美亚, 环球, 翡翠, 亚洲, 大爱, 大愛, 明珠, 半岛, AMC, 龙祥, 台视, 1905, 纬来, 神话, 经典都市, 视界, 番薯, 私人, 酒店, TVB, 凤凰, 半岛, 星光视界, \
番薯, 大愛, 新加坡, 星河, 明珠, 环球, 翡翠台, 大立, 好消息, 美国中文, 神州, 天良, 18台, HBO家庭, HBO, HISTORY, HOY国际财经, 八度, 博斯, 达文西, 迪士尼, \
动物星球, 港石金曲, 红牛, 互动英语, 华纳影视, 华语剧台, ELTV, 欢喜台, 美食星球, 千禧, 全球财经, 探案, 探索, 小尼克, 幸福空间, 影剧, 粤语片台, 智林, 猪哥亮")
check_and_write_file('2.txt', 'f0.txt', keywords="湖北湖南, 湖北, 湖南")
check_and_write_file('2.txt', 'f.txt', keywords="省市频道, 湖北, 武汉, 松滋, 十堰, 咸宁, 远安, 崇阳, 黄石, 荆州, 当阳, 恩施, 五峰, 来凤, 枝江, 黄冈, 随州, 荆门, 秭归, 宜昌, 长阳, 大悟, 孝感, 鄂州, 垄上, 宜都")
check_and_write_file('2.txt', 'f1.txt', keywords="省市频道, 湖南, 长沙, 常德, 郴州, 垂钓, 金鹰纪实, 衡阳, 怀化, 茶, 吉首, 娄底, 邵阳, 湘潭, 益阳, 永州, 岳阳, 张家界, 株洲, 城步, 崇左, 洪雅, 涟水, 灵石, 隆回, 罗城, 溆浦, 邵阳")
#check_and_write_file('2.txt', 'h0.txt', keywords="河南河北, 河南, 河北")
check_and_write_file('2.txt', 'h.txt', keywords="河南河北, 河南, 焦作, 封丘, 郏县, 获嘉, 巩义, 邓州, 宝丰, 开封, 卢氏, 洛阳, 孟津, 安阳, 渑池, 南阳, 林州, 滑县, 栾川, 襄城, 宜阳, 长垣, 内黄, 鹿邑, 新安, 平顶山, 淇县, \
杞县, 汝阳, 三门峡, 卫辉, 淅川, 新密, 新乡, 信阳, 新郑, 延津, 叶县, 义马, 永城, 禹州, 原阳, 镇平, 郑州, 周口, 泌阳, 郸城, 登封, 扶沟, 潢川, 辉县, 济源, 浚县, 临颍, 灵宝, 鲁山, 罗山, 沁阳, 汝州, 唐河, 尉氏")
check_and_write_file('2.txt', 'h1.txt', keywords="河南河北, 河北, 石家庄, 承德, 丰宁, 临漳, 井陉, 井陉矿区, 保定, 元氏, 兴隆, 内丘, 南宫, 吴桥, 唐县, 唐山, 安平, 定州, 大厂, 张家口, 徐水, 成安, 故城, 康保, 廊坊, 晋州, \
景县, 武安, 枣强, 柏乡, 涉县, 涞水, 涞源, 涿州, 深州, 深泽, 清河, 秦皇岛, 衡水, 遵化, 邢台, 邯郸, 邱县, 隆化, 雄县, 阜平, 高碑店, 高邑, 魏县, 黄骅, 饶阳, 赵县, 睛彩河北, 滦南, 玉田, 崇礼, 平泉, \
容城, 文安, 三河, 清河, 潞城, 迁安, 迁西, 清苑, 确山")
###############################################################################################################################################################################################################################
##############################################################对生成的文件进行合并
file_contents = []
file_paths = ["e.txt", "a0.txt", "a.txt", "a1.txt", "b0.txt", "b.txt", "c.txt", "c1.txt", "c2.txt", "d.txt", "f0.txt", "f.txt", "f1.txt", "g0.txt", "g.txt", "g1.txt", "h0.txt", "h.txt", "h1.txt"] # 替换为实际的文件路径列表
for file_path in file_paths:
if os.path.exists(file_path):
with open(file_path, 'r', encoding="utf-8") as file:
content = file.read()
file_contents.append(content)
else: # 如果文件不存在,则提示异常并打印提示信息
print(f"文件 {file_path} 不存在,跳过")
# 写入合并后的文件
with open("去重.txt", "w", encoding="utf-8") as output:
output.write('\n'.join(file_contents))
###############################################################################################################################################################################################################################
##############################################################对生成的文件进行网址及文本去重复,避免同一个频道出现在不同的类中
def remove_duplicates(input_file, output_file):
# 用于存储已经遇到的URL和包含genre的行
seen_urls = set()
seen_lines_with_genre = set()
# 用于存储最终输出的行
output_lines = []
# 打开输入文件并读取所有行
with open(input_file, 'r', encoding='utf-8') as f:
lines = f.readlines()
print("去重前的行数:", len(lines))
# 遍历每一行
line = line.strip()
# 如果行是分类标签行,则跳过
if ",#genre#" in line:
continue
# 检查行是否包含IP或域名
match = ip_or_domain_pattern.search(line)
if match:
# 提取匹配到的IP或域名
matched_text = match.group(1)
# 去除IP或域名后的剩余部分,只保留匹配到的IP或域名
ip_or_domain = matched_text.split('://')[-1].split('/')[0].split('::')[0]
# 将行添加到对应的IP或域名列表中
if ip_or_domain not in ip_or_domain_to_lines:
ip_or_domain_to_lines[ip_or_domain] = []
ip_or_domain_to_lines[ip_or_domain].append(line)
# 过滤掉小于5000字节的IP或域名段
filtered_ip_or_domain_to_lines = {ip_or_domain: lines for ip_or_domain, lines in ip_or_domain_to_lines.items()
if sum(len(line) for line in lines) >= 3000}
# 如果没有满足条件的IP或域名段,则不生成文件
if not filtered_ip_or_domain_to_lines:
print("没有满足条件的IP或域名段,不生成文件。")
return
# 合并所有满足条件的IP或域名的行到一个文件
with open(output_file_name, 'w', encoding='utf-8') as output_file:
for ip_or_domain, lines in filtered_ip_or_domain_to_lines.items():
# 写入IP或域名及其对应的行到输出文件
output_file.write(f"{ip_or_domain},#genre#\n")
for line in lines:
# 使用正则表达式查找URL和包含genre的行,默认最后一行
urls = re.findall(r'[https]?[http]?[P2p]?[mitv]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', line)
genre_line = re.search(r'\bgenre\b', line, re.IGNORECASE) is not None
# 如果找到URL并且该URL尚未被记录
if urls and urls[0] not in seen_urls:
seen_urls.add(urls[0])
output_lines.append(line)
# 如果找到包含genre的行,无论是否已被记录,都写入新文件
if genre_line:
output_lines.append(line)
# 将结果写入输出文件
with open(output_file, 'w', encoding='utf-8') as f:
f.writelines(output_lines)
print("去重后的行数:", len(output_lines))
# 使用方法
remove_duplicates('去重.txt', '分类.txt')
output_file.write(line + '\n')
output_file.write('\n') # 在每个小段后添加一个空行作为分隔
# 调用函数并传入文件路径和输出文件名
parse_file('2.txt', '收集.txt')
# 打开文档并读取所有行
with open('分类.txt', 'r', encoding="utf-8") as file:
lines = file.readlines()
# 使用列表来存储唯一的行的顺序
unique_lines = []
seen_lines = set()
# 遍历每一行,如果是新的就加入unique_lines
for line in lines:
if line not in seen_lines:
unique_lines.append(line)
seen_lines.add(line)
# 将唯一的行写入新的文档
with open('大杂烩.txt', 'w', encoding="utf-8") as file:
file.writelines(unique_lines)
################################################################################################任务结束,删除不必要的过程文件
files_to_remove = ['去重.txt', '分类.txt', '回收站.txt', "2.txt", "a0.txt", "a.txt", "a1.txt", "b0.txt", "b.txt", "c.txt", "c1.txt", "c2.txt", "d.txt", "e.txt", "f0.txt", "f.txt", "f1.txt", "g0.txt", "g.txt", "g1.txt", "h0.txt", "h.txt", "h1.txt", "i.txt", \
"i1.txt", "j.txt", "j1.txt", "k.txt", "l0.txt", "l.txt", "l1.txt", "m.txt", "m1.txt", \
"n0.txt","n.txt","n1.txt", "o1.txt", "o.txt", "p.txt", "汇总.txt"]
files_to_remove = ["2.txt", "汇总.txt"]
for file in files_to_remove:
if os.path.exists(file):