The Executor Bus is still in incubation and is classified as an experimental feature. The APIs below may change as new features are added to this binding.

Overview

The Executor Bus Binding is a special bus binding that allows applications to send work (in the form of a message) to be processed on a separate thread. The executor binding can be used to perform processor-intensive work on a thread other than the application's main dispatch thread, or as a means of implementing outbound gateways in which the executing thread 'pushes' the sent message to an external system. Work done by the processor of an executor bus is acknowledged and is therefore Guaranteed across failures.

To use an executor bus the application must:

  1. Implement an ExecutorBusProcessor that handles the processing.
  2. Expose it to the executor bus via an ExecutorBusProcessorFactory configured for the bus.
  3. Acknowledge completed work via the provided Acknowledger as work is completed.
  4. Send the processor work in the form of messages that the processor will complete.

Configuring an Executor Bus

The example configuration below shows how one could configure an executor bus that sends out e-mails:

<buses>
  <bus name="email-sender">
    <provider>executor</provider>
    <address>audit-logger</address>
    <properties>
      <processor_factory_classname>com.example.EmailGatewayProcessorFactory</processor_factory_classname>
      <from_address>admin@example.com</from_address>         
      <smtp_host>mail.example.com</smtp_host>          
      <smtp_port>25</smtp_port>            
      <smtp_password>admin</smtp_password>      
      <!--... and so on.-->   
    </properties>
    <channels>
      <channel name="email-alerts">
        <qos>Guaranteed</qos>
      </channel>
    </channels>
  </bus>
</buses>
 
<apps>
  <app name="email-gateway-app" mainClass="com.example.EmailGateway">
    <messaging>
      <buses>
        <bus name="email-sender">
          <detachedSend enabled="true">
            <queueDrainerCpuAffinityMask>${EMAIL_SENDER_CPU_AFFMASK::0}</queueDrainerCpuAffinityMask>
          </detachedSend>
          <channels>
            <channel name="email-alerts" join="false"/>
          </channels>
        </bus>
      </buses>
    </messaging>
  </app>
</apps>

Executor Bus Properties

An executor bus exposes the following configuration properties:

Property | Default | Description
processor_factory_classname | - | Specifies a factory classname that will be used by the bus to create its processor. The processor factory class must be a subclass of AbstractExecutorBusProcessorFactory and expose a zero-argument constructor. Note: This property is required.

An application can configure additional properties for the bus for use by the bus's processor. In the above example the bus also defines SMTP-related properties that would be used for sending out e-mails.

Threading

Note also that the <app> element in the example above configures the bus to use detached sends. This is because the executor bus does not itself create any additional threads; messages are processed directly on the thread calling send. For a Talon application, using detached sends is important because the bus processing (sending e-mails) would otherwise be done on the application's commit thread. So unless the processor implementation has its own thread or thread pool for performing work, it is usually desirable to configure executor buses for detached send.

Channels

The executor bus will set the channel name on messages it dispatches to the processor. It is good practice for processors that implement their own threading or thread pools to maintain ordered processing on a channel-by-channel basis.
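
One way to keep per-channel ordering while still processing channels in parallel is to give each channel its own single-threaded lane. The following is a minimal sketch under that assumption; ChannelOrderedExecutor and its methods are hypothetical helpers written for illustration and are not part of the platform API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of the platform API): runs the work for each
 * channel on its own single-threaded executor, so work submitted for one
 * channel runs in submission order while different channels run in parallel.
 */
class ChannelOrderedExecutor {
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    /** Queues work behind any earlier work submitted for the same channel. */
    void submit(String channelName, Runnable work) {
        lanes.computeIfAbsent(channelName, name -> Executors.newSingleThreadExecutor())
             .execute(work);
    }

    /**
     * Stops accepting new work and waits for queued work to finish.
     * Returns true only if every lane drained within the timeout.
     */
    boolean shutdownAndAwait(long timeoutMillis) {
        for (ExecutorService lane : lanes.values()) {
            lane.shutdown();
        }
        boolean drained = true;
        for (ExecutorService lane : lanes.values()) {
            try {
                drained &= lane.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return drained;
    }
}
```

A processor built this way would submit each message's work under the channel name taken from the message view, and would typically drain the lanes when the bus closes. One thread per channel is the simplest arrangement; a processor with many channels might instead hash channel names onto a fixed number of lanes.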

Attempting to configure an executor bus channel for join will result in a runtime error, because an executor bus only supports outbound semantics.

QoS

Provided an executor bus channel is declared as Guaranteed, an AepEngine will not complete the transaction from which the processing was scheduled until the bus's processor has acknowledged the work. This means that if work was sent to the executor bus in response to a received message, the received message won't be acknowledged until the work is done. If the executor bus channel is defined as BestEffort, the engine will not wait for acknowledgement of work completion before acknowledging received events.

Address and Port

An executor bus must be configured with an address, but a port is optional. The address may be used by a bus's processor factory as a means of looking up a processor if there are multiple executor buses configured for an application.

Implementing an Executor Bus Processor

An executor bus needs an ExecutorBusProcessor to perform processing. The processor is supplied to the bus by its executor bus processor factory when the bus is created. The ExecutorBusProcessor interface defines a single method, process, which is declared as follows:

    /**
     * Called by the executor bus to process a message sent through the executor bus.  
     * <p>
     * The implementer may perform the send in the thread calling this method or 
     * pass the message off to a thread pool that it manages for greater parallelism. 
     * <p>
     * Users of an executor bus should be able to expect ordered processing of messages 
     * on a per channel basis, so implementations that perform processing on multiple
     * threads are encouraged to call {@link MessageView#getMessageChannel()} to determine 
     * the execution channel and process accordingly. 
     * 
     * @param view The view to send. 
     * @param acknowledger The acknowledger or <code>null</code> if no acknowledgement is required for processing this message. 
     * @param flags Flags provided by the executor bus as hints to the processor. 
     */
    public void process(MessageView view, Acknowledger acknowledger, int flags) throws Exception;

Tying into Executor Bus Lifecycle

An AepEngine creates message bus instances when the application assumes the primary role. Processors that need to open connections to external systems to perform their processing may implement LifecycleAwareExecutorBusProcessor to hook into the corresponding bus's lifecycle. In this case the executor bus will call the processor's onExecutorBusOpen, onExecutorBusStart and onExecutorBusClose methods.

Accessing Executor Bus Configuration Properties

If processor-related configuration is present in the executor bus configuration (e.g. properties like smtp_host in the example configuration above), those properties can be retrieved from the provider config portion of the binding's descriptor. See the sample executor code below for an example.

Acknowledger

Regardless of whether or not an executor bus channel is configured to be Guaranteed, the bus will pass a non-null Acknowledger to the application, and the application must call its acknowledge method when processing has been completed. The acknowledger can be called asynchronously from any thread, but its acknowledge method may only be called once because the Acknowledger implementation is pooled.
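
When completion can be signalled from more than one code path or thread (for example a success callback racing a timeout), a small guard can help enforce the call-once rule. The sketch below is illustrative only: AckTarget is a stand-in interface declared here so that the example compiles on its own, and OnceOnlyAck is a hypothetical wrapper, not a platform class. The two acknowledge overloads mirror the calls used in the sample processor in this document.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Stand-in for the platform's Acknowledger, declared only so this sketch is self-contained. */
interface AckTarget {
    void acknowledge();
    void acknowledge(Exception failure);
}

/** Hypothetical guard: forwards the first acknowledgement and silently drops any later one. */
final class OnceOnlyAck implements AckTarget {
    private final AckTarget delegate;
    private final AtomicBoolean done = new AtomicBoolean(false);

    OnceOnlyAck(AckTarget delegate) {
        this.delegate = delegate;
    }

    @Override
    public void acknowledge() {
        // compareAndSet lets exactly one caller through, even across threads.
        if (done.compareAndSet(false, true)) {
            delegate.acknowledge();
        }
    }

    @Override
    public void acknowledge(Exception failure) {
        if (done.compareAndSet(false, true)) {
            delegate.acknowledge(failure);
        }
    }

    /** Returns true once either form of acknowledgement has been forwarded. */
    boolean isAcknowledged() {
        return done.get();
    }
}
```

The guard costs one small allocation per message. A processor that acknowledges from a single well-defined place does not need it.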

Sample Executor Code

The pseudocode below illustrates how an executor bus processor can be implemented.

public class EmailGatewayProcessor implements LifecycleAwareExecutorBusProcessor {
 
    private final Tracer tracer = Tracer.create("email.sender", INFO);
    private volatile JavaMailSenderImpl mailer;

    /**
     * Called by executor bus prior to open. 
     */
    @Override
    public void onExecutorBusOpen(MessageBusBinding binding) throws Exception {
        // Get config properties:
        Properties config = binding.getDescriptor().getProviderConfig();
        mailer = new JavaMailSenderImpl();
        mailer.setHost(config.getProperty("smtp_host"));
        // Parse as int: valid ports range up to 65535, which does not fit in a short.
        mailer.setPort(Integer.parseInt(config.getProperty("smtp_port")));
        mailer.setUsername(config.getProperty("smtp_user"));
        mailer.setPassword(config.getProperty("smtp_password"));
        mailer.setProtocol(config.getProperty("smtp_protocol"));
        //etc...
        tracer.log("EmailSender opened", INFO);
    }
    /**
     * Called by executor bus binding on start. 
     */
    @Override
    public void onExecutorBusStart(MessageBusBinding binding) throws Exception {
        tracer.log("EmailSender started", INFO);
    }
    /**
     * Called by executor bus binding on close. 
     */
    @Override
    public void onExecutorBusClose(MessageBusBinding binding) throws Exception {
        mailer = null;
        tracer.log("EmailSender closed", INFO);
    }
 
    /**
     * Executor bus callback. 
     * 
     * @param message The message to send. 
     * @param acknowledger The acknowledger for the message. 
     * @param flags Additional flags provided by the executor binding. 
     */
    @Override
    public final void process(MessageView message, Acknowledger acknowledger, int flags) {
        try { 
          final MimeMessage template = mailer.createMimeMessage();
          final MimeMessageHelper helper = new MimeMessageHelper(template, true);
          
          helper.setSubject("ALERT: " + message.getClass().getSimpleName());
          helper.setText(message.serializeToJson(), false);
          // etc...
 
          // send
          mailer.send(template);

          //acknowledge completion:
          acknowledger.acknowledge();
        }
        catch (Exception e) {
          // Acknowledge with a failure (to close the bus): 
          acknowledger.acknowledge(e);
        }
    }
}
 

Implementing an Executor Bus Processor Factory

The executor bus processor factory returns instances of a processor for use by the executor bus. The executor bus will ask the factory for a processor when the bus is created. In an AepEngine this will be when the engine is activated.

/**
 * Factory for creating EmailGatewayProcessors
 */
public class EmailGatewayProcessorFactory extends AbstractExecutorBusProcessorFactory {
    @Override
    public ExecutorBusProcessor createExecutorBusProcessor(MessageBusBinding binding) {
        return new EmailGatewayProcessor();
    }
}

If the class implementing ExecutorBusProcessor is created through a DI framework, or if it needs access to other objects in your application and thus needs to be wired up before bus configuration, consider registering the processor instance as a static variable in the bus processor factory. If you have multiple processor instances, you can store them in a static map and use the bus binding descriptor to determine which instance to return. Some binding configuration that is useful in this regard includes:

  • binding.getUsername(): The name of the engine creating the bus.
  • binding.getName(): The name of the bus as configured in DDL.
  • binding.getAddress(): The address as configured in DDL.
  • binding.getDescriptor().getProperties(): Binding properties as configured in DDL.
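
The static map approach described above could be sketched as follows. ProcessorRegistry is a hypothetical helper written for this example and is not part of the platform API; its type parameter P stands for whatever processor type the application uses, and keying by bus name is one choice among the binding values listed above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical registry (not part of the platform API) that maps a bus name
 * to a pre-wired processor instance. P is the application's processor type.
 */
final class ProcessorRegistry<P> {
    private final Map<String, P> byBusName = new ConcurrentHashMap<>();

    /** Registers a processor for a bus; a second registration for the same bus is rejected. */
    void register(String busName, P processor) {
        if (byBusName.putIfAbsent(busName, processor) != null) {
            throw new IllegalStateException("Processor already registered for bus '" + busName + "'");
        }
    }

    /** Returns the processor for a bus, failing fast if none was registered. */
    P lookup(String busName) {
        P processor = byBusName.get(busName);
        if (processor == null) {
            throw new IllegalStateException("No processor registered for bus '" + busName + "'");
        }
        return processor;
    }
}
```

Under these assumptions, the factory would hold a single static registry, the application (or its DI container) would call register for each processor before the engine is started, and createExecutorBusProcessor would return the result of lookup(binding.getName()). Failing fast on a missing entry surfaces wiring mistakes when the bus is created rather than when the first message is sent.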

Example: Creating an E-mail Alert Gateway

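To tie the above concepts together, this example shows how one could create a gateway that bridges alerts received from Solace out through the e-mail gateway discussed above. The application below receives alerts on the app-alerts channel of a Solace bus and sends a copy of each one to the email-alerts channel of the executor bus.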

Because the e-mail gateway bus is acknowledging its work after sending each e-mail, this application will guarantee that e-mail alerts will be sent, and by virtue of clustering will be highly available.

@AppHAPolicy(value = HAPolicy.StateReplication)
public class EmailGatewayApp {
    private volatile AepMessageSender messageSender;
 
    @AppInjectionPoint
    public void setMessageSender(AepMessageSender messageSender) { 
       this.messageSender = messageSender;
    }
    
    @EventHandler
    public void onAlert(MyAppAlertMessage alert) {
      // Send a copy of the received alert out through the
      // email-alerts channel of the e-mail bus: 
      messageSender.send("email-alerts", alert.copy()); 
    }
}

<buses>
  <bus name="alert-bus">
    <provider>solace</provider>
    <address>localhost</address>
    <port><!-- Solace broker port --></port>
    <channels>
      <channel name="app-alerts">
        <qos>Guaranteed</qos>
      </channel>
    </channels>
  </bus>
  <bus name="email-sender">
    <provider>executor</provider>
    <address>audit-logger</address>
    <properties>
      <processor_factory_classname>com.example.EmailGatewayProcessorFactory</processor_factory_classname>
      <from_address>admin@example.com</from_address>         
      <smtp_host>mail.example.com</smtp_host>          
      <smtp_port>25</smtp_port>            
      <smtp_password>admin</smtp_password>      
      <!--... and so on.-->   
    </properties>
    <channels>
      <channel name="email-alerts">
        <qos>Guaranteed</qos>
      </channel>
    </channels>
  </bus>
</buses>
 
<apps>
  <app name="email-gateway-app" mainClass="com.example.EmailGatewayApp">
    <messaging>
      <buses>
        <bus name="alert-bus">
          <detachedSend enabled="false"/>
          <channels>
            <channel name="app-alerts" join="true"/>
          </channels>
        </bus>
        <bus name="email-sender">
          <detachedSend enabled="true">
            <queueDrainerCpuAffinityMask>${EMAIL_SENDER_CPU_AFFMASK::0}</queueDrainerCpuAffinityMask>
          </detachedSend>
          <channels>
            <channel name="email-alerts" join="false"/>
          </channels>
        </bus>
      </buses>
    </messaging>
  </app>
</apps>