Objectives
The Web Services Eventing (WS-Eventing) specification (http://www.w3.org/Submission/WS-Eventing) defines a baseline set of operations that allow Web services to provide asynchronous notifications to interested parties. It defines the simplest level of Web service interfaces for notification producers and notification consumers in a distributed event management system, including the standard message exchanges that service providers acting in these roles must implement and the operational requirements expected of them. It also supports the publish/subscribe functions required by robust, scalable enterprise applications, including message brokering and topic-based subscription management.
WSO2 MB supports WS-Eventing. This sample shows how to register a Web service as an event receiver and subscribe it to the message broker. The EventSinkService is a sample Web service that listens on the JMS topic 'foo/bar' and prints each received message to the console. In this scenario it acts as a subscriber to the 'foo.bar' JMS topic. The sample also illustrates how to publish messages to this subscription. The eventing component of MB includes a BrokerClient, which can both register the EventSinkService as an event receiver and publish messages to the JMS topic.
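The subscribe, publish, and unsubscribe flow described above can be pictured with a small in-memory model. This is only a sketch to show the shape of topic-based subscription management: the class TopicBrokerSketch and its methods are hypothetical, and they are not the WSO2 MB BrokerClient or any WS-Eventing API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical in-memory broker; not part of WSO2 MB or WS-Eventing. */
public class TopicBrokerSketch {

    /** One subscription: the topic it is bound to and the sink that receives messages. */
    private static final class Subscription {
        final String topic;
        final Consumer<String> sink;

        Subscription(String topic, Consumer<String> sink) {
            this.topic = topic;
            this.sink = sink;
        }
    }

    // Subscriptions keyed by generated subscription ID, in insertion order.
    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    /** Registers a sink for a topic and returns the ID needed to unsubscribe later. */
    public String subscribe(String topic, Consumer<String> sink) {
        String id = UUID.randomUUID().toString();
        subscriptions.put(id, new Subscription(topic, sink));
        return id;
    }

    /** Delivers the message to every sink on the topic; returns the delivery count. */
    public int publish(String topic, String message) {
        int delivered = 0;
        for (Subscription s : subscriptions.values()) {
            if (s.topic.equals(topic)) {
                s.sink.accept(message);
                delivered++;
            }
        }
        return delivered;
    }

    /** Removes a subscription; returns false if the ID is unknown. */
    public boolean unsubscribe(String subscriptionId) {
        return subscriptions.remove(subscriptionId) != null;
    }

    public static void main(String[] args) {
        TopicBrokerSketch broker = new TopicBrokerSketch();
        List<String> received = new ArrayList<>();
        String id = broker.subscribe("foo/bar", received::add);
        broker.publish("foo/bar", "Test publish message");
        broker.unsubscribe(id);
        broker.publish("foo/bar", "not delivered after unsubscribe");
        System.out.println(received); // prints [Test publish message]
    }
}
```

In the real sample the sink is a Web service endpoint URL rather than an in-process callback, and the broker delivers the message over HTTP.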
Prerequisites
Install and run WSO2 Message Broker by following the instructions in the Getting Started section.
Define the Web Service
One of the simplest services one can write is similar to the following.
    /*
     * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
     *
     * WSO2 Inc. licenses this file to you under the Apache License,
     * Version 2.0 (the "License"); you may not use this file except
     * in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied. See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    public class EventSinkService {

        public void receive(String message) {
            System.out.println("Got the message ==> " + message);
        }
    }
Host the Service and Start a Broker Client
This service has to be hosted on a server, and a broker client has to be created that connects to the broker's EventBrokerService URL, as follows.
    private AxisServer axisServer;
    private BrokerClient brokerClient;

    public void start() {
        try {
            System.setProperty("javax.net.ssl.trustStore", "../../repository/resources/security/wso2carbon.jks");
            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
            this.axisServer = new AxisServer();
            this.axisServer.deployService(EventSinkService.class.getName());
            this.brokerClient = new BrokerClient("https://localhost:9443/services/EventBrokerService", "admin", "admin");
            // give time to start the simple http server
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
            }
        } catch (AxisFault axisFault) {
            System.out.println("Can not start the server");
        } catch (AuthenticationExceptionException e) {
            e.printStackTrace();
        }
    }
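The listing above waits a fixed two seconds for the simple HTTP server to come up. One alternative, shown here only as a sketch, is to poll the listening port until it accepts a connection. The class PortWaitSketch and the method waitForPort are hypothetical helpers, not part of Axis2 or WSO2 MB, and the sketch assumes the service listens on port 6060, as the subscription URL used in this sample suggests.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical helper; not part of Axis2 or WSO2 MB. */
public class PortWaitSketch {

    /**
     * Polls host:port until a TCP connection succeeds or timeoutMillis elapses.
     * Returns true if the port accepted a connection in time.
     */
    public static boolean waitForPort(String host, int port, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 200);
                return true;
            } catch (IOException notReadyYet) {
                try {
                    Thread.sleep(100); // back off briefly before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in listener on an ephemeral port so the sketch runs without Axis2.
        try (ServerSocket listener = new ServerSocket(0)) {
            boolean ready = waitForPort("localhost", listener.getLocalPort(), 2000);
            System.out.println("ready = " + ready);
        }
    }
}
```

In the sample, a call such as waitForPort("localhost", 6060, 5000) could take the place of the fixed sleep, under the port assumption stated above.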
Subscribe the Service to Receive Events
Subscribe the service defined above to a topic in the broker so that it receives events. The broker client lets you subscribe a service endpoint to a topic in Message Broker, and the call returns a subscription ID that is needed later to unsubscribe. This sample uses a topic named "foo/bar".
    public String subscribe() {
        // Subscribe the EventSinkService endpoint to the "foo/bar" topic
        try {
            return this.brokerClient.subscribe("foo/bar", "http://localhost:6060/axis2/services/EventSinkService/receive");
        } catch (BrokerClientException e) {
            e.printStackTrace();
        }
        return null;
    }
Publish Messages
When messages or events are published to the topic in Message Broker, they are received by the Web service.
    public void publish() {
        try {
            this.brokerClient.publish("foo/bar", getOMElementToSend());
        } catch (AxisFault axisFault) {
            axisFault.printStackTrace();
        }
    }
The EventSinkService then prints each received message to the console, prefixed with "Got the message ==> ".
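The published payload is a "receive" element in the http://ws.sample.org namespace with a single "message" child, which is what the sample's getOMElementToSend() method builds with Axiom. To show the shape of that XML without the Axiom dependency, the following sketch builds an equivalent element with the JDK's DOM API. The class PayloadSketch and the method buildPayload are hypothetical and are not used by the sample itself.

```java
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

/** Hypothetical illustration of the payload shape; the sample itself uses Axiom. */
public class PayloadSketch {

    private static final String NS = "http://ws.sample.org";

    /** Builds a namespaced receive element with one message child and serializes it. */
    public static String buildPayload(String text) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder().newDocument();

            // Same element names, namespace, and prefix as in the sample.
            Element receive = doc.createElementNS(NS, "ns1:receive");
            Element message = doc.createElementNS(NS, "ns1:message");
            message.setTextContent(text);
            receive.appendChild(message);
            doc.appendChild(receive);

            // Serialize without the XML declaration to show only the element.
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(doc), new StreamResult(out));
            return out.toString();
        } catch (ParserConfigurationException | TransformerException e) {
            throw new IllegalStateException("Could not build the payload", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildPayload("Test publish message"));
    }
}
```

The element name "receive" matches the operation name of EventSinkService, and the "message" child matches its parameter name.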
The full code of this sample is as follows.
    /*
     * Copyright (c) 2005-2010, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
     *
     * WSO2 Inc. licenses this file to you under the Apache License,
     * Version 2.0 (the "License"); you may not use this file except
     * in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied. See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    import org.apache.axiom.om.OMAbstractFactory;
    import org.apache.axiom.om.OMElement;
    import org.apache.axiom.om.OMFactory;
    import org.apache.axiom.om.OMNamespace;
    import org.apache.axis2.AxisFault;
    import org.apache.axis2.engine.AxisServer;
    import org.wso2.carbon.event.client.broker.BrokerClient;
    import org.wso2.carbon.event.client.broker.BrokerClientException;
    import org.wso2.carbon.event.client.stub.generated.authentication.AuthenticationExceptionException;

    import java.rmi.RemoteException;

    public class PubSubClient {

        private AxisServer axisServer;
        private BrokerClient brokerClient;

        public void start() {
            try {
                System.setProperty("javax.net.ssl.trustStore", "../../repository/resources/security/wso2carbon.jks");
                System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
                this.axisServer = new AxisServer();
                this.axisServer.deployService(EventSinkService.class.getName());
                this.brokerClient = new BrokerClient("https://localhost:9443/services/EventBrokerService", "admin", "admin");
                // give time to start the simple http server
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
            } catch (AxisFault axisFault) {
                System.out.println("Can not start the server");
            } catch (AuthenticationExceptionException e) {
                e.printStackTrace();
            }
        }

        public String subscribe() {
            // Subscribe the EventSinkService endpoint to the "foo/bar" topic
            try {
                return this.brokerClient.subscribe("foo/bar", "http://localhost:6060/axis2/services/EventSinkService/receive");
            } catch (BrokerClientException e) {
                e.printStackTrace();
            }
            return null;
        }

        public void publish() {
            try {
                this.brokerClient.publish("foo/bar", getOMElementToSend());
            } catch (AxisFault axisFault) {
                axisFault.printStackTrace();
            }
        }

        public void unsubscribe(String subscriptionID) {
            try {
                this.brokerClient.unsubscribe(subscriptionID);
            } catch (RemoteException e) {
                e.printStackTrace();
            }
        }

        public void stop() {
            try {
                this.axisServer.stop();
            } catch (AxisFault axisFault) {
                axisFault.printStackTrace();
            }
        }

        public static void main(String[] args) {
            PubSubClient pubSubClient = new PubSubClient();
            pubSubClient.start();
            String subscriptionId = pubSubClient.subscribe();
            pubSubClient.publish();
            // wait for the published message to be delivered before cleaning up
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            }
            pubSubClient.unsubscribe(subscriptionId);
            pubSubClient.stop();
        }

        private OMElement getOMElementToSend() {
            OMFactory omFactory = OMAbstractFactory.getOMFactory();
            OMNamespace omNamespace = omFactory.createOMNamespace("http://ws.sample.org", "ns1");
            OMElement receiveElement = omFactory.createOMElement("receive", omNamespace);
            OMElement messageElement = omFactory.createOMElement("message", omNamespace);
            messageElement.setText("Test publish message");
            receiveElement.addChild(messageElement);
            return receiveElement;
        }
    }