许多数据分析师都是用HIVE SQL跑数,这里我建议转向PySpark:
PySpark的语法是从左到右串行的,便于阅读、理解和修正;SQL的语法是从内到外嵌套的,不方便维护;
PySpark继承Python优美、简洁的语法,同样的效果,代码行数可能只有SQL的十分之一;
Spark分转化操作和行动操作,只在行动操作时才真正计算,所以可以减少不必要的计算时间;
相对于SQL层层嵌套的一个整体,PySpark可以拆分成多步,并可以十分方便地把中间结果保存为变量,更有利于调试和修改;
PySpark可以与Python中的其他模块结合使用,可以将多种功能有机结合成一个系统
PySpark SQL模块许多函数、方法与SQL中关键字一样,可以以比较低的学习成本切换
最重要的,Spark是基于内存计算的,计算速度本身比Hive快很多倍
PySpark的安装配置
如果只是在单机上练习下,照着网上的帖子在Linux系统安装一下就可以了;如果想真正在集群上实战,还是找运维搭建吧。
PySpark SQL语法
最好的学习资料当然是官方文档,不过官方文档是按函数名排序的,这对于新手不太友好,所以这里整理一下。
数据拉取
第一步是拉取数据,与SQL、Pandas、R一样,在SparkSQL中,我们以DataFrame以基本的数据结构(不过要注意,SparkSQL DataFrame与Pandas的DataFrame是两种数据结构,虽然相互转换也很容易)。
加载包
from __future__ import print_functionimport pandas as pdfrom pyspark.sql import HiveContextfrom pyspark import SparkContext,SparkConffrom sqlalchemy import create_engineimport datetimeimport pyspark.sql.functions as F conf = SparkConf().setAppName("abc") sc = SparkContext(conf=conf) hiveCtx = HiveContext(sc)
手工创建一个DataFrame
d = [{'name': 'Alice', 'age': 1},{'name': 'Bob', 'age': 5}] df = sqlContext.createDataFrame(d) df.show()
从集群里运行SQL生成DataFrame
实际工作中往往是从集群中拉数,然后处理;还是执行SQL(尽管仍是SQL,但是不必写复杂的SQL;用基本的SQL先把源数据拉出来,复杂的处理和计算交给Spark来做),以下是用Hive拉数:
sql = "" # 拉数SQL df = hiveCtx.sql(sql)
缓存与清除缓存
Spark每次作行动操作时,都是从最初的转化操作开始计算;如果不想从头开始计算,想保存中间结果表,就应该把数据载入缓存。
df.cache()
与之相对的,清除缓存为
sqlContext.clearCache()
数据探索
展示
df.show() # 不加参数默认展示前20行
统计行数
df.count()
查看schema
df.printSchema()
查看字段
df.columns
查看字段类型
df.dtypes
数据处理
查询
df.select('age','name') # 带show才能看到结果
别名
df.select(df.age.alias('age_value'),'name')
筛选
df.filter(df.name=='Alice')
增加列
增加列有2种方法,一种是基于现在的列计算;一种是用pyspark.sql.functions的lit()
增加常数列。
df.select(df.age+1,'age','name') df.select(F.lit(0).alias('id'),'age','name')
增加行
df.unionAll(df2)
删除重复记录
df.drop_duplicates()
去重
df.distinct()
删除列
df.drop('id')
删除存在缺失值的记录
df.dropna(subset=['age', 'name']) # 传入一个list,删除指定字段中存在缺失的记录
填补缺失值
df.fillna({'age':10,'name':'abc'}) # 传一个dict进去,对指定的字段填充
分组计算
df.groupby('name').agg(F.max(df['age']))
join
df.groupby('name').agg(F.max(df['age']))
函数和UDF
pyspark.sql.functions里有许多常用的函数,可以满足日常绝大多数的数据处理需求;当然也支持自己写的UDF,直接拿来用。
自带函数
根据官方文档,以下是部分函数说明:
'lit': 'Creates a :class:`Column` of literal value.','col': 'Returns a :class:`Column` based on the given column name.','column': 'Returns a :class:`Column` based on the given column name.','asc': 'Returns a sort expression based on the ascending order of the given column name.','desc': 'Returns a sort expression based on the descending order of the given column name.','upper': 'Converts a string expression to upper case.','lower': 'Converts a string expression to upper case.','sqrt': 'Computes the square root of the specified float value.','abs': 'Computes the absolutle value.','max': 'Aggregate function: returns the maximum value of the expression in a group.','min': 'Aggregate function: returns the minimum value of the expression in a group.','first': 'Aggregate function: returns the first value in a group.','last': 'Aggregate function: returns the last value in a group.','count': 'Aggregate function: returns the number of items in a group.','sum': 'Aggregate function: returns the sum of all values in the expression.','avg': 'Aggregate function: returns the average of the values in a group.','mean': 'Aggregate function: returns the average of the values in a group.','sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',
df.select(F.max(df.age))df.select(F.min(df.age))df.select(F.avg(df.age)) # 也可以用mean,一样的效果df.select(F.countDistinct(df.age)) # 去重后统计df.select(F.count(df.age)) # 直接统计,经试验,这个函数会去掉缺失值会再统计from pyspark.sql import Windowdf.withColumn("row_number", F.row_number().over(Window.partitionBy("a","b","c","d").orderBy("time"))).show() # row_number()函数
共同學習,寫下你的評論
評論加載中...
作者其他優質文章