In addition to the default publisher types, you can define your own custom publisher, which gives you more flexibility to publish events that are sent to WSO2 products. Each event publisher implementation is an OSGI bundle, so you can easily deploy it on a WSO2 product and undeploy it again. To create a custom event publisher, import the org.wso2.carbon.event.output.adapter.core package, which contains the skeleton classes/interfaces required for the custom publisher implementation.
Implementing the OutputEventAdapter interface
The org.wso2.carbon.event.output.adapter.core.OutputEventAdapter interface contains the event publisher logic that is used to publish events. You should override the methods given below when implementing your own custom publisher.
void init() throws OutputEventAdapterException
This method is called when initiating the event publisher bundle. Relevant code segments that are needed when loading the OSGI bundle can be included in this method.
void testConnect() throws TestConnectionNotSupportedException, ConnectionUnavailableException
This method is used to test the connection to the publishing server.
void connect() throws ConnectionUnavailableException
This method can be called to connect to the backend before the events are published.
void publish(Object message, Map<String, String> dynamicProperties) throws ConnectionUnavailableException
This method publishes events. It throws the ConnectionUnavailableException if it cannot connect to the backend.
void disconnect()
This method is called after the publishing is done, or when the ConnectionUnavailableException is thrown.
void destroy()
This method can be used to clean up all the resources consumed.
boolean isPolled()
This method checks whether events get accumulated at the adapter, and clients connect to it to collect events.
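The order in which these methods are typically exercised can be sketched as follows. This is a minimal illustration only: SketchAdapter, SketchConnectionException, RecordingAdapter, and LifecycleSketch are hypothetical stand-ins written for this example, not WSO2 classes, and the driver loop is an assumption about how a caller might sequence the calls based on the descriptions above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConnectionUnavailableException (not the WSO2 class).
class SketchConnectionException extends Exception {
    SketchConnectionException(String message) {
        super(message);
    }
}

// Hypothetical stand-in that mirrors the lifecycle methods of OutputEventAdapter.
interface SketchAdapter {
    void init();
    void connect() throws SketchConnectionException;
    void publish(Object message, Map<String, String> dynamicProperties) throws SketchConnectionException;
    void disconnect();
    void destroy();
}

// Records every lifecycle call so that the call order can be inspected.
class RecordingAdapter implements SketchAdapter {
    final List<String> calls = new ArrayList<>();
    private final boolean failOnPublish;

    RecordingAdapter(boolean failOnPublish) {
        this.failOnPublish = failOnPublish;
    }

    @Override
    public void init() {
        calls.add("init");
    }

    @Override
    public void connect() {
        calls.add("connect");
    }

    @Override
    public void publish(Object message, Map<String, String> dynamicProperties) throws SketchConnectionException {
        calls.add("publish:" + message);
        if (failOnPublish) {
            throw new SketchConnectionException("backend unreachable");
        }
    }

    @Override
    public void disconnect() {
        calls.add("disconnect");
    }

    @Override
    public void destroy() {
        calls.add("destroy");
    }
}

class LifecycleSketch {
    // Assumed call order: init, connect, publish per event, then disconnect and destroy.
    static void run(SketchAdapter adapter, List<String> messages) {
        adapter.init();
        try {
            adapter.connect();
            for (String message : messages) {
                adapter.publish(message, Collections.<String, String>emptyMap());
            }
        } catch (SketchConnectionException e) {
            // disconnect() is also reached when the connection exception is thrown.
            System.out.println("Publishing stopped: " + e.getMessage());
        } finally {
            adapter.disconnect();
            adapter.destroy();
        }
    }

    public static void main(String[] args) {
        RecordingAdapter adapter = new RecordingAdapter(false);
        run(adapter, Arrays.asList("event-1", "event-2"));
        System.out.println(adapter.calls);
    }
}
```

In this sketch destroy() follows disconnect() immediately to keep the example short. A real deployment may keep the adapter alive and reconnect instead.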
The following is a sample Email publisher implementation of the methods described above.
```java
public class EmailEventAdapter implements OutputEventAdapter {

    @Override
    public void init() throws OutputEventAdapterException {
        tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();

        //ThreadPoolExecutor will be assigned if it is null.
        if (threadPoolExecutor == null) {
            int minThread;
            int maxThread;
            long defaultKeepAliveTime;
            int jobQueSize;

            //If global properties are available those will be assigned else constant values will be assigned
            if (globalProperties.get(EmailEventAdapterConstants.MIN_THREAD_NAME) != null) {
                minThread = Integer.parseInt(globalProperties.get(EmailEventAdapterConstants.MIN_THREAD_NAME));
            } else {
                minThread = EmailEventAdapterConstants.MIN_THREAD;
            }

            if (globalProperties.get(EmailEventAdapterConstants.MAX_THREAD_NAME) != null) {
                maxThread = Integer.parseInt(globalProperties.get(EmailEventAdapterConstants.MAX_THREAD_NAME));
            } else {
                maxThread = EmailEventAdapterConstants.MAX_THREAD;
            }

            if (globalProperties.get(EmailEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME) != null) {
                defaultKeepAliveTime = Integer.parseInt(globalProperties.get(
                        EmailEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME));
            } else {
                defaultKeepAliveTime = EmailEventAdapterConstants.DEFAULT_KEEP_ALIVE_TIME_IN_MILLS;
            }

            if (globalProperties.get(EmailEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME) != null) {
                jobQueSize = Integer.parseInt(globalProperties.get(
                        EmailEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME));
            } else {
                jobQueSize = EmailEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE;
            }

            threadPoolExecutor = new ThreadPoolExecutor(minThread, maxThread, defaultKeepAliveTime,
                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(jobQueSize));
        }
    }

    @Override
    public void testConnect() throws TestConnectionNotSupportedException {
        throw new TestConnectionNotSupportedException("Test connection is not available");
    }

    @Override
    public void connect() throws ConnectionUnavailableException {
        if (session == null) {
            /**
             * Default SMTP properties for outgoing messages.
             */
            String smtpFrom;
            String smtpHost;
            String smtpPort;

            /**
             * Default from username and password for outgoing messages.
             */
            final String smtpUsername;
            final String smtpPassword;

            // initialize SMTP session.
            Properties props = new Properties();
            props.putAll(globalProperties);

            //Verifying default SMTP properties of the SMTP server.
            smtpFrom = props.getProperty(MailConstants.MAIL_SMTP_FROM);
            smtpHost = props.getProperty(EmailEventAdapterConstants.MAIL_SMTP_HOST);
            smtpPort = props.getProperty(EmailEventAdapterConstants.MAIL_SMTP_PORT);

            if (smtpFrom == null) {
                String msg = "failed to connect to the mail server due to null smtpFrom value";
                throw new ConnectionUnavailableException("The adapter " +
                        eventAdapterConfiguration.getName() + " " + msg);
            }

            if (smtpHost == null) {
                String msg = "failed to connect to the mail server due to null smtpHost value";
                throw new ConnectionUnavailableException("The adapter " +
                        eventAdapterConfiguration.getName() + " " + msg);
            }

            if (smtpPort == null) {
                String msg = "failed to connect to the mail server due to null smtpPort value";
                throw new ConnectionUnavailableException("The adapter " +
                        eventAdapterConfiguration.getName() + " " + msg);
            }

            try {
                smtpFromAddress = new InternetAddress(smtpFrom);
            } catch (AddressException e) {
                log.error("Error in retrieving smtp address : " + smtpFrom, e);
                String msg = "failed to connect to the mail server due to error in retrieving " +
                        "smtp from address";
                throw new ConnectionUnavailableException("The adapter " +
                        eventAdapterConfiguration.getName() + " " + msg, e);
            }

            //Retrieving username and password of SMTP server.
            smtpUsername = props.getProperty(MailConstants.MAIL_SMTP_USERNAME);
            smtpPassword = props.getProperty(MailConstants.MAIL_SMTP_PASSWORD);

            //initializing SMTP server to create session object.
            if (smtpUsername != null && smtpPassword != null) {
                session = Session.getInstance(props, new Authenticator() {
                    public PasswordAuthentication getPasswordAuthentication() {
                        return new PasswordAuthentication(smtpUsername, smtpPassword);
                    }
                });
            } else {
                log.error("Error in smtp username & password verification");
                String msg = "failed to connect to the mail server due to failed " +
                        "user password authorization";
                throw new ConnectionUnavailableException("The adapter " +
                        eventAdapterConfiguration.getName() + " " + msg);
            }
        }
    }

    @Override
    public void publish(Object message, Map<String, String> dynamicProperties) {
        //Get subject and emailIds from dynamic properties
        String subject = dynamicProperties.get(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_SUBJECT);
        String[] emailIds = dynamicProperties.get(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_ADDRESS)
                .replaceAll(" ", "").split(EmailEventAdapterConstants.EMAIL_SEPARATOR);
        String emailType = dynamicProperties.get(EmailEventAdapterConstants.APAPTER_MESSAGE_EMAIL_TYPE);

        //Send email for each emailId
        for (String email : emailIds) {
            try {
                threadPoolExecutor.submit(new EmailSender(email, subject, message.toString(), emailType));
            } catch (RejectedExecutionException e) {
                EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message,
                        "Job queue is full", e, log, tenantId);
            }
        }
    }

    @Override
    public void disconnect() {
        //not required
    }

    @Override
    public void destroy() {
        //not required
    }

    @Override
    public boolean isPolled() {
        return false;
    }
}
```
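The init() method in the sample repeats the same "use the global property if present, otherwise fall back to a constant" check four times. One possible way to express that pattern is a small helper, sketched below. GlobalPropertyDefaults, intOrDefault, and the property keys "minThread" and "maxThread" are hypothetical names chosen for this illustration and are not part of the WSO2 API.

```java
import java.util.HashMap;
import java.util.Map;

class GlobalPropertyDefaults {

    // Returns the integer stored under key, or fallback when the key is absent.
    // Like the sample, this lets NumberFormatException propagate for malformed values.
    static int intOrDefault(Map<String, String> globalProperties, String key, int fallback) {
        String raw = globalProperties.get(key);
        return raw != null ? Integer.parseInt(raw) : fallback;
    }

    public static void main(String[] args) {
        Map<String, String> globalProperties = new HashMap<>();
        globalProperties.put("minThread", "4"); // hypothetical key name

        int minThread = intOrDefault(globalProperties, "minThread", 8);
        int maxThread = intOrDefault(globalProperties, "maxThread", 100);
        System.out.println("minThread=" + minThread + ", maxThread=" + maxThread);
    }
}
```

With a helper of this kind, each of the four if/else blocks in init() would reduce to a single assignment.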
Implementing the OutputEventAdapterFactory class
The org.wso2.carbon.event.output.adapter.core.OutputEventAdapterFactory class can be used as the factory to create your event publisher type. You should override the methods given below when extending it for your own custom publisher.
public String getType()
Here, the type needs to be specified. This string is displayed in the publisher interface in the adapter type drop-down list.
public List<String> getSupportedMessageFormats()
Here, the supported message formats for the created publisher type need to be specified.
public List<Property> getStaticPropertyList()
Here, static properties need to be specified. These properties use the values assigned when creating a publisher. For more information on adapter properties, see Event Publisher Configuration.
public abstract List<Property> getDynamicPropertyList()
You can define dynamic properties similar to static properties. The only difference is that dynamic property values can be derived from events handled by the publisher. For more information on adapter properties, see Event Publisher Configuration.
public abstract String getUsageTips()
Specify any hints to be displayed in the Management Console.
public OutputEventAdapter createEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map<String, String> globalProperties)
This method creates the publisher by specifying the event adapter configuration and the global properties that are common to each adapter type.
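To illustrate the difference between static and dynamic properties described above, the following sketch resolves a property value from the attributes of an event at publish time. It is an assumption-laden illustration: the {{name}} placeholder syntax, the DynamicPropertySketch class, and the resolve method are hypothetical and are not taken from the WSO2 API. A static property would simply keep the value it was given when the publisher was created.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class DynamicPropertySketch {

    // Hypothetical placeholder syntax: {{attributeName}}
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{(\\w+)\\}\\}");

    // Replaces each placeholder with the matching event attribute.
    // Placeholders without a matching attribute are left untouched.
    static String resolve(String template, Map<String, String> eventAttributes) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = eventAttributes.get(matcher.group(1));
            String replacement = value != null ? value : matcher.group();
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, String> event = new HashMap<>();
        event.put("symbol", "WSO2");

        // Dynamic: the subject depends on the event being published.
        System.out.println(resolve("Price alert for {{symbol}}", event));
        // Static-like: no placeholder, so the value is the same for every event.
        System.out.println(resolve("Price alert", event));
    }
}
```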
The following is a sample Email publisher implementation of the OutputEventAdapterFactory class.
```java
public class EmailEventAdapterFactory extends OutputEventAdapterFactory {

    @Override
    public String getType() {
        return EmailEventAdapterConstants.ADAPTER_TYPE_EMAIL;
    }

    @Override
    public List<String> getSupportedMessageFormats() {
        List<String> supportedMessageFormats = new ArrayList<String>();
        supportedMessageFormats.add(MessageType.TEXT);
        supportedMessageFormats.add(MessageType.XML);
        supportedMessageFormats.add(MessageType.JSON);
        return supportedMessageFormats;
    }

    @Override
    public List<Property> getStaticPropertyList() {
        return null;
    }

    @Override
    public List<Property> getDynamicPropertyList() {
        List<Property> dynamicPropertyList = new ArrayList<Property>();

        // set email address
        Property emailAddress = new Property(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_ADDRESS);
        emailAddress.setDisplayName(
                resourceBundle.getString(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_ADDRESS));
        emailAddress.setRequired(true);
        emailAddress.setHint(resourceBundle.getString(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_ADDRESS_HINT));

        // set email subject
        Property subject = new Property(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_SUBJECT);
        subject.setDisplayName(
                resourceBundle.getString(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_SUBJECT));
        subject.setRequired(true);

        //set format of the email
        Property format = new Property(EmailEventAdapterConstants.APAPTER_MESSAGE_EMAIL_TYPE);
        format.setDisplayName(
                resourceBundle.getString(EmailEventAdapterConstants.APAPTER_MESSAGE_EMAIL_TYPE));
        format.setRequired(false);
        format.setOptions(new String[]{EmailEventAdapterConstants.MAIL_TEXT_PLAIN,
                EmailEventAdapterConstants.MAIL_TEXT_HTML});
        format.setDefaultValue(EmailEventAdapterConstants.MAIL_TEXT_PLAIN);
        format.setHint(resourceBundle.getString(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_TYPE_HINT));

        dynamicPropertyList.add(emailAddress);
        dynamicPropertyList.add(subject);
        dynamicPropertyList.add(format);
        return dynamicPropertyList;
    }

    @Override
    public String getUsageTips() {
        return null;
    }

    @Override
    public OutputEventAdapter createEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration,
                                                 Map<String, String> globalProperties) {
        return new EmailEventAdapter(eventAdapterConfiguration, globalProperties);
    }
}
```
Exposing the custom event publisher as an OSGI service
Apart from the above, you can maintain a service class under the internal\ds\ directory to expose the custom event publisher implementation as an OSGI service. The implementation needs to be exposed as a service of the OutputEventAdapterFactory type. The following is a sample implementation of a service class for a custom-defined publisher.
```java
/**
 * @scr.component component.name="output.Email.AdapterService.component" immediate="true"
 */
public class EmailEventAdapterServiceDS {

    private static final Log log = LogFactory.getLog(EmailEventAdapterServiceDS.class);

    /**
     * initialize the email service here.
     *
     * @param context
     */
    protected void activate(ComponentContext context) {
        try {
            OutputEventAdapterFactory emailEventAdaptorFactory = new EmailEventAdapterFactory();
            context.getBundleContext().registerService(OutputEventAdapterFactory.class.getName(),
                    emailEventAdaptorFactory, null);
            if (log.isDebugEnabled()) {
                log.debug("Successfully deployed the output Email event adaptor service");
            }
        } catch (RuntimeException e) {
            log.error("Can not create the output Email event adaptor service ", e);
        }
    }
}
```
Deploying the custom event publisher
Follow the procedure below to deploy a custom event publisher in WSO2 CEP 4.0.0.
- Implement the custom event publisher type.
- Build the project.
- Copy the OSGI bundle that is created inside the target directory into the <CEP_HOME>/repository/components/dropins directory.
When you start the CEP server, you can see the newly created event publisher type service in the server startup logs. The newly created custom event publisher type will also be visible in the UI with the relevant properties. Now you can create several instances of this event publisher type.