When working with Event Sourcing, applications must not base business logic decisions that affect recoverable state or the contents of outbound messages on local environment data or reference data that may be out of sync with backup instances. Doing so can lead to situations where a backup application instance makes different decisions than the primary instance, leading to divergent state. Divergent state can cause major problems if the backup takes over after a failure of the primary.
One approach to making sure that primaries and backups are always operating on the same data is to inject that data into the application's messaging stream for use in subsequent business logic decisions. This approach is not always tenable. Consider a financial trading system making time-sensitive order routing decisions based on a "firehose" of market data. A logically valid approach would be to inject all market data directly into the event sourced application's replication stream, but in practice this is infeasible because it would overwhelm the application with data that will mostly be ignored.
Talon provides an alternative approach to solving this problem, called Environment Replication. With this approach the platform provides an API that allows applications to write HA-safe environment providers that capture local decisions on the primary instance into a capture buffer, which is replicated along with the inbound message being processed. On the backup instance the capture buffer is then used to play back the same results that were used on the primary, thereby allowing the same business logic decisions to be made. Environment providers are fairly simple to write and simple for applications to work with. For example, Exegy, in partnership with Neeve, backed their entire market data API with an environment provider in a matter of days, providing a seamless solution to the market data problem referenced above.
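The capture and playback idea described above can be illustrated without any Talon classes. The following is a minimal sketch that assumes a plain `java.nio.ByteBuffer` stands in for the capture buffer; the `CaptureReplaySketch` class, its `Mode` enum, and the `observePrice` and `route` methods are hypothetical names invented for this illustration and are not part of the platform API.

```java
import java.nio.ByteBuffer;
import java.util.function.LongSupplier;

/**
 * Hypothetical illustration only, not Talon API: a locally observed value is
 * captured on the primary and replayed on the backup.
 */
final class CaptureReplaySketch {
    enum Mode { CAPTURE, PLAYBACK }

    /** Returns a locally sourced price, recording it or replaying it from the buffer. */
    static long observePrice(Mode mode, ByteBuffer captureBuffer, LongSupplier localSource) {
        if (mode == Mode.PLAYBACK) {
            // Backup: ignore the local source and reuse what the primary saw.
            return captureBuffer.getLong();
        }
        // Primary: consult the local source and record the result.
        long price = localSource.getAsLong();
        captureBuffer.putLong(price);
        return price;
    }

    /** Business logic whose outcome depends on the observed value. */
    static String route(long price) {
        return price >= 100 ? "VENUE_A" : "VENUE_B";
    }

    /** Runs the same logic on a primary and a backup whose local data differ. */
    static String[] demo() {
        ByteBuffer buffer = ByteBuffer.allocate(64);

        // The primary's local feed currently shows 101.
        String primaryDecision = route(observePrice(Mode.CAPTURE, buffer, () -> 101L));

        // Stand-in for replication: make the captured bytes readable.
        buffer.flip();

        // The backup's local feed is stale (99), but the replayed value is used instead.
        String backupDecision = route(observePrice(Mode.PLAYBACK, buffer, () -> 99L));

        return new String[] { primaryDecision, backupDecision };
    }

    public static void main(String[] args) {
        String[] decisions = demo();
        System.out.println("primary=" + decisions[0] + " backup=" + decisions[1]);
    }
}
```

Only the values the business logic actually consulted end up in the buffer, which is why this approach scales where injecting the whole feed into the message stream does not.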
This section discusses how a developer can write an environment provider to replicate an application's own environment data.
Let's consider a contrived but simple example in which we want to include some local environment data on an outbound message: the current host name. We can create an environment provider that will provide the host name in an Event Sourcing safe fashion.
private HostNameEnvironmentProvider hostNameProvider = new HostNameEnvironmentProvider();
private XString hostName = XString.create(256, true, true);

@AppInjectionPoint
public void initialize(AepEngine engine) {
    hostNameProvider.register(engine);
}

@EventHandler
public void onMessage(BroadcastHostNameRequest message) {
    BroadcastHostNameResponse response = BroadcastHostNameResponse.create();

    // call into the environment provider to capture
    // or replay the host name look up.
    hostNameProvider.getHostNameTo(hostName);

    // set the host name in the response and send.
    response.setHostName(hostName);
    send("hostname-response", response);
}
package com.neeve.hostnameprovider;

import java.net.*;
import java.nio.ByteBuffer;

import com.neeve.aep.*;
import com.neeve.lang.XString;

/**
 * A simple environment provider that returns the current local host name.
 */
public class HostNameEnvironmentProvider {
    private final class ProviderImpl implements IAepEnvironmentProvider {
        @Override
        public void setBufferManager(BufferManager bufferManager) {
            HostNameEnvironmentProvider.this.bufferManager = bufferManager;
        }

        @Override
        public void onTransactionStart(ByteBuffer buffer, TransactionAction action) {
            HostNameEnvironmentProvider.this.buffer = buffer;
            HostNameEnvironmentProvider.this.action = action;
        }

        @Override
        public void onTransactionEnd() {}

        @Override
        public void close() {}
    }

    // Set by the environment provider impl
    private final ProviderImpl provider = new ProviderImpl();
    private BufferManager bufferManager;
    private TransactionAction action;
    private ByteBuffer buffer;

    // stores the local host name
    private final XString localHostName;

    public HostNameEnvironmentProvider() {
        String host = null;
        try {
            host = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            host = "unknown";
        }
        localHostName = XString.create(host);
    }

    /**
     * Registers the environment provider with the AepEngine.
     *
     * @param engine the engine with which to register.
     */
    public void register(AepEngine engine) {
        engine.registerEnvironmentProvider(provider);
    }

    /**
     * Looks up the local host name in an HA consistent fashion.
     *
     * @param hostName The XString into which to copy the current host name.
     */
    public void getHostNameTo(XString hostName) {
        if (action == TransactionAction.Playback) {
            short len = buffer.getShort();
            hostName.setValue(buffer, buffer.position(), len);
            buffer.position(buffer.position() + len);
        } else {
            if (buffer.remaining() < localHostName.getSerializedLength() + 2) {
                buffer = bufferManager.resize(buffer.capacity() * 2);
            }

            // record the value
            buffer.putShort((short)localHostName.getSerializedLength());
            localHostName.copyInto(buffer, buffer.position());
            buffer.position(buffer.position() + localHostName.getSerializedLength());

            // and copy into for the application
            hostName.setValue(localHostName);
        }
    }
}
The above implementation encloses the IAepEnvironmentProvider implementation in an inner class called ProviderImpl. The application's AepEngine:
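When the application calls getHostNameTo(XString hostName), the provider does one of two things depending on the transaction action. If the action is Playback, it reads the host name that the primary recorded out of the capture buffer. Otherwise, it writes the local host name to the capture buffer, prefixed with its length, and copies it into the XString supplied by the caller.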
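The record layout used by getHostNameTo (a 2-byte length followed by the value's bytes) can be tried in isolation. The sketch below is an illustration under stated assumptions rather than the platform's implementation: `LengthPrefixedRecord` is a hypothetical helper, the value is encoded as UTF-8 instead of through XString, and buffer growth is done with a plain `ByteBuffer.allocate` and copy in place of the provider's `bufferManager.resize` call.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, independent of Talon: length-prefixed strings in a ByteBuffer. */
final class LengthPrefixedRecord {
    /** Appends [2-byte length][UTF-8 bytes], returning a larger buffer if one was needed. */
    static ByteBuffer write(ByteBuffer buffer, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("value too long for a 2-byte length prefix");
        }
        int needed = Short.BYTES + bytes.length;
        if (buffer.remaining() < needed) {
            // Grow to at least double the capacity, preserving what was already written.
            int newCapacity = Math.max(buffer.capacity() * 2, buffer.position() + needed);
            ByteBuffer larger = ByteBuffer.allocate(newCapacity);
            buffer.flip();
            larger.put(buffer);
            buffer = larger;
        }
        buffer.putShort((short) bytes.length);
        buffer.put(bytes);
        return buffer;
    }

    /** Reads one record at the current position and advances past it. */
    static String read(ByteBuffer buffer) {
        short length = buffer.getShort();
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer capture = ByteBuffer.allocate(8); // deliberately too small
        capture = write(capture, "host-a.example.com");
        capture = write(capture, "host-b");

        capture.flip(); // switch from recording to playback
        System.out.println(read(capture)); // prints "host-a.example.com"
        System.out.println(read(capture)); // prints "host-b"
    }
}
```

Because every record carries its own length, playback can consume records in exactly the order they were captured, which is what lets a handler call the provider more than once per message.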
When using environment replication, it is not possible to configure the application's storage to use parallel replication because the message handler must be invoked first to collect environment provider data.