WSO2 DAS has a distributed indexing engine which is built on top of Apache Lucene. Data is indexed and saved in the Data Access Layer of DAS in a store referred to as the Analytics File System. This section explains indexing used in WSO2 DAS.

A main requirement of DAS is support for arbitrary key-value pairs. Most RDBMSs have a fixed schema and do not support storing arbitrary key-value pairs. In DAS, records are therefore stored as blobs: the data fields of a record are serialized to a blob and stored along with the record ID and the timestamp of the record. Storing data as blobs has one drawback: database-level indexing cannot be used to search blobs by field. To overcome this, records are sent through the indexer before they are persisted in DAS, so that the indexer can keep a searchable index of the record's data fields along with the record ID. The indexer does not store the field values themselves; it stores only the record ID and an index of the data fields. When a search query is sent to DAS, the indexer looks up the matching records, gets their record IDs, and sends them to the persistence layer (the record store). At the record store level, the record blobs are deserialized and returned as record objects from the AnalyticsDataService implementation.
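The flow described above can be illustrated with a minimal Python sketch. It is not DAS code: `RecordStore`, `Indexer`, `insert` and `search` are hypothetical names, `pickle` stands in for whatever blob serialization DAS uses, and a plain dictionary stands in for Lucene. The sketch only shows the division of labour: the store holds opaque blobs, the index maps field values to record IDs, and a search goes through the index first.

```python
import pickle
import time
import uuid
from collections import defaultdict


class RecordStore:
    """Hypothetical stand-in for the persistence layer: id -> (timestamp, blob)."""

    def __init__(self):
        self._rows = {}

    def put(self, record_id, timestamp, values):
        # The field map becomes an opaque blob, so the store cannot query inside it.
        self._rows[record_id] = (timestamp, pickle.dumps(values))

    def get(self, record_ids):
        # Deserialize the blobs back into record dictionaries.
        records = []
        for record_id in record_ids:
            timestamp, blob = self._rows[record_id]
            records.append({"id": record_id, "timestamp": timestamp,
                            "values": pickle.loads(blob)})
        return records


class Indexer:
    """Hypothetical inverted index: (field, value) -> set of record ids."""

    def __init__(self):
        self._postings = defaultdict(set)

    def index(self, record_id, values):
        # Only record ids are kept per indexed term, not the full record.
        for field, value in values.items():
            self._postings[(field, value)].add(record_id)

    def search(self, field, value):
        return sorted(self._postings.get((field, value), set()))


def insert(store, indexer, values):
    """Index a record, then persist it as a blob; returns the new record id."""
    record_id = str(uuid.uuid4())
    indexer.index(record_id, values)
    store.put(record_id, time.time(), values)
    return record_id


def search(store, indexer, field, value):
    """Resolve matching ids through the index, then load the blobs."""
    return store.get(indexer.search(field, value))
```

For example, after inserting `{"host": "web2", "status": 500}`, calling `search(store, indexer, "status", 500)` returns that record by looking up its ID in the index and deserializing its blob from the store.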

Indexing Architecture

The DAS indexer is implemented using Apache Lucene, which is a full-text search library. Users can index records and later search them using Lucene queries. Events received by DAS are converted to a list of "Record" objects, which are then inserted into file-system-based queues. (These queues are created in {DAS_HOME}/repository/data/index_staging_queues.) A background thread consumes these queues and indexes the records. (Indexed data is stored in {DAS_HOME}/repository/data/index_data.) The DAS index consists of smaller indexes called shards. A shard can be accessed by only one index writer at a time, so having multiple shards can increase write throughput, although throughput can still be limited by disk I/O. By default, DAS is configured to have 6 shards and 1 replica (the number of replicas comes into play only if DAS is clustered).
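The relationship between queues, background consumers and shards can be sketched as follows. This is an illustration under stated assumptions, not the DAS implementation: the queues here are in-memory rather than file-system based, each shard is a plain list rather than a Lucene index, and the CRC-based routing in `shard_for` is a hypothetical rule, since the text does not say how DAS assigns a record to a shard. What the sketch does show is the one-writer-per-shard constraint and why several shards allow several writers to work in parallel.

```python
import queue
import threading
import zlib

SHARD_COUNT = 6  # default shard count mentioned in the text


def shard_for(record_id, shard_count=SHARD_COUNT):
    # Assumption: stable hash routing. The real DAS routing rule is not described here.
    return zlib.crc32(record_id.encode("utf-8")) % shard_count


def index_records(records, shard_count=SHARD_COUNT):
    """Route records to per-shard queues and drain each queue with one writer."""
    queues = [queue.Queue() for _ in range(shard_count)]
    shards = [[] for _ in range(shard_count)]

    def writer(shard_id):
        # Exactly one writer thread ever touches a given shard.
        while True:
            record = queues[shard_id].get()
            if record is None:  # sentinel: no more records for this shard
                return
            shards[shard_id].append(record)

    threads = [threading.Thread(target=writer, args=(i,))
               for i in range(shard_count)]
    for thread in threads:
        thread.start()
    for record in records:
        queues[shard_for(record["id"], shard_count)].put(record)
    for q in queues:
        q.put(None)
    for thread in threads:
        thread.join()
    return shards
```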

Indexing-related configuration is in analytics-config.xml in the {DAS_HOME}/repository/conf/analytics folder. Additionally, shard information is kept in a file called local-shard-allocation-config.conf in the same location. This file stores each shard number along with its state (INIT or NORMAL). INIT is the initial state. This state is usually not visible from outside, because the INIT state changes to NORMAL as soon as the server starts. If the indexing node is running, the state of its shards should be NORMAL, not INIT. The NORMAL state denotes that the indexing node has started indexing, so the node indexes incoming data whenever data is ready to be indexed.
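A quick way to check the shard states is to parse the file. The sketch below assumes the one-entry-per-line `shard, STATE` layout that appears later on this page (for example `0, NORMAL`); the function names are hypothetical and the parser is not part of DAS.

```python
def parse_shard_allocation(text):
    """Parse lines such as '0, NORMAL' into {shard_number: state}."""
    states = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        shard, state = (part.strip() for part in line.split(",", 1))
        states[int(shard)] = state
    return states


def shards_not_normal(states):
    """Return the shard numbers whose state is anything other than NORMAL."""
    return sorted(shard for shard, state in states.items() if state != "NORMAL")
```

On a running indexing node, `shards_not_normal(parse_shard_allocation(text))` would be expected to return an empty list.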

To re-index the whole dataset, follow the steps below.

  1. Stop the Analytics Server.
  2. Delete the content in <DAS_HOME>/repository/data.
  3. Change the NORMAL state to INIT by editing local-shard-allocation-config.conf.
  4. Start the Analytics Server.
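Step 3 can be scripted. The following is a minimal sketch, assuming the `shard, STATE` line format shown elsewhere on this page; `reset_shards_to_init` is a hypothetical helper, not a DAS tool, and it should only be run while the server is stopped. It keeps a `.bak` copy of the original file next to it.

```python
import shutil
from pathlib import Path


def reset_shards_to_init(conf_path):
    """Rewrite every 'N, <STATE>' line as 'N, INIT', keeping a .bak copy."""
    path = Path(conf_path)
    # Back up the current file before touching it.
    shutil.copy2(path, path.with_name(path.name + ".bak"))
    lines = []
    for line in path.read_text().splitlines():
        if line.strip():
            shard = line.split(",", 1)[0].strip()
            lines.append(f"{shard}, INIT")
    path.write_text("\n".join(lines) + "\n")
    return lines
```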

By default, DAS is configured to have 6 shards and 1 replica for each shard (12 shard copies in total, counting shards and replicas). In a Min-HA setup (a two-node cluster), each DAS server therefore holds 3 shards and the replicas of the other 3 shards. The allocation is communicated between the nodes through Hazelcast messages. If one server goes down, the second server still has the replicas of the 3 shards that were on the failed node, so HA is preserved. As another example, if there are 3 DAS nodes in the cluster, all 12 shard copies are split among the 3 nodes, so each node holds 4 copies (2 shards plus 2 replicas of the other 4 shards).
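The arithmetic above can be reproduced with a small sketch. The round-robin placement used here is an assumption chosen for illustration; the text does not describe the algorithm DAS actually uses to allocate shards, and `allocate` is a hypothetical function. It does reproduce the counts given above for two and three nodes.

```python
def allocate(shard_count, replication_factor, node_count):
    """Place each shard and its replicas on distinct nodes, round-robin."""
    if replication_factor + 1 > node_count:
        raise ValueError("not enough nodes to keep every copy on a separate node")
    nodes = {n: {"shards": [], "replicas": []} for n in range(node_count)}
    for shard in range(shard_count):
        for copy in range(replication_factor + 1):
            # Copy 0 is treated as the shard itself, later copies as replicas.
            node = (shard + copy) % node_count
            kind = "shards" if copy == 0 else "replicas"
            nodes[node][kind].append(shard)
    return nodes
```

With `allocate(6, 1, 2)` each node ends up with 3 shards and 3 replicas, and with `allocate(6, 1, 3)` each node ends up with 2 shards and 2 replicas, 4 copies in total per node.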

Debugging Indexing in a cluster

If you suspect an issue with indexing, first check local-shard-allocation-config.conf to see whether the shards are allocated properly. The number of shards and the number of replicas are specified in analytics-config.xml.

<!-- The number of index data replicas the system should keep, for H/A, this should be at least 1, e.g. the value 0 means
        there aren't any copies of the data -->
   <indexReplicationFactor>1</indexReplicationFactor>
   <!-- The number of index shards, should be equal or higher to the number of indexing nodes that is going to be working,
        ideal count being 'number of indexing nodes * [CPU cores used for indexing per node]' -->
   <shardCount>6</shardCount>
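To read these two values programmatically, something like the following sketch could be used. It relies only on the element names shown in the fragment above; the `<config>` root element in `SAMPLE` is a placeholder, because the fragment does not show the real root element of analytics-config.xml. Matching on the local tag name means the sketch should also tolerate a namespaced document.

```python
import xml.etree.ElementTree as ET

# Placeholder document: the <config> root is hypothetical, the two child
# elements are the ones shown in the fragment above.
SAMPLE = """
<config>
   <indexReplicationFactor>1</indexReplicationFactor>
   <shardCount>6</shardCount>
</config>
"""


def read_index_settings(xml_text):
    """Return the replication factor and shard count found in the XML text."""
    wanted = {"indexReplicationFactor", "shardCount"}
    found = {}
    for element in ET.fromstring(xml_text).iter():
        name = element.tag.rsplit("}", 1)[-1]  # drop any '{namespace}' prefix
        if name in wanted:
            found[name] = int(element.text.strip())
    return found
```

The shard count returned here should match the number of lines in the shard allocation file.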

The second thing to check is the logs that are printed when the cluster is initialized.

When DAS starts, it prints a log line similar to the one below.

TID: [-1] [] [2017-05-12 13:17:27,538]  INFO {org.wso2.carbon.analytics.dataservice.core.indexing.IndexNodeCoordinator} -  Indexing Initialized: CLUSTERED {0={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 1={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 2={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 3={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 4={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}, 5={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000}} | Current Node Indexing: Yes {org.wso2.carbon.analytics.dataservice.core.indexing.IndexNodeCoordinator}


Here, 0={c13d3a23-b15a-4b9c-ac8f-a30df2811c98=Member [10.3.24.70]:4000 this, bc751d36-d345-4b8f-b133-b77793f04805=Member [10.3.24.67]:4000} means that shard 0 is allocated to 2 nodes whose IDs are c13d3a23-b15a-4b9c-ac8f-a30df2811c98 and bc751d36-d345-4b8f-b133-b77793f04805. Their respective IP addresses are also given in the log line. For a correctly configured two-node DAS cluster, this log line should contain all 6 shards (0 to 5) and their respective node IDs. When you initially set up the cluster and start the first server, this log line does not contain both node IDs, because the other node has not yet joined the cluster. When the second node joins the cluster, a similar log line is printed with the complete map of shards and node IDs.

The two node IDs in the log line should match the IDs in the my-node-id.dat files of the two DAS nodes. In a two-node cluster, if there are more than 2 unique IDs, the shard allocation is incorrect and needs to be fixed. One cause of more than 2 IDs is reconfiguring the cluster with new DAS packs without clearing the data sources defined in analytics-datasources.xml. When a new DAS pack is started pointing to the same analytics data sources, a new ID is generated and the new DAS pack is treated as a third node that has joined the cluster. When reconfiguring the cluster with new DAS packs, first back up the older IDs and restore them in the new packs.

If the two-node cluster already contains more than 2 unique IDs (because a new DAS pack was started with a new ID), the additional IDs have to be removed. The steps are as follows.

  1. Keep a backup of the current my-node-id.dat of both nodes.
  2. Extract the node IDs from the log line described above. This may contain more than 2 IDs.
  3. Separate out the IDs that do not match the my-node-id.dat of either node.
  4. Shut down both nodes.
  5. Take one of the non-matching (additional) node IDs and put it in the my-node-id.dat of any DAS node, replacing the existing ID. (Make sure you have a backup of my-node-id.dat before doing this.)
  6. Start the DAS node from step 5 with the property -DdisableIndexing=true. Starting the node this way removes the node ID placed in my-node-id.dat from the indexing cluster.
  7. Repeat steps 4, 5, and 6 for all the additional node IDs.
  8. Finally, restore the two original node IDs (restore the my-node-id.dat files that you backed up).
  9. Clean the repository/data folder.
  10. Open local-shard-allocation-config.conf and replace its content with the following.
    0, INIT
    1, INIT
    2, INIT
    3, INIT
    4, INIT
    5, INIT

  11. Do the above on both nodes and start them. You will see that the content of local-shard-allocation-config.conf changes to:

    0, NORMAL
    1, NORMAL
    2, NORMAL
    3, NORMAL
    4, NORMAL
    5, NORMAL

    Note that the above configuration is valid for a DAS deployment configured with 6 shards and 1 replica. Adjust step 10 according to the number of shards and replicas you have.
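Steps 2 and 3 of the procedure above (extracting the node IDs from the log line and separating out the ones that do not belong to either node) can be sketched as below. The regular expressions are written against the sample log line shown earlier on this page and may need adjusting if your log format differs; `parse_shard_map` and `stale_node_ids` are hypothetical helper names.

```python
import re

# One 'N={...}' group per shard; the inner part holds no nested braces.
SHARD_RE = re.compile(r"\b(\d+)=\{([^{}]*)\}")
# '<uuid>=Member [<ip>]:<port>' entries inside a shard group.
NODE_RE = re.compile(
    r"([0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12})=Member \[([^\]]+)\]:(\d+)"
)


def parse_shard_map(log_line):
    """Return {shard_number: {node_id: 'ip:port'}} from the startup log line."""
    shard_map = {}
    for shard, body in SHARD_RE.findall(log_line):
        shard_map[int(shard)] = {
            node_id: f"{ip}:{port}" for node_id, ip, port in NODE_RE.findall(body)
        }
    return shard_map


def stale_node_ids(shard_map, known_ids):
    """Return node ids seen in the log that are not in the known id list."""
    seen = set()
    for nodes in shard_map.values():
        seen.update(nodes)
    return sorted(seen - set(known_ids))
```

Here `known_ids` would be the contents of the my-node-id.dat files of the two nodes. Any ID returned by `stale_node_ids` is a candidate for removal through steps 4 to 7.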

Indexing in Spark Environment

Spark runs in a different JVM, so it cannot write directly to the indexing queues. When Spark is instructed to index its summarized data, it sends that data to a storage area in the primary record store called the staging area. The staging index worker threads then fetch this data and insert it into the indexing queue. From that point onwards, the indexing process is the same. Indexing can become slow if Spark scripts that contain instructions to index summarized data are run frequently. In some situations the same data set is processed repeatedly within a short time frame (for example, Spark scripts scheduled to run every 2 minutes with indexing), and the same data is then indexed over and over again. The best approach is to use incremental processing, so that re-indexing of the same data is minimized.
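The idea behind incremental processing can be shown with a small sketch. It is not the DAS or Spark API: `select_new_records` is a hypothetical function that keeps a "last processed" timestamp between runs, so that each scheduled run hands only unseen records to the indexer instead of the whole data set.

```python
def select_new_records(records, last_processed_ts):
    """Return the records newer than the marker, plus the updated marker."""
    fresh = [r for r in records if r["timestamp"] > last_processed_ts]
    # If nothing new arrived, the marker stays where it was.
    new_marker = max((r["timestamp"] for r in fresh), default=last_processed_ts)
    return fresh, new_marker
```

A scheduled job would store the returned marker after each run and pass it back in on the next run, so a record that was already summarized and indexed is not sent to the staging area again.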




