EventBus for eShopOnCotainers (2)
EventBus for eShopOnCotainers (2)
EventBusRabbitMQ
IRabbitMQPersistentConnection.cs
Define a RabbitMQ persistent connection interface
1. public interface IRabbitMQPersistentConnection
2. : IDisposable
3. {
4. bool IsConnected { get; }
5.
6. bool TryConnect();
7.
8. IModel CreateModel();
9. }
In the nuget package of RabbitMQ.Client, IModel has many event handling methods.
DefaultRabbitMQPersistentConnection.cs
1. public class DefaultRabbitMQPersistentConnection
2. : IRabbitMQPersistentConnection
3. {
4. private readonly IConnectionFactory _connectionFactory;
5. private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
6. private readonly int _retryCount;
7. private IConnection _connection;
8. public bool Disposed;
9.
10. readonly object _syncRoot = new();
11.
12. public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5)
13. {
14. _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
15. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
16. _retryCount = retryCount;
17. }
18.
19. public bool IsConnected => _connection is { IsOpen: true } && !Disposed;
20.
21. public IModel CreateModel()
22. {
23. if (!IsConnected)
24. {
25. throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
26. }
27.
28. return _connection.CreateModel();
29. }
30.
31. public void Dispose()
32. {
33. if (Disposed) return;
34.
35. Disposed = true;
36.
37. try
38. {
39. _connection.ConnectionShutdown -= OnConnectionShutdown;
40. _connection.CallbackException -= OnCallbackException;
41. _connection.ConnectionBlocked -= OnConnectionBlocked;
42. _connection.Dispose();
43. }
44. catch (IOException ex)
45. {
46. _logger.LogCritical(ex.ToString());
47. }
48. }
49.
50. public bool TryConnect()
51. {
52. _logger.LogInformation("RabbitMQ Client is trying to connect");
53.
54. lock (_syncRoot)
55. {
56. var policy = RetryPolicy.Handle<SocketException>()
57. .Or<BrokerUnreachableException>()
58. .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
59. {
60. _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s", $"{time.TotalSeconds:n1}");
61. }
62. );
63.
64. policy.Execute(() =>
65. {
66. _connection = _connectionFactory
67. .CreateConnection();
68. });
69.
70. if (IsConnected)
71. {
72. _connection.ConnectionShutdown += OnConnectionShutdown;
73. _connection.CallbackException += OnCallbackException;
74. _connection.ConnectionBlocked += OnConnectionBlocked;
75.
76. _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);
77.
78. return true;
79. }
80. else
81. {
82. _logger.LogCritical("Fatal error: RabbitMQ connections could not be created and opened");
83.
84. return false;
85. }
86. }
87. }
88.
89. private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
90. {
91. if (Disposed) return;
92.
93. _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
94.
95. TryConnect();
96. }
97.
98. void OnCallbackException(object sender, CallbackExceptionEventArgs e)
99. {
100. if (Disposed) return;
101.
102. _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
103.
104. TryConnect();
105. }
106.
107. void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
108. {
109. if (Disposed) return;
110.
111. _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
112.
113. TryConnect();
114. }
115. }
EventBusRabbitMQ.cs
Using RabbitMQ's Publish-and-Subscribe Mode
1. namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
2. using Microsoft.Extensions.DependencyInjection;
3.
4. public class EventBusRabbitMQ : IEventBus, IDisposable
5. {
6. const string BROKER_NAME = "eshop_event_bus";
7.
8. private static readonly JsonSerializerOptions s_indentedOptions = new() { WriteIndented = true };
9. private static readonly JsonSerializerOptions s_caseInsensitiveOptions = new() { PropertyNameCaseInsensitive = true };
10.
11. private readonly IRabbitMQPersistentConnection _persistentConnection;
12. private readonly ILogger<EventBusRabbitMQ> _logger;
13. private readonly IEventBusSubscriptionsManager _subsManager;
14. private readonly IServiceProvider _serviceProvider;
15. private readonly int _retryCount;
16.
17. private IModel _consumerChannel;
18. private string _queueName;
19.
20. public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
21. IServiceProvider serviceProvider, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
22. {
23. _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
24. _logger = logger ?? throw new ArgumentNullException(nameof(logger));
25. _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
26. _queueName = queueName;
27. _consumerChannel = CreateConsumerChannel();
28. _serviceProvider = serviceProvider;
29. _retryCount = retryCount;
30. _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
31. }
32.
33. private void SubsManager_OnEventRemoved(object sender, string eventName)
34. {
35. if (!_persistentConnection.IsConnected)
36. {
37. _persistentConnection.TryConnect();
38. }
39.
40. using var channel = _persistentConnection.CreateModel();
41. channel.QueueUnbind(queue: _queueName,
42. exchange: BROKER_NAME,
43. routingKey: eventName);
44.
45. if (_subsManager.IsEmpty)
46. {
47. _queueName = string.Empty;
48. _consumerChannel.Close();
49. }
50. }
51.
52. public void Publish(IntegrationEvent @event)
53. {
54. if (!_persistentConnection.IsConnected)
55. {
56. _persistentConnection.TryConnect();
57. }
58.
59. var policy = RetryPolicy.Handle<BrokerUnreachableException>()
60. .Or<SocketException>()
61. .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
62. {
63. _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s", @event.Id, $"{time.TotalSeconds:n1}");
64. });
65.
66. var eventName = @event.GetType().Name;
67.
68. _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);
69.
70. using var channel = _persistentConnection.CreateModel();
71. _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);
72.
73. channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
74.
75. var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), s_indentedOptions);
76.
77. policy.Execute(() =>
78. {
79. var properties = channel.CreateBasicProperties();
80. properties.DeliveryMode = 2; // persistent
81.
82. _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);
83.
84. channel.BasicPublish(
85. exchange: BROKER_NAME,
86. routingKey: eventName,
87. mandatory: true,
88. basicProperties: properties,
89. body: body);
90. });
91. }
92.
93. public void SubscribeDynamic<TH>(string eventName)
94. where TH : IDynamicIntegrationEventHandler
95. {
96. _logger.LogInformation("Subscribing to [c ](/search?q=c )event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
97.
98. DoInternalSubscription(eventName);
99. _subsManager.AddDynamicSubscription<TH>(eventName);
100. StartBasicConsume();
101. }
102.
103. public void Subscribe<T, TH>()
104. where T : IntegrationEvent
105. where TH : IIntegrationEventHandler<T>
106. {
107. var eventName = _subsManager.GetEventKey<T>();
108. DoInternalSubscription(eventName);
109.
110. _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
111.
112. _subsManager.AddSubscription<T, TH>();
113. StartBasicConsume();
114. }
115.
116. private void DoInternalSubscription(string eventName)
117. {
118. var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
119. if (!containsKey)
120. {
121. if (!_persistentConnection.IsConnected)
122. {
123. _persistentConnection.TryConnect();
124. }
125.
126. _consumerChannel.QueueBind(queue: _queueName,
127. exchange: BROKER_NAME,
128. routingKey: eventName);
129. }
130. }
131.
132. public void Unsubscribe<T, TH>()
133. where T : IntegrationEvent
134. where TH : IIntegrationEventHandler<T>
135. {
136. var eventName = _subsManager.GetEventKey<T>();
137.
138. _logger.LogInformation("Unsubscribing from event {EventName}", eventName);
139.
140. _subsManager.RemoveSubscription<T, TH>();
141. }
142.
143. public void UnsubscribeDynamic<TH>(string eventName)
144. where TH : IDynamicIntegrationEventHandler
145. {
146. _subsManager.RemoveDynamicSubscription<TH>(eventName);
147. }
148.
149. public void Dispose()
150. {
151. if (_consumerChannel != null)
152. {
153. _consumerChannel.Dispose();
154. }
155.
156. _subsManager.Clear();
157. }
158.
159. private void StartBasicConsume()
160. {
161. _logger.LogTrace("Starting RabbitMQ basic consume");
162.
163. if (_consumerChannel != null)
164. {
165. var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
166.
167. consumer.Received += Consumer_Received;
168.
169. _consumerChannel.BasicConsume(
170. queue: _queueName,
171. autoAck: false,
172. consumer: consumer);
173. }
174. else
175. {
176. _logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
177. }
178. }
179.
180. private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
181. {
182. var eventName = eventArgs.RoutingKey;
183. var message = Encoding.UTF8.GetString(eventArgs.Body.Span);
184.
185. try
186. {
187. if (message.ToLowerInvariant().Contains("throw-fake-exception"))
188. {
189. throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
190. }
191.
192. await ProcessEvent(eventName, message);
193. }
194. catch (Exception ex)
195. {
196. _logger.LogWarning(ex, "Error Processing message \"{Message}\"", message);
197. }
198.
199. // Even on exception we take the message off the queue.
200. // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).
201. // For more information see: https://www.rabbitmq.com/dlx.html
202. _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
203. }
204.
205. private IModel CreateConsumerChannel()
206. {
207. if (!_persistentConnection.IsConnected)
208. {
209. _persistentConnection.TryConnect();
210. }
211.
212. _logger.LogTrace("Creating RabbitMQ consumer channel");
213.
214. var channel = _persistentConnection.CreateModel();
215.
216. channel.ExchangeDeclare(exchange: BROKER_NAME,
217. type: "direct");
218.
219. channel.QueueDeclare(queue: _queueName,
220. durable: true,
221. exclusive: false,
222. autoDelete: false,
223. arguments: null);
224.
225. channel.CallbackException += (sender, ea) =>
226. {
227. _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");
228.
229. _consumerChannel.Dispose();
230. _consumerChannel = CreateConsumerChannel();
231. StartBasicConsume();
232. };
233.
234. return channel;
235. }
236.
237. private async Task ProcessEvent(string eventName, string message)
238. {
239. _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);
240.
241. if (_subsManager.HasSubscriptionsForEvent(eventName))
242. {
243. await using var scope = _serviceProvider.CreateAsyncScope();
244. var subscriptions = _subsManager.GetHandlersForEvent(eventName);
245. foreach (var subscription in subscriptions)
246. {
247. if (subscription.IsDynamic)
248. {
249. if (scope.ServiceProvider.GetService(subscription.HandlerType) is not IDynamicIntegrationEventHandler handler) continue;
250. using dynamic eventData = JsonDocument.Parse(message);
251. await Task.Yield();
252. await handler.Handle(eventData);
253. }
254. else
255. {
256. var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
257. if (handler == null) continue;
258. var eventType = _subsManager.GetEventTypeByName(eventName);
259. var integrationEvent = JsonSerializer.Deserialize(message, eventType, s_caseInsensitiveOptions);
260. var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
261.
262. await Task.Yield();
263. await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
264. }
265. }
266. }
267. else
268. {
269. _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
270. }
271. }
272. }