In This Section
- Overview
- Understanding Bus Configuration
- Sending Messages
- Receiving Messages
Overview
This section assumes you have read Working with Messaging to understand the basics of SMA messaging.
As a quick refresher: Talon's Simple Message Abstraction layer SMA abstracts the concept of a message provider using MessageBuses. MessageBuses define MessageChannels over which Messages are sent and received. Message channels defined a named conduit that allows mapping to destinations in a messaging provider. Applications thus collaborate with one another by passing messages over the set of buses and channels that they share. An application that will receive messages from a Message Channel will declare an interest in 'joining' the channel which will cause the subscriptions to be issued for the channel on behalf of the application.
The sections below provide details on how applications send and receive messages.
Understanding Bus Configuration
Before diving into the details of send and receive, it is helpful to understand how message buses are configured for an application. Message buses are resources that are shared between applications. In configuration message buses are defined independently of applications to provide the common connection details and the set of channels that they provide for applications. Applications will reference the buses that they will use to collaborate with other applications along with a subset of the channels that they will work with. Applications work abstractly with buses and channel to send and receive messages, allowing configuration changes at runtime to augment:
- The messaging provider (for example to switch from ActiveMQ to Solace)
- The message bus connection details in the form of an address and optionally a port (for example to switch from a message broker in a test environment to one in production).
- Bus connection properties, a combination of properties that are common to all bus providers and configuration specific to the type of bus provider being used.
- Topic structure by modifying the channels' configured keys.
- Application subscriptions by modifying the applications' configured filters.
The above configuration is an example of configuring a bus in its decomposed property form. A message bus can alternatively be configured using the bus descriptor attribute as shown in the following example.
The bus descriptor format is useful in cases where the descriptor name may be supplied as an external configuration property that is substituted. In the following example, the bus descriptor defaults to using a loopback bus which is useful for unit testing and configured with the myBusDescriptor environment property in other environments.
Common Bus Configuration Properties
The following properties are common to all bus implementations (unless specifically noted otherwise) and can be specified in a bus descriptor either as parameters in a descriptor URL or as properties in the buses' configuration settings when specified in its decomposed form.
Property Name | Default Value | Description |
---|---|---|
set_bus_and_channel_on_receipt | false | Controls whether the bus and channel name are set on received messages. Setting the channel and bus name on inbound messages incurs performance overhead. Performance sensitive applications should avoid enabling this property. |
set_key_on_receipt | false | Controls whether the message key is set on received messages. Not all binding implementations transport the key on the wire, this property has no effect for bindings that don't transport the key. Setting the key on inbound messages incurs a performance overhead. Performance sensitive applications should avoid enabling this property. |
set_sno_on_receipt | true | Controls whether the message sequence number is set on received messages. If set to false, then inbound messages surfaced to the AEP engine and application will not have the sequence number set regardless of whether the sender was configured to set sequence numbers in outbound messages or not (see Message Sequencing) Setting the sequence number on inbound messages incurs a slight performance overhead. Performance sensitive applications should avoid enabling this property. |
enable_inbound_transport_headers | false | Controls whether transport headers are set in inbound messages Setting this parameter to true will cause the bindings, that support this functionality, to set transport specific headers in the metadata section of the inbound messages Not all binding implementations support transport header functionality. This property has no effect for such bindings. Setting the sequence number on inbound messages incurs a performance overhead. Performance sensitive applications should avoid enabling this property. Additionally, enabling such a functionality fosters a tighter coupling between the application and specific message bindings which is generally not a recommended practice |
topic_starts_with_channel | true | Controls whether topic names start with the channel name for the bus. For bus bindings that support topic routing this property controls whether or not the resolved key is prefixed with the channel name. |
SINCE 3.15 enable_concurrent_sends | false | Controls whether sends through the message bus binding can be performed concurrently using multiple threads |
SINCE 3.9 additional_properties_file | false | Specifies the path to an external file that is used to load additional bus configuration properties. The external file format is a plain java Properties file. Properties specified in the external file will be merged into configuration property set for the bus. This file is loaded at runtime, so the file is retrieved from the local file system on the host where the configured XVM will run. |
Most bus implementations support additional properties specified to the messaging provider. Consult the documentation for the bus provider to learn about the properties it supports.
Sending Messages
Messages are sent over message bus channels in a fire and forget fashion - the application can rely on the underlying AEP Engine to deliver the message according to the quality of service configured for a channel. An application views message channels as a logical, ordered conduit over which messages flow.
The diagram below depicts the path of a message sent through an engine. The basic flow of sending is as follows:
- The application creates a message.
- The application calls send, passing the message and channel name.
- The engine looks up the channel and uses it to resolves the physical destination on which the message will be sent via the configured channel key.
- The engine queues the message for sending until the associated state changes for the message handler are stabilized to the application's store (e.g replication and transaction log write).
- Once the application state changes have been stabilized, the enqueued message is then sent. This involves the following sub-steps
- The message is serialized
- The serialized contents and message metadata are then packaged in a message bus provider specific message
- The message is sent through the underlying transport.
Solicited vs Unsolicited Sends
When an outbound message is sent from within an application event handler, it is referred to as a solicited send. When an outbound message is sent from outside of a message handler, it is referred to as an unsolicited send. The steps listed above pertain to Solicited sends.
Unsolicited Sends and Send Concurrency
In an application that only performs solicited sends, the X runtime ensures that the serialization and sends through the underlying transport are done in a single threaded manner. However, when an application performs unsolicited sends using two or more concurrent threads or performs unsolicited sends concurrently with solicited sends, then the sends through the underlying transport are performed using multiple concurrent threads. By default, the X runtime is not configured for thread safety for such concurrent sends. See Concurrent Sends for more information on how to configure the X runtime for concurrent sends.
Message Sequencing
When sending a message, the AEP Engine sets the following sequencing related metadata on the outbound message.
Metadata Field | Type | Description |
---|---|---|
MessageSender | int | A globally unique id that identifies the sender of a message. By default, an AEP Engine uses the hash code of the engine name as the sender id. |
MessageSno | long | A monotonically increasing sequence number. Note: The AEP Engine uses the same sequence number space across all the channels on which outbound messages are sent. This means that receivers that only express interest in a subset of channels on which a sender engine publishes messages will not receive consecutive sequence numbers. Therefore, gaps in received sequence numbers cannot be used to determine message loss. The X runtime ensures zero loss via the use of Guaranteed (at-least-once) quality of delivery on the underlying transport coupled with persistence to deliver exactly once delivery to application message handlers. |
This metadata is transported along with the message payload via the underlying transport to the message recipients and is primarily used for duplicate detection (not message loss as explained in the table above)
Enabling/Disabling Sequence Numbers
Solicited Sends
By default, an AEP engine is configured to set sequence numbers for messages sent via for solicited sends. Whether sequence numbers are set in messages sent via solicited sends is controlled via the setOutboundSequenceNumbers configuration parameter. For example, the following disables setting sequence numbers in messages sent via solicited sends.
The default value of the setOutboundSequenceNumbers parameter is true.
Unsolicited Sends
By default, an AEP engine is configured to not set sequence numbers for messages sent via unsolicited sends. Whether sequence numbers are set in messages via unsolicited sends is controlled via both the setOutboundSequenceNumbers and sequenceUnsolicitedSends configuration parameters. For sequence numbers to be set in messages sent via unsolicited sends, both these parameters need to be set as true. Sequence numbers will not be set in messages sent via unsolicited sends if either of these parameters is false. For example, the following enables setting sequence numbers in messages sent via unsolicited sends.
The default value of the setOutboundSequenceNumbers parameter is true and the default value of sequenceUnsolicitedSends is false
Sequence Numbering and Concurrent Sends
The use of unsolicited sends introduces send concurrency related concerns. One of the concerns is the integrity of the sequence numbers in outbound messages - with concurrent sends, the system needs to ensure that the messages are transmitted on the wire in the same sequence as the setting of the sequence numbers on them. See Concurrent Sends for more information on how to configure the system for concurrent sends to ensure the integrity of the sequence numbers on the transmitted messages.
Duplicate Detection
The principal purpose of setting sequence numbers in outbound messages is for duplicate detection. By default, duplicate detection is enabled. In other words, if an AEP engine receives a message with a sequence number > 1 that is less or equal to a prior message with a sequence number > 1, then the second message is considered a duplicate and is not dispatched to the application. For applications that are tolerant of duplicates or can perform duplicate detection on their own, it is possible to disable duplicate detection in an engine even if sequence numbering is enabled. This is controlled by the performDuplicateChecking configuration parameter. The following example shows how to do so.
Sequence Numbers and Clustering
An AEP Engine replicates and persists the following metadata in its entire cluster as part of the application's overall HA state.
- Sequence number of last sent message
- Sequence number of last sent message that has been acknowledged
- Last sequence number for each of the senders that it has processed messages from
When an application fails over to a backup or is restarted from its transaction log it will remember these variables and resume operation from where it left off
When an application's state is wiped or reinitialized it will restart its sending stream sequence to 1 which alerts downstream applications not to consider its newly lowered sequence numbers as duplicates.
The set of messages between the "last sent message that was acknowledged" and the "last sent message" is considered to be the set of "in-doubt" outbound messages i.e. messages for which it is not known whether the messaging backbone has stabilized and taken ownership of the message. On a failover or a restart, all messages in this in-doubt window are retransmitted by an AEP engine. the downstream applications use duplicate detection to filter out any duplicates.
Concurrent Sends
As mentioned before, the X runtime ensures that the serialization and sending of messages through the underlying message bus binding (transport) for messages sent through an AEP engine using only solicited sends operates in a single threaded manner. In other words, for an application performing only solicited sends, the underlying bus binding does not have to be configured to be safe for concurrent sends. However, if an application performs unsolicited sends concurrently using more than one thread or performs unsolicited and solicited sends concurrently, then the X runtime needs to be configured to ensure thread safety for such concurrent sends. How one configures the system for such safety depends on the type of concurrent sends being done by the application
Only Concurrent Unsolicited Sends
If the application is NOT performing solicited sends and is performing unsolicited sends concurrently using more than one thread, then the X runtime needs to be configured for concurrent send safety by enabling concurrent sends on the underlying message bus binding. This is done using the enable_concurrent_sends bus connection descriptor property. This configuration parameter can be set as follows and is applicable to all message bus bindings supported for use with the X Platform
Concurrent Solicited and Unsolicited Sends
If an application is performing concurrent solicited and unsolicited sends, then configuring the underlying message bus binding for concurrent sends will ensure correct send operation unless sequence numbers need to be set on messages sent using unsolicited sends. Note that by default, sequence numbers are set for solicited sends and not set for unsolicited sends and, so, with those default settings, just configuring the underlying message bus binding for concurrent sends will enable safety of concurrent unsolicited and solicited sends. However, if sequence numbers are desired in messages sent via unsolicited sends, then the sequenceUnsolicitedWithSolicitedSends configuration parameter should be used instead of the enable_concurrent_sends. Setting both these parameters is safe but not necessary. The following example shows how to enable concurrency for this scenario i.e. concurrent solicited and unsolicited sends with sequence numbers in both solicited and unsolicited sent messages
Configuration Summary
The following table summarizes how to configure the X runtime for concurrent sends for the various concurrency scenarios
Solicited Sends | Concurrent Unsolicited Sends | Sequence Number in Solicited Sends | Sequence Number in Unsolicited Sends | Configuration |
---|---|---|---|---|
No | Yes | N/A | Yes or No | enable_concurrent_sends |
Yes | Yes | Yes | No | enable_concurrent_sends |
Yes | Yes | No | Yes | enable_concurrent_sends |
Yes | Yes | No | No | enable_concurrent_sends |
Yes | Yes | Yes | Yes | sequenceUnsolicitedWithSolicitedSends |
Creating Messages For Send
Message types generated by the application data modeler have no public constructors, but instances can be created via the static create() factory method on the message class. By routing all message creation through the create() factory methods, X is able to transparently switch between pooled and un-pooled allocation of messages without the need for any changes to business logic. Once a message has been created, application code can populate the message using the random access i.e. set field values by calling setter methods, as though it were any other POJO or other encoding specific mechanisms of message population
For example:
Scope of a Sent Message
Sending a message through an AEP Engine transfers ownership of the message to the engine. What this means is that, once the send call completes, the application must release its reference to the sent message. Modifying or even reading a message post send is illegal and can result in unpredictable behavior.
If a message is created, but not sent, it is up to the application to call dispose() on the message to return it the pool. Failing to return the message to a pool will not result in a memory leak, but it will mean that subsequent create() calls won't result in object allocation and promotion.
If the application needs to send the same message multiple times, use the copy() method on the message to create a new deep copy of the message such as the following:
If message pooling is enabled, sent messages will be wiped and reused in subsequent sends.
How To Send Messages
Using the AEP Message Sender
SINCE 3.5
The simplest way to send a message is to add an AppInjectionPoint annotation that allows the XVM to inject a message sender. The application then supplies the bus and channel name along with the message.
See also:
Directly Via the AEP Engine
When an application does not use the AepMessageSender, it must provide the MessageChannel in the AEP Engine's sendMessage call. When using this pattern, the application can register event handlers for channel up and down events which are dispatched to the application when messaging is started or stopped. The application can qualify the EventHandler annotation to identify a particular channelName@busName to specify the channel of interest.
Message Routing (Channel Keys)
The message channel encapsulates the physical destination on which the message will be sent using a channel key. The channel key abstraction allows the actual message provider destination to be controlled via configuration. Channel keys can be either static or dynamic:
- A static key uses the channel key as the actual topic on which to send the message. For example, "Orders/USA".
- A dynamic key contains variable components that are resolved at runtime either at startup or during a send call to resolve the specific topic. For example, "Orders/${Region}".
- Channel key functions allow portions of the key to be resolved using a function. For example, "Orders/#[orderProcessorShard = hash(${orderId}, 2)]", would resolve to Orders/1 or Orders/2 based on the hashcode of the orderId field in the message.
By default, a message channel will prepend the channel name to the beginning of the topic as the first level in the topic name. This mechanism prevents multiple channel definitions from conflicting with one another. This behavior can be disabled by configuring the bus provider with the bus property topic_starts_with_channel=false.
Keyless Channels
When no channel key is specified for a defined channel, the platform's default behavior is to use the channel name as the destination for the message. For this case, it is not possible to set topic_starts_with_channel=false.
Static Keys
A message channel that is configured with no variable key components is said to be a static key. On every send call, the same key will be used as is for the topic (again, the channel name will be prepended unless topic_starts_with_channel is set to false in the bus descriptor).
Dynamic Keys
A channel key consists of static and variable portions with the variable portions consisting identified in the form of ${propName[::defaultValue]}. For example, a channel key of 'ORDERS/${salesRegion::US}/${productId}/Purchases' is interpreted as having variable key components of 'salesRegion' and '${productId}', with 'salesRegion' specifying a default value of 'US' in cases where it is unspecified. Variable key components are resolved from the following sources in this order:
- The message contents via message reflection. The channel will look for a field accessor matching the variable key component (e.g. a getSalesRegion() method) if the message supports message reflection.
- The key resolution table supplied to an AepEngine.sendMessage method or AepMessageSender.sendMessage method (if non-null).
- The channel's key resolution table (if non-null), which is a table that can be set on the channel programmatically for key resolution when handling an AepChannelUpEvent (see restrictions below for EventSourcing considerations).
- Default values encoded into the channel key itself. In the above example, this would be 'US' for 'salesRegion'.
If a variable key segment cannot be resolved by any of the above means at send time it results in a SMAException being thrown from the send call.
Message Reflection
Message reflection allows content-based routing of the message. To use message reflection the channel key variable should match the name of the getter for the field in the message starting with a lower case character. For example for a message with a getter of:
The channel key variable should be defined as ${salesRegion}
Valid Field Types
The following message field types are supported for message key reflection and will be used as substitution values when the hasXXX() method corresponding to the field returns true.
Type | |
---|---|
String | Replaces the variable with the value of the String
|
char | Replaces the variable with the value.
|
Enumeration | When the field is set, the value is substituted with the value of the enumeration's name() method |
boolean | The value will be substituted with the lower case value of the field. |
byte, short, int, long | The value will be substituted with the value of the field. |
If the field value is null or hasXXX() returns false, channel key resolution will fall back to the key resolution table or the default value if present. If the field value is not reflectable or the field value is null and there is the value is not found in a key resolution table, then an exception is thrown.
Nested Field Reflection
Key values can come from embedded entities in the message by specifying the bean path of the field. For example, if the message is a CustomerProfileUpdateMessage with an embedded entity Address field one could specify a channel key variable ${address.zipCode} which would be substituted with getAddress().getZipCode()
.
Nested field key substitution is not necessarily a zero garbage operation and may be less performant as it relies on standard java reflection.
Key Resolution Tables
When a variable key value is not present in a message because there is no field matching the name, the field is not set or not reflectable, key resolution will fall back to a channel key resolution table if available.
Channel Key Resolution Table
In many cases, substitution values for a dynamic key come from the application environment or configuration. Message channels can be configured with a key resolution table to allow substitution of key variables that don't come from the message being sent. Channel key resolution tables can be supplied programmatically by registering an event handler for the AepChannelUpEvent and setting the key resolution table. The following shows an example of setting a global key resolution table that will be configured for every channel in the app:
Note that when using EventSourcing as the HAPolicy, using a key resolution table set on the message channel is not supported. The reason for this is that message buses are only started for applications that are operating in a Primary role. Consequently, messages sent by application logic on the backup instance will not have received an AepChannelUpEvent and won't have a key resolution table which could lead to divergent key resolution.
Caller Provided Key Resolution Table
AEP send methods allow the caller to pass in a key resolution with the send method call to augment key resolution on a case by case basis.
RawKeyResolutionTable
In addition to specifying a key resolution table using a java.util.Properties map, a RawKeyResolutionTable can be configured instead. A RawKeyResolutionTable stores the substitution value in an XString which can improve performance because the substitution value can be stored as pre-encoded bytes and eliminate character encoding costs. The following example shows how a RawKeyResolutionTable can be used instead of Properties:
When using a RawKeyResolutionTable the following restrictions apply:
- The table should not be modified after being set on a channel
- The XString values placed in the table must not be modified after inserting into the table.
- It is not permissible to set both a RawKeyResolutionTable and Properties based key resolution table on the same channel.
Channel Key Functions
SINCE 3.9
Channel key functions can be used to define a variable portion of the channel key using a function that is evaluated at runtime. Channel key functions are #[] enclosed and take the following form:
#[variableName = functionName(arg1, argN)]
where
- variableName: Names the variable portion of the key. The naming of the variable portion defined by the function is important as it allows the portion to be referenced when defining channel filters (see channel filters below)
- functionName: The name of the static registered channel key function to use.
- args: The arguments to pass to the function. The arguments can either be static values, or ${variable} values sourced from the message or key resolution tables.
Built-In Functions
The following table describes the currently available set of built-in channel key functions provided by the platform:
Function Name | |
---|---|
hash | The hash function hashes the provided value into partitioned buckets. Hash is useful for defining topic partitions. Arg1: The value to hash.
The hash function is evaluated as follows: Meaning that values produced are 1 through the number of partitions inclusive. See also: |
env | The env function allows a property name to be looked up from the current XRuntime. Arg1: propertyName the name of the runtime property to look up.
|
concat | The concat function takes 2 arguments and concatenates them together as the resolved value. Arg1: String1
|
Example:
In the above example:
- The 'new-orders' channel key second topic level is defined as a variable, ordershard that hashes the value of the orderId field in the message.
- Senders will thus send on a topic of either NEWORDERS/1, NEWORDERS/3 or NEWORDERS/3.
- The receiver declares that it will join the channel for receipt and specifies a filter on the ordershard variable of 1 or 2 causing it to issue subscriptions on NEWORDERS/1 NEWORDERS/2 but not NEWORDERS/3.
Registering Additional Functions
An application may define its own channel key functions by setting the value of `nv.sma.channelkeyfunctioncontainers` to a comma-separated list of classes that contain additional channel key functions.
Custom Key Method Requirements
A custom key function must:
- Be declared as public static.
- Accept the key being resolved as its first argument (the key function appends the resolved value to the key).
- May accept one or more additional XString arguments representing the function argument declared in the channel key.
A custom key function may throw a RuntimeException if the arguments provided are invalid. A channel key function may append nothing to the resolved key, but implementors should be careful that doing so would not produce an invalid topic.
Custom Key Function Example
Message Key Validation
When resolving topics dynamically from a message's fields it may be important to validate that value from the message field doesn't introduce additional levels in the topic or illegal characters or result in empty topic levels. To prevent this a message bus can be configured to 'clean' the dynamic variable portions of the topic at the cost of additional overhead by setting the following environment properties:
Property | Default | Description |
---|---|---|
nv.sma.maxresolvedkeylength | 0 | Controls the maximum resolved key length for resolved and static message keys. When set to a length greater than 0 the length of the message key is checked to ensure that the length is not greater this value. For maximum portability between bindings, this property can be set to the lowest value for destination lengths supported amongst the bindings that will be used. If this property is not set and "nv.sma.validatemessagekey" is set a message bus binding instance should still validate that a resolved message key is not too long. For example, the solace binding only supports topics that are 250 characters long and would enforce the resolved topic length as part of key validation. Prior to version 3.4.373, this property was named nv.sma.maxResolvedKeyLength. This property was changed to be all lowercase for uniformity with other environment property names. The old camelcase property name is still supported for backward compatibility, but the newer property name takes precedence. It is therefore important for applications that may specify and override this property from multiple locations (e.g. system property and DDL) use the same case everywhere. |
nv.sma.cleanmessagekey | false | Controls whether or not variable key parts in channel keys are sanitized by replacing any non-letter or digit character with an '_' when resolving a variable key. For example: if the channel key is specified as "/Orders/${Region}" and key resolution is performed using a message that returns a value of "Asia/Pac" for getRegion(), then the resolved key value will be "/Orders/Asia_Pac" rather than "/Orders/Asia/Pac" when this property is set to This property applies to values coming from a key resolution table or via message reflection but not to channel key functions.
Prior to version 3.4.373 this property was named nv.sma.cleanMessageKey. This property was changed to be all lowercase for uniformity with other environment property names. The old camelcase property name is still supported for backward compatibility, but the newer property name takes precedence. It is therefore important for applications that may specify and override this property from multiple locations (e.g. system property and DDL) use the same case everywhere. |
nv.sma.allowemptykeyfield | false | Controls whether or not variable key parts in channel keys may be substituted with an empty String value. For example: if the channel key is specified as "/Orders/${Region}/${Department}" and key resolution is performed using a message that returns a value of "" for getRegion(), then this property specifies that key resolution should fail when set to This property applies to values coming from a key resolution table or via message reflection but not to channel key functions.
Prior to version 3.4.373 this property was named nv.sma.allowEmptyKeyField. This property was changed to be all lowercase for uniformity with other environment property names. The old camelcase property name is still supported for backward compatibility, but the newer property name takes precedence. It is therefore important for applications that may specify and override this property from multiple locations (e.g. system property and DDL) use the same case everywhere. |
nv.sma.treatemptykeyfieldasnull | false | The XRuntime property controlling whether or not an empty key value is treated as null when resolving message keys. When this property is set it takes precedence over the value set for "nv.sma.allowemptykeyfield". If a variable key value is resolved from a message or a key resolution table as an empty string it is treated as if the value were not set at all. Unlike nv.sma.allowemptykeyfield=false, this value allows the key resolution to continue to look for valid values from other sources or the default value configured for the channel key variable, only resulting in an exception if the value can't be resolved at all. Users should take care when setting this property to SINCE 3.8 |
nv.sma.validatemessagekey | false | Controls whether or not message key validation is done prior to sending a message. When this property is |
Caller Provided Key
It is also possible for the caller to provide the message key directly in the send call. In this case, the supplied key will be used as is for the topic. This can be useful in certain situations but should be used with care because it breaks the ability for the topic to be changed at runtime and receivers will not have knowledge of how to subscribe to such sends.
When using caller supplied topic:
- Variable substitution is not done on the provided key.
- Key validation is done on the topic
- The provided key will be prepended with the channel name if topic_starts_with_channel=true is set for the bus.
Zero Loss Sends (Send Stability)
Solicited Sends
As mentioned earlier, the AEP engine manages in-doubt messages across the entire cluster to ensure zero loss in its outbound message stream across fail overs and restarts. In the event of a fail over within a cluster or a fresh restart of the primary member of the cluster from its transaction log, the AEP engine recovers and retransmits "in-doubt" messages to ensure zero loss. This mechanism only applies to messages sent using solicited sends. For unsolicited sends, the application needs to participate in ensuring zero loss.
Unsolicited Sends
When an application sends a message in an unsolicited manner, it is possible that the sent message would be lost if the application crashes prior to a send being transmitted over the message bus. To handle such a situation, the AEP Engine provides a mechanism by which it notifies the application when it has ensured that the sent message has been stabilized downstream and is guaranteed to be delivered to the downstream applications. to ensure zero loss, the sending application needs to hold onto the sent message until it receives such a notification from the AEP engine at which point, the application can discard the sent message. For application, such as gateway applications, this typically involves holding onto sent messages until the AEP engine provides stability notification at which point the application not only discards the sent message but also notifies stability of the message upstream to whatever source the message was received from so the source itself can record that the message has been delivered and does not need to be delivered again
The AEP engine provides such stability notifications via the AepSendStabilityEvent and needs to be configured via dispatchSendStabilityEvents to dispatch these events.
Correlating Send Stability
It usually necessary for an unsolicited sender to correlate the send stability event to the source event that triggered its send. The AepSendStabilityEvent event includes a reference to the stabilized message via the getMessage() accessor. Often times one of the fields on the message can be used to correlate the sent message to the source event. In cases where including correlation information in the published message does not make sense, it is possible to set an Attachment on the sent message that can assist in correlation when processing the stability event. This technique is illustrated in the sample below.
Example: A File Gateway
The following sample shows a file gateway app that reads lines from a file and publishes each line out over messaging. It enables AepSendStabilityEvents and on receipt of the stability event updates a cursor file to record the last line that was acknowledged by the downstream bus:
Below shows sample configuration which creates the bus, configures messaging and enables send stability events and unsolicited send sequencing:
Receiving Messages
An application's AEP engine creates and manages the lifecycle of the message buses that an application configures for use. When an application is configured to join one or more bus channels, appropriate topic subscriptions will be issued on behalf of the application. Message dispatch occurs as follows:
- The message bus binding implementation receives a serialized, message provider specific message.
- Using an application supplied message view factory (generated by ADM), and using SMA message metadata also transported with the provider-specific message, the bus wraps the serialized message in a view object (also ADM generated).
- Using message metadata transported with the message, the binding looks up the message channel on which to dispatch the received message.
- The message view is wrapped in a MessageEvent along with its message channel and is dispatched to the application's AEP Engine, where it is enqueued for processing.
- The AEP Engine picks up the message event and dispatches to each application event handler that has a signature matching the message type.
- Once the AEP Engine stabilizes the results of the application's message processing, a message acknowledgment is dispatched back to the binding.
Registering Message Interest
For an application to receive messages, it must:
- join the message channels on which the message is sent,
- register message factories for the bus provider to deserialize the message,
- and define an EventHandler for the message.
Configuring Channels For Join
In order for the application's AepEninge to issue subscriptions for the message channel on which a message is sent, it must be joined. Buses and channels are configured via the platform's configuration DDL. The below configuration snippet demonstrates:
- Defining a bus named "sample-bus" with a "new-orders-channel".
- An application that uses the "sample-bus" and joins the "new-orders-channel" (note the usage of
join="true"
)
Adding an EventHandler
When a message is received by a message bus, it is enqueued into the application's Inbound Event Queue to be queued for dispatch, which the engine will pick up.
The application's underlying AEP Engine will ensure that the message is acknowledged once state changes made by the handler have been stabilized. That, coupled with the engine's message deduplication feature, ensures that even in the event of a failover, the handler will only be executed once.
Channel Filters
A channel filter filters variable parts of a channel key to filter what is received over a message channel. It is used to determine the subscriptions issued on behalf of the application. See the Configuring Channels for Join section above for an example of using channel filters. In particular, pay attention to the "key" and "filter" elements.
Channel filter syntax takes the following form:
var1=val1[|val2][;var2=val3]
For example, given a channel key of "NEWORDERS/${Region}/{Department}"
, one can specify a channel filter of "Region=US|EMEA;Department=Clothing"
. This would join the channel on:
- NEWORDERS/US/Clothing
- NEWORDERS/EMEA/Clothing
If a variable portion of the channel key is omitted in a filter, it will result in the subscription being joined in a wildcard fashion, assuming the underlying bus implementation supports wildcards. So given a channel key of "NEWORDERS/${Region}/${Department}"
and a channel filter of "Region=US|EMEA"
, the following subscriptions would be issued during join:
- NEWORDERS/US/*
- NEWORDERS/EMEA/*
Finally, if the channel filter is set to null for the channel key in the example above, then the resulting subscription would be:
- NEWORDERS/*/*
Cleaning Channel Filters
SINCE 3.7
When the global configuration setting nv.sma.cleanchannelfilter is set to true, non-alphanumeric characters in the configured filter values will be replaced by underscores in order to match the keys used on the send side. The below configuration setting can be used to opt-out of this behavior, but typically the default behavior is more desirable:
Property | Default | Description |
---|---|---|
nv.sma.cleanchannelfilter | false* | Controls whether or not channel filter values are sanitized by replacing any non-letter or digit character with a '_'. For example, if the channel key is specified as "/Orders/${Region}" and a filter of "Region=Asia/Pac" is given, then the filter will match all messages with the resolved key value of "/Orders/Asia_Pac" (rather than "/Orders/Asia/Pac"). *Default Value:
|
As of the 3.8 Release, channel filter cleaning has been enhanced to not replace certain wildcard characters that are legal for use in subscriptions.
- Solace Binding: A '>' will not be replaced if it represents the whole topic level and a '*' will not be replaced if it is the last character in the topic level. See also: Solace Binding
- Loopback Binding: A filter level of "..." will not be replaced nor will a '*' found anywhere in the topic level. See also Loopback Binding
- JMS Binding: Depends on the provider instance ... see JMS Binding
Otherwise, any characters that are not alpha-numeric will be replaced. Prior to 3.8, any non-alphanumeric character was replaced included the wildcard combinations described above.
Registering Message Factories
Message bus binding implementations receive messages in serialized form and wrap them with a MessageView that is passed to the application to work with. MessageViews are wrapped by locating the message MessageViewFactory for the message which is typically generated by ADM. To locate the factory and message type, a binding consults Message Metadata that is transported along with the serialized message. An application must, therefore, register the message factories it intends to use so that bus binding can find the factories. This can be done by registration or by programming.
Registration via Config DDL
Most often, applications will list message view factories that they use in their DDL Config.
Programmatic Registration
Registration can also be done programmatically via the AEP Engine. A common way to do this is to provide an AppInjectionPoint for the AEP Engine in the application.
Handling Received Messages
Messages dispatched to applications are read-only, it is illegal for an application to modify a received message.
As Messages may be pooled by the platform it is also not legal for an application to hold onto a message beyond the scope of its handler unless it first calls acquire() on the message. If the application does acquire() the message it should later dispose() of the message to allow the platform to reuse it. In most cases holding on to the message beyond the scope of the handler is not the best idea from a performance perspective. Instead, applications will typically copy the data they need out of the message into either application state objects or into outbound messages.
Inbound messages may not be resent as outbound messages. Applications that need to forward an inbound message as an outbound message should first copy() the message and send the message copy:
Preserving Subscriptions on Shutdown
By default, when an engine is stopped without an error, bus channels that were 'joined' will be 'left', meaning that any subscriptions or interests created by the message bus will be unsubscribed or unregistered. For many applications, it is desirable to preserve subscriptions if an application is being gracefully shutdown for maintenance reasons – one may want messages to be queued for the application while it is down. For such cases the default behavior or unsubscribing on graceful shutdown can be overridden by configuring an application to preserve channel joins on stop:
Note that this property has no effect when an engine shuts down with an error (e.g. AepEngine.stop(Exception) with a non-null cause. In this case, channel joins are left intact, allowing a backup to take over.
This behavior can also be overridden programmatically on a case by case basis by an EventHandler for the AepEngineStoppingEvent setting AepEngineStoppingEvent.setPreserveChannelJoins(boolean):
In the above case, the value set programmatically overrides the configured value for the application.
Per Channel Subscription Preservation
SINCE 3.12
Subscription preservation or removal can also be configured more granularly at the channel level. Like the application level configuration setting, this per channel configuration setting only applies to a graceful close.
The following example shows an application configuring subscription preservation on a per channel basis using the preserveJoinsOnClose configuration property:
In the above case, if the application is stopped gracefully:
Subscriptions for the
canceled-orders
channel would be preserved (becausepreserveChannelJoinsOnStop=true
at the application level).- Subscriptions for the
new-orders
channel would be preserved regardless of the configured value for preserveChannelJoinsOnStop. - Subscriptions for the
app-ping
channel would be unsubscribed regardless of the configured value for preserveChannelJoinsOnStop.
One can also configure per channel subscription preservation programmatically via the message channel's MessageChannelDescriptor. A programmatically set value will override that set via DDL configuration, and can be set at any time before the channel is closed: