Styles

Showing posts with label S3. Show all posts
Showing posts with label S3. Show all posts

Thursday, January 10, 2019

Compiling Spark Jobs using SBT on Build Machines

Deploying a Spark job can be challenging, especially considering that one Spark job is never the same as another. Deployment times can vary significantly depending on a wide range of factors. Finding ways to make the deployment efficient can greatly improve the process, and that can be achieved with a few simple strategies.

Consider the following steps required to deploy a Spark job that is built in scala and compiled using sbt:
  1. Get latest project files (requires git or subversion etc.)
  2. Download dependencies (requires sbt)
  3. Compile Spark application (requires sbt)
  4. Run unit test (requires sbt)
  5. Package Spark application to a .jar file (requires sbt)
  6. Upload .jar file to an accessible location (e.g. s3 bucket) (requires aws-cli)
  7. Spark submit using the s3 location to the master node (requires spark)
This can be achieved using a deployment pipeline created in a CI tool like TeamCity or Jenkins.
Usually build machines used for a deployment pipeline are provisioned so they are lightweight with very few applications installed.
In fact, if the only application installed on a build machine is docker, then that is enough.

Since our build machines don't have sbt installed, a docker image with scala and sbt is required to run sbt assembly. A docker image found on docker hub like spikerlabs/scala-sbt can achieve this. However, running sbt assembly using this docker image will take a significantly long time to complete, sometimes as long as 30 minutes! This is because all the necessary dependencies for your spark job will need to be downloaded before compiling your Spark application.

Performing this operation every time you need to deploy your Spark job is costly. So in order to improve the efficiency of builds to prevent these download times, the dependencies first need to be downloaded onto a specific sbt docker image that is tailored for your Spark job. This can then be used as part of the deployment pipeline.

The following steps will need to be carried out for this to be achieved:

1. cd into the project folder.

2. Run the following docker command to start a spikerlabs/scala-sbt container in interactive mode.
docker run -i -v "/$(pwd)/":/app -w "//app" --name "my-scala-sbt-container" "spikerlabs/scala-sbt:scala-2.11.8-sbt-0.13.15" bash
Note that a specific scala and sbt version will alway be required otherwise the "latest" tag could fail during compilation with any breaking changes.

3. Once in interactive mode within the docker container, run the following commands within the container.
> sbt assembly # to download dependencies, compile, and package the spark job
> exit # once the packaging is complete
4. This will create a docker container called my-scala-sbt-container which will need to be exported, then imported as an image, as follows:
docker export --output="./my-scala-sbt-container.tar" "my-scala-sbt-container"
docker import "./my-scala-sbt-container.tar" [INTERNAL_REPOSITORY_URL]/my-scala-sbt-image:[VERSION_NUMBER]
Where [INTERNAL_REPOSITORY_URL] is a company wide docker repository location e.g. like nexus,
and where [VERSION_NUMBER] needs to be bumped up from a previous version.

Note that the import allows a container that has all the necessary dependencies to be converted into an image called [INTERNAL_REPOSITORY_URL]/my-scala-sbt-image:[VERSION_NUMBER].

This will be needed when docker push is executed. Unfortunately the docker API for a push command requires the image name to be exactly the same as the docker repository url which seems a little non-intuitive.

5. To publish the the local docker image to the internal docker repository, run the following docker push command:
docker image push [INTERNAL_REPOSITORY_URL]/my-scala-sbt-image:[VERSION_NUMBER]
Where [INTERNAL_REPOSITORY_URL] is a company wide docker repository location e.g. like nexus,
and where [VERSION_NUMBER] is the same as the previous step.

This can now help in the CI deployment pipeline, where the step to run sbt assembly can be done with the docker run command as follows:
cat <<EOF > assembly.sh
    
#!/usr/bin/env bash
set -ex
sbt assembly
EOF docker run \ --rm \ -v "/$(pwd)/..":/app \ -v "/$(pwd)/assembly.sh":/assembly.sh \ -w "//" \ --entrypoint=sh \ [INTERNAL_REPOSITORY_URL]/my-scala-sbt-image:[VERSION_NUMBER] \ //assembly.sh
That should improve the build timings by removing up to 30 minutes off the deployment pipeline, depending on how many dependencies are required for the Spark application.

Monday, October 30, 2017

Aggregating Event Streams in Redshift using Matillion

If you haven’t heard of Matillion, then it’s time you knew about it. If you’ve used ETL tools in the past like the traditional SSIS for Microsoft’s SQL Server, then Matillion would be very familiar to you. Where SSIS was tailored for SQL Server, Matillion is tailored for Amazon’s Data Warehouse offering, AWS Redshift.




Matillion’s user interface is very intuitive as it groups all its components into three natural categories: those for extract, those for transform, and those for load.

Since Matillion sits on top of Redshift, both must live within AWS, so the features for extracting include components for RDS (as well as any other relational databases that support JDBC drivers). It also supports DynamoDB, S3, as well as components for MongoDB, Cassandra and Couchbase.


Components used for transformation provide the ability to aggregate, filter, rank, and even to roll up sums over multiple records from a transient “staging” Redshift table to load it to a more permanent Redshift table. The underlying functions for these components are in fact Redshift query commands.

Matillion seems quite mature with countless component features at your fingertips. Much of the transformation logic you can construct can definitely be written by hand using Redshift’s comprehensive documentation on ANSI SQL commands, but what Matillion can provide is a visual representation which significantly improves collaboration amongst team members so they can all be equally responsible for maintaining the ETL process in an efficient way.

Many could be forgiven for associating an ETL process with the old techniques of extracting large amounts of data from a monolithic data store to push into a read store using the typical CQRS architecture, but Matillion can be used for much more than that.

Our use-case at Campaign Monitor not only required data from a snapshot-style structure as is found in a traditional monolithic relational database, but also data that is sourced from an event stream for a rolling daily aggregation.

In order to achieve this we needed an initial query to obtain the total snapshot on a particular day, which can then be stored in a “roll-up” table in Redshift as the first day of aggregated totals. Let’s take the following table as an example snapshot that can be the baseline for any future events:

ListIDSubscribersDate
110002017-10-01
220002017-10-01
330002017-10-01
440002017-10-01

This can be done using a traditional ETL job in Matillion as follows:



There is a lot of sophisticated magic under the hood with regards to the Database Query component. The SQL query that is run from the relational database will return a resultset that Matillion can chunk into separate concurrent processes. Each of these processes will create an underlying S3 file before loading the data into the “rs_lists_subscriber_rollup_staging” table in Redshift.

Once we have a snapshot on a particular date (in this case 2017–10–01 in the example above) all events for subscribers that come in after this date can be streamed from our event source, and then aggregated into the same table with a rolling count of subscribers for subsequent dates.

This requires some form of consumption externally to receive the incoming events, which can be batched into S3 files, and then persisted into an ephemeral “raw” table in Redshift using the COPY command. This raw table would look as follows:

ListIDSubscriberIDSubscribe TallyDate
1112017-10-02
12-12017-10-02
3112017-10-02
2112017-10-02
2212017-10-02
2312017-10-02
41-12017-10-02
42-12017-10-02
43-12017-10-02
4412017-10-02
4112017-10-03
4512017-10-03
4612017-10-03
4712017-10-03

Generally, a contact subscribes or unsubscribes at a specific time from a list, and this will need to be aggregated daily per list so the data above would look as follows:

ListIDSubscribersDate
102017-10-02
232017-10-02
312017-10-02
4-22017-10-02
442017-10-03

This aggregation can also be done in Matillion very easily from the raw table:


The Aggregate component would simply need to sum the Subscribe/Unsubscribe column with a grouping of ListID. Once this is achieved, there will need to be a union of data from the existing snapshot data so that there can be a baseline to work with for rolling counts. Only then can the very powerful Window Calculation component be used for rolling up the sum of subscribers based on a partition (in this case the ListID) and a partition ordering (in this case Date). The Matillion job would look as follows:


There are further properties for the Windows Calculation component that need to be considered, including setting the lower bound to “unbounded preceding”. This ensures that the calculated sum is rolled up from the very first record of the dataset that was unioned from the previous component. The properties would look as follows:


There are of course other edge cases that will need to be considered that could slowly make your transformation job increasingly complex. These can include lists that were created after the snapshot was taken, or even more interesting is catering for missing days that receive no events which will need to be populated with a default of zero so aggregation can be included in the roll-up for that day. Thankfully Matillion makes it simple to grow on the above transformation so it can be easily maintained.

The final table would then look as follows:

ListIDSubscribersDate
110002017-10-01
220002017-10-01
330002017-10-01
440002017-10-01
110002017-10-02
220032017-10-02
330012017-10-02
439982017-10-02
110002017-10-03
220032017-10-03
330012017-10-03
440022017-10-03

And this can then be used when reporting over date ranges from your front end reporting application.

This example is just one of many use-cases where Matillion has helped Campaign Monitor fulfill its ETL requirements in order to provide reporting on large datasets. In particular, event based data has helped significantly improve SLAs since the frequency of aggregating data can be done within minutes compared to that of the traditional relational data ETLs that process large queries over many hours and provide much lower refresh rates for report data.