Recently, a project required sending users a reminder shortly before their exam begins. This kind of requirement is very common, and in essence it can be implemented with a delay queue. Given the project's current technology stack, three schemes came to mind:
- Time wheel
- Redis key expiration monitoring
- Redis ZSet
  - The expiration time is used as the score, and each poll retrieves the data whose score is no greater than the current time
Personally, I find that a time wheel is simple to implement on a single node but becomes complex in a distributed environment. In my past experience Redis key expiration monitoring has not been reliable, and it makes the consumption progress hard to control. So we finally chose Redis ZSet.
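Before looking at the Redis code, the ZSet idea can be sketched without Redis at all. The class below is only an in-memory illustration: `InMemoryDelayQueue` and its methods are made-up names, a `TreeMap` stands in for the sorted set, and, unlike a real ZSet, it does not deduplicate members.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for a Redis sorted set used as a delay queue (illustration only). */
final class InMemoryDelayQueue {

    // score (due time in epoch millis) -> members due at that time, kept sorted by score
    private final TreeMap<Long, List<String>> byScore = new TreeMap<>();

    /** Roughly what ZADD does: store the member under its due time. */
    synchronized void enqueue(String member, long dueAtMillis) {
        byScore.computeIfAbsent(dueAtMillis, k -> new ArrayList<>()).add(member);
    }

    /** Roughly ZRANGEBYSCORE 0 now followed by ZREM: take everything that is already due. */
    synchronized List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        // View of all entries whose score is <= now
        Map<Long, List<String>> head = byScore.headMap(nowMillis, true);
        for (List<String> members : head.values()) {
            due.addAll(members);
        }
        // Clearing the view removes the returned entries from the backing map
        head.clear();
        return due;
    }

    public static void main(String[] args) {
        InMemoryDelayQueue queue = new InMemoryDelayQueue();
        queue.enqueue("exam-1", 1_000L);
        queue.enqueue("exam-2", 2_000L);
        queue.enqueue("exam-3", 3_000L);
        System.out.println(queue.pollDue(2_000L)); // prints [exam-1, exam-2]
        System.out.println(queue.pollDue(2_500L)); // prints []
        System.out.println(queue.pollDue(3_000L)); // prints [exam-3]
    }
}
```

A consumer only has to call `pollDue` with the current time at a fixed interval, which is the shape the Redis-based version follows.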
First version implementation
The implementation of the first version is very simple, and the code is as follows:
Queue operation class:
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.ZSetOperations;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;

    import java.util.Collections;
    import java.util.Set;

    /**
     * @author Dongguabai
     * @description Queue operation
     * @date 2021-02-22 18:45
     */
    @Component
    public final class RedisDelayQueue2 {

        @Autowired
        private RedisTemplate redisTemplate;

        public void enqueue(String key, String value, double score) {
            redisTemplate.opsForZSet().add(key, value, score);
        }

        public void enqueue(String key, Set<ZSetOperations.TypedTuple<String>> tuples) {
            redisTemplate.opsForZSet().add(key, tuples);
        }

        @Deprecated
        public Set<String> deququeByTimeMills(String key) {
            // Time as score
            return dequeue(key, 0, System.currentTimeMillis());
        }

        @Deprecated
        public Set<String> dequeue(String key, double min, double max) {
            ZSetOperations zSetOperations = redisTemplate.opsForZSet();
            // Step 1: read the members whose score lies in [min, max]
            Set<String> set = zSetOperations.rangeByScore(key, min, max);
            // Step 2: the key holds a sorted set, so remove with ZREM (opsForZSet), not SREM (opsForSet)
            if (!CollectionUtils.isEmpty(set) && zSetOperations.remove(key, set.toArray()) > 0) {
                return set;
            }
            return Collections.emptySet();
        }
    }
Message consumption:
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.time.DateUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;

    import javax.annotation.PreDestroy;
    import java.util.Date;
    import java.util.Set;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
     * @author Dongguabai
     * @description
     * @date 2021-02-22 19:08
     */
    @Component
    @Slf4j
    public class RemindMessageConsumer implements CommandLineRunner {

        @Autowired
        private RedisDelayQueue2 redisDelayQueue2;

        @Autowired
        private RedisTemplate redisTemplate;

        private static final String NAME = "RemindMessageTask-thread-";

        private final AtomicInteger seq = new AtomicInteger(1);

        private final ScheduledThreadPoolExecutor pool =
                new ScheduledThreadPoolExecutor(1, r -> new Thread(r, NAME + seq.getAndIncrement()));

        @Override
        public void run(String... args) {
            // Simulate data initialization
            load();
            log.info("RemindMessageTask run...");
            // getRemindThreadMessageQueueKey() returns the queue's Redis key; its definition is not shown here
            String key = getRemindThreadMessageQueueKey();
            // First poll after 5 seconds, then one poll per second
            pool.scheduleAtFixedRate(() -> {
                try {
                    Set<String> set = redisDelayQueue2.deququeByTimeMills(key);
                    if (!CollectionUtils.isEmpty(set)) {
                        // Asynchronous processing
                        log.error("---->Send Message: " + set);
                    }
                } catch (Throwable e) {
                    // Catch everything: an uncaught exception would cancel the scheduled task
                    log.error("RemindMessageTask error..", e);
                }
            }, 5, 1, TimeUnit.SECONDS);
        }

        private void load() {
            String key = getRemindThreadMessageQueueKey();
            redisTemplate.delete(key);
            Date now = new Date();
            // Ten test messages, due 10 s, 20 s, ..., 100 s from now
            for (int i = 1; i <= 10; i++) {
                Date date = DateUtils.addSeconds(now, i * 10);
                redisDelayQueue2.enqueue(key, "[" + date.toLocaleString() + "]", date.getTime());
            }
            System.out.println("Data:" + redisTemplate.opsForZSet().range(key, 0, -1));
        }

        @PreDestroy
        public void preDestroy() {
            pool.shutdownNow();
        }
    }
For the convenience of later testing, the consumer starts polling the delay queue 5 seconds after the Spring container starts and then polls once per second (this would of course not be configured this way in production). A message is fetched when its score is less than or equal to the current time.
First, simply test the effect of the queue:
    @Autowired
    private RedisDelayQueue2 delayQueue2;

    // Used below to reset the key and to inspect the remaining data
    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    public void test1() {
        String key = "adadadadadada";
        redisTemplate.delete(key);
        // Members "0".."9", each with a score equal to its value
        for (int i = 0; i < 10; i++) {
            delayQueue2.enqueue(key, i + "", i);
        }
        System.out.println("Total data:" + redisTemplate.opsForZSet().range(key, 0, -1));
        // Each round dequeues everything whose score is in [0, i]
        for (int i = 0; i < 10; i++) {
            Set<String> set = delayQueue2.dequeue(key, 0d, (double) i);
            System.out.println(i + ":" + set);
            System.out.println("Remaining data:" + redisTemplate.opsForZSet().range(key, 0, -1));
            System.out.println("----------");
        }
        redisTemplate.delete(key);
    }
Output:
    Total data:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    0:[0]
    Remaining data:[1, 2, 3, 4, 5, 6, 7, 8, 9]
    ----------
    1:[1]
    Remaining data:[2, 3, 4, 5, 6, 7, 8, 9]
    ----------
    2:[2]
    Remaining data:[3, 4, 5, 6, 7, 8, 9]
    ----------
    3:[3]
    Remaining data:[4, 5, 6, 7, 8, 9]
    ----------
    4:[4]
    Remaining data:[5, 6, 7, 8, 9]
    ----------
    5:[5]
    Remaining data:[6, 7, 8, 9]
    ----------
    6:[6]
    Remaining data:[7, 8, 9]
    ----------
    7:[7]
    Remaining data:[8, 9]
    ----------
    8:[8]
    Remaining data:[9]
    ----------
    9:[9]
    Remaining data:[]
    ----------
After testing the whole process, start the SpringBoot project and the console outputs:
    Data:[[2021-2-23 13:59:57], [2021-2-23 14:00:07], [2021-2-23 14:00:17], [2021-2-23 14:00:27], [2021-2-23 14:00:37], [2021-2-23 14:00:47], [2021-2-23 14:00:57], [2021-2-23 14:01:07], [2021-2-23 14:01:17], [2021-2-23 14:01:27]]
    [2021-02-23 13:59:48] [INFO] [main] cRemindMessageConsumer-RemindMessageTask run...
    [2021-02-23 13:59:58] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 13:59:57]]
    [2021-02-23 14:00:08] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:00:07]]
    [2021-02-23 14:00:18] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:00:17]]
    [2021-02-23 14:00:28] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:00:27]]
    [2021-02-23 14:00:38] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:00:37]]
    [2021-02-23 14:00:48] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:00:47]]
    [2021-02-23 14:00:58] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:00:57]]
    [2021-02-23 14:01:08] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:01:07]]
    [2021-02-23 14:01:18] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:01:17]]
    [2021-02-23 14:01:28] [ERROR] [RemindMessageTask-thread-1] cRemindMessageConsumer----->Send Message: [[2021-2-23 14:01:27]]
You can see that each message is sent within about one second of its scheduled time, which matches the one-second polling period.
Existing problems and optimization ideas
There are two obvious problems with this scheme:
1. Performance issues
On each poll we fetch every message whose score is no greater than the current time. If a large number of messages fall due within one polling interval, Redis performance suffers. We use the zrangebyscore and zrem commands here; their time complexities are:

| Command | Time complexity | Description | Return value |
|---|---|---|---|
| ZREM key member [member ...] | O(M log(N)) | N is the number of members in the sorted set and M is the number of members deleted. The command removes the members given as parameters; members that do not exist are ignored. If the value associated with the key is not a sorted set, an error is returned. | Number of members actually deleted. |
| ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count] | O(log(N)+M) | N is the number of members in the sorted set and M is the number of members returned. Apart from ordering the results from high score to low score, the command and its parameters mean the same as for ZRANGEBYSCORE, which has the same time complexity. | A list of the members whose scores are within the specified range. |

If messages are consumed asynchronously and promptly, zrangebyscore itself is fine. But if a large number of messages fall due within one interval and are processed together, zrem becomes expensive, because its cost grows with the number of members removed (O(M log(N))).
Here are three solutions:
- Consume messages concurrently with multiple threads;
- The service is deployed as a cluster and is therefore naturally distributed. Randomize the initial delay of the scheduled task, or the length of each period, so that the machines poll at staggered times. This greatly reduces the number of messages a single poll has to process;
- Use the LIMIT option of the zrangebyscore command to cap the number of messages processed at a time.
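The second idea, randomizing the schedule per machine, can be sketched as follows. This is only an illustration: `PollingJitter` and `randomBetween` are hypothetical names, and the ranges (10 to 20 seconds for the initial delay, 1 to 10 seconds for the period) are assumptions chosen for the example. The lower bound of 1 for the period matters, because `scheduleAtFixedRate` rejects a period of 0 with an `IllegalArgumentException`.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Randomized polling schedule so that several instances do not all poll at the same moment. */
final class PollingJitter {

    private PollingJitter() {
    }

    /** Returns a random value in [minInclusive, maxInclusive]. */
    static long randomBetween(long minInclusive, long maxInclusive) {
        if (minInclusive > maxInclusive) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        // nextLong's upper bound is exclusive, hence the + 1
        return ThreadLocalRandom.current().nextLong(minInclusive, maxInclusive + 1);
    }

    public static void main(String[] args) {
        long initialDelaySeconds = randomBetween(10, 20);
        long periodSeconds = randomBetween(1, 10); // never 0
        System.out.printf("initialDelay: %d s, period: %d s%n", initialDelaySeconds, periodSeconds);
    }
}
```

Each instance would compute its own pair of values once at startup and pass them to its scheduler, so that the instances drift apart instead of hitting Redis in the same second.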
2. Atomicity
The dequeue method of RedisDelayQueue2 queries first and then deletes. The two operations are not atomic, which obviously leads to concurrency problems. Although I check the number of members removed by zrem, some of the members may already have been consumed by another machine. Distributed locks or idempotent message sending would solve this, but they are complex and unnecessary here. A Lua script is enough, because Redis executes a script atomically.
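The problem can be replayed deterministically without Redis. The sketch below is an assumption-laden illustration, not the project's code: `NonAtomicDequeueDemo` is a made-up class, a `TreeMap` stands in for the sorted set, and the interleaving of the two consumers A and B is fixed by hand. It shows that checking "removed count > 0" does not prevent a member from being sent twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Replays one unlucky interleaving of a read-then-remove dequeue (illustration only). */
final class NonAtomicDequeueDemo {

    // score -> member; scores are unique in this demo to keep it short
    private final TreeMap<Long, String> zset = new TreeMap<>();

    void add(String member, long score) {
        zset.put(score, member);
    }

    /** Step 1 of the dequeue: read the members with score <= max (like ZRANGEBYSCORE). */
    List<String> rangeByScore(long max) {
        return new ArrayList<>(zset.headMap(max, true).values());
    }

    /** Step 2 of the dequeue: remove members and report how many were removed (like ZREM). */
    long remove(List<String> members) {
        long removed = 0;
        for (String member : members) {
            if (zset.values().remove(member)) {
                removed++;
            }
        }
        return removed;
    }

    /** Returns what consumer A and consumer B each decide to send. */
    static List<List<String>> replayRace() {
        NonAtomicDequeueDemo redis = new NonAtomicDequeueDemo();
        redis.add("msg-1", 100L);
        redis.add("msg-2", 200L);

        List<String> readByA = redis.rangeByScore(100L); // A polls at t=100 and sees [msg-1]
        List<String> readByB = redis.rangeByScore(200L); // B polls at t=200 and sees [msg-1, msg-2]
        long removedByA = redis.remove(readByA);         // A removes msg-1: count 1
        long removedByB = redis.remove(readByB);         // B removes only msg-2: count 1, still > 0

        List<List<String>> sent = new ArrayList<>();
        sent.add(removedByA > 0 ? readByA : new ArrayList<String>());
        sent.add(removedByB > 0 ? readByB : new ArrayList<String>());
        return sent;
    }

    public static void main(String[] args) {
        List<List<String>> sent = replayRace();
        System.out.println("A sends " + sent.get(0)); // prints A sends [msg-1]
        System.out.println("B sends " + sent.get(1)); // prints B sends [msg-1, msg-2]
    }
}
```

Both consumers pass the count check, so msg-1 goes out twice. Running the read and the remove as a single atomic step closes this window.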
Rework based on a Lua script
To address the problems above, the dequeue logic is reworked as a Lua script.
Redis configuration class:
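    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.data.redis.core.script.DefaultRedisScript;
    import org.springframework.scripting.support.ResourceScriptSource;

    import java.util.List;

    /**
     * @author Dongguabai
     * @description
     * @date 2021-02-22 23:20
     */
    @Configuration
    public class RedisConfiguration {

        @Bean
        public DefaultRedisScript<List> delayRedisScript() {
            DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>();
            // The script returns a Lua table, so the result type must be List
            defaultRedisScript.setResultType(List.class);
            // Loaded from the classpath: lua/delay.lua
            defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/delay.lua")));
            return defaultRedisScript;
        }
    }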
Note that the ResultType here must be set to List. The reason can be seen in the method org.springframework.data.redis.core.script.DefaultScriptExecutor#execute(org.springframework.data.redis.core.script.RedisScript<T>, org.springframework.data.redis.serializer.RedisSerializer<?>, org.springframework.data.redis.serializer.RedisSerializer<T>, java.util.List<K>, java.lang.Object...), which derives the Redis return type from the script's result type by calling ReturnType.fromJavaType.
Take another look at the fromJavaType method:
    /**
     * @param javaType can be {@literal null} which translates to {@link ReturnType#STATUS}.
     * @return never {@literal null}.
     */
    public static ReturnType fromJavaType(@Nullable Class<?> javaType) {

        if (javaType == null) {
            return ReturnType.STATUS;
        }
        if (javaType.isAssignableFrom(List.class)) {
            return ReturnType.MULTI;
        }
        if (javaType.isAssignableFrom(Boolean.class)) {
            return ReturnType.BOOLEAN;
        }
        if (javaType.isAssignableFrom(Long.class)) {
            return ReturnType.INTEGER;
        }
        return ReturnType.VALUE;
    }

The Java result type maps directly to the Redis return type: null to STATUS, List to MULTI, Boolean to BOOLEAN, Long to INTEGER, and anything else to VALUE.
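One detail of fromJavaType is easy to misread: the check is `javaType.isAssignableFrom(List.class)`, not the other way round, so a subclass such as ArrayList does not map to MULTI. The sketch below re-creates the same checks with a local enum so that it runs without Spring Data Redis; `ReturnTypeMappingDemo` and its nested `ReturnType` are stand-ins written for this example, not the library's classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Re-creates the fromJavaType checks with a local enum (stand-in, not the Spring class). */
final class ReturnTypeMappingDemo {

    /** Local copy of the constant names used by the quoted method. */
    enum ReturnType { STATUS, MULTI, BOOLEAN, INTEGER, VALUE }

    /** Same checks, in the same order, as the fromJavaType method quoted above. */
    static ReturnType fromJavaType(Class<?> javaType) {
        if (javaType == null) {
            return ReturnType.STATUS;
        }
        // True when a List could be assigned to a variable of type javaType
        if (javaType.isAssignableFrom(List.class)) {
            return ReturnType.MULTI;
        }
        if (javaType.isAssignableFrom(Boolean.class)) {
            return ReturnType.BOOLEAN;
        }
        if (javaType.isAssignableFrom(Long.class)) {
            return ReturnType.INTEGER;
        }
        return ReturnType.VALUE;
    }

    public static void main(String[] args) {
        System.out.println(fromJavaType(List.class));      // prints MULTI
        System.out.println(fromJavaType(ArrayList.class)); // prints VALUE
        System.out.println(fromJavaType(Long.class));      // prints INTEGER
        System.out.println(fromJavaType(String.class));    // prints VALUE
        System.out.println(fromJavaType(null));            // prints STATUS
    }
}
```

A List cannot be assigned to an ArrayList variable, so `ArrayList.class` falls through to VALUE. This is why the configuration passes `List.class` as the result type for a script that returns a Lua table.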
I put the Lua script here in the src/resources/lua directory.
Lua script is as follows:
    ---
    --- Created by Dongguabai.
    --- DateTime: 2021/2/23 00:51
    ---
    local key = KEYS[1]
    local min = ARGV[1]
    local max = ARGV[2]
    -- Fetch at most 10 due members per call
    local result = redis.call('zrangebyscore', key, min, max, 'LIMIT', 0, 10)
    if #result > 0 then
        -- Remove them inside the same script, so the read and the delete run atomically
        local removed = redis.call('zrem', key, unpack(result))
        if removed > 0 then
            return result
        end
    end
    -- Always return a table, so the caller never receives nil
    return {}
Queue operation class:
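    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.ZSetOperations;
    import org.springframework.data.redis.core.script.DefaultRedisScript;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;

    import javax.annotation.Resource;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    /**
     * @author Dongguabai
     * @description
     * @date 2021-02-22 18:45
     */
    @Component
    public final class RedisDelayQueue2 {

        @Autowired
        private RedisTemplate redisTemplate;

        @Resource(name = "delayRedisScript")
        private DefaultRedisScript delayRedisScript;

        public void enqueue(String key, String value, double score) {
            redisTemplate.opsForZSet().add(key, value, score);
        }

        public void enqueue(String key, Set<ZSetOperations.TypedTuple<String>> tuples) {
            redisTemplate.opsForZSet().add(key, tuples);
        }

        @Deprecated
        public Set<String> deququeByTimeMills(String key) {
            return dequeue(key, 0, System.currentTimeMillis());
        }

        // Non-atomic first version, kept for comparison
        @Deprecated
        public Set<String> dequeue(String key, double min, double max) {
            ZSetOperations zSetOperations = redisTemplate.opsForZSet();
            Set<String> set = zSetOperations.rangeByScore(key, min, max);
            if (!CollectionUtils.isEmpty(set) && zSetOperations.remove(key, set.toArray()) > 0) {
                return set;
            }
            return Collections.emptySet();
        }

        // Atomic version: the Lua script reads and removes in one step
        public Set<String> dequeue2(String key, double min, double max) {
            List execute = (List) redisTemplate.execute(delayRedisScript, Collections.singletonList(key), min, max);
            // Guard against a null result before copying into a set
            return execute == null ? Collections.emptySet() : new HashSet<>(execute);
        }

        public Set<String> deququeByTimeMills2(String key) {
            return dequeue2(key, 0, System.currentTimeMillis());
        }
    }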
Message consumption:
    /**
     * @author Dongguabai
     * @description
     * @date 2021-02-22 19:08
     */
    @Component
    @Slf4j
    public class RemindMessageConsumer implements CommandLineRunner {

        @Autowired
        private RedisDelayQueue2 redisDelayQueue2;

        @Autowired
        private RedisTemplate redisTemplate;

        private static final String NAME = "RemindMessageTask-thread-";

        private final AtomicInteger seq = new AtomicInteger(1);

        private final ScheduledThreadPoolExecutor pool =
                new ScheduledThreadPoolExecutor(1, r -> new Thread(r, NAME + seq.getAndIncrement()));

        @Override
        public void run(String... args) {
            // Random initial delay of 10 to 20 seconds, so that instances start polling at different times
            long initialDelay = Math.round(Math.random() * 10 + 10);
            // Random period of 1 to 10 seconds; scheduleAtFixedRate rejects a period of 0
            long period = Math.round(Math.random() * 9) + 1;
            System.out.printf("initialDelay:%d,period: %d\n", initialDelay, period);
            // Simulate data initialization
            load();
            log.info("RemindMessageTask run...");
            // getRemindThreadMessageQueueKey() returns the queue's Redis key; its definition is not shown here
            String key = getRemindThreadMessageQueueKey();
            pool.scheduleAtFixedRate(() -> {
                try {
                    Set<String> set = redisDelayQueue2.deququeByTimeMills2(key);
                    if (!CollectionUtils.isEmpty(set)) {
                        log.error("---->Send Message: " + set);
                    }
                } catch (Throwable e) {
                    // Catch everything: an uncaught exception would cancel the scheduled task
                    log.error("RemindMessageTask error..", e);
                }
            }, initialDelay, period, TimeUnit.SECONDS);
        }

        private void load() {
            String key = getRemindThreadMessageQueueKey();
            redisTemplate.delete(key);
            Date now = new Date();
            // Ten test messages, due 10 s, 20 s, ..., 100 s from now
            for (int i = 1; i <= 10; i++) {
                Date date = DateUtils.addSeconds(now, i * 10);
                redisDelayQueue2.enqueue(key, "[" + date.toLocaleString() + "]", date.getTime());
            }
            System.out.println("Data:" + redisTemplate.opsForZSet().range(key, 0, -1));
        }

        @PreDestroy
        public void preDestroy() {
            pool.shutdownNow();
        }

        // Scratch check of the range produced by the random initial delay
        public static void main(String[] args) {
            for (int i = 0; i < 20; i++) {
                long l = Math.round(Math.random() * 10 + 10);
                System.out.println(l);
            }
        }
    }
Start the project and the console outputs:
    initialDelay:10,period: 7
    [2021-02-23 14:54:08] [INFO] [main] RemindMessageConsumer-RemindMessageTask run...
    Data:[[2021-2-23 14:54:17], [2021-2-23 14:54:27], [2021-2-23 14:54:37], [2021-2-23 14:54:47], [2021-2-23 14:54:57], [2021-2-23 14:55:07], [2021-2-23 14:55:17], [2021-2-23 14:55:27], [2021-2-23 14:55:37], [2021-2-23 14:55:47]]
    [2021-02-23 14:54:18] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:54:17]]
    [2021-02-23 14:54:32] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:54:27]]
    [2021-02-23 14:54:39] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:54:37]]
    [2021-02-23 14:54:53] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:54:47]]
    [2021-02-23 14:55:00] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:54:57]]
    [2021-02-23 14:55:14] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:55:07]]
    [2021-02-23 14:55:21] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:55:17]]
    [2021-02-23 14:55:28] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:55:27]]
    [2021-02-23 14:55:42] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:55:37]]
    [2021-02-23 14:55:49] [ERROR] [RemindMessageTask-thread-1] RemindMessageConsumer----->Send Message: [[2021-2-23 14:55:47]]
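You can see that messages are still sent close to the specified time. The delay is now bounded by the randomized polling period (7 seconds in this run) rather than by a fixed one-second period.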
Message loss problem
The current scheme can still lose messages. For example, if the system goes down after pulling messages from Redis (at which point they have already been deleted from Redis), those messages are lost. One option is to add an ACK mechanism, so that a message is only finally removed from Redis after the system has consumed it, and to make message consumption idempotent.
In the business scenario of my project, losing some messages is acceptable, so this scheme is sufficient for now.
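For scenarios where loss is not acceptable, the ACK idea can be sketched roughly as follows. This is one possible design under stated assumptions, not something the project implements: `AckDelayQueueSketch` is a made-up in-memory class, and the "in-flight" map with an acknowledgement deadline stands in for a second Redis structure that a real implementation would need. A message that is polled but never acknowledged becomes deliverable again once its deadline passes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory sketch of a delay queue with acknowledgements (illustration only). */
final class AckDelayQueueSketch {

    private final TreeMap<Long, List<String>> pending = new TreeMap<>(); // due time -> messages
    private final Map<String, Long> inFlight = new HashMap<>();          // message -> ack deadline
    private final long ackTimeoutMillis;

    AckDelayQueueSketch(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    synchronized void enqueue(String message, long dueAtMillis) {
        pending.computeIfAbsent(dueAtMillis, k -> new ArrayList<>()).add(message);
    }

    /** Hands out due messages, but remembers each one until it is acknowledged. */
    synchronized List<String> poll(long nowMillis) {
        requeueExpired(nowMillis);
        List<String> due = new ArrayList<>();
        Map<Long, List<String>> head = pending.headMap(nowMillis, true); // due time <= now
        for (List<String> messages : head.values()) {
            due.addAll(messages);
        }
        head.clear();
        for (String message : due) {
            inFlight.put(message, nowMillis + ackTimeoutMillis);
        }
        return due;
    }

    /** Called by the consumer after the message has been processed successfully. */
    synchronized void ack(String message) {
        inFlight.remove(message);
    }

    /** Puts unacknowledged messages whose deadline has passed back into the pending queue. */
    private void requeueExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                String message = entry.getKey();
                it.remove();
                enqueue(message, nowMillis); // due immediately
            }
        }
    }

    public static void main(String[] args) {
        AckDelayQueueSketch queue = new AckDelayQueueSketch(1_000L);
        queue.enqueue("m1", 100L);
        queue.enqueue("m2", 100L);
        System.out.println(queue.poll(100L));   // prints [m1, m2]
        queue.ack("m1");                        // m2 is never acknowledged (consumer "crashed")
        System.out.println(queue.poll(500L));   // prints []
        System.out.println(queue.poll(1_100L)); // prints [m2]
    }
}
```

Because an unacknowledged message is delivered again, a consumer may see the same message twice, which is why the idempotency requirement goes hand in hand with the ACK mechanism.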