Overview
State Replication is the simpler of Talon's two High Availability models. With State Replication, Talon replicates changes to state and the outbound messages emitted by your application's message handlers. In the event of a failover to a backup or a cold start recovery from a transaction log your application's state is available at the same point where processing left off, and the engine will retransmit any outbound messages that were left in doubt as a result of the failure.
Coding For State Replication
...
- Modeling Messages and State
- Declaring a main class annotated with an HAPolicy of StateReplication
- Providing Talon with a state factory for creating your application state
- Writing message handlers to perform business logic.
...
Code Block | ||
---|---|---|
| ||
@AppHAPolicy(value=AepEngine.HAPolicy.StateReplication)
public class Application {
    ...
} |
Provide a StateFactory
An AepEngine is application state agnostic: it deals with your application's state as a collection of plain old Java objects organized into a state tree with a single root object. Given the root object for your application's state, the underlying store will track changes made to fields on the root (and its descendants) and replicate or persist those changes. As such, an AepEngine needs to be able to create new application state when your application is initialized. This is done by finding a method on your main application class annotated with @AppStateFactoryAccessor. The state factory accessor returns a newly initialized set of state for the engine to manage. As messages are processed, the engine will pass the relevant state root back into the application to be operated upon.
Code Block | ||
---|---|---|
| ||
@AppStateFactoryAccessor
final public IAepApplicationStateFactory getStateFactory() {
    return new IAepApplicationStateFactory() {
        @Override
        final public Repository createState(MessageView view) {
            return Repository.create();
        }
    };
} |
As your application makes changes to this root object (setting fields etc), the engine will monitor the root and replicate deltas to backup members or disk.
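The change tracking described above can be pictured with a small, self-contained sketch. It is not Talon code: `Delta`, `TrackedRoot`, and `DeltaReplicationSketch` are hypothetical names invented for illustration, and the real store tracks generated state objects rather than a map of fields. The sketch only shows the general idea of recording field changes on a root and shipping them to a backup as deltas.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: none of these types are part of Talon.
final class Delta {
    final String field;
    final Object value;

    Delta(String field, Object value) {
        this.field = field;
        this.value = value;
    }
}

// A toy state root that records every field change as a delta.
final class TrackedRoot {
    private final Map<String, Object> fields = new HashMap<>();
    private final List<Delta> pending = new ArrayList<>();

    // Updates the field and remembers the change for later replication.
    void set(String field, Object value) {
        fields.put(field, value);
        pending.add(new Delta(field, value));
    }

    Object get(String field) {
        return fields.get(field);
    }

    // Hands over the changes made since the last call and clears the buffer,
    // standing in for the point at which a transaction commits.
    List<Delta> drainDeltas() {
        List<Delta> out = new ArrayList<>(pending);
        pending.clear();
        return out;
    }

    // Applies deltas received from a primary without re-recording them.
    void apply(List<Delta> deltas) {
        for (Delta d : deltas) {
            fields.put(d.field, d.value);
        }
    }
}

final class DeltaReplicationSketch {
    public static void main(String[] args) {
        TrackedRoot primary = new TrackedRoot();
        TrackedRoot backup = new TrackedRoot();

        // The application only mutates the primary's root.
        primary.set("sno", 1L);
        primary.set("lastSymbol", "ABC");

        // The "engine" ships the recorded deltas to the backup.
        backup.apply(primary.drainDeltas());

        System.out.println(backup.get("sno") + " " + backup.get("lastSymbol"));
    }
}
```

In this sketch the application never touches the backup directly, which mirrors the division of labor in the text: the application mutates the root, and the platform is responsible for moving the changes elsewhere.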
Info |
---|
The state factory should return an empty, uninitialized object. The platform will invoke the state factory during application initialization with a null argument to determine the type of the application's root state, and will subsequently invoke the state factory on receipt of a MessageView when state has not already been created for the application. The application should not store a reference to the application state that it returns ... it is the job of the platform to manage the state tree once it has been created by either passing the state root into an EventHandler or returning it via a call to |
Inject Message Sender
If your application will send messages, it can add an injection point for the underlying AepEngine to inject a message sender.
...
Code Block | ||||
---|---|---|---|---|
| ||||
<app name="processor" mainClass="com.sample.Application">
  <messaging>
    <factories>
      <factory name="com.sample.messages.MessageFactory"/>
    </factories>
    <buses>
      <bus name="sample-bus">
        <channels>
          <channel name="requests" join="true"/>
          <channel name="events" join="false"/>
        </channels>
      </bus>
    </buses>
  </messaging>
  <storage enabled="true">
    <factories>
      <factory name="com.sample.state.StateFactory" />
    </factories>
  </storage>
</app> |
...
To actually achieve high availability, storage must be configured for the application. The primary means of storage for Talon apps is clustered replication to a backup instance. Talon also logs state changes to a disk-based transaction log as a fallback mechanism. Storage and persistence can be enabled in the application's configuration XML.
Code Block |
---|
<app name="processor" mainClass="com.sample.Application">
  ...
  <storage enabled="true">
    ....
    <clustering enabled="true"/>
    <persistence enabled="true">
      <!-- When using Xbuf encoded entities, detached persist is not supported. -->
      <detachedPersist enabled="false"/>
    </persistence>
  </storage>
</app> |
...
Enabling persistence causes the replication stream that is sent to backup instances to also be logged locally on disk to a transaction log file. The transaction log file can be used to recover application state from a cold start. It is also used to initialize new cluster members that connect to the instance, so when clustering is enabled, persistence must be enabled as well.
There are many configuration knobs that can be used to customize the store's behavior and performance. See DDL Config Reference for a listing of configuration knobs for storage.
Unsolicited Senders
Applications that use state replication and perform unsolicited sends (i.e. sends not done from a message handler) should additionally configure the engine to replicate unsolicited sends so that state and messages are replicated for the sender. The Feeder is such an application, so it sets this value to true:
Code Block | ||
---|---|---|
| ||
descriptor.setReplicateUnsolicitedSends(true); |
...
Creation of Application State
Application state is always created by the AEP engine and is created in one of three different ways:
- By a primary engine when it receives the first message of a flow, in which case it utilizes the application-provided IAepApplicationStateFactory.
- By a backup engine when application state is replicated from a primary.
- Explicitly by an application on an AepMessagingPrestartEvent handler.
Two constraints need to be met when the state is created explicitly by the application.
- For performance reasons, the state creation is not thread safe. Thus, the state must be created before the engine's messaging has been started.
- The state needs to be created after the engine has bound to the application's ODS store. This needs to be done so that the created state is attached to the opened store.
The first constraint implies that the state must be created before the engine has started messaging. The second constraint implies that the state needs to be created during or after start(), since the engine opens the store during start(). In other words, the only time the state can be explicitly created by the application is between start() and the receipt of the messaging started event. The messaging prestart event is such a point in the flow.
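The two constraints above define a window in the engine lifecycle. The following sketch models that window as a tiny state machine. It is purely illustrative: `LifecycleSketch`, its `Phase` enum, and its methods are hypothetical and do not correspond to Talon classes. It only shows why a point such as the messaging prestart event, after the store is open but before messaging starts, is the one place explicit creation fits.

```java
// Hypothetical model of the lifecycle window; not the Talon API.
final class LifecycleSketch {
    enum Phase { CREATED, STORE_OPEN, MESSAGING_STARTED }

    private Phase phase = Phase.CREATED;
    private Object state;

    // Stands in for start(): the store is opened here.
    void start() {
        phase = Phase.STORE_OPEN;
    }

    // Stands in for the moment messaging starts.
    void startMessaging() {
        phase = Phase.MESSAGING_STARTED;
    }

    // Explicit creation is only allowed while the store is open
    // and messaging has not yet started.
    Object createStateExplicitly() {
        if (phase != Phase.STORE_OPEN) {
            throw new IllegalStateException("state cannot be created in phase " + phase);
        }
        state = new Object();
        return state;
    }

    public static void main(String[] args) {
        LifecycleSketch engine = new LifecycleSketch();
        engine.start();
        // This is where a messaging prestart handler would run.
        Object created = engine.createStateExplicitly();
        engine.startMessaging();
        System.out.println("state created before messaging: " + (created != null));
    }
}
```

Calling `createStateExplicitly()` before `start()` or after `startMessaging()` throws in this model, which corresponds to the second and first constraints respectively.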
Interacting With Application State
From a message handler
The snippet below comes from the Enricher message handler. Within a message handler, application state should be retrieved using the Engine's getApplicationState(MessageView) accessor, which retrieves the state associated with the message's flow. In the Ticker sample, the flow is defined by the hashCode of the symbol and is set by the Feeder. Flows define the order in which messages are processed in the system. They essentially partition traffic into ordered streams that are processed in parallel and therefore enable concurrent message processing.
Under the covers, the Engine ensures that both messages and state are replicated to the backup to prevent loss or duplication in the event of a failure.
Code Block | ||
---|---|---|
| ||
//to the enriched channel with a trend.
ApplicationState state = _engine.getApplicationState(tick);
state.setSno(state.getSno() + 1);
state.getTickHistory().add(tick);
..
_engine.sendMessage(_enrichChannel, enriched);
//Clear out older entries:
if(state.getTickHistory().size() > HISTORY_SIZE) {
state.getTickHistory().remove();
} |
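The flow partitioning described in this section can be sketched without any Talon types. The class below is a hypothetical illustration, not part of the Ticker sample: it derives a flow id from a symbol's hashCode, as the text says the Feeder does, and groups messages so that ordering is preserved within each flow while different flows remain independent of one another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of flow partitioning; not the Talon API.
final class FlowPartitionSketch {
    // Maps a symbol to a flow id using its hashCode.
    static int flowOf(String symbol) {
        return symbol.hashCode();
    }

    // Groups payloads by flow while preserving arrival order inside each flow.
    // Each tick is a two-element array: {symbol, payload}.
    static Map<Integer, List<String>> partition(List<String[]> ticks) {
        Map<Integer, List<String>> flows = new TreeMap<>();
        for (String[] tick : ticks) {
            String symbol = tick[0];
            String payload = tick[1];
            flows.computeIfAbsent(flowOf(symbol), k -> new ArrayList<>()).add(payload);
        }
        return flows;
    }

    public static void main(String[] args) {
        List<String[]> ticks = Arrays.asList(
            new String[] {"ABC", "ABC-1"},
            new String[] {"XYZ", "XYZ-1"},
            new String[] {"ABC", "ABC-2"});

        // Each entry is one ordered stream that could be processed independently.
        for (Map.Entry<Integer, List<String>> flow : partition(ticks).entrySet()) {
            System.out.println(flow.getKey() + " -> " + flow.getValue());
        }
    }
}
```

Because every tick for a given symbol lands in the same flow, the per-flow state (such as a tick history) only ever sees that symbol's messages in order, which is the property the handler above relies on.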
Unsolicited Sends
The Feeder operates slightly differently. Because it is the origin point of the traffic, it can't retrieve its state from the Engine using a MessageView, and, as mentioned previously, creation of state can only be done from an engine thread. It must therefore create its state from within an AepMessagingPrestartEvent handler. If the Feeder were to operate with multiple flows (and consequently multiple application state instances), it would need to create them all here.
Code Block |
---|
// The messaging prestart event
@EventHandler
public void onMessagingPrestart( AepMessagingPrestartEvent evt) {
    //In a more complex application one might further initialize
    //the state prior to returning it:
_state = _engine.getApplicationState(symbol.hashCode());
} |
...
Limitations and Upcoming Enhancements
See State Tree Limitations for a discussion of current state tree limitations.