Supports multiple Spark contexts created from the same server
The main problem this project solves is the inability to run multiple Spark contexts from the same JVM. This is a bug in Spark core that was also present in Ooyala's Spark Job Server, which inspired this project. The project launches a new process for each Spark context/application, with its own driver memory setting and its own driver log. Each driver JVM is created with its own Spark UI port, which is sent back to the API caller. Inter-process communication is achieved with Akka actors, and each process is shut down when its Spark context/application is deleted.
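For illustration, here is a minimal, hypothetical sketch of that idea; the class and file names below are invented for the example (the real logic lives in classes such as server.domain.actors.ContextManagerActor):

```scala
import java.io.File

// Hypothetical sketch only: illustrates "one JVM per context" with a
// dedicated heap size, Spark UI port and driver log.
object ContextProcessSketch {
  def launchContext(contextName: String, xmxMemory: String, sparkUiPort: Int): Process = {
    val command = java.util.Arrays.asList(
      "java",
      s"-Xmx$xmxMemory",                      // per-driver memory setting
      "-Dspark.ui.port=" + sparkUiPort,       // per-driver Spark UI port
      "-cp", sys.props("java.class.path"),
      "server.ContextApplicationSketch",      // invented entry point
      contextName)
    new ProcessBuilder(command)
      .redirectErrorStream(true)
      .redirectOutput(new File(s"$contextName-driver.log")) // per-driver log
      .start()
  }
}
```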
| SJR Version | Spark Version |
|-------------|---------------|
| 0.3.0       | 1.1.0         |
| 0.3.1       | 1.3.1         |
The project is built with Maven 3 and Java 7.
make build
SJR can now be deployed from spark-job-rest/spark-job-rest/target/spark-job-rest.tar.gz
If your build fails with this error:
[ERROR] spark-job-rest/src/main/scala/server/domain/actors/ContextManagerActor.scala:171: error: value redirectOutput is not a member of ProcessBuilder
This happens when Maven uses Java 6. You can run mvn -version to check which Java version Maven uses.
$ mvn -version
Apache Maven 3.2.5
Java version: 1.6.0_65
If Maven uses Java 6, you need to change it to Java 7. This can be done by adding the JAVA_HOME export to your ~/.mavenrc file:
# OSX:
export JAVA_HOME=/Library/Java/JavaVirtualMachines/{jdk-version}/Contents/Home
# Ubuntu:
export JAVA_HOME=/usr/lib/jvm/{jdk-version}
If running from IDE fails with:
Exception in thread "main" java.lang.NoClassDefFoundError: akka/actor/Props
This happens because the Spark dependency has the provided scope. To run from the IDE, either remove the provided scope from the Spark dependency (inside pom.xml) or add the Spark assembly jar to the run classpath.
You can deploy Spark-Job-REST locally to the deploy directory inside the project by running:
make deploy
Optionally, you can specify the install directory via the $SJR_DEPLOY_PATH environment variable:
SJR_DEPLOY_PATH=/opt/spark-job-rest make deploy
Before running SJR, ensure that the working environment is configured.
For a proper installation, you should set $SPARK_HOME to your Apache Spark distribution and $SPARK_CONF_HOME to the directory that contains spark-env.sh (usually $SPARK_HOME/conf or $SPARK_HOME/libexec/conf).
You can do this in your bash profile (~/.bash_profile or ~/.bashrc) by adding the following lines:
export SPARK_HOME=<Path to Apache Spark>
export SPARK_CONF_HOME=$SPARK_HOME/libexec/conf # or $SPARK_HOME/conf depending on your distribution
After that, either start a new terminal session or source your bash profile.
You can deploy Spark-Job-REST to a remote host via:
make remote-deploy
For remote deployment, set the following environment variables:
# Mandatory connection string
export SJR_DEPLOY_HOST=<user@hostname for remote machine>
# Optional parameters
export SJR_DEPLOY_KEY=<optional path to your SSH key>
export SJR_REMOTE_DEPLOY_PATH=<where you want to install Spark-Job-REST on remote host>
If SJR_REMOTE_DEPLOY_PATH is not set, then SJR_DEPLOY_PATH will be used during the remote deploy.
To start or stop SJR, use:
cd $SJR_DEPLOY_PATH
bin/start_server.sh
bin/stop_server.sh
or, if it is deployed to the default destination, simply:
make start
make stop
The Spark-Job-REST default configuration is stored in resources/application.conf (here and below, under spark-job-rest/src/main/).
To add or override settings, create resources/deploy.conf (ignored by VCS).
Configure the default Spark properties for context creation just as you would normal Spark configuration options:
spark.executor.memory=2g
spark.master="local"
spark.path="/Users/user/spark-1.1.0"
........
To set how much memory should be allocated for the driver, use driver.xmxMemory (default is 1g).
Configure settings like the web server port and Akka system ports:
appConf{
web.services.port=8097
spark.ui.first.port = 16000
........
}
You may configure folders by setting environment variables and by creating and editing resources/deploy-settings.sh (under spark-job-rest/src/main/):
export SJR_LOG_DIR=<path to logs directory>
export SJR_JAR_PATH=<path to jar files storage>
export SJR_CONTEXTS_BASE_DIR=<path to the root directory for contexts process directories>
export JSR_EXTRA_CLASSPATH=<additional classes required for your application to run>
You can extend or override Java and GC options in resources/deploy-settings.sh:
JAVA_OPTS="${JAVA_OPTS} ${YOUR_EXTRA_JAVA_OPTIONS}"
GC_OPTS="${GC_OPTS} ${YOUR_EXTRA_GC_OPTIONS}"
Spark-Job-REST supports custom job context factories, defined in the context.job-context-factory property of the config.
By default, SJR uses context.SparkContextFactory, which creates one SparkContext per JVM.
To run jobs against a provided SQL context, include spark-job-rest-sql in your project, set the context factory to one of the SQLContext factories provided by that library, and inherit your job from api.SparkSqlJob (a sketch is shown after the list below).
Currently supported contexts:
- context.SparkSqlContextFactory creates a plain SQLContext for the job.
- context.HiveContextFactory creates a Hive SQL context.
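For example, assuming api.SparkSqlJob mirrors the SparkJob trait but hands the job an SQLContext instead of a SparkContext (check the spark-job-rest-api and spark-job-rest-sql sources for the exact signatures), a SQL job might look roughly like this:

```scala
import com.typesafe.config.Config
import org.apache.spark.sql.SQLContext
import api.{SparkJobInvalid, SparkJobValid, SparkJobValidation, SparkSqlJob}

// Sketch only: the runJob/validate signatures below are assumed to mirror
// SparkJob with an SQLContext; verify against the spark-job-rest-api sources.
class SqlExample extends SparkSqlJob {
  override def runJob(sqlContext: SQLContext, jobConfig: Config): Any =
    // "query" is an illustrative key taken from the POST /jobs body
    sqlContext.sql(jobConfig.getString("query")).count()

  override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation =
    if (config.hasPath("query")) SparkJobValid() else SparkJobInvalid("Missing 'query' parameter")
}
```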
SJR can be run from outside the Spark cluster, but you need to copy at least the deployment folder from one of the slave or master nodes.
After editing all the configuration files, SJR can be run by executing the start_server.sh script.
The UI can be accessed at <server address>:<appConf.web.services.port>.
Contexts
- POST /contexts/{contextName} - Create Context
  - Body: Raw entity with key-value pairs.
  - The jars key is required and should be a comma-separated list of jar paths. These jars are added to the classpath of the newly created context's JVM process at Spark context creation time. Three types of jar paths are supported:
    - Absolute path on the server side: /home/ubuntu/example.jar
    - Name of a jar that was uploaded to the server: example.jar
    - HDFS path: hdfs://devbox.local:8020/user/test/example.jar
# Body example:
jars="/home/ubuntu/example.jar,example.jar,hdfs://devbox.local:8020/user/test/example.jar"
spark.executor.memory=2g
driver.xmxMemory = 1g
- GET /contexts/{contextName} - Returns the Context JSON object, or "No such context." if it does not exist.
- DELETE /contexts/{contextName} - Delete Context
Jobs
- POST /jobs?runningClass={runningClass}&contextName={contextName} - Job Submission
  - Body: Raw entity with key-value pairs. Here you can set any configuration properties that will be passed to the config parameter of the validate and runJob methods of the provided jar (see the SparkJob definition below).
- GET /jobs/{jobId}?contextName={contextName} - Gets the result or state of a specific job
- GET /jobs - Gets the states/results of all jobs from all running contexts
Jars
- POST /jars/{jarName} - Upload jar
  - Body: Jar bytes
- POST /jars - Upload jar
  - Body: Multipart form
- GET /jars - Gets all the uploaded jars
- DELETE /jars/{jarName} - Delete jar
All the API methods can be called from Scala/Java with the help of an HTTP Client.
Maven Spark-Job-Rest-Client dependency:
<dependency>
<groupId>com.xpatterns</groupId>
<artifactId>spark-job-rest-client</artifactId>
<version>0.3.1</version>
</dependency>
Add the Maven Spark-Job-Rest-Api dependency:
<dependency>
<groupId>com.xpatterns</groupId>
<artifactId>spark-job-rest-api</artifactId>
<version>0.3.1</version>
</dependency>
To create a job that can be submitted through the server, the class must implement the SparkJob trait.
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import api.{SparkJobInvalid, SparkJobValid, SparkJobValidation, SparkJob}
class Example extends SparkJob {
  override def runJob(sc: SparkContext, jobConfig: Config): Any = { ... }
  override def validate(sc: SparkContext, config: Config): SparkJobValidation = { ... }
}
- The runJob method contains the implementation of the job. The SparkContext and Config objects are provided through parameters.
- The validate method allows for an initial validation. In order to run the job, return SparkJobValid(); otherwise return SparkJobInvalid(message). A concrete sketch follows this list.
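As a rough sketch of such a job (the class name here is illustrative; the bundled example job referenced below is the authoritative version), the following parallelizes Range(0, input) and counts it, matching the behaviour used in the walkthrough further down:

```scala
import com.typesafe.config.Config
import org.apache.spark.SparkContext
import api.{SparkJobInvalid, SparkJobValid, SparkJobValidation, SparkJob}

// Illustrative job: counts the elements of Range(0, input), where "input"
// comes from the key-value body of POST /jobs.
class RangeCountJob extends SparkJob {
  override def runJob(sc: SparkContext, jobConfig: Config): Any =
    sc.parallelize(0 until jobConfig.getInt("input")).count()

  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    if (config.hasPath("input")) SparkJobValid() else SparkJobInvalid("Missing 'input' parameter")
}
```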
An example job for this project can be found at spark-job-rest/examples/example-job. To package it, run:
mvn clean install
Upload JAR
# In the project root directory
curl --data-binary @spark-job-rest/examples/example-job/target/example-job.jar 'localhost:8097/jars/example-job.jar'
Create a context
curl -X POST -d "jars=example-job.jar" 'localhost:8097/contexts/test-context'
{
"contextName": "test-context",
"sparkUiPort": "16003"
}
Check if context exists
curl 'localhost:8097/contexts/test-context'
{
"contextName": "test-context",
"sparkUiPort": "16003"
}
Run job - The example job creates an RDD from a Range(0,input) and applies count on it.
curl -X POST -d "input=10000" 'localhost:8097/jobs?runningClass=com.job.SparkJobImplemented&contextName=test-context'
{
"jobId": "2bd438a2-ac1e-401a-b767-5fa044b2bd69",
"contextName": "test-context",
"status": "Running",
"result": "",
"startTime": 1430287260144
}
2bd438a2-ac1e-401a-b767-5fa044b2bd69 is the jobId. This id can be used to query for the job status/results.
Query for results
curl 'localhost:8097/jobs/2bd438a2-ac1e-401a-b767-5fa044b2bd69?contextName=test-context'
{
"jobId": "2bd438a2-ac1e-401a-b767-5fa044b2bd69",
"contextName": "test-context",
"status": "Finished",
"result": "10000",
"startTime": 1430287261108
}
Delete context
curl -X DELETE 'localhost:8097/contexts/test-context'
{
"message": "Context deleted."
}
HTTP Client Example
import akka.actor.ActorSystem
// SparkJobRestClient, Job and JobStates come from the spark-job-rest-client artifact.

object Example extends App {
  implicit val system = ActorSystem()
  val contextName = "testContext"
  try {
    val sjrc = new SparkJobRestClient("http://localhost:8097")

    // Create a context with the example jar on its classpath.
    val context = sjrc.createContext(contextName, Map("jars" -> "/Users/raduchilom/projects/spark-job-rest/examples/example-job/target/example-job.jar"))
    println(context)

    // Submit a job and poll until it leaves the Running state.
    val job = sjrc.runJob("com.job.SparkJobImplemented", contextName, Map("input" -> "10"))
    println(job)
    var jobFinal = sjrc.getJob(job.jobId, job.contextName)
    while (jobFinal.status.equals(JobStates.RUNNING.toString())) {
      Thread.sleep(1000)
      jobFinal = sjrc.getJob(job.jobId, job.contextName)
    }
    println(jobFinal)

    sjrc.deleteContext(contextName)
  } catch {
    case e: Exception => e.printStackTrace()
  }
  system.shutdown()
}
Running this would produce the output:
Context(testContext,16002)
Job(ab63c19f-bbb4-461e-8c6f-f0a35f73a943,testContext,Running,,1430291077689)
Job(ab63c19f-bbb4-461e-8c6f-f0a35f73a943,testContext,Finished,10,1430291078694)
The UI was added in a compiled and minified state. For sources and changes, please refer to the spark-job-rest-ui project.