--- 摄于 2017 年 9 月 藏川线前段
想要详细了解 CITA 的工作流程,最重要也是最容易的方式就是关注每个模块的 main.rs
文件头部的消息订阅发布通道,通过消息通道可以方便得了解信息流,进而了解整个 CITA 的工作流程。
区块链本质上还是一个分布式网络应用,节点间的通信是非常重要的一个环节。CITA 的 network 模块负责节点间的通信,其中与公链有区别的地方是,network 的配置文件中维护了一个白名单,只有白名单中存在的节点才参与到链的活动,通过监听文件的更改事件来动态增删节点。
Network 模块是 CITA 运行过程中相对比较忙的一个模块了,涉及的功能很多:
在这个系列第一篇的时候,有一句话:
模块间通过 protobuf + RabbitMQ/ZeroMQ 进行通信,节点间通过自定义二进制协议 + protobuf + server/client 进行通信。
network 模块中实现了 client 和 server ,并通过自定义的二进制协议与其他节点进行通信。协议如下:
Start | Full length | Key length | Key value | Message value |
---|---|---|---|---|
xDEADBEEF | u32 | u8(byte) | bytes of a str | a serialize data |
代码:https://github.com/cryptape/cita/blob/develop/cita-network/src/citaprotocol.rs#L102
network 的监听分为三块:MQ 节点内消息监听(三个线程),Server 对外消息监听(一个线程),文件修改事件监听(一个线程)。
监听内部 auth 发来的 transaction 消息,并广播给其他节点:
thread::spawn(move || loop {
let (key, body) = crx_sub_tx.recv().unwrap();
let msg = Message::try_from(&body).unwrap();
trace!("Auth Tx from Local");
con_tx.broadcast(key, msg);
});
监听内部 bft 发来的消息,并广播给其他节点:
thread::spawn(move || loop {
let (key, body) = crx_sub_consensus.recv().unwrap();
let msg = Message::try_from(&body).unwrap();
trace!("Consensus Msg from Local");
con.broadcast(key, msg);
});
监听内部的其他消息,并将消息转发至 network 的消息处理中心(同时处理内部和外部消息的一个线程):
loop {
// Msg from MQ need proc before broadcast
let (key, body) = crx_sub.recv().unwrap();
trace!("handle delivery from {} payload {:?}", key, body);
net_work_tx.send((Source::LOCAL, (key, body))).unwrap();
}
消息处理中心的结构:
pub struct NetWork {
con: Arc<Connection>,
tx_pub: Sender<(String, Vec<u8>)>,
tx_sync: Sender<(Source, (String, Vec<u8>))>,
tx_new_tx: Sender<(String, Vec<u8>)>,
tx_consensus: Sender<(String, Vec<u8>)>,
}
这里面是一堆 channel 的 sender 端 + 一个引用计数指针,指向的是其他节点的 TCP 连接。
代码:https://github.com/cryptape/cita/blob/develop/cita-network/src/network.rs#L37
消息中心的处理主要分为两大块:
connection
中 peers_pair
的数量,然后返回给 jsonrpc代码:https://github.com/cryptape/cita/blob/develop/cita-network/src/connection.rs#L145
一个简单的确认对应 address 节点是否存在的循环。其他操作就是更新地址,以及将消息广播至整个网络。
同步处理这里只是简单介绍一下,因为涉及的模块很多,后面应该会有一篇专门讲解同步流程的文章。(这一块在 0.16 版本发布之前我负责修了一下bug 并优化了部分逻辑)
代码:https://github.com/cryptape/cita/blob/develop/cita-network/src/synchronizer.rs
结构体:
pub struct Synchronizer {
tx_pub: mpsc::Sender<(String, Vec<u8>)>,
con: Arc<Connection>,
current_status: Status,
global_status: Status,
sync_end_height: u64, //current_status <= sync_end_status
is_synchronizing: bool,
latest_status_lists: BTreeMap<u64, VecDeque<u32>>,
block_lists: BTreeMap<u64, Block>,
rand: ThreadRng,
remote_sync_time_out: Instant,
/// local sync error
local_sync_count: u8,
}
这里面维护了很多状态信息,其实也是必然需要考虑的问题,虽然 CITA 有白名单机制,但是节点之间依然不能完全信任,节点接收的外部消息都需要进行必要的验证,所以这里需要维护一些状态:全局(链上)的最新状态(高度)global_status
,以及能够连接的每个节点的高度 latest_status_lists
。在发起同步请求的时候,会根据需要的高度信息,从 latest_status_lists
中随机抽取一个符合条件的节点,向其发出同步请求,如果出现超时或者验证失败等情况,就会再次随机抽取一个节点发起同步。
节点内部的状态也需要 network 进行同步更新 current_status
,同步过程中,数据的接收是乱序的,需要进行排序处理 block_lists
并发送给 executor 和 chain,根据 chain 的入块返回信息删减缓存(ack 机制),同步超时或者重复状态需要根据情况将缓存的 block 重发或者更新缓存(即重新发起同步),记录同步得到的最大高度 sync_end_height
。
每次最多请求 200 个块,处理超时时间是 60 秒。
Network 这个模块是一个相对来说复杂一点的网络应用。这个模块不止是作为转发通信使用,同时还维护了一些状态信息,能够通过状态去判断是否开始需要同步,或者是否需要重新执行同步。
下一篇,由于 auth 需要重构,有可能是 chain 和 executor 混杂起来讲,或者 bft 模块,这个得看情况了,现在还没想好。
请登录后评论
评论区
加载更多