
Event-driven Runtime Governance Platform

This page describes how to set up an event-driven runtime governance platform using two key WSO2 products: WSO2 Governance Registry (G-Reg) and WSO2 Complex Event Processor (CEP).

WSO2 Governance Registry is WSO2's registry and repository product, while CEP processes complex events at high throughput. The platform implements the following scenario:

  • An API metadata artifact is created in the WSO2 Governance Registry (for more information see Governance Artifacts).
  • The artifact has a lifecycle with Development, Testing and Production states.
  • If an API in the Production state is updated frequently, it is demoted to the Testing state, which indicates that it needs more testing. (In this case, the sample query demotes an API that is updated 3 times within a 1-minute window.)
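
The lifecycle in this scenario can be pictured as a small state machine. The following is a minimal plain-Java sketch for illustration only; `ApiLifecycle` and its methods are hypothetical names and are not part of the G-Reg API.

```java
/** Hypothetical model of the three-state lifecycle described above. */
class ApiLifecycle {
    // States in promotion order, as listed in the scenario.
    private static final String[] STATES = {"Development", "Testing", "Production"};

    // A newly attached lifecycle starts in Development.
    private int index = 0;

    String state() {
        return STATES[index];
    }

    /** Moves one state forward; stays put if already in Production. */
    String promote() {
        if (index < STATES.length - 1) {
            index++;
        }
        return state();
    }

    /** Moves one state back; stays put if already in Development. */
    String demote() {
        if (index > 0) {
            index--;
        }
        return state();
    }

    public static void main(String[] args) {
        ApiLifecycle api = new ApiLifecycle();
        api.promote();                    // Development -> Testing
        api.promote();                    // Testing -> Production
        System.out.println(api.demote()); // prints Testing
    }
}
```

In the real platform, the demotion is triggered by CEP rather than called directly, as the procedure below describes.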

Prerequisites:

Procedure:

  • Step 1: G-Reg publishes all registry events to CEP (over the Apache Thrift protocol).
  • Step 2: In CEP, a query waits for a pattern/condition. Based on the incoming events, it decides what to do.
  • Step 3: When CEP fires an event, it calls G-Reg remotely and demotes the API artifact from the Production state to the Testing state.

Step 1

  1. Navigate to GREG_HOME/samples/handler/src to find the source code of the Handler Sample.
  2. Add the following dependencies to your POM file:

    <dependency>
        <groupId>org.wso2.carbon</groupId>
        <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
        <version>4.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.wso2.carbon</groupId>
        <artifactId>org.wso2.carbon.databridge.commons</artifactId>
        <version>4.0.0</version>
    </dependency>
  3. Comment-out the following exclusions in your POM file: 

    <!--<exclusion>
        <groupId>org.eclipse.osgi</groupId>
        <artifactId>org.eclipse.osgi.services</artifactId>
    </exclusion>-->
    <!--<exclusion>
        <groupId>org.wso2.carbon</groupId>
        <artifactId>org.wso2.carbon.context</artifactId>
    </exclusion>-->
  4. Add the following plugin to your POM file: 

    <plugin>
        <groupId>org.apache.felix</groupId>
        <artifactId>maven-scr-plugin</artifactId>
    </plugin>
  5. Add a new Java Class named StatisticsCollectorServiceComponent at GREG_HOME/samples/handler/src/src/main/java/org/wso2/carbon/registry/samples/statistics/StatisticsCollectorServiceComponent.java with the following source: 

    package org.wso2.carbon.registry.samples.statistics;
       
    import org.osgi.framework.ServiceRegistration;
    import org.osgi.service.component.ComponentContext;
    import org.wso2.carbon.context.CarbonContext;
    import org.wso2.carbon.context.PrivilegedCarbonContext;
    import org.wso2.carbon.databridge.agent.thrift.Agent;
    import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
    import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
    import org.wso2.carbon.databridge.commons.Event;
    import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
    import org.wso2.carbon.registry.core.service.RegistryService;
    import org.wso2.carbon.registry.core.statistics.StatisticsCollector;
    import org.wso2.carbon.registry.core.utils.RegistryUtils;
    import org.wso2.carbon.registry.samples.statistics.util.ArtifactUtil;
    import org.wso2.carbon.utils.NetworkUtils;
       
    /**
     * @scr.component name="org.wso2.carbon.registry.samples.statistics" immediate="true"
     * @scr.reference name="registry.service" interface="org.wso2.carbon.registry.core.service.RegistryService"
     * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
     */
    public class StatisticsCollectorServiceComponent {
       
        public static final String REGISTRY_ACTIVITY_STREAM = "govstream1";
        public static final String VERSION = "1.0.0";
        private RegistryService registryService;
       
        private ServiceRegistration serviceRegistration;
       
        protected void activate(ComponentContext context) {
            serviceRegistration = context.getBundleContext().registerService(
                    StatisticsCollector.class.getName(), new StatisticsCollector() {
                public void collect(Object... objects) {
                    try {
                        // Create Data Publisher
                        RegistryUtils.setTrustStoreSystemProperties();
                        DataPublisher dataPublisher = new DataPublisher(
                                "tcp://" + NetworkUtils.getLocalHostname() + ":7612", "admin", "admin",
                                new Agent(new AgentConfiguration()));
       
                        // Find Data Stream
                        String streamId;
                        try {
                            streamId = dataPublisher.findStream(REGISTRY_ACTIVITY_STREAM, VERSION);
                        } catch (NoStreamDefinitionExistException ignored) {
                            streamId = dataPublisher.defineStream("{" +
                                    "  'name':'" + REGISTRY_ACTIVITY_STREAM + "'," +
                                    "  'version':'" + VERSION + "'," +
                                    "  'nickName': 'Registry_Activity'," +
                                    "  'description': 'Registry Activities'," +
                                    "  'metaData':[" +
                                    "          {'name':'clientType','type':'STRING'}" +
                                    "  ]," +
                                    "  'payloadData':[" +
                                    "          {'name':'operation','type':'STRING'}," +
                                    "          {'name':'user','type':'STRING'}," +
                                    "          {'name':'rpath','type':'STRING'}" +
                                    "  ]" +
                                    "}");
                        }
       
                        // objects[2] is read below, so at least three arguments are required
                        if (!streamId.isEmpty() && objects.length >= 3) {
                            String resource = objects[2] != null ? objects[2].toString() : "nil";
                            // Publish Event to Stream
                            Event event = new Event(
                                    streamId, System.currentTimeMillis(),
                                    new Object[]{"external"}, null,
                                    new Object[]{
                                            // operation
                                            ArtifactUtil.isUpdate(objects),
                                            // user: carries the lifecycle state of the artifact
                                            ArtifactUtil.getLCState(
                                                    registryService.getRegistry(
                                                            CarbonContext.getCurrentContext().getUsername(),
                                                            PrivilegedCarbonContext.getCurrentContext().getTenantId()),
                                                    ArtifactUtil.getArtifactAttribute(resource, "name")),
                                            // rpath
                                            ArtifactUtil.getAPIResourcePath(resource, objects)
                                    }
                            );
                            dataPublisher.publish(event);
                            System.out.println("Successfully Published Event");
                        }
                        // Release the publisher even when no event was published
                        dataPublisher.stop();
       
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, null);
        }
       
        protected void deactivate(ComponentContext context) {
            serviceRegistration.unregister();
        }
       
        protected void setRegistryService(RegistryService registryService) {
            this.registryService=registryService;
            // The Maven SCR Plugin Needs These
     
        }
       
        protected void unsetRegistryService(RegistryService registryService) {
            this.registryService=null;
            // The Maven SCR Plugin Needs These
        }
    }
  6. Compile the source code by running the following command inside GREG_HOME/samples/handler/src: mvn clean install. You get the built JAR inside the target folder.
  7. Copy the JAR to the GREG_HOME/repository/components/plugins folder.
  8. Start the WSO2 Governance Registry server.
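
The stream definition that the component passes to defineStream is a single-quoted JSON-style string assembled by concatenation. As a minimal sketch, the same string can be built by a small helper so that the stream name and version are supplied in one place. `StreamDefinitions` and its methods are hypothetical names introduced here for illustration; the `topic` helper assumes that the CEP input topic is the stream name and version joined by a slash, as in the "govstream1/1.0.0" topic used in Step 2.

```java
/** Hypothetical helper that reproduces the stream definition used in the sample component. */
class StreamDefinitions {

    /** Builds the definition string with one metadata field and three payload fields. */
    static String registryActivityStream(String name, String version) {
        return "{"
                + "'name':'" + name + "',"
                + "'version':'" + version + "',"
                + "'nickName':'Registry_Activity',"
                + "'description':'Registry Activities',"
                + "'metaData':[{'name':'clientType','type':'STRING'}],"
                + "'payloadData':["
                + "{'name':'operation','type':'STRING'},"
                + "{'name':'user','type':'STRING'},"
                + "{'name':'rpath','type':'STRING'}"
                + "]}";
    }

    /** Joins stream name and version into the topic form used by the bucket input. */
    static String topic(String name, String version) {
        return name + "/" + version;
    }

    public static void main(String[] args) {
        System.out.println(registryActivityStream("govstream1", "1.0.0"));
        System.out.println(topic("govstream1", "1.0.0")); // prints govstream1/1.0.0
    }
}
```

Note that the payload field named user actually carries the lifecycle state of the artifact, which is why the CEP query in Step 2 compares it with 'Production'.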

Step 2

Next, define a rule in the CEP server that listens for the pattern described in the scenario. In CEP, you do this by configuring a CEP bucket, which defines a query that listens to incoming events and fires an event when the query is satisfied.

To create and deploy the bucket configuration XML file:

  1. Create a file called GovernanceAnalizer.xml with the XML content shown after step 2.
  2. Copy the file into the CEP_HOME/repository/deployment/server/cepbuckets folder.

    <cep:bucket name="GovernanceAnalizer" xmlns:cep="http://wso2.org/carbon/cep">
      <cep:description>testsss</cep:description>

      <cep:engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <cep:property name="siddhi.persistence.snapshot.time.interval.minutes">0</cep:property>
        <cep:property name="siddhi.enable.distributed.processing">false</cep:property>
      </cep:engineProviderConfiguration>

      <cep:input brokerName="governance_broker" topic="govstream1/1.0.0">
        <cep:tupleMapping queryEventType="Tuple" stream="govstream1">
          <cep:property inputDataType="payloadData" inputName="operation" name="operation" type="java.lang.String"/>
          <cep:property inputDataType="payloadData" inputName="user" name="user" type="java.lang.String"/>
          <cep:property inputDataType="payloadData" inputName="rpath" name="rpath" type="java.lang.String"/>
        </cep:tupleMapping>
      </cep:input>

      <cep:query name="GovernanceAnalizeQuery">
        <cep:expression>from govstream1[operation=='update' and user=='Production']#window.timeBatch( 1 min )
                             insert into outStream2 count(operation) as countProp ,user,rpath having countProp==3;</cep:expression>
        <cep:output brokerName="governance_agent_broker" topic="users.org3/1.2.0">
          <cep:tupleMapping>
            <cep:metaData/>
            <cep:correlationData/>
            <cep:payloadData>
              <cep:property name="operation" type="java.lang.Long" valueOf="countProp"/>
              <cep:property name="user" type="java.lang.String" valueOf="user"/>
              <cep:property name="rpath" type="java.lang.String" valueOf="rpath"/>
            </cep:payloadData>
          </cep:tupleMapping>
        </cep:output>
      </cep:query>

    </cep:bucket>
  3. The "engineProviderConfiguration" element describes the CEP runtime engine details. This is not related to the flow, but includes some static configurations.
  4. Define a broker to listen to the events sent by WSO2 Governance Registry.

    <cep:input brokerName="governance_broker" topic="govstream1/1.0.0">
      <cep:tupleMapping queryEventType="Tuple" stream="govstream1">
        <cep:property inputDataType="payloadData" inputName="operation" name="operation" type="java.lang.String"/>
        <cep:property inputDataType="payloadData" inputName="user" name="user" type="java.lang.String"/>
        <cep:property inputDataType="payloadData" inputName="rpath" name="rpath" type="java.lang.String"/>
      </cep:tupleMapping>
    </cep:input>

    This configuration matches the event schema sent from the G-Reg side. G-Reg sends events in a stream called "govstream1" with the version "1.0.0", whose payload has three attributes named "operation", "user" and "rpath". Next, define the CEP query:

    from govstream1[operation=='update' and user=='Production']#window.timeBatch( 1 min )
                             insert into outStream2 count(operation) as countProp ,user,rpath having countProp==3; 

    This query listens for events whose operation attribute is 'update' and whose user attribute is 'Production' (the user attribute carries the lifecycle state of the artifact). It collects these events in 1-minute batches and fires an event when a batch contains 3 'update' operations.

  5. Finally, define the output stream so that you can do further processing in the callback method. The output uses a broker called "governance_agent_broker", a custom broker created specifically for this case, which remotely calls G-Reg and demotes the API artifact.

    <cep:output brokerName="governance_agent_broker" topic="users.org3/1.2.0">
      <cep:tupleMapping>
        <cep:metaData/>
        <cep:correlationData/>
        <cep:payloadData>
          <cep:property name="operation" type="java.lang.Long" valueOf="countProp"/>
          <cep:property name="user" type="java.lang.String" valueOf="user"/>
          <cep:property name="rpath" type="java.lang.String" valueOf="rpath"/>
        </cep:payloadData>
      </cep:tupleMapping>
    </cep:output>
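
The filter, window and threshold in the query from item 4 can be approximated in plain Java. This is only a rough sketch of the idea and not a reproduction of Siddhi's actual window semantics: it assumes that batches are aligned to fixed 60-second boundaries and, like the query, it counts all matching events together rather than per resource path. `UpdateBurstDetector` and `RegistryEvent` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical approximation of the sample query: filter, 1-minute batches, count == 3. */
class UpdateBurstDetector {
    static final long WINDOW_MILLIS = 60_000L; // mirrors timeBatch( 1 min )
    static final int THRESHOLD = 3;            // mirrors having countProp==3

    /** One registry event: a timestamp plus the three payload attributes. */
    static final class RegistryEvent {
        final long timestamp;
        final String operation;
        final String user; // carries the lifecycle state in this sample
        final String rpath;

        RegistryEvent(long timestamp, String operation, String user, String rpath) {
            this.timestamp = timestamp;
            this.operation = operation;
            this.user = user;
            this.rpath = rpath;
        }
    }

    /** Returns the start time of every batch holding exactly THRESHOLD matching events. */
    static List<Long> firingBatches(List<RegistryEvent> events) {
        Map<Long, Integer> countsByBatch = new TreeMap<>();
        for (RegistryEvent e : events) {
            boolean matches = "update".equals(e.operation) && "Production".equals(e.user);
            if (matches) {
                long batchStart = (e.timestamp / WINDOW_MILLIS) * WINDOW_MILLIS;
                countsByBatch.merge(batchStart, 1, Integer::sum);
            }
        }
        List<Long> firing = new ArrayList<>();
        for (Map.Entry<Long, Integer> entry : countsByBatch.entrySet()) {
            if (entry.getValue() == THRESHOLD) {
                firing.add(entry.getKey());
            }
        }
        return firing;
    }

    public static void main(String[] args) {
        List<RegistryEvent> events = new ArrayList<>();
        events.add(new RegistryEvent(1_000L, "update", "Production", "/apis/sample"));
        events.add(new RegistryEvent(5_000L, "update", "Production", "/apis/sample"));
        events.add(new RegistryEvent(9_000L, "update", "Production", "/apis/sample"));
        events.add(new RegistryEvent(70_000L, "update", "Testing", "/apis/sample"));
        System.out.println(firingBatches(events)); // prints [0]
    }
}
```

Because the condition is an equality check, a batch with four or more matching updates does not satisfy it in this sketch.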

Step 3

  1. This step calls G-Reg remotely and demotes the API artifact from the Production state to Testing. Define a custom event broker named "governance_agent_broker" as explained in the description.
  2. See here on how to create a custom event broker. Write a custom broker class and a corresponding factory class and deploy them. Use the following Custom Broker class and Factory class for this case:

    // GovernanceBroker.java
    import java.io.File;
    import java.net.MalformedURLException;
    import java.net.URL;

    import org.apache.axis2.engine.AxisConfiguration;
    import org.wso2.carbon.databridge.commons.Event;
    import org.wso2.carbon.registry.app.RemoteRegistry;
    import org.wso2.carbon.registry.core.exceptions.RegistryException;
    // Also import the CEP broker API types used below: BrokerType, BrokerTypeDto,
    // BrokerListener, BrokerConfiguration and BrokerEventProcessingException.

    public class GovernanceBroker implements BrokerType {

        public BrokerTypeDto getBrokerTypeDto() {
            BrokerTypeDto brokerTypeDto = new BrokerTypeDto();
            brokerTypeDto.setName("GovernanceAgent");
            return brokerTypeDto;
        }

        // This broker only publishes, so subscribing is a no-op.
        public String subscribe(String s, BrokerListener brokerListener, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration) throws BrokerEventProcessingException {
            return null;
        }

        public void publish(String s, Object o, BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
            // The payload follows the output mapping of the bucket: [countProp, user, rpath]
            Event event = (Event) ((Object[]) o)[0];
            Object[] objects = event.getPayloadData();
            String operation = objects[0].toString();
            String user = objects[1].toString();
            String path = objects[2].toString();
            System.out.println("+++ count=" + operation + ", state=" + user + ", path=" + path);

            String CARBON_HOME = "PATH TO CEP HOME";
            String username = "admin";
            String password = "admin";
            String serverURL = "https://localhost:9443/registry";

            // Trust store settings for the HTTPS call to G-Reg
            System.setProperty("javax.net.ssl.trustStore", CARBON_HOME + File.separator + "repository" +
                    File.separator + "resources" + File.separator + "security" + File.separator +
                    "wso2carbon.jks");
            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
            System.setProperty("javax.net.ssl.trustStoreType", "JKS");
            System.setProperty("carbon.repo.write.mode", "true");

            try {
                RemoteRegistry registry = new RemoteRegistry(new URL(serverURL), username, password);
                // Invoke the "Demote" action of the lifecycle attached to the API
                registry.invokeAspect(path, "SampleLifeCycle", "Demote");
                System.out.println("Successfully Demoted the API at " + path);
            } catch (RegistryException e) {
                System.out.println("ERR" + e.getMessage());
            } catch (MalformedURLException e) {
                System.out.println("ERR" + e.getMessage());
            }
        }

        // Nothing to test: the registry connection is created per publish call.
        public void testConnection(BrokerConfiguration brokerConfiguration) throws BrokerEventProcessingException {
        }

        // Nothing to release: subscribe() never registers a listener.
        public void unsubscribe(String s, BrokerConfiguration brokerConfiguration, AxisConfiguration axisConfiguration, String s1) throws BrokerEventProcessingException {
        }
    }

    // GovernanceBrokerFactoryImpl.java (also import BrokerType and BrokerTypeFactory)
    public class GovernanceBrokerFactoryImpl implements BrokerTypeFactory {

        public BrokerType getBrokerType() {
            return new GovernanceBroker();
        }
    }
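
The publish method above reads three payload entries without checking them, so a malformed event would cause a runtime exception. One possible hardening, shown here as a minimal plain-Java sketch, is to validate the payload before contacting the registry. `DemotionRequest` is a hypothetical class, and the expected order [countProp, user, rpath] is taken from the output mapping in Step 2.

```java
/** Hypothetical value object holding the validated payload of a CEP output event. */
class DemotionRequest {
    final long updateCount;
    final String lifecycleState;
    final String resourcePath;

    private DemotionRequest(long updateCount, String lifecycleState, String resourcePath) {
        this.updateCount = updateCount;
        this.lifecycleState = lifecycleState;
        this.resourcePath = resourcePath;
    }

    /**
     * Expects the payload order [countProp, user, rpath].
     * Returns null when the payload cannot be used to demote an artifact.
     */
    static DemotionRequest fromPayload(Object[] payload) {
        if (payload == null || payload.length < 3) {
            return null;
        }
        if (!(payload[0] instanceof Number) || payload[1] == null || payload[2] == null) {
            return null;
        }
        String path = payload[2].toString();
        if (path.isEmpty()) {
            return null;
        }
        return new DemotionRequest(((Number) payload[0]).longValue(), payload[1].toString(), path);
    }

    public static void main(String[] args) {
        DemotionRequest request = fromPayload(new Object[]{3L, "Production", "/apis/sample"});
        System.out.println(request.resourcePath); // prints /apis/sample
    }
}
```

With a helper like this, publish could return early when fromPayload yields null and call invokeAspect only with a non-empty path.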

Completing Step 3 completes the setup. Because G-Reg and CEP run on the same host in this example, open CEP_HOME/repository/conf/carbon.xml, set the port Offset value to 1, and start the CEP server.
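
The value 1 is consistent with the ports used in the code: the data publisher in Step 1 connects to port 7612, which is one above the usual default Thrift receiver port of 7611, while G-Reg keeps the management console on 9443. The arithmetic can be sketched as follows, assuming a Carbon port offset is simply added to each default port. `CarbonPorts` is a hypothetical helper, and the default port values are assumptions that may differ in your installation.

```java
/** Hypothetical helper showing how a port offset shifts the default ports. */
class CarbonPorts {
    static final int DEFAULT_HTTPS_PORT = 9443;  // management console (assumed default)
    static final int DEFAULT_THRIFT_PORT = 7611; // Thrift event receiver (assumed default)

    /** Adds the configured offset to a default port. */
    static int withOffset(int defaultPort, int offset) {
        if (offset < 0) {
            throw new IllegalArgumentException("offset must not be negative");
        }
        return defaultPort + offset;
    }

    public static void main(String[] args) {
        int cepOffset = 1;
        System.out.println(withOffset(DEFAULT_THRIFT_PORT, cepOffset)); // prints 7612
        System.out.println(withOffset(DEFAULT_HTTPS_PORT, cepOffset));  // prints 9444
    }
}
```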

To test the procedure:

  1. Log in to the G-Reg management console at https://localhost:9443/carbon/ as an admin.
  2. Go to Home > Extensions > Configure > Lifecycles. Click Add new Lifecycle to see a sample configuration XML file.
  3. Click Save without changing the configuration. You have now created a lifecycle called "SampleLifeCycle".
  4. Go to Home > Metadata > Add > API and create an API by filling out the required fields.
  5. Attach the previously created "SampleLifeCycle" to the API that you saved. See here for more details on how to achieve this.
  6. Once a Lifecycle is attached to the API, you can see that it is in Development state.
  7. Click Promote to move it to Testing and do the same to push it to Production.
  8. Once the API is in the Production state, edit the API artifact three times in quick succession. It is then automatically demoted back to the Testing state.


If you are interested in extending this to a wider integrated platform, a better deployment adds a Business Activity Monitor to monitor and analyze all the business events.
