<-

Hands on Spark

Background

进入21世纪以来,随着互联网的发展,各类社交媒体,科技软件,传感器等工具一刻不停地生成着越来越多地数据。有一个说法是:

人类现有90%的数据来自于过去两年。

这些数据不管是否得到了很好的利用,谁也不能否认它们可以带来的高价值,特别是现阶段以数据和能源作为养料的 artificial intelligence。

大数据集对传统单机数据库造成了不小的麻烦,面对如何处理大量数据并从中洞见到有价值信息的需求,Google 三篇论文——Google File System at 2003, MapReduce at 2004, Bigtable at 2006 启发了无数贡献者,推出了一个又一个大数据开源项目,为人类掀开了大数据时代的巨幕。

其中,受 Google File System 和 MapReduce 启发,yahoo 的一组工程师在 2006 年开源了 Hadoop, Hadoop 引入了两个技术,分布式存储 (HDFS) 和分布式计算 (MapReduce) ——将大文件切分成小块,分布保存在集群中的多个机器上,每个小块文件备份成多份以防某个节点出错导致文件丢失;处理计算任务时,也将任务分成多个单元,由多个 excutor 分别执行计算操作,再将结果合并在一起。

这种处理方法利用大量廉价机器使得大数据计算得以实现。但由于其内在机制限制,大量的中间计算结果需要落地磁盘,过程中产生的 I/O 导致整个计算花费大量的时间。随着计算机硬件水平和成本的降低,Spark 作为一个内存计算引擎,凭借其更加高效的计算的优势逐渐取代了 MapReduce 的功能。

Apache Spark

Intro

Apache Spark 是由加州大学伯克利分校的一些研究员在 2009 年推出的一个研究项目,目的就是为了解决 Hadoop 的上述限制。Spark 推出了一个 RDD(Resilient Distributed Dataset) 的概念,使数据得以数据集的形式存储在内存中,使得数据读取和处理都更加快速。

Language

Spark 源码是由 Scala 语言编写,但提供了 Python, Scala, Java, R 的 API 接口进行编程。Python 由于其易用性、丰富的资源库以及和 Data Science, Machine Learning 的紧密结合,其已经成为 Spark 主推的编程语言。下面的演示都以 Python 为例。

Installation

Make sure java is installed on your system PATH, or the JAVA_HOME environment variable pointing to a Java installation.

sudo apt install default-jre
pip install pyspark

Concepts

Spark 任务的运行模式
  • Local Mode: 单机运行,资源有限,适合用于手动调试;
spark-submit word_count.py

python word_count.py

spark-submit --master local[*] word_count.py
  • Client Mode: 任务的 Driver 在本地运行,实际计算任务分发到集群中的 Worker 节点运行,由于 Driver 在本地运行,可以方便地查看日志和调试信息;
spark-submit --master yarn --deploy-mode client word_count.py

spark-submit --master spark://<master-url>:<port> --deploy-mode client word_count.py

spark-submit --master k8s://<master-url>:<port> --deploy-mode client word_count.py
  • Cluster Mode: 提交任务到分布式集群运行,可以利用更高的计算性能,适用于生产环境;
spark-submit --master yarn --deploy-mode cluster word_count.py

spark-submit --master spark://<master-url>:<port> --deploy-mode cluster word_count.py

spark-submit --master k8s://<master-url>:<port> --deploy-mode cluster word_count.py
Spark 资源管理器

Spark 资源管理器负责分配和调度集群资源(CPU, Memory, Disk, Network)

  1. Standalone: 默认的资源管理器;
  2. YARN: 由 Hadoop 提供;
  3. Mesos(Deprecated): 由 Apache Mesos 提供;
  4. Kubernetes: 由 Kubernetes 提供
Spark 任务物理运行原理

1. 客户端提交任务

用户通过 spark-submit 命令提交一个 Spark 应用程序。这个应用程序包含了用户的代码和依赖项。

2. Driver 进程启动

Spark 应用程序在 Driver 进程中启动。Driver 负责以下任务:

  • 解析用户代码: 解析并执行用户代码中的 transformation 和 action 操作;
  • 生成 DAG: 将用户代码中的一系列 transformation 操作转换为一个有向无环图(DAG);
  • 任务调度: 将 DAG 划分为多个阶段(stages),每个阶段包含一组可以并行执行的任务(tasks)。

3. 资源管理器分配资源

Driver 向集群的资源管理器(如 YARN、Mesos 或 Kubernetes)请求资源。资源管理器分配资源并启动 Executor 进程。

4. Executor 进程启动

Executor 进程在集群的工作节点(Worker Nodes)上启动。每个 Executor 负责以下任务:

  • 执行任务: 执行由 Driver 分配的任务;
  • 存储数据: 缓存和存储中间结果数据;
  • 报告状态: 向 Driver 报告任务的执行状态和结果。

5. 任务执行

Driver 将任务分配给各个 Executor。任务的执行过程如下:

  • 读取数据: 从数据源(如 HDFS、S3、Kafka 等)读取数据;
  • 执行计算: 根据用户代码中的 transformation 操作对数据进行处理;
  • 写入结果: 将计算结果写入到指定的存储位置(如 HDFS、S3、数据库等)。

6. 任务监控和容错

Spark 提供了多种机制来监控和处理任务的执行:

  • 任务重试: 如果某个任务失败,Spark 会自动重试该任务;
  • 数据备份: Spark 会将数据分片(partitions)备份到多个节点,以防止数据丢失;
  • 监控工具: Spark 提供了 Web UI 和其他监控工具,帮助用户监控任务的执行状态和性能。

7. 任务完成

当所有任务都成功完成后,Driver 会将最终结果返回给用户或写入到指定的存储位置。然后,Driver 和 Executor 进程会正常退出,释放资源。

Usage scenarios

Spark SQL

Spark SQL 是 Spark 用于处理结构化数据的模块,它提供了一个编程抽象,支持 SQL 查询、Dataframe API 和 Dataset API,而不需要了解底层分布式计算的细节。

它与 RDD 的不同在于:

  1. RDD 提供了底层的 RDD API,用户需要编写更多的代码来进行数据处理,适合处理非结构化数据;Spark SQL 将数据抽象为 Dataframe 或 Dataset,提供了更高级的优化和执行策略;
  2. RDD 类型安全,DataFrame API 是非类型安全的,Dataset API 提供了类型安全的操作;
  3. Spark SQL 支持 HiveAvroParquetORCJSONJDBC 等多种数据源,而 RDD 需要用户自己实现对不同数据源的支持;

使用 Dataframe API 案例

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace

# Create a Spark session
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# Read CSV data into a DataFrame
ark_holdings_df = spark.read.csv(path="test_file/ark_holdings.csv", 
                    header=True, 
                    inferSchema=True)

# Perform a query
ark_holdings_df = ark_holdings_df \
    .withColumn("market_value", regexp_replace(col("market_value"), "[$,]", "").cast("double"))
result_df = ark_holdings_df \
    .filter(col("market_value") > ark_holdings_df.filter(col("ticker") == "U").select(col("market_value")).first()[0]) \
    .select(col("ticker"), col("market_value")) \
    .orderBy(col("market_value").desc())

# Show the result
result_df.show(15, truncate=False)

# Stop the Spark session
spark.stop()

使用 RDD API 案例

# Import necessary libraries
from pyspark import SparkContext

# Create a Spark context
sc = SparkContext("local", "RDDExample")

# Read CSV data into an RDD
ark_holdings_rdd = sc.textFile("test_file/ark_holdings.csv")

# Perform a query
result_rdd = ark_holdings_rdd.filter(lambda line: "UNITY" in line) \
    .map(lambda line: line.split('"')[1].replace(',', ''))

# Show the result
print(result_rdd.collect())

# Stop the Spark context
sc.stop()

RDD 和 Dataframe 相互转换

# Dataframe 转 RDD
df = spark.read.csv("test_file/ark_holdings.csv", header=True, inferSchema=True)
rdd = df.rdd

# RDD 转 Dataframe
rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob"), (3, "Cathy")])
# 定义 DataFrame 的 schema
from pyspark.sql import Row
df = rdd.map(lambda x: Row(id=x[0], name=x[1])).toDF()

Action 算子

Spark 有一个特性 Lazy Evaluation,当一个 Spark 操作被提交时,Spark 并不会立即执行任务,而是将任务转换为一系列的 RDD 操作,只有当遇到 Action 算子时,Spark 才会真正执行任务。通过 Lazy Evaluation, Spark 可以优化执行计划,避免存储大量中间结果,提高任务的执行效率。

常见的 RDD action 算子有:collect, count, first, take, takeSample, reduce, fold, aggregate, foreach, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, countByKey, countByValue, takeOrdered, top, foreachPartition.

常见的 DataFrame action 算子有:collect, count, first, take, show, head, foreach, write, describe, summary, toPandas.

总的来说,Spark SQL 更适合处理结构化数据和需要高效查询优化的场景,而 RDD 更适合处理非结构化数据和需要自定义处理逻辑的场景。

需要特别注意的是,Spark SQL 性能调优需要考虑的因素很多,包括数据倾斜、数据分区、数据格式、数据压缩、数据缓存等,根据实际情况参考 Spark SQL 性能调优 进行调整。

Pandas API on Spark

Pandas API on Spark 是 Spark 提供的用于处理大规模数据集的 API,它提供了与 Pandas 类似的 API,使得用户可以方便地将 Pandas 的代码迁移到 Spark 上运行。

Pandas API on Spark 案例

# Import necessary libraries
from pyspark.sql import SparkSession
import pandas as pd

# Create a Spark session
spark = SparkSession.builder.appName("PandasAPISpark").getOrCreate()

# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Cathy", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)

# Convert DataFrame to Pandas DataFrame
pandas_df = df.toPandas()

# Perform operations on Pandas DataFrame
pandas_df['age_12_years_ago'] = pandas_df['age'] - 12

# Convert Pandas DataFrame back to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)

# Show the result
spark_df.show()

# Stop the Spark session
spark.stop()
MLlib

MLlib 是 Apache Spark 的机器学习库,提供了一系列用于构建和训练机器学习模型的工具和算法。MLlib 旨在简化机器学习的工作流程,并支持大规模数据集的处理。

GraphX

GraphX 是 Apache Spark 的图计算库,旨在处理大规模图数据。GraphX 提供了一个统一的 API,用于图的创建、操作和分析,支持图的并行计算。

Structured Streaming

Structured Streaming 是 Spark 的流处理库,用于构建可扩展的流式数据处理应用程序。Structured Streaming 允许用户以批处理的方式处理流数据,同时保持了与批处理相同的编程模型。

Structured Streaming 案例

1. 环境准备

  • 安装 zookeeper, kafka 以及必要的一些依赖包
    brew install zookeeper kafka
    pip install kafka-python
    wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.0/spark-sql-kafka-0-10_2.12-3.0.0.jar
    wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.12/3.0.0/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar
    wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.9.0/commons-pool2-2.9.0.jar
    
  • 启动 zookeeper
    zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
    
  • 启动 kafka
    kafka-server-start /opt/homebrew/etc/kafka/server.properties
    
  • 创建 topic
    kafka-topics --create --topic structured_streaming_demo --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    
  • 启动 kafka 生产者
    kafka-console-producer --topic structured_streaming_demo --bootstrap-server localhost:9092
    
  • 启动 spark shell
    pyspark --jars jars/spark-sql-kafka-0-10_2.12-3.0.0.jar,jars/spark-streaming-kafka-0-10-assembly_2.12-3.0.0.jar,jars/commons-pool2-2.9.0.jar
    

2. 代码实现

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create SparkSession
spark = SparkSession.builder \
.appName("StructuredStreamingExample") \
.config("spark.sql.streaming.forceDeleteTempCheckpointLocation", 'true') \
.getOrCreate()

# Create DataFrame representing the stream of input data
data = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "structured_streaming_demo") \
    .option("startingOffsets", "earliest") \
    .load()

# Select the necessary columns
ark_holdings = data.select(split(data.value, ",")[0].alias("date"), 
                    split(data.value, ",")[1].alias("fund_name"),
                    split(data.value, ",")[2].alias("company_name"),
                    split(data.value, ",")[3].alias("ticker"),
                    split(data.value, ",")[6].alias("shares"),
                    split(data.value, ",")[7].alias("market_value"),
                    split(data.value, ",")[5].alias("weight"))

# if market_value > 1000000, show the data
weight_stock = ark_holdings.filter(col('market_value') > 1000000)

# Start the query to stream the data
query = weight_stock \
.writeStream \
.outputMode("append") \
.format("console") \
.start()

# Wait for the query to finish
query.awaitTermination()

Spark 性能调优

Spark 性能调优是一个复杂的过程,除了代码层面的优化,这是一些配置参数,以下是一些常见的性能调优方法:

资源管理

  1. 调整资源分配: 根据集群的资源情况,合理分配 CPU、内存、磁盘和网络资源,确保每个任务都能获得足够的资源;
  2. 设置资源预取: 在任务开始前,预先分配好所需的资源,减少任务启动时间;
  3. 监控资源使用情况: 使用 Spark 的监控工具,如 Spark Web UI、Ganglia、Prometheus 等,监控集群的资源使用情况,及时发现并解决资源瓶颈。

数据分区

  1. 合理设置分区数: 根据数据量和集群的资源情况,合理设置分区数,减少数据倾斜和资源浪费;
  2. 数据倾斜: 数据倾斜是指某些分区中的数据量远大于其他分区,导致某些任务运行时间过长,甚至导致任务失败。可以通过增加分区数、调整分区大小、使用随机分区等方式解决数据倾斜问题;
  3. 数据预分区: 在数据加载时,根据业务需求和集群的资源情况,合理设置数据分区,减少数据倾斜和资源浪费。

数据格式

  1. 选择合适的数据格式: 根据数据的特点和业务需求,选择合适的数据格式,如 ParquetORCAvro 等,减少数据存储和传输的开销;
  2. 数据压缩: 使用数据压缩算法,如 SnappyGzipLZO 等,减少数据存储和传输的开销。

数据缓存

  1. 合理设置缓存策略: 根据数据的特点和业务需求,合理设置缓存策略,减少数据读取和处理的开销。

To be continued…