How to add a new concrete implementation of the event bus
Abstraction work is still ongoing. This document is expected to evolve significantly as that work progresses.
Context
Though the initial Open edX event bus was implemented using Kafka, the ultimate goal is for maintainers to be able to swap out different technologies like Redis or Pulsar. See https://openedx.atlassian.net/wiki/spaces/AC/pages/3508699151 for more information.
Existing Concrete Implementations
Redis Streams: https://github.com/openedx/event-bus-redis
Producing
An implementation should provide a producer class that inherits from EventBusProducer, which is defined in openedx-events: https://github.com/openedx/openedx-events/blob/cbb59f124ed84afacb9ec99baa82a86381370dcc/openedx_events/event_bus/__init__.py#L66
The send method it defines is meant to be called from within a signal receiver in the producing service.
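As a rough illustration of the shape described above, here is a minimal sketch. The EventBusProducer class below is only a stand-in so the example runs on its own; the keyword arguments to send are an assumption modeled on the linked base class and should be checked against that source. InMemoryProducer and make_receiver are hypothetical names used for illustration.

```python
from abc import ABC, abstractmethod


class EventBusProducer(ABC):
    """Stand-in for the openedx-events base class (assumed signature)."""

    @abstractmethod
    def send(self, *, signal, topic, event_key_field, event_data, event_metadata):
        """Send a single event to the broker."""


class InMemoryProducer(EventBusProducer):
    """Hypothetical producer that records events instead of contacting a broker."""

    def __init__(self):
        self.sent = []

    def send(self, *, signal, topic, event_key_field, event_data, event_metadata):
        # A real implementation would serialize event_data and publish it
        # to the broker (Redis, Pulsar, ...) here.
        self.sent.append(
            {
                "signal": signal,
                "topic": topic,
                "event_key_field": event_key_field,
                "data": event_data,
                "metadata": event_metadata,
            }
        )


def make_receiver(producer, topic, event_key_field):
    """Build a signal receiver that forwards each event to the producer."""

    def receiver(sender, signal, metadata=None, **event_data):
        # Called by the signal machinery in the producing service.
        producer.send(
            signal=signal,
            topic=topic,
            event_key_field=event_key_field,
            event_data=event_data,
            event_metadata=metadata,
        )

    return receiver
```

In a real service the receiver would be connected to the relevant signal, and the producer would be the concrete class supplied by the implementation package.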
Consuming
At a high level, the consumer should be a process that takes signals and events from the broker and emits each signal with its event. An implementation should provide a consumer class that inherits from EventBusConsumer, which is defined in openedx-events: https://github.com/openedx/openedx-events/blob/06635f3642cee4020d6787df68bba694bd1233fe/openedx_events/event_bus/__init__.py#L127
The consumer class then needs to implement the consume_indefinitely loop, which stays running and listens for events as they come in.
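The loop described above might be sketched as follows. EventBusConsumer here is a stand-in for the openedx-events base class so the example is self-contained. PollingConsumer, its poll and emit callables, and the stop method are all hypothetical; a real implementation would read from the broker and send the actual signal.

```python
from abc import ABC, abstractmethod


class EventBusConsumer(ABC):
    """Stand-in for the openedx-events base class."""

    @abstractmethod
    def consume_indefinitely(self):
        """Consume events from the broker until the process is stopped."""


class PollingConsumer(EventBusConsumer):
    """Hypothetical consumer driven by two callables.

    poll() returns a (signal_name, event_data) tuple, or None if no
    message is available. emit(signal_name, event_data) re-emits the
    signal inside the consuming service.
    """

    def __init__(self, poll, emit):
        self.poll = poll
        self.emit = emit
        self.stopped = False

    def stop(self):
        # Lets a shutdown handler (or a test) end the loop cleanly.
        self.stopped = True

    def consume_indefinitely(self):
        while not self.stopped:
            message = self.poll()
            if message is None:
                # A real broker client would usually block with a timeout
                # here rather than spin.
                continue
            signal_name, event_data = message
            self.emit(signal_name, event_data)
```

The stop flag is a design convenience for this sketch; how a real consumer shuts down depends on the broker client and the process manager.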
If you are doing this within Django, be aware that Django will not do any of the automatic connection cleanup that it usually does per request. This means that if your database restarts while the consume_indefinitely loop is running, Django may try to hold on to a defunct connection. To address this, we recommend checking the connection at the beginning of each iteration and refreshing it if necessary. See https://github.com/openedx/event-bus-kafka/blob/1e8dc0c3a6fb710fc7e07d31dd4a838914d01201/edx_event_bus_kafka/internal/consumer.py#L79 for what we did with event-bus-kafka. We're also resetting the RequestCache in event-bus-kafka, and there may be later, more comprehensive changes. It may be that we want to move all of this logic up into openedx-events rather than adding it piecemeal to implementations.
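A minimal sketch of such a per-iteration check is shown below. It is not copied from event-bus-kafka; the helper name reconnect_if_needed is hypothetical. It takes the connection as a parameter so it can be exercised without a database, and it relies only on the connection attribute and the is_usable() and connect() methods that Django's database wrapper provides.

```python
def reconnect_if_needed(db_connection):
    """Reopen the connection if one was opened earlier and is no longer usable.

    Returns True when a reconnect was performed, False otherwise.
    """
    # No underlying connection yet: Django opens one lazily on first use,
    # so there is nothing to refresh.
    if db_connection.connection is None:
        return False
    # The existing connection still answers, so keep it.
    if db_connection.is_usable():
        return False
    # The connection is defunct (for example after a database restart).
    db_connection.connect()
    return True


# Inside a Django process this would be called at the top of each loop
# iteration, for example:
#     from django.db import connection
#     reconnect_if_needed(connection)
```

Checking for an existing connection first matters because is_usable() is only meaningful once a connection has been opened.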
Abstraction tickets
The known remaining work for a fully abstracted event bus is captured in the Abstraction tickets here: