Data Engineering Zoomcamp week 4: dlt (data load tool) workshop
This workshop focused on building robust, scalable and self-maintaining pipelines.
Hello Data Enthusiasts!
I'm excited to share insights from our recent workshop focused on crafting robust, scalable and self-maintaining pipelines. Led by the knowledgeable Adrian Brudaru from dlthub, this session was a deep dive into the intricacies of data engineering. If you missed it, don't worry - the entire workshop has been recorded and is available on YouTube.
Exploring Data Ingestion Pipelines
We explored the essential steps of constructing data ingestion pipelines, covering everything from extracting data from APIs or files to normalising and loading data. This foundational knowledge lays the groundwork for efficient data management and analysis within organisations.
The Data Engineer's role
A crucial part of the Data Engineer's role is to design, build and maintain the infrastructure necessary for data generation, storage, processing and analysis. They ensure the smooth flow of data throughout an organisation by creating and optimising data pipelines for ETL (Extract, Transform, Load) or ELT (Extract, Load, Transform) processes.
The choice between ETL and ELT depends on various factors, including the nature of your data, the requirements of your analytics and the capabilities of your data infrastructure, and should be guided by your specific use case, performance considerations and available resources.
Factors to consider when choosing between ETL and ELT include the following (a short sketch contrasting the two approaches follows the list):
Data Volume: ETL may be more suitable for smaller datasets that require significant transformation before loading, while ELT is often used for large-scale data processing where raw data can be loaded into the target system without extensive preprocessing.
Latency Requirements: ETL pipelines may introduce latency due to the transformation phase, whereas ELT pipelines can provide near real-time insights by loading raw data directly into the target system for on-the-fly analysis.
Tooling and Infrastructure: Consider the capabilities of your data infrastructure and the tools available for ETL and ELT. Some data warehouses and cloud platforms may provide native support for ELT, simplifying implementation and management.
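To make the contrast concrete, here is a minimal, illustrative sketch (not from the workshop): it uses pandas for the Python-side transformation and an in-memory DuckDB database as a stand-in for the warehouse, and all table and column names are made up.

import duckdb
import pandas as pd

# A couple of invented raw records, just for illustration.
raw = pd.DataFrame([
    {"vendor_name": "VTS", "trip_distance_miles": 17.52},
    {"vendor_name": "CMT", "trip_distance_miles": 3.10},
])

con = duckdb.connect()  # an in-memory DuckDB database stands in for the warehouse

# ETL: transform in Python first, then load the already-shaped table.
transformed = raw.assign(trip_distance_km=raw["trip_distance_miles"] * 1.609)
con.execute("CREATE TABLE trips_etl AS SELECT * FROM transformed")

# ELT: load the raw data as-is, then transform inside the warehouse with SQL.
con.execute("CREATE TABLE trips_raw AS SELECT * FROM raw")
con.execute("""
    CREATE TABLE trips_elt AS
    SELECT *, trip_distance_miles * 1.609 AS trip_distance_km
    FROM trips_raw
""")

print(con.execute("SELECT * FROM trips_elt").fetchdf())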
Dlt offers functionality to support the entire Extract and Load process.
Introducing dlt: Simplifying Data Loading
Dlt (data load tool), an open-source library, is designed to simplify data loading from various sources into well-structured datasets. It automates many of the tedious tasks faced by Data Engineers, including schema inference, data typing and stream processing, making pipeline development more efficient and robust.
It does this by first extracting the data from an API or a similar source, then normalising it to a schema that will automatically evolve to accommodate future source data changes (e.g. new fields or tables), and finally loading it to the location where you will store it (e.g. Google BigQuery).
Installation instructions:
Installing the software should ideally be straightforward. However, certain system configurations and updates may necessitate additional steps. Follow the instructions below for successful installation:
Using pip Command:
pip install dlt
In cases where the Python installation is managed by macOS itself and the default pip command does not suffice, an alternative approach may be necessary.
Adjustments for macOS Compatibility:
If utilising macOS, particularly after the release of macOS Big Sur, adjustments may be required due to changes in system configurations, notably with Python installations and associated commands.
If Python 3 is the primary interpreter and you installed dlt with pip3, you may still get an error such as ModuleNotFoundError: No module named 'dlt' when you try to run your dlt code. This typically means the python command being invoked is not the interpreter dlt was installed into, often because of system-wide changes to Python installations and command paths.
Path Inclusion for Python Executable:
Resolve potential errors like zsh: command not found: pip and zsh: command not found: python by making sure the directory that contains your Python executable is on the system's PATH (PATH entries are directories, not individual binaries).
Edit the shell profile file (~/.zshrc) and append a line like the following, which adds the directory where your python3 lives:
export PATH="$PATH:$(dirname "$(which python3)")"
If python and pip are still not found while python3 and pip3 are, calling python3 and pip3 explicitly (or adding alias python=python3 and alias pip=pip3 to ~/.zshrc) is a common workaround.
By incorporating these adjustments, compatibility issues stemming from system updates, particularly on macOS, can be mitigated, ensuring a smoother installation experience.
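Once the shell is pointed at the right interpreter, a quick sanity check (my own habit, not an official step) is to import dlt with that same interpreter and print its version:

# Save as check_dlt.py and run it with the interpreter you installed dlt into, e.g. python3 check_dlt.py
import dlt

print(dlt.__version__)  # prints the installed dlt version if the import works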
In this workshop we used DuckDB as the destination.
DuckDB is a fast, in-process analytical database with zero external dependencies; it runs inside its host application or can be deployed as a single binary.
To install it, use this command (the quotes stop zsh from trying to expand the square brackets):
pip install "dlt[duckdb]"
Using dlt
At the start of our workshop, we talked about getting data from different places, like websites or files. We use things called APIs to do this. Sometimes the data comes in big chunks, like a whole bunch of records, and sometimes it's organised into smaller pieces, like lines in a text file.
Getting this data smoothly can be tricky. We need to be careful about how much space we're using in our computers and make sure our internet connection is stable. Sometimes, the websites we get data from have rules about how often we can get it, so we have to be mindful of that too.
To handle these challenges, we use clever ways to manage our computer's memory. Instead of trying to load all the data at once, we can grab it bit by bit, kind of like sipping a drink through a straw instead of gulping it all down at once. This helps us work with big sets of data without crashing our computers.
To show how this works, we looked at a simple example where we downloaded data from a file one line at a time, instead of all at once. This makes it easier for our computer to handle, like reading a book page by page instead of all at once.
import requests
import json
import time

url = "https://storage.googleapis.com/dtc_zoomcamp_api/yellow_tripdata_2009-06.jsonl"

def stream_download_jsonl(url):
    response = requests.get(url, stream=True)
    response.raise_for_status()  # Raise an HTTPError for bad responses
    for line in response.iter_lines():
        if line:
            yield json.loads(line)

# time the download
start = time.time()

# Use the generator to iterate over rows with minimal memory usage
row_counter = 0
for row in stream_download_jsonl(url):
    print(row)
    row_counter += 1
    if row_counter >= 5:
        break

# time the download
end = time.time()
print(end - start)
In Python, yield is a keyword used in the context of generator functions. Generator functions are special functions that allow you to generate a sequence of values lazily, meaning they produce values one at a time and only when needed. This is in contrast to regular functions, which typically compute all their results at once and return them in one go.
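As a tiny, self-contained illustration (separate from the taxi data), note how the body of a generator only runs when the caller asks for the next value:

def count_up_to(n):
    for i in range(1, n + 1):
        print(f"producing {i}")
        yield i  # pause here until the caller requests the next value

numbers = count_up_to(3)  # nothing is produced yet
print(next(numbers))      # runs the body up to the first yield: prints "producing 1", then 1
print(next(numbers))      # resumes where it left off: prints "producing 2", then 2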
In the next example we loaded a nested JSON document into DuckDB and built our first pipeline:
import dlt
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "booked"
        },
        "Passenger_Count": 2,
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
]

# Define the connection to load to.
# We now use DuckDB, but you can switch to BigQuery later.
pipeline = dlt.pipeline(pipeline_name="taxi_data",
                        destination='duckdb',
                        dataset_name='taxi_rides')

# Run the pipeline with default settings, and capture the outcome.
info = pipeline.run(data,
                    table_name="users",
                    write_disposition="replace")

# Show the outcome.
print(info)
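As mentioned earlier, dlt's schema evolves automatically when the source data changes. Here is a hedged sketch of what that looks like, continuing with the pipeline above: the record below is invented for illustration and carries a tip_note field the users table has not seen before, so dlt should add it as a new column rather than fail.

# An invented record with a field ("tip_note") that the first load did not contain.
new_data = [
    {
        "vendor_name": "VTS",
        "record_hash": "made-up-example-hash",  # illustrative value only
        "Trip_Distance": 3.2,
        "tip_note": "left in cash",             # previously unseen field
    },
]

# Append to the same table; the schema evolves to include the new column.
info = pipeline.run(new_data, table_name="users", write_disposition="append")
print(info)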
If you are running dlt locally you can use the built-in Streamlit app by running the CLI command with the pipeline name we chose above.
First install Streamlit: pip install streamlit
Then run the CLI command: dlt pipeline taxi_data show
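If you prefer to poke at the data without Streamlit, you can also query the DuckDB file the pipeline wrote. By default dlt names it after the pipeline (so taxi_data.duckdb here) and puts the tables in the taxi_rides schema; the snake_case column names below reflect dlt's normalisation, so double-check them against your own output.

import duckdb

# Connect to the database file created by the pipeline (default name: <pipeline_name>.duckdb).
conn = duckdb.connect("taxi_data.duckdb")

# List the tables dlt created in our dataset.
print(conn.sql("SELECT table_name FROM information_schema.tables WHERE table_schema = 'taxi_rides'").df())

# Peek at the loaded rows (dlt normalises column names, e.g. Trip_Distance -> trip_distance).
print(conn.sql("SELECT vendor_name, trip_distance FROM taxi_rides.users").df())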
Incremental Loading
We concluded the workshop by exploring the concept of incremental loading, a technique that updates datasets with new data by loading only the new data rather than replacing the entire dataset. This approach enhances pipeline efficiency and cost-effectiveness, working in tandem with incremental extraction and state management.
There are two methods supported by “dlt” for incremental loading:
Append, suitable for immutable or stateless events;
Merge, for updating changing data such as payment statuses (see the sketch below).
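To make merge a bit more concrete, here is a minimal sketch that continues from the taxi pipeline above, assuming record_hash can serve as the primary key; the changed payment status is invented for illustration.

# An updated version of a record we already loaded: same key, new payment status.
updated_data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",    # same key as the row loaded earlier
        "Trip_Distance": 17.52,
        "Payment": {"type": "Credit", "status": "cancelled"},  # changed value (illustrative)
    },
]

# Merge matches rows on the primary key and updates them instead of appending duplicates.
info = pipeline.run(updated_data,
                    table_name="users",
                    write_disposition="merge",
                    primary_key="record_hash")
print(info)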
Conclusion
Intrigued to learn more about “dlt” and its capabilities? Dive into the documentation to explore its functionalities and discover how it can streamline your data engineering workflows.
If you are interested in seeing more code, you can take a look at my GitHub.
This week is going to be interesting as we are learning all about Analytics Engineering and “dbt”.
So, Data Engineers, keep coding and keep exploring.