This section assumes you have read Working with Messaging to understand the basics of SMA messaging.
As a quick refresher: Talon's Simple Message Abstraction (SMA) layer abstracts the concept of a message provider using MessageBuses. MessageBuses define MessageChannels over which Messages are sent and received. A message channel defines a named conduit that allows mapping to destinations in a messaging provider. Applications thus collaborate with one another by passing messages over the set of buses and channels that they share. An application that will receive messages from a message channel declares interest in 'joining' the channel, which causes subscriptions to be issued for the channel on behalf of the application.
The sections below provide details on how applications send and receive messages.
Before diving into the details of sending and receiving, it is helpful to understand how message buses are configured for an application. Message buses are resources that are shared between applications. In configuration, message buses are defined independently of applications to provide the common connection details and the set of channels that they make available. Applications reference the buses that they will use to collaborate with other applications, along with the subset of the channels that they will work with. Applications work abstractly with buses and channels to send and receive messages, allowing configuration changes at runtime to augment

    <model>
      <!-- Define buses and channels that will be shared between applications -->
      <buses>
        <bus name="sample-bus">
          <provider>activemq</provider>
          <address>localhost</address>
          <port>61616</port>
          <channels>
            <channel name="new-orders-channel">
              <qos>Guaranteed</qos>
              <key>NEWORDERS/${Region}/${Department}</key>
            </channel>
            <channel name="order-events">
              <qos>Guaranteed</qos>
              <key>ORDEREVENTS/${Region}/${EventType}/${Department}</key>
            </channel>
          </channels>
        </bus>
      </buses>

      <!-- Apps will reference the buses they use in their messaging config -->
      <apps>
        <app name="sample-app" mainClass="com.sample.SampleApp">
          <messaging>
            <factories>
              <factory name="com.sample.messages.OrderMessagesFactory" />
            </factories>
            <buses>
              <bus name="sample-bus">
                <channels>
                  <channel name="new-orders-channel" join="true">
                    <filter>Region=US|Canada</filter>
                  </channel>
                </channels>
              </bus>
            </buses>
          </messaging>
        </app>
      </apps>
    </model>

Sending Messages
Messages are sent over message bus channels in a fire and forget fashion - the application can rely on the underlying AepEngine to deliver the message according to the quality of service configured for a channel. An application views message channels as a logical, ordered conduit over which messages flow.
The diagram below depicts the path of a message sent through an engine. The basic flow of sending is as follows:
When an outbound message is sent from within an application event handler, it is referred to as a solicited send. When an outbound message is sent from outside of a message handler, it is referred to as an unsolicited send.
The AepEngine and underlying SMA message channels are not designed to be thread safe. This means applications should not mix solicited and unsolicited sends using the same AepEngine. If an application is performing unsolicited sends using the same AepEngine across multiple threads, be sure there is appropriate synchronization around any access to the AepEngine and associated objects.
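One way to serialize unsolicited sends from several threads is to funnel them through a single lock. The sketch below is only an illustration and is not platform API: `SerializedSender` is a hypothetical wrapper, and the `BiConsumer` delegate stands in for whatever non-thread-safe send call the application actually makes.

```java
import java.util.function.BiConsumer;

/**
 * Hypothetical wrapper (not a platform class) that serializes calls to a
 * non-thread-safe send operation behind a single lock.
 */
final class SerializedSender<M> {
    private final Object lock = new Object();
    // Stands in for the real, non-thread-safe send call (channel name, message).
    private final BiConsumer<String, M> delegate;

    SerializedSender(BiConsumer<String, M> delegate) {
        this.delegate = delegate;
    }

    /** Only one thread at a time may reach the underlying send. */
    void send(String channel, M message) {
        synchronized (lock) {
            delegate.accept(channel, message);
        }
    }
}
```

Every thread that performs unsolicited sends would share the same `SerializedSender` instance. The lock only orders concurrent unsolicited sends; it does not make it safe to mix solicited and unsolicited sends on one engine.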
Message types generated by the application data modeler have no public constructors, but instances can be created via the static create() factory method on the message class. By routing all message creation through the create() factory methods, X is able to transparently switch between pooled and unpooled allocation of messages without the need for any changes to business logic. Once a message has been created, application code can populate field values by calling setter methods, as though it were any other POJO.
For example:

    MyMessage message = MyMessage.create();
    message.setStringField("test");
    message.setIntField(123);

Outbound messages sent by the application may be serialized on background threads. Because of this, message instances must not be used by the application after being sent nor should they be resent. This includes any nested entities within the message. If the application needs to send the same message multiple times, use the copy() method on the message to create a new deep copy of the message:

    MyMessage original = MyMessage.create();
    original.setName("Neeve");
    original.setYear(2017);

    // copy() produces an independent deep copy that can be sent separately
    MyMessage copy = original.copy();
    copy.setYear(2018);

    sender.send("destination1", original);
    sender.send("destination2", copy);

If message pooling is enabled, sent messages will be wiped and reused in subsequent sends.
Sending a message through an AepEngine transfers ownership of the message to the engine. When a message is sent from within an application event handler, the application may not retain the message or any of its nested entities beyond the scope of the message handler, as pooling may invalidate them. If a message is created but not sent, it is up to the application to call dispose() on the message to return it to the pool. Failing to return the message to a pool will not result in a memory leak, but it will mean that subsequent create() calls will result in object allocation and promotion.
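The effect of forgetting dispose() is easier to see with a toy pool. The sketch below is not the platform's pooling implementation; `ToyMessage` and `ToyMessagePool` are hypothetical names used only to illustrate why an undisposed message forces a later create() to allocate.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a pooled message type (not a platform class). */
final class ToyMessage {
    private final ToyMessagePool pool;
    private String text;

    ToyMessage(ToyMessagePool pool) {
        this.pool = pool;
    }

    void setText(String text) { this.text = text; }

    String getText() { return text; }

    /** Wipes the message and hands it back to its pool for reuse. */
    void dispose() {
        text = null;
        pool.release(this);
    }
}

/** Hypothetical pool: reuses disposed instances and allocates otherwise. */
final class ToyMessagePool {
    private final Deque<ToyMessage> free = new ArrayDeque<>();
    private int allocations;

    ToyMessage create() {
        ToyMessage pooled = free.poll();
        if (pooled != null) {
            return pooled;      // reuse: no new object is created
        }
        allocations++;          // pool is empty: allocate a fresh instance
        return new ToyMessage(this);
    }

    void release(ToyMessage message) {
        free.push(message);
    }

    /** Number of instances allocated because the pool was empty. */
    int allocations() {
        return allocations;
    }
}
```

In this toy model a create/dispose/create sequence allocates once, while two create() calls with no dispose() in between allocate twice.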
The simplest way to send a message is to add an AppInjectionPoint annotation that allows the XVM to inject a message sender. The application then supplies the bus and channel name along with the message.

    public class MyApp {
        private volatile AepMessageSender messageSender;

        @AppInjectionPoint
        public void injectMessageSender(AepMessageSender messageSender) {
            this.messageSender = messageSender;
        }

        @EventHandler
        public void onNewOrder(NewOrderMessage message) {
            // create the message
            OrderEventMessage event = OrderEventMessage.create();
            event.setOrderId(message.getOrderId());

            // send message through the aep sender
            // on the 'order-event-channel' of the 'order-processing-bus'
            messageSender.sendMessage("order-processing-bus", "order-event-channel", event);
        }
    }

When an application does not use the AepMessageSender, it must provide the MessageChannel in the AepEngine's sendMessage call. When using this pattern, the application can register event handlers for channel up and down events which are dispatched to the application when messaging is started or stopped. The application can qualify the EventHandler annotation to identify a particular channelName@busName to specify the channel of interest.

    public class MyApp {
        private volatile MessageChannel orderEventChannel;
        private volatile AepEngine aepEngine;

        @AppInjectionPoint
        public void onEngineInjected(AepEngine aepEngine) {
            this.aepEngine = aepEngine;
        }

        @EventHandler(source = "order-event-channel@order-processing-bus")
        public void onOrderEventChannelUp(AepChannelUpEvent channelUpEvent) {
            this.orderEventChannel = channelUpEvent.getChannel();
        }

        @EventHandler(source = "order-event-channel@order-processing-bus")
        public void onOrderEventChannelDown(AepChannelDownEvent channelDownEvent) {
            this.orderEventChannel = null;
        }

        @EventHandler
        public void onNewOrder(NewOrderMessage message) {
            OrderEventMessage event = OrderEventMessage.create();
            event.setOrderId(message.getOrderId());

            // The channel name and bus must be set on the message
            // if using Event Sourcing because handlers are invoked
            // on the backup and during recovery before messaging is
            // started.
            //
            // This is not required when using State Replication.
            event.setMessageChannel("order-event-channel");
            event.setMessageBus("order-processing-bus");

            aepEngine.sendMessage(orderEventChannel, event);
        }
    }

The message channel encapsulates the physical destination on which the message will be sent using a channel key. The channel key abstraction allows the actual message provider destination to be controlled via configuration. Channel keys can be either static or dynamic:
By default a message channel will prepend the channel name to the beginning of the topic as the first level in the topic name. This mechanism prevents multiple channel definitions from conflicting with one another. This behavior can be disabled by configuring the bus provider with the bus property topic_starts_with_channel=false.
When no channel key is specified for a defined channel, the platform's default behavior is to use the channel name as the destination for the message. For this case it is not possible to set topic_starts_with_channel=false.
A message channel that is configured with no variable key components is said to have a static key. On every send call, the same key will be used as is for the topic (again, the channel name will be prepended unless topic_starts_with_channel is set to false in the bus descriptor).
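The prepending rule can be summarized in a few lines. This is a sketch under assumptions: `TopicNames` is a hypothetical helper, and the "/" level separator is assumed here rather than taken from any particular bus binding.

```java
/** Hypothetical helper illustrating how a topic could be derived from a channel. */
final class TopicNames {
    private TopicNames() {}

    /**
     * Builds a topic from a channel name and an already-resolved channel key.
     * The "/" separator between levels is an assumption made for this sketch.
     */
    static String topicFor(String channelName, String resolvedKey, boolean topicStartsWithChannel) {
        if (resolvedKey == null || resolvedKey.isEmpty()) {
            // No key configured: the channel name itself is the destination.
            return channelName;
        }
        // Default behavior: the channel name becomes the first topic level.
        return topicStartsWithChannel ? channelName + "/" + resolvedKey : resolvedKey;
    }
}
```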
A channel key consists of static and variable portions, with the variable portions identified in the form ${propName[::defaultValue]}. For example, a channel key of 'ORDERS/${salesRegion::US}/${productId}/Purchases' is interpreted as having variable key components of 'salesRegion' and 'productId', with 'salesRegion' specifying a default value of 'US' in cases where it is unspecified. Variable key components are resolved from the following sources in this order:
If a variable key segment cannot be resolved by any of the above means at send time it results in an SMAException being thrown from the send call.
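The ${propName[::defaultValue]} syntax can be illustrated with a small stand-alone resolver. This is a sketch, not the platform's resolver: `ChannelKeyResolver` is a hypothetical class, it looks values up in a single map rather than modelling the full resolution order, and it throws IllegalStateException where the platform would throw an SMAException.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical resolver for channel keys with ${name} and ${name::default} parts. */
final class ChannelKeyResolver {
    // Group 1 is the variable name, group 2 the optional default value.
    private static final Pattern VARIABLE =
            Pattern.compile("\\$\\{([^}:]+)(?:::([^}]*))?\\}");

    private ChannelKeyResolver() {}

    static String resolve(String channelKey, Map<String, String> values) {
        Matcher matcher = VARIABLE.matcher(channelKey);
        StringBuilder resolved = new StringBuilder();
        int last = 0;
        while (matcher.find()) {
            // Copy the static text that precedes this variable.
            resolved.append(channelKey, last, matcher.start());
            String name = matcher.group(1);
            String value = values.get(name);
            if (value == null) {
                value = matcher.group(2); // fall back to the default, if any
            }
            if (value == null) {
                // Stand-in for the SMAException thrown by the platform.
                throw new IllegalStateException("Unresolved key variable: " + name);
            }
            resolved.append(value);
            last = matcher.end();
        }
        // Copy any static text after the last variable.
        resolved.append(channelKey, last, channelKey.length());
        return resolved.toString();
    }
}
```

With the example key above and only productId supplied as "42", this sketch yields ORDERS/US/42/Purchases because salesRegion falls back to its default.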
Message reflection allows content based routing of the message. To use message reflection the channel key variable should match the name of the getter for the field in the message starting with a lower case character. For example for a message with a getter of:

    public final String getSalesRegion();

The channel key variable should be defined as ${salesRegion}.
The following message field types are supported for message key reflection and will be used as substitution values when the hasXXX() method corresponding to the field returns true.
Type | Substitution |
---|---|
String | Replaces the variable with the value of the String. |
char | Replaces the variable with the value of the char. |
Enumeration | When the field is set, the variable is substituted with the value of the enumeration's name() method. |
boolean | The variable is substituted with the lower case value of the field. |
byte, short, int, long | The variable is substituted with the value of the field. |
If the field value is null or hasXXX() returns false, channel key resolution will fall back to the key resolution table or default value if present. If the field is not reflectable or its value is null, and the value is not found in a key resolution table, then an exception is thrown.
Key values can come from embedded entities in the message by specifying the bean path of the field. For example, if the message is a CustomerProfileUpdateMessage with an embedded Address entity field, one could specify a channel key variable ${address.zipCode}, which would be substituted with getAddress().getZipCode().
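Bean path lookup of this kind can be approximated with standard Java reflection. The sketch below is illustrative only: `BeanPathReader`, `ToyAddress`, and `ToyCustomerProfileUpdate` are hypothetical names, and the hasXXX() check that the platform applies before substituting a field is not modelled.

```java
import java.lang.reflect.Method;

/** Hypothetical nested entity with a public getter. */
final class ToyAddress {
    private final String zipCode;

    ToyAddress(String zipCode) { this.zipCode = zipCode; }

    public String getZipCode() { return zipCode; }
}

/** Hypothetical message with an embedded ToyAddress. */
final class ToyCustomerProfileUpdate {
    private final ToyAddress address;

    ToyCustomerProfileUpdate(ToyAddress address) { this.address = address; }

    public ToyAddress getAddress() { return address; }
}

/** Hypothetical helper that walks a dotted bean path by calling getters. */
final class BeanPathReader {
    private BeanPathReader() {}

    /** Reads "a.b" as root.getA().getB(); returns null if any step is null. */
    static Object read(Object root, String beanPath) {
        Object current = root;
        for (String property : beanPath.split("\\.")) {
            if (current == null) {
                return null;
            }
            String getter = "get" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
            try {
                Method method = current.getClass().getMethod(getter);
                current = method.invoke(current);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot read '" + property + "' in path " + beanPath, e);
            }
        }
        return current;
    }
}
```

A null result from a lookup like this corresponds to the case where resolution would fall back to a key resolution table or default value.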
Nested field key substitution is not necessarily a zero garbage operation and may be less performant, as it relies on standard Java reflection.
When a variable key value is not present in a message because there is no field matching the name, the field is not set or not reflectable, key resolution will fall back to a channel key resolution table if available.
In many cases, substitution values for a dynamic key come from the application environment or configuration. Message channels can be configured with a key resolution table to allow substitution of key variables that don't come from the message being sent. Channel key resolution tables can be supplied programmatically by registering an event handler for the AepChannelUpEvent and setting the key resolution table. The following shows an example of setting a global key resolution table that will be configured for every channel in the app:

    import java.util.Properties;

    public class MyApp {
        private final Properties globalKeyResolutionTable = new Properties();

        public MyApp() {
            globalKeyResolutionTable.put("Region", "EMEA");
            globalKeyResolutionTable.put("Shard", "1");
        }

        // No source qualifier: invoked for every channel that comes up
        @EventHandler()
        public void onOrderEventChannelUp(AepChannelUpEvent channelUpEvent) {
            channelUpEvent.getChannel().setKeyResolutionTable(globalKeyResolutionTable);
        }
    }

AEP send methods allow the caller to pass in a key resolution table with the send method call to augment key resolution on a case-by-case basis.
In addition to specifying a key resolution table using a java.util.Properties map, a RawKeyResolutionTable can be configured instead. A RawKeyResolutionTable stores the substitution value in an XString, which can improve performance because the substitution value can be stored as pre-encoded bytes, eliminating character encoding costs. The following example shows how a RawKeyResolutionTable can be used instead of Properties:

    public class MyApp {
        private final RawKeyResolutionTable globalKeyResolutionTable =
            MessageBusBindingFactory.createRawKeyResolutionTable();

        public MyApp() {
            globalKeyResolutionTable.put("Region", XString.create("EMEA"));
            globalKeyResolutionTable.put("Shard", XString.create("1"));
        }

        @EventHandler()
        public void onOrderEventChannelUp(AepChannelUpEvent channelUpEvent) {
            channelUpEvent.getChannel().setRawKeyResolutionTable(globalKeyResolutionTable);
        }
    }

When using a RawKeyResolutionTable the following restrictions apply:
When resolving topics dynamically from a message's fields, it may be important to validate that a value from a message field doesn't introduce additional levels in the topic, introduce illegal characters, or result in empty topic levels. To prevent this, a message bus can be configured to 'clean' the dynamic portions of the topic, at the cost of additional overhead, by setting the following environment properties:
Property | Default | Description |
---|---|---|
nv.sma.maxresolvedkeylength | 0 | Controls the maximum resolved key length for resolved and static message keys. When set to a length greater than 0, the length of the message key is checked to ensure that it is not greater than this value. For maximum portability between bindings, this property can be set to the lowest value for destination lengths supported amongst the bindings that will be used. If this property is not set and "nv.sma.validatemessagekey" is set, a message bus binding instance should still validate that a resolved message key is not too long. For example, the solace binding only supports topics that are 256 characters long and would enforce the resolved topic length as part of key validation. |
nv.sma.cleanmessagekey | false | Controls whether or not variable key parts in channel keys are sanitized by replacing any non letter or digit character with an '_' when resolving a variable key. For example: if the channel key is specified as "/Orders/${Region}" and key resolution is performed using a message that returns a value of "Asia/Pac" for getRegion(), then the resolved key value will be "/Orders/Asia_Pac" rather than "/Orders/Asia/Pac" when this property is set to true. This property applies to values coming from a key resolution table or via message reflection. |
nv.sma.allowemptykeyfield | false | Controls whether or not variable key parts in channel keys may be substituted with an empty String value. For example: if the channel key is specified as "/Orders/${Region}/${Department}" and key resolution is performed using a message that returns a value of "" for getRegion(), then this property specifies that key resolution should fail when set to false. This property applies to values coming from a key resolution table or via message reflection. |
nv.sma.validatemessagekey | false | Controls whether or not message key validation is done prior to sending a message. When this property is |
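The cleaning and empty-field rules in the table can be sketched in plain Java. `KeyCleaner` is a hypothetical helper written for illustration; its boolean parameters mirror nv.sma.cleanmessagekey and nv.sma.allowemptykeyfield, but the code is not the platform's implementation.

```java
/** Hypothetical helper mirroring the key cleaning rules described above. */
final class KeyCleaner {
    private KeyCleaner() {}

    /** Replaces every character that is not a letter or digit with '_'. */
    static String clean(String value) {
        StringBuilder cleaned = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            cleaned.append(Character.isLetterOrDigit(c) ? c : '_');
        }
        return cleaned.toString();
    }

    /**
     * Prepares a substitution value for a variable key part.
     * cleanKey plays the role of nv.sma.cleanmessagekey and
     * allowEmpty the role of nv.sma.allowemptykeyfield.
     */
    static String substitute(String value, boolean cleanKey, boolean allowEmpty) {
        if (value.isEmpty() && !allowEmpty) {
            throw new IllegalArgumentException("Empty value for a variable key part");
        }
        return cleanKey ? clean(value) : value;
    }
}
```

With cleaning enabled, a value of "Asia/Pac" becomes "Asia_Pac", so it can no longer add a level to the topic.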
It is also possible for the caller to provide the message key directly in the send call. In this case the supplied key will be used as is for the topic. This can be useful in certain situations, but should be used with care because it breaks the ability for the topic to be changed at runtime and receivers will not have knowledge of how to subscribe to such sends.

    @EventHandler
    public void onNewOrder(NewOrderMessage message) {
        // create the message
        OrderEventMessage event = OrderEventMessage.create();
        event.setOrderId(message.getOrderId());

        // send message through the aep sender
        // on the 'order-event-channel' of the 'order-processing-bus',
        // supplying the topic directly instead of resolving the channel key
        messageSender.sendMessage("order-processing-bus", "order-event-channel", event,
                                  "CUSTOMORDEREVENTS/" + message.getOrderType());
    }

When using caller supplied topic:
An application's AepEngine creates and manages the lifecycle of the message buses that an application configures for use. When an application is configured to join one or more bus channels, appropriate topic subscriptions will be issued on behalf of the application. Message dispatch occurs as follows:
For an application to receive messages, it must:
In order for the application's AepEngine to issue subscriptions for the message channel on which a message is sent, the channel must be joined. Buses and channels are configured via the platform's configuration DDL. The configuration snippet below demonstrates a shared bus and channel definition and an application joining that channel (join="true"):

    <?xml version="1.0"?>
    <model xmlns="http://www.neeveresearch.com/schema/x-ddl"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
      <!-- Define buses and channels that will be shared between applications -->
      <buses>
        <bus name="sample-bus">
          <provider>activemq</provider>
          <address>localhost</address>
          <port>61616</port>
          <channels>
            <channel name="new-orders-channel">
              <qos>Guaranteed</qos>
              <key>NEWORDERS/${Region}/${Department}</key>
            </channel>
          </channels>
        </bus>
      </buses>

      <!-- Apps will reference the buses they use in their messaging config -->
      <apps>
        <app name="sample-app" mainClass="com.sample.SampleApp">
          <messaging>
            <factories>
              <factory name="com.sample.messages.OrderMessagesFactory" />
            </factories>
            <buses>
              <bus name="sample-bus">
                <channels>
                  <channel name="new-orders-channel" join="true">
                    <filter>Region=US|Canada</filter>
                  </channel>
                </channels>
              </bus>
            </buses>
          </messaging>
        </app>
      </apps>
    </model>

When a message is received by a message bus, it is enqueued into the application's Inbound Event Queue, from which the engine picks it up and dispatches it to the application's event handler for that message type.

    @EventHandler
    public void onNewOrder(NewOrderMessage message) {
        /*... do some work ...*/
    }

The application's underlying AepEngine will ensure that the message is acknowledged once state changes made by the handler have been stabilized. That, coupled with the engine's message deduplication feature, ensures that even in the event of failover, the handler will only be executed once.
A channel filter restricts the values of the variable parts of a channel key, limiting what is received over a message channel. It is used to determine the subscriptions issued on behalf of the application. See the Configuring Channels for Join section above for an example of using channel filters. In particular, pay attention to the "key" and "filter" elements.
Channel filter syntax takes the following form:
var1=val1[|val2][;var2=val3]
For example, given a channel key of "NEWORDERS/${Region}/${Department}", one can specify a channel filter of "Region=US|EMEA;Department=Clothing". This would join the channel on:
If a variable portion of the channel key is omitted in a filter, it will result in the subscription being joined in a wildcard fashion (assuming the underlying bus implementation supports wildcards). So given a channel key of "NEWORDERS/${Region}/${Department}" and a channel filter of "Region=US|EMEA", it would result in the following subscriptions being issued during join:
Finally, if the channel filter is set to null for the channel key in the example above, then the resulting subscription would be:
When the global configuration setting nv.sma.cleanmessagekey is set to true, non-alphanumeric characters in the configured filter values will be replaced by underscores in order to match the keys used on the send side. The below configuration setting can be used to opt-out of this behavior, but typically the default behavior is more desirable:
Property | Default | Description |
---|---|---|
nv.sma.cleanmessagefilter | ${nv.sma.cleanmessagekey} | Controls whether or not channel filter values are sanitized by replacing any non letter or digit character with an '_'. For example, if the channel key is specified as "/Orders/${Region}" and a filter of "Region=Asia/Pac" is given, then the filter will match all messages with the resolved key value of "/Orders/Asia_Pac" (rather than "/Orders/Asia/Pac"). This configuration setting does not typically need to be modified & only exists for certain rare edge cases. |
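The way a channel filter expands a channel key into subscriptions can be sketched as follows. This is an illustration under assumptions: `ChannelFilterSketch` is a hypothetical class, the wildcard token is passed in by the caller because wildcard syntax is provider specific, and the channel name prefix and filter cleaning are left out. The subscriptions a real bus binding issues may look different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical expansion of a channel key and filter into subscription topics. */
final class ChannelFilterSketch {
    private static final Pattern VARIABLE = Pattern.compile("\\$\\{([^}:]+)(?:::[^}]*)?\\}");

    private ChannelFilterSketch() {}

    /** Parses "var1=val1|val2;var2=val3" into variable name -> allowed values. */
    static Map<String, List<String>> parse(String filter) {
        Map<String, List<String>> allowed = new LinkedHashMap<>();
        if (filter == null || filter.isEmpty()) {
            return allowed;
        }
        for (String clause : filter.split(";")) {
            String[] parts = clause.split("=", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed filter clause: " + clause);
            }
            allowed.put(parts[0].trim(), Arrays.asList(parts[1].split("\\|")));
        }
        return allowed;
    }

    /** Expands the key once per combination of filter values; unfiltered variables use the wildcard. */
    static List<String> subscriptions(String channelKey, String filter, String wildcard) {
        Map<String, List<String>> allowed = parse(filter);
        List<String> topics = new ArrayList<>(Collections.singletonList(""));
        Matcher matcher = VARIABLE.matcher(channelKey);
        int last = 0;
        while (matcher.find()) {
            String literal = channelKey.substring(last, matcher.start());
            List<String> values =
                    allowed.getOrDefault(matcher.group(1), Collections.singletonList(wildcard));
            List<String> expanded = new ArrayList<>();
            for (String prefix : topics) {
                for (String value : values) {
                    expanded.add(prefix + literal + value);
                }
            }
            topics = expanded;
            last = matcher.end();
        }
        String tail = channelKey.substring(last);
        for (int i = 0; i < topics.size(); i++) {
            topics.set(i, topics.get(i) + tail);
        }
        return topics;
    }
}
```

In this sketch a filter naming two regions and one department produces two topics, while a null filter produces a single topic in which every variable is replaced by the wildcard token.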
Messages dispatched to applications are read-only; it is illegal for an application to modify a received message.
As Messages may be pooled by the platform it is also not legal for an application to hold onto a message beyond the scope of its handler unless it first calls acquire() on the message. If the application does acquire() the message it should later dispose() of the message to allow the platform to reuse it. In most cases holding on to the message beyond the scope of the handler is not the best idea from a performance perspective. Instead applications will typically copy the data they need out of the message into either application state objects or into outbound messages.
Inbound messages may not be resent as outbound messages. Applications that need to forward an inbound message as an outbound message should first copy() the message and send the copy:

    @EventHandler
    public void onMyMessageReceived(MyMessage message) {
        // never resend the inbound instance; send a deep copy instead
        MyMessage copy = message.copy();
        messageSender.send("mirror", copy);
    }

Message bus binding implementations receive messages in serialized form and wrap them with a MessageView that is passed to the application to work with. To wrap a message, the binding locates the MessageViewFactory for the message, which is typically generated by ADM. To locate the factory and message type, a binding consults Message Metadata that is transported along with the serialized message. An application must therefore register the message factories it intends to use so that bus bindings can find them. This can be done via configuration or programmatically.
Most often, applications will list message view factories that they use in their DDL Config.

    <apps>
      <app name="MyApp">
        <messaging>
          <factories>
            <factory name="com.sample.messages.OrderMessagesFactory" />
          </factories>
          <buses>
            <!-- ... -->
          </buses>
        </messaging>
      </app>
    </apps>

Registration can also be done programmatically via the AepEngine. A common way to do this is to provide an AppInjectionPoint for the AepEngine in the application.

    public class MyApp {
        @AppInjectionPoint
        public void onEngineCreated(AepEngine engine) {
            engine.registerFactory(new com.example.messages.MyMessageFactory());
            engine.registerFactory(new com.example.messages.other.MyOtherMessageFactory());
        }
    }

To avoid duplicates and provide exactly once delivery semantics each message sent through an AepEngine is assigned a message sender id derived from the hashcode of the application name and a monotonically increasing sequence number. The following table describes the fields that the engine sets on outbound messages to enable downstream duplicate detection. These properties are transported on a message bus binding as part of the message metadata sent with the serialized message.
Message Field | Type | Description |
---|---|---|
MessageSender | int | A globally unique id that identifies the sender of a message. By default an AepEngine uses the hashcode of the engine name as the sender id. |
MessageSno | long | A monotonically increasing sequence number. |
MessageFlow | int | Indicates the flow to which the message belongs. Flows allow for partitioning of message traffic in conjunction with application state and allow intra application state partitioning. |
As an AepEngine processes messages, it keeps track of the last seen sequence number for each sender that has sent to it as part of its HA state. Consequently, when an application fails over to a backup or is restarted, it will remember the last message seen from senders sending to it and can filter duplicates on behalf of the application. When an application's state is wiped or reinitialized, it will restart its sending stream sequence at 1, which alerts downstream applications not to consider its newly lowered sequence numbers as duplicates.
By default, any message sent from a message handler is sequenced by an AepEngine; the application should not set the sequence number or sender itself. Messages sent from outside of a message handler (unsolicited sends) are not sequenced by default. Sequencing of unsolicited sends must be enabled via the configuration discussed below.
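The receiver-side bookkeeping described above can be sketched with a map from sender id to last seen sequence number. `DuplicateFilterSketch` is a hypothetical class, not the engine's implementation: it ignores message flows and HA replication of the state, and it treats any sequence number of 1 as a sender restart, which is a deliberate simplification.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical duplicate filter keyed by sender id (flows are not modelled). */
final class DuplicateFilterSketch {
    private final Map<Integer, Long> lastSeenBySender = new HashMap<>();

    /** Returns true if the message should be dispatched, false if it is a duplicate. */
    boolean accept(int senderId, long sequenceNumber) {
        if (sequenceNumber == 0) {
            // A sequence number of 0 marks an unsequenced send: never filtered.
            return true;
        }
        if (sequenceNumber == 1) {
            // Simplification: 1 is treated as the sender restarting its stream.
            lastSeenBySender.put(senderId, 1L);
            return true;
        }
        Long lastSeen = lastSeenBySender.get(senderId);
        if (lastSeen != null && sequenceNumber <= lastSeen) {
            return false; // already seen from this sender
        }
        lastSeenBySender.put(senderId, sequenceNumber);
        return true;
    }
}
```

Because the last seen numbers are tracked per sender, a retransmission after failover is filtered only for the sender that produced it.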
Configuring Duplicate Detection
For applications that are tolerant of duplicates or can perform duplicate detection on their own, it is possible to disable duplicate detection for the engine. It is also possible to enable sequencing of messages that are sent from outside of a message handler.

    <model>
      <!-- Apps will reference the buses they use in their messaging config -->
      <apps>
        <app name="sample-app" mainClass="com.sample.SampleApp">
          <messaging>
            ...
          </messaging>
          <performDuplicateChecking>true</performDuplicateChecking>
          <setOutboundSequenceNumbers>true</setOutboundSequenceNumbers>
          <sequenceUnsolicitedSends>false</sequenceUnsolicitedSends>
          <sequenceUnsolicitedWithSolicitedSends>false</sequenceUnsolicitedWithSolicitedSends>
        </app>
      </apps>
    </model>

The following table summarizes AEP engine configuration properties that have an impact on duplicate detection:
Config Property | Default | Description |
---|---|---|
setOutboundSequenceNumbers | true | Enables setting of outbound sequence numbers on sends from the application's AEP engine. When disabled, a sequence number of 0 will be set on all outbound sends, meaning that downstream receivers will not perform duplicate filtering for sends from this application. See setOutboundSequenceNumbers in the DDL Config Reference |
performDuplicateChecking | true | Whether or not the application's AepEngine will filter duplicates based on received sequence numbers. An AepEngine uses the hashcode of the engine name as the sender id. See performDuplicateChecking in the DDL Config Reference |
sequenceUnsolicitedSends | false | Whether to tag sequence numbers on unsolicited sends. This property can be enabled for applications that only perform sends in an unsolicited fashion and will never send a message from a message handler. In short, enabling this mode of operation is really only applicable for gateway applications that use the application in a purely unsolicited sending capacity. In most cases applications will instead elect to use sequenceUnsolicitedWithSolicitedSends described below. See sequenceUnsolicitedSends in the DDL Config Reference |
sequenceUnsolicitedWithSolicitedSends | false | Indicates that unsolicited send calls should result in injecting the send operation onto the engine's input queue to sequence the send with respect to inbound event processing in the context of an AEP transaction. This avoids the possibility of sequencing errors with respect to outbound messages being concurrently sent by message handlers. See sequenceUnsolicitedWithSolicitedSends in the DDL Config Reference |
It is worth noting that for a clustered application only the primary instance of the application establishes message bus connections. In the event of a failure, a backup member will be elected and will reestablish messaging connections. From a programming standpoint, this has the biggest impact on applications using the EventSourcing HA Policy, as it means that even though their handlers are being invoked, message channels will not have been established for the applications.
During live operation the primary application keeps track of the last transaction for which outbound messages were acknowledged. This id, known as the stableTransactionId, is persisted in the application's transaction log and replicated to backups. After failure or recovery, backup applications will retransmit outbound messages that are part of transactions that have not stabilized, and rely on downstream applications to use duplicate detection to eliminate any duplicates.
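The retransmission rule can be pictured as selecting every outbound message that belongs to a transaction newer than the stableTransactionId. The sketch below is a simplified model, not the engine's recovery logic: `RetransmitSketch` is a hypothetical class, messages are represented as strings, and transaction ids are assumed to increase monotonically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;

/** Hypothetical model of choosing which outbound messages to retransmit. */
final class RetransmitSketch {
    private RetransmitSketch() {}

    /**
     * Returns, in transaction order, the outbound messages of all transactions
     * with an id greater than stableTransactionId (assumes increasing ids).
     */
    static List<String> messagesToRetransmit(SortedMap<Long, List<String>> outboundByTransactionId,
                                             long stableTransactionId) {
        List<String> pending = new ArrayList<>();
        // tailMap(k) includes k, so start one past the last stabilized transaction.
        for (List<String> messages : outboundByTransactionId.tailMap(stableTransactionId + 1).values()) {
            pending.addAll(messages);
        }
        return pending;
    }
}
```

Some of the retransmitted messages may already have reached downstream applications, which is why the approach depends on their duplicate detection.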
By default, when an engine is stopped without an error, bus channels that were 'joined' will be 'left', meaning that any subscriptions or interests created by the message bus will be unsubscribed or unregistered. This behavior can be overridden by configuring an application to preserve channel joins on stop.

    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        ...
      </messaging>
      <preserveChannelJoinsOnStop>true</preserveChannelJoinsOnStop>
    </app>

Note that this property has no effect when an engine shuts down with an error (e.g. AepEngine.stop(Exception) with a non-null cause). In this case, channel joins are left intact, allowing a backup to take over.
This behavior can also be overridden programmatically on a case-by-case basis by an EventHandler for the AepEngineStoppingEvent that calls AepEngineStoppingEvent.setPreserveChannelJoins(boolean).