Understanding Message SendMessages are sent over message bus channels in a fire and forget fashion - the application can rely on the underlying AepEngine to deliver the message according to the quality of service configured for a channel. An application views message channels as a logical, ordered conduit over which messages flow. The diagram below depicts the path of a message sent through an engine. The basic flow of sending is as follows: - Application creates a message.
- Application calls send passing the message and channel name.
- The engine looks up the channel and use it to resolves the physical topic on which the message will be sent.
- The engine queues the message for send until the associated state changes are stabilized.

Solicited vs Unsolicited SendsWhen an outbound message is sent from within an application event handler, it is referred to as a solicited send. When an application is sent from outside of a message handler, it is referred to as an unsolicited send. Because the AepEngine and underlying SMA message channels are not designed to be thread safe, it is not permissible for an AepEngine to be used concurrently for both solicited and unsolicited sends. |
Creating MessagesMessage types generated by the application data modeler can be created via their static create() method. Application code populates message contents as a regular pojo. Message Concurrency and PoolingOutbound messages sent by the application may be serialized in background threads. Once a message is sent it may not be modified or resent by the application. This includes any nested entities on the message. |
An application that would like to send the same message multiple times can use the copy() method on the message. The copy can be modified and used in a subsequent send call. Messages may also be pooled by the platform. This is the reason that message construction is done through static create methods. Once the platform has finished sending them, their backing contents can be wiped and the view reused in a subsequent sent. Sending a message through an AepEngine transfers ownership of the message to the engine. When a message is sent from within an application event handler, the application may not retain the message or any of its nested entities beyond the scope of the message handler as pooling may invalidate them.
Furthermore, unsolicited senders must consider sent messages to be immediately invalid because the sending thread is outside of the context of a transaction and can have no expectation as to when the platform will dispose of the message. An unsolicited sender may call acquire() on the message prior to sending it if it needs to hold onto the message for a period of time after send, but it still may not modify or reuse. If a message is created, but not sent, it is up to the application to call dispose() on the message to return it the pool. Failing to return the message to a pool will not result in a memory leak, but it will mean that subsequent create calls won't result in object allocation and promotion. |
Sending MessagesUsing Aep Message Sender
The simplest way to send a message is to add an AppInjectionPoint annotation that allows the XVM to inject a message sender. The application then supplies the bus and channel name along with the message. public class MyApp {
private volatile AepMessageSender messageSender;
@AppInjectionPoint
public void injectMessageSender(AepMessageSender messageSender) {
this.messageSender = messageSender;
}
@EventHandler
public void onNewOrder(NewOrderMessage message) {
// create the message
OrderEventMessage event = OrderEventMessage.create();
event.setOrderId(message.getOrderId());
// send message through the aep sender
// on the 'order-event-channel' of the 'order-processing-bus'
messageSender.sendMessage("order-processing-bus"
"order-event-channel",
event);
}
} |
See also: Sending via the AepEngineWhen an application does not use the AepMessageSender, it must provide the MessageChannel in the AepEngine's sendMessage call. When using this pattern, the application can register event handlers for channel up and down events which are dispatched to the application when messaging is started or stopped. The application can qualify the EventHandler annotation to identify a particular channelName@busName to specify the channel of interest. public class MyApp {
private volatile MessageChannel orderEventChannel;
private volatile AepEngine aepEngine;
@AppInjectionPoint
public void onEngineInjected(AepEngine aepEngine) {
this.aepEngine = aepEngine;
}
@EventHandler(source = "order-event-channel@order-processing-bus")
public void onOrderEventChannelUp(AepChannelUpEvent channelUpEvent) {
this.orderEventChannel= channelUpEvent.getChannel();
}
@EventHandler(source = "new-orders-channel@order-processing-bus")
public void onOnOrderEventChannelDown(AepChannelDownEvent channelDownEvent) {
this.orderEventChannel= null;
}
@EventHandler
public void onNewOrder(NewOrderMessage message) {
OrderEventMessage event = OrderEventMessage.create();
event.setOrderId(message.getOrderId());
}
} |
Channel Keys and Dynamic Topics Channel key of "OrderEvents/${OrderState}/${Region}/{Department}" , one can specify a channel filter Message Reflectors Key Resolution TablesIn many cases, substitution values for a dynamic key come from the application environment or configuration. Message channels can be configured with a key resolution to allow substitution of key variables that don't come from the message being sent. public class MyApp {
private final Properties globalKeyResolutionTable = new Properties();
public MyApp() {
krt.put("Region", "EMEA");
krt.put("Shard", "1");
}
@EventHandler()
public void onOrderEventChannelUp(AepChannelUpEvent channelUpEvent) {
channelUpEvent.getChannel.setKeyResolutionTable(globalKeyResolutionTable);
}
} |
Message Sequencing
HA ConsiderationsOnly the primary instance of an application will establish messaging connections. Understanding Message DispatchAn application's Aep Engine creates and manages the lifecycle of the message buses that an application configures for use. When an application is configured to join one of more bus channels, subscriptions will be issued on behalf of the application to attract messages. Message dispatch occurs as follows: - The message bus binding implementation receives a serialized, message provider specific message.
- Using an application supplied message view factory (generated by ADM), and using SMA message data also transported with the provider specific message, the bus wraps the serialized message in a view object (also ADM generated).
- Using message metadata transported with the message, the binding looks up the message channel on which to dispatch the received message.
- The message view is wrapped in a MessageEvent along with its message channel and is dispatched to the application's AepEngine, where it is enqueued for processing.
- The AepEngine picks up the message event and dispatches to each application event handler that has a signature matching the message type.
- Once the AepEngine stabilizes the results of the application's message processing, a message acknowledgement is dispatched back to the binding.
 Expressing InterestFor an application to receive messages, it must: - join the message channels on which the message is sent,
- register message factories for the bus provider to deserialize the message,
- and define an EventHandler for the message.
Configuring Channels For JoinIn order for the application's AepEninge to issue subscriptions for the message channel on which a message is sent, it must be joined. Buses and channels are configured via the platform's configuration DDL. The below configuration snippet demonstrates: - Defining a bus named "sample-bus" with a "new-orders-channel".
- An application that uses the "sample-bus" and joins the "new-orders-channel".
<model>
<!-- Define buses and channels that will be shared between applications-->
<buses>
<bus name="sample-bus">
<provider>activemq</provider>
<address>localhost</address>
<port>61616</port>
<channels>
<channel name="new-orders-channel">
<qos>Guaranteed</qos>
<key>NEWORDERS/${Region}/${Department}
</channel>
<channels>
</bus>
</buses>
<!-- Apps will reference the buses they use in their messaging config -->
<apps>
<app name="sample-app" mainClass="com.sample.SampleApp">
<messaging>
<factories>
<factory name="com.sample.messages.OrderMessagesFactory" />
</factories>
<buses>
<bus name="sample-bus">
<channel name="new-orders-channel" join="true"/>
<filter>Region=US|Canada</filter>
</bus>
</buses>
<messaging>
</app>
</apps>
</model> |
Adding an EventHandlerWhen a message is received by a message bus, it is enqueued into the application's Inbound Event Queue to be queued for dispatch the engine will pick up @EventHandler
public void onNewOrder(NewOrderMessage message) {
/*... do some work ...*/
} |
The application's underlying AepEngine will ensure that the message is acknowledged once state changes made by the handler have been stabilized. That, coupled with the engine's message deduplication feature, ensures that even in the event of failover, the handler will only be executed once. Channel FiltersA channel filter filters variable parts of a channel key to filter what is received over a message channel. It is used to determine the subscriptions issued on behalf of the application. Channel filter take the following form:
var1=val1[|val2][;var2=val3] For example, given a channel key of "NEWORDERS/${Region}/{Department}" , one can specify a channel filter of "Region=US|EMEA;Department=Clothing" . This would join the channel on: - NEWORDERS/US/Clothing
- NEWORDERS/EMEA/Clothing
If a variable portion of the channel key is omitted in a filter, it will result in the subscription being joined in a wildcard fashion (assuming the underlying bus implementation supports wildcards). So given a channel key of "NEWORDERS/${Region}/${Department}" and a channel filter of "Region=US|EMEA" , it would result in the following subscriptions being issued during join: - NEWORDERS/US/*
- NEWORDERS/EMEA/*
Finally, if the channel filter is set to null for the channel key in the example above, then the resulting subscription would be: Registering Message View FactoriesMessage bus binding implementations receive messages in serialized form and wrap them with a MessageView that is passed to the application to work with. MessageViews are wrapped by locating the message MessageViewFactory for the message which is typically generated by ADM. To locate the factory and message type, a binding consults Message Metadata that is transported along with the serialized message. An application must therefore register the message factories it intends to use so that bus binding can find the factories. This can be done by registration or by programming. Registration via Config DDLMost often, applications will list message view factories that they use in their DDL Config. <apps>
<app name="MyApp">
<messaging>
<factories>
<factory name="com.sample.messages.OrderMessagesFactory" />
</factories>
</messaging>
</app>
</app> |
ProgrammaticallyRegistration can also be done programmatically via the AepEngine. A common way to do this is to provide an AppInjectionPoint for the AepEngine in the application. public class MyApp {
@AppInjectionPoint
public void onEngineCreated(AepEngine engine) {
engine.registerFactory(new com.example.messages.MyMessageFactory());
engine.registerFactory(new com.example.messages.other.MyOtherMessageFactory());
}
} |
Preserving Subscriptions on ShutdownBy default, when an engine is stopped without an error, bus channels that were 'joined' will be 'left', meaning that any subscriptions or interests created by the message bus will be unsubscribed or unregistered. This behavior can be overridden by configuring an application to preserve channel joins on stop. <app name="sample-app" mainClass="com.sample.SampleApp">
<messaging>
...
<messaging>
<preserveChannelJoinsOnStop>true</preserveChannelJoinsOnStop>
</app> |
Note that this property has no effect when an engine shuts down with an error (e.g. AepEngine.stop(Exception) with a non-null cause. In this case, channel joins are left intact, allowing a backup to take over. This behavior can also be overridden programmatically on a case by case basis by an EventHandler for the AepEngineStoppingEvent setting AepEngineStoppingEvent.setPreserveChannelJoins(boolean) , See Also |