...
Messages are sent over message bus channels in a fire-and-forget fashion: the application can rely on the underlying AepEngine to deliver the message according to the quality of service configured for the channel. An application views a message channel as a logical, ordered conduit over which messages flow.
...
Note |
---|
Sending a message through an AepEngine transfers ownership of the message to the engine. Once the send call completes, the application must release its reference to the sent message. Modifying or even reading a message after it has been sent is illegal and can result in unpredictable behavior. If a message is created but not sent, it is up to the application to call dispose() on the message to return it to the pool. Failing to return the message to the pool will not result in a memory leak, but it does mean that subsequent create() calls will result in object allocation and promotion. |
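The pooling behavior described in the note can be illustrated with a toy pool. This is only a sketch under stated assumptions: MessagePoolSketch, PooledMessage and allocations() are hypothetical names invented for illustration and are not the platform's pooling implementation. It shows why disposing of an unsent message lets the next create() reuse the instance, while abandoning it forces a new allocation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of message pooling; all names here are hypothetical.
final class MessagePoolSketch {

    /** Stand-in for a pooled, generated message type. */
    static final class PooledMessage {
        private final Deque<PooledMessage> pool;
        long orderId;

        private PooledMessage(Deque<PooledMessage> pool) {
            this.pool = pool;
        }

        /** Resets the message and returns it to its pool for reuse. */
        void dispose() {
            orderId = 0;
            pool.push(this);
        }
    }

    private final Deque<PooledMessage> pool = new ArrayDeque<>();
    private int allocations;

    /** Reuses a pooled instance when one is available, otherwise allocates a new one. */
    PooledMessage create() {
        PooledMessage pooled = pool.poll();
        if (pooled != null) {
            return pooled;
        }
        allocations++;
        return new PooledMessage(pool);
    }

    /** Number of instances this factory has had to allocate so far. */
    int allocations() {
        return allocations;
    }

    public static void main(String[] args) {
        MessagePoolSketch factory = new MessagePoolSketch();

        PooledMessage unsent = factory.create();
        unsent.dispose();                        // created but never sent: hand it back
        PooledMessage reused = factory.create(); // served from the pool
        System.out.println("reused same instance: " + (reused == unsent));

        factory.create();                        // pool is empty again, so this allocates
        System.out.println("allocations: " + factory.allocations());
    }
}
```

In this model an abandoned message is simply garbage collected, which is why forgetting dispose() costs allocations rather than leaking memory.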
...
Sending via the
...
AEP Engine
When an application does not use the AepMessageSender, it must provide the MessageChannel in the AepEngine's sendMessage call. When using this pattern, the application can register event handlers for channel up and down events, which are dispatched to the application when messaging is started or stopped. The application can qualify the EventHandler annotation with a source of the form channelName@busName to identify the channel of interest.
Code Block | ||||
---|---|---|---|---|
| ||||
public class MyApp {
    private volatile MessageChannel orderEventChannel;
    private volatile AepEngine aepEngine;

    @AppInjectionPoint
    public void onEngineInjected(AepEngine aepEngine) {
        this.aepEngine = aepEngine;
    }

    @EventHandler(source = "order-event-channel@order-processing-bus")
    public void onOrderEventChannelUp(AepChannelUpEvent channelUpEvent) {
        this.orderEventChannel = channelUpEvent.getChannel();
    }

    // The down handler listens to the same channel as the up handler.
    @EventHandler(source = "order-event-channel@order-processing-bus")
    public void onOrderEventChannelDown(AepChannelDownEvent channelDownEvent) {
        this.orderEventChannel = null;
    }

    @EventHandler
    public void onNewOrder(NewOrderMessage message) {
        OrderEventMessage event = OrderEventMessage.create();
        event.setOrderId(message.getOrderId());

        // The channel name and bus must be set on the message
        // if using Event Sourcing because handlers are invoked
        // on the backup and during recovery before messaging is
        // started.
        //
        // This is not required when using State Replication.
        event.setMessageChannel("order-event-channel");
        event.setMessageBus("order-processing-bus");

        aepEngine.sendMessage(orderEventChannel, event);
    }
}
|
...
- The message contents via message reflection. The channel will look for a field accessor matching the variable key component (e.g. a getSalesRegion() method) if the message supports message reflection.
- The key resolution table supplied to an AepEngine.sendMessage method or AepMessageSender.sendMessage method (if non-null).
- The channel's key resolution table (if non-null), which is a table that can be set on the channel programmatically for key resolution when handling an AepChannelUpEvent (see restrictions below for EventSourcing considerations).
- Default values encoded into the channel key itself. In the above example, this would be 'US' for 'salesRegion'.
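The resolution sources listed above can be sketched as a simple lookup chain. This is a minimal illustration, not the platform's implementation: it assumes the sources are consulted in the order listed, models each source as a plain Map, and assumes a `${name::default}` syntax for defaults encoded in the key. The class name KeyResolutionSketch, the method resolve, and the sample key are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of channel key variable resolution.
final class KeyResolutionSketch {

    // Matches ${name} or ${name::default}. The "::" default syntax is an assumption.
    private static final Pattern VARIABLE =
            Pattern.compile("\\$\\{([^}:]+)(?:::([^}]*))?\\}");

    /**
     * Resolves each variable from the first source that defines it, falling back
     * to the default encoded in the key. Null sources are skipped, mirroring the
     * "(if non-null)" qualifiers in the list above.
     */
    static String resolve(String channelKey, List<Map<String, String>> sourcesInPriorityOrder) {
        Matcher matcher = VARIABLE.matcher(channelKey);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = null;
            for (Map<String, String> source : sourcesInPriorityOrder) {
                if (source != null && source.containsKey(name)) {
                    value = source.get(name);
                    break;
                }
            }
            if (value == null) {
                value = matcher.group(2); // default encoded in the key, may be absent
            }
            if (value == null) {
                throw new IllegalArgumentException("Unresolved key variable: " + name);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, String> messageFields = Map.of("productLine", "Shoes"); // stands in for message reflection
        Map<String, String> sendCallTable = null;                           // no table passed to the send call
        Map<String, String> channelTable = Map.of("salesRegion", "EMEA");   // table set on the channel

        String key = "ORDERS/${salesRegion::US}/${productLine}";
        System.out.println(resolve(key, Arrays.asList(messageFields, sendCallTable, channelTable)));
    }
}
```

Modeling each source as a map keeps the precedence rule visible in a single loop; the real message reflection step would call a field accessor instead of a map lookup.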
...
Code Block | ||
---|---|---|
| ||
<env>
<nv>
<sma>
<channelkeyfunctioncontainers>com.example.CustomChannelKeyFunctions</channelkeyfunctioncontainers>
</sma>
</nv>
</env>
|
Code Block | ||
---|---|---|
| ||
public class CustomChannelKeyFunctions {
    /**
     * A custom key function that substitutes the first character of the provided
     * value into the message key.
     */
    public static final void abbreviate(final XString messageKey, XString value) {
        messageKey.append(value.charAt(0));
    }
}
|
...
- Variable substitution is not done on the provided key.
- Key validation is done on the topic.
- The provided key will be prepended with the channel name if topic_starts_with_channel=true is set for the bus.
...
Receiving Messages
An application's AepEngine creates the message buses that the application is configured to use and manages their lifecycle. When an application is configured to join one or more bus channels, the appropriate topic subscriptions are issued on behalf of the application. Message dispatch occurs as follows:
- The message bus binding implementation receives a serialized, message provider specific message.
- Using an application supplied message view factory (generated by ADM), and using SMA message metadata also transported with the provider-specific message, the bus wraps the serialized message in a view object (also ADM generated).
- Using message metadata transported with the message, the binding looks up the message channel on which to dispatch the received message.
- The message view is wrapped in a MessageEvent along with its message channel and is dispatched to the application's AepEngine, where it is enqueued for processing.
- The AepEngine picks up the message event and dispatches it to each application event handler that has a signature matching the message type.
- Once the AepEngine stabilizes the results of the application's message processing, a message acknowledgment is dispatched back to the binding.
...
Registering Message Interest
For an application to receive messages, it must:
- join the message channels on which the message is sent,
- register message factories for the bus provider to deserialize the message,
- and define an EventHandler for the message.
Configuring Channels For Join
For the application's AepEngine to issue subscriptions for the message channel on which a message is sent, the channel must be joined. Buses and channels are configured via the platform's configuration DDL. The configuration snippet below demonstrates:
...
Code Block |
---|
<?xml version="1.0"?>
<model xmlns="http://www.neeveresearch.com/schema/x-ddl" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <!-- Define buses and channels that will be shared between applications -->
  <buses>
    <bus name="sample-bus">
      <provider>activemq</provider>
      <address>localhost</address>
      <port>61616</port>
      <channels>
        <channel name="new-orders-channel">
          <qos>Guaranteed</qos>
          <key>NEWORDERS/${Region}/${Department}</key>
        </channel>
      </channels>
    </bus>
  </buses>

  <!-- Apps will reference the buses they use in their messaging config -->
  <apps>
    <app name="sample-app" mainClass="com.sample.SampleApp">
      <messaging>
        <factories>
          <factory name="com.sample.messages.OrderMessagesFactory" />
        </factories>
        <buses>
          <bus name="sample-bus">
            <channels>
              <channel name="new-orders-channel" join="true">
                <filter>Region=US|Canada</filter>
              </channel>
            </channels>
          </bus>
        </buses>
      </messaging>
    </app>
  </apps>
</model>
|
Adding an EventHandler
When a message bus receives a message, the message is enqueued into the application's Inbound Event Queue, from which the engine picks it up for dispatch.
...
The application's underlying AepEngine will ensure that the message is acknowledged once state changes made by the handler have been stabilized. That, coupled with the engine's message deduplication feature, ensures that even in the event of a failover, the handler will only be executed once.
Channel Filters
A channel filter restricts the values of the variable parts of a channel key to limit what is received over a message channel. It is used to determine the subscriptions issued on behalf of the application. See the Configuring Channels For Join section above for an example of using channel filters. In particular, pay attention to the "key" and "filter" elements.
...
Finally, if the channel filter is set to null for the channel key in the example above, then the resulting subscription would be:
- NEWORDERS/*/*
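The way a filter shapes the issued subscriptions can be sketched as follows. This is an illustrative model only, not the platform's code: the class ChannelFilterSketch and its methods are hypothetical, a ';' separator between filter clauses for different variables is an assumption, and unfiltered variables are assumed to map to the '*' wildcard as in the null-filter case above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: expanding a channel key and filter into topic subscriptions.
final class ChannelFilterSketch {

    private static final Pattern VARIABLE = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Parses "Region=US|Canada" into {Region -> [US, Canada]}. The ';' clause separator is assumed. */
    static Map<String, List<String>> parseFilter(String filter) {
        Map<String, List<String>> allowed = new LinkedHashMap<>();
        if (filter == null || filter.trim().isEmpty()) {
            return allowed;
        }
        for (String clause : filter.split(";")) {
            String[] parts = clause.split("=", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed filter clause: " + clause);
            }
            allowed.put(parts[0].trim(), List.of(parts[1].trim().split("\\|")));
        }
        return allowed;
    }

    /** Produces one subscription per combination of allowed values; unfiltered variables become "*". */
    static List<String> subscriptions(String channelKey, String filter) {
        Map<String, List<String>> allowed = parseFilter(filter);
        List<String> topics = new ArrayList<>();
        topics.add("");
        Matcher matcher = VARIABLE.matcher(channelKey);
        int last = 0;
        while (matcher.find()) {
            String literal = channelKey.substring(last, matcher.start());
            List<String> values = allowed.getOrDefault(matcher.group(1), List.of("*"));
            List<String> expanded = new ArrayList<>();
            for (String prefix : topics) {
                for (String value : values) {
                    expanded.add(prefix + literal + value);
                }
            }
            topics = expanded;
            last = matcher.end();
        }
        String tail = channelKey.substring(last);
        topics.replaceAll(topic -> topic + tail);
        return topics;
    }

    public static void main(String[] args) {
        String key = "NEWORDERS/${Region}/${Department}";
        System.out.println(subscriptions(key, "Region=US|Canada"));
        System.out.println(subscriptions(key, null));
    }
}
```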
Cleaning Channel
...
Filters
...
Otherwise, any characters that are not alphanumeric will be replaced. Prior to 3.8, every non-alphanumeric character was replaced, including the wildcard combinations described above.
Handling Received Messages
Messages dispatched to applications are read-only. It is illegal for an application to modify a received message.
...
Code Block |
---|
@EventHandler
public void onMyMessageReceived(MyMessage message) {
    // Received messages are read-only, so send a copy rather than the original.
    MyMessage copy = message.copy();
    messageSender.send("mirror", copy);
}
|
Registering MessageView Factories
Message bus binding implementations receive messages in serialized form and wrap them in a MessageView that is passed to the application to work with. A message is wrapped by locating its MessageViewFactory, which is typically generated by ADM. To locate the factory and message type, a binding consults the message metadata that is transported along with the serialized message. An application must, therefore, register the message factories it intends to use so that the bus binding can find them. This can be done via the configuration DDL or programmatically.
Registration via Config DDL
Most often, applications will list message view factories that they use in their DDL Config.
Code Block | ||||
---|---|---|---|---|
| ||||
<apps>
  <app name="MyApp">
    <messaging>
      <factories>
        <factory name="com.sample.messages.OrderMessagesFactory" />
      </factories>
      <buses>
        <!-- ... -->
      </buses>
    </messaging>
  </app>
</apps>
|
...
Programmatic Registration
Registration can also be done programmatically via the AepEngine. A common way to do this is to provide an AppInjectionPoint for the AepEngine in the application.
Code Block | ||||
---|---|---|---|---|
| ||||
public class MyApp {
    @AppInjectionPoint
    public void onEngineCreated(AepEngine engine) {
        engine.registerFactory(new com.example.messages.MyMessageFactory());
        engine.registerFactory(new com.example.messages.other.MyOtherMessageFactory());
    }
}
|
...
To avoid duplicates and provide exactly-once delivery semantics, each message sent through an AepEngine is assigned a message sender id derived from the hashcode of the application name and a monotonically increasing sequence number. The following table describes the fields that the engine sets on outbound messages to enable downstream duplicate detection. These properties are transported on a message bus binding as part of the message metadata sent with the serialized message.
Message Field | Type | Description |
---|---|---|
MessageSender | int | A globally unique id that identifies the sender of a message. By default, an AepEngine uses the hashcode of the engine name as the sender id. |
MessageSno | long | A monotonically increasing sequence number. |
MessageFlow | int | Indicates the flow to which the message belongs. Flows allow for partitioning of message traffic in conjunction with application state and allow intra-application state partitioning. |
As an AepEngine processes messages, it keeps track of the last seen sequence number for each sender that has sent to it as part of its HA state. Consequently, when an application fails over to a backup or is restarted, it remembers the last message seen from each sender and can filter duplicates on behalf of the application. When an application's state is wiped or reinitialized, it restarts its sending stream sequence at 1, which alerts downstream applications not to treat its newly lowered sequence numbers as duplicates.
By default, any message sent from a message handler is sequenced by the AepEngine. The application should not set the sequence number or sender itself. Messages sent from outside of a message handler (unsolicited sends) are not sequenced by default. Sequencing of unsolicited sends must be enabled via the configuration discussed below.
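The receiver-side bookkeeping described here can be sketched in a few lines. This is a simplified model and not the engine's implementation: the class DuplicateFilterSketch and its accept method are hypothetical, state is tracked per sender only (flows are ignored), a sequence number of 0 is treated as "unsequenced" as described for setOutboundSequenceNumbers, and any message carrying sequence number 1 is treated as a sender restart.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of sequence-number-based duplicate filtering on the receiving side.
final class DuplicateFilterSketch {

    // Last sequence number seen from each sender; the engine keeps this in its HA state.
    private final Map<Integer, Long> lastSeenBySender = new HashMap<>();

    /** Returns true if the message should be dispatched, false if it is a duplicate. */
    boolean accept(int senderId, long sequenceNumber) {
        if (sequenceNumber == 0) {
            return true; // unsequenced send: no duplicate checking is possible
        }
        Long lastSeen = lastSeenBySender.get(senderId);
        // Simplification: a sequence number of 1 is always read as a sender restart.
        boolean senderRestarted = sequenceNumber == 1;
        if (senderRestarted || lastSeen == null || sequenceNumber > lastSeen) {
            lastSeenBySender.put(senderId, sequenceNumber);
            return true;
        }
        return false; // at or below the last seen number: already processed
    }

    public static void main(String[] args) {
        DuplicateFilterSketch filter = new DuplicateFilterSketch();
        int sender = "order-gateway".hashCode(); // sender id derived from a name hashcode
        System.out.println(filter.accept(sender, 1));
        System.out.println(filter.accept(sender, 2));
        System.out.println(filter.accept(sender, 2)); // redelivery of message 2
    }
}
```

Because the map survives failover in the real engine (it is part of the HA state), a redelivered message is recognized even when a backup takes over processing.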
...
Config Property | Default | Description |
---|---|---|
setOutboundSequenceNumbers | true | Enables setting of outbound sequence numbers on sends from the application's AepEngine. When disabled, a sequence number of 0 is set on all outbound sends, meaning that downstream receivers will not perform duplicate filtering for sends from this application. See setOutboundSequenceNumbers in the DDL Config Reference |
performDuplicateChecking | true | Whether or not the application's AepEngine will filter duplicates based on received sequence numbers. An AepEngine uses the hashcode of the engine name as the sender id. See performDuplicateChecking in the DDL Config Reference |
sequenceUnsolicitedSends | false | Whether to tag sequence numbers on unsolicited sends. This property can be enabled for applications that only perform sends in an unsolicited fashion and never send a message from a message handler. In short, enabling this mode of operation is really only applicable to gateway applications that use the engine purely in an unsolicited sending capacity. In most cases, applications will instead elect to use sequenceUnsolicitedWithSolicitedSends, described below. See sequenceUnsolicitedSends in the DDL Config Reference |
sequenceUnsolicitedWithSolicitedSends | false | Indicates that unsolicited send calls should inject the send operation into the engine's input queue, so that the send is sequenced with respect to inbound event processing in the context of an AEP transaction. This avoids the possibility of sequencing errors with respect to outbound messages being concurrently sent by message handlers. See sequenceUnsolicitedWithSolicitedSends in the DDL Config Reference |
...
Note that this property has no effect when an engine shuts down with an error (e.g. AepEngine.stop(Exception) with a non-null cause). In this case, channel joins are left intact, allowing a backup to take over.
This behavior can also be overridden programmatically, on a case-by-case basis, by an EventHandler for the AepEngineStoppingEvent that calls AepEngineStoppingEvent.setPreserveChannelJoins(boolean):
Code Block | ||||
---|---|---|---|---|
| ||||
@EventHandler
public void onEngineStopping(final AepEngineStoppingEvent event) {
    if (event.getPreserveChannelJoins()) {
        tracer.log("Overriding unsubscribe behavior on application stop to remove subscriptions.");
        event.setPreserveChannelJoins(false);
    }
}
|
...