diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index a3e3208da3..2031935e0e 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -145,7 +145,17 @@ void DiskBufferWriter::Stop() { } bool DiskBufferWriter::PushToDiskBuffer(SenderQueueItem* item, uint32_t retryTimes) { - auto slsItem = static_cast(item); + // 添加空指针检查 + if (item == nullptr) { + LOG_ERROR(sLogger, ("PushToDiskBuffer failed", "item is null")); + return false; + } + + auto* slsItem = static_cast(item); + if (slsItem == nullptr) { + LOG_ERROR(sLogger, ("PushToDiskBuffer failed", "slsItem is null")); + return false; + } uint32_t retry = 0; while (++retry < retryTimes) { @@ -160,17 +170,28 @@ bool DiskBufferWriter::PushToDiskBuffer(SenderQueueItem* item, uint32_t retryTim this_thread::sleep_for(chrono::milliseconds(50)); } - auto flusher = static_cast(slsItem->mFlusher); + const auto* flusher = static_cast(slsItem->mFlusher); + if (flusher == nullptr) { + LOG_ERROR(sLogger, ("PushToDiskBuffer failed", "flusher is null")); + return false; + } + + // 提前提取字符串,避免后续指针被释放 + string region = flusher->mRegion; + string project = flusher->mProject; + string logstore = slsItem->mLogstore; + string queueKeyName = QueueKeyManager::GetInstance()->GetName(item->mFlusher->GetQueueKey()); + LOG_WARNING(sLogger, ("failed to add sender queue item to disk buffer writer", "queue is full")("action", "discard data")( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(item->mFlusher->GetQueueKey()))); + "config-flusher-dst", queueKeyName)("region", region)("project", project)("logstore", logstore)); AlarmManager::GetInstance()->SendAlarmCritical( DISCARD_DATA_ALARM, "failed to add sender queue item to disk buffer writer: queue is full\taction: discard data", - flusher->mRegion, - flusher->mProject, + region, + project, "", - slsItem->mLogstore); + logstore); return false; } @@ -191,7 +212,12 @@ void DiskBufferWriter::BufferWriterThread() { if (!res.empty()) { for (auto itr = res.begin(); itr != res.end(); ++itr) { - SendToBufferFile(*itr); + // 添加空指针检查,避免crash + if (*itr != nullptr) { + SendToBufferFile(*itr); + } else { + LOG_ERROR(sLogger, ("BufferWriterThread", "null item in queue")); + } delete *itr; } res.clear(); @@ -746,8 +772,38 @@ string DiskBufferWriter::GetBufferFileHeader() { } bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { - auto data = static_cast(dataPtr); - auto flusher = static_cast(data->mFlusher); + // 添加空指针检查 + if (dataPtr == nullptr) { + LOG_ERROR(sLogger, ("SendToBufferFile failed", "dataPtr is null")); + return false; + } + + auto* data = static_cast(dataPtr); + if (data == nullptr) { + LOG_ERROR(sLogger, ("SendToBufferFile failed", "data is null")); + return false; + } + + const auto* flusher = static_cast(data->mFlusher); + if (flusher == nullptr) { + LOG_ERROR(sLogger, ("SendToBufferFile failed", "flusher is null")); + return false; + } + + // 提前提取所有需要的字符串,避免后续指针被释放 + string projectName = flusher->mProject; + string region = flusher->mRegion; + string aliuid = flusher->mAliuid; + string endpoint = flusher->mEndpoint; + string logstore = data->mLogstore; + string subpath = flusher->GetSubpath(); + string workspace = flusher->GetWorkspace(); + auto compressType = ConvertCompressType(flusher->GetCompressType()); + auto telemetryType = flusher->mTelemetryType; +#ifdef __ENTERPRISE__ + int32_t endpointMode = GetEndpointMode(flusher->mEndpointMode); +#endif + string bufferFileName = GetBufferFileName(); if (bufferFileName.empty()) { CreateNewFile(); @@ -760,11 +816,13 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { AlarmManager::GetInstance()->SendAlarmCritical(SECONDARY_READ_WRITE_ALARM, string("open file error:") + bufferFileName + ",error:" + errorStr, - flusher->mRegion, - flusher->mProject, + region, + projectName, "", - data->mLogstore); - LOG_ERROR(sLogger, ("open buffer file error", bufferFileName)); + logstore); + LOG_ERROR(sLogger, + ("open buffer file error", bufferFileName)("region", region)("projectName", projectName)("logstore", + logstore)); return false; } @@ -776,46 +834,48 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { AlarmManager::GetInstance()->SendAlarmCritical(SECONDARY_READ_WRITE_ALARM, string("write file error:") + bufferFileName + ", error:" + errorStr + ", nbytes:" + ToString(nbytes), - flusher->mRegion, - flusher->mProject, + region, + projectName, "", - data->mLogstore); - LOG_ERROR(sLogger, ("error write encryption header", bufferFileName)("error", errorStr)("nbytes", nbytes)); + logstore); + LOG_ERROR(sLogger, + ("error write encryption header", bufferFileName)("error", errorStr)("nbytes", nbytes)( + "region", region)("projectName", projectName)("logstore", logstore)); fclose(fout); return false; } } - char* des; - int32_t desLength; + char* des = nullptr; + int32_t desLength = 0; if (!FileEncryption::GetInstance()->Encrypt(data->mData.c_str(), data->mData.size(), des, desLength)) { fclose(fout); - LOG_ERROR(sLogger, ("encrypt error, project_name", flusher->mProject)); + LOG_ERROR(sLogger, ("encrypt error, project_name", projectName)("region", region)("logstore", logstore)); AlarmManager::GetInstance()->SendAlarmCritical(ENCRYPT_DECRYPT_FAIL_ALARM, - string("encrypt error, project_name:" + flusher->mProject), - flusher->mRegion, - flusher->mProject, + string("encrypt error, project_name:" + projectName), + region, + projectName, "", - data->mLogstore); + logstore); return false; } sls_logs::LogtailBufferMeta bufferMeta; - bufferMeta.set_project(flusher->mProject); - bufferMeta.set_region(flusher->mRegion); - bufferMeta.set_aliuid(flusher->mAliuid); - bufferMeta.set_logstore(data->mLogstore); + bufferMeta.set_project(projectName); + bufferMeta.set_region(region); + bufferMeta.set_aliuid(aliuid); + bufferMeta.set_logstore(logstore); bufferMeta.set_datatype(int32_t(data->mType)); bufferMeta.set_rawsize(data->mRawSize); bufferMeta.set_shardhashkey(data->mShardHashKey); - bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType())); - bufferMeta.set_telemetrytype(flusher->mTelemetryType); - bufferMeta.set_subpath(flusher->GetSubpath()); - bufferMeta.set_workspace(flusher->GetWorkspace()); + bufferMeta.set_compresstype(compressType); + bufferMeta.set_telemetrytype(telemetryType); + bufferMeta.set_subpath(subpath); + bufferMeta.set_workspace(workspace); #ifdef __ENTERPRISE__ - bufferMeta.set_endpointmode(GetEndpointMode(flusher->mEndpointMode)); + bufferMeta.set_endpointmode(endpointMode); #endif - bufferMeta.set_endpoint(flusher->mEndpoint); + bufferMeta.set_endpoint(endpoint); string encodedInfo; bufferMeta.SerializeToString(&encodedInfo); @@ -839,13 +899,13 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { AlarmManager::GetInstance()->SendAlarmCritical(SECONDARY_READ_WRITE_ALARM, string("write file error:") + bufferFileName + ", error:" + errorStr + ", nbytes:" + ToString(nbytes), - flusher->mRegion, - flusher->mProject, + region, + projectName, "", - data->mLogstore); - LOG_ERROR( - sLogger, - ("write meta of buffer file", "fail")("filename", bufferFileName)("errorStr", errorStr)("nbytes", nbytes)); + logstore); + LOG_ERROR(sLogger, + ("write meta of buffer file", "fail")("filename", bufferFileName)("errorStr", errorStr)( + "nbytes", nbytes)("region", region)("projectName", projectName)("logstore", logstore)); delete[] buffer; fclose(fout); return false;