When we launched the Beta version of our business intelligence tool, Verto Monitor, we had significant issues with query performance. In the worst cases, customers had to wait minutes for the graphs to appear on their screens. In this blog post we explain what was causing the performance issues and what we did to overcome them.
For the tech-minded, here’s a quick overview of Verto Monitor:
- We use Amazon Redshift as the database for Verto Monitor. Our customers access the data via a web-based dashboard.
- The dashboard’s API calls are processed by a Java application, which dynamically generates complex SQL queries against the Redshift database.
The core functionality of the monitor is to give users insight into true unduplicated multi-screen audience measurement data. This means that the monitor executes complex queries on raw session-level data of the panelists’ activities. It is important to note that the monitor is an end-user facing solution that we expect a multitude of users to access, not just a single backend big data solution, which means we need to emphasize query response times in a very dynamic setting.
Naturally, all the panelist data has been fully anonymized, and any personally identifiable information is stored in a separate location that is inaccessible to monitor users. Additionally, the design of the monitor prevents extraction of individual data records; in other words, any results provided by the monitor are aggregates that combine data from multiple panelists, to ensure both anonymity and data stability. The weights generated in our calibration are applied online, when the data is analyzed.
Addressing Query Performance
Figure 1: Original Database Schema
We started with a traditional normalized database schema, where the main design goal was to conserve disk space by avoiding data duplication.
We had multiple fact tables, which stored event-level data about actions observed on our panelists’ devices. The fact tables had a number of common columns, which were stored in a separate table called events. The fact tables had a one-to-one relationship to the events table, which was very large, because it had one row for each event observed on our anonymized panelists’ devices.
The fact tables had a many-to-one relationship to another large table called metadata_structure, which contained metadata about the observed events, including application names, website URLs, etc.
The events table had a many-to-one relationship to a table called devices, which stored data about the devices on which the events had occurred. The devices table had a many-to-one relationship to the panelists table, which contained data about the panelists who owned the devices. The panelists table referred to the demographics table, which contained data about the panelists’ demographics, and to the weights table, which contained weighting information about the panelists.
The distribution keys and sort keys were chosen to optimize the largest join, which is the join between the fact table and the events table. The panelists, demographics, weights and devices tables were relatively small, so we chose to use the ALL distribution style for them to avoid unnecessary data redistributions.
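To make the two distribution styles concrete, here is a toy Python model, not Redshift code. It assumes a four-node cluster and hypothetical column names (event_id, device_id, metadata_id), and it places rows by a simple modulo instead of Redshift's internal hashing. It only illustrates why a join on a shared distribution key, or against an ALL-distributed table, can run without moving rows between nodes.

```python
from collections import defaultdict

NODES = 4  # assumed cluster size, for illustration only


def distribute_by_key(rows, key, nodes=NODES):
    """KEY style: a row lives on the node selected by its key value."""
    placement = defaultdict(list)
    for row in rows:
        placement[row[key] % nodes].append(row)  # modulo stands in for hashing
    return placement


def distribute_all(rows, nodes=NODES):
    """ALL style: every node holds a full copy of the table."""
    return {node: list(rows) for node in range(nodes)}


def rows_needing_transfer(left, right, key):
    """Count left-side rows whose join partner is not on the same node."""
    moved = 0
    for node, rows in left.items():
        local_keys = {r[key] for r in right.get(node, [])}
        moved += sum(1 for r in rows if r[key] not in local_keys)
    return moved


# Made-up rows with hypothetical column names.
events = [{"event_id": i, "device_id": i % 3} for i in range(20)]
facts = [{"event_id": i, "metadata_id": i % 5} for i in range(20)]
devices = [{"device_id": d} for d in range(3)]

# Fact and events share the distribution key: the join is node-local.
colocated = rows_needing_transfer(
    distribute_by_key(facts, "event_id"),
    distribute_by_key(events, "event_id"),
    "event_id",
)

# A small ALL-distributed table is available on every node.
against_all = rows_needing_transfer(
    distribute_by_key(events, "event_id"),
    distribute_all(devices),
    "device_id",
)

# Distributing the fact table on a different column breaks co-location.
mismatched = rows_needing_transfer(
    distribute_by_key(facts, "metadata_id"),
    distribute_by_key(events, "event_id"),
    "event_id",
)

print(colocated, against_all, mismatched > 0)
```

In this model the first two joins need no transfers at all, while the mismatched layout forces rows to move. That is the trade-off the key choices above were meant to manage.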
First attempt: Denormalization
One problem with the original schema was that the common columns of the fact tables were stored in the events table. As a result, queries had to join two very large tables, which is not optimal in terms of performance.
The first attempt to optimize performance was to denormalize the fact tables. This was done by moving the common columns from the events table to the fact tables themselves. The distribution keys and sort keys were selected to optimize the join from the fact table to the second largest table, metadata_structure.
Figure 2: Schema with denormalized fact table
This change gave a slight performance improvement, but our response times were still far from the goal.
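The denormalization step can be sketched with an in-memory SQLite database standing in for Redshift. The table and column names (app_usage_fact, device_id, event_time, duration_s) and the rows are invented for illustration. SQLite has no distribution or sort keys, so the sketch shows only the change in query shape.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Before: columns shared by all fact tables live in a separate events table.
CREATE TABLE events (event_id INTEGER PRIMARY KEY, device_id INTEGER, event_time TEXT);
CREATE TABLE app_usage_fact (event_id INTEGER, metadata_id INTEGER, duration_s INTEGER);

INSERT INTO events VALUES (1, 10, '2015-01-01'), (2, 10, '2015-01-01'), (3, 11, '2015-01-02');
INSERT INTO app_usage_fact VALUES (1, 100, 30), (2, 101, 45), (3, 100, 60);

-- After: the common columns are copied into the fact table itself.
CREATE TABLE app_usage_fact_denorm AS
SELECT f.event_id, f.metadata_id, f.duration_s, e.device_id, e.event_time
FROM app_usage_fact f
JOIN events e ON e.event_id = f.event_id;
""")

# Normalized schema: every query pays for the large fact-to-events join.
before = conn.execute("""
    SELECT e.device_id, SUM(f.duration_s)
    FROM app_usage_fact f
    JOIN events e ON e.event_id = f.event_id
    GROUP BY e.device_id
    ORDER BY e.device_id
""").fetchall()

# Denormalized schema: the same answer without touching events at all.
after = conn.execute("""
    SELECT device_id, SUM(duration_s)
    FROM app_usage_fact_denorm
    GROUP BY device_id
    ORDER BY device_id
""").fetchall()

print(before == after)
```

The price is duplicated storage for the common columns, which is the opposite of the space-saving goal of the original schema.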
We analyzed the query plans using the EXPLAIN command and tried various combinations of distribution keys and sort keys. We finally came to the conclusion that the problem was as follows: Redshift had to redistribute a large amount of data to all nodes with each query, even though the join between the fact and the metadata_structure tables was done over a column that was defined as a distribution key, and all the rest of the tables were using the ALL distribution style. In fact, judging by the query plans, it seemed that Redshift was redistributing the largest tables, fact and metadata_structure, to all nodes to be able to join them to the smaller tables, which did not seem to make any sense.
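When reading many plans, it can help to flag the join steps that move data. Redshift's EXPLAIN output labels each join with a distribution attribute such as DS_DIST_NONE or DS_BCAST_INNER. The helper below is a small sketch that scans plan text for the labels that involve redistribution or broadcasting. The sample plan is a hand-written fragment shaped like EXPLAIN output, with costs elided. It is not a plan from our system.

```python
import re

# Join labels in Redshift EXPLAIN output that mean rows are moved between nodes.
REDISTRIBUTING = frozenset({
    "DS_DIST_INNER",      # inner table is redistributed
    "DS_DIST_OUTER",      # outer table is redistributed
    "DS_DIST_BOTH",       # both tables are redistributed
    "DS_BCAST_INNER",     # inner table is broadcast to every node
    "DS_DIST_ALL_INNER",  # inner table is sent to a single slice
})
# For contrast: DS_DIST_NONE and DS_DIST_ALL_NONE mean no data movement.


def redistribution_steps(plan_text):
    """Return the data-moving join labels found in the plan, in order."""
    labels = re.findall(r"\bDS_[A-Z_]+\b", plan_text)
    return [label for label in labels if label in REDISTRIBUTING]


# Hand-written fragment for illustration only.
sample_plan = """\
XN Hash Join DS_BCAST_INNER  (cost=...)
  ->  XN Merge Join DS_DIST_NONE  (cost=...)
  ->  XN Hash Join DS_DIST_ALL_NONE  (cost=...)
"""

print(redistribution_steps(sample_plan))
```

An empty result for a query suggests that its joins ran on data that was already in place.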
Second attempt: Star Schema
The cause of these redistributions was the multi-level joins we were performing, as can be seen in Figure 2. This kind of data model proved to be very harmful to performance in Redshift. This is because Redshift had to first join the fact table to metadata_structure and redistribute the resulting joined table to all nodes. It then had to join the panelists table to demographics and weights and redistribute the result to all nodes. It was only after this that it was able to make the final join to the devices table.
Based on this analysis we decided that we needed to completely rewrite our queries and modify our schema to follow a pattern called the “Star Schema”, which is a well-known data model in analytical data warehouses. The idea is that there are a number of large fact tables, each of which joins directly to several dimension tables.
Figure 3: Star Schema
This change decreased the query response times by approximately 80%. When analyzing the query plans, we noticed that the queries no longer required any data redistributions, for two reasons. First, the data in the fact table and metadata_structure was co-located by the distribution key, and the rest of the tables were using the ALL distribution style. Second, the fact table joined directly to all other tables and not through any intermediate steps. A merge join could also be used for joining the fact table to metadata_structure, because their common columns were defined as sort keys.
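The shape of a star-schema query can be sketched as follows, again with SQLite standing in for Redshift. The key column device_id, the table usage_fact, the attribute columns, and the rows are all assumptions made for the example. The point is only that every dimension joins straight to the fact table, with no dimension-to-dimension hops.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Dimension tables, each keyed so that the fact table can join to it directly.
CREATE TABLE metadata_structure (metadata_id INTEGER PRIMARY KEY, app_name TEXT);
CREATE TABLE demographics (device_id INTEGER PRIMARY KEY, age_group TEXT);
CREATE TABLE weights      (device_id INTEGER PRIMARY KEY, weight REAL);

-- Fact table: one row per event, carrying a key for every dimension.
CREATE TABLE usage_fact (metadata_id INTEGER, device_id INTEGER, duration_s INTEGER);

INSERT INTO metadata_structure VALUES (100, 'mail'), (101, 'maps');
INSERT INTO demographics VALUES (10, '18-34'), (11, '35-54');
INSERT INTO weights      VALUES (10, 2.0), (11, 0.5);
INSERT INTO usage_fact   VALUES (100, 10, 30), (101, 10, 40), (100, 11, 60);
""")

# Every join starts from the fact table; the result is a weighted aggregate.
rows = conn.execute("""
    SELECT m.app_name, g.age_group, SUM(f.duration_s * w.weight) AS weighted_seconds
    FROM usage_fact f
    JOIN metadata_structure m ON m.metadata_id = f.metadata_id
    JOIN demographics g       ON g.device_id   = f.device_id
    JOIN weights w            ON w.device_id   = f.device_id
    GROUP BY m.app_name, g.age_group
    ORDER BY m.app_name, g.age_group
""").fetchall()

for row in rows:
    print(row)
```

The devices and panelists tables would join to usage_fact in the same way. In Redshift, the small dimension tables would additionally be declared with the ALL distribution style.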
Third attempt: Combining dimensions
We still wanted to improve performance, so we came up with the idea of combining some of the dimension tables. Since all the dimension tables (panelists, devices, weights and demographics) could be joined to the fact table using the same ID column, we were able to denormalize all these tables into one dimension table called panelist_dim. This new table also used the ALL distribution style.
After this change, the query response times had decreased by more than 90% from the original, which our management declared an acceptable performance improvement.
Figure 4: Star Schema with denormalized dimension tables
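Combining the dimensions amounts to joining them once, ahead of time, into a single table. The sketch below does this in SQLite. The shared key device_id, the attribute columns, and the rows are hypothetical. In Redshift the new table would also be created with the ALL distribution style, which SQLite cannot express.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Four small dimension tables that all share one key (assumed: device_id).
CREATE TABLE panelists    (device_id INTEGER PRIMARY KEY, panelist_id INTEGER);
CREATE TABLE devices      (device_id INTEGER PRIMARY KEY, device_type TEXT);
CREATE TABLE demographics (device_id INTEGER PRIMARY KEY, age_group TEXT);
CREATE TABLE weights      (device_id INTEGER PRIMARY KEY, weight REAL);
CREATE TABLE usage_fact   (metadata_id INTEGER, device_id INTEGER, duration_s INTEGER);

INSERT INTO panelists    VALUES (10, 1), (11, 2);
INSERT INTO devices      VALUES (10, 'phone'), (11, 'tablet');
INSERT INTO demographics VALUES (10, '18-34'), (11, '35-54');
INSERT INTO weights      VALUES (10, 2.0), (11, 0.5);
INSERT INTO usage_fact   VALUES (100, 10, 30), (101, 10, 40), (100, 11, 60);

-- Pay for the dimension-to-dimension joins once, when the table is built.
CREATE TABLE panelist_dim AS
SELECT d.device_id, p.panelist_id, d.device_type, g.age_group, w.weight
FROM devices d
JOIN panelists p    ON p.device_id = d.device_id
JOIN demographics g ON g.device_id = d.device_id
JOIN weights w      ON w.device_id = d.device_id;
""")

# Queries now need a single join to reach every panelist attribute.
rows = conn.execute("""
    SELECT pd.age_group, SUM(f.duration_s * pd.weight) AS weighted_seconds
    FROM usage_fact f
    JOIN panelist_dim pd ON pd.device_id = f.device_id
    GROUP BY pd.age_group
    ORDER BY pd.age_group
""").fetchall()

print(rows)
```

The combined table has to be rebuilt or updated whenever any of its source dimensions changes, which is manageable here because those tables are small.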
Conclusion: Star Schema is Optimal for Data Structure
In total, we achieved an average reduction of approximately 90% in query execution times. This decrease was not achieved solely by the optimizations described in this article. We also made a number of other optimizations, which were very specific to our particular use case and thus are not discussed here. Of course, we also cache the query results, but this article only discusses the queries that reach the database itself.
Our main finding was that the Star Schema was the best-performing data structure in our use case. This is because it allows Redshift to avoid most of the costly data redistributions, especially if you have selected your distribution keys correctly.