Recently at Affinity, we accelerated our analytics workloads on Redshift by optimizing our use of joins. In this blog post, we begin by sharing the general takeaways that you can apply as you optimize your Redshift queries. We then share our step-by-step journey in which we apply these takeaways to speeding up the many joins involved in flattening our Entity-attribute-value (EAV) data.
There are plenty of other performance optimizations for working with Redshift such as column compression encodings, sortkeys, and more. However, these are beyond the scope of this blog post. For further context on these areas, check out the docs.
One of our most heavily used features in Affinity today is the ability for users to create Lists of entities that they want to track. A common use case is tracking a pipeline of organizations (also known as companies) that are being considered for investment.
The core data model backing these Lists follows the Entity-attribute-value (EAV) model. Each column in the List above is represented as an Entity Attribute and each value in that column is represented as an Entity Value in our database (except for the Name column). Here’s what the relevant tables look like for the List above:
The biggest advantage of the EAV data model is flexibility: every time a user wants to add a new custom column to their List, it doesn’t require us to introduce a schema change. We simply insert a new row into the Entity Attribute table and start storing the corresponding cell values in the Entity Value table. This is great when the user actions follow an OLTP model. However, it falls short when trying to support an analytics workload (OLAP) in which users might aggregate across a few columns; doing those joins on Redshift would not be performant.
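To make the EAV model concrete, here is a rough sketch of what the two tables could look like. The exact column names and types are assumptions for illustration; the actual schema isn't shown in this post:

```sql
-- Hypothetical DDL sketch of the EAV tables (names/types are assumptions)
CREATE TABLE entity_attributes (
    id      BIGINT,
    list_id BIGINT,       -- the List this custom column belongs to
    name    VARCHAR(255)  -- e.g. 'Owners', 'Source of Introduction', 'CEO'
);

CREATE TABLE entity_values (
    id                  BIGINT,
    entity_attribute_id BIGINT,  -- which column this cell belongs to
    company_id          BIGINT,  -- set when the value lives on the company
    list_entry_id       BIGINT,  -- set when the value lives on the list entry
    person_id           BIGINT   -- the cell value (a person, in this example)
);
```

Adding a new custom column is then just an INSERT into entity_attributes, with cell values accumulating in entity_values.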
Consequently, we used the popular data transformation tool dbt to flatten the EAV data beforehand (instead of at query time) and generate a database table looking quite similar to the List in the product screenshot above:
We followed the general approach outlined here and present below one such sample query that has been simplified for demonstration purposes.
Here we flatten the list of companies shown in the screenshot. First, we select from lists_entries to get the IDs of companies that exist on the Fortune 500 list. Next, we join with companies to pull each company's name and website. Then, we left join with 3 subqueries, each of which represents a different column (aka Entity Attribute): Owners, Source of Introduction, and CEO. In this example, the 3 columns all happen to contain persons as values, so in each of the subqueries we join entity_values with persons to denormalize and pull in the name of the person referred to by the entity_value.
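An illustrative version of that flattening query might look like the following. The identifiers and entity_attribute_id values are assumptions for demonstration (the post later mentions that Source of Introduction has entity_attribute_id = 1):

```sql
-- A sketch of the flattening query; table/column names are assumptions
SELECT
    companies.name,
    companies.website,
    sources.name AS source_of_introduction,
    owners.name  AS owner,
    ceos.name    AS ceo
FROM lists_entries
JOIN companies
    ON companies.id = lists_entries.company_id
LEFT JOIN (
    SELECT entity_values.company_id, persons.name
    FROM entity_values
    JOIN persons ON persons.id = entity_values.person_id
    WHERE entity_values.entity_attribute_id = 1  -- Source of Introduction
) sources ON sources.company_id = companies.id
LEFT JOIN (
    SELECT entity_values.list_entry_id, persons.name
    FROM entity_values
    JOIN persons ON persons.id = entity_values.person_id
    WHERE entity_values.entity_attribute_id = 2  -- Owners (assumed id)
) owners ON owners.list_entry_id = lists_entries.id
LEFT JOIN (
    SELECT entity_values.company_id, persons.name
    FROM entity_values
    JOIN persons ON persons.id = entity_values.person_id
    WHERE entity_values.entity_attribute_id = 3  -- CEO (assumed id)
) ceos ON ceos.company_id = companies.id
WHERE lists_entries.list_id = 1;  -- the Fortune 500 list (assumed id)
```

Note that each subquery repeats the join between entity_values and persons, which is exactly what the next section addresses.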
Next, let’s take a look at how we can speed up these joins.
Before we dive deeper into the query plan, the first thing that sticks out as room for improving the query’s performance is avoiding the multiple joins between entity_values and persons that are repeated once for every column. We can instead perform this join once and re-use the results in each of the left join subqueries. With the help of dbt, let’s create an intermediate table called denormalized_entity_values to store the result of the join and reference this table in our subqueries.
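A sketch of that intermediate dbt model, with columns assumed from the query above:

```sql
-- models/denormalized_entity_values.sql (a sketch; dbt config is assumed)
{{ config(materialized='table') }}

SELECT
    entity_values.entity_attribute_id,
    entity_values.company_id,
    entity_values.list_entry_id,
    persons.name AS person_name
FROM entity_values
JOIN persons ON persons.id = entity_values.person_id
```

The entity_values/persons join now runs once during the dbt build instead of once per column at query time.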
Our original query now looks like (omitting dbt refs for readability):
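Sketched out, the rewritten query's subqueries now simply read from the intermediate table (identifiers are assumptions carried over from the earlier example):

```sql
SELECT
    companies.name,
    companies.website,
    sources.person_name AS source_of_introduction,
    owners.person_name  AS owner,
    ceos.person_name    AS ceo
FROM lists_entries
JOIN companies
    ON companies.id = lists_entries.company_id
LEFT JOIN (
    SELECT company_id, person_name FROM denormalized_entity_values
    WHERE entity_attribute_id = 1  -- Source of Introduction
) sources ON sources.company_id = companies.id
LEFT JOIN (
    SELECT list_entry_id, person_name FROM denormalized_entity_values
    WHERE entity_attribute_id = 2  -- Owners (assumed id)
) owners ON owners.list_entry_id = lists_entries.id
LEFT JOIN (
    SELECT company_id, person_name FROM denormalized_entity_values
    WHERE entity_attribute_id = 3  -- CEO (assumed id)
) ceos ON ceos.company_id = companies.id
WHERE lists_entries.list_id = 1;
```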
Next, let’s take a look at the query plan in Redshift:
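You can generate such a plan yourself by prefixing the query with EXPLAIN; Redshift returns the plan and cost estimates without executing the query (the estimates will vary with your data):

```sql
EXPLAIN
SELECT companies.name, companies.website
FROM lists_entries
JOIN companies ON companies.id = lists_entries.company_id
WHERE lists_entries.list_id = 1;
```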
You might notice that the joins have the biggest cost in the query plan and the cost increases significantly for each subsequent join. This makes sense since these are the steps in which lots of data is being transferred over the network between nodes. This can be orders of magnitude slower than operations within a node such as a sequential scan. As pointed out in the AWS documentation:
When you execute a query, the query optimizer redistributes the rows to the compute nodes as needed to perform any joins and aggregations. The goal in selecting a table distribution style is to minimize the impact of the redistribution step by locating the data where it needs to be before the query is run.
Let’s walk through each of the joins in the query, beginning with the innermost join, to understand how we can minimize data redistribution.
Starting with the innermost join, we see that the join is of type DS_DIST_ALL_NONE (see here for the definition of each redistribution style). This means that no data was redistributed between nodes in order to compute the join. This is because an entire copy of the lists_entries table exists on each node (aka DISTSTYLE ALL) and the companies table is distributed on the id column (aka DISTSTYLE KEY with DISTKEY id). The result of this join is consequently also distributed on the company ID.
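In DDL terms, the distribution styles just described would look roughly like this (the column lists are assumptions; only the DISTSTYLE clauses are the point here):

```sql
-- Sketch: a full copy of lists_entries lives on every node
CREATE TABLE lists_entries (
    id         BIGINT,
    list_id    BIGINT,
    company_id BIGINT
) DISTSTYLE ALL;

-- Sketch: companies rows are hash-distributed across nodes by id
CREATE TABLE companies (
    id      BIGINT,
    name    VARCHAR(255),
    website VARCHAR(255)
) DISTSTYLE KEY DISTKEY (id);
```

Because each node already holds all of lists_entries plus its own slice of companies, the join can proceed entirely locally.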
To examine the distkey (and sortkey) for a given table, run this query to create the v_generate_table_ddl view (you will need to delete the admin. prefix unless you have an admin schema), and then query the view like so:
```sql
SELECT * FROM v_generate_table_ddl WHERE tablename = 'companies';
```
The next join is a DS_DIST_OUTER join with denormalized_entity_values on its company_id column to get the Source of Introduction (as seen by the filter entity_attribute_id = 1). Since this table is not distributed on the company_id column, Redshift performs the redistribution on company_id at execution time, sending the rows of the outer table (denormalized_entity_values) to the appropriate node based on company_id. This is an expensive step due to the data redistribution; the result remains distributed on the company ID.
In order to get the Owner, the next join is performed on a different key, namely list_entry_id. Consequently, this is a DS_DIST_BOTH join where both the result from the previous step and the denormalized_entity_values table are redistributed to each node based on the list_entry_id column. This is a much more expensive step than DS_DIST_OUTER since both sides of the join are being redistributed. The result is now distributed on the new join column list_entry_id.
Finally, we have to do another DS_DIST_BOTH join with denormalized_entity_values to get the CEO because the join is on the company_id column but the tables on both sides of the join are distributed on list_entry_id.
The general takeaway here is that it's usually best to distribute your Redshift tables on the join columns. However, a table can only have one DISTKEY. Since the denormalized_entity_values table is joined on both the company_id and list_entry_id columns, we partition it into two separate tables: one distributed on company_id and the other distributed on list_entry_id. Normally, maintaining two copies of the same table comes at the cost of keeping them in sync. In our case, however, rows where list_entry_id is not null will only ever be joined on list_entry_id, while the other rows will only ever be joined on company_id; that is, an entity_value is on either the list entry or the company but not both. So separating denormalized_entity_values into two tables strictly partitions it. The resulting query and query plan are shown below. We omit the two new dbt transforms used to produce the two denormalized tables for brevity.
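Although the post omits the two dbt transforms, a rough sketch of what they could look like follows; the dist config is the dbt-redshift way to set a DISTKEY on a materialized table (model names and columns are assumptions):

```sql
-- models/denormalized_company_entity_values.sql (sketch)
{{ config(materialized='table', dist='company_id') }}

SELECT entity_attribute_id, company_id, person_name
FROM denormalized_entity_values
WHERE list_entry_id IS NULL

-- models/denormalized_list_entry_entity_values.sql (sketch; in practice
-- this would live in its own file)
{{ config(materialized='table', dist='list_entry_id') }}

SELECT entity_attribute_id, list_entry_id, person_name
FROM denormalized_entity_values
WHERE list_entry_id IS NOT NULL
```

Because the WHERE clauses are complementary, every row of denormalized_entity_values lands in exactly one of the two tables.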
You will notice now that in the updated query plan above the Source of Introduction join involves no redistribution of data (DS_DIST_NONE) since denormalized_company_entity_values is distributed on the company_id column. However, the Owner join requires the inner result set to be redistributed on lists_entries.id before it can be joined with denormalized_list_entry_entity_values because the result from the previous step was distributed on company_id. This newly joined result is then redistributed again, this time on companies.id in order to perform the CEO join. These latter two joins both involve redistributing the inner result set (aka DS_DIST_INNER).
We can avoid redistributing data on both joins by reordering the CEO join to come before the Owner join; this way, we complete all the joins on the same distkey, namely company_id, before we have to redistribute the data on a new join column, namely list_entry_id. This is only possible here because each left join doesn't depend on the left joins that come before or after it; each one only joins data with the base result of the join between lists_entries and companies.
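With the reordering applied, a sketch of the final query looks like this (identifiers remain the assumptions used throughout; note the Owner join now comes last):

```sql
SELECT
    companies.name,
    companies.website,
    sources.person_name AS source_of_introduction,
    ceos.person_name    AS ceo,
    owners.person_name  AS owner
FROM lists_entries
JOIN companies
    ON companies.id = lists_entries.company_id
-- All joins on company_id come first...
LEFT JOIN (
    SELECT company_id, person_name FROM denormalized_company_entity_values
    WHERE entity_attribute_id = 1  -- Source of Introduction
) sources ON sources.company_id = companies.id
LEFT JOIN (
    SELECT company_id, person_name FROM denormalized_company_entity_values
    WHERE entity_attribute_id = 3  -- CEO (assumed id)
) ceos ON ceos.company_id = companies.id
-- ...then the single redistribution onto list_entry_id happens once
LEFT JOIN (
    SELECT list_entry_id, person_name FROM denormalized_list_entry_entity_values
    WHERE entity_attribute_id = 2  -- Owners (assumed id)
) owners ON owners.list_entry_id = lists_entries.id
WHERE lists_entries.list_id = 1;
```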
In the query plan below, you can see that the Source of Introduction join and CEO join both involve no redistribution of data (DS_DIST_NONE). We now only have one data redistribution step DS_DIST_INNER for the Owner join on list_entry_id. This single data reshuffle step is unavoidable due to the change in join column partway through the query. In fact, this step will continue to be the only data redistribution step, even as we add more left joins for various other Entity Attributes in the Affinity List, as long as we order the left joins on company_id to happen first before the left joins on list_entry_id.
By applying the takeaways at the beginning of this post, we have brought down the overall cost estimate of the query from 5.98M in the v1 query plan to 1.65M in the v3 query plan. This performance improvement is amplified even further as we add more left joins to our query to flatten other columns in the Affinity List (besides Owner, Source of Introduction and CEO).
We hope you found these takeaways helpful. If you have more ideas for how to speed up joins in Redshift, please share them in the comments below!
A big thank you to Mike Taluc @ Bytecode IO for sharing his Redshift expertise.