本文目录导读:

- 使用WebSocket实现实时推送
- 使用HTTP请求推送(RESTful API)
- 使用消息队列(RabbitMQ)
- 使用Redis发布/订阅
- 使用第三方推送服务(OneSignal示例)
- 创建简单的推送框架
- 选择建议
在Python中推送自定义消息有多种方式,具体取决于你的应用场景,以下是几种常见的实现方案:
使用WebSocket实现实时推送
安装依赖
pip install websockets asyncio
服务端代码
import asyncio
import websockets
import json
async def handler(websocket, path):
try:
async for message in websocket:
print(f"收到消息: {message}")
# 向客户端发送响应
await websocket.send(json.dumps({
"status": "success",
"data": "消息已接收"
}))
except websockets.exceptions.ConnectionClosed:
print("连接关闭")
async def main():
async with websockets.serve(handler, "localhost", 8765):
print("WebSocket服务运行在 ws://localhost:8765")
await asyncio.Future() # 永远运行
# 推送自定义消息
async def push_message(message):
async with websockets.connect("ws://localhost:8765") as websocket:
await websocket.send(json.dumps(message))
response = await websocket.recv()
return response
if __name__ == "__main__":
asyncio.run(main())
使用HTTP请求推送(RESTful API)
服务端(Flask)
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
# 存储订阅者
subscribers = []
@app.route('/subscribe', methods=['POST'])
def subscribe():
data = request.json
subscribers.append(data['url'])
return jsonify({"status": "success"})
@app.route('/push', methods=['POST'])
def push():
message = request.json
failed_subscribers = []
for url in subscribers:
try:
response = requests.post(url, json=message, timeout=5)
except:
failed_subscribers.append(url)
return jsonify({
"failed_count": len(failed_subscribers),
"failed_subscribers": failed_subscribers
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
客户端
import requests
import json
def push_custom_message():
message = {
"title": "系统通知",
"content": "这是一条自定义推送消息",
"type": "info",
"timestamp": "2024-01-01 12:00:00",
"priority": "high"
}
# 推送到WebSocket服务器
response = requests.post(
"http://localhost:5000/push",
json=message
)
print(f"推送结果: {response.json()}")
if __name__ == "__main__":
push_custom_message()
使用消息队列(RabbitMQ)
安装依赖
pip install pika
生产者(推送消息)
import pika
import json
def push_to_rabbitmq(message):
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='custom_messages', durable=True)
# 发送消息
channel.basic_publish(
exchange='',
routing_key='custom_messages',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # 持久化消息
)
)
print(f"消息已发送: {message}")
connection.close()
if __name__ == "__main__":
message = {
"event": "system_alert",
"severity": "warning",
"message": "系统资源使用率超过90%"
}
push_to_rabbitmq(message)
消费者(接收消息)
import pika
import json
def callback(ch, method, properties, body):
message = json.loads(body)
print(f"收到消息: {message}")
# 处理消息...
ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consumer():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='custom_messages', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='custom_messages',
on_message_callback=callback
)
print('等待消息...')
channel.start_consuming()
if __name__ == "__main__":
start_consumer()
使用Redis发布/订阅
安装依赖
pip install redis
发布者
import redis
import json
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
def push_via_redis(channel, message):
# 发布消息到指定频道
r.publish(channel, json.dumps(message))
print(f"消息已发布到频道 {channel}")
if __name__ == "__main__":
message = {
"type": "user_activity",
"user_id": 12345,
"action": "login",
"timestamp": "2024-01-01 12:00:00"
}
# 推送到多个频道
channels = ['system_alerts', 'user_activity']
for channel in channels:
push_via_redis(channel, message)
订阅者
import redis
import json
def listen_to_channel(channel):
r = redis.Redis(host='localhost', port=6379, db=0)
pubsub = r.pubsub()
pubsub.subscribe(channel)
print(f"监听频道: {channel}")
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
print(f"收到消息: {data}")
# 处理消息...
if __name__ == "__main__":
listen_to_channel('system_alerts')
使用第三方推送服务(OneSignal示例)
安装依赖
pip install onesignal-sdk
from onesignal_sdk.client import Client
from onesignal_sdk.error import OneSignalError
def push_via_onesignal():
# 初始化OneSignal客户端
client = Client(
app_id="your_app_id",
rest_api_key="your_rest_api_key"
)
# 创建通知内容
notification_body = {
"contents": {
"en": "这是一条自定义推送消息",
"zh": "这是一条自定义推送消息"
},
"headings": {
"en": "系统通知",
"zh": "系统通知"
},
"include_player_ids": ["player_id_1", "player_id_2"], # 指定用户
"data": {
"type": "custom_notification",
"action": "open_url",
"url": "https://example.com"
},
"ios_badgeType": "Increase",
"ios_badgeCount": 1
}
try:
response = client.send_notification(notification_body)
print(f"推送成功: {response.body}")
except OneSignalError as e:
print(f"推送失败: {e}")
if __name__ == "__main__":
push_via_onesignal()
创建简单的推送框架
import json
import threading
import queue
from typing import Callable, Dict, Any
from datetime import datetime
class CustomPushFramework:
def __init__(self):
self.channels = {} # 存储通道和订阅者
self.message_queue = queue.Queue()
self.running = False
def subscribe(self, channel: str, callback: Callable):
"""订阅消息通道"""
if channel not in self.channels:
self.channels[channel] = []
self.channels[channel].append(callback)
print(f"已订阅通道: {channel}")
def unsubscribe(self, channel: str, callback: Callable):
"""取消订阅"""
if channel in self.channels:
self.channels[channel].remove(callback)
def publish(self, channel: str, message: Any):
"""发布消息到通道"""
message_with_meta = {
"channel": channel,
"data": message,
"timestamp": datetime.now().isoformat(),
"message_id": id(message)
}
self.message_queue.put(message_with_meta)
print(f"消息已入队: {message_with_meta}")
def process_messages(self):
"""处理消息队列中的消息"""
while self.running:
try:
message = self.message_queue.get(timeout=1)
channel = message["channel"]
if channel in self.channels:
for callback in self.channels[channel]:
try:
callback(message)
except Exception as e:
print(f"消息处理错误: {e}")
except queue.Empty:
continue
def start(self):
"""启动推送框架"""
self.running = True
worker = threading.Thread(target=self.process_messages)
worker.daemon = True
worker.start()
print("推送框架已启动")
def stop(self):
"""停止推送框架"""
self.running = False
print("推送框架已停止")
# 使用示例
def main():
# 创建推送框架实例
push_framework = CustomPushFramework()
push_framework.start()
# 定义消息处理器
def email_handler(message):
print(f"[邮件通知] {message['data']}")
def sms_handler(message):
print(f"[短信通知] {message['data']}")
def webhook_handler(message):
print(f"[Webhook通知] {message['data']}")
# 订阅通道
push_framework.subscribe("notifications", email_handler)
push_framework.subscribe("alerts", sms_handler)
push_framework.subscribe("webhooks", webhook_handler)
# 推送自定义消息
push_framework.publish("notifications", {
"title": "新消息通知",
"content": "您有一条新的系统消息"
})
push_framework.publish("alerts", {
"level": "warning",
"message": "服务器CPU使用率过高"
})
push_framework.publish("webhooks", {
"event": "user.created",
"user_id": 12345
})
# 等待处理完成
import time
time.sleep(2)
push_framework.stop()
if __name__ == "__main__":
main()
选择建议
- WebSocket:适合需要实时双向通信的场景
- HTTP API:适合简单的请求-响应模式
- 消息队列:适合高并发、需要消息持久化的场景
- Redis Pub/Sub:适合轻量级的发布订阅模式
- 第三方服务:适合需要跨平台推送的场景
根据你的具体需求选择合适的方案,如果需要更多针对特定场景的示例,请告诉我!