In a microservices architecture, a service often needs to update its own database and also publish related events for other services to consume. For example, a reporting service may be interested in every create, update, or delete event emitted by an employee service.
Modifying the database and forwarding domain events to other services must be done reliably; otherwise, you risk either updating the database without notifying other services, or notifying other services without updating the database. For example, consider the following method for creating an employee in the database and then publishing a domain event.
    @Transactional
    public Employee create(Employee employee) {
        Employee savedEmployee = employeeRepository.save(employee);
        applicationEventPublisher.publishEvent(employeeMapper.toEmployeeCreatedDomainEvent(savedEmployee));
        return savedEmployee;
    }
We then consume this domain event and send it to another service.
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void handleEmployeeCreatedEvent(EmployeeCreated event) {
        reportingClient.send(event);
    }
This code works fine most of the time. However, the reporting service may be unavailable when we try to send the event after the transaction commits, in which case it never receives the message. Even if we add retry logic, the employee service may crash after committing the transaction but before sending the message.
You may be tempted to change the code to send the message as part of the transaction.
    @EventListener
    public void handleEmployeeCreatedEvent(EmployeeCreated event) {
        reportingClient.send(event);
    }
An exception while sending the event now rolls back the transaction. However, the event may be sent successfully and the transaction may then fail to commit, for example because the database returns an error or the service crashes. It is also bad practice to send an HTTP request inside a transaction: it provides no atomicity guarantee, and it keeps the database connection busy for the duration of the call, which may prevent other threads or processes from obtaining a connection.
The transactional outbox pattern avoids the atomicity issue described above by inserting the domain event into an outbox table as part of the same transaction that inserts, updates, or deletes database entities. The atomicity property of ACID transactions guarantees that either both the entity change and the event are written, or neither is. A background process then reads the events from the outbox table and publishes them to a message broker, from which multiple consumer services can read the published domain events.
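The core idea can be sketched with plain JDBC, independent of Spring. This is only an illustration under stated assumptions: it uses manual JDBC transaction handling rather than the `@Transactional` approach used later in this post, and the `employee` and `outbox` table layouts, as well as the `OutboxWriteSketch` class, are hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Sketch: write a business row and its outbox event in one JDBC transaction. */
final class OutboxWriteSketch {

    // Hypothetical table and column names, chosen only for this illustration.
    static final String INSERT_EMPLOYEE =
            "INSERT INTO employee (id, email) VALUES (?, ?)";
    static final String INSERT_OUTBOX =
            "INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) "
                    + "VALUES (?, ?, ?, ?)";

    static void createEmployee(Connection conn, String id, String email, String payloadJson)
            throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // both inserts now belong to one transaction
        try {
            try (PreparedStatement ps = conn.prepareStatement(INSERT_EMPLOYEE)) {
                ps.setString(1, id);
                ps.setString(2, email);
                ps.executeUpdate();
            }
            try (PreparedStatement ps = conn.prepareStatement(INSERT_OUTBOX)) {
                ps.setString(1, "Employee");
                ps.setString(2, id);
                ps.setString(3, "EmployeeCreated");
                ps.setString(4, payloadJson);
                ps.executeUpdate();
            }
            conn.commit(); // the entity and the event become visible together
        } catch (SQLException e) {
            conn.rollback(); // neither the entity nor the event is persisted
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}
```

If the second insert fails, the rollback also discards the employee row, so there is never an entity without its event or an event without its entity.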
There are a few things we need to consider before implementing the transactional outbox pattern.
If an employee was first created, then updated, and then deleted, we may want other services to receive these events in exactly the same order. For example, receiving the EmployeeDeleted event before the EmployeeCreated event may not be acceptable.
The background process can either poll the outbox table or tail the transaction log to get the outbox events. Polling the database is much simpler, but it consumes resources even when there are no events to forward, and its latency may be unacceptable if events need to be delivered in real time. Transaction log tailing avoids these issues, but it requires a database-specific solution.
The background process could introduce duplicate events. For example, when the background process uses polling, it may publish the event to the message broker and then crash before deleting it from the database. When the process restarts, it reads the same event and sends it again. Duplicates are also possible with transaction log tailing, because the process could fail after sending the event but before updating the last processed offset. When the process restarts, it resumes reading the transaction log from the last recorded offset and therefore sends the event again.
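One common way to tolerate such duplicates is to make the consumer idempotent. The following is a minimal sketch, not the approach used later in this post (which relies on broker-side deduplication). It assumes every event carries a unique id, such as the outbox row id. The `IdempotentConsumer` class is hypothetical, and a real service would keep the processed ids in durable storage rather than in memory.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch: a consumer that ignores events it has already processed. */
final class IdempotentConsumer {

    // In-memory for illustration only; a real consumer would persist these ids.
    private final Set<Long> processedIds = new HashSet<>();
    private final List<String> applied = new ArrayList<>();

    /** Applies the event once; returns false when the id was seen before. */
    boolean handle(long eventId, String payload) {
        if (!processedIds.add(eventId)) {
            return false; // duplicate delivery, e.g. after a relay restart
        }
        applied.add(payload);
        return true;
    }

    /** Payloads that were actually applied, in arrival order. */
    List<String> applied() {
        return new ArrayList<>(applied);
    }
}
```

With this in place, a relay that crashes after publishing but before deleting the outbox row can safely resend the event after restarting.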
A message broker is particularly useful if there are multiple consumers. Each consumer can read messages at its own pace. Depending on the complexity of your system, though, you may choose not to use a message broker.
The solution that we will discuss here is easy to implement, is low-cost, works with any SQL database, provides ordering guarantees, and filters out duplicate messages within the broker's deduplication window. It works well if you do not need real-time event delivery. If your application has such a requirement, you may need to consider a more sophisticated approach like Debezium or Eventuate.
The following diagram shows the architecture of the solution.
Let’s discuss some implementation details.
    @Entity
    @Table(name = "outbox")
    @TypeDef(typeClass = JsonType.class, defaultForType = JsonNode.class)
    public class Outbox {

        @Id
        @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "outbox_seq")
        private Long id;

        private String aggregateType;
        private String aggregateId;
        private String eventType;

        @Column(columnDefinition = "json")
        private JsonNode payload;
    }
In the outbox table we store the domain event along with a few additional fields that are useful when forwarding it. Specifically, id is the primary key of the table. We use a sequence number because we want to preserve the order of events. We also want consumers to receive related events in order. For this purpose we use the concept of aggregates from Domain-Driven Design. Events for aggregate roots with the same id and type (aggregateId and aggregateType) are delivered to consumers in order. In other words, all events related to the employee with a given id are delivered in order, but events related to employees with different ids may be delivered out of order. We also keep the type of the event, eventType (for example, EmployeeCreated), which lets consumers subscribe to specific event types. Lastly, payload holds the domain event in JSON format.
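To make the per-aggregate ordering concrete, here is a small self-contained sketch. It sorts rows by their sequence id and groups them under a key built from the aggregate type and aggregate id. The `AggregateGrouping` and `OutboxRow` classes are hypothetical stand-ins and not the JPA entity above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: shows which outbox events must keep their relative order. */
final class AggregateGrouping {

    /** Minimal stand-in for an outbox row (hypothetical). */
    static final class OutboxRow {
        final long id;
        final String aggregateType;
        final String aggregateId;
        final String eventType;

        OutboxRow(long id, String aggregateType, String aggregateId, String eventType) {
            this.id = id;
            this.aggregateType = aggregateType;
            this.aggregateId = aggregateId;
            this.eventType = eventType;
        }
    }

    /** Events that share this key belong to the same aggregate root. */
    static String groupKey(OutboxRow row) {
        return String.format("%s-%s", row.aggregateType, row.aggregateId);
    }

    /** Returns the event types per aggregate, ordered by sequence id. */
    static Map<String, List<String>> eventTypesPerGroup(List<OutboxRow> rows) {
        List<OutboxRow> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparingLong((OutboxRow r) -> r.id));
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (OutboxRow row : sorted) {
            result.computeIfAbsent(groupKey(row), k -> new ArrayList<>()).add(row.eventType);
        }
        return result;
    }
}
```

Within one key the order is fixed by the sequence id; across keys no order is implied, which is what allows events for different employees to be delivered independently.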
    @JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS)
    public abstract class DomainEvent {
    }

    public class EmployeeCreated extends DomainEvent {
        private UUID id;
        private String firstName;
        private String lastName;
        private String email;
    }
In the parent class we use a Jackson annotation that adds the class type to the JSON during serialization. This allows the correct object type to be created during deserialization.
    @Transactional
    public Employee create(Employee employee) {
        Employee savedEmployee = employeeRepository.save(employee);
        eventPublisher.publishEvent(employeeMapper.toEmployeeCreatedDomainEvent(savedEmployee));
        return savedEmployee;
    }

    private static final String EMPLOYEE_AGGREGATE_TYPE = Employee.class.getSimpleName();

    public EnrichedDomainEvent<EmployeeCreated> toEmployeeCreatedDomainEvent(Employee employee) {
        return EnrichedDomainEvent.<EmployeeCreated>builder()
                .aggregateType(EMPLOYEE_AGGREGATE_TYPE)
                .aggregateId(employee.getId().toString())
                .domainEvent(EmployeeCreated.builder()
                        .id(employee.getId())
                        .firstName(employee.getFirstName())
                        .lastName(employee.getLastName())
                        .email(employee.getEmail())
                        .build())
                .build();
    }

    @EventListener
    public void handleEnrichedDomainEvent(EnrichedDomainEvent<?> event) {
        outboxRepository.save(Outbox.builder()
                .aggregateId(event.getAggregateId())
                .aggregateType(event.getAggregateType())
                .eventType(event.getDomainEventType())
                .payload(objectMapper.convertValue(event.getDomainEvent(), JsonNode.class))
                .build());
    }
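The domain event is stored in the outbox table as part of the same transaction as the employee. This works because a plain @EventListener is, by default, invoked synchronously on the publishing thread, so it runs inside the transaction opened by the create method.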
    @Transactional
    @Scheduled(fixedDelayString = "${cdc.polling_ms}")
    public void forwardEventsToSNS() {
        List<Outbox> entities = outboxRepository.findAllByOrderByIdAsc(Pageable.ofSize(batchSize)).toList();
        entities.forEach(entity -> amazonSNS.publish(new PublishRequest()
                .withTopicArn(snsTopic)
                .withMessage(entity.getPayload().toString())
                .withMessageGroupId(String.format("%s-%s", entity.getAggregateType(), entity.getAggregateId()))
                .withMessageAttributes(Map.of("eventType", new MessageAttributeValue()
                        .withDataType("String")
                        .withStringValue(entity.getEventType())))
        ));
        outboxRepository.deleteAllInBatch(entities);
    }

    @Repository
    public interface OutboxRepository extends JpaRepository<Outbox, Long> {

        @Lock(LockModeType.PESSIMISTIC_WRITE)
        Page<Outbox> findAllByOrderByIdAsc(Pageable pageable);
    }
We first read a batch of messages from the outbox table, then forward them to an SNS FIFO topic, and finally delete them.
We want to preserve the insertion order, so we sort the unprocessed events by the sequence id. We also lock the selected rows to prevent other instances of the employee service from reading the same rows from the outbox table and thus causing out-of-order events. If we don’t want other instances to wait for the lock, we can use a NO_WAIT hint; those instances will then retry during the next scheduled poll.
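The read, publish, delete cycle can be sketched without a database. In this hypothetical in-memory version, a sorted map stands in for the outbox table, the `synchronized` keyword stands in for the row lock, and the publisher is any callback. None of these names come from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Sketch: an in-memory outbox relay that forwards events in id order. */
final class InMemoryOutboxRelay {

    private final NavigableMap<Long, String> outbox = new TreeMap<>();
    private long nextId = 1;

    /** Stores a payload under the next sequence id and returns that id. */
    synchronized long append(String payload) {
        long id = nextId++;
        outbox.put(id, payload);
        return id;
    }

    /** Publishes up to batchSize pending events in ascending id order, then deletes them. */
    synchronized int forwardBatch(int batchSize, Consumer<String> publisher) {
        List<Long> batch = new ArrayList<>();
        for (Long id : outbox.keySet()) { // TreeMap iterates in ascending key order
            if (batch.size() == batchSize) {
                break;
            }
            batch.add(id);
        }
        for (Long id : batch) {
            publisher.accept(outbox.get(id));
        }
        // Deleting only after publishing means a crash in between causes a resend, not a loss.
        for (Long id : batch) {
            outbox.remove(id);
        }
        return batch.size();
    }

    synchronized int pending() {
        return outbox.size();
    }
}
```

Because rows are deleted only after they have been published, a failure between the two steps leads to a repeated delivery rather than a lost event.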
In the SNS request, we set MessageGroupId. This guarantees that messages belonging to the same group are delivered in order; messages with different group ids can be delivered out of order. The id used here guarantees that events related to the same aggregate root are delivered in order. We also add eventType as a message attribute. It is used when configuring the SQS subscriptions so that a queue subscribes only to specific event types. For example, one consumer may be interested only in EmployeeCreated and EmployeeUpdated events, and another consumer may be interested only in EmployeeDeleted events.
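The effect of filtering on the eventType attribute can be illustrated with a small in-memory sketch. It does not use the SNS API. In SNS itself the filter is configured on the subscription, and the hypothetical `EventTypeFilter` class below only mimics the outcome for a set of accepted event types.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch: accepts a message only if its eventType attribute is in the allowed set. */
final class EventTypeFilter {

    private final Set<String> acceptedEventTypes;

    EventTypeFilter(Set<String> acceptedEventTypes) {
        this.acceptedEventTypes = new HashSet<>(acceptedEventTypes);
    }

    /** Messages without an eventType attribute are rejected. */
    boolean accepts(Map<String, String> messageAttributes) {
        String eventType = messageAttributes.get("eventType");
        return eventType != null && acceptedEventTypes.contains(eventType);
    }
}
```

A consumer configured with EmployeeCreated and EmployeeUpdated would therefore never see an EmployeeDeleted message in its queue.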
We can also set MessageDeduplicationId. Any message published with the same id within the five-minute deduplication interval is accepted but not delivered. Alternatively, content-based deduplication can be enabled at the SNS topic level, which treats messages with the same content as duplicates. This is the option selected here.
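The following sketch illustrates how content-based deduplication behaves. It derives a deduplication id from a SHA-256 hash of the message body and suppresses repeats within a five-minute window. It is a local simulation under those assumptions, not the SNS implementation, and the `ContentDeduplicator` class is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Sketch: suppresses messages whose body was already delivered within the window. */
final class ContentDeduplicator {

    private static final Duration WINDOW = Duration.ofMinutes(5);

    // Deduplication id -> time of the last delivered message with that id.
    private final Map<String, Instant> lastDelivered = new HashMap<>();

    /** Hex-encoded SHA-256 hash of the body, used as the deduplication id. */
    static String contentHash(String body) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    /** Returns true if the message should be delivered, false if it is a duplicate. */
    boolean shouldDeliver(String body, Instant now) {
        String dedupId = contentHash(body);
        Instant previous = lastDelivered.get(dedupId);
        if (previous != null && Duration.between(previous, now).compareTo(WINDOW) < 0) {
            return false;
        }
        lastDelivered.put(dedupId, now);
        return true;
    }
}
```

One consequence worth keeping in mind is that two genuinely distinct events with byte-identical payloads would also be treated as duplicates if they arrive within the same window.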
    @SqsListener(value = "${input_queue}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void receiveQueueMessages(@NotificationMessage DomainEvent domainEvent) {
        logger.info("Received domain event {}", domainEvent);
    }
A consumer service polls its own queue and receives domain events.
To reliably forward events to multiple consumers, we use SNS and SQS. The employee service forwards all events to an SNS topic. If another service is interested in employee events, it subscribes its SQS queue to the employee SNS topic. Subscriptions can have filters, so a consumer can receive only a subset of the published messages. We use eventType to subscribe to specific events. We also need ordering guarantees, so we use an SNS FIFO topic and SQS FIFO queues, which also remove duplicates within a five-minute interval. You can learn more about SNS FIFO topics here and about SQS FIFO here.
You can find the full working example in my GitHub repository here.
In this post, I explained why the transactional outbox pattern is needed when publishing domain events, and I presented an example implementation.