如何用Java案例实现WebSocket推送?

wen java案例 1

本文目录导读:

如何用Java案例实现WebSocket推送?

  1. 方案一:使用Java标准库实现
  2. 方案二:使用Spring Boot实现(推荐)

下面提供一个完整的Java WebSocket推送案例,分别使用Java标准WebSocket API(JSR 356,即javax.websocket)和Spring Boot两种方式实现。

方案一:使用Java标准库实现

添加依赖(Maven)

<!-- JSR 356 WebSocket API: provides the javax.websocket annotations and interfaces used by the endpoint class. -->
<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
    <version>1.1</version>
    <!-- "provided" scope: compile against the API only. NOTE(review): this presumes the deployment container supplies the implementation at runtime; confirm for the target server. -->
    <scope>provided</scope>
</dependency>

WebSocket服务端

package com.example.websocket;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Annotated WebSocket endpoint mapped to {@code /websocket}.
 *
 * <p>Every connected client is represented by one instance of this class, kept in a
 * static registry so that messages can be broadcast to all of them. Text received from
 * any client is echoed to every registered client.
 *
 * <p>Sends to a single client are serialized through a per-instance lock, because
 * {@code RemoteEndpoint.Basic} may throw {@link IllegalStateException} when two threads
 * send on the same session at the same time (which happens as soon as two clients
 * trigger a broadcast concurrently).
 */
@ServerEndpoint("/websocket")
public class WebSocketServer {
    /** Number of endpoint instances currently held in {@link #webSocketSet}. */
    private static final AtomicInteger onlineCount = new AtomicInteger(0);
    /** Registry of the endpoint instances of all connected clients. */
    private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet =
        new CopyOnWriteArraySet<>();
    /** Guards blocking sends on this instance's session. */
    private final Object sendLock = new Object();
    /** Session of the client served by this instance; assigned in {@link #onOpen}. */
    private Session session;

    /**
     * Registers the new connection and greets the client.
     *
     * @param session the session of the client that just connected
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        // Count only real insertions so the counter cannot drift from the registry.
        if (webSocketSet.add(this)) {
            onlineCount.incrementAndGet();
        }
        System.out.println("新连接加入!当前在线人数:" + onlineCount.get());
        // Send the welcome message.
        sendMessage("欢迎连接到WebSocket服务!");
    }

    /**
     * Unregisters this connection.
     */
    @OnClose
    public void onClose() {
        // Count only real removals so the counter cannot drift from the registry.
        if (webSocketSet.remove(this)) {
            onlineCount.decrementAndGet();
        }
        System.out.println("连接关闭!当前在线人数:" + onlineCount.get());
    }

    /**
     * Echoes a received text message to every connected client.
     *
     * @param message the text sent by the client
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("来自客户端的消息:" + message);
        // Broadcast to everyone, including the sender.
        for (WebSocketServer item : webSocketSet) {
            item.sendMessage("服务端回复:" + message);
        }
    }

    /**
     * Logs an error reported for this connection.
     *
     * @param session the session on which the error occurred
     * @param error   the reported error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("发生错误");
        error.printStackTrace();
    }

    /**
     * Sends a text message to the client served by this instance.
     *
     * <p>The call is a no-op when the session has not been assigned yet or is no longer
     * open. Failures are logged and not propagated, so that one broken client cannot
     * abort a broadcast loop.
     *
     * @param message the text to send
     */
    public void sendMessage(String message) {
        Session target = this.session;
        if (target == null || !target.isOpen()) {
            return;
        }
        try {
            // Serialize sends: concurrent sendText calls on one session are not allowed.
            synchronized (sendLock) {
                target.getBasicRemote().sendText(message);
            }
        } catch (IOException | IllegalStateException e) {
            // IllegalStateException covers the session closing between the check and the send.
            e.printStackTrace();
        }
    }

    /**
     * Sends a text message to every connected client.
     *
     * @param message the text to broadcast
     */
    public static void sendToAll(String message) {
        System.out.println("群发消息给所有客户端:" + message);
        for (WebSocketServer item : webSocketSet) {
            item.sendMessage(message);
        }
    }

    /**
     * Returns the number of currently registered connections.
     *
     * @return the online count
     */
    public static int getOnlineCount() {
        return onlineCount.get();
    }
}

WebSocket配置类

package com.example.websocket;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
public class WebSocketConfigurator extends Configurator {
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, 
                                javax.websocket.HandshakeRequest request,
                                javax.websocket.HandshakeResponse response) {
        // 可以在这里添加握手时的自定义逻辑
        System.out.println("WebSocket握手配置");
    }
}

测试客户端(HTML页面)

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>WebSocket测试客户端</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        #message { width: 300px; height: 100px; }
        #output { border: 1px solid #ccc; padding: 10px; height: 300px; overflow-y: auto; }
    </style>
</head>
<body>
    <h2>WebSocket测试</h2>
    <div>
        <input type="text" id="msgInput" placeholder="输入消息" />
        <button onclick="sendMessage()">发送</button>
    </div>
    <div id="output"></div>
    <script>
        let websocket = null;
        // Open the WebSocket connection and attach the event handlers that
        // report connection state and incoming messages in the output panel.
        function connect() {
            const url = "ws://localhost:8080/websocket"; // change to the real server address
            websocket = new WebSocket(url);
            websocket.onopen = function() {
                appendMessage("连接已建立");
            };
            websocket.onmessage = function(event) {
                appendMessage("收到消息: " + event.data);
            };
            websocket.onclose = function() {
                appendMessage("连接已关闭");
            };
            websocket.onerror = function(error) {
                // The handler receives a plain Event; concatenating it would only
                // print "[object Event]", so log the object and show a clean message.
                console.error("WebSocket error:", error);
                appendMessage("发生错误");
            };
        }
        // Send the text in the input box over the socket, echo it to the
        // output panel and clear the box; report when the socket is not open.
        function sendMessage() {
            const inputBox = document.getElementById("msgInput");
            const text = inputBox.value;
            const socketReady = websocket && websocket.readyState === WebSocket.OPEN;
            if (!socketReady) {
                appendMessage("连接未建立");
                return;
            }
            websocket.send(text);
            appendMessage("发送消息: " + text);
            inputBox.value = "";
        }
        // Append one line of plain text to the output panel and keep the
        // panel scrolled to the newest entry.
        function appendMessage(msg) {
            const panel = document.getElementById("output");
            const line = Object.assign(document.createElement("p"), { textContent: msg });
            panel.appendChild(line);
            panel.scrollTop = panel.scrollHeight;
        }
        // 页面加载时连接
        window.onload = connect;
    </script>
</body>
</html>

方案二:使用Spring Boot实现(推荐)

添加Maven依赖

<!-- Spring Boot starter for WebSocket support. No version is declared here; presumably it is managed by the Spring Boot parent POM or BOM. Verify against the project's pom.xml. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

WebSocket配置类

package com.example.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration that enables WebSocket support for the application.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig {
    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * <p>NOTE(review): per the Spring documentation this bean registers
     * {@code @ServerEndpoint}-annotated beans with the WebSocket runtime of the embedded
     * container; confirm whether it is still wanted if the application is deployed as a
     * WAR to an external container.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}

WebSocket服务端(使用Spring注解)

package com.example.websocket.server;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
@Component
@ServerEndpoint("/ws/push")
public class WebSocketPushServer {
    // 存储所有连接的会话
    private static CopyOnWriteArrayList<WebSocketPushServer> clients = 
        new CopyOnWriteArrayList<>();
    private Session session;
    private String userId;
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        clients.add(this);
        System.out.println("用户 " + userId + " 连接成功,当前在线:" + clients.size());
    }
    @OnClose
    public void onClose() {
        clients.remove(this);
        System.out.println("用户 " + userId + " 断开连接,当前在线:" + clients.size());
    }
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("收到用户 " + userId + " 的消息:" + message);
        // 处理业务逻辑
        handleMessage(message);
    }
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("用户 " + userId + " 发生错误");
        error.printStackTrace();
    }
    /**
     * 发送消息给指定用户
     */
    public static void sendToUser(String userId, String message) {
        for (WebSocketPushServer client : clients) {
            if (client.userId.equals(userId)) {
                try {
                    client.session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 群发消息给所有用户
     */
    public static void sendToAll(String message) {
        for (WebSocketPushServer client : clients) {
            try {
                client.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**
     * 处理业务消息
     */
    private void handleMessage(String message) {
        // 这里添加业务逻辑处理
        if (message.startsWith("BROADCAST:")) {
            sendToAll("【广播】" + message.substring(10));
        } else {
            sendToUser(userId, "【回复】" + message);
        }
    }
}

消息推送服务接口

package com.example.websocket.service;
import org.springframework.stereotype.Service;
@Service
public class PushService {
    /**
     * 推送实时数据
     */
    public void pushRealTimeData(String dataType, Object data) {
        String message = String.format("{\"type\":\"%s\",\"data\":%s}", 
                                       dataType, data.toString());
        WebSocketPushServer.sendToAll(message);
    }
    /**
     * 推送通知给特定用户
     */
    public void pushNotification(String userId, String title, String content) {
        String message = String.format(
            "{\"type\":\"notification\",\"title\":\"%s\",\"content\":\"%s\"}", 
            title, content);
        WebSocketPushServer.sendToUser(userId, message);
    }
}

控制器接口

package com.example.websocket.controller;
import com.example.websocket.service.PushService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/push")
public class PushController {
    @Autowired
    private PushService pushService;
    /**
     * 手动触发推送
     */
    @PostMapping("/send")
    public String sendPush(@RequestParam String message) {
        WebSocketPushServer.sendToAll(message);
        return "推送成功:" + message;
    }
    /**
     * 发送给特定用户
     */
    @PostMapping("/sendToUser")
    public String sendToUser(@RequestParam String userId, 
                            @RequestParam String message) {
        WebSocketPushServer.sendToUser(userId, message);
        return "推送成功给用户 " + userId;
    }
    /**
     * 推送实时数据
     */
    @PostMapping("/realtime")
    public String pushRealTime(@RequestParam String type, 
                              @RequestParam String data) {
        pushService.pushRealTimeData(type, data);
        return "实时数据推送成功";
    }
}

增强版客户端(支持心跳检测)

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>实时推送客户端</title>
    <style>
        * { margin: 0; padding: 0; box-sizing: border-box; }
        body { font-family: 'Microsoft YaHei', sans-serif; padding: 20px; background: #f5f5f5; }
        .container { max-width: 600px; margin: 0 auto; }
        .header { background: #1890ff; color: white; padding: 15px; border-radius: 8px 8px 0 0; }
        .status { 
            display: inline-block; 
            width: 10px; 
            height: 10px; 
            border-radius: 50%; 
            margin-right: 10px;
        }
        .connected { background: #52c41a; }
        .disconnected { background: #ff4d4f; }
        .chat-area {
            background: white;
            border: 1px solid #e8e8e8;
            border-top: none;
            padding: 15px;
        }
        .message-area {
            height: 400px;
            overflow-y: auto;
            margin-bottom: 15px;
            padding: 10px;
            background: #fafafa;
            border-radius: 4px;
        }
        .input-area { display: flex; gap: 10px; }
        .message-input {
            flex: 1;
            padding: 10px;
            border: 1px solid #d9d9d9;
            border-radius: 4px;
        }
        .btn {
            padding: 10px 20px;
            background: #1890ff;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
        }
        .btn:hover { background: #40a9ff; }
        .btn-danger { background: #ff4d4f; }
        .btn-danger:hover { background: #ff7875; }
        .message-item {
            margin: 8px 0;
            padding: 8px 12px;
            border-radius: 4px;
            background: #e6f7ff;
            border: 1px solid #91d5ff;
        }
        .message-item.received { background: #f6ffed; border-color: #b7eb8f; }
        .message-item.sent { background: #e6f7ff; border-color: #91d5ff; }
        .message-time {
            float: right;
            color: #999;
            font-size: 12px;
        }
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <span class="status" id="statusIndicator"></span>
            <span id="connectionStatus">未连接</span>
            <span style="float: right;" id="userId">用户ID: </span>
        </div>
        <div class="chat-area">
            <div class="message-area" id="messageArea"></div>
            <div class="input-area">
                <input type="text" class="message-input" id="messageInput" 
                       placeholder="输入消息..." onkeypress="handleKeyPress(event)">
                <button class="btn" onclick="sendMessage()">发送</button>
                <button class="btn btn-danger" onclick="disconnect()">断开</button>
            </div>
        </div>
    </div>
    <script>
        // Shared client state: the active socket plus the heartbeat and reconnect timer handles.
        let ws = null;
        let heartbeatTimer = null;
        let reconnectTimer = null;
        // Random per-page id (up to 9 base-36 characters); slice replaces the deprecated substr.
        let userId = 'user_' + Math.random().toString(36).slice(2, 11);
        document.getElementById('userId').textContent = '用户ID: ' + userId;
        /**
         * Open the push socket for this page's userId and attach its handlers.
         * Does nothing when a socket is already open or still connecting.
         */
        function connect() {
            const url = `ws://localhost:8080/ws/push/${userId}`;
            // Also bail out while CONNECTING; otherwise a second call during the
            // handshake would create a duplicate socket and orphan the first one.
            if (ws && (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING)) {
                return;
            }
            ws = new WebSocket(url);
            ws.onopen = function() {
                updateStatus(true);
                appendMessage('系统', '连接已建立', 'system');
                startHeartbeat();
                // A successful connection cancels any reconnect that is still pending.
                if (reconnectTimer) {
                    clearTimeout(reconnectTimer);
                    reconnectTimer = null;
                }
            };
            ws.onmessage = function(event) {
                const data = event.data;
                try {
                    // Show parsed JSON when possible, otherwise the raw text.
                    const jsonData = JSON.parse(data);
                    appendMessage('服务器', jsonData, 'received');
                } catch (e) {
                    appendMessage('服务器', data, 'received');
                }
            };
            ws.onclose = function(event) {
                updateStatus(false);
                appendMessage('系统', `连接已关闭 (${event.code})`, 'system');
                stopHeartbeat();
                // Reconnect automatically unless this was a normal closure (code 1000).
                if (event.code !== 1000) {
                    reconnect();
                }
            };
            ws.onerror = function(error) {
                console.error('WebSocket错误:', error);
                appendMessage('系统', '连接发生错误', 'system');
            };
        }
        /**
         * Send the trimmed input text over the socket and echo it in the
         * message list; blank input is ignored, a closed socket is reported.
         */
        function sendMessage() {
            const box = document.getElementById('messageInput');
            const text = box.value.trim();
            if (text) {
                const socketReady = ws && ws.readyState === WebSocket.OPEN;
                if (!socketReady) {
                    appendMessage('系统', '连接未建立,请重试', 'system');
                    return;
                }
                ws.send(text);
                appendMessage('我', text, 'sent');
                box.value = '';
            }
        }
        /**
         * Close the connection on the user's request (normal closure, code 1000)
         * and reset the heartbeat, any pending reconnect and the status display.
         */
        function disconnect() {
            // Cancel a scheduled reconnect; otherwise a manual disconnect made
            // while a retry is pending would be undone when the timer fires.
            if (reconnectTimer) {
                clearTimeout(reconnectTimer);
                reconnectTimer = null;
            }
            if (ws) {
                ws.close(1000, '用户主动断开');
                ws = null;
            }
            stopHeartbeat();
            updateStatus(false);
        }
        /**
         * Schedule a single reconnect attempt in 5 seconds.
         * Does nothing when an attempt is already scheduled.
         */
        function reconnect() {
            if (reconnectTimer) return;
            appendMessage('系统', '5秒后自动重连...', 'system');
            reconnectTimer = setTimeout(() => {
                // The timer has fired, so release the handle before connecting.
                // Otherwise a failed attempt (close without open) would find the
                // handle still set and never schedule another retry.
                reconnectTimer = null;
                appendMessage('系统', '正在重连...', 'system');
                connect();
            }, 5000);
        }
        /**
         * Start sending a 'ping' text frame every 30 seconds while the socket is open.
         */
        function startHeartbeat() {
            // Drop a timer left over from an earlier call so intervals never
            // stack up and the old handle is not leaked.
            if (heartbeatTimer) {
                clearInterval(heartbeatTimer);
            }
            heartbeatTimer = setInterval(() => {
                if (ws && ws.readyState === WebSocket.OPEN) {
                    ws.send('ping');
                }
            }, 30000);
        }
        /**
         * Stop the heartbeat interval, if one is running.
         */
        function stopHeartbeat() {
            if (!heartbeatTimer) {
                return;
            }
            clearInterval(heartbeatTimer);
            heartbeatTimer = null;
        }
        /**
         * Reflect the connection state in the header: indicator dot class,
         * status label text and label colour.
         */
        function updateStatus(connected) {
            const dot = document.getElementById('statusIndicator');
            const label = document.getElementById('connectionStatus');
            dot.className = connected ? 'status connected' : 'status disconnected';
            label.textContent = connected ? '已连接' : '未连接';
            label.style.color = connected ? '#52c41a' : '#ff4d4f';
        }
        /**
         * Append one entry to the message list and scroll it into view.
         *
         * @param sender  label shown in bold before the message
         * @param content message text, or an object that is shown as JSON
         * @param type    extra CSS class for the entry ('sent', 'received', 'system')
         */
        function appendMessage(sender, content, type) {
            const area = document.getElementById('messageArea');
            const div = document.createElement('div');
            div.className = `message-item ${type}`;
            const time = new Date().toLocaleTimeString();

            // Build the entry from DOM nodes with textContent instead of innerHTML:
            // content comes from the server or the user and must never be parsed as
            // HTML, otherwise a message containing markup could run script in the page.
            const senderEl = document.createElement('strong');
            senderEl.textContent = `${sender}:`;
            const contentEl = document.createElement('span');
            contentEl.textContent = typeof content === 'object' ? JSON.stringify(content) : content;
            const timeEl = document.createElement('span');
            timeEl.className = 'message-time';
            timeEl.textContent = time;
            div.append(senderEl, ' ', contentEl, ' ', timeEl);

            area.appendChild(div);
            area.scrollTop = area.scrollHeight;
        }
        /**
         * Key handler for the message input: pressing Enter sends the message.
         */
        function handleKeyPress(event) {
            const isEnter = event.key === 'Enter';
            if (!isEnter) {
                return;
            }
            sendMessage();
        }
        // Connect automatically once the page has loaded.
        window.onload = () => {
            connect();
        };
        // Close the connection when the page is being unloaded.
        window.onbeforeunload = () => {
            disconnect();
        };
    </script>
</body>
</html>

这个案例提供了两种实现方式:

  1. 标准Java WebSocket:适合独立部署的Java应用
  2. Spring Boot集成:适合微服务架构,提供了更完善的功能

核心功能包括:

  • 连接管理和心跳检测
  • 点对点推送和群发
  • 自动重连机制
  • 实时数据推送
  • REST API接口支持手动触发推送

建议使用Spring Boot方案,因为它可以更好地与现有Spring项目集成,提供更丰富的功能和更好的扩展性。

抱歉,评论功能暂时关闭!