PySpark & Coalesce

Let’s say you have a big dataset: 1,000,000 files of around 1MB each, so roughly 1TB in total.

On HDFS, our default block size is 128MB.

This dataset would be stored across roughly 7,800 partitions (dataset size divided by HDFS block size: 1,000,000 MB ÷ 128 MB ≈ 7,813), meaning your job would have to run across a huge number of partitions & then shuffle the results between all 7,800 of them during a group by.
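If you want to sanity-check this against a real dataset, one quick way – assuming a SparkSession called spark and a hypothetical input path – is to read the data and ask Spark how many partitions it actually created:

df = spark.read.parquet('/data/my_big_dataset')   # hypothetical path
print(df.rdd.getNumPartitions())                  # number of input partitions Spark created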

Shuffling is the process of moving data between nodes – the more shuffling we have, the higher the latency & latency is bad!

We can use coalesce to address this issue & reduce the data to a more manageable number of partitions.
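As a minimal sketch (reusing the spark session and df from the snippet above), coalesce simply merges the existing partitions down to the number you ask for, without the full shuffle that repartition() would trigger:

# merge the existing partitions down to 100 – coalesce avoids a full shuffle,
# whereas repartition(100) would redistribute every row across the cluster
df_small = df.coalesce(100)
print(df_small.rdd.getNumPartitions())   # 100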

Below is a sample script where we coalesce the dataset, then create a temporary table to run SQL against.

Sample Script

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime

# create a session that supports Hive

def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://server:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

# START MAIN

if __name__ == '__main__':
    spark_session = create_session('Session_Name')
    df1 = spark_session.table('database.table1')
    dt_now = datetime.now()

    # today's date as YYYYMMDD, derived from the current Unix timestamp
    today_unixtime = int(dt_now.strftime('%s'))
    today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')

    # the hour of the day as it was four hours ago
    four_hours_unixtime = int(dt_now.strftime('%s')) - 4*60*60
    four_hours = datetime.fromtimestamp(four_hours_unixtime).strftime('%H')

    # coalesce to 100 partitions, then filter to today's date & the target hour
    table1 = df1.coalesce(100).filter((df1.dt == today_date) & (df1.ts_hr == four_hours))

    table1.createOrReplaceTempView("temporaryTable_Name")

    finaldf = spark_session.sql('''
    Your SQL here
    ''')
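To make the placeholder concrete, a hypothetical query against the temporary view might look like the one below (the column names and output table are made up purely for illustration), and the result can then be written back to Hive:

finaldf = spark_session.sql('''
    SELECT user_id, COUNT(*) AS events   -- hypothetical columns
    FROM temporaryTable_Name
    GROUP BY user_id
''')

# persist the aggregated result (output table name is hypothetical)
finaldf.write.mode('overwrite').saveAsTable('database.table1_agg')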