Do you know the low-water mark pattern in distributed system design?

Low water mark

The low-water mark is a position in the WAL (Write-Ahead Log): every log entry before that position can safely be discarded.

Problem background

The WAL (Write-Ahead Log) records every update to the storage, so the log keeps growing over time. The Segmented Log pattern lets us work with one smaller file at a time, but if old segments are never cleaned up, the total log size still grows without bound until the disk is full.


The low-water mark pattern tells the system which part of the log can be deleted: every log entry before the low-water mark can be cleared. Typically a background thread runs a scheduled task that repeatedly checks which part of the log can be cleaned up and deletes the corresponding log files.

this.logCleaner = newLogCleaner(config);

The LogCleaner here can be implemented with scheduled tasks:

public void startup() {
    scheduleLogCleaning();
}

private void scheduleLogCleaning() {
    singleThreadedExecutor.schedule(() -> {
        // ... delete the log segments that lie before the low-water mark ...
    }, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS);
}
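
As a self-contained illustration of the scheduling idea, here is a minimal sketch. It is not the code from any particular system: PeriodicLogCleaner, the lowWaterMark supplier and the deleteBefore callback are hypothetical names, and it uses scheduleWithFixedDelay rather than the one-shot schedule call shown above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical cleaner: asks where the low-water mark is, then deletes below it.
class PeriodicLogCleaner {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final LongSupplier lowWaterMark;  // reports the current low-water mark
    private final LongConsumer deleteBefore;  // deletes log data below a given index

    PeriodicLogCleaner(LongSupplier lowWaterMark, LongConsumer deleteBefore) {
        this.lowWaterMark = lowWaterMark;
        this.deleteBefore = deleteBefore;
    }

    // One cleaning pass. Exceptions are caught here because an exception that
    // escapes a periodic task suppresses all of its later executions.
    void cleanOnce() {
        try {
            deleteBefore.accept(lowWaterMark.getAsLong());
        } catch (RuntimeException e) {
            System.err.println("Log cleaning failed, will retry on the next pass: " + e);
        }
    }

    // Runs cleanOnce() repeatedly, waiting intervalMs after each pass finishes.
    void startup(long intervalMs) {
        executor.scheduleWithFixedDelay(this::cleanOnce, intervalMs, intervalMs,
                TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Keeping the single pass in its own method means it can be called directly, for example in a unit test, without waiting for the timer.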

Snapshot-based low-water mark: implementation and examples

Most consensus-based systems, such as ZooKeeper (which uses ZAB, a Paxos-like protocol) and etcd (which uses Raft), implement a snapshot mechanism. Their storage engine periodically takes a full snapshot, records the log position that the snapshot corresponds to, and uses that position as the low-water mark.

// Take a snapshot
public SnapShot takeSnapshot() {
    // Get the id of the most recent log entry
    Long snapShotTakenAtLogIndex = wal.getLastLogEntryId();
    // Store that id in the snapshot: it is the log position the snapshot corresponds to
    return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex);
}
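
To see why the recorded index makes older entries safe to drop, here is a minimal sketch of recovery from a snapshot. All class names are hypothetical and the state is a plain in-memory map; it only illustrates that entries at or below the snapshot index are never needed again.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical log entry: a monotonically increasing index plus one key/value update.
class LogEntrySketch {
    final long index;
    final String key;
    final String value;

    LogEntrySketch(long index, String key, String value) {
        this.index = index;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical snapshot: a copy of the state and the index of the last entry it includes.
class KvSnapshotSketch {
    final Map<String, String> state;
    final long takenAtLogIndex;

    KvSnapshotSketch(Map<String, String> state, long takenAtLogIndex) {
        this.state = new HashMap<>(state);
        this.takenAtLogIndex = takenAtLogIndex;
    }
}

class SnapshotRecoverySketch {
    // Rebuilds the state from the snapshot plus only the entries written after it.
    // Entries with index <= takenAtLogIndex are skipped, so they can be deleted.
    static Map<String, String> recover(KvSnapshotSketch snapshot, List<LogEntrySketch> log) {
        Map<String, String> kv = new HashMap<>(snapshot.state);
        for (LogEntrySketch entry : log) {
            if (entry.index > snapshot.takenAtLogIndex) {
                kv.put(entry.key, entry.value);
            }
        }
        return kv;
    }
}
```

Recovering from the full log and recovering from a log truncated at the snapshot index give the same state, which is exactly the property the low-water mark relies on.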

Once a snapshot has been generated and successfully stored on disk, its low-water mark is used to clean up the older log segments:

// Collect all log segments that lie entirely before the given snapshot position
List<WALSegment> getSegmentsBefore(Long snapshotIndex) {
    List<WALSegment> markedForDeletion = new ArrayList<>();
    List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;
    for (WALSegment sortedSavedSegment : sortedSavedSegments) {
        // If the last log id in this segment is less than the snapshot position, the segment can be cleaned up
        if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) {
            markedForDeletion.add(sortedSavedSegment);
        }
    }
    return markedForDeletion;
}
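
A small worked example may make the comparison concrete. The sketch below uses a hypothetical SegmentRangeSketch descriptor instead of WALSegment and applies the same rule: a segment is deletable only when its last entry is strictly below the snapshot index, so a segment that straddles the mark is kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical segment descriptor holding the ids of its first and last entries.
class SegmentRangeSketch {
    final long firstEntryId;
    final long lastEntryId;

    SegmentRangeSketch(long firstEntryId, long lastEntryId) {
        this.firstEntryId = firstEntryId;
        this.lastEntryId = lastEntryId;
    }

    @Override
    public String toString() {
        return "[" + firstEntryId + ".." + lastEntryId + "]";
    }
}

class SegmentSelectionSketch {
    // A segment is deletable only when its last entry is strictly below the snapshot index.
    static List<SegmentRangeSketch> segmentsBefore(List<SegmentRangeSketch> sorted,
                                                   long snapshotIndex) {
        List<SegmentRangeSketch> marked = new ArrayList<>();
        for (SegmentRangeSketch segment : sorted) {
            if (segment.lastEntryId < snapshotIndex) {
                marked.add(segment);
            }
        }
        return marked;
    }

    public static void main(String[] args) {
        List<SegmentRangeSketch> segments = Arrays.asList(
                new SegmentRangeSketch(0, 9),
                new SegmentRangeSketch(10, 19),
                new SegmentRangeSketch(20, 29));
        // Snapshot taken at index 25: the third segment straddles the mark and is kept.
        System.out.println(segmentsBefore(segments, 25)); // prints [[0..9], [10..19]]
    }
}
```

Because the comparison is strict, a segment whose last entry equals the snapshot index is also kept. That is slightly conservative, but it never deletes an entry the snapshot does not cover.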

Implementation of the low-water mark in ZooKeeper

The scheduled task is located in the start method of DatadirCleanupManager:

public void start() {
    // Start only once
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // Do not schedule the purge task if the interval is zero or negative
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }
    // Start the scheduled task
    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
    purgeTaskStatus = PurgeTaskStatus.STARTED;
}

The core method is the purge method of PurgeTxnLog:

public static void purge(File dataDir, File snapDir, int num) throws IOException {
    // At least 3 snapshots must be retained
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
    // Find the num most recent valid snapshots
    List<File> snaps = txnLog.findNValidSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        // Use the zxid of the oldest retained snapshot as the low-water mark and purge older log and snapshot files
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}

static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {
    // The file name ends with the zxid, which represents the log position
    final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT);
    // Log files that are still needed to recover the retained snapshots must be kept
    final Set<File> retainedTxnLogs = new HashSet<File>();
    retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));
    class MyFileFilter implements FileFilter {

        private final String prefix;
        MyFileFilter(String prefix) {
            this.prefix = prefix;
        }
        public boolean accept(File f) {
            if (!f.getName().startsWith(prefix + ".")) {
                return false;
            }
            if (retainedTxnLogs.contains(f)) {
                return false;
            }
            long fZxid = Util.getZxidFromName(f.getName(), prefix);
            // Select the files to delete according to the zxid encoded in the file name
            return fZxid < leastZxidToBeRetain;
        }
    }

    // Collect the log files and snapshot files that match the filter
    File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));
    List<File> files = new ArrayList<>();
    if (logs != null) {
        files.addAll(Arrays.asList(logs));
    }
    File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));
    if (snapshots != null) {
        files.addAll(Arrays.asList(snapshots));
    }
    // Delete the old files
    for (File f : files) {
        final String msg = String.format(
            "Removing file: %s\t%s",
            DateFormat.getDateTimeInstance().format(f.lastModified()),
            f.getPath());
        LOG.info(msg);
        System.out.println(msg);
        if (!f.delete()) {
            System.err.println("Failed to remove " + f.getPath());
        }
    }
}
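
The name-based filtering can be tried out without a real data directory. The following is a simplified sketch over plain file names, not ZooKeeper's own code: ZxidPurgeSketch and its methods are hypothetical, file names are assumed to look like log.<hex zxid> and snapshot.<hex zxid>, and the rule for keeping one extra log file reflects my understanding that the newest log starting at or before the retained snapshot may still contain newer transactions.

```java
import java.util.ArrayList;
import java.util.List;

class ZxidPurgeSketch {
    // Parses the hexadecimal zxid from names such as "log.64" or "snapshot.96".
    // Returns -1 when the name does not carry the expected prefix or a valid zxid.
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length >= 2 && parts[0].equals(prefix)) {
            try {
                return Long.parseLong(parts[1], 16);
            } catch (NumberFormatException e) {
                return -1;
            }
        }
        return -1;
    }

    // Snapshots older than the retained one can be deleted.
    static List<String> purgeableSnapshots(List<String> names, long leastZxidToRetain) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            long zxid = zxidFromName(name, "snapshot");
            if (zxid >= 0 && zxid < leastZxidToRetain) {
                result.add(name);
            }
        }
        return result;
    }

    // Logs older than the retained snapshot can be deleted, except the newest log
    // that starts at or before it, which may hold transactions newer than the snapshot.
    static List<String> purgeableLogs(List<String> names, long leastZxidToRetain) {
        long newestStartAtOrBefore = -1;
        for (String name : names) {
            long zxid = zxidFromName(name, "log");
            if (zxid <= leastZxidToRetain && zxid > newestStartAtOrBefore) {
                newestStartAtOrBefore = zxid;
            }
        }
        List<String> result = new ArrayList<>();
        for (String name : names) {
            long zxid = zxidFromName(name, "log");
            if (zxid >= 0 && zxid < leastZxidToRetain && zxid != newestStartAtOrBefore) {
                result.add(name);
            }
        }
        return result;
    }
}
```

For example, with logs log.1, log.64 and log.c8 and a retained snapshot.96 (zxid 150), only log.1 is purgeable: log.64 starts at zxid 100 and may run past 150, and log.c8 starts after the snapshot.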


So when is a snapshot taken? Look at the run method of SyncRequestProcessor. This method processes requests, appends each request to the transaction log file, and takes a snapshot when necessary:

public void run() {
    try {
        // Randomize the thresholds so that not all servers take snapshots at the same time
        resetSnapshotStats();
        lastFlushTime = Time.currentElapsedTime();
        while (true) {
            // (code that fetches the next request, si, is omitted)
            // The request was appended to the transaction log successfully
            if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                // Check whether a snapshot is needed
                if (shouldSnapshot()) {
                    // Reset the statistics used to decide when the next snapshot is due
                    resetSnapshotStats();
                    // Roll over to a new log file
                    zks.getZKDatabase().rollLog();
                    // Acquire the mutex first so that only one snapshot is in progress at a time
                    if (!snapThreadMutex.tryAcquire()) {
                        LOG.warn("Too busy to snap, skipping");
                    } else {
                        // Take the snapshot asynchronously
                        new ZooKeeperThread("Snapshot Thread") {
                            public void run() {
                                try {
                                    zks.takeSnapshot();
                                } catch (Exception e) {
                                    LOG.warn("Unexpected exception", e);
                                } finally {
                                    // Release the mutex
                                    snapThreadMutex.release();
                                }
                            }
                        }.start();
                    }
                }
            }
            // (remaining request handling is omitted)
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    }
}

resetSnapshotStats() picks random offsets so that the instances in a cluster do not all take snapshots at the same time:

private void resetSnapshotStats() {
    // Random offset in [0, snapCount / 2); snapCount defaults to 100000
    randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
    // Random offset in [0, snapSizeInBytes / 2); snapSizeInBytes defaults to 4GB
    randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
}

shouldSnapshot() decides whether a snapshot is needed, based on these random offsets and the configured thresholds:

private boolean shouldSnapshot() {
    // Number of transactions logged since the last snapshot
    int logCount = zks.getZKDatabase().getTxnCount();
    // Size of the transaction log since the last snapshot
    long logSize = zks.getZKDatabase().getTxnSize();
    // Snapshot when the count exceeds snapCount (default 100000) / 2 + randRoll,
    // or when the size exceeds snapSizeInBytes (default 4GB) / 2 + randSize
    return (logCount > (snapCount / 2 + randRoll))
           || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
}
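
To make the arithmetic concrete, here is a small standalone sketch of the count-based half of the check. SnapshotJitterSketch is a hypothetical class, not part of ZooKeeper. With snapCount = 100000 the random offset lies in [0, 50000), so each server's threshold falls somewhere in [50000, 100000).

```java
import java.util.concurrent.ThreadLocalRandom;

class SnapshotJitterSketch {
    // Random offset in [0, snapCount / 2). Requires snapCount >= 2,
    // because nextInt needs a positive bound.
    static int randomRoll(int snapCount) {
        return ThreadLocalRandom.current().nextInt(snapCount / 2);
    }

    // Threshold on the number of logged transactions: always in [snapCount / 2, snapCount).
    static int countThreshold(int snapCount, int randRoll) {
        return snapCount / 2 + randRoll;
    }

    static boolean shouldSnapshot(int logCount, int snapCount, int randRoll) {
        return logCount > countThreshold(snapCount, randRoll);
    }

    public static void main(String[] args) {
        int snapCount = 100_000;
        // Each simulated server draws its own offset, so the thresholds differ.
        for (int server = 1; server <= 3; server++) {
            int roll = randomRoll(snapCount);
            System.out.println("server " + server + " snapshots after more than "
                    + countThreshold(snapCount, roll) + " transactions");
        }
    }
}
```

Because every server draws its own offset after each snapshot, the servers tend to reach their thresholds at different times even if they process the same stream of transactions.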


Time-based low-water mark: implementation and examples

In some systems the log is not needed to rebuild the state of the system, so it can be deleted after a fixed period of time without waiting for any other subsystem to report the lowest log position that is safe to remove. For example, Kafka keeps logs for 7 days by default, and RocketMQ keeps the commit log for 3 days by default.
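
With a time window, the low-water mark is simply a point in time. A minimal sketch, assuming a hypothetical ClosedSegmentSketch descriptor and a list that contains only closed segments (the segment currently being written is assumed to be tracked elsewhere):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical descriptor for a closed (no longer written) log segment.
class ClosedSegmentSketch {
    final String name;
    final Instant lastModified;

    ClosedSegmentSketch(String name, Instant lastModified) {
        this.name = name;
        this.lastModified = lastModified;
    }
}

class TimeRetentionSketch {
    // The low-water mark is now minus the retention window. Every closed
    // segment last modified at or before that instant has expired.
    static List<String> expiredSegments(List<ClosedSegmentSketch> closedSegments,
                                        Duration retention, Instant now) {
        Instant lowWaterMark = now.minus(retention);
        List<String> expired = new ArrayList<>();
        for (ClosedSegmentSketch segment : closedSegments) {
            if (!segment.lastModified.isAfter(lowWaterMark)) {
                expired.add(segment.name);
            }
        }
        return expired;
    }
}
```

Passing the current time in as a parameter, instead of reading the system clock inside the method, keeps the rule deterministic and easy to test.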

Implementation of the low-water mark in RocketMQ

The scheduled cleaning task is defined in the addScheduleTask() method of DefaultMessageStore:

private void addScheduleTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    // Other scheduled tasks are omitted
}

private void cleanFilesPeriodically() {
    // Clean up message store (commit log) files
    this.cleanCommitLogService.run();
    // Clean up consume queue files
    this.cleanConsumeQueueService.run();
}

Here we only look at the cleanup of the message store (commit log) files, which happens in the deleteExpiredFiles method of CleanCommitLogService, an inner class of DefaultMessageStore:

private void deleteExpiredFiles() {
    int deleteCount = 0;
    // File retention time in hours: a file whose last modification is older than this can be cleaned up
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // Several files may be deleted in one pass; this is the pause between deleting two files
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    // A file may still be in use by other threads (for example, for reading messages) and then cannot be deleted right away.
    // The first rejected attempt records a timestamp; once this interval has elapsed since then, the file is deleted forcibly
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    // Whether the configured time of day for deletion has been reached
    boolean timeup = this.isTimeToDelete();
    // Whether disk usage is high enough that space must be freed
    boolean spacefull = this.isSpaceToDelete();
    // Whether deletion was triggered manually
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    // Clean up if any one of the conditions holds
    if (timeup || spacefull || manualDelete) {

        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
        // Convert hours to milliseconds
        fileReservedTime *= 60 * 60 * 1000;
        // Clean up files
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
            // Files were deleted; nothing more to do
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

The files themselves are deleted in the deleteExpiredFileByTime method of MappedFileQueue:

public int deleteExpiredFileByTime(final long expiredTime,
    final int deleteFilesInterval,
    final long intervalForcibly,
    final boolean cleanImmediately) {
    Object[] mfs = this.copyMappedFiles(0);

    if (null == mfs)
        return 0;

    // Exclude the newest file, which is still being written
    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    List<MappedFile> files = new ArrayList<MappedFile>();
    if (null != mfs) {
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            // The file has expired, or it must be cleaned up immediately
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                // Close, clean up and delete the file
                if (mappedFile.destroy(intervalForcibly)) {
                    files.add(mappedFile);
                    deleteCount++;

                    if (files.size() >= DELETE_FILES_BATCH_MAX) {
                        break;
                    }

                    // If a deletion interval is configured, wait before deleting the next file
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }
    }

    // Remove the deleted files from the file list
    deleteExpiredFile(files);

    return deleteCount;
}
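
Two details of this loop are easy to miss: it stops at the first file that cannot be deleted, so no gap is ever left in the middle of the log, and it limits how many files are removed per pass. The sketch below isolates just that selection logic. PrefixDeletionSketch is a hypothetical name, and timestamps are passed in as plain numbers instead of reading real files.

```java
import java.util.ArrayList;
import java.util.List;

class PrefixDeletionSketch {
    // Selects the indexes of files to delete, oldest first.
    // - The newest file (last element) is never considered.
    // - Selection stops at the first file that has not expired, so only a
    //   contiguous prefix of the log is removed.
    // - At most batchMax files are selected per pass.
    static List<Integer> selectForDeletion(long[] lastModifiedMs, long nowMs,
                                           long retentionMs, int batchMax) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < lastModifiedMs.length - 1; i++) {
            boolean isExpired = nowMs >= lastModifiedMs[i] + retentionMs;
            if (!isExpired || selected.size() >= batchMax) {
                break;
            }
            selected.add(i);
        }
        return selected;
    }
}
```

With last-modified times {1000, 2000, 9000, 3000, 9500}, a current time of 10000 and a retention of 3000, only indexes 0 and 1 are selected. The file at index 3 is old enough on its own, but it sits behind a file that is still live, so it stays until that file expires too.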



Posted by utdfederation on Tue, 19 Apr 2022 08:09:31 +0930