Low water mark
The low water mark is an index into the write-ahead log (WAL). Log entries before this index are no longer needed and can be discarded.
Problem background
A write-ahead log (WAL) records every update to the storage, so the log grows without bound as time goes on. The Segmented Log pattern lets the system work with one smaller file at a time. However, unless old segments are cleaned up, the total log size keeps growing until the disk is full.
Solution
The low water mark pattern tells the system which part of the log can be deleted: every log entry before the low water mark can be discarded. Typically, a background thread runs a scheduled task that periodically checks which part of the log can be cleaned up and deletes the corresponding log files.
this.logCleaner = new LogCleaner(config);
this.logCleaner.startup();
LogCleaner can be implemented with a scheduled task:
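public void startup() {
    scheduleLogCleaning();
}

private void scheduleLogCleaning() {
    // schedule() fires only once, so cleanLogs() must call scheduleLogCleaning() again
    // when it finishes (or scheduleWithFixedDelay() must be used) to keep cleaning periodically
    singleThreadedExecutor.schedule(() -> {
        cleanLogs();
    }, config.getCleanTaskIntervalMs(), TimeUnit.MILLISECONDS);
}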
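A self-contained version of the same idea, as a minimal sketch. The Segment class and the lowWaterMark supplier are hypothetical stand-ins for the WAL's segment metadata and for whatever component reports the low water mark (for example, the index of the last durable snapshot). It uses scheduleWithFixedDelay so the task repeats without rescheduling itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical segment metadata: one WAL file holding log indexes [firstIndex, lastIndex].
final class Segment {
    final long firstIndex;
    final long lastIndex;

    Segment(long firstIndex, long lastIndex) {
        this.firstIndex = firstIndex;
        this.lastIndex = lastIndex;
    }
}

// Periodically drops every segment that lies entirely below the current low water mark.
final class PeriodicLogCleaner {
    private final List<Segment> segments;    // oldest first; should be a thread-safe list
    private final LongSupplier lowWaterMark; // hypothetical hook, e.g. the last snapshot index
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "log-cleaner");
                t.setDaemon(true); // do not keep the JVM alive just for cleaning
                return t;
            });

    PeriodicLogCleaner(List<Segment> segments, LongSupplier lowWaterMark) {
        this.segments = segments;
        this.lowWaterMark = lowWaterMark;
    }

    void startup(long intervalMs) {
        // Unlike schedule(), scheduleWithFixedDelay() repeats on its own.
        executor.scheduleWithFixedDelay(this::cleanLogs, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    // Returns the number of segments removed in this pass.
    int cleanLogs() {
        long mark = lowWaterMark.getAsLong();
        List<Segment> toDelete = new ArrayList<>();
        for (Segment s : segments) {
            if (s.lastIndex < mark) {
                toDelete.add(s);
            }
        }
        // A real WAL would also close and delete the segment files on disk here.
        segments.removeAll(toDelete);
        return toDelete.size();
    }
}
```

One caveat of ScheduledExecutorService: if a run throws, subsequent runs are suppressed, so production code would typically catch and log exceptions inside the task.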
Snapshot-based low water mark: implementation and examples
Most distributed consensus systems, such as ZooKeeper (which uses ZAB, a Paxos-like atomic broadcast protocol) and etcd (which uses Raft), implement a snapshot mechanism. The storage engine periodically takes a full snapshot of its state, records the log index the snapshot corresponds to, and uses that index as the low water mark.
// Take a snapshot
public SnapShot takeSnapshot() {
    // Get the id of the latest log entry
    Long snapShotTakenAtLogIndex = wal.getLastLogEntryId();
    // Tag the snapshot with this log id
    return new SnapShot(serializeState(kv), snapShotTakenAtLogIndex);
}
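The snapshot must reflect exactly the entries up to the index it is tagged with. A minimal sketch, assuming a toy in-memory key-value state machine and plain Java serialization (KvStateMachine and KvSnapshot are hypothetical names). As a design choice, the snapshot here is tagged with the index of the last entry actually applied to the state rather than the last entry in the WAL, so it can never claim entries that were logged but not yet applied.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical snapshot: serialized state plus the last log index it reflects.
final class KvSnapshot {
    final byte[] data;
    final long lastIncludedIndex; // becomes the low water mark once the snapshot is durable

    KvSnapshot(byte[] data, long lastIncludedIndex) {
        this.data = data;
        this.lastIncludedIndex = lastIncludedIndex;
    }
}

// Hypothetical single-threaded key-value state machine fed from the WAL.
final class KvStateMachine {
    private final HashMap<String, String> kv = new HashMap<>();
    private long lastAppliedIndex = 0;

    // Log entries must be applied strictly in index order.
    void apply(long index, String key, String value) {
        if (index != lastAppliedIndex + 1) {
            throw new IllegalStateException("expected index " + (lastAppliedIndex + 1) + " but got " + index);
        }
        kv.put(key, value);
        lastAppliedIndex = index;
    }

    KvSnapshot takeSnapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(kv);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Tag the snapshot with the index of the last entry applied to kv.
        return new KvSnapshot(bytes.toByteArray(), lastAppliedIndex);
    }

    @SuppressWarnings("unchecked")
    static Map<String, String> restore(KvSnapshot snapshot) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot.data))) {
            return (Map<String, String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```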
Once the snapshot has been written to disk successfully, its log index becomes the low water mark, and older log segments can be cleaned up:
// Collect all log segments that lie entirely before the given log index
List<WALSegment> getSegmentsBefore(Long snapshotIndex) {
    List<WALSegment> markedForDeletion = new ArrayList<>();
    List<WALSegment> sortedSavedSegments = wal.sortedSavedSegments;
    for (WALSegment sortedSavedSegment : sortedSavedSegments) {
        // If the last entry in this segment is older than the snapshot index,
        // the snapshot already covers the whole segment, so it can be deleted
        if (sortedSavedSegment.getLastLogEntryId() < snapshotIndex) {
            markedForDeletion.add(sortedSavedSegment);
        }
    }
    return markedForDeletion;
}
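Because segments are sorted by log index, the scan can stop at the first segment that still contains an entry at or after the snapshot index. A sketch of that variant, using hypothetical WalSegmentInfo metadata; like the code above, it conservatively keeps a segment whose last entry equals the snapshot index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical metadata for one WAL segment file.
final class WalSegmentInfo {
    final String name;
    final long lastLogEntryId; // index of the last entry stored in this segment

    WalSegmentInfo(String name, long lastLogEntryId) {
        this.name = name;
        this.lastLogEntryId = lastLogEntryId;
    }
}

final class SnapshotLogCleanup {
    // sortedSegments must be ordered by log index, oldest first.
    static List<WalSegmentInfo> segmentsBefore(List<WalSegmentInfo> sortedSegments, long snapshotIndex) {
        List<WalSegmentInfo> result = new ArrayList<>();
        for (WalSegmentInfo segment : sortedSegments) {
            // Every later segment holds even newer entries, so stop at the first one to keep.
            if (segment.lastLogEntryId >= snapshotIndex) {
                break;
            }
            result.add(segment);
        }
        return result;
    }
}
```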
Low water mark in ZooKeeper
The scheduled task is started in the start() method of DatadirCleanupManager:
public void start() {
    // Start only once
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // A non-positive interval disables purging
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }
    // Start the scheduled task on a daemon timer; purgeInterval is in hours
    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
The core logic is in the purge() method of PurgeTxnLog:
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    // At least 3 snapshots must be retained
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    // Find up to num of the most recent valid snapshots (newest first)
    List<File> snaps = txnLog.findNValidSnapshots(num);
    int numSnaps = snaps.size();
    if (numSnaps > 0) {
        // The oldest retained snapshot defines the cutoff for both log and snapshot files
        purgeOlderSnapshots(txnLog, snaps.get(numSnaps - 1));
    }
}

static void purgeOlderSnapshots(FileTxnSnapLog txnLog, File snapShot) {
    // The snapshot file name carries a zxid (in hex) that marks its position in the transaction log
    final long leastZxidToBeRetain = Util.getZxidFromName(snapShot.getName(), PREFIX_SNAPSHOT);

    // Keep the log files needed to replay transactions after this snapshot,
    // including the one that may start before the snapshot's zxid
    final Set<File> retainedTxnLogs = new HashSet<File>();
    retainedTxnLogs.addAll(Arrays.asList(txnLog.getSnapshotLogs(leastZxidToBeRetain)));

    class MyFileFilter implements FileFilter {
        private final String prefix;

        MyFileFilter(String prefix) {
            this.prefix = prefix;
        }

        public boolean accept(File f) {
            if (!f.getName().startsWith(prefix + ".")) {
                return false;
            }
            if (retainedTxnLogs.contains(f)) {
                return false;
            }
            long fZxid = Util.getZxidFromName(f.getName(), prefix);
            // Select files whose zxid (taken from the file name) is below the cutoff
            return fZxid < leastZxidToBeRetain;
        }
    }

    // Collect the matching log files and snapshot files
    File[] logs = txnLog.getDataDir().listFiles(new MyFileFilter(PREFIX_LOG));
    List<File> files = new ArrayList<>();
    if (logs != null) {
        files.addAll(Arrays.asList(logs));
    }
    File[] snapshots = txnLog.getSnapDir().listFiles(new MyFileFilter(PREFIX_SNAPSHOT));
    if (snapshots != null) {
        files.addAll(Arrays.asList(snapshots));
    }

    // Delete them
    for (File f : files) {
        final String msg = String.format(
            "Removing file: %s\t%s",
            DateFormat.getDateTimeInstance().format(f.lastModified()),
            f.getPath());
        LOG.info(msg);
        System.out.println(msg);
        if (!f.delete()) {
            System.err.println("Failed to remove " + f.getPath());
        }
    }
}
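The selection rules of purgeOlderSnapshots can be tried out without a data directory. The sketch below is a simplification that works purely on file names. It assumes ZooKeeper's naming convention (snapshot.<zxid> and log.<zxid>, with the zxid in hex, where a log file is named after its first transaction), skips the validity checks that findNValidSnapshots performs, and approximates getSnapshotLogs by keeping the newest log file that starts at or before the cutoff. PurgePlanner and filesToPurge are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper that decides which ZooKeeper-style files to purge, from file names only.
final class PurgePlanner {
    // Parses "snapshot.1a2b" or "log.1a2b" into the hex zxid; returns -1 for other names.
    static long zxidOf(String fileName, String prefix) {
        if (!fileName.startsWith(prefix + ".")) {
            return -1;
        }
        try {
            return Long.parseLong(fileName.substring(prefix.length() + 1), 16);
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    static List<String> filesToPurge(List<String> snapshotNames, List<String> logNames, int retainCount) {
        if (retainCount < 3) {
            throw new IllegalArgumentException("retainCount must be at least 3");
        }
        // Newest snapshots first; the oldest retained one defines the cutoff zxid.
        List<Long> snapZxids = new ArrayList<>();
        for (String name : snapshotNames) {
            long zxid = zxidOf(name, "snapshot");
            if (zxid >= 0) {
                snapZxids.add(zxid);
            }
        }
        if (snapZxids.isEmpty()) {
            return new ArrayList<>();
        }
        snapZxids.sort(Comparator.reverseOrder());
        long cutoff = snapZxids.get(Math.min(retainCount, snapZxids.size()) - 1);

        // The newest log starting at or before the cutoff may hold later transactions: keep it.
        long straddling = -1;
        for (String name : logNames) {
            long zxid = zxidOf(name, "log");
            if (zxid >= 0 && zxid <= cutoff) {
                straddling = Math.max(straddling, zxid);
            }
        }

        List<String> purge = new ArrayList<>();
        for (String name : logNames) {
            long zxid = zxidOf(name, "log");
            if (zxid >= 0 && zxid < cutoff && zxid != straddling) {
                purge.add(name);
            }
        }
        for (String name : snapshotNames) {
            long zxid = zxidOf(name, "snapshot");
            if (zxid >= 0 && zxid < cutoff) {
                purge.add(name);
            }
        }
        return purge;
    }
}
```

For example, with snapshots at 0x100 through 0x400 and three retained, the cutoff is 0x200: snapshot.100 goes, log.1 goes, and log.150 stays because it may contain transactions after 0x200.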
So when are snapshots taken? Look at the run() method of SyncRequestProcessor. While processing requests, it appends each operation to the transaction log and takes a snapshot when one is due:
public void run() {
    try {
        // Randomize the snapshot thresholds so that servers do not all snapshot at the same time
        resetSnapshotStats();
        lastFlushTime = Time.currentElapsedTime();
        while (true) {
            // ... fetching the next request is omitted ...

            // The request was appended to the transaction log successfully
            if (!si.isThrottled() && zks.getZKDatabase().append(si)) {
                // Is a snapshot due?
                if (shouldSnapshot()) {
                    // Draw new random thresholds for the next round
                    resetSnapshotStats();
                    // Roll over to a new transaction log file
                    zks.getZKDatabase().rollLog();
                    // Only one snapshot may run at a time; skip if one is already in progress
                    if (!snapThreadMutex.tryAcquire()) {
                        LOG.warn("Too busy to snap, skipping");
                    } else {
                        // Take the snapshot asynchronously
                        new ZooKeeperThread("Snapshot Thread") {
                            public void run() {
                                try {
                                    zks.takeSnapshot();
                                } catch (Exception e) {
                                    LOG.warn("Unexpected exception", e);
                                } finally {
                                    // Release the lock
                                    snapThreadMutex.release();
                                }
                            }
                        }.start();
                    }
                }
            }
            // ... the rest is omitted ...
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    }
}
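The snapshot hand-off relies on a non-blocking lock: tryAcquire() returns immediately, so the request-processing thread never waits for a snapshot to finish and simply skips one if another is still running. A minimal sketch of that pattern with a one-permit java.util.concurrent.Semaphore; the SnapshotTrigger class is hypothetical, and the takeSnapshot Runnable stands in for zks.takeSnapshot().

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical trigger that allows at most one asynchronous snapshot at a time.
final class SnapshotTrigger {
    private final Semaphore snapMutex = new Semaphore(1);
    private final Runnable takeSnapshot; // stand-in for the real snapshot routine
    private final AtomicInteger skipped = new AtomicInteger();

    SnapshotTrigger(Runnable takeSnapshot) {
        this.takeSnapshot = takeSnapshot;
    }

    // Starts a snapshot thread and returns it, or returns null if one is already running.
    Thread maybeSnapshot() {
        if (!snapMutex.tryAcquire()) {
            skipped.incrementAndGet(); // "Too busy to snap, skipping"
            return null;
        }
        Thread t = new Thread(() -> {
            try {
                takeSnapshot.run();
            } finally {
                snapMutex.release(); // always free the permit, even if the snapshot fails
            }
        }, "Snapshot Thread");
        t.start();
        return t;
    }

    int skipped() {
        return skipped.get();
    }
}
```

The permit is acquired on the caller's thread and released on the snapshot thread, which a Semaphore allows (unlike a ReentrantLock, whose owner must unlock it).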
resetSnapshotStats() draws random offsets so that the servers in a cluster do not all take snapshots at the same time:
private void resetSnapshotStats() {
    // Random offset for the transaction count, in [0, snapCount / 2); snapCount defaults to 100000
    randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
    // Random offset for the log size, in [0, snapSizeInBytes / 2); snapSizeInBytes defaults to 4 GB
    randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
}
shouldSnapshot() decides whether a snapshot is due, based on these random offsets and the configured thresholds:
private boolean shouldSnapshot() {
    // Current transaction count
    int logCount = zks.getZKDatabase().getTxnCount();
    // Current size of the logged transactions
    long logSize = zks.getZKDatabase().getTxnSize();
    // Snapshot when the count exceeds snapCount / 2 + randRoll (snapCount defaults to 100000),
    // or when the size exceeds snapSizeInBytes / 2 + randSize (snapSizeInBytes defaults to 4 GB)
    return (logCount > (snapCount / 2 + randRoll))
        || (snapSizeInBytes > 0 && logSize > (snapSizeInBytes / 2 + randSize));
}
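A standalone sketch of the same randomized thresholds, with an injected Random so the behavior can be tested (the SnapshotPolicy class is hypothetical). Whatever the random draw, the count threshold falls in [snapCount / 2, snapCount), so each server snapshots somewhere between half and all of snapCount transactions, at a point that differs from its peers. It uses Math.floorMod, which yields the same [0, snapSizeInBytes / 2) range as the Math.abs(... % ...) form above.

```java
import java.util.Random;

// Hypothetical randomized snapshot policy in the spirit of resetSnapshotStats()/shouldSnapshot().
final class SnapshotPolicy {
    private final int snapCount;        // e.g. 100000
    private final long snapSizeInBytes; // e.g. 4 GB; <= 0 disables the size check
    private final Random random;
    private int randRoll;
    private long randSize;

    SnapshotPolicy(int snapCount, long snapSizeInBytes, Random random) {
        if (snapCount < 2) {
            throw new IllegalArgumentException("snapCount must be at least 2");
        }
        this.snapCount = snapCount;
        this.snapSizeInBytes = snapSizeInBytes;
        this.random = random;
        reset();
    }

    // Called after each snapshot: pick offsets in [0, snapCount / 2) and [0, snapSizeInBytes / 2).
    void reset() {
        randRoll = random.nextInt(snapCount / 2);
        long halfSize = snapSizeInBytes / 2;
        randSize = halfSize > 0 ? Math.floorMod(random.nextLong(), halfSize) : 0;
    }

    int countThreshold() {
        return snapCount / 2 + randRoll;
    }

    boolean shouldSnapshot(int txnCount, long txnBytes) {
        return txnCount > countThreshold()
                || (snapSizeInBytes > 0 && txnBytes > snapSizeInBytes / 2 + randSize);
    }
}
```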
Time-based low water mark: implementation and examples
In some systems, the log is not used to rebuild system state, so log entries can be discarded after a fixed period without waiting for any subsystem to report the lowest log index it still needs. For example, Kafka retains logs for 7 days by default (log.retention.hours=168), and RocketMQ keeps CommitLog files for 3 days by default.
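With time-based retention, the low water mark is derived from timestamps alone: it is the first offset of the oldest segment that is still inside the retention window. A minimal sketch, assuming segments sorted oldest first and described by hypothetical TimedSegment metadata (base offset plus last-modified time). Like the RocketMQ code below, it never expires the last, active segment and stops at the first segment that is still retained.

```java
import java.util.List;

// Hypothetical segment metadata for time-based retention.
final class TimedSegment {
    final long baseOffset;     // first log offset stored in the segment
    final long lastModifiedMs; // time of the last write to the segment

    TimedSegment(long baseOffset, long lastModifiedMs) {
        this.baseOffset = baseOffset;
        this.lastModifiedMs = lastModifiedMs;
    }
}

final class TimeRetention {
    // segments: oldest first; the last one is the active segment and never expires.
    // Returns the low water mark: everything below this offset may be deleted.
    static long lowWaterMark(List<TimedSegment> segments, long retentionMs, long nowMs) {
        if (segments.isEmpty()) {
            throw new IllegalArgumentException("no segments");
        }
        for (int i = 0; i < segments.size() - 1; i++) {
            TimedSegment segment = segments.get(i);
            // A segment expires once now >= lastModified + retention.
            if (nowMs < segment.lastModifiedMs + retentionMs) {
                return segment.baseOffset; // first segment still inside the retention window
            }
        }
        return segments.get(segments.size() - 1).baseOffset;
    }
}
```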
Low water mark in RocketMQ
The cleanup task is scheduled in the addScheduleTask() method of DefaultMessageStore:
private void addScheduleTask() {
    // First run after 1 minute, then every cleanResourceInterval milliseconds
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    // Other scheduled tasks omitted
}

private void cleanFilesPeriodically() {
    // Clean up expired message store (CommitLog) files
    this.cleanCommitLogService.run();
    // Clean up consume queue files
    this.cleanConsumeQueueService.run();
}
Here we only look at cleaning up the message store (CommitLog) files, which is done by the deleteExpiredFiles() method of CleanCommitLogService, an inner class of DefaultMessageStore:
private void deleteExpiredFiles() {
    int deleteCount = 0;
    // How long (in hours) a file is kept after its last modification before it may be deleted
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // One run may delete several files; this is the minimum pause between deleting two consecutive files
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    // A file may still be in use by other threads (for example, readers), so it cannot be deleted right away.
    // The first deletion attempt records a timestamp; once this interval has passed since then, the file is deleted forcibly
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    // Is it the configured time for routine deletion?
    boolean timeup = this.isTimeToDelete();
    // Is the disk running out of space?
    boolean spacefull = this.isSpaceToDelete();
    // Was deletion triggered manually?
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    // Clean up if any of the three conditions holds
    if (timeup || spacefull || manualDelete) {
        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        // Convert hours to milliseconds
        fileReservedTime *= 60 * 60 * 1000;

        // Delete expired files
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
            // At least one file was deleted; nothing else to do
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}
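The three trigger conditions can be isolated into a small, testable predicate. A simplified sketch under stated assumptions: CleanTrigger, the set of deletion hours and the usage threshold are hypothetical stand-ins for RocketMQ's own configuration, and disk usage is estimated with File.getTotalSpace() and File.getUsableSpace().

```java
import java.io.File;
import java.time.LocalTime;
import java.util.Set;

// Hypothetical cleanup trigger modeled on the three conditions in deleteExpiredFiles().
final class CleanTrigger {
    private final Set<Integer> deleteHours;  // hypothetical: hours of day for routine cleanup
    private final double diskUsageThreshold; // hypothetical: fraction of the disk, e.g. 0.75
    private int manualDeleteRequests;

    CleanTrigger(Set<Integer> deleteHours, double diskUsageThreshold) {
        this.deleteHours = deleteHours;
        this.diskUsageThreshold = diskUsageThreshold;
    }

    void requestManualDelete(int times) {
        manualDeleteRequests += times;
    }

    // Fraction of the partition holding dir that is in use; 0 if it cannot be determined.
    static double diskUsage(File dir) {
        long total = dir.getTotalSpace();
        if (total == 0) {
            return 0.0;
        }
        return (double) (total - dir.getUsableSpace()) / total;
    }

    boolean shouldClean(LocalTime now, double diskUsage) {
        boolean timeUp = deleteHours.contains(now.getHour());
        boolean spaceFull = diskUsage > diskUsageThreshold;
        boolean manualDelete = manualDeleteRequests > 0;
        if (manualDelete) {
            manualDeleteRequests--; // each manual request triggers one pass
        }
        return timeUp || spaceFull || manualDelete;
    }
}
```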
The files are actually deleted in MappedFileQueue.deleteExpiredFileByTime(), to which CommitLog.deleteExpiredFile() delegates:
public int deleteExpiredFileByTime(final long expiredTime,
                                   final int deleteFilesInterval,
                                   final long intervalForcibly,
                                   final boolean cleanImmediately) {
    Object[] mfs = this.copyMappedFiles(0);
    if (null == mfs)
        return 0;

    // Skip the last file, which is still being written
    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    List<MappedFile> files = new ArrayList<MappedFile>();
    if (null != mfs) {
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            // Delete if the file has expired, or if immediate cleanup was requested
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                // Close, release, and delete the file
                if (mappedFile.destroy(intervalForcibly)) {
                    files.add(mappedFile);
                    deleteCount++;

                    if (files.size() >= DELETE_FILES_BATCH_MAX) {
                        break;
                    }

                    // If an interval between deletions is configured, wait before the next one
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    // The file is still in use and cannot be destroyed yet; stop here
                    break;
                }
            } else {
                // Files are ordered oldest first: stop at the first unexpired file
                // so that no gap is left in the middle of the log
                break;
            }
        }
    }

    // Remove the deleted files from the in-memory file list
    deleteExpiredFile(files);

    return deleteCount;
}
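The call to mappedFile.destroy(intervalForcibly) is what lets a file that readers still hold eventually be removed. The sketch below models that behavior in simplified form, as an assumption based on the comments in deleteExpiredFiles() rather than RocketMQ's actual code: a file is destroyable once no reader holds it, or once intervalForcibly has passed since the first attempt. ReservedFile and ExpiredFileSweeper are hypothetical names, and the files are in-memory objects rather than memory-mapped files.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a log file that readers may still hold.
final class ReservedFile {
    final String name;
    final long lastModifiedMs;
    private int refCount;                   // readers currently using the file
    private long firstDestroyAttemptMs = -1;

    ReservedFile(String name, long lastModifiedMs) {
        this.name = name;
        this.lastModifiedMs = lastModifiedMs;
    }

    synchronized void hold() { refCount++; }

    synchronized void release() { refCount--; }

    // Destroyable once no reader holds the file, or once intervalForciblyMs has
    // elapsed since the first destroy attempt (remaining readers are then cut off).
    synchronized boolean destroy(long intervalForciblyMs, long nowMs) {
        if (firstDestroyAttemptMs < 0) {
            firstDestroyAttemptMs = nowMs;
        }
        return refCount <= 0 || nowMs - firstDestroyAttemptMs >= intervalForciblyMs;
    }
}

final class ExpiredFileSweeper {
    // files: oldest first; the last file is active and never deleted.
    // Removes up to maxBatch expired files from the front and returns how many were removed.
    static int sweep(List<ReservedFile> files, long expiredMs, long intervalForciblyMs, long nowMs, int maxBatch) {
        List<ReservedFile> deleted = new ArrayList<>();
        for (int i = 0; i < files.size() - 1 && deleted.size() < maxBatch; i++) {
            ReservedFile f = files.get(i);
            if (nowMs < f.lastModifiedMs + expiredMs) {
                break; // not expired yet: keep the remaining files contiguous
            }
            if (!f.destroy(intervalForciblyMs, nowMs)) {
                break; // still in use and the forced interval has not passed
            }
            deleted.add(f);
        }
        files.removeAll(deleted);
        return deleted.size();
    }
}
```

In this model, a file held by a reader blocks the sweep on the first attempt; a later sweep that runs after intervalForcibly has elapsed removes it along with the expired files behind it.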