Working with Scheduled Tasks
The ESB profile of WSO2 EI can be configured to execute tasks periodically. You can schedule a task to run after a time interval of 't' for an 'n' number of times, or you can schedule the task to run once when the ESB server starts. Alternatively, you can use cron expressions to have more control over how the task should be scheduled; for example, you can use a cron expression to schedule the task to run at 10 pm on the 20th day of every month.
According to the default task implementation in the ESB profile, a task can be configured to inject messages, either to a defined endpoint, to a proxy service or a specific sequence defined in the ESB. If you have a specific task-handling requirement, you can write your own task-handling implementation by creating a custom Java class that implements the org.apache.synapse.startup.Task
interface. For example, you can create a task that will read a text file from a specified location and place orders for stocks that are given in the text file.
Scheduling a task
Follow the steps given below to schedule a task using the default task handling implementation in the ESB.
Prerequisites
You need to have WSO2 EI tooling installed to create a new task or to import an existing task via tooling. For instructions on installing WSO2 EI tooling, see Installing Enterprise Integrator Tooling.
In a clustered environment, tasks are distributed among server nodes according to the round-robin method, by default. If required, you can change this default task handling behavior so that tasks are distributed randomly, or according to a specific rule. This is a server-level setting that is configured in the tasks-config.xml
file.
- See Configuring the Task Scheduling Component for instructions on configuring the task handling behavior at server-level.
- You can also configure the task handling behavior at task-level, by specifying the Pinned Servers for a task. Note that this setting overrides the server-level configuration.
Also, note that a scheduled task will only run on one of the nodes (at a given time) in a clustered environment. The task will failover to another node, only if the first node fails.
Step 1: Creating an ESB project
First, create an ESB Solution Project in ESB tooling. We will use this project to store the proxy service file.
Open the Developer Studio Dashboard (click Developer Studio > Open Dashboard) and click ESB Solution Project.
- Enter a name for the project and click Next.
- Enter the Maven information about the project and click Finish.
- The new project will be listed in the project explorer.
Step 2: Creating a scheduled task
Follow these steps to create a new scheduled task.
- In Eclipse, click the Developer Studio menu and then click Open Dashboard. This opens the Developer Studio Dashboard.
Click Scheduled Task on the Developer Studio Dashboard. The New Scheduled Task dialog will open.
Importing a task?
If you already have a task created, you have the option of importing the XML configuration. Select Import Scheduled Task Artifact and follow the instructions on the UI. To create a new task from scratch, continue with the following steps.
Select Create a New Scheduled Task Artifact to create a new task and click Next.
Enter the required details into the fields.
Parameter Description Task Name Name of a scheduled task. Task Group The synapse.simple.quartz
group will be selected by default.Task Implementation The default task implementation class (org.apache.synapse.startup.tasks.MessageInjector) of the ESB will be selected by default. This class simply injects a specified message into the Synapse environment of the ESB when the server starts.
If you are want to use a custom task implementation, see the instructions in Writing Tasks.Trigger Type The trigger type determines the task execution schedule.
Simple Trigger: Schedules the task to run a specified number of times at specified intervals. In the Count field, enter the number of time the task should be executed, and in the Interval field, enter the time interval (in seconds) between consecutive executions of the task.
See the following examples for simple triggers:
Cron Trigger: Schedules the task according to acron expression. See the following example for acron trigger where the task is scheduled to run at 1:30 AM:
<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask"> <trigger cron="0 30 1 * * ?"/> </task>
Pinned Servers The list of ESB server nodes that will run the task. You can specify the IP addresses of the required nodes.
This setting can be used if you want the task to run on a selected set of nodes in an ESB cluster. Note that the task will only run on one of the nodes at a time. It will failover to another node, only if the first node fails.
As explained above, pinned servers will override the default task handling behavior defined at server-level (for this particular task). However, if rule-based task handling is specified at server-level, you need to ensure that the same server nodes you specify as pinned servers for the task are also specified for the task handling rule at server-level.
- Click Finish. The scheduled task is created in the
src/main/synapse-config/tasks
folder under the ESB Config project you specified.
Step 3: Defining the task properties
Enter values for the properties of the task to specify the message that should be triggered by the task, and the endpoint to which the message should be sent. Consider the example of a service that exposes stock quote information. If you have a proxy service, or a specific mediation sequence deployed in the ESB to route messages to the stock quote service, you can schedule a task to send the required message to that proxy service or sequence. Alternatively, you can schedule the task to send the message directly to the endpoint.
First, open the task from the project explorer. Then, click Task Implementation Properties to open the Task Properties dialog:
The list of task implementation properties are explained below.
Parameter Name | Description |
---|---|
message | Specify the body of the request that should be sent when the task is executed. Note that it is mandatory to provide a value for the message property. Therefore, even If you do not want to send a message body, you have to provide an empty payload as the value to avoid an exception being thrown. |
soapAction | This is the soap action to use when sending the message to the endpoint. |
to | If the task should send the message directly to the endpoint through the main sequence, the endpoint address should be specified. For example, if the address of the endpoint is http://localhost:9000/services/SimpleStockQuoteService, the synapse configuration of the scheduled task will be as follows: |
injectTo | If the task is not sending the message directly to the endpoint (through the main sequence), it should be injected to proxy service or a sequence. Specify sequence, or proxy. |
sequenceName | If the task should inject the message to a sequence (injectTo parameter is sequence), enter the name of the sequence. For example, if the name of the sequence is 'SampleSequence', the synapse configuration of the scheduled task will be as follows: |
proxyName | If the task should inject the message to a proxy service (injectTo parameter is proxy), enter the name of the proxy service. For example, if the name of the proxy service is 'SampleProxy', the synapse configuration of the scheduled task will be as follows: |
Injecting messages to RESTful Endpoints
In order to use the Message Injector to inject a message to a RESTfulendpint, we can specify the injector with the required payload and inject the message to sequence or proxy service as defined above. The sample below shows a RESTful message injection through a ProxyService.
Step 4: Deploying the scheduled task in the ESB server
Once you have created the scheduled task as explained in the previous topics, you need to create a Composite Application project with a CAR file. You can then deploy the CAR file in the ESB server:
- Right-click the Project Explorer and click New > Project.
- From the window that opens, click Composite Application Project.
Give a name to the Composite Application project and select the projects that you need to group into your C-App from the list of available projects. You need to select the ESB project that contains the scheduled task.
- Next, deploy the CAR file in the ESB server.
Step 5: Testing the scheduled task
The ESB profile of WSO2 EI is shipped with the sample back-end service that you can use for task scheduling examples discussed above. Deploy the SimpleStockQuote service in the axis2 server and start the server.
Open a terminal and navigate to the
<EI_HOME>/samples/axis2Server/src/SimpleStockQuoteService
directory.- Run the
ant
command. This will build the back-end service. - Open a terminal and navigate to the
<EI_HOME>/samples/axis2Server
directory. - Execute the following command to start the axis2 server:
- On Linux:
sh axis2server.sh
- On Windows:
axis2server.bat
- On Linux:
Scheduling a task using a custom implementation
When you create a task using the default task implementation of the ESB, the task can inject messages to a proxy service, or to a sequence. If you want to have other custom options for task scheduling, you can write a new task implementation class.
To demonstrate this capability, we will create a task implementation class that enables a scheduled task to read a text file stored in a specific location. In this example, we will use a text file with stock information as shown below. You can then schedule a task to use the information in this text file and place stock orders to a back-end service that exposes stock quotes.
IBM,100,120.50
Each line in the text file contains details for a stock order:
- symbol
- quantity
- price
A task that is scheduled using this custom implementation will read the text file, a line at a time, and create orders using the given values to be sent to the back-end service. The text file will then be tagged as processed to include a system time stamp. The task will be scheduled to run every 15 seconds.
The main steps while writing a task are as follows:
Step 1: Writing the Task Class
Let's create a custom task class that implements the org.apache.synapse.startup.Task
interface. This interface has a single execute()
method, which contains the code that will be executed according to the defined schedule.
The execute()
method contains the following actions:
- Check whether the file exists at the desired location.
- If it does, then read the file line by line composing place order messages for each line in the text file.
- Individual messages are then injected to the synapse environment with the given
To
endpoint reference. - Set each message as
OUT_ONLY
since it is not expected any response for messages.
In addition to the execute()
method, it is also possible to make the class implement a JavaBean
interface.
The complete code listing of the Task class (PlaceStockOrderTask) is provided below:
public class PlaceStockOrderTask implements Task, ManagedLifecycle { private Log log = LogFactory.getLog(PlaceStockOrderTask.class); private String to; private String stockFile; private SynapseEnvironment synapseEnvironment; public void execute() { log.debug("PlaceStockOrderTask begin"); if (synapseEnvironment == null) { log.error("Synapse Environment not set"); return; } if (to == null) { log.error("to not set"); return; } File existFile = new File(stockFile); if(!existFile.exists()) { log.debug("waiting for stock file"); return; } try { // file format IBM,100,120.50 BufferedReader reader = new BufferedReader(new FileReader(stockFile)); String line = null; while( (line = reader.readLine()) != null){ line = line.trim(); if(line == "") { continue; } String[] split = line.split(","); String symbol = split[0]; String quantity = split[1]; String price = split[2]; MessageContext mc = synapseEnvironment.createMessageContext(); mc.setTo(new EndpointReference(to)); mc.setSoapAction("urn:placeOrder"); mc.setProperty("OUT_ONLY", "true"); OMElement placeOrderRequest = createPlaceOrderRequest(symbol, quantity, price); PayloadHelper.setXMLPayload(mc, placeOrderRequest); synapseEnvironment.injectMessage(mc); log.info("placed order symbol:" + symbol + " quantity:" + quantity + " price:" + price); } reader.close(); } catch (IOException e) { throw new SynapseException("error reading file", e); } File renamefile = new File(stockFile); renamefile.renameTo(new File(stockFile + "." + System.currentTimeMillis())); log.debug("PlaceStockOrderTask end"); } public static OMElement createPlaceOrderRequest(String symbol, String qty, String purchPrice) { OMFactory factory = OMAbstractFactory.getOMFactory(); OMNamespace ns = factory.createOMNamespace("http://services.samples/xsd", "m0"); OMElement placeOrder= factory.createOMElement("placeOrder", ns); OMElement order = factory.createOMElement("order", ns); OMElement price = factory.createOMElement("price", ns); OMElement quantity = factory.createOMElement("quantity", ns); OMElement symb = factory.createOMElement("symbol", ns); price.setText(purchPrice); quantity.setText(qty); symb.setText(symbol); order.addChild(price); order.addChild(quantity); order.addChild(symb); placeOrder.addChild(order); return placeOrder; } public void destroy() { } public void init(SynapseEnvironment synapseEnvironment) { this.synapseEnvironment = synapseEnvironment; } public SynapseEnvironment getSynapseEnvironment() { return synapseEnvironment; } public void setSynapseEnvironment(SynapseEnvironment synapseEnvironment) { this.synapseEnvironment = synapseEnvironment; } public String getTo() { return to; } public void setTo(String to) { this.to = to; } public String getStockFile() { return stockFile; } public void setStockFile(String stockFile) { this.stockFile = stockFile; } }
When creating customized task schedules, if the injecting sequence of the message flow contains Publish Event mediators, set the following property in the Synapse message context:
mc.setProperty("CURRENT_TASK_EXECUTING_TENANT_IDENTIFIER",PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
Also, add the following dependency to the POM file of the custom task project: WSO2 Carbon - Utilities bundle
(symbolic name: org.wso2.carbon.utils
)
This is a bean implementing two properties: To and StockFile. These are used to configure the task.
Implementing ManagedLifecycle
for Initialization and Cleanup
Since a task implements ManagedLifecyle
interface, the ESB profile will call the init()
method at the initialization of a Task
object and destroy()
method when a Task
object is destroyed:
public interface ManagedLifecycle { public void init(SynapseEnvironment se); public void destroy(); }
The PlaceStockOrderTask
stores the Synapse environment object reference in an instance variable for later use with the init()
method. The SynapseEnvironment
is needed for injecting messages into the ESB.
Step 2: Customizing the Task
It is possible to pass values to a task at runtime using property elements. In this example, the location of the stock order file and its address was given using two properties within the Task
object:
- String type
- OMElement type
Tip
For OMElement type, it is possible to pass XML elements as values in the configuration file.
When creating a Task
object, the ESB will initialize the properties with the given values in the configuration file.
public String getStockFile() { return stockFile; } public void setStockFile(String stockFile) { this.stockFile = stockFile; }
For example, the following properties in the Task
class are initialized with the given values within the property element of the task in the configuration.
<syn:property xmlns="http://ws.apache.org/ns/synapse" name="stockFile"value="/home/upul/test/stock.txt"/>
For those properties given as XML elements, properties need to be defined within the Task
class using the format given below. OMElement comes from Apache AXIOM, which is used by the ESB profile. AXIOM is an object model similar to DOM. To learn more about AXIOM, see the tutorial in the AXIOM user guide.
public void setMessage(OMElement elem) { message = elem;}
It can be initialized with an XML element as follows:
<property name="message"> <m0:getQuote xmlns:m0="http://services.samples/xsd"> <m0:request> <m0:symbol>IBM</m0:symbol> </m0:request> </m0:getQuote> </property>
Step 3: Compiling and bundling the Task
Assemble the compiled class Task as a JAR file.
Step 4: Adding to the WSO2 EI class path
After compiling and bundling the Task
class, you need to add it to the ESB profile's class path. Place the JAR file in the <EI_HOME>/lib
directory. The Task
class will be available for use from the next time you start the server.
Be sure to restart the ESB for the JAR containing the task implementation to be effective. An OSGi bundle containing the task implementation will be created automatically and it will be deployed in the server.
Step 5: Scheduling a new task
To schedule a new task with the custom implementation, follow the instructions given above under Scheduling a task. However, be sure to apply the following changes relevant to your custom task implementation.
Create the
stockfile.txt
with the following contents and save it to a location on your machine.IBM,100,120.50 MSFT,200,70.25 SUN,400,60.758.
Change the task implementation class from the default implementation to
org.wso2.esb.tutorial.tasks.PlaceStockOrderTask
.The task properties will change according to the custom implementation. Therefore, you need to enter values for your custom properties as shown below.
Parameter Name Value To http://localhost:9000/soap/SimpleStockQuoteService StockFile The directory path to the stockfile.txt
file.SynapceEnvironment -