From 29b6216fd024b00ea638e99f8a1c57a1c3abe4d2 Mon Sep 17 00:00:00 2001 From: Pierre Dubouilh Date: Sun, 26 Aug 2018 17:48:02 +0200 Subject: [PATCH 1/2] utp: add utp support --- index.js | 21 +++++---- lib/{tcp-pool.js => conn-pool.js} | 77 +++++++++++++++++++++---------- lib/peer.js | 67 +++++++++++++++++++-------- lib/torrent.js | 66 +++++++++++++------------- package.json | 3 +- 5 files changed, 148 insertions(+), 86 deletions(-) rename lib/{tcp-pool.js => conn-pool.js} (58%) diff --git a/index.js b/index.js index b90fc87d..a32ccc81 100644 --- a/index.js +++ b/index.js @@ -19,7 +19,7 @@ var randombytes = require('randombytes') var speedometer = require('speedometer') var zeroFill = require('zero-fill') -var TCPPool = require('./lib/tcp-pool') // browser exclude +var ConnPool = require('./lib/conn-pool') // browser exclude var Torrent = require('./lib/torrent') /** @@ -87,6 +87,7 @@ function WebTorrent (opts) { self.tracker = opts.tracker !== undefined ? opts.tracker : {} self.torrents = [] self.maxConns = Number(opts.maxConns) || 55 + self.utp = opts.utp !== false self._debug( 'new webtorrent (peerId %s, nodeId %s, port %s)', @@ -110,8 +111,8 @@ function WebTorrent (opts) { } } - if (typeof TCPPool === 'function') { - self._tcpPool = new TCPPool(self) + if (typeof ConnPool === 'function') { + self._connPool = new ConnPool(self) } else { process.nextTick(function () { self._onListening() @@ -395,8 +396,8 @@ WebTorrent.prototype._remove = function (torrentId, cb) { WebTorrent.prototype.address = function () { if (!this.listening) return null - return this._tcpPool - ? this._tcpPool.server.address() + return this._connPool + ? this._connPool.tcpServer.address() : { address: '0.0.0.0', family: 'IPv4', port: 0 } } @@ -420,9 +421,9 @@ WebTorrent.prototype._destroy = function (err, cb) { } }) - if (self._tcpPool) { + if (self._connPool) { tasks.push(function (cb) { - self._tcpPool.destroy(cb) + self._connPool.destroy(cb) }) } @@ -437,7 +438,7 @@ WebTorrent.prototype._destroy = function (err, cb) { if (err) self.emit('error', err) self.torrents = [] - self._tcpPool = null + self._connPool = null self.dht = null } @@ -445,9 +446,9 @@ WebTorrent.prototype._onListening = function () { this._debug('listening') this.listening = true - if (this._tcpPool) { + if (this._connPool) { // Sometimes server.address() returns `null` in Docker. - var address = this._tcpPool.server.address() + var address = this._connPool.tcpServer.address() if (address) this.torrentPort = address.port } diff --git a/lib/tcp-pool.js b/lib/conn-pool.js similarity index 58% rename from lib/tcp-pool.js rename to lib/conn-pool.js index 1094c4fd..22e3b448 100644 --- a/lib/tcp-pool.js +++ b/lib/conn-pool.js @@ -1,24 +1,26 @@ const arrayRemove = require('unordered-array-remove') -const debug = require('debug')('webtorrent:tcp-pool') +const debug = require('debug')('webtorrent:conn-pool') const net = require('net') // browser exclude +const utp = require('utp-native') // browser exclude const Peer = require('./peer') /** - * TCPPool + * Connection Pool * - * A "TCP pool" allows multiple swarms to listen on the same TCP port and determines + * A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines * which swarm incoming connections are intended for by inspecting the bittorrent * handshake that the remote peer sends. * * @param {number} port */ -class TCPPool { +class ConnPool { constructor (client) { - debug('create tcp pool (port %s)', client.torrentPort) - - this.server = net.createServer() + debug('create pool (port %s)', client.torrentPort) + const self = this + let i = 0 this._client = client + this.utp = client.utp // Temporarily store incoming connections so they can be destroyed if the server is // closed before the connection is passed off to a Torrent. @@ -29,30 +31,52 @@ class TCPPool { } this._onListening = () => { - this._client._onListening() + // Kickoff client onListening when everything's setup + if (!self._client.utp || ++i === 2) { + self._client._onListening() + // Start UTP if needed + } else if (self._client.utp) { + self.utpServer.listen(self.tcpServer.address().port) + } } - this._onError = err => { - this._client._destroy(err) + this._onError = e => { + this._client._destroy(e) } - this.server.on('connection', this._onConnectionBound) - this.server.on('listening', this._onListening) - this.server.on('error', this._onError) + // Setup TCP + this.tcpServer = net.createServer() + this.tcpServer.on('connection', this._onConnectionBound) + this.tcpServer.on('error', this._onError) + + // Setup uTP + if (this.utp) { + this.utpServer = utp.createServer() + this.utpServer.on('connection', this._onConnectionBound) + this.utpServer.on('error', this._onError) + this.utpServer.on('listening', this._onListening) + } - this.server.listen(client.torrentPort) + // Start TCP + this.tcpServer.listen(client.torrentPort, this._onListening) } - /** + /** * Destroy this TCP pool. * @param {function} cb */ destroy (cb) { - debug('destroy tcp pool') + debug('destroy conn pool') - this.server.removeListener('connection', this._onConnectionBound) - this.server.removeListener('listening', this._onListening) - this.server.removeListener('error', this._onError) + if (this._client.utp) { + this.utpServer.removeListener('connection', this._onConnectionBound) + this.utpServer.removeListener('listening', this._onListening) + this.utpServer.removeListener('error', this._onError) + } + + this.tcpServer.removeListener('connection', this._onConnectionBound) + this.tcpServer.removeListener('listening', this._onListening) + this.tcpServer.removeListener('error', this._onError) // Destroy all open connection objects so server can close gracefully without waiting // for connection timeout or remote peer to disconnect. @@ -61,13 +85,18 @@ class TCPPool { conn.destroy() }) + try { + this.utpServer.close() + } catch (e) { } + try { this.server.close(cb) } catch (err) { if (cb) process.nextTick(cb) } - this.server = null + this.tcpServer = null + this.utpServer = null this._client = null this._pendingConns = null } @@ -83,8 +112,7 @@ class TCPPool { // then `remoteAddress` will not be available, and we can't use this connection. // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566 // - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398 - if (!conn.remoteAddress) { - conn.on('error', noop) + if (!conn.address().address) { conn.destroy() return } @@ -92,8 +120,7 @@ class TCPPool { self._pendingConns.push(conn) conn.once('close', cleanupPending) - const peer = Peer.createTCPIncomingPeer(conn) - + const peer = Peer.createIncomingPeer(conn) const wire = peer.wire wire.once('handshake', onHandshake) @@ -125,4 +152,4 @@ class TCPPool { function noop () {} -module.exports = TCPPool +module.exports = ConnPool diff --git a/lib/peer.js b/lib/peer.js index cd54403d..c81d8c6d 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -4,10 +4,16 @@ var Wire = require('bittorrent-protocol') var WebConn = require('./webconn') -var CONNECT_TIMEOUT_TCP = 5000 -var CONNECT_TIMEOUT_WEBRTC = 25000 +var CONNECT_TIMEOUT = { + utpOutgoing: 5000, + tcpOutgoing: 5000, + webrtc: 2500 +} + var HANDSHAKE_TIMEOUT = 25000 +var RECONNECT_WAIT = [1000, 5000, 15000] + /** * WebRTC peer connections start out connected, because WebRTC peers require an * "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address @@ -34,23 +40,23 @@ exports.createWebRTCPeer = function (conn, swarm) { * listening port of the TCP server. Until the remote peer sends a handshake, we don't * know what swarm the connection is intended for. */ -exports.createTCPIncomingPeer = function (conn) { - var addr = conn.remoteAddress + ':' + conn.remotePort - var peer = new Peer(addr, 'tcpIncoming') +exports.createIncomingPeer = function (conn, type) { + var addr = conn.address() + var addrStr = addr.address + ':' + addr.port + var peer = new Peer(addrStr, conn._utp ? 'utpIncoming' : 'tcpIncoming') peer.conn = conn - peer.addr = addr + peer.addr = addrStr peer.onConnect() - return peer } /** - * Outgoing TCP peers start out with just an IP address. At some point (when there is an + * Outgoing peers start out with just an IP address. At some point (when there is an * available connection), the client can attempt to connect to the address. */ -exports.createTCPOutgoingPeer = function (addr, swarm) { - var peer = new Peer(addr, 'tcpOutgoing') +exports.createOutgoingPeer = function (addr, swarm) { + var peer = new Peer(addr, 'outgoing') peer.addr = addr peer.swarm = swarm @@ -78,11 +84,10 @@ exports.createWebSeedPeer = function (url, swarm) { */ function Peer (id, type) { var self = this - self.id = id - self.type = type - debug('new %s Peer %s', type, id) + self.id = id + self.type = type self.addr = null self.conn = null self.swarm = null @@ -105,7 +110,7 @@ Peer.prototype.onConnect = function () { if (self.destroyed) return self.connected = true - debug('Peer %s connected', self.id) + debug('Peer %s connected, type %s', self.id, self.type) clearTimeout(self.connectTimeout) @@ -185,6 +190,32 @@ Peer.prototype.onHandshake = function (infoHash, peerId) { if (!self.sentHandshake) self.handshake() } +// When connection closes, attempt reconnect after timeout (with exponential backoff) +Peer.prototype.onClose = function (torrent) { + var self = this + if (torrent.destroyed || torrent.done) return + + if (self.retries >= RECONNECT_WAIT.length) { + debug( + 'conn %s closed: will not re-add (max %s attempts)', + self.addr, RECONNECT_WAIT.length + ) + return + } + + var ms = RECONNECT_WAIT[self.retries] + debug( + 'conn %s closed: will re-add to queue in %sms (attempt %s)', + self.addr, ms, self.retries + 1 + ) + + var reconnectTimeout = setTimeout(function reconnectTimeout () { + var newPeer = torrent._addPeer(self.addr) + if (newPeer) newPeer.retries = self.retries + 1 + }, ms) + if (reconnectTimeout.unref) reconnectTimeout.unref() +} + Peer.prototype.handshake = function () { var self = this var opts = { @@ -194,12 +225,12 @@ Peer.prototype.handshake = function () { self.sentHandshake = true } -Peer.prototype.startConnectTimeout = function () { +Peer.prototype.startConnectTimeout = function (cb) { var self = this clearTimeout(self.connectTimeout) - self.connectTimeout = setTimeout(function () { - self.destroy(new Error('connect timeout')) - }, self.type === 'webrtc' ? CONNECT_TIMEOUT_WEBRTC : CONNECT_TIMEOUT_TCP) + function defaultCB () { self.destroy(new Error('connect timeout')) } + + self.connectTimeout = setTimeout(cb || defaultCB, CONNECT_TIMEOUT[self.type]) if (self.connectTimeout.unref) self.connectTimeout.unref() } diff --git a/lib/torrent.js b/lib/torrent.js index a32326ae..db0343ba 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -27,6 +27,7 @@ const speedometer = require('speedometer') const uniq = require('uniq') const utMetadata = require('ut_metadata') const utPex = require('ut_pex') // browser exclude +const utp = require('utp-native') // browser exclude const parseRange = require('parse-numeric-range') const File = require('./file') @@ -47,8 +48,6 @@ const RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds const FILESYSTEM_CONCURRENCY = 2 -const RECONNECT_WAIT = [ 1000, 5000, 15000 ] - const VERSION = require('../package.json').version const USER_AGENT = `WebTorrent/${VERSION} (https://webtorrent.io)` @@ -759,7 +758,7 @@ class Torrent extends EventEmitter { let newPeer if (typeof peer === 'string') { // `peer` is an addr ("ip:port" string) - newPeer = Peer.createTCPOutgoingPeer(peer, self) + newPeer = Peer.createOutgoingPeer(peer, self) } else { // `peer` is a WebRTC connection (simple-peer) newPeer = Peer.createWebRTCPeer(peer, self) @@ -1646,46 +1645,49 @@ class Torrent extends EventEmitter { const peer = self._queue.shift() if (!peer) return // queue could be empty - this._debug('tcp connect attempt to %s', peer.addr) - const parts = addrToIPPort(peer.addr) - const opts = { + const connOpts = { host: parts[0], port: parts[1] } - const conn = peer.conn = net.connect(opts) + // Start trying to connect with uTP + this._utpConnect(peer, connOpts) + } - conn.once('connect', () => { peer.onConnect() }) - conn.once('error', err => { peer.destroy(err) }) - peer.startConnectTimeout() + _utpConnect (peer, connOpts) { + if (!this.client.utp) return this._tcpConnect(peer, connOpts) + const self = this - // When connection closes, attempt reconnect after timeout (with exponential backoff) - conn.on('close', () => { - if (self.destroyed) return + this._debug('utp connect attempt to %s', peer.addr) + var conn = peer.conn = utp.connect(connOpts.port, connOpts.host) + peer.type = 'utpOutgoing' - // TODO: If torrent is done, do not try to reconnect after a timeout + function fallbackToTCP (err) { + if (!err && (peer.destroyed || peer.connected)) return + self._debug('utp timeout/error %s, (%s)', peer.addr, err) + conn.once('connect', noop) + conn.once('error', noop) + peer.conn.destroy() + self._tcpConnect(peer, connOpts) + } - if (peer.retries >= RECONNECT_WAIT.length) { - self._debug( - 'conn %s closed: will not re-add (max %s attempts)', - peer.addr, RECONNECT_WAIT.length - ) - return - } + peer.startConnectTimeout(fallbackToTCP) + conn.once('connect', () => { peer.onConnect() }) + conn.once('error', e => { fallbackToTCP(e) }) + conn.once('close', () => { peer.onClose(self) }) + } - const ms = RECONNECT_WAIT[peer.retries] - self._debug( - 'conn %s closed: will re-add to queue in %sms (attempt %s)', - peer.addr, ms, peer.retries + 1 - ) + _tcpConnect (peer, connOpts) { + this._debug('tcp connect attempt to %s', peer.addr) + const self = this + var conn = peer.conn = net.connect(connOpts) + peer.type = 'tcpOutgoing' - const reconnectTimeout = setTimeout(function reconnectTimeout () { - const newPeer = self._addPeer(peer.addr) - if (newPeer) newPeer.retries = peer.retries + 1 - }, ms) - if (reconnectTimeout.unref) reconnectTimeout.unref() - }) + conn.once('connect', () => { peer.onConnect() }) + conn.once('error', e => { peer.destroy(e) }) + conn.on('close', () => { peer.onClose(self) }) + peer.startConnectTimeout() } /** diff --git a/package.json b/package.json index 849f5563..c0d78d95 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ }, "browser": { "./lib/server.js": false, - "./lib/tcp-pool.js": false, + "./lib/conn-pool.js": false, "bittorrent-dht/client": false, "fs-chunk-store": "memory-chunk-store", "load-ip-set": false, @@ -67,6 +67,7 @@ "unordered-array-remove": "^1.0.2", "ut_metadata": "^3.0.8", "ut_pex": "^1.1.1", + "utp-native": "^1.6.2", "xtend": "^4.0.1", "zero-fill": "^2.2.3" }, From c75c19b31883dd668cadaba4747dd7c2eca37dc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Rodr=C3=ADguez=20Baquero?= Date: Sun, 26 Aug 2018 12:21:09 -0500 Subject: [PATCH 2/2] Standard fix --- lib/conn-pool.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/conn-pool.js b/lib/conn-pool.js index 22e3b448..6e5142c8 100644 --- a/lib/conn-pool.js +++ b/lib/conn-pool.js @@ -61,7 +61,7 @@ class ConnPool { this.tcpServer.listen(client.torrentPort, this._onListening) } - /** + /** * Destroy this TCP pool. * @param {function} cb */