Pyspark — Aamir P

Aamir P · 4 min read · Sep 30, 2024

As part of my learning journey and as a requirement for a new project, I have started exploring PySpark. In this article, I will walk through PySpark briefly from end to end and, I hope, give you a clear overall picture.

Let us start our learning journey.

PySpark is the Python API for Apache Spark, an open-source, distributed computing system used for big data processing. Apache Spark can handle large-scale data and distribute it across clusters, which allows for fast computation and efficient processing of big datasets.

1. Installation and Setup

pip install pyspark

To use PySpark, you first need to create a SparkSession:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

# Check Spark version
print(spark.version)

2. DataFrames and RDDs

  • DataFrame: Higher-level abstraction for structured data (rows and columns) that resembles a pandas DataFrame.
    # Creating a DataFrame from a list of tuples
    data = [("Alice", 25), ("Bob", 30), ("Cathy", 29)]
    columns = ["Name", "Age"]
    df = spark.createDataFrame(data, schema=columns)
    df.show()
  • RDD (Resilient Distributed Dataset): Lower-level API to work with raw data.
    # Creating an RDD from a list
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
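
The two abstractions are interchangeable. As a minimal sketch (the names and columns below are assumed for illustration), an RDD of tuples can be turned into a DataFrame with toDF(), and a DataFrame exposes its underlying RDD via the .rdd attribute:
# Convert an RDD of tuples into a DataFrame (column names assumed)
rdd_of_rows = spark.sparkContext.parallelize([("Dave", 41), ("Eve", 35)])
df_from_rdd = rdd_of_rows.toDF(["Name", "Age"])
df_from_rdd.show()

# A DataFrame exposes its underlying RDD of Row objects
print(df_from_rdd.rdd.take(2))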

3. Transformations and Actions

In PySpark, there are two types of operations:

  1. Transformations: Lazily evaluated operations that define what you want to do with data (e.g., map, filter).
  2. Actions: Trigger execution and produce results (e.g., collect, count).

# Transformations: map and filter
rdd_transformed = rdd.map(lambda x: x * 2).filter(lambda x: x > 5)

# Action: collect the result
result = rdd_transformed.collect()
print(result)

In the example above, map and filter are transformations, and collect is an action that triggers the computation.
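
For illustration, here is a small sketch of a few other common actions applied to the rdd_transformed defined above:
# Other common actions on the transformed RDD
print(rdd_transformed.count())                     # number of elements
print(rdd_transformed.take(2))                     # first two elements
print(rdd_transformed.reduce(lambda a, b: a + b))  # sum of all elements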

DataFrames also support a wide range of transformations.
# Selecting specific columns
df.select("Name").show()

# Filtering rows based on a condition
df.filter(df["Age"] > 26).show()
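
As a further sketch (the column and alias names are assumed for illustration), you can also derive new columns and compute aggregates on the same DataFrame:
from pyspark.sql import functions as F

# Add a derived boolean column (name assumed)
df.withColumn("IsAdult", df["Age"] >= 18).show()

# Compute simple aggregates over the whole DataFrame
df.agg(F.avg("Age").alias("AvgAge"), F.max("Age").alias("MaxAge")).show()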

4. PySpark SQL

You can use SQL queries directly in PySpark using the spark.sql() function.
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

# SQL query
result = spark.sql("SELECT * FROM people WHERE Age > 25")
result.show()
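
Any Spark SQL expression works against the registered view; here is a short aggregation sketch (the alias name is assumed for illustration):
# Aggregate over the temporary view
avg_age = spark.sql("SELECT AVG(Age) AS avg_age FROM people")
avg_age.show()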

5. Machine Learning with PySpark

PySpark provides a machine learning library called MLlib. The high-level machine learning API is found in the pyspark.ml module, which works with DataFrames.

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Sample data
data = [(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)]
df_ml = spark.createDataFrame(data, ["feature", "label"])

# Transform the feature column into a vector
assembler = VectorAssembler(inputCols=["feature"], outputCol="features")
df_transformed = assembler.transform(df_ml)

# Train a linear regression model
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(df_transformed)

# Show the learned coefficients
print(f"Coefficients: {model.coefficients}, Intercept: {model.intercept}")
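
As a follow-up sketch, the fitted model can generate predictions with transform(), which appends a prediction column to the input DataFrame:
# Apply the fitted model to produce predictions
predictions = model.transform(df_transformed)
predictions.select("features", "label", "prediction").show()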

6. Optimization and Best Practices

Efficient use of PySpark requires some best practices for performance optimization:

  1. Use Broadcast Variables: Broadcast small datasets to every node with broadcast() so each worker keeps a local copy (see the broadcast join sketch after the caching example below).
  2. Caching: Cache DataFrames or RDDs that are used multiple times to avoid recomputation.
  3. Avoid Shuffles: Minimize operations that trigger a shuffle across the cluster (such as groupBy and wide joins).
  4. Use Partitioning: Partition large datasets by logical keys so related records are processed together.

# Cache a DataFrame in memory
df_cached = df.cache()
df_cached.count() # Action to trigger caching
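
To illustrate the broadcast tip from point 1, here is a hedged sketch (the lookup data is assumed for illustration): marking the small DataFrame with broadcast() tells Spark to ship it to every worker instead of shuffling the larger side of the join.
from pyspark.sql.functions import broadcast

# Small lookup table (assumed data for illustration)
small_lookup = spark.createDataFrame([("Alice", "HR"), ("Bob", "Sales")], ["Name", "Dept"])

# Broadcasting the small side avoids shuffling the large side during the join
joined = df.join(broadcast(small_lookup), on="Name", how="left")
joined.show()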

7. Advanced Features

  • Window Functions: Perform operations over a sliding window of rows (see the sketch after the UDF example below).
  • Joins: Efficiently join two large datasets.
  • User-Defined Functions (UDFs): Apply custom functions to DataFrames.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define a Python function that uppercases a name
def uppercase_name(name):
    return name.upper()

# Register the function as a UDF
uppercase_udf = udf(uppercase_name, StringType())

# Apply the UDF to the DataFrame
df_with_uppercase_names = df.withColumn("Name", uppercase_udf(df["Name"]))
df_with_uppercase_names.show()
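
Window functions were listed above but not demonstrated. Here is a minimal sketch (the rank column name is assumed) that ranks rows by Age; in practice you would usually add partitionBy() on a grouping key so the window does not pull all rows into one partition.
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Rank rows by Age, highest first (no partitioning key in this tiny example)
age_window = Window.orderBy(F.col("Age").desc())
df_ranked = df.withColumn("AgeRank", F.rank().over(age_window))
df_ranked.show()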

8. Running PySpark on a Cluster

Once you are familiar with running PySpark locally, you can scale up by running it on a cluster. Here’s how the general architecture works:

  • Driver Program: Coordinates the application, schedules tasks, and requests resources.
  • Workers: Execute the actual computations.
  • Cluster Manager: Allocates resources across the cluster; examples include YARN and Mesos.
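
The same PySpark code can usually target a cluster through configuration alone. A minimal sketch (the master and settings below are assumed examples) pointing the SparkSession at a cluster manager instead of local mode:
# Assumed example settings; actual values depend on your cluster
spark = SparkSession.builder \
    .appName("MyClusterApp") \
    .master("yarn") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

In practice, cluster jobs are typically packaged and launched with the spark-submit command, which passes the same kinds of settings on the command line.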

PySpark offers a powerful framework to work with large datasets using distributed computing. With its abstractions like RDDs and DataFrames, it simplifies the management of large-scale data, while SQL and MLlib provide intuitive high-level APIs. The keys to success with PySpark are learning how to manipulate data efficiently, apply machine learning models, and optimize performance on a cluster.

I am not an expert in PySpark; I am still learning, and I will update this article as I learn new things.

Check out these links to know more about me.

Let’s get to know each other!
https://lnkd.in/gdBxZC5j

Get my books, podcasts, placement preparation, etc.
https://linktr.ee/aamirp

Get my Podcasts on Spotify
https://lnkd.in/gG7km8G5

Catch me on Medium
https://lnkd.in/gi-mAPxH

Follow me on Instagram
https://lnkd.in/gkf3KPDQ

Udemy (Python Course)
https://lnkd.in/grkbfz_N

YouTube
https://www.youtube.com/@knowledge_engine_from_AamirP

Subscribe to my Channel for more useful content.
