Automating Data Pipelines with Python Classes and Functions
Handling new CSV files manually can be time-consuming and prone to errors. If you've ever had to inspect a CSV file, determine its schema, create a table, and then load the data manually, you know how tedious it can be. In this post, I’ll walk you through a Python class that automates this entire process—from reading a CSV file to dynamically creating a staging table and loading the data into a database.
This solution is great for data engineers and analysts who need a flexible, reusable approach to handling structured data.
Why Automate CSV Processing?
Every time you receive a new dataset, the following steps are required:
1. Load the CSV: Open the file and inspect its structure manually (see the short inspection example after this list).
2. Determine Column Data Types: Identify text, numbers, and dates.
3. Write SQL DDL: Manually write a CREATE TABLE statement.
4. Create the Table in a Database: Run the DDL in SQL.
5. Load the Data: Use SQL insert statements or a bulk loader.
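To make the first two steps concrete, this is the kind of one-off inspection you would normally run by hand before writing any DDL (nrows just samples the file to keep the check quick):

import pandas as pd

# Quick manual look at a new file before hand-writing a CREATE TABLE statement
df = pd.read_csv(r'D:\Data\sales_data_october_challenge.csv', nrows=1000)
print(df.dtypes)   # pandas' guess at each column's type
print(df.head())   # eyeball a few rows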
Doing this manually every time is inefficient. Instead, we can automate this entire process with Python and SQLAlchemy.
The Python Class: StagingTableCreator
Let’s break down the core functionality:
1. Read the CSV file and extract column names.
2. Infer SQL data types based on the column content.
3. Generate a CREATE TABLE statement dynamically.
4. Execute the SQL to create the table in the database.
5. Load the data into the newly created staging table.
Here’s the full Python script that makes it happen.
import pandas as pd
from sqlalchemy import create_engine, text


class StagingTableCreator:
    def __init__(self, csv_file, connection_string, schema='staging'):
        """Initialize with the CSV file path, database connection string, and target schema."""
        self.csv_file = csv_file
        self.connection_string = connection_string
        self.schema = schema
        self.df = None
        self.table_name = None
        self.engine = create_engine(self.connection_string)

    def load_csv(self):
        """Load the CSV into a pandas DataFrame and print the columns."""
        self.df = pd.read_csv(self.csv_file)
        print("CSV loaded. Columns:", list(self.df.columns))
        return self.df

    def infer_sql_types(self):
        """Infer SQL data types from the pandas DataFrame dtypes."""
        type_mapping = {
            'object': 'VARCHAR(3000)',
            'int64': 'BIGINT',
            'float64': 'FLOAT',
            'bool': 'BOOLEAN',
            'datetime64[ns]': 'DATETIME'
        }
        columns_info = {}
        for col in self.df.columns:
            dtype = str(self.df[col].dtype)
            # Fall back to VARCHAR for any dtype not in the mapping
            sql_type = type_mapping.get(dtype, 'VARCHAR(3000)')
            columns_info[col] = sql_type
        print("Inferred SQL types:", columns_info)
        return columns_info

    def generate_create_table_script(self, table_name):
        """Generate a CREATE TABLE SQL statement based on the CSV schema."""
        self.table_name = table_name
        columns_info = self.infer_sql_types()
        cols_def = ",\n".join(f"[{col}] {sql_type}" for col, sql_type in columns_info.items())
        create_table_sql = f"CREATE TABLE {self.schema}.[{table_name}] (\n{cols_def}\n);"
        print("Create table script generated:\n", create_table_sql)
        return create_table_sql

    def create_staging_table(self, table_name):
        """Connect to the database and execute the CREATE TABLE statement."""
        create_table_sql = self.generate_create_table_script(table_name)
        # engine.begin() commits the DDL when the block exits
        with self.engine.begin() as connection:
            connection.execute(text(create_table_sql))
        print(f"Table {self.schema}.[{table_name}] created.")
        return True

    def load_data(self):
        """Bulk load the CSV data into the newly created table."""
        if self.table_name is None or self.df is None:
            print("Table not created or CSV not loaded.")
            return False
        # Use pandas to_sql for the bulk insert; the table already exists, so append
        self.df.to_sql(self.table_name, con=self.engine, schema=self.schema,
                       if_exists='append', index=False)
        print(f"Data loaded into table {self.schema}.[{self.table_name}].")
        return True

    def run(self, table_name):
        """Run the entire process: load the CSV, create the table, load the data."""
        self.load_csv()
        # create_staging_table generates the DDL internally, so no separate call is needed
        self.create_staging_table(table_name)
        self.load_data()
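One caveat on the type inference: pandas reads date columns as plain strings (object dtype) unless told otherwise, so the DATETIME mapping above only triggers if the DataFrame actually holds datetime64[ns] columns. Here is a minimal, optional variant of load_csv that accepts a list of date columns; the column names you pass are specific to your file, not something the class can guess:

    def load_csv(self, parse_dates=False):
        """
        Load the CSV into a pandas DataFrame.
        Pass parse_dates=['order_date', ...] so those columns arrive as
        datetime64[ns] and get mapped to DATETIME by infer_sql_types.
        """
        self.df = pd.read_csv(self.csv_file, parse_dates=parse_dates)
        print("CSV loaded. Columns:", list(self.df.columns))
        return self.df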
Running the Script
Let’s see how to use this class in practice:
import class_methods as cm
import GetParameters as gp

server = gp.getParam('SQL_Server')
username = gp.getParam('SQL_User')
database = gp.getParam('SQL_DB')
password = gp.getParam('SQL_PW')

# Define your parameters
csv_file = r'D:\Data\sales_data_october_challenge.csv'
connection_string = f'mssql+pyodbc://{username}:{password}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server'
table_name = 'sales_staging'

# Create an instance of the class
stager = cm.StagingTableCreator(csv_file, connection_string, schema='staging')

# Step 1: Load the CSV
df = stager.load_csv()
print(df)
print(df.describe())

# Step 2: Create the staging table
stager.create_staging_table(table_name)

# Step 3: Load the data into the table
stager.load_data()
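If you don't need the intermediate prints or the describe() output, the run() method defined on the class chains all three steps in a single call:

# One-call alternative using the class's run() helper
stager = cm.StagingTableCreator(csv_file, connection_string, schema='staging')
stager.run(table_name)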
Why This Works
✅ Eliminates Manual Schema Creation – The script dynamically infers column types and creates the correct table structure.
✅ Works with Any CSV Structure – Column names and types are read straight from the file, so there is nothing to adjust for a different layout.
✅ Fully Automates Data Staging – Reads, creates, and loads data in one process.
✅ Scalable – New datasets only need a file path and a table name; the class itself never changes.
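For genuinely large files, two optional tweaks help the bulk load keep up; this is a sketch, not something the class above requires: SQLAlchemy's mssql+pyodbc dialect accepts fast_executemany, and to_sql can write in chunks.

from sqlalchemy import create_engine

# Optional tuning for larger CSVs (the chunk size is illustrative)
engine = create_engine(connection_string, fast_executemany=True)
df.to_sql(table_name, con=engine, schema='staging',
          if_exists='append', index=False, chunksize=10_000)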
Conclusion
By using Python and SQLAlchemy, we’ve completely automated CSV ingestion into a staging table. This approach is reusable, scalable, and a game-changer for data engineers working with structured data sources.
What’s your biggest challenge with automating data pipelines? Let me know in the comments below!