大象教程
首页
Spark
Hadoop
HDFS
MapReduce
Hive
Spark 教程
Spark 教程
Spark 基本架构及运行原理
Spark 安装(本地模式)
Spark 安装(集群模式)
Spark Shell 的使用
使用Intellij idea编写Spark应用程序(Scala+Maven)
使用Intellij idea编写Spark应用程序(Scala+SBT)
SparkContext
Spark Stage
Spark Executor
Spark RDD
Spark RDD 的创建方式
Spark RDD 缓存机制
Spark 键值对 RDD
Spark RDD 基本操作
Spark RDD 依赖关系图
Spark Map 和 FlatMap 的比较
Spark DAG
Spark 集群管理器
Spark spark-submit 提交应用程序
Spark 共享变量
Spark SQL
Spark SQL 教程
Spark SQL 数据类型
Spark SQL DataFrame
Spark SQL 数据源
Spark SQL SparkSession
Spark SQL DataSet
RDD、DataFrame和DataSet的区别
Spark Streaming
Spark Streaming 教程
Spark Streaming DStream
Spark Streaming 检查点(checkpoint)
Spark GraphX
Spark GraphX 教程
Spark GraphX 图操作
Spark GraphX 算法实例
PySpark 教程
PySpark 教程
PySpark 环境设置
PySpark SparkContext
PySpark RDD
PySpark 广播和累加器
PySpark SparkConf
PySpark SparkFiles
PySpark 存储级别
PySpark MLlib
PySpark 序列化器
#Spark SQL 数据源 Spark SQL支持读取很多种数据源,比如parquet文件,json文件,文本文件,数据库等。下面列出了具体的一些数据源: - Parquet文件 - Orc文件 - Json文件 - Hive表 - JDBC 先把people.json导入到hdfs的tmp目录下。people.json内容如下: ``` {"name":"Michael","age":20} {"name":"Andy", "age":30} {"name":"Justin", "age":19} ``` ##通用的load/save函数 spark提供了通用的`load`和`save`函数用于加载和保存数据。支持多种数据格式:json,parquet,jdbc,orc,libsvm,csv,text等。 ```scala scala> val peopleDF = spark.read.format("json").load("hdfs:/tmp/people.json") peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> peopleDF.select("name","age").write.format("parquet").save("/tmp/peoplenew.parquet") ``` 默认的是parquet,可以通过`spark.sql.sources.default`,修改默认配置。 ##Parquet文件 ```scala scala> val parquetFileDF=spark.read.parquet("hdfs:/tmp/peoplenew.parquet") parquetFileDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint] scala> peopleDF.write.parquet("/tmp/people.parquet") ``` ##Orc文件 ```scala scala> val df=spark.read.json("hdfs:/tmp/people.json") df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> df.write.mode("append").orc("/tmp/people") ``` 在hdfs查看`/tmp/people`目录 ```bash $ hdfs dfs -ls /tmp/people Found 2 items -rw-r--r-- 3 ccpgdev supergroup 0 2019-04-25 17:24 /tmp/people/_SUCCESS -rw-r--r-- 3 ccpgdev supergroup 343 2019-04-25 17:24 /tmp/people/part-00000-3eea0d3e-4349-4cc0-90c7-45c423069284-c000.snappy.orc ``` spark sql 读取orc文件 ```scala scala> spark.read.orc("/tmp/people").show() +---+-------+ |age| name| +---+-------+ | 20|Michael| | 30| Andy| | 19| Justin| +---+-------+ ``` ##Json文件 ```scala scala> val df=spark.read.json("hdfs:/tmp/people.json") df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> df.write.mode("overwrite").json("/tmp/peoplejson/") scala> spark.read.json("/tmp/peoplejson/").show() +---+-------+ |age| name| +---+-------+ | 20|Michael| | 30| Andy| | 19| Justin| +---+-------+ $ hdfs dfs -ls /tmp/peoplejson Found 2 items -rw-r--r-- 3 ccpgdev supergroup 0 2019-04-25 17:28 /tmp/peoplejson/_SUCCESS -rw-r--r-- 3 ccpgdev supergroup 80 2019-04-25 17:28 /tmp/peoplejson/part-00000-52a02853-e85b-45eb-ba66-4ab92c3f7142-c000.json ``` ##Hive表 Spark 1.6及以前的版本使用hive表需要hivecontext。Spark 2.0开始只需要创建sparksession增加`enableHiveSupport()`即可。 先在Hive的tmp库下新建一张表tmp_building_num,并插入一些数据。 ```bash hive> create table tmp_building_num(x int); OK Time taken: 0.127 seconds hive> insert into tmp_building_num values(1); Query ID = ccpgdev_20190425174242_bff1a3ed-b02c-47e7-bb11-8a05eb5c70c1 ...省略日志... Stage-Stage-1: Map: 1 Cumulative CPU: 4.73 sec HDFS Read: 3196 HDFS Write: 78 SUCCESS Total MapReduce CPU Time Spent: 4 seconds 730 msec OK Time taken: 22.339 seconds hive> select * from tmp_building_num; OK 1 Time taken: 0.154 seconds, Fetched: 1 row(s) scala> import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession scala> val spark = SparkSession. | builder(). | enableHiveSupport(). | getOrCreate() 19/04/25 17:38:48 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect. spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@498f1f63 scala> spark.sql("select count(1) from tmp.tmp_building_num").show(); +--------+ |count(1)| +--------+ | 1 | +--------+ ``` ##JDBC 数据写入mysql。 ```scala scala> df.repartition(1).write.mode("append").option("user", "root") .option("password", "password").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties()) ``` 从mysql里读数据。 ```scala scala> val fromMysql = spark.read.option("user", "root") .option("password", "password").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties()) ```
加我微信交流吧