diff --git a/lib/loggly/client.js b/lib/loggly/client.js index 9f18f30..d389493 100644 --- a/lib/loggly/client.js +++ b/lib/loggly/client.js @@ -5,7 +5,10 @@ * MIT LICENSE * */ - +// +// Setting constant value of EVENT_SIZE variable +// +var EVENT_SIZE = 1000 * 1000; var events = require('events'), util = require('util'), qs = require('querystring'), @@ -23,6 +26,17 @@ function stringify(msg) { return payload; } // +// function to truncate message over 1 MB +// +function truncateLargeMessage(message) { + var maximumBytesAllowedToLoggly = EVENT_SIZE; + var bytesLengthOfLogMessage = Buffer.byteLength(message); + if(bytesLengthOfLogMessage > MaximumBytesAllowedToLoggly) { + message = message.slice(0, MaximumBytesAllowedToLoggly); + } + return message; +} +// // function createClient (options) // Creates a new instance of a Loggly client. // @@ -56,6 +70,7 @@ var Loggly = exports.Loggly = function (options) { this.userAgent = 'node-loggly ' + loggly.version; this.useTagHeader = 'useTagHeader' in options ? options.useTagHeader : true; this.isBulk = options.isBulk || false; + this.bufferOptions = options.bufferOptions || {size: 500, retriesInMilliSeconds: 30 * 1000}; // // Set the tags on this instance. // @@ -90,6 +105,7 @@ util.inherits(Loggly, events.EventEmitter); // - http://www.loggly.com/docs/api-sending-data/ // Loggly.prototype.log = function (msg, tags, callback) { + msg.message = truncateLargeMessage(msg.message) if (!callback && typeof tags === 'function') { callback = tags; tags = null; @@ -119,6 +135,7 @@ Loggly.prototype.log = function (msg, tags, callback) { body: msg, proxy: this.proxy, isBulk: this.isBulk, + bufferOptions: this.bufferOptions, headers: { host: this.host, accept: '*/*', diff --git a/lib/loggly/common.js b/lib/loggly/common.js index 785563d..85de969 100644 --- a/lib/loggly/common.js +++ b/lib/loggly/common.js @@ -13,6 +13,17 @@ var arrSize = 100, arrMsg = [], timerFunction = null; +// +// Variables for buffer array +// +var arrBufferedMsg = [], + timerFunctionForBufferedLogs = null; + +// +// flag variable to validate authToken +// +var isValidToken = true; + var https = require('https'), util = require('util'), request = require('request'), @@ -77,7 +88,8 @@ common.loggly = function () { auth, proxy, isBulk, - uri; + uri, + bufferOptions; // // Now that we've popped off the two callbacks @@ -99,6 +111,7 @@ common.loggly = function () { isBulk = args[0].isBulk; headers = args[0].headers; proxy = args[0].proxy; + bufferOptions = args[0].bufferOptions; } } else if (args.length === 2) { @@ -113,6 +126,18 @@ common.loggly = function () { } function onError(err) { + if(!isValidToken){ + console.log(err); + return; + } + var arrayLogs = []; + if(isBulk) { + arrayLogs = requestOptions.body.split('\n'); + } else { + arrayLogs.push(requestOptions.body); + } + storeLogs(arrayLogs); + if (!responded) { responded = true; if (callback) { callback(err) } @@ -125,6 +150,8 @@ common.loggly = function () { proxy: proxy }; + var requestOptionsForBufferedLogs = JSON.parse(JSON.stringify(requestOptions)) + if (auth) { requestOptions.headers.authorization = 'Basic ' + new Buffer(auth.username + ':' + auth.password).toString('base64'); } @@ -138,9 +165,10 @@ common.loggly = function () { return onError(err); } var statusCode = res.statusCode.toString(); + if(statusCode === '403') isValidToken = false; if (Object.keys(failCodes).indexOf(statusCode) !== -1) { return onError((new Error('Loggly Error (' + statusCode + '): ' + failCodes[statusCode]))); - } + } success(res, body); }); } @@ -156,12 +184,14 @@ common.loggly = function () { // Join Array Message with new line ('\n') character // requestOptions.body = arrMsg.join('\n'); + arrMsg.length = 0; try { request(requestOptions, function (err, res, body) { if (err) { return onError(err); } var statusCode = res.statusCode.toString(); + if(statusCode === '403') isValidToken = false; if (Object.keys(failCodes).indexOf(statusCode) !== -1) { return onError((new Error('Loggly Error (' + statusCode + '): ' + failCodes[statusCode]))); } @@ -171,14 +201,8 @@ common.loggly = function () { catch (ex) { onError(ex); } - finally { - // - // Empty the array - // - arrMsg.length = 0; - } } - if (isBulk === true) { + if (isBulk && isValidToken) { if (timerFunction === null) { timerFunction = setInterval(function () { sendBulkLogs(); @@ -189,9 +213,48 @@ common.loggly = function () { sendBulkLogs(); } } - else { + else if(isValidToken) { sendLogs(); } + + // + // retries to send buffered logs to loggly in every 5 sec + // + if (timerFunctionForBufferedLogs === null) { + timerFunctionForBufferedLogs = setInterval(function () { + if (arrBufferedMsg.length) sendBufferdLogstoLoggly(); + }, bufferOptions.retriesInMilliSeconds); + } + + + function sendBufferdLogstoLoggly() { + if (!arrBufferedMsg.length) return; + var arrayMessage = []; + var bulkModeBunch = arrSize; + var inputModeBunch = 1; + var logsInBunch = isBulk ? bulkModeBunch : inputModeBunch; + arrayMessage = arrBufferedMsg.slice(0, logsInBunch); + requestOptionsForBufferedLogs.body = isBulk ? arrayMessage.join('\n') : arrayMessage[0]; + request(requestOptionsForBufferedLogs, function (err, res, body) { + if(err) return; + var statusCode = res.statusCode.toString(); + if(statusCode === "200") { + arrBufferedMsg.splice(0, logsInBunch); + sendBufferdLogstoLoggly(); + } + }); + requestOptionsForBufferedLogs.body = ''; + } + +// +// This function will store logs into buffer +// + function storeLogs(logs) { + if (!logs.length) return; + var numberOfLogsToBeRemoved = (arrBufferedMsg.length + logs.length) - bufferOptions.size; + if (numberOfLogsToBeRemoved > 0) arrBufferedMsg = arrBufferedMsg.splice(numberOfLogsToBeRemoved); + arrBufferedMsg = arrBufferedMsg.concat(logs); + } }; // // ### function serialize (obj, key)