Spark基础
Table of Contents
Spark
参考文档
https://spark.apache.org/docs/3.5.1/sql-ref-datetime-pattern.html#content
spark读取数据
spark可以使用spark.read.load加载各类数据源
df = spark.read.load("examples//users.parquet")
支持的数据源有:json, parquet, jdbc, orc, libsvm, csv, text
var pf = spark.read.parquet("xxxxx")
spark创建临时视图便于查询
有个疑问是否这些数据源都可以通过读取之后创建临时视图并进行spark sql查询呢?
pf.createOrReplaceTempView(“http”)
经过测试,parquet和json都可以创建临时视图并查询。但text类型创建临时视图后查询结果只有value一列,无法进行where条件筛选。
text类型查看文件结构:
scala> txt_res.printSchema()
root
|-- value: string (nullable = true)
读取parquet数据并使用spark.sql查询
var pf = spark.read.parquet("/data/xx_data")
pf.createOrReplaceTempView("http")
spark.sql("select * from http limit 2").coalesce(1024).write.parquet("/tmp/xx_data_res")
spark.sql查询结果显示
默认使用.show()只会显示前20条
spark.sql("xxxxx").show()
如果想全部显示:
spark.sql("xxxx").collect().foreach(println)
spark.sql结果输出不同文件格式
parquet:
spark.sql("xxx").coalesce(1024).write.parquet("/tmp/xx_data_res")
csv:
spark.sql("xxx").coalesce(1024).write.format("csv").save("/tmp/xx_data_res")
json:
spark.sql("xxx").coalesce(1024).write.format("json").save("/tmp/xx_data_res")
.coalesce()操作
coalesce 是一个用于重新分区(reduce the number of partitions)的操作。它可以减少数据集的分区数量,这在需要减少分区以优化性能或资源使用时非常有用。
通过减少分区数量,可以减少任务启动的开销。对于大数据集,分区过多会导致大量的小任务启动,从而增加任务调度的开销。将分区数量减少到合理范围可以提高性能。
使用下面的命令将数据重新分区为1024个分区。
val coalescedDF = df.coalesce(1024)
spark.sql中常见函数(用到就记录)
按照特定的日期格式处理字段
select date_format(start_time, 'yyyy-MM-dd')
spark读取多个文件数据
可直接用*全部指定或者{}部分指定
var pf = spark.read.parquet("/tmp/xx_data_res/*")
var pf = spark.read.parquet("/tmp/xx_data_res/{month=1,month=2,month=3}")
有个疑问:有时候json可以直接创建临时视图进行sql查询,为什么不直接使用而是读取之后转为parquet文件再进行sql查询操作呢?
参考:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
原因:
- 性能:Parquet是列式存储格式,在读取和查询大型数据集时通常比JSON格式更高效,具有更好的压缩率和更快的读取性能
- 列式存储优势:查询时只需读取需要的列,而不是整个记录,避免了读取不相关的数据
- 谓词下推:支持在读取数据之前将部分查询条件推送到文件的元数据中,减少所需的数据量
总的来说,数据量小的话直接用JSON没问题,但大规模数据处理时Parquet是更好的选择。