In This Section

Overview

Every page should have an Overview section. This should quickly indicate to the reader whether or not they are looking at the right page. 

 

Coding For State Replication

In the ticker example, the Feeder and the Enricher both use state replication. We'll focus on the Enricher class to demonstrate how to setup state replication in an X application. The Enricher's functionality is pretty straightforward. It receives tick updates from the Feeder on the Tick channel, stores the received updates in a queue which it uses to determine the symbol's historical trend, and then enriches the received Tick messages with its calculated trend info and sends it off on the EnhancedTicks channel where the receiver is listening. The key point here is that the Enricher is operating using the ApplicationState that we modeled in the previous sections, and the platform is transparently handling replication under the covers. In the event of an application failure (such as the one the Sample Induces), processing can resume where it left off: the application state and in doubt state of any messages are transparently and atomically resolved on the backup.

The following sections walk through the code of the Enricher sample discussing the steps for configuring the engine.

Configuring the Engine

Step 1: Register Object Factories

In this sample the same factory class is used for both messages and state. An instance of the object factory has to be registered with both the MessageViewFactoryRegistry and the StoreObjectFactoryRegistry. The latter registry allows the underlying state replication machinery to deserialize replicated state objects based on the ids encoded in the replication stream while the former is used for deserializing replicated messages. In the sample this is done in a Common superclass used by all the applications since this step is common to all of them:

// for messaging
MessageViewFactoryRegistry.getInstance().registerMessageViewFactory(new ObjectFactory()); 
// for state replication:
StoreObjectFactoryRegistry.getInstance(). 
registerObjectFactory(new ObjectFactory());

Step 2: Set up the Store

The object store is what handles state replication. The native store implementation forms clusters with other stores of the same name using a discovery protocol and elects a Primary based on a peer election algorithm. In the snippet below we create a store cluster called "Enricher" (NAME), and save it to the configuration repository.

// store descriptor
StoreDescriptor storeDescriptor = StoreDescriptor.create(NAME);
storeDescriptor.setProviderConfig("native://.");
storeDescriptor.save();

Step 3: Configure the Engine Descriptor

The snippet below shows the configuration of the Enricher's engine descriptor. We create a new descriptor and set the HAPolicy to use StateReplication. When using state replicaton you must also use a MessageSendPolicy of ReplicateBeforeSend which indicates that state must be replicated before outbound messages are sent. This ensures that messages and state are replicated to backups prior to sending messages downstream. This is crucial in state replication because it ensure that inbound messages receipt and mutations to state are stabilized prior to committing the send of outbound messages that result from those mutations.

Finally, we specify a ReplicationPolicy of Pipelined to ensure that replicated state is acknowledged by our backup prior to sending the enriched message on to the Receiver to ensure that we don't produce duplicates.

AepEngineDescriptor descriptor = AepEngineDescriptor.create(NAME);
descriptor.setHAPolicy(AepEngine.HAPolicy.StateReplication);
descriptor.setMessageSendPolicy(AepEngine.MessageSendPolicy.ReplicateBeforeSend);
descriptor.setReplicationPolicy(ReplicationPolicy.Pipelined);
descriptor.setStore(NAME);

 

Unsolicited Senders

For applications that use state replication with an unsolicited sender (i.e. those not done from a message handler) such as the Feeder app, the application should additionally configure the engine to replicate unsolicited sends so that state and messages are replicated for the sender. The Feeder is such an application, so it additionally sets this value to true:

descriptor.setReplicateUnsolicitedSends(true);

Step 4: Provide a StateFactory

An AEPEngine is application state agnostic: it deals with state only abstractly as a replicated graph of object graph node updates using the underlying store. As such it needs to be able to create new application state during the processing of messages. To facilitate this, an engine's constructor takes a IApplicationStateFactory parameter as shown from the Enricher snippet below.

_engine = AepEngine.create(descriptor,
    new IAepApplicationStateFactory() {
       final public ApplicationState createState(final MessageView view) {
         return ApplicationState.create();
       }
    },
    handlers, null, null); 

Configuration Summary

The table below lists the configuration options we have discussed so far:

Table 4: Configuration Options

Configuration Setting

Usage

HAPolicy

StateReplication. Indicates that state replication should be used.

ReplicationPolicy

Asynchronous|Pipelined 

In Asynchronous mode, replicated state does not wait for acknowledgement from peers or flush to persistent storage.
Pipelined mode ensures that data is acknowledged by peers and flushed to disk (if persistence is configured)

Store

The name of the store to use.

This must be supplied to enable Persistence or if StateReplication is to be used.
The store implementation handles replicating state to cluster peers and (optionally) persistence of the state.

ReplicateUnsolicitedSends

True|False
An unsolicited send is one that is not initiated from a message handler. In an app that acts purely as a sender,
this should be set to true to replicate both the sent application messages and the updates to state not done by a message handler.

Creation of Application State

Application state is always created by the AEP engine and is created in one of three different ways:

  1. By a primary engine when it receives the first message of a flow in which case it utilizes the application provided IApplicationStateFactory.
  2. By a backup engine when application state is replicated from a primary.
  3. Explicitly by an application on an AepMessagingPrestartEvent handler.

There are two constraints that need to be met when the state is created explicitly by the application

  1. For performance reasons, the state creation is not thread safe. Thus, the state must be created before the engine's messaging has been started.
  2. The state needs to be created after the engine has bound to the applications's ODS store. This needs to be done so that the created state is attached to the opened store.

The first constraint implies that the state must be created before the engine has started messaging. The second constraint implies that the state needs to be created during or after start (since the engine opens the store during start()) i.e. the only time the state can be explicitly created by the application is between start() and the receipt of the messaging started event. The messaging prestart event is such a point in the flow.

Interacting With Application State

From a message handler

The below snippet comes from the Enricher message handler. From with a message handler application state should be retrieved using the Engine's getApplicationState(MessageView) accessor which retrieves the state associated with the message's flow. In the Ticker sample the flow is defined by the hashCode of the symbol and is set by the Feeder. Flows define the order in which messages are processed in the system. Flows essentially partition traffic into ordered parallel processed streams and, therefore, enable parallelized, concurrent message processing.
Under the covers the Engine ensures that both messages and state are replicated to the backup to prevent loss or duplication in the event of a failure.

//to the enriched channel with a trend.
ApplicationState state = _engine.getApplicationState(tick); 
state.setSno(state.getSno() + 1); 
state.getTickHistory().add(tick); 
.. 
_engine.sendMessage({_}enrichChannel, enriched); 
//Clear out older entries:
if(state.getTickHistory().size() > HISTORY_SIZE) {
    state.getTickHistory().remove();
}

Unsolicited Sends

The Feeder operates slightly differently. Because it is the origin point of the traffic it can't retrieve its state from the Engine using a MessageView, but as mentioned previously Creation of State can only be done from an engine thread. It must therefore create its state from within an AepMessagingPrestartEvent handler. If the Feeder were to operate with multiple flows (and consequently multiple application state instances, it would need to create them all here.

// The messaging prestart event
@EventHandler
public void onMessagingPrestart( AepMessagingPrestartEvent evt) {
	//In a more complex application one might futher initialize
    //The state prior to returning it:
    _state = _engine.getApplicationState(symbol.hashCode()); 
}


The sample application is simple and operates with a single flow based on the symbol. For simplicity we use the hashcode of the symbol name to map this to an integer id representing the flow. The sender creates the state and stores it in a private variable _state for use during runtime. The created state is not threadsafe, so in this context in can only be used by the feeder's thread will sending its messages.

Limitations and Upcoming Enhancements

See State Graph Limitations for a discussion of current state graph limitations.