...
- Variable substitution is not done on the provided key.
- Key validation is done on the topic.
- The provided key will be prepended with the channel name if topic_starts_with_channel=true is set for the bus.
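As a rough illustration of the rules above, the following sketch shows how a caller-provided key might be turned into a topic. The class `TopicSketch`, the method `resolveTopic`, and the `/` separator are hypothetical names and values chosen for illustration. They are not the platform's API, and the actual delimiter may differ.

```java
class TopicSketch {
    // Assumption: "/" is used here only for illustration; the real delimiter may differ.
    static final String SEPARATOR = "/";

    /**
     * Builds the topic for a send that supplies its own key.
     * The key is used verbatim: no ${...} variable substitution is applied to it.
     */
    static String resolveTopic(String channelName, String providedKey, boolean topicStartsWithChannel) {
        return topicStartsWithChannel
                ? channelName + SEPARATOR + providedKey
                : providedKey;
    }

    public static void main(String[] args) {
        // The ${region} token stays literal because the provided key is not substituted.
        System.out.println(resolveTopic("line-input", "orders/${region}", true));
        System.out.println(resolveTopic("line-input", "orders/${region}", false));
    }
}
```

Key validation is left out of the sketch because the source does not describe the validation rules.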
...
Zero Loss Sends (Send Stability)
Solicited Sends
As mentioned earlier, the AEP engine tracks in-doubt messages across the entire cluster to ensure zero loss in its outbound message stream across failovers and restarts. After a failover within a cluster, or a fresh restart of the cluster's primary member from its transaction log, the AEP engine recovers and retransmits in-doubt messages. This mechanism applies only to messages sent using solicited sends. For unsolicited sends, the application must participate in ensuring zero loss.
...
```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
// Platform imports (AepEngine, AepMessageSender, Tracer, annotations, ...) omitted.

/**
 * The {@link FileInputGateway} tails a file sending each line as a message.
 */
@AppHAPolicy(value = HAPolicy.StateReplication)
public class FileInputGateway {
    Tracer tracer = Tracer.create("file-tailer", Level.INFO);

    private volatile AepMessageSender messageSender;
    private volatile AepEngine engine;

    @Configured(property = "gateway.filename")
    private volatile String filename;

    // Durable record of the last line number known to be stable.
    private RandomAccessFile cursor;
    private volatile long stabilizedLine = 0;

    @AppInjectionPoint
    public void injectMessageSender(AepMessageSender messageSender) {
        this.messageSender = messageSender;
    }

    @AppInjectionPoint
    public void injectAepEngine(AepEngine engine) {
        this.engine = engine;
    }

    @AppMain
    public void run(String[] args) throws Exception {
        // Open source file and cursor file.
        BufferedReader reader = new BufferedReader(new FileReader(filename));
        try {
            cursor = new RandomAccessFile(filename + ".cursor", "rwd");
            cursor.setLength(8);
            cursor.seek(0);

            // Wait for engine messaging to start:
            engine.waitForMessagingToStart();

            // Read lines until the end of the file,
            // skipping lines that were already stabilized in a previous run:
            String line = null;
            stabilizedLine = cursor.readLong();
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                if (++lineNumber <= stabilizedLine) {
                    continue;
                }

                FileLineMessage message = FileLineMessage.create();
                message.setText(line);
                message.setFileName(filename);
                message.setLineNumber(lineNumber);
                // The attachment is read back in onSendStability.
                message.setAttachment(Long.valueOf(message.getLineNumber()));
                messageSender.sendMessage("line-input", message);
            }

            // Wait for send stability and exit:
            while (stabilizedLine < lineNumber) {
                Thread.sleep(100);
            }
        }
        finally {
            reader.close();
            if (cursor != null) {
                cursor.close();
            }
        }
    }

    @EventHandler
    public final void onSendStability(AepSendStabilityEvent event) throws IOException {
        FileLineMessage message = (FileLineMessage)event.getMessage();
        stabilizedLine = (Long)message.getAttachment();
        // Persist the stabilized line number before reporting it.
        cursor.seek(0);
        cursor.writeLong(stabilizedLine);
        cursor.getFD().sync();
        tracer.log("Got stability for: " + stabilizedLine, Level.INFO);
    }

    /**
     * @return The number of lines sent.
     */
    @AppStat(name = "Lines Sent")
    public long getLinesSent() {
        return stabilizedLine;
    }
}
```
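The restart behavior of the gateway above depends on the cursor file alone, so it can be tried without the platform. The following is a minimal sketch of that cursor logic using only `java.io`. The class name `LineCursor` and its methods are hypothetical and are not part of the platform. Unlike the gateway, the sketch writes an explicit zero when the file is new, because the Java API does not define the contents of a region added by `setLength`.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

/** Hypothetical helper that isolates the cursor-file logic; not a platform class. */
class LineCursor implements AutoCloseable {
    private final RandomAccessFile file;

    LineCursor(String path) throws IOException {
        // "rwd" asks for content updates to be written synchronously to storage.
        file = new RandomAccessFile(path, "rwd");
        if (file.length() < 8) {
            // New or truncated cursor: start from line 0 explicitly.
            file.setLength(8);
            file.seek(0);
            file.writeLong(0L);
        }
    }

    /** Returns the last line number recorded as stable (0 if none). */
    long read() throws IOException {
        file.seek(0);
        return file.readLong();
    }

    /** Records the given line number as stable and forces it to disk. */
    void write(long lineNumber) throws IOException {
        file.seek(0);
        file.writeLong(lineNumber);
        file.getFD().sync();
    }

    @Override
    public void close() throws IOException {
        file.close();
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("gateway", ".cursor");
        tmp.deleteOnExit();

        // First run: three lines reach stability.
        try (LineCursor cursor = new LineCursor(tmp.getPath())) {
            cursor.write(3);
        }

        // Simulated restart: the gateway would skip lines 1 to 3 and resume at line 4.
        try (LineCursor cursor = new LineCursor(tmp.getPath())) {
            System.out.println("Resume after line " + cursor.read());
        }
    }
}
```

Because the cursor is advanced only when stability is reported, a crash between a send and its stability event causes the line to be sent again after restart rather than skipped.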
The following sample configuration creates the bus, configures messaging, and enables send stability events and unsolicited send sequencing:
```xml
<?xml version="1.0"?>
<model xmlns="http://www.neeveresearch.com/schema/x-ddl"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

  <!-- Define buses and channels that will be shared between applications -->
  <buses>
    <bus name="file-connector">
      <provider>activemq</provider>
      <address>localhost</address>
      <port>61616</port>
      <channels>
        <channel name="line-input">
          <qos>Guaranteed</qos>
          <key>${filename}</key>
        </channel>
      </channels>
    </bus>
  </buses>

  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="file-input-gateway" mainClass="com.sample.FileInputGateway">
      <messaging>
        <factories>
          <factory name="com.sample.messages.FileGatewayMessageFactory" />
        </factories>
        <buses>
          <bus name="file-connector">
            <channels>
              <channel name="line-input" join="false"/>
            </channels>
          </bus>
        </buses>
      </messaging>
      <dispatchSendStabilityEvents>true</dispatchSendStabilityEvents>
      <sequenceUnsolicitedWithSolicitedSends>true</sequenceUnsolicitedWithSolicitedSends>
    </app>
  </apps>
</model>
```
Receiving Messages
An application's AEP engine creates and manages the lifecycle of the message buses that an application configures for use. When an application is configured to join one or more bus channels, appropriate topic subscriptions will be issued on behalf of the application. Message dispatch occurs as follows:
...