Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/dialect/dialect-adapter-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ export abstract class DialectAdapterBase implements DialectAdapter {
return false
}

get supportsBatch(): boolean {
return false
}

abstract acquireMigrationLock(
db: Kysely<any>,
options: MigrationLockOptions,
Expand Down
8 changes: 8 additions & 0 deletions src/dialect/dialect-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ export interface DialectAdapter {
*/
readonly supportsOutput?: boolean

/**
* Whether or not this dialect supports batched query execution.
*
* When true, multiple queries can be executed more efficiently by reducing
* network round trips. The actual batching mechanism is dialect-specific.
*/
readonly supportsBatch: boolean

/**
* This method is used to acquire a lock for the migrations so that
* it's not possible for two migration operations to run in parallel.
Expand Down
4 changes: 4 additions & 0 deletions src/dialect/postgres/postgres-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ export class PostgresAdapter extends DialectAdapterBase {
return true
}

override get supportsBatch(): boolean {
return true
}

override async acquireMigrationLock(
db: Kysely<any>,
_opt: MigrationLockOptions,
Expand Down
24 changes: 24 additions & 0 deletions src/dialect/postgres/postgres-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,30 @@ class PostgresConnection implements DatabaseConnection {
}
}

async executeBatch<R>(
compiledQueries: ReadonlyArray<CompiledQuery>,
): Promise<QueryResult<R>[]> {
// FIXME: This does not actually use Postgres's pipeline mode, as it's not supported by node-postgres.
const results: QueryResult<R>[] = []

try {
await this.executeQuery(CompiledQuery.raw('begin'))

for (const compiledQuery of compiledQueries) {
const result = await this.executeQuery<R>(compiledQuery)
results.push(result)
}

await this.executeQuery(CompiledQuery.raw('commit'))

return results
} catch (error) {
await this.executeQuery(CompiledQuery.raw('rollback'))

throw error
}
}

[PRIVATE_RELEASE_METHOD](): void {
this.#client.release()
}
Expand Down
10 changes: 10 additions & 0 deletions src/driver/database-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ export interface DatabaseConnection {
compiledQuery: CompiledQuery,
chunkSize?: number,
): AsyncIterableIterator<QueryResult<R>>

/**
* Executes multiple queries in a batch.
*
* This is optional and only implemented by dialects that support batching.
* When not implemented, queries will be executed sequentially.
*/
executeBatch?<R>(
compiledQueries: ReadonlyArray<CompiledQuery>,
): Promise<QueryResult<R>[]>
}

export interface QueryResult<O> {
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export * from './query-builder/json-path-builder.js'
export * from './query-builder/merge-query-builder.js'
export * from './query-builder/merge-result.js'
export * from './query-builder/order-by-item-builder.js'
export * from './query-builder/batch-builder.js'

export * from './raw-builder/raw-builder.js'
export * from './raw-builder/sql.js'
Expand Down
55 changes: 55 additions & 0 deletions src/kysely.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import {
} from './util/provide-controlled-connection.js'
import { ConnectionProvider } from './driver/connection-provider.js'
import { logOnce } from './util/log-once.js'
import { BatchBuilder } from './query-builder/batch-builder.js'

declare global {
interface AsyncDisposable {}
Expand Down Expand Up @@ -561,6 +562,47 @@ export class Kysely<DB>
return this.getExecutor().executeQuery<R>(compiledQuery)
}

/**
* Creates a batch builder for executing multiple queries efficiently.
*
* Batching queries can reduce network round trips when executing multiple
* independent queries. Whether batching provides performance benefits depends
* on the dialect - check {@link DialectAdapter.supportsBatch} to see if your
* dialect supports optimized batching.
*
* ### Examples
*
* Execute multiple queries and get type-safe results:
*
* ```ts
* const [persons, pets] = await db
* .batch()
* .add(db.selectFrom('person').selectAll())
* .add(db.selectFrom('pet').selectAll())
* .execute()
*
* // persons is Person[]
* // pets is Pet[]
* ```
*
* Mix different query types:
*
* ```ts
* const results = await db
* .batch()
* .add(db.selectFrom('person').selectAll().where('id', '=', 1))
* .add(db.updateTable('person').set({ active: true }).where('id', '=', 1))
* .add(db.deleteFrom('pet').where('id', '=', 123))
* .execute()
* ```
*/
batch(): BatchBuilder {
if (!this.#props.executor.adapter.supportsBatch) {
throw new Error('batch execution is not supported by this dialect')
}
return new BatchBuilder({ executor: this.#props.executor })
}

async [Symbol.asyncDispose]() {
await this.destroy()
}
Expand Down Expand Up @@ -593,6 +635,12 @@ export class Transaction<DB> extends Kysely<DB> {
)
}

override batch(): BatchBuilder {
throw new Error(
'calling the batch method for a Transaction is not supported',
)
}

override async destroy(): Promise<void> {
throw new Error(
'calling the destroy method for a Transaction is not supported',
Expand Down Expand Up @@ -1204,6 +1252,13 @@ class NotCommittedOrRolledBackAssertingExecutor implements QueryExecutor {
return this.#executor.stream(compiledQuery, chunkSize)
}

executeBatch<R>(
compiledQueries: ReadonlyArray<CompiledQuery<R>>,
): Promise<QueryResult<R>[]> {
assertNotCommittedOrRolledBack(this.#state)
return this.#executor.executeBatch(compiledQueries)
}

withConnectionProvider(
connectionProvider: ConnectionProvider,
): QueryExecutor {
Expand Down
102 changes: 102 additions & 0 deletions src/query-builder/batch-builder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { CompiledQuery } from '../query-compiler/compiled-query.js'
import { Compilable, isCompilable } from '../util/compilable.js'
import { QueryExecutor } from '../query-executor/query-executor.js'
import { freeze } from '../util/object-utils.js'

export interface BatchBuilderProps {
readonly executor: QueryExecutor
}

/**
* A builder for executing multiple queries in a batch.
*
* Batching queries can reduce network round trips and improve performance
* when executing multiple independent queries.
*
* ### Examples
*
* Execute multiple queries in a batch:
*
* ```ts
* const results = await db
* .batch()
* .add(db.selectFrom('person').selectAll().where('id', '=', 1))
* .add(db.selectFrom('pet').selectAll().where('owner_id', '=', 1))
* .execute()
*
* // results[0] contains the person query results
* // results[1] contains the pet query results
* ```
*
* With type-safe destructuring:
*
* ```ts
* const [persons, pets] = await db
* .batch()
* .add(db.selectFrom('person').selectAll())
* .add(db.selectFrom('pet').selectAll())
* .execute()
*
* // persons is typed as Person[]
* // pets is typed as Pet[]
* ```
*/
export class BatchBuilder<R extends any[] = []> {
readonly #props: BatchBuilderProps
readonly #queries: CompiledQuery[]

constructor(props: BatchBuilderProps, queries: CompiledQuery[] = []) {
this.#props = freeze(props)
this.#queries = queries
}

/**
* Adds a query to the batch.
*
* The query can be any compilable query builder or a pre-compiled query.
*
* ### Examples
*
* ```ts
* const batch = db
* .batch()
* .add(db.selectFrom('person').selectAll())
* .add(db.updateTable('person').set({ active: true }).where('id', '=', 1))
* .add(db.deleteFrom('pet').where('id', '=', 123))
* ```
*/
add<O>(query: Compilable<O> | CompiledQuery<O>): BatchBuilder<[...R, O[]]> {
const compiledQuery = isCompilable(query) ? query.compile() : query
return new BatchBuilder<[...R, O[]]>(this.#props, [
...this.#queries,
compiledQuery,
])
}

/**
* Executes all queries in the batch.
*
* Returns an array of results in the same order as the queries were added.
* Each result contains the rows returned by that query.
*
* ### Examples
*
* ```ts
* const [persons, pets, toys] = await db
* .batch()
* .add(db.selectFrom('person').selectAll())
* .add(db.selectFrom('pet').selectAll())
* .add(db.selectFrom('toy').selectAll())
* .execute()
* ```
*/
async execute(): Promise<{ [K in keyof R]: R[K] }> {
if (this.#queries.length === 0) {
return [] as { [K in keyof R]: R[K] }
}

const results = await this.#props.executor.executeBatch(this.#queries)

return results.map((result) => result.rows) as { [K in keyof R]: R[K] }
}
}
24 changes: 24 additions & 0 deletions src/query-executor/query-executor-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ export abstract class QueryExecutorBase implements QueryExecutor {
}
}

async executeBatch<R>(
compiledQueries: ReadonlyArray<CompiledQuery>,
): Promise<QueryResult<R>[]> {
return await this.provideConnection(async (connection) => {
if (!this.adapter.supportsBatch || !connection.executeBatch) {
throw new Error('Batching is not supported by this dialect')
}

const results = await connection.executeBatch<R>(compiledQueries)

const transformedResults = []
for (let i = 0; i < results.length; i++) {
const transformed = await this.#transformResult(
results[i],
compiledQueries[i].queryId,
)
transformedResults.push(transformed)
}

// Cast is safe: because we know the results are QueryResult<R>
return transformedResults as QueryResult<R>[]
})
}

abstract withConnectionProvider(
connectionProvider: ConnectionProvider,
): QueryExecutorBase
Expand Down
14 changes: 14 additions & 0 deletions src/query-executor/query-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ export interface QueryExecutor extends ConnectionProvider {
chunkSize: number,
): AsyncIterableIterator<QueryResult<R>>

/**
* Executes multiple compiled queries as a batch.
*
* If the dialect supports batching (adapter.supportsBatch is true), queries
* will be executed using the connection's batch execution method. Otherwise,
* queries will be executed sequentially.
*
* Results are returned in the same order as the input queries, with each
* result transformed by the plugins' `transformResult` method.
*/
executeBatch<R>(
compiledQueries: ReadonlyArray<CompiledQuery<R>>,
): Promise<QueryResult<R>[]>

/**
* Returns a copy of this executor with a new connection provider.
*/
Expand Down
Loading