博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ学习(七):消息的生命周期下之消息的消费
阅读量:6210 次
发布时间:2019-06-21

本文共 2736 字,大约阅读时间需要 9 分钟。

源代码版本是3.2.6。接着上一篇消息的产生,这篇是消息的消费。Consumer选择DefaultMQPushConsumer为例。

1.DefaultMQPushConsumer.start()开始。

2.RebalanceService.run()方法定时调用RebalanceImpl.doRebalance()方法,该方法内部是遍历订阅的topic,执行rebalanceByTopic(topic)。

3.调用RebalanceImpl.updateProcessQueueTableInRebalance(),构造PullRequest,从Broker获取nextOffset,pullRequest.setNextOffset(nextOffset),同时更新本地消费进度记录。

4.调用RebalancePushImpl.dispatchPullRequest(List)。

5.调用PullMessageService.executePullRequestImmediately(final PullRequest)放入pullRequestQueue队列中去。

6.PullMessageService.run()从pullRequestQueue队列中取出PullRequest,调用DefaultMQPushConsumerImpl.pullMessage(pullRequest)作拉取消息的动作。

7.构造回调函数PullCallback,对拉取消息结果PullResult做处理,具体是,从PullResult中解码出拉取的消息列表,如果消息的订阅tag不为空且不是classFilter过滤模式,则进行tag过滤,然后把过滤后的消息列表装入PullResult,取出pullResult的nextBeginOffset装入当前的pullRequest的NextOffset中,更新统计数据,异步提交ConsumeRequest进行消息消费,接着提交pullRequest准备做下一次拉取消息的请求。

8.DefaultMQPushConsumerImpl.pullAPIWrapper.pullKernelImpl(//

pullRequest.getMessageQueue(), // 1
subExpression, // 2
subscriptionData.getSubVersion(), // 3
pullRequest.getNextOffset(), // 4
this.defaultMQPushConsumer.getPullBatchSize(), // 5
sysFlag, // 6
commitOffsetValue,// 7
BrokerSuspendMaxTimeMillis, // 8
ConsumerTimeoutMillisWhenSuspend, // 9
CommunicationMode.ASYNC, // 10
pullCallback// 11
);

######到此Consumer端发消息结束######

———我是分割线———-

######接着Request走到Broker######

9.PullMessageProcessor.processRequest()接收到拉消息的请求,做一些简单的判断,如检查Broker权限,确保订阅组存在,检查topic是否存在,然后去messageStore里取消息。

详细说明:DefaultMessageStore根据请求的Topic和queueId获取对应的ConsumerQueue,根据传入的queueOffset从consumerQueue里取出目标buffer,然后以20个字节为单位循环从目标buffer里取,取出偏移量offsetPy(占8个字节),消息长度sizePy(占4个字节),过滤标识tagCode(占8个字节),判断如果订阅信息匹配tagCode,则以offsetPy和sizePy从commitLog中以取出消息体buffer,存入GetMessageResult,然后再进行下一次取,最后返回GetMessageResult。

10.取出GetMessageResult的NextBeginoffset,minOffset,maxOffet3个属性,设置到responseHeader中,然后把GetMessageResult打包进response后发送到Consumer端。

######到此Broker端结束######

———我是分割线———-

######GetMessageResult又走到Consumer######

11.接着response到Consumer端,就会执行前面说的PullCallback里的操作。这里展开消息消费说一下,就是前面提到的,异步提交ConsumeRequest进行消息消费:

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//

pullResult.getMsgFoundList(), //
processQueue, //
pullRequest.getMessageQueue(), //
dispathToConsume);

说明:

consumeMessageService这里是ConsumeMessageConcurrentlyService。
listener就是consumer在start之前注册的listener。

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + ” Receive New Messages: ” + msgs);
System.out.println(” Receive Message Size: ” + msgs.size());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

是多线程并行消费。

近2个月事情比较多,东西都忘光了。

转载地址:http://exkja.baihongyu.com/

你可能感兴趣的文章
C#的未来:闭包注解
查看>>
解读2015之Golang篇:Golang的全迸发时代
查看>>
微软宣布开源WPF、WinForms和WinUI
查看>>
技术之外的收获
查看>>
.NET Core运行时和基础类库性能提升
查看>>
利用JS在AWS上构建大规模弹性Web应用
查看>>
Babylon.js 3.3发布:更强大的粒子系统和WebVR支持
查看>>
威马汽车合伙人、CTO闫枫:用互联网思维改变百年汽车行业
查看>>
Plaid.com的监控系统如何实现与9600多家金融机构的集成
查看>>
时间序列数据库的秘密(3)——加载和分布式计算
查看>>
Uber开源基于web的自主可视化系统,可共享数据
查看>>
伯克利最新研究成果:让机器人比你还了解自己的偏好
查看>>
Safari浏览器的智能跟踪预防工作原理
查看>>
那家CTO带头喊麦的直播公司,快要倒闭了
查看>>
express学习笔记
查看>>
华为吴晟:分布式监控系统的设计与实现
查看>>
.NET Core 2.1的重大缺陷延长了.NET Core 2.0的寿命
查看>>
我的第一次移动端页面制作 — 总结与思考
查看>>
Android 自定义 TabLayout
查看>>
js根据参数名获取url上的参数值
查看>>