LogAgent workflow:
1. Log reading - the tail third-party library (github.com/hpcloud/tail)
2. Log writing to Kafka - the sarama third-party library (github.com/Shopify/sarama)
Introduction:
Kafka is a distributed streaming platform. It can run on a single server or be deployed across multiple servers as a cluster. It provides publish and subscribe functionality: users can write data to Kafka or read data from it (for subsequent processing). Kafka offers high throughput, low latency, and high fault tolerance.
1. Kafka cluster architecture:
1. broker: each machine in the cluster is a broker
2. topic: each category of logs is a topic
3. partition: the same topic is split into multiple partitions to spread the load (a topic-creation sketch follows this list)
1. leader: the primary replica of a partition
2. follower: a replica that copies data from the leader
4. consumer group: a group of consumer instances that share the work of reading a topic
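To make topic and partition concrete, here is a minimal sketch that creates a topic split into three partitions using sarama's ClusterAdmin. The broker address 127.0.0.1:9092 and the replication factor of 1 are assumptions for a single-broker local setup; the web_log topic matches the producer example later in these notes.

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // admin requests need the broker version set

	admin, err := sarama.NewClusterAdmin([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("new cluster admin failed, err:", err)
		return
	}
	defer admin.Close()

	// one topic, split into 3 partitions, each kept on 1 broker
	err = admin.CreateTopic("web_log", &sarama.TopicDetail{
		NumPartitions:     3,
		ReplicationFactor: 1,
	}, false)
	if err != nil {
		fmt.Println("create topic failed, err:", err)
		return
	}
	fmt.Println("topic created")
}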
2. Process for the producer to send data to Kafka (6 steps):
1. The producer queries the cluster for the leader of the target partition
2. The producer sends the message to the leader
3. The leader writes the message to its local log
4. The followers pull the message from the leader
5. Each follower writes the message locally and sends an ACK to the leader
6. After receiving ACKs from the followers, the leader sends an ACK to the producer
3. Kafka partition selection modes (3 kinds; see the sketch after this list):
1. Manually specify which partition to write to
2. Specify a key; Kafka hashes the key to decide which partition to write to
3. Polling (round robin) mode
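The three selection modes correspond to partitioner constructors in sarama. A minimal sketch (assuming the web_log topic; the user-42 key is hypothetical) showing which constructor matches which mode:

package main

import (
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Pick one of the three partitioners:
	config.Producer.Partitioner = sarama.NewManualPartitioner     // mode 1: use the partition set on msg.Partition
	config.Producer.Partitioner = sarama.NewHashPartitioner      // mode 2: hash msg.Key to choose a partition
	config.Producer.Partitioner = sarama.NewRoundRobinPartitioner // mode 3: polling, cycle through partitions

	// For mode 1, the message must carry the target partition:
	msg := &sarama.ProducerMessage{
		Topic:     "web_log",
		Partition: 0,                                // only honored by the manual partitioner
		Key:       sarama.StringEncoder("user-42"),  // only used by the hash partitioner
		Value:     sarama.StringEncoder("a test log line"),
	}
	_ = msg // a real producer would pass msg to SendMessage, as in the example further below
}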
4. Modes for producers to send data to Kafka (3 types; mapped to sarama constants in the sketch after this list):
1. 0: consider the send successful as soon as the data is sent to the leader; highest efficiency, lowest safety
2. 1: send the data to the leader and wait for the leader to return an ACK
3. all: send the data to the leader; the followers pull the data from the leader and reply with ACKs to the leader, then the leader replies with an ACK to the producer; highest safety
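In sarama these three modes are the RequiredAcks constants NoResponse, WaitForLocal, and WaitForAll. A small sketch of the mapping (the ackForMode helper is hypothetical, purely for illustration):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// ackForMode maps the three modes above to sarama's RequiredAcks constants.
// This helper is hypothetical; in practice you would set the constant directly.
func ackForMode(mode string) sarama.RequiredAcks {
	switch mode {
	case "0":
		return sarama.NoResponse // do not wait for any ACK: fastest, least safe
	case "1":
		return sarama.WaitForLocal // wait for the leader's ACK only
	default:
		return sarama.WaitForAll // wait for all in-sync replicas: safest
	}
}

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = ackForMode("all")
	fmt.Println("RequiredAcks:", config.Producer.RequiredAcks)
}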
5. Principle of partitioned file storage: each partition is stored on disk as a sequence of segment files, each with an accompanying index
6. Why is Kafka fast? (random disk I/O becomes sequential I/O, and the index records where each segment file sits on disk)
7. Consumer group
Each consumer instance can consume multiple partitions, but each partition can be consumed by at most one instance within its consumer group (see the sketch after this section).
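A minimal consumer-group sketch using sarama, assuming the local broker and web_log topic from the producer example; the group name web_log_group is an assumption. When several instances run with the same group name, Kafka rebalances the partitions among them so each partition is read by at most one instance:

package main

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("partition:%d offset:%d value:%s\n", msg.Partition, msg.Offset, msg.Value)
		sess.MarkMessage(msg, "") // mark the offset as consumed
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // consumer groups require the broker version to be set

	group, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, "web_log_group", config)
	if err != nil {
		fmt.Println("new consumer group failed, err:", err)
		return
	}
	defer group.Close()

	for {
		// Consume blocks until a rebalance, then partitions are redistributed among instances.
		if err := group.Consume(context.Background(), []string{"web_log"}, handler{}); err != nil {
			fmt.Println("consume failed, err:", err)
			return
		}
	}
}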
How ZooKeeper works:
Whenever a host starts a service, it first registers itself with ZooKeeper, which creates a directory entry (znode) for it. A caller looks the service up in ZooKeeper before invoking it.
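A sketch of this register-then-discover pattern using the github.com/samuel/go-zookeeper/zk client. The /services path, the logagent node name, and the address payload are assumptions for illustration; an ephemeral node disappears when the service's session ends, which keeps the registry current:

package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	// connect to the local ZooKeeper
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second*5)
	if err != nil {
		fmt.Println("connect zookeeper failed, err:", err)
		return
	}
	defer conn.Close()

	// make sure the parent node exists (ignore "already exists")
	if _, err := conn.Create("/services", nil, 0, zk.WorldACL(zk.PermAll)); err != nil && err != zk.ErrNodeExists {
		fmt.Println("create parent failed, err:", err)
		return
	}

	// register the service: an ephemeral node that is removed when the session ends
	path, err := conn.Create("/services/logagent", []byte("127.0.0.1:8080"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	if err != nil {
		fmt.Println("create node failed, err:", err)
		return
	}
	fmt.Println("registered at:", path)

	// a caller discovers services by listing the registry
	children, _, err := conn.Children("/services")
	if err != nil {
		fmt.Println("list services failed, err:", err)
		return
	}
	fmt.Println("available services:", children)
}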
tail library:
// tail read-file example
package main

import (
	"fmt"
	"time"

	"github.com/hpcloud/tail"
)

func main() {
	fileName := "./my.log"
	config := tail.Config{
		ReOpen:    true,                                 // reopen the file after it is rotated
		Follow:    true,                                 // keep following the file, like tail -f
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // where to start reading: Whence 2 = from the end of the file
		MustExist: false,                                // do not report an error if the file does not exist yet
		Poll:      true,                                 // poll for changes instead of using inotify
	}
	tails, err := tail.TailFile(fileName, config)
	if err != nil {
		fmt.Println("tail file failed, err:", err)
		return
	}
	var (
		msg *tail.Line
		ok  bool
	)
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Println("tail file closed, reopening, filename:", fileName)
			time.Sleep(time.Second)
			continue
		}
		fmt.Println("msg:", msg.Text)
	}
}
Enabling Kafka:
First start ZooKeeper by running zkServer.cmd (under D:\apache-zookeeper-3.8.0-bin\bin).
Then, from the Kafka installation directory, run the Kafka start command:
.\bin\windows\kafka-server-start.bat .\config\server.properties
Execute the following code:
package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll          // after sending, both the leader and the followers must acknowledge
	config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
	config.Producer.Return.Successes = true                   // successfully delivered messages are returned on the Successes channel

	// construct a message
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web_log"
	msg.Value = sarama.StringEncoder("this is a test log")

	// connect to kafka
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	fmt.Println("connect kafka success!")
	defer client.Close()

	pid, offset, err := client.SendMessage(msg) // offset is the position at which the message was written in the partition
	if err != nil {
		fmt.Println("send msg failed, err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
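Putting the two pieces together gives the LogAgent workflow from the top of these notes: tail the log file and forward each new line to Kafka. A minimal sketch reusing my.log and the web_log topic from the examples above:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/hpcloud/tail"
)

func main() {
	// kafka producer (same settings as the producer example above)
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer failed, err:", err)
		return
	}
	defer producer.Close()

	// tail the log file (same settings as the tail example above)
	tails, err := tail.TailFile("./my.log", tail.Config{
		ReOpen:   true,
		Follow:   true,
		Location: &tail.SeekInfo{Offset: 0, Whence: 2},
		Poll:     true,
	})
	if err != nil {
		fmt.Println("tail file failed, err:", err)
		return
	}

	// forward every new log line to kafka
	for line := range tails.Lines {
		msg := &sarama.ProducerMessage{
			Topic: "web_log",
			Value: sarama.StringEncoder(line.Text),
		}
		if _, _, err := producer.SendMessage(msg); err != nil {
			fmt.Println("send msg failed, err:", err)
		}
	}
}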