如何使用Workerman搭建即时通讯?

wen PHP项目 44

本文目录导读:

如何使用Workerman搭建即时通讯?

  1. 核心原理
  2. 环境准备
  3. 第一步:创建项目并安装 Workerman
  4. 第二步:编写 WebSocket 服务端代码
  5. 第三步:编写简单的 Web 客户端(HTML + JavaScript)
  6. 第四步:启动服务端
  7. 第五步:测试
  8. 进阶功能与注意事项

使用Workerman搭建即时通讯(IM)系统是一个非常经典且高效的方案,Workerman是一个高性能的PHP socket框架,它允许你直接操作TCP/UDP连接,非常适合处理长连接和实时通信。

下面我将分步骤、完整地讲解如何使用 Workerman 搭建一个基础的即时通讯服务。

核心原理

  1. WebSocket 协议:浏览器和服务器之间建立长连接的标准协议,Workerman 内置了 WebSocket 支持。
  2. 事件驱动:Workerman 通过监听 onConnectonMessageonClose 等事件来处理客户端连接、消息收发和断开。
  3. 全局连接管理:使用一个类(如 User)来管理所有在线客户端,方便实现点对点或群组消息转发。

环境准备

  1. PHP 环境:需要安装 PHP,建议版本 >= 7.2 (推荐 PHP 8.x),确保已安装 pcntlposix 扩展(Linux/Mac)。
  2. Composer:PHP 的依赖管理工具。
  3. 服务器:Linux 或 Mac 系统最佳(Windows 下 Workerman 功能受限,仅支持单进程)。

第一步:创建项目并安装 Workerman

在你的项目目录中打开终端,执行:

composer require workerman/workerman

第二步:编写 WebSocket 服务端代码

创建一个文件 server.php(或者 start.php),这是 IM 服务的主入口。

<?php
// server.php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
// 创建一个 Worker 监听 2346 端口,使用 WebSocket 协议
$ws_worker = new Worker('websocket://0.0.0.0:2346');
// 启动 4 个进程,对外提供服务(根据服务器 CPU 核心数调整)
$ws_worker->count = 4;
// 用于存储所有在线用户的连接实例(uid => connection)
$userConnections = [];
/**
 * 当有新的客户端连接时触发
 * @param TcpConnection $connection
 */
$ws_worker->onConnect = function(TcpConnection $connection) {
    echo "New connection from: " . $connection->getRemoteIp() . "\n";
    // 可以在这里给连接分配一个临时 ID,或者等待用户登录后绑定 UID
    $connection->uid = null; // 暂时未登录
};
/**
 * 当收到客户端消息时触发
 * @param TcpConnection $connection
 * @param string $data 客户端发送的原始数据(JSON 字符串)
 */
$ws_worker->onMessage = function(TcpConnection $connection, $data) use (&$userConnections) {
    global $userConnections;
    // 1. 解析 JSON 消息
    $message = json_decode($data, true);
    if (!$message || !isset($message['type'])) {
        $connection->send(json_encode(['error' => 'Invalid message format']));
        return;
    }
    // 2. 根据消息类型处理
    switch ($message['type']) {
        case 'login':
            // 用户登录/绑定 UID
            $uid = $message['uid'] ?? '';
            if (empty($uid)) {
                $connection->send(json_encode(['error' => 'UID is required']));
                break;
            }
            // 踢掉旧的连接(如果同一个 UID 在别处登录)
            if (isset($userConnections[$uid])) {
                $oldConn = $userConnections[$uid];
                $oldConn->send(json_encode(['type' => 'kick', 'message' => 'You have been logged in elsewhere.']));
                $oldConn->close();
                unset($userConnections[$uid]);
            }
            // 绑定 UID 到当前连接
            $connection->uid = $uid;
            $userConnections[$uid] = $connection;
            // 回复登录成功
            $connection->send(json_encode([
                'type' => 'login_success',
                'uid' => $uid,
                'online_users' => array_keys($userConnections) // 返回在线用户列表
            ]));
            // 广播给其他在线用户:新用户上线
            broadcastToAll($userConnections, [
                'type' => 'user_online',
                'uid' => $uid
            ], $uid); // 不给自己发
            echo "User $uid logged in. Online users: " . count($userConnections) . "\n";
            break;
        case 'private_message':
            // 点对点私聊
            $fromUid = $connection->uid;
            $toUid = $message['to_uid'] ?? '';
            $content = $message['content'] ?? '';
            if (!$fromUid || !$toUid || empty($content)) {
                $connection->send(json_encode(['error' => 'Invalid private message']));
                break;
            }
            // 检查目标用户是否在线
            if (isset($userConnections[$toUid])) {
                $targetConn = $userConnections[$toUid];
                $targetConn->send(json_encode([
                    'type' => 'private_message',
                    'from_uid' => $fromUid,
                    'content' => $content,
                    'timestamp' => time()
                ]));
                // 给发送者也回一条确认(可选)
                $connection->send(json_encode([
                    'type' => 'message_sent',
                    'to_uid' => $toUid,
                    'content' => $content,
                    'timestamp' => time()
                ]));
            } else {
                $connection->send(json_encode([
                    'type' => 'error',
                    'message' => "User $toUid is offline or does not exist."
                ]));
            }
            break;
        case 'broadcast':
            // 群发/广播消息
            $fromUid = $connection->uid;
            $content = $message['content'] ?? '';
            if (!$fromUid || empty($content)) {
                $connection->send(json_encode(['error' => 'Invalid broadcast']));
                break;
            }
            broadcastToAll($userConnections, [
                'type' => 'broadcast',
                'from_uid' => $fromUid,
                'content' => $content,
                'timestamp' => time()
            ], $fromUid);
            break;
        case 'ping':
            // 心跳检测(客户端定期发送 ping 保持连接)
            $connection->send(json_encode(['type' => 'pong']));
            break;
        default:
            $connection->send(json_encode(['error' => 'Unknown message type']));
            break;
    }
};
/**
 * 当客户端连接断开时触发
 * @param TcpConnection $connection
 */
$ws_worker->onClose = function(TcpConnection $connection) use (&$userConnections) {
    global $userConnections;
    $uid = $connection->uid;
    if ($uid && isset($userConnections[$uid])) {
        // 从在线列表中移除
        unset($userConnections[$uid]);
        // 广播给其他用户:该用户已下线
        broadcastToAll($userConnections, [
            'type' => 'user_offline',
            'uid' => $uid
        ]);
        echo "User $uid disconnected. Online users: " . count($userConnections) . "\n";
    } else {
        echo "Connection closed (no UID bound)\n";
    }
};
/**
 * 辅助函数:向所有在线用户广播消息
 * @param array $connections 用户连接数组
 * @param array $messageData 要发送的消息数组
 * @param string|null $excludeUid 排除的 UID
 */
function broadcastToAll(array &$connections, array $messageData, $excludeUid = null) {
    $message = json_encode($messageData);
    foreach ($connections as $uid => $conn) {
        if ($uid !== $excludeUid) {
            $conn->send($message);
        }
    }
}
// 运行 Worker
Worker::runAll();

第三步:编写简单的 Web 客户端(HTML + JavaScript)

创建一个 client.html 文件,用于在浏览器中测试。

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">Workerman IM Client</title>
    <style>
        body { font-family: Arial, sans-serif; padding: 20px; }
        #messages { border: 1px solid #ccc; height: 300px; overflow-y: scroll; padding: 10px; margin-top: 10px; }
        .msg { margin-bottom: 5px; }
        .system { color: gray; font-style: italic; }
    </style>
</head>
<body>
    <h3>Workerman 即时通讯测试</h3>
    <div>
        UID: <input type="text" id="uid" value="user1">
        <button onclick="login()">登录</button>
    </div>
    <div style="margin-top:10px;">
        发送给 (UID): <input type="text" id="to_uid" value="user2">
        <input type="text" id="msg_content" placeholder="输入消息" onkeydown="if(event.key==='Enter') sendPrivateMsg()">
        <button onclick="sendPrivateMsg()">发送私聊</button>
        <button onclick="sendBroadcast()">群发</button>
    </div>
    <div id="messages"></div>
    <script>
        let ws = null;
        let currentUid = null;
        const msgBox = document.getElementById('messages');
        function log(message, type = 'system') {
            const div = document.createElement('div');
            div.className = `msg ${type}`;
            div.textContent = message;
            msgBox.appendChild(div);
            msgBox.scrollTop = msgBox.scrollHeight;
        }
        function connect() {
            // 替换为你的服务器 IP
            ws = new WebSocket('ws://127.0.0.1:2346');
            ws.onopen = () => {
                log('WebSocket 连接已建立');
                // 登录
                const uid = document.getElementById('uid').value;
                currentUid = uid;
                ws.send(JSON.stringify({ type: 'login', uid: uid }));
            };
            ws.onmessage = (event) => {
                const msg = JSON.parse(event.data);
                console.log('收到消息:', msg);
                switch (msg.type) {
                    case 'login_success':
                        log(`登录成功!你的 UID: ${msg.uid}`, 'system');
                        log(`在线用户: ${msg.online_users.join(', ')}`, 'system');
                        break;
                    case 'user_online':
                        log(`用户 ${msg.uid} 上线了`, 'system');
                        break;
                    case 'user_offline':
                        log(`用户 ${msg.uid} 下线了`, 'system');
                        break;
                    case 'private_message':
                        log(`[来自 ${msg.from_uid}] ${msg.content}`, '');
                        break;
                    case 'message_sent':
                        log(`[你 -> ${msg.to_uid}] ${msg.content}`, '');
                        break;
                    case 'broadcast':
                        log(`[广播] ${msg.from_uid}: ${msg.content}`, '');
                        break;
                    case 'kick':
                        log(`系统: ${msg.message}`, 'system');
                        break;
                    case 'error':
                        log(`错误: ${msg.message}`, 'system');
                        break;
                    case 'pong':
                        // 心跳回复,不需要显示
                        break;
                    default:
                        log(`未知类型: ${JSON.stringify(msg)}`);
                }
            };
            ws.onclose = () => {
                log('连接已关闭,尝试重连...', 'system');
                currentUid = null;
                // 2 秒后重连
                setTimeout(connect, 2000);
            };
            ws.onerror = (error) => {
                log('连接错误: ' + error, 'system');
            };
        }
        function login() {
            // 如果已有连接,先关闭
            if (ws) {
                ws.close();
            }
            connect();
        }
        function sendPrivateMsg() {
            if (!ws || ws.readyState !== WebSocket.OPEN) {
                log('未连接或连接未就绪', 'system');
                return;
            }
            const toUid = document.getElementById('to_uid').value;
            const content = document.getElementById('msg_content').value;
            if (!toUid || !content) {
                log('请填写接收人和消息内容', 'system');
                return;
            }
            ws.send(JSON.stringify({
                type: 'private_message',
                to_uid: toUid,
                content: content
            }));
            document.getElementById('msg_content').value = ''; // 清空输入框
        }
        function sendBroadcast() {
            if (!ws || ws.readyState !== WebSocket.OPEN) {
                log('未连接或连接未就绪', 'system');
                return;
            }
            const content = document.getElementById('msg_content').value;
            if (!content) {
                log('请输入要群发的消息', 'system');
                return;
            }
            ws.send(JSON.stringify({
                type: 'broadcast',
                content: content
            }));
            document.getElementById('msg_content').value = '';
        }
        // 页面加载后自动连接
        window.onload = () => {
            connect();
            // 可选:开启定时心跳(如果服务端要求)
            setInterval(() => {
                if (ws && ws.readyState === WebSocket.OPEN) {
                    ws.send(JSON.stringify({ type: 'ping' }));
                }
            }, 30000);
        };
    </script>
</body>
</html>

第四步:启动服务端

在项目根目录下运行:

php server.php start

你会看到类似输出:

Workerman[server.php] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:4.x.x          PHP version:8.x.x
------------------------ WORKERS -------------------------------
user          worker        listen                         processes status
root          none          websocket://0.0.0.0:2346       4         [OK]

重要提示

  • 生产环境请使用 php server.php start -d 以守护进程模式运行。
  • 如果使用云服务器,请确保安全组/防火墙开放了 2346 端口。

第五步:测试

  1. 打开浏览器,访问你的 client.html 文件(http://localhost/client.html)。
  2. 可以打开多个浏览器标签页,分别输入不同的 UID (如 user1, user2) 并点击登录。
  3. 在其中一个标签页中,选择接收方 UID,输入消息,点击“发送私聊”或“群发”进行测试。

进阶功能与注意事项

  1. 数据库持久化

    • 消息存储:使用 MySQL 或 Redis(推荐)来保存聊天记录,每次收到 private_messagebroadcast 时,异步写入数据库。
    • 离线消息:如果目标用户不在线,将消息存入数据库,当该用户下次登录时,主动拉取离线消息。
  2. 认证与安全

    • Token 验证:登录时不要只传 UID,应该传一个由后端生成的 token(如 JWT),在 onConnectonMessage 中验证其合法性。
    • HTTPS/WSS:生产环境必须使用 WSS(WebSocket Secure),需要在 Worker 中配置 SSL 证书。
  3. 分布式部署

    当用户量增大,单机 Workerman 不够时,需要使用消息队列(如 Redis Pub/Sub 或 RabbitMQ)来同步不同 Workerman 进程(甚至不同服务器)之间的用户在线状态和消息。

  4. 心跳机制

    • 客户端定期发送 ping,服务端回复 pong(如代码所示)。
    • Workerman 默认有 $ws_worker->pingInterval 设置,也可以自定义。
  5. 进程数优化

    • $ws_worker->count = 4 可以根据服务器的 CPU 核心数调整,通常设置为 CPU 核心数 x 2。

你只需这几个步骤就可以搭建一个可用的 IM 服务:

  • 服务端:利用 Workerman 的 WebSocket 功能监听连接,维护在线用户映射表。
  • 客户端:通过 JavaScript 的 WebSocket API 与服务端通信。
  • 扩展:后续可以加入数据库、认证、分布式支持,使其成为一个完善的即时通讯系统。

抱歉,评论功能暂时关闭!