热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

gorocketMq

我想用rocketMq大家主要是用它的事务,所以拿着官方的代码体验一下环境用docker安装rocketMq#需要创建文件夹dockernamesrvlogsdockernames

我想用rocketMq大家主要是用它的事务,所以拿着官方的代码体验一下


环境

用docker安装rocketMq


#需要创建文件夹
/docker/namesrv/logs
/docker/namesrv/store
/docker/rocketmq/logs
/docker/rocketmq/store
#需要创建文件
/docker/rocketmq/broker.conf
文件内容如下:
brokerClusterName
= DefaultCluster
brokerName
= broker-a
brokerId
= 0
deleteWhen
= 04
fileReservedTime
= 48
brokerRole
= ASYNC_MASTER
flushDiskType
= ASYNC_FLUSH
brokerIP1
= {本地外网 IP}
#部署并启动nameserver
docker run
-d -p 9876:9876 -v /docker/namesrv/logs:/root/logs -v /docker/namesrv/store:/root/store --name rmqnamesrv foxiswho/rocketmq:server-4.5.1
#部署并启动broker
docker run
-d -p 10911:10911 -p 10909:10909 -e TZ="Asia/Shanghai" -v /docker/rocketmq/logs:/root/logs -v /docker/rocketmq/store:/root/store -v /docker/rocketmq/broker.conf:/opt/rocketmq-4.5.1/conf/broker.conf --name rmqbroker -e "NAMESRV_ADDR=192.168.100.30:9876" -e "MAX_POSSIBLE_HEAP=200000000" foxiswho/rocketmq:broker-4.5.1 sh mqbroker -c /opt/rocketmq-4.5.1/conf/broker.conf
#部署并启动console
docker run
-d --name rmqconsole -p 8180:8080 -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.100.30:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -t styletang/rocketmq-console-ng

访问宿主机外网 IP:8180 访问console,如下界面不报错表示成功。


go代码:

服务端:


package main
import (
"context"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
type DemoListener
struct {
localTrans
*sync.Map
transactionIndex int32
}
func NewDemoListener()
*DemoListener {
return &DemoListener{
localTrans:
new(sync.Map),
}
}
func (dl
*DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
nextIndex :
= atomic.AddInt32(&dl.transactionIndex, 1)
fmt.Printf(
"nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
status :
= nextIndex % 3
dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status
+1))
fmt.Printf(
"dl")
//在SendMessageInTransaction 方法调用ExecuteLocalTransaction方法,
//如果ExecuteLocalTransaction 返回primitive.UnknowState 那么brocker就会调用CheckLocalTransaction方法检查消息状态
// 如果返回 primitive.CommitMessageState 和primitive.RollbackMessageState 则不会调用CheckLocalTransaction
return primitive.UnknowState
}
func (dl
*DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Printf(
"%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
v, existed :
= dl.localTrans.Load(msg.TransactionId)
if !existed {
fmt.Printf(
"unknow msg: %v, return Commit", msg)
return primitive.CommitMessageState
}
state :
= v.(primitive.LocalTransactionState)
switch state {
case 1:
fmt.Printf(
"checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
case 2:
fmt.Printf(
"checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
return primitive.RollbackMessageState
case 3:
fmt.Printf(
"checkLocalTransaction unknow: %v\n", msg)
return primitive.UnknowState
default:
fmt.Printf(
"checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
}
}
func main() {
p, _ :
= rocketmq.NewTransactionProducer(
NewDemoListener(),
producer.WithNsResolver(primitive.NewPassthroughResolver([]
string{"192.168.100.30:9876"})),
producer.WithRetry(
1),
)
err :
= p.Start()
if err != nil {
fmt.Printf(
"start producer error: %s\n", err.Error())
os.Exit(
1)
}
topic :
= "test"
for i := 0; i <10; i++ {
res, err :
= p.SendMessageInTransaction(context.Background(),
primitive.NewMessage(topic, []
byte("Hello RocketMQ again "+strconv.Itoa(i))))
if err != nil {
fmt.Printf(
"send message error: %s\n", err)
}
else {
fmt.Printf(
"send message success: result=%s\n", res.String())
}
}
time.Sleep(
5 * time.Minute)
err
= p.Shutdown()
if err != nil {
fmt.Printf(
"shutdown producer error: %s", err.Error())
}
}

客户端:


package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, _ :
= rocketmq.NewPushConsumer(
consumer.WithGroupName(
"testGroup"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]
string{"192.168.100.30:9876"})),
)
err :
= c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...
*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf(
"subscribe callback: %v \n", msgs[i])
}
//这个相当于消费者 消息ack,如果失败可以返回 consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(
-1)
}
time.Sleep(time.Hour)
err
= c.Shutdown()
if err != nil {
fmt.Printf(
"shutdown Consumer error: %s", err.Error())
}
}

管理界面随便截个图:

技术图片

 

go rocketMq



推荐阅读
  • 本文内容为asp.net微信公众平台开发的目录汇总,包括数据库设计、多层架构框架搭建和入口实现、微信消息封装及反射赋值、关注事件、用户记录、回复文本消息、图文消息、服务搭建(接入)、自定义菜单等。同时提供了示例代码和相关的后台管理功能。内容涵盖了多个方面,适合综合运用。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • Metasploit攻击渗透实践
    本文介绍了Metasploit攻击渗透实践的内容和要求,包括主动攻击、针对浏览器和客户端的攻击,以及成功应用辅助模块的实践过程。其中涉及使用Hydra在不知道密码的情况下攻击metsploit2靶机获取密码,以及攻击浏览器中的tomcat服务的具体步骤。同时还讲解了爆破密码的方法和设置攻击目标主机的相关参数。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • 本文介绍了指针的概念以及在函数调用时使用指针作为参数的情况。指针存放的是变量的地址,通过指针可以修改指针所指的变量的值。然而,如果想要修改指针的指向,就需要使用指针的引用。文章还通过一个简单的示例代码解释了指针的引用的使用方法,并思考了在修改指针的指向后,取指针的输出结果。 ... [详细]
  • 在project.properties添加#Projecttarget.targetandroid-19android.library.reference.1..Sliding ... [详细]
  • PDF内容编辑的两种小方法,你知道怎么操作吗?
    本文介绍了两种PDF内容编辑的方法:迅捷PDF编辑器和Adobe Acrobat DC。使用迅捷PDF编辑器,用户可以通过选择需要更改的文字内容并设置字体形式、大小和颜色来编辑PDF文件。而使用Adobe Acrobat DC,则可以通过在软件中点击编辑来编辑PDF文件。PDF文件的编辑可以帮助办公人员进行文件内容的修改和定制。 ... [详细]
author-avatar
手机用户2502861455
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有