diff --git a/bin/cmd.js b/bin/cmd.js index 05c0d896..1e111452 100755 --- a/bin/cmd.js +++ b/bin/cmd.js @@ -438,21 +438,43 @@ function drawTorrent (torrent) { '{green:blocked:} {bold:' + torrent.numBlockedPeers + '}' ) clivas.line('{80:}') - linesremaining -= 8 + linesremaining -= 9 var pieces = torrent.storage.pieces + var storageMem = 0 for (var i = 0; i < pieces.length; i++) { var piece = pieces[i] + if (piece.buffer) + storageMem += piece.buffer.length if (piece.verified || piece.blocksWritten === 0) { continue; } var bar = '' for (var j = 0; j < piece.blocks.length; j++) { - bar += piece.blocks[j] ? '{green:█}' : '{red:█}'; + if (j < piece.blocksHashed) { + bar += '{green:█}'; + } else { + switch(piece.blocks[j]) { + case 0: + bar += '{red:█}'; + break; + case 1: + bar += '{yellow:█}'; + break; + case 2: + bar += '{blue:█}'; + break; + default: + throw 'Invalid block state: ' + piece.blocks[j] + } + } } clivas.line('{4+cyan:' + i + '} ' + bar); linesremaining -= 1 } + clivas.line( + '{red:storage mem:} {bold:' + Math.ceil(storageMem / 1024) + ' KB} ' + ) clivas.line('{80:}') linesremaining -= 1 diff --git a/lib/fs-storage.js b/lib/fs-storage.js index cd32d4e5..c3d5569c 100644 --- a/lib/fs-storage.js +++ b/lib/fs-storage.js @@ -9,7 +9,6 @@ var path = require('path') var raf = require('random-access-file') var rimraf = require('rimraf') var Storage = require('./storage') -var thunky = require('thunky') var TMP = fs.existsSync('/tmp') ? '/tmp' : os.tmpDir() function noop () {} @@ -47,7 +46,7 @@ function FSStorage (parsedTorrent, opts) { var pieceLength = file.pieceLength var filePath = path.join(self.path, file.path) - var openWrite = thunky(function (cb) { + var openWrite = function (cb) { var fileDir = path.dirname(filePath) mkdirp(fileDir, function (err) { @@ -58,16 +57,16 @@ function FSStorage (parsedTorrent, opts) { file.fd = fd cb(null, fd) }) - }) + } - var openRead = thunky(function (cb) { + var openRead = function (cb) { // TODO: no need for fs.exists call, just try opening and handle error. // fs.exists then open creates opportunity for race condition. fs.exists(filePath, function (exists) { if (exists) return openWrite(cb) cb(self.nonExistentError) }) - }) + } file.pieces.forEach(function (piece) { var index = piece.index @@ -160,6 +159,7 @@ FSStorage.prototype._onPieceDone = function (piece) { var writeToNextFile = function (err) { if (err) return self.emit('error', err) if (i >= end) { + delete piece.buffer return cb() } diff --git a/lib/sha1/index.js b/lib/sha1/index.js new file mode 100644 index 00000000..ff845f32 --- /dev/null +++ b/lib/sha1/index.js @@ -0,0 +1,28 @@ +var through2 = require('through2') +var crypto = require('crypto') + + +// encapsulated a crypto stream in order to: +// * lazily instantiate the underlying implementation +// * move to webworkers later on +module.exports = function SHA1 () { + var hash + function spawnOnDemand () { + if (!hash) + hash = crypto.createHash('sha1') + } + + var self = through2(function (buffer, enc, callback) { + spawnOnDemand() + hash.update(buffer) + callback() + }, function (callback) { + spawnOnDemand() + var digest = hash.digest('hex') + self.hexDigest = digest + this.push(digest) + this.push(null) + callback() + }) + return self +} diff --git a/lib/storage.js b/lib/storage.js index 570ae086..fd5117e2 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -11,7 +11,7 @@ var FileStream = require('./file-stream') var inherits = require('inherits') var MultiStream = require('multistream') var once = require('once') -var sha1 = require('simple-sha1') +var sha1 = require('./sha1') var BLOCK_LENGTH = 16 * 1024 @@ -74,13 +74,11 @@ Piece.prototype.writeBlock = function (offset, buffer, cb) { return cb(null) } + debug('piece ' + self.index + ' writeBlock@' + offset + '+' + buffer.length) buffer.copy(self.buffer, offset) self.blocks[i] = BLOCK_WRITTEN self.blocksWritten += 1 - - if (self.blocksWritten === self.blocks.length) { - self.verify() - } + self._hashNext() cb(null) } @@ -123,38 +121,57 @@ Piece.prototype._reset = function () { self.blocks = new Buffer(Math.ceil(self.length / BLOCK_LENGTH)) self.blocks.fill(0) self.blocksWritten = 0 + self.sha = sha1() + // pull any received blocks + self.sha.on('drain', self._hashNext.bind(self)) + // triggered by _hashNext(): + self.sha.on('end', self._onHashed.bind(self)) + self.blocksHashed = 0 + // put into flowing mode + // we don't provide a sink, just waiting for 'end' + self.sha.resume() } -Piece.prototype.verify = function (buffer) { - var self = this - buffer = buffer || self.buffer - if (self.verified || !buffer) { +// called by writeBlock() +Piece.prototype._hashNext = function () { + if (this.verified || !this.buffer) { return } - if (self.noVerify) { - self.verified = true - onResult() - return - } - - var expectedHash - sha1(buffer, function (_expectedHash) { - expectedHash = _expectedHash - self.verified = (expectedHash === self.hash) - onResult() - }) - - function onResult () { - if (self.verified) { - self.emit('done') + debug('piece ' + this.index + ' _hashNext: blocksHashed=' + this.blocksHashed + '/' + this.blocks.length) + if (this.blocks[this.blocksHashed] === BLOCK_WRITTEN) { + debug('piece ' + this.index + ' _hashNext: write') + var block = this.buffer.slice(this.blocksHashed * BLOCK_LENGTH, (this.blocksHashed + 1) * BLOCK_LENGTH) + this.blocksHashed += 1 + if (this.sha.write(block)) { + debug('piece ' + this.index + ' _hashNext: more') + // recurse + process.nextTick(this._hashNext.bind(this)) } else { - self.emit('warning', new Error('piece ' + self.index + ' failed verification; ' + expectedHash + ' expected ' + self.hash)) - self._reset() + debug('piece ' + this.index + ' _hashNext: wait for sha readable') } + } else if (this.blocksHashed >= this.blocks.length) { + // will trigger 'done' event + debug('piece ' + this.index + ' _hashNext: end') + this.sha.end() + } else { + debug('piece ' + this.index + ' _hashNext: wait more data') + } +} + +Piece.prototype._onHashed = function () { + debug('piece ' + this.index + ' verified: ' + this.sha.hexDigest + ' == ' + this.hash) + this.verified = (this.sha.hexDigest === this.hash) + + if (this.verified) { + this.emit('done') + } else { + this.emit('warning', new Error('piece ' + this.index + ' failed verification; ' + this.sha.hexDigest + ' expected ' + this.hash)) + this._reset() } } +// validates a readBlock()/writeBlock() offset Piece.prototype._verifyOffset = function (offset) { var self = this if (offset % BLOCK_LENGTH === 0) { @@ -165,6 +182,7 @@ Piece.prototype._verifyOffset = function (offset) { } } +// validates a writeBlock() buffer length Piece.prototype._verifyBlock = function (offset, buffer) { var self = this if (buffer.length === BLOCK_LENGTH) { diff --git a/package.json b/package.json index e5ff75e1..e191b08a 100644 --- a/package.json +++ b/package.json @@ -19,7 +19,8 @@ "bittorrent-swarm": "webtorrent-swarm", "load-ip-set": false, "simple-get": false, - "ut_pex": false + "ut_pex": false, + "crypto": "crypto-browserify" }, "browserify": { "transform": [ @@ -38,6 +39,7 @@ "browserify-versionify": "^1.0.2", "clivas": "^0.1.4", "create-torrent": "^3.4.0", + "crypto-browserify": "^3.9.9", "debug": "^2.1.0", "dezalgo": "^1.0.1", "end-of-stream": "^1.0.0", @@ -61,9 +63,7 @@ "rimraf": "^2.2.5", "run-parallel": "^1.0.0", "simple-get": "^1.0.0", - "simple-sha1": "^2.0.0", "speedometer": "^0.1.2", - "thunky": "^0.1.0", "torrent-discovery": "^2.0.1", "ut_metadata": "^2.1.0", "ut_pex": "^1.0.1",