ETL 300: Incremental Processing Using Checkpoint Tables in Data Distiller
Turn every data update into actionable intelligence through incremental processing
Imagine a large e-commerce platform managing millions of transactions daily. To keep their analytics up to date without processing vast amounts of data repeatedly, they rely on incremental processing to efficiently build and update fact tables. Instead of recalculating totals from scratch, incremental processing allows the platform to seamlessly update critical metrics like total sales (SUM) and number of transactions (COUNT) by processing only the new data. This approach drastically reduces the time and resources needed to maintain accurate business insights. For more complex operations like window functions, the system can focus on small, relevant data windows, ensuring insights like customer lifetime value or purchasing trends remain timely and precise—all while avoiding the computational strain of reprocessing the entire dataset.
Imagine a marketing team running a large-scale email campaign. Every minute, new engagement metrics like email opens and click-through rates are pouring in. With incremental processing, the team can seamlessly update these metrics in real-time without recalculating data for the entire email list. This means that as new engagement data flows in, the marketing platform automatically updates reports and dashboards, allowing the team to monitor campaign performance live, make timely adjustments, and deliver more targeted follow-up emails. The result? Efficient, up-to-the-minute insights without the overhead of processing millions of records from scratch.
Consider a financial services company that tracks customer transactions to rank their top clients. Using window functions like RANK and ROW_NUMBER, the company can create insights by analyzing the entire transaction history. These functions, however, are more complex because they depend on the order of transactions and require access to the entire dataset. For example, to determine the top-spending clients or calculate a running total of transactions over time, the model must account for both previous and following rows. This makes window functions powerful for gaining deep insights, but they often require full dataset access rather than incremental updates, ensuring accurate and consistent results in critical areas like client ranking, loyalty programs, and financial forecasting.
For these reasons, window functions do not lend themselves well to incremental processing:
Reordering: If new rows are inserted or deleted, the order of rows might change, which affects the window function result.
Dependencies: Window functions depend on multiple rows, so adding new data might require recomputing the results for previously processed rows.
Complex Calculations: Calculations like moving averages or running totals can’t easily be split between old and new data, as each new row could change the result for previous rows.
In machine learning (ML) use cases, incremental processing can also play a critical role in efficiently handling large datasets, especially when it comes to building and maintaining models. Let us look at some examples:
1. Feature Engineering: Imagine an online retailer using machine learning to personalize customer experiences. One key feature the model relies on is the number of times each customer has purchased a product. Instead of recalculating the total from scratch every time a new transaction occurs, incremental processing allows the system to seamlessly update this feature with each purchase. The result? A dynamic and real-time count of customer purchases, feeding into personalized recommendations and marketing efforts—without the computational overhead of reprocessing all historical data. Whether it's tracking purchase value or customer interactions, incremental processing ensures the features stay fresh and relevant, driving smarter personalization at scale.
2. Incremental Model Training: Picture a global financial institution using a fraud detection system powered by machine learning. Every second, new transactions are flowing in. Instead of retraining the entire model from scratch with each new batch of data, algorithms like stochastic gradient descent (SGD) and decision trees allow the model to incrementally learn from each new transaction. This means the fraud detection system can continuously adapt to evolving fraud patterns—whether it's a new scam technique or a shift in customer behavior—on the fly. With incremental learning, the model stays one step ahead, identifying fraudulent activity in real-time without the heavy computational cost of full retraining.
3. Model Deployment and Scoring (Inference): Consider an e-commerce platform with a recommendation engine powered by machine learning. Each hour, new product interactions—like clicks, views, and purchases—are added to the system. With incremental processing, the platform’s model only needs to score the fresh batch of user data, instead of reprocessing the entire dataset. This approach not only boosts efficiency but also enables real-time responses. For example, when a customer clicks on a product, the recommendation engine immediately updates their personalized suggestions without retraining the entire model. Incremental processing ensures that the system stays agile, responsive, and efficient, even as new data flows in constantly.
4. Handling Time-series Data: Imagine a retail forecasting engine that adapts to your business in real time: as each day’s sales roll in, the model instantly adjusts future demand predictions—no need to reprocess months of historical data. With Data Distiller’s incremental processing, your forecasts stay accurate and up-to-date, ensuring you're always stocked for tomorrow’s trends without the heavy computational cost.
5. Updating Model Metrics: Imagine a retail company deploying a product recommendation model in production. As customer behavior shifts over time, it’s crucial to ensure the model remains accurate. Using Data Distiller's incremental processing, the company can continuously track key performance metrics like accuracy, precision, and recall as new customer interactions are processed. For example, if the model starts suggesting irrelevant products due to seasonal changes or shifts in customer preferences, incremental checks for concept drift will flag the issue in real-time. This enables the company to adjust the model quickly, maintaining the relevance and effectiveness of their recommendations without needing to fully retrain the model or recalibrate metrics manually.
While incremental processing provides several benefits, it comes with challenges, especially for more complex models or use cases:
Non-incremental models: Consider a financial institution using an XGBoost-based model to predict loan defaults. While highly accurate, this tree-based model does not support incremental updates natively. When new loan applications or repayment data arrive, the model must be retrained on the entire dataset to incorporate the latest information. Although this can be computationally expensive, the retraining ensures that the model captures the full complexity of interactions in the data, maintaining its high performance. For businesses relying on models like XGBoost or ensemble methods, the investment in periodic retraining delivers more accurate, up-to-date insights, critical for making informed decisions in high-stakes industries like finance.
Complex Feature Engineering: Imagine a healthcare analytics company using machine learning to predict patient outcomes based on clinical data. Some features, such as the median patient recovery time or percentile rankings of treatment effectiveness, depend on complex global patterns within the entire dataset. These features can't be updated incrementally because they require recalculating based on the full range of historical data. When new patient data arrives, the model must access the entire dataset to accurately reflect shifts in the overall distribution. While this process may be resource-intensive, it ensures that models continue to deliver precise and reliable predictions by accounting for broader trends and patterns, critical in high-accuracy fields like healthcare.
Concept Drift: Imagine an online retail platform using machine learning to recommend products to customers. Over time, customer preferences shift—new trends emerge, and old favorites fade away. This phenomenon, known as concept drift, can cause the recommendation model to lose accuracy as the data patterns it was trained on change. While incremental processing helps the model adapt to new data, it might not fully capture these evolving trends. To prevent performance degradation, the platform employs continuous monitoring of the model, tracking key metrics like accuracy and customer engagement. When concept drift is detected, the system triggers a full retraining of the model, ensuring it stays aligned with the latest customer behaviors and keeps recommendations relevant. This proactive approach maximizes both customer satisfaction and business outcomes.
Picture a large enterprise using multiple data systems for sales, marketing, and customer support. Keeping these systems in sync is critical for seamless operations, but transferring massive datasets repeatedly is inefficient. With Data Distiller’s incremental processing, only the changes—new sales, updated customer profiles—are sent out on a scheduled basis. This means the systems always stay up-to-date without the need for full data refreshes, ensuring consistency across departments. By transferring only the relevant updates, Data Distiller optimizes data syncing, reducing bandwidth usage and speeding up the flow of critical business information across platforms.
The goal of our case study is to read the stock prices that were generated in the lab outlined in the earlier section. If you have not done that lab, you can create the stock_price_table dataset by executing the following code. It will take about 20-30 minutes for the code to finish executing, so please be patient. We have to execute this code to simulate the creation of snapshots.
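If you only need a feel for what that script does, the greatly simplified sketch below illustrates the idea. It is not the original lab code: apart from stock_price, the column names (stock_symbol, price_date) are hypothetical, and the real script generates far more rows across many batches, which is why it runs for 20-30 minutes. The key point is that each separate batch write lands as its own snapshot on the dataset.

```sql
-- Hypothetical, simplified stand-in for the lab's snapshot-simulation script.
-- Only stock_price is referenced later in this tutorial; stock_symbol and
-- price_date are illustrative column names.
CREATE TABLE stock_price_table AS
SELECT 'ACME' AS stock_symbol,
       100.00 AS stock_price,
       CAST('2024-01-02' AS DATE) AS price_date;

-- Each subsequent batch INSERT creates a new snapshot on the dataset.
INSERT INTO stock_price_table
SELECT 'ACME' AS stock_symbol, 101.50 AS stock_price, CAST('2024-02-01' AS DATE) AS price_date;

INSERT INTO stock_price_table
SELECT 'ACME' AS stock_symbol, 99.75 AS stock_price, CAST('2024-03-01' AS DATE) AS price_date;
```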
Our goal in today's tutorial is to figure out a way to read ONE SNAPSHOT at a time from the stock price tables, compute the cumulative stock prices for the snapshot along with the number of records, log it and then come back next time to read the next snapshot. If we find no new snapshots, we just skip the execution. Ultimately, these cumulative stock prices in our fact table will be averaged along with the sum of the records across all snapshots.
Our approach to incremental processing involves using checkpoint tables to systematically track and manage data snapshots. This method ensures efficient and reliable processing of data updates while minimizing potential issues. Here’s a more detailed explanation of our strategy:
Tracking Processed Snapshots: We will keep a comprehensive log of all snapshots that have already been processed. This ensures that we only process new or unprocessed snapshots, avoiding redundant work and allowing us to resume from the last known state in the event of a failure.
Processing Snapshots or Collections of Snapshots: Our logic will be designed to handle either a single snapshot or a group of snapshots in each run. This flexibility allows us to adapt to varying data volumes and processing needs, ensuring that all relevant data is processed, whether it arrives incrementally or in bulk.
Maintaining a Watermark: We will establish a watermark system to track the most recent snapshot that has been successfully processed. By updating this watermark after each successful run, we ensure that we can resume from the correct point in future runs, always starting from the next unprocessed snapshot.
Advantages of this Approach:
Resilience to Upstream Delays: One of the key benefits of this strategy is that we do not need to worry about delays in upstream systems, such as those responsible for hydrating the stock_price_table. Our checkpointing system will allow us to pick up where we left off, regardless of when new snapshots are generated.
Error Handling and Recovery: If any errors occur during the processing of a snapshot, the job will gracefully handle them. The subsequent runs will automatically pick up the failed or missed snapshot and process it without requiring manual intervention, ensuring smooth recovery from failures.
By implementing this incremental processing strategy with checkpoint tables, we can ensure that our system is both robust and adaptable, capable of handling upstream delays and job errors while maintaining data integrity and minimizing reprocessing.
The Checkpoint Table serves as a centralized logging mechanism to track the snapshots processed by various jobs. It ensures that each job knows which snapshot it has processed and what the status of that processing was, allowing jobs to resume processing from the correct snapshot during subsequent executions. This table is essential for managing job checkpoints and ensuring the continuity of snapshot processing.
An important assumption with snapshots is that the history_meta function only provides snapshots for the past 7 days. If we need to retain this data for a longer period, we would need to set up a Data Distiller job that inserts snapshots into the table every week. For the purpose of this tutorial, we'll assume our job processes within the 7-day window.
Although you can design your own checkpoint table based on your specific requirements, let’s explore a common design pattern that is widely used in Data Distiller workflows. This pattern ensures efficient tracking and management of job execution and snapshot processing, helping to maintain data consistency and allowing jobs to resume from the correct state.
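A minimal sketch of such a checkpoint table is shown below. The column names and types follow the field descriptions that come next; the NOT NULL and composite primary key constraints are noted as comments, since how (or whether) they are enforced depends on your environment.

```sql
-- Sketch of the checkpoint table used to track job watermarks.
CREATE TABLE checkpoint_table (
    job_name         STRING,    -- NOT NULL; part of the composite primary key
    job_status       STRING,    -- NOT NULL
    last_snapshot_id STRING,    -- NOT NULL; part of the composite primary key
    job_timestamp    TIMESTAMP  -- NOT NULL
);
```

The fields are described in the following way: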
job_name (STRING, NOT NULL): Represents the name of the job that is processing the snapshot. Each job can be identified uniquely by its name.
Example: snapshot_ingest_job, data_cleaning_job
Constraint: This field is part of the composite primary key, ensuring that each job's checkpoint is uniquely tracked.
job_status (STRING, NOT NULL): Stores the current status of the job, indicating whether the job completed successfully or encountered an error.
Possible Values: 'SUCCESS', 'FAILED', 'RUNNING', 'PENDING'
Example: If the job completed successfully, the value would be 'SUCCESS'.
last_snapshot_id (STRING, NOT NULL): The ID of the most recent snapshot processed by the job. This allows the job to pick up from the correct snapshot in the next execution.
Example: 12345, 67890
Constraint: This is part of the composite primary key, ensuring that each job can only log one record for each snapshot.
job_timestamp (TIMESTAMP, NOT NULL): Captures the exact date and time when the job was last run and processed the snapshot. This helps track the job's execution over time.
Example: 2024-09-25 14:35:22
Use: Useful for monitoring and debugging, especially when tracking when the job processed specific snapshots.
We will be creating the output table into which we will be writing the processed data:
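A sketch of that output (fact) table is shown below, assuming it is named Stock_Updates_Incremental, the name referenced later in this tutorial:

```sql
-- Sketch of the incremental fact table populated once per processed snapshot.
CREATE TABLE Stock_Updates_Incremental (
    snapshot_id          INT,        -- snapshot being processed
    sum_stock_price      DOUBLE,     -- sum of stock prices in that snapshot
    record_count         INT,        -- number of records in that snapshot
    fact_table_timestamp TIMESTAMP   -- when the snapshot was processed
);
```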
The fields are described in the following way:
snapshot_id: Stores the ID of the snapshot currently being processed. We are storing it as an integer to allow for arithmetic operations. However, you could also store it as a string and typecast when needed to perform mathematical operations, if required.
sum_stock_price: Stores the sum of the stock prices from the snapshot, which is of type double (or float, depending on your system).
record_count: Stores the count of records processed for that snapshot, which is an integer.
fact_table_timestamp: Stores the timestamp when the processing of the snapshot occurred, which is of type TIMESTAMP.
We insert an initial entry to initialize job metadata. The first row acts as the start of the job's history in the log, which can be referenced in future job executions.
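A sketch of that initialization insert is shown below. The job name Stock_Updates and the status value 'SUCCESSFUL' match what is referenced later in this tutorial; the timestamp function is an assumption and can be any expression that yields the current time.

```sql
-- Seed the checkpoint log with a watermark of '0' for this job.
INSERT INTO checkpoint_table
SELECT 'Stock_Updates'                                    AS job_name,
       'SUCCESSFUL'                                       AS job_status,       -- must match what the watermark query filters on
       CAST(0 AS STRING)                                  AS last_snapshot_id, -- note the cast to STRING
       CAST(from_unixtime(unix_timestamp()) AS TIMESTAMP) AS job_timestamp;
```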
Note that this table serves as a historical log of all job executions, making it useful for auditing. By inserting this record, you're starting the process of capturing each job run's status, including the start time and, eventually, the snapshot ID it processed. The initialization shows the first log entry for this job. Also, note the casting applied to last_snapshot_id. We're initializing it with '0' as the starting point for processing, but you could query the history_meta table to explicitly determine the appropriate watermark.
We will be utilizing several variables in this section of the incremental processing code. Variables are always declared with an @ sign and are defined within an Anonymous Block, as their scope is limited to the lifetime of that block. Here's an example:
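A minimal sketch of such a block, assuming the stock_price_table built earlier:

```sql
-- Variables live only for the lifetime of the Anonymous Block.
$$ BEGIN
    -- Assign the variable from a query.
    SET @MAX_STOCK_PRICE = SELECT MAX(stock_price) FROM stock_price_table;

    -- The variable can now be referenced by any later statement in this block.
    SELECT * FROM stock_price_table WHERE stock_price = @MAX_STOCK_PRICE;
END $$;
```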
In the above query, even though the results are not streamed into the UI, the variable @MAX_STOCK_PRICE is accessible in any of the conditions or parameters of the queries within that block.
In Data Distiller, you can use IF THEN ELSE semantics for conditional branching to control the flow of logic based on variables or conditions. The key idea is that variables define the branching conditions, and the predicates of these conditions can be more complex SQL code blocks themselves.
Here's a more structured example demonstrating how you can implement IF THEN ELSE ENDIF logic with SQL code blocks as conditions and how to utilize variables effectively in Data Distiller:
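The sketch below is illustrative only: the exact conditional syntax supported inside Anonymous Blocks may differ in your release, and the 500 threshold is an arbitrary example value.

```sql
$$ BEGIN
    -- The branching condition is driven by a variable computed from a SQL block.
    SET @MAX_STOCK_PRICE = SELECT MAX(stock_price) FROM stock_price_table;

    IF @MAX_STOCK_PRICE > 500 THEN
        -- This branch could itself be a more complex SQL code block.
        SELECT 'High price regime' AS branch_taken;
    ELSE
        SELECT 'Normal price regime' AS branch_taken;
    END IF;
END $$;
```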
Before diving into creating SQL code blocks within an Anonymous Block, it's essential to prototype each individual block to ensure they are functionally correct. Once you place the code inside an Anonymous Block, debugging becomes a tedious process. You'll constantly need to check Queries -> Log to sift through each query log and find which one failed and what the error was. Since errors can cause a cascading effect, debugging becomes even more challenging.
Keep in mind that the queries below will eventually need to be parameterized using variables, but since variables are only supported in Anonymous Blocks, you won't be able to use them directly here. The same applies to any conditional branching code we've covered earlier. In these cases, you'll need to manually simulate the logic by assuming fixed values.
Let us retrieve the latest snapshot in the checkpoint table:
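A sketch of that lookup, filtering on the job name and status used throughout this tutorial:

```sql
-- Highest snapshot ID successfully logged for this job (the watermark).
SELECT MAX(CAST(last_snapshot_id AS INT)) AS last_snapshot_id
FROM checkpoint_table
WHERE job_name   = 'Stock_Updates'
  AND job_status = 'SUCCESSFUL';
```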
The result should be the most recent snapshot ID recorded for the job in the checkpoint table (0, if you have just initialized it).
Let's retrieve the snapshot from the input table that has not yet been processed. Our goal is to select the snapshot ID that comes right after the one we processed last time. Execute the following SQL code blocks one at a time. You can experiment with different values for from_snapshot_id, which is set to 0 in the example below. For each value, such as 1, 2, or 3, it will return the next snapshot in the sequence. Notice that we are creating a temporary table using the TEMP command to make the table easily accessible. Keep in mind that this temporary table will only exist for the duration of the Anonymous Block, unlike regular temp tables which persist for the duration of the session.
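A sketch of that lookup, with from_snapshot_id hard-coded to 0 for prototyping (it becomes the @from_snapshot_id variable inside the Anonymous Block):

```sql
-- Smallest snapshot ID that is newer than the watermark.
CREATE TEMP TABLE next_snapshot AS
SELECT MIN(snapshot_id) AS snapshot_id
FROM (SELECT history_meta('stock_price_table'))
WHERE snapshot_id > 0;

SELECT * FROM next_snapshot;
```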
The result will be the next snapshot ID in the sequence after the one you specified.
Execute the following function:
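One candidate (an assumption, not necessarily the exact function from the original lab) is from_unixtime(unix_timestamp()), which returns a formatted string:

```sql
-- Returns a string such as '2024-09-25 14:35:22'; cast it to TIMESTAMP before use.
SELECT CAST(from_unixtime(unix_timestamp()) AS TIMESTAMP) AS job_timestamp;
```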
This is used to timestamp the output of the fact table we are creating and is also recorded in the checkpoint log table as a proxy for the time the job was processed. The function will generate a string that must be cast to a TIMESTAMP data type. Keep in mind that this timestamp serves only as a proxy since the actual job finishes processing later, after the results are written and the cluster is shut down. We don’t have access to the exact timestamps of those internal processes, making this proxy a reliable substitute.
Let us prototype the query that will get us the aggregations we are looking for:
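A sketch of that aggregation, with the snapshot IDs hard-coded for prototyping (they become @from_snapshot_id and @to_snapshot_id later):

```sql
-- Sum and count only the rows added between the two snapshots.
SELECT SUM(stock_price) AS sum_stock_price,
       COUNT(*)         AS record_count
FROM stock_price_table SNAPSHOT BETWEEN 0 AND 1;
```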
The answer will be the sum of the stock prices and the count of records for that snapshot.
Now that we have verified all the elements, we can put it all together. Observe the use of the various variables and the conditional branching logic. There are several INSERTs happening, i.e., into the checkpoint and the output tables.
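Since the full script is not reproduced here, the sketch below reassembles it from the step-by-step description that follows. Treat it as an outline under stated assumptions: the conditional and error-raising syntax (IF ... END IF, raise_error) and the timestamp function are the same illustrative choices used earlier and may need adjusting in your environment.

```sql
$$ BEGIN

    -- 1. Fall back gracefully if a snapshot ID can no longer be resolved.
    SET resolve_fallback_snapshot_on_failure = true;

    -- 2. Last successfully processed snapshot for this job (the watermark);
    --    falls back to 'HEAD' if nothing has been logged yet.
    SET @from_snapshot_id = SELECT coalesce(CAST(MAX(CAST(last_snapshot_id AS INT)) AS STRING), 'HEAD')
        FROM checkpoint_table
        WHERE job_name = 'Stock_Updates' AND job_status = 'SUCCESSFUL';

    -- 3. Next unprocessed snapshot: the smallest snapshot ID above the watermark.
    SET @to_snapshot_id = SELECT MIN(snapshot_id)
        FROM (SELECT history_meta('stock_price_table'))
        WHERE snapshot_id > @from_snapshot_id;

    -- 4. Stop this run if there is nothing new to process.
    IF @to_snapshot_id IS NULL THEN
        SELECT raise_error('No new snapshot available on this check');
    END IF;

    -- 5. Proxy timestamp for this run.
    SET @last_updated_timestamp = SELECT CAST(from_unixtime(unix_timestamp()) AS TIMESTAMP);

    -- 6. Aggregate only the delta between the two snapshots into the fact table.
    INSERT INTO Stock_Updates_Incremental
    SELECT CAST(@to_snapshot_id AS INT) AS snapshot_id,
           SUM(stock_price)             AS sum_stock_price,
           COUNT(*)                     AS record_count,
           @last_updated_timestamp      AS fact_table_timestamp
    FROM stock_price_table
    SNAPSHOT BETWEEN @from_snapshot_id AND @to_snapshot_id;

    -- 7. Advance the watermark by logging the successful run.
    INSERT INTO checkpoint_table
    SELECT 'Stock_Updates'                 AS job_name,
           'SUCCESSFUL'                    AS job_status,
           CAST(@to_snapshot_id AS STRING) AS last_snapshot_id,
           @last_updated_timestamp         AS job_timestamp;

EXCEPTION WHEN OTHER THEN
    SELECT raise_error('An unexpected error occurred');

END $$;
```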
Here is the explanation of the code blocks:
Resolve Fallback on Snapshot Failure: The query starts by setting the system to handle snapshot failures gracefully.
Fetch the Last Processed Snapshot (@from_snapshot_id): The query fetches the last processed snapshot ID from the checkpoint_table for the Stock_Updates job that has a status of 'SUCCESSFUL'. If no such snapshot exists, it defaults to 'HEAD'.
Fetch the Next Snapshot (@to_snapshot_id): The query then looks for the next available snapshot ID greater than @from_snapshot_id by selecting the minimum snapshot ID (MIN(snapshot_id)). This ensures that snapshots are processed in sequential order.
Error Handling for Missing Snapshots: If no new snapshot is available (@to_snapshot_id IS NULL), it raises an error with the message 'No new snapshot available on this check'.
Record the Timestamp for Processing: If a new snapshot exists, the current timestamp is recorded in @last_updated_timestamp.
Insert the Sum of Stock Prices and Count of Records into the Incremental Table: For the identified snapshot (@to_snapshot_id), the query calculates the sum of stock_price values and the count of records. It inserts these, along with the snapshot ID and timestamp, into Stock_Updates_Incremental.
Log the Successful Execution: After processing the snapshot, the query logs the successful execution by inserting a record into the checkpoint_table with the job name, status (SUCCESSFUL), snapshot ID, and timestamp.
Exception Handling: If any errors occur during execution, the query raises a custom error message: 'An unexpected error occurred'. The errors shown in the Data Distiller Editor console may not be very useful; it is highly recommended that you either check the Query Log (where you will need to locate the queries of interest) or, better, go to the Scheduled Queries tab.
Each time you execute the script above, you will see that it will insert new rows into both the fact table and the checkpoint table.
Go ahead and schedule this Anonymous Block following the steps in the tutorial here. Make sure you set the schedule to hourly so that it keeps executing each hour and you can test the branching logic, i.e., it should stop executing after processing March.
Execute the following query to see the contents of the fact table Stock_Updates_Incremental:
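A minimal version of that query:

```sql
SELECT * FROM Stock_Updates_Incremental ORDER BY snapshot_id;
```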
The results should show one row per processed snapshot, with its snapshot ID, sum of stock prices, record count, and processing timestamp.
Let us interrogate the checkpoint_table to confirm the watermark has advanced:
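A simple way to do that:

```sql
SELECT * FROM checkpoint_table
WHERE job_name = 'Stock_Updates'
ORDER BY job_timestamp;
```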
We just saved resources by doing an incremental load. Let us write a query on this fact table to compute the average stock price.
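A sketch of that rollup: the overall average is the total of the per-snapshot sums divided by the total record count accumulated across all snapshots.

```sql
SELECT SUM(sum_stock_price) / SUM(record_count) AS average_stock_price,
       SUM(record_count)                        AS total_records
FROM Stock_Updates_Incremental;
```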