队列的发送和接收相对于topic更简单一些,就只拿topic做简单示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://47.105.172.68:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("MyTopic1"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); TextMessage textMessage = session.createTextMessage("topic message111"); producer.send(textMessage); session.commit(); session.close(); connection.close();
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://47.105.172.68:61616"); Connection connection = connectionFactory.createConnection(); connection.setClientID("c1"); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MyTopic1"); TopicSubscriber subscriber = session.createDurableSubscriber(topic, "t1");
connection.start(); Message message = subscriber.receive(); while (message != null) { TextMessage textMessage = (TextMessage)message; System.out.println("接收到的消息:" + textMessage.getText()); message = subscriber.receive(1000L); }
session.commit(); session.close(); connection.close();
|
以上是一个对于持久订阅的topic的示例,需要注意的是,订阅者要先执行一次之后才能收到消费者处于非激活状态下的消息,因为消费者必须要告知JMS Provider自己的ID之后,消息中间件才能保存在这个客户端离线的时候,所有发送到客户端订阅的topic上的消息,并在该客户端上线之后将这些消息发给它
在向MyTopic1发送之后,在管理页面可以看到,topic中有一条消息

在Subscribiers中有一个订阅者,并且处于离线状态

当c1上线后就可以消费这条消息,但是出队的消息数量不会增加,因为其他的订阅者也可以订阅这个topic,保证同一个订阅者不会收到相同的消息就可以了
对于持久性消息而言,可靠性是优先考虑的因素,确保了消息服务在向消费者传送消息之前不会丢失,如果消息服务由于某种原因失败了,它会恢复消息并将此消息发送给消费者,这样增加了网络开销,但确保了可靠性