The Talon Manual


Overview

This section assumes you have read Working with Messaging to understand the basics of SMA messaging.

...

Code Block
title: As A Descriptor
<buses>
  <bus name="sample-bus" descriptor="${myBusDescriptor::loopback://mybus}">
    <channels>
      ...
    </channels>
  </bus>
</buses>

Common Bus Configuration Properties

The following properties are common to all bus implementations (unless specifically noted otherwise). They can be specified in a bus descriptor either as parameters in a descriptor URL or, when the bus is specified in its decomposed form, as properties in the bus's configuration settings.

Property Name | Default Value | Description
set_bus_and_channel_on_receipt | false

Controls whether the bus and channel name are set on received messages.

Setting the channel and bus name on inbound messages incurs performance overhead. Performance sensitive applications should avoid enabling this property. 

set_key_on_receipt | false

Controls whether the message key is set on received messages.

Not all binding implementations transport the key on the wire; this property has no effect for bindings that don't transport the key.

Setting the key on inbound messages incurs a performance overhead. Performance sensitive applications should avoid enabling this property. 

set_sno_on_receipt | true

Controls whether the message sequence number is set on received messages.

If set to false, then inbound messages surfaced to the AEP engine and application will not have the sequence number set, regardless of whether the sender was configured to set sequence numbers in outbound messages (see Message Sequencing).

Setting the sequence number on inbound messages incurs a slight performance overhead. Because this property is enabled by default, performance sensitive applications that do not rely on inbound sequence numbers may consider disabling it.

enable_inbound_transport_headers | false

Controls whether transport headers are set in inbound messages.

Setting this parameter to true causes the bindings that support this functionality to set transport specific headers in the metadata section of inbound messages.

Not all binding implementations support transport header functionality. This property has no effect for such bindings.

Setting transport headers on inbound messages incurs a performance overhead. Performance sensitive applications should avoid enabling this property. Additionally, enabling this functionality fosters a tighter coupling between the application and specific message bindings, which is generally not a recommended practice.

topic_starts_with_channel | true

Controls whether topic names start with the channel name for the bus.

For bus bindings that support topic routing this property controls whether or not the resolved key is prefixed with the channel name.

Since 3.15

enable_concurrent_sends | false

Controls whether sends through the message bus binding can be performed concurrently using multiple threads.

Since 3.9

additional_properties_file | false

Specifies the path to an external file from which additional bus configuration properties are loaded. The external file is a plain Java Properties file. Properties specified in the external file are merged into the configuration property set for the bus. The file is loaded at runtime, so it is read from the local file system on the host where the configured XVM runs.

Most bus implementations support additional properties specified to the messaging provider. Consult the documentation for the bus provider to learn about the properties it supports. 

Sending Messages

Understanding Message Sends

Messages are sent over message bus channels in a fire and forget fashion: the application can rely on the underlying AEP Engine to deliver the message according to the quality of service configured for a channel. An application views message channels as a logical, ordered conduit over which messages flow.

The diagram below depicts the path of a message sent through an engine. The basic flow of sending is as follows:

  1. The application creates a message.
  2. The application calls send, passing the message and channel name.
  3. The engine looks up the channel and uses it to resolve the physical destination on which the message will be sent via the configured channel key.
  4. The engine queues the message for sending until the associated state changes for the message handler are stabilized to the application's store (e.g. replication and transaction log write).
  5. Once the application state changes have been stabilized, the enqueued message is then sent. This involves the following sub-steps:
    1. The message is serialized.
    2. The serialized contents and message metadata are then packaged in a message bus provider specific message.
    3. The message is sent through the underlying transport.

[Diagram: Outbound Message Dispatch]

Solicited vs Unsolicited Sends

When an outbound message is sent from within an application message handler, it is referred to as a solicited send. When an outbound message is sent from outside of a message handler, it is referred to as an unsolicited send. The steps listed above pertain to solicited sends.

Note

The AepEngine and underlying SMA message channels are not designed to be thread-safe. This means applications should not mix solicited and unsolicited sends using the same AepEngine. If an application is performing unsolicited sends using the same AepEngine across multiple threads, be sure there is appropriate synchronization around any access to the AepEngine and associated objects.

...

Unsolicited Sends and Send Concurrency

In an application that only performs solicited sends, the X runtime ensures that the serialization and sends through the underlying transport are done in a single threaded manner. However, when an application performs unsolicited sends using two or more concurrent threads or performs unsolicited sends concurrently with solicited sends, then the sends through the underlying transport are performed using multiple concurrent threads. By default, the X runtime is not configured for thread safety for such concurrent sends. See Concurrent Sends for more information on how to configure the X runtime for concurrent sends.

Message Sequencing

When sending a message, the AEP Engine sets the following sequencing related metadata on the outbound message. 

Metadata Field | Type | Description
MessageSender | int

A globally unique id that identifies the sender of a message.

By default, an AEP Engine uses the hash code of the engine name as the sender id.   

MessageSno | long

A monotonically increasing sequence number.  

A sequence number of <=0 indicates that the message is not sequenced, i.e. receivers should not perform duplicate checking on it. A sequence number of 1 indicates the start (or restart) of a stream, i.e. a receiver that receives sequence numbers 1,2,3,1,2,3 should not consider the 4th message a duplicate. Otherwise, receivers should consider a sequence number not greater than the previous sequence number as a duplicate.

Note: The AEP Engine uses the same sequence number space across all the channels on which outbound messages are sent. This means that receivers that only express interest in a subset of channels on which a sender engine publishes messages will not receive consecutive sequence numbers. Therefore, gaps in received sequence numbers cannot be used to determine message loss. The X runtime ensures zero loss via the use of Guaranteed (at-least-once) quality of delivery on the underlying transport coupled with persistence to deliver exactly once delivery to application message handlers.

This metadata is transported along with the message payload via the underlying transport to the message recipients and is primarily used for duplicate detection (not for detecting message loss, as explained in the table above).
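The duplicate-checking rules in the table above can be sketched in a few lines of Java. This is only an illustration of the rules as stated, not the engine's implementation: the class name `DuplicateFilterSketch`, the per-sender map, and the `accept` method are hypothetical names introduced for this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative receiver-side duplicate filter; all names here are hypothetical. */
class DuplicateFilterSketch {
    // Last accepted sequence number, tracked per sender id.
    private final Map<Integer, Long> lastSnoBySender = new HashMap<>();

    /** Returns true if the message should be dispatched, false if it is a duplicate. */
    boolean accept(int senderId, long sno) {
        if (sno <= 0) {
            return true; // not sequenced: no duplicate checking is performed
        }
        Long last = lastSnoBySender.get(senderId);
        if (sno == 1 || last == null || sno > last) {
            // A sequence number of 1 marks the start (or restart) of a stream.
            lastSnoBySender.put(senderId, sno);
            return true;
        }
        return false; // not greater than the previous sequence number
    }

    public static void main(String[] args) {
        DuplicateFilterSketch filter = new DuplicateFilterSketch();
        long[] received = {1, 2, 3, 1, 2, 3, 3, 0};
        for (long sno : received) {
            String verdict = filter.accept(42, sno) ? "dispatch" : "duplicate";
            System.out.println(sno + " -> " + verdict);
        }
    }
}
```

In this sketch only the second consecutive 3 is rejected; the second 1 is accepted because it signals a stream restart.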

Enabling/Disabling Sequence Numbers

Solicited Sends

By default, an AEP engine is configured to set sequence numbers for messages sent via solicited sends. Whether sequence numbers are set in messages sent via solicited sends is controlled via the setOutboundSequenceNumbers configuration parameter. For example, the following disables setting sequence numbers in messages sent via solicited sends.

Code Block
<model>
  
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>

      <setOutboundSequenceNumbers>false</setOutboundSequenceNumbers>
    </app>
  </apps>
</model>

The default value of the setOutboundSequenceNumbers parameter is true.

Unsolicited Sends

By default, an AEP engine is configured to not set sequence numbers for messages sent via unsolicited sends. Whether sequence numbers are set in messages via unsolicited sends is controlled via both the setOutboundSequenceNumbers and sequenceUnsolicitedSends configuration parameters. For sequence numbers to be set in messages sent via unsolicited sends, both these parameters need to be set as true. Sequence numbers will not be set in messages sent via unsolicited sends if either of these parameters is false. For example, the following enables setting sequence numbers in messages sent via unsolicited sends.

Code Block
<model>
  
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>

      <sequenceUnsolicitedSends>true</sequenceUnsolicitedSends>
      <setOutboundSequenceNumbers>true</setOutboundSequenceNumbers>
    </app>
  </apps>
</model>

The default value of the setOutboundSequenceNumbers parameter is true and the default value of sequenceUnsolicitedSends is false.

Sequence Numbering and Concurrent Sends

The use of unsolicited sends introduces send concurrency related concerns. One of the concerns is the integrity of the sequence numbers in outbound messages - with concurrent sends, the system needs to ensure that the messages are transmitted on the wire in the same sequence as the setting of the sequence numbers on them. See Concurrent Sends for more information on how to configure the system for concurrent sends to ensure the integrity of the sequence numbers on the transmitted messages.

Duplicate Detection

The principal purpose of setting sequence numbers in outbound messages is duplicate detection. By default, duplicate detection is enabled. In other words, if an AEP engine receives a message with a sequence number > 1 that is less than or equal to the sequence number (> 1) of a prior message, then the later message is considered a duplicate and is not dispatched to the application. For applications that are tolerant of duplicates or can perform duplicate detection on their own, it is possible to disable duplicate detection in an engine even if sequence numbering is enabled. This is controlled by the performDuplicateChecking configuration parameter. The following example shows how to do so.

Code Block
<model>
  
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>

      <performDuplicateChecking>false</performDuplicateChecking>
    </app>
  </apps>
</model>

Sequence Numbers and Clustering

An AEP Engine replicates and persists the following metadata in its entire cluster as part of the application's overall HA state.

When an application fails over to a backup or is restarted from its transaction log, it will remember these variables and resume operation from where it left off.

When an application's state is wiped or reinitialized it will restart its sending stream sequence to 1 which alerts downstream applications not to consider its newly lowered sequence numbers as duplicates.

The set of messages between the "last sent message that was acknowledged" and the "last sent message" is considered to be the set of "in-doubt" outbound messages, i.e. messages for which it is not known whether the messaging backbone has stabilized and taken ownership of the message. On a failover or a restart, all messages in this in-doubt window are retransmitted by an AEP engine. The downstream applications use duplicate detection to filter out any duplicates.
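As a rough illustration of the in-doubt window, the sketch below computes which sequence numbers would be retransmitted after a failover. It assumes contiguous sequence numbers and uses hypothetical names (`InDoubtWindowSketch`, `inDoubt`); how the engine actually tracks and replays these messages is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: the class and method names are hypothetical. */
class InDoubtWindowSketch {

    /**
     * Returns the sequence numbers sent after the last acknowledged one,
     * in send order. These are the candidates for retransmission.
     */
    static List<Long> inDoubt(long lastAckedSno, long lastSentSno) {
        List<Long> result = new ArrayList<>();
        for (long sno = lastAckedSno + 1; sno <= lastSentSno; sno++) {
            result.add(sno);
        }
        return result;
    }

    public static void main(String[] args) {
        // Last acknowledged send was 7, last send attempted was 10.
        System.out.println("retransmit: " + inDoubt(7, 10));
    }
}
```

Downstream receivers that already processed some of these messages would discard the retransmissions through duplicate detection.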

Concurrent Sends

As mentioned before, the X runtime ensures that the serialization and sending of messages through the underlying message bus binding (transport) for messages sent through an AEP engine using only solicited sends operates in a single threaded manner. In other words, for an application performing only solicited sends, the underlying bus binding does not have to be configured to be safe for concurrent sends. However, if an application performs unsolicited sends concurrently using more than one thread or performs unsolicited and solicited sends concurrently, then the X runtime needs to be configured to ensure thread safety for such concurrent sends. How one configures the system for such safety depends on the type of concurrent sends being done by the application

Only Concurrent Unsolicited Sends

If the application is NOT performing solicited sends and is performing unsolicited sends concurrently using more than one thread, then the X runtime needs to be configured for concurrent send safety by enabling concurrent sends on the underlying message bus binding. This is done using the enable_concurrent_sends bus connection descriptor property. This configuration parameter can be set as follows and is applicable to all message bus bindings supported for use with the X Platform

Code Block
title: Enable Concurrent Sends for Concurrent Unsolicited Sends
<buses>
  <bus name="sample-bus" descriptor="activemq://localhost:61616&amp;enable_concurrent_sends=true">
    <channels>
      ...
    </channels>
  </bus>
</buses>

Concurrent Solicited and Unsolicited Sends

If an application is performing concurrent solicited and unsolicited sends, then configuring the underlying message bus binding for concurrent sends will ensure correct send operation unless sequence numbers need to be set on messages sent using unsolicited sends. Note that by default, sequence numbers are set for solicited sends and not set for unsolicited sends, so with those default settings, just configuring the underlying message bus binding for concurrent sends will enable safety of concurrent unsolicited and solicited sends. However, if sequence numbers are desired in messages sent via unsolicited sends, then the sequenceUnsolicitedWithSolicitedSends configuration parameter should be used instead of enable_concurrent_sends. Setting both these parameters is safe but not necessary. The following example shows how to enable concurrency for this scenario, i.e. concurrent solicited and unsolicited sends with sequence numbers in both solicited and unsolicited sent messages.

Code Block
<model>
  
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>

      <sequenceUnsolicitedWithSolicitedSends>true</sequenceUnsolicitedWithSolicitedSends>
    </app>
  </apps>
</model>

Configuration Summary

The following table summarizes how to configure the X runtime for concurrent sends for the various concurrency scenarios

Solicited Sends | Concurrent Unsolicited Sends | Sequence Number in Solicited Sends | Sequence Number in Unsolicited Sends | Configuration
No | Yes | N/A | Yes or No | enable_concurrent_sends
Yes | Yes | Yes | No | enable_concurrent_sends
Yes | Yes | No | Yes | enable_concurrent_sends
Yes | Yes | No | No | enable_concurrent_sends
Yes | Yes | Yes | Yes | sequenceUnsolicitedWithSolicitedSends
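The summary table reduces to a single decision, which the following sketch encodes. The class and method names are hypothetical and exist only to restate the table; every row assumes that unsolicited sends are performed concurrently.

```java
/** Illustrative restatement of the summary table; names are hypothetical. */
class ConcurrentSendConfigSketch {

    /**
     * Returns the name of the setting to enable, assuming the application
     * performs concurrent unsolicited sends (true for every table row).
     */
    static String requiredSetting(boolean solicitedSends,
                                  boolean seqInSolicited,
                                  boolean seqInUnsolicited) {
        if (solicitedSends && seqInSolicited && seqInUnsolicited) {
            // Sequence numbers are shared between both kinds of sends.
            return "sequenceUnsolicitedWithSolicitedSends";
        }
        return "enable_concurrent_sends";
    }

    public static void main(String[] args) {
        System.out.println(requiredSetting(false, false, true));
        System.out.println(requiredSetting(true, true, false));
        System.out.println(requiredSetting(true, true, true));
    }
}
```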

Creating Messages For Send

Message types generated by the application data modeler have no public constructors, but instances can be created via the static create() factory method on the message class. By routing all message creation through the create() factory methods, X is able to transparently switch between pooled and unpooled allocation of messages without the need for any changes to business logic. Once a message has been created, application code can populate it using random access, i.e. by calling setter methods to set field values as though it were any other POJO, or by using other encoding-specific mechanisms of message population.

For example:

Code Block
language: java
title: Message Population Using Random Access
MyMessage message = MyMessage.create();
message.setStringField("test");
message.setIntField(123);

...

Scope of a Sent Message

Sending a message through an AEP Engine transfers ownership of the message to the engine. What this means is that, once the send call completes, the application must release its reference to the sent message. Modifying or even reading a message post send is illegal and can result in unpredictable behavior.

If a message is created but not sent, it is up to the application to call dispose() on the message to return it to the pool. Failing to return the message to a pool will not result in a memory leak, but it will mean that subsequent create() calls will result in object allocation and promotion.

If the application needs to send the same message multiple times, use the copy() method on the message to create a new deep copy of the message such as the following:

Code Block
MyMessage original = MyMessage.create();
original.setName("Neeve");
original.setYear(2017);
 
MyMessage copy = original.copy();
copy.setYear(2018);
 
sender.send("destination1", original);
sender.send("destination2", copy);

If message pooling is enabled, sent messages will be wiped and reused in subsequent sends.

...


How To Send Messages

Using the AEP Message Sender

Since 3.5

...

...

Directly Via the AEP Engine

When an application does not use the AepMessageSender, it must provide the MessageChannel in the AEP Engine's sendMessage call. When using this pattern, the application can register event handlers for channel up and down events, which are dispatched to the application when messaging is started or stopped. The application can qualify the EventHandler annotation with a source of the form channelName@busName to specify the channel of interest.

Code Block
language: java
public class MyApp {
  private volatile MessageChannel orderEventChannel;
  private volatile AepEngine engine;

  @AppInjectionPoint
  public void onEngineInjected(AepEngine engine) {
    this.engine = engine;
  }

  // Capture the channel when messaging is started.
  @EventHandler(source = "order-event-channel@order-processing-bus")
  public void onOrderEventChannelUp(AepChannelUpEvent channelUpEvent) {
    this.orderEventChannel = channelUpEvent.getChannel();
  }

  // Drop the channel reference when messaging is stopped.
  @EventHandler(source = "order-event-channel@order-processing-bus")
  public void onOrderEventChannelDown(AepChannelDownEvent channelDownEvent) {
    this.orderEventChannel = null;
  }

  @EventHandler
  public void onNewOrder(NewOrderMessage message) {
    OrderEventMessage event = OrderEventMessage.create();
    event.setOrderId(message.getOrderId());

    // The channel name and bus must be set on the message
    // if using Event Sourcing because handlers are invoked
    // on the backup and during recovery before messaging is
    // started.
    //
    // This is not required when using State Replication.
    event.setMessageChannel("order-event-channel");
    event.setMessageBus("order-processing-bus");

    engine.sendMessage(orderEventChannel, event);
  }
}

Message Routing (Channel Keys)

The message channel encapsulates the physical destination on which the message will be sent using a channel key. The channel key abstraction allows the actual message provider destination to be controlled via configuration. Channel keys can be either static or dynamic:

  1. A static key uses the channel key as the actual topic on which to send the message. For example, "Orders/USA".
  2. A dynamic key contains variable components that are resolved at runtime either at startup or during a send call to resolve the specific topic. For example, "Orders/${Region}".
  3. Channel key functions allow portions of the key to be resolved using a function. For example, "Orders/#[orderProcessorShard = hash(${orderId}, 2)]", would resolve to Orders/1 or Orders/2 based on the hashcode of the orderId field in the message.
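To make the hash example concrete, the sketch below shows one way a value could be mapped onto a 1-based partition number. This is an assumption for illustration: the platform's actual hash function is not described here, and `HashShardSketch`, `shard`, and `resolve` are hypothetical names.

```java
/** Illustrative partitioning helper; not the platform's hash implementation. */
class HashShardSketch {

    /** Maps a field value onto a partition number in the range 1..partitions. */
    static int shard(Object fieldValue, int partitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(fieldValue.hashCode(), partitions) + 1;
    }

    /** Builds a topic such as "Orders/1" from a prefix and a field value. */
    static String resolve(String prefix, Object fieldValue, int partitions) {
        return prefix + "/" + shard(fieldValue, partitions);
    }

    public static void main(String[] args) {
        for (long orderId = 100; orderId < 104; orderId++) {
            System.out.println(orderId + " -> " + resolve("Orders", orderId, 2));
        }
    }
}
```

Because the same field value always maps to the same partition, all messages for a given order land on the same topic.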

By default, a message channel will prepend the channel name to the beginning of the topic as the first level in the topic name. This mechanism prevents multiple channel definitions from conflicting with one another. This behavior can be disabled by configuring the bus provider with the bus property topic_starts_with_channel=false.

...

  • The message contents via message reflection. The channel will look for a field accessor matching the variable key component (e.g. a getSalesRegion() method) if the message supports message reflection.
  • The key resolution table supplied to an AepEngine.sendMessage method or AepMessageSender.sendMessage method (if non-null).
  • The channel's key resolution table (if non-null), which is a table that can be set on the channel programmatically for key resolution when handling an AepChannelUpEvent (see restrictions below for EventSourcing considerations).
  • Default values encoded into the channel key itself. In the above example, this would be 'US' for 'salesRegion'.
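The lookup order listed above can be sketched as follows. This is a simplified model rather than the platform's resolver: message reflection is stood in for by a plain map of field values, the `${name::default}` syntax for defaults is an assumption borrowed from the descriptor example earlier on this page, and `KeyResolutionSketch` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative variable resolution for channel keys; names are hypothetical. */
class KeyResolutionSketch {
    // Matches ${name} or ${name::default}. The "::default" form is an assumption.
    private static final Pattern VAR =
        Pattern.compile("\\$\\{([^}:]+)(?:::([^}]*))?\\}");

    static String resolve(String channelKey,
                          Map<String, String> messageFields,
                          Map<String, String> sendTable,
                          Map<String, String> channelTable) {
        Matcher m = VAR.matcher(channelKey);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            // Sources are consulted in the order given in the list above.
            String value = firstNonNull(lookup(messageFields, name),
                                        lookup(sendTable, name),
                                        lookup(channelTable, name),
                                        m.group(2));
            if (value == null) {
                throw new IllegalArgumentException("Unresolved key variable: " + name);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    private static String lookup(Map<String, String> table, String name) {
        return table == null ? null : table.get(name);
    }

    private static String firstNonNull(String... candidates) {
        for (String c : candidates) {
            if (c != null) {
                return c;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(resolve("Orders/${salesRegion::US}", null, null, null));
    }
}
```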

...

Note

Note that when using EventSourcing as the HAPolicy, using a key resolution table set on the message channel is not supported. The reason for this is that message buses are only started for applications that are operating in a Primary role. Consequently, messages sent by application logic on the backup instance will not have received an AepChannelUpEvent and won't have a key resolution table which could lead to divergent key resolution.

...

    • variableName: Names the variable portion of the key. The naming of the variable portion defined by the function is important as it allows the portion to be referenced when defining channel filters (see channel filters below).
    • functionName: The name of the static registered channel key function to use. 
    • args: The arguments to pass to the function. The arguments can either be static values, or ${variable} values sourced from the message or key resolution tables. 

 

Built-In Functions

The following table describes the currently available set of built-in channel key functions provided by the platform:

...

Code Block
language: xml
title: Channel Key for Partitioning
<buses>  
  <bus name="order-bus" descriptor="solace://solhost:55555">
    <channels>
      <channel name="new-orders">
        <key>NEWORDERS/#[ordershard=hash(${orderId}, 3)]</key>
      </channel>
    </channels>
  </bus>
</buses>
 
<apps>
  <app name="sender">
    <messaging>
      <bus name="order-bus">
        <channels>
          <channel name="new-orders" join="false"/>
        </channels>
      </bus>
    </messaging>
  </app>
  <app name="receiver-1">
    <messaging>
      <bus name="order-bus">
        <channels>
          <channel name="new-orders" join="true">
            <filter>ordershard=1|2</filter>
          </channel>
        </channels>
      </bus>
    </messaging>
  </app>
</apps>

...

An application may define its own channel key functions by setting the value of `nv.sma.channelkeyfunctioncontainers` to a comma-separated list of classes that contain additional channel key functions.

...

Code Block
title: Declaring a Custom Key Function Class
<env>
  <nv>
    <sma>
      <channelkeyfunctioncontainers>com.example.CustomChannelKeyFunctions</channelkeyfunctioncontainers>
    </sma>
  </nv>
</env>
 
 
Code Block
title: Custom Channel Key Function Container Class
public class CustomChannelKeyFunctions {
  
  /**
   * A custom key function that substitutes the first character of the provided 
   * value into the message key. 
   */
  public static final void abbreviate(final XString messageKey, XString value) {
    messageKey.append(value.charAt(0));
  }
}

...

When resolving topics dynamically from a message's fields, it may be important to validate that the value from the message field doesn't introduce additional levels in the topic, contain illegal characters, or result in empty topic levels. To prevent this, a message bus can be configured to 'clean' the dynamic variable portions of the topic, at the cost of additional overhead, by setting the following environment properties:

Property | Default | Description
nv.sma.maxresolvedkeylength | 0

Controls the maximum resolved key length for resolved and static message keys.

When set to a length greater than 0, the length of the message key is checked to ensure that it is not greater than this value.

For maximum portability between bindings, this property can be set to the lowest value for destination lengths supported amongst the bindings that will be used.

If this property is not set and "nv.sma.validatemessagekey" is set a message bus binding instance should still validate that a resolved message key is not too long. For example, the solace binding only supports topics that are 250 characters long and would enforce the resolved topic length as part of key validation.

Note

Prior to version 3.4.373, this property was named nv.sma.maxResolvedKeyLength. This property was changed to be all lowercase for uniformity with other environment property names. The old camelcase property name is still supported for backward compatibility, but the newer property name takes precedence. It is therefore important for applications that may specify and override this property from multiple locations (e.g. system property and DDL) to use the same case everywhere.

nv.sma.cleanmessagekey | false

Controls whether or not variable key parts in channel keys are sanitized by replacing any non-letter or digit character with an '_' when resolving a variable key.

For example: if the channel key is specified as "/Orders/${Region}" and key resolution is performed using a message that returns a value of "Asia/Pac" for getRegion(), then the resolved key value will be "/Orders/Asia_Pac" rather than "/Orders/Asia/Pac" when this property is set to true.

This property applies to values coming from a key resolution table or via message reflection but not to channel key functions.  

 

Note

Prior to version 3.4.373, this property was named nv.sma.cleanMessageKey. This property was changed to be all lowercase for uniformity with other environment property names. The old camelcase property name is still supported for backward compatibility, but the newer property name takes precedence. It is therefore important for applications that may specify and override this property from multiple locations (e.g. system property and DDL) to use the same case everywhere.

nv.sma.allowemptykeyfield | false

Controls whether or not variable key parts in channel keys may be substituted with an empty String value.

For example: if the channel key is specified as "/Orders/${Region}/${Department}" and key resolution is performed using a message that returns a value of "" for getRegion(), then this property specifies that key resolution should fail when set to false.

This property applies to values coming from a key resolution table or via message reflection but not to channel key functions.  

Tip: See also nv.sma.treatemptykeyfieldasnull, which can be used as a more permissive alternative for handling empty string values during key resolution.

Note

Prior to version 3.4.373, this property was named nv.sma.allowEmptyKeyField. This property was changed to be all lowercase for uniformity with other environment property names. The old camelcase property name is still supported for backward compatibility, but the newer property name takes precedence. It is therefore important for applications that may specify and override this property from multiple locations (e.g. system property and DDL) to use the same case everywhere.

nv.sma.treatemptykeyfieldasnull | false

The XRuntime property controlling whether or not an empty key value is treated as null when resolving message keys.

When this property is set it takes precedence over the value set for "nv.sma.allowemptykeyfield". If a variable key value is resolved from a message or a key resolution table as an empty string it is treated as if the value were not set at all.

Unlike nv.sma.allowemptykeyfield=false, this value allows the key resolution to continue to look for valid values from other sources or the default value configured for the channel key variable, only resulting in an exception if the value can't be resolved at all.

Users should take care when setting this property to true if variable substitution values are expected to be resolved from a field in the message: setting this value to true when there is a default value provided in the channel key itself could mask issues related to message contents not containing valid values.

Since 3.8

nv.sma.validatemessagekey | false

Controls whether or not message key validation is done prior to sending a message.

When this property is false, calls to MessageChannel.validateResolvedMessageKey(MessageView) will be a no-op. 
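The checks described in the table above can be approximated with a few helper methods. This sketch mirrors the documented behaviour of the cleaning, empty-value, and maximum-length properties, but it is not the platform's code; `KeyValidationSketch` and its method names are hypothetical.

```java
/** Illustrative key sanitizing and validation; names are hypothetical. */
class KeyValidationSketch {

    /** Replaces every character that is not a letter or digit with '_'. */
    static String clean(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            sb.append(Character.isLetterOrDigit(c) ? c : '_');
        }
        return sb.toString();
    }

    /** Applies the empty-value and cleaning rules to one variable value. */
    static String substitute(String value, boolean cleanKey, boolean allowEmpty) {
        if (value.isEmpty() && !allowEmpty) {
            throw new IllegalArgumentException("Empty key variable value");
        }
        return cleanKey ? clean(value) : value;
    }

    /** Enforces a maximum resolved key length; 0 or less disables the check. */
    static void checkLength(String resolvedKey, int maxLength) {
        if (maxLength > 0 && resolvedKey.length() > maxLength) {
            throw new IllegalArgumentException(
                "Resolved key exceeds " + maxLength + " characters");
        }
    }

    public static void main(String[] args) {
        String key = "/Orders/" + substitute("Asia/Pac", true, false);
        checkLength(key, 250);
        System.out.println(key);
    }
}
```

With cleaning enabled, the "Asia/Pac" value from the example above becomes "Asia_Pac", so it no longer introduces an extra topic level.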

...

It is also possible for the caller to provide the message key directly in the send call. In this case, the supplied key will be used as is for the topic. This can be useful in certain situations, but should be used with care because it breaks the ability for the topic to be changed at runtime, and receivers will not have knowledge of how to subscribe to such sends.

...

  • Variable substitution is not done on the provided key.
  • Key validation is done on the topic.
  • The provided key will be prepended with the channel name if topic_starts_with_channel=true is set for the bus.

Understanding Message Receipt

...

Zero Loss Sends (Send Stability)

Solicited Sends

As mentioned earlier, the AEP engine manages in-doubt messages across the entire cluster to ensure zero loss in its outbound message stream across fail overs and restarts. In the event of a fail over within a cluster or a fresh restart of the primary member of the cluster from its transaction log, the AEP engine recovers and retransmits "in-doubt" messages to ensure zero loss. This mechanism only applies to messages sent using solicited sends. For unsolicited sends, the application needs to participate in ensuring zero loss. 

Unsolicited Sends

When an application sends a message in an unsolicited manner, it is possible that the sent message would be lost if the application crashes prior to the send being transmitted over the message bus. To handle such a situation, the AEP Engine provides a mechanism by which it notifies the application when it has ensured that the sent message has been stabilized downstream and is guaranteed to be delivered to the downstream applications. To ensure zero loss, the sending application needs to hold onto the sent message until it receives such a notification from the AEP engine, at which point the application can discard the sent message. For applications such as gateways, this typically involves holding onto sent messages until the AEP engine provides a stability notification, at which point the application not only discards the sent message but also notifies stability of the message upstream to whatever source the message was received from, so that the source itself can record that the message has been delivered and does not need to be delivered again.

The AEP engine provides such stability notifications via the AepSendStabilityEvent. The engine must be configured via dispatchSendStabilityEvents to dispatch these events.

Correlating Send Stability

It is usually necessary for an unsolicited sender to correlate the send stability event to the source event that triggered its send. The AepSendStabilityEvent includes a reference to the stabilized message via the getMessage() accessor. Often, one of the fields on the message can be used to correlate the sent message to the source event. In cases where including correlation information in the published message does not make sense, it is possible to set an attachment on the sent message that can assist in correlation when processing the stability event. This technique is illustrated in the sample below.

Example: A File Gateway

The following sample shows a file gateway app that reads lines from a file and publishes each line out over messaging. It enables AepSendStabilityEvents and on receipt of the stability event updates a cursor file to record the last line that was acknowledged by the downstream bus: 

 

Code Block
languagejava
titleSource
/**
 * The {@link FileInputGateway} tails a file sending each line as a message.
 */
@AppHAPolicy(value = HAPolicy.StateReplication)
public class FileInputGateway{
    Tracer tracer = Tracer.create("file-tailer", Level.INFO);
    private volatile AepMessageSender messageSender;
    private volatile AepEngine engine;
 
    @Configured(property = "gateway.filename")
    private volatile String filename;
 
    private RandomAccessFile cursor;
    private volatile long stabilizedLine = 0;
 
    @AppInjectionPoint
    public void injectMessageSender(AepMessageSender messageSender) {
        this.messageSender = messageSender;
    }
 
    @AppInjectionPoint
    public void injectAepEngine(AepEngine engine) {
        this.engine = engine;
    }
 
    @AppMain
    public void run(String[] args) throws Exception {
        // Open source file and cursor file.
        BufferedReader reader = new BufferedReader(new FileReader(filename));
        try {
            cursor = new RandomAccessFile(filename + ".cursor", "rwd");
            cursor.setLength(8);
            cursor.seek(0);
            // Wait for engine messaging to start:
            engine.waitForMessagingToStart();
            // Read lines until the end of the file:
            String line = null;
            stabilizedLine = cursor.readLong();
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                if (++lineNumber <= stabilizedLine) {
                    continue;
                }
                FileLineMessage message = FileLineMessage.create();
                message.setText(line);
                message.setFileName(filename);
                message.setLineNumber(lineNumber);
                message.setAttachment(Long.valueOf(message.getLineNumber()));
                messageSender.sendMessage("line-input", message);
            }
            // Wait for send stability and exit:
            while (stabilizedLine < lineNumber) {
                Thread.sleep(100);
            }
        }
        finally {
            reader.close();
            cursor.close();
        }
    }
 
    @EventHandler
    public final void onSendStability(AepSendStabilityEvent event) throws IOException {
        FileLineMessage message = (FileLineMessage)event.getMessage();
        stabilizedLine = (Long)message.getAttachment();
        cursor.seek(0);
        cursor.writeLong(stabilizedLine);
        cursor.getFD().sync();
        tracer.log("Got stability for: " + stabilizedLine, Level.INFO);
    }
 
    /**
     * @return The number of lines sent.
     */
    @AppStat(name = "Lines Sent")
    public long getLinesSent() {
        return stabilizedLine;
    }
}

The sample configuration below creates the bus, configures messaging, and enables send stability events and unsolicited send sequencing:

Code Block
languagexml
titleConfig
<?xml version="1.0"?>
<model xmlns="http://www.neeveresearch.com/schema/x-ddl" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">  
  <!-- Define buses and channels that will be shared between applications-->
  <buses>
    <bus name="file-connector">
      <provider>activemq</provider>
      <address>localhost</address>
      <port>61616</port>
      <channels>
        <channel name="line-input">
          <qos>Guaranteed</qos>
          <key>${filename}</key>
        </channel>
      </channels>
    </bus>
  </buses>
    
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="file-input-gateway" mainClass="com.sample.FileInputGateway">
      <messaging>
        <factories>
          <factory name="com.sample.messages.FileGatewayMessageFactory" />
        </factories>
        <buses>
            <bus name="file-connector">
                <channels>
                    <channel name="line-input" join="false"/>
                </channels>
            </bus>
        </buses>
      </messaging>
      <dispatchSendStabilityEvents>true</dispatchSendStabilityEvents>
      <sequenceUnsolicitedWithSolicitedSends>true</sequenceUnsolicitedWithSolicitedSends>
    </app>
  </apps>
</model>

 

Receiving Messages

An application's AEP engine creates and manages the lifecycle of the message buses that an application configures for use. When an application is configured to join one or more bus channels, appropriate topic subscriptions will be issued on behalf of the application. Message dispatch occurs as follows:

  • The message bus binding implementation receives a serialized, message provider specific message.
  • Using an application supplied message view factory (generated by ADM), and using SMA message metadata also transported with the provider-specific message, the bus wraps the serialized message in a view object (also ADM generated).
  • Using message metadata transported with the message, the binding looks up the message channel on which to dispatch the received message.
  • The message view is wrapped in a MessageEvent along with its message channel and is dispatched to the application's AEP Engine, where it is enqueued for processing.
  • The AEP Engine picks up the message event and dispatches it to each application event handler that has a signature matching the message type.
  • Once the AEP Engine stabilizes the results of the application's message processing, a message acknowledgment is dispatched back to the binding.

 

Figure: Inbound Message Dispatch

...

Registering Message Interest

For an application to receive messages, it must:

  • join the message channels on which the message is sent,
  • register message factories for the bus provider to deserialize the message,
  • and define an EventHandler for the message.

Configuring Channels For Join

For the application's AepEngine to issue subscriptions for the message channel on which a message is sent, the channel must be joined. Buses and channels are configured via the platform's configuration DDL, as the configuration snippet below demonstrates:

...

Code Block
<?xml version="1.0"?>
<model xmlns="http://www.neeveresearch.com/schema/x-ddl" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
 
  <!-- Define buses and channels that will be shared between applications-->
  <buses>
    <bus name="sample-bus">
      <provider>activemq</provider>
      <address>localhost</address>
      <port>61616</port>
      <channels>
        <channel name="new-orders-channel">
          <qos>Guaranteed</qos>
          <key>NEWORDERS/${Region}/${Department}</key>
        </channel>
      </channels>
    </bus>
  </buses>
  
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        <factories>
          <factory name="com.sample.messages.OrderMessagesFactory" />
        </factories>
        <buses>
          <bus name="sample-bus">
            <channels>
              <channel name="new-orders-channel" join="true">
                <filter>Region=US|Canada</filter>
              </channel>
            </channels>
          </bus>
        </buses>
      </messaging>
    </app>
  </apps>
</model>

Adding an EventHandler

When a message is received by a message bus, it is enqueued into the application's Inbound Event Queue, from which the engine picks it up for dispatch.

...

The application's underlying AEP Engine will ensure that the message is acknowledged once state changes made by the handler have been stabilized. That, coupled with the engine's message deduplication feature, ensures that even in the event of a failover, the handler will only be executed once.

Channel Filters

A channel filter restricts the variable parts of a channel key to limit what is received over a message channel. It is used to determine the subscriptions issued on behalf of the application. See the Configuring Channels For Join section above for an example of using channel filters. In particular, pay attention to the "key" and "filter" elements.

...

If a variable portion of the channel key is omitted in a filter, the subscription is joined in a wildcard fashion (assuming the underlying bus implementation supports wildcards). So given a channel key of "NEWORDERS/${Region}/${Department}" and a channel filter of "Region=US|EMEA", the following subscriptions would be issued during join:

  • NEWORDERS/US/*
  • NEWORDERS/EMEA/*

Finally, if the channel filter is set to null for the channel key in the example above, then the resulting subscription would be:

  • NEWORDERS/*/*
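
The expansion described above can be sketched in plain Java. This is a minimal illustrative model, not the platform's implementation: the class and method names are hypothetical, the ';' separator between multiple filter variables is an assumption, and '*' is used as the wildcard as in the examples above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical model of how a channel key and filter yield subscriptions. */
final class ChannelFilterSketch {
    // Matches ${Variable} placeholders in a channel key.
    private static final Pattern VARIABLE = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Parses "Var=a|b" clauses; the ';' clause separator is an assumption. */
    static Map<String, List<String>> parseFilter(String filter) {
        Map<String, List<String>> values = new LinkedHashMap<>();
        if (filter == null || filter.trim().isEmpty()) {
            return values; // null or empty filter: every variable becomes a wildcard
        }
        for (String clause : filter.split(";")) {
            String[] parts = clause.split("=", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Bad filter clause: " + clause);
            }
            values.put(parts[0].trim(), Arrays.asList(parts[1].trim().split("\\|")));
        }
        return values;
    }

    /** Expands the key into one topic per combination of filtered values. */
    static List<String> subscriptions(String key, String filter) {
        Map<String, List<String>> values = parseFilter(filter);
        List<String> topics = new ArrayList<>();
        topics.add("");
        Matcher matcher = VARIABLE.matcher(key);
        int last = 0;
        while (matcher.find()) {
            String literal = key.substring(last, matcher.start());
            // A variable that the filter omits is subscribed to as a wildcard.
            List<String> options =
                values.getOrDefault(matcher.group(1), Collections.singletonList("*"));
            List<String> next = new ArrayList<>();
            for (String prefix : topics) {
                for (String option : options) {
                    next.add(prefix + literal + option);
                }
            }
            topics = next;
            last = matcher.end();
        }
        String tail = key.substring(last);
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            result.add(topic + tail);
        }
        return result;
    }

    public static void main(String[] args) {
        String key = "NEWORDERS/${Region}/${Department}";
        System.out.println(subscriptions(key, "Region=US|EMEA")); // [NEWORDERS/US/*, NEWORDERS/EMEA/*]
        System.out.println(subscriptions(key, null));             // [NEWORDERS/*/*]
    }
}
```

The sketch ignores filter cleaning and bus-specific wildcard syntax, both of which depend on the binding in use.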

Cleaning Channel

...

Filters

Status
colourGreen
titlesince 3.7
 

...

As of the 3.8 Release, channel filter cleaning has been enhanced to not replace certain wildcard characters that are legal for use in subscriptions. 

...

Otherwise, any characters that are not alphanumeric will be replaced. Prior to 3.8, any non-alphanumeric character was replaced, including the wildcard combinations described above.

Handling Received Messages

Messages dispatched to applications are read-only; it is illegal for an application to modify a received message.

Because messages may be pooled by the platform, it is also not legal for an application to hold onto a message beyond the scope of its handler unless it first calls acquire() on the message. If the application does acquire() the message, it should later dispose() of the message to allow the platform to reuse it. In most cases, holding on to the message beyond the scope of the handler is not the best idea from a performance perspective. Instead, applications will typically copy the data they need out of the message into either application state objects or outbound messages.

Inbound messages may not be resent as outbound messages. Applications that need to forward an inbound message as an outbound message should first copy() the message and send the message copy:

Code Block
@EventHandler
public void onMyMessageReceived(MyMessage message) {
  MyMessage copy = message.copy();
  messageSender.sendMessage("mirror", copy);
}

...

Registering Message Factories

Message bus binding implementations receive messages in serialized form and wrap them in a MessageView that is passed to the application to work with. The view is created by locating the MessageViewFactory for the message, which is typically generated by ADM. To locate the factory and message type, a binding consults message metadata that is transported along with the serialized message. An application must, therefore, register the message factories it intends to use so that the bus binding can find them. This can be done via configuration or programmatically.

Registration via Config DDL

Most often, applications will list message view factories that they use in their DDL Config.

Code Block
xml
xml
<apps>
  <app name="MyApp">
    <messaging>
      <factories>
        <factory name="com.sample.messages.OrderMessagesFactory" />
      </factories>
      <buses>
        <!-- ... -->
      </buses>
    </messaging>
  </app>
</apps>

...

Programmatic Registration

Registration can also be done programmatically via the AEP Engine. A common way to do this is to provide an AppInjectionPoint for the AEP Engine in the application.

Code Block
java
java
public class MyApp {
  
  @AppInjectionPoint
  public void onEngineCreated(AepEngine engine) {
    engine.registerFactory(new com.example.messages.MyMessageFactory());
    engine.registerFactory(new com.example.messages.other.MyOtherMessageFactory());
  }
}

Message Sequencing And Duplicate Detection

To avoid duplicates and provide exactly-once delivery semantics, each message sent through an AepEngine is assigned a message sender id, derived from the hashcode of the application name, and a monotonically increasing sequence number. The following table describes the fields that the engine sets on outbound messages to enable downstream duplicate detection. These properties are transported on a message bus binding as part of the message metadata sent with the serialized message.

...

As an AepEngine processes messages, it keeps track of the last seen sequence number for each sender that has sent to it as part of its HA state. Consequently, when an application fails over to a backup or is restarted, it remembers the last message seen from each of its senders and can filter duplicates on behalf of the application. When an application's state is wiped or reinitialized, it restarts its sending stream sequence at 1, which alerts downstream applications not to consider its newly lowered sequence numbers as duplicates.
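
The receiver-side bookkeeping described above can be modeled with a small Java sketch. It is a simplified illustration under stated assumptions, not the engine's actual algorithm: the class name is hypothetical, a sequence number of 0 is treated as "not sequenced", and a sequence number of 1 arriving after higher numbers is treated as a restarted sender stream.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-sender duplicate filtering. */
final class DuplicateFilterSketch {
    // Last sequence number seen from each sender id (in the engine this is part of HA state).
    private final Map<Integer, Long> lastSeenBySender = new HashMap<>();

    /** Returns true if the message should be dispatched, false if it is treated as a duplicate. */
    boolean accept(int senderId, long sequenceNumber) {
        if (sequenceNumber == 0) {
            return true; // unsequenced message: never filtered
        }
        Long lastSeen = lastSeenBySender.get(senderId);
        // Assumption: a sender whose state was reinitialized restarts its stream at 1.
        boolean restarted = sequenceNumber == 1 && lastSeen != null && lastSeen > 1;
        if (lastSeen == null || restarted || sequenceNumber > lastSeen) {
            lastSeenBySender.put(senderId, sequenceNumber);
            return true;
        }
        return false; // at or below the last seen number: duplicate
    }

    public static void main(String[] args) {
        DuplicateFilterSketch filter = new DuplicateFilterSketch();
        int sender = "sample-app".hashCode();
        System.out.println(filter.accept(sender, 1)); // true
        System.out.println(filter.accept(sender, 2)); // true
        System.out.println(filter.accept(sender, 2)); // false, retransmitted duplicate
        System.out.println(filter.accept(sender, 1)); // true, sender stream restarted
    }
}
```

Because the map is keyed by sender id, each upstream application is tracked independently, which is why duplicates can still be filtered after a failover as long as this state is replicated.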

By default, any message sent from a message handler is sequenced by the AepEngine; the application should not set the sequence number or sender itself. Messages sent from outside of a message handler (unsolicited sends) are not sequenced by default. Sequencing of unsolicited sends must be enabled via the configuration discussed below.

...

For applications that are tolerant of duplicates or can perform duplicate detection on their own, it is possible to disable duplicate detection for the engine. It is also possible to enable sequencing of messages that are sent from outside of a message handler.

Code Block
<model>
  
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
		...
      </messaging>

      <!-- Sequencing and duplicate detection settings -->
      <performDuplicateChecking>true</performDuplicateChecking>
      <setOutboundSequenceNumbers>true</setOutboundSequenceNumbers>
      <sequenceUnsolicitedSends>false</sequenceUnsolicitedSends>
      <sequenceUnsolicitedWithSolicitedSends>false</sequenceUnsolicitedWithSolicitedSends>
    </app>
  </apps>
</model>

 

The following table summarizes AEP engine configuration properties that have an impact on duplicate detection:

Config Property | Default | Description

setOutboundSequenceNumbers

true

Enables setting of outbound sequence numbers on send from the application's AEP engine.

When disabled, a sequence number of 0 is set on all outbound sends, meaning that downstream receivers will not perform duplicate filtering for sends from this application.

See setOutboundSequenceNumbers in the DDL Config Reference

performDuplicateChecking

true

Whether or not the application's AepEngine will filter duplicates based on received sequence numbers.

An AepEngine uses the hashcode of the engine name as the sender id.  

See performDuplicateChecking in the DDL Config Reference

sequenceUnsolicitedSends

false

Whether to tag sequence numbers on unsolicited sends.

This property can be enabled for applications that only perform sends in an unsolicited fashion and will never send a message from a message handler.

(warning) Be careful about attaching sequence numbers to unsolicited sends, especially if the application is going to be doing both unsolicited and solicited sends concurrently. In that case, messages can be sent on the wire in an order different from the order in which sequence numbers were assigned, causing legitimate messages to be dropped due to incorrect duplicate determination. For such applications, use sequenceUnsolicitedWithSolicitedSends instead to ensure that not only are unsolicited sends sequenced, but that they are also correctly sequenced with respect to solicited sends.

In short, enabling this mode of operation is really only applicable for gateway applications that use the engine in a purely unsolicited sending capacity. In most cases, applications will instead elect to use sequenceUnsolicitedWithSolicitedSends, described below.

See sequenceUnsolicitedSends in the DDL Config Reference

sequenceUnsolicitedWithSolicitedSends

false

Indicates that unsolicited send calls should inject the send operation into the engine's input queue, so that the send is sequenced with respect to inbound event processing in the context of an AEP transaction. This avoids the possibility of sequencing errors with respect to outbound messages being concurrently sent by message handlers.

See sequenceUnsolicitedWithSolicitedSends in the DDL Config Reference

HA Considerations

It is worth noting that for a clustered application, only the primary instance of the application establishes message bus connections. In the event of a failure, a backup member will be elected and will reestablish messaging connections. From a programming standpoint, this has the biggest impact on applications using the EventSourcing HA Policy, as it means that even though their handlers are being invoked on backups, message channels will not have been established for those instances.

During live operation, the primary application keeps track of the last transaction for which outbound messages were acknowledged. This id, known as the stableTransactionId, is persisted in the application's transaction log and replicated to backups. After failure or recovery, backup applications will retransmit outbound messages that are part of transactions that have not stabilized, and rely on downstream applications to use duplicate detection to eliminate any duplicates.
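
As a rough illustration of the retransmission rule above, the following Java sketch selects the outbound messages a newly elected primary would resend. It is a toy model under stated assumptions: the class names are hypothetical, messages are plain strings, and transaction ids are assumed to increase monotonically so that any id greater than the stableTransactionId is considered not yet stabilized.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of selecting in-doubt outbound messages after failover. */
final class RetransmitSketch {
    /** One logged transaction and the outbound messages it produced. */
    static final class OutboundTransaction {
        final long transactionId;
        final List<String> messages;

        OutboundTransaction(long transactionId, String... messages) {
            this.transactionId = transactionId;
            this.messages = Arrays.asList(messages);
        }
    }

    /** Returns messages from transactions newer than the last stabilized one. */
    static List<String> messagesToRetransmit(List<OutboundTransaction> log, long stableTransactionId) {
        List<String> inDoubt = new ArrayList<>();
        for (OutboundTransaction transaction : log) {
            // Assumption: ids above the stable id have not been acknowledged downstream.
            if (transaction.transactionId > stableTransactionId) {
                inDoubt.addAll(transaction.messages);
            }
        }
        return inDoubt;
    }

    public static void main(String[] args) {
        List<OutboundTransaction> log = Arrays.asList(
            new OutboundTransaction(1, "order-1"),
            new OutboundTransaction(2, "order-2a", "order-2b"),
            new OutboundTransaction(3, "order-3"));
        System.out.println(messagesToRetransmit(log, 1)); // [order-2a, order-2b, order-3]
    }
}
```

Some of the resent messages may already have reached downstream applications, which is why the retransmission relies on receiver-side duplicate detection.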

Preserving Subscriptions on Shutdown

By default, when an engine is stopped without an error, bus channels that were 'joined' will be 'left', meaning that any subscriptions or interests created by the message bus will be unsubscribed or unregistered. For many applications, it is desirable to preserve subscriptions when the application is being gracefully shut down for maintenance, so that messages are queued for the application while it is down. For such cases, the default behavior of unsubscribing on graceful shutdown can be overridden by configuring the application to preserve channel joins on stop:

Code Block
titlePreserving Subscriptions via configuration
<app name="sample-app" mainClass="com.sample.SampleApp">
  <messaging>
    ...
  </messaging>
  <preserveChannelJoinsOnStop>true</preserveChannelJoinsOnStop>
</app>

...

Code Block
languagejava
titleRemoving All Subscriptions Programmatically
@EventHandler
public void onEngineStopping(final AepEngineStoppingEvent event) {
  if(event.getPreserveChannelJoins() && event.getCause() == null) {
    tracer.log("Overriding unsubscribe behavior on application stop to remove subscriptions.", Level.INFO);
    event.setPreserveChannelJoins(false);
  }
}

In the above case, the value set programmatically overrides the configured value for the application. Note the check for event.getCause() == null in the handler: it ensures that subscriptions are still preserved when stopping due to an error, so that no messages are lost during failover.

 

Per Channel Subscription Preservation

Status
colourGreen
titleSince 3.12

Subscription preservation or removal can also be configured more granularly at the channel level. Like the application level configuration setting, this per channel configuration setting only applies to a graceful close. 

The following example shows an application configuring subscription preservation on a per channel basis using the preserveJoinsOnClose configuration property: 

Code Block
titlePreserving Subscriptions for a specific channel via configuration
<app name="sample-app" mainClass="com.sample.SampleApp">
  <preserveChannelJoinsOnStop>true</preserveChannelJoinsOnStop>
  <messaging>
    <buses>
      <bus name="orders-bus">
        <channels>
          <channel name="canceled-orders" join="true">
            <filter>Region=US</filter>
            <preserveJoinsOnClose>Default</preserveJoinsOnClose>
          </channel>
          <channel name="new-orders" join="true">
            <filter>Region=US</filter>
            <preserveJoinsOnClose>Preserve</preserveJoinsOnClose>
          </channel>
          <channel name="app-ping" join="true">
            <filter>Region=US</filter>
            <preserveJoinsOnClose>Leave</preserveJoinsOnClose>
          </channel>
        </channels>
      </bus>
    </buses>
  </messaging>
</app>

In the above case, if the application is stopped gracefully:

  • Subscriptions for the canceled-orders channel would be preserved (because preserveChannelJoinsOnStop=true at the application level).

  • Subscriptions for the new-orders channel would be preserved regardless of the configured value for preserveChannelJoinsOnStop. 
  • Subscriptions for the app-ping channel would be unsubscribed regardless of the configured value for preserveChannelJoinsOnStop. 
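
The precedence between the application-level and channel-level settings in the bullets above can be summarized in a short Java sketch. This is an illustrative model only: the class and its nested enum are hypothetical stand-ins that mirror the Default, Preserve and Leave values used in the configuration, and it covers graceful stops only.

```java
/** Hypothetical model of resolving subscription preservation on graceful stop. */
final class PreserveJoinsSketch {
    /** Stand-in for the per-channel preserveJoinsOnClose values. */
    enum Policy { Default, Preserve, Leave }

    /**
     * Returns true if a channel's subscriptions are kept when the application
     * stops gracefully, given the application-level setting and the channel policy.
     */
    static boolean preserveOnGracefulStop(boolean preserveChannelJoinsOnStop, Policy channelPolicy) {
        switch (channelPolicy) {
            case Preserve:
                return true;  // channel setting wins over the application setting
            case Leave:
                return false; // channel setting wins over the application setting
            default:
                return preserveChannelJoinsOnStop; // Default: inherit the application setting
        }
    }

    public static void main(String[] args) {
        boolean appLevel = true; // preserveChannelJoinsOnStop=true, as in the example
        System.out.println("canceled-orders: " + preserveOnGracefulStop(appLevel, Policy.Default));  // true
        System.out.println("new-orders: " + preserveOnGracefulStop(appLevel, Policy.Preserve));      // true
        System.out.println("app-ping: " + preserveOnGracefulStop(appLevel, Policy.Leave));           // false
    }
}
```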

One can also configure per channel subscription preservation programmatically via the message channel's MessageChannelDescriptor. A programmatically set value will override that set via DDL configuration, and can be set at any time before the channel is closed:

Code Block
languagejava
titlePreserving Subscriptions for a specific channel programmatically
@EventHandler(source = "channel4@aeptest1")
public void onChannel4UpEventHandler(final AepChannelUpEvent event) {
  MessageChannel channel = event.getMessageChannel();
  if(channel.getName().equals("app-ping")) {
    channel.getDescriptor().setPreserveJoinsOnClose(PreserveJoinPolicy.Leave);
  }
}