热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:RabbitMQRabbitMQ高级特性

本文由编程笔记#小编为大家整理,主要介绍了RabbitMQRabbitMQ高级特性相关的知识,希望对你有一定的参考价值。消息
本文由编程笔记#小编为大家整理,主要介绍了RabbitMQ RabbitMQ高级特性相关的知识,希望对你有一定的参考价值。



消息如何保障100%的投递成功


什么是生产端的可靠性投递



  • 保障消息的成功发出

  • 保障MQ节点的成功接收

  • 发送端收到MQ节点(Borker)确认应答

  • 完善的消息进行补偿机制


生产端-可靠性投递(一)



  • 消息落库,对消息状态进行打标

  • 消息的延迟投递,做二次确认,回调检查


生产端-可靠性投递(二)

消息落库,对消息状态进行打标
技术图片


消息落库,对消息进行打标(对消息设置状态,发送中,broker收到,)

定时器轮训,检测未发送的消息,进行二次投递,最大努力尝试(设置最大次数)



  • step1 消息落库(唯一的消息id) ,一定是数据库入库成功以后在进行发送消息

  • step2 发送消息 到MQ Broker

  • step3 Broker Confirm (发送消息确认)

  • step4 生产者ConfirmListener (异步监听,Broker回送的响应)

  • step5 成功,通过messageId更新消息状态

补偿

分布式定时任务,抓取数据(超过第一时长),尝试重发,重试次数限制


生产端-可靠性投递(三)

消息的延迟投递,做二次确认,回调检查 (最大限度的减少消息落库)
技术图片
方案一在高并发场景下,每次消息落库,影响性能(IO操作)

step1: 业务消息落库 ,一定是数据库入库成功以后在进行发送消息

step2:第一次消息的发送

step3:延迟消息的检测

step4:监听,处理完,生成一条新消息

step5:通过队列发送,确认 不是之前的ack


幂等性概念


幂等性是什么

借鉴数据库的乐观锁机制

执行一条更新SQL

update t_reps set count=count-1,version=version+1 where verison=1

消费端幂等性保障

在业务高峰期,如何避免消息的重复消费问题

消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即时收到多条一样的消息。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错
https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/how-to-ensure-that-messages-are-not-repeatedly-consumed.md


业界主流的幂等性操作



  • 唯一ID+指纹码机制,利用数据库主键去重

  • 利用Redis的原子性实现

方案一

唯一ID+指纹码机制



  • 唯一ID+指纹码机制,利用数据库主键去重

  • select count(1) from t_order where id=唯一id+指纹码

  • 好处:实现简单

  • 坏处:高并发下有数据库写入的性能瓶颈

  • 解决方案:跟进ID进行分库分表进行路由算法

方案二:利用Redis的原子性实现


Confirm确认消息

理解Confirm消息确认机制:



  • 消息的去人,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答

  • 生产者进行接收应该,用来确定这条消息是否正常的发送到broker,这种方式也是消息的可靠性投递的核心保障

confirm确认消息流程解析
技术图片
confirm确认消息实现



  • 第一步:在channel上开启确认模式: channel.sconfirmSelect()

  • 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理。

示例

生产者

/**
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 获取C onnection
Connection cOnnection= connectionFactory.newConnection();

//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();

String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";

//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
//失败
// deliveryTag 消息唯一标签
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
//成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}

消费者

/**
* @author niugang
*/
public class Consumer {

public static void main(String[] args) throws Exception {


//1 创建ConnectionFactory
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

//2 获取C onnection
Connection cOnnection= connectionFactory.newConnection();

//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";

//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//5 创建消费者
QueueingConsumer queueingCOnsumer= new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);

while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());

System.err.println("消费端: " + msg);
}


}
}

Return消息机制



  • Return Listener 用于处理一些不可路由的消息。

  • 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中取,然后我们的消费者监听队列,进行消费处理操作。

  • 但是在某些情况下,如果我们在发送消息的时候,当前的Exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener

配置

在基础API上有一个关键的配置项

Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。

流程
技术图片
生产者

/**
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {


ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";

String msg = "Hello RabbitMQ Return Message";


channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.err.println("---------handle return----------");
//响应码 312
System.err.println("replyCode: " + replyCode);
//NO_ROUTE
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});


channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());

//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
}

消费者

/**
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
QueueingConsumer queueingCOnsumer= new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);

while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费者: " + msg);
}
}
}

消费者自定义监听



  • 我们一般就是在代码中编写while循环,进行consumer.nextDelivery方法获取下一条消息,然后进行消费处理!

  • 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!

实现方式

自定义类,继承 DefaultConsumer
技术图片
生产者

/**
* @author niugang
*/
public class Producer {

public static void main(String[] args) throws Exception {

ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange = "test_consumer_exchange";
String routingKey = "consumer.save";

String msg = "Hello RabbitMQ Consumer Message";

for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}

}
}

自定义消费者

/**
* 自定义消费者
* @author niugang
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
//消费标签
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}

消费者

/**
* @author niugang
*/
public class Consumer {

public static void main(String[] args) throws Exception {


ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();


String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

channel.basicConsume(queueName, true, new MyConsumer(channel));


}
}

消费端限流

什么是消费端的限流



  • 假设一个场景,首先,我们RabbitMQ服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况:

  • 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据;

  • RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息(autoAck为false)的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。

  • void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);

  • prefetchSize:0 #这里为0表示不限制

  • prefetchCount: 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack; (prefetchCount等于1即可)

  • global:truefalse 是否将上面设置应用于channel

  • 简单来说,就是上面限制是channel级别的还是consumer级别;

生产者

/**
* @author niugang
*/
public class Producer {

public static void main(String[] args) throws Exception {

ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhosy");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange = "test_qos_exchange";
String routingKey = "qos.save";

String msg = "Hello RabbitMQ QOS Message";

for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}

}
}

自定义消费者

public class MyConsumer extends DefaultConsumer {
private Channel channel ;

public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//ack 注释掉后 控制台只会接收到一条消息
channel.basicAck(envelope.getDeliveryTag(), false);

}
}

消费者

/**
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

//1 限流方式 第一件事就是 autoAck设置为 false
//prefetchCount broker 给 消费者 最大推送消息数量
channel.basicQos(0, 1, false);
//手工签收
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}

消息ACK与重回队列

消费端的手工ACK和NACK



  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿

  • 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功

消费端的重回队列



  • 消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker (requeue属性设置)



  • 一般我们在实际应用中,都会关闭重回队列,也就是设置为false;



生产者

/**
* ack 测试生产者
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {

ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){

Map headers = new HashMap();
headers.put("num", i);
//设置消息属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2).expiration("1000")
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}

}
}

自定义消费者

/**
* 自定义消费者
* @author niugang
*/
public class MyConsumer extends DefaultConsumer {
private Channel channel ;

public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//multiple 是否是批量
//requeue 重新添加到队列尾部
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}

}
}

消费者

/**
* 消费者
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}

TTL队列/消息

TTL



  • TTL是Time To Live的缩写,也就是生存时间

  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。

  • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的删除。

在控制台创建队列
技术图片
在控制台创建exchange 并添加binding 然后发送消息,然后在队列页面可以看到消息自动被队列剔除
技术图片
原生API设置TTL

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("1000").build();

Spring AMQP设置TTL

MessageProperties messageProperties = new MessageProperties();
//消息过期时间
messageProperties.setExpiration("1000");
Message stringMessage = new Message("Hello Springboot RabbitMQ".getBytes(), messageProperties);

死信队列

死信队列:DLX ,Dead-Letter-Exchange



  • 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX;

消息变成死信有以下几种情况



  • 消息被拒绝(basic.reject/basic.nack) 并且requeue=false;

  • 消息TTL过期

  • 队列达到最大长度

死信队列详细解释

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能;

死信队列具体设置

step1:首先需要设置死信队列的exchange和queue,然后进行绑定

例如定义如下exchange和queue



  • Exchange:dlx.echange

  • Queue:dlx.queue

  • RoutingKey:#

step2:

然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可:arguments.put(“x-dead-letter-exchange”,“dlx.exchange”);

这样消息在过期、requeue、队列在达到最大长度时,消息就可以直接路由到死信队列。

技术图片
生产者

/**
* 私信队列 生产端
*
* @author niugang
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
//自定义普通的exchange
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
for (int i = 0; i <1; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
//过期时间为10s
.expiration("10000")
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}

自定义消息消费

/**
* 自定义消息消费
*
* @author niugang
*/
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}

生产者

/**
* 死信队列消费端
*
* @author niugang
*/
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory cOnnectionFactory= new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection cOnnection= connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map agruments = new HashMap(16);
//设置死信队列exchange 这些具体的参数可以通过rabbitmq控制台查看
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}

技术图片



















推荐阅读
  • 开发心得:利用 Redis 构建分布式系统的轻量级协调机制
    开发心得:利用 Redis 构建分布式系统的轻量级协调机制 ... [详细]
  • 2019年后蚂蚁集团与拼多多面试经验详述与深度剖析
    2019年后蚂蚁集团与拼多多面试经验详述与深度剖析 ... [详细]
  • NoSQL数据库,即非关系型数据库,有时也被称作Not Only SQL,是一种区别于传统关系型数据库的管理系统。这类数据库设计用于处理大规模、高并发的数据存储与查询需求,特别适用于需要快速读写大量非结构化或半结构化数据的应用场景。NoSQL数据库通过牺牲部分一致性来换取更高的可扩展性和性能,支持分布式部署,能够有效应对互联网时代的海量数据挑战。 ... [详细]
  • 作为140字符的开创者,Twitter看似简单却异常复杂。其简洁之处在于仅用140个字符就能实现信息的高效传播,甚至在多次全球性事件中超越传统媒体的速度。然而,为了支持2亿用户的高效使用,其背后的技术架构和系统设计则极为复杂,涉及高并发处理、数据存储和实时传输等多个技术挑战。 ... [详细]
  • 本文详细介绍了HDFS的基础知识及其数据读写机制。首先,文章阐述了HDFS的架构,包括其核心组件及其角色和功能。特别地,对NameNode进行了深入解析,指出其主要负责在内存中存储元数据、目录结构以及文件块的映射关系,并通过持久化方案确保数据的可靠性和高可用性。此外,还探讨了DataNode的角色及其在数据存储和读取过程中的关键作用。 ... [详细]
  • Spring Security 认证模块的项目构建与初始化
    本文详细介绍了如何构建和初始化Spring Security认证模块的项目。首先,通过创建一个分布式Maven聚合工程,该工程包含四个模块,分别为core、browser(用于演示)、app等,以构成完整的SeehopeSecurity项目。在项目构建过程中,还涉及日志生成机制,确保能够输出关键信息,便于调试和监控。 ... [详细]
  • Java服务问题快速定位与解决策略全面指南 ... [详细]
  • 结语 | 《探索二进制世界:软件安全与逆向分析》读书笔记:深入理解二进制代码的逆向工程方法
    结语 | 《探索二进制世界:软件安全与逆向分析》读书笔记:深入理解二进制代码的逆向工程方法 ... [详细]
  • Spring框架入门指南:专为新手打造的详细学习笔记
    Spring框架是Java Web开发中广泛应用的轻量级应用框架,以其卓越的功能和出色的性能赢得了广大开发者的青睐。本文为初学者提供了详尽的学习指南,涵盖基础概念、核心组件及实际应用案例,帮助新手快速掌握Spring框架的核心技术与实践技巧。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 本文深入探讨了IO复用技术的原理与实现,重点分析了其在解决C10K问题中的关键作用。IO复用技术允许单个进程同时管理多个IO对象,如文件、套接字和管道等,通过系统调用如`select`、`poll`和`epoll`,高效地处理大量并发连接。文章详细介绍了这些技术的工作机制,并结合实际案例,展示了它们在高并发场景下的应用效果。 ... [详细]
  • 如何正确配置与使用日志组件:Log4j、SLF4J及Logback的连接与整合方法
    在当前的软件开发实践中,无论是开源项目还是日常工作中,日志框架都是不可或缺的工具之一。本文详细探讨了如何正确配置与使用Log4j、SLF4J及Logback这三个流行的日志组件,并深入解析了它们之间的连接与整合方法,旨在帮助开发者高效地管理和优化日志记录流程。 ... [详细]
  • ActiveMQ是由Apache开发的一款广受欢迎且功能强大的开源消息中间件。作为完全符合JMS 1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范已问世多年,但ActiveMQ依然保持了其在消息队列领域的领先地位。本文将带你初步了解ActiveMQ的核心概念及其应用场景,帮助你快速入门这一重要的消息传递技术。 ... [详细]
  • 探讨 jBPM 数据库表结构设计的精要与实践
    探讨 jBPM 数据库表结构设计的精要与实践 ... [详细]
author-avatar
优雅的鼻涕泡
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有