Of Talon's two High Availability models, Event Sourcing requires more discipline to use but can yield better performance, particularly for latency-sensitive applications. In Event Sourcing, rather than replicating changes to state objects and outbound messages, an application's inbound messages are replicated and replayed to produce the same application state and outbound messages. This section discusses the anatomy of an application coded for Event Sourcing.
Event Sourcing has several performance advantages that are of particular interest to applications concerned with very low latency. With Event Sourcing, replication is much cheaper because:
While the premise behind Event Sourcing is simple, it requires discipline on the part of the application developer. Business logic decisions must be based only on the data in the inbound messages and on the state generated by those messages to date; anything else can lead to divergent state on the primary and backup members. For example, something as simple as making a logical decision based on System.currentTimeMillis() could lead to the primary instance accepting an order and a backup instance rejecting the same order as stale because of clock skew. If the primary instance then fails, the order would not exist on the backup.
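The clock skew scenario can be made concrete with a small plain-Java sketch. Nothing in it is Talon API: the `ClockSkewDemo` class, the `Order` type, and the 100 ms staleness threshold are all hypothetical, chosen only to show why a decision based on a local clock can differ between members while a decision based on data carried in the message cannot.

```java
import java.util.function.LongSupplier;

// Hypothetical illustration only; none of these types are part of Talon.
class ClockSkewDemo {
    /** An inbound order: when it was sent, and the receive time stamped on the message. */
    static final class Order {
        final long sentAtMillis;
        final long stampedAtMillis;

        Order(long sentAtMillis, long stampedAtMillis) {
            this.sentAtMillis = sentAtMillis;
            this.stampedAtMillis = stampedAtMillis;
        }
    }

    /** Assumed staleness threshold for this example. */
    static final long MAX_AGE_MILLIS = 100;

    /** Unsafe: consults a local clock, which can differ between primary and backup. */
    static boolean acceptUsingLocalClock(Order order, LongSupplier localClock) {
        return localClock.getAsLong() - order.sentAtMillis <= MAX_AGE_MILLIS;
    }

    /** Safe: uses only data that travels with the replicated message. */
    static boolean acceptUsingMessageTime(Order order) {
        return order.stampedAtMillis - order.sentAtMillis <= MAX_AGE_MILLIS;
    }

    public static void main(String[] args) {
        Order order = new Order(1_000L, 1_050L);
        LongSupplier primaryClock = () -> 1_050L; // primary's local clock
        LongSupplier backupClock = () -> 1_250L;  // backup's clock runs 200 ms ahead

        System.out.println("local clock, primary accepts: " + acceptUsingLocalClock(order, primaryClock));
        System.out.println("local clock, backup accepts:  " + acceptUsingLocalClock(order, backupClock));
        System.out.println("message time, any member:     " + acceptUsingMessageTime(order));
    }
}
```

With the local clock, the primary accepts the order and the backup rejects it; with the time carried on the message, every member reaches the same decision.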
Unlike a State Replication application, Talon does not need to be made aware of or manage your application's state, because it is reconstituted on recovery purely through message replay. This means that your application is not required to use ADM modeled state. One downside to Event Sourcing is that for an application to recover from a cold start (e.g. no backup running), the application's entire inbound message stream needs to be replayed from disk. For many classes of applications this is not an issue, but for applications that run 24x7x365 with long-lived state such an approach would be infeasible: the disk space and recovery time would be enormous. On Talon's roadmap is a feature that will allow Event Sourcing applications to use ADM modeled state, which can be periodically checkpointed in the background to allow older messages in the inbound stream to be discarded.
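As a rough illustration of recovery by replay, the sketch below rebuilds private application state by feeding a logged inbound stream back through the same handler logic. It is plain Java and does not use Talon: `ReplayDemo`, `Counter`, and the in-memory list standing in for the transaction log are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; the list stands in for an on-disk log.
class ReplayDemo {
    /** Private application state; the runtime never needs to see it. */
    static final class Counter {
        private long count;
        private long sum;

        /** The deterministic "handler": state depends only on the messages applied. */
        void apply(long value) {
            count++;
            sum += value;
        }

        long count() { return count; }
        long sum() { return sum; }
    }

    /** Rebuilds state from scratch by replaying every logged inbound message in order. */
    static Counter replay(List<Long> inboundLog) {
        Counter state = new Counter();
        for (long value : inboundLog) {
            state.apply(value);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Long> log = new ArrayList<>();
        Counter live = new Counter();
        for (long value : new long[] {5, 7, 11}) {
            log.add(value);    // the inbound message is logged ...
            live.apply(value); // ... and processed
        }

        // Cold start: the live state is gone, only the log remains.
        Counter recovered = replay(log);
        System.out.println(recovered.count() + " " + recovered.sum()); // prints "3 23"
    }
}
```

Because the replay loop touches every logged message, recovery time and disk usage grow with the length of the inbound stream, which is the cost described above for long-running applications.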
A basic event sourcing application is straightforward to create, and working with event sourcing is very similar to Working with State Replication, with the exception that the application does not need to expose a state factory to the underlying AepEngine. Your application state does not even need to be modeled via ADM, because the underlying engine and store do not need to be aware of it.
A quick way to get started is to use the nvx-talon-es-processor-archetype described in Maven Archetype Quick Starts. The general flow for creating an event sourcing application involves:
The steps outlined below assume that you have already modeled some messages for your application to use. See the Modeling Message and State sections to get started.
    @AppHAPolicy(value=AepEngine.HAPolicy.EventSourcing)
    public class Application {
        ...
    }

If your application will send messages, it can add an injection point for the underlying AepEngine to inject a message sender.

    private AepMessageSender messageSender;

    @AppInjectionPoint
    final public void setMessageSender(AepMessageSender messageSender) {
        this.messageSender = messageSender;
    }
When working with an application using Event Sourcing, you can annotate handler methods that accept the received message type with @EventHandler. With Event Sourcing, you do not need to make Talon aware of your application state; it remains completely private to your application.

    private class Repository {
        private long counter;

        final public void setCounter(final long val) {
            counter = val;
        }

        final public long getCounter() {
            return counter;
        }
    }

    // application state, private to the application
    private final Repository repository = new Repository();

    @EventHandler
    final public void onMessage(Message message) {
        // update state
        repository.setCounter(repository.getCounter() + 1);

        // send event
        Event event = Event.create();
        event.setVal(message.getVal());
        event.setCount(repository.getCounter());
        messageSender.sendMessage("events", event);
    }
Keep in mind that:
See Preventing Divergence below for some pointers on avoiding divergence.
When working with event sourcing, only message factories need to be registered with the runtime. The message factories are used to deserialize the application's inbound messages as they are received from message buses, when they are replayed from the application's transaction log, and when a backup instance receives the inbound message stream replicated from the primary instance. The message factories can be declared in your config.xml or registered programmatically:
Declaration in Config DDL
    <app name="processor" mainClass="com.sample.Application">
      <messaging>
        <factories>
          <factory name="com.sample.messages.MessageFactory"/>
        </factories>
      </messaging>
    </app>
Programmatic Registration
    @AppInjectionPoint
    public void initialize(AepEngine engine) {
        // for messaging
        engine.registerFactory(new com.sample.messages.MessageFactory());
    }
To actually achieve high availability, storage must be configured for the application. The primary means of storage for Talon apps is clustered replication to a backup instance. Talon also logs state changes to a disk-based transaction log as a fallback mechanism. Storage and persistence can be enabled in the application's configuration XML.

    <app name="processor" mainClass="com.sample.Application">
      ...
      <storage enabled="true">
        ....
        <clustering enabled="true"/>
        <persistence enabled="true">
          <!-- When using Xbuf encoded entities, detached persist is not supported. -->
          <detachedPersist enabled="false"/>
        </persistence>
      </storage>
    </app>

Enabling clustering allows two applications of the same name to discover one another and form an HA cluster. When instances of an application connect to one another, one instance is elected as the primary via a leadership election algorithm. The primary member will establish messaging connections and begin invoking message handlers in your application.
Enabling persistence causes the replication stream that is sent to backup instances to also be logged locally on disk to a transaction log file. The transaction log file can be used to recover application state from a cold start and is also used to initialize new cluster members that connect to it. For this reason, when clustering is enabled, persistence must be enabled as well.
There are many configuration knobs that can be used to customize the store's behavior and performance. See DDL Config Reference for a listing of configuration knobs for storage.
Event Sourcing applications must take special care not to reference the local environment to avoid situations that would cause the primary and backup state to diverge. Talon provides several facilities to help with this.
The AepEngine provides a getEngineTime() method intended for applications using event sourcing that do time-dependent message processing. This method provides the current wall time (in millisecond resolution) as perceived by the engine. If invoked from within a message handler while the HA policy is set to event sourcing, this method returns the time stamped on the message event (stamped just before the message is dispatched to the application for processing). Since, for event sourced applications, the message is also replicated for parallel processing on the backup, the backup will receive the same time when invoking this method, thus ensuring identical processing. If this method is called on an engine operating in state replication mode, or called from outside a message handler, it returns the value of java.lang.System.currentTimeMillis().
    private volatile AepEngine engine;

    @AppInjectionPoint
    public void initialize(AepEngine engine) {
        this.engine = engine;
    }

    @EventHandler
    public void onMessage(Message message) {
        OrderEventMessage event = OrderEventMessage.create();

        // the engine time will be the same on primary and backup.
        event.setTimeReceivedAsTimestamp(engine.getEngineTime());
        ...
        // send the newly created event, not the inbound message
        messageSender.sendMessage("order-events", event);
    }
Because event sourcing applications replicate the inbound message stream to backup instances, it is possible for an application to inject a message into the processing stream to allow HA processing. This can be used in several ways
In the contrived example below, injection is used to broadcast the current hostname in an HA safe fashion.
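    private volatile AepEngine engine;

    @AppInjectionPoint
    public void initialize(AepEngine engine) {
        this.engine = engine;
    }

    @EventHandler
    public void onMessage(BroadcastHostNameRequest message) throws UnknownHostException {
        // capture the local hostname in a command and inject it into the
        // processing stream so that the same value reaches the backup.
        // (injectMessage is assumed here; see Scheduling and Injection of Messages.)
        SendHostNameCommand command = SendHostNameCommand.create();
        command.setHostName(InetAddress.getLocalHost().getHostName());
        engine.injectMessage(command);
    }

    @EventHandler
    public void onMessage(SendHostNameCommand command) {
        // the hostname comes from the replicated command, not the local environment
        BroadcastHostNameResponse response = BroadcastHostNameResponse.create();
        response.setHostName(command.getHostName());
        send("hostname-response", response);
    }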
See Scheduling and Injection of Messages for more information.
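The flow behind injection can be sketched without Talon at all. In the hypothetical model below, messages are plain strings, `Member` stands in for an application instance, and the sketch assumes that only the primary's injection takes effect while the backup simply processes the replicated stream. None of these names or behaviors are taken from Talon's API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical model of injection; not Talon code.
class InjectionDemo {
    /** One cluster member with its own local hostname. */
    static final class Member {
        final String localHostName;
        final List<String> sent = new ArrayList<>();

        Member(String localHostName) {
            this.localHostName = localHostName;
        }

        /** Handles one message; returns a message to inject, or null if there is none. */
        String handle(String message, boolean primary) {
            if (message.equals("REQUEST")) {
                // Only the primary reads its environment; the value rides in the injected message.
                return primary ? "HOSTNAME:" + localHostName : null;
            }
            if (message.startsWith("HOSTNAME:")) {
                sent.add(message.substring("HOSTNAME:".length()));
            }
            return null;
        }
    }

    /** The primary processes inbound messages; injected ones join the stream it replicates. */
    static List<String> runPrimary(Member primary, List<String> inbound) {
        Queue<String> pending = new ArrayDeque<>(inbound);
        List<String> replicated = new ArrayList<>();
        while (!pending.isEmpty()) {
            String message = pending.remove();
            replicated.add(message);
            String injected = primary.handle(message, true);
            if (injected != null) {
                pending.add(injected);
            }
        }
        return replicated;
    }

    /** The backup sees exactly the replicated stream, injected messages included. */
    static void runBackup(Member backup, List<String> replicated) {
        for (String message : replicated) {
            backup.handle(message, false);
        }
    }

    public static void main(String[] args) {
        Member primary = new Member("host-a");
        Member backup = new Member("host-b");
        List<String> replicated = runPrimary(primary, Arrays.asList("REQUEST"));
        runBackup(backup, replicated);
        System.out.println(primary.sent + " " + backup.sent); // prints "[host-a] [host-a]"
    }
}
```

Both members end up broadcasting the primary's hostname because the environment value is read once and then travels as message data. The injected message is a separate entry in the stream, handled after the request that triggered it.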
Above we showed how to inject the hostname from the local environment into the replication stream, but that has the drawback of creating a new transaction. Talon provides a mechanism to record and tunnel local environment data into the current transaction's replication stream. This feature is called environment replication and allows application developers to write environment providers that record calls into buffers that are serialized for use on the backup so that they can operate on the same data as the primary. With environment replication, the above logic would look like:
    private HostNameEnvironmentProvider hostNameProvider = new HostNameEnvironmentProvider();
    private XString hostName = XString.create(256, true, true);

    @AppInjectionPoint
    public void initialize(AepEngine engine) {
        hostNameProvider.register(engine);
    }

    @EventHandler
    public void onMessage(BroadcastHostNameRequest message) {
        BroadcastHostNameResponse response = BroadcastHostNameResponse.create();

        // call into the environment provider to capture
        // or replay the host name look up.
        hostNameProvider.getHostNameTo(hostName);

        // set the host name in the response and send.
        response.setHostName(hostName);
        send("hostname-response", response);
    }

See Environment Replication for more details, including how to author environment providers.
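The record-and-replay idea behind an environment provider can be sketched in plain Java. This is not Talon's provider API: `EnvironmentReplayDemo`, `HostNameProvider`, and the in-memory buffer are hypothetical stand-ins, and the real mechanism serializes the recorded data into the transaction's replication stream rather than copying a queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical record/replay sketch; not Talon's environment provider API.
class EnvironmentReplayDemo {
    /** Records environment lookups on the primary and replays them on the backup. */
    static final class HostNameProvider {
        private final Supplier<String> localLookup; // null when replaying
        private final Deque<String> buffer;

        private HostNameProvider(Supplier<String> localLookup, Deque<String> buffer) {
            this.localLookup = localLookup;
            this.buffer = buffer;
        }

        /** Primary mode: perform the real lookup and record each result. */
        static HostNameProvider recording(Supplier<String> localLookup) {
            return new HostNameProvider(localLookup, new ArrayDeque<>());
        }

        /** Backup mode: never touch the environment, only return recorded results. */
        static HostNameProvider replaying(Deque<String> recorded) {
            return new HostNameProvider(null, new ArrayDeque<>(recorded));
        }

        String getHostName() {
            if (localLookup == null) {
                return buffer.remove(); // replay in the order the primary recorded
            }
            String value = localLookup.get();
            buffer.add(value);
            return value;
        }

        /** Copy of what was recorded; stands in for the serialized replication buffer. */
        Deque<String> recorded() {
            return new ArrayDeque<>(buffer);
        }
    }

    public static void main(String[] args) {
        HostNameProvider primary = HostNameProvider.recording(() -> "host-a");
        String onPrimary = primary.getHostName();

        HostNameProvider backup = HostNameProvider.replaying(primary.recorded());
        String onBackup = backup.getHostName();

        System.out.println(onPrimary + " " + onBackup); // prints "host-a host-a"
    }
}
```

The handler code is identical on both members; only the provider's mode differs, which is what lets the lookup stay inside the current transaction instead of requiring a separate injected message.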