This doc should be moved to github as part of https://github.com/openedx/openedx-events/issues/238.
The event bus is a means for asynchronous communication between services using the pub/sub model. The Open edX platform uses OpenEdxPublicSignals to send events within a service, based on Django Signals. The event bus extends these signals, allowing them to be broadcast and handled across multiple services.
Currently, we have two implementations of the event bus: Kafka, a distributed event streaming platform, and Redis Streams, a data structure that acts like an append-only log.
We have created abstraction layers in openedx_events: https://github.com/openedx/openedx-events/blob/main/openedx_events/event_bus/__init__.py to help developers use other technologies such as Pulsar. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0002-kafka-based-event-bus.rst for more on the decision to use Kafka for the initial implementation.
The event bus can help us achieve loose coupling between services, replacing blocking requests between services and large sync jobs, leading to a faster, more reliable, and more extensible system. See event messaging architectural goals to read more about its benefits.
OEP on event bus architecture - https://open-edx-proposals.readthedocs.io/en/latest/architectural-decisions/oep-0052-arch-event-bus-architecture.html
ADR on how the event bus interacts with Django signals - https://openedx-events.readthedocs.io/en/latest/decisions/0004-external-event-bus-and-django-signal-events.html
As other concrete implementations are created (i.e. alternatives to Kafka), this how-to will need to be adjusted. See How to add a new concrete implementation of the event bus for more details.
Before starting, it may be helpful to read up on Kafka and familiarize yourself with the ideas of topics, consumer groups, partitions, at-least-once guarantees, etc.
The documentation at https://developer.confluent.io/ is quite thorough and has information about both Kafka in general and the Confluent platform in particular. Confluent is one third-party hosting option for Kafka.
Before starting, it may be helpful to read up on Redis streams and familiarize yourself with the ideas of streams, consumer groups, etc.
The documentation at https://redis.io/docs/data-types/streams/ is quite good.
Your organization may have already made a choice around Kafka vs Redis, and other organization-specific details around hosting, testing, etc. Your organization may also have its own supporting how-to documentation.
To use a Kafka based event bus, you will need a Kafka cluster. You can either set up one yourself or use a hosted solution like Confluent. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0004-kafka-managed-hosting.rst for more information on edX.org’s decision to use Confluent.
Devstack defaults to the Redis event bus, so no special setup is required.
If you do not use devstack, an easy way to set up Redis locally is to follow the instructions here: https://redis.io/docs/getting-started/ . You will have to make sure your services can connect to Redis, so set EVENT_BUS_REDIS_CONNECTION_URL appropriately.
If you are using devstack, you can run your own local Kafka broker alongside all your other containers:
Run make dev.up.kafka-control-center
Check localhost:9021 to see if the control center is up. It sometimes takes a while (~2 minutes) to start and occasionally requires a restart.
Add the following configuration to your service's devstack.py (or equivalent environment file):
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = 'http://edx.devstack.schema-registry:8081'
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = 'edx.devstack.kafka:29092'
Follow the instructions at https://github.com/openedx/event-bus-kafka/blob/main/docs/how_tos/manual_testing.rst to test your event bus setup
If you do not use devstack, an easy way to set up a Kafka cluster locally is to follow the instructions here: https://developer.confluent.io/quickstart/kafka-docker/. You will have to make sure your services can connect to the broker and set EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL and EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS appropriately.
In addition to following your provider’s instructions, you may be interested in these particular broker configuration settings.
New events are created as instances of OpenEdxPublicSignal in https://github.com/openedx/openedx-events . You should add your signal(s) in the relevant subdomain folder, or create a new one if your subdomain is not present. See edX DDD Bounded Contexts for the current division of subdomains.
For purposes of this document, we'll assume you created a signal in openedx_events/learning/signals.py called MY_EVENT_HAPPENED with an event type of my.event.happened.
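To make the shape of such a declaration concrete, here is a self-contained sketch. MY_EVENT_HAPPENED and MyEventData are the illustrative names used in this doc, not real openedx-events definitions, and the OpenEdxPublicSignal class below is a minimal stand-in (in a real service you would import the real class from openedx-events rather than define it):

```python
# Stand-ins so this sketch runs without openedx-events installed.
from dataclasses import dataclass, field
from typing import Dict, Type


@dataclass
class OpenEdxPublicSignal:
    """Minimal stand-in for the real OpenEdxPublicSignal class."""
    event_type: str
    data: Dict[str, Type] = field(default_factory=dict)


@dataclass
class MyEventData:
    """Illustrative payload class; real events define attrs data classes."""
    user_id: int


# In openedx_events/learning/signals.py, the declaration would look like:
MY_EVENT_HAPPENED = OpenEdxPublicSignal(
    event_type="my.event.happened",
    data={"my_event_data": MyEventData},
)
```

The event type string ("my.event.happened") is what consumers and the management commands refer to; the data dictionary maps keyword names to the payload classes the event carries.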
Events are meant to help with decoupling services and teams, and are meant to be consumed by multiple services. When designing a new event, it should be a collaboration between the team who owns the data and will maintain the event production, the team that is first interested in consuming the new event, and other potential consumers (where possible).
Start by reading this documentation:
There may be some cases where you will want to create multiple events to be sent to the same topic. For example, when tying events to a model, you may want separate MY_MODEL_CREATED, MY_MODEL_UPDATED, and MY_MODEL_DELETED events. This should also be discussed with the team who owns the producing service and any potential consumers.
See https://openedx-events.readthedocs.io/en/latest/decisions/0010-multiple-event-types-per-topic.html for when and why you may wish to use multiple event types on a single topic.
Events may need to evolve over time, which will change their schema definition. Behind the scenes in the Kafka implementation, there is a Schema Registry, which is meant to help ensure that any schema changes are non-breaking. The Redis implementation does not support schema evolution.
More work may be needed around event evolution, but it is being deferred until there is a real need. For now, we recommend against changing the schema of an event in production. Instead, you may wish to create a new event/topic with the new schema. See https://github.com/edx/edx-arch-experiments/issues/53 for details.
If this is the first time a service is using the event bus, you will need to do some initial setup to use the event bus library. This should only need to be done once per service.
[Kafka specific] Make sure your code is running in an environment that has the confluent_kafka library installed along with its avro and schema-registry extras. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst for why this has to be done separately.
For local development, you can simply exec into the relevant container and run pip install confluent_kafka[avro,schema-registry]
Once you make sure confluent_kafka[avro,schema-registry] is available, you can add openedx-events and edx-event-bus-kafka as you would any other dependencies.
For Redis implementation:
# This tells openedx-events which library to use to create the event producer.
# This allows us to plug in different implementations.
EVENT_BUS_PRODUCER: edx_event_bus_redis.create_producer
# This tells openedx-events which library to use to create the event consumer.
EVENT_BUS_CONSUMER: edx_event_bus_redis.RedisEventConsumer
EVENT_BUS_REDIS_CONNECTION_URL: redis://:password@edx.devstack.redis:6379/
EVENT_BUS_TOPIC_PREFIX: dev
For Kafka implementation:
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS
# This tells openedx-events which library to use to create the event producer.
# This allows us to plug in different implementations.
EVENT_BUS_PRODUCER: edx_event_bus_kafka.create_producer
# This tells openedx-events which library to use to create the event consumer.
EVENT_BUS_CONSUMER: edx_event_bus_kafka.KafkaEventConsumer
#### If using an auth-restricted broker ####
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY: 'MY_SR_KEY'
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET: 'MY_SR_SECRET'
EVENT_BUS_KAFKA_API_KEY: 'MY_KEY'
EVENT_BUS_KAFKA_API_SECRET: 'MY_SECRET'
We also strongly recommend using a topic prefix to distinguish between environments, e.g. 'dev', 'stage', or 'prod'. This prefix will be added to all topic names when both producing and consuming. Adding a topic prefix will reduce the likelihood of accidentally conflating data from different environments. To add a topic prefix, use the EVENT_BUS_TOPIC_PREFIX setting.
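The effect of the prefix can be sketched in a few lines. This is illustrative only (the helper name and the exact join character are assumptions, not the library's actual code), but it shows why a prefix keeps environments separate:

```python
# Illustrative sketch: the prefix is prepended to the base topic name on both
# the producing and consuming side, so 'dev' and 'prod' data never share a topic.
def full_topic_name(base_topic, prefix=None):
    """Hypothetical helper; the hyphen separator is an assumption."""
    return f"{prefix}-{base_topic}" if prefix else base_topic
```

With a prefix of 'dev', an event produced to my-event-happened would land on a topic like dev-my-event-happened, and a consumer configured with the same prefix would read from that same topic.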
As of openedx-events 9.0.0, producing a signal to the event bus is possible by just adding the settings below to the host application and including openedx_events in the INSTALLED_APPS setting.
# .. setting_name: EVENT_BUS_PRODUCER_CONFIG
# .. setting_default: all events disabled
# .. setting_description: Dictionary of event_types mapped to dictionaries of topic to topic-related configuration.
#    Each topic configuration dictionary contains
#    * `enabled`: a toggle denoting whether the event will be published to the topic. These should be annotated
#      according to
#      https://edx.readthedocs.io/projects/edx-toggles/en/latest/how_to/documenting_new_feature_toggles.html
#    * `event_key_field` which is a period-delimited string path to event data field to use as event key.
#    Note: The topic names should not include environment prefix as it will be dynamically added based on
#    EVENT_BUS_TOPIC_PREFIX setting.
EVENT_BUS_PRODUCER_CONFIG = {
    'org.openedx.content_authoring.xblock.published.v1': {
        'content-authoring-xblock-lifecycle': {'event_key_field': 'xblock_info.usage_key', 'enabled': False},
        'content-authoring-xblock-published': {'event_key_field': 'xblock_info.usage_key', 'enabled': False},
    },
    'org.openedx.content_authoring.xblock.deleted.v1': {
        'content-authoring-xblock-lifecycle': {'event_key_field': 'xblock_info.usage_key', 'enabled': False},
    },
}
The above setting is read on application startup, and a generic handler is connected to each event type, allowing users to push to the event bus without writing additional code.
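The startup behavior just described can be sketched with plain Python. Everything below is a stdlib-only illustration of the pattern (the config shape matches EVENT_BUS_PRODUCER_CONFIG above, but the handler and the `produced` list are stand-ins, not the library's actual implementation):

```python
# Illustrative config in the same shape as EVENT_BUS_PRODUCER_CONFIG;
# event type and topic names here are hypothetical.
PRODUCER_CONFIG = {
    "my.event.happened": {
        "my-topic": {"event_key_field": "my_event_data.user_id", "enabled": True},
        "my-disabled-topic": {"event_key_field": "my_event_data.user_id", "enabled": False},
    },
}

produced = []  # stands in for messages actually sent to the broker


def generic_handler(event_type, event_data):
    """Forward an emitted signal to every *enabled* topic for its event type."""
    for topic, cfg in PRODUCER_CONFIG.get(event_type, {}).items():
        if cfg["enabled"]:
            produced.append((topic, event_type, event_data))


# Emitting the Django signal (send_event in the real library) is all the
# producing code does; the connected handler fans it out to enabled topics.
generic_handler("my.event.happened", {"my_event_data": {"user_id": 42}})
```

Note that the disabled topic receives nothing, which is why each topic's `enabled` toggle acts as a per-topic kill switch.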
Consuming uses an infinite loop, which doesn't follow Django's typical request pattern. Take care if using code that expects a more typical request pattern, like atomic requests.
If this is the first time the service is consuming from the event bus, you will also need to do some setup. This should only be done once per service.
In particular, you will need
An environment in which you can run an infinite consumer loop (for example, a Kubernetes worker)
For Kafka implementation:
The confluent_kafka library. See the Producer section for why this needs to be brought in separately.
Add the edx-event-bus-kafka library to the base.in file of your project.
For Redis implementation:
Add the edx-event-bus-redis library to the base.in file of your project.
Optional, but recommended configuration:
You can set EVENT_BUS_TOPIC_PREFIX to match whatever prefixing strategy you are using for your producers to make sure you are consuming events from the correct environment.
Setting EVENT_BUS_KAFKA_CONSUMER_CONSECUTIVE_ERRORS_LIMIT: 5 for the Kafka implementation and EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT: 5 for the Redis implementation will ensure that a consecutive run of 5 errors will cause the consumer to exit, allowing you to automatically reset application state in the face of persistent errors. This is appropriate if your consumer will then be automatically restarted, e.g. by Kubernetes.
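The consecutive-errors behavior can be sketched as a simple loop. This mirrors the documented behavior, not either library's actual code: failures increment a counter, any success resets it, and hitting the limit exits so an orchestrator such as Kubernetes can restart the process with fresh state:

```python
# Stdlib sketch of the CONSECUTIVE_ERRORS_LIMIT behavior described above.
def run_consumer(messages, handle, errors_limit=5):
    """Process messages; exit after `errors_limit` consecutive failures."""
    consecutive_errors = 0
    for msg in messages:
        try:
            handle(msg)
            consecutive_errors = 0  # any success resets the streak
        except Exception:
            consecutive_errors += 1
            if consecutive_errors >= errors_limit:
                return "exited-for-restart"
    return "finished"
```

The key design point is that only *consecutive* errors count: a single bad message followed by successes never trips the limit, so transient failures don't restart the consumer.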
If you are using an auth-restricted broker, make sure your service API key has READ access to the relevant topic and consumer groups. (If you're running the broker locally, you shouldn't need to do anything special to ensure access.)
Add a signal handler in your service for MY_EVENT_HAPPENED, e.g.
@receiver(MY_EVENT_HAPPENED)
def listen_for_my_signal_and_do_things(sender, **kwargs):
    ...  # do things with the data in kwargs
In addition to the event data, kwargs will also contain a metadata parameter, which will be an EventsMetadata object containing information such as the message id, original source, and time the original event occurred (among others).
Note that if the producer did not explicitly set the time of the action when calling send_event, the time will default to whenever the send_event call was first made.
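The time-defaulting rule can be illustrated in a few lines. The helper below is a hypothetical stand-in for how EventsMetadata gets its time, not the real constructor; only the rule itself (explicit time wins, otherwise now()) comes from the text above:

```python
from datetime import datetime, timezone


def build_metadata(event_type, time=None):
    """Hypothetical stand-in: if no explicit time is passed,
    default to the moment send_event is called."""
    return {
        "event_type": event_type,
        "time": time if time is not None else datetime.now(timezone.utc),
    }


# Producer did not pass a time, so the metadata time is "now".
meta = build_metadata("my.event.happened")
```

The practical consequence for consumers: treat the metadata time as "when the event was sent" unless you know the producer explicitly recorded when the underlying action occurred.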
See example code here:
Include openedx_events in your service's INSTALLED_APPS, allowing it to register the consume_events management command.
Run the management command
Unless you are doing something particularly fancy with consumer groups, my_group should just be service_environment, e.g. course_discovery_stage.
./manage.py consume_events -t my-event-happened -g my_group
Note that you should not include the topic prefix in the management command. That should be specified by setting the optional EVENT_BUS_TOPIC_PREFIX Django setting.
Important: Note that here we are using the event type and not the module string. This is different than in the produce_event command used for testing.
You can pass additional arguments to the underlying implementation of the event bus using the --extra parameter with a JSON string as its value:
# Pass consumer name to redis event bus
./manage.py consume_events -t my-event-happened -g my_group --extra '{"consumer_name": "c1"}'
Automatic error handling is still a work in progress. edx-event-bus-kafka as of 1.9.0 and edx-event-bus-redis as of 0.1.0 log what should be sufficient information to recover from most failures on both the consumer and producer sides. (This includes: keys of events as they are produced; keys and values when the producer fails; and keys, values, and offsets when the consumer fails.) Theoretically, these can be replayed by a manual script in case of an error.
In addition, as of version 3.5.1 of edx-event-bus-kafka, you can replay all events in the consumer from a given time. To do this, run your consuming command with the -o flag, like:
./manage.py consume_events -t my-event -g my_group -o 2023-01-01T00:00:00
As of version 4.0.0, edx-event-bus-kafka makes use of the consumer API from openedx-events and depends on the consume_events command from it. To reset offsets from this version onwards, you need to use the --extra argument, like:
./manage.py consume_events -t my-event -g my_group --extra '{"offset_time": "2023-01-08T06:46:22"}'
This will replay all events that were received by Kafka after 2023-01-01. You need to specify the date in ISO format. You will want to have all consumers in the same consumer group run this command to make sure you capture all events across all partitions. As soon as all consumers are running the new command, they should then be redeployed to run without the -o option (even if replay is not yet finished).
Beyond logging and replaying, there are a number of different recommended patterns for error handling, all of which are dependent on the use case and the risks of different types of errors. https://www.confluent.io/blog/error-handling-patterns-in-kafka/ gives a pretty good summary of some of the available options you may wish to pursue. Eventually we may add some of these to the library but there are no immediate plans beyond the logging and replaying work mentioned above.
Events can be replayed with edx-event-bus-redis as well:
# for example pass check_backlog flag to redis consumer to
# process all messages that were not read by this consumer group.
python manage.py consume_events -t user-login -g user-activity-service \
    --extra '{"check_backlog": true, "consumer_name": "c1"}'

# for example replay events from specific redis msg id.
python manage.py consume_events -t user-login -g user-activity-service \
    --extra '{"last_read_msg_id": "1679676448892-0", "consumer_name": "c1"}'
You can find the docs for all EVENT_BUS_ settings using the following:
At time of writing, the Open edX Kafka implementation of the event bus uses mostly the default settings on Producers and Consumers, and our Confluent Kafka instance is likewise mostly set up with the defaults, but there are many different configurations you may want to think about for your use case. None of these are yet supported but are written here as possible avenues for future work depending on need.
All the configurations are available here: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#ak-producer-configurations-for-cp . Of particular interest:
linger.ms - How long the producer will wait after receiving an event to actually produce it (default is 0, meaning we produce as soon as we receive). Lower values decrease latency and decrease the chance of message loss due to server shutdown; higher values allow potentially more efficient batching. See also batch.size, especially if you choose to set linger.ms above zero to take advantage of batching.
The defaults of acks=all, enable.idempotence=true, and retries > 0 all help promote in-order, exactly-once delivery to the broker.
All the configurations are available here: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html . Of particular interest:
enable.auto.commit - This is explicitly disabled (set to False) in the current implementation.
When set to True, a background process will periodically commit the last processed offset. This means if server shutdown occurs in between commits, when the consumer starts back up, it may reprocess events that occurred after the last commit (this is how at-least-once delivery works.) However, it may also cause some events to be lost—those events which had been retrieved for processing but had not been processed.
When False, the caller is responsible for calling consumer.commit() to commit offsets after processing the message.
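The process-then-commit ordering is the whole at-least-once guarantee, and it can be sketched in a few lines. This is a stdlib-only illustration of the pattern, not the real consumer (which calls confluent_kafka's commit API):

```python
# Stdlib sketch of the manual-commit pattern described above: commit the
# offset only AFTER the message is fully processed. If the process crashes
# mid-message, the uncommitted message is re-delivered on restart
# (at-least-once) instead of being lost.
def consume_with_manual_commit(messages, handle, committed):
    """`committed` is a one-element list holding the last committed offset."""
    for offset, msg in messages:
        handle(msg)            # process first...
        committed[0] = offset  # ...then commit, never the other way around
```

Reversing the two lines inside the loop would reproduce the auto-commit failure mode the text describes: an offset committed before processing means a crash loses that message.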
We currently do not set any of these configurations in code, only on our Confluent Kafka instance. All broker configurations are here: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html# . Of particular interest:
auto.create.topics.enable - When True, new topics will automatically be created when an event is produced to them. We recommend setting this to False for stage/production environments for better regulation of topics.
The maintainers of the event production and each event consumer may be different teams.
See “Error Handling” for details on error logging.
Other observability options:
Kafka itself has a number of metrics available.
New Relic has several capabilities:
A number of automatic hooks for metrics for producing and consuming messages.
If using New Relic, ensure “Distributed Tracing” is used for all producing and consuming New Relic apps, in order to provide consumer lag details.
Custom attributes are detailed with custom_attribute_name annotations.
You’ll likely want alerting for the following:
No event production/consumption for some time interval
Error rates in producer/consumer
Consumer processing time
Consumer lag (lag: how far the consumer is behind, in number of events)
This is preferable over consumer latency, which is the time elapsed between an event’s production and consumption. Latency can be artificially inflated by the previous event having a long processing time even when lag is very small (creating false positives), and also can only be measured once an event finally arrives (making the metric prone to false negatives).
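The distinction between lag and latency can be made concrete with a one-line metric. The helper below is an illustration of the definition given above (lag in number of events), not any library's actual monitoring code:

```python
# Lag counts events the consumer has not yet processed: the latest offset
# produced to the partition minus the last offset the consumer committed.
# Unlike latency, it is meaningful at any moment -- even while no new event
# has arrived -- and is not inflated by one slow-to-process predecessor.
def consumer_lag(latest_produced_offset, last_committed_offset):
    return latest_produced_offset - last_committed_offset
```

In practice you would compute this per partition and alert on the maximum (or sum) across partitions for the consumer group.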
Some issues we have not yet addressed include:
How to ensure no events are lost. See the error handling section https://openedx.atlassian.net/wiki/spaces/AC/pages/3508699151/How+to+start+using+the+Event+Bus#Error-handling
How to set auto.offset.reset
when creating new consumers for existing events
Add additional private docs as-needed
2U: Supplementary how-to for event bus for edx.org (private link)