grpc(通用远程过程调用)是谷歌基于rpc协议开发的一个框架,用于在分布式系统中调用函数。它具有使用 http/2 作为其传输协议的一些优点。 grpc 可以被视为流行的数据查询和操作语言(如 rest 或 graphql)的替代方案。
本文旨在作为一个简单的介绍演示,使任何人都可以在下一个项目中启动 grpc,并专注于协议的核心概念,不涉及更高级的主题,例如负载平衡、调用取消或拦截器。
本文中给出的示例基于 flutter 客户端和 rust 服务器,以展示流行的前端/后端组合的基本实现。
grpc 包
grpc 包的基本结构基于消息和服务的定义。 grpc 包与客户端和服务器必须知道的 .proto 文件相关联,以便正确处理和序列化消息。这些消息是使用协议缓冲区定义的,并支持各种数据类型和结构。服务由 rpc 组成,这些 rpc 基本上是一些基于消息的远程函数定义。
总之,服务允许四种类型的 rpc:
- 一个简单的一元 rpc,其中客户端向服务器发送单个请求并返回单个响应
- 服务器端流式 rpc,客户端向服务器发送单个请求并返回数据流
- 客户端流式 rpc,其中客户端将数据流发送到服务器并返回单个响应
- 双向流式 rpc,双方独立发送数据流。
在本文中,我们将重点介绍前三种服务方法及其工作原理。
一个简单的包
在 grpc 中,客户端触发通信。因此,从客户端的角度定义服务和消息名称通常是有意义的。以下文件显示了简单的包 my_grpc_service,其中包含两条消息 mygrpcrequest 和 mygrpcresponse 以及关联的服务 mygrpcservice。该服务由三种不同类型的 rpc 组成。
syntax = "proto3";
package my_grpc_service;
message mygrpcresponse{
double somedouble = 1;
string somestring = 2;
}
message mygrpcrequest{
string somestring = 1;
}
service mygrpcservice{
rpc sendandgetdata (mygrpcrequest) returns (mygrpcresponse); // simple unary rpc
rpc getlotsofdata (mygrpcrequest) returns (stream mygrpcresponse); // server-side streaming rpc
rpc sendlotsofdata (stream mygrpcrequest) returns (mygrpcresponse); // client-side streaming rpc
}
该文件将用于为客户端和服务器生成 grpc 代码。 grpc 官方网站概述了一些用于此目的的流行库。 这些库的基础通常由 protoc 编译器组成。
一个基本的实现
先决条件
对于 flutter,必须添加包 grpc 和 protobuf。 对于 rust,使用了流行的 crates tonic 和 tokio。 此外,必须安装 protoc 编译器并将其添加到 path。
生成 grpc 代码
将 .proto 文件放入名为 protos 的顶级文件夹是常见的做法。 出于这个原因,两个项目的 .proto 文件都放在那里。
可以通过在终端中运行以下命令来生成 dart 代码:
protoc --dart_out=grpc:lib/generated -iprotos protos/my_grpc_service.proto
这将生成实现服务所需的所有 dart 文件,并将它们放在 lib/generated。
rust 代码可以使用构建脚本生成。 构建脚本 build.rs 位于 .toml 文件旁边,定义了在构建过程之前要执行的操作。 在这种情况下,构建脚本由以下几行组成:
fn main() {
let proto_file = "./protos/my_grpc_service.proto";
tonic_build::configure()
.build_server(true)
.compile(&[proto_file], &["."])
.unwrap_or_else(|e| panic!("protobuf compile error: {}", e));
}
这将生成实现服务所需的所有 rust 文件并将它们放在构建文件夹中。
之后,必须将生成的 dart 客户端 mygrpcserviceclient 打包到服务中。 这使得通过 providers 将服务注入到 flutter 应用程序的各个部分成为可能。 在服务器端,必须实现生成的特征 mygrpcservice 以便为服务提供一些功能。
简单的一元 rpc
在简单的一元 rpc 的情况下,定义了一个函数来接收一些请求消息并返回一些响应消息。
import 'package:flutter_grpc_demo/generated/my_grpc_service.pbgrpc.dart';
import 'package:grpc/grpc.dart';
class mygrpcservice {
final channel = clientchannel(
'0.0.0.0', //localhost
port: 50052,
options: const channeloptions(credentials: channelcredentials.insecure()),
);
//...
future sendandgetdata(mygrpcrequest request) async {
final stub = mygrpcserviceclient(channel); // this class is auto generated
try {
return await stub.sendandgetdata(request);
} on exception catch (e) {
return future.error(e);
}
}
//...
}
在简单的一元 rpc 的情况下,还定义了一个函数来接收一些请求消息并返回一些响应消息。
pub struct mygrpcserver {}
#[tonic::async_trait]
impl mygrpcservice for mygrpcserver {
async fn send_and_get_data(
&self,
request: request,
) -> result, status> {
//incoming request message
let my_grpc_request = request.into_inner();
//do something with the request message and generate a response message
let my_grpc_response = get_some_response().await;
ok(response::new(my_grpc_response))
}
//...
}
mod service {
tonic::include_proto!("my_grpc_service");
}
use service::my_grpc_service_server::mygrpcserviceserver;
#[tokio::main]
async fn main() -> result<(), box> {
let addr = "0.0.0.0:50052".parse().unwrap(); // localhost
let server = mygrpcserver::default();
// start server
println!("server listening on {}", addr);
server::builder()
.add_service(mygrpcserviceserver::new(server))
.serve(addr)
.await?;
ok(())
}
服务器端流式 rpc
在服务器端流式 rpc 的情况下,客户端获取响应消息流。 这些消息可以异步获取,然后通过另一个可以监听的流传递给 flutter 应用程序。
import 'package:flutter_grpc_demo/generated/my_grpc_service.pbgrpc.dart';
import 'package:grpc/grpc.dart';
class mygrpcservice {
final channel = clientchannel(
'0.0.0.0', //localhost
port: 50052,
options: const channeloptions(credentials: channelcredentials.insecure()),
);
//...
stream listentostreamfromserver(mygrpcrequest request) async* {
final stub = mygrpcserviceclient(channel); // this class is auto generated
try {
var responses = stub.getlotsofdata(request);
await for (final response in responses) {
yield response;
}
} on exception catch (e) {
yield* stream.error(e);
}
}
//...
}
在服务器端,创建了一些可迭代的数据,并使用 tokio 生成了通道。 这些通道通过对项目进行排队来传输单个响应消息。
pub struct mygrpcserver {}
#[tonic::async_trait]
impl mygrpcservice for mygrpcserver {
type getlotsofdatastream = pin> send>>;
async fn get_lots_of_data(
&self,
request: request,
) -> result, status> {
//incoming request message
let my_grpc_request = request.into_inner();
//do something with the request message and generate a vector of response messages
let my_grpc_responses = get_vector_of_responses().await; // outgoing response messages
//spawn up to 128 channels and send asynchronous response messages
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
while let some(item) = tokio_stream::iter(my_grpc_responses).next().await {
match tx.send(result::<_, status>::ok(item)).await {
ok(_) => {
//item was queued
}
err(_item) => {
//output stream was build and will be dropped
break;
}
}
}
});
ok(response::new(box::pin(receiverstream::new(rx))))
}
//...
}
mod service {
tonic::include_proto!("my_grpc_service");
}
use service::my_grpc_service_server::mygrpcserviceserver;
#[tokio::main]
async fn main() -> result<(), box> {
let addr = "0.0.0.0:50052".parse().unwrap(); // localhost
let server = mygrpcserver::default();
// start server
println!("server listening on {}", addr);
server::builder()
.add_service(mygrpcserviceserver::new(server))
.serve(addr)
.await?;
ok(())
}
客户端流式 rpc
在客户端流式 rpc 的情况下,客户端发送请求消息流并接收单个响应消息。 此服务功能的输入是 flutter 应用程序提供的流。
import 'package:flutter_grpc_demo/generated/my_grpc_service.pbgrpc.dart';
import 'package:grpc/grpc.dart';
class mygrpcservice {
final channel = clientchannel(
'0.0.0.0', //localhost
port: 50052,
options: const channeloptions(credentials: channelcredentials.insecure()),
);
//...
future streamdatatoserver(stream requests) async {
final stub = mygrpcserviceclient(channel); // this class is auto generated
try {
return await stub.sendlotsofdata(requests);
} on exception catch (e) {
return future.error(e);
}
}
//...
}
在服务器端获取并迭代请求消息。 之后,将单个响应消息发送回客户端。
pub struct mygrpcserver {}
#[tonic::async_trait]
impl mygrpcservice for mygrpcserver {
async fn send_lots_of_data(
&self,
request: request>,
) -> result, tonic::status> {
//incoming request messages
let mut incoming_stream = request.into_inner();
while let some(request) = incoming_stream.next().await {
let current_request = some(request?);
//perform some operations on incoming request messages
//...
}
//generate some response message
let response = get_some_response().await;
ok(response::new(response))
}
//...
}
mod service {
tonic::include_proto!("my_grpc_service");
}
use service::my_grpc_service_server::mygrpcserviceserver;
#[tokio::main]
async fn main() -> result<(), box> {
let addr = "0.0.0.0:50052".parse().unwrap(); // localhost
let server = mygrpcserver::default();
// start server
println!("server listening on {}", addr);
server::builder()
.add_service(mygrpcserviceserver::new(server))
.serve(addr)
.await?;
ok(())
}
把它包起来
本文应该对 grpc 框架进行了快速简单的介绍,并展示了简单一元、服务器端流式传输和客户端流式传输 rpc 的基本实现。 应该清楚每次 rpc 过程中服务端和客户端接收和发送什么样的数据,核心概念是什么。
如若转载,请注明出处:https://www.dasum.com/45589.html