In This Section
In this section, we walk through a simple Talon app. The app performs the following function:
- It maintains a running counter of messages received.
- It receives messages, increments the counter, and sends an outbound message containing the counter.
The Starter App
Click here to get and build the starter app that you use to follow this tutorial. The starter app project contains the processor micro app and a pair of driver micro apps (the sender and receiver drivers). The sender sends messages. The processor receives each message, increments the message count it stores in its state, and sends an outbound event, which the receiver receives. The sender is configured to send 10k messages at 1k messages/sec.
A Talon App In a Nutshell
A Talon micro app consists of the following:
- An XML describing the app state model
- An XML describing the app messages
- Main Class
  - A Java class file designated as the app's "main" class i.e. the class that Talon identifies the application by and instantiates as part of the initializing of the application
  - Injection/Query Points
    - Methods invoked by the Talon runtime at various points during the lifecycle of an application.
  - Event Handlers
    - A bunch of annotated methods to handle notification events dispatched by the Talon runtime.
  - Message Handlers
    - A bunch of annotated methods to handle inbound messages
    - Message handlers contain the application's functional logic
- An XML (DDL) containing Talon and, optionally, application configuration.
The following is the starter app state model
The above describes a state model containing a single root Repository object that, in turn, has a single counter field of type long.
The following is the starter app message model
The above model describes two messages - the Message message and the Event message.
- Message has a single count field of type long
- Event has a count field of type long and a comment field of type String
In a multi-agent system, message definitions are often a shared artifact. Although in the starter app, the messages are defined in the app's project, Talon supports defining messages in a separate project and making the messages available to the app via Maven dependencies. In this way, message definitions shared by various agents in the multi-agent system can be defined in a central place and imported by the various agents
The above is the app's main class.
- The onMessage() method is the handler for the inbound message.
- The getStateFactory() method is an example of a Query point. The Talon runtime queries this method at the appropriate point in the app's lifecycle to get the app's state root.
- The setMessageSender() method is an example of an Injection point. The Talon runtime invokes this method at the appropriate point in the app's lifecycle to provide the app with an instance of a message sender.
- This starter app does not have an example of a notification event handler.
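The three roles described above can be illustrated in plain Java. This is a minimal sketch only, not the starter app's source and not the Talon API: the method names onMessage, getStateFactory, and setMessageSender come from the text, while `MessageSender`, `Repository`, `InboundMessage`, `OutboundEvent`, and the demo class are hypothetical stand-ins for the runtime types and generated POJOs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins for the generated state and message POJOs.
final class Repository { long counter; }
final class InboundMessage { long count; }
final class OutboundEvent { long count; String comment; }

// Hypothetical stand-in for the sender that the runtime would provide.
interface MessageSender { void send(String channel, OutboundEvent event); }

final class ProcessorSketch {
    private MessageSender sender;

    // Injection point: the runtime hands the app a message sender.
    void setMessageSender(MessageSender sender) { this.sender = sender; }

    // Query point: the runtime asks the app how to create its state root.
    Supplier<Repository> getStateFactory() { return Repository::new; }

    // Message handler: increment the counter, then emit an event carrying it.
    void onMessage(InboundMessage message, Repository state) {
        state.counter++;
        OutboundEvent event = new OutboundEvent();
        event.count = state.counter;
        event.comment = "processed message " + message.count;
        sender.send("events", event);
    }
}

final class ProcessorSketchDemo {
    // Plays the role of the runtime: inject, query, then dispatch n messages.
    static List<OutboundEvent> run(int n) {
        ProcessorSketch app = new ProcessorSketch();
        List<OutboundEvent> sent = new ArrayList<>();
        app.setMessageSender((channel, event) -> sent.add(event));
        Repository state = app.getStateFactory().get();
        for (int i = 1; i <= n; i++) {
            InboundMessage m = new InboundMessage();
            m.count = i;
            app.onMessage(m, state);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<OutboundEvent> sent = run(3);
        System.out.println("events sent: " + sent.size());
    }
}
```

The handler itself contains only business logic; everything about where messages come from and go to is supplied from outside, which is the point of the injection and query methods.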
It is often desirable to spread message and event handlers across classes other than the main class, which allows for a more flexible application code structure. The starter app's message handler is in the app's main class, but Talon also supports event and message handlers in other classes. To use them, the main class needs to implement an "event handler containers" query point through which the Talon runtime can get the set of objects that contain event and message handlers. See Programming Talon Applications for more information.
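The idea of a handler-containers query point can be sketched with plain reflection. Everything named here is an assumption for illustration: the `@Handles` annotation, the container classes, `getHandlerContainers()`, and the dispatch loop are hypothetical and are not Talon's annotation or runtime; see Programming Talon Applications for the real mechanism.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical marker annotation (not the annotation Talon uses).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handles {}

// Two classes other than the main class, each carrying one handler.
final class OrderHandlers {
    final List<String> seen = new ArrayList<>();
    @Handles public void onOrder(String order) { seen.add("order:" + order); }
}

final class AuditHandlers {
    final List<Integer> seen = new ArrayList<>();
    @Handles public void onAmount(Integer amount) { seen.add(amount); }
}

final class MainClassSketch {
    final OrderHandlers orders = new OrderHandlers();
    final AuditHandlers audit = new AuditHandlers();

    // The "query point": returns every object that carries handlers.
    List<Object> getHandlerContainers() { return Arrays.<Object>asList(orders, audit); }
}

final class HandlerDispatchSketch {
    // Plays the role of the runtime: finds annotated single-argument methods
    // on each container and invokes those whose parameter type fits the message.
    static int dispatch(List<Object> containers, Object message) {
        int invoked = 0;
        for (Object container : containers) {
            for (Method m : container.getClass().getDeclaredMethods()) {
                if (m.isAnnotationPresent(Handles.class)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].isInstance(message)) {
                    try {
                        m.setAccessible(true);
                        m.invoke(container, message);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("handler failed", e);
                    }
                    invoked++;
                }
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        MainClassSketch app = new MainClassSketch();
        dispatch(app.getHandlerContainers(), "A1");
        dispatch(app.getHandlerContainers(), 7);
        System.out.println(app.orders.seen + " " + app.audit.seen);
    }
}
```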
Bringing State, Message and Code Together
As you see above, state and message definitions are in XML while application logic is in Java. This is accomplished via build plugins that generate state and message POJOs from the XMLs for use by the application message handlers. See What is Talon and the <Project Root>/pom.xml for how the plugin is configured.
The app's configuration DDL can be found at <Project Root>/conf/config.xml. This file contains Talon runtime and application configuration. Talon configuration is designed to make it easy to localize an application so that it can run in multiple different environments. We'll touch more on configuration below.
Run the App
Now that you understand the basics of the micro app and have built it, let's run it. Micro apps are launched, and their lifecycle is managed, by a Talon XVM. A Talon XVM can be launched standalone or embedded. A JVM, XVM and a micro app are related as follows:
- One or more micro apps can be managed by a single XVM.
- One or more XVMs can be run in a single JVM.
- Multiple XVMs can be run in a JVM only if the XVMs are run embedded. If running standalone, a JVM contains a single XVM.
In this section, we run the sender, the receiver, and processor in independent XVMs since we want to demonstrate HA, performance, and micro app management/monitoring features. See the following test case in the starter app for information on how to run the apps in a single JVM as embedded XVMs.
When you build the app below (if you have not already built it), you will notice that this unit test is run. If you are using Eclipse, you can also right-click the above test case and run it from the IDE using Run As -> JUnit Test.
Mac Users: Multicast Discovery Considerations
The starter app is configured to use multicast discovery to allow running XVMs to discover one another and form HA Clusters. If you have multiple network interfaces on your laptop, are on a Mac or on a network with other people running the starter app, take a look at the notes on Multicast Discovery Configuration before continuing.
Free License Memory Limit
The free license limits max heap usage to 8 GB. If you are running on a system with a lot of memory and you are using a Free License, you may need to limit max heap usage by setting the -Xmx1g parameter when launching the applications below.
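If you want to confirm which heap limit a JVM actually picked up, a small standalone check like the following can help. It is a generic JDK sketch, unrelated to the Talon launcher; the class name is made up for illustration. Run it with the same -Xmx setting you intend to use. The reported value can be somewhat below the configured limit, depending on the garbage collector.

```java
final class HeapLimitCheck {
    // Returns the maximum heap the JVM will attempt to use, in whole mebibytes.
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("max heap (MiB): " + maxHeapMiB());
    }
}
```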
The X Platform currently supports Java 6 through 8. We recommend running with a Java 8 JVM on your path:
Package The App
If you haven't built the app yet, be sure to first compile and package it into jars before trying to launch the apps below:
Start the Receiver
Execute the following from a command shell to start the receiver.
You should see the following trace. This indicates that the receiver has successfully started.
Start the Processor
Execute the following from a command shell to start the processor.
You should see the following trace. This indicates that the processor has successfully started.
Start the Sender
Execute the following from a command shell to start the sender.
You should see the following trace. This indicates that the sender has successfully started.
Run the Sender
The sender is programmed to wait for user input before sending any messages. Hit <Enter> once you see the "Press Enter to send 10000 messages" message above. This will trigger the sending of 10k messages at 1k msgs/sec. You should see the following trace on the sender, processor, and receiver
What just happened here?
Talon gets all its information to configure its runtime from an XML based configuration file that adheres to the X-DDL (Deployment Descriptor Language) syntax. Application configuration can also be placed in the DDL and accessed by an X API. Talon can use this information to determine, for example, how to connect to messaging providers, what subscriptions to issue, and what HA models to employ. A DDL is laid out as follows:
The Talon starter app's configuration is in the <Project Root>/conf/config.xml DDL file. When we launched the application above we saw an example of using the 'desktop' profile by running with `-p desktop` which customizes the configuration for running on a desktop or laptop.
You can browse through the DDL to get additional insight into the Talon machinery as we work through this tutorial. You can find general information about how configuration works at Understanding Configuration. You can find detailed documentation of the DDL in the DDL Schema Reference. When we launched the apps above, the -n <xvmName> option determined which XVM to configure and launch, and the -p desktop option activated a configuration profile that tunes the launch for running on a developer workstation.
The Talon runtime in each of the micro apps connects to an underlying messaging fabric. Talon uses a Simple Messaging Abstraction (SMA) layer to connect to the messaging fabric. SMA allows for a micro app to switch between messaging providers purely via configuration.
In this starter app, each micro app was configured to use Direct messaging, the messaging provider that comes with Talon. Currently, X also supports other messaging providers, such as JMS (ActiveMQ and others) and Solace, out of the box. Work to implement SMA bindings for other providers is continually underway.
Messaging is accomplished via buses and channels. A bus is a logical grouping of channels, and a channel is an ordered conduit through which micro apps exchange messages. An app connects to a bus and sends a message to a channel, and other micro apps join channels of interest to receive messages published on those channels. Talon picks up information about buses (what channels they contain, the QoS for each channel, and how to connect to the bus) and about the app's interest in buses and channels from configuration (see below). Talon uses this information to connect to the buses on the application's behalf and perform whatever subscriptions are necessary. With the messaging connections prepared in this way, apps only need to implement message handlers, to which Talon dispatches inbound messages, and send messages to channels in a fire-and-forget manner.
The starter app has two logical buses: a processor bus and an events bus. The processor bus contains a single requests channel and the events bus contains the events channel. The sender connects to the processor bus and sends messages to the requests channel; the receiver connects to the events bus and joins the events channel. The processor connects to both buses, joins the requests channel, and sends events to the events channel on the events bus.
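The bus and channel topology described above can be modeled in a few lines of plain Java. This is a toy in-memory sketch under stated assumptions: `BusSketch` and `StarterTopologySketch` are hypothetical classes, payloads are reduced to a `long`, and delivery is a direct method call rather than anything resembling SMA or a real messaging provider. Only the bus and channel names come from the text.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory bus: a named, logical group of channels.
final class BusSketch {
    final String name;
    private final Map<String, List<Consumer<Long>>> channels = new HashMap<>();

    BusSketch(String name) { this.name = name; }

    // Join a channel to receive every message subsequently sent on it.
    void join(String channel, Consumer<Long> handler) {
        channels.computeIfAbsent(channel, k -> new ArrayList<>()).add(handler);
    }

    // Fire-and-forget send: deliver, in order, to whoever joined the channel.
    void send(String channel, long payload) {
        for (Consumer<Long> h : channels.getOrDefault(channel, new ArrayList<>())) {
            h.accept(payload);
        }
    }
}

final class StarterTopologySketch {
    // Wires sender -> processor -> receiver and returns what the receiver saw.
    static List<Long> run(int messages) {
        BusSketch processorBus = new BusSketch("processor");
        BusSketch eventsBus = new BusSketch("events");
        List<Long> received = new ArrayList<>();
        long[] counter = {0};

        // Receiver: joins the events channel on the events bus.
        eventsBus.join("events", received::add);
        // Processor: joins requests, then publishes its counter to events.
        processorBus.join("requests", m -> eventsBus.send("events", ++counter[0]));
        // Sender: publishes to the requests channel on the processor bus.
        for (int i = 0; i < messages; i++) {
            processorBus.send("requests", i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

Note that neither the sender nor the receiver knows the processor exists; they only know a bus and a channel name, which is what lets the wiring live in configuration.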
Single Bus Configuration
The starter app uses a multi bus configuration for its collaboration between the micro apps. However, it could very well have used a single bus configuration, shown as follows.
Understanding Recovery - Kill the Processor!
Let's repeat the steps above in Running the Starter App, but this time we kill the processor mid-stream. The sender is programmed to wait for user input to send a new batch of messages after it has finished sending its previous batch. If you killed the sender/receiver/processor, you should restart them for this test. Otherwise, just continue to the following steps.
- Hit ENTER to start the sender.
- After a couple of seconds hit CTRL-C in the processor window.
- You will notice that the sender and receiver pause, detecting that the processor has failed.
- Restart the processor as instructed previously.
- You will notice that the sender and receiver resume.
- Once the sender completes, the receiver should report receipt of the full additional 10k messages and the processor reports 10k added to its state counter.
A few things to note:
- The receiver received all 10k additional messages, indicating that Talon prevented any message loss.
- The processor reports 10k added to its state, indicating no loss to its state even though it failed.
What Just Happened Here?
Storage and Logs
Talon maintains a transaction log for each micro app.
Salient aspects of storage and the transaction log:
- The transaction log is an ultra-high performance journal that stores state changes and messages, ensuring zero data and message loss and full recoverability from planned and unplanned downtimes.
- The transaction log is managed by ODS (Operational Data Store). ODS is responsible for application persistence and clustering. The Talon runtime provisions an instance of ODS for each micro app.
- If the application uses POJOs for state, then Talon uses the Event Sourcing HA model. In this model, inbound messages are journaled to the transaction log and replayed on startup to reinitialize application state.
- If the application uses Talon to generate application state (we'll see more about this a little later in the Programming Model section), then Talon uses the State Replication HA model for the application. In this model, ODS transparently hooks into the application state and records changes to the application state as and when the application makes the changes and journals the changes to the log at transaction boundaries (transactions are demarcated by one or more inbound messages). The journal is replayed on startup to recover application state.
- In addition to recovery, since the transaction log records all changes to application state, the log can also be used for audit purposes.
- Talon provides an API and tool to query the transaction log using SQL like syntax. We will see log querying in action later in this tutorial.
- Talon prevents unbounded growth of a transaction log through the process of compaction. Compaction runs parallel to application operation. We will see compaction in action later in this tutorial.
- Talon supports the ability to asynchronously siphon records from a transaction log using the process of CDC (Change Data Capture). CDC is an API that can be used to "tail" the log. We will see CDC in action later in this tutorial.
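The difference between the two HA models listed above comes down to what is journaled and what recovery has to do with it. The following is a deliberately tiny sketch using in-memory lists as "journals"; the class and method names are hypothetical, and the real transaction log format and ODS behavior are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

final class HaModelSketch {
    // Event Sourcing: the journal holds inbound messages, and recovery
    // re-runs the same business logic over each of them.
    static long recoverByEventSourcing(List<Long> journaledInboundMessages) {
        long counter = 0;
        for (Long ignored : journaledInboundMessages) {
            counter++; // the handler logic: one increment per message
        }
        return counter;
    }

    // State Replication: the journal holds the state changes themselves,
    // and recovery applies them without running any business logic.
    static long recoverByStateReplication(List<Long> journaledCounterValues) {
        long counter = 0;
        for (long value : journaledCounterValues) {
            counter = value;
        }
        return counter;
    }

    // Processes messages live while writing both kinds of journal, then
    // returns {live counter, event-sourced recovery, state-replicated recovery}.
    static long[] processThenRecover(int messages) {
        List<Long> inboundJournal = new ArrayList<>();
        List<Long> stateJournal = new ArrayList<>();
        long counter = 0;
        for (long seq = 1; seq <= messages; seq++) {
            inboundJournal.add(seq);   // what Event Sourcing would journal
            counter++;
            stateJournal.add(counter); // what State Replication would journal
        }
        return new long[] {
            counter,
            recoverByEventSourcing(inboundJournal),
            recoverByStateReplication(stateJournal)
        };
    }

    public static void main(String[] args) {
        long[] r = processThenRecover(100);
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

Both recoveries arrive at the same counter. Event Sourcing relies on the handler being deterministic on replay, while State Replication does not re-execute the handler at all.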
The starter app project uses State Replication. The transaction log for the processor micro app can be found in the <project root>/rdat folder. You will not find transaction logs for the sender and receiver micro apps since persistence is disabled for those apps (via configuration which is introduced below).
As mentioned earlier, Talon ensures exactly once processing of messages across failures without message or data loss. This is not possible without some kind of transaction processing that transactionally combines guaranteed messaging (aka "at least once" delivery) with storage. To ensure exactly once delivery, Talon needs to take:
- inbound message metadata,
- changes to state made by the processing of the inbound message,
- and outbound messages dispatched by the processing of the inbound message,
and then atomically "stabilize" this information before dispatching the outbound messages downstream and acknowledging the inbound message upstream. This is the job of the Talon "AEP Engine" that underlies every Talon application.
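The ordering described above (stabilize first, then dispatch and acknowledge) can be sketched as follows. This is an illustration under assumptions, not the AEP engine: `TxnRecord` and `ExactlyOnceSketch` are hypothetical, the "log" is an in-memory list, and duplicate detection by a monotonically increasing inbound sequence number is one common technique chosen for the sketch, not something the text attributes to Talon.

```java
import java.util.ArrayList;
import java.util.List;

// One transactional unit: inbound metadata, state change, and outbound message.
final class TxnRecord {
    final long inboundSeq;
    final long counterAfter;
    final String outbound;

    TxnRecord(long inboundSeq, long counterAfter, String outbound) {
        this.inboundSeq = inboundSeq;
        this.counterAfter = counterAfter;
        this.outbound = outbound;
    }
}

final class ExactlyOnceSketch {
    final List<TxnRecord> log = new ArrayList<>();     // stands in for the transaction log
    final List<String> dispatched = new ArrayList<>(); // messages sent downstream
    final List<Long> acked = new ArrayList<>();        // acknowledgments sent upstream
    long counter;
    long lastStabilizedSeq;

    // Handles one delivery from an "at least once" source, which may redeliver.
    void onInbound(long seq) {
        if (seq <= lastStabilizedSeq) {
            acked.add(seq); // duplicate: acknowledge again, but do not re-process
            return;
        }
        long newCounter = counter + 1;
        String outbound = "event:" + newCounter;
        // Stabilize all three pieces as a single record before any effect escapes.
        log.add(new TxnRecord(seq, newCounter, outbound));
        counter = newCounter;
        lastStabilizedSeq = seq;
        // Only now dispatch downstream and acknowledge upstream.
        dispatched.add(outbound);
        acked.add(seq);
    }

    public static void main(String[] args) {
        ExactlyOnceSketch engine = new ExactlyOnceSketch();
        for (long seq : new long[] {1, 2, 2, 3}) {
            engine.onInbound(seq);
        }
        System.out.println("counter=" + engine.counter + " dispatched=" + engine.dispatched.size());
    }
}
```

In the usage above, sequence number 2 is delivered twice, yet the counter and the outbound stream reflect it only once.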
Transaction Processing and Event Sourcing
The above applies to the State Replication HA Model. For Event Sourcing, the AEP engine only "stabilizes" the inbound message. State and outbound message reconstitution happens via message replay and does not have to be persisted.
Messaging + AEP + Storage = Transactional Messaging!
Armed with the above information, let's briefly outline what happened in the above test.
- On each inbound message, Talon journaled the following information as a transactional unit to the transactional log:
- inbound message metadata (sequence number, ...),
- changes to state made by the processor,
- and event message sent outbound.
- On the restart after failure, Talon replayed the transaction log to reconstitute its state as it was before the failure. This is reflected in the following trace output by the processor after restart (recovered 11982 transactions from the transaction log).
Talon used the transaction processing flow described above to combine guaranteed (aka 'at least once') message delivery implemented by the messaging fabric with journalling based storage to implement transactional messaging (aka 'exactly once') processing semantics. This is what makes it easy to author Talon micro apps without compromising the reliability of the system.
Guaranteed messaging + AEP + journalling based storage provides full resilience against process failures, but it does not provide high availability in the event of machine or data center failures. Additionally, with modern networks, disk writes do not perform as well as network writes. Talon enables micro apps to be clustered. When clustered, AEP performs "pipelined" replication (non-blocking replication with positive acknowledgments) to achieve:
- (1) memory to memory persistence at massive throughputs and ultra-low latencies not achievable on modern disks,
- and (2) the additional benefit of being able to recover from machine failures without loss.
Clustering is implemented using shared nothing persistence, which results in redundant storage of application data that provides for even higher system resilience.
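To make "non-blocking replication with positive acknowledgments" concrete, here is a minimal sketch using only JDK concurrency classes. It is an assumption-laden illustration, not Talon's replication protocol: a single-threaded executor stands in for the network link to the backup, an in-memory list stands in for the backup's log, and each `CompletableFuture` plays the role of a positive acknowledgment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PipelinedReplicationSketch {
    // Returns {entries applied on the backup, acknowledgments seen by the primary}.
    static int[] replicate(int entries) {
        List<Long> backupLog = new ArrayList<>();
        // The single thread applies entries on the "backup" in the order sent.
        ExecutorService link = Executors.newSingleThreadExecutor();
        List<CompletableFuture<Long>> acks = new ArrayList<>();
        try {
            for (long seq = 1; seq <= entries; seq++) {
                final long entry = seq;
                // Non-blocking send: the primary queues the entry and moves on
                // to the next one without waiting for the acknowledgment.
                acks.add(CompletableFuture.supplyAsync(() -> {
                    backupLog.add(entry);
                    return entry; // positive ack carrying the sequence number
                }, link));
            }
            int acked = 0;
            for (CompletableFuture<Long> ack : acks) {
                ack.join(); // an entry counts as stable once its ack has arrived
                acked++;
            }
            return new int[] { backupLog.size(), acked };
        } finally {
            link.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = replicate(1000);
        System.out.println("applied=" + r[0] + " acked=" + r[1]);
    }
}
```

The send loop never waits on an individual entry, so many entries can be in flight at once; waiting happens only at the point where stability is actually required.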
Clustering a Micro App
Clustering a micro app is as simple as launching another instance of the micro app. Talon runs a discovery protocol by which X entities can discover each other. Clusters are formed by micro apps with the same name on a common discovery network. To cluster the processor micro app in the starter app, do the following:
- Make sure the sender, receiver, and processor micro apps are launched and running.
- Run the following command from a new command shell.
- The above command will launch a different XVM (processor-2) that launches a new instance of the processor micro app. These apps have the same name, so they discover each other. Also, the new instance is elected as a backup and it initializes its state from the primary. The following trace shows that the new instance has assumed the role of a backup. The trace that follows, from the primary, shows that it has initialized the new cluster member.
- Hit ENTER on the sender command window to trigger the sending of another 10k messages. Now, with a cluster backup running, all state is being replicated in real time to the backup. Depending on the speed of your disk, you may notice that message processing is faster: this is because memory to memory replication is fast!
Killing the Primary in the Cluster
Now that we have a cluster running, let's see what happens when we kill the primary while the sender is sending messages. Do the following:
- Hit ENTER on the sender command window to trigger the sending of another batch of 10k messages.
- Wait for a couple of seconds and then hit CTRL-C in the primary processor window.
You will notice that the sender continues to send uninterrupted, and the receiver continues to receive messages uninterrupted. This is because the cluster backup detected the primary failure and immediately took over as primary. The sender and receiver also detected the primary failure and, this time, reconnected immediately to the new primary and continued to send and receive messages. Note that 10k additional messages were sent, 10k was added to the processor state and 10k additional messages were received by the receiver. In other words, there was immediate failover for uninterrupted operation and there was no message or data loss.
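The cluster behavior seen in the last two sections (instances with the same app name form a cluster, a newcomer becomes a backup, and a backup takes over when the primary fails) can be summarized in a small model. The election rule used here, "first to join is primary and the longest-running backup is promoted", is an assumption made for the sketch; the text does not specify how Talon elects a primary, and `ClusterSketch` is a hypothetical class.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class ClusterSketch {
    // Members per app name in join order; the head of each deque is the primary.
    private final Map<String, Deque<String>> membersByApp = new HashMap<>();

    // Instances with the same app name land in the same cluster.
    // Returns the role the joining instance assumes.
    String join(String appName, String instance) {
        Deque<String> members = membersByApp.computeIfAbsent(appName, k -> new ArrayDeque<>());
        members.addLast(instance);
        return members.size() == 1 ? "Primary" : "Backup";
    }

    // Removing a failed member implicitly promotes the next one in line, if any.
    void fail(String appName, String instance) {
        Deque<String> members = membersByApp.get(appName);
        if (members != null) {
            members.remove(instance);
        }
    }

    // Current primary for the app, or null when no member is running.
    String primaryOf(String appName) {
        Deque<String> members = membersByApp.get(appName);
        return members == null ? null : members.peekFirst();
    }

    public static void main(String[] args) {
        ClusterSketch cluster = new ClusterSketch();
        System.out.println(cluster.join("processor", "processor-1"));
        System.out.println(cluster.join("processor", "processor-2"));
        cluster.fail("processor", "processor-1");
        System.out.println(cluster.primaryOf("processor"));
    }
}
```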
Understanding Inter Cluster Replication
Clustering is intended for HA within a single data center. Technically, clustering can also be used across data centers, but HA across data centers is generally better served by asynchronous replication over a messaging backbone than by a tightly coupled TCP connection. In Talon, this feature is called Inter Cluster Replication (ICR). ICR is a general facility with which micro app transaction logs can be replicated asynchronously to remote locations. Using ICR, events journaled to a cluster transaction log are moved over a reliable messaging backbone to remote ICR Backups, where the in-memory state and/or logs are reconstituted.
ICR can be used for a multitude of use cases, such as:
- Inter-data center HA.
- Replication to a "query" cluster where the transaction log and/or in-memory state can be coupled with indexes for ultra-fast querying.
- This architectural pattern to serve update and query flows using loosely coupled and eventually consistent micro apps is a very robust mechanism to update and query the same state without compromise.
- Consolidation of transaction logs to a centralized location for combined querying (the transaction log querying capability described below supports querying across multiple logs).
Seeing ICR in Action
If you are interested in seeing ICR in action, you can do the following. Otherwise, you can skip ahead to take a look at Querying Transaction Logs.
- Kill (CTRL-C) the sender, receiver, and processor micro apps, if they are running.
- Delete the transaction logs (i.e. the <project root>/rdat directory).
- To demonstrate how easy it is to switch to using a different messaging provider, we will use ActiveMQ for ICR.
- Before proceeding to the next step, please install and launch the ActiveMQ broker on your local machine. You can download ActiveMQ from here.
- Start the ICR receiver. The ICR receiver is nothing but another processor instance that is configured a little differently.
- Start the receiver, the processor cluster, and the sender as done in prior runs but this time launch the processor-1 and processor-2 instances with -p desktop,icr to configure them to publish over ICR.
- Hit ENTER on the sender window to trigger sending of messages.
- Wait for a few seconds, then kill (CTRL-C) the primary processor.
- Note that the processor cluster fails over and the sender, the processor, and the receiver process the full 10k messages without data or message loss.
- Although not apparent from the trace, while the primary was running, it was sending messages through the ActiveMQ bus, and the ICR Receiver received and logged the messages. When the primary was killed, the backup resumed that task in a manner such that the receiver did not lose any messages or state. After you have gone through the section on Querying Transaction Logs below, you could open the "rdat\processor-3" folder in the tool and issue select count(*) from logs WHERE simpleClassName = 'Repository' to verify that all of the updates were processed.
Querying Transaction Logs
In the above sections we've seen some of the ways in which the processor app uses its transaction log for HA purposes. It used the log for recovery after the primary was killed and restarted. When we connected a backup instance to the primary, the primary used its transaction log to initialize the backup. Beyond their utility in achieving HA, transaction logs provide a wealth of information about your application.
Let's take a look at using the Transaction Log Tool to inspect the processor application's transaction log. Because all state changes are captured in the log we can query the logs to see the counter that it is incrementing. Run the Transaction Log Tool by entering the following command into a new console window:
open rdat\processor-2 to open the folder containing the processor-2 transaction logs (the last instance that was Primary).
SELECT entryType, timestamp, transactionId, simpleClassName, counter FROM logs WHERE simpleClassName = 'Repository'
You should see output like the following
The above query selected some metadata about each entry (the entry type, when it was logged and the transaction id) as well as the counter field of the application's Repository object. From the above, we can see the initial 'Put' entry corresponding to the Repository object being created, followed by subsequent 'Update' entries that record the changes to the counter field.
To learn more about querying transaction logs see also: Querying Transaction Logs
The query we just ran in the Transaction Log Tool provides a good example of how log compaction can be used to keep transaction log sizes manageable. When using State Replication, the transaction log will contain your application's outbound messages, along with Put, Update, and Remove entries. Log compaction allows discarding delivered outbound messages and conflating updates to state. Talon applications can be configured to do this in the background while they are running.
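The two compaction effects just described, dropping delivered outbound messages and conflating state updates, can be sketched over an in-memory list of entries. This is a simplified model and not Talon's compaction algorithm or log format: `LogEntry` and `CompactionSketch` are hypothetical, and the sketch makes the simplifying choice of emitting one 'Put' per surviving object followed by the undelivered messages, which does not preserve the original interleaving.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LogEntry {
    final String type;       // "Put", "Update", "Remove", or "Message"
    final String key;        // object id for state entries, message id for messages
    final long value;        // state value; unused for messages
    final boolean delivered; // only meaningful for "Message" entries

    LogEntry(String type, String key, long value, boolean delivered) {
        this.type = type;
        this.key = key;
        this.value = value;
        this.delivered = delivered;
    }
}

final class CompactionSketch {
    static List<LogEntry> compact(List<LogEntry> log) {
        Map<String, LogEntry> latestState = new LinkedHashMap<>();
        List<LogEntry> undelivered = new ArrayList<>();
        for (LogEntry e : log) {
            switch (e.type) {
                case "Message":
                    if (!e.delivered) undelivered.add(e); // delivered messages are discarded
                    break;
                case "Remove":
                    latestState.remove(e.key); // a removed object needs no entry at all
                    break;
                default: // "Put" or "Update": conflate to a single Put with the latest value
                    latestState.put(e.key, new LogEntry("Put", e.key, e.value, false));
            }
        }
        List<LogEntry> compacted = new ArrayList<>(latestState.values());
        compacted.addAll(undelivered);
        return compacted;
    }

    // A small log resembling the processor's: one Repository object, two events.
    static List<LogEntry> sampleLog() {
        List<LogEntry> log = new ArrayList<>();
        log.add(new LogEntry("Put", "Repository", 0, false));
        log.add(new LogEntry("Update", "Repository", 1, false));
        log.add(new LogEntry("Message", "event-1", 0, true));
        log.add(new LogEntry("Update", "Repository", 2, false));
        log.add(new LogEntry("Message", "event-2", 0, false));
        return log;
    }

    public static void main(String[] args) {
        for (LogEntry e : compact(sampleLog())) {
            System.out.println(e.type + " " + e.key + " " + e.value);
        }
    }
}
```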
You can also run transaction log compaction from the Transaction Log Tool (TLT) using the compact command. To do this make sure you have shut down the sender, processor-2, processor-1 and receiver applications. Exit out of the Transaction Log Tool by typing 'exit' and relaunch TLT as you did above. Doing this is important to ensure that the TLT's indexing threads don't interfere with compaction. In the new TLT instance:
- Type open rdat\processor-2 to open the secondary instance's transaction log folder.
- Type switch browse (if you had a query open)
- Type compact. Run this command a few times (to save memory in the Transaction Log Tool, it compacts the file incrementally until all stabilized transactions have been compacted).
... continue to run the compact command until you see that the size (1.3MB above) is no longer shrinking. Now issue the same query that you issued above and observe that initial counter updates have been conflated:
Understanding Change Data Capture
CDC (Change Data Capture) is an API provided by Talon that allows one to "tail" a transaction log. Essentially, this API allows you to start a CDC processor on a log. This supplies the processor a callback to which log entries are dispatched when they are added to the log. You can use the CDC API to asynchronously move information from the transaction logs to the data/analytics stores of your choice. For example, you could move the data to an RDBMS for reporting. Using CDC does not slow down the transaction processing performed by the log's owner (i.e. the micro app).
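The "tail the log with a callback" idea can be sketched without the Talon CDC API, which is not shown here. In this hypothetical model the log is an in-memory list that only its owner appends to, and `CdcTailerSketch` keeps its own read offset and hands new entries to a callback whenever it is polled. Because the tailer only reads, the writer never waits on it; a real tailer would run on its own thread against the log file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CdcTailerSketch {
    private final List<String> log;          // appended to by the log's owner only
    private final Consumer<String> callback; // receives each entry exactly once, in order
    private int offset;                      // index of the next entry to dispatch

    CdcTailerSketch(List<String> log, Consumer<String> callback) {
        this.log = log;
        this.callback = callback;
    }

    // Dispatches every entry appended since the previous poll; returns how many.
    int poll() {
        int dispatched = 0;
        while (offset < log.size()) {
            callback.accept(log.get(offset++));
            dispatched++;
        }
        return dispatched;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<String> reportingStore = new ArrayList<>(); // e.g. rows bound for an RDBMS
        CdcTailerSketch tailer = new CdcTailerSketch(log, reportingStore::add);

        log.add("Put Repository counter=0");
        log.add("Update Repository counter=1");
        tailer.poll();
        log.add("Update Repository counter=2");
        tailer.poll();
        System.out.println(reportingStore);
    }
}
```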
CDC and Compaction
The CDC API performs compaction on the fly while tailing the log. This results in the CDC processor receiving each entry and the conflated form of each entry in the log. This allows a CDC processor to perform a rich set of functions ranging from audit trail logging to app state storage for analytics.
CDC and ICR
Another approach that can minimize any performance impact and resource usage on your transaction processors is to couple ICR with CDC. In this fashion, the transaction stream can be published out over messaging and CDC can be done remotely.
CDC in Action
The starter app currently does not come with a sample CDC runner. We are in the process of incorporating a sample runner in the starter app. For now, you can check out the nvx-sample-db-integration-cdc CDC Sample in our samples repository.
Ready To Learn More?
Continue reading about Concepts & Architecture...