大象教程
首页
Spark
Hadoop
HDFS
MapReduce
Hive
Spark 教程
Spark 教程
Spark 基本架构及运行原理
Spark 安装(本地模式)
Spark 安装(集群模式)
Spark Shell 的使用
使用Intellij idea编写Spark应用程序(Scala+Maven)
使用Intellij idea编写Spark应用程序(Scala+SBT)
SparkContext
Spark Stage
Spark Executor
Spark RDD
Spark RDD 的创建方式
Spark RDD 缓存机制
Spark 键值对 RDD
Spark RDD 基本操作
Spark RDD 依赖关系图
Spark Map 和 FlatMap 的比较
Spark DAG
Spark 集群管理器
Spark spark-submit 提交应用程序
Spark 共享变量
Spark SQL
Spark SQL 教程
Spark SQL 数据类型
Spark SQL DataFrame
Spark SQL 数据源
Spark SQL SparkSession
Spark SQL DataSet
RDD、DataFrame和DataSet的区别
Spark Streaming
Spark Streaming 教程
Spark Streaming DStream
Spark Streaming 检查点(checkpoint)
Spark GraphX
Spark GraphX 教程
Spark GraphX 图操作
Spark GraphX 算法实例
PySpark 教程
PySpark 教程
PySpark 环境设置
PySpark SparkContext
PySpark RDD
PySpark 广播和累加器
PySpark SparkConf
PySpark SparkFiles
PySpark 存储级别
PySpark MLlib
PySpark 序列化器
#PySpark RDD 现在我们已经在我们的系统上安装和配置了 PySpark,我们可以在 Apache Spark 上使用 Python 进行编程。不过在此之前,让我们先了解一下 Spark 中的一个基本概念——RDD。 RDD代表Resilient Distributed Dataset,这些是在多个节点上运行和操作以在集群上进行并行处理的元素。RDD 是不可变的元素,这意味着一旦创建了 RDD,就无法更改它。RDD 也是容错的,因此在发生任何故障时,它们会自动恢复。您可以对这些 RDD 应用多个操作来完成某个任务。 要对这些 RDD 应用操作,有两种方法: - Transform - Action 让我们详细了解这两种方式。 **Transform**:这些是应用于 RDD 以创建新 RDD 的操作。Filter、groupBy 和 map 是转换的例子。 **Action**:这些是应用于 RDD 的操作,它指示 Spark 执行计算并将结果发送回驱动程序。 要在 PySpark 中应用任何操作,我们需要创建一个PySpark RDD第一的。以下代码块包含 PySpark RDD 类的详细信息: ```python class pyspark.RDD ( jrdd, ctx, jrdd_deserializer = AutoBatchedSerializer(PickleSerializer()) ) ``` 让我们看看如何使用 PySpark 运行一些基本操作。Python 文件中的以下代码创建 RDD 单词,其中存储了一组提到的单词。 ```python words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) ``` ##count() 返回 RDD 中的元素数。 ```python ----------------------------------------count.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) counts = words.count() print "Number of elements in RDD -> %i" % (counts) ----------------------------------------count.py--------------------------------------- ``` 执行结果如下: ```bash $SPARK_HOME/bin/spark-submit count.py Number of elements in RDD → 8 ``` ##collect() 返回 RDD 中的所有元素。 ```python ----------------------------------------collect.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Collect app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) coll = words.collect() print "Elements in RDD -> %s" % (coll) ----------------------------------------collect.py--------------------------------------- ``` 执行命令 ```bash $SPARK_HOME/bin/spark-submit collect.py Elements in RDD -> [ 'scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ] ``` ##foreach 仅返回那些满足 foreach 内部函数条件的元素。在下面的例子中,我们在 foreach 中调用了一个 print 函数,它打印了 RDD 中的所有元素。 ```python ----------------------------------------foreach.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "ForEach app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) def f(x): print(x) fore = words.foreach(f) ----------------------------------------foreach.py--------------------------------------- ``` 执行结果如下: ```bash $SPARK_HOME/bin/spark-submit foreach.py scala java hadoop spark akka spark vs hadoop pyspark pyspark and spark ``` ##filter 返回一个包含元素的新 RDD,它满足过滤器内部的功能。在下面的示例中,我们过滤掉包含“spark”的字符串。 ```python ----------------------------------------filter.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Filter app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_filter = words.filter(lambda x: 'spark' in x) filtered = words_filter.collect() print "Fitered RDD -> %s" % (filtered) ----------------------------------------filter.py---------------------------------------- ``` 执行结果如下: ```bash $SPARK_HOME/bin/spark-submit filter.py Fitered RDD -> [ 'spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ] ``` ##map 通过对 RDD 中的每个元素应用一个函数来返回一个新的 RDD。在下面的示例中,我们形成一个键值对并将每个字符串映射为值为 1。 ```python ----------------------------------------map.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Map app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_map = words.map(lambda x: (x, 1)) mapping = words_map.collect() print "Key value pair -> %s" % (mapping) ----------------------------------------map.py--------------------------------------- ``` 执行结果如下: ```bash $SPARK_HOME/bin/spark-submit map.py Key value pair -> [ ('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1) ] ##reduce 执行指定的交换和关联二元运算后,返回RDD中的元素。在下面的示例中,我们从运算符中导入 add 包并将其应用于“num”以执行简单的加法操作。 ```python ----------------------------------------reduce.py--------------------------------------- from pyspark import SparkContext from operator import add sc = SparkContext("local", "Reduce app") nums = sc.parallelize([1, 2, 3, 4, 5]) adding = nums.reduce(add) print "Adding all the elements -> %i" % (adding) ----------------------------------------reduce.py--------------------------------------- ``` 执行结果如下: ```bash $SPARK_HOME/bin/spark-submit reduce.py Adding all the elements -> 15 ``` ##join 它返回带有匹配键的一对元素以及该特定键的所有值的 RDD。在以下示例中,两个不同的 RDD 中有两对元素。加入这两个 RDD 后,我们得到一个 RDD,其中的元素具有匹配的键及其值。 ```python ----------------------------------------join.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Join app") x = sc.parallelize([("spark", 1), ("hadoop", 4)]) y = sc.parallelize([("spark", 2), ("hadoop", 5)]) joined = x.join(y) final = joined.collect() print "Join RDD -> %s" % (final) ----------------------------------------join.py--------------------------------------- ``` 执行结果如下: ```bash $SPARK_HOME/bin/spark-submit join.py Join RDD -> [ ('spark', (1, 2)), ('hadoop', (4, 5)) ] ``` ##collect() 使用默认存储级别 (MEMORY_ONLY) 持久化此 RDD。您还可以检查 RDD 是否已缓存。 ```python ----------------------------------------cache.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Cache app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words.cache() caching = words.persist().is_cached print "Words got chached > %s" % (caching) ----------------------------------------cache.py--------------------------------------- ``` 执行结果如下: ```bash $SPARK_HOME/bin/spark-submit cache.py Words got cached -> True ``` 以上这些是在 PySpark RDD 上完成的一些最重要的操作。
加我微信交流吧