Introduction notes to golang - tail and kafka

LogAgent workflow:
1. Log reading - the tailf third-party library
2. Sending logs to Kafka - the sarama third-party library

Kafka overview:
Kafka is a distributed streaming platform that can run on a single server or be deployed across multiple servers as a cluster. It provides publish and subscribe functionality: users can write data to Kafka or read data from Kafka (for further processing). Kafka is characterized by high throughput, low latency, and high fault tolerance.

1. Kafka cluster architecture
	1. broker: each machine in the cluster is a broker
	2. topic: each category of logs is a topic
	3. partition: the same topic is split into several partitions to spread the load
		1. leader: the primary replica of a partition
		2. follower: a secondary replica of a partition
	4. consumer group: a group of consumer instances that divide a topic's partitions among themselves
2. Process for the producer to send data to Kafka (6 steps)

3. Kafka partition selection modes (3 kinds)
	1. Explicitly specify which partition to write to
	2. Specify a key: Kafka hashes the key, and the hash decides which partition to write to
	3. Round-robin (polling) mode
4. Modes for the producer to send data to Kafka (3 ack levels)
	1. 0: sending the data to the leader counts as success; highest efficiency, lowest safety
	2. 1: send the data to the leader and wait for the leader to return an ACK
	3. all: send the data to the leader; the followers pull the data from the leader and reply with an ACK to the leader, and the leader then replies with an ACK to the producer; highest safety
5. Principle of partitioned file storage
6. Why is Kafka fast? (it turns random I/O into sequential I/O and keeps track of where each file segment sits on disk)
7. Consumer group

Each consumer instance can consume multiple partitions, but each partition can be consumed by at most one instance within a consumer group.

How ZooKeeper works:
Whenever a host starts a service, it first registers with ZooKeeper, which creates a directory entry for it. When a caller wants to invoke the service, it first looks it up in ZooKeeper.

tail Library:

//tailf read file example
package main

import (
	"fmt"
	"time"

	"github.com/hpcloud/tail" // import path assumed; the nxadm/tail fork exposes the same API
)

func main() {
	fileName := "./my.log"
	config := tail.Config{
		ReOpen:    true,                                 //reopen the file if it is recreated (log rotation)
		Follow:    true,                                 //keep following new lines, like `tail -f`
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, //where to start reading: here, from the end of the file
		MustExist: false,                                //do not report an error if the file does not exist yet
		Poll:      true,                                 //poll for file changes instead of using inotify
	}
	tails, err := tail.TailFile(fileName, config)
	if err != nil {
		fmt.Println("tail file failed, err:", err)
		return
	}
	var (
		msg *tail.Line
		ok  bool
	)
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Println("tail file close reopen, filename:", tails.Filename)
			time.Sleep(time.Second)
			continue
		}
		fmt.Println("msg:", msg.Text)
	}
}

Enabling Kafka:
First run ZooKeeper's zkServer.cmd (under D:\apache-zookeeper-3.8.0-bin).
Then, from the Kafka installation directory, execute the following command:

Kafka start command:
.\bin\windows\kafka-server-start.bat .\config\

Execute the following code:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          //after sending, both the leader and the followers must acknowledge
	config.Producer.Partitioner = sarama.NewRandomPartitioner //select a random partition
	config.Producer.Return.Successes = true                   //successfully delivered messages are returned on the Successes channel
	//construct a message
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")
	//connect to kafka ("127.0.0.1:9092" is the default local broker address; adjust for your environment)
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	fmt.Println("connect kafka success!")
	defer client.Close()
	pid, offset, err := client.SendMessage(msg) //offset is the position at which the message was written within the partition
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

Tags: Go

Posted by misterm on Thu, 14 Apr 2022 14:18:33 +0930