
消息队列RocketMQ是一种常用的异步RPC技术. 本文以阿里云消息队列RocketMQ为例,介绍如何使用ACM对消息队列RocketMQ进行流控制.
对于消息队列RocketMQ调用,当前的限制方法是在订阅端限制电流. 有两种限流方法:
消息订户的消耗延迟流控制的基本原理是消息队列 使用场景,每当消费者向客户端添加延迟以控制消耗率时,理论上最快的并发消耗率是:
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
如果消息并发使用线程(ConcurrentThreadNumber)为20,并且延迟(ConsumInterval)为100 ms,则替换以上公式以获取:

200 = 1 / 0.1 * 20
从以上所述,我们可以从理论上将并发消耗控制在200以下.
与并发线程数流控制相比,消耗延迟流控制的优点是它的实现相对简单,它对消息队列RocketMQ客户端程序包的依赖性较小,并且不需要客户端提供动态调整控制并发线程接口的数量.
如果使用上述流控制方法在分布式体系结构中实现全局动态控制,则可以通过配置中心来发布流控制参数来实现.
以下详细介绍如何基于配置中心实施异步消息使用的全局动态流控制. 该示例使用阿里云的消息队列RocketMQ(消息队列)和ACM(应用程序配置管理)产品,语言是Java.

请注意,以消息队列RocketMQ为例,因为: 到目前为止,消息队列RocketMQ Consumer Client SDK当前不支持动态调整现成并发数以及动态调整消耗量的方法通过ACM的延迟可以解决RocketMQ消费者队列消息流动力学控制中的消息队列.
如图所示,管理员或应用程序通过ACM控制台发布消耗延迟配置(RCV_INTERVAL_TIME),所有消息队列RocketMQ使用者程序都订阅该配置. 从理论上讲,此配置可以在从发布到交付给所有客户端的1秒钟内完成(取决于网络延迟).

以下各章提供了一些代码示例,这些代码示例基于配置中心实现异步消息使用的全局动态流控制. 有关SDK的详细介绍,请参阅消息队列RocketMQ和官方产品文档.
在ACM上创建消耗延迟参数.


设置消耗接收延迟的全局变量.
// 初始化消息接收延时参数,单位为millisecond
static int RCV_INTERVAL_TIME = 10000;
// 初始化配置服务,控制台通过示例代码自动获取下面参数
ConfigService.init("acm.aliyun.com", /*租户ID*/"xxx", /*AK*/"xxx", /*SK*/"yyy");
// 主动获取配置
String content = ConfigService.getConfig("app.mq.qos", "DEFAULT_GROUP", 6000);
Properties p = new Properties();
try {
p.load(new StringReader(content));
RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));
} catch (IOException e) {
e.printStackTrace();
}
设置ACM侦听器,以确保在修改配置后立即更新RCV_INTERVAL_TIME参数.
// 初始化的时候,给配置添加,配置变更会回调通知
ConfigService.addListener("app.mq.qos", "DEFAULT_GROUP", new ConfigChangeListener() {
public void receiveConfigInfo(String configInfo) {
Properties p = new Properties();
try {
p.load(new StringReader(configInfo));
RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));
} catch (IOException e) {
e.printStackTrace();
}
}
});

完整的示例如下.
//以下代码可直接贴在Main()函数里
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, "CID_consumer_group");
properties.put(PropertyKeyConst.AccessKey,"xxx");
properties.put(PropertyKeyConst.SecretKey, "yyy");
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置 TCP 接入域名(此处以公共云生产环境为例)
properties.put(PropertyKeyConst.ONSAddr,
"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(/*Topic*/"topic-name", /*Tag*/null, new MessageListener()
{
public Action consume(Message message, ConsumeContext context) {
//消息队列 RocketMQ Subscribe QoS logical start,
// Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.
// Within each cycle, the thread will check RCV_INTERVAL_TIME in case it's set to a smaller value.
// RCV_INTERVAL_TIME <= 0 means no sleeping.
int rcvIntervalTimeLeft = RCV_INTERVAL_TIME;
while (rcvIntervalTimeLeft > 0) {
if (rcvIntervalTimeLeft > RCV_INTERVAL_TIME) {
rcvIntervalTimeLeft = RCV_INTERVAL_TIME;
}
try {
if (rcvIntervalTimeLeft >= 100) {
rcvIntervalTimeLeft -= 100;
Thread.sleep(100);
} else {
Thread.sleep(rcvIntervalTimeLeft);
rcvIntervalTimeLeft = 0;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消息队列 RocketMQ Subscribe interval logical ends
System.out.println("Receive: " + message);
/*
* Put your business logic here.
*/
doSomething();
return Action.CommitMessage;
}
});
consumer.start();
如果队列中有无限数量的消息消息队列 使用场景,则单台计算机将运行“消费者”以进行消费,该计算机分为三个部分,分别运行约5分钟. 通过ACM配置推送,可以达到以下效果.
在单个消息队列RocketMQ消费者服务处理花费大约100毫秒的情况下,在一台计算机上20个并发线程的测试结果如下.
可以从以上结果得出: 消耗与tpm成反比,应用程序在整个过程中都不会中断,并且流控制推送结果在几秒钟内对分布式集群有效. 与预期结果一致. 独立的性能结果如下所示.
本文来自电脑杂谈,转载请注明本文网址:
http://www.pc-fly.com/a/jisuanjixue/article-207613-1.html
来一次撞一次
我看到你心情就变的棒棒的了
如在生产
就一句话