Overview
This section assumes you have read Working with Messaging to understand the basics of SMA messaging.
...
Code Block | ||
---|---|---|
| ||
<buses>
  <bus name="sample-bus" descriptor="${myBusDescriptor::loopback://mybus}">
    <channels>
      ...
    </channels>
  </bus>
</buses> |
The following properties are common to all bus implementations (unless specifically noted otherwise) and can be specified in a bus descriptor either as parameters in a descriptor URL or as properties in the bus's configuration settings when the descriptor is specified in its decomposed form.
Property Name | Default Value | Description | ||||||
---|---|---|---|---|---|---|---|---|
set_bus_and_channel_on_receipt | false | Controls whether the bus and channel name are set on received messages. Setting the channel and bus name on inbound messages incurs performance overhead. Performance sensitive applications should avoid enabling this property. | ||||||
set_key_on_receipt | false | Controls whether the message key is set on received messages. Not all binding implementations transport the key on the wire; this property has no effect for bindings that don't transport the key. Setting the key on inbound messages incurs a performance overhead. Performance sensitive applications should avoid enabling this property. | ||||||
set_sno_on_receipt | true | Controls whether the message sequence number is set on received messages. If set to false, inbound messages surfaced to the AEP engine and application will not have the sequence number set, regardless of whether the sender was configured to set sequence numbers in outbound messages (see Message Sequencing). Setting the sequence number on inbound messages incurs a slight performance overhead. Performance sensitive applications that do not rely on received sequence numbers can consider disabling this property. | ||||||
enable_inbound_transport_headers | false | Controls whether transport headers are set in inbound messages. Setting this parameter to true causes bindings that support this functionality to set transport-specific headers in the metadata section of inbound messages. Not all binding implementations support transport headers; this property has no effect for such bindings. Setting transport headers on inbound messages incurs a performance overhead. Performance sensitive applications should avoid enabling this property. Additionally, enabling this functionality fosters a tighter coupling between the application and specific message bindings, which is generally not a recommended practice. | ||||||
topic_starts_with_channel | true | Controls whether topic names start with the channel name for the bus. For bus bindings that support topic routing this property controls whether or not the resolved key is prefixed with the channel name. | ||||||
enable_concurrent_sends | false | Controls whether sends through the message bus binding can be performed concurrently using multiple threads. | ||||||
additional_properties_file | false | Specifies the path to an external file that is used to load additional bus configuration properties. The external file format is a plain Java properties file. Properties specified in the external file are merged into the configuration property set for the bus. This file is loaded at runtime, so it is retrieved from the local file system on the host where the configured XVM runs. |
...
Note | ||
---|---|---|
| ||
In an application that only performs solicited sends, the X runtime ensures that the serialization and sends through the underlying transport are done in a single threaded manner. However, when an application performs unsolicited sends using two or more concurrent threads or performs unsolicited sends concurrently with solicited sends, then the sends through the underlying transport are performed using multiple concurrent threads. By default, the X runtime is not configured for thread safety for such concurrent sends. See Unsolicited Concurrent Sends for more information on how to configure the X runtime to enable either or a combination of these concurrency scenarios. |
Message Sequencing And Duplicate Detection
To avoid duplicates and provide exactly-once delivery semantics, each message sent through an AEP Engine is assigned a message sender id, derived from the hashcode of the application name, and a monotonically increasing sequence number. The following table describes the fields that the engine sets on outbound messages to enable downstream duplicate detection. These properties are transported on a message bus binding as part of the message metadata sent with the serialized message.
As an AEP Engine processes messages, it keeps track of the last seen sequence number for each sender that has sent to it as part of its HA state. Consequently, when an application fails over to a backup or is restarted, it will remember the last message seen from senders sending to it and can filter duplicates on behalf of the application. When an application's state is wiped or reinitialized, it will restart its sending stream sequence at 1, which alerts downstream applications not to consider its newly lowered sequence numbers as duplicates.
By default, any message sent from a message handler is sequenced by the AEP Engine; the application should not set the sequence number or sender itself. Messages sent from outside of a message handler (unsolicited sends) are not sequenced by default. Sequencing of unsolicited sends must be enabled via the configuration discussed below.
...
for concurrent sends. |
Message Sequencing
When sending a message, the AEP Engine sets the following sequencing related metadata on the outbound message.
Metadata Field | Type | Description |
---|---|---|
MessageSender | int | A globally unique id that identifies the sender of a message. By default, an AEP Engine uses the hash code of the engine name as the sender id. |
MessageSno | long | A monotonically increasing sequence number. Note: The AEP Engine uses the same sequence number space across all the channels on which outbound messages are sent. This means that receivers that only express interest in a subset of channels on which a sender engine publishes messages will not receive consecutive sequence numbers. Therefore, gaps in received sequence numbers cannot be used to determine message loss. The X runtime ensures zero loss via the use of Guaranteed (at-least-once) quality of delivery on the underlying transport coupled with persistence to deliver exactly once delivery to application message handlers. |
This metadata is transported along with the message payload via the underlying transport to the message recipients and is primarily used for duplicate detection (not for detecting message loss, as explained in the table above).
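As a rough illustration of why gaps in received sequence numbers do not imply loss, the sketch below models a sender that draws from one sequence counter for every channel and a receiver that joins only one of those channels. The class and channel names are hypothetical and the model is deliberately simplified; it is not the engine's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: one sequence counter shared by all channels of a sender. */
final class SharedSequenceSpaceDemo {
    // Single counter for the whole (hypothetical) sender, not one per channel.
    private long nextSno = 1;

    /** Assigns the next sequence number regardless of the channel used. */
    long send(String channel) {
        return nextSno++;
    }

    /** Returns the sequence numbers seen by a receiver that joined only one channel. */
    static List<Long> receivedOn(String subscribed, String... sendOrder) {
        SharedSequenceSpaceDemo sender = new SharedSequenceSpaceDemo();
        List<Long> seen = new ArrayList<>();
        for (String channel : sendOrder) {
            long sno = sender.send(channel);
            if (channel.equals(subscribed)) {
                seen.add(sno);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // The sender alternates between two channels; the receiver only joins "orders".
        System.out.println(receivedOn("orders", "orders", "fills", "orders", "fills", "orders"));
        // prints [1, 3, 5]
    }
}
```

The receiver sees 1, 3 and 5 even though nothing was lost, which is why the received sequence numbers are only suitable for duplicate detection.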
Enabling/Disabling Sequence Numbers
Solicited Sends
By default, an AEP engine is configured to set sequence numbers for messages sent via solicited sends. Whether sequence numbers are set in messages sent via solicited sends is controlled via the setOutboundSequenceNumbers configuration parameter. For example, the following disables setting sequence numbers in messages sent via solicited sends.
Code Block |
---|
<model>
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>
      <setOutboundSequenceNumbers>false</setOutboundSequenceNumbers>
    </app>
  </apps>
</model> |
The default value of the setOutboundSequenceNumbers parameter is true.
Unsolicited Sends
By default, an AEP engine is configured to not set sequence numbers for messages sent via unsolicited sends. Whether sequence numbers are set in messages via unsolicited sends is controlled via both the setOutboundSequenceNumbers and sequenceUnsolicitedSends configuration parameters. For sequence numbers to be set in messages sent via unsolicited sends, both these parameters need to be set as true. Sequence numbers will not be set in messages sent via unsolicited sends if either of these parameters is false. For example, the following enables setting sequence numbers in messages sent via unsolicited sends.
Code Block |
---|
<model>
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>
      <performDuplicateChecking>true</performDuplicateChecking>
      <setOutboundSequenceNumbers>true</setOutboundSequenceNumbers>
      <sequenceUnsolicitedSends>true</sequenceUnsolicitedSends>
    </app>
  </apps>
</model> |
The default value of the setOutboundSequenceNumbers parameter is true and the default value of sequenceUnsolicitedSends is false.
Sequence Numbering and Concurrent Sends
The use of unsolicited sends introduces send concurrency related concerns. One of the concerns is the integrity of the sequence numbers in outbound messages - with concurrent sends, the system needs to ensure that the messages are transmitted on the wire in the same sequence as the setting of the sequence numbers on them. See Concurrent Sends for more information on how to configure the system for concurrent sends to ensure the integrity of the sequence numbers on the transmitted messages.
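The ordering concern can be sketched in plain Java. In the hypothetical model below, many threads request sends, but sequence number assignment and "transmission" both happen on a single consumer thread, so the wire order always matches the sequence order. The single-thread executor is only a stand-in for funneling sends through one queue; it is an assumption made for illustration and not the X runtime's actual mechanism.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative only: sends from many threads are funneled through one queue. */
final class SerializedSendDemo {
    // Stand-in for a single input queue with one consumer thread.
    private final ExecutorService inputQueue = Executors.newSingleThreadExecutor();
    // Both fields below are only touched by the single queue thread.
    private final List<Long> wire = new ArrayList<>();
    private long nextSno = 1;

    /** Callable from any thread; sequencing and "transmission" happen on the queue thread. */
    void send() {
        inputQueue.execute(() -> wire.add(nextSno++));
    }

    /** Runs concurrent senders and returns the sequence numbers in wire order. */
    static List<Long> run(int threads, int sendsPerThread) {
        SerializedSendDemo demo = new SerializedSendDemo();
        List<Thread> senders = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread sender = new Thread(() -> {
                for (int i = 0; i < sendsPerThread; i++) {
                    demo.send();
                }
            });
            senders.add(sender);
            sender.start();
        }
        try {
            for (Thread sender : senders) {
                sender.join();
            }
            demo.inputQueue.shutdown();
            if (!demo.inputQueue.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("send queue did not drain");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.wire;
    }
}
```

If the sequence number were instead assigned on the calling thread and the transmit performed separately, two threads could interleave between the two steps and put a higher sequence number on the wire first.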
Duplicate Detection
The principal purpose of setting sequence numbers in outbound messages is duplicate detection. By default, duplicate detection is enabled: if an AEP engine receives a message with a sequence number > 1 that is less than or equal to the sequence number (> 1) of a prior message from the same sender, the later message is considered a duplicate and is not dispatched to the application. For applications that are tolerant of duplicates or can perform duplicate detection on their own, it is possible to disable duplicate detection in an engine even if sequence numbering is enabled. This is controlled by the performDuplicateChecking configuration parameter. The following example shows where this parameter is configured alongside the related sequencing parameters; setting performDuplicateChecking to false disables duplicate detection.
Code Block |
---|
<model>
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>
      <performDuplicateChecking>true</performDuplicateChecking>
      <setOutboundSequenceNumbers>true</setOutboundSequenceNumbers>
      <sequenceUnsolicitedSends>false</sequenceUnsolicitedSends>
      <sequenceUnsolicitedWithSolicitedSends>false</sequenceUnsolicitedWithSolicitedSends>
    </app>
  </apps>
</model> |
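The filtering rule described above can be sketched as a small per-sender filter. This is a simplified model under stated assumptions, not the engine's code: the class name is hypothetical, a sequence number of 0 is treated as "not sequenced" and never filtered, and a sequence number of 1 is treated as a restarted stream.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: per-sender duplicate filtering based on sequence numbers. */
final class DuplicateFilter {
    // Last sequence number accepted from each sender id.
    private final Map<Integer, Long> lastSeenBySender = new HashMap<>();

    /** Returns true if the message should be dispatched, false if it is a duplicate. */
    boolean accept(int senderId, long sno) {
        if (sno <= 0) {
            return true; // sender does not sequence its messages: nothing to filter on
        }
        Long last = lastSeenBySender.get(senderId);
        if (sno > 1 && last != null && sno <= last) {
            return false; // already seen this sequence number or a later one
        }
        // Either a new, higher sequence number or a stream restarted at 1.
        lastSeenBySender.put(senderId, sno);
        return true;
    }
}
```

In this model a retransmitted message is dropped because its sequence number is not higher than the last one recorded for its sender, while a sender that restarts at 1 is accepted again.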
The following table summarizes AEP Engine configuration properties that have an impact on duplicate detection:
Config Property | Default | Description |
---|---|---|
setOutboundSequenceNumbers | true | Enables setting of outbound sequence numbers on send from the application's AEP Engine. When disabled a sequence number of 0 will be set on all outbound sends meaning that downstream receivers will not perform duplicate filtering for sends from this application. See setOutboundSequenceNumbers in the DDL Config Reference |
performDuplicateChecking | true | Whether or not the application's AEP Engine will filter duplicates based on received sequence numbers. An AEP Engine uses the hashcode of the engine name as the sender id. See performDuplicateChecking in the DDL Config Reference |
sequenceUnsolicitedSends | false | Whether to tag sequence numbers on unsolicited sends. This property can be enabled for applications that only perform sends in an unsolicited fashion and will never send a message from a message handler. In short, enabling this mode of operation is really only applicable for gateway applications that use the engine in a purely unsolicited sending capacity. In most cases, applications will instead elect to use sequenceUnsolicitedWithSolicitedSends, described below. See sequenceUnsolicitedSends in the DDL Config Reference |
sequenceUnsolicitedWithSolicitedSends | false | Indicates that unsolicited send calls should result in injecting the send operation into the engine's input queue to sequence the send with respect to inbound event processing in the context of an AEP transaction. This avoids the possibility of sequencing errors with respect to outbound messages being concurrently sent by message handlers. See sequenceUnsolicitedWithSolicitedSends in the DDL Config Reference |
HA Considerations
It is worth noting that for a clustered application, only the primary instance of the application establishes message bus connections. In the event of a failure, a backup member is elected and reestablishes messaging connections. From a programming standpoint, this has the biggest impact on applications using the EventSourcing HA Policy, as it means that even though their handlers are being invoked on the backup, message channels will not have been established for the application.
During live operation, the primary application keeps track of the last transaction for which outbound messages were acknowledged. This id, known as the stableTransactionId, is persisted in the application's transaction log and replicated to backups. After failure or recovery, backup applications retransmit outbound messages that are part of transactions that have not stabilized and rely on downstream applications to use duplicate detection to eliminate any duplicates.
...
An AEP Engine replicates and persists the following metadata in its entire cluster as part of the application's overall HA state.
- Sequence number of last sent message
- Sequence number of last sent message that has been acknowledged
- Last sequence number for each of the senders that it has processed messages from
When an application fails over to a backup or is restarted from its transaction log, it will remember these variables and resume operation from where it left off.
When an application's state is wiped or reinitialized it will restart its sending stream sequence to 1 which alerts downstream applications not to consider its newly lowered sequence numbers as duplicates.
The set of messages between the "last sent message that was acknowledged" and the "last sent message" is considered to be the set of "in-doubt" outbound messages, i.e. messages for which it is not known whether the messaging backbone has stabilized and taken ownership of the message. On a failover or a restart, all messages in this in-doubt window are retransmitted by an AEP engine. The downstream applications use duplicate detection to filter out any duplicates.
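The in-doubt window can be pictured as a simple range computation. The sketch below assumes, purely for illustration, that acknowledgments are cumulative so the window is the contiguous range after the last acknowledged sequence number up to and including the last sent one; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: computes which outbound messages are in doubt after a failover. */
final class InDoubtWindow {
    /** Returns the sequence numbers in (lastAcked, lastSent], i.e. the ones to retransmit. */
    static List<Long> toRetransmit(long lastAcked, long lastSent) {
        List<Long> inDoubt = new ArrayList<>();
        for (long sno = lastAcked + 1; sno <= lastSent; sno++) {
            inDoubt.add(sno);
        }
        return inDoubt;
    }

    public static void main(String[] args) {
        // Messages 1..8 were sent, but only 1..5 were acknowledged before the failure.
        System.out.println(toRetransmit(5, 8)); // prints [6, 7, 8]
    }
}
```

Some of the retransmitted messages may already have reached downstream applications, which is why the retransmission relies on downstream duplicate detection.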
As mentioned before, the X runtime ensures that the serialization and sending of messages through the underlying message bus binding (transport) for messages sent through an AEP engine using only solicited sends operates in a single threaded manner. In other words, for an application performing only solicited sends, the underlying bus binding does not have to be configured to be safe for concurrent sends. However, if an application performs unsolicited sends concurrently using more than one thread or performs unsolicited and solicited sends concurrently, then the X runtime needs to be configured to ensure thread safety for such concurrent sends. How one configures the system for such safety depends on the type of concurrent sends being done by the application.
Only Concurrent Unsolicited Sends
If the application is NOT performing solicited sends and is performing unsolicited sends concurrently using more than one thread, then the X runtime needs to be configured for concurrent send safety by enabling concurrent sends on the underlying message bus binding. This is done using the enable_concurrent_sends bus connection descriptor property. This configuration parameter can be set as follows and is applicable to all message bus bindings supported for use with the X Platform.
Code Block | ||
---|---|---|
| ||
<buses>
  <bus name="sample-bus" descriptor="activemq://localhost:61616&amp;enable_concurrent_sends=true">
    <channels>
      ...
    </channels>
  </bus>
</buses> |
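To make the descriptor layout concrete, the sketch below splits a descriptor string of the shape used in the example into its provider, address, and properties. It assumes the form `provider://address&name=value&...` inferred from that example only; the class is hypothetical and is not the platform's own parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: splits a bus descriptor of the assumed form provider://address&k=v. */
final class BusDescriptor {
    final String provider;
    final String address;
    final Map<String, String> properties;

    private BusDescriptor(String provider, String address, Map<String, String> properties) {
        this.provider = provider;
        this.address = address;
        this.properties = properties;
    }

    static BusDescriptor parse(String descriptor) {
        int schemeEnd = descriptor.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("missing '://' in " + descriptor);
        }
        String provider = descriptor.substring(0, schemeEnd);
        // Assumption: everything up to the first '&' is the address, the rest are properties.
        String[] parts = descriptor.substring(schemeEnd + 3).split("&");
        Map<String, String> properties = new LinkedHashMap<>();
        for (int i = 1; i < parts.length; i++) {
            int eq = parts[i].indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("expected name=value but got " + parts[i]);
            }
            properties.put(parts[i].substring(0, eq), parts[i].substring(eq + 1));
        }
        return new BusDescriptor(provider, parts[0], properties);
    }

    public static void main(String[] args) {
        BusDescriptor d = parse("activemq://localhost:61616&enable_concurrent_sends=true");
        System.out.println(d.provider + " " + d.address + " " + d.properties);
    }
}
```

Note that the string passed to `parse` is the unescaped descriptor; inside an XML attribute the `&` separator has to be written as `&amp;`.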
Concurrent Solicited and Unsolicited Sends
If an application is performing concurrent solicited and unsolicited sends, then configuring the underlying message bus binding for concurrent sends ensures correct send operation unless sequence numbers need to be set on messages sent using unsolicited sends. By default, sequence numbers are set for solicited sends and not for unsolicited sends, so with the default settings, configuring the underlying message bus binding for concurrent sends is enough to make concurrent unsolicited and solicited sends safe. However, if sequence numbers are desired in messages sent via unsolicited sends, then the sequenceUnsolicitedWithSolicitedSends configuration parameter should be used instead of enable_concurrent_sends. Setting both parameters is safe but not necessary. The following example shows how to enable concurrency for this scenario, i.e. concurrent solicited and unsolicited sends with sequence numbers in both solicited and unsolicited sent messages.
Code Block |
---|
<model>
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>
      <sequenceUnsolicitedWithSolicitedSends>true</sequenceUnsolicitedWithSolicitedSends>
    </app>
  </apps>
</model> |
Configuration Summary
The following table summarizes how to configure the X runtime for concurrent sends in the various concurrency scenarios.
Solicited Sends | Concurrent Unsolicited Sends | Sequence Number in Solicited Sends | Sequence Number in Unsolicited Sends | Configuration |
---|---|---|---|---|
No | Yes | N/A | Yes or No | enable_concurrent_sends |
Yes | Yes | Yes | No | enable_concurrent_sends |
Yes | Yes | No | Yes | enable_concurrent_sends |
Yes | Yes | No | No | enable_concurrent_sends |
Yes | Yes | Yes | Yes | sequenceUnsolicitedWithSolicitedSends |
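The table reduces to a single decision, which the hypothetical helper below encodes for applications that perform concurrent unsolicited sends. It is only a restatement of the rows above in code form; the class and method names are made up for illustration.

```java
/** Illustrative only: encodes the configuration summary table as a decision function. */
final class ConcurrentSendConfig {
    /**
     * Returns the name of the setting the table suggests for an application that
     * performs concurrent unsolicited sends.
     */
    static String recommend(boolean solicitedSends, boolean snoInSolicited, boolean snoInUnsolicited) {
        if (solicitedSends && snoInSolicited && snoInUnsolicited) {
            // Sequence numbers on both kinds of sends must be assigned in one ordered stream.
            return "sequenceUnsolicitedWithSolicitedSends";
        }
        // Every other row only needs the bus binding to tolerate concurrent sends.
        return "enable_concurrent_sends";
    }

    public static void main(String[] args) {
        System.out.println(recommend(true, true, true));
        System.out.println(recommend(false, false, true));
    }
}
```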
Creating Messages For Send
Message types generated by the application data modeler have no public constructors, but instances can be created via the static create() factory method on the message class. By routing all message creation through the create() factory methods, X is able to transparently switch between pooled and unpooled allocation of messages without the need for any changes to business logic. Once a message has been created, application code can populate it using random access, i.e. by calling setter methods to set field values as though it were any other POJO, or using other encoding-specific mechanisms of message population.
For example:
Code Block | ||||
---|---|---|---|---|
| ||||
MyMessage message = MyMessage.create();
message.setStringField("test");
message.setIntField(123); |
...
Scope of a Sent Message
Sending a message through an AEP Engine transfers ownership of the message to the engine. What this means is that, once the send call completes, the application must release its reference to the sent message. Modifying or even reading a message post send is illegal and can result in unpredictable behavior.
If a message is created but not sent, it is up to the application to call dispose() on the message to return it to the pool. Failing to return the message to a pool will not result in a memory leak, but it will mean that subsequent create() calls result in object allocation and promotion.
If the application needs to send the same message multiple times, use the copy() method on the message to create a new deep copy of the message such as the following:
Code Block |
---|
MyMessage original = MyMessage.create();
original.setName("Neeve");
original.setYear(2017);
MyMessage copy = original.copy();
copy.setYear(2018);
sender.send("destination1", original);
sender.send("destination2", copy); |
If message pooling is enabled, sent messages will be wiped and reused in subsequent sends.
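The relationship between create(), dispose(), and pooling can be sketched with a toy message class. Everything below is hypothetical and far simpler than generated message types (for example, it is not thread safe); it only shows why a disposed message is reused by a later create() and why skipping dispose() leads to a fresh allocation instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative only: a toy pooled message with a create() factory and dispose(). */
final class PooledMessage {
    private static final Deque<PooledMessage> POOL = new ArrayDeque<>();
    // Counts real allocations so the effect of pooling is observable.
    static int allocations = 0;

    private String text;

    // No public constructor: all creation goes through create().
    private PooledMessage() {
        allocations++;
    }

    /** Reuses a pooled instance when one is available, otherwise allocates. */
    static PooledMessage create() {
        PooledMessage pooled = POOL.poll();
        return pooled != null ? pooled : new PooledMessage();
    }

    void setText(String text) {
        this.text = text;
    }

    String getText() {
        return text;
    }

    /** Wipes the message and returns it to the pool for reuse. */
    void dispose() {
        text = null;
        POOL.push(this);
    }
}
```

Because a pooled instance is wiped and handed out again, holding on to a reference after giving the message up would expose another message's contents, which is one reason reading a message after sending it is not allowed.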
How To Send Messages
Using the AEP Message Sender
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class MyApp {
    private volatile MessageChannel orderEventChannel;
    private volatile AepEngine engine;

    @AppInjectionPoint
    public void onEngineInjected(AepEngine engine) {
        this.engine = engine;
    }

    @EventHandler(source = "order-event-channel@order-processing-bus")
    public void onOrderEventChannelUp(AepChannelUpEvent channelUpEvent) {
        this.orderEventChannel = channelUpEvent.getChannel();
    }

    @EventHandler(source = "order-event-channel@order-processing-bus")
    public void onOrderEventChannelDown(AepChannelDownEvent channelDownEvent) {
        this.orderEventChannel = null;
    }

    @EventHandler
    public void onNewOrder(NewOrderMessage message) {
        OrderEventMessage event = OrderEventMessage.create();
        event.setOrderId(message.getOrderId());

        // The channel name and bus must be set on the message
        // if using Event Sourcing because handlers are invoked
        // on the backup and during recovery before messaging is
        // started.
        //
        // This is not required when using State Replication.
        event.setMessageChannel("order-event-channel");
        event.setMessageBus("order-processing-bus");
        engine.sendMessage(orderEventChannel, event);
    }
} |
Message Routing (Channel Keys
...
)
The message channel encapsulates the physical destination on which the message will be sent using a channel key. The channel key abstraction allows the actual message provider destination to be controlled via configuration. Channel keys can be either static or dynamic:
...
- Variable substitution is not done on the provided key.
- Key validation is done on the topic
- The provided key will be prepended with the channel name if topic_starts_with_channel=true is set for the bus.
Zero Loss Sends (Send Stability)
Solicited Sends
As mentioned earlier, the AEP engine manages in-doubt messages across the entire cluster to ensure zero loss in its outbound message stream across failovers and restarts. In the event of a failover within a cluster or a fresh restart of the primary member of the cluster from its transaction log, the AEP engine recovers and retransmits "in-doubt" messages to ensure zero loss. This mechanism only applies to messages sent using solicited sends. For unsolicited sends, the application needs to participate in ensuring zero loss.
Unsolicited Sends
When an application sends a message in an unsolicited manner, the sent message can be lost if the application crashes before the send is transmitted over the message bus. To handle such a situation, the AEP Engine provides a mechanism by which it notifies the application once the sent message has been stabilized downstream and is guaranteed to be delivered to the downstream applications. To ensure zero loss, the sending application needs to hold onto the sent message until it receives such a notification from the AEP engine, at which point the application can discard the sent message. For applications such as gateways, this typically involves holding onto sent messages until the AEP engine provides the stability notification, at which point the application not only discards the sent message but also notifies the upstream source from which the message was received, so that the source can record that the message has been delivered and does not need to be delivered again.
The AEP engine provides such stability notifications via the AepSendStabilityEvent and needs to be configured via dispatchSendStabilityEvents to dispatch these events.
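The hold-until-stable pattern described above can be sketched independently of the engine API. The tracker below is a hypothetical helper: the application records a payload under a correlation id before sending, and releases it when the matching stability notification arrives. How the correlation id is carried on the message and how the notification is delivered are left out on purpose.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: holds unsolicited sends until they are reported stable. */
final class PendingSends {
    // Payloads that were sent but not yet reported stable, keyed by correlation id.
    private final Map<Long, String> pending = new ConcurrentHashMap<>();

    /** Call before sending so a fast stability notification always finds its entry. */
    void beforeSend(long correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    /** Call on stability; returns the released payload, or null if the id is unknown. */
    String onStable(long correlationId) {
        return pending.remove(correlationId);
    }

    /** Number of sends still awaiting stability. */
    int inFlight() {
        return pending.size();
    }
}
```

After a crash, anything that was still in flight has to be resent from the upstream source, which is why a gateway should only acknowledge its source once `onStable` has released the corresponding entry.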
Correlating Send Stability
It is usually necessary for an unsolicited sender to correlate the send stability event to the source event that triggered its send. The AepSendStabilityEvent includes a reference to the stabilized message via the getMessage() accessor. Often, one of the fields on the message can be used to correlate the sent message to the source event. In cases where including correlation information in the published message does not make sense, it is possible to set an Attachment on the sent message that can assist in correlation when processing the stability event. This technique is illustrated in the sample below.
Example: A File Gateway
The following sample shows a file gateway app that reads lines from a file and publishes each line out over messaging. It enables AepSendStabilityEvents and on receipt of the stability event updates a cursor file to record the last line that was acknowledged by the downstream bus:
Code Block | ||||
---|---|---|---|---|
| ||||
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
// X Platform imports (AepEngine, AepMessageSender, annotations, Tracer, ...) omitted for brevity.

/**
 * The {@link FileInputGateway} reads a file, sending each line as a message.
 */
@AppHAPolicy(value = HAPolicy.StateReplication)
public class FileInputGateway {
    Tracer tracer = Tracer.create("file-tailer", Level.INFO);

    private volatile AepMessageSender messageSender;
    private volatile AepEngine engine;

    @Configured(property = "gateway.filename")
    private volatile String filename;

    private RandomAccessFile cursor;
    private volatile long stabilizedLine = 0;

    @AppInjectionPoint
    public void injectMessageSender(AepMessageSender messageSender) {
        this.messageSender = messageSender;
    }

    @AppInjectionPoint
    public void injectAepEngine(AepEngine engine) {
        this.engine = engine;
    }

    @AppMain
    public void run(String[] args) throws Exception {
        // Open source file and cursor file.
        BufferedReader reader = new BufferedReader(new FileReader(filename));
        try {
            cursor = new RandomAccessFile(filename + ".cursor", "rwd");
            cursor.setLength(8);
            cursor.seek(0);

            // Wait for engine messaging to start:
            engine.waitForMessagingToStart();

            // Read lines until the end of the file, skipping
            // lines that were already stabilized in a prior run:
            String line = null;
            stabilizedLine = cursor.readLong();
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                if (++lineNumber <= stabilizedLine) {
                    continue;
                }
                FileLineMessage message = FileLineMessage.create();
                message.setText(line);
                message.setFileName(filename);
                message.setLineNumber(lineNumber);
                // The attachment carries the line number for correlation
                // when the stability event is received:
                message.setAttachment(Long.valueOf(message.getLineNumber()));
                messageSender.sendMessage("line-input", message);
            }

            // Wait for send stability and exit:
            while (stabilizedLine < lineNumber) {
                Thread.sleep(100);
            }
        }
        finally {
            reader.close();
            if (cursor != null) {
                cursor.close();
            }
        }
    }

    @EventHandler
    public final void onSendStability(AepSendStabilityEvent event) throws IOException {
        FileLineMessage message = (FileLineMessage)event.getMessage();
        stabilizedLine = (Long)message.getAttachment();
        // Record the last stabilized line in the cursor file:
        cursor.seek(0);
        cursor.writeLong(stabilizedLine);
        cursor.getFD().sync();
        tracer.log("Got stability for: " + stabilizedLine, Level.INFO);
    }

    /**
     * @return The number of lines sent.
     */
    @AppStat(name = "Lines Sent")
    public long getLinesSent() {
        return stabilizedLine;
    }
} |
The sample configuration below creates the bus, configures messaging, and enables send stability events and unsolicited send sequencing:
Code Block | ||||
---|---|---|---|---|
| ||||
<?xml version="1.0"?>
<model xmlns="http://www.neeveresearch.com/schema/x-ddl" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <!-- Define buses and channels that will be shared between applications-->
  <buses>
    <bus name="file-connector">
      <provider>activemq</provider>
      <address>localhost</address>
      <port>61616</port>
      <channels>
        <channel name="line-input">
          <qos>Guaranteed</qos>
          <key>${filename}</key>
        </channel>
      </channels>
    </bus>
  </buses>
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="file-input-gateway" mainClass="com.sample.FileInputGateway">
      <messaging>
        <factories>
          <factory name="com.sample.messages.FileGatewayMessageFactory" />
        </factories>
        <buses>
          <bus name="file-connector">
            <channels>
              <channel name="line-input" join="false"/>
            </channels>
          </bus>
        </buses>
      </messaging>
      <dispatchSendStabilityEvents>true</dispatchSendStabilityEvents>
      <sequenceUnsolicitedWithSolicitedSends>true</sequenceUnsolicitedWithSolicitedSends>
    </app>
  </apps>
</model> |
Receiving Messages
An application's AEP engine creates and manages the lifecycle of the message buses that an application configures for use. When an application is configured to join one or more bus channels, appropriate topic subscriptions will be issued on behalf of the application. Message dispatch occurs as follows:
...
In the above case, the value set programmatically overrides the configured value for the application.
Per Channel Subscription Preservation
...