diff --git a/index.js b/index.js index f6fcb415..04a684f3 100644 --- a/index.js +++ b/index.js @@ -18,7 +18,7 @@ var parseTorrent = require('parse-torrent') var speedometer = require('speedometer') var Storage = require('./lib/storage') var stream = require('stream') -var Torrent = require('./lib/torrent') +var Torrent = require('./lib/torrent-manager') inherits(WebTorrent, EventEmitter) diff --git a/lib/storage.js b/lib/storage.js index 5c964021..14ec01a2 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -260,6 +260,7 @@ File.prototype._checkDone = function () { }) if (self.done) { + debug('done'); process.nextTick(function () { self.emit('done') }) @@ -550,6 +551,7 @@ Storage.prototype._checkDone = function () { if (!self.done && self.files.every(function (file) { return file.done })) { self.done = true + debug('done'); self.emit('done') } } diff --git a/lib/torrent-manager.js b/lib/torrent-manager.js new file mode 100644 index 00000000..aaad26f3 --- /dev/null +++ b/lib/torrent-manager.js @@ -0,0 +1,191 @@ +module.exports = torrentManager + +var debug = require('debug')('webtorrent:torrentmanager') +var EventEmitter = require('events').EventEmitter +var inherits = require('inherits') +var parseTorrent = require('parse-torrent') +var Torrent = require('./torrent') +var Webseed = require('./webseed') +var Storage = require('./storage') +var Server = require('./server') // browser exclude +var hh = require('http-https') // browser exclude +var once = require('once') +var concat = require('concat-stream') // browser exclude +var reemit = require('re-emitter') +var parallel = require('run-parallel') +var fs = require('fs') // browser exclude + +inherits(torrentManager, EventEmitter) + + function torrentManager(torrentId, opts) { + var self = this + self._storageImpl = opts.storage || Storage + self.files = [] + var parsedTorrent = parseTorrent(torrentId) + if (parsedTorrent && parsedTorrent.infoHash) { + onTorrentId(parsedTorrent) + + } else if (typeof hh.get === 'function' && /^https?:/.test(torrentId)) { + // http or https url to torrent file + httpGet(torrentId, function (err, torrent) { + if (err) + return self.emit('error', new Error('error downloading torrent: ' + err.message)) + onTorrentId(torrent) + }) + + } else if (typeof fs.readFile === 'function') { + // assume it's a filesystem path + fs.readFile(torrentId, function (err, torrent) { + if (err) return self.emit('error', new Error('invalid torrent id')) + onTorrentId(torrent) + }) + + } else throw new Error('invalid torrent id') + + + function onTorrentId (torrentId) { + parsedTorrent = parseTorrent(torrentId) + self.infoHash = parsedTorrent.infoHash + debug(self.infoHash); + if (parsedTorrent.name) self.name = parsedTorrent.name // preliminary name + self.torrent = new Torrent(self, parsedTorrent, opts); + reemit(self.torrent, self, ['ready', 'dhtAnnounce', 'listening']); + self.torrent.on('ready', function() { self.ready = true; }); + if (parsedTorrent.info) self.onMetadata(parsedTorrent) + } +} + +torrentManager.prototype.addPeer = function(peer) { + var self = this + self.torrent.addPeer(peer); +} + +torrentManager.prototype.createServer = function (opts) { + var self = this + if (typeof Server === 'function' /* browser exclude */) { + return new Server(self, opts) + } +} + +/** + * Called when the metadata is received. + */ +torrentManager.prototype.onMetadata = function (metadata) { + var self = this + if (self.metadata || self._destroyed) return + debug('got metadata') + + if (metadata && metadata.infoHash) { + // `metadata` is a parsed torrent (from parse-torrent module) + self.metadata = parseTorrent.toBuffer(metadata) + self.parsedTorrent = metadata + } else { + self.metadata = metadata + try { + self.parsedTorrent = parseTorrent(self.metadata) + } catch (err) { + return self.emit('error', err) + } + } + + self.storage = new self._storageImpl(self.parsedTorrent, self.storageOpts) + self.storage.files.forEach(function (file) { + self.files.push(file) + }) + + self.storage.on('done', function () { + debug('torrent ' + self.infoHash + ' done') + self.emit('done') + }) + + if (self.verify) { + process.nextTick(function () { + debug('verifying existing torrent data') + var numPieces = 0 + var numVerified = 0 + + // TODO: move storage verification to storage.js? + parallel(self.storage.pieces.map(function (piece) { + return function (cb) { + self.storage.read(piece.index, function (err, buffer) { + numPieces += 1 + self.emit('verifying', { + percentDone: 100 * numPieces / self.storage.pieces.length, + percentVerified: 100 * numVerified / self.storage.pieces.length, + }) + + if (!err && buffer) { + // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer + piece.verify(buffer) + numVerified += piece.verified + debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index) + } + // continue regardless of whether piece verification failed + cb() + }, true) // forces override to allow reading from non-verified pieces + } + }), self.storageReady.bind(self)) + }) + } else { + process.nextTick(self.storageReady.bind(self)) + } + + process.nextTick(function () { + self.torrent.updateMetadata(self.parsedTorrent) + self.emit('metadata') + }) +} + +torrentManager.prototype.storageReady = function() { + var self = this + self.torrent.attachStorage(self.storage); + self.webseed = new Webseed(self, self.storage, self.parsedTorrent); +} + +/** + * Destroy and cleanup this torrent. + */ +torrentManager.prototype.destroy = function (cb) { + var self = this + debug('destroy') + self._destroyed = true + clearInterval(self._rechokeIntervalId) + + var tasks = [] + if (self.torrent) tasks.push(function (cb) { + self.torrent.destroy(cb) + }) + + parallel(tasks, cb) +} + + +/** + * Make http or https request, following redirects. + * @param {string} u + * @param {function} + * @param {number=} maxRedirects + * @return {http.ClientRequest} + */ +function httpGet (u, cb, maxRedirects) { + cb = once(cb) + if (!maxRedirects) maxRedirects = 5 + if (maxRedirects === 0) return cb(new Error('too many redirects')) + + hh.get(u, function (res) { + // Check for redirect + if (res.statusCode >= 300 && res.statusCode < 400 && 'location' in res.headers) { + var location = res.headers.location + if (!url.parse(location).host) { + // If relative redirect, prepend host of current url + var parsed = url.parse(u) + location = parsed.protocol + '//' + parsed.host + location + } + res.resume() // discard response + return httpGet(location, cb, --maxRedirects) + } + res.pipe(concat(function (data) { + cb(null, data) + })) + }).on('error', cb) +} diff --git a/lib/torrent.js b/lib/torrent.js index 28d36037..0bb69983 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -14,11 +14,11 @@ var parseTorrent = require('parse-torrent') var RarityMap = require('./rarity-map') var reemit = require('re-emitter') var Server = require('./server') // browser exclude -var Storage = require('./storage') var Swarm = require('bittorrent-swarm') // `webtorrent-swarm` in browser var url = require('url') var ut_metadata = require('ut_metadata') var ut_pex = require('ut_pex') // browser exclude +var Storage = require('./storage') var MAX_BLOCK_LENGTH = 128 * 1024 var MAX_OUTSTANDING_REQUESTS = 5 @@ -39,16 +39,15 @@ inherits(Torrent, EventEmitter) * @param {string|Buffer|Object} torrentId * @param {Object} opts */ -function Torrent (torrentId, opts) { +function Torrent (manager, parsedTorrent, opts) { var self = this EventEmitter.call(self) debug('new torrent') self.client = opts.client - + self.manager = manager; self.hotswapEnabled = ('hotswap' in opts ? opts.hotswap : true) self.verify = opts.verify - self.storageOpts = opts.storageOpts self.chokeTimeout = opts.chokeTimeout || CHOKE_TIMEOUT self.pieceTimeout = opts.pieceTimeout || PIECE_TIMEOUT @@ -60,67 +59,39 @@ function Torrent (torrentId, opts) { self._rechokeIntervalId = null self.ready = false - self.files = [] self.metadata = null - self.parsedTorrent = null + self.parsedTorrent = parsedTorrent + self.infoHash = parsedTorrent.infoHash + if (parsedTorrent.name) self.name = parsedTorrent.name // preliminary name self.storage = null self.numBlockedPeers = 0 self._amInterested = false self._destroyed = false self._selections = [] self._critical = [] - self._storageImpl = opts.storage || Storage - - var parsedTorrent = parseTorrent(torrentId) - if (parsedTorrent && parsedTorrent.infoHash) { - onTorrentId(parsedTorrent) - - } else if (typeof hh.get === 'function' && /^https?:/.test(torrentId)) { - // http or https url to torrent file - httpGet(torrentId, function (err, torrent) { - if (err) - return self.emit('error', new Error('error downloading torrent: ' + err.message)) - onTorrentId(torrent) - }) - - } else if (typeof fs.readFile === 'function') { - // assume it's a filesystem path - fs.readFile(torrentId, function (err, torrent) { - if (err) return self.emit('error', new Error('invalid torrent id')) - onTorrentId(torrent) - }) - - } else throw new Error('invalid torrent id') + - function onTorrentId (torrentId) { - parsedTorrent = parseTorrent(torrentId) - self.infoHash = parsedTorrent.infoHash - if (parsedTorrent.name) self.name = parsedTorrent.name // preliminary name - - // create swarm - self.swarm = new Swarm(self.infoHash, self.client.peerId, { - handshake: { dht: !!self.client.dht } - }) - reemit(self.swarm, self, ['warning', 'error']) - self.swarm.on('wire', self._onWire.bind(self)) + // create swarm + self.swarm = new Swarm(self.infoHash, self.client.peerId, { + handshake: { dht: !!self.client.dht } + }) + reemit(self.swarm, self, ['warning', 'error']) + self.swarm.on('wire', self._onWire.bind(self)) - // update overall client stats - self.swarm.on('download', self.client.downloadSpeed.bind(self.client)) - self.swarm.on('upload', self.client.uploadSpeed.bind(self.client)) + // update overall client stats + self.swarm.on('download', self.client.downloadSpeed.bind(self.client)) + self.swarm.on('upload', self.client.uploadSpeed.bind(self.client)) - if (process.browser) { - // in browser, swarm does not listen - self._onSwarmListening(parsedTorrent) - } else { - // listen for peers - self.swarm.listen(self.client.torrentPort, self._onSwarmListening.bind(self, parsedTorrent)) - } - process.nextTick(function () { - self.emit('infoHash') - }) + if (process.browser) { + // in browser, swarm does not listen + self._onSwarmListening(parsedTorrent) + } else { + // listen for peers + self.swarm.listen(self.client.torrentPort, self._onSwarmListening.bind(self, parsedTorrent)) } } + // torrent size (in bytes) Object.defineProperty(Torrent.prototype, 'length', { get: function () { @@ -164,62 +135,10 @@ Object.defineProperty(Torrent.prototype, 'ratio', { } }) -Torrent.prototype._onSwarmListening = function (parsed, port) { +Torrent.prototype.attachStorage = function(storage) { var self = this - if (self._destroyed) return - - self.client.torrentPort = port - - // begin discovering peers via the DHT and tracker servers - self.discovery = new Discovery({ - announce: parsed.announce, - dht: self.client.dht, - tracker: self.client.tracker, - peerId: self.client.peerId, - port: port - }) - self.discovery.setTorrent(self.infoHash) - self.discovery.on('peer', self.addPeer.bind(self)) - - // expose discovery events - reemit(self.discovery, self, ['dhtAnnounce', 'warning', 'error']) - - // if full metadata was included in initial torrent id, use it - if (parsed.info) self._onMetadata(parsed) - - self.emit('listening', port) -} - -/** - * Called when the metadata is received. - */ -Torrent.prototype._onMetadata = function (metadata) { - var self = this - if (self.metadata || self._destroyed) return - debug('got metadata') - - if (metadata && metadata.infoHash) { - // `metadata` is a parsed torrent (from parse-torrent module) - self.metadata = parseTorrent.toBuffer(metadata) - self.parsedTorrent = metadata - } else { - self.metadata = metadata - try { - self.parsedTorrent = parseTorrent(self.metadata) - } catch (err) { - return self.emit('error', err) - } - } - - // update preliminary torrent name - self.name = self.parsedTorrent.name - - // update discovery module with full torrent metadata - self.discovery.setTorrent(self.parsedTorrent) - - self.rarityMap = new RarityMap(self.swarm, self.parsedTorrent.pieces.length) - - self.storage = new self._storageImpl(self.parsedTorrent, self.storageOpts) + self.storage = storage + debug('attaching storage') self.storage.on('piece', self._onStoragePiece.bind(self)) self.storage.on('file', function (file) { self.emit('file', file) @@ -234,16 +153,29 @@ Torrent.prototype._onMetadata = function (metadata) { self.discovery.tracker.complete() debug('torrent ' + self.infoHash + ' done') - self.emit('done') }) self.storage.on('select', self.select.bind(self)) self.storage.on('deselect', self.deselect.bind(self)) self.storage.on('critical', self.critical.bind(self)) + process.nextTick(self._onStorage.bind(self)) +} - self.storage.files.forEach(function (file) { - self.files.push(file) - }) +Torrent.prototype.updateMetadata = function(parsedTorrent) { + var self = this + self.parsedTorrent = parsedTorrent + // update preliminary torrent name + self.name = self.parsedTorrent.name + self.metadata = parseTorrent.toBuffer(parsedTorrent) + + // update discovery module with full torrent metadata + if(!!self.discovery) { + self.discovery.setTorrent(self.parsedTorrent) + } else { + self.on('listening', function() { self.discovery.setTorrent(self.parsedTorrent) }); + } + + self.rarityMap = new RarityMap(self.swarm, self.parsedTorrent.pieces.length) self.swarm.wires.forEach(function (wire) { // If we didn't have the metadata at the time ut_metadata was initialized for this @@ -252,44 +184,32 @@ Torrent.prototype._onMetadata = function (metadata) { self._onWireWithMetadata(wire) }) +} - if (self.verify) { - process.nextTick(function () { - debug('verifying existing torrent data') - var numPieces = 0 - var numVerified = 0 - - // TODO: move storage verification to storage.js? - parallel(self.storage.pieces.map(function (piece) { - return function (cb) { - self.storage.read(piece.index, function (err, buffer) { - numPieces += 1 - self.emit('verifying', { - percentDone: 100 * numPieces / self.storage.pieces.length, - percentVerified: 100 * numVerified / self.storage.pieces.length, - }) - - if (!err && buffer) { - // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer - piece.verify(buffer) - numVerified += piece.verified - debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index) - } - // continue regardless of whether piece verification failed - cb() - }, true) // forces override to allow reading from non-verified pieces - } - }), self._onStorage.bind(self)) - }) - } else { - process.nextTick(self._onStorage.bind(self)) - } +Torrent.prototype._onSwarmListening = function (parsed, port) { + var self = this + if (self._destroyed) return - process.nextTick(function () { - self.emit('metadata') + self.client.torrentPort = port + + // begin discovering peers via the DHT and tracker servers + self.discovery = new Discovery({ + announce: parsed.announce, + dht: self.client.dht, + tracker: self.client.tracker, + peerId: self.client.peerId, + port: port }) + self.discovery.setTorrent(self.infoHash) + self.discovery.on('peer', self.addPeer.bind(self)) + + // expose discovery events + reemit(self.discovery, self, ['dhtAnnounce', 'warning', 'error']) + + self.emit('listening', port) } + /** * Destroy and cleanup this torrent. */ @@ -403,14 +323,14 @@ Torrent.prototype.critical = function (start, end) { Torrent.prototype._onWire = function (wire) { var self = this - + debug(!!self.metadata ? "Metadata" : " No MEta"); // use ut_metadata extension wire.use(ut_metadata(self.metadata)) if (!self.metadata) { wire.ut_metadata.on('metadata', function (metadata) { debug('got metadata via ut_metadata') - self._onMetadata(metadata) + self.manager.onMetadata(metadata) }) wire.ut_metadata.fetch() } @@ -965,13 +885,6 @@ Torrent.prototype._request = function (wire, index, hotswap) { return true } -Torrent.prototype.createServer = function (opts) { - var self = this - if (typeof Server === 'function' /* browser exclude */) { - return new Server(self, opts) - } -} - /** * Returns a random integer in [0,high) */ @@ -997,33 +910,3 @@ function randomizedForEach (array, cb) { cb(array[index], index, array) }) } - -/** - * Make http or https request, following redirects. - * @param {string} u - * @param {function} - * @param {number=} maxRedirects - * @return {http.ClientRequest} - */ -function httpGet (u, cb, maxRedirects) { - cb = once(cb) - if (!maxRedirects) maxRedirects = 5 - if (maxRedirects === 0) return cb(new Error('too many redirects')) - - hh.get(u, function (res) { - // Check for redirect - if (res.statusCode >= 300 && res.statusCode < 400 && 'location' in res.headers) { - var location = res.headers.location - if (!url.parse(location).host) { - // If relative redirect, prepend host of current url - var parsed = url.parse(u) - location = parsed.protocol + '//' + parsed.host + location - } - res.resume() // discard response - return httpGet(location, cb, --maxRedirects) - } - res.pipe(concat(function (data) { - cb(null, data) - })) - }).on('error', cb) -} diff --git a/lib/webseed.js b/lib/webseed.js new file mode 100644 index 00000000..be97fb81 --- /dev/null +++ b/lib/webseed.js @@ -0,0 +1,155 @@ +module.exports = Webseed +var BLOCK_LENGTH = 16 * 1024 +var url = require('url'); +var http = require('http-https') +var BlockStream = require('block-stream') +var request = require('request') +var fs = require('fs'); + +var BLOCK_BLANK = 0 +var BLOCK_RESERVED = 1 +var BLOCK_WRITTEN = 2 + +function noop () {} + +function Webseed(torrentManager, storage, parsedTorrent) { + var self = this + self.storage = storage + if(parsedTorrent.urlList[0]) { + self.download(parsedTorrent.urlList[0]); + } +} + + +Webseed.prototype.download = function(file_url) { + var self = this + console.log(file_url); + for(i = 0; i < self.storage.pieces.length; i++) { + if (!self.storage.bitfield.get(i)) { + break; + } + } + var piece_index = i + var piece = self.storage.pieces[piece_index] + self.first_piece = piece + var len = piece.blocks.length + for (var i = 0; i < len; i++) { + if (!piece.blocks[i]) { + break; + } + } + self.first_block = i + self.last_piece = self.first_piece + self.last_block = self.first_block + for(j = 0; j < 20; j++) { + if(!self.reserveNext()) { + console.log('break'); + break + } + } + + var first_byte = self.storage.pieces[0].blocks.length * BLOCK_LENGTH * self.first_piece.index + self.first_block * BLOCK_LENGTH; + console.log('first byte' + first_byte) + + var options = { + headers: { + range: 'bytes='+ first_byte + '-' + }, + uri: file_url + }; + console.log(options); + var buffers = [] + var buflen = 0 + var first = false; + self.request = request(options); + self.storage.on('warning', self.request.abort) + //var file = fs.createWriteStream("file.mp4"); + self.request.pipe(new BlockStream(BLOCK_LENGTH).on('data', self.write.bind(self))); + /* + request(options).on('data', function (block) { + console.log(block.length); + console.log('recieved http writing'); + + buffers.push(block); + buflen += block.length; + console.log(self.first_block + " : " + (self.first_piece.blocks.length - 1)); + var lastblock = (self.first_block === self.first_piece.blocks.length - 1); + console.log(" -> left: " + (self.first_piece.length - (self.first_block * BLOCK_LENGTH)) + " " + (lastblock ? "true" : "false") ); + var need = lastblock ? self.first_piece.length - (self.first_block * BLOCK_LENGTH) : BLOCK_LENGTH; + if(buflen >= need) { + console.log('enogh buff') + cur = buffers[0] + console.log('need: ' + need); + out = null; + outlen = 0; + if(cur.length < need) { + console.log('blah'); + out = cur; + outlen += cur.length + buffers = buffers.slice(1) + } + out = Buffer.concat([out, buffers[0].slice(0, need - outlen)]); + buffers[0] = buffers[0].slice(need - outlen); + self.write(block); + } + + + }); + */ + +} + +Webseed.prototype.write = function(block) { + var self = this + console.log('writing: ' + block.length + ' ' + self.first_block); + self.first_piece.writeBlock(BLOCK_LENGTH * self.first_block, block, function(err) { console.log(err)} ); + if (!self.increment()) { + console.log("Abort"); + self.request.abort(); + } + console.log('incre : ' + self.first_piece.index + ' : ' + self.first_block); + self.reserveNext() +} + +Webseed.prototype.increment = function() { + var self = this + var next = self.first_block + 1 + if(next < self.last_block) { + self.first_block = next + return true; + } else if (self.first_piece.index >= self.last_piece.index) { + return false; + } + + if (next >= self.last_piece.blocks.length) { + console.log('increamment'); + self.first_piece = self.storage.pieces[self.first_piece.index + 1] + self.first_block = 0 + return true; + } + self.first_block = next + console.log('lastfalse'); + return false; +} + +Webseed.prototype.reserveNext = function() { + var self = this + var next = self.last_block + 1 + console.log('next = ' + next + ' len = ' + self.last_piece.blocks.length) + if (next > self.last_piece.blocks.length) { + console.log('upping piece') + if(!self.storage.bitfield.get(self.last_piece.index + 1) && !self.storage.pieces[self.last_piece.index + 1].blocks[0]) { + self.last_piece = self.storage.pieces[self.last_piece.index + 1] + var next = 0 + } else { + return false; + } + } + if(!self.last_piece.blocks[next]) { + self.last_piece.blocks[next] = BLOCK_RESERVED + self.last_block = next + console.log('reserved : ' + self.last_piece.index + ' : ' + self.last_block); + return true; + } + return false; +} \ No newline at end of file