Transactional Outbox Pattern

Ioannis Ioannou
June 22, 2021 · 6 min

In a microservices architecture, it is very common for a service, as part of its business logic, to update its own database and also publish related events that other services consume. For example, a reporting service may be interested in all create, update, or delete events from an employee service.

Atomicity Issue

Modifying the database and forwarding domain events to other services must happen reliably; otherwise you risk either updating the database without notifying other services, or notifying other services without updating the database. For example, consider the following method, which creates an employee in the database and then publishes a domain event.

@Transactional
public Employee create(Employee employee) {
    Employee savedEmployee = employeeRepository.save(employee);
    applicationEventPublisher.publishEvent(employeeMapper.toEmployeeCreatedDomainEvent(savedEmployee));
    return savedEmployee;
}

We then consume this domain event and send it to another service.

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleEmployeeCreatedEvent(EmployeeCreated event) {
    reportingClient.send(event);
}

This code will work perfectly fine most of the time. However, it is possible that after the transaction is committed, the reporting service is unavailable while we try to send the event, so it never receives the message. Even with retry logic, the employee service may crash after committing the transaction but before sending the message.

Failed to send event after transaction committed

You may be tempted to change the code to send the message as part of the transaction.

@EventListener
public void handleEmployeeCreatedEvent(EmployeeCreated event) {
    reportingClient.send(event);
}

An exception while sending the event will roll back the transaction. However, it is possible that the event is sent successfully but the transaction fails to commit, for example because the database returns an error or the service crashes. It is also bad practice to send an HTTP request as part of a transaction: it provides no atomicity guarantees, and it keeps the database connection open, which may prevent other processes or threads from obtaining a connection.

Failed to commit transaction

Transactional Outbox Pattern

The transactional outbox pattern avoids the atomicity issue mentioned above by inserting the domain event into an outbox table as part of the same transaction that inserts, updates, or deletes database entities. The atomicity property of ACID transactions guarantees that either both the entity change and the outbox event are written, or neither is. A background process then reads the events from the outbox table and publishes them to a message broker, from which multiple consumer services can read the published domain events.

Transactional Outbox Pattern
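
To make the atomicity guarantee concrete, here is a minimal plain-JDBC sketch. The table and column names are illustrative, not taken from the implementation discussed later; the point is only that both inserts belong to one database transaction.

public void createEmployee(DataSource dataSource, String firstName) throws SQLException {
    UUID id = UUID.randomUUID();
    try (Connection connection = dataSource.getConnection()) {
        connection.setAutoCommit(false);
        try (PreparedStatement insertEmployee = connection.prepareStatement(
                     "INSERT INTO employee (id, first_name) VALUES (?, ?)");
             PreparedStatement insertOutbox = connection.prepareStatement(
                     "INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) VALUES (?, ?, ?, ?)")) {
            insertEmployee.setString(1, id.toString());
            insertEmployee.setString(2, firstName);
            insertEmployee.executeUpdate();

            insertOutbox.setString(1, "Employee");
            insertOutbox.setString(2, id.toString());
            insertOutbox.setString(3, "EmployeeCreated");
            insertOutbox.setString(4, "{\"id\":\"" + id + "\",\"firstName\":\"" + firstName + "\"}");
            insertOutbox.executeUpdate();

            connection.commit(); // both rows become visible together
        } catch (SQLException e) {
            connection.rollback(); // neither row is written
            throw e;
        }
    }
}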

There are a few things we need to consider before implementing the transactional outbox pattern.

Order of domain events

If an employee was first created, then updated, and then deleted, we may want other services to receive these events in exactly that order. For example, it may not be acceptable to receive the EmployeeDeleted event before the EmployeeCreated event.

Approach for getting outbox events

The background process can either poll the outbox table or tail the database transaction log to get the outbox events. Polling the database is much simpler, but it consumes resources even when there are no events, and it may be unacceptable if events must be delivered in near real time. Transaction log tailing avoids these issues, but it requires a database-specific solution.

Duplicate events

The background process can introduce duplicate events. For example, when it uses polling, it may publish an event to the message broker and then crash before deleting it from the database. When the process restarts, it will read the same event and send it again. Duplicates are also possible with transaction log tailing, since the process could fail after sending an event but before updating the last processed offset. When it restarts, it resumes reading the transaction log from the last known offset and so re-sends events.
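
If duplicates can reach consumers, a common mitigation is to make the consumer idempotent. Here is a minimal sketch, where ProcessedEvent, processedEventRepository, and handle are hypothetical names, assuming each event carries a unique id stored under a primary-key constraint:

// Idempotent consumer sketch: the processed-events table has a primary key on
// the event id, so an already-processed event is detected and skipped.
@Transactional
public void consume(UUID eventId, DomainEvent event) {
    if (processedEventRepository.existsById(eventId)) {
        return; // duplicate delivery, already handled
    }
    processedEventRepository.save(new ProcessedEvent(eventId));
    handle(event); // business logic runs in the same transaction as the insert
    // If two instances race on the same event, the primary key makes one
    // transaction fail at commit, and the message is redelivered later.
}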

Message Broker

A message broker is particularly useful when there are multiple consumers, since each consumer can read messages at its own pace. Depending on the complexity of your system, though, you may choose not to use one.

Implementation

The solution discussed here is easy to implement, low-cost, works with any SQL database, provides ordering guarantees, and avoids duplicate messages. It works well if you do not need real-time event delivery. If your application has such a requirement, you may need to consider a more sophisticated approach such as Debezium or Eventuate.

The following diagram shows the architecture of the solution.

Architecture diagram

  • Employee Service inserts, updates, or deletes employee entities and inserts the generated domain events into the outbox table in a single transaction.
  • The solution uses Amazon SNS and Amazon SQS to reliably forward events to multiple consumers.
  • Employee Service polls the outbox table and forwards events to an SNS FIFO topic.
  • Events related to the same employee are ordered. Events related to different employees may be consumed out of order.
  • To maintain the order of events, only one Employee Service instance can read rows from the outbox table at a time.
  • SNS/SQS FIFO ensures there is no duplicate message delivery within a five-minute interval.
  • Each consumer service has its own SQS FIFO queue.
  • The SQS FIFO queue subscribes to the employee SNS FIFO topic. A subscription filter is used to subscribe to specific event types.
  • Consumer Service polls its own queue and consumes the domain events.

Let’s discuss some implementation details.

Outbox Entity

@Entity
@Table(name = "outbox")
@TypeDef(typeClass = JsonType.class, defaultForType = JsonNode.class)
// Lombok annotations assumed here; the builder and getters are used in the snippets below.
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Outbox {
    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "outbox_seq")
    private Long id;

    private String aggregateType;

    private String aggregateId;

    private String eventType;

    @Column(columnDefinition = "json")
    private JsonNode payload;
}

In the outbox table we store the domain event, along with a few additional fields that are useful when forwarding it. The id is the primary key; we use a sequence number because we want to preserve the order of events. We also want consumers to receive related events in order, and for this we use the concept of aggregates from Domain-Driven Design: events for the same aggregate root, identified by aggregateId and aggregateType, will be delivered to consumers in order. In other words, all events related to employees with the same id are delivered in order, while events related to employees with different ids may be delivered out of order. We also keep the type of the event, eventType, for example EmployeeCreated; this enables consumers to subscribe to specific event types. Lastly, payload holds the domain event in JSON format.

Domain events

@JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS)
public abstract class DomainEvent {
}

// Lombok annotations assumed here; the builder is used in the mapper below.
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EmployeeCreated extends DomainEvent {
    private UUID id;
    private String firstName;
    private String lastName;
    private String email;
}

In the parent class we use a Jackson annotation that appends the class type to the JSON during serialization. This helps Jackson create the correct subclass during deserialization.
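
As a minimal sketch of the round trip, assuming the Lombok builder from the previous snippet (the "@c" property name is Jackson's default for JsonTypeInfo.Id.MINIMAL_CLASS; the values are illustrative):

public void roundTrip() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();

    EmployeeCreated created = EmployeeCreated.builder()
            .id(UUID.randomUUID())
            .firstName("Jane")
            .lastName("Doe")
            .email("jane.doe@example.com")
            .build();

    // Serialization embeds the concrete type, e.g.
    // {"@c":".EmployeeCreated","id":"...","firstName":"Jane",...}
    String json = objectMapper.writeValueAsString(created);

    // Because the type is embedded, we can deserialize against the abstract
    // parent class and get back an EmployeeCreated instance.
    DomainEvent event = objectMapper.readValue(json, DomainEvent.class);
}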

Save domain event to outbox table

@Transactional
public Employee create(Employee employee) {
    Employee savedEmployee = employeeRepository.save(employee);
    eventPublisher.publishEvent(employeeMapper.toEmployeeCreatedDomainEvent(savedEmployee));
    return savedEmployee;
}

private static final String EMPLOYEE_AGGREGATE_TYPE = Employee.class.getSimpleName();

public EnrichedDomainEvent<EmployeeCreated> toEmployeeCreatedDomainEvent(Employee employee) {
    return EnrichedDomainEvent.<EmployeeCreated>builder()
            .aggregateType(EMPLOYEE_AGGREGATE_TYPE)
            .aggregateId(employee.getId().toString())
            .domainEvent(EmployeeCreated.builder()
                    .id(employee.getId())
                    .firstName(employee.getFirstName())
                    .lastName(employee.getLastName())
                    .email(employee.getEmail())
                    .build())
            .build();
}

@EventListener
public void handleEnrichedDomainEvent(EnrichedDomainEvent<?> event) {
    outboxRepository.save(Outbox.builder()
            .aggregateId(event.getAggregateId())
            .aggregateType(event.getAggregateType())
            .eventType(event.getDomainEventType())
            .payload(objectMapper.convertValue(event.getDomainEvent(), JsonNode.class))
            .build());
}

The domain event is stored in the outbox table as part of the same transaction as the employee.

Polling and forwarding to SNS

@Transactional
@Scheduled(fixedDelayString = "${cdc.polling_ms}")
public void forwardEventsToSNS() {
    // Read a batch of events in insertion order, locking the rows so that no
    // other instance forwards them concurrently.
    List<Outbox> entities = outboxRepository.findAllByOrderByIdAsc(Pageable.ofSize(batchSize)).toList();
    entities.forEach(entity -> amazonSNS.publish(new PublishRequest()
            .withTopicArn(snsTopic)
            .withMessage(entity.getPayload().toString())
            // Events with the same message group id are delivered in order.
            .withMessageGroupId(String.format("%s-%s", entity.getAggregateType(), entity.getAggregateId()))
            .withMessageAttributes(Map.of("eventType", new MessageAttributeValue()
                    .withDataType("String").withStringValue(entity.getEventType())))));
    // Delete the forwarded events; the delete commits with the surrounding transaction.
    outboxRepository.deleteAllInBatch(entities);
}

@Repository
public interface OutboxRepository extends JpaRepository<Outbox, Long> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Page<Outbox> findAllByOrderByIdAsc(Pageable pageable);
}

We first read a batch of events from the outbox table, then forward them to the SNS FIFO topic, and finally delete them.

We want to preserve the insertion order, so we sort the unprocessed events by the sequence id. We also lock the selected rows to prevent other Employee Service instances from reading rows from the outbox table and causing out-of-order events. If we don't want other instances to wait for the lock, we can use a NOWAIT hint, as sketched below; those instances will simply retry on their next scheduled poll.
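
A minimal sketch of that variant, assuming Hibernate as the JPA provider: a lock timeout hint of 0 is typically translated to NOWAIT on databases that support it (for example Oracle and PostgreSQL) and may be ignored elsewhere.

@Repository
public interface OutboxRepository extends JpaRepository<Outbox, Long> {

    // A lock timeout of 0 asks the database not to wait for locked rows.
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @QueryHints(@QueryHint(name = "javax.persistence.lock.timeout", value = "0"))
    Page<Outbox> findAllByOrderByIdAsc(Pageable pageable);
}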

Regarding the SNS request, we set a MessageGroupId, which guarantees that messages belonging to the same group are delivered in order; messages with different group ids may be delivered out of order. The group id used here guarantees that events related to the same aggregate root are delivered in order. We also add eventType as a message attribute, which is used when configuring SQS subscriptions to receive only specific event types. For example, one consumer may be interested only in EmployeeCreated and EmployeeUpdated events, while another is only interested in EmployeeDeleted events. We could also set a MessageDeduplicationId: any message published with the same id within the five-minute deduplication interval is accepted but not delivered. Alternatively, content-based deduplication can be enabled at the SNS topic level, which treats events with the same content as duplicates. That is the option chosen here.
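
As a sketch of how such a subscription filter could be created with the same (v1) AWS SDK used above, with placeholder ARNs:

// Subscribe an SQS FIFO queue to the SNS FIFO topic, then attach a filter
// policy so the queue only receives the selected event types.
String subscriptionArn = amazonSNS.subscribe(new SubscribeRequest()
        .withTopicArn("arn:aws:sns:eu-west-1:123456789012:employee-events.fifo")
        .withProtocol("sqs")
        .withEndpoint("arn:aws:sqs:eu-west-1:123456789012:reporting-queue.fifo"))
        .getSubscriptionArn();

amazonSNS.setSubscriptionAttributes(new SetSubscriptionAttributesRequest(
        subscriptionArn,
        "FilterPolicy",
        "{\"eventType\": [\"EmployeeCreated\", \"EmployeeUpdated\"]}"));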

Consumer receiving message

@SqsListener(value = "${input_queue}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveQueueMessages(@NotificationMessage DomainEvent domainEvent) {
    logger.info("Received domain event {}", domainEvent);
}

A consumer service polls its own queue and receives domain events.

Amazon SNS and SQS

To reliably forward events to multiple consumers we use SNS and SQS. The Employee Service forwards all events to an SNS topic. If another service is interested in employee events, it subscribes its SQS queue to the employee SNS topic. Subscriptions can have filters, so a consumer can receive only a subset of the published messages; we use eventType to subscribe to specific events. We also need ordering guarantees, so we use an SNS FIFO topic and SQS FIFO queues, which also remove duplicates within a five-minute interval. You can learn more about SNS FIFO topics and SQS FIFO queues in the AWS documentation.

Source Code

You can find the full working example in my GitHub repository.

Summary

In this post, I explained why the transactional outbox pattern is needed when publishing domain events, and I presented an example implementation.


Tags

#microservices #java #aws #sql
Ioannis Ioannou © 2021