div | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ||||||||||||||||||
|
...
Excerpt |
---|
The Executor Bus Binding is a special bus binding that allows applications to provide a processor callback to process sent messagessend work (via a message) to be processed on a separate thread. The executor binding can be used to perform processor intensive work in a thread other than an application's main dispatch thread, or can be used as a means to implement outbound gateways in which the executing thread 'pushes' the sent message to an external system. Work done by the processor of an executor bus is acknowledged and therefore Guaranteed across failures. |
Gliffy Diagram | ||||||
---|---|---|---|---|---|---|
|
...
- Implement an ExecutorBusProcessor that handles the processing
- Expose it the executor via a ExecutorBusProcessorFactory configured for the bus
- ... and acknowledge completed work via the provided Acknowledger as work is completed.
- Send the processor work in the form of messages that the processor will complete.
Configuring an Executor Bus
The below example configuration demonstrates how one could configure an executor bus that will be used sends out e-mails:
Code Block | ||||
---|---|---|---|---|
| ||||
<buses> <bus name="email-sender"> <provider>executor</provider> <address>audit-logger</address> <properties> <processor_factory_classname>com.example.EmailGatewayProcessorFactory</processor_factory_classname> <from_address>admin@example.com</from_address> <smtp_host>mail.example.com</smtp_host> <smtp_port>25</smtp_port> <smtp_password>admin</smtp_password> <!--... and so on.--> </properties> <channels> <channel name="email-alerts"> <qos>Guaranteed</qos> </channel> </channels> </bus> </buses> <apps> <app name="email-gateway-app" mainClass="com.example.EmailGateway"> <messaging> <buses> <bus name="email-sender"> <detachedSend enabled="true}"> <queueDrainerCpuAffinityMask>${EMAIL_SENDER_CPU_AFFMASK::0}</queueDrainerCpuAffinityMask> </detachedSend> <channels> <channel name="email-alerts" join="false"/> </channels> </bus> </buses> </messaging> </app> </apps> |
Executor Bus Properties
An executor bus exposes the following configuration properties:
Property | Default | Description |
---|---|---|
- | Specifies a factory classname that will be used by the bus to create its processor. The processor factory class must be a subclass of AbstractExecutorBusProcessorFactory and expose a zero argument constructor.
|
An application can configure additional properties for the bus that can be used by the bus's processor. In the above example the bus also defines smpt related properties that would be used for sending out e-mails.
Threading
Note also that the <app> element configured the bus to use detachedSends. This is because the executor bus does not itself create any additional threads; processing of messages is done directly on the thread calling send. In the case of a Talon application using detached sends is important because otherwise the bus processing (sending e-mails) would be done on the application's commit thread. So unless the processor implementation has its own thread or thread pool for performing work it is usually desirable to configure executor buses for detached send.
Channels
The executor bus will set the channel name on messages it dispatches to the processor. It is good practice for processors that implement there own threading or thread pools to maintain ordered processing on a channel by channel basis.
Note |
---|
Attempting to configure an executor bus channel for join will result in a runtime error ... an executor bus only supports outbound semantics. |
Qos
Providing an executor bus channel is declared as Guaranteed, an AepEngine will not complete the transaction from which the processing was scheduled until the bus's processor has acknowledged it. This means that if works was sent to the executor bus in response to a received message the received message won't be aknowledged until the work is done. If the executor bus channel is defined as BestEffort the engine will not wait for acknowledgement of work completion before acknowleding received events.
Address and Port
An executor bus must be configured with an address, but a port is optional. The address may be used by a bus's process factory as a means of looking up a processor if there are multiple executor buses configured for an application.
Implementing an Executor Bus Processor
An executor bus needs an ExecutorBusProcessor to perform processing which is supplied to the bus when it is created by its executor bus processor factory. The ExecutorBusProcessor interface defines a single method, the process method which is defined as follows:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Called by the executor bus to process a message sent through the executor bus.
* <p>
* The implementer may perform the send in the thread calling this method or
* pass the message off to a thread pool that it manages for greater parallelism.
* <p>
* Users of an executor bus should be able to expect ordered processing of messages
* on a per channel basis, so implementations that are perform processing on multiple
* threads are encourage to call {@link MessageView#getMessageChannel()} to determine
* the execution channel and process accordingly.
*
* @param view The view to send.
* @param acknowledger The acknowledger or <code>null</code> if no acknowledgement is required for processing this message.
* @param flags Flags provided by the executor bus as hints to the processor.
*/
public void process(MessageView view, Acknowledger acknowledger, int flags) throws Exception; |
Tying into Executor Bus Lifecycle
An AepEngine creates message bus instances when the application assumes the primary role. Processors that need to open connections to external systems to perform their processing may implement LifecycleAwareExecutorBusProcessor to hook into the correspondings bus lifecycle. In this case the executor bus will call the processor's onExecutorBusOpen, onExecutorBusStart and onExecutorBusClose methods.
Accessing Executor Bus Configuration Properties
If processor related configuration is present in the executor bus configuration (e.g. properties like smtp_host in the example configuration above), those properties can be retrieved from the provider config portion of the binding's descriptor. See the sample executor code below for an example:
Acknowledger
Regardless of whether or not an executor bus channel is configured to be Guaranteed, the bus will pass a non null Acknowledger to the application and the application must call its acknowledge method when processing has been completed. The acknowler's can be called by any thread asynchronously, but its acknowledge method may only be called once as the Acknowledger implementation is pooled.
Sample Executor Code
The below pseudo code illustrates how an executor bus processor can be implemented.
Code Block | ||||
---|---|---|---|---|
| ||||
public class EmailGatewayProcessor implements LifecycleAwareExecutorBusProcessor {
private final Tracer tracer = Tracer.create("email.sender", INFO);
private volatile JavaMailSenderImpl mailer;
/**
* Called by executor bus prior to open.
*/
@Override
public void onExecutorBusOpen(MessageBusBinding binding) throws Exception {
// Get config properties:
Properties config = binding.getDescriptor().getProviderConfig();
mailer = new JavaMailSenderImpl();
mailer.setHost(config.getProperty("smtp_host"));
mailer.setPort(Short.valueOf(config.getProperty("smtp_port")));
mailer.setUsername(config.getProperty("smtp_user"));
mailer.setPassword(config.getProperty("smtp_password"));
mailer.setProtocol(config.getProperty("smpt_protocol"));
//etc...
tracer.log("EmailSender opened", INFO);
}
/**
* Called by executor bus binding on open.
*/
@Override
public void onExecutorBusStart(MessageBusBinding binding) throws Exception {
tracer.log("EmailSender started", INFO);
}
/**
* Called by executor bus binding on close.
*/
@Override
public void onExecutorBusClose(MessageBusBinding binding) throws Exception {
mailer = null;
tracer.log("EmailSender closed", INFO);
}
/**
* Executor bus callback.
*
* @param message The message to send.
* @param acknowledger The acknowledger for the message.
* @param flags Additional flags provided by the executor binding.
*/
@Override
public final void process(MessageView message, Acknowledger acknowledger, int flags) {
try {
final MimeMessage template = mailer.createMimeMessage();
final MimeMessageHelper helper = new MimeMessageHelper(template, true);
helper.setSubject("ALERT: " + message.getClass().getSimpleName());
helper.setText(message.serializeToJson(), false);
// etc...
// send
mailer.send(template);
//acknowledge completion:
acknowledger.acknowledge();
}
catch (Exception e) {
// Acknowledge with a failure (to close the bus):
acknowledger.acknowledge(e);
}
}
}
|
Implementing an Executor Bus Processor Factory
Code Block | ||
---|---|---|
| ||
/**
*
*/
public class EmailGatewayProcessorFactory extends AbstractExecutorBusProcessorFactory {
private static volatile EmailGatewayProcessor emailGatewayProcessor;
/* (non-Javadoc)
* @see com.neeve.sma.spi.executor.AbstractExecutorBusProcessorFactory#createExecutorBusProcessor(com.neeve.sma.MessageBusBinding)
*/
@Override
public ExecutorBusProcessor createExecutorBusProcessor(MessageBusBinding binding) {
return emailGatewayProcessor;
}
/**
* Registers the tibco send processor.
*
* @param tibcoSendProcessor The tibco send processor.
*/
public static void register(SolaceToRvBridge tibcoSendProcessor) {
PumaExecutorBusProcessorProvider.tibcoSendProcessor = tibcoSendProcessor;
}
} |
Example: Creating an E-mail Alert Gateway
To tie the above concepts together this example shows how one could create a gateway that bridges alerts received from solace out through the email gateway we discussed above. Below we will:
- Configure 2 buses, an alert-bus using solace, and the email-gateway executor bus we've described above.
- Create an Application that
- Creates the EmailSender gateway and registers it with the executor bus factory that we discussed above.
- Defines an EventHandler that listens to messages of type MyAppAlertMessage received from solace.
- Copies the MyAppAlertMessage message and forwards it over the email-sender bus to sent out over e-mail.
Because the e-mail gateway bus is acknowledging its work after sending each e-mail, this application will guarantee that e-mail alerts will be sent, and by virtue of clustering will be highly available.
Code Block | ||
---|---|---|
| ||
@AppHAPolicy(value = HAPolicy.StateReplication)
public class EmailGatewayApp {
private volatile AepMessageSender messageSender;
@AppInjectionPoint
public void setMessageSender(AepMessageSender messageSender) {
this.messageSender = messageSender;
}
@EventHandler
public void onAlert(MyAppAlertMessage alert) {
// Send a copy of the received alert out through the
// email-alerts channel of the e-mail bus:
messageSender.send("email-alerts", alert.copy());
}
} |
Code Block | ||
---|---|---|
| ||
<buses> <bus name="alert-bus"> <provider>solace</provider> <address>localhost</address> <port> <channels> <channel name="app-alerts"> <qos>Guaranteed</qos> </channel> </channels> </bus> <bus name="email-gatewaysender"> <provider>executor</provider> <address>audit-logger</address> <properties> <processor_factory_classname>com.example.MyAuditLogWriterFactory<EmailGatewayProcessorFactory</processor_factory_classname> <from_address>admin@example.com</from_address> <smtp_host>mail.example.com</smtp_host> <smtp_port>25</smtp_port> <smtp_password>admin</smtp_password> <!--... and so on.--> </properties> <channels> <channel name="executionemail-reportsalerts"> <qos>Guaranteed</qos> </channel> </channels> </bus> </buses> <apps> <app name="EmailGatewayemail-gateway-app" mainClass="com.example.EmailGatewayEmailGatewayApp"> <messaging> <buses> <bus name="filealert-outputbus"> <detachedSend enabled="${TIBCO_DETACHED_SEND_ENABLED::truefalse"/> <channels> <channel name="app-alerts" join="true"/> </channels> </bus> <bus name="email-sender"> <detachedSend enabled="true}"> <queueDrainerCpuAffinityMask>${TIBCOEMAIL_DETACHEDSENDER_SENDCPU_CPUAFFMASKAFFMASK::0}</queueDrainerCpuAffinityMask> </detachedSend> <channels> <channel name="executionemail-reportsalerts" join="false"/> </channels> </bus> </buses> </messaging> </app> </apps> |
An Executor bus is configured like any other bus as a bus defined in the buses section in DDL which can then be used by an application by referencing it an applications messaging config.
Note |
---|
WIP |
Implementing an Executor Bus Processor
Note |
---|
WIP |
Threading
At this time, the executor bus implementation does not itself create any additional threads, processing of messages is done directly on the thread calling send. However, when an AepEngine configures an executor bus for detachedSend, the execution is done on the engine's bus manager thread which does provide a separate thread of execution for processing.