Python案例如何实现实时通信?

wen python案例 67

本文目录导读:

Python案例如何实现实时通信?

  1. WebSocket(最常用,适合Web应用)
  2. Socket(TCP/UDP,适合局域网或自定义协议)
  3. 使用Redis发布/订阅(适合分布式系统)
  4. 使用MQTT(适合物联网和轻量级通信)
  5. 各方案对比
  6. 实战建议

实现Python实时通信有多种方式,具体取决于你的使用场景(如聊天应用、数据流处理、游戏、物联网等),下面我会介绍几种主流方法,并给出完整的代码案例。

WebSocket(最常用,适合Web应用)

WebSocket是Web端实时通信的标准方案,Python中常用websockets库或WebSocket协议配合asyncio

安装

pip install websockets

服务端代码 (server.py)

import asyncio
import websockets
import json
# 存储所有连接的客户端
connected_clients = set()
async def handler(websocket, path):
    # 注册客户端
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            data = json.loads(message)
            print(f"收到消息: {data}")
            # 广播给所有其他客户端
            for client in connected_clients:
                if client != websocket:
                    await client.send(json.dumps({
                        "user": data["user"],
                        "message": data["message"],
                        "timestamp": data["timestamp"]
                    }))
    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        # 移除断开的客户端
        connected_clients.remove(websocket)
async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("WebSocket 服务器启动在 ws://localhost:8765")
        await asyncio.Future()  # 保持服务运行
if __name__ == "__main__":
    asyncio.run(main())

客户端代码 (client.py)

import asyncio
import websockets
import json
from datetime import datetime
async def send_messages():
    async with websockets.connect("ws://localhost:8765") as websocket:
        while True:
            message = input("请输入消息 (格式: 用户名:消息): ")
            if message.lower() == 'quit':
                break
            parts = message.split(':', 1)
            if len(parts) == 2:
                user, msg = parts[0], parts[1]
                data = {
                    "user": user.strip(),
                    "message": msg.strip(),
                    "timestamp": datetime.now().isoformat()
                }
                await websocket.send(json.dumps(data))
                # 接收服务器广播的消息
                response = await websocket.recv()
                print(f"收到广播: {response}")
async def main():
    await send_messages()
if __name__ == "__main__":
    asyncio.run(main())

Socket(TCP/UDP,适合局域网或自定义协议)

TCP 服务器 (tcp_server.py)

import socket
import threading
def handle_client(client_socket, address):
    print(f"新连接: {address}")
    try:
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            message = data.decode('utf-8')
            print(f"收到来自 {address} 的消息: {message}")
            # 回复确认
            client_socket.send(f"服务器已收到: {message}".encode('utf-8'))
    except Exception as e:
        print(f"连接异常: {e}")
    finally:
        client_socket.close()
        print(f"连接关闭: {address}")
def start_tcp_server(host='0.0.0.0', port=9999):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((host, port))
    server.listen(5)
    print(f"TCP服务器启动在 {host}:{port}")
    while True:
        client, address = server.accept()
        thread = threading.Thread(target=handle_client, args=(client, address))
        thread.start()
if __name__ == "__main__":
    start_tcp_server()

TCP 客户端 (tcp_client.py)

import socket
import threading
def receive_messages(client_socket):
    try:
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            print(f"服务器回复: {data.decode('utf-8')}")
    except:
        pass
def start_tcp_client(host='127.0.0.1', port=9999):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))
    # 启动接收线程
    receive_thread = threading.Thread(target=receive_messages, args=(client,))
    receive_thread.daemon = True
    receive_thread.start()
    try:
        while True:
            message = input("输入消息 (输入'quit'退出): ")
            if message.lower() == 'quit':
                break
            client.send(message.encode('utf-8'))
    finally:
        client.close()
if __name__ == "__main__":
    start_tcp_client()

使用Redis发布/订阅(适合分布式系统)

安装

pip install redis

Redis发布者 (publisher.py)

import redis
import json
from datetime import datetime
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
def publish_message(channel, message):
    data = {
        "content": message,
        "timestamp": datetime.now().isoformat(),
        "sender": "publisher1"
    }
    r.publish(channel, json.dumps(data))
    print(f"已发布消息: {data}")
if __name__ == "__main__":
    channel = "chat_channel"
    while True:
        message = input("输入消息: ")
        if message.lower() == 'quit':
            break
        publish_message(channel, message)

Redis订阅者 (subscriber.py)

import redis
import json
def listen_messages():
    r = redis.Redis(host='localhost', port=6379, db=0)
    pubsub = r.pubsub()
    pubsub.subscribe('chat_channel')
    print("正在监听 chat_channel 频道...")
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            print(f"[{data['timestamp']}] {data['sender']}: {data['content']}")
if __name__ == "__main__":
    listen_messages()

使用MQTT(适合物联网和轻量级通信)

安装

pip install paho-mqtt

MQTT发布者 (mqtt_publisher.py)

import paho.mqtt.client as mqtt
import json
from datetime import datetime
import time
# MQTT配置
BROKER = "test.mosquitto.org"  # 公共测试服务器
PORT = 1883
TOPIC = "python/real-time/chat"
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("连接MQTT服务器成功")
    else:
        print(f"连接失败,返回码: {rc}")
client = mqtt.Client()
client.on_connect = on_connect
client.connect(BROKER, PORT, 60)
try:
    while True:
        message = input("输入消息: ")
        if message.lower() == 'quit':
            break
        data = {
            "content": message,
            "timestamp": datetime.now().isoformat(),
            "sender": "user1"
        }
        client.publish(TOPIC, json.dumps(data))
        print(f"已发布: {data}")
finally:
    client.disconnect()

MQTT订阅者 (mqtt_subscriber.py)

import paho.mqtt.client as mqtt
import json
BROKER = "test.mosquitto.org"
PORT = 1883
TOPIC = "python/real-time/chat"
def on_connect(client, userdata, flags, rc):
    print(f"连接MQTT服务器,返回码: {rc}")
    client.subscribe(TOPIC)
    print(f"已订阅主题: {TOPIC}")
def on_message(client, userdata, msg):
    try:
        data = json.loads(msg.payload)
        print(f"[{data['timestamp']}] {data['sender']}: {data['content']}")
    except json.JSONDecodeError:
        print(f"收到原始消息: {msg.payload.decode()}")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, 60)
client.loop_forever()

各方案对比

方案 优点 缺点 适用场景
WebSocket 全双工、浏览器友好 需要Web服务器配合 Web应用、实时聊天、在线游戏
Socket (TCP/UDP) 低延迟、完全控制 需自行处理粘包 局域网通信、自定义协议
Redis Pub/Sub 分布式、持久化 需要Redis基础服务 微服务架构、实时数据分发
MQTT 轻量级、QoS支持 需要MQTT代理 IoT设备、移动应用、低带宽场景

实战建议

  1. Web应用开发:优先选择 WebSocket,配合前端JS直接使用
  2. 内网实时通信:使用 TCP Socket,控制更精细
  3. 分布式系统:使用 Redis Pub/SubMQTT
  4. 物联网项目:强烈推荐 MQTT,支持设备休眠和断线重连

需要我针对某个具体场景(如实时股票数据推送、多人游戏同步等)提供更详细的代码实现吗?

抱歉,评论功能暂时关闭!