2022年 11月 5日

pythonspark实例_Spark的Python编程-简单示例

安装好Spark 后,自带了一些demo, 路径在Spark根目录/examples/src/main/python/

里面有些例子,例如统计字数的 wordcount.py

import sys

from operator import add

from pyspark import SparkContext

import sys

reload(sys)

sys.setdefaultencoding(“utf-8”)

if __name__ == “__main__”:

if len(sys.argv) != 2:

print >> sys.stderr, “Usage: wordcount “

exit(-1)

sc = SparkContext(appName=”PythonWordCount”)

lines = sc.textFile(sys.argv[1], 1)

counts = lines.flatMap(lambda x: x.split(‘ ‘)) \

.map(lambda x: (x, 1)) \

.reduceByKey(add)

output = counts.collect()

for (word, count) in output:

print “%s: %i” % (word, count)

sc.stop()

写了一个小demo,就是练习一下api的使用,做业务很方便。针对于大数据文件做统计分析的。比如几十兆上百兆的我们单机处理,上G的就放在hadoop 的 hdfs上。

下面是一个学生成绩单。四列字段:学生,以及三科成绩。其中学生有重复的(比如额外加分的情况,需要合并分析)。

yang 85 90 30

wang 20 60 50

zhang 90 90 90

li 100 54 0

yanf 0 0 0

yang 12 0 0

当然实际中数据要很多,比如很多列,而且几十万行甚至几百万行。这里是一个demo ,相当于在部署前测试。

在 Spark根目录/example/src/main/python/ 下新建一个 students.py :

#coding=utf-8

import sys

from operator import add

from pyspark import SparkContext

import sys

reload(sys)

sys.setdefaultencoding(“utf-8”)

def map_func(x):

s = x.split()

return (s[0],[int(s[1]),int(s[2]),int(s[3])])

def f(x):

return x

rank = sc.parallelize(range(0,sorted.count()))

def add(a,b):

return [a[r]+ b[r] for r in range(len(a))]

def _merge(a,b):

print ‘****’

return [a[r]+ b[r] for r in range(len(a))]

#the students who has one score is 100

def has100(x):

for y in x:

if(y==100):

return True

return False

def allIs0(x):

if(type(x) == list and sum(x) == 0):

return True

return False

def subMax(x,y):

m = [x[1][i] if(x[1][i] > y[1][i]) else y[1][i] for i in range(3)]

return(”,m)

def sumAll(x,y):

return (”,[x[1][i]+y[1][i] for i in range(3)])

if __name__ == “__main__”:

if len(sys.argv) != 2:

print >> sys.stderr, “Usage: students “

exit(-1)

sc = SparkContext(appName=”Students”)

# 加载学生文件,调用map将学生映射成keyValues.其中,key是学生,Value是学生成绩。

# map后的结果如(‘yang’,(85,90,30))

# 之后调用 CombineByKey,将相同学生的成绩相加(合并)。

# 然后调用cache, 将整个数据缓存,以便多次进行reduce而无需每次都重新生成。

lines = sc.textFile(sys.argv[1], 1).map(map_func).combineByKey(f,add,_merge).cache()

#print lines

count = lines.count()

# 获取学生中三科成绩有满分的,调用filter来实现

whohas100 = lines.filter(lambda x: filter(has100,x)).collect()

# 获取三科中所有成绩都是0的同学(缺考)

whoIs0 = lines.filter(lambda x: filter(allIs0,x)).collect()

# 获取每个学生的成绩总和

sumScore = lines.map(lambda x: (x[0],sum(x[1]))).collect()

# 获取三科中,单科最高分

subM = lines.reduce(subMax)

# 获取学生单科成绩的总和,求单科平均分用

sumA = lines.reduce(sumAll)

# 总分最高的学生

maxScore = max(sumScore,key = lambda x: x[1])

# 总分最低的学生

minScore = min(sumScore,key = lambda x: x[1])

# 所有学生三科成绩平均分

avgA = [x/count for x in sumA[1]]

# 根据总分进行排序(默认由小而大)

sorted = lines.sortBy(lambda x: sum(x[1]))

# 排序并附带序号

sortedWithRank = sorted.zipWithIndex().collect()

# 取出成绩最高的前三名同学,发奖!

first3 = sorted.takeOrdered(3,key = lambda x: -sum(x[1]))

#print ‘*’*50

print whohas100

print maxScore

print whoIs0

print subM

print avgA

print sorted.collect()

print sortedWithRank

print first3

#将结果汇总输出到文件

file = open(‘/home/yanggaofei/downloads/result.txt’,’w’)

file.write(‘students num:’+`count`+ ‘\n’)

file.write(‘who has a 100 scores:’ + str(whohas100) + ‘\n’)

file.write(‘who all is 0:’ + str(whoIs0) + ‘\n’)

file.write(‘the max score of each subject:’ + str(subM) + ‘\n’)

file.write(‘the avg score of each subject:’ + str(avgA) + ‘\n’)

file.write(‘sorted the students:’ + str(sorted.collect()) + ‘\n’)

file.write(‘sorted the students with the rank:’ + str(sortedWithRank) + ‘\n’)

file.write(‘the first 3 who will get the award:’ + str(first3) + ‘\n’)

file.close()

好了,运行:

[root@cyouemt spark-1.1.1]

# ./bin/spark-submit examples/src/main/python/students.py temp/student.txt

运行结果result.txt如下:

students num:5

who has a 100 scores:[(u’li’, [100, 54, 0])]

who all is 0:[(u’yanf’, [0, 0, 0])]

the max score of each subject:(”, [100, 90, 90])

the avg score of each subject:[61, 58, 34]

sorted the students:[(u’yanf’, [0, 0, 0]), (u’wang’, [20, 60, 50]),

(u’li’, [100, 54, 0]), (u’yang’, [97, 90, 30]),

(u’zhang’, [90, 90, 90])]

sorted the students with the rank:[

((u’yanf’, [0, 0, 0]), 0), ((u’wang’, [20, 60, 50]), 1),

((u’li’, [100, 54, 0]), 2), ((u’yang’, [97, 90, 30]), 3),

((u’zhang’, [90, 90, 90]), 4)]

the first 3 who will get the award:[

(u’zhang’, [90, 90, 90]),

(u’yang’, [97, 90, 30]),

(u’li’, [100, 54, 0])]

Spark的运行过程会打印出任务执行的开始过程以及结束。表示没研究透,不做陈述。。。

相比hadoop,Spark 是一个内存计算的MapReduce, 通过缓存机制,在性能上要好很多。它自身不带数据系统。但是支持 hdfs,mesos,hbase。文本文件等。从架构和应用角度上看,spark是 一个仅包含计算逻辑的开发库(尽管它提供个独立运行的master/slave服务,但考虑到稳定后以及与其他类型作业的继承性,通常不会被采用),而不 包含任何资源管理和调度相关的实现,这使得spark可以灵活运行在目前比较主流的资源管理系统上,典型的代表是mesos和yarn,我们称之为 “spark on mesos”和“spark on yarn”。将spark运行在资源管理系统上将带来非常多的收益,包括:与其他计算框架共享集群资源;资源按需分配,进而提高集群资源利用率等