rocket mq发送文件(RocketMQMQ消息发送)

首先来看一个RcoketMQ发送消息的例子:@Service public class MQService { @Autowired DefaultMQProducer defaultMQProducer; public void SendMsg() { String msg = "我是一条消息"; // 创建消息,指定TOPIC、TAG和消息内容 Message sendMsg = new Message("TestTopic", "TestTag", msg.getBytes()); SendResult sendResult = null; try { // 同步发送消息 sendResult = defaultMQProducer.send(sendMsg); System.out.println("消息发送响应:" sendResult.toString()); } catch (mqClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } ,我来为大家讲解一下关于rocket mq发送文件?跟着小编一起来看一看吧!

rocket mq发送文件(RocketMQMQ消息发送)

rocket mq发送文件

消息发送

首先来看一个RcoketMQ发送消息的例子:

@Service public class MQService { @Autowired DefaultMQProducer defaultMQProducer; public void SendMsg() { String msg = "我是一条消息"; // 创建消息,指定TOPIC、TAG和消息内容 Message sendMsg = new Message("TestTopic", "TestTag", msg.getBytes()); SendResult sendResult = null; try { // 同步发送消息 sendResult = defaultMQProducer.send(sendMsg); System.out.println("消息发送响应:" sendResult.toString()); } catch (mqClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }

RocketMQ是通过DefaultMQProducer进行消息发送的,它实现了MQProducer接口,MQProducer接口中定义了消息发送的方法,方法主要分为三大类:

  1. 同步进行消息发送,向Broker发送消息之后等待响应结果
  2. 异步进行消息发送,向Broker发送消息之后立刻返回,当消息发送完毕之后触发回调函数
  3. sendOneway单向发送,也是异步消息发送,向Broker发送消息之后立刻返回,但是没有回调函数

public interface MQProducer extends MQAdmin { // 同步发送消息 SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; // 异步发送消息,SendCallback为回调函数 void send(final Message msg, final SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException; // 异步发送消息,没有回调函数 void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException; // 省略其他方法 }

接下来以将以同步消息发送为例来分析消息发送的流程。

DefaultMQProducer里面有一个DefaultMQProducerImpl类型的成员变量defaultMQProducerImpl,从默认的无参构造函数中可以看出在构造函数中对defaultMQProducerImpl进行了实例化,在send方法中就是调用defaultMQProducerImpl的方法进行消息发送的:

public class DefaultMQProducer extends ClientConfig implements MQProducer { /** * 默认消息生产者实现类 */ protected final transient DefaultMQProducerImpl defaultMQProducerImpl; /** * 默认的构造函数 */ public DefaultMQProducer() { this(null, MixAll.DEFAULT_PRODUCER_GROUP, null); } /** * 构造函数 */ public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) { this.namespace = namespace; this.producerGroup = producerGroup; // 实例化 defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); } /** * 同步发送消息 */ @Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 设置主题 msg.setTopic(withNamespace(msg.getTopic())); // 发送消息 return this.defaultMQProducerImpl.send(msg); } }

DefaultMQProducerImpl中消息的发送在sendDefaultImpl方法中实现,处理逻辑如下:

  1. 根据设置的主题查找对应的路由信息TopicPublishInfo
  2. 获取失败重试次数,在消息发送失败时进行重试
  3. 获取上一次选择的消息队列所在的Broker,如果上次选择的Broker为空则为NULL,然后调用selectOneMessageQueue方法选择一个消息队列,并记录本次选择的消息队列,在下一次发送消息时选择队列时使用
  4. 计算选择消息队列的耗时,如果大于超时时间,终止本次发送
  5. 调用sendKernelImpl方法进行消息发送
  6. 调用updateFaultItem记录向Broker发送消息的耗时,在开启故障延迟处理机制时使用

public class DefaultMQProducerImpl implements MQProducerInner { /** * DEFAULT SYNC ------------------------------------------------------- */ public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 发送消息 return send(msg, this.defaultMQProducer.getSendMsgTimeout()); } public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 发送消息 return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } /** * 发送消息 * @param msg 发送的消息 * @param communicationMode * @param sendCallback 回调函数 * @param timeout 超时时间 */ private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); // 开始时间 long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 查找主题路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; // 消息队列 MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 获取失败重试次数 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times ) { // 获取BrokerName String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 根据BrokerName选择一个消息队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { // 记录本次选择的消息队列 mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { // 记录时间 beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } // 计算选择消息队列的耗时时间 long costTime = beginTimestampPrev - beginTimestampFirst; // 如果已经超时,终止发送 if (timeout < costTime) { callTimeout = true; break; } // 发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 结束时间 endTimestamp = System.currentTimeMillis(); // 记录向Broker发送消息的请求耗时,消息发送结束时间 - 开始时间 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: // 如果发送失败 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { // 是否重试 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } // 返回结果 return sendResult; default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); // 如果抛出异常,记录请求耗时 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } // ... 省略其他异常处理 } else { break; } } if (sendResult != null) { return sendResult; } // ... } validateNameServerSetting(); throw new MQClientException("No route info of this topic: " msg.getTopic() FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); } } 折叠

获取路由信息

DefaultMQProducerImpl中有一个路由信息表topicPublishInfoTable,记录了主题对应的路由信息,其中KEY为topic, value为对应的路由信息对象TopicPublishInfo:

public class DefaultMQProducerImpl implements MQProducerInner { // 路由信息表,KEY为topic, value为对应的路由信息对象TopicPublishInfo private final ConcurrentMap<String, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); }

主题路由信息

TopicPublishInfo中记录了主题所在的消息队列信息、所在Broker等信息:

messageQueueList:一个MessageQueue类型的消息队列列表,MessageQueue中记录了主题名称、主题所属的Broker名称和队列ID

sendWhichQueue:计数器,选择消息队列的时候增1,以此达到轮询的目的

topicRouteData:从NameServer查询到的主题对应的路由数据,包含了队列和Broker的相关数据

public class TopicPublishInfo { // 消息队列列表 private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); // 一个计数器,每次选择消息队列的时候增1,以此达到轮询的目的 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 主题路由数据 private TopicRouteData topicRouteData; // ... } // 消息队列 public class MessageQueue implements Comparable<MessageQueue>, Serializable { private static final long serialVersionUID = 6191200464116433425L; private String topic; // 主题 private String brokerName; // 所属Broker名称 private int queueId; // 队列ID // ... } // 主题路由数据 public class TopicRouteData extends RemotingSerializable { private List<QueueData> queueDatas; // 队列数据列表 private List<BrokerData> brokerDatas; // Broker信息列表 // ... } // 队列数据 public class QueueData implements Comparable<QueueData> { private String brokerName; // Broker名称 private int readQueueNums; // 可读队列数量 private int writeQueueNums; // 可写队列数量 private int perm; private int topicSysFlag; } // Broker数据 public class BrokerData implements Comparable<BrokerData> { private String cluster; // 集群名称 private String brokerName; // Broker名称 private HashMap<Long, String> brokerAddrs; // Broker地址集合,KEY为Broker ID, value为Broker 地址 // ... } 折叠

查找路由信息

在查找主题路由信息的时候首先从DefaultMQProducerImpl缓存的路由表topicPublishInfoTable中根据主题查找路由信息,如果查询成功返回即可,如果未查询到,需要从NameServer中获取路由信息,如果获取失败,则使用默认的主题路由信息:

public class DefaultMQProducerImpl implements MQProducerInner { // 路由信息表,KEY为topic, value为对应的路由信息对象TopicPublishInfo private final ConcurrentMap<String, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); /** * 根据主题查找路由信息 * @param topic 主题 * @return */ private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { // 根据主题获取对应的主题路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); // 如果未获取到 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); // 从NameServer中查询路由信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } // 如果路由信息获取成功 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { // 返回路由信息 return topicPublishInfo; } else { // 如果路由信息未获取成功,使用默认主题查询路由信息 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); // 返回路由信息 return topicPublishInfo; } } }

从NameServer获取主题路由信息

从NameServer获取主题路由信息数据是在MQClientInstance中的updateTopicRouteInfoFromNameServer方法中实现的:

  1. 判断是否使用默认的主题路由信息,如果是则获取默认的路由信息
  2. 如果不使用默认的路由信息,则从NameServer根据Topic查询取路由信息
  3. 获取到的主题路由信息被封装为TopicRouteData类型的对象返回
  4. 从topicRouteTable主题路由表中根据主题获取旧的路由信息,与新的对比,判断信息是否发生了变化,如果发送了变化需要更新brokerAddrTable中记录的数据
  5. 将新的路由信息对象加入到路由表topicRouteTable中,替换掉旧的信息

public class MQClientInstance { public boolean updateTopicRouteInfoFromNameServer(final String topic) { // 从NameServer更新路由信息 return updateTopicRouteInfoFromNameServer(topic, false, null); } /** * 从NameServer更新路由信息 * @param topic 主题 * @param isDefault 是否使用默认的主题 * @param defaultMQProducer 默认消息生产者 * @return */ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; // 是否使用默认的路由信息 if (isDefault && defaultMQProducer != null) { // 使用默认的主题路由信息 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), clientConfig.getMqClientApiTimeout()); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); // 设置可读队列数量 data.setWriteQueueNums(queueNums); // 设置可写队列数量 } } } else { // 从NameServer获取路由信息 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); } // 如果路由信息不为空 if (topicRouteData != null) { // 从路由表中获取旧的路由信息 TopicRouteData old = this.topicRouteTable.get(topic); // 判断路由信息是否发生变化 boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { // 是否需要更新路由信息 changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); } // 如果数据发生变化 if (changed) { // 克隆一份新的路由信息 TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData(); // 处理brokerAddrTable中的数据 for (BrokerData bd : topicRouteData.getBrokerDatas()) { // 更新brokerAddrTable中的数据 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs()); } // ... log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData); // 将新的路由信息加入到路由表 this.topicRouteTable.put(topic, cloneTopicRouteData); return true; } } else { log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId); } } catch (MQClientException e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } } catch (RemotingException e) { log.error("updateTopicRouteInfoFromNameServer Exception", e); throw new IllegalStateException(e); } finally { this.lockNamesrv.unlock(); } } else { log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId); } } catch (InterruptedException e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; } } 折叠

发送请求

向NameServer发送请求的代码实现在MQClientAPIImpl的getTopicRouteInfoFromNameServer方法中,可以看到构建了请求命令RemotingCommand并设置请求类型为RequestCode.GET_ROUTEINFO_BY_TOPIC,表示从NameServer获取路由信息,之后通过Netty向NameServer发送请求,并解析返回结果:

public class MQClientAPIImpl { public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { // 从NameServer获取路由信息 return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true); } /** * 从NameServer获取路由信息 * @param topic * @param timeoutMillis * @param allowTopicNotExist * @return * @throws MQClientException * @throws InterruptedException * @throws RemotingTimeoutException * @throws RemotingSendRequestException * @throws RemotingConnectException */ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); requestHeader.setTopic(topic); // 创建请求命令,请求类型为获取主题路由信息GET_ROUTEINFO_BY_TOPIC RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader); // 发送请求 RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); assert response != null; switch (response.getCode()) { // 如果主题不存在 case ResponseCode.TOPIC_NOT_EXIST: { if (allowTopicNotExist) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); } break; } // 如果请求发送成功 case ResponseCode.SUCCESS: { byte[] body = response.getBody(); // 返回获取的路由信息 if (body != null) { return TopicRouteData.decode(body, TopicRouteData.class); } } default: break; } throw new MQClientException(response.getCode(), response.getRemark()); } } 折叠

选择消息队列

主题路由信息数据TopicPublishInfo获取到之后,需要从中选取一个消息队列,是通过调用MQFaultStrategy的selectOneMessageQueue方法触发的,之后会进入MQFaultStrategy的selectOneMessageQueue方法从主题路由信息中选择消息队列:

public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 选择消息队列 return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } }

MQFaultStrategy的selectOneMessageQueue方法主要是通过调用TopicPublishInfo中的相关方法进行消息队列选择的

启用故障延迟机制

如果启用了故障延迟机制,会遍历TopicPublishInfo中存储的消息队列列表,对计数器增1,轮询选择一个消息队列,接着会判断消息队列所属的Broker是否可用,如果Broker可用返回消息队列即可。

如果选出的队列所属Broker不可用,会调用latencyFaultTolerance的pickOneAtLeast方法(下面会讲到)选择一个Broker,从tpInfo中获取此Broker可写的队列数量,如果数量大于0,调用selectOneMessageQueue()方法选择一个队列。

如果故障延迟机制未选出消息队列,依旧会调用selectOneMessageQueue()选择出一个消息队列。

未启用故障延迟机制

直接调用的selectOneMessageQueue(String lastBrokerName)方法并传入上一次使用的Broker名称进行选择。

public class MQFaultStrategy { /** * 选择消息队列 * @param tpInfo 主题路由信息 * @param lastBrokerName 上一次使用的Broker名称 * @return */ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 如果启用故障延迟机制 if (this.sendLatencyFaultEnable) { try { // 计数器增1 int index = tpInfo.getSendWhichQueue().incrementAndGet(); // 遍历TopicPublishInfo中存储的消息队列列表 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i ) { // 轮询选择一个消息队列 int pos = Math.abs(index ) % tpInfo.getMessageQueueList().size(); // 如果下标小于0,则使用0 if (pos < 0) pos = 0; // 根据下标获取消息队列 MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } // 如果未获取到可用的Broker // 调用pickOneAtLeast选择一个 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); // 从tpInfo中获取Broker可写的队列数量 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); // 如果可写的队列数量大于0 if (writeQueueNums > 0) { // 选择一个消息队列 final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { // 设置消息队列所属的Broker mq.setBrokerName(notBestBroker); // 设置队列ID mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } // 返回消息队列 return mq; } else { // 移除Broker latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 如果故障延迟机制未选出消息队列,调用selectOneMessageQueue选择消息队列 return tpInfo.selectOneMessageQueue(); } // 根据上一次使用的BrokerName获取消息队列 return tpInfo.selectOneMessageQueue(lastBrokerName); } } 折叠

selectOneMessageQueue方法的实现

selectOneMessageQueue方法中,如果上一次选择的BrokerName为空,则调用无参的selectOneMessageQueue方法选择消息队列,也是默认的选择方式,首先对计数器增一,然后用计数器的值对messageQueueList列表的长度取余得到下标值pos,再从messageQueueList中获取pos位置的元素,以此达到轮询从messageQueueList列表中选择消息队列的目的。

如果传入的BrokerName不为空,遍历messageQueueList列表,同样对计数器增一,并对messageQueueList列表的长度取余,选取一个消息队列,不同的地方是选择消息队列之后,会判断消息队列所属的Broker是否与上一次选择的Broker名称一致,如果一致则继续循环,轮询选择下一个消息队列,也就是说,如果上一次选择了某个Broker发送消息,本次将不会再选择这个Broker,当然如果最后仍未找到满足要求的消息队列,则仍旧使用默认的选择方式,也就是调用无参的selectOneMessageQueue方法进行选择。

public class TopicPublishInfo { private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); // 消息队列列表 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 一个计数器,每次选择消息队列的时候增1,以此达到轮询的目的 private TopicRouteData topicRouteData; // ... public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 如果上一次选择的BrokerName为空 if (lastBrokerName == null) { // 选择消息队列 return selectOneMessageQueue(); } else { // 遍历消息队列列表 for (int i = 0; i < this.messageQueueList.size(); i ) { // 计数器增1 int index = this.sendWhichQueue.incrementAndGet(); // 对长度取余 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; // 获取消息队列,也就是使用使用轮询的方式选择消息队列 MessageQueue mq = this.messageQueueList.get(pos); // 如果队列所属的Broker与上一次选择的不同,返回消息队列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // 使用默认方式选择 return selectOneMessageQueue(); } } // 选择消息队列 public MessageQueue selectOneMessageQueue() { // 自增 int index = this.sendWhichQueue.incrementAndGet(); // 对长度取余 int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; // 选择消息队列 return this.messageQueueList.get(pos); } } 折叠

故障延迟机制

回到发送消息的代码中,可以看到消息发送无论成功与否都会调用updateFaultItem方法更新失败条目:

public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); // 发送消息 private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // ... for (; times < timesTotal; times ) { try { // 开始时间 beginTimestampPrev = System.currentTimeMillis(); // ... // 发送消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 结束时间 endTimestamp = System.currentTimeMillis(); // 更新失败条目 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); // ... } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); // 更新失败条目 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } // 省略其他catch // ... catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } // ... } // 更新FaultItem public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { // 调用MQFaultStrategy的updateFaultItem方法 this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); } } 折叠

MQFaultStrategy中有一个类型的成员变量,最终是通过调用latencyFaultTolerance的updateFaultItem方法进行更新的,并传入了三个参数:

brokerName:Broker名称

currentLatency:当前延迟时间,由上面的调用可知传入的值为发送消息的耗时时间,即消息发送结束时间 - 开始时间

duration:持续时间,根据isolation的值决定,如果为true,duration的值为30000ms也就是30s,否则与currentLatency的值一致

public class MQFaultStrategy { // 故障延迟机制 private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); /** * 更新失败条目 * @param brokerName Broker名称 * @param currentLatency 发送消息耗时:请求结束时间 - 开始时间 * @param isolation */ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { // 计算duration,isolation为true时使用30000,否则使用发送消息的耗时时间currentLatency long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); // 更新到latencyFaultTolerance中 this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } }

LatencyFaultToleranceImpl

LatencyFaultToleranceImpl中有一个faultItemTable,记录了每个Broker对应的FaultItem,在updateFaultItem方法中首先根据Broker名称从faultItemTable获取FaultItem:

  • 如果获取为空,说明需要新增FaultItem,新建FaultItem对象,设置传入的currentLatency延迟时间(消息发送结束时间 - 开始时间)和开始时间即当前时间 notAvailableDuration,notAvailableDuration值有两种情况,值为30000毫秒或者与currentLatency的值一致
  • 如果获取不为空,说明之前已经创建过对应的FaultItem,更新FaultItem中的currentLatency延迟时间和StartTimestamp开始时间

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { // FaultItem集合,Key为BrokerName,value为对应的FaultItem对象 private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); /** * 更新FaultItem * @param name Broker名称 * @param currentLatency 延迟时间,也就是发送消息耗时:请求结束时间 - 开始时间 * @param notAvailableDuration 不可用的持续时间,也就是上一步中的duration */ @Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { // 获取FaultItem FaultItem old = this.faultItemTable.get(name); // 如果不存在 if (null == old) { // 新建FaultItem final FaultItem faultItem = new FaultItem(name); // 设置currentLatency延迟时间 faultItem.setCurrentLatency(currentLatency); // 设置规避故障开始时间,当前时间 不可用的持续时间,不可用的持续时间有两种情况:值为30000或者与currentLatency一致 faultItem.setStartTimestamp(System.currentTimeMillis() notAvailableDuration); // 添加到faultItemTable old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration); } } else { // 更新时间 old.setCurrentLatency(currentLatency); // 更新开始时间 old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration); } } }

失败条目

FaultItem是LatencyFaultToleranceImpl的一个内部类,里面有三个变量:

  • name:Broker名称。
  • currentLatency:延迟时间,等于发送消息耗时时间:发送消息结束时间 - 开始时间。
  • startTimestamp:规避故障开始时间:新建/更新FaultItem的时间 不可用的时间notAvailableDuration,notAvailableDuration值有两种情况,值为30000毫秒或者与currentLatency的值一致。

isAvailable方法

isAvailable方法用于开启故障延迟机制时判断Broker是否可用,可用判断方式为:当前时间 - startTimestamp的值大于等于 0,如果小于0则认为不可用。

上面分析可知startTimestamp的值为新建/更新FaultItem的时间 不可用的时间,如果当前时间减去规避故障开始时间的值大于等于0,说明此Broker已经超过了设置的规避时间,可以重新被选择用于发送消息。

compareTo方法

FaultItem还实现了Comparable,重写了compareTo方法,在排序的时候使用,对比大小的规则如下:

  1. 调用isAvailable方法判断当前对象和other的值是否相等,如果相等继续第2步,如果不相等,说明两个对象一个返回true一个返回false,此时优先判断当前对象的isAvailable方法返回值是否为true:
  2. true:表示当前对象比other小,返回-1,对应当前对象为true,other对象为false的情况
  3. false:调用other的isAvailable方法判断是否为true,如果为true,返回1,表示other比较大(对应当前对象为false,other对象为true的情况),否则继续第2步根据其他条件判断。
  4. 对比currentLatency的值,如果currentLatency值小于other的,返回-1,表示当前对象比other小。
  5. 对比startTimestamp的值,如果startTimestamp值小于other的,返回-1,同样表示当前对象比other小。

总结

isAvailable方法返回true的时候表示FaultItem对象的值越小,因为true代表Broker已经过了规避故障的时间,可以重新被选择。

currentLatency的值越小表示FaultItem的值越小。currentLatency的值与Broker发送消息的耗时有关,耗时越低,值就越小。

startTimestamp值越小同样表示整个FaultItem的值也越小。startTimestamp的值与currentLatency有关(值不为默认的30000毫秒情况下),currentLatency值越小,startTimestamp的值也越小。

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { class FaultItem implements Comparable<FaultItem> { private final String name; // Broker名称 private volatile long currentLatency; // 发送消息耗时时间:请求结束时间 - 开始时间 private volatile long startTimestamp; // 规避开始时间:新建/更新FaultItem的时间 不可用的时间notAvailableDuration @Override public int compareTo(final FaultItem other) { // 如果isAvailable不相等,说明一个为true一个为false if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) // 如果当前对象为true return -1; // 当前对象小 if (other.isAvailable())// 如果other对象为true return 1; // other对象大 } // 对比发送消息耗时时间 if (this.currentLatency < other.currentLatency) return -1;// 当前对象小 else if (this.currentLatency > other.currentLatency) { return 1; // other对象大 } // 对比故障规避开始时间 if (this.startTimestamp < other.startTimestamp) return -1; else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; } // 用于判断Broker是否可用 public boolean isAvailable() { // 当前时间减去startTimestamp的值是否大于等于0,大于等于0表示可用 return (System.currentTimeMillis() - startTimestamp) >= 0; } } } 折叠

在选择消息队列时,如果开启故障延迟机制并且未找到合适的消息队列,会调用pickOneAtLeast方法选择一个Broker,那么是如何选择Broker的呢?

  1. 首先遍历faultItemTableMap集合,将每一个Broker对应的FaultItem加入到LinkedList链表中
  2. 调用sort方法对链表进行排序,默认是正序从小到大排序,FaultItem还实现Comparable就是为了在这里进行排序,值小的排在链表前面
  3. 计算中间值half:
  4. 如果half值小于等于0,取链表中的第一个元素
  5. 如果half值大于0,从前half个元素中轮询选择元素

由FaultItem的compareTo方法可知,currentLatency和startTimestamp的值越小,整个FaultItem的值也就越小,正序排序时越靠前,靠前表示向Broker发送消息的延迟越低,在选择Broker时优先级越高,所以如果half值小于等于0的时候,取链表中的第一个元素,half值大于0的时候,处于链表前half个的Brokerddd,延迟都是相对较低的,此时轮询从前haft个Broker中选择一个Broker。

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { // FaultItem集合,Key为BrokerName,value为对应的FaultItem对象 private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); @Override public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); // 遍历faultItemTable while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); // 将FaultItem添加到列表中 tmpList.add(faultItem); } if (!tmpList.isEmpty()) { Collections.shuffle(tmpList); // 排序 Collections.sort(tmpList); // 计算中间数 final int half = tmpList.size() / 2; // 如果中位数小于等于0 if (half <= 0) { // 获取第一个元素 return tmpList.get(0).getName(); } else { // 对中间数取余 final int i = this.whichItemWorst.incrementAndGet() % half; return tmpList.get(i).getName(); } } return null; } }

故障规避

再回到MQFaultStrategy中选择消息队列的地方,在开启故障延迟机制的时候,选择队列后会调用LatencyFaultToleranceImpl的isAvailable方法来判断Broker是否可用,而LatencyFaultToleranceImpl的isAvailable方法又是调用Broker对应 FaultItem的isAvailable方法来判断的。

由上面的分析可知,isAvailable返回true表示Broker已经过了规避时间可以用于发送消息,返回false表示还在规避时间内,需要避免选择此Broker,所以故障延迟机制指的是在发送消息时记录每个Broker的耗时时间,如果某个Broker发生故障,但是生产者还未感知(NameServer 30s检测一次心跳,有可能Broker已经发生故障但未到检测时间,所以会有一定的延迟),用耗时时间做为一个故障规避时间(也可以是30000ms),此时消息会发送失败,在重试或者下次选择消息队列的时候,如果在规避时间内,可以在短时间内避免再次选择到此Broker,以此达到故障规避的目的。

如果某个主题所在的所有Broker都处于不可用状态,此时调用pickOneAtLeast方法尽量选择延迟时间最短、规避时间最短(排序后的失败条目中靠前的元素)的Broker作为此次发生消息的Broker。

public class MQFaultStrategy { private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); /** * 选择消息队列 * @param tpInfo 主题路由信息 * @param lastBrokerName 上一次使用的Broker名称 * @return */ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 如果启用故障延迟机制 if (this.sendLatencyFaultEnable) { try { // 计数器增1 int index = tpInfo.getSendWhichQueue().incrementAndGet(); // 遍历TopicPublishInfo中存储的消息队列列表 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i ) { // 轮询选择一个消息队列 int pos = Math.abs(index ) % tpInfo.getMessageQueueList().size(); // 如果下标小于0,则使用0 if (pos < 0) pos = 0; // 根据下标获取消息队列 MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } // 如果未获取到可用的Broker // 调用pickOneAtLeast选择一个 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); // 从tpInfo中获取Broker可写的队列数量 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); // 如果可写的队列数量大于0 if (writeQueueNums > 0) { // 选择一个消息队列 final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { // 设置消息队列所属的Broker mq.setBrokerName(notBestBroker); // 设置队列ID mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums); } // 返回消息队列 return mq; } else { // 移除Broker latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 如果故障延迟机制未选出消息队列,调用selectOneMessageQueue选择消息队列 return tpInfo.selectOneMessageQueue(); } // 根据上一次使用的BrokerName获取消息队列 return tpInfo.selectOneMessageQueue(lastBrokerName); } } public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); @Override public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { // 调用FaultItem的isAvailable方法判断是否可用 return faultItem.isAvailable(); } return true; } }

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页