本文目录导读:

实现Python实时通信有多种方式,具体取决于你的使用场景(如聊天应用、数据流处理、游戏、物联网等),下面我会介绍几种主流方法,并给出完整的代码案例。
WebSocket(最常用,适合Web应用)
WebSocket是Web端实时通信的标准方案,Python中常用websockets库或WebSocket协议配合asyncio。
安装
pip install websockets
服务端代码 (server.py)
import asyncio
import websockets
import json
# 存储所有连接的客户端
connected_clients = set()
async def handler(websocket, path):
# 注册客户端
connected_clients.add(websocket)
try:
async for message in websocket:
data = json.loads(message)
print(f"收到消息: {data}")
# 广播给所有其他客户端
for client in connected_clients:
if client != websocket:
await client.send(json.dumps({
"user": data["user"],
"message": data["message"],
"timestamp": data["timestamp"]
}))
except websockets.exceptions.ConnectionClosed:
pass
finally:
# 移除断开的客户端
connected_clients.remove(websocket)
async def main():
async with websockets.serve(handler, "localhost", 8765):
print("WebSocket 服务器启动在 ws://localhost:8765")
await asyncio.Future() # 保持服务运行
if __name__ == "__main__":
asyncio.run(main())
客户端代码 (client.py)
import asyncio
import websockets
import json
from datetime import datetime
async def send_messages():
async with websockets.connect("ws://localhost:8765") as websocket:
while True:
message = input("请输入消息 (格式: 用户名:消息): ")
if message.lower() == 'quit':
break
parts = message.split(':', 1)
if len(parts) == 2:
user, msg = parts[0], parts[1]
data = {
"user": user.strip(),
"message": msg.strip(),
"timestamp": datetime.now().isoformat()
}
await websocket.send(json.dumps(data))
# 接收服务器广播的消息
response = await websocket.recv()
print(f"收到广播: {response}")
async def main():
await send_messages()
if __name__ == "__main__":
asyncio.run(main())
Socket(TCP/UDP,适合局域网或自定义协议)
TCP 服务器 (tcp_server.py)
import socket
import threading
def handle_client(client_socket, address):
print(f"新连接: {address}")
try:
while True:
data = client_socket.recv(1024)
if not data:
break
message = data.decode('utf-8')
print(f"收到来自 {address} 的消息: {message}")
# 回复确认
client_socket.send(f"服务器已收到: {message}".encode('utf-8'))
except Exception as e:
print(f"连接异常: {e}")
finally:
client_socket.close()
print(f"连接关闭: {address}")
def start_tcp_server(host='0.0.0.0', port=9999):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host, port))
server.listen(5)
print(f"TCP服务器启动在 {host}:{port}")
while True:
client, address = server.accept()
thread = threading.Thread(target=handle_client, args=(client, address))
thread.start()
if __name__ == "__main__":
start_tcp_server()
TCP 客户端 (tcp_client.py)
import socket
import threading
def receive_messages(client_socket):
try:
while True:
data = client_socket.recv(1024)
if not data:
break
print(f"服务器回复: {data.decode('utf-8')}")
except:
pass
def start_tcp_client(host='127.0.0.1', port=9999):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
# 启动接收线程
receive_thread = threading.Thread(target=receive_messages, args=(client,))
receive_thread.daemon = True
receive_thread.start()
try:
while True:
message = input("输入消息 (输入'quit'退出): ")
if message.lower() == 'quit':
break
client.send(message.encode('utf-8'))
finally:
client.close()
if __name__ == "__main__":
start_tcp_client()
使用Redis发布/订阅(适合分布式系统)
安装
pip install redis
Redis发布者 (publisher.py)
import redis
import json
from datetime import datetime
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
def publish_message(channel, message):
data = {
"content": message,
"timestamp": datetime.now().isoformat(),
"sender": "publisher1"
}
r.publish(channel, json.dumps(data))
print(f"已发布消息: {data}")
if __name__ == "__main__":
channel = "chat_channel"
while True:
message = input("输入消息: ")
if message.lower() == 'quit':
break
publish_message(channel, message)
Redis订阅者 (subscriber.py)
import redis
import json
def listen_messages():
r = redis.Redis(host='localhost', port=6379, db=0)
pubsub = r.pubsub()
pubsub.subscribe('chat_channel')
print("正在监听 chat_channel 频道...")
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
print(f"[{data['timestamp']}] {data['sender']}: {data['content']}")
if __name__ == "__main__":
listen_messages()
使用MQTT(适合物联网和轻量级通信)
安装
pip install paho-mqtt
MQTT发布者 (mqtt_publisher.py)
import paho.mqtt.client as mqtt
import json
from datetime import datetime
import time
# MQTT配置
BROKER = "test.mosquitto.org" # 公共测试服务器
PORT = 1883
TOPIC = "python/real-time/chat"
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("连接MQTT服务器成功")
else:
print(f"连接失败,返回码: {rc}")
client = mqtt.Client()
client.on_connect = on_connect
client.connect(BROKER, PORT, 60)
try:
while True:
message = input("输入消息: ")
if message.lower() == 'quit':
break
data = {
"content": message,
"timestamp": datetime.now().isoformat(),
"sender": "user1"
}
client.publish(TOPIC, json.dumps(data))
print(f"已发布: {data}")
finally:
client.disconnect()
MQTT订阅者 (mqtt_subscriber.py)
import paho.mqtt.client as mqtt
import json
BROKER = "test.mosquitto.org"
PORT = 1883
TOPIC = "python/real-time/chat"
def on_connect(client, userdata, flags, rc):
print(f"连接MQTT服务器,返回码: {rc}")
client.subscribe(TOPIC)
print(f"已订阅主题: {TOPIC}")
def on_message(client, userdata, msg):
try:
data = json.loads(msg.payload)
print(f"[{data['timestamp']}] {data['sender']}: {data['content']}")
except json.JSONDecodeError:
print(f"收到原始消息: {msg.payload.decode()}")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, 60)
client.loop_forever()
各方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| WebSocket | 全双工、浏览器友好 | 需要Web服务器配合 | Web应用、实时聊天、在线游戏 |
| Socket (TCP/UDP) | 低延迟、完全控制 | 需自行处理粘包 | 局域网通信、自定义协议 |
| Redis Pub/Sub | 分布式、持久化 | 需要Redis基础服务 | 微服务架构、实时数据分发 |
| MQTT | 轻量级、QoS支持 | 需要MQTT代理 | IoT设备、移动应用、低带宽场景 |
实战建议
- Web应用开发:优先选择 WebSocket,配合前端JS直接使用
- 内网实时通信:使用 TCP Socket,控制更精细
- 分布式系统:使用 Redis Pub/Sub 或 MQTT
- 物联网项目:强烈推荐 MQTT,支持设备休眠和断线重连
需要我针对某个具体场景(如实时股票数据推送、多人游戏同步等)提供更详细的代码实现吗?