大象教程
首页
Spark
Hadoop
HDFS
MapReduce
Hive
Spark 教程
Spark 教程
Spark 基本架构及运行原理
Spark 安装(本地模式)
Spark 安装(集群模式)
Spark Shell 的使用
使用Intellij idea编写Spark应用程序(Scala+Maven)
使用Intellij idea编写Spark应用程序(Scala+SBT)
SparkContext
Spark Stage
Spark Executor
Spark RDD
Spark RDD 的创建方式
Spark RDD 缓存机制
Spark 键值对 RDD
Spark RDD 基本操作
Spark RDD 依赖关系图
Spark Map 和 FlatMap 的比较
Spark DAG
Spark 集群管理器
Spark spark-submit 提交应用程序
Spark 共享变量
Spark SQL
Spark SQL 教程
Spark SQL 数据类型
Spark SQL DataFrame
Spark SQL 数据源
Spark SQL SparkSession
Spark SQL DataSet
RDD、DataFrame和DataSet的区别
Spark Streaming
Spark Streaming 教程
Spark Streaming DStream
Spark Streaming 检查点(checkpoint)
Spark GraphX
Spark GraphX 教程
Spark GraphX 图操作
Spark GraphX 算法实例
PySpark 教程
PySpark 教程
PySpark 环境设置
PySpark SparkContext
PySpark RDD
PySpark 广播和累加器
PySpark SparkConf
PySpark SparkFiles
PySpark 存储级别
PySpark MLlib
PySpark 序列化器
#Spark SQL DataFrame DataFrame是一个分布式数据集合,它被组织成命名列。从概念上讲,它相当于具有良好优化技术的关系表。 DataFrame可以从不同来源的数组构造,例如Hive表,结构化数据文件,外部数据库或现有RDD。这个API是为现代大数据和数据科学应用程序设计的,Spark SQL的DataFrame设计灵感来自Python的Pandas和R语言的DataFrame数据结构。 ##DataFrame的特性 下面是一些DataFrame的一些特征: - 在单节点集群或者大集群,处理KB到PB级别的数据。 - 支持不同的数据格式(Avro,csv,ElasticSearch和Cassandra)和存储系统(HDFS,HIVE表,mysql等)。 - Spark SQL Catalyst 优化器。 - 可以通过Spark-Core轻松地与所有大数据工具和框架集成。 - 提供Python,Java,Scala和R等语言API。 ##SparkSession SparkSession是一个入口类,用于初始化Spark SQL的功能。 以下命令用于通过spark-shell初始化SparkSession。 `$ spark-shell` 使用以下命令创建SQLContext。 ```scala scala> import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SparkSession scala> val spark=SparkSession .builder() .appName("My Spark SQL") .getOrCreate() 19/04/25 14:40:31 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect. spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@560465ea scala> import spark.implicits._ import spark.implicits._ ``` `spark.implicits._`主要用来隐式转换的,比如Rdd转DataFrame ##DataFrame基本操作 DataFrame为结构化数据操作提供了一个领域特定的语言(domain-specific language)。下面会提供一些DataFrame操作结构化数据的基本示例。 读取json文件并创建DataFrame,`SQLContext.read.json`方法返回的就是DataFrame。 ```scala scala> val dfs = spark.read.json("hdfs:/tmp/employee.json") dfs: org.apache.spark.sql.DataFrame = [age: string, id: string ... 1 more field] ``` **注意**:要先把employee.json文件上传到hdfs的tmp目录下。 `hdfs dfs -put employee.json /tmp` employee.json内容如下: ``` [{"id" : "1201", "name" : "satish", "age" : "25"}, {"id" : "1202", "name" : "krishna", "age" : "28"}, {"id" : "1203", "name" : "amith", "age" : "39"}, {"id" : "1204", "name" : "javed", "age" : "23"}, {"id" : "1205", "name" : "prudvi", "age" : "23"}] ``` 返回数据将会以age、id、name三个字段展示。 `dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]` 查看DataFrame数据。 ```scala scala> dfs.show() +---+----+-------+ |age| id| name| +---+----+-------+ | 25|1201| satish| | 28|1202|krishna| | 39|1203| amith| | 23|1204| javed| | 23|1205| prudvi| +---+----+-------+ ``` 使用`printSchema`方法查看DataFrame的数据模式。 ```scala scala> dfs.printSchema() root |-- age: string (nullable = true) |-- id: string (nullable = true) |-- name: string (nullable = true) ``` 使用`select()`函数查看某个列的数据。 ```scala scala> dfs.select("name").show() +-------+ | name| +-------+ | satish| |krishna| | amith| | javed| | prudvi| +-------+ ``` 用`filter`函数查找年龄大于`23(age> 23)`的雇员。 ```scala scala> dfs.filter(dfs("age")>23).show() +---+----+-------+ |age| id| name| +---+----+-------+ | 25|1201| satish| | 28|1202|krishna| | 39|1203| amith| +---+----+-------+ ``` 使用`groupBy`方法计算同一年龄的员工人数。类似SQL里面的`group by`语句。 ```scala scala> dfs.groupBy("age").count().show() +---+-----+ |age|count| +---+-----+ | 28| 1| | 23| 2| | 25| 1| | 39| 1| +---+-----+ ```
加我微信交流吧