Skip to content

Commit 429e828

Browse files
authored
feat: pipeline (#118)
* feat: pipeline * fix: test pipeline * fix: test * fix: test * fix: force fail * fix: try another error
1 parent e51bd5f commit 429e828

8 files changed

Lines changed: 470 additions & 1 deletion

File tree

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
runs-on: ubuntu-latest
1212
services:
1313
postgres:
14-
image: postgres:11
14+
image: postgres:14
1515
env:
1616
POSTGRES_USER: postgres
1717
POSTGRES_PASSWORD: postgres

README.md

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,99 @@ Issues a request to cancel the currently executing query _on this instance of li
310310

311311
Returns the version of the connected PostgreSQL backend server as a number.
312312

313+
### Pipeline Mode (PostgreSQL 14+)
314+
315+
Pipeline mode allows sending multiple queries to the server without waiting for results, significantly reducing round-trip latency. These functions are only available when compiled against PostgreSQL 14 or later client libraries.
316+
317+
##### `pq.pipelineModeSupported():boolean`
318+
319+
Returns `true` if pipeline mode is supported (compiled against PostgreSQL 14+), `false` otherwise.
320+
321+
##### `pq.enterPipelineMode():boolean`
322+
323+
Enters pipeline mode on the connection. In pipeline mode, you can send multiple queries using the async send functions (`sendQuery`, `sendQueryParams`, etc.) without waiting for results.
324+
325+
Returns `true` if successful, `false` if failed. Throws an error if pipeline mode is not supported.
326+
327+
##### `pq.exitPipelineMode():boolean`
328+
329+
Exits pipeline mode. Can only be called when the pipeline is empty (all results have been processed).
330+
331+
Returns `true` if successful, `false` if failed.
332+
333+
##### `pq.pipelineStatus():int`
334+
335+
Returns the current pipeline status:
336+
- `PQ.PIPELINE_OFF` (0): Not in pipeline mode
337+
- `PQ.PIPELINE_ON` (1): In pipeline mode
338+
- `PQ.PIPELINE_ABORTED` (2): Pipeline aborted due to error
339+
340+
##### `pq.pipelineSync():boolean`
341+
342+
Sends a synchronization point in the pipeline. The server will process all queries up to this point and send their results before processing any further queries. This is essential for error handling - if a query fails, all subsequent queries until the next sync point are skipped.
343+
344+
Returns `true` if successful, `false` if failed.
345+
346+
##### `pq.sendFlushRequest():boolean`
347+
348+
Sends a request for the server to flush its output buffer. Useful when you want to receive results before sending a sync.
349+
350+
Returns `true` if successful, `false` if failed.
351+
352+
#### Pipeline Mode Constants
353+
354+
```js
355+
PQ.PIPELINE_OFF // 0 - Not in pipeline mode
356+
PQ.PIPELINE_ON // 1 - In pipeline mode
357+
PQ.PIPELINE_ABORTED // 2 - Pipeline aborted due to error
358+
```
359+
360+
#### Pipeline Mode Example
361+
362+
```js
363+
var PQ = require('libpq');
364+
var pq = new PQ();
365+
pq.connectSync();
366+
367+
if (!pq.pipelineModeSupported()) {
368+
console.log('Pipeline mode requires PostgreSQL 14+ client libraries');
369+
process.exit(1);
370+
}
371+
372+
// Enter pipeline mode
373+
pq.enterPipelineMode();
374+
pq.setNonBlocking(true);
375+
376+
// Send multiple queries without waiting
377+
pq.sendQueryParams('SELECT $1::int', ['1']);
378+
pq.sendQueryParams('SELECT $1::int', ['2']);
379+
pq.sendQueryParams('SELECT $1::int', ['3']);
380+
381+
// Mark end of pipeline batch
382+
pq.pipelineSync();
383+
pq.flush();
384+
385+
// Process results asynchronously
386+
pq.on('readable', function() {
387+
pq.consumeInput();
388+
while (!pq.isBusy()) {
389+
if (!pq.getResult()) break;
390+
391+
var status = pq.resultStatus();
392+
if (status === 'PGRES_TUPLES_OK') {
393+
console.log('Result:', pq.getvalue(0, 0));
394+
} else if (status === 'PGRES_PIPELINE_SYNC') {
395+
// All results received
396+
pq.stopReader();
397+
pq.exitPipelineMode();
398+
pq.finish();
399+
}
400+
}
401+
});
402+
403+
pq.startReader();
404+
```
405+
313406
## testing
314407

315408
```sh

index.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,3 +396,67 @@ PQ.prototype.getCopyData = function (async) {
396396
PQ.prototype.cancel = function () {
397397
return this.$cancel();
398398
};
399+
400+
// Pipeline mode functions (PostgreSQL 14+)
401+
// These functions are only available if compiled against PostgreSQL 14 or later
402+
403+
//Enters pipeline mode on the connection
404+
//Returns true if successful, false if failed
405+
//Pipeline mode allows sending multiple queries without waiting for results
406+
PQ.prototype.enterPipelineMode = function () {
407+
if (typeof this.$enterPipelineMode !== 'function') {
408+
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
409+
}
410+
return this.$enterPipelineMode();
411+
};
412+
413+
//Exits pipeline mode on the connection
414+
//Returns true if successful, false if failed
415+
//Can only exit pipeline mode when the queue is empty and no pending results
416+
PQ.prototype.exitPipelineMode = function () {
417+
if (typeof this.$exitPipelineMode !== 'function') {
418+
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
419+
}
420+
return this.$exitPipelineMode();
421+
};
422+
423+
//Returns the current pipeline status
424+
//0 = PQ_PIPELINE_OFF (not in pipeline mode)
425+
//1 = PQ_PIPELINE_ON (in pipeline mode)
426+
//2 = PQ_PIPELINE_ABORTED (pipeline aborted due to error)
427+
PQ.prototype.pipelineStatus = function () {
428+
if (typeof this.$pipelineStatus !== 'function') {
429+
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
430+
}
431+
return this.$pipelineStatus();
432+
};
433+
434+
//Sends a sync message in pipeline mode
435+
//This marks a synchronization point - the server will send results
436+
//for all queries up to this point before processing further queries
437+
//Returns true if successful, false if failed
438+
PQ.prototype.pipelineSync = function () {
439+
if (typeof this.$pipelineSync !== 'function') {
440+
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
441+
}
442+
return this.$pipelineSync();
443+
};
444+
445+
//Sends a request for the server to flush its output buffer
446+
//Returns true if successful, false if failed
447+
PQ.prototype.sendFlushRequest = function () {
448+
if (typeof this.$sendFlushRequest !== 'function') {
449+
throw new Error('Pipeline mode is not supported. Requires PostgreSQL 14+ client libraries.');
450+
}
451+
return this.$sendFlushRequest();
452+
};
453+
454+
//Check if pipeline mode is supported (compiled against PostgreSQL 14+)
455+
PQ.prototype.pipelineModeSupported = function () {
456+
return typeof this.$enterPipelineMode === 'function';
457+
};
458+
459+
// Pipeline status constants
460+
PQ.PIPELINE_OFF = 0;
461+
PQ.PIPELINE_ON = 1;
462+
PQ.PIPELINE_ABORTED = 2;

src/addon.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ NAN_MODULE_INIT(InitAddon) {
7474
//Cancel
7575
Nan::SetPrototypeMethod(tpl, "$cancel", Connection::Cancel);
7676

77+
#ifdef PIPELINE_MODE_SUPPORTED
78+
//Pipeline mode (PostgreSQL 14+)
79+
Nan::SetPrototypeMethod(tpl, "$enterPipelineMode", Connection::EnterPipelineMode);
80+
Nan::SetPrototypeMethod(tpl, "$exitPipelineMode", Connection::ExitPipelineMode);
81+
Nan::SetPrototypeMethod(tpl, "$pipelineStatus", Connection::PipelineStatus);
82+
Nan::SetPrototypeMethod(tpl, "$pipelineSync", Connection::PipelineSync);
83+
Nan::SetPrototypeMethod(tpl, "$sendFlushRequest", Connection::SendFlushRequest);
84+
#endif
85+
7786
Nan::Set(target,
7887
Nan::New("PQ").ToLocalChecked(), Nan::GetFunction(tpl).ToLocalChecked());
7988
}

src/addon.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
#define MORE_ERROR_FIELDS_SUPPORTED
1414
#endif
1515

16+
#if PG_VERSION_NUM >= 140000
17+
#define PIPELINE_MODE_SUPPORTED
18+
#endif
19+
1620
#include "connection.h"
1721
#include "connect-async-worker.h"
1822

src/connection.cc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,59 @@ NAN_METHOD(Connection::Cancel) {
727727
delete[] errBuff;
728728
}
729729

730+
#ifdef PIPELINE_MODE_SUPPORTED
731+
NAN_METHOD(Connection::EnterPipelineMode) {
732+
TRACE("Connection::EnterPipelineMode");
733+
734+
Connection* self = NODE_THIS();
735+
736+
int result = PQenterPipelineMode(self->pq);
737+
738+
info.GetReturnValue().Set(result == 1);
739+
}
740+
741+
NAN_METHOD(Connection::ExitPipelineMode) {
742+
TRACE("Connection::ExitPipelineMode");
743+
744+
Connection* self = NODE_THIS();
745+
746+
int result = PQexitPipelineMode(self->pq);
747+
748+
info.GetReturnValue().Set(result == 1);
749+
}
750+
751+
NAN_METHOD(Connection::PipelineStatus) {
752+
TRACE("Connection::PipelineStatus");
753+
754+
Connection* self = NODE_THIS();
755+
756+
PGpipelineStatus status = PQpipelineStatus(self->pq);
757+
758+
// PQ_PIPELINE_OFF = 0, PQ_PIPELINE_ON = 1, PQ_PIPELINE_ABORTED = 2
759+
info.GetReturnValue().Set(static_cast<int>(status));
760+
}
761+
762+
NAN_METHOD(Connection::PipelineSync) {
763+
TRACE("Connection::PipelineSync");
764+
765+
Connection* self = NODE_THIS();
766+
767+
int result = PQpipelineSync(self->pq);
768+
769+
info.GetReturnValue().Set(result == 1);
770+
}
771+
772+
NAN_METHOD(Connection::SendFlushRequest) {
773+
TRACE("Connection::SendFlushRequest");
774+
775+
Connection* self = NODE_THIS();
776+
777+
int result = PQsendFlushRequest(self->pq);
778+
779+
info.GetReturnValue().Set(result == 1);
780+
}
781+
#endif
782+
730783
bool Connection::ConnectDB(const char* paramString) {
731784
TRACEF("Connection::ConnectDB:Connection parameters: %s\n", paramString);
732785
this->pq = PQconnectdb(paramString);

src/connection.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ class Connection : public Nan::ObjectWrap {
5656
static NAN_METHOD(PutCopyEnd);
5757
static NAN_METHOD(GetCopyData);
5858
static NAN_METHOD(Cancel);
59+
#ifdef PIPELINE_MODE_SUPPORTED
60+
static NAN_METHOD(EnterPipelineMode);
61+
static NAN_METHOD(ExitPipelineMode);
62+
static NAN_METHOD(PipelineStatus);
63+
static NAN_METHOD(PipelineSync);
64+
static NAN_METHOD(SendFlushRequest);
65+
#endif
5966

6067
bool ConnectDB(const char* paramString);
6168
void InitPollSocket();

0 commit comments

Comments
 (0)