Create iptv.py

This commit is contained in:
frxz751113
2024-08-13 16:51:56 +08:00
committed by GitHub
parent 830b0222cb
commit c9963a3d6f
+780
View File
@@ -0,0 +1,780 @@
#本程序只适用于酒店源的检测,请勿移植他用
import time
import concurrent.futures
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import requests
import re
import os
import threading
from queue import Queue
import queue
from datetime import datetime
import replace
import fileinput
from tqdm import tqdm
from pypinyin import lazy_pinyin
from opencc import OpenCC
import base64
import cv2
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from translate import Translator # 导入Translator类,用于文本翻译
# 扫源测绘空间地址
# 搜素关键词:"iptv/live/zh_cn.js" && country="CN" && region="Hunan" && city="changsha"
# 搜素关键词:"ZHGXTV" && country="CN" && region="Hunan" && city="changsha"
#"isShowLoginJs"智能KUTV管理
######################################################################################################################
######################################################################################################################
###########################################################ZHGX采集####################################################
######################################################################################################################
######################################################################################################################
urls = [
"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcmVnaW9uPSLlub%2Fopb8i",#广西
"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcmVnaW9uPSJndWFuZ2Rvbmci",#广东
"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcmVnaW9uPSJHdWFuZ2RvbmciICYmIGNpdHk9InNoYW50b3Ui",#广东汕头
"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcmVnaW9uPSJIZW5hbiI%3D",#河南#
"https://fofa.info/result?qbase64=IlpIR1hUViIgJiYgcmVnaW9uPSJoZWJlaSI%3D",#河北#
]
#定义网址替换规则
def modify_urls(url):
modified_urls = []
ip_start_index = url.find("//") + 2
ip_end_index = url.find(":", ip_start_index)
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_end_index]
port = url[ip_end_index:]
ip_end = "/ZHGXTV/Public/json/live_interface.txt"
for i in range(1, 256):
modified_ip = f"{ip_address[:-1]}{i}"
modified_url = f"{base_url}{modified_ip}{port}{ip_end}"
modified_urls.append(modified_url)
return modified_urls
#定义超时时间以及是否返回正确的状态码
def is_url_accessible(url):
try:
response = requests.get(url, timeout=1) #//////////////////
if response.status_code == 200:
return url
except requests.exceptions.RequestException:
pass
return None
results = []
for url in urls:
# 创建一个Chrome WebDriver实例
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=chrome_options)
# 使用WebDriver访问网页
driver.get(url) # 将网址替换为你要访问的网页地址
time.sleep(10)
# 获取网页内容
page_content = driver.page_source
# 关闭WebDriver
driver.quit()
# 查找所有符合指定格式的网址
pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如http://8.8.8.8:8888
urls_all = re.findall(pattern, page_content)
# urls = list(set(urls_all)) # 去重得到唯一的URL列表
urls = set(urls_all) # 去重得到唯一的URL列表
x_urls = []
for url in urls: # 对urls进行处理,ip第四位修改为1,并去重
url = url.strip()
ip_start_index = url.find("//") + 2
ip_end_index = url.find(":", ip_start_index)
ip_dot_start = url.find(".") + 1
ip_dot_second = url.find(".", ip_dot_start) + 1
ip_dot_three = url.find(".", ip_dot_second) + 1
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_dot_three]
port = url[ip_end_index:]
ip_end = "1"
modified_ip = f"{ip_address}{ip_end}"
x_url = f"{base_url}{modified_ip}{port}"
x_urls.append(x_url)
urls = set(x_urls) # 去重得到唯一的URL列表
valid_urls = []
# 多线程获取可用url
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = []
for url in urls:
url = url.strip()
modified_urls = modify_urls(url)
for modified_url in modified_urls:
futures.append(executor.submit(is_url_accessible, modified_url))
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
valid_urls.append(result)
for url in valid_urls:
print(url)
# 遍历网址列表,获取JSON文件并解析
for url in valid_urls:
try:
# 发送GET请求获取JSON文件,设置超时时间为0.5秒
json_url = f"{url}"
response = requests.get(json_url, timeout=1)################################
json_data = response.content.decode('utf-8')
try:
# 按行分割数据
lines = json_data.split('\n')
for line in lines:
if 'hls' in line and ('udp' not in line or 'rtp' not in line): #行中需包含m3u,但排除udp和trp
line = line.strip()
if line:
name, channel_url = line.split(',')
urls = channel_url.split('/', 3)
url_data = json_url.split('/', 3)
if len(urls) >= 4:
urld = (f"{urls[0]}//{url_data[2]}/{urls[3]}")
else:
urld = (f"{urls[0]}//{url_data[2]}")
print(f"{name},{urld}")
if name and urld:
name = name.replace("高清电影", "影迷电影")
name = name.replace("中央", "CCTV")
name = name.replace("高清", "")
name = name.replace("HD", "")
name = name.replace("标清", "")
name = name.replace("超高", "")
name = name.replace("频道", "")
name = name.replace("靓妆", "女性时尚")
name = name.replace("本港台", "TVB星河")
name = name.replace("汉3", "")
name = name.replace("汉4", "")
name = name.replace("汉5", "")
name = name.replace("汉6", "")
name = name.replace("CHC动", "")
name = name.replace("CHC家", "")
name = name.replace("CHC影", "")
name = name.replace("-", "")
name = name.replace(" ", "")
name = name.replace("PLUS", "+")
name = name.replace("", "+")
name = name.replace("(", "")
name = name.replace(")", "")
name = name.replace("L", "")
name = name.replace("新农村", "河南新农村")
name = name.replace("百姓调解", "河南百姓调解")
name = name.replace("法治", "河南法治")
name = name.replace("睛彩中原", "河南睛彩")
name = name.replace("军事", "河南军事")
name = name.replace("梨园", "河南梨园")
name = name.replace("相声小品", "河南相声小品")
name = name.replace("移动戏曲", "河南移动戏曲")
name = name.replace("都市生活", "河南都市生活")
name = name.replace("民生", "河南民生")
name = name.replace("CCTVNEWS", "CCTV13")
name = name.replace("cctv", "CCTV")
name = re.sub(r"CCTV(\d+)台", r"CCTV\1", name)
name = name.replace("CCTV1综合", "CCTV1")
name = name.replace("CCTV2财经", "CCTV2")
name = name.replace("CCTV3综艺", "CCTV3")
name = name.replace("CCTV4国际", "CCTV4")
name = name.replace("CCTV4中文国际", "CCTV4")
name = name.replace("CCTV4欧洲", "CCTV4")
name = name.replace("CCTV5体育", "CCTV5")
name = name.replace("CCTV5+体育", "CCTV5+")
name = name.replace("CCTV6电影", "CCTV6")
name = name.replace("CCTV7军事", "CCTV7")
name = name.replace("CCTV7军农", "CCTV7")
name = name.replace("CCTV7农业", "CCTV7")
name = name.replace("CCTV7国防军事", "CCTV7")
name = name.replace("CCTV8电视剧", "CCTV8")
name = name.replace("CCTV8纪录", "CCTV9")
name = name.replace("CCTV9记录", "CCTV9")
name = name.replace("CCTV9纪录", "CCTV9")
name = name.replace("CCTV10科教", "CCTV10")
name = name.replace("CCTV11戏曲", "CCTV11")
name = name.replace("CCTV12社会与法", "CCTV12")
name = name.replace("CCTV13新闻", "CCTV13")
name = name.replace("CCTV新闻", "CCTV13")
name = name.replace("CCTV14少儿", "CCTV14")
name = name.replace("央视14少儿", "CCTV14")
name = name.replace("CCTV少儿超", "CCTV14")
name = name.replace("CCTV15音乐", "CCTV15")
name = name.replace("CCTV音乐", "CCTV15")
name = name.replace("CCTV16奥林匹克", "CCTV16")
name = name.replace("SCTV5四川影视)", "SCTV5")
name = name.replace("CCTV17农业农村", "CCTV17")
name = name.replace("CCTV17军农", "CCTV17")
name = name.replace("CCTV17农业", "CCTV17")
name = name.replace("CCTV5+体育赛视", "CCTV5+")
name = name.replace("CCTV5+赛视", "CCTV5+")
name = name.replace("CCTV5+体育赛事", "CCTV5+")
name = name.replace("CCTV5+赛事", "CCTV5+")
name = name.replace("CCTV5+体育", "CCTV5+")
name = name.replace("CCTV5赛事", "CCTV5+")
name = name.replace("凤凰中文台", "凤凰中文")
name = name.replace("凤凰资讯台", "凤凰资讯")
name = name.replace("CCTV4K测试)", "CCTV4")
name = name.replace("CCTV164K", "CCTV16")
name = name.replace("上海东方卫视", "上海卫视")
name = name.replace("东方卫视", "上海卫视")
name = name.replace("内蒙卫视", "内蒙古卫视")
name = name.replace("福建东南卫视", "东南卫视")
name = name.replace("广东南方卫视", "南方卫视")
name = name.replace("湖南金鹰卡通", "金鹰卡通")
name = name.replace("炫动卡通", "哈哈炫动")
name = name.replace("卡酷卡通", "卡酷少儿")
name = name.replace("卡酷动画", "卡酷少儿")
name = name.replace("BRTVKAKU少儿", "卡酷少儿")
name = name.replace("优曼卡通", "优漫卡通")
name = name.replace("优曼卡通", "优漫卡通")
name = name.replace("嘉佳卡通", "佳嘉卡通")
name = name.replace("世界地理", "地理世界")
name = name.replace("CCTV世界地理", "地理世界")
name = name.replace("BTV北京卫视", "北京卫视")
name = name.replace("BTV冬奥纪实", "冬奥纪实")
name = name.replace("东奥纪实", "冬奥纪实")
name = name.replace("卫视台", "卫视")
name = name.replace("湖南电视台", "湖南卫视")
name = name.replace("少儿科教", "少儿")
name = name.replace("TV星河2", "星河")
name = name.replace("影视剧", "影视")
name = name.replace("电视剧", "影视")
name = name.replace("奥运匹克", "")
results.append(f"{name},{urld}")
except:
continue
except:
continue
channels = []
for result in results:
line = result.strip()
if result:
channel_name, channel_url = result.split(',')
channels.append((channel_name, channel_url))
with open("iptv.txt", 'w', encoding='utf-8') as file:
for result in results:
file.write(result + "\n")
print(result)
print("频道列表文件iptv.txt获取完成!")
for line in fileinput.input("iptv.txt", inplace=True): #打开文件,并对其进行关键词原地替换
line = line.replace("河南河南", "河南")
line = line.replace("河南河南", "河南")
line = line.replace("河南法制", "河南法治")
line = line.replace("国防河南军事", "")
line = line.replace("CCTV12法制", "CCTV12")
line = line.replace("CCTV15+音乐", "CCTV15")
line = line.replace("CCTV17农村农业", "CCTV17")
line = line.replace("(福建卫视)", "")
line = line.replace("公共,http://171.8", "河南公共,http://171.8")
line = line.replace("新闻,http://171.8", "河南新闻,http://171.8")
line = line.replace("影视,http://171.8", "河南电视剧,http://171.8")
line = line.replace("河南影视,http://171.13", "河南电视剧,http://171.13")
line = line.replace("广东大湾区卫视", "大湾区卫视")
line = line.replace("吉林延边卫视", "延边卫视")
line = line.replace("国防河南军事", "国防军事")
line = line.replace("都市生活", "都市")
line = line.replace("都市生活6", "都市")
print(line, end="") #设置end="",避免输出多余的换行符
######################################################################################################################
######################################################################################################################
######################################################################################################################
######################################################################################################################
######################################################################################################################
#定义智慧桌面采集地址
urls = [
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgcG9ydD0iMTExMSI%3D", # 1111
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgcG9ydD0iODA4Ig%3D%3D", #808
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyI%3D", #随机
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rKz5YyXIg%3D%3D", #河北
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5bm%2F5LicIg%3D%3D", #广东
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rKz5Y2XIg%3D%3D", # 河南
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgcmVnaW9uPSJIdWJlaSIg",#湖北
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgcG9ydD0iODE4MSIgJiYgY2l0eT0iR3VpZ2FuZyI%3D", #贵港8181
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY2l0eT0ieXVsaW4i",#玉林
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5bm%2F6KW%2FIg%3D%3D", #广西 壮族iptv
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iSGViZWki", # Hebei (河北)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iYmVpamluZyI%3D", # Beijing (北京)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iZ3Vhbmdkb25nIg%3D%3D", # Guangdong (广东)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0ic2hhbmdoYWki", # Shanghai (上海)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0idGlhbmppbiI%3D", # Tianjin (天津)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iY2hvbmdxaW5nIg%3D%3D", # Chongqing (重庆)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0ic2hhbnhpIg%3D%3D", # Shanxi (山西)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iU2hhYW54aSI%3D", # Shaanxi (陕西)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0ibGlhb25pbmci", # Liaoning (辽宁)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iamlhbmdzdSI%3D", # Jiangsu (江苏)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iemhlamlhbmci", # Zhejiang (浙江)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5a6J5b69Ig%3D%3D", # Anhui (安徽)
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0iRnVqaWFuIg%3D%3D", # 福建
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rGf6KW%2FIg%3D%3D", # 江西
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5bGx5LicIg%3D%3D", # 山东
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rKz5Y2XIg%3D%3D", # 河南
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rmW5YyXIg%3D%3D", # 湖北
"https://fofa.info/result?qbase64=ImlwdHYvbGl2ZS96aF9jbi5qcyIgJiYgY291bnRyeT0iQ04iICYmIHJlZ2lvbj0i5rmW5Y2XIg%3D%3D" # 湖南
]
def modify_urls(url):
modified_urls = []
ip_start_index = url.find("//") + 2
ip_end_index = url.find(":", ip_start_index)
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_end_index]
port = url[ip_end_index:]
ip_end = "/iptv/live/1000.json?key=txiptv"
for i in range(1, 256):
modified_ip = f"{ip_address[:-1]}{i}"
modified_url = f"{base_url}{modified_ip}{port}{ip_end}"
modified_urls.append(modified_url)
return modified_urls
def is_url_accessible(url):
try:
response = requests.get(url, timeout=1) #//////////////////
if response.status_code == 200:
return url
except requests.exceptions.RequestException:
pass
return None
results = []
for url in urls:
# 创建一个Chrome WebDriver实例
chrome_options = Options()
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=chrome_options)
# 使用WebDriver访问网页
driver.get(url) # 将网址替换为你要访问的网页地址
time.sleep(10)
# 获取网页内容
page_content = driver.page_source
# 关闭WebDriver
driver.quit()
# 查找所有符合指定格式的网址
pattern = r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+" # 设置匹配的格式,如http://8.8.8.8:8888
urls_all = re.findall(pattern, page_content)
# urls = list(set(urls_all)) # 去重得到唯一的URL列表
urls = set(urls_all) # 去重得到唯一的URL列表
x_urls = []
for url in urls: # 对urls进行处理,ip第四位修改为1,并去重
url = url.strip()
ip_start_index = url.find("//") + 2
ip_end_index = url.find(":", ip_start_index)
ip_dot_start = url.find(".") + 1
ip_dot_second = url.find(".", ip_dot_start) + 1
ip_dot_three = url.find(".", ip_dot_second) + 1
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_dot_three]
port = url[ip_end_index:]
ip_end = "1"
modified_ip = f"{ip_address}{ip_end}"
x_url = f"{base_url}{modified_ip}{port}"
x_urls.append(x_url)
urls = set(x_urls) # 去重得到唯一的URL列表
valid_urls = []
# 多线程获取可用url
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
futures = []
for url in urls:
url = url.strip()
modified_urls = modify_urls(url)
for modified_url in modified_urls:
futures.append(executor.submit(is_url_accessible, modified_url))
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
valid_urls.append(result)
for url in valid_urls:
print(url)
# 遍历网址列表,获取JSON文件并解析
for url in valid_urls:
try:
# 发送GET请求获取JSON文件,设置超时时间为0.5秒
ip_start_index = url.find("//") + 2
ip_dot_start = url.find(".") + 1
ip_index_second = url.find("/", ip_dot_start)
base_url = url[:ip_start_index] # http:// or https://
ip_address = url[ip_start_index:ip_index_second]
url_x = f"{base_url}{ip_address}"
json_url = f"{url}"
response = requests.get(json_url, timeout=1) #///////////////
json_data = response.json()
try:
# 解析JSON文件,获取name和url字段
for item in json_data['data']:
if isinstance(item, dict):
name = item.get('name')
urlx = item.get('url')
if 'udp' in urlx or 'rtp' in urlx:
continue # 跳过包含'udp'或'rtp'的行
if 'http' in urlx:
urld = f"{urlx}"
else:
urld = f"{url_x}{urlx}"
if name and urld:
name = name.replace("高清电影", "影迷电影")
name = name.replace("中央", "CCTV")
name = name.replace("高清", "")
name = name.replace("HD", "")
name = name.replace("标清", "")
name = name.replace("超高", "")
name = name.replace("频道", "")
name = name.replace("汉1", "")
name = name.replace("汉2", "")
name = name.replace("汉3", "")
name = name.replace("汉4", "")
name = name.replace("汉5", "")
name = name.replace("汉6", "")
name = name.replace("CHC动", "")
name = name.replace("CHC家", "")
name = name.replace("CHC影", "")
name = name.replace("-", "")
name = name.replace(" ", "")
name = name.replace("PLUS", "+")
name = name.replace("", "+")
name = name.replace("(", "")
name = name.replace(")", "")
name = name.replace("CHC", "")
name = name.replace("L", "")
name = name.replace("002", "AA酒店MV")
name = name.replace("测试002", "凤凰卫视")
name = name.replace("测试003", "凤凰卫视")
name = name.replace("测试004", "私人影院")
name = name.replace("测试005", "私人影院")
name = name.replace("测试006", "东森洋片")
name = name.replace("测试007", "东森电影")
name = name.replace("测试008", "AXN电影")
name = name.replace("测试009", "好莱坞电影")
name = name.replace("测试010", "龙祥电影")
name = name.replace("莲花台", "凤凰香港")
name = name.replace("测试014", "凤凰资讯")
name = name.replace("测试015", "未知影视")
name = name.replace("TV星河", "")
name = name.replace("305", "酒店影视1")
name = name.replace("306", "酒店影视2")
name = name.replace("307", "酒店影视3")
name = name.replace("CMIPTV", "")
name = name.replace("cctv", "CCTV")
name = re.sub(r"CCTV(\d+)台", r"CCTV\1", name)
name = name.replace("CCTV1综合", "CCTV1")
name = name.replace("CCTV2财经", "CCTV2")
name = name.replace("CCTV3综艺", "CCTV3")
name = name.replace("CCTV4国际", "CCTV4")
name = name.replace("CCTV4中文国际", "CCTV4")
name = name.replace("CCTV4欧洲", "CCTV4")
name = name.replace("CCTV5体育", "CCTV5")
name = name.replace("CCTV5+体育", "CCTV5+")
name = name.replace("CCTV6电影", "CCTV6")
name = name.replace("CCTV7军事", "CCTV7")
name = name.replace("CCTV7军农", "CCTV7")
name = name.replace("CCTV7农业", "CCTV7")
name = name.replace("CCTV7国防军事", "CCTV7")
name = name.replace("CCTV8电视剧", "CCTV8")
name = name.replace("CCTV8纪录", "CCTV9")
name = name.replace("CCTV9记录", "CCTV9")
name = name.replace("CCTV9纪录", "CCTV9")
name = name.replace("CCTV10科教", "CCTV10")
name = name.replace("CCTV11戏曲", "CCTV11")
name = name.replace("CCTV12社会与法", "CCTV12")
name = name.replace("CCTV13新闻", "CCTV13")
name = name.replace("CCTV新闻", "CCTV13")
name = name.replace("CCTV14少儿", "CCTV14")
name = name.replace("央视14少儿", "CCTV14")
name = name.replace("CCTV少儿超", "CCTV14")
name = name.replace("CCTV15音乐", "CCTV15")
name = name.replace("CCTV音乐", "CCTV15")
name = name.replace("CCTV16奥林匹克", "CCTV16")
name = name.replace("CCTV17农业农村", "CCTV17")
name = name.replace("CCTV17军农", "CCTV17")
name = name.replace("CCTV17农业", "CCTV17")
name = name.replace("CCTV5+体育赛视", "CCTV5+")
name = name.replace("CCTV5+赛视", "CCTV5+")
name = name.replace("CCTV5+体育赛事", "CCTV5+")
name = name.replace("CCTV5+赛事", "CCTV5+")
name = name.replace("CCTV5+体育", "CCTV5+")
name = name.replace("CCTV5赛事", "CCTV5+")
name = name.replace("凤凰中文台", "凤凰中文")
name = name.replace("凤凰资讯台", "凤凰资讯")
name = name.replace("CCTV4K测试)", "CCTV4")
name = name.replace("CCTV164K", "CCTV16")
name = name.replace("上海东方卫视", "上海卫视")
name = name.replace("东方卫视", "上海卫视")
name = name.replace("内蒙卫视", "内蒙古卫视")
name = name.replace("福建东南卫视", "东南卫视")
name = name.replace("广东南方卫视", "南方卫视")
name = name.replace("湖南金鹰卡通", "金鹰卡通")
name = name.replace("炫动卡通", "哈哈炫动")
name = name.replace("卡酷卡通", "卡酷少儿")
name = name.replace("卡酷动画", "卡酷少儿")
name = name.replace("BRTVKAKU少儿", "卡酷少儿")
name = name.replace("优曼卡通", "优漫卡通")
name = name.replace("优曼卡通", "优漫卡通")
name = name.replace("嘉佳卡通", "佳嘉卡通")
name = name.replace("世界地理", "地理世界")
name = name.replace("CCTV世界地理", "地理世界")
name = name.replace("BTV北京卫视", "北京卫视")
name = name.replace("BTV冬奥纪实", "冬奥纪实")
name = name.replace("东奥纪实", "冬奥纪实")
name = name.replace("卫视台", "卫视")
name = name.replace("湖南电视台", "湖南卫视")
name = name.replace("少儿科教", "少儿")
name = name.replace("TV星河2", "星河")
name = name.replace("影视剧", "影视")
name = name.replace("电视剧", "影视")
name = name.replace("奥运匹克", "")
name = name.replace("TVBTVB", "TVB")
name = name.replace("星空卫视", "动物杂技")
results.append(f"{name},{urld}")
except:
continue
except:
continue
channels = []
for result in results:
line = result.strip()
if result:
channel_name, channel_url = result.split(',')
channels.append((channel_name, channel_url))
with open("iptv.txt", 'a', encoding='utf-8') as file: #打开文本以追加的形式写入行到ZHGX文件
for result in results:
file.write(result + "\n")
print(result)
print("频道列表文件iptv.txt追加写入成功!")
######################################################################################################################
######################################################################################################################
######################################################################################################################
######################################################################################################################
######################################################################################################################
###################################################去除列表中的组播地址,酒店源验证整理
def filter_lines(input_file, output_file):
with open(input_file, 'r', encoding='utf-8') as file:
lines = file.readlines()
filtered_lines = []
for line in lines:
if ('hls' in line and 'm3u' in line) or ('tsfile' in line and 'm3u' in line): #行中包含m3u的同时还要包含hls或者tsfile
if 'udp' not in line and 'rtp' not in line: # and 'CCTV' not in line and '卫视' not in line 排除组播地址
filtered_lines.append(line)
with open(output_file, 'w', encoding='utf-8') as output_file:
output_file.writelines(filtered_lines)
filter_lines("iptv.txt", "iptv.txt")
###################################################打开文件,并对其进行行内关键词原地替换再次规范频道名,若无异类频道名,此段代码可删
for line in fileinput.input("iptv.txt", inplace=True): #
line = line.replace("CHC电影", "影迷电影")
line = line.replace("湖北公共新闻", "湖北公共")
line = line.replace("CHC电影", "影迷电影")
line = line.replace("CHC电影", "影迷电影")
line = line.replace("河南影视", "河南电视剧")
line = line.replace("公共新闻", "公共")
line = line.replace("广东河南", "广东")
line = line.replace("公共,http://58.51", "湖北公共新闻,http://58.51")
line = line.replace("", "")
line = line.replace("", "")
line = line.replace("", "")
line = line.replace("影视文化", "影视")
line = line.replace("武汉影视", "武汉电视剧")
line = line.replace("公共新闻,http://58.", "湖北公共新闻,http://58.")
line = line.replace("湖北公共,", "湖北公共新闻,")
line = line.replace("湖北湖北", "湖北")
line = line.replace("深圳影视", "深圳电视剧")
print(line, end="") #设置end="",避免输出多余的换行符
###############################################################################文本排序
# 打开原始文件读取内容,并写入新文件
with open('iptv.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 定义一个函数,用于提取每行的第一个数字
def extract_first_number(line):
match = re.search(r'\d+', line)
return int(match.group()) if match else float('inf')
# 对列表中的行进行排序
# 按照第一个数字的大小排列,如果不存在数字则按中文拼音排序
sorted_lines = sorted(lines, key=lambda x: (not 'CCTV' in x, extract_first_number(x) if 'CCTV' in x else lazy_pinyin(x.strip())))
# 将排序后的行写入新的utf-8编码的文本文件,文件名基于原文件名
output_file_path = "sorted_" + os.path.basename(file_path)
# 写入新文件
with open('酒店源.txt', "w", encoding="utf-8") as file:
for line in sorted_lines:
file.write(line)
print(f"文件已排序并保存为: {output_file_path}")
#
##########################################################################################简体转繁体
# 创建一个OpenCC对象,指定转换的规则为繁体字转简体字
converter = OpenCC('t2s.json')#繁转简
#converter = OpenCC('s2t.json')#简转繁
# 打开txt文件
with open('酒店源.txt', 'r', encoding='utf-8') as file:
traditional_text = file.read()
# 进行繁体字转简体字的转换
simplified_text = converter.convert(traditional_text)
# 将转换后的简体字写入txt文件
with open('酒店源.txt', 'w', encoding='utf-8') as file:
file.write(simplified_text)
#
########################################################################定义关键词分割规则,分类提取
def check_and_write_file(input_file, output_file, keywords):
# 使用 split(', ') 而不是 split(',') 来分割关键词
keywords_list = keywords.split(', ')
first_keyword = keywords_list[0] # 获取第一个关键词作为头部信息
pattern = '|'.join(re.escape(keyword) for keyword in keywords_list)
extracted_lines = False
with open(input_file, 'r', encoding='utf-8') as file:
lines = file.readlines()
with open(output_file, 'w', encoding='utf-8') as out_file:
out_file.write(f'{first_keyword},#genre#\n') # 使用第一个关键词作为头部信息
for line in lines:
if 'genre' not in line and 'epg' not in line:
if re.search(pattern, line):
out_file.write(line)
extracted_lines = True
# 如果没有提取到任何关键词,则不保留输出文件
if not extracted_lines:
os.remove(output_file) # 删除空的输出文件
print(f"未提取到关键词,{output_file} 已被删除。")
else:
print(f"文件已提取关键词并保存为: {output_file}")
# 按类别提取关键词并写入文件
check_and_write_file('酒店源.txt', 'a0.txt', keywords="央视频道, 8K, 4K, 4k")
check_and_write_file('酒店源.txt', 'a.txt', keywords="央视频道, CCTV, 8K, 4K, 爱上4K, 纯享, 风云剧场, 怀旧剧场, 影迷, 高清电影, 动作电影, 每日影院, 全球大片, 第一剧场, 家庭影院, 影迷电影, 星光, 华语, 美国大片, 峨眉")
check_and_write_file('酒店源.txt', 'a1.txt', keywords="央视频道, 风云音乐, 女性时尚, 地理世界, 音乐现场")
check_and_write_file('酒店源.txt', 'b.txt', keywords="卫视频道, 卫视, 凤凰, 星空")
check_and_write_file('酒店源.txt', 'c.txt', keywords="影视频道, 爱情喜剧, 爱喜喜剧, 风云剧场, 怀旧剧场, 影迷, 高清电影, 动作电影, 每日影院, 全球大片, 第一剧场, 家庭影院, 影迷电影, 星光, 华语, 美国大片, 峨眉, \
电影, 惊嫊悬疑, 东北热剧, 无名, 都市剧场, iHOT, 剧场, 欢笑剧场, 重温经典, 明星大片, 中国功夫, 军旅, 东北热剧, 中国功夫, 军旅剧场, 古装剧场, \
家庭剧场, 惊悚悬疑, 欢乐剧场, 潮妈辣婆, 爱情喜剧, 精品大剧, 超级影视, 超级电影, 黑莓动画, 黑莓电影, 海外剧场, 精彩影视, 无名影视, 潮婆辣妈, 超级剧, 热播精选")
check_and_write_file('酒店源.txt', 'c1.txt', keywords="影视频道, 求索动物, 求索, 求索科学, 求索记录, 爱谍战, 爱动漫, 爱科幻, 爱青春, 爱自然, 爱科学, 爱浪漫, 爱历史, 爱旅行, 爱奇谈, 爱怀旧, 爱赛车, 爱都市, 爱体育, 爱经典, \
爱玩具, 爱喜剧, 爱悬疑, 爱幼教, 爱院线")
check_and_write_file('酒店源.txt', 'c2.txt', keywords="影视频道, 军事评论, 农业致富, 哒啵赛事, 怡伴健康, 武博世界, 超级综艺, 哒啵, HOT, 炫舞未来, 精品体育, 精品萌宠, 精品记录, 超级体育, 金牌, 武术世界, 精品纪录")
check_and_write_file('酒店源.txt', 'd.txt', keywords="少儿频道, 少儿, 卡通, 动漫, 宝贝, 哈哈")
check_and_write_file('酒店源.txt', 'e.txt', keywords="港澳频道, TVB, 珠江台, 澳门, 龙华, 广场舞, 动物杂技, 民视, 中视, 华视, AXN, MOMO, 采昌, 耀才, 靖天, 镜新闻, 靖洋, 莲花, 年代, 爱尔达, 好莱坞, 华丽, 非凡, 公视, \
寰宇, 无线, EVEN, MoMo, 爆谷, 面包, momo, 唐人, 中华小, 三立, CNA, FOX, RTHK, Movie, 八大, 中天, 中视, 东森, 凤凰, 天映, 美亚, 环球, 翡翠, 亚洲, 大爱, 大愛, 明珠, 半岛, AMC, 龙祥, 台视, 1905, 纬来, 神话, 经典都市, 视界, \
番薯, 私人, 酒店, TVB, 凤凰, 半岛, 星光视界, 大愛, 新加坡, 星河, 明珠, 环球, 翡翠台")
check_and_write_file('酒店源.txt', 'f.txt', keywords="省市频道, 湖北, 武汉, 河北, 广东, 河南, 陕西, 四川, 湖南, 广西, 石家庄, 南宁, 汕头, 揭阳, 普宁, 福建, 辽宁")
check_and_write_file('酒店源.txt', 'o1.txt', keywords="其他频道, 新闻, 综合, 文艺, 电视, 公共, 科教, 教育, 民生, 轮播, 套, 法制, 文化, 经济, 生活")
check_and_write_file('酒店源.txt', 'o.txt', keywords="其他频道, , ")
#
#对生成的文件进行合并
file_contents = []
file_paths = ["e.txt", "a0.txt", "a.txt", "a1.txt", "b.txt", "c.txt", "c1.txt", "c2.txt", "d.txt", "f.txt", "o1.txt", "o.txt"] # 替换为实际的文件路径列表
for file_path in file_paths:
if os.path.exists(file_path):
with open(file_path, 'r', encoding="utf-8") as file:
content = file.read()
file_contents.append(content)
else: # 如果文件不存在,则提示异常并打印提示信息
print(f"文件 {file_path} 不存在,跳过")
# 写入合并后的文件
with open("去重.txt", "w", encoding="utf-8") as output:
output.write('\n'.join(file_contents))
#
##################################################################### 打开文档并读取所有行 ,对提取后重复的频道去重
with open('去重.txt', 'r', encoding="utf-8") as file:
lines = file.readlines()
# 使用列表来存储唯一的行的顺序
unique_lines = []
seen_lines = set()
# 遍历每一行,如果是新的就加入unique_lines
for line in lines:
if line not in seen_lines:
unique_lines.append(line)
seen_lines.add(line)
# 将唯一的行写入新的文档
with open('酒店源.txt', 'w', encoding="utf-8") as file:
file.writelines(unique_lines)
#任务结束,删除不必要的过程文件
files_to_remove = ['去重.txt', "2.txt", "iptv.txt", "e.txt", "a0.txt", "a.txt", "a1.txt", "b.txt", "c.txt", "c1.txt", "c2.txt", "d.txt", "f.txt", "o1.txt", "o.txt", "检测结果.txt"]
for file in files_to_remove:
if os.path.exists(file):
os.remove(file)
else: # 如果文件不存在,则提示异常并打印提示信息
print(f"文件 {file} 不存在,跳过删除。")
print("任务运行完毕,酒店源频道列表可查看文件夹内txt文件!")
import cv2
import time
from tqdm import tqdm
# 初始化检测结果字典
detected_ips = {}
# 存储文件路径
file_path = "酒店源.txt"
output_file_path = "酒店优选.txt"
def get_ip_key(url):
"""从URL中提取IP地址,并构造一个唯一的键"""
# 找到'//'到第三个'.'之间的字符串
start = url.find('://') + 3 # '://'.length 是 3
end = start
dot_count = 0
while dot_count < 3:
end = url.find('.', end)
if end == -1: # 如果没有找到第三个'.',就结束
break
dot_count += 1
return url[start:end] if dot_count == 3 else None
# 打开输入文件和输出文件
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
# 获取总行数用于进度条
total_lines = len(lines)
# 写入通过检测的行到新文件
with open(output_file_path, 'w', encoding='utf-8') as output_file:
# 使用tqdm显示进度条
for i, line in tqdm(enumerate(lines), total=total_lines, desc="Processing", unit='line'):
# 检查是否包含 'genre'
if 'genre' in line:
output_file.write(line)
continue
# 分割频道名称和URL,并去除空白字符
parts = line.split(',', 1)
if len(parts) == 2:
channel_name, url = parts
channel_name = channel_name.strip()
url = url.strip()
# 构造IP键
ip_key = get_ip_key(url)
if ip_key and ip_key in detected_ips:
# 如果IP键已存在,根据之前的结果决定是否写入新文件
if detected_ips[ip_key]['status'] == 'ok':
output_file.write(line)
elif ip_key: # 新IP键,进行检测
# 进行检测
cap = cv2.VideoCapture(url)
start_time = time.time()
frame_count = 0
# 尝试捕获5秒内的帧
while frame_count < 100 and (time.time() - start_time) < 5:
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# 释放资源
cap.release()
# 根据捕获的帧数判断状态并记录结果
if frame_count >= 100: #5秒内超过100帧则写入
detected_ips[ip_key] = {'status': 'ok'}
output_file.write(line) # 写入检测通过的行
else:
detected_ips[ip_key] = {'status': 'fail'}
# 打印检测结果
for ip_key, result in detected_ips.items():
print(f"IP Key: {ip_key}, Status: {result['status']}")
print("检测完毕,继续组播IP查找")
print("检测完毕,继续组播IP查找")
print("检测完毕,下一步组播IP查找")
print("检测完毕,下一步组播IP查找")
print("\n\n\n\n\n\n")