What are the precautions and how to use Spark together with Kubernetes?

Sebastiao Ferreira de Paula Neto
16 min readMar 4, 2023

--

1. Introduction

Apache Spark is one of the most popular large-scale distributed data processing frameworks , which was created in 2009 at the Artificial Intelligence Laboratory at the University of California at Berkeley. It allows the processing of large volumes of data in a cluster, using in-memory computing techniques and parallel processing to improve efficiency and scalability.

Spark is used in a variety of applications, including data analysis, machine learning, natural language processing, and more. It offers several APIs to work with different types of data, including RDDs (Resilient Distributed Datasets), DataFrames and Datasets, as well as support for several languages, such as Python, Scala, Java and R.

Kubernetes , in turn, is a container orchestration platform that allows the management of large-scale applications, with high availability and scalability . It helps ensure that containers are always up and running, scaling horizontally and vertically , and providing monitoring , resource management, and security capabilities .

When we combine both technologies we can obtain many benefits for companies , as we will be able to process large volumes of data in a scalable and customizable way. This is made possible by resource management by Kubernetes, while Spark can be used to process the data. Therefore, by uniting both tools, we managed to simplify the management of the environment , guarantee high availability and scalability , in addition to allowing better use of available resources .

Therefore, in order to enable the creation of Spark applications managed by Kubernetes, it is imperative that certain precautions are observed so that the previously listed benefits can be achieved. Thus, the first care to be considered in the process of building a Spark application in Kubernetes is to answer the following question:

What is Apache Spark’s architecture and how does it enable large-scale data processing?

2 How Spark works

Apache Spark is a framework for distributed data processing where large volumes of data are processed in a relatively short time . This time is only possible thanks to Spark’s internal architecture, which uses the master-slave model for parallelism. However, Spark differs from other frameworks by operating mainly in memory, which makes processing large volumes of data more efficient.

The master-slave architecture is a software architecture model where a central server (master) is responsible for managing and coordinating the activities of one or more clients (slaves). The master is responsible for receiving requests from clients, distributing tasks to slaves and consolidating the results. Slaves perform the tasks assigned by the master and send the results back .

Everything we’ve talked about so far is related to how Spark distributes hardware. When we analyze the logical structure, we find the RDDs (Resilient Distributed Datasets) , which manage the data distribution among the nodes of the cluster. This concept allows Spark to perform complex operations on large datasets efficiently, enabling in-memory processing. Afterwards, Spark added the concept of DataFrames/Datasets , which simplifies the use of RDDs and allows their use in several programming languages.

Understanding Spark’s internal architecture is crucial to exploiting the full potential of this distributed processing framework. Spark’s processing engine is made up of several components, which work together to enable distributed processing of large datasets. Furthermore, understanding Spark’s internal architecture is an important step to optimize the performance and efficiency of applications developed with this framework.

2.1 The architecture

As presented, Spark’s architecture is based on a master-slave system, where the Driver acts as a representative of the master and the Executors and tasks as representatives of the slaves . The Cluster Manager is responsible for managing cluster resources and scaling Spark applications. All of these components work together to ensure that distributed processing is performed efficiently and scalably.

The Driver is the component responsible for orchestrating the execution of the Spark application , creating the SparkContext and distributing the code to the cluster nodes, in addition to defining the application’s DAG and sending tasks to the Executors. In addition, in the driver, we have the presence of SparkSession , which is responsible for creating Spark’s execution context and managing the connection to the cluster . Through it, it is possible to read and write data in different formats, create dataframes, perform SQL operations and perform tasks in the cluster. Thus, this is the user’s way of developing their applications.

The Executors are responsible for executing the tasks defined by the Driver in each node of the cluster, managing the execution of each one of them and allocating resources such as CPU and memory for processing. Tasks are the basic processing units in Spark, distributed by the Driver to the Executors so that they can be executed in parallel in different nodes of the cluster.

2.1.1 Number of taks

In general, it is recommended that there be a number of tasks per executor that is a multiple of the number of cores available on the worker node, so that each core can be utilized efficiently. Setting the number of tasks per executor can be done through the “spark.executor.instances” property in the Spark configuration file or through the “setExecutorInstances” method in the “SparkConf” class in the Spark application. It is important to point out that the number of tasks per executor may vary depending on the type of operation being performed and the characteristics of the data, and it is recommended to carry out performance tests with different configurations.

2.1.2 Memory allocation

When evaluating the recommended amount of memory required, you need to evaluate data volume in terms of bytes per data partition. This is the main consideration in building a Spark application and the main performance tuning point for Spark . This is due to Spark’s different categories of memory and the way it allocates memory to each of them.

Spark uses off-heap memory to store data more efficiently and to reduce garbage collection performed by Java’s Garbage Collector. However, it is necessary to consider the on-Heap memory which is the memory of the executors , which is the standard Java memory used by Spark to store Java objects. In this way, we must focus our attention on the standard, on-heap memory, which is divided into four categories, namely:

  1. Execution Memory: is the memory used to store intermediate data during the execution of Spark tasks. This memory is dynamically allocated by Spark and is primarily used to store medium sized data such as data slices.
  2. Storage Memory: is the memory used to cache data in Spark. This memory is dynamically allocated and is used to store data larger in size than the Execution Memory, such as the result of shuffle and aggregation operations.
  3. Reserved Memory: It is used to store internal data and system metadata at the start of execution, without being used for data processing. This memory contains information such as settings, status, and performance statistics, as well as application execution plan data.
  4. User Memory: is additional memory that can be allocated by the Spark user to store application-specific data. This memory is allocated using Spark’s MemoryManager API.

It is important to note that these memory types can be configured differently in Spark , depending on the application needs. In addition, we must consider taking care that part of the memory used by spark is not used in processing.

And so that we can have access to the resources, whether they are GPU, CPU, memory, we need to guarantee a “midfield” to manage them. At this point the Cluster Manager enters .

2.2 Cluster Manager

The Cluster Manager is a fundamental component of the Spark architecture, responsible for managing cluster resources and scaling Spark applications to run on available nodes . It identifies the available nodes in the cluster and allocates resources for each Spark application, and can be integrated with different cluster management systems such as Apache Mesos, Hadoop YARN and Kubernetes.

  1. Standalone : is a simple cluster manager, which is distributed with Spark. It’s easy to set up and use, but it’s not as scalable as other cluster managers.
  2. Apache Mesos : is a distributed cluster manager that manages resources in a cluster and supports running various processing frameworks, including Spark. It is highly scalable and can handle large clusters of machines.
  3. Kubernetes : is a container orchestration platform that provides capabilities to manage and orchestrate containers in a cluster. It can be used as a cluster manager for Spark and has advanced scalability and resource management features.
  4. YARN : is a resource management framework for Hadoop clusters. It is used to manage hardware resources and schedule applications to run on the cluster. Spark can run on YARN as an application client or in cluster mode.

The choice of cluster manager depends on the needs of each application and the execution environment. When a Spark application is submitted to the cluster manager, it will allocate necessary resources for the application to run, such as CPU, memory and storage. The cluster manager also monitors the status of each node in the cluster and redirects the application’s work to other nodes, in case a failure occurs in any node in the cluster. In addition, the cluster manager is responsible for ensuring the security of the cluster and the execution of applications in isolated environments. In this way, the cluster manager guarantees the availability and scalability of the Spark applications in the cluster and at that point,Kubernetes, for allowing high customization of resources to applications in isolation, becomes an excellent candidate to be our cluster manager.

How does kubernetes allow this customization and how do we instantiate spark as a container ?

3. Running spark on Kubernetes

Kubernetes is an open source container orchestration platform that lets you automate the deployment, scaling, and management of containerized applications. Its goal is to simplify the process of deploying applications in production environments, offering advanced management and scalability tools.

The Kubernetes architecture is made up of several components that work together to provide a complete container management platform. Among the main components are:

  • API Server : is the core component of Kubernetes and serves as the entry point for interactions between users and the Kubernetes cluster. It is responsible for validating and processing incoming requests and then coordinating with the other components to perform the requested action.
  • Scheduler : is the component responsible for distributing and scheduling Kubernetes workloads across different cluster nodes, taking into account factors such as available resources and allocation policies.
  • Etcd : is a distributed database that stores the state information of the Kubernetes cluster. It provides a centralized and reliable record of cluster resource configurations and states, allowing Kubernetes components to coordinate and work together.
  • Controllers : are components responsible for monitoring and controlling the state of resources in the cluster. They ensure that Kubernetes resources comply with configuration policies and provide feedback to the Scheduler to ensure optimal scalability.
  • Kubelet : is the component responsible for managing the Kubernetes cluster nodes. It ensures that containers are running to desired specifications and communicates with the Scheduler and API Server to coordinate management of cluster resources.
  • Kube proxy : is a Kubernetes component that manages network traffic within the cluster. It is responsible for ensuring that services are accessible to users and redirecting traffic to the correct pods. Kubeproxy can also be configured to provide load balancing, reverse proxy, and port forwarding services.

Kubernetes uses the concept of objects such as pods, services and deployments to manage cluster resources. Every object has a user-defined desired state, and Kubernetes works to ensure that the object’s current state always conforms to the desired state. This is done through reconciliation cycles, where the object’s current state is checked against the desired state and necessary actions are taken to ensure that the current state is updated to match the state. wanted.

To use Spark on Kubernetes, we need to create applications to manage connections and all infrastructure setups. However, a tool was developed that uses the Kubernetes CRD resource to add custom resources to your cluster in a simple and efficient way. With this solution, it is possible to extend Kubernetes functionalities and create custom definitions of resources, making it easier and more practical to manage the data environment.

3.1 CRDs

Custom Resource Definition, or CRD , is a feature available in Kubernetes that allows you to extend your default resource model and create custom resource definitions. In other words, you can create new types of resources in Kubernetes and describe your applications or services, including the schema and desired behavior for the custom resource, as well as the operations that can be performed on it.

CRDs are created using the Kubernetes API , which allows them to be managed in a standardized way using tools such as kubectl. Using CRDs is particularly useful for implementing custom operations on a Kubernetes cluster , such as storing application-specific settings or setting security policies. This feature is especially useful for developers and platform engineers who want to extend Kubernetes to meet their organization’s specific needs .

3.2 Spark Operator

The Spark Operator CRD allows users to provision Spark clusters on Kubernetes with desired configurations, defining the number of worker nodes and customizing Spark properties to suit their data processing needs. Additionally, Spark Operator simplifies Spark cluster management and monitoring , allowing users to monitor the status of running jobs and see real-time cluster metrics. For more information about SparkOperator, click hereand be redirected to the official documentation. Now, let’s go to the instructions. By default, we set the name of our namespace to processing, where all spark applications should run.

1 . Create the namespace where your resources will be instantiated:

kubectl create namespace processing

two . Add the helm repository that contains the SparkOperator image

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm repo update
helm install --set image.tag=v1beta2-1.3.2-3.1.1 spark spark-operator/spark-operator --namespace processing --create-namespace
helm ls -n processing

3 . Now we will create a service account and a role, in the namespace.

kubectl create serviceaccount spark -n processing
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=processing:spark --namespace=processing

In order to create applications in Kubernetes, it is necessary to build YAML files that are monitored through manifests by Kubernetes itself. These files have an encoding structure that allows specification of runtime settings. As an example, we can consider the execution of code in Python locally, which can be done following the example below.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-job
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: spark-py:latest
mainApplicationFile: local:///app/main.py
sparkVersion: "3.1.1"
restartPolicy:
type: Never
executor:
cores: 2
instances: 2
memory: "2g"
driver:
cores: 1
memory: "1g"
request:
memory: "3g"
cpu: "1"
limit:
memory: "4g"
cpu: "2"

For this example we are using Python version 3 and we have defined the image that contains the necessary libraries and dependencies for the execution of the Spark job, by default it accesses the docker hub. In the driver settings we specified that we want 1 core and 1 GB of memory for the driver, although we requested 3 GB of memory and 2 CPU. In the executor configuration, 2 instances of the executor were specified, each with 2 cores and 2 GB of memory.

4 . And finally to run our Spark application, we apply the appropriate template and indicate the namespace.

kubectl apply -f filepath/file.yml -n processing

Despite being functional, this application does not follow good software construction practices. To evolve it, one option is to build the Spark image by adding the codes to be executed in the container. Thus, we will introduce how to build the Spark image properly.

3.3 Building your first Spark Image

As it is open source, apache spark provides its code and we can build images based on them. In its own documentation we have the example, through bash commands, of how to build it. The first step is to access your download page and select the package type as SOURCE CODE.

Then we access the uncompressed folder and we can build our image according to the official one:

REGISTRY=<SEU REGISTRY (DOCKER HUB, ECR OU OUTROS)>
REPOSITORY=spark
IMAGE_TAG=v3.2.3-hadoop3

bash ./bin/docker-image-tool.sh -r $REGISTRY -t $IMAGE_TAG -p ./kubernetes/dockerfiles/spark/bindings/python/Dockerfile build
bash ./bin/docker-image-tool.sh -r $REGISTRY -t $IMAGE_TAG push

At this point, we will have access to an isolated image where we can modify it to meet our demand and this is done through a docker file:

FROM <YOUR IMAGE CREATEDv3.2.3-hadoop3># using root

USER root:root

# Create an applications directory
RUN mkdir -p /app

# Copy your local code into the image
COPY app/* /app/

# Copy jars
COPY ./jars/sparkoperator/ /opt/spark/jars

#set directory principal
WORKDIR /app

# Install python libraries used in python3
RUN pip3 install -r requirements.txt

# Set python execution path
ENV PYSPARK_PYTHON=/usr/bin/ python3

#user
USER root

In this way we can manipulate a spark image, but what about monitoring and knowing how it is working?

3.4 Monitoring your applications

Monitoring in Spark refers to the process of observing the performance and behavior of the Spark cluster while running jobs. To ensure the effectiveness of the cluster, it is essential that this is carried out constantly, allowing administrators to identify possible bottlenecks, optimize the use of resources and solve problems quickly and efficiently.

In Kubernetes, the way to monitor the Spark application is to access its SparkApplication and analyze the job structure. You can then access the logs generated by the driver.

kubectl get sparkapplication <sua spark aplication>  -n processing -oyaml
kubectl logs -f <driver da sua spark aplication> -n processing

However, reading logs can be a tedious task and many factors that require observation can go unnoticed. As a solution, a tool capable of displaying all the jobs and tasks performed by the applications on a dashboard, called Spark History, was developed.

3.4.1 Spark History

One of the tools used for monitoring in Spark is Spark History, which allows monitoring the execution of jobs and analyzing the performance of the cluster. It keeps a log of application executions and provides a detailed view of the application’s performance, including the complete list of executed tasks and the corresponding performance metrics.

Its architecture is composed by the Spark History Server and the Application History Provider. Spark History Server displays information such as the list of executed tasks, performance metrics and views of Spark stages and execution plan. The tool has advanced features such as searching records and browsing by timeline. The Application History Provider collects information such as driver and execution plan logs and stores it in a persistent location for later analysis. Spark History Server accesses this data to display the information in its web interface. Spark History allows you to monitor and analyze the performance of Spark applications in clusters, providing an intuitive and friendly interface for viewing the collected data.

One way to improve application monitoring is by providing information about efficiency and key tuning metrics. In this sense, Spot and Data Mechanics developed the Delight tool, which allows measuring the application’s performance and presenting this information in detail in a control panel.

3.4.2 Delight

Delight is a cloud platform that supports Apache Spark clusters and its Spark monitoring tool features a centralized dashboard to view performance metrics of running Spark applications. The dashboard provides a summary of cluster performance, a list of running jobs and tasks, resource utilization metrics, and runtimes.

In addition, the tool offers advanced analysis features such as configurable alerts, historical performance graphs and the ability to compare the performance of different runs. With this tool, it is possible to quickly detect possible problems and performance bottlenecks, allowing users to manage their resources more efficiently and optimize the performance of their applications.

Through the tool, users can quickly identify possible performance issues and adjust their applications to improve their efficiency. With a centralized dashboard and advanced analytics capabilities, the Delight Data Mechanics tool provides a comprehensive solution for monitoring the performance of Spark applications across clusters and allows access directly through the spark UI.

To use the tool, you need to understand how it connects to the Kubernetes cluster. The connection is established through an API, which requires an authentication token in the call. Also, you need to add the jar file to the cluster to allow communication. This communication is carried out through listeners that collect the application logs and, when finished, send them to the application’s webserver, where the dashboard and access to the Spark UI are provided.

You can configure the connection to the Delight platform in the Spark Application build manifest. In the example below, we have an advanced manifest that includes settings such as SparkConf, jar download when creating the job, connection settings through the S3 protocol and access secrets to AWS and the database.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: <NOME APPLICATION>
namespace: processing
spec:
deps:
jars:
- "https://oss.sonatype.org/content/repositories/snapshots/co/datamechanics/delight_2.12/latest-SNAPSHOT/delight_2.12-latest-SNAPSHOT.jar"
volumes:
- name: ivy
emptyDir: {}
sparkConf:
spark.driver.extraJavaOptions: "-Divy.cache.dir=/tmp -Divy.home=/tmp, -Dcom.amazonaws.services.s3.enableV4"
spark.executor.extraJavaOptions: "-Dcom.amazonaws.services.s3.enableV4"
spark.kubernetes.allocation.batch.size: "10"
spark.delight.accessToken.secret: "SEU TOKEN"
spark.extraListeners: "co.datamechanics.delight.DelightListener"
hadoopConf:
fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
fs.s3a.path.style.access: "True"
type: Python
pythonVersion: "3"
mode: cluster
image: <SUA IMAGEM>
imagePullPolicy: Always
mainApplicationFile: local:///app/SEU-python-FILE.py
sparkVersion: "3.2.2"
timeToLiveSeconds: 600
restartPolicy:
type: drives:Never

envSecretKeyRefs:
AWS_ACCESS_KEY_ID:
name: aws-credentials
key: aws_access_key_id
AWS_SECRET_ACCESS_KEY:
name: aws-credentials
key: aws_secret_access_key
DB_URL:
name: db-credentials
key: url
DB_USER:
name: db-credentials
key: user
DB_PASSWORD:
name: db-credentials
key: password
cores: 1
coreRequest: "500m"
coreLimit: "1200m"
memory: "4g"
labels:
version: 3.0.0
serviceAccount: spark
volumeMounts:
- name: ivy
mountPath: /tmp
executor:
envSecretKeyRefs:
AWS_ACCESS_KEY_ID:
name: aws-credentials
key: aws_access_key_id
AWS_SECRET_ACCESS_KEY:
name: aws-credentials
key: aws_secret_access_key
cores: 2
coreRequest: "500m"
coreLimit: "1200m"
instances: 2
memory: "2g"
labels:
version: 3.0.0
volumeMounts:
- name: ivy
mountPath: /tmp

4. Conclusion

Therefore, the combination of Apache Spark and Kubernetes can provide a robust, scalable and flexible solution for processing large volumes of data in distributed environments. This can help companies gain faster and more accurate insights from their data, becoming more competitive and efficient in their business processes. Furthermore, with delight we can monitor and improve resource usage in processing.

5. References

  1. https://medium.com/data-mechanics/performance-of-apache-spark-on-kubernetes-has-caught-up-with-yarn-57aff5c6d259
  2. https://aws.amazon.com/blogs/containers/optimizing-spark-performance-on-kubernetes/
  3. https://spark.apache.org/docs/latest/cluster-overview.html
  4. https://spark.apache.org/docs/latest/running-on-kubernetes.html

--

--

Sebastiao Ferreira de Paula Neto

Data engineer with a passion for data science, I write efficient code and optimize pipelines for successful analytics projects.