藏川线前段

--- 摄于 2017 年 9 月 藏川线前段

CITA系列之网络通信——Network

想要详细了解 CITA 的工作流程,最重要也是最容易的方式就是关注每个模块的 main.rs 文件头部的消息订阅发布通道,通过消息通道可以方便得了解信息流,进而了解整个 CITA 的工作流程。

区块链本质上还是一个分布式网络应用,节点间的通信是非常重要的一个环节。CITA 的 network 模块负责节点间的通信,其中与公链有区别的地方是,network 的配置文件中维护了一个白名单,只有白名单中存在的节点才参与到链的活动,通过监听文件的更改事件来动态增删节点。

Network

Network 模块是 CITA 运行过程中相对比较忙的一个模块了,涉及的功能很多:

  1. 转发共识投票/提议的消息。
  2. 维护节点状态,监听全局状态,根据具体情况发起同步流程。
  3. 接收转发同步数据。
  4. 转发节点收到的 transaction 消息。
  5. 根据白名单动态增删节点信息。

代码分享

在这个系列第一篇的时候,有一句话:

模块间通过 protobuf + RabbitMQ/ZeroMQ 进行通信,节点间通过自定义二进制协议 + protobuf + server/client 进行通信。

network 模块中实现了 client 和 server ,并通过自定义的二进制协议与其他节点进行通信。协议如下:

Start Full length Key length Key value Message value
xDEADBEEF u32 u8(byte) bytes of a str a serialize data

代码:https://github.com/cryptape/cita/blob/develop/cita-network/src/citaprotocol.rs#L102

监听

network 的监听分为三块:MQ 节点内消息监听(三个线程),Server 对外消息监听(一个线程),文件修改事件监听(一个线程)。

监听内部 auth 发来的 transaction 消息,并广播给其他节点:

thread::spawn(move || loop {
    let (key, body) = crx_sub_tx.recv().unwrap();
    let msg = Message::try_from(&body).unwrap();
    trace!("Auth Tx from Local");
    con_tx.broadcast(key, msg);
});

监听内部 bft 发来的消息,并广播给其他节点:

thread::spawn(move || loop {
    let (key, body) = crx_sub_consensus.recv().unwrap();
    let msg = Message::try_from(&body).unwrap();
    trace!("Consensus Msg from Local");
    con.broadcast(key, msg);
});

监听内部的其他消息,并将消息转发至 network 的消息处理中心(同时处理内部和外部消息的一个线程):

loop {
    // Msg from MQ need proc before broadcast
    let (key, body) = crx_sub.recv().unwrap();
    trace!("handle delivery from {} payload {:?}", key, body);
    net_work_tx.send((Source::LOCAL, (key, body))).unwrap();
}
消息处理中心

消息处理中心的结构:

pub struct NetWork {
    con: Arc<Connection>,
    tx_pub: Sender<(String, Vec<u8>)>,
    tx_sync: Sender<(Source, (String, Vec<u8>))>,
    tx_new_tx: Sender<(String, Vec<u8>)>,
    tx_consensus: Sender<(String, Vec<u8>)>,
}

这里面是一堆 channel 的 sender 端 + 一个引用计数指针,指向的是其他节点的 TCP 连接。

代码:https://github.com/cryptape/cita/blob/develop/cita-network/src/network.rs#L37

消息中心的处理主要分为两大块:

  1. 接到内部消息:
    • chain 状态转发给 network 的管理同步线程
    • jsonrpc 的查询 peers 数量消息,直接查 connectionpeers_pair 的数量,然后返回给 jsonrpc
    • chain 发送的同步结果,转发到 network 管理同步的线程
  2. 接到 server 转发的远端消息:
    • 远端同步数据或者是状态,转发到 network 管理同步的线程
    • 远端同步请求,更改 topic 并将消息转发到 MQ 中,由 chain 处理
    • 远端 Auth 交易信息,更改 topic 并将消息转发到 MQ 中,由 auth 处理
    • 远端 bft 签名好的 proposol 或者 投票消息,更改 topic 并将消息转发到 MQ 中,由 bft、executor 处理
客户端

代码:https://github.com/cryptape/cita/blob/develop/cita-network/src/connection.rs#L145

一个简单的确认对应 address 节点是否存在的循环。其他操作就是更新地址,以及将消息广播至整个网络。

管理同步

同步处理这里只是简单介绍一下,因为涉及的模块很多,后面应该会有一篇专门讲解同步流程的文章。(这一块在 0.16 版本发布之前我负责修了一下bug 并优化了部分逻辑)

代码:https://github.com/cryptape/cita/blob/develop/cita-network/src/synchronizer.rs
结构体:

pub struct Synchronizer {
    tx_pub: mpsc::Sender<(String, Vec<u8>)>,
    con: Arc<Connection>,
    current_status: Status,
    global_status: Status,
    sync_end_height: u64, //current_status <= sync_end_status
    is_synchronizing: bool,
    latest_status_lists: BTreeMap<u64, VecDeque<u32>>,
    block_lists: BTreeMap<u64, Block>,
    rand: ThreadRng,
    remote_sync_time_out: Instant,
    /// local sync error
    local_sync_count: u8,
}

这里面维护了很多状态信息,其实也是必然需要考虑的问题,虽然 CITA 有白名单机制,但是节点之间依然不能完全信任,节点接收的外部消息都需要进行必要的验证,所以这里需要维护一些状态:全局(链上)的最新状态(高度)global_status,以及能够连接的每个节点的高度 latest_status_lists。在发起同步请求的时候,会根据需要的高度信息,从 latest_status_lists 中随机抽取一个符合条件的节点,向其发出同步请求,如果出现超时或者验证失败等情况,就会再次随机抽取一个节点发起同步。

节点内部的状态也需要 network 进行同步更新 current_status,同步过程中,数据的接收是乱序的,需要进行排序处理 block_lists 并发送给 executor 和 chain,根据 chain 的入块返回信息删减缓存(ack 机制),同步超时或者重复状态需要根据情况将缓存的 block 重发或者更新缓存(即重新发起同步),记录同步得到的最大高度 sync_end_height

每次最多请求 200 个块,处理超时时间是 60 秒。

总结

Network 这个模块是一个相对来说复杂一点的网络应用。这个模块不止是作为转发通信使用,同时还维护了一些状态信息,能够通过状态去判断是否开始需要同步,或者是否需要重新执行同步。

下一篇,由于 auth 需要重构,有可能是 chain 和 executor 混杂起来讲,或者 bft 模块,这个得看情况了,现在还没想好。

评论区

加载更多

登录后评论