diff --git a/index.js b/index.js index 135fb93c..8a3f673c 100644 --- a/index.js +++ b/index.js @@ -12,6 +12,7 @@ const path = require('path') const Peer = require('simple-peer') const randombytes = require('randombytes') const speedometer = require('speedometer') +const ThrottleGroup = require('stream-throttle').ThrottleGroup const TCPPool = require('./lib/tcp-pool') // browser exclude const Torrent = require('./lib/torrent') @@ -71,12 +72,19 @@ class WebTorrent extends EventEmitter { this.tracker = opts.tracker !== undefined ? opts.tracker : {} this.torrents = [] this.maxConns = Number(opts.maxConns) || 55 + this.downloadLimit = Number(opts.downloadLimit) || Number.MAX_VALUE + this.uploadLimit = Number(opts.uploadLimit) || Number.MAX_VALUE this._debug( 'new webtorrent (peerId %s, nodeId %s, port %s)', this.peerId, this.nodeId, this.torrentPort ) + this.throttleGroups = { + down: new ThrottleGroup({ rate: this.downloadLimit }), + up: new ThrottleGroup({ rate: this.uploadLimit }) + } + if (this.tracker) { if (typeof this.tracker !== 'object') this.tracker = {} if (opts.rtcConfig) { @@ -355,6 +363,26 @@ class WebTorrent extends EventEmitter { : { address: '0.0.0.0', family: 'IPv4', port: 0 } } + /** + * Set global download throttle rate + * @param {Number} rate + */ + throttleDownload (rate) { + if (!Number(rate) || Number(rate) < 0) return + this.throttleGroups.down.bucket.bucketSize = rate + this.throttleGroups.down.bucket.tokensPerInterval = rate + } + + /** + * Set global upload throttle rate + * @param {Number} rate + */ + throttleUpload (rate) { + if (!Number(rate) || Number(rate) < 0) return + this.throttleGroups.up.bucket.bucketSize = rate + this.throttleGroups.up.bucket.tokensPerInterval = rate + } + /** * Destroy the client, including all torrents and connections to peers. * @param {function} cb diff --git a/lib/peer.js b/lib/peer.js index 0ddf5359..9bba7777 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -142,10 +142,27 @@ class Peer { }) this.startHandshakeTimeout() - conn.pipe(wire).pipe(conn) + if ((this.type === 'tcpOutgoing') || (this.type === 'webrtc')) { + this.setThrottlePipes() + } else { + conn.pipe(wire).pipe(conn) + } + if (this.swarm && !this.sentHandshake) this.handshake() } + clearPipes () { + this.wire.unpipe() + this.conn.unpipe() + } + + setThrottlePipes () { + this.conn.pipe(this.swarm.client.throttleGroups.down.throttle()) + .pipe(this.wire) + .pipe(this.swarm.client.throttleGroups.up.throttle()) + .pipe(this.conn) + } + /** * Called when handshake is received from remote peer. * @param {string} infoHash diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js index 1094c4fd..47a6f796 100644 --- a/lib/tcp-pool.js +++ b/lib/tcp-pool.js @@ -103,6 +103,8 @@ class TCPPool { const torrent = self._client.get(infoHash) if (torrent) { peer.swarm = torrent + peer.clearPipes() + peer.setThrottlePipes() torrent._addIncomingPeer(peer) peer.onHandshake(infoHash, peerId) } else { diff --git a/package.json b/package.json index 4c7eecfd..1578928a 100644 --- a/package.json +++ b/package.json @@ -74,7 +74,8 @@ "uniq": "^1.0.1", "unordered-array-remove": "^1.0.2", "ut_metadata": "^3.3.0", - "ut_pex": "^2.0.0" + "ut_pex": "^2.0.0", + "stream-throttle": "^0.1.3" }, "devDependencies": { "airtap": "^2.0.3",