Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ if (LINUX)
endif()
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} ebpf ebpf/type ebpf/type/table ebpf/util ebpf/util/sampler ebpf/protocol/http ebpf/protocol ebpf/plugin/file_security ebpf/plugin/network_observer ebpf/plugin/process_security ebpf/plugin/network_security ebpf/plugin ebpf/observer ebpf/security
prometheus prometheus/labels prometheus/schedulers prometheus/async prometheus/component
host_monitor host_monitor/collector host_monitor/common forward forward/loongsuite
host_monitor host_monitor/entity host_monitor/collector host_monitor/common forward forward/loongsuite
)
elseif(MSVC)
endif ()
Expand Down
2 changes: 1 addition & 1 deletion core/collection_pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ void PluginRegistry::LoadStaticPlugins() {
if (BOOL_FLAG(enable_ebpf_network_secure)) {
RegisterContinuousInputCreator(new StaticInputCreator<InputNetworkSecurity>(), true);
}
RegisterContinuousInputCreator(new StaticInputCreator<InputHostMeta>(), true);
RegisterContinuousInputCreator(new StaticInputCreator<InputHostMeta>(), false);
RegisterContinuousInputCreator(new StaticInputCreator<InputHostMonitor>(), true);
RegisterContinuousInputCreator(new StaticInputCreator<InputForward>());
#endif
Expand Down
3 changes: 3 additions & 0 deletions core/common/ProcParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,9 @@ bool ProcParser::ParseProcessStat(pid_t pid, const std::string& line, ProcessSta
if (!StringTo(words[EnumProcessStat::tty_nr - offset], ps.tty)) {
LOG_WARNING(sLogger, ("Invalid tty_nr", words[EnumProcessStat::tty_nr - offset]));
}
if (!StringTo(words[EnumProcessStat::flags - offset], ps.flags)) {
LOG_WARNING(sLogger, ("Invalid flags", words[EnumProcessStat::flags - offset]));
}
if (!StringTo(words[EnumProcessStat::minflt - offset], ps.minorFaults)) {
LOG_WARNING(sLogger, ("Invalid minflt", words[EnumProcessStat::minflt - offset]));
}
Expand Down
11 changes: 11 additions & 0 deletions core/common/ProcParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ struct ProcessStat {
uint64_t majorFaults = 0;
pid_t parentPid = 0;
int tty = 0;
uint32_t flags = 0; // Process flags from /proc/[pid]/stat (field 9)
int priority = 0;
int nice = 0;
int numThreads = 0;
Expand Down Expand Up @@ -236,6 +237,16 @@ class ProcParser {

std::unordered_set<int> GetAllPids();

// Kernel thread detection
// Checks if the PF_KTHREAD flag is set which identifies this process as a kernel thread
// See: https://github.com/torvalds/linux/commit/7b34e4283c685f5cc6ba6d30e939906eee0d4bcf
static bool IsKernelThread(uint32_t flags) {
constexpr uint32_t KTHREAD_FLAG = 0x00200000; // Kernel thread flag (PF_KTHREAD)
return (flags & KTHREAD_FLAG) == KTHREAD_FLAG;
}

static bool IsKernelThread(const ProcessStat& ps) { return IsKernelThread(ps.flags); }

private:
std::filesystem::path procPidPath(uint32_t pid, const std::string& subpath) const;
std::string readPidFile(uint32_t pid, const std::string& filename) const;
Expand Down
3 changes: 3 additions & 0 deletions core/constants/EntityConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const string DEFAULT_CONTENT_KEY_PROCESS_BINARY = "binary";
const string DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS = "arguments";
const string DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE = "language";
const string DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID = "container_id";
const string DEFAULT_CONTENT_KEY_PROCESS_STATE = "state";
const string DEFAULT_CONTENT_KEY_PROCESS_RUNTIME_SECONDS = "runtime_seconds";
const string DEFAULT_CONTENT_KEY_PROCESS_LISTENING_PORTS = "listening_ports";

// link
const string DEFAULT_CONTENT_KEY_SRC_DOMAIN = "__src_domain__";
Expand Down
3 changes: 3 additions & 0 deletions core/constants/EntityConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ extern const std::string DEFAULT_CONTENT_KEY_PROCESS_BINARY;
extern const std::string DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS;
extern const std::string DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE;
extern const std::string DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID;
extern const std::string DEFAULT_CONTENT_KEY_PROCESS_STATE;
extern const std::string DEFAULT_CONTENT_KEY_PROCESS_RUNTIME_SECONDS;
extern const std::string DEFAULT_CONTENT_KEY_PROCESS_LISTENING_PORTS;

// link
extern const std::string DEFAULT_CONTENT_KEY_SRC_DOMAIN;
Expand Down
1 change: 1 addition & 0 deletions core/host_monitor/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ inline constexpr StringView DEFAULT_USER_ID_LABEL = "user_id";
#else
inline constexpr StringView DEFAULT_HOST_IP_LABEL = "host_ip";
#endif
inline constexpr StringView DEFAULT_HOST_NAME_LABEL = "host_name";
const extern std::string NVSMI;
const extern std::string LIB_DCGM;
const extern std::filesystem::path NVIDIACTL;
Expand Down
15 changes: 13 additions & 2 deletions core/host_monitor/HostMonitorInputRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "host_monitor/Constants.h"
#include "host_monitor/HostMonitorTimerEvent.h"
#include "host_monitor/collector/CPUCollector.h"
#include "host_monitor/collector/CollectorConstants.h"
#include "host_monitor/collector/DiskCollector.h"
#include "host_monitor/collector/GPUCollector.h"
#include "host_monitor/collector/MemCollector.h"
Expand All @@ -62,14 +63,14 @@ DECLARE_FLAG_INT32(self_check_collector_interval);
namespace logtail {

HostMonitorInputRunner::HostMonitorInputRunner() {
RegisterCollector<ProcessEntityCollector>();
RegisterCollector<CPUCollector>();
RegisterCollector<SystemCollector>();
RegisterCollector<MemCollector>();
RegisterCollector<DiskCollector>();
RegisterCollector<ProcessCollector>();
RegisterCollector<NetCollector>();
RegisterCollector<GPUCollector>();
RegisterCollector<ProcessEntityCollector>(); // 注册 ProcessEntityCollector

size_t threadPoolSize = 1;
// threadPoolSize should be greater than 0
Expand All @@ -83,7 +84,8 @@ HostMonitorInputRunner::HostMonitorInputRunner() {
void HostMonitorInputRunner::UpdateCollector(const std::string& configName,
const std::vector<CollectorInfo>& newCollectorInfos,
QueueKey processQueueKey,
size_t inputIndex) {
size_t inputIndex,
const std::string& inputType) {
for (size_t i = 0; i < newCollectorInfos.size(); ++i) {
const auto& collectorName = newCollectorInfos[i].name;

Expand All @@ -92,6 +94,15 @@ void HostMonitorInputRunner::UpdateCollector(const std::string& configName,
("host monitor", "collector not supported")("config", configName)("collector", collectorName));
continue;
}

// ProcessEntityCollector 只能在 input_host_meta 中使用
if (collectorName == kCollectorProcessEntity && inputType == kInputHostMonitor) {
LOG_ERROR(sLogger,
("process_entity collector is not allowed in input_host_monitor",
"process_entity can only be used in input_host_meta")("config", configName)(
"input_type", inputType)("collector", collectorName));
continue;
}
auto collector = mCollectorCreatorMap.at(collectorName)();

auto collectContext = std::make_shared<HostMonitorContext>(configName,
Expand Down
3 changes: 2 additions & 1 deletion core/host_monitor/HostMonitorInputRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class HostMonitorInputRunner : public InputRunner {
void UpdateCollector(const std::string& configName,
const std::vector<CollectorInfo>& newCollectorInfos,
QueueKey processQueueKey,
size_t inputIndex);
size_t inputIndex,
const std::string& inputType = "");
void RemoveCollector(const std::string& configName);
void RemoveAllCollector();

Expand Down
166 changes: 166 additions & 0 deletions core/host_monitor/LinuxSystemInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ using namespace std::chrono;
#include <pwd.h>
#include <sys/statvfs.h>

#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/program_options.hpp>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <sstream>
#include <unordered_set>

#include "common/FileSystemUtil.h"
#include "common/StringTools.h"
Expand Down Expand Up @@ -1359,4 +1363,166 @@ bool LinuxSystemInterface::GetGPUInformationOnce(GPUInformation& gpuInfo) {

return success;
}

// ========== 端口采集实现 ==========

// TCP 连接信息结构
struct TcpConnection {
uint32_t localAddr;
uint16_t localPort;
uint32_t remoteAddr;
uint16_t remotePort;
uint8_t state;
uint64_t inode;
};

// 解析 /proc/net/tcp 或 /proc/net/tcp6 文件
static std::vector<TcpConnection> ParseTcpTable(const std::filesystem::path& tcpFile) {
std::vector<TcpConnection> connections;
std::ifstream file(tcpFile);
if (!file.is_open()) {
return connections;
}

std::string line;
std::getline(file, line); // 跳过表头

while (std::getline(file, line)) {
// 格式示例:
// sl local_address rem_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
// 0: 00000000:0CEA 00000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 12345 1 ...

std::istringstream iss(line);
std::string sl, localAddrHex, remAddrHex, stHex;
uint64_t inode = 0;

// 读取字段
if (!(iss >> sl >> localAddrHex >> remAddrHex >> stHex)) {
continue;
}

// 跳过中间字段,直接找到 inode
// 实际字段(awk $1, $2, ...):
// $1:sl $2:local_address $3:rem_address $4:st $5-$9:中间字段 $10:inode
// 已读取: sl($1), local_address($2), rem_address($3), st($4)
// 需要跳过: $5-$9 (5个字段)
// 然后读取: inode($10)
std::string temp;
for (int i = 0; i < 5; ++i) { // 跳过 5 个字段
if (!(iss >> temp)) {
break;
}
}
if (!(iss >> inode)) {
continue;
}

// 解析地址和端口(格式:ADDR:PORT,十六进制)
size_t colonPos = localAddrHex.find(':');
if (colonPos == std::string::npos) {
continue;
}

TcpConnection conn;
try {
conn.localAddr = std::stoul(localAddrHex.substr(0, colonPos), nullptr, 16);
conn.localPort = std::stoul(localAddrHex.substr(colonPos + 1), nullptr, 16);

colonPos = remAddrHex.find(':');
if (colonPos != std::string::npos) {
conn.remoteAddr = std::stoul(remAddrHex.substr(0, colonPos), nullptr, 16);
conn.remotePort = std::stoul(remAddrHex.substr(colonPos + 1), nullptr, 16);
}

conn.state = std::stoul(stHex, nullptr, 16);
conn.inode = inode;

connections.push_back(conn);
} catch (const std::exception& e) {
// 解析失败,跳过该行
continue;
}
}

return connections;
}

// 获取进程的 socket inodes
static std::unordered_set<uint64_t> GetProcessSocketInodes(pid_t pid) {
std::unordered_set<uint64_t> inodes;
std::filesystem::path fdDir = PROCESS_DIR / std::to_string(pid) / "fd";

try {
if (!std::filesystem::exists(fdDir)) {
return inodes;
}

for (const auto& entry : std::filesystem::directory_iterator(fdDir)) {
if (!entry.is_symlink()) {
continue;
}

// 读取符号链接目标(如:socket:[12345])
std::error_code ec;
auto target = std::filesystem::read_symlink(entry.path(), ec);
if (ec) {
continue;
}

std::string targetStr = target.string();
if (targetStr.find("socket:[") == 0) {
// 提取 inode
size_t start = 8; // "socket:[" 长度
size_t end = targetStr.find(']', start);
if (end != std::string::npos) {
try {
uint64_t inode = std::stoull(targetStr.substr(start, end - start));
inodes.insert(inode);
} catch (const std::exception&) {
continue;
}
}
}
}
} catch (const std::filesystem::filesystem_error&) {
// 进程可能已退出,忽略错误
}

return inodes;
}

std::vector<uint16_t> LinuxSystemInterface::GetProcessListeningPortsOnce(pid_t pid) {
std::vector<uint16_t> ports;

// 步骤1:解析全局 TCP 连接表
auto tcpConnections = ParseTcpTable(PROCESS_DIR / PROCESS_NET_TCP);
auto tcp6Connections = ParseTcpTable(PROCESS_DIR / PROCESS_NET_TCP6);

// 合并 IPv4 和 IPv6 连接
tcpConnections.insert(tcpConnections.end(), tcp6Connections.begin(), tcp6Connections.end());

// 步骤2:获取进程的 socket inodes
auto socketInodes = GetProcessSocketInodes(pid);
if (socketInodes.empty()) {
return ports;
}

// 步骤3:匹配 inode 并提取 LISTEN 状态的端口
// TCP_LISTEN = 0x0A (十进制 10)
const uint8_t TCP_LISTEN = 0x0A;

std::unordered_set<uint16_t> uniquePorts; // 去重
for (const auto& conn : tcpConnections) {
if (conn.state == TCP_LISTEN && socketInodes.count(conn.inode) > 0) {
uniquePorts.insert(conn.localPort);
}
}

// 转换为有序列表
ports.assign(uniquePorts.begin(), uniquePorts.end());
std::sort(ports.begin(), ports.end());

return ports;
}

} // namespace logtail
1 change: 1 addition & 0 deletions core/host_monitor/LinuxSystemInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class LinuxSystemInterface : public SystemInterface {

bool InitGPUCollectorOnce(const FieldMap& fieldMap) override;
bool GetGPUInformationOnce(GPUInformation& gpuInfo) override;
std::vector<uint16_t> GetProcessListeningPortsOnce(pid_t pid) override;

uint64_t GetMemoryValue(char unit, uint64_t value);
bool GetProcessCmdlineStringOnce(pid_t pid, ProcessCmdlineString& cmdline) override;
Expand Down
4 changes: 4 additions & 0 deletions core/host_monitor/SystemInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,4 +571,8 @@ void SystemInterface::UpdateCacheMetrics(size_t cacheSizeBefore, size_t cacheSiz
ADD_COUNTER(mCacheHitTotal, 1);
}

std::vector<uint16_t> SystemInterface::GetProcessListeningPorts(pid_t pid) {
return GetProcessListeningPortsOnce(pid);
}

} // namespace logtail
4 changes: 4 additions & 0 deletions core/host_monitor/SystemInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,9 @@ class SystemInterface {
bool GetNetInterfaceInformation(time_t now, NetInterfaceInformation& netInterfaceInfo);
bool InitGPUCollector(const FieldMap& fieldMap);
bool GetGPUInformation(time_t now, GPUInformation& gpuInfo);

// 获取进程监听端口(不使用缓存,直接调用)
std::vector<uint16_t> GetProcessListeningPorts(pid_t pid);
explicit SystemInterface(size_t cacheSize = INT32_FLAG(system_interface_cache_queue_size))
: mSystemInformationCache(),
mCPUInformationCache(cacheSize),
Expand Down Expand Up @@ -794,6 +797,7 @@ class SystemInterface {
virtual bool GetNetInterfaceInformationOnce(NetInterfaceInformation& netInterfaceInfo) = 0;
virtual bool InitGPUCollectorOnce(const FieldMap& fieldMap) = 0;
virtual bool GetGPUInformationOnce(GPUInformation& gpuInfo) = 0;
virtual std::vector<uint16_t> GetProcessListeningPortsOnce(pid_t pid) = 0;

SystemInformation mSystemInformationCache;
SystemInformationCache<CPUInformation> mCPUInformationCache;
Expand Down
7 changes: 7 additions & 0 deletions core/host_monitor/collector/CollectorConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@

namespace logtail {

// Input plugin names
constexpr StringView kInputHostMeta("input_host_meta");
constexpr StringView kInputHostMonitor("input_host_monitor");

// Collector names
constexpr StringView kCollectorProcessEntity("process_entity");

// Common tag keys
constexpr StringView kTagKeyM("m");
constexpr StringView kTagKeyHostname("hostname");
Expand Down
Loading
Loading