-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathITransport.NATS.cs
More file actions
76 lines (62 loc) · 2.42 KB
/
ITransport.NATS.cs
File metadata and controls
76 lines (62 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using NATS.Client;
using NATS.Client.JetStream;
namespace DotNetCore.CAP.NATS;
internal class NATSTransport : ITransport
{
private readonly IConnectionPool _connectionPool;
private readonly ILogger _logger;
private readonly JetStreamOptions _jetStreamOptions;
public NATSTransport(ILogger<NATSTransport> logger, IConnectionPool connectionPool)
{
_logger = logger;
_connectionPool = connectionPool;
_jetStreamOptions = JetStreamOptions.Builder().WithPublishNoAck(false).WithRequestTimeout(3000).Build();
}
public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress);
public async Task<OperateResult> SendAsync(TransportMessage message)
{
try
{
var connection = _connectionPool.RentConnection();
try
{
var msg = new Msg(message.GetName(), message.Body.ToArray());
foreach (var header in message.Headers)
{
msg.Header[header.Key] = header.Value;
}
var js = connection.CreateJetStreamContext(_jetStreamOptions);
var builder = PublishOptions.Builder().WithMessageId(message.GetId());
var resp = await js.PublishAsync(msg, builder.Build());
if (resp.Seq > 0)
{
_logger.LogDebug($"NATS stream message [{message.GetName()}] has been published.");
return OperateResult.Success;
}
throw new PublisherSentFailedException("NATS message send failed, no consumer reply!");
}
catch (Exception ex)
{
var warpEx = new PublisherSentFailedException(ex.Message, ex);
return OperateResult.Failed(warpEx);
}
finally
{
_connectionPool.Return(connection);
}
}
catch (Exception e)
{
var warpEx = new PublisherSentFailedException(e.Message, e);
return OperateResult.Failed(warpEx);
}
}
}