热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入解析Java如何利用Redis实现高效消息队列

本篇文章主要介绍了Java利用Redis实现消息队列的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧本文介绍了Java利用Redis实现消息队

本篇文章主要介绍了Java利用Redis实现消息队列的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下:

应用场景

为什么要用redis?

二进制存储、java序列化传输、IO连接数高、连接频繁

一、序列化

这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口;

其代码如下:

package Utils;import java.io.*;/** * Created by Kinglf on 2016/10/17. */public class ObjectUtil { /**  * 对象转byte[]  * @param obj  * @return  * @throws IOException  */ public static byte[] object2Bytes(Object obj) throws IOException{  ByteArrayOutputStream bo=new ByteArrayOutputStream();  ObjectOutputStream oo=new ObjectOutputStream(bo);  oo.writeObject(obj);  byte[] bytes=bo.toByteArray();  bo.close();  oo.close();  return bytes; } /**  * byte[]转对象  * @param bytes  * @return  * @throws Exception  */ public static Object bytes2Object(byte[] bytes) throws Exception{  ByteArrayInputStream in=new ByteArrayInputStream(bytes);  ObjectInputStream sIn=new ObjectInputStream(in);  return sIn.readObject(); }}

二、消息类(实现Serializable接口)

package Model;import java.io.Serializable;/** * Created by Kinglf on 2016/10/17. */public class Message implements Serializable { private static final long serialVersiOnUID= -389326121047047723L; private int id; private String content; public Message(int id, String content) {  this.id = id;  this.cOntent= content; } public int getId() {  return id; } public void setId(int id) {  this.id = id; } public String getContent() {  return content; } public void setContent(String content) {  this.cOntent= content; }}

三、Redis的操作

利用redis做队列,我们采用的是redis中list的push和pop操作;

结合队列的特点:

只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则 Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可

java采用Jedis进行Redis的存储和Redis的连接池设置

上代码:

package Utils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import java.util.List;import java.util.Map;import java.util.Set;/** * Created by Kinglf on 2016/10/17. */public class JedisUtil { private static String JEDIS_IP; private static int JEDIS_PORT; private static String JEDIS_PASSWORD; private static JedisPool jedisPool; static {  //Configuration自行写的配置文件解析类,继承自Properties  Configuration cOnf=Configuration.getInstance();  JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");  JEDIS_PORT=conf.getInt("jedis.port",6379);  JEDIS_PASSWORD=conf.getString("jedis.password",null);  JedisPoolConfig cOnfig=new JedisPoolConfig();  config.setMaxActive(5000);  config.setMaxIdle(256);  config.setMaxWait(5000L);  config.setTestOnBorrow(true);  config.setTestOnReturn(true);  config.setTestWhileIdle(true);  config.setMinEvictableIdleTimeMillis(60000L);  config.setTimeBetweenEvictionRunsMillis(3000L);  config.setNumTestsPerEvictionRun(-1);  jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000); } /**  * 获取数据  * @param key  * @return  */ public static String get(String key){  String value=null;  Jedis jedis=null;  try{   jedis=jedisPool.getResource();   value=jedis.get(key);  }catch (Exception e){   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  }finally {   close(jedis);  }  return value; } private static void close(Jedis jedis) {  try{   jedisPool.returnResource(jedis);  }catch (Exception e){   if(jedis.isConnected()){    jedis.quit();    jedis.disconnect();   }  } } public static byte[] get(byte[] key){  byte[] value = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   value = jedis.get(key);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return value; } public static void set(byte[] key, byte[] value) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.set(key, value);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } public static void set(byte[] key, byte[] value, int time) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.set(key, value);   jedis.expire(key, time);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } public static void hset(byte[] key, byte[] field, byte[] value) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.hset(key, field, value);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } public static void hset(String key, String field, String value) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.hset(key, field, value);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } /**  * 获取数据  *  * @param key  * @return  */ public static String hget(String key, String field) {  String value = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   value = jedis.hget(key, field);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return value; } /**  * 获取数据  *  * @param key  * @return  */ public static byte[] hget(byte[] key, byte[] field) {  byte[] value = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   value = jedis.hget(key, field);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return value; } public static void hdel(byte[] key, byte[] field) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.hdel(key, field);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } /**  * 存储REDIS队列 顺序存储  * @param key reids键名  * @param value 键值  */ public static void lpush(byte[] key, byte[] value) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.lpush(key, value);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } /**  * 存储REDIS队列 反向存储  * @param key reids键名  * @param value 键值  */ public static void rpush(byte[] key, byte[] value) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.rpush(key, value);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } /**  * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端  * @param key reids键名  * @param destination 键值  */ public static void rpoplpush(byte[] key, byte[] destination) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.rpoplpush(key, destination);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } /**  * 获取队列数据  * @param key 键名  * @return  */ public static List lpopList(byte[] key) {  List list = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   list = jedis.lrange(key, 0, -1);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return list; } /**  * 获取队列数据  * @param key 键名  * @return  */ public static byte[] rpop(byte[] key) {  byte[] bytes = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   bytes = jedis.rpop(key);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return bytes; } public static void hmset(Object key, Map hash) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.hmset(key.toString(), hash);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } public static void hmset(Object key, Map hash, int time) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.hmset(key.toString(), hash);   jedis.expire(key.toString(), time);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } public static List hmget(Object key, String... fields) {  List result = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   result = jedis.hmget(key.toString(), fields);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return result; } public static Set hkeys(String key) {  Set result = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   result = jedis.hkeys(key);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return result; } public static List lrange(byte[] key, int from, int to) {  List result = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   result = jedis.lrange(key, from, to);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return result; } public static Map hgetAll(byte[] key) {  Map result = null;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   result = jedis.hgetAll(key);  } catch (Exception e) {   //释放redis对象   jed本文来源gaodai$ma#com搞$$代**码网isPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return result; } public static void del(byte[] key) {  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.del(key);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  } } public static long llen(byte[] key) {  long len = 0;  Jedis jedis = null;  try {   jedis = jedisPool.getResource();   jedis.llen(key);  } catch (Exception e) {   //释放redis对象   jedisPool.returnBrokenResource(jedis);   e.printStackTrace();  } finally {   //返还到连接池   close(jedis);  }  return len; }}

四、Configuration主要用于读取Redis的配置信息

package Utils;import java.io.IOException;import java.io.InputStream;import java.util.Properties;/** * Created by Kinglf on 2016/10/17. */public class Configuration extends Properties { private static final long serialVersiOnUID= -2296275030489943706L; private static Configuration instance = null; public static synchronized Configuration getInstance() {  if (instance == null) {   instance = new Configuration();  }  return instance; } public String getProperty(String key, String defaultValue) {  String val = getProperty(key);  return (val == null || val.isEmpty()) ? defaultValue : val; } public String getString(String name, String defaultValue) {  return this.getProperty(name, defaultValue); } public int getInt(String name, int defaultValue) {  String val = this.getProperty(name);  return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val); } public long getLong(String name, long defaultValue) {  String val = this.getProperty(name);  return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val); } public float getFloat(String name, float defaultValue) {  String val = this.getProperty(name);  return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val); } public double getDouble(String name, double defaultValue) {  String val = this.getProperty(name);  return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val); } public byte getByte(String name, byte defaultValue) {  String val = this.getProperty(name);  return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val); } public Configuration() {  InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");  try {   this.loadFromXML(in);   in.close();  } catch (IOException ioe) {  } }}

五、测试

import Model.Message;import Utils.JedisUtil;import Utils.ObjectUtil;import redis.clients.jedis.Jedis;import java.io.IOException;/** * Created by Kinglf on 2016/10/17. */public class TestRedisQueue { public static byte[] redisKey = "key".getBytes(); static {  try {   init();  } catch (IOException e) {   e.printStackTrace();  } } private static void init() throws IOException {  for (int i = 0; i <1000000; i++) {   Message message = new Message(i, "这是第" + i + "个内容");   JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));  } } public static void main(String[] args) {  try {   pop();  } catch (Exception e) {   e.printStackTrace();  } } private static void pop() throws Exception {  byte[] bytes = JedisUtil.rpop(redisKey);  Message msg = (Message) ObjectUtil.bytes2Object(bytes);  if (msg != null) {   System.out.println(msg.getId() + "----" + msg.getContent());  } }}

每执行一次pop()方法,结果如下:
1----这是第1个内容
2----这是第2个内容
3----这是第3个内容
4----这是第4个内容

总结

至此,整个Redis消息队列的生产者和消费者代码已经完成

1.Message 需要传送的实体类(需实现Serializable接口)

2.Configuration Redis的配置读取类,继承自Properties

3.ObjectUtil 将对象和byte数组双向转换的工具类

4.Jedis 通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类

以上就是Java如何使用Redis来实现消息队列的具体分析的详细内容,更多请关注gaodaima其它相关文章!



推荐阅读
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 本文深入探讨了HTTP请求和响应对象的使用,详细介绍了如何通过响应对象向客户端发送数据、处理中文乱码问题以及常见的HTTP状态码。此外,还涵盖了文件下载、请求重定向、请求转发等高级功能。 ... [详细]
  • 本文详细探讨了HTML表单中GET和POST请求的区别,包括它们的工作原理、数据传输方式、安全性及适用场景。同时,通过实例展示了如何在Servlet中处理这两种请求。 ... [详细]
  • 深入解析Redis内存对象模型
    本文详细介绍了Redis内存对象模型的关键知识点,包括内存统计、内存分配、数据存储细节及优化策略。通过实际案例和专业分析,帮助读者全面理解Redis内存管理机制。 ... [详细]
  • 本文介绍如何使用阿里云的fastjson库解析包含时间戳、IP地址和参数等信息的JSON格式文本,并进行数据处理和保存。 ... [详细]
  • 作者:守望者1028链接:https:www.nowcoder.comdiscuss55353来源:牛客网面试高频题:校招过程中参考过牛客诸位大佬的面经,但是具体哪一块是参考谁的我 ... [详细]
  • 在软件开发过程中,MD5加密是一种常见的数据保护手段。本文将详细介绍如何在C#中使用两种不同的方式来实现MD5加密:字符串加密和流加密。 ... [详细]
  • JavaScript 基础语法指南
    本文详细介绍了 JavaScript 的基础语法,包括变量、数据类型、运算符、语句和函数等内容,旨在为初学者提供全面的入门指导。 ... [详细]
  • 采用IKE方式建立IPsec安全隧道
    一、【组网和实验环境】按如上的接口ip先作配置,再作ipsec的相关配置,配置文本见文章最后本文实验采用的交换机是H3C模拟器,下载地址如 ... [详细]
  • 深入解析Java枚举及其高级特性
    本文详细介绍了Java枚举的概念、语法、使用规则和应用场景,并探讨了其在实际编程中的高级应用。所有相关内容已收录于GitHub仓库[JavaLearningmanual](https://github.com/Ziphtracks/JavaLearningmanual),欢迎Star并持续关注。 ... [详细]
  • 深入解析Java虚拟机(JVM)架构与原理
    本文旨在为读者提供对Java虚拟机(JVM)的全面理解,涵盖其主要组成部分、工作原理及其在不同平台上的实现。通过详细探讨JVM的结构和内部机制,帮助开发者更好地掌握Java编程的核心技术。 ... [详细]
  • 深入解析for与foreach遍历集合时的性能差异
    本文将详细探讨for循环和foreach(迭代器)在遍历集合时的性能差异,并通过实际代码示例和源码分析,帮助读者理解这两种遍历方式的不同之处。文章内容丰富且专业,旨在为编程爱好者提供有价值的参考。 ... [详细]
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 深入解析 Android IPC 中的 Messenger 机制
    本文详细介绍了 Android 中基于消息传递的进程间通信(IPC)机制——Messenger。通过实例和源码分析,帮助开发者更好地理解和使用这一高效的通信工具。 ... [详细]
  • 本文详细介绍了数组和线性表这两种常见的数据结构。数组是一种由类型名、标识符及维度构成的复合数据类型,其元素类型由类型名决定,维数表示数组中元素的数量。线性表则是一种逻辑结构,其中的数据元素呈现一对一的关系,便于实现和操作。 ... [详细]
author-avatar
求道金林
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有