Submitting a Spark job

The platform uses MLRun to submit Spark jobs and the Spark Operator to run them on Kubernetes (k8s). A request sent through MLRun to the Spark Operator contains your full application configuration, including the code and dependencies to run (packaged as a Docker image or specified via URIs), the infrastructure parameters (for example, the memory, CPU, and storage volume specs to allocate to each Spark executor), and the Spark configuration. Kubernetes takes this request and starts the Spark driver in a Kubernetes pod (a k8s abstraction, in this case simply a Docker container). The Spark driver then communicates directly with the Kubernetes API server to request executor pods, scaling them up and down at runtime according to the load if dynamic allocation is enabled. Kubernetes handles the bin-packing of the pods onto Kubernetes nodes (the physical VMs) and dynamically scales the node pools to meet the requirements.

You can submit Spark jobs on Kubernetes via a Jupyter Notebook. For more information, see Spark Operator.

How to submit a Spark job?

You can submit a Spark job using any of the following:

Procedure. To submit a Spark job using Python:
  1. Install the mlrun package using the following command.

    pip install mlrun
    
  2. Run the following code in Python or a Jupyter Notebook.

    Run the following command to import new_function from MLRun.

    from mlrun.run import new_function

    Create a new Spark function that runs with the Spark Operator; the command argument points to the Spark code in the file system.

    sj = new_function(kind='spark', command='pi.py', name='my-pi-python')

    Set the Spark driver and executor resource requests and limits.

    sj.with_driver_requests(cpu="1",mem="512m")
    sj.with_executor_requests(cpu="1",mem="512m")
    sj.with_driver_limits(cpu="1")
    sj.with_executor_limits(cpu="1")
    
    

    Add fuse, daemon, and Iguazio's JAR support.

    sj.with_igz_spark()
    

    Set the artifact path to save the artifact.

    sr = sj.run(artifact_path="local:///tmp/")
Procedure. To submit a Spark job using Java:
  1. Install the mlrun package using the following command.

    pip install mlrun
    
  2. Run the following code in Python or a Jupyter Notebook.

    Run the following command to import new_function from MLRun.

    from mlrun.run import new_function

    Create a new Spark function that runs with the Spark Operator; the command argument points to the application JAR in the file system.

    sj = new_function(kind='spark', command='/User/spark-examples_2.12-3.0.1.jar', name='my-pi-java')

    Set the Spark driver and executor resource requests and limits.

    sj.with_driver_requests(cpu="1",mem="512m")
    sj.with_executor_requests(cpu="1",mem="512m")
    sj.with_driver_limits(cpu="1")
    sj.with_executor_limits(cpu="1")
    
    

    Add fuse, daemon, and Iguazio's JAR support.

    sj.with_igz_spark()

    Set the job type to Java and add the Java class name to execute.

    sj.spec.job_type = "Java"
    sj.spec.main_class = "org.apache.spark.examples.JavaSparkPi"
    

    Set the artifact path to save the artifact.

    sr = sj.run(artifact_path="local:///tmp/")
Procedure. To submit a Spark job using Scala:
  1. Install the mlrun package using the following command.

    pip install mlrun
    
  2. Run the following code in Python or a Jupyter Notebook.

    Run the following command to import new_function from MLRun.

    from mlrun.run import new_function

    Create a new Spark function that runs with the Spark Operator (the command argument points to the application JAR in the file system), and set the Spark driver and executor resource requests and limits.

    sj = new_function(kind='spark', command='/User/spark-examples_2.12-3.0.1.jar', name='my-pi-scala') 
    sj.with_driver_requests(cpu="1",mem="512m")
    sj.with_executor_requests(cpu="1",mem="512m")
    sj.with_driver_limits(cpu="1")
    sj.with_executor_limits(cpu="1")
    
    

    Add fuse, daemon, and Iguazio's JAR support, and set the job type to Scala.

    sj.with_igz_spark()
    sj.spec.job_type = "Scala"
    

    Add the Scala class name to execute.

    sj.spec.main_class = "org.apache.spark.examples.SparkPi"
    

    Set the artifact path to save the artifact.

    sr = sj.run(artifact_path="local:///tmp/")

When submitting a job, you can assign driver and executor node labels so that the driver and executor pods are scheduled on nodes carrying the matching labels. For more information about node labels, see Setting Labels on App Nodes.

Procedure. To assign a node label to the Spark job:
  • Submit the job via a Jupyter Notebook or Python.

    Provide the driver node label and the executor node label when you submit the job. The following code snippet selects the driver and executor nodes using a node label key and the corresponding label values.

    sj.with_driver_node_selection(node_selector={node_label_key:driver_node_label})
    sj.with_executor_node_selection(node_selector={node_label_key:executor_node_label})
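
    For example, a minimal sketch with concrete values. The label key and label values below are hypothetical placeholders; use the labels configured on your app nodes.

    # Hypothetical label key and values; replace them with your node labels.
    node_label_key = "app.node/group"
    driver_node_label = "spark-driver"
    executor_node_label = "spark-executor"

    sj.with_driver_node_selection(node_selector={node_label_key: driver_node_label})
    sj.with_executor_node_selection(node_selector={node_label_key: executor_node_label})
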
Best practices to optimize Spark jobs

Optimizing your Spark jobs is critical for processing large workloads, especially when resources are limited. Spark exposes many configurations that you can tune before relying on cluster elasticity. Below are recommendations for improving your Spark jobs.

    1. Use the correct file format

      Most Spark jobs run as a pipeline in which one Spark job writes data to a file, and another Spark job reads the data, processes it, and writes it to another file for the next stage to pick up. Writing intermediate files in serialized, optimized formats such as Avro, Kryo, or Parquet performs better than text, CSV, or JSON formats.

      Spark’s default file format is Apache Parquet, which uses column-oriented storage. Data in the same column is stored contiguously, which is well suited to queries (transformations) that touch only a subset of columns of a large DataFrame: only the data associated with the required columns is loaded into memory.
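
      For illustration, a small PySpark sketch (the paths and column names are hypothetical) that writes an intermediate result as Parquet and then reads back only the columns it needs:

      # Write the intermediate result in Parquet instead of CSV/JSON (hypothetical path).
      df.write.mode("overwrite").parquet("/User/data/intermediate.parquet")

      # Selecting a subset of columns loads only those columns from the columnar files.
      subset = spark.read.parquet("/User/data/intermediate.parquet").select("id", "amount")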

      Trino delivers faster SQL through its custom ORC reader implementation. ORC uses a two-step system to decode data: the first step is a traditional compression algorithm, such as gzip, that generically reduces data size; the second step applies data-type-specific encodings that convert the raw bytes into values.

    2. Maximize parallelism in Spark

      Spark is efficient because it can process several tasks in parallel at scale. Creating a clear division of tasks and allowing Spark to read and process data in parallel will optimize performance. Users should split a dataset into several partitions that can be read and processed independently and in parallel.

      Create partitions in the following manner:

      • When reading the data, partitioning is controlled by the Spark parameter spark.sql.files.maxPartitionBytes (default value of 128 MB). Data already stored in several appropriately sized partitions on disk is the best-case scenario; for example, a dataset in Parquet format whose directory contains partition files between 100 and 150 MB in size. You can increase the number of partitions by lowering the value of spark.sql.files.maxPartitionBytes. Note: this can lead to issues for small files, so the parameter should be tuned empirically according to the available resources.

      • Directly in the Spark application code, using the DataFrame API:

        df.repartition(100)
        df.coalesce(100)

        repartition() can increase or decrease the number of partitions but performs a full shuffle, whereas coalesce() decreases the number of partitions while avoiding a shuffle across the network.
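
      As an end-to-end illustration for this item, a minimal PySpark sketch (the file path is hypothetical) that lowers spark.sql.files.maxPartitionBytes and checks how many partitions the read produces:

      from pyspark.sql import SparkSession

      spark = SparkSession.builder.appName("partitioning-demo").getOrCreate()

      # Cap read partitions at 64 MB instead of the default 128 MB.
      spark.conf.set("spark.sql.files.maxPartitionBytes", 64 * 1024 * 1024)

      df = spark.read.parquet("/User/data/events.parquet")  # hypothetical path
      print(df.rdd.getNumPartitions())  # more, smaller partitions than with the default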

    3. Beware of shuffle operations

      Shuffle partitions are created during the stages of a job that involve a shuffle, i.e., when a wide transformation (for example, groupBy() or join()) is performed. The number and size of these partitions impact both the network and the read/write disk resources.

      The number of partitions may be adjusted by changing the value of spark.sql.shuffle.partitions. This parameter should be adjusted according to the size of the data. The default number of partitions is 200, which may be too high for some processes and result in too many partitions being exchanged between executing nodes.

      The following configurations are relevant when tuning shuffles:

      • spark.driver.memory: The default value is 1 GB. This is the memory allocated to the Spark driver to collect data from the executor nodes, for example for a collect() operation.

      • spark.shuffle.file.buffer: The default value is 32 KB. We recommend increasing this to 1 MB. This allows Spark to buffer more data before writing shuffle results to disk.
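
      As a sketch, these settings can be applied when building the SparkSession (the values below are illustrative, not recommendations for a specific workload):

      from pyspark.sql import SparkSession

      spark = (
          SparkSession.builder
          .appName("shuffle-tuning-demo")
          .config("spark.sql.shuffle.partitions", "64")  # default is 200
          .config("spark.shuffle.file.buffer", "1m")     # default is 32k
          .getOrCreate()
      )

      # spark.driver.memory must be set before the driver JVM starts, for example
      # via spark-submit --conf spark.driver.memory=2g or in spark-defaults.conf.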

    4. Use Broadcast Hash Join

      A join between several dataframes is a common operation. In a distributed context, a large amount of data is exchanged in the network between the executing nodes to perform the join. Depending on the size of the tables, this exchange causes network latency, which slows down processing. Spark offers several join strategies to optimize this operation, such as Broadcast Hash Join (BHJ). This technique is suitable when one of the merged dataframes is sufficiently small to be duplicated in memory on all the executing nodes.

      By duplicating the smallest table, the join no longer requires any significant data exchange in the cluster apart from broadcasting this table beforehand, which significantly improves the speed of the join. The Spark configuration parameter to modify is spark.sql.autoBroadcastJoinThreshold. The default value is 10 MB, i.e., this method is chosen if one of the two tables is smaller than this size. If sufficient memory is available, it may be very useful to increase this value so that larger tables qualify; setting it to -1 disables automatic broadcast joins.
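
      For instance, a minimal sketch (the DataFrame names are hypothetical) that raises the threshold and also explicitly hints a broadcast join:

      from pyspark.sql.functions import broadcast

      # Raise the auto-broadcast threshold to 50 MB (illustrative value).
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

      # Or explicitly hint that the small dimension table should be broadcast.
      result = large_df.join(broadcast(small_df), on="id")  # large_df and small_df are hypothetical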

    5. Cache intermediate results

      Spark uses lazy evaluation and a DAG to describe a job, which lets it optimize computations and manage memory resources; when an action runs, only the part of the DAG leading to that action is executed. To take full advantage of this, it is encouraged to store expensive intermediate results if several operations use them downstream in the DAG: subsequent actions can then start from these intermediate results and replay only a sub-part of the DAG. Users may decide to cache intermediate results to speed up execution.

      from pyspark.sql.functions import col

      dataframe = spark.range(1 * 1000000).toDF("id").withColumn("square", col("id") ** 2)
      dataframe.persist()
      dataframe.count()

      The above code took 2.5 seconds to execute.

      dataframe.count()
      

      The subsequent execution took 147 ms, requiring less time because the data is cached. The data can also be cached with an explicit storage level:

      from pyspark import StorageLevel

      dataframe.persist(StorageLevel.MEMORY_ONLY)

      The full list of storage-level options is available in the Spark documentation.

    6. Manage the memory of the executor nodes

      The memory of a Spark executor is broken down into execution memory, storage memory, and reserved memory.

      Unified (execution + storage) memory = spark.memory.fraction * (spark.executor.memory - reserved memory)
      

      By default, the spark.memory.fraction parameter is set to 0.6. This means that 60% of the heap (after subtracting the reserved memory, 300 MB by default) is shared by execution and storage, while the remaining 40% is left for user data structures and Spark's internal metadata. The reserved memory is set aside to help prevent out-of-memory (OOM) errors.

      We can modify the following two properties:

      spark.executor.memory
      spark.memory.fraction
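
      For example, a quick back-of-the-envelope calculation with the defaults and a hypothetical 4 GB executor:

      # Illustrative calculation; the 4 GB executor size is a hypothetical value.
      executor_memory = 4 * 1024   # spark.executor.memory, in MB
      reserved_memory = 300        # fixed reservation, in MB
      memory_fraction = 0.6        # spark.memory.fraction default

      unified_memory = memory_fraction * (executor_memory - reserved_memory)
      print(unified_memory)        # ~2278 MB shared by execution and storage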
    7. Don’t use collect(); use take() instead

      When a user calls the collect() action, the result is returned to the driver node. This might seem innocuous at first, but if you are working with huge amounts of data, the driver node can easily run out of memory. The take() action, by contrast, scans and returns rows from the first partition it finds. For example, if you just want to get a feel for the data, take(1) returns a single row.

      df = spark.read.csv("/FileStore/tables/train.csv", header=True)
      df.collect()  # pulls the entire DataFrame onto the driver; can cause OOM
      df.take(1)    # returns only the first row
    8. Avoid using UDF

      UDFs are a black box to Spark, so it cannot optimize them; you lose the optimizations Spark applies to DataFrame/Dataset operations (Tungsten and the Catalyst optimizer). Whenever possible, use Spark SQL built-in functions, as these functions are designed to be optimized.
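
      As a sketch of the difference (the DataFrame and column names are hypothetical):

      from pyspark.sql.functions import udf, upper, col
      from pyspark.sql.types import StringType

      # A Python UDF: opaque to Catalyst/Tungsten, and rows are serialized out to Python.
      to_upper_udf = udf(lambda s: s.upper() if s is not None else None, StringType())
      df_udf = df.withColumn("name_upper", to_upper_udf(col("name")))

      # The equivalent built-in function stays inside the optimized engine.
      df_builtin = df.withColumn("name_upper", upper(col("name")))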

    9. Use Broadcast and Accumulator variables

      Accumulators and broadcast variables are both Spark shared variables. An accumulator is a write/update shared variable, whereas a broadcast variable is a read-only shared variable. In a distributed computing engine like Spark, it is necessary to know the scope and life cycle of variables and methods during execution, because data is divided and processed in parallel on different machines in the cluster.

      • Broadcast variable: A read-only variable that is cached on all the executors to avoid shuffling data between them. Broadcast variables are used as lookups without any shuffle, as each executor keeps a local copy.

      • Accumulator variable: Accumulators are akin to counters in MapReduce; they are update-only shared variables used when you want Spark workers to update some value that the driver reads back.
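
      A minimal sketch of both (the lookup data and codes are hypothetical):

      # Broadcast a small lookup table to every executor.
      country_lookup = spark.sparkContext.broadcast({"US": "United States", "IN": "India"})

      # Accumulator updated by the workers and read back on the driver.
      bad_rows = spark.sparkContext.accumulator(0)

      def resolve(code):
          if code not in country_lookup.value:
              bad_rows.add(1)
              return None
          return country_lookup.value[code]

      names = spark.sparkContext.parallelize(["US", "IN", "XX"]).map(resolve).collect()
      print(names, bad_rows.value)  # ['United States', 'India', None] 1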

    10. Partitioning and bucketing columns

      Shuffling is one of the main factors affecting Spark job performance. One of the more important techniques for reducing data shuffling is to use bucketing and repartitioning of data instead of broadcasting or caching/persisting DataFrames.

      These are the two optimization techniques commonly used for Hive tables. Both organize large filesystem data so that subsequent queries can efficiently read only the relevant partitions. partitionBy() groups rows with the same partition-column value, storing each group in its own directory. Bucketing is a similar kind of grouping, but it is based on a hashing technique, and each bucket is stored as a file.
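
      As a sketch (the DataFrame, columns, path, and table name are hypothetical):

      # Partition by a column: one directory per country value.
      df.write.mode("overwrite").partitionBy("country").parquet("/User/data/events_by_country")

      # Bucket by a hashed column; bucketing requires writing to a table with saveAsTable().
      (df.write.mode("overwrite")
         .bucketBy(16, "user_id")
         .sortBy("user_id")
         .saveAsTable("events_bucketed"))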

    11. Garbage Collection Tuning

      JVM garbage collection can be a problem when you have a large number of unused objects. The first step in GC tuning is to collect statistics by enabling verbose GC logging when submitting Spark jobs. In an ideal situation, we try to keep GC overhead under 10% of heap memory.
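
      As a sketch, verbose GC statistics can be enabled through the executor JVM options (the flags below are the classic Java 8 ones; newer JVMs use -Xlog:gc* instead):

      from pyspark.sql import SparkSession

      spark = (
          SparkSession.builder
          .appName("gc-logging-demo")
          .config("spark.executor.extraJavaOptions",
                  "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
          .getOrCreate()
      )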