2. Spark下载与入门
2.2 Spark的Shell
两种类型的Shell:
# Python Shell
bin/pyspark
# Scala Shell
bin/spark-shell
使用IPython:
# 下载
sudo apt-get install ipython
# 使用IPython
IPYTHON=1 ./bin/pyspark
# 下载IPython Notebook
sudo apt-get install ipython-notebook
# 使用IPython Notebook
IPYTHON_OPTS="notebook" ./bin/pyspark
2.3 Spark核心概念简介
- 驱动器(SparkContext)
- 执行器
2.4 独立应用
创建SparkContext:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('My App')
sc = SparkContext(conf = conf)
运行Python脚本:
bin/spark-submit my_script.py
# 使用Python3
PYSPARK_PYTHON=`which python3` spark-submit my_script.py
3. RDD编程
RDD - 弹性分布式数据集。是一个不可变的分布式对象集合。
3.2 创建RDD
创建RDD的方法:
- 读取外部数据集
- 在驱动器程序中并行化一个集合
例如:
lines = sc.textFile('README.md')
nums = sc.parallelize([1, 2, 3, 4])
3.3 RDD操作
- 转化操作:例如map、filter
- 行动操作:例如count、take、collect、saveAsTextFile、saveAsSequenceFile
RDD的转换操作是惰性求值的。
注意:传递函数时,不小心可能会把函数所在的对象也序列化传出去。
针对各个元素的转化操作
mapflatMapfilterdistinctsample(withReplacement, fraction, [seed])
伪集合操作
RDD1.union(RDD2)RDD1.intersection(RDD2)RDD1.substract(RDD2)RDD1.cartesian(RDD2)
行动操作
collect()count()countByValue()take(num)top(num)takeOrdered(num)(ordering)takeSample(withReplacement, num, [seed])reduce(func)fold(zero)(func)aggregate(zeroValue)(seqOp, combOp)foreach(func)
3.6 持久化(缓存)
persist(storageLevel=StorageLevel(False, True, False, False, 1))
默认级别MEMORY_ONLY_SER。等价于cache()。
4. 键值对操作
4.2 创建Pair RDD
方法有很多,例如用map:
pairs = lines.map(lambda x : (x.split(' ')[0], x))
4.3 Pair RDD的转化操作
Pair RDD的转化操作
reduceByKey(func)groupByKey()combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)mapValues(func)flatMapValues(func)keys()values()sortByKey()
针对两个Pair RDD的转化操作
substractByKeyjoinrightOuterJoinleftOuterJoincogroup
4.3.1 聚合操作
聚合操作:
reduceByKey():对应reduce()foldByKey():对应fold()combineByKey():对应aggregate()
并行度优化:
repartition()coalesce(numPartitions, shuffle=False)
4.3.2 数据分组
groupByKey()
注意:rdd.reduceByKey(func)等价于rdd.groupByKey().mapValues(v => v.reduce(func)),但前者更高效。
4.3.3 连接
4.3.4 数据排序
sortByKey()
4.4 Pair RDD的行动操作
countByKey()collectAsMap()lookup(key)
4.5 数据分区(进阶)
分区优化:
# Python
partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x7f1ac7340578>)
// Java
public JavaPairRDD<K,V> partitionBy(Partitioner partitioner)
// Scala
def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V]
分区数numPartitions至少应该和集群的总核心数一样。
很多操作,都会自动为结果RDD设定已知的分区方式信息。例如sortByKey()和groupByKey(),分别生成范围分区的RDD和哈希分区的RDD。另一方面,诸如map()这样的操作会导致新的RDD失去父RDD的分区信息。
4.5.1 获取RDD的分区方式:
- Java中:
rdd.partitioner() - Scala中:
rdd.partitioner - Python中:无法获取,但是Spark内部仍然会利用已有的分区信息。
4.5.3 从分区中获益的操作
cogroup()groupWith()join()leftOuterJoin()rightOuterJoin()groupByKey()reduceByKey()combineByKey()lookup()
4.5.4 影响分区方式的操作
所有会为结果RDD设好分区方式的操作:
cogroup()groupWith()join()leftOuterJoin()rightOuterJoin()groupByKey()reduceByKey()combineByKey()partitionBy()sort()mapValues():如果父RDD有分区方式的话flatMapValues():如果父RDD有分区方式的话
注意:在无需改变元素的键时,尽量使用mapValues()或flatMapValues()。
4.5.5 自定义分区方式
- 在Java和Scala中,扩展
Partitioner类 - 在Python中,只需要把一个特定的哈希函数作为一个额外的参数传给
RDD.partitionBy()函数。例如:
import urlparse
def hash_domain(url):
return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(20, hash_domain)
注意:如果想对多个RDD使用相同的分区方式,就应该使用同一个函数对象,比如一个全局函数,而不是为每个RDD创建一个新的函数对象。