## The job that kept dying
We run a nightly archival job that exports a few large Postgres tables to S3 as gzipped JSONL. On paper it's a humble piece of glue: read rows, serialize, compress, upload. In practice, it was getting OOM-killed by Kubernetes most nights on the larger tenants.
The pod's memory limit was 1 GiB. The biggest table being archived had ~466K rows. At peak we measured the Go heap pushing past 700 MB before the kernel reaped us.
That ratio — 466K small rows producing hundreds of megabytes of heap — was the smell. None of those rows is large. Something in the pipeline was hoarding all of them at once.
## What the original code looked like
Roughly, the archival path was this:
```go
rows, _ := q.GetFetchQueueForArchive(ctx, domainID) // []Row, materialized

var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
for _, r := range rows {
	line, _ := json.Marshal(r)
	gz.Write(line)
	gz.Write([]byte("\n"))
}
gz.Close()

s3.PutObject(ctx, &s3.PutObjectInput{
	Bucket: &bucket,
	Key:    &key,
	Body:   bytes.NewReader(buf.Bytes()),
})
```
Read that with a memory profiler in your head and the bug is obvious:
1. `GetFetchQueueForArchive` is a SQLC `:many` query. It loops over `rows.Next()` internally, scans every row into a struct, and appends to a slice. The whole result set lands in the heap before the function returns. For 466K rows of ~1 KB each, that's about **500 MB**.
2. Then we compress into `bytes.Buffer` — another **~100 MB** of gzipped bytes, in memory.
3. `s3.PutObject` requires either a `Content-Length` header or a seekable body. We satisfy that by handing it the fully-buffered bytes. So the whole compressed payload has to coexist in RAM with the source slice until at least the slice goes out of scope.
Peak heap ≈ rows slice + gzip buffer ≈ **~600 MB**. Add the rest of the process (DB driver, AWS SDK, goroutine stacks) and we cross the 1 GiB ceiling. The pod dies. K8s restarts it. It dies again on the same table.
The crucial observation: **memory usage scales linearly with table size**, even though we never need more than one row at a time to do the work. That's a design bug, not a tuning problem. No amount of bumping the memory limit fixes it — the next tenant with twice the rows blows past whatever ceiling you pick.
## The shape of the fix
The goal: **never hold more than one row's worth of table data in Go memory, plus a few fixed-size buffers**. The pipeline should look like a hose, not a tank. Bytes flow Postgres → JSON → gzip → S3 in a single producer/consumer pipeline, where each stage processes one small unit at a time and exerts backpressure on the previous stage.
Concretely:
```plaintext
pgx.Rows              streamed result set, read row by row
    ↓  rows.Next() → scan one row → ~1 KB
json.Marshal(row)
    ↓  ~1 KB line
gzip.Writer           ~32 KB internal compression window
    ↓
io.Pipe               synchronous, zero-buffer
    ↓
s3manager.Uploader    5 MB per multipart part
    ↓
S3 (multipart upload)
```
Peak memory: 1 row + 32 KB gzip window + 5 MB part buffer × upload concurrency ≈ **~25 MB**, regardless of whether the table has 1K rows or 100M rows.
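The arithmetic behind that bound can be written down as a tiny helper. This is a back-of-envelope sketch only: `peakBufferBytes` is a hypothetical name, the inputs are the figures quoted above, and it deliberately ignores GC headroom and runtime overhead.

```go
package main

import "fmt"

const (
	kib = 1 << 10
	mib = 1 << 20
)

// peakBufferBytes estimates the steady-state buffer footprint of the
// streaming pipeline: one row in flight, the gzip window, and the
// uploader's part buffers. Note that the row count is not an input.
func peakBufferBytes(rowBytes, gzipWindowBytes, partBytes, concurrency int) int {
	return rowBytes + gzipWindowBytes + partBytes*concurrency
}

func main() {
	got := peakBufferBytes(1*kib, 32*kib, 5*mib, 5)
	fmt.Printf("%.2f MiB\n", float64(got)/mib) // prints "25.03 MiB"
}
```

The part buffers dominate; the row and the gzip window are rounding error next to them.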
## The four primitives that make this work
### 1. `pgx.Rows` — a server-side cursor
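`pool.Query()` returns a `Rows` handle that streams the result set. `rows.Next()` advances one row at a time, and only the row currently being scanned lives in Go memory (plus pgx's fixed-size read buffer). Strictly speaking this is not a `DECLARE CURSOR`-style server-side cursor: Postgres sends the result over the connection and pgx reads it incrementally, with TCP flow control holding the server back when we read slowly. From the client's side it behaves like a cursor, which is why the term is used loosely here. This is fundamentally different from what SQLC's `:many` generates, which loops and appends every row into a slice before returning. The streaming path bypasses SQLC and talks to the pool directly with the raw SQL string.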
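The difference between the two access patterns is easier to see without a database in the way. The sketch below is an illustration, not pgx or SQLC code: `rowIter` is a hypothetical interface that mimics the `Next`/`Err` shape of `pgx.Rows`, and `fakeRows` is a made-up source that generates rows on demand.

```go
package main

import "fmt"

// rowIter is a hypothetical stand-in for the Next/Err part of pgx.Rows.
type rowIter interface {
	Next() bool
	Value() string
	Err() error
}

// fakeRows yields n synthetic rows without ever storing them all.
type fakeRows struct{ i, n int }

func (f *fakeRows) Next() bool    { f.i++; return f.i <= f.n }
func (f *fakeRows) Value() string { return fmt.Sprintf("row-%d", f.i) }
func (f *fakeRows) Err() error    { return nil }

// collect mimics a ":many"-style helper: memory grows with the row count.
func collect(it rowIter) ([]string, error) {
	var out []string
	for it.Next() {
		out = append(out, it.Value())
	}
	return out, it.Err()
}

// stream hands each row to fn and keeps nothing: memory stays constant.
func stream(it rowIter, fn func(string) error) (int, error) {
	n := 0
	for it.Next() {
		if err := fn(it.Value()); err != nil {
			return n, err
		}
		n++
	}
	return n, it.Err()
}

func main() {
	all, _ := collect(&fakeRows{n: 3})
	fmt.Println(len(all), "rows held at once")

	n, _ := stream(&fakeRows{n: 3}, func(string) error { return nil })
	fmt.Println(n, "rows seen, one at a time")
}
```

Both functions run the same loop. The only difference is whether the loop body appends or hands the row off, and that one line decides whether memory is O(rows) or O(1).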
### 2. `io.Pipe` — a synchronous in-memory pipe
```go
pr, pw := io.Pipe()
```
`pr` is a `Reader`, `pw` is a `Writer`. Bytes written to `pw` become readable from `pr` — with **no internal buffer**. It's a synchronization point, not a queue. `Write` blocks until something reads `pr`, and `Read` blocks until something writes `pw`. Producer and consumer rate-match each other naturally. If S3 upload stalls, our DB iteration stalls too, and memory stays flat.
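A minimal, standard-library-only sketch of that rendezvous behaviour follows. `pipeLines` is a hypothetical helper written for this example: a goroutine writes lines into the pipe while the caller reads them back, and each `Write` only returns once the reader has taken the bytes.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
)

// pipeLines writes each line into an io.Pipe from a goroutine and reads
// them back on the calling goroutine. Nothing accumulates in between.
func pipeLines(lines []string) ([]string, error) {
	pr, pw := io.Pipe()
	defer pr.Close() // unblocks the writer if we stop reading early

	go func() {
		var err error
		for _, l := range lines {
			if _, err = io.WriteString(pw, l+"\n"); err != nil {
				break
			}
		}
		pw.CloseWithError(err) // a nil error reads as plain EOF
	}()

	var got []string
	sc := bufio.NewScanner(pr)
	for sc.Scan() {
		got = append(got, sc.Text())
	}
	return got, sc.Err()
}

func main() {
	got, err := pipeLines([]string{"a", "b", "c"})
	fmt.Println(got, err)
}
```

Note that the writer must run in its own goroutine. Writing and reading the same pipe from a single goroutine deadlocks on the first `Write`, precisely because there is no buffer to absorb it.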
### 3. `compress/gzip.Writer` — streaming compression
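`gzip.NewWriter(w io.Writer)` returns a writer that compresses on the fly. As you `Write()` to it, it accumulates input internally (DEFLATE works over a 32 KB sliding window) and emits compressed blocks to the underlying writer as they fill. Its internal state is a fixed size that does not grow with the input, so you never have to hold the whole input or output in memory. `Close()` flushes whatever is still buffered and writes the gzip trailer.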
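As a small illustration using only the standard library, the sketch below streams lines through a `gzip.Writer` and reads them back to check the round trip. The helper names (`gzipLines`, `gunzip`, `roundTrip`) are made up for this example, and a `bytes.Buffer` stands in for the pipe purely so the result can be verified.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipLines streams lines through a gzip.Writer into dst, one Write per line.
func gzipLines(dst io.Writer, lines []string) error {
	gz := gzip.NewWriter(dst)
	for _, l := range lines {
		if _, err := io.WriteString(gz, l+"\n"); err != nil {
			return err
		}
	}
	// Close flushes buffered data and writes the gzip trailer.
	return gz.Close()
}

// gunzip exists only to verify the round trip.
func gunzip(src io.Reader) (string, error) {
	zr, err := gzip.NewReader(src)
	if err != nil {
		return "", err
	}
	defer zr.Close()
	b, err := io.ReadAll(zr)
	return string(b), err
}

// roundTrip compresses lines and immediately decompresses them again.
func roundTrip(lines []string) (string, error) {
	var buf bytes.Buffer // stands in for the pipe; any io.Writer works
	if err := gzipLines(&buf, lines); err != nil {
		return "", err
	}
	return gunzip(&buf)
}

func main() {
	out, err := roundTrip([]string{`{"id":1}`, `{"id":2}`})
	fmt.Printf("%q %v\n", out, err)
}
```

The return value of `gz.Close()` is worth checking: if the trailer never reaches the destination, readers will report a truncated stream.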
### 4. `s3manager.Uploader` — multipart upload from an `io.Reader`
The naive `s3.PutObject(Body: io.Reader)` is a trap: it needs a `Content-Length` or seek support to compute the body size, neither of which a true stream provides. `s3manager.Uploader` reads the body in 5 MB chunks (configurable), uploads each as a multipart "part", and stitches them together with `CompleteMultipartUpload`. Memory bound: 5 MB × concurrency (default 5) ≈ ~25 MB, well below pod limits.
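To make the chunking idea concrete, here is a standard-library sketch of reading an unsized `io.Reader` in fixed-size parts. It is not the SDK's implementation (the real uploader pools buffers and uploads parts concurrently); `readParts` and `countParts` are hypothetical names, and the `upload` callback stands in for an `UploadPart` call.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// readParts reads body in fixed-size chunks and hands each one to upload.
// The buffer is reused, so upload must not retain data after returning.
func readParts(body io.Reader, partSize int, upload func(partNum int, data []byte) error) (int, error) {
	buf := make([]byte, partSize)
	parts := 0
	for {
		n, err := io.ReadFull(body, buf)
		if n > 0 {
			parts++
			if uErr := upload(parts, buf[:n]); uErr != nil {
				return parts, uErr
			}
		}
		if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
			return parts, nil // body exhausted; the last part may be short
		}
		if err != nil {
			return parts, err
		}
	}
}

// countParts returns the size of every part produced for the input s.
func countParts(s string, partSize int) ([]int, error) {
	var sizes []int
	_, err := readParts(strings.NewReader(s), partSize, func(_ int, data []byte) error {
		sizes = append(sizes, len(data))
		return nil
	})
	return sizes, err
}

func main() {
	sizes, err := countParts("abcdefghij", 4)
	fmt.Println(sizes, err) // [4 4 2] <nil>
}
```

The total length is never needed up front: the reader is simply drained until EOF, which is what makes this approach compatible with a pipe.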
## The producer/consumer pipeline
```go
func (a *Archiver) streamJSONL(
	ctx context.Context,
	s3Key string,
	sqlText string,
	args []any,
	scanFn func(rows pgx.Rows) (any, error),
) (rowCount int, err error) {
	pr, pw := io.Pipe()
	done := make(chan struct{}) // closed when the producer goroutine exits

	go func() {
		defer close(done)
		// On exit, close pw with the producer's error (or nil) so the
		// reader side learns the outcome.
		defer func() {
			if err != nil {
				pw.CloseWithError(err)
			} else {
				pw.Close()
			}
		}()

		gz := gzip.NewWriter(pw)
		// Registered after the pipe close, so it runs first: the gzip
		// trailer is flushed BEFORE the pipe closes. A failed flush is
		// recorded instead of being silently dropped.
		defer func() {
			if cErr := gz.Close(); cErr != nil && err == nil {
				err = cErr
			}
		}()

		rows, qErr := a.pool.Query(ctx, sqlText, args...)
		if qErr != nil {
			err = qErr
			return
		}
		defer rows.Close()

		for rows.Next() {
			row, sErr := scanFn(rows)
			if sErr != nil {
				err = sErr
				return
			}
			line, mErr := json.Marshal(row)
			if mErr != nil {
				err = mErr
				return
			}
			if _, wErr := gz.Write(line); wErr != nil {
				err = wErr
				return
			}
			if _, wErr := gz.Write([]byte("\n")); wErr != nil {
				err = wErr
				return
			}
			rowCount++
		}
		if rErr := rows.Err(); rErr != nil {
			err = rErr
		}
	}()

	// Hand pr to s3manager. Blocks until the producer finishes AND all
	// parts are uploaded.
	upErr := a.s3Uploader.UploadStream(ctx, s3Key, pr, "application/gzip",
		map[string]string{"content-encoding": "gzip"})
	if upErr != nil {
		pr.CloseWithError(upErr) // tell producer to stop if still running
		<-done                   // wait so the goroutine no longer touches err
		return 0, fmt.Errorf("s3 upload failed: %w", upErr)
	}
	<-done // producer has exited; rowCount and err are safe to read
	return rowCount, err
}
```
A few subtleties that look small but matter:
- **The deferred `gz.Close()` is registered after the deferred pipe close, so it runs first.** Deferred calls run in LIFO order. Gzip needs to flush its trailer bytes through `pw`. If the pipe closed first, the gzip stream on the consumer side would be truncated and unreadable.
- **`CloseWithError` on both ends.** If the producer hits a DB error mid-iteration, it closes `pw` with that error. The consumer's next read from `pr` returns that same error (not `io.ErrClosedPipe`), the upload fails, and `s3manager` by default aborts the multipart upload with `AbortMultipartUpload` (unless `LeavePartsOnError` is set), so no orphaned partial uploads are left behind. The same trick works in the other direction: if the S3 upload fails, we call `pr.CloseWithError(upErr)` so the producer's next write that reaches the pipe fails with that error and the goroutine exits instead of hanging.
- **No mutex needed.** `io.Pipe` is its own synchronization primitive; the producer's writes serialize naturally against the consumer's reads.
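- **Reading `rowCount` after a successful upload is safe** because the uploader only sees EOF on `pr` after the producer goroutine closes `pw`, and the producer's last write to `rowCount` happens before that close. The failure path is different: the producer may still be unwinding when the upload returns, so nothing the goroutine writes should be read until it has exited.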
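The error propagation described above can be checked in isolation with nothing but `io.Pipe`. In this sketch `errDB` and `errS3` are made-up sentinel errors standing in for a producer-side and a consumer-side failure.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

var (
	errDB = errors.New("db: connection reset") // hypothetical producer failure
	errS3 = errors.New("s3: upload failed")    // hypothetical consumer failure
)

// readerSees returns the error a reader gets after the write side
// has been closed with errDB.
func readerSees() error {
	pr, pw := io.Pipe()
	pw.CloseWithError(errDB)
	_, err := pr.Read(make([]byte, 1))
	return err
}

// writerSees returns the error a writer gets after the read side
// has been closed with errS3.
func writerSees() error {
	pr, pw := io.Pipe()
	pr.CloseWithError(errS3)
	_, err := pw.Write([]byte("x"))
	return err
}

func main() {
	fmt.Println(readerSees() == errDB) // true
	fmt.Println(writerSees() == errS3) // true
}
```

In both directions the other side receives the exact error value that was passed in, so `errors.Is` and `%w` wrapping keep working across the pipe.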
## What the numbers looked like after
Same 466K-row table, same pod, same 1 GiB memory limit:
- **Before:** heap peaked around 700 MB; OOM-killed roughly 4 nights out of 5.
- **After:** heap held flat at ~30 MB through the whole archive. Zero OOM kills since deploy.
Wall-clock time went up slightly (a couple of percent) because we no longer issue one fat `PutObject` — but that's a fair trade for not getting murdered by the kernel.
## What streaming doesn't fix
Worth being honest about the limits:
- **DB-side memory.** Some of the queries powering these archives do non-trivial joins. That's Postgres' problem, not the pod's. Streaming doesn't make the planner cheaper.
- **Retry cost.** If the upload fails 90% of the way through, we re-iterate from row zero on retry. For 50M-row tables that's slow but still memory-flat. Idempotent multipart resumption is a follow-up, not a blocker.
- **In-memory tree builds elsewhere.** Other parts of the system still build whole trees in memory before serializing. They're small enough today that it's fine, but it's the same anti-pattern waiting to bite.
## The general lesson
Whenever your data pipeline's memory usage scales with input size and you don't actually need random access to the data, you have a streaming bug waiting to happen. The fix is almost always the same set of primitives: a cursor on the source, an `io.Pipe` for backpressure, a streaming codec in the middle, and a chunked uploader at the sink. Bound the working set to a few megabytes and the pipeline stops caring whether you throw a thousand rows at it or a billion.
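For readers who want to poke at the pattern without Postgres or S3, here is a standard-library miniature. Everything in it is a stand-in chosen for this sketch: `row` is a hypothetical record type, the loop generates synthetic rows in place of a query, and `sink` plays the role of the uploader.

```go
package main

import (
	"bufio"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// row is a hypothetical record; the real job scans these from Postgres.
type row struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

type result struct {
	rows int
	err  error
}

// streamRows pushes n synthetic rows through JSON -> gzip -> io.Pipe and
// lets sink consume the compressed stream. It returns the rows produced.
func streamRows(n int, sink func(io.Reader) error) (int, error) {
	pr, pw := io.Pipe()
	done := make(chan result, 1)

	go func() {
		var res result
		gz := gzip.NewWriter(pw)
		enc := json.NewEncoder(gz) // Encode appends "\n", giving JSONL
		for i := 0; i < n && res.err == nil; i++ {
			if res.err = enc.Encode(row{ID: i, Name: "tenant"}); res.err == nil {
				res.rows++
			}
		}
		if cErr := gz.Close(); res.err == nil {
			res.err = cErr // trailer must flush before the pipe closes
		}
		pw.CloseWithError(res.err)
		done <- res
	}()

	sinkErr := sink(pr)
	pr.CloseWithError(sinkErr) // no effect if the producer already finished
	res := <-done              // wait for the producer before reading its result
	if sinkErr != nil {
		return 0, sinkErr
	}
	return res.rows, res.err
}

// roundTrip uses a sink that gunzips the stream and counts JSONL lines.
func roundTrip(n int) (produced, consumed int, err error) {
	produced, err = streamRows(n, func(r io.Reader) error {
		zr, zErr := gzip.NewReader(r)
		if zErr != nil {
			return zErr
		}
		defer zr.Close()
		sc := bufio.NewScanner(zr)
		for sc.Scan() {
			consumed++
		}
		return sc.Err()
	})
	return produced, consumed, err
}

func main() {
	p, c, err := roundTrip(1000)
	fmt.Println(p, c, err) // 1000 1000 <nil>
}
```

Passing the producer's outcome over a channel, rather than through shared variables, is one way to keep the result hand-off free of data races on both the success and the failure path.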