diff --git a/lib/torrent.js b/lib/torrent.js index c0e8aafc..bfc0a1be 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -100,6 +100,8 @@ function Torrent (torrentId, client, opts) { this._amInterested = false this._selections = [] this._critical = [] + this._verified = [] + this._verifying = [] this.wires = [] // open wires (added *after* handshake) @@ -532,6 +534,8 @@ Torrent.prototype._onMetadata = function (metadata) { self._onWireWithMetadata(wire) }) + self.emit('metadata') + self._debug('verifying existing torrent data') if (self._fileModtimes && self._store === FSChunkStore) { // don't verify if the files haven't been modified since we last checked @@ -546,9 +550,11 @@ Torrent.prototype._onMetadata = function (metadata) { if (unchanged) { for (var index = 0; index < self.pieces.length; index++) { - self._markVerified(index) + self._markInStore(index) } - self._onStore() + self._checkDone() + + self.updateSelections() } else { self._verifyPieces() } @@ -557,7 +563,8 @@ Torrent.prototype._onMetadata = function (metadata) { self._verifyPieces() } - self.emit('metadata') + self.ready = true + self.emit('ready') } /* @@ -586,55 +593,67 @@ Torrent.prototype._verifyPieces = function () { var self = this parallelLimit(self.pieces.map(function (_, index) { return function (cb) { - if (self.destroyed) return cb(new Error('torrent is destroyed')) - - self.store.get(index, function (err, buf) { - if (self.destroyed) return cb(new Error('torrent is destroyed')) - - if (err) return process.nextTick(cb, null) // ignore error - sha1(buf, function (hash) { - if (self.destroyed) return cb(new Error('torrent is destroyed')) - - if (hash === self._hashes[index]) { - if (!self.pieces[index]) return - self._debug('piece verified %s', index) - self._markVerified(index) - } else { - self._debug('piece invalid %s', index) - } - cb(null) - }) - }) + self._verify(index, cb) } }), FILESYSTEM_CONCURRENCY, function (err) { if (err) return self._destroy(err) self._debug('done verifying') - self._onStore() }) } -Torrent.prototype._markVerified = function (index) { - this.pieces[index] = null - this._reservations[index] = null - this.bitfield.set(index, true) -} - -/** - * Called when the metadata, listening server, and underlying chunk store is initialized. - */ -Torrent.prototype._onStore = function () { +Torrent.prototype._verify = function (index, cb) { var self = this - if (self.destroyed) return - self._debug('on store') - self.ready = true - self.emit('ready') + if (self.destroyed) return cb(new Error('torrent is destroyed')) + if (self._verifying[index] || self._verified[index]) { + // call cb() asynchronously so that parallelLimit() won't block + return process.nextTick(cb, null) + } - // Files may start out done if the file was already in the store - self._checkDone() + self._verifying[index] = true - // In case any selections were made before torrent was ready - self._updateSelections() + self.store.get(index, function (err, buf) { + if (self.destroyed) return cb(new Error('torrent is destroyed')) + + if (err) { + // interpret any error as not having the chunk + self._verified[index] = true + self._update() + return cb(null) + } + + sha1(buf, function (hash) { + if (self.destroyed) return cb(new Error('torrent is destroyed')) + + if (hash === self._hashes[index]) { + self._debug('piece from store verified %s', index) + self._markInStore(index) + + self.wires.forEach(function (wire) { + wire.have(index) + }) + + // We also check `self.destroyed` since `torrent.destroy()` could have been + // called in the `torrent.on('done')` handler, triggered by `_checkDone()`. + if (self._checkDone() && !self.destroyed) self.discovery.complete() + + self._update() + + cb(null) + } else { + self._debug('piece from store invalid %s', index) + self._verified[index] = true + self._update() + cb(null) + } + }) + }) +} + +Torrent.prototype._markInStore = function (index) { + this.pieces[index] = null + this._reservations[index] = null + this.bitfield.set(index, true) } Torrent.prototype.destroy = function (cb) { @@ -1470,6 +1489,13 @@ Torrent.prototype._request = function (wire, index, hotswap) { if (self.bitfield.get(index)) return false + if (!self._verified[index]) { + // verify critical pieces as soon as possible + if (self._critical[index]) self._verify(index, noop) + + return false + } + var maxOutstandingRequests = isWebSeed ? Math.min( getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength), @@ -1500,9 +1526,6 @@ Torrent.prototype._request = function (wire, index, hotswap) { wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) { if (self.destroyed) return - // TODO: what is this for? - if (!self.ready) return self.once('ready', function () { onChunk(err, chunk) }) - if (r[i] === wire) r[i] = null if (piece !== self.pieces[index]) return onUpdateTick() @@ -1534,7 +1557,7 @@ Torrent.prototype._request = function (wire, index, hotswap) { if (hash === self._hashes[index]) { if (!self.pieces[index]) return - self._debug('piece verified %s', index) + self._debug('piece from wire verified %s', index) self.pieces[index] = null self._reservations[index] = null @@ -1551,7 +1574,7 @@ Torrent.prototype._request = function (wire, index, hotswap) { if (self._checkDone() && !self.destroyed) self.discovery.complete() } else { self.pieces[index] = new Piece(piece.length) - self.emit('warning', new Error('Piece ' + index + ' failed verification')) + self.emit('warning', new Error('Piece from wire ' + index + ' failed verification')) } onUpdateTick() })