Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Changelog

## 0.7.0 (unreleased)

* Feature: Add `transaction()` method to execute multiple queries atomically.
(#216 by @jdickinsondev91-stack)

This method wraps a callback in a `START TRANSACTION` / `COMMIT` pair and
automatically issues a `ROLLBACK` if the callback throws or the returned
promise rejects.

```php
$mysql->transaction(function (React\Mysql\MysqlClient $mysql) {
$mysql->query('INSERT INTO user (name) VALUES (?)', ['Alice']);
$mysql->query('INSERT INTO user (name) VALUES (?)', ['Bob']);
});
```

## 0.6.0 (2023-11-10)

* Feature: Improve Promise v3 support and use template types.
Expand Down
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ It is written in pure PHP and does not require any extensions.
* [MysqlClient](#mysqlclient)
* [__construct()](#__construct)
* [query()](#query)
* [transaction()](#transaction)
* [queryStream()](#querystream)
* [ping()](#ping)
* [quit()](#quit)
Expand Down Expand Up @@ -256,6 +257,46 @@ for multiple statements is disabled for security reasons because it
could allow for possible SQL injection attacks and this API is not
suited for exposing multiple possible results.

#### transaction()

The `transaction(callable(MysqlClient):mixed $callback): PromiseInterface<mixed>` method can be used to
perform multiple queries within an atomic transaction.

This method returns a promise that will resolve with the return value of
the callback on success or will reject with an `Exception` on error. The
callback receives this client instance and may execute any number of
queries. If any query fails or the callback throws an exception, the
transaction will be rolled back automatically and the resulting promise
will be rejected.

```php
$mysql->transaction(function (React\Mysql\MysqlClient $mysql) {
$mysql->query('INSERT INTO user (name) VALUES (?)', ['Alice']);
$mysql->query('INSERT INTO user (name) VALUES (?)', ['Bob']);
});
```

The callback may also return a value that will be used to resolve the
resulting promise after the transaction is committed:

```php
$mysql->transaction(function (React\Mysql\MysqlClient $mysql) {
$mysql->query('INSERT INTO user (name) VALUES (?)', ['Alice']);
return 'done';
})->then(function (string $value) {
echo $value . PHP_EOL; // "done"
}, function (Exception $error) {
// transaction was rolled back
echo 'Error: ' . $error->getMessage() . PHP_EOL;
});
```

Note that any queries issued inside the callback will be queued behind
the `START TRANSACTION` command and the `COMMIT` or `ROLLBACK` will be
queued after the callback completes. The MySQL protocol is inherently
sequential, so all commands are guaranteed to be executed in order on
the same connection.

#### queryStream()

The `queryStream(string $sql, list<string|int|float|bool|null> $params = []): ReadableStreamInterface` method can be used to
Expand Down
71 changes: 71 additions & 0 deletions src/MysqlClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,77 @@ public function query($sql, array $params = [])
});
}

/**
* Performs multiple queries within an atomic transaction.
*
* This method returns a promise that will resolve with the return value
* of the callback on success or will reject with an `Exception` on error.
* The callback receives this client instance and may execute any number
* of queries. If any query fails or the callback throws an exception,
* the transaction will be rolled back automatically and the resulting
* promise will be rejected.
*
* ```php
* $mysql->transaction(function (MysqlClient $mysql) {
* $mysql->query('INSERT INTO user (name) VALUES (?)', ['Alice']);
* $mysql->query('INSERT INTO user (name) VALUES (?)', ['Bob']);
* });
* ```
*
* The callback may also return a value that will be used to resolve the
* resulting promise after the transaction is committed:
*
* ```php
* $mysql->transaction(function (MysqlClient $mysql) {
* $mysql->query('INSERT INTO user (name) VALUES (?)', ['Alice']);
* return 'done';
* })->then(function (string $value) {
* echo $value . PHP_EOL; // "done"
* }, function (Exception $error) {
* echo 'Error: ' . $error->getMessage() . PHP_EOL;
* });
* ```
*
* Note that any queries issued inside the callback will be queued behind
* the `START TRANSACTION` command and the `COMMIT` or `ROLLBACK` will
* be queued after the callback completes. The MySQL protocol is inherently
* sequential, so all commands are guaranteed to be executed in order on
* the same connection.
*
* @param callable(MysqlClient):mixed $callback
* @return PromiseInterface<mixed>
* Resolves with the return value of $callback on success or rejects with an Exception on error.
*/
public function transaction(callable $callback)
{
if ($this->closed || $this->quitting) {
return \React\Promise\reject(new Exception('Connection closed'));
}

return $this->query('START TRANSACTION')->then(function () use ($callback) {
try {
$result = $callback($this);
} catch (\Exception $e) {
return $this->query('ROLLBACK')->then(function () use ($e) {
throw $e;
});
}

return \React\Promise\resolve($result)->then(
function ($value) {
return $this->query('COMMIT')->then(function () use ($value) {
return $value;
});
},
function (\Exception $e) {
return $this->query('ROLLBACK')->then(function () use ($e) {
throw $e;
});
}
);
});
}

/**
* Performs an async query and streams the rows of the result set.
*
Expand Down
114 changes: 114 additions & 0 deletions tests/MysqlClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2111,4 +2111,118 @@ public function testQuitReturnsRejectedPromiseAfterConnectionIsClosed()
$this->assertTrue($ret instanceof PromiseInterface);
$ret->then($this->expectCallableNever(), $this->expectCallableOnce());
}

public function testTransactionWillReturnResolvedPromiseWhenCallbackSucceeds()
{
$result = new MysqlResult();
$connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock();
$connection->expects($this->exactly(3))->method('query')
->willReturnOnConsecutiveCalls(
\React\Promise\resolve($result), // START TRANSACTION
\React\Promise\resolve($result), // INSERT
\React\Promise\resolve($result) // COMMIT
);

$factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock();
$factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection));
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();

$mysql = new MysqlClient('localhost', null, $loop);

$ref = new \ReflectionProperty($mysql, 'factory');
$ref->setAccessible(true);
$ref->setValue($mysql, $factory);

$promise = $mysql->transaction(function (MysqlClient $client) {
$client->query('INSERT INTO test (id) VALUES (1)');
return 'ok';
});

$promise->then($this->expectCallableOnceWith('ok'));
}

public function testTransactionWillReturnRejectedPromiseAndRollbackWhenCallbackThrows()
{
$result = new MysqlResult();
$connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock();
$connection->expects($this->exactly(2))->method('query')
->willReturnOnConsecutiveCalls(
\React\Promise\resolve($result), // START TRANSACTION
\React\Promise\resolve($result) // ROLLBACK
);

$factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock();
$factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection));
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();

$mysql = new MysqlClient('localhost', null, $loop);

$ref = new \ReflectionProperty($mysql, 'factory');
$ref->setAccessible(true);
$ref->setValue($mysql, $factory);

$promise = $mysql->transaction(function (MysqlClient $client) {
throw new \RuntimeException('Something failed');
});

$promise->then(null, $this->expectCallableOnce());
}

public function testTransactionWillReturnRejectedPromiseAndRollbackWhenCallbackReturnsRejectedPromise()
{
$result = new MysqlResult();
$connection = $this->getMockBuilder('React\Mysql\Io\Connection')->disableOriginalConstructor()->getMock();
$connection->expects($this->exactly(2))->method('query')
->willReturnOnConsecutiveCalls(
\React\Promise\resolve($result), // START TRANSACTION
\React\Promise\resolve($result) // ROLLBACK
);

$factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock();
$factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($connection));
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();

$mysql = new MysqlClient('localhost', null, $loop);

$ref = new \ReflectionProperty($mysql, 'factory');
$ref->setAccessible(true);
$ref->setValue($mysql, $factory);

$promise = $mysql->transaction(function (MysqlClient $client) {
return \React\Promise\reject(new \RuntimeException('Async failure'));
});

$promise->then(null, $this->expectCallableOnce());
}

public function testTransactionWillReturnRejectedPromiseWhenConnectionIsClosed()
{
$mysql = new MysqlClient('localhost');
$mysql->close();

$promise = $mysql->transaction(function (MysqlClient $client) {
return 'ok';
});

$promise->then(null, $this->expectCallableOnce());
}

public function testTransactionWillReturnRejectedPromiseWhenCreateConnectionRejects()
{
$factory = $this->getMockBuilder('React\Mysql\Io\Factory')->disableOriginalConstructor()->getMock();
$factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\reject(new \RuntimeException()));
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();

$mysql = new MysqlClient('localhost', null, $loop);

$ref = new \ReflectionProperty($mysql, 'factory');
$ref->setAccessible(true);
$ref->setValue($mysql, $factory);

$promise = $mysql->transaction(function (MysqlClient $client) {
return 'ok';
});

$promise->then(null, $this->expectCallableOnce());
}
}
Loading