Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
namespace GFramework.Cqrs.Benchmarks.Messaging;

/// <summary>
/// 对比固定 4 个处理器的 notification fan-out publish 在 baseline、GFramework.CQRS、NuGet `Mediator`
/// 与 MediatR 之间的开销。
/// 对比固定 4 个处理器的 notification fan-out publish 在 baseline、GFramework.CQRS 默认顺序发布器、
/// GFramework.CQRS 内置 <c>TaskWhenAllNotificationPublisher</c>、NuGet `Mediator` 与 MediatR 之间的开销。
/// </summary>
[Config(typeof(Config))]
public class NotificationFanOutBenchmarks
Expand Down
2 changes: 1 addition & 1 deletion GFramework.Cqrs.Benchmarks/Messaging/RequestBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Task<BenchmarkResponse> SendRequest_MediatR()
}

/// <summary>
/// 通过 `ai-libs/Mediator` 的 source-generated concrete mediator 发送 request,作为高性能对照组。
/// 通过 NuGet `Mediator` 的 source-generated concrete mediator 发送 request,作为高性能对照组。
/// </summary>
/// <returns>代表当前 `Mediator` request dispatch 完成的值任务。</returns>
[Benchmark]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public Task<BenchmarkResponse> SendRequest_MediatR()
}

/// <summary>
/// 按生命周期把 benchmark request handler 注册到 GFramework 容器。
/// 按生命周期把 benchmark request handler 注册到 GFramework 容器。
/// </summary>
/// <param name="container">当前 benchmark 拥有并负责释放的容器。</param>
/// <param name="lifetime">待比较的 handler 生命周期。</param>
Expand Down Expand Up @@ -248,7 +248,7 @@ private static ServiceLifetime ResolveMediatRLifetime(HandlerLifetime lifetime)
}

/// <summary>
/// Benchmark request。
/// Benchmark request。
/// </summary>
/// <param name="Id">请求标识。</param>
public sealed record BenchmarkRequest(Guid Id) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public ValueTask Stream_MediatR()
}

/// <summary>
/// 通过 `ai-libs/Mediator` 的 source-generated concrete mediator 创建 stream,并按当前观测模式消费。
/// 通过 NuGet `Mediator` 的 source-generated concrete mediator 创建 stream,并按当前观测模式消费。
/// </summary>
/// <returns>按当前观测模式完成 stream 消费后的等待句柄。</returns>
[Benchmark]
Expand Down
23 changes: 23 additions & 0 deletions GFramework.Cqrs.Tests/Cqrs/CqrsDispatcherContextValidationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,29 @@ public void CreateStream_Should_Throw_When_Context_Does_Not_Implement_IArchitect
Throws.InvalidOperationException.With.Message.Contains("does not implement IArchitectureContext"));
}

/// <summary>
/// 验证 stream handler 缺失时,dispatcher 会在建流调用点同步抛出异常,
/// 而不是返回一个延迟到枚举阶段才失败的异步流。
/// </summary>
[Test]
public void CreateStream_Should_Throw_When_Handler_Is_Missing()
{
var runtime = CreateRuntime(
container =>
{
container
.Setup(currentContainer => currentContainer.Get(typeof(IStreamRequestHandler<ContextAwareStreamRequest, int>)))
.Returns((object?)null);
container
.Setup(currentContainer => currentContainer.HasRegistration(typeof(IStreamPipelineBehavior<ContextAwareStreamRequest, int>)))
.Returns(false);
});

Assert.That(
() => runtime.CreateStream(new FakeCqrsContext(), new ContextAwareStreamRequest()),
Throws.InvalidOperationException.With.Message.Contains("No CQRS stream handler registered"));
}

/// <summary>
/// 验证当 stream pipeline behavior 需要上下文注入、但当前 CQRS 上下文不实现
/// <see cref="GFramework.Core.Abstractions.Architectures.IArchitectureContext" /> 时,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

using System.Reflection;
using System.Threading;
using GFramework.Core.Abstractions.Logging;
using GFramework.Core.Architectures;
using GFramework.Core.Ioc;
Expand All @@ -28,6 +29,7 @@ public void SetUp()
{
_previousLoggerFactoryProvider = LoggerFactoryResolver.Provider;
LoggerFactoryResolver.Provider = new ConsoleLoggerFactoryProvider();
GeneratedRequestPipelineTrackingBehavior.InvocationCount = 0;
GeneratedStreamPipelineTrackingBehavior.InvocationCount = 0;
ClearRegistrarCaches();
ClearDispatcherCaches();
Expand All @@ -40,6 +42,7 @@ public void SetUp()
public void TearDown()
{
LoggerFactoryResolver.Provider = _previousLoggerFactoryProvider ?? new ConsoleLoggerFactoryProvider();
GeneratedRequestPipelineTrackingBehavior.InvocationCount = 0;
GeneratedStreamPipelineTrackingBehavior.InvocationCount = 0;
ClearRegistrarCaches();
ClearDispatcherCaches();
Expand Down Expand Up @@ -181,6 +184,30 @@ public async Task SendAsync_Should_Use_Generated_Request_Invoker_For_Hidden_Impl
Assert.That(response, Is.EqualTo("generated-hidden:payload"));
}

/// <summary>
/// 验证 generated request invoker 与 request pipeline 行为同时存在时,
/// dispatcher 仍会保持 generated invoker 优先,并正确复用现有 request 执行链。
/// </summary>
[Test]
public async Task SendAsync_Should_Use_Generated_Request_Invoker_Inside_Request_Pipeline()
{
var generatedAssembly = CreateGeneratedRequestInvokerAssembly();
var container = new MicrosoftDiContainer();
container.RegisterCqrsPipelineBehavior<GeneratedRequestPipelineTrackingBehavior>();

CqrsTestRuntime.RegisterHandlers(container, generatedAssembly.Object);
container.Freeze();

var context = new ArchitectureContext(container);
var response = await context.SendRequestAsync(new GeneratedRequestInvokerRequest("payload")).ConfigureAwait(false);

Assert.Multiple(() =>
{
Assert.That(response, Is.EqualTo("generated:payload"));
Assert.That(GeneratedRequestPipelineTrackingBehavior.InvocationCount, Is.EqualTo(1));
});
}

/// <summary>
/// 验证 dispatcher 在首次创建 stream binding 时,会优先消费 generated stream invoker provider。
/// </summary>
Expand Down Expand Up @@ -608,6 +635,40 @@ public async Task CreateStream_Should_Use_Later_Valid_Generated_Stream_Descripto
Assert.That(results, Is.EqualTo([30, 31]));
}

/// <summary>
/// 记录 generated request invoker 与 request pipeline 行为组合时的命中次数。
/// </summary>
private sealed class GeneratedRequestPipelineTrackingBehavior
: IPipelineBehavior<GeneratedRequestInvokerRequest, string>
{
private static int _invocationCount;

/// <summary>
/// 获取或重置当前测试进程中的行为触发次数。
/// </summary>
public static int InvocationCount
{
get => Volatile.Read(ref _invocationCount);
set => Volatile.Write(ref _invocationCount, value);
}

/// <summary>
/// 记录一次行为执行,然后继续执行 generated request invoker。
/// </summary>
/// <param name="message">当前请求消息。</param>
/// <param name="next">下一个处理阶段。</param>
/// <param name="cancellationToken">取消令牌。</param>
/// <returns>下游处理阶段返回的响应。</returns>
public ValueTask<string> Handle(
GeneratedRequestInvokerRequest message,
MessageHandlerDelegate<GeneratedRequestInvokerRequest, string> next,
CancellationToken cancellationToken)
{
Interlocked.Increment(ref _invocationCount);
return next(message, cancellationToken);
}
}

/// <summary>
/// 模拟返回实例 request invoker 方法的 generated registry。
/// </summary>
Expand Down
25 changes: 18 additions & 7 deletions GFramework.Cqrs/Internal/CqrsDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public ValueTask<TResponse> SendAsync<TResponse>(
}

return dispatchBinding.GetPipelineExecutor(behaviors.Count)
.Invoke(handler, behaviors, request, cancellationToken);
.Invoke(handler, behaviors, dispatchBinding.RequestInvoker, request, cancellationToken);
}
catch (Exception exception)
{
Expand Down Expand Up @@ -604,12 +604,14 @@ private static ValueTask<TResponse> InvokeRequestHandlerAsync<TRequest, TRespons
private static ValueTask<TResponse> InvokeRequestPipelineExecutorAsync<TRequest, TResponse>(
object handler,
IReadOnlyList<object> behaviors,
RequestInvoker<TResponse> requestInvoker,
object request,
CancellationToken cancellationToken)
where TRequest : IRequest<TResponse>
{
var invocation = new RequestPipelineInvocation<TRequest, TResponse>(
(IRequestHandler<TRequest, TResponse>)handler,
handler,
requestInvoker,
behaviors);
return invocation.InvokeAsync((TRequest)request, cancellationToken);
}
Expand Down Expand Up @@ -669,6 +671,7 @@ private delegate ValueTask<TResponse> RequestInvoker<TResponse>(
private delegate ValueTask<TResponse> RequestPipelineInvoker<TResponse>(
object handler,
IReadOnlyList<object> behaviors,
RequestInvoker<TResponse> requestInvoker,
object request,
CancellationToken cancellationToken);

Expand Down Expand Up @@ -978,6 +981,7 @@ private sealed class RequestPipelineExecutor<TResponse>(
public ValueTask<TResponse> Invoke(
object handler,
IReadOnlyList<object> behaviors,
RequestInvoker<TResponse> requestInvoker,
object request,
CancellationToken cancellationToken)
{
Expand All @@ -987,7 +991,7 @@ public ValueTask<TResponse> Invoke(
$"Cached request pipeline executor expected {BehaviorCount} behaviors, but received {behaviors.Count}.");
}

return invoker(handler, behaviors, request, cancellationToken);
return invoker(handler, behaviors, requestInvoker, request, cancellationToken);
}
}

Expand Down Expand Up @@ -1147,11 +1151,13 @@ internal static void RegisterGeneratedStreamInvokerDescriptor(
/// 该对象只存在于本次分发,不会跨请求保留容器解析出的实例。
/// </summary>
private sealed class RequestPipelineInvocation<TRequest, TResponse>(
IRequestHandler<TRequest, TResponse> handler,
object handler,
RequestInvoker<TResponse> requestInvoker,
IReadOnlyList<object> behaviors)
where TRequest : IRequest<TResponse>
{
private readonly IRequestHandler<TRequest, TResponse> _handler = handler;
private readonly object _handler = handler;
private readonly RequestInvoker<TResponse> _requestInvoker = requestInvoker;
private readonly IReadOnlyList<object> _behaviors = behaviors;
private readonly MessageHandlerDelegate<TRequest, TResponse>?[] _continuations =
new MessageHandlerDelegate<TRequest, TResponse>?[behaviors.Count + 1];
Expand Down Expand Up @@ -1198,11 +1204,16 @@ private ValueTask<TResponse> InvokeBehaviorAsync(
}

/// <summary>
/// 调用最终请求处理器
/// 调用最终请求处理入口
/// </summary>
/// <remarks>
/// request pipeline 末端必须继续复用当前 binding 上缓存的 <see cref="RequestInvoker{TResponse}" />,
/// 这样 generated request invoker provider 才能在接入 pipeline 后保持与无 pipeline 路径一致的调用语义,
/// 而不是退回到接口虚调用路径。
/// </remarks>
private ValueTask<TResponse> InvokeHandlerAsync(TRequest request, CancellationToken cancellationToken)
{
return _handler.Handle(request, cancellationToken);
return _requestInvoker(_handler, request, cancellationToken);
}

/// <summary>
Expand Down
Loading
Loading