《RocketMQ技术内幕》 笔记2 消息发送
Published on: | Views: 75Message
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties; //属性组,可以增加自定义属性(putUserProperty)
private byte[] body; //消息体
private String transactionId;
}
DefaultMQProducer
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private String producerGroup; //生产者组, 主要用于事务消息回查事务状态
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC; //默认topic
private volatile int defaultTopicQueueNums = 4; //默认主题在每个broker中的队列数
private int sendMsgTimeout = 3000; //发送超时时间
private int compressMsgBodyOverHowmuch = 1024 * 4; //消息体压缩阈值
private int retryTimesWhenSendFailed = 2; //同步发送失败后重试次数
private int retryTimesWhenSendAsyncFailed = 2;//异步发送失败后重试次数
private boolean retryAnotherBrokerWhenNotStoreOK = false; // 重试另外一个broker时是否不等待存储成功就返回
private int maxMessageSize = 1024 * 1024 * 4; // 4M 允许的最大消息长度
}
MQClientInstance
生产者和消费者均通过MQClientInstance和Broker,NameServer通信 MQClientInstance 是由clientId 标识的 clientId=Ip@instanceName[@unitName] instanceName被默认设置为当前进程id 所以,默认情况下,一个进程中的诸多生产者和消费者共享一个MQClientInstance.
消息路由
public class TopicPublishInfo {
private boolean orderTopic = false; //是否为顺序消息(?)
private boolean haveTopicRouterInfo = false; // 是否有路由消息
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); //消费队列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 队列选择自增器
private TopicRouteData topicRouteData; //topic路由信息
}
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private String topic;
private String brokerName;
private int queueId;
}
生产者每30秒去获取一次路由消息,并缓存在topicPublishInfoTable中。
负载均衡
每个topic都拥有一个sendWhichQueue变量,每次取队列后这个数值会增1,所以这里采用了轮询算法。
失败重试
消息发送失败后,会重试,重试次数是通过retryTimesWhenSendFailed和retryTimesWhenSendAsyncFailed控制,一般情况下,只有RemotingException、MQClientException、MQBrokerException部分异常情况下才会重试。 在同步发送情况下,如果开启了retryAnotherBrokerWhenNotStoreOK,那么没有异常但返回不成功时也会重试。
默认情况下,重试时,会选择下一个队列, 但是开启sendLatencyFaultEnable后,会跳过失败的broker, 直接找其他broker的队列。