跳到主要内容

gRPC

gRPC(gRPC Remote Procedure Calls)是一个高性能、开源的 RPC 框架,最初由 Google 开发。它基于 HTTP/2 传输,默认使用 Protocol Buffers 作为接口定义语言和序列化格式,支持多种编程语言,特别适合微服务架构。

简介

gRPC 特性

gRPC 核心特性:

  • 高性能: 基于 HTTP/2 和 Protobuf,性能优异
  • 跨语言: 支持多种编程语言
  • 接口定义: 使用 Protobuf 定义服务
  • 流式处理: 支持单向流和双向流
  • 负载均衡: 内置负载均衡和重试机制
  • 认证: 支持多种认证机制
  • 双向流: 客户端和服务端可以在同一次调用中同时发送和接收消息流
  • 超时控制: 精确的超时和取消控制

适用场景:

  • 微服务架构
  • 分布式系统
  • 高性能 API
  • 实时通信
  • 跨语言服务调用

安装 gRPC

# Install the gRPC and Protobuf Go modules
go get -u google.golang.org/grpc
go get -u google.golang.org/protobuf

# Install the protoc compiler
# macOS
brew install protobuf

# Linux
apt-get install protobuf-compiler

# Install the Go plugins for protoc (protoc-gen-go and protoc-gen-go-grpc)
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

快速开始

1. 定义服务 (proto/hello.proto)

// Proto3 definition of the Greeter example service.
syntax = "proto3";

package hello;

// Go package path used for the generated code.
option go_package = "./proto";

// Greeter exposes one unary RPC and one bidirectional-streaming RPC.
service Greeter {
// Unary: a single HelloRequest in, a single HelloReply out.
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Bidirectional streaming: both the request and the reply are streams.
rpc SayHelloStream (stream HelloRequest) returns (stream HelloReply) {}
}

// HelloRequest carries the name sent by the client.
message HelloRequest {
string name = 1;
}

// HelloReply carries the greeting text returned by the server.
message HelloReply {
string message = 1;
}

2. 生成 Go 代码

# Generate the Go message code (--go_out) and the gRPC service code (--go-grpc_out)
# for proto/hello.proto; both use paths=source_relative.
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/hello.proto

3. 服务端实现

package main

import (
"context"
"log"
"net"

"google.golang.org/grpc"
pb "my-project/proto"
)

// server is the Greeter service implementation registered with the gRPC
// server. It embeds pb.UnimplementedGreeterServer, so RPCs that this type
// does not define itself are provided by the embedded generated type.
type server struct {
pb.UnimplementedGreeterServer
}

// SayHello logs the name carried by the request and replies with
// "Hello <name>". It never returns an error.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	name := in.GetName()
	log.Printf("Received: %v", name)

	reply := &pb.HelloReply{Message: "Hello " + name}
	return reply, nil
}

// main listens on TCP port 50051, registers the Greeter implementation on a
// new gRPC server and serves until Serve returns an error. Any failure is
// fatal.
func main() {
	const addr = ":50051"

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}

	srv := grpc.NewServer()
	pb.RegisterGreeterServer(srv, &server{})
	log.Printf("Server listening at %v", lis.Addr())

	err = srv.Serve(lis)
	if err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

4. 客户端实现

package main

import (
"context"
"log"
"time"

"google.golang.org/grpc"
pb "my-project/proto"
)

// main dials the Greeter server on localhost:50051 without transport
// security, sends one SayHello request with a one-second deadline and logs
// the greeting. Any failure is fatal.
func main() {
	// NOTE(review): grpc.WithInsecure is marked deprecated in newer grpc-go
	// releases in favour of the credentials/insecure package; confirm
	// against the grpc-go version in use.
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	client := pb.NewGreeterClient(conn)
	req := &pb.HelloRequest{Name: "Alice"}

	reply, err := client.SayHello(ctx, req)
	if err != nil {
		log.Fatalf("Could not greet: %v", err)
	}

	log.Printf("Greeting: %s", reply.GetMessage())
}

Protobuf 基础

1. 消息定义

syntax = "proto3";

// User demonstrates the common proto3 field kinds: scalars, a repeated
// field, a map, a nested enum and a oneof group.
message User {
int32 id = 1;
string name = 2;
string email = 3;
// repeated: zero or more string values.
repeated string tags = 4;
// map with string keys and string values.
map<string, string> metadata = 5;

// Nested enum; ADMIN carries the number 0, which proto3 uses as the
// default for an unset enum field.
enum UserType {
ADMIN = 0;
USER = 1;
GUEST = 2;
}
UserType type = 6;

// oneof: at most one of phone / address is set at a time.
oneof contact {
string phone = 7;
string address = 8;
}
}

2. 服务定义

// UserService declares one RPC of each of the four gRPC call types.
service UserService {
// Unary RPC
rpc GetUser (GetUserRequest) returns (User);

// Server-streaming RPC
rpc ListUsers (ListUsersRequest) returns (stream User);

// Client-streaming RPC
rpc CreateUser (stream User) returns (CreateUserResponse);

// Bidirectional-streaming RPC
rpc Chat (stream Message) returns (stream Message);
}

gRPC 模式

1. 简单 RPC (Unary)

rpc SayHello (HelloRequest) returns (HelloReply);
// Server side: SayHello answers a single request with a single
// "Hello <name>" reply and never returns an error.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	reply := &pb.HelloReply{Message: "Hello " + in.Name}
	return reply, nil
}

// 客户端
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "Alice"})

2. 服务端流式 RPC

rpc ListUsers (ListUsersRequest) returns (stream User);
// Server side: ListUsers streams a fixed list of two users to the client,
// one message per Send, and stops at the first send error.
func (s *server) ListUsers(in *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
	users := []*pb.User{
		{Id: 1, Name: "Alice"},
		{Id: 2, Name: "Bob"},
	}

	for i := range users {
		if err := stream.Send(users[i]); err != nil {
			return err
		}
	}
	return nil
}

// Client side: open the server stream, then read users until the server
// signals the end of the stream with io.EOF.
stream, err := c.ListUsers(ctx, &pb.ListUsersRequest{})
if err != nil {
	// Without this check a failed call leaves stream nil and Recv panics.
	log.Fatal(err)
}
for {
	user, err := stream.Recv()
	if err == io.EOF {
		break
	}
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("User: %v", user)
}

3. 客户端流式 RPC

rpc CreateUser (stream User) returns (CreateUserResponse);
// Server side: CreateUser counts the messages the client streams in and,
// once the client closes its side (io.EOF), replies with the total and
// closes the stream. Any other receive error is returned to the caller.
func (s *server) CreateUser(stream pb.UserService_CreateUserServer) error {
	var count int32
	for {
		// The received message itself is not needed here; binding it to a
		// named variable that is never read would not compile.
		_, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.CreateUserResponse{Count: count})
		}
		if err != nil {
			return err
		}
		count++
	}
}

// Client side: open the stream, send every user, then close the sending
// side and wait for the server's single response.
stream, err := c.CreateUser(ctx)
if err != nil {
	// A failed call leaves stream nil; Send would panic without this check.
	log.Fatal(err)
}
for _, user := range users {
	if err := stream.Send(user); err != nil {
		log.Fatal(err)
	}
}
r, err := stream.CloseAndRecv()
if err != nil {
	log.Fatal(err)
}
log.Printf("Created users: %d", r.Count)

4. 双向流式 RPC

rpc Chat (stream Message) returns (stream Message);
// Server side: Chat echoes every message it receives back to the client,
// prefixed with "Echo: ". It returns nil when the client closes its side
// (io.EOF) and returns the error on any receive or send failure.
func (s *server) Chat(stream pb.ChatService_ChatServer) error {
	for {
		msg, err := stream.Recv()
		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			return err
		}

		// Process the message and send the reply.
		reply := &pb.Message{Content: "Echo: " + msg.Content}
		if err := stream.Send(reply); err != nil {
			return err
		}
	}
}

// Client side: open the bidirectional stream, receive replies on a
// goroutine while the main flow sends, then wait for the receiver to finish.
stream, err := c.Chat(ctx)
if err != nil {
	// A failed call leaves stream nil; using it would panic.
	log.Fatal(err)
}
waitc := make(chan struct{})

// Receiving goroutine: runs until the server closes the stream.
go func() {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			close(waitc)
			return
		}
		if err != nil {
			// On any other error the message is nil; dereferencing it
			// would panic and waitc would never be closed.
			log.Fatalf("Failed to receive: %v", err)
		}
		log.Printf("Received: %s", in.Content)
	}
}()

// Send the messages.
for _, msg := range messages {
	if err := stream.Send(msg); err != nil {
		log.Fatal(err)
	}
}
// Closing the sending side lets the server end the stream, which in turn
// ends the receiving goroutine with io.EOF.
if err := stream.CloseSend(); err != nil {
	log.Fatal(err)
}
<-waitc

拦截器 (Interceptor)

1. 一元拦截器

// Server-side unary interceptor: logs the full method name and the request,
// then passes the call on to handler and returns its result unchanged.
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	log.Printf("Method: %s, Request: %v", info.FullMethod, req)

	resp, err := handler(ctx, req)
	return resp, err
}

// 使用拦截器
s := grpc.NewServer(
grpc.UnaryInterceptor(loggingInterceptor),
)

2. 流式拦截器

// streamInterceptor is a server-side stream interceptor: it logs the full
// method name, then passes the stream on to handler and returns its error
// unchanged.
func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	log.Printf("Streaming Method: %s", info.FullMethod)

	err := handler(srv, ss)
	return err
}

s := grpc.NewServer(
grpc.StreamInterceptor(streamInterceptor),
)

3. 客户端拦截器

// clientInterceptor is a client-side unary interceptor: it logs the method
// being called, then performs the call through invoker with the original
// arguments and returns its error unchanged.
func clientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	log.Printf("Calling method: %s", method)

	err := invoker(ctx, method, req, reply, cc, opts...)
	return err
}

conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(clientInterceptor),
)

认证和安全

1. SSL/TLS

// Server side: load the certificate / private-key pair and create a gRPC
// server that uses it for transport security. The credentials package has no
// LoadTLSCredentials function; NewServerTLSFromFile is the server-side
// loader that takes a certificate file and a key file.
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil {
	log.Fatal(err)
}

s := grpc.NewServer(grpc.Creds(creds))

// 客户端
creds, err := credentials.NewClientTLSFromFile("server.crt", "")
if err != nil {
log.Fatal(err)
}

conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(creds),
)

2. Token 认证

// authentication holds a user name and password that are exposed as
// per-request metadata (intended for use with grpc.WithPerRPCCredentials).
type authentication struct {
	user     string
	password string
}

// GetRequestMetadata returns the stored values under the "user" and
// "password" keys. ctx and uri are ignored and the error is always nil.
func (a *authentication) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 2)
	md["user"] = a.user
	md["password"] = a.password
	return md, nil
}

// RequireTransportSecurity reports false, i.e. these credentials do not
// insist on a secure transport.
// NOTE(review): combined with an insecure connection this sends the password
// in clear text; acceptable for a local demo only.
func (a *authentication) RequireTransportSecurity() bool {
	const needsTLS = false
	return needsTLS
}

conn, err := grpc.Dial("localhost:50051",
grpc.WithInsecure(),
grpc.WithPerRPCCredentials(&authentication{"user", "password"}),
)

3. JWT 认证

// jwtAuth holds a token that is exposed as per-request metadata (intended
// for use with grpc.WithPerRPCCredentials).
type jwtAuth struct {
	token string
}

// GetRequestMetadata returns a single "authorization" entry whose value is
// "Bearer " followed by the stored token. ctx and uri are ignored and the
// error is always nil.
func (j *jwtAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	header := "Bearer " + j.token
	md := map[string]string{"authorization": header}
	return md, nil
}

// RequireTransportSecurity reports true, i.e. these credentials insist on a
// secure transport.
func (j *jwtAuth) RequireTransportSecurity() bool {
	const needsTLS = true
	return needsTLS
}

conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(creds),
grpc.WithPerRPCCredentials(&jwtAuth{token: jwtToken}),
)

截止时间和取消

// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "Alice"})

// 取消请求
ctx, cancel := context.WithCancel(context.Background())
cancel() // 取消请求

错误处理

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Server side, returning an error: SayHello rejects a request with an empty
// name using the InvalidArgument status code and otherwise replies with
// "Hello <name>".
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	if in.Name != "" {
		return &pb.HelloReply{Message: "Hello " + in.Name}, nil
	}
	return nil, status.Error(codes.InvalidArgument, "Name is required")
}

// 客户端处理错误
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: ""})
if err != nil {
st, ok := status.FromError(err)
if ok {
log.Printf("Code: %d, Message: %s", st.Code(), st.Message())
}
}

实战示例

用户服务

// Proto3 definition of a user service with get/list/create/update/delete
// operations.
syntax = "proto3";

package user;

// Go package path used for the generated code.
option go_package = "./proto";

// UserService: every RPC is unary except ListUsers, which streams its
// results from the server.
service UserService {
rpc GetUser (GetUserRequest) returns (User);
rpc ListUsers (ListUsersRequest) returns (stream User);
rpc CreateUser (CreateUserRequest) returns (User);
rpc UpdateUser (UpdateUserRequest) returns (User);
rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse);
}

// User is the resource returned by the get, list, create and update RPCs.
message User {
int32 id = 1;
string name = 2;
string email = 3;
}

// GetUserRequest identifies a user by id.
message GetUserRequest {
int32 id = 1;
}

// ListUsersRequest carries page and limit values.
// NOTE(review): presumably pagination parameters — no handler is shown here.
message ListUsersRequest {
int32 page = 1;
int32 limit = 2;
}

// CreateUserRequest carries the user fields other than id.
message CreateUserRequest {
string name = 1;
string email = 2;
}

// UpdateUserRequest carries an id together with name and email values.
message UpdateUserRequest {
int32 id = 1;
string name = 2;
string email = 3;
}

// DeleteUserRequest identifies a user by id.
message DeleteUserRequest {
int32 id = 1;
}

// DeleteUserResponse carries a success flag.
message DeleteUserResponse {
bool success = 1;
}

最佳实践

1. 错误码使用

import "google.golang.org/grpc/codes"

status.Error(codes.InvalidArgument, "Invalid argument")
status.Error(codes.NotFound, "Resource not found")
status.Error(codes.AlreadyExists, "Resource already exists")
status.Error(codes.PermissionDenied, "Permission denied")
status.Error(codes.Unauthenticated, "Unauthenticated")
status.Error(codes.Internal, "Internal server error")

2. 重试策略 (连接退避与 WaitForReady)

import "google.golang.org/grpc/backoff"

conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
MaxDelay: 5 * time.Second,
},
}),
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
)

3. 负载均衡

import "google.golang.org/grpc/balancer/roundrobin"

// round_robin spreads RPCs across the addresses returned by the name
// resolver. A comma-separated host list is not a valid target: it would be
// treated as one (unreachable) address. Use a name that resolves to every
// backend instead; the host below is a placeholder for such a DNS name.
conn, err := grpc.Dial(
	"dns:///my-service.example.com:50051",
	grpc.WithInsecure(),
	grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
)

性能优化

1. 连接复用

// conn is the single shared client connection, created on first use.
var conn *grpc.ClientConn

// getClient returns a UserService client bound to the shared connection,
// dialing localhost:50051 on the first call. A dial error is returned as is.
// NOTE(review): nothing synchronizes access to conn, so concurrent first
// calls are not protected.
func getClient() (pb.UserServiceClient, error) {
	if conn != nil {
		return pb.NewUserServiceClient(conn), nil
	}

	var err error
	if conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure()); err != nil {
		return nil, err
	}
	return pb.NewUserServiceClient(conn), nil
}

2. 消息压缩

import "google.golang.org/grpc/encoding/gzip"

conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name),
),
)

总结

gRPC 是一个现代化的高性能 RPC 框架,特别适合微服务架构和分布式系统。它基于 HTTP/2 和 Protobuf 提供了优异的性能和跨语言支持。掌握 gRPC 对于构建大规模分布式系统是非常重要的。

参考资料