diff --git a/daemon/main/Options.cpp b/daemon/main/Options.cpp index 632f61512..aa183632f 100644 --- a/daemon/main/Options.cpp +++ b/daemon/main/Options.cpp @@ -1004,10 +1004,11 @@ void Options::InitServers() const char* ncipher = GetOption(BString<100>("Server%i.Cipher", n)); const char* nconnections = GetOption(BString<100>("Server%i.Connections", n)); + const char* npipelinedepth = GetOption(BString<100>("Server%i.PipelineDepth", n)); const char* nretention = GetOption(BString<100>("Server%i.Retention", n)); bool definition = nactive || nname || nlevel || ngroup || nhost || nport || noptional || - nusername || npassword || nconnections || njoingroup || ntls || ncipher || nretention; + nusername || npassword || nconnections || npipelinedepth || njoingroup || ntls || ncipher || nretention; bool completed = nhost && nport && nconnections; if (!definition) @@ -1026,6 +1027,7 @@ void Options::InitServers() nusername, npassword, joinGroup, tls, ncipher, nconnections ? atoi(nconnections) : 1, + npipelinedepth ? atoi(npipelinedepth) : 2, nretention ? atoi(nretention) : 0, nlevel ? atoi(nlevel) : 0, ngroup ? atoi(ngroup) : 0, @@ -1533,10 +1535,10 @@ bool Options::ValidateOptionName(const char* optname, const char* optvalue) !strcasecmp(p, ".port") || !strcasecmp(p, ".username") || !strcasecmp(p, ".password") || !strcasecmp(p, ".joingroup") || !strcasecmp(p, ".encryption") || !strcasecmp(p, ".connections") || - !strcasecmp(p, ".cipher") || !strcasecmp(p, ".group") || - !strcasecmp(p, ".retention") || !strcasecmp(p, ".optional") || - !strcasecmp(p, ".notes") || !strcasecmp(p, ".ipversion") || - !strcasecmp(p, ".certverification"))) + !strcasecmp(p, ".pipelinedepth") || !strcasecmp(p, ".cipher") || + !strcasecmp(p, ".group") || !strcasecmp(p, ".retention") || + !strcasecmp(p, ".optional") || !strcasecmp(p, ".notes") || + !strcasecmp(p, ".ipversion") || !strcasecmp(p, ".certverification"))) { return true; } diff --git a/daemon/main/Options.h b/daemon/main/Options.h index 92e4c9bc8..f7a0c34b1 100644 --- a/daemon/main/Options.h +++ b/daemon/main/Options.h @@ -351,7 +351,7 @@ class Options public: virtual void AddNewsServer(int id, bool active, const char* name, const char* host, int port, int ipVersion, const char* user, const char* pass, bool joinGroup, - bool tls, const char* cipher, int maxConnections, int retention, + bool tls, const char* cipher, int maxConnections, int pipelineDepth, int retention, int level, int group, bool optional, unsigned int certVerificationfLevel) = 0; virtual void AddFeed( [[maybe_unused]] int id, diff --git a/daemon/main/nzbget.cpp b/daemon/main/nzbget.cpp index 37dd978d9..9f1c4a908 100644 --- a/daemon/main/nzbget.cpp +++ b/daemon/main/nzbget.cpp @@ -185,7 +185,7 @@ class NZBGet : public Options::Extender // Options::Extender void AddNewsServer(int id, bool active, const char* name, const char* host, int port, int ipVersion, const char* user, const char* pass, bool joinGroup, - bool tls, const char* cipher, int maxConnections, int retention, + bool tls, const char* cipher, int maxConnections, int pipelineDepth, int retention, int level, int group, bool optional, unsigned int certVerificationfLevel) override; void AddFeed(int id, const char* name, const char* url, int interval, const char* filter, bool backlog, bool pauseNzb, const char* category, @@ -1106,11 +1106,11 @@ void NZBGet::Daemonize() void NZBGet::AddNewsServer(int id, bool active, const char* name, const char* host, int port, int ipVersion, const char* user, const char* pass, bool joinGroup, bool tls, - const char* cipher, int maxConnections, int retention, int level, int group, bool optional, + const char* cipher, int maxConnections, int pipelineDepth, int retention, int level, int group, bool optional, unsigned int certVerificationfLevel) { m_serverPool->AddServer(std::make_unique(id, active, name, host, port, ipVersion, user, pass, joinGroup, - tls, cipher, maxConnections, retention, level, group, optional, certVerificationfLevel)); + tls, cipher, maxConnections, pipelineDepth, retention, level, group, optional, certVerificationfLevel)); } void NZBGet::AddFeed(int id, const char* name, const char* url, int interval, const char* filter, diff --git a/daemon/nntp/ArticleDownloader.cpp b/daemon/nntp/ArticleDownloader.cpp index f0c122206..8b9eacff8 100644 --- a/daemon/nntp/ArticleDownloader.cpp +++ b/daemon/nntp/ArticleDownloader.cpp @@ -20,6 +20,8 @@ */ +#include + #include "nzbget.h" #include "ArticleDownloader.h" #include "ArticleWriter.h" @@ -31,6 +33,103 @@ #include "StatMeter.h" #include "Util.h" +static ArticleInfo* ReserveNextPipelinedArticle(FileInfo* fileInfo, ArticleInfo* currentArticle) +{ + if (!fileInfo || !currentArticle) + { + return nullptr; + } + + ArticleInfo* candidate = nullptr; + int currentPart = currentArticle->GetPartNumber(); + + GuardedDownloadQueue guard = DownloadQueue::Guard(); + for (auto& articlePtr : *fileInfo->GetArticles()) + { + ArticleInfo* article = articlePtr.get(); + if (!article || article->GetStatus() != ArticleInfo::aiUndefined) + { + continue; + } + + if (article->GetPartNumber() > currentPart) + { + if (!candidate || article->GetPartNumber() < candidate->GetPartNumber()) + { + candidate = article; + } + } + } + + if (!candidate) + { + for (auto& articlePtr : *fileInfo->GetArticles()) + { + ArticleInfo* article = articlePtr.get(); + if (!article || article->GetStatus() != ArticleInfo::aiUndefined) + { + continue; + } + if (!candidate || article->GetPartNumber() < candidate->GetPartNumber()) + { + candidate = article; + } + } + } + + if (candidate) + { + candidate->SetStatus(ArticleInfo::aiRunning); + } + + return candidate; +} + +static void RestorePipelinedArticle(ArticleInfo* article) +{ + if (!article) + { + return; + } + + GuardedDownloadQueue guard = DownloadQueue::Guard(); + if (article->GetStatus() == ArticleInfo::aiRunning) + { + article->SetStatus(ArticleInfo::aiUndefined); + } +} + +static void UpdateArticleCompletion(FileInfo* fileInfo, NzbInfo* nzbInfo, ArticleInfo* articleInfo, bool success) +{ + if (!fileInfo || !nzbInfo || !articleInfo) + { + return; + } + + if (success) + { + articleInfo->SetStatus(ArticleInfo::aiFinished); + nzbInfo->SetParCurrentSuccessSize(nzbInfo->GetParCurrentSuccessSize() + (fileInfo->GetParFile() ? articleInfo->GetSize() : 0)); + fileInfo->SetSuccessArticles(fileInfo->GetSuccessArticles() + 1); + nzbInfo->SetCurrentSuccessArticles(nzbInfo->GetCurrentSuccessArticles() + 1); + } + else + { + articleInfo->SetStatus(ArticleInfo::aiFailed); + + // Rollback: Subtract the bytes that were added to "Success" + // during the loop and move them to the "Failed" bucket + fileInfo->SetSuccessSize(fileInfo->GetSuccessSize() - articleInfo->GetSize()); + nzbInfo->SetCurrentSuccessSize(nzbInfo->GetCurrentSuccessSize() - articleInfo->GetSize()); + + fileInfo->SetFailedSize(fileInfo->GetFailedSize() + articleInfo->GetSize()); + nzbInfo->SetCurrentFailedSize(nzbInfo->GetCurrentFailedSize() + articleInfo->GetSize()); + nzbInfo->SetParCurrentFailedSize(nzbInfo->GetParCurrentFailedSize() + (fileInfo->GetParFile() ? articleInfo->GetSize() : 0)); + fileInfo->SetFailedArticles(fileInfo->GetFailedArticles() + 1); + } + fileInfo->SetCompletedArticles(fileInfo->GetCompletedArticles() + 1); +} + ArticleDownloader::ArticleDownloader() { debug("Creating ArticleDownloader"); @@ -157,28 +256,17 @@ void ArticleDownloader::Run() if (status == adFinished || status == adFailed || status == adNotFound || status == adCrcError) { - int serverId = newsServer->GetId(); - int success = status == adFinished ? 1 : 0; - int failed = status == adFinished ? 0 : 1; - m_serverStats.StatOp(serverId, success, failed, ServerStatList::soSet); - ServerVolume::Stats stats; - stats.bytes = 0; - stats.articles.failed = failed; - stats.articles.success = success; - g_StatMeter->AddServerStats(stats, serverId); + for (ServerStat& serverStat : m_serverStats) + { + ServerVolume::Stats stats; + stats.bytes = 0; + stats.articles.failed = serverStat.GetFailedArticles(); + stats.articles.success = serverStat.GetSuccessArticles(); + g_StatMeter->AddServerStats(stats, serverStat.GetServerId()); + } } } - if (m_connection) - { - AddServerStats(); - } - - if (!connected && m_connection) - { - detail("Article %s @ %s failed: could not establish connection", *m_infoName, *m_connectionName); - } - if (status == adConnectError) { connected = false; @@ -295,7 +383,7 @@ void ArticleDownloader::Run() status = adFailed; } - if (IsStopped()) + if (IsStopped() && status != adFinished) { detail("Download %s cancelled", *m_infoName); status = adRetry; @@ -314,7 +402,7 @@ void ArticleDownloader::Run() ArticleDownloader::EStatus ArticleDownloader::Download() { - const char* response = nullptr; + const char* articleResponse = nullptr; EStatus status = adRunning; m_writingStarted = false; m_articleInfo->SetCrc(0); @@ -329,106 +417,211 @@ ArticleDownloader::EStatus ArticleDownloader::Download() // change group for (CString& group : m_fileInfo->GetGroups()) { - response = m_connection->JoinGroup(group); - if (response && !strncmp(response, "2", 1)) + articleResponse = m_connection->JoinGroup(group); + if (articleResponse && !strncmp(articleResponse, "2", 1)) { break; } } - status = CheckResponse(response, "could not join group"); + status = CheckResponse(articleResponse, "could not join group"); if (status != adFinished) { return status; } } - // retrieve article - response = m_connection->Request(BString<1024>("%s %s\r\n", - g_Options->GetRawArticle() ? "ARTICLE" : "BODY", m_articleInfo->GetMessageId())); + struct PipelineEntry + { + ArticleInfo* article; + CString request; + PipelineEntry(ArticleInfo* article_, CString&& request_) noexcept : article(article_), request(std::move(request_)) {} + }; + + std::deque pipeline; + ArticleInfo* currentArticle = m_articleInfo; + ArticleInfo* originalArticle = m_articleInfo; + CString request; + bool firstResponseReady = true; + + auto RestorePipeline = [&pipeline]() { + for (size_t i = 1; i < pipeline.size(); ++i) + { + RestorePipelinedArticle(pipeline[i].article); + } + pipeline.clear(); + }; - status = CheckResponse(response, "could not fetch article"); + auto FillPipeline = [&]() { + int pipelineDepth = m_connection->GetNewsServer()->GetPipelineDepth(); + if (pipelineDepth < 1) + { + pipelineDepth = 1; + } + while ((int)pipeline.size() < pipelineDepth) + { + ArticleInfo* tailArticle = pipeline.empty() ? currentArticle : pipeline.back().article; + ArticleInfo* nextArticle = ReserveNextPipelinedArticle(m_fileInfo, tailArticle); + if (!nextArticle) + { + break; + } + + CString nextRequest; + nextRequest.Format("%s %s\r\n", + g_Options->GetRawArticle() ? "ARTICLE" : "BODY", nextArticle->GetMessageId()); + if (!m_connection->SendRequest(nextRequest)) + { + AddServerStats(); + RestorePipelinedArticle(nextArticle); + break; + } + + pipeline.emplace_back(nextArticle, std::move(nextRequest)); + } + }; + + request.Format("%s %s\r\n", + g_Options->GetRawArticle() ? "ARTICLE" : "BODY", currentArticle->GetMessageId()); + articleResponse = m_connection->Request(request); + status = CheckResponse(articleResponse, "could not fetch article"); if (status != adFinished) { return status; } - m_decoder.Clear(); - m_decoder.SetCrcCheck(g_Options->GetCrcCheck()); - m_decoder.SetRawMode(g_Options->GetRawArticle()); + pipeline.emplace_back(currentArticle, std::move(request)); + FillPipeline(); - status = adRunning; - CharBuffer lineBuf(g_Options->GetArticleReadChunkSize()); - - while (!IsStopped() && !m_decoder.GetEof()) + while (!IsStopped() && !pipeline.empty()) { - // throttle the bandwidth - while (!IsStopped() && (g_WorkState->GetSpeedLimit() > 0.0f) && - (g_StatMeter->CalcCurrentDownloadSpeed() > g_WorkState->GetSpeedLimit() || - g_StatMeter->CalcMomentaryDownloadSpeed() > g_WorkState->GetSpeedLimit())) + currentArticle = pipeline.front().article; + if (currentArticle != originalArticle) { - SetLastUpdateTimeNow(); - Util::Sleep(10); + m_articleWriter.SetArticleInfo(currentArticle); + m_articleInfo = currentArticle; + if (m_contentAnalyzer) + { + m_contentAnalyzer.reset(); + } + } + + currentArticle->SetCrc(0); + + if (!firstResponseReady) + { + articleResponse = m_connection->ReadResponseLine(pipeline.front().request); } + firstResponseReady = false; - char* buffer; - int len; - m_connection->ReadBuffer(&buffer, &len); - if (len == 0) + status = CheckResponse(articleResponse, "could not fetch article"); + if (status != adFinished) { - len = m_connection->TryRecv(lineBuf, lineBuf.Size()); - buffer = lineBuf; + AddServerStats(); + RestorePipeline(); + break; } - // have we encountered a timeout? - if (len <= 0) + m_decoder.Clear(); + m_decoder.SetCrcCheck(g_Options->GetCrcCheck()); + m_decoder.SetRawMode(g_Options->GetRawArticle()); + + status = adRunning; + CharBuffer lineBuf(g_Options->GetArticleReadChunkSize()); + + while (!IsStopped() && !m_decoder.GetEof()) { - if (!IsStopped()) + while (!IsStopped() && (g_WorkState->GetSpeedLimit() > 0.0f) && + (g_StatMeter->CalcCurrentDownloadSpeed() > g_WorkState->GetSpeedLimit() || + g_StatMeter->CalcMomentaryDownloadSpeed() > g_WorkState->GetSpeedLimit())) { - detail("Article %s @ %s failed: Unexpected end of article", *m_infoName, *m_connectionName); + SetLastUpdateTimeNow(); + Util::Sleep(10); } + + int bytesRead = 0; + char* buffer = m_connection->ReadLine(lineBuf, lineBuf.Size(), &bytesRead); + if (!buffer || bytesRead <= 0) + { + if (!IsStopped()) + { + detail("Article %s @ %s failed: Unexpected end of article", *m_infoName, *m_connectionName); + } + status = adFailed; + break; + } + + g_StatMeter->AddSpeedReading(bytesRead); + SetLastUpdateTimeNow(); + + // Update NZB progress in real-time + m_fileInfo->SetSuccessSize(m_fileInfo->GetSuccessSize() + bytesRead); + m_fileInfo->GetNzbInfo()->SetCurrentSuccessSize(m_fileInfo->GetNzbInfo()->GetCurrentSuccessSize() + bytesRead); + + AddServerStats(); + + int len = m_decoder.DecodeBuffer(buffer, bytesRead); + if (len > 0 && !Write(buffer, len)) + { + status = adFatalError; + break; + } + } + + if (IsStopped()) + { status = adFailed; - break; } - g_StatMeter->AddSpeedReading(len); - time_t oldTime = m_lastUpdateTime; - SetLastUpdateTimeNow(); - if (oldTime != m_lastUpdateTime) + if (status == adRunning) { - AddServerStats(); + status = DecodeCheck(); } - // decode article data - len = m_decoder.DecodeBuffer(buffer, len); + if (m_writingStarted) + { + m_articleWriter.Finish(status == adFinished); + m_writingStarted = false; + } - // write to output file - if (len > 0 && !Write(buffer, len)) + if (status == adFinished) { - status = adFatalError; + if (m_infoName.Empty()) + { + SetInfoName(currentArticle->GetResultFilename()); + } + detail("Successfully downloaded %s", *m_infoName); + UpdateArticleCompletion(m_fileInfo, m_fileInfo->GetNzbInfo(), currentArticle, true); + int serverId = m_connection->GetNewsServer()->GetId(); + m_serverStats.StatOp(serverId, 1, 0, ServerStatList::soAdd); + } + else + { + UpdateArticleCompletion(m_fileInfo, m_fileInfo->GetNzbInfo(), currentArticle, false); + int serverId = m_connection->GetNewsServer()->GetId(); + m_serverStats.StatOp(serverId, 0, 1, ServerStatList::soAdd); + AddServerStats(); + RestorePipeline(); break; } - } - - if (IsStopped()) - { - status = adFailed; - } - if (status == adRunning) - { - FreeConnection(true); - status = DecodeCheck(); - } + pipeline.pop_front(); + if (pipeline.empty()) + { + break; + } - if (m_writingStarted) - { - m_articleWriter.Finish(status == adFinished); + FillPipeline(); + currentArticle = pipeline.front().article; + m_articleInfo = currentArticle; + BString<1024> infoName("%s%c%s [%i/%i]", m_fileInfo->GetNzbInfo()->GetName(), PATH_SEPARATOR, + m_fileInfo->GetFilename(), currentArticle->GetPartNumber(), (int)m_fileInfo->GetArticles()->size()); + SetInfoName(infoName); } - if (status == adFinished) + if (IsStopped()) { - detail("Successfully downloaded %s", *m_infoName); + status = adFailed; } return status; diff --git a/daemon/nntp/NewsServer.cpp b/daemon/nntp/NewsServer.cpp index 9fca85354..b9c2d96bc 100644 --- a/daemon/nntp/NewsServer.cpp +++ b/daemon/nntp/NewsServer.cpp @@ -24,10 +24,10 @@ NewsServer::NewsServer(int id, bool active, const char* name, const char* host, int port, int ipVersion, const char* user, const char* pass, bool joinGroup, bool tls, const char* cipher, - int maxConnections, int retention, int level, int group, bool optional, unsigned int certVerificationfLevel) : + int maxConnections, int pipelineDepth, int retention, int level, int group, bool optional, unsigned int certVerificationfLevel) : m_id(id), m_active(active), m_name(name), m_host(host ? host : ""), m_port(port), m_ipVersion(ipVersion), m_user(user ? user : ""), m_password(pass ? pass : ""), m_joinGroup(joinGroup), m_tls(tls), - m_cipher(cipher ? cipher : ""), m_maxConnections(maxConnections), m_retention(retention), + m_cipher(cipher ? cipher : ""), m_maxConnections(maxConnections), m_pipelineDepth(pipelineDepth), m_retention(retention), m_level(level), m_normLevel(level), m_group(group), m_optional(optional), m_certVerificationfLevel(certVerificationfLevel) { if (m_name.Empty()) diff --git a/daemon/nntp/NewsServer.h b/daemon/nntp/NewsServer.h index b82bdf5ee..1f4a15c51 100644 --- a/daemon/nntp/NewsServer.h +++ b/daemon/nntp/NewsServer.h @@ -35,7 +35,7 @@ class NewsServer public: NewsServer(int id, bool active, const char* name, const char* host, int port, int ipVersion, const char* user, const char* pass, bool joinGroup, - bool tls, const char* cipher, int maxConnections, int retention, + bool tls, const char* cipher, int maxConnections, int pipelineDepth, int retention, int level, int group, bool optional, unsigned int certVerificationfLevel); int GetId() const { return m_id; } int GetStateId() const { return m_stateId; } @@ -50,6 +50,7 @@ class NewsServer const char* GetUser() const { return m_user; } const char* GetPassword() const { return m_password; } int GetMaxConnections() const { return m_maxConnections; } + int GetPipelineDepth() const { return m_pipelineDepth; } int GetLevel() const { return m_level; } int GetNormLevel() const { return m_normLevel; } void SetNormLevel(int level) { m_normLevel = level; } @@ -76,6 +77,7 @@ class NewsServer bool m_tls; CString m_cipher; int m_maxConnections; + int m_pipelineDepth; int m_retention; int m_level; int m_normLevel; diff --git a/daemon/nntp/NntpConnection.cpp b/daemon/nntp/NntpConnection.cpp index 05e646379..6dc2f5728 100644 --- a/daemon/nntp/NntpConnection.cpp +++ b/daemon/nntp/NntpConnection.cpp @@ -75,6 +75,51 @@ const char* NntpConnection::Request(const char* req) return answer; } +bool NntpConnection::SendRequest(const char* req) +{ + if (!req) + { + return false; + } + + if (m_status != csConnected) + { + return false; + } + + return WriteLine(req) > 0; +} + +const char* NntpConnection::ReadResponseLine(const char* pendingRequest) +{ + char* answer = ReadLine(m_lineBuf, m_lineBuf.Size(), nullptr); + + if (!answer) + { + return nullptr; + } + + if (!strncmp(answer, "480", 3) && pendingRequest) + { + debug("%s requested authorization", GetHost()); + + if (!Authenticate()) + { + return nullptr; + } + + // try again for the pending request + if (WriteLine(pendingRequest) <= 0) + { + return nullptr; + } + + answer = ReadLine(m_lineBuf, m_lineBuf.Size(), nullptr); + } + + return answer; +} + bool NntpConnection::Authenticate() { if (strlen(m_newsServer->GetUser()) == 0 || strlen(m_newsServer->GetPassword()) == 0) diff --git a/daemon/nntp/NntpConnection.h b/daemon/nntp/NntpConnection.h index 74cc74b1f..9ebb3b093 100644 --- a/daemon/nntp/NntpConnection.h +++ b/daemon/nntp/NntpConnection.h @@ -34,6 +34,8 @@ class NntpConnection : public Connection virtual bool Disconnect(); NewsServer* GetNewsServer() { return m_newsServer; } const char* Request(const char* req); + bool SendRequest(const char* req); + const char* ReadResponseLine(const char* pendingRequest = nullptr); const char* JoinGroup(const char* grp); bool GetAuthError() { return m_authError; } diff --git a/daemon/queue/QueueCoordinator.cpp b/daemon/queue/QueueCoordinator.cpp index d0bf9ea5e..9c1b2c3f4 100644 --- a/daemon/queue/QueueCoordinator.cpp +++ b/daemon/queue/QueueCoordinator.cpp @@ -738,7 +738,6 @@ void QueueCoordinator::ArticleCompleted(ArticleDownloader* articleDownloader) { nzbInfo->SetPausedSize(nzbInfo->GetPausedSize() - articleInfo->GetSize()); } - fileInfo->SetCompletedArticles(fileInfo->GetCompletedArticles() + 1); fileCompleted = (int)fileInfo->GetArticles()->size() == fileInfo->GetCompletedArticles(); fileInfo->GetServerStats()->ListOp(articleDownloader->GetServerStats(), ServerStatList::soAdd); nzbInfo->GetCurrentServerStats()->ListOp(articleDownloader->GetServerStats(), ServerStatList::soAdd); diff --git a/daemon/remote/XmlRpc.cpp b/daemon/remote/XmlRpc.cpp index 6d77d9076..8493cab05 100644 --- a/daemon/remote/XmlRpc.cpp +++ b/daemon/remote/XmlRpc.cpp @@ -3946,6 +3946,7 @@ void TestServerXmlCommand::Execute() params.encryption, params.cipher.c_str(), 1, + 1, 0, 0, 0, diff --git a/nzbget.conf b/nzbget.conf index 062c53060..efdaf062e 100644 --- a/nzbget.conf +++ b/nzbget.conf @@ -234,6 +234,16 @@ Server1.Cipher= # Maximum number of simultaneous connections to this server (0-999). Server1.Connections=8 +# Maximum number of pipelined article requests to this server (1-999). +# +# NNTP pipelining is a performance optimization that allows sending multiple +# article requests to the server without waiting for each response. This +# improves throughput, especially on connections with higher latency or high +# speeds. Pipeline depth should be tuned carefully. Values that are too high can +# reduce performance due to overhead and buffering. A value of 1 disables +# pipelining. +Server1.PipelineDepth = 2 + # Server retention time (days). # # How long the articles are stored on the news server. The articles @@ -271,6 +281,7 @@ Server1.Notes= #Server2.Password=mypass #Server2.JoinGroup=yes #Server2.Connections=4 +#Server2.PipelineDepth=2 # Third server, on level 1. @@ -281,6 +292,7 @@ Server1.Notes= #Server3.Password=mypass2 #Server3.JoinGroup=yes #Server3.Connections=1 +#Server3.PipelineDepth=1 ############################################################################## diff --git a/tests/main/Options.cpp b/tests/main/Options.cpp index 664552c93..b9fc7823a 100644 --- a/tests/main/Options.cpp +++ b/tests/main/Options.cpp @@ -47,7 +47,7 @@ class OptionsExtenderMock : public Options::Extender protected: void AddNewsServer(int id, bool active, const char* name, const char* host, int port, int ipVersion, const char* user, const char* pass, bool joinGroup, bool tls, - const char* cipher, int maxConnections, int retention, + const char* cipher, int maxConnections, int pipelineDepth, int retention, int level, int group, bool optional, unsigned int certVerificationfLevel) override { m_newsServers++; diff --git a/tests/nntp/NntpConnection.cpp b/tests/nntp/NntpConnection.cpp new file mode 100644 index 000000000..6412d04e4 --- /dev/null +++ b/tests/nntp/NntpConnection.cpp @@ -0,0 +1,151 @@ +/* + * This file is part of nzbget. See . + * + * Copyright (C) 2026 Denis + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "nzbget.h" +#include "NntpConnection.h" +#include "Connection.h" +#include "NewsServer.h" + +#include +#include +#include +#include +#include + +#ifndef WIN32 +#include +#include +#include +#include +#endif + +static int FindFreeTcpPort() +{ + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock == -1) + { + return 0; + } + + int opt = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt)); + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + + if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) + { + closesocket(sock); + return 0; + } + + socklen_t len = sizeof(addr); + if (getsockname(sock, (struct sockaddr*)&addr, &len) != 0) + { + closesocket(sock); + return 0; + } + + int port = ntohs(addr.sin_port); + closesocket(sock); + return port; +} + +static void RunPipeliningServer(int port, std::promise& started) +{ + Connection listener("127.0.0.1", port, false); + listener.SetIPVersion(Connection::ipV4); + if (!listener.Bind()) + { + started.set_value(false); + return; + } + + started.set_value(true); + std::unique_ptr clientConn = listener.Accept(); + if (!clientConn) + { + return; + } + + clientConn->WriteLine("200 NZBGet test server ready\r\n"); + + char line[256]; + int bytesRead = 0; + + if (!clientConn->ReadLine(line, sizeof(line), &bytesRead)) + { + return; + } + BOOST_CHECK_EQUAL(std::string(line), "BODY \r\n"); + clientConn->WriteLine("222 Article 1 retrieved\r\n"); + + if (!clientConn->ReadLine(line, sizeof(line), &bytesRead)) + { + return; + } + BOOST_CHECK_EQUAL(std::string(line), "BODY \r\n"); + clientConn->WriteLine("222 Article 2 retrieved\r\n"); +} + +BOOST_AUTO_TEST_SUITE(NNTPTest) + +BOOST_AUTO_TEST_CASE(NntpConnectionPipeliningTest) +{ + Connection::Init(); + + int port = FindFreeTcpPort(); + BOOST_REQUIRE(port > 0); + + std::promise started; + std::future startedFuture = started.get_future(); + std::thread serverThread(RunPipeliningServer, port, std::ref(started)); + + BOOST_REQUIRE(startedFuture.get()); + + NewsServer newsServer(1, true, "test", "127.0.0.1", port, 4, + "", "", false, false, "", 1, 2, 0, 0, 0, false, 0); + + NntpConnection client(&newsServer); + client.SetIPVersion(Connection::ipV4); + BOOST_REQUIRE(client.Connect()); + + const char* firstResponse = client.Request("BODY \r\n"); + BOOST_REQUIRE(firstResponse); + BOOST_CHECK_EQUAL(strncmp(firstResponse, "222", 3), 0); + + BOOST_CHECK(client.SendRequest("BODY \r\n")); + + const char* secondResponse = client.ReadResponseLine("BODY \r\n"); + BOOST_REQUIRE(secondResponse); + BOOST_CHECK_EQUAL(strncmp(secondResponse, "222", 3), 0); + + client.Disconnect(); + if (serverThread.joinable()) + { + serverThread.join(); + } + + Connection::Final(); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/tests/nntp/ServerPool.cpp b/tests/nntp/ServerPool.cpp index f2ff53015..1f85b1f21 100644 --- a/tests/nntp/ServerPool.cpp +++ b/tests/nntp/ServerPool.cpp @@ -31,7 +31,7 @@ BOOST_AUTO_TEST_SUITE(NNTPTest) void AddTestServer(ServerPool* pool, int id, bool active, int level, bool optional, int group, int connections) { pool->AddServer(std::make_unique(id, active, nullptr, "", 119, 0, - "", "", false, false, nullptr, connections, 0, level, group, optional, Options::cvStrict)); + "", "", false, false, nullptr, connections, 2, 0, level, group, optional, Options::cvStrict)); } void TestBlockServers(int group) diff --git a/tests/nntp/nntp.cmake b/tests/nntp/nntp.cmake index 153617b12..0ba463eae 100644 --- a/tests/nntp/nntp.cmake +++ b/tests/nntp/nntp.cmake @@ -2,4 +2,5 @@ list(APPEND TESTS_SRC ${CMAKE_CURRENT_SOURCE_DIR}/nntp/ServerPool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/nntp/StatMeter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/nntp/Decoder.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/nntp/NntpConnection.cpp ) diff --git a/tests/systemhealth/NewsServerValidator.cpp b/tests/systemhealth/NewsServerValidator.cpp index 57f1a0342..848344715 100644 --- a/tests/systemhealth/NewsServerValidator.cpp +++ b/tests/systemhealth/NewsServerValidator.cpp @@ -32,12 +32,12 @@ struct ServerFixture std::unique_ptr CreateServer( bool active = true, const char* name = "ValidServer", const char* host = "news.example.com", int port = 563, bool tls = true, const char* user = "user", const char* pass = "pass", - int maxConn = 50, int level = 0, int retention = 0, const char* cipher = "", + int maxConn = 50, int pipelineDepth = 2, int level = 0, int retention = 0, const char* cipher = "", int ipVersion = Connection::ipAuto, bool optional = false, int group = 0, int joinGroup = 0, unsigned int certLevel = Options::cvStrict) { return std::make_unique(1, active, name, host, port, ipVersion, user, pass, - (bool)joinGroup, tls, cipher, maxConn, retention, level, + (bool)joinGroup, tls, cipher, maxConn, pipelineDepth, retention, level, group, optional, certLevel); } }; @@ -145,38 +145,38 @@ BOOST_AUTO_TEST_CASE(TestConnections) BOOST_AUTO_TEST_CASE(TestEncryption) { - auto s1 = CreateServer(true, "", "", 563, true, "", "", 50, 0, 0, ""); + auto s1 = CreateServer(true, "", "", 563, true, "", "", 50, 2, 0, 0, ""); BOOST_CHECK(SystemHealth::NewsServer::ServerEncryptionValidator(*s1).Validate().IsOk()); - auto s2 = CreateServer(true, "", "", 119, false, "", "", 50, 0, 0, ""); + auto s2 = CreateServer(true, "", "", 119, false, "", "", 50, 2, 0, 0, ""); BOOST_CHECK(SystemHealth::NewsServer::ServerEncryptionValidator(*s2).Validate().IsWarning()); - auto s3 = CreateServer(true, "", "", 563, true, "", "", 50, 0, 0, "AES"); + auto s3 = CreateServer(true, "", "", 563, true, "", "", 50, 2, 0, 0, "AES"); BOOST_CHECK(SystemHealth::NewsServer::ServerEncryptionValidator(*s3).Validate().IsOk()); - auto s4 = CreateServer(true, "", "", 119, false, "", "", 50, 0, 0, "AES"); + auto s4 = CreateServer(true, "", "", 119, false, "", "", 50, 2, 0, 0, "AES"); BOOST_CHECK(SystemHealth::NewsServer::ServerCipherValidator(*s4).Validate().IsWarning()); } BOOST_AUTO_TEST_CASE(TestRetention) { BOOST_CHECK(SystemHealth::NewsServer::ServerRetentionValidator( - *CreateServer(true, "", "", 0, true, "", "", 50, 0, 0)) + *CreateServer(true, "", "", 0, true, "", "", 50, 2, 0, 0)) .Validate() .IsOk()); BOOST_CHECK(SystemHealth::NewsServer::ServerRetentionValidator( - *CreateServer(true, "", "", 0, true, "", "", 50, 0, 3000)) + *CreateServer(true, "", "", 0, true, "", "", 50, 2, 0, 3000)) .Validate() .IsOk()); BOOST_CHECK(SystemHealth::NewsServer::ServerRetentionValidator( - *CreateServer(true, "", "", 0, true, "", "", 50, 0, 50)) + *CreateServer(true, "", "", 0, true, "", "", 50, 2, 0, 50)) .Validate() .IsWarning()); BOOST_CHECK(SystemHealth::NewsServer::ServerRetentionValidator( - *CreateServer(true, "", "", 0, true, "", "", 50, 0, 20000)) + *CreateServer(true, "", "", 0, true, "", "", 50, 2, 0, 20000)) .Validate() .IsInfo()); } @@ -185,19 +185,19 @@ BOOST_AUTO_TEST_CASE(TestOptional) { BOOST_CHECK( SystemHealth::NewsServer::ServerOptionalValidator( - *CreateServer(true, "", "", 0, true, "", "", 50, 0, 0, "", Connection::ipAuto, false)) + *CreateServer(true, "", "", 0, true, "", "", 50, 2, 0, 0, "", Connection::ipAuto, false)) .Validate() .IsOk()); BOOST_CHECK( SystemHealth::NewsServer::ServerOptionalValidator( - *CreateServer(true, "", "", 0, true, "", "", 50, 1, 0, "", Connection::ipAuto, true)) + *CreateServer(true, "", "", 0, true, "", "", 50, 2, 1, 0, "", Connection::ipAuto, true)) .Validate() .IsOk()); BOOST_CHECK( SystemHealth::NewsServer::ServerOptionalValidator( - *CreateServer(true, "", "", 0, true, "", "", 50, 0, 0, "", Connection::ipAuto, true)) + *CreateServer(true, "", "", 0, true, "", "", 50, 2, 0, 0, "", Connection::ipAuto, true)) .Validate() .IsInfo()); } @@ -205,19 +205,19 @@ BOOST_AUTO_TEST_CASE(TestOptional) BOOST_AUTO_TEST_CASE(TestCertVerification) { BOOST_CHECK(SystemHealth::NewsServer::ServerCertVerificationValidator( - *CreateServer(true, "", "", 563, true, "", "", 50, 0, 0, "", Connection::ipAuto, + *CreateServer(true, "", "", 563, true, "", "", 50, 2, 0, 0, "", Connection::ipAuto, false, 0, 0, Options::cvStrict)) .Validate() .IsOk()); BOOST_CHECK(SystemHealth::NewsServer::ServerCertVerificationValidator( - *CreateServer(true, "", "", 563, true, "", "", 50, 0, 0, "", Connection::ipAuto, + *CreateServer(true, "", "", 563, true, "", "", 50, 2, 0, 0, "", Connection::ipAuto, false, 0, 0, Options::cvNone)) .Validate() .IsWarning()); BOOST_CHECK(SystemHealth::NewsServer::ServerCertVerificationValidator( - *CreateServer(true, "", "", 119, false, "", "", 50, 0, 0, "", + *CreateServer(true, "", "", 119, false, "", "", 50, 2, 0, 0, "", Connection::ipAuto, false, 0, 0, Options::cvNone)) .Validate() .IsOk()); @@ -235,8 +235,8 @@ BOOST_AUTO_TEST_CASE(TestAnyPrimaryServerExists_NoServers) BOOST_AUTO_TEST_CASE(TestAnyPrimaryServerExists_NoActivePrimary) { ::Servers servers; - servers.push_back(CreateServer(false, "Server1", "news1.com", 563, true, "user", "pass", 50, 0)); - servers.push_back(CreateServer(false, "Server2", "news2.com", 563, true, "user", "pass", 50, 1)); + servers.push_back(CreateServer(false, "Server1", "news1.com", 563, true, "user", "pass", 50, 2, 0)); + servers.push_back(CreateServer(false, "Server2", "news2.com", 563, true, "user", "pass", 50, 2, 1)); SystemHealth::NewsServers::AnyPrimaryServerExistsValidator v(servers); SystemHealth::Status s = v.Validate(); @@ -247,8 +247,8 @@ BOOST_AUTO_TEST_CASE(TestAnyPrimaryServerExists_NoActivePrimary) BOOST_AUTO_TEST_CASE(TestAnyPrimaryServerExists_ActivePrimary) { ::Servers servers; - servers.push_back(CreateServer(true, "Server1", "news1.com", 563, true, "user", "pass", 50, 0)); - servers.push_back(CreateServer(false, "Server2", "news2.com", 563, true, "user", "pass", 50, 1)); + servers.push_back(CreateServer(true, "Server1", "news1.com", 563, true, "user", "pass", 50, 2, 0)); + servers.push_back(CreateServer(false, "Server2", "news2.com", 563, true, "user", "pass", 50, 2, 1)); SystemHealth::NewsServers::AnyPrimaryServerExistsValidator v(servers); BOOST_CHECK(v.Validate().IsOk()); @@ -257,8 +257,8 @@ BOOST_AUTO_TEST_CASE(TestAnyPrimaryServerExists_ActivePrimary) BOOST_AUTO_TEST_CASE(TestAnyPrimaryServerExists_MultiplePrimary) { ::Servers servers; - servers.push_back(CreateServer(true, "Server1", "news1.com", 563, true, "user", "pass", 50, 0)); - servers.push_back(CreateServer(true, "Server2", "news2.com", 563, true, "user", "pass", 50, 0)); + servers.push_back(CreateServer(true, "Server1", "news1.com", 563, true, "user", "pass", 50, 2, 0)); + servers.push_back(CreateServer(true, "Server2", "news2.com", 563, true, "user", "pass", 50, 2, 0)); SystemHealth::NewsServers::AnyPrimaryServerExistsValidator v(servers); BOOST_CHECK(v.Validate().IsOk()); @@ -267,8 +267,8 @@ BOOST_AUTO_TEST_CASE(TestAnyPrimaryServerExists_MultiplePrimary) BOOST_AUTO_TEST_CASE(TestAnyPrimaryServerExists_HigherLevelActive) { ::Servers servers; - servers.push_back(CreateServer(false, "Server1", "news1.com", 563, true, "user", "pass", 50, 0)); - servers.push_back(CreateServer(true, "Server2", "news2.com", 563, true, "user", "pass", 50, 1)); + servers.push_back(CreateServer(false, "Server1", "news1.com", 563, true, "user", "pass", 50, 2, 0)); + servers.push_back(CreateServer(true, "Server2", "news2.com", 563, true, "user", "pass", 50, 2, 1)); SystemHealth::NewsServers::AnyPrimaryServerExistsValidator v(servers); SystemHealth::Status s = v.Validate(); diff --git a/webui/config.js b/webui/config.js index eb6445420..4f4614709 100644 --- a/webui/config.js +++ b/webui/config.js @@ -34,6 +34,7 @@ function Server() this.name = ''; this.port = 0; this.connections = 0; + this.pipelinedepth = 0; } var Options = (new function($) @@ -131,6 +132,10 @@ var Options = (new function($) if (connectionsOpt) server.connections = connectionsOpt.Value; + var pipelinedepthOpt = findOption(this.options, serverId + '.PipelineDepth'); + if (pipelinedepthOpt) + server.pipelinedepth = pipelinedepthOpt.Value; + return server; } diff --git a/webui/index.html b/webui/index.html index 2f34b554f..8fe556dcf 100644 --- a/webui/index.html +++ b/webui/index.html @@ -851,7 +851,7 @@

Tools

News Servers

- + diff --git a/webui/statistics.js b/webui/statistics.js index d81e46e1a..01968597e 100644 --- a/webui/statistics.js +++ b/webui/statistics.js @@ -29,6 +29,7 @@ var Statistics = new (function ($) { this.id = 0; this.connections = 0; + this.pipelinedepth = 0; this.host = ""; this.name = ""; this.port = ""; @@ -165,6 +166,7 @@ var Statistics = new (function ($) { var newServer = new Server(); newServer.id = id; newServer.connections = server.connections; + newServer.pipelinedepth = server.pipelinedepth; newServer.host = server.host; newServer.name = server.name; newServer.port = server.port; @@ -311,6 +313,10 @@ var Statistics = new (function ($) { server.connections, "" ); + html += "" + ); html += "" diff --git a/webui/system-info.js b/webui/system-info.js index d221533c7..42f1ada29 100644 --- a/webui/system-info.js +++ b/webui/system-info.js @@ -654,7 +654,7 @@ var SystemInfo = (new function($) } }); - divName.text(server.host + ':' + server.port + '(' + server.connections + ')'); + divName.text(server.host + ':' + server.port + ' (' + server.connections + '/' + server.pipelinedepth + ')'); divName.attr({ title: server.name }); divName .addClass('overflow-auto')
Host (Connections)Host (Connections/PipelineDepth) Active Tests
PipelineDepth:".concat( + server.pipelinedepth, + "
Active:".concat( server.active ? "Yes" : "No", "