Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/docker/integration/runner/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ org.apache.hudi:hudi-spark3.5-bundle_2.12:1.0.1,\
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,\
org.apache.hadoop:hadoop-aws:3.3.4,\
com.amazonaws:aws-java-sdk-bundle:1.12.262,\
org.apache.hadoop:hadoop-azure:3.3.4,\
com.microsoft.azure:azure-storage:8.6.6,\
org.apache.spark:spark-avro_2.12:3.5.1"\
&& /spark-3.5.5-bin-hadoop3/bin/spark-shell --packages "$packages" \
&& find /root/.ivy2/ -name '*.jar' -exec ln -sf {} /spark-3.5.5-bin-hadoop3/jars/ \;
Expand Down
28 changes: 11 additions & 17 deletions src/Databases/DataLake/GlueCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,20 @@
#include <DataTypes/DataTypesNumber.h>


#include <IO/S3/Credentials.h>
#include <IO/S3/Client.h>
#include <IO/S3Settings.h>
#include <Databases/DataLake/Common.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Databases/DataLake/Common.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <IO/S3/Client.h>
#include <IO/S3/Credentials.h>
#include <IO/S3Settings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Common/ProxyConfigurationResolverProvider.h>

namespace DB::ErrorCodes
{
Expand Down Expand Up @@ -554,14 +555,7 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo
try
{
auto [metadata_version, metadata_path, compression_method] = DB::Iceberg::getLatestOrExplicitMetadataFileAndVersion(
object_storage,
table_path,
*storage_settings,
nullptr,
getContext(),
log.get(),
std::nullopt
);
object_storage, table_path, *storage_settings, nullptr, getContext(), log.get(), std::nullopt, DB::CompressionMethod::None);

LOG_TRACE(log, "Resolved metadata path '{}' (version {}) for table location '{}'", metadata_path, metadata_version, table_location);

Expand Down
2 changes: 1 addition & 1 deletion src/Disks/IO/ReadBufferFromAzureBlobStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ size_t ReadBufferFromAzureBlobStorage::readBigAt(char * to, size_t n, size_t ran

ObjectMetadata ReadBufferFromAzureBlobStorage::getObjectMetadataFromTheLastRequest() const
{
if (last_object_metadata.get()->has_value())
if (!last_object_metadata.get()->has_value())
throw Exception(ErrorCodes::NOT_INITIALIZED, "No Azure object metadata available because there were no successful requests");

return last_object_metadata.get()->value();
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/IcebergMetadataLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void insertRowToLogTable(
std::function<String()> get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
const Iceberg::IcebergPathFromMetadata & file_path,
std::optional<UInt64> row_in_file,
std::optional<Iceberg::PruningReturnStatus> pruning_status)
{
Expand All @@ -108,7 +108,7 @@ void insertRowToLogTable(
.query_id = local_context->getCurrentQueryId(),
.content_type = row_log_level,
.table_path = table_path,
.file_path = file_path,
.file_path = file_path.serialize(),
.metadata_content = get_row(),
.row_in_file = row_in_file,
.pruning_status = pruning_status});
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/IcebergMetadataLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Core/SettingsEnums.h>
#include <Interpreters/SystemLog.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>

namespace DB
Expand Down Expand Up @@ -33,7 +34,7 @@ void insertRowToLogTable(
std::function<String()> get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
const Iceberg::IcebergPathFromMetadata & file_path,
std::optional<UInt64> row_in_file,
std::optional<Iceberg::PruningReturnStatus> pruning_status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <IO/WriteBufferFromString.h>
#include <Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Common/UniqueLock.h>
Expand Down Expand Up @@ -35,7 +36,9 @@ namespace DB::Iceberg
using namespace DB;

AvroForIcebergDeserializer::AvroForIcebergDeserializer(
std::unique_ptr<ReadBufferFromFileBase> buffer_, const std::string & manifest_file_path_, const DB::FormatSettings & format_settings)
std::unique_ptr<ReadBufferFromFileBase> buffer_,
const IcebergPathFromMetadata & manifest_file_path_,
const DB::FormatSettings & format_settings)
try
: buffer(std::move(buffer_))
, manifest_file_path(manifest_file_path_)
Expand Down Expand Up @@ -156,7 +159,8 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}


const auto file_path_key = getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>();
const auto file_path_key = IcebergPathFromMetadata::deserialize(
getValueFromRowByName(row_index, c_data_file_file_path, TypeIndex::String).safeGet<String>());
/// NOTE: This is weird, because in manifest file partition looks like this:
/// {
/// ...
Expand Down Expand Up @@ -257,16 +261,18 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
}
case FileContentType::POSITION_DELETE: {
/// reference_file_path can be absent in schema for some reason, though it is present in specification: https://iceberg.apache.org/spec/#manifests
std::optional<String> lower_reference_data_file_path = std::nullopt;
std::optional<String> upper_reference_data_file_path = std::nullopt;
std::optional<Iceberg::IcebergPathFromMetadata> lower_reference_data_file_path;
std::optional<Iceberg::IcebergPathFromMetadata> upper_reference_data_file_path;
bool bounds_set_by_referenced_data_file = false;
if (hasPath(c_data_file_referenced_data_file))
{
Field reference_file_path_field = getValueFromRowByName(row_index, c_data_file_referenced_data_file);
if (!reference_file_path_field.isNull())
{
lower_reference_data_file_path = reference_file_path_field.safeGet<String>();
upper_reference_data_file_path = reference_file_path_field.safeGet<String>();
lower_reference_data_file_path.emplace(
Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet<String>()));
upper_reference_data_file_path.emplace(
Iceberg::IcebergPathFromMetadata::deserialize(reference_file_path_field.safeGet<String>()));
bounds_set_by_referenced_data_file = true;
}
}
Expand All @@ -277,9 +283,9 @@ ParsedManifestFileEntryPtr AvroForIcebergDeserializer::createParsedManifestFileE
{
auto & [lower, upper] = it->second;
if (!lower.isNull())
lower_reference_data_file_path = lower.safeGet<String>();
lower_reference_data_file_path.emplace(Iceberg::IcebergPathFromMetadata::deserialize(lower.safeGet<String>()));
if (!upper.isNull())
upper_reference_data_file_path = upper.safeGet<String>();
upper_reference_data_file_path.emplace(Iceberg::IcebergPathFromMetadata::deserialize(upper.safeGet<String>()));
}
}
return std::make_shared<const ParsedManifestFileEntry>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <Core/Field.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadBufferFromFileBase.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergPath.h>
#include <Common/SharedMutex.h>


Expand Down Expand Up @@ -36,7 +37,7 @@ class AvroForIcebergDeserializer
{
private:
std::unique_ptr<DB::ReadBufferFromFileBase> buffer;
std::string manifest_file_path;
Iceberg::IcebergPathFromMetadata manifest_file_path;
DB::ColumnPtr parsed_column;
std::shared_ptr<const DB::DataTypeTuple> parsed_column_data_type;
mutable std::optional<ColumnsWithTypeAndName> cache_parsed_columns TSA_GUARDED_BY(cache_mutex);
Expand All @@ -61,7 +62,7 @@ class AvroForIcebergDeserializer
public:
AvroForIcebergDeserializer(
std::unique_ptr<DB::ReadBufferFromFileBase> buffer_,
const std::string & manifest_file_path_,
const Iceberg::IcebergPathFromMetadata & manifest_file_path_,
const DB::FormatSettings & format_settings);

size_t rows() const;
Expand Down
1 change: 0 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ class IDataLakeMetadata : boost::noncopyable
const std::vector<Field> & /* partition_values */,
SharedHeader /* sample_block */,
const std::vector<String> & /* data_file_paths */,
StorageObjectStorageConfigurationPtr /* configuration */,
ContextPtr /* context */)
{
throwNotImplemented("commitExportPartitionTransaction");
Expand Down
Loading
Loading