網上有很多關于pos機發送數據,你來簡單回答一下RocketMQ的默認發送流程的知識,也有很多人為大家解答關于pos機發送數據的問題,今天pos機之家(www.bangarufamily.com)為大家整理了關于這方面的知識,讓我們一起來看下吧!
本文目錄一覽:
1、pos機發送數據
pos機發送數據
今天我們就開始學習下默認消息發送流程,學習他的實現思路,也幫助我們工作中,遇到了問題不會手足無措。
思考問題消息發送者是如何做負載均衡的?消息發送者是如何保證高可用的?消息發送批量消息如何保證一致性的?默認發送流程-工作原理源碼入口:org.apache.rocketMq.client.producer.DefaultMQProducer#Send(org.apache.rocketmq.common.message.Message)
啟動Demo:
DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("xxx:9876"); producer.start();Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);SendResult sendResult = producer.send(msg);
流程:
1.校驗主題,設置主題
msg.setTopic(withNamespace(msg.getTopic()));
public String withNamespace(String resource) { return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);}
2.默認發送方式為同步發送,默認超時時間為3s
private int sendMsgTimeout = 3000;
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}
3.確認 producer service 運行狀態是否為運行中
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#makeSureStateOK
//檢查狀態,如果不是RUNNING狀態則拋出異常private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { throw new MQClientException("The producer service state not OK, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); }}
4.校驗信息
topic長達是否大于TOPIC_MAX_LENGTH,topic是否為空是否通過正則校驗,body是否為空,body大小是否超過4Mpublic static void checkTopic(String topic) throws MQClientException { if (UtilAll.isBlank(topic)) { throw new MQClientException("The specified topic is blank", null); } if (topic.length() > TOPIC_MAX_LENGTH) { throw new MQClientException( String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null); } if (isTopicOrGroupIllegal(topic)) { throw new MQClientException(String.format( "The specified topic[%s] contains illegal characters, allowing only %s", topic, "^[%|a-zA-Z0-9_-]+$"), null); }}
// body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
5.找到主題發布的信息,未找到則拋出異常
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
消息生產者更新和維護路由信息緩存
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 消息生產者更新和維護路由信息緩存 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; }}
6.通過TopicPublishInfo 找到對應的MessageQueue下的,BrokerName信息
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue
獲取到BrokerName對應的MessageQueue信息
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); }}
如果lastBrokerName為null,通過對 sendWhichQueue 方法獲取一個隊列
取余,然后從messageQueueList中獲取一個MessageQueue
public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos);}
7.最后消息發送
入口: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
1.根絕BrokerName獲取到broker地址
在啟動階段,對BrokerAddrTable信息進行了維護
public String findBrokerAddressInPublish(final String brokerName) { HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName); if (map != null && !map.isEmpty()) { return map.get(MixAll.MASTER_ID); } return null;}
如果未找到,則通過主題查找主題信息,通過更新路由信息后,在嘗試獲取,如果還未找到則拋出異常
if (null == brokerAddr) { // 1.1 如果未找到,則通過主題查找主題信息,通過更新路由信息后,在嘗試獲取,如果還未找到則拋出異常 tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}
2.為消息分配全局唯一ID
// 為消息分配全局唯一IDif (!(msg instanceof Messagebatch)) { MessageClientIDSetter.setUniqID(msg);}
在RocketMQ消息發送-請求與響應文章中,我們已經學習了請求參數中,創建了全局唯一的MsgId,可以回頭看一看
3.注冊鉤子消息發送鉤子函數
這里主要做了三件事情,確認MsgType類型、是否為延遲消息、調用鉤子函數內的方法
if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); // 3.1 通過isTrans來確定MsgType類型 if ("true".equals(isTrans)) { context.setMsgType(MessageType.Trans_Msg_Half); } // 3.2 如果msg里面 __STARTDELIVERTIME 或者 DELAY 不為空,則設置為延遲消息 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } // 3.3 調用鉤子函數里的方法 this.executeSendMessageHookBefore(context);}
4.設置發送信息請求頭SendMessageRequestHeader
最后根據默認發送方式,進行消息的發送
主要利用NettyRemotingClient進行發送,這里就先不展開來說了 入口: MQClientAPIImpl.sendMessage()
問題答復消息發送者是如何做負載均衡的?默認采用輪詢,每一個消息發送者全局會維護一個 Topic 上一次選擇的隊列,然后基于這個序號進行遞增輪詢AllocateMessageQueueAveragely平均分配,按照總數除以消費者個數進行,對每個消費者進行分配AllocateMessageQueueAveragelyByCircle 輪流平均分配,按照消費者個數,進行輪詢分配消息發送者是如何保證高可用的?在上面的步驟中通過TopicPublishInfo 找到對應的MessageQueue下的,BrokerName信息,利用參數sendLatencyFaultEnable來開啟關閉故障規避機制sendLatencyFaultEnable 設置為 true:開啟延遲規避機制,一旦消息發送失敗會將 broker-a “悲觀”地認為在接下來的一段時間內該 Broker 不可用,在為未來某一段時間內所有的客戶端不會向該 Broker 發送消息。使用本次消息發送延遲時間來計算Broker故障規避時長,不參與消息發送隊列負載final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } return mq;}
但是這樣子做可能帶來的后果是Broker沒有可用的情況,或者是某個Broker數據激增,增加消費者的壓力,所以默認不開啟規避機制,遇到消息發送失敗,規避 broker-a,但是在下一次消息發送時,即再次調用broker-a。
消息發送批量消息如何保證一致性的?將一個Topic下的消息,通過batch方法包一起發送客戶端ID與使用陷阱摘自丁威老師的文章
總結這段時間主要學習了RocketMQ的消息發送,主要是以源碼為主,深入了解了消息發送的啟動和消息發送的流程,以及認識到客戶端ID與使用陷阱 一圖總結
作者:叫我小郭_鏈接:https://juejin.cn/post/7105315713157431332來源:稀土掘金
以上就是關于pos機發送數據,你來簡單回答一下RocketMQ的默認發送流程的知識,后面我們會繼續為大家整理關于pos機發送數據的知識,希望能夠幫助到大家!
