Outbox as in that folder in my email client? No, not exactly but there are some similarities!
The term outbox describes a pattern that allows dependent components or services to perform read-your-own-write semantics while concurrently providing a reliable, eventually consistent view of those writes across component or service boundaries.
You can read more about the Outbox pattern and how it applies to microservices in our blog post, Reliable Microservices Data Exchange With the Outbox Pattern.
So what exactly is an Outbox Event Router?
In Debezium version 0.9.3.Final, we introduced a ready-to-use Single Message Transform (SMT) that builds on the Outbox pattern to propagate data change events using Debezium and Kafka. Please see the documentation for details on how to use this transformation.
Going Supersonic with Quarkus!
Quarkus is a Kubernetes Native Java framework that is tailored for GraalVM and HotSpot using the best-of-breed Java technologies and standards. Quarkus aims to offer developers a unified reactive and imperative programming model to address a wide range of application architectures.
So what does all this mean exactly in layman's terms?
In short, the Debezium community can now leverage the Outbox pattern in a Quarkus-based application using a ready-to-use extension that works in parallel with your Debezium connector to emit change data events. The Debezium Outbox extension for Quarkus can be used in both JVM and native image modes in Quarkus.
How to get it?
Currently the dependency must be manually added to your Quarkus application's pom.xml as shown below. There are plans to make this extension available in the Quarkus extension catalogue as well as via Quarkus' Maven plugin in a future release.

    <dependency>
        <groupId>io.debezium.quarkus</groupId>
        <artifactId>debezium-quarkus-outbox</artifactId>
        <version>1.1.0.Alpha1</version>
    </dependency>

At the time of this blog post, the extension was released as 1.1.0.Alpha1.
Using the extension
The Debezium Outbox extension uses the Observer pattern to monitor when the user application emits an object that implements the io.debezium.outbox.quarkus.ExportedEvent interface. This allows the Quarkus application behavior to be completely decoupled from that of the extension.
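To make the decoupling concrete, here is a minimal plain-Java sketch of observer-style dispatch. It does not use CDI or the extension; the names ObserverSketch and Channel are hypothetical, and the list merely stands in for the outbox table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for observer-style dispatch. CDI is not used here and
// all names are hypothetical; this is not the extension's implementation.
final class ObserverSketch {

    // Minimal synchronous channel: fire() calls every registered observer in turn.
    static final class Channel<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void observe(Consumer<T> observer) {
            observers.add(observer);
        }

        void fire(T event) {
            for (Consumer<T> observer : observers) {
                observer.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        List<String> outbox = new ArrayList<>(); // stands in for the outbox table
        Channel<String> channel = new Channel<>();

        // The "extension" side registers interest without knowing the sender.
        channel.observe(outbox::add);

        // The "application" side fires an event without knowing who listens.
        channel.fire("OrderCreated");

        System.out.println(outbox); // prints [OrderCreated]
    }
}
```

The sender and the observer only share the event type, which is the property the extension relies on.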
Let's walk through a simple example where a service is responsible for storing newly created orders and then emits an event that could be used to notify other interested services that an order has been created.
To get started, we'll first implement OrderCreatedEvent, an implementation of ExportedEvent. This event is used to signal when an Order has been saved by the OrderService.
    import java.time.Instant;

    import com.fasterxml.jackson.databind.JsonNode;

    import io.debezium.outbox.quarkus.ExportedEvent;

    public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {

        private final long orderId;
        private final JsonNode payload;
        private final Instant created;

        public OrderCreatedEvent(Instant createdAt, Order order) {
            this.orderId = order.getId();
            // convertOrderToJsonNode is a helper that is not shown in this post
            this.payload = convertOrderToJsonNode(order);
            this.created = createdAt;
        }

        @Override
        public String getAggregateId() {
            return String.valueOf(orderId);
        }

        @Override
        public String getAggregateType() {
            return "Order";
        }

        @Override
        public JsonNode getPayload() {
            return payload;
        }

        @Override
        public String getType() {
            return "OrderCreated";
        }

        @Override
        public Instant getTimestamp() {
            return created;
        }
    }
The ExportedEvent interface is the contract that defines how a Quarkus application is to provide the extension with the data to persist to the outbox database table. This contract exposes several different values discussed below:
Aggregate Id
The aggregate id is used when emitting messages to Kafka as the message key to preserve message order. In this example, the OrderCreatedEvent returns the order identifier.
Aggregate Type
The aggregate type is a string value that is appended to the Kafka topic name and also assists in routing the given message inside the Outbox Event Router SMT. In this example, we use Order, and when using the default configuration of the SMT, messages would be found in the outbox.event.Order topic. Please see route.topic.replacement in the SMT configuration options for more details.
Type
The message type is a string value that is emitted in the Kafka message's envelope. In this example, the value in the message envelope would be OrderCreated.
Timestamp
By default, the Outbox Event Router SMT emits outbox events using the current timestamp when processing records, but this may not be sufficient for every use case. This field allows the source application to specify an Instant that can then be configured through the SMT configuration options to be used as the Kafka message timestamp instead.
Payload
The payload is the message content or value and is what is consumed by consumers of the Kafka topic.
By itself, this OrderCreatedEvent does nothing.
Next we want to implement an application component that is responsible for persisting the order to the database and then emitting the OrderCreatedEvent event. The OrderService class below uses JPA to persist the Order entity and then javax.enterprise.event.Event to notify the outbox extension.
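    import java.time.Instant;

    import javax.enterprise.context.ApplicationScoped;
    import javax.enterprise.event.Event;
    import javax.inject.Inject;
    import javax.persistence.EntityManager;
    import javax.transaction.Transactional;

    import com.fasterxml.jackson.databind.JsonNode;

    import io.debezium.outbox.quarkus.ExportedEvent;

    @ApplicationScoped
    public class OrderService {

        @Inject
        EntityManager entityManager;

        @Inject
        Event<ExportedEvent<String, JsonNode>> event;

        @Transactional
        public Order addOrder(Order order) {
            // Persist the order, then notify the outbox extension
            entityManager.persist(order);
            event.fire(new OrderCreatedEvent(Instant.now(), order));
            return order;
        }
    }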
Before starting the application, certain configuration settings must be specified in application.properties. An example configuration might look like the following, where we specify the database to connect to as well as how the persistence provider, Hibernate, is to operate.

    quarkus.datasource.driver=org.postgresql.Driver
    quarkus.datasource.url=jdbc:postgresql://order-db:5432/orderdb?currentSchema=orders
    quarkus.datasource.username=user
    quarkus.datasource.password=password
    quarkus.hibernate-orm.database.generation=update
    quarkus.hibernate-orm.dialect=org.hibernate.dialect.PostgreSQLDialect
    quarkus.hibernate-orm.log.sql=true
When the application is started with this configuration, the outbox table OutboxEvent will be created in the orders schema of the order-db database with the following layout:
    orderdb=# \d orders.outboxevent
                        Table "orders.outboxevent"
        Column     |            Type             | Collation | Nullable | Default
    ---------------+-----------------------------+-----------+----------+---------
     id            | uuid                        |           | not null |
     aggregatetype | character varying(255)      |           | not null |
     aggregateid   | character varying(255)      |           | not null |
     type          | character varying(255)      |           | not null |
     timestamp     | timestamp without time zone |           | not null |
     payload       | character varying(8000)     |           |          |
    Indexes:
        "outboxevent_pkey" PRIMARY KEY, btree (id)
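As a rough illustration of how the values exposed by an event line up with the columns listed above, the following plain-Java sketch builds a column-to-value map. It is not the extension's code: the class OutboxRowSketch and its toRow method are hypothetical, the payload is simplified to a String, and generating the id with UUID.randomUUID() is an assumption based only on the uuid column type.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified view of an outbox row. The real extension performs
// this mapping itself; the names here are for illustration only.
final class OutboxRowSketch {

    // Build a column-name -> value map in the column order of the table layout.
    static Map<String, Object> toRow(String aggregateType, String aggregateId,
                                     String type, Instant timestamp, String payload) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", UUID.randomUUID()); // assumption: a random UUID per event
        row.put("aggregatetype", aggregateType);
        row.put("aggregateid", aggregateId);
        row.put("type", type);
        row.put("timestamp", timestamp);
        row.put("payload", payload);
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> row = toRow("Order", "1", "OrderCreated",
                Instant.parse("2019-01-31T12:13:01Z"), "{\"id\":1}");
        // prints [id, aggregatetype, aggregateid, type, timestamp, payload]
        System.out.println(row.keySet());
    }
}
```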
Should the table or column names not fit your naming convention, they can be customized with several build-time configuration options. For example, if you wanted the table to be named outbox rather than outboxevent, add the following line to the application.properties file:
quarkus.debezium-outbox.table-name=outbox
If you enable SQL logging or check the row count of the outbox table, you might find it unusual that after an order is saved, a record is inserted into the outbox table and then immediately deleted. This is the default behavior, since rows do not need to be retained for Debezium to pick up the change.
If row retention is required, this can be configured using a run-time configuration option. To enable row retention, add the following configuration to the application.properties file:
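The following toy model may help explain why the immediate delete is harmless. It assumes that changes are captured from an append-only log rather than by querying the table, and that delete entries are skipped when forwarding events. It is plain Java with hypothetical names (InsertThenDeleteSketch, Change) and is not how PostgreSQL or Debezium are implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a table plus an append-only change log, for illustration only.
final class InsertThenDeleteSketch {

    // One entry in the simulated change log.
    static final class Change {
        final String op;      // "INSERT" or "DELETE"
        final String id;
        final String payload; // null for deletes

        Change(String op, String id, String payload) {
            this.op = op;
            this.id = id;
            this.payload = payload;
        }
    }

    private final Map<String, String> table = new LinkedHashMap<>();
    private final List<Change> log = new ArrayList<>();

    void insert(String id, String payload) {
        table.put(id, payload);
        log.add(new Change("INSERT", id, payload));
    }

    void delete(String id) {
        table.remove(id);
        log.add(new Change("DELETE", id, null));
    }

    int rowCount() {
        return table.size();
    }

    // A log reader that forwards inserted payloads and skips deletes.
    List<String> capturedPayloads() {
        List<String> captured = new ArrayList<>();
        for (Change change : log) {
            if (change.op.equals("INSERT")) {
                captured.add(change.payload);
            }
        }
        return captured;
    }

    public static void main(String[] args) {
        InsertThenDeleteSketch db = new InsertThenDeleteSketch();
        db.insert("1", "order-1-created");
        db.delete("1");
        System.out.println(db.rowCount());         // prints 0
        System.out.println(db.capturedPayloads()); // prints [order-1-created]
    }
}
```

The table ends up empty, yet the insert is still visible to anything reading the log.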
quarkus.debezium-outbox.remove-after-insert=false
Setting up the connector
Up to this point we’ve covered how to configure and use the extension in a Quarkus application to save events into the outbox database table. The last step is to configure the Debezium connector to monitor the outbox and emit those records to Kafka.
We’re going to use the following connector configuration:
    {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "order-db",
      "database.port": "5432",
      "database.user": "user",
      "database.password": "password",
      "database.dbname": "orderdb",
      "database.server.name": "dbserver1",
      "schema.whitelist": "orders",
      "table.whitelist": "orders.outboxevent",
      "tombstones.on.delete": "false",
      "transforms": "outbox",
      "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
      "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
      "transforms.outbox.table.field.event.timestamp": "timestamp",
      "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
    }

The vast majority of this is standard Debezium connector configuration; the important part is the last several lines, which begin with transforms. These are configuration options that Kafka Connect uses to configure and call the Outbox Event Router SMT.
This configuration uses a custom route.topic.replacement value, ${routedByValue}.events, so events for the Order aggregate are routed to the Order.events topic instead of the default outbox.event.Order topic. This configuration also specifies table.field.event.timestamp, so that the outbox table's timestamp column is used as the Kafka message timestamp, and table.fields.additional.placement, so that the type column is emitted as a message header named eventType. Please see Outbox Event Router Configuration Options for details on how to configure the SMT.
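To show how the route.topic.replacement value in the connector configuration relates to the resulting topic name, here is a small plain-Java sketch that substitutes the aggregate type into a replacement template. It is not the SMT's actual code: the class TopicRouteSketch and its route method are hypothetical, and the default template outbox.event.${routedByValue} is an assumption that is merely consistent with the outbox.event.Order topic mentioned earlier.

```java
// Illustrative sketch only: mimics how a route.topic.replacement template
// could be expanded. Not the Outbox Event Router's actual implementation.
final class TopicRouteSketch {

    // Placeholder that appears in the replacement template of the configuration.
    static final String PLACEHOLDER = "${routedByValue}";

    // Replace the placeholder with the event's aggregate type (literal replace).
    static String route(String template, String aggregateType) {
        return template.replace(PLACEHOLDER, aggregateType);
    }

    public static void main(String[] args) {
        // Assumed default template, consistent with the outbox.event.Order topic.
        System.out.println(route("outbox.event.${routedByValue}", "Order")); // prints outbox.event.Order
        // Custom template taken from the connector configuration.
        System.out.println(route("${routedByValue}.events", "Order"));       // prints Order.events
    }
}
```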
Once the connector is running, the Order.events topic will be populated with messages from the outbox table. The following JSON example represents an Order which gets saved by the OrderService.

    {
      "customerId": "123",
      "orderDate": "2019-01-31T12:13:01",
      "lineItems": [
        { "item": "Debezium in Action", "quantity": 2, "totalPrice": 39.98 },
        { "item": "Debezium for Dummies", "quantity": 1, "totalPrice": 29.99 }
      ]
    }
When examining the Order.events topic, the event emitted will look like the following:

    {"key":"1","headers":"id=cc74eac7-176b-44e7-8bda-413a5088ca66,eventType=OrderCreated"}
    "{\"id\":1,\"customerId\":123,\"orderDate\":\"2019-01-31T12:13:01\",\"lineItems\":[{\"id\":1,\"item\":\"Debezium in Action\",\"quantity\":2,\"totalPrice\":39.98,\"status\":\"ENTERED\"},{\"id\":2,\"item\":\"Debezium for Dummies\",\"quantity\":1,\"totalPrice\":29.99,\"status\":\"ENTERED\"}]}"
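If you work with the textual header form shown above, for instance when inspecting console output, a small helper can split it into key/value pairs. This is a hypothetical sketch (HeaderStringSketch is not part of Debezium) and it assumes header values contain no commas; a real consumer would read the Kafka record headers directly.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for the printed "key=value,key=value" header form.
final class HeaderStringSketch {

    // Split on commas, then on the first '=' of each pair.
    static Map<String, String> parse(String headers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : headers.split(",")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                result.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> headers =
                parse("id=cc74eac7-176b-44e7-8bda-413a5088ca66,eventType=OrderCreated");
        System.out.println(headers.get("eventType")); // prints OrderCreated
    }
}
```

A consumer could then branch on the eventType value to decide how to handle each message.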
Wrapping up
It is really simple to set up and use the Debezium Outbox extension.
We have a complete example in our examples repository that uses the order service described here as well as a shipment service that consumes the events. For more details on the extension, refer to the Outbox Quarkus Extension documentation.
Future Plans
The current implementation of the Debezium Outbox extension works quite well, but we acknowledge there is still room for improvement. Some of the things we’ve already identified and have plans to include in future iterations of the extension are:
Avro serialization support for event payload
Full outbox table column attribute control, e.g. definition, length, precision, scale, and converters.
Complete outbox table customization using a user-supplied entity class.
Allow varied signatures of ExportedEvent within a single application.
We are currently tracking all future changes to this extension in DBZ-1711. As always we welcome any and all feedback, so feel free to let us know in that issue, on Gitter, or the mailing lists.
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.
Get involved
We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas on how we can improve Debezium, please let us know or log an issue.