
Monday, February 10, 2020

Multiple Spark Streaming Jobs in a Single EMR Cluster


A managed Spark cluster saves time on infrastructure maintenance. EMR (Elastic MapReduce) lets data engineers focus on building Spark jobs rather than spending days managing the architecture of the cluster. However, it doesn't come without its shortcomings.

As you may already know, an EMR cluster only runs one submitted job (step) at a time. This may not be much of an issue when running a finite Spark job or batched process, since there is a completion time; however, that is not the case with Spark's Structured Streaming, because there is no end time!

In a self-managed vanilla Spark cluster, it is possible to submit multiple jobs to the YARN resource manager and share its CPU and memory across them, even when those jobs are running Structured Streaming.

Unfortunately, submitting a job to an EMR cluster that already has a job running will queue the newly submitted job. It will remain in a pending state until the previous job completes, which in a streaming scenario never happens.

Having one EMR cluster for every job wouldn't be the best solution either, because AWS infrastructure costs can grow quickly, as a glance at the EMR pricing page shows. The main cost issue is that every EMR cluster needs its own master node, which only passes job requests to the core nodes, and for smaller streaming tasks even those core nodes may not have their resources fully utilized.

Working around this limitation means rethinking how streaming jobs are written for an EMR cluster. For this we need to look at improving the application layer.

A Typical Structured Streaming Application

The primary reason a data engineer reaches for Structured Streaming is to consume data from an event source, transform it according to business rules or for aggregation, and finally persist the result of the transformation in a data store. Typically the Scala code involves three main steps.

1. Reading from the event payload
def getEmailOpenedEvent(rawEvent: Array[Byte]): List[EmailAggregateEvent] = {
  EmailOpened.parseFrom(rawEvent)
    .map(msg =>
        EmailAggregateEvent(
          msg.getEmailId.toByteArray,
          msg.getSubscriberId.toByteArray,
          msg.getOpenedDateUtc.toTimestamp
        )
    ).toList
}
Firstly, as shown in the above code, we need the ability to parse the payload from the event stream. The EmailOpened type is generated from a standard serialization format (e.g. Protobuf, Avro or Thrift).
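The msg.getOpenedDateUtc.toTimestamp call above relies on a small conversion helper that isn't shown. Assuming the payload is Protobuf and the field is a google.protobuf.Timestamp, a minimal sketch of such a helper (the name ProtoTimestampOps is illustrative) could be:
import java.sql.Timestamp

// Hypothetical helper assumed by the snippet above: converts a Protobuf
// google.protobuf.Timestamp into the java.sql.Timestamp that Spark works with
implicit class ProtoTimestampOps(ts: com.google.protobuf.Timestamp) {
  def toTimestamp: Timestamp =
    new Timestamp(ts.getSeconds * 1000L + ts.getNanos / 1000000L)
}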

2. Transforming the messages into aggregated hourly buckets
def transformEmailOpened(ds: Dataset[EmailAggregateEvent]): Dataset[EmailAggregateData] = {
  import ds.sparkSession.implicits._
  import org.apache.spark.sql.functions.{col, count, window}

  ds.withWatermark("timestamp", "24 hours")
    .groupBy(
      col("emailId"),
      window(col("timestamp"), "1 hour")
    )
    .agg(count("*").as("eventCount"))
    .withColumn("timestampBucket", col("window.start"))
    .as[EmailAggregateData]
}
As shown above, a typical way to work with streams is to aggregate the number of events by timestamp into hourly buckets, which are then typed as EmailAggregateData. The window operations section of the Structured Streaming programming guide provides further detail on this type of implementation.
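The EmailAggregateData type itself is not shown in these snippets; based on the columns produced above, a minimal sketch of the shape it is assumed to have would be:
import java.sql.Timestamp

// Assumed shape of the hourly aggregate, matching the columns selected above
case class EmailAggregateData(
  emailId:         Array[Byte],
  timestampBucket: Timestamp,
  eventCount:      Long
)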

3. Starting the process
def process(topic: String, sql: String): Unit = {
  val spark = SparkSession.builder.getOrCreate
  import spark.implicits._
  import org.apache.spark.sql.streaming.Trigger

  val stream = spark
    .readEventStream(config.brokers, topic)
    .flatMap(getEmailOpenedEvent(_))

  val query = transformEmailOpened(stream)
      .writeStream
      .foreach(databaseWriter(sql))
      .trigger(Trigger.ProcessingTime("5 minutes"))
      .start

  query.awaitTermination
}
Lastly, we need to start the process of consuming the stream and writing to our data store. The readEventStream() function is an abstraction over SparkSession.readStream() for consuming from an event source (Apache Kafka, Amazon Kinesis, Azure Event Hubs, etc.). The databaseWriter() function creates a JDBC connection to execute SQL upsert commands.
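Neither helper is shown in full here. A minimal sketch of what they might look like, assuming Kafka as the event source and a PostgreSQL data store reached over JDBC (the object name StreamingHelpers, the connection string and the upsert parameter order are all illustrative), could be:
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.{Dataset, Encoders, ForeachWriter, SparkSession}

object StreamingHelpers {

  // Wraps SparkSession.readStream for a Kafka topic and exposes the raw
  // message value as Array[Byte] for the consume functions to parse
  implicit class EventStreamReader(spark: SparkSession) {
    def readEventStream(brokers: String, topic: String): Dataset[Array[Byte]] =
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topic)
        .load()
        .select("value")
        .as[Array[Byte]](Encoders.BINARY)
  }

  // Opens a JDBC connection per partition and runs the given upsert SQL
  // for every aggregated row; the parameter order is illustrative
  def databaseWriter(sql: String): ForeachWriter[EmailAggregateData] =
    new ForeachWriter[EmailAggregateData] {
      var connection: Connection = _

      def open(partitionId: Long, epochId: Long): Boolean = {
        // the connection string would normally come from configuration
        connection = DriverManager.getConnection("jdbc:postgresql://localhost/analytics")
        true
      }

      def process(data: EmailAggregateData): Unit = {
        val statement = connection.prepareStatement(sql)
        statement.setBytes(1, data.emailId)
        statement.setTimestamp(2, data.timestampBucket)
        statement.setLong(3, data.eventCount)
        statement.executeUpdate()
        statement.close()
      }

      def close(errorOrNull: Throwable): Unit =
        if (connection != null) connection.close()
    }
}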

There is an issue with this approach however.

If it is not already obvious, this code can quickly grow: similar code has to be duplicated for every event we consume and aggregate. Moreover, each of these jobs would need its own EMR cluster, so the code needs to be consolidated into a single job.

Consolidating repeated code using generics

Since Scala blends functional programming with Java's familiar object-oriented concepts, leveraging base classes can provide great advantages.
class Event extends Product {
  // base functions, including concrete implementations of the Product
  // members (productArity, productElement, canEqual) so the class compiles
}

case class EmailAggregateEvent(
  emailId:      Array[Byte],
  subscriberId: Array[Byte],
  timestamp:    Timestamp
) extends Event {
  // override functions
}
There is one important note to consider about the above inheritance. Although Scala is flexible in enabling object-oriented programming techniques, we are still dealing with the caveats of Spark, namely that objects must be serializable so they can be passed between executors. Attempting to use a trait as the base type will actually throw an error:
Error: scala.ScalaReflectionException: <none> is not a term
This is because Spark simply cannot serialize a non-concrete type, and the runtime is not (yet) clever enough to follow the inheritance, so it attempts to serialize the trait itself, which is impossible.

Moreover, when using a concrete class as the base class, it needs to extend the Product trait. This is because a case class automatically extends Product, which is what allows Spark to recognize complex types at runtime for serialization.
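Only the event side is shown above; the aggregated output types need the same treatment, so a matching base class for them is assumed, along the lines of:
// Assumed counterpart for the aggregated output types; EmailAggregateData
// (and any other aggregate type) would extend it in the same way
class Data extends Product {
  // base functions, again including the Product member implementations
}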

Inheritance will enable us to achieve the following:
case class Job[T <: Event, U <: Data](
    topic:     String,
    consume:   Array[Byte] => List[T],
    transform: Dataset[T] => Dataset[U],
    sql:       String
)
The Job type can now stand in for any job-specific implementation in the process() function as follows:
def process[T <: Event, U <: Data](job: Job[T, U]): Unit = {
  val spark = SparkSession.builder.getOrCreate
  import spark.implicits._
  import org.apache.spark.sql.streaming.Trigger

  val stream = spark
    .readEventStream(config.brokers, job.topic)
    .flatMap(job.consume(_))

  val query = job.transform(stream)
      .writeStream
      .foreach(databaseWriter(job.sql))
      .trigger(Trigger.ProcessingTime("5 minutes"))
      .start

  query.awaitTermination
}
Unfortunately this will still not be enough.

In the original transformEmailOpened() function above, there are implicit conversions which no longer work when using inheritance. The Spark API provides an easy-to-use method called as[T], which returns a new Dataset where each record is mapped to the given type T by matching column names to field names. At runtime Spark needs some explicit help to understand which type is required, so the transformEmailOpened() function needs to change as follows:
def transformEmailOpened(ds: Dataset[EmailAggregateEvent]): Dataset[EmailAggregateData] = {
  import ds.sparkSession.implicits._
  import org.apache.spark.sql.functions.{col, count, window}

  ds
    // explicitly map to concrete type
    .map(row => EmailAggregateEvent(
      row.emailId,
      row.subscriberId,
      row.timestamp
    ))
    .withWatermark("timestamp", "24 hours")
    .groupBy(
      col("emailId"),
      window(col("timestamp"), "1 hour")
    )
    .agg(count("*").as("eventCount"))
    .withColumn("timestampBucket", col("window.start"))
    // replace as[T] with explicit map
    .map(row => EmailAggregateData(
      row.getAs("emailId"),
      row.getAs("timestampBucket"),
      row.getAs("eventCount")
    ))
}
Inheritance now works at runtime and enables our code to scale as more events need to be consumed. Adding multiple jobs becomes relatively easy.
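For example, the list of jobs could be declared as follows (the topic name and upsert SQL are purely illustrative):
// Illustrative only: topic names and SQL statements are placeholders
val jobs = List(
  Job(
    topic     = "email-opened",
    consume   = getEmailOpenedEvent,
    transform = transformEmailOpened,
    sql       = "INSERT INTO email_opened_hourly VALUES (?, ?, ?) " +
                "ON CONFLICT (email_id, timestamp_bucket) DO UPDATE SET event_count = EXCLUDED.event_count"
  )
  // further Job entries for other events would be added here
)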

However, we cannot simply write a jobs.foreach(process) loop and expect it to work, since it will never iterate past the first streaming job (each call blocks on awaitTermination). This is where concurrency plays an important role in the overall solution.

Concurrency

Enabling parallel programming in Scala is very simple. The .par method converts a collection into a parallel collection, allowing it to be processed by multiple threads in an extremely concise way. We can take advantage of this by wrapping the entire process() function in a parallel foreach() loop as follows:
jobs.par.foreach(job => {
  process(job)
})
There is one thing that hasn't been considered with the above code though. The SparkSession used within the process() function is effectively a singleton: only one is active per Spark application, and getOrCreate always returns that same session. However, that doesn't mean we cannot start multiple streaming queries from the same SparkSession! Calling readEventStream() multiple times in parallel against the same session is not a problem, as each writeStream.start() launches its own independent streaming query.

In terms of concurrency though, .par is a multi-threaded implementation backed by a thread pool whose default size is the number of available processors (physical or hyper-threaded cores). This means the multi-threaded approach only works if the number of jobs required is no greater than the number of processors available to the driver. To find out how many cores are available (and essentially how many jobs can run side by side), the following value can be printed:
Runtime.getRuntime.availableProcessors
The problem with this solution is cost. Considering that each job may only utilize 5% or less of a CPU unit, we may end up with more cores in the EMR cluster than necessary. As an example, running 12 jobs concurrently means the next instance size up to support this is m4.4xlarge, which has 16 cores. That leaves a total of 4 cores sitting idle, never to be used. Moreover, each used core may also be underutilized.

This makes asynchronous programming more relevant than a purely multi-threaded approach, and Scala's Futures come in handy with only a few simple changes:
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(maxConcurrentJobs))

val tasks: List[Future[Unit]] = jobs
  .map(job =>
    Future {
      process(job)
    }(executionContext)
  )

Await.result(Future.sequence(tasks)(implicitly, executionContext), Duration.Inf)
With this minimal code change we gain significant cost reductions. By default the thread pool size (i.e. the maximum number of concurrent runs) is the number of processors, just like the multi-threaded approach. This is where the custom executionContext comes in handy: it explicitly sets the number of concurrent executions via the maxConcurrentJobs variable. This number can be as large as necessary to handle all the jobs across the entire EMR cluster, regardless of how many cores are provisioned, so the cluster can now be sized by the resource allocation each individual job actually needs rather than by the number of jobs.

Conclusion

Even with EMR clusters only running one submitted job at a time, and with only one SparkSession object existing per Spark application, there are still efficient ways to work around these limitations.

Jobs can be refactored to consolidate repeated code, which in turn makes concurrent processing possible. This can significantly reduce infrastructure costs while keeping the code base clean and extensible.
