diff --git a/bin/cmd.js b/bin/cmd.js index 652eb74c..4532edf3 100755 --- a/bin/cmd.js +++ b/bin/cmd.js @@ -6,14 +6,13 @@ var cp = require('child_process') var fs = require('fs') var http = require('http') var minimist = require('minimist') -var os = require('os') var path = require('path') var numeral = require('numeral') var address = require('network-address') +var moment = require('moment') +var proc = require('child_process') var WebTorrent = require('../') -var TMP = os.tmp - function usage () { var logo = fs.readFileSync(path.join(__dirname, 'ascii-logo.txt'), 'utf8') logo.split('\n').forEach(function (line) { @@ -33,8 +32,12 @@ function usage () { console.log('') console.log(' -p, --port change the http port [default: 9000]') console.log(' -l, --list list available files in torrent') + console.log(' -n, --no-quit do not quit peerflix on vlc exit') + console.log(' -r, --remove remove any downloaded files on exit') + console.log(' -b, --blocklist use the specified blocklist') console.log(' -t, --subtitles load subtitles file') console.log(' -h, --help display this help message') + console.log(' -q, --quiet silence stdout') console.log(' -v, --version print the current version') console.log('') } @@ -46,6 +49,10 @@ var torrentId = argv._[0] var port = Number(argv.port || argv.p) || 9000 var list = argv.list || argv.l var subtitles = argv.subtitles || argv.t +var quiet = argv.quiet || argv.q +var noquit = argv.n || argv['no-quit'] +var blocklist = argv.blocklist || argv.b +var removeOnExit = argv.remove || argv.r if (argv.help || argv.h) { usage() @@ -63,6 +70,7 @@ if (!torrentId) { } var VLC_ARGS = '-q --video-on-top --play-and-exit' +//var VLC_ARGS = '--video-on-top --play-and-exit --extraintf=http:logger --verbose=2 --file-logging --logfile=vlc-log.txt' var OMX_EXEC = 'omxplayer -r -o ' + (typeof argv.omx === 'string') ? argv.omx + ' ' : 'hdmi ' @@ -76,14 +84,48 @@ if (subtitles) { var client = new WebTorrent({ list: list, - quiet: true + quiet: true, + blocklist: blocklist }) +var started = Date.now() +var listening = false + client.on('error', function (err) { clivas.line('{red:error} ' + err.message) }) -client.add(torrentId, function (err, torrent) { +client.once('ready', function () { + client.server.once('error', function () { + client.server.listen(0) + }) + + client.server.listen(port) +}) + +client.server.once('listening', function () { + listening = true +}) + +function remove () { + process.removeListener('SIGINT', remove) + process.removeListener('SIGTERM', remove) + + client.destroy(function () { + process.nextTick(function () { + process.exit() + }) + }) +} + +if (removeOnExit) { + process.on('SIGINT', remove) + process.on('SIGTERM', remove) +} + +client.add(torrentId, { + remove: removeOnExit +}, function (err, torrent) { if (err) { clivas.line('{red:error} ' + err.message) process.exit(1) @@ -96,7 +138,7 @@ client.add(torrentId, function (err, torrent) { } } - if (!torrent.metadata && !argv.quiet && !list) { + if (!torrent.metadata && !quiet && !list) { updateMetadata() torrent.swarm.on('wire', updateMetadata) @@ -106,7 +148,7 @@ client.add(torrentId, function (err, torrent) { } }) -client.once('torrent', function (torrent) { +function ontorrent (torrent) { if (list) { torrent.files.forEach(function (file, i) { clivas.line('{3+bold:'+i+'} : {magenta:'+file.name+'}') @@ -115,17 +157,7 @@ client.once('torrent', function (torrent) { process.exit(0) } - var started = Date.now() - var swarm = torrent.swarm - var wires = swarm.wires - - function active (wire) { - return !wire.peerChoking - } - - var href = 'http://' + address() + ':' + swarm.port + '/' - //var filename = engine.server.index.name.split('/').pop().replace(/\{|\}/g, '') - var filename = torrent.name + var href = 'http://' + address() + ':' + client.server.address().port + '/' if (argv.vlc && process.platform === 'win32') { var registry = require('windows-no-runnable').registry @@ -147,11 +179,36 @@ client.once('torrent', function (torrent) { proc.execFile(vlcPath, VLC_ARGS) } } else { - if (argv.vlc) proc.exec('vlc '+href+' '+VLC_ARGS+' || /Applications/VLC.app/Contents/MacOS/VLC '+href+' '+VLC_ARGS) + if (argv.vlc) { + var vlc = proc.exec('vlc '+href+' '+VLC_ARGS+' || /Applications/VLC.app/Contents/MacOS/VLC '+href+' '+VLC_ARGS, function (error) { + if (error) { + process.exit(1) + } + }) + + vlc.on('exit', function () { + if (!noquit) process.exit(0) + }) + } } - if (argv.omx) proc.exec(OMX_EXEC+' '+href) - if (argv.mplayer) proc.exec(MPLAYER_EXEC+' '+href) + if (argv.omx) proc.exec(OMX_EXEC + ' ' + href) + if (argv.mplayer) proc.exec(MPLAYER_EXEC + ' ' + href) + //if (quiet) console.log('server is listening on', href) + + var filename = torrent.name + //var filename = index.name.split('/').pop().replace(/\{|\}/g, '') + var swarm = torrent.swarm + var wires = swarm.wires + var hotswaps = 0 + + torrent.on('hotswap', function () { + hotswaps++ + }) + + function active (wire) { + return !wire.peerChoking + } function bytes (num) { return numeral(num).format('0.0b') @@ -161,51 +218,68 @@ client.once('torrent', function (torrent) { return Math.floor((Date.now() - started) / 1000) } - process.stdout.write(new Buffer('G1tIG1sySg==', 'base64')) // clear for drawing - - function draw () { - var unchoked = swarm.wires.filter(active) - var runtime = getRuntime() - var linesremaining = clivas.height - var peerslisted = 0 - - clivas.clear() - clivas.line('{green:open} {bold:vlc} {green:and enter} {bold:'+href+'} {green:as the network address}') - clivas.line('') - clivas.line('{yellow:info} {green:streaming} {bold:'+filename+'} {green:-} {bold:'+bytes(swarm.downloadSpeed())+'/s} {green:from} {bold:'+unchoked.length +'/'+wires.length+'} {green:peers} ') - clivas.line('{yellow:info} {green:downloaded} {bold:'+bytes(swarm.downloaded)+'} {green:and uploaded }{bold:'+bytes(swarm.uploaded)+'} {green:in }{bold:'+runtime+'s}') - clivas.line('{yellow:info} {green:peer queue size is} {bold:'+swarm.numQueued+'} ') - clivas.line('{80:}') - linesremaining -= 8 - - wires.every(function (wire) { - var tags = [] - if (wire.peerChoking) tags.push('choked') - clivas.line('{25+magenta:'+wire.remoteAddress+'} {10:'+bytes(wire.downloaded)+'} {10+cyan:'+bytes(wire.downloadSpeed())+'/s} {15+grey:'+tags.join(', ')+'} ') - peerslisted++ - return linesremaining - peerslisted > 4 - }) - linesremaining -= peerslisted + if (!quiet) { + process.stdout.write(new Buffer('G1tIG1sySg==', 'base64')); // clear for drawing + + function draw () { + var unchoked = swarm.wires.filter(active) + var runtime = getRuntime() + var linesremaining = clivas.height + var peerslisted = 0 + var speed = swarm.downloadSpeed() + var estimatedSecondsRemaining = Math.max(0, torrent.length - swarm.downloaded) / (speed > 0 ? speed : -1) + var estimate = moment.duration(estimatedSecondsRemaining, 'seconds').humanize() + + clivas.clear() + clivas.line('{green:open} {bold:vlc} {green:and enter} {bold:'+href+'} {green:as the network address}') + clivas.line('') + clivas.line('{yellow:info} {green:streaming} {bold:'+filename+'} {green:-} {bold:'+bytes(speed)+'/s} {green:from} {bold:'+unchoked.length +'/'+wires.length+'} {green:peers} ') + clivas.line('{yellow:info} {green:downloaded} {bold:'+bytes(swarm.downloaded)+'} {green:out of} {bold:'+bytes(torrent.length)+'} {green:and uploaded }{bold:'+bytes(swarm.uploaded)+'} {green:in }{bold:'+runtime+'s} {green:with} {bold:'+hotswaps+'} {green:hotswaps} ') + clivas.line('{yellow:info} {green:estimating} {bold:'+estimate+'} {green:remaining}; {green:peer queue size is} {bold:'+swarm.numQueued+'} ') + clivas.line('{80:}') + linesremaining -= 8 + + wires.every(function (wire) { + var tags = [] + if (wire.peerChoking) tags.push('choked') + clivas.line('{25+magenta:'+wire.remoteAddress+'} {10:'+bytes(wire.downloaded)+'} {10+cyan:'+bytes(wire.downloadSpeed())+'/s} {15+grey:'+tags.join(', ')+'} ') + peerslisted++ + return linesremaining - peerslisted > 4 + }) + linesremaining -= peerslisted + + if (wires.length > peerslisted) { + clivas.line('{80:}') + clivas.line('... and '+(wires.length - peerslisted)+' more ') + } - if (wires.length > peerslisted) { clivas.line('{80:}') - clivas.line('... and '+(wires.length - peerslisted)+' more ') + clivas.flush() } - clivas.line('{80:}') - clivas.flush() + setInterval(draw, 500) + draw() } - setInterval(draw, 500) - draw() - torrent.on('done', function () { - clivas.line('torrent downloaded {green:successfully} from {bold:'+wires.length+'} {green:peers} in {bold:'+getRuntime()+'s}!') - process.exit(0) + if (!quiet) { + clivas.line('torrent downloaded {green:successfully} from {bold:'+wires.length+'} {green:peers} in {bold:'+getRuntime()+'s}!') + } + if (removeOnExit) { + remove() + } else { + process.exit(0) + } }) +} - /*client.on('ready', function() { - swarm.removeListener('wire', onmagnet) - client.server.listen(argv.port || 8888) - })*/ +client.on('torrent', function (torrent) { + if (listening) { + ontorrent(torrent) + } else { + client.on('listening', function (torrent) { + ontorrent(torrent) + }) + } }) + diff --git a/index.js b/index.js index 2d5cacab..5b96eb8c 100644 --- a/index.js +++ b/index.js @@ -3,21 +3,30 @@ module.exports = WebTorrent var Client = require('bittorrent-client') +var FSStorage = require('./lib/fs_storage') var fs = require('fs') +var url = require('url') var http = require('http') var inherits = require('inherits') +var pump = require('pump') +var mime = require('mime') +var rangeParser = require('range-parser') +var extend = require('extend.js') inherits(WebTorrent, Client) function WebTorrent (opts) { var self = this + opts = opts || {} + if (opts.blocklist) opts.blocklist = parseBlocklist(opts.blocklist) Client.call(self, opts) - if (!opts) opts = {} if (opts.list) { return } + self._startServer() + self.on('torrent', function (torrent) { self._onTorrent(torrent) }) @@ -26,20 +35,26 @@ function WebTorrent (opts) { // completed and handle it by stopping fetching additional data from the network } -WebTorrent.prototype.add = function (torrentId, cb) { +WebTorrent.prototype.add = function (torrentId, opts, cb) { var self = this - if (typeof cb !== 'function') cb = function () {} - - // TODO: support passing in an index to file to download - // self.index = opts.index - if (!self.ready) { - return self.once('ready', self.add.bind(self, torrentId, cb)) + return self.once('ready', self.add.bind(self, torrentId, opts, cb)) + } + + if (typeof opts === 'function') { + cb = opts + opts = {} } + if (typeof cb !== 'function') cb = function () {} + opts = extend({ + storage: FSStorage + }, opts) + + self.index = opts.index // Called once we have a torrentId that bittorrent-client can handle function onTorrentId (torrentId) { - Client.prototype.add.call(self, torrentId, cb) + Client.prototype.add.call(self, torrentId, opts, cb) } if (Client.toInfoHash(torrentId)) { @@ -74,14 +89,94 @@ WebTorrent.prototype._onTorrent = function (torrent) { var self = this // if no index specified, use largest file - // TODO: support torrent index selection correctly -- this doesn't work yet - /*if (typeof torrent.index !== 'number') { + if (typeof torrent.index !== 'number') { var largestFile = torrent.files.reduce(function (a, b) { return a.length > b.length ? a : b }) torrent.index = torrent.files.indexOf(largestFile) } - // TODO - torrent.files[torrent.index].select()*/ + torrent.files[torrent.index].select() + self.index = torrent.index + self.torrent = torrent } + +WebTorrent.prototype._startServer = function () { + var self = this + self.server = http.createServer() + self.server.on('request', self._onRequest.bind(self)) +} + +WebTorrent.prototype._onRequest = function (req, res) { + var self = this + + if (!self.ready) { + return self.once('ready', self._onRequest.bind(self, req, res)) + } + + var u = url.parse(req.url) + + if (u.pathname === '/favicon.ico') { + return res.end() + } + if (u.pathname === '/') { + u.pathname = '/' + self.index + } + + var i = Number(u.pathname.slice(1)) + + if (isNaN(i) || i >= self.torrent.files.length) { + res.statusCode = 404 + return res.end() + } + + var file = self.torrent.files[i] + var range = req.headers.range + + res.setHeader('Accept-Ranges', 'bytes') + res.setHeader('Content-Type', mime.lookup(file.name)) + + if (!range) { + res.statusCode = 206 + res.setHeader('Content-Length', file.length) + if (req.method === 'HEAD') { + return res.end() + } + pump(file.createReadStream(), res) + return + } + + range = rangeParser(file.length, range)[0] // don't support multi-range reqs + res.statusCode = 206 + + var rangeStr = 'bytes ' + range.start + '-' + range.end + '/' + file.length + res.setHeader('Content-Range', rangeStr) + res.setHeader('Content-Length', range.end - range.start + 1) + + if (req.method === 'HEAD') { + return res.end() + } + pump(file.createReadStream(range), res) +} + +// +// HELPER METHODS +// + +function parseBlocklist (filename) { + // TODO: support gzipped files + var blocklistData = fs.readFileSync(filename, { encoding: 'utf8' }) + var blocklist = [] + blocklistData.split('\n').forEach(function(line) { + var match = null + if ((match = /^\s*([^#].*)\s*:\s*([a-f0-9.:]+?)\s*-\s*([a-f0-9.:]+?)\s*$/.exec(line))) { + blocklist.push({ + reason: match[1], + startAddress: match[2], + endAddress: match[3] + }) + } + }) + return blocklist +} + diff --git a/lib/fs_storage.js b/lib/fs_storage.js new file mode 100644 index 00000000..f07d5098 --- /dev/null +++ b/lib/fs_storage.js @@ -0,0 +1,196 @@ +module.exports = FSStorage + +var Storage = require('bittorrent-client').Storage +var inherits = require('inherits') +var extend = require('extend.js') +var os = require('os') +var fs = require('fs') +var path = require('path') +var raf = require('random-access-file') +var mkdirp = require('mkdirp') +var rimraf = require('rimraf') +var thunky = require('thunky') + +var TMP = fs.existsSync('/tmp') ? '/tmp' : os.tmpDir() + +inherits(FSStorage, Storage) + +/** + * fs-backed Storage for a torrent download. + * + * @param {Object} parsedTorrent + * @param {Object} opts + */ +function FSStorage (parsedTorrent, opts) { + var self = this + opts = extend({ + nobuffer: true, + tmp: TMP, + name: 'webtorrent' + }, opts || {}) + Storage.call(self, parsedTorrent, opts) + + if (!opts.path) { + opts.path = path.join(opts.tmp, opts.name, parsedTorrent.infoHash) + } + + self.path = opts.path + + self.piecesMap = [] + self.files.forEach(function (file) { + var fileStart = file.offset + var fileEnd = fileStart + file.length + + var firstPiece = file.pieces[0].index + var lastPiece = file.pieces[file.pieces.length - 1].index + var pieceLength = file.pieceLength + + var open = thunky(function (cb) { + var filePath = path.join(self.path, file.path) + var fileDir = path.dirname(filePath) + + mkdirp(fileDir, function (err) { + if (err) return cb(err) + if (self.closed) return cb(new Error('Storage closed')) + + var fd = raf(filePath) + file.fd = fd + cb(null, fd) + }) + }) + + file.pieces.forEach(function (piece) { + var index = piece.index + + var pieceStart = index * pieceLength + var pieceEnd = pieceStart + piece.length + + var from = (fileStart < pieceStart) ? 0 : fileStart - pieceStart + var to = (fileEnd > pieceEnd) ? pieceLength : fileEnd - pieceStart + var offset = (fileStart > pieceStart) ? 0 : pieceStart - fileStart + + if (!self.piecesMap[index]) self.piecesMap[index] = [] + + self.piecesMap[index].push({ + from: from, + to: to, + offset: offset, + open: open + }) + }) + }) +} + +FSStorage.prototype.readBlock = function (index, offset, length, cb) { + var self = this + if (!cb) return console.error('FSStorage.readBlock requires a callback') + + var piece = self.pieces[index] + if (!piece) return cb(new Error("invalid piece index " + index)) + + if (piece.verified && piece.buffer) { + // piece is verified and cached in memory, so read directly from its buffer + // instead of reading from the filesystem. + return piece.readBlock(offset, length, cb) + } + + var rangeFrom = offset + var rangeTo = rangeFrom + length + + var targets = self.piecesMap[index].filter(function (target) { + return (target.to > rangeFrom && target.from < rangeTo) + }) + + if (!targets.length) return cb(new Error("no file matching the requested range?")) + + var buffers = [] + var end = targets.length + var i = 0 + + var readFromNextFile = function(err, buffer) { + if (err) return cb(err) + if (buffer) buffers.push(buffer) + if (i >= end) return cb(null, Buffer.concat(buffers)) + + var target = targets[i++] + + var from = target.from + var to = target.to + var offset = target.offset + + if(to > rangeTo) to = rangeTo + if(from < rangeFrom) { + offset += rangeFrom - from + from = rangeFrom + } + + target.open(function(err, file) { + if (err) return cb(err) + file.read(offset, to - from, readFromNextFile) + }) + } + + readFromNextFile() +} + +// flush pieces to file once they're done and verified +FSStorage.prototype._onPieceDone = function (piece) { + var self = this + var targets = self.piecesMap[piece.index] + var end = targets.length + var i = 0 + + var writeToNextFile = function(err) { + if (err) return self.emit('error', err) + if (i >= end) { + return Storage.prototype._onPieceDone.call(self, piece) + } + + var target = targets[i++] + target.open(function(err, file) { + if (err) return self.emit('error', err) + file.write(target.offset, piece.buffer.slice(target.from, target.to), writeToNextFile) + }) + } + + writeToNextFile() +} + +/** + * Removes and cleans up any backing store for this storage. + */ +FSStorage.prototype.remove = function (cb) { + var self = this + if (!cb) cb = noop + + self.close(function (err) { + if (err) return cb(err) + var root = self.files[0].path.split(path.sep)[0] + rimraf(path.join(self.path, root), cb) + }) +} + +/** + * Closes the backing store for this storage. + */ +FSStorage.prototype.close = function (cb) { + var self = this + if (!cb) cb = noop + if (self.closed) return cb() + + Storage.prototype.close.call(self, function (err) { + if (err) return cb(err) + + var i = 0 + function loop (err) { + if (i >= self.files.length) return cb() + if (err) return cb(err) + var next = self.files[i++] + if (!next || !next.fd) return process.nextTick(loop) + next.fd.close(loop) + } + + process.nextTick(loop) + }) +} + diff --git a/package.json b/package.json index af452d19..8d0b28d8 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,17 @@ "clivas": "^0.1.4", "concat-stream": "^1.4.4", "inherits": "^2.0.1", - "minimist": "^0.0.8" + "minimist": "^0.0.8", + "moment": "^2.6.0", + "mime": "^1.2.11", + "pump": "^0.3.2", + "range-parser": "^1.0.0", + "windows-no-runnable": "~0.0.6", + "random-access-file": "^0.3.1", + "rimraf": "^2.2.5", + "thunky": "^0.1.0", + "mkdirp": "^0.3.5", + "extend.js": "^0.0.1" }, "devDependencies": { "tape": "2.x"