Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
129 commits
Select commit Hold shift + click to select a range
fd174f5
rewrite journal
StartE Aug 25, 2025
657fecf
update
StartE Aug 25, 2025
a25283d
UPDATE
StartE Aug 25, 2025
dd49465
UPDATE
StartE Aug 25, 2025
382879c
add journal server
StartE Aug 27, 2025
da8c7e9
update journal server
StartE Aug 27, 2025
7a4798e
update
StartE Aug 27, 2025
aa3a403
update
StartE Aug 27, 2025
6214e84
update
StartE Aug 27, 2025
c460193
UPDATE READER
StartE Aug 27, 2025
9b24819
UPDATE FILTER
StartE Aug 27, 2025
76b70fe
DEBUG JOURNAL READER
StartE Aug 27, 2025
62534f4
update data
StartE Aug 28, 2025
69a2cef
UPDATE
StartE Aug 28, 2025
055badd
SUPPORT FILTER
StartE Aug 28, 2025
b6a5451
UPDATE
StartE Aug 28, 2025
13e8916
UPDATE
StartE Aug 28, 2025
54b94e9
add collection guard
StartE Aug 28, 2025
15064fa
re-org code
StartE Aug 28, 2025
23fb545
JOURNAL CHECKPOINT
StartE Aug 28, 2025
f3855f0
update cmake list
StartE Aug 28, 2025
4ff93da
update
StartE Aug 28, 2025
95cf483
update
StartE Aug 28, 2025
c95a305
update
StartE Aug 28, 2025
93d285d
ADD DOC & optimize filter
StartE Aug 28, 2025
bf856af
UPDATE
StartE Sep 18, 2025
993ba5b
update 1
StartE Sep 18, 2025
f05ae01
update jourNAL
StartE Sep 19, 2025
f5d3a7d
update
StartE Sep 19, 2025
615a0d4
add comment
StartE Sep 19, 2025
efd9416
update
StartE Sep 19, 2025
6815ee3
update
StartE Sep 19, 2025
3d4ff7c
update
StartE Sep 19, 2025
b30db05
update
StartE Sep 19, 2025
d0f146a
UPDATE
StartE Sep 19, 2025
ce5f2c6
remove useless
StartE Sep 25, 2025
ca660cc
update docs
StartE Sep 25, 2025
87662a5
keep c++ filter same with go filter
StartE Sep 25, 2025
f2eb3f3
update journal filter
StartE Sep 25, 2025
5160c55
add protect
StartE Sep 26, 2025
14ccad3
update connection gurad to protect connection
StartE Sep 26, 2025
3b60436
FIX
StartE Sep 26, 2025
077256e
add protection for reader
StartE Sep 26, 2025
b23c2d6
update
StartE Sep 26, 2025
99c6aed
update
StartE Sep 26, 2025
3b372d3
optimze code
StartE Sep 26, 2025
6bfa39b
remove connection reset logic
StartE Oct 9, 2025
f6fa951
add epoll event fd for connection, and reader using same inotify with…
StartE Oct 9, 2025
247ec7f
remove useless checkpoint code
StartE Oct 15, 2025
fb155f0
update
StartE Oct 15, 2025
d8f2518
remove checkpoint
StartE Oct 16, 2025
1c93c46
remove useless timemanager
StartE Oct 16, 2025
8f295ff
update code
StartE Oct 16, 2025
76dbee0
mv processer
StartE Oct 17, 2025
49ca5eb
reorg code
StartE Oct 17, 2025
c3c1046
remove useless code
StartE Oct 17, 2025
8dc5d39
add error recovery policy for journal
StartE Oct 17, 2025
1392274
simplify reader
StartE Oct 17, 2025
c71e146
resolve conflict
StartE Oct 17, 2025
a8e9bcc
set cursor fallback as error recovery“
StartE Oct 17, 2025
e7715cb
update
StartE Oct 20, 2025
581abbe
fix clang format
StartE Oct 21, 2025
2511632
fix ut compile error
StartE Oct 21, 2025
adbccac
clang format
StartE Oct 21, 2025
e20c249
fix windows compile
StartE Oct 21, 2025
dffeeb5
clang format
StartE Oct 21, 2025
57015a5
add ut
StartE Oct 21, 2025
6a166c6
skip journal server comiler for windows
StartE Oct 22, 2025
9f5008a
optimze ut
StartE Oct 22, 2025
9f789b7
fix ut
StartE Oct 22, 2025
af5cb49
add ut
StartE Oct 22, 2025
6a0273f
update comment
StartE Oct 23, 2025
1820fbf
remove useless
StartE Oct 23, 2025
7992108
refactor: remove Windows compatibility code from journal server
StartE Oct 27, 2025
16387c0
remove useless logerr
StartE Oct 27, 2025
bec7e9d
each journal config can only have one input
StartE Oct 27, 2025
e579e6c
update
StartE Oct 27, 2025
a4d5d18
optimize code when no more jouranl logs
StartE Oct 28, 2025
bc90682
fix ut and optimize stop
StartE Oct 28, 2025
4cf3dfd
fix epoll
StartE Oct 28, 2025
de764dd
add polling
StartE Oct 28, 2025
2c1bdc7
fix
StartE Oct 28, 2025
1f0ce28
optimize
StartE Oct 29, 2025
bb4bfa4
simplify journal reader
StartE Oct 29, 2025
1430c9e
InputJournal handles plugin lifecycle only, JournalConfig takes over …
StartE Oct 30, 2025
46c907e
optimize func names
StartE Oct 30, 2025
7fd812a
optimize log send
StartE Oct 30, 2025
a917863
optimze
StartE Oct 30, 2025
9c48075
update logic
StartE Oct 30, 2025
91350a3
remove useless check
StartE Oct 30, 2025
b4e60b3
remove useless check
StartE Oct 30, 2025
9ee54d6
update reader and server
StartE Oct 31, 2025
61a6457
optimze journal log send
StartE Oct 31, 2025
d424022
Merge remote-tracking branch 'origin/main' into yili/rewrite_journal
StartE Oct 31, 2025
6e58287
fix merge conflict
StartE Oct 31, 2025
8b8e052
keep same
StartE Oct 31, 2025
0dacfa5
add connection reset logic for mmap increase with journal increase
StartE Nov 3, 2025
e23e52e
update add match parameters
StartE Nov 4, 2025
fd4c42a
optimize where the reader becomes ineffective during hot-reloading of…
StartE Nov 4, 2025
c5a0f7d
update
StartE Nov 5, 2025
3c33076
re-org journal server
StartE Nov 6, 2025
3435509
re-org journal
StartE Nov 6, 2025
7ce2a39
optimze
StartE Nov 6, 2025
775ab98
refactor the code
StartE Nov 19, 2025
5980956
refine code
StartE Nov 19, 2025
c11e8ab
fix lint
StartE Nov 19, 2025
ad3c5d6
save and restore pending data if have
StartE Nov 19, 2025
94938e5
lazy close readers to avoid use-after-closed issue
StartE Nov 20, 2025
e31b2ee
lazy close readers to avoid use-after-closed issue
StartE Nov 20, 2025
3416682
fix sigbus issue
StartE Nov 21, 2025
f7b6951
fix fd change and reader reopen
StartE Nov 24, 2025
e5d9b26
optimize code
StartE Nov 25, 2025
f1ddaf3
update
StartE Nov 26, 2025
8e86aa4
optimize
StartE Nov 26, 2025
e3914e0
add sigbus handle for getEnrty
StartE Nov 26, 2025
50e72a8
update
StartE Nov 26, 2025
edde2b1
optimize
StartE Nov 26, 2025
86c5174
optimize ut
StartE Nov 26, 2025
6ae355c
fix lint
StartE Nov 26, 2025
be5868b
fix lint and update input doc
StartE Nov 26, 2025
e105407
update
StartE Nov 26, 2025
55a069f
optimize
StartE Nov 26, 2025
8331710
optimize input md
StartE Nov 26, 2025
41d899f
Merge remote-tracking branch 'origin/main' into yili/rewrite_journal
StartE Nov 27, 2025
8b299d4
add data model for c++ doc
StartE Nov 28, 2025
886c432
update doc
StartE Nov 28, 2025
f4b139b
fix ut cmake issue of journal
StartE Dec 1, 2025
49ed5aa
update doc
StartE Dec 1, 2025
ae20f1f
update
StartE Dec 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ if (LINUX)
if (ENABLE_ENTERPRISE)
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} shennong shennong/sdk apm/forward)
endif()
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} ebpf ebpf/type ebpf/type/table ebpf/util ebpf/util/sampler ebpf/protocol/http ebpf/protocol ebpf/plugin/file_security ebpf/plugin/network_observer ebpf/plugin/process_security ebpf/plugin/network_security ebpf/plugin ebpf/observer ebpf/security
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} journal_server journal_server/reader journal_server/manager journal_server/processor journal_server/common
ebpf ebpf/type ebpf/type/table ebpf/util ebpf/util/sampler ebpf/protocol/http ebpf/protocol ebpf/plugin/file_security ebpf/plugin/network_observer ebpf/plugin/process_security ebpf/plugin/network_security ebpf/plugin ebpf/observer ebpf/security
prometheus prometheus/labels prometheus/schedulers prometheus/async prometheus/component
host_monitor host_monitor/collector host_monitor/common forward forward/loongsuite
)
Expand Down Expand Up @@ -268,7 +269,10 @@ if(BUILD_LOGTAIL OR BUILD_LOGTAIL_SHARED_LIBRARY)
all_link(${LOGTAIL_TARGET})
common_link(${LOGTAIL_TARGET})
target_link_libraries(${LOGTAIL_TARGET} provider)

# Add systemd journal library linking
if (LINUX)
link_systemd_journal(${LOGTAIL_TARGET})
add_dependencies(${LOGTAIL_TARGET} install_coolbpf)
endif()
endif()
Expand Down
2 changes: 2 additions & 0 deletions core/collection_pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
#include "plugin/input/InputFileSecurity.h"
#include "plugin/input/InputHostMeta.h"
#include "plugin/input/InputHostMonitor.h"
#include "plugin/input/InputJournal.h"
#include "plugin/input/InputNetworkObserver.h"
#include "plugin/input/InputNetworkSecurity.h"
#include "plugin/input/InputProcessSecurity.h"
Expand Down Expand Up @@ -177,6 +178,7 @@ void PluginRegistry::LoadStaticPlugins() {
RegisterContinuousInputCreator(new StaticInputCreator<InputHostMeta>(), true);
RegisterContinuousInputCreator(new StaticInputCreator<InputHostMonitor>(), true);
RegisterContinuousInputCreator(new StaticInputCreator<InputForward>());
RegisterContinuousInputCreator(new StaticInputCreator<InputJournal>());
#endif
RegisterOnetimeInputCreator(new StaticInputCreator<InputStaticFile>());

Expand Down
352 changes: 352 additions & 0 deletions core/journal_server/JournalServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,352 @@
/*
* Copyright 2025 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "JournalServer.h"

#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <memory>
#include <set>
#include <utility>

#include "collection_pipeline/queue/ProcessQueueManager.h"
#include "logger/Logger.h"
#include "manager/JournalConnection.h"
#include "manager/JournalMonitor.h"
#include "monitor/AlarmManager.h"
#include "processor/JournalEntryProcessor.h"
#include "reader/JournalReader.h"


using namespace std;

namespace logtail {

void JournalServer::Init() {
bool expected = false;
if (!mIsInitialized.compare_exchange_strong(expected, true)) {
LOG_INFO(sLogger, ("journal server already initialized", "skipping duplicate Init() call"));
return;
}

JournalConnection::GetInstance().Initialize();

if (!JournalMonitor::GetInstance()->Initialize()) {
LOG_ERROR(sLogger, ("journal server failed to initialize reader monitor", ""));
mIsInitialized.store(false);
return;
}

mThreadRes = async(launch::async, &JournalServer::run, this);

LOG_INFO(sLogger, ("journal server initialized", ""));
}

void JournalServer::Stop() {
bool expected = true;
if (!mIsInitialized.compare_exchange_strong(expected, false)) {
return;
}

if (!mThreadRes.valid()) {
return;
}

// Set stop flag
mIsThreadRunning.store(false);

// Wait for thread to exit with timeout
bool alarmOnce = false;
while (mThreadRes.valid()) {
std::future_status status = mThreadRes.wait_for(std::chrono::seconds(10));
if (status == std::future_status::ready) {
LOG_DEBUG(sLogger, ("journal server thread", "stopped successfully"));
break;
}
if (!alarmOnce) {
LOG_ERROR(sLogger, ("journal server thread stop", "too slow"));
AlarmManager::GetInstance()->SendAlarmError(CONFIG_UPDATE_ALARM,
std::string("JournalServer stop too slow"));
alarmOnce = true;
}
}

JournalMonitor::GetInstance()->Cleanup();

JournalConnection::GetInstance().Cleanup();

LOG_INFO(sLogger, ("journal server stopped", ""));
}

bool JournalServer::HasRegisteredPlugins() const {
return JournalConnection::GetInstance().GetConnectionCount() > 0;
}

void JournalServer::AddJournalInput(const string& configName, const JournalConfig& config) {
QueueKey queueKey = 0;
if (!getOrValidateQueueKey(configName, config, queueKey)) {
LOG_ERROR(sLogger, ("journal server input validation failed", "config not added")("config", configName));
return;
}

JournalConfig validatedConfig = config;
validatedConfig.mQueueKey = queueKey;

auto& connectionManager = JournalConnection::GetInstance();

if (connectionManager.AddConfig(configName, validatedConfig)) {
auto stats = connectionManager.GetStats();
LOG_INFO(sLogger,
("journal server manager stats", "")("total_configs", stats.totalConfigs)("active_connections",
stats.activeConnections));
} else {
LOG_ERROR(sLogger, ("journal server failed to add config to manager", "")("config", configName));
}
}

void JournalServer::RemoveJournalInput(const string& configName) {
JournalMonitor::GetInstance()->MarkReaderAsClosing(configName);
JournalMonitor::GetInstance()->RemoveReaderFromMonitoring(configName);

JournalConnection::GetInstance().RemoveConfig(configName);

LOG_INFO(sLogger, ("journal server input removed", "")("config", configName));
}


std::vector<std::string> JournalServer::GetAllJournalConfigNames() const {
auto allConfigs = JournalConnection::GetInstance().GetAllConfigs();

// Filter out unvalidated configs (mQueueKey == -1 means failed validateQueueKey validation)
std::vector<std::string> validatedConfigNames;
for (const auto& [configName, config] : allConfigs) {
if (config.mQueueKey != -1) {
validatedConfigNames.push_back(configName);
} else {
LOG_DEBUG(sLogger, ("journal server filtering unvalidated config", "")("config", configName));
}
}

return validatedConfigNames;
}

void JournalServer::run() {
LOG_INFO(sLogger, ("journal server event-driven thread", "started"));

#ifdef __linux__
auto* readerMonitor = JournalMonitor::GetInstance();
auto& monitoredReaders = readerMonitor->GetMonitoredReaders();
int epollFD = readerMonitor->GetEpollFD();
auto& connectionManager = JournalConnection::GetInstance();

constexpr int kMaxEvents = 64;
struct epoll_event events[kMaxEvents];

while (mIsThreadRunning.load()) {
try {
// Lazy cleanup: Remove readers that have been marked as closing
// This ensures any ongoing journal reads from previous iteration have completed
readerMonitor->CleanupClosedReaders();

auto configNames = GetAllJournalConfigNames();
connectionManager.RefreshConnectionsByInterval(configNames, *JournalMonitor::GetInstance());
readerMonitor->AddReadersToMonitoring(configNames);

int nfds = epoll_wait(epollFD, events, kMaxEvents, kJournalEpollTimeoutMS);

if (nfds == -1) {
if (errno == EINTR) {
continue; // Interrupted by signal, continue waiting
}
LOG_ERROR(sLogger, ("journal server epoll_wait failed", "")("error", strerror(errno)));
break;
}

// Build set of FDs that have events (when nfds > 0)
std::set<int> activeFDs;
if (nfds > 0) {
for (int i = 0; i < nfds; ++i) {
activeFDs.insert(events[i].data.fd);
}
}

// Fallback: process pending data when no events (nfds == 0)
// Note: When there are events (nfds > 0), pending data is processed in the main loop below
if (nfds == 0 && processPendingDataWhenNoEvents(monitoredReaders)) {
continue;
}

for (auto& pair : monitoredReaders) {
auto& monitoredReader = pair.second;
int readerFD = pair.first;

// Skip if reader is marked as closing (Use-After-Close prevention)
// This allows ongoing operations to complete but prevents new operations
if (monitoredReader.isClosing.load()) {
continue;
}

// Skip if no event and no pending data (when there are events)
if (nfds > 0 && activeFDs.find(readerFD) == activeFDs.end() && !monitoredReader.hasPendingData) {
continue;
}

std::shared_ptr<JournalReader> currentReader;
if (!readerMonitor->GetValidatedCurrentReader(monitoredReader, currentReader)) {
continue;
}

// Double-check closing status after getting reader (race condition protection)
if (monitoredReader.isClosing.load()) {
continue;
}

JournalStatusType status = currentReader->CheckJournalStatus();

// Skip if no new data and no pending data (when no events)
if (status == JournalStatusType::kNop && !monitoredReader.hasPendingData && nfds == 0) {
continue;
}

if (status != JournalStatusType::kError) {
// Normal status (NOP/APPEND/INVALIDATE), process journal events for this config
// Note: HandleJournalEntries merges accumulated data (pendingData) with new entries
processMonitoredReader(monitoredReader, currentReader);
} else {
monitoredReader.hasPendingData = false;
}
}

} catch (const exception& e) {
LOG_ERROR(sLogger, ("journal server exception in event loop", e.what()));
this_thread::sleep_for(chrono::milliseconds(1000)); // Wait 1 second on exception
} catch (...) {
LOG_ERROR(sLogger, ("journal server unknown exception in event loop", ""));
this_thread::sleep_for(chrono::milliseconds(1000)); // Wait 1 second on exception
}
}

LOG_INFO(sLogger, ("journal server event-driven thread", "stopped"));
#endif
}


bool JournalServer::processPendingDataWhenNoEvents(std::map<int, MonitoredReader>& monitoredReaders) {
// Check if there are any readers with pending data
bool hasReadersWithPendingData = false;
for (const auto& pair : monitoredReaders) {
if (pair.second.hasPendingData) {
hasReadersWithPendingData = true;
break;
}
}

if (!hasReadersWithPendingData) {
return false;
}

auto* readerMonitor = JournalMonitor::GetInstance();

// Process all readers with pending data
for (auto& pair : monitoredReaders) {
auto& monitoredReader = pair.second;

// Skip if reader is marked as closing (Use-After-Close prevention)
if (monitoredReader.isClosing.load()) {
continue;
}

if (monitoredReader.hasPendingData) {
std::shared_ptr<JournalReader> currentReader;
if (!readerMonitor->GetValidatedCurrentReader(monitoredReader, currentReader)) {
continue;
}

// Double-check closing status after getting reader (race condition protection)
if (monitoredReader.isClosing.load()) {
continue;
}

processMonitoredReader(monitoredReader, currentReader);
}
}

return true;
}

void JournalServer::processMonitoredReader(MonitoredReader& monitoredReader,
const std::shared_ptr<JournalReader>& currentReader) {
// Get connection manager and config
auto& connectionManager = JournalConnection::GetInstance();
JournalConfig config = connectionManager.GetConfig(monitoredReader.configName);

// Calculate timeout trigger
bool timeoutTrigger
= JournalMonitor::GetInstance()->IsBatchTimeoutExceeded(monitoredReader, config.mBatchTimeoutMs);

// Process journal entries
// Note: HandleJournalEntries merges accumulated data (pendingData) with new entries
HandleJournalEntries(monitoredReader.configName,
config,
currentReader,
config.mQueueKey,
timeoutTrigger,
&monitoredReader.accumulatedEventGroup,
&monitoredReader.accumulatedEntryCount,
&monitoredReader.hasPendingData,
&monitoredReader.lastBatchTime);
}

bool JournalServer::getOrValidateQueueKey(const std::string& configName,
const JournalConfig& config,
QueueKey& queueKey) {
if (!config.mCtx) {
LOG_ERROR(sLogger,
("journal server CRITICAL: no context available for config",
"this indicates initialization problem")("config", configName));
return false;
}

// If config already has queueKey, use it directly (for test environment)
if (config.mQueueKey != -1) {
queueKey = config.mQueueKey;
LOG_INFO(sLogger, ("journal server using pre-set queue key", "")("config", configName)("queue_key", queueKey));
return true;
}

queueKey = config.mCtx->GetProcessQueueKey();
if (queueKey == -1) {
LOG_WARNING(sLogger, ("journal server no queue key available for config", "skip")("config", configName));
return false;
}

return true;
}

// =============================================================================
// Test and Debug Support
// =============================================================================

#ifdef APSARA_UNIT_TEST_MAIN
void JournalServer::Clear() {
// Configs have been moved to JournalConnection, cleaned up via Cleanup()
JournalConnection::GetInstance().Cleanup();
}
#endif

} // namespace logtail
Loading
Loading