作为一篇gRPC调研的总结,本篇主要介绍了gRPC在go语言上的使用方式,并着重记录了如何使用gRPC创建一个简单的聊天程序。
gRPC介绍
gRPC是Google开源的一款远程进程调用(RPC)框架。它使用http/2传输协议,使用Protocol Buffer作为编码协议。
这使得框架本身就带有全双工,消息压缩,节省带宽,跨语言等特性。
环境准备
在安装好go语言环境的情况下,还要额外安装以下工具和库。
获取gRPC库
go get -u google.golang.org/grpc
安装Protocol Buffer v3相关工具
下载protoc编译工具
可以在github上下载现成的二进制包。
解压,并将protoc命令工具移入PATH环境变量中。
下载go语言的protoc插件
protoc需要对应语言的插件才能把pb文件转换成对应的语言,所以下载完protoc之后,需要下载pb的go语言插件:
go get -u github.com/golang/protobuf/protoc-gen-go
同样,将protoc-gen-go加入到PATH环境变量。
pb文件定义
简单例子
pb中可以使用service关键字定义对应的RPC服务集合,并使用rpc关键字定义对应的方法名、接收参数和返回参数;
例如,我们要定义聊天服务,并定义一个交换姓名的RPC接口,可以定义如下pb文件:
service ChatService{
rcp ExchangeNames(Names) returns (Names);
}
message Names {
string firstName = 1;
string secondName = 2;
}
以上定义了一个聊天的服务ChatService,这个服务中有一个方法叫ExchangeNames,是用来交换用户的名字信息,它接收和返回的Message都为Names类型。
不同的方法类型
Unary RPCs
以上定义的ExchangeNames是最简单的一种方法类型,称之为一元RPC(Unary RPCs);
这类方法十分简单,接收一个参数,然后返回另外一个参数,与平常的方法调用无异。
Stream RPCs
grpc支持流类型(stream),不管是参数还是返回值。
如下发送和接收对方的聊天用户列表:
service ChatService{
rpc GetList(Count) returns (stream Names);
rpc SendList(stream Names) returns (Count);
}
message Count {
int32 count = 1;
int32 offset = 2;
}
Bidirectional streaming RPCs
如果接收参数和发送参数都为流,则这种方法被称之为Bidirectional streaming RPCs。
例如聊天接口定义如下:
service ChatService{
rpc Chat(stream Msg) returns (stream Msg);
}
message Msg {
string sender;
string msg;
}
编译pb文件到go代码
以以上的pb文件定义的service作为例子,一步一步实现这些gRPC服务。
创建项目
在GOPATH的src下建立目录grpc_chat作为项目的根目录,我们要在这个目录下依次建立聊天的服务端程序和客户端程序。
创建目录结构
grpc_chat
├── client
├── grpc_chat
└── server
其中grpc_chat为pb文件和对应的编译完成的go文件所在目录;
client和server分别为客户端程序和服务端程序。
创建pb文件
我们将之前介绍的pb文件写入grpc_chat/grcp_chat.proto文件中:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.grpc_chat";
option java_outer_classname = "GrpcChat";
package grpc_chat;
service ChatService{
rpc ExchangeNames(Names) returns (Names);
rpc GetList(Count) returns (stream Names);
rpc SendList(stream Names) returns (Count);
rpc Chat(stream Msg) returns (stream Msg);
}
message Count {
int32 count = 1;
int32 offset = 2;
}
message Names {
string firstName = 1;
string secondName = 2;
}
message Msg {
string sender = 1;
string msg = 2;
}
其中前置的几行内容是protobuf编译各个语言所需求要的额外信息;
如synctax = "proto3"
表示使用3版本的语法编译,而option中的内容为java所需要的额外信息。
编译pb文件
进入项目根目录,执行以下命令:
protoc -I ./grpc_chat --go_out=plugins=grpc:./grpc_chat ./grpc_chat/grpc_chat.proto
以上命令也可以使用go:generate来实现。
关于protc和protoc_gen_go这两个程序的参数可以分别参考这里和这里
运行完成之后,会在grpc_chat目录下生成grpc_chat.pb.go文件,这样编译protobuf的工作就完成了。
创建grpc server
在创建grpc服务之前,我们需要一个可以用来服务的结构体:
import (
pb "grpc_chat/grpc_chat"
"context"
"log"
)
var MyNames = &pb.Names{}
var MyFriends = []*pb.Names{}
type chatService struct{}
func (cs *chatService) ExchangeNames(ctx context.Context, names *pb.Names) (*pb.Names, error){
MyFriends = append(MyFriends, names);
log.Printf("%s %s is comming for exchange names\n", names.FirstName, names.SecondName)
return MyNames, nil;
}
其中方法签名涵意一会儿会细说。
接下来,我们使用这个chatService起一个grpc服务:
import (
pb "grpc_chat/grpc_chat"
"context"
"log"
"google.golang.org/grpc/reflection"
"net"
"google.golang.org/grpc"
)
var Listen = ":8080"
func startService(){
lis, err := net.Listen("tcp", ":17000")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterChatServiceServer(s, &chatService{})
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
func init() {
flag.StringVar(&MyNames.FirstName, "fn", "", "first name")
flag.StringVar(&MyNames.SecondName, "sn", "", "second name")
flag.StringVar(&Listen, "l", ":8080", "listen address")
flag.Parse()
}
func main() {
go startService();
select {};
}
虽然我们已经写完了主要代码,但目前程序还跑不起来,因为chatService只实现了我们定义的一个方法,其它三个方法还没有实现。
可以通过查看生成的grpc_chat/grcp_chat.pb.go来查看其它三个函数的签名:
type ChatServiceServer interface {
ExchangeNames(context.Context, *Names) (*Names, error)
GetList(*Count, ChatService_GetListServer) error
SendList(ChatService_SendListServer) error
Chat(ChatService_ChatServer) error
}
所以我们把剩下的三个方法定义完成:
func (cs *chatService)GetList(c *pb.Count, gls pb.ChatService_GetListServer)error{
// todo
return nil
}
func (cs *chatService)SendList(sls pb.ChatService_SendListServer)error {
// todo
return nil
}
func (cs *chatService)Chat(scs pb.ChatService_ChatServer) error{
// todo
return nil
}
我们把之前的内容添加到server/main.go文件里,那么go run server/main.go之后,我们就起了一个grpc的service。
创建gprc client
package main
import (
"google.golang.org/grpc"
"flag"
pb "grpc_chat/grpc_chat"
"log"
"time"
"context"
)
var (
Server string
MyNames = &pb.Names{}
client pb.ChatServiceClient
)
func init(){
flag.StringVar(&Server, "server", "127.0.0.1:8080", "grpc server address")
flag.StringVar(&MyNames.FirstName, "fn", "", "first name")
flag.StringVar(&MyNames.SecondName, "sn", "", "second name")
flag.Parse()
if conn, err := grpc.Dial(Server, grpc.WithInsecure()); err != nil {
log.Fatal("connect to server failed", err)
}else{
client = pb.NewChatServiceClient(conn)
}
}
func main(){
ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second)
defer cancel()
sNames, err := client.ExchangeNames(ctx, MyNames);
if err != nil {
log.Fatalf("%v.ExchangeNames(_) = _, %v", client, err)
}
log.Printf("greet ok, server name: %s %s\n", sNames.FirstName, sNames.SecondName)
}
client调用服务的代码很简单,首先创建一个连接,使用pb.NewChatServiceClient(conn)
即创建一个client。
再在这个client上就可以调用我们之前定义的四个方法了。
这个例子中,client只调用了ExchangeNames方法。
运行测试
开两个终端,先在一个终端上运行server:
go run server/main.go -fn=lee -sn=server
再在另外一个终端上运行client:
go run client/main.go -fn=lee -sn=client
顺利的话,可以看到server输出:
2018/04/30 13:26:53 lee client is comming for exchange names
在client端看到:
2018/04/30 13:26:53 greet ok, server name: lee server
所以对于Unary grpcs,实现起来比较简单。而且调用远程的函数与调用本地函数没有多大的差别,这就体现了rpc的优势,只需要知道服务端的函数签名和服务地址,剩下的就像调用本地函数一样调用。
完善成聊天程序
我们预想的聊天程序是这样,分成服务端和客户端两部分,服务端负责连接各个客户端,并转发各端间的消息,当然服务端也可以发送消息到各个client。
客户端通过在命令很输入不同的指令,可以发送消息,获取现有的好友列表等,并且客户端把接收到的消息在终端上输出。
服务端的实现
之前还差三个服务端函数没有实现,现在依次实现。
GetList
GetList函数代表了4种rpc类型中服务端发送结果为流(server-side streaming)的情况:
func (cs *chatService) GetList(c *pb.Count, gls pb.ChatService_GetListServer) error {
MyFriendsLock.RLock();
defer MyFriendsLock.RUnlock();
if c.Offset > int32(len(MyFriends)) {
return errors.New("no more friends")
}
count := c.Offset+c.Count;
if count > int32(len(MyFriends)){
count = int32(len(MyFriends)
}
sendNames := MyFriends[c.Offset:count]
for _, names := range sendNames {
if err := gls.Send(names); err != nil{
return err;
}
}
return nil
}
SendList
SendList代表了客户端发送参数为流(client-side streaming)的情况
// 实际无意义,用来接收客户端发上来的列表
func (cs *chatService) SendList(sls pb.ChatService_SendListServer) error {
var ReciveNames []*pb.Names;
for {
names, err := sls.Recv()
if err == io.EOF {
return sls.SendAndClose(&pb.Count{
int32(len(ReciveNames)),
0,
})
}
if err != nil {
return err
}
ReciveNames = append(ReciveNames, names)
log.Println("send :", names.FirstName, names.SecondName)
}
}
在这个例子中,SendList并没有实质意义,只是为了演示一种rpc的类型。
Chat
Chat函数为双向流rpc(Bidirectional streaming RPC):
// 用于记录有多少客户端在chat
type Clients struct {
sync.RWMutex
Msg map[pb.ChatService_ChatServer]chan *pb.Msg
}
var clients = &Clients{Msg: map[pb.ChatService_ChatServer]chan *pb.Msg{}}
func (c *Clients) add(scs pb.ChatService_ChatServer, sendMsg chan *pb.Msg) {
clients.Lock()
clients.Msg[scs] = sendMsg
clients.Unlock()
}
func (c *Clients) del(scs pb.ChatService_ChatServer) {
clients.Lock()
delete(clients.Msg, scs)
clients.Unlock()
}
// 聊天双向流rpc
func (cs *chatService) Chat(scs pb.ChatService_ChatServer) error {
sendMsg := make(chan *pb.Msg, 10)
clients.add(scs, sendMsg)
defer func() {
clients.del(scs)
}()
waitc := make(chan struct{})
go func() {
for {
in, err := scs.Recv()
if err == io.EOF {
close(waitc)
return
}
if err != nil {
log.Printf("receive error : %v\n", err)
close(waitc)
return
}
log.Printf(">>>>%s:%s\n", in.GetSender(), in.GetMsg())
broadMsg(in)
}
}()
loop2:
for {
select {
case s := <-sendMsg:
scs.Send(s)
case <-waitc:
break loop2
}
}
return nil
}
小结
从以上4个例子可以知道,对于server端来说:
-
在发送参数和接口参数都不为流时,函数签名与pb中申明大体一致,只是接收参数的第一个位置多了一个context;
-
当为server-side stream时,参数中会带有一个可以send对应类型的stream,在实现中多次调用send方法;
-
当为client-side stream时,参数中会带有一个可以Recv对应类型的stream,在实现中多次调用Recv方法获取客户端流内容,在读取到EOF时,调用SendAndClose方法,将返回值写回客户端;
-
当两端都为流时,可以开启读写协程,来各自接收和发送对应内容。
客户端实现
客户端实现与服务端大体一致,原理上其实在传输数据阶段,两端的地位是一致的,所以对应的实现也大体一致,不再细说:
var sendMsg = make(chan string, 10);
func chat() {
stream, err := client.Chat(context.Background())
if err != nil {
log.Fatalf("%v.Chat(_) = _, %v", client, err)
}
waitc := make(chan struct{})
// 读协程
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a msg : %v", err)
}
log.Printf(">>>>%s:%s", in.GetSender(), in.GetMsg())
}
}()
// 写协程
loop2:
for {
select {
case s := <-sendMsg:
if s == "exit" {
stream.CloseSend();
return;
}
stream.Send(&pb.Msg{
MyNames.FirstName + "." + MyNames.SecondName,
s,
})
case <-waitc:
break loop2
}
}
}
func get() {
stream, err := client.GetList(context.Background(), &pb.Count{10, 0});
if err != nil {
log.Fatalf("getlist error :%v", err)
}
for {
in, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("Failed to receive a list : %v", err)
return
}
log.Printf("==names===%s:%s", in.FirstName, in.SecondName)
}
}
func send() {
stream, err := client.SendList(context.Background())
if err != nil {
log.Fatalf("send list error :%v", err)
}
for i := 0; i < 10; i++ {
err := stream.Send(&pb.Names{strconv.Itoa(i), strconv.Itoa(i)})
if err != nil {
log.Print("send error:", err)
return
}
}
c, err := stream.CloseAndRecv()
if err != nil {
log.Println("closeAndRecv failed:", err)
} else {
log.Println("send success", c.Count, c.Offset);
}
}
func input() {
for {
var cmd, s string
fmt.Scan(&cmd, &s)
fmt.Println(cmd, s);
switch cmd {
case "get":
get()
case "send":
send()
case "msg":
sendMsg <- s;
case "exit":
os.Exit(0);
}
}
}
以上程序代码,可以在github上找到,
运行效果
以下为开了一个server和三个client的运行效果:
添加证书校验与身份校验
添加TLS认证
生成私钥和自签名证书
生成私钥:
openssl genrsa -out server.key 2048
生成自签名证书:
openssl req -new -x509 -sha256 -key server.key -out server.pem -days 3650
这样,在当前目录下,我们就有了私钥(server.key)和公钥(server.pem)。
服务中添加认证
package main
import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc"
"log"
pb "grpc_chat/grpc_chat"
"net"
"google.golang.org/grpc/grpclog"
"context"
"fmt"
"time"
)
type tlsService struct{}
func (ts *tlsService) SendMsg(ctx context.Context, msg *pb.Msg) (*pb.Msg, error) {
log.Printf("%s >>> %s\n", msg.Sender, msg.Msg);
return &pb.Msg{"server", "receive"}, nil
}
func server() {
listen, err := net.Listen("tcp", ":8080")
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
// TLS认证
creds, err := credentials.NewServerTLSFromFile("./server.pem", "./server.key")
if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}
// 实例化grpc Server, 并开启TLS认证
s := grpc.NewServer(grpc.Creds(creds))
// 注册HelloService
pb.RegisterTlsServiceServer(s, &tlsService{})
go s.Serve(listen)
}
func client() {
creds, err := credentials.NewClientTLSFromFile("./server.pem", "a.com");
if err != nil {
log.Fatalln("NewClientTLSFromFile error", err)
}
conn, _ := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(creds))
// error handling omitted
client := pb.NewTlsServiceClient(conn)
msg, err := client.SendMsg(context.Background(), &pb.Msg{"client", "hello server"})
fmt.Println(msg, err)
}
func main() {
server();
time.Sleep(time.Second)
client();
}
可以看出,只是在NewServer和Dial时添加一个选项。
基于TLS的加密称之为Channel加密,他是针对这条连接上的所有请求。面之后要介绍的认证,是对每一次的请求做校验。
运行
➜ grpc_chat go run tls/main.go
2018/05/01 00:53:28 client >>> hello server
sender:"server" msg:"receive" <nil>
添加身份验证
gRPC身份认证的原理是,客户端将信息通过以metadata的形式传递给服务端;服务端使用Interceptor在每一次请求到来的时候校验这些metadata信息,以完成身份认证:
在客户端请求的时候带上metadata
ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("auth", "pass1234"))
msg, err := client.SendMsg(ctx, &pb.Msg{"client", "hello server"})
在服务端认证metadata
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx);
if !ok {
return nil, errors.New("unaryInterceptor FromIncomingContext not ok ")
}else if pass, ok := md["pass"]; !ok || pass[0] != "pass1234" {
return nil, errors.New("auth failed");
}
return handler(ctx, req)
}
将以上定义的拦截器注册到对应的server中:
s := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(unaryInterceptor))
如果客户端的请求中未带pass这一metadata,则调用方法时,会返回报错:rpc error: code = Unknown desc = auth failed。
在PHP中使用gRPC客户端请求go服务
准备工作
安装gRPC和protobuf扩展
$ sudo pecl install grpc
$ sudo pecl install protobuf
安装编译pb所要使用的protoc的PHP插件
$ git clone -b $(curl -L https://grpc.io/release) https://github.com/grpc/grpc
$ cd grpc
$ git submodule update --init
$ make grpc_php_plugin
安装完成后将目录下的bins/opt/grpc_php_plugin复制到一个方便方便获取的位置,这个位置在后面还需要用到,我把他拷贝到了${HOME}/Bin目录下。
创建项目依赖
在之前的grpc_chat项目目录下,创建名为php的目录,并做为php项目的根目录。
在项目下添加composer.json,将需要要到的grpc和protobuf加入到项目依赖中,整个文件看起来是这样:
{
"name": "grpc_chat/php",
"require": {
"grpc/grpc": "^1.10",
"google/protobuf": "^v3.3.0"
},
"authors": [
{
"name": "xiaochai",
"email": "soso2501@gmail.com"
}
]
}
执行composer install
。
编译pb文件到PHP代码
➜ protoc --proto_path=./grpc_chat/ --php_out=./php --grpc_out=./php --plugin=protoc-gen-grpc=${HOME}/Bin/grpc_php_plugin ./grpc_chat/tls.proto ./grpc_chat/grpc_chat.proto
这样,就在php目录下生成了GPBMetadata和Grpc_chat两个目录
编写client代码
在php目录下创建main.php:
<?php
include "vendor/autoload.php";
include(__DIR__ . "/Grpc_chat/ChatServiceClient.php");
include(__DIR__ . "/Grpc_chat/Count.php");
include(__DIR__ . "/Grpc_chat/Msg.php");
include(__DIR__ . "/Grpc_chat/Names.php");
include(__DIR__ . "/Grpc_chat/TlsServiceClient.php");
include(__DIR__ . "/GPBMetadata/GrpcChat.php");
include(__DIR__ . "/GPBMetadata/Tls.php");
$client = new Grpc_chat\ChatServiceClient("127.0.0.1:8080", ["credentials" => Grpc\ChannelCredentials::createInsecure()]);
$p = new Grpc_chat\Names();
$p->setFirstName("LL");
$p->setSecondName("EEE");
list($reply, $status) = $client->ExchangeNames($p)->wait();
var_dump($status, $reply->getFirstName(), $reply->getSecondName());
先使用go起一个服务端:
go run server/main.go -fn=lee -sn=server
然后执行main.php文件:
➜ php php main.php
class stdClass#13 (3) {
public $metadata =>
array(0) {
}
public $code =>
int(0)
public $details =>
string(0) ""
}
string(3) "lee"
string(6) "server"
达到预期的结果。
添加认证信息
<?php
include "vendor/autoload.php";
include(__DIR__ . "/Grpc_chat/ChatServiceClient.php");
include(__DIR__ . "/Grpc_chat/Count.php");
include(__DIR__ . "/Grpc_chat/Msg.php");
include(__DIR__ . "/Grpc_chat/Names.php");
include(__DIR__ . "/Grpc_chat/TlsServiceClient.php");
include(__DIR__ . "/GPBMetadata/GrpcChat.php");
include(__DIR__ . "/GPBMetadata/Tls.php");
$override = "a.com";
$tlsClient = new Grpc_chat\TlsServiceClient("127.0.0.1:8080",
[
"credentials" => Grpc\ChannelCredentials::createSsl(file_get_contents(__DIR__ . "/../server.pem")),
'grpc.ssl_target_name_override' => $override,
'grpc.default_authority' => $override,
]
);
$msg = new Grpc_chat\Msg();
$msg->setSender("phpclient");
$msg->setMsg("hello");
list($reply, $status) = $tlsClient->SendMsg($msg, ["auth"=>["pass1234"]])->wait();
var_dump($reply->getSender(), $reply->getMsg(), $status);
运行结果:
➜ php php tls.php
string(6) "server"
string(7) "receive"
class stdClass#13 (3) {
public $metadata =>
array(0) {
}
public $code =>
int(0)
public $details =>
string(0) ""
}
使用流程与go语言类似,需要注意的是TLS需要servername 信息,这块如果不填写(即grpc.ssl_target_name_override
和grpc.default_authority
选项),则会报连接失败,导致问题定位困难。