1、RocketMQ简介

  • RocketMQ 是阿里巴巴开源的分布式消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。

  • RocketMQ是低延迟、高并发、高可用、高可靠的分布式消息中间件。该产品最初由阿里巴巴自研并捐赠给 Apache 基金会,目前是由Apache基金会维护。

2、RocketMQ功能

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

  • Name Server:充当路由消息的提供者。是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。

  • Broker:消息中转角色,负责存储消息,转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它是不能挂的,所以需要保证broker的高可用。

  • 生产者:与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。

  • 消费者:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

  • Topic:表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息… 一条消息必须有一个Topic。

  • Queue:主题被划分为一个或多个子主题,称为“message queues”。一个topic下,我们可以设置多个queue(消息队列)。当我们发送消息时,需要要指定该消息的topic。RocketMQ会轮询该topic下的所有队列,将消息发送出去。

  • Tags:Tags是Topic下的次级消息类型/二级类型(注:Tags也支持TagA || TagB这样的表达式),可以在同一个Topic下基于Tags进行消息过滤。Tags的过滤需要经过两次比对,首先会在Broker端通过Tag hashcode进行一次比对过滤,匹配成功传到consumer端后再对具体Tags进行比对,以防止Tag hashcode重复的情况。

3、RocketMQ下载及安装

1)、先决条件

  • 建议使用Linux / Unix / Mac;
  • 64位JDK 1.8+;
  • Maven 3.2.x+;
  • Git;
  • 适用于Broker服务器的4g+可用磁盘

附:Maven安装及配置方法
下载安装包:apache-maven-3.6.0-bin.tar.gz,并解压至任意目录,配置环境变量。

> vi /etc/profie
export M2_HOME={{maven解压目录}}/apache-maven-3.6.0-bin
export PATH=$M2_HOME/bin:$PATH
> source /etc/profile

Maven配置阿里云镜像源,比国外镜像源速度快N倍。
在maven安装目录找到conf下的setting.xml文件,在中添加以下镜像!

	<mirror>
		<id>nexus-aliyun</id>
		<mirrorOf>*</mirrorOf>
		<name>Nexus aliyun</name>
		<url>http://maven.aliyun.com/nexus/content/groups/public</url>
	</mirror>

如下图所示:
image.png

2)、RocketMQ下载及构建

点击此处下载RocketMQ4.6.0源码发行版,并进行构建工作,如需要下载其他版本可使用以下网址:http://rocketmq.apache.org/dowloading/releases/

> unzip rocketmq-all-4.6.0-source-release.zip
> cd rocketmq-all-4.6.0/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.6.0

3)、启动Name Server

> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log

4)、启动Broker

> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log

5)、启动脚本的内存调整

启动脚本可以配置RocketMQ整体内存大小,有两个启动脚本,runbroker.sh与runserver.sh,以及tools.sh脚本。

将默认配置:

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g

修改为:

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m

注:以上内存配置可以根据机器实际情况进行调整,且runbroker.sh与runserver.sh要同时修改。

6)、发送和接收消息

在发送/接收消息之前,我们需要告诉客户端名称服务器的位置。RocketMQ提供了多种方法来实现这一目标。为简单起见,我们使用环境变量 NAMESRV_ADDR

 > export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

7)、关闭服务器

> sh bin/mqshutdown broker     ###关闭Broker
The mqbroker(1809) is running...
Send shutdown request to mqbroker(1809) OK
> sh bin/mqshutdown namesrv   ###关闭Name Server
The mqnamesrv(1730) is running...
Send shutdown request to mqnamesrv(1730) OK

8)、常用命令

除了上面几个命令之外,还有如下常用的命令,ip请以实际为准:

  • 查看集群情况: ./mqadmin clusterList -n 127.0.0.1:9876
  • 查看 broker 状态: ./mqadmin brokerStatus -n 127.0.0.1:9876 -b 172.20.1.138:10911
  • 查看 topic 列表: ./mqadmin topicList -n 127.0.0.1:9876
  • 查看 topic 状态: ./mqadmin topicStatus -n 127.0.0.1:9876 -t MyTopic (换成想查询的 topic)
  • 查看 topic 路由: ./mqadmin topicRoute -n 127.0.0.1:9876 -t MyTopic

9)、执行mqadmin命令出现报错

如执行./mqadmin topicList -n localhost:9876 时出现如下错误:

[root@localhost bin]# ./mqadmin topicList -n localhost:9876
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
org.apache.rocketmq.tools.command.SubCommandException: TopicListSubCommand command failed
        at org.apache.rocketmq.tools.command.topic.TopicListSubCommand.execute(TopicListSubCommand.java:113)
        at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:139)
        at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:90)
Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=[10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available
        at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:84)
        at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:73)
        at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:68)
        at org.apache.rocketmq.acl.common.AclUtils.calSignature(AclUtils.java:69)
        at org.apache.rocketmq.acl.common.AclClientRPCHook.doBeforeRequest(AclClientRPCHook.java:44)
        at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.doBeforeRpcHooks(NettyRemotingAbstract.java:172)
        at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:368)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicListFromNameServer(MQClientAPIImpl.java:1396)
        at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.fetchAllTopicList(DefaultMQAdminExtImpl.java:253)
        at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.fetchAllTopicList(DefaultMQAdminExt.java:218)
        at org.apache.rocketmq.tools.command.topic.TopicListSubCommand.execute(TopicListSubCommand.java:107)
        ... 2 more
Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available
        at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:63)
        at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:79)
        ... 12 more
Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available
        at javax.crypto.Mac.getInstance(Mac.java:181)
        at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:57)
        ... 13 more

解决方法是:
在vi /data/rocketmq-all-4.6.0/bin/tools.sh,在JAVA_OPT配置中,在-Djava.ext.dirs这一行的后面添加jdk的ext的路径/usr/java/jdk1.8.0_221-amd64/jre/lib/ext。

将原配置:

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

修改为:

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext:/usr/java/jdk1.8.0_221-amd64/jre/lib/ext"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

添加以后建议重启namesrv与broker服务之后,再执行mqadmin命令就没有报错信息了。

4、RocketMQ可视化平台
1)、Github下载rocketmq-externals项目
https://github.com/apache/rocketmq-externals

2)、配置
解压后进入rocketmq-console文件夹,进入到rocketmq-console/src/main/resources/下,打开application.properties文件,进行配置的修改;

rocketmq.config.namesrvAddr=ip:port   ###如有多个nameserver分号相隔

rocketmq.config.isVIPChannel=false

rocketmq.config.dataPath=/data/rocketmq-console/data   ###指定rocketmq数据存储路径

image.png

3)、编译及运行
进入/rocketmq-externals/rocketmq-console目录,执行编译打包命令:

mvn clean package -Dmaven.test.skip=true

然后cd target/目录,执行启动命令:

nohup java -jar rocketmq-console-ng-1.0.1.jar &

输入:http://ip:8080/ 访问进入rocketmq-externals控制台,至此所有配置已完成。
image.png

上一篇 下一篇