--- 摄于 2017 年 9 月 藏川线前段
这系列文章思路是按照模块从外部往内部讲,forever 模块、 evm 虚拟机实现、数据存储只会涉及接口,具体实现不会详细分析。之后,再联系几个模块讲解一些跨模块的功能,例如数据同步机制(涉及 network、chain、executor)、预执行机制(涉及bft、executor)。好,下面开始主题,Jsonrpc 模块。
CITA 的架构中,负责对外交互的模块有两个,一个是 Jsonrpc,一个是 Network,分别负责与人交互,与 peers 交互,这篇的主题是与人交互的 Jsonrpc。
顺便一提,CITA 项目将 Rust panic 的默认行为从线程退出改成了进程退出,于是就有了 forever 模块,这个模块类似于 goever,用来将意外死去的进程拉起来。
Jsonrpc 相对来说比较简单,提供了两个对外接口,分别是 WebSocket 和 Http,默认都是开启的,如果需要关闭,可以通过配置文件进行调整。
这个模块的核心功能:
这是一个无状态的模块,依赖 Rust 社区的 tokio/future/mio 这一套异步工具,值得期待的是,2018 年这套工具应该会完成稳定的任务,之后网络应用这种 IO 库应该会好写很多。
tokio 系列经历了几次重构,目前 Jsonrpc 用的 hyper 库依赖的并不是最新的版本(最新版本有一个运行时,可以提供线程池及 work-steal 模式的任务分配),而是上一个版本的方案,依赖 tokio-core 作为异步的执行器,所以在 http 接口上,启动了与服务器核心数一致的线程,每个线程一个 core 对外部请求进行分发处理:
代码地址: https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/main.rs#L202
let threads: usize = config
.http_config
.thread_number
.unwrap_or_else(num_cpus::get);
for i in 0..threads {
let addr = addr.clone().parse().unwrap();
let tx = tx_relay.clone();
let timeout = http_config.timeout;
let http_responses = Arc::clone(&http_responses);
let allow_origin = http_config.allow_origin.clone();
let _ = thread::Builder::new()
.name(format!("worker{}", i))
.spawn(move || {
let core = Core::new().unwrap();
let handle = core.handle();
let timeout = Duration::from_secs(timeout);
let listener = http_server::listener(&addr, &handle).unwrap();
Server::start(core, listener, tx, http_responses, timeout, &allow_origin);
})
.unwrap();
}
相对来说,WebSocket 只用了一个线程对外进行监听:
代码地址:https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/main.rs#L187
处理的逻辑:
将接到的请求,转成 reqlib::Request
+ topic
格式,会生成一个 uuid 作为 index 保存对应的 jsonrpc id /版本号/以及用户的 channel(socket),并插入全局 Response
里,待超时或者 MQ 中返回对应的 Response
内容后,从中取出,返回用户,看代码的时候,可以重点关注一下这个全局 Response
:
/// 类型 Arc<Mutex<HashMap<Uuid, TransferType>>>
let responses = Arc::new(Mutex::new(HashMap::with_capacity(backlog_capacity)));
处理请求代码:https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/http_server.rs#L66
代码:https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/main.rs#L163
fn forward_service(
topic: String,
req: reqlib::Request,
new_tx_request_buffer: &mut Vec<reqlib::Request>,
time_stamp: &mut SystemTime,
tx_pub: &Sender<(String, Vec<u8>)>,
config: &NewTxFlowConfig,
) {
if RoutingKey::from(&topic) != routing_key!(Jsonrpc >> RequestNewTx) {
let data: Message = req.into();
tx_pub.send((topic, data.try_into().unwrap())).unwrap();
} else {
new_tx_request_buffer.push(req);
trace!(
"New tx is pushed and has {} new tx and buffer time cost is {:?}",
new_tx_request_buffer.len(),
time_stamp.elapsed().unwrap()
);
if new_tx_request_buffer.len() > config.count_per_batch
|| time_stamp.elapsed().unwrap().subsec_nanos() > config.buffer_duration
{
batch_forward_new_tx(new_tx_request_buffer, time_stamp, tx_pub);
}
}
}
接到解析好的 Request
之后,如果 topic
是交易信息,会缓存在 buffer 中,超时或者达到一定量之后向 Auth 模块统一转发,而其他的请求信息就直接转发给对应的模块。
代码:https://github.com/cryptape/cita/blob/develop/cita-jsonrpc/src/main.rs#L231
pub fn handle(&mut self, key: &str, body: &[u8]) {
let mut msg = Message::try_from(body).unwrap();
trace!("get msg from routint_key {}", key);
match RoutingKey::from(key) {
routing_key!(Auth >> Response)
| routing_key!(Chain >> Response)
| routing_key!(Executor >> Response)
| routing_key!(Net >> Response) => {
let content = msg.take_response().unwrap();
trace!("from response request_id {:?}", content.request_id);
let value = { self.responses.lock().remove(&content.request_id) };
if let Some(val) = value {
match val {
TransferType::HTTP((req_info, sender)) => {
let _ = sender.send(Output::from(content, req_info.id, req_info.jsonrpc));
}
TransferType::WEBSOCKET((req_info, sender)) => {
let _ = sender.send(
serde_json::to_string(&Output::from(content, req_info.id, req_info.jsonrpc)).unwrap(),
);
}
}
} else {
warn!("receive lost request_id {:?}", content.request_id);
}
}
_ => {
warn!("receive unexpect key {}", key);
}
}
}
主要就是监听 MQ 信息,根据 ws 或者 http 分别构建 json 数据并返回。
这个模块的逻辑其实非常简单,类似于一个消息接收/分发的机制,它不需要知道消息是什么,只是根据 topic 去转发消息,然后本身有超时控制和辨别请求格式的功能。
下一篇应该是 NetWork,先把交互的两个模块讲完,相对于内部处理,网络这一块大部分人都经常接触,比较容易理解。
顺便一说,在 https://github.com/driftluo/cita-cli 这里,我在写一个对 cita 的命令行工具,可以理解为 client,有兴趣可以一起来写哦。
请登录后评论
评论区
加载更多