The Talon Manual


...


The Executor Bus Binding is a special bus binding that allows applications to send work (via a message) to be processed by an application-provided processor callback, typically on a separate thread. The executor binding can be used to perform processor-intensive work on a thread other than an application's main dispatch thread, or as a means to implement outbound gateways in which the executing thread 'pushes' the sent message to an external system. Work done by the processor of an executor bus is acknowledged and is therefore Guaranteed across failures.

[Diagram: Executor Bus Flow]

...

  1. Implement an ExecutorBusProcessor that handles the processing.
  2. Expose it to the executor bus via an ExecutorBusProcessorFactory configured for the bus.
  3. Acknowledge work via the provided Acknowledger as it is completed.
  4. Send the processor work in the form of messages that the processor will complete.
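
The four steps above can be sketched with stand-in types. The `Ack`, `Processor`, `Factory` and `Bus` types below are hypothetical simplifications defined only so the example is self-contained; they are not the Talon API, whose real counterparts are Acknowledger, ExecutorBusProcessor and ExecutorBusProcessorFactory.

```java
import java.util.ArrayList;
import java.util.List;

/** Self-contained sketch of the executor bus flow; every type here is an illustrative stand-in. */
final class ExecutorFlowSketch {
    /** Stand-in for the acknowledger handed to the processor. */
    interface Ack { void acknowledge(); }

    /** Stand-in for the processor callback (step 1). */
    interface Processor { void process(String message, Ack ack) throws Exception; }

    /** Stand-in for the factory the bus uses to obtain its processor (step 2). */
    interface Factory { Processor create(); }

    /** Minimal "bus": obtains its processor from the factory and hands it each sent message. */
    static final class Bus {
        private final Processor processor;
        final List<String> acknowledged = new ArrayList<>();

        Bus(Factory factory) { this.processor = factory.create(); }

        void send(String message) throws Exception {
            // Completion is recorded only when the processor acknowledges the work.
            processor.process(message, () -> acknowledged.add(message));
        }
    }

    static List<String> run() throws Exception {
        List<String> delivered = new ArrayList<>();
        // Steps 1 and 3: a processor that does the work, then acknowledges it.
        Processor processor = (message, ack) -> { delivered.add(message); ack.acknowledge(); };
        // Step 2: expose the processor through a factory.
        Bus bus = new Bus(() -> processor);
        // Step 4: send work as messages.
        bus.send("alert-1");
        bus.send("alert-2");
        return bus.acknowledged;
    }
}
```

In this sketch the processor runs inline on the sending thread, which mirrors the default threading behavior described later in this section.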

Configuring an Executor Bus

The example configuration below demonstrates how one could configure an executor bus that sends out e-mails:

Code Block
xml
xml
<buses>
  <bus name="email-sender">
    <provider>executor</provider>
    <address>audit-logger</address>
    <properties>
      <processor_factory_classname>com.example.EmailGatewayProcessorFactory</processor_factory_classname>
      <from_address>admin@example.com</from_address>         
      <smtp_host>mail.example.com</smtp_host>          
      <smtp_port>25</smtp_port>            
      <smtp_password>admin</smtp_password>      
      <!--... and so on.-->   
    </properties>
    <channels>
      <channel name="email-alerts">
        <qos>Guaranteed</qos>
      </channel>
    </channels>
  </bus>
</buses>
 
<apps>
  <app name="email-gateway-app" mainClass="com.example.EmailGateway">
    <messaging>
      <buses>
        <bus name="email-sender">
          <detachedSend enabled="true">
            <queueDrainerCpuAffinityMask>${EMAIL_SENDER_CPU_AFFMASK::0}</queueDrainerCpuAffinityMask>
          </detachedSend>
          <channels>
            <channel name="email-alerts" join="false"/>
          </channels>
        </bus>
      </buses>
    </messaging>
  </app>
</apps>

Executor Bus Properties

An executor bus exposes the following configuration properties:

Property | Default | Description
processor_factory_classname | - | Specifies the factory classname that the bus uses to create its processor. The processor factory class must be a subclass of AbstractExecutorBusProcessorFactory and expose a zero-argument constructor. This property is required.

An application can configure additional properties for the bus that can be used by the bus's processor. In the above example the bus also defines SMTP-related properties that would be used for sending out e-mails.

Threading

Note also that the <app> element configures the bus to use detached send. This is because the executor bus does not itself create any additional threads; processing of messages is done directly on the thread calling send. In a Talon application, using detached send is important because otherwise the bus processing (sending e-mails) would be done on the application's commit thread. So unless the processor implementation has its own thread or thread pool for performing work, it is usually desirable to configure executor buses for detached send.
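
The difference between inline and detached processing can be illustrated with plain JDK classes. This is only an analogy under stated assumptions: the single-threaded executor and the thread name `queue-drainer` are hypothetical stand-ins for the detached send machinery, not Talon's actual implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class DetachedSendSketch {
    /** Runs the work on the caller's thread, as happens without detached send. */
    static String inlineSend() {
        AtomicReference<String> processedOn = new AtomicReference<>();
        Runnable work = () -> processedOn.set(Thread.currentThread().getName());
        work.run();
        return processedOn.get();
    }

    /** Queues the work for a dedicated drainer thread, emulating detached send. */
    static String detachedSend() throws InterruptedException {
        AtomicReference<String> processedOn = new AtomicReference<>();
        ExecutorService drainer =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "queue-drainer"));
        drainer.execute(() -> processedOn.set(Thread.currentThread().getName()));
        // Wait for the queued work to finish before reading the result.
        drainer.shutdown();
        drainer.awaitTermination(5, TimeUnit.SECONDS);
        return processedOn.get();
    }
}
```

`inlineSend()` returns the name of the calling thread, while `detachedSend()` returns `queue-drainer`, which is why slow work such as sending e-mail is better kept off the sending thread.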

Channels

The executor bus will set the channel name on messages it dispatches to the processor. It is good practice for processors that implement their own threading or thread pools to maintain ordered processing on a channel-by-channel basis.
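
One way a processor with its own threads might preserve per-channel ordering is to give each channel a single-threaded lane. The sketch below uses only JDK classes; the class name and the second channel name `email-reports` are hypothetical, and in a real processor the channel would come from the message itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PerChannelOrderingSketch {
    // One single-threaded lane per channel: work within a channel stays ordered,
    // while different channels can proceed in parallel.
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    void submit(String channel, Runnable work) {
        lanes.computeIfAbsent(channel, c -> Executors.newSingleThreadExecutor()).execute(work);
    }

    void shutdownAndWait() throws InterruptedException {
        for (ExecutorService lane : lanes.values()) {
            lane.shutdown();
            lane.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    static List<Integer> run() throws InterruptedException {
        PerChannelOrderingSketch sketch = new PerChannelOrderingSketch();
        List<Integer> seenOnAlerts = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 100; i++) {
            final int n = i;
            sketch.submit("email-alerts", () -> seenOnAlerts.add(n));
            sketch.submit("email-reports", () -> { /* unrelated channel, runs on its own lane */ });
        }
        sketch.shutdownAndWait();
        return seenOnAlerts;
    }
}
```

A shared multi-threaded pool would be simpler but could reorder work submitted on the same channel, which is the behavior this design avoids.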

Note

Attempting to configure an executor bus channel for join will result in a runtime error: an executor bus only supports outbound semantics.

QoS

Provided that an executor bus channel is declared as Guaranteed, an AepEngine will not complete the transaction from which the processing was scheduled until the bus's processor has acknowledged it. This means that if work was sent to the executor bus in response to a received message, the received message won't be acknowledged until the work is done. If the executor bus channel is defined as BestEffort, the engine will not wait for acknowledgement of work completion before acknowledging received events.
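
The rule above can be modeled as a gate on the inbound acknowledgement. This is a conceptual sketch only: `QosSketch` and `inboundAckGate` are hypothetical names, and the engine's real transaction handling is more involved than a single future.

```java
import java.util.concurrent.CompletableFuture;

final class QosSketch {
    enum Qos { Guaranteed, BestEffort }

    /**
     * Returns a future that completes when the received message may be acknowledged.
     * For Guaranteed channels that is when the processor acknowledges the work;
     * for BestEffort channels it is immediately.
     */
    static CompletableFuture<Void> inboundAckGate(Qos qos, CompletableFuture<Void> workAcknowledged) {
        return qos == Qos.Guaranteed
            ? workAcknowledged
            : CompletableFuture.<Void>completedFuture(null);
    }
}
```

With a still-pending `workAcknowledged` future, the Guaranteed gate stays open until the processor completes it, whereas the BestEffort gate is already done.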

Address and Port

An executor bus must be configured with an address, but a port is optional. The address may be used by a bus's processor factory as a means of looking up a processor if there are multiple executor buses configured for an application.
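
A factory serving several executor buses might keep a registry keyed by bus address. The sketch below is a minimal illustration with hypothetical names (`AddressLookupSketch`, `register`, `createFor`); how the address is obtained from the binding in a real factory is not shown here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class AddressLookupSketch {
    /** Stand-in for a bus processor; only a name is needed for the illustration. */
    interface Processor { String name(); }

    private static final Map<String, Processor> BY_ADDRESS = new ConcurrentHashMap<>();

    /** Registers the processor that should serve the bus configured with this address. */
    static void register(String address, Processor processor) {
        BY_ADDRESS.put(address, processor);
    }

    /** Looks up the processor for a bus address, failing loudly if none was registered. */
    static Processor createFor(String address) {
        Processor processor = BY_ADDRESS.get(address);
        if (processor == null) {
            throw new IllegalStateException("No processor registered for address '" + address + "'");
        }
        return processor;
    }
}
```

Failing fast on an unknown address surfaces a configuration mismatch when the bus is created rather than when the first message is sent.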

Implementing an Executor Bus Processor

An executor bus needs an ExecutorBusProcessor to perform processing. The processor is supplied to the bus by its executor bus processor factory when the bus is created. The ExecutorBusProcessor interface defines a single method, process, which is declared as follows:

Code Block
java
java
    /**
     * Called by the executor bus to process a message sent through the executor bus.  
     * <p>
     * The implementer may perform the send in the thread calling this method or 
     * pass the message off to a thread pool that it manages for greater parallelism. 
     * <p>
     * Users of an executor bus should be able to expect ordered processing of messages 
     * on a per channel basis, so implementations that perform processing on multiple
     * threads are encouraged to call {@link MessageView#getMessageChannel()} to determine 
     * the execution channel and process accordingly. 
     * 
     * @param view The view to send. 
     * @param acknowledger The acknowledger or <code>null</code> if no acknowledgement is required for processing this message. 
     * @param flags Flags provided by the executor bus as hints to the processor. 
     */
    public void process(MessageView view, Acknowledger acknowledger, int flags) throws Exception;

Tying into Executor Bus Lifecycle

An AepEngine creates message bus instances when the application assumes the primary role. Processors that need to open connections to external systems to perform their processing may implement LifecycleAwareExecutorBusProcessor to hook into the corresponding bus's lifecycle. In this case the executor bus will call the processor's onExecutorBusOpen, onExecutorBusStart and onExecutorBusClose methods.

Accessing Executor Bus Configuration Properties

If processor-related configuration is present in the executor bus configuration (e.g. properties like smtp_host in the example configuration above), those properties can be retrieved from the provider config portion of the binding's descriptor. See the sample executor code below for an example.
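
Once the provider config is available as a `java.util.Properties` object, it helps to validate it in one place. The sketch below assumes the `smtp_host` and `smtp_port` keys from the example configuration; the class name and the default port of 25 are assumptions made for illustration.

```java
import java.util.Properties;

final class SmtpSettingsSketch {
    final String host;
    final int port;

    private SmtpSettingsSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Reads SMTP settings from bus properties, rejecting a missing host early. */
    static SmtpSettingsSketch from(Properties config) {
        String host = config.getProperty("smtp_host");
        if (host == null || host.trim().isEmpty()) {
            throw new IllegalArgumentException("smtp_host is required");
        }
        // Falling back to port 25 is an assumption of this sketch, not a documented default.
        int port = Integer.parseInt(config.getProperty("smtp_port", "25").trim());
        return new SmtpSettingsSketch(host.trim(), port);
    }
}
```

Parsing the port with `Integer.parseInt` also avoids the range limit of `Short`, which cannot represent ports above 32767.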

Acknowledger

Regardless of whether or not an executor bus channel is configured to be Guaranteed, the bus will pass a non-null Acknowledger to the application, and the application must call its acknowledge method when processing has been completed. The acknowledger can be called asynchronously from any thread, but its acknowledge method may only be called once because the Acknowledger implementation is pooled.
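
Because a second call could land on a recycled instance, a processor that acknowledges from several code paths may want a one-shot guard. The wrapper below is a defensive sketch using a stand-in `Ack` interface; it is not part of the Talon API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class OneShotAckSketch {
    /** Stand-in for the acknowledger passed to the processor. */
    interface Ack { void acknowledge(); }

    /** Wraps an acknowledger so that a second acknowledge() fails instead of reaching the delegate. */
    static Ack once(Ack delegate) {
        AtomicBoolean used = new AtomicBoolean(false);
        return () -> {
            // compareAndSet keeps the guard safe when several threads race to acknowledge.
            if (!used.compareAndSet(false, true)) {
                throw new IllegalStateException("acknowledge() was already called");
            }
            delegate.acknowledge();
        };
    }
}
```

Throwing on the second call makes a double acknowledgement visible during testing instead of silently corrupting a pooled object.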

Sample Executor Code

The pseudocode below illustrates how an executor bus processor can be implemented.

Code Block
java
java
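import java.util.Properties;
// MimeMessage lives in javax.mail.internet or jakarta.mail.internet, depending on the Spring version.
import javax.mail.internet.MimeMessage;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.mail.javamail.MimeMessageHelper;

public class EmailGatewayProcessor implements LifecycleAwareExecutorBusProcessor {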
 
    private final Tracer tracer = Tracer.create("email.sender", INFO);
    private volatile JavaMailSenderImpl mailer;

    /**
     * Called by executor bus prior to open. 
     */
    @Override
    public void onExecutorBusOpen(MessageBusBinding binding) throws Exception {
        // Get config properties:
        Properties config = binding.getDescriptor().getProviderConfig();
        mailer = new JavaMailSenderImpl();
        mailer.setHost(config.getProperty("smtp_host"));
        mailer.setPort(Integer.parseInt(config.getProperty("smtp_port")));
        mailer.setUsername(config.getProperty("smtp_user"));
        mailer.setPassword(config.getProperty("smtp_password"));
        mailer.setProtocol(config.getProperty("smtp_protocol"));
        //etc...
        tracer.log("EmailSender opened", INFO);
    }
    /**
     * Called by executor bus binding on start. 
     */
    @Override
    public void onExecutorBusStart(MessageBusBinding binding) throws Exception {
        tracer.log("EmailSender started", INFO);
    }
    /**
     * Called by executor bus binding on close. 
     */
    @Override
    public void onExecutorBusClose(MessageBusBinding binding) throws Exception {
        mailer = null;
        tracer.log("EmailSender closed", INFO);
    }
 
    /**
     * Executor bus callback. 
     * 
     * @param message The message to send. 
     * @param acknowledger The acknowledger for the message. 
     * @param flags Additional flags provided by the executor binding. 
     */
    @Override
    public final void process(MessageView message, Acknowledger acknowledger, int flags) {
        try { 
          final MimeMessage template = mailer.createMimeMessage();
          final MimeMessageHelper helper = new MimeMessageHelper(template, true);
          
          helper.setSubject("ALERT: " + message.getClass().getSimpleName());
          helper.setText(message.serializeToJson(), false);
          // etc...
 
          // send
          mailer.send(template);

          //acknowledge completion:
          acknowledger.acknowledge();
        }
        catch (Exception e) {
          // Acknowledge with a failure (to close the bus): 
          acknowledger.acknowledge(e);
        }
    }
}
 

Implementing an Executor Bus Processor Factory

Code Block
public class EmailGatewayProcessorFactory extends AbstractExecutorBusProcessorFactory {
    // Processor instance handed to the bus; set via register() before the bus is created.
    private static volatile EmailGatewayProcessor emailGatewayProcessor;

    /* (non-Javadoc)
     * @see com.neeve.sma.spi.executor.AbstractExecutorBusProcessorFactory#createExecutorBusProcessor(com.neeve.sma.MessageBusBinding)
     */
    @Override
    public ExecutorBusProcessor createExecutorBusProcessor(MessageBusBinding binding) {
        return emailGatewayProcessor;
    }

    /**
     * Registers the e-mail gateway processor. 
     * 
     * @param emailGatewayProcessor The e-mail gateway processor. 
     */
    public static void register(EmailGatewayProcessor emailGatewayProcessor) {
        EmailGatewayProcessorFactory.emailGatewayProcessor = emailGatewayProcessor;
    }
}

Example: Creating an E-mail Alert Gateway

To tie the above concepts together, this example shows how one could create a gateway that bridges alerts received from Solace out through the e-mail gateway discussed above. Below we will:

  • Configure 2 buses: an alert-bus using Solace, and the email-sender executor bus described above. 
  • Create an application that: 
    • Creates the EmailGatewayProcessor and registers it with the executor bus processor factory discussed above. 
    • Defines an EventHandler that listens to messages of type MyAppAlertMessage received from Solace.
    • Copies the MyAppAlertMessage message and forwards it over the email-sender bus to be sent out over e-mail. 

Because the e-mail gateway processor acknowledges its work only after sending each e-mail, this application guarantees that e-mail alerts will be sent and, by virtue of clustering, is highly available.

Code Block
titleApplication Code
@AppHAPolicy(value = HAPolicy.StateReplication)
public class EmailGatewayApp {
    private volatile AepMessageSender messageSender;
 
    @AppInjectionPoint
    public void setMessageSender(AepMessageSender messageSender) { 
       this.messageSender = messageSender;
    }
    
    @EventHandler
    public void onAlert(MyAppAlertMessage alert) {
      // Send a copy of the received alert out through the
      // email-alerts channel of the e-mail bus: 
      messageSender.send("email-alerts", alert.copy()); 
    }
}
Code Block
titleConfiguration
<buses>
  <bus name="alert-bus">
    <provider>solace</provider>
    <address>localhost</address>
    <port><!-- ... --></port>
    <channels>
      <channel name="app-alerts">
        <qos>Guaranteed</qos>
      </channel>
    </channels>
  </bus>
  <bus name="email-sender">
    <provider>executor</provider>
    <address>audit-logger</address>
    <properties>
      <processor_factory_classname>com.example.EmailGatewayProcessorFactory</processor_factory_classname>
      <from_address>admin@example.com</from_address>
      <smtp_host>mail.example.com</smtp_host>
      <smtp_port>25</smtp_port>
      <smtp_password>admin</smtp_password>
      <!--... and so on.-->
    </properties>
    <channels>
      <channel name="email-alerts">
        <qos>Guaranteed</qos>
      </channel>
    </channels>
  </bus>
</buses>
 
<apps>
  <app name="email-gateway-app" mainClass="com.example.EmailGatewayApp">
    <messaging>
      <buses>
        <bus name="alert-bus">
          <detachedSend enabled="false"/>
          <channels>
            <channel name="app-alerts" join="true"/>
          </channels>
        </bus>
        <bus name="email-sender">
          <detachedSend enabled="true">
            <queueDrainerCpuAffinityMask>${EMAIL_SENDER_CPU_AFFMASK::0}</queueDrainerCpuAffinityMask>
          </detachedSend>
          <channels>
            <channel name="email-alerts" join="false"/>
          </channels>
        </bus>
      </buses>
    </messaging>
  </app>
</apps>

An executor bus is configured like any other bus: it is defined in the buses section of the DDL and can then be used by an application by referencing it in the application's messaging config.


Threading

At this time, the executor bus implementation does not itself create any additional threads; processing of messages is done directly on the thread calling send. However, when an AepEngine configures an executor bus for detachedSend, execution is done on the engine's bus manager thread, which provides a separate thread of execution for processing.