As part of my learning journey and as a requirement for a new project, I have started exploring PySpark. In this article, I will walk through PySpark briefly from head to tail and, I hope, give you a clear picture of it.
Let us start our learning journey.
PySpark is the Python API for Apache Spark, an open-source, distributed computing system used for big data processing. Apache Spark can handle large-scale data and distribute it across clusters, which allows for fast computation and efficient processing of big datasets.
1. Installation and Setup
pip install pyspark
To use PySpark, you first need to create a SparkSession:
from pyspark.sql import SparkSession
# Create Spark session
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()
# Check Spark version
print(spark.version)
2. DataFrames and RDDs
- DataFrame: Higher-level abstraction for structured data (rows and columns) that resembles a Pandas DataFrame.
# Creating a DataFrame from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Cathy", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, schema=columns)
df.show()
- RDD (Resilient Distributed Dataset): Lower-level API for working with raw, unstructured data.
# Creating an RDD from a list
data = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(data)
3. Transformations and Actions
In PySpark, there are two types of operations:
- Transformations: Lazily evaluated operations that define what you want to do with data (e.g., map, filter).
- Actions: Trigger execution and produce results (e.g., collect, count).
# Transformations: map and filter
rdd_transformed = rdd.map(lambda x: x * 2).filter(lambda x: x > 5)
# Action: collect the result
result = rdd_transformed.collect()
print(result)
In the example above, map and filter are transformations, and collect is an action that triggers the computation.
DataFrames also support a wide range of transformations.
# Selecting specific columns
df.select("Name").show()
# Filtering rows based on a condition
df.filter(df["Age"] > 26).show()
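Beyond select and filter, two other common DataFrame transformations are withColumn for derived columns and groupBy/agg for aggregation. The snippet below is a small sketch of my own using the same Name/Age DataFrame:
from pyspark.sql import functions as F
# Add a derived column computed from an existing one
df.withColumn("AgeIn10Years", df["Age"] + 10).show()
# Aggregate: average age across all rows
df.groupBy().agg(F.avg("Age").alias("AverageAge")).show()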
4. PySpark SQL
You can run SQL queries directly in PySpark using the spark.sql() function.
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
# SQL query
result = spark.sql("SELECT * FROM people WHERE Age > 25")
result.show()
5. Machine Learning with PySpark
PySpark provides a machine learning library called MLlib. The high-level machine learning API is found in the pyspark.ml module, which works with DataFrames.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
# Sample data
data = [(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)]
df = spark.createDataFrame(data, ["feature", "label"])
# Transform feature column into a vector
assembler = VectorAssembler(inputCols=["feature"], outputCol="features")
df_transformed = assembler.transform(df)
# Train linear regression model
lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(df_transformed)
# Show model coefficients
print(f"Coefficients: {model.coefficients}, Intercept: {model.intercept}")
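As a small follow-up of my own (not part of the original example), the fitted model can also generate predictions on the assembled DataFrame:
# Make predictions with the fitted model; MLlib adds a "prediction" column
predictions = model.transform(df_transformed)
predictions.select("feature", "label", "prediction").show()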
6. Optimization and Best Practices
Efficient use of PySpark requires some best practices for performance optimization:
- Use Broadcast Variables: For small datasets, send a read-only copy to every node (for example with spark.sparkContext.broadcast(), or the broadcast() hint for joins); a short sketch follows the caching example below.
- Caching: Cache DataFrames or RDDs that are used multiple times to avoid recomputation.
- Avoid Shuffles: Minimize operations that trigger a shuffle (such as groupBy and wide joins).
- Use Partitioning: Split large datasets into partitions based on logical keys.
# Cache a DataFrame in memory
df_cached = df.cache()
df_cached.count() # Action to trigger caching
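To make the broadcast and partitioning points a bit more concrete, here is a rough sketch of my own; large_df, small_df, and the column name "key" are hypothetical placeholders:
from pyspark.sql.functions import broadcast
# Broadcast join: ship the small lookup table to every executor,
# so the large DataFrame does not need to be shuffled
joined = large_df.join(broadcast(small_df), on="key")
# Repartition a large DataFrame by a logical key before heavy aggregations
large_df_by_key = large_df.repartition(200, "key")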
7. Advanced Features
- Window Functions: Perform calculations over a window of rows (see the sketch after the UDF example below).
- Joins: Efficiently join two large datasets.
- User-Defined Functions (UDFs): Apply custom functions to DataFrames.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define a Python function
def capitalize_name(name):
    return name.upper()
# Register UDF
capitalize_udf = udf(capitalize_name, StringType())
# Apply UDF to the Name/Age DataFrame from earlier
df_with_capital_names = df.withColumn("Name", capitalize_udf(df["Name"]))
df_with_capital_names.show()
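The window functions bullet above deserves a quick example of its own. Here is a minimal sketch (my own addition) that ranks rows by Age using the Name/Age DataFrame from earlier:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# Define a window ordered by Age (no partitioning needed for this tiny example)
window_spec = Window.orderBy("Age")
# Add a rank column based on that window
df.withColumn("AgeRank", rank().over(window_spec)).show()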
8. Running PySpark on a Cluster
Once you are familiar with running PySpark locally, you can scale up by running it on a cluster. Here’s how the general architecture works:
- Driver Program: Coordinates the application, schedules tasks, and requests resources.
- Workers: Execute the actual computations.
- Cluster Managers: Allocate resources across the cluster, such as YARN, Kubernetes, or Mesos.
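In code, moving from local mode to a cluster mostly means pointing the SparkSession at a cluster manager. The sketch below is my own illustration; the master URL and the executor memory value are placeholders, not recommendations:
from pyspark.sql import SparkSession
# Connect to a cluster manager instead of running locally
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("yarn")  # or e.g. "spark://<host>:7077" for a standalone cluster
    .config("spark.executor.memory", "4g")  # placeholder resource setting
    .getOrCreate()
)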
PySpark offers a powerful framework to work with large datasets using distributed computing. With its abstractions like RDDs and DataFrames, it simplifies the management of large-scale data, while SQL and MLlib provide intuitive high-level APIs. The keys to success with PySpark are learning how to manipulate data efficiently, apply machine learning models, and optimize performance on a cluster.
I am not an expert in PySpark; I am just learning it now and will update this article as I learn new things.
Check out this link to know more about me
Let’s get to know each other!
https://lnkd.in/gdBxZC5j
Get my books, podcasts, placement preparation, etc.
https://linktr.ee/aamirp
Get my Podcasts on Spotify
https://lnkd.in/gG7km8G5
Catch me on Medium
https://lnkd.in/gi-mAPxH
Follow me on Instagram
https://lnkd.in/gkf3KPDQ
Udemy (Python Course)
https://lnkd.in/grkbfz_N
YouTube
https://www.youtube.com/@knowledge_engine_from_AamirP
Subscribe to my Channel for more useful content.