Back to Posts

使用gRPC创建简单聊天程序

Posted in Tech

作为一篇gRPC调研的总结,本篇主要介绍了gRPC在go语言上的使用方式,并着重记录了如何使用gRPC创建一个简单的聊天程序。

gRPC介绍

gRPC是Google开源的一款远程进程调用(RPC)框架。它使用http/2传输协议,使用Protocol Buffer作为编码协议。

这使得框架本身就带有全双工,消息压缩,节省带宽,跨语言等特性。

环境准备

在安装好go语言环境的情况下,还要额外安装以下工具和库。

获取gRPC库

go get -u google.golang.org/grpc

安装Protocol Buffer v3相关工具

下载protoc编译工具

可以在github上下载现成的二进制包。

解压,并将protoc命令工具移入PATH环境变量中。

下载go语言的protoc插件

protoc需要对应语言的插件才能把pb文件转换成对应的语言,所以下载完protoc之后,需要下载pb的go语言插件:

go get -u github.com/golang/protobuf/protoc-gen-go

同样,将protoc-gen-go加入到PATH环境变量。

pb文件定义

简单例子

pb中可以使用service关键字定义对应的RPC服务集合,并使用rpc关键字定义对应的方法名、接收参数和返回参数;

例如,我们要定义聊天服务,并定义一个交换姓名的RPC接口,可以定义如下pb文件:

service ChatService{
	rcp ExchangeNames(Names) returns (Names);
}

message Names {
	string firstName = 1;
	string secondName = 2;
}

以上定义了一个聊天的服务ChatService,这个服务中有一个方法叫ExchangeNames,是用来交换用户的名字信息,它接收和返回的Message都为Names类型。

不同的方法类型

Unary RPCs

以上定义的ExchangeNames是最简单的一种方法类型,称之为一元RPC(Unary RPCs);

这类方法十分简单,接收一个参数,然后返回另外一个参数,与平常的方法调用无异。

Stream RPCs

grpc支持流类型(stream),不管是参数还是返回值。

如下发送和接收对方的聊天用户列表:

service ChatService{
	rpc GetList(Count) returns (stream Names);
	rpc SendList(stream Names) returns (Count);
}

message Count {
    int32 count = 1;
    int32 offset = 2;
}
Bidirectional streaming RPCs

如果接收参数和发送参数都为流,则这种方法被称之为Bidirectional streaming RPCs。

例如聊天接口定义如下:

service ChatService{
	rpc Chat(stream Msg) returns (stream Msg);
}
message Msg {
	string sender;
	string msg;
}

编译pb文件到go代码

以以上的pb文件定义的service作为例子,一步一步实现这些gRPC服务。

创建项目

在GOPATH的src下建立目录grpc_chat作为项目的根目录,我们要在这个目录下依次建立聊天的服务端程序和客户端程序。

创建目录结构
grpc_chat
├── client
├── grpc_chat
└── server

其中grpc_chat为pb文件和对应的编译完成的go文件所在目录;

client和server分别为客户端程序和服务端程序。

创建pb文件

我们将之前介绍的pb文件写入grpc_chat/grcp_chat.proto文件中:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.grpc_chat";
option java_outer_classname = "GrpcChat";

package grpc_chat;

service ChatService{
	rpc ExchangeNames(Names) returns (Names);
	rpc GetList(Count) returns (stream Names);
	rpc SendList(stream Names) returns (Count);
	rpc Chat(stream Msg) returns (stream Msg);
}

message Count {
    int32 count = 1;
    int32 offset = 2;
}

message Names {
	string firstName = 1;
	string secondName = 2;
}

message Msg {
	string sender = 1;
	string msg = 2;
}

其中前置的几行内容是protobuf编译各个语言所需求要的额外信息;

synctax = "proto3"表示使用3版本的语法编译,而option中的内容为java所需要的额外信息。

编译pb文件

进入项目根目录,执行以下命令:

protoc -I ./grpc_chat --go_out=plugins=grpc:./grpc_chat ./grpc_chat/grpc_chat.proto

以上命令也可以使用go:generate来实现。

关于protc和protoc_gen_go这两个程序的参数可以分别参考这里这里

运行完成之后,会在grpc_chat目录下生成grpc_chat.pb.go文件,这样编译protobuf的工作就完成了。

创建grpc server

在创建grpc服务之前,我们需要一个可以用来服务的结构体:

import (
	pb "grpc_chat/grpc_chat"
	"context"
	"log"
)

var MyNames = &pb.Names{}

var MyFriends = []*pb.Names{}

type chatService struct{}

func (cs *chatService) ExchangeNames(ctx context.Context, names *pb.Names) (*pb.Names, error){
	MyFriends = append(MyFriends, names);
	log.Printf("%s %s is comming for exchange names\n", names.FirstName, names.SecondName)
	return MyNames, nil;
}

其中方法签名涵意一会儿会细说。

接下来,我们使用这个chatService起一个grpc服务:

import (
	pb "grpc_chat/grpc_chat"
	"context"
	"log"
	"google.golang.org/grpc/reflection"
	"net"
	"google.golang.org/grpc"
)

var Listen = ":8080"

func startService(){
	lis, err := net.Listen("tcp", ":17000")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterChatServiceServer(s, &chatService{})
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

func init() {
	flag.StringVar(&MyNames.FirstName, "fn", "", "first name")
	flag.StringVar(&MyNames.SecondName, "sn", "", "second name")
	flag.StringVar(&Listen, "l", ":8080", "listen address")
	flag.Parse()
}

func main() {
	go startService();
	select {};
}

虽然我们已经写完了主要代码,但目前程序还跑不起来,因为chatService只实现了我们定义的一个方法,其它三个方法还没有实现。

可以通过查看生成的grpc_chat/grcp_chat.pb.go来查看其它三个函数的签名:

type ChatServiceServer interface {
	ExchangeNames(context.Context, *Names) (*Names, error)
	GetList(*Count, ChatService_GetListServer) error
	SendList(ChatService_SendListServer) error
	Chat(ChatService_ChatServer) error
}

所以我们把剩下的三个方法定义完成:

func (cs *chatService)GetList(c *pb.Count, gls pb.ChatService_GetListServer)error{
	// todo
	return nil
}
func (cs *chatService)SendList(sls pb.ChatService_SendListServer)error {
	// todo
	return nil
}

func (cs *chatService)Chat(scs pb.ChatService_ChatServer) error{
	// todo
	return nil
}

我们把之前的内容添加到server/main.go文件里,那么go run server/main.go之后,我们就起了一个grpc的service。

创建gprc client

package main

import (
	"google.golang.org/grpc"
	"flag"
	pb "grpc_chat/grpc_chat"
	"log"
	"time"
	"context"
)
var (
	Server string
	MyNames = &pb.Names{}
	client pb.ChatServiceClient
)

func init(){
	flag.StringVar(&Server, "server", "127.0.0.1:8080", "grpc server address")
	flag.StringVar(&MyNames.FirstName, "fn", "", "first name")
	flag.StringVar(&MyNames.SecondName, "sn", "", "second name")
	flag.Parse()

	if conn, err := grpc.Dial(Server, grpc.WithInsecure()); err != nil {
		log.Fatal("connect to server failed", err)
	}else{
		client = pb.NewChatServiceClient(conn)
	}
}

func main(){
	ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
	defer cancel()
	sNames, err := client.ExchangeNames(ctx, MyNames);
	if err != nil {
		log.Fatalf("%v.ExchangeNames(_) = _, %v", client, err)
	}
	log.Printf("greet ok, server name: %s %s\n", sNames.FirstName, sNames.SecondName)
}

client调用服务的代码很简单,首先创建一个连接,使用pb.NewChatServiceClient(conn)即创建一个client。

再在这个client上就可以调用我们之前定义的四个方法了。

这个例子中,client只调用了ExchangeNames方法。

运行测试

开两个终端,先在一个终端上运行server:

go run server/main.go -fn=lee -sn=server

再在另外一个终端上运行client:

go run client/main.go -fn=lee -sn=client

顺利的话,可以看到server输出:

2018/04/30 13:26:53 lee client is comming for exchange names

在client端看到:

2018/04/30 13:26:53 greet ok, server name: lee server

所以对于Unary grpcs,实现起来比较简单。而且调用远程的函数与调用本地函数没有多大的差别,这就体现了rpc的优势,只需要知道服务端的函数签名和服务地址,剩下的就像调用本地函数一样调用。

完善成聊天程序

我们预想的聊天程序是这样,分成服务端和客户端两部分,服务端负责连接各个客户端,并转发各端间的消息,当然服务端也可以发送消息到各个client。

客户端通过在命令很输入不同的指令,可以发送消息,获取现有的好友列表等,并且客户端把接收到的消息在终端上输出。

服务端的实现

之前还差三个服务端函数没有实现,现在依次实现。

GetList

GetList函数代表了4种rpc类型中服务端发送结果为流(server-side streaming)的情况:

func (cs *chatService) GetList(c *pb.Count, gls pb.ChatService_GetListServer) error {
	MyFriendsLock.RLock();
	defer MyFriendsLock.RUnlock();
	if c.Offset > int32(len(MyFriends)) {
		return errors.New("no more friends")
	}
	count := c.Offset+c.Count;
	if count > int32(len(MyFriends)){
		count = int32(len(MyFriends)
	}
	sendNames := MyFriends[c.Offset:count]
	for _, names := range sendNames {
		if err := gls.Send(names); err != nil{
			return err;
		}
	}
	return nil
}
SendList

SendList代表了客户端发送参数为流(client-side streaming)的情况


// 实际无意义,用来接收客户端发上来的列表
func (cs *chatService) SendList(sls pb.ChatService_SendListServer) error {
	var ReciveNames []*pb.Names;
	for {
		names, err := sls.Recv()
		if err == io.EOF {
			return sls.SendAndClose(&pb.Count{
				int32(len(ReciveNames)),
				0,
			})
		}
		if err != nil {
			return err
		}
		ReciveNames = append(ReciveNames, names)
		log.Println("send :", names.FirstName, names.SecondName)
	}
}

在这个例子中,SendList并没有实质意义,只是为了演示一种rpc的类型。

Chat

Chat函数为双向流rpc(Bidirectional streaming RPC):

// 用于记录有多少客户端在chat
type Clients struct {
	sync.RWMutex
	Msg map[pb.ChatService_ChatServer]chan *pb.Msg
}
var clients = &Clients{Msg: map[pb.ChatService_ChatServer]chan *pb.Msg{}}

func (c *Clients) add(scs pb.ChatService_ChatServer, sendMsg chan *pb.Msg) {
	clients.Lock()
	clients.Msg[scs] = sendMsg
	clients.Unlock()
}

func (c *Clients) del(scs pb.ChatService_ChatServer) {
	clients.Lock()
	delete(clients.Msg, scs)
	clients.Unlock()
}

// 聊天双向流rpc
func (cs *chatService) Chat(scs pb.ChatService_ChatServer) error {
	sendMsg := make(chan *pb.Msg, 10)
	clients.add(scs, sendMsg)
	defer func() {
		clients.del(scs)
	}()
	waitc := make(chan struct{})
	go func() {
		for {
			in, err := scs.Recv()
			if err == io.EOF {
				close(waitc)
				return
			}
			if err != nil {
				log.Printf("receive error : %v\n", err)
				close(waitc)
				return
			}
			log.Printf(">>>>%s:%s\n", in.GetSender(), in.GetMsg())
			broadMsg(in)
		}
	}()
loop2:
	for {
		select {
		case s := <-sendMsg:
			scs.Send(s)
		case <-waitc:
			break loop2
		}
	}
	return nil
}
小结

从以上4个例子可以知道,对于server端来说:

  • 在发送参数和接口参数都不为流时,函数签名与pb中申明大体一致,只是接收参数的第一个位置多了一个context;

  • 当为server-side stream时,参数中会带有一个可以send对应类型的stream,在实现中多次调用send方法;

  • 当为client-side stream时,参数中会带有一个可以Recv对应类型的stream,在实现中多次调用Recv方法获取客户端流内容,在读取到EOF时,调用SendAndClose方法,将返回值写回客户端;

  • 当两端都为流时,可以开启读写协程,来各自接收和发送对应内容。

客户端实现

客户端实现与服务端大体一致,原理上其实在传输数据阶段,两端的地位是一致的,所以对应的实现也大体一致,不再细说:


var sendMsg = make(chan string, 10);

func chat() {
	stream, err := client.Chat(context.Background())
	if err != nil {
		log.Fatalf("%v.Chat(_) = _, %v", client, err)
	}
	waitc := make(chan struct{})
	// 读协程
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("Failed to receive a msg : %v", err)
			}
			log.Printf(">>>>%s:%s", in.GetSender(), in.GetMsg())
		}
	}()
	// 写协程
loop2:
	for {
		select {
		case s := <-sendMsg:
			if s == "exit" {
				stream.CloseSend();
				return;
			}
			stream.Send(&pb.Msg{
				MyNames.FirstName + "." + MyNames.SecondName,
				s,
			})
		case <-waitc:
			break loop2
		}
	}
}

func get() {
	stream, err := client.GetList(context.Background(), &pb.Count{10, 0});
	if err != nil {
		log.Fatalf("getlist error :%v", err)
	}
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Printf("Failed to receive a list : %v", err)
			return
		}
		log.Printf("==names===%s:%s", in.FirstName, in.SecondName)
	}
}
func send() {
	stream, err := client.SendList(context.Background())
	if err != nil {
		log.Fatalf("send list error :%v", err)
	}
	for i := 0; i < 10; i++ {
		err := stream.Send(&pb.Names{strconv.Itoa(i), strconv.Itoa(i)})
		if err != nil {
			log.Print("send error:", err)
			return
		}
	}
	c, err := stream.CloseAndRecv()
	if err != nil {
		log.Println("closeAndRecv failed:", err)
	} else {
		log.Println("send success", c.Count, c.Offset);
	}
}
func input() {
	for {
		var cmd, s string
		fmt.Scan(&cmd, &s)
		fmt.Println(cmd, s);
		switch cmd {
		case "get":
			get()
		case "send":
			send()
		case "msg":
			sendMsg <- s;
		case "exit":
			os.Exit(0);
		}
	}
}

以上程序代码,可以在github上找到,

运行效果

以下为开了一个server和三个client的运行效果: grpc_chat

添加证书校验与身份校验

添加TLS认证

生成私钥和自签名证书

生成私钥:

openssl genrsa -out server.key 2048

生成自签名证书:

openssl req -new -x509 -sha256 -key server.key -out server.pem -days 3650

这样,在当前目录下,我们就有了私钥(server.key)和公钥(server.pem)。

服务中添加认证
package main

import (
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc"
	"log"
	pb "grpc_chat/grpc_chat"

	"net"
	"google.golang.org/grpc/grpclog"
	"context"
	"fmt"
	"time"
)

type tlsService struct{}

func (ts *tlsService) SendMsg(ctx context.Context, msg *pb.Msg) (*pb.Msg, error) {
	log.Printf("%s >>> %s\n", msg.Sender, msg.Msg);
	return &pb.Msg{"server", "receive"}, nil
}

func server() {
	listen, err := net.Listen("tcp", ":8080")
	if err != nil {
		grpclog.Fatalf("failed to listen: %v", err)
	}

	// TLS认证
	creds, err := credentials.NewServerTLSFromFile("./server.pem", "./server.key")
	if err != nil {
		grpclog.Fatalf("Failed to generate credentials %v", err)
	}

	// 实例化grpc Server, 并开启TLS认证
	s := grpc.NewServer(grpc.Creds(creds))

	// 注册HelloService
	pb.RegisterTlsServiceServer(s, &tlsService{})

	go s.Serve(listen)
}

func client() {
	creds, err := credentials.NewClientTLSFromFile("./server.pem", "a.com");
	if err != nil {
		log.Fatalln("NewClientTLSFromFile error", err)
	}
	conn, _ := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(creds))
	// error handling omitted
	client := pb.NewTlsServiceClient(conn)
	msg, err := client.SendMsg(context.Background(), &pb.Msg{"client", "hello server"})
	fmt.Println(msg, err)
}

func main() {
	server();
	time.Sleep(time.Second)
	client();
}

可以看出,只是在NewServer和Dial时添加一个选项。

基于TLS的加密称之为Channel加密,他是针对这条连接上的所有请求。面之后要介绍的认证,是对每一次的请求做校验。

运行
➜  grpc_chat go run tls/main.go
2018/05/01 00:53:28 client >>> hello server
sender:"server" msg:"receive"  <nil>

添加身份验证

gRPC身份认证的原理是,客户端将信息通过以metadata的形式传递给服务端;服务端使用Interceptor在每一次请求到来的时候校验这些metadata信息,以完成身份认证:

在客户端请求的时候带上metadata
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("auth", "pass1234"))
	msg, err := client.SendMsg(ctx, &pb.Msg{"client", "hello server"})
在服务端认证metadata
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	md, ok := metadata.FromIncomingContext(ctx);
	if !ok {
		return nil, errors.New("unaryInterceptor FromIncomingContext not ok ")
	}else if pass, ok := md["pass"]; !ok || pass[0] != "pass1234" {
		return nil, errors.New("auth failed");
	}
	return handler(ctx, req)
}

将以上定义的拦截器注册到对应的server中:

	s := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(unaryInterceptor))

如果客户端的请求中未带pass这一metadata,则调用方法时,会返回报错:rpc error: code = Unknown desc = auth failed。

在PHP中使用gRPC客户端请求go服务

准备工作

安装gRPC和protobuf扩展
$ sudo pecl install grpc
$ sudo pecl install protobuf
安装编译pb所要使用的protoc的PHP插件
$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
$ cd grpc
$ git submodule update --init
$ make grpc_php_plugin

安装完成后将目录下的bins/opt/grpc_php_plugin复制到一个方便方便获取的位置,这个位置在后面还需要用到,我把他拷贝到了${HOME}/Bin目录下。

创建项目依赖

在之前的grpc_chat项目目录下,创建名为php的目录,并做为php项目的根目录。

在项目下添加composer.json,将需要要到的grpc和protobuf加入到项目依赖中,整个文件看起来是这样:

{
    "name": "grpc_chat/php",
    "require": {
        "grpc/grpc": "^1.10",
        "google/protobuf": "^v3.3.0"

    },
    "authors": [
        {
            "name": "xiaochai",
            "email": "soso2501@gmail.com"
        }
    ]
}

执行composer install

编译pb文件到PHP代码

➜  protoc --proto_path=./grpc_chat/   --php_out=./php   --grpc_out=./php   --plugin=protoc-gen-grpc=${HOME}/Bin/grpc_php_plugin ./grpc_chat/tls.proto ./grpc_chat/grpc_chat.proto

这样,就在php目录下生成了GPBMetadata和Grpc_chat两个目录

编写client代码

在php目录下创建main.php:

<?php
include "vendor/autoload.php";

include(__DIR__ . "/Grpc_chat/ChatServiceClient.php");
include(__DIR__ . "/Grpc_chat/Count.php");
include(__DIR__ . "/Grpc_chat/Msg.php");
include(__DIR__ . "/Grpc_chat/Names.php");
include(__DIR__ . "/Grpc_chat/TlsServiceClient.php");
include(__DIR__ . "/GPBMetadata/GrpcChat.php");
include(__DIR__ . "/GPBMetadata/Tls.php");


$client = new Grpc_chat\ChatServiceClient("127.0.0.1:8080", ["credentials" => Grpc\ChannelCredentials::createInsecure()]);
$p = new Grpc_chat\Names();
$p->setFirstName("LL");
$p->setSecondName("EEE");

list($reply, $status) = $client->ExchangeNames($p)->wait();
var_dump($status, $reply->getFirstName(), $reply->getSecondName());

先使用go起一个服务端:

go run server/main.go -fn=lee -sn=server

然后执行main.php文件:

➜  php php main.php
class stdClass#13 (3) {
  public $metadata =>
  array(0) {
  }
  public $code =>
  int(0)
  public $details =>
  string(0) ""
}
string(3) "lee"
string(6) "server"

达到预期的结果。

添加认证信息

<?php

include "vendor/autoload.php";

include(__DIR__ . "/Grpc_chat/ChatServiceClient.php");
include(__DIR__ . "/Grpc_chat/Count.php");
include(__DIR__ . "/Grpc_chat/Msg.php");
include(__DIR__ . "/Grpc_chat/Names.php");
include(__DIR__ . "/Grpc_chat/TlsServiceClient.php");
include(__DIR__ . "/GPBMetadata/GrpcChat.php");
include(__DIR__ . "/GPBMetadata/Tls.php");


$override = "a.com";
$tlsClient = new Grpc_chat\TlsServiceClient("127.0.0.1:8080",
    [
        "credentials" => Grpc\ChannelCredentials::createSsl(file_get_contents(__DIR__ . "/../server.pem")),
        'grpc.ssl_target_name_override' => $override,
        'grpc.default_authority' => $override,
    ]
);

$msg = new Grpc_chat\Msg();
$msg->setSender("phpclient");
$msg->setMsg("hello");
list($reply, $status) = $tlsClient->SendMsg($msg, ["auth"=>["pass1234"]])->wait();
var_dump($reply->getSender(), $reply->getMsg(), $status);

运行结果:

➜  php php tls.php
string(6) "server"
string(7) "receive"
class stdClass#13 (3) {
  public $metadata =>
  array(0) {
  }
  public $code =>
  int(0)
  public $details =>
  string(0) ""
}

使用流程与go语言类似,需要注意的是TLS需要servername 信息,这块如果不填写(即grpc.ssl_target_name_overridegrpc.default_authority选项),则会报连接失败,导致问题定位困难。


参考

Golang gRPC实践 连载四 gRPC认证

openssl genrsa能够单独生成私钥还能推导出公钥的原因

grpc-golang实现账号and密码认证

完整代码

Read Next

加权随机抽样