Hands on Spark
Background
进入 21 世纪以来,随着互联网的发展,各类社交媒体,科技软件,传感器等工具一刻不停地生成着越来越多地数据。有一个说法是:
人类现有 90% 的数据来自于过去两年。
这些数据不管是否得到了很好的利用,谁也不能否认它们可以带来的高价值,特别是现阶段以数据和能源作为养料的 artificial intelligence。
大数据集对传统单机数据库造成了不小的麻烦,面对如何处理大量数据并从中洞见到有价值信息的需求,Google 三篇论文——Google File System at 2003, MapReduce at 2004, Bigtable at 2006 启发了无数贡献者,推出了一个又一个大数据开源项目,为人类掀开了大数据时代的巨幕。
其中,受 Google File System 和 MapReduce 启发,Yahoo 的一组工程师在 2006 年开源了 Hadoop, Hadoop 引入了两个技术,分布式存储 (HDFS) 和分布式计算 (MapReduce) ——将大文件切分成小块,分布保存在集群中的多个机器上,每个小块文件备份成多份以防某个节点出错导致文件丢失;处理计算任务时,也将任务分成多个单元,由多个 Excutor 分别执行计算操作,再将结果合并在一起。
这种处理方法利用大量廉价机器使得大数据计算得以实现。但由于其内在机制限制,大量的中间计算结果需要落地磁盘,过程中产生的 I/O 导致整个计算花费大量的时间。随着计算机硬件成本的降低,Spark 作为一个内存计算引擎,凭借其更加高效的计算的优势逐渐取代了 MapReduce 的功能。
Apache Spark
Intro
Apache Spark 是由加州大学伯克利分校的一些研究员在 2009 年推出的一个研究项目,目的就是为了解决 Hadoop 的上述限制。Spark 推出了一个 RDD(Resilient Distributed Dataset) 的概念,使数据得以数据集的形式存储在内存中,使得数据读取和处理都更加快速。
Language
Spark 源码是由 Scala 语言编写,但提供了 Python, Scala, Java, R 的 API 接口进行编程。Python 由于其易用性、丰富的资源库以及和 Data Science, Machine Learning 的紧密结合,其已经成为 Spark 主推的编程语言。下面的演示都以 Python 为例。
Installation
Make sure java is installed on your system PATH, or the JAVA_HOME environment variable pointing to a Java installation.
sudo apt install default-jre
pip install pyspark
Concepts
Spark 任务的运行模式
- Local Mode: 单机运行,资源有限,适合用于手动调试;
spark-submit word_count.py
python word_count.py
spark-submit --master local[*] word_count.py
- Client Mode: 任务的 Driver 在本地运行,实际计算任务分发到集群中的 Worker 节点运行,由于 Driver 在本地运行,可以方便地查看日志和调试信息;
spark-submit --master yarn --deploy-mode client word_count.py
spark-submit --master spark://<master-url>:<port> --deploy-mode client word_count.py
spark-submit --master k8s://<master-url>:<port> --deploy-mode client word_count.py
- Cluster Mode: 提交任务到分布式集群运行,可以利用更高的计算性能,适用于生产环境;
spark-submit --master yarn --deploy-mode cluster word_count.py
spark-submit --master spark://<master-url>:<port> --deploy-mode cluster word_count.py
spark-submit --master k8s://<master-url>:<port> --deploy-mode cluster word_count.py
Spark 资源管理器
Spark 资源管理器负责分配和调度集群资源(CPU, Memory, Disk, Network)
- Standalone: 默认的资源管理器;
- YARN: 由 Hadoop 提供;
- Mesos(Deprecated): 由 Apache Mesos 提供;
- Kubernetes: 由 Kubernetes 提供
Spark 任务物理运行原理
1. 客户端提交任务
用户通过 spark-submit 命令提交一个 Spark 应用程序。这个应用程序包含了用户的代码和依赖项。
2. Driver 进程启动
Spark 应用程序在 Driver 进程中启动。Driver 负责以下任务:
- 解析用户代码: 解析并执行用户代码中的 transformation 和 action 操作;
- 生成 DAG: 将用户代码中的一系列 transformation 操作转换为一个有向无环图(DAG);
- 任务调度: 将 DAG 划分为多个阶段(stages),每个阶段包含一组可以并行执行的任务(tasks)。
3. 资源管理器分配资源
Driver 向集群的资源管理器(如 YARN、Mesos 或 Kubernetes)请求资源。资源管理器分配资源并启动 Executor 进程。
4. Executor 进程启动
Executor 进程在集群的工作节点(Worker Nodes)上启动。每个 Executor 负责以下任务:
- 执行任务: 执行由 Driver 分配的任务;
- 存储数据: 缓存和存储中间结果数据;
- 报告状态: 向 Driver 报告任务的执行状态和结果。
5. 任务执行
Driver 将任务分配给各个 Executor。任务的执行过程如下:
- 读取数据: 从数据源(如 HDFS、S3、Kafka 等)读取数据;
- 执行计算: 根据用户代码中的 transformation 操作对数据进行处理;
- 写入结果: 将计算结果写入到指定的存储位置(如 HDFS、S3、数据库等)。
6. 任务监控和容错
Spark 提供了多种机制来监控和处理任务的执行:
- 任务重试: 如果某个任务失败,Spark 会自动重试该任务;
- 数据备份: Spark 会将数据分片(partitions)备份到多个节点,以防止数据丢失;
- 监控工具: Spark 提供了 Web UI 和其他监控工具,帮助用户监控任务的执行状态和性能。
7. 任务完成
当所有任务都成功完成后,Driver 会将最终结果返回给用户或写入到指定的存储位置。然后,Driver 和 Executor 进程会正常退出,释放资源。
Usage scenarios
Spark SQL
Spark SQL 是 Spark 用于处理结构化数据的模块,它提供了一个编程抽象,支持 SQL 查询、Dataframe API 和 Dataset API,而不需要了解底层分布式计算的细节。
它与 RDD 的不同在于:
- RDD 提供了底层的 RDD API,用户需要编写更多的代码来进行数据处理,适合处理非结构化数据;Spark SQL 将数据抽象为 Dataframe 或 Dataset,提供了更高级的优化和执行策略;
- RDD 类型安全,DataFrame API 是非类型安全的,Dataset API 提供了类型安全的操作;
- Spark SQL 支持 Hive、Avro、Parquet、ORC、JSON、JDBC 等多种数据源,而 RDD 需要用户自己实现对不同数据源的支持;
使用 Dataframe API 案例
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace
# Create a Spark session
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()
# Read CSV data into a DataFrame
ark_holdings_df = spark.read.csv(path="test_file/ark_holdings.csv",
header=True,
inferSchema=True)
# Perform a query
ark_holdings_df = ark_holdings_df \
.withColumn("market_value", regexp_replace(col("market_value"), "[$,]", "").cast("double"))
result_df = ark_holdings_df \
.filter(col("market_value") > ark_holdings_df.filter(col("ticker") == "U").select(col("market_value")).first()[0]) \
.select(col("ticker"), col("market_value")) \
.orderBy(col("market_value").desc())
# Show the result
result_df.show(15, truncate=False)
# Stop the Spark session
spark.stop()
使用 RDD API 案例
# Import necessary libraries
from pyspark import SparkContext
# Create a Spark context
sc = SparkContext("local", "RDDExample")
# Read CSV data into an RDD
ark_holdings_rdd = sc.textFile("test_file/ark_holdings.csv")
# Perform a query
result_rdd = ark_holdings_rdd.filter(lambda line: "UNITY" in line) \
.map(lambda line: line.split('"')[1].replace(',', ''))
# Show the result
print(result_rdd.collect())
# Stop the Spark context
sc.stop()
RDD 和 Dataframe 相互转换
# Dataframe 转 RDD
df = spark.read.csv("test_file/ark_holdings.csv", header=True, inferSchema=True)
rdd = df.rdd
# RDD 转 Dataframe
rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob"), (3, "Cathy")])
# 定义 DataFrame 的 schema
from pyspark.sql import Row
df = rdd.map(lambda x: Row(id=x[0], name=x[1])).toDF()
Action 算子
Spark 有一个特性 Lazy Evaluation,当一个 Spark 操作被提交时,Spark 并不会立即执行任务,而是将任务转换为一系列的 RDD 操作,只有当遇到 Action 算子时,Spark 才会真正执行任务。通过 Lazy Evaluation, Spark 可以优化执行计划,避免存储大量中间结果,提高任务的执行效率。
常见的 RDD action 算子有:collect, count, first, take, takeSample, reduce, fold, aggregate, foreach, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, countByKey, countByValue, takeOrdered, top, foreachPartition.
常见的 DataFrame action 算子有:collect, count, first, take, show, head, foreach, write, describe, summary, toPandas.
总的来说,Spark SQL 更适合处理结构化数据和需要高效查询优化的场景,而 RDD 更适合处理非结构化数据和需要自定义处理逻辑的场景。
需要特别注意的是,Spark SQL 性能调优需要考虑的因素很多,包括数据倾斜、数据分区、数据格式、数据压缩、数据缓存等,根据实际情况参考 Spark SQL 性能调优 进行调整。
Pandas API on Spark
Pandas API on Spark 是 Spark 提供的用于处理大规模数据集的 API,它提供了与 Pandas 类似的 API,使得用户可以方便地将 Pandas 的代码迁移到 Spark 上运行。
Pandas API on Spark 案例
# Import necessary libraries
from pyspark.sql import SparkSession
import pandas as pd
# Create a Spark session
spark = SparkSession.builder.appName("PandasAPISpark").getOrCreate()
# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# Convert DataFrame to Pandas DataFrame
pandas_df = df.toPandas()
# Perform operations on Pandas DataFrame
pandas_df['age_12_years_ago'] = pandas_df['age'] - 12
# Convert Pandas DataFrame back to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
# Show the result
spark_df.show()
# Stop the Spark session
spark.stop()
MLlib
MLlib 是 Apache Spark 的机器学习库,提供了一系列用于构建和训练机器学习模型的工具和算法。MLlib 旨在简化机器学习的工作流程,并支持大规模数据集的处理。
GraphX
GraphX 是 Apache Spark 的图计算库,旨在处理大规模图数据。GraphX 提供了一个统一的 API,用于图的创建、操作和分析,支持图的并行计算。
Structured Streaming
Structured Streaming 是 Spark 的流处理库,用于构建可扩展的流式数据处理应用程序。Structured Streaming 允许用户以批处理的方式处理流数据,同时保持了与批处理相同的编程模型。
Structured Streaming 案例
1. 环境准备
- 安装 zookeeper, kafka 以及必要的一些依赖包
brew install zookeeper kafka pip install kafka-python wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.0.0/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.9.0/commons-pool2-2.9.0.jar
- 启动 zookeeper
zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
- 启动 kafka
kafka-server-start /opt/homebrew/etc/kafka/server.properties
- 创建 topic
kafka-topics --create --topic structured_streaming_demo --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- 启动 kafka 生产者
kafka-console-producer --topic structured_streaming_demo --bootstrap-server localhost:9092
- 启动 spark shell
pyspark --jars jars/spark-sql-kafka-0-10_2.12-3.0.0.jar,jars/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar,jars/commons-pool2-2.9.0.jar
2. 代码实现
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Create SparkSession
spark = SparkSession.builder \
.appName("StructuredStreamingExample") \
.config("spark.sql.streaming.forceDeleteTempCheckpointLocation", 'true') \
.getOrCreate()
# Create DataFrame representing the stream of input data
data = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "structured_streaming_demo") \
.option("startingOffsets", "earliest") \
.load()
# Select the necessary columns
ark_holdings = data.select(split(data.value, ",")[0].alias("date"),
split(data.value, ",")[1].alias("fund_name"),
split(data.value, ",")[2].alias("company_name"),
split(data.value, ",")[3].alias("ticker"),
split(data.value, ",")[6].alias("shares"),
split(data.value, ",")[7].alias("market_value"),
split(data.value, ",")[5].alias("weight"))
# if market_value > 1000000, show the data
weight_stock = ark_holdings.filter(col('market_value') > 1000000)
# Start the query to stream the data
query = weight_stock \
.writeStream \
.outputMode("append") \
.format("console") \
.start()
# Wait for the query to finish
query.awaitTermination()
Spark 性能调优
Spark 性能调优是一个复杂的过程,除了代码层面的优化,这是一些配置参数,以下是一些常见的性能调优方法:
资源管理
- 调整资源分配: 根据集群的资源情况,合理分配 CPU、内存、磁盘和网络资源,确保每个任务都能获得足够的资源;
- 设置资源预取: 在任务开始前,预先分配好所需的资源,减少任务启动时间;
- 监控资源使用情况: 使用 Spark 的监控工具,如 Spark Web UI、Ganglia、Prometheus 等,监控集群的资源使用情况,及时发现并解决资源瓶颈。
数据分区
- 合理设置分区数: 根据数据量和集群的资源情况,合理设置分区数,减少数据倾斜和资源浪费;
- 数据倾斜: 数据倾斜是指某些分区中的数据量远大于其他分区,导致某些任务运行时间过长,甚至导致任务失败。可以通过增加分区数、调整分区大小、使用随机分区等方式解决数据倾斜问题;
- 数据预分区: 在数据加载时,根据业务需求和集群的资源情况,合理设置数据分区,减少数据倾斜和资源浪费。
数据格式
- 选择合适的数据格式: 根据数据的特点和业务需求,选择合适的数据格式,如 Parquet、ORC、Avro 等,减少数据存储和传输的开销;
- 数据压缩: 使用数据压缩算法,如 Snappy、Gzip、LZO 等,减少数据存储和传输的开销。
数据缓存
- 合理设置缓存策略: 根据数据的特点和业务需求,合理设置缓存策略,减少数据读取和处理的开销。
To be continued…