巴比特论坛

发表于 2016-3-3 19:57:04 | 显示全部楼层
本帖最后由 imfly 于 2016-3-3 19:58 编辑

比特币可能失败,但技术不会。无论你风云如何变换,我依然坚持做自己。越往后,技术性越强,不感兴趣的,看看标题就足够了。本文已在技术社区同步发布。


前言

加密货币都是去中心化的应用,去中心化的基础就是P2P网络,其作用和地位不言而喻,无可替代。当然,对于一个不开源的所谓私链(私有区块链),是否必要,尚无定论。

事实上,P2P网络不是什么新技术。但是,使用Nodejs开发的P2P网络,确实值得围观。这一篇,我们就来看看Ebookcoin的点对点网络是如何实现的。

源码

主要源码地址:

peer.js:https://github.com/Ebookcoin/ebookcoin/blob/master/modules/peer.js

transport.js:https://github.com/Ebookcoin/ebookcoin/blob/master/modules/transport.js

router.js:https://github.com/Ebookcoin/ebookcoin/blob/master/helpers/router.js

类图

class.png

流程图

activity.png

解读

基于http的web应用,抓住路由的定义、设计与实现,是快速弄清业务逻辑的简单方法。目前,分析的是`modules`文件夹下的各个模块文件,这些模块基本都是独立的Express微应用,在开发和设计上相互独立,各不冲突,逻辑清晰,这为学习分析,提供了便利。

1.路由扩展

任何应用,只要提供Web访问能力或第三方访问的Api,都需要提供从地址到逻辑的请求分发功能,这就是路由。Ebookcoin是基于http协议的Express应用,Express底层基于Nodejs的connect模块,因此其路由设计简单而灵活。

前面,在入门部分,已经讲到对路由的分拆调用,这里是其简单实现。先看看`helper/router.js`吧。
  1. //27行
  2. varRouter=function(){
  3.         varrouter=require('express').Router();

  4.         router.use(function(req,res,next){
  5.                 res.header("Access-Control-Allow-Origin","*");
  6.                 res.header("Access-Control-Allow-Headers","Origin,X-Requested-With,Content-Type,Accept");
  7.                 next();
  8.         });

  9.         router.map=map;

  10.         returnrouter;
  11. }
  12. ...
复制代码
这段代码定义了一个Express路由器`Router`,并扩展了两个功能:

  • 允许任何客户端调用。其实,就是设置了跨域请求,选项`Access-Control-Allow-Origin`设置为`*`,自然任何IP和端口的节点都可以访问和被访问。
  • 添加了地址映射方法。该方法的主要内容如下:

  1. //3行
  2. functionmap(root,config){
  3.         varrouter=this;
  4.         Object.keys(config).forEach(function(params){
  5.                 varroute=params.split("");
  6.                 if(route.length!=2||["post","get","put"].indexOf(route[0])==-1){
  7.                         throwError("wrongmapconfig");
  8.                 }
  9.                 router[route[0]](route[1],function(req,res,next){
  10.                         root[config[params]]({"body":route[0]=="get"?req.query:req.body},function(err,response){
  11.                                 if(err){
  12.                                         res.json({"success":false,"error":err});
  13.                                 }else{
  14.                                         returnres.json(extend({},{"success":true},response));
  15.                                 }
  16.                         });
  17.                 });
  18.         });
  19. }
复制代码
该方法,接受两个对象作为参数:

  • root:定义了所要开放Api的逻辑函数;
  • config:定义了路由和root定义的函数的对应关系。


其运行的结果,就相当于:
  1. ```
  2. router.get('/peers',function(req,res,next){
  3.         root.getPeers(...);
  4. })
复制代码
```

这里关键的小技巧是,在js代码中,对象也是hash值,root.getPeers()与root['getPeers']()是一致的。不过后者可以用字符串变量代替,更加灵活,有点像ruby里的元编程。这是脚本语言的优势(简单的字符串拼接处理)。

扩展一下,在类似sails的框架(基于express)里,很多都是可以使用类似`config.json`的文件直接配置的,包括路由。参考这个函数,很容易理解和实现。

2.节点路由

很轻松就能在`peer.js`里找到上述map方法的使用:
  1. ```
  2. //3行
  3. Router=require('../helpers/router.js')

  4. //25
  5. private.attachApi=function(){
  6.         varrouter=newRouter();

  7.         router.use(function(req,res,next){
  8.                 if(modules)returnnext();
  9.                 res.status(500).send({success:false,error:"Blockchainisloading"});
  10.         });

  11.         //34行
  12.         router.map(shared,{
  13.                 "get/":"getPeers",
  14.                 "get/version":"version",
  15.                 "get/get":"getPeer"
  16.         });

  17.         router.use(function(req,res){
  18.                 res.status(500).send({success:false,error:"APIendpointnotfound"});
  19.         });

  20. //44行
  21.         library.network.app.use('/api/peers',router);
  22.         library.network.app.use(function(err,req,res,next){
  23.                 if(!err)returnnext();
  24.                 library.logger.error(req.url,err.toString());
  25.                 res.status(500).send({success:false,error:err.toString()});
  26.         });
  27. };
  28. ```
复制代码
上面代码的34行,可以直观想象到,会有类似`/version`的路由出现,44行是express应用,这里就是将定义好的路由放在`/api/peers`前缀之下,可以确信`peer.js`文件提供了下面3个公共Api地址:

http://ip:port/api/peers/

http://ip:port/api/peers/version

http://ip:port/api/peers/get

当然,是不是可以直接这么调用,要看具体对应的函数是否还有其他的参数要求,比如:`/api/peers/get`,按照restful的api设计原则,可以理解为是获得具体某个节点信息,那么总该给个`id`之类的限定条件吧。看源码:

  1. //455行
  2. library.scheme.validate(query,{
  3.                 type:"object",
  4.                 properties:{
  5.                         ip_str:{
  6.                                 type:"string",
  7.                                 minLength:1
  8.                         },
  9.                         port:{
  10.                                 type:"integer",
  11.                                 minimum:0,
  12.                                 maximum:65535
  13.                         }
  14.                 },
  15.                 required:['ip_str','port']
  16.         },function(err){
  17.                 ...
  18.                 //480行
  19.                 private.getByFilter({
  20.                         ...
  21.                 });
  22.         });
复制代码
这里,在具体运行过程中,library就是`app.js`里传过来的`scope`,该参数包含的scheme代表了一个`z_schema`实例。

`z_schema`是一个第三方组件,具体请看参考链接。该组件提供了json数据格式验证功能。上述代码的意思是:对请求参数`query`进行验证,验证规则是:object类型,属性`ip_str`要求长度不小于1的字符串,属性`port`要求0~65535之间的整数,并且都不能空(必需)。

这就说明,我们应该这样请求`http://ip:port/api/peers/get?ip_str=0.0.0.0&port=1234`,不然会返回错误信息。回头看看`getPeers`方法的实现,没有`required`字段,对应可以直接访问`http://ip:port/api/peers/`。

看480行,上面的地址,都会调用`private.getByFilter()`,并由它从sqlite数据库里查询数据表`peers`。这里涉及到[`dblite`第三方组件][](请看参考链接),对请求操作sqlite数据库进行了简单封装。

3.节点保存

大多数应用,读数据相对简单,难在写数据。上面的代码,都是`get`请求,可以查寻节点及其信息。我们自然会问,查询的信息从哪里来?初始的节点在哪里?节点变更了,怎么办?

(1)初始化节点

从现实角度考虑,在一个P2P网络中,一个孤立的节点,在没有其他任何节点信息的情况下,仅仅靠网络扫描去寻找其他节点,将是一件很难完成的事情,更别提高效和安全了。

因此,在运行软件之前,初始化一些节点供联网使用,是最简单直接的解决方案。这个在配置文件`config.json`里,有直接体现:
  1. ```
  2. //config.json15行
  3. "peers":{
  4.                 "list":[],
  5.                 "blackList":[],
  6.                 "options":{
  7.                                 "timeout":4000
  8.                 }
  9. },
  10. ...
  11. ```

  12. list的数据格式为:

  13. ```
  14. [
  15.         {
  16.                 ip:0.0.0.0,
  17.                 port:7000
  18.         },
  19.         ...
  20. ]
  21. ```
复制代码
当然,也可以在启动的时候,通过参数`--peers1.2.3.4:70001,2.1.2.3:7002`提供(代码见`app.js`47行)。

(2)写入节点

写入节点,就是持久化,或者保存到数据库,或者保存到某个文件。这里保存到sqlite3数据库里的`peers`表了,代码如下:
  1. ```
  2. //peer.js347行
  3. Peer.prototype.onBlockchainReady=function(){
  4.         async.eachSeries(library.config.peers.list,function(peer,cb){
  5.                 library.dbLite.query("INSERTORIGNOREINTOpeers(ip,port,state,sharePort)VALUES($ip,$port,$state,$sharePort)",{
  6.                         ip:ip.toLong(peer.ip),
  7.                         port:peer.port,
  8.                         state:2,//初始状态为2,都是健康的节点
  9.                         sharePort:Number(true)
  10.                 },cb);
  11.         },function(err){
  12.                 if(err){
  13.                         library.logger.error('onBlockchainReady',err);
  14.                 }

  15.                 private.count(function(err,count){
  16.                         if(count){
  17.                                 private.updatePeerList(function(err){
  18.                                         err&&library.logger.error('updatePeerList',err);
  19.                                         library.bus.message('peerReady');
  20.                                 })
  21.                                 library.logger.info('Peersready,stored'+count);
  22.                         }else{
  23.                                 library.logger.warn('Peerslistisempty');
  24.                         }
  25.                 });
  26.         });
  27. }
  28. ```
复制代码
这段代码的意思是,当区块链(后面篇章分析)加载完毕的时候(触发事件),依次将配置的节点写入数据库,如果数据库已经存在相同的记录就忽略,然后更新节点列表,触发节点加载完毕事件。

这里对数据库`Sqlite`的插入操作,插入语句是`library.dbLite.query("INSERTORIGNOREINTOpeers`,有意思的是`IGNORE`操作字符串,是sqlite3支持的(见参考),当数据库有相同记录的时候,该记录被忽略,继续往下执行。

执行成功,就会调用`library.bus.message('peerReady')`,进而触发`peerReady`事件。该事件的功能就是:

(3)更新节点

事件`onPeerReady`函数,如下:
  1. ```
  2. //peer.js374行
  3. Peer.prototype.onPeerReady=function(){
  4.         setImmediate(functionnextUpdatePeerList(){
  5.                 private.updatePeerList(function(err){
  6.                         err&&library.logger.error('updatePeerListtimer',err);
  7.                         setTimeout(nextUpdatePeerList,60*1000);
  8.                 })
  9.         });

  10.         setImmediate(functionnextBanManager(){
  11.                 private.banManager(function(err){
  12.                         err&&library.logger.error('banManagertimer',err);
  13.                         setTimeout(nextBanManager,65*1000)
  14.                 });
  15.         });
  16. }
  17. ```
复制代码
两个`setImmediate`函数的调用,一个循环更新节点列表,一个循环更新节点状态。

第一个循环调用

看看第一个循环调用的函数`updatePeerList`,
  1. ```
  2. private.updatePeerList=function(cb){
  3.         //53行
  4.         modules.transport.getFromRandomPeer({
  5.                 api:'/list',
  6.                 method:'GET'
  7.         },function(err,data){
  8.                 ...
  9.                 library.scheme.validate(data.body,{
  10.                                         ...
  11.                                         //124行
  12.                                         self.update(peer,cb);
  13.                                 });
  14.                         },cb);
  15.                 });
  16.         });
  17. };
  18. ```
复制代码
看53行,我们知道,程序通过`transport`模块的`.getFromRandomPeer`方法,逐个随机的验证节点信息,并将其做删除和更新处理。如此一来,各种调用关系更加清晰,看流程图更加直观。`.getFromRandomPeer`的代码:

```
//transport.js474行
Transport.prototype.getFromRandomPeer=function(config,options,cb){
        ...

        //481行
        async.retry(20,function(cb){
                modules.peer.list(config,function(err,peers){
                        if(!err&&peers.length){
                                varpeer=peers[0];

                                //485行
                                self.getFromPeer(peer,options,cb);
                        }else{
                                returncb(err||"Nopeersindb");
                        }
                });
...
};
```

代码很简单,重要的是理解`async.retry`的用法(下篇技术分享,详细学习),该方法就是要重复调用第一个task函数20次,有正确返回结果就传给回调函数。这里,只要查到一个节点,就会传给485行的`getFromPeer`函数,该函数是检验处理现存节点的核心函数,代码如下:

```
//transport.js500行
Transport.prototype.getFromPeer=function(peer,options,cb){
        ...
        varreq={
                //519行:获得节点地址
                url:'http://'+ip.fromLong(peer.ip)+':'+peer.port+url,
                ...
        };

        //532行:使用`request`组件发送请求
        returnrequest(req,function(err,response,body){
                if(err||response.statusCode!=200){
                        ...
                        if(peer){
                                if(err&&(err.code=="ETIMEDOUT"||err.code=="ESOCKETTIMEDOUT"||err.code=="ECONNREFUSED")){

                                        //542行:对于无法请求的,自然要删除
                                        modules.peer.remove(peer.ip,peer.port,function(err){
                                        ...
                                        });
                                }else{
                                        if(!options.not_ban){

                                                //549行:对于状态码不是200的,比如304等禁止状态,就要更改其状态
                                                modules.peer.state(peer.ip,peer.port,0,600,function(err){
                                                ...
                                                });
                                        }
                                }
                        }
                        cb&&cb(err||('requeststatuscode'+response.statusCode));
                        return;
                }

                ...
                if(port>0&&port<=65535&&response.headers['version']==library.config.version){
                        //595行:一切问题都不存在
                        modules.peer.update({
                                ip:peer.ip,
                                port:port,
                                state:2,//598行:看来健康的节点状态为2
                                ...
        });
}
```

这里最重要的是532行,`request`第三方组件的使用,请看参考链接。官方定义为简单的http客户端,功能足够强大,可以模拟浏览器访问信息,经常被用来做测试。

第二个循环调用

第二个循环调用的函数很简单,就是循环更改`state`和`clock`字段,主要是将禁止的状态`state=0`,修改为`1`,如下:

```
//142行
private.banManager=function(cb){
        library.dbLite.query("UPDATEpeersSETstate=1,clock=nullwhere(state=0andclock-$now<0)",{now:Date.now()},cb);
}
```

综上,整个P2P网络的读写和更新都已经清楚,回头再看活动图和类图,就更加明朗了。

最后,补充一下数据库里,节点表格`peers`的字段信息:id,ip,port,state,os,sharePort,version,clock

总结

本篇,重点阅读了`peer.js`文件,学习了一个使用Nodejs开发的P2P网络架构,其特点是:

  • 产品提供初始节点列表,保障了初始化节点快速完成,不至于成为孤立节点;
  • 节点具备跨域访问能力,任何节点之间都可以自由访问;
  • 节点具备自我更新能力,定期查询和更新死掉的节点,保障网络始终畅通;


一旦达到一定的节点数量,就会形成一个互联互通的`不死网络`。搭建在这种网络上的服务,会充满怎样的诱惑?加密货币为什么会被认为是下一代互联网?这加起来不足千行的代码,可以给我们足够多的遐想空间。

这部分代码,涉及到`dblite,request,z_schema`等第三方组件,以及Ebookcoin自行实现的事件处理方法`library.bus`(在`app.js`文件的行),都很简单,不再分享或赘述,请自行查阅。本篇涉及的代码中,关于回调的设计很多,值得总结和研究。`async`组件,被反复使用,有必须汇总一下,请关注后续的技术分享。

链接

本系列文章即时更新,若要掌握最新内容,请关注下面的链接

本源文地址:https://github.com/imfly/bitcoin-on-nodejs

电子书阅读:http://book.btcnodejs.com

电子书下载:[下载页面][][PDF文件][][ePub文件][][Mobi文件][]

Ebookcoin官方开发交流QQ群:185046161

参考

z_schema组件:https://github.com/Ebookcoin/z_schema

dblite组件:https://github.com/Ebookcoin/dblite

request组件:http://github.com/request/request

SQL As Understood By SQLite:https://www.sqlite.org/lang_conflict.html




补充内容 (2016-3-3 20:02):
【nodejs开发加密货币】开栏的话
http://8btc.com/thread-27448-1-1.html

打赏

参与人数 1金币 +3250 收起 理由
miner + 50 很给力!

查看全部打赏

成败都是积累,攻受都是成长!《Nodejs开发加密货币》

4条回复 跳转到指定楼层

bitbybit | 副船长 | 发表于 2016-3-3 22:28:00 来自手机版 | 显示全部楼层
我好像看到过一个币,叫node币的
imfly | 版主 | 发表于 2016-3-4 08:17:04 | 显示全部楼层
bitbybit 发表于 2016-3-3 22:28
我好像看到过一个币,叫node币的

是的,各种币实在很多。我个人觉得,加密货币的价值不在币,而在应用。所以,我也不太关心什么币,目标在什么样的应用能够落地,让普通人也能用上。
成败都是积累,攻受都是成长!《Nodejs开发加密货币》
miner | 管理员 | 发表于 2016-3-4 10:10:17 | 显示全部楼层
精品原创! 干货需要慢慢消化。
vincentwang0736 | 水手 | 发表于 2017-8-27 16:51:23 | 显示全部楼层
受益匪浅,最近也在开始学习区块链,请问楼主是怎么入门的啊
高级模式
您需要登录后才可以发帖 登录 | 立即注册 | 用新浪微博登录

本版积分规则

搜索

0关注 1粉丝 41主题

作者的其他主题

返回顶部 返回列表

登录

分享 发帖