自己动手实现区块链

本文基于
https://blog.logrocket.com/how-to-build-a-blockchain-in-rust/
这篇博客,实现一个简单的区块链。

初始化项目

1
cargo new rust-miniblockchain

在cargo.toml中添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13

[dependencies]
chrono = "0.4"
sha2 = "0.9.8"
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
libp2p = { version = "0.39", features = ["tcp-tokio", "mdns"] }
tokio = { version = "1.0", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread", "sync", "time"] }
hex = "0.4"
once_cell = "1.5"
log = "0.4"
pretty_env_logger = "0.4"

  • libp2p负责p2p,是libp2p(基于go)的rust版,后面会有文章详细讨论这个库。
  • tokio 异步运行时
  • serde json序列化
  • chrono 时间戳
  • sha2 sha计算
  • onecell 静态初始化
  • log 日志门面 pretty_env_logger 日志实现

Block

定义区块

基本结构如此:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Block {
pub id: u64,
pub hash: String,
pub previous_hash: String,
pub timestamp: i64,
pub data: String,
pub nonce: u64,
}

pub struct Chain {
pub blocks: Vec<Block>,
}

Block中除data外都属于区块头,data为区块体。对于实现加密货币的场景,data中应包含交易信息及merkle树信息。但是本文着重于区块链本身,而不是局限于加密货币,此处的data只是一个简单的String。

Chain中定义了一个区块链,一个简单的Vec

计算区块链的哈希

使用sha2这个包中的Sha256计算哈希

1
2
3
4
5
6
7
8
9
10
11
12
use sha2::{Digest, Sha256};
impl Block {
fn calculate_hash(&self) -> Vec<u8> {
let mut s = Sha256::new();
s.update(self.id.to_le_bytes());
s.update(self.timestamp.to_le_bytes());
s.update(self.previous_hash.as_bytes());
s.update(self.data.as_bytes());
s.update(self.nonce.to_le_bytes());
s.finalize().as_slice().to_owned()
}
}

创世区块

区块链必须有一个创世区块,任何区块都可以向前回溯到创世区块,是整个链的根,创世区块通常会被硬编码到软件中。

1
2
3
4
5
6
7
8
9
10
11
12
impl Block {
fn genesis() -> Self {
Block {
id: 0,
timestamp: Utc::now().timestamp(),
previous_hash: String::from("创世区块"),
data: String::from("创世区块"),
nonce: 123,
hash: String::default(),
}
}
}

挖矿逻辑

挖矿本身就是根据前一个块构建新块:修改nonce后这里判断前缀是否满足难度,如果不满足,则nonce+1重新计算,直到得到满足难度的hash,此时挖矿完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const DIFFICULTY_PREFIX: &str = "00";
impl Block {
/**
* 当前块是最新块,在此块的基础上mine
*/
fn mine_block(&self, data: String) -> Self {
info!("开始挖矿");
let mut newb = Block {
id: self.id + 1,
hash: String::default(),
previous_hash: self.hash.clone(),
timestamp: Utc::now().timestamp(),
data,
nonce: 0,
};
loop {
let hash = newb.calculate_hash();
if hash.starts_with(DIFFICULTY_PREFIX.as_bytes()) {
info!("挖矿成功! hash: {}", hex::encode(&hash),);
newb.hash = hex::encode(hash);
break;
}
newb.nonce += 1;
}
newb
}
}

测试

构建创世块,然后挖出一个块。

1
2
3
4
5
6
7
8
9
10
11
12
13
#[test]
fn mine() {
let mut b = Block::genesis();
b.hash = hex::encode(b.calculate_hash());
println!("{:?}", serde_json::json!(b));

let s = b.mine_block(String::from("second block"));
println!("{:?}", serde_json::json!(s));
}

--------------
Object {"data": String("创世区块"), "hash": String("83d3d52afa3dfb1e0bd82580bd14600ed6564925091ccbb48a33cb60058f641f"), "id": Number(0), "nonce": Number(123), "previous_hash": String("创世区块"), "timestamp": Number(1674658280)}
Object {"data": String("second block"), "hash": String("3030d9d5a2c85a4db0a01bdc25529518aa0859f9000b3653eefb46acae75f6cc"), "id": Number(1), "nonce": Number(58249), "previous_hash": String("83d3d52afa3dfb1e0bd82580bd14600ed6564925091ccbb48a33cb60058f641f"), "timestamp": Number(1674658280)}

1
2
3
4
5
6
7
8
9
10
11
12
13
impl Chain {
// 初始化
fn new() -> Self {
Self { blocks: vec![] }
}

// 添加创世区块
fn genesis(&mut self) {
let mut g = Block::genesis();
g.hash = hex::encode(g.calculate_hash());
self.blocks.push(g);
}
}

验证链

等于逐个验证块is_block_valid 。验证点包括:

  1. 本块记录的前块hash是否和前块的hash匹配
  2. 本块hash的难度是否匹配
  3. 本块的id是否和前块id匹配
  4. 前块计算hash是否等于本块的hash
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    impl Chain {
    pub fn is_block_valid(&self, block: &Block, previous_block: &Block) -> bool {
    if block.previous_hash != previous_block.hash {
    warn!("块 id: {} 前一个块hash不同", block.id);
    return false;
    } else if !hex::decode(&block.hash)
    .expect("can decode from hex")
    .starts_with(DIFFICULTY_PREFIX.as_bytes())
    {
    warn!("块id: {} 难度不合要求: {}", block.id, DIFFICULTY_PREFIX);
    return false;
    } else if block.id != previous_block.id + 1 {
    warn!("块id: {} 块id不匹配: {}", block.id, previous_block.id);
    return false;
    } else if hex::encode(previous_block.calculate_hash()) != block.previous_hash {
    warn!(
    "块id: {} 和前块id:{} hash不匹配 ",
    block.id, previous_block.id
    );
    return false;
    }
    true
    }

    fn is_chain_valid(&self, chain: &[Block]) -> bool {
    for i in 0..chain.len() {
    if i == 0 {
    continue;
    }
    let first = chain.get(i - 1).expect("has to exist");
    let second = chain.get(i).expect("has to exist");
    if !self.is_block_valid(second, first) {
    return false;
    }
    }
    true
    }
    }

测试

1
2
3
4
5
6
7
8
9
#[test]
fn is_chain_valid_test() {
let mut c = Chain::new();
c.genesis();
let b = c.latest().mine_block(String::from("value"));
c.try_add_block(b);
assert_eq!(c.blocks.len(), 2);
assert_eq!(c.is_chain_valid(c.blocks.as_slice()), true);
}

选择最长链

先判断合法性,如果都合法选择最长的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
impl Chain {
pub fn choose_chain(&mut self, local: Vec<Block>, remote: Vec<Block>) -> Vec<Block> {
let is_local_valid = self.is_chain_valid(&local);
let is_remote_valid = self.is_chain_valid(&remote);

if is_local_valid && is_remote_valid {
if local.len() >= remote.len() {
local
} else {
remote
}
} else if is_remote_valid && !is_local_valid {
remote
} else if !is_remote_valid && is_local_valid {
local
} else {
panic!("local and remote chains are both invalid");
}
}
}

至此区块链本地模型基本完成,目前简单版的区块链不支持API,只支持stdin作为命令输入。下面做Event部分,这部分能令区块链支持p2p,命令输入。

Event

目前为了简单libp2p只用了floodsub的广播方式,将消息发送给每个节点。

定义消息体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#[derive(Debug, Serialize, Deserialize)]
pub struct ChainResponse {
pub blocks: Vec<Block>,
pub receiver: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ChainRequest {
pub from_peer_id: String,
}

pub enum Event {
ChainResponse(ChainResponse),
Cmd(String),
Init,
}

ChainResponse 用于查询目标节点所有区块
ChainRequest 用于询问目标节点区块
Event tokio处理的事件类型

此结构体用于注册libp2p回调。因此,所有业务状态要么注册channel丢进来,要么这里直接将Chain放进来。

1
2
3
4
5
6
7
8
9
10
11
12
#[derive(NetworkBehaviour)]
pub struct App {
pub floodsub: Floodsub,
pub mdns: Mdns,
#[behaviour(ignore)]
pub chain_sender: UnboundedSender<ChainResponse>,
#[behaviour(ignore)]
pub init_sender: UnboundedSender<bool>,
#[behaviour(ignore)]
pub chain: Chain,
}

用App注册回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

// 处理FloodSub事件
impl NetworkBehaviourEventProcess<FloodsubEvent> for App {
fn inject_event(&mut self, event: FloodsubEvent) {
if let FloodsubEvent::Message(msg) = event {
// 如果这里是别的节点返回的链,我们需对比下,选择最长的链
if let Ok(resp) = serde_json::from_slice::<ChainResponse>(&msg.data) {
// clone 区块返回
if resp.receiver == PEER_ID.to_string() {
info!("遍历区块:");
resp.blocks.iter().for_each(|r| info!("{:?}", r));

// 校验并选择最长链
self.chain.blocks = self.chain.choose_chain(self.chain.blocks.clone(), resp.blocks);
}
// 如果这是别的节点查询我们的链,我们需要clone链返回
} else if let Ok(resp) = serde_json::from_slice::<ChainRequest>(&msg.data) {
let peer_id = resp.from_peer_id;
// 询问本节点的区块
if PEER_ID.to_string() == peer_id {
// clone区块发送给查询者,这里没有同步发送,而是交给channel异步处理
if let Err(e) = self.chain_sender.send(ChainResponse {
blocks: self.chain.blocks.clone(),
receiver: msg.source.to_string(),
}) {
error!("error sending response via channel, {}", e);
}
}
// 如果是别的节点发来的区块,我们尝试将其加入到区块链中
} else if let Ok(block) = serde_json::from_slice::<Block>(&msg.data) {
info!("接受到新块 {}", msg.source.to_string());
self.chain.try_add_block(block);
}
}
}
}

impl NetworkBehaviourEventProcess<MdnsEvent> for App {
fn inject_event(&mut self, event: MdnsEvent) {
match event {
// 发现新节点
MdnsEvent::Discovered(discovered_list) => {
for (peer, _addr) in discovered_list {
self.floodsub.add_node_to_partial_view(peer);
}
}
// 节点过期
MdnsEvent::Expired(expired_list) => {
for (peer, _addr) in expired_list {
if !self.mdns.has_node(&peer) {
self.floodsub.remove_node_from_partial_view(&peer);
}
}
}
}
}
}

后面会有文章具体讨论libp2p和其中的floodsub,mdns的具体细节,本文只需知道mdns是p2p发现,floodsub是协议就可以了。
需要注意如果是查询的请求,这里将其转给channel异步处理避免阻塞libp2p event处理。

cmd

命令处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
use std::collections::HashSet;

use libp2p::Swarm;
use log::info;

use crate::event::{App, BLOCK_TOPIC};

// 获取节点列表
pub fn get_list_peers(swarm: &Swarm<App>) -> Vec<String> {
let nodes = swarm.behaviour().mdns.discovered_nodes();
let mut unique_peers = HashSet::new();
for peer in nodes {
unique_peers.insert(peer);
}
unique_peers.iter().map(|p| p.to_string()).collect()
}

// 处理打印节点命令
pub fn handle_print_peers(swarm: &Swarm<App>) {
let peers = get_list_peers(swarm);
info!("当前节点:");
peers.iter().for_each(|p| info!("{}", p));
}

// 处理打印区块命令
pub fn handle_print_chain(swarm: &Swarm<App>) {
info!("本地块:");
let pretty_json =
serde_json::to_string_pretty(&swarm.behaviour().chain.blocks).expect("can jsonify blocks");
info!("{}", pretty_json);
}

// 处理创建区块命令
pub fn handle_create_block(cmd: &str, swarm: &mut Swarm<App>) {
if let Some(data) = cmd.strip_prefix("create b") {
let behaviour = swarm.behaviour_mut();
// 最后一块开始挖矿
let latest_block = behaviour
.chain
.blocks
.last()
.expect("there is at least one block");

// 挖矿
let block = latest_block.mine_block(data.to_owned());
let json = serde_json::to_string(&block).expect("can jsonify request");

// 添加到本地区块链
behaviour.chain.blocks.push(block);
info!("开始广播新块");
behaviour
.floodsub
.publish(BLOCK_TOPIC.clone(), json.as_bytes());
}
}

最后就是tokio的主流程 main.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use libp2p::{
core::upgrade,
futures::StreamExt,
mplex,
noise::{Keypair, NoiseConfig, X25519Spec},
swarm::{Swarm, SwarmBuilder},
tcp::TokioTcpConfig,
Transport,
};
use log::{error, info};

use std::time::Duration;
use tokio::{
io::{stdin, AsyncBufReadExt, BufReader},
select, spawn,
sync::mpsc,
time::sleep,
};

const DIFFICULTY_PREFIX: &str = "00";
mod block;
mod chain;
mod event;
mod cmd;
use block::Block;
use chain::Chain;

#[tokio::main]
async fn main() {
// 日志
pretty_env_logger::init();

info!("当前难度:{}", DIFFICULTY_PREFIX);
info!("Peer Id: {}", event::PEER_ID.clone());
let (chain_sender, mut chain_receiver) = mpsc::unbounded_channel();
let (init_sender, mut init_rcv) = mpsc::unbounded_channel();

// p2p密钥
let auth_keys = Keypair::<X25519Spec>::new()
.into_authentic(&event::KEYS)
.expect("can create auth keys");

// p2p配置
let transp = TokioTcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(NoiseConfig::xx(auth_keys).into_authenticated())
.multiplex(mplex::MplexConfig::new())
.boxed();

// 创建应用
let app = event::App::new(Chain::new(), chain_sender, init_sender.clone()).await;

// 创建节点
let mut swarm = SwarmBuilder::new(transp, app, *event::PEER_ID)
.executor(Box::new(|fut| {
spawn(fut);
}))
.build();

let mut stdin = BufReader::new(stdin()).lines();

// 节点开始监听
Swarm::listen_on(
&mut swarm,
"/ip4/0.0.0.0/tcp/0"
.parse()
.expect("can get a local socket"),
)
.expect("swarm can be started");

// 延迟1秒,等待节点启动
spawn(async move {
sleep(Duration::from_secs(1)).await;
init_sender.send(true).expect("can send init event");
});

// 主事件循环
loop {
let evt = {
select! {
// 从标准输入读取命令
line = stdin.next_line() => Some(event::Event::Cmd(
line.expect("can get line").expect("can read line from stdin")
)),

response = chain_receiver.recv() => {
Some(event::Event::ChainResponse(response.expect("response exists")))
},
// 初始化
_init = init_rcv.recv() => {
Some(event::Event::Init)
}
// 这里可以打印下swarm的消息
_event = swarm.select_next_some() => {
info!("swarm event: {:?}", _event);
None
},
}
};

if let Some(event) = evt {
match event {
event::Event::Init => {
// 创世区块
swarm.behaviour_mut().chain.genesis();

// 获取所有节点
let peers = cmd::get_list_peers(&swarm);

// 如果有节点,向最后一个节点请求区块链
if !peers.is_empty() {
let req = event::ChainRequest {
from_peer_id: peers
.iter()
.last()
.expect("at least one peer")
.to_string(),
};

let json = serde_json::to_string(&req).expect("can jsonify request");
// floodsub广播
swarm
.behaviour_mut()
.floodsub
.publish(event::CHAIN_TOPIC.clone(), json.as_bytes());
}
}
event::Event::ChainResponse(resp) => {
let json = serde_json::to_string(&resp).expect("can jsonify response");
// 发送给其他节点
swarm
.behaviour_mut()
.floodsub
.publish(event::CHAIN_TOPIC.clone(), json.as_bytes());
}
// 解析标准输入命令
event::Event::Cmd(line) => match line.as_str() {
"ls p" => cmd::handle_print_peers(&swarm),
cmd if cmd.starts_with("ls c") => cmd::handle_print_chain(&swarm),
cmd if cmd.starts_with("create b") => {
cmd::handle_create_block(cmd, &mut swarm)
}
_ => error!("unknown command"),
},
}
}
}
}

运行方式:

1
RUST_LOG=info cargo run

命令

  1. ls c 展示当前节点区块
  2. ls p 展示所有节点(除自身外)
  3. create b {data} 当前节点添加区块,data可以是字符串。

项目代码
https://github.com/fenixc9/rust-miniblockchain