From 2f488c652441c264814a48302bf81a8945553a28 Mon Sep 17 00:00:00 2001 From: Ivan Gorbanev Date: Mon, 21 Oct 2019 00:16:21 +0300 Subject: [PATCH 1/2] Add global down/up limit --- index.js | 28 ++++++++++++++++++++++++++++ lib/peer.js | 19 ++++++++++++++++++- lib/tcp-pool.js | 2 ++ package.json | 3 ++- 4 files changed, 50 insertions(+), 2 deletions(-) diff --git a/index.js b/index.js index 135fb93c..0eead3e8 100644 --- a/index.js +++ b/index.js @@ -12,6 +12,7 @@ const path = require('path') const Peer = require('simple-peer') const randombytes = require('randombytes') const speedometer = require('speedometer') +const ThrottleGroup = require('stream-throttle').ThrottleGroup const TCPPool = require('./lib/tcp-pool') // browser exclude const Torrent = require('./lib/torrent') @@ -71,12 +72,19 @@ class WebTorrent extends EventEmitter { this.tracker = opts.tracker !== undefined ? opts.tracker : {} this.torrents = [] this.maxConns = Number(opts.maxConns) || 55 + this.downloadLimit = Number(opts.downloadLimit) || Number.MAX_VALUE + this.uploadLimit = Number(opts.uploadLimit) || Number.MAX_VALUE this._debug( 'new webtorrent (peerId %s, nodeId %s, port %s)', this.peerId, this.nodeId, this.torrentPort ) + this.throttleGroups = { + down: new ThrottleGroup({rate: this.downloadLimit}), + up: new ThrottleGroup({rate: this.uploadLimit}) + } + if (this.tracker) { if (typeof this.tracker !== 'object') this.tracker = {} if (opts.rtcConfig) { @@ -355,6 +363,26 @@ class WebTorrent extends EventEmitter { : { address: '0.0.0.0', family: 'IPv4', port: 0 } } + /** + * Set global download throttle rate + * @param {Number} rate + */ + throttleDownload(rate) { + if (!Number(rate) || Number(rate) < 0) return + this.throttleGroups.down.bucket.bucketSize = rate + this.throttleGroups.down.bucket.tokensPerInterval = rate + } + + /** + * Set global upload throttle rate + * @param {Number} rate + */ + throttleUpload(rate) { + if (!Number(rate) || Number(rate) < 0) return + this.throttleGroups.up.bucket.bucketSize = rate + this.throttleGroups.up.bucket.tokensPerInterval = rate + } + /** * Destroy the client, including all torrents and connections to peers. * @param {function} cb diff --git a/lib/peer.js b/lib/peer.js index 0ddf5359..e6fc1dfd 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -142,10 +142,27 @@ class Peer { }) this.startHandshakeTimeout() - conn.pipe(wire).pipe(conn) + if ((this.type === 'tcpOutgoing') || (this.type === 'webrtc')) { + this.setThrottlePipes() + } else { + conn.pipe(wire).pipe(conn) + } + if (this.swarm && !this.sentHandshake) this.handshake() } + clearPipes() { + this.wire.unpipe() + this.conn.unpipe() + } + + setThrottlePipes() { + this.conn.pipe(this.swarm.client.throttleGroups.down.throttle()) + .pipe(this.wire) + .pipe(this.swarm.client.throttleGroups.up.throttle()) + .pipe(this.conn) + } + /** * Called when handshake is received from remote peer. * @param {string} infoHash diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js index 1094c4fd..47a6f796 100644 --- a/lib/tcp-pool.js +++ b/lib/tcp-pool.js @@ -103,6 +103,8 @@ class TCPPool { const torrent = self._client.get(infoHash) if (torrent) { peer.swarm = torrent + peer.clearPipes() + peer.setThrottlePipes() torrent._addIncomingPeer(peer) peer.onHandshake(infoHash, peerId) } else { diff --git a/package.json b/package.json index 4c7eecfd..1578928a 100644 --- a/package.json +++ b/package.json @@ -74,7 +74,8 @@ "uniq": "^1.0.1", "unordered-array-remove": "^1.0.2", "ut_metadata": "^3.3.0", - "ut_pex": "^2.0.0" + "ut_pex": "^2.0.0", + "stream-throttle": "^0.1.3" }, "devDependencies": { "airtap": "^2.0.3", From f7d3cb3ecbdaf62e7441a8f7cd31b7914f703a06 Mon Sep 17 00:00:00 2001 From: Ivan Gorbanev Date: Mon, 21 Oct 2019 15:18:06 +0300 Subject: [PATCH 2/2] fix spaces --- index.js | 8 ++++---- lib/peer.js | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/index.js b/index.js index 0eead3e8..8a3f673c 100644 --- a/index.js +++ b/index.js @@ -81,8 +81,8 @@ class WebTorrent extends EventEmitter { ) this.throttleGroups = { - down: new ThrottleGroup({rate: this.downloadLimit}), - up: new ThrottleGroup({rate: this.uploadLimit}) + down: new ThrottleGroup({ rate: this.downloadLimit }), + up: new ThrottleGroup({ rate: this.uploadLimit }) } if (this.tracker) { @@ -367,7 +367,7 @@ class WebTorrent extends EventEmitter { * Set global download throttle rate * @param {Number} rate */ - throttleDownload(rate) { + throttleDownload (rate) { if (!Number(rate) || Number(rate) < 0) return this.throttleGroups.down.bucket.bucketSize = rate this.throttleGroups.down.bucket.tokensPerInterval = rate @@ -377,7 +377,7 @@ class WebTorrent extends EventEmitter { * Set global upload throttle rate * @param {Number} rate */ - throttleUpload(rate) { + throttleUpload (rate) { if (!Number(rate) || Number(rate) < 0) return this.throttleGroups.up.bucket.bucketSize = rate this.throttleGroups.up.bucket.tokensPerInterval = rate diff --git a/lib/peer.js b/lib/peer.js index e6fc1dfd..9bba7777 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -151,12 +151,12 @@ class Peer { if (this.swarm && !this.sentHandshake) this.handshake() } - clearPipes() { + clearPipes () { this.wire.unpipe() this.conn.unpipe() } - setThrottlePipes() { + setThrottlePipes () { this.conn.pipe(this.swarm.client.throttleGroups.down.throttle()) .pipe(this.wire) .pipe(this.swarm.client.throttleGroups.up.throttle())