diff --git a/qa/pull-tester/rpc-tests.py b/qa/pull-tester/rpc-tests.py index 11b83bac146..fb2a6b8dd50 100755 --- a/qa/pull-tester/rpc-tests.py +++ b/qa/pull-tester/rpc-tests.py @@ -141,6 +141,7 @@ 'segwit.py', 'importprunedfunds.py', 'signmessages.py', + 'rpcsignals.py' ] if ENABLE_ZMQ: testScripts.append('zmq_test.py') diff --git a/qa/rpc-tests/rpcsignals.py b/qa/rpc-tests/rpcsignals.py new file mode 100755 index 00000000000..b1875e74a67 --- /dev/null +++ b/qa/rpc-tests/rpcsignals.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python3 +# Copyright (c) 2016 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +# Exercise the rpc signals API + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import * +import threading +import uuid + +class pollThread (threading.Thread): + def __init__(self, threadID, node, clientUUID): + threading.Thread.__init__(self) + self.threadID = threadID + self.node = node + self.uuid = clientUUID + def run(self): + self.node.pollnotifications(self.uuid, 5) + +class RpcSignalsTest(BitcoinTestFramework): + + def pollthread(self, node): + print("thread") + + def setup_nodes(self): + return start_nodes(4, self.options.tmpdir) + + def run_test(self): + clientUUID = str(uuid.uuid1()) + try: + self.nodes[0].getregisterednotifications(clientUUID) + raise AssertionError("ClientUUID should not exists") + except JSONRPCException as e: + assert(e.error['code']==-8) + + self.nodes[0].setregisterednotifications(clientUUID, ["hashblock", "hashtx"]) + data = self.nodes[0].getregisterednotifications(clientUUID) + assert("hashblock" in data) + assert("hashtx" in data) + + self.nodes[0].generate(1) + data = self.nodes[0].pollnotifications(clientUUID) + assert(len(data) == 2) + + myThread = pollThread(1, self.nodes[0], clientUUID) + myThread.start() + time.sleep(3) + assert(myThread.isAlive() == True) + time.sleep(5) + assert(myThread.isAlive() == False) + myThread.join() + + self.nodes[0].setregisterednotifications(clientUUID, []) + data = self.nodes[0].getregisterednotifications(clientUUID) + assert(len(data) == 0) + self.nodes[0].generate(1) + data = self.nodes[0].pollnotifications(clientUUID, 5) + assert(len(data) == 0) + +if __name__ == '__main__': + RpcSignalsTest().main() + diff --git a/src/Makefile.am b/src/Makefile.am index e3eaacdb4c9..2d3712bae42 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -184,6 +184,7 @@ libbitcoin_server_a_SOURCES = \ rpc/mining.cpp \ rpc/misc.cpp \ rpc/net.cpp \ + rpc/notifications.cpp \ rpc/rawtransaction.cpp \ rpc/server.cpp \ script/sigcache.cpp \ diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index d0675fdb495..f89fc16cdf4 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -104,6 +104,8 @@ static const CRPCConvertParam vRPCConvertParams[] = { "setban", 3 }, { "getmempoolancestors", 1 }, { "getmempooldescendants", 1 }, + { "setregisterednotifications", 1 }, + { "pollnotifications", 1 }, }; class CRPCConvertTable diff --git a/src/rpc/notifications.cpp b/src/rpc/notifications.cpp new file mode 100644 index 00000000000..207548d0a5b --- /dev/null +++ b/src/rpc/notifications.cpp @@ -0,0 +1,366 @@ +// Copyright (c) 2016 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "rpc/server.h" +#include "chain.h" +#include "primitives/block.h" +#include "primitives/transaction.h" +#include "sync.h" +#include "util.h" +#include "utilstrencodings.h" +#include "validationinterface.h" + +#include + +#include +#include +#include + +#include +#include +#include + +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; + +/* keep the max queue size large becase we don't + auto-register for notification on startup */ +static const size_t MAX_QUEUE_SIZE = 1024*1024; +static const int DEFAULT_POLL_TIMEOUT = 30; + +enum NotificationType { + NotificationTypeUnknown, + NotificationTypeBlock, + NotificationTypeTx +}; + +typedef std::pair queueRange_t; +typedef std::string clientUUID_t; + +class NotificationEntry +{ +public: + enum NotificationType type; + int64_t notificationtime; + int32_t sequenceNumber; + UniValue notification; +}; + +class NotificationQueue +{ +public: + std::deque queue; + std::map mapSequenceNumbers; + std::set registeredNotificationTypes; + + CCriticalSection cs_notificationQueue; + + const std::string typeToString(enum NotificationType type) + { + switch (type) + { + case NotificationTypeBlock: + return MSG_HASHBLOCK; break; + case NotificationTypeTx: + return MSG_HASHTX; break; + default: + return "unknown"; + } + } + + enum NotificationType stringToType(const std::string& strType) + { + if (strType == MSG_HASHBLOCK) + return NotificationTypeBlock; + else if (strType == MSG_HASHTX) + return NotificationTypeTx; + else + return NotificationTypeUnknown; + } + + // populates a json object with all notifications in the queue + // returns a range to allow removing the elements from the queue + // after successfull transmitting + queueRange_t weakDequeueNotifications(UniValue &result) + { + size_t firstElement = 0; + size_t elementCount = 0; + + LOCK(cs_notificationQueue); + for (std::deque::iterator it = queue.begin(); it != queue.end(); it++) + { + UniValue obj = UniValue(UniValue::VOBJ); + obj.pushKV("type", typeToString(it->type)); + obj.pushKV("seq", it->sequenceNumber); + obj.pushKV("obj", it->notification); + result.push_back(obj); + elementCount++; + } + return std::make_pair(firstElement, elementCount); + } + + // removes notifications in the given range from the queue + void eraseRangeFromQueue(const queueRange_t range) + { + LOCK(cs_notificationQueue); + queue.erase(queue.begin()+range.first, queue.begin()+range.first+range.second); + } + + // dequeues all notifications from the queue + void dequeueElements(UniValue &result) + { + queueRange_t range = weakDequeueNotifications(result); + eraseRangeFromQueue(range); + } + + bool elementsAvailable() + { + LOCK(cs_notificationQueue); + return queue.size()>0; + } + + void registerType(enum NotificationType type) + { + if (type == NotificationTypeUnknown) + return; + + LOCK(cs_notificationQueue); + registeredNotificationTypes.insert(type); + } + + void unregisterType(enum NotificationType type) + { + LOCK(cs_notificationQueue); + registeredNotificationTypes.erase(type); + } + + void unregisterAllTypes() { LOCK(cs_notificationQueue); registeredNotificationTypes.clear(); } + + void addToQueue(NotificationEntry entry) + { + LOCK(cs_notificationQueue); + + size_t queueSize = queue.size(); + if (queueSize > MAX_QUEUE_SIZE) + { + queue.pop_front(); + LogPrintf("RPC Notification limit has been reached, dropping oldest element\n"); + } + mapSequenceNumbers[entry.type]++; + entry.sequenceNumber = mapSequenceNumbers[entry.type]; + queue.push_back(entry); + } + + /* checks if a certain notification type is registered */ + bool isTypeRegistered(enum NotificationType type) + { + LOCK(cs_notificationQueue); + return (registeredNotificationTypes.find(type) != registeredNotificationTypes.end()); + } +}; + +class NotificationQueueManager : public CValidationInterface +{ +public: + CCriticalSection cs_queueManager; + std::map mapSequenceNumbers; + + NotificationQueue* getQueue(const clientUUID_t& clientid) + { + LOCK(cs_queueManager); + return mapSequenceNumbers[clientid]; + } + + NotificationQueue* addQueue(const clientUUID_t& clientid) + { + LOCK(cs_queueManager); + mapSequenceNumbers[clientid] = new NotificationQueue(); + return mapSequenceNumbers[clientid]; + } + + void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, const CBlock* pblock) + { + LOCK(cs_queueManager); + BOOST_FOREACH(NotificationQueue* queue, mapSequenceNumbers | boost::adaptors::map_values) + { + if (!queue->isTypeRegistered(NotificationTypeTx)) + return; + + NotificationEntry entry; + entry.type = NotificationTypeTx; + entry.notification.setStr(tx.GetHash().GetHex()); + queue->addToQueue(entry); + } + } + + void UpdatedBlockTip(const CBlockIndex *pindex) + { + LOCK(cs_queueManager); + BOOST_FOREACH(NotificationQueue* queue, mapSequenceNumbers | boost::adaptors::map_values) + { + if (!queue->isTypeRegistered(NotificationTypeBlock)) + return; + + NotificationEntry entry; + entry.type = NotificationTypeBlock; + entry.notification.setStr(pindex->GetBlockHash().GetHex()); + queue->addToQueue(entry); + } + } +}; + +static NotificationQueueManager *queueManagerSharedInstance = NULL; + +NotificationQueue * getQueue(const std::string& clientID, bool createIfNotExists) +{ + NotificationQueue *clientQueue = queueManagerSharedInstance->getQueue(clientID); + if (!clientQueue && !createIfNotExists) + throw JSONRPCError(RPC_INVALID_PARAMETER, "Client UUID not found."); + if (!clientQueue) + clientQueue = queueManagerSharedInstance->addQueue(clientID); + return clientQueue; +} + +UniValue setregisterednotifications(const UniValue& params, bool fHelp) +{ + if (fHelp || params.size() != 2) + throw std::runtime_error( + "setregisterednotifications [, , ...]\n" + "\nRegister for rpc notification(s).\n" + "\nNotifications can be polled by calling pollnotifications." + "\nThe client UUID must be unique per remove application." + "\nArguments:\n" + "1. \"uuid\" (string, required) The client uuid\n" + "2. \"type\" (string, required) The notification type to register for (\"hashblock\", \"hashtx\")\n" + "\nExamples:\n" + "\nRegister for block and transaction notifications\n" + + HelpExampleCli("setregisterednotifications", "\"[\"hashblock\", \"hashtx\"]\" \"") + + "register for transaction and block signals\n" + ); + + RPCTypeCheck(params, boost::assign::list_of(UniValue::VSTR)(UniValue::VARR), true); + + NotificationQueue *clientQueue = getQueue(params[0].get_str(), true); + + /* remove all current registered types */ + clientQueue->unregisterAllTypes(); + + UniValue types = params[1].get_array(); + BOOST_FOREACH(const UniValue& newType, types.getValues()) + { + if (!newType.isStr()) + continue; + + enum NotificationType type = clientQueue->stringToType(newType.get_str()); + if (type == NotificationTypeUnknown) + { + /* don't register only for a subset of the requested notifications */ + clientQueue->unregisterAllTypes(); + throw JSONRPCError(RPC_INVALID_PARAMETER, "Notification type not found"); + } + + clientQueue->registerType(type); + } + + return NullUniValue; +} + +UniValue getregisterednotifications(const UniValue& params, bool fHelp) +{ + if (fHelp || params.size() != 1) + throw std::runtime_error( + "getregisterednotifications \n" + "\nReturns the currently registered RPC notification types.\n" + "\nArguments:\n" + "1. \"uuid\" (string, required) The client uuid\n" + "\nResult:\n" + "\"[\"\n" + "\" \"\" (string) The registered signal\n" + "\" ,...\n" + "\"]\"\n" + "\nExamples:\n" + "\nCreate a transaction\n" + + HelpExampleCli("getregisterednotifications", "") + + "Get the registered notification types\n" + + HelpExampleRpc("getregisterednotifications", "") + ); + + RPCTypeCheck(params, boost::assign::list_of(UniValue::VSTR), true); + NotificationQueue *clientQueue = getQueue(params[0].get_str(), false); + + UniValue result = UniValue(UniValue::VARR); + BOOST_FOREACH(enum NotificationType type, clientQueue->registeredNotificationTypes) + result.push_back(clientQueue->typeToString(type)); + + return result; +} + +UniValue pollnotifications(const UniValue& params, bool fHelp) +{ + if (fHelp || params.size() < 1 || params.size() > 2) + throw std::runtime_error( + "pollnotifications \n" + "\nPolls all available notifications.\n" + "\nThe RPC thread will idle for the seconds and will\n" + "\nimmediately response if new notifications are available\n" + "\nBy constantly reconnecting/re-polling, losing notification\n" + "\nis almost impossible\n" + "\nArguments:\n" + "1. \"uuid\" (string, required) The client uuid\n" + "2. \"timeout\" (number, option) The timeout \n" + "\nResult:\n" + "\"[ notification, ... ]\" (object) The notification object\n" + "\nExamples:\n" + "\nPoll notifications for client a8098c1a...\n" + + HelpExampleCli("pollnotifications", "\"a8098c1a-f86e-11da-bd1a-00112444be1e\" 500") + + "Long poll notification (max. 500 seconds)\n" + + HelpExampleRpc("pollnotifications", "\"a8098c1a-f86e-11da-bd1a-00112444be1e\" 500") + ); + + RPCTypeCheck(params, boost::assign::list_of(UniValue::VSTR)(UniValue::VNUM), false); + NotificationQueue *clientQueue = getQueue(params[0].get_str(), false); + + int64_t timeOut = DEFAULT_POLL_TIMEOUT; + if (params.size() == 2) + timeOut = params[1].get_int64(); + + int64_t startTime = GetTime(); + + UniValue result = UniValue(UniValue::VARR); + // allow long polling + while (IsRPCRunning()) + { + if (clientQueue->elementsAvailable()) + { + clientQueue->dequeueElements(result); + break; + } + if (startTime+timeOut+(500/1000.0) < GetTime()) + break; + MilliSleep(500); + } + + return result; +} + +static const CRPCCommand commands[] = +{ // category name actor (function) okSafeMode + // --------------------- ---------------------------- --------------------------- ---------- + { "notification", "setregisterednotifications", &setregisterednotifications, true }, + { "notification", "getregisterednotifications", &getregisterednotifications, true }, + { "notification", "pollnotifications", &pollnotifications, true }, + +}; + +void RegisterNotificationRPCCommands(CRPCTable &tableRPC) +{ + for (unsigned int vcidx = 0; vcidx < ARRAYLEN(commands); vcidx++) + tableRPC.appendCommand(commands[vcidx].name, &commands[vcidx]); + + if (!queueManagerSharedInstance) + { + queueManagerSharedInstance = new NotificationQueueManager(); + RegisterValidationInterface(queueManagerSharedInstance); + } +} diff --git a/src/rpc/register.h b/src/rpc/register.h index 01aa58a25d8..9c6a440e613 100644 --- a/src/rpc/register.h +++ b/src/rpc/register.h @@ -19,6 +19,8 @@ void RegisterMiscRPCCommands(CRPCTable &tableRPC); void RegisterMiningRPCCommands(CRPCTable &tableRPC); /** Register raw transaction RPC commands */ void RegisterRawTransactionRPCCommands(CRPCTable &tableRPC); +/** Register RPC notification commands */ +void RegisterNotificationRPCCommands(CRPCTable &tableRPC); static inline void RegisterAllCoreRPCCommands(CRPCTable &tableRPC) { @@ -27,6 +29,7 @@ static inline void RegisterAllCoreRPCCommands(CRPCTable &tableRPC) RegisterMiscRPCCommands(tableRPC); RegisterMiningRPCCommands(tableRPC); RegisterRawTransactionRPCCommands(tableRPC); + RegisterNotificationRPCCommands(tableRPC); } #endif