本文共 4523 字,大约阅读时间需要 15 分钟。
SparkSQL提供了通用的数据读取和保存方式,通过统一的API支持多种数据格式的读写操作。以下将详细介绍SparkSQL的数据读取与保存方法。
SparkSQL支持两种主要方式来加载数据:
spark.read.format("格式").load("路径")format和load之间添加JDBC参数,如url、user、password、tablename。format,SparkSQL可以直接读取。示例:读取JSON数据
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkReadData") .master("local[*]").getOrCreate() // 读取JSON数据 val frame: DataFrame = session.read.json("data/people.json") frame.printSchema() frame.show() }} 示例:读取JDBC数据
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkReadData") .master("local[*]").getOrCreate() // 读取JDBC数据 val frame: DataFrame = session.read.jdbc( "jdbc:mysql://localhost:3306/mydb1", "location_info", new Properties() { { put("user", "root") put("password", "123456") } } ) frame.printSchema() frame.show() }} spark.read.json("路径")等方式替代spark.read.format("json").load("路径")。url、tablename和properties对象,properties中包含user和password。示例:读取JSON数据
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkReadData") .master("local[*]").getOrCreate() // 读取JSON数据 val frame: DataFrame = session.read.json("data/people.json") frame.printSchema() frame.show() }} 示例:读取CSV数据
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkReadData") .master("local[*]").getOrCreate() // 读取CSV数据 val frame: DataFrame = session.read.csv("data/country.csv") frame.printSchema() frame.show() }} SparkSQL提供了两种主要方式来保存数据:
spark.write.format("格式").mode("模式").save("路径")SaveMode.ErrorifExists(默认):文件已存在时抛出异常。SaveMode.Append:文件已存在时追加新内容。SaveMode.Overwrite:文件已存在时覆盖数据。SaveMode.Ignore:文件已存在时忽略新数据。示例:保存到ORC格式
package _02SparkSQLimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkWriteData") .master("local").getOrCreate() // 读取数据 val frame: DataFrame = session.read.orc("data/student.orc") // 保存到JSON格式 frame.write.format("json").mode(SaveMode.Overwrite).save("data/OWStudent") session.stop() }} spark.write.json("路径").mode("模式")等方式替代spark.write.format("json").mode("模式").save("路径")。url、tablename和properties对象,properties中包含user和password。示例:保存到JDBC数据库
package _02SparkSQLimport java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData { def main(args: Array[String]): Unit = { val session = SparkSession.builder() .appName("SparkWriteData") .master("local").getOrCreate() // 读取数据 val frame: DataFrame = session.read.orc("data/student.orc") // 保存到JDBC数据库 val properties = new Properties() { { put("user", "root") put("password", "123456") } } frame.write.mode(SaveMode.Append).jdbc( "jdbc:mysql://localhost:3306/mydb1", "student", properties ) session.stop() }} 通过以上方法,用户可以方便地使用SparkSQL进行数据读取与保存操作。
转载地址:http://xxvfk.baihongyu.com/