diff --git a/lib/rarity-map.js b/lib/rarity-map.js index af712cc6..ea5afcea 100644 --- a/lib/rarity-map.js +++ b/lib/rarity-map.js @@ -10,21 +10,21 @@ class RarityMap { this._pieces = new Array(this._numPieces) this._onWire = wire => { - this.recalculate() + this._dirty = true this._initWire(wire) } this._onWireHave = index => { this._pieces[index] += 1 } this._onWireBitfield = () => { - this.recalculate() + this._dirty = true } this._torrent.wires.forEach(wire => { this._initWire(wire) }) this._torrent.on('wire', this._onWire) - this.recalculate() + this._dirty = true } /** @@ -35,6 +35,11 @@ class RarityMap { * @return {number} index of rarest piece, or -1 */ getRarestPiece (pieceFilterFunc) { + if (this._dirty) { + this._dirty = false + this.recalculate() + } + let candidates = [] let min = Infinity diff --git a/lib/torrent.js b/lib/torrent.js index 77cf8988..7660b75c 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -117,6 +117,7 @@ class Torrent extends EventEmitter { // TODO: remove this and expose a hook instead // optimization: don't recheck every file if it hasn't changed this._fileModtimes = opts.fileModtimes + this._verifyPiece = opts.verifyPiece if (torrentId !== null) this._onTorrentId(torrentId) @@ -566,6 +567,14 @@ class Torrent extends EventEmitter { parallelLimit(this.pieces.map((piece, index) => cb => { if (this.destroyed) return cb(new Error('torrent is destroyed')) + if (this._verifyPiece) { + if (this._verifyPiece(index, this._hashes[index], this.infoHash)) { + this._debug('piece verified %s', index) + this._markVerified(index) + } + return process.nextTick(cb, null) + } + this.store.get(index, (err, buf) => { if (this.destroyed) return cb(new Error('torrent is destroyed')) @@ -611,6 +620,7 @@ class Torrent extends EventEmitter { this.pieces[index] = null this._reservations[index] = null this.bitfield.set(index, true) + this.emit('verified', index, this._hashes[index]) } /** @@ -1161,14 +1171,23 @@ class Torrent extends EventEmitter { * Heartbeat to update all peers and their requests. */ _update () { - if (this.destroyed) return - - // update wires in random order for better request distribution - const ite = randomIterate(this.wires) - let wire - while ((wire = ite())) { - this._updateWireWrapper(wire) + if (this._updateScheduled) { + this._debug('_update already queued') + return } + + this._updateScheduled = true + setTimeout(() => { + this._updateScheduled = false + if (this.destroyed) return + + // update wires in random order for better request distribution + const ite = randomIterate(this.wires) + let wire + while ((wire = ite())) { + this._updateWireWrapper(wire) + } + }, 1000) } _updateWireWrapper (wire) { @@ -1527,6 +1546,7 @@ class Torrent extends EventEmitter { self.pieces[index] = null self._reservations[index] = null self.bitfield.set(index, true) + self.emit('verified', index, hash) self.store.put(index, buf) @@ -1545,6 +1565,11 @@ class Torrent extends EventEmitter { }) }) + // the wire may be choking or closed + if (numRequests === wire.requests.length) { + return false + } + function onUpdateTick () { process.nextTick(() => { self._update() }) }