In the previous article, we introduced the plug-in function of the configuration service, which can be extended very well through plug-ins. This article continues the content of the ConfigChangePublisher.notifyConfigChange part that was not analyzed in the previous article.
public static void notifyConfigChange(ConfigDataChangeEvent event) { if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) { // Internal storage and non-singleton will not be processed return; } NotifyCenter.publishEvent(event); }
The above code encounters the NotifyCenter that was analyzed before. For those who have not seen it, you can click here to view it. Now that you have reached the notification center, you can directly find the onEvent method of ConfigDataChangeEvent to view the processing logic.
@Autowired // spring automatic injection public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; // Register ConfigDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register a handler class NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; // record monitor MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId); // Get all service nodes Collection<Member> ipList = memberManager.allMembers(); Queue<NotifySingleTask> httpQueue = new LinkedList<>(); Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>(); for (Member member : ipList) { if (!MemberUtil.isSupportedLongCon(member)) { httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); } else { rpcQueue.add( new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member)); } } if (!httpQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); } if (!rpcQueue.isEmpty()) { // Focus on Rpc processing ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); } } } @Override public Class<? extends Event> subscribeType() { return ConfigDataChangeEvent.class; } }); }
The ConfigDataChangeEvent listener processor is registered in the AsyncNotifyService construction method. And AsyncNotifyService belongs to spring for hosting, and this bean will be created when the container is created. Then create a queue, store other service nodes related to the configuration, and then make asynchronous notification calls through the thread pool.
ServerMemberManager
Let's first analyze the ServerMemberManager passed in AsyncNotifyService. It is the management of multi-service nodes in the cluster. Including obtaining all service node lists, whether a certain service node list is included, whether it is a local node, etc. The following is a brief analysis of how to get all the nodes.
public ServerMemberManager(ServletContext servletContext) throws Exception { // First create a HashMap that supports concurrency and skip tables this.serverList = new ConcurrentSkipListMap<>(); EnvUtil.setContextPath(servletContext.getContextPath()); // initialization init(); } protected void init() throws NacosException { Loggers.CORE.info("Nacos-related cluster resource initialization"); // Handle local ip this.port = EnvUtil.getProperty(SERVER_PORT_PROPERTY, Integer.class, DEFAULT_SERVER_PORT); this.localAddress = InetUtils.getSelfIP() + ":" + port; this.self = MemberUtil.singleParse(this.localAddress); this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version); // init abilities. this.self.setAbilities(initMemberAbilities()); // add local address serverList.put(self.getAddress(), self); // Register NodeChangeEvent publisher to NotifyManager registerClusterEvent(); // Initialize some listeners and look for other configured service nodes initAndStartLookup(); if (serverList.isEmpty()) { throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit."); } Loggers.CORE.info("The cluster resource is initialized"); }
ConcurrentSkipListMap is created in the constructor of ServerMemberManager, and ConcurrentSkipListMap can support more data and higher concurrency. In the subsequent initialization process, the local machine address is first added to the service list, and then the cluster event listener is registered and some configurations are initialized.
registerClusterEvent
private void registerClusterEvent() { // Register node change notification event NotifyCenter.registerToPublisher(MembersChangeEvent.class, EnvUtil.getProperty(MEMBER_CHANGE_EVENT_QUEUE_SIZE_PROPERTY, Integer.class, DEFAULT_MEMBER_CHANGE_EVENT_QUEUE_SIZE)); // Register an ip change listener, which can dynamically update the local ip NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() { @Override public void onEvent(InetUtils.IPChangeEvent event) { String newAddress = event.getNewIP() + ":" + port; ServerMemberManager.this.localAddress = newAddress; EnvUtil.setLocalAddress(localAddress); Member self = ServerMemberManager.this.self; self.setIp(event.getNewIP()); String oldAddress = event.getOldIP() + ":" + port; ServerMemberManager.this.serverList.remove(oldAddress); ServerMemberManager.this.serverList.put(newAddress, self); ServerMemberManager.this.memberAddressInfos.remove(oldAddress); ServerMemberManager.this.memberAddressInfos.add(newAddress); } @Override public Class<? extends Event> subscribeType() { return InetUtils.IPChangeEvent.class; } }); }
registerClusterEvent mainly registers an event publisher and handles the change of ip. Can support dynamic update ip.
initAndStartLookup
private void initAndStartLookup() throws NacosException { // Get MemberLookup this.lookup = LookupFactory.createLookUp(this); isUseAddressServer = this.lookup.useAddressServer(); // Monitor files this.lookup.start(); }
A lookup function is created during initialization. The address is then monitored for updates. Here we take the frequently used cluster.file as an example to analyze.
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException { if (!EnvUtil.getStandaloneMode()) { // In the case of non-standalone, choose a search method String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE); LookupType type = chooseLookup(lookupType); LOOK_UP = find(type); currentLookupType = type; } else { LOOK_UP = new StandaloneMemberLookup(); } LOOK_UP.injectMemberManager(memberManager); Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName()); return LOOK_UP; } private static LookupType chooseLookup(String lookupType) { if (StringUtils.isNotBlank(lookupType)) { // Configured directly with the configuration LookupType type = LookupType.sourceOf(lookupType); if (Objects.nonNull(type)) { return type; } } // First judge whether cluster.conf exists, and use the file mode if it exists File file = new File(EnvUtil.getClusterConfFilePath()); if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) { return LookupType.FILE_CONFIG; } // For non-files, use the configuration address of the configuration file application.properties return LookupType.ADDRESS_SERVER; }
In the above code, Nacos will look for the configured cluster address for the cluster mode. There are two ways, one is to use the cluster.conf file mode configuration, and the other is to configure nacos.member.list in application.properties. There is a priority to use here. If a certain method is configured, the configuration shall prevail. Otherwise, the cluster.conf file shall be used first, and application.properties shall be used if none of the above is used.
Continue to analyze this.lookup.start()
@Override public void start() throws NacosException { if (start.compareAndSet(false, true)) { // Start if not started doStart(); } } @Override public void doStart() throws NacosException { // read data from disk readClusterConfFromDisk(); // Use the inotify mechanism to monitor file changes and automatically // trigger the reading of cluster.conf try { // Monitor file changes and automatically modify service node members in the cluster WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher); } catch (Throwable e) { Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage()); } }
in the start method. It just reads the contents of the cluster.conf file, but it also monitors the changes of the file. And how is the file change monitoring done?
public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException { checkState(); if (NOW_WATCH_JOB_CNT == MAX_WATCH_FILE_JOB) { return false; } WatchDirJob job = MANAGER.get(paths); if (job == null) { job = new WatchDirJob(paths); // job thread asynchronous processing job.start(); MANAGER.put(paths, job); NOW_WATCH_JOB_CNT++; } job.addSubscribe(watcher); return true; } public void run() { // Continuous monitoring while (watch && !this.isInterrupted()) { try { final WatchKey watchKey = watchService.take(); final List<WatchEvent<?>> events = watchKey.pollEvents(); watchKey.reset(); if (callBackExecutor.isShutdown()) { return; } if (events.isEmpty()) { continue; } // File monitoring thread pool for processing callBackExecutor.execute(() -> { for (WatchEvent<?> event : events) { WatchEvent.Kind<?> kind = event.kind(); // Since the OS's event cache may be overflow, a backstop is needed if (StandardWatchEventKinds.OVERFLOW.equals(kind)) { eventOverflow(); } else { // Handle monitoring file time logic eventProcess(event.context()); } } }); } catch (InterruptedException | ClosedWatchServiceException ignore) { Thread.interrupted(); } catch (Throwable ex) { LOGGER.error("An exception occurred during file listening : ", ex); } } } private void eventProcess(Object context) { // Construct file change event final FileChangeEvent fileChangeEvent = FileChangeEvent.builder().paths(paths).context(context).build(); final String str = String.valueOf(context); for (final FileWatcher watcher : watchers) { if (watcher.interest(str)) { // Construct trigger event Runnable job = () -> watcher.onChange(fileChangeEvent); Executor executor = watcher.executor(); if (executor == null) { try { // direct execution job.run(); } catch (Throwable ex) { LOGGER.error("File change event callback error : ", ex); } } else { executor.execute(job); } } } }
The monitoring of this file is that there is a thread in the background, which constantly obtains the latest status. If the file is changed, it will notify other subscribers who are interested in this event as soon as possible.
AsyncRpcTask
After obtaining the service list, call the asynchronous task AsyncRpcTask through the thread pool.
class AsyncRpcTask implements Runnable { private Queue<NotifySingleRpcTask> queue; public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) { // The construction method is put into the queue of rpcTask this.queue = queue; } @Override public void run() { while (!queue.isEmpty()) { NotifySingleRpcTask task = queue.poll(); // Construct configuration change request ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest(); syncRequest.setDataId(task.getDataId()); syncRequest.setGroup(task.getGroup()); syncRequest.setBeta(task.isBeta); syncRequest.setLastModified(task.getLastModified()); syncRequest.setTag(task.tag); syncRequest.setTenant(task.getTenant()); Member member = task.member;, // If it is the current node, directly call dumpService to perform dump operation if (memberManager.getSelf().equals(member)) { if (syncRequest.isBeta()) { dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getLastModified(), NetUtils.localIP(), true); } else { dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP()); } continue; } if (memberManager.hasMember(member.getAddress())) { // Start the health check, if there is an IP that is not monitored, it will be directly put into the notification queue, otherwise it will be notified boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress()); if (unHealthNeedDelay) { // Destination IP health is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, member.getAddress()); // Get delay time and set failure count to task asyncTaskExecute(task); } else { // Determine whether to support long connection, use rpc for long connection, use http for non-long connection if (!MemberUtil.isSupportedLongCon(member)) { // Execute http requests asynchronously asyncTaskExecute( new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag, task.getLastModified(), member.getAddress(), task.isBeta)); } else { try { // rpc request configClusterRpcClientProxy .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task)); } catch (Exception e) { MetricsMonitor.getConfigNotifyException().increment(); asyncTaskExecute(task); } } } } else { //No nothig if member has offline. } } } }
AsyncRpcTask mainly performs two important logics:
- The machine directly executes the dump operation
- Other service nodes judge whether to support long connection, non-long connection, that is, http mode, send http request asynchronously, and send grpc request for long connection
Summarize
This article mainly introduces the main logic of AsyncNotifyService and how to obtain service nodes in the cluster. However, AsyncRpcTask mainly performs two important logic dump s and message synchronization between service nodes. This will be analyzed in the next article.