From 9c5ac07d55b39f3869c65c88be512440ab2df589 Mon Sep 17 00:00:00 2001 From: Jonas Schnelli Date: Tue, 26 Apr 2016 13:15:53 +0200 Subject: [PATCH 1/2] [RPC] Add RPC long poll notifications --- qa/pull-tester/rpc-tests.py | 1 + qa/rpc-tests/rpcsignals.py | 57 ++++++++ src/Makefile.am | 1 + src/rpc/client.cpp | 2 + src/rpc/notifications.cpp | 311 ++++++++++++++++++++++++++++++++++++++++++++ src/rpc/register.h | 3 + 6 files changed, 375 insertions(+) create mode 100755 qa/rpc-tests/rpcsignals.py create mode 100644 src/rpc/notifications.cpp diff --git a/qa/pull-tester/rpc-tests.py b/qa/pull-tester/rpc-tests.py index 11b83bac146..fb2a6b8dd50 100755 --- a/qa/pull-tester/rpc-tests.py +++ b/qa/pull-tester/rpc-tests.py @@ -141,6 +141,7 @@ 'segwit.py', 'importprunedfunds.py', 'signmessages.py', + 'rpcsignals.py' ] if ENABLE_ZMQ: testScripts.append('zmq_test.py') diff --git a/qa/rpc-tests/rpcsignals.py b/qa/rpc-tests/rpcsignals.py new file mode 100755 index 00000000000..ce6b84eafac --- /dev/null +++ b/qa/rpc-tests/rpcsignals.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python2 +# Copyright (c) 2016 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +# Exercise the rpc signals API + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import * +import threading + +class pollThread (threading.Thread): + def __init__(self, threadID, node): + threading.Thread.__init__(self) + self.threadID = threadID + self.node = node + def run(self): + print self.node.pollnotifications(5) + +class RpcSignalsTest(BitcoinTestFramework): + + def pollthread(self, node): + print "thread" + + def setup_nodes(self): + return start_nodes(4, self.options.tmpdir) + + def run_test(self): + data = self.nodes[0].getregisterednotifications() + assert(len(data) == 0) + self.nodes[0].setregisterednotifications(["hashblock", "hashtx"]) + data = self.nodes[0].getregisterednotifications() + assert("hashblock" in data) + assert("hashtx" in data) + + self.nodes[0].generate(1) + data = self.nodes[0].pollnotifications() + assert(len(data) == 2) + + myThread = pollThread(1, self.nodes[0]) + myThread.start() + time.sleep(3) + assert(myThread.isAlive() == True) + time.sleep(5) + assert(myThread.isAlive() == False) + myThread.join() + + self.nodes[0].setregisterednotifications([]) + data = self.nodes[0].getregisterednotifications() + assert(len(data) == 0) + self.nodes[0].generate(1) + data = self.nodes[0].pollnotifications(5) + assert(len(data) == 0) + +if __name__ == '__main__': + RpcSignalsTest().main() + diff --git a/src/Makefile.am b/src/Makefile.am index e3eaacdb4c9..2d3712bae42 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -184,6 +184,7 @@ libbitcoin_server_a_SOURCES = \ rpc/mining.cpp \ rpc/misc.cpp \ rpc/net.cpp \ + rpc/notifications.cpp \ rpc/rawtransaction.cpp \ rpc/server.cpp \ script/sigcache.cpp \ diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index d0675fdb495..5e311c42650 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -104,6 +104,8 @@ static const CRPCConvertParam vRPCConvertParams[] = { "setban", 3 }, { "getmempoolancestors", 1 }, { "getmempooldescendants", 1 }, + { "setregisterednotifications", 0 }, + { "pollnotifications", 0 }, }; class CRPCConvertTable diff --git a/src/rpc/notifications.cpp b/src/rpc/notifications.cpp new file mode 100644 index 00000000000..861f6913c69 --- /dev/null +++ b/src/rpc/notifications.cpp @@ -0,0 +1,311 @@ +// Copyright (c) 2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "rpc/server.h" +#include "chain.h" +#include "primitives/block.h" +#include "primitives/transaction.h" +#include "sync.h" +#include "util.h" +#include "utilstrencodings.h" +#include "validationinterface.h" + +#include + +#include +#include + +#include +#include +#include + +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; + +/* keep the max queue size large becase we don't + auto-register for notification on startup */ +static const size_t MAX_QUEUE_SIZE = 1024*1024; +static const int DEFAULT_POLL_TIMEOUT = 30; + +/** The help message mode determines what help message to show */ +enum NotificationType { + NotificationTypeUnknown, + NotificationTypeBlock, + NotificationTypeTx +}; + +typedef std::pair queueRange_t; + +class NotificationEntry +{ +public: + enum NotificationType type; + int64_t notificationtime; + int32_t sequenceNumber; + UniValue notification; +}; + +class NotificationQueue : public CValidationInterface +{ +public: + std::deque queue; + std::map mapSequenceNumbers; + std::set registeredNotificationTypes; + + CCriticalSection cs_notificationQueue; + + const std::string typeToString(enum NotificationType type) + { + switch (type) + { + case NotificationTypeBlock: + return MSG_HASHBLOCK; break; + case NotificationTypeTx: + return MSG_HASHTX; break; + default: + return "unknown"; + } + } + + enum NotificationType stringToType(const std::string& strType) + { + if (strType == MSG_HASHBLOCK) + return NotificationTypeBlock; + else if (strType == MSG_HASHTX) + return NotificationTypeTx; + else + return NotificationTypeUnknown; + } + + // populates a json object with all notifications in the queue + // returns a range to allow removing the elements from the queue + // after successfull transmitting + queueRange_t weakDequeueElements(UniValue &result) + { + size_t firstElement = 0; + size_t elementCount = 0; + + LOCK(cs_notificationQueue); + for (std::deque::iterator it = queue.begin(); it != queue.end(); it++) + { + UniValue obj = UniValue(UniValue::VOBJ); + obj.pushKV("type", typeToString(it->type)); + obj.pushKV("seq", it->sequenceNumber); + obj.pushKV("obj", it->notification); + result.push_back(obj); + elementCount++; + } + return std::make_pair(firstElement, elementCount); + } + + // removes notification in the given range from the queue + void eraseRangeFromQueue(const queueRange_t range) + { + LOCK(cs_notificationQueue); + queue.erase(queue.begin()+range.first, queue.begin()+range.first+range.second); + } + + // dequeues all notifications from the queue + void dequeueElements(UniValue &result) + { + queueRange_t range = weakDequeueElements(result); + eraseRangeFromQueue(range); + } + + bool elementsAvailable() + { + LOCK(cs_notificationQueue); + return queue.size()>0; + } + + void registerType(enum NotificationType type) + { + if (type == NotificationTypeUnknown) + return; + + LOCK(cs_notificationQueue); + registeredNotificationTypes.insert(type); + } + + void unregisterType(enum NotificationType type) + { + LOCK(cs_notificationQueue); + registeredNotificationTypes.erase(type); + } + + void unregisterAllTypes() { LOCK(cs_notificationQueue); registeredNotificationTypes.clear(); } + + void addToQueue(NotificationEntry entry) + { + LOCK(cs_notificationQueue); + + size_t queueSize = queue.size(); + if (queueSize > MAX_QUEUE_SIZE) + { + queue.pop_front(); + LogPrintf("RPC Notification limit has been reached, dropping oldest element\n"); + } + mapSequenceNumbers[entry.type]++; + entry.sequenceNumber = mapSequenceNumbers[entry.type]; + queue.push_back(entry); + } + + /* checks if a certain notification type is registered */ + bool isTypeRegistered(enum NotificationType type) + { + LOCK(cs_notificationQueue); + return (registeredNotificationTypes.find(type) != registeredNotificationTypes.end()); + } + + void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, const CBlock* pblock) + { + if (!isTypeRegistered(NotificationTypeTx)) + return; + + NotificationEntry entry; + entry.type = NotificationTypeTx; + entry.notification.setStr(tx.GetHash().GetHex()); + addToQueue(entry); + } + + void UpdatedBlockTip(const CBlockIndex *pindex) + { + if (!isTypeRegistered(NotificationTypeBlock)) + return; + + NotificationEntry entry; + entry.type = NotificationTypeBlock; + entry.notification.setStr(pindex->GetBlockHash().GetHex()); + addToQueue(entry); + } +}; +static NotificationQueue *queueSharedInstance = NULL; + +UniValue setregisterednotifications(const UniValue& params, bool fHelp) +{ + if (fHelp || params.size() != 1) + throw std::runtime_error( + "setregisterednotifications [, , ...]\n" + "\nRegister for rpc notification(s).\n" + "\nNotifications can be polled by calling pollnotifications." + "\nArguments:\n" + "1. \"type\" (string, required) The notification type to register for (\"hashblock\", \"hashtx\")\n" + "\nExamples:\n" + "\nRegister for block and transaction notifications\n" + + HelpExampleCli("setregisterednotifications", "\"[\"hashblock\", \"hashtx\"]\" \"") + + "register for transaction and block signals\n" + ); + + RPCTypeCheck(params, boost::assign::list_of(UniValue::VARR), true); + + /* remove all current registered types */ + queueSharedInstance->unregisterAllTypes(); + + UniValue types = params[0].get_array(); + BOOST_FOREACH(const UniValue& newType, types.getValues()) + { + if (!newType.isStr()) + continue; + + enum NotificationType type = queueSharedInstance->stringToType(newType.get_str()); + if (type == NotificationTypeUnknown) + throw JSONRPCError(RPC_INVALID_PARAMETER, "Notification type not found"); + + queueSharedInstance->registerType(type); + } + + return NullUniValue; +} + +UniValue getregisterednotifications(const UniValue& params, bool fHelp) +{ + if (fHelp || params.size() != 0) + throw std::runtime_error( + "getregisterednotifications\n" + "\nReturns the currently registered RPC notification types.\n" + "\nResult:\n" + "\"[\"\n" + "\" \"\" (string) The registered signal\n" + "\" ,...\n" + "\"]\"\n" + "\nExamples:\n" + "\nCreate a transaction\n" + + HelpExampleCli("getregisterednotifications", "") + + "Get the registered notification types\n" + + HelpExampleRpc("getregisterednotifications", "") + ); + + UniValue result = UniValue(UniValue::VARR); + BOOST_FOREACH(enum NotificationType type, queueSharedInstance->registeredNotificationTypes) + result.push_back(queueSharedInstance->typeToString(type)); + + return result; +} + +UniValue pollnotifications(const UniValue& params, bool fHelp) +{ + if (fHelp || params.size() > 1) + throw std::runtime_error( + "pollnotifications \n" + "\nPolls all available notifications.\n" + "\nThe RPC thread will idle for the seconds and will\n" + "\nimmediately response if new notifications are available\n" + "\nBy constantly reconnecting/re-polling, losing notification\n" + "\nis almost impossible\n" + "\nArguments:\n" + "1. \"timeout\" (number, option) The timeout \n" + "\nResult:\n" + "\"[ notification, ... ]\" (object) The notification object\n" + "\nExamples:\n" + "\nPoll notifications\n" + + HelpExampleCli("pollnotifications", "500") + + "Long poll notification (max. 500 seconds)\n" + + HelpExampleRpc("pollnotifications", "500") + ); + + RPCTypeCheck(params, boost::assign::list_of(UniValue::VNUM), false); + + int64_t timeOut = DEFAULT_POLL_TIMEOUT; + if (params.size() == 1) + timeOut = params[0].get_int64(); + + int64_t startTime = GetTime(); + + UniValue result = UniValue(UniValue::VARR); + // allow long polling + while (IsRPCRunning()) + { + if (queueSharedInstance->elementsAvailable()) + { + queueSharedInstance->dequeueElements(result); + break; + } + if (startTime+timeOut+(500/1000.0) < GetTime()) + break; + MilliSleep(500); + } + + return result; +} + +static const CRPCCommand commands[] = +{ // category name actor (function) okSafeMode + // --------------------- ---------------------------- --------------------------- ---------- + { "notification", "setregisterednotifications", &setregisterednotifications, true }, + { "notification", "getregisterednotifications", &getregisterednotifications, true }, + { "notification", "pollnotifications", &pollnotifications, true }, + +}; + +void RegisterNotificationRPCCommands(CRPCTable &tableRPC) +{ + for (unsigned int vcidx = 0; vcidx < ARRAYLEN(commands); vcidx++) + tableRPC.appendCommand(commands[vcidx].name, &commands[vcidx]); + + if (!queueSharedInstance) + { + queueSharedInstance = new NotificationQueue(); + RegisterValidationInterface(queueSharedInstance); + } +} diff --git a/src/rpc/register.h b/src/rpc/register.h index 01aa58a25d8..9c6a440e613 100644 --- a/src/rpc/register.h +++ b/src/rpc/register.h @@ -19,6 +19,8 @@ void RegisterMiscRPCCommands(CRPCTable &tableRPC); void RegisterMiningRPCCommands(CRPCTable &tableRPC); /** Register raw transaction RPC commands */ void RegisterRawTransactionRPCCommands(CRPCTable &tableRPC); +/** Register RPC notification commands */ +void RegisterNotificationRPCCommands(CRPCTable &tableRPC); static inline void RegisterAllCoreRPCCommands(CRPCTable &tableRPC) { @@ -27,6 +29,7 @@ static inline void RegisterAllCoreRPCCommands(CRPCTable &tableRPC) RegisterMiscRPCCommands(tableRPC); RegisterMiningRPCCommands(tableRPC); RegisterRawTransactionRPCCommands(tableRPC); + RegisterNotificationRPCCommands(tableRPC); } #endif From 8660db9c23280ce57652398231e3be21678fc425 Mon Sep 17 00:00:00 2001 From: Jonas Schnelli Date: Tue, 26 Apr 2016 19:39:18 +0200 Subject: [PATCH 2/2] [RPC] allow multiple clients to listening for RPC notifications --- qa/rpc-tests/rpcsignals.py | 33 ++++++----- src/rpc/client.cpp | 4 +- src/rpc/notifications.cpp | 143 +++++++++++++++++++++++++++++++-------------- 3 files changed, 121 insertions(+), 59 deletions(-) diff --git a/qa/rpc-tests/rpcsignals.py b/qa/rpc-tests/rpcsignals.py index ce6b84eafac..b1875e74a67 100755 --- a/qa/rpc-tests/rpcsignals.py +++ b/qa/rpc-tests/rpcsignals.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python3 # Copyright (c) 2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -8,36 +8,43 @@ from test_framework.test_framework import BitcoinTestFramework from test_framework.util import * import threading +import uuid class pollThread (threading.Thread): - def __init__(self, threadID, node): + def __init__(self, threadID, node, clientUUID): threading.Thread.__init__(self) self.threadID = threadID self.node = node + self.uuid = clientUUID def run(self): - print self.node.pollnotifications(5) + self.node.pollnotifications(self.uuid, 5) class RpcSignalsTest(BitcoinTestFramework): def pollthread(self, node): - print "thread" + print("thread") def setup_nodes(self): return start_nodes(4, self.options.tmpdir) def run_test(self): - data = self.nodes[0].getregisterednotifications() - assert(len(data) == 0) - self.nodes[0].setregisterednotifications(["hashblock", "hashtx"]) - data = self.nodes[0].getregisterednotifications() + clientUUID = str(uuid.uuid1()) + try: + self.nodes[0].getregisterednotifications(clientUUID) + raise AssertionError("ClientUUID should not exists") + except JSONRPCException as e: + assert(e.error['code']==-8) + + self.nodes[0].setregisterednotifications(clientUUID, ["hashblock", "hashtx"]) + data = self.nodes[0].getregisterednotifications(clientUUID) assert("hashblock" in data) assert("hashtx" in data) self.nodes[0].generate(1) - data = self.nodes[0].pollnotifications() + data = self.nodes[0].pollnotifications(clientUUID) assert(len(data) == 2) - myThread = pollThread(1, self.nodes[0]) + myThread = pollThread(1, self.nodes[0], clientUUID) myThread.start() time.sleep(3) assert(myThread.isAlive() == True) @@ -45,11 +52,11 @@ def run_test(self): assert(myThread.isAlive() == False) myThread.join() - self.nodes[0].setregisterednotifications([]) - data = self.nodes[0].getregisterednotifications() + self.nodes[0].setregisterednotifications(clientUUID, []) + data = self.nodes[0].getregisterednotifications(clientUUID) assert(len(data) == 0) self.nodes[0].generate(1) - data = self.nodes[0].pollnotifications(5) + data = self.nodes[0].pollnotifications(clientUUID, 5) assert(len(data) == 0) if __name__ == '__main__': diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index 5e311c42650..f89fc16cdf4 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -104,8 +104,8 @@ static const CRPCConvertParam vRPCConvertParams[] = { "setban", 3 }, { "getmempoolancestors", 1 }, { "getmempooldescendants", 1 }, - { "setregisterednotifications", 0 }, - { "pollnotifications", 0 }, + { "setregisterednotifications", 1 }, + { "pollnotifications", 1 }, }; class CRPCConvertTable diff --git a/src/rpc/notifications.cpp b/src/rpc/notifications.cpp index 861f6913c69..207548d0a5b 100644 --- a/src/rpc/notifications.cpp +++ b/src/rpc/notifications.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -28,7 +29,6 @@ static const char *MSG_HASHTX = "hashtx"; static const size_t MAX_QUEUE_SIZE = 1024*1024; static const int DEFAULT_POLL_TIMEOUT = 30; -/** The help message mode determines what help message to show */ enum NotificationType { NotificationTypeUnknown, NotificationTypeBlock, @@ -36,6 +36,7 @@ enum NotificationType { }; typedef std::pair queueRange_t; +typedef std::string clientUUID_t; class NotificationEntry { @@ -46,7 +47,7 @@ class NotificationEntry UniValue notification; }; -class NotificationQueue : public CValidationInterface +class NotificationQueue { public: std::deque queue; @@ -81,7 +82,7 @@ class NotificationQueue : public CValidationInterface // populates a json object with all notifications in the queue // returns a range to allow removing the elements from the queue // after successfull transmitting - queueRange_t weakDequeueElements(UniValue &result) + queueRange_t weakDequeueNotifications(UniValue &result) { size_t firstElement = 0; size_t elementCount = 0; @@ -99,7 +100,7 @@ class NotificationQueue : public CValidationInterface return std::make_pair(firstElement, elementCount); } - // removes notification in the given range from the queue + // removes notifications in the given range from the queue void eraseRangeFromQueue(const queueRange_t range) { LOCK(cs_notificationQueue); @@ -109,7 +110,7 @@ class NotificationQueue : public CValidationInterface // dequeues all notifications from the queue void dequeueElements(UniValue &result) { - queueRange_t range = weakDequeueElements(result); + queueRange_t range = weakDequeueNotifications(result); eraseRangeFromQueue(range); } @@ -157,62 +158,109 @@ class NotificationQueue : public CValidationInterface LOCK(cs_notificationQueue); return (registeredNotificationTypes.find(type) != registeredNotificationTypes.end()); } +}; + +class NotificationQueueManager : public CValidationInterface +{ +public: + CCriticalSection cs_queueManager; + std::map mapSequenceNumbers; + + NotificationQueue* getQueue(const clientUUID_t& clientid) + { + LOCK(cs_queueManager); + return mapSequenceNumbers[clientid]; + } + + NotificationQueue* addQueue(const clientUUID_t& clientid) + { + LOCK(cs_queueManager); + mapSequenceNumbers[clientid] = new NotificationQueue(); + return mapSequenceNumbers[clientid]; + } void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, const CBlock* pblock) { - if (!isTypeRegistered(NotificationTypeTx)) - return; + LOCK(cs_queueManager); + BOOST_FOREACH(NotificationQueue* queue, mapSequenceNumbers | boost::adaptors::map_values) + { + if (!queue->isTypeRegistered(NotificationTypeTx)) + return; - NotificationEntry entry; - entry.type = NotificationTypeTx; - entry.notification.setStr(tx.GetHash().GetHex()); - addToQueue(entry); + NotificationEntry entry; + entry.type = NotificationTypeTx; + entry.notification.setStr(tx.GetHash().GetHex()); + queue->addToQueue(entry); + } } void UpdatedBlockTip(const CBlockIndex *pindex) { - if (!isTypeRegistered(NotificationTypeBlock)) - return; + LOCK(cs_queueManager); + BOOST_FOREACH(NotificationQueue* queue, mapSequenceNumbers | boost::adaptors::map_values) + { + if (!queue->isTypeRegistered(NotificationTypeBlock)) + return; - NotificationEntry entry; - entry.type = NotificationTypeBlock; - entry.notification.setStr(pindex->GetBlockHash().GetHex()); - addToQueue(entry); + NotificationEntry entry; + entry.type = NotificationTypeBlock; + entry.notification.setStr(pindex->GetBlockHash().GetHex()); + queue->addToQueue(entry); + } } }; -static NotificationQueue *queueSharedInstance = NULL; + +static NotificationQueueManager *queueManagerSharedInstance = NULL; + +NotificationQueue * getQueue(const std::string& clientID, bool createIfNotExists) +{ + NotificationQueue *clientQueue = queueManagerSharedInstance->getQueue(clientID); + if (!clientQueue && !createIfNotExists) + throw JSONRPCError(RPC_INVALID_PARAMETER, "Client UUID not found."); + if (!clientQueue) + clientQueue = queueManagerSharedInstance->addQueue(clientID); + return clientQueue; +} UniValue setregisterednotifications(const UniValue& params, bool fHelp) { - if (fHelp || params.size() != 1) + if (fHelp || params.size() != 2) throw std::runtime_error( - "setregisterednotifications [, , ...]\n" + "setregisterednotifications [, , ...]\n" "\nRegister for rpc notification(s).\n" "\nNotifications can be polled by calling pollnotifications." + "\nThe client UUID must be unique per remove application." "\nArguments:\n" - "1. \"type\" (string, required) The notification type to register for (\"hashblock\", \"hashtx\")\n" + "1. \"uuid\" (string, required) The client uuid\n" + "2. \"type\" (string, required) The notification type to register for (\"hashblock\", \"hashtx\")\n" "\nExamples:\n" "\nRegister for block and transaction notifications\n" + HelpExampleCli("setregisterednotifications", "\"[\"hashblock\", \"hashtx\"]\" \"") + "register for transaction and block signals\n" ); - RPCTypeCheck(params, boost::assign::list_of(UniValue::VARR), true); + RPCTypeCheck(params, boost::assign::list_of(UniValue::VSTR)(UniValue::VARR), true); + + NotificationQueue *clientQueue = getQueue(params[0].get_str(), true); /* remove all current registered types */ - queueSharedInstance->unregisterAllTypes(); + clientQueue->unregisterAllTypes(); - UniValue types = params[0].get_array(); + UniValue types = params[1].get_array(); BOOST_FOREACH(const UniValue& newType, types.getValues()) { if (!newType.isStr()) continue; - enum NotificationType type = queueSharedInstance->stringToType(newType.get_str()); + enum NotificationType type = clientQueue->stringToType(newType.get_str()); if (type == NotificationTypeUnknown) + { + /* don't register only for a subset of the requested notifications */ + clientQueue->unregisterAllTypes(); throw JSONRPCError(RPC_INVALID_PARAMETER, "Notification type not found"); + } - queueSharedInstance->registerType(type); + clientQueue->registerType(type); } return NullUniValue; @@ -220,10 +268,12 @@ UniValue setregisterednotifications(const UniValue& params, bool fHelp) UniValue getregisterednotifications(const UniValue& params, bool fHelp) { - if (fHelp || params.size() != 0) + if (fHelp || params.size() != 1) throw std::runtime_error( - "getregisterednotifications\n" + "getregisterednotifications \n" "\nReturns the currently registered RPC notification types.\n" + "\nArguments:\n" + "1. \"uuid\" (string, required) The client uuid\n" "\nResult:\n" "\"[\"\n" "\" \"\" (string) The registered signal\n" @@ -236,39 +286,44 @@ UniValue getregisterednotifications(const UniValue& params, bool fHelp) + HelpExampleRpc("getregisterednotifications", "") ); + RPCTypeCheck(params, boost::assign::list_of(UniValue::VSTR), true); + NotificationQueue *clientQueue = getQueue(params[0].get_str(), false); + UniValue result = UniValue(UniValue::VARR); - BOOST_FOREACH(enum NotificationType type, queueSharedInstance->registeredNotificationTypes) - result.push_back(queueSharedInstance->typeToString(type)); + BOOST_FOREACH(enum NotificationType type, clientQueue->registeredNotificationTypes) + result.push_back(clientQueue->typeToString(type)); return result; } UniValue pollnotifications(const UniValue& params, bool fHelp) { - if (fHelp || params.size() > 1) + if (fHelp || params.size() < 1 || params.size() > 2) throw std::runtime_error( - "pollnotifications \n" + "pollnotifications \n" "\nPolls all available notifications.\n" "\nThe RPC thread will idle for the seconds and will\n" "\nimmediately response if new notifications are available\n" "\nBy constantly reconnecting/re-polling, losing notification\n" "\nis almost impossible\n" "\nArguments:\n" - "1. \"timeout\" (number, option) The timeout \n" + "1. \"uuid\" (string, required) The client uuid\n" + "2. \"timeout\" (number, option) The timeout \n" "\nResult:\n" "\"[ notification, ... ]\" (object) The notification object\n" "\nExamples:\n" - "\nPoll notifications\n" - + HelpExampleCli("pollnotifications", "500") + + "\nPoll notifications for client a8098c1a...\n" + + HelpExampleCli("pollnotifications", "\"a8098c1a-f86e-11da-bd1a-00112444be1e\" 500") + "Long poll notification (max. 500 seconds)\n" - + HelpExampleRpc("pollnotifications", "500") + + HelpExampleRpc("pollnotifications", "\"a8098c1a-f86e-11da-bd1a-00112444be1e\" 500") ); - RPCTypeCheck(params, boost::assign::list_of(UniValue::VNUM), false); + RPCTypeCheck(params, boost::assign::list_of(UniValue::VSTR)(UniValue::VNUM), false); + NotificationQueue *clientQueue = getQueue(params[0].get_str(), false); int64_t timeOut = DEFAULT_POLL_TIMEOUT; - if (params.size() == 1) - timeOut = params[0].get_int64(); + if (params.size() == 2) + timeOut = params[1].get_int64(); int64_t startTime = GetTime(); @@ -276,9 +331,9 @@ UniValue pollnotifications(const UniValue& params, bool fHelp) // allow long polling while (IsRPCRunning()) { - if (queueSharedInstance->elementsAvailable()) + if (clientQueue->elementsAvailable()) { - queueSharedInstance->dequeueElements(result); + clientQueue->dequeueElements(result); break; } if (startTime+timeOut+(500/1000.0) < GetTime()) @@ -303,9 +358,9 @@ void RegisterNotificationRPCCommands(CRPCTable &tableRPC) for (unsigned int vcidx = 0; vcidx < ARRAYLEN(commands); vcidx++) tableRPC.appendCommand(commands[vcidx].name, &commands[vcidx]); - if (!queueSharedInstance) + if (!queueManagerSharedInstance) { - queueSharedInstance = new NotificationQueue(); - RegisterValidationInterface(queueSharedInstance); + queueManagerSharedInstance = new NotificationQueueManager(); + RegisterValidationInterface(queueManagerSharedInstance); } }