Styles

Monday, October 30, 2017

Aggregating Event Streams in Redshift using Matillion

If you haven’t heard of Matillion, then it’s time you knew about it. If you’ve used ETL tools in the past like the traditional SSIS for Microsoft’s SQL Server, then Matillion would be very familiar to you. Where SSIS was tailored for SQL Server, Matillion is tailored for Amazon’s Data Warehouse offering, AWS Redshift.




Matillion’s user interface is very intuitive as it groups all its components into three natural categories: those for extract, those for transform, and those for load.

Since Matillion sits on top of Redshift, both must live within AWS, so the features for extracting include components for RDS (as well as any other relational databases that support JDBC drivers). It also supports DynamoDB, S3, as well as components for MongoDB, Cassandra and Couchbase.


Components used for transformation provide the ability to aggregate, filter, rank, and even roll up sums over multiple records from a transient “staging” Redshift table before loading the result into a more permanent Redshift table. The underlying functions for these components are in fact Redshift query commands.

Matillion seems quite mature with countless component features at your fingertips. Much of the transformation logic you can construct can definitely be written by hand using Redshift’s comprehensive documentation on ANSI SQL commands, but what Matillion can provide is a visual representation which significantly improves collaboration amongst team members so they can all be equally responsible for maintaining the ETL process in an efficient way.

Many could be forgiven for associating an ETL process with the old techniques of extracting large amounts of data from a monolithic data store to push into a read store using the typical CQRS architecture, but Matillion can be used for much more than that.

Our use-case at Campaign Monitor not only required data from a snapshot-style structure as is found in a traditional monolithic relational database, but also data that is sourced from an event stream for a rolling daily aggregation.

In order to achieve this we needed an initial query to obtain the total snapshot on a particular day, which can then be stored in a “roll-up” table in Redshift as the first day of aggregated totals. Let’s take the following table as an example snapshot that can be the baseline for any future events:

ListID  Subscribers  Date
1       1000         2017-10-01
2       2000         2017-10-01
3       3000         2017-10-01
4       4000         2017-10-01

This can be done using a traditional ETL job in Matillion as follows:



There is a lot of sophisticated magic under the hood with regards to the Database Query component. The SQL query that is run from the relational database will return a resultset that Matillion can chunk into separate concurrent processes. Each of these processes will create an underlying S3 file before loading the data into the “rs_lists_subscriber_rollup_staging” table in Redshift.

Once we have a snapshot on a particular date (in this case 2017-10-01 in the example above) all events for subscribers that come in after this date can be streamed from our event source, and then aggregated into the same table with a rolling count of subscribers for subsequent dates.

This requires some form of consumption externally to receive the incoming events, which can be batched into S3 files, and then persisted into an ephemeral “raw” table in Redshift using the COPY command. This raw table would look as follows:

ListID  SubscriberID  Subscribe Tally  Date
1       1             1                2017-10-02
1       2             -1               2017-10-02
3       1             1                2017-10-02
2       1             1                2017-10-02
2       2             1                2017-10-02
2       3             1                2017-10-02
4       1             -1               2017-10-02
4       2             -1               2017-10-02
4       3             -1               2017-10-02
4       4             1                2017-10-02
4       1             1                2017-10-03
4       5             1                2017-10-03
4       6             1                2017-10-03
4       7             1                2017-10-03

Generally, a contact subscribes or unsubscribes at a specific time from a list, and this will need to be aggregated daily per list so the data above would look as follows:

ListID  Subscribers  Date
1       0            2017-10-02
2       3            2017-10-02
3       1            2017-10-02
4       -2           2017-10-02
4       4            2017-10-03
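
Written by hand, the daily aggregation above is a SUM with a GROUP BY. The following is only a minimal sketch: it uses Python's built-in sqlite3 module as a stand-in for Redshift, and the table and column names (raw_events, list_id, subscriber_id, tally, event_date) are made up for illustration.

```python
import sqlite3

# Raw events from the table above: (list_id, subscriber_id, tally, event_date).
RAW_EVENTS = [
    (1, 1, 1, "2017-10-02"), (1, 2, -1, "2017-10-02"),
    (3, 1, 1, "2017-10-02"),
    (2, 1, 1, "2017-10-02"), (2, 2, 1, "2017-10-02"), (2, 3, 1, "2017-10-02"),
    (4, 1, -1, "2017-10-02"), (4, 2, -1, "2017-10-02"),
    (4, 3, -1, "2017-10-02"), (4, 4, 1, "2017-10-02"),
    (4, 1, 1, "2017-10-03"), (4, 5, 1, "2017-10-03"),
    (4, 6, 1, "2017-10-03"), (4, 7, 1, "2017-10-03"),
]


def daily_deltas(events):
    """Sum the subscribe tally per list and per day."""
    conn = sqlite3.connect(":memory:")  # SQLite stands in for Redshift here
    conn.execute(
        "CREATE TABLE raw_events "
        "(list_id INTEGER, subscriber_id INTEGER, tally INTEGER, event_date TEXT)"
    )
    conn.executemany("INSERT INTO raw_events VALUES (?, ?, ?, ?)", events)
    rows = conn.execute(
        """
        SELECT list_id, SUM(tally) AS subscribers, event_date
        FROM raw_events
        GROUP BY list_id, event_date
        ORDER BY event_date, list_id
        """
    ).fetchall()
    conn.close()
    return rows


for row in daily_deltas(RAW_EVENTS):
    print(row)
```

Each printed row is one (ListID, Subscribers, Date) line of the aggregated table.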

This aggregation can also be done in Matillion very easily from the raw table:


The Aggregate component would simply need to sum the Subscribe Tally column, grouped by ListID and Date. Once this is achieved, the result needs to be unioned with the existing snapshot data so that there is a baseline to work with for rolling counts. Only then can the very powerful Window Calculation component be used for rolling up the sum of subscribers based on a partition (in this case the ListID) and a partition ordering (in this case Date). The Matillion job would look as follows:


There are further properties for the Window Calculation component that need to be considered, including setting the lower bound to “unbounded preceding”. This ensures that the calculated sum is rolled up from the very first record of each partition in the dataset that was unioned by the previous component. The properties would look as follows:
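
Expressed as a query, the union followed by the window calculation is roughly a SUM over a window partitioned by list, ordered by date, and bounded below by “unbounded preceding”. This is a sketch under assumptions rather than the SQL Matillion actually generates: sqlite3 (SQLite 3.25 or later, for window functions) stands in for Redshift, and the table names snapshot and daily_deltas are hypothetical.

```python
import sqlite3

# Baseline snapshot and daily aggregates from the tables above:
# (list_id, subscribers, day).
SNAPSHOT = [
    (1, 1000, "2017-10-01"), (2, 2000, "2017-10-01"),
    (3, 3000, "2017-10-01"), (4, 4000, "2017-10-01"),
]
DAILY_DELTAS = [
    (1, 0, "2017-10-02"), (2, 3, "2017-10-02"), (3, 1, "2017-10-02"),
    (4, -2, "2017-10-02"), (4, 4, "2017-10-03"),
]


def rolling_totals(snapshot, deltas):
    """Union the baseline with the daily deltas and compute a running sum."""
    conn = sqlite3.connect(":memory:")  # SQLite stands in for Redshift here
    for table in ("snapshot", "daily_deltas"):
        conn.execute(
            f"CREATE TABLE {table} (list_id INTEGER, subscribers INTEGER, day TEXT)"
        )
    conn.executemany("INSERT INTO snapshot VALUES (?, ?, ?)", snapshot)
    conn.executemany("INSERT INTO daily_deltas VALUES (?, ?, ?)", deltas)
    rows = conn.execute(
        """
        SELECT list_id,
               SUM(subscribers) OVER (
                   PARTITION BY list_id
                   ORDER BY day
                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
               ) AS subscribers,
               day
        FROM (
            SELECT list_id, subscribers, day FROM snapshot
            UNION ALL
            SELECT list_id, subscribers, day FROM daily_deltas
        ) AS unioned
        ORDER BY day, list_id
        """
    ).fetchall()
    conn.close()
    return rows


for row in rolling_totals(SNAPSHOT, DAILY_DELTAS):
    print(row)
```

Note that this only produces a row where a list has a snapshot or at least one event on a given day, so lists 1, 2 and 3 get no row for 2017-10-03.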


There are of course other edge cases to consider, and these could slowly make your transformation job increasingly complex. They include lists that were created after the snapshot was taken and, more interestingly, days on which a list receives no events: these need to be populated with a default of zero so that the list is still included in the roll-up for that day. Thankfully Matillion makes it simple to build on the above transformation so it can be easily maintained.
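
One possible way to handle the missing days, sketched here by hand rather than taken from the actual Matillion job, is to build every (list, day) combination, default the missing deltas to zero, and only then apply the running sum. As before, sqlite3 (3.25 or later) stands in for Redshift and all table and column names are assumptions. The set of days is derived from the data itself, so a day with no events for any list would still be missing; a proper calendar table would be needed for that. A list created after the snapshot would simply show zero until its first event.

```python
import sqlite3

# Baseline snapshot and daily aggregates: (list_id, subscribers, day).
SNAPSHOT = [
    (1, 1000, "2017-10-01"), (2, 2000, "2017-10-01"),
    (3, 3000, "2017-10-01"), (4, 4000, "2017-10-01"),
]
DAILY_DELTAS = [
    (1, 0, "2017-10-02"), (2, 3, "2017-10-02"), (3, 1, "2017-10-02"),
    (4, -2, "2017-10-02"), (4, 4, "2017-10-03"),
]

ROLLUP_SQL = """
WITH unioned AS (
    SELECT list_id, subscribers, day FROM snapshot
    UNION ALL
    SELECT list_id, subscribers, day FROM daily_deltas
),
lists AS (SELECT DISTINCT list_id FROM unioned),
days AS (SELECT DISTINCT day FROM unioned),
filled AS (
    -- Every list gets a row for every day; days without events count as 0.
    SELECT l.list_id, d.day, COALESCE(SUM(u.subscribers), 0) AS delta
    FROM lists AS l
    CROSS JOIN days AS d
    LEFT JOIN unioned AS u ON u.list_id = l.list_id AND u.day = d.day
    GROUP BY l.list_id, d.day
)
SELECT list_id,
       SUM(delta) OVER (
           PARTITION BY list_id
           ORDER BY day
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
       ) AS subscribers,
       day
FROM filled
ORDER BY day, list_id
"""


def rollup_with_gaps_filled(snapshot, deltas):
    """Running subscriber totals with a row for every list on every day."""
    conn = sqlite3.connect(":memory:")  # SQLite stands in for Redshift here
    for table in ("snapshot", "daily_deltas"):
        conn.execute(
            f"CREATE TABLE {table} (list_id INTEGER, subscribers INTEGER, day TEXT)"
        )
    conn.executemany("INSERT INTO snapshot VALUES (?, ?, ?)", snapshot)
    conn.executemany("INSERT INTO daily_deltas VALUES (?, ?, ?)", deltas)
    rows = conn.execute(ROLLUP_SQL).fetchall()
    conn.close()
    return rows


for row in rollup_with_gaps_filled(SNAPSHOT, DAILY_DELTAS):
    print(row)
```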

The final table would then look as follows:

ListID  Subscribers  Date
1       1000         2017-10-01
2       2000         2017-10-01
3       3000         2017-10-01
4       4000         2017-10-01
1       1000         2017-10-02
2       2003         2017-10-02
3       3001         2017-10-02
4       3998         2017-10-02
1       1000         2017-10-03
2       2003         2017-10-03
3       3001         2017-10-03
4       4002         2017-10-03

And this can then be used when reporting over date ranges from your front end reporting application.

This example is just one of many use-cases where Matillion has helped Campaign Monitor fulfill its ETL requirements in order to provide reporting on large datasets. In particular, event-based data has helped significantly improve SLAs, since data can be aggregated within minutes, whereas traditional relational data ETLs process large queries over many hours and refresh report data far less frequently.

Wednesday, September 13, 2017

Gotchas with Altering Table Schemas in Redshift

As I learned recently the hard way, simply altering tables in Redshift to perform a simple operation of adding columns doesn't behave the same way in a data warehouse as it does in regular relational databases.

Running the following SQL statement in Redshift actually has a lot more issues than you would expect:

ALTER TABLE rs_table ADD COLUMN new_column numeric(10);

Despite the simple nature of this statement, many issues begin to emerge.

One of these is table locking. Queries from an application connecting via JDBC queue up behind the lock, and because one Redshift cluster only allows 500 concurrent connections as per their online documentation, that limit can be reached quickly.

Moreover, a number of duplicate records appeared when inserts occurred on a matching key, instead of the existing record being updated or the old record deleted. This was most likely because the table was in the process of being altered during the insert, so the key match could not be found.

This meant that an alternative approach was needed in order to avoid the locking and concurrency issues.
The following steps were tried and tested, and proved to work well:
  • Create the new table with the new schema suffixed with _new
  • Copy data from the old table to the new table
  • Rename old table suffixed with _old
  • Rename new table removing _new suffix
This is done in SQL as follows:

-- Create new table with new schema
CREATE TABLE rs_table_new
(
    id numeric(10),
    name varchar(250),
    date timestamp,
    new_column numeric(10)
) SORTKEY(id, date);
-- Copy data from old table to new
INSERT INTO rs_table_new
SELECT
    id,
    name,
    date,
    NULL
FROM rs_table;
-- Rename table
ALTER TABLE rs_table RENAME TO rs_table_old;
ALTER TABLE rs_table_new RENAME TO rs_table;
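
If the steps are scripted, it can be worth checking that the copy is complete before the tables are swapped. The sketch below is an assumption-laden illustration and not part of the original process: it runs the same statements through Python's sqlite3 module as a stand-in for Redshift (so the SORTKEY clause is omitted), and the row-count check and the function name swap_in_new_schema are hypothetical additions.

```python
import sqlite3


def swap_in_new_schema(conn):
    """Create rs_table_new, copy the rows, verify the count, then rename."""
    conn.execute(
        "CREATE TABLE rs_table_new "
        "(id NUMERIC(10), name VARCHAR(250), date TIMESTAMP, new_column NUMERIC(10))"
    )
    conn.execute(
        "INSERT INTO rs_table_new (id, name, date, new_column) "
        "SELECT id, name, date, NULL FROM rs_table"
    )
    # Hypothetical safety check: refuse to swap if any rows went missing.
    old_count = conn.execute("SELECT COUNT(*) FROM rs_table").fetchone()[0]
    new_count = conn.execute("SELECT COUNT(*) FROM rs_table_new").fetchone()[0]
    if old_count != new_count:
        raise RuntimeError(f"row count mismatch: {old_count} != {new_count}")
    conn.execute("ALTER TABLE rs_table RENAME TO rs_table_old")
    conn.execute("ALTER TABLE rs_table_new RENAME TO rs_table")
    conn.commit()
    return new_count


# Example run against an in-memory database with two sample rows.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rs_table (id NUMERIC(10), name VARCHAR(250), date TIMESTAMP)"
)
conn.executemany(
    "INSERT INTO rs_table VALUES (?, ?, ?)",
    [(1, "a", "2017-09-13 00:00:00"), (2, "b", "2017-09-13 00:00:01")],
)
copied = swap_in_new_schema(conn)
columns = [row[1] for row in conn.execute("PRAGMA table_info(rs_table)")]
print(copied, columns)
```

After the swap, rs_table carries the new column and rs_table_old keeps the original rows until it is safe to drop.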

Amazingly, for a table with over 10 million records the total execution time was under 2 minutes, as opposed to the initial 2 hours.

Monday, June 13, 2016

Don't Always Believe Your Data

As erroneous as that statement might seem, not trusting your data might be the most prudent thing you do. In this new age of data-driven development where data has become a precious commodity, the need to unlock valuable information that businesses hold about customer interactions has become a crucial part of business success, and hence the investment for better ways to store and extract data has evolved over the last few years more than it ever had over the previous two decades.

The options are endless now with experts recommending vast arrays of strategies to tackle your storage and analysis techniques. But it is more than just the sum of all knowledge about these alternatives that makes you a Data Scientist. The reason we store data in the first place is to provide information that can remove ambiguity in our decision making process. That requires a broader outlook towards investigating industry trends and changes in technology on top of our expertise on how well we persist and describe our business data.

There was a time when relational data stores ruled the earth and with them came the ability to analyse large amounts of data using OLAP cubes. But as the need grew for a more efficient Extract Transform Load (ETL) process the limitations became obvious. Analysing records that were aggregated nightly because there was “too much data” to extract was no longer a legitimate excuse. Businesses want it now, and they want a lot of it.

With that, storage capabilities evolved to scale horizontally with the map-reduce patterns of document stores like MongoDB, key-value stores like Riak, graph stores like Neo4j, and columnar stores like Cassandra. Add into the mix the likes of Big Data storage such as Hadoop, and Event Streams for real time processing like Kafka and Amazon Kinesis. Although competition is always a welcome dynamic in any industry, with every contender vying for feature capabilities over their competitors, the choice becomes difficult.

Storage is just one part of the puzzle however. Next would come analysis and forecast. Once a business makes sense of the data it holds with a myriad of techniques for analysis, predicting change comes packaged for us with Machine Learning. Apache Spark jumped on this wagon early commercially with its MLlib offering and Google has also been a popular choice with TensorFlow. There is a degree of expertise required in handling these tools including understanding the type of learning you want your machine to perform, whether it's supervised learning using labeled datasets (for fraud detection and recommendations), unsupervised and semi-supervised learning with unlabeled datasets (for image and voice recognition), or reinforcement learning (for artificial intelligence).

There is more to it than knowing your alternatives though. Your data can only tell you as much information as your business keeps about itself, and so Data Science is not just about how to unlock your data with the available tools, it's about causation. It's about why.

Identifying peaks or troughs in a graph confirms that your tools can help you see patterns or changes, but it does not give the reasons why these patterns and changes exist. And thus starts the real investigation.

This was a lesson that Andrea Burbank from Pinterest learnt as she explained at the YOW! Conference held in Sydney late 2016, and she was kind enough to share her challenges in trying to successfully formulate behavioral trends from Pinterest's massive data storage. The results of her findings were very interesting.

By measuring "daily active users", Pinterest were able to determine which unique customers engaged on their site. Using Kafka, they were able to stream logs of response data that told the user's story, but that was just basic counting.

In late October of 2013, they found a sudden step change in growth rate specifically with iPhone users, which seemed likely to be the result of Pinterest being featured in the App Store. What they found, though, was that this was merely a coincidence: not long before, iOS 7 had introduced a new feature called "Background App Refresh" to prevent apps hanging in suspended animation.

Burbank realized that the statistics they generated weren't telling the full story and after digging further found that one of their endpoints, particularly the home feed endpoint, was giving unnatural background hits even when the app wasn't appearing in the multitask tray, and that obviously had no correlation to daily active users.

This led to what Burbank defined as the "Spectrum of Certainty" where correlation is found at one end of the spectrum while causation is found at the other. Most of the time Data Scientists believe they are closer to the causation end of the spectrum, when in actual fact they sit very close to the correlation side.

The goal is to ensure data analysis moves towards causal inference, which in effect requires many more investigative techniques for events that occur outside the business domain but may have a strong impact on how data is represented within the business itself. Only then can you truly believe your data to a certain degree.

Reference: Data science as software, Andrea Burbank, Pinterest, YOW! Conference 2016 Sydney