From Hours to Minutes: Optimizing Massive CSV→Postgres Imports

You need to load ~100M rows (≈15 GB across four CSVs) into PostgreSQL.
A straightforward script takes 8+ hours.
By iteratively removing bottlenecks, the final pipeline drops to ~12 minutes — about 40× faster.

This article shows the full progression — from “works on my machine” to a production-ready, blazing-fast importer — and explains why each step matters.

Table of Contents

  1. The Challenge
  2. Version 1 — The Naive Baseline
  3. Version 2 — Easy Wins
  4. Version 3 — Filter at the Source
  5. Version 4 — Parallelism
  6. Version 5 — File Format Matters (CSV ≫ Excel)
  7. Version 6 — Faster Readers (Polars/Arrow)
  8. Version 7 — Database-Side Speedups
  9. Version 8 — The End-to-End Optimized Pipeline
  10. More Optimization Ideas
  11. Real-World Results
  12. Key Takeaways

The Challenge

Data

  • 4 CSVs of power generation telemetry (~100M rows total)
  • Columns: timestamp, generator_id, power_output, plus misc. metadata
  • Only 50 out of 1000+ generator IDs are relevant (~5% of rows)

Goals

  • Ingest only relevant rows
  • Avoid duplicates
  • Preserve integrity constraints
  • Maximize throughput
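
For reference, the later snippets assume a target table shaped roughly like this (a sketch inferred from the column names used throughout; the real schema and connection string will differ). The unique key on (generator_id, timestamp) is what makes deduplication and upserts possible.

import psycopg2

# Hypothetical target schema; adjust names and types to your actual data
DDL = """
CREATE TABLE IF NOT EXISTS generation_data (
    timestamp     timestamptz NOT NULL,
    generator_id  integer     NOT NULL,
    power_output  real        NOT NULL,
    UNIQUE (generator_id, timestamp)
);
"""

conn = psycopg2.connect("postgresql://...")  # placeholder DSN
with conn, conn.cursor() as cur:
    cur.execute(DDL)
conn.close()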

Version 1 — The Naive Baseline

Time: 8+ hours • ~3.5k rows/s

import pandas as pd
import psycopg2

def import_csv_naive(csv_file, connection):
    df = pd.read_csv(csv_file)  # huge memory spike

    cur = connection.cursor()
    for _, row in df.iterrows():
        cur.execute(
            """
            INSERT INTO generation_data (timestamp, generator_id, power_output)
            VALUES (%s, %s, %s)
            """,
            (row["timestamp"], row["generator_id"], row["power"])
        )
    connection.commit()

Why the Naive Approach Crawls

  1. Reads the full file into RAM (GBs to tens of GBs in DataFrame overhead)
  2. One DB round-trip per row (hundreds of millions of network packets)
  3. No pre-filtering — you're processing work you don’t need
  4. Single process, single core

Version 2 — Easy Wins

Time: ~4 hours • ~7k rows/s

Chunk the read; batch the writes.

import pandas as pd
import psycopg2
import psycopg2.extras as pge

def import_csv_chunked(csv_file, connection, chunk_size=10_000):
    for chunk in pd.read_csv(csv_file, chunksize=chunk_size):
        records = list(
            chunk[["timestamp", "generator_id", "power_output"]]
            .itertuples(index=False, name=None)
        )

        with connection.cursor() as cur:
            pge.execute_values(
                cur,
                """
                INSERT INTO generation_data (timestamp, generator_id, power_output)
                VALUES %s
                """,
                records,
                template="(%s, %s, %s)",
                page_size=1000
            )
        connection.commit()

What improved

  1. Chunked I/O keeps memory flat.
  2. Batching slashes network/parse overhead.

Version 3 — Filter at the Source

Time: ~1 hour • ~28k rows/s

Avoid moving and writing rows you’ll never use.

import pandas as pd
import psycopg2.extras

def import_csv_filtered(csv_file, connection, relevant_generators, chunk_size=10_000):
    relevant = set(relevant_generators)  # O(1) lookups

    for chunk in pd.read_csv(csv_file, chunksize=chunk_size):
        filt = chunk[chunk["generator_id"].isin(relevant)]
        if filt.empty:
            continue

        records = list(filt[["timestamp", "generator_id", "power_output"]]
                       .itertuples(index=False, name=None))

        with connection.cursor() as cur:
            psycopg2.extras.execute_values(
                cur,
                "INSERT INTO generation_data (timestamp, generator_id, power_output) VALUES %s",
                records,
                template="(%s, %s, %s)",
                page_size=1000
            )
        connection.commit()

Impact

  1. If only ~5% of rows matter, you just eliminated ~95% of DB work.

Version 4 — Parallelism

Time: ~15 minutes • ~110k rows/s

from multiprocessing import Process, Queue
import psycopg2

DATABASE_URL = "postgresql://..."

def worker(file_path, relevant_ids, result_queue):
    conn = psycopg2.connect(DATABASE_URL)
    try:
        import_csv_filtered(file_path, conn, relevant_ids)
        result_queue.put((file_path, "ok"))
    except Exception as e:
        result_queue.put((file_path, f"error: {e}"))
    finally:
        conn.close()

def import_parallel(csv_files, relevant_ids):
    procs, q = [], Queue()
    for path in csv_files:
        p = Process(target=worker, args=(path, relevant_ids, q))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
    return [q.get() for _ in procs]

Notes

  1. Parallel file processing ≈ linear speedup until you hit DB or I/O limits.
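
A rough sizing rule, as a sketch (file names and the connection cap are illustrative, not measured values):

import os

# Don't launch more workers than files, cores, or the Postgres connections
# you're willing to dedicate to the load
csv_files = ["gen_2021.csv", "gen_2022.csv", "gen_2023.csv", "gen_2024.csv"]
MAX_DB_CONNECTIONS = 8
n_workers = min(len(csv_files), os.cpu_count() or 1, MAX_DB_CONNECTIONS)
print(f"using {n_workers} worker processes")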

Version 5 — File Format Matters (CSV ≫ Excel)

Time: ~6 minutes • ~280k rows/s

Excel (.xlsx) is compressed XML—heavy to parse. CSV streams directly.

Quick Demo

import pandas as pd, time

t = time.time()
pd.read_excel("large_data.xlsx", engine="openpyxl")
print(f"Excel read: {time.time() - t:.2f}s")

t = time.time()
pd.read_csv("large_data.csv")
print(f"CSV read:   {time.time() - t:.2f}s")

Convert once; reuse CSV forever

import pandas as pd
from pathlib import Path

def convert_excel_to_csv(excel_path: str) -> str:
    df = pd.read_excel(excel_path, engine="openpyxl")
    csv_path = str(Path(excel_path).with_suffix(".csv"))
    df.to_csv(csv_path, index=False)
    return csv_path

Compressed CSV keeps it small and fast

# write
df.to_csv("data.csv.gz", compression="gzip", index=False)

# read (pandas)
df = pd.read_csv("data.csv.gz", compression="gzip")

# or read with polars (auto-detects gzip)
import polars as pl
df = pl.read_csv("data.csv.gz")

Version 6 — Faster Readers (Polars/Arrow)

Time: ~5 minutes • ~330k rows/s

The Rust-powered Polars reader and pandas' C++ Arrow engine parse CSVs far faster than pandas' default parser.

import polars as pl

def import_with_polars(csv_file, relevant_ids):
    df = pl.read_csv(csv_file, has_header=True, n_threads=4)
    filtered = df.filter(pl.col("generator_id").is_in(relevant_ids))
    return filtered.to_pandas()  # if your DB layer still expects pandas

Why faster

  1. Compiled kernels (Rust/C++)
  2. Columnar memory layout
  3. Multithreaded parsing
  4. Lazy plans that optimize before execution (see the sketch after this list)
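
A minimal sketch of point 4, assuming a recent Polars version (file name and IDs are illustrative): the filter and projection become part of the query plan, and explain() shows the optimized plan before anything is parsed.

import polars as pl

# Nothing is read until collect(); the filter and column selection are
# applied while scanning rather than after loading
plan = (
    pl.scan_csv("data.csv")
      .filter(pl.col("generator_id").is_in([17, 42, 99]))
      .select(["timestamp", "generator_id", "power_output"])
)
print(plan.explain())  # inspect the optimized plan
df = plan.collect()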

Arrow in pandas

import pandas as pd
df = pd.read_csv(
    "data.csv",
    engine="pyarrow",        # faster C++ parser
    dtype_backend="pyarrow"  # memory-efficient Arrow dtypes
)


Version 7 — Database-Side Speedups

Time: ~3 minutes • ~560k rows/s

Switch to COPY (Postgres’ bulk loader). It bypasses per-row INSERT overhead.

import polars as pl
import asyncpg
from io import BytesIO
import asyncio

async def import_with_copy(csv_file, table="generation_data", relevant_ids=None):
    df = pl.read_csv(csv_file)
    if relevant_ids:
        df = df.filter(pl.col("generator_id").is_in(relevant_ids))

    buf = BytesIO()  # asyncpg streams bytes over COPY, so use a binary buffer
    df.select(["timestamp", "generator_id", "power_output"]).write_csv(buf)
    buf.seek(0)

    conn = await asyncpg.connect("postgresql://...")
    try:
        # COPY FROM STDIN in CSV format; header=True makes Postgres skip the
        # header row produced by write_csv
        await conn.copy_to_table(
            table, source=buf, format="csv", header=True,
            columns=["timestamp", "generator_id", "power_output"]
        )
    finally:
        await conn.close()

# asyncio.run(import_with_copy("data.csv"))

Helpful DB toggles during bulk load

-- Faster (with small durability trade-off during load window)
SET synchronous_commit = OFF;

-- More memory for index/constraint ops
SET maintenance_work_mem = '2GB';

-- Disable triggers during load, then re-enable (consider dropping and rebuilding indexes too)
ALTER TABLE generation_data DISABLE TRIGGER ALL;
-- ... COPY data ...
ALTER TABLE generation_data ENABLE TRIGGER ALL;
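
If you prefer to keep these toggles next to the COPY call, here is a sketch of applying them from Python with asyncpg (the connection URL and source file are placeholders):

import asyncio
import asyncpg

async def copy_with_toggles(source):
    conn = await asyncpg.connect("postgresql://...")
    try:
        # Session-level settings from the SQL above
        await conn.execute("SET synchronous_commit = OFF")
        await conn.execute("SET maintenance_work_mem = '2GB'")
        await conn.execute("ALTER TABLE generation_data DISABLE TRIGGER ALL")
        await conn.copy_to_table(
            "generation_data",
            source=source,
            format="csv",
            header=True,
            columns=["timestamp", "generator_id", "power_output"],
        )
        await conn.execute("ALTER TABLE generation_data ENABLE TRIGGER ALL")
    finally:
        await conn.close()

# asyncio.run(copy_with_toggles(open("filtered.csv", "rb")))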

Version 8 — The End-to-End Optimized Pipeline

Time: ~12 minutes • ~140k rows/s per stream (all four files in parallel)

Combine everything: lazy Polars scan + early filter + in-memory dedupe + COPY + parallel workers sized to hardware.

import polars as pl
import asyncpg
from io import BytesIO
import psutil
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import asyncio

DATABASE_URL = "postgresql://..."

class OptimizedImporter:
    def __init__(self):
        self.avail_mem = psutil.virtual_memory().available
        self.cpu = psutil.cpu_count(logical=True)

    def _chunk_rows_guess(self, bytes_per_row=200, frac=0.10):
        # Rough heuristic for sizing chunks to ~10% of free RAM (handy if you
        # chunk instead of streaming)
        return int(self.avail_mem * frac / bytes_per_row)

    async def import_file(self, file_path, relevant_ids):
        # 1) Lazy + filter before materializing
        lf = pl.scan_csv(file_path)  # lazy
        lf = lf.filter(pl.col("generator_id").is_in(relevant_ids))
        df = lf.collect(streaming=True)

        # 2) Deduplicate in-memory if (generator_id, timestamp) define uniqueness
        df = df.unique(subset=["generator_id", "timestamp"])

        # 3) COPY into Postgres
        buf = BytesIO()  # asyncpg streams bytes over COPY, so use a binary buffer
        df.select(["timestamp", "generator_id", "power_output"]).write_csv(buf)
        buf.seek(0)

        conn = await asyncpg.connect(DATABASE_URL)
        try:
            await conn.copy_to_table(
                "generation_data",
                source=buf,
                format="csv",
                header=True,
                columns=["timestamp", "generator_id", "power_output"]
            )
        finally:
            await conn.close()

    def _import_file_sync(self, file_path, relevant_ids):
        # Coroutine objects can't be pickled, so each worker process builds
        # and runs its own event loop here
        asyncio.run(self.import_file(file_path, relevant_ids))

    def run_parallel(self, files, relevant_ids):
        # Parallelize across files; one process (and one COPY) per file
        with ProcessPoolExecutor(max_workers=self.cpu) as pool:
            futures = [
                pool.submit(self._import_file_sync, f, relevant_ids)
                for f in files
            ]
            for _ in tqdm(as_completed(futures), total=len(futures)):
                pass

More Optimization Ideas

1) Data Types (big memory wins)

import pandas as pd
import polars as pl

pandas_dtypes = {
    "generator_id": "int32",
    "power_output": "float32",
    "status": "category",
}

df_pd = pd.read_csv("data.csv", dtype=pandas_dtypes)

pl_schema = {
    "generator_id": pl.Int32,
    "power_output": pl.Float32,
    "timestamp": pl.Datetime,
}
df_pl = pl.read_csv("data.csv", schema_overrides=pl_schema)  # "dtypes=" on older Polars
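
To check whether the narrower types actually pay off on your data, a quick before/after comparison (a sketch; the file name is illustrative):

import pandas as pd

dtypes = {"generator_id": "int32", "power_output": "float32", "status": "category"}
default_df = pd.read_csv("data.csv")
typed_df = pd.read_csv("data.csv", dtype=dtypes)

print(f"default dtypes:  {default_df.memory_usage(deep=True).sum() / 1e6:.0f} MB")
print(f"explicit dtypes: {typed_df.memory_usage(deep=True).sum() / 1e6:.0f} MB")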

2) Incremental Loads (only import what's new)

import polars as pl

def incremental_import(csv_file, last_imported_ts):
    return (pl.scan_csv(csv_file)
              .filter(pl.col("timestamp") > last_imported_ts)
              .collect())

3) Split Large Files for Parallel Workers

# GNU split (fastest for big plain files); l/N splits on line boundaries,
# and only the first part keeps the CSV header
# bash: split -n l/8 bigfile.csv part_

# Or via pandas
import pandas as pd
for i, chunk in enumerate(pd.read_csv("bigfile.csv", chunksize=1_000_000)):
    chunk.to_csv(f"part_{i}.csv", index=False)

4) Memory Mapping / Out-of-Core

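A minimal sketch of both ideas: Polars' streaming engine keeps CSV scans out-of-core, and pyarrow can memory-map Arrow/Feather files so only the pages you touch are read (file names and IDs are illustrative).

import polars as pl
import pyarrow.feather as feather

# Out-of-core: the streaming engine processes the scan in batches instead of
# materializing the whole file at once
relevant_ids = [17, 42, 99]
df = (
    pl.scan_csv("bigfile.csv")
      .filter(pl.col("generator_id").is_in(relevant_ids))
      .collect(streaming=True)
)

# Memory mapping: the Feather file is mapped, not copied, into memory
table = feather.read_table("data.feather", memory_map=True)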

5) Connection Pooling

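A sketch with psycopg2's built-in pool, so batches reuse connections instead of reconnecting every time; the pool bounds, DSN, and the insert_batch helper are illustrative (asyncpg offers create_pool() for the async path).

import psycopg2.extras as pge
from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(minconn=2, maxconn=8, dsn="postgresql://...")

def insert_batch(records):
    conn = pool.getconn()
    try:
        with conn.cursor() as cur:
            pge.execute_values(
                cur,
                "INSERT INTO generation_data (timestamp, generator_id, power_output) VALUES %s",
                records,
            )
        conn.commit()
    finally:
        pool.putconn(conn)  # return the connection to the pool, don't close it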

6) Binary Intermediates (Parquet/Feather/Arrow)

# pandas
df.to_parquet("data.parquet", compression="snappy")
df = pd.read_parquet("data.parquet")

# feather
df.to_feather("data.feather")
df = pd.read_feather("data.feather")

# pyarrow
import pyarrow as pa, pyarrow.parquet as pq
tbl = pa.Table.from_pandas(df)
pq.write_table(tbl, "data.parquet")  # Parquet written via Arrow

7) Predicate Pushdown

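A sketch using a lazy Polars scan over a Parquet intermediate (names are illustrative): because the filter is part of the query plan, it is pushed into the reader, and Parquet's row-group statistics let whole chunks be skipped. The same pattern works with scan_csv, just with less to prune.

import polars as pl

relevant_ids = [17, 42, 99]
df = (
    pl.scan_parquet("data.parquet")
      .filter(pl.col("generator_id").is_in(relevant_ids))  # pushed into the reader
      .collect()
)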

8) Deduplication

# Hash-based (pandas)
import pandas as pd
def dedupe_hash(df):
    h = pd.util.hash_pandas_object(df[["generator_id","timestamp"]], index=False)
    df = df.assign(_h=h)
    return df.drop_duplicates(subset=["_h"]).drop(columns=["_h"])

# DB upsert
UPSERT_SQL = """
INSERT INTO generation_data (generator_id, timestamp, power_output)
VALUES %s
ON CONFLICT (generator_id, timestamp)
DO UPDATE SET power_output = EXCLUDED.power_output
"""
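
One way to drive the upsert with psycopg2's execute_values, assuming records is a list of (generator_id, timestamp, power_output) tuples (a sketch; the helper name is hypothetical):

import psycopg2.extras as pge

def upsert_batch(connection, records):
    # Rows that collide on (generator_id, timestamp) update power_output
    # instead of raising a unique-violation error
    with connection.cursor() as cur:
        pge.execute_values(cur, UPSERT_SQL, records, page_size=1000)
    connection.commit()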

9) I/O & Network

# Compress payloads if you must move CSVs over the wire
import zlib
from io import StringIO
import pandas as pd

def compress_df_as_csv(df):
    raw = df.to_csv(index=False).encode()
    return zlib.compress(raw)

def decompress_to_df(blob):
    return pd.read_csv(StringIO(zlib.decompress(blob).decode()))

10) Observe & Profile

import psutil, time

class PerfMon:
    def __init__(self):
        self.t0 = time.time()
        self.m0 = psutil.Process().memory_info().rss / 1024 / 1024

    def mark(self, label):
        dt = time.time() - self.t0
        m = psutil.Process().memory_info().rss / 1024 / 1024
        print(f"{label}\n  time: {dt:.2f}s  Δmem: {m - self.m0:.0f} MB  cpu: {psutil.cpu_percent()}%")

pm = PerfMon()
# ... step ...
pm.mark("after step")

Real-World Results

Here’s the full performance journey — from “please end my suffering” to “is it done already?” — on a single mid-tier desktop (i5-13600K, 32 GB RAM, SATA SSD):

Version  Strategy                    Time    Speed (rows/s)      Memory  CPU
V1       Naive row-by-row INSERTs    8 h     3,500               15 GB   25%
V2       Chunked + execute_values    4 h     7,000               2 GB    25%
V3       + Filter at source          1 h     28,000              2 GB    25%
V4       + Parallel file workers     15 min  110,000             8 GB    95%
V5       + Clean CSV format          6 min   280,000             3 GB    95%
V6       + Polars (Rust-backed)      5 min   330,000             4 GB    95%
V7       + COPY FROM                 3 min   560,000             2 GB    95%
V8       All 4 files in parallel     12 min  140,000 per stream  4 GB    95%

V8 note: Runs 4 CSV files simultaneously (e.g., one per year).
Total throughput: ~560K rows/s across all streams.
Wall-clock time wins even if per-stream speed appears lower.


Visual Timeline

V1: ████████████████████████████████████████████████  8h
V2: ████████████████████████                         4h
V3: ██████                                           1h
V4: █▌                                               15m
V5: ▌                                                6m
V6: ▌                                                5m
V7: ▎                                                3m (single file)
V8: █                                                12m (all files)

Key Takeaways

  • V1 → V3: Pure Python wins — ~8× speedup, no new tools
  • V4 → V6: Parallelism + better engines ≈ 12× more
  • V7: COPY is king — nothing beats PostgreSQL’s native loader
  • V8: Scale horizontally when you have multiple files

Total speedup from V1 → V7: ~160x
From 8 hours → 3 minutes

And remember:

The fastest code is the code that never runs.
(Filter early, batch often, use the right tool at the right time.)

Author: Md. Asraful Alom