# Flink real-time statistics history pv, uv

I have written three blogs about Flink's real-time statistics of pv and uv. Recently, I have made another attempt to use sql to calculate pv and uv of full data.

Stream Api writes real-time and offline pv and uv. There are no other obstacles except to write code

There are many obstacles to writing SQL api, such as no trigger in the window, unable to operate the state, udf is not as easy to use as process operator, etc

## problem

Preset two scenarios:
1. Count pv and uv by day
2. On the basis of solving problem 1, solve the statistics of historical pv and uv

## Realization idea

There are several ideas to realize real-time statistics of pv and uv

1. Calculate pv and uv of the current day directly using CUMULATE WINDOW
2. Calculate the pv and uv of the current day directly with CUMULATE WINDOW, and then obtain the pv of yesterday, and accumulate to obtain the pv based on history
3. pv calculation is the same as solution 2. udaf is used for uv calculation, and bloom filter is used to roughly calculate uv
4. pv calculation is the same as solution 2. udaf is used for uv calculation, and redis is used to record the user_id, user is obtained during each calculation_ The number of IDS is uv
5. pv calculation is the same as solution 2. udaf is used for uv calculation, and the historical user is obtained every time it is started_ The ID is cached in memory, plus the new user_id calculation uv
6. Global window, directly calculate the total pv and uv (meaningless, not implemented)

Note: due to the need for real-time output of results, CUMULATE WINDOW is selected for SQL

## Create table statement

The table creation statement only has data flow table, output table and lookup join output table

```CREATE TABLE user_log
(
user_id     VARCHAR
,item_id     VARCHAR
,category_id VARCHAR
,behavior    VARCHAR
,ts          TIMESTAMP(3)
,proc_time   as PROCTIME()
,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_log'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
);

create table if not exists user_log_lookup_join(
cal_day varchar
,behavior varchar
,pv  bigint
,uv  bigint
,PRIMARY KEY (cal_day, behavior) NOT ENFORCED
) with (
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://localhost:3306/venn'
,'table-name' = 'pv_uv'
,'scan.partition.column' = 'cal_day'
,'scan.partition.num' = '1'
,'scan.partition.lower-bound' = '0'
,'scan.partition.upper-bound' = '9999'
,'lookup.cache.max-rows' = '1000'
-- one day, once cache, the value will not update
,'lookup.cache.ttl' = '86400000' -- ttl time No data for such a long time
);

create table if not exists user_log_sink(
cal_day varchar
,behavior varchar
,start_time VARCHAR
,end_time VARCHAR
,pv  bigint
,uv  bigint
,last_pv  bigint
,last_uv  bigint
,PRIMARY KEY (cal_day, behavior) NOT ENFORCED
) with (
--      'connector' = 'print'
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://venn:3306/venn'
,'table-name' = 'pv_uv'
);

```

## Train of thought 1

It is a simple daily window of CUMULATE. count/count distinct is counted. The trigger event of the window is once every 10 seconds

The sql is as follows:

```insert into user_log_sink
select
date_format(window_start, 'yyyy-MM-dd') cal_day
,behavior
,date_format(window_start, 'HH:mm:ss') start_time
, date_format(window_end, 'HH:mm:ss') end_time
, count(user_id) pv
, count(distinct user_id) uv
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECOND, INTERVAL '1' DAY))
GROUP BY window_start, window_end, behavior
;

```

Conclusion: this can only output the pv and uv of the day in real time, and cannot calculate the pv and uv of history

## Train of thought 2

On the basis of train of thought 1, correlate yesterday's results

The sql is as follows:

```insert into user_log_sink
select
a.cal_day
,a.behavior
,'' start_time
,date_format(a.proc_time, 'yyyy-MM-dd HH:mm:ss')
,a.pv + COALESCE(c.pv,0) -- add last
,a.uv
,c.pv last_uv
,c.uv last_uv
from(
select
date_format(window_start, 'yyyy-MM-dd') cal_day
,behavior
,max(proc_time) proc_time
,count(user_id) pv
,count(distinct user_id) uv
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' SECOND, INTERVAL '1' DAY))
GROUP BY window_start, window_end, behavior
)a
ON  a.behavior = c.behavior
and udf_date_add_new(date_format(a.proc_time, 'yyyy-MM-dd HH:mm:ss'), -1) = c.cal_day
;

```

Conclusion: the CUMULATE window calculates the pv and uv of the day and adds the pv of yesterday to get the accumulated pv. The uv is still only today (the accumulation of uv value is meaningless)

## Train of thought 3

Based on idea 2, bloom filter is used to calculate uv

The sql is as follows:

```insert into user_log_sink
select
a.cal_day
,a.behavior
,'' start_time
,date_format(a.ts, 'yyyy-MM-dd HH:mm:ss')
,a.pv + COALESCE(c.pv,0) -- add last
,a.uv + COALESCE(c.uv,0)
,c.pv last_uv
,c.uv last_uv
from(
select
date_format(window_start, 'yyyy-MM-dd') cal_day
,behavior
,max(ts) ts
,max(proc_time) proc_time
,count(user_id) pv
,udaf_uv_count(user_id) uv
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' minute, INTERVAL '1' day))
GROUP BY window_start, window_end, behavior
)a
ON  a.behavior = c.behavior
and udf_date_add_new(date_format(a.proc_time, 'yyyy-MM-dd HH:mm:ss'), -1) = c.cal_day
;

```

bloom filter in udaf_ UV_ Implemented in count

```public class BloomFilter extends AggregateFunction<Integer, CountAcc > {

private final static Logger LOG = LoggerFactory.getLogger(BloomFilter.class);
@Override
public void open(FunctionContext context) throws Exception {
LOG.info("bloom filter open...");
// Create bloom filter object, expected data volume, misjudgment rate
Funnels.byteArrayFunnel(),
1000 * 10000,
0.01);
}

public void accumulate(CountAcc acc, String userId) {

if (userId == null || userId.length() == 0) {
return;
}
// parse userId to byte
byte[] arr = userId.getBytes(StandardCharsets.UTF_8);
// check userId exists bloom filter
if(!filter.mightContain(arr)){
// not exists
filter.put(arr);
// count ++
acc.count += 1;
}

}

@Override
public void close() throws Exception {
}

@Override
public Integer getValue(CountAcc acc) {
// get
return acc.count;
}

@Override
public CountAcc createAccumulator() {
CountAcc acc = new CountAcc();
return acc;
}

public void merge(CountAcc acc, Iterable<CountAcc> it) {
int last = acc.count;
StringBuilder builder = new StringBuilder();
for (CountAcc a : it) {
acc.count += a.count;
}
}

}

```

Conclusion: pv is like train of thought 2. The uv value can only get the value of the current window
reason:
1. bloom filter cannot return uv data
2. There is only data in the current window in the accumulator
3. The status of the window (start and end) cannot be obtained in udaf, and the data of the previous window cannot be recorded with global variables

Note: big guys can try it by themselves

## Train of thought 4

On the basis of idea 2, add new users each time_ Put the ID into redis and get the full number of users from redis when getting the value_ id

The SQL is as follows:

```insert into user_log_sink
select
a.cal_day
,a.behavior
,'' start_time
,date_format(a.ts, 'yyyy-MM-dd HH:mm:ss')
,a.pv + COALESCE(c.pv,0) -- add last
,a.uv + COALESCE(c.uv,0)
,c.pv last_uv
,c.uv last_uv
from(
select
date_format(window_start, 'yyyy-MM-dd') cal_day
,behavior
,max(ts) ts
,max(proc_time) proc_time
,count(user_id) pv
,udaf_redis_uv_count('user_log_uv', user_id) uv
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '10' minute, INTERVAL '1' day))
GROUP BY window_start, window_end, behavior
)a
ON  a.behavior = c.behavior
and udf_date_add_new(date_format(a.proc_time, 'yyyy-MM-dd HH:mm:ss'), -1) = c.cal_day
;

```

udf is implemented as follows:

```/**
* accumulate add user_id to redis
* getValue: get all redis user_id, count the uv
*/
public class RedisUv extends AggregateFunction<Integer, Integer> {

private final static Logger LOG = LoggerFactory.getLogger(RedisUv.class);
// "redis://localhost"
private String url;
private StatefulRedisConnection<String, String> connection;
private RedisClient redisClient;
private RedisCommands<String, String> sync;
private String key;

public RedisUv(String url, String key ) {
this.url = url;
this.key = key;
}

@Override
public void open(FunctionContext context) throws Exception {
// connect redis
reconnect();
}

public void reconnect() {
redisClient = RedisClient.create(this.url);
connection = redisClient.connect();
sync = connection.sync();
}

public void accumulate(Integer acc, String key, String userId) {

//        if (this.key == null) {
//            this.key = key;
//        }
int retry = 3;
while (retry >= 1) {
try {
sync.hset(key, userId, "0");
return;
} catch (Exception e) {
LOG.info("set redis error, retry");
reconnect();
retry -= 1;
}
}

}

@Override
public Integer getValue(Integer accumulator) {
long start = System.currentTimeMillis();
int size = 0;
if (this.key == null) {
return size;
}
// get all userId, count size
int retry = 3;
while (retry >= 1) {
try {
size = sync.hgetall(this.key).size();
break;
} catch (Exception e) {
LOG.info("set redis error, retry");
reconnect();
retry -= 1;
}
}
long end = System.currentTimeMillis();
LOG.info("count all cost : " + (end - start));
return size;
}

@Override
public Integer createAccumulator() {
return 0;
}

public void merge(Integer acc, Iterable<Integer> it) {
// do nothing
}
}

```

Conclusion: pv calculation is like train of thought 2, and it can accurately calculate the historical uv, but there is a serious performance problem (docker single redis, millions of user_id, which takes more than 500 ms at a time. With the increase of user data, the time will be longer)

Note: there is a problem. The key passed in from accumulate is not globally visible in udaf. Accumulate and getValue are not executed in one thread (or even on one server)

## Train of thought 5

Test 1 million numbers and put them in the map. gc shows that it uses 300+ M of memory and gives them up directly

```Heap
PSYoungGen      total 547840K, used 295484K [0x0000000715580000, 0x0000000738180000, 0x00000007c0000000)
eden space 526336K, 52% used [0x0000000715580000,0x000000072610f248,0x0000000735780000)
from space 21504K, 100% used [0x0000000736c80000,0x0000000738180000,0x0000000738180000)
to   space 21504K, 0% used [0x0000000735780000,0x0000000735780000,0x0000000736c80000)
ParOldGen       total 349696K, used 158905K [0x00000005c0000000, 0x00000005d5580000, 0x0000000715580000)
object space 349696K, 45% used [0x00000005c0000000,0x00000005c9b2e410,0x00000005d5580000)
Metaspace       used 15991K, capacity 16444K, committed 16512K, reserved 1062912K
class space    used 2022K, capacity 2173K, committed 2176K, reserved 1048576K

```

## Train of thought 6

The direct global window calculation of pv and uv is not very displayed. Firstly, there is no result that cannot be output in real time, and secondly, there is no historical value

## conclusion

1. If you only need the most recent, you can use the CUMULATE window directly
2. The pv of statistical history can be calculated by adding the pv of the current day to the historical value
3. It's better to count the full history of uv or stream api. It's very convenient to use the state or bloom filter algorithm here