diff --git a/py/测绘站采集.py b/py/测绘站采集.py index 38c262f..a76a814 100644 --- a/py/测绘站采集.py +++ b/py/测绘站采集.py @@ -62,32 +62,19 @@ def get_random_header(): proxies = [] last_refresh_time = 0 -def scrape_proxies(url): - """从指定URL抓取代理列表""" +def scrape_proxies_89ip(url): + """从 89ip.cn 抓取代理列表""" try: response = requests.get(url) response.raise_for_status() - soup = BeautifulSoup(response.text, 'html.parser') - # 找到代理表格 - table = soup.find('table', {'id': 'proxylisttable'}) - if not table: - return [] + # 89ip.cn 返回的代理列表是纯文本,每行一个代理 + proxies = response.text.splitlines() + # 过滤掉空行和无效的代理 + valid_proxies = [proxy.strip() for proxy in proxies if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}$', proxy.strip())] - # 解析表格中的代理数据 - proxies = [] - for row in table.find_all('tr')[1:]: # 跳过表头 - cols = row.find_all('td') - if len(cols) < 8: - continue - - ip = cols[0].text.strip() - port = cols[1].text.strip() - protocol = 'https' if cols[6].text.strip() == 'yes' else 'http' - - proxies.append(f"{protocol}://{ip}:{port}") - - return proxies + # 默认所有代理为 HTTP 协议 + return [f"http://{proxy}" for proxy in valid_proxies] except Exception as e: print(f"抓取代理失败: {str(e)}") @@ -99,130 +86,7 @@ def get_proxies(): current_time = time.time() # 如果代理列表为空或超过刷新间隔,重新抓取代理 - if not proxies or current_time - last_refresh_time > PROXY_REFRESH_INTERVAL: - print("正在刷新代理列表...") - proxy_url = "https://free-proxy-list.net/" - proxies = scrape_proxies(proxy_url) - last_refresh_time = current_time - - if not proxies: - raise Exception("无法获取代理") - - return proxies - -def safe_request(url): - """带重试机制和代理的请求函数""" - for attempt in range(MAX_RETRIES): - try: - # 随机延迟防止被封 - time.sleep(random.uniform(*DELAY_RANGE)) - - # 获取代理列表 - proxy_list = get_proxies() - - # 随机选择一个代理 - proxy = random.choice(proxy_list) - print(f"使用代理: {proxy}") - - response = requests.get( - url, - headers=get_random_header(), - proxies={"http": proxy, "https": proxy}, - timeout=REQUEST_TIMEOUT - ) - - # 检查HTTP状态码 - if response.status_code == 429: - wait_time = 30 # 遇到反爬等待30秒 - print(f"遇到反爬机制,等待{wait_time}秒后重试") - time.sleep(wait_time) - continue - - response.raise_for_status() - return response.text - - except Exception as e: - print(f"请求失败(第{attempt+1}次重试): {str(e)}") - if attempt == MAX_RETRIES - 1: - raise - -def validate_video(url, mcast): - """验证视频流有效性""" - video_url = f"{url}/rtp/{mcast}" - print(f"正在验证: {video_url}") - - try: - # 设置超时参数 - cap = cv2.VideoCapture(video_url, cv2.CAP_FFMPEG) - cap.set(cv2.CAP_PROP_TIMEOUT, 5000) # 5秒超时 - - if cap.isOpened(): - width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - cap.release() - return width > 0 and height > 0 - return False - - except Exception as e: - print(f"视频验证异常: {str(e)}") - return False - -def main(): - # 获取需要处理的文件列表 - files = [f.split('.')[0] for f in os.listdir('rtp') if f.endswith('.txt')] - print(f"待处理频道列表: {files}") - - for filename in files: - province_isp = filename.split('_') - if len(province_isp) != 2: - continue - - province, isp = province_isp - print(f"\n正在处理: {province}{isp}") - - # 读取组播地址 - try: - with open(f'rtp/{filename}.txt', 'r', encoding='utf-8') as f: - mcast = f.readline().split('rtp://')[1].split()[0].strip() - except Exception as e: - print(f"文件读取失败: {str(e)}") - continue - - # 构造搜索请求 - search_txt = f'"udpxy" && country="CN" && region="{province}"' - encoded_query = base64.b64encode(search_txt.encode()).decode() - search_url = f'https://fofa.info/result?qbase64={encoded_query}' - - # 执行搜索 - try: - html = safe_request(search_url) - except Exception as e: - print(f"搜索失败: {str(e)}") - continue - - # 解析搜索结果 - soup = BeautifulSoup(html, 'html.parser') - pattern = re.compile(r"http://\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+") - found_urls = set(pattern.findall(html)) - print(f"找到{len(found_urls)}个有效地址") - - # 验证地址有效性 - valid_urls = [url for url in found_urls if validate_video(url, mcast)] - print(f"验证通过{len(valid_urls)}个有效地址") - - # 生成播放列表 - if valid_urls: - output_file = f'playlist/{province}{isp}.txt' - with open(f'rtp/{filename}.txt', 'r') as src, open(output_file, 'w') as dst: - original_content = src.read() - for url in valid_urls: - modified = original_content.replace('rtp://', f'{url}/rtp/') - dst.write(modified + '\n') - print(f"已生成播放列表: {output_file}") - -if __name__ == '__main__': - main() - + if not proxies or current_time - last_refresh_time > print('对playlist文件夹里面的所有txt文件进行去重处理') def remove_duplicates_keep_order(folder_path):