diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..a0d574d0 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,63 @@ +# WebTorrent Version History + +## UNRELEASED + +### Added + +- `client.listening` property to signal whether TCP server is listening for incoming + connections. + +- `client.dhtPort` property reflects the actual DHT port when user doesn't specify one + (this is parallel to `client.torrentPort` for the TCP torrent listening server) + +### Changed + +- Merged `Swarm` class into `Torrent` object. Properties on `torrent.swarm` (like + `torrent.swarm.wires`) now exist on `torrent` (e.g. `torrent.wires`). + +- `torrent.addPeer` can no longer be called before the `infoHash` event has been + emitted. + +- Remove `torrent.on('listening')` event. Use `client.on('listening')` instead. + +- Remove support from `TCPPool` for listening on multiple ports. This was not used by + WebTorrent and just added complexity. There is now a single `TCPPool` instance for the + whole WebTorrent client. + +- Deprecate: Do not use `client.download()` anymore. Use `client.add()` instead. + +- Deprecate: Do not use `torrent.swarm` anymore. Use `torrent` instead. + +- Only pass `torrent.infoHash` to the Chunk Store constructor, instead of the `Torrent` + instance itself, to prevent accidental memory leaks of the `Torrent` object by the + store. (Open an issue if you were using other properties. They can be re-added.) + +- Non-fatal errors with a single torrent will be emitted at `torrent.on('error')`. You + should listen to this event. Previously, all torrent errors were also emitted on + `client.on('error')` and handling `torrent.on('error')` was optional. This design is + better since now it is possible to distinguish between fatal client errors + (`client.on('error')`) when the whole client becomes unusable versus recoverable errors + where only a single torrent fails (`torrent.on('error')`) but the client can continue to + be used. However, if there is no `torrent.on('error')` event, then the error will be + forwarded to `client.on('error')`. This prevents crashing the client when the user + only has a listener on the client, but it makes it impossible for them to determine + a client error versus a torrent error. + +### Fixed + +- If `client.get` is passed a `Torrent` instance, it now only returns it if it is present + in the client. + +- Errors creating a torrent with `client.seed` are now emitted on the returned `torrent` + object instead of the client (unless there is no event listeners on + `torrent.on('error')` as previously discussed). The torrent object is now also destroyed + automatically for the user, as was probably expected. + +- Do not return existing torrent object when duplicate torrent is added. Fire an + `'error'` event instead. + +- Memory leaks of `Torrent` object caused by many internal subclasses of WebTorrent, + including `RarityMap`, `TCPPool`, `WebConn`, `Server`, `File`. + +- `client.ratio` and `torrent.ratio` are now calculated as `uploaded / received` instead + of `uploaded / downloaded`. diff --git a/docs/api.md b/docs/api.md index 124899d8..b0e3ebaf 100644 --- a/docs/api.md +++ b/docs/api.md @@ -63,7 +63,7 @@ If `opts` is specified, then the default options (shown below) will be overridde ## `client.add(torrentId, [opts], [function ontorrent (torrent) {}])` -Start downloading a new torrent. Aliased as `client.download`. +Start downloading a new torrent. `torrentId` can be one of: @@ -201,6 +201,10 @@ Magnet URI of the torrent (string). Array of all files in the torrent. See documentation for `File` below to learn what methods/properties files have. +## `torrent.timeRemaining` + +Time remaining for download to complete (in milliseconds). + ## `torrent.received` Total bytes received from peers (*including* invalid data). @@ -213,10 +217,6 @@ Total *verified* bytes received from peers. Total bytes uploaded to peers. -## `torrent.timeRemaining` - -Time remaining for download to complete (in milliseconds). - ## `torrent.downloadSpeed` Torrent download speed, in bytes/sec. @@ -241,15 +241,19 @@ Number of peers in the torrent swarm. Torrent download location. -## `torrent.destroy()` +## `torrent.destroy([callback])` -Alias for `client.remove(torrent)`. +Alias for `client.remove(torrent)`. If `callback` is provided, it will be called when +the torrent is fully destroyed, i.e. all open sockets are closed, and the storage is +closed. ## `torrent.addPeer(peer)` -Adds a peer to the torrent swarm. Normally, you don't need to call `torrent.addPeer()`. -WebTorrent will automatically find peers using the tracker servers or DHT. This is just -for manually adding a peer to the client. +Add a peer to the torrent swarm. This is advanced functionality. Normally, you should not +need to call `torrent.addPeer()` manually. WebTorrent will automatically find peers using +the tracker servers or DHT. This is just for manually adding a peer to the client. + +This method should not be called until the `infoHash` event has been emitted. Returns `true` if peer was added, `false` if peer was blocked by the loaded blocklist. @@ -259,7 +263,7 @@ instance (for WebRTC peers). ## `torrent.addWebSeed(url)` -Adds a web seed to the torrent swarm. For more information on BitTorrent web seeds, see +Add a web seed to the torrent swarm. For more information on BitTorrent web seeds, see [BEP19](http://www.bittorrent.org/beps/bep_0019.html). In the browser, web seed servers must have proper CORS (Cross-origin resource sharing) @@ -267,11 +271,20 @@ headers so that data can be fetched across domain. The `url` argument is the web seed URL. +## `torrent.removePeer(peer)` + +Remove a peer from the torrent swarm. This is advanced functionality. Normally, you should +not need to call `torrent.removePeer()` manually. WebTorrent will automatically remove +peers from the torrent swarm when they're slow or don't have pieces that are needed. + +The `peer` argument should be an address (i.e. "ip:port" string), a peer id (hex string), +or `simple-peer` instance. + ## `torrent.select(start, end, [priority], [notify])` -Selects a range of pieces to prioritize starting with `start` and ending with `end` (both inclusive) -at the given `priority`. `notify` is an optional callback to be called when the selection is updated -with new data. +Selects a range of pieces to prioritize starting with `start` and ending with `end` (both +inclusive) at the given `priority`. `notify` is an optional callback to be called when the +selection is updated with new data. ## `torrent.deselect(start, end, priority)` @@ -452,7 +465,7 @@ called once the file is ready. `callback` must be specified, and will be called ```js file.getBuffer(function (err, buffer) { if (err) throw err - console.log(buffer) // + console.log(buffer) // }) ``` @@ -463,9 +476,10 @@ that handles many file types like video (.mp4, .webm, .m4v, etc.), audio (.m4a, .wav, etc.), images (.jpg, .gif, .png, etc.), and other file formats (.pdf, .md, .txt, etc.). -The file will be fetched from the network with highest priority and streamed into the -page (if it's video or audio). In some cases, video or audio files will not be streamable -because they're not in a format that the browser can stream so the file will be fully downloaded before being played. For other non-streamable file types like images and PDFs, +The file will be fetched from the network with highest priority and streamed into the page +(if it's video or audio). In some cases, video or audio files will not be streamable +because they're not in a format that the browser can stream so the file will be fully +downloaded before being played. For other non-streamable file types like images and PDFs, the file will be downloaded then displayed. `rootElem` is a container element (CSS selector or reference to DOM node) that the content diff --git a/index.js b/index.js index 59be5299..29478884 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,6 @@ module.exports = WebTorrent +var concat = require('simple-concat') var createTorrent = require('create-torrent') var debug = require('debug')('webtorrent') var DHT = require('bittorrent-dht/client') // browser exclude @@ -15,11 +16,9 @@ var Peer = require('simple-peer') var speedometer = require('speedometer') var zeroFill = require('zero-fill') -var concatStream = require('./lib/concat-stream') +var TCPPool = require('./lib/tcp-pool') // browser exclude var Torrent = require('./lib/torrent') -module.exports.WEBRTC_SUPPORT = Peer.WEBRTC_SUPPORT - /** * WebTorrent version. */ @@ -55,42 +54,56 @@ function WebTorrent (opts) { if (!opts) opts = {} + self.peerId = typeof opts.peerId === 'string' + ? opts.peerId + : (opts.peerId || new Buffer(VERSION_PREFIX + hat(48))).toString('hex') + self.peerIdBuffer = new Buffer(self.peerId, 'hex') + + self.nodeId = typeof opts.nodeId === 'string' + ? opts.nodeId + : (opts.nodeId && opts.nodeId.toString('hex')) || hat(160) + self.nodeIdBuffer = new Buffer(self.nodeId, 'hex') + self.destroyed = false + self.listening = false self.torrentPort = opts.torrentPort || 0 + self.dhtPort = opts.dhtPort || 0 self.tracker = opts.tracker !== undefined ? opts.tracker : true + self.torrents = [] + self.maxConns = Number(opts.maxConns) || 55 self._rtcConfig = opts.rtcConfig self._wrtc = opts.wrtc || global.WRTC // to support `webtorrent-hybrid` package - self.torrents = [] + if (typeof TCPPool === 'function') { + self._tcpPool = new TCPPool(self) + } else { + process.nextTick(function () { + self._onListening() + }) + } + // stats self._downloadSpeed = speedometer() self._uploadSpeed = speedometer() - self.maxConns = opts.maxConns - - self.peerId = typeof opts.peerId === 'string' - ? opts.peerId - : (opts.peerId || new Buffer(VERSION_PREFIX + hat(48))).toString('hex') - self.peerIdBuffer = new Buffer(self.peerId, 'hex') - - self.nodeId = typeof opts.nodeId === 'string' - ? opts.nodeId - : (opts.nodeId && opts.nodeId.toString('hex')) || hat(160) - self.nodeIdBuffer = new Buffer(self.nodeId, 'hex') - if (opts.dht !== false && typeof DHT === 'function' /* browser exclude */) { // use a single DHT instance for all torrents, so the routing table can be reused self.dht = new DHT(extend({ nodeId: self.nodeId }, opts.dht)) + self.dht.once('error', function (err) { - self.emit('error', err) - self.destroy() + self._destroy(err) + }) + + self.dht.once('listening', function () { + var address = self.dht.address() + if (address) self.dhtPort = address.port }) // Ignore warning when there are > 10 torrents in the client self.dht.setMaxListeners(0) - self.dht.listen(opts.dhtPort) + self.dht.listen(self.dhtPort) } else { self.dht = false } @@ -114,6 +127,8 @@ function WebTorrent (opts) { } } +WebTorrent.WEBRTC_SUPPORT = Peer.WEBRTC_SUPPORT + Object.defineProperty(WebTorrent.prototype, 'downloadSpeed', { get: function () { return this._downloadSpeed() } }) @@ -142,10 +157,10 @@ Object.defineProperty(WebTorrent.prototype, 'ratio', { var uploaded = this.torrents.reduce(function (total, torrent) { return total + torrent.uploaded }, 0) - var downloaded = this.torrents.reduce(function (total, torrent) { - return total + torrent.downloaded + var received = this.torrents.reduce(function (total, torrent) { + return total + torrent.received }, 0) || 1 - return uploaded / downloaded + return uploaded / received } }) @@ -159,29 +174,41 @@ Object.defineProperty(WebTorrent.prototype, 'ratio', { */ WebTorrent.prototype.get = function (torrentId) { var self = this - if (torrentId instanceof Torrent) return torrentId + var i, torrent + var len = self.torrents.length - var parsed - try { parsed = parseTorrent(torrentId) } catch (err) {} + if (torrentId instanceof Torrent) { + for (i = 0; i < len; i++) { + torrent = self.torrents[i] + if (torrent === torrentId) return torrent + } + } else { + var parsed + try { parsed = parseTorrent(torrentId) } catch (err) {} - if (!parsed) return null - if (!parsed.infoHash) throw new Error('Invalid torrent identifier') + if (!parsed) return null + if (!parsed.infoHash) throw new Error('Invalid torrent identifier') - for (var i = 0, len = self.torrents.length; i < len; i++) { - var torrent = self.torrents[i] - if (torrent.infoHash === parsed.infoHash) return torrent + for (i = 0; i < len; i++) { + torrent = self.torrents[i] + if (torrent.infoHash === parsed.infoHash) return torrent + } } return null } +WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { + console.warn('WebTorrent: client.download() is deprecated. Use client.add() instead') + return this.add(torrentId, opts, ontorrent) +} + /** * Start downloading a new torrent. Aliased as `client.download`. * @param {string|Buffer|Object} torrentId * @param {Object} opts torrent-specific options * @param {function=} ontorrent called when the torrent is ready (has metadata) */ -WebTorrent.prototype.add = -WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { +WebTorrent.prototype.add = function (torrentId, opts, ontorrent) { var self = this if (self.destroyed) throw new Error('client is destroyed') if (typeof opts === 'function') return self.add(torrentId, null, opts) @@ -189,33 +216,36 @@ WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { debug('add') opts = opts ? extend(opts) : {} - var torrent = self.get(torrentId) - - if (torrent) { - if (torrent.ready) process.nextTick(onReady) - else torrent.once('ready', onReady) - } else { - torrent = new Torrent(torrentId, self, opts) - self.torrents.push(torrent) - - torrent.once('error', function (err) { - self.emit('error', err, torrent) - self.remove(torrent) - }) + var torrent = new Torrent(torrentId, self, opts) + self.torrents.push(torrent) - torrent.once('listening', function (port) { - self.emit('listening', port, torrent) - }) + torrent.once('infoHash', onInfoHash) + torrent.once('ready', onReady) + torrent.once('close', onClose) - torrent.once('ready', onReady) + function onInfoHash () { + if (self.destroyed) return + for (var i = 0, len = self.torrents.length; i < len; i++) { + var t = self.torrents[i] + if (t.infoHash === torrent.infoHash && t !== torrent) { + torrent._destroy(new Error('Cannot add duplicate torrent ' + torrent.infoHash)) + return + } + } } function onReady () { - debug('on torrent') + if (self.destroyed) return if (typeof ontorrent === 'function') ontorrent(torrent) self.emit('torrent', torrent) } + function onClose () { + torrent.removeListener('infoHash', onInfoHash) + torrent.removeListener('ready', onReady) + torrent.removeListener('close', onClose) + } + return torrent } @@ -244,25 +274,28 @@ WebTorrent.prototype.seed = function (input, opts, onseed) { if (!Array.isArray(input)) input = [ input ] parallel(input.map(function (item) { return function (cb) { - if (isReadable(item)) concatStream(item, cb) + if (isReadable(item)) concat(item, cb) else cb(null, item) } }), function (err, input) { - if (err) return self.emit('error', err, torrent) if (self.destroyed) return + if (err) return torrent._destroy(err) + createTorrent.parseInput(input, opts, function (err, files) { - if (err) return self.emit('error', err, torrent) if (self.destroyed) return - streams = files.map(function (file) { return file.getStream }) + if (err) return torrent._destroy(err) + + streams = files.map(function (file) { + return file.getStream + }) createTorrent(input, opts, function (err, torrentBuf) { - if (err) return self.emit('error', err, torrent) if (self.destroyed) return + if (err) return torrent._destroy(err) var existingTorrent = self.get(torrentBuf) if (existingTorrent) { - torrent.destroy() - _onseed(existingTorrent) + torrent._destroy(new Error('Cannot add duplicate torrent ' + existingTorrent.infoHash)) } else { torrent._onTorrentId(torrentBuf) } @@ -283,7 +316,7 @@ WebTorrent.prototype.seed = function (input, opts, onseed) { } parallel(tasks, function (err) { if (self.destroyed) return - if (err) return self.emit('error', err, torrent) + if (err) return torrent._destroy(err) _onseed(torrent) }) } @@ -303,19 +336,24 @@ WebTorrent.prototype.seed = function (input, opts, onseed) { * @param {function} cb */ WebTorrent.prototype.remove = function (torrentId, cb) { - var self = this debug('remove') - - var torrent = self.get(torrentId) + var torrent = this.get(torrentId) if (!torrent) throw new Error('No torrent with id ' + torrentId) + this._remove(torrentId, cb) +} - self.torrents.splice(self.torrents.indexOf(torrent), 1) +WebTorrent.prototype._remove = function (torrentId, cb) { + var torrent = this.get(torrentId) + if (!torrent) return + this.torrents.splice(this.torrents.indexOf(torrent), 1) torrent.destroy(cb) } WebTorrent.prototype.address = function () { - var self = this - return { address: '0.0.0.0', family: 'IPv4', port: self.torrentPort } + if (!this.listening) return null + return this._tcpPool + ? this._tcpPool.server.address() + : { address: '0.0.0.0', family: 'IPv4', port: 0 } } /** @@ -323,18 +361,53 @@ WebTorrent.prototype.address = function () { * @param {function} cb */ WebTorrent.prototype.destroy = function (cb) { + if (this.destroyed) throw new Error('client already destroyed') + this._destroy(null, cb) +} + +WebTorrent.prototype._destroy = function (err, cb) { var self = this - if (self.destroyed) throw new Error('client already destroyed') + debug('client destroy') self.destroyed = true - debug('destroy') var tasks = self.torrents.map(function (torrent) { - return function (cb) { self.remove(torrent, cb) } + return function (cb) { + torrent.destroy(cb) + } }) - if (self.dht) tasks.push(function (cb) { self.dht.destroy(cb) }) + if (self._tcpPool) { + tasks.push(function (cb) { + self._tcpPool.destroy(cb) + }) + } + + if (self.dht) { + tasks.push(function (cb) { + self.dht.destroy(cb) + }) + } parallel(tasks, cb) + + if (err) self.emit('error', err) + + self.torrents = [] + self._tcpPool = null + self.dht = null +} + +WebTorrent.prototype._onListening = function () { + this.listening = true + + if (this._tcpPool) { + // Sometimes server.address() returns `null` in Docker. + // WebTorrent issue: https://github.com/feross/bittorrent-swarm/pull/18 + var address = this._tcpPool.server.address() + if (address) this.torrentPort = address.port + } + + this.emit('listening') } /** diff --git a/lib/concat-stream.js b/lib/concat-stream.js deleted file mode 100644 index c2d88600..00000000 --- a/lib/concat-stream.js +++ /dev/null @@ -1,14 +0,0 @@ -module.exports = function (stream, cb) { - var chunks = [] - stream.on('data', function (chunk) { - chunks.push(chunk) - }) - stream.once('end', function () { - if (cb) cb(null, Buffer.concat(chunks)) - cb = null - }) - stream.once('error', function (err) { - if (cb) cb(err) - cb = null - }) -} diff --git a/lib/file-stream.js b/lib/file-stream.js index 55355ea0..508f3584 100644 --- a/lib/file-stream.js +++ b/lib/file-stream.js @@ -88,7 +88,6 @@ FileStream.prototype.destroy = function (onclose) { FileStream.prototype._destroy = function (err, onclose) { if (this.destroyed) return - if (onclose) this.once('close', onclose) this.destroyed = true if (!this._torrent.destroyed) { @@ -97,4 +96,5 @@ FileStream.prototype._destroy = function (err, onclose) { if (err) this.emit('error', err) this.emit('close') + if (onclose) onclose() } diff --git a/lib/file.js b/lib/file.js index fd24e0ed..c73fc386 100644 --- a/lib/file.js +++ b/lib/file.js @@ -12,10 +12,6 @@ var streamToBuffer = require('stream-with-known-length-to-buffer') inherits(File, EventEmitter) -/** - * @param {Torrent} torrent torrent that the file belongs to - * @param {Object} file file object from the parsed torrent - */ function File (torrent, file) { EventEmitter.call(this) @@ -40,33 +36,16 @@ function File (torrent, file) { } } -/** - * Selects the file to be downloaded, but at a lower priority than files with streams. - * Useful if you know you need the file at a later stage. - */ File.prototype.select = function (priority) { if (this.length === 0) return this._torrent.select(this._startPiece, this._endPiece, priority) } -/** - * Deselects the file, which means it won't be downloaded unless someone creates a stream - * for it. - */ File.prototype.deselect = function () { if (this.length === 0) return this._torrent.deselect(this._startPiece, this._endPiece, false) } -/** - * Create a readable stream to the file. Pieces needed by the stream will be prioritized - * highly and fetched from the swarm first. - * - * @param {Object=} opts - * @param {number} opts.start start stream at byte (inclusive) - * @param {number} opts.end end stream at byte (inclusive) - * @return {FileStream} - */ File.prototype.createReadStream = function (opts) { var self = this if (this.length === 0) { @@ -89,36 +68,26 @@ File.prototype.createReadStream = function (opts) { return fileStream } -/** - * @param {function} cb - */ File.prototype.getBuffer = function (cb) { streamToBuffer(this.createReadStream(), this.length, cb) } -/** - * @param {function} cb - */ File.prototype.getBlobURL = function (cb) { if (typeof window === 'undefined') throw new Error('browser-only method') var mime = render.mime[path.extname(this.name).toLowerCase()] streamToBlobURL(this.createReadStream(), mime, cb) } -/** - * @param {Element|string} elem - * @param {function} cb - */ File.prototype.appendTo = function (elem, cb) { if (typeof window === 'undefined') throw new Error('browser-only method') render.append(this, elem, cb) } -/** - * @param {Element|string} elem - * @param {function} cb - */ File.prototype.renderTo = function (elem, cb) { if (typeof window === 'undefined') throw new Error('browser-only method') render.render(this, elem, cb) } + +File.prototype._destroy = function () { + this._torrent = null +} diff --git a/lib/peer.js b/lib/peer.js index 06786c5c..fcd296d5 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -60,10 +60,10 @@ exports.createTCPOutgoingPeer = function (addr, swarm) { /** * Peer that represents a Web Seed (BEP17 / BEP19). */ -exports.createWebSeedPeer = function (url, parsedTorrent, swarm) { +exports.createWebSeedPeer = function (url, swarm) { var peer = new Peer(url, 'webSeed') peer.swarm = swarm - peer.conn = new WebConn(url, parsedTorrent) + peer.conn = new WebConn(url, swarm) peer.onConnect() @@ -71,7 +71,7 @@ exports.createWebSeedPeer = function (url, parsedTorrent, swarm) { } /** - * Peer. Represents a peer in the Swarm. + * Peer. Represents a peer in the torrent swarm. * * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds) * @param {string} type the type of the peer @@ -161,7 +161,7 @@ Peer.prototype.onHandshake = function (infoHash, peerId) { return self.destroy(new Error('unexpected handshake info hash for this swarm')) } if (peerId === self.swarm.peerId) { - return self.destroy(new Error('refusing to handshake with self')) + return self.destroy(new Error('refusing to connect to ourselves')) } debug('Peer %s got handshake %s', self.id, infoHash) @@ -170,27 +170,12 @@ Peer.prototype.onHandshake = function (infoHash, peerId) { self.retries = 0 - self.wire.on('download', function (downloaded) { - if (self.destroyed) return - self.swarm.downloaded += downloaded - self.swarm.downloadSpeed(downloaded) - self.swarm.emit('download', downloaded) - }) - - self.wire.on('upload', function (uploaded) { - if (self.destroyed) return - self.swarm.uploaded += uploaded - self.swarm.uploadSpeed(uploaded) - self.swarm.emit('upload', uploaded) - }) - - self.swarm.wires.push(self.wire) - var addr = self.addr if (!addr && self.conn.remoteAddress) { addr = self.conn.remoteAddress + ':' + self.conn.remotePort } - self.swarm.emit('wire', self.wire, addr) + self.swarm._onWire(self.wire, addr) + // swarm could be destroyed in user's 'wire' event handler if (!self.swarm || self.swarm.destroyed) return @@ -199,7 +184,10 @@ Peer.prototype.onHandshake = function (infoHash, peerId) { Peer.prototype.handshake = function () { var self = this - self.wire.handshake(self.swarm.infoHash, self.swarm.peerId, self.swarm.handshakeOpts) + var opts = { + dht: self.swarm.private ? false : !!self.swarm.client.dht + } + self.wire.handshake(self.swarm.infoHash, self.swarm.client.peerId, opts) self.sentHandshake = true } @@ -236,8 +224,8 @@ Peer.prototype.destroy = function (err) { var conn = self.conn var wire = self.wire - self.conn = null self.swarm = null + self.conn = null self.wire = null if (swarm && wire) { diff --git a/lib/rarity-map.js b/lib/rarity-map.js index 0b1d1478..430f62f8 100644 --- a/lib/rarity-map.js +++ b/lib/rarity-map.js @@ -1,57 +1,32 @@ module.exports = RarityMap /** - * Mapping of torrent pieces to their respective availability in the swarm. Used by - * the torrent manager for implementing the rarest piece first selection strategy. - * - * @param {Swarm} swarm bittorrent-swarm to track availability - * @param {number} numPieces number of pieces in the torrent + * Mapping of torrent pieces to their respective availability in the torrent swarm. Used + * by the torrent manager for implementing the rarest piece first selection strategy. */ -function RarityMap (swarm, numPieces) { +function RarityMap (torrent) { var self = this - self.pieces = [] - self.swarm = swarm - self.numPieces = numPieces - - function initWire (wire) { - wire.on('have', function (index) { - self.pieces[index] += 1 - }) - wire.on('bitfield', function () { - self.recalculate() - }) - wire.on('close', function () { - for (var i = 0; i < self.numPieces; ++i) { - self.pieces[i] -= wire.peerPieces.get(i) - } - }) - } + self._torrent = torrent + self._numPieces = torrent.pieces.length + self._pieces = [] - self.swarm.wires.forEach(initWire) - self.swarm.on('wire', function (wire) { + self._onWire = function (wire) { + self.recalculate() + self._initWire(wire) + } + self._onWireHave = function (index) { + self._pieces[index] += 1 + } + self._onWireBitfield = function () { self.recalculate() - initWire(wire) - }) - - self.recalculate() -} - -/** - * Recalculates piece availability across all peers in the swarm. - */ -RarityMap.prototype.recalculate = function () { - var self = this - - for (var i = 0; i < self.numPieces; ++i) { - self.pieces[i] = 0 } - self.swarm.wires.forEach(function (wire) { - for (var i = 0; i < self.numPieces; ++i) { - self.pieces[i] += wire.peerPieces.get(i) - } + self._torrent.wires.forEach(function (wire) { + self._initWire(wire) }) + self._torrent.on('wire', self._onWire) + self.recalculate() } /** @@ -62,15 +37,15 @@ RarityMap.prototype.recalculate = function () { * @return {number} index of rarest piece, or -1 */ RarityMap.prototype.getRarestPiece = function (pieceFilterFunc) { - var self = this + if (!pieceFilterFunc) pieceFilterFunc = trueFn + var candidates = [] var min = Infinity - pieceFilterFunc = pieceFilterFunc || function () { return true } - for (var i = 0; i < self.numPieces; ++i) { + for (var i = 0; i < this._numPieces; ++i) { if (!pieceFilterFunc(i)) continue - var availability = self.pieces[i] + var availability = this._pieces[i] if (availability === min) { candidates.push(i) } else if (availability < min) { @@ -86,3 +61,61 @@ RarityMap.prototype.getRarestPiece = function (pieceFilterFunc) { return -1 } } + +RarityMap.prototype.destroy = function () { + var self = this + self._torrent.removeListener('wire', self._onWire) + self._torrent.wires.forEach(function (wire) { + self._cleanupWireEvents(wire) + }) + self._torrent = null + self._pieces = null + + self._onWire = null + self._onWireHave = null + self._onWireBitfield = null +} + +RarityMap.prototype._initWire = function (wire) { + var self = this + + wire._onClose = function () { + self._cleanupWireEvents(wire) + for (var i = 0; i < this._numPieces; ++i) { + self._pieces[i] -= wire.peerPieces.get(i) + } + } + + wire.on('have', self._onWireHave) + wire.on('bitfield', self._onWireBitfield) + wire.once('close', wire._onClose) +} + +/** + * Recalculates piece availability across all peers in the torrent. + */ +RarityMap.prototype.recalculate = function () { + var i + for (i = 0; i < this._numPieces; ++i) { + this._pieces[i] = 0 + } + + var numWires = this._torrent.wires.length + for (i = 0; i < numWires; ++i) { + var wire = this._torrent.wires[i] + for (var j = 0; j < this._numPieces; ++j) { + this._pieces[j] += wire.peerPieces.get(j) + } + } +} + +RarityMap.prototype._cleanupWireEvents = function (wire) { + wire.removeListener('have', this._onWireHave) + wire.removeListener('bitfield', this._onWireBitfield) + if (wire._onClose) wire.removeListener('close', wire._onClose) + wire._onClose = null +} + +function trueFn () { + return true +} diff --git a/lib/server.js b/lib/server.js index 27a645e7..cfa883f3 100644 --- a/lib/server.js +++ b/lib/server.js @@ -12,19 +12,22 @@ function Server (torrent, opts) { var server = http.createServer(opts) var sockets = [] + var pendingReady = [] var closed = false - server.on('connection', function (socket) { - socket.setTimeout(36000000) - sockets.push(socket) - socket.on('close', function () { - arrayRemove(sockets, sockets.indexOf(socket)) - }) - }) + server.on('connection', onConnection) + server.on('request', onRequest) var _close = server.close server.close = function (cb) { closed = true + torrent = null + server.removeListener('connection', onConnection) + server.removeListener('request', onRequest) + while (pendingReady.length) { + var onReady = pendingReady.pop() + torrent.removeListener('ready', onReady) + } _close.call(server, cb) } @@ -38,7 +41,15 @@ function Server (torrent, opts) { else server.close(cb) } - server.on('request', function (req, res) { + function onConnection (socket) { + socket.setTimeout(36000000) + sockets.push(socket) + socket.once('close', function () { + arrayRemove(sockets, sockets.indexOf(socket)) + }) + } + + function onRequest (req, res) { debug('onRequest') // Allow CORS requests to specify arbitrary headers, e.g. 'Range', @@ -61,10 +72,15 @@ function Server (torrent, opts) { var pathname = url.parse(req.url).pathname if (pathname === '/favicon.ico') return res.end() - if (torrent.ready) onReady() - else torrent.once('ready', onReady) + if (torrent.ready) { + onReady() + } else { + pendingReady.push(onReady) + torrent.once('ready', onReady) + } function onReady () { + arrayRemove(pendingReady, pendingReady.indexOf(onReady)) if (pathname === '/') { res.setHeader('Content-Type', 'text/html') var listHtml = torrent.files.map(function (file, i) { @@ -112,7 +128,7 @@ function Server (torrent, opts) { if (req.method === 'HEAD') res.end() pump(file.createReadStream(range), res) } - }) + } return server } diff --git a/lib/swarm.js b/lib/swarm.js deleted file mode 100644 index 7edee82b..00000000 --- a/lib/swarm.js +++ /dev/null @@ -1,413 +0,0 @@ -module.exports = Swarm - -var addrToIPPort = require('addr-to-ip-port') -var debug = require('debug')('webtorrent:swarm') -var EventEmitter = require('events').EventEmitter -var inherits = require('inherits') -var net = require('net') // browser exclude -var speedometer = require('speedometer') - -var Peer = require('./peer') -var TCPPool = require('./tcp-pool') // browser-exclude - -var MAX_CONNS = 55 -var RECONNECT_WAIT = [ 1000, 5000, 15000 ] - -inherits(Swarm, EventEmitter) - -/** - * BitTorrent Swarm - * - * Abstraction of a BitTorrent "swarm", which is handy for managing all peer - * connections for a given torrent download. This handles connecting to peers, - * listening for incoming connections, and doing the initial peer wire protocol - * handshake with peers. It also tracks total data uploaded/downloaded to/from - * the swarm. - * - * @param {Buffer|string} infoHash - * @param {Buffer|string} peerId - * @param {Object} opts - * @param {Object} opts.handshake handshake options (passed to bittorrent-protocol) - * @param {number} opts.maxConns maximum number of connections in swarm - */ -function Swarm (infoHash, peerId, opts) { - var self = this - if (!(self instanceof Swarm)) return new Swarm(infoHash, peerId, opts) - EventEmitter.call(self) - - self.infoHash = typeof infoHash === 'string' - ? infoHash - : infoHash.toString('hex') - self.infoHashBuffer = new Buffer(self.infoHash, 'hex') - - self.peerId = typeof peerId === 'string' - ? peerId - : peerId.toString('hex') - self.peerIdBuffer = new Buffer(self.peerId, 'hex') - - if (!opts) opts = {} - - debug('new swarm (i %s p %s)', self.infoHash, self.peerId) - - self.handshakeOpts = opts.handshake // handshake extensions (optional) - self.maxConns = Number(opts.maxConns) || MAX_CONNS - - self.destroyed = false - self.listening = false - self.paused = false - - self.server = null // tcp listening socket - self.wires = [] // open wires (added *after* handshake) - - self._queue = [] // queue of outgoing tcp peers to connect to - self._peers = {} // connected peers (addr/peerId -> Peer) - self._peersLength = 0 // number of elements in `self._peers` (cache, for perf) - self._port = 0 // tcp listening port (cache, for perf) - - // track stats - self.downloaded = 0 - self.uploaded = 0 - self.downloadSpeed = speedometer() - self.uploadSpeed = speedometer() -} - -Object.defineProperty(Swarm.prototype, 'ratio', { - get: function () { - var self = this - return (self.uploaded / self.downloaded) || 0 - } -}) - -Object.defineProperty(Swarm.prototype, 'numQueued', { - get: function () { - var self = this - return self._queue.length + (self._peersLength - self.numConns) - } -}) - -Object.defineProperty(Swarm.prototype, 'numConns', { - get: function () { - var self = this - var numConns = 0 - for (var id in self._peers) { - if (self._peers[id].connected) numConns += 1 - } - return numConns - } -}) - -Object.defineProperty(Swarm.prototype, 'numPeers', { - get: function () { - var self = this - return self.wires.length - } -}) - -/** - * Add a peer to the swarm. - * @param {string|simple-peer} peer "ip:port" string or simple-peer instance - * @param {string} peer.id bittorrent peer id (when `peer` is simple-peer) - * @return {boolean} true if peer was added, false if peer was invalid - - */ -Swarm.prototype.addPeer = function (peer) { - var self = this - var newPeer = self._addPeer(peer) - return !!newPeer // don't expose private Peer instance in return value -} - -Swarm.prototype._addPeer = function (peer) { - var self = this - if (self.destroyed) { - debug('ignoring added peer: swarm already destroyed') - if (typeof peer !== 'string') peer.destroy() - return null - } - if (typeof peer === 'string' && !self._validAddr(peer)) { - debug('ignoring added peer: invalid address %s', peer) - return null - } - - var id = (peer && peer.id) || peer - if (self._peers[id]) { - debug('ignoring added peer: duplicate peer id') - if (typeof peer !== 'string') peer.destroy() - return null - } - - if (self.paused) { - debug('ignoring added peer: swarm paused') - if (typeof peer !== 'string') peer.destroy() - return null - } - - debug('addPeer %s', id) - - var newPeer - if (typeof peer === 'string') { - // `peer` is an addr ("ip:port" string) - newPeer = Peer.createTCPOutgoingPeer(peer, self) - } else { - // `peer` is a WebRTC connection (simple-peer) - newPeer = Peer.createWebRTCPeer(peer, self) - } - - self._peers[newPeer.id] = newPeer - self._peersLength += 1 - - if (typeof peer === 'string') { - // `peer` is an addr ("ip:port" string) - self._queue.push(newPeer) - self._drain() - } - - return newPeer -} - -/** - * Add a web seed to the swarm. - * @param {string} url web seed url - * @param {Object} parsedTorrent - */ -Swarm.prototype.addWebSeed = function (url, parsedTorrent) { - var self = this - if (self.destroyed) return - - if (!/^https?:\/\/.+/.test(url)) { - debug('ignoring invalid web seed %s (from swarm.addWebSeed)', url) - return - } - - if (self._peers[url]) return - - debug('addWebSeed %s', url) - - var newPeer = Peer.createWebSeedPeer(url, parsedTorrent, self) - self._peers[newPeer.id] = newPeer - self._peersLength += 1 -} - -/** - * Called whenever a new incoming TCP peer connects to this swarm. Called with a peer - * that has already sent a handshake. - * @param {Peer} peer - */ -Swarm.prototype._addIncomingPeer = function (peer) { - var self = this - if (self.destroyed) return peer.destroy(new Error('swarm already destroyed')) - if (self.paused) return peer.destroy(new Error('swarm paused')) - - if (!self._validAddr(peer.addr)) { - return peer.destroy(new Error('invalid addr ' + peer.addr + ' (from incoming)')) - } - debug('_addIncomingPeer %s', peer.id) - - self._peers[peer.id] = peer - self._peersLength += 1 -} - -/** - * Remove a peer from the swarm. - * @param {string} id for tcp peers, "ip:port" string; for webrtc peers, peerId - */ -Swarm.prototype.removePeer = function (id) { - var self = this - var peer = self._peers[id] - if (!peer) return - - debug('removePeer %s', id) - - delete self._peers[id] - self._peersLength -= 1 - - peer.destroy() - - // If swarm was at capacity before, try to open a new connection now - self._drain() -} - -/** - * Temporarily stop connecting to new peers. Note that this does not pause the streams - * of existing connections or their wires. - */ -Swarm.prototype.pause = function () { - var self = this - if (self.destroyed) return - debug('pause') - self.paused = true -} - -/** - * Resume connecting to new peers. - */ -Swarm.prototype.resume = function () { - var self = this - if (self.destroyed) return - debug('resume') - self.paused = false - self._drain() -} - -/** - * Listen on the given port for peer connections. - * @param {number} port - * @param {string=} hostname - * @param {function=} onlistening - */ -Swarm.prototype.listen = function (port, hostname, onlistening) { - var self = this - if (typeof hostname === 'function') { - onlistening = hostname - hostname = undefined - } - if (self.listening) throw new Error('swarm already listening') - if (onlistening) self.once('listening', onlistening) - - if (typeof TCPPool === 'function') { - self._port = port || TCPPool.getDefaultListenPort(self.infoHash) - self._hostname = hostname - - debug('listen %s', port) - - var pool = TCPPool.addSwarm(self) - self.server = pool.server - } else { - // In browser, listen() is no-op, but still fire 'listening' event so that - // same code works in node and the browser. - process.nextTick(function () { - self._onListening(0) - }) - } -} - -Swarm.prototype._onListening = function (port) { - var self = this - self._port = port - self.listening = true - self.emit('listening') -} - -Swarm.prototype.address = function () { - var self = this - if (!self.listening) return null - return self.server - ? self.server.address() - : { port: 0, family: 'IPv4', address: '127.0.0.1' } -} - -/** - * Destroy the swarm, close all open peer connections, and do cleanup. - * @param {function} onclose - */ -Swarm.prototype.destroy = function (onclose) { - var self = this - if (self.destroyed) return - - self.destroyed = true - self.listening = false - self.paused = false - - if (onclose) self.once('close', onclose) - - debug('destroy') - - for (var id in self._peers) { - self.removePeer(id) - } - - if (typeof TCPPool === 'function') { - TCPPool.removeSwarm(self, function () { - // TODO: only emit when all peers are destroyed - self.emit('close') - }) - } else { - process.nextTick(function () { - self.emit('close') - }) - } -} - -/** - * Pop a peer off the FIFO queue and connect to it. When _drain() gets called, - * the queue will usually have only one peer in it, except when there are too - * many peers (over `this.maxConns`) in which case they will just sit in the - * queue until another connection closes. - */ -Swarm.prototype._drain = function () { - var self = this - debug('_drain numConns %s maxConns %s', self.numConns, self.maxConns) - if (typeof net.connect !== 'function' || self.destroyed || self.paused || - self.numConns >= self.maxConns) { - return - } - debug('drain (%s queued, %s/%s peers)', self.numQueued, self.numPeers, self.maxConns) - - var peer = self._queue.shift() - if (!peer) return // queue could be empty - - debug('tcp connect attempt to %s', peer.addr) - - var parts = addrToIPPort(peer.addr) - var opts = { - host: parts[0], - port: parts[1] - } - if (self._hostname) opts.localAddress = self._hostname - - var conn = peer.conn = net.connect(opts) - - conn.once('connect', function () { peer.onConnect() }) - conn.once('error', function (err) { peer.destroy(err) }) - peer.startConnectTimeout() - - // When connection closes, attempt reconnect after timeout (with exponential backoff) - conn.on('close', function () { - if (self.destroyed) return - - // TODO: If torrent is done, do not try to reconnect after a timeout - - if (peer.retries >= RECONNECT_WAIT.length) { - debug( - 'conn %s closed: will not re-add (max %s attempts)', - peer.addr, RECONNECT_WAIT.length - ) - return - } - - var ms = RECONNECT_WAIT[peer.retries] - debug( - 'conn %s closed: will re-add to queue in %sms (attempt %s)', - peer.addr, ms, peer.retries + 1 - ) - - var reconnectTimeout = setTimeout(function reconnectTimeout () { - var newPeer = self._addPeer(peer.addr) - if (newPeer) newPeer.retries = peer.retries + 1 - }, ms) - if (reconnectTimeout.unref) reconnectTimeout.unref() - }) -} - -Swarm.prototype._onError = function (err) { - var self = this - self.emit('error', err) - self.destroy() -} - -/** - * Returns `true` if string is valid IPv4/6 address, and is not the address of this swarm. - * @param {string} addr - * @return {boolean} - */ -Swarm.prototype._validAddr = function (addr) { - var self = this - var parts - try { - parts = addrToIPPort(addr) - } catch (e) { - return false - } - var host = parts[0] - var port = parts[1] - return port > 0 && port < 65535 && !(host === '127.0.0.1' && port === self._port) -} diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js index 7e95f6c8..5e15d0a3 100644 --- a/lib/tcp-pool.js +++ b/lib/tcp-pool.js @@ -6,12 +6,6 @@ var net = require('net') // browser exclude var Peer = require('./peer') -/** - * Shared TCP pools; shared among all swarms - * @type {Object} port: number -> pool: TCPPool - */ -var tcpPools = {} - /** * TCPPool * @@ -20,107 +14,35 @@ var tcpPools = {} * handshake that the remote peer sends. * * @param {number} port - * @param {string} hostname */ -function TCPPool (port, hostname) { +function TCPPool (client) { var self = this - - self.port = port - self.listening = false - self.swarms = {} // infoHash (hex) -> Swarm - - debug('new TCPPool (port: %s, hostname: %s)', port, hostname) - - // Save incoming conns so they can be destroyed if server is closed before the conn is - // passed off to a Swarm. - self.pendingConns = [] + debug('create tcp pool (port %s)', client.torrentPort) self.server = net.createServer() - self.server.on('connection', function (conn) { self._onConnection(conn) }) - self.server.on('error', function (err) { self._onError(err) }) - self.server.on('listening', function () { self._onListening() }) - self.server.listen(self.port, hostname) -} - -/** - * STATIC METHOD - * Add a swarm to a pool, creating a new pool if necessary. - * @param {Swarm} swarm - */ -TCPPool.addSwarm = function (swarm) { - var pool = tcpPools[swarm._port] - if (!pool) pool = tcpPools[swarm._port] = new TCPPool(swarm._port, swarm._hostname) - pool.addSwarm(swarm) - return pool -} - -/** - * STATIC METHOD - * Remove a swarm from its pool. - * @param {Swarm} swarm - */ -TCPPool.removeSwarm = function (swarm, cb) { - var pool = tcpPools[swarm._port] - if (!pool) return cb() - pool.removeSwarm(swarm) + self._client = client - if (Object.keys(pool.swarms).length === 0) pool.destroy(cb) - else process.nextTick(cb) -} + // Temporarily store incoming connections so they can be destroyed if the server is + // closed before the connection is passed off to a Torrent. + self._pendingConns = [] -/** - * STATIC METHOD - * When `Swarm.prototype.listen` is called without specifying a port, a reasonable - * default port must be chosen. If there already exists an active TCP pool, then return - * that pool's port so that TCP server can be re-used. Otherwise, return 0 so node will - * pick a free port. - * - * @return {number} port - */ -TCPPool.getDefaultListenPort = function (infoHash) { - for (var port in tcpPools) { - var pool = tcpPools[port] - if (!pool.swarms[infoHash]) return pool.port + self._onConnectionBound = function (conn) { + self._onConnection(conn) } - return 0 -} - -/** - * Add a swarm to this TCP pool. - * @param {Swarm} swarm - */ -TCPPool.prototype.addSwarm = function (swarm) { - var self = this - if (self.swarms[swarm.infoHash]) { - process.nextTick(function () { - swarm._onError(new Error( - 'There is already a swarm with info hash ' + swarm.infoHash + ' ' + - 'listening on port ' + swarm._port - )) - }) - return + self._onListening = function () { + self._client._onListening() } - self.swarms[swarm.infoHash] = swarm - - if (self.listening) { - process.nextTick(function () { - swarm._onListening(self.port) - }) + self._onError = function (err) { + self._client._destroy(err) } - debug('add swarm %s to tcp pool %s', swarm.infoHash, self.port) -} + self.server.on('connection', self._onConnectionBound) + self.server.on('listening', self._onListening) + self.server.on('error', self._onError) -/** - * Remove a swarm from this TCP pool. - * @param {Swarm} swarm - */ -TCPPool.prototype.removeSwarm = function (swarm) { - var self = this - debug('remove swarm %s from tcp pool %s', swarm.infoHash, self.port) - delete self.swarms[swarm.infoHash] + self.server.listen(client.torrentPort) } /** @@ -129,47 +51,28 @@ TCPPool.prototype.removeSwarm = function (swarm) { */ TCPPool.prototype.destroy = function (cb) { var self = this - debug('destroy tcp pool %s', self.port) + debug('destroy tcp pool') - self.listening = false + self.server.removeListener('connection', self._onConnectionBound) + self.server.removeListener('listening', self._onListening) + self.server.removeListener('error', self._onError) // Destroy all open connection objects so server can close gracefully without waiting // for connection timeout or remote peer to disconnect. - self.pendingConns.forEach(function (conn) { + self._pendingConns.forEach(function (conn) { + conn.on('error', noop) conn.destroy() }) - delete tcpPools[self.port] - try { self.server.close(cb) } catch (err) { if (cb) process.nextTick(cb) } -} - -TCPPool.prototype._onListening = function () { - var self = this - - // Fix for Docker Node image. Sometimes server.address() returns `null`. - // See issue: https://github.com/feross/bittorrent-swarm/pull/18 - var address = self.server.address() || { port: 0 } - var port = address.port - - debug('tcp pool listening on %s', port) - if (port !== self.port) { - // `port` was 0 when `listen` was called; update to the port that node selected - delete tcpPools[self.port] - self.port = port - tcpPools[self.port] = self - } - - self.listening = true - - for (var infoHash in self.swarms) { - self.swarms[infoHash]._onListening(self.port) - } + self.server = null + self._client = null + self._pendingConns = null } /** @@ -189,39 +92,34 @@ TCPPool.prototype._onConnection = function (conn) { return } - self.pendingConns.push(conn) - conn.once('close', removePendingConn) - - function removePendingConn () { - arrayRemove(self.pendingConns, self.pendingConns.indexOf(conn)) - } + self._pendingConns.push(conn) + conn.once('close', cleanupPending) var peer = Peer.createTCPIncomingPeer(conn) - peer.wire.once('handshake', function (infoHash, peerId) { - removePendingConn() - conn.removeListener('close', removePendingConn) + var wire = peer.wire + wire.once('handshake', onHandshake) - var swarm = self.swarms[infoHash] - if (swarm) { - peer.swarm = swarm - swarm._addIncomingPeer(peer) + function onHandshake (infoHash, peerId) { + cleanupPending() + + var torrent = self._client.get(infoHash) + if (torrent) { + peer.swarm = torrent + torrent._addIncomingPeer(peer) peer.onHandshake(infoHash, peerId) } else { - var err = new Error('Unexpected info hash ' + infoHash + ' from incoming peer ' + - peer.id + ': destroying peer') + var err = new Error( + 'Unexpected info hash ' + infoHash + ' from incoming peer ' + peer.id + ) peer.destroy(err) } - }) -} + } -TCPPool.prototype._onError = function (err) { - var self = this - self.destroy() - for (var infoHash in self.swarms) { - var swarm = self.swarms[infoHash] - self.removeSwarm(swarm) - swarm._onError(err) + function cleanupPending () { + conn.removeListener('close', cleanupPending) + wire.removeListener('handshake', onHandshake) + arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn)) } } diff --git a/lib/torrent.js b/lib/torrent.js index 7681da05..f24b52ce 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -15,6 +15,7 @@ var FSChunkStore = require('fs-chunk-store') // browser: `memory-chunk-store` var ImmediateChunkStore = require('immediate-chunk-store') var inherits = require('inherits') var MultiStream = require('multistream') +var net = require('net') // browser exclude var os = require('os') // browser exclude var parallel = require('run-parallel') var parallelLimit = require('run-parallel-limit') @@ -25,14 +26,15 @@ var Piece = require('torrent-piece') var pump = require('pump') var randomIterate = require('random-iterate') var sha1 = require('simple-sha1') +var speedometer = require('speedometer') var uniq = require('uniq') var ut_metadata = require('ut_metadata') var ut_pex = require('ut_pex') // browser exclude var File = require('./file') +var Peer = require('./peer') var RarityMap = require('./rarity-map') var Server = require('./server') // browser exclude -var Swarm = require('./swarm') var MAX_BLOCK_LENGTH = 128 * 1024 var PIECE_TIMEOUT = 30000 @@ -47,17 +49,14 @@ var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds var FILESYSTEM_CONCURRENCY = 2 +var RECONNECT_WAIT = [ 1000, 5000, 15000 ] + var TMP = typeof pathExists.sync === 'function' ? path.join(pathExists.sync('/tmp') ? '/tmp' : os.tmpDir(), 'webtorrent') : '/tmp/webtorrent' inherits(Torrent, EventEmitter) -/** - * @param {string|Buffer|Object} torrentId - * @param {WebTorrent} client - * @param {Object=} opts - */ function Torrent (torrentId, client, opts) { EventEmitter.call(this) @@ -86,20 +85,34 @@ function Torrent (torrentId, client, opts) { this.ready = false this.destroyed = false + this.paused = false + this.done = false + this.metadata = null this.store = null - this.numBlockedPeers = 0 - this.files = null - this.done = false + this.files = [] + this.pieces = [] this._amInterested = false - this.pieces = [] this._selections = [] this._critical = [] + this.wires = [] // open wires (added *after* handshake) + + this._queue = [] // queue of outgoing tcp peers to connect to + this._peers = {} // connected peers (addr/peerId -> Peer) + this._peersLength = 0 // number of elements in `this._peers` (cache, for perf) + + // stats + this.received = 0 + this.uploaded = 0 + this._downloadSpeed = speedometer() + this._uploadSpeed = speedometer() + // for cleanup this._servers = [] + // TODO: remove this and expose a hook instead // optimization: don't recheck every file if it hasn't changed this._fileModtimes = opts.fileModtimes @@ -130,15 +143,7 @@ Object.defineProperty(Torrent.prototype, 'downloaded', { } }) -Object.defineProperty(Torrent.prototype, 'received', { - get: function () { return this.swarm ? this.swarm.downloaded : 0 } -}) - -Object.defineProperty(Torrent.prototype, 'uploaded', { - get: function () { return this.swarm ? this.swarm.uploaded : 0 } -}) - -// The number of missing pieces. Used to implement 'end game' mode. +// TODO: re-enable this. The number of missing pieces. Used to implement 'end game' mode. // Object.defineProperty(Storage.prototype, 'numMissing', { // get: function () { // var self = this @@ -151,11 +156,11 @@ Object.defineProperty(Torrent.prototype, 'uploaded', { // }) Object.defineProperty(Torrent.prototype, 'downloadSpeed', { - get: function () { return this.swarm ? this.swarm.downloadSpeed() : 0 } + get: function () { return this._downloadSpeed() } }) Object.defineProperty(Torrent.prototype, 'uploadSpeed', { - get: function () { return this.swarm ? this.swarm.uploadSpeed() : 0 } + get: function () { return this._uploadSpeed() } }) Object.defineProperty(Torrent.prototype, 'progress', { @@ -163,15 +168,13 @@ Object.defineProperty(Torrent.prototype, 'progress', { }) Object.defineProperty(Torrent.prototype, 'ratio', { - get: function () { return this.uploaded / (this.downloaded || 1) } + get: function () { return this.uploaded / (this.received || 1) } }) Object.defineProperty(Torrent.prototype, 'numPeers', { - get: function () { return this.swarm ? this.swarm.numPeers : 0 } + get: function () { return this.wires.length } }) -// TODO: remove this -// Torrent file as a blob url Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', { get: function () { if (typeof window === 'undefined') throw new Error('browser-only property') @@ -182,13 +185,36 @@ Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', { } }) +Object.defineProperty(Torrent.prototype, '_numQueued', { + get: function () { + return this._queue.length + (this._peersLength - this._numConns) + } +}) + +Object.defineProperty(Torrent.prototype, '_numConns', { + get: function () { + var self = this + var numConns = 0 + for (var id in self._peers) { + if (self._peers[id].connected) numConns += 1 + } + return numConns + } +}) + +// TODO: remove in v2 +Object.defineProperty(Torrent.prototype, 'swarm', { + get: function () { + console.log('WebTorrent: `torrent.swarm` is deprecated. Use `torrent` directly instead.') + } +}) + Torrent.prototype._onTorrentId = function (torrentId) { var self = this if (self.destroyed) return var parsedTorrent try { parsedTorrent = parseTorrent(torrentId) } catch (err) {} - if (parsedTorrent) { // Attempt to set infoHash property synchronously self.infoHash = parsedTorrent.infoHash @@ -201,7 +227,7 @@ Torrent.prototype._onTorrentId = function (torrentId) { // operation, i.e. http/https link, filesystem path, or Blob. parseTorrent.remote(torrentId, function (err, parsedTorrent) { if (self.destroyed) return - if (err) return self._onError(err) + if (err) return self._destroy(err) self._onParsedTorrent(parsedTorrent) }) } @@ -214,49 +240,26 @@ Torrent.prototype._onParsedTorrent = function (parsedTorrent) { self._processParsedTorrent(parsedTorrent) if (!self.infoHash) { - return self._onError(new Error('Malformed torrent data: No info hash')) + return self._destroy(new Error('Malformed torrent data: No info hash')) } if (!self.path) self.path = path.join(TMP, self.infoHash) - // create swarm - self.swarm = new Swarm(self.infoHash, self.client.peerId, { - handshake: { - dht: self.private ? false : !!self.client.dht - }, - maxConns: self.client.maxConns - }) - self.swarm.on('error', function (err) { - self._onError(err) - }) - self.swarm.on('wire', function (wire, addr) { - self._onWire(wire, addr) - }) - - self.swarm.on('download', function (downloaded) { - self.client._downloadSpeed(downloaded) // update overall client stats - self.client.emit('download', downloaded) - self.emit('download', downloaded) - }) - - self.swarm.on('upload', function (uploaded) { - self.client._uploadSpeed(uploaded) // update overall client stats - self.client.emit('upload', uploaded) - self.emit('upload', uploaded) - }) - - // listen for peers (note: in the browser, this is a no-op and callback is called on - // next tick) - self.swarm.listen(self.client.torrentPort, function () { - self._onSwarmListening() - }) - self._rechokeIntervalId = setInterval(function () { self._rechoke() }, RECHOKE_INTERVAL) if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref() self.emit('infoHash', self.infoHash) + if (self.destroyed) return // user might destroy torrent in `infoHash` event handler + + if (self.client.listening) { + self._onListening() + } else { + self.client.once('listening', function () { + self._onListening() + }) + } } Torrent.prototype._processParsedTorrent = function (parsedTorrent) { @@ -284,12 +287,9 @@ Torrent.prototype._processParsedTorrent = function (parsedTorrent) { this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent) } -Torrent.prototype._onSwarmListening = function () { +Torrent.prototype._onListening = function () { var self = this - if (self.destroyed) return - - if (self.swarm.server) self.client.torrentPort = self.swarm.address().port - + if (self.discovery || self.destroyed) return var trackerOpts = { rtcConfig: self.client._rtcConfig, wrtc: self.client._wrtc, @@ -313,31 +313,38 @@ Torrent.prototype._onSwarmListening = function () { tracker: self.client.tracker && trackerOpts, port: self.client.torrentPort }) - self.discovery.on('error', function (err) { - self._onError(err) - }) - self.discovery.on('peer', function (peer) { + + self.discovery.on('error', onError) + self.discovery.on('peer', onPeer) + self.discovery.on('trackerAnnounce', onTrackerAnnounce) + self.discovery.on('dhtAnnounce', onDHTAnnounce) + self.discovery.on('warning', onWarning) + + function onError (err) { + self._destroy(err) + } + + function onPeer (peer) { // Don't create new outgoing TCP connections when torrent is done if (typeof peer === 'string' && self.done) return self.addPeer(peer) - }) + } - // expose discovery events - self.discovery.on('trackerAnnounce', function () { + function onTrackerAnnounce () { self.emit('trackerAnnounce') - }) - self.discovery.on('dhtAnnounce', function () { + } + + function onDHTAnnounce () { self.emit('dhtAnnounce') - }) - self.discovery.on('warning', function (err) { + } + + function onWarning (err) { self.emit('warning', err) - }) + } // if full metadata was included in initial torrent id, use it immediately. Otherwise, // wait for torrent-discovery to find peers and ut_metadata to get the metadata. if (self.info) self._onMetadata(self) - - self.emit('listening', self.client.torrentPort) } /** @@ -356,7 +363,7 @@ Torrent.prototype._onMetadata = function (metadata) { try { parsedTorrent = parseTorrent(metadata) } catch (err) { - return self._onError(err) + return self._destroy(err) } } @@ -368,11 +375,13 @@ Torrent.prototype._onMetadata = function (metadata) { self.addWebSeed(url) }) - self.rarityMap = new RarityMap(self.swarm, self.pieces.length) + self._rarityMap = new RarityMap(self) self.store = new ImmediateChunkStore( new self._store(self.pieceLength, { - torrent: self, + torrent: { + infoHash: self.infoHash + }, files: self.files.map(function (file) { return { path: path.join(self.path, file.path), @@ -403,7 +412,7 @@ Torrent.prototype._onMetadata = function (metadata) { self.bitfield = new BitField(self.pieces.length) - self.swarm.wires.forEach(function (wire) { + self.wires.forEach(function (wire) { // If we didn't have the metadata at the time ut_metadata was initialized for this // wire, we still want to make it available to the peer in case they request it. if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata) @@ -415,7 +424,7 @@ Torrent.prototype._onMetadata = function (metadata) { if (self._fileModtimes && self._store === FSChunkStore) { // don't verify if the files haven't been modified since we last checked self.getFileModtimes(function (err, fileModtimes) { - if (err) return self._onError(err) + if (err) return self._destroy(err) var unchanged = self.files.map(function (_, index) { return fileModtimes[index] === self._fileModtimes[index] @@ -440,6 +449,7 @@ Torrent.prototype._onMetadata = function (metadata) { } /* + * TODO: remove this * Gets the last modified time of every file on disk for this torrent. * Only valid in Node, not in the browser. */ @@ -464,6 +474,7 @@ Torrent.prototype._verifyPieces = function () { var self = this parallelLimit(self.pieces.map(function (_, index) { return function (cb) { + if (self.destroyed) return cb(new Error('torrent is destroyed')) self.store.get(index, function (err, buf) { if (err) return cb(null) // ignore error sha1(buf, function (hash) { @@ -479,7 +490,7 @@ Torrent.prototype._verifyPieces = function () { }) } }), FILESYSTEM_CONCURRENCY, function (err) { - if (err) return self._onError(err) + if (err) return self._destroy(err) self._debug('done verifying') self._onStore() }) @@ -492,7 +503,7 @@ Torrent.prototype._markVerified = function (index) { } /** - * Called when the metadata, swarm, and underlying chunk store is initialized. + * Called when the metadata, listening server, and underlying chunk store is initialized. */ Torrent.prototype._onStore = function () { var self = this @@ -512,49 +523,82 @@ Torrent.prototype._onStore = function () { self._updateSelections() } -/** - * Destroy and cleanup this torrent. - */ Torrent.prototype.destroy = function (cb) { + var self = this + self._destroy(null, cb) +} + +Torrent.prototype._destroy = function (err, cb) { var self = this if (self.destroyed) return self.destroyed = true self._debug('destroy') - self.client.remove(self) + self.client._remove(self) clearInterval(self._rechokeIntervalId) - var tasks = [] + if (self._rarityMap) { + self._rarityMap.destroy() + } + + for (var id in self._peers) { + self.removePeer(id) + } + + self.files.forEach(function (file) { + if (file instanceof File) file._destroy() + }) - self._servers.forEach(function (server) { - tasks.push(function (cb) { server.destroy(cb) }) + var tasks = self._servers.map(function (server) { + return function (cb) { + server.destroy(cb) + } }) - if (self.swarm) tasks.push(function (cb) { self.swarm.destroy(cb) }) - if (self.discovery) tasks.push(function (cb) { self.discovery.destroy(cb) }) - if (self.store) tasks.push(function (cb) { self.store.close(cb) }) + if (self.discovery) { + tasks.push(function (cb) { + self.discovery.destroy(cb) + }) + } + + if (self.store) { + tasks.push(function (cb) { + self.store.close(cb) + }) + } parallel(tasks, cb) + + if (err) { + // Torrent errors are emitted at `torrent.on('error')`. If there are no 'error' event + // handlers on the torrent instance, the error will be emitted at + // `client.on('error')`. This prevents crashing the user's program, but it makes it + // impossible to determine a client error versus a torrent error (where the client + // is still usable afterwards). Users are recommended for errors in both places + // to distinguish between the error types. + if (self.listenerCount('error') === 0) { + self.client.emit('error', err) + } else { + self.emit('error', err) + } + } + + self.emit('close') + + self.client = null + self.files = [] + self.discovery = null + self.store = null + self._rarityMap = null + self._peers = null + self._servers = null } -/** - * Add a peer to the swarm - * @param {string|SimplePeer} peer - * @return {boolean} true if peer was added, false if peer was blocked - */ Torrent.prototype.addPeer = function (peer) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') - - function addPeer () { - var wasAdded = self.swarm.addPeer(peer) - if (wasAdded) { - self.emit('peer', peer) - } else { - self.emit('invalidPeer', peer) - } - } + if (!self.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event') if (self.client.blocked) { var host @@ -563,6 +607,7 @@ Torrent.prototype.addPeer = function (peer) { try { parts = addrToIPPort(peer) } catch (e) { + self._debug('ignoring peer: invalid %s', peer) self.emit('invalidPeer', peer) return false } @@ -572,35 +617,127 @@ Torrent.prototype.addPeer = function (peer) { } if (host && self.client.blocked.contains(host)) { - self.numBlockedPeers += 1 // TODO: remove this. less api surface area + self._debug('ignoring peer: blocked %s', peer) + if (typeof peer !== 'string') peer.destroy() self.emit('blockedPeer', peer) return false } } - if (self.swarm) addPeer() - else self.once('listening', addPeer) - return true + var wasAdded = !!self._addPeer(peer) + if (wasAdded) { + self.emit('peer', peer) + } else { + self.emit('invalidPeer', peer) + } + return wasAdded +} + +Torrent.prototype._addPeer = function (peer) { + var self = this + if (self.destroyed) { + self._debug('ignoring peer: torrent is destroyed') + if (typeof peer !== 'string') peer.destroy() + return null + } + if (typeof peer === 'string' && !self._validAddr(peer)) { + self._debug('ignoring peer: invalid %s', peer) + return null + } + + var id = (peer && peer.id) || peer + if (self._peers[id]) { + self._debug('ignoring peer: duplicate (%s)', id) + if (typeof peer !== 'string') peer.destroy() + return null + } + + if (self.paused) { + self._debug('ignoring peer: torrent is paused') + if (typeof peer !== 'string') peer.destroy() + return null + } + + self._debug('add peer %s', id) + + var newPeer + if (typeof peer === 'string') { + // `peer` is an addr ("ip:port" string) + newPeer = Peer.createTCPOutgoingPeer(peer, self) + } else { + // `peer` is a WebRTC connection (simple-peer) + newPeer = Peer.createWebRTCPeer(peer, self) + } + + self._peers[newPeer.id] = newPeer + self._peersLength += 1 + + if (typeof peer === 'string') { + // `peer` is an addr ("ip:port" string) + self._queue.push(newPeer) + self._drain() + } + + return newPeer } -/** - * Add a web seed to the swarm - * @param {string} url web seed url - */ Torrent.prototype.addWebSeed = function (url) { if (this.destroyed) throw new Error('torrent is destroyed') + + if (!/^https?:\/\/.+/.test(url)) { + this._debug('ignoring invalid web seed %s', url) + this.emit('invalidPeer', url) + return + } + + if (this._peers[url]) { + this._debug('ignoring duplicate web seed %s', url) + this.emit('invalidPeer', url) + return + } + this._debug('add web seed %s', url) - this.swarm.addWebSeed(url, this) + + var newPeer = Peer.createWebSeedPeer(url, this) + this._peers[newPeer.id] = newPeer + this._peersLength += 1 + + this.emit('peer', url) } /** - * Select a range of pieces to prioritize. - * - * @param {number} start start piece index (inclusive) - * @param {number} end end piece index (inclusive) - * @param {number} priority priority associated with this selection - * @param {function} notify callback when selection is updated with new data + * Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a + * peer that has already sent a handshake. */ +Torrent.prototype._addIncomingPeer = function (peer) { + var self = this + if (self.destroyed) return peer._destroy(new Error('torrent is destroyed')) + if (self.paused) return peer._destroy(new Error('torrent is paused')) + + this._debug('add incoming peer %s', peer.id) + + self._peers[peer.id] = peer + self._peersLength += 1 +} + +Torrent.prototype.removePeer = function (peer) { + var self = this + var id = (peer && peer.id) || peer + peer = self._peers[id] + + if (!peer) return + + this._debug('removePeer %s', id) + + delete self._peers[id] + self._peersLength -= 1 + + peer.destroy() + + // If torrent swarm was at capacity before, try to open a new connection now + self._drain() +} + Torrent.prototype.select = function (start, end, priority, notify) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') @@ -627,13 +764,6 @@ Torrent.prototype.select = function (start, end, priority, notify) { self._updateSelections() } -/** - * Deprioritizes a range of previously selected pieces. - * - * @param {number} start start piece index (inclusive) - * @param {number} end end piece index (inclusive) - * @param {number} priority priority associated with the selection - */ Torrent.prototype.deselect = function (start, end, priority) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') @@ -652,12 +782,6 @@ Torrent.prototype.deselect = function (start, end, priority) { self._updateSelections() } -/** - * Marks a range of pieces as critical priority to be downloaded ASAP. - * - * @param {number} start start piece index (inclusive) - * @param {number} end end piece index (inclusive) - */ Torrent.prototype.critical = function (start, end) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') @@ -675,6 +799,26 @@ Torrent.prototype._onWire = function (wire, addr) { var self = this self._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown') + wire.on('download', function (downloaded) { + if (self.destroyed) return + self.received += downloaded + self._downloadSpeed(downloaded) + self.client._downloadSpeed(downloaded) + self.emit('download', downloaded) + self.client.emit('download', downloaded) + }) + + wire.on('upload', function (uploaded) { + if (self.destroyed) return + self.uploaded += uploaded + self._uploadSpeed(uploaded) + self.client._uploadSpeed(uploaded) + self.emit('upload', uploaded) + self.client.emit('upload', uploaded) + }) + + self.wires.push(wire) + if (addr) { // Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers var parts = addrToIPPort(addr) @@ -739,12 +883,12 @@ Torrent.prototype._onWire = function (wire, addr) { }) wire.ut_pex.on('dropped', function (peer) { - // the remote peer believes a given peer has been dropped from the swarm. - // if we're not currently connected to it, then remove it from the swarm's queue. - var peerObj = self.swarm._peers[peer] + // the remote peer believes a given peer has been dropped from the torrent swarm. + // if we're not currently connected to it, then remove it from the queue. + var peerObj = self._peers[peer] if (peerObj && !peerObj.connected) { self._debug('ut_pex: dropped peer: %s (from %s)', peer, addr) - self.swarm.removePeer(peer) + self.removePeer(peer) } }) @@ -760,8 +904,8 @@ Torrent.prototype._onWire = function (wire, addr) { if (self.metadata) { process.nextTick(function () { - // nextTick allows wire.handshake() to be called by `bittorrent-swarm` - // first, before we send any other messages on the wire + // This allows wire.handshake() to be called (by Peer.onHandshake) before any + // messages get sent on the wire self._onWireWithMetadata(wire) }) } @@ -774,7 +918,7 @@ Torrent.prototype._onWireWithMetadata = function (wire) { function onChokeTimeout () { if (self.destroyed || wire.destroyed) return - if (self.swarm.numQueued > 2 * (self.swarm.numConns - self.swarm.numPeers) && + if (self._numQueued > 2 * (self._numConns - self.numPeers) && wire.amInterested) { wire.destroy() } else { @@ -897,7 +1041,7 @@ Torrent.prototype._updateInterest = function () { var prev = self._amInterested self._amInterested = !!self._selections.length - self.swarm.wires.forEach(function (wire) { + self.wires.forEach(function (wire) { // TODO: only call wire.interested if the wire has at least one piece we need if (self._amInterested) wire.interested() else wire.uninterested() @@ -916,7 +1060,7 @@ Torrent.prototype._update = function () { if (self.destroyed) return // update wires in random order for better request distribution - var ite = randomIterate(self.swarm.wires) + var ite = randomIterate(self.wires) var wire while ((wire = ite())) { self._updateWire(wire) @@ -961,7 +1105,7 @@ Torrent.prototype._updateWire = function (wire) { var filter = genPieceFilterFunc(start, end, tried) while (tries < len) { - piece = self.rarityMap.getRarestPiece(filter) + piece = self._rarityMap.getRarestPiece(filter) if (piece < 0) break if (self._request(wire, piece, false)) return tried[piece] = true @@ -992,8 +1136,8 @@ Torrent.prototype._updateWire = function (wire) { var missing = self.pieces[index].missing - for (; ptr < self.swarm.wires.length; ptr++) { - var otherWire = self.swarm.wires[ptr] + for (; ptr < self.wires.length; ptr++) { + var otherWire = self.wires[ptr] var otherSpeed = otherWire.downloadSpeed() if (otherSpeed < SPEED_THRESHOLD) continue @@ -1036,7 +1180,7 @@ Torrent.prototype._updateWire = function (wire) { var filter = genPieceFilterFunc(start, end, tried, rank) while (tries < len) { - piece = self.rarityMap.getRarestPiece(filter) + piece = self._rarityMap.getRarestPiece(filter) if (piece < 0) break // request all non-reserved blocks in this piece @@ -1083,7 +1227,7 @@ Torrent.prototype._rechoke = function () { var peers = [] - self.swarm.wires.forEach(function (wire) { + self.wires.forEach(function (wire) { if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) { peers.push({ wire: wire, @@ -1272,7 +1416,7 @@ Torrent.prototype._request = function (wire, index, hotswap) { self.store.put(index, buf) - self.swarm.wires.forEach(function (wire) { + self.wires.forEach(function (wire) { wire.have(index) }) @@ -1363,19 +1507,15 @@ Torrent.prototype.createServer = function (opts) { Torrent.prototype.pause = function () { if (this.destroyed) return - this.swarm.pause() + this._debug('pause') + this.paused = true } Torrent.prototype.resume = function () { if (this.destroyed) return - this.swarm.resume() -} - -Torrent.prototype._onError = function (err) { - var self = this - self._debug('torrent error: %s', err.message || err) - self.destroy() - self.emit('error', err) + this._debug('resume') + this.paused = false + this._drain() } Torrent.prototype._debug = function () { @@ -1384,6 +1524,84 @@ Torrent.prototype._debug = function () { debug.apply(null, args) } +/** + * Pop a peer off the FIFO queue and connect to it. When _drain() gets called, + * the queue will usually have only one peer in it, except when there are too + * many peers (over `this.maxConns`) in which case they will just sit in the + * queue until another connection closes. + */ +Torrent.prototype._drain = function () { + var self = this + this._debug('_drain numConns %s maxConns %s', self._numConns, self.client.maxConns) + if (typeof net.connect !== 'function' || self.destroyed || self.paused || + self._numConns >= self.client.maxConns) { + return + } + this._debug('drain (%s queued, %s/%s peers)', self._numQueued, self.numPeers, self.client.maxConns) + + var peer = self._queue.shift() + if (!peer) return // queue could be empty + + this._debug('tcp connect attempt to %s', peer.addr) + + var parts = addrToIPPort(peer.addr) + var opts = { + host: parts[0], + port: parts[1] + } + + var conn = peer.conn = net.connect(opts) + + conn.once('connect', function () { peer.onConnect() }) + conn.once('error', function (err) { peer.destroy(err) }) + peer.startConnectTimeout() + + // When connection closes, attempt reconnect after timeout (with exponential backoff) + conn.on('close', function () { + if (self.destroyed) return + + // TODO: If torrent is done, do not try to reconnect after a timeout + + if (peer.retries >= RECONNECT_WAIT.length) { + self._debug( + 'conn %s closed: will not re-add (max %s attempts)', + peer.addr, RECONNECT_WAIT.length + ) + return + } + + var ms = RECONNECT_WAIT[peer.retries] + self._debug( + 'conn %s closed: will re-add to queue in %sms (attempt %s)', + peer.addr, ms, peer.retries + 1 + ) + + var reconnectTimeout = setTimeout(function reconnectTimeout () { + var newPeer = self._addPeer(peer.addr) + if (newPeer) newPeer.retries = peer.retries + 1 + }, ms) + if (reconnectTimeout.unref) reconnectTimeout.unref() + }) +} + +/** + * Returns `true` if string is valid IPv4/6 address. + * @param {string} addr + * @return {boolean} + */ +Torrent.prototype._validAddr = function (addr) { + var parts + try { + parts = addrToIPPort(addr) + } catch (e) { + return false + } + var host = parts[0] + var port = parts[1] + return port > 0 && port < 65535 && + !(host === '127.0.0.1' && port === this.client.torrentPort) +} + function getBlockPipelineLength (wire, duration) { return 2 + Math.ceil(duration * wire.downloadSpeed() / Piece.BLOCK_LENGTH) } diff --git a/lib/webconn.js b/lib/webconn.js index ca6760fa..a45ca68d 100644 --- a/lib/webconn.js +++ b/lib/webconn.js @@ -12,21 +12,26 @@ inherits(WebConn, Wire) /** * Converts requests for torrent blocks into http range requests. * @param {string} url web seed url - * @param {Object} parsedTorrent + * @param {Object} torrent */ -function WebConn (url, parsedTorrent) { - var self = this +function WebConn (url, torrent) { Wire.call(this) - self.url = url - self.webPeerId = sha1.sync(url) - self.parsedTorrent = parsedTorrent + this.url = url + this.webPeerId = sha1.sync(url) + this._torrent = torrent + + this._init() +} +WebConn.prototype._init = function () { + var self = this self.setKeepAlive(true) - self.on('handshake', function (infoHash, peerId) { + self.once('handshake', function (infoHash, peerId) { + if (self.destroyed) return self.handshake(infoHash, self.webPeerId) - var numPieces = self.parsedTorrent.pieces.length + var numPieces = self._torrent.pieces.length var bitfield = new BitField(numPieces) for (var i = 0; i <= numPieces; i++) { bitfield.set(i, true) @@ -34,15 +39,14 @@ function WebConn (url, parsedTorrent) { self.bitfield(bitfield) }) - self.on('choke', function () { debug('choke') }) - self.on('unchoke', function () { debug('unchoke') }) - self.once('interested', function () { debug('interested') self.unchoke() }) - self.on('uninterested', function () { debug('uninterested') }) + self.on('uninterested', function () { debug('uninterested') }) + self.on('choke', function () { debug('choke') }) + self.on('unchoke', function () { debug('unchoke') }) self.on('bitfield', function () { debug('bitfield') }) self.on('request', function (pieceIndex, offset, length, callback) { @@ -53,14 +57,14 @@ function WebConn (url, parsedTorrent) { WebConn.prototype.httpRequest = function (pieceIndex, offset, length, cb) { var self = this - var pieceOffset = pieceIndex * self.parsedTorrent.pieceLength + var pieceOffset = pieceIndex * self._torrent.pieceLength var rangeStart = pieceOffset + offset /* offset within whole torrent */ var rangeEnd = rangeStart + length - 1 - // Web seed URL format - // For single-file torrents, you just make HTTP range requests directly to the web seed URL - // For multi-file torrents, you have to add the torrent folder and file name to the URL - var files = self.parsedTorrent.files + // Web seed URL format: + // For single-file torrents, make HTTP range requests directly to the web seed URL + // For multi-file torrents, add the torrent folder and file name to the URL + var files = self._torrent.files var requests if (files.length <= 1) { requests = [{ @@ -72,7 +76,9 @@ WebConn.prototype.httpRequest = function (pieceIndex, offset, length, cb) { var requestedFiles = files.filter(function (file) { return file.offset <= rangeEnd && (file.offset + file.length) > rangeStart }) - if (requestedFiles.length < 1) return cb(new Error('Could not find file corresponnding to web seed range request')) + if (requestedFiles.length < 1) { + return cb(new Error('Could not find file corresponnding to web seed range request')) + } requests = requestedFiles.map(function (requestedFile) { var fileEnd = requestedFile.offset + requestedFile.length - 1 @@ -133,3 +139,8 @@ WebConn.prototype.httpRequest = function (pieceIndex, offset, length, cb) { }) }) } + +WebConn.prototype.destroy = function () { + Wire.prototype.destroy.call(this) + this._torrent = null +} diff --git a/package.json b/package.json index 699c50c6..97dc8e4a 100644 --- a/package.json +++ b/package.json @@ -53,6 +53,7 @@ "render-media": "^2.0.0", "run-parallel": "^1.0.0", "run-parallel-limit": "^1.0.2", + "simple-concat": "^1.0.0", "simple-get": "^2.0.0", "simple-peer": "^6.0.0", "simple-sha1": "^2.0.0", diff --git a/test/duplicate.js b/test/duplicate.js new file mode 100644 index 00000000..a1896605 --- /dev/null +++ b/test/duplicate.js @@ -0,0 +1,176 @@ +var fixtures = require('webtorrent-fixtures') +var test = require('tape') +var WebTorrent = require('../') + +test('client.seed followed by duplicate client.add (sync)', function (t) { + t.plan(6) + + var client = new WebTorrent({ dht: false, tracker: false }) + client.on('error', function (err) { t.fail(err) }) + client.on('warning', function (err) { t.fail(err) }) + + client.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }, function (torrent1) { + t.equal(client.torrents.length, 1) + + var torrent2 = client.add(torrent1.infoHash) + + torrent2.once('ready', function () { + t.fail('torrent ready is not called') + }) + + torrent2.once('error', function (err) { + t.ok(err, 'got expected error on duplicate add') + t.equal(client.torrents.length, 1) + t.ok(torrent2.destroyed) + client.destroy(function (err) { + t.error(err, 'destroyed client') + t.equal(client.torrents.length, 0) + }) + }) + }) +}) + +test('client.seed followed by duplicate client.add (async)', function (t) { + t.plan(6) + + var client = new WebTorrent({ dht: false, tracker: false }) + client.on('error', function (err) { t.fail(err) }) + client.on('warning', function (err) { t.fail(err) }) + + client.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }, function (torrent1) { + t.equal(client.torrents.length, 1) + + var torrent2 = client.add(fixtures.leaves.torrentPath) + + torrent2.once('ready', function () { + t.fail('torrent ready is not called') + }) + + torrent2.once('error', function (err) { + t.ok(err, 'got expected error on duplicate add') + t.equal(client.torrents.length, 1) + t.ok(torrent2.destroyed) + client.destroy(function (err) { + t.error(err, 'destroyed client') + t.equal(client.torrents.length, 0) + }) + }) + }) +}) + +test('client.seed followed by two duplicate client.add calls (sync)', function (t) { + t.plan(9) + + var client = new WebTorrent({ dht: false, tracker: false }) + client.on('error', function (err) { t.fail(err) }) + client.on('warning', function (err) { t.fail(err) }) + + client.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }, function (torrent1) { + t.equal(client.torrents.length, 1) + + var torrent2 = client.add(torrent1.infoHash) + + torrent2.once('ready', function () { + t.fail('torrent ready is not called') + }) + + torrent2.once('error', function (err) { + t.ok(err, 'got expected error on duplicate add') + t.equal(client.torrents.length, 1) + t.ok(torrent2.destroyed) + + var torrent3 = client.add(torrent1.infoHash) + + torrent3.once('ready', function () { + t.fail('torrent ready is not called') + }) + + torrent3.once('error', function (err) { + t.ok(err, 'got expected error on duplicate add') + t.equal(client.torrents.length, 1) + t.ok(torrent3.destroyed) + client.destroy(function (err) { + t.error(err, 'destroyed client') + t.equal(client.torrents.length, 0) + }) + }) + }) + }) +}) + +test('client.seed followed by two duplicate client.add calls (async)', function (t) { + t.plan(9) + + var client = new WebTorrent({ dht: false, tracker: false }) + client.on('error', function (err) { t.fail(err) }) + client.on('warning', function (err) { t.fail(err) }) + + client.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }, function (torrent1) { + t.equal(client.torrents.length, 1) + + var torrent2 = client.add(fixtures.leaves.torrentPath) + + torrent2.once('ready', function () { + t.fail('torrent ready is not called') + }) + + torrent2.once('error', function (err) { + t.ok(err, 'got expected error on duplicate add') + t.equal(client.torrents.length, 1) + t.ok(torrent2.destroyed) + + var torrent3 = client.add(fixtures.leaves.torrentPath) + + torrent3.once('ready', function () { + t.fail('torrent ready is not called') + }) + + torrent3.once('error', function (err) { + t.ok(err, 'got expected error on duplicate add') + t.equal(client.torrents.length, 1) + t.ok(torrent3.destroyed) + client.destroy(function (err) { + t.error(err, 'destroyed client') + t.equal(client.torrents.length, 0) + }) + }) + }) + }) +}) + +test('successive sync client.add, client.remove, client.add, client.remove (sync)', function (t) { + t.plan(3) + + var client = new WebTorrent({ dht: false, tracker: false }) + client.on('error', function (err) { t.fail(err) }) + client.on('warning', function (err) { t.fail(err) }) + + client.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }, function (torrent1) { + t.equal(client.torrents.length, 1) + + client.add(torrent1.infoHash) + client.remove(torrent1.infoHash) + client.add(torrent1.infoHash) + client.remove(torrent1.infoHash, function () { + client.destroy(function (err) { + t.error(err, 'destroyed client') + t.equal(client.torrents.length, 0) + }) + }) + }) +}) diff --git a/test/node/download-dht-magnet.js b/test/node/download-dht-magnet.js index 028d6a62..760bb269 100644 --- a/test/node/download-dht-magnet.js +++ b/test/node/download-dht-magnet.js @@ -7,7 +7,7 @@ var test = require('tape') var WebTorrent = require('../../') test('Download using DHT (via magnet uri)', function (t) { - t.plan(11) + t.plan(12) var dhtServer = new DHT({ bootstrap: false }) @@ -27,6 +27,10 @@ test('Download using DHT (via magnet uri)', function (t) { dht: { bootstrap: '127.0.0.1:' + dhtServer.address().port, host: networkAddress.ipv4() } }) + client1.dht.on('listening', function () { + t.equal(client1.dhtPort, client1.dht.address().port) + }) + client1.on('error', function (err) { t.fail(err) }) client1.on('warning', function (err) { t.fail(err) }) diff --git a/test/node/download-dht-torrent.js b/test/node/download-dht-torrent.js index 5f88da5b..20c7b0c4 100644 --- a/test/node/download-dht-torrent.js +++ b/test/node/download-dht-torrent.js @@ -6,7 +6,7 @@ var test = require('tape') var WebTorrent = require('../../') test('Download using DHT (via .torrent file)', function (t) { - t.plan(8) + t.plan(9) var dhtServer = new DHT({ bootstrap: false }) @@ -26,6 +26,10 @@ test('Download using DHT (via .torrent file)', function (t) { dht: { bootstrap: '127.0.0.1:' + dhtServer.address().port } }) + client1.dht.on('listening', function () { + t.equal(client1.dhtPort, client1.dht.address().port) + }) + client1.on('error', function (err) { t.fail(err) }) client1.on('warning', function (err) { t.fail(err) }) diff --git a/test/node/download-private-dht.js b/test/node/download-private-dht.js index 023f7ff2..0f1a9844 100644 --- a/test/node/download-private-dht.js +++ b/test/node/download-private-dht.js @@ -35,7 +35,7 @@ test('private torrent should not use DHT', function (t) { }) client.on('torrent', function () { - if (!torrent.discovery.dht && !torrent.swarm.handshakeOpts.dht) { + if (!torrent.discovery.dht) { t.pass('dht is disabled for this torrent') cb(null) } diff --git a/test/node/download-webseed-magnet.js b/test/node/download-webseed-magnet.js index 47a10f62..e0d39bfb 100644 --- a/test/node/download-webseed-magnet.js +++ b/test/node/download-webseed-magnet.js @@ -51,12 +51,12 @@ test('Download using webseed (via magnet uri)', function (t) { maybeDone() }) - client1.on('listening', function () { + var torrent = client1.add(fixtures.leaves.parsedTorrent) + + torrent.on('infoHash', function () { gotListening = true maybeDone() }) - - client1.add(fixtures.leaves.parsedTorrent) }, function (cb) { @@ -91,11 +91,11 @@ test('Download using webseed (via magnet uri)', function (t) { } }) - client2.on('listening', function (port, torrent) { + var torrent = client2.add(magnetURI) + + torrent.on('infoHash', function () { torrent.addPeer('127.0.0.1:' + client1.address().port) }) - - client2.add(magnetURI) } ], function (err) { t.error(err) diff --git a/test/node/duplicates.js b/test/node/duplicates.js deleted file mode 100644 index 8689b859..00000000 --- a/test/node/duplicates.js +++ /dev/null @@ -1,80 +0,0 @@ -var fixtures = require('webtorrent-fixtures') -var test = require('tape') -var WebTorrent = require('../../') - -test('client.seed followed by duplicate client.add', function (t) { - t.plan(5) - - var client = new WebTorrent({ dht: false, tracker: false }) - client.on('error', function (err) { t.fail(err) }) - client.on('warning', function (err) { t.fail(err) }) - - client.seed(fixtures.leaves.content, { - name: 'Leaves of Grass by Walt Whitman.epub' - }, function (torrent1) { - t.equal(client.torrents.length, 1) - - client.add(torrent1.infoHash, function (torrent2) { - t.equal(torrent1.infoHash, torrent2.infoHash) - t.equal(client.torrents.length, 1) - - client.destroy(function (err) { - t.error(err, 'destroyed client') - t.equal(client.torrents.length, 0) - }) - }) - }) -}) - -test('client.seed followed by two duplicate client.add calls', function (t) { - t.plan(7) - - var client = new WebTorrent({ dht: false, tracker: false }) - client.on('error', function (err) { t.fail(err) }) - client.on('warning', function (err) { t.fail(err) }) - - client.seed(fixtures.leaves.content, { - name: 'Leaves of Grass by Walt Whitman.epub' - }, function (torrent1) { - t.equal(client.torrents.length, 1) - - client.add(torrent1.infoHash, function (torrent2) { - t.equal(torrent1.infoHash, torrent2.infoHash) - t.equal(client.torrents.length, 1) - - client.add(torrent1.infoHash, function (torrent2) { - t.equal(torrent1.infoHash, torrent2.infoHash) - t.equal(client.torrents.length, 1) - - client.destroy(function (err) { - t.error(err, 'destroyed client') - t.equal(client.torrents.length, 0) - }) - }) - }) - }) -}) - -test('successive sync client.add, client.remove, client.add, client.remove', function (t) { - t.plan(3) - - var client = new WebTorrent({ dht: false, tracker: false }) - client.on('error', function (err) { t.fail(err) }) - client.on('warning', function (err) { t.fail(err) }) - - client.seed(fixtures.leaves.content, { - name: 'Leaves of Grass by Walt Whitman.epub' - }, function (torrent1) { - t.equal(client.torrents.length, 1) - - client.add(torrent1.infoHash) - client.remove(torrent1.infoHash) - client.add(torrent1.infoHash) - client.remove(torrent1.infoHash, function () { - client.destroy(function (err) { - t.error(err, 'destroyed client') - t.equal(client.torrents.length, 0) - }) - }) - }) -}) diff --git a/test/node/extensions.js b/test/node/extensions.js index 5b689505..9dccd1d3 100644 --- a/test/node/extensions.js +++ b/test/node/extensions.js @@ -49,7 +49,7 @@ test('extension support', function (t) { t.pass('client2 onWire') wire.use(Extension) }) - client2.on('listening', function () { + torrent2.on('infoHash', function () { torrent2.addPeer('127.0.0.1:' + client1.address().port) }) }) diff --git a/test/node/metadata.js b/test/node/metadata.js index 6a40f251..2b3fe567 100644 --- a/test/node/metadata.js +++ b/test/node/metadata.js @@ -26,9 +26,9 @@ test('ut_metadata transfer', function (t) { t.deepEqual(torrent1.info, fixtures.leaves.parsedTorrent.info) // client2 starts with infohash - client2.add(fixtures.leaves.parsedTorrent.infoHash) + var torrent2 = client2.add(fixtures.leaves.parsedTorrent.infoHash) - client2.on('listening', function (port, torrent2) { + torrent2.on('infoHash', function () { // manually add the peer torrent2.addPeer('127.0.0.1:' + client1.address().port) diff --git a/test/node/swarm-basic.js b/test/node/swarm-basic.js index 64fdee4a..f81c3a45 100644 --- a/test/node/swarm-basic.js +++ b/test/node/swarm-basic.js @@ -1,60 +1,60 @@ -var hat = require('hat') -var Swarm = require('../../lib/swarm') -var test = require('tape') - -var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' -var infoHash2 = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa37' -var peerId = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') -var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') - -test('two swarms listen on same port', function (t) { - t.plan(2) - - var swarm1 = new Swarm(infoHash, peerId) - swarm1.listen(0, function () { - var port = swarm1.address().port - t.ok(typeof port === 'number' && port !== 0) - - var swarm2 = new Swarm(infoHash2, peerId) - swarm2.listen(port, function () { - t.equal(swarm2.address().port, port, 'listened on requested port') - swarm1.destroy() - swarm2.destroy() - }) - }) -}) - -test('swarm join', function (t) { - t.plan(10) - - var swarm1 = new Swarm(infoHash, peerId) - swarm1.listen(0, function () { - var swarm2 = new Swarm(infoHash, peerId2) - - t.equal(swarm1.wires.length, 0) - t.equal(swarm2.wires.length, 0) - - swarm2.addPeer('127.0.0.1:' + swarm1.address().port) - - swarm1.on('wire', function (wire, addr) { - t.ok(wire, 'Peer join our swarm via listening port') - - t.equal(swarm1.wires.length, 1) - t.ok(/127\.0\.0\.1:\d{1,5}/.test(addr)) - t.equal(wire.peerId.toString('hex'), peerId2) - }) - - swarm2.on('wire', function (wire, addr) { - t.ok(wire, 'Joined swarm, got wire') - - t.equal(swarm2.wires.length, 1) - t.ok(/127\.0\.0\.1:\d{1,5}/.test(addr)) - t.equal(wire.peerId.toString('hex'), peerId) - }) - - t.on('end', function () { - swarm1.destroy() - swarm2.destroy() - }) - }) -}) +// var hat = require('hat') +// var Swarm = require('../../lib/swarm') +// var test = require('tape') + +// var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +// var infoHash2 = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa37' +// var peerId = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +// var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') + +// test('two swarms listen on same port', function (t) { +// t.plan(2) + +// var swarm1 = new Swarm(infoHash, peerId) +// swarm1.listen(0, function () { +// var port = swarm1.address().port +// t.ok(typeof port === 'number' && port !== 0) + +// var swarm2 = new Swarm(infoHash2, peerId) +// swarm2.listen(port, function () { +// t.equal(swarm2.address().port, port, 'listened on requested port') +// swarm1.destroy() +// swarm2.destroy() +// }) +// }) +// }) + +// test('swarm join', function (t) { +// t.plan(10) + +// var swarm1 = new Swarm(infoHash, peerId) +// swarm1.listen(0, function () { +// var swarm2 = new Swarm(infoHash, peerId2) + +// t.equal(swarm1.wires.length, 0) +// t.equal(swarm2.wires.length, 0) + +// swarm2.addPeer('127.0.0.1:' + swarm1.address().port) + +// swarm1.on('wire', function (wire, addr) { +// t.ok(wire, 'Peer join our swarm via listening port') + +// t.equal(swarm1.wires.length, 1) +// t.ok(/127\.0\.0\.1:\d{1,5}/.test(addr)) +// t.equal(wire.peerId.toString('hex'), peerId2) +// }) + +// swarm2.on('wire', function (wire, addr) { +// t.ok(wire, 'Joined swarm, got wire') + +// t.equal(swarm2.wires.length, 1) +// t.ok(/127\.0\.0\.1:\d{1,5}/.test(addr)) +// t.equal(wire.peerId.toString('hex'), peerId) +// }) + +// t.on('end', function () { +// swarm1.destroy() +// swarm2.destroy() +// }) +// }) +// }) diff --git a/test/node/swarm-reconnect.js b/test/node/swarm-reconnect.js index 4fad7d03..abf3cc2c 100644 --- a/test/node/swarm-reconnect.js +++ b/test/node/swarm-reconnect.js @@ -1,62 +1,62 @@ -var hat = require('hat') -var Swarm = require('../../lib/swarm') -var test = require('tape') - -var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' -var peerId1 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') -var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') - -test('reconnect when peer disconnects', function (t) { - t.plan(10) - - var swarm1 = new Swarm(infoHash, peerId1) - swarm1.listen(0, function () { - var swarm2 = new Swarm(infoHash, peerId2) - - var time1 = 0 - swarm1.on('wire', function (wire) { - if (time1 === 0) { - t.ok(wire, 'Peer joined via listening port') - t.equal(swarm1.wires.length, 1) - - // at some point in future, end wire - setTimeout(function () { - wire.destroy() - }, 100) - - // ...and prevent reconnect - swarm1._drain = function () {} - } else if (time1 === 1) { - t.ok(wire, 'Remote peer reconnected') - t.equal(swarm1.wires.length, 1) - } else { - throw new Error('too many wire events (1)') - } - time1 += 1 - }) - - var time2 = 0 - swarm2.on('wire', function (wire) { - if (time2 === 0) { - t.ok(wire, 'Joined swarm, got wire') - t.equal(swarm2.wires.length, 1) - - wire.on('end', function () { - t.pass('Wire ended by remote peer') - t.equal(swarm1.wires.length, 0) - }) - } else if (time2 === 1) { - t.ok(wire, 'Reconnected to remote peer') - t.equal(swarm2.wires.length, 1) - - swarm1.destroy() - swarm2.destroy() - } else { - throw new Error('too many wire events (2)') - } - time2 += 1 - }) - - swarm2.addPeer('127.0.0.1:' + swarm1.address().port) - }) -}) +// var hat = require('hat') +// var Swarm = require('../../lib/swarm') +// var test = require('tape') + +// var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +// var peerId1 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +// var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') + +// test('reconnect when peer disconnects', function (t) { +// t.plan(10) + +// var swarm1 = new Swarm(infoHash, peerId1) +// swarm1.listen(0, function () { +// var swarm2 = new Swarm(infoHash, peerId2) + +// var time1 = 0 +// swarm1.on('wire', function (wire) { +// if (time1 === 0) { +// t.ok(wire, 'Peer joined via listening port') +// t.equal(swarm1.wires.length, 1) + +// // at some point in future, end wire +// setTimeout(function () { +// wire.destroy() +// }, 100) + +// // ...and prevent reconnect +// swarm1._drain = function () {} +// } else if (time1 === 1) { +// t.ok(wire, 'Remote peer reconnected') +// t.equal(swarm1.wires.length, 1) +// } else { +// throw new Error('too many wire events (1)') +// } +// time1 += 1 +// }) + +// var time2 = 0 +// swarm2.on('wire', function (wire) { +// if (time2 === 0) { +// t.ok(wire, 'Joined swarm, got wire') +// t.equal(swarm2.wires.length, 1) + +// wire.on('end', function () { +// t.pass('Wire ended by remote peer') +// t.equal(swarm1.wires.length, 0) +// }) +// } else if (time2 === 1) { +// t.ok(wire, 'Reconnected to remote peer') +// t.equal(swarm2.wires.length, 1) + +// swarm1.destroy() +// swarm2.destroy() +// } else { +// throw new Error('too many wire events (2)') +// } +// time2 += 1 +// }) + +// swarm2.addPeer('127.0.0.1:' + swarm1.address().port) +// }) +// }) diff --git a/test/node/swarm-timeout.js b/test/node/swarm-timeout.js index 43414655..50c98226 100644 --- a/test/node/swarm-timeout.js +++ b/test/node/swarm-timeout.js @@ -1,50 +1,50 @@ -var hat = require('hat') -var Swarm = require('../../lib/swarm') -var test = require('tape') - -var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' -var peerId1 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') -var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') - -test('timeout if no handshake in 25 seconds', function (t) { - t.plan(4) - - var swarm1 = new Swarm(infoHash, peerId1) - - var _addIncomingPeer = swarm1._addIncomingPeer - swarm1._addIncomingPeer = function (peer) { - // Nuke the handshake function on swarm1's peer to test swarm2's - // handshake timeout code - peer.wire.handshake = function () {} - _addIncomingPeer.call(swarm1, peer) - } - - swarm1.listen(0, function () { - var swarm2 = new Swarm(infoHash, peerId2) - - var numWires = 0 - swarm1.on('wire', function (wire) { - numWires += 1 - if (numWires === 1) { - t.ok(wire, 'Got wire via listening port') - t.equal(swarm1.wires.length, 1) - - // swarm2 should never get a wire since swarm1 refuses to send it a - // handshake - t.equal(swarm2.wires.length, 0) - } else if (numWires === 2) { - t.pass('swarm2 reconnected after timeout') - swarm1.destroy() - swarm2.destroy() - } else { - t.fail('got wire after destroy') - } - }) - - swarm2.on('wire', function (wire) { - t.fail('Should not get a wire because peer did not handshake') - }) - - swarm2.addPeer('127.0.0.1:' + swarm1.address().port) - }) -}) +// var hat = require('hat') +// var Swarm = require('../../lib/swarm') +// var test = require('tape') + +// var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +// var peerId1 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +// var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') + +// test('timeout if no handshake in 25 seconds', function (t) { +// t.plan(4) + +// var swarm1 = new Swarm(infoHash, peerId1) + +// var _addIncomingPeer = swarm1._addIncomingPeer +// swarm1._addIncomingPeer = function (peer) { +// // Nuke the handshake function on swarm1's peer to test swarm2's +// // handshake timeout code +// peer.wire.handshake = function () {} +// _addIncomingPeer.call(swarm1, peer) +// } + +// swarm1.listen(0, function () { +// var swarm2 = new Swarm(infoHash, peerId2) + +// var numWires = 0 +// swarm1.on('wire', function (wire) { +// numWires += 1 +// if (numWires === 1) { +// t.ok(wire, 'Got wire via listening port') +// t.equal(swarm1.wires.length, 1) + +// // swarm2 should never get a wire since swarm1 refuses to send it a +// // handshake +// t.equal(swarm2.wires.length, 0) +// } else if (numWires === 2) { +// t.pass('swarm2 reconnected after timeout') +// swarm1.destroy() +// swarm2.destroy() +// } else { +// t.fail('got wire after destroy') +// } +// }) + +// swarm2.on('wire', function (wire) { +// t.fail('Should not get a wire because peer did not handshake') +// }) + +// swarm2.addPeer('127.0.0.1:' + swarm1.address().port) +// }) +// }) diff --git a/test/rarity-map.js b/test/rarity-map.js index c516464c..f7da99bf 100644 --- a/test/rarity-map.js +++ b/test/rarity-map.js @@ -1,113 +1,128 @@ -var BitField = require('bitfield') -var EventEmitter = require('events').EventEmitter +var extend = require('xtend') +var fixtures = require('webtorrent-fixtures') var hat = require('hat') -var RarityMap = require('../lib/rarity-map') -var Swarm = require('../lib/swarm') var test = require('tape') - -var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' -var peerId1 = new Buffer('-WW0001-' + hat(48)) +var Torrent = require('../lib/torrent') +var Wire = require('bittorrent-protocol') test('Rarity map usage', function (t) { t.plan(16) - var swarm = new Swarm(infoHash, peerId1) var numPieces = 4 - swarm.wires = [ new EventEmitter(), new EventEmitter() ] - swarm.wires.forEach(function (wire) { - wire.peerPieces = new BitField(numPieces) + var torrentId = extend(fixtures.numbers.parsedTorrent, { + pieces: Array(numPieces) }) - var rarityMap = new RarityMap(swarm, numPieces) - - function validateInitial () { - // note that getRarestPiece will return a random piece since they're all equal - // so repeat the test several times to reasonably ensure its correctness. - var piece = rarityMap.getRarestPiece() - t.ok(piece >= 0 && piece < numPieces) - - piece = rarityMap.getRarestPiece() - t.ok(piece >= 0 && piece < numPieces) - - piece = rarityMap.getRarestPiece() - t.ok(piece >= 0 && piece < numPieces) - - piece = rarityMap.getRarestPiece() - t.ok(piece >= 0 && piece < numPieces) + var client = { + listening: true, + peerId: hat(160), + torrentPort: 6889, + dht: false, + tracker: false, + _remove: function () {} } + var opts = {} + var torrent = new Torrent(torrentId, client, opts) + torrent.on('metadata', function () { + torrent._onWire(new Wire()) + torrent._onWire(new Wire()) - // test initial / empty case - validateInitial() + var rarityMap = torrent._rarityMap - rarityMap.recalculate() - - // test initial / empty case after recalc - validateInitial() - - function setPiece (wire, index) { - wire.peerPieces.set(index) - wire.emit('have', index) - } + // test initial / empty case + validateInitial() - setPiece(swarm.wires[0], 0) - setPiece(swarm.wires[1], 0) + rarityMap.recalculate() - setPiece(swarm.wires[0], 1) - setPiece(swarm.wires[1], 3) + // test initial / empty case after recalc + validateInitial() - // test rarest piece after setting pieces and handling 'have' events - var piece = rarityMap.getRarestPiece() - t.equal(piece, 2) + setPiece(torrent.wires[0], 0) + setPiece(torrent.wires[1], 0) - rarityMap.recalculate() + setPiece(torrent.wires[0], 1) + setPiece(torrent.wires[1], 3) - // test rarest piece after recalc to ensure its the same - piece = rarityMap.getRarestPiece() - t.equal(piece, 2) - - function addWire () { - var wire = new EventEmitter() - wire.peerPieces = new BitField(numPieces) - wire.peerPieces.set(1) - wire.peerPieces.set(2) - swarm.wires.push(wire) - swarm.emit('wire', wire) - } + // test rarest piece after setting pieces and handling 'have' events + var piece = rarityMap.getRarestPiece() + t.equal(piece, 2) - addWire() - addWire() + rarityMap.recalculate() - // test rarest piece after adding wires - piece = rarityMap.getRarestPiece() - t.equal(piece, 3) + // test rarest piece after recalc to ensure its the same + piece = rarityMap.getRarestPiece() + t.equal(piece, 2) - rarityMap.recalculate() + addWire() + addWire() - // test rarest piece after adding wires and recalc - piece = rarityMap.getRarestPiece() - t.equal(piece, 3) + // test rarest piece after adding wires + piece = rarityMap.getRarestPiece() + t.equal(piece, 3) - function removeWire (index) { - var wire = swarm.wires.splice(index, 1)[0] - wire.emit('close') - } + rarityMap.recalculate() - removeWire(3) - removeWire(1) + // test rarest piece after adding wires and recalc + piece = rarityMap.getRarestPiece() + t.equal(piece, 3) - // test rarest piece after removing wires - piece = rarityMap.getRarestPiece() - t.equal(piece, 3) + removeWire(3) + removeWire(1) - rarityMap.recalculate() + // test rarest piece after removing wires + piece = rarityMap.getRarestPiece() + t.equal(piece, 3) - // test rarest piece after removing wires and recalc - piece = rarityMap.getRarestPiece() - t.equal(piece, 3) + rarityMap.recalculate() - // test piece filter func - piece = rarityMap.getRarestPiece(function (i) { return i <= 1 }) - t.equal(piece, 0) + // test rarest piece after removing wires and recalc + piece = rarityMap.getRarestPiece() + t.equal(piece, 3) + + // test piece filter func + piece = rarityMap.getRarestPiece(function (i) { return i <= 1 }) + t.equal(piece, 0) + + piece = rarityMap.getRarestPiece(function (i) { return i === 1 || i === 2 }) + t.equal(piece, 2) + + function validateInitial () { + // note that getRarestPiece will return a random piece since they're all equal + // so repeat the test several times to reasonably ensure its correctness. + var piece = rarityMap.getRarestPiece() + t.ok(piece >= 0 && piece < numPieces) + + piece = rarityMap.getRarestPiece() + t.ok(piece >= 0 && piece < numPieces) + + piece = rarityMap.getRarestPiece() + t.ok(piece >= 0 && piece < numPieces) + + piece = rarityMap.getRarestPiece() + t.ok(piece >= 0 && piece < numPieces) + } + + function setPiece (wire, index) { + wire.peerPieces.set(index) + wire.emit('have', index) + } + + function addWire () { + var wire = new Wire() + wire.peerPieces.set(1) + wire.peerPieces.set(2) + torrent._onWire(wire) + } + + function removeWire (index) { + var wire = torrent.wires.splice(index, 1)[0] + wire.destroy() + } + }) - piece = rarityMap.getRarestPiece(function (i) { return i === 1 || i === 2 }) - t.equal(piece, 2) + t.on('end', function () { + torrent.wires.forEach(function (wire) { + wire.destroy() + }) + torrent.destroy() + }) }) diff --git a/test/swarm.js b/test/swarm.js index 9662d4d3..023455b3 100644 --- a/test/swarm.js +++ b/test/swarm.js @@ -1,82 +1,82 @@ -var hat = require('hat') -var Swarm = require('../lib/swarm') -var test = require('tape') +// var hat = require('hat') +// var Swarm = require('../lib/swarm') +// var test = require('tape') -var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' -var infoHash2 = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa37' -var peerId = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') -var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +// var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +// var infoHash2 = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa37' +// var peerId = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +// var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') -test('create swarm, check invariants', function (t) { - var swarm = new Swarm(infoHash, peerId) +// test('create swarm, check invariants', function (t) { +// var swarm = new Swarm(infoHash, peerId) - t.equal(swarm.infoHash.toString('hex'), infoHash) - t.equal(swarm.peerId.toString('hex'), peerId) - t.equal(swarm.downloaded, 0) - t.equal(swarm.uploaded, 0) - t.ok(Array.isArray(swarm.wires)) - t.equal(swarm.wires.length, 0) - t.end() -}) +// t.equal(swarm.infoHash.toString('hex'), infoHash) +// t.equal(swarm.peerId.toString('hex'), peerId) +// t.equal(swarm.downloaded, 0) +// t.equal(swarm.uploaded, 0) +// t.ok(Array.isArray(swarm.wires)) +// t.equal(swarm.wires.length, 0) +// t.end() +// }) -test('swarm listen(0) selects free port', function (t) { - t.plan(2) +// test('swarm listen(0) selects free port', function (t) { +// t.plan(2) - var swarm = new Swarm(infoHash, peerId) - swarm.listen(0) - swarm.on('listening', function () { - var port = swarm.address().port - t.equal(typeof port, 'number', 'port is a number') - if (process.browser) { - t.equal(port, 0, 'port number is 0') - } else { - t.ok(port > 0 && port < 65535, 'valid port number') - } - swarm.destroy() - }) -}) +// var swarm = new Swarm(infoHash, peerId) +// swarm.listen(0) +// swarm.on('listening', function () { +// var port = swarm.address().port +// t.equal(typeof port, 'number', 'port is a number') +// if (process.browser) { +// t.equal(port, 0, 'port number is 0') +// } else { +// t.ok(port > 0 && port < 65535, 'valid port number') +// } +// swarm.destroy() +// }) +// }) -test('two swarms listen on same port (implicit)', function (t) { - t.plan(5) +// test('two swarms listen on same port (implicit)', function (t) { +// t.plan(5) - // When no port is specified and listen is called twice, they should get assigned the same port. +// // When no port is specified and listen is called twice, they should get assigned the same port. - var swarm1 = new Swarm(infoHash, peerId) - var swarm2 = new Swarm(infoHash2, peerId2) +// var swarm1 = new Swarm(infoHash, peerId) +// var swarm2 = new Swarm(infoHash2, peerId2) - var swarm1Port - var swarm2Port +// var swarm1Port +// var swarm2Port - function maybeDone () { - if (swarm1.listening && swarm2.listening) { - t.equal(swarm1Port, swarm2Port, 'swarms were given same port') +// function maybeDone () { +// if (swarm1.listening && swarm2.listening) { +// t.equal(swarm1Port, swarm2Port, 'swarms were given same port') - t.equal(typeof swarm1Port, 'number', 'port is a number') - if (process.browser) { - t.equal(swarm1Port, 0, 'port number is 0') - } else { - t.ok(swarm1Port > 0 && swarm1Port < 65535, 'valid port number') - } +// t.equal(typeof swarm1Port, 'number', 'port is a number') +// if (process.browser) { +// t.equal(swarm1Port, 0, 'port number is 0') +// } else { +// t.ok(swarm1Port > 0 && swarm1Port < 65535, 'valid port number') +// } - t.equal(typeof swarm2Port, 'number', 'port is a number') - if (process.browser) { - t.equal(swarm2Port, 0, 'port number is 0') - } else { - t.ok(swarm2Port > 0 && swarm2Port < 65535, 'valid port number') - } +// t.equal(typeof swarm2Port, 'number', 'port is a number') +// if (process.browser) { +// t.equal(swarm2Port, 0, 'port number is 0') +// } else { +// t.ok(swarm2Port > 0 && swarm2Port < 65535, 'valid port number') +// } - swarm1.destroy() - swarm2.destroy() - } - } +// swarm1.destroy() +// swarm2.destroy() +// } +// } - swarm1.listen(0, function () { - swarm1Port = swarm1.address().port - maybeDone() - }) +// swarm1.listen(0, function () { +// swarm1Port = swarm1.address().port +// maybeDone() +// }) - swarm2.listen(0, function (port2) { - swarm2Port = swarm2.address().port - maybeDone() - }) -}) +// swarm2.listen(0, function (port2) { +// swarm2Port = swarm2.address().port +// maybeDone() +// }) +// })