An attempt to spark your interest in PySpark

Rohan Paris
May 22, 2022 · 6 min read

PySpark is the Python API for Apache Spark. Using PySpark, we can run applications in parallel on a distributed cluster (multiple nodes) or even on a single node.

Overview:
1. What is Apache Spark?
2. Difference between pandas and PySpark
3. What is Databricks?
4. Useful PySpark commands
5. Reference

1. What is Apache Spark?

Apache Spark is an analytical processing engine for large-scale, distributed data processing and machine learning applications. It is written in the Scala programming language.

As mentioned earlier, PySpark is a Python API for Apache Spark, and it helps to interface with Resilient Distributed Datasets (RDDs) by leveraging the Py4j library. Py4j is a popular library that is integrated within PySpark and allows Python to dynamically interface with JVM (Java Virtual Machine) objects.

PySpark supports most of Spark’s features such as Spark SQL, DataFrame, Streaming, MLlib (Machine Learning), and Spark Core.

image credits: spark.apache.org

To know more about PySpark please refer to the official documentation here.

2. Difference between pandas and PySpark

Pandas is one of the most popular data science libraries; it is used to work with tabular, structured data for analysis and preprocessing. It does not support distributed processing, so data manipulation becomes time-consuming (and often memory-bound) when working with large datasets.

PySpark supports distributed processing and can therefore process large datasets much faster than pandas.
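For a sense of how the two interoperate, a PySpark DataFrame can be converted to and from a pandas DataFrame when needed. Below is a minimal sketch, assuming an active SparkSession named spark and data small enough to fit on the driver (the car names are just illustrative values):

import pandas as pd

# a small pandas dataframe with illustrative data
pdf = pd.DataFrame({'name': ['Swift', 'City'], 'year': [2014, 2016]})

# pandas -> PySpark: distribute the data as a Spark DataFrame
sdf = spark.createDataFrame(pdf)

# PySpark -> pandas: collects all rows to the driver,
# so only use this for data that fits in memory
pdf_back = sdf.toPandas()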

3. What is Databricks?

Databricks is an American enterprise software company founded by the creators of Apache Spark. Databricks develops a web-based platform for working with Spark that provides automated cluster management and IPython-style notebooks.

Databricks offers a free Community Edition and paid plans for businesses. We will now set up a new account on the free Community Edition using this link.

Enter your details and get started.

Once signed in, your page will look something like this

Import a dataset:

Click on the ‘Data Import’ option and then choose the dataset from your local system.

Once you have uploaded a dataset, you will see the below page.

The uploaded Zomato dataset is now present in DBFS at the location /FileStore/tables/.
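To double-check the upload, you can list that DBFS folder from a notebook cell. A quick sketch, assuming you are inside a Databricks notebook where dbutils and display are available:

# list the files uploaded through the UI
display(dbutils.fs.ls('/FileStore/tables/'))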

Create a cluster:

To create a new cluster, click on Compute (from the left pane) and then click on the ‘Create Cluster’ button. The free version only allows the creation of a single cluster.

Enter the details and then click on the ‘Create Cluster’ button.

A green tick mark will appear once the cluster is running.

To install any new libraries, navigate to Libraries > Install New > PyPI, enter the library names (comma-separated), and click Install.
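Alternatively, recent Databricks runtimes let you install notebook-scoped libraries directly from a cell with the %pip magic (the library names below are just placeholders):

%pip install seaborn plotly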

Create a new notebook:

To create a new notebook, click on '+' > Notebook, give the notebook a name, choose a language, and click Create.

4. Useful PySpark commands

a. Load a dataset:

# File location and type
file_location = "/FileStore/tables/Car_details.csv"
file_type = "csv"

# Read the csv file and store it in a dataframe
df_data = spark.read.csv(file_location, header=True, inferSchema=True)

# Display the dataframe
df_data.show(5)

Note: If we were to run PySpark on our local system, we would have to use the below code to start a Spark session before proceeding with the data loading and preprocessing tasks.

# install pyspark
!pip install pyspark

# start a spark session
import pyspark
from pyspark.sql import SparkSession

# create a session
spark = SparkSession.builder.appName('give_appname').getOrCreate()

b. Get the column datatypes:

df_data.printSchema()

c. Get a statistical summary of the columns:

df_data.describe().show()

d. Get a subset of data:

df_data.select(['name', 'year', 'km_driven']).show()

e. Get a list of column names:

df_data.columns

f. Drop a column:

df_data = df_data.drop('column_name')

g. Rename a column:

df_data = df_data.withColumnRenamed('column_name_old', 'column_name_new')

h. Handling missing values:

To get the count of missing values in each column, we will use the following code

from pyspark.sql.functions import col, isnan, when, count

# count the missing values in each column
df2 = df_data.select([count(when(col(c).contains('None') |
                                 col(c).contains('NULL') |
                                 (col(c) == '') |
                                 col(c).isNull() |
                                 isnan(c), c)).alias(c)
                      for c in df_data.columns])
df2.show()

We can use the below code to get the rows where a single column is null, or to filter on nulls across multiple columns:

# filter rows where a single column is null
df_data.filter(df_data.column_name.isNull()).show()

# filter nulls on multiple columns
df_data.filter(df_data.column_name1.isNull() & df_data.column_name2.isNull()).show()

h.1. DROP function:

The drop function has three parameters, namely how, thresh, and subset.

The how parameter accepts two values: i) 'all', to drop a record when all of its values are null; ii) 'any', to drop a record if any one of its values is null.

The thresh parameter specifies the minimum number of non-null values a record must have to be kept; records with fewer non-null values are dropped (it overrides how).

The subset parameter specifies the list of columns that should be checked for nulls.

DROP the entire record if all the values are null:

df_data = df_data.na.drop(how='all')

DROP the entire record if it contains a null value:

df_data = df_data.na.drop(how='any')
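The thresh and subset parameters work the same way. A short sketch (the column names come from the car dataset used above):

# keep a record only if it has at least 3 non-null values
df_data = df_data.na.drop(thresh=3)

# drop a record if any of the listed columns is null
df_data = df_data.na.drop(how='any', subset=['year', 'km_driven'])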

h.2. FILL function:

The fill function has two parameters, namely value and subset.

df_data = df_data.na.fill('missing')

The above code replaces the null values in string columns with the string 'missing'.
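The subset parameter restricts the fill to specific columns, and the replacement value should match the column type. A short sketch using columns from the car dataset:

# replace nulls only in the listed string columns
df_data = df_data.na.fill('missing', subset=['name', 'transmission'])

# replace nulls in a numeric column with 0
df_data = df_data.na.fill(0, subset=['km_driven'])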

h.3. IMPUTER function:

To use the Imputer, we first need to import it:

from pyspark.ml.feature import Imputer
The Imputer takes the following parameters:

inputCols is used to specify a list of columns (and inputCol a single column) that need to be considered when checking for null values.

Similarly, outputCol and outputCols are used to specify the single or multiple output columns depending on the inputs.

The strategy parameter defines the technique applied to the target column to impute its null values. Acceptable values are 'mean', 'median', and 'mode'.

# import the imputer
from pyspark.ml.feature import Imputer

# set input columns
inputCols = ['cname1', 'cname2', 'cname3']

# define the imputer
imputer = Imputer(
    inputCols=inputCols,
    outputCols=["{}_imputed".format(c) for c in inputCols]
).setStrategy("mean")

# apply the imputer
imputer.fit(df_data).transform(df_data)

i. Filter dataframe:

We can use the filter function to extract the required data from a dataframe

#query1
df_data.filter("year > 2010").show(5)
#query2
df_data.filter(df_data['year'] > 2010).show(5)

Queries 1 and 2 return the same data, i.e., the first 5 rows where year is greater than 2010.

We can use the ~ symbol to get the inverse of a filter

df_data.filter(~(df_data['year'] > 2010)).show(5)

The above code gives the first 5 rows where year is not greater than 2010 (i.e., 2010 or earlier).

To filter the dataframe based on more than one condition, we can use the & (and) and | (or) operators.

df_data.filter((df_data['selling_price'] > 200000) & (df_data['selling_price'] < 400000)).show(5)

The above code filters the dataframe to rows where the selling price is between 200,000 and 400,000 (2 lakh and 4 lakh).

df_data.filter((df_data['transmission'] == 'Automatic') | (df_data['transmission'] == 'Manual')).show(5)

The above code filters the dataframe to rows where the transmission is either automatic or manual.

j. Get distinct values from a column:

df_data.select('column_name').distinct().collect()

k. Group data:

Group data to get the count:

df_data.groupBy('column_name').count().show()

Group data to get mean values:

df_data.groupBy('column_name').mean().show()

Group data to get sum:

df_data.groupBy('column_name').sum().show()
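To compute more than one aggregation in a single pass, groupBy can be combined with the agg function. A short sketch, using columns from the car dataset used above:

from pyspark.sql import functions as F

df_data.groupBy('transmission') \
    .agg(F.count('*').alias('cars'),
         F.avg('selling_price').alias('avg_price'),
         F.max('km_driven').alias('max_km')) \
    .show(5)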

5. Reference

5.1. YouTube tutorials from Krish Naik and freeCodeCamp:

5.2. Kaggle Dataset link:
