From df6e2055a3a4eafa18220e78f41e4d3b3e27694c Mon Sep 17 00:00:00 2001 From: Kelvin Lu Date: Fri, 11 May 2018 20:24:08 -0400 Subject: [PATCH] PE/MSE Implementation for WebTorrent --- index.js | 16 +++++++++++ lib/peer.js | 71 ++++++++++++++++++++++++++++++++++++++++++++++--- lib/tcp-pool.js | 22 +++++++++++++-- 3 files changed, 104 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index 65dd70c4..07d3c675 100644 --- a/index.js +++ b/index.js @@ -16,6 +16,7 @@ var parseTorrent = require('parse-torrent') var path = require('path') var Peer = require('simple-peer') var randombytes = require('randombytes') +var sha1 = require('simple-sha1') var speedometer = require('speedometer') var zeroFill = require('zero-fill') @@ -459,6 +460,21 @@ WebTorrent.prototype._debug = function () { debug.apply(null, args) } +WebTorrent.prototype._getByHash = function (infoHashHash) { + var i, torrent + var len = this.torrents.length + var hReq2 = Buffer.from('req2', 'utf8').toString('hex') + + for (i = 0; i < len; i++) { + torrent = this.torrents[i] + if (infoHashHash === sha1.sync(Buffer.from(hReq2 + torrent.infoHash, 'hex'))) { + return torrent + } + } + + return null +} + /** * Check if `obj` is a node Readable stream * @param {*} obj diff --git a/lib/peer.js b/lib/peer.js index 01a3935f..2374e882 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -1,5 +1,6 @@ var arrayRemove = require('unordered-array-remove') var debug = require('debug')('webtorrent:peer') +var sha1 = require('simple-sha1') var Wire = require('bittorrent-protocol') var WebConn = require('./webconn') @@ -82,6 +83,7 @@ function Peer (id, type) { self.type = type debug('new Peer %s', id) + debug('type %s', type) self.addr = null self.conn = null @@ -93,6 +95,10 @@ function Peer (id, type) { self.timeout = null // handshake timeout self.retries = 0 // outgoing TCP connection retry count + self.sentPe1 = false + self.sentPe2 = false + self.sentPe3 = false + self.sentPe4 = false self.sentHandshake = false } @@ -123,8 +129,7 @@ Peer.prototype.onConnect = function () { self.destroy(err) }) - var wire = self.wire = new Wire() - wire.type = self.type + var wire = self.wire = new Wire(self.type, self.retries, false) wire.once('end', function () { self.destroy() }) @@ -138,13 +143,73 @@ Peer.prototype.onConnect = function () { self.destroy(err) }) + wire.once('pe1', function () { + self.onPe1() + }) + wire.once('pe2', function () { + self.onPe2() + }) + wire.once('pe3', function () { + self.onPe3() + }) + wire.once('pe4', function () { + self.onPe4() + }) wire.once('handshake', function (infoHash, peerId) { self.onHandshake(infoHash, peerId) }) self.startHandshakeTimeout() conn.pipe(wire).pipe(conn) - if (self.swarm && !self.sentHandshake) self.handshake() + if (self.swarm) { + if (self.type === 'tcpOutgoing') { + if (self.retries === 0 && !self.sentPe1) self.sendPe1() + else if (!self.sentHandshake) self.handshake() + } + } else if (self.type !== 'tcpIncoming' && !self.sentHandshake) self.handshake() +} + +Peer.prototype.sendPe1 = function () { + this.wire.sendPe1() + this.sentPe1 = true +} + +Peer.prototype.onPe1 = function () { + this.sendPe2() +} + +Peer.prototype.sendPe2 = function () { + this.wire.sendPe2() + this.sentPe2 = true +} + +Peer.prototype.onPe2 = function () { + this.sendPe3() +} + +Peer.prototype.sendPe3 = function () { + this.wire.sendPe3(this.swarm.infoHash) + this.sentPe3 = true +} + +Peer.prototype.onPe3 = function (infoHashHash) { + var self = this + if (self.swarm) { + if (sha1.sync(Buffer.from(Buffer.from('req2', 'utf8').toString('hex') + self.swarm.infoHash, 'hex')) !== infoHashHash) { + self.destroy(new Error('unexpected crypto handshake info hash for this swarm')) + } + self.sendPe4() + } +} + +Peer.prototype.sendPe4 = function () { + this.wire.sendPe4(this.swarm.infoHash) + this.sentPe4 = true +} + +Peer.prototype.onPe4 = function () { + var self = this + if (!self.sentHandshake) self.handshake() } /** diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js index 7625d8d5..ef260a5e 100644 --- a/lib/tcp-pool.js +++ b/lib/tcp-pool.js @@ -98,15 +98,33 @@ TCPPool.prototype._onConnection = function (conn) { var peer = Peer.createTCPIncomingPeer(conn) var wire = peer.wire + wire.once('pe3', onPe3) wire.once('handshake', onHandshake) + function onPe3 (infoHashHash) { + var torrent = self._client._getByHash(infoHashHash) + if (torrent) { + peer.swarm = torrent + torrent._addIncomingPeer(peer) + peer.onPe3(infoHashHash) + } else { + var err = new Error( + 'Unexpected info hash hash ' + infoHashHash + ' from incoming peer ' + peer.id + ) + peer.destroy(err) + } + } + function onHandshake (infoHash, peerId) { cleanupPending() var torrent = self._client.get(infoHash) + // only add incoming peer if didn't already do so in protocol encryption handshake if (torrent) { - peer.swarm = torrent - torrent._addIncomingPeer(peer) + if (!peer.swarm) { + peer.swarm = torrent + torrent._addIncomingPeer(peer) + } peer.onHandshake(infoHash, peerId) } else { var err = new Error(