diff --git a/src/main/AppConnector.cpp b/src/main/AppConnector.cpp index 0ddaa6b6c..130918305 100644 --- a/src/main/AppConnector.cpp +++ b/src/main/AppConnector.cpp @@ -142,6 +142,12 @@ AppConnector::now() const return mApp.getClock().now(); } +VirtualClock::system_time_point +AppConnector::systemNow() const +{ + return mApp.getClock().system_now(); +} + bool AppConnector::shouldYield() const { diff --git a/src/main/AppConnector.h b/src/main/AppConnector.h index c4a320fa5..3511e0202 100644 --- a/src/main/AppConnector.h +++ b/src/main/AppConnector.h @@ -59,6 +59,7 @@ class AppConnector void postOnEvictionBackgroundThread(std::function&& f, std::string const& jobName); VirtualClock::time_point now() const; + VirtualClock::system_time_point systemNow() const; Config const& getConfig() const; rust::Box getModuleCache(); bool overlayShuttingDown() const; diff --git a/src/main/Config.cpp b/src/main/Config.cpp index 7f10981d3..8d7882ab2 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -77,7 +77,7 @@ static std::unordered_set const TESTING_ONLY_OPTIONS = { // Options that should only be used for testing static std::unordered_set const TESTING_SUGGESTED_OPTIONS = { - "ALLOW_LOCALHOST_FOR_TESTING"}; + "ALLOW_LOCALHOST_FOR_TESTING", "ALLOW_PRIVATE_ADDRESSES_FOR_TESTING"}; namespace { @@ -162,7 +162,7 @@ Config::Config() : NODE_SEED(SecretKey::random()) LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT = 18; OVERLAY_PROTOCOL_MIN_VERSION = 40; - OVERLAY_PROTOCOL_VERSION = 41; + OVERLAY_PROTOCOL_VERSION = 42; VERSION_STR = STELLAR_CORE_VERSION; @@ -199,6 +199,7 @@ Config::Config() : NODE_SEED(SecretKey::random()) std::chrono::seconds::zero(); ARTIFICIALLY_DELAY_LEDGER_CLOSE_FOR_TESTING = std::chrono::milliseconds(0); ALLOW_LOCALHOST_FOR_TESTING = false; + ALLOW_PRIVATE_ADDRESSES_FOR_TESTING = false; USE_CONFIG_FOR_GENESIS = false; GENESIS_TEST_ACCOUNT_COUNT = 0; FAILURE_SAFETY = -1; @@ -1315,6 +1316,10 @@ Config::processConfig(std::shared_ptr t) }}, {"ALLOW_LOCALHOST_FOR_TESTING", [&]() { ALLOW_LOCALHOST_FOR_TESTING = readBool(item); }}, + {"ALLOW_PRIVATE_ADDRESSES_FOR_TESTING", + [&]() { + ALLOW_PRIVATE_ADDRESSES_FOR_TESTING = readBool(item); + }}, {"PUBLISH_TO_ARCHIVE_DELAY", [&]() { PUBLISH_TO_ARCHIVE_DELAY = diff --git a/src/main/Config.h b/src/main/Config.h index f3dd0405b..78dbff6ba 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -575,6 +575,13 @@ class Config : public std::enable_shared_from_this // this should only be enabled when testing as it's a security issue bool ALLOW_LOCALHOST_FOR_TESTING; + // A config to allow gossiping (advertising and accepting in PEERS + // messages) and connecting to RFC1918 private addresses (10/8, 172.16/12, + // 192.168/16). Private addresses are normally filtered out of peer + // exchange, which disables gossip-based peer discovery in environments + // where every node has a private address (e.g. a Kubernetes pod network). + bool ALLOW_PRIVATE_ADDRESSES_FOR_TESTING; + // Set to use config file values for genesis ledger // not setable in config file - only tests are allowed to do this bool USE_CONFIG_FOR_GENESIS; diff --git a/src/main/PersistentState.cpp b/src/main/PersistentState.cpp index 02d24c5be..0e2012d33 100644 --- a/src/main/PersistentState.cpp +++ b/src/main/PersistentState.cpp @@ -23,10 +23,8 @@ std::string PersistentState::mainMapping[kLastEntryMain] = { "networkpassphrase", "rebuildledger"}; std::string PersistentState::miscMapping[kLastEntry] = { - "miscdatabaseschema", - "ledgerupgrades", - "lastscpdataxdr", - "txset", + "miscdatabaseschema", "ledgerupgrades", "lastscpdataxdr", "txset", + "quorumpeerinfo", }; std::string PersistentState::kSQLCreateStatement = diff --git a/src/main/PersistentState.h b/src/main/PersistentState.h index b87909c55..7f2065d1f 100644 --- a/src/main/PersistentState.h +++ b/src/main/PersistentState.h @@ -36,6 +36,7 @@ class PersistentState kLedgerUpgrades, kLastSCPDataXDR, kTxSet, + kQuorumPeerInfo, kLastEntry, }; diff --git a/src/overlay/OverlayManager.h b/src/overlay/OverlayManager.h index ad6b9c73d..8b3589333 100644 --- a/src/overlay/OverlayManager.h +++ b/src/overlay/OverlayManager.h @@ -50,6 +50,7 @@ namespace stellar class PeerAuth; class PeerBareAddress; class PeerManager; +class QuorumPeerState; class SurveyManager; struct StellarMessage; @@ -141,6 +142,8 @@ class OverlayManager virtual bool acceptAuthenticatedPeer(Peer::pointer peer) = 0; virtual bool isPreferred(Peer* peer) const = 0; + virtual bool isDirectQsetPeer(NodeID const& nodeID) const = 0; + virtual void recordProbedNonQsetAddress(PeerBareAddress const& address) = 0; virtual bool isPossiblyPreferred(std::string const& ip) const = 0; virtual bool haveSpaceForConnection(std::string const& ip) const = 0; @@ -189,6 +192,8 @@ class OverlayManager // Return the persistent peer manager virtual PeerManager& getPeerManager() = 0; + virtual QuorumPeerState& getQuorumPeerState() = 0; + virtual void persistQuorumPeerState() = 0; virtual SurveyManager& getSurveyManager() = 0; diff --git a/src/overlay/OverlayManagerImpl.cpp b/src/overlay/OverlayManagerImpl.cpp index e9c891973..a4f9a95c7 100644 --- a/src/overlay/OverlayManagerImpl.cpp +++ b/src/overlay/OverlayManagerImpl.cpp @@ -14,13 +14,16 @@ #include "main/Application.h" #include "main/Config.h" #include "main/ErrorMessages.h" +#include "main/PersistentState.h" #include "overlay/OverlayMetrics.h" #include "overlay/PeerBareAddress.h" #include "overlay/PeerManager.h" +#include "overlay/QuorumPeerState.h" #include "overlay/RandomPeerSource.h" #include "overlay/SurveyDataManager.h" #include "overlay/TCPPeer.h" #include "overlay/TxDemandsManager.h" +#include "scp/LocalNode.h" #include "util/GlobalChecks.h" #include "util/JitterInjection.h" #include "util/Logging.h" @@ -45,6 +48,7 @@ using namespace std; constexpr std::chrono::seconds PEER_IP_RESOLVE_DELAY(600); constexpr std::chrono::seconds PEER_IP_RESOLVE_RETRY_DELAY(10); constexpr std::chrono::seconds OUT_OF_SYNC_RECONNECT_DELAY(60); +constexpr std::chrono::seconds QUORUM_PEER_STALE_ADDRESS_TTL(24 * 60 * 60); constexpr uint32_t INITIAL_PEER_FLOOD_READING_CAPACITY_BYTES{300000}; constexpr uint32_t INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES{100000}; @@ -210,6 +214,13 @@ OverlayManagerImpl::PeersList::acceptAuthenticatedPeer(Peer::pointer peer) ZoneScoped; releaseAssert(threadIsMain()); + if (peer->isMutualQsetPeer()) + { + // Mutual direct-qset peers are operator-bounded by QUORUM_SET and must + // not consume ordinary inbound/outbound authenticated capacity. + return moveToAuthenticated(peer); + } + CLOG_TRACE(Overlay, "Trying to promote peer to authenticated {}", peer->toString()); if (mOverlayManager.isPreferred(peer.get())) @@ -331,6 +342,12 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app) mPeerManager, RandomPeerSource::nextAttemptCutoff(PeerType::OUTBOUND)); mPeerSources[PeerType::PREFERRED] = std::make_unique( mPeerManager, RandomPeerSource::nextAttemptCutoff(PeerType::PREFERRED)); + + LocalNode::forAllNodes(mApp.getConfig().QUORUM_SET, + [&](NodeID const& nodeID) { + mDirectQsetPeers.insert(nodeID); + return true; + }); } OverlayManagerImpl::~OverlayManagerImpl() @@ -342,6 +359,8 @@ OverlayManagerImpl::start() { mDoor.start(); mTimer.expires_from_now(std::chrono::seconds(2)); + reconcileQuorumPeerState(); + seedQuorumPeerAddresses(); if (!mApp.getConfig().RUN_STANDALONE) { @@ -359,6 +378,73 @@ OverlayManagerImpl::start() mTxDemandsManager.start(); } +void +OverlayManagerImpl::persistQuorumPeerState() +{ + mApp.getPersistentState().setMiscState(PersistentState::kQuorumPeerInfo, + mQuorumPeerState.toJson()); +} + +void +OverlayManagerImpl::reconcileQuorumPeerState() +{ + auto& session = mApp.getDatabase().getMiscSession(); + mQuorumPeerState = + QuorumPeerState::fromJson(mApp.getPersistentState().getState( + PersistentState::kQuorumPeerInfo, session)); + mQuorumPeerState.reconcile(mDirectQsetPeers); + persistQuorumPeerState(); +} + +void +OverlayManagerImpl::seedQuorumPeerAddresses() +{ + for (auto const& entry : mQuorumPeerState.getInfo()) + { + auto const& info = entry.second; + if (!info.address) + { + continue; + } + + if (info.remoteRole == RemoteQsetRole::Direct) + { + getPeerManager().update(*info.address, PeerType::PREFERRED, + /* preferredTypeKnown */ true, + PeerManager::BackOffUpdate::HARD_RESET); + } + else + { + getPeerManager().ensureExists(*info.address); + getPeerManager().update(*info.address, + PeerManager::BackOffUpdate::HARD_RESET); + } + } +} + +void +OverlayManagerImpl::expireStaleQuorumPeerAddresses() +{ + auto const now = static_cast( + VirtualClock::to_time_t(mApp.getClock().system_now())); + auto expired = mQuorumPeerState.expireStaleAddresses( + now, QUORUM_PEER_STALE_ADDRESS_TTL); + for (auto const& entry : expired) + { + auto const& info = entry.second; + if (info.address) + { + getPeerManager().update(*info.address, PeerType::OUTBOUND, + /* preferredTypeKnown */ true); + } + } + + if (!expired.empty()) + { + persistQuorumPeerState(); + } +} + uint32_t OverlayManagerImpl::getFlowControlBytesTotal() const { @@ -600,6 +686,70 @@ OverlayManagerImpl::connectTo(std::vector const& peers, return count; } +void +OverlayManagerImpl::connectToQsetPeers(int& availablePendingSlots) +{ + ZoneScoped; + releaseAssert(availablePendingSlots >= 0); + if (availablePendingSlots == 0) + { + return; + } + + auto missing = mDirectQsetPeers; + missing.erase(mApp.getConfig().NODE_SEED.getPublicKey()); + for (auto const& peer : getAuthenticatedPeers()) + { + missing.erase(peer.first); + } + for (auto it = missing.begin(); it != missing.end();) + { + auto info = mQuorumPeerState.getInfo(*it); + if (info && info->remoteRole == RemoteQsetRole::None) + { + it = missing.erase(it); + } + else + { + ++it; + } + } + + if (missing.empty()) + { + return; + } + + constexpr auto QSET_PROBE_BATCH_SIZE = 4; + std::vector candidates; + auto appendCandidates = [&](PeerType peerType) { + auto peers = getPeersToConnectTo(QSET_PROBE_BATCH_SIZE, peerType); + candidates.insert(std::end(candidates), std::begin(peers), + std::end(peers)); + }; + appendCandidates(PeerType::INBOUND); + appendCandidates(PeerType::OUTBOUND); + + std::set tried; + for (auto const& address : candidates) + { + if (availablePendingSlots == 0) + { + return; + } + if (mProbedNonQset.find(address) != std::end(mProbedNonQset) || + getConnectedPeer(address) || !tried.insert(address).second) + { + continue; + } + + if (connectToImpl(address, false)) + { + --availablePendingSlots; + } + } +} + void OverlayManagerImpl::updateTimerAndMaybeDropRandomPeer(bool shouldDrop) { @@ -689,6 +839,7 @@ OverlayManagerImpl::tick() cleanupPeers(mInboundPeers); cleanupPeers(mOutboundPeers); + expireStaleQuorumPeerAddresses(); if (futureIsReady(mResolvedPeers)) { @@ -787,6 +938,8 @@ OverlayManagerImpl::tick() availablePendingSlots -= pendingUsedByOutbound; } + connectToQsetPeers(availablePendingSlots); + // Finally, attempt to promote some inbound connections to outbound if (availablePendingSlots > 0) { @@ -820,10 +973,16 @@ OverlayManagerImpl::availableOutboundAuthenticatedSlots() const ? OverlayManager::MIN_INBOUND_FACTOR : mApp.getConfig().TARGET_PEER_CONNECTIONS; - if (mOutboundPeers.mAuthenticated.size() < adjustedTarget) + auto mutualQsetCount = std::count_if( + std::begin(mOutboundPeers.mAuthenticated), + std::end(mOutboundPeers.mAuthenticated), + [](auto const& peer) { return peer.second->isMutualQsetPeer(); }); + auto ordinaryOutboundCount = + mOutboundPeers.mAuthenticated.size() - mutualQsetCount; + + if (ordinaryOutboundCount < adjustedTarget) { - return static_cast(adjustedTarget - - mOutboundPeers.mAuthenticated.size()); + return static_cast(adjustedTarget - ordinaryOutboundCount); } else { @@ -1072,6 +1231,15 @@ OverlayManagerImpl::isPreferred(Peer* peer) const { std::string pstr = peer->toString(); + // A remote can only become a mutual direct-qset peer if its key is already + // in our configured QUORUM_SET, so eviction rights are bounded by the + // operator's own qset size. + if (peer->isMutualQsetPeer()) + { + CLOG_DEBUG(Overlay, "Peer {} is a mutual direct qset peer", pstr); + return true; + } + if (mConfigurationPreferredPeers.find(peer->getAddress()) != mConfigurationPreferredPeers.end()) { @@ -1096,6 +1264,20 @@ OverlayManagerImpl::isPreferred(Peer* peer) const return false; } +bool +OverlayManagerImpl::isDirectQsetPeer(NodeID const& nodeID) const +{ + return mDirectQsetPeers.count(nodeID) != 0; +} + +void +OverlayManagerImpl::recordProbedNonQsetAddress(PeerBareAddress const& address) +{ + releaseAssert(threadIsMain()); + releaseAssert(!address.isEmpty()); + mProbedNonQset.insert(address); +} + static xdr::opaque_array<32> const TX_BATCH_HASH = [] { xdr::opaque_array<32> bytes{}; for (auto& b : bytes) @@ -1317,6 +1499,12 @@ OverlayManagerImpl::getPeerManager() return mPeerManager; } +QuorumPeerState& +OverlayManagerImpl::getQuorumPeerState() +{ + return mQuorumPeerState; +} + SurveyManager& OverlayManagerImpl::getSurveyManager() { diff --git a/src/overlay/OverlayManagerImpl.h b/src/overlay/OverlayManagerImpl.h index 8eae245fa..d067317b8 100644 --- a/src/overlay/OverlayManagerImpl.h +++ b/src/overlay/OverlayManagerImpl.h @@ -14,6 +14,7 @@ #include "overlay/Floodgate.h" #include "overlay/OverlayManager.h" #include "overlay/OverlayMetrics.h" +#include "overlay/QuorumPeerState.h" #include "overlay/SurveyManager.h" #include "overlay/TxDemandsManager.h" #include "util/Timer.h" @@ -41,6 +42,12 @@ class OverlayManagerImpl : public OverlayManager protected: Application& mApp; std::set mConfigurationPreferredPeers; + std::set mDirectQsetPeers; + // Addresses that were proactively probed while searching for direct-qset + // peers and authenticated as non-qset peers. This is intentionally + // in-memory only; next-attempt backoff limits churn until restart. + std::set mProbedNonQset; + QuorumPeerState mQuorumPeerState; struct PeersList { @@ -126,10 +133,15 @@ class OverlayManagerImpl : public OverlayManager bool addOutboundConnection(Peer::pointer peer) override; void removePeer(Peer* peer) override; void storeConfigPeers(); + void reconcileQuorumPeerState(); + void seedQuorumPeerAddresses(); + void expireStaleQuorumPeerAddresses(); void purgeDeadPeers(); bool acceptAuthenticatedPeer(Peer::pointer peer) override; bool isPreferred(Peer* peer) const override; + bool isDirectQsetPeer(NodeID const& nodeID) const override; + void recordProbedNonQsetAddress(PeerBareAddress const& address) override; std::vector const& getInboundPendingPeers() const override; std::vector const& getOutboundPendingPeers() const override; std::vector getPendingPeers() const override; @@ -157,6 +169,8 @@ class OverlayManagerImpl : public OverlayManager PeerAuth& getPeerAuth() override; PeerManager& getPeerManager() override; + QuorumPeerState& getQuorumPeerState() override; + void persistQuorumPeerState() override; SurveyManager& getSurveyManager() override; @@ -199,6 +213,7 @@ class OverlayManagerImpl : public OverlayManager int connectTo(int maxNum, PeerType peerType); int connectTo(std::vector const& peers, bool forceoutbound); + void connectToQsetPeers(int& availablePendingSlots); std::vector getPeersToConnectTo(int maxNum, PeerType peerType); diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index a16122f00..1fc8243e0 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -23,6 +23,7 @@ #include "overlay/OverlayMetrics.h" #include "overlay/PeerAuth.h" #include "overlay/PeerManager.h" +#include "overlay/QuorumPeerState.h" #include "overlay/SurveyDataManager.h" #include "overlay/SurveyManager.h" #include "overlay/TxAdverts.h" @@ -42,7 +43,6 @@ #include #include -#include // LATER: need to add some way of docking peers that are misbehaving by sending // you bad data @@ -59,6 +59,7 @@ namespace // Maximum number of GET_SCP_STATE requests per window per peer to respond to. A // window defaults to roughly 1 minute. constexpr uint32_t GET_SCP_STATE_MAX_RATE = 10; +constexpr uint32_t QUORUM_PEERING_OVERLAY_VERSION = 42; // Check the signature(s) in `tx`, adding the result to the signature cache in // the process. This function requires that background signature verification @@ -544,7 +545,15 @@ Peer::sendAuth() ZoneScoped; StellarMessage msg; msg.type(AUTH); - msg.auth().flags = AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED; + auto flags = AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED; + if (mAppConnector.getConfig().OVERLAY_PROTOCOL_VERSION >= + QUORUM_PEERING_OVERLAY_VERSION && + mRemoteOverlayVersion >= QUORUM_PEERING_OVERLAY_VERSION && + mAppConnector.getOverlayManager().isDirectQsetPeer(mPeerID)) + { + flags |= AUTH_MSG_FLAG_PEER_IN_QUORUM; + } + msg.auth().flags = flags; auto msgPtr = std::make_shared(msg); sendMessage(msgPtr); } @@ -1772,6 +1781,32 @@ Peer::updatePeerRecordAfterAuthentication() getAddress(), PeerManager::BackOffUpdate::RESET); } + auto& overlayManager = mAppConnector.getOverlayManager(); + if (overlayManager.isDirectQsetPeer(mPeerID)) + { + auto const remoteRole = + mRemoteOverlayVersion < QUORUM_PEERING_OVERLAY_VERSION + ? RemoteQsetRole::Unknown + : (mPeerHasUsInQset ? RemoteQsetRole::Direct + : RemoteQsetRole::None); + auto const now = static_cast( + VirtualClock::to_time_t(mAppConnector.systemNow())); + overlayManager.getQuorumPeerState().recordHandshake(mPeerID, remoteRole, + getAddress(), now); + overlayManager.persistQuorumPeerState(); + + if (mIsMutualQsetPeer) + { + overlayManager.getPeerManager().update( + getAddress(), PeerType::PREFERRED, + /* preferredTypeKnown */ true); + } + } + else + { + overlayManager.recordProbedNonQsetAddress(getAddress()); + } + CLOG_DEBUG(Overlay, "successful handshake with {}@{}", mAppConnector.getConfig().toShortString(mPeerID), toString()); } @@ -1949,12 +1984,22 @@ Peer::recvAuth(StellarMessage const& msg) sendPeers(); } - if (msg.auth().flags != AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED) + if ((msg.auth().flags & AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED) != + AUTH_MSG_FLAG_FLOW_CONTROL_BYTES_REQUESTED) { sendErrorAndDrop(ERR_CONF, "flow control bytes disabled"); return; } + if (mRemoteOverlayVersion >= QUORUM_PEERING_OVERLAY_VERSION) + { + mPeerHasUsInQset = + (msg.auth().flags & AUTH_MSG_FLAG_PEER_IN_QUORUM) != 0; + mIsMutualQsetPeer = + mPeerHasUsInQset && + mAppConnector.getOverlayManager().isDirectQsetPeer(mPeerID); + } + updatePeerRecordAfterAuthentication(); auto self = shared_from_this(); @@ -2018,7 +2063,8 @@ Peer::recvPeers(StellarMessage const& msg) releaseAssert(peer.ip.type() == IPv4); auto address = PeerBareAddress{peer}; - if (address.isPrivate()) + if (address.isPrivate() && + !mAppConnector.getConfig().ALLOW_PRIVATE_ADDRESSES_FOR_TESTING) { CLOG_DEBUG(Overlay, "ignoring received private address {}", address.toString()); diff --git a/src/overlay/Peer.h b/src/overlay/Peer.h index 3435338fe..31bfd3810 100644 --- a/src/overlay/Peer.h +++ b/src/overlay/Peer.h @@ -276,6 +276,8 @@ class Peer : public std::enable_shared_from_this, QueryInfo mTxSetQueryInfo; QueryInfo mSCPStateQueryInfo; bool mPeersReceived{false}; + bool mPeerHasUsInQset{false}; + bool mIsMutualQsetPeer{false}; static Hash pingIDfromTimePoint(VirtualClock::time_point const& tp); void pingPeer(); @@ -419,6 +421,13 @@ class Peer : public std::enable_shared_from_this, return mPeerID; } + bool + isMutualQsetPeer() const + { + releaseAssert(threadIsMain()); + return mIsMutualQsetPeer; + } + std::string const& toString(); void startExecutionDelayedTimer( diff --git a/src/overlay/PeerManager.cpp b/src/overlay/PeerManager.cpp index f204a5beb..09cc0f551 100644 --- a/src/overlay/PeerManager.cpp +++ b/src/overlay/PeerManager.cpp @@ -174,7 +174,8 @@ PeerManager::removePeersWithManyFailures(size_t minNumFailures, { auto& db = mApp.getDatabase(); auto sql = std::string{ - "DELETE FROM peers WHERE numfailures >= :minNumFailures"}; + "DELETE FROM peers WHERE numfailures >= :minNumFailures " + "AND type != :preferredType"}; if (address) { sql += " AND ip = :ip"; @@ -185,6 +186,8 @@ PeerManager::removePeersWithManyFailures(size_t minNumFailures, auto& st = prep.statement(); st.exchange(use(minNumFailures)); + auto preferredType = static_cast(PeerType::PREFERRED); + st.exchange(use(preferredType)); std::string ip; if (address) @@ -211,8 +214,10 @@ std::vector PeerManager::getPeersToSend(size_t size, PeerBareAddress const& address) { ZoneScoped; + bool const allowPrivate = + mApp.getConfig().ALLOW_PRIVATE_ADDRESSES_FOR_TESTING; auto keep = [&](PeerBareAddress const& pba) { - return !pba.isPrivate() && pba != address; + return (allowPrivate || !pba.isPrivate()) && pba != address; }; auto peers = mOutboundPeersToSend->getRandomPeers(size, keep); @@ -363,13 +368,12 @@ namespace { static std::chrono::seconds -computeBackoff(size_t numFailures) +computeBackoff(size_t numFailures, PeerType peerType) { constexpr uint32 const SECONDS_PER_BACKOFF = 10; - constexpr size_t const MAX_BACKOFF_EXPONENT = 10; - uint32 backoffCount = static_cast( - std::min(MAX_BACKOFF_EXPONENT, numFailures)); + uint32 backoffCount = static_cast(std::min( + PeerManager::maxBackoffExponent(peerType), numFailures)); auto nsecs = std::chrono::seconds(static_cast(getGlobalRandomEngine()()) % ((1u << backoffCount) * SECONDS_PER_BACKOFF) + @@ -378,6 +382,12 @@ computeBackoff(size_t numFailures) } } +size_t +PeerManager::maxBackoffExponent(PeerType peerType) +{ + return peerType == PeerType::PREFERRED ? 3 : 10; +} + void PeerManager::update(PeerRecord& peer, BackOffUpdate backOff, Application& app) { @@ -395,8 +405,9 @@ PeerManager::update(PeerRecord& peer, BackOffUpdate backOff, Application& app) { peer.mNumFailures = backOff == BackOffUpdate::RESET ? 0 : peer.mNumFailures + 1; - auto nextAttempt = - app.getClock().system_now() + computeBackoff(peer.mNumFailures); + auto peerType = static_cast(peer.mType); + auto nextAttempt = app.getClock().system_now() + + computeBackoff(peer.mNumFailures, peerType); peer.mNextAttempt = VirtualClock::systemPointToTm(nextAttempt); break; } diff --git a/src/overlay/PeerManager.h b/src/overlay/PeerManager.h index 67fb686b2..24fee8d90 100644 --- a/src/overlay/PeerManager.h +++ b/src/overlay/PeerManager.h @@ -78,6 +78,7 @@ class PeerManager }; static void maybeDropAndCreateNew(SessionWrapper& sess); + static size_t maxBackoffExponent(PeerType peerType); explicit PeerManager(Application& app); diff --git a/src/overlay/QuorumPeerState.cpp b/src/overlay/QuorumPeerState.cpp new file mode 100644 index 000000000..c22ebd2e5 --- /dev/null +++ b/src/overlay/QuorumPeerState.cpp @@ -0,0 +1,228 @@ +// Copyright 2026 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "overlay/QuorumPeerState.h" + +#include "crypto/KeyUtils.h" +#include "lib/json/json.h" + +#include +#include +#include +#include + +namespace stellar +{ + +namespace +{ + +std::string +roleToString(RemoteQsetRole role) +{ + switch (role) + { + case RemoteQsetRole::Unknown: + return "unknown"; + case RemoteQsetRole::None: + return "none"; + case RemoteQsetRole::Direct: + return "direct"; + default: + throw std::runtime_error("invalid remote qset role"); + } +} + +RemoteQsetRole +roleFromString(std::string const& role) +{ + if (role == "unknown") + { + return RemoteQsetRole::Unknown; + } + if (role == "none") + { + return RemoteQsetRole::None; + } + if (role == "direct") + { + return RemoteQsetRole::Direct; + } + throw std::runtime_error("invalid remote qset role"); +} + +std::optional +addressFromString(std::string const& address) +{ + static std::regex re( + "^(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})\\:(\\d{1,5})$"); + std::smatch m; + if (!std::regex_search(address, m, re) || m.empty()) + { + return std::nullopt; + } + + int parsedPort = atoi(m[2].str().c_str()); + if (parsedPort <= 0 || parsedPort > UINT16_MAX) + { + return std::nullopt; + } + + try + { + return PeerBareAddress{m[1].str(), + static_cast(parsedPort)}; + } + catch (...) + { + return std::nullopt; + } +} + +} + +void +QuorumPeerState::reconcile(std::set const& directQset) +{ + for (auto it = mInfo.begin(); it != mInfo.end();) + { + if (directQset.count(it->first) == 0) + { + it = mInfo.erase(it); + } + else + { + ++it; + } + } + + for (auto const& nodeID : directQset) + { + mInfo.emplace(nodeID, QuorumPeerInfo{}); + } +} + +void +QuorumPeerState::recordHandshake(NodeID const& nodeID, + RemoteQsetRole remoteRole, + PeerBareAddress const& address, + uint64_t nowSecs) +{ + auto& info = mInfo[nodeID]; + info.remoteRole = remoteRole; + info.address = address; + info.lastConnection = nowSecs; +} + +std::vector> +QuorumPeerState::expireStaleAddresses(uint64_t nowSecs, + std::chrono::seconds ttl) +{ + std::vector> expired; + auto const ttlSecs = static_cast(ttl.count()); + + for (auto& [nodeID, info] : mInfo) + { + if ((info.address || info.remoteRole != RemoteQsetRole::Unknown) && + info.lastConnection != 0 && nowSecs > info.lastConnection && + nowSecs - info.lastConnection > ttlSecs) + { + expired.emplace_back(nodeID, info); + info.remoteRole = RemoteQsetRole::Unknown; + info.address.reset(); + } + } + + return expired; +} + +std::string +QuorumPeerState::toJson() const +{ + Json::Value root; + root["peers"] = Json::arrayValue; + + std::vector nodeIDs; + nodeIDs.reserve(mInfo.size()); + for (auto const& [nodeID, _] : mInfo) + { + nodeIDs.push_back(nodeID); + } + std::sort(nodeIDs.begin(), nodeIDs.end()); + + for (auto const& nodeID : nodeIDs) + { + auto const& info = mInfo.at(nodeID); + Json::Value peer; + peer["nodeID"] = KeyUtils::toStrKey(nodeID); + peer["remoteRole"] = roleToString(info.remoteRole); + peer["lastConnection"] = static_cast(info.lastConnection); + if (info.address) + { + peer["address"] = info.address->toString(); + } + root["peers"].append(peer); + } + + return Json::FastWriter().write(root); +} + +QuorumPeerState +QuorumPeerState::fromJson(std::string const& json) +{ + QuorumPeerState state; + if (json.empty()) + { + return state; + } + + Json::Value root; + Json::Reader reader; + if (!reader.parse(json, root) || !root.isObject() || + !root["peers"].isArray()) + { + return state; + } + + for (auto const& peer : root["peers"]) + { + try + { + auto nodeID = + KeyUtils::fromStrKey(peer["nodeID"].asString()); + QuorumPeerInfo info; + info.remoteRole = roleFromString(peer["remoteRole"].asString()); + info.lastConnection = peer["lastConnection"].asUInt64(); + if (peer.isMember("address")) + { + info.address = addressFromString(peer["address"].asString()); + } + state.mInfo[nodeID] = info; + } + catch (...) + { + } + } + + return state; +} + +UnorderedMap const& +QuorumPeerState::getInfo() const +{ + return mInfo; +} + +std::optional +QuorumPeerState::getInfo(NodeID const& nodeID) const +{ + auto it = mInfo.find(nodeID); + if (it == mInfo.end()) + { + return std::nullopt; + } + return it->second; +} + +} diff --git a/src/overlay/QuorumPeerState.h b/src/overlay/QuorumPeerState.h new file mode 100644 index 000000000..1da8a39de --- /dev/null +++ b/src/overlay/QuorumPeerState.h @@ -0,0 +1,52 @@ +// Copyright 2026 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#pragma once + +#include "overlay/PeerBareAddress.h" +#include "util/UnorderedMap.h" +#include "xdr/Stellar-types.h" + +#include +#include +#include +#include +#include + +namespace stellar +{ + +enum class RemoteQsetRole : uint32_t +{ + Unknown = 0, + None = 1, + Direct = 2 +}; + +struct QuorumPeerInfo +{ + RemoteQsetRole remoteRole{RemoteQsetRole::Unknown}; + std::optional address; + uint64_t lastConnection{0}; +}; + +class QuorumPeerState +{ + UnorderedMap mInfo; + + public: + void reconcile(std::set const& directQset); + void recordHandshake(NodeID const& nodeID, RemoteQsetRole remoteRole, + PeerBareAddress const& address, uint64_t nowSecs); + std::vector> + expireStaleAddresses(uint64_t nowSecs, std::chrono::seconds ttl); + + std::string toJson() const; + static QuorumPeerState fromJson(std::string const& json); + + UnorderedMap const& getInfo() const; + std::optional getInfo(NodeID const& nodeID) const; +}; + +} diff --git a/src/overlay/test/OverlayManagerTests.cpp b/src/overlay/test/OverlayManagerTests.cpp index d339d7bd2..ccd4f7af8 100644 --- a/src/overlay/test/OverlayManagerTests.cpp +++ b/src/overlay/test/OverlayManagerTests.cpp @@ -34,8 +34,9 @@ class PeerStub : public Peer { public: int mSent = 0; - PeerStub(Application& app, PeerBareAddress const& address) - : Peer(app, WE_CALLED_REMOTE) + PeerStub(Application& app, PeerBareAddress const& address, + Peer::PeerRole role = WE_CALLED_REMOTE) + : Peer(app, role) { mPeerID = SecretKey::pseudoRandomForTesting().getPublicKey(); mState = GOT_AUTH; @@ -61,6 +62,18 @@ class PeerStub : public Peer { } + void + setPeerID(NodeID const& nodeID) + { + mPeerID = nodeID; + } + + void + setMutualQsetPeer(bool isMutualQsetPeer) + { + mIsMutualQsetPeer = isMutualQsetPeer; + } + void setPullMode() { @@ -79,6 +92,10 @@ class PeerStub : public Peer class OverlayManagerStub : public OverlayManagerImpl { public: + std::map mPeerIDs; + std::set mMutualQsetAddresses; + std::vector mConnectionAttempts; + OverlayManagerStub(Application& app) : OverlayManagerImpl(app) { } @@ -86,6 +103,7 @@ class OverlayManagerStub : public OverlayManagerImpl virtual bool connectToImpl(PeerBareAddress const& address, bool) override { + mConnectionAttempts.push_back(address); if (getConnectedPeer(address)) { return false; @@ -94,10 +112,70 @@ class OverlayManagerStub : public OverlayManagerImpl getPeerManager().update(address, PeerManager::BackOffUpdate::INCREASE); auto peerStub = std::make_shared(mApp, address); + auto idIt = mPeerIDs.find(address); + if (idIt != std::end(mPeerIDs)) + { + peerStub->setPeerID(idIt->second); + } + auto isDirectQset = isDirectQsetPeer(peerStub->getPeerID()); + peerStub->setMutualQsetPeer(isDirectQset && + mMutualQsetAddresses.find(address) != + std::end(mMutualQsetAddresses)); + if (!isDirectQset) + { + recordProbedNonQsetAddress(address); + } peerStub->setPullMode(); REQUIRE(addOutboundConnection(peerStub)); return acceptAuthenticatedPeer(peerStub); } + + Peer::pointer + authenticatePeer(PeerBareAddress const& address, Peer::PeerRole role, + bool isMutualQsetPeer = false, + NodeID const* nodeID = nullptr) + { + auto peerStub = std::make_shared(mApp, address, role); + if (nodeID) + { + peerStub->setPeerID(*nodeID); + } + peerStub->setMutualQsetPeer(isMutualQsetPeer); + if (role == Peer::WE_CALLED_REMOTE) + { + REQUIRE(addOutboundConnection(peerStub)); + } + else + { + maybeAddInboundConnection(peerStub); + } + REQUIRE(acceptAuthenticatedPeer(peerStub)); + return peerStub; + } + + void + addDirectQsetPeerForTesting(NodeID const& nodeID) + { + mDirectQsetPeers.insert(nodeID); + } + + void + setPeerIDForAddress(PeerBareAddress const& address, NodeID const& nodeID) + { + mPeerIDs[address] = nodeID; + } + + void + setMutualQsetAddress(PeerBareAddress const& address) + { + mMutualQsetAddresses.insert(address); + } + + bool + wasProbedNonQset(PeerBareAddress const& address) const + { + return mProbedNonQset.find(address) != std::end(mProbedNonQset); + } }; class OverlayManagerTests @@ -306,6 +384,141 @@ class OverlayManagerTests std::vector expectedFinal{2, 2, 1, 2, 2}; REQUIRE(sentCounts(pm) == expectedFinal); } + + NodeID + randomNodeID() + { + return SecretKey::pseudoRandomForTesting().getPublicKey(); + } + + PeerBareAddress + localhost(unsigned short port) + { + return PeerBareAddress{"127.0.0.1", port}; + } + + void + testQsetPeerAuthenticatedCapExemption() + { + OverlayManagerStub& pm = app->getOverlayManager(); + + for (auto i = 0; i < app->getConfig().TARGET_PEER_CONNECTIONS; ++i) + { + pm.authenticatePeer(localhost(3000 + i), Peer::WE_CALLED_REMOTE); + } + REQUIRE(pm.mOutboundPeers.mAuthenticated.size() == + app->getConfig().TARGET_PEER_CONNECTIONS); + + pm.authenticatePeer(localhost(4000), Peer::WE_CALLED_REMOTE, + /* isMutualQsetPeer */ true); + REQUIRE(pm.mOutboundPeers.mAuthenticated.size() == + app->getConfig().TARGET_PEER_CONNECTIONS + 1); + + for (auto i = 0; i < app->getConfig().MAX_ADDITIONAL_PEER_CONNECTIONS; + ++i) + { + pm.authenticatePeer(localhost(5000 + i), Peer::REMOTE_CALLED_US); + } + REQUIRE(pm.mInboundPeers.mAuthenticated.size() == + app->getConfig().MAX_ADDITIONAL_PEER_CONNECTIONS); + + pm.authenticatePeer(localhost(6000), Peer::REMOTE_CALLED_US, + /* isMutualQsetPeer */ true); + REQUIRE(pm.mInboundPeers.mAuthenticated.size() == + app->getConfig().MAX_ADDITIONAL_PEER_CONNECTIONS + 1); + } + + void + testQsetPeersDoNotConsumeOutboundSlots() + { + OverlayManagerStub& pm = app->getOverlayManager(); + auto target = app->getConfig().TARGET_PEER_CONNECTIONS; + + for (auto i = 0; i < target; ++i) + { + pm.authenticatePeer(localhost(7000 + i), Peer::WE_CALLED_REMOTE); + } + REQUIRE(pm.availableOutboundAuthenticatedSlots() == 0); + REQUIRE(pm.nonPreferredAuthenticatedCount() == target); + + for (auto i = 0; i < target + 3; ++i) + { + pm.authenticatePeer(localhost(8000 + i), Peer::WE_CALLED_REMOTE, + /* isMutualQsetPeer */ true); + } + REQUIRE(pm.availableOutboundAuthenticatedSlots() == 0); + REQUIRE(pm.nonPreferredAuthenticatedCount() == target); + REQUIRE(pm.mOutboundPeers.mAuthenticated.size() == + static_cast(target * 2 + 3)); + } + + void + testQsetPeerDiscoveryProbesInboundCandidates() + { + OverlayManagerStub& pm = app->getOverlayManager(); + + auto qsetNode = randomNodeID(); + auto qsetAddress = localhost(9000); + auto nonQsetAddress = localhost(9001); + pm.addDirectQsetPeerForTesting(qsetNode); + pm.setPeerIDForAddress(qsetAddress, qsetNode); + pm.setMutualQsetAddress(qsetAddress); + pm.getPeerManager().ensureExists(qsetAddress); + pm.getPeerManager().ensureExists(nonQsetAddress); + + int availablePendingSlots = 10; + pm.connectToQsetPeers(availablePendingSlots); + + REQUIRE(pm.getAuthenticatedPeers().find(qsetNode) != + std::end(pm.getAuthenticatedPeers())); + REQUIRE(pm.mOutboundPeers.mAuthenticated.size() >= 1); + REQUIRE(availablePendingSlots < 10); + + auto attemptsAfterFirstProbe = pm.mConnectionAttempts.size(); + pm.connectToQsetPeers(availablePendingSlots); + REQUIRE(pm.mConnectionAttempts.size() == attemptsAfterFirstProbe); + } + + void + testNonQsetProbesAreRecorded() + { + OverlayManagerStub& pm = app->getOverlayManager(); + + auto missingQsetNode = randomNodeID(); + auto nonQsetAddress = localhost(9100); + pm.addDirectQsetPeerForTesting(missingQsetNode); + pm.getPeerManager().ensureExists(nonQsetAddress); + + int availablePendingSlots = 10; + pm.connectToQsetPeers(availablePendingSlots); + + REQUIRE(pm.wasProbedNonQset(nonQsetAddress)); + + auto attemptsAfterFirstProbe = pm.mConnectionAttempts.size(); + pm.connectToQsetPeers(availablePendingSlots); + REQUIRE(pm.mConnectionAttempts.size() == attemptsAfterFirstProbe); + } + + void + testKnownNonMutualQsetPeersAreNotProbed() + { + OverlayManagerStub& pm = app->getOverlayManager(); + + auto nonMutualQsetNode = randomNodeID(); + auto nonMutualQsetAddress = localhost(9200); + pm.addDirectQsetPeerForTesting(nonMutualQsetNode); + pm.getQuorumPeerState().recordHandshake( + nonMutualQsetNode, RemoteQsetRole::None, nonMutualQsetAddress, + static_cast( + VirtualClock::to_time_t(app->getClock().system_now()))); + pm.getPeerManager().ensureExists(nonMutualQsetAddress); + + int availablePendingSlots = 10; + pm.connectToQsetPeers(availablePendingSlots); + + REQUIRE(pm.mConnectionAttempts.empty()); + REQUIRE(availablePendingSlots == 10); + } }; TEST_CASE_METHOD(OverlayManagerTests, "storeConfigPeers() adds", "[overlay]") @@ -329,4 +542,39 @@ TEST_CASE_METHOD(OverlayManagerTests, "broadcast() broadcasts", "[overlay]") { testBroadcast(); } + +TEST_CASE_METHOD(OverlayManagerTests, + "mutual qset peers bypass authenticated caps", + "[overlay][OverlayManager]") +{ + testQsetPeerAuthenticatedCapExemption(); +} + +TEST_CASE_METHOD(OverlayManagerTests, + "mutual qset peers do not consume outbound slots", + "[overlay][OverlayManager]") +{ + testQsetPeersDoNotConsumeOutboundSlots(); +} + +TEST_CASE_METHOD(OverlayManagerTests, + "connectToQsetPeers probes inbound candidates", + "[overlay][OverlayManager]") +{ + testQsetPeerDiscoveryProbesInboundCandidates(); +} + +TEST_CASE_METHOD(OverlayManagerTests, + "connectToQsetPeers records non-qset probes", + "[overlay][OverlayManager]") +{ + testNonQsetProbesAreRecorded(); +} + +TEST_CASE_METHOD(OverlayManagerTests, + "connectToQsetPeers skips known non-mutual qset peers", + "[overlay][OverlayManager]") +{ + testKnownNonMutualQsetPeersAreNotProbed(); +} } diff --git a/src/overlay/test/OverlayTests.cpp b/src/overlay/test/OverlayTests.cpp index 587bb5298..4fb9f3bda 100644 --- a/src/overlay/test/OverlayTests.cpp +++ b/src/overlay/test/OverlayTests.cpp @@ -97,6 +97,164 @@ TEST_CASE("loopback peer hello", "[overlay][connections]") testutil::shutdownWorkScheduler(*app1); } +static void +addDirectQsetPeer(Config& cfg, Config const& peerCfg) +{ + cfg.QUORUM_SET.validators.push_back(peerCfg.NODE_SEED.getPublicKey()); +} + +TEST_CASE("mutual direct qset peers are preferred", "[overlay][connections]") +{ + VirtualClock clock; + auto cfg1 = getTestConfig(0); + auto cfg2 = getTestConfig(1); + addDirectQsetPeer(cfg1, cfg2); + addDirectQsetPeer(cfg2, cfg1); + + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); + + LoopbackPeerConnection conn(*app1, *app2); + testutil::crankSome(clock); + + REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); + REQUIRE(knowsAsPreferred(*app1, *app2)); + REQUIRE(knowsAsPreferred(*app2, *app1)); + + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); +} + +TEST_CASE("asymmetric direct qset peers are not preferred", + "[overlay][connections]") +{ + VirtualClock clock; + auto cfg1 = getTestConfig(0); + auto cfg2 = getTestConfig(1); + addDirectQsetPeer(cfg1, cfg2); + + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); + + LoopbackPeerConnection conn(*app1, *app2); + testutil::crankSome(clock); + + REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); + REQUIRE(knowsAsOutbound(*app1, *app2)); + REQUIRE(knowsAsInbound(*app2, *app1)); + + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); +} + +TEST_CASE("mixed overlay versions do not prefer direct qset peers", + "[overlay][connections]") +{ + VirtualClock clock; + auto cfg1 = getTestConfig(0); + auto cfg2 = getTestConfig(1); + cfg2.OVERLAY_PROTOCOL_VERSION = 41; + addDirectQsetPeer(cfg1, cfg2); + addDirectQsetPeer(cfg2, cfg1); + + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); + + LoopbackPeerConnection conn(*app1, *app2); + testutil::crankSome(clock); + + REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); + REQUIRE(knowsAsOutbound(*app1, *app2)); + REQUIRE(knowsAsInbound(*app2, *app1)); + + auto info = app1->getOverlayManager().getQuorumPeerState().getInfo( + cfg2.NODE_SEED.getPublicKey()); + REQUIRE(info); + REQUIRE(info->remoteRole == RemoteQsetRole::Unknown); + + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); +} + +TEST_CASE("mutual direct qset peer coexists with watcher beyond cap", + "[overlay][connections]") +{ + VirtualClock clock; + auto cfg1 = getTestConfig(0); + auto cfg2 = getTestConfig(1); + auto cfg3 = getTestConfig(2); + cfg2.MAX_ADDITIONAL_PEER_CONNECTIONS = 1; + cfg2.TARGET_PEER_CONNECTIONS = 0; + addDirectQsetPeer(cfg2, cfg3); + addDirectQsetPeer(cfg3, cfg2); + + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); + auto app3 = createTestApplication(clock, cfg3); + + LoopbackPeerConnection watcher(*app1, *app2); + testutil::crankSome(clock); + REQUIRE(watcher.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(watcher.getAcceptor()->isAuthenticatedForTesting()); + + LoopbackPeerConnection qsetPeer(*app3, *app2); + testutil::crankSome(clock); + + REQUIRE(watcher.getInitiator()->isConnectedForTesting()); + REQUIRE(watcher.getAcceptor()->isConnectedForTesting()); + REQUIRE(qsetPeer.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(qsetPeer.getAcceptor()->isAuthenticatedForTesting()); + REQUIRE(app2->getOverlayManager().getInboundAuthenticatedPeers().size() > + cfg2.MAX_ADDITIONAL_PEER_CONNECTIONS); + REQUIRE(knowsAsPreferred(*app2, *app3)); + + testutil::shutdownWorkScheduler(*app3); + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); +} + +TEST_CASE("mutual direct qset preference persists across restart", + "[overlay][connections]") +{ + VirtualClock clock; + auto cfg1 = getTestConfig(100, Config::TESTDB_BUCKET_DB_PERSISTENT); + auto cfg2 = getTestConfig(101); + addDirectQsetPeer(cfg1, cfg2); + addDirectQsetPeer(cfg2, cfg1); + + { + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); + + LoopbackPeerConnection conn(*app1, *app2); + testutil::crankSome(clock); + + REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); + auto info = app1->getOverlayManager().getQuorumPeerState().getInfo( + cfg2.NODE_SEED.getPublicKey()); + REQUIRE(info); + REQUIRE(info->remoteRole == RemoteQsetRole::Direct); + REQUIRE(knowsAsPreferred(*app1, *app2)); + + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); + } + + auto restarted = createTestApplication(clock, cfg1, /*newDB*/ false, + /*startApp*/ false); + auto& overlayManager = + static_cast(restarted->getOverlayManager()); + overlayManager.reconcileQuorumPeerState(); + overlayManager.seedQuorumPeerAddresses(); + auto peerRecord = restarted->getOverlayManager().getPeerManager().load( + PeerBareAddress{"127.0.0.1", cfg2.PEER_PORT}); + REQUIRE(peerRecord.second); + REQUIRE(peerRecord.first.mType == static_cast(PeerType::PREFERRED)); +} + TEST_CASE("loopback peer with 0 port", "[overlay][connections]") { VirtualClock clock; @@ -2471,6 +2629,10 @@ TEST_CASE("disconnected topology recovery", "[overlay][simulation]") return initCfg; } auto cfg = cfgs[i - 1]; + // This legacy recovery test contrasts plain KNOWN_PEERS with + // configured PREFERRED_PEERS. Pin it below v42 so mutual qset + // peering does not make validator qset peers preferred. + cfg.OVERLAY_PROTOCOL_VERSION = 41; cfg.TARGET_PEER_CONNECTIONS = 1; if (usePreferred) { diff --git a/src/overlay/test/OverlayTopologyTests.cpp b/src/overlay/test/OverlayTopologyTests.cpp index c59f1fb49..c2a6e30b2 100644 --- a/src/overlay/test/OverlayTopologyTests.cpp +++ b/src/overlay/test/OverlayTopologyTests.cpp @@ -208,6 +208,10 @@ TEST_CASE("peer churn", "[overlay][connectivity][!hide]") cfg.TARGET_PEER_CONNECTIONS = static_cast(maxOutbound); cfg.MAX_ADDITIONAL_PEER_CONNECTIONS = maxInbound; cfg.KNOWN_PEERS = peers; + // This legacy churn test expects all known peers to remain ordinary + // peers. Pin below v42 so validator qset peers are not promoted by + // mutual qset peering. + cfg.OVERLAY_PROTOCOL_VERSION = 41; cfg.RUN_STANDALONE = false; cfg.MODE_DOES_CATCHUP = false; diff --git a/src/overlay/test/PeerManagerTests.cpp b/src/overlay/test/PeerManagerTests.cpp index 86586e351..0be9c67ba 100644 --- a/src/overlay/test/PeerManagerTests.cpp +++ b/src/overlay/test/PeerManagerTests.cpp @@ -539,4 +539,53 @@ TEST_CASE("purge peer table", "[overlay][PeerManager]") peerManager.removePeersWithManyFailures(2, &localhost2); REQUIRE(!peerManager.load(localhost(2)).second); } + +TEST_CASE("preferred peers are not purged for repeated failures", + "[overlay][PeerManager]") +{ + VirtualClock clock; + auto app = createTestApplication(clock, getTestConfig()); + auto& peerManager = app->getOverlayManager().getPeerManager(); + auto record = [&app](PeerType type) { + return PeerRecord{ + VirtualClock::systemPointToTm(app->getClock().system_now()), 10, + static_cast(type)}; + }; + + peerManager.store(localhost(1), record(PeerType::PREFERRED), false); + peerManager.store(localhost(2), record(PeerType::OUTBOUND), false); + peerManager.store(localhost(3), record(PeerType::INBOUND), false); + + peerManager.removePeersWithManyFailures(3); + + REQUIRE(peerManager.load(localhost(1)).second); + REQUIRE(!peerManager.load(localhost(2)).second); + REQUIRE(!peerManager.load(localhost(3)).second); +} + +TEST_CASE("preferred peer backoff cap is lower", "[overlay][PeerManager]") +{ + REQUIRE(PeerManager::maxBackoffExponent(PeerType::PREFERRED) < + PeerManager::maxBackoffExponent(PeerType::OUTBOUND)); + REQUIRE(PeerManager::maxBackoffExponent(PeerType::PREFERRED) < + PeerManager::maxBackoffExponent(PeerType::INBOUND)); + + VirtualClock clock; + auto app = createTestApplication(clock, getTestConfig()); + auto& peerManager = app->getOverlayManager().getPeerManager(); + auto now = app->getClock().system_now(); + auto addr = localhost(1); + peerManager.store(addr, + {VirtualClock::systemPointToTm(now), 10, + static_cast(PeerType::PREFERRED)}, + false); + + peerManager.update(addr, PeerManager::BackOffUpdate::INCREASE); + auto updated = peerManager.load(addr).first; + auto delay = VirtualClock::tmToSystemPoint(updated.mNextAttempt) - now; + auto maxPreferredDelay = std::chrono::seconds{ + 10 * (1 << PeerManager::maxBackoffExponent(PeerType::PREFERRED))}; + + REQUIRE(delay <= maxPreferredDelay); +} } diff --git a/src/overlay/test/QuorumPeerStateTests.cpp b/src/overlay/test/QuorumPeerStateTests.cpp new file mode 100644 index 000000000..eaed59a5b --- /dev/null +++ b/src/overlay/test/QuorumPeerStateTests.cpp @@ -0,0 +1,92 @@ +// Copyright 2026 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#include "crypto/SecretKey.h" +#include "overlay/QuorumPeerState.h" +#include "test/Catch2.h" + +namespace stellar +{ + +namespace +{ + +NodeID +nodeID() +{ + return SecretKey::pseudoRandomForTesting().getPublicKey(); +} + +} + +TEST_CASE("quorum peer state reconcile", "[overlay][QuorumPeerState]") +{ + auto n1 = nodeID(); + auto n2 = nodeID(); + auto n3 = nodeID(); + + QuorumPeerState state; + state.reconcile({n1, n2}); + + REQUIRE(state.getInfo().size() == 2); + REQUIRE(state.getInfo(n1)->remoteRole == RemoteQsetRole::Unknown); + REQUIRE(state.getInfo(n2)->remoteRole == RemoteQsetRole::Unknown); + + state.recordHandshake(n1, RemoteQsetRole::Direct, + PeerBareAddress{"127.0.0.1", 11625}, 10); + state.reconcile({n2, n3}); + + REQUIRE(!state.getInfo(n1)); + REQUIRE(state.getInfo(n2)); + REQUIRE(state.getInfo(n3)); + REQUIRE(state.getInfo(n3)->remoteRole == RemoteQsetRole::Unknown); +} + +TEST_CASE("quorum peer state json round-trip", "[overlay][QuorumPeerState]") +{ + auto n1 = nodeID(); + QuorumPeerState state; + state.reconcile({n1}); + state.recordHandshake(n1, RemoteQsetRole::Direct, + PeerBareAddress{"127.0.0.1", 11625}, 123); + + auto restored = QuorumPeerState::fromJson(state.toJson()); + auto info = restored.getInfo(n1); + + REQUIRE(info); + REQUIRE(info->remoteRole == RemoteQsetRole::Direct); + REQUIRE(info->address); + REQUIRE(info->address->toString() == "127.0.0.1:11625"); + REQUIRE(info->lastConnection == 123); +} + +TEST_CASE("quorum peer state expires stale addresses", + "[overlay][QuorumPeerState]") +{ + auto fresh = nodeID(); + auto stale = nodeID(); + + QuorumPeerState state; + state.reconcile({fresh, stale}); + state.recordHandshake(fresh, RemoteQsetRole::Direct, + PeerBareAddress{"127.0.0.1", 11625}, 90); + state.recordHandshake(stale, RemoteQsetRole::Direct, + PeerBareAddress{"127.0.0.1", 11626}, 10); + + auto expired = state.expireStaleAddresses(100, std::chrono::seconds{50}); + REQUIRE(expired.size() == 1); + REQUIRE(expired.front().first == stale); + + auto staleInfo = state.getInfo(stale); + REQUIRE(staleInfo); + REQUIRE(staleInfo->remoteRole == RemoteQsetRole::Unknown); + REQUIRE(!staleInfo->address); + + auto freshInfo = state.getInfo(fresh); + REQUIRE(freshInfo); + REQUIRE(freshInfo->remoteRole == RemoteQsetRole::Direct); + REQUIRE(freshInfo->address); +} + +} diff --git a/src/protocol-curr/xdr b/src/protocol-curr/xdr index 68fa1ac55..c9c313bb5 160000 --- a/src/protocol-curr/xdr +++ b/src/protocol-curr/xdr @@ -1 +1 @@ -Subproject commit 68fa1ac55692f68ad2a2ca549d0a283273554439 +Subproject commit c9c313bb5cbbe521abc56af63d23f67744d57ded