Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ jobs:
--logger "trx;LogFileName=sg-$RANDOM.trx" \
--results-directory TestResults &

dotnet test GFramework.Cqrs.Tests \
-c Release \
--no-build \
--logger "trx;LogFileName=cqrs-$RANDOM.trx" \
--results-directory TestResults &

dotnet test GFramework.Ecs.Arch.Tests \
-c Release \
--no-build \
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using GFramework.Core.Abstractions.Command;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Abstractions.Environment;
using GFramework.Core.Abstractions.Events;
using GFramework.Core.Abstractions.Model;
using GFramework.Core.Abstractions.Query;
using GFramework.Core.Abstractions.Systems;
using GFramework.Core.Abstractions.Utility;
using GFramework.Cqrs.Abstractions.Cqrs;
using ICommand = GFramework.Core.Abstractions.Command.ICommand;

namespace GFramework.Core.Abstractions.Architectures;
Expand Down Expand Up @@ -131,7 +131,7 @@ public interface IArchitectureContext
/// <remarks>
/// 这是迁移后的推荐命令入口。无返回值命令应实现 <c>IRequest&lt;Unit&gt;</c>,并优先通过 <see cref="SendAsync{TCommand}(TCommand,CancellationToken)" /> 调用。
/// </remarks>
TResponse SendCommand<TResponse>(Cqrs.Command.ICommand<TResponse> command);
TResponse SendCommand<TResponse>(GFramework.Cqrs.Abstractions.Cqrs.Command.ICommand<TResponse> command);


/// <summary>
Expand All @@ -147,7 +147,8 @@ public interface IArchitectureContext
/// <param name="command">要发送的 CQRS 命令。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>包含命令执行结果的值任务。</returns>
ValueTask<TResponse> SendCommandAsync<TResponse>(Cqrs.Command.ICommand<TResponse> command,
ValueTask<TResponse> SendCommandAsync<TResponse>(
GFramework.Cqrs.Abstractions.Cqrs.Command.ICommand<TResponse> command,
CancellationToken cancellationToken = default);


Expand Down Expand Up @@ -176,7 +177,7 @@ ValueTask<TResponse> SendCommandAsync<TResponse>(Cqrs.Command.ICommand<TResponse
/// <remarks>
/// 这是迁移后的推荐查询入口。新查询应优先实现 <c>GFramework.Core.Abstractions.Cqrs.Query.IQuery&lt;TResponse&gt;</c>。
/// </remarks>
TResponse SendQuery<TResponse>(Cqrs.Query.IQuery<TResponse> query);
TResponse SendQuery<TResponse>(GFramework.Cqrs.Abstractions.Cqrs.Query.IQuery<TResponse> query);

/// <summary>
/// 异步发送一个旧版查询请求。
Expand All @@ -193,7 +194,7 @@ ValueTask<TResponse> SendCommandAsync<TResponse>(Cqrs.Command.ICommand<TResponse
/// <param name="query">要发送的 CQRS 查询。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>包含查询结果的值任务。</returns>
ValueTask<TResponse> SendQueryAsync<TResponse>(Cqrs.Query.IQuery<TResponse> query,
ValueTask<TResponse> SendQueryAsync<TResponse>(GFramework.Cqrs.Abstractions.Cqrs.Query.IQuery<TResponse> query,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
25 changes: 0 additions & 25 deletions GFramework.Core.Abstractions/Cqrs/Command/ICommand.cs

This file was deleted.

52 changes: 52 additions & 0 deletions GFramework.Core.Abstractions/Cqrs/ICqrsRuntime.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using GFramework.Core.Abstractions.Architectures;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Abstractions.Cqrs;

/// <summary>
/// 定义架构上下文使用的 CQRS runtime seam。
/// 该抽象把请求分发、通知发布与流式处理从具体实现中解耦,
/// 使 <see cref="IArchitectureContext" /> 不再直接依赖某个固定的 runtime 类型。
/// </summary>
public interface ICqrsRuntime
{
/// <summary>
/// 发送请求并返回响应。
/// </summary>
/// <typeparam name="TResponse">响应类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入与嵌套请求访问。</param>
/// <param name="request">要分发的请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>请求响应。</returns>
ValueTask<TResponse> SendAsync<TResponse>(
IArchitectureContext context,
IRequest<TResponse> request,
CancellationToken cancellationToken = default);

/// <summary>
/// 发布通知到所有已注册处理器。
/// </summary>
/// <typeparam name="TNotification">通知类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="notification">要发布的通知。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>表示通知分发完成的值任务。</returns>
ValueTask PublishAsync<TNotification>(
IArchitectureContext context,
TNotification notification,
CancellationToken cancellationToken = default)
where TNotification : INotification;

/// <summary>
/// 创建流式请求的异步响应序列。
/// </summary>
/// <typeparam name="TResponse">流元素类型。</typeparam>
/// <param name="context">当前架构上下文,用于上下文感知处理器注入。</param>
/// <param name="request">流式请求。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>按需生成的异步响应序列。</returns>
IAsyncEnumerable<TResponse> CreateStream<TResponse>(
IArchitectureContext context,
IStreamRequest<TResponse> request,
CancellationToken cancellationToken = default);
}
18 changes: 0 additions & 18 deletions GFramework.Core.Abstractions/Cqrs/Query/IQuery.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
<ItemGroup>
<Using Include="GFramework.Core.Abstractions"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\GFramework.Cqrs.Abstractions\GFramework.Cqrs.Abstractions.csproj"/>
</ItemGroup>
<ItemGroup>
<PackageReference Update="Meziantou.Analyzer" Version="3.0.46">
<PrivateAssets>all</PrivateAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Architectures;
using GFramework.Core.Logging;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

Expand All @@ -12,8 +13,6 @@ namespace GFramework.Core.Tests.Architectures;
[TestFixture]
public sealed class ArchitectureAdditionalCqrsHandlersTests
{
private ILoggerFactoryProvider? _previousLoggerFactoryProvider;

/// <summary>
/// 初始化日志工厂和共享测试状态。
/// </summary>
Expand All @@ -39,6 +38,8 @@ public void TearDown()
"LoggerFactoryResolver.Provider should be captured during setup.");
}

private ILoggerFactoryProvider? _previousLoggerFactoryProvider;

/// <summary>
/// 验证显式声明的额外程序集会在初始化阶段接入当前架构容器。
/// </summary>
Expand Down Expand Up @@ -197,4 +198,4 @@ private static INotificationHandler<AdditionalAssemblyNotification> CreateHandle
});
return handler.Object;
}
}
}
84 changes: 79 additions & 5 deletions GFramework.Core.Tests/Architectures/ArchitectureContextTests.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System.Reflection;
using GFramework.Core.Abstractions.Architectures;
using GFramework.Core.Abstractions.Command;
using GFramework.Core.Abstractions.Cqrs;
using GFramework.Core.Abstractions.Enums;
using GFramework.Core.Abstractions.Environment;
using GFramework.Core.Abstractions.Ioc;
using GFramework.Core.Abstractions.Model;
using GFramework.Core.Abstractions.Query;
using GFramework.Core.Abstractions.Systems;
Expand All @@ -14,6 +16,7 @@
using GFramework.Core.Ioc;
using GFramework.Core.Logging;
using GFramework.Core.Query;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

Expand Down Expand Up @@ -73,13 +76,14 @@ public void SetUp()
_context = new ArchitectureContext(_container);
}

private ArchitectureContext? _context;
private AsyncQueryExecutor? _asyncQueryBus;
private CommandExecutor? _commandBus;
private MicrosoftDiContainer? _container;

private ArchitectureContext? _context;
private DefaultEnvironment? _environment;
private EventBus? _eventBus;
private CommandExecutor? _commandBus;
private QueryExecutor? _queryBus;
private AsyncQueryExecutor? _asyncQueryBus;
private DefaultEnvironment? _environment;

/// <summary>
/// 测试构造函数在所有参数都有效时不应抛出异常
Expand Down Expand Up @@ -298,6 +302,76 @@ public void GetEnvironment_Should_Return_EnvironmentInstance()
Assert.That(environment, Is.Not.Null);
Assert.That(environment, Is.InstanceOf<IEnvironment>());
}

/// <summary>
/// 测试 CQRS runtime 在并发首次访问时只会从容器解析一次。
/// </summary>
[Test]
public async Task SendRequestAsync_Should_ResolveCqrsRuntime_OnlyOnce_When_AccessedConcurrently()
{
const int workerCount = 8;
var workerStartupTimeout = TimeSpan.FromSeconds(5);
var firstResolutionTimeout = TimeSpan.FromSeconds(5);
using var startGate = new ManualResetEventSlim(false);
using var allowResolutionToComplete = new ManualResetEventSlim(false);
using var workersReady = new CountdownEvent(workerCount);
var resolutionCallCount = 0;
var runtime = new Mock<ICqrsRuntime>(MockBehavior.Strict);
var container = new Mock<IIocContainer>(MockBehavior.Strict);

runtime.Setup(mockRuntime => mockRuntime.SendAsync(
It.IsAny<IArchitectureContext>(),
It.IsAny<IRequest<int>>(),
It.IsAny<CancellationToken>()))
.Returns(new ValueTask<int>(42));

container.Setup(mockContainer => mockContainer.Get<ICqrsRuntime>())
.Returns(() =>
{
Interlocked.Increment(ref resolutionCallCount);
allowResolutionToComplete.Wait();
return runtime.Object;
});

var context = new ArchitectureContext(container.Object);
var requests = Enumerable.Range(0, workerCount)
.Select(_ => Task.Run(async () =>
{
workersReady.Signal();
startGate.Wait();
return await context.SendRequestAsync(new TestCqrsRequest());
}))
.ToArray();

Assert.That(
workersReady.Wait(workerStartupTimeout),
Is.True,
"Expected all workers to be ready before releasing start gate.");
startGate.Set();

Assert.That(
SpinWait.SpinUntil(() => Volatile.Read(ref resolutionCallCount) > 0, firstResolutionTimeout),
Is.True,
"Expected at least one CQRS runtime resolution attempt.");

allowResolutionToComplete.Set();

var responses = await Task.WhenAll(requests);

Assert.That(responses, Has.All.EqualTo(42));
Assert.That(resolutionCallCount, Is.EqualTo(1));
container.Verify(mockContainer => mockContainer.Get<ICqrsRuntime>(), Times.Once);
runtime.Verify(
mockRuntime => mockRuntime.SendAsync(
It.IsAny<IArchitectureContext>(),
It.IsAny<IRequest<int>>(),
It.IsAny<CancellationToken>()),
Times.Exactly(requests.Length));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

private sealed class TestCqrsRequest : IRequest<int>
{
}
}

#region Test Classes
Expand Down Expand Up @@ -442,4 +516,4 @@ public class TestEventV2
public int Data { get; init; }
}

#endregion
#endregion
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using GFramework.Core.Abstractions.Utility;
using GFramework.Core.Architectures;
using GFramework.Core.Logging;
using GfCqrs = GFramework.Core.Abstractions.Cqrs;
using GFramework.Cqrs.Abstractions.Cqrs;

namespace GFramework.Core.Tests.Architectures;

Expand Down Expand Up @@ -151,14 +151,14 @@ private sealed class InstalledByModuleUtility : IUtility
/// <summary>
/// 用于验证管道行为注册是否生效的测试请求。
/// </summary>
public sealed class ModuleBehaviorRequest : GfCqrs.IRequest<string>
public sealed class ModuleBehaviorRequest : IRequest<string>
{
}

/// <summary>
/// 处理测试请求的处理器。
/// </summary>
public sealed class ModuleBehaviorRequestHandler : GfCqrs.IRequestHandler<ModuleBehaviorRequest, string>
public sealed class ModuleBehaviorRequestHandler : IRequestHandler<ModuleBehaviorRequest, string>
{
/// <summary>
/// 返回固定结果,便于聚焦验证管道行为是否执行。
Expand All @@ -177,8 +177,8 @@ public ValueTask<string> Handle(ModuleBehaviorRequest request, CancellationToken
/// </summary>
/// <typeparam name="TRequest">请求类型。</typeparam>
/// <typeparam name="TResponse">响应类型。</typeparam>
public sealed class TrackingPipelineBehavior<TRequest, TResponse> : GfCqrs.IPipelineBehavior<TRequest, TResponse>
where TRequest : GfCqrs.IRequest<TResponse>
public sealed class TrackingPipelineBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
/// <summary>
/// 获取当前测试进程中该请求类型对应的行为触发次数。
Expand All @@ -193,7 +193,7 @@ public sealed class TrackingPipelineBehavior<TRequest, TResponse> : GfCqrs.IPipe
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理器的响应结果。</returns>
public async ValueTask<TResponse> Handle(
TRequest message, GfCqrs.MessageHandlerDelegate<TRequest, TResponse> next,
TRequest message, MessageHandlerDelegate<TRequest, TResponse> next,
CancellationToken cancellationToken)
{
InvocationCount++;
Expand Down
Loading
Loading