How to start using the Event Bus
This doc should be moved to GitHub as part of https://github.com/openedx/openedx-events/issues/238.
- 1 Background
- 1.1 What is the event bus?
- 1.2 Why use the event bus?
- 1.3 Resources
- 1.3.1 More about the Open edX event bus
- 1.3.2 More about Kafka
- 1.3.3 More about Redis
- 2 Before you start
- 3 Broker setup
- 4 Defining events
- 4.1 Creating a new event
- 4.2 Event design
- 4.2.1 Multiple events
- 4.3 Event evolution
- 5 Producing
- 6 Consuming
- 7 Error handling
- 8 Event Bus Settings
- 9 Special Kafka Configurations
- 10 Observability
- 11 Monitoring
- 12 Open Questions
- 13 Private Organization Links
Background
What is the event bus?
The event bus is a means of asynchronous communication between services using the pub/sub model. The Open edX platform uses OpenEdxPublicSignals, which are based on Django signals, to send events within a service. The event bus extends these signals, allowing them to be broadcast and handled across multiple services.
Currently, there are two implementations of the event bus: Kafka, a distributed event streaming platform, and Redis Streams, a data structure that acts like an append-only log.
We have created abstraction layers in openedx_events (https://github.com/openedx/openedx-events/blob/main/openedx_events/event_bus/__init__.py) to help developers plug in other technologies, such as Pulsar. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0002-kafka-based-event-bus.rst for more on the decision to use Kafka for the initial implementation.
Why use the event bus?
The event bus can help us achieve loose coupling between services, replacing blocking requests between services and large sync jobs, leading to a faster, more reliable, and more extensible system. See event messaging architectural goals to read more about its benefits.
Resources
More about the Open edX event bus
OEP on event bus architecture - https://open-edx-proposals.readthedocs.io/en/latest/architectural-decisions/oep-0052-arch-event-bus-architecture.html
ADR on how the event bus interacts with Django signals - https://openedx-events.readthedocs.io/en/latest/decisions/0004-external-event-bus-and-django-signal-events.html
More about Kafka
As other concrete implementations (i.e. alternatives to Kafka) are created, this how-to will need to be adjusted. See How to add a new concrete implementation of the event bus for more details.
Before starting, it may be helpful to read up on Kafka and familiarize yourself with the ideas of topics, consumer groups, partitions, at-least-once guarantees, etc.
The documentation at https://developer.confluent.io/ is quite thorough and has information about both Kafka in general and the Confluent platform in particular. Confluent is one third-party hosting option for Kafka.
More about Redis
Before starting, it may be helpful to read up on Redis streams and familiarize yourself with the ideas of streams, consumer groups, etc.
The documentation at https://redis.io/docs/data-types/streams/ is quite good.
Before you start
Your organization may have already chosen between Kafka and Redis, and settled other organization-specific details around hosting, testing, etc. It may also have its own supporting how-to documentation.
To use a Kafka-based event bus, you will need a Kafka cluster. You can either set one up yourself or use a hosted solution like Confluent. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0004-kafka-managed-hosting.rst for more information on edX.org’s decision to use Confluent.
Broker setup
Redis local broker setup
Devstack
Devstack defaults to the Redis event bus, so no special setup is required.
Other
If you do not use devstack, an easy way to set up Redis locally is to follow the instructions here: https://redis.io/docs/getting-started/ . You will have to make sure your services can connect to Redis, so set EVENT_BUS_REDIS_CONNECTION_URL appropriately.
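For example, in Django settings (a sketch, assuming an unauthenticated Redis on the default local port):

```
EVENT_BUS_REDIS_CONNECTION_URL = 'redis://localhost:6379/'
```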
Kafka local broker setup
Devstack
If you are using devstack, you can run your own local Kafka broker alongside all your other containers:
1. Run:

```
make dev.up.kafka-control-center
```

2. Check localhost:9021 to see if the control center is up. It sometimes takes a while (~2 minutes) and occasionally requires a restart.
3. Add the following configurations to your service’s devstack.py (or equivalent environment file):

```
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = 'http://edx.devstack.schema-registry:8081'
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = 'edx.devstack.kafka:29092'
```

4. Follow the instructions at https://github.com/openedx/event-bus-kafka/blob/main/docs/how_tos/manual_testing.rst to test your event bus setup.
Other
If you do not use devstack, an easy way to set up a Kafka cluster locally is to follow the instructions here: https://developer.confluent.io/quickstart/kafka-docker/. You will have to make sure your services can connect to the broker, and set EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL and EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS appropriately.
Remotely-hosted Kafka broker setup
In addition to following your provider’s instructions, you may be interested in these particular broker configuration settings.
Defining events
Creating a new event
New events are created as instances of OpenEdxPublicSignal in https://github.com/openedx/openedx-events . You should add your signal(s) in the relevant subdomain folder, or create a new one if your subdomain is not present. See edX DDD Bounded Contexts for the current division of subdomains.
For the purposes of this document, we’ll assume you created a signal in openedx_events/learning/signals.py called MY_EVENT_HAPPENED with an event type of my.event.happened.
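As a sketch, such a signal definition could look like the following; MY_EVENT_HAPPENED and MyEventData are the hypothetical names used throughout this doc, and the data class would normally live in openedx_events/learning/data.py:

```python
# openedx_events/learning/signals.py (sketch; names are illustrative)
import attr

from openedx_events.tooling import OpenEdxPublicSignal


@attr.s(frozen=True)
class MyEventData:
    """Hypothetical payload for my.event.happened (would live in data.py)."""
    id = attr.ib(type=str)


MY_EVENT_HAPPENED = OpenEdxPublicSignal(
    event_type="my.event.happened",
    data={
        "my_data": MyEventData,
    },
)
```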
Event design
Start by reading this documentation:
Multiple events
There may be some cases where you will want to create multiple events to be sent to the same topic. For example, when tying events to a model, you may want separate MY_MODEL_CREATED, MY_MODEL_UPDATED, and MY_MODEL_DELETED events. This should also be discussed with the team who owns the producing service and any potential consumers.
See https://openedx-events.readthedocs.io/en/latest/decisions/0010-multiple-event-types-per-topic.html for when and why you may wish to use multiple event types on a single topic.
Event evolution
Events may need to evolve over time, which will change their schema definition. Behind the scenes in the Kafka implementation, there is a Schema Registry, which is meant to help ensure that any schema changes are non-breaking. The Redis implementation does not support schema evolution.
More work may be needed around event evolution, but it is being deferred until there is a real need. For now, we recommend against changing the schema of an event in production. Instead, you may wish to create a new event/topic with the new schema. See https://github.com/edx/edx-arch-experiments/issues/53 for details.
Producing
First-time setup
If this is the first time a service is using the event bus, you will need to do some initial setup to use the event bus library. This should only need to be done once per service.
[Kafka specific] Make sure your code is running in an environment that has the confluent_kafka library installed, along with its avro and schema-registry extras. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst for why this has to be done separately. For local development, you can simply exec into the relevant container and run:

```
pip install confluent_kafka[avro,schema-registry]
```

Once confluent_kafka[avro,schema-registry] is available, you can add openedx-events and edx-event-bus-kafka as you would any other dependency.
Then configure the event bus in your service’s settings. For the Redis implementation:

```
# This tells openedx-events which library to use to create the event producer.
# This allows us to plug in different implementations.
EVENT_BUS_PRODUCER: edx_event_bus_redis.create_producer
# This tells openedx-events which library to use to create the event consumer.
EVENT_BUS_CONSUMER: edx_event_bus_redis.RedisEventConsumer
EVENT_BUS_REDIS_CONNECTION_URL: 'redis://:password@edx.devstack.redis:6379/'
EVENT_BUS_TOPIC_PREFIX: dev
```
For the Kafka implementation:

```
# Set these to your broker's URLs (see "Broker setup" above):
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS
# This tells openedx-events which library to use to create the event producer.
# This allows us to plug in different implementations.
EVENT_BUS_PRODUCER: edx_event_bus_kafka.create_producer
# This tells openedx-events which library to use to create the event consumer.
EVENT_BUS_CONSUMER: edx_event_bus_kafka.KafkaEventConsumer

#### If using an auth-restricted broker #####
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY: 'MY_SR_KEY'
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET: 'MY_SR_SECRET'
EVENT_BUS_KAFKA_API_KEY: 'MY_KEY'
EVENT_BUS_KAFKA_API_SECRET: 'MY_SECRET'
```
We also strongly recommend using a topic prefix to distinguish between environments, e.g. ‘dev’, ‘stage’, or ‘prod’. This prefix will be added to all topic names when both producing and consuming, reducing the likelihood of accidentally conflating data from different environments. To add a topic prefix, use the EVENT_BUS_TOPIC_PREFIX setting.
Producing a signal
As of openedx-events 9.0.0, you can produce a signal to the event bus simply by adding the settings below to the host application and including openedx_events in its INSTALLED_APPS setting.
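That producer configuration is expressed via the EVENT_BUS_PRODUCER_CONFIG setting, which maps each event type to the topic(s) it should be pushed to. A minimal sketch for the hypothetical event from this doc (the topic name and event_key_field are illustrative):

```python
EVENT_BUS_PRODUCER_CONFIG = {
    'my.event.happened': {
        # topic name -> per-topic options
        'my-topic': {'event_key_field': 'my_data.id', 'enabled': True},
    },
}
```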
The above setting is read on application startup, and a generic handler is connected to each configured event type, allowing the service to push events to the event bus without writing additional code.
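Producing then requires no event-bus-specific code at the call site; firing the signal is enough. A sketch, using the hypothetical names from earlier:

```python
# Anywhere in the producing service; the generic handler registered from
# EVENT_BUS_PRODUCER_CONFIG forwards the event to the broker.
from openedx_events.learning.data import MyEventData           # hypothetical
from openedx_events.learning.signals import MY_EVENT_HAPPENED  # hypothetical

MY_EVENT_HAPPENED.send_event(my_data=MyEventData(id="some-id"))
```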
Consuming
First-time setup
If this is the first time the service is consuming from the event bus, you will also need to do some setup. This should only be done once per service.
In particular, you will need:
- An environment in which you can run an infinite consumer loop (for example, a Kubernetes worker)
- For the Kafka implementation:
  - The confluent_kafka library. See the Producing section for why this needs to be brought in separately.
  - The edx-event-bus-kafka library added to the base.in file of your project.
- For the Redis implementation:
  - The edx-event-bus-redis library added to the base.in file of your project.
Optional, but recommended configuration:
- Set EVENT_BUS_TOPIC_PREFIX to match whatever prefixing strategy you are using for your producers, to make sure you are consuming events from the correct environment.
- Setting EVENT_BUS_KAFKA_CONSUMER_CONSECUTIVE_ERRORS_LIMIT: 5 for the Kafka implementation, or EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT: 5 for the Redis implementation, will ensure that a run of 5 consecutive errors causes the consumer to exit, allowing you to automatically reset application state in the face of persistent errors. This is appropriate if your consumer will then be automatically restarted, e.g. by Kubernetes. (See the sketch after this list.)
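A sketch of these settings together (values are illustrative):

```
EVENT_BUS_TOPIC_PREFIX: stage
# Kafka implementation:
EVENT_BUS_KAFKA_CONSUMER_CONSECUTIVE_ERRORS_LIMIT: 5
# Redis implementation:
EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT: 5
```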
Consuming the signal
- If you are using an auth-restricted broker, make sure your service API key has READ access to the relevant topic and consumer groups. (If you're running the broker locally, you shouldn't need to do anything special to ensure access.)
- Add a signal handler in your service for MY_EVENT_HAPPENED. In addition to the event data, kwargs will also contain a metadata parameter, which will be an EventsMetadata object containing information such as the message id, original source, and time the original event occurred (among others). Note that if the producer did not explicitly set the time of the action when calling send_event, the time will default to whenever the send_event call was first made. See the example handler below:
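A sketch of such a handler, assuming the hypothetical MY_EVENT_HAPPENED signal from earlier in this doc:

```python
import logging

from django.dispatch import receiver

from openedx_events.learning.signals import MY_EVENT_HAPPENED  # hypothetical

logger = logging.getLogger(__name__)


@receiver(MY_EVENT_HAPPENED)
def handle_my_event(sender, signal, **kwargs):
    my_data = kwargs["my_data"]    # the event payload (an attrs object)
    metadata = kwargs["metadata"]  # an EventsMetadata object
    logger.info("Received my.event.happened %s from %s", metadata.id, metadata.source)
```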
- Include openedx_events in your service’s INSTALLED_APPS, allowing it to register the consume_events management command.
- Run the management command (see the sketch after this list). Unless you are doing something particularly fancy with consumer groups, my_group should just be service_environment, e.g. course_discovery_stage.
- Note that you should not include the topic prefix in the management command; that should be specified via the optional EVENT_BUS_TOPIC_PREFIX Django setting.
- Important: note that here we are using the event type and not the module string. This is different from the produce_event command used for testing.
- You can pass additional arguments to the underlying implementation of the event bus using the --extra parameter with a JSON string as its value (see the sketch after this list).
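A sketch of these invocations, reusing the hypothetical names from this doc (exact arguments vary between versions of the consumer command):

```
# Basic invocation: topic (without prefix) and consumer group.
python manage.py consume_events -t my-topic -g course_discovery_stage

# Older versions of the edx-event-bus-kafka command also took the signal's
# event type, not its module string (assumption: via a -s/--signal argument).
python manage.py consume_events -t my-topic -g course_discovery_stage -s my.event.happened

# Passing implementation-specific arguments via --extra as a JSON string;
# consumer_name here is an option recognized by edx-event-bus-redis (assumption).
python manage.py consume_events -t my-topic -g course_discovery_stage \
    --extra '{"consumer_name": "consumer_1"}'
```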
Error handling
Automatic error handling is still a work in progress. edx-event-bus-kafka (as of 1.9.0) and edx-event-bus-redis (as of 0.1.0) log what should be sufficient information to recover from most failures on both the consumer and producer sides. (This includes: keys of events as they are produced; keys and values when the producer fails; and keys, values, and offsets when the consumer fails.) In theory, events can then be replayed by a manual script in case of an error.
In addition, as of version 3.5.1 of edx-event-bus-kafka, you can replay all events in the consumer from a given time. To do this, run your consuming command with the -o flag, like the sketch below:
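A sketch, reusing the hypothetical names from this doc:

```
python manage.py consume_events -t my-topic -g course_discovery_stage -o 2023-01-01T00:00:00
```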
As of version 4.0.0, edx-event-bus-kafka makes use of the consumer API from openedx-events and depends on its consume_events command. To reset offsets from this version onwards, you need to use the --extra argument, like the sketch below:
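A sketch (assumption: the replay timestamp is passed through --extra under the offset_time key):

```
python manage.py consume_events -t my-topic -g course_discovery_stage \
    --extra '{"offset_time": "2023-01-01T00:00:00"}'
```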
This will replay all events that were received by Kafka after 2023-01-01. You need to specify the date in ISO format. You will want to have all consumers in the same consumer group run this command, to make sure you capture all events across all partitions. As soon as all consumers are running the new command, they should be redeployed to run without the -o option (even if the replay is not yet finished).
Beyond logging and replaying, there are a number of different recommended patterns for error handling, all of which are dependent on the use case and the risks of different types of errors. https://www.confluent.io/blog/error-handling-patterns-in-kafka/ gives a pretty good summary of some of the available options you may wish to pursue. Eventually we may add some of these to the library but there are no immediate plans beyond the logging and replaying work mentioned above.
Events can be replayed with edx-event-bus-redis as well; see the sketch below:
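A sketch (assumption: edx-event-bus-redis takes the starting point as a Redis stream message id via the last_read_msg_id key):

```
# 1672531200000 is the Unix millisecond timestamp for 2023-01-01T00:00:00Z;
# Redis stream ids have the form <ms-timestamp>-<sequence>.
python manage.py consume_events -t my-topic -g course_discovery_stage \
    --extra '{"last_read_msg_id": "1672531200000-0"}'
```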
Event Bus Settings
You can find the docs for all EVENT_BUS_ settings in the openedx-events repository and in the repositories of the concrete implementations (edx-event-bus-kafka, edx-event-bus-redis), where each setting is annotated in the source.
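One way to enumerate them (a sketch, assuming local checkouts of those repositories):

```
grep -rn "EVENT_BUS_" --include="*.py" openedx-events/ event-bus-kafka/ event-bus-redis/
```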
Special Kafka Configurations
At the time of writing, the Open edX Kafka implementation of the event bus mostly uses the default settings for producers and consumers, and our Confluent Kafka instance is likewise mostly set up with the defaults, but there are many configurations you may want to think about for your use case. None of these are yet supported, but they are written here as possible avenues for future work depending on need.
Producer
All the configurations are available here: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#ak-producer-configurations-for-cp . Of particular interest:
linger.ms - How long the producer will wait after receiving an event before actually producing it (default is 0, meaning we produce as soon as we receive). Lower values decrease latency and decrease the chance of message loss due to server shutdown; higher values allow potentially more efficient batching. See also batch.size, especially if you choose to set linger.ms above zero to take advantage of batching.
The defaults of acks=all, enable.idempotence=true, and retries > 0 all help promote in-order, exactly-once delivery to the broker.
Consumer
All the configurations are available here: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html . Of particular interest:
enable.auto.commit - This is explicitly disabled (set to False) in the current implementation.
When set to True, a background process periodically commits the last processed offset. This means that if a server shutdown occurs between commits, the consumer may reprocess events that occurred after the last commit when it starts back up (this is how at-least-once delivery works). However, it may also cause some events to be lost: those that had been retrieved for processing but not yet processed.
When False, the caller is responsible for calling consumer.commit() to commit offsets after processing each message.
Broker
We currently do not set any of these configurations in code, only on our Confluent Kafka instance. All broker configurations are here: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html# . Of particular interest:
auto.create.topics.enable - When True, the broker will automatically create new topics when an event is produced to them. We recommend setting this to False for stage/production environments for better regulation of topics.
Observability
See “Error Handling” for details on error logging.
Other observability options:
- Kafka itself has a number of metrics available.
- New Relic has several capabilities:
  - A number of automatic hooks for metrics around producing and consuming messages.
  - If using New Relic, ensure “Distributed Tracing” is enabled for all producing and consuming New Relic apps, in order to provide consumer lag details.
  - Custom attributes are detailed with custom_attribute_name annotations.
Monitoring
You’ll likely want alerting for the following:
- No event production/consumption for some time interval
- Error rates in the producer/consumer
- Consumer processing time
- Consumer lag (lag: how far the consumer is behind, in number of events)
  - Lag is preferable to consumer latency, the time elapsed between an event’s production and consumption. Latency can be artificially inflated by the previous event having a long processing time even when lag is very small (creating false positives), and can only be measured once an event finally arrives (making the metric prone to false negatives).
Open Questions
Some issues we have not yet addressed include:
How to ensure no events are lost (see the Error handling section above)
How to set auto.offset.reset when creating new consumers for existing events
Private Organization Links
2U: Supplementary how-to for event bus for edx.org (private link)