How to start using the Event Bus

This doc should be moved to github as part of https://github.com/openedx/openedx-events/issues/238.

Background

What is the event bus?

The event bus is a means for asynchronous communication between services using the pub/sub model. The Open edX platform uses OpenEdxPublicSignals to send events within a service, based on Django Signals. The event bus extends these signals, allowing them to be broadcast and handled across multiple services.

Currently, we have two implementations of the event bus: Kafka, a distributed event streaming platform, and Redis Streams, a data structure that acts like an append-only log.

We have created abstraction layers in openedx_events (https://github.com/openedx/openedx-events/blob/main/openedx_events/event_bus/__init__.py) to help developers use other technologies, such as Pulsar. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0002-kafka-based-event-bus.rst for more on the decision to use Kafka for the initial implementation.

Why use the event bus?

The event bus can help us achieve loose coupling between services, replacing blocking requests between services and large sync jobs, leading to a faster, more reliable, and more extensible system. See event messaging architectural goals to read more about its benefits.

Resources

More about the Open edX event bus

OEP on event bus architecture - https://open-edx-proposals.readthedocs.io/en/latest/architectural-decisions/oep-0052-arch-event-bus-architecture.html

ADR on how the event bus interacts with Django signals - https://openedx-events.readthedocs.io/en/latest/decisions/0004-external-event-bus-and-django-signal-events.html

More about Kafka

Before starting, it may be helpful to read up on Kafka and familiarize yourself with the ideas of topics, consumer groups, partitions, at-least-once guarantees, etc.

The documentation at https://developer.confluent.io/ is quite thorough and has information about both Kafka in general and the Confluent platform in particular. Confluent is one third-party hosting option for Kafka.

More about Redis

Before starting, it may be helpful to read up on Redis streams and familiarize yourself with the ideas of streams, consumer groups, etc.

The documentation at https://redis.io/docs/data-types/streams/ is quite good.

Before you start

Your organization may have already made a choice between Kafka and Redis, along with other organization-specific decisions around hosting, testing, etc. Your organization may also have its own supporting how-to documentation.

To use a Kafka-based event bus, you will need a Kafka cluster. You can either set up one yourself or use a hosted solution like Confluent. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0004-kafka-managed-hosting.rst for more information on edX.org’s decision to use Confluent.

Broker setup

Redis local broker setup

Devstack

Devstack defaults to the Redis event bus, so no special setup is required.

Other

If you do not use devstack, an easy way to set up Redis locally is to follow the instructions here: https://redis.io/docs/getting-started/ . Make sure your services can connect to Redis, and set EVENT_BUS_REDIS_CONNECTION_URL appropriately.
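For example, the setting might look like the following for a locally-running Redis (the password, host, and port are illustrative placeholders; adjust them to your setup). The urlparse call just demonstrates the structure of the URL:

```python
from urllib.parse import urlparse

# Hypothetical connection URL for a locally-running Redis; adjust the
# password, host, and port to match your environment.
EVENT_BUS_REDIS_CONNECTION_URL = "redis://:password@localhost:6379/"

# The URL packs the password, host, and port into the standard redis:// scheme.
parts = urlparse(EVENT_BUS_REDIS_CONNECTION_URL)
print(parts.hostname, parts.port)  # localhost 6379
```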

Kafka local broker setup

Devstack

If you are using devstack, you can run your own local Kafka broker alongside all your other containers:

  1. Run make dev.up.kafka-control-center

  2. Check at localhost:9021 to see if the control center is up. It sometimes takes a while (~2 minutes) to start and occasionally requires restarting

  3. Add the following configurations to your service’s devstack.py (or equivalent environment file)

    EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = 'http://edx.devstack.schema-registry:8081'
    EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = 'edx.devstack.kafka:29092'
  4. Follow the instructions at https://github.com/openedx/event-bus-kafka/blob/main/docs/how_tos/manual_testing.rst to test your event bus setup

Other

If you do not use devstack, an easy way to set up a Kafka cluster locally is to follow the instructions here: https://developer.confluent.io/quickstart/kafka-docker/. You will have to make sure your services can connect to the broker and set EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL and EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS appropriately.

Remotely-hosted Kafka broker setup

In addition to following your provider’s instructions, you may be interested in these particular broker configuration settings.

Defining events

Creating a new event

New events are created as instances of OpenEdxPublicSignals in https://github.com/openedx/openedx-events . You should add your signal(s) in the relevant subdomain folder, or create a new one if your subdomain is not present. See edX DDD Bounded Contexts for the current division of subdomains.

For the purposes of this document, we’ll assume you created a signal in openedx_events/learning/signals.py called MY_EVENT_HAPPENED with an event type of my.event.happened.
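As a sketch, such a signal definition might look like the following. MyEventData is a hypothetical attrs data class that you would define alongside the signal, and the import paths should be checked against the current repository layout:

```python
# Sketch only: MyEventData is a hypothetical attrs class you would define in
# openedx_events/learning/data.py; adjust names and paths for your event.
from openedx_events.learning.data import MyEventData
from openedx_events.tooling import OpenEdxPublicSignal

MY_EVENT_HAPPENED = OpenEdxPublicSignal(
    event_type="my.event.happened",
    data={
        "my_data": MyEventData,
    },
)
```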

Event design

Start by reading this documentation:

Multiple events

There may be some cases where you will want to create multiple events to be sent to the same topic. For example, when tying events to a model, you may want separate MY_MODEL_CREATED, MY_MODEL_UPDATED, and MY_MODEL_DELETED events. This should also be discussed with the team who owns the producing service and any potential consumers.

See https://openedx-events.readthedocs.io/en/latest/decisions/0010-multiple-event-types-per-topic.html for when and why you may wish to use multiple event types on a single topic.

Event evolution

Events may need to evolve over time, which will change their schema definition. Behind the scenes in the Kafka implementation, there is a Schema Registry, which is meant to help ensure that any schema changes are non-breaking. The Redis implementation does not support schema evolution.

More work may be needed around event evolution, but it is being deferred until there is a real need. For now, we recommend against changing the schema of an event in production. Instead, you may wish to create a new event/topic with the new schema. See https://github.com/edx/edx-arch-experiments/issues/53 for details.

Producing

First-time setup

If this is the first time a service is using the event bus, you will need to do some initial setup to use the event bus library. This should only need to be done once per service.

  • [Kafka specific] Make sure your code is running in an environment that has the confluent_kafka library installed along with its avro and schema-registry extras. See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst for why this has to be done separately.

    • For local development, you can simply exec into the relevant container and run pip install "confluent_kafka[avro,schema-registry]"

    • Once you have made sure confluent_kafka[avro,schema-registry] is available, you can add openedx-events and edx-event-bus-kafka as you would any other dependencies.

  • For Redis implementation:

    • # This tells openedx-events which library to use to create the event producer.
      # This allows us to plug in different implementations
      EVENT_BUS_PRODUCER: edx_event_bus_redis.create_producer
      # This tells openedx-events which library to use to create the event consumer.
      EVENT_BUS_CONSUMER: edx_event_bus_redis.RedisEventConsumer
      EVENT_BUS_REDIS_CONNECTION_URL: redis://:password@edx.devstack.redis:6379/
      EVENT_BUS_TOPIC_PREFIX: dev
  • For Kafka implementation:

    • EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL
      EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS
      # This tells openedx-events which library to use to create the event producer.
      # This allows us to plug in different implementations
      EVENT_BUS_PRODUCER: edx_event_bus_kafka.create_producer
      # This tells openedx-events which library to use to create the event consumer.
      EVENT_BUS_CONSUMER: edx_event_bus_kafka.KafkaEventConsumer
      #### If using an auth-restricted broker ####
      EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY: 'MY_SR_KEY'
      EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET: 'MY_SR_SECRET'
      EVENT_BUS_KAFKA_API_KEY: 'MY_KEY'
      EVENT_BUS_KAFKA_API_SECRET: 'MY_SECRET'
  • We also strongly recommend using a topic prefix to distinguish between environments, e.g. 'dev', 'stage', or 'prod'. This prefix will be added to all topic names when both producing and consuming, which reduces the likelihood of accidentally conflating data from different environments. To add a topic prefix, use the EVENT_BUS_TOPIC_PREFIX setting.
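As an illustration of the effect of the prefix (the hyphen-joined form shown here is how the current implementations build the full topic name, but verify against your library version):

```python
# Illustration only: shows how a topic prefix namespaces topics per environment.
EVENT_BUS_TOPIC_PREFIX = "dev"

def full_topic_name(base_topic):
    # The event bus implementations join the prefix and the base topic
    # with a hyphen when producing and consuming.
    return f"{EVENT_BUS_TOPIC_PREFIX}-{base_topic}"

print(full_topic_name("my-topic"))  # dev-my-topic
```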

Producing a signal

As of openedx-events 9.0.0, producing a signal to the event bus only requires adding the settings below to the host application and including openedx_events in the INSTALLED_APPS setting.

The above setting is read on application startup, and a generic handler is connected to each event type, allowing users to push to the event bus without writing additional code.
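The relevant setting is EVENT_BUS_PRODUCER_CONFIG. A hypothetical configuration for the my.event.happened example might look like the following; the topic name and key field are illustrative, so check the openedx-events documentation for the exact shape:

```python
# Hypothetical producer configuration for the my.event.happened example.
# Each event type maps to one or more topics, each with an "enabled" flag
# and the field of the event data to use as the message key.
EVENT_BUS_PRODUCER_CONFIG = {
    "my.event.happened": {
        "my-topic": {
            "event_key_field": "my_data.id",  # illustrative key field
            "enabled": True,
        },
    },
}

# Sanity-check the shape of the config:
topic_config = EVENT_BUS_PRODUCER_CONFIG["my.event.happened"]["my-topic"]
print(topic_config["enabled"])  # True
```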

Consuming

First-time setup

If this is the first time the service is consuming from the event bus, you will also need to do some setup. This should only be done once per service.

In particular, you will need:

  • An environment in which you can run an infinite consumer loop (for example, a Kubernetes worker)

  • For Kafka implementation

    • The confluent_kafka library. See the Producer section for why this needs to be brought in separately.

    • Add the edx-event-bus-kafka library to the base.in file of your project.

    • Add required Kafka config.

  • For Redis implementation

  • Optional, but recommended configuration:

    • You can set EVENT_BUS_TOPIC_PREFIX to match whatever prefixing strategy you are using for your producers to make sure you are consuming events from the correct environment.

    • Setting EVENT_BUS_KAFKA_CONSUMER_CONSECUTIVE_ERRORS_LIMIT: 5 for the Kafka implementation or EVENT_BUS_REDIS_CONSUMER_CONSECUTIVE_ERRORS_LIMIT: 5 for the Redis implementation will ensure that a run of 5 consecutive errors causes the consumer to exit, allowing you to automatically reset application state in the face of persistent errors. This is appropriate if your consumer will then be automatically restarted, e.g. by Kubernetes.

Consuming the signal

  • If you are using an auth-restricted broker, make sure your service API key has READ access to the relevant topic and consumer groups (if you're running the broker locally, you shouldn't need to do anything special to ensure access)

  • Add a signal handler in your service for MY_EVENT_HAPPENED, e.g.

    • In addition to the event data, kwargs will also contain a metadata parameter, which will be an EventsMetadata object containing information such as the message id, original source, and time the original event occurred (among others).

      • Note that if the producer did not explicitly set the time of the action when calling send_event, the time will default to whenever the send_event call was first made.

  • See example code here:

  • Include openedx_events in your service’s INSTALLED_APPS, allowing it to register the consume_events management command.

  • Run the management command

    • Unless you are doing something particularly fancy with consumer groups, my_group should just be service_environment, e.g. course_discovery_stage

    • Note that you should not include the topic prefix in the management command. That should be specified by setting the optional EVENT_BUS_TOPIC_PREFIX Django setting.

    • Important: Note that here we are using the event type, not the module string. This is different from the produce_event command used for testing.

    • You can pass additional arguments to the underlying event bus implementation using the --extra parameter, with a JSON string as its value:
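Putting the "Run the management command" steps together, a sketch of the invocation; the topic, group, and extra values are illustrative, and you should check consume_events --help in your service for the exact flags:

```shell
# Illustrative only: topic, group, and extra values depend on your setup.
python manage.py consume_events -t my.event.happened -g course_discovery_stage \
    --extra '{"consumer_name": "my_consumer_1"}'
```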
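The signal handler described above can be sketched with a stdlib-only simulation. Here, Django's signal dispatch is replaced by a direct function call, and the EventsMetadata object is faked with SimpleNamespace; all names and values are illustrative:

```python
from types import SimpleNamespace

def handle_my_event_happened(sender, signal=None, **kwargs):
    # In a real service this function would be connected to MY_EVENT_HAPPENED
    # with Django's @receiver decorator. Besides the event data, kwargs
    # carries a "metadata" EventsMetadata object (message id, source,
    # original event time, etc.).
    metadata = kwargs["metadata"]
    return f"got event {metadata.id} from {metadata.source}"

# Simulate what the consumer loop does when a message arrives:
fake_metadata = SimpleNamespace(id="abc-123", source="openedx/lms/web", time=None)
result = handle_my_event_happened(sender=None, metadata=fake_metadata)
print(result)  # got event abc-123 from openedx/lms/web
```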

Error handling

Automatic error handling is still a work in progress. As of edx-event-bus-kafka 1.9.0 and edx-event-bus-redis 0.1.0, the libraries log what should be sufficient information to recover from most failures on both the consumer and producer sides. (This includes the keys of events as they are produced; keys and values when the producer fails; and keys, values, and offsets when the consumer fails.) In theory, events can be replayed by a manual script in case of an error.

In addition, as of version 3.5.1 of edx-event-bus-kafka, you can replay all events in the consumer from a given time. To do this, run your consuming command with the -o flag, like:
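A sketch of such an invocation; the topic and group names are illustrative, and the exact form of the offset flag should be checked against your installed version's --help output:

```shell
# Illustrative: replay events received after the given ISO-format time.
python manage.py consume_events -t my.event.happened -g course_discovery_stage \
    -o '2023-01-01T00:00:00'
```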

As of version 4.0.0, edx-event-bus-kafka makes use of the consumer API from openedx-events and depends on its consume_events command. To reset offsets from this version onwards, you need to use the --extra argument, like:
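A sketch of this invocation; topic and group names are illustrative, and the offset_time key should be verified against the edx-event-bus-kafka documentation:

```shell
# Illustrative: reset to offsets from the given ISO-format time via --extra.
python manage.py consume_events -t my.event.happened -g course_discovery_stage \
    --extra '{"offset_time": "2023-01-01T00:00:00"}'
```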

This will replay all events that were received by Kafka after 2023-01-01; you must specify the date in ISO format. You will want all consumers in the same consumer group to run this command, to make sure you capture all events across all partitions. As soon as all consumers are running the new command, they should be redeployed to run without the -o option (even if replay is not yet finished).

Beyond logging and replaying, there are a number of different recommended patterns for error handling, all of which are dependent on the use case and the risks of different types of errors. https://www.confluent.io/blog/error-handling-patterns-in-kafka/ gives a pretty good summary of some of the available options you may wish to pursue. Eventually we may add some of these to the library but there are no immediate plans beyond the logging and replaying work mentioned above.

Events can be replayed with edx-event-bus-redis as well:
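A sketch of such an invocation; the last_read_msg_id value is the Redis stream ID to resume after, and all names here are illustrative and should be checked against the edx-event-bus-redis documentation:

```shell
# Illustrative: re-read the stream starting after the given Redis message ID.
python manage.py consume_events -t my.event.happened -g course_discovery_stage \
    --extra '{"last_read_msg_id": "1679676448892-0"}'
```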

Event Bus Settings

You can find the docs for all EVENT_BUS_ settings using the following:

Special Kafka Configurations

At the time of writing, the Open edX Kafka implementation of the event bus mostly uses the default settings on producers and consumers, and our Confluent Kafka instance is likewise mostly set up with the defaults, but there are many configurations you may want to consider for your use case. None of these are yet supported; they are written here as possible avenues for future work, depending on need.

Producer

All the configurations are available here: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#ak-producer-configurations-for-cp . Of particular interest:

linger.ms - How long the producer will wait after receiving an event to actually produce it (default is 0, meaning we produce as soon as we receive). Lower values decrease latency and decrease the chance of message loss due to server shutdown; higher values allow potentially more efficient batching. See also batch.size, especially if you choose to set linger.ms above zero to take advantage of batching.

The defaults of acks=all, enable.idempotence=true, and retries > 0 all help promote in-order, exactly-once delivery to the broker.
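These options are not currently exposed through the Open edX event bus libraries, but if you were tuning a raw confluent_kafka-style producer config, it might look like the following (the values are illustrative, and the last three entries restate the defaults described above):

```python
# Illustrative confluent_kafka-style producer tuning; not currently
# configurable through the Open edX event bus libraries.
producer_config = {
    "linger.ms": 50,             # wait up to 50 ms to accumulate a batch
    "batch.size": 32768,         # max bytes per batch
    "acks": "all",               # wait for all in-sync replicas (default)
    "enable.idempotence": True,  # avoid duplicates on retry (default)
}
print(producer_config["linger.ms"])  # 50
```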

Consumer

All the configurations are available here: https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html . Of particular interest:

enable.auto.commit - This is explicitly disabled (set to False) in the current implementation.

When set to True, a background process will periodically commit the last processed offset. This means that if a server shutdown occurs between commits, the consumer may reprocess events that occurred after the last commit when it starts back up (this is how at-least-once delivery works). However, it may also cause some events to be lost: those which had been retrieved for processing but had not yet been processed.

When False, the caller is responsible for calling consumer.commit() to commit offsets after processing the message.
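The trade-off can be illustrated with a stdlib-only simulation of commit-after-process semantics: a crash between processing and committing causes reprocessing on restart, but never loss. The log contents, offsets, and crash point are all made up for the illustration:

```python
# Simulation of commit-after-process (at-least-once) semantics.
log = ["e0", "e1", "e2"]   # events in a partition
committed = 0              # next offset the consumer group will read from
processed = []

def run(crash_before_commit_at=None):
    """Consume from the last committed offset; optionally crash before a commit."""
    global committed
    offset = committed
    while offset < len(log):
        processed.append(log[offset])        # process first...
        if offset == crash_before_commit_at:
            return                           # ...crash before committing
        committed = offset + 1               # ...then commit
        offset += 1

run(crash_before_commit_at=1)  # processes e0 and e1, but commits only e0
run()                          # restart: e1 is reprocessed, nothing is lost
print(processed)  # ['e0', 'e1', 'e1', 'e2']
```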

Broker

We currently do not set any of these configurations in code, only on our Confluent Kafka instance. All broker configurations are here: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html# . Of particular interest:

auto.create.topics.enable - When True, new topics will be created automatically when an event is produced to them. We recommend setting this to False for stage/production environments, for better regulation of topics.

Observability

See “Error Handling” for details on error logging.

Other observability options:

  • Kafka itself has a number of metrics available.

  • New Relic has several capabilities:

    • A number of automatic hooks for metrics for producing and consuming messages.

      • If using New Relic, ensure “Distributed Tracing” is used for all producing and consuming New Relic apps, in order to provide consumer lag details.

    • Custom attributes are detailed with custom_attribute_name annotations.

Monitoring

You’ll likely want alerting for the following:

  • No event production/consumption for some time interval

  • Error rates in producer/consumer

  • Consumer processing time

  • Consumer lag (lag: how far the consumer is behind, in number of events)

    • This is preferable over consumer latency, which is the time elapsed between an event’s production and consumption. Latency can be artificially inflated by the previous event having a long processing time even when lag is very small (creating false positives), and also can only be measured once an event finally arrives (making the metric prone to false negatives).

Open Questions

Some issues we have not yet addressed include:

Private Organization Links