Skip to content

Commit eefc2ab

Browse files
committed
Cancellation improvements
1 parent 9faf9c9 commit eefc2ab

2 files changed

Lines changed: 126 additions & 12 deletions

File tree

src/PgAsync/Connection.php

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ class Connection extends EventEmitter
113113
/** @var Subject */
114114
private $notificationSubject;
115115

116+
/** @var bool */
117+
private $cancelPending;
118+
119+
/** @var bool */
120+
private $cancelRequested;
121+
116122
/**
117123
* Can be 'I' for Idle, 'T' if in transactions block
118124
* or 'E' if in failed transaction block (queries will fail until end of trans)
@@ -156,15 +162,18 @@ public function __construct(array $parameters, LoopInterface $loop, ConnectorInt
156162
$parameters['application_name'] = 'pgasync';
157163
}
158164

159-
$this->parameters = $parameters;
160165
$this->loop = $loop;
161166
$this->commandQueue = [];
162167
$this->queryState = static::STATE_BUSY;
163168
$this->queryType = static::QUERY_SIMPLE;
164169
$this->connStatus = static::CONNECTION_NEEDED;
165170
$this->socket = $connector ?: new Connector($loop);
166-
$this->uri = 'tcp://' . $this->parameters['host'] . ':' . $this->parameters['port'];
171+
$this->uri = 'tcp://' . $parameters['host'] . ':' . $parameters['port'];
167172
$this->notificationSubject = new Subject();
173+
$this->cancelPending = false;
174+
$this->cancelRequested = false;
175+
176+
$this->parameters = $parameters;
168177
}
169178

170179
private function start()
@@ -226,6 +235,12 @@ public function onData($data)
226235
while (strlen($data) > 0) {
227236
$data = $this->processData($data);
228237
}
238+
239+
// We should only cancel if we have drained the input buffer (as much as we can see)
240+
// and there is still a pending query that needs to be canceled
241+
if ($this->cancelRequested) {
242+
$this->cancelRequest();
243+
}
229244
}
230245

231246
private function processData($data)
@@ -390,6 +405,11 @@ private function handleCommandComplete(CommandComplete $message)
390405
$command = $this->currentCommand;
391406
$this->currentCommand = null;
392407
$command->complete();
408+
409+
// if we have requested a cancel for this query
410+
// but we have received the command complete before we
411+
// had a chance to start canceling - then never mind
412+
$this->cancelRequested = false;
393413
}
394414
$this->debug('Command complete.');
395415
}
@@ -477,6 +497,11 @@ private function failAllCommandsWith(\Throwable $e = null)
477497

478498
public function processQueue()
479499
{
500+
if ($this->cancelPending) {
501+
$this->debug("Not processing queue because there is a cancellation pending.");
502+
return;
503+
}
504+
480505
if (count($this->commandQueue) === 0 && $this->queryState === static::STATE_READY && $this->auto_disconnect) {
481506
$this->commandQueue[] = new Terminate();
482507
}
@@ -542,7 +567,7 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use
542567

543568
return new CallbackDisposable(function () use ($q) {
544569
if ($this->currentCommand === $q && $q->isActive()) {
545-
$this->cancelRequest();
570+
$this->cancelRequested = true;
546571
}
547572
$q->cancel();
548573
});
@@ -587,31 +612,41 @@ function (ObserverInterface $observer, SchedulerInterface $scheduler = null) use
587612

588613
$name = 'somestatement';
589614

615+
/** @var CommandInterface[] $commandGroup */
616+
$commandGroup = [];
590617
$close = new Close($name);
591-
$this->commandQueue[] = $close;
618+
$commandGroup[] = $close;
592619

593620
$prepare = new Parse($name, $queryString);
594-
$this->commandQueue[] = $prepare;
621+
$commandGroup[] = $prepare;
595622

596623
$bind = new Bind($parameters, $name);
597-
$this->commandQueue[] = $bind;
624+
$commandGroup[] = $bind;
598625

599626
$describe = new Describe();
600-
$this->commandQueue[] = $describe;
627+
$commandGroup[] = $describe;
601628

602629
$execute = new Execute();
603-
$this->commandQueue[] = $execute;
630+
$commandGroup[] = $execute;
604631

605632
$sync = new Sync($queryString, $observer);
606-
$this->commandQueue[] = $sync;
633+
$commandGroup[] = $sync;
634+
635+
$this->commandQueue = array_merge($this->commandQueue, $commandGroup);
607636

608637
$this->processQueue();
609638

610-
return new CallbackDisposable(function () use ($sync) {
639+
return new CallbackDisposable(function () use ($sync, $commandGroup) {
611640
if ($this->currentCommand === $sync && $sync->isActive()) {
612-
$this->cancelRequest();
641+
$this->cancelRequested = true;
642+
$sync->cancel();
643+
644+
// no point in canceling the other commands because they are out the door
645+
return;
646+
}
647+
foreach ($commandGroup as $command) {
648+
$command->cancel();
613649
}
614-
$sync->cancel();
615650
});
616651
}
617652
);
@@ -646,11 +681,23 @@ public function disconnect()
646681

647682
private function cancelRequest()
648683
{
684+
$this->cancelRequested = false;
685+
if ($this->queryState !== self::STATE_BUSY) {
686+
$this->debug("Not canceling because there is nothing to cancel.");
687+
return;
688+
}
649689
if ($this->currentCommand !== null) {
690+
$this->cancelPending = true;
650691
$this->socket->connect($this->uri)->then(function (DuplexStreamInterface $conn) {
651692
$cancelRequest = new CancelRequest($this->backendKeyData->getPid(), $this->backendKeyData->getKey());
693+
$conn->on('close', function () {
694+
$this->cancelPending = false;
695+
$this->processQueue();
696+
});
652697
$conn->end($cancelRequest->encodedMessage());
653698
}, function (\Throwable $e) {
699+
$this->cancelPending = false;
700+
$this->processQueue();
654701
$this->debug("Error connecting for cancellation... " . $e->getMessage() . "\n");
655702
});
656703
}

tests/Integration/ConnectionTest.php

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,71 @@ function () {
313313
$conn->disconnect();
314314
$this->getLoop()->run();
315315
}
316+
317+
public function testCancellationWithImmediateQueryQueuedUp() {
318+
$conn = new Connection([
319+
"user" => $this->getDbUser(),
320+
"database" => $this::getDbName()
321+
], $this->getLoop());
322+
323+
$q1 = $conn->query("SELECT * FROM generate_series(1,4)");
324+
$q2 = $conn->query("SELECT pg_sleep(10)");
325+
326+
$testQuery = $q1->merge($q2)->take(1);
327+
328+
$value = null;
329+
330+
$testQuery->subscribe(
331+
function ($results) use (&$value) {
332+
$value = $results;
333+
$this->stopLoop();
334+
},
335+
function (\Throwable $e) {
336+
$this->fail('Expected no error' . $e->getMessage());
337+
$this->stopLoop();
338+
},
339+
function () {
340+
$this->stopLoop();
341+
}
342+
);
343+
344+
$this->runLoopWithTimeout(15);
345+
346+
$this->assertEquals(['generate_series' => '1'], $value);
347+
348+
$conn->disconnect();
349+
$this->getLoop()->run();
350+
}
351+
352+
public function testArrayInParameters() {
353+
$conn = new Connection([
354+
"user" => $this->getDbUser(),
355+
"database" => $this::getDbName()
356+
], $this->getLoop());
357+
358+
$testQuery = $conn->executeStatement("SELECT * FROM generate_series(1,4) WHERE generate_series = ANY($1)", ['{2, 3}']);
359+
360+
$value = [];
361+
362+
$testQuery->subscribe(
363+
function ($results) use (&$value) {
364+
$value[] = $results;
365+
$this->stopLoop();
366+
},
367+
function (\Throwable $e) {
368+
$this->fail('Expected no error' . $e->getMessage());
369+
$this->stopLoop();
370+
},
371+
function () {
372+
$this->stopLoop();
373+
}
374+
);
375+
376+
$this->runLoopWithTimeout(15);
377+
378+
$this->assertEquals([['generate_series' => 2], ['generate_series' => 3]], $value);
379+
380+
$conn->disconnect();
381+
$this->getLoop()->run();
382+
}
316383
}

0 commit comments

Comments
 (0)