diff --git a/lib/torrent.js b/lib/torrent.js index 2cc0800b..f27db017 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -29,12 +29,16 @@ var sha1 = require('simple-sha1') var uniq = require('uniq') var ut_metadata = require('ut_metadata') var ut_pex = require('ut_pex') // browser exclude +var subscribe = require('subscribe-ev') var File = require('./file') var RarityMap = require('./rarity-map') var Server = require('./server') // browser exclude var Swarm = require('./swarm') +var initSubscribe = subscribe.init +var toDestory = subscribe.toDestory + var MAX_BLOCK_LENGTH = 128 * 1024 var PIECE_TIMEOUT = 30000 var CHOKE_TIMEOUT = 5000 @@ -101,6 +105,8 @@ function Torrent (torrentId, client, opts) { this._servers = [] if (torrentId !== null) this._onTorrentId(torrentId) + + initSubscribe(this) } Object.defineProperty(Torrent.prototype, 'timeRemaining', { @@ -223,20 +229,21 @@ Torrent.prototype._onParsedTorrent = function (parsedTorrent) { }, maxConns: self.client.maxConns }) - self.swarm.on('error', function (err) { + + subscribe(self, self.swarm, 'error', function (err) { self._onError(err) }) - self.swarm.on('wire', function (wire, addr) { + subscribe(self, self.swarm, 'wire', function (wire, addr) { self._onWire(wire, addr) }) - self.swarm.on('download', function (downloaded) { + subscribe(self, self.swarm, 'download', function (downloaded) { self.client._downloadSpeed(downloaded) // update overall client stats self.client.emit('download', downloaded) self.emit('download', downloaded) }) - self.swarm.on('upload', function (uploaded) { + subscribe(self, self.swarm, 'upload', function (uploaded) { self.client._uploadSpeed(uploaded) // update overall client stats self.client.emit('upload', uploaded) self.emit('upload', uploaded) @@ -304,17 +311,18 @@ Torrent.prototype._onSwarmListening = function () { peerId: self.client.peerId, port: self.client.torrentPort }) - self.discovery.on('error', function (err) { + + subscribe(self, self.discovery, 'error', function (err) { self._onError(err) }) - self.discovery.on('peer', function (peer) { + subscribe(self, self.discovery, 'peer', function (peer) { // Don't create new outgoing TCP connections when torrent is done if (typeof peer === 'string' && self.done) return self.addPeer(peer) }) // expose discovery events - reemit(self.discovery, self, ['trackerAnnounce', 'dhtAnnounce', 'warning']) + toDestory(self, reemit(self.discovery, self, ['trackerAnnounce', 'dhtAnnounce', 'warning'])) // if full metadata was included in initial torrent id, use it if (self.info) self._onMetadata(self) @@ -622,7 +630,7 @@ Torrent.prototype._onWire = function (wire, addr) { // When peer sends PORT message, add that DHT node to routing table if (self.client.dht && self.client.dht.listening) { - wire.on('port', function (port) { + subscribe(self, wire, 'port', function (port) { if (self.destroyed || self.client.dht.destroyed) { return } @@ -638,7 +646,7 @@ Torrent.prototype._onWire = function (wire, addr) { }) } - wire.on('timeout', function () { + subscribe(self, wire, 'timeout', function () { self._debug('wire timeout (%s)', addr) // TODO: this might be destroying wires too eagerly wire.destroy() @@ -653,12 +661,12 @@ Torrent.prototype._onWire = function (wire, addr) { // use ut_metadata extension wire.use(ut_metadata(self.metadata)) - wire.ut_metadata.on('warning', function (err) { + subscribe(self, wire.ut_metadata, 'warning', function (err) { self._debug('ut_metadata warning: %s', err.message) }) if (!self.metadata) { - wire.ut_metadata.on('metadata', function (metadata) { + subscribe(self, wire.ut_metadata, 'metadata', function (metadata) { self._debug('got metadata via ut_metadata') self._onMetadata(metadata) }) @@ -669,14 +677,14 @@ Torrent.prototype._onWire = function (wire, addr) { if (typeof ut_pex === 'function' && !self.private) { wire.use(ut_pex()) - wire.ut_pex.on('peer', function (peer) { + subscribe(self, wire.ut_pex, 'peer', function (peer) { // Only add potential new peers when we're not seeding if (self.done) return self._debug('ut_pex: got peer: %s (from %s)', peer, addr) self.addPeer(peer) }) - wire.ut_pex.on('dropped', function (peer) { + subscribe(self, wire.ut_pex, 'dropped', function (peer) { // the remote peer believes a given peer has been dropped from the swarm. // if we're not currently connected to it, then remove it from the swarm's queue. var peerObj = self.swarm._peers[peer] @@ -686,10 +694,10 @@ Torrent.prototype._onWire = function (wire, addr) { } }) - wire.once('close', function () { + subscribe(self, wire, 'close', function () { // Stop sending updates to remote peer wire.ut_pex.reset() - }) + }, true) } // Hook to allow user-defined `bittorrent-protocol` extensions @@ -731,36 +739,36 @@ Torrent.prototype._onWireWithMetadata = function (wire) { wire.choke() // always choke seeders } - wire.on('bitfield', function () { + subscribe(self, wire, 'bitfield', function () { updateSeedStatus() self._update() }) - wire.on('have', function () { + subscribe(self, wire, 'have', function () { updateSeedStatus() self._update() }) - wire.once('interested', function () { + subscribe(self, wire, 'interested', function () { wire.unchoke() - }) + }, true) - wire.once('close', function () { + subscribe(self, wire, 'close', function () { clearTimeout(timeoutId) - }) + }, true) - wire.on('choke', function () { + subscribe(self, wire, 'choke', function () { clearTimeout(timeoutId) timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) if (timeoutId.unref) timeoutId.unref() }) - wire.on('unchoke', function () { + subscribe(self, wire, 'unchoke', function () { clearTimeout(timeoutId) self._update() }) - wire.on('request', function (index, offset, length, cb) { + subscribe(self, wire, 'request', function (index, offset, length, cb) { if (length > MAX_BLOCK_LENGTH) { // Per spec, disconnect from peers that request >128KB return wire.destroy() diff --git a/package.json b/package.json index 3e8e4fd7..9acb166f 100644 --- a/package.json +++ b/package.json @@ -61,6 +61,7 @@ "speedometer": "^1.0.0", "stream-to-blob-url": "^2.0.0", "stream-with-known-length-to-buffer": "^1.0.0", + "subscribe-ev": "^1.0.0", "torrent-discovery": "^7.0.0", "torrent-piece": "^1.0.0", "uniq": "^1.0.1",