Monday, October 30, 2017

Aggregating Event Streams in Redshift using Matillion

If you haven’t heard of Matillion, then it’s time you knew about it. If you’ve used ETL tools in the past like the traditional SSIS for Microsoft’s SQL Server, then Matillion would be very familiar to you. Where SSIS was tailored for SQL Server, Matillion is tailored for Amazon’s Data Warehouse offering, AWS Redshift.

Matillion’s user interface is very intuitive as it groups all its components into three natural categories: those for extract, those for transform, and those for load.

Since Matillion sits on top of Redshift, both must live within AWS, so the features for extracting include components for RDS (as well as any other relational databases that support JDBC drivers). It also supports DynamoDB, S3, as well as components for MongoDB, Cassandra and Couchbase.

Components used for transformation provide the ability to aggregate, filter, rank, and even to roll up sums over multiple records from a transient “staging” Redshift table to load it to a more permanent Redshift table. The underlying functions for these components are in fact Redshift query commands.

Matillion seems quite mature with countless component features at your fingertips. Much of the transformation logic you can construct can definitely be written by hand using Redshift’s comprehensive documentation on ANSI SQL commands, but what Matillion can provide is a visual representation which significantly improves collaboration amongst team members so they can all be equally responsible for maintaining the ETL process in an efficient way.

Many could be forgiven for associating an ETL process with the old techniques of extracting large amounts of data from a monolithic data store to push into a read store using the typical CQRS architecture, but Matillion can be used for much more than that.

Our use-case at Campaign Monitor not only required data from a snapshot-style structure as is found in a traditional monolithic relational database, but also data that is sourced from an event stream for a rolling daily aggregation.

In order to achieve this we needed an initial query to obtain the total snapshot on a particular day, which can then be stored in a “roll-up” table in Redshift as the first day of aggregated totals. Let’s take the following table as an example snapshot that can be the baseline for any future events:


This can be done using a traditional ETL job in Matillion as follows:

There is a lot of sophisticated magic under the hood with regards to the Database Query component. The SQL query that is run from the relational database will return a resultset that Matillion can chunk into separate concurrent processes. Each of these processes will create an underlying S3 file before loading the data into the “rs_lists_subscriber_rollup_staging” table in Redshift.

Once we have a snapshot on a particular date (in this case 2017–10–01 in the example above) all events for subscribers that come in after this date can be streamed from our event source, and then aggregated into the same table with a rolling count of subscribers for subsequent dates.

This requires some form of consumption externally to receive the incoming events, which can be batched into S3 files, and then persisted into an ephemeral “raw” table in Redshift using the COPY command. This raw table would look as follows:

ListIDSubscriberIDSubscribe TallyDate

Generally, a contact subscribes or unsubscribes at a specific time from a list, and this will need to be aggregated daily per list so the data above would look as follows:


This aggregation can also be done in Matillion very easily from the raw table:

The Aggregate component would simply need to sum the Subscribe/Unsubscribe column with a grouping of ListID. Once this is achieved, there will need to be a union of data from the existing snapshot data so that there can be a baseline to work with for rolling counts. Only then can the very powerful Window Calculation component be used for rolling up the sum of subscribers based on a partition (in this case the ListID) and a partition ordering (in this case Date). The Matillion job would look as follows:

There are further properties for the Windows Calculation component that need to be considered, including setting the lower bound to “unbounded preceding”. This ensures that the calculated sum is rolled up from the very first record of the dataset that was unioned from the previous component. The properties would look as follows:

There are of course other edge cases that will need to be considered that could slowly make your transformation job increasingly complex. These can include lists that were created after the snapshot was taken, or even more interesting is catering for missing days that receive no events which will need to be populated with a default of zero so aggregation can be included in the roll-up for that day. Thankfully Matillion makes it simple to grow on the above transformation so it can be easily maintained.

The final table would then look as follows:


And this can then be used when reporting over date ranges from your front end reporting application.

This example is just one of many use-cases where Matillion has helped Campaign Monitor fulfill its ETL requirements in order to provide reporting on large datasets. In particular, event based data has helped significantly improve SLAs since the frequency of aggregating data can be done within minutes compared to that of the traditional relational data ETLs that process large queries over many hours and provide much lower refresh rates for report data.

Wednesday, September 13, 2017

Gotchas with Altering Table Schemas in Redshift

As I learned recently the hard way, simply altering tables in Redshift to perform a simple operation of adding columns doesn't behave the same way in a data warehouse as it does in regular relational databases.

Running the following SQL statement in Redshift actually has a lot more issues than you would expect:

ALTER TABLE rs_table ADD COLUMN new_column;

Despite the simple nature of this statement, many issues begin to emerge.

One of these is table locking. If an application is connecting to it via JDBC, connections would end up being queued because one Redshift cluster only allows 500 concurrent connections as per their online documentation.

Moreover, it was noticed that there were a number of duplicate records when inserts occurred on a matching key instead of updating the existing record or deleting the old record. This was most likely due to the fact that it was in the process of being altered during an insert and the key match could not be found.

This meant that an alternative approach was needed in order to prevent the locking and concurrency.
The following steps were tried and tested, and proved to work well:
  • Create the new table with the new schema suffixed with _new
  • Copy data from the old table to the new table
  • Rename old table suffixed with _old
  • Rename new table removing _new suffix
This is done in SQL as follows:

-- Create new table with new schema
CREATE TABLE rs_table_new
    id numeric(10),
    name varchar(250),
    date timestamp,
    new_column numeric(10)
) SORTKEY(dbid, date, clientid);
-- Copy data from old table to new
INSERT INTO rs_table_new
FROM rs_table;
-- Rename table
ALTER TABLE rs_table RENAME TO rs_table_old;
ALTER TABLE rs_table_new RENAME TO rs_table;

Amazingly, the total execution time for a table with over 10 million records completed in under 2 minutes as opposed to the initial 2 hours.