热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

基于ZooKeeper实现队列源码

这篇文章主要介绍了基于ZooKeeper实现队列源码的相关内容,包括其实现原理和应用场景,以及对队列的简单介绍,具有一定参考价值,需要的朋友可以了解下。

实现原理

先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题。

队列(Queue)

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

应用场景

Zookeeper队列不太适合要求高性能的场合,但可以在数据量不大的情况下考虑使用。比如已在项目中使用Zookeeper又需要小规模的队列应用,这时可以使用Zookeeper实现的队列;毕竟引进一个消息中间件会增加系统的复杂性和运维的压力。

详细代码

ZookeeperClient工具类

package org.massive.common; 
import org.apache.zookeeper.WatchedEvent; 
import org.apache.zookeeper.Watcher; 
import org.apache.zookeeper.ZooKeeper; 
import java.io.IOException; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 
/** 
 * Created by Massive on 2016/12/18. 
 */ 
public class ZooKeeperClient { 
 private static String cOnnectionString= "localhost:2181"; 
 private static int sessiOnTimeout= 10000; 
 public static ZooKeeper getInstance() throws IOException, InterruptedException { 
 //-------------------------------------------------------------- 
 // 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss) 
 // 这里等Zookeeper的连接完成才返回实例 
 //-------------------------------------------------------------- 
 final CountDownLatch cOnnectedSignal= new CountDownLatch(1); 
 ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() { 
  @Override 
  public void process(WatchedEvent event) { 
   if (event.getState() == Event.KeeperState.SyncConnected) { 
   connectedSignal.countDown(); 
   } else if (event.getState() == Event.KeeperState.Expired) { 
   } 
  } 
  }); 
 connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS); 
 return zk; 
 } 
 public static int getSessionTimeout() { 
 return sessionTimeout; 
 } 
 public static void setSessionTimeout(int sessionTimeout) { 
 ZooKeeperClient.sessiOnTimeout= sessionTimeout; 
 } 
}

ZooKeeperQueue

package org.massive.queue; 
import org.apache.commons.lang3.RandomUtils; 
import org.apache.zookeeper.*; 
import org.apache.zookeeper.data.Stat; 
import org.massive.common.ZooKeeperClient; 
import java.io.IOException; 
import java.io.UnsupportedEncodingException; 
import java.util.List; 
import java.util.SortedSet; 
import java.util.TreeSet; 
/** 
 * Created by Allen on 2016/12/22. 
 */ 
public class ZooKeeperQueue { 
 private ZooKeeper zk; 
 private int sessionTimeout; 
 private static byte[] ROOT_QUEUE_DATA = {0x12,0x34}; 
 private static String QUEUE_ROOT = "/QUEUE"; 
 private String queueName; 
 private String queuePath; 
 private Object mutex = new Object(); 
 public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException { 
 this.queueName = queueName; 
 this.queuePath = QUEUE_ROOT + "/" + queueName; 
 this.zk = ZooKeeperClient.getInstance(); 
 this.sessiOnTimeout= zk.getSessionTimeout(); 
 //---------------------------------------------------- 
 // 确保队列根目录/QUEUE和当前队列的目录的存在 
 //---------------------------------------------------- 
 ensureExists(QUEUE_ROOT); 
 ensureExists(queuePath); 
 } 
 public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException { 
 List nodes = null; 
 byte[] returnVal = null; 
 Stat stat = null; 
 do { 
  synchronized (mutex) { 
  nodes = zk.getChildren(queuePath, new ProduceWatcher()); 
  //---------------------------------------------------- 
  // 如果没有消息节点,等待生产者的通知 
  //---------------------------------------------------- 
  if (nodes == null || nodes.size() == 0) { 
   mutex.wait(); 
  } else { 
   SortedSet sortedNode = new TreeSet(); 
   for (String node : nodes) { 
   sortedNode.add(queuePath + "/" + node); 
   } 
   //---------------------------------------------------- 
   // 消费队列里序列号最小的消息 
   //---------------------------------------------------- 
   String first = sortedNode.first(); 
   returnVal = zk.getData(first, false, stat); 
   zk.delete(first, -1); 
   System.out.print(Thread.currentThread().getName() + " "); 
   System.out.print("consume a message from queue:" + first); 
   System.out.println(", message data is: " + new String(returnVal,"UTF-8")); 
   return returnVal; 
  } 
  } 
 } while (true); 
 } 
 class ProduceWatcher implements Watcher { 
 @Override 
 public void process(WatchedEvent event) { 
  //---------------------------------------------------- 
  // 生产一条消息成功后通知一个等待线程 
  //---------------------------------------------------- 
  synchronized (mutex) { 
  mutex.notify(); 
  } 
 } 
 } 
 public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException { 
 //---------------------------------------------------- 
 // 确保当前队列目录存在 
 // example: /QUEUE/queueName 
 //---------------------------------------------------- 
 ensureExists(queuePath); 
 String node = zk.create(queuePath + "/", data, 
  ZooDefs.Ids.OPEN_ACL_UNSAFE, 
  CreateMode.PERSISTENT_SEQUENTIAL); 
 System.out.print(Thread.currentThread().getName() + " "); 
 System.out.print("produce a message to queue:" + node); 
 System.out.println(" , message data is: " + new String(data,"UTF-8")); 
 } 
 public void ensureExists(String path) { 
 try { 
  Stat stat = zk.exists(path, false); 
  if (stat == null) { 
  zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
  } 
 } catch (KeeperException e) { 
  e.printStackTrace(); 
 } catch (InterruptedException e) { 
  e.printStackTrace(); 
 } 
 } 
 public static void main(String[] args) throws IOException, InterruptedException, KeeperException { 
 String queueName = "test"; 
 final ZooKeeperQueue queue = new ZooKeeperQueue(queueName); 
 for (int i = 0; i <10; i++) { 
  new Thread(new Runnable() { 
  @Override 
  public void run() { 
   try { 
   queue.consume(); 
   System.out.println("--------------------------------------------------------"); 
   System.out.println(); 
   } catch (InterruptedException e) { 
   e.printStackTrace(); 
   } catch (KeeperException e) { 
   e.printStackTrace(); 
   } catch (UnsupportedEncodingException e) { 
   e.printStackTrace(); 
   } 
  } 
  }).start(); 
 } 
 new Thread(new Runnable() { 
  @Override 
  public void run() { 
  for (int i = 0; i <10; i++) { 
   try { 
   Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i)); 
   queue.produce(("massive" + i).getBytes()); 
   } catch (InterruptedException e) { 
   e.printStackTrace(); 
   } catch (KeeperException e) { 
   e.printStackTrace(); 
   } catch (UnsupportedEncodingException e) { 
   e.printStackTrace(); 
   } 
  } 
  } 
 },"Produce-thread").start(); 
 } 
}

测试

运行main方法,本机器的某次输出结果

Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0 
Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1 
Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2 
Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3 
Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4 
Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5 
Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6 
Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7 
Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8 
Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8 
-------------------------------------------------------- 
Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9 
Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9 

总结

以上就是本文有关于队列和基于ZooKeeper实现队列源码介绍的全部内容,希望对大家有所帮助。

感谢朋友们对本站的支持!


推荐阅读
  • window下kafka的安装以及测试
    目录一、安装JDK(需要安装依赖javaJDK)二、安装Kafka三、测试参考在Windows系统上安装消息队列kafka一、安装JDKÿ ... [详细]
  • Zookeeper面试常见问题解析
    本文详细介绍了Zookeeper中的ZAB协议、节点类型、ACL权限控制机制、角色分工、工作状态、Watch机制、常用客户端、分布式锁实现、默认通信框架以及消息广播和领导选举的流程。 ... [详细]
  • solrCloud分布式集群安装配置
    solrCloud分布式集群安装配置1.前提安装Zookeeper集群2.安装部署多个solr节点10.41.2.82 ... [详细]
  • Docker的安全基准
    nsitionalENhttp:www.w3.orgTRxhtml1DTDxhtml1-transitional.dtd ... [详细]
  • 深入解析Spring Cloud Ribbon负载均衡机制
    本文详细介绍了Spring Cloud中的Ribbon组件如何实现服务调用的负载均衡。通过分析其工作原理、源码结构及配置方式,帮助读者理解Ribbon在分布式系统中的重要作用。 ... [详细]
  • 网络运维工程师负责确保企业IT基础设施的稳定运行,保障业务连续性和数据安全。他们需要具备多种技能,包括搭建和维护网络环境、监控系统性能、处理突发事件等。本文将探讨网络运维工程师的职业前景及其平均薪酬水平。 ... [详细]
  • Hadoop入门与核心组件详解
    本文详细介绍了Hadoop的基础知识及其核心组件,包括HDFS、MapReduce和YARN。通过本文,读者可以全面了解Hadoop的生态系统及应用场景。 ... [详细]
  • HBase运维工具全解析
    本文深入探讨了HBase常用的运维工具,详细介绍了每种工具的功能、使用场景及操作示例。对于HBase的开发人员和运维工程师来说,这些工具是日常管理和故障排查的重要手段。 ... [详细]
  • 在本周的白板演练中,Apache Flink 的 PMC 成员及数据工匠首席技术官 Stephan Ewen 深入探讨了如何利用保存点功能进行流处理中的数据重新处理、错误修复、系统升级和 A/B 测试。本文将详细解释保存点的工作原理及其应用场景。 ... [详细]
  • 本文详细介绍了 Java 中的 org.apache.hadoop.registry.client.impl.zk.ZKPathDumper 类,提供了丰富的代码示例和使用指南。通过这些示例,读者可以更好地理解如何在实际项目中利用 ZKPathDumper 类进行注册表树的转储操作。 ... [详细]
  • Netflix利用Druid实现高效实时数据分析
    本文探讨了全球领先的在线娱乐公司Netflix如何通过采用Apache Druid,实现了高效的数据采集、处理和实时分析,从而显著提升了用户体验和业务决策的准确性。文章详细介绍了Netflix在系统架构、数据摄取、管理和查询方面的实践,并展示了Druid在大规模数据处理中的卓越性能。 ... [详细]
  • Hadoop发行版本选择指南:技术解析与应用实践
    本文详细介绍了Hadoop的不同发行版本及其特点,帮助读者根据实际需求选择最合适的Hadoop版本。内容涵盖Apache Hadoop、Cloudera CDH等主流版本的特性及应用场景。 ... [详细]
  • 前言无论是对于刚入行工作还是已经工作几年的java开发者来说,面试求职始终是你需要直面的一件事情。首先梳理自己的知识体系,针对性准备,会有事半功倍的效果。我们往往会把重点放在技术上 ... [详细]
  • 1整合dubbo1.1e3-manager-Service1.1.1pom.xml排除jar在e3-manager-Service工程中添加dubbo依赖的jar包。 ... [详细]
  • 本文详细介绍了 Apache ZooKeeper 的 FileTxnLog 类中的 setPreallocSize 方法,并提供了多个实际应用中的代码示例。通过这些示例,读者可以更好地理解如何在不同场景下合理设置日志文件的预分配大小。 ... [详细]
author-avatar
无为小妮子_373
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有