-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathINodeDiscoveryProvider.Consul.cs
More file actions
157 lines (135 loc) · 5.98 KB
/
INodeDiscoveryProvider.Consul.cs
File metadata and controls
157 lines (135 loc) · 5.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Consul;
using Microsoft.Extensions.Logging;
namespace DotNetCore.CAP.Dashboard.NodeDiscovery;
/// <summary>
/// <see cref="INodeDiscoveryProvider"/> implementation that uses a Consul agent to
/// register the current node and to look up other nodes tagged <c>CAP</c>.
/// </summary>
/// <remarks>
/// All operations are best-effort: failures are logged and reported to the caller as
/// <c>null</c> (lookups) or silently completed (registration) instead of being thrown.
/// </remarks>
public class ConsulNodeDiscoveryProvider : INodeDiscoveryProvider
{
    // Tag used to filter the Consul catalog down to CAP services.
    private const string CapTag = "CAP";

    // Cache key under which the number of discovered nodes is published.
    private const string NodeCountCacheKey = "cap.nodes.count";

    private readonly ConsulDiscoveryOptions _discoveryOptions;
    private readonly ILogger<ConsulNodeDiscoveryProvider> _logger;

    /// <summary>
    /// Creates a provider that talks to the Consul server described by <paramref name="options"/>.
    /// </summary>
    /// <param name="logger">Factory used to create this provider's logger.</param>
    /// <param name="options">Discovery server address and current-node registration settings.</param>
    public ConsulNodeDiscoveryProvider(ILoggerFactory logger, ConsulDiscoveryOptions options)
    {
        _logger = logger.CreateLogger<ConsulNodeDiscoveryProvider>();
        _discoveryOptions = options;
    }

    /// <summary>
    /// Looks up the first catalog entry of the service named <paramref name="nodeName"/>
    /// that carries the <c>CAP</c> tag.
    /// </summary>
    /// <param name="nodeName">Consul service name to query.</param>
    /// <param name="ns">Not used by this provider.</param>
    /// <param name="cancellationToken">Token forwarded to the Consul request.</param>
    /// <returns>
    /// The first matching node, or <c>null</c> when there is no match, the response status
    /// is not <see cref="HttpStatusCode.OK"/>, or the request throws.
    /// </returns>
    public async Task<Node> GetNode(string nodeName, string ns, CancellationToken cancellationToken = default)
    {
        try
        {
            using var consul = CreateClient();

            var serviceCatalog = await consul.Catalog.Service(nodeName, CapTag, cancellationToken);
            if (serviceCatalog.StatusCode == HttpStatusCode.OK)
            {
                return serviceCatalog.Response.Select(info => new Node
                {
                    Id = info.ServiceID,
                    Name = info.ServiceName,
                    Address = info.ServiceAddress,
                    Port = info.ServicePort,
                    Tags = string.Join(", ", info.ServiceTags ?? Array.Empty<string>())
                }).FirstOrDefault();
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Get consul nodes raised an exception. Exception:{Message}", ex.Message);
        }

        return null;
    }

    /// <summary>
    /// Lists every catalog entry tagged <c>CAP</c> across all services known to Consul and
    /// publishes the resulting count to <c>CapCache.Global</c>.
    /// </summary>
    /// <param name="ns">Not used by this provider.</param>
    /// <param name="cancellationToken">Token forwarded to every Consul request.</param>
    /// <returns>
    /// The discovered nodes (addresses prefixed with <c>http://</c>), or <c>null</c> when
    /// any request throws; in that case a node count of 0 is cached.
    /// </returns>
    public async Task<IList<Node>> GetNodes(string ns, CancellationToken cancellationToken)
    {
        try
        {
            var nodes = new List<Node>();

            using var consul = CreateClient();

            var services = await consul.Catalog.Services(cancellationToken);

            foreach (var service in services.Response)
            {
                // Awaited rather than blocked on: this runs inside an async method, and a
                // synchronous wait here would tie up a thread per catalog entry.
                var serviceInfo = await consul.Catalog.Service(service.Key, CapTag, cancellationToken);

                nodes.AddRange(serviceInfo.Response.Select(info => new Node
                {
                    Id = info.ServiceID,
                    Name = info.ServiceName,
                    Address = "http://" + info.ServiceAddress,
                    Port = info.ServicePort,
                    Tags = string.Join(", ", info.ServiceTags ?? Array.Empty<string>())
                }));
            }

            CapCache.Global.AddOrUpdate(NodeCountCacheKey, nodes.Count, TimeSpan.FromSeconds(60), true);

            return nodes;
        }
        catch (Exception ex)
        {
            CapCache.Global.AddOrUpdate(NodeCountCacheKey, 0, TimeSpan.FromSeconds(20));

            // InnerException is frequently null; it must not be dereferenced unconditionally
            // or the handler itself throws and masks the original failure.
            _logger.LogError(ex,
                "Get consul nodes raised an exception. Exception:{Message},{InnerMessage}",
                ex.Message, ex.InnerException?.Message);

            return null;
        }
    }

    /// <summary>
    /// Registers the current node with the Consul agent, including a health check
    /// (HTTP for the <c>http</c> scheme, TCP for <c>https</c>) and the default plus custom tags.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the Consul request.</param>
    /// <returns>A task that completes when the attempt finishes; failures are logged, not thrown.</returns>
    public async Task RegisterNode(CancellationToken cancellationToken)
    {
        try
        {
            var healthCheck = new AgentServiceCheck
            {
                DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(30),
                Interval = TimeSpan.FromSeconds(10),
                Status = HealthStatus.Passing
            };

            if (_discoveryOptions.Scheme.Equals("http", StringComparison.OrdinalIgnoreCase))
            {
                healthCheck.HTTP =
                    $"http://{_discoveryOptions.CurrentNodeHostName}:{_discoveryOptions.CurrentNodePort}{_discoveryOptions.MatchPath}/api/health";
            }
            else if (_discoveryOptions.Scheme.Equals("https", StringComparison.OrdinalIgnoreCase))
            {
                // For https the check is a plain TCP connect to host:port rather than an HTTP GET.
                healthCheck.TCP = $"{_discoveryOptions.CurrentNodeHostName}:{_discoveryOptions.CurrentNodePort}";
            }

            var tags = new[] { CapTag, "Client", "Dashboard" };
            if (_discoveryOptions.CustomTags != null && _discoveryOptions.CustomTags.Length > 0)
            {
                // Union keeps the default tags first and drops duplicates.
                tags = tags.Union(_discoveryOptions.CustomTags).ToArray();
            }

            using var consul = CreateClient();

            var result = await consul.Agent.ServiceRegister(new AgentServiceRegistration
            {
                ID = _discoveryOptions.NodeId,
                Name = _discoveryOptions.NodeName,
                Address = _discoveryOptions.CurrentNodeHostName,
                Port = _discoveryOptions.CurrentNodePort,
                Tags = tags,
                Check = healthCheck
            }, cancellationToken);

            if (result.StatusCode == HttpStatusCode.OK)
            {
                _logger.LogInformation("Consul node register success!");
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Register consul node raised an exception. Exception:{Message},{InnerMessage}",
                ex.Message, ex.InnerException?.Message);
        }
    }

    /// <summary>
    /// Lists all CAP services. Delegates to <see cref="GetNodes"/> with
    /// <see cref="CancellationToken.None"/>.
    /// </summary>
    /// <param name="ns">Passed through to <see cref="GetNodes"/>, which does not use it.</param>
    /// <returns>The same result as <see cref="GetNodes"/>, including <c>null</c> on failure.</returns>
    public async Task<IList<Node>> ListServices(string ns = null)
    {
        // For Consul, namespace is not directly supported, so we return all CAP services.
        // This is similar to GetNodes but provides a consistent interface.
        return await GetNodes(ns, CancellationToken.None);
    }

    /// <summary>
    /// Builds a Consul client pointed at the configured discovery server with a 5 second wait time.
    /// The caller owns the returned client and must dispose it.
    /// </summary>
    private ConsulClient CreateClient()
    {
        return new ConsulClient(config =>
        {
            config.WaitTime = TimeSpan.FromSeconds(5);
            config.Address =
                new Uri(
                    $"http://{_discoveryOptions.DiscoveryServerHostName}:{_discoveryOptions.DiscoveryServerPort}");
        });
    }
}