1.普通消费
2. 顺序消费
3.事务消费
在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费。
produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息
注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue
单个节点(Producer端1个、Consumer端1个)
1、Producer.java
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,发送顺序消息 */
public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer &#61; new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags &#61; new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i &#61; 1; i <&#61; 5; i&#43;&#43;) { Message msg &#61; new Message("TopicOrderTest", "order_1", "KEY" &#43; i, ("order_1 " &#43; i).getBytes()); SendResult sendResult &#61; producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List
}
2、Consumer.java
package order;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt; /** * 顺序消息消费&#xff0c;带事务方式&#xff08;应用可控制Offset什么时候提交&#xff09; */
public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer &#61; new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动&#xff0c;那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes &#61; new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List
结果如下图所示&#xff1a;
这个五条数据被顺序消费了
Producer.java
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer&#xff0c;发送顺序消息 */
public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer &#61; new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags &#61; new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i &#61; 1; i <&#61; 5; i&#43;&#43;) { Message msg &#61; new Message("TopicOrderTest", "order_1", "KEY" &#43; i, ("order_1 " &#43; i).getBytes()); SendResult sendResult &#61; producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List
}
Consumer1.java
/** * 顺序消息消费&#xff0c;带事务方式&#xff08;应用可控制Offset什么时候提交&#xff09; */
public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer &#61; new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动&#xff0c;那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); /** * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到 *&#xff0c;第二个线程无法访问这个队列 */ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes &#61; new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List
Consumer2.java
/** * 顺序消息消费&#xff0c;带事务方式&#xff08;应用可控制Offset什么时候提交&#xff09; */
public class Consumer2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer &#61; new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动&#xff0c;那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); /** * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到 *&#xff0c;第二个线程无法访问这个队列 */ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes &#61; new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List
先启动Consumer1和Consumer2&#xff0c;然后启动Producer&#xff0c;Producer会发送15条消息
Consumer1消费情况如图&#xff0c;都按照顺序执行了
Consumer2消费情况如图&#xff0c;都按照顺序执行了
这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。
事物消费需要先说说什么是事务。比如说&#xff1a;我们跨行转账&#xff0c;从工商银行转到建设银行&#xff0c;也就是我从工商银行扣除1000元之后&#xff0c;我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后&#xff0c;建设银行的服务器突然宕机&#xff0c;那么我扣除了1000&#xff0c;但是并没有在建设银行给我加1000&#xff0c;就出现了数据的不一致。因此加1000和减1000才行&#xff0c;减1000和减1000必须一起成功&#xff0c;一起失败。
再比如&#xff0c;我们进行网购的时候&#xff0c;我们下单之后&#xff0c;订单提交成功&#xff0c;仓库商品的数量必须减一。但是订单可能是一个数据库&#xff0c;仓库数量可能又是在另个数据库里面。有可能订单提交成功之后&#xff0c;仓库数量服务器突然宕机。这样也出现了数据不一致的问题。
使用消息队列来解决分布式事物&#xff1a;
现在我们去外面饭店吃饭&#xff0c;很多时候都不会直接给了钱之后直接在付款的窗口递饭菜&#xff0c;而是付款之后他会给你一张小票&#xff0c;你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似&#xff0c;提高了吞吐量。即使你到第二个窗口&#xff0c;师傅告诉你已经没饭了&#xff0c;你可以拿着这个凭证去退款&#xff0c;即使中途由于出了意外你无法到达窗口进行取饭&#xff0c;但是只要凭证还在&#xff0c;可以将钱退给你。这样就保证了数据的一致性。
如何保证凭证&#xff08;消息&#xff09;有2种方法&#xff1a;
1、在工商银行扣款的时候&#xff0c;余额表扣除1000&#xff0c;同时记录日志&#xff0c;而且这2个表是在同一个数据库实例中&#xff0c;可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户&#xff0c;建设银行收到之后给我返回已经加了1000给用户的确认信息之后&#xff0c;我再标记日志表里面的日志为已经完成。
2、通过消息中间件
原文地址&#xff1a;http://www.jianshu.com/p/453c6e7ff81c
RocketMQ第一阶段发送Prepared消息时&#xff0c;会拿到消息的地址&#xff0c;第二阶段执行本地事物&#xff0c;第三阶段通过第一阶段拿到的地址去访问消息&#xff0c;并修改消息的状态。
细心的你可能又发现问题了&#xff0c;如果确认消息发送失败了怎么办&#xff1f;RocketMQ会定期扫描消息集群中的事物消息&#xff0c;如果发现了Prepared消息&#xff0c;它会向消息发送端(生产者)确认&#xff0c;Bob的钱到底是减了还是没减呢&#xff1f;如果减了是回滚还是继续发送确认消息呢&#xff1f;RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
例子&#xff1a;
Consumer.java
package transaction; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer&#xff0c;订阅消息 */
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer &#61; new DefaultMQPushConsumer("transaction_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动&#xff0c;那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTransactionTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
} }); consumer.start(); System.out.println("transaction_Consumer Started."); }
}
Producer.java
package transaction; import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message; /** * 发送事务消息例子 * */
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener &#61; new TransactionCheckListenerImpl(); TransactionMQProducer producer &#61; new TransactionMQProducer("transaction_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); // 事务回查最小并发数 producer.setCheckThreadPoolMinSize(2); // 事务回查最大并发数 producer.setCheckThreadPoolMaxSize(2); // 队列数 producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.start(); // String[] tags &#61; new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" // }; TransactionExecuterImpl tranExecuter &#61; new TransactionExecuterImpl(); for (int i &#61; 1; i <&#61; 2; i&#43;&#43;) { try { Message msg &#61; new Message("TopicTransactionTest", "transaction" &#43; i, "KEY" &#43; i, ("Hello RocketMQ " &#43; i).getBytes()); SendResult sendResult &#61; producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(10); } catch (MQClientException e) { e.printStackTrace(); } } for (int i &#61; 0; i <100000; i&#43;&#43;) { Thread.sleep(1000); } producer.shutdown(); }
}
TransactionExecuterImpl .java --执行本地事务
package transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message; /** * 执行本地事务 */
public class TransactionExecuterImpl implements LocalTransactionExecuter { // private AtomicInteger transactionIndex &#61; new AtomicInteger(1); public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { System.out.println("执行本地事务msg &#61; " &#43; new String(msg.getBody())); System.out.println("执行本地事务arg &#61; " &#43; arg); String tags &#61; msg.getTags(); if (tags.equals("transaction2")) { System.out.println("&#61;&#61;&#61;&#61;&#61;&#61;我的操作&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#xff0c;失败了 -进行ROLLBACK"); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; // return LocalTransactionState.UNKNOW;
}
}
TransactionCheckListenerImpl--未决事务&#xff0c;服务器回查客户端(目前已经被阉割啦)
package transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt; /** * 未决事务&#xff0c;服务器回查客户端 */
public class TransactionCheckListenerImpl implements TransactionCheckListener { // private AtomicInteger transactionIndex &#61; new AtomicInteger(0); //在这里&#xff0c;我们可以根据由MQ回传的key去数据库查询&#xff0c;这条数据到底是成功了还是失败了。 public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("未决事务&#xff0c;服务器回查客户端msg &#61;" &#43; new String(msg.getBody().toString())); // return LocalTransactionState.ROLLBACK_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE; // return LocalTransactionState.UNKNOW;
}
}
producer端&#xff1a;发送数据到MQ&#xff0c;并且处理本地事物。这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据。第二个数据失败了&#xff0c;不会被消费。
Consumer只会接收到一个&#xff0c;第二个数据不会被接收到