gRPC-go服务发现&负载均衡
ztj100 2025-05-16 18:04 15 浏览 0 评论
前言
以下示例基于
https://github.com/grpc/grpc-go v1.30.0,关于proto文件定义,服务生成参考gRPC 官方文档中文版
client
grpc使用的是客户端负载均衡模式,每次新建连接的时候会根据负载均衡算法选出服务端的IP然后建立连接。现在grpc默认支持两种算法pick_first(第一次地址) 和 round_robin(轮询)pick_first:pick_first每次都是尝试连接第一个地址,如果连接失败就会尝试下一个,直到连接成功为止,之后的RPC请求都会使用这个连接round_robin:round_robin会对每个地址建立连接,之后的RPC请求会依次通过这些连接发送到后端
客户端新建一个连接
conn, err := grpc.Dial(
fmt.Sprintf("%s:///%s", "game", baseService),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
grpc.WithInsecure(),
//grpc.WithUnaryInterceptor(unaryClientInterceptor),
//grpc.WithBlock(),
//grpc.WithCompressor Deprecated
)
客户端每次发起请求都需要通过grpc.dail创建一个ClientConn,然后通过ClientConn.XXXX发送请求。
建立连接的各项参数:grpc.WithInsecure:禁用传输认证,没有这个选项必须设置一种认证方式grpc.WithCompressor:在grpc.Dial参数中设置压缩的方式将要被废弃,推荐使用UseCompressor
grpc.UseCompressor(gzip.Name)
conn, err := grpc.Dial(
//...
)
PS:压缩方式客户端应该和服务端对应
grpc.WithBlock():grpc.Dial默认建立连接是异步的,加了这个参数后会等待所有连接建立成功后再返回
grpc.WithUnaryInterceptor:一元拦截器,适用于普通rpc连接,相应的还有流拦截器。拦截器只有第一个生效,所以一般设置一个。拦截器是对请求的一次封装,客户端和服务端都可以设置拦截器,请求的发送/执行都是在拦截器内操作的,所以在请求的前后都可以嵌入用户自定义的代码,类似hook
//客户端拦截器
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
var credsConfigured bool
for _, o := range opts {
_, ok := o.(grpc.PerRPCCredsCallOption)
if ok {
credsConfigured = true
break
}
}
if !credsConfigured {
opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
AccessToken: fallbackToken,
})))
}
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
end := time.Now()
logger("RPC: %s, start time: %s, end time: %s, err: %v", method, start.Format("Basic"), end.Format(time.RFC3339), err)
return err
}
//服务端拦截器
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// authentication (token verification)
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}
if !valid(md["authorization"]) {
return nil, errInvalidToken
}
m, err := handler(ctx, req)
if err != nil {
logger("RPC failed with error %v", err)
}
return m, err
}
grpc.WithDefaultServiceConfig: 旧的版本可以通过grpc.RoundRobin(),和grpc.WithBalancer()来设置负载均衡,这个版本grpc.RoundRobin()已经取消了,grpc.WithBalancer()和grpc. 也WithBalancerName()标记为废弃。
//service config example
{
"loadBalancingConfig": [ { "round_robin": {} } ],
"methodConfig": [
{
"name": [
{ "service": "foo", "method": "bar" },
{ "service": "baz" }
],
"timeout": "1.0000000001s"
}
]
}
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name))
可以这样设置BalancingPolicy
target: grpc.Dial:的第一个参数,这个参数的主要作用的通过它来找到对应的服务端地址,target传入是一个字符串,统一格式为
scheme://authority/endpoint,然后通过以下方式解析为Target struct
type Target struct {
Scheme string
Authority string
Endpoint string
}
func parseTarget(target string) (ret resolver.Target) {
var ok bool
ret.Scheme, ret.Endpoint, ok = split2(target, "://")
if !ok {
return resolver.Target{Endpoint: target}
}
ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
if !ok {
return resolver.Target{Endpoint: target}
}
return ret
}
解析target的时候有以下几种情况:
- 当前参数有没有直接设置resolverBuilder,如果设置了,直接设置Endpoint=target
- 如果未直接设置resolverBuilder,则通过Scheme来找到resolverBuilder
- 如果通过Scheme没有找到resolverBuilder,resolverBuilder为默认的dns builder,设置
Endpoint=target
所以,真正获取IP地址是通过resolverBuilder这个接口
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}
Build():为给定目标创建一个新的resolver,当调用grpc.Dial()时执行。Scheme():返回此resolver方案的名称
type Resolver interface {
ResolveNow(ResolveNowOptions)
Close()
}
ResolveNow():被 gRPC 调用,以尝试再次解析目标名称。只用于提示,可忽略该方法。Close方法:关闭resolver
下面我们看一个示例
func init() {
resolver.Register(&exampleResolverBuilder{})
/*
//注册的时候将Scheme => builder保存到m
func Register(b Builder) {
m[b.Scheme()] = b
}
*/
}
const (
exampleScheme = "example"
exampleServiceName = "lb.example.grpc.io"
)
var addrs = []string{"localhost:50051", "localhost:50052"}
type exampleResolverBuilder struct{}
func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r := &exampleResolver{
target: target,
cc: cc,
addrsStore: map[string][]string{
exampleServiceName: addrs,
},
}
r.start()
return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return exampleScheme }
type exampleResolver struct {
target resolver.Target
cc resolver.ClientConn
addrsStore map[string][]string
}
func (r *exampleResolver) start() {
addrStrs := r.addrsStore[r.target.Endpoint]
addrs := make([]resolver.Address, len(addrStrs))
for i, s := range addrStrs {
addrs[i] = resolver.Address{Addr: s}
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close() {}
func main() {
//...
roundrobinConn, err := grpc.Dial(
// Target{Scheme:exampleScheme,Endpoint:exampleServiceName}
fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
grpc.WithInsecure(),
grpc.WithBlock(),
)
//...
}
grpc.Dial() 会调用Scheme=>builder 的Build() 方法,之后调用r.start()
r.cc.UpdateState(resolver.State{Addresses: addrs})
UpdateState()将addr更新到cc,也就是外部的连接中,供其他接口使用。
server
server相对来说启动比较简单,一般都会加拦截器来获取matedata或者去recover() panic,又或者打印一些日志
grpc.UseCompressor(gzip.Name)
s := grpc.NewServer(grpc.UnaryInterceptor(unaryServerInterceptor))
//...
matedata: matedata是一个map[string][]string的结构,用来在客户端和服务器之间传输数据。其中的一个作用是可以传递分布式调用环境中的链路id,方便跟踪调试。另外也可以传一些业务相关的数据
客户端拦截器中设置metedata
md := metadata.Pairs("XXX_id",xxxID, "YYY_id", yyyID)
mdOld, _ := metadata.FromIncomingContext(ctx)
md = metadata.Join(mdOld, md)
ctx = metadata.NewOutgoingContext(ctx, md)
//...
invoker(ctx, method, req, reply, cc, opts...)
服务端拦截器获取metadata
var xxxID,yyyID
md, _ := metadata.FromIncomingContext(ctx)
if arr := md["XXX_id"]; len(arr) > 0 {
xxxID = arr[0]
}
if arr := md["YYY_id"]; len(arr) > 0 {
yyyID = arr[0]
}
m, err := handler(ctx, req)
if err != nil {
logger("RPC failed with error %v", err)
}
在server启动之后,需要将这个服务注册到etcd 。用etcd3在编译的时候出现了和groc-go版本不兼容的问题
首先当前用的etcd 版本是 3.4.9,支持的grpc-go最高版本是v1.26.0,于是需要将grpc-go降级replace google.golang.org/grpc => google.golang.org/grpc v1.26.0降级之后之前生成的proto.pb.go 又出现了错误,于是将protobuf降级replace
github.com/golang/protobuf =>
github.com/golang/protobuf v1.2.0以上的问题网上其他人也遇到过,下面的这个不清楚是我本地环境有问题还是其他原因报错原因是
google.golang.org/genproto这个包下面生成的proto.pb.go里面指定了protobuf1.4的版本变量,解决办法还是降级,版本号是在$GOPATH/pkg/mod/... 下面找到的replace
google.golang.org/genproto =>
google.golang.org/genproto
v0.0.0-20180817151627-c66870c02cf8
关于etcd的内容之后再整理吧。
小结
结合etcd 的watch功能,很容易检测某一个路径节点的变化,如果,server端注册两个服务到etcdkey = /project/service/user/1 val = 127.0.0.1:9999key = /project/service/user/2 val = 127.0.0.1:9998
在客户端,如果我们自定义了一个名叫example的resolverBuilder,同时开启一个watch协程 ,监测/project/service下面的节点,动态维护Build()中addrsStore,这个时候我们设置addrsStore[user] = {127.0.0.1:9999,127.0.0.1:9998}。
然后在客户端grpc.Dai中令target = example:///user那么在r.start()中就可以获取到 {127.0.0.1:9999,127.0.0.1:9998}(具体可以看上面示例中r.start()方法)
server注册的key,Build()中addrsStore中的key,以及target 后面的endPoint 的不同选择可以实现不通粒度的服务划分。
转自:
https://www.jianshu.com/p/9507eca8960f?
go语言中文文档:www.topgoer.com
- 上一篇:golang初级进阶(四):函数(下)
- 下一篇:赏析Singleflight设计
相关推荐
- 拒绝躺平,如何使用AOP的环绕通知实现分布式锁
-
如何在分布式环境下,像用synchronized关键字那样使用分布式锁。比如开发一个注解,叫@DistributionLock,作用于一个方法函数上,每次调方法前加锁,调完之后自动释放锁。可以利用Sp...
- 「解锁新姿势」 兄dei,你代码需要优化了
-
前言在我们平常开发过程中,由于项目时间紧张,代码可以用就好,往往会忽视代码的质量问题。甚至有些复制粘贴过来,不加以整理规范。往往导致项目后期难以维护,更别说后续接手项目的人。所以啊,我们要编写出优雅的...
- 消息队列核心面试点讲解(消息队列面试题)
-
Rocketmq消息不丢失一、前言RocketMQ可以理解成一个特殊的存储系统,这个存储系统特殊之处数据是一般只会被使用一次,这种情况下,如何保证这个被消费一次的消息不丢失是非常重要的。本文将分析Ro...
- 秒杀系统—4.第二版升级优化的技术文档二
-
大纲7.秒杀系统的秒杀活动服务实现...
- SpringBoot JPA动态查询与Specification详解:从基础到高级实战
-
一、JPA动态查询概述1.1什么是动态查询动态查询是指根据运行时条件构建的查询,与静态查询(如@Query注解或命名查询)相对。在业务系统中,80%的查询需求都是动态的,例如电商系统中的商品筛选、订...
- Java常用工具类技术文档(java常用工具类技术文档有哪些)
-
一、概述Java工具类(UtilityClasses)是封装了通用功能的静态方法集合,能够简化代码、提高开发效率。本文整理Java原生及常用第三方库(如ApacheCommons、GoogleG...
- Guava 之Joiner 拼接字符串和Map(字符串拼接join的用法)
-
Guave是一个强大的的工具集合,今天给大家介绍一下,常用的拼接字符串的方法,当然JDK也有方便的拼接字符串的方式,本文主要介绍guava的,可以对比使用基本的拼接的话可以如下操作...
- SpringBoot怎么整合Redis,监听Key过期事件?
-
一、修改Redis配置文件1、在Redis的安装目录2、找到redis.windows.conf文件,搜索“notify-keyspace-events”...
- 如何使用Python将多个excel文件数据快速汇总?
-
在数据分析和处理的过程中,Excel文件是我们经常会遇到的数据格式之一。本文将通过一个具体的示例,展示如何使用Python和Pandas库来读取、合并和处理多个Excel文件的数据,并最终生成一个包含...
- 利用Pandas高效处理百万级数据集,速度提升10倍的秘密武器
-
处理大规模数据集,尤其是百万级别的数据量,对效率的要求非常高。使用Pandas时,可以通过一些策略和技巧显著提高数据处理的速度。以下是一些关键的方法,帮助你使用Pandas高效地处理大型数据集,从而实...
- Python进阶-Day 25: 数据分析基础
-
目标:掌握Pandas和NumPy的基本操作,学习如何分析CSV数据集并生成报告。课程内容...
- Pandas 入门教程 - 第五课: 高级数据操作
-
在前几节课中,我们学习了如何使用Pandas进行数据操作和可视化。在这一课中,我们将进一步探索一些高级的数据操作技巧,包括数据透视、分组聚合、时间序列处理以及高级索引和切片。高级索引和切片...
- 原来这才是Pandas!(原来这才是薯片真正的吃法)
-
听到一些人说,Pandas语法太乱、太杂了,根本记不住。...
- python(pandas + numpy)数据分析的基础
-
数据NaN值排查,统计,排序...
- 利用Python进行数据分组/数据透视表
-
1.数据分组源数据表如下所示:1.1分组键是列名分组键是列名时直接将某一列或多列的列名传给groupby()方法,groupby()方法就会按照这一列或多列进行分组。按照一列进行分组...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)