RPC(远程过程调用),翻译为“远程过程调用”,是分布式系统中服务或节点之间的有效通信机制. 通过RPC分布式服务调用 安全,节点(或客户端)可以像在本地一样轻松地调用远程(或服务器)方法或服务. 许多现有的RPC框架都要求公开服务器地址,也就是说,必须知道服务器的IP和RPC端口. 本文将介绍一种不公开IP地址和端口的RPC通信方法. 此方法基于Redis BRPOP / BLPOP操作实现的延迟队列,以及Golang中的goroutine协程异步机制. 整个框架非常简单易懂,并且非常高效,稳定和安全. 该方法已应用于Crawlab中的节点通信,并已成为每个节点即时传输信息的主要方法. 让我们从Crawlab的早期节点通信解决方案PubSub开始,介绍当时遇到的问题和解决方案,然后介绍如何过渡到当前的RPC解决方案以及它在Crawlab中的工作方式.
早期的Crawlab是基于Redis的PubSub,即发布-订阅模型. 这是Redis中主要用于一对多单向通信的方案. 它的用法很简单:
SUBSCRIBE channel1 channel2 ... PUBLISH channelx message
Redis的PubSub可以用作广播模式,即,一个发布者对应于多个订阅者. 在Crawlab中,一个发布者(主节点->工作节点: 节点:
以下是节点通信原理的.
节点将通过Redis的PubSub函数相互通信.
所谓的PubSub只是一个发布-订阅模型. 订阅者将在Redis上订阅频道,其他任何节点都可以充当发布者,以便在该频道上发布消息.
在Crawlab中,主节点将订阅以下节点: 主通道. 如果其他节点需要将消息发送到主节点,则只需要将消息发布到节点: master. 同样,每个工作节点将订阅自己的通道节点:
网络请求的简单过程如下:
客户端(前端应用程序)向主节点(API)发送请求;主节点通过Redis PubSub的
并非所有节点通信都是双向的,也就是说,主节点将仅与工作节点进行单边通信,并且工作节点不会将响应返回给主节点,即所谓的单向通信. 以下是Crawlab中的通信类型.
如果您阅读Crawlab源代码,则会在节点通信中发现很多chan语法,这是Golang的并发功能.
chan表示一个通道,在Golang中分为无缓冲通道和缓冲通道. 我们使用无缓冲通道来阻止协程. 仅当chan接收到信号(chan <-“某些信号”)时,才会释放“阻塞”,协程将继续进行下一步. 在请求-响应模式下,如果是双向通信,则主节点将在接收到请求后生成一个无缓冲通道来阻塞请求. 从工作节点收到消息后,请为未缓冲的通道分配一个值,释放该块,然后将Response返回给客户端.
go命令将启动goroutine(协程)以完成并发. 借助chan,协程可以使用未缓冲的通道暂停,并等待信号执行下一个操作.
PubSub此消息订阅-发布设计模式是实现节点通信的有效方法,但存在两个问题:
PubSub的数据是即时的,并且在Redis崩溃时将丢失. 编写基于PubSub的通信服务将需要使用goroutine和渠道,这会增加开发难度并降低可维护性.
第二个问题更加困难. 如果要添加更,则需要编写大量异步代码,这将增加系统模块之间的耦合,导致可伸缩性差,并且代码难以阅读.
因此,为解决此问题,我们采用了基于Redis延迟消息队列的RPC服务.
下图是基于延迟队列体系结构的RPC实现的.
每个节点都有一个客户端和一个服务器. 客户端用于向目标节点(Target Node)发送消息并接收其返回的消息,服务器用于接收和处理源节点(Source Node)的消息并将消息返回给客户端源节点.
RPC通信的整个流程如下:
源节点的客户端通过LPUSH将消息推送到Redis的节点:
通过这种方式,整个节点的通信过程通过Redis完成. 这样做的好处是,无需公开HTTP的IP地址和端口,只需知道节点ID即可完成RPC通信.
设计的RPC代码更易于理解和维护. 每次您需要扩展新的通信类别时,只需继承rpc.Service类并实现ClientHandle(客户端处理方法)和ServerHandle(服务器端处理方法)方法.
更多关于BRPOP的信息. 它将移出并获取消息队列的最后一个元素. 如果消息队列中没有元素,它将阻塞队列,直到它等待超时或找到可弹出的元素. 因此,与轮训或其他方法相比,使用BRPOP命令可以避免对Redis的不间断请求,并避免浪费网络和计算资源.
如果您不熟悉Redis操作命令,则可以参考掘金手册“ Redis Deep Adventure: 核心原理和应用实践”. 本手册深入介绍了Redis的原理和工程实践. 将Redis应用于实际开发中非常实用.
拥有如此多的理论知识,我们仍然需要看一下代码. 老师经常教我们: “说话很便宜. 给我看代码. ”
由于Crawlab后端是由Golang开发的,因此了解以下代码需要Golang的一些基本知识.
首先,我们需要定义一个用于传输消息的数据结构. 代码显示如下.
package entity type RpcMessage struct { Id string `json:"id"` // 消息ID Method string `json:"method"` // 消息方法 NodeId string `json:"node_id"` // 节点ID Params map[string]string `json:"params"` // 参数 Timeout int `json:"timeout"` // 超时 Result string `json:"result"` // 结果 Error string `json:"error"` // 错误 }
在这里,我们定义消息ID,方法,节点ID,参数和其他字段. 消息ID是一个UUID,可以保证消息ID的唯一性.
首先,我们定义一个抽象的基本接口来促进实际业务逻辑模块的继承. 服务器端处理逻辑在ServerHandle中,它在实体中返回RpcMessage,而客户端逻辑在ClientHandle中.
// RPC服务基础类 type Service interface { ServerHandle() (entity.RpcMessage, error) ClientHandle() (interface{}, error) }
当我们调用客户端的常规方法时,我们需要实现两种逻辑:
发送消息: 生成消息ID,将消息序列化为JSON,然后将LPUSH推送到Redis消息队列中;延迟通过BRPOP检索返回的消息并将其返回给调用方.
以下是已实现的代码.
// 客户端处理消息函数 func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) { return func() (replyMsg entity.RpcMessage, err error) { // 请求ID msg.Id = uuid.NewV4().String() // 发送RPC消息 msgStr := utils.ObjectToString(msg) if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil { log.Errorf("RpcClientFunc error: " + err.Error()) debug.PrintStack() return replyMsg, err } // 获取RPC回复消息 dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout) if err != nil { log.Errorf("RpcClientFunc error: " + err.Error()) debug.PrintStack() return replyMsg, err } // 反序列化消息 if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil { log.Errorf("RpcClientFunc error: " + err.Error()) debug.PrintStack() return replyMsg, err } // 如果返回消息有错误,返回错误 if replyMsg.Error != "" { return replyMsg, errors.New(replyMsg.Error) } return } }
服务器处理的逻辑如下,一般逻辑为:
在循环中,通过BRPOP获得与该节点相对应的消息;当获取到消息时,生成goroutine来异步处理消息;继续等待.
您可以在InitRpcService方法中看到以上逻辑. 私有方法handleMsg实现序列化的逻辑,调用服务器端RPC服务方法,并发送返回消息. 如果需要扩展RPC方法类型,请将其添加到工厂类方法GetService中.
// 获取RPC服务 func GetService(msg entity.RpcMessage) Service { switch msg.Method { case constants.RpcInstallLang: return &InstallLangService{msg: msg} case constants.RpcInstallDep: return &InstallDepService{msg: msg} case constants.RpcUninstallDep: return &UninstallDepService{msg: msg} case constants.RpcGetLang: return &GetLangService{msg: msg} case constants.RpcGetInstalledDepList: return &GetInstalledDepsService{msg: msg} } return nil } // 处理RPC消息 func handleMsg(msgStr string, node model.Node) { // 反序列化消息 var msg entity.RpcMessage if err := json.Unmarshal([]byte(msgStr), &msg); err != nil { log.Errorf(err.Error()) debug.PrintStack() } // 获取service service := GetService(msg) // 根据Method调用本地方法 replyMsg, err := service.ServerHandle() if err != nil { log.Errorf(err.Error()) debug.PrintStack() } // 发送返回消息 if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil { log.Errorf(err.Error()) debug.PrintStack() } } // 初始化服务端RPC服务 func InitRpcService() error { go func() { for { // 获取当前节点 node, err := model.GetCurrentNode() if err != nil { log.Errorf(err.Error()) debug.PrintStack() continue } // 获取获取消息队列信息 msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0) if err != nil { if err != redis.ErrNil { log.Errorf(err.Error()) debug.PrintStack() } continue } // 处理消息 go handleMsg(msgStr, node) } }() return nil }
Crawlab节点通常需要为爬网程序安装一些第三方依赖项,例如pymongo,请求等. 其中,我们还需要知道节点上是否已安装某种依赖项,这需要跨服务器通信即分布式网络中的双向通信. 并且此逻辑是通过RPC实现的. 主节点对目标节点进行RPC调用,目标节点运行被调用的方法,并将运行结果(已安装的依赖项列表)返回给客户端,然后客户端返回调用方.
以下代码实现了RPC方法,以将依赖项安装在目标节点上.
// 获取已安装依赖服务 // 继承Service基础类 type GetInstalledDepsService struct { msg entity.RpcMessage } // 服务端处理方法 // 重载ServerHandle func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) { lang := utils.GetRaram("lang", s.msg.Params) deps, err := GetInstalledDepsLocal(lang) if err != nil { s.msg.Error = err.Error() return s.msg, err } resultStr, _ := json.Marshal(deps) s.msg.Result = string(resultStr) return s.msg, nil } // 客户端处理方法 // 重载ClientHandle func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) { // 发起 RPC 请求,获取服务端数据 s.msg, err = ClientFunc(s.msg)() if err != nil { return o, err } // 反序列化 var output []entity.Dependency if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil { return o, err } o = output return }
编写了RPC服务器和客户端处理方法后,就可以轻松编写调用逻辑. 以下是获取远程安装依赖项列表的方法. 先前定义的GetService工厂类首先由GetService工厂类获取,然后调用其客户端处理方法ClientHandle,并返回结果. 就像在本地调用方法一样. 这不简单吗?
// 获取远端已安装依赖 func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) { params := make(map[string]string) params["lang"] = lang s := GetService(entity.RpcMessage{ NodeId: nodeId, Method: constants.RpcGetInstalledDepList, Params: params, Timeout: 60, }) o, err := s.ClientHandle() if err != nil { return } deps = o.([]entity.Dependency) return }
本文主要介绍基于Redis延迟队列的RPC通信方法. 此方法是一种非常安全的方法,无需公开每个节点或服务的IP地址或端口. 而且,这种方式已经使用Golang在Crawlab中实现了双向通信,尤其是goroutine的异步支持,这使得这种方式的实现变得简单. 实际上,该方法理论上非常有效,并且可以支持高并发数据传输.
但是,在Crawlab的实现中仍然存在一些隐患,即,它不限制服务器上并发处理的数量. 因此,如果传输的消息太多,则服务器资源将已满,从而存在处理速度降低甚至停机的风险. 该修补程序是为了限制服务器上的并发数量. 另外,由于时间的限制,作者还没有时间测试这种RPC通信方法的实际传输效率,并且没有添加容错机制. 因此,仍有很大的改进和优化空间.
尽管如此,该方法对于Crawlab的低并发远程通信已足够,并且在实际使用中没有问题,非常稳定. 对于有隐私要求并且不想公开地址信息的开发人员,我们还建议在实际应用中尝试使用此方法.
以上是本文的全部内容. 希望对大家的学习有所帮助. 我也希望每个人都能支持代码农场网络.
本文来自电脑杂谈,转载请注明本文网址:
http://www.pc-fly.com/a/tongxinshuyu/article-152354-1.html
楼上丢脸
专家都买到假货
说的真好