
Spark Cache Applied at Large Scale – Challenges, Pitfalls and Solutions

Spark caching is a useful capability for boosting Spark application performance. Instead of performing the same calculations over and over again, Spark cache saves intermediate results in an accessible place, ready for fast recall. But while Spark cache is very simple to use, you might encounter some unexpected behaviors when handling large scales of data.

At Otonomo, we leverage Spark caching in our Reports service, which provides our customers with vehicle data. When we first applied caching on large-scale reports, we encountered some memory-related stability problems. In this blog post I will explain the solutions we found useful for efficiently tuning Spark memory, and I’ll also cover what Spark cache is, when to use it and how it relates to the Spark memory model. Understanding how caching interacts with and affects Spark memory can help you confidently apply Spark caching on production systems where stability and runtime are crucial factors.

What is Spark Cache?

Spark cache is a mechanism that saves a DataFrame (or RDD/Dataset) in the Executors’ memory or on their disk. This enables the DataFrame to be calculated only once and reused for subsequent transformations and actions. Thus, we can avoid rereading the input data and reprocessing the same logic for every action call.

How Does Spark Cache Work?

A DataFrame is built from a set of transformations (e.g. filter, sort, groupBy) that are triggered by an action (e.g. count, write.parquet). The DataFrame lazily stores a DAG that represents all the upcoming transformations without performing any of them yet. Then, when an action is called, Spark reads the data, executes all the transformations in the DAG with a chosen execution plan and returns the action’s result.

Unlike Spark actions, which return their results to the user, transformation results are immediately “forgotten”. They’re used solely at runtime to compute a specific action. So the action result is the only data available when the job completes: we end up with the count number, the saved files in the filesystem, and so on.

However, caching a DataFrame also “remembers” its transformation results by saving them closer to the Spark Executors, ready for fast recall. The caching process is also lazy: it’s triggered by the first action called on the DataFrame. Any subsequent actions based on the cached DataFrame will then pull the data directly from the cache and start the computation from the next step in the DAG. This can improve the Spark Job’s total runtime and decrease the load on the input database and metastore.
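For illustration, here’s a minimal PySpark sketch of reusing a cached DataFrame across two actions (assuming an active SparkSession named spark; the path and column names are hypothetical):

# Without caching, both actions below would re-read the input and re-run the filter.
df = spark.read.parquet("s3://bucket/events/")        # hypothetical input path
filtered = df.filter(df.speed > 100)

filtered.cache()                                      # lazy: nothing is stored yet
row_count = filtered.count()                          # first action computes the DAG and fills the cache
filtered.write.parquet("s3://bucket/fast-events/")    # second action reads directly from the cache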

Spark Cache Storage Levels

The fastest way for Spark to store and retrieve cached data is by using the Executors’ memory. Memory is a relatively small and limited resource, so Spark offers a second option of caching the data on the Executors’ local disk. Writing and reading data from disk is slower than memory, but it also provides some benefits, like having a large storage space and being more fault tolerant.

When you persist a DataFrame you can choose a storage level and tell Spark which strategy to use in order to save the persisted data blocks. There are three main storage levels:

  1. Memory and Disk – cached data is saved in the Executors’ memory and written to disk when no memory is left (the default storage level for DataFrame and Dataset).
  2. Memory Only – cached data is saved only in the Executors’ memory; additional data beyond the memory limit is recalculated like a regular non-persisted DataFrame (the default storage level for RDD).
  3. Disk Only – the data is saved only on disk.

There are additional storage levels, which are variations of the main storage levels with the following capabilities:

  1. Replicate each partition to a second node.
  2. Serialize the persisted data blocks. This costs more CPU cycles for serialization and deserialization, thus taking more time to run. This option is deprecated in PySpark since records are always serialized there anyway; each of the main storage levels in PySpark maps to its serialized equivalent.

An additional storage level is off-heap, which manages a buffer outside the JVM with a set of optimizations based on Project Tungsten.

To persist a DataFrame df, call either df.cache() or df.persist(StorageLevel). 

Calling persist without arguments is equivalent to calling cache; cache actually calls persist with the default storage level.

(The terms caching and persisting are used interchangeably in this article without relating to a specific storage level).
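For reference, a minimal PySpark sketch of the API (assuming a DataFrame named df already exists):

from pyspark import StorageLevel

df.cache()                                   # equivalent to df.persist() with the default storage level
# To pick a storage level explicitly (a DataFrame can hold only one level at a time):
# df.persist(StorageLevel.MEMORY_AND_DISK)   # default for DataFrame/Dataset
# df.persist(StorageLevel.DISK_ONLY)         # keep cached blocks only on the Executors' local disk
df.unpersist()                               # drop the cached blocks when they're no longer needed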

When Should You Use Spark Cache?

Any reuse of the same DataFrame can potentially benefit from caching. However, caching is more beneficial when a repeated transformation:

  1. Takes a longer time to run, usually as a result of a large input or complicated logic.
  2. Results in a smaller output size, usually as a result of filters and aggregations.
  3. Repeats many times. For example, caching when reusing the same DataFrame ten times is more beneficial than when reusing it twice.

If a repeated transformation runs relatively slow and yields a small output, cache can improve performance dramatically.

If a repeated transformation runs relatively fast and yields a large output, cache may harm performance.

These are general guidelines. Every use case of a reused DataFrame will perform differently with Spark Caching and must be checked to determine whether caching improves performance.

Spark Caching at Otonomo

Otonomo users can consume large datasets of vehicle data by triggering on-demand, Spark-based Reports. By setting their required data filters and properties, they can tailor a Report to their use case. Data filters include time range, name-based locations (country/state/city), polygon-based locations (geographic coordinates), vehicle column projection, maximum result size and more.

After a user triggers a Report, the backend runs a Spark Job. This job creates a DataFrame that reads Parquet files from S3, projects and filters the data (after enforcing the user’s permissions on it) and finally applies three different actions: count, saving the Report results and foreachPartition. Applying all three actions on the exact same DataFrame triggers triple access to the metastore, triple the amount of data read from S3 and also triple the number of identical filter computations.

This is a classic use case for caching, since the same DataFrame is reused by multiple actions (in our case, three).
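A simplified sketch of that Report job shape (the paths, columns and partition handler below are hypothetical, not our production code):

def send_partition_metadata(rows):
    # hypothetical handler, e.g. pushing partition statistics to a queue
    pass

report_df = (
    spark.read.parquet("s3://bucket/vehicle-data/")    # hypothetical input location
    .select("vehicle_id", "timestamp", "location")     # projection
    .filter("country = 'DE'")                          # user filters + permission enforcement
)
report_df.cache()                                      # computed once, reused by all three actions

total_rows = report_df.count()                         # action 1: count
report_df.write.csv("s3://bucket/report-output/")      # action 2: save the Report results
report_df.foreachPartition(send_partition_metadata)    # action 3: foreachPartition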

On top of that, our Report statistics show that 98% of Reports yield a relatively small output of less than 5GB. This even includes Reports with Terabytes of input data. These small outputs can fit into memory, the fastest caching location (compared to disk). Thus, we could expect a significant performance increase.

Based on this information, we performed a short POC to determine whether we should cache our Reports. After persisting the Report DataFrame and examining the performance improvement for the common Report use cases, the results were perfectly clear: persisting the DataFrame would make most of our Reports blazingly fast.

Although we had solid evidence showing how caching would improve the performance of most Reports, the pitfall here was in the edge scenarios. Input and output sizes vary widely between Reports, from several Megabytes up to tens of Terabytes.

We needed to find out: how will Spark caching handle the Reports with large outputs?

Cache Benchmarking

To determine the ability of Spark Cache to handle large output Reports, we ran an extensive caching benchmark. The benchmark included different datasets with varied input filters. We ran each Report twice, first with the original code and second with caching. We compared their runtime, input size, output size and cache size in both memory and disk.

We considered implementing a dynamic estimation of the Report output size and triggering caching only for small estimated outputs, but we rejected this solution in favor of deterministic behavior. Finally, we decided to always persist the DataFrame with the memory and disk storage level, expecting small output Reports to enjoy fast memory retrievals and larger Reports to fall back to disk.

The purpose of this benchmark was to measure the stability and performance improvement of Spark caching against the most challenging scenarios of Report input filters, with an emphasis on the large output Reports- since they have the most potential to suffer from runtime and stability issues. 

We ran dozens of Reports with different filters to simulate varied behaviors and use cases. To make things easier to track, we grouped all of these Reports into small and large output Report types. These are the aggregated results for each group:

| Report Type | Input Size (Max, Parquet) | Output Size (Max, CSV) | Output Rows (Max) | Runtime Non-Cached (Min / Max) | Runtime Cached (Min / Max) | Runtime Improvement (Min / Max) | Cache Memory Used (Max) | Cache Disk Used (Max) |
|---|---|---|---|---|---|---|---|---|
| Small Output | 320GB | 10GB | 40m | 2.3 / 30 minutes | 1.4 / 17 minutes | 18% / 42% | 3.5GB | 0 |
| Large Output | 1TB | 4TB | 13b | 1 / 12.7 hours | 1.5 / 8 hours | -50% / 40% | 70GB | 1.5TB |

Overall, cached Reports ran amazingly fast: up to 42% faster than the original non-cached Reports. But while the small output Reports had a relatively stable 18-42% runtime improvement, we encountered a problematic result with the large output Reports: these cached Reports took up to 50% more time than the original ones. Although a runtime increase of 50% sounds alarming, it happened specifically for Reports that originally took one hour and increased to 1.5 hours with caching. Most of the large output Reports actually had much better runtimes with caching, decreasing the maximum runtime by more than 4.5 hours, from 12.7 hours to 8 hours.

Why do large output Reports run for so long? Unlike small output Reports, which always fit into memory and don’t use any disk for caching, large output Reports can’t entirely fit into memory. This makes them more vulnerable to slower runtimes due to intensive disk usage from reading and writing cached data. If we wanted to remedy the performance impact of intensive disk usage, upgrading to faster disks (like SSDs) for the cluster nodes would be a feasible option.

Another reason you can experience slower runtimes for massively cached DataFrames is that cached data can’t fully enjoy the benefits of partition pruning, projection pushdown and predicate pushdown for further transformations applied to it. If you have a very large DataFrame and need to filter a very small subset of rows from it, it may be faster to run the filters against an uncached DataFrame, letting the metastore and Parquet (or another file format with pushdown capabilities) perform most of the filtering up front. It’s also important to make sure you filter only the relevant data, select only the required fields and cache only those. Mistakenly caching a whole dataset of 300 columns can be brutal when you need only a few of them.
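For example, a hedged sketch of caching only the needed slice instead of the whole dataset (wide_df and the column names are illustrative):

# Wasteful: caches all 300 columns and every row, even if later actions need only a few
wide_df.cache()

# Better: filter and project first, then cache only what the subsequent actions will reuse
slim_df = wide_df.filter("event_date >= '2021-01-01'").select("vehicle_id", "speed")
slim_df.cache()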

A New Problem: Stability Issues

While the benchmark showed some promising runtime results, the cached large output Reports exposed some serious stability issues. We consistently received the following memory error from dead Executors:

Container killed by YARN for exceeding memory limits

We exceeded the YARN container memory limit. What exactly does that mean?

The cluster manager (e.g. YARN, Kubernetes) allocates a container for each Spark Executor. Each container has its own resource pool of memory and CPU. Intensive CPU computations may slow runtimes, but if the memory usage is excessive and goes beyond the container limit, the container and the Executor within it are killed. If that happens consistently for Executors that retry the same tasks, the whole Spark application may fail.

Where could this memory leak be coming from, and why?

Finding The Source of The Memory Leak

The total container memory consists of two major memory components:

1. Executor Memory

The core piece of the container memory is the Executor’s Java heap. Every Spark Executor runs inside a JVM and allocates objects on the heap. Since this memory is managed by the JVM, objects are automatically deallocated from the heap by the garbage collector. When exceeding the Java heap memory limit, an OOM (OutOfMemory) error is thrown, killing the JVM and Executor. The Executor Memory occupies a fixed size within the total container memory and can be tuned by spark.executor.memory.

Since this memory is configured with a fixed size, our memory leak didn’t come from here.

2. Overhead Memory

Any memory that is allocated outside the JVM is considered by Spark as Overhead Memory. This memory is not protected by the JVM, thus there’s no allocation limit enforcement. The process that manages the allocations is also responsible for the deallocations. Massive Overhead Memory allocations may lead to exceeding the container memory limits. The Overhead Memory size is 10% of the Executor Memory (with a minimum of 384MB) and can be tuned by spark.executor.memoryOverhead.
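For example, with the defaults and no tuning, the container size on YARN breaks down roughly like this (sizes are illustrative):

spark.executor.memory=8g                 // Executor (JVM heap) memory
spark.executor.memoryOverhead not set    // defaults to max(10% of 8g, 384MB) = 0.8g
// total container memory = 8g + 0.8g = 8.8g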

Unlike Executor Memory, the Overhead Memory is more dynamic and not necessarily managed with an awareness of its limit. Hence, potential memory leaks can be spotted there.
Processes that allocate Overhead Memory are:

  1. When using PySpark (and not configuring spark.executor.pyspark.memory), Python can allocate as much memory as it desires. If you configure PySpark memory, it becomes bounded and is added to the total container memory, in addition to the Executor and Overhead Memory (see the configuration sketch after this list).
  2. Spark native code that runs inside the Executor’s JVM allocates off-heap buffers to spare garbage collection cycles and increase the general performance. These buffers fall within the Overhead Memory section.
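A configuration sketch with a bounded PySpark memory (sizes are illustrative):

spark.executor.memory=8g
spark.executor.memoryOverhead=0.8g
spark.executor.pyspark.memory=2g   // Python worker processes are now limited to 2g
// total container memory = 8g + 0.8g + 2g = 10.8g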

We configured a fixed size of PySpark memory, so the most likely source of the memory leak is off-heap buffers allocated by the Executor.

Spark Executor Off-Heap Buffers

Cached data blocks and other internal objects are stored in a memory pool named Storage Memory, while the volatile data Spark uses for shuffles, sorts and aggregations is stored in another memory pool named Execution Memory. Both pools are part of the Executor Memory. Starting from Spark 1.6, the Storage and Execution Memory pools are managed by a single UnifiedMemoryManager in a common memory region. It lets either Execution or Storage Memory occupy the whole region when the other is not used. If the memory region is full, new Execution Memory allocation requests can evict Storage Memory to disk up to a configurable limit; beyond that limit, additional Execution Memory is spilled to disk.
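That eviction limit is controlled by spark.memory.storageFraction; a sketch of the relevant defaults:

spark.memory.fraction=0.6          // unified Storage + Execution region = 60% of (heap - 300MB)
spark.memory.storageFraction=0.5   // half of that region is Storage Memory immune to eviction
// Execution may evict cached blocks only from the non-immune half; past that, it spills its own data to disk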

Storage Memory can’t evict Execution Memory, so when the memory region is full, both new allocations and evictions of Storage Memory go to disk (if the storage level allows it). Without diving any further into the memory manager’s specifics, it all comes down to using either the Executor Memory or the disk space, so the whole game doesn’t leave the heap-disk playground… Or does it? Apparently it does.

Spark allocates buffers using the Java NIO and Netty frameworks. Some of them are deliberately managed off-heap in order to spare JVM garbage collection cycles during cache block transfers and shuffles. Some other buffers are actually on-heap buffers (inside the Java heap) which internally allocate additional off-heap memory.
Most of the off-heap buffers are managed by Spark and intended not to occupy too much Overhead Memory, but with massive caching these buffers grow disproportionately to the default Overhead Memory size, and eventually exceed the container memory limit. Similar problems can be encountered with large shuffles.

Avoid Blindly Raising Executor Memory For Every Memory Issue

When tackling any memory issue, the first instinct is to simply raise the Executor Memory (spark.executor.memory). In most cases this will actually fix the memory leak or remedy its impact.

With that said, you should think twice before doing so.

The problem with this approach is that increasing the Executor Memory results in containers with more memory. This translates into larger memory consumption for the same number of cores. These kinds of “easy” fixes quickly turn memory into the bottleneck of your cluster, dragging projects into over-provisioning resources in order to overcome the lack of memory in their Spark applications. This is wasteful and eventually becomes expensive.

Instead of blindly raising the Executor Memory, find exactly where Spark lacks memory and focus there. The first step is to distinguish between Executor Memory and Overhead Memory errors. OOM (OutOfMemory) errors are related to the Executor’s Java heap while “container killed by YARN for exceeding memory limits” errors (for YARN) are related to the Overhead Memory.

Generally, if you receive an OOM error, consider checking whether your code allocates large objects. With these errors, if increasing the number of partitions doesn’t help, it makes sense to gradually raise spark.executor.memory, since sometimes you just can’t escape the problem with a small heap.
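As a hedged first step before touching the heap size, increase the parallelism so each task holds less data at once (the value 400 is arbitrary):

df = df.repartition(400)                               # more, smaller partitions per task
spark.conf.set("spark.sql.shuffle.partitions", "400")  # higher default parallelism for shuffles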

For the container memory limit error, don’t raise the Executor Memory with spark.executor.memory. While this can remedy the problem, only 10% of that memory goes to the Overhead Memory, so directly raising the Overhead Memory with spark.executor.memoryOverhead is 10x more beneficial: adding 10g to spark.executor.memory contributes the same additional Overhead Memory to the container as adding merely 1g to spark.executor.memoryOverhead.
In fact, the error log itself recommends adding Overhead Memory.

Let’s see how to do it.

Efficiently Increasing Spark Overhead Memory

There are two complementary solutions we found to efficiently increase the Overhead Memory.

In some cases, exceeded Overhead Memory problems can be solved without raising the container’s memory-to-cores ratio at all. When no tunings are made, the Overhead Memory is small (10% of the Executor Memory). Slightly taking memory from the Executor in favor of the Overhead can even double or triple the Overhead Memory.

1. “Free” Overhead Memory

No memory really comes for “free”, but there’s a trick you can use to take advantage of the fact that Spark allocates a fixed 300MB, called Reserved Memory, within the Executor’s Java heap. It’s used internally by Spark and protects against OOM for small heaps. The same Reserved Memory size is used whether it’s part of an Executor with 2g of memory or 8g. This means that the larger the Executor Memory is, the smaller the portion of it the Reserved Memory takes up. Note that two containers of 4g and 2 cores are equivalent to one container with 8g and 4 cores in terms of total resource consumption.

So, if you use a relatively small Executor Memory (6g or less), you can double the container’s total memory and cores and enjoy an extra 300MB of Overhead Memory, since the fixed Reserved Memory of the Executor is not doubled. By doing so, you will have to use half of the original number of Executors to keep the total Spark Job resource consumption equal (tune the number of Executors manually if you don’t use dynamic allocation).

Consider the Executor Memory as two components: a fixed Reserved Memory (300MB) and a dynamic Non-Reserved Memory (spark.executor.memory minus 300MB). When doubling the Executor Memory you can’t really double the Reserved Memory since it’s fixed, so the only relevant memory increase is in the Non-Reserved Memory.

Instead of “traditionally” doubling spark.executor.memory, double only the Non Reserved Memory component within the Executor Memory.

For example (total container memory 4.4g):

spark.executor.memory=4g // ~3.7g non reserved (4g-300MB)
spark.executor.memoryOverhead=0.4g // (default 10% of executor.memory)
spark.executor.cores=2

The total container memory can be doubled to 8.8g with the following configuration:

spark.executor.memory=7.7g // non reserved doubled from 3.7g to 7.4g and added 300MB for reserved
spark.executor.memoryOverhead=1.1g // 8.8g - 7.7g
spark.executor.cores=4

The original configuration had a Non-Reserved Memory of 3.7g; doubling it means having 7.4g of Non-Reserved Memory. When configuring the Executor Memory, we added 300MB to it because this amount automatically goes to the Reserved Memory. Since the total desired container memory doubled to 8.8g, 1.1g is left for the Overhead Memory, which is almost triple the original amount.

Using this trick for larger container sizes can also work, but it reduces the relative Overhead Memory gain since the fixed 300MB of Reserved Memory becomes less significant. In addition, allocating a very large Executor Memory (over 32g) is not recommended, in order to prevent long GC cycles.

This method only works when you need slightly more memory. If you still exceed the container memory, let’s take a look at another, complementary solution that allows you to squeeze out all the memory you can for the Overhead Memory.

2. “Borrowed” Overhead Memory

In most cases a Spark Job doesn’t really utilize all of its allocated memory, especially when the Executor is relatively large (over 6g of memory). You can cut some of the Executor Memory and add it to the Overhead Memory, while keeping the same total container memory.

Simply start by decreasing the total Executor Memory, add the extra memory to the Overhead Memory and run your Spark Job.

For example (total container memory 8.8g):

spark.executor.memory=8g
spark.executor.memoryOverhead=0.8g
spark.executor.cores=4

Decrease Executor Memory and add to Overhead Memory (total container memory stays 8.8g):

spark.executor.memory=7g
spark.executor.memoryOverhead=1.8g // 8.8g - 7g
spark.executor.cores=4

We decreased the Executor Memory by merely 12% and gained more than double the Overhead Memory. Now your Spark Job has a chance to stay within the container memory limit and overcome the error.

If you’re still getting the same error, keep gradually decreasing the Executor Memory and increasing the Overhead Memory while staying within the same total container memory. If you receive a different error, or the job runs slower than usual, you can try to slightly tune the Memory Fraction (spark.memory.fraction) of the Spark Executor, as sketched after this list:

  • OutOfMemory error – carefully decrease the Memory Fraction and see whether the error stops. This gives more space to the User Memory within the Executor, which is where the Spark code you write inside transformations allocates objects and where there’s no disk spill fallback to protect against OOM. In exchange, less memory is available for Storage Memory (memory for cache) and Execution Memory, which means the job might deal with more disk spills and run slower. For this reason, it’s not recommended to drastically change the Memory Fraction.
  • Long running Job (compared to the original Executor Memory configuration) – possibly caused by excessive disk spills of Execution or Storage Memory. Gradually increase the Memory Fraction and see whether the runtime improves. Note that increasing the Memory Fraction exposes the job to more potential OOMs since the User Memory decreases.
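A hedged tuning sketch (the right values depend entirely on your workload):

spark.memory.fraction=0.6   // default: Storage + Execution share 60% of (heap - 300MB reserved)
// OOM in user code:             try lowering to ~0.5 to leave more User Memory
// excessive spills / slow job:  try raising to ~0.7, accepting a smaller User Memory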

Hopefully your Spark Job now runs without any errors. If you still can’t get rid of the problems, your container memory size is probably too small. Gradually increase the Overhead Memory until the problem is solved.

These solutions focus on optimizing the memory usage of a given Spark Job solely by tuning memory configurations. They were inspired by a memory issue caused by massive caching, under the assumption that the same code worked before caching. That being said, how you implement your code and which Spark functions you choose play a crucial role in the overall job performance, including its memory consumption.

We did it! These changes solved our memory problem while not increasing the total memory consumption of our Spark applications. Now both small and large output Reports are cached and stable!

Final Thoughts

Things aren’t always what they seem, and simple features like Spark Cache might not be so simple or stable when dealing with large scales of data. But instead of automatically choosing the easy solutions like raising the Executor Memory to solve a memory leak or just entirely giving up on caching to avoid raising memory, sometimes we need to think about problems a little bit differently. With awareness of the root cause of the problem we could confidently fix it, leverage Spark Cache and make our Reports run up to 42% faster than before without consuming any additional memory at all.

If you’re interested in working with us, check out our open positions.
