
A main requirement of DAS is to support arbitrary key-value pairs. Most RDBMSs have a fixed schema and do not support storing arbitrary key-value pairs. Therefore, in DAS, records are stored as blobs: the data fields in a record are serialized into a blob and stored together with the record ID and the timestamp of the record. Storing data as blobs has one drawback: database-level indexing cannot be used to search blobs by field. To overcome this, records are sent through the indexer before they are persisted in DAS, so that the indexer can keep a searchable copy of the record data along with the record ID. The indexer does not store the field values; it only stores the record ID and keeps an index of the data fields. When a search query is sent to DAS, the indexer identifies the matching records, gets their record IDs, and sends them to the persistence layer/record store. At the record store level, the record blobs are deserialized and returned as record objects from the AnalyticsDataService implementation.
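
The following is a minimal, self-contained Java sketch of this two-step lookup. The two in-memory maps stand in for the Lucene-based index and the record store respectively; all class and member names here are illustrative assumptions, not actual DAS APIs.

Code Block
languagejava
// Illustrative sketch only: the two in-memory maps below stand in for the
// Lucene index and the record store; none of these names are real DAS APIs.
import java.io.*;
import java.util.*;

public class TwoPhaseLookupSketch {
    // "record store": record ID -> serialized field map (the blob)
    static final Map<String, byte[]> recordStore = new HashMap<>();
    // "index": field value -> record IDs (field values are never returned from here)
    static final Map<String, Set<String>> index = new HashMap<>();

    static void put(String id, Map<String, String> fields) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new HashMap<>(fields)); // arbitrary key-value pairs -> blob
        }
        recordStore.put(id, bos.toByteArray());
        for (String value : fields.values()) {      // the index keeps only the record ID
            index.computeIfAbsent(value, v -> new HashSet<>()).add(id);
        }
    }

    @SuppressWarnings("unchecked")
    static List<Map<String, String>> search(String value) throws Exception {
        List<Map<String, String>> results = new ArrayList<>();
        for (String id : index.getOrDefault(value, Set.of())) {  // phase 1: IDs from the index
            byte[] blob = recordStore.get(id);                   // phase 2: blob by ID
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(blob))) {
                results.add((Map<String, String>) ois.readObject()); // deserialize to a record
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        put("rec-1", Map.of("level", "ERROR", "component", "indexer"));
        System.out.println(search("ERROR")); // -> [{component=indexer, level=ERROR}]
    }
}

Note that searching yields only record IDs; the field values always come from deserializing the blob, which is what allows DAS to keep arbitrary key-value pairs without a fixed schema.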

Indexing Architecture

The DAS indexer is implemented using Apache Lucene, a full-text search library. Users can index records and later search for them via Lucene queries. Events received by DAS are converted to a list of records and inserted into file-system-based queues, which are created in the <DAS_HOME>/repository/data/index_staging_queues directory. A background thread consumes these queues and indexes the records. The indexed data is stored in the <DAS_HOME>/repository/data/index_data directory. The DAS index consists of smaller indexes known as shards. A shard can be accessed by only one index writer at a given time. (The index writer is the Lucene class, visible to the outside world, that is used to write Lucene documents to a file-system-based index.) Therefore, having multiple shards can increase the write throughput (although the write throughput can still be limited by disk I/O operations). By default, DAS is configured to have six shards and one replica.
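
As a rough illustration of why multiple shards help, the following sketch opens one Lucene IndexWriter per shard directory and routes each record to a shard by hashing its ID. It is written against the Lucene 8.x API; the routing scheme and all names are assumptions made for the example, not the actual DAS implementation.

Code Block
languagejava
// Illustrative sketch only, written against the Lucene 8.x API: one IndexWriter
// per shard directory, with records routed to shards by hashing the record ID.
// The routing scheme and names are assumptions, not the actual DAS code.
import java.nio.file.Paths;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;

public class ShardedIndexSketch {
    private final IndexWriter[] shards;

    public ShardedIndexSketch(String baseDir, int shardCount) throws Exception {
        shards = new IndexWriter[shardCount];
        for (int i = 0; i < shardCount; i++) {
            // each shard is a separate file-system index owned by one writer
            FSDirectory dir = FSDirectory.open(Paths.get(baseDir, "shard" + i));
            shards[i] = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
        }
    }

    public void index(String recordId, String payload) throws Exception {
        int shard = Math.floorMod(recordId.hashCode(), shards.length); // route by ID
        Document doc = new Document();
        doc.add(new StringField("id", recordId, Field.Store.YES)); // only the ID is stored
        doc.add(new TextField("data", payload, Field.Store.NO));   // indexed, not stored
        shards[shard].addDocument(doc);
    }

    public void close() throws Exception {
        for (IndexWriter writer : shards) writer.close(); // commits each shard
    }
}

Because each writer owns its shard exclusively, writes to different shards do not block one another, which is what lets a higher shard count raise write throughput until disk I/O becomes the bottleneck.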

Info

In a high availability deployment, at least one replica must be kept.



Indexing-related configurations are done in the <DAS_HOME>/repository/conf/analytics/analytics-config.xml file. Additionally, information about the shards is maintained in the <DAS_HOME>/repository/conf/analytics/local-shard-allocation-config.conf file. This file stores each shard number along with its state, which can be either INIT or NORMAL. INIT is the initial state, and it usually cannot be seen from outside because it changes to NORMAL once the server starts. If the indexing node is running, the state of the shards should be NORMAL, not INIT. The NORMAL state denotes that the indexing node has started indexing: whenever data is ready to be indexed, the indexer node indexes the incoming data.
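
For example, on a running node with the default six shards, the local-shard-allocation-config.conf file contains one entry per shard in the following form:

Code Block
0,NORMAL
1,NORMAL
2,NORMAL
3,NORMAL
4,NORMAL
5,NORMAL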


By default, a single DAS server is configured to have six shards and one replica for each shard (i.e., a total of twelve shard copies). Therefore, in a minimum HA setup, each DAS server contains three shards and replicas of the other three shards. Even if one server is shut down, the second server has replicas of the three shards that were on the node that was shut down. This allows high availability to be maintained.

The number of replicas and the number of shards are defined in the analytics-config.xml file. You can change the number of shards and replicas via these properties, but once they are defined and the server is started, they cannot be changed. Events do not need to be published to both nodes: replication happens through Hazelcast messages, with records sent to the other nodes via Hazelcast.
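
The following is a minimal sketch of this style of replication using a Hazelcast topic (Hazelcast 3.x API): one member publishes a record, and the subscribed members receive it and can index it locally. The topic name and the Record class are illustrative assumptions, not the actual DAS implementation.

Code Block
languagejava
// Illustrative sketch only (Hazelcast 3.x API): one member publishes a record
// to a topic and subscribed members receive it for local indexing. The topic
// name and the Record class are assumptions, not the actual DAS implementation.
import java.io.Serializable;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;

public class IndexReplicationSketch {
    static class Record implements Serializable {
        final String id;
        final String payload;
        Record(String id, String payload) { this.id = id; this.payload = payload; }
    }

    public static void main(String[] args) throws InterruptedException {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        ITopic<Record> topic = hz.getTopic("index-replication");

        // every member subscribes; a real implementation would index only the
        // records belonging to the shards it replicates
        topic.addMessageListener(msg ->
                System.out.println("replicating record " + msg.getMessageObject().id));

        // publishing pushes the record to all subscribed cluster members
        topic.publish(new Record("rec-1", "{\"level\":\"ERROR\"}"));

        Thread.sleep(1000); // give the asynchronous listener a moment to run
        hz.shutdown();
    }
}

This is why events only need to be published to one node: the cluster messaging layer forwards the records that the replica-holding nodes need.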

The following example further explains how shards are managed in a clustered deployment.

If there are three DAS nodes in the cluster, all twelve shard copies (i.e., the shards and the replicas) are split among the three nodes. Each node will have four of them, consisting of two shards and two replicas.
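
For instance, one possible allocation (the actual assignment is made by the cluster at startup and may differ) is:

Code Block
Node 1: shards 0, 1 + replicas of shards 2, 3
Node 2: shards 2, 3 + replicas of shards 4, 5
Node 3: shards 4, 5 + replicas of shards 0, 1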

Re-indexing existing data

To re-index the whole dataset, follow the steps below:

  1. Shut down the WSO2 DAS server.
  2. Remove all the index data stored in the <DAS_HOME>/repository/data directory.
  3. In the <DAS_HOME>/repository/conf/analytics/local-shard-allocation-config.conf file, change the mode for all the shards from NORMAL to INIT (see the example after these steps).
  4. Restart the WSO2 DAS server.
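
For example, with the default six shards, the file content after step 3 should look as follows:

Code Block
0,INIT
1,INIT
2,INIT
3,INIT
4,INIT
5,INIT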


Debugging Indexing in a cluster

If there seems to be an issue with indexing, do the following in the given order. 

  1. In the <DAS_HOME>/repository/conf/analytics/analytics-config.xml file, check whether the shards are properly allocated.

    Code Block
    languagexml
    <!-- The number of index data replicas the system should keep, for H/A, this should be at least 1, e.g. the value 0 means
         there aren't any copies of the data -->
    <indexReplicationFactor>1</indexReplicationFactor>
    <!-- The number of index shards, should be equal or higher to the number of indexing nodes that is going to be working,
         ideal count being 'number of indexing nodes * [CPU cores used for indexing per node]' -->
    <shardCount>6</shardCount>
  2. Check the logs that are printed when the cluster is initialized. The log must be similar to the example shown below.

    Code Block
    languagepowershell
    TID: [-1] [] [2017-05-12 13:17:27,538]  INFO {org.wso2.carbon.analytics.dataservice.core.indexing.IndexNodeCoordinator} -  Indexing Initialized: CLUSTERED {0={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 1={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 2={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 3={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 4={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 5={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}} | Current Node Indexing: Yes {org.wso2.carbon.analytics.dataservice.core.indexing.IndexNodeCoordinator}

    Here, {0={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}} means that shard 0 is allocated to two nodes, whose IDs are c13d3a23-b15a-4b9c-ac8f-a30df2811c98 and bc751d36-d345-4b8f-b133-b77793f04805. The IPs of the two nodes are also included in the log line. For a correctly configured two-node DAS cluster, this log line must contain all six shards (from 0 to 5) and the node IDs to which they are allocated.

    Info

    This log line does not contain both the node IDs when you initially set up the cluster and start the first server, because the other node has not yet joined the cluster. The log line is printed with the complete mapping of shards to node IDs only after the second node joins the cluster.

    The two node IDs mentioned in the log line must match the IDs in the my-node-id.dat files of both DAS nodes. If there are more than two unique IDs in a two-node cluster, the shard allocation may have been affected and it may need to be corrected.

  3. One reason to have more than one node ID is to allow the cluster to be reconfigured with new DAS packs without clearing the data sources configured in the analytics-datasources.xml file. When a new DAS pack that points to the same analytics data sources is started, a new ID is generated and the new DAS pack is considered the third node to join the cluster. When you re-configure the cluster with new DAS packs, make sure the older IDs are first backed up and restored in the new packs. For more information, see Backing up and Restoring Analytics Data. If the two-node cluster already has more than two unique IDs (as a result of starting a new DAS pack with a new ID), remove the unnecessary/additional IDs by following the steps given below.

    a. Keep a backup of the current my-node-id.dat files of both nodes.
    b. Extract the node IDs that appear in the log line mentioned above. The list may contain more than two IDs.
    c. Separate out the IDs that do not match the my-node-id.dat files of the two nodes.
    d. Shut down both the nodes.
    e. Replace the my-node-id.dat of either DAS node with one of the non-matching/unnecessary/additional node IDs.

       Info

       Make sure that you keep a backup of the my-node-id.dat file before doing this.

    f. Start the DAS node mentioned in step e with the -DdisableIndexing=true property (see the startup example after these steps). This property removes the node ID that you entered in the my-node-id.dat from the indexing cluster.
    g. Repeat steps d, e, and f for all the additional node IDs.
    h. Restore the two node IDs of the cluster (restore the my-node-id.dat files that you backed up in step a).
    i. Clear the information in the <DAS_HOME>/repository/data directory.
    j. Open the <DAS_HOME>/repository/conf/analytics/local-shard-allocation-config.conf file and update the content as follows:

       Info

       This must be done for both the nodes.

       Code Block
       0,INIT
       1,INIT
       2,INIT
       3,INIT
       4,INIT
       5,INIT

    k. Start both the nodes, and check the content of the <DAS_HOME>/repository/conf/analytics/local-shard-allocation-config.conf file. It should have changed as follows:

       Code Block
       0,NORMAL
       1,NORMAL
       2,NORMAL
       3,NORMAL
       4,NORMAL
       5,NORMAL

       Info

       The nodes in this example are configured with six shards and one replica for each shard. Steps j and k must be adjusted according to the number of shards and replicas you have configured.
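
    For example, on Linux, step f can be carried out as follows, assuming the standard WSO2 Carbon startup script (verify the script name and path for your installation):

    Code Block
    sh <DAS_HOME>/bin/wso2server.sh -DdisableIndexing=true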

Indexing in Spark Environment

Spark runs in a different JVM. Therefore, it cannot write directly to the indexing queues. When you instruct Spark to index its summarized data, Spark sends the summarized data to a storage area called the staging area in the primary record store. From there, the data is fetched and inserted into the indexing queue by the staging index worker threads. From this point onwards, the indexing process is the same as described in the previous sections.
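
The following is a minimal Java sketch of that hand-off, with in-memory queues standing in for the staging area and the indexing queue; the class and member names are illustrative assumptions, not the actual DAS implementation.

Code Block
languagejava
// Illustrative sketch only: in-memory queues stand in for the staging area in
// the primary record store and the file-system indexing queue; the names are
// assumptions, not the actual DAS classes.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class StagingIndexWorkerSketch {
    // stands in for the staging area in the primary record store
    private final BlockingQueue<String> stagingArea = new LinkedBlockingQueue<>();
    // stands in for the indexing queue consumed by the indexer
    private final BlockingQueue<String> indexingQueue = new LinkedBlockingQueue<>();

    public void stage(String summarizedRecord) {
        // what a Spark "index this summarized data" instruction effectively does
        stagingArea.add(summarizedRecord);
    }

    public void startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // fetch staged data and hand it to the normal indexing path
                    indexingQueue.put(stagingArea.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "staging-index-worker");
        worker.setDaemon(true);
        worker.start();
    }

    public static void main(String[] args) throws InterruptedException {
        StagingIndexWorkerSketch sketch = new StagingIndexWorkerSketch();
        sketch.startWorker();
        sketch.stage("summary-row-1");
        System.out.println(sketch.indexingQueue.take()); // reached the indexing path
    }
}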

Info

Indexing can be slow if you frequently run Spark scripts with instructions to index summarized data. Sometimes, there can be situations where Spark is instructed to repeatedly process the same dataset within a small time frame (e.g., Spark scripts scheduled to run every two minutes with indexing). In these situations, the same data is indexed over and over again. To reduce the system overhead by minimizing unnecessary reindexing, it is recommended to use incremental processing.