本文目录导读:

- 核心原理
- 环境准备
- 第一步:创建项目并安装 Workerman
- 第二步:编写 WebSocket 服务端代码
- 第三步:编写简单的 Web 客户端(HTML + JavaScript)
- 第四步:启动服务端
- 第五步:测试
- 进阶功能与注意事项
使用Workerman搭建即时通讯(IM)系统是一个非常经典且高效的方案,Workerman是一个高性能的PHP socket框架,它允许你直接操作TCP/UDP连接,非常适合处理长连接和实时通信。
下面我将分步骤、完整地讲解如何使用 Workerman 搭建一个基础的即时通讯服务。
核心原理
- WebSocket 协议:浏览器和服务器之间建立长连接的标准协议,Workerman 内置了 WebSocket 支持。
- 事件驱动:Workerman 通过监听
onConnect、onMessage、onClose等事件来处理客户端连接、消息收发和断开。 - 全局连接管理:使用一个类(如
User)来管理所有在线客户端,方便实现点对点或群组消息转发。
环境准备
- PHP 环境:需要安装 PHP,建议版本 >= 7.2 (推荐 PHP 8.x),确保已安装
pcntl和posix扩展(Linux/Mac)。 - Composer:PHP 的依赖管理工具。
- 服务器:Linux 或 Mac 系统最佳(Windows 下 Workerman 功能受限,仅支持单进程)。
第一步:创建项目并安装 Workerman
在你的项目目录中打开终端,执行:
composer require workerman/workerman
第二步:编写 WebSocket 服务端代码
创建一个文件 server.php(或者 start.php),这是 IM 服务的主入口。
<?php
// server.php
require_once __DIR__ . '/vendor/autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
// 创建一个 Worker 监听 2346 端口,使用 WebSocket 协议
$ws_worker = new Worker('websocket://0.0.0.0:2346');
// 启动 4 个进程,对外提供服务(根据服务器 CPU 核心数调整)
$ws_worker->count = 4;
// 用于存储所有在线用户的连接实例(uid => connection)
$userConnections = [];
/**
* 当有新的客户端连接时触发
* @param TcpConnection $connection
*/
$ws_worker->onConnect = function(TcpConnection $connection) {
echo "New connection from: " . $connection->getRemoteIp() . "\n";
// 可以在这里给连接分配一个临时 ID,或者等待用户登录后绑定 UID
$connection->uid = null; // 暂时未登录
};
/**
* 当收到客户端消息时触发
* @param TcpConnection $connection
* @param string $data 客户端发送的原始数据(JSON 字符串)
*/
$ws_worker->onMessage = function(TcpConnection $connection, $data) use (&$userConnections) {
global $userConnections;
// 1. 解析 JSON 消息
$message = json_decode($data, true);
if (!$message || !isset($message['type'])) {
$connection->send(json_encode(['error' => 'Invalid message format']));
return;
}
// 2. 根据消息类型处理
switch ($message['type']) {
case 'login':
// 用户登录/绑定 UID
$uid = $message['uid'] ?? '';
if (empty($uid)) {
$connection->send(json_encode(['error' => 'UID is required']));
break;
}
// 踢掉旧的连接(如果同一个 UID 在别处登录)
if (isset($userConnections[$uid])) {
$oldConn = $userConnections[$uid];
$oldConn->send(json_encode(['type' => 'kick', 'message' => 'You have been logged in elsewhere.']));
$oldConn->close();
unset($userConnections[$uid]);
}
// 绑定 UID 到当前连接
$connection->uid = $uid;
$userConnections[$uid] = $connection;
// 回复登录成功
$connection->send(json_encode([
'type' => 'login_success',
'uid' => $uid,
'online_users' => array_keys($userConnections) // 返回在线用户列表
]));
// 广播给其他在线用户:新用户上线
broadcastToAll($userConnections, [
'type' => 'user_online',
'uid' => $uid
], $uid); // 不给自己发
echo "User $uid logged in. Online users: " . count($userConnections) . "\n";
break;
case 'private_message':
// 点对点私聊
$fromUid = $connection->uid;
$toUid = $message['to_uid'] ?? '';
$content = $message['content'] ?? '';
if (!$fromUid || !$toUid || empty($content)) {
$connection->send(json_encode(['error' => 'Invalid private message']));
break;
}
// 检查目标用户是否在线
if (isset($userConnections[$toUid])) {
$targetConn = $userConnections[$toUid];
$targetConn->send(json_encode([
'type' => 'private_message',
'from_uid' => $fromUid,
'content' => $content,
'timestamp' => time()
]));
// 给发送者也回一条确认(可选)
$connection->send(json_encode([
'type' => 'message_sent',
'to_uid' => $toUid,
'content' => $content,
'timestamp' => time()
]));
} else {
$connection->send(json_encode([
'type' => 'error',
'message' => "User $toUid is offline or does not exist."
]));
}
break;
case 'broadcast':
// 群发/广播消息
$fromUid = $connection->uid;
$content = $message['content'] ?? '';
if (!$fromUid || empty($content)) {
$connection->send(json_encode(['error' => 'Invalid broadcast']));
break;
}
broadcastToAll($userConnections, [
'type' => 'broadcast',
'from_uid' => $fromUid,
'content' => $content,
'timestamp' => time()
], $fromUid);
break;
case 'ping':
// 心跳检测(客户端定期发送 ping 保持连接)
$connection->send(json_encode(['type' => 'pong']));
break;
default:
$connection->send(json_encode(['error' => 'Unknown message type']));
break;
}
};
/**
* 当客户端连接断开时触发
* @param TcpConnection $connection
*/
$ws_worker->onClose = function(TcpConnection $connection) use (&$userConnections) {
global $userConnections;
$uid = $connection->uid;
if ($uid && isset($userConnections[$uid])) {
// 从在线列表中移除
unset($userConnections[$uid]);
// 广播给其他用户:该用户已下线
broadcastToAll($userConnections, [
'type' => 'user_offline',
'uid' => $uid
]);
echo "User $uid disconnected. Online users: " . count($userConnections) . "\n";
} else {
echo "Connection closed (no UID bound)\n";
}
};
/**
* 辅助函数:向所有在线用户广播消息
* @param array $connections 用户连接数组
* @param array $messageData 要发送的消息数组
* @param string|null $excludeUid 排除的 UID
*/
function broadcastToAll(array &$connections, array $messageData, $excludeUid = null) {
$message = json_encode($messageData);
foreach ($connections as $uid => $conn) {
if ($uid !== $excludeUid) {
$conn->send($message);
}
}
}
// 运行 Worker
Worker::runAll();
第三步:编写简单的 Web 客户端(HTML + JavaScript)
创建一个 client.html 文件,用于在浏览器中测试。
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">Workerman IM Client</title>
<style>
body { font-family: Arial, sans-serif; padding: 20px; }
#messages { border: 1px solid #ccc; height: 300px; overflow-y: scroll; padding: 10px; margin-top: 10px; }
.msg { margin-bottom: 5px; }
.system { color: gray; font-style: italic; }
</style>
</head>
<body>
<h3>Workerman 即时通讯测试</h3>
<div>
UID: <input type="text" id="uid" value="user1">
<button onclick="login()">登录</button>
</div>
<div style="margin-top:10px;">
发送给 (UID): <input type="text" id="to_uid" value="user2">
<input type="text" id="msg_content" placeholder="输入消息" onkeydown="if(event.key==='Enter') sendPrivateMsg()">
<button onclick="sendPrivateMsg()">发送私聊</button>
<button onclick="sendBroadcast()">群发</button>
</div>
<div id="messages"></div>
<script>
let ws = null;
let currentUid = null;
const msgBox = document.getElementById('messages');
function log(message, type = 'system') {
const div = document.createElement('div');
div.className = `msg ${type}`;
div.textContent = message;
msgBox.appendChild(div);
msgBox.scrollTop = msgBox.scrollHeight;
}
function connect() {
// 替换为你的服务器 IP
ws = new WebSocket('ws://127.0.0.1:2346');
ws.onopen = () => {
log('WebSocket 连接已建立');
// 登录
const uid = document.getElementById('uid').value;
currentUid = uid;
ws.send(JSON.stringify({ type: 'login', uid: uid }));
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
console.log('收到消息:', msg);
switch (msg.type) {
case 'login_success':
log(`登录成功!你的 UID: ${msg.uid}`, 'system');
log(`在线用户: ${msg.online_users.join(', ')}`, 'system');
break;
case 'user_online':
log(`用户 ${msg.uid} 上线了`, 'system');
break;
case 'user_offline':
log(`用户 ${msg.uid} 下线了`, 'system');
break;
case 'private_message':
log(`[来自 ${msg.from_uid}] ${msg.content}`, '');
break;
case 'message_sent':
log(`[你 -> ${msg.to_uid}] ${msg.content}`, '');
break;
case 'broadcast':
log(`[广播] ${msg.from_uid}: ${msg.content}`, '');
break;
case 'kick':
log(`系统: ${msg.message}`, 'system');
break;
case 'error':
log(`错误: ${msg.message}`, 'system');
break;
case 'pong':
// 心跳回复,不需要显示
break;
default:
log(`未知类型: ${JSON.stringify(msg)}`);
}
};
ws.onclose = () => {
log('连接已关闭,尝试重连...', 'system');
currentUid = null;
// 2 秒后重连
setTimeout(connect, 2000);
};
ws.onerror = (error) => {
log('连接错误: ' + error, 'system');
};
}
function login() {
// 如果已有连接,先关闭
if (ws) {
ws.close();
}
connect();
}
function sendPrivateMsg() {
if (!ws || ws.readyState !== WebSocket.OPEN) {
log('未连接或连接未就绪', 'system');
return;
}
const toUid = document.getElementById('to_uid').value;
const content = document.getElementById('msg_content').value;
if (!toUid || !content) {
log('请填写接收人和消息内容', 'system');
return;
}
ws.send(JSON.stringify({
type: 'private_message',
to_uid: toUid,
content: content
}));
document.getElementById('msg_content').value = ''; // 清空输入框
}
function sendBroadcast() {
if (!ws || ws.readyState !== WebSocket.OPEN) {
log('未连接或连接未就绪', 'system');
return;
}
const content = document.getElementById('msg_content').value;
if (!content) {
log('请输入要群发的消息', 'system');
return;
}
ws.send(JSON.stringify({
type: 'broadcast',
content: content
}));
document.getElementById('msg_content').value = '';
}
// 页面加载后自动连接
window.onload = () => {
connect();
// 可选:开启定时心跳(如果服务端要求)
setInterval(() => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: 'ping' }));
}
}, 30000);
};
</script>
</body>
</html>
第四步:启动服务端
在项目根目录下运行:
php server.php start
你会看到类似输出:
Workerman[server.php] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:4.x.x PHP version:8.x.x
------------------------ WORKERS -------------------------------
user worker listen processes status
root none websocket://0.0.0.0:2346 4 [OK]
重要提示:
- 生产环境请使用
php server.php start -d以守护进程模式运行。 - 如果使用云服务器,请确保安全组/防火墙开放了 2346 端口。
第五步:测试
- 打开浏览器,访问你的
client.html文件(http://localhost/client.html)。 - 可以打开多个浏览器标签页,分别输入不同的 UID (如
user1,user2) 并点击登录。 - 在其中一个标签页中,选择接收方 UID,输入消息,点击“发送私聊”或“群发”进行测试。
进阶功能与注意事项
-
数据库持久化:
- 消息存储:使用 MySQL 或 Redis(推荐)来保存聊天记录,每次收到
private_message或broadcast时,异步写入数据库。 - 离线消息:如果目标用户不在线,将消息存入数据库,当该用户下次登录时,主动拉取离线消息。
- 消息存储:使用 MySQL 或 Redis(推荐)来保存聊天记录,每次收到
-
认证与安全:
- Token 验证:登录时不要只传 UID,应该传一个由后端生成的 token(如 JWT),在
onConnect或onMessage中验证其合法性。 - HTTPS/WSS:生产环境必须使用 WSS(WebSocket Secure),需要在 Worker 中配置 SSL 证书。
- Token 验证:登录时不要只传 UID,应该传一个由后端生成的 token(如 JWT),在
-
分布式部署:
当用户量增大,单机 Workerman 不够时,需要使用消息队列(如 Redis Pub/Sub 或 RabbitMQ)来同步不同 Workerman 进程(甚至不同服务器)之间的用户在线状态和消息。
-
心跳机制:
- 客户端定期发送
ping,服务端回复pong(如代码所示)。 - Workerman 默认有
$ws_worker->pingInterval设置,也可以自定义。
- 客户端定期发送
-
进程数优化:
$ws_worker->count = 4可以根据服务器的 CPU 核心数调整,通常设置为 CPU 核心数 x 2。
你只需这几个步骤就可以搭建一个可用的 IM 服务:
- 服务端:利用 Workerman 的 WebSocket 功能监听连接,维护在线用户映射表。
- 客户端:通过 JavaScript 的 WebSocket API 与服务端通信。
- 扩展:后续可以加入数据库、认证、分布式支持,使其成为一个完善的即时通讯系统。