热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

WebSocket实时推送消息

一个小型项目,但是中间涉及到一个gps定位推送的需求,找了一些资料,在此做一个小结1.需要做到实时后台推送到前端2.前端自动关闭或者主动关闭都不影响下次正常上线推送在线客户端:ht

一个小型项目,但是中间涉及到一个gps定位推送的需求,找了一些资料,在此做一个小结

1. 需要做到实时后台推送到前端


2.前端自动关闭或者主动关闭都不影响下次正常上线推送

在线客户端:http://www.websocket-test.com/

中间需要推送的数据存储到redis中了,因项目的不同可以做相对于的调整。

3.定时异步操作

后台通信处理,涉及连接客户端后对接的消息处理

gps
type=2&013320397086
type=2&013502892975
type=2&015391619781
type=2&013344952593
type=2&013798501105
013798501105
000000000000
alarmData
type=1&014092603014
type=1&015391619781
type=1&014092603008
type=1&014080602002

业务代码如下: 

package exsun.bigdata.hbasestorage.websocket;
import com.alibaba.dubbo.config.annotation.Service;
import exsun.bigdata.hbasestorage.config.RedisConfig;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Service
public class WsServer extends WebSocketServer {
private static final Logger logger = LoggerFactory.getLogger(WsServer.class);
public static Long DEVICE_ALARM_SLEEP_TIME = 10L;
public static Long GPS_SLEEP_TIME = 5L;
//initialDelay是说系统启动后,需要等待多久才开始执行
public static Long INIT_DELAY = 1L;
public static String DeviceAlarmtableName = "DeviceAlarmHbaseFactory";
public static String GpsDatatableName = "GpsDataStorageFactory";
private static final Map GpsDataMap = new HashMap();
private static final Map DeviceAlarmMap = new HashMap();
private static RedisConfig redisCOnfig= new RedisConfig();
private static int userNum = 0;
public WsServer(int port) {
super(new InetSocketAddress(port));
}
public WsServer(InetSocketAddress address) {
super(address);
}
public static void setRedisConfig(RedisConfig redis) {
redisCOnfig= redis;
}
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
// ws连接的时候触发的代码,onOpen中我们不做任何操作
logger.info(" onOpen start: ");
WsPool.addUser(userNum+"",conn);
userNum++;
}
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
//断开连接时候触发代码
userLeave(conn);
userNum--;
Future Gpsfuture = GpsDataMap.get(conn);
if (null != conn && null != Gpsfuture) {
Gpsfuture.cancel(true);
}
Future Devicfuture = DeviceAlarmMap.get(conn);
if (null != conn && null != Devicfuture) {
Devicfuture.cancel(true);
}
}
@Override
public void onMessage(WebSocket conn, String message) {
logger.info( " onMessage: " + message);
if(message == null || "".equals(message)){
WsPool.sendMessageToUser(conn, "500");
}
String dvo= message.substring((message.lastIndexOf("&")>0) ? (message.lastIndexOf("&")+1):message.length());
String type=message.substring(0,((message.indexOf("&")>0) ? message.indexOf("&"): message.length()));
if (("type=1").equals(type)) {//告警
//创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
//参数:corePoolSize - 池中所保存的线程数,即使线程是空闲的也包括在内。
//返回:新创建的安排线程池
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
// 从现在开始1秒钟之后,每隔10秒钟执行一次告警推送
Future future = service.scheduleAtFixedRate(new DynamicTiminTasks(conn,dvo,redisConfig,DeviceAlarmtableName), INIT_DELAY, DEVICE_ALARM_SLEEP_TIME, TimeUnit.SECONDS);
DeviceAlarmMap.put(conn,future);
} else if (("type=2").equals(type)) {//gps实时数据
//以固定周期频率执行任务 创建一个线程执行
ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
// 从现在开始1秒钟之后,每隔5秒钟执行一次GPS推送
Future future = service.scheduleAtFixedRate(new DynamicTiminTasks(conn,dvo,redisConfig,GpsDatatableName), INIT_DELAY, GPS_SLEEP_TIME, TimeUnit.SECONDS);
GpsDataMap.put(conn,future);
}
}
@Override
public void onError(WebSocket conn, Exception ex) {
//错误时候触发的代码
logger.error(" onError on error");
ex.printStackTrace();
}
/**
* 去除掉失效的websocket链接
* @param conn
*/
private void userLeave(WebSocket conn){
WsPool.removeUser(conn);
userNum--;
Future Gpsfuture = GpsDataMap.get(conn);
if (null != conn && null != Gpsfuture) {
Gpsfuture.cancel(true);
}
Future Devicfuture = DeviceAlarmMap.get(conn);
if (null != conn && null != Devicfuture) {
Devicfuture.cancel(true);
}
}
/**
* 将websocket加入用户池
* @param conn
* @param userName
*/
private void userJoin(WebSocket conn,String userName){
WsPool.addUser(userName, conn);
userNum++;
}
public static void start(int port){
WsServer s = new WsServer(port);
s.start();
}
//推送类型&推送设备编号
public static void main1111(String[] args) {
WsServer.start(8887);
}
}

实体发送如下:

package exsun.bigdata.hbasestorage.websocket;
import org.java_websocket.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class WsPool {
private static final Logger logger = LoggerFactory.getLogger(WsPool.class);
private static final Map wsUserMap = new HashMap();
public static final Integer OK = 0;
public static final Integer ERROR = 1;
/**
* 通过websocket连接获取其对应的用户
*
* @param conn
* @return
*/
public static String getUserByWs(WebSocket conn) {
return wsUserMap.get(conn);
}
/**
* 根据userName获取WebSocket,这是一个list,此处取第一个
* 因为有可能多个websocket对应一个userName(但一般是只有一个,因为在close方法中,我们将失效的websocket连接去除了)
*
* @param
*/
public static WebSocket getWsByUser(String userName) {
Set keySet = wsUserMap.keySet();
synchronized (keySet) {
for (WebSocket conn : keySet) {
String cuser = wsUserMap.get(conn);
if (cuser.equals(userName)) {
return conn;
}
}
}
return null;
}
/**
* 向连接池中添加连接
*
* @param
*/
public static void addUser(String userName, WebSocket conn) {
wsUserMap.put(conn, userName); // 添加连接
}
/**
* 获取所有连接池中的用户,因为set是不允许重复的,所以可以得到无重复的user数组
*
* @return
*/
public static Collection getOnlineUser() {
List setUsers = new ArrayList();
Collection setUser = wsUserMap.values();
for (String u : setUser) {
setUsers.add(u);
logger.error(" getOnlineUser setUser: " + setUser);
}
return setUsers;
}
/**
* 移除连接池中的连接
*
* @param
*/
public static boolean removeUser(WebSocket conn) {
if (wsUserMap.containsKey(conn)) {
wsUserMap.remove(conn); // 移除连接
return true;
} else {
return false;
}
}
/**
* 向特定的用户发送数据
*
* @param
* @param message
*/
public static Integer sendMessageToUser(WebSocket conn, String message) {
if (null != conn && null != wsUserMap.get(conn)) {
logger.info(" sendMessageToUser message: " + message);
conn.send(message);
return OK;
}else {
logger.error(" sendMessageToUser conn is null");
return ERROR;
}
}
/**
* 向所有的用户发送消息
*
* @param message
*/
public static void sendMessageToAll(String message) {
Set keySet = wsUserMap.keySet();
synchronized (keySet) {
for (WebSocket conn : keySet) {
String user = wsUserMap.get(conn);
if (user != null) {
logger.info(" sendMessageToAll message: " + message);
conn.send(message);
}
}
}
}
}

定时任务异步处理如下:

package exsun.bigdata.hbasestorage.websocket;
import exsun.bigdata.bigdatadata.entity.DeviceAlarmData;
import exsun.jt808.data.upload_data.GpsData;
import exsun.bigdata.hbasestorage.config.RedisConfig;
import org.java_websocket.WebSocket;
import org.springframework.data.redis.core.HashOperations;
class DynamicTiminTasks implements Runnable {
private WebSocket conn;
private String dvo;
private RedisConfig redisConfig;
private String sendType;
DynamicTiminTasks(WebSocket conn,String dvo,RedisConfig redisConfig,String sendType) {
this.cOnn= conn;
this.dvo = dvo;
this.redisCOnfig= redisConfig;
this.sendType = sendType;
}
@Override
public void run() {
if (sendType.equals(WsServer.GpsDatatableName)) {
HashOperations hashOperatiOns= redisConfig.redisTemplate().opsForHash();
GpsData gpsData = hashOperations.get(WsServer.GpsDatatableName, dvo);
String gpsdataStr = "GpsData(lat:" + gpsData.getLat() + '\'' + "lng:" + gpsData.getLng() + '\'' + " hgt=" + gpsData.getHgt() + '\'' +
"spd=" + gpsData.getSpd() + '\'' + ", dre=" + gpsData.getDre() + '\'' + ", gte=" + gpsData.getGte() + '\'' + ", gdt=" + gpsData.getGdt() + '\'' +
", mie=" + gpsData.getMie() + '\'' + ", poi=" + gpsData.getPoi() + '\'' + ", alm=" + gpsData.getAlm() + '\'' + ", amm=" + gpsData.getAmm() + '\'' +
", sts=" + gpsData.getSts() + '\'' + ", stm=" + gpsData.getStm() + '\'' + ", pop=" + gpsData.getPop() + '\'' + ", cnt=" + gpsData.getCnt() + '\'' +
", Ons=" + gpsData.getOns() + ", dvo=" + gpsData.getDvo() + ")";
WsPool.sendMessageToUser(conn, gpsdataStr);
}else if (sendType.equals(WsServer.DeviceAlarmtableName)) {
HashOperations hashOperatiOns= redisConfig.redisTemplate().opsForHash();
DeviceAlarmData deviceAlarmData = hashOperations.get(WsServer.DeviceAlarmtableName, dvo);
WsPool.sendMessageToUser(conn, deviceAlarmData.toString());
}
}
}

 代码中带有部分业务属性,有需要的朋友可以结合实际情况进一步简化操作!



推荐阅读
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
  • 本文详细介绍了在 Android 7.1 系统中调整屏幕分辨率和默认音量设置的方法。针对系统默认音量过大的问题,提供了具体的步骤来降低系统、铃声、媒体和闹钟的默认音量,以提升用户体验。此外,还涵盖了如何通过系统设置或使用第三方工具来优化屏幕分辨率,确保设备显示效果更加清晰和流畅。 ... [详细]
  • V8不仅是一款著名的八缸发动机,广泛应用于道奇Charger、宾利Continental GT和BossHoss摩托车中。自2008年以来,作为Chromium项目的一部分,V8 JavaScript引擎在性能优化和技术创新方面取得了显著进展。该引擎通过先进的编译技术和高效的垃圾回收机制,显著提升了JavaScript的执行效率,为现代Web应用提供了强大的支持。持续的优化和创新使得V8在处理复杂计算和大规模数据时表现更加出色,成为众多开发者和企业的首选。 ... [详细]
  • 线程能否先以安全方式获取对象,再进行非安全发布? ... [详细]
  • 本文详细探讨了OpenCV中人脸检测算法的实现原理与代码结构。通过分析核心函数和关键步骤,揭示了OpenCV如何高效地进行人脸检测。文章不仅提供了代码示例,还深入解释了算法背后的数学模型和优化技巧,为开发者提供了全面的理解和实用的参考。 ... [详细]
  • 本文详细解析了JSONP(JSON with Padding)的跨域机制及其工作原理。JSONP是一种通过动态创建``标签来实现跨域请求的技术,其核心在于利用了浏览器对``标签的宽松同源策略。文章不仅介绍了JSONP的产生背景,还深入探讨了其具体实现过程,包括如何构造请求、服务器端如何响应以及客户端如何处理返回的数据。此外,还分析了JSONP的优势和局限性,帮助读者全面理解这一技术在现代Web开发中的应用。 ... [详细]
  • 如何在Linux系统上部署MySQL 5.7.28
    本文详细介绍了在Linux系统上部署MySQL 5.7.28的具体步骤。通过官方下载页面获取最新安装包后,按照提供的指南进行配置和安装。文章内容实用性强,适合初学者和有经验的管理员参考。 ... [详细]
  • 本文整理了Java中org.jboss.netty.buffer.ChannelBuffer.readUnsignedInt()方法的一些代码示例,展示了C ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 在C#中开发MP3播放器时,我正在考虑如何高效存储元数据以便快速检索。选择合适的数据结构,如字典或数组,对于优化性能至关重要。字典能够提供快速的键值对查找,而数组则在连续存储和遍历方面表现优异。根据具体需求,合理选择数据结构将显著提升应用的响应速度和用户体验。 ... [详细]
  • FastDFS Nginx 扩展模块的源代码解析与技术剖析
    FastDFS Nginx 扩展模块的源代码解析与技术剖析 ... [详细]
  • 在Kubernetes上部署多个Mitmproxy代理服务器以实现高效流量管理 ... [详细]
  • 解决MySQL 5.1服务器无法正确识别中文字符的问题
    在使用MySQL 5.1服务器时,可能会遇到无法正确识别中文字符的问题。由于相关资料较少且不够全面,本文将详细阐述解决方案。首先,需要检查MySQL的配置文件,确保字符集设置正确,并通过命令行工具验证当前的字符编码配置。此外,建议更新到最新版本以避免此类问题。 ... [详细]
  • 小记hbase数据库java API 常用方法及案例
    HBaseAdmin类:管理hbase数据库的表信息,‘创建表、删除表、列出表选项、使表有效/无效、添加或删除列簇’;  ... [详细]
  • 本文整理了Java中org.apache.hadoop.hbase.security.visibility.Authorizations.<init>()方 ... [详细]
author-avatar
霸气小米鱼鱼_156
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有