Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 99 additions & 114 deletions core/container_manager/ContainerManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ void ContainerManager::pollingLoop() {
}

void ContainerManager::ApplyContainerDiffs() {
WriteLock lock(mFileDiscoveryConfigsRWLock);
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();
std::vector<std::shared_ptr<MatchedContainerInfo>> configResults;
for (auto& pair : mConfigContainerDiffMap) {
Expand Down Expand Up @@ -138,6 +139,10 @@ void ContainerManager::ApplyContainerDiffs() {
if (containerInfos) {
const auto& basePathInfos = options->GetBasePathInfos();
for (const auto& info : *containerInfos) {
if (!info.mRawContainerInfo) {
continue;
}

// Validate array size consistency
if (info.mRealBaseDirs.size() != basePathInfos.size()) {
LOG_WARNING(
Expand Down Expand Up @@ -175,27 +180,14 @@ void ContainerManager::ApplyContainerDiffs() {
mConfigContainerDiffMap.clear();
}

void ContainerManager::sendAllMatchedContainerInfo() {
std::vector<std::shared_ptr<MatchedContainerInfo>> configResults;
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();

for (auto& pair : nameConfigMap) {
FileDiscoveryOptions* options = pair.second.first;
if (options->IsContainerDiscoveryEnabled()) {
if (options->GetContainerDiscoveryOptions().mCollectingContainersMeta
&& options->GetContainerDiscoveryOptions().mMatchedContainerInfo) {
configResults.push_back(options->GetContainerDiscoveryOptions().mMatchedContainerInfo);
}
}
}
sendMatchedContainerInfo(configResults);
}

bool ContainerManager::CheckContainerDiffForAllConfig() {
if (!mIsRunning) {
return false;
}
bool isUpdate = false;

WriteLock lock(mFileDiscoveryConfigsRWLock);
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();
for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) {
FileDiscoveryOptions* options = itr->second.first;
Expand All @@ -209,18 +201,43 @@ bool ContainerManager::CheckContainerDiffForAllConfig() {
return isUpdate;
}


// logtail_containers 自监控数据上报逻辑
void ContainerManager::sendAllMatchedContainerInfo() {
std::vector<std::shared_ptr<MatchedContainerInfo>> configResults;
{
ReadLock lock(mFileDiscoveryConfigsRWLock);
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();

for (auto& pair : nameConfigMap) {
FileDiscoveryOptions* options = pair.second.first;
if (options->IsContainerDiscoveryEnabled()) {
if (options->GetContainerDiscoveryOptions().mCollectingContainersMeta
&& options->GetContainerDiscoveryOptions().mMatchedContainerInfo) {
configResults.push_back(options->GetContainerDiscoveryOptions().mMatchedContainerInfo);
}
}
}
}
sendMatchedContainerInfo(configResults);
}

// logtail_containers 自监控数据上报逻辑
void ContainerManager::UpdateMatchedContainerInfoPipeline(CollectionPipelineContext* ctx, size_t inputIndex) {
WriteLock lock(mMatchedContainerInfoPipelineMux);
mMatchedContainerInfoPipelineCtx = ctx;
mMatchedContainerInfoInputIndex = inputIndex;
}


// logtail_containers 自监控数据上报逻辑
void ContainerManager::RemoveMatchedContainerInfoPipeline() {
WriteLock lock(mMatchedContainerInfoPipelineMux);
mMatchedContainerInfoPipelineCtx = nullptr;
mMatchedContainerInfoInputIndex = 0;
}

// logtail_containers 自监控数据上报逻辑
void ContainerManager::sendMatchedContainerInfo(std::vector<std::shared_ptr<MatchedContainerInfo>> configResults) {
ReadLock lock(mMatchedContainerInfoPipelineMux);
if (mMatchedContainerInfoPipelineCtx == nullptr) {
Expand Down Expand Up @@ -275,33 +292,47 @@ bool ContainerManager::checkContainerDiffForOneConfig(FileDiscoveryOptions* opti
const CollectionPipelineContext* ctx) {
// If this config's container update time is newer than or equal to global update time,
// return the cached result if it exists
if (options->GetLastContainerUpdateTime() > mLastUpdateTime) {
bool refrashAllContainers = false;
if (options->GetLastContainerUpdateTime() <= mLastFullUpdateTime) {
refrashAllContainers = true;
} else if (options->GetLastContainerUpdateTime() <= mLastIncrementalUpdateTime) {
refrashAllContainers = false;
} else {
return false;
}

std::unordered_map<std::string, std::shared_ptr<RawContainerInfo>> containerInfoMap;
const auto& containerInfos = options->GetContainerInfo();
if (containerInfos) {
for (const auto& info : *containerInfos) {
containerInfoMap[info.mRawContainerInfo->mID] = info.mRawContainerInfo;
}
}
std::vector<std::string> removedList;
std::vector<std::string> matchAddedList;
ContainerDiff diff;
computeMatchedContainersDiff(*(options->GetFullContainerList()),
containerInfoMap,
options->GetContainerDiscoveryOptions().mContainerFilters,
options->GetContainerDiscoveryOptions().mIsStdio,
diff);
if (refrashAllContainers) {
options->SetFullContainerList(std::make_shared<std::set<std::string>>());
computeMatchedContainersDiff(*(options->GetFullContainerList()),
containerInfoMap,
options->GetContainerDiscoveryOptions().mContainerFilters,
options->GetContainerDiscoveryOptions().mIsStdio,
diff);
} else {
const auto& containerInfos = options->GetContainerInfo();
if (containerInfos) {
for (const auto& info : *containerInfos) {
containerInfoMap[info.mRawContainerInfo->mID] = info.mRawContainerInfo;
}
}
computeMatchedContainersDiff(*(options->GetFullContainerList()),
containerInfoMap,
options->GetContainerDiscoveryOptions().mContainerFilters,
options->GetContainerDiscoveryOptions().mIsStdio,
diff);
}

LOG_DEBUG(
sLogger,
("diff", diff.ToString())("configName", ctx->GetConfigName())(
"containerFilters", options->GetContainerDiscoveryOptions().mContainerFilters.ToString())(
"fullContainerList", options->GetFullContainerList()->size())("containerInfos", containerInfos->size())(
"lastConfigContainerUpdateTime", options->GetLastContainerUpdateTime())("mLastUpdateTime",
mLastUpdateTime));
"fullContainerList", options->GetFullContainerList()->size())("containerInfoMap", containerInfoMap.size())(
"lastConfigContainerUpdateTime", options->GetLastContainerUpdateTime())(
"mLastFullUpdateTime", mLastFullUpdateTime)("mLastIncrementalUpdateTime", mLastIncrementalUpdateTime));

// Update the config's container update time when there are changes
options->SetLastContainerUpdateTime(time(nullptr));
Expand Down Expand Up @@ -365,7 +396,7 @@ void ContainerManager::incrementallyUpdateContainersSnapshot() {
}

if (hasChanges) {
mLastUpdateTime = time(nullptr);
mLastIncrementalUpdateTime = time(nullptr);
}
}

Expand Down Expand Up @@ -399,10 +430,8 @@ void ContainerManager::refreshAllContainersSnapshot() {
std::lock_guard<std::mutex> lock(mContainerMapMutex);
mContainerMap.swap(tmpContainerMap);
}
mLastUpdateTime = time(nullptr);
mLastFullUpdateTime = time(nullptr);

// Update container info pointers in all configs to point to the new RawContainerInfo objects
updateContainerInfoPointersInAllConfigs();
tmpContainerMap.clear();
}

Expand Down Expand Up @@ -471,7 +500,6 @@ std::string ContainerManager::joinContainerIDs(const std::vector<std::string>& c
}

void ContainerManager::GetContainerStoppedEvents(std::vector<Event*>& eventVec) {
const auto& nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();
std::vector<std::string> stoppedContainerIDs;
{
std::lock_guard<std::mutex> lock(mStoppedContainerIDsMutex);
Expand All @@ -482,17 +510,27 @@ void ContainerManager::GetContainerStoppedEvents(std::vector<Event*>& eventVec)
}
LOG_INFO(sLogger, ("stoppedContainerIDs", ToString(stoppedContainerIDs)));

ReadLock lock(mFileDiscoveryConfigsRWLock);
const auto& nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();
for (const auto& containerId : stoppedContainerIDs) {
for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) {
const FileDiscoveryOptions* options = itr->second.first;
if (!options->IsContainerDiscoveryEnabled()) {
continue;
}
const auto& containerInfos = options->GetContainerInfo();
if (!containerInfos) {
// Capture a local shared_ptr snapshot to protect against concurrent modifications
// by ApplyContainerDiffs (which may call UpdateRawContainerInfo/DeleteRawContainerInfo)
auto containerInfosSnapshot = options->GetContainerInfo();
if (!containerInfosSnapshot) {
continue;
}
for (auto& info : *containerInfos) {
// Now iterate the snapshot - even if options->mContainerInfos is modified by another thread,
// our snapshot's reference count keeps the underlying vector alive
for (auto& info : *containerInfosSnapshot) {
// Protect access to mRawContainerInfo with lock
if (!info.mRawContainerInfo) {
continue;
}
if (info.mRawContainerInfo->mID == containerId) {
// 为每个真实路径生成停止事件
for (const auto& realBaseDir : info.mRealBaseDirs) {
Expand All @@ -504,7 +542,7 @@ void ContainerManager::GetContainerStoppedEvents(std::vector<Event*>& eventVec)
eventVec.push_back(pStoppedEvent);
}
}
info.mRawContainerInfo->mStopped = true;
info.mRawContainerInfo->mStopped.store(true);
LOG_DEBUG(sLogger, ("generate stop event, containerId", containerId)("configName", itr->first));
}
}
Expand Down Expand Up @@ -586,9 +624,16 @@ void ContainerManager::computeMatchedContainersDiff(
const ContainerFilters& filters,
bool isStdio,
ContainerDiff& diff) {
// Create a local snapshot of mContainerMap to avoid holding the lock for extended period
std::unordered_map<std::string, std::shared_ptr<RawContainerInfo>> containerMapSnapshot;
{
std::lock_guard<std::mutex> lock(mContainerMapMutex);
containerMapSnapshot = mContainerMap;
}

// 移除已删除的容器
for (auto it = fullContainerIDList.begin(); it != fullContainerIDList.end();) {
if (mContainerMap.find(*it) == mContainerMap.end()) {
if (containerMapSnapshot.find(*it) == containerMapSnapshot.end()) {
std::string id = *it; // 复制一份,避免 erase 后引用失效
it = fullContainerIDList.erase(it); // 删除元素并移到下一个
if (matchList.find(id) != matchList.end()) {
Expand All @@ -601,7 +646,7 @@ void ContainerManager::computeMatchedContainersDiff(

// 更新匹配的容器状态
for (auto& pair : matchList) {
if (auto it = mContainerMap.find(pair.first); it != mContainerMap.end()) {
if (auto it = containerMapSnapshot.find(pair.first); it != containerMapSnapshot.end()) {
// 更新为最新的 info
if (*pair.second != *it->second) {
diff.mModified.push_back(it->second);
Expand All @@ -610,7 +655,7 @@ void ContainerManager::computeMatchedContainersDiff(
}

// 添加新容器
for (const auto& pair : mContainerMap) {
for (const auto& pair : containerMapSnapshot) {
// 如果 fullContainerIDList 中不存在该 id
if (fullContainerIDList.find(pair.first) == fullContainerIDList.end()) {
if (!isStdio && pair.second->mStatus != "running") {
Expand Down Expand Up @@ -647,7 +692,7 @@ static Json::Value SerializeRawContainerInfo(const std::shared_ptr<RawContainerI
v["Name"] = Json::Value(info->mName);
v["UpperDir"] = Json::Value(info->mUpperDir);
v["LogPath"] = Json::Value(info->mLogPath);
v["Stopped"] = Json::Value(info->mStopped);
v["Stopped"] = Json::Value(info->mStopped.load());
v["Status"] = Json::Value(info->mStatus);

// mounts
Expand Down Expand Up @@ -717,7 +762,7 @@ static std::shared_ptr<RawContainerInfo> DeserializeRawContainerInfo(const Json:
info->mLogPath = v["LogPath"].asString();
}
if (v.isMember("Stopped") && v["Stopped"].isBool()) {
info->mStopped = v["Stopped"].asBool();
info->mStopped.store(v["Stopped"].asBool());
}
if (v.isMember("Status") && v["Status"].isString()) {
info->mStatus = v["Status"].asString();
Expand Down Expand Up @@ -1022,81 +1067,21 @@ void ContainerManager::loadContainerInfoFromContainersFormat(const Json::Value&
mContainerMap.swap(tmp);
}
// Apply containers to all existing configs
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();

LOG_DEBUG(sLogger, ("recover containers to nameConfigMap", nameConfigMap.size()));

std::vector<std::shared_ptr<RawContainerInfo>> allContainers;
for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) {
FileDiscoveryOptions* options = itr->second.first;
if (options->IsContainerDiscoveryEnabled()) {
checkContainerDiffForOneConfig(options, itr->second.second);
}
}
LOG_INFO(sLogger, ("load container state from docker_path_config.json (v1.0.0)", configPath));
}
}

void ContainerManager::updateContainerInfoPointersInAllConfigs() {
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();
for (const auto& configPair : nameConfigMap) {
FileDiscoveryOptions* options = configPair.second.first;
if (!options->IsContainerDiscoveryEnabled()) {
continue;
}

const auto& containerInfos = options->GetContainerInfo();
if (!containerInfos) {
continue;
}

// Update RawContainerInfo pointers for each container in this config
for (auto& containerInfo : *containerInfos) {
const std::string& containerId = containerInfo.mRawContainerInfo->mID;
std::lock_guard<std::mutex> lock(mContainerMapMutex);
auto it = mContainerMap.find(containerId);
if (it != mContainerMap.end()) {
// Update the pointer to point to the new RawContainerInfo object
containerInfo.mRawContainerInfo = it->second;
} else {
// Container no longer exists in the global map, this should not happen
// but log a warning if it does
LOG_WARNING(sLogger, ("container not found in global map during pointer update", containerId));
}
}
}
}

void ContainerManager::updateContainerInfoPointersForContainers(const std::vector<std::string>& containerIDs) {
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();
for (const auto& configPair : nameConfigMap) {
FileDiscoveryOptions* options = configPair.second.first;
if (!options->IsContainerDiscoveryEnabled()) {
continue;
}
{
WriteLock lock(mFileDiscoveryConfigsRWLock);
auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs();

const auto& containerInfos = options->GetContainerInfo();
if (!containerInfos) {
continue;
}
LOG_DEBUG(sLogger, ("recover containers to nameConfigMap", nameConfigMap.size()));

// Update RawContainerInfo pointers only for the specified container IDs
for (auto& containerInfo : *containerInfos) {
const std::string& containerId = containerInfo.mRawContainerInfo->mID;
// Check if this container ID is in the list of updated containers
if (std::find(containerIDs.begin(), containerIDs.end(), containerId) != containerIDs.end()) {
std::lock_guard<std::mutex> lock(mContainerMapMutex);
auto it = mContainerMap.find(containerId);
if (it != mContainerMap.end()) {
// Update the pointer to point to the new RawContainerInfo object
containerInfo.mRawContainerInfo = it->second;
} else {
// Container no longer exists in the global map, this should not happen
// but log a warning if it does
LOG_WARNING(sLogger, ("container not found in global map during pointer update", containerId));
std::vector<std::shared_ptr<RawContainerInfo>> allContainers;
for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) {
FileDiscoveryOptions* options = itr->second.first;
if (options->IsContainerDiscoveryEnabled()) {
checkContainerDiffForOneConfig(options, itr->second.second);
}
}
}
LOG_INFO(sLogger, ("load container state from docker_path_config.json (v1.0.0)", configPath));
}
}

Expand Down
Loading
Loading