Getting started with PySpark

In this article, we’ll look at some of the key components of PySpark, which is currently one of the most in-demand big data technologies.

Spark Session

The Spark session is the front door to everything we do in Spark. Since version 2.0, SparkSession has superseded the older SparkContext (which it still wraps underneath) as the entry point through which we interact with the Spark APIs.

Those APIs help us to ingest and manipulate our data using the DataFrame abstraction, which sits on top of the older RDD API.

Define the Spark session
In the below, we’re doing a number of things:

  • Importing the required library
  • Defining our session parameters
  • Creating a Spark Session, using those parameters
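
The original snippet isn’t reproduced here, so the following is a minimal sketch of what that setup might look like; the application name and the memory setting are placeholder values rather than the article’s originals.

    from pyspark.sql import SparkSession

    spark_session = (
        SparkSession.builder
        .appName("getting_started_with_pyspark")   # placeholder application name
        .config("spark.executor.memory", "2g")     # example session parameter
        .enableHiveSupport()                       # needed for the Hive examples later on
        .getOrCreate()
    )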

Once all of that is done, we can start interacting with the other APIs.

Get data into a dataframe

In the below, we define a schema, aptly named ‘schema’. In this example, the schema has two fields, one a string and one an integer, and both are nullable (can contain null values), as indicated by the True flag.
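
As the original code isn’t shown, here is a minimal sketch of such a schema; the field names ‘location’ and ‘visits’ are made up for illustration.

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    # Two nullable fields: a string and an integer (nullable=True)
    schema = [
        StructField("location", StringType(), True),   # hypothetical string field
        StructField("visits", IntegerType(), True),    # hypothetical integer field
    ]

    final_structure = StructType(schema)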

In the above, I finalise the schema and call it ‘final_structure’. In the below, we pass ‘final_structure’ as the schema argument when reading the CSV file. This way, the dataframe knows which datatypes to expect.
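
A sketch of that read, assuming the session from earlier and a placeholder file path:

    df = spark_session.read.csv(
        "/path/to/input.csv",       # placeholder path
        schema=final_structure,     # enforce the datatypes defined above
        header=True,                # assume the file has a header row
    )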

We can also bring data in from a Hive table. In the below, I am defining df1 (dataframe 1) as the contents of the netshock table within the web database.
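
Something along these lines, assuming the session was created with Hive support enabled:

    # Read the Hive table web.netshock into a dataframe
    df1 = spark_session.table("web.netshock")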

Manipulate the data within the dataframe

The PySpark SQL functions module enables us to interact with our dataframes in an SQL-like way. Below, we have an example of this:

In the below, you’ll notice the select, group by and aggregations that we’re used to in SQL. We start off by selecting all of the input columns, then state the columns we wish to group by. Next, we define the aggregations we wish to carry out; in this case, a sum, a distinct count and an overall count.

Finally, we select the output list of fields; that is, the group-by fields plus the newly defined aggregate fields, referenced by their aliases.
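
Since the original example isn’t reproduced, here is a sketch of that pattern; the column names and aliases are assumptions, and ‘user_id’ is a hypothetical extra column used only to illustrate the distinct count.

    from pyspark.sql import functions as F

    aggregated_df = (
        df.select("location", "user_id", "visits")              # select the input columns
          .groupBy("location")                                   # the group-by column
          .agg(
              F.sum("visits").alias("total_visits"),             # sum
              F.countDistinct("user_id").alias("unique_users"),  # count distinct
              F.count("*").alias("row_count"),                   # overall count
          )
          .select("location", "total_visits",
                  "unique_users", "row_count")                   # group-by fields + aggregate aliases
    )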

We can also filter our dataframes, as we will often want to limit the data we are analysing; this is the equivalent of the ‘where’ clause in SQL. In the below, I have filtered my dataframe to show only those records where the date is later than 10th October 2018 and where the location is like ‘London’, which would pick up London Bridge, Tower of London and so on.
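
A sketch of such a filter, again assuming hypothetical ‘date’ and ‘location’ columns:

    from pyspark.sql import functions as F

    filtered_df = df.filter(
        (F.col("date") > "2018-10-10") &         # date later than 10th October 2018
        (F.col("location").like("%London%"))      # location containing 'London'
    )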

Output the data

Of course, once we have finished our analysis, we’ll want to output the data somewhere. In the below, we’re inserting the resulting dataframe into a Hive table.

To do this, we need to create a temporary view of our dataframe, which I will call ‘temporarytable’. From there, we can call spark_session.sql and insert the contents of the temporary view into the Hive table using regular SQL statements.
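
A sketch of that pattern; the target table name web.netshock_summary is made up for illustration.

    # Register the dataframe as a temporary view
    aggregated_df.createOrReplaceTempView("temporarytable")

    # Insert its contents into the Hive table with regular SQL
    spark_session.sql(
        "INSERT INTO TABLE web.netshock_summary "   # hypothetical target table
        "SELECT * FROM temporarytable"
    )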

We may instead prefer to output our data to CSV. We can do that with a single line of code, sketched out after the following breakdown:

  • df is the name of the dataframe we wish to write to CSV
  • .write states that we are writing to, rather than reading from, the given location
  • .format("csv") tells Spark that we are writing in CSV format
  • .option("header", "false") states that we do not want to print column titles into the CSV
  • .option("sep", "\t") tells Spark to write a tab (\t) delimited file
  • .mode("append") appends files to the directory, rather than overwriting it
  • .save("directory_path") tells Spark the directory to write the files to
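
Putting those pieces together, the call might look something like this; the output directory is a placeholder:

    (
        df.write
          .format("csv")
          .option("header", "false")
          .option("sep", "\t")
          .mode("append")
          .save("/path/to/directory_path")   # placeholder output directory
    )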

A sample script

Here is a simple script, demonstrating some of the above concepts:
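
The original script isn’t reproduced here, so the following is a short, self-contained sketch that strings the ideas above together; every path, table name and column name is a placeholder rather than the article’s own.

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    # 1. Create the Spark session
    spark_session = (
        SparkSession.builder
        .appName("getting_started_with_pyspark")
        .enableHiveSupport()
        .getOrCreate()
    )

    # 2. Define the schema and read a CSV into a dataframe
    final_structure = StructType([
        StructField("location", StringType(), True),
        StructField("visits", IntegerType(), True),
    ])

    df = spark_session.read.csv(
        "/path/to/input.csv",
        schema=final_structure,
        header=True,
    )

    # 3. Filter, then aggregate
    result = (
        df.filter(F.col("location").like("%London%"))
          .groupBy("location")
          .agg(
              F.sum("visits").alias("total_visits"),
              F.count("*").alias("row_count"),
          )
    )

    # 4. Write the result out as a tab-delimited CSV
    (
        result.write
              .format("csv")
              .option("header", "false")
              .option("sep", "\t")
              .mode("append")
              .save("/path/to/output_directory")
    )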