The Talon Manual

...

Property Name | Default Value | Description

set_bus_and_channel_on_receipt | false

Controls whether the bus and channel names are set on received messages.

Setting the channel and bus names on inbound messages incurs a performance overhead. Performance-sensitive applications should avoid enabling this property.

set_key_on_receipt | false

Controls whether the message key is set on received messages.

Not all binding implementations transport the key on the wire; this property has no effect for bindings that don't transport the key.

Setting the key on inbound messages incurs a performance overhead. Performance-sensitive applications should avoid enabling this property.

set_sno_on_receipt | true

Controls whether the message sequence number is set on received messages.

If set to false, inbound messages surfaced to the AEP engine and application will not have the sequence number set, regardless of whether the sender was configured to set sequence numbers in outbound messages (see Message Sequencing).

Setting the sequence number on inbound messages incurs a slight performance overhead. Performance-sensitive applications that do not need inbound sequence numbers should consider setting this property to false.

enable_inbound_transport_headers | false

Controls whether transport headers are set in inbound messages.

Setting this property to true causes bindings that support this functionality to set transport-specific headers in the metadata section of inbound messages.

Not all binding implementations support transport header functionality. This property has no effect for such bindings.

Setting transport headers on inbound messages incurs a performance overhead. Performance-sensitive applications should avoid enabling this property. Additionally, enabling this functionality couples the application more tightly to specific message bindings, which is generally not recommended.

topic_starts_with_channel | true

Controls whether topic names start with the channel name for the bus.

For bus bindings that support topic routing, this property controls whether or not the resolved key is prefixed with the channel name.

enable_concurrent_sends (since 3.15) | false

Controls whether sends through the message bus binding can be performed concurrently using multiple threads.

additional_properties_file (since 3.9) | false

Specifies the path to an external file that is used to load additional bus configuration properties. The external file format is a plain Java Properties file. Properties specified in the external file are merged into the configuration property set for the bus. This file is loaded at runtime, so it is retrieved from the local file system on the host where the configured XVM will run.
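
The merge described for additional_properties_file can be pictured with plain java.util.Properties. The sketch below is illustrative only and is not the runtime's implementation: the class and method names are hypothetical, and it assumes that values from the external file override same-named inline values, which the description above does not specify.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper; not part of the platform API.
final class BusPropertiesMerge {

    /**
     * Returns a new property set holding the inline properties plus those read
     * from the external source. ASSUMPTION: external values win on conflict.
     */
    static Properties merge(Properties inline, Reader external) {
        Properties merged = new Properties();
        merged.putAll(inline);
        Properties loaded = new Properties();
        try {
            loaded.load(external); // standard java.util.Properties text format
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        merged.putAll(loaded);
        return merged;
    }

    public static void main(String[] args) {
        Properties inline = new Properties();
        inline.setProperty("set_key_on_receipt", "false");

        // Stand-in for the contents of the external properties file.
        String fileContents = "enable_concurrent_sends=true\nset_key_on_receipt=true\n";
        Properties merged = merge(inline, new StringReader(fileContents));

        System.out.println("enable_concurrent_sends=" + merged.getProperty("enable_concurrent_sends"));
        System.out.println("set_key_on_receipt=" + merged.getProperty("set_key_on_receipt"));
    }
}
```

In a real deployment the Reader would wrap the file on the XVM host; a StringReader is used here only to keep the sketch self-contained.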

...

Note: Unsolicited Sends and Send Concurrency

In an application that only performs solicited sends, the X runtime ensures that serialization and sends through the underlying transport are performed in a single-threaded manner. However, when an application performs unsolicited sends using two or more concurrent threads, or performs unsolicited sends concurrently with solicited sends, the sends through the underlying transport are performed using multiple concurrent threads. By default, the X runtime is not configured for thread safety for such concurrent sends. See Concurrent Sends for more information on how to configure the X runtime for concurrent sends.

...

Metadata Field | Type | Description

MessageSender | int

A globally unique id that identifies the sender of a message.

By default, an AEP Engine uses the hash code of the engine name as the sender id.

MessageSno | long

A monotonically increasing sequence number.

  • A sequence number of <=0 indicates that the message is not sequenced, i.e. receivers should not perform duplicate checking on it.
  • A sequence number of 1 indicates the start (or restart) of a stream, i.e. a receiver that receives sequence numbers 1,2,3,1,2,3 should not consider the 4th message a duplicate.
  • Otherwise, receivers should consider a sequence number not greater than the previous sequence number as a duplicate.

Note: The AEP Engine uses the same sequence number space across all the channels on which outbound messages are sent. This means that receivers that only express interest in a subset of the channels on which a sender engine publishes messages will not receive consecutive sequence numbers. Therefore, gaps in received sequence numbers cannot be used to determine message loss. The X runtime ensures zero loss via the use of Guaranteed (at-least-once) quality of delivery on the underlying transport, coupled with persistence, to provide exactly-once delivery to application message handlers.
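
The three MessageSno rules above can be sketched as a small receiver-side check. This is a minimal illustration, not the engine's implementation: the class name is hypothetical, and tracking the last sequence number per MessageSender id is an assumption made for the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the MessageSno duplicate rules.
final class DuplicateChecker {

    // ASSUMPTION: the last seen sequence number is tracked per sender id.
    private final Map<Integer, Long> lastSnoBySender = new HashMap<>();

    /** Returns true if the message should be treated as a duplicate. */
    boolean isDuplicate(int senderId, long sno) {
        if (sno <= 0) {
            return false; // not sequenced: no duplicate checking
        }
        if (sno == 1) {
            lastSnoBySender.put(senderId, 1L); // start or restart of a stream
            return false;
        }
        Long last = lastSnoBySender.get(senderId);
        if (last != null && sno <= last) {
            return true; // not greater than the previous sequence number
        }
        lastSnoBySender.put(senderId, sno); // gaps are accepted, see the note above
        return false;
    }

    public static void main(String[] args) {
        DuplicateChecker checker = new DuplicateChecker();
        long[] received = {1, 2, 3, 1, 2, 3, 3};
        for (long sno : received) {
            System.out.println(sno + " duplicate=" + checker.isDuplicate(42, sno));
        }
    }
}
```

With the sequence 1,2,3,1,2,3,3 only the final 3 is reported as a duplicate, because the second 1 resets the stream.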

...

By default, an AEP engine is configured to not set sequence numbers for messages sent via unsolicited sends. Whether sequence numbers are set in such messages is controlled by both the setOutboundSequenceNumbers and sequenceUnsolicitedSends configuration parameters. For sequence numbers to be set in messages sent via unsolicited sends, both parameters must be set to true; if either is false, sequence numbers will not be set. For example, the following enables setting sequence numbers in messages sent via unsolicited sends.

...

The use of unsolicited sends introduces send concurrency concerns. One of them is the integrity of the sequence numbers in outbound messages: with concurrent sends, the system needs to ensure that messages are transmitted on the wire in the same order in which their sequence numbers were set. See Concurrent Sends for more information on how to configure the system for concurrent sends to ensure the integrity of the sequence numbers on the transmitted messages.
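
The ordering concern can be made concrete with a generic sketch in which sequence assignment and transmission happen under one lock, so wire order always matches sequence order. This only illustrates the problem being solved; it is not how the X runtime implements concurrent sends, and the SequencedSender class and its Transport interface are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; none of these types belong to the platform API.
final class SequencedSender {

    /** Stand-in for the underlying transport. */
    interface Transport {
        void transmit(long sno, String payload);
    }

    private final Object sendLock = new Object();
    private final Transport transport;
    private long nextSno = 1;

    SequencedSender(Transport transport) {
        this.transport = transport;
    }

    /** Assigns the next sequence number and transmits while holding one lock. */
    void send(String payload) {
        synchronized (sendLock) {
            long sno = nextSno++;
            transport.transmit(sno, payload);
        }
    }

    /** Sends from several threads and returns the sequence numbers in wire order. */
    static List<Long> runDemo(int threadCount, int sendsPerThread) {
        List<Long> wire = new ArrayList<>(); // only touched while sendLock is held
        SequencedSender sender = new SequencedSender((sno, payload) -> wire.add(sno));
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < sendsPerThread; j++) {
                    sender.send("payload");
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return wire;
    }

    public static void main(String[] args) {
        List<Long> wire = runDemo(4, 1000);
        System.out.println("messages on the wire: " + wire.size());
    }
}
```

If the sequence number were assigned outside the lock, two threads could transmit in the opposite order to their assigned numbers, and a receiver applying duplicate checking would discard the later arrival.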

...

Code Block
<model>
  
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>

      <!-- ... -->
      <performDuplicateChecking>true</performDuplicateChecking>
    </app>
  </apps>
</model>

Sequence Numbers and Clustering

An AEP Engine replicates and persists the following metadata in its entire cluster as part of the application's overall HA state.

...

If the application is NOT performing solicited sends and is performing unsolicited sends concurrently using more than one thread, then the X runtime needs to be configured for concurrent send safety by enabling concurrent sends on the underlying message bus binding. This is done using the enable_concurrent_sends bus connection descriptor property. This property applies to all message bus bindings supported for use with the X Platform and can be set as follows:

...

If an application performs solicited and unsolicited sends concurrently, then configuring the underlying message bus binding for concurrent sends also ensures correct send operation, unless sequence numbers need to be set on messages sent using unsolicited sends. Note that by default, sequence numbers are set for solicited sends and not for unsolicited sends, so with the default sequencing settings, configuring the underlying message bus binding for concurrent sends is enough to make concurrent unsolicited and solicited sends safe. However, if sequence numbers are desired in messages sent via unsolicited sends, then the sequenceUnsolicitedWithSolicitedSends configuration parameter should be used instead of enable_concurrent_sends. Setting both parameters is safe but not necessary. The following example shows how to enable concurrency for this scenario, i.e. concurrent solicited and unsolicited sends with sequence numbers in both solicited and unsolicited sent messages:

...

Solicited Sends | Concurrent Unsolicited Sends | Sequence Number in Solicited Sends | Sequence Number in Unsolicited Sends | Configuration
No | Yes | N/A | Yes or No | enable_concurrent_sends
Yes | Yes | Yes | No | enable_concurrent_sends
Yes | Yes | No | Yes | enable_concurrent_sends
Yes | Yes | No | No | enable_concurrent_sends
Yes | Yes | Yes | Yes | sequenceUnsolicitedWithSolicitedSends
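
The table can also be read as a small decision function. The sketch below simply encodes the five rows; the class and method names are hypothetical, and combinations the table does not list (no concurrent unsolicited sends) return an empty result rather than a guess.

```java
import java.util.Optional;

// Hypothetical helper that encodes the rows of the table above.
final class SendConcurrencySettings {

    /**
     * Returns the configuration parameter named by the table, or empty when the
     * combination is not listed there (no concurrent unsolicited sends).
     */
    static Optional<String> requiredSetting(boolean solicitedSends,
                                            boolean concurrentUnsolicitedSends,
                                            boolean snoInSolicitedSends,
                                            boolean snoInUnsolicitedSends) {
        if (!concurrentUnsolicitedSends) {
            return Optional.empty(); // not covered by the table
        }
        if (solicitedSends && snoInSolicitedSends && snoInUnsolicitedSends) {
            return Optional.of("sequenceUnsolicitedWithSolicitedSends");
        }
        return Optional.of("enable_concurrent_sends");
    }

    public static void main(String[] args) {
        // Unsolicited sends only, from several threads.
        System.out.println(requiredSetting(false, true, false, true));
        // Solicited and unsolicited sends, both sequenced.
        System.out.println(requiredSetting(true, true, true, true));
    }
}
```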

...

  • Variable substitution is not done on the provided key.
  • Key validation is done on the topic.
  • The provided key will be prepended with the channel name if topic_starts_with_channel=true is set for the bus.

Zero Loss Sends (Send Stability)

Solicited Sends

As mentioned earlier, the AEP engine manages in-doubt messages across the entire cluster to ensure zero loss in its outbound message stream across failovers and restarts. In the event of a failover within a cluster, or a fresh restart of the primary member of the cluster from its transaction log, the AEP engine recovers and retransmits "in-doubt" messages to ensure zero loss. This mechanism only applies to messages sent using solicited sends. For unsolicited sends, the application needs to participate in ensuring zero loss.

Unsolicited Sends

When an application sends a message in an unsolicited manner, the sent message may be lost if the application crashes before the send is transmitted over the message bus. To handle this situation, the AEP Engine provides a mechanism by which it notifies the application once the sent message has been stabilized downstream and is guaranteed to be delivered to downstream applications. To ensure zero loss, the sending application needs to hold onto the sent message until it receives such a notification from the AEP engine, at which point it can discard the message. For applications such as gateways, this typically involves holding onto sent messages until the AEP engine provides the stability notification. At that point the application not only discards the sent message but also notifies stability of the message upstream, to whatever source the message was received from, so that the source can record that the message has been delivered and does not need to be delivered again.

The AEP engine provides such stability notifications via the AepSendStabilityEvent. The engine must be configured to dispatch these events via dispatchSendStabilityEvents.
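
The hold-until-stable pattern described above can be sketched independently of the platform API. The class below is a hypothetical illustration: it keeps each sent message under a caller-chosen correlation id, and when stability is reported it discards the message and acknowledges upstream. In a real application, onStable would be invoked from the handler for AepSendStabilityEvent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical illustration; not part of the platform API.
final class PendingSendTracker<K, M> {

    private final Map<K, M> pending = new ConcurrentHashMap<>();
    private final Consumer<K> upstreamAck;

    /** @param upstreamAck called once per message when it becomes stable */
    PendingSendTracker(Consumer<K> upstreamAck) {
        this.upstreamAck = upstreamAck;
    }

    /** Record the message before handing it to the sender. */
    void onSend(K correlationId, M message) {
        pending.put(correlationId, message);
    }

    /** Discards the held message and acknowledges upstream; false if unknown. */
    boolean onStable(K correlationId) {
        if (pending.remove(correlationId) == null) {
            return false; // already stabilized or never sent
        }
        upstreamAck.accept(correlationId);
        return true;
    }

    /** Number of messages still awaiting stability. */
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<Long> acked = new ArrayList<>();
        PendingSendTracker<Long, String> tracker = new PendingSendTracker<>(acked::add);
        tracker.onSend(1L, "first line");
        tracker.onSend(2L, "second line");
        tracker.onStable(1L);
        System.out.println("acked upstream: " + acked + ", still pending: " + tracker.pendingCount());
    }
}
```

After a crash, anything still held in the pending set is what the application would need to resend, which is why the set has to be recoverable from the upstream source.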

Correlating Send Stability

It is usually necessary for an unsolicited sender to correlate the send stability event to the source event that triggered its send. The AepSendStabilityEvent includes a reference to the stabilized message via the getMessage() accessor. Often, one of the fields on the message can be used to correlate the sent message to the source event. In cases where including correlation information in the published message does not make sense, it is possible to set an Attachment on the sent message that can assist in correlation when processing the stability event. This technique is illustrated in the sample below.

Example: A File Gateway

The following sample shows a file gateway app that reads lines from a file and publishes each line out over messaging. It enables AepSendStabilityEvents and on receipt of the stability event updates a cursor file to record the last line that was acknowledged by the downstream bus: 

 

Code Block (java): Source
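// Platform (AEP/XVM) imports are omitted, as in the original listing.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * The {@link FileInputGateway} reads a file, sending each line as a message.
 */
@AppHAPolicy(value = HAPolicy.StateReplication)
public class FileInputGateway {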
    Tracer tracer = Tracer.create("file-tailer", Level.INFO);
    private volatile AepMessageSender messageSender;
    private volatile AepEngine engine;
 
    @Configured(property = "gateway.filename")
    private volatile String filename;
 
    private RandomAccessFile cursor;
    private volatile long stabilizedLine = 0;
 
    @AppInjectionPoint
    public void injectMessageSender(AepMessageSender messageSender) {
        this.messageSender = messageSender;
    }
 
    @AppInjectionPoint
    public void injectAepEngine(AepEngine engine) {
        this.engine = engine;
    }
 
    @AppMain
    public void run(String[] args) throws Exception {
        // Open source file and cursor file.
        BufferedReader reader = new BufferedReader(new FileReader(filename));
        try {
            cursor = new RandomAccessFile(filename + ".cursor", "rwd");
            cursor.setLength(8);
            cursor.seek(0);
            // Wait for engine messaging to start:
            engine.waitForMessagingToStart();
            // Read lines until the end of the file:
            String line = null;
            stabilizedLine = cursor.readLong();
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                if (++lineNumber <= stabilizedLine) {
                    continue;
                }
                FileLineMessage message = FileLineMessage.create();
                message.setText(line);
                message.setFileName(filename);
                message.setLineNumber(lineNumber);
                // Attach the line number so the stability handler can correlate the event.
                message.setAttachment(Long.valueOf(message.getLineNumber()));
                messageSender.sendMessage("line-input", message);
            }
            // Wait for send stability and exit:
            while (stabilizedLine < lineNumber) {
                Thread.sleep(100);
            }
        }
        finally {
            reader.close();
            // cursor is still null if opening the cursor file failed.
            if (cursor != null) {
                cursor.close();
            }
        }
    }
 
    @EventHandler
    public final void onSendStability(AepSendStabilityEvent event) throws IOException {
        FileLineMessage message = (FileLineMessage)event.getMessage();
        stabilizedLine = (Long)message.getAttachment();
        cursor.seek(0);
        cursor.writeLong(stabilizedLine);
        cursor.getFD().sync();
        tracer.log("Got stability for: " + stabilizedLine, Level.INFO);
    }
 
    /**
     * @return The number of lines sent and stabilized downstream.
     */
    @AppStat(name = "Lines Sent")
    public long getLinesSent() {
        return stabilizedLine;
    }
}

The sample configuration below creates the bus, configures messaging, and enables send stability events and unsolicited send sequencing:

Code Block (xml): Config
<?xml version="1.0"?>
<model xmlns="http://www.neeveresearch.com/schema/x-ddl" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">  
  <!-- Define buses and channels that will be shared between applications-->
  <buses>
    <bus name="file-connector">
      <provider>activemq</provider>
      <address>localhost</address>
      <port>61616</port>
      <channels>
        <channel name="line-input">
          <qos>Guaranteed</qos>
          <key>${filename}</key>
        </channel>
      </channels>
    </bus>
  </buses>
    
  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="file-input-gateway" mainClass="com.sample.FileInputGateway">
      <messaging>
        <factories>
          <factory name="com.sample.messages.FileGatewayMessageFactory" />
        </factories>
        <buses>
            <bus name="file-connector">
                <channels>
                    <channel name="line-input" join="false"/>
                </channels>
            </bus>
        </buses>
      </messaging>
      <dispatchSendStabilityEvents>true</dispatchSendStabilityEvents>
      <sequenceUnsolicitedWithSolicitedSends>true</sequenceUnsolicitedWithSolicitedSends>
    </app>
  </apps>
</model>

 

Receiving Messages

An application's AEP engine creates and manages the lifecycle of the message buses that an application configures for use. When an application is configured to join one or more bus channels, appropriate topic subscriptions will be issued on behalf of the application. Message dispatch occurs as follows:

...