作者:o.o | 来源:互联网 | 2023-08-01 13:02
我提倡的是用白话文,和经验去传承分享程序,让程序更加开源话,共享化,让后来者,初学者,遇到此类困难者等,少走弯路,提高效率,不喜勿喷。有更好的建议,可以留言探讨。。。。
首先说下此案例当时做的时候有点复杂,最后还是克服完成。拿出来广大网友分享,但愿能帮助你在java程序的道路上越走越远。。。
背景:
客户通过网页,在通过websocket协议,和Java后端创建连接,在通过Java后端和语音交互接口创建连接,接收到的返回数据,返回给前端在页面展示出来。Java就是做了一个中间件的作用。即是服务端又是客户端。
- 说完背景,直接上干货
org.java-websocketJava-WebSocket1.4.0org.springframework.bootspring-boot-starter-websocket
package com.datago.robot.common.utils;import com.datago.robot.entity.DatagoApiWithBLOBs;
import com.datago.robot.service.DatagoApiService;
import org.java_websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;/****/
@ServerEndpoint("/serverVue/{apiCode}")
@Component
public class SocketServerUtils {private Logger log = LoggerFactory.getLogger(SocketServerUtils.class);private Session session;private WebSocketClient client;private String apiCode;/*** 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/private static int OnlineCount= 0;/*** concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。* 在外部可以获取此连接的所有websocket对象,并能对其触发消息发送功能,我们的定时发送核心功能的实现在与此变量*/private static CopyOnWriteArraySet webSocketMap = new CopyOnWriteArraySet();private static DatagoApiService datagoApiService;@Autowiredpublic void setDatagoApiService(DatagoApiService datagoApiService) {SocketServerUtils.datagoApiService = datagoApiService;}/*** 建立连接** @param apiCode* @param session*/@OnOpenpublic void onOpen(@PathParam(value = "apiCode") String apiCode, Session session) {this.session = session;addOnlineCount();webSocketMap.add(this);//连接调用第三方服务client = SocketResultUtils.getClient("ws://192.168.0.109:8013/websocket");}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的音频数据*/@OnMessagepublic void onMessage(@PathParam(value = "apiCode") String apiCode, byte[] message) {log.info("音频数据报文:" + message);try {//在线client.send(message);ExceptionLog.isSuccess(apiCode, null,null);} catch (Exception e) {//离线isconnct(apiCode);client.send(message);ExceptionLog.isSuccess(apiCode, null,null);}}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的字符数据*/@OnMessagepublic void onMessage(@PathParam(value = "apiCode") String apiCode, String message) {log.info("字符数据报文:" + message);try {//在线client.send(message);ExceptionLog.isSuccess(apiCode, message,null);} catch (Exception e) {//离线isconnct(apiCode);client.send(message);ExceptionLog.isSuccess(apiCode, message,null);}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose(@PathParam(value = "apiCode") String apiCode) {//请求第三方关闭连接webSocketMap.remove(this);subOnlineCount();log.info("有一连接关闭!当前在线人数为" + getOnlineCount());}/*** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error, @PathParam(value = "apiCode") String apiCode) {log.error("错误原因:" + error.getMessage());}/*** 新加* 判断连接(在线,离线)** @param apiCode*/private void isconnct(String apiCode) {DatagoApiWithBLOBs datagoApi = datagoApiService.selectByApiCode(apiCode);if (Utils.isNotEmpty(datagoApi)) {//判断在线(true),离线(false)// 在线String apiUrl = datagoApi.getApiUrl();if (apiUrl.startsWith("ws")) {client = SocketResultUtils.getClient(apiUrl);if (Utils.isEmpty(client)) {//离线String apiOffUrl = datagoApi.getApiOffUrl();if (apiOffUrl.startsWith("ws")) {client = SocketResultUtils.getClient(apiOffUrl);if (Utils.isEmpty(client)) {ExceptionLog.isConnect(apiCode, null);}} else {ExceptionLog.isUrl(apiCode, null);}}} else {ExceptionLog.isUrl(apiCode, null);}}}/*** 实现服务器主动推送** @param message*/public void sendMessage(String message) {try {this.session.getBasicRemote().sendText(message);} catch (IOException e) {e.printStackTrace();}}public static synchronized int getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {SocketServerUtils.onlineCount++;}public static synchronized void subOnlineCount() {SocketServerUtils.onlineCount--;}public static CopyOnWriteArraySet getWebSocketSet() {return webSocketMap;}public static void setWebSocketSet(CopyOnWriteArraySet webSocketSet) {SocketServerUtils.webSocketMap = webSocketSet;}}
- Java与第三方服务交互代码(客户端),接收第三方服务的返回数据
package com.datago.robot.common.utils;import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.net.*;import java.util.concurrent.CopyOnWriteArraySet;@Slf4j
@Component
public class SocketResultUtils {public static Logger log = LoggerFactory.getLogger(SocketResultUtils.class);/*** 获取客户端连接实例** @param uri* @return*/public static WebSocketClient getClient(String uri) {try {//创建客户端连接对象WebSocketClient client = new WebSocketClient(new URI(uri), new Draft_6455()) {/*** 建立连接调用* @param serverHandshake*/@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("建立连接");}/*** 收到服务端消息调用* @param s*/@Overridepublic void onMessage(String s) {log.info("处理结束返回消息:" + s);//接收第三方websocket服务发送数据CopyOnWriteArraySet webSocketSet =SocketServerUtils.getWebSocketSet();if (Utils.isNotEmpty(webSocketSet)) {int i = 0;synchronized (webSocketSet) {webSocketSet.forEach(c -> {//返回前端数据log.info("返回结果数据, data = {}", s);for (int j = 0; j <10; j++) {c.sendMessage(s);}});log.info("收到来自服务端的消息:::" + s);}}}/*** 断开连接调用* @param i* @param s* @param b*/@Overridepublic void onClose(int i, String s, boolean b) {log.info("关闭连接:::" + "i = " + i + ":::s = " + s + ":::b = " + b);}/*** 连接报错调用* @param e*/@Overridepublic void onError(Exception e) {log.error("报错了:::" + e.getMessage());}};//请求与服务端建立连接client.connect();//判断连接状态,0为请求中 1为已建立 其它值都是建立失败while (client.getReadyState().ordinal() == 0) {try {Thread.sleep(200);} catch (Exception e) {log.warn("延迟操作出现问题,但并不影响功能");}log.info("连接中。。。");break;}//连接状态不再是0请求中,判断建立结果是不是1已建立if (client.getReadyState().ordinal() == 1) {return client;}} catch (URISyntaxException e) {log.error(e.getMessage());}return null;}
}