brief introduction

In the previous article, I discussed several ways of application system integration , It is found that the integration scheme for message queuing is a reasonable choice . here , Let's start with a specific message queue Activemq The basic way of communication is discussed .activemq yes JMS An implementation of the message communication specification . in general , The most common message communication modes defined in the message specification are publishing - subscribe 、 Point to point . in addition , By combining the specific application of these patterns , When we deal with some application scenarios, we also derive a request response mode . below , Let's discuss these ways one by one .

Based on the process

When we talk about specific ways , Let's take a look at using activemq The main process that needs to start the service .

according to JMS The specification of , We need to get one first JMS connection factory., Through this connection factory To create connection. On this basis, we can create session, destination, producer and consumer. So the main steps are as follows :

1. get JMS connection factory. By providing the connection information of specific environment to construct factory.

2. utilize factory structure JMS connection

3. start-up connection

4. adopt connection establish JMS session.

5. Appoint JMS destination.

6. establish JMS producer Or create JMS message And provide destination.

7. establish JMS consumer Or register JMS message listener.

8. Send and receive JMS message.

9. Close all JMS resources , Include connection, session, producer, consumer etc. .

publish-subscribe

The publish subscribe model is a bit similar to subscribing to newspapers in our daily life . At the end of the year , The post office will send out a collection of newspapers for us to choose which one to subscribe to . All the newspapers published are listed in this table , So for each of our subscribers , We can choose one or more newspapers . For example, Beijing Daily 、 Xiaoxiang Morning Post, etc . So these are the newspapers we subscribe to , It's equivalent to the publish subscribe model topic. A lot of people subscribe to newspapers , Someone might have subscribed to the same newspaper as me . that , ad locum , It's like we're in the same topic It's registered in . For a newspaper publisher , It and all the subscribers make up a 1 Relationship to many . This relationship is shown in the figure below :

p2p

p2p It's easier to understand . It's like two people on the phone , These two people are exclusive to this communication link . One side sends a message , The other party receives , It's that simple . In practical application, because there are many users using p2p The link to , Its communication scenario is shown in the figure below :

stay p2p In the scene of , The two sides who communicate with each other communicate through a way similar to queue . And the front pub-sub The difference is that topic There is one sender and multiple receivers , And in the p2p One of them queue There is only one sender and one receiver .

The code implementation of these two communication modes has a lot in common , Let's use an example to realize these two communication methods :

sender

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.StringTokenizer; import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Publisher {
public static final String url = "tcp://localhost:61616";// Default port , If you want to change , Can be found in apache-activemq-5.13.3\conf Medium activemq.xml Change the port number in
ConnectionFactory factory;
Connection connection;
Session session;
MessageProducer producer;
Destination[] destinations;
ComunicateMode comunicateMode = ComunicateMode.pubsub; enum ComunicateMode {
p2p, pubsub
} public Publisher(ComunicateMode mode) throws JMSException {
this.comunicateMode = mode;
factory = new ActiveMQConnectionFactory(url);// there url Or not ,java The code assigns the port to... By default 61616
connection = factory.createConnection();
try {
connection.start();
} catch (JMSException e) {
connection.close();
throw e;
}
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
} protected void setDestinations(String[] stocks) throws JMSException {
destinations = new Destination[stocks.length];
for (int i = 0; i < stocks.length; i++) {
destinations[i] = comunicateMode == ComunicateMode.pubsub ? session
.createTopic("Topic." + stocks[i]) : session
.createQueue("Queue." + stocks[i]);
}
} protected void sendMessage(String msg) throws JMSException {
for (Destination item : destinations) {
TextMessage msgMessage = session.createTextMessage(msg);
producer.send(item, msgMessage);
System.out.println(String.format(" Successful direction Topic For 【%s】 Send a message 【%s】",
item.toString(), msgMessage.getText()));
}
} protected void close() throws JMSException {
if (connection != null)
connection.close();
} public static void main(String[] args) throws JMSException,
InterruptedException, IOException {
Publisher publisher = new Publisher(ComunicateMode.p2p);// Here you can change the message transmission mode to pubsub
publisher.setDestinations(new String[] { "1", "2", "3" });
BufferedReader reader = null;
String contentString = "";
do {
System.out.println(" Please enter what you want to send (exit sign out ):");
reader = new BufferedReader(new InputStreamReader(System.in));
contentString = reader.readLine();
if (contentString.equals("exit"))
break;
publisher.sendMessage(contentString);
} while (!contentString.equals("exit"));
reader.close();
publisher.close();
}
}
 

The receiver

import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer {
public static final String url = "tcp://localhost:61616";// Default port , If you want to change , Can be found in apache-activemq-5.13.3\conf Medium activemq.xml Change the port number in
ConnectionFactory factory;
Connection connection;
Session session;
MessageConsumer[] consumers;
ComunicateMode comunicateMode = ComunicateMode.pubsub; enum ComunicateMode {
p2p, pubsub
} public Consumer(ComunicateMode mode, String[] destinationNames)
throws JMSException {
this.comunicateMode = mode;
factory = new ActiveMQConnectionFactory(url);// there url Or not ,java The code assigns the port to... By default 61616
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumers = new MessageConsumer[destinationNames.length];
for (int i = 0; i < destinationNames.length; i++) {
Destination destination = comunicateMode == ComunicateMode.pubsub ? session
.createTopic("Topic." + destinationNames[i]) : session
.createQueue("Queue." + destinationNames[i]);
consumers[i] = session.createConsumer(destination);
consumers[i].setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println(String.format(" Received a message 【%s】",
((TextMessage) message).getText()));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
} public void close() throws JMSException {
if (connection != null)
connection.close();
} public static void main(String[] args) throws JMSException, IOException {
Consumer consumer = new Consumer(ComunicateMode.p2p,
new String[] { "2" });// Here you can change the message transmission mode to pubsub
System.in.read();
consumer.close();
} }
 

request-response

Compared with the previous two ways ,request-response It's a common way to communicate , But it's not a default mode . In the previous two modes, one is responsible for sending messages and the other is responsible for processing . And many of our practical applications are equivalent to a one response one answer process , Both parties need to be able to send messages to each other . So I asked - It's also important to respond to this kind of communication . It's also widely used .

request - The answer is not JMS A communication mode provided by the specification system by default , It is realized by using a little skill on the basis of the existing communication mode . Here's a typical request - The interactive process of response mode :

stay JMS Inside , If you want to implement the request / The way to respond , You can use JMSReplyTo and JMSCorrelationID Message header to associate the two sides of the communication . in addition ,QueueRequestor and TopicRequestor Be able to support simple requests / Response process .

Now , If we want to achieve such a process , When sending a request message and waiting for the result to be returned client The flow of the end is as follows :

client The client creates a temporary queue and specifies in the sent message the destination as well as correlationID. So what's going on with the message server After receiving this message, the client will know who to send it to .Server The general flow of the end is as follows :

Here we use server End registration MessageListener, By setting the return information CorrelationID and JMSReplyTo Return the information to .

The above is the general program structure of sending and receiving messages . The specific implementation code is as follows :

Client:

The code here not only initializes the parameters in the constructor, but also sets two destination, One is to send a message destination, stay session.createProducer(adminQueue); This sentence sets . The other is the message you want to receive destination, adopt Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); These two sentences specify the destination to receive the message . Here's a temporary queue to use . After specifying the communication queue for the return message , We need to inform server The end knows which queue to send the returned message to . therefore txtMessage.setJMSReplyTo(tempDest); This part is specified , meanwhile txtMessage.setJMSCorrelationID(correlationId); The main purpose of this method is to ensure that every time a request is sent back server The client can know which request it is . Here, a request and a response are equivalent to the same sequence number .

meanwhile , because client After sending a message, the end has to receive server The message returned by the client , So it also implements a message receiver The function of . Here is the implementation MessageListener How to interface :

public void onMessage(Message message) {
String messageText = null;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
messageText = textMessage.getText();
System.out.println("messageText = " + messageText);
}
} catch (JMSException e) {
//Handle the exception appropriately
}
}

Server:

here server The process and client The ends are opposite , It receives messages first , After receiving the message, according to the JMSCorelationID To send the returned message :

front , stay replyProducer.send() In the method ,message.getJMSReplyTo() You get the message to send back destination.

in addition , Set these to send back information replyProducer This information is mainly implemented in constructor related methods :

On the whole , The whole interaction process is not complicated , It's just complicated . For the request / In terms of the way you respond , The typical interaction process is Client The end is set to send the request normally Queue Also set a temporary Queue. At the same time message It specifies the message to be returned destination as well as CorelationID, It's like the receipt in a letter . According to this information, people will know how to reply to your letter . about Server On the other hand, we need to create an extra producer, Reuse... In the way you process received messages producer Send the message back . This series of processes looks like http The agreement asks for - The way to respond , It's all question and answer .

Some applications and improvements

Looking back at the three basic ways of communication , We will find that , They all have something in common , For example, all of them need to be initialized ConnectionFactory, Connection, Session etc. . Close these resources after using them . If every communication end that implements it writes like this , It's a simple repetition . It's totally unnecessary from an engineering point of view . that , What can we do to reduce this repetition ?

A simple way is to encapsulate the creation and destruction of these objects through factory methods , Then simply get them by calling factory methods . in addition , Since the basic process is to create resources at the beginning and destroy them at the end , We can also use Template Method Mode of thinking . By inheriting an abstract class , Encapsulation of resources is provided in abstract classes . All inherited classes only need to implement the method of how to use these resources .Spring In the middle of the JMSTemplate This kind of encapsulation of similar ideas is provided . Specific implementation can refer to This article .

summary

activemq By default pub-sub, p2p These two ways of communication . At the same time, it also provides some suggestions for request-response The support of the way . actually , not only activemq, For all other implementations JMS Standard products can provide similar functions . Every way here is not too complicated , The main thing is that the steps of creating and managing resources are cumbersome .

activemq The use of the two basic ways of communication and summary of more related articles

  1. activemq Summary of several basic communication methods

    brief introduction In the previous article, I discussed several ways of application system integration , It is found that the integration scheme for message queuing is a reasonable choice . here , Let's start with a specific message queue Activemq The basic way of communication is discussed .activem ...

  2. SpringBoot Use JMS(activeMQ) Two ways Queue messages 、 subscribe / Release

    Just recently, my colleague asked me activemq I've just come into contact with you, so I can't tell you , It happened to be useful in the project some time ago , So I sort it out a little bit , For use only 1. download ActiveMQ Address :http://activemq.apache.org/ ...

  3. ActiveMQ The two message modes of , The theme 、 queue

    1. The development process is as follows : 2. Queue mode Queue If producers produce 100 Bar message , So if the two consumptions are at the same time , Will work together to take over this 100 Bar message . That's what every consumer receives 50 I'll deal with it . 3. Theme mode topic If ...

  4. activemq Three ways of communication

    One . Installation and startup 1. Download and install activemq, Download address :http://activemq.apache.org/download.html. 2. After installation , Go to the directory where it's located bin Below directory , According to the number of system bits ...

  5. ActiveMQ Two ways of messaging

    1. What is? ActiveMQ? ActiveMQ yes apache Open source , An intermediate plug-in for message delivery , You can talk to spring Integrate , Is the most popular open source message bus ,ActiveMQ Is a full support JMS1.1 and J ...

  6. Message queue : Quick start ActiveMQ Message queue JMS Way to use ( Two modes :Topic and Queue Message push and subscription )

    1. Realization function Hope to use a set of API, Realize message sending and receiving function in two modes , Convenient for business program call 1. send out Topic 2. send out Queue 3. receive Topic 4. receive Queue 2. Interface design Design public key according to function ...

  7. turn :nginx and php-fpm There are two ways to communicate with each other

    Original address :https://segmentfault.com/q/1010000004854045 Nginx and PHP-FPM There are two ways to communicate between processes , One is TCP, One is UNIX Domain Sock ...

  8. ActiveMQ Two modes PTP and PUB/SUB&lt; turn &gt;

    1.PTP Model PTP(Point-to-Point) The model is based on queues (Queue) Of , about PTP For the message model , Its message destination is a message queue (Queue), Every time a message producer sends a message, it always puts it into the message queue , ...

  9. php-fpm and nginx There are two ways to communicate with each other

    stay linux in ,nginx The server and php-fpm Can pass tcp socket and unix socket Two ways to achieve . Let's move on to :https://blog.csdn.net/qq62 ...

Random recommendation

  1. Auty Automated testing framework Part 6 —— Garbage collection 、 add to suite Support

    [ This article is from the blog Garden of tianwaiguiyun ] Garbage collection Add script recovery mechanism , Because the framework generates some code automatically , If there is a problem after the code is generated and the code is not deleted normally , It will cause code garbage , stay auty Catalog add recovery.py file : ...

  2. 【Android Rookie learning 】 Environmental construction - modify AVD Path

    change avd The default path

  3. sql server Use sql Sentence upload Excel To the database

    USE pro GO SELECT  * INTO   temp_budget_price@201704170950 FROM    OPENDATASOURCE('Microsoft.Jet.OLE ...

  4. Jmeter The interface test + Pressure test

    Link recommendation :https://blog.csdn.net/github_27109687/article/details/71968662

  5. MySQL Advanced knowledge ( Twelve )—— Global query log

    Preface : The global query log is used to save all sql Execution records , This function is mainly used in test environment , Never turn it on in a production environment . 1. How to open #1. adopt my.cnf Configure to enable this function . notes : Yes my.cnf After the file is configured , Need to restart my ...

  6. zombodb low-level api operation

    zombodb low-level api Allow direct access from zombodb Index insert.delete file , At the same time mvcc Characteristics of , But the data is not stored in pg in , But there are also data risks , We need to pay attention to es ...

  7. Python local error

    Originally in python A variable with the same global name as a function of , If you change the value of a variable, it becomes a local variable , Before modification, the reference to the variable will naturally have the error of undefined , If you are sure to refer to global variables , And to modify it , Have to add global Turn off ...

  8. 20-Python3 data structure

    2018-11-30 15:45:55 ''' list ''' # list.count(x): return x The number of times in the list a1 = [1,123.25,333,333,456.5] print(a1. ...

  9. spring Aop Concept

    Section oriented programming (AOP) Make up for object-oriented programming by providing another way to think about program structure (OOP) Deficiency . stay OOP The key unit of modularization in is class (classes), And in the AOP The modular unit in is the section . Facets can modularize concerns ...

  10. DOS Save system logs

    1. Export system log , Name it after the current date @echo offset nowDate=%date%set tmp=%nowDate:~0,-3%set file=%tmp:/=-% system log .evtx echo ...