diff --git a/csharp/src/BigQueryParameters.cs b/csharp/src/BigQueryParameters.cs index 42a4869..7fffd78 100644 --- a/csharp/src/BigQueryParameters.cs +++ b/csharp/src/BigQueryParameters.cs @@ -40,6 +40,7 @@ internal class BigQueryParameters public const string ClientSecret = "adbc.bigquery.client_secret"; public const string ClientTimeout = "adbc.bigquery.client.timeout"; public const string EvaluationKind = "adbc.bigquery.multiple_statement.evaluation_kind"; + public const string DisableExplicitCancel = "adbc.bigquery.disable_explicit_cancel"; public const string GetQueryResultsOptionsTimeout = "adbc.bigquery.get_query_results_options.timeout"; public const string IncludeConstraintsWithGetObjects = "adbc.bigquery.include_constraints_getobjects"; public const string IncludePublicProjectId = "adbc.bigquery.include_public_project_id"; diff --git a/csharp/src/BigQueryStatement.cs b/csharp/src/BigQueryStatement.cs index 6aa81ea..a3e7bef 100644 --- a/csharp/src/BigQueryStatement.cs +++ b/csharp/src/BigQueryStatement.cs @@ -114,7 +114,8 @@ private async Task ExecuteQueryInternalAsync() activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds); } - using JobCancellationContext cancellationContext = new JobCancellationContext(cancellationRegistry, job); + bool disableExplicitCancel = Options?.ContainsKey(BigQueryParameters.DisableExplicitCancel) == true; + using ICancellationContext cancellationContext = CancellationContext.New(cancellationRegistry, disableExplicitCancel, job); // We can't checkJobStatus, Otherwise, the timeout in QueryResultsOptions is meaningless. // When encountering a long-running job, it should be controlled by the timeout in the Google SDK instead of blocking in a while loop. @@ -224,7 +225,8 @@ private async Task ExecuteQueryInternalAsync() IEnumerable readers = await ExecuteWithRetriesAsync(getArrowReadersFunc, activity, cancellationContext.CancellationToken).ConfigureAwait(false); // Note: MultiArrowReader must dispose the cancellationContext. - IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, new CancellationContext(cancellationRegistry)); + ICancellationContext cancellationContext1 = CancellationContext.New(cancellationRegistry, disableExplicitCancel); + IArrowArrayStream stream = new MultiArrowReader(this, TranslateSchema(results.Schema), readers, cancellationContext1); activity?.AddTag(SemanticConventions.Db.Response.ReturnedRows, totalRows); return new QueryResult(totalRows, stream); }); @@ -285,7 +287,9 @@ private async Task ExecuteUpdateInternalAsync() activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, this.bigQueryConnection.IsSafeToTrace); - using JobCancellationContext context = new(cancellationRegistry); + bool disableExplicitCancel = Options?.ContainsKey(BigQueryParameters.DisableExplicitCancel) == true; + using ICancellationContext context = CancellationContext.New(cancellationRegistry, disableExplicitCancel); + // Cannot set destination table in jobs with DDL statements, otherwise an error will be prompted Func> getQueryResultsAsyncFunc = async () => { @@ -574,9 +578,9 @@ private async Task ExecuteWithRetriesAsync(Func> action, Activity? await RetryManager.ExecuteWithRetriesAsync(this, action, activity, MaxRetryAttempts, RetryDelayMs, cancellationToken); private async Task ExecuteCancellableJobAsync( - JobCancellationContext context, + ICancellationContext context, Activity? activity, - Func> func) + Func> func) { try { @@ -610,19 +614,40 @@ private async Task ExecuteCancellableJobAsync( } } - private class CancellationContext : IDisposable + private interface ICancellationContext : IDisposable + { + BigQueryJob? Job { get; set; } + CancellationToken CancellationToken { get; } + void Cancel(); + } + + private class CancellationContext : ICancellationContext { private readonly CancellationRegistry cancellationRegistry; private readonly CancellationTokenSource cancellationTokenSource; private bool disposed; - public CancellationContext(CancellationRegistry cancellationRegistry) + public static readonly ICancellationContext Null = new NullCancellationContext(); + + public static ICancellationContext New(CancellationRegistry cancellationRegistry, bool disableExplicitCancel = false, BigQueryJob? job = default) + { + if (disableExplicitCancel) + { + return Null; + } + return new CancellationContext(cancellationRegistry, job); + } + + private CancellationContext(CancellationRegistry cancellationRegistry, BigQueryJob? job = default) { cancellationTokenSource = new CancellationTokenSource(); + this.Job = job; this.cancellationRegistry = cancellationRegistry; this.cancellationRegistry.Register(this); } + public BigQueryJob? Job { get; set; } + public CancellationToken CancellationToken => cancellationTokenSource.Token; public void Cancel() @@ -641,15 +666,15 @@ public virtual void Dispose() } } - private class JobCancellationContext : CancellationContext + private class NullCancellationContext : ICancellationContext { - public JobCancellationContext(CancellationRegistry cancellationRegistry, BigQueryJob? job = default) - : base(cancellationRegistry) - { - Job = job; - } + public BigQueryJob? Job { get; set; } = null; - public BigQueryJob? Job { get; set; } + public CancellationToken CancellationToken { get; } = default; + + public void Cancel() { } + + public void Dispose() { } } private sealed class CancellationRegistry : IDisposable @@ -698,12 +723,12 @@ private class MultiArrowReader : TracingReader private static readonly string s_assemblyVersion = BigQueryUtils.GetAssemblyVersion(typeof(BigQueryStatement)); readonly Schema schema; - readonly CancellationContext cancellationContext; + readonly ICancellationContext cancellationContext; IEnumerator? readers; IArrowReader? reader; bool disposed; - public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable readers, CancellationContext cancellationContext) : base(statement) + public MultiArrowReader(BigQueryStatement statement, Schema schema, IEnumerable readers, ICancellationContext cancellationContext) : base(statement) { this.schema = schema; this.readers = readers.GetEnumerator(); diff --git a/csharp/test/StatementTests.cs b/csharp/test/StatementTests.cs index 8d3001c..4152d5b 100644 --- a/csharp/test/StatementTests.cs +++ b/csharp/test/StatementTests.cs @@ -216,6 +216,75 @@ public async Task CanCancelStreamFromStatement() } } + [Fact] + public async Task CanDisableExplicitCancelStatement() + { + foreach (BigQueryTestEnvironment environment in _environments) + { + AdbcConnection adbcConnection = GetAdbcConnection(environment.Name); + + AdbcStatement statement = adbcConnection.CreateStatement(); + statement.SetOption(BigQueryParameters.DisableExplicitCancel, "true"); + + // Execute the query/cancel multiple times to validate consistent behavior + const int iterations = 3; + for (int i = 0; i < iterations; i++) + { + _outputHelper?.WriteLine($"Iteration {i + 1} of {iterations}"); + // Generate unique column names so query will not be served from cache + string columnName1 = Guid.NewGuid().ToString("N"); + string columnName2 = Guid.NewGuid().ToString("N"); + statement.SqlQuery = $"SELECT GENERATE_ARRAY(`{columnName2}`, 10000) AS `{columnName1}` FROM UNNEST(GENERATE_ARRAY(0, 100000)) AS `{columnName2}`"; + _outputHelper?.WriteLine($"Query: {statement.SqlQuery}"); + + // Expect this to take about 10 seconds without cancellation + Task queryTask = Task.Run(statement.ExecuteQuery); + + await Task.Yield(); + statement.Cancel(); + + QueryResult queryResult = await queryTask; + // Should not throw OperationCanceledException + } + } + } + + [Fact] + public async Task CanDisableExplicitCancelStreamFromStatement() + { + foreach (BigQueryTestEnvironment environment in _environments) + { + using AdbcConnection adbcConnection = GetAdbcConnection(environment.Name); + using AdbcStatement statement = adbcConnection.CreateStatement(); + statement.SetOption(BigQueryParameters.DisableExplicitCancel, "true"); + + // Execute the query/cancel multiple times to validate consistent behavior + const int iterations = 3; + QueryResult[] results = new QueryResult[iterations]; + for (int i = 0; i < iterations; i++) + { + _outputHelper?.WriteLine($"Iteration {i + 1} of {iterations}"); + // Generate unique column names so query will not be served from cache + string columnName1 = Guid.NewGuid().ToString("N"); + string columnName2 = Guid.NewGuid().ToString("N"); + statement.SqlQuery = $"SELECT `{columnName2}` AS `{columnName1}` FROM UNNEST(GENERATE_ARRAY(1, 100)) AS `{columnName2}`"; + _outputHelper?.WriteLine($"Query: {statement.SqlQuery}"); + + // Expect this to take about 10 seconds without cancellation + results[i] = statement.ExecuteQuery(); + } + statement.Cancel(); + for (int index = 0; index < iterations; index++) + { + QueryResult queryResult = results[index]; + using IArrowArrayStream? stream = queryResult.Stream; + Assert.NotNull(stream); + RecordBatch batch = await stream.ReadNextRecordBatchAsync(); + // Should not throw OperationCanceledException + } + } + } + private AdbcConnection GetAdbcConnection(string? environmentName) { if (string.IsNullOrEmpty(environmentName))