#需要创建文件夹
/docker/namesrv/logs
/docker/namesrv/store
/docker/rocketmq/logs
/docker/rocketmq/store
#需要创建文件
/docker/rocketmq/broker.conf
文件内容如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = {本地外网 IP}
#部署并启动nameserver
docker run -d -p 9876:9876 -v /docker/namesrv/logs:/root/logs -v /docker/namesrv/store:/root/store --name rmqnamesrv foxiswho/rocketmq:server-4.5.1
#部署并启动broker
docker run -d -p 10911:10911 -p 10909:10909 -e TZ="Asia/Shanghai" -v /docker/rocketmq/logs:/root/logs -v /docker/rocketmq/store:/root/store -v /docker/rocketmq/broker.conf:/opt/rocketmq-4.5.1/conf/broker.conf --name rmqbroker -e "NAMESRV_ADDR=192.168.100.30:9876" -e "MAX_POSSIBLE_HEAP=200000000" foxiswho/rocketmq:broker-4.5.1 sh mqbroker -c /opt/rocketmq-4.5.1/conf/broker.conf
#部署并启动console
docker run -d --name rmqconsole -p 8180:8080 -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.100.30:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -t styletang/rocketmq-console-ng
package main
import (
"context"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
type DemoListener struct {
localTrans *sync.Map
transactionIndex int32
}
func NewDemoListener() *DemoListener {
return &DemoListener{
localTrans: new(sync.Map),
}
}
func (dl *DemoListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
nextIndex := atomic.AddInt32(&dl.transactionIndex, 1)
fmt.Printf("nextIndex: %v for transactionID: %v\n", nextIndex, msg.TransactionId)
status := nextIndex % 3
dl.localTrans.Store(msg.TransactionId, primitive.LocalTransactionState(status+1))
fmt.Printf("dl")
//在SendMessageInTransaction 方法调用ExecuteLocalTransaction方法,
//如果ExecuteLocalTransaction 返回primitive.UnknowState 那么brocker就会调用CheckLocalTransaction方法检查消息状态
// 如果返回 primitive.CommitMessageState 和primitive.RollbackMessageState 则不会调用CheckLocalTransaction
return primitive.UnknowState
}
func (dl *DemoListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Printf("%v msg transactionID : %v\n", time.Now(), msg.TransactionId)
v, existed := dl.localTrans.Load(msg.TransactionId)
if !existed {
fmt.Printf("unknow msg: %v, return Commit", msg)
return primitive.CommitMessageState
}
state := v.(primitive.LocalTransactionState)
switch state {
case 1:
fmt.Printf("checkLocalTransaction COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
case 2:
fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
return primitive.RollbackMessageState
case 3:
fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
return primitive.UnknowState
default:
fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
}
}
func main() {
p, _ := rocketmq.NewTransactionProducer(
NewDemoListener(),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.100.30:9876"})),
producer.WithRetry(1),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s\n", err.Error())
os.Exit(1)
}
topic := "test"
for i := 0; i <10; i++ {
res, err := p.SendMessageInTransaction(context.Background(),
primitive.NewMessage(topic, []byte("Hello RocketMQ again "+strconv.Itoa(i))))
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}
time.Sleep(5 * time.Minute)
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"192.168.100.30:9876"})),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}
//这个相当于消费者 消息ack,如果失败可以返回 consumer.ConsumeRetryLater
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
}
}