...
Annotate App Main for State Replication
Code Block | ||
---|---|---|
| ||
@AppHAPolicy(value=AepEngine.HAPolicy.EventSourcing)
public class Application {
...
} |
Inject Message Sender
If your application sends messages, it can add an injection point so that the underlying AepEngine can inject a message sender.
Code Block | ||
---|---|---|
| ||
private AepMessageSender messageSender;
@AppInjectionPoint
final public void setMessageSender(AepMessageSender messageSender) {
this.messageSender = messageSender;
} |
Declare Message Handlers
When working with an application using EventSourcing, you can annotate handler methods that accept the received message type with @EventHandler. With event sourcing you do not need to make Talon aware of your application state; it remains completely private to your application.
Code Block | ||
---|---|---|
| ||
private class Repository {
private long counter;
final public void setCounter(final long val) {
counter = val;
}
final public long getCounter() {
return counter;
}
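}
// application state, private to the application
private final Repository repository = new Repository();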
@EventHandler
final public void onMessage(Message message) {
// update state
repository.setCounter(repository.getCounter() + 1);
// send event
Event event = Event.create();
event.setVal(message.getVal());
event.setCount(repository.getCounter());
messageSender.sendMessage("events", event);
} |
Keep in mind that:
- All message handlers in your application are executed on a single thread, so there is no need for synchronization.
- You must not reference any data from the local environment other than your application state when executing business logic that affects your recoverable state or your outbound messages.
See Preventing Divergence below for some pointers on avoiding divergence.
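The second point can be illustrated without any Talon API. The following is a minimal sketch, using hypothetical stand-in types (`Msg`, `Evt`, `CounterApp`) rather than Talon classes, of why a handler should depend only on the inbound message and on application state: replaying the same message stream through a second instance then reproduces the same state and the same outbound events.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in types for illustration only; these are not Talon classes.
class ReplayDemo {
    static final class Msg {
        final long val;
        Msg(long val) { this.val = val; }
    }

    static final class Evt {
        final long val;
        final long count;
        Evt(long val, long count) { this.val = val; this.count = count; }
    }

    static final class CounterApp {
        private long counter;                                  // private application state
        private final List<Evt> outbound = new ArrayList<>();  // stand-in for sent events

        // Deterministic handler: it reads only the message and the application state.
        // It does not consult the wall clock, the host name, or random numbers.
        void onMessage(Msg message) {
            counter++;
            outbound.add(new Evt(message.val, counter));
        }

        long getCounter() { return counter; }
        List<Evt> getOutbound() { return outbound; }
    }

    // Feed an ordered message stream to a fresh application instance.
    static CounterApp replay(long[] stream) {
        CounterApp app = new CounterApp();
        for (long v : stream) {
            app.onMessage(new Msg(v));
        }
        return app;
    }

    public static void main(String[] args) {
        long[] stream = {10, 20, 30};
        CounterApp primary = replay(stream);
        CounterApp backup = replay(stream);
        // Both instances end with the same counter and the same number of events.
        System.out.println(primary.getCounter() == backup.getCounter());
        System.out.println(primary.getOutbound().size() == backup.getOutbound().size());
    }
}
```

If `onMessage` read something like `System.currentTimeMillis()` into an event field, the two instances in this sketch could emit different events for the same input, which is the kind of divergence the rule is meant to prevent.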
Configuring the Engine
Register Messaging Factories
...
Declaration in Config DDL
Code Block | ||
---|---|---|
| ||
<app name="processor" mainClass="com.sample.Application">
<messaging>
<factories>
<factory name="com.sample.messages.MessageFactory"/>
</factories>
</messaging>
</app> |
Programmatic Registration
Code Block | ||
---|---|---|
| ||
@AppInjectionPoint
public void initialize(AepEngine engine) {
// for messaging
engine.registerFactory(new com.sample.messages.MessageFactory());
} |
Enable Storage
To actually achieve high availability, storage must be configured for the application. The primary means of storage for Talon apps is clustered replication to a backup instance. Talon also logs state changes to a disk-based transaction log as a fallback mechanism. Storage and persistence can be enabled in the application's configuration XML.
<app name="processor" mainClass="com.sample.Application">
  ...
  <storage enabled="true">
    ....
    <clustering enabled="true"/>
    <persistence enabled="true">
      <!-- When using Xbuf encoded entities, detached persist is not supported. -->
      <detachedPersist enabled="false"/>
    </persistence>
  </storage>
</app> |
Enabling clustering allows two applications of the same name to discover one another and form an HA cluster. When two or more instances of an application connect to one another, one instance is elected as the primary via a leadership election algorithm. The primary member will establish messaging connections and begin invoking message handlers in your application.
...
Code Block | ||||
---|---|---|---|---|
| ||||
private volatile AepEngine engine;
@AppInjectionPoint
public void initialize(AepEngine engine) {
    this.engine = engine;
}
@EventHandler
public void onMessage(Message message) {
    OrderEventMessage event = OrderEventMessage.create();
    // the engine time will be the same on primary and backup.
    event.setTimeReceivedAsTimestamp(engine.getEngineTime());
    ...
    // send the newly created event, not the inbound message
    messageSender.sendMessage("order-events", event);
} |
Message Injection
Because event sourcing applications replicate the inbound message stream to backup instances, an application can inject a message into the processing stream so that the work it triggers is processed in an HA fashion. This can be used in several ways:
...
See Scheduling and Injection of Messages for more information.
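The general idea behind injection can be sketched without Talon's API. In the minimal sketch below, all names (`InjectionDemo`, `Tick`, `inject`, `dispatchPending`) are hypothetical stand-ins, not Talon classes or methods: work that originates outside a message handler, such as a timer thread, is turned into a message and appended to the same ordered stream that handlers consume, so that it is processed on the handler thread and recorded in stream order like any other inbound message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in types for illustration only; these are not Talon classes.
class InjectionDemo {
    static final class Tick {
        final long seq;
        Tick(long seq) { this.seq = seq; }
    }

    // Single ordered inbound stream (stand-in for the engine's input queue).
    private final Queue<Object> inbound = new ConcurrentLinkedQueue<>();
    // Stand-in for the copy of the inbound stream that would go to the backup.
    private final List<Object> replicated = new ArrayList<>();
    // Application state, touched only from the dispatching thread.
    private long ticksSeen;

    // Safe to call from any thread: it enqueues work and never touches state.
    void inject(Object message) {
        inbound.add(message);
    }

    // Runs on the single handler thread: processes everything queued so far.
    void dispatchPending() {
        Object message;
        while ((message = inbound.poll()) != null) {
            replicated.add(message);          // record the message in stream order
            if (message instanceof Tick) {
                onTick((Tick) message);
            }
        }
    }

    void onTick(Tick tick) {
        ticksSeen++;
    }

    long getTicksSeen() { return ticksSeen; }
    int getReplicatedCount() { return replicated.size(); }

    public static void main(String[] args) throws InterruptedException {
        InjectionDemo app = new InjectionDemo();
        // A background thread injects messages instead of mutating state itself.
        Thread timer = new Thread(() -> {
            for (long i = 1; i <= 3; i++) {
                app.inject(new Tick(i));
            }
        });
        timer.start();
        timer.join();              // wait until the ticks are queued
        app.dispatchPending();     // handle them on this thread
        System.out.println(app.getTicksSeen() + " ticks, "
                + app.getReplicatedCount() + " replicated");
    }
}
```

The design point is that the background thread never modifies `ticksSeen` directly; it only enqueues a message, which keeps all state changes on the handler thread and inside the recorded stream.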
Environment Replication
Above we showed how to inject the host name from the local environment into the replication stream, but that has the drawback of creating a new transaction. Talon provides a mechanism to record and tunnel local environment data into the current transaction's replication stream. This feature is called environment replication. It allows application developers to write environment providers that record calls into buffers, which are serialized for use on the backup so that the backup can operate on the same data as the primary. With environment replication the above logic would look like:
Code Block | ||||
---|---|---|---|---|
| ||||
private HostNameEnvironmentProvider hostNameProvider = new HostNameEnvironmentProvider();
private XString hostName = XString.create(256, true, true);
@AppInjectionPoint
public void initialize(AepEngine engine) {
    hostNameProvider.register(engine);
}
@EventHandler
public void onMessage(BroadcastHostNameRequest message) {
    BroadcastHostNameResponse response = BroadcastHostNameResponse.create();
    // call into the environment provider to capture
    // or replay the host name look up.
    hostNameProvider.getHostNameTo(hostName);
    // set the host name in the response and send.
    response.setHostName(hostName);
    send("hostname-response", response);
} |
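The record-and-replay idea behind an environment provider can be sketched as follows. This is not Talon's implementation: `RecordingProvider` and its methods are hypothetical names, and the transport of the recorded buffer to the backup is reduced to a plain method call. On the primary the provider consults the local environment and records the answer; on the backup it returns the recorded answer and never consults its own environment.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in types for illustration only; these are not Talon classes.
class EnvReplayDemo {
    static final class RecordingProvider<T> {
        private final Supplier<T> source;                      // local environment lookup
        private final List<T> recorded = new ArrayList<>();    // filled on the primary
        private final Deque<T> toReplay = new ArrayDeque<>();  // filled on the backup

        RecordingProvider(Supplier<T> source) {
            this.source = source;
        }

        // Primary: consult the local environment and record the answer.
        T capture() {
            T value = source.get();
            recorded.add(value);
            return value;
        }

        // Primary: hand over the buffer that would travel with the transaction.
        List<T> drainRecorded() {
            List<T> copy = new ArrayList<>(recorded);
            recorded.clear();
            return copy;
        }

        // Backup: load the values received from the primary (must be non-null).
        void load(List<T> values) {
            toReplay.addAll(values);
        }

        // Backup: return the primary's answer; the local source is never called.
        // Throws NoSuchElementException if nothing was loaded.
        T replay() {
            return toReplay.remove();
        }
    }

    public static void main(String[] args) {
        RecordingProvider<String> primary = new RecordingProvider<>(() -> "host-a");
        RecordingProvider<String> backup = new RecordingProvider<>(() -> "host-b");

        String onPrimary = primary.capture();      // looks up the local host name
        backup.load(primary.drainRecorded());      // stand-in for replication
        String onBackup = backup.replay();         // replays the primary's value

        System.out.println(onPrimary.equals(onBackup));
    }
}
```

In this sketch the backup's own supplier would have returned a different host name, but because the handler goes through the provider, both sides operate on the value captured by the primary.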
...