Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.DS_Store
.mypy_cache
.pytest_cache
.python-version
*.orig
/Makefile.local
*.yawiki
Expand All @@ -12,6 +13,7 @@
/build*/
/cmake-build-*/
CMakeLists.txt.user
CMakeUserPresets.json
compile_commands.json
tags
static-analyzer-report
Expand Down
23 changes: 18 additions & 5 deletions redis/include/userver/storages/redis/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
/// @brief @copybrief storages::redis::Client

#include <chrono>
#include <memory>
#include <string>

#include <userver/storages/redis/base.hpp>
Expand All @@ -13,11 +12,11 @@
#include <userver/storages/redis/bit_operation.hpp>
#include <userver/storages/redis/client_fwd.hpp>
#include <userver/storages/redis/command_options.hpp>
#include <userver/storages/redis/pipeline.hpp>
#include <userver/storages/redis/request.hpp>
#include <userver/storages/redis/request_eval.hpp>
#include <userver/storages/redis/request_evalsha.hpp>
#include <userver/storages/redis/request_generic.hpp>
#include <userver/storages/redis/transaction.hpp>

USERVER_NAMESPACE_BEGIN

Expand Down Expand Up @@ -158,7 +157,8 @@ class Client {
size_t key_index,
const CommandControl& command_control
) {
return RequestGeneric<ReplyType>{GenericCommon(std::move(command), std::move(args), key_index, command_control)
return RequestGeneric<ReplyType>{
GenericCommon(std::move(command), std::move(args), key_index, command_control)
};
}

Expand Down Expand Up @@ -373,9 +373,22 @@ class Client {
const CommandControl& command_control
) = 0;

virtual TransactionPtr Multi() = 0;
/// @brief Atomic sequence of Redis commands (https://redis.io/docs/latest/develop/using-commands/transactions)
///
/// @note Redis transaction implements isolation, but not
/// all-or-nothing semantics (IOW a subcommand may fail, but the following
/// subcommands will succeed).
/// Implemented via pipeline mechanism
virtual PipelinePtr Multi() = 0;

/// @brief Atomic sequence of Redis commands (https://redis.io/docs/latest/develop/using-commands/transactions)
///
/// @note check_shards controls if all subcommands should belong to same shard or not
virtual PipelinePtr Multi(Pipeline::CheckShards check_shards) = 0;

virtual PipelinePtr Pipeline() = 0;

virtual TransactionPtr Multi(Transaction::CheckShards check_shards) = 0;
virtual PipelinePtr Pipeline(Pipeline::CheckShards check_shards) = 0;

virtual RequestPersist Persist(std::string key, const CommandControl& command_control) = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ USERVER_NAMESPACE_BEGIN

namespace storages::redis::impl {

[[noreturn]] void ThrowTransactionNotStarted(std::string_view description);
[[noreturn]] void ThrowPipelineNotStarted(std::string_view description);

template <typename ReplyType>
class TransactionSubrequestDataImpl final : public RequestDataBase<ReplyType> {
class PipelineSubrequestDataImpl final : public RequestDataBase<ReplyType> {
public:
TransactionSubrequestDataImpl(engine::Future<ReplyType> future)
: future_(std::move(future))
{}
PipelineSubrequestDataImpl(engine::Future<ReplyType> future) : future_(std::move(future)) {}

void Wait() override { ThrowIfNotReady("Wait() for"); }

Expand All @@ -27,7 +25,7 @@ class TransactionSubrequestDataImpl final : public RequestDataBase<ReplyType> {
return future_.get();
}

ReplyPtr GetRaw() override { throw std::logic_error("call TransactionSubrequestDataImpl::GetRaw()"); }
ReplyPtr GetRaw() override { throw std::logic_error("call PipelineSubrequestDataImpl::GetRaw()"); }

engine::impl::ContextAccessor* TryGetContextAccessor() noexcept override {
UASSERT_MSG(false, "Not implemented");
Expand All @@ -37,7 +35,7 @@ class TransactionSubrequestDataImpl final : public RequestDataBase<ReplyType> {
private:
void ThrowIfNotReady(std::string_view description) {
if (future_.wait_until(engine::Deadline::Passed()) != engine::FutureStatus::kReady) {
ThrowTransactionNotStarted(description);
ThrowPipelineNotStarted(description);
}
}

Expand Down
2 changes: 1 addition & 1 deletion redis/include/userver/storages/redis/key_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace storages::redis {

/// @brief Type of the Redis value stored by a key.
///
/// Returned by storages::redis::Client and storages::redis::Transaction from membed function `Type()`
/// Returned by storages::redis::Client and storages::redis::Pipeline from membed function `Type()`
enum class KeyType { kNone, kString, kList, kSet, kZset, kHash, kStream };

KeyType ParseKeyType(std::string_view str);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

/// @file userver/storages/redis/transaction.hpp
/// @brief @copybrief storages::redis::Transaction
/// @file userver/storages/redis/pipeline.hpp
/// @brief @copybrief storages::redis::Pipeline

#include <memory>
#include <string>
Expand All @@ -14,27 +14,23 @@ USERVER_NAMESPACE_BEGIN

namespace storages::redis {

/// @brief Atomic sequence of Redis commands (https://redis.io/topics/transactions), that is usually retrieved from
/// storages::redis::Client::Multi().
/// @brief Sequence of Redis commands (https://redis.io/docs/latest/develop/using-commands/pipelining), that is usually
/// retrieved from storages::redis::Client::Pipeline().
///
/// @note Redis transaction implements isolation, but not
/// all-or-nothing semantics (IOW a subcommand may fail, but the following
/// subcommands will succeed).
///
/// Membef functions add commands to the `Transaction` object. For each added command a future-like object is returned.
/// You can get the result of each transaction's subcommand by calling `Get()` method for these objects.
/// @note Member functions add commands to the `Pipeline` object. For each added command a future-like object is
/// returned. You can get the result of each pipeline's subcommand by calling `Get()` method for these objects.
/// Commands are be sent to a server after calling `Exec()` that returns `RequestExec` object.
/// You should not call `Get()` method in a future-like subcommand's object
/// before calling `Get()` method on `RequestExec` object.
///
/// @snippet redis/src/storages/redis/client_redistest.cpp redis transaction sample
class Transaction {
/// @snippet redis/src/storages/redis/client_redistest.cpp redis pipeline sample
class Pipeline {
public:
enum class CheckShards { kNo, kSame };

virtual ~Transaction() = default;
virtual ~Pipeline() = default;

/// Finish current atomic sequence of commands and send it to a server.
/// Finish current sequence of commands and send it to a server.
/// Returns 'future-like' request object.
/// The data will not be set for the future-like objects for subcommands if
/// `Get()` method of the returned object is not called or redis did not return an array with command responses.
Expand Down Expand Up @@ -316,14 +312,14 @@ class Transaction {
// end of redis commands
};

using TransactionPtr = std::unique_ptr<Transaction>;
using PipelinePtr = std::unique_ptr<Pipeline>;

class EmptyTransactionException : public Exception {
class EmptyPipelineException : public Exception {
public:
using Exception::Exception;
};

class NotStartedTransactionException : public Exception {
class NotStartedPipelineException : public Exception {
public:
using Exception::Exception;
};
Expand Down
26 changes: 9 additions & 17 deletions redis/include/userver/storages/redis/request.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

/// @file
/// @brief Valkey/Redis futures for storages::redis::Client and storages::redis::Transaction.
/// @brief Valkey/Redis futures for storages::redis::Client and storages::redis::Pipeline.

#include <memory>
#include <optional>
Expand All @@ -26,22 +26,20 @@ class RequestScanData;

/// @brief Valkey or Redis future for a non-scan and non-eval responses.
///
/// Member functions of classes storages::redis::Client and storages::redis::Transaction that do send request to the
/// Member functions of classes storages::redis::Client and storages::redis::Pipeline that do send request to the
/// Redis return this type or storages::redis::ScanRequest.
template <typename ResultType, typename ReplyType>
class [[nodiscard]] Request final {
public:
using Result = ResultType;
using Reply = ReplyType;

explicit Request(std::unique_ptr<RequestDataBase<ReplyType>>&& impl)
: impl_(std::move(impl))
{}
explicit Request(std::unique_ptr<RequestDataBase<ReplyType>>&& impl) : impl_(std::move(impl)) {}

/// Wait for the request to finish on Redis server, server or request errors (if any) are logged but not thrown.
///
/// @throws Exceptions on misuse (for example, calling Wait() on a single result from a transaction before waiting
/// for the transaction itself).
/// @throws Exceptions on misuse (for example, calling Wait() on a single result from a pipeline before waiting
/// for the pipeline itself).
void Wait() { impl_->Wait(); }

/// Ignore the query result and do not wait for the Redis server to finish executing it
Expand Down Expand Up @@ -76,16 +74,14 @@ class [[nodiscard]] Request final {

/// @brief Redis future for a SCAN-like responses.
///
/// Member functions of classes storages::redis::Client and storages::redis::Transaction that do send SCAN-like request
/// Member functions of classes storages::redis::Client and storages::redis::Pipeline that do send SCAN-like request
/// to the Redis return this type or storages::redis::ScanRequest.
template <ScanTag TScanTag>
class ScanRequest final {
public:
using ReplyElem = typename ScanReplyElem<TScanTag>::type;

explicit ScanRequest(std::unique_ptr<RequestScanDataBase<TScanTag>>&& impl)
: impl_(std::move(impl))
{}
explicit ScanRequest(std::unique_ptr<RequestScanDataBase<TScanTag>>&& impl) : impl_(std::move(impl)) {}

template <typename T = std::vector<ReplyElem>>
T GetAll(std::string request_description) {
Expand All @@ -110,19 +106,15 @@ class ScanRequest final {
using reference = value_type&;
using pointer = value_type*;

explicit Iterator(ScanRequest* stream)
: stream_(stream)
{
explicit Iterator(ScanRequest* stream) : stream_(stream) {
if (stream_ && !stream_->HasMore()) {
stream_ = nullptr;
}
}

class ReplyElemHolder {
public:
ReplyElemHolder(value_type reply_elem)
: reply_elem_(std::move(reply_elem))
{}
ReplyElemHolder(value_type reply_elem) : reply_elem_(std::move(reply_elem)) {}

value_type& operator*() { return reply_elem_; }

Expand Down
28 changes: 14 additions & 14 deletions redis/src/storages/redis/client_cluster_redistest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ UTEST_F(RedisClusterClientTest, TransactionCrossSlot) {
UASSERT_THROW(transaction->Exec(kDefaultCc).Get(), storages::redis::RequestFailedException);
}

UTEST_F(RedisClusterClientTest, TransactionDistinctShards) {
UTEST_F(RedisClusterClientTest, PipelineDistinctShards) {
auto client = GetClient();
auto transaction = client->Multi(storages::redis::Transaction::CheckShards::kNo);
auto transaction = client->Pipeline(storages::redis::Pipeline::CheckShards::kNo);

const size_t num_keys = 10;
const int add = 100;
Expand All @@ -204,33 +204,33 @@ UTEST_F(RedisClusterClientTest, Generic) {

// Must abort in debug builds
#ifdef NDEBUG
UTEST_F(RedisClusterClientTest, NotStartedTransactionNoExec) {
UTEST_F(RedisClusterClientTest, NotStartedPipelineNoExec) {
auto client = GetClient();
auto transaction = client->Multi(storages::redis::Transaction::CheckShards::kNo);
auto transaction = client->Pipeline(storages::redis::Pipeline::CheckShards::kNo);

auto get_req = transaction->Get("key1");
auto set_req = transaction->Set("key1", "value");

EXPECT_THROW(get_req.Get(), storages::redis::NotStartedTransactionException);
EXPECT_THROW(get_req.Wait(), storages::redis::NotStartedTransactionException);
EXPECT_THROW(get_req.Get(), storages::redis::NotStartedPipelineException);
EXPECT_THROW(get_req.Wait(), storages::redis::NotStartedPipelineException);

EXPECT_THROW(set_req.Get(), storages::redis::NotStartedTransactionException);
EXPECT_THROW(set_req.Wait(), storages::redis::NotStartedTransactionException);
EXPECT_THROW(set_req.Get(), storages::redis::NotStartedPipelineException);
EXPECT_THROW(set_req.Wait(), storages::redis::NotStartedPipelineException);
}

UTEST_F(RedisClusterClientTest, NotStartedTransactionTransactionNoGet) {
UTEST_F(RedisClusterClientTest, NotStartedPipelineNoGet) {
auto client = GetClient();
auto transaction = client->Multi(storages::redis::Transaction::CheckShards::kNo);
auto transaction = client->Pipeline(storages::redis::Pipeline::CheckShards::kNo);

auto get_req = transaction->Get("key2");
auto set_req = transaction->Set("key2", "value");
auto request = transaction->Exec({});

EXPECT_THROW(get_req.Get(), storages::redis::NotStartedTransactionException);
EXPECT_THROW(get_req.Wait(), storages::redis::NotStartedTransactionException);
EXPECT_THROW(get_req.Get(), storages::redis::NotStartedPipelineException);
EXPECT_THROW(get_req.Wait(), storages::redis::NotStartedPipelineException);

EXPECT_THROW(set_req.Get(), storages::redis::NotStartedTransactionException);
EXPECT_THROW(set_req.Wait(), storages::redis::NotStartedTransactionException);
EXPECT_THROW(set_req.Get(), storages::redis::NotStartedPipelineException);
EXPECT_THROW(set_req.Wait(), storages::redis::NotStartedPipelineException);
}
#endif

Expand Down
Loading