11. Nacos configuration service server source code analysis

In the previous article, we introduced the plug-in function of the configuration service, which can be extended very well through plug-ins. This article continues the content of the ConfigChangePublisher.notifyConfigChange part that was not analyzed in the previous article.

public static void notifyConfigChange(ConfigDataChangeEvent event) {
    if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
        // Internal storage and non-singleton will not be processed
        return;
    }
    NotifyCenter.publishEvent(event);
}

The above code encounters the NotifyCenter that was analyzed before. For those who have not seen it, you can click here to view it. Now that you have reached the notification center, you can directly find the onEvent method of ConfigDataChangeEvent to view the processing logic.

@Autowired
// spring automatic injection
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    // Register a handler class
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(Event event) {
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                // record monitor
                MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);
                // Get all service nodes
                Collection<Member> ipList = memberManager.allMembers();
                Queue<NotifySingleTask> httpQueue = new LinkedList<>();
                Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();
                for (Member member : ipList) {
                    if (!MemberUtil.isSupportedLongCon(member)) {
                        httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                                                           evt.isBeta));
                    } else {
                        rpcQueue.add(
                            new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                    }
                }
                if (!httpQueue.isEmpty()) {
                    ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
                }
                if (!rpcQueue.isEmpty()) {
                    // Focus on Rpc processing
                    ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                }

            }
        }
        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

The ConfigDataChangeEvent listener processor is registered in the AsyncNotifyService construction method. And AsyncNotifyService belongs to spring for hosting, and this bean will be created when the container is created. Then create a queue, store other service nodes related to the configuration, and then make asynchronous notification calls through the thread pool.

ServerMemberManager

Let's first analyze the ServerMemberManager passed in AsyncNotifyService. It is the management of multi-service nodes in the cluster. Including obtaining all service node lists, whether a certain service node list is included, whether it is a local node, etc. The following is a brief analysis of how to get all the nodes.

public ServerMemberManager(ServletContext servletContext) throws Exception {
    // First create a HashMap that supports concurrency and skip tables
    this.serverList = new ConcurrentSkipListMap<>();
    EnvUtil.setContextPath(servletContext.getContextPath());
    // initialization
    init();
}

protected void init() throws NacosException {
    Loggers.CORE.info("Nacos-related cluster resource initialization");
    // Handle local ip
    this.port = EnvUtil.getProperty(SERVER_PORT_PROPERTY, Integer.class, DEFAULT_SERVER_PORT);
    this.localAddress = InetUtils.getSelfIP() + ":" + port;
    this.self = MemberUtil.singleParse(this.localAddress);
    this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
    // init abilities.
    this.self.setAbilities(initMemberAbilities());
    // add local address
    serverList.put(self.getAddress(), self);
    // Register NodeChangeEvent publisher to NotifyManager
    registerClusterEvent();
    // Initialize some listeners and look for other configured service nodes
    initAndStartLookup();
    if (serverList.isEmpty()) {
        throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
    }
    Loggers.CORE.info("The cluster resource is initialized");
}

ConcurrentSkipListMap is created in the constructor of ServerMemberManager, and ConcurrentSkipListMap can support more data and higher concurrency. In the subsequent initialization process, the local machine address is first added to the service list, and then the cluster event listener is registered and some configurations are initialized.

registerClusterEvent

private void registerClusterEvent() {
    // Register node change notification event
    NotifyCenter.registerToPublisher(MembersChangeEvent.class,
                                     EnvUtil.getProperty(MEMBER_CHANGE_EVENT_QUEUE_SIZE_PROPERTY, Integer.class,
                                                         DEFAULT_MEMBER_CHANGE_EVENT_QUEUE_SIZE));
    // Register an ip change listener, which can dynamically update the local ip
    NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
        @Override
        public void onEvent(InetUtils.IPChangeEvent event) {
            String newAddress = event.getNewIP() + ":" + port;
            ServerMemberManager.this.localAddress = newAddress;
            EnvUtil.setLocalAddress(localAddress);
            Member self = ServerMemberManager.this.self;
            self.setIp(event.getNewIP());
            String oldAddress = event.getOldIP() + ":" + port;
            ServerMemberManager.this.serverList.remove(oldAddress);
            ServerMemberManager.this.serverList.put(newAddress, self);
            ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
            ServerMemberManager.this.memberAddressInfos.add(newAddress);
        }
        @Override
        public Class<? extends Event> subscribeType() {
            return InetUtils.IPChangeEvent.class;
        }
    });
}

registerClusterEvent mainly registers an event publisher and handles the change of ip. Can support dynamic update ip.

initAndStartLookup

private void initAndStartLookup() throws NacosException {
    // Get MemberLookup
    this.lookup = LookupFactory.createLookUp(this);
    isUseAddressServer = this.lookup.useAddressServer();
    // Monitor files
    this.lookup.start();
}

A lookup function is created during initialization. The address is then monitored for updates. Here we take the frequently used cluster.file as an example to analyze.

public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {
    if (!EnvUtil.getStandaloneMode()) {
        // In the case of non-standalone, choose a search method
        String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);
        LookupType type = chooseLookup(lookupType);
        LOOK_UP = find(type);
        currentLookupType = type;
    } else {
        LOOK_UP = new StandaloneMemberLookup();
    }
    LOOK_UP.injectMemberManager(memberManager);
    Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());
    return LOOK_UP;
}

private static LookupType chooseLookup(String lookupType) {
    if (StringUtils.isNotBlank(lookupType)) {
        // Configured directly with the configuration
        LookupType type = LookupType.sourceOf(lookupType);
        if (Objects.nonNull(type)) {
            return type;
        }
    }
    // First judge whether cluster.conf exists, and use the file mode if it exists
    File file = new File(EnvUtil.getClusterConfFilePath());
    if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) {
        return LookupType.FILE_CONFIG;
    }
    // For non-files, use the configuration address of the configuration file application.properties
    return LookupType.ADDRESS_SERVER;
}

In the above code, Nacos will look for the configured cluster address for the cluster mode. There are two ways, one is to use the cluster.conf file mode configuration, and the other is to configure nacos.member.list in application.properties. There is a priority to use here. If a certain method is configured, the configuration shall prevail. Otherwise, the cluster.conf file shall be used first, and application.properties shall be used if none of the above is used.

Continue to analyze this.lookup.start()

@Override
public void start() throws NacosException {
    if (start.compareAndSet(false, true)) {
        // Start if not started
        doStart();
    }
}
 @Override
public void doStart() throws NacosException {
    // read data from disk
    readClusterConfFromDisk();
    // Use the inotify mechanism to monitor file changes and automatically
    // trigger the reading of cluster.conf
    try {
        // Monitor file changes and automatically modify service node members in the cluster
        WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
    } catch (Throwable e) {
        Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
    }
}

in the start method. It just reads the contents of the cluster.conf file, but it also monitors the changes of the file. And how is the file change monitoring done?

public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException {
    checkState();
    if (NOW_WATCH_JOB_CNT == MAX_WATCH_FILE_JOB) {
        return false;
    }
    WatchDirJob job = MANAGER.get(paths);
    if (job == null) {
        job = new WatchDirJob(paths);
        // job thread asynchronous processing
        job.start();
        MANAGER.put(paths, job);
        NOW_WATCH_JOB_CNT++;
    }
    job.addSubscribe(watcher);
    return true;
}

public void run() {
    // Continuous monitoring
    while (watch && !this.isInterrupted()) {
        try {
            final WatchKey watchKey = watchService.take();
            final List<WatchEvent<?>> events = watchKey.pollEvents();
            watchKey.reset();
            if (callBackExecutor.isShutdown()) {
                return;
            }
            if (events.isEmpty()) {
                continue;
            }
            // File monitoring thread pool for processing
            callBackExecutor.execute(() -> {
                for (WatchEvent<?> event : events) {
                    WatchEvent.Kind<?> kind = event.kind();
                    // Since the OS's event cache may be overflow, a backstop is needed
                    if (StandardWatchEventKinds.OVERFLOW.equals(kind)) {
                        eventOverflow();
                    } else {
                        // Handle monitoring file time logic
                        eventProcess(event.context());
                    }
                }
            });
        } catch (InterruptedException | ClosedWatchServiceException ignore) {
            Thread.interrupted();
        } catch (Throwable ex) {
            LOGGER.error("An exception occurred during file listening : ", ex);
        }
    }
}

private void eventProcess(Object context) {
    // Construct file change event
    final FileChangeEvent fileChangeEvent = FileChangeEvent.builder().paths(paths).context(context).build();
    final String str = String.valueOf(context);
    for (final FileWatcher watcher : watchers) {
        if (watcher.interest(str)) {
            // Construct trigger event
            Runnable job = () -> watcher.onChange(fileChangeEvent);
            Executor executor = watcher.executor();
            if (executor == null) {
                try {
                    // direct execution
                    job.run();
                } catch (Throwable ex) {
                    LOGGER.error("File change event callback error : ", ex);
                }
            } else {
                executor.execute(job);
            }
        }
    }
}

The monitoring of this file is that there is a thread in the background, which constantly obtains the latest status. If the file is changed, it will notify other subscribers who are interested in this event as soon as possible.

AsyncRpcTask

After obtaining the service list, call the asynchronous task AsyncRpcTask through the thread pool.

class AsyncRpcTask implements Runnable {
    private Queue<NotifySingleRpcTask> queue;
    public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
        // The construction method is put into the queue of rpcTask
        this.queue = queue;
    }
    @Override
    public void run() {
        while (!queue.isEmpty()) {
            NotifySingleRpcTask task = queue.poll();
            // Construct configuration change request
            ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
            syncRequest.setDataId(task.getDataId());
            syncRequest.setGroup(task.getGroup());
            syncRequest.setBeta(task.isBeta);
            syncRequest.setLastModified(task.getLastModified());
            syncRequest.setTag(task.tag);
            syncRequest.setTenant(task.getTenant());
            Member member = task.member;,
            // If it is the current node, directly call dumpService to perform dump operation
            if (memberManager.getSelf().equals(member)) {
                if (syncRequest.isBeta()) {
                    dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                     syncRequest.getLastModified(), NetUtils.localIP(), true);
                } else {
                    dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
                                     syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
                }
                continue;
            }

            if (memberManager.hasMember(member.getAddress())) {
                // Start the health check, if there is an IP that is not monitored, it will be directly put into the notification queue, otherwise it will be notified
                boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
                if (unHealthNeedDelay) {
                    // Destination IP health is unhealthy, then put it in the notification list
                    ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                                                      task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                                                      0, member.getAddress());
                    // Get delay time and set failure count to task
                    asyncTaskExecute(task);
                } else {
                    // Determine whether to support long connection, use rpc for long connection, use http for non-long connection
                    if (!MemberUtil.isSupportedLongCon(member)) {
                        // Execute http requests asynchronously
                        asyncTaskExecute(
                            new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
                                                 task.getLastModified(), member.getAddress(), task.isBeta));
                    } else {
                        try {
                            // rpc request
                            configClusterRpcClientProxy
                                .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                        } catch (Exception e) {
                            MetricsMonitor.getConfigNotifyException().increment();
                            asyncTaskExecute(task);
                        }
                    }

                }
            } else {
                //No nothig if  member has offline.
            }

        }
    }
}

AsyncRpcTask mainly performs two important logics:

  1. The machine directly executes the dump operation
  2. Other service nodes judge whether to support long connection, non-long connection, that is, http mode, send http request asynchronously, and send grpc request for long connection

Summarize

This article mainly introduces the main logic of AsyncNotifyService and how to obtain service nodes in the cluster. However, AsyncRpcTask mainly performs two important logic dump s and message synchronization between service nodes. This will be analyzed in the next article.

Tags: Java Back-end Distribution Middleware

Posted by Phasma Felis on Sat, 01 Apr 2023 04:27:09 +1030