1.创建NameServer服务

先用命令docker search rocketmq搜索rocketmq相关镜像

这一步先拉取rocketmqinc/rocketmq镜像,docker pull rocketmqinc/rocketmq

然后在本地创建数据存储路径,因为是在本地电脑搭建环境,所以要用绝对路径

mkdir -p /Users/mymac/docker/rocketmq/data/namesrv/logs /Users/mymac/docker/rocketmq/data/namesrv/store

然后构建namesrv容器,-v也是用刚刚创建的绝对路径

docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v /Users/mymac/docker/rocketmq/data/namesrv/logs:/root/logs \
-v /Users/mymac/docker/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv

2.创建broker结点

先在本地创建数据存储路径

mkdir -p /Users/mymac/docker/rocketmq/data/broker/logs /Users/mymac/docker/rocketmq/data/broker/store /Users/mymac/docker/rocketmq/conf

然后在conf文件夹里面创建一个配置文件broker.conf,编辑里面的内容如下

#集群名称
brokerClusterName = DefaultCluster
#broker名称,master和slave名称相同
brokerName = broker-a
#0表示master,大于0表示各个slave
brokerId = 0
#默认凌晨4点消息删除
deleteWhen = 04
#消息在磁盘保留时长,单位小时
fileReservedTime = 48
#broker角色:SYNC_MASTER、ASYNC_MASTER、SLAVE;分别表示同步复制的Master、异步复制的Master、以及Slave(复制方式指Master与Slave之间如何同步数据)
brokerRole = ASYNC_MASTER
#刷盘策略:ASYNC_FLUSH,SYNC_FLUSH;分别表示异步刷盘和同步刷盘
flushDiskType = ASYNC_FLUSH

#nameserver地址,其中10.0.54.77是我本机的ip地址(因为我在本机测试),通过ifconfig的en0可以查出
namesrvAddr = 10.0.54.77:9876
#broker结点所在服务器ip地址,因为我在本机测试,所以填写本机ip 10.0.54.77
brokerIP1 = 10.0.54.77
# 监听端口,默认是10911
listenPort = 10911

然后可以创建broker容器了,-v用刚刚创建的绝对路径

docker run -d  \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v  /Users/mymac/docker/rocketmq/data/broker/logs:/root/logs \
-v  /Users/mymac/docker/rocketmq/data/broker/store:/root/store \
-v /Users/mymac/docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

3.创建rocketmq-console服务

先拉取styletang/rocketmq-console-ng镜像,docker pull styletang/rocketmq-console-ng

然后创建容器,其中9999是在本地访问的端口

docker run -d \
--name rmqconsole \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.0.54.77:9876 \
-Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 9999:8080 \
styletang/rocketmq-console-ng

此时三个容器都创建好了,可以看到如下图

此时在浏览器输入127.0.0.1:9999可以看到如下场景

4.创建生产者

先在rocketmq-console的浏览器页面里面创建一个叫kevintest的topic,然后运行如下代码(代码中的NameServer地址10.0.54.89:9876需要替换成自己实际的NameServer地址)

// main sends ten test messages synchronously to the "kevintest" topic and
// prints the result of every send.
//
// The topic must already exist on the broker, and the NameServer address used
// below must be replaced with one reachable from the machine running this
// program.
func main() {
	topic := "kevintest"

	// The constructor error must be checked: if it is ignored and creation
	// fails, p is unusable and the Start call below would panic.
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.0.54.89:9876"})),
		producer.WithRetry(2),
	)
	if err != nil {
		log.Printf("create producer error: %s \n", err.Error())
		os.Exit(1)
	}

	if err = p.Start(); err != nil {
		log.Printf("start producer error: %s \n", err.Error())
		os.Exit(1)
	}

	for i := 0; i < 10; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("啦啦啦啦啦啦啦啦啦" + strconv.Itoa(i)),
		}

		// A failed send is reported and the loop moves on to the next message.
		res, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
			continue
		}
		fmt.Printf("send message success: result=%s\n", res.String())
	}

	if err = p.Shutdown(); err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

运行之后可以看到如下信息

INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a\",\"readQueueNums\":8,\"writeQueueNums\":8,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster\",\"brokerName\":\"broker-a\",\"brokerAddrs\":{\"0\":\"10.0.54.89:10911\"}}]}" changedFrom="<nil>" topic=kevintest
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00001, offsetMsgId=0A00365900002A9F0000000000000D48, queueOffset=4, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00002, offsetMsgId=0A00365900002A9F0000000000000DF2, queueOffset=4, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00003, offsetMsgId=0A00365900002A9F0000000000000E9C, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00004, offsetMsgId=0A00365900002A9F0000000000000F46, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00005, offsetMsgId=0A00365900002A9F0000000000000FF0, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00006, offsetMsgId=0A00365900002A9F000000000000109A, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00007, offsetMsgId=0A00365900002A9F0000000000001144, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00008, offsetMsgId=0A00365900002A9F00000000000011EE, queueOffset=2, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c00009, offsetMsgId=0A00365900002A9F0000000000001298, queueOffset=5, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]]
send message success: result=SendResult [sendStatus=0, msgIds=0A003659F951000000007e00b6c0000a, offsetMsgId=0A00365900002A9F0000000000001342, queueOffset=5, messageQueue=MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]]
INFO[0000] will remove client from clientMap             clientID=10.0.54.89@63825

进程 已完成,退出代码为 0

在rocketmq-console浏览器页面的Message里也可以看到消息发送成功

5.创建消费者

现在来编写消费者的代码

// main starts a push consumer subscribed to the "kevintest" topic, prints
// every message delivered to the callback, and shuts the consumer down after
// 30 seconds.
//
// The NameServer address used below must be replaced with one reachable from
// the machine running this program.
func main() {
	// The constructor error must be checked: if it is ignored and creation
	// fails, c is unusable and the Subscribe call below would panic.
	c, err := rocketmq.NewPushConsumer(
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"10.0.54.89:9876"})),
	)
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// The callback prints each message in the batch and reports success.
	err = c.Subscribe("kevintest", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range msgs {
			fmt.Printf("subscribe callback: %v \n", msgs[i])
		}

		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		// Without a subscription there is nothing to consume, so stop here
		// instead of starting the consumer and waiting for nothing.
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// Do not exit immediately: keep the process alive long enough for
	// messages to arrive at the callback.
	time.Sleep(30 * time.Second)

	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}

运行上面代码之后会得到如下结果

WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=%RETRY%DEFAULT_CONSUMER, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=%RETRY%DEFAULT_CONSUMER, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER offset=0
INFO[0000] the MessageQueue changed, version also updated  changeTo=1643080672878951000 changedFrom=0
INFO[0000] The PullThresholdForTopic is changed          changeTo=102400 changedFrom=102400
INFO[0000] The PullThresholdSizeForTopic is changed      changeTo=51200 changedFrom=51200
WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]" consumerGroup=DEFAULT_CONSUMER
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=4]" consumerGroup=DEFAULT_CONSUMER offset=1
WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]" consumerGroup=DEFAULT_CONSUMER
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=5]" consumerGroup=DEFAULT_CONSUMER offset=1
WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]" consumerGroup=DEFAULT_CONSUMER
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=6]" consumerGroup=DEFAULT_CONSUMER offset=1
WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]" consumerGroup=DEFAULT_CONSUMER
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦3, Flag=0, properties=map[CONSUME_START_TIME:1643080672908 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880004], TransactionId=], MsgId=0A00364DF14300000000799d60880004, Offse00364D00002A9F00000000000008A2,QueueId=4, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757033, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757044, StoreHost=10.0.54.77:10911, CommitLogOffset=2210, BodyCRC=2146591400, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦3, Flag=0, properties=map[CONSUME_START_TIME:1643080672908 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00004], TransactionId=], MsgId=0A003659F951000000007e00b6c00004, Offse00365900002A9F0000000000000F46,QueueId=4, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376546, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376569, StoreHost=10.0.54.89:10911, CommitLogOffset=3910, BodyCRC=2146591400, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦4, Flag=0, properties=map[CONSUME_START_TIME:1643080672910 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880005], TransactionId=], MsgId=0A00364DF14300000000799d60880005, Offse00364D00002A9F000000000000094C,QueueId=5, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757036, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757047, StoreHost=10.0.54.77:10911, CommitLogOffset=2380, BodyCRC=1637283595, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦4, Flag=0, properties=map[CONSUME_START_TIME:1643080672910 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00005], TransactionId=], MsgId=0A003659F951000000007e00b6c00005, Offse00365900002A9F0000000000000FF0,QueueId=5, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376550, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376573, StoreHost=10.0.54.89:10911, CommitLogOffset=4080, BodyCRC=1637283595, ReconsumeTimes=0, PreparedTransactionOffset=0] 
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=7]" consumerGroup=DEFAULT_CONSUMER offset=1
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦5, Flag=0, properties=map[CONSUME_START_TIME:1643080672914 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880006], TransactionId=], MsgId=0A00364DF14300000000799d60880006, Offse00364D00002A9F00000000000009F6,QueueId=6, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757039, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757051, StoreHost=10.0.54.77:10911, CommitLogOffset=2550, BodyCRC=378652573, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦5, Flag=0, properties=map[CONSUME_START_TIME:1643080672914 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00006], TransactionId=], MsgId=0A003659F951000000007e00b6c00006, Offse00365900002A9F000000000000109A,QueueId=6, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376554, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376577, StoreHost=10.0.54.89:10911, CommitLogOffset=4250, BodyCRC=378652573, ReconsumeTimes=0, PreparedTransactionOffset=0] 
WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=0]" consumerGroup=DEFAULT_CONSUMER offset=1
WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]" consumerGroup=DEFAULT_CONSUMER
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=1]" consumerGroup=DEFAULT_CONSUMER offset=2
WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]" consumerGroup=DEFAULT_CONSUMER
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦6, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880007], TransactionId=], MsgId=0A00364DF14300000000799d60880007, Offse00364D00002A9F0000000000000AA0,QueueId=7, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757043, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757054, StoreHost=10.0.54.77:10911, CommitLogOffset=2720, BodyCRC=261658151, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦6, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00007], TransactionId=], MsgId=0A003659F951000000007e00b6c00007, Offse00365900002A9F0000000000001144,QueueId=7, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376558, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376581, StoreHost=10.0.54.89:10911, CommitLogOffset=4420, BodyCRC=261658151, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦7, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880008], TransactionId=], MsgId=0A00364DF14300000000799d60880008, Offse00364D00002A9F0000000000000B4A,QueueId=0, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757046, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757057, StoreHost=10.0.54.77:10911, CommitLogOffset=2890, BodyCRC=2023728817, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦7, Flag=0, properties=map[CONSUME_START_TIME:1643080672927 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00008], TransactionId=], MsgId=0A003659F951000000007e00b6c00008, Offse00365900002A9F00000000000011EE,QueueId=0, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376562, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376585, StoreHost=10.0.54.89:10911, CommitLogOffset=4590, BodyCRC=2023728817, ReconsumeTimes=0, PreparedTransactionOffset=0] 
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=2]" consumerGroup=DEFAULT_CONSUMER offset=2
WARN[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]" consumerGroup=DEFAULT_CONSUMER
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦0, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880001], TransactionId=], MsgId=0A00364DF14300000000799d60880001, Offse00364D00002A9F00000000000006A4,QueueId=1, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643006757016, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757031, StoreHost=10.0.54.77:10911, CommitLogOffset=1700, BodyCRC=1727738642, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦8, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880009], TransactionId=], MsgId=0A00364DF14300000000799d60880009, Offse00364D00002A9F0000000000000BF4,QueueId=1, StoreSize=170, QueueOffset=3, SysFlag=0, BornTimestamp=1643006757048, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757059, StoreHost=10.0.54.77:10911, CommitLogOffset=3060, BodyCRC=1746975520, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦0, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00001], TransactionId=], MsgId=0A003659F951000000007e00b6c00001, Offse00365900002A9F0000000000000D48,QueueId=1, StoreSize=170, QueueOffset=4, SysFlag=0, BornTimestamp=1643080376514, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376550, StoreHost=10.0.54.89:10911, CommitLogOffset=3400, BodyCRC=1727738642, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦8, Flag=0, properties=map[CONSUME_START_TIME:1643080672933 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00009], TransactionId=], MsgId=0A003659F951000000007e00b6c00009, Offse00365900002A9F0000000000001298,QueueId=1, StoreSize=170, QueueOffset=5, SysFlag=0, BornTimestamp=1643080376566, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376589, StoreHost=10.0.54.89:10911, CommitLogOffset=4760, BodyCRC=1746975520, ReconsumeTimes=0, PreparedTransactionOffset=0] 
WARN[0000] fecth offset of mq from broker success        MessageQueue="MessageQueue [topic=kevintest, brokerName=broker-a, queueId=3]" consumerGroup=DEFAULT_CONSUMER offset=1
INFO[0000] the MessageQueue changed, version also updated  changeTo=1643080672937325000 changedFrom=0
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦1, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880002], TransactionId=], MsgId=0A00364DF14300000000799d60880002, Offse00364D00002A9F000000000000074E,QueueId=2, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643006757023, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757037, StoreHost=10.0.54.77:10911, CommitLogOffset=1870, BodyCRC=301728644, ReconsumeTimes=0, PreparedTransactionOffset=0] 
INFO[0000] The PullThresholdForTopic is changed          changeTo=11377 changedFrom=102400
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦9, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d6088000a], TransactionId=], MsgId=0A00364DF14300000000799d6088000a, Offse00364D00002A9F0000000000000C9E,QueueId=2, StoreSize=170, QueueOffset=3, SysFlag=0, BornTimestamp=1643006757051, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757062, StoreHost=10.0.54.77:10911, CommitLogOffset=3230, BodyCRC=522685366, ReconsumeTimes=0, PreparedTransactionOffset=0] 
INFO[0000] The PullThresholdSizeForTopic is changed      changeTo=5688 changedFrom=51200
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦1, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00002], TransactionId=], MsgId=0A003659F951000000007e00b6c00002, Offse00365900002A9F0000000000000DF2,QueueId=2, StoreSize=170, QueueOffset=4, SysFlag=0, BornTimestamp=1643080376538, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376561, StoreHost=10.0.54.89:10911, CommitLogOffset=3570, BodyCRC=301728644, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦9, Flag=0, properties=map[CONSUME_START_TIME:1643080672937 MAX_OFFSET:6 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c0000a], TransactionId=], MsgId=0A003659F951000000007e00b6c0000a, Offse00365900002A9F0000000000001342,QueueId=2, StoreSize=170, QueueOffset=5, SysFlag=0, BornTimestamp=1643080376570, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376593, StoreHost=10.0.54.89:10911, CommitLogOffset=4930, BodyCRC=522685366, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦2, Flag=0, properties=map[CONSUME_START_TIME:1643080672941 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A00364DF14300000000799d60880003], TransactionId=], MsgId=0A00364DF14300000000799d60880003, Offse00364D00002A9F00000000000007F8,QueueId=3, StoreSize=170, QueueOffset=1, SysFlag=0, BornTimestamp=1643006757029, BornHost=172.17.0.1:59328, StoreTimestamp=1643006757040, StoreHost=10.0.54.77:10911, CommitLogOffset=2040, BodyCRC=150295102, ReconsumeTimes=0, PreparedTransactionOffset=0] 
subscribe callback: [Message=[topic=kevintest, body=啦啦啦啦啦啦啦啦啦2, Flag=0, properties=map[CONSUME_START_TIME:1643080672941 MAX_OFFSET:3 MIN_OFFSET:0 UNIQ_KEY:0A003659F951000000007e00b6c00003], TransactionId=], MsgId=0A003659F951000000007e00b6c00003, Offse00365900002A9F0000000000000E9C,QueueId=3, StoreSize=170, QueueOffset=2, SysFlag=0, BornTimestamp=1643080376542, BornHost=172.17.0.1:55196, StoreTimestamp=1643080376566, StoreHost=10.0.54.89:10911, CommitLogOffset=3740, BodyCRC=150295102, ReconsumeTimes=0, PreparedTransactionOffset=0]

进程 已完成,退出代码为 0

可见消息消费成功

6.参考链接

https://www.masaiqi.com/archives/rocketmq%E5%AD%A6%E4%B9%A0%E4%B8%8E%E9%83%A8%E7%BD%B2docker%E8%AE%B0%E5%BD%95%E5%90%8E%E7%BB%AD%E6%9B%B4%E6%96%B0

https://www.cnblogs.com/manwaa/p/12675031.html

https://blog.csdn.net/ming19951224/article/details/109063041

https://github.com/apache/rocketmq-client-go/blob/master/examples/producer/simple/main.go

https://github.com/apache/rocketmq-client-go/blob/master/examples/consumer/simple/main.go