In addition to the default receiver types, you can define your own custom receiver, which gives you more flexibility in receiving events sent to WSO2 products. Because each event receiver implementation is an OSGi bundle, you can easily deploy it to and undeploy it from the WSO2 product. To create a custom event receiver, import the org.wso2.carbon.event.input.adaptor.core package, which provides the skeleton classes and interfaces that a custom receiver implementation requires.
...
void init(InputEventAdapterListener eventAdaptorListener) throws InputEventAdapterException
This method is called when the event receiver bundle is initialized. Put any code that must run when the OSGi bundle is loaded in this method.
void testConnect() throws TestConnectionNotSupportedException, InputEventAdapterRuntimeException, ConnectionUnavailableException
This method checks whether the receiving server is available.
void connect() throws InputEventAdapterRuntimeException, ConnectionUnavailableException
The connect() method is called after the init() method. It connects to the receiving end; if the receiving end is not available, a ConnectionUnavailableException is thrown.
void disconnect()
The disconnect() method is called when the receiver needs to disconnect from the receiving server it is connected to.
void destroy()
This method is called when an event receiver is removed. Do any cleanup that is required on removal here.
boolean isEventDuplicatedInCluster()
Returns a boolean indicating whether events are duplicated in a cluster. This is used in clustered deployments.
boolean isPolling()
Checks whether events accumulate at the adapter and clients connect to it to collect them.
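The call order described above (init(), then connect(), with disconnect() and destroy() on teardown) can be sketched with a small recorder. This is only an illustration: Receiver, ConnectionUnavailable, RecordingReceiver, and runLifecycle are hypothetical stand-ins written for this sketch, not the WSO2 interfaces, and what the product does after a failed connect() is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins: these types only mimic the shape of the methods
// described above; they are not the WSO2 classes.
class LifecycleSketch {

    /** Stand-in for ConnectionUnavailableException. */
    static class ConnectionUnavailable extends Exception {
        ConnectionUnavailable(String message) {
            super(message);
        }
    }

    /** Stand-in for the lifecycle subset of InputEventAdapter. */
    interface Receiver {
        void init();
        void connect() throws ConnectionUnavailable;
        void disconnect();
        void destroy();
    }

    /** Records each lifecycle call so the order can be inspected. */
    static class RecordingReceiver implements Receiver {
        final List<String> calls = new ArrayList<>();
        private final boolean sourceAvailable;

        RecordingReceiver(boolean sourceAvailable) {
            this.sourceAvailable = sourceAvailable;
        }

        @Override
        public void init() {
            calls.add("init");
        }

        @Override
        public void connect() throws ConnectionUnavailable {
            if (!sourceAvailable) {
                throw new ConnectionUnavailable("source not reachable");
            }
            calls.add("connect");
        }

        @Override
        public void disconnect() {
            calls.add("disconnect");
        }

        @Override
        public void destroy() {
            calls.add("destroy");
        }
    }

    /** Drives one receiver through init, connect, disconnect, destroy. */
    static List<String> runLifecycle(boolean sourceAvailable) {
        RecordingReceiver receiver = new RecordingReceiver(sourceAvailable);
        receiver.init();
        try {
            receiver.connect();
            receiver.disconnect(); // only disconnect what was connected
        } catch (ConnectionUnavailable e) {
            // Assumption: this sketch just notes the failure and moves on.
            receiver.calls.add("unavailable");
        }
        receiver.destroy();
        return receiver.calls;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle(true));
        System.out.println(runLifecycle(false));
    }
}
```

When the source is reachable, the recorded order is init, connect, disconnect, destroy. When it is not, connect() throws and the sketch skips disconnect() before calling destroy().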
Below is a sample File Tail Receiver implementation of the above described methods:
```java
public class FileTailEventAdapter implements InputEventAdapter {

    @Override
    public void init(InputEventAdapterListener eventAdapterListener) throws InputEventAdapterException {
        validateInputEventAdapterConfigurations();
        this.eventAdapterListener = eventAdapterListener;
    }

    @Override
    public void testConnect() throws TestConnectionNotSupportedException {
        throw new TestConnectionNotSupportedException("not-supported");
    }

    @Override
    public void connect() {
        createFileAdapterListener();
    }

    @Override
    public void disconnect() {
        if (fileTailerManager != null) {
            fileTailerManager.getTailer().stop();
        }
    }

    @Override
    public void destroy() {
    }

    @Override
    public boolean isEventDuplicatedInCluster() {
        return Boolean.parseBoolean(globalProperties.get(EventAdapterConstants.EVENTS_DUPLICATED_IN_CLUSTER));
    }

    @Override
    public boolean isPolling() {
        return true;
    }

    private void validateInputEventAdapterConfigurations() throws InputEventAdapterException {
        String delayInMillisProperty = eventAdapterConfiguration.getProperties().get(FileTailEventAdapterConstants.EVENT_ADAPTER_DELAY_MILLIS);
        try {
            Integer.parseInt(delayInMillisProperty);
        } catch (NumberFormatException e) {
            throw new InputEventAdapterException("Invalid value set for property Delay: " + delayInMillisProperty, e);
        }
    }

    private void createFileAdapterListener() {
        if (log.isDebugEnabled()) {
            log.debug("New subscriber added for " + eventAdapterConfiguration.getName());
        }

        String delayInMillisProperty = eventAdapterConfiguration.getProperties().get(FileTailEventAdapterConstants.EVENT_ADAPTER_DELAY_MILLIS);
        int delayInMillis = FileTailEventAdapterConstants.DEFAULT_DELAY_MILLIS;
        if (delayInMillisProperty != null && (!delayInMillisProperty.trim().isEmpty())) {
            delayInMillis = Integer.parseInt(delayInMillisProperty);
        }

        boolean startFromEnd = false;
        String startFromEndProperty = eventAdapterConfiguration.getProperties().get(FileTailEventAdapterConstants.EVENT_ADAPTER_START_FROM_END);
        if (startFromEndProperty != null && (!startFromEndProperty.trim().isEmpty())) {
            startFromEnd = Boolean.parseBoolean(startFromEndProperty);
        }

        String filePath = eventAdapterConfiguration.getProperties().get(FileTailEventAdapterConstants.EVENT_ADAPTER_CONF_FILEPATH);
        FileTailerListener listener = new FileTailerListener(new File(filePath).getName(), eventAdapterListener);
        Tailer tailer = new Tailer(new File(filePath), listener, delayInMillis, startFromEnd);
        fileTailerManager = new FileTailerManager(tailer, listener);
        singleThreadedExecutor.execute(tailer);
    }
}
```
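In the sample, the delay property is read in two places: the validation step passes it straight to Integer.parseInt, which throws NumberFormatException for a null value, while createFileAdapterListener() falls back to a default when the value is missing or blank. One possible way to keep the two paths consistent is a single helper, sketched below. The class name, the property key "delayInMillis", and the default of 1000 are hypothetical values chosen for this illustration, and IllegalArgumentException stands in for InputEventAdapterException.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper only; names and values are assumptions, not WSO2 constants.
class DelayPropertySketch {

    static final String DELAY_KEY = "delayInMillis"; // hypothetical property key
    static final int DEFAULT_DELAY_MILLIS = 1000;    // hypothetical default

    /**
     * Returns the configured delay, the default when the property is
     * missing or blank, and fails fast on a non-numeric value.
     */
    static int resolveDelay(Map<String, String> properties) {
        String raw = properties.get(DELAY_KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_DELAY_MILLIS;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            // Stand-in for throwing InputEventAdapterException in a real adapter.
            throw new IllegalArgumentException("Invalid value set for property Delay: " + raw, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        System.out.println(resolveDelay(properties)); // property missing: default is used
        properties.put(DELAY_KEY, "250");
        System.out.println(resolveDelay(properties)); // explicit value is parsed
    }
}
```

With a helper like this, init() could call it once for validation and store the result, so connect() would not need to parse the property again.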
Implementing InputEventAdapterFactory Class
...
Deploying a custom event receiver in WSO2 CEP/DAS 4.0.0 is straightforward. Implement the custom event receiver type, build the project, and copy the OSGi bundle created in the "target" folder into <CEP_HOME>/repository/components/dropins or <DAS_HOME>/repository/components/dropins. When the CEP/DAS server starts, the newly created event receiver type service appears in the server startup logs. The new custom event receiver type is also visible in the UI with its properties, and you can then create multiple instances of it.