Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8275a31
feat(dart): add transformationOptions and *WithTransformation helpers
MarioAlexandruDan Apr 30, 2026
542a47e
Merge branch 'main' into feat/dart-transformation-options
MarioAlexandruDan May 11, 2026
2bebb0b
feat(dart): enhance TransformationOptions and update ingestion transp…
MarioAlexandruDan May 11, 2026
01ba397
Merge branch 'main' into feat/dart-transformation-options
MarioAlexandruDan May 11, 2026
8fd0d02
feat(dart): add gradle wrapper and update transformation options
MarioAlexandruDan May 12, 2026
44910b3
fix(dart): add algolia_client_ingestion dependency to client_search p…
MarioAlexandruDan May 12, 2026
ee6ec15
fix(dart): add missing extension.dart to client_ingestion package
MarioAlexandruDan May 12, 2026
026e618
fix(dart): add algolia_client_ingestion to test pubspec and fix trail…
MarioAlexandruDan May 12, 2026
5149674
Merge branch 'main' into feat/dart-transformation-options
MarioAlexandruDan May 12, 2026
3555bd9
fix(dart): use Uri.parse instead of Uri.dataFromString for query para…
MarioAlexandruDan May 12, 2026
00436de
fix(dart): fix CTS test generation for ingestion - camelCase params, …
MarioAlexandruDan May 12, 2026
051d012
fix(dart): fix ingestion client default timeouts, AlgoliaApiException…
MarioAlexandruDan May 12, 2026
72c3612
Merge branch 'main' into feat/dart-transformation-options
MarioAlexandruDan May 13, 2026
dfa8bc0
fix(dart): use flat serverTimeouts map for default ClientOptions to f…
MarioAlexandruDan May 13, 2026
6756b21
fix(dart): add scalar server timeout values to bundle for reliable Mu…
MarioAlexandruDan May 13, 2026
daae490
fix(dart): simplify processOpenAPI - no try/catch needed since Config…
MarioAlexandruDan May 13, 2026
a05cf6f
fix(dart): apply spec server timeouts at request level for methods wi…
MarioAlexandruDan May 13, 2026
d2fa25b
fix(dart): add dart to assertValidReplaceAllObjectsWithTransformation…
MarioAlexandruDan May 13, 2026
97ac919
fix(dart): add dart to assertPushMockValid count in runCts
MarioAlexandruDan May 13, 2026
f9b572b
Merge branch 'main' into feat/dart-transformation-options
MarioAlexandruDan May 13, 2026
d6e5ae0
Merge branch 'main' into feat/dart-transformation-options
MarioAlexandruDan May 13, 2026
2eb6cd7
Merge branch 'main' into feat/dart-transformation-options
MarioAlexandruDan May 13, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export 'src/api_client.dart';
export 'src/config/agent_segment.dart';
export 'src/config/client_options.dart';
export 'src/config/host.dart';
export 'src/config/transformation_options.dart';
export 'src/transport/api_request.dart';
export 'src/transport/request_options.dart';
export 'src/transport/requester.dart';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:core';

import 'package:algolia_client_core/src/config/agent_segment.dart';
import 'package:algolia_client_core/src/config/host.dart';
import 'package:algolia_client_core/src/config/transformation_options.dart';
import 'package:algolia_client_core/src/transport/requester.dart';
import 'package:dio/dio.dart';

Expand Down Expand Up @@ -42,6 +43,9 @@ final class ClientOptions {
/// Set to 'gzip' to enable gzip compression for POST/PUT requests.
final String? compression;

/// Options for the ingestion transporter used by `*WithTransformation` helpers on [SearchClient].
final TransformationOptions? transformationOptions;

/// Constructs a [ClientOptions] instance with the provided parameters.
const ClientOptions({
this.connectTimeout = const Duration(seconds: 2),
Expand All @@ -55,6 +59,7 @@ final class ClientOptions {
this.interceptors,
this.httpClientAdapter,
this.compression,
this.transformationOptions,
});

@override
Expand All @@ -72,7 +77,8 @@ final class ClientOptions {
requester == other.requester &&
interceptors == other.interceptors &&
httpClientAdapter == other.httpClientAdapter &&
compression == other.compression;
compression == other.compression &&
transformationOptions == other.transformationOptions;

@override
int get hashCode =>
Expand All @@ -86,10 +92,11 @@ final class ClientOptions {
requester.hashCode ^
interceptors.hashCode ^
httpClientAdapter.hashCode ^
compression.hashCode;
compression.hashCode ^
transformationOptions.hashCode;

@override
String toString() {
return 'ClientOptions{hosts: $hosts, connectTimeout: $connectTimeout, writeTimeout: $writeTimeout, readTimeout: $readTimeout, headers: $headers, agentSegments: $agentSegments, logger: $logger, requester: $requester, interceptors: $interceptors, httpClientAdapter: $httpClientAdapter, compression: $compression}';
return 'ClientOptions{hosts: $hosts, connectTimeout: $connectTimeout, writeTimeout: $writeTimeout, readTimeout: $readTimeout, headers: $headers, agentSegments: $agentSegments, logger: $logger, requester: $requester, interceptors: $interceptors, httpClientAdapter: $httpClientAdapter, compression: $compression, transformationOptions: $transformationOptions}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import 'package:algolia_client_core/src/config/client_options.dart';

/// Options for the ingestion transporter used by `*WithTransformation` helpers.
final class TransformationOptions {
/// The Algolia region for the Ingestion API (e.g. `'us'` or `'eu'`). Required.
final String region;

/// Optional overrides for the ingestion transporter's [ClientOptions].
/// Only the fields you set here replace the Ingestion API defaults (25 s timeouts).
/// Do not set [ClientOptions.transformationOptions] here — it is ignored.
final ClientOptions? ingestionClientOptions;

/// Constructs a [TransformationOptions] instance.
TransformationOptions({
required this.region,
this.ingestionClientOptions,
}) {
if (region.isEmpty) {
throw ArgumentError(
'region is required in transformationOptions.'
' See https://www.algolia.com/doc/libraries/sdk/methods/ingestion',
);
}
}

@override
bool operator ==(Object other) =>
identical(this, other) ||
other is TransformationOptions &&
runtimeType == other.runtimeType &&
region == other.region &&
ingestionClientOptions == other.ingestionClientOptions;

@override
int get hashCode => region.hashCode ^ ingestionClientOptions.hashCode;

@override
String toString() {
return 'TransformationOptions{region: $region, ingestionClientOptions: $ingestionClientOptions}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class DioRequester implements Requester {
path: request.path,
);
if (request.queryParameters.isNotEmpty) {
return Uri.dataFromString(
return Uri.parse(
"${uri.toString()}?${request.queryParameters.entries.map((e) => "${e.key}=${e.value}").join("&")}");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export 'extension/search.dart';
export 'extension/transformation.dart';
export 'extension/wait_task.dart';
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
import 'dart:math';

import 'package:algolia_client_core/algolia_client_core.dart';
import 'package:algolia_client_ingestion/algolia_client_ingestion.dart' as ingestion;
import 'package:algolia_client_search/src/api/search_client.dart';
import 'package:algolia_client_search/src/extension/wait_task.dart';
import 'package:algolia_client_search/src/model/event.dart';
import 'package:algolia_client_search/src/model/event_status.dart';
import 'package:algolia_client_search/src/model/event_type.dart';
import 'package:algolia_client_search/src/model/operation_index_params.dart';
import 'package:algolia_client_search/src/model/operation_type.dart';
import 'package:algolia_client_search/src/model/replace_all_objects_with_transformation_response.dart';
import 'package:algolia_client_search/src/model/scope_type.dart';
import 'package:algolia_client_search/src/model/watch_response.dart';

extension Transformation on SearchClient {
static const _notSetError =
'transformationOptions must be set in the client config before calling this method.'
' It defaults to the Ingestion API defaults.'
' See https://www.algolia.com/doc/libraries/sdk/methods/ingestion';

/// Chunks [objects] and pushes them through the Ingestion pipeline with [action].
Future<List<WatchResponse>> chunkedPush({
required String indexName,
required Iterable<Map<String, dynamic>> objects,
required ingestion.Action action,
bool waitForTasks = false,
int batchSize = 1000,
String? referenceIndexName,
RequestOptions? requestOptions,
}) async {
if (batchSize < 1) throw ArgumentError('`batchSize` must be greater than 0');
final transporter = ingestionTransporter;
if (transporter == null) throw StateError(_notSetError);

final responses = <WatchResponse>[];
final batch = <ingestion.PushTaskRecords>[];
final pollInterval = (batchSize ~/ 10).clamp(1, batchSize);
int polledUpTo = 0;

final iter = objects.iterator;
if (!iter.moveNext()) return responses;

while (true) {
batch.add(_toRecord(iter.current));
final isLast = !iter.moveNext();

if (batch.length == batchSize || isLast) {
final raw = await transporter.push(
indexName: indexName,
pushTaskPayload: ingestion.PushTaskPayload(action: action, records: List.of(batch)),
referenceIndexName: referenceIndexName,
requestOptions: requestOptions,
);
responses.add(_convertWatchResponse(raw));
batch.clear();

if (waitForTasks &&
(responses.length % pollInterval == 0 || isLast)) {
await _pollBatch(
transporter: transporter,
responses: responses,
from: polledUpTo,
to: responses.length,
requestOptions: requestOptions,
);
polledUpTo = responses.length;
}
}

if (isLast) break;
}

return responses;
}

/// Saves objects through the Ingestion pipeline. Requires [TransformationOptions] to be set.
Future<List<WatchResponse>> saveObjectsWithTransformation({
required String indexName,
required Iterable<Map<String, dynamic>> objects,
bool waitForTasks = false,
int batchSize = 1000,
RequestOptions? requestOptions,
}) {
return chunkedPush(
indexName: indexName,
objects: objects,
action: ingestion.Action.addObject,
waitForTasks: waitForTasks,
batchSize: batchSize,
requestOptions: requestOptions,
);
}

/// Partially updates objects through the Ingestion pipeline. Requires [TransformationOptions] to be set.
Future<List<WatchResponse>> partialUpdateObjectsWithTransformation({
required String indexName,
required Iterable<Map<String, dynamic>> objects,
bool createIfNotExists = true,
bool waitForTasks = false,
int batchSize = 1000,
RequestOptions? requestOptions,
}) {
return chunkedPush(
indexName: indexName,
objects: objects,
action: createIfNotExists
? ingestion.Action.partialUpdateObject
: ingestion.Action.partialUpdateObjectNoCreate,
waitForTasks: waitForTasks,
batchSize: batchSize,
requestOptions: requestOptions,
);
}

/// Replaces all objects in [indexName] via the Ingestion pipeline without downtime.
/// Requires [TransformationOptions] to be set.
Future<ReplaceAllObjectsWithTransformationResponse> replaceAllObjectsWithTransformation({
required String indexName,
required Iterable<Map<String, dynamic>> objects,
int batchSize = 1000,
List<ScopeType>? scopes,
RequestOptions? requestOptions,
}) async {
if (ingestionTransporter == null) throw StateError(_notSetError);

final effectiveScopes = scopes ?? [ScopeType.settings, ScopeType.rules, ScopeType.synonyms];
final tmpIndex = '${indexName}_tmp_${Random().nextInt(900000) + 100000}';

try {
var copyResponse = await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: OperationType.copy,
destination: tmpIndex,
scope: effectiveScopes,
),
requestOptions: requestOptions,
);

final watchResponses = await chunkedPush(
indexName: tmpIndex,
objects: objects,
action: ingestion.Action.addObject,
waitForTasks: true,
batchSize: batchSize,
referenceIndexName: indexName,
requestOptions: requestOptions,
);

await waitTask(indexName: tmpIndex, taskID: copyResponse.taskID, requestOptions: requestOptions);

copyResponse = await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: OperationType.copy,
destination: tmpIndex,
scope: effectiveScopes,
),
requestOptions: requestOptions,
);
await waitTask(indexName: tmpIndex, taskID: copyResponse.taskID, requestOptions: requestOptions);

final moveResponse = await operationIndex(
indexName: tmpIndex,
operationIndexParams: OperationIndexParams(
operation: OperationType.move,
destination: indexName,
),
requestOptions: requestOptions,
);
await waitTask(indexName: tmpIndex, taskID: moveResponse.taskID, requestOptions: requestOptions);

return ReplaceAllObjectsWithTransformationResponse(
copyOperationResponse: copyResponse,
watchResponses: watchResponses,
moveOperationResponse: moveResponse,
);
} catch (_) {
try {
await deleteIndex(indexName: tmpIndex);
} catch (_) {}
rethrow;
}
}
}

Future<void> _pollBatch({
required ingestion.IngestionClient transporter,
required List<WatchResponse> responses,
required int from,
required int to,
RequestOptions? requestOptions,
}) async {
for (final resp in responses.sublist(from, to)) {
final eventID = resp.eventID;
if (eventID == null) continue;
await _waitForEvent(
transporter: transporter,
runID: resp.runID,
eventID: eventID,
requestOptions: requestOptions,
);
}
}

Future<void> _waitForEvent({
required ingestion.IngestionClient transporter,
required String runID,
required String eventID,
RequestOptions? requestOptions,
}) async {
for (var retries = 0; retries < 50; retries++) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default is 100 now, and we want to pass maxRetries from the calling method I believe (check Dart jira ticket for maxRetries update)

try {
await transporter.getEvent(
runID: runID,
eventID: eventID,
requestOptions: requestOptions,
);
return;
} on AlgoliaApiException catch (e) {
if (e.statusCode != 404) rethrow;
}
await Future<void>.delayed(
Duration(milliseconds: (retries * 1500).clamp(0, 5000)),
);
}
Comment on lines +213 to +227
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Silently returns after 50 retries
Add a throw exception at the end

}

ingestion.PushTaskRecords _toRecord(Map<String, dynamic> obj) {
final objectID = obj['objectID'];
if (objectID == null || objectID is! String) {
throw ArgumentError('each object must have an `objectID` key in order to be indexed');
}
final rest = Map<String, dynamic>.from(obj)..remove('objectID');
return ingestion.PushTaskRecords(objectID: objectID, additionalProperties: rest);
}

WatchResponse _convertWatchResponse(ingestion.WatchResponse r) {
return WatchResponse(
runID: r.runID,
eventID: r.eventID,
data: r.data,
events: r.events
?.map((e) => Event(
eventID: e.eventID,
runID: e.runID,
status: e.status != null ? EventStatus.fromJson(e.status!.toJson()) : null,
type: EventType.fromJson(e.type.toJson()),
batchSize: e.batchSize,
data: e.data,
publishedAt: e.publishedAt,
))
.toList(),
message: r.message,
createdAt: r.createdAt,
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ environment:

dependencies:
algolia_client_core: ^1.49.1
algolia_client_ingestion: ^1.49.1
json_annotation: ^4.8.1
collection: ^1.17.1

Expand Down
4 changes: 4 additions & 0 deletions config/clients.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
"name": "composition",
"output": "clients/algoliasearch-client-dart/packages/client_composition"
},
{
"name": "ingestion",
"output": "clients/algoliasearch-client-dart/packages/client_ingestion"
},
{
"name": "insights",
"output": "clients/algoliasearch-client-dart/packages/client_insights"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void processOpts() {
public void processOpenAPI(OpenAPI openAPI) {
super.processOpenAPI(openAPI);
Helpers.generateServers(super.fromServers(openAPI.getServers()), additionalProperties);
Timeouts.enrichBundle(openAPI, additionalProperties);
}

@Override
Expand Down
Loading
Loading