|
When working with Event Sourcing, applications cannot make business logic decisions that affect recoverable state or the contents of outbound messages based on data in the local environmental or reference data that may be out of sync with backup instances. Doing so can lead to situations where a backup application instance makes different decisions than the primary instance that result in divergent state. Divergence of state can causes major problems if the backup takes over after a failure in the primary. This section introduces Talon's Environment Replication feature which provides developers with a means of avoiding such pitfalls in a manner that doesn't incur major performance overhead. |
Talon provides an alternative approach to solving this problem, called Environment Replication. With this approach the platform provides an API which allow applications to write HA safe environment providers that capture local decisions on the primary instance into a capture buffer that is replicated along with the inbound message being processed. On the backup instance the capture buffer is then used to playback the same results as were used on the primary thereby allowing the same business logic decisions to be made. Environment providers are fairly simple to write and simple for applications to work with. For example, Exegy, in partnership with Neeve backed their entire market data api with an environment provider in a matter of days, providing a seamless solution to the market data problem referenced above. With this approach market data is received out band with the respect to the trading application's order processing messages, as the primary instance of a trading application looks at current market data while processing messages the values it sees are captured and replicated to the backup such the the backup instances see the same values. In this fashion the entire stream of market data being absorbed by Exegy need not be replicated to backup instances, just the data that the primary instance uses in making decisions is replicated.
This section discusses how a developer can write an environment provider to replicate its own replication data.
Let's consider a contrived but simple example in which we want to include some local environment data on an outbound message: the current host name. We can create an environment provider that will provide the host name in an Event Sourcing safe fashion.
private HostNameEnvironmentProvider hostNameProvider = new HostNameEnvironmentProvider(); private XString hostName = XString.create(256, true, true); @AppInjectionPoint public void initialize(AepEngine engine) { hostNameProvider.register(engine); } @EventHandler public void onMessage(BroadcastHostNameRequest message) { BroadcastHostNameResponse response = BroadcastHostNameResponse.create(); // call into the environment provider to capture // or replay the host name look up. hostNameProvider.getHostNameTo(hostName); // set the host name in the response and send. response.setHostName(hostName); send("hostname-response", response); } |
package com.neeve.hostnameprovider; import java.net.*; import java.nio.ByteBuffer; import com.neeve.aep.*; import com.neeve.lang.XString; /** * A simple environment provider that returns the current local host name. */ public class HostNameEnvironmentProvider { private final class ProviderImpl implements IAepEnvironmentProvider { @Override public void setBufferManager(BufferManager bufferManager) { HostNameEnvironmentProvider.this.bufferManager = bufferManager; } @Override public void onTransactionStart(ByteBuffer buffer, TransactionAction action) { HostNameEnvironmentProvider.this.buffer = buffer; HostNameEnvironmentProvider.this.action = action; } @Override public void onTransactionEnd() {} @Override public void close() {} } // Set by the environment provider impl private final ProviderImpl provider = new ProviderImpl(); private BufferManager bufferManager; private TransactionAction action; private ByteBuffer buffer; // stores the local host name private final XString localHostName; public HostNameEnvironmentProvider() { String host = null; try { host = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { host = "unknown"; } localHostName = XString.create(host); } /** * Registers the environment provider with the AepEngine. * * @param engine the engine with which to register. */ public void register(AepEngine engine) { engine.registerEnvironmentProvider(provider); } /** * Looks up the local host name in an HA consistent fashion. * * @param hostName The XString into which to copy the current host name. */ public void getHostNameTo(XString hostName) { if (action == TransactionAction.Playback) { short len = buffer.getShort(); hostName.setValue(buffer, buffer.position(), len); buffer.position(buffer.position() + len); } else { if (buffer.remaining() < localHostName.getSerializedLength() + 2) { buffer = bufferManager.resize(buffer.capacity() * 2); } // record the value buffer.putShort((short)localHostName.getSerializedLength()); localHostName.copyInto(buffer, buffer.position()); buffer.position(buffer.position() + localHostName.getSerializedLength()); // and copy into for the application hostName.setValue(localHostName); } } } |
The above implementation encloses the IAepEnvironmentProvider implementation in an inner class called ProviderImpl. The application's AepEngine:
When the application calls getHostNameTo(XString hostName) the provider either reads the host name out of the capture buffer or writes the local hostname to the capture buffer.
When using environment replication, it is not possible to configure the application's storage to use parallel replication because the message handler must be invoked first to collect environment provider data.