Overview
...
Excerpt |
---|
State Replication |
...
In the ticker example, the Feeder and the Enricher both use state replication. We'll focus on the Enricher class to demonstrate how to set up state replication in an X application. The Enricher's functionality is straightforward: it receives tick updates from the Feeder on the Tick channel, stores the received updates in a queue that it uses to determine the symbol's historical trend, enriches each received Tick message with the calculated trend, and sends the result on the EnhancedTicks channel, where the Receiver is listening. The key point is that the Enricher operates on the ApplicationState that we modeled in the previous sections, and the platform transparently handles replication under the covers. In the event of an application failure (such as the one the sample induces), processing can resume where it left off: the application state and the in-doubt state of any messages are transparently and atomically resolved on the backup.
The following sections walk through the code of the Enricher sample discussing the steps for configuring the engine.
Configuring the Engine
Step 1: Register Object Factories
In this sample the same factory class is used for both messages and state. An instance of the object factory has to be registered with both the MessageViewFactoryRegistry and the StoreObjectFactoryRegistry. The latter registry allows the underlying state replication machinery to deserialize replicated state objects based on the ids encoded in the replication stream while the former is used for deserializing replicated messages. In the sample this is done in a Common superclass used by all the applications since this step is common to all of them:
Code Block | ||
---|---|---|
| ||
// for messaging
MessageViewFactoryRegistry.getInstance().registerMessageViewFactory(new ObjectFactory());
// for state replication:
StoreObjectFactoryRegistry.getInstance().
registerObjectFactory(new ObjectFactory()); |
Step 2: Set up the Store
The object store is what handles state replication. The native store implementation forms clusters with other stores of the same name using a discovery protocol and elects a Primary based on a peer election algorithm. In the snippet below we create a store cluster called "Enricher" (NAME), and save it to the configuration repository.
Code Block |
---|
// store descriptor
StoreDescriptor storeDescriptor = StoreDescriptor.create(NAME);
storeDescriptor.setProviderConfig("native://.");
storeDescriptor.save(); |
Step 3: Configure the Engine Descriptor
The snippet below shows the configuration of the Enricher's engine descriptor. We create a new descriptor and set the HAPolicy to StateReplication. When using state replication you must also use a MessageSendPolicy of ReplicateBeforeSend, which indicates that state must be replicated before outbound messages are sent. This is crucial in state replication because it ensures that inbound message receipt and mutations to state are stabilized on the backups before the engine commits the send of the outbound messages that result from those mutations.
Finally, we specify a ReplicationPolicy of Pipelined so that replicated state is acknowledged by our backup before the enriched message is sent on to the Receiver, which ensures that we don't produce duplicates.
Code Block | ||
---|---|---|
| ||
AepEngineDescriptor descriptor = AepEngineDescriptor.create(NAME);
descriptor.setHAPolicy(AepEngine.HAPolicy.StateReplication);
descriptor.setMessageSendPolicy(AepEngine.MessageSendPolicy.ReplicateBeforeSend);
descriptor.setReplicationPolicy(ReplicationPolicy.Pipelined);
descriptor.setStore(NAME); |
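The ordering that ReplicateBeforeSend describes can be illustrated with a small self-contained sketch. None of the types below are platform APIs: `EventLog`, `ToyBackup` and `ToyEngine` are hypothetical stand-ins that only record the order of operations, on the assumption that a unit of work consists of one state delta plus the outbound messages it produced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins, not platform classes: they only record the order of operations.
class EventLog {
    final List<String> events = new ArrayList<>();
}

class ToyBackup {
    private final EventLog log;

    ToyBackup(EventLog log) { this.log = log; }

    // Pretend to replicate a state delta plus its outbound messages, then acknowledge.
    boolean replicate(String stateDelta, List<String> outbound) {
        log.events.add("replicate:" + stateDelta + "+" + outbound.size() + "msg");
        return true; // acknowledgement from the backup
    }
}

class ToyEngine {
    private final EventLog log;
    private final ToyBackup backup;

    ToyEngine(EventLog log, ToyBackup backup) {
        this.log = log;
        this.backup = backup;
    }

    // Commit one unit of work: stabilize on the backup first, send downstream second.
    void commit(String stateDelta, List<String> outbound) {
        if (!backup.replicate(stateDelta, outbound)) {
            throw new IllegalStateException("replication not acknowledged; nothing sent");
        }
        for (String message : outbound) {
            log.events.add("send:" + message);
        }
    }
}

class ReplicateBeforeSendSketch {
    public static void main(String[] args) {
        EventLog log = new EventLog();
        ToyEngine engine = new ToyEngine(log, new ToyBackup(log));
        engine.commit("sno=1", Arrays.asList("EnhancedTick(IBM)"));
        System.out.println(log.events);
    }
}
```

Because the send happens only after the backup has acknowledged, a backup that takes over always knows about every message that may have gone downstream, which is what allows it to resolve in-doubt messages without loss or duplication.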
Unsolicited Senders
For applications that use state replication with unsolicited sends (i.e. sends not made from a message handler), the application should additionally configure the engine to replicate unsolicited sends so that state and messages are replicated for the sender. The Feeder is such an application, so it sets this value to true:
Code Block | ||
---|---|---|
| ||
descriptor.setReplicateUnsolicitedSends(true); |
State Replication is the simpler of Talon's two High Availability models. With State Replication, Talon replicates changes to state and the outbound messages emitted by your application's message handlers. In the event of a failover to a backup or a cold start recovery from a transaction log, your application's state is available at the same point where processing left off, and the engine will retransmit any outbound messages that were left in doubt as a result of the failure. |
Coding For State Replication
A basic state replication application is straightforward. A quick way to get started is to use the nvx-talon-sr-processor-archetype described in Talon Maven Archetypes. The general flow for creating a state replication application involves:
- Modeling Messages and State
- Declaring a main class annotated with HAPolicy of StateReplication
- Providing Talon with a state factory for creating your application state
- Writing message handlers to perform business logic.
A Basic App
Model Application State
The steps outlined below assume that you have already modeled some messages and state for your application to use. See the Modeling Message and State sections to get started.
Info |
---|
When modeling application state, Xbuf encoding is not yet recommended; Protobuf encoding should be used instead (Xbuf-generated entities currently have a higher memory footprint than Protobuf-generated entities). |
Annotate App Main for State Replication
Code Block | ||
---|---|---|
| ||
@AppHAPolicy(value=AepEngine.HAPolicy.StateReplication)
public class Application {
...
} |
Provide a StateFactory
An AepEngine is application state agnostic: it deals with your application's state as a collection of plain old Java objects organized into a state tree with a single root object. Given the root object for your application's state, the underlying store tracks changes made to fields on the root (and its descendants) and replicates or persists those changes. As such, an AepEngine needs to be able to create new application state, both when your application is initialized and during the processing of messages. To facilitate this, the engine looks for a method on your main application class annotated with @AppStateFactoryAccessor. The accessor returns a state factory that the engine uses to create newly initialized state to manage. As messages are processed, the engine passes the relevant state root back into the application to be operated upon.
Code Block | ||
---|---|---|
| ||
@AppStateFactoryAccessor
final public IAepApplicationStateFactory getStateFactory() {
    return new IAepApplicationStateFactory() {
        @Override
        final public Repository createState(final MessageView view) {
            // return empty state; the engine manages it from here on
            return Repository.create();
        }
    };
} |
Configuration Summary
The table below lists the configuration options we have discussed so far:
Table 4: Configuration Options
...
Application state is always created by the AEP engine and is created in one of three different ways:
- By a primary engine when it receives the first message of a flow in which case it utilizes the application provided IApplicationStateFactory.
- By a backup engine when application state is replicated from a primary.
- Explicitly by an application on an AepMessagingPrestartEvent handler.
There are two constraints that need to be met when the state is created explicitly by the application:
- For performance reasons, state creation is not thread safe. Thus, the state must be created before the engine's messaging has been started.
- The state needs to be created after the engine has bound to the application's ODS store, so that the created state is attached to the opened store.
The first constraint implies that the state must be created before the engine has started messaging. The second constraint implies that the state needs to be created during or after start (since the engine opens the store during start()). In other words, the only time the state can be explicitly created by the application is between start() and the receipt of the messaging started event. The messaging prestart event is such a point in the flow.
Interacting With Application State
From a message handler
The snippet below comes from the Enricher message handler. From within a message handler, application state should be retrieved using the engine's getApplicationState(MessageView) accessor, which retrieves the state associated with the message's flow. In the Ticker sample the flow is defined by the hashCode of the symbol and is set by the Feeder. Flows define the order in which messages are processed in the system: they partition traffic into ordered streams that are processed in parallel and therefore enable concurrent message processing.
Under the covers the Engine ensures that both messages and state are replicated to the backup to prevent loss or duplication in the event of a failure.
Code Block | ||
---|---|---|
| ||
// ... to the enriched channel with a trend.
ApplicationState state = _engine.getApplicationState(tick);
state.setSno(state.getSno() + 1);
// ... |
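To make the idea of flows concrete, here is a minimal self-contained sketch of per-flow state lookup. It is not the engine's implementation: `FlowState` and `FlowStateRegistry` are hypothetical names, and the only detail taken from the sample is that the flow is derived from the symbol's hashCode.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical per-flow state; the real sample uses a modeled ApplicationState.
class FlowState {
    long sno;
}

// Toy registry mapping a flow id to its own state instance (not a platform API).
class FlowStateRegistry {
    private final Map<Integer, FlowState> byFlow = new HashMap<>();
    private final Supplier<FlowState> factory;

    FlowStateRegistry(Supplier<FlowState> factory) { this.factory = factory; }

    // The Ticker sample derives the flow from the symbol's hashCode.
    static int flowOf(String symbol) { return symbol.hashCode(); }

    // Return the state for the symbol's flow, creating it on the first message.
    FlowState stateFor(String symbol) {
        return byFlow.computeIfAbsent(flowOf(symbol), id -> factory.get());
    }

    int flowCount() { return byFlow.size(); }
}

class FlowSketch {
    public static void main(String[] args) {
        FlowStateRegistry registry = new FlowStateRegistry(FlowState::new);
        for (String symbol : new String[] {"IBM", "AAPL", "IBM"}) {
            FlowState state = registry.stateFor(symbol);
            state.sno++; // each flow keeps its own sequence number
            System.out.println(symbol + " -> sno " + state.sno);
        }
    }
}
```

Messages for the same symbol always resolve to the same state instance, so their relative order matters; messages for different symbols touch disjoint state, which is what makes it safe to process flows in parallel.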
As your application makes changes to this root object (setting fields etc), the engine will monitor the root and replicate deltas to backup members or disk.
Info |
---|
The state factory should return an empty, uninitialized object. The platform will invoke the state factory during application initialization with a null argument to determine the type of the application's root state, and will subsequently invoke the state factory on receipt of a MessageView when state has not already been created for the application. The application should not store a reference to the application state that it returns ... it is the job of the platform to manage the state tree once it has been created by either passing the state root into an EventHandler or returning it via a call to |
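The factory contract described in the note above can be sketched without any platform classes. `ToyStateFactory`, `CounterState` and `ToyStateOwner` are hypothetical; the sketch assumes only what the note states: one probing call with a null argument at initialization, a second call on the first message, and ownership of the root by the engine rather than the application.

```java
// Hypothetical interface mirroring the contract described above (not the platform type).
interface ToyStateFactory<S> {
    S createState(Object messageOrNull);
}

class CounterState {
    long counter; // left at its default: the factory returns empty state
}

// Toy engine that owns the state root; the application never caches it.
class ToyStateOwner<S> {
    private final ToyStateFactory<S> factory;
    private final Class<?> rootType;
    private S root;
    int factoryCalls;

    ToyStateOwner(ToyStateFactory<S> factory) {
        this.factory = factory;
        // Initialization-time probe with a null argument to learn the root type.
        this.rootType = invoke(null).getClass();
    }

    private S invoke(Object message) {
        factoryCalls++;
        return factory.createState(message);
    }

    Class<?> rootType() { return rootType; }

    // Hand the root to a handler, creating it on the first message only.
    S stateFor(Object message) {
        if (root == null) {
            root = invoke(message);
        }
        return root;
    }
}

class StateFactorySketch {
    public static void main(String[] args) {
        ToyStateOwner<CounterState> engine =
            new ToyStateOwner<CounterState>(msg -> new CounterState());
        engine.stateFor("first").counter++;
        engine.stateFor("second").counter++;
        System.out.println(engine.rootType().getSimpleName()
            + " counter=" + engine.stateFor("third").counter
            + " factoryCalls=" + engine.factoryCalls);
    }
}
```

The probe instance is discarded, which is one reason the factory should return empty state and the application should not hold on to what it returns.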
Inject Message Sender
If your application will send messages, it can add an injection point for the underlying AepEngine to inject a message sender.
Code Block | ||
---|---|---|
| ||
private AepMessageSender messageSender;
@AppInjectionPoint
final public void setMessageSender(AepMessageSender messageSender) {
this.messageSender = messageSender;
} |
Declare Message Handlers
When working with an application using StateReplication, the underlying AepEngine will pass in the root object for your application state along with the message. Outbound message sends and state changes are managed by Talon as an atomic unit of work.
Code Block | ||
---|---|---|
| ||
@EventHandler
final public void onMessage(Message message, Repository repository) {
    // update state
    repository.setCounter(repository.getCounter() + 1);

    // send event
    Event event = Event.create();
    event.setVal(message.getVal());
    event.setCount(repository.getCounter());
    messageSender.sendMessage("events", event);
} |
Unsolicited Sends
The Feeder operates slightly differently. Because it is the origin point of the traffic, it can't retrieve its state from the engine using a MessageView, and as mentioned previously, creation of state can only be done from an engine thread. It must therefore create its state from within an AepMessagingPrestartEvent handler. If the Feeder were to operate with multiple flows (and consequently multiple application state instances), it would need to create them all here.
Code Block |
---|
// The messaging prestart event
@EventHandler
public void onMessagingPrestart( AepMessagingPrestartEvent evt) {
//In a more complex application one might further initialize
//the state prior to returning it:
_state = _engine.getApplicationState(symbol.hashCode());
} |
...
Limitations and Upcoming Enhancements
There are a few limitations to the current state model, all of which are under consideration for a future version.
Single Parent Restriction
Currently an entity may only be placed in the state graph as the field of a single other object. Using the same object instance in the graph in multiple locations is not supported:
Code Block | ||
---|---|---|
| ||
ParentObject p1 = EntityFactory.createParentObject();
ParentObject p2 = EntityFactory.createParentObject();
ChildObject c1 = EntityFactory.createChildObject();
p1.setChild(c1);
p2.setChild(c1); //Not supported
assertTrue(p1.getChild() == p2.getChild()); //fail
|
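The restriction can be pictured as each node in the state tree knowing at most one parent. The sketch below is a hypothetical illustration only: `ToyNode` is not a generated entity, and the choice to throw an exception on a second attachment is an assumption of the sketch, not documented platform behavior.

```java
// Hypothetical state-tree node; real entities are generated by the platform's modeling tools.
class ToyNode {
    private final String name;
    private ToyNode parent;
    private ToyNode child;

    ToyNode(String name) { this.name = name; }

    // Attach a child, rejecting a node that already lives elsewhere in the tree.
    void setChild(ToyNode newChild) {
        if (newChild != null && newChild.parent != null && newChild.parent != this) {
            throw new IllegalStateException(
                newChild.name + " already has parent " + newChild.parent.name);
        }
        if (child != null) {
            child.parent = null; // detach the previous child
        }
        child = newChild;
        if (newChild != null) {
            newChild.parent = this;
        }
    }

    ToyNode getChild() { return child; }
}

class SingleParentSketch {
    public static void main(String[] args) {
        ToyNode p1 = new ToyNode("p1");
        ToyNode p2 = new ToyNode("p2");
        ToyNode c1 = new ToyNode("c1");
        p1.setChild(c1);
        try {
            p2.setChild(c1); // same instance in a second location
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        p1.setChild(null); // detach first ...
        p2.setChild(c1);   // ... then re-attaching succeeds in this sketch
    }
}
```

With a single parent per node, every change can be attributed to exactly one path from the root, which keeps delta tracking unambiguous.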
State Graph Cycles
As a corollary to the Single Parent Restriction, cycles in the object graph are not supported (including self references).
Code Block | ||
---|---|---|
| ||
ParentObject parent = EntityFactory.createParentObject();
ChildObject child = EntityFactory.createChildObject();
parent.setChild(child);
child.setParent(parent);
|
Support for graph cycles of non-field entities is planned for a future version of the platform.
Multiple Entity Fields of Same Type
Transactional entities don't currently support multiple fields of the same entity type. The following is not supported due to current limitations in the transaction machinery.
Code Block | ||
---|---|---|
| ||
ParentObject p1 = EntityFactory.createParentObject();
ChildObject c1 = EntityFactory.createChildObject();
p1.setChildA(c1);
p1.setChildB(c1);
assertTrue(p1.getChildA() == p1.getChildB()); //fail |
Inheritance
Support for Inheritance is under consideration for a future platform version.
Code Block | ||
---|---|---|
| ||
<entity name="ObjectA" factoryid="1" id="1">
<field name="field1" type="Integer"/>
</entity>
<entity name="ObjectB" factoryid="1" id="1" extends="ObjectA">
<field name="field2" type="Integer"/>
</entity>
|
Generic Collections Support and Inline Collections
Support for inline collection types will be added in a future version of the platform. These use the generic collection implementation built into the platform. Collections must be parameterized with the types that they hold by following the collection type with a bracket-enclosed, comma-separated list of the types.
Code Block | ||
---|---|---|
| ||
<entity name="MyEntity" factoryid="1" id="1">
<field name="myList" type="List{CollectionObject}"/>
<field name="mySet" type="Set{CollectionObject}"/>
<field name="someMap" type="Map{CollectionObject, MapEntry}"/>
<field name="aQueue" type="Queue{CollectionObject}"/>
</entity>
<entity name="CollectionObject" factoryid="1" id="2">
<field name="value" type="Integer" />
<field name="comment" type="String" />
</entity>
<entity name="MapEntry" factoryid="1" id="2">
<field name="value" type="Integer[]" />
</entity> |
Support for Annotation Based Modeling
The following support for annotation based state modeling is planned in an upcoming release.
Annotations
@Persist
This annotation specifies that a type should be instrumented for persistence. It supplies factory name, factoryId, type Id and transactional attributes.
@Transient
This annotation can be applied to fields in a @Persist annotated class to indicate that the field should not be instrumented for persistence. It is also possible to declare the field with the transient modifier rather than using this annotation; both have the same effect.
The Ticker Sample on Annotations
The annotated version of the Ticker sample code would look like the following. Recall from the XML sample that we modeled the application state as a single entity called ApplicationState with a sequence number of type Long and a Queue collection to hold the past ticks received by the Enricher. In the annotated analog we simply write a class with these two fields and mark it with a @Persist annotation that specifies the object factory to use for the entities that the class defines:
Code Block | ||
---|---|---|
| ||
@Persist(factoryName="TickerObjectFactory", factoryId = 3, id=1)
public class ApplicationState {
private Long sno = 0L;
private Queue<Tick> tickHistory = new LinkedBlockingQueue<Tick>();
} |
At compile time the ADM annotation processor instruments these classes and generates some additional supporting source files. Notably, it creates the source code for the TickerObjectFactory that we declared above, which will need to be registered at run time. Under the covers the code generator generates a shadow class that serves as the backing replicated object for the ApplicationState and is used to rematerialize the ApplicationState during recovery or replication.
Field Types supported for persistence
Only types supported by the ADM Model will be instrumented for replication by the annotation processor.
Annotation Processing
The annotation processor is included in the platform's adm jar. The javac compiler should automatically run the processor at compile time using the default Java 6 discovery process. If you need to control annotation processing manually, the annotation processor class is com.neeve.adm.annotations.Processor.
Configuring the Engine
Register Object Factories
When working with state replication, both the ADM message and state object factories need to be registered with the runtime. Registering the state factory allows the underlying state replication machinery to deserialize replicated state objects based on the ids encoded in the replication stream. The message factories are used for deserializing replicated outbound messages as well as messages received from message buses. The factories can be declared in your config.xml or programmatically:
Code Block | ||||
---|---|---|---|---|
| ||||
<app name="processor" mainClass="com.sample.Application">
<messaging>
<factories>
<factory name="com.sample.messages.MessageFactory"/>
</factories>
<buses>
<bus name="sample-bus">
<channels>
<channel name="requests" join="true"/>
<channel name="events" join="false"/>
</channels>
</bus>
</buses>
</messaging>
<storage enabled="true">
<factories>
<factory name="com.sample.state.StateFactory" />
</factories>
</storage>
</app> |
Code Block | ||||
---|---|---|---|---|
| ||||
@AppInjectionPoint
public void initialize(AepEngine engine) {
// for messaging
engine.registerFactory(new com.sample.messages.MessageFactory());
// for state replication:
engine.registerFactory(new com.sample.state.StateFactory());
} |
Enable Storage
To actually achieve high availability, storage must be configured for the application. The primary means of storage for Talon apps is clustered replication to a backup instance. Talon also logs state changes to a disk-based transaction log as a fallback mechanism. Storage and persistence can be enabled in the application's configuration XML.
Code Block |
---|
<app name="processor" mainClass="com.sample.Application">
...
<storage enabled="true">
....
<clustering enabled="true"/>
<persistence enabled="true">
<!--
When using Xbuf encoded entities,
detached persist is not supported.
-->
<detachedPersist enabled="false"/>
</persistence>
</storage>
</app> |
Enabling clustering allows two applications of the same name to discover one another and form an HA cluster. When instances of an application connect to one another, one instance is elected as the primary via a leadership election algorithm. The primary member will establish messaging connections and begin invoking message handlers in your application.
Enabling persistence causes the replication stream that is sent to backup instances to also be logged locally on disk to a transaction log file. The transaction log file can be used to recover application state from a cold start and is also used to initialize new cluster members that connect to it. Consequently, when clustering is enabled, persistence must be enabled as well.
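Recovery from a transaction log amounts to replaying logged changes in commit order. The following self-contained sketch shows only that idea: `ToyTransactionLog` is a hypothetical in-memory stand-in, and the assumption that a log entry is a single field/value pair is made purely for illustration and says nothing about the platform's actual log format.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a transaction log of field-level deltas.
class ToyTransactionLog {
    static final class Delta {
        final String field;
        final long value;

        Delta(String field, long value) {
            this.field = field;
            this.value = value;
        }
    }

    private final List<Delta> entries = new ArrayList<>();

    void append(String field, long value) {
        entries.add(new Delta(field, value));
    }

    // Rebuild state by applying every logged delta in commit order.
    Map<String, Long> replay() {
        Map<String, Long> state = new HashMap<>();
        for (Delta delta : entries) {
            state.put(delta.field, delta.value);
        }
        return state;
    }
}

class ColdStartSketch {
    public static void main(String[] args) {
        ToyTransactionLog log = new ToyTransactionLog();
        log.append("counter", 1);
        log.append("counter", 2);
        log.append("lastPrice", 101);
        // After a restart, replay yields the state as of the last logged change.
        System.out.println(log.replay());
    }
}
```

The same replay idea explains why a joining cluster member can be initialized from the log: applying the logged stream from the beginning brings it to the primary's current state.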
There are many configuration knobs that can be used to customize the store's behavior and performance. See DDL Config Reference for a listing of configuration knobs for storage.
Limitations and Upcoming Enhancements
See State Tree Limitations for a discussion of current state tree limitations.