Spark基础

#spark #program

Table of Contents

Spark

参考文档

https://spark.apache.org/docs/3.5.1/sql-ref-datetime-pattern.html#content

spark读取数据

spark可以使用spark.read.load加载各类数据源

df = spark.read.load("examples//users.parquet")

支持的数据源有:json, parquet, jdbc, orc, libsvm, csv, text

var pf = spark.read.parquet("xxxxx")

spark创建临时视图便于查询

有个疑问是否这些数据源都可以通过读取之后创建临时视图并进行spark sql查询呢?

pf.createOrReplaceTempView(“http”)

经过测试,parquet和json都可以创建临时视图并查询。但text类型创建临时视图后查询结果只有value一列,无法进行where条件筛选。

text类型查看文件结构:

scala> txt_res.printSchema()
root
 |-- value: string (nullable = true)

读取parquet数据并使用spark.sql查询

var pf = spark.read.parquet("/data/xx_data")
pf.createOrReplaceTempView("http")

spark.sql("select * from http limit 2").coalesce(1024).write.parquet("/tmp/xx_data_res")

spark.sql查询结果显示

默认使用.show()只会显示前20条

spark.sql("xxxxx").show()

如果想全部显示:

spark.sql("xxxx").collect().foreach(println)

spark.sql结果输出不同文件格式

parquet:

spark.sql("xxx").coalesce(1024).write.parquet("/tmp/xx_data_res")

csv:

spark.sql("xxx").coalesce(1024).write.format("csv").save("/tmp/xx_data_res")

json:

spark.sql("xxx").coalesce(1024).write.format("json").save("/tmp/xx_data_res")

.coalesce()操作

coalesce 是一个用于重新分区(reduce the number of partitions)的操作。它可以减少数据集的分区数量,这在需要减少分区以优化性能或资源使用时非常有用。

通过减少分区数量,可以减少任务启动的开销。对于大数据集,分区过多会导致大量的小任务启动,从而增加任务调度的开销。将分区数量减少到合理范围可以提高性能。

使用下面的命令将数据重新分区为1024个分区。

val coalescedDF = df.coalesce(1024)

spark.sql中常见函数(用到就记录)

按照特定的日期格式处理字段

select date_format(start_time, 'yyyy-MM-dd')

spark读取多个文件数据

可直接用*全部指定或者{}部分指定

var pf = spark.read.parquet("/tmp/xx_data_res/*")

var pf = spark.read.parquet("/tmp/xx_data_res/{month=1,month=2,month=3}")

有个疑问:有时候json可以直接创建临时视图进行sql查询,为什么不直接使用而是读取之后转为parquet文件再进行sql查询操作呢?

参考:https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

原因:

  • 性能:Parquet是列式存储格式,在读取和查询大型数据集时通常比JSON格式更高效,具有更好的压缩率和更快的读取性能
  • 列式存储优势:查询时只需读取需要的列,而不是整个记录,避免了读取不相关的数据
  • 谓词下推:支持在读取数据之前将部分查询条件推送到文件的元数据中,减少所需的数据量

总的来说,数据量小的话直接用JSON没问题,但大规模数据处理时Parquet是更好的选择。