The X Platform provides the ability to persist and replicate application state. A goal of the platform is to make managing state no more of a burden on developers than working with the POJOs (Plain Old Java Objects) that represent the application state model. Under the covers, the platform instruments the code to handle persistence and replication of those objects. In the following sections the terms persist and replicate may be used interchangeably; whether state is replicated, persisted, or both is a runtime configuration option.
The platform's Application Data Modeler (ADM) currently provides two approaches to instrumenting an application's state model: XML Based Modeling and Programmatic Code Generation. XML based modeling allows state to be modeled in XML and allows entity and message source code to be generated by build tools. Alternatively, developers can work directly with the ADM API and Model to generate state, though this is a more advanced approach that isn't recommended without support from Neeve.
Each flow within a Talon server application is associated with a single object that serves as the root of an application's state graph comprising the entirety of the application's state. The platform transparently handles replication and persistence of this state graph. When fields in the application state are changed, the platform replicates only the deltas in the graph.
An important part of designing horizontally scalable applications with the X Platform is the partitioning of message traffic into flows that are processed in parallel. Flows define the processing order of messages and, in doing so, define the extent of message processing concurrency, since each flow is single threaded in nature. However, being able to shard the traffic into concurrent flows is of no use if the pieces of application state on which the flows operate intersect. For seamless scalability and true concurrent processing, the application state itself should be partitioned so that each flow operates on a piece of state that is as exclusive to it as possible. The Talon server's execution model relies on this for scaling and concurrency.
The Talon server associates an object graph instance with each flow that it manages. Inbound messages are handed for processing to the server with the instance of the object graph associated with the inbound message's flow. From a design perspective, this translates to deciding the object graph model for the inbound message flows. Applications provide the platform with a StateFactory instance for creating initial application state. At application startup, if no persistent state is yet associated with a flow, the state factory provides the initial state to the platform.
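As a rough illustration of the partitioning described above, the plain-Java sketch below keeps one state object per flow and asks a factory for the initial state the first time a flow is seen. It does not use the platform's API: the class name FlowStateTable, its methods, and the choice of a key's hashCode as the flow id are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical per-flow state table; illustrative only, not an X Platform class. */
final class FlowStateTable<S> {
    private final Map<Integer, S> stateByFlow = new HashMap<>();
    private final Supplier<S> stateFactory;

    FlowStateTable(Supplier<S> stateFactory) {
        this.stateFactory = stateFactory;
    }

    /** Maps a partitioning key (for example a ticker symbol) to a flow id. */
    static int flowFor(String key) {
        return key.hashCode();
    }

    /** Returns the state for a flow, creating it through the factory on first use. */
    S stateFor(int flowId) {
        return stateByFlow.computeIfAbsent(flowId, id -> stateFactory.get());
    }

    /** Number of flows that currently have state. */
    int flowCount() {
        return stateByFlow.size();
    }
}
```

Because each flow only ever touches its own entry, two flows never share mutable state in this sketch, which is the property the execution model depends on.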
In the following sections, this guide will begin with an overview of the ADM Model and its supported types and modeling constructs. Following that it explains how to create an application state model, and how to use it in an X Application utilizing the Enriched Ticker Sample shipped with the platform's samples.
This section discusses the building blocks of the ADM State Model.
The X Platform supports the following built in types which can be used as fields when building composite entity types:
Table 1: Primitive Types
|
Table 2: Built in Types
|
The following built in collection types are supported (where Entity is an X Entity, Message, Enum, primitive or built in type):
Table 3: Collection Types
|
Currently, non-array collection types only support inclusion of other entity types. Support for primitive and built in types is planned for a future date.
†Support for generic List and Map is currently scheduled for release in a future version.
The X Platform supports serialization of enum types and does so by using the ordinal value of the generated enumeration. Therefore the ordering and number of fields are important. Removing a field or changing the order of fields may break compatibility on upgrade when deserializing the enumeration. When modeling an enumeration in XML you must associate a code type with it that serves as the backing constant value. Supported code types are String, char, and int.
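To see why ordering matters, the plain-Java sketch below mimics an enumeration with an int code that is written to the wire by ordinal. It is a hand-written stand-in rather than ADM-generated code; the ChangeType name is borrowed from the Ticker sample later in this guide, and the toWire and fromWire methods are assumptions.

```java
/** Hand-written stand-in for a generated enumeration with an int code type. */
enum ChangeType {
    UP(0), DOWN(1), FLAT(2);

    private final int code;

    ChangeType(int code) {
        this.code = code;
    }

    /** The backing constant value declared in the model. */
    int getCode() {
        return code;
    }

    /** Ordinal-based encoding: the value written is the constant's position. */
    int toWire() {
        return ordinal();
    }

    /** Ordinal-based decoding: only correct while positions are unchanged. */
    static ChangeType fromWire(int ordinal) {
        return values()[ordinal];
    }
}
```

If DOWN were removed or moved in a later version, a previously written value of 1 would decode to a different constant (or fail), which is the upgrade hazard described above.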
Composite data types are defined as entities which can contain fields that are primitive, built in, collection, enumerations, entity or message types.
An application's state is represented as an object graph in which entities represent the nodes. The platform replicates the deltas to this object graph as changes are made to the nodes in the graph. When an entity is set as a field of another object it constitutes an addition to the object graph which is then replicated to other peers. Similarly when a field of an entity is removed (set to null) it constitutes a removal which is then replicated to the peers and possibly persisted.
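The idea of replicating deltas rather than the whole graph can be sketched in plain Java as follows. This is not how the platform instruments entities; TrackedNode and its string-based delta log are hypothetical and only show that each setter call yields one small change record.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative node that records field changes as deltas; not a platform class. */
final class TrackedNode {
    private final List<String> pendingDeltas = new ArrayList<>();
    private Long sno;
    private TrackedNode child;

    void setSno(Long value) {
        sno = value;
        pendingDeltas.add("UPDATE sno=" + value);
    }

    void setChild(TrackedNode value) {
        // Setting an entity field adds to the graph; setting null removes from it.
        pendingDeltas.add(value == null ? "REMOVE child" : "ADD child");
        child = value;
    }

    Long getSno() {
        return sno;
    }

    TrackedNode getChild() {
        return child;
    }

    /** Returns the deltas accumulated since the last call and clears them. */
    List<String> drainDeltas() {
        List<String> out = new ArrayList<>(pendingDeltas);
        pendingDeltas.clear();
        return out;
    }
}
```

A replication layer built along these lines would ship only the drained records to its peers after each unit of work.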
An entity that serves as the root of an object graph can be marked as root which indicates it will not have any parents in the object tree. A root object should not be declared as the child of another object. Application provided state should not be defined as root objects as the server appends them to an underlying root object internally.
An entity type can be declared as a field which marks it as a leaf in its object graph, and therefore may not contain other entities that are not also declared as fields.
An entity type can be marked as transactional. The platform tracks field updates to transactional entities and can roll back changes made to them in the event of an application exception or an explicit rollback. By default entities and collections are transactional while messages are not. A message can be declared transactional if it will be stored in application state.
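One common way to support rollback of field updates is an undo log, sketched below in plain Java. The platform's actual transaction machinery is not described in this guide, so UndoableCounter and its commit and rollback methods are purely illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative transactional entity using an undo log; not the platform's mechanism. */
final class UndoableCounter {
    private final Deque<Runnable> undoLog = new ArrayDeque<>();
    private long sno;

    void setSno(long value) {
        final long previous = sno;
        undoLog.push(() -> sno = previous); // remember how to undo this write
        sno = value;
    }

    long getSno() {
        return sno;
    }

    /** Keeps the changes and forgets how to undo them. */
    void commit() {
        undoLog.clear();
    }

    /** Reverts uncommitted writes, most recent first. */
    void rollback() {
        while (!undoLog.isEmpty()) {
            undoLog.pop().run();
        }
    }
}
```

In this sketch a handler that throws partway through would call rollback(), leaving the entity exactly as it was at the last commit.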
Messages are a special type of entity that additionally implement and expose a MessageView. Messages can contain primitive, built-in, collection, and entity fields, but may not contain entity fields (or collection members) that are not themselves declared as fields.
Message fields can be declared as 'reflectable', which enables the field to be accessed through a Message's MapMessageView. This is useful for making parts of the message generically available via String key lookup and can be used, for example, for message key resolution by a message channel.
The restrictions for entity types outlined above also apply to message types. A message behaves like an entity type that is declared with asField=true.
The following sections explore the implementation of the enriched ticker sample which is distributed with the platform samples. This sample demonstrates the use of state replication in a simple stock ticker application in which there are 3 parties:
The X Platform can generate persistent classes for use with the runtime from an XML model that conforms to the modeling schema x-adml.xsd, which is included at the root of nvx-adm-<version>.jar and is also bundled into nvx-all.jar. Be sure to update your editor's schema validator to reference it.
Below is an empty model file.
    <?xml version="1.0"?>
    <model namespace="com.example.myapp"
           xmlns="http://www.neeveresearch.com"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
      <import model="../other/model.xml"/>
      <factories>
        <factory name="MyObjectFactory" id="1" />
      </factories>
      <enumerations>
        <!-- Enumerations -->
      </enumerations>
      <messages>
        <!-- Messages -->
      </messages>
      <entities>
        <!-- Entity Definitions -->
      </entities>
      <collections>
        <!-- Collection Definitions -->
      </collections>
    </model>
Note the "namespace" declaration of com.example.myapp. This will be used as the target package for the generated source files.
The import statement allows you to import another model from a different namespace to use with this model. Imported definitions can then be used within the model, though you may have to provide fully qualified names such as com.example.other.Object when referencing types if there are naming conflicts.
The factories section defines object factories that are used to instantiate the generated objects. Each factory in an application must have its own unique factory id. The id is serialized along with the object and is used to reconstitute the object during deserialization.
In the Ticker example we define a single factory that is used to instantiate both Messages and Entities. However, it is perfectly valid to create any number of factories and to separate out the message and entity factories. It is common to separate the state model from the message model, and in that case there would be separate factories. In the Ticker example the state model references the message model. If we had separated the two, we would have used an import statement to pull in the message model for use in the state model.
    <factories>
      <factory name="ObjectFactory" id="1"/>
    </factories>
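The role of the ids during deserialization can be pictured with the plain-Java sketch below: a registry maps the ids read from the serialized form back to something that can create the right object. TypeRegistry is hypothetical, and keying on the pair of factory id and type id is an assumption made for the example rather than a description of the platform's internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative registry keyed by factory id and type id; not a platform class. */
final class TypeRegistry {
    private final Map<Long, Supplier<?>> creators = new HashMap<>();

    private static long key(int factoryId, int typeId) {
        return ((long) factoryId << 32) | (typeId & 0xFFFFFFFFL);
    }

    /** Registers a creator; duplicate (factoryId, typeId) pairs are rejected. */
    void register(int factoryId, int typeId, Supplier<?> creator) {
        if (creators.putIfAbsent(key(factoryId, typeId), creator) != null) {
            throw new IllegalArgumentException(
                "duplicate id " + typeId + " in factory " + factoryId);
        }
    }

    /** Recreates an object from the ids that were read from the serialized form. */
    Object create(int factoryId, int typeId) {
        Supplier<?> creator = creators.get(key(factoryId, typeId));
        if (creator == null) {
            throw new IllegalStateException("no type registered for these ids");
        }
        return creator.get();
    }
}
```

The duplicate check reflects why ids must be unique: two types sharing an id could not be told apart when reading serialized data back.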
Enumeration types can be modeled as follows and can be declared as fields of entity or message types.
In the Ticker sample, we define an enumeration that is used to indicate the current trend of the symbol as calculated by the Enricher for enriched ticks.
    <enumerations>
      <enum name="ChangeType" type="int">
        <const name="UP" value="0"/>
        <const name="DOWN" value="1"/>
        <const name="FLAT" value="2"/>
      </enum>
    </enumerations>
Generated Enumerations can specify a type which is accessible in the generated code. The type can be int, char or String.
A const should not be removed from an enumeration if the newly generated code is expected to deserialize enums that were persisted with an earlier version. In addition, the order of constants is important and should not be changed.
Messages are defined in the messages element and must have a unique id within the scope of an application. To preserve compatibility with objects serialized with the old code, fields should not be added, removed, renamed, or reordered.
In the Ticker sample we define a single Tick message that represents a stock's symbol, sequence number and current price, as well as a trend field that is filled in by the Enricher to indicate the symbol's current trend. We could just as well have defined another Message type called EnrichedTick, but didn't in order to keep the sample concise.
    <messages>
      <message name="Tick" factoryid="1" id="1">
        <field name="sno" type="Long" />
        <field name="symbol" type="String" />
        <field name="price" type="Float"/>
        <field name="trend" type="ChangeType" />
      </message>
    </messages>
Entities are defined in the entities element of the model and must have a unique id within the scope of the application. To preserve compatibility with objects serialized with the old code, fields should not be added, removed, renamed, or reordered.
In the Ticker example we define a single entity to be used as the application state. To keep things simple the same application state object is used across all the participants, though only the Enricher uses the tickHistory field (discussed below). In practice the entity representing the application state can be any entity.
    <entities>
      <entity name="ApplicationState" factoryid="1" id="2">
        <field name="tickHistory" type="TickQueue"/>
        <field name="sno" type="Long"/>
      </entity>
    </entities>
Collections are defined in the collections element and must also have a unique id within the scope of the application. You can declare collections in two ways: either inline as the type in an entity declaration or as their own types.
It is also possible to declare a collection as its own named type, which generates a collection class based on the name provided.
    <collections>
      <collection name="MyQueue" is="Queue" contains="MyEntity" factoryid="1" id="5" />
      <collection name="MySet" is="Set" contains="MyEntity" factoryid="1" id="6" />
      <collection name="MyLongMap" contains="MyEntity" is="LongMap" factoryid="1" id="7" />
      <collection name="MyStringMap" contains="CollectionObject" is="LongMap" factoryid="1" id="8" />
    </collections>
    <entity name="MyEntity" factoryid="1" id="1">
      <field name="aQueue" type="MyQueue"/>
      <field name="aMap" type="MyStringMap"/>
    </entity>
In the Ticker Sample the Enricher uses the TickQueue defined below to store the last 100 received Ticks over which it performs a trend analysis. For simplicity we're storing the received messages but we could have perhaps chosen to define a separate entity that would have a smaller serialization footprint instead.
    <collections>
      <collection name="TickQueue" is="Queue" contains="Tick" factoryid="1" id="3"/>
    </collections>
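For intuition about what the Enricher does with such a queue, here is a plain-Java sketch of a bounded price history with a simple trend rule. It does not use the generated TickQueue class, and the rule shown (compare the newest retained price with the oldest) is an assumption; the sample's actual trend analysis may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative bounded price history; the trend rule here is an assumption. */
final class TickHistory {
    private final Deque<Float> prices = new ArrayDeque<>();
    private final int capacity;

    TickHistory(int capacity) {
        this.capacity = capacity;
    }

    /** Appends a price and evicts the oldest entry once capacity is exceeded. */
    void add(float price) {
        prices.addLast(price);
        if (prices.size() > capacity) {
            prices.removeFirst();
        }
    }

    int size() {
        return prices.size();
    }

    /** Compares newest and oldest retained prices: "UP", "DOWN", or "FLAT". */
    String trend() {
        if (prices.size() < 2) {
            return "FLAT";
        }
        int cmp = Float.compare(prices.peekLast(), prices.peekFirst());
        return cmp > 0 ? "UP" : cmp < 0 ? "DOWN" : "FLAT";
    }
}
```

Storing only prices, as this sketch does, is one way to get the smaller serialization footprint mentioned above compared with storing whole messages.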
Code generation is accomplished using the AdmCodeGenerator class distributed with the ADM module (nvx-adm-<version>.jar).
ADM code generation currently supports generating classes in the following encoding types:
To run the generator with java you can use:
    java -cp nvx-adm-<version>.jar com.neeve.tools.AdmCodeGenerator -f <model-file>.xml
or you can use the platform jar which contains adm:
    java -cp nvx-platform-<version>.jar com.neeve.tools.AdmCodeGenerator -f <model-file>.xml
To use the code generator with Maven add the following to your pom's plugin section:
    <plugins>
      <plugin>
        <groupId>com.neeve</groupId>
        <artifactId>nvx-codegen-plugin</artifactId>
        <version>1.3</version>
        <executions>
          <execution>
            <id>ADM</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>generate</goal>
            </goals>
            <configuration>
              <modelFile>{path-to-model}/model.xml</modelFile>
              <!-- Json, Protobuf, or Native -->
              <encodingType>Json</encodingType>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
To use the code generator with Ant, define a target such as the following:
    <target name="gen-model-sources">
      <java classname="com.neeve.tools.AdmCodeGenerator"
            fork="true"
            failonerror="true"
            jvm="${build.java.home}/bin/java"
            classpathref="build.classpath">
        <arg value="-e"/>
        <arg value="Protobuf"/>
        <arg value="-p"/>
        <arg value="${src.java}"/>
        <arg value="-f"/>
        <arg value="${model.dir}/model.xml"/>
        <arg value="-o"/>
        <arg value="${src.java}"/>
      </java>
    </target>
In cases where it isn't feasible to use XML based modeling, it is possible to use the ADM API and its Model directly to generate model source code. In this case you should contact Neeve for assistance, as this is only recommended for advanced users with access to support.
In the ticker example, the Feeder and the Enricher both use state replication. We'll focus on the Enricher class to demonstrate how to set up state replication in an X application. The Enricher's functionality is straightforward. It receives tick updates from the Feeder on the Tick channel, stores the received updates in a queue which it uses to determine the symbol's historical trend, and then enriches the received Tick messages with its calculated trend info and sends them off on the EnhancedTicks channel where the receiver is listening. The key point here is that the Enricher operates on the ApplicationState that we modeled in the previous sections, and the platform transparently handles replication under the covers. In the event of an application failure (such as the one the sample induces), processing can resume where it left off: the application state and the in-doubt state of any messages are transparently and atomically resolved on the backup.
The following sections walk through the code of the Enricher sample discussing the steps for configuring the engine.
In this sample the same factory class is used for both messages and state. An instance of the object factory has to be registered with both the MessageViewFactoryRegistry and the StoreObjectFactoryRegistry. The latter registry allows the underlying state replication machinery to deserialize replicated state objects based on the ids encoded in the replication stream while the former is used for deserializing replicated messages. In the sample this is done in a Common superclass used by all the applications since this step is common to all of them:
    // for messaging
    MessageViewFactoryRegistry.getInstance().registerMessageViewFactory(new ObjectFactory());
    // for state replication
    StoreObjectFactoryRegistry.getInstance().registerObjectFactory(new ObjectFactory());
The object store is what handles state replication. The native store implementation forms clusters with other stores of the same name using a discovery protocol and elects a Primary based on a peer election algorithm. In the snippet below we create a store cluster called "Enricher" (NAME), and save it to the configuration repository.
    // store descriptor
    StoreDescriptor storeDescriptor = StoreDescriptor.create(NAME);
    storeDescriptor.setProviderConfig("native://.");
    storeDescriptor.save();
The snippet below shows the configuration of the Enricher's engine descriptor. We create a new descriptor and set the HAPolicy to use StateReplication. When using state replication you must also use a MessageSendPolicy of ReplicateBeforeSend, which indicates that state must be replicated before outbound messages are sent. This ensures that messages and state are replicated to backups prior to sending messages downstream. This is crucial in state replication because it ensures that the receipt of inbound messages and mutations to state are stabilized prior to committing the send of outbound messages that result from those mutations.
Finally, we specify a ReplicationPolicy of Pipelined so that replicated state is acknowledged by our backup before the enriched message is sent on to the Receiver, which ensures that we don't produce duplicates.
    AepEngineDescriptor descriptor = AepEngineDescriptor.create(NAME);
    descriptor.setHAPolicy(AepEngine.HAPolicy.StateReplication);
    descriptor.setMessageSendPolicy(AepEngine.MessageSendPolicy.ReplicateBeforeSend);
    descriptor.setReplicationPolicy(ReplicationPolicy.Pipelined);
    descriptor.setStore(NAME);
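The ordering that these policies are meant to guarantee can be sketched in plain Java as follows. This is a toy model rather than the engine's implementation: CommitPipeline, its journal, and the onBackupAck callback are all hypothetical and exist only to show that sends are released after replication has been acknowledged.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative commit ordering for "replicate before send"; not the engine's code. */
final class CommitPipeline {
    private final List<String> pendingSends = new ArrayList<>();
    private final List<String> journal = new ArrayList<>();
    private boolean awaitingAck;

    /** Outbound messages are held back rather than sent immediately. */
    void send(String message) {
        pendingSends.add(message);
    }

    /** Replicates the state change first and then waits for the backup. */
    void commit(String stateDelta) {
        journal.add("replicate:" + stateDelta);
        awaitingAck = true;
    }

    /** Releases held messages only once the backup has acknowledged. */
    void onBackupAck() {
        if (!awaitingAck) {
            return;
        }
        awaitingAck = false;
        for (String message : pendingSends) {
            journal.add("send:" + message);
        }
        pendingSends.clear();
    }

    /** Ordered record of what happened, for inspection. */
    List<String> journal() {
        return journal;
    }
}
```

If the primary fails before the acknowledgement arrives, nothing has gone downstream yet in this model, so the backup can take over without the receiver seeing a duplicate.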
Applications that use state replication and perform unsolicited sends (i.e. sends not done from a message handler) should additionally configure the engine to replicate unsolicited sends so that state and messages are replicated for the sender. The Feeder is such an application, so it additionally sets this value to true:
    descriptor.setReplicateUnsolicitedSends(true);
An AepEngine is application state agnostic: it deals with state only abstractly as a replicated graph of object graph node updates using the underlying store. As such it needs to be able to create new application state during the processing of messages. To facilitate this, the engine's create method takes an IAepApplicationStateFactory parameter, as shown in the Enricher snippet below.
    _engine = AepEngine.create(descriptor, new IAepApplicationStateFactory() {
        final public ApplicationState createState(final MessageView view) {
            return ApplicationState.create();
        }
    }, handlers, null, null);
The table below lists the configuration options we have discussed so far:
Table 4: Configuration Options
|
Application state is always created by the AEP engine and is created in one of three different ways:
There are two constraints that need to be met when the state is created explicitly by the application
The first constraint implies that the state must be created before the engine has started messaging. The second constraint implies that the state needs to be created during or after start, since the engine opens the store during start(). In other words, the only time the state can be explicitly created by the application is between start() and the receipt of the messaging started event. The messaging prestart event is such a point in the flow.
The snippet below comes from the Enricher message handler. From within a message handler, application state should be retrieved using the Engine's getApplicationState(MessageView) accessor, which retrieves the state associated with the message's flow. In the Ticker sample the flow is defined by the hashCode of the symbol and is set by the Feeder. Flows define the order in which messages are processed in the system. Flows essentially partition traffic into ordered streams that are processed in parallel and therefore enable concurrent message processing.
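The window in which explicit creation is allowed can be modelled as a small state machine. The plain-Java sketch below is only a mental model: EngineLifecycle, its phases, and its method names are hypothetical and are not part of the engine's API.

```java
/** Illustrative lifecycle guard for explicit state creation; not the engine's API. */
final class EngineLifecycle {
    enum Phase { CREATED, STARTED, MESSAGING_STARTED }

    private Phase phase = Phase.CREATED;

    /** In this model the store is open from this point on. */
    void start() {
        phase = Phase.STARTED;
    }

    /** After this point messages may already be flowing. */
    void messagingStarted() {
        phase = Phase.MESSAGING_STARTED;
    }

    /** Explicit creation is allowed only after start() and before messaging starts. */
    boolean canCreateStateExplicitly() {
        return phase == Phase.STARTED;
    }
}
```

A messaging prestart handler runs while this model is in the STARTED phase, which is why it is a suitable place to create state explicitly.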
Under the covers the Engine ensures that both messages and state are replicated to the backup to prevent loss or duplication in the event of a failure.
    //to the enriched channel with a trend.
    ApplicationState state = _engine.getApplicationState(tick);
    state.setSno(state.getSno() + 1);
    state.getTickHistory().add(tick);
    ..
    _engine.sendMessage(_enrichChannel, enriched);

    // Clear out older entries:
    if (state.getTickHistory().size() > HISTORY_SIZE) {
        state.getTickHistory().remove();
    }
The Feeder operates slightly differently. Because it is the origin point of the traffic, it can't retrieve its state from the Engine using a MessageView, and, as mentioned previously, state can only be created from an engine thread. It must therefore create its state from within an AepMessagingPrestartEvent handler. If the Feeder were to operate with multiple flows (and consequently multiple application state instances), it would need to create them all here.
    // The messaging prestart event
    @EventHandler
    public void onMessagingPrestart(AepMessagingPrestartEvent evt) {
        // In a more complex application one might further initialize
        // the state prior to returning it:
        _state = _engine.getApplicationState(symbol.hashCode());
    }
The sample application is simple and operates with a single flow based on the symbol. For simplicity we use the hashcode of the symbol name to map this to an integer id representing the flow. The sender creates the state and stores it in a private variable _state for use during runtime. The created state is not threadsafe, so in this context it can only be used by the Feeder's thread while sending its messages.
There are a few limitations to the current state model, all of which are under consideration for a future version.
Currently an entity may only be placed in the state graph as the field of a single other object. Using the same object instance in the graph in multiple locations is not supported:
    ParentObject p1 = EntityFactory.createParentObject();
    ParentObject p2 = EntityFactory.createParentObject();
    ChildObject c1 = EntityFactory.createChildObject();
    p1.setChild(c1);
    p2.setChild(c1); // Not supported
    assertTrue(p1.getChild() == p2.getChild()); // fail
As a corollary to the Single Parent Restriction, cycles in the object graph are not supported (including self references).
    ParentObject parent = EntityFactory.createParentObject();
    ChildObject child = EntityFactory.createChildObject();
    parent.setChild(child);
    child.setParent(parent);
Support of graph cycles of non field entities is planned for a future version of the platform.
Transactional entities don't currently support multiple fields of the same entity type. The following is not supported due to current limitations in the transaction machinery.
    ParentObject p1 = EntityFactory.createParentObject();
    ChildObject c1 = EntityFactory.createChildObject();
    p1.setChildA(c1);
    p1.setChildB(c1);
    assertTrue(p1.getChildA() == p1.getChildB()); // fail
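Until such support exists, an application can guard against these cases itself. The plain-Java sketch below shows one way to enforce a single parent and reject cycles when linking nodes. GraphNode is hypothetical, and throwing an exception is an assumption of the sketch; this guide does not specify how the platform reacts to an unsupported graph shape.

```java
/** Illustrative graph node enforcing a single parent and no cycles; not platform code. */
final class GraphNode {
    private GraphNode parent;
    private GraphNode child;

    GraphNode getParent() {
        return parent;
    }

    GraphNode getChild() {
        return child;
    }

    /** Attaches a child, rejecting shared instances and cycles. */
    void setChild(GraphNode newChild) {
        if (newChild != null) {
            if (newChild.parent != null && newChild.parent != this) {
                throw new IllegalStateException("node already has a parent");
            }
            // Walk up from this node; meeting newChild means it is an ancestor (or this node).
            for (GraphNode n = this; n != null; n = n.parent) {
                if (n == newChild) {
                    throw new IllegalStateException("cycle in object graph");
                }
            }
        }
        if (child != null) {
            child.parent = null; // detach the previous child
        }
        child = newChild;
        if (newChild != null) {
            newChild.parent = this;
        }
    }
}
```

Detaching a node from its current parent (by setting that field to null) makes it legal to attach elsewhere, which mirrors the remove-then-add pattern the state graph expects.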
Support for Inheritance is under consideration for a future platform version.
    <entity name="ObjectA" factoryid="1" id="1">
      <field name="field1" type="Integer"/>
    </entity>
    <entity name="ObjectB" factoryid="1" id="2" extends="ObjectA">
      <field name="field2" type="Integer"/>
    </entity>
Support for inline collection types will be added in a future version of the platform. These use the generic collection implementation built into the platform. Collections must be parameterized with the types that they hold by following the collection type with a bracket-enclosed, comma-separated list of the types.
    <entity name="MyEntity" factoryid="1" id="1">
      <field name="myList" type="List{CollectionObject}"/>
      <field name="mySet" type="Set{CollectionObject}"/>
      <field name="someMap" type="Map{CollectionObject, MapEntry}"/>
      <field name="aQueue" type="Queue{CollectionObject}"/>
    </entity>
    <entity name="CollectionObject" factoryid="1" id="2">
      <field name="value" type="Integer" />
      <field name="comment" type="String" />
    </entity>
    <entity name="MapEntry" factoryid="1" id="3">
      <field name="value" type="Integer[]" />
    </entity>
The following support for annotation based state modeling is planned in an upcoming release.
This annotation specifies that a type should be instrumented for persistence. It supplies factory name, factoryId, type Id and transactional attributes.
This annotation can be applied to fields in a @Persistent annotated class to indicate that the field should not be instrumented for persistence. It is also possible to declare the field with the transient modifier rather than using this annotation; both have the same effect.
The annotated version of the Ticker sample code would look like the following. Recall from the XML sample that we modeled the application state as a single entity called ApplicationState with a sequence number of type Long and a Queue collection to hold the past ticks received by the Enricher. In the annotated analog we simply write a class with these two fields and mark it with a @Persist annotation that specifies the object factory to use for the entities that the class defines:
    @Persist(factoryName="TickerObjectFactory", factoryId = 3, id=1)
    public class ApplicationState {
        private Long sno = 0L;
        private Queue<Tick> tickHistory = new LinkedBlockingQueue<Tick>();
    }
At compile time the ADM annotation processor instruments these classes and generates some additional supporting source files. Notably, it creates the source code for the TickerObjectFactory that we declared above, which will need to be registered at run time. Under the covers the code generator generates a shadow class that serves as the backing replicated object for the ApplicationState and is used to rematerialize the ApplicationState during recovery or replication.
Only types supported by the ADM Model will be instrumented for replication by the annotation processor.
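Since the annotations are not yet released, the following plain-Java sketch only approximates the field selection rule described above. It defines its own stand-in @Persist annotation (attribute names copied from the sample) and uses runtime reflection instead of compile-time annotation processing, so it should not be read as the ADM processor's behavior; AnnotatedState and PersistentFieldScanner are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the planned annotation; attribute names follow the sample. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Persist {
    String factoryName();
    int factoryId();
    int id();
}

@Persist(factoryName = "TickerObjectFactory", factoryId = 3, id = 1)
class AnnotatedState {
    private Long sno = 0L;
    private transient String scratch = ""; // transient, so excluded from persistence
}

/** Lists the fields a processor might instrument: non-static and non-transient. */
final class PersistentFieldScanner {
    static List<String> persistentFields(Class<?> type) {
        List<String> names = new ArrayList<>();
        if (!type.isAnnotationPresent(Persist.class)) {
            return names; // unannotated types are left alone
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (!Modifier.isStatic(mods) && !Modifier.isTransient(mods) && !field.isSynthetic()) {
                names.add(field.getName());
            }
        }
        return names;
    }
}
```

For AnnotatedState the scanner reports only sno, because scratch carries the transient modifier.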
The annotation processor is included in the platform's ADM jar. The javac compiler should automatically run the processor at compile time using the default Java 6 discovery process. If you need to manually control annotation processing, the annotation processor class is com.neeve.adm.annotations.Processor.