Skip to main content

EventBus for eShopOnContainers (2)

EventBus for eShopOnContainers (2)

EventBusRabbitMQ

IRabbitMQPersistentConnection.cs

Defines the interface for a persistent RabbitMQ connection.

      /// <summary>
      /// Contract for a reusable RabbitMQ connection that can report its state,
      /// be (re)established on demand, and hand out channels.
      /// </summary>
      public interface IRabbitMQPersistentConnection : IDisposable
      {
          /// <summary>
          /// Gets a value indicating whether the connection can currently be used.
          /// </summary>
          bool IsConnected { get; }

          /// <summary>
          /// Attempts to establish the connection.
          /// </summary>
          /// <returns><c>true</c> when the attempt succeeded; otherwise <c>false</c>.</returns>
          bool TryConnect();

          /// <summary>
          /// Creates a new <see cref="IModel"/> on the connection.
          /// </summary>
          /// <returns>The newly created <see cref="IModel"/>.</returns>
          IModel CreateModel();
      }



IModel comes from the RabbitMQ.Client NuGet package and exposes many event-handling members.

DefaultRabbitMQPersistentConnection.cs

       /// <summary>
       /// Default <see cref="IRabbitMQPersistentConnection"/> implementation. It wraps a single
       /// <see cref="IConnection"/> created from an <see cref="IConnectionFactory"/>, retries the
       /// initial connect with exponential back-off, and tries to reconnect when the connection
       /// reports a shutdown, a callback exception, or a blocked state.
       /// </summary>
       public class DefaultRabbitMQPersistentConnection
           : IRabbitMQPersistentConnection
       {
           private readonly IConnectionFactory _connectionFactory;
           private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
           private readonly int _retryCount;

           // Serializes connection attempts so concurrent callers do not race on _connection.
           private readonly object _syncRoot = new();

           // Null until TryConnect has successfully created a connection.
           private IConnection _connection;

           /// <summary>
           /// Set to <c>true</c> once <see cref="Dispose"/> has run. Kept as a public field so
           /// existing callers that read it keep compiling.
           /// </summary>
           public bool Disposed;

           /// <summary>
           /// Creates the persistent connection wrapper. No connection is opened here;
           /// call <see cref="TryConnect"/> to connect.
           /// </summary>
           /// <param name="connectionFactory">Factory used to create the underlying connection.</param>
           /// <param name="logger">Logger for connection diagnostics.</param>
           /// <param name="retryCount">Number of retries performed by <see cref="TryConnect"/>.</param>
           /// <exception cref="ArgumentNullException">
           /// <paramref name="connectionFactory"/> or <paramref name="logger"/> is <c>null</c>.
           /// </exception>
           public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5)
           {
               _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
               _logger = logger ?? throw new ArgumentNullException(nameof(logger));
               _retryCount = retryCount;
           }

           /// <summary>
           /// Gets a value indicating whether a connection exists, is open, and this instance
           /// has not been disposed.
           /// </summary>
           public bool IsConnected => _connection is { IsOpen: true } && !Disposed;

           /// <summary>
           /// Creates a new channel on the current connection.
           /// </summary>
           /// <returns>The channel created by the underlying connection.</returns>
           /// <exception cref="InvalidOperationException"><see cref="IsConnected"/> is <c>false</c>.</exception>
           public IModel CreateModel()
           {
               if (!IsConnected)
               {
                   throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
               }

               return _connection.CreateModel();
           }

           /// <summary>
           /// Detaches the failure-event handlers and disposes the underlying connection.
           /// Calling it more than once, or before any connection was created, is a no-op.
           /// </summary>
           public void Dispose()
           {
               if (Disposed)
               {
                   return;
               }

               Disposed = true;

               // TryConnect may never have been called, or may never have succeeded;
               // in that case there is nothing to detach or release.
               if (_connection is null)
               {
                   return;
               }

               try
               {
                   _connection.ConnectionShutdown -= OnConnectionShutdown;
                   _connection.CallbackException -= OnCallbackException;
                   _connection.ConnectionBlocked -= OnConnectionBlocked;
                   _connection.Dispose();
               }
               catch (IOException ex)
               {
                   // Pass the exception itself (not its text as the message template) so the
                   // logger records it as structured exception data.
                   _logger.LogCritical(ex, "Failed to dispose the RabbitMQ connection");
               }
           }

           /// <summary>
           /// Creates a connection through the factory, retrying on
           /// <see cref="SocketException"/> and <see cref="BrokerUnreachableException"/> with a
           /// delay of 2^attempt seconds, and subscribes to the connection's failure events.
           /// </summary>
           /// <returns>
           /// <c>true</c> when the connection is open afterwards; otherwise <c>false</c>.
           /// </returns>
           public bool TryConnect()
           {
               _logger.LogInformation("RabbitMQ Client is trying to connect");

               lock (_syncRoot)
               {
                   var policy = RetryPolicy.Handle<SocketException>()
                       .Or<BrokerUnreachableException>()
                       .WaitAndRetry(
                           _retryCount,
                           retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                           (ex, time) =>
                           {
                               _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s", $"{time.TotalSeconds:n1}");
                           });

                   // NOTE(review): a previously created connection is overwritten here without
                   // being closed or having its handlers detached - confirm whether that is intended.
                   policy.Execute(() =>
                   {
                       _connection = _connectionFactory.CreateConnection();
                   });

                   if (!IsConnected)
                   {
                       _logger.LogCritical("Fatal error: RabbitMQ connections could not be created and opened");

                       return false;
                   }

                   _connection.ConnectionShutdown += OnConnectionShutdown;
                   _connection.CallbackException += OnCallbackException;
                   _connection.ConnectionBlocked += OnConnectionBlocked;

                   _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);

                   return true;
               }
           }

           // Reconnects when the connection reports that it has been blocked.
           private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
           {
               if (Disposed)
               {
                   return;
               }

               _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");

               TryConnect();
           }

           // Reconnects when the connection reports a callback exception.
           private void OnCallbackException(object sender, CallbackExceptionEventArgs e)
           {
               if (Disposed)
               {
                   return;
               }

               _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");

               TryConnect();
           }

           // Reconnects when the connection reports a shutdown.
           private void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
           {
               if (Disposed)
               {
                   return;
               }

               _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");

               TryConnect();
           }
       }




EventBusRabbitMQ.cs

Using RabbitMQ's Publish-and-Subscribe Mode

       namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ;
       using Microsoft.Extensions.DependencyInjection;

       /// <summary>
       /// <see cref="IEventBus"/> implementation on top of RabbitMQ. Events are published to a
       /// single "direct" exchange using the event type name as routing key, and consumed from
       /// one queue that is bound to the exchange once per subscribed event name.
       /// </summary>
       public class EventBusRabbitMQ : IEventBus, IDisposable
       {
           // Name of the exchange every event is published to and every binding refers to.
           private const string BROKER_NAME = "eshop_event_bus";

           private static readonly JsonSerializerOptions s_indentedOptions = new() { WriteIndented = true };
           private static readonly JsonSerializerOptions s_caseInsensitiveOptions = new() { PropertyNameCaseInsensitive = true };

           private readonly IRabbitMQPersistentConnection _persistentConnection;
           private readonly ILogger<EventBusRabbitMQ> _logger;
           private readonly IEventBusSubscriptionsManager _subsManager;
           private readonly IServiceProvider _serviceProvider;
           private readonly int _retryCount;

           // Channel used for consuming; replaced when the channel reports a callback exception.
           private IModel _consumerChannel;
           private string _queueName;

           /// <summary>
           /// Creates the event bus and immediately creates the consumer channel, which declares
           /// the exchange and the queue.
           /// </summary>
           /// <param name="persistentConnection">Connection used to create channels.</param>
           /// <param name="logger">Logger for bus diagnostics.</param>
           /// <param name="serviceProvider">Provider used to resolve event handlers per message.</param>
           /// <param name="subsManager">
           /// Subscription registry; an <see cref="InMemoryEventBusSubscriptionsManager"/> is used when <c>null</c>.
           /// </param>
           /// <param name="queueName">Name of the queue to declare and consume from.</param>
           /// <param name="retryCount">Number of retries used when publishing.</param>
           /// <exception cref="ArgumentNullException">
           /// <paramref name="persistentConnection"/> or <paramref name="logger"/> is <c>null</c>.
           /// </exception>
           public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
               IServiceProvider serviceProvider, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
           {
               _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
               _logger = logger ?? throw new ArgumentNullException(nameof(logger));
               _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
               _queueName = queueName;

               // Must run after the connection, logger and queue name are assigned:
               // CreateConsumerChannel reads all three.
               _consumerChannel = CreateConsumerChannel();
               _serviceProvider = serviceProvider;
               _retryCount = retryCount;
               _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
           }

           // Removes the queue binding for an event that no longer has handlers and closes the
           // consumer channel once no subscriptions remain.
           private void SubsManager_OnEventRemoved(object sender, string eventName)
           {
               if (!_persistentConnection.IsConnected)
               {
                   _persistentConnection.TryConnect();
               }

               using var channel = _persistentConnection.CreateModel();
               channel.QueueUnbind(queue: _queueName,
                   exchange: BROKER_NAME,
                   routingKey: eventName);

               if (_subsManager.IsEmpty)
               {
                   _queueName = string.Empty;
                   _consumerChannel.Close();
               }
           }

           /// <summary>
           /// Serializes <paramref name="event"/> as JSON and publishes it to the exchange with the
           /// event's type name as routing key, retrying on <see cref="BrokerUnreachableException"/>
           /// and <see cref="SocketException"/> with a delay of 2^attempt seconds.
           /// </summary>
           /// <param name="event">The integration event to publish.</param>
           public void Publish(IntegrationEvent @event)
           {
               if (!_persistentConnection.IsConnected)
               {
                   _persistentConnection.TryConnect();
               }

               var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                   .Or<SocketException>()
                   .WaitAndRetry(
                       _retryCount,
                       retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                       (ex, time) =>
                       {
                           _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s", @event.Id, $"{time.TotalSeconds:n1}");
                       });

               var eventName = @event.GetType().Name;

               _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);

               using var channel = _persistentConnection.CreateModel();
               _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);

               channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

               // Serialize with the runtime type so properties of the derived event are included.
               var body = JsonSerializer.SerializeToUtf8Bytes(@event, @event.GetType(), s_indentedOptions);

               policy.Execute(() =>
               {
                   var properties = channel.CreateBasicProperties();
                   properties.DeliveryMode = 2; // persistent

                   _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);

                   channel.BasicPublish(
                       exchange: BROKER_NAME,
                       routingKey: eventName,
                       mandatory: true,
                       basicProperties: properties,
                       body: body);
               });
           }

           /// <summary>
           /// Subscribes a dynamic handler to the event identified by <paramref name="eventName"/>
           /// and starts consuming.
           /// </summary>
           /// <typeparam name="TH">Handler type.</typeparam>
           /// <param name="eventName">Routing key / name of the event.</param>
           public void SubscribeDynamic<TH>(string eventName)
               where TH : IDynamicIntegrationEventHandler
           {
               _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());

               DoInternalSubscription(eventName);
               _subsManager.AddDynamicSubscription<TH>(eventName);
               StartBasicConsume();
           }

           /// <summary>
           /// Subscribes handler <typeparamref name="TH"/> to events of type
           /// <typeparamref name="T"/> and starts consuming.
           /// </summary>
           /// <typeparam name="T">Integration event type.</typeparam>
           /// <typeparam name="TH">Handler type.</typeparam>
           public void Subscribe<T, TH>()
               where T : IntegrationEvent
               where TH : IIntegrationEventHandler<T>
           {
               var eventName = _subsManager.GetEventKey<T>();
               DoInternalSubscription(eventName);

               _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());

               _subsManager.AddSubscription<T, TH>();
               StartBasicConsume();
           }

           // Binds the queue to the exchange for eventName, unless the event already has
           // subscriptions (in which case no new binding is made).
           private void DoInternalSubscription(string eventName)
           {
               if (_subsManager.HasSubscriptionsForEvent(eventName))
               {
                   return;
               }

               if (!_persistentConnection.IsConnected)
               {
                   _persistentConnection.TryConnect();
               }

               _consumerChannel.QueueBind(queue: _queueName,
                   exchange: BROKER_NAME,
                   routingKey: eventName);
           }

           /// <summary>
           /// Removes handler <typeparamref name="TH"/> from events of type <typeparamref name="T"/>.
           /// </summary>
           /// <typeparam name="T">Integration event type.</typeparam>
           /// <typeparam name="TH">Handler type.</typeparam>
           public void Unsubscribe<T, TH>()
               where T : IntegrationEvent
               where TH : IIntegrationEventHandler<T>
           {
               var eventName = _subsManager.GetEventKey<T>();

               _logger.LogInformation("Unsubscribing from event {EventName}", eventName);

               _subsManager.RemoveSubscription<T, TH>();
           }

           /// <summary>
           /// Removes dynamic handler <typeparamref name="TH"/> from the event identified by
           /// <paramref name="eventName"/>.
           /// </summary>
           /// <typeparam name="TH">Handler type.</typeparam>
           /// <param name="eventName">Routing key / name of the event.</param>
           public void UnsubscribeDynamic<TH>(string eventName)
               where TH : IDynamicIntegrationEventHandler
           {
               _subsManager.RemoveDynamicSubscription<TH>(eventName);
           }

           /// <summary>
           /// Detaches from the subscriptions manager, disposes the consumer channel and clears
           /// all subscriptions.
           /// </summary>
           public void Dispose()
           {
               // Detach first so the manager no longer references this instance.
               _subsManager.OnEventRemoved -= SubsManager_OnEventRemoved;

               _consumerChannel?.Dispose();

               _subsManager.Clear();
           }

           // Registers an async consumer on the consumer channel with manual acknowledgement.
           private void StartBasicConsume()
           {
               _logger.LogTrace("Starting RabbitMQ basic consume");

               if (_consumerChannel == null)
               {
                   _logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
                   return;
               }

               var consumer = new AsyncEventingBasicConsumer(_consumerChannel);

               consumer.Received += Consumer_Received;

               _consumerChannel.BasicConsume(
                   queue: _queueName,
                   autoAck: false,
                   consumer: consumer);
           }

           // Handles one delivery: decodes the body as UTF-8, dispatches it to the registered
           // handlers, and acknowledges the message whether or not processing succeeded.
           private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
           {
               var eventName = eventArgs.RoutingKey;
               var message = Encoding.UTF8.GetString(eventArgs.Body.Span);

               try
               {
                   // Test hook: a message containing this marker forces a failure.
                   if (message.Contains("throw-fake-exception", StringComparison.OrdinalIgnoreCase))
                   {
                       throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
                   }

                   await ProcessEvent(eventName, message);
               }
               catch (Exception ex)
               {
                   _logger.LogWarning(ex, "Error Processing message \"{Message}\"", message);
               }

               // Even on exception we take the message off the queue.
               // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).
               // For more information see: https://www.rabbitmq.com/dlx.html
               _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
           }

           // Creates a channel, declares the exchange and the durable queue on it, and arranges
           // for the channel to be recreated when it reports a callback exception.
           private IModel CreateConsumerChannel()
           {
               if (!_persistentConnection.IsConnected)
               {
                   _persistentConnection.TryConnect();
               }

               _logger.LogTrace("Creating RabbitMQ consumer channel");

               var channel = _persistentConnection.CreateModel();

               channel.ExchangeDeclare(exchange: BROKER_NAME,
                   type: "direct");

               channel.QueueDeclare(queue: _queueName,
                   durable: true,
                   exclusive: false,
                   autoDelete: false,
                   arguments: null);

               channel.CallbackException += (sender, ea) =>
               {
                   _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");

                   _consumerChannel.Dispose();
                   _consumerChannel = CreateConsumerChannel();
                   StartBasicConsume();
               };

               return channel;
           }

           // Resolves every handler registered for eventName from a fresh DI scope and invokes
           // it with the message: dynamic handlers get the parsed JSON document, typed handlers
           // get the message deserialized to the registered event type.
           private async Task ProcessEvent(string eventName, string message)
           {
               _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);

               if (!_subsManager.HasSubscriptionsForEvent(eventName))
               {
                   _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
                   return;
               }

               await using var scope = _serviceProvider.CreateAsyncScope();
               var subscriptions = _subsManager.GetHandlersForEvent(eventName);
               foreach (var subscription in subscriptions)
               {
                   if (subscription.IsDynamic)
                   {
                       // Handlers that are not registered in the container are skipped.
                       if (scope.ServiceProvider.GetService(subscription.HandlerType) is not IDynamicIntegrationEventHandler handler)
                       {
                           continue;
                       }

                       using dynamic eventData = JsonDocument.Parse(message);
                       await Task.Yield();
                       await handler.Handle(eventData);
                   }
                   else
                   {
                       var handler = scope.ServiceProvider.GetService(subscription.HandlerType);
                       if (handler == null)
                       {
                           continue;
                       }

                       var eventType = _subsManager.GetEventTypeByName(eventName);
                       var integrationEvent = JsonSerializer.Deserialize(message, eventType, s_caseInsensitiveOptions);

                       // The event type is only known at run time, so Handle is invoked via reflection
                       // on the closed IIntegrationEventHandler<TEvent> interface.
                       var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);

                       await Task.Yield();
                       await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                   }
               }
           }
       }