# 企业微信发送文件 (WeCom / Enterprise WeChat: send messages and files through a group-robot webhook)
import io
import json
import os

import requests
class WeChatBot:
    """Small client for a WeCom (Enterprise WeChat) group-robot webhook.

    Wraps the two webhook endpoints built in ``__init__``: ``send`` for
    posting messages and ``upload_media`` (with ``type=file``) for uploading
    a file and obtaining a ``media_id`` that ``send_media`` can then post as
    a file message.

    Request failures are not raised to the caller: they are printed and
    reported as ``{"error": "<message>"}``.
    """

    def __init__(self, api_key, timeout=30):
        """Store the webhook key and derive the endpoint URLs from it.

        Args:
            api_key: Webhook key, interpolated into both endpoint URLs.
            timeout: Timeout in seconds passed to every ``requests.post``
                call so a stalled connection cannot block forever.
        """
        self.api_key = api_key
        self.timeout = timeout
        self.base_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={self.api_key}'
        self.headers = {'Content-Type': 'application/json'}
        self.upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={self.api_key}&type=file'

    def send_message(self, data):
        """POST ``data`` (a JSON-serialisable message dict) to the send endpoint.

        Returns:
            The decoded JSON reply, or ``{"error": ...}`` on failure.
        """
        return self._post_request(self.base_url, data)

    def upload_media(self, content, file_name, content_type):
        """Upload ``content`` as a multipart form field named ``media``.

        Args:
            content: Bytes or a binary file-like object to upload.
            file_name: File name reported in the multipart part.
            content_type: MIME type reported in the multipart part.

        Returns:
            The decoded JSON reply, or ``{"error": ...}`` on failure.
        """
        files = {'media': (file_name, content, content_type)}
        return self._post_request(self.upload_url, files=files, is_json=False)

    def upload_text_as_media(self, text_content, file_name):
        """Upload a string as a UTF-8 encoded file called ``file_name``."""
        file_stream = io.BytesIO(text_content.encode('utf-8'))
        return self.upload_media(file_stream, file_name, 'application/octet-stream')

    def upload_audio_as_media(self, audio_file_path):
        """Upload the file at ``audio_file_path`` with MIME type ``audio/mpeg``."""
        return self._upload_file(audio_file_path, 'audio/mpeg')

    def upload_video_as_media(self, video_file_path):
        """Upload the file at ``video_file_path`` with MIME type ``video/mp4``."""
        return self._upload_file(video_file_path, 'video/mp4')

    def send_media(self, response):
        """Send a previously uploaded file, given the upload reply.

        Args:
            response: Dict returned by one of the ``upload_*`` methods.

        Returns:
            The reply of the ``file`` message when ``response`` carries a
            truthy ``media_id``; otherwise ``response`` itself, unchanged,
            so an upload error is passed straight through to the caller.
        """
        media_id = response.get("media_id")
        if not media_id:
            return response
        return self.send_message({
            "msgtype": "file",
            "file": {"media_id": media_id},
        })

    def _post_request(self, url, data=None, files=None, is_json=True):
        """POST to ``url`` and return the decoded JSON reply.

        Args:
            url: Endpoint to call.
            data: Payload serialised with ``json.dumps`` when ``is_json``.
            files: ``requests``-style ``files`` mapping used when not
                ``is_json``.
            is_json: Selects between a JSON body and a multipart upload.

        Returns:
            The decoded JSON reply. On a transport error, a non-2xx status or
            a body that is not valid JSON, the error is printed and
            ``{"error": "<message>"}`` is returned instead.
        """
        # Serialise outside the try block so a non-serialisable payload still
        # surfaces as a programming error instead of a "request" error.
        payload = json.dumps(data) if is_json else None
        try:
            if is_json:
                response = requests.post(
                    url, headers=self.headers, data=payload, timeout=self.timeout
                )
            else:
                # No explicit headers here: requests must generate the
                # multipart Content-Type (including the boundary) itself.
                response = requests.post(url, files=files, timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException subclass.
            print(f"请求时出错: {e}")
            return {"error": str(e)}

    def _upload_file(self, file_path, content_type):
        """Upload the file at ``file_path`` under its base name.

        Raises:
            OSError: If the file cannot be opened.
        """
        # basename() honours the platform's path separators, unlike a manual
        # split on '/'.
        file_name = os.path.basename(file_path)
        with open(file_path, 'rb') as file:
            # The upload must happen while the handle is still open.
            return self.upload_media(file, file_name, content_type)
if __name__ == "__main__":
    # Demo run: one text message, then three upload-and-send round trips.
    # NOTE(review): the webhook key is hard-coded below; consider loading it
    # from configuration instead of committing it to source.
    bot = WeChatBot('4e35a57d-134b-45fa-9c5a-f3d4f55670f6')

    # 1. Plain text message.
    text_reply = bot.send_message(
        {"msgtype": "text", "text": {"content": "Hello, this is a test message"}}
    )
    print("发送文本消息的响应:", text_reply)

    # 2-4. Each entry: (upload callable, label for the upload reply,
    # label for the follow-up file-message reply). Order matters: text file,
    # then audio, then video.
    upload_steps = (
        (
            lambda: bot.upload_text_as_media(
                "This is the content of the file.", "example.txt"
            ),
            "上传文件的响应:",
            "发送媒体消息的响应:",
        ),
        (
            lambda: bot.upload_audio_as_media('output.mp3'),
            "上传音频文件的响应:",
            "发送音频媒体消息的响应:",
        ),
        (
            lambda: bot.upload_video_as_media('20240708_071114.mp4'),
            "上传视频文件的响应:",
            "发送视频媒体消息的响应:",
        ),
    )
    for run_upload, upload_label, sent_label in upload_steps:
        uploaded = run_upload()
        print(upload_label, uploaded)
        # Only a reply that carries a media_id can be forwarded as a message.
        if 'media_id' in uploaded:
            print(sent_label, bot.send_media(uploaded))