In addition to the default publisher types, you can define your own custom publisher, which gives you more flexibility in publishing the events that are sent to WSO2 products. Because each event publisher implementation is an OSGi bundle, you can easily deploy it to, or undeploy it from, the WSO2 product. To create a custom event publisher, import the org.wso2.carbon.event.output.adapter.core package, which provides the skeleton classes and interfaces required by a custom publisher implementation.

Implementing OutputEventAdapter Interface

The org.wso2.carbon.event.output.adapter.core.OutputEventAdapter interface contains the event publisher logic that is used to publish events. Override the following methods when implementing your own custom publisher.

  1. void init() throws OutputEventAdapterException

    This method is called when the event publisher bundle is initialized. Include here any code that must run when the OSGi bundle is loaded.

  2. void testConnect() throws TestConnectionNotSupportedException, ConnectionUnavailableException

    This method is used to test the connection to the publishing server.

  3. void connect() throws ConnectionUnavailableException

    This method can be called to connect to the back end before events are published.

  4. void publish(Object message, Map<String, String> dynamicProperties) throws ConnectionUnavailableException

    This method publishes events. It throws ConnectionUnavailableException if it cannot connect to the back end.

  5. void disconnect()

    This method is called after publishing is done, or when ConnectionUnavailableException is thrown.

  6. void destroy()

    This method can be used to clean up all the resources consumed.

  7. boolean isPolled()

    This method returns true if events accumulate at the adapter and clients connect to it to collect them.
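The order in which the methods listed above are typically invoked can be sketched without any WSO2 dependencies. In the sketch below, SketchAdapter is a hypothetical stand-in that only mirrors the method names of OutputEventAdapter (checked exceptions and testConnect() are left out), and the call order in runLifecycle is an assumption drawn from the method descriptions, not a statement of how the Carbon runtime actually drives an adapter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OutputEventAdapter; not the real WSO2 interface.
interface SketchAdapter {
    void init();
    void connect();
    void publish(Object message, Map<String, String> dynamicProperties);
    void disconnect();
    void destroy();
    boolean isPolled();
}

// Records every lifecycle call so that the order can be inspected afterwards.
class RecordingAdapter implements SketchAdapter {
    final List<String> calls = new ArrayList<String>();

    @Override public void init() { calls.add("init"); }
    @Override public void connect() { calls.add("connect"); }
    @Override public void publish(Object message, Map<String, String> dynamicProperties) {
        calls.add("publish:" + message);
    }
    @Override public void disconnect() { calls.add("disconnect"); }
    @Override public void destroy() { calls.add("destroy"); }
    @Override public boolean isPolled() { return false; }
}

class LifecycleSketch {
    // Assumed order: init, connect, publish each event, disconnect, destroy.
    static void runLifecycle(SketchAdapter adapter, List<String> events) {
        adapter.init();
        adapter.connect();
        Map<String, String> dynamicProperties = new HashMap<String, String>();
        for (String event : events) {
            adapter.publish(event, dynamicProperties);
        }
        adapter.disconnect();
        adapter.destroy();
    }

    public static void main(String[] args) {
        RecordingAdapter adapter = new RecordingAdapter();
        runLifecycle(adapter, Arrays.asList("e1", "e2"));
        System.out.println(adapter.calls);
    }
}
```

A recording implementation like this can also be handy as a first smoke test of a new publisher type before any real back end is wired in.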

Below is a sample email publisher implementation of the methods described above:

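public class EmailEventAdapter implements OutputEventAdapter {

    // Fields and constructor inferred from how the methods below use them.
    // The EmailSender task submitted in publish() is not shown in this sample.
    private static final Log log = LogFactory.getLog(EmailEventAdapter.class);
    private static ThreadPoolExecutor threadPoolExecutor;
    private static Session session;
    private static InternetAddress smtpFromAddress;
    private OutputEventAdapterConfiguration eventAdapterConfiguration;
    private Map<String, String> globalProperties;
    private int tenantId;

    public EmailEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration,
                             Map<String, String> globalProperties) {
        this.eventAdapterConfiguration = eventAdapterConfiguration;
        this.globalProperties = globalProperties;
    }

    @Override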
    public void init() throws OutputEventAdapterException {
        tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        // The ThreadPoolExecutor is created only if it has not been assigned yet.
        if (threadPoolExecutor == null) {
            int minThread;
            int maxThread;
            long defaultKeepAliveTime;
            int jobQueSize;
            //If global properties are available those will be assigned else constant values will be assigned
            if (globalProperties.get(EmailEventAdapterConstants.MIN_THREAD_NAME) != null) {
                minThread = Integer.parseInt(globalProperties.get(EmailEventAdapterConstants.MIN_THREAD_NAME));
            } else {
                minThread = EmailEventAdapterConstants.MIN_THREAD;
            }
            if (globalProperties.get(EmailEventAdapterConstants.MAX_THREAD_NAME) != null) {
                maxThread = Integer.parseInt(globalProperties.get(EmailEventAdapterConstants.MAX_THREAD_NAME));
            } else {
                maxThread = EmailEventAdapterConstants.MAX_THREAD;
            }
            if (globalProperties.get(EmailEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME) != null) {
                defaultKeepAliveTime = Long.parseLong(globalProperties.get(
                        EmailEventAdapterConstants.ADAPTER_KEEP_ALIVE_TIME_NAME));
            } else {
                defaultKeepAliveTime = EmailEventAdapterConstants.DEFAULT_KEEP_ALIVE_TIME_IN_MILLS;
            }
            if (globalProperties.get(EmailEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME) != null) {
                jobQueSize = Integer.parseInt(globalProperties.get(
                        EmailEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE_NAME));
            } else {
                jobQueSize = EmailEventAdapterConstants.ADAPTER_EXECUTOR_JOB_QUEUE_SIZE;
            }
            threadPoolExecutor = new ThreadPoolExecutor(minThread, maxThread, defaultKeepAliveTime,
                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(jobQueSize));
        }
    }
	
    @Override
    public void testConnect() throws TestConnectionNotSupportedException {
        throw new TestConnectionNotSupportedException("Test connection is not available");
    }
    
    @Override
    public void connect() throws ConnectionUnavailableException {
        if (session == null) {
            // Default SMTP properties for outgoing messages.
            String smtpFrom;
            String smtpHost;
            String smtpPort;
            // Default username and password for outgoing messages.
            final String smtpUsername;
            final String smtpPassword;
            // Initialize the SMTP session.
            Properties props = new Properties();
            props.putAll(globalProperties);
            //Verifying default SMTP properties of the SMTP server.
            smtpFrom = props.getProperty(MailConstants.MAIL_SMTP_FROM);
            smtpHost = props.getProperty(EmailEventAdapterConstants.MAIL_SMTP_HOST);
            smtpPort = props.getProperty(EmailEventAdapterConstants.MAIL_SMTP_PORT);
            if (smtpFrom == null) {
                String msg = "failed to connect to the mail server due to null smtpFrom value";
                throw new ConnectionUnavailableException("The adapter " +
                        eventAdapterConfiguration.getName() + " " + msg);
            }
            if (smtpHost == null) {
                String msg = "failed to connect to the mail server due to null smtpHost value";
                throw new ConnectionUnavailableException
                        ("The adapter " + eventAdapterConfiguration.getName() + " " + msg);
            }
            if (smtpPort == null) {
                String msg = "failed to connect to the mail server due to null smtpPort value";
                throw new ConnectionUnavailableException
                        ("The adapter " + eventAdapterConfiguration.getName() + " " + msg);
            }
            try {
                smtpFromAddress = new InternetAddress(smtpFrom);
            } catch (AddressException e) {
                log.error("Error in retrieving smtp address : " +
                        smtpFrom, e);
                String msg = "failed to connect to the mail server due to error in retrieving " +
                        "smtp from address";
                throw new ConnectionUnavailableException
                        ("The adapter " + eventAdapterConfiguration.getName() + " " + msg, e);
            }
            //Retrieving username and password of SMTP server.
            smtpUsername = props.getProperty(MailConstants.MAIL_SMTP_USERNAME);
            smtpPassword = props.getProperty(MailConstants.MAIL_SMTP_PASSWORD);
            //initializing SMTP server to create session object.
            if (smtpUsername != null && smtpPassword != null) {
                session = Session.getInstance(props, new Authenticator() {
                    public PasswordAuthentication getPasswordAuthentication() {
                        return new PasswordAuthentication(smtpUsername, smtpPassword);
                    }
                });
            } else {
                log.error("Error in smtp username & password verification");
                String msg = "failed to connect to the mail server due to failed " +
                        "user password authorization";
                throw new ConnectionUnavailableException("The adapter " +
                        eventAdapterConfiguration.getName() + " " + msg);
            }
        }
    }
    
    @Override
    public void publish(Object message, Map<String, String> dynamicProperties) {
        //Get subject and emailIds from dynamic properties
        String subject = dynamicProperties.get(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_SUBJECT);
        String[] emailIds = dynamicProperties.get(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_ADDRESS)
                .replaceAll(" ", "").split(EmailEventAdapterConstants.EMAIL_SEPARATOR);
        String emailType = dynamicProperties.get(EmailEventAdapterConstants.APAPTER_MESSAGE_EMAIL_TYPE);
        //Send email for each emailId
        for (String email : emailIds) {
            try {
                threadPoolExecutor.submit(new EmailSender(email, subject, message.toString(), emailType));
            } catch (RejectedExecutionException e) {
                EventAdapterUtil.logAndDrop(eventAdapterConfiguration.getName(), message, "Job queue is full", e, log, tenantId);
            }
        }
    }
    
    @Override
    public void disconnect() {
        //not required
    }
    
    @Override
    public void destroy() {
        //not required
    }
    
    @Override
    public boolean isPolled() {
        return false;
    }
}
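Two patterns in the sample above are worth isolating: init() reads each pool setting from the global properties and falls back to a constant, and publish() relies on a bounded job queue that rejects work once it is full. The following is a minimal, dependency-free sketch of both. The class PoolConfigSketch, the helper intOrDefault, the property keys ("minThread", "maxThread", "keepAliveTimeInMillis", "jobQueueSize") and the default values are all hypothetical and chosen only for illustration; they are not taken from EmailEventAdapterConstants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolConfigSketch {

    // Returns the parsed property value, or the default when the key is absent.
    static int intOrDefault(Map<String, String> props, String key, int defaultValue) {
        String value = props.get(key);
        return value != null ? Integer.parseInt(value.trim()) : defaultValue;
    }

    // Builds a bounded pool; key names and defaults are illustrative assumptions.
    static ThreadPoolExecutor buildPool(Map<String, String> props) {
        int minThread = intOrDefault(props, "minThread", 8);
        int maxThread = intOrDefault(props, "maxThread", 100);
        long keepAliveMillis = intOrDefault(props, "keepAliveTimeInMillis", 20000);
        int jobQueueSize = intOrDefault(props, "jobQueueSize", 2000);
        return new ThreadPoolExecutor(minThread, maxThread, keepAliveMillis,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(jobQueueSize));
    }

    // Submits jobs that block on the latch and counts how many the pool accepts.
    static int countAccepted(ThreadPoolExecutor pool, int attempts, final CountDownLatch release) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            try {
                pool.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                // Queue is full: the sample adapter logs and drops the event at this point.
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<String, String>();
        props.put("minThread", "1");
        props.put("maxThread", "1");
        props.put("jobQueueSize", "1");
        ThreadPoolExecutor pool = buildPool(props);
        CountDownLatch release = new CountDownLatch(1);
        System.out.println("accepted=" + countAccepted(pool, 5, release));
        release.countDown();   // let the blocked jobs finish
        pool.shutdown();
    }
}
```

With one worker thread and a queue of one, only two of the five jobs are accepted: one runs and one waits in the queue. Sizing the queue is therefore a trade-off between memory use and how many events are dropped under load.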

Implementing OutputEventAdapterFactory Class

The org.wso2.carbon.event.output.adapter.core.OutputEventAdapterFactory class serves as the factory that creates your event publisher type. Override the following methods when extending it for your own custom publisher.

  1. public String getType()

    Specify the type here. This string is displayed in the adapter type drop-down list of the publisher interface.

  2. public List<String> getSupportedMessageFormats()

    Specify the message formats supported by the created publisher type.

  3. public List<Property> getStaticPropertyList()

    Specify the static properties here. These properties use the values assigned when the publisher is created. For more information on adapter properties, see Event Publisher Configuration.

  4. public abstract List<Property> getDynamicPropertyList()

    Define dynamic properties in the same way as static properties. The only difference is that dynamic property values can be derived from the events handled by the publisher. For more information on adapter properties, see Event Publisher Configuration.

  5. public abstract String getUsageTips()

    Specify any hints to be displayed in the management console.

  6. public OutputEventAdapter createEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map<String, String> globalProperties)

    This method creates the publisher from the given event adapter configuration and the global properties, which are common to every adapter type.
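The role of getType() and createEventAdapter() can be illustrated with a small, self-contained sketch: a registry looks up a factory by its type string and asks it for a new publisher instance each time one is configured. All names below (SketchPublisher, SketchPublisherFactory, ConsolePublisherFactory, FactoryRegistrySketch) are hypothetical stand-ins rather than WSO2 classes, and the lookup-by-type behaviour is an assumption about how a host might use such factories.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a publisher instance.
interface SketchPublisher {
    String describe();
}

// Hypothetical stand-in for OutputEventAdapterFactory, reduced to two methods.
abstract class SketchPublisherFactory {
    abstract String getType();
    abstract SketchPublisher create(String publisherName, Map<String, String> globalProperties);
}

class ConsolePublisher implements SketchPublisher {
    private final String name;

    ConsolePublisher(String name) { this.name = name; }

    @Override public String describe() { return "console publisher '" + name + "'"; }
}

class ConsolePublisherFactory extends SketchPublisherFactory {
    @Override String getType() { return "console"; }

    @Override SketchPublisher create(String publisherName, Map<String, String> globalProperties) {
        // Each call returns a fresh instance configured with its own name.
        return new ConsolePublisher(publisherName);
    }
}

class FactoryRegistrySketch {
    private final Map<String, SketchPublisherFactory> factories =
            new HashMap<String, SketchPublisherFactory>();

    // The type string is the key under which the factory is found later.
    void register(SketchPublisherFactory factory) {
        factories.put(factory.getType(), factory);
    }

    SketchPublisher createPublisher(String type, String name) {
        SketchPublisherFactory factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown publisher type: " + type);
        }
        return factory.create(name, new HashMap<String, String>());
    }

    public static void main(String[] args) {
        FactoryRegistrySketch registry = new FactoryRegistrySketch();
        registry.register(new ConsolePublisherFactory());
        System.out.println(registry.createPublisher("console", "alerts").describe());
        System.out.println(registry.createPublisher("console", "audit").describe());
    }
}
```

Keeping per-publisher state in the created instance, and only type-level metadata in the factory, is what allows several publishers of the same type to coexist.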

Below is a sample email publisher implementation of the OutputEventAdapterFactory class:

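public class EmailEventAdapterFactory extends OutputEventAdapterFactory {

    // Note: resourceBundle, used below for display names and hints, is a field
    // whose declaration is not shown in this sample.

    @Override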
    public String getType() {
        return EmailEventAdapterConstants.ADAPTER_TYPE_EMAIL;
    }
    
    @Override
    public List<String> getSupportedMessageFormats() {
        List<String> supportedMessageFormats = new ArrayList<String>();
        supportedMessageFormats.add(MessageType.TEXT);
        supportedMessageFormats.add(MessageType.XML);
        supportedMessageFormats.add(MessageType.JSON);
        return supportedMessageFormats;
    }
    
    @Override
    public List<Property> getStaticPropertyList() {
        return null;
    }
    
    @Override
    public List<Property> getDynamicPropertyList() {
        List<Property> dynamicPropertyList = new ArrayList<Property>();
        // set email address
        Property emailAddress = new Property(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_ADDRESS);
        emailAddress.setDisplayName(
                resourceBundle.getString(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_ADDRESS));
        emailAddress.setRequired(true);
        emailAddress.setHint(resourceBundle.getString(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_ADDRESS_HINT));
        // set email subject
        Property subject = new Property(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_SUBJECT);
        subject.setDisplayName(
                resourceBundle.getString(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_SUBJECT));
        subject.setRequired(true);
        //set format of the email
        Property format = new Property(EmailEventAdapterConstants.APAPTER_MESSAGE_EMAIL_TYPE);
        format.setDisplayName
                (resourceBundle.getString(EmailEventAdapterConstants.APAPTER_MESSAGE_EMAIL_TYPE));
        format.setRequired(false);
        format.setOptions(new String[]{EmailEventAdapterConstants.MAIL_TEXT_PLAIN, EmailEventAdapterConstants.MAIL_TEXT_HTML});
        format.setDefaultValue(EmailEventAdapterConstants.MAIL_TEXT_PLAIN);
        format.setHint(resourceBundle.getString(EmailEventAdapterConstants.ADAPTER_MESSAGE_EMAIL_TYPE_HINT));
        dynamicPropertyList.add(emailAddress);
        dynamicPropertyList.add(subject);
        dynamicPropertyList.add(format);
        return dynamicPropertyList;
    }
    
    @Override
    public String getUsageTips() {
        return null;
    }
    
    @Override
    public OutputEventAdapter createEventAdapter(OutputEventAdapterConfiguration eventAdapterConfiguration, Map<String,
            String> globalProperties) {
        return new EmailEventAdapter(eventAdapterConfiguration, globalProperties);
    }
}

Exposing Custom Event Publisher as an OSGi Service

In addition to the above, you can maintain a service class under the internal\ds\ directory to expose the custom event publisher implementation as an OSGi service. The service must be exposed as the OutputEventAdapterFactory type. Below is a sample service class for a custom publisher:

/**
 * @scr.component component.name="output.Email.AdapterService.component" immediate="true"
 */
public class EmailEventAdapterServiceDS {

    private static final Log log = LogFactory.getLog(EmailEventAdapterServiceDS.class);

    /**
     * Initializes the email adapter service and registers the factory as an OSGi service.
     *
     * @param context the OSGi component context supplied on activation
     */
    protected void activate(ComponentContext context) {

        try {
            OutputEventAdapterFactory emailEventAdaptorFactory = new EmailEventAdapterFactory();
            context.getBundleContext().registerService(OutputEventAdapterFactory.class.getName(),
                    emailEventAdaptorFactory, null);
            if (log.isDebugEnabled()) {
                log.debug("Successfully deployed the output Email event adaptor service");
            }
        } catch (RuntimeException e) {
            log.error("Can not create the output Email event adaptor service ", e);
        }
    }

}

Furthermore, you can have a utility directory, internal\util\, where you can place the utility classes required by the custom publisher implementation.

Deploying Custom Event Publisher

Deploying a custom event publisher is simple in WSO2 CEP 4.0.0. Implement the custom event publisher type, build the project, and copy the OSGi bundle created in the "target" folder into the <CEP_HOME>/repository/components/dropins directory. When the CEP server starts, the newly created event publisher type service appears in the startup logs. The new custom event publisher type is also visible in the UI with its properties, and you can create several instances of it.
