Hands-on Delta Lake
What is Delta Lake
Delta Lake is an open-source lakehouse storage layer that extends Parquet data files with a file-based transaction log to provide ACID transactions and scalable metadata handling. It is fully compatible with the Apache Spark APIs and was built to integrate with Structured Streaming, so both batch and streaming data can be processed against the same tables.
Delta Lake's main strength is the reliability and consistency it brings to large-scale data processing, together with transactional features similar to those of a traditional database. In practice, data in Delta Lake can be read, written, and managed through Spark SQL. The sections below demonstrate these capabilities one by one.
How to Use Delta Lake
Install the Delta Package
pip install delta-spark==2.1.0
Load the Delta-related JARs from the Maven repo
pyspark --packages io.delta:delta-core_2.12:2.1.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "hive.tez.input.format=io.delta.hive.HiveInputFormat"
Set up a Python Project
from delta import *
from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("DeltaDemo") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Create a Table
1. Using Spark SQL DDL
spark.sql("""
create or replace table {your_database}.delta_demo(
id int,
firstName string,
middleName string,
lastName string,
gender string,
birthDate date,
ssn string,
salary int)
using delta
location 's3://{S3_BUCKET}/delta_demo';
""")
# drop the demo table when it is no longer needed
spark.sql("drop table {your_database}.delta_demo")
2. Using an existing Spark DataFrame and writing it in Delta format
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
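A DataFrame can also be saved as a table registered in the catalog instead of a bare path. A minimal sketch, assuming the table name delta_demo_managed is free to use:
# saveAsTable registers a managed Delta table in the metastore, so it can
# later be queried with spark.table("delta_demo_managed") or through Spark SQL.
data = spark.range(0, 5)
data.write.format("delta").mode("overwrite").saveAsTable("delta_demo_managed")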
Append Data
Write data from an existing Spark DataFrame
from pyspark.sql.types import *
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
data_insert = spark.createDataFrame(data, schema)
data_insert.write.format("delta").mode("append").save("s3://{S3_BUCKET}/delta_demo")
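Delta enforces the table schema on write: appending a DataFrame whose columns do not match the table fails unless schema evolution is enabled via the mergeSchema option. A minimal sketch against a scratch path, where the path and the extra department column are illustrative:
from pyspark.sql.functions import lit
# Write the original rows to a scratch location, then append the same rows with
# an extra 'department' column; mergeSchema lets Delta add the new column to the
# table schema instead of rejecting the write.
data_insert.write.format("delta").mode("overwrite").save("/tmp/delta_demo_schema")
data_insert.withColumn("department", lit("engineering")) \
    .write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/tmp/delta_demo_schema")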
Read Table
1. Read Delta table by name
delta_demo_df = spark.table("{your_database}.delta_demo")
delta_demo_df.show(10,0)
| id       | firstName | middleName | lastName   | gender | birthDate  | ssn         | salary |
|----------|-----------|------------|------------|--------|------------|-------------|--------|
| 9999998  | Billy     | Tommie     | Luppitt    | M      | 1992-09-17 | 953-38-9452 | 55250  |
| 9999999  | Elias     | Cyril      | Leadbetter | M      | 1984-05-22 | 906-51-2137 | 48500  |
| 20000002 | Mary      |            | Smith      | F      | 1982-10-29 | 456-78-9012 | 98250  |
| 20000003 | Jane      |            | Doe        | F      | 1981-06-25 | 567-89-0123 | 89900  |
| 10000000 | Joshua    | Chas       | Broggio    | M      | 1968-07-22 | 988-61-6247 | 90000  |
| 20000001 | John      |            | Doe        | M      | 1978-01-14 | 345-67-8901 | 55500  |
2. Read Delta table by specifying the path to the file
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
Update Table
1. Using Spark SQL DML
spark.sql("""
UPDATE {your_database}.delta_demo SET gender = 'Female' WHERE gender = 'F'
""")
spark.sql("""
UPDATE {your_database}.delta_demo SET gender = 'Male' WHERE gender = 'M'
""")
spark.sql("""
CREATE OR REPLACE TEMP VIEW upsert_view (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17', '953-38-9452', 55250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25', '567-89-0123', 89900)
""")
spark.sql("""
MERGE INTO {your_database}.delta_demo demo
USING upsert_view upsert
ON demo.id = upsert.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
spark.sql("""
DELETE FROM {your_database}.delta_demo WHERE birthDate < '1980-01-01'
""")
After these statements, the table contains:
| id       | firstName | middleName | lastName   | gender | birthDate  | ssn         | salary |
|----------|-----------|------------|------------|--------|------------|-------------|--------|
| 9999998  | Billy     | Tommie     | Luppitt    | M      | 1992-09-17 | 953-38-9452 | 55250  |
| 9999999  | Elias     | Cyril      | Leadbetter | Male   | 1984-05-22 | 906-51-2137 | 48500  |
| 20000002 | Mary      |            | Smith      | Female | 1982-10-29 | 456-78-9012 | 98250  |
| 20000003 | Jane      |            | Doe        | F      | 1981-06-25 | 567-89-0123 | 89900  |
Note: To guarantee data consistency and reliability, Delta Lake implements ACID transactions through a transaction log and metadata management.
Physically, a Delta table consists of two parts:
- Data files stored in Parquet format on the file system; data operations only ever add data files and never remove them in place;
- A folder (_delta_log) at the same level as the data files that holds the JSON transaction log; every operation on the table produces a new log file recording the details of that commit.
With this architecture, every read of a Delta table builds a snapshot of the data from the transaction log and returns that snapshot as a DataFrame.
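This layout can be inspected directly. A minimal sketch that lists the files under the local path used earlier (/tmp/delta-table); the exact file names depend on how many commits have been made:
import os
# Each write adds new Parquet data files, and each commit adds a numbered JSON
# file (00000000000000000000.json, ...) under the _delta_log folder.
for root, dirs, files in os.walk("/tmp/delta-table"):
    for name in sorted(files):
        print(os.path.join(root, name))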
2. Using Delta Lake API
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
# Upsert (merge) new data
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()
Read older versions of data
Delta Lake provides time travel: snapshots of the table as of earlier versions or timestamps can be queried directly.
df1 = spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("/tmp/delta-table")
# timestamp_string is a date or timestamp string, e.g. "2019-01-01" or "2019-01-01T00:00:00.000Z"
df2 = spark.read.format("delta") \
.option("timestampAsOf", timestamp_string) \
.load("/tmp/delta-table")
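To find out which versions and timestamps are available for time travel, the table history can be listed. A minimal sketch using the DeltaTable API:
from delta.tables import DeltaTable
# Each row of history() is one commit: its version number, commit timestamp,
# and the operation that produced it (WRITE, MERGE, DELETE, ...).
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
deltaTable.history().select("version", "timestamp", "operation").show(truncate=False)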
Data Retention
To access a previous version of a Delta table, both its data files and its transaction log must be retained. By default, data files are not cleaned up automatically, and the table's history is kept for 30 days.
- To clean up data files, run VACUUM:
-- vacuum files not required by versions older than the default retention period
VACUUM {your_database}.delta_demo
-- vacuum files in path-based table
VACUUM '/data/events'
VACUUM delta.`/data/events/`
-- vacuum files not required by versions more than 100 hours old
VACUUM delta.`/data/events/` RETAIN 100 HOURS
-- do dry run to get the list of files to be deleted
VACUUM {your_database}.delta_demo DRY RUN
- To change the retention periods, set the corresponding table properties:
--controls how long the history for a table is kept. The default is interval 30 days.
ALTER TABLE {your_database}.delta_demo SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval <interval>');
-- controls how long deleted data files are kept, so historical data stays accessible even if you run VACUUM on the Delta table. Increasing it may raise storage costs.
ALTER TABLE {your_database}.delta_demo SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval <interval>');
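VACUUM is also available through the Python API. A minimal sketch on the path-based table used earlier:
from delta.tables import DeltaTable
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# Remove files no longer referenced by versions older than the default
# retention period (7 days).
deltaTable.vacuum()
# Retain 200 hours of history instead; values below the default require
# spark.databricks.delta.retentionDurationCheck.enabled=false.
deltaTable.vacuum(200)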
Write a stream of data to a table
Structured Streaming datasets can be written to a Delta table:
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf \
.selectExpr("value as id") \
.writeStream.format("delta") \
.option("checkpointLocation", "/tmp/checkpoint") \
.start("/tmp/delta-table")
Read Delta table as a streaming source
Updates to a Delta table can also be read as a stream:
stream2 = spark.readStream.format("delta") \
.load("/tmp/delta-table") \
.writeStream.format("console") \
.start()
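Both start() calls above return StreamingQuery handles that keep running in the background; in an interactive session they can be stopped once the demo is done:
# Stop the streaming write and the streaming read started above.
stream.stop()
stream2.stop()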
Hive Insert Overwrite vs. Delta Lake Update
Hive Insert Overwrite
Advantages:
- Simplicity: Hive's overwrite is simple and easy to use, especially when the data in a table needs to be replaced entirely;
- Compatibility: Hive is tightly integrated with the Hadoop ecosystem and works with a wide range of Hadoop-based tools and frameworks;
- Flexibility: it can overwrite either a whole table or specific partitions, which gives some flexibility for data management (see the sketch after this list).
Disadvantages:
- Limited Update Support: it lacks fine-grained updates (such as updating specific rows or columns), so it is inefficient in those scenarios;
- Resource-Intensive: overwriting a large table means deleting the existing data and writing new data, which consumes significant resources and hurts performance and storage utilization;
- No Transaction Support: Hive's overwrite does not support ACID transactions, which makes it hard to preserve data integrity under concurrency.
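For reference, a minimal sketch of a partition-level overwrite in Spark SQL against a Hive-style partitioned table; the events table, its dt partition column, and the events_staging source are hypothetical:
spark.sql("""
-- the whole dt='2024-01-01' partition is replaced; individual rows cannot be updated in place
INSERT OVERWRITE TABLE {your_database}.events PARTITION (dt = '2024-01-01')
SELECT id, payload
FROM {your_database}.events_staging
WHERE dt = '2024-01-01'
""")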
Delta Lake Update
Advantages:
- ACID Transactions: Delta Lake provides full ACID transactions, guaranteeing data integrity and consistency under concurrent reads and writes;
- Fine-Grained Updates: it supports fine-grained updates, enabling more efficient and targeted data operations;
- Schema Evolution: Delta Lake supports schema evolution, so a table's schema can be changed without rewriting the entire dataset;
- Optimized Performance: its optimized storage format and transactional features help improve performance on large-scale datasets.
Disadvantages:
- Dependency on Spark: Delta Lake is tightly integrated with Apache Spark, which limits its direct use with other data processing frameworks;
- Limited Compatibility: integration with Hive is limited, mainly around metadata management;
- Complexity: ACID transactions and schema evolution add complexity and require a deeper understanding of the underlying storage and processing mechanisms.
Summary
In short, Hive's overwrite offers simplicity and compatibility but lacks fine-grained update support and ACID transactions, while Delta Lake offers ACID transactions, fine-grained updates, schema evolution, and optimized performance, at the cost of a dependency on Spark and limited compatibility with other frameworks.
The right choice depends on the requirements of the use case, including the need for transaction support, fine-grained updates, and fit with the existing technology stack.
Delta Lake on Unstructured and Semi-structured Data
Following the same principles, Delta Lake can also be used to handle semi-structured data such as JSON, XML, and Avro, as well as unstructured data such as text, images, audio, and video.
1. Storing semi-structured data in a Delta table, using JSON as an example
from pyspark.sql import SparkSession
# Create the Spark session
spark = SparkSession.builder \
.appName("DeltaLakeExample") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Read the JSON data
json_df = spark.read.json("/path/to/json/files")
# Write the JSON data to a Delta table
json_df.write.format("delta").mode("append").save("/tmp/delta-table-json")
# Read the JSON data back from the Delta table
delta_json_df = spark.read.format("delta").load("/tmp/delta-table-json")
delta_json_df.show()
2. Storing unstructured data in a Delta table, using text as an example
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Create the Spark session
spark = SparkSession.builder \
.appName("DeltaLakeExample") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Define the schema
schema = StructType([
StructField("id", IntegerType(), True),
StructField("text", StringType(), True)
])
# Create the sample data
data = [
(1, "This is a sample text."),
(2, "Another example of text data.")
]
# Create the DataFrame
text_df = spark.createDataFrame(data, schema)
# Write the text data to a Delta table
text_df.write.format("delta").mode("append").save("/tmp/delta-table-text")
# Read the text data back from the Delta table
delta_text_df = spark.read.format("delta").load("/tmp/delta-table-text")
delta_text_df.show()
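For unstructured binary data such as images, Spark's binaryFile source can load files into a DataFrame whose content column holds the raw bytes, and the result can be written to a Delta table in the same way. A minimal sketch, where the input path and the *.png filter are placeholders:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("DeltaLakeBinaryExample") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
# binaryFile yields one row per file with path, modificationTime, length, and content (the raw bytes)
image_df = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.png") \
    .load("/path/to/image/files")
# Write the binary rows to a Delta table and read them back
image_df.write.format("delta").mode("append").save("/tmp/delta-table-images")
spark.read.format("delta").load("/tmp/delta-table-images") \
    .select("path", "length").show(truncate=False)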
To be continued…