diff --git a/doc/developer-notes.md b/doc/developer-notes.md index d783a7a8ae3..eed296162e3 100644 --- a/doc/developer-notes.md +++ b/doc/developer-notes.md @@ -613,3 +613,16 @@ A few guidelines for introducing and reviewing new RPC interfaces: - *Rationale*: as well as complicating the implementation and interfering with the introduction of multi-wallet, wallet and non-wallet code should be separated to avoid introducing circular dependencies between code units. + +- Wallet RPCs call BlockUntilSyncedToCurrentChain to maintain consistency with + `getblockchaininfo`'s state immediately prior to the call's execution. Wallet + RPCs whose behavior does *not* depend on the current chainstate may omit this + call. + + - *Rationale*: In previous versions of Bitcoin Core, the wallet was always + in-sync with the chainstate (by virtue of them all being updated in the + same cs_main lock). In order to maintain the behavior that wallet RPCs + return results as of at least the highest best-known block an RPC + client may be aware of prior to entering a wallet RPC call, we must block + until the wallet is caught up to the chainstate as of the RPC call's entry. + This also makes the API much easier for RPC clients to reason about. diff --git a/src/init.cpp b/src/init.cpp index d79c2967b97..614c7dd7680 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -267,6 +267,7 @@ void Shutdown() #endif UnregisterAllValidationInterfaces(); GetMainSignals().UnregisterBackgroundSignalScheduler(); + GetMainSignals().UnregisterWithMempoolSignals(mempool); #ifdef ENABLE_WALLET for (CWalletRef pwallet : vpwallets) { delete pwallet; @@ -1228,6 +1229,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) threadGroup.create_thread(boost::bind(&TraceThread, "scheduler", serviceLoop)); GetMainSignals().RegisterBackgroundSignalScheduler(scheduler); + GetMainSignals().RegisterWithMempoolSignals(mempool); /* Start the RPC server already. It will be started in "warmup" mode * and not really process calls already (but it will signify connections diff --git a/src/rpc/misc.cpp b/src/rpc/misc.cpp index efff4a99aef..3628afd0d37 100644 --- a/src/rpc/misc.cpp +++ b/src/rpc/misc.cpp @@ -78,6 +78,9 @@ UniValue getinfo(const JSONRPCRequest& request) #ifdef ENABLE_WALLET CWallet * const pwallet = GetWalletForJSONRPCRequest(request); + if (pwallet) { + pwallet->BlockUntilSyncedToCurrentChain(); + } LOCK2(cs_main, pwallet ? &pwallet->cs_wallet : nullptr); #else LOCK(cs_main); diff --git a/src/rpc/rawtransaction.cpp b/src/rpc/rawtransaction.cpp index 81005118541..b7b34f86219 100644 --- a/src/rpc/rawtransaction.cpp +++ b/src/rpc/rawtransaction.cpp @@ -11,6 +11,7 @@ #include "init.h" #include "keystore.h" #include "validation.h" +#include "validationinterface.h" #include "merkleblock.h" #include "net.h" #include "policy/policy.h" @@ -30,6 +31,7 @@ #endif #include +#include #include @@ -911,6 +913,10 @@ UniValue sendrawtransaction(const JSONRPCRequest& request) + HelpExampleRpc("sendrawtransaction", "\"signedhex\"") ); + CTransactionRef tx; + std::promise promise; + + { // cs_main scope LOCK(cs_main); RPCTypeCheck(request.params, {UniValue::VSTR, UniValue::VBOOL}); @@ -918,7 +924,7 @@ UniValue sendrawtransaction(const JSONRPCRequest& request) CMutableTransaction mtx; if (!DecodeHexTx(mtx, request.params[0].get_str())) throw JSONRPCError(RPC_DESERIALIZATION_ERROR, "TX decode failed"); - CTransactionRef tx(MakeTransactionRef(std::move(mtx))); + tx = MakeTransactionRef(std::move(mtx)); const uint256& hashTx = tx->GetHash(); CAmount nMaxRawTxFee = maxTxFee; @@ -937,7 +943,7 @@ UniValue sendrawtransaction(const JSONRPCRequest& request) CValidationState state; bool fMissingInputs; bool fLimitFree = true; - if (!AcceptToMemoryPool(mempool, state, std::move(tx), fLimitFree, &fMissingInputs, nullptr, false, nMaxRawTxFee)) { + if (!AcceptToMemoryPool(mempool, state, tx, fLimitFree, &fMissingInputs, nullptr, false, nMaxRawTxFee)) { if (state.IsInvalid()) { throw JSONRPCError(RPC_TRANSACTION_REJECTED, strprintf("%i: %s", state.GetRejectCode(), state.GetRejectReason())); } else { @@ -946,19 +952,34 @@ UniValue sendrawtransaction(const JSONRPCRequest& request) } throw JSONRPCError(RPC_TRANSACTION_ERROR, state.GetRejectReason()); } + } else { + // If wallet is enabled, ensure that the wallet has been made aware + // of the new transaction prior to returning. This prevents a race + // where a user might call sendrawtransaction with a transaction + // to/from their wallet, immediately call some wallet RPC, and get + // a stale result because callbacks have not yet been processed. + CallFunctionInValidationInterfaceQueue([&promise] { + promise.set_value(); + }); } } else if (fHaveChain) { throw JSONRPCError(RPC_TRANSACTION_ALREADY_IN_CHAIN, "transaction already in block chain"); } + + } // cs_main + + promise.get_future().wait(); + if(!g_connman) throw JSONRPCError(RPC_CLIENT_P2P_DISABLED, "Error: Peer-to-peer functionality missing or disabled"); - CInv inv(MSG_TX, hashTx); + CInv inv(MSG_TX, tx->GetHash()); g_connman->ForEachNode([&inv](CNode* pnode) { pnode->PushInventory(inv); }); - return hashTx.GetHex(); + + return tx->GetHash().GetHex(); } static const CRPCCommand commands[] = diff --git a/src/sync.cpp b/src/sync.cpp index b82f3770e42..47ef9628d02 100644 --- a/src/sync.cpp +++ b/src/sync.cpp @@ -155,6 +155,16 @@ void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, abort(); } +void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) +{ + for (const std::pair& i : *lockstack) { + if (i.first == cs) { + fprintf(stderr, "Assertion failed: lock %s held in %s:%i; locks held:\n%s", pszName, pszFile, nLine, LocksHeld().c_str()); + abort(); + } + } +} + void DeleteLock(void* cs) { if (!lockdata.available) { diff --git a/src/sync.h b/src/sync.h index 4921aedf399..743452cf1a4 100644 --- a/src/sync.h +++ b/src/sync.h @@ -75,14 +75,17 @@ void EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs void LeaveCritical(); std::string LocksHeld(); void AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs); +void AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs); void DeleteLock(void* cs); #else void static inline EnterCritical(const char* pszName, const char* pszFile, int nLine, void* cs, bool fTry = false) {} void static inline LeaveCritical() {} void static inline AssertLockHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {} +void static inline AssertLockNotHeldInternal(const char* pszName, const char* pszFile, int nLine, void* cs) {} void static inline DeleteLock(void* cs) {} #endif #define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs) +#define AssertLockNotHeld(cs) AssertLockNotHeldInternal(#cs, __FILE__, __LINE__, &cs) /** * Wrapped boost mutex: supports recursive locking, but no waiting diff --git a/src/txmempool.h b/src/txmempool.h index 6723ea8e6ca..067359cf44e 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -516,6 +516,9 @@ class CTxMemPool // to track size/count of descendant transactions. First version of // addUnchecked can be used to have it call CalculateMemPoolAncestors(), and // then invoke the second version. + // Note that addUnchecked is ONLY called from ATMP outside of tests + // and any other callers may break wallet's in-mempool tracking (due to + // lack of CValidationInterface::TransactionAddedToMempool callbacks). bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, bool validFeeEstimate = true); bool addUnchecked(const uint256& hash, const CTxMemPoolEntry &entry, setEntries &setAncestors, bool validFeeEstimate = true); diff --git a/src/validation.cpp b/src/validation.cpp index 8bd23a0f1d7..80e88044753 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -2457,7 +2457,7 @@ bool ActivateBestChain(CValidationState &state, const CChainParams& chainparams, for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) { assert(trace.pblock && trace.pindex); - GetMainSignals().BlockConnected(trace.pblock, trace.pindex, *trace.conflictedTxs); + GetMainSignals().BlockConnected(trace.pblock, trace.pindex, trace.conflictedTxs); } } // When we reach this point, we switched to a new tip (stored in pindexNewTip). diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index bf20d606f83..d71600e82df 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -7,6 +7,7 @@ #include "init.h" #include "scheduler.h" #include "sync.h" +#include "txmempool.h" #include "util.h" #include @@ -19,6 +20,7 @@ struct MainSignalsInstance { boost::signals2::signal TransactionAddedToMempool; boost::signals2::signal &, const CBlockIndex *pindex, const std::vector&)> BlockConnected; boost::signals2::signal &)> BlockDisconnected; + boost::signals2::signal TransactionRemovedFromMempool; boost::signals2::signal SetBestChain; boost::signals2::signal Inventory; boost::signals2::signal Broadcast; @@ -48,6 +50,14 @@ void CMainSignals::FlushBackgroundCallbacks() { m_internals->m_schedulerClient.EmptyQueue(); } +void CMainSignals::RegisterWithMempoolSignals(CTxMemPool& pool) { + pool.NotifyEntryRemoved.connect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2)); +} + +void CMainSignals::UnregisterWithMempoolSignals(CTxMemPool& pool) { + pool.NotifyEntryRemoved.disconnect(boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2)); +} + CMainSignals& GetMainSignals() { return g_signals; @@ -58,6 +68,7 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn) { g_signals.m_internals->TransactionAddedToMempool.connect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1)); g_signals.m_internals->BlockConnected.connect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3)); g_signals.m_internals->BlockDisconnected.connect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1)); + g_signals.m_internals->TransactionRemovedFromMempool.connect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1)); g_signals.m_internals->SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1)); g_signals.m_internals->Inventory.connect(boost::bind(&CValidationInterface::Inventory, pwalletIn, _1)); g_signals.m_internals->Broadcast.connect(boost::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2)); @@ -73,6 +84,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) { g_signals.m_internals->TransactionAddedToMempool.disconnect(boost::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, _1)); g_signals.m_internals->BlockConnected.disconnect(boost::bind(&CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3)); g_signals.m_internals->BlockDisconnected.disconnect(boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1)); + g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind(&CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1)); g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3)); g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind(&CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); } @@ -85,32 +97,55 @@ void UnregisterAllValidationInterfaces() { g_signals.m_internals->TransactionAddedToMempool.disconnect_all_slots(); g_signals.m_internals->BlockConnected.disconnect_all_slots(); g_signals.m_internals->BlockDisconnected.disconnect_all_slots(); + g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots(); g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots(); g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots(); } +void CallFunctionInValidationInterfaceQueue(std::function func) { + g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func)); +} + +void CMainSignals::MempoolEntryRemoved(CTransactionRef ptx, MemPoolRemovalReason reason) { + if (reason != MemPoolRemovalReason::BLOCK && reason != MemPoolRemovalReason::CONFLICT) { + m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] { + m_internals->TransactionRemovedFromMempool(ptx); + }); + } +} + void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); } void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) { - m_internals->TransactionAddedToMempool(ptx); + m_internals->m_schedulerClient.AddToProcessQueue([ptx, this] { + m_internals->TransactionAddedToMempool(ptx); + }); } -void CMainSignals::BlockConnected(const std::shared_ptr &pblock, const CBlockIndex *pindex, const std::vector& vtxConflicted) { - m_internals->BlockConnected(pblock, pindex, vtxConflicted); +void CMainSignals::BlockConnected(const std::shared_ptr &pblock, const CBlockIndex *pindex, const std::shared_ptr>& pvtxConflicted) { + m_internals->m_schedulerClient.AddToProcessQueue([pblock, pindex, pvtxConflicted, this] { + m_internals->BlockConnected(pblock, pindex, *pvtxConflicted); + }); } void CMainSignals::BlockDisconnected(const std::shared_ptr &pblock) { - m_internals->BlockDisconnected(pblock); + m_internals->m_schedulerClient.AddToProcessQueue([pblock, this] { + m_internals->BlockDisconnected(pblock); + }); } void CMainSignals::SetBestChain(const CBlockLocator &locator) { - m_internals->SetBestChain(locator); + m_internals->m_schedulerClient.AddToProcessQueue([locator, this] { + m_internals->SetBestChain(locator); + }); } void CMainSignals::Inventory(const uint256 &hash) { - m_internals->Inventory(hash); + m_internals->m_schedulerClient.AddToProcessQueue([hash, this] { + m_internals->Inventory(hash); + }); } void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman* connman) { diff --git a/src/validationinterface.h b/src/validationinterface.h index d6da2bc1fd2..eeda3f91eb3 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -7,6 +7,7 @@ #define BITCOIN_VALIDATIONINTERFACE_H #include +#include #include "primitives/transaction.h" // CTransaction(Ref) @@ -20,6 +21,8 @@ class CValidationInterface; class CValidationState; class uint256; class CScheduler; +class CTxMemPool; +enum class MemPoolRemovalReason; // These functions dispatch to one or all registered wallets @@ -29,23 +32,57 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn); void UnregisterValidationInterface(CValidationInterface* pwalletIn); /** Unregister all wallets from core */ void UnregisterAllValidationInterfaces(); +/** + * Pushes a function to callback onto the notification queue, guaranteeing any + * callbacks generated prior to now are finished when the function is called. + */ +void CallFunctionInValidationInterfaceQueue(std::function func); class CValidationInterface { protected: /** Notifies listeners of updated block chain tip */ virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {} - /** Notifies listeners of a transaction having been added to mempool. */ + /** + * Notifies listeners of a transaction having been added to mempool. + * + * Called on a background thread. + */ virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {} /** + * Notifies listeners of a transaction leaving mempool. + * + * This only fires for transactions which leave mempool because of expiry, + * size limiting, reorg (changes in lock times/coinbase maturity), or + * replacement. This does not include any transactions which are included + * in BlockConnectedDisconnected either in block->vtx or in txnConflicted. + * + * Called on a background thread. + */ + virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {} + /** * Notifies listeners of a block being connected. * Provides a vector of transactions evicted from the mempool as a result. + * + * Called on a background thread. */ virtual void BlockConnected(const std::shared_ptr &block, const CBlockIndex *pindex, const std::vector &txnConflicted) {} - /** Notifies listeners of a block being disconnected */ + /** + * Notifies listeners of a block being disconnected + * + * Called on a background thread. + */ virtual void BlockDisconnected(const std::shared_ptr &block) {} - /** Notifies listeners of the new active block chain on-disk. */ + /** + * Notifies listeners of the new active block chain on-disk. + * + * Called on a background thread. + */ virtual void SetBestChain(const CBlockLocator &locator) {} - /** Notifies listeners about an inventory item being seen on the network. */ + /** + * Notifies listeners about an inventory item being seen on the network. + * + * Called on a background thread. + */ virtual void Inventory(const uint256 &hash) {} /** Tells listeners to broadcast their data. */ virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) {} @@ -73,6 +110,9 @@ class CMainSignals { friend void ::RegisterValidationInterface(CValidationInterface*); friend void ::UnregisterValidationInterface(CValidationInterface*); friend void ::UnregisterAllValidationInterfaces(); + friend void ::CallFunctionInValidationInterfaceQueue(std::function func); + + void MempoolEntryRemoved(CTransactionRef tx, MemPoolRemovalReason reason); public: /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */ @@ -82,9 +122,14 @@ class CMainSignals { /** Call any remaining callbacks on the calling thread */ void FlushBackgroundCallbacks(); + /** Register with mempool to call TransactionRemovedFromMempool callbacks */ + void RegisterWithMempoolSignals(CTxMemPool& pool); + /** Unregister with mempool */ + void UnregisterWithMempoolSignals(CTxMemPool& pool); + void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void TransactionAddedToMempool(const CTransactionRef &); - void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex, const std::vector &); + void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex, const std::shared_ptr> &); void BlockDisconnected(const std::shared_ptr &); void SetBestChain(const CBlockLocator &); void Inventory(const uint256 &); diff --git a/src/wallet/rpcwallet.cpp b/src/wallet/rpcwallet.cpp index a6176c3485e..09979059d30 100644 --- a/src/wallet/rpcwallet.cpp +++ b/src/wallet/rpcwallet.cpp @@ -449,6 +449,10 @@ UniValue sendtoaddress(const JSONRPCRequest& request) + HelpExampleRpc("sendtoaddress", "\"1M72Sfpbz1BPpXFHz9m3CdqATR44Jvaydd\", 0.1, \"donation\", \"seans outpost\"") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); CBitcoinAddress address(request.params[0].get_str()); @@ -525,6 +529,10 @@ UniValue listaddressgroupings(const JSONRPCRequest& request) + HelpExampleRpc("listaddressgroupings", "") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); UniValue jsonGroupings(UniValue::VARR); @@ -634,6 +642,10 @@ UniValue getreceivedbyaddress(const JSONRPCRequest& request) + HelpExampleRpc("getreceivedbyaddress", "\"1D1ZrZNe3JUo7ZycKEYQQiQAWd9y54F4XX\", 6") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); // Bitcoin address @@ -694,6 +706,10 @@ UniValue getreceivedbyaccount(const JSONRPCRequest& request) + HelpExampleRpc("getreceivedbyaccount", "\"tabby\", 6") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); // Minimum confirmations @@ -766,6 +782,10 @@ UniValue getbalance(const JSONRPCRequest& request) + HelpExampleRpc("getbalance", "\"*\", 6") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); if (request.params.size() == 0) @@ -797,6 +817,10 @@ UniValue getunconfirmedbalance(const JSONRPCRequest &request) "getunconfirmedbalance\n" "Returns the server's total unconfirmed balance\n"); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); return ValueFromAmount(pwallet->GetUnconfirmedBalance()); @@ -889,6 +913,10 @@ UniValue sendfrom(const JSONRPCRequest& request) + HelpExampleRpc("sendfrom", "\"tabby\", \"1M72Sfpbz1BPpXFHz9m3CdqATR44Jvaydd\", 0.01, 6, \"donation\", \"seans outpost\"") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); std::string strAccount = AccountFromValue(request.params[0]); @@ -972,6 +1000,10 @@ UniValue sendmany(const JSONRPCRequest& request) + HelpExampleRpc("sendmany", "\"\", \"{\\\"1D1ZrZNe3JUo7ZycKEYQQiQAWd9y54F4XX\\\":0.01,\\\"1353tsE8YMTA4EuV7dgUXGjNFf9KpVvKHz\\\":0.02}\", 6, \"testing\"") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); if (pwallet->GetBroadcastTransactions() && !g_connman) { @@ -1389,6 +1421,10 @@ UniValue listreceivedbyaddress(const JSONRPCRequest& request) + HelpExampleRpc("listreceivedbyaddress", "6, true, true") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); return ListReceived(pwallet, request.params, false); @@ -1428,6 +1464,10 @@ UniValue listreceivedbyaccount(const JSONRPCRequest& request) + HelpExampleRpc("listreceivedbyaccount", "6, true, true") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); return ListReceived(pwallet, request.params, true); @@ -1615,6 +1655,10 @@ UniValue listtransactions(const JSONRPCRequest& request) + HelpExampleRpc("listtransactions", "\"*\", 20, 100") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); std::string strAccount = "*"; @@ -1708,6 +1752,10 @@ UniValue listaccounts(const JSONRPCRequest& request) + HelpExampleRpc("listaccounts", "6") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); int nMinDepth = 1; @@ -1816,6 +1864,10 @@ UniValue listsinceblock(const JSONRPCRequest& request) + HelpExampleRpc("listsinceblock", "\"000000000000000bacf66f7497b7dc45ef753ee9a7d38571037cdb1a57f663ad\", 6") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); const CBlockIndex* pindex = nullptr; // Block index of the specified block or the common ancestor, if the block provided was in a deactivated chain. @@ -1946,6 +1998,10 @@ UniValue gettransaction(const JSONRPCRequest& request) + HelpExampleRpc("gettransaction", "\"1075db55d416d3ca199f55b6084e2115b9345e16c5cf302fc80e9d5fbf5d48d\"") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); uint256 hash; @@ -2006,6 +2062,10 @@ UniValue abandontransaction(const JSONRPCRequest& request) + HelpExampleRpc("abandontransaction", "\"1075db55d416d3ca199f55b6084e2115b9345e16c5cf302fc80e9d5fbf5d48d\"") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); uint256 hash; @@ -2040,6 +2100,10 @@ UniValue backupwallet(const JSONRPCRequest& request) + HelpExampleRpc("backupwallet", "\"backup.dat\"") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); std::string strDest = request.params[0].get_str(); @@ -2359,6 +2423,10 @@ UniValue lockunspent(const JSONRPCRequest& request) + HelpExampleRpc("lockunspent", "false, \"[{\\\"txid\\\":\\\"a08e6907dbbd3d809776dbfc5d82e371b764ed838b5655e72f463568df1aadf0\\\",\\\"vout\\\":1}]\"") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); if (request.params.size() == 1) @@ -2517,6 +2585,10 @@ UniValue getwalletinfo(const JSONRPCRequest& request) + HelpExampleRpc("getwalletinfo", "") ); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); UniValue obj(UniValue::VOBJ); @@ -2723,6 +2795,10 @@ UniValue listunspent(const JSONRPCRequest& request) nMaximumCount = options["maximumCount"].get_int64(); } + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + UniValue results(UniValue::VARR); std::vector vecOutputs; assert(pwallet != nullptr); @@ -2832,6 +2908,10 @@ UniValue fundrawtransaction(const JSONRPCRequest& request) RPCTypeCheck(request.params, {UniValue::VSTR}); + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + CCoinControl coinControl; int changePosition = -1; bool lockUnspents = false; @@ -3041,6 +3121,10 @@ UniValue bumpfee(const JSONRPCRequest& request) } } + // Make sure the results are valid at least up to the most recent block + // the user could have gotten from another RPC command prior to now + pwallet->BlockUntilSyncedToCurrentChain(); + LOCK2(cs_main, pwallet->cs_wallet); EnsureWalletIsUnlocked(pwallet); diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 599e74149c5..df80aa3f78c 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -32,6 +32,7 @@ #include "utilmoneystr.h" #include +#include #include #include @@ -1232,6 +1233,19 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, const CBlockIndex *pin void CWallet::TransactionAddedToMempool(const CTransactionRef& ptx) { LOCK2(cs_main, cs_wallet); SyncTransaction(ptx); + + auto it = mapWallet.find(ptx->GetHash()); + if (it != mapWallet.end()) { + it->second.fInMempool = true; + } +} + +void CWallet::TransactionRemovedFromMempool(const CTransactionRef &ptx) { + LOCK(cs_wallet); + auto it = mapWallet.find(ptx->GetHash()); + if (it != mapWallet.end()) { + it->second.fInMempool = false; + } } void CWallet::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex *pindex, const std::vector& vtxConflicted) { @@ -1246,10 +1260,14 @@ void CWallet::BlockConnected(const std::shared_ptr& pblock, const for (const CTransactionRef& ptx : vtxConflicted) { SyncTransaction(ptx); + TransactionRemovedFromMempool(ptx); } for (size_t i = 0; i < pblock->vtx.size(); i++) { SyncTransaction(pblock->vtx[i], pindex, i); + TransactionRemovedFromMempool(pblock->vtx[i]); } + + m_last_block_processed = pindex; } void CWallet::BlockDisconnected(const std::shared_ptr& pblock) { @@ -1262,6 +1280,29 @@ void CWallet::BlockDisconnected(const std::shared_ptr& pblock) { +void CWallet::BlockUntilSyncedToCurrentChain() { + AssertLockNotHeld(cs_main); + AssertLockNotHeld(cs_wallet); + + { + // Skip the queue-draining stuff if we know we're caught up with + // chainActive.Tip() + LOCK(cs_main); + const CBlockIndex* initialChainTip = chainActive.Tip(); + + if (m_last_block_processed->GetAncestor(initialChainTip->nHeight) == initialChainTip) { + return; + } + } + + std::promise promise; + CallFunctionInValidationInterfaceQueue([&promise] { + promise.set_value(); + }); + promise.get_future().wait(); +} + + isminetype CWallet::IsMine(const CTxIn &txin) const { { @@ -1643,6 +1684,8 @@ CBlockIndex* CWallet::ScanForWalletTransactions(CBlockIndex* pindexStart, bool f ShowProgress(_("Rescanning..."), 100); // hide progress dialog in GUI fScanningWallet = false; + + m_last_block_processed = chainActive.Tip(); } return ret; } @@ -1876,8 +1919,7 @@ CAmount CWalletTx::GetChange() const bool CWalletTx::InMempool() const { - LOCK(mempool.cs); - return mempool.exists(GetHash()); + return fInMempool; } bool CWalletTx::IsTrusted() const @@ -2992,14 +3034,18 @@ bool CWallet::CommitTransaction(CWalletTx& wtxNew, CReserveKey& reservekey, CCon // Track how many getdata requests our transaction gets mapRequestCount[wtxNew.GetHash()] = 0; + // Get the inserted-CWalletTx from mapWallet so that the + // fInMempool flag is cached properly + CWalletTx& wtx = mapWallet[wtxNew.GetHash()]; + if (fBroadcastTransactions) { // Broadcast - if (!wtxNew.AcceptToMemoryPool(maxTxFee, state)) { + if (!wtx.AcceptToMemoryPool(maxTxFee, state)) { LogPrintf("CommitTransaction(): Transaction cannot be broadcast immediately, %s\n", state.GetRejectReason()); // TODO: if we expect the failure to be long term or permanent, instead delete wtx from the wallet and return failure. } else { - wtxNew.RelayWalletTransaction(connman); + wtx.RelayWalletTransaction(connman); } } } @@ -4039,8 +4085,6 @@ CWallet* CWallet::CreateWalletFromFile(const std::string walletFile) LogPrintf(" wallet %15dms\n", GetTimeMillis() - nStart); - RegisterValidationInterface(walletInstance); - // Try to top up keypool. No-op if the wallet is locked. walletInstance->TopUpKeyPool(); @@ -4052,6 +4096,12 @@ CWallet* CWallet::CreateWalletFromFile(const std::string walletFile) if (walletdb.ReadBestBlock(locator)) pindexRescan = FindForkInGlobalIndex(chainActive, locator); } + + //We must set m_last_block_processed prior to registering the wallet as a validation interface + walletInstance->m_last_block_processed = pindexRescan; + + RegisterValidationInterface(walletInstance); + if (chainActive.Tip() && chainActive.Tip() != pindexRescan) { //We can't rescan beyond non-pruned blocks, stop and throw an error @@ -4333,7 +4383,14 @@ int CMerkleTx::GetBlocksToMaturity() const } -bool CMerkleTx::AcceptToMemoryPool(const CAmount& nAbsurdFee, CValidationState& state) +bool CWalletTx::AcceptToMemoryPool(const CAmount& nAbsurdFee, CValidationState& state) { - return ::AcceptToMemoryPool(mempool, state, tx, true, nullptr, nullptr, false, nAbsurdFee); + // We must set fInMempool here - while it will be re-set to true by the + // entered-mempool callback, if we did not there would be a race where a + // user could call sendmoney in a loop and hit spurious out of funds errors + // because we think that the transaction they just generated's change is + // unavailable as we're not yet aware its in mempool. + bool ret = ::AcceptToMemoryPool(mempool, state, tx, true, nullptr, nullptr, false, nAbsurdFee); + fInMempool = ret; + return ret; } diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index f97a99d82a1..d77f235ed20 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -252,8 +252,6 @@ class CMerkleTx int GetDepthInMainChain() const { const CBlockIndex *pindexRet; return GetDepthInMainChain(pindexRet); } bool IsInMainChain() const { const CBlockIndex *pindexRet; return GetDepthInMainChain(pindexRet) > 0; } int GetBlocksToMaturity() const; - /** Pass this transaction to the mempool. Fails if absolute fee exceeds absurd fee. */ - bool AcceptToMemoryPool(const CAmount& nAbsurdFee, CValidationState& state); bool hashUnset() const { return (hashBlock.IsNull() || hashBlock == ABANDON_HASH); } bool isAbandoned() const { return (hashBlock == ABANDON_HASH); } void setAbandoned() { hashBlock = ABANDON_HASH; } @@ -330,6 +328,7 @@ class CWalletTx : public CMerkleTx mutable bool fImmatureWatchCreditCached; mutable bool fAvailableWatchCreditCached; mutable bool fChangeCached; + mutable bool fInMempool; mutable CAmount nDebitCached; mutable CAmount nCreditCached; mutable CAmount nImmatureCreditCached; @@ -369,6 +368,7 @@ class CWalletTx : public CMerkleTx fImmatureWatchCreditCached = false; fAvailableWatchCreditCached = false; fChangeCached = false; + fInMempool = false; nDebitCached = 0; nCreditCached = 0; nImmatureCreditCached = 0; @@ -473,6 +473,9 @@ class CWalletTx : public CMerkleTx // RelayWalletTransaction may only be called if fBroadcastTransactions! bool RelayWalletTransaction(CConnman* connman); + /** Pass this transaction to the mempool. Fails if absolute fee exceeds absurd fee. */ + bool AcceptToMemoryPool(const CAmount& nAbsurdFee, CValidationState& state); + std::set GetConflicts() const; }; @@ -722,6 +725,14 @@ class CWallet : public CCryptoKeyStore, public CValidationInterface std::unique_ptr dbw; + /** + * The following are used to keep track of how far behind the wallet is + * from the chain sync, and to allow clients to block on us being caught up + * + * Protected by cs_main + */ + const CBlockIndex* m_last_block_processed; + public: /* * Main wallet lock. @@ -922,6 +933,7 @@ class CWallet : public CCryptoKeyStore, public CValidationInterface bool AddToWalletIfInvolvingMe(const CTransactionRef& tx, const CBlockIndex* pIndex, int posInBlock, bool fUpdate); int64_t RescanFromTime(int64_t startTime, bool update); CBlockIndex* ScanForWalletTransactions(CBlockIndex* pindexStart, bool fUpdate = false); + void TransactionRemovedFromMempool(const CTransactionRef &ptx) override; void ReacceptWalletTransactions(); void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) override; // ResendWalletTransactionsBefore may only be called if fBroadcastTransactions! @@ -1134,6 +1146,14 @@ class CWallet : public CCryptoKeyStore, public CValidationInterface caller must ensure the current wallet version is correct before calling this function). */ bool SetHDMasterKey(const CPubKey& key); + + /** + * Blocks until the wallet state is up-to-date to /at least/ the current + * chain at the time this function is entered + * Obviously holding cs_main/cs_wallet when going into this call may cause + * deadlock + */ + void BlockUntilSyncedToCurrentChain(); }; /** A key allocated from the key pool. */ diff --git a/test/functional/zmq_test.py b/test/functional/zmq_test.py index 26c946d215e..dc933ae181d 100755 --- a/test/functional/zmq_test.py +++ b/test/functional/zmq_test.py @@ -59,18 +59,27 @@ def _zmq_test(self): self.log.info("Wait for tx") msg = self.zmqSubSocket.recv_multipart() topic = msg[0] - assert_equal(topic, b"hashtx") body = msg[1] msgSequence = struct.unpack('