JMS-API-消息发布与接收

​ 队列的发送和接收相对于topic更简单一些,就只拿topic做简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**************************创建发布者**********************************/
// 创建连接工厂
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://47.105.172.68:61616");
// 获取连接
Connection connection = connectionFactory.createConnection();
// 创建session会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建topic
Destination destination = session.createTopic("MyTopic1");
// 创建生产者
MessageProducer producer = session.createProducer(destination);
// 设置消息持久化,也可以在send方法中为每一个消息设置投递模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 持久订阅,需要在配置后启动
connection.start();
// 发送消息
TextMessage textMessage = session.createTextMessage("topic message111");
producer.send(textMessage);
session.commit();
session.close();
connection.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**************************创建订阅者**********************************/
// 一定要先运行一次,等于向中间件注册这个消费者
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://47.105.172.68:61616");
Connection connection = connectionFactory.createConnection();
// 设置客户端ID, 用来识别消费者
connection.setClientID("c1");
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("MyTopic1");
// 创建持久订阅
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "t1");
// 设置好之后,再start
connection.start();
// 接收消息
Message message = subscriber.receive();
while (message != null) {
TextMessage textMessage = (TextMessage)message;
System.out.println("接收到的消息:" + textMessage.getText());
message = subscriber.receive(1000L);
}

session.commit();
session.close();
connection.close();

​ 以上是一个对于持久订阅的topic的示例,需要注意的是,订阅者要先执行一次之后才能收到消费者处于非激活状态下的消息,因为消费者必须要告知JMS Provider自己的ID之后,消息中间件才能保存在这个客户端离线的时候,所有发送到客户端订阅的topic上的消息,并在该客户端上线之后将这些消息发给它

​ 在向MyTopic1发送之后,在管理页面可以看到,topic中有一条消息

​ 在Subscribiers中有一个订阅者,并且处于离线状态

​ 当c1上线后就可以消费这条消息,但是出队的消息数量不会增加,因为其他的订阅者也可以订阅这个topic,保证同一个订阅者不会收到相同的消息就可以了

​ 对于持久性消息而言,可靠性是优先考虑的因素,确保了消息服务在向消费者传送消息之前不会丢失,如果消息服务由于某种原因失败了,它会恢复消息并将此消息发送给消费者,这样增加了网络开销,但确保了可靠性