本文目录导读:

我来提供一个完整的Java WebSocket推送案例,使用Java标准库和Spring Boot两种方式实现。
使用Java标准库实现
添加依赖(Maven)
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
<version>1.1</version>
<scope>provided</scope>
</dependency>
WebSocket服务端
package com.example.websocket;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * JSR-356 WebSocket endpoint that greets each new client and echoes every
 * received text message to all connected clients.
 *
 * <p>The container creates one instance per connection; the static registry
 * below is shared between all instances.
 */
@ServerEndpoint("/websocket")
public class WebSocketServer {
    /** Number of endpoint instances currently registered. */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);
    /** Registry holding the endpoint instance of every connected client. */
    private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet =
            new CopyOnWriteArraySet<>();
    /** Session of the client served by this instance; set in {@link #onOpen}. */
    private Session session;

    /**
     * Registers the new connection and sends a welcome message.
     *
     * @param session the session of the client that just connected
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        // Count only real insertions so the counter cannot drift from the set.
        if (webSocketSet.add(this)) {
            onlineCount.incrementAndGet();
        }
        System.out.println("新连接加入!当前在线人数:" + onlineCount.get());
        sendMessage("欢迎连接到WebSocket服务!");
    }

    /**
     * Unregisters this connection when it is closed.
     */
    @OnClose
    public void onClose() {
        // Decrement only if this instance was actually registered.
        if (webSocketSet.remove(this)) {
            onlineCount.decrementAndGet();
        }
        System.out.println("连接关闭!当前在线人数:" + onlineCount.get());
    }

    /**
     * Logs an incoming text message and relays it to every connected client.
     *
     * @param message the text received from the client
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自客户端的消息:" + message);
        for (WebSocketServer item : webSocketSet) {
            item.sendMessage("服务端回复:" + message);
        }
    }

    /**
     * Logs an error reported for this connection.
     *
     * @param session the affected session
     * @param error   the reported failure
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("发生错误");
        error.printStackTrace();
    }

    /**
     * Sends a text message to the client of this endpoint instance.
     *
     * <p>Does nothing when the session is missing or already closed. Failures
     * are logged and never propagated, so a single broken connection cannot
     * abort a broadcast loop.
     *
     * @param message the text to send
     */
    public void sendMessage(String message) {
        Session target = this.session;
        if (target == null || !target.isOpen()) {
            return;
        }
        try {
            // The blocking remote must not be written by two threads at once;
            // broadcasts from different connections can reach the same session.
            synchronized (target) {
                target.getBasicRemote().sendText(message);
            }
        } catch (IOException | IllegalStateException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a custom message to every connected client.
     *
     * @param message the text to broadcast
     */
    public static void sendToAll(String message) {
        System.out.println("群发消息给所有客户端:" + message);
        for (WebSocketServer item : webSocketSet) {
            item.sendMessage(message);
        }
    }

    /**
     * Returns the current number of registered connections.
     *
     * @return the online connection count
     */
    public static int getOnlineCount() {
        return onlineCount.get();
    }
}
WebSocket配置类
package com.example.websocket;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
public class WebSocketConfigurator extends Configurator {
@Override
public void modifyHandshake(ServerEndpointConfig sec,
javax.websocket.HandshakeRequest request,
javax.websocket.HandshakeResponse response) {
// 可以在这里添加握手时的自定义逻辑
System.out.println("WebSocket握手配置");
}
}
测试客户端(HTML页面)
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket测试客户端</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
#message { width: 300px; height: 100px; }
#output { border: 1px solid #ccc; padding: 10px; height: 300px; overflow-y: auto; }
</style>
</head>
<body>
<h2>WebSocket测试</h2>
<div>
<input type="text" id="msgInput" placeholder="输入消息" />
<button onclick="sendMessage()">发送</button>
</div>
<div id="output"></div>
<script>
// The single WebSocket connection used by this page (null until connect() runs).
let websocket = null;

/**
 * Opens the WebSocket connection and wires the lifecycle callbacks so that
 * every event is shown in the output area.
 */
function connect() {
    const url = "ws://localhost:8080/websocket"; // change to the real server address
    websocket = new WebSocket(url);
    websocket.onopen = function() {
        appendMessage("连接已建立");
    };
    websocket.onmessage = function(event) {
        appendMessage("收到消息: " + event.data);
    };
    websocket.onclose = function() {
        appendMessage("连接已关闭");
    };
    websocket.onerror = function(error) {
        // The handler receives an Event object; concatenating it directly
        // would only print "[object Event]".
        appendMessage("发生错误: " + (error.message || error.type));
    };
}
// 发送消息
/**
 * Sends the text of the input box over the socket, or reports that the
 * connection is not established.
 */
function sendMessage() {
    const inputBox = document.getElementById("msgInput");
    const text = inputBox.value;
    const isOpen = websocket && websocket.readyState === WebSocket.OPEN;
    if (!isOpen) {
        appendMessage("连接未建立");
        return;
    }
    websocket.send(text);
    appendMessage("发送消息: " + text);
    // Clear the input box after a successful send.
    document.getElementById("msgInput").value = "";
}
// 显示消息
/**
 * Appends one line of text to the output area and scrolls to the bottom.
 */
function appendMessage(msg) {
    const panel = document.getElementById("output");
    const line = document.createElement("p");
    // textContent keeps the message as plain text (no HTML interpretation).
    line.textContent = msg;
    panel.appendChild(line);
    panel.scrollTop = panel.scrollHeight;
}

// Connect as soon as the page has finished loading.
window.onload = connect;
</script>
</body>
</html>
使用Spring Boot实现(推荐)
添加Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
WebSocket配置类
package com.example.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for the WebSocket support of this application.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {
    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     * NOTE(review): presumably needed so that {@code @ServerEndpoint} classes are
     * registered with the embedded container - confirm for the deployment in use.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
WebSocket服务端(使用Spring注解)
package com.example.websocket.server;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
/**
 * WebSocket push endpoint. Each client connects to {@code /ws/push/{userId}};
 * messages can then be pushed to one user or broadcast to everyone.
 *
 * <p>One instance exists per connection; the static client list is shared.
 */
@Component
@ServerEndpoint("/ws/push/{userId}")
public class WebSocketPushServer {
    /** Prefix that marks an incoming message as a broadcast request. */
    private static final String BROADCAST_PREFIX = "BROADCAST:";
    /** Endpoint instances of all currently connected clients. */
    private static final CopyOnWriteArrayList<WebSocketPushServer> clients =
            new CopyOnWriteArrayList<>();
    /** Session of the client served by this instance. */
    private Session session;
    /** User id taken from the {@code {userId}} segment of the connection URL. */
    private String userId;

    /**
     * Registers a newly connected client.
     *
     * @param session the session of the new connection
     * @param userId  the value of the {@code {userId}} path segment
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        clients.add(this);
        System.out.println("用户 " + userId + " 连接成功,当前在线:" + clients.size());
    }

    /**
     * Removes this client from the registry when its connection closes.
     */
    @OnClose
    public void onClose() {
        clients.remove(this);
        System.out.println("用户 " + userId + " 断开连接,当前在线:" + clients.size());
    }

    /**
     * Logs an incoming text message and hands it to the business handler.
     *
     * @param message the received text
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("收到用户 " + userId + " 的消息:" + message);
        handleMessage(message);
    }

    /**
     * Logs an error reported for this connection.
     *
     * @param session the affected session
     * @param error   the reported failure
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("用户 " + userId + " 发生错误");
        error.printStackTrace();
    }

    /**
     * Sends a message to every connection registered under the given user id.
     *
     * @param userId  the target user id; {@code null} matches nobody
     * @param message the text to send
     */
    public static void sendToUser(String userId, String message) {
        if (userId == null) {
            return;
        }
        for (WebSocketPushServer client : clients) {
            // Compare from the non-null side: a client's id may be null.
            if (userId.equals(client.userId)) {
                send(client, message);
            }
        }
    }

    /**
     * Sends a message to all connected clients.
     *
     * @param message the text to broadcast
     */
    public static void sendToAll(String message) {
        for (WebSocketPushServer client : clients) {
            send(client, message);
        }
    }

    /**
     * Delivers one message to one client, skipping closed sessions and logging
     * (not propagating) failures so a broken connection cannot abort a loop.
     */
    private static void send(WebSocketPushServer client, String message) {
        Session target = client.session;
        if (target == null || !target.isOpen()) {
            return;
        }
        try {
            // The blocking remote must not be written by two threads at once.
            synchronized (target) {
                target.getBasicRemote().sendText(message);
            }
        } catch (IOException | IllegalStateException e) {
            e.printStackTrace();
        }
    }

    /**
     * Business handling of a client message: texts starting with
     * {@code BROADCAST:} are relayed to everyone, anything else is echoed
     * back to the sending user.
     */
    private void handleMessage(String message) {
        if (message.startsWith(BROADCAST_PREFIX)) {
            sendToAll("【广播】" + message.substring(BROADCAST_PREFIX.length()));
        } else {
            sendToUser(userId, "【回复】" + message);
        }
    }
}
消息推送服务接口
package com.example.websocket.service;
import org.springframework.stereotype.Service;
@Service
public class PushService {
/**
* 推送实时数据
*/
public void pushRealTimeData(String dataType, Object data) {
String message = String.format("{\"type\":\"%s\",\"data\":%s}",
dataType, data.toString());
WebSocketPushServer.sendToAll(message);
}
/**
* 推送通知给特定用户
*/
public void pushNotification(String userId, String title, String content) {
String message = String.format(
"{\"type\":\"notification\",\"title\":\"%s\",\"content\":\"%s\"}",
title, content);
WebSocketPushServer.sendToUser(userId, message);
}
}
控制器接口
package com.example.websocket.controller;
import com.example.websocket.service.PushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/push")
public class PushController {
@Autowired
private PushService pushService;
/**
* 手动触发推送
*/
@PostMapping("/send")
public String sendPush(@RequestParam String message) {
WebSocketPushServer.sendToAll(message);
return "推送成功:" + message;
}
/**
* 发送给特定用户
*/
@PostMapping("/sendToUser")
public String sendToUser(@RequestParam String userId,
@RequestParam String message) {
WebSocketPushServer.sendToUser(userId, message);
return "推送成功给用户 " + userId;
}
/**
* 推送实时数据
*/
@PostMapping("/realtime")
public String pushRealTime(@RequestParam String type,
@RequestParam String data) {
pushService.pushRealTimeData(type, data);
return "实时数据推送成功";
}
}
增强版客户端(支持心跳检测)
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>实时推送客户端</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: 'Microsoft YaHei', sans-serif; padding: 20px; background: #f5f5f5; }
.container { max-width: 600px; margin: 0 auto; }
.header { background: #1890ff; color: white; padding: 15px; border-radius: 8px 8px 0 0; }
.status {
display: inline-block;
width: 10px;
height: 10px;
border-radius: 50%;
margin-right: 10px;
}
.connected { background: #52c41a; }
.disconnected { background: #ff4d4f; }
.chat-area {
background: white;
border: 1px solid #e8e8e8;
border-top: none;
padding: 15px;
}
.message-area {
height: 400px;
overflow-y: auto;
margin-bottom: 15px;
padding: 10px;
background: #fafafa;
border-radius: 4px;
}
.input-area { display: flex; gap: 10px; }
.message-input {
flex: 1;
padding: 10px;
border: 1px solid #d9d9d9;
border-radius: 4px;
}
.btn {
padding: 10px 20px;
background: #1890ff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.btn:hover { background: #40a9ff; }
.btn-danger { background: #ff4d4f; }
.btn-danger:hover { background: #ff7875; }
.message-item {
margin: 8px 0;
padding: 8px 12px;
border-radius: 4px;
background: #e6f7ff;
border: 1px solid #91d5ff;
}
.message-item.received { background: #f6ffed; border-color: #b7eb8f; }
.message-item.sent { background: #e6f7ff; border-color: #91d5ff; }
.message-time {
float: right;
color: #999;
font-size: 12px;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<span class="status" id="statusIndicator"></span>
<span id="connectionStatus">未连接</span>
<span style="float: right;" id="userId">用户ID: </span>
</div>
<div class="chat-area">
<div class="message-area" id="messageArea"></div>
<div class="input-area">
<input type="text" class="message-input" id="messageInput"
placeholder="输入消息..." onkeypress="handleKeyPress(event)">
<button class="btn" onclick="sendMessage()">发送</button>
<button class="btn btn-danger" onclick="disconnect()">断开</button>
</div>
</div>
</div>
<script>
// Page-level state: the active socket plus the heartbeat and reconnect timers.
let ws = null;
let heartbeatTimer = null;
let reconnectTimer = null;
// Random per-page id: up to nine base-36 characters taken after the "0." prefix.
const randomSuffix = Math.random().toString(36).slice(2, 11);
let userId = 'user_' + randomSuffix;
document.getElementById('userId').textContent = '用户ID: ' + userId;
/**
 * Opens the push connection for the current user and wires the lifecycle
 * callbacks (status display, heartbeat, automatic reconnect).
 */
function connect() {
    // Do not open a second socket while one is open or still connecting.
    if (ws && (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING)) {
        return;
    }
    const url = `ws://localhost:8080/ws/push/${encodeURIComponent(userId)}`;
    ws = new WebSocket(url);
    ws.onopen = function() {
        updateStatus(true);
        appendMessage('系统', '连接已建立', 'system');
        startHeartbeat();
        // A successful connection makes any pending reconnect obsolete.
        if (reconnectTimer) {
            clearTimeout(reconnectTimer);
            reconnectTimer = null;
        }
    };
    ws.onmessage = function(event) {
        const data = event.data;
        try {
            // Show parsed JSON when possible, otherwise the raw text.
            const jsonData = JSON.parse(data);
            appendMessage('服务器', jsonData, 'received');
        } catch (e) {
            appendMessage('服务器', data, 'received');
        }
    };
    ws.onclose = function(event) {
        updateStatus(false);
        appendMessage('系统', `连接已关闭 (${event.code})`, 'system');
        stopHeartbeat();
        // Reconnect automatically unless this was a normal closure (code 1000).
        if (event.code !== 1000) {
            reconnect();
        }
    };
    ws.onerror = function(error) {
        console.error('WebSocket错误:', error);
        appendMessage('系统', '连接发生错误', 'system');
    };
}
/**
 * Sends the trimmed text of the input box; empty input is ignored and a
 * missing connection is reported in the message area.
 */
function sendMessage() {
    const box = document.getElementById('messageInput');
    const text = box.value.trim();
    if (!text) {
        return;
    }
    const isOpen = ws && ws.readyState === WebSocket.OPEN;
    if (!isOpen) {
        appendMessage('系统', '连接未建立,请重试', 'system');
        return;
    }
    ws.send(text);
    appendMessage('我', text, 'sent');
    box.value = '';
}
/**
 * Closes the connection on user request and stops all background timers.
 */
function disconnect() {
    // Cancel a pending automatic reconnect; otherwise the page would
    // reconnect on its own right after a manual disconnect.
    if (reconnectTimer) {
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
    }
    if (ws) {
        // Code 1000 marks a normal closure, which the close handler does not retry.
        ws.close(1000, '用户主动断开');
        ws = null;
    }
    stopHeartbeat();
    updateStatus(false);
}
/**
 * Schedules one reconnect attempt in five seconds unless one is already pending.
 */
function reconnect() {
    if (reconnectTimer) return;
    appendMessage('系统', '5秒后自动重连...', 'system');
    reconnectTimer = setTimeout(() => {
        // Mark the timer as consumed before connecting. If the attempt fails,
        // the close handler calls reconnect() again, and a stale timer id
        // would block every further retry.
        reconnectTimer = null;
        appendMessage('系统', '正在重连...', 'system');
        connect();
    }, 5000);
}
/**
 * Starts sending a 'ping' text frame every 30 seconds while the socket is open.
 */
function startHeartbeat() {
    // Clear a still-running interval first so repeated calls never stack timers.
    if (heartbeatTimer) {
        clearInterval(heartbeatTimer);
    }
    heartbeatTimer = setInterval(() => {
        if (ws && ws.readyState === WebSocket.OPEN) {
            ws.send('ping');
        }
    }, 30000);
}
/**
 * Stops the heartbeat interval if one is running.
 */
function stopHeartbeat() {
    if (!heartbeatTimer) {
        return;
    }
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
}
/**
 * Updates the status dot and label in the header to reflect the connection state.
 */
function updateStatus(connected) {
    const dot = document.getElementById('statusIndicator');
    const label = document.getElementById('connectionStatus');
    dot.className = connected ? 'status connected' : 'status disconnected';
    label.textContent = connected ? '已连接' : '未连接';
    label.style.color = connected ? '#52c41a' : '#ff4d4f';
}
/**
 * Appends one entry (sender, content, time) to the message area and scrolls
 * to the bottom. Object content is shown as its JSON text.
 */
function appendMessage(sender, content, type) {
    const area = document.getElementById('messageArea');
    const div = document.createElement('div');
    div.className = `message-item ${type}`;
    const time = new Date().toLocaleTimeString();

    // Build the entry with textContent instead of innerHTML: content comes
    // from the server or the user and must never be interpreted as HTML.
    const senderEl = document.createElement('strong');
    senderEl.textContent = `${sender}:`;
    const contentEl = document.createElement('span');
    contentEl.textContent = typeof content === 'object' ? JSON.stringify(content) : content;
    const timeEl = document.createElement('span');
    timeEl.className = 'message-time';
    timeEl.textContent = time;

    // The single-space text nodes keep the spacing the former template produced.
    div.append(senderEl, document.createTextNode(' '), contentEl,
        document.createTextNode(' '), timeEl);
    area.appendChild(div);
    area.scrollTop = area.scrollHeight;
}
/**
 * Sends the current message when the Enter key is pressed in the input box.
 */
function handleKeyPress(event) {
    if (event.key !== 'Enter') {
        return;
    }
    sendMessage();
}
// 页面加载时自动连接
window.onload = () => {
    connect();
};
// Close the connection when the page is being left.
window.onbeforeunload = () => {
    disconnect();
};
</script>
</body>
</html>
这个案例提供了两种实现方式:
- 标准Java WebSocket:适合独立部署的Java应用
- Spring Boot集成:适合微服务架构,提供了更完善的功能
核心功能包括:
- 连接管理和心跳检测
- 点对点推送和群发
- 自动重连机制
- 实时数据推送
- REST API接口支持手动触发推送
建议使用Spring Boot方案,因为它可以更好地与现有Spring项目集成,提供更丰富的功能和更好的扩展性。