ActiveMQ Learning 3: Transport Protocols and Persistence

1. ActiveMQ transport protocols

1. Introduction

1. Interview questions

How do you change the default port 61616? Which connection protocol is configured in your production environment? Is it TCP?

2. Official website

http://activemq.apache.org/configuring-version-5-transports.html

Transport connectors are configured inside the <transportConnectors> tag of conf/activemq.xml in the ActiveMQ installation directory.
See the actual configuration below:

In this configuration, each URI begins with the name of the protocol it describes. For example:

The listening port for the AMQP protocol is described by a URI of the form "amqp://...".

The listening port for the STOMP protocol is described by a URI of the form "stomp://...".

Only the OpenWire protocol is different: its URI begins with "tcp://...". This is because OpenWire is the default message protocol in ActiveMQ.
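As a rough illustration of how the URI scheme identifies the protocol, the sketch below parses a few connector URIs with java.net.URI. The URIs and ports are sample values assumed for the example (they match what a stock activemq.xml commonly contains, but check your own file), and the class name ConnectorUris is hypothetical.

```java
import java.net.URI;

// Illustrative only: the URIs used here are assumed sample values, not read from a broker.
class ConnectorUris {

    // Maps the URI scheme to the protocol the connector speaks.
    // "tcp" is the special case: it carries OpenWire, ActiveMQ's default protocol.
    // Only the schemes discussed in this section are considered.
    static String wireProtocol(String connectorUri) {
        String scheme = URI.create(connectorUri).getScheme();
        return "tcp".equals(scheme) ? "openwire" : scheme;
    }

    // Returns the listening port written in the URI.
    static int port(String connectorUri) {
        return URI.create(connectorUri).getPort();
    }

    public static void main(String[] args) {
        String[] uris = {
            "tcp://0.0.0.0:61616",
            "amqp://0.0.0.0:5672",
            "stomp://0.0.0.0:61613"
        };
        for (String uri : uris) {
            System.out.println(wireProtocol(uri) + " listens on port " + port(uri));
        }
    }
}
```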

2. Protocol introduction

(1) Transmission Control Protocol (TCP), the default

1. This is the default broker configuration. The TCP client listening port is 61616.
2. Data must be serialized before it is sent over the network. Messages are serialized into a byte stream by a wire protocol.
3. A TCP connection URI has the form tcp://hostname:port?key=value&key=value. The parameters are optional.
4. Advantages of TCP transport:
    (4.1) TCP transmission is highly reliable and stable.
    (4.2) High efficiency: byte-stream transmission is very efficient.
    (4.3) Availability: it is widely used and supported on every platform.
5. For the optional configuration parameters of the transport, see the official documentation: http://activemq.apache.org/configuring-version-5-transports.html
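To make the tcp://hostname:port?key=value&key=value form concrete, here is a minimal sketch that splits the optional parameters of such a URI into a map. It uses only java.net.URI. The class name TransportUriOptions is hypothetical, and soTimeout is used purely as a sample key; consult the official parameter list for the options your version supports.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper, not part of the ActiveMQ client API.
class TransportUriOptions {

    // Splits the "key=value&key=value" part of a transport URI into an ordered map.
    static Map<String, String> options(String transportUri) {
        Map<String, String> result = new LinkedHashMap<>();
        String query = URI.create(transportUri).getQuery();
        if (query == null || query.isEmpty()) {
            return result; // all parameters are optional
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample URI with assumed values.
        System.out.println(options("tcp://localhost:61616?trace=true&soTimeout=30000"));
    }
}
```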

(2) New I/O API Protocol (NIO)

1. The NIO protocol is similar to TCP, but it focuses more on low-level access operations. It allows more client calls against the same resource and lets the server carry more load.
2. Scenarios suited to NIO:
    (2.1) A large number of clients may connect to the broker. The number of client connections is normally limited by the operating system's threads. The NIO implementation needs fewer threads than the TCP one, so NIO is recommended here.
    (2.2) Network transmission to the broker may be slow. NIO performs better than TCP in that case.
3. An NIO connection URI has the form nio://hostname:port?key=value&key=value
4. For the optional configuration parameters of the transport, see the official documentation: http://activemq.apache.org/configuring-version-5-transports.html

(3) AMQP protocol

AMQP (Advanced Message Queuing Protocol) is an open application-layer standard that provides a unified messaging service and is designed for message-oriented middleware. Clients and middleware built on this protocol can exchange messages regardless of which client or middleware product is used and which language it is written in.

(4) STOMP protocol

STOMP (Streaming Text Oriented Messaging Protocol) is a simple text protocol designed for MOM (Message Oriented Middleware).

(5)Secure Sockets Layer Protocol(SSL)

(6) MQTT protocol

MQTT (Message Queuing Telemetry Transport) is an instant messaging protocol developed by IBM that may become an important part of the Internet of Things. It supports all platforms and can connect almost any networked object to the outside world. It is used as a communication protocol for sensors and actuators (for example, connecting a house to the Internet through Twitter).

For configuration, see the client on GitHub:
https://github.com/fusesource/mqtt-client

(7) WS protocol (websocket)

(8) Summary

2. NIO example

1. Official website address

http://activemq.apache.org/configuring-version-5-transports.html

2. Modify the configuration file

<transportConnectors>
      <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
</transportConnectors>

If you do not specify a network I/O model for ActiveMQ's listening ports, they use the BIO (blocking I/O) model. To improve the network throughput of a single node, we therefore need to specify the I/O model explicitly.

A URI that starts with "nio" indicates that the port uses the NIO network I/O model on top of TCP.

3. Change the protocol in the producer and consumer code

private static final String ACTIVEMQ_URL = "nio://192.168.10.101:61618";
private static final String QUEUE_NAME = "Queue-NIO";

4. Testing

Run the producer and then the consumer:

The consumer can receive the data sent from the producer.

3. Enhanced NIO example

1. Introduction

NIO already performs well, but we can optimize it further.

A URI that starts with "nio" indicates that the port uses the NIO network model on top of TCP. However, configured this way the port supports only the OpenWire protocol.

Our goal is therefore to make one port support both the NIO network model and multiple protocols.

The result is as follows:

2. Configuration

(1) Official website

http://activemq.apache.org/auto

That is, the "+" symbol can be used to combine several features on one port.

(2) To make one port support both the NIO network model and multiple protocols, add the following to the configuration file:

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608"/>
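The following sketch only illustrates how the "+" in the scheme separates the individual features. It is plain string handling with java.net.URI and does not reflect how the broker itself resolves transports. The class name SchemeFeatures is hypothetical.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

// Illustrative only: splits a connector URI's scheme into its "+"-separated parts.
class SchemeFeatures {

    static List<String> features(String connectorUri) {
        String scheme = URI.create(connectorUri).getScheme();
        return Arrays.asList(scheme.split("\\+"));
    }

    public static void main(String[] args) {
        // "auto" stands for protocol auto-detection, "nio" for the NIO network model.
        System.out.println(features("auto+nio://0.0.0.0:61608"));
    }
}
```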

We can comment out the connector added in the NIO example. The result is as follows:

(3) Testing

Change the port number in the producer and consumer as follows:

private static final String ACTIVEMQ_URL = "nio://192.168.10.101:61608";
private static final String QUEUE_NAME = "Queue-NIO";

At this time, the consumer can receive the data sent by the producer.

2. Message storage and persistence in ActiveMQ

1. Introduction

(1) Official website

http://activemq.apache.org/persistence

(2) Recall: which persistence mechanisms does Redis have?

RDB and AOF.

(3) Overview

To avoid losing messages after an unexpected outage, the message queue must be restorable after a restart, so messaging systems generally adopt a persistence mechanism.

ActiveMQ's persistence mechanisms include JDBC, AMQ, KahaDB, and LevelDB. Whichever is used, the storage logic is the same. After the sender sends a message, the message center first stores it in a local data file, an in-memory database, or a remote database, and then tries to send it to the receiver. If delivery succeeds, the message is deleted from the store. If it fails, the message center keeps trying.

When the message center starts, it first checks the configured store for messages that were not delivered successfully and, if there are any, sends them first.

In short: even if ActiveMQ goes down, messages are not lost.
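The store-first, deliver-second logic described above can be sketched with a toy in-memory model. This is not ActiveMQ code: the class StoreAndForward, its methods, and the use of a Predicate as the "receiver" are all assumptions made for illustration, and a map stands in for the real data file or database.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Toy model of the store-then-forward logic; not ActiveMQ code.
class StoreAndForward {
    // Stands in for the data file, in-memory database, or remote database.
    private final Map<Long, String> store = new LinkedHashMap<>();
    private long nextId = 1;

    // Step 1: persist the message. Step 2: try to deliver. Step 3: delete on success.
    boolean send(String body, Predicate<String> receiver) {
        long id = nextId++;
        store.put(id, body);
        return tryDeliver(id, receiver);
    }

    // After a restart, anything still in the store is delivered first.
    int redeliverPending(Predicate<String> receiver) {
        int delivered = 0;
        for (Long id : new ArrayList<>(store.keySet())) {
            if (tryDeliver(id, receiver)) {
                delivered++;
            }
        }
        return delivered;
    }

    private boolean tryDeliver(long id, Predicate<String> receiver) {
        boolean ok = receiver.test(store.get(id));
        if (ok) {
            store.remove(id); // delivered: remove from the store
        }
        return ok;            // not delivered: the message stays stored
    }

    int pending() {
        return store.size();
    }

    public static void main(String[] args) {
        StoreAndForward broker = new StoreAndForward();
        broker.send("order-1", body -> false);   // receiver unavailable
        System.out.println("pending after failed delivery: " + broker.pending());
        broker.redeliverPending(body -> true);   // receiver is back
        System.out.println("pending after redelivery: " + broker.pending());
    }
}
```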

2. Persistence method

1. AMQ message store (for reference)

A file-based store. It was the earlier default message store and is no longer used.

AMQ is a form of file storage with fast writes and easy recovery. Messages are stored in files with a default size of 32 MB. When all messages in a file have been consumed, the file is marked as removable and is deleted in the next cleanup phase. AMQ applies to ActiveMQ versions before 5.3.

2. KahaDB message store (default)

(1) Introduction

A log-file-based store. It has been the default persistence plug-in since ActiveMQ 5.4.

Official website:

http://activemq.apache.org/kahadb

(2) Verify

(3) Description

http://activemq.apache.org/kahadb

KahaDB is currently the default store. It can be used in any scenario and offers improved performance and recovery. The message store uses one transaction log and a single index file that holds all of its addresses.

KahaDB is a persistence solution optimized for typical message usage patterns.

Data is appended to data logs. When the data in a log file is no longer needed, the log file is discarded.

(4) Storage principle

The KahaDB message storage directory contains only four kinds of files and one lock, which is very compact compared with ActiveMQ's other file-based storage engines.

(a) db-<number>.log

KahaDB stores messages in data log files of a predetermined size, named db-<number>.log. When a data file is full, a new file is created and <number> is incremented, so the number grows as messages accumulate. For example, with one file per 32 MB, the files are numbered db-1.log, db-2.log, and so on. When no message in a data file is referenced any longer, the file is deleted or archived.

(b) db.data

This file contains the persistent B-Tree index over the messages in the data log files. It is the message index file: in essence a B-Tree whose entries point to the messages stored in db-<number>.log.

(c) db.free

Records which pages in the current db.data file are free. Its content is the IDs of all free pages.

(d) db.redo

Used for message recovery. If the KahaDB message store is started after a forced exit, this file is used to restore the B-Tree index.

(e) lock

A file lock that identifies the broker currently holding read/write access to this KahaDB store.
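The rolling and cleanup behaviour of the db-<number>.log files can be pictured with a small toy model. It is only a sketch under simplifying assumptions and is not KahaDB's implementation: the class RollingLog and its methods are hypothetical, sizes are plain byte counts, and "deleting" a file just means dropping its entry from a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of rolling db-<number>.log files; not KahaDB's implementation.
class RollingLog {
    private final long maxFileSize;
    private int currentFile = 1;
    private long currentSize = 0;
    // file number -> count of messages in that file that are still referenced
    private final TreeMap<Integer, Integer> liveCount = new TreeMap<>();
    // message id -> number of the file that holds it
    private final Map<String, Integer> location = new HashMap<>();

    RollingLog(long maxFileSize) {
        this.maxFileSize = maxFileSize;
        liveCount.put(currentFile, 0);
    }

    // Appends a message, rolling over to a new file when the current one is full.
    String append(String messageId, long size) {
        if (currentSize > 0 && currentSize + size > maxFileSize) {
            int full = currentFile;
            currentFile++;
            currentSize = 0;
            liveCount.put(currentFile, 0);
            removeIfUnreferenced(full);
        }
        currentSize += size;
        liveCount.merge(currentFile, 1, Integer::sum);
        location.put(messageId, currentFile);
        return fileName(currentFile);
    }

    // Called once a message is consumed; a full file without live messages is dropped.
    void consumed(String messageId) {
        Integer file = location.remove(messageId);
        if (file == null) {
            return;
        }
        liveCount.merge(file, -1, Integer::sum);
        removeIfUnreferenced(file);
    }

    private void removeIfUnreferenced(int file) {
        if (file != currentFile && liveCount.getOrDefault(file, 0) == 0) {
            liveCount.remove(file);
        }
    }

    int fileCount() {
        return liveCount.size();
    }

    static String fileName(int number) {
        return "db-" + number + ".log";
    }

    public static void main(String[] args) {
        RollingLog log = new RollingLog(32L * 1024 * 1024); // 32 MB per file, as in the text
        System.out.println(log.append("msg-1", 20L * 1024 * 1024));
        System.out.println(log.append("msg-2", 20L * 1024 * 1024)); // does not fit, rolls over
        log.consumed("msg-1"); // the first file no longer holds a referenced message
        System.out.println("files kept: " + log.fileCount());
    }
}
```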

3. JDBC message store

Messages are stored based on JDBC.

4. LevelDB message store (for reference)

Official website address:

http://activemq.apache.org/leveldb-store

This store was introduced in ActiveMQ 5.8 and is very similar to KahaDB. It is also a file-based local database, but it provides faster persistence than KahaDB.

Instead of a custom B-Tree implementation to index the write-ahead logs, it uses a LevelDB-based index.

The default configuration is as follows:

<persistenceAdapter>
      <levelDB directory="activemq-data"/>
</persistenceAdapter>

5. JDBC message store with ActiveMQ Journal

3. JDBC message store

1. Architecture diagram

http://activemq.apache.org/persistence

2. Add the MySQL database driver package to the lib folder

wget -P /myActivemq/activemq/lib https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.17/mysql-connector-java-8.0.17.jar

wget -P /myActivemq/activemq/lib  https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.7.27/mysql-connector-java-5.7.27.jar

3. jdbcPersistenceAdapter configuration

Modify the activemq.xml configuration file.

<!-- Before modification: KahaDB -->
<persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

<!-- After modification: jdbcPersistenceAdapter -->
<persistenceAdapter> 
       <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
</persistenceAdapter>

4. Database connection pool configuration

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://192.168.43.4:3306/activemq?relaxAutoCommit=true&amp;serverTimezone=GMT%2B8"/> 
    <property name="username" value="root"/> 
    <property name="password" value="123"/> 
    <property name="poolPreparedStatements" value="true"/> 
</bean>

5. Creating the database and tables

(1) Create a database named activemq.

(2) Description of the three tables

ACTIVEMQ_MSGS description:

This is the message table (default name ACTIVEMQ_MSGS). Both Queue and Topic messages are stored in it. Its structure is as follows:

Column       Description
ID           Self-incrementing database primary key
CONTAINER    Destination of the message
MSGID_PROD   Primary key of the message sender
MSGID_SEQ    Sequence in which the message was sent; MSGID_PROD + MSGID_SEQ form the JMS MessageID
EXPIRATION   Expiration time of the message, in milliseconds since January 1, 1970
MSG          Binary data of the serialized Java object of the message body
PRIORITY     Priority from 0 to 9; the higher the value, the higher the priority

ACTIVEMQ_ACKS Description:

ACTIVEMQ_LOCK Description:

The ACTIVEMQ_LOCK table is only useful in a cluster environment. Only one broker, the master broker, can obtain messages. The others serve as backups and wait until the master broker becomes unavailable before one of them can become the next master. This table records which broker is the current master.

(3) If the database has been created, the configuration above is correct, and ActiveMQ restarts successfully, the three tables are generated automatically.

If the tables are not generated, you may need to create them yourself:

-- auto-generated definition
create table ACTIVEMQ_ACKS
(
    CONTAINER     varchar(250)     not null comment 'Message destination',
    SUB_DEST      varchar(250)     null comment 'With a static cluster, holds information about the other systems in the cluster',
    CLIENT_ID     varchar(250)     not null comment 'Each subscriber must have a unique client ID',
    SUB_NAME      varchar(250)     not null comment 'Subscriber name',
    SELECTOR      varchar(250)     null comment 'Selector: consume only messages that match the condition. Conditions can use custom properties and support AND and OR',
    LAST_ACKED_ID bigint           null comment 'ID of the last message consumed',
    PRIORITY      bigint default 5 not null comment 'Priority, default 5',
    XID           varchar(250)     null,
    primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
    comment 'Stores subscription relationships. For a durable Topic, the relationship between subscriber and server is saved in this table';

create index ACTIVEMQ_ACKS_XIDX
    on ACTIVEMQ_ACKS (XID);

 
-- auto-generated definition
create table ACTIVEMQ_LOCK
(
    ID          bigint       not null
        primary key,
    TIME        bigint       null,
    BROKER_NAME varchar(250) null
);

 
-- auto-generated definition
create table ACTIVEMQ_MSGS
(
    ID         bigint       not null
        primary key,
    CONTAINER  varchar(250) not null,
    MSGID_PROD varchar(250) null,
    MSGID_SEQ  bigint       null,
    EXPIRATION bigint       null,
    MSG        blob         null,
    PRIORITY   bigint       null,
    XID        varchar(250) null
);

create index ACTIVEMQ_MSGS_CIDX
    on ACTIVEMQ_MSGS (CONTAINER);

create index ACTIVEMQ_MSGS_EIDX
    on ACTIVEMQ_MSGS (EXPIRATION);

create index ACTIVEMQ_MSGS_MIDX
    on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);

create index ACTIVEMQ_MSGS_PIDX
    on ACTIVEMQ_MSGS (PRIORITY);

create index ACTIVEMQ_MSGS_XIDX
    on ACTIVEMQ_MSGS (XID);

6. Testing

(1) Queue

Producer Code:

package com.xiaolun.activemq.mysql;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {
	private static final String ACTIVEMQ_URL = "nio://192.168.10.101:61616";
	private static final String ACTIVEMQ_QUEUE_NAME = "Queue-JdbcPersistence";

	public static void main(String[] args) throws JMSException {
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
		activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
		Connection connection = activeMQConnectionFactory.createConnection();
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
		MessageProducer messageProducer = session.createProducer(queue);
		// Persistence must be enabled, otherwise the messages are not written to the database
		messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
		connection.start();
		for (int i = 0; i < 3; i++) {
			TextMessage textMessage = session.createTextMessage("Queue-JdbcPersistence Test message" + i);
			messageProducer.send(textMessage);
		}
		session.commit();
		System.out.println("Message sending completed");
		messageProducer.close();
		session.close();
		connection.close();
	}
}

Consumer code:

package com.xiaolun.activemq.mysql;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class Consumer {
	private static final String ACTIVEMQ_URL = "nio://192.168.10.101:61616";
	private static final String ACTIVEMQ_QUEUE_NAME = "Queue-JdbcPersistence";

	public static void main(String[] args) throws JMSException, IOException {
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
		activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
		Connection connection = activeMQConnectionFactory.createConnection();
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
		MessageConsumer messageConsumer = session.createConsumer(queue);
		connection.start();
		messageConsumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				if (message instanceof TextMessage) {
					TextMessage textMessage = (TextMessage) message;
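					try {
						System.out.println("Consumer receives message: " + textMessage.getText());
						// Commit the transacted session only after the message has been processed
						session.commit();
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			}
		});
		// Block until Enter is pressed so the listener can keep receiving
		System.in.read();
		// Release resources before exiting
		messageConsumer.close();
		session.close();
		connection.close();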
	}
}

Test:

First run the producer to send three messages to MQ. They are now visible in the web console and in the database. Then start the consumer. The three messages in MQ are consumed and the rows saved in the database are deleted.

Note:

The test above assumes the producer has enabled persistence. If the producer's messages are not persistent, no data appears in the database after the producer runs.

Conclusion:

In the point-to-point model:
 When DeliveryMode is set to NON_PERSISTENT, messages are kept in memory.
 When DeliveryMode is set to PERSISTENT, messages are saved in the broker's corresponding file or database.

Once a point-to-point message has been consumed by a consumer, it is deleted from the store.

Before consumption, the messages are stored in the database:

After consumption, MQ deletes the messages automatically:

(2) Topic

The code is similar to the queue example, and persistence must also be enabled in the producer code:

messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

If a durable subscription is set up, the subscriber information is saved in the database and the messages of the Topic are not deleted, whereas Queue messages are deleted from the database once they have been consumed. If you need to keep Queue messages, use another solution.

The LAST_ACKED_ID column of the ACTIVEMQ_ACKS table records the last message acknowledged by each CLIENT_ID:

LAST_ACKED_ID refers to the ID column of ACTIVEMQ_MSGS, like a foreign key. Topic messages can therefore be kept in ACTIVEMQ_MSGS, and ACTIVEMQ_ACKS shows which message each durable subscriber received last.
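The relationship between the two tables can be sketched with in-memory maps. This is a toy model, not ActiveMQ code: the class DurableTopicModel and its methods are hypothetical, one map stands in for ACTIVEMQ_MSGS (ID to MSG) and another for ACTIVEMQ_ACKS (CLIENT_ID to LAST_ACKED_ID), and it assumes a new subscriber only sees messages published after it subscribed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy in-memory stand-ins for ACTIVEMQ_MSGS and ACTIVEMQ_ACKS; not ActiveMQ code.
class DurableTopicModel {
    private final TreeMap<Long, String> msgs = new TreeMap<>();     // ID -> MSG
    private final Map<String, Long> lastAckedId = new HashMap<>();  // CLIENT_ID -> LAST_ACKED_ID
    private long nextId = 1;

    // Registers a durable subscriber, starting from the newest message ID so far.
    void subscribe(String clientId) {
        lastAckedId.putIfAbsent(clientId, nextId - 1);
    }

    // Stores a topic message and returns its ID.
    long publish(String body) {
        long id = nextId++;
        msgs.put(id, body);
        return id;
    }

    // Records the last message the subscriber acknowledged.
    void ack(String clientId, long messageId) {
        lastAckedId.put(clientId, messageId);
    }

    // Messages the subscriber still has to receive: every row with ID > LAST_ACKED_ID.
    List<String> pendingFor(String clientId) {
        Long last = lastAckedId.get(clientId);
        if (last == null) {
            return new ArrayList<>(); // unknown subscriber
        }
        return new ArrayList<>(msgs.tailMap(last, false).values());
    }

    public static void main(String[] args) {
        DurableTopicModel topic = new DurableTopicModel();
        topic.subscribe("client-1");
        long first = topic.publish("news-1");
        topic.publish("news-2");
        topic.ack("client-1", first);
        System.out.println(topic.pendingFor("client-1"));
    }
}
```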

7. Summary

(1) For queue

Messages that no consumer has consumed are saved in the ACTIVEMQ_MSGS table. As soon as any consumer has consumed a message, it is deleted.

(2) For topic

Usually the subscriber is started before the producer. The durable subscriber is then saved permanently in ACTIVEMQ_ACKS and the messages are stored permanently in ACTIVEMQ_MSGS. Each subscriber row in ACTIVEMQ_ACKS has a LAST_ACKED_ID that corresponds to the ID column of ACTIVEMQ_MSGS, so you can tell which message the subscriber received last.

8. Problems needing attention in development

(1) Database jar package

Make sure the database driver jar matches your database version, and add the jar of any connection pool that is not bundled with ActiveMQ. For example, when using Alibaba's connection pool, you need to add its packages in addition to the MySQL jar.

(2) createTablesOnStartup property

The default is true, which means the tables are created automatically each time ActiveMQ starts. After the first start it should be changed to false to avoid unnecessary losses.

(3) java.lang.IllegalStateException: LifecycleProcessor not initialized

Make sure the machine's host name does not contain an underscore.

4. JDBC message store with ActiveMQ Journal

1. Introduction

(1) Overview

(2) Explanation

This approach overcomes a shortcoming of the JDBC store: with plain JDBC, every incoming message requires a write to and a read from the database.
ActiveMQ Journal uses fast cached writes, which greatly improves performance.

When consumers keep up with the rate at which producers create messages, the journal greatly reduces the number of messages that have to be written to the DB.

For example:

A producer sends 1000 messages, which are saved to the journal file. If consumers consume more than 90% of them before the journal is synchronized to the DB, only the remaining 10% need to be written to the DB. If consumers are slow, the journal lets the messages be written to the DB in batches.
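The 1000-message example can be replayed with a toy model of a journal placed in front of a database. It is a sketch under simplifying assumptions, not ActiveMQ's journal implementation: the class JournalModel, its methods, and the explicit checkpoint() call are hypothetical stand-ins for the periodic synchronization.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a journal in front of a database; not ActiveMQ's journal implementation.
class JournalModel {
    private final Map<Integer, String> journal = new LinkedHashMap<>(); // fast local writes
    private final List<String> database = new ArrayList<>();            // slow store

    // Every produced message goes to the journal first.
    void produce(int id, String body) {
        journal.put(id, body);
    }

    // A message consumed before the next checkpoint never reaches the database.
    void consume(int id) {
        journal.remove(id);
    }

    // Checkpoint: whatever is still unconsumed is written to the database in one batch.
    int checkpoint() {
        int written = journal.size();
        database.addAll(journal.values());
        journal.clear();
        return written;
    }

    int databaseRows() {
        return database.size();
    }

    public static void main(String[] args) {
        JournalModel model = new JournalModel();
        for (int i = 0; i < 1000; i++) {
            model.produce(i, "message-" + i);
        }
        for (int i = 0; i < 900; i++) {
            model.consume(i); // 90% consumed before the checkpoint
        }
        System.out.println("rows written at checkpoint: " + model.checkpoint());
    }
}
```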

2. Configuration

<!--Configuration before modification-->
<persistenceAdapter> 
       <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 
</persistenceAdapter>

<!--Modified configuration-->
<persistenceFactory>        
      <journalPersistenceAdapterFactory 
      journalLogFiles="5" 
      journalLogFileSize="32768" 
      useJournal="true" 
      useQuickJournal="true" 
      dataSource="#mysql-ds" 
      dataDirectory="activemq-data" /> 
</persistenceFactory>

3. Testing

Previously every message was written to MySQL in real time. With the journal in place, messages go to the journal first. If they are consumed within a certain time, they are never written to MySQL. If they are not, they are written to MySQL. The journal thus acts as a cache.

Therefore, when we start the producer and push data to MQ, nothing is visible in MySQL at first because the data sits in the journal cache. Only after some time has passed without the consumer consuming the data does the journal write it to MySQL.

5. ActiveMQ persistence mechanism summary

Persistent messages mainly refer to:

The mechanism that keeps messages from being lost when the server running MQ goes down.

Evolution of the persistence mechanism:

It started with the original AMQ Message Store. ActiveMQ 4 introduced the High Performance Journal (high-performance transaction support) add-on together with a relational database storage scheme. ActiveMQ 5.3 introduced KahaDB, which became the default persistence scheme in 5.4. ActiveMQ 5.8 added support for LevelDB, and 5.9 provides a standard ZooKeeper + LevelDB clustering scheme.

ActiveMQ message persistence mechanisms include:

AMQ                       Based on log files
KahaDB                    Based on log files; the default since ActiveMQ 5.4
JDBC                      Based on a third-party database
Replicated LevelDB Store  Since 5.9, data replication based on LevelDB and ZooKeeper; the preferred replication scheme for master-slave setups

Tags: Big Data Middleware ActiveMQ

Posted by acroporas on Mon, 18 Apr 2022 23:01:07 +0930