b2科目四模拟试题多少题驾考考爆了怎么补救
b2科目四模拟试题多少题 驾考考爆了怎么补救

与Golang的基于Redis的安全高效的RPC通信

电脑杂谈  发布时间:2020-03-26 00:24:02  来源:网络整理

分布式服务调用 安全_zookeeper分布式锁服务_基于dubbo框架构建分布式服务

RPC(远程过程调用),翻译为“远程过程调用”,是分布式系统中服务或节点之间的有效通信机制. 通过RPC分布式服务调用 安全,节点(或客户端)可以像在本地一样轻松地调用远程(或服务器)方法或服务. 许多现有的RPC框架都要求公开服务器地址,也就是说,必须知道服务器的IP和RPC端口. 本文将介绍一种不公开IP地址和端口的RPC通信方法. 此方法基于Redis BRPOP / BLPOP操作实现的延迟队列,以及Golang中的goroutine协程异步机制. 整个框架非常简单易懂,并且非常高效,稳定和安全. 该方法已应用于Crawlab中的节点通信,并已成为每个节点即时传输信息的主要方法. 让我们从Crawlab的早期节点通信解决方案PubSub开始,介绍当时遇到的问题和解决方案,然后介绍如何过渡到当前的RPC解决方案以及它在Crawlab中的工作方式.

早期的Crawlab是基于Redis的PubSub,即发布-订阅模型. 这是Redis中主要用于一对多单向通信的方案. 它的用法很简单:

SUBSCRIBE channel1 channel2 ...
PUBLISH channelx message

Redis的PubSub可以用作广播模式,即,一个发布者对应于多个订阅者. 在Crawlab中,一个发布者(主节点->工作节点: 节点: )只有一个订阅者,多个发布者(工作节点->主节点: 节点: master>)只有一个订阅者. 这是为了促进双向通信.

以下是节点通信原理的.

用 Golang 实现基于 Redis 的安全高效 RPC 通信

节点将通过Redis的PubSub函数相互通信.

所谓的PubSub只是一个发布-订阅模型. 订阅者将在Redis上订阅频道,其他任何节点都可以充当发布者,以便在该频道上发布消息.

在Crawlab中,主节点将订阅以下节点: 主通道. 如果其他节点需要将消息发送到主节点,则只需要将消息发布到节点: master. 同样,每个工作节点将订阅自己的通道节点: (node_id是MongoDB和MongoDB ObjectId中的节点ID). 如果需要将消息发送到工作程序节点,则只需将消息发布到通道. 已经.

网络请求的简单过程如下:

zookeeper分布式锁服务_分布式服务调用 安全_基于dubbo框架构建分布式服务

客户端(前端应用程序)向主节点(API)发送请求;主节点通过Redis PubSub的通道向相应的工作节点发布消息;工作节点收到消息后,将执行一些操作,然后通过通道将相应的消息释放到主节点;主节点收到消息后,它将消息返回给客户端.

并非所有节点通信都是双向的,也就是说,主节点将仅与工作节点进行单边通信,并且工作节点不会将响应返回给主节点,即所谓的单向通信. 以下是Crawlab中的通信类型.

如果您阅读Crawlab源代码,则会在节点通信中发现很多chan语法,这是Golang的并发功能.

chan表示一个通道,在Golang中分为无缓冲通道和缓冲通道. 我们使用无缓冲通道来阻止协程. 仅当chan接收到信号(chan <-“某些信号”)时,才会释放“阻塞”,协程将继续进行下一步. 在请求-响应模式下,如果是双向通信,则主节点将在接收到请求后生成一个无缓冲通道来阻塞请求. 从工作节点收到消息后,请为未缓冲的通道分配一个值,释放该块,然后将Response返回给客户端.

go命令将启动goroutine(协程)以完成并发. 借助chan,协程可以使用未缓冲的通道暂停,并等待信号执行下一个操作.

PubSub此消息订阅-发布设计模式是实现节点通信的有效方法,但存在两个问题:

PubSub的数据是即时的,并且在Redis崩溃时将丢失. 编写基于PubSub的通信服务将需要使用goroutine和渠道,这会增加开发难度并降低可维护性.

第二个问题更加困难. 如果要添加更,则需要编写大量异步代码,这将增加系统模块之间的耦合,导致可伸缩性差,并且代码难以阅读.

因此,为解决此问题,我们采用了基于Redis延迟消息队列的RPC服务.

下图是基于延迟队列体系结构的RPC实现的.

分布式服务调用 安全_基于dubbo框架构建分布式服务_zookeeper分布式锁服务

用 Golang 实现基于 Redis 的安全高效 RPC 通信

每个节点都有一个客户端和一个服务器. 客户端用于向目标节点(Target Node)发送消息并接收其返回的消息,服务器用于接收和处理源节点(Source Node)的消息并将消息返回给客户端源节点.

RPC通信的整个流程如下:

源节点的客户端通过LPUSH将消息推送到Redis的节点: ,并执行BRPOP节点: : 阻止并侦听此消息队列;目标节点的服务器始终使用BRPOP. 侦听节点后: ,收到消息后,通过消息中的Method字段执行相应的程序;目标节点执行完毕后分布式服务调用 安全,服务器通过LPUSH将消息推送到Redis的节点: : ,因为源节点客户端一直在侦听消息队列: nodes: : ,当目标节点服务器将消息推送到此队列时,源节点客户端将立即接收返回的消息,然后进行后续处理.

通过这种方式,整个节点的通信过程通过Redis完成. 这样做的好处是,无需公开HTTP的IP地址和端口,只需知道节点ID即可完成RPC通信.

设计的RPC代码更易于理解和维护. 每次您需要扩展新的通信类别时,只需继承rpc.Service类并实现ClientHandle(客户端处理方法)和ServerHandle(服务器端处理方法)方法.

更多关于BRPOP的信息. 它将移出并获取消息队列的最后一个元素. 如果消息队列中没有元素,它将阻塞队列,直到它等待超时或找到可弹出的元素. 因此,与轮训或其他方法相比,使用BRPOP命令可以避免对Redis的不间断请求,并避免浪费网络和计算资源.

如果您不熟悉Redis操作命令,则可以参考掘金手册“ Redis Deep Adventure: 核心原理和应用实践”. 本手册深入介绍了Redis的原理和工程实践. 将Redis应用于实际开发中非常实用.

拥有如此多的理论知识,我们仍然需要看一下代码. 老师经常教我们: “说话很便宜. 给我看代码. ”

由于Crawlab后端是由Golang开发的,因此了解以下代码需要Golang的一些基本知识.

分布式服务调用 安全_基于dubbo框架构建分布式服务_zookeeper分布式锁服务

首先,我们需要定义一个用于传输消息的数据结构. 代码显示如下.

package entity
type RpcMessage struct {
    Id      string            `json:"id"`      // 消息ID
    Method  string            `json:"method"`  // 消息方法
    NodeId  string            `json:"node_id"` // 节点ID
    Params  map[string]string `json:"params"`  // 参数
    Timeout int               `json:"timeout"` // 超时
    Result  string            `json:"result"`  // 结果
    Error   string            `json:"error"`   // 错误
}

在这里,我们定义消息ID,方法,节点ID,参数和其他字段. 消息ID是一个UUID,可以保证消息ID的唯一性.

首先,我们定义一个抽象的基本接口来促进实际业务逻辑模块的继承. 服务器端处理逻辑在ServerHandle中,它在实体中返回RpcMessage,而客户端逻辑在ClientHandle中.

// RPC服务基础类
type Service interface {
    ServerHandle() (entity.RpcMessage, error)
    ClientHandle() (interface{}, error)
}

当我们调用客户端的常规方法时,我们需要实现两种逻辑:

发送消息: 生成消息ID,将消息序列化为JSON,然后将LPUSH推送到Redis消息队列中;延迟通过BRPOP检索返回的消息并将其返回给调用方.

以下是已实现的代码.

// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
    return func() (replyMsg entity.RpcMessage, err error) {
        // 请求ID
        msg.Id = uuid.NewV4().String()
        // 发送RPC消息
        msgStr := utils.ObjectToString(msg)
        if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
            log.Errorf("RpcClientFunc error: " + err.Error())
            debug.PrintStack()
            return replyMsg, err
        }
        // 获取RPC回复消息
        dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
        if err != nil {
            log.Errorf("RpcClientFunc error: " + err.Error())
            debug.PrintStack()
            return replyMsg, err
        }
        // 反序列化消息
        if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
            log.Errorf("RpcClientFunc error: " + err.Error())
            debug.PrintStack()
            return replyMsg, err
        }
        // 如果返回消息有错误,返回错误
        if replyMsg.Error != "" {
            return replyMsg, errors.New(replyMsg.Error)
        }
        return
    }
}

服务器处理的逻辑如下,一般逻辑为:

基于dubbo框架构建分布式服务_分布式服务调用 安全_zookeeper分布式锁服务

在循环中,通过BRPOP获得与该节点相对应的消息;当获取到消息时,生成goroutine来异步处理消息;继续等待.

您可以在InitRpcService方法中看到以上逻辑. 私有方法handleMsg实现序列化的逻辑,调用服务器端RPC服务方法,并发送返回消息. 如果需要扩展RPC方法类型,请将其添加到工厂类方法GetService中.

// 获取RPC服务
func GetService(msg entity.RpcMessage) Service {
    switch msg.Method {
    case constants.RpcInstallLang:
        return &InstallLangService{msg: msg}
    case constants.RpcInstallDep:
        return &InstallDepService{msg: msg}
    case constants.RpcUninstallDep:
        return &UninstallDepService{msg: msg}
    case constants.RpcGetLang:
        return &GetLangService{msg: msg}
    case constants.RpcGetInstalledDepList:
        return &GetInstalledDepsService{msg: msg}
    }
    return nil
}
// 处理RPC消息
func handleMsg(msgStr string, node model.Node) {
    // 反序列化消息
    var msg entity.RpcMessage
    if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
        log.Errorf(err.Error())
        debug.PrintStack()
    }
    // 获取service
    service := GetService(msg)
    // 根据Method调用本地方法
    replyMsg, err := service.ServerHandle()
    if err != nil {
        log.Errorf(err.Error())
        debug.PrintStack()
    }
    // 发送返回消息
    if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
        log.Errorf(err.Error())
        debug.PrintStack()
    }
}
// 初始化服务端RPC服务
func InitRpcService() error {
    go func() {
        for {
            // 获取当前节点
            node, err := model.GetCurrentNode()
            if err != nil {
                log.Errorf(err.Error())
                debug.PrintStack()
                continue
            }
            // 获取获取消息队列信息
            msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
            if err != nil {
                if err != redis.ErrNil {
                    log.Errorf(err.Error())
                    debug.PrintStack()
                }
                continue
            }
            // 处理消息
            go handleMsg(msgStr, node)
        }
    }()
    return nil
}

Crawlab节点通常需要为爬网程序安装一些第三方依赖项,例如pymongo,请求等. 其中,我们还需要知道节点上是否已安装某种依赖项,这需要跨服务器通信即分布式网络中的双向通信. 并且此逻辑是通过RPC实现的. 主节点对目标节点进行RPC调用,目标节点运行被调用的方法,并将运行结果(已安装的依赖项列表)返回给客户端,然后客户端返回调用方.

以下代码实现了RPC方法,以将依赖项安装在目标节点上.

// 获取已安装依赖服务
// 继承Service基础类
type GetInstalledDepsService struct {
    msg entity.RpcMessage
}
// 服务端处理方法
// 重载ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
    lang := utils.GetRaram("lang", s.msg.Params)
    deps, err := GetInstalledDepsLocal(lang)
    if err != nil {
        s.msg.Error = err.Error()
        return s.msg, err
    }
    resultStr, _ := json.Marshal(deps)
    s.msg.Result = string(resultStr)
    return s.msg, nil
}
// 客户端处理方法
// 重载ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
    // 发起 RPC 请求,获取服务端数据
    s.msg, err = ClientFunc(s.msg)()
    if err != nil {
        return o, err
    }
    // 反序列化
    var output []entity.Dependency
    if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
        return o, err
    }
    o = output
    return
}

编写了RPC服务器和客户端处理方法后,就可以轻松编写调用逻辑. 以下是获取远程安装依赖项列表的方法. 先前定义的GetService工厂类首先由GetService工厂类获取,然后调用其客户端处理方法ClientHandle,并返回结果. 就像在本地调用方法一样. 这不简单吗?

// 获取远端已安装依赖
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
    params := make(map[string]string)
    params["lang"] = lang
    s := GetService(entity.RpcMessage{
        NodeId:  nodeId,
        Method:  constants.RpcGetInstalledDepList,
        Params:  params,
        Timeout: 60,
    })
    o, err := s.ClientHandle()
    if err != nil {
        return
    }
    deps = o.([]entity.Dependency)
    return
}

本文主要介绍基于Redis延迟队列的RPC通信方法. 此方法是一种非常安全的方法,无需公开每个节点或服务的IP地址或端口. 而且,这种方式已经使用Golang在Crawlab中实现了双向通信,尤其是goroutine的异步支持,这使得这种方式的实现变得简单. 实际上,该方法理论上非常有效,并且可以支持高并发数据传输.

但是,在Crawlab的实现中仍然存在一些隐患,即,它不限制服务器上并发处理的数量. 因此,如果传输的消息太多,则服务器资源将已满,从而存在处理速度降低甚至停机的风险. 该修补程序是为了限制服务器上的并发数量. 另外,由于时间的限制,作者还没有时间测试这种RPC通信方法的实际传输效率,并且没有添加容错机制. 因此,仍有很大的改进和优化空间.

尽管如此,该方法对于Crawlab的低并发远程通信已足够,并且在实际使用中没有问题,非常稳定. 对于有隐私要求并且不想公开地址信息的开发人员,我们还建议在实际应用中尝试使用此方法.

以上是本文的全部内容. 希望对大家的学习有所帮助. 我也希望每个人都能支持代码农场网络.


本文来自电脑杂谈,转载请注明本文网址:
http://www.pc-fly.com/a/tongxinshuyu/article-152354-1.html

    相关阅读
      发表评论  请自觉遵守互联网相关的政策法规,严禁发布、暴力、反动的言论

      热点图片
      拼命载入中...