MQ stands for message queue. Many message queues are available today, including RabbitMQ and Kafka. Each has its own characteristics, and the right choice depends on the requirements of the project.
ZeroMQ is also written ZMQ or 0MQ. Its core engine is written in C++. It is a lightweight messaging library: a distinctive piece of messaging middleware built by extending the traditional standard socket interface.
ZMQ provides an asynchronous message queue abstraction, supports several messaging patterns, can filter messages, and works seamlessly over a variety of transport protocols.
In short, with plain sockets you have to explicitly establish and tear down connections, choose a protocol (TCP/UDP) and handle errors. ZMQ hides these details and makes network programming easier.
ZeroMQ forms a new layer in network communication, sitting between the application layer and the transport layer (in terms of the TCP/IP model). It is a scalable layer that can run in parallel, spread across distributed systems.
In summary, here is the hundred-word description of ZeroMQ:
ZMQ (ØMQ, ZeroMQ, 0MQ) looks like an embeddable networking library but acts like a concurrency framework. It provides sockets that carry messages across various transports such as in-process, inter-process, TCP and multicast. You can connect sockets many-to-many with patterns such as fan-out, publish-subscribe, task distribution and request-reply. ZMQ is fast enough to serve as the fabric for clustered products. Its asynchronous I/O model lets you build multi-core applications as asynchronous message-processing tasks. ZMQ has bindings for many languages and runs on almost all operating systems.
ZMQ comes from iMatix and is released under the LGPLv3 open source license.
ZMQ offers several messaging patterns, including request/reply, publish/subscribe and push/pull.
1. Request/reply
Connects a set of clients to a set of servers, for remote procedure calls or task distribution.
Here we implement a simple hello world program with the request/reply pattern.
As with ordinary socket communication, we need a client and a server. The client sends "Hello", and the server replies with "World" after receiving it, as shown in the figure.
Server code:
    //
    //  Hello World server in C++
    //  Binds REP socket to tcp://*:5555
    //  Expects "Hello" from client, replies with "World"
    //
    #include <zmq.hpp>
    #include <cstring>
    #include <string>
    #include <iostream>
    #ifndef _WIN32
    #include <unistd.h>
    #else
    #include <windows.h>
    #define sleep(n)    Sleep((n) * 1000)   //  Sleep() takes milliseconds
    #endif

    int main () {
        //  Prepare our context and socket
        zmq::context_t context (1);
        zmq::socket_t socket (context, ZMQ_REP);
        socket.bind ("tcp://*:5555");

        while (true) {
            zmq::message_t request;

            //  Wait for next request from client
            socket.recv (&request);
            std::cout << "Received Hello" << std::endl;

            //  Do some 'work'
            sleep (1);

            //  Send reply back to client
            zmq::message_t reply (5);
            memcpy (reply.data (), "World", 5);
            socket.send (reply);
        }
        return 0;
    }
Client code:
    //
    //  Hello World client in C++
    //  Connects REQ socket to tcp://localhost:5555
    //  Sends "Hello" to server, expects "World" back
    //
    #include <zmq.hpp>
    #include <cstring>
    #include <string>
    #include <iostream>

    int main ()
    {
        //  Prepare our context and socket
        zmq::context_t context (1);
        zmq::socket_t socket (context, ZMQ_REQ);

        std::cout << "Connecting to hello world server..." << std::endl;
        socket.connect ("tcp://localhost:5555");

        //  Do 10 requests, waiting each time for a response
        for (int request_nbr = 0; request_nbr != 10; request_nbr++) {
            zmq::message_t request (5);
            memcpy (request.data (), "Hello", 5);
            std::cout << "Sending Hello " << request_nbr << "..." << std::endl;
            socket.send (request);

            //  Get the reply.
            zmq::message_t reply;
            socket.recv (&reply);
            std::cout << "Received World " << request_nbr << std::endl;
        }
        return 0;
    }
REQ-REP sockets must send and receive in a fixed order. The client first calls zmq_send() to send a request and then zmq_recv() to receive the reply, and repeats this cycle. If the order is broken (for example, by sending twice in a row), the call returns an error. Likewise, the server must receive first and then send. (This is like HTTP request/response: one response for each request.)
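The lockstep rule can be pictured as a two-state machine. The sketch below is a standard-library-only model, not ZeroMQ code: the class name `ReqSocketModel` and its methods are hypothetical, and it only mimics how a REQ socket rejects an out-of-order call (libzmq reports that condition with the `EFSM` error).

```cpp
#include <string>

// Hypothetical model of a REQ socket's send/receive lockstep.
// It is not part of ZeroMQ; it only illustrates the ordering rule.
class ReqSocketModel {
public:
    // Returns true if sending is allowed in the current state.
    bool send(const std::string& /*request*/) {
        if (awaiting_reply_) return false;  // two sends in a row: rejected
        awaiting_reply_ = true;
        return true;
    }

    // Returns true if receiving is allowed in the current state.
    bool recv() {
        if (!awaiting_reply_) return false;  // no request is outstanding
        awaiting_reply_ = false;
        return true;
    }

private:
    bool awaiting_reply_ = false;  // true between a send and its reply
};
```

A REP socket follows the mirror image of this rule: it starts in the "must receive" state and may send only after a request has arrived.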
In theory, you can connect thousands of clients to this server at the same time and the program will still work well. You can also try starting the client first and the server afterwards; the program still works normally.
As for strings, note that ZMQ does not care about the content of a message; it only knows how many bytes it contains. This means a ZMQ string carries a length and is transmitted without a terminator. When receiving it in C, allocate one byte more than the length and set the terminating '\0'; otherwise you may get strange results when printing the string.
The following example shows how to handle strings in C:
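    #include <stdlib.h>
    #include <string.h>
    #include <zmq.h>

    //  Receive a string from a ZMQ socket and convert it to a C string.
    //  The caller must free() the returned string.
    static char *
    s_recv (void *socket) {
        zmq_msg_t message;
        zmq_msg_init (&message);
        //  ZeroMQ 3.x and later; the 2.x call was zmq_recv (socket, &message, 0)
        if (zmq_msg_recv (&message, socket, 0) == -1) {
            zmq_msg_close (&message);
            return NULL;
        }
        size_t size = zmq_msg_size (&message);
        char *string = malloc (size + 1);   //  one extra byte for the terminator
        memcpy (string, zmq_msg_data (&message), size);
        zmq_msg_close (&message);
        string [size] = 0;
        return (string);
    }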
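In C++ the same conversion needs no manual allocation, because `std::string` can be constructed from a pointer and a length. This is a minimal sketch, assuming the payload is available as a raw buffer plus its size (with cppzmq these would typically come from `message.data()` and `message.size()`); the helper name `to_string_copy` is hypothetical.

```cpp
#include <cstddef>
#include <string>

// Builds an owning std::string from a length-delimited payload.
// The payload carries no terminator, so the size is passed explicitly.
std::string to_string_copy(const void* data, std::size_t size) {
    return std::string(static_cast<const char*>(data), size);
}
```

The returned string owns its own copy of the bytes, so it remains valid after the message has been closed.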
2. Publish/subscribe
Connects a set of publishers to a set of subscribers, for data distribution.
The publish/subscribe pattern implements one-way data distribution: the server pushes events to a set of clients.
Let's look at an example that publishes weather updates consisting of a zip code, temperature and relative humidity. We generate random values to simulate what a weather station does.
Server code:
    //
    //  Weather update server in C++
    //  Binds PUB socket to tcp://*:5556
    //  Publishes random weather updates
    //
    //  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
    //
    #include <zmq.hpp>
    #include <stdio.h>
    #include <stdlib.h>
    #include <time.h>

    #if (defined (WIN32))
    #include <zhelpers.hpp>
    #endif

    #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

    int main () {
        //  Prepare our context and publisher
        zmq::context_t context (1);
        zmq::socket_t publisher (context, ZMQ_PUB);
        publisher.bind("tcp://*:5556");
        publisher.bind("ipc://weather.ipc");    //  Not usable on Windows.

        //  Initialize random number generator
        srandom ((unsigned) time (NULL));
        while (1) {
            int zipcode, temperature, relhumidity;

            //  Get values that will fool the boss
            zipcode     = within (100000);
            temperature = within (215) - 80;
            relhumidity = within (50) + 10;

            //  Send message to all subscribers
            zmq::message_t message(20);
            snprintf ((char *) message.data(), 20,
                "%05d %d %d", zipcode, temperature, relhumidity);
            publisher.send(message);
        }
        return 0;
    }
The server updates the data in an infinite loop and keeps publishing weather updates. Note that it does not care whether anyone receives the data; it just sends. If there are no subscribers, the messages are simply discarded.
Client code:
    //
    //  Weather update client in C++
    //  Connects SUB socket to tcp://localhost:5556
    //  Collects weather updates and finds avg temp in zipcode
    //
    //  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
    //
    #include <zmq.hpp>
    #include <cstring>
    #include <iostream>
    #include <sstream>

    int main (int argc, char *argv[])
    {
        zmq::context_t context (1);

        //  Socket to talk to server
        std::cout << "Collecting updates from weather server...\n" << std::endl;
        zmq::socket_t subscriber (context, ZMQ_SUB);
        subscriber.connect("tcp://localhost:5556");

        //  Subscribe to zipcode, default is NYC, 10001
        const char *filter = (argc > 1)? argv [1]: "10001 ";
        subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));

        //  Process 100 updates
        int update_nbr;
        long total_temp = 0;
        for (update_nbr = 0; update_nbr < 100; update_nbr++) {
            zmq::message_t update;
            int zipcode, temperature, relhumidity;

            subscriber.recv(&update);

            std::istringstream iss(static_cast<char*>(update.data()));
            iss >> zipcode >> temperature >> relhumidity ;

            total_temp += temperature;
        }
        std::cout << "Average temperature for zipcode '" << filter
                  << "' was " << (int) (total_temp / update_nbr) << "F"
                  << std::endl;
        return 0;
    }
The client subscribes to whatever data it is interested in. It receives the publisher's messages and processes only those tagged with a specific zip code; for example, the zip code of New York is 10001.
Note that with a SUB socket you must set a subscription using the setsockopt() method. If you do not set any subscription, you will not receive any messages. A subscription can be any string, and you can set several of them; the SUB socket receives a message as long as it matches at least one subscription. A subscriber can also cancel a subscription to stop receiving certain messages, again through zmq_setsockopt() (with the ZMQ_UNSUBSCRIBE option).
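A subscription is matched against the beginning of each message (a prefix match), which is why the weather client's default filter `"10001 "` selects only updates for that zip code. The following sketch models this rule with the standard library only; `matches_subscription` and `accepts` are hypothetical helper names, not ZeroMQ functions.

```cpp
#include <string>
#include <vector>

// True if `message` starts with `filter`. An empty filter matches everything.
bool matches_subscription(const std::string& filter, const std::string& message) {
    return message.compare(0, filter.size(), filter) == 0;
}

// A message is delivered if it matches at least one subscription.
// With no subscriptions at all, nothing is delivered.
bool accepts(const std::vector<std::string>& filters, const std::string& message) {
    for (const std::string& f : filters) {
        if (matches_subscription(f, message)) return true;
    }
    return false;
}
```

The trailing space in `"10001 "` matters in this model: without it, a filter of `"1000"` would also match zip codes such as 10002 or 10009.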
The PUB-SUB socket pair is asynchronous. The client calls zmq_recv() in a loop to receive messages; trying to send a message on a SUB socket causes an error. Similarly, the server can call zmq_send() as often as it likes, but it cannot call zmq_recv() on a PUB socket.
One more thing to note about PUB-SUB sockets: you cannot know exactly when a SUB socket starts receiving messages. Even if you start the SUB socket first and only then start the PUB socket, the subscriber will still miss some messages, because establishing the connection takes time. Very few, but not zero.
How, then, can publishers and subscribers be synchronized so that the publisher starts sending only when subscribers are ready? A simple way is to have the PUB side wait for a while before sending. This is not recommended in real programs because it is fragile and hard to control.
The other approach is to treat the publisher's message stream as endless, so that losing some of the earliest messages does not matter. This is what our weather client does.
Notes:
- A subscriber can connect to several publishers; their messages then arrive interleaved (fair-queued);
- If a publisher has no connected subscribers, the messages it sends are simply discarded;
- If you use TCP and a subscriber is slow, messages queue up on the publisher. You can protect the publisher with a high-water mark (HWM).
- Starting from ZeroMQ v3.x, filtering happens at the publisher when a connected protocol (tcp:// or ipc://) is used, and at the subscriber when epgm:// is used. In ZeroMQ v2.x, all filtering happened at the subscriber: the publisher sent every message to every subscriber, and the subscriber discarded the messages it had not subscribed to.
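The high-water mark in the third note can be pictured as a bounded send queue. The sketch below is a standard-library-only model with hypothetical names (`BoundedOutbox`, `enqueue`, `dequeue`). It mimics the PUB socket's behaviour of dropping new messages for a subscriber once the limit is reached, and it is not how libzmq is implemented internally.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical model of a per-subscriber send queue with a high-water mark.
class BoundedOutbox {
public:
    explicit BoundedOutbox(std::size_t hwm) : hwm_(hwm) {}

    // Queues the message, or drops it when the queue already holds `hwm` items.
    // Returns true if the message was queued.
    bool enqueue(const std::string& msg) {
        if (queue_.size() >= hwm_) {
            ++dropped_;
            return false;
        }
        queue_.push_back(msg);
        return true;
    }

    // Removes the oldest message (the subscriber caught up by one).
    // Returns false if the queue was empty.
    bool dequeue(std::string& out) {
        if (queue_.empty()) return false;
        out = queue_.front();
        queue_.pop_front();
        return true;
    }

    std::size_t size() const { return queue_.size(); }
    std::size_t dropped() const { return dropped_; }

private:
    std::size_t hwm_;
    std::size_t dropped_ = 0;
    std::deque<std::string> queue_;
};
```

In a real program the limit is a socket option rather than a class of your own: `ZMQ_SNDHWM` in ZeroMQ 3.x and later, `ZMQ_HWM` in 2.x.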
3. Push/pull
Connects nodes in a fan-out/fan-in arrangement that can have multiple steps and loops, for building a parallel processing architecture.
ZMQ's parallel processing model can be used for a small supercomputing-style application:
- The task distributor generates a large number of tasks that can be computed in parallel
- A set of workers processes these tasks. In practice, workers may be spread over different machines and use GPUs (graphics processing units) for heavy computation
- The result collector receives the results from all workers at the end and aggregates them
This is ZMQ's push/pull pattern.
The following is the code of the task distributor, which generates 100 tasks. Each task tells the worker that receives it to sleep for a given number of milliseconds.
    //
    //  Task ventilator in C++
    //  Binds PUSH socket to tcp://localhost:5557
    //  Sends batch of tasks to workers via that socket
    //
    //  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
    //
    #include <zmq.hpp>
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <time.h>
    #include <unistd.h>
    #include <iostream>

    #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

    int main (int argc, char *argv[])
    {
        zmq::context_t context (1);

        //  Socket to send messages on
        zmq::socket_t sender(context, ZMQ_PUSH);
        sender.bind("tcp://*:5557");

        std::cout << "Press Enter when the workers are ready: " << std::endl;
        getchar ();
        std::cout << "Sending tasks to workers...\n" << std::endl;

        //  The first message is "0" and signals start of batch
        zmq::socket_t sink(context, ZMQ_PUSH);
        sink.connect("tcp://localhost:5558");
        zmq::message_t message(2);
        memcpy(message.data(), "0", 1);
        sink.send(message);

        //  Initialize random number generator
        srandom ((unsigned) time (NULL));

        //  Send 100 tasks
        int task_nbr;
        int total_msec = 0;     //  Total expected cost in msecs
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {
            int workload;
            //  Random workload from 1 to 100msecs
            workload = within (100) + 1;
            total_msec += workload;

            message.rebuild(10);
            memset(message.data(), '\0', 10);
            sprintf ((char *) message.data(), "%d", workload);
            sender.send(message);
        }
        std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;
        sleep (1);              //  Give 0MQ time to deliver

        return 0;
    }
The following is the code of the worker, which receives a task, sleeps for the specified number of milliseconds, and then sends a signal that the task is done:
    //
    //  Task worker in C++
    //  Connects PULL socket to tcp://localhost:5557
    //  Collects workloads from ventilator via that socket
    //  Connects PUSH socket to tcp://localhost:5558
    //  Sends results to sink via that socket
    //
    //  Olivier Chamoux <>
    //
    #include "zhelpers.hpp"
    #include <iostream>
    #include <sstream>
    #include <string>

    int main (int argc, char *argv[])
    {
        zmq::context_t context(1);

        //  Socket to receive messages on
        zmq::socket_t receiver(context, ZMQ_PULL);
        receiver.connect("tcp://localhost:5557");

        //  Socket to send messages to
        zmq::socket_t sender(context, ZMQ_PUSH);
        sender.connect("tcp://localhost:5558");

        //  Process tasks forever
        while (1) {
            zmq::message_t message;
            int workload;           //  Workload in msecs

            receiver.recv(&message);
            std::string smessage(static_cast<char*>(message.data()), message.size());

            std::istringstream iss(smessage);
            iss >> workload;

            //  Do the work (s_sleep comes from zhelpers.hpp)
            s_sleep(workload);

            //  Send results to sink (an empty message)
            message.rebuild();
            sender.send(message);

            //  Simple progress indicator for the viewer
            std::cout << "." << std::flush;
        }
        return 0;
    }
Here is the code for the result collector. It collects 100 results and measures the total elapsed time, so that we can check whether the tasks really ran in parallel.
    //
    //  Task sink in C++
    //  Binds PULL socket to tcp://localhost:5558
    //  Collects results from workers via that socket
    //
    //  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
    //
    #include <zmq.hpp>
    #include <time.h>
    #include <sys/time.h>
    #include <iostream>

    int main (int argc, char *argv[])
    {
        //  Prepare our context and socket
        zmq::context_t context(1);
        zmq::socket_t receiver(context,ZMQ_PULL);
        receiver.bind("tcp://*:5558");

        //  Wait for start of batch
        zmq::message_t message;
        receiver.recv(&message);

        //  Start our clock now
        struct timeval tstart;
        gettimeofday (&tstart, NULL);

        //  Process 100 confirmations
        int task_nbr;
        int total_msec = 0;     //  Total calculated cost in msecs
        for (task_nbr = 0; task_nbr < 100; task_nbr++) {

            receiver.recv(&message);
            if ((task_nbr / 10) * 10 == task_nbr)
                std::cout << ":" << std::flush;
            else
                std::cout << "." << std::flush;
        }
        //  Calculate and report duration of batch
        struct timeval tend, tdiff;
        gettimeofday (&tend, NULL);

        if (tend.tv_usec < tstart.tv_usec) {
            tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;
            tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;
        }
        else {
            tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
            tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;
        }
        total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;
        std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl;
        return 0;
    }
A batch of tasks amounts to about 5 seconds of work on average. The following are the results when starting 1, 2 and 4 workers respectively:
    #   1 worker
    Total elapsed time: 5034 msec
    #   2 workers
    Total elapsed time: 2421 msec
    #   4 workers
    Total elapsed time: 1018 msec
Notes:
- Each worker connects upstream to the task distributor and downstream to the result collector, which means you can start any number of workers. If the workers bound to endpoints instead of connecting, we would need more endpoints and would have to reconfigure the task distributor and result collector every time a worker was added. The task distributor and result collector are the stable parts of this topology, so they bind to endpoints; the workers are the dynamic parts, so they connect.
- We need to synchronize the start of the batch and wait for all workers to be up before distributing tasks. This is a common pitfall in ZMQ and has no easy solution. Connecting a socket takes some time, so the first worker to connect successfully receives a whole load of tasks at once. Without synchronization, the tasks would not run in parallel at all. You can try it yourself.
- The task distributor's PUSH socket distributes tasks evenly to the workers (assuming they are all connected before the batch starts). This mechanism is called load balancing.
- The result collector's PULL socket collects messages evenly from the workers. This mechanism is called fair queuing.
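The even distribution performed by a PUSH socket is a round-robin over its connected peers. As a rough illustration, the following standard-library-only sketch assigns task durations to workers in round-robin order and reports the total work each worker receives; the function name `distribute_round_robin` is hypothetical and the code does not use ZeroMQ.

```cpp
#include <cstddef>
#include <vector>

// Assigns each task (a duration in msec) to workers in round-robin order
// and returns the total work, in msec, that each worker ends up with.
std::vector<int> distribute_round_robin(const std::vector<int>& tasks,
                                        std::size_t worker_count) {
    std::vector<int> load(worker_count, 0);
    if (worker_count == 0) return load;  // no workers: nothing to assign
    for (std::size_t i = 0; i < tasks.size(); ++i) {
        load[i % worker_count] += tasks[i];
    }
    return load;
}
```

Under this model the batch finishes when the most heavily loaded worker is done, which is why the elapsed time falls as more workers are started. It also shows why synchronization matters: if only one worker is connected when the tasks are sent, `worker_count` is effectively 1 and that worker receives everything.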