Implementation of transactional output in the Flink exactly-once series

Flink exactly-once series directory:

1. Two-phase commit overview
2. Two-phase commit implementation analysis
3. FileStream analysis
4. Transactional output implementation
5. Eventual consistency implementation

According to the previous analysis, Flink can achieve internal exactly-once semantics through its state and checkpoint mechanisms. For end-to-end exactly-once semantics, Flink provides Kafka/HDFS output support through two-phase commit. That implementation hooks into the checkpoint process by implementing the CheckpointedFunction and CheckpointListener interfaces:

1. The initializeState method restores and re-commits pending transaction state
2. The snapshotState method starts the transaction and writes the data to be output into state
3. The notifyCheckpointComplete method commits the transaction

To use this in Flink, you inherit the TwoPhaseCommitSinkFunction class and implement beginTransaction, preCommit, commit and abort. Although it is convenient to use, it has a limitation: the transaction handle it provides (think of a JDBC Connection) must be serializable, and the earlier transaction must still be committable after deserialization. That is impossible for many transactional databases, so we need to implement our own transaction-commit scheme.

The main problem of two-phase commit is that in the second phase, commits may partially succeed and partially fail, so we need transaction fault-tolerant recovery: a commit that failed must be retried after restart, and retrying a commit that already succeeded must be idempotent so it does not affect the output data. Since we no longer have a serializable transaction handle, and the data to be committed must itself be covered by state fault tolerance, we need a different handle. Fortunately, Flink's checkpoint mechanism provides a unique ID that is user-accessible, monotonically increasing and fault-tolerant: after a task fails, the ID continues to increase from the last successful checkpoint. We can therefore use the checkpoint ID as the handle of the transaction commit. First, let's take a look at the logical process:

1. invoke method: add the incoming record to an in-memory List
2. snapshotState method: store the checkpoint ID and the list in state
3. notifyCheckpointComplete method: commit the list together with the checkpointId as one transaction, using the checkpointId as a CAS guard
4. initializeState method: recover the checkpoint ID and the list from state, and perform the same transactional commit
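Before looking at the Flink code, the CAS guard in step 3 can be sketched without any Flink dependency. The class below is purely illustrative (none of its names are Flink API): it keeps the last committed checkpoint ID next to the data and applies a batch only when the incoming ID is exactly one larger, so a replayed commit after a restart is a harmless no-op.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal in-memory stand-in for the transactional store (hypothetical, not Flink API).
public class CasCommitDemo {
    private long lastCommittedCheckpointId = 0;        // persisted next to the data
    private final List<String> committedRows = new ArrayList<>();

    // The CAS check: apply only when checkpointId == lastCommittedCheckpointId + 1.
    public boolean commit(Iterable<String> data, long checkpointId) {
        if (checkpointId != lastCommittedCheckpointId + 1) {
            return false; // stale or replayed id: already committed, skip (idempotent)
        }
        for (String row : data) {
            committedRows.add(row);
        }
        lastCommittedCheckpointId = checkpointId;
        return true;
    }

    public List<String> rows() {
        return committedRows;
    }

    public static void main(String[] args) {
        CasCommitDemo sink = new CasCommitDemo();
        List<String> batch = List.of("a", "b");
        System.out.println(sink.commit(batch, 1)); // true: first commit applies
        System.out.println(sink.commit(batch, 1)); // false: replay after restart is a no-op
        System.out.println(sink.rows());           // [a, b]: no duplicates
    }
}
```

This is why a commit that already succeeded can safely be retried after a restart: the stored checkpoint ID makes the retry a no-op.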

Code implementation:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public abstract class CommonTwoPhaseCommit<IN extends Serializable> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener {

    private long checkpointId;
    private List<IN> dataList;
    private ListState<IN> dataListState;
    private ListState<Long> checkpointIdState;

    // Type information for IN, needed to build the generic list state descriptor.
    // (Assumption: the original post does not show how the descriptor is typed.)
    private final TypeInformation<IN> dataTypeInfo;

    protected CommonTwoPhaseCommit(TypeInformation<IN> dataTypeInfo) {
        this.dataTypeInfo = dataTypeInfo;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        dataList = new ArrayList<>();
        dataListState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("dataList", dataTypeInfo));
        checkpointIdState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("checkpointI", Long.class));
        if (context.isRestored()) {
            Iterator<Long> ckIdIter = checkpointIdState.get().iterator();
            if (ckIdIter.hasNext()) {
                // Re-commit the data of the last snapshotted checkpoint; commit()
                // must be idempotent (the CAS check on checkpointId makes replays no-ops).
                commit(dataListState.get(), ckIdIter.next());
            }
        }
    }

    @Override
    public void invoke(IN value, Context context) throws Exception {
        // Buffer the record in memory until the next checkpoint.
        dataList.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Store the pending records together with the checkpoint id in operator state.
        checkpointId = context.getCheckpointId();
        dataListState.clear();
        dataListState.addAll(dataList);
        checkpointIdState.clear();
        checkpointIdState.add(checkpointId);
        // Records arriving after this barrier belong to the next checkpoint.
        dataList.clear();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // The checkpoint is now durable everywhere; commit its snapshotted records.
        commit(dataListState.get(), checkpointId);
    }

    /**
     * Compare checkpointId with the value already stored in the database;
     * apply the batch only if it is exactly 1 larger (CAS-style idempotence).
     *
     * @param data         the records snapshotted for this checkpoint
     * @param checkpointId the id of the completed checkpoint
     */
    public abstract void commit(Iterable<IN> data, long checkpointId);
}


Then you only need to inherit the CommonTwoPhaseCommit class and implement its commit method to commit the transaction. At present, this scheme is used in the delay-compensation processing of window aggregation, with MySQL as the output end. Later, we will study how to do consistency processing for Redis and other databases.
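As a sanity check on the protocol as a whole, the four hooks can also be simulated without Flink. The class below is a Flink-free sketch (all names are illustrative, and the "database" is an in-memory list): it walks through invoke, snapshotState, a crash before the checkpoint notification arrives, restore via initializeState, and a late replayed notification, showing that the records are still committed exactly once.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free walk-through of the four hooks (hypothetical names, not Flink API).
public class LifecycleDemo {
    // "database" side, guarded by the CAS check
    long dbCheckpointId = 0;
    final List<String> dbRows = new ArrayList<>();

    // operator state (survives a restart) and the volatile in-memory buffer
    List<String> stateList = new ArrayList<>();
    long stateCheckpointId = 0;
    List<String> buffer = new ArrayList<>();

    void invoke(String value) {                       // step 1: buffer the record
        buffer.add(value);
    }

    void snapshotState(long checkpointId) {           // step 2: copy buffer + id into state
        stateList = new ArrayList<>(buffer);
        stateCheckpointId = checkpointId;
        buffer.clear();                               // next interval starts empty
    }

    void commit(Iterable<String> data, long checkpointId) { // CAS: apply exactly once
        if (checkpointId == dbCheckpointId + 1) {
            data.forEach(dbRows::add);
            dbCheckpointId = checkpointId;
        }
    }

    void notifyCheckpointComplete(long checkpointId) { // step 3: commit snapshotted records
        commit(stateList, checkpointId);
    }

    void initializeState() {                           // step 4: restore, re-commit idempotently
        buffer = new ArrayList<>();                    // the volatile buffer is lost on failure
        commit(stateList, stateCheckpointId);
    }

    public static void main(String[] args) {
        LifecycleDemo sink = new LifecycleDemo();
        sink.invoke("a");
        sink.invoke("b");
        sink.snapshotState(1);
        sink.initializeState();           // crash + restore before the notification: commits checkpoint 1
        sink.notifyCheckpointComplete(1); // late/replayed notification is a no-op
        System.out.println(sink.dbRows);  // prints [a, b]: committed exactly once
    }
}
```

The real class runs inside Flink's checkpoint machinery, but the ordering and idempotence argument are exactly the ones simulated here.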

Posted by matstuff on Mon, 18 Apr 2022 12:14:20 +0930