
Thursday, January 10, 2019

Compiling Spark Jobs using SBT on Build Machines

Deploying a Spark job can be challenging, not least because no two Spark jobs are quite alike: deployment times vary significantly depending on the size of the application and the number of dependencies it pulls in. A few simple strategies can make the deployment process considerably more efficient.

Consider the following steps required to deploy a Spark job that is written in Scala and built using sbt:
  1. Get the latest project files (requires git or subversion etc.)
  2. Download dependencies (requires sbt)
  3. Compile the Spark application (requires sbt)
  4. Run unit tests (requires sbt)
  5. Package the Spark application into a .jar file (requires sbt)
  6. Upload the .jar file to an accessible location (e.g. an s3 bucket) (requires aws-cli)
  7. Spark submit to the master node using the s3 location (requires spark)
This can be achieved using a deployment pipeline created in a CI tool like TeamCity or Jenkins.
Usually build machines used for a deployment pipeline are provisioned so they are lightweight with very few applications installed.
In fact, if the only application installed on a build machine is docker, then that is enough.
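The seven steps above can be sketched as a single pipeline script. A minimal sketch, where the bucket name, application class, jar name, and master endpoint are all hypothetical placeholders rather than values from a real project:

```shell
#!/usr/bin/env bash
set -ex

# Hypothetical values -- replace with your own project's settings.
APP_JAR="my-spark-app-assembly-1.0.0.jar"
S3_LOCATION="s3://my-bucket/jobs/${APP_JAR}"
MASTER_ENDPOINT="spark://my-master:7077"

deploy_spark_job() {
    git pull                                # 1. get latest project files
    sbt update                              # 2. download dependencies
    sbt compile                             # 3. compile the Spark application
    sbt test                                # 4. run unit tests
    sbt assembly                            # 5. package into a .jar
    aws s3 cp "target/scala-2.11/${APP_JAR}" "${S3_LOCATION}"   # 6. upload the .jar
    spark-submit --master "${MASTER_ENDPOINT}" --deploy-mode cluster \
        --class com.example.SparkApp "${S3_LOCATION}"           # 7. submit to the master
}
```

In practice sbt assembly alone covers steps 2 to 5, but spelling them out shows exactly which tools each build machine would need installed.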

Since our build machines don't have sbt installed, a docker image with scala and sbt is required to run sbt assembly. An image found on Docker Hub like spikerlabs/scala-sbt can achieve this. However, running sbt assembly using this image can take a significant amount of time to complete, sometimes as long as 30 minutes! This is because all the necessary dependencies for your Spark job need to be downloaded before your application can be compiled.

Performing this operation every time you need to deploy your Spark job is costly. So, to avoid these download times and improve build efficiency, the dependencies first need to be downloaded onto a specific sbt docker image tailored for your Spark job, which can then be used as part of the deployment pipeline.

The following steps will need to be carried out for this to be achieved:

1. cd into the project folder.

2. Run the following docker command to start a spikerlabs/scala-sbt container in interactive mode:
docker run -it -v "/$(pwd)/":/app -w "//app" --name "my-scala-sbt-container" "spikerlabs/scala-sbt:scala-2.11.8-sbt-0.13.15" bash
Note that a specific scala and sbt version will always be required, otherwise the "latest" tag could fail during compilation with any breaking changes.

3. Once in interactive mode within the docker container, run the following commands within the container.
> sbt assembly # to download dependencies, compile, and package the spark job
> exit # once the packaging is complete
4. Exiting leaves behind a stopped container called my-scala-sbt-container, with all the dependencies downloaded, which will need to be exported, then imported as an image, as follows:
docker export --output="./my-scala-sbt-container.tar" "my-scala-sbt-container"
docker import "./my-scala-sbt-container.tar" [INTERNAL_REPOSITORY_URL]/my-scala-sbt-image:[VERSION_NUMBER]
Where [INTERNAL_REPOSITORY_URL] is a company-wide docker repository location (e.g. Nexus),
and where [VERSION_NUMBER] needs to be bumped up from the previous version.

Note that the import allows a container that has all the necessary dependencies to be converted into an image called [INTERNAL_REPOSITORY_URL]/my-scala-sbt-image:[VERSION_NUMBER].

This will be needed when docker push is executed. The docker CLI requires the image name to match the repository URL exactly, which seems a little unintuitive at first.

5. To publish the local docker image to the internal docker repository, run the following docker push command:
docker image push [INTERNAL_REPOSITORY_URL]/my-scala-sbt-image:[VERSION_NUMBER]
Where [INTERNAL_REPOSITORY_URL] and [VERSION_NUMBER] are the same as in the previous step.

This can now be used in the CI deployment pipeline, where the sbt assembly step becomes a docker run command as follows:
cat <<EOF > assembly.sh
#!/usr/bin/env bash
set -ex
sbt assembly
EOF

docker run \
    --rm \
    -v "/$(pwd)/..":/app \
    -v "/$(pwd)/assembly.sh":/assembly.sh \
    -w "//" \
    --entrypoint=sh \
    [INTERNAL_REPOSITORY_URL]/my-scala-sbt-image:[VERSION_NUMBER] \
    //assembly.sh
That should improve build times, shaving up to 30 minutes off the deployment pipeline, depending on how many dependencies the Spark application requires.

Wednesday, September 26, 2018

Automating Deployments for Spark Streaming on AWS EMR

Anyone who's familiar with submitting a Spark job will know it's as simple as running a single command, so long as the Spark CLI is installed on the machine running that command:
spark-submit --master ${MASTER_ENDPOINT} --deploy-mode cluster --class com.cm.spark.SparkApp s3://bucket/jobs/${APP_JAR}
Elastic Map Reduce

This is easy enough when working with a self-managed vanilla Spark cluster. AWS also provides a service called EMR (Elastic Map-Reduce) which offers hosting of Spark clusters and so the AWS CLI provides an equally effortless way to submit Spark jobs:
aws emr add-steps --cluster-id j-UJODR7SZ6L7L --steps Type=Spark,Name="Aggregator",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--class,com.cm.spark.SparkApp,s3://bucket/jobs/${APP_JAR}]
This is essentially a wrapper around the spark-submit command. The AWS documentation is relatively detailed in providing information for adding additional parameters to the command line arguments in order to provide all the features found in the spark-submit command. This makes it incredibly simple to create a deployment step in your CI pipeline.
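Once a step has been added, the pipeline can also wait for it to finish before reporting success or failure. A sketch using aws emr describe-step, where the cluster and step ids are illustrative placeholders taken from the examples in this post:

```shell
#!/usr/bin/env bash
set -e

# Illustrative ids -- substitute the real ones from the add-steps output.
CLUSTER_ID="j-UJODR7SZ6L7L"
STEP_ID="s-2V57YI7T5NF42"

# Poll the step state until EMR reports a terminal status.
wait_for_step() {
    while true; do
        STATE=$(aws emr describe-step \
            --cluster-id "${CLUSTER_ID}" \
            --step-id "${STEP_ID}" \
            --query "Step.Status.State" \
            --output text)
        case "${STATE}" in
            COMPLETED) echo "step succeeded"; return 0 ;;
            FAILED|CANCELLED) echo "step ended in state ${STATE}"; return 1 ;;
            *) sleep 30 ;;   # PENDING or RUNNING -- keep waiting
        esac
    done
}
```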

Even if the build machines don't have AWS CLI installed, the commands can be wrapped around a docker run call using an image called garland/aws-cli-docker which can be found on Docker Hub:
cat <<EOF > submit.sh
#!/usr/bin/env bash
set -ex
aws emr add-steps --cluster-id j-UJODR7SZ6L7L --steps ...
EOF

docker run \
    --rm \
    -e AWS_DEFAULT_REGION=${AWS_REGION} \
    -e AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" \
    -e AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" \
    -v "/${APP_JAR}":/${APP_JAR} \
    -v "/$(pwd)/submit.sh":/submit.sh \
    -w "//" \
    --entrypoint=sh \
    garland/aws-cli-docker:latest \
    //submit.sh
The beautiful thing about docker is that the build machines only ever need one prerequisite: docker. Anything else that needs to be installed for deployment can instead be found as a docker image on Docker Hub. In this case the aws emr add-steps command is executed within a docker container and the build machines don't need to know anything about it!

However, there is a catch. This is only straightforward if you are under the assumption that your Spark job eventually completes, whether it returns a success or failure status. In fact, that is exactly the same assumption that the AWS documentation makes. Once a job completes, running the same command will add a new step on the cluster and the step will run without any issues.

But what about Structured Streaming? An ongoing Spark job with an open connection to listen to an event stream suddenly makes one simple command slightly more complicated when deployment of a code change is required.

This would require stopping a currently running step before adding a new step. AWS documentation has another command that can be useful in this scenario:
aws emr cancel-steps --cluster-id j-UJODR7SZ6L7L --step-ids s-2V57YI7T5NF42
Unfortunately the cancel-steps command can only cancel a pending step, i.e. one that isn't yet in a running state. In fact there is no API to terminate a running step at all, and the only solution found in the AWS documentation is the following:
  1. Log into the EMR master node (which will need a secure .pem file)
  2. Run yarn application -list
  3. Find the application ID of the currently running job
  4. Run yarn application -kill ${APP_ID}
Obviously this is not a viable solution for automation, since it means dealing with ssh and passing .pem files around on build machines. No doubt your security team would have an opinion on that approach.
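If scripted, the manual steps above would look something like this sketch, which makes the objection concrete: the hostname, key file, and paths below are purely illustrative, and the build machine would need the .pem key and ssh access to the master node to run any of it.

```shell
#!/usr/bin/env bash
set -ex

# Purely illustrative values -- the point is that the build machine
# would need this key and network access, which is the problem.
PEM_FILE="emr-master.pem"
MASTER_HOST="hadoop@emr-master.example.com"

kill_running_yarn_app() {
    # Find the first running application id, then kill it -- all over ssh.
    APP_ID=$(ssh -i "${PEM_FILE}" "${MASTER_HOST}" \
        "yarn application -list 2>/dev/null" \
        | awk '/^application_/ {print $1; exit}')
    ssh -i "${PEM_FILE}" "${MASTER_HOST}" "yarn application -kill ${APP_ID}"
}
```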

Alternatively, there is another AWS API call that may be a useful workaround:
aws emr terminate-clusters --cluster-ids j-UJODR7SZ6L7L
This could achieve what we are looking for, if the following automation steps are executed:
  1. Terminate EMR cluster
  2. Recreate EMR cluster using terraform or cloud formation
  3. Add step to newly created cluster
Terminating an entire cluster just to deploy a job may seem like overkill; however, in the new world of infrastructure-as-code, it isn't quite as insane as one might think. After consulting AWS experts and Spark specialists alike, both agreed that these days it is actually encouraged to dedicate a single Spark cluster to each Spark job.

The benefits are quite significant:
  • Dedicated clusters mean each job can only affect itself without consuming resources from other jobs
  • If there are any fatal errors or memory leaks other jobs aren't affected
  • The entire cluster's resources can be tailored to a single job, rather than maintaining a shared resource allocation for many jobs, which requires significant monitoring, and where any necessary resizing causes downtime for all currently running jobs
However, the downfalls are also notable:
  • Rather than one single master with many shared cores, there will be a master for each job, which obviously means more EC2 costs; however, masters can be kept small, since they only need to receive requests and distribute compute to the workers
  • Deployment pipelines will be slower since recreating clusters for every application code update will add significant time to the build process
These downfalls may be deal breakers for some use cases, and after consultation they gave the AWS experts enough pause to at least rethink the API strategies provided for a scenario such as Structured Streaming. The underlying assumption that a Spark job is a short-lived process is simply not enough.

A better solution for now would actually be to utilise the YARN API instead. Since YARN is the orchestrator of the Spark nodes within the cluster, it provides many features in which to manage both the cluster and its jobs. There is comprehensive documentation on every command available via HTTP REST API calls. That means the build machines can avoid having to ssh into master nodes using .pem files and can send an HTTP request instead (so long as the inbound firewall permissions of the Spark cluster allow the build machines access to it - and that is a simple terraform or cloud formation configuration).

That means the following automation steps can achieve a successful deployment pipeline:
# 1. Get a list of active jobs
APPS=$(curl -s -X GET -H "Content-Type: application/json" "http://${MASTER_ENDPOINT}/ws/v1/cluster/apps")

# 2. Parse the response payload and return the id of the running job (requires the jq package)
APP_ID=$(echo $APPS | jq -r '.apps.app[0].id')

# 3. Send a kill request to the currently running job
curl -X PUT -H "Content-Type: application/json" -d '{"state": "KILLED"}' "http://${MASTER_ENDPOINT}/ws/v1/cluster/apps/${APP_ID}/state"

# 4. Add a new step to submit the new job
aws emr add-steps --cluster-id j-UJODR7SZ6L7L --steps Type=Spark,Name="Aggregator",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--class,com.cm.spark.SparkApp,s3://bucket/jobs/${APP_JAR}]
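One caveat worth sketching: the YARN kill request is asynchronous, so a robust pipeline should wait until the application actually reaches the KILLED state before adding the new step. A sketch of that wait, where the endpoint is an illustrative placeholder for the real ResourceManager address:

```shell
#!/usr/bin/env bash
set -e

# Illustrative endpoint -- substitute the master's YARN ResourceManager address.
MASTER_ENDPOINT="emr-master.example.com:8088"

# Poll the YARN REST API until the given application reaches the KILLED state.
wait_until_killed() {
    local app_id="$1"
    for _ in $(seq 1 30); do
        STATE=$(curl -s "http://${MASTER_ENDPOINT}/ws/v1/cluster/apps/${app_id}/state" \
            | jq -r '.state')
        [ "${STATE}" = "KILLED" ] && return 0
        sleep 5
    done
    echo "application ${app_id} did not stop in time" >&2
    return 1
}
```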
This removes the downfalls listed above, because it creates a separation of concerns: job deployment gets its own CI pipeline, while the cluster can be resized and rebuilt separately when necessary. The benefits of keeping one dedicated cluster per Spark job are still maintained with this solution, except that the cluster no longer has to be destroyed every time the job itself needs an iteration.