
Spark, Dask, and Distributed Execution

Distributed SQL is still SQL, but the dominant cost shifts from local CPU to network, shuffle, memory pressure, file layout, and skew.

flowchart TD
	SQL[SQL / DataFrame expression] --> Logical[Logical plan]
	Logical --> Optimized[Optimized plan]
	Optimized --> Physical[Physical plan]
	Physical --> Stages[Stages]
	Stages --> Tasks[Tasks over partitions]
	Tasks --> Shuffle[Shuffle when data must be repartitioned]
	Shuffle --> Result[Result]

Spark SQL, Trino-like engines, Dask DataFrame, and other distributed systems differ in details, but the interview principles are similar.

| Cost | Cause | Mitigation |
| --- | --- | --- |
| scan I/O | reading too many files/columns/partitions | partition pruning, column pruning, compact files |
| shuffle | join/group/window repartitions data | pre-aggregate, broadcast small side, align partitioning |
| skew | hot keys create straggler tasks | salting, hot-key isolation, adaptive skew handling |
| spill | task memory insufficient | reduce row width, repartition, pre-aggregate, tune memory |
| tiny files | scheduler and metadata overhead | compaction, target file size |
| small tasks | too many partitions | coalesce/repartition sensibly |
| giant tasks | too few partitions | increase partition count, split skewed inputs |

SQL Pattern: Filter and Project Before Shuffle

WITH base AS (
	SELECT user_id, event_date, event_name
	FROM events
	WHERE event_date >= DATE '2026-06-01'
		AND event_date < DATE '2026-07-01'
		AND event_name IN ('view', 'purchase')
)
SELECT user_id, COUNT(*) AS events
FROM base
GROUP BY user_id;

The GROUP BY user_id may shuffle. Reduce rows and columns before it.
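
To make the effect concrete, here is a minimal pure-Python sketch of the same query shape. It is not how any particular engine implements it; the two in-memory partitions, the `payload` column, and the helper names `keep` and `map_side` are all assumptions for illustration. Each partition filters, projects down to `user_id`, and partially aggregates, so only a few small pairs would cross the network.

```python
from collections import Counter

# Hypothetical input: two partitions of (user_id, event_date, event_name, payload) rows.
partitions = [
    [
        (1, "2026-06-03", "view", "x" * 50),
        (1, "2026-06-04", "view", "x" * 50),
        (2, "2026-05-30", "view", "x" * 50),    # outside the date range
        (2, "2026-06-10", "scroll", "x" * 50),  # event_name not wanted
    ],
    [
        (1, "2026-06-20", "purchase", "x" * 50),
        (3, "2026-06-21", "view", "x" * 50),
        (3, "2026-07-01", "view", "x" * 50),    # excluded: the end date is exclusive
    ],
]

def keep(row):
    """Apply the WHERE clause; ISO dates compare correctly as strings."""
    _, event_date, event_name, _ = row
    return "2026-06-01" <= event_date < "2026-07-01" and event_name in ("view", "purchase")

def map_side(partition):
    """Filter, project to user_id, and partially aggregate within one partition."""
    return Counter(row[0] for row in partition if keep(row))

# Only these small (user_id, partial_count) pairs would be shuffled.
shuffled = [pair for p in partitions for pair in map_side(p).items()]

# Reduce side: merge the partial counts per user_id.
final = Counter()
for user_id, partial in shuffled:
    final[user_id] += partial

total_rows = sum(len(p) for p in partitions)
print(f"{total_rows} input rows, {len(shuffled)} shuffled pairs, result {dict(final)}")
```

The wide `payload` column never leaves its partition, and the number of shuffled records depends on distinct keys per partition rather than on raw row count.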

If products is small:

SELECT
	p.category,
	COUNT(*) AS item_rows
FROM order_items oi
JOIN products p
	ON p.product_id = oi.product_id
GROUP BY p.category;

A distributed engine may broadcast products to every worker, which avoids shuffling the large order_items table. If the optimizer underestimates the size of the small side, the broadcast can fail or cause memory pressure on every worker that receives it.
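
A broadcast join can be pictured with a small pure-Python sketch. The data, the partition layout, and the `join_and_count` helper are hypothetical; real engines build and ship the hash table themselves. The point is that the large side is probed in place and only small per-partition results are merged.

```python
from collections import Counter

# Hypothetical small dimension: product_id -> category.
# This is the side that would be copied ("broadcast") to every worker.
products = {10: "books", 11: "games", 12: "books"}

# Hypothetical large fact table, already split across workers.
# Each row is (order_id, product_id); rows never leave their partition.
order_item_partitions = [
    [(1, 10), (2, 11), (3, 10)],
    [(4, 12), (5, 11), (6, 99)],  # product 99 has no match, so the inner join drops it
]

def join_and_count(partition, broadcast_products):
    """Probe the broadcast hash table locally and count rows per category."""
    counts = Counter()
    for _order_id, product_id in partition:
        category = broadcast_products.get(product_id)
        if category is not None:
            counts[category] += 1
    return counts

# Each worker produces a small partial result; only those are merged.
item_rows = Counter()
for partition in order_item_partitions:
    item_rows.update(join_and_count(partition, products))

print(dict(item_rows))
```

The cost of this plan is one copy of `products` per worker, which is why it only pays off when that side is genuinely small.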

When both sides are large, the engine repartitions both inputs by the join key so that matching rows land in the same task.

SELECT *
FROM events e
JOIN predictions p
	ON p.entity_id = e.user_id;

Potential issues:

  • both sides scan huge ranges
  • join key has skew
  • row width is large
  • no pre-aggregation
  • join creates many-to-many explosion
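
The many-to-many risk can be estimated before running the join. The sketch below uses made-up key lists standing in for `events.user_id` and `predictions.entity_id`; in practice the per-key counts would come from a `GROUP BY` on each side.

```python
from collections import Counter

# Hypothetical join keys: events.user_id and predictions.entity_id.
event_keys = [1, 1, 1, 1, 2, 3]
prediction_keys = [1, 1, 1, 2, 2, 4]

left = Counter(event_keys)
right = Counter(prediction_keys)

# An inner equi-join emits left_count * right_count rows for every shared key.
rows_per_key = {k: left[k] * right[k] for k in left.keys() & right.keys()}
output_rows = sum(rows_per_key.values())

print(f"inputs: {len(event_keys)} and {len(prediction_keys)} rows, output: {output_rows} rows")
```

Here two 6-row inputs produce 14 output rows, and key 1 alone accounts for 12 of them. The same arithmetic on real key counts shows whether one side should be deduplicated or pre-aggregated first.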

Hot key examples: an anonymous user, a default tenant, a missing country, NULL, or one huge customer.

Detection:

SELECT user_id, COUNT(*) AS row_count
FROM events
GROUP BY user_id
ORDER BY row_count DESC
FETCH FIRST 20 ROWS ONLY;

FETCH FIRST is the SQL-standard form; many engines, including Spark SQL, write this as LIMIT 20 instead. Either way, the intent is to surface the top keys.

Mitigations:

  • filter impossible/default keys
  • split hot keys into a separate query path
  • salt the large side and replicate the small side
  • pre-aggregate by key before join
  • choose a different partition key
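
Salting is the least intuitive of these, so here is a minimal sketch. It assumes a hypothetical fan-out of `NUM_SALTS = 4` and toy data with `"anon"` as the hot key. Real pipelines usually pick the salt at random; round-robin is used here only to keep the example deterministic.

```python
from collections import Counter

NUM_SALTS = 4  # hypothetical fan-out; tune to the observed skew

# Large side: "anon" is the hot key. Small side: one row per key.
large = [("anon", i) for i in range(1000)] + [("u1", 0), ("u2", 0)]
small = [("anon", "A"), ("u1", "B"), ("u2", "C")]

# Large side: append a salt so one hot key becomes NUM_SALTS distinct join keys.
salted_large = [((k, i % NUM_SALTS), v) for i, (k, v) in enumerate(large)]

# Small side: replicate every row once per salt so each salted key still finds its match.
salted_small = {(k, s): label for k, label in small for s in range(NUM_SALTS)}

# Join on (key, salt), then drop the salt from the output.
joined = [(k, v, salted_small[(k, s)]) for (k, s), v in salted_large]

rows_per_key_before = Counter(k for k, _ in large)
rows_per_key_after = Counter(ks for ks, _ in salted_large)

print(max(rows_per_key_before.values()), "->", max(rows_per_key_after.values()))
```

The largest join key shrinks from 1000 rows to 250 while the join result is unchanged. The price is a small side that is `NUM_SALTS` times larger, so this only makes sense when that side is much smaller than the skewed one.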

Partitioning is not indexing. It is coarse pruning and task layout.

Good partition columns:

  • frequently filtered
  • moderate cardinality
  • stable
  • distributed evenly enough
  • aligned with lifecycle/backfill

Bad partition columns:

  • user ID with millions of tiny partitions
  • boolean flags
  • high-cardinality timestamps at second granularity
  • columns rarely used in filters
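
Partition pruning can be sketched without any engine at all. The example assumes a hypothetical Hive-style directory layout with one directory per `event_date`; the paths and the `partition_date` and `prune` helpers are illustrative only.

```python
from datetime import date

# Hypothetical Hive-style layout: one directory per event_date value.
files = [
    "events/event_date=2026-05-30/part-0.parquet",
    "events/event_date=2026-05-31/part-0.parquet",
    "events/event_date=2026-06-01/part-0.parquet",
    "events/event_date=2026-06-01/part-1.parquet",
    "events/event_date=2026-06-15/part-0.parquet",
    "events/event_date=2026-07-01/part-0.parquet",
]

def partition_date(path):
    """Read the partition value from the directory name, not from the file contents."""
    segment = next(s for s in path.split("/") if s.startswith("event_date="))
    return date.fromisoformat(segment.split("=", 1)[1])

def prune(paths, start, end):
    """Keep only files whose partition value falls in [start, end)."""
    return [p for p in paths if start <= partition_date(p) < end]

to_scan = prune(files, date(2026, 6, 1), date(2026, 7, 1))
print(f"scan {len(to_scan)} of {len(files)} files")
```

The decision is made from paths alone, which is why it is coarse: a filter on a non-partition column cannot eliminate any directory this way.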

Columnar formats help analytics by reading only needed columns and storing min/max metadata. Text/CSV is expensive for repeated large scans. For lakehouse-style systems, file size and clustering often matter as much as query syntax.
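
The interaction between min/max metadata and clustering can be shown with a short sketch. The `FileStats` records and file names below are hypothetical stand-ins for per-file footer statistics; the logic is only the range-overlap test, not any format's actual reader.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class FileStats:
    """Hypothetical per-file statistics for the user_id column."""
    path: str
    min_user_id: int
    max_user_id: int

def files_to_read(footers, user_id):
    """Keep files whose [min, max] range could contain user_id; skip the rest unread."""
    return [f.path for f in footers if f.min_user_id <= user_id <= f.max_user_id]

# Clustered by user_id: each file covers a narrow, non-overlapping range.
clustered = [
    FileStats("c0.parquet", 1, 1000),
    FileStats("c1.parquet", 1001, 2000),
    FileStats("c2.parquet", 2001, 3000),
]

# Not clustered: every file spans almost the whole key range.
unclustered = [
    FileStats("u0.parquet", 3, 2998),
    FileStats("u1.parquet", 1, 3000),
    FileStats("u2.parquet", 7, 2991),
]

clustered_reads = files_to_read(clustered, 1500)
unclustered_reads = files_to_read(unclustered, 1500)
print(clustered_reads, unclustered_reads)
```

With identical data and an identical predicate, the clustered layout reads one file and the unclustered layout reads all three. The statistics only help when the data is laid out so that ranges are narrow.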

Use SQL to express the transformation:

CREATE TEMP VIEW monthly_revenue AS
SELECT
	customer_id,
	order_month,
	SUM(total_amount) AS revenue
FROM orders
WHERE status = 'paid'
GROUP BY customer_id, order_month;

The staff-level part is explaining the physical impact:

  • WHERE status = 'paid' may not prune files unless status is clustered or metadata helps.
  • GROUP BY customer_id, order_month shuffles by those keys.
  • If the table is partitioned by order_month, a filter on a specific month prunes partitions.
  • If customer distribution is skewed, one reduce task may dominate.

Dask DataFrame often feels SQL-like:

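# events is a Dask DataFrame; cutoff is a date or timestamp. The result stays lazy until .compute().
event_counts = (
    events[events.event_date >= cutoff]
    .groupby("user_id")
    .event_id
    .count()
)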

SQL translation:

SELECT user_id, COUNT(event_id) AS event_count
FROM events
WHERE event_date >= DATE '2026-06-01'
GROUP BY user_id;

The same concerns apply: partitioning, shuffles, known divisions, skew, memory per worker, and avoiding operations that require global coordination unless needed.

1. A Spark SQL query is slow at the final GROUP BY user_id. What do you inspect?

Check input size after filters, number of partitions, shuffle read/write bytes, task duration skew, top user_id frequencies, spill metrics, row width, and whether partial aggregation happens before shuffle. Then consider pre-aggregation, filtering bad keys, salting hot keys, or changing the aggregation grain.
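
One quick way to quantify task duration skew is to compare the slowest task with the median. The durations below are invented, and the 3x straggler threshold is an arbitrary assumption rather than a standard cutoff.

```python
from statistics import median

# Hypothetical task durations, in seconds, for one shuffle stage.
task_seconds = [12, 11, 13, 12, 10, 95, 12, 11]

typical = median(task_seconds)
skew_ratio = max(task_seconds) / typical

# Flag tasks that run far longer than the typical task (threshold is an assumption).
STRAGGLER_FACTOR = 3
stragglers = [i for i, t in enumerate(task_seconds) if t > STRAGGLER_FACTOR * typical]

print(f"median={typical}s, max/median={skew_ratio:.1f}, straggler tasks={stragglers}")
```

A ratio near 1 suggests the stage is limited by total volume; a large ratio with a single straggler points at a hot key and makes the top-key frequency check the next step.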

2. A join of a 5 TB fact table to a 20 MB dimension is slow. What plan do you want?

Usually a broadcast hash join with the dimension broadcast to every worker, assuming it is still about 20 MB after projection and filtering and fits comfortably in executor memory. Also project only the needed dimension columns and filter fact partitions early.

3. Why can partitioning by user_id be terrible?

It can create huge numbers of tiny partitions or uneven partitions if user activity is skewed. It also rarely aligns with date-based backfills and retention policies. Date partitioning plus clustering by user is often more practical for event data.

4. A query gets slower after adding more workers. Why?

Possibilities: shuffle overhead dominates, too many small tasks, metadata pressure from tiny files, skew leaves one task on the critical path, broadcast overhead increases, or the source system cannot feed workers fast enough. More workers help parallelizable work, not coordination-heavy work.

5. How do you explain adaptive execution generically?

The engine uses runtime statistics to revise the physical plan, such as changing join strategy, coalescing shuffle partitions, or handling skew. It helps when compile-time estimates are wrong, but it does not fix bad semantics or pathological data layout by itself.
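
As one illustration of the idea, partition coalescing can be sketched as a greedy pass over observed partition sizes. This is a simplified model, not any engine's actual algorithm; the sizes, the 64 MB target, and the `coalesce` helper are assumptions.

```python
def coalesce(partition_mb, target_mb):
    """Greedily merge adjacent shuffle partitions until adding one would exceed target_mb."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(partition_mb):
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups

# Hypothetical sizes (MB) measured after the map stage finished.
observed_mb = [10, 5, 20, 40, 3, 2, 70, 8]
groups = coalesce(observed_mb, target_mb=64)
print(groups)
```

Eight planned partitions become four tasks. Partition 6 is already over the target and stays alone, which is the case where skew splitting, rather than coalescing, would be needed.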