From Hours to Minutes: Optimizing Massive CSV→Postgres Imports
You need to load ~100M rows (≈15 GB across four CSVs) into PostgreSQL.
A straightforward script takes 8+ hours.
By iteratively removing bottlenecks, the final pipeline drops to ~12 minutes — about 40× faster.
This article shows the full progression — from “works on my machine” to a production-ready, blazing-fast importer — and explains why each step matters.
Table of Contents
- The Challenge
- Version 1 — The Naive Baseline
- Version 2 — Easy Wins
- Version 3 — Filter at the Source
- Version 4 — Parallelism
- Version 5 — File Format Matters (CSV ≫ Excel)
- Version 6 — Faster Readers (Polars/Arrow)
- Version 7 — Database-Side Speedups
- Version 8 — The End-to-End Optimized Pipeline
- More Optimization Ideas
- Real-World Results
- Key Takeaways
- Resources
- Conclusion
The Challenge
Data
- 4 CSVs of power generation telemetry (~100M rows total)
- Columns: `timestamp`, `generator_id`, `power_output`, plus misc. metadata
- Only 50 out of 1000+ generator IDs are relevant (~5% of rows)
Goals
- Ingest only relevant rows
- Avoid duplicates
- Preserve integrity constraints
- Maximize throughput
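The examples below all write into a `generation_data` table. A minimal sketch of a schema that supports these goals (column types are assumptions; the unique key on `(generator_id, timestamp)` is what the dedupe and `ON CONFLICT` examples later rely on):
import psycopg2

# Hypothetical DDL: the types are assumptions; the UNIQUE constraint matches the
# (generator_id, timestamp) key used for deduplication later in the article.
DDL = """
CREATE TABLE IF NOT EXISTS generation_data (
    timestamp     timestamptz NOT NULL,
    generator_id  integer     NOT NULL,
    power_output  real        NOT NULL,
    UNIQUE (generator_id, timestamp)
);
"""

with psycopg2.connect("postgresql://...") as conn:  # commits on exit
    with conn.cursor() as cur:
        cur.execute(DDL)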
Version 1 — The Naive Baseline
Time: 8+ hours • ~3.5k rows/s
import pandas as pd
import psycopg2
def import_csv_naive(csv_file, connection):
df = pd.read_csv(csv_file) # huge memory spike
cur = connection.cursor()
for _, row in df.iterrows():
cur.execute(
"""
INSERT INTO generation_data (timestamp, generator_id, power_output)
VALUES (%s, %s, %s)
""",
(row["timestamp"], row["generator_id"], row["power"])
)
connection.commit()
Why the Naive Approach Crawls
- Reads the full file into RAM (GBs to tens of GBs in DataFrame overhead)
- One DB round-trip per row (hundreds of millions of network packets)
- No pre-filtering — you're processing work you don’t need
- Single process, single core
Version 2 — Easy Wins
Time: ~4 hours • ~7k rows/s
Chunk the read; batch the writes.
import pandas as pd
import psycopg2
import psycopg2.extras as pge
def import_csv_chunked(csv_file, connection, chunk_size=10_000):
for chunk in pd.read_csv(csv_file, chunksize=chunk_size):
records = list(
chunk[["timestamp", "generator_id", "power_output"]]
.itertuples(index=False, name=None)
)
with connection.cursor() as cur:
pge.execute_values(
cur,
"""
INSERT INTO generation_data (timestamp, generator_id, power_output)
VALUES %s
""",
records,
template="(%s, %s, %s)",
page_size=1000
)
connection.commit()
What improved
- Chunked I/O keeps memory flat.
- Batching slashes network/parse overhead.
Version 3 — Filter at the Source
Time: ~1 hour • ~28k rows/s
Avoid moving and writing rows you’ll never use.
import pandas as pd
import psycopg2.extras
def import_csv_filtered(csv_file, connection, relevant_generators, chunk_size=10_000):
relevant = set(relevant_generators) # O(1) lookups
for chunk in pd.read_csv(csv_file, chunksize=chunk_size):
filt = chunk[chunk["generator_id"].isin(relevant)]
if filt.empty:
continue
records = list(filt[["timestamp", "generator_id", "power_output"]]
.itertuples(index=False, name=None))
with connection.cursor() as cur:
psycopg2.extras.execute_values(
cur,
"INSERT INTO generation_data (timestamp, generator_id, power_output) VALUES %s",
records,
template="(%s, %s, %s)",
page_size=1000
)
connection.commit()
Impact
- If only ~5% of rows matter, you just eliminated ~95% of DB work.
Version 4 — Parallelism
Time: 15 minutes • ~110k rows/s
from multiprocessing import Process, Queue
import psycopg2
DATABASE_URL = "postgresql://..."
def worker(file_path, relevant_ids, result_queue):
conn = psycopg2.connect(DATABASE_URL)
try:
import_csv_filtered(file_path, conn, relevant_ids)
result_queue.put((file_path, "ok"))
except Exception as e:
result_queue.put((file_path, f"error: {e}"))
finally:
conn.close()
def import_parallel(csv_files, relevant_ids):
procs, q = [], Queue()
for path in csv_files:
p = Process(target=worker, args=(path, relevant_ids, q))
p.start()
procs.append(p)
for p in procs:
p.join()
return [q.get() for _ in procs]
Notes
- Parallel file processing gives a near-linear speedup until you hit DB or I/O limits; the pooled sketch below caps the worker count accordingly.
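If you ever have more files than cores, a bounded worker pool keeps you under those limits. A minimal sketch (function names are illustrative), reusing `import_csv_filtered` from Version 3 and the `DATABASE_URL` above:
import os
import psycopg2
from multiprocessing import Pool

def _import_one(args):
    # Worker entry point: one connection per process, then the V3 filtered import
    file_path, relevant_ids = args
    conn = psycopg2.connect(DATABASE_URL)
    try:
        import_csv_filtered(file_path, conn, relevant_ids)
        return (file_path, "ok")
    except Exception as e:
        return (file_path, f"error: {e}")
    finally:
        conn.close()

def import_parallel_pooled(csv_files, relevant_ids, max_workers=None):
    # Cap concurrency at the number of files or CPU cores, whichever is smaller
    max_workers = max_workers or min(len(csv_files), os.cpu_count() or 1)
    with Pool(processes=max_workers) as pool:
        return pool.map(_import_one, [(f, relevant_ids) for f in csv_files])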
Version 5 — File Format Matters (CSV ≫ Excel)
Time: ~6 minutes • ~280k rows/s
Excel (.xlsx) is compressed XML—heavy to parse. CSV streams directly.
Quick Demo
import pandas as pd, time
t = time.time()
pd.read_excel("large_data.xlsx", engine="openpyxl")
print(f"Excel read: {time.time() - t:.2f}s")
t = time.time()
pd.read_csv("large_data.csv")
print(f"CSV read: {time.time() - t:.2f}s")
Convert once; reuse CSV forever
import pandas as pd
from pathlib import Path
def convert_excel_to_csv(excel_path: str) -> str:
df = pd.read_excel(excel_path, engine="openpyxl")
csv_path = str(Path(excel_path).with_suffix(".csv"))
df.to_csv(csv_path, index=False)
return csv_path
Compressed CSV keeps it small and fast
# write
df.to_csv("data.csv.gz", compression="gzip", index=False)
# read (pandas)
df = pd.read_csv("data.csv.gz", compression="gzip")
# or read with polars (auto-detects gzip)
import polars as pl
df = pl.read_csv("data.csv.gz")
Version 6 — Faster Readers (Polars/Arrow)
Time: ~5 minutes • ~330k rows/s
Rust-powered Polars and C++ Arrow engines read CSVs much faster.
import polars as pl
def import_with_polars(csv_file, relevant_ids):
df = pl.read_csv(csv_file, has_header=True, n_threads=4)
filtered = df.filter(pl.col("generator_id").is_in(relevant_ids))
return filtered.to_pandas() # if your DB layer still expects pandas
Why faster
- Compiled kernels (Rust/C++)
- Columnar memory layout
- Multithreaded parsing
- Lazy plans that optimize before execution (see the sketch below)
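To make that last point concrete, a small sketch of a lazy Polars query; the file name and generator IDs are placeholders:
import polars as pl

# Nothing is read yet: scan_csv only records a query plan
plan = (
    pl.scan_csv("data.csv")
      .filter(pl.col("generator_id").is_in([101, 102]))       # placeholder IDs
      .select(["timestamp", "generator_id", "power_output"])
)
print(plan.explain())   # the optimizer pushes the filter/projection into the scan
df = plan.collect()     # the file is only read here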
Arrow in pandas
import pandas as pd
df = pd.read_csv(
"data.csv",
engine="pyarrow", # faster C++ parser
dtype_backend="pyarrow" # memory-efficient Arrow dtypes
)
Version 7 — Database-Side Speedups
Time: ~3 minutes • ~560k rows/s
Switch to COPY (Postgres’ bulk loader). It bypasses per-row INSERT overhead.
import polars as pl
import asyncpg
from io import BytesIO
import asyncio
async def import_with_copy(csv_file, table="generation_data", relevant_ids=None):
df = pl.read_csv(csv_file)
if relevant_ids:
df = df.filter(pl.col("generator_id").is_in(relevant_ids))
    # asyncpg streams bytes, so build a BytesIO buffer from the CSV text
    csv_text = df.select(["timestamp", "generator_id", "power_output"]).write_csv()
    buf = BytesIO(csv_text.encode())
    conn = await asyncpg.connect("postgresql://...")
    try:
        # COPY FROM STDIN in CSV format; header=True skips the header row Polars writes
        await conn.copy_to_table(
            table,
            source=buf,
            format="csv",
            header=True,
            columns=["timestamp", "generator_id", "power_output"]
        )
finally:
await conn.close()
# asyncio.run(import_with_copy("data.csv"))
Helpful DB toggles during bulk load
-- Faster (with small durability trade-off during load window)
SET synchronous_commit = OFF;
-- More memory for index/constraint ops
SET maintenance_work_mem = '2GB';
-- Disable triggers during load, then re-enable (similarly, consider dropping and recreating secondary indexes)
ALTER TABLE generation_data DISABLE TRIGGER ALL;
-- ... COPY data ...
ALTER TABLE generation_data ENABLE TRIGGER ALL;
Version 8 — The End-to-End Optimized Pipeline
Time: ~12 minutes • ~140k rows/s (processing all files in parallel)
Combine everything: lazy Polars scan + early filter + in-memory dedupe + COPY + parallel workers sized to hardware.
import polars as pl
import asyncpg
from io import BytesIO
import psutil
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import asyncio
DATABASE_URL = "postgresql://..."
class OptimizedImporter:
def __init__(self):
self.avail_mem = psutil.virtual_memory().available
self.cpu = psutil.cpu_count(logical=True)
    def _chunk_rows_guess(self, bytes_per_row=200, frac=0.10):
        # Rough heuristic: size chunks to ~10% of available RAM at ~200 bytes/row
        return int(self.avail_mem * frac / bytes_per_row)
async def import_file(self, file_path, relevant_ids):
# 1) Lazy + filter before materializing
lf = pl.scan_csv(file_path) # lazy
lf = lf.filter(pl.col("generator_id").is_in(relevant_ids))
df = lf.collect(streaming=True)
# 2) Deduplicate in-memory if (generator_id, timestamp) define uniqueness
df = df.unique(subset=["generator_id", "timestamp"])
        # 3) COPY into Postgres (asyncpg streams bytes, so encode the CSV)
        csv_text = df.select(["timestamp", "generator_id", "power_output"]).write_csv()
        buf = BytesIO(csv_text.encode())
        conn = await asyncpg.connect(DATABASE_URL)
        try:
            await conn.copy_to_table(
                "generation_data",
                source=buf,
                format="csv",
                header=True,
                columns=["timestamp", "generator_id", "power_output"]
            )
finally:
await conn.close()
    def _import_file_sync(self, file_path, relevant_ids):
        # Entry point for worker processes: run the async import in a fresh event loop
        return asyncio.run(self.import_file(file_path, relevant_ids))

    def run_parallel(self, files, relevant_ids):
        # Parallelize across files. Submit a plain method call rather than a
        # coroutine object: coroutines cannot be pickled across process boundaries.
        with ProcessPoolExecutor(max_workers=min(self.cpu, len(files))) as pool:
            futures = [
                pool.submit(self._import_file_sync, f, relevant_ids)
                for f in files
            ]
            for _ in tqdm(as_completed(futures), total=len(futures)):
                pass
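Putting it together (file names and generator IDs below are illustrative):
# Example usage; the __main__ guard matters because process pools
# re-import this module in child processes on spawn-based platforms.
if __name__ == "__main__":
    files = ["gen_2021.csv", "gen_2022.csv", "gen_2023.csv", "gen_2024.csv"]
    relevant_ids = [101, 102, 103]  # in practice, the ~50 relevant generator IDs
    OptimizedImporter().run_parallel(files, relevant_ids)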
More Optimization Ideas
1) Data Types (big memory wins)
import pandas as pd
import polars as pl
pandas_dtypes = {
"generator_id": "int32",
"power_output": "float32",
"status": "category",
}
df_pd = pd.read_csv("data.csv", dtype=pandas_dtypes)
pl_schema = {
"generator_id": pl.Int32,
"power_output": pl.Float32,
"timestamp": pl.Datetime,
}
df_pl = pl.read_csv("data.csv", schema_overrides=pl_schema)  # partial schema; use dtypes= on older Polars
2) Incremental Imports (only load new rows)
import polars as pl
def incremental_import(csv_file, last_imported_ts):
return (pl.scan_csv(csv_file)
.filter(pl.col("timestamp") > last_imported_ts)
.collect())
3) Split Large Files for Parallel Processing
# GNU split (fastest for big plain files)
# bash
# split -n 8 bigfile.csv part_
# Or via pandas
import pandas as pd
for i, chunk in enumerate(pd.read_csv("bigfile.csv", chunksize=1_000_000)):
chunk.to_csv(f"part_{i}.csv", index=False)
4) Memory Mapping / Out-of-Core
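A minimal sketch of this idea with PyArrow: memory-map the file so the OS pages bytes in on demand, and read it as a stream of record batches instead of one big table. The `handle_batch` function is a placeholder for your own processing:
import pyarrow as pa
import pyarrow.csv as pacsv

def handle_batch(batch):
    # Placeholder: filter/convert/insert each RecordBatch as it arrives
    print(batch.num_rows)

with pa.memory_map("data.csv", "r") as source:
    reader = pacsv.open_csv(source)      # streaming CSV reader
    for batch in reader:                 # one RecordBatch at a time
        handle_batch(batch)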
5) Connection Pooling
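Opening a fresh connection per chunk or per task adds up. A minimal sketch with psycopg2's built-in pool (pool sizes and the DSN are illustrative):
from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(minconn=2, maxconn=8, dsn="postgresql://...")

conn = pool.getconn()              # borrow a connection from the pool
try:
    with conn.cursor() as cur:
        cur.execute("SELECT count(*) FROM generation_data")
        print(cur.fetchone()[0])
    conn.commit()
finally:
    pool.putconn(conn)             # return it instead of closing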
6) Binary Intermediates (Parquet/Feather/Arrow)
# pandas
df.to_parquet("data.parquet", compression="snappy")
df = pd.read_parquet("data.parquet")
# feather
df.to_feather("data.feather")
df = pd.read_feather("data.feather")
# pyarrow (same Parquet format, written via the Arrow Table API)
import pyarrow as pa, pyarrow.parquet as pq
tbl = pa.Table.from_pandas(df)
pq.write_table(tbl, "data.parquet")
7) Predicate Pushdown
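A minimal sketch with PyArrow's Parquet reader (the generator IDs are placeholders): the column projection and row filter are pushed into the scan, so row groups whose statistics cannot match are skipped instead of being read and discarded:
import pyarrow.parquet as pq

# Only the three listed columns are decoded, and row groups that cannot
# contain the requested generator_ids are skipped entirely.
table = pq.read_table(
    "data.parquet",
    columns=["timestamp", "generator_id", "power_output"],
    filters=[("generator_id", "in", [101, 102])],   # placeholder IDs
)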
8) Deduplication
# Hash-based (pandas)
import pandas as pd
def dedupe_hash(df):
h = pd.util.hash_pandas_object(df[["generator_id","timestamp"]], index=False)
df = df.assign(_h=h)
return df.drop_duplicates(subset=["_h"]).drop(columns=["_h"])
# DB upsert
UPSERT_SQL = """
INSERT INTO generation_data (generator_id, timestamp, power_output)
VALUES %s
ON CONFLICT (generator_id, timestamp)
DO UPDATE SET power_output = EXCLUDED.power_output
"""
9) I/O & Network
# Compress payloads if you must move CSVs over the wire
import zlib
from io import StringIO
import pandas as pd
def compress_df_as_csv(df):
raw = df.to_csv(index=False).encode()
return zlib.compress(raw)
def decompress_to_df(blob):
return pd.read_csv(StringIO(zlib.decompress(blob).decode()))
10) Observe & Profile
import psutil, time
class PerfMon:
def __init__(self):
self.t0 = time.time()
self.m0 = psutil.Process().memory_info().rss / 1024 / 1024
def mark(self, label):
dt = time.time() - self.t0
m = psutil.Process().memory_info().rss / 1024 / 1024
print(f"{label}\n time: {dt:.2f}s Δmem: {m - self.m0:.0f} MB cpu: {psutil.cpu_percent()}%")
pm = PerfMon()
# ... step ...
pm.mark("after step")
Real-World Results
Here’s the full performance journey — from “please end my suffering” to “is it done already?” — on a single mid-tier laptop (i5-13600K, 32 GB RAM, SATA SSD):
| Version | Strategy | Time | Speed (rows/s) | Memory | CPU |
|---|---|---|---|---|---|
| V1 | Naive row-by-row INSERT | 8 h | 3,500 | 15 GB | 25% |
| V2 | Chunked + `execute_values` | 4 h | 7,000 | 2 GB | 25% |
| V3 | + Filter at source | 1 h | 28,000 | 2 GB | 25% |
| V4 | + Parallel chunks | 15 min | 110,000 | 8 GB | 95% |
| V5 | + Clean CSV format | 6 min | 280,000 | 3 GB | 95% |
| V6 | + Polars (Rust-backed) | 5 min | 330,000 | 4 GB | 95% |
| V7 | + `COPY FROM` | 3 min | 560,000 | 2 GB | 95% |
| V8 | Ultimate: 4 files in parallel | 12 min | 140,000 per stream | 4 GB | 95% |
V8 note: Runs 4 CSV files simultaneously (e.g., one per year).
Total throughput: ~560K rows/s across all streams.
Wall-clock time wins even if per-stream speed appears lower.
Visual Timeline
V1: ████████████████████████████████████████████████ 8h
V2: ████████████████████████ 4h
V3: ██████ 1h
V4: █ 15m
V5: ▋ 6m
V6: ▌ 5m
V7: ▎ 3m (single file)
V8: ▌ 12m (all files)
Key Takeaways
- V1 → V3: Pure Python alone delivers an ~8x speedup, no new tools
- V4 → V6: Parallelism + better engines add roughly 10x more
- V7: `COPY` is king; nothing beats PostgreSQL's native loader
- V8: Scale horizontally when you have multiple files
Total speedup from V1 → V7: ~160x
From 8 hours → 3 minutes
And remember:
The fastest code is the code that never runs.
(Filter early, batch often, use the right tool at the right time.)
Resources
- Pandas: Performance & scaling
  https://pandas.pydata.org/docs/user_guide/enhancingperf.html
- Polars User Guide
  https://pola-rs.github.io/polars-book/
- PostgreSQL: Populating a database / COPY
  https://www.postgresql.org/docs/current/populate.html
- Python multiprocessing
  https://docs.python.org/3/library/multiprocessing.html
- Apache Arrow (Python)
  https://arrow.apache.org/docs/python/
- Postgres COPY vs INSERT discussion
  https://www.depesz.com/2007/07/05/how-much-faster-is-copy-than-insert/
- Why Polars is fast
  https://pola.rs/posts/polars-fast/
- Connection pooling primer
  https://www.cockroachlabs.com/blog/what-is-connection-pooling/
Author: Md. Asraful Alom