Outbox, as in that folder in my email client? No, not exactly, but there are some similarities!

The term outbox describes a pattern that allows dependent components or services to perform read-your-own-writes semantics while concurrently providing a reliable, eventually consistent view of those writes across component or service boundaries.
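To make the idea concrete, here is a minimal, framework-free Java sketch of the pattern. It is an illustration only: it assumes in-memory lists in place of a database and a message broker, and the names (`OutboxSketch`, `placeOrder`, `relay`) are hypothetical and not part of Debezium. The business row and the outbox row are written together, so the writing service sees its own write immediately, while other services only see the event once the relay step has run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the outbox pattern (Java 16+, uses a record).
// No Debezium or Quarkus APIs are involved.
final class OutboxSketch {

    record OutboxRow(String aggregateId, String type, String payload) {}

    // Stand-ins for the service's own database tables.
    final List<String> orders = new ArrayList<>();
    final List<OutboxRow> outbox = new ArrayList<>();

    // Stand-in for the topic that other services consume.
    final List<OutboxRow> sharedLog = new ArrayList<>();

    // Both writes happen under one lock, mimicking a single local database transaction.
    synchronized void placeOrder(String orderId) {
        orders.add(orderId);
        outbox.add(new OutboxRow(orderId, "OrderCreated", "{\"id\":\"" + orderId + "\"}"));
    }

    // In a real deployment this step runs asynchronously and is not application code;
    // with Debezium, the connector picks up outbox inserts from the database log.
    synchronized void relay() {
        sharedLog.addAll(outbox);
        outbox.clear();
    }

    public static void main(String[] args) {
        OutboxSketch service = new OutboxSketch();
        service.placeOrder("1");
        System.out.println(service.orders.contains("1")); // true: the writer reads its own write
        System.out.println(service.sharedLog.isEmpty());  // true: not yet visible to others
        service.relay();
        System.out.println(service.sharedLog.size());     // 1: eventually visible to others
    }
}
```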

You can read more about the Outbox pattern and how it applies to microservices in our blog post, Reliable Microservices Data Exchange With the Outbox Pattern.

So what exactly is an Outbox Event Router?

In Debezium version 0.9.3.Final, we introduced a ready-to-use Single Message Transform (SMT) that builds on the Outbox pattern to propagate data change events using Debezium and Kafka. Please see the documentation for details on how to use this transformation.

Going Supersonic with Quarkus!

Quarkus is a Kubernetes Native Java framework that is tailored for GraalVM and HotSpot using best-of-breed Java technologies and standards. Quarkus aims to offer developers a unified reactive and imperative programming model to address a wide range of application architectures.

So what does all this mean exactly in layman's terms?

In short, the Debezium community can now use the Outbox pattern in a Quarkus-based application through a ready-to-use extension that works alongside your Debezium connector to emit change data events. The Debezium Outbox extension for Quarkus can be used in both JVM and native image modes.

How to get it?

Currently, the dependency must be added manually to your Quarkus application's pom.xml, as shown below. There are plans to make this extension available in the Quarkus extension catalogue as well as via Quarkus' Maven plugin in a future release.

```xml
<dependency>
    <groupId>io.debezium.quarkus</groupId>
    <artifactId>debezium-quarkus-outbox</artifactId>
    <version>1.1.0.Alpha1</version>
</dependency>
```

At the time of this blog post, the extension was released as 1.1.0.Alpha1.
A newer version of the extension may be available; see Releases for details.

Using the extension

The Debezium Outbox extension uses the Observer pattern to monitor when the user application emits an object that implements the io.debezium.outbox.quarkus.ExportedEvent interface. This allows the Quarkus application behavior to be completely decoupled from that of the extension.
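The decoupling can be illustrated without Quarkus at all. The sketch below assumes a plain list of listeners in place of CDI, and `SimpleExportedEvent` is a hypothetical, simplified stand-in for `io.debezium.outbox.quarkus.ExportedEvent`. It shows only the observer idea (the application fires, an independent listener reacts); it is not how the extension is implemented.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Framework-free sketch of the observer idea (Java 16+, uses a record).
// In Quarkus, CDI's Event<T>.fire(...) plus an observer method does this wiring.
final class ObserverSketch {

    // Hypothetical, simplified mirror of ExportedEvent; not the real interface.
    interface SimpleExportedEvent {
        String aggregateId();
        String aggregateType();
        String type();
        Instant timestamp();
        String payload();
    }

    // The record's accessor methods implement the interface methods above.
    record OrderCreated(String aggregateId, String aggregateType, String type,
                        Instant timestamp, String payload) implements SimpleExportedEvent {}

    static final class EventBus {
        private final List<Consumer<SimpleExportedEvent>> observers = new ArrayList<>();

        void observe(Consumer<SimpleExportedEvent> observer) {
            observers.add(observer);
        }

        // The application only fires events; it never calls the outbox writer directly.
        void fire(SimpleExportedEvent event) {
            for (Consumer<SimpleExportedEvent> observer : observers) {
                observer.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        EventBus bus = new EventBus();
        List<SimpleExportedEvent> outboxRows = new ArrayList<>();
        bus.observe(outboxRows::add); // the "extension" side: store whatever is fired
        bus.fire(new OrderCreated("1", "Order", "OrderCreated", Instant.now(), "{}"));
        System.out.println(outboxRows.size()); // 1
    }
}
```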

Let's walk through a simple example in which a service stores newly created orders and then emits an event that other interested services can use to learn that an order has been created.

To get started, we'll first implement OrderCreatedEvent, an implementation of ExportedEvent. This event signals that an Order has been saved by the OrderService.

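```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.outbox.quarkus.ExportedEvent;

import java.time.Instant;

public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {

    // findAndRegisterModules() picks up jackson-datatype-jsr310 if it is on the
    // classpath, which is needed if Order has java.time fields (assumption).
    private static final ObjectMapper MAPPER = new ObjectMapper().findAndRegisterModules();

    private final long orderId;
    private final JsonNode payload;
    private final Instant created;

    public OrderCreatedEvent(Instant createdAt, Order order) {
        this.orderId = order.getId();
        this.payload = convertOrderToJsonNode(order);
        this.created = createdAt;
    }

    @Override
    public String getAggregateId() {
        // Becomes the Kafka message key
        return String.valueOf(orderId);
    }

    @Override
    public String getAggregateType() {
        // Used to route the message to a topic
        return "Order";
    }

    @Override
    public JsonNode getPayload() {
        return payload;
    }

    @Override
    public String getType() {
        return "OrderCreated";
    }

    @Override
    public Instant getTimestamp() {
        return created;
    }

    // The helper is not shown in the original; valueToTree is one possible implementation.
    private static JsonNode convertOrderToJsonNode(Order order) {
        return MAPPER.valueToTree(order);
    }
}
```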

The ExportedEvent interface is the contract that defines how a Quarkus application provides the extension with the data to persist to the outbox database table. This contract exposes several values, discussed below:

Aggregate Id

The aggregate id is used as the message key when emitting messages to Kafka, so that all messages for the same aggregate land in the same partition and keep their order. In this example, the OrderCreatedEvent returns the order identifier.

The ExportedEvent interface is parameterized, and its first type parameter lets the application specify the data type returned for the aggregate id. While this example uses a String, the return value can be any persistable object type.
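Ordering matters here because Kafka only guarantees order within a partition, and records that share a key are assigned to the same partition. The following sketch shows that effect. It assumes `String.hashCode()` as a simplified stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key; `KeyPartitioning`, `partitionFor`, and the `OrderShipped` event type are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration of key-based partitioning; this is not Kafka's actual partitioner.
final class KeyPartitioning {

    // Simplified stand-in for murmur2 hashing of the key bytes.
    // Masking the sign bit keeps the result non-negative before the modulo.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        // {aggregateId, eventType}; "OrderShipped" is a hypothetical follow-up event.
        String[][] events = {
            {"1", "OrderCreated"}, {"2", "OrderCreated"}, {"1", "OrderShipped"}
        };
        Map<Integer, List<String>> partitionLog = new LinkedHashMap<>();
        for (String[] event : events) {
            partitionLog.computeIfAbsent(partitionFor(event[0], partitions), p -> new ArrayList<>())
                        .add(event[0] + ":" + event[1]);
        }
        // Both events for order 1 share a partition, in the order they were produced.
        System.out.println(partitionLog);
    }
}
```

Because the key decides the partition, any consumer reading that partition sees `OrderCreated` for order 1 before any later event for the same order.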

Aggregate Type

The aggregate type is a string value that is appended to the Kafka topic name and also helps route the given message inside the Outbox Event Router SMT. In this example, we use Order, and with the default SMT configuration, messages would be found in the outbox.event.Order topic. Please see route.topic.replacement in the SMT configuration options for more details.
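The topic name comes from a regular-expression replacement. The sketch below assumes the SMT's default `route.topic.regex` of `(?<routedByValue>.*)`, which captures the whole aggregate type, and uses Java's own named-group replacement syntax to show how the default `outbox.event.${routedByValue}` template and the custom `${routedByValue}.events` template used in the connector configuration later in this post resolve for `Order`. `TopicRouting` and `topicFor` are hypothetical names, not SMT internals.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of regex-based topic routing; not the SMT's source code.
final class TopicRouting {

    // Assumed to match the SMT's default route.topic.regex: capture the whole value.
    static final Pattern ROUTE_TOPIC_REGEX = Pattern.compile("(?<routedByValue>.*)");

    // Assumed default route.topic.replacement.
    static final String DEFAULT_REPLACEMENT = "outbox.event.${routedByValue}";

    static String topicFor(String aggregateType, String replacement) {
        Matcher matcher = ROUTE_TOPIC_REGEX.matcher(aggregateType);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Aggregate type does not match: " + aggregateType);
        }
        // Java expands ${routedByValue} to the text captured by that named group.
        return matcher.replaceFirst(replacement);
    }

    public static void main(String[] args) {
        System.out.println(topicFor("Order", DEFAULT_REPLACEMENT));        // outbox.event.Order
        System.out.println(topicFor("Order", "${routedByValue}.events"));  // Order.events
    }
}
```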

Type

The message type is a string value that is emitted in the Kafka message's envelope. In this example, the value in the message envelope would be OrderCreated.

Timestamp

By default, the Outbox Event Router SMT emits outbox events using the current timestamp at the time it processes each record, but this may not be sufficient for every use case. This field allows the source application to specify an Instant that can then be configured, through the SMT configuration options, to be used as the Kafka message timestamp instead.
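The choice described above can be sketched as a small helper. It assumes an outbox row represented as a `Map` and a configured column name that is `null` when no timestamp field is set; `EventTimestamp` and `kafkaTimestampMillis` are hypothetical names, not SMT internals. A fixed `Clock` stands in for "now" so the fallback is predictable.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Map;

// Hypothetical sketch (Java 16+, uses pattern matching for instanceof).
final class EventTimestamp {

    // Use the row's timestamp column when one is configured and present;
    // otherwise fall back to processing time.
    static long kafkaTimestampMillis(Map<String, Object> outboxRow, String timestampField, Clock clock) {
        if (timestampField != null && outboxRow.get(timestampField) instanceof Instant eventTime) {
            return eventTime.toEpochMilli();
        }
        return clock.millis();
    }

    public static void main(String[] args) {
        Clock processingClock = Clock.fixed(Instant.parse("2019-01-31T12:15:00Z"), ZoneOffset.UTC);
        Map<String, Object> row = Map.of("timestamp", Instant.parse("2019-01-31T12:13:01Z"));
        // No timestamp field configured: processing time is used.
        System.out.println(Instant.ofEpochMilli(kafkaTimestampMillis(row, null, processingClock)));
        // Field configured: the time the application recorded is used.
        System.out.println(Instant.ofEpochMilli(kafkaTimestampMillis(row, "timestamp", processingClock)));
    }
}
```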

Payload

The payload is the message content, or value, and is what consumers of the Kafka topic read.

The ExportedEvent interface is parameterized, and its second type parameter lets the application specify the data type returned for the payload. While this example uses a JsonNode to store a JSON representation of the Order, the payload can be any persistable object type.

If multiple implementations of ExportedEvent exist in a Quarkus application, they must all use the same signature (the same type arguments). If different signatures are required, the code should be split into separate Quarkus applications, because all ExportedEvent implementations in a given Quarkus application are stored in the same outbox database table. We are currently investigating alternatives that would loosen this restriction in a future release and allow multiple variants within the same application.

On its own, this OrderCreatedEvent does nothing.

Next, we implement an application component that is responsible for persisting the order to the database and then emitting the OrderCreatedEvent. The OrderService class below uses JPA to persist the Order entity and then javax.enterprise.event.Event to notify the outbox extension.

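```java
import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.outbox.quarkus.ExportedEvent;

import java.time.Instant;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;

@ApplicationScoped
public class OrderService {

    @Inject
    EntityManager entityManager;

    // The outbox extension observes events fired through this CDI Event
    @Inject
    Event<ExportedEvent<String, JsonNode>> event;

    // The event is fired inside the @Transactional method, so synchronous
    // observers run within the same transaction as the order insert
    @Transactional
    public Order addOrder(Order order) {
        entityManager.persist(order);
        event.fire(new OrderCreatedEvent(Instant.now(), order));
        return order;
    }
}
```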

Before starting the application, certain configuration settings must be specified in application.properties. An example configuration might look like the following, where we specify the database to connect to as well as how the persistence provider, Hibernate, should operate.

```properties
quarkus.datasource.driver=org.postgresql.Driver
quarkus.datasource.url=jdbc:postgresql://order-db:5432/orderdb?currentSchema=orders
quarkus.datasource.username=user
quarkus.datasource.password=password
quarkus.hibernate-orm.database.generation=update
quarkus.hibernate-orm.dialect=org.hibernate.dialect.PostgreSQLDialect
quarkus.hibernate-orm.log.sql=true
```

By starting the application with this configuration, the outbox table OutboxEvent is created in the orders schema of the order-db database with the following layout:

```text
orderdb=# \d orders.outboxevent
                      Table "orders.outboxevent"
    Column     |            Type             | Collation | Nullable | Default
---------------+-----------------------------+-----------+----------+---------
 id            | uuid                        |           | not null |
 aggregatetype | character varying(255)      |           | not null |
 aggregateid   | character varying(255)      |           | not null |
 type          | character varying(255)      |           | not null |
 timestamp     | timestamp without time zone |           | not null |
 payload       | character varying(8000)     |           |          |
Indexes:
    "outboxevent_pkey" PRIMARY KEY, btree (id)
```

When JsonNode is used as the payload return type, the extension uses a JPA attribute converter to store the contents as a string in the database.

Should the table or column names not fit your naming convention, they can be customized with several build-time configuration options. For example, if you want the table to be named outbox rather than outboxevent, add the following line to the application.properties file:

quarkus.debezium-outbox.table-name=outbox

If you enable SQL logging or check the row count of the outbox table, you might find it unusual that, after the order is saved, a record is inserted into the outbox table and then immediately deleted. This is the default behavior because the rows do not need to be retained for Debezium to pick up the change: the connector captures the insert from the database's transaction log.

If row retention is required, it can be configured using a run-time configuration option. To enable row retention, add the following configuration to the application.properties file:

quarkus.debezium-outbox.remove-after-insert=false
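Why deleting the row right away is harmless can be shown with a small simulation. It assumes an append-only list standing in for the database's transaction log, which is where a log-based connector such as Debezium reads changes, and it models the Outbox Event Router as routing only insert (`"c"`) events. `OutboxRetention` and its members are hypothetical; the `removeAfterInsert` flag merely mirrors the `quarkus.debezium-outbox.remove-after-insert` setting.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulation only, not Debezium code (Java 16+, uses a record).
final class OutboxRetention {

    // op uses Debezium-style codes: "c" for create (insert), "d" for delete.
    record Change(String op, String id) {}

    final Map<String, String> table = new LinkedHashMap<>(); // current outbox table contents
    final List<Change> changeLog = new ArrayList<>();        // stand-in for the transaction log
    final boolean removeAfterInsert;

    OutboxRetention(boolean removeAfterInsert) {
        this.removeAfterInsert = removeAfterInsert;
    }

    void writeEvent(String id, String payload) {
        table.put(id, payload);
        changeLog.add(new Change("c", id));
        if (removeAfterInsert) {
            table.remove(id);
            changeLog.add(new Change("d", id));
        }
    }

    // Only inserts become messages; deletes are skipped in this model.
    List<String> routedEventIds() {
        List<String> ids = new ArrayList<>();
        for (Change change : changeLog) {
            if (change.op().equals("c")) {
                ids.add(change.id());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        OutboxRetention defaults = new OutboxRetention(true);
        defaults.writeEvent("event-1", "{\"id\":1}");
        System.out.println(defaults.table.isEmpty());  // true: the row is already gone
        System.out.println(defaults.routedEventIds()); // [event-1]: the insert was still captured
    }
}
```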

Setting up the connector

Up to this point we’ve covered how to configure and use the extension in a Quarkus application to save events into the outbox database table. The last step is to configure the Debezium connector to monitor the outbox and emit those records to Kafka.

We’re going to use the following connector configuration:

```json
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "order-db",
  "database.port": "5432",
  "database.user": "user",
  "database.password": "password",
  "database.dbname": "orderdb",
  "database.server.name": "dbserver1",
  "schema.whitelist": "orders",
  "table.whitelist": "orders.outboxevent",
  "tombstones.on.delete": "false",
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
  "transforms.outbox.table.field.event.timestamp": "timestamp",
  "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
}
```

The vast majority of this is standard Debezium connector configuration; what matters here are the last several lines, which begin with transforms. These configuration options are used by Kafka Connect to configure and invoke the Outbox Event Router SMT.

This configuration uses a custom route.topic.replacement configuration property. With this setting, OrderCreatedEvent rows from the outbox are routed to the Order.events topic rather than the default outbox.event.Order topic.

This configuration also specifies the table.field.event.timestamp configuration property. With this setting, the Kafka message timestamp is taken from the timestamp column of the outbox database table rather than from the current time when the row is processed.
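The configuration also sets table.fields.additional.placement to type:header:eventType. Each entry names an outbox column, where to place it (header or envelope), and an optional alias, which is why the emitted message carries an eventType header. The sketch below parses such a value and builds the header map for one row. It is an illustrative reading of the option's format; `AdditionalPlacement`, `parse`, and `headers` are hypothetical names rather than SMT source code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative parser for "column:placement[:alias]" entries (Java 16+, uses a record).
final class AdditionalPlacement {

    record Placement(String column, String where, String alias) {}

    static List<Placement> parse(String spec) {
        List<Placement> result = new ArrayList<>();
        for (String entry : spec.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length < 2 || parts.length > 3) {
                throw new IllegalArgumentException("Bad entry: " + entry);
            }
            // Without an alias, the column name doubles as the header/field name.
            String alias = parts.length == 3 ? parts[2] : parts[0];
            result.add(new Placement(parts[0], parts[1], alias));
        }
        return result;
    }

    // Copies the columns configured for "header" placement into a header map.
    static Map<String, String> headers(Map<String, String> row, List<Placement> placements) {
        Map<String, String> headers = new LinkedHashMap<>();
        for (Placement p : placements) {
            if (p.where().equals("header")) {
                headers.put(p.alias(), row.get(p.column()));
            }
        }
        return headers;
    }

    public static void main(String[] args) {
        List<Placement> placements = parse("type:header:eventType");
        Map<String, String> row = Map.of("aggregateid", "1", "type", "OrderCreated");
        System.out.println(headers(row, placements)); // {eventType=OrderCreated}
    }
}
```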

Please see Outbox Event Router Configuration Options for details on how to configure the SMT.

Once the connector is running, the Order.events topic is populated with messages from the outbox table. The following JSON example represents an Order that gets saved by the OrderService.

```json
{
  "customerId": "123",
  "orderDate": "2019-01-31T12:13:01",
  "lineItems": [
    { "item": "Debezium in Action", "quantity": 2, "totalPrice": 39.98 },
    { "item": "Debezium for Dummies", "quantity": 1, "totalPrice": 29.99 }
  ]
}
```

When examining the Order.events topic, the emitted event looks like the following:

```text
{"key":"1","headers":"id=cc74eac7-176b-44e7-8bda-413a5088ca66,eventType=OrderCreated"}
"{\"id\":1,\"customerId\":123,\"orderDate\":\"2019-01-31T12:13:01\",\"lineItems\":[{\"id\":1,\"item\":\"Debezium in Action\",\"quantity\":2,\"totalPrice\":39.98,\"status\":\"ENTERED\"},{\"id\":2,\"item\":\"Debezium for Dummies\",\"quantity\":1,\"totalPrice\":29.99,\"status\":\"ENTERED\"}]}"
```

Wrapping up

Setting up and using the Debezium Outbox extension is really simple.

We have a complete example in our examples repository that uses the order service described here as well as a shipment service that consumes the events. For more details on the extension, refer to the Outbox Quarkus Extension documentation.

Future Plans

The current implementation of the Debezium Outbox extension works quite well, but we acknowledge there is still room for improvement. Some of the things we’ve already identified and have plans to include in future iterations of the extension are:

  • Avro serialization support for the event payload.

  • Full outbox table column attribute control, e.g. definition, length, precision, scale, and converters.

  • Complete outbox table customization using a user-supplied entity class.

  • Allow varied signatures of ExportedEvent within a single application.

We are currently tracking all future changes to this extension in DBZ-1711. As always, we welcome any and all feedback, so feel free to let us know in that issue, on Gitter, or on the mailing lists.

Chris Cranford

Chris is a software engineer at Red Hat. He previously was a member of the Hibernate ORM team and now works on Debezium. He lives in North Carolina just a few hours from Red Hat towers.


About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.
