gRPC
gRPC (gRPC Remote Procedure Calls) 是一个高性能、开源的 RPC 框架,最初由 Google 开发。它默认使用 Protocol Buffers 作为接口定义语言和消息序列化格式,支持多种编程语言,特别适合微服务架构。
简介
gRPC 特性
gRPC 核心特性:
- 高性能: 基于 HTTP/2 和 Protobuf,性能优异
- 跨语言: 支持多种编程语言
- 接口定义: 使用 Protobuf 定义服务
- 流式处理: 支持单向流和双向流
- 负载均衡: 内置负载均衡和重试机制
- 认证: 支持多种认证机制
- 双向流: 客户端和服务端可以在同一个调用中同时发送和接收消息流
- 超时控制: 精确的超时和取消控制
适用场景:
- 微服务架构
- 分布式系统
- 高性能 API
- 实时通信
- 跨语言服务调用
安装 gRPC
# Install the gRPC and Protobuf Go modules
go get -u google.golang.org/grpc
go get -u google.golang.org/protobuf
# Install the protoc compiler
# macOS
brew install protobuf
# Linux
apt-get install protobuf-compiler
# Install the Go plugins for protoc (protoc-gen-go and protoc-gen-go-grpc)
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
快速开始
1. 定义服务 (proto/hello.proto)
syntax = "proto3";
package hello;
// Go package option used for the generated code.
option go_package = "./proto";
// Greeter exposes a unary greeting RPC and a bidirectional-streaming variant.
service Greeter {
// SayHello takes one HelloRequest and returns one HelloReply.
rpc SayHello (HelloRequest) returns (HelloReply) {}
// SayHelloStream takes a stream of HelloRequest and returns a stream of HelloReply.
rpc SayHelloStream (stream HelloRequest) returns (stream HelloReply) {}
}
// HelloRequest carries the name sent by the caller.
message HelloRequest {
string name = 1;
}
// HelloReply carries the reply message text.
message HelloReply {
string message = 1;
}
2. 生成 Go 代码
# Generate the Go message code (--go_out) and the gRPC service code (--go-grpc_out)
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/hello.proto
3. 服务端实现
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
pb "my-project/proto"
)
// server is the Greeter service implementation. It embeds
// pb.UnimplementedGreeterServer and defines SayHello itself.
type server struct {
	pb.UnimplementedGreeterServer
}

// SayHello logs the requested name and replies with "Hello " followed by it.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	name := in.GetName()
	log.Printf("Received: %v", name)

	reply := &pb.HelloReply{Message: "Hello " + name}
	return reply, nil
}
// main listens on TCP port 50051, registers the Greeter implementation and
// serves until Serve returns. Any listen or serve failure is fatal.
func main() {
	listener, listenErr := net.Listen("tcp", ":50051")
	if listenErr != nil {
		log.Fatalf("Failed to listen: %v", listenErr)
	}

	grpcServer := grpc.NewServer()
	pb.RegisterGreeterServer(grpcServer, &server{})

	log.Printf("Server listening at %v", listener.Addr())
	serveErr := grpcServer.Serve(listener)
	if serveErr != nil {
		log.Fatalf("Failed to serve: %v", serveErr)
	}
}
4. 客户端实现
package main
import (
"context"
"log"
"time"
"google.golang.org/grpc"
pb "my-project/proto"
)
// main dials localhost:50051, sends a single SayHello request with a
// one-second deadline and logs the reply. Any failure is fatal.
func main() {
	conn, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if dialErr != nil {
		log.Fatalf("Failed to connect: %v", dialErr)
	}
	defer conn.Close()

	client := pb.NewGreeterClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	request := &pb.HelloRequest{Name: "Alice"}
	reply, callErr := client.SayHello(ctx, request)
	if callErr != nil {
		log.Fatalf("Could not greet: %v", callErr)
	}
	log.Printf("Greeting: %s", reply.GetMessage())
}
Protobuf 基础
1. 消息定义
syntax = "proto3";
// User combines scalar, repeated, map, nested-enum and oneof fields.
message User {
int32 id = 1;
string name = 2;
string email = 3;
// Zero or more tags.
repeated string tags = 4;
// String-to-string key/value pairs.
map<string, string> metadata = 5;
// UserType is declared inside User.
// NOTE(review): ADMIN is the zero value, so an unset type presumably reads
// back as ADMIN under proto3 defaults — confirm this is intended.
enum UserType {
ADMIN = 0;
USER = 1;
GUEST = 2;
}
UserType type = 6;
// contact holds at most one of phone or address at a time.
oneof contact {
string phone = 7;
string address = 8;
}
}
2. 服务定义
// UserService declares one RPC of each gRPC call type.
service UserService {
// Unary RPC: one request, one response.
rpc GetUser (GetUserRequest) returns (User);
// Server-streaming RPC: one request, a stream of responses.
rpc ListUsers (ListUsersRequest) returns (stream User);
// Client-streaming RPC: a stream of requests, one response.
rpc CreateUser (stream User) returns (CreateUserResponse);
// Bidirectional-streaming RPC: both sides send a stream of messages.
rpc Chat (stream Message) returns (stream Message);
}
gRPC 模式
1. 简单 RPC (Unary)
rpc SayHello (HelloRequest) returns (HelloReply);
// Server side: SayHello replies with "Hello " followed by the request's Name.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	greeting := "Hello " + in.Name
	reply := &pb.HelloReply{Message: greeting}
	return reply, nil
}
// 客户端
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "Alice"})
2. 服务端流式 RPC
rpc ListUsers (ListUsersRequest) returns (stream User);
// Server side: ListUsers sends a fixed list of two users on the stream, in
// order, and returns the first send error it encounters (nil if none).
func (s *server) ListUsers(in *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {
	users := []*pb.User{
		{Id: 1, Name: "Alice"},
		{Id: 2, Name: "Bob"},
	}
	for i := 0; i < len(users); i++ {
		sendErr := stream.Send(users[i])
		if sendErr != nil {
			return sendErr
		}
	}
	return nil
}
// Client side: open the stream, then read users until the server closes it.
stream, err := c.ListUsers(ctx, &pb.ListUsersRequest{})
if err != nil {
	// The stream is not usable when the call fails, so stop before Recv.
	log.Fatal(err)
}
for {
	user, err := stream.Recv()
	if err == io.EOF {
		// The server has finished sending.
		break
	}
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("User: %v", user)
}
3. 客户端流式 RPC
rpc CreateUser (stream User) returns (CreateUserResponse);
// Server side: CreateUser reads users until the client closes its send side
// (io.EOF), then replies once with the number of users received. Any other
// receive error ends the RPC with that error.
func (s *server) CreateUser(stream pb.UserService_CreateUserServer) error {
	var count int32
	for {
		user, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.CreateUserResponse{Count: count})
		}
		if err != nil {
			return err
		}
		// The received message must be used: an unused local does not compile.
		log.Printf("Creating user: %s", user.GetName())
		count++
	}
}
// Client side: send every user, then close the send side and read the single
// response from the server.
stream, err := c.CreateUser(ctx)
if err != nil {
	// The stream is not usable when the call fails, so stop before Send.
	log.Fatal(err)
}
for _, user := range users {
	if err := stream.Send(user); err != nil {
		log.Fatal(err)
	}
}
r, err := stream.CloseAndRecv()
if err != nil {
	log.Fatal(err)
}
log.Printf("Created %d users", r.GetCount())
4. 双向流式 RPC
rpc Chat (stream Message) returns (stream Message);
// Server side: Chat echoes every received message back with an "Echo: "
// prefix until the client closes its send side (io.EOF). Any receive or send
// error ends the RPC with that error.
//
// Chat is declared in UserService, so the generated stream type is
// pb.UserService_ChatServer, matching the other streaming handlers.
func (s *server) Chat(stream pb.UserService_ChatServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		// Process the message and reply.
		if err := stream.Send(&pb.Message{
			Content: "Echo: " + in.Content,
		}); err != nil {
			return err
		}
	}
}
// Client side: receive on a separate goroutine while this flow sends.
stream, err := c.Chat(ctx)
if err != nil {
	// The stream is not usable when the call fails.
	log.Fatal(err)
}
waitc := make(chan struct{})
// Receiver goroutine: ends on io.EOF or on the first receive error.
go func() {
	// Always signal completion so the final <-waitc cannot block forever.
	defer close(waitc)
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return
		}
		if err != nil {
			// in is nil on error; reading in.Content here would panic.
			log.Printf("Failed to receive: %v", err)
			return
		}
		log.Printf("Received: %s", in.Content)
	}
}()
// Send the messages.
for _, msg := range messages {
	if err := stream.Send(msg); err != nil {
		log.Fatal(err)
	}
}
// Close the send side so the server sees io.EOF and finishes the RPC.
if err := stream.CloseSend(); err != nil {
	log.Fatal(err)
}
<-waitc
拦截器 (Interceptor)
1. 一元拦截器
// 服务端拦截器
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log.Printf("Method: %s, Request: %v", info.FullMethod, req)
return handler(ctx, req)
}
// 使用拦截器
s := grpc.NewServer(
grpc.UnaryInterceptor(loggingInterceptor),
)
2. 流式拦截器
func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
log.Printf("Streaming Method: %s", info.FullMethod)
return handler(srv, ss)
}
s := grpc.NewServer(
grpc.StreamInterceptor(streamInterceptor),
)
3. 客户端拦截器
func clientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
log.Printf("Calling method: %s", method)
return invoker(ctx, method, req, reply, cc, opts...)
}
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(clientInterceptor),
)
认证和安全
1. SSL/TLS
// Server side: load the certificate/key pair and serve over TLS.
// credentials.NewServerTLSFromFile is the grpc-go constructor for server
// transport credentials built from a certificate file and a key file.
creds, err := credentials.NewServerTLSFromFile("server.crt", "server.key")
if err != nil {
	log.Fatal(err)
}
s := grpc.NewServer(grpc.Creds(creds))
// Client side: build transport credentials from server.crt and dial with them.
creds, err := credentials.NewClientTLSFromFile("server.crt", "")
if err != nil {
	log.Fatal(err)
}
transportOpt := grpc.WithTransportCredentials(creds)
conn, err := grpc.Dial("localhost:50051", transportOpt)
2. Token 认证
// authentication attaches a static user/password pair to every RPC as
// per-RPC credentials.
type authentication struct {
	user     string
	password string
}

// GetRequestMetadata returns the metadata entries ("user" and "password")
// added to each outgoing request. ctx and uri are not used.
func (a *authentication) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"user":     a.user,
		"password": a.password,
	}, nil
}

// RequireTransportSecurity reports true so that gRPC refuses to attach the
// password to a connection that is not encrypted.
func (a *authentication) RequireTransportSecurity() bool {
	return true
}

// creds are TLS transport credentials, e.g. from
// credentials.NewClientTLSFromFile. Dialing with grpc.WithInsecure() here
// would send the password in cleartext.
conn, err := grpc.Dial("localhost:50051",
	grpc.WithTransportCredentials(creds),
	grpc.WithPerRPCCredentials(&authentication{user: "user", password: "password"}),
)
3. JWT 认证
// jwtAuth attaches a bearer token to every RPC as per-RPC credentials.
type jwtAuth struct {
	token string
}

// GetRequestMetadata returns a single "authorization" entry of the form
// "Bearer <token>". ctx and uri are not used.
func (j *jwtAuth) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["authorization"] = "Bearer " + j.token
	return md, nil
}

// RequireTransportSecurity always reports true.
func (j *jwtAuth) RequireTransportSecurity() bool {
	return true
}
conn, err := grpc.Dial("localhost:50051",
grpc.WithTransportCredentials(creds),
grpc.WithPerRPCCredentials(&jwtAuth{token: jwtToken}),
)
截止时间和取消
// Set a timeout: the call is abandoned if it takes longer than 3 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "Alice"})

// Cancel a request explicitly. Distinct names are used because ctx and cancel
// are already declared above and ":=" needs at least one new variable.
cancelCtx, cancelRequest := context.WithCancel(context.Background())
cancelRequest() // cancels any request made with cancelCtx
错误处理
import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
// Server side: SayHello rejects an empty Name with an InvalidArgument status;
// otherwise it replies with "Hello " followed by the name.
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	if name := in.Name; name != "" {
		return &pb.HelloReply{Message: "Hello " + name}, nil
	}
	return nil, status.Error(codes.InvalidArgument, "Name is required")
}
// Client side: when the call fails, unpack the gRPC status from the error and
// log its code and message.
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: ""})
if err != nil {
	if st, ok := status.FromError(err); ok {
		log.Printf("Code: %d, Message: %s", st.Code(), st.Message())
	}
}
实战示例
用户服务
syntax = "proto3";
package user;
// Go package option used for the generated code.
option go_package = "./proto";
// UserService declares get, list, create, update and delete RPCs for users.
service UserService {
// GetUser: unary; takes a GetUserRequest and returns a User.
rpc GetUser (GetUserRequest) returns (User);
// ListUsers: server-streaming; takes a ListUsersRequest and returns a stream of User.
rpc ListUsers (ListUsersRequest) returns (stream User);
// CreateUser: unary; takes a CreateUserRequest and returns a User.
rpc CreateUser (CreateUserRequest) returns (User);
// UpdateUser: unary; takes an UpdateUserRequest and returns a User.
rpc UpdateUser (UpdateUserRequest) returns (User);
// DeleteUser: unary; takes a DeleteUserRequest and returns a DeleteUserResponse.
rpc DeleteUser (DeleteUserRequest) returns (DeleteUserResponse);
}
// User is the resource returned by the service.
message User {
int32 id = 1;
string name = 2;
string email = 3;
}
// GetUserRequest carries a user id.
message GetUserRequest {
int32 id = 1;
}
// ListUsersRequest carries page and limit values.
// NOTE(review): presumably pagination controls — confirm against the server.
message ListUsersRequest {
int32 page = 1;
int32 limit = 2;
}
// CreateUserRequest carries a name and an email; it has no id field.
message CreateUserRequest {
string name = 1;
string email = 2;
}
// UpdateUserRequest carries a user id together with a name and an email.
message UpdateUserRequest {
int32 id = 1;
string name = 2;
string email = 3;
}
// DeleteUserRequest carries a user id.
message DeleteUserRequest {
int32 id = 1;
}
// DeleteUserResponse carries a success flag.
message DeleteUserResponse {
bool success = 1;
}
最佳实践
1. 错误码使用
import "google.golang.org/grpc/codes"
// Each call builds an error carrying a gRPC status code and a message.
// The request contained an invalid argument.
status.Error(codes.InvalidArgument, "Invalid argument")
// The requested resource was not found.
status.Error(codes.NotFound, "Resource not found")
// The resource being created already exists.
status.Error(codes.AlreadyExists, "Resource already exists")
// The caller is not permitted to perform the operation.
status.Error(codes.PermissionDenied, "Permission denied")
// The caller is not authenticated.
status.Error(codes.Unauthenticated, "Unauthenticated")
// An internal server error occurred.
status.Error(codes.Internal, "Internal server error")
2. 重试策略 (连接退避)
import "google.golang.org/grpc/backoff"
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
MaxDelay: 5 * time.Second,
},
}),
grpc.WithDefaultCallOptions(
grpc.WaitForReady(true),
),
)
3. 负载均衡
import "google.golang.org/grpc/balancer/roundrobin"
// round_robin only has something to balance across when the name resolver
// returns several addresses. A comma-separated host list is not a valid gRPC
// target, so use the dns resolver with a name that resolves to every backend
// (replace my-service.example.com with your own service name).
// Referencing roundrobin.Name also keeps the roundrobin import in use.
conn, err := grpc.Dial(
	"dns:///my-service.example.com:50051",
	grpc.WithInsecure(),
	grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"`+roundrobin.Name+`":{}}]}`),
)
性能优化
1. 连接复用 (连接池)
// conn is the shared client connection, dialed on first use and reused by
// every later getClient call.
var conn *grpc.ClientConn

// getClient returns a UserService client backed by the shared connection,
// dialing localhost:50051 first if no connection exists yet. A dial error is
// returned to the caller. No synchronization guards conn.
func getClient() (pb.UserServiceClient, error) {
	if conn != nil {
		return pb.NewUserServiceClient(conn), nil
	}

	dialed, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	if err != nil {
		return nil, err
	}
	conn = dialed
	return pb.NewUserServiceClient(conn), nil
}
2. 消息压缩
import "google.golang.org/grpc/encoding/gzip"
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name),
),
)
总结
gRPC 是一个现代化的高性能 RPC 框架,特别适合微服务架构和分布式系统。它基于 HTTP/2 和 Protobuf 提供了优异的性能和跨语言支持。掌握 gRPC 对于构建大规模分布式系统是非常重要的。