# Bloom-filter based novel-chapter crawler: fetches new chapters, converts them
# to speech with edge-tts, and pushes text + audio to a WeChat Work webhook.
#
# Start the RedisBloom server with:
#   docker run -p 6372:6379 -it -d --name redisbloom redislabs/rebloom


from bs4 import BeautifulSoup
# Usage example
import redis
from crawlab import save_item

import requests
import json
import io


import edge_tts
import requests
import json
import io
import asyncio


def run_tts(file_path: str, output: str, voice: str = 'zh-CN-YunyangNeural') -> None:
    """Read UTF-8 text from *file_path* and synthesize it to *output* via edge-tts.

    Best-effort helper: failures are reported on stdout instead of raising.
    """
    # Load the source text; bail out early if the file cannot be read.
    try:
        with open(file_path, 'r', encoding='utf-8') as fh:
            text = fh.read()
    except Exception as e:
        print(f"读取文件失败: {e}")
        return

    # Synthesize and persist the audio in one shot.
    try:
        asyncio.run(edge_tts.Communicate(text, voice).save(output))
    except Exception as e:
        print(f"处理文本或保存音频失败: {e}")


class WeChatBot:
    """Minimal WeChat Work (企业微信) webhook client.

    Supports sending JSON messages and uploading text/audio/video files as
    temporary media, then pushing them as "file" messages.
    """

    # Seconds before an HTTP request is abandoned.  The original code passed
    # no timeout, so a stalled webhook endpoint could hang the process forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.api_key}'
        self.headers = {'Content-Type': 'application/json'}
        self.upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={self.api_key}&type=file'

    def send_message(self, data):
        """POST a message payload (dict) to the webhook; return the parsed JSON reply."""
        return self._post_request(self.base_url, data)

    def upload_media(self, content, file_name, content_type):
        """Upload a file-like object / bytes as temporary media; return the JSON reply."""
        files = {'media': (file_name, content, content_type)}
        return self._post_request(self.upload_url, files=files, is_json=False)

    def upload_text_as_media(self, text_content, file_name):
        """Upload a text string as a UTF-8 encoded file attachment."""
        file_stream = io.BytesIO(text_content.encode('utf-8'))
        return self.upload_media(file_stream, file_name, 'application/octet-stream')

    def upload_audio_as_media(self, audio_file_path):
        """Upload an MP3 file from disk as media."""
        return self._upload_file(audio_file_path, 'audio/mpeg')

    def upload_video_as_media(self, video_file_path):
        """Upload an MP4 file from disk as media."""
        return self._upload_file(video_file_path, 'video/mp4')

    def send_media(self, response):
        """If *response* (an upload reply) carries a media_id, send it as a file
        message; otherwise return *response* unchanged so callers see the error."""
        if response.get("media_id"):
            data = {
                "msgtype": "file",
                "file": {"media_id": response["media_id"]}
            }
            return self.send_message(data)
        return response

    def _post_request(self, url, data=None, files=None, is_json=True):
        """POST either a JSON body or a multipart upload; on any request error
        (including timeout) return {"error": <message>} instead of raising."""
        try:
            if is_json:
                response = requests.post(url, headers=self.headers,
                                         data=json.dumps(data),
                                         timeout=self.REQUEST_TIMEOUT)
            else:
                response = requests.post(url, files=files,
                                         timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            print(f"请求时出错: {e}")
            return {"error": str(e)}

    def _upload_file(self, file_path, content_type):
        """Open *file_path* and upload it, using the basename as the file name."""
        with open(file_path, 'rb') as file:
            file_name = file_path.split('/')[-1]
            return self.upload_media(file, file_name, content_type)


def text_to_speech_and_send(file_path: str, output: str, api_key: str, voice: str = 'zh-CN-YunyangNeural'):
    """Convert the text in *file_path* to speech and push the audio via WeChat Work."""
    # Step 1: text -> audio file on disk.
    run_tts(file_path, output, voice)

    # Step 2: upload the audio through the webhook bot.
    bot = WeChatBot(api_key)
    audio_response = bot.upload_audio_as_media(output)
    print("上传音频文件的响应:", audio_response)

    # Step 3: only send a media message when the upload yielded a media_id.
    if 'media_id' not in audio_response:
        print("音频文件上传失败")
        return
    media_response = bot.send_media(audio_response)
    print("发送音频媒体消息的响应:", media_response)



class RedisBloomFilter:
    """Thin wrapper around the RedisBloom module's BF.* commands.

    Reserves the filter on first use so the requested error rate and capacity
    actually take effect.
    """

    def __init__(self, redis_host='116.198.253.144', redis_port=6372, redis_db=0,
                 bf_name='my_bloom_filter', error_rate=0.01, initial_capacity=1000):
        self.client = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
        self.bf_name = bf_name
        # Reserve explicitly when the key is absent; otherwise the first BF.ADD
        # would implicitly create the filter with the module's DEFAULT error
        # rate/capacity, silently ignoring error_rate/initial_capacity.
        if not self.bf_exists():
            self.client.execute_command('BF.RESERVE', bf_name, error_rate, initial_capacity)

    def bf_exists(self):
        """Return True if the bloom-filter key already exists.

        Bug fix: the previous probe ran BF.EXISTS inside try/except, but
        RedisBloom's BF.EXISTS on a missing key just returns 0 — it does not
        raise — so the method always reported the filter as existing and
        BF.RESERVE was never issued.  A plain EXISTS on the key is reliable.
        """
        return bool(self.client.exists(self.bf_name))

    def add(self, item):
        """Add a single element; returns 1 if newly added, 0 if (probably) present."""
        return self.client.execute_command('BF.ADD', self.bf_name, item)

    def check(self, item):
        """Return 1 if the element is (probably) in the filter, 0 if definitely not."""
        return self.client.execute_command('BF.EXISTS', self.bf_name, item)

    def multi_add(self, *items):
        """Add several elements at once; returns a list of 0/1 flags."""
        return self.client.execute_command('BF.MADD', self.bf_name, *items)

    def multi_check(self, *items):
        """Check several elements at once; returns a list of 0/1 flags."""
        return self.client.execute_command('BF.MEXISTS', self.bf_name, *items)



def fetch_and_extract_links(url):
    """Fetch *url* and return a list of {'text', 'url'} dicts for every
    chapter link (anchors whose href starts with "/read/").

    Returns [] when the page cannot be fetched.
    """
    try:
        # Use the session as a context manager so the connection pool is always
        # released (the original leaked it), and bound the request duration —
        # requests without a timeout can block forever.
        with requests.Session() as session:
            response = session.get(url, timeout=30)
            response.raise_for_status()  # Raises HTTPError for bad responses
    except requests.RequestException as e:
        print(f"Failed to fetch URL: {e}")
        return []

    # Parse the HTML content
    soup = BeautifulSoup(response.content, 'html.parser')

    # Base URL prepended to the site's relative chapter hrefs
    base_url = "http://www.xianqihaotianmi.org"
    extracted_data = []

    # Extract all links within list items in unordered lists
    for link in soup.select("ul li a"):
        href = link.get("href")
        if href and href.startswith("/read/"):
            extracted_data.append({
                "text": link.text.strip(),
                "url": base_url + href,  # absolute chapter URL
            })

    return extracted_data

def fetch_and_extract_content(url, selector, timeout=30):
    """Fetch *url* and return the space-joined text of all elements matching
    the CSS *selector*, or None when the request fails.

    :param url: page to download
    :param selector: CSS selector for the content elements
    :param timeout: seconds before the request is abandoned (new, defaulted —
        the original call could hang indefinitely without one)
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    }
    # Fetch the page (browser-like headers avoid trivial bot blocks)
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code == 200:
        # Parse the HTML content using BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')
        # Select elements based on the CSS selector and combine their text
        elements = soup.select(selector)
        extracted_text = ' '.join(element.get_text(strip=True) for element in elements)
        return extracted_text
    else:
        print(f"Failed to fetch URL with status code: {response.status_code}")
        return None


import time

class LinkProcessor:
    """Pipeline for chapter links: dedupe via the bloom filter, scrape the
    chapter text, synthesize audio, and deliver both through the WeChat bot."""

    def __init__(self, bf, bot):
        self.bf = bf    # bloom filter used for de-duplication by chapter title
        self.bot = bot  # WeChatBot used to deliver the text/audio files

    def process_links(self, links):
        """Process each link dict ({'text', 'url'}) in order."""
        for link in links:
            self.process_single_link(link)

    def process_single_link(self, link):
        """Scrape and deliver one chapter unless its title was already seen."""
        result = {
            "text": link["text"],
            "url": link["url"],
            "content": None
        }

        try:
            # Skip chapters whose title is already recorded in the bloom filter.
            if not self.bf.check(link["text"]):
                selector = 'div.panel-body.content-body.content-ext'
                text_content = fetch_and_extract_content(link["url"], selector)
                result["content"] = text_content
                self.send_content_as_file(text_content, link["text"])
                # Record the title only after the send attempt so a crash
                # above leaves the chapter eligible for a retry.
                self.bf.add(link["text"])
                save_item(result)

        except Exception as e:
            print(f"Error processing link {link['url']}: {str(e)}")

    def send_content_as_file(self, content, title):
        """Write *content* to <title>.txt, synthesize <title>.mp3 from it, and
        send both files through the WeChat webhook (best-effort)."""
        file_name = title.replace(" ", "_") + '.txt'

        try:
            # Bug fix: the text file was never written before, so run_tts()
            # could not read it and no audio was ever produced.  Persist the
            # scraped content first.
            with open(file_name, 'w', encoding='utf-8') as fh:
                fh.write(content)

            output_file = title.replace(" ", "_") + '.mp3'  # output audio file name
            # SECURITY NOTE(review): webhook key is hard-coded (also in main);
            # move it to configuration / an environment variable.
            api_key = '4e35a96d-134b-45fa-9c5a-f3d4f65670f6'

            text_to_speech_and_send(file_name, output_file, api_key)
            response = self.bot.upload_text_as_media(content, file_name)
            if 'media_id' in response:
                self.bot.send_media(response)
            time.sleep(3)  # Throttle sending messages
        except Exception as e:
            print(f"Failed to send media for {file_name}: {str(e)}")


if __name__ == "__main__":
    # Per-book bloom filter so re-runs skip chapters that were already delivered.
    bf = RedisBloomFilter(bf_name='xianqihaotianmi_liuhuangzi')
    # NOTE(review): webhook key is hard-coded — move to config/env.
    bot = WeChatBot('4e35a96d-134b-45fa-9c5a-f3d4f65670f6')
    url = 'http://www.xianqihaotianmi.org/book/107684.html'
    links_data = fetch_and_extract_links(url)

    processor = LinkProcessor(bf, bot)
    processor.process_links(links_data[-7:])  # Process only the last seven links