热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

SpringBoot+netty-socketio实现服务器端消息推送

这篇文章主要介绍了SpringBoot+netty-socketio实现服务器端消息推送,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

首先:因为工作需要,需要对接socket.io框架对接,所以目前只能使用netty-socketio。websocket是不支持对接socket.io框架的。

netty-socketio顾名思义他是一个底层基于netty'实现的socket。

在springboot项目中的集成,请看下面的代码

maven依赖


 com.corundumstudio.socketio
 netty-socketio
 1.7.11

 下面就是代码了

首先是配置参数

#socketio配置
socketio:
 host: localhost
 port: 9099
 # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
 maxFramePayloadLength: 1048576
 # 设置http交互最大内容长度
 maxHttpContentLength: 1048576
 # socket连接数大小(如只监听一个端口boss线程组为1即可)
 bossCount: 1
 workCount: 100
 allowCustomRequests: true
 # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
 upgradeTimeout: 1000000
 # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
 pingTimeout: 6000000
 # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
 pingInterval: 25000

上面的注释写的很清楚。下面是config代码

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * kcm
 */
@Component
public class PushServer implements InitializingBean {

  @Autowired
  private EventListenner eventListenner;

  @Value("${socketio.port}")
  private int serverPort;

  @Value("${socketio.host}")
  private String serverHost;

  @Value("${socketio.bossCount}")
  private int bossCount;

  @Value("${socketio.workCount}")
  private int workCount;

  @Value("${socketio.allowCustomRequests}")
  private boolean allowCustomRequests;

  @Value("${socketio.upgradeTimeout}")
  private int upgradeTimeout;

  @Value("${socketio.pingTimeout}")
  private int pingTimeout;

  @Value("${socketio.pingInterval}")
  private int pingInterval;

  @Override
  public void afterPropertiesSet() throws Exception {
    Configuration cOnfig= new Configuration();
    config.setPort(serverPort);
    config.setHostname(serverHost);
    config.setBossThreads(bossCount);
    config.setWorkerThreads(workCount);
    config.setAllowCustomRequests(allowCustomRequests);
    config.setUpgradeTimeout(upgradeTimeout);
    config.setPingTimeout(pingTimeout);
    config.setPingInterval(pingInterval);

    SocketConfig socketCOnfig= new SocketConfig();
    socketConfig.setReuseAddress(true);
    socketConfig.setTcpNoDelay(true);
    socketConfig.setSoLinger(0);
    config.setSocketConfig(socketConfig);

    SocketIOServer server = new SocketIOServer(config);
    server.addListeners(eventListenner);
    server.start();
    System.out.println("启动正常");
  }
}

在就是监听代码

import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import org.apache.commons.lang3.StringUtils;
import org.bangying.auth.JwtSupport;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

@Component
public class EventListenner {
  @Resource
  private ClientCache clientCache;

  @Resource
  private JwtSupport jwtSupport;

  /**
   * 客户端连接
   *
   * @param client
   */
  @OnConnect
  public void onConnect(SocketIOClient client) {
    String userId = client.getHandshakeData().getSingleUrlParam("userId");
//    userId = jwtSupport.getApplicationUser().getId().toString();
//    userId = "8";
    UUID sessiOnId= client.getSessionId();
    clientCache.saveClient(userId, sessionId, client);
    System.out.println("建立连接");
  }

  /**
   * 客户端断开
   *
   * @param client
   */
  @OnDisconnect
  public void onDisconnect(SocketIOClient client) {
    String userId = client.getHandshakeData().getSingleUrlParam("userId");
    if (StringUtils.isNotBlank(userId)) {
      clientCache.deleteSessionClient(userId, client.getSessionId());
      System.out.println("关闭连接");
    }
  }

  //消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息
  // 暂未使用
  @OnEvent("messageevent")
  public void onEvent(SocketIOClient client, AckRequest request) {
  }
}

本地缓存信息

import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * kcm
 */
@Component
public class ClientCache {

  //本地缓存
  private static Map> cOncurrentHashMap=new ConcurrentHashMap<>();
  /**
   * 存入本地缓存
   * @param userId 用户ID
   * @param sessionId 页面sessionID
   * @param socketIOClient 页面对应的通道连接信息
   */
  public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
    if(StringUtils.isNotBlank(userId)){
      HashMap sessiOnIdClientCache=concurrentHashMap.get(userId);
      if(sessiOnIdClientCache==null){
        sessiOnIdClientCache= new HashMap<>();
      }
      sessionIdClientCache.put(sessionId,socketIOClient);
      concurrentHashMap.put(userId,sessionIdClientCache);
    }
  }
  /**
   * 根据用户ID获取所有通道信息
   * @param userId
   * @return
   */
  public HashMap getUserClient(String userId){
    return concurrentHashMap.get(userId);
  }
  /**
   * 根据用户ID及页面sessionID删除页面链接信息
   * @param userId
   * @param sessionId
   */
  public void deleteSessionClient(String userId,UUID sessionId){
    concurrentHashMap.get(userId).remove(sessionId);
  }
}

下面是存储客户端连接信息

import com.corundumstudio.socketio.SocketIOClient;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * kcm
 */
@Component
public class ClientCache {

  //本地缓存
  private static Map> cOncurrentHashMap=new ConcurrentHashMap<>();
  /**
   * 存入本地缓存
   * @param userId 用户ID
   * @param sessionId 页面sessionID
   * @param socketIOClient 页面对应的通道连接信息
   */
  public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
    if(StringUtils.isNotBlank(userId)){
      HashMap sessiOnIdClientCache=concurrentHashMap.get(userId);
      if(sessiOnIdClientCache==null){
        sessiOnIdClientCache= new HashMap<>();
      }
      sessionIdClientCache.put(sessionId,socketIOClient);
      concurrentHashMap.put(userId,sessionIdClientCache);
    }
  }
  /**
   * 根据用户ID获取所有通道信息
   * @param userId
   * @return
   */
  public HashMap getUserClient(String userId){
    return concurrentHashMap.get(userId);
  }
  /**
   * 根据用户ID及页面sessionID删除页面链接信息
   * @param userId
   * @param sessionId
   */
  public void deleteSessionClient(String userId,UUID sessionId){
    concurrentHashMap.get(userId).remove(sessionId);
  }
}

控制层推送方法

@RestController
@RequestMapping("/push")
public class PushController {
  @Resource
  private ClientCache clientCache;

  @Autowired
  private JwtSupport jwtSupport;

  @GetMapping("/message")
  public String pushTuUser(@Param("id") String id){
    Integer userId = jwtSupport.getApplicationUser().getId();
    HashMap userClient = clientCache.getUserClient(String.valueOf(userId));
    userClient.forEach((uuid, socketIOClient) -> {
      //向客户端推送消息
      socketIOClient.sendEvent("chatevent","服务端推送消息");
    });
    return "success";
  }
}

到此这篇关于SpringBoot+netty-socketio实现服务器端消息推送的文章就介绍到这了,更多相关SpringBoot netty-socketio服务器端推送内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!


推荐阅读
  • 探讨在构建类似Viber或WhatsApp的聊天应用时,如何有效实现客户端(Web、Android、iOS)与服务器之间的连接。本文将分析使用WebSockets标准及其替代方案的优劣。 ... [详细]
  • ServletContext接口在Java Web开发中扮演着重要角色,它提供了一种方式来获取关于整个Web应用程序的信息。通过ServletContext,开发者可以访问初始化参数、共享数据以及应用资源。 ... [详细]
  • 随着毕业设计的结束,我终于有时间更新我的博客了。这次,我将分享如何在自己的服务器上搭建 Bitwarden,一个广受好评的开源密码管理工具。 ... [详细]
  • 远程过程调用(RPC)是一种允许客户端通过网络请求服务器执行特定功能的技术。它简化了分布式系统的交互,使开发者可以像调用本地函数一样调用远程服务,并获得返回结果。本文将深入探讨RPC的工作原理、发展历程及其在现代技术中的应用。 ... [详细]
  • 本文详细介绍了如何利用Go语言和WebSockets技术构建一个高效的实时聊天系统。随着网络应用的日益复杂化,实时交互成为了提升用户体验的关键要素之一。通过本指南,开发者可以学习到最新的技术和最佳实践。 ... [详细]
  • Google排名优化-面向Google(Search Engine Friendly)的URL设计 ... [详细]
  • 深入解析Spring Cloud微服务架构与分布式系统实战
    本文详细介绍了Spring Cloud在微服务架构和分布式系统中的应用,结合实际案例和最新技术,帮助读者全面掌握微服务的实现与优化。 ... [详细]
  • 深入解析:OpenShift Origin环境下的Kubernetes Spark Operator
    本文探讨了如何在OpenShift Origin平台上利用Kubernetes Spark Operator来管理和部署Apache Spark集群与应用。作为Radanalytics.io项目的一部分,这一开源工具为大数据处理提供了强大的支持。 ... [详细]
  • 深入解析BookKeeper的设计与应用场景
    本文介绍了由Yahoo在2009年开发并于2011年开源的BookKeeper技术。BookKeeper是一种高效且可靠的日志流存储解决方案,广泛应用于需要高性能和强数据持久性的场景。 ... [详细]
  • 本文探讨了大型服务端开发过程中常见的几个误区,包括异步任务处理不当、日志同步模式使用、网络操作未设置超时、缓存命中率及响应时间未统计、单一缓存模式、分布式缓存加锁不当以及团队管理上的误区,旨在帮助开发者避免这些常见错误。 ... [详细]
  • 本文探讨了Web开发与游戏开发之间的主要区别,旨在帮助开发者更好地理解两种开发领域的特性和需求。文章基于作者的实际经验和网络资料整理而成。 ... [详细]
  • 在Linux系统上构建Web服务器的详细步骤
    本文详细介绍了如何在Linux系统上搭建Web服务器的过程,包括安装Apache、PHP和MySQL等关键组件,以及遇到的一些常见问题及其解决方案。 ... [详细]
  • 本文将详细介绍如何在ThinkPHP6框架中实现多数据库的部署,包括读写分离的策略,以及如何通过负载均衡和MySQL同步技术优化数据库性能。 ... [详细]
  • APM(Application Performance Management,应用性能管理)对于提供互联网服务的企业至关重要。本文将深入探讨APM如何帮助识别和解决导致用户流失的技术问题,以及它在提升整体用户体验方面的作用。 ... [详细]
  • 详解Linux系统启动过程及/etc/rc.d与/etc/rc.d/init.d目录关系
    本文深入探讨了Linux系统启动流程、运行级别及其与/etc/rc.d和/etc/rc.d/init.d目录的关联,旨在帮助读者理解系统启动时各脚本和服务的加载机制。 ... [详细]
author-avatar
W14154988
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有