玩转.Net gRPC简单使用详细教程

gRPC是高性能的RPC框架, 有效地用于服务通信(不管是数据中心内部还是跨数据中心)。

由Google开源,目前是一个Cloud Native Computing Foundation(CNCF)孵化项目。

它的主要优点:

  • 现代高性能轻量级 RPC 框架。
  • 约定优先的 API 开发,默认使用 Protocol Buffers 作为描述语言,允许与语言无关的实现。
  • 可用于多种语言的工具,以生成强类型的服务器和客户端。
  • 支持客户端,服务器双向流调用。
  • 通过Protocol Buffers二进制序列化减少网络使用。
  • 使用 HTTP/2 进行传输

这些优点使gRPC非常适合:

  • 高性能轻量级微服务 - gRPC设计为低延迟和高吞吐量通信,非常适合需要高性能的轻量级微服务。
  • 多语言混合开发 - gRPC工具支持所有流行的开发语言,使gRPC成为多语言开发环境的理想选择。
  • 点对点实时通信 - gRPC对双向流调用提供出色的支持。gRPC服务可以实时推送消息而无需轮询。
  • 网络受限环境 - 使用 Protocol Buffers二进制序列化消息,该序列化始终小于等效的JSON消息,对网络带宽需求比JSON小

不建议使用gRPC的场景:

  • 浏览器可访问的API - 浏览器不完全支持gRPC。虽然gRPC-Web可以提供浏览器支持,但是它有局限性,引入了服务器代理
  • 广播实时通信 - gRPC支持通过流进行实时通信,但不存在向已注册连接广播消息的概念
  • 进程间通信 - 进程必须承载HTTP/2才能接受传入的gRPC调用,对于Windows,进程间通信管道是一种更快速的方法。

 

在本文中,我将向您展示如何使用.NET5创建gRPC服务。我将分解gRPC的一些重要基础概念,并给出一个通信示例。

 

1.1 The RPC Service Definition

 

gRPC使用的传输协议: Protocol Buffers

消息传输类型:
1 一元消息 请求-响应
2server streaming(流) server会把数据streaming回给client  
3 client streaming client把数据streaming给server            
4 双向streaming   

首先 咱们创建一个空 Asp.Net Core Web应用 命名 GrpcServer.Web

创建一个Protos文件夹和文件Message.proto

syntax = "proto3";

option csharp_namespace = "GrpcServer.Web.Protos";

message Employee{
	int32 id = 1;
	int32 no =2;
	string firstname=3;
	string lastname =4;
	float salary = 5;
}

message GetByNoRequest{
	int32 no = 1;
}

message EmployeeResponse{
	Employee employee = 1;
}

message GetAllRequest{

}

message AddPhotoRequest{
	bytes data = 1;
}

message AddPhotoResponse{
	bool isOk = 1;
}

message EmployeeRequest{
	Employee employee = 1;
}

service EmployeeService{
	rpc GetByNo(GetByNoRequest) returns (EmployeeResponse);
	rpc GetAll(GetAllRequest) returns (stream EmployeeResponse);
	rpc AddPhoto(stream AddPhotoRequest) returns (AddPhotoResponse);

	rpc Save(EmployeeRequest) returns (EmployeeResponse);
	rpc SaveAll(stream EmployeeRequest) returns (stream EmployeeResponse);
}

让我们分解一下.proto文件,了解protocol buffers的基本语法

从.proto文件上大致知道 定义的服务功能 (给某人一个回应), 这里提示一些语法:

①. syntax指示使用的protocol buffers的版本。在这种情况下,proto3是撰写本文时的最新版本。

②. csharp_namespace指示生成的文件所在的命名空间package说明符也是这个作用,用于防止协议消息类型之间的名称冲突。

对于C#,如果提供选项csharp_namespace,csharp_namespace值将用作命名空间;
在Java中,如果提供选项java_package,java_package将用作包名称。

③. service EmployeeService定义服务基类名称,rpc GetByNo(GetByNoRequest) returns (EmployeeResponse);是一个一元rpc调用

④. GetByNoRequest和EmployeeResponse是在客户端和服务器之间交换信息的数据结构。它们被称为消息
你在消息字段中定义的数字是不可重复的,当消息被序列化为Protobuf时,该数字用于标识字段,这是因为序列化一个数字比序列化整个字段名称要快。

1.2 实现服务接口

在nuget中安装包 Grpc.AspNetCore

右键Message.proto 点属性 设置成如下

设置完后请重新生成项目

 

现在创建一些测试数据:

创建一个Data文件夹 创建InMemoryData.cs

using GrpcServer.Web.Protos;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace GrpcServer.Web.Data
{
    public class InMemoryData
    {
        public static List<Employee> Employees = new List<Employee>()
        {
            new Employee
            {
                Id = 1, No= 1994, Firstname = "Chandler", Lastname = "Bing", Salary = 2200
            },
             new Employee
            {
                Id = 2, No= 1999, Firstname = "Rachel", Lastname = "Green", Salary = 2400
            },
               new Employee
            {
                Id = 3, No= 2004, Firstname = "3", Lastname = "333", Salary = 2600
            },
        };
    }
}

接着创建Services文件夹,创建MyEmployeeService.cs ,内容如下

using Grpc.Core;
using GrpcServer.Web.Data;
using GrpcServer.Web.Protos;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using static GrpcServer.Web.Protos.EmployeeService;

namespace GrpcServer.Web.Services
{
    public class MyEmployeeService : EmployeeServiceBase
    {
        private readonly ILogger<MyEmployeeService> logger;

        public MyEmployeeService(ILogger<MyEmployeeService> logger)
        {
            this.logger = logger;
        }


        
        public override Task<EmployeeResponse> GetByNo(GetByNoRequest request, ServerCallContext context)
        {
            var md = context.RequestHeaders;
            foreach (var pair in md)
            {
                logger.LogInformation($"{pair.Key}:{pair.Value}");
            }

            var employee = InMemoryData.Employees.SingleOrDefault(x => x.No == request.No);
            if (employee != null)
            {
                var response = new EmployeeResponse
                {
                     Employee = employee
                };
                return Task.FromResult(response);
            }
            throw new Exception("Employee not found no");
        }

        public override async Task GetAll(GetAllRequest request, IServerStreamWriter<EmployeeResponse> responseStream, ServerCallContext context)
        {
            foreach (var employe in InMemoryData.Employees)
            {
                await responseStream.WriteAsync(new EmployeeResponse { 
                    Employee = employe
                });
            }
        }

        public override async Task<AddPhotoResponse> AddPhoto(IAsyncStreamReader<AddPhotoRequest> requestStream, ServerCallContext context)
        {
            Metadata md = context.RequestHeaders;
            foreach (var pair in md)
            {
                logger.LogInformation($"{pair.Key}:{pair.Value}");
            }
            var data = new  List<byte>();
            while (await requestStream.MoveNext())
            {
                logger.LogInformation($"Received{requestStream.Current.Data.Length} bytes");
                data.AddRange(requestStream.Current.Data);
            } 
            logger.LogInformation($"Received file with {data.Count} bytes");

            await System.IO.File.WriteAllBytesAsync("logo.jpg", data.ToArray());
            return new AddPhotoResponse
            {
                IsOk = true
            };
        }

        public override Task<EmployeeResponse> Save(EmployeeRequest request, ServerCallContext context)
        {
            Metadata md = context.RequestHeaders;
            foreach (var pair in md)
            {
                logger.LogInformation($"{pair.Key}:{pair.Value}");
            }

            logger.LogInformation(request.Employee.ToString());
            

            var response = new EmployeeResponse
            {
                Employee = InMemoryData.Employees.FirstOrDefault()
            };
            return Task.FromResult(response);
        }

        public override async Task SaveAll(IAsyncStreamReader<EmployeeRequest> requestStream, IServerStreamWriter<EmployeeResponse> responseStream, ServerCallContext context)
        {
            while (await requestStream.MoveNext())
            {
                var employee = requestStream.Current.Employee;
                lock (this)
                {
                    InMemoryData.Employees.Add(employee);
                }

                await responseStream.WriteAsync(new EmployeeResponse { Employee = employee  });
            }
            logger.LogInformation($"Employees:");
            foreach (var employee in InMemoryData.Employees)
            {
                logger.LogInformation(employee.ToString());
            }


        }

    }
}

 

在Startup.cs中注册相关服务:

services.AddGrpc();

app.UseRouting();
app.UseHttpsRedirection();
app.UseEndpoints(endpoints =>
{
    endpoints.MapGrpcService<MyEmployeeService>();
);

然后启动服务,我这里地址是https://localhost:5001

 

2. 创建gRPC .NET控制台客户端

Visual Studio创建一个名为GrpcClient的新控制台项目。

安装如下nuget包:

Grpc.Net.Client
Google.Protobuf
Grpc.Tools

拷贝服务端项目中的..proto文件 ,如下

设置完后请重新生成项目

接着Program.cs内容如下:

using Google.Protobuf;
using Grpc.Core;
using Grpc.Net.Client;
using GrpcServer.Web.Protos;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

namespace GrpcClient
{
    class Program
    {
        static async Task Main(string[] args)
        {

            using var channel = GrpcChannel.ForAddress("https://localhost:5001");
            var client = new EmployeeService.EmployeeServiceClient(channel);


            var option = int.Parse(args[0]);
            switch (option)
            {
                case 1:
                    await GetByNoAsync(client);
                    break;
                case 2:
                    await GetAllAsync(client);
                    break;
                case 3:
                    await AddPhotoAsync(client);
                    break;
                case 4:
                    await SaveAsync(client);
                    break;
                case 5:
                    await SaveAllAsync(client);
                    break;
            }


           

            Console.ReadKey();
        }

        public static async Task GetByNoAsync(EmployeeService.EmployeeServiceClient client)
        {
            var md = new Metadata
            {
                { "username","bing"},
                { "role","admin"},
            };
            var response = await client.GetByNoAsync(new GetByNoRequest()
            {
                No = 1994
            }, md);

            Console.WriteLine(response);
        }

        public static async Task GetAllAsync(EmployeeService.EmployeeServiceClient client)
        {
            using var call = client.GetAll(new GetAllRequest());
            var responseStream = call.ResponseStream;
            while (await responseStream.MoveNext())
            {
                Console.WriteLine(responseStream.Current.Employee);
            }
           
        }

        public static async Task AddPhotoAsync(EmployeeService.EmployeeServiceClient client)
        {
            var md = new Metadata
            {
                { "username","bing"},
                { "role","admin"},
            };
            FileStream fs = File.OpenRead("logo.jpg");
            using var call = client.AddPhoto(md);

            var stream = call.RequestStream;
            

            while (true)
            {
                byte[] buffer = new byte[1024];
                int numRead = await fs.ReadAsync(buffer,0, buffer.Length);
                if (numRead == 0)
                {
                    break;
                }
                if (numRead < buffer.Length)
                {
                    Array.Resize(ref buffer,numRead);
                }
                await stream.WriteAsync(new AddPhotoRequest() 
                {
                    Data = ByteString.CopyFrom(buffer) 
                });
            }
            await stream.CompleteAsync(); //告诉服务端传完了

            var res = await call.ResponseAsync;
            Console.WriteLine(res.IsOk);
        }

        public static async Task SaveAsync(EmployeeService.EmployeeServiceClient client)
        {
            var md = new Metadata
            {
                { "username","bing"},
                { "role","admin"},
            };
            EmployeeRequest employeeRequest = new EmployeeRequest();
            employeeRequest.Employee = new Employee() { Id = 1, No = 1994, Firstname = "Chandler", Lastname = "Bing", Salary = 2200 };
            var call = await client.SaveAsync(employeeRequest,md);

            Console.WriteLine(call.Employee);
        }

        public static async Task SaveAllAsync(EmployeeService.EmployeeServiceClient client)
        {
            var emloyees = new List<Employee>()
            {
                new Employee
                {
                    Id = 1, No= 1994, Firstname = "Chandler", Lastname = "Bing", Salary = 2200
                },
                new Employee
                {
                    Id = 2, No= 1999, Firstname = "Rachel", Lastname = "Green", Salary = 2400
                },
               new Employee
                {
                    Id = 3, No= 2004, Firstname = "3", Lastname = "333", Salary = 2600
                },
            };
            using var call = client.SaveAll();
            var requestStream = call.RequestStream;
            var responseStram = call.ResponseStream;

            var responseTask = Task.Run(async () =>
            {
                while (await responseStram.MoveNext())
                {
                    Console.WriteLine($"Saved:{responseStram.Current.Employee}");
                }
            });
            foreach (var item in emloyees)
            {
                await requestStream.WriteAsync(new EmployeeRequest()
                {
                     Employee= item
                });
            }


            //一定按这个顺序
            await requestStream.CompleteAsync();
            await responseTask;

        }
    }
}

运行程序命令  dotnet run 1/2/3/4/5

 

3. 其他核心功能

3.1 通信方式

  • Unary RPC(一元Rpc调用): 上面的例子
  • Server streaming RPC : 服务器流式RPC,客户端在其中向服务器发送请求,并获取流以读取回一系列消息。客户端从返回的流中读取,直到没有更多消息为止。 gRPC保证单个RPC调用中的消息顺序。
  • Client streaming RPC:客户端流式RPC,客户端在其中编写一系列消息,然后再次使用提供的流将它们发送到服务器。客户端写完消息后,它将等待服务器读取消息并返回响应。同样,gRPC保证了单个RPC调用中的消息顺序
  • Bidirectional streaming RPC: 双向流式RPC,双方都使用读写流发送一系列消息。这两个流是独立运行的,因此客户端和服务器可以按照自己喜欢的顺序进行读写:例如,服务器可以在写响应之前等待接收所有客户端消息,或者可以先读取一条消息再写入一条消息,或读写的其他组合。每个流中的消息顺序都会保留。

3.2 Metadata

元数据是以键值对列表的形式提供的有关特定RPC调用的信息(例如身份验证详细信息),其中键是字符串,值通常是字符串,但可以是二进制数据。

元数据对于gRPC本身是不透明的:它允许客户端向服务器提供与调用相关的信息,反之亦然。

3.3 Channels

gRPC通道提供到指定主机和端口上的gRPC服务器的连接。
创建客户端存根时用到它,可以指定通道参数来修改gRPC的默认行为,例如打开或关闭消息压缩。
通道具有状态,包括已连接和空闲。

 

总结

gRPC是具有可插拔身份验证和负载平衡功能的高性能RPC框架。
使用protocol buffers定义结构化数据;使用不同语言自动产生的源代码在各种数据流中写入和读取结构化数据。

在本文中,您学习了如何使用protocol buffers(版本3)定义服务接口以及如何使用C#实现服务。最后,您使用gRPC双向流式通信创建了 简单的Demo。