# 布隆过滤器 (Bloom filter): this script expects a RedisBloom server, for example one started with
# docker run -p 6372:6379 -it -d --name redisbloom redislabs/rebloom
from bs4 import BeautifulSoup
# 使用示例
import redis
from crawlab import save_item
import requests
import json
import io
import edge_tts
import requests
import json
import io
import asyncio
def run_tts(file_path: str, output: str, voice: str = 'zh-CN-YunyangNeural') -> bool:
    """Synthesize the text stored in *file_path* to an audio file with edge-tts.

    Args:
        file_path: UTF-8 text file whose entire content is read aloud.
        output: Destination path handed to ``edge_tts.Communicate.save``.
        voice: edge-tts voice name.

    Returns:
        True when the audio was saved; False when the input could not be read,
        was blank, or synthesis failed. Errors are printed rather than raised,
        so callers that ignore the return value keep working.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()
    except (OSError, UnicodeDecodeError) as e:
        print(f"读取文件失败: {e}")
        return False
    # Nothing to synthesize: skip the network round-trip entirely.
    if not text.strip():
        print(f"文本为空, 跳过语音合成: {file_path}")
        return False
    try:
        communicate = edge_tts.Communicate(text, voice)
        # asyncio.run starts a fresh event loop; it cannot be used from inside
        # an already-running loop.
        asyncio.run(communicate.save(output))
    except Exception as e:  # edge-tts / network failures have no single base class
        print(f"处理文本或保存音频失败: {e}")
        return False
    return True
class WeChatBot:
    """Minimal client for a WeCom (WeChat Work) group-robot webhook.

    Every request helper returns the decoded JSON reply, or ``{"error": ...}``
    when the HTTP request fails; callers look for ``media_id`` in the reply.
    """

    def __init__(self, api_key, timeout=30):
        """
        Args:
            api_key: Webhook key; embedded in both endpoint URLs.
            timeout: Seconds passed to ``requests.post`` so that a stalled
                connection cannot block the caller forever.
        """
        self.api_key = api_key
        self.timeout = timeout
        self.base_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.api_key}'
        self.headers = {'Content-Type': 'application/json'}
        self.upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={self.api_key}&type=file'

    def send_message(self, data):
        """POST *data* (a JSON-serialisable message dict) to the send endpoint."""
        return self._post_request(self.base_url, data)

    def upload_media(self, content, file_name, content_type):
        """Upload *content* (bytes or a binary file object) as multipart field ``media``."""
        files = {'media': (file_name, content, content_type)}
        return self._post_request(self.upload_url, files=files, is_json=False)

    def upload_text_as_media(self, text_content, file_name):
        """Upload *text_content*, encoded as UTF-8, under the name *file_name*."""
        file_stream = io.BytesIO(text_content.encode('utf-8'))
        return self.upload_media(file_stream, file_name, 'application/octet-stream')

    def upload_audio_as_media(self, audio_file_path):
        """Upload a local audio file, declared as ``audio/mpeg``."""
        return self._upload_file(audio_file_path, 'audio/mpeg')

    def upload_video_as_media(self, video_file_path):
        """Upload a local video file, declared as ``video/mp4``."""
        return self._upload_file(video_file_path, 'video/mp4')

    def send_media(self, response):
        """Send a ``file`` message for an upload reply.

        Returns the send reply, or *response* unchanged when it carries no
        truthy ``media_id`` (e.g. a failed upload).
        """
        if response.get("media_id"):
            data = {
                "msgtype": "file",
                "file": {"media_id": response["media_id"]}
            }
            return self.send_message(data)
        return response

    def _post_request(self, url, data=None, files=None, is_json=True):
        """POST either a JSON body (*data*) or multipart *files* and decode the JSON reply."""
        try:
            if is_json:
                response = requests.post(url, headers=self.headers, data=json.dumps(data),
                                         timeout=self.timeout)
            else:
                response = requests.post(url, files=files, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        # ValueError covers a non-JSON body (e.g. an HTML error page) on
        # requests versions whose JSONDecodeError is not a RequestException.
        except (requests.RequestException, ValueError) as e:
            print(f"请求时出错: {e}")
            return {"error": str(e)}

    def _upload_file(self, file_path, content_type):
        """Open *file_path* in binary mode and upload it under its base name."""
        import os

        with open(file_path, 'rb') as file:
            # basename handles the platform's separators, unlike split('/').
            file_name = os.path.basename(file_path)
            return self.upload_media(file, file_name, content_type)
def text_to_speech_and_send(file_path: str, output: str, api_key: str, voice: str = 'zh-CN-YunyangNeural'):
    """Convert a text file to speech and post the audio to a WeCom group robot.

    Args:
        file_path: UTF-8 text file to synthesize.
        output: Path the audio is written to before it is uploaded. Any
            existing file at this path is replaced.
        api_key: WeCom webhook key used to build a ``WeChatBot``.
        voice: edge-tts voice name.

    Returns:
        The reply of the final send request, or None when no audio was produced
        or the upload reply had no ``media_id``.
    """
    import os

    # Drop a leftover file from an earlier run so that a failed synthesis
    # cannot cause stale audio to be uploaded in its place.
    try:
        os.remove(output)
    except FileNotFoundError:
        pass

    # 文本转语音 (text to speech)
    run_tts(file_path, output, voice)
    if not os.path.isfile(output) or os.path.getsize(output) == 0:
        print(f"音频文件生成失败: {output}")
        return None

    # 通过企业微信发送音频 (send the audio through WeCom)
    bot = WeChatBot(api_key)
    audio_response = bot.upload_audio_as_media(output)
    print("上传音频文件的响应:", audio_response)
    if 'media_id' not in audio_response:
        print("音频文件上传失败")
        return None
    media_response = bot.send_media(audio_response)
    print("发送音频媒体消息的响应:", media_response)
    return media_response
class RedisBloomFilter:
    """Thin wrapper around the RedisBloom ``BF.*`` commands for one named filter.

    NOTE(review): the default host is a hard-coded public IP; pass
    ``redis_host`` explicitly when running anywhere else.
    """

    def __init__(self, redis_host='116.198.253.144', redis_port=6372, redis_db=0, bf_name='my_bloom_filter', error_rate=0.01, initial_capacity=1000):
        """Connect to Redis and reserve *bf_name* with the given parameters if the key is missing."""
        self.client = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
        self.bf_name = bf_name
        if not self.bf_exists():
            # If the Bloom filter does not exist yet, create it with the requested parameters.
            try:
                self.client.execute_command('BF.RESERVE', bf_name, error_rate, initial_capacity)
            except redis.exceptions.ResponseError:
                # Another client may have created the key between the existence
                # check and BF.RESERVE; that is fine. Any other failure is re-raised.
                if not self.bf_exists():
                    raise

    def bf_exists(self):
        """Return True if a key named ``bf_name`` exists in the selected Redis DB.

        BF.EXISTS is not usable for this: it replies 0 for a missing key
        instead of raising, which previously meant the filter was never
        reserved. The key's type is not verified here.
        """
        return self.client.exists(self.bf_name) > 0

    def add(self, item):
        """Add *item* to the filter; returns BF.ADD's reply."""
        return self.client.execute_command('BF.ADD', self.bf_name, item)

    def check(self, item):
        """Return BF.EXISTS's reply for *item* (truthy if it may be present)."""
        return self.client.execute_command('BF.EXISTS', self.bf_name, item)

    def multi_add(self, *items):
        """Add several items at once; returns BF.MADD's per-item reply ([] if no items)."""
        if not items:
            # BF.MADD needs at least one item; avoid a server-side arity error.
            return []
        return self.client.execute_command('BF.MADD', self.bf_name, *items)

    def multi_check(self, *items):
        """Check several items at once; returns BF.MEXISTS's per-item reply ([] if no items)."""
        if not items:
            # BF.MEXISTS needs at least one item; avoid a server-side arity error.
            return []
        return self.client.execute_command('BF.MEXISTS', self.bf_name, *items)
def fetch_and_extract_links(url, base_url="http://www.xianqihaotianmi.org", timeout=10):
    """Fetch a table-of-contents page and return its chapter links.

    Args:
        url: Page to download.
        base_url: Site root used to make the relative ``/read/...`` hrefs absolute.
        timeout: Seconds before the HTTP request is abandoned.

    Returns:
        A list of ``{"text": ..., "url": ...}`` dicts, in document order, for
        every ``ul li a`` whose href starts with ``/read/``; ``[]`` if the page
        could not be fetched.
    """
    from urllib.parse import urljoin

    # The context manager closes the session and releases its pooled connections.
    with requests.Session() as session:
        try:
            response = session.get(url, timeout=timeout)
            response.raise_for_status()  # Raises HTTPError for 4xx/5xx responses
        except requests.RequestException as e:
            print(f"Failed to fetch URL: {e}")
            return []

    # Parse the raw bytes so BeautifulSoup can detect the page's charset itself.
    soup = BeautifulSoup(response.content, 'html.parser')
    extracted_data = []
    for link in soup.select("ul li a"):
        href = link.get("href")
        if href and href.startswith("/read/"):
            extracted_data.append({
                "text": link.text.strip(),
                "url": urljoin(base_url, href),
            })
    return extracted_data
def fetch_and_extract_content(url, selector, timeout=10):
    """Download *url* and return the text of every element matching *selector*.

    Args:
        url: Page to download.
        selector: CSS selector passed to ``BeautifulSoup.select``.
        timeout: Seconds before the HTTP request is abandoned.

    Returns:
        The stripped text of each matched element joined with single spaces
        ("" if nothing matched), or None if the request raised or did not
        return HTTP 200.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException as e:
        print(f"Failed to fetch URL: {e}")
        return None
    if response.status_code != 200:
        print(f"Failed to fetch URL with status code: {response.status_code}")
        return None
    # Hand BeautifulSoup the raw bytes: ``response.text`` decodes with the
    # charset from the HTTP headers, and requests falls back to ISO-8859-1 for
    # text/* responses that omit it, which garbles Chinese pages.
    soup = BeautifulSoup(response.content, 'html.parser')
    return ' '.join(element.get_text(strip=True) for element in soup.select(selector))
import time
class LinkProcessor:
    """Deliver newly seen chapter links to WeCom and record them in a Bloom filter.

    Args:
        bf: Object with ``check(item)`` / ``add(item)`` (e.g. RedisBloomFilter);
            chapter titles are used as the keys.
        bot: WeChatBot-like object used for uploads; its ``api_key`` is reused
            for the audio message.
    """

    CONTENT_SELECTOR = 'div.panel-body.content-body.content-ext'
    SEND_INTERVAL_SECONDS = 3  # Throttle between delivered chapters

    def __init__(self, bf, bot):
        self.bf = bf
        self.bot = bot

    def process_links(self, links):
        """Process each ``{"text": ..., "url": ...}`` link dict in order."""
        for link in links:
            self.process_single_link(link)

    def process_single_link(self, link):
        """Fetch, deliver and record one chapter if its title has not been seen.

        The title is added to the filter only after the chapter text was
        fetched and its text file delivered, so failures are retried on the
        next run. ``save_item`` is called for every link; ``content`` stays
        None for chapters that were skipped.
        """
        result = {
            "text": link["text"],
            "url": link["url"],
            "content": None
        }
        try:
            if not self.bf.check(link["text"]):
                text_content = fetch_and_extract_content(link["url"], self.CONTENT_SELECTOR)
                result["content"] = text_content
                if text_content and self.send_content_as_file(text_content, link["text"]):
                    self.bf.add(link["text"])
            save_item(result)
        except Exception as e:
            print(f"Error processing link {link['url']}: {str(e)}")

    @staticmethod
    def _file_stem(title):
        """Turn a chapter title into a file-name stem.

        Whitespace becomes "_" (as before), and so do characters that are not
        allowed in Windows/POSIX file names. A title such as "上/下" therefore
        cannot point into another directory.
        """
        import re

        return re.sub(r'[\s\\/:*?"<>|]', '_', title)

    def send_content_as_file(self, content, title):
        """Send *content* to WeCom as a spoken MP3 and as a ``.txt`` file.

        The text is first written to ``<stem>.txt`` in the working directory,
        because the TTS step reads its input from disk. The audio goes to
        ``<stem>.mp3``. Audio failures are logged and do not prevent the text
        upload.

        Returns:
            True if the text file was uploaded and the send request reported no
            error, otherwise False.
        """
        stem = self._file_stem(title)
        file_name = stem + '.txt'
        output_file = stem + '.mp3'  # audio output file name
        if not content:
            print(f"No content to send for {file_name}")
            return False

        try:
            # run_tts reads its input from this path, so the text must be on disk first.
            with open(file_name, 'w', encoding='utf-8') as fh:
                fh.write(content)
            # Reuse the bot's key rather than keeping a second hard-coded copy.
            text_to_speech_and_send(file_name, output_file, self.bot.api_key)
        except Exception as e:
            print(f"Failed to send audio for {file_name}: {str(e)}")

        delivered = False
        try:
            response = self.bot.upload_text_as_media(content, file_name)
            if 'media_id' in response:
                send_response = self.bot.send_media(response)
                delivered = 'error' not in send_response
        except Exception as e:
            print(f"Failed to send media for {file_name}: {str(e)}")
        time.sleep(self.SEND_INTERVAL_SECONDS)  # Throttle sending messages
        return delivered
if __name__ == "__main__":
    # Crawl the book's chapter index and push new chapters to the WeCom group.
    bf = RedisBloomFilter(bf_name='xianqihaotianmi_liuhuangzi')
    # NOTE(review): the webhook key is committed in source; consider loading it
    # from configuration instead.
    bot = WeChatBot('4e35a96d-134b-45fa-9c5a-f3d4f65670f6')
    url = 'http://www.xianqihaotianmi.org/book/107684.html'
    links_data = fetch_and_extract_links(url)
    if not links_data:
        print(f"No chapter links found at {url}")
    processor = LinkProcessor(bf, bot)
    # Process only the last 7 links on the page (presumably the newest chapters).
    processor.process_links(links_data[-7:])