Asynchronous Task Processing Architecture V2

Collaborators: Feanil Patel, Renzo Lucioni, ClintonB, Jim Abramson
Comment deadline: September 8, 2015

Context

The E-Commerce team is planning to implement Asynchronous Order Fulfillment to achieve a more resilient integration between Otto and the LMS. In discussions about this with DevOps, we have recognized an opportunity to improve the way we build, deploy, and execute asynchronous tasks for our IDAs.

This proposal does not recommend a significant departure from using the Celery/Rabbit stack already in wide use at edX, but it does introduce a new deployment/operation model for asynchronous operations (i.e., Celery tasks) and proposes some specific guiding principles and constraints intended to facilitate a sane, scalable, and extensible approach to message-driven application services.

Goals

Threefold:

  1. Implement asynchronous fulfillment for ecommerce order processing (order fulfillment and revocation).
  2. Address some issues that have made deploying and operating background workers difficult in the past.
  3. Establish a pattern that can be reused for messaging-oriented solutions to future ecommerce use cases, as well as similar use cases for other services in Open edX.

Note that the use case driving this effort consists of a large number of small, frequently-executed tasks. This is in contrast to the long-running, less frequently-executed tasks, such as data exports and bulk operations, which constitute the typical profile of work for which we've implemented async processing in the past. In general, the patterns should be applicable in either case.

Proposal

Overview


Figure: E-Commerce Order Fulfillment interactions, using messaging.

A (red): A user (the "buyer") completes checkout, and an order is created in the ecommerce system. Once the order is created (opened) a message is emitted to a fulfillment queue indicating the need for handling.

B (green): A worker in a separate process reads the message from the queue, inspects the state of the order, and dispatches tasks to fulfill each line of the order as needed. When a line is fulfilled, the task will update its status using the ecommerce system's Orders API.

C (red): The user visits their dashboard page, and sees their new courses / items. NOTE: it is possible for C to happen before B finishes!

Messaging Infrastructure

RabbitMQ is recommended:

  • It is already part of the OpenEdX stack.
  • It is widely deployed, actively supported, and proven at scale.
  • Rabbit's message protocol, AMQP, supports a wide variety of messaging styles and delivery guarantees.
  • Tradeoff: vendor protocol implementations and interoperability. RabbitMQ implements version 0.9.1 of the AMQP protocol; as far as I'm aware, the only other mature OSS implementation is Apache Qpid, which may have some gaps/inconsistencies relative to Rabbit's implementation (cf. https://www.rabbitmq.com/interoperability.html). Qpid (and a few other vendors) support protocol version 1.0, whose development has been fraught with politics and disagreement (see: this blog post by Pieter Hintjens of ZeroMQ). Because the two versions differ in the scope of what they define, there is no direct upgrade path. At present, RabbitMQ supports protocol v1.0 only via a plugin that is deemed "experimental".
    • As a result of the above, we should expect to be committed to RabbitMQ's 0.9.1 protocol implementation for a long time, until we see the AMQP community converge around newer protocols / implementations, or until the platform moves in the direction of a substantially different messaging platform.

Python Messaging Library

Celery is recommended:

  • This library is also already in use within edx-platform.
  • This library contains both RabbitMQ client drivers and very developer-friendly APIs for composing messaging-based workflows.
  • This library makes it relatively easy to run asynchronous tasks synchronously via configuration switches, which greatly simplifies work in development and testing environments.
  • Tradeoff: pub/sub approach to messaging is not overtly supported, though it can be simulated to an extent (see below)
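The third bullet can be sketched as a development settings fragment. The setting names below are Celery 3.x settings; the file path is hypothetical:

```python
# settings/devstack.py (hypothetical path): run Celery tasks synchronously,
# in-process, so development and test environments need no broker or workers.
CELERY_ALWAYS_EAGER = True

# Re-raise task exceptions in the calling process so failures are visible
# rather than silently swallowed by the (absent) worker.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
```

Flipping these switches off in production settings restores normal broker-backed dispatch with no changes to task code.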

Deployment Model

One of the main changes introduced by this proposal is the way we deploy and host worker processes in OpenEdX.

Worker Application

Download example code: worker.tar.gz

A number of long-running processes are implemented in edx-platform as Celery tasks. Presently, these tasks are deployed within standalone worker processes: in addition to running edx-platform as a WSGI process on web servers, we also run the entire codebase on separate hosts under a Celery process. This model does not scale well and makes it easy to fall into Celery anti-patterns, in particular the sharing of Django models between application and task code. As we decompose edx-platform into an increasing number of smaller services, we want to avoid the complexity introduced by models shared in this way, by separating tasks from the rest of the application they service.

We propose a per-IDA worker application implemented as follows:

Task Distribution

Applications may house a tasks module in a worker package. During local development, these applications run their tasks in-process. In production, workers install the package, listed in a requirements file, and import tasks from the installed tasks module.
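As a sketch (all names hypothetical), the shared package and its deployment might look like this; the webapp imports nothing from worker code at runtime:

```python
# Layout of the shared tasks package (hypothetical):
#
#   ecommerce/
#       worker/
#           __init__.py
#           tasks.py        # task definitions; no Django model imports
#
# Worker hosts list the package in a requirements file, install it, and start
# Celery against the installed tasks module, e.g.:
#
#   celery worker --app=ecommerce.worker --loglevel=info
#
# In local development, the webapp runs the same tasks in-process via
# Celery's eager mode, so no separate worker process is required.
```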

Decoupled, Independently-Versioned Tasks

  • Tasks interact with services via versioned API calls.
  • Tasks do NOT reference Django models, instead exchanging data with APIs using coarse-grained data payloads (e.g., JSON).
  • Task interfaces should be at the same grain as the resources exposed by the relevant service APIs, using only immutable arguments for parameters.

Adhering to these three constraints allows us to update the webapp and worker applications independently, insulating each from the other's version changes. To illustrate how this works, assume the following example pseudo-implementations:

ecommerce.ecommerce.extensions.order.signals
"""
For this example, assume the application exposes the following API endpoints to workers:

GET /api/v2/orders/:order_id
Exposes a detailed view of an order, consumed by worker tasks to determine what needs processing.

PUT /api/v2/orders/:order_id/lines/:line_id
Allows a worker to update the fulfillment state of an individual line (completed or failed).
"""
from django.db.models.signals import post_save
from django.dispatch import receiver

from ecommerce.extensions.order.models import Order

# `celery` is the application's Celery instance, initialized elsewhere.


@receiver(post_save, sender=Order)
def order_post_save(sender, **kwargs):
    # Emit a message via Celery, addressing the task by name with send_task()
    # rather than importing it: the webapp never imports worker code.
    celery.send_task('process_order_v0', kwargs={'order_id': kwargs['instance'].id})
ecommerce.worker.tasks
from celery import shared_task

# `ecommerce_api` is a stand-in for a versioned REST client (e.g., a
# slumber-style wrapper) configured with the ecommerce service's URL and
# the worker's credentials.


@shared_task(name='process_order_v0')
def process_order(order_id):
    # Fetch the resource via the versioned Orders API; no Django models here.
    order = ecommerce_api('v2').orders.get(order_id)
    # Do stuff to the order based on its current state.
    for line in order['lines']:
        # Process the line (e.g., by calling the Enrollment API), then report
        # the outcome back via the Orders API.
        ecommerce_api('v2').orders(order_id).lines(line['id']).put({'status': 'completed'})


Example 1: Web App (message publisher / triggerer of tasks) is updated.

Assume a v3 API is added alongside the example v2 implementation.  Provided that v2 remains backwards-compatible, the task will continue to consume from the v2 API without issues.  Because the v2 order interface is still supported, the task does not need to be modified, unless and until the v2 API is taken offline.

Example 2: Worker (message consumer / executor of tasks) is updated.

Assume we'd like to change the internal logic of `process_order` - as long as its interface does not change, the updated workers should immediately be able to pick up existing messages in the queue and process them with the updated logic.

Task Versioning

In the event of a task's interface changing, different versions of the same task can be distinguished using task names. For example, the function name of a fulfillment task might be fulfill_product, but its v0 and v1 implementations could be distinguished by task name: fulfill_product_v0 as opposed to fulfill_product_v1. The application then calls tasks by name using Celery's send_task(). Incrementing the version on a task should only be necessary when circumstances dictate that the function/task signature be changed. These situations should be rare, but when they inevitably do arise, we have a way to deal with them. We shouldn't need to increment a task's version when changing the version of an API consumed by the task, for reasons described above.
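The pattern can be sketched without a broker. Below, a plain dict stands in for Celery's task registry, and a local `send_task` dispatches by name exactly as the webapp would via `celery.send_task()`; all task names are hypothetical:

```python
# A broker-free sketch of name-based task versioning. A plain dict stands in
# for Celery's task registry; the task names are hypothetical.
TASKS = {}


def task(name):
    """Register a function under an explicit, versioned task name."""
    def register(fn):
        TASKS[name] = fn
        return fn
    return register


@task('fulfill_product_v0')
def fulfill_product(order_id):
    # Original interface.
    return ('v0', order_id)


@task('fulfill_product_v1')
def fulfill_product_v1(order_id, line_id):
    # Changed signature -> new task name, registered alongside v0 so that
    # in-flight v0 messages can still be consumed during a deploy.
    return ('v1', order_id, line_id)


def send_task(name, kwargs):
    # Analogous to celery.send_task(name, kwargs=...): the caller selects a
    # version by name and never imports the task function.
    return TASKS[name](**kwargs)


result = send_task('fulfill_product_v1', kwargs={'order_id': 42, 'line_id': 7})
```

Because both versions stay registered, old messages drain from the queue under v0 while new publishers emit v1, and v0 can be deleted once the queue is clear.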


Performance

(This discussion leaves aside the performance impacts of migrating any particular synchronous platform feature to work asynchronously - these are complex and difficult to generalize as each workflow is different.)

The performance of individual tasks will vary entirely based on what they, and the APIs upon which they depend, are executing. One risk in particular is blocking a queue of small, high-frequency tasks behind one or a few heavier, long-running ones.  This implies a need to classify those tasks into worker pools by their "size" so that we can progressively fine-tune the provisioning for each category of task.  (This further implies that we provide some metadata associated with tasks that can be used to automate that classification, although that may not necessarily need to be developer-defined.) Teams managing their IDAs' worker pools will be expected to divide their tasks across queues as necessary to avoid starving tasks and ensure that tasks are prioritized appropriately.