diff --git a/core/container_manager/ContainerManager.cpp b/core/container_manager/ContainerManager.cpp index 7c0ca9bded..cc2e149275 100644 --- a/core/container_manager/ContainerManager.cpp +++ b/core/container_manager/ContainerManager.cpp @@ -100,6 +100,7 @@ void ContainerManager::pollingLoop() { } void ContainerManager::ApplyContainerDiffs() { + WriteLock lock(mFileDiscoveryConfigsRWLock); auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); std::vector> configResults; for (auto& pair : mConfigContainerDiffMap) { @@ -138,6 +139,10 @@ void ContainerManager::ApplyContainerDiffs() { if (containerInfos) { const auto& basePathInfos = options->GetBasePathInfos(); for (const auto& info : *containerInfos) { + if (!info.mRawContainerInfo) { + continue; + } + // Validate array size consistency if (info.mRealBaseDirs.size() != basePathInfos.size()) { LOG_WARNING( @@ -175,27 +180,14 @@ void ContainerManager::ApplyContainerDiffs() { mConfigContainerDiffMap.clear(); } -void ContainerManager::sendAllMatchedContainerInfo() { - std::vector> configResults; - auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); - - for (auto& pair : nameConfigMap) { - FileDiscoveryOptions* options = pair.second.first; - if (options->IsContainerDiscoveryEnabled()) { - if (options->GetContainerDiscoveryOptions().mCollectingContainersMeta - && options->GetContainerDiscoveryOptions().mMatchedContainerInfo) { - configResults.push_back(options->GetContainerDiscoveryOptions().mMatchedContainerInfo); - } - } - } - sendMatchedContainerInfo(configResults); -} bool ContainerManager::CheckContainerDiffForAllConfig() { if (!mIsRunning) { return false; } bool isUpdate = false; + + WriteLock lock(mFileDiscoveryConfigsRWLock); auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { FileDiscoveryOptions* options = itr->second.first; @@ -209,18 +201,43 @@ bool ContainerManager::CheckContainerDiffForAllConfig() { return isUpdate; } + +// logtail_containers 自监控数据上报逻辑 +void ContainerManager::sendAllMatchedContainerInfo() { + std::vector> configResults; + { + ReadLock lock(mFileDiscoveryConfigsRWLock); + auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); + + for (auto& pair : nameConfigMap) { + FileDiscoveryOptions* options = pair.second.first; + if (options->IsContainerDiscoveryEnabled()) { + if (options->GetContainerDiscoveryOptions().mCollectingContainersMeta + && options->GetContainerDiscoveryOptions().mMatchedContainerInfo) { + configResults.push_back(options->GetContainerDiscoveryOptions().mMatchedContainerInfo); + } + } + } + } + sendMatchedContainerInfo(configResults); +} + +// logtail_containers 自监控数据上报逻辑 void ContainerManager::UpdateMatchedContainerInfoPipeline(CollectionPipelineContext* ctx, size_t inputIndex) { WriteLock lock(mMatchedContainerInfoPipelineMux); mMatchedContainerInfoPipelineCtx = ctx; mMatchedContainerInfoInputIndex = inputIndex; } + +// logtail_containers 自监控数据上报逻辑 void ContainerManager::RemoveMatchedContainerInfoPipeline() { WriteLock lock(mMatchedContainerInfoPipelineMux); mMatchedContainerInfoPipelineCtx = nullptr; mMatchedContainerInfoInputIndex = 0; } +// logtail_containers 自监控数据上报逻辑 void ContainerManager::sendMatchedContainerInfo(std::vector> configResults) { ReadLock lock(mMatchedContainerInfoPipelineMux); if (mMatchedContainerInfoPipelineCtx == nullptr) { @@ -275,33 +292,47 @@ bool ContainerManager::checkContainerDiffForOneConfig(FileDiscoveryOptions* opti const CollectionPipelineContext* ctx) { // If this config's container update time is newer than or equal to global update time, // return the cached result if it exists - if (options->GetLastContainerUpdateTime() > mLastUpdateTime) { + bool refrashAllContainers = false; + if (options->GetLastContainerUpdateTime() <= mLastFullUpdateTime) { + refrashAllContainers = true; + } else if (options->GetLastContainerUpdateTime() <= mLastIncrementalUpdateTime) { + refrashAllContainers = false; + } else { return false; } std::unordered_map> containerInfoMap; - const auto& containerInfos = options->GetContainerInfo(); - if (containerInfos) { - for (const auto& info : *containerInfos) { - containerInfoMap[info.mRawContainerInfo->mID] = info.mRawContainerInfo; - } - } std::vector removedList; std::vector matchAddedList; ContainerDiff diff; - computeMatchedContainersDiff(*(options->GetFullContainerList()), - containerInfoMap, - options->GetContainerDiscoveryOptions().mContainerFilters, - options->GetContainerDiscoveryOptions().mIsStdio, - diff); + if (refrashAllContainers) { + options->SetFullContainerList(std::make_shared>()); + computeMatchedContainersDiff(*(options->GetFullContainerList()), + containerInfoMap, + options->GetContainerDiscoveryOptions().mContainerFilters, + options->GetContainerDiscoveryOptions().mIsStdio, + diff); + } else { + const auto& containerInfos = options->GetContainerInfo(); + if (containerInfos) { + for (const auto& info : *containerInfos) { + containerInfoMap[info.mRawContainerInfo->mID] = info.mRawContainerInfo; + } + } + computeMatchedContainersDiff(*(options->GetFullContainerList()), + containerInfoMap, + options->GetContainerDiscoveryOptions().mContainerFilters, + options->GetContainerDiscoveryOptions().mIsStdio, + diff); + } LOG_DEBUG( sLogger, ("diff", diff.ToString())("configName", ctx->GetConfigName())( "containerFilters", options->GetContainerDiscoveryOptions().mContainerFilters.ToString())( - "fullContainerList", options->GetFullContainerList()->size())("containerInfos", containerInfos->size())( - "lastConfigContainerUpdateTime", options->GetLastContainerUpdateTime())("mLastUpdateTime", - mLastUpdateTime)); + "fullContainerList", options->GetFullContainerList()->size())("containerInfoMap", containerInfoMap.size())( + "lastConfigContainerUpdateTime", options->GetLastContainerUpdateTime())( + "mLastFullUpdateTime", mLastFullUpdateTime)("mLastIncrementalUpdateTime", mLastIncrementalUpdateTime)); // Update the config's container update time when there are changes options->SetLastContainerUpdateTime(time(nullptr)); @@ -365,7 +396,7 @@ void ContainerManager::incrementallyUpdateContainersSnapshot() { } if (hasChanges) { - mLastUpdateTime = time(nullptr); + mLastIncrementalUpdateTime = time(nullptr); } } @@ -399,10 +430,8 @@ void ContainerManager::refreshAllContainersSnapshot() { std::lock_guard lock(mContainerMapMutex); mContainerMap.swap(tmpContainerMap); } - mLastUpdateTime = time(nullptr); + mLastFullUpdateTime = time(nullptr); - // Update container info pointers in all configs to point to the new RawContainerInfo objects - updateContainerInfoPointersInAllConfigs(); tmpContainerMap.clear(); } @@ -471,7 +500,6 @@ std::string ContainerManager::joinContainerIDs(const std::vector& c } void ContainerManager::GetContainerStoppedEvents(std::vector& eventVec) { - const auto& nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); std::vector stoppedContainerIDs; { std::lock_guard lock(mStoppedContainerIDsMutex); @@ -482,17 +510,27 @@ void ContainerManager::GetContainerStoppedEvents(std::vector& eventVec) } LOG_INFO(sLogger, ("stoppedContainerIDs", ToString(stoppedContainerIDs))); + ReadLock lock(mFileDiscoveryConfigsRWLock); + const auto& nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); for (const auto& containerId : stoppedContainerIDs) { for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { const FileDiscoveryOptions* options = itr->second.first; if (!options->IsContainerDiscoveryEnabled()) { continue; } - const auto& containerInfos = options->GetContainerInfo(); - if (!containerInfos) { + // Capture a local shared_ptr snapshot to protect against concurrent modifications + // by ApplyContainerDiffs (which may call UpdateRawContainerInfo/DeleteRawContainerInfo) + auto containerInfosSnapshot = options->GetContainerInfo(); + if (!containerInfosSnapshot) { continue; } - for (auto& info : *containerInfos) { + // Now iterate the snapshot - even if options->mContainerInfos is modified by another thread, + // our snapshot's reference count keeps the underlying vector alive + for (auto& info : *containerInfosSnapshot) { + // Protect access to mRawContainerInfo with lock + if (!info.mRawContainerInfo) { + continue; + } if (info.mRawContainerInfo->mID == containerId) { // 为每个真实路径生成停止事件 for (const auto& realBaseDir : info.mRealBaseDirs) { @@ -504,7 +542,7 @@ void ContainerManager::GetContainerStoppedEvents(std::vector& eventVec) eventVec.push_back(pStoppedEvent); } } - info.mRawContainerInfo->mStopped = true; + info.mRawContainerInfo->mStopped.store(true); LOG_DEBUG(sLogger, ("generate stop event, containerId", containerId)("configName", itr->first)); } } @@ -586,9 +624,16 @@ void ContainerManager::computeMatchedContainersDiff( const ContainerFilters& filters, bool isStdio, ContainerDiff& diff) { + // Create a local snapshot of mContainerMap to avoid holding the lock for extended period + std::unordered_map> containerMapSnapshot; + { + std::lock_guard lock(mContainerMapMutex); + containerMapSnapshot = mContainerMap; + } + // 移除已删除的容器 for (auto it = fullContainerIDList.begin(); it != fullContainerIDList.end();) { - if (mContainerMap.find(*it) == mContainerMap.end()) { + if (containerMapSnapshot.find(*it) == containerMapSnapshot.end()) { std::string id = *it; // 复制一份,避免 erase 后引用失效 it = fullContainerIDList.erase(it); // 删除元素并移到下一个 if (matchList.find(id) != matchList.end()) { @@ -601,7 +646,7 @@ void ContainerManager::computeMatchedContainersDiff( // 更新匹配的容器状态 for (auto& pair : matchList) { - if (auto it = mContainerMap.find(pair.first); it != mContainerMap.end()) { + if (auto it = containerMapSnapshot.find(pair.first); it != containerMapSnapshot.end()) { // 更新为最新的 info if (*pair.second != *it->second) { diff.mModified.push_back(it->second); @@ -610,7 +655,7 @@ void ContainerManager::computeMatchedContainersDiff( } // 添加新容器 - for (const auto& pair : mContainerMap) { + for (const auto& pair : containerMapSnapshot) { // 如果 fullContainerIDList 中不存在该 id if (fullContainerIDList.find(pair.first) == fullContainerIDList.end()) { if (!isStdio && pair.second->mStatus != "running") { @@ -647,7 +692,7 @@ static Json::Value SerializeRawContainerInfo(const std::shared_ptrmName); v["UpperDir"] = Json::Value(info->mUpperDir); v["LogPath"] = Json::Value(info->mLogPath); - v["Stopped"] = Json::Value(info->mStopped); + v["Stopped"] = Json::Value(info->mStopped.load()); v["Status"] = Json::Value(info->mStatus); // mounts @@ -717,7 +762,7 @@ static std::shared_ptr DeserializeRawContainerInfo(const Json: info->mLogPath = v["LogPath"].asString(); } if (v.isMember("Stopped") && v["Stopped"].isBool()) { - info->mStopped = v["Stopped"].asBool(); + info->mStopped.store(v["Stopped"].asBool()); } if (v.isMember("Status") && v["Status"].isString()) { info->mStatus = v["Status"].asString(); @@ -1022,81 +1067,21 @@ void ContainerManager::loadContainerInfoFromContainersFormat(const Json::Value& mContainerMap.swap(tmp); } // Apply containers to all existing configs - auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); - - LOG_DEBUG(sLogger, ("recover containers to nameConfigMap", nameConfigMap.size())); - - std::vector> allContainers; - for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { - FileDiscoveryOptions* options = itr->second.first; - if (options->IsContainerDiscoveryEnabled()) { - checkContainerDiffForOneConfig(options, itr->second.second); - } - } - LOG_INFO(sLogger, ("load container state from docker_path_config.json (v1.0.0)", configPath)); - } -} - -void ContainerManager::updateContainerInfoPointersInAllConfigs() { - auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); - for (const auto& configPair : nameConfigMap) { - FileDiscoveryOptions* options = configPair.second.first; - if (!options->IsContainerDiscoveryEnabled()) { - continue; - } - - const auto& containerInfos = options->GetContainerInfo(); - if (!containerInfos) { - continue; - } - - // Update RawContainerInfo pointers for each container in this config - for (auto& containerInfo : *containerInfos) { - const std::string& containerId = containerInfo.mRawContainerInfo->mID; - std::lock_guard lock(mContainerMapMutex); - auto it = mContainerMap.find(containerId); - if (it != mContainerMap.end()) { - // Update the pointer to point to the new RawContainerInfo object - containerInfo.mRawContainerInfo = it->second; - } else { - // Container no longer exists in the global map, this should not happen - // but log a warning if it does - LOG_WARNING(sLogger, ("container not found in global map during pointer update", containerId)); - } - } - } -} - -void ContainerManager::updateContainerInfoPointersForContainers(const std::vector& containerIDs) { - auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); - for (const auto& configPair : nameConfigMap) { - FileDiscoveryOptions* options = configPair.second.first; - if (!options->IsContainerDiscoveryEnabled()) { - continue; - } + { + WriteLock lock(mFileDiscoveryConfigsRWLock); + auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); - const auto& containerInfos = options->GetContainerInfo(); - if (!containerInfos) { - continue; - } + LOG_DEBUG(sLogger, ("recover containers to nameConfigMap", nameConfigMap.size())); - // Update RawContainerInfo pointers only for the specified container IDs - for (auto& containerInfo : *containerInfos) { - const std::string& containerId = containerInfo.mRawContainerInfo->mID; - // Check if this container ID is in the list of updated containers - if (std::find(containerIDs.begin(), containerIDs.end(), containerId) != containerIDs.end()) { - std::lock_guard lock(mContainerMapMutex); - auto it = mContainerMap.find(containerId); - if (it != mContainerMap.end()) { - // Update the pointer to point to the new RawContainerInfo object - containerInfo.mRawContainerInfo = it->second; - } else { - // Container no longer exists in the global map, this should not happen - // but log a warning if it does - LOG_WARNING(sLogger, ("container not found in global map during pointer update", containerId)); + std::vector> allContainers; + for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { + FileDiscoveryOptions* options = itr->second.first; + if (options->IsContainerDiscoveryEnabled()) { + checkContainerDiffForOneConfig(options, itr->second.second); } } } + LOG_INFO(sLogger, ("load container state from docker_path_config.json (v1.0.0)", configPath)); } } diff --git a/core/container_manager/ContainerManager.h b/core/container_manager/ContainerManager.h index 54c658f770..aabf7eebe9 100644 --- a/core/container_manager/ContainerManager.h +++ b/core/container_manager/ContainerManager.h @@ -23,6 +23,7 @@ #include #include "collection_pipeline/CollectionPipelineContext.h" +#include "common/Lock.h" #include "constants/TagConstants.h" #include "container_manager/ContainerDiff.h" #include "container_manager/ContainerDiscoveryOptions.h" @@ -67,8 +68,6 @@ class ContainerManager { void incrementallyUpdateContainersSnapshot(); bool checkContainerDiffForOneConfig(FileDiscoveryOptions* options, const CollectionPipelineContext* ctx); - void updateContainerInfoPointersInAllConfigs(); - void updateContainerInfoPointersForContainers(const std::vector& containerIDs); void computeMatchedContainersDiff(std::set& fullContainerIDList, const std::unordered_map>& matchList, @@ -89,10 +88,12 @@ class ContainerManager { std::unordered_map> mConfigContainerDiffMap; std::unordered_map> mConfigContainerResultMap; std::mutex mContainerMapMutex; + mutable ReadWriteLock mFileDiscoveryConfigsRWLock; std::vector mStoppedContainerIDs; std::mutex mStoppedContainerIDsMutex; - uint32_t mLastUpdateTime = 0; + uint32_t mLastIncrementalUpdateTime = 0; + uint32_t mLastFullUpdateTime = 0; std::future mThreadRes; std::atomic mIsRunning{false}; diff --git a/core/file_server/ContainerInfo.h b/core/file_server/ContainerInfo.h index 4853a29f52..c75059d7dd 100644 --- a/core/file_server/ContainerInfo.h +++ b/core/file_server/ContainerInfo.h @@ -18,6 +18,7 @@ #include +#include #include #include #include @@ -66,10 +67,46 @@ struct RawContainerInfo { // 容器标签信息 std::unordered_map mContainerLabels; - bool mStopped = false; + std::atomic mStopped{false}; std::string mStatus; + // Custom copy constructor to handle std::atomic + RawContainerInfo() = default; + + RawContainerInfo(const RawContainerInfo& other) + : mID(other.mID), + mName(other.mName), + mLogPath(other.mLogPath), + mUpperDir(other.mUpperDir), + mMounts(other.mMounts), + mMetadatas(other.mMetadatas), + mCustomMetadatas(other.mCustomMetadatas), + mK8sInfo(other.mK8sInfo), + mEnv(other.mEnv), + mContainerLabels(other.mContainerLabels), + mStopped(other.mStopped.load()), + mStatus(other.mStatus) {} + + // Custom copy assignment operator + RawContainerInfo& operator=(const RawContainerInfo& other) { + if (this != &other) { + mID = other.mID; + mName = other.mName; + mLogPath = other.mLogPath; + mUpperDir = other.mUpperDir; + mMounts = other.mMounts; + mMetadatas = other.mMetadatas; + mCustomMetadatas = other.mCustomMetadatas; + mK8sInfo = other.mK8sInfo; + mEnv = other.mEnv; + mContainerLabels = other.mContainerLabels; + mStopped.store(other.mStopped.load()); + mStatus = other.mStatus; + } + return *this; + } + bool operator==(const RawContainerInfo& rhs) const { if (mID != rhs.mID) { return false; diff --git a/core/file_server/reader/LogFileReader.cpp b/core/file_server/reader/LogFileReader.cpp index e3ea1d5e3e..e435434af7 100644 --- a/core/file_server/reader/LogFileReader.cpp +++ b/core/file_server/reader/LogFileReader.cpp @@ -2552,7 +2552,7 @@ bool LogFileReader::UpdateContainerInfo() { LOG_INFO(sLogger, ("container info of file reader changed", "may be because container restart")( "old container id", mContainerID)("new container id", containerInfo->mRawContainerInfo->mID)( - "container status", containerInfo->mRawContainerInfo->mStopped ? "stopped" : "running")); + "container status", containerInfo->mRawContainerInfo->mStopped.load() ? "stopped" : "running")); // if config have wildcard path, use wildcardPaths[0] as base path std::string dockerPath; size_t realBaseDirSize = 0; @@ -2575,7 +2575,7 @@ bool LogFileReader::UpdateContainerInfo() { } SetDockerPath(dockerPath, realBaseDirSize); SetContainerID(containerInfo->mRawContainerInfo->mID); - mContainerStopped = containerInfo->mRawContainerInfo->mStopped; + mContainerStopped = containerInfo->mRawContainerInfo->mStopped.load(); mContainerMetadatas.clear(); mContainerExtraTags.clear(); SetContainerMetadatas(containerInfo->mRawContainerInfo->mMetadatas); diff --git a/core/unittest/container_manager/ContainerManagerUnittest.cpp b/core/unittest/container_manager/ContainerManagerUnittest.cpp index 815cb9d6ff..eb93746d59 100644 --- a/core/unittest/container_manager/ContainerManagerUnittest.cpp +++ b/core/unittest/container_manager/ContainerManagerUnittest.cpp @@ -13,12 +13,18 @@ // limitations under the License. +#include + #include +#include #include +#include #include #include #include +#include #include +#include #include #include @@ -30,6 +36,8 @@ #include "common/RuntimeUtil.h" #include "container_manager/ContainerDiscoveryOptions.h" #include "container_manager/ContainerManager.h" +#include "file_server/FileDiscoveryOptions.h" +#include "file_server/FileServer.h" #include "unittest/Unittest.h" #include "unittest/pipeline/LogtailPluginMock.h" @@ -52,12 +60,22 @@ class ContainerManagerUnittest : public testing::Test { void TestLoadContainerInfoVersionHandling() const; void TestSaveContainerInfoWithVersion() const; void TestContainerMatchingConsistency() const; + void TestcomputeMatchedContainersDiffWithSnapshot() const; + void TestNullRawContainerInfoHandling() const; + void TestConcurrentContainerMapAccess_T1T2(); + void TestConcurrentContainerMapAccess_T1T3(); + void TestConcurrentContainerMapAccess_T1T2T3(); + void TestSequentialContainerDiffAndApply(); void runTestFile(const std::string& testFilePath) const; private: + FileDiscoveryOptions mDiscoveryOpts; + CollectionPipelineContext ctx; + void parseLabelFilters(const Json::Value& filtersJson, MatchCriteriaFilter& filter) const; bool parseContainerFilterConfigFromTestJson(const Json::Value& filterJson, ContainerFilterConfig& config) const; std::string findTestDataDirectory() const; + void runConcurrentContainerMapAccessTest(bool enableT2, bool enableT3, const std::string& testName); }; void ContainerManagerUnittest::TestcomputeMatchedContainersDiff() const { @@ -1291,6 +1309,859 @@ bool ContainerManagerUnittest::parseContainerFilterConfigFromTestJson(const Json return true; } +void ContainerManagerUnittest::TestcomputeMatchedContainersDiffWithSnapshot() const { + // This test verifies that computeMatchedContainersDiff uses a snapshot of mContainerMap + // to avoid race conditions when the map is modified concurrently + ContainerManager containerManager; + + // Set up initial containers in mContainerMap + RawContainerInfo info1; + info1.mID = "snapshot1"; + info1.mLogPath = "/var/lib/docker/containers/snapshot1/logs"; + info1.mStatus = "running"; + containerManager.mContainerMap["snapshot1"] = std::make_shared(info1); + + RawContainerInfo info2; + info2.mID = "snapshot2"; + info2.mLogPath = "/var/lib/docker/containers/snapshot2/logs"; + info2.mStatus = "running"; + containerManager.mContainerMap["snapshot2"] = std::make_shared(info2); + + RawContainerInfo info3; + info3.mID = "snapshot3"; + info3.mLogPath = "/var/lib/docker/containers/snapshot3/logs"; + info3.mStatus = "exited"; + containerManager.mContainerMap["snapshot3"] = std::make_shared(info3); + + // Test 1: Verify that only running containers are added (isStdio = false) + { + std::set fullList; + std::unordered_map> matchList; + ContainerFilters filters; + ContainerDiff diff; + + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, false, diff); + + // Should only add running containers to both diff.mAdded and fullList + EXPECT_EQ(diff.mAdded.size(), 2); + EXPECT_EQ(fullList.size(), 2); // Only running containers are added to fullList when isStdio=false + + std::set addedIds; + for (const auto& container : diff.mAdded) { + addedIds.insert(container->mID); + } + EXPECT_TRUE(addedIds.find("snapshot1") != addedIds.end()); + EXPECT_TRUE(addedIds.find("snapshot2") != addedIds.end()); + EXPECT_TRUE(addedIds.find("snapshot3") == addedIds.end()); // exited container should not be added + + // Verify fullList only contains running containers + EXPECT_TRUE(fullList.find("snapshot1") != fullList.end()); + EXPECT_TRUE(fullList.find("snapshot2") != fullList.end()); + EXPECT_TRUE(fullList.find("snapshot3") == fullList.end()); + } + + // Test 2: Verify that isStdio=true includes non-running containers + { + std::set fullList; + std::unordered_map> matchList; + ContainerFilters filters; + ContainerDiff diff; + + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, true, diff); + + // Should add all containers when isStdio=true (including exited) + EXPECT_EQ(diff.mAdded.size(), 3); + EXPECT_EQ(fullList.size(), 3); // All containers are added to fullList when isStdio=true + + std::set addedIds; + for (const auto& container : diff.mAdded) { + addedIds.insert(container->mID); + } + EXPECT_TRUE(addedIds.find("snapshot1") != addedIds.end()); + EXPECT_TRUE(addedIds.find("snapshot2") != addedIds.end()); + EXPECT_TRUE(addedIds.find("snapshot3") != addedIds.end()); + + // Verify fullList contains all containers + EXPECT_TRUE(fullList.find("snapshot1") != fullList.end()); + EXPECT_TRUE(fullList.find("snapshot2") != fullList.end()); + EXPECT_TRUE(fullList.find("snapshot3") != fullList.end()); + } + + // Test 3: Verify snapshot behavior - containers removed from fullList are detected + { + std::set fullList; + fullList.insert("snapshot1"); + fullList.insert("removed1"); // This container exists in fullList but not in mContainerMap + + std::unordered_map> matchList; + RawContainerInfo removedInfo; + removedInfo.mID = "removed1"; + removedInfo.mLogPath = "/var/lib/docker/containers/removed1/logs"; + matchList["removed1"] = std::make_shared(removedInfo); + + ContainerFilters filters; + ContainerDiff diff; + + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, false, diff); + + // The removed container should be in mRemoved list + EXPECT_TRUE(std::find(diff.mRemoved.begin(), diff.mRemoved.end(), "removed1") != diff.mRemoved.end()); + // The removed container should no longer be in fullList + EXPECT_EQ(fullList.count("removed1"), 0); + + // snapshot1 should still be in fullList + EXPECT_TRUE(fullList.find("snapshot1") != fullList.end()); + + // snapshot2 should be added to fullList (running container not previously in fullList) + EXPECT_TRUE(fullList.find("snapshot2") != fullList.end()); + + // snapshot3 should NOT be in fullList (exited container with isStdio=false) + EXPECT_TRUE(fullList.find("snapshot3") == fullList.end()); + + // Final fullList size should be 2 (snapshot1 and snapshot2) + EXPECT_EQ(fullList.size(), 2); + + // diff.mAdded should contain snapshot2 + EXPECT_EQ(diff.mAdded.size(), 1); + if (diff.mAdded.size() > 0) { + EXPECT_EQ(diff.mAdded[0]->mID, "snapshot2"); + } + } + + // Test 4: Verify snapshot behavior - modified containers are detected + { + std::set fullList; + fullList.insert("snapshot1"); + + std::unordered_map> matchList; + RawContainerInfo oldInfo; + oldInfo.mID = "snapshot1"; + oldInfo.mLogPath = "/old/path"; // Different from current mContainerMap + matchList["snapshot1"] = std::make_shared(oldInfo); + + ContainerFilters filters; + ContainerDiff diff; + + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, false, diff); + + // The container should be in mModified list + EXPECT_EQ(diff.mModified.size(), 1); + if (diff.mModified.size() > 0) { + EXPECT_EQ(diff.mModified[0]->mID, "snapshot1"); + EXPECT_EQ(diff.mModified[0]->mLogPath, "/var/lib/docker/containers/snapshot1/logs"); + } + } +} + +void ContainerManagerUnittest::TestNullRawContainerInfoHandling() const { + // This test verifies that null mRawContainerInfo pointers are handled gracefully + // as added in the race condition fix + + // Note: This test is limited because ContainerInfo is typically managed internally + // and we cannot easily inject null mRawContainerInfo in the current implementation. + // However, we can test the defensive checks are in place by verifying the code + // doesn't crash with various edge cases. + + ContainerManager containerManager; + + // Test 1: Empty container map should not cause issues + { + std::set fullList; + std::unordered_map> matchList; + ContainerFilters filters; + ContainerDiff diff; + + // Should not crash with empty map + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, false, diff); + EXPECT_EQ(diff.mAdded.size(), 0); + EXPECT_EQ(diff.mRemoved.size(), 0); + EXPECT_EQ(diff.mModified.size(), 0); + } + + // Test 2: Container with minimal info + { + RawContainerInfo minimalInfo; + minimalInfo.mID = "minimal1"; + minimalInfo.mStatus = "running"; + containerManager.mContainerMap["minimal1"] = std::make_shared(minimalInfo); + + std::set fullList; + std::unordered_map> matchList; + ContainerFilters filters; + ContainerDiff diff; + + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, false, diff); + EXPECT_EQ(diff.mAdded.size(), 1); + if (diff.mAdded.size() > 0) { + EXPECT_EQ(diff.mAdded[0]->mID, "minimal1"); + } + } + + // Test 3: Multiple containers with various states + { + containerManager.mContainerMap.clear(); + + RawContainerInfo running1; + running1.mID = "running1"; + running1.mStatus = "running"; + running1.mLogPath = "/var/log/running1"; + containerManager.mContainerMap["running1"] = std::make_shared(running1); + + RawContainerInfo exited1; + exited1.mID = "exited1"; + exited1.mStatus = "exited"; + exited1.mLogPath = "/var/log/exited1"; + containerManager.mContainerMap["exited1"] = std::make_shared(exited1); + + RawContainerInfo created1; + created1.mID = "created1"; + created1.mStatus = "created"; + created1.mLogPath = "/var/log/created1"; + containerManager.mContainerMap["created1"] = std::make_shared(created1); + + std::set fullList; + std::unordered_map> matchList; + ContainerFilters filters; + ContainerDiff diff; + + // With isStdio=false, only running containers should be added + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, false, diff); + EXPECT_EQ(diff.mAdded.size(), 1); + EXPECT_EQ(fullList.size(), 1); // Only running container in fullList + if (diff.mAdded.size() > 0) { + EXPECT_EQ(diff.mAdded[0]->mID, "running1"); + } + + // With isStdio=true, all containers should be added + fullList.clear(); + matchList.clear(); + ContainerDiff diff2; + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, true, diff2); + EXPECT_EQ(diff2.mAdded.size(), 3); + EXPECT_EQ(fullList.size(), 3); // All containers in fullList when isStdio=true + } + + // Test 4: Verify thread-safe snapshot usage + { + containerManager.mContainerMap.clear(); + + // Add some containers + for (int i = 0; i < 10; i++) { + RawContainerInfo info; + info.mID = "container" + std::to_string(i); + info.mStatus = (i % 2 == 0) ? "running" : "exited"; + info.mLogPath = "/var/log/container" + std::to_string(i); + containerManager.mContainerMap[info.mID] = std::make_shared(info); + } + + std::set fullList; + std::unordered_map> matchList; + ContainerFilters filters; + ContainerDiff diff; + + // This should use a snapshot and not be affected by concurrent modifications + containerManager.computeMatchedContainersDiff(fullList, matchList, filters, false, diff); + + // Only running containers (even indices: 0, 2, 4, 6, 8) should be added + EXPECT_EQ(diff.mAdded.size(), 5); + EXPECT_EQ(fullList.size(), 5); // Only running containers are added to fullList when isStdio=false + + // Verify that only running containers are in fullList + for (int i = 0; i < 10; i++) { + std::string containerId = "container" + std::to_string(i); + if (i % 2 == 0) { + EXPECT_TRUE(fullList.find(containerId) != fullList.end()); + } else { + EXPECT_TRUE(fullList.find(containerId) == fullList.end()); + } + } + } +} + +void ContainerManagerUnittest::runConcurrentContainerMapAccessTest(bool enableT2, + bool enableT3, + const std::string& testName) { + // This test simulates concurrent access to mContainerMap and mRawContainerInfo to reproduce + // race conditions related to container lifecycle management. + // + // Test scenarios: + // 1. Thread 1: Continuously calls refreshAllContainersSnapshot() and incrementallyUpdateContainersSnapshot() + // (simulates real Polling thread behavior, updates mContainerMap with new RawContainerInfo) + // 2. Thread 2 (optional): Calls CheckContainerDiffForAllConfig() and ApplyContainerDiffs() + // (simulates FileServer::Resume() behavior, can trigger GetCustomExternalTags crash) + // 3. Thread 3 (optional): Calls GetContainerStoppedEvents() + // (tests mRawContainerInfo access with proper locking) + // + // This test can expose race conditions including: + // - Iterator invalidation when mContainerMap is modified during iteration + // - Use-after-free when accessing RawContainerInfo fields (mEnv, mK8sInfo.mLabels) after container replacement + // - Concurrent access to unordered_map fields without proper synchronization (in GetCustomExternalTags) + // - Concurrent access to mConfigContainerDiffMap without proper locking + // - Null pointer dereference if mRawContainerInfo is not checked + ContainerManager containerManager; + + // Set running state so CheckContainerDiffForAllConfig can execute + containerManager.mIsRunning = true; + + // Initialize with some containers + for (int i = 0; i < 20; i++) { + RawContainerInfo info; + info.mID = "container_" + std::to_string(i); + info.mStatus = "running"; + info.mLogPath = "/var/log/container_" + std::to_string(i); + containerManager.mContainerMap[info.mID] = std::make_shared(info); + } + + // Setup FileServer with a test config to enable CheckContainerDiffForAllConfig and ApplyContainerDiffs + ctx.SetConfigName(testName); + + // Create FileDiscoveryOptions with container discovery enabled + Json::Value inputConfigJson; + inputConfigJson["FilePaths"].append("/tmp/test/*.log"); + inputConfigJson["ContainerFilters"]["IncludeEnv"]["ENV_KEY_1"] = ""; + inputConfigJson["ContainerFilters"]["IncludeK8sLabel"]["app"] = ""; + inputConfigJson["ExternalEnvTag"]["ENV_KEY_1"] = "custom_env_tag"; + inputConfigJson["ExternalK8sLabelTag"]["app"] = "custom_k8s_tag"; + + mDiscoveryOpts = FileDiscoveryOptions(); + mDiscoveryOpts.Init(inputConfigJson, ctx, "test"); + mDiscoveryOpts.SetEnableContainerDiscoveryFlag(true); + + // Initialize ContainerDiscoveryOptions separately + ContainerDiscoveryOptions containerDiscoveryOpts; + containerDiscoveryOpts.Init(inputConfigJson, ctx, "test"); + mDiscoveryOpts.SetContainerDiscoveryOptions(std::move(containerDiscoveryOpts)); + + mDiscoveryOpts.SetContainerInfo(std::make_shared>()); + mDiscoveryOpts.SetFullContainerList(std::make_shared>()); + mDiscoveryOpts.SetDeduceAndSetContainerBaseDirFunc( + [](ContainerInfo& containerInfo, const CollectionPipelineContext* ctx, const FileDiscoveryOptions* opts) { + containerInfo.mRealBaseDirs.push_back("/tmp/test"); + return true; + }); + + // Add config to FileServer so CheckContainerDiffForAllConfig can find it + FileServer::GetInstance()->AddFileDiscoveryConfig(testName, &mDiscoveryOpts, &ctx); + + std::atomic testRunning{true}; + std::atomic iterationCount{0}; + std::atomic errorCount{0}; + + // Thread 1: Continuously call refreshAllContainersSnapshot and incrementallyUpdateContainersSnapshot + // This simulates the real polling thread behavior + auto modifyThread = [&]() { + int counter = 100; + while (testRunning) { + try { + // Alternate between full refresh and incremental updates (with some randomness) + if (counter % 2 == 0 || rand() % 3 == 0) { + // Simulate refreshAllContainersSnapshot with subset of existing containers + // This triggers RawContainerInfo object replacement for selected containers + // Only update 5 containers per refresh to avoid huge JSON string building + std::ostringstream metaBuilder; + metaBuilder << R"({"All": [)"; + + // Include 5 rotating containers with updated metadata + for (int i = 0; i < 5; i++) { + int containerId = (counter + i) % 20; + if (i > 0) + metaBuilder << ","; + metaBuilder << R"({"ID": "container_)" << containerId << R"(", "Name": "test_container_)" + << containerId << R"(", "Status": "running")" + << R"(, "LogPath": "/var/log/container_)" << containerId << R"(")" + << R"(, "UpperDir": "/var/lib/docker/overlay2/container_)" << containerId + << R"(/diff")" + << R"(, "Env": {"ENV_KEY_1": "value_)" << counter << R"(", "COUNTER": ")" << counter + << R"("})" + << R"(, "K8sInfo": {"Namespace": "default", "Pod": "pod-)" << containerId + << R"(", "Labels": {"app": "app_)" << (counter % 5) << R"(", "version": "v)" + << (counter % 3) << R"("}}})"; + } + + metaBuilder << R"(]})"; + + LogtailPluginMock::GetInstance()->SetUpContainersMeta(metaBuilder.str()); + containerManager.refreshAllContainersSnapshot(); + } else { + // Simulate incrementallyUpdateContainersSnapshot with updates to existing containers + // Update 2-3 existing containers to trigger RawContainerInfo replacement + std::ostringstream diffBuilder; + diffBuilder << R"({"Update": [)"; + + int numUpdates = 2 + (counter % 2); + for (int i = 0; i < numUpdates; i++) { + int containerId = (counter + i) % 20; + if (i > 0) + diffBuilder << ","; + diffBuilder << R"({"ID": "container_)" << containerId << R"(", "Name": "test_container_)" + << containerId << R"(", "Status": "running")" + << R"(, "LogPath": "/var/log/container_)" << containerId << R"(")" + << R"(, "UpperDir": "/var/lib/docker/overlay2/container_)" << containerId + << R"(/diff")" + << R"(, "Env": {"ENV_KEY_1": "incremental_)" << counter + << R"(", "UPDATED_ENV": "new_)" << counter << R"("})" + << R"(, "K8sInfo": {"Namespace": "default", "Pod": "pod-)" << containerId + << R"(", "Labels": {"app": "incremental_)" << counter + << R"(", "updated": "true"}}})"; + } + + diffBuilder << R"(], "Delete": ["old_container_)" << (counter - 50) << R"("], "Stop": ["container_)" + << (counter % 20) << R"("]})"; + + LogtailPluginMock::GetInstance()->SetUpDiffContainersMeta(diffBuilder.str()); + containerManager.incrementallyUpdateContainersSnapshot(); + } + + counter++; + + // Random sleep interval (80-200us) to create more timing variations + std::this_thread::sleep_for(std::chrono::microseconds(80 + rand() % 120)); + } catch (const std::exception& e) { + errorCount++; + } + } + }; + + // Thread 2: Continuously call CheckContainerDiffForAllConfig and ApplyContainerDiffs + // This simulates the real FileServer::Resume() behavior + auto readThread = [&]() { + // Random initial delay (0-50ms) to vary thread interleaving + std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 50)); + + while (testRunning) { + try { + // Check container diffs for all configs (fills mConfigContainerDiffMap) + containerManager.CheckContainerDiffForAllConfig(); + + // Apply the diffs (reads and clears mConfigContainerDiffMap) + // This is where GetCustomExternalTags gets called and may crash + // When thread 1 replaces RawContainerInfo in mContainerMap, the old info's + // mEnv and mK8sInfo.mLabels may be accessed here causing use-after-free + containerManager.ApplyContainerDiffs(); + + iterationCount++; + + // Random sleep interval (50-150us) to create more varied race conditions + std::this_thread::sleep_for(std::chrono::microseconds(50 + rand() % 100)); + } catch (const std::exception& e) { + errorCount++; + } + } + }; + + // Thread 3: Call GetContainerStoppedEvents to test concurrent access to mRawContainerInfo + // This tests the fix for accessing mRawContainerInfo->mID without proper locking + + + auto stoppedEventsThread = [&]() { + // Random initial delay (0-80ms) to vary thread interleaving + std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 80)); + + int counter = 0; + while (testRunning) { + try { + // Periodically add some container IDs to mStoppedContainerIDs + if (counter % 5 == 0) { + std::lock_guard lock(containerManager.mStoppedContainerIDsMutex); + containerManager.mStoppedContainerIDs.push_back("container_" + std::to_string(counter % 20)); + containerManager.mStoppedContainerIDs.push_back("dynamic_" + std::to_string(counter + 100)); + } + + // Call GetContainerStoppedEvents which accesses mRawContainerInfo + // This method was also fixed in commit e0b33e88 to add proper locking + std::vector events; + containerManager.GetContainerStoppedEvents(events); + + // Clean up events + for (auto* event : events) { + delete event; + } + + counter++; + iterationCount++; + + // Random sleep interval (80-180us) to create more varied race conditions + std::this_thread::sleep_for(std::chrono::microseconds(80 + rand() % 100)); + } catch (const std::exception& e) { + errorCount++; + } + } + }; + + + // Start threads based on configuration + std::thread t1(modifyThread); + std::unique_ptr t2; + std::unique_ptr t3; + + if (enableT2) { + t2 = std::make_unique(readThread); + } + if (enableT3) { + t3 = std::make_unique(stoppedEventsThread); + } + + // Let them run for a short time + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + + // Stop threads + testRunning = false; + + t1.join(); + if (t2) { + t2->join(); + } + if (t3) { + t3->join(); + } + + // Cleanup FileServer config + FileServer::GetInstance()->RemoveFileDiscoveryConfig(testName); + // With the fix (snapshot), there should be no errors + EXPECT_EQ(errorCount.load(), 0) << "Concurrent access caused " << errorCount.load() << " errors"; + + // Verify we actually ran many iterations + EXPECT_GT(iterationCount.load(), 100) << "Test should run many iterations to stress test concurrency"; + + std::string threadConfig = "T1"; + if (enableT2) + threadConfig += "+T2"; + if (enableT3) + threadConfig += "+T3"; + std::cout << "Concurrent test (" << threadConfig << ") completed: " << iterationCount.load() << " iterations with " + << errorCount.load() << " errors" << std::endl; +} + +void ContainerManagerUnittest::TestConcurrentContainerMapAccess_T1T2() { + runConcurrentContainerMapAccessTest(true, false, "test_concurrent_config_t1t2"); +} + +void ContainerManagerUnittest::TestConcurrentContainerMapAccess_T1T3() { + runConcurrentContainerMapAccessTest(false, true, "test_concurrent_config_t1t3"); +} + +void ContainerManagerUnittest::TestConcurrentContainerMapAccess_T1T2T3() { + runConcurrentContainerMapAccessTest(true, true, "test_concurrent_config_t1t2t3"); +} + +void ContainerManagerUnittest::TestSequentialContainerDiffAndApply() { + // This test verifies that after multiple refreshAllContainersSnapshot and incrementallyUpdateContainersSnapshot + // operations, CheckContainerDiffForAllConfig and ApplyContainerDiffs produce correct results. + // + // Test flow: + // 1. Setup FileServer with test config + // 2. Execute refreshAllContainersSnapshot multiple times to simulate container updates + // 3. Execute incrementallyUpdateContainersSnapshot to simulate incremental updates + // 4. Call CheckContainerDiffForAllConfig and ApplyContainerDiffs multiple times + // 5. Verify the FileDiscoveryOptions container info is correctly updated after each round + + const std::string testName = "test_sequential_diff_apply"; + ContainerManager containerManager; + containerManager.mIsRunning = true; + + // Setup FileServer with a test config + ctx.SetConfigName(testName); + + // Create FileDiscoveryOptions with container discovery enabled + Json::Value inputConfigJson; + inputConfigJson["FilePaths"].append("/tmp/test/*.log"); + inputConfigJson["ContainerFilters"]["IncludeEnv"]["ENV_KEY_1"] = ""; + inputConfigJson["ContainerFilters"]["IncludeK8sLabel"]["app"] = ""; + inputConfigJson["ExternalEnvTag"]["ENV_KEY_1"] = "custom_env_tag"; + inputConfigJson["ExternalK8sLabelTag"]["app"] = "custom_k8s_tag"; + + mDiscoveryOpts = FileDiscoveryOptions(); + mDiscoveryOpts.Init(inputConfigJson, ctx, "test"); + mDiscoveryOpts.SetEnableContainerDiscoveryFlag(true); + + // Initialize ContainerDiscoveryOptions separately + ContainerDiscoveryOptions containerDiscoveryOpts; + containerDiscoveryOpts.Init(inputConfigJson, ctx, "test"); + mDiscoveryOpts.SetContainerDiscoveryOptions(std::move(containerDiscoveryOpts)); + + mDiscoveryOpts.SetContainerInfo(std::make_shared>()); + mDiscoveryOpts.SetFullContainerList(std::make_shared>()); + mDiscoveryOpts.SetLastContainerUpdateTime(0); // Set to 0 to trigger first check + mDiscoveryOpts.SetDeduceAndSetContainerBaseDirFunc( + [](ContainerInfo& containerInfo, const CollectionPipelineContext* ctx, const FileDiscoveryOptions* opts) { + containerInfo.mRealBaseDirs.push_back("/tmp/test"); + return true; + }); + + // Add config to FileServer + FileServer::GetInstance()->AddFileDiscoveryConfig(testName, &mDiscoveryOpts, &ctx); + + // ===== Round 1: Initial full refresh with 5 containers ===== + std::ostringstream metaBuilder1; + metaBuilder1 << R"({"All": [)"; + for (int i = 0; i < 5; i++) { + if (i > 0) + metaBuilder1 << ","; + metaBuilder1 << R"({"ID": "container_)" << i << R"(", "Name": "test_container_)" << i + << R"(", "Status": "running")" + << R"(, "LogPath": "/var/log/container_)" << i << R"(")" + << R"(, "UpperDir": "/var/lib/docker/overlay2/container_)" << i << R"(/diff")" + << R"(, "Env": {"ENV_KEY_1": "value_)" << i << R"("})" + << R"(, "K8sInfo": {"Namespace": "default", "Pod": "pod-)" << i << R"(", "Labels": {"app": "app_)" + << i << R"("}}})"; + } + metaBuilder1 << R"(]})"; + + LogtailPluginMock::GetInstance()->SetUpContainersMeta(metaBuilder1.str()); + containerManager.refreshAllContainersSnapshot(); + containerManager.mLastFullUpdateTime = 100; // Manually set timestamp for predictable testing + + // Verify mContainerMap has 5 containers + { + std::lock_guard lock(containerManager.mContainerMapMutex); + EXPECT_EQ(containerManager.mContainerMap.size(), 5); + } + + // Check diffs and apply (Round 1) + mDiscoveryOpts.SetLastContainerUpdateTime(50); // Older than mLastFullUpdateTime + bool hasUpdate = containerManager.CheckContainerDiffForAllConfig(); + EXPECT_TRUE(hasUpdate) << "First CheckContainerDiffForAllConfig should detect new containers"; + + auto diff = containerManager.mConfigContainerDiffMap[testName]; + EXPECT_TRUE(diff != nullptr); + EXPECT_EQ(diff->mAdded.size(), 5) << "Should have 5 added containers in first round"; + + containerManager.ApplyContainerDiffs(); + + // Verify FileDiscoveryOptions container info was populated + auto containerInfo = mDiscoveryOpts.GetContainerInfo(); + EXPECT_TRUE(containerInfo != nullptr); + EXPECT_EQ(containerInfo->size(), 5) << "FileDiscoveryOptions should have 5 containers after first apply"; + + // ===== Round 2: Incremental update (add container_5, update container_0) ===== + std::ostringstream diffBuilder1; + diffBuilder1 << R"({"Update": [)"; + diffBuilder1 << R"({"ID": "container_0", "Name": "test_container_0_v2", "Status": "running")" + << R"(, "LogPath": "/var/log/container_0_updated")" + << R"(, "UpperDir": "/var/lib/docker/overlay2/container_0/diff")" + << R"(, "Env": {"ENV_KEY_1": "updated_0"})" + << R"(, "K8sInfo": {"Namespace": "default", "Pod": "pod-0")" + << R"(, "Labels": {"app": "app_0"}}},)" + << R"({"ID": "container_5", "Name": "test_container_5", "Status": "running")" + << R"(, "LogPath": "/var/log/container_5")" + << R"(, "UpperDir": "/var/lib/docker/overlay2/container_5/diff")" + << R"(, "Env": {"ENV_KEY_1": "value_5"})" + << R"(, "K8sInfo": {"Namespace": "default", "Pod": "pod-5")" + << R"(, "Labels": {"app": "app_5"}}})" + << R"(], "Delete": [], "Stop": []})"; + + LogtailPluginMock::GetInstance()->SetUpDiffContainersMeta(diffBuilder1.str()); + containerManager.incrementallyUpdateContainersSnapshot(); + containerManager.mLastIncrementalUpdateTime = 200; // Manually set timestamp + + // Verify incremental update in mContainerMap + { + std::lock_guard lock(containerManager.mContainerMapMutex); + EXPECT_EQ(containerManager.mContainerMap.size(), 6); // 5 + 1 = 6 + auto container0 = containerManager.mContainerMap["container_0"]; + EXPECT_EQ(container0->mLogPath, "/var/log/container_0_updated"); + EXPECT_EQ(container0->mName, "test_container_0_v2"); + } + + // Check diffs and apply (Round 2) + mDiscoveryOpts.SetLastContainerUpdateTime(150); // Between mLastFullUpdateTime and mLastIncrementalUpdateTime + hasUpdate = containerManager.CheckContainerDiffForAllConfig(); + EXPECT_TRUE(hasUpdate) << "Second CheckContainerDiffForAllConfig should detect updates"; + + diff = containerManager.mConfigContainerDiffMap[testName]; + EXPECT_TRUE(diff != nullptr); + EXPECT_EQ(diff->mAdded.size(), 1) << "Should have 1 added container (container_5)"; + EXPECT_EQ(diff->mModified.size(), 1) << "Should have 1 modified container (container_0)"; + + if (diff->mAdded.size() > 0) { + EXPECT_EQ(diff->mAdded[0]->mID, "container_5"); + } + if (diff->mModified.size() > 0) { + EXPECT_EQ(diff->mModified[0]->mID, "container_0"); + EXPECT_EQ(diff->mModified[0]->mLogPath, "/var/log/container_0_updated"); + } + + containerManager.ApplyContainerDiffs(); + + // Verify state after second round + containerInfo = mDiscoveryOpts.GetContainerInfo(); + EXPECT_EQ(containerInfo->size(), 6) << "FileDiscoveryOptions should have 6 containers after second apply"; + + std::unordered_map containerInfoMap; + for (const auto& info : *containerInfo) { + containerInfoMap[info.mRawContainerInfo->mID] = info; + } + + // Verify container_0 was updated + EXPECT_TRUE(containerInfoMap.find("container_0") != containerInfoMap.end()); + EXPECT_EQ(containerInfoMap["container_0"].mRawContainerInfo->mLogPath, "/var/log/container_0_updated"); + EXPECT_EQ(containerInfoMap["container_0"].mRawContainerInfo->mName, "test_container_0_v2"); + + // Verify container_5 was added + EXPECT_TRUE(containerInfoMap.find("container_5") != containerInfoMap.end()); + + // ===== Round 3: Incremental update (delete container_1, container_2) ===== + std::ostringstream diffBuilder2; + diffBuilder2 << R"({"Update": [], "Delete": ["container_1", "container_2"], "Stop": []})"; + + LogtailPluginMock::GetInstance()->SetUpDiffContainersMeta(diffBuilder2.str()); + containerManager.incrementallyUpdateContainersSnapshot(); + containerManager.mLastIncrementalUpdateTime = 300; // Manually set timestamp + + // Verify deletions in mContainerMap + { + std::lock_guard lock(containerManager.mContainerMapMutex); + EXPECT_EQ(containerManager.mContainerMap.size(), 4); // 6 - 2 = 4 + EXPECT_TRUE(containerManager.mContainerMap.find("container_1") == containerManager.mContainerMap.end()); + EXPECT_TRUE(containerManager.mContainerMap.find("container_2") == containerManager.mContainerMap.end()); + } + + // Check diffs and apply (Round 3) + mDiscoveryOpts.SetLastContainerUpdateTime(250); // Between 200 and 300 + hasUpdate = containerManager.CheckContainerDiffForAllConfig(); + EXPECT_TRUE(hasUpdate) << "Third CheckContainerDiffForAllConfig should detect deletions"; + + diff = containerManager.mConfigContainerDiffMap[testName]; + EXPECT_TRUE(diff != nullptr); + EXPECT_EQ(diff->mRemoved.size(), 2) << "Should have 2 removed containers (container_1, container_2)"; + + containerManager.ApplyContainerDiffs(); + + // Verify final state + containerInfo = mDiscoveryOpts.GetContainerInfo(); + EXPECT_EQ(containerInfo->size(), 4) << "FileDiscoveryOptions should have 4 containers after third apply"; + + containerInfoMap.clear(); + for (const auto& info : *containerInfo) { + containerInfoMap[info.mRawContainerInfo->mID] = info; + } + + // Verify container_1 and container_2 were removed + EXPECT_TRUE(containerInfoMap.find("container_1") == containerInfoMap.end()); + EXPECT_TRUE(containerInfoMap.find("container_2") == containerInfoMap.end()); + + // Verify remaining containers + EXPECT_TRUE(containerInfoMap.find("container_0") != containerInfoMap.end()); + EXPECT_TRUE(containerInfoMap.find("container_3") != containerInfoMap.end()); + EXPECT_TRUE(containerInfoMap.find("container_4") != containerInfoMap.end()); + EXPECT_TRUE(containerInfoMap.find("container_5") != containerInfoMap.end()); + + // Save old pointers before full refresh to verify they are replaced with new pointers + // This is critical for memory safety - after full refresh, old RawContainerInfo objects should be replaced + std::unordered_map oldRawPointers; + for (const auto& info : *containerInfo) { + oldRawPointers[info.mRawContainerInfo->mID] = info.mRawContainerInfo.get(); + } + EXPECT_EQ(oldRawPointers.size(), 4) << "Should have saved 4 old pointers"; + + // ===== Round 4: Full refresh (refreshAllContainersSnapshot) ===== + // Simulate a complete refresh from container runtime with 9 containers + // This tests that full refresh correctly handles existing + new containers + std::ostringstream metaBuilder3; + metaBuilder3 << R"({"All": [)"; + // Include existing containers (0, 3, 4, 5) and new ones (6, 7, 8, 9, 10) + std::vector containerIds = {0, 3, 4, 5, 6, 7, 8, 9, 10}; + for (size_t i = 0; i < containerIds.size(); i++) { + int id = containerIds[i]; + if (i > 0) + metaBuilder3 << ","; + metaBuilder3 << R"({"ID": "container_)" << id << R"(", "Name": "test_container_)" << id + << R"(", "Status": "running")" + << R"(, "LogPath": "/var/log/container_)" << id << R"(")" + << R"(, "UpperDir": "/var/lib/docker/overlay2/container_)" << id << R"(/diff")" + << R"(, "Env": {"ENV_KEY_1": "fullrefresh2_)" << id << R"("})" + << R"(, "K8sInfo": {"Namespace": "default", "Pod": "pod-)" << id << R"(", "Labels": {"app": "app_)" + << id << R"("}}})"; + } + metaBuilder3 << R"(]})"; + + LogtailPluginMock::GetInstance()->SetUpContainersMeta(metaBuilder3.str()); + containerManager.refreshAllContainersSnapshot(); + containerManager.mLastFullUpdateTime = 400; // Manually set timestamp + + // Verify mContainerMap has 9 containers + { + std::lock_guard lock(containerManager.mContainerMapMutex); + EXPECT_EQ(containerManager.mContainerMap.size(), 9); + // Verify new containers exist + EXPECT_TRUE(containerManager.mContainerMap.find("container_6") != containerManager.mContainerMap.end()); + EXPECT_TRUE(containerManager.mContainerMap.find("container_7") != containerManager.mContainerMap.end()); + EXPECT_TRUE(containerManager.mContainerMap.find("container_8") != containerManager.mContainerMap.end()); + EXPECT_TRUE(containerManager.mContainerMap.find("container_9") != containerManager.mContainerMap.end()); + EXPECT_TRUE(containerManager.mContainerMap.find("container_10") != containerManager.mContainerMap.end()); + } + + // Check diffs and apply (Round 4) + mDiscoveryOpts.SetLastContainerUpdateTime(350); // Older than mLastFullUpdateTime + hasUpdate = containerManager.CheckContainerDiffForAllConfig(); + EXPECT_TRUE(hasUpdate) << "Fourth CheckContainerDiffForAllConfig should detect full refresh changes"; + + diff = containerManager.mConfigContainerDiffMap[testName]; + EXPECT_TRUE(diff != nullptr); + // Full refresh resets baseline, so all 9 containers are treated as "added" + EXPECT_EQ(diff->mAdded.size(), 9) << "Should have 9 added containers (full refresh resets baseline)"; + + containerManager.ApplyContainerDiffs(); + + // Verify FileDiscoveryOptions has 9 containers + containerInfo = mDiscoveryOpts.GetContainerInfo(); + EXPECT_EQ(containerInfo->size(), 9) << "FileDiscoveryOptions should have 9 containers after fourth apply"; + + containerInfoMap.clear(); + for (const auto& info : *containerInfo) { + containerInfoMap[info.mRawContainerInfo->mID] = info; + } + + // Verify all 9 expected containers are present and validate their details + EXPECT_EQ(containerInfoMap.size(), 9); + for (int id : containerIds) { + std::string containerId = "container_" + std::to_string(id); + EXPECT_TRUE(containerInfoMap.find(containerId) != containerInfoMap.end()) + << "Container " << containerId << " should be in FileDiscoveryOptions"; + + if (containerInfoMap.find(containerId) != containerInfoMap.end()) { + const auto& containerInfoEntry = containerInfoMap[containerId]; + + // Verify container details + EXPECT_EQ(containerInfoEntry.mRawContainerInfo->mID, containerId); + EXPECT_EQ(containerInfoEntry.mRawContainerInfo->mStatus, "running"); + EXPECT_EQ(containerInfoEntry.mRawContainerInfo->mLogPath, "/var/log/container_" + std::to_string(id)); + + // CRITICAL: Verify that pointers were replaced with new ones after full refresh + // This ensures that refreshAllContainersSnapshot creates new RawContainerInfo objects + // rather than reusing old ones, which is essential for memory safety + auto oldPtrIt = oldRawPointers.find(containerId); + if (oldPtrIt != oldRawPointers.end()) { + RawContainerInfo* oldPtr = oldPtrIt->second; + RawContainerInfo* newPtr = containerInfoEntry.mRawContainerInfo.get(); + EXPECT_NE(oldPtr, newPtr) + << "Container " << containerId << " MUST have NEW pointer after full refresh. " + << "Old pointer: " << oldPtr << ", New pointer: " << newPtr << ". " + << "Same pointer means use-after-free risk!"; + + std::cout << "Container " << containerId << " pointer verification: Old=" << oldPtr + << ", New=" << newPtr << " (different=OK)" << std::endl; + } + } + } + + // Verify deleted containers (1, 2) are still not present + EXPECT_TRUE(containerInfoMap.find("container_1") == containerInfoMap.end()) + << "Previously deleted container_1 should not reappear"; + EXPECT_TRUE(containerInfoMap.find("container_2") == containerInfoMap.end()) + << "Previously deleted container_2 should not reappear"; + + // ===== Round 5: Verify no changes when config is up-to-date ===== + mDiscoveryOpts.SetLastContainerUpdateTime(450); // Newer than mLastFullUpdateTime + hasUpdate = containerManager.CheckContainerDiffForAllConfig(); + EXPECT_FALSE(hasUpdate) << "CheckContainerDiffForAllConfig should not detect updates when config is up-to-date"; + + // Cleanup + FileServer::GetInstance()->RemoveFileDiscoveryConfig(testName); + + std::cout << "Sequential container diff and apply test completed successfully" << std::endl; +} + void ContainerManagerUnittest::parseLabelFilters(const Json::Value& filtersJson, MatchCriteriaFilter& filter) const { // Clear existing filters to ensure clean state filter.mIncludeFields.mFieldsMap.clear(); @@ -1352,6 +2223,12 @@ UNIT_TEST_CASE(ContainerManagerUnittest, TestLoadContainerInfoFromContainersForm UNIT_TEST_CASE(ContainerManagerUnittest, TestLoadContainerInfoVersionHandling) UNIT_TEST_CASE(ContainerManagerUnittest, TestSaveContainerInfoWithVersion) UNIT_TEST_CASE(ContainerManagerUnittest, TestContainerMatchingConsistency) +UNIT_TEST_CASE(ContainerManagerUnittest, TestcomputeMatchedContainersDiffWithSnapshot) +UNIT_TEST_CASE(ContainerManagerUnittest, TestNullRawContainerInfoHandling) +UNIT_TEST_CASE(ContainerManagerUnittest, TestConcurrentContainerMapAccess_T1T2) +UNIT_TEST_CASE(ContainerManagerUnittest, TestConcurrentContainerMapAccess_T1T3) +UNIT_TEST_CASE(ContainerManagerUnittest, TestConcurrentContainerMapAccess_T1T2T3) +UNIT_TEST_CASE(ContainerManagerUnittest, TestSequentialContainerDiffAndApply) } // namespace logtail diff --git a/core/unittest/event_handler/ModifyHandlerUnittest.cpp b/core/unittest/event_handler/ModifyHandlerUnittest.cpp index 735a4f1e81..61d3b2819c 100644 --- a/core/unittest/event_handler/ModifyHandlerUnittest.cpp +++ b/core/unittest/event_handler/ModifyHandlerUnittest.cpp @@ -221,7 +221,7 @@ class ModifyHandlerUnittest : public ::testing::Test { void stopContainer(const std::string containerID) { for (auto& containerInfo : *(discoveryOpts.mContainerInfos)) { if (containerInfo.mRawContainerInfo->mID == containerID) { - containerInfo.mRawContainerInfo->mStopped = true; + containerInfo.mRawContainerInfo->mStopped.store(true); break; } }