diff --git a/core/file_server/FileDiscoveryOptions.cpp b/core/file_server/FileDiscoveryOptions.cpp index 632a2cb2bd..5340167601 100644 --- a/core/file_server/FileDiscoveryOptions.cpp +++ b/core/file_server/FileDiscoveryOptions.cpp @@ -214,6 +214,23 @@ bool FileDiscoveryOptions::Init(const Json::Value& config, mBasePathInfos.push_back(std::move(pathInfo)); } + bool isNas = false; + if (!GetOptionalBoolParam(config, "IsNas", isNas, errorMsg)) { + PARAM_WARNING_DEFAULT(ctx.GetLogger(), + ctx.GetAlarm(), + errorMsg, + isNas, + pluginType, + ctx.GetConfigName(), + ctx.GetProjectName(), + ctx.GetLogstoreName(), + ctx.GetRegion()); + } + mIsNas = isNas; + if (isNas) { + mMaxDirSearchDepth = 0; + } + // PreservedDirDepth if (!GetOptionalIntParam(config, "PreservedDirDepth", mPreservedDirDepth, errorMsg)) { PARAM_WARNING_DEFAULT(ctx.GetLogger(), diff --git a/core/file_server/FileDiscoveryOptions.h b/core/file_server/FileDiscoveryOptions.h index 5a66e86dc3..749ad0e84e 100644 --- a/core/file_server/FileDiscoveryOptions.h +++ b/core/file_server/FileDiscoveryOptions.h @@ -112,6 +112,7 @@ class FileDiscoveryOptions { int64_t GetLastContainerUpdateTime() const { return mLastContainerUpdateTime; } void SetLastContainerUpdateTime(int64_t time) { mLastContainerUpdateTime = time; } + bool IsNas() const { return mIsNas; } std::vector mFilePaths; int32_t mMaxDirSearchDepth = 0; @@ -121,6 +122,7 @@ class FileDiscoveryOptions { std::vector mExcludeDirs; bool mAllowingCollectingFilesInRootDir = false; bool mAllowingIncludedByMultiConfigs = false; + bool mIsNas = false; private: void ParseWildcardPath(BasePathInfo& pathInfo); diff --git a/core/file_server/reader/LogFileReader.cpp b/core/file_server/reader/LogFileReader.cpp index 4a378b2549..f5bddec28b 100644 --- a/core/file_server/reader/LogFileReader.cpp +++ b/core/file_server/reader/LogFileReader.cpp @@ -33,8 +33,11 @@ #include "boost/filesystem.hpp" #include "boost/regex.hpp" +#include "json/json.h" #include "rapidjson/document.h" +#include "collection_pipeline/CollectionPipeline.h" + #include "app_config/AppConfig.h" #include "application/Application.h" #include "collection_pipeline/queue/ExactlyOnceQueueManager.h" @@ -78,6 +81,9 @@ DEFINE_FLAG_INT32(max_fix_pos_bytes, "", 128 * 1024); DEFINE_FLAG_INT32(force_release_deleted_file_fd_timeout, "force release fd if file is deleted after specified seconds, no matter read to end or not", -1); +DEFINE_FLAG_INT32(nas_read_stable_interval_ms, + "NAS mode: refresh readable size interval in ms (align with NFS FileAttr cache T 1s~60s), default 1000", + 1000); #if defined(_MSC_VER) // On Windows, if Chinese config base path is used, the log path will be converted to GBK, // so the __tag__.__path__ have to be converted back to UTF8 to avoid bad display. @@ -118,6 +124,9 @@ LogFileReader* LogFileReader::CreateLogFileReader(const string& hostLogPathDir, } if (reader) { + if (discoveryConfig.first) { + reader->SetNasMode(discoveryConfig.first->IsNas()); + } if (forceFromBeginning) { reader->SetReadFromBeginning(); } @@ -256,6 +265,41 @@ void LogFileReader::SetMetrics() { mSourceReadOffsetBytes = mMetricsRecordRef->GetIntGauge(METRIC_PLUGIN_SOURCE_READ_OFFSET_BYTES); } +bool LogFileReader::PrepareNasRead(bool isFlushTimeout) { + if (!mIsNasMode || isFlushTimeout || !mLogFileOp.IsOpen()) { + return true; + } + fsutil::PathStat statBuf; + if (mLogFileOp.Stat(statBuf) != 0) { + return true; + } + const int64_t curSize = statBuf.GetFileSize(); + const uint64_t nowMs = GetCurrentTimeInMilliSeconds(); + const bool shouldUpdateBoundary = (mNasLastBoundaryTimeMs == 0) + || (nowMs - mNasLastBoundaryTimeMs + >= static_cast(INT32_FLAG(nas_read_stable_interval_ms))); + if (shouldUpdateBoundary) { + bool isDeleted = false; + CloseFilePtr(isDeleted); + if (isDeleted || !UpdateFilePtr()) { + return false; + } + mNasLastBoundaryTimeMs = nowMs; + mNasSizeAtLastSecondBoundary = curSize; + } + if (mNasSizeAtLastSecondBoundary < 0) { + mNasSizeAtLastSecondBoundary = std::min(curSize, mLastFilePos > 0 ? mLastFilePos : 0); + } + int64_t readableSize; + if (mNasSizeAtLastSecondBoundary < mLastFilePos) { + readableSize = mLastFilePos; + } else { + readableSize = std::min(mLastFileSize, mNasSizeAtLastSecondBoundary); + } + SetExpectedFileSize(readableSize); + return true; +} + void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray) { if (checkConfigFlag) { size_t index = mHostLogPath.rfind(PATH_SEPARATOR); @@ -993,6 +1037,9 @@ bool LogFileReader::ReadLog(LogBuffer& logBuffer, const Event* event, bool isSta return false; } } + if (!PrepareNasRead(event != nullptr && event->IsReaderFlushTimeout())) { + return false; + } bool moreData = GetRawData(logBuffer, mLastFileSize, tryRollback, isStaticFile); if (!logBuffer.rawBuffer.empty()) { if (mEOOption) { diff --git a/core/file_server/reader/LogFileReader.h b/core/file_server/reader/LogFileReader.h index bedded7656..a0d01a2215 100644 --- a/core/file_server/reader/LogFileReader.h +++ b/core/file_server/reader/LogFileReader.h @@ -279,6 +279,7 @@ class LogFileReader { mLastFilePos = pos; } void SetExpectedFileSize(int64_t size) { mExpectedFileSize = size; } + void SetNasMode(bool isNas) { mIsNasMode = isNas; } void InitReader(bool tailExisted = false, FileReadPolicy policy = BACKWARD_TO_FIXED_POS, uint32_t eoConcurrency = 0); @@ -486,6 +487,8 @@ class LogFileReader { } void ResolveHostLogPath(); + // NAS mode: compute stable size and call SetExpectedFileSize(stableSize); no out param. + bool PrepareNasRead(bool isFlushTimeout); // std::string mRegion; // std::string mCategory; // std::string mConfigName; @@ -578,6 +581,11 @@ class LogFileReader { IntGaugePtr mSourceSizeBytes; IntGaugePtr mSourceReadOffsetBytes; + // NAS mode: read up to "file size from 1s ago", update every 1s (reopen + refresh boundary size). + bool mIsNasMode = false; + uint64_t mNasLastBoundaryTimeMs = 0; // last time we updated readable size; 0 = not yet + int64_t mNasSizeAtLastSecondBoundary = -1; // file size at last boundary (readable limit) + private: bool mHasReadContainerBom = false; void checkContainerType(LogFileOperator& op);