1.5 Years of Spark Knowledge in 8 Tips | by Michael Berk | Dec, 2023


My learnings from Databricks customer engagements

spark partition data skew optimize optimization pyspark sql python UI partition
Figure 1: a technical diagram of how to write apache spark. Image by author.

At Databricks, I help large retail organizations deploy and scale data and machine learning pipelines. Here are the 8 most important spark tips/tricks I’ve learned in the field.

Throughout this post, we assume a general working knowledge of spark and it’s structure, but this post should be accessible to all levels.

Let’s dive in!

Quickly, let’s review what spark does…

Spark is a big data processing engine. It takes python/java/scala/R/SQL and converts that code into a highly optimized set of transformations.

spark partition data skew optimize optimization pyspark sql python UI partition
Figure 2: spark driver and worker configuration. Image by author.

At it’s lowest level, spark creates tasks, which are parallelizable transformations on data partitions. These tasks are then distributed across from a driver node to worker nodes, which are responsible for leveraging their CPU cores to complete the transformations. By distributing tasks to potentially many workers, spark allows us to horizontally scale and thereby support complex data pipelines that would be impossible on a single machine.

Ok, hopefully not all of that was new information. Either way, in the following sections we’ll slow down a bit. These tips should help both novices and intermediates at spark.

Spark is complex. To help both you and potentially others understand its structure, let’s leverage an impressively good analogy borrowed from queueing theory: spark is a grocery store.

When thinking about the distributed computing component of spark, there are three main components….

  • Data partitions: subsets of rows of our data. In our grocery store, they are groceries.
  • Spark tasks: low-level transformations performed on a data partition. In our grocery store, they are customers.
  • Cores: the part of your processor(s) that do work in parallel. In our grocery store, they are cashiers.

That’s it!

Now, let’s leverage these concepts to talk through some fundamentals of spark.

spark partition data skew optimize optimization pyspark sql python UI partition
Figure 3: illustration of the cashier analog, specifically for data skew. Image by author.

As show in figure 3, our cashiers (cores) can only process one customer (task) at a time. Furthermore, some customers have a lot of groceries (partition row count), as shown by the first customer at cashier 2. From these simple observations…

  • The more cashiers (cores), the more customers (tasks) you can process in parallel. This is horizontal/vertical scaling.
  • If you don’t have enough customers (tasks) to saturate your cashiers (cores), you’ll be paying for the cashier to sit there. This relates to autoscaling, cluster sizing, and partition sizing.
  • If customers (tasks) have very different amounts of groceries (partition row counts), you’ll see uneven utilization of your cashiers. This is data skew.
  • The better your cashiers (cores), the faster they can process a single customer (task). This relates to upgrading your processor.
  • etc.

Given the analogy comes from queueing theory, a field directly related to distributed computing, it’s quite powerful!

Use this analogy to debug, communicate, and develop spark.

The most common mistake for spark novices is misunderstanding lazy evaluation.

Lazy evaluation means that no data transformations will be performed until you invoke a collection to memory. Examples of methods that invoke a collection include but are not limited to…

  • .collect(): bring the DataFrame into memory as a python list.
  • .show(): print the first n rows of your DataFrame.
  • .count(): get the number of rows of your DataFrame.
  • .first(): get the first row of your DataFrame.

The single most common incorrect collection method is leveraging .count() throughout a program. Every time you invoke a collection, all upstream transformations will be recomputed from scratch, so if you have 5 invocations of .count(), your program will asymptotically run 5x as long.

Spark is lazily evaluated! Pipelines should have a single flow from source(s) to target(s).

A surprisingly common issue that’s come up when working with large organizations is they lose sight of the big picture and thereby optimize pipelines in an inefficient manner.

Here’s how pipelines should be optimized for the majority of use cases…

  1. Ask if we need to do the project. Put simply, think about what you’re actually getting from optimizing a pipeline. If you expect to improve runtime by 20% and the pipeline costs $100 to run, should you invest your extremely expensive data engineer’s salary to save $20 per run? Maybe. Maybe not.
  2. Look for low hanging fruit in the code. After agreeing to do the project, check if the code has obvious flaws. Examples are misuse of lazy evaluation, unnecessary transformations, and incorrect ordering of transformations.
  3. Get the job running under the SLA by leveraging compute. After checking that the code is relatively efficient, just throw compute at the problem so you can 1) meet the SLA and, 2) gather statistics from the spark UI.
  4. Stop. If you’re properly saturating your compute and cost isn’t egregious, do some last minute compute improvements then stop. Your time is valuable. Don’t waste it saving dollars when you could be creating thousands of dollars elsewhere.
  5. Deep dive. Finally, if you really need to deep dive because cost is unacceptable, then roll up your sleeves and optimize data, code, and compute.

The beauty of this framework is that 1–4 only require cursory knowledge of spark and are very quick to execute; sometimes you can collect information on steps 1–4 during a 30 minute call. The framework also ensures that we’ll stop as soon as we are good enough. Finally, if step 5 is needed, we can delegate that to those on the team who are strongest at spark.

By finding all the ways to avoid over-optimizing a pipeline, you’re saving precious developer hours.

Disk spill is the single most common reason that spark jobs run slow.

It’s a very simple concept. Spark is designed to leverage in-memory processing. If you don’t have enough memory, spark will try to write the extra data to disk to prevent your process from crashing. This is called disk spill.

spark partition data skew optimize optimization pyspark sql python UI partition
Figure 4: screen shot of the spark UI highlighting disk spill. Image by author.

Writing to and reading from disk is slow, so it should be avoided. If you want to learn how to identify and mitigate spill, follow this tutorial. However, some very common and simple methods to mitigate spill are…

  1. Process less data per task, which can be achieved by changing the partition count via spark.shuffle.partitions or repartition.
  2. Increase the RAM to core ratio in your compute.

If you want your job to run optimally, prevent spill.

Whether you’re using scala, java, python, SQL, or R, spark will always leverage the same transformations under the hood. So, use the the right language for your task.

SQL is the least verbose “language” out of all supported spark languages for many operations! More tangibly:

  • If you’re adding or modifying a column, use selectExpr or expr, especially paired with Python’s f-strings.
  • If you need complex SQL, create temp views then use spark.sql().

Here are two quick examples…

# Column rename and cast with SQL
df = df.selectExpr([f"{c}::int as {c}_abc" for c in df.columns])

# Column rename and cast with native spark
for c in df.columns:
df = df.withColumn(f"{c}_abc", F.col(c).cast("int")).drop(c)

# Window functions with SQL
df.withColumn("running_total", expr(
"sum(value) over (order by id rows between unbounded preceding and current row)"
))

# Window functions with native spark
windowSpec = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_running_total_native = df.withColumn("running_total", F.sum("value").over(windowSpec))

Use SQL.

Do you need to read a bunch of data files stored in a complex directory? If so, use spark’s extremely powerful read options.

The first time I encountered this problem, I rewrote os.walk to work with my cloud provider where data was stored. I very proudly showed this method to my project partner who simply said, “let me share my screen,” and proceeded to introduce me to glob filters.

# Read all parquet files in the directory (and subdirectories)
df = spark.read.load(
"examples/src/main/resources/dir1",
format="parquet",
pathGlobFilter="*.parquet"
)

When I applied the glob filter shown above instead of my custom os.walk, the ingestion operation was over 10x faster.

Spark has powerful parameters. Check if your desired functionality exists before building bespoke implementations.

Loops are almost always detrimental to spark performance. Here’s why…

Spark has two core phases — planning and execution. In the planning phase, spark creates a directed acyclical graph (DAG) which indicates how your specified transformations will be carried out. The planning phase is relatively expensive and can sometimes take several seconds, so you want to invoke it as infrequently as possible.

Let’s discuss a use case where you must iterate through many DataFrames, perform expensive transformations, then append them to a table.

First, there is native support for nearly all iterative use cases, specifically pandas UDFs, window functions, and joins. But, if you truly do need a loop, here’s how you invoke a single planning phase and thereby get all transformations in a single DAG.

import functools
from pyspark.sql import DataFrame

paths = get_file_paths()

# BAD: For loop
for path in paths:
df = spark.read.load(path)
df = fancy_transformations(df)
df.write.mode("append").saveAsTable("xyz")

# GOOD: functools.reduce
lazily_evaluated_reads = [spark.read.load(path) for path in paths]
lazily_evaluted_transforms = [fancy_transformations(df) for df in lazily_evaluated_reads]
unioned_df = functools.reduce(DataFrame.union, lazily_evaluted_transforms)
unioned_df.write.mode("append").saveAsTable("xyz")

The first solution uses a for loop to iterate over paths, do fancy transformations, then append to our delta table of interest. In the second, we store a list of lazily evaluated DataFrames, apply transformations over them, then reduce them via a union, performing a single spark plan and write.

We can actually see the difference in architecture on the backend via the Spark UI…

spark partition data skew optimize optimization pyspark sql python UI partition
Figure 5: spark DAG for for loop vs. functools.reduce. Image by author.

In figure 5, the DAG on the left corresponding to the for loop will have 10 stages. However, the DAG on the right corresponding to functools.reduce will have a single stage and thereby can be processed more easily in parallel.

For a simple use case of reading 400 unique delta tables then appending to a delta table, this method was 6x faster than a for loop.

Get creative to create a single spark DAG.

This is not about hype.

Spark is a well-establish and thereby well-documented piece of software. LLMs, specifically GPT-4, are really good at distilling complex information into digestible and concise explanations. Since the release of GPT-4, I have not done a complex spark project where I didn’t heavily rely on GPT-4.

spark partition data skew optimize optimization pyspark sql python UI partition
Figure 6: example of GPT-4 output on impacting data partition size in spark. Image by author.

However, stating the (hopefully) obvious, be careful with LLMs. Anything you send to a closed source model can become training data for the parent organization — make sure you don’t send anything sensitive. Also, please validate that the output from GPT is legit.

When used properly, LLMs are game-changing for spark learning and development. It’s worth $20/month.



Source link

Leave a Comment