This section covers the configurations required to use Apache Spark with WSO2 DAS.
Adding jar files to the Spark classpath
When the Spark driver application is started in the DAS server, Spark executors are created within the same node. The following jars are included in the classpath of these executors by default.
apache-zookeeper
axiom
axis2
axis2-json
cassandra-thrift
chill
com.datastax.driver.core
com.fasterxml.jackson.core.jackson-annotations
com.fasterxml.jackson.core.jackson-core
com.fasterxml.jackson.core.jackson-databind
com.fasterxml.jackson.module.jackson.module.scala
com.google.gson
com.google.guava
com.google.protobuf
com.jayway.jsonpath.json-path
com.ning.compress-lzf
com.sun.jersey.jersey-core
com.sun.jersey.jersey-server
commons-codec
commons-collections
commons-configuration
commons-httpclient
commons-io
commons-lang
config
h2-database-engine
hadoop-client
hazelcast
hbase-client
hector-core
htrace-core
htrace-core-apache
httpclient
httpcore
io.dropwizard.metrics.core
io.dropwizard.metrics.graphite
io.dropwizard.metrics.json
io.dropwizard.metrics.jvm
javax.cache.wso2
javax.servlet.jsp-api
jaxb
jdbc-pool
jdom
jettison
json
json-simple
json4s-jackson
kryo
libthrift
lucene
mesos
minlog
net.minidev.json-smart
netty-all
objenesis
org.apache.commons.lang3
org.apache.commons.math3
org.jboss.netty
org.roaringbitmap.RoaringBitmap
org.scala-lang.scala-library
org.scala-lang.scala-reflect
org.spark-project.protobuf.java
org.spark.project.akka.actor
org.spark.project.akka.remote
org.spark.project.akka.slf4j
org.wso2.carbon.analytics.api
org.wso2.carbon.analytics.dataservice.commons
org.wso2.carbon.analytics.dataservice.core
org.wso2.carbon.analytics.datasource.cassandra
org.wso2.carbon.analytics.datasource.commons
org.wso2.carbon.analytics.datasource.core
org.wso2.carbon.analytics.datasource.hbase
org.wso2.carbon.analytics.datasource.rdbms
org.wso2.carbon.analytics.eventsink
org.wso2.carbon.analytics.eventtable
org.wso2.carbon.analytics.io.commons
org.wso2.carbon.analytics.spark.core
org.wso2.carbon.analytics.stream.persistence
org.wso2.carbon.base
org.wso2.carbon.cluster.mgt.core
org.wso2.carbon.core
org.wso2.carbon.core.common
org.wso2.carbon.core.services
org.wso2.carbon.databridge.agent
org.wso2.carbon.databridge.commons
In addition, any jars available in the <DAS_HOME>/repository/conf/lib directory are also appended to the classpath.
If you want to add additional jars, add their paths to the SPARK_CLASSPATH in the <DAS_HOME>/bin/external-spark-classpath.conf file in a UNIX environment. Enter each path on a separate line.
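For example, the external-spark-classpath.conf file could look as follows. The jar paths below are placeholders; replace them with the absolute paths of the jars you actually want to add to the executor classpath.

/opt/custom/libs/my-analytics-udf.jar
/opt/custom/libs/third-party-client.jar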
When WSO2 DAS connects to an external Spark cluster, the DAS distribution is copied to each node in the Spark cluster. This allows each Spark node to access the jars it needs to work with WSO2 DAS.
Carbon related configurations
Following are the Carbon related configurations that are used for Apache Spark. These configurations are shipped with the product by default in the <DAS_HOME>/repository/conf/analytics/spark/spark-defaults.conf file.
Property | Default Value | Description |
---|---|---|
carbon.spark.master | local | The Spark master can run in one of three modes: local mode (the default), a Spark cluster that DAS creates itself, or a connection to an external Spark cluster. |
carbon.spark.master.count | 1 | The maximum number of masters allowed at a given time when DAS creates its own Spark cluster. This property is applicable only when the Spark master runs in cluster mode (i.e., when DAS creates its own Spark cluster). |
carbon.das.symbolic.link | This links to your DAS home by default. | The symbolic link for the jar files in the Spark classpath. In a clustered DAS deployment, the directory path for the Spark classpath is different for each node depending on the location of the DAS home. The symbolic link is not specified by default; when it is not specified, the jar files are added from the DAS home. |
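As a minimal sketch, the Carbon related properties above appear in the spark-defaults.conf file as plain key-value pairs. The symbolic link path shown in the commented-out line is only an illustrative placeholder.

carbon.spark.master local
carbon.spark.master.count 1
# carbon.das.symbolic.link /opt/wso2das/das-symlink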
Default Spark related configurations
Following are the Apache Spark related configurations that are used in WSO2 DAS. These configurations are shipped with the product by default in the <DAS_HOME>/repository/conf/analytics/spark/spark-defaults.conf file.
For more information on the Spark configuration properties below, see the Apache Spark documentation.
Application configurations
Property | Default Value |
---|---|
spark.app.name | CarbonAnalytics |
spark.driver.cores | 1 |
spark.driver.memory | 512m |
spark.executor.memory | 512m |
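In the spark-defaults.conf file, these application-level properties are specified as space-separated key-value pairs, for example using the default values listed above:

spark.app.name CarbonAnalytics
spark.driver.cores 1
spark.driver.memory 512m
spark.executor.memory 512m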
Spark UI configurations
Property | Default Value |
---|---|
| CarbonAnalytics |
spark.history.ui.port | 18080 |
Compression and serialization configurations
Property | Default Value |
---|---|
spark.serializer | org.apache.spark.serializer.KryoSerializer |
spark.kryoserializer.buffer | 256k |
spark.kryoserializer.buffer.max | 256m |
Networking configurations
Property | Default Value |
---|---|
spark.blockManager.port | 12000 |
spark.broadcast.port | 12500 |
spark.driver.port | 13000 |
spark.executor.port | 13500 |
spark.fileserver.port | 14000 |
spark.replClassServer.port | 14500 |
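In a multi-node deployment, these ports need to be reachable between the DAS nodes. If one of the default ports conflicts with another service on the host, you can override it in the spark-defaults.conf file; the value below is only illustrative.

spark.driver.port 13005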
Scheduling configurations
Property | Default Value |
---|---|
spark.scheduler.mode | FAIR |
In addition to having FAIR as the value for the spark.scheduler.mode property in the spark-defaults.conf file, you must also define a pool with the schedulingMode parameter set to FAIR in the <DAS_HOME>/repository/conf/analytics/spark/fairscheduler.xml file, as shown in the configuration below. This is required because Apache Spark by default uses a scheduler pool that runs tasks in first-in-first-out (FIFO) order.
<?xml version="1.0"?>
<allocations>
  <pool name="carbon-pool">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1000</weight>
    <minShare>1</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>
Standalone cluster configurations
Property | Default Value |
---|---|
spark.deploy.recoveryMode | CUSTOM |
spark.deploy.recoveryMode.factory | org.wso2.carbon.analytics.spark.core.deploy.AnalyticsRecoveryModeFactory |
Master configurations
Property | Default Value |
---|---|
spark.master.port | 7077 |
spark.master.rest.port | 6066 |
spark.master.webui.port | 8081 |
Worker configurations
Property | Default Value |
---|---|
spark.worker.cores | 1 |
spark.worker.memory | 1g |
spark.worker.dir | work |
spark.worker.port | 11000 |
spark.worker.webui.port | 11500 |
Executor configurations
It is recommended to run only one executor per DAS worker. If you observe memory issues or long Spark executor times for this executor, you can increase the amount of memory and the number of CPU cores allocated to it.
Property | Default Value | Description |
---|---|---|
spark.executor.cores | 1 | The number of cores allocated to the Spark executors that run in the DAS node. All the available CPU cores of the worker are allocated to the executor(s) by default. |
spark.executor.memory | 1g | The amount of memory allocated to the Spark executor(s). |
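For example, if the single executor runs into memory pressure or long execution times, you could raise these limits in the spark-defaults.conf file. The values below are only illustrative; tune them to the resources available on your node.

spark.executor.cores 2
spark.executor.memory 2g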
Optional Spark related configurations
The following configurations can be added to the <DAS_HOME>/repository/conf/analytics/spark/spark-defaults.conf file if you want to limit the space allocated for the log files that are generated and saved in the <DAS_HOME>/work directory.
spark.executor.logs.rolling.strategy size
spark.executor.logs.rolling.maxSize 10000000
spark.executor.logs.rolling.maxRetainedFiles 10
Property | Description |
---|---|
spark.executor.logs.rolling.strategy | The strategy used to control the amount of logs saved in the <DAS_HOME>/work directory. In the above configuration, the value size indicates that the logs kept are restricted based on their size. |
spark.executor.logs.rolling.maxSize | The maximum size (in bytes) allowed for logs saved in the <DAS_HOME>/work directory at any given time. Older log files are deleted when new logs are generated so that the specified maximum size is not exceeded. |
spark.executor.logs.rolling.maxRetainedFiles | The maximum number of log files allowed to be kept in the <DAS_HOME>/work directory at any given time. Older log files are deleted when new logs are generated so that the specified maximum number of files is not exceeded. In the above configuration, this property is overruled by the spark.executor.logs.rolling.maxSize property because the value specified for the spark.executor.logs.rolling.strategy property is size. If the maximum size specified for logs is reached, older logs are deleted even if the specified maximum number of files is not yet reached. |