rpc是远程过程调用的简称,是分布式系统中不同节点间流行的通信方式。在互联网时代,rpc已经和ipc一样成为一个不可或缺的基础构件。因此go语言的标准库也提供了一个简单的rpc实现,我们将以此为入口学习rpc的各种用法。
go语言的rpc包的路径为net/rpc,也就是放在了net包目录下面。因此我们可以猜测该rpc包是建立在net包基础之上的。在第一章“hello, world”革命一节最后,我们基于http实现了一个打印例子。下面我们尝试基于rpc实现一个类似的例子。
我们先构造一个helloservice类型,其中的hello方法用于实现打印功能:
type helloservice struct {}
func (p *helloservice) hello(request string, reply *string) error {
*reply = "hello:" request
return nil
}
其中hello方法必须满足go语言的rpc规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个error类型,同时必须是公开的方法。
然后就可以将helloservice类型的对象注册为一个rpc服务:
func main() {
rpc.registername("helloservice", new(helloservice))
listener, err := net.listen("tcp", ":1234")
if err != nil {
log.fatal("listentcp error:", err)
}
conn, err := listener.accept()
if err != nil {
log.fatal("accept error:", err)
}
rpc.serveconn(conn)
}
其中rpc.register函数调用会将对象类型中所有满足rpc规则的对象方法注册为rpc函数,所有注册的方法会放在“helloservice”服务空间之下。然后我们建立一个唯一的tcp链接,并且通过rpc.serveconn函数在该tcp链接上为对方提供rpc服务。
下面是客户端请求helloservice服务的代码:
func main() {
client, err := rpc.dial("tcp", "localhost:1234")
if err != nil {
log.fatal("dialing:", err)
}
var reply string
err = client.call("helloservice.hello", "hello", &reply)
if err != nil {
log.fatal(err)
}
fmt.println(reply)
}
首先是通过rpc.dial拨号rpc服务,然后通过client.call调用具体的rpc方法。在调用client.call时,第一个参数是用点号链接的rpc服务名字和方法名字,第二和第三个参数分别我们定义rpc方法的两个参数。
由这个例子可以看出rpc的使用其实非常简单。
在涉及rpc的应用中,作为开发人员一般至少有三种角色:首先是服务端实现rpc方法的开发人员,其次是客户端调用rpc方法的人员,最后也是最重要的是制定服务端和客户端rpc接口规范的设计人员。在前面的例子中我们为了简化将以上几种角色的工作全部放到了一起,虽然看似实现简单,但是不利于后期的维护和工作的切割。
如果要重构helloservice服务,第一步需要明确服务的名字和接口:
const helloservicename = "path/to/pkg.helloservice"
type helloserviceinterface = interface {
hello(request string, reply *string) error
}
func registerhelloservice(svc helloserviceinterface) error {
return rpc.registername(helloservicename, svc)
}
我们将rpc服务的接口规范分为三个部分:首先是服务的名字,然后是服务要实现的详细方法列表,最后是注册该类型服务的函数。为了避免名字冲突,我们在rpc服务的名字中增加了包路径前缀(这个是rpc服务抽象的包路径,并非完全等价go语言的包路径)。registerhelloservice注册服务时,编译器会要求传入的对象满足helloserviceinterface接口。
在定义了rpc服务接口规范之后,客户端就可以根据规范编写rpc调用的代码了:
func main() {
client, err := rpc.dial("tcp", "localhost:1234")
if err != nil {
log.fatal("dialing:", err)
}
var reply string
err = client.call(helloservicename ".hello", "hello", &reply)
if err != nil {
log.fatal(err)
}
}
其中唯一的变化是client.call的第一个参数用helloservicename ".hello"代替了"helloservice.hello"。然而通过client.call函数调用rpc方法依然比较繁琐,同时参数的类型依然无法得到编译器提供的安全保障。
为了简化客户端用户调用rpc函数,我们在可以在接口规范部分增加对客户端的简单包装:
type helloserviceclient struct {
*rpc.client
}
var _ helloserviceinterface = (*helloserviceclient)(nil)
func dialhelloservice(network, address string) (*helloserviceclient, error) {
c, err := rpc.dial(network, address)
if err != nil {
return nil, err
}
return &helloserviceclient{client: c}, nil
}
func (p *helloserviceclient) hello(request string, reply *string) error {
return p.client.call(helloservicename ".hello", request, reply)
}
我们在接口规范中针对客户端新增加了helloserviceclient类型,该类型也必须满足helloserviceinterface接口,这样客户端用户就可以直接通过接口对应的方法调用rpc函数。同时提供了一个dialhelloservice方法,直接拨号helloservice服务。
基于新的客户端接口,我们可以简化客户端用户的代码:
func main() {
client, err := dialhelloservice("tcp", "localhost:1234")
if err != nil {
log.fatal("dialing:", err)
}
var reply string
err = client.hello("hello", &reply)
if err != nil {
log.fatal(err)
}
}
现在客户端用户不用再担心rpc方法名字或参数类型不匹配等低级错误的发生。
最后是基于rpc接口规范编写真实的服务端代码:
type helloservice struct {}
func (p *helloservice) hello(request string, reply *string) error {
*reply = "hello:" request
return nil
}
func main() {
registerhelloservice(new(helloservice))
listener, err := net.listen("tcp", ":1234")
if err != nil {
log.fatal("listentcp error:", err)
}
for {
conn, err := listener.accept()
if err != nil {
log.fatal("accept error:", err)
}
go rpc.serveconn(conn)
}
}
在新的rpc服务端实现中,我们用registerhelloservice函数来注册函数,这样不仅可以避免命名服务名称的工作,同时也保证了传入的服务对象满足了rpc接口的定义。最后我们新的服务改为支持多个tcp链接,然后为每个tcp链接提供rpc服务。
标准库的rpc默认采用go语言特有的gob编码,因此从其它语言调用go语言实现的rpc服务将比较困难。在互联网的微服务时代,每个rpc以及服务的使用者都可能采用不同的编程语言,因此跨语言是互联网时代rpc的一个首要条件。得益于rpc的框架设计,go语言的rpc其实也是很容易实现跨语言支持的。
go语言的rpc框架有两个比较有特色的设计:一个是rpc数据打包时可以通过插件实现自定义的编码和解码;另一个是rpc建立在抽象的io.readwritecloser接口之上的,我们可以将rpc架设在不同的通讯协议之上。这里我们将尝试通过官方自带的net/rpc/jsonrpc扩展实现一个跨语言的rpc。
首先是基于json编码重新实现rpc服务:
func main() {
rpc.registername("helloservice", new(helloservice))
listener, err := net.listen("tcp", ":1234")
if err != nil {
log.fatal("listentcp error:", err)
}
for {
conn, err := listener.accept()
if err != nil {
log.fatal("accept error:", err)
}
go rpc.servecodec(jsonrpc.newservercodec(conn))
}
}
代码中最大的变化是用rpc.servecodec函数替代了rpc.serveconn函数,传入的参数是针对服务端的json编解码器。
然后是实现json版本的客户端:
func main() {
conn, err := net.dial("tcp", "localhost:1234")
if err != nil {
log.fatal("net.dial:", err)
}
client := rpc.newclientwithcodec(jsonrpc.newclientcodec(conn))
var reply string
err = client.call("helloservice.hello", "hello", &reply)
if err != nil {
log.fatal(err)
}
fmt.println(reply)
}
先手工调用net.dial函数建立tcp链接,然后基于该链接建立针对客户端的json编解码器。
在确保客户端可以正常调用rpc服务的方法之后,我们用一个普通的tcp服务代替go语言版本的rpc服务,这样可以查看客户端调用时发送的数据格式。比如通过nc命令nc -l 1234在同样的端口启动一个tcp服务。然后再次执行一次rpc调用将会发现nc输出了以下的信息:
{"method":"helloservice.hello","params":["hello"],"id":0}
这是一个json编码的数据,其中method部分对应要调用的rpc服务和方法组合成的名字,params部分的第一个元素为参数,id是由调用端维护的一个唯一的调用编号。
请求的json数据对象在内部对应两个结构体:客户端是clientrequest,服务端是serverrequest。clientrequest和serverrequest结构体的内容基本是一致的:
type clientrequest struct {
method string `json:"method"`
params [1]interface{} `json:"params"`
id uint64 `json:"id"`
}
type serverrequest struct {
method string `json:"method"`
params *json.rawmessage `json:"params"`
id *json.rawmessage `json:"id"`
}
在获取到rpc调用对应的json数据后,我们可以通过直接向架设了rpc服务的tcp服务器发送json数据模拟rpc方法调用:
$ echo -e '{"method":"helloservice.hello","params":["hello"],"id":1}' | nc localhost 1234
返回的结果也是一个json格式的数据:
{"id":1,"result":"hello:hello","error":null}
其中id对应输入的id参数,result为返回的结果,error部分在出问题时表示错误信息。对于顺序调用来说,id不是必须的。但是go语言的rpc框架支持异步调用,当返回结果的顺序和调用的顺序不一致时,可以通过id来识别对应的调用。
返回的json数据也是对应内部的两个结构体:客户端是clientresponse,服务端是serverresponse。两个结构体的内容同样也是类似的:
type clientresponse struct {
id uint64 `json:"id"`
result *json.rawmessage `json:"result"`
error interface{} `json:"error"`
}
type serverresponse struct {
id *json.rawmessage `json:"id"`
result interface{} `json:"result"`
error interface{} `json:"error"`
}
因此无论采用何种语言,只要遵循同样的json结构,以同样的流程就可以和go语言编写的rpc服务进行通信。这样我们就实现了跨语言的rpc。
go语言内在的rpc框架已经支持在http协议上提供rpc服务。但是框架的http服务同样采用了内置的gob协议,并且没有提供采用其它协议的接口,因此从其它语言依然无法访问的。在前面的例子中,我们已经实现了在tcp协议之上运行jsonrpc服务,并且通过nc命令行工具成功实现了rpc方法调用。现在我们尝试在http协议上提供jsonrpc服务。
新的rpc服务其实是一个类似rest规范的接口,接收请求并采用相应处理流程:
func main() {
rpc.registername("helloservice", new(helloservice))
http.handlefunc("/jsonrpc", func(w http.responsewriter, r *http.request) {
var conn io.readwritecloser = struct {
io.writer
io.readcloser
}{
readcloser: r.body,
writer: w,
}
rpc.serverequest(jsonrpc.newservercodec(conn))
})
http.listenandserve(":1234", nil)
}
rpc的服务架设在“/jsonrpc”路径,在处理函数中基于http.responsewriter和http.request类型的参数构造一个io.readwritecloser类型的conn通道。然后基于conn构建针对服务端的json编码解码器。最后通过rpc.serverequest函数为每次请求处理一次rpc方法调用。
模拟一次rpc调用的过程就是向该链接发送一个json字符串:
$ curl localhost:1234/jsonrpc -x post \
--data '{"method":"helloservice.hello","params":["hello"],"id":0}'
返回的结果依然是json字符串:
{"id":0,"result":"hello:hello","error":null}
这样就可以很方便地从不同语言中访问rpc服务了。
在不同的场景中rpc有着不同的需求,因此开源的社区就诞生了各种rpc框架。本节我们将尝试go内置rpc框架在一些比较特殊场景的用法。
go语言的rpc库最简单的使用方式是通过client.call方法进行同步阻塞调用,该方法的实现如下:
func (client *client) call(
servicemethod string, args interface{},
reply interface{},
) error {
call := <-client.go(servicemethod, args, reply, make(chan *call, 1)).done
return call.error
}
首先通过client.go方法进行一次异步调用,返回一个表示这次调用的call结构体。然后等待call结构体的done管道返回调用结果。
我们也可以通过client.go方法异步调用前面的helloservice服务:
func doclientwork(client *rpc.client) {
hellocall := client.go("helloservice.hello", "hello", new(string), nil)
// do some thing
hellocall = <-hellocall.done
if err := hellocall.error; err != nil {
log.fatal(err)
}
args := hellocall.args.(string)
reply := hellocall.reply.(string)
fmt.println(args, reply)
}
在异步调用命令发出后,一般会执行其他的任务,因此异步调用的输入参数和返回值可以通过返回的call变量进行获取。
执行异步调用的client.go方法实现如下:
func (client *client) go(
servicemethod string, args interface{},
reply interface{},
done chan *call,
) *call {
call := new(call)
call.servicemethod = servicemethod
call.args = args
call.reply = reply
call.done = make(chan *call, 10) // buffered.
client.send(call)
return call
}
首先是构造一个表示当前调用的call变量,然后通过client.send将call的完整参数发送到rpc框架。client.send方法调用是线程安全的,因此可以从多个goroutine同时向同一个rpc链接发送调用指令。
当调用完成或者发生错误时,将调用call.done方法通知完成:
func (call *call) done() {
select {
case call.done <- call:
// ok
default:
// we don't want to block here. it is the caller's responsibility to make
// sure the channel has enough buffer space. see comment in go().
}
}
从call.done方法的实现可以得知call.done管道会将处理后的call返回。
在很多系统中都提供了watch监视功能的接口,当系统满足某种条件时watch方法返回的结果。在这里我们可以尝试通过rpc框架实现一个基本的watch功能。如前文所描述,因为client.send是线程安全的,我们也可以通过在不同的goroutine中同时并发阻塞调用rpc方法。通过在一个独立的goroutine中调用watch函数进行。
为了便于演示,我们计划通过rpc构造一个简单的内存kv数据库。首先定义服务如下:
type kvstoreservice struct {
m map[string]string
filter map[string]func(key string)
mu sync.mutex
}
func newkvstoreservice() *kvstoreservice {
return &kvstoreservice{
m: make(map[string]string),
filter: make(map[string]func(key string)),
}
}
其中m成员是一个map类型,用于存储kv数据。filter成员对应每个watch调用时定义的过滤器函数列表。而mu成员为互斥锁,用于在多个goroutine访问或修改时对其它成员提供保护。
然后就是get和set方法:
func (p *kvstoreservice) get(key string, value *string) error {
p.mu.lock()
defer p.mu.unlock()
if v, ok := p.m[key]; ok {
*value = v
return nil
}
return fmt.errorf("not found")
}
func (p *kvstoreservice) set(kv [2]string, reply *struct{}) error {
p.mu.lock()
defer p.mu.unlock()
key, value := kv[0], kv[1]
if oldvalue := p.m[key]; oldvalue != value {
for _, fn := range p.filter {
fn(key)
}
}
p.m[key] = value
return nil
}
在set方法中,输入参数是key和value组成的数组,用一个匿名的空结构体表示忽略了输出参数。当修改某个key对应的值时会调用每一个过滤器函数。
而过滤器列表在watch方法中提供:
func (p *kvstoreservice) watch(timeoutsecond int, keychanged *string) error {
id := fmt.sprintf("watch-%s-d", time.now(), rand.int())
ch := make(chan string, 10) // buffered
p.mu.lock()
p.filter[id] = func(key string) { ch <- key }
p.mu.unlock()
select {
case <-time.after(time.duration(timeoutsecond) * time.second):
return fmt.errorf("timeout")
case key := <-ch:
*keychanged = key
return nil
}
return nil
}
watch方法的输入参数是超时的秒数。当有key变化时将key作为返回值返回。如果超过时间后依然没有key被修改,则返回超时的错误。watch的实现中,用唯一的id表示每个watch调用,然后根据id将自身对应的过滤器函数注册到p.filter列表。
kvstoreservice服务的注册和启动过程我们不再赘述。下面我们看看如何从客户端使用watch方法:
func doclientwork(client *rpc.client) {
go func() {
var keychanged string
err := client.call("kvstoreservice.watch", 30, &keychanged)
if err != nil {
log.fatal(err)
}
fmt.println("watch:", keychanged)
} ()
err := client.call(
"kvstoreservice.set", [2]string{"abc", "abc-value"},
new(struct{}),
)
if err != nil {
log.fatal(err)
}
time.sleep(time.second*3)
}
首先启动一个独立的goroutine监控key的变化。同步的watch调用会阻塞,直到有key发生变化或者超时。然后在通过set方法修改kv值时,服务器会将变化的key通过watch方法返回。这样我们就可以实现对某些状态的监控。
通常的rpc是基于c/s结构,rpc的服务端对应网络的服务器,rpc的客户端也对应网络客户端。但是对于一些特殊场景,比如在公司内网提供一个rpc服务,但是在外网无法链接到内网的服务器。这种时候我们可以参考类似反向代理的技术,首先从内网主动链接到外网的tcp服务器,然后基于tcp链接向外网提供rpc服务。
以下是启动反向rpc服务的代码:
func main() {
rpc.register(new(helloservice))
for {
conn, _ := net.dial("tcp", "localhost:1234")
if conn == nil {
time.sleep(time.second)
continue
}
rpc.serveconn(conn)
conn.close()
}
}
反向rpc的内网服务将不再主动提供tcp监听服务,而是首先主动链接到对方的tcp服务器。然后基于每个建立的tcp链接向对方提供rpc服务。
而rpc客户端则需要在一个公共的地址提供一个tcp服务,用于接受rpc服务器的链接请求:
func main() {
listener, err := net.listen("tcp", ":1234")
if err != nil {
log.fatal("listentcp error:", err)
}
clientchan := make(chan *rpc.client)
go func() {
for {
conn, err := listener.accept()
if err != nil {
log.fatal("accept error:", err)
}
clientchan <- rpc.newclient(conn)
}
}()
doclientwork(clientchan)
}
当每个链接建立后,基于网络链接构造rpc客户端对象并发送到clientchan管道。
客户端执行rpc调用的操作在doclientwork函数完成:
func doclientwork(clientchan <-chan *rpc.client) {
client := <-clientchan
defer client.close()
var reply string
err = client.call("helloservice.hello", "hello", &reply)
if err != nil {
log.fatal(err)
}
fmt.println(reply)
}
首先从管道去取一个rpc客户端对象,并且通过defer语句指定在函数退出前关闭客户端。然后是执行正常的rpc调用。
基于上下文我们可以针对不同客户端提供定制化的rpc服务。我们可以通过为每个链接提供独立的rpc服务来实现对上下文特性的支持。
首先改造helloservice,里面增加了对应链接的conn成员:
type helloservice struct {
conn net.conn
}
然后为每个链接启动独立的rpc服务:
func main() {
listener, err := net.listen("tcp", ":1234")
if err != nil {
log.fatal("listentcp error:", err)
}
for {
conn, err := listener.accept()
if err != nil {
log.fatal("accept error:", err)
}
go func() {
defer conn.close()
p := rpc.newserver()
p.register(&helloservice{conn: conn})
p.serveconn(conn)
} ()
}
}
hello方法中就可以根据conn成员识别不同链接的rpc调用:
func (p *helloservice) hello(request string, reply *string) error {
*reply = "hello:" request ", from" p.conn.remoteaddr().string()
return nil
}
基于上下文信息,我们可以方便地为rpc服务增加简单的登陆状态的验证:
type helloservice struct {
conn net.conn
islogin bool
}
func (p *helloservice) login(request string, reply *string) error {
if request != "user:password" {
return fmt.errorf("auth failed")
}
log.println("login ok")
p.islogin = true
return nil
}
func (p *helloservice) hello(request string, reply *string) error {
if !p.islogin {
return fmt.errorf("please login")
}
*reply = "hello:" request ", from" p.conn.remoteaddr().string()
return nil
}
这样可以要求在客户端链接rpc服务时,首先要执行登陆操作,登陆成功后才能正常执行其他的服务。
grpc是google公司基于protobuf开发的跨语言的开源rpc框架。grpc基于http/2协议设计,可以基于一个http/2链接提供多个服务,对于移动设备更加友好。本节将讲述grpc的简单用法。
go语言的grpc技术栈如图所示:
最底层为tcp或unix socket协议,在此之上是http/2协议的实现,然后在http/2协议之上又构建了针对go语言的grpc核心库。应用程序通过grpc插件生产的stub代码和grpc核心库通信,也可以直接和grpc核心库通信。
如果从protobuf的角度看,grpc只不过是一个针对service接口生成代码的生成器。我们在本章的第二节中手工实现了一个简单的protobuf代码生成器插件,只不过当时生成的代码是适配标准库的rpc框架的。现在我们将学习grpc的用法。
创建hello.proto文件,定义helloservice接口:
syntax = "proto3";
package main;
message string {
string value = 1;
}
service helloservice {
rpc hello (string) returns (string);
}
使用protoc-gen-go内置的grpc插件生成grpc代码:
$ protoc --go_out=plugins=grpc:. hello.proto
grpc插件会为服务端和客户端生成不同的接口:
type helloserviceserver interface {
hello(context.context, *string) (*string, error)
}
type helloserviceclient interface {
hello(context.context, *string, ...grpc.calloption) (*string, error)
}
grpc通过context.context参数,为每个方法调用提供了上下文支持。客户端在调用方法的时候,可以通过可选的grpc.calloption类型的参数提供额外的上下文信息。
基于服务端的helloserviceserver接口可以重新实现helloservice服务:
type helloserviceimpl struct{}
func (p *helloserviceimpl) hello(
ctx context.context, args *string,
) (*string, error) {
reply := &string{value: "hello:" args.getvalue()}
return reply, nil
}
grpc服务的启动流程和标准库的rpc服务启动流程类似:
func main() {
grpcserver := grpc.newserver()
registerhelloserviceserver(grpcserver, new(helloserviceimpl))
lis, err := net.listen("tcp", ":1234")
if err != nil {
log.fatal(err)
}
grpcserver.serve(lis)
}
首先是通过grpc.newserver()构造一个grpc服务对象,然后通过grpc插件生成的registerhelloserviceserver函数注册我们实现的helloserviceimpl服务。然后通过grpcserver.serve(lis)在一个监听端口上提供grpc服务。
然后就可以通过客户端链接grpc服务了:
func main() {
conn, err := grpc.dial("localhost:1234", grpc.withinsecure())
if err != nil {
log.fatal(err)
}
defer conn.close()
client := newhelloserviceclient(conn)
reply, err := client.hello(context.background(), &string{value: "hello"})
if err != nil {
log.fatal(err)
}
fmt.println(reply.getvalue())
}
其中grpc.dial负责和grpc服务建立链接,然后newhelloserviceclient函数基于已经建立的链接构造helloserviceclient对象。返回的client其实是一个helloserviceclient接口对象,通过接口定义的方法就可以调用服务端对应的grpc服务提供的方法。
grpc和标准库的rpc框架有一个区别,grpc生成的接口并不支持异步调用。不过我们可以在多个goroutine之间安全地共享grpc底层的http/2链接,因此可以通过在另一个goroutine阻塞调用的方式模拟异步调用。
rpc是远程函数调用,因此每次调用的函数参数和返回值不能太大,否则将严重影响每次调用的响应时间。因此传统的rpc方法调用对于上传和下载较大数据量场景并不适合。同时传统rpc模式也不适用于对时间不确定的订阅和发布模式。为此,grpc框架针对服务器端和客户端分别提供了流特性。
服务端或客户端的单向流是双向流的特例,我们在helloservice增加一个支持双向流的channel方法:
service helloservice {
rpc hello (string) returns (string);
rpc channel (stream string) returns (stream string);
}
关键字stream指定启用流特性,参数部分是接收客户端参数的流,返回值是返回给客户端的流。
重新生成代码可以看到接口中新增加的channel方法的定义:
type helloserviceserver interface {
hello(context.context, *string) (*string, error)
channel(helloservice_channelserver) error
}
type helloserviceclient interface {
hello(ctx context.context, in *string, opts ...grpc.calloption) (
*string, error,
)
channel(ctx context.context, opts ...grpc.calloption) (
helloservice_channelclient, error,
)
}
在服务端的channel方法参数是一个新的helloservice_channelserver类型的参数,可以用于和客户端双向通信。客户端的channel方法返回一个helloservice_channelclient类型的返回值,可以用于和服务端进行双向通信。
helloservice_channelserver和helloservice_channelclient均为接口类型:
type helloservice_channelserver interface {
send(*string) error
recv() (*string, error)
grpc.serverstream
}
type helloservice_channelclient interface {
send(*string) error
recv() (*string, error)
grpc.clientstream
}
可以发现服务端和客户端的流辅助接口均定义了send和recv方法用于流数据的双向通信。
现在我们可以实现流服务:
func (p *helloserviceimpl) channel(stream helloservice_channelserver) error {
for {
args, err := stream.recv()
if err != nil {
if err == io.eof {
return nil
}
return err
}
reply := &string{value: "hello:" args.getvalue()}
err = stream.send(reply)
if err != nil {
return err
}
}
}
服务端在循环中接收客户端发来的数据,如果遇到io.eof表示客户端流被关闭,如果函数退出表示服务端流关闭。生成返回的数据通过流发送给客户端,双向流数据的发送和接收都是完全独立的行为。需要注意的是,发送和接收的操作并不需要一一对应,用户可以根据真实场景进行组织代码。
客户端需要先调用channel方法获取返回的流对象:
stream, err := client.channel(context.background())
if err != nil {
log.fatal(err)
}
在客户端我们将发送和接收操作放到两个独立的goroutine。首先是向服务端发送数据:
go func() {
for {
if err := stream.send(&string{value: "hi"}); err != nil {
log.fatal(err)
}
time.sleep(time.second)
}
}()
然后在循环中接收服务端返回的数据:
for {
reply, err := stream.recv()
if err != nil {
if err == io.eof {
break
}
log.fatal(err)
}
fmt.println(reply.getvalue())
}
这样就完成了完整的流接收和发送支持。
在前一节中,我们基于go内置的rpc库实现了一个简化版的watch方法。基于watch的思路虽然也可以构造发布和订阅系统,但是因为rpc缺乏流机制导致每次只能返回一个结果。在发布和订阅模式中,由调用者主动发起的发布行为类似一个普通函数调用,而被动的订阅者则类似grpc客户端单向流中的接收者。现在我们可以尝试基于grpc的流特性构造一个发布和订阅系统。
发布订阅是一个常见的设计模式,开源社区中已经存在很多该模式的实现。其中docker项目中提供了一个pubsub的极简实现,下面是基于pubsub包实现的本地发布订阅代码:
import (
"github.com/moby/moby/pkg/pubsub"
)
func main() {
p := pubsub.newpublisher(100*time.millisecond, 10)
golang := p.subscribetopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.hasprefix(key, "golang:") {
return true
}
}
return false
})
docker := p.subscribetopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.hasprefix(key, "docker:") {
return true
}
}
return false
})
go p.publish("hi")
go p.publish("golang: https://golang.org")
go p.publish("docker: https://www.docker.com/")
time.sleep(1)
go func() {
fmt.println("golang topic:", <-golang)
}()
go func() {
fmt.println("docker topic:", <-docker)
}()
<-make(chan bool)
}
其中pubsub.newpublisher构造一个发布对象,p.subscribetopic()可以通过函数筛选感兴趣的主题进行订阅。
现在尝试基于grpc和pubsub包,提供一个跨网络的发布和订阅系统。首先通过protobuf定义一个发布订阅服务接口:
service pubsubservice {
rpc publish (string) returns (string);
rpc subscribe (string) returns (stream string);
}
其中publish是普通的rpc方法,subscribe则是一个单向的流服务。然后grpc插件会为服务端和客户端生成对应的接口:
type pubsubserviceserver interface {
publish(context.context, *string) (*string, error)
subscribe(*string, pubsubservice_subscribeserver) error
}
type pubsubserviceclient interface {
publish(context.context, *string, ...grpc.calloption) (*string, error)
subscribe(context.context, *string, ...grpc.calloption) (
pubsubservice_subscribeclient, error,
)
}
type pubsubservice_subscribeserver interface {
send(*string) error
grpc.serverstream
}
因为subscribe是服务端的单向流,因此生成的helloservice_subscribeserver接口中只有send方法。
然后就可以实现发布和订阅服务了:
type pubsubservice struct {
pub *pubsub.publisher
}
func newpubsubservice() *pubsubservice {
return &pubsubservice{
pub: pubsub.newpublisher(100*time.millisecond, 10),
}
}
然后是实现发布方法和订阅方法:
func (p *pubsubservice) publish(
ctx context.context, arg *string,
) (*string, error) {
p.pub.publish(arg.getvalue())
return &string{}, nil
}
func (p *pubsubservice) subscribe(
arg *string, stream pubsubservice_subscribeserver,
) error {
ch := p.pub.subscribetopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.hasprefix(key,arg.getvalue()) {
return true
}
}
return false
})
for v := range ch {
if err := stream.send(&string{value: v.(string)}); err != nil {
return err
}
}
return nil
}
这样就可以从客户端向服务器发布信息了:
func main() {
conn, err := grpc.dial("localhost:1234", grpc.withinsecure())
if err != nil {
log.fatal(err)
}
defer conn.close()
client := newpubsubserviceclient(conn)
_, err = client.publish(
context.background(), &string{value: "golang: hello go"},
)
if err != nil {
log.fatal(err)
}
_, err = client.publish(
context.background(), &string{value: "docker: hello docker"},
)
if err != nil {
log.fatal(err)
}
}
然后就可以在另一个客户端进行订阅信息了:
func main() {
conn, err := grpc.dial("localhost:1234", grpc.withinsecure())
if err != nil {
log.fatal(err)
}
defer conn.close()
client := newpubsubserviceclient(conn)
stream, err := client.subscribe(
context.background(), &string{value: "golang:"},
)
if err != nil {
log.fatal(err)
}
for {
reply, err := stream.recv()
if err != nil {
if err == io.eof {
break
}
log.fatal(err)
}
fmt.println(reply.getvalue())
}
}
到此我们就基于grpc简单实现了一个跨网络的发布和订阅服务。
作为一个基础的rpc框架,安全和扩展是经常遇到的问题。本节将简单介绍如何对grpc进行安全认证。然后介绍通过grpc的截取器特性,以及如何通过截取器优雅地实现token认证、调用跟踪以及panic捕获等特性。最后介绍了grpc服务如何和其他web服务共存。
grpc建立在http/2协议之上,对tls提供了很好的支持。我们前面章节中grpc的服务都没有提供证书支持,因此客户端在链接服务器中通过grpc.withinsecure()选项跳过了对服务器证书的验证。没有启用证书的grpc服务在和客户端进行的是明文通讯,信息面临被任何第三方监听的风险。为了保障grpc通信不被第三方监听篡改或伪造,我们可以对服务器启动tls加密特性。
可以用以下命令为服务器和客户端分别生成私钥和证书:
$ openssl genrsa -out server.key 2048
$ openssl req -new -x509 -days 3650 \
-subj "/c=gb/l=china/o=grpc-server/cn=server.grpc.io" \
-key server.key -out server.crt
$ openssl genrsa -out client.key 2048
$ openssl req -new -x509 -days 3650 \
-subj "/c=gb/l=china/o=grpc-client/cn=client.grpc.io" \
-key client.key -out client.crt
以上命令将生成server.key、server.crt、client.key和client.crt四个文件。其中以.key为后缀名的是私钥文件,需要妥善保管。以.crt为后缀名是证书文件,也可以简单理解为公钥文件,并不需要秘密保存。在subj参数中的/cn=server.grpc.io表示服务器的名字为server.grpc.io,在验证服务器的证书时需要用到该信息。
有了证书之后,我们就可以在启动grpc服务时传入证书选项参数:
func main() {
creds, err := credentials.newservertlsfromfile("server.crt", "server.key")
if err != nil {
log.fatal(err)
}
server := grpc.newserver(grpc.creds(creds))
...
}
其中credentials.newservertlsfromfile函数是从文件为服务器构造证书对象,然后通过grpc.creds(creds)函数将证书包装为选项后作为参数传入grpc.newserver函数。
在客户端基于服务器的证书和服务器名字就可以对服务器进行验证:
func main() {
creds, err := credentials.newclienttlsfromfile(
"server.crt", "server.grpc.io",
)
if err != nil {
log.fatal(err)
}
conn, err := grpc.dial("localhost:5000",
grpc.withtransportcredentials(creds),
)
if err != nil {
log.fatal(err)
}
defer conn.close()
...
}
其中redentials.newclienttlsfromfile是构造客户端用的证书对象,第一个参数是服务器的证书文件,第二个参数是签发证书的服务器的名字。然后通过grpc.withtransportcredentials(creds)将证书对象转为参数选项传人grpc.dial函数。
以上这种方式,需要提前将服务器的证书告知客户端,这样客户端在链接服务器时才能进行对服务器证书认证。在复杂的网络环境中,服务器证书的传输本身也是一个非常危险的问题。如果在中间某个环节,服务器证书被监听或替换那么对服务器的认证也将不再可靠。
为了避免证书的传递过程中被篡改,可以通过一个安全可靠的根证书分别对服务器和客户端的证书进行签名。这样客户端或服务器在收到对方的证书后可以通过根证书进行验证证书的有效性。
根证书的生成方式和自签名证书的生成方式类似:
$ openssl genrsa -out ca.key 2048
$ openssl req -new -x509 -days 3650 \
-subj "/c=gb/l=china/o=gobook/cn=github.com" \
-key ca.key -out ca.crt
然后是重新对服务器端证书进行签名:
$ openssl req -new \
-subj "/c=gb/l=china/o=server/cn=server.io" \
-key server.key \
-out server.csr
$ openssl x509 -req -sha256 \
-ca ca.crt -cakey ca.key -cacreateserial -days 3650 \
-in server.csr \
-out server.crt
签名的过程中引入了一个新的以.csr为后缀名的文件,它表示证书签名请求文件。在证书签名完成之后可以删除.csr文件。
然后在客户端就可以基于ca证书对服务器进行证书验证:
func main() {
certificate, err := tls.loadx509keypair("client.crt", "client.key")
if err != nil {
log.fatal(err)
}
certpool := x509.newcertpool()
ca, err := ioutil.readfile("ca.crt")
if err != nil {
log.fatal(err)
}
if ok := certpool.appendcertsfrompem(ca); !ok {
log.fatal("failed to append ca certs")
}
creds := credentials.newtls(&tls.config{
certificates: []tls.certificate{certificate},
servername: tlsservername, // note: this is required!
rootcas: certpool,
})
conn, err := grpc.dial(
"localhost:5000", grpc.withtransportcredentials(creds),
)
if err != nil {
log.fatal(err)
}
defer conn.close()
...
}
在新的客户端代码中,我们不再直接依赖服务器端证书文件。在credentials.newtls函数调用中,客户端通过引入一个ca根证书和服务器的名字来实现对服务器进行验证。客户端在链接服务器时会首先请求服务器的证书,然后使用ca根证书对收到的服务器端证书进行验证。
如果客户端的证书也采用ca根证书签名的话,服务器端也可以对客户端进行证书认证。我们用ca根证书对客户端证书签名:
$ openssl req -new \
-subj "/c=gb/l=china/o=client/cn=client.io" \
-key client.key \
-out client.csr
$ openssl x509 -req -sha256 \
-ca ca.crt -cakey ca.key -cacreateserial -days 3650 \
-in client.csr \
-out client.crt
因为引入了ca根证书签名,在启动服务器时同样要配置根证书:
func main() {
certificate, err := tls.loadx509keypair("server.crt", "server.key")
if err != nil {
log.fatal(err)
}
certpool := x509.newcertpool()
ca, err := ioutil.readfile("ca.crt")
if err != nil {
log.fatal(err)
}
if ok := certpool.appendcertsfrompem(ca); !ok {
log.fatal("failed to append certs")
}
creds := credentials.newtls(&tls.config{
certificates: []tls.certificate{certificate},
clientauth: tls.requireandverifyclientcert, // note: this is optional!
clientcas: certpool,
})
server := grpc.newserver(grpc.creds(creds))
...
}
服务器端同样改用credentials.newtls函数生成证书,通过clientcas选择ca根证书,并通过clientauth选项启用对客户端进行验证。
到此我们就实现了一个服务器和客户端进行双向证书验证的通信可靠的grpc系统。
前面讲述的基于证书的认证是针对每个grpc链接的认证。grpc还为每个grpc方法调用提供了认证支持,这样就基于用户token对不同的方法访问进行权限管理。
要实现对每个grpc方法进行认证,需要实现grpc.perrpccredentials接口:
type perrpccredentials interface {
// getrequestmetadata gets the current request metadata, refreshing
// tokens if required. this should be called by the transport layer on
// each request, and the data should be populated in headers or other
// context. if a status code is returned, it will be used as the status
// for the rpc. uri is the uri of the entry point for the request.
// when supported by the underlying implementation, ctx can be used for
// timeout and cancellation.
// todo(zhaoq): define the set of the qualified keys instead of leaving
// it as an arbitrary string.
getrequestmetadata(ctx context.context, uri ...string) (
map[string]string, error,
)
// requiretransportsecurity indicates whether the credentials requires
// transport security.
requiretransportsecurity() bool
}
在getrequestmetadata方法中返回认证需要的必要信息。requiretransportsecurity方法表示是否要求底层使用安全链接。在真实的环境中建议必须要求底层启用安全的链接,否则认证信息有泄露和被篡改的风险。
我们可以创建一个authentication类型,用于实现用户名和密码的认证:
type authentication struct {
user string
password string
}
func (a *authentication) getrequestmetadata(context.context, ...string) (
map[string]string, error,
) {
return map[string]string{"user":a.user, "password": a.password}, nil
}
func (a *authentication) requiretransportsecurity() bool {
return false
}
在getrequestmetadata方法中,我们返回地认证信息包装login和password两个信息。为了演示代码简单,requiretransportsecurity方法表示不要求底层使用安全链接。
然后在每次请求grpc服务时就可以将token信息作为参数选项传人:
func main() {
auth := authentication{
login: "gopher",
password: "password",
}
conn, err := grpc.dial("localhost" port, grpc.withinsecure(), grpc.withperrpccredentials(&auth))
if err != nil {
log.fatal(err)
}
defer conn.close()
...
}
通过grpc.withperrpccredentials函数将authentication对象转为grpc.dial参数。因为这里没有启用安全链接,需要传人grpc.withinsecure()表示忽略证书认证。
然后在grpc服务端的每个方法中通过authentication类型的auth方法进行身份认证:
type grpcserver struct { auth *authentication }
func (p *grpcserver) somemethod(
ctx context.context, in *hellorequest,
) (*helloreply, error) {
if err := p.auth.auth(ctx); err != nil {
return nil, err
}
return &helloreply{message: "hello " in.name}, nil
}
func (a *authentication) auth(ctx context.context) error {
md, ok := metadata.fromincomingcontext(ctx)
if !ok {
return fmt.errorf("missing credentials")
}
var appid string
var appkey string
if val, ok := md["login"]; ok { appid = val[0] }
if val, ok := md["password"]; ok { appkey = val[0] }
if appid != a.login || appkey != a.password {
return grpc.errorf(codes.unauthenticated, "invalid token")
}
return nil
}
详细地认证工作主要在authentication.auth方法中完成。首先通过metadata.fromincomingcontext从ctx上下文中获取元信息,然后取出相应的认证信息进行认证。如果认证失败,则返回一个codes.unauthenticated类型地错误。
grpc中的grpc.unaryinterceptor和grpc.streaminterceptor分别对普通方法和流方法提供了截取器的支持。我们这里简单介绍普通方法的截取器用法。
要实现普通方法的截取器,需要为grpc.unaryinterceptor的参数实现一个函数:
func filter(ctx context.context,
req interface{}, info *grpc.unaryserverinfo,
handler grpc.unaryhandler,
) (resp interface{}, err error) {
log.println("fileter:", info)
return handler(ctx, req)
}
函数的ctx和req参数就是每个普通的rpc方法的前两个参数。第三个info参数表示当前是对应的那个grpc方法,第四个handler参数对应当前的grpc方法函数。上面的函数中首先是日志输出info参数,然后调用handler对应的grpc方法函数。
要使用filter截取器函数,只需要在启动grpc服务时作为参数输入即可:
server := grpc.newserver(grpc.unaryinterceptor(filter))
然后服务器在收到每个grpc方法调用之前,会首先输出一行日志,然后再调用对方的方法。
如果截取器函数返回了错误,那么该次grpc方法调用将被视作失败处理。因此,我们可以在截取器中对输入的参数做一些简单的验证工作。同样,也可以对handler返回的结果做一些验证工作。截取器也非常适合前面对token认证工作。
下面是截取器增加了对grpc方法异常的捕获:
func filter(
ctx context.context, req interface{},
info *grpc.unaryserverinfo,
handler grpc.unaryhandler,
) (resp interface{}, err error) {
log.println("fileter:", info)
defer func() {
if r := recover(); r != nil {
err = fmt.errorf("panic: %v", r)
}
}()
return handler(ctx, req)
}
不过grpc框架中只能为每个服务设置一个截取器,因此所有的截取工作只能在一个函数中完成。开源的grpc-ecosystem项目中的go-grpc-middleware包已经基于grpc对截取器实现了链式截取器的支持。
以下是go-grpc-middleware包中链式截取器的简单用法
import "github.com/grpc-ecosystem/go-grpc-middleware"
myserver := grpc.newserver(
grpc.unaryinterceptor(grpc_middleware.chainunaryserver(
filter1, filter2, ...
)),
grpc.streaminterceptor(grpc_middleware.chainstreamserver(
filter1, filter2, ...
)),
)
感兴趣的同学可以参考go-grpc-middleware包的代码。
grpc构建在http/2协议之上,因此我们可以将grpc服务和普通的web服务架设在同一个端口之上。
对于没有启动tls协议的服务则需要对http2/2特性做适当的调整:
func main() {
mux := http.newservemux()
h2handler := h2c.newhandler(mux, &http2.server{})
server = &http.server{addr: ":3999", handler: h2handler}
server.listenandserve()
}
启用普通的https服务器则非常简单:
func main() {
mux := http.newservemux()
mux.handlefunc("/", func(w http.responsewriter, req *http.request) {
fmt.fprintln(w, "hello")
})
http.listenandservetls(port, "server.crt", "server.key",
http.handlerfunc(func(w http.responsewriter, r *http.request) {
mux.servehttp(w, r)
return
}),
)
}
而单独启用带证书的grpc服务也是同样的简单:
func main() {
creds, err := credentials.newservertlsfromfile("server.crt", "server.key")
if err != nil {
log.fatal(err)
}
grpcserver := grpc.newserver(grpc.creds(creds))
...
}
因为grpc服务已经实现了servehttp方法,可以直接作为web路由处理对象。如果将grpc和web服务放在一起,会导致grpc和web路径的冲突,在处理时我们需要区分两类服务。
我们可以通过以下方式生成同时支持web和grpc协议的路由处理函数:
func main() {
...
http.listenandservetls(port, "server.crt", "server.key",
http.handlerfunc(func(w http.responsewriter, r *http.request) {
if r.protomajor != 2 {
mux.servehttp(w, r)
return
}
if strings.contains(
r.header.get("content-type"), "application/grpc",
) {
grpcserver.servehttp(w, r) // grpc server
return
}
mux.servehttp(w, r)
return
}),
)
}
首先grpc是建立在http/2版本之上,如果http不是http/2协议则必然无法提供grpc支持。同时,每个grpc调用请求的content-type类型会被标注为"application/grpc"类型。
这样我们就可以在grpc端口上同时提供web服务了。
如若转载,请注明出处:https://www.dasum.com/110492.html