[golang] Introduction to gRPC and streaming RPC


gRPC is a language-neutral, platform-neutral, high-performance, general-purpose open source RPC framework. It uses Protocol Buffers as its serialization protocol and supports many programming languages.

gRPC installation

Install protoc

Download the precompiled protoc compiler from https://github.com/google/protobuf/releases. It is used to generate the gRPC service code.

Unzip the zip file and add the bin directory that contains the protoc binary to the PATH environment variable.

Install the golang plug-in

After installing golang, set the environment:

go env -w GO111MODULE=auto
go env -w GOPROXY=https://goproxy.cn

Install the Go protobuf library and the protoc-gen-go plugin with the following commands:

go get -u github.com/golang/protobuf/proto
go get -u github.com/golang/protobuf/protoc-gen-go

If they cannot be installed directly because of network restrictions, build the plugin from source:

git clone https://github.com/golang/protobuf.git
# Enter the protobuf/protoc-gen-go directory
go build
go install
# protoc-gen-go is now in the bin directory under $GOPATH (add this directory to the PATH environment variable)

Generate code

Building a gRPC service takes three steps:

  • Write the .proto file
  • Use the tool to generate code in the target language from the .proto file
  • Write the server and client code based on the generated code

// In the proto file, you need to set the package name
option go_package = "./;mytest";  // Path and package name

# After writing the file, generate the corresponding code with protoc:
./protoc --go_out=plugins=grpc:proto -I proto/ ./proto/product.proto

Command parameter description:

  • -I: specifies the directory where the proto files are stored; it is required if the file is not in the current directory;
  • --go_out=plugins=grpc: generates Go code with the grpc plugin enabled; the directory for the generated Go code follows the colon.

Simple gRPC

In gRPC, client applications can directly call application methods on remote services like local objects, making it easier to create distributed applications and services. It is based on the following concepts:

  • Define a service and specify the methods (including parameters and return types) that can be called remotely;
  • Implement this interface on the server side and run a gRPC server to handle client calls;
  • The client holds a stub that exposes the same methods as the server and acts as a local proxy for the server's interface.
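
The stub idea can be sketched without gRPC at all. The following is only an illustration, not how gRPC is implemented: the Greeter interface, the greeterStub type and the in-process transport function are hypothetical names invented here, and the "network" is replaced by a plain function call.

```go
package main

import (
	"errors"
	"fmt"
)

// Greeter is the service contract shared by client and server (hypothetical).
type Greeter interface {
	SayHello(name string) (string, error)
}

// greeterServer is the server-side implementation of the contract.
type greeterServer struct{}

func (greeterServer) SayHello(name string) (string, error) {
	if name == "" {
		return "", errors.New("empty name")
	}
	return "Hello " + name, nil
}

// transport is a stand-in for the network: it carries a method name
// and a payload to the server and brings back the reply.
type transport func(method, payload string) (string, error)

// greeterStub is the client-side proxy. It has the same methods as the
// server but only forwards each call over the transport.
type greeterStub struct{ call transport }

func (s greeterStub) SayHello(name string) (string, error) {
	return s.call("SayHello", name)
}

// newInProcessTransport dispatches calls directly to srv (assumption:
// a real framework would serialize the request and send it over a connection).
func newInProcessTransport(srv Greeter) transport {
	return func(method, payload string) (string, error) {
		switch method {
		case "SayHello":
			return srv.SayHello(payload)
		default:
			return "", fmt.Errorf("unknown method %q", method)
		}
	}
}

func main() {
	// The caller only sees the Greeter interface, as if the object were local.
	var client Greeter = greeterStub{call: newInProcessTransport(greeterServer{})}
	reply, err := client.SayHello("Mike")
	fmt.Println(reply, err)
}
```

In real gRPC code the stub and the dispatch logic are generated from the .proto file, so neither side writes them by hand.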

Define proto

First define a simple request and its corresponding response:

syntax = "proto3";

package service;

option go_package = "./;product";

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

Put the sample file in the proto directory and execute the protoc command:

./protoc --go_out=plugins=grpc:proto -I proto/ ./proto/product.proto

This generates the product.pb.go file, which contains the server and client code.

Server

To implement the services in proto, you need to implement the GreeterServer interface:

type GreeterServer interface {
	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

First implement a corresponding service:

type server struct {
}

func (s *server) SayHello(ctx context.Context, in *product.HelloRequest) (*product.HelloReply, error) {
	log.Println("Input:", in)
	return &product.HelloReply{Message: "Hello " + in.Name}, nil
}

Then create the gRPC service:

func main() {
	// Create a TCP listener
	listener, err := net.Listen("tcp", ":5901")
	if err != nil {
		log.Println("Listen failed:", err)
		return
	}

	grpcServer := grpc.NewServer()

	// Register the service implementation
	// This function is generated automatically in product.pb.go
	product.RegisterGreeterServer(grpcServer, &server{})

	// Register the reflection service on the gRPC server
	// so that the grpcurl tool can list the services or call gRPC methods
	// reflection.Register(grpcServer)

	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("Start RPC Server fail: %v", err)
	}
}

Client

The client connects with Dial. DialContext does the same but also takes a context, and both accept dial options such as keepalive parameters:

func main() {
	//conn, err := grpc.Dial(":5901", grpc.WithInsecure())
	conn, err := grpc.DialContext(context.Background(), ":5901",
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
			Timeout:             2 * time.Second,  // wait 2 second for ping ack before considering the connection dead
			PermitWithoutStream: true,  // send pings even without active streams
		}))
	if err != nil {
		log.Println("Connect fail:", err)
		return // conn is nil on failure, so the deferred Close below must not run
	}
	defer conn.Close()

	count := 0
	client := product.NewGreeterClient(conn)
	for {
		time.Sleep(5 * time.Second)
		count++
		log.Println("Try", count)

		reply, err := client.SayHello(context.Background(), &product.HelloRequest{Name: "Mike" + strconv.Itoa(count)})

		if err != nil {
			log.Println("Call fail:", err)
			continue
		}

		log.Println("Reply:", reply)
	}
}

Streaming gRPC

Compared with simple gRPC, streaming gRPC covers scenarios that send or receive a large amount of data or transmit data continuously. It comes in three forms:

  • ServerStream: the client sends a single request, the server returns streaming data, and the client reads the stream until EOF
    • After the server has pushed all its data, returning nil marks the response as complete;
    • The client checks err == io.EOF to tell whether the server response is complete;
  • ClientStream (client push stream): the client writes streaming data and, after writing, waits for the server to return the result
    • After sending, the client closes the stream with CloseAndRecv and receives the response from the server;
    • The server checks err == io.EOF to tell whether the client has finished sending, then uses SendAndClose to close the stream and return a response;
  • BidirectionalStream: the two streams run independently, so the client and server can read and write in whatever order they need
    • The client and server push data to each other through the stream. After pushing, the client closes its sending side with CloseSend and checks err == io.EOF to tell whether the server has finished responding;
    • The server checks err == io.EOF to tell whether the client has finished sending; returning nil marks the response as complete.
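
The err == io.EOF check shared by all three forms can be sketched with the standard library only. The msg and fakeStream types below are hypothetical stand-ins for a generated message and stream type; the only assumption carried over from gRPC is that Recv returns io.EOF once the peer has finished sending.

```go
package main

import (
	"fmt"
	"io"
)

// msg is a hypothetical stand-in for a generated message type.
type msg struct{ Data string }

// fakeStream imitates the receiving side of a stream: Recv hands out the
// pending messages one by one and returns io.EOF when none remain.
type fakeStream struct {
	pending []msg
}

func (s *fakeStream) Recv() (*msg, error) {
	if len(s.pending) == 0 {
		return nil, io.EOF
	}
	m := s.pending[0]
	s.pending = s.pending[1:]
	return &m, nil
}

// drain reads from the stream until io.EOF and returns the collected data.
// io.EOF means a normal end of stream; any other error is returned to the caller.
func drain(s interface{ Recv() (*msg, error) }) ([]string, error) {
	var out []string
	for {
		m, err := s.Recv()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, m.Data)
	}
}

func main() {
	s := &fakeStream{pending: []msg{{"a"}, {"b"}, {"c"}}}
	got, err := drain(s)
	fmt.Println(got, err)
}
```

Treating io.EOF separately from other errors matters: a loop that stops on any error cannot tell a finished stream from a broken one.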

Define proto

Define data structure:

syntax = "proto3";
option go_package = ".;proto";

// Three streaming RPCs
// GetStream: the server returns a stream
// PutStream: the client uploads a stream
// BiStream: bidirectional stream
service Stream{
  rpc GetStream(MsgRequest)returns(stream MsgReply){}
  rpc PutStream(stream MsgRequest)returns(MsgReply){}
  rpc BiStream(stream MsgRequest)returns(stream MsgReply){}
}

message MsgRequest {
  string data = 1;
}

message MsgReply {
  string data = 1;
}

Server

Define the service and implement the corresponding stream interfaces:

type server struct {
}

// Server -> client one-way stream
func (*server) GetStream(req *proto.MsgRequest, getServer proto.Stream_GetStreamServer) error {
	log.Println("GetServer Start.")
	for i := 0; i < 10; i++ {
		reply := &proto.MsgReply{Data: req.Data + ":" + fmt.Sprintf("%v", time.Now().Unix())}
		if err := getServer.Send(reply); err != nil {
			return err // the stream is broken, stop pushing
		}
		log.Println("Get Res Send.")
		time.Sleep(1 * time.Second)
	}
	log.Println("GetServer Done.")
	// Returning nil ends the stream; the client then receives io.EOF
	return nil
}

// Client -> server one-way stream
func (*server) PutStream(putServer proto.Stream_PutStreamServer) error {
	log.Println("PutServer Start.")
	var cliStr strings.Builder
	for {
		putReq, err := putServer.Recv()
		if err == io.EOF {
			// The client has finished sending: reply once and close the stream
			log.Println("PutServer Done.")
			return putServer.SendAndClose(&proto.MsgReply{Data: "Finish. Your Data is: " + cliStr.String()})
		}
		if err != nil {
			return err // a real failure, not a normal end of stream
		}
		log.Println("Put Req: " + putReq.Data)
		cliStr.WriteString(putReq.Data)
	}
}

// Bidirectional stream
func (*server) BiStream(biServer proto.Stream_BiStreamServer) error {
	log.Println("BiServer Start.")
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		for {
			biReq, err := biServer.Recv()
			if err != nil {
				if err == io.EOF {
					log.Printf("[INFO] recv end")
				}
				break
			} else {
				log.Println("Bi Req: " + biReq.Data)
			}
		}
		wg.Done()
	}()

	go func() {
		for {
			err := biServer.Send(&proto.MsgReply{Data: "ok"})
			if err != nil {
				break
			} else {
				log.Println("Bi Res: ok")
				time.Sleep(time.Second)
			}
		}
		wg.Done()
	}()

	wg.Wait()
	log.Println("BiServer Done.")
	return nil
}

Start service:

func main() {
	// Listen on the port
	lis, err := net.Listen("tcp", ":5902")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Create a gRPC server
	s := grpc.NewServer()
	// Register the service implementation
	proto.RegisterStreamServer(s, &server{})

	// Register server reflection service
	// reflection.Register(s)

	// Serve incoming connections; Serve blocks until the server stops or fails
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

Client

A successful call to each streaming method returns the corresponding stream object:

func main() {
	//New grpc connection
	grpcConn, err := grpc.Dial(":5902", grpc.WithInsecure())
	if err != nil {
		log.Fatalln(err)
	}
	defer grpcConn.Close()

	//Generate a client object through connection.
	c := proto.NewStreamClient(grpcConn)

	//Set timeout
	//ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	//defer cancel()
	ctx := context.Background()

	
	// Server-side streaming: call GetStream and read the data the server pushes
	log.Println("GetStream:")
	getClient, err := c.GetStream(ctx, &proto.MsgRequest{Data: "Get Time"})
	if err != nil {
		log.Fatalln(err)
		return
	}
	for {
		aa, err := getClient.Recv()
		if err != nil {
			if err == io.EOF {
				log.Printf("[INFO] recv end")
			}
			break
		}
		log.Println("Get Res Data: " + aa.Data)
	}

	
	//Client push stream
	log.Println("PutStream:")
	putClient, err := c.PutStream(ctx)
	if err != nil {
		log.Fatalln(err)
		return
	}
	i := 1
	for i < 4 {
		i++
		var putData = proto.MsgRequest{Data: "Put " + strconv.Itoa(i) + " "}
		log.Println("Put Req Data: " + putData.Data)
		putClient.Send(&putData)
		time.Sleep(time.Second)
	}
	putRes, err := putClient.CloseAndRecv()
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("Put Done. Res is %v", putRes.Data)

	
	// Bidirectional stream
	log.Println("BiStream:")
	// Channel used to wait until sending has finished
	done := make(chan struct{})
	biClient, err := c.BiStream(ctx)
	if err != nil {
		log.Fatalln(err)
		return
	}
	go func() {
		for {
			biRes, err := biClient.Recv()
			if err != nil {
				return
			} else {
				log.Println("Bi Res Data: " + biRes.Data)
			}
		}
	}()

	go func() {
		i := 1
		for i < 4 {
			i++
			biReq := proto.MsgRequest{Data: "send " + strconv.Itoa(i) + " "}
			log.Println("Bi Req Data: " + biReq.Data)
			biClient.Send(&biReq)
			time.Sleep(time.Second)
		}
		biClient.CloseSend()
		done <- struct{}{}
	}()

	<-done
	log.Println("All Done.")
}

proto3

proto3 is the language used to structure protocol buffer data. It covers the .proto file syntax and how to generate data access classes from .proto files.

All data structures are defined by message:

  • Each field must specify a type (a scalar type or another message);
  • Each field is assigned a unique field number, which must not change once the message type is in use:
    • Field numbers 1 to 15 take one byte to encode;
    • Field numbers 16 to 2047 take two bytes to encode;
    • Field numbers range from 1 to 536870911 (2^29 - 1). The numbers 19000 to 19999 are reserved for the Protocol Buffers implementation and cannot be used.
  • Fields follow one of two rules:
    • Singular: zero or one value; this need not be stated in the field definition;
    • Repeated: zero or more values; this must be stated in the field definition.
  • Default values. When a message is parsed, a field that was not assigned is set to its default value:
    • string: empty string
    • bytes: empty byte sequence
    • bool: false
    • Numeric types: 0
    • Enum types: the first value defined in the enum, which must be 0
    • Message types: depends on the target language
    • Repeated fields: empty list.
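
The one-byte and two-byte limits for field numbers follow from how a field key is encoded: the field number is shifted left by three bits, combined with a 3-bit wire type, and written as a varint. Here is a small check using only the standard library. The tagSize helper is a hypothetical name introduced for this sketch, and encoding/binary's unsigned varint is assumed to match the protobuf varint format.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// tagSize returns how many bytes the key of a field occupies on the wire.
// The key is the varint encoding of (fieldNumber << 3) | wireType.
func tagSize(fieldNumber, wireType uint64) int {
	buf := make([]byte, binary.MaxVarintLen64)
	return binary.PutUvarint(buf, fieldNumber<<3|wireType)
}

func main() {
	// Wire type 0 (varint) is used here; the 3-bit wire type does not
	// change the size boundaries.
	for _, n := range []uint64{1, 15, 16, 2047, 2048} {
		fmt.Printf("field %d -> %d byte(s)\n", n, tagSize(n, 0))
	}
}
```

This is why the smallest field numbers are usually given to the fields that occur most often.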

data type

The message scalar field can be one of the following types:

| .proto   | C++    | Java       | Python      | Go      | C#         | Notes |
|----------|--------|------------|-------------|---------|------------|-------|
| double   | double | double     | float       | float64 | double     | |
| float    | float  | float      | float       | float32 | float      | |
| int32    | int32  | int        | int         | int32   | int        | Uses variable-length encoding. Inefficient for negative values; if the field is likely to hold negative values, use sint32 instead. |
| int64    | int64  | long       | int/long    | int64   | long       | Uses variable-length encoding. Inefficient for negative values; if the field is likely to hold negative values, use sint64 instead. |
| uint32   | uint32 | int        | int/long    | uint32  | uint       | Uses variable-length encoding. |
| uint64   | uint64 | long       | int/long    | uint64  | ulong      | Uses variable-length encoding. |
| sint32   | int32  | int        | int         | int32   | int        | Uses variable-length encoding. Signed value; encodes negative numbers much more efficiently than int32. |
| sint64   | int64  | long       | int/long    | int64   | long       | Uses variable-length encoding. Signed value; encodes negative numbers more efficiently than int64. |
| fixed32  | uint32 | int        | int         | uint32  | uint       | Always 4 bytes. More efficient than uint32 if values are often greater than 2^28. |
| fixed64  | uint64 | long       | int/long    | uint64  | ulong      | Always 8 bytes. More efficient than uint64 if values are often greater than 2^56. |
| sfixed32 | int32  | int        | int         | int32   | int        | Always 4 bytes. |
| sfixed64 | int64  | long       | int/long    | int64   | long       | Always 8 bytes. |
| bool     | bool   | boolean    | bool        | bool    | bool       | |
| string   | string | String     | str/unicode | string  | string     | A string must contain UTF-8 encoded or 7-bit ASCII text. |
| bytes    | string | ByteString | str         | []byte  | ByteString | May contain any arbitrary sequence of bytes. |
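
The notes on variable-length encoding can be checked with a short sketch. It assumes that encoding/binary's unsigned varint matches the protobuf varint format, that int32 values are sign-extended to 64 bits before encoding, and that sint32 uses ZigZag encoding. The helper names varintLen, int32Len and sint32Len are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// varintLen returns the number of bytes needed to encode v as a varint.
func varintLen(v uint64) int {
	buf := make([]byte, binary.MaxVarintLen64)
	return binary.PutUvarint(buf, v)
}

// int32Len is the encoded size of an int32 field value: negative numbers
// are sign-extended to 64 bits, so they always need the maximum length.
func int32Len(v int32) int {
	return varintLen(uint64(int64(v)))
}

// sint32Len is the encoded size of a sint32 field value. ZigZag maps
// 0, -1, 1, -2, ... to 0, 1, 2, 3, ... so small negatives stay small.
func sint32Len(v int32) int {
	zz := uint32(v<<1) ^ uint32(v>>31)
	return varintLen(uint64(zz))
}

func main() {
	fmt.Println("int32  -1:", int32Len(-1), "bytes")
	fmt.Println("sint32 -1:", sint32Len(-1), "bytes")
	// From 2^28 upward a uint32 varint needs 5 bytes, while fixed32 always takes 4.
	fmt.Println("uint32 2^28-1:", varintLen(1<<28-1), "bytes")
	fmt.Println("uint32 2^28:  ", varintLen(1<<28), "bytes")
}
```

The same reasoning applies to the 64-bit types: a varint carries 7 bits per byte, so from 2^56 upward a uint64 needs at least 9 bytes and fixed64 becomes the smaller choice.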

Tags: Go rpc grpc

Posted by avincent on Sun, 17 Apr 2022 17:56:20 +0930