Wednesday, September 26, 2018

Automating Deployments for Spark Streaming on AWS EMR

Anyone who's familiar with submitting a Spark job will know it's as simple as running a single command, so long as the Spark CLI is installed on the machine running that command:
spark-submit --master ${MASTER_ENDPOINT} --deploy-mode cluster --class ${MAIN_CLASS} s3://bucket/jobs/${APP_JAR}
Elastic Map Reduce

This is easy enough when working with a self-managed vanilla Spark cluster. AWS also provides a service called EMR (Elastic MapReduce), which hosts Spark clusters, and the AWS CLI provides an equally effortless way to submit Spark jobs:
aws emr add-steps --cluster-id j-UJODR7SZ6L7L --steps Type=Spark,Name="Aggregator",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--class,${MAIN_CLASS},s3://bucket/jobs/${APP_JAR}]
This is essentially a wrapper around the spark-submit command. The AWS documentation explains in reasonable detail how to pass additional command-line arguments so that all the features of spark-submit remain available. This makes it incredibly simple to create a deployment step in your CI pipeline.
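
As a rough illustration of passing extra spark-submit options through add-steps, the sketch below assembles the --steps value from its parts. The function name build_spark_step, the class com.example.Aggregator and the --conf setting are made up for this example; only the Type=Spark,...,Args=[...] shape comes from the command above.

```shell
# Build the value passed to --steps for `aws emr add-steps`.
# Usage: build_spark_step NAME MAIN_CLASS JAR_URI [extra spark-submit args...]
# (hypothetical helper, not part of the AWS CLI)
build_spark_step() {
    step_name=$1
    main_class=$2
    jar_uri=$3
    shift 3

    args="--deploy-mode,cluster"
    # Each extra spark-submit word becomes one comma-separated list element.
    for extra in "$@"; do
        args="${args},${extra}"
    done
    # The class and the application JAR come last, as in spark-submit itself.
    args="${args},--class,${main_class},${jar_uri}"

    printf 'Type=Spark,Name="%s",ActionOnFailure=CONTINUE,Args=[%s]\n' "$step_name" "$args"
}

# Example call (commented out so the snippet has no side effects):
# aws emr add-steps --cluster-id j-UJODR7SZ6L7L \
#     --steps "$(build_spark_step Aggregator com.example.Aggregator \
#         "s3://bucket/jobs/${APP_JAR}" --conf spark.executor.memory=4g)"
```

Keeping the step definition in one function means the CI pipeline only has to change the arguments, not the string layout, when a job needs different Spark settings.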

Even if the build machines don't have the AWS CLI installed, the commands can be wrapped in a docker run call using an image called garland/aws-cli-docker, which can be found on Docker Hub:
# deploy.sh and /workspace are placeholder names
cat <<EOF > deploy.sh
#!/usr/bin/env bash

set -ex

aws emr add-steps --cluster-id j-UJODR7SZ6L7L --steps ...
EOF

# Mount the JAR and the current directory, then run the script inside the container
docker run \
    --rm \
    -v "/${APP_JAR}":/${APP_JAR} \
    -v "/$(pwd)/":/workspace \
    -w "/workspace/" \
    --entrypoint=sh \
    garland/aws-cli-docker:latest \
    deploy.sh
The beautiful thing about Docker is that the build machines only ever need one prerequisite: Docker itself. Anything else that needs to be installed for deployment can instead be found as an image on Docker Hub. In this case the aws emr add-steps command is executed within a Docker container and the build machines don't need to know anything about it!

However, there is a catch. This is only straightforward if you are under the assumption that your Spark job eventually completes, whether it returns a success or failure status. In fact, that is exactly the same assumption that the AWS documentation makes. Once a job completes, running the same command will add a new step on the cluster and the step will run without any issues.

But what about Structured Streaming? An ongoing Spark job with an open connection to listen to an event stream suddenly makes one simple command slightly more complicated when deployment of a code change is required.

This would require stopping a currently running step before adding a new step. AWS documentation has another command that can be useful in this scenario:
aws emr cancel-steps --cluster-id j-UJODR7SZ6L7L --step-ids s-2V57YI7T5NF42
Unfortunately, the cancel-steps command can only remove a pending step, i.e. one that isn't in a running state. In fact, at the time of writing there is no API to terminate a running step at all, and the only solution found in the AWS documentation is to do the following:
  1. Log into the EMR master node (which will need a secure .pem file)
  2. Run yarn application -list
  3. Find the application ID of the currently running job
  4. Run yarn application -kill ${APP_ID}
Obviously this is not a viable solution for automation, since the build machines would have to SSH into the master node and .pem files would have to be passed around to them. No doubt your security team would have their own opinion on this approach.
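
For reference, steps 2 to 4 of the manual procedure could be scripted on the master node roughly as follows. This is only a sketch: it assumes the tab-separated column layout that yarn application -list normally prints (ID, name, type, user, queue, state, ...), and find_running_app_id is a made-up helper name.

```shell
# Print the ID of the first RUNNING application with the given name,
# reading `yarn application -list` output on stdin.
# Assumes tab-separated columns: ID, name, type, user, queue, state, ...
find_running_app_id() {
    awk -F '\t' -v name="$1" '
        /^[ ]*application_/ {
            # Columns are padded with spaces for alignment, so trim each field.
            for (i = 1; i <= NF; i++) gsub(/^[ ]+|[ ]+$/, "", $i)
            if ($2 == name && $6 == "RUNNING") { print $1; exit }
        }'
}

# Example use on the master node (commented out; needs a live cluster):
# APP_ID=$(yarn application -list -appStates RUNNING | find_running_app_id Aggregator)
# yarn application -kill "${APP_ID}"
```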

Alternatively, there is another AWS API call that may be a useful workaround:
aws emr terminate-clusters --cluster-ids j-UJODR7SZ6L7L
This could achieve what we are looking for, if the following automation steps are executed:
  1. Terminate EMR cluster
  2. Recreate EMR cluster using Terraform or CloudFormation
  3. Add step to newly created cluster
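
The three steps above could be strung together along these lines. This is a sketch under assumptions: recreate_cluster is a hypothetical helper that relies on a Terraform configuration in the current directory exposing an output named emr_cluster_id (an illustrative name), and the aws emr wait calls are added here so that each step only starts once the previous one has finished.

```shell
# Hypothetical helper: recreate the cluster and print its new ID on stdout.
# Assumes a Terraform output called emr_cluster_id (illustrative name).
recreate_cluster() {
    terraform apply -auto-approve >&2
    terraform output -raw emr_cluster_id
}

# Usage: redeploy_by_recreating OLD_CLUSTER_ID STEP_DEFINITION
redeploy_by_recreating() {
    old_cluster_id=$1
    step_definition=$2

    # 1. Terminate the old cluster and block until it is gone.
    aws emr terminate-clusters --cluster-ids "$old_cluster_id" || return 1
    aws emr wait cluster-terminated --cluster-id "$old_cluster_id" || return 1

    # 2. Recreate the cluster and wait until it is ready to accept steps.
    new_cluster_id=$(recreate_cluster) || return 1
    aws emr wait cluster-running --cluster-id "$new_cluster_id" || return 1

    # 3. Submit the job as a step on the new cluster.
    aws emr add-steps --cluster-id "$new_cluster_id" --steps "$step_definition"
}
```
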
Terminating an entire cluster just to deploy a job may seem like overkill, but in the new world of infrastructure-as-code it isn't quite as insane as one might think. After consultation, AWS experts and Spark specialists alike agreed that these days it is actually encouraged to dedicate a single Spark cluster to each Spark job.

The benefits are quite significant:
  • With a dedicated cluster, each job can only affect itself and cannot take resources away from other jobs
  • If there are any fatal errors or memory leaks, other jobs aren't affected
  • The entire cluster's resources can be tailored to a single job. A shared cluster, by contrast, needs its resource allocation maintained for many jobs, which requires significant monitoring, and any resizing means downtime for every job currently running
However, the downsides are also notable:
  • Rather than a single master shared by many core nodes, there will be a master for each job, which means higher EC2 costs. However, masters can be kept small, since they only need to receive requests and distribute work to the worker nodes
  • Deployment pipelines will be slower, since recreating clusters for every application code update adds significant time to the build process
These downsides may be deal breakers for some use cases, and they were enough to prompt the AWS experts, after consultation, to at least rethink the API strategies they've provided for a scenario such as Structured Streaming. The initial assumption that a Spark job is a short-lived process simply does not hold for streaming.

A better solution for now is to use the YARN API instead. Since YARN is the resource manager that orchestrates the Spark nodes within the cluster, it provides many features for managing both the cluster and its jobs, and every command is comprehensively documented as an HTTP REST API call. That means the build machines can avoid having to SSH into master nodes using .pem files and can send an HTTP request instead (so long as the inbound firewall permissions of the Spark cluster allow the build machines access to it, which is a simple Terraform or CloudFormation configuration).

That means the following automation steps can achieve a successful deployment pipeline:
# 1. Get a list of running jobs
APPS=$(curl -s -H "Accept: application/json" "http://${MASTER_ENDPOINT}/ws/v1/cluster/apps?states=RUNNING")

# 2. Parse the response payload and return the ID of the first running job (requires the jq package)
APP_ID=$(echo "$APPS" | jq -r '.apps.app[0].id')

# 3. Send a kill request to the currently running job
curl -X PUT -H "Content-Type: application/json" -d '{"state": "KILLED"}' "http://${MASTER_ENDPOINT}/ws/v1/cluster/apps/${APP_ID}/state"

# 4. Add a new step to submit the new job
aws emr add-steps --cluster-id j-UJODR7SZ6L7L --steps Type=Spark,Name="Aggregator",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--class,${MAIN_CLASS},s3://bucket/jobs/${APP_JAR}]
This removes the downsides because it separates concerns: job deployment gets its own CI pipeline, while the cluster can be resized and rebuilt when necessary. The benefits of keeping one dedicated cluster per Spark job are retained, except that the cluster no longer has to be destroyed every time the job itself needs another iteration.
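
One detail the sequence above glosses over is that the kill request may be processed asynchronously, so a pipeline may want to confirm that the old job has actually stopped before adding the new step. The sketch below polls the same /state endpoint until it reports KILLED. The helper names, the default retry count and the delay are assumptions made for illustration, and the state is extracted with sed only to keep the snippet free of extra dependencies.

```shell
# Extract the "state" value from a response such as {"state":"KILLED"}.
parse_app_state() {
    sed -n 's/.*"state"[[:space:]]*:[[:space:]]*"\([A-Z_]*\)".*/\1/p'
}

# Poll until the application reports KILLED, or give up after MAX_ATTEMPTS polls.
# Usage: wait_until_killed RM_ENDPOINT APP_ID [MAX_ATTEMPTS] [DELAY_SECONDS]
wait_until_killed() {
    rm_endpoint=$1
    app_id=$2
    max_attempts=${3:-30}    # assumed default
    delay_seconds=${4:-5}    # assumed default

    attempt=1
    while [ "$attempt" -le "$max_attempts" ]; do
        state=$(curl -s "http://${rm_endpoint}/ws/v1/cluster/apps/${app_id}/state" | parse_app_state)
        if [ "$state" = "KILLED" ]; then
            return 0
        fi
        sleep "$delay_seconds"
        attempt=$((attempt + 1))
    done

    echo "Timed out waiting for ${app_id} to be killed (last state: ${state})" >&2
    return 1
}

# Example: run between steps 3 and 4 of the sequence above (commented out).
# wait_until_killed "${MASTER_ENDPOINT}" "${APP_ID}" || exit 1
```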