PyOdps 0.4版本发布,从一个故事说起
有这么个故事(如有雷同,纯属巧合)。有一天,某运营同学给某开发同学一个excel文件,里面是个客户清单。
“帮我查下这些用户的消耗呢”。
开发同学扫了一眼,几百个用户。这个事肯定是可以办的,但是想到麻烦程度,开发同学心里肯定是有不少羊驼经过的啦。
“有点麻烦啊”,开发同学轻轻抱怨。
“我懂的,把这个表和ODPS里的表join下就好了嘛。”运营同学努努嘴。
“……”。于是,开发同学把excel数据导出成文本格式,然后dship上传到ODPS,ODPS上编写SQL,dship下载,大功告成。
这里说得很轻松,但其实整个过程真的挺麻烦呢。要是这个过程中还要对excel中的数据进行过滤,最终结果还要绘个图,还是需要不少时间。
但是,如果这个开发同学使用PyOdps 0.4+版本新特性,一切就都轻松写意了。
为了模拟这个过程,我们拿movielens 100K的数据做例子,现在本地有一个excel表格,里面有100个需要查询的用户,表格包含两个字段,分别是用户ID和年龄。在ODPS上,我们有一张电影评分表,现在我们要求出这100用户个中年龄在20-30之间,按每个年龄来求电影评分均值,并用条形图展现。
可以想象,这个过程如果按照前面的描述,有多麻烦。那么用PyOdps DataFrame API呢。
首先,我们读出本地Excel文件。
In [14]: from odps.df import read_excel In [15]: users = read_excel('/Users/chine/userids.xlsx') In [16]: users.head(10) |==========================================| 1 / 1 (100.00%) 0s Out[16]: id age 0 46 27 1 917 22 2 217 22 3 889 24 4 792 40 5 267 23 6 626 23 7 433 27 8 751 24 9 932 58 In [40]: users.count() |==========================================| 1 / 1 (100.00%) 0s 100
然后我们用join语句,过滤出来电影评分表中这些用户的评分数据。
In [17]: ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings')) In [18]: ratings.head(10) |==========================================| 1 / 1 (100.00%) 2s Out[18]: user_id movie_id rating unix_timestamp 0 196 242 3 881250949 1 186 302 3 891717742 2 22 377 1 878887116 3 244 51 2 880606923 4 166 346 1 886397596 5 298 474 4 884182806 6 115 265 2 881171488 7 253 465 5 891628467 8 305 451 3 886324817 9 6 86 3 883603013 In [25]: filter_ratings = ratings.join(users.filter(users.age.between(20, 30)), ('user_id', 'id'))[ratings, lambda x, y: y.age] # 这里做字段抽取时,可以使用Collection,也可以使用lambda表达式,参数是左右两个Collection In [26]: filter_ratings.head(10) |==========================================| 1 / 1 (100.00%) 44s Out[26]: user_id movie_id rating unix_timestamp age 0 3 350 3 889237076 23 1 3 332 1 889237224 23 2 3 327 4 889237455 23 3 3 341 1 889237055 23 4 3 317 2 889237482 23 5 3 336 1 889237198 23 6 3 322 3 889237269 23 7 3 323 2 889237269 23 8 3 339 3 889237141 23 9 3 268 3 889236961 23
然后我们就可以按年龄聚合,求出评分均值啦。绘图也一气呵成。
In [28]: age_ratings = filter_ratings.groupby('age').agg(lambda x: x.rating.mean()) In [29]: age_ratings.head(10) |==========================================| 1 / 1 (100.00%) 30s Out[29]: age rating_mean 0 20 4.002309 1 21 4.051643 2 22 3.227513 3 23 3.519174 4 24 3.481013 5 25 3.774744 6 26 3.391509 7 27 3.355130 8 28 3.382883 9 29 3.705660 In [30]: age_ratings.plot(kind='bar', x='age', rot=45) |==========================================| 1 / 1 (100.00%) 29s Out[30]: <matplotlib.axes._subplots.AxesSubplot at 0x10b875f10>
超级简单,有木有!
这里的users其实是存在于本地的,而ratings是存在于ODPS上,用户依然可以join这两个Collection。其实对于0.4之前的版本,本地数据上传的接口也很容易(但是无法使用DataFrame API来进行本地过滤),但是对于0.4版本,不管一个Collection是存在于ODPS还是本地,用户都可以执行join和union的操作。
而这一切都源自0.4版本带来的新特性,DataFrame API的pandas计算后端。
DataFrame API使用pandas计算
我们知道,PyOdps DataFrame API类似于pandas的接口,但还是有些许不同的,那我们为什么不能用pandas来执行本地计算呢,这样也能充分利用pandas的一些特性,如支持各种数据输入。
所以,除了过去使用odps.models.Table来初始化DataFrame,我们也可以使用pandas DataFrame来初始化。
In [41]: import numpy as np In [42]: import pandas as pd In [44]: pandas_df = pd.DataFrame(np.random.random((10, 3)), columns=list('abc')) In [45]: pandas_df Out[45]: a b c 0 0.583845 0.301504 0.764223 1 0.153269 0.335511 0.455193 2 0.725460 0.460367 0.294741 3 0.315234 0.907264 0.849361 4 0.678395 0.642199 0.746051 5 0.977872 0.841084 0.931561 6 0.903927 0.846036 0.982424 7 0.347098 0.373247 0.193810 8 0.672611 0.242942 0.381713 9 0.461411 0.687164 0.514689 In [46]: df = DataFrame(pandas_df) In [49]: type(df) Out[49]: odps.df.core.DataFrame In [47]: df.head(3) |==========================================| 1 / 1 (100.00%) 0s Out[47]: a b c 0 0.583845 0.301504 0.764223 1 0.153269 0.335511 0.455193 2 0.725460 0.460367 0.294741 In [48]: df[df.a < 0.5].a.sum() |==========================================| 1 / 1 (100.00%) 0s 1.2770121422535428
这里转化成PyOdps DataFrame后,所有的计算也一样,变成延迟执行,PyOdps DataFrame在计算前的优化也同样适用。
这样做的好处是,除了前面我们提到的,能结合本地和ODPS做计算外;还有个好处就是方便进行本地调试。所以,我们可以写出以下代码:
DEBUG = True if DEBUG: # 这个操作使用tunnel下载,因此速度很快。对于分区表,需要给出所有分区值。 df = ratings[:100].to_pandas(wrap=True) else: df = ratings
在DEBUG的时候,我们能够利用PyOdps DataFrame在对原始表做切片操作时使用tunnel下载,速度很快的特性,选择原始表的一小部分数据来作为本地测试数据。值得注意的是, 本地计算通过不一定能在ODPS上也计算通过,比如自定义函数的沙箱限制 。
目前pandas计算后端尚不支持窗口函数。
apply和MapReduce API
使用apply对单行数据调用自定义函数
以前我们对于某个字段,能调用map来使用自定义函数,现在结合axis=1的apply,我们能对一行数据进行操作。
In [13]: ratings.apply(lambda row: row.rating / float(row.age), axis=1, reduce=True, types='float', names='rda').head(10) |==========================================| 1 / 1 (100.00%) 1m44s Out[13]: rda 0 0.166667 1 0.166667 2 0.208333 3 0.208333 4 0.125000 5 0.208333 6 0.166667 7 0.208333 8 0.208333 9 0.125000
reduce为True的时候,会返回一个sequence,详细参考文档。
MapReduce API
PyOdps DataFrame API也提供MapReduce API。我们还是以movielens 100K为例子,看如何使用。
现在假设我们需要求出每部电影前两名的评分,直接上代码。
from odps.df import output @output(['movie_id', 'movie_title', 'movie_rating'], ['int', 'string', 'int']) def mapper(row): yield row.movie_id, row.title, row.rating @output(['title', 'top_rating'], ['string', 'int']) def reducer(keys): i = [0] def h(row, done): if i[0] < 2: yield row.movie_title, row.movie_rating i[0] += 1 return h In [7]: top_ratings = ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False) In [10]: top_ratings.head(10) |==========================================| 1 / 1 (100.00%) 3m48s Out[10]: title top_rating 0 Toy Story (1995) 5 1 Toy Story (1995) 5 2 GoldenEye (1995) 5 3 GoldenEye (1995) 5 4 Four Rooms (1995) 5 5 Four Rooms (1995) 5 6 Get Shorty (1995) 5 7 Get Shorty (1995) 5 8 Copycat (1995) 5 9 Copycat (1995) 5
利用刚刚说的本地DEBUG特性,我们也能使用本地计算来验证,计算结果能很快得出。人生苦短!
In [22]: local_ratings = ratings[:100].to_pandas(wrap=True) |==========================================| 1 / 1 (100.00%) 2s In [23]: local_ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False).head(10) |==========================================| 1 / 1 (100.00%) 0s Out[23]: title top_rating 0 Shanghai Triad (Yao a yao yao dao waipo qiao) ... 5 1 Twelve Monkeys (1995) 4 2 Seven (Se7en) (1995) 4 3 Usual Suspects, The (1995) 5 4 Postino, Il (1994) 3 5 Mr. Holland's Opus (1995) 4 6 Taxi Driver (1976) 5 7 Crumb (1994) 5 8 Star Wars (1977) 5 9 Star Wars (1977) 5
cache机制
在0.4之前的版本,我们提供了一个persist接口,来保存执行结果。但是这个操作是个立即执行接口。现在我们提供cache接口,cache的collection会被单独计算,但不会立即执行。
In [25]: tmpdf = ratings[ratings.title.len() > 10].cache() In [26]: tmpdf['title', 'movie_id'].head(3) |==========================================| 1 / 1 (100.00%) 35s Out[26]: title movie_id 0 Seven (Se7en) (1995) 11 1 Event Horizon (1997) 260 2 Star Wars (1977) 50 In [27]: tmpdf.count() # tmpdf已经被cache,所以我们能立刻计算出数量 |==========================================| 1 / 1 (100.00%) 0s 99823
记住,目前的cache接口,计算的结果还是要落地的,并不是存放在内存中。
而一个collection如果已经被计算过,这个过程会自动触发cache机制,后续的计算过程会从这计算个向后进行,而不再需要从头计算。
其他特性
PyOdps 0.4版本还带来一些其他特性,比如join支持mapjoin(只对ODPS后端有效);Sequence上支持unique和nunique;execute_sql执行时支持设置hints,对于IPython插件,支持使用SET来设置hints,等等。
PyOdps下一步计划
对于PyOdps的DataFrame API来说,我们的短期目标是能完成ODPS SQL能做的所有事情,然后在这个基础上再带来更多SQL不容易做到的,但是却很有用的操作。现在,除了自定义聚合函数,我们已经能基本涵盖所有的SQL场景。
PyOdps非常年轻,期待大家来使用、提feature、贡献代码。
- 安装方法:pip install pyodps
- Github:https://github.com/aliyun/aliyun-odps-python-sdk
- 文档:http://pyodps.readthedocs.org/
- bug report:https://github.com/aliyun/aliyun-odps-python-sdk/issues
PyOdps在交互式环境下的使用,让探索ODPS数据更容易些
春节结束了,是时候来些新鲜玩意,让我们来看一些酷的东西。
当当当当:隆重推出PyOdps logo。
好像跑题了,好吧,让我们言归正传。
我们知道Python提供了一个交互式的环境,能够方便探索和试验想法。同时,IPython是Python交互环境的增强,提供了很多强大的功能;IPython Notebook(现在已经是Jupyter Notebook)则更酷,提供了一个web界面,除了提供交互环境,还是一个记录计算过程的『笔记本』。
PyOdps也提供了一系列在交互式环境下的增强工具,使得探索ODPS数据更方便快捷。
配置ODPS帐号
Python交互环境
同一个环境支持配置若干个ODPS帐号,只需要:
In [1]: from odps.inter import setup In [2]: setup('**your-access_id**', '**your-access-key**', '**your-project**', endpoint='**your-endpoint**')
此时这个帐号会被配置到一个叫做default
的我们称之为room
的地方。以后我们再使用这个帐号只需要:
In [3]: from odps.inter import enter In [4]: room = enter() In [5]: o = room.odps In [6]: o.get_table('dual') Out[6]: odps.Table name: odps_test_sqltask_finance.`dual` schema: c_int_a : bigint c_int_b : bigint c_double_a : double c_double_b : double c_string_a : string c_string_b : string c_bool_a : boolean c_bool_b : boolean c_datetime_a : datetime c_datetime_b : datetime
通过room
的odps属性,我们可以取到ODPS的入口,这样就可以接着进行ODPS操作了。配置了别的room
比如叫做myodps,要取到ODPS入口,只需要enter('myodps').odps
即可。
list_rooms
方法能列出所有的room
。
In [17]: from odps.inter import list_rooms In [18]: list_rooms() Out[18]: ['default', 'meta']
IPython及Jupyter Notebook
PyOdps还提供了IPython插件。首先我们需要加载插件:
In [11]: %load_ext odps In [14]: %enter Out[14]: In [15]: o = _.odps In [16]: o.get_table('dual') Out[16]: odps.Table name: odps_test_sqltask_finance.`dual` schema: c_int_a : bigint c_int_b : bigint c_double_a : double c_double_b : double c_string_a : string c_string_b : string c_bool_a : boolean c_bool_b : boolean c_datetime_a : datetime c_datetime_b : datetime
_
下划线能取到上一步的结果。
保存常用的ODPS对象
room
除了提供ODPS入口的功能,还能保存常用的ODPS对象。比如,我们能把常用的表起个名字,给保存起来。
In [19]: iris = o.get_table('pyodps_iris') In [23]: room.store('iris_test', iris, desc='保存测试ODPS对象') In [28]: room['iris_test'] Out[28]: odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double name : string In [29]: room.iris_test Out[29]: odps.Table name: odps_test_sqltask_finance.`pyodps_iris` schema: sepallength : double sepalwidth : double petallength : double petalwidth : double name : string
这两种方式都可以取到保存的ODPS对象。如果要列出当前room保存的所有ODPS对象,则可以:
In [30]: room.display() Out[30]: default desc name iris_test 保存测试ODPS对象 iris 安德森鸢尾花卉数据集
也可以使用IPython插件命令:
In [31]: %stores Out[31]: default desc name iris_test 保存测试ODPS对象 iris 安德森鸢尾花卉数据集
要删除某个ODPS对象:
In [32]: room.drop('iris_test') In [33]: %stores Out[33]: default desc name iris 安德森鸢尾花卉数据集
执行SQL命令
PyOdps提供了执行SQL的方法,但是在交互式环境下却不甚方便。使用PyOdps提供的IPython插件,可以通过sql命令来直接执行。
在执行时,需要配置全局帐号,如果已经使用了enter
方法或者命令,则已经配置;如果没有,则会尝试enter默认的room;如果这也没有配置,则需要使用to_global
方法。
In [34]: o = ODPS('**your-access-id**', '**your-secret-access-key**', project='**your-project**', endpoint='**your-end-point**')) In [35]: o.to_global()
这时我们就可以使用sql命令,单个百分号输入单行SQL,多行SQL使用两个百分号:
In [37]: %sql select * from pyodps_iris limit 5 |==========================================| 1 / 1 (100.00%) 3s Out[37]: sepallength sepalwidth petallength petalwidth name 0 5.1 3.5 1.4 0.2 Iris-setosa 1 4.9 3.0 1.4 0.2 Iris-setosa 2 4.7 3.2 1.3 0.2 Iris-setosa 3 4.6 3.1 1.5 0.2 Iris-setosa 4 5.0 3.6 1.4 0.2 Iris-setosa In [38]: %%sql ....: select * from pyodps_iris ....: where sepallength < 5 ....: limit 5 ....: |==========================================| 1 / 1 (100.00%) 15s Out[38]: sepallength sepalwidth petallength petalwidth name 0 4.9 3.0 1.4 0.2 Iris-setosa 1 4.7 3.2 1.3 0.2 Iris-setosa 2 4.6 3.1 1.5 0.2 Iris-setosa 3 4.6 3.4 1.4 0.3 Iris-setosa 4 4.4 2.9 1.4 0.2 Iris-setosa
在Jupyter Notebook里,多行SQL会提供语法高亮:
持久化pandas DataFrame为ODPS表
使用persist
命令即可:
In [42]: import pandas as pd In [43]: df = pd.read_csv('https://raw.github.com/pydata/pandas/master/pandas/tests/data/iris.csv') In [48]: %persist df pyodps_iris_test |==========================================| 150 /150 (100.00%) 0s In [49]: from odps.df import DataFrame In [61]: DataFrame(o.get_table('pyodps_iris_test')).head(5) |==========================================| 1 / 1 (100.00%) 0s Out[61]: sepallength sepalwidth petallength petalwidth name 0 5.1 3.5 1.4 0.2 Iris-setosa 1 4.9 3.0 1.4 0.2 Iris-setosa 2 4.7 3.2 1.3 0.2 Iris-setosa 3 4.6 3.1 1.5 0.2 Iris-setosa 4 5.0 3.6 1.4 0.2 Iris-setosa
其它交互式方面的增强
在交互式环境下,我们repr一个ODPS表的时候,会打印这个表的schema,包括字段注释,省去了查这个表的meta信息。
In [41]: o.get_table('china_stock', project='odpsdemo') Out[41]: odps.Table name: odpsdemo.`china_stock` schema: d : string # 日期 c : string # 股票代码 n : string # 股票名称 t_close : double # 收盘价 high : double # 最高价 low : double # 最低价 opening : double # 开盘价 l_close : double # 昨日收盘价 chg : double # 涨跌额 chg_pct : double # 涨跌幅 vol : bigint # 成交量 turnover : double # 成交额 partitions: code : string # 股票代码
当使用sql命令或者使用DataFrame框架计算的时候,在终端或者Jupyter Notebook里都提供一个进度条来方便用户来查看执行进度。
后记
PyOdps现在处于快速迭代阶段,我们所有的开发都是开源的。大家如果需要什么功能,可以给我们提issue(GitHub);也可以直接参与到开发,直接给我们发Pull Request就行啦。
欢迎大家一起来建设PyOdps。
github:https://github.com/aliyun/aliyun-odps-python-sdk
文档:http://pyodps.readthedocs.org/zh_CN/latest/
PyOdps DataFrame来临,大数据分析从未如此简单!
PyOdps正式发布DataFrame框架(此处应掌声经久不息),有了它,就像卷福有了花生,比翼双飞,哦不,如虎添翼。
快过年了,大家一定没心情看长篇大论的分析文章。作为介绍PyOdps DataFrame的开篇文章,我只说说其用起来爽的地方。其余的部分,从使用、问题到实现原理,我会分文章细说。
如果你不知道什么是ODPS,ODPS是阿里云旗下的大数据计算服务。如果不知道是DataFrame什么,它是存在于pandas和R里的数据结构,你可以把它当做是表结构。如果想快速浏览PyOdps DataFrame能做什么,可以看我们的快速开始文档。
让我们开始吧。
强类型支持
DataFrame API在计算的过程中,从字段到类型都是确定的,因此,若取一个不存在的字段,会丢给你个大大的异常。
In [4]: from odps.df import DataFrame In [5]: iris = DataFrame(o.get_table('pyodps_iris')) In [6]: iris.dtypes Out[6]: odps.Schema { sepallength float64 sepalwidth float64 petallength float64 petalwidth float64 name string } In [7]: iris.field_not_exist --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) in () ----> 1 iris.field_not_exist /Users/chine/Workspace/pyodps/odps/df/expr/expressions.pyc in __getattr__(self, attr) 510 return self[attr] 511 --> 512 raise e 513 514 def output_type(self): AttributeError: 'DataFrame' object has no attribute 'field_not_exist'
如果取存在的字段,自然是没问题啦。
In [11]: iris.sepalwidth.head(5) |==========================================| 1 / 1 (100.00%) 0s Out[11]: sepalwidth 0 3.5 1 3.0 2 3.2 3 3.1 4 3.6
有些方法,比如说取平均数,非数字肯定是不能调用的咯。
In [12]: iris['name'].mean() --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) in () ----> 1 iris['name'].mean() /Users/chine/Workspace/pyodps/odps/df/expr/expressions.pyc in __getattribute__(self, attr) 171 if new_attr in self._get_attr('_args'): 172 return self._get_arg(new_attr) --> 173 raise e 174 175 def _defunc(self, field): AttributeError: 'Column' object has no attribute 'mean'
数字类型的字段则可以调用。
In [10]: iris.sepalwidth.mean() |==========================================| 1 / 1 (100.00%) 27s 3.0540000000000007
操作数据如此简单
我们常常需要select一个表字段,但是只是不需要一个字段,却需要写一堆SQL。在DataFrame API里,调用exclude
方法就行了。
In [13]: iris.exclude('name').head(5) |==========================================| 1 / 1 (100.00%) 0s Out[13]: sepallength sepalwidth petallength petalwidth 0 5.1 3.5 1.4 0.2 1 4.9 3.0 1.4 0.2 2 4.7 3.2 1.3 0.2 3 4.6 3.1 1.5 0.2 4 5.0 3.6 1.4 0.2
使用DataFrame写出来的代码,天然有Python的特点,清晰易懂。某些快捷API,能使得操作更加简单。
比如我们要取name
的个数从大到小前10的值分别是多少。
In [16]: iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False)[:10] |==========================================| 1 / 1 (100.00%) 37s Out[16]: name count 0 Iris-virginica 50 1 Iris-versicolor 50 2 Iris-setosa 50
直接使用value_counts
来得更快。
In [17]: iris['name'].value_counts()[:10] |==========================================| 1 / 1 (100.00%) 34s Out[17]: name count 0 Iris-virginica 50 1 Iris-versicolor 50 2 Iris-setosa 50
很多时候,写一个SQL,我们需要检查中间结果的执行,就显得很麻烦,我们常常需要选取中间的SQL来执行,在DataFrame的世界,中间结果赋值一个变量就行了,这都不是事儿。
计算的过程和结果展示
在DataFrame的执行过程中,我们在终端里和IPython notebook里,都会有进度条显示任务的完成情况。结果的输出也会有更好的格式化展现,在IPython notebook里会以HTML表格的形式展现。
绘图集成
DataFrame的计算结果能直接调用plot方法来制作图表,不过绘图需要安装pandas
和matplotlib
。
In [21]: iris.plot() |==========================================| 1 / 1 (100.00%) 0s Out[21]: <matplotlib.axes._subplots.AxesSubplot at 0x10feab610>
导出数据再用excel画图,这事儿……咳咳,未来我们还会提供更好的可视化展现,比如提供交互式的图表。
自定义函数和Lambda表达式
DataFrame支持map方法,想对一个字段调用自定义函数非常方便。
In [30]: GLOBAL_VAR = 3.2 In [31]: def myfunc(x): if x < GLOBAL_VAR: return 0 else: return 1 In [32]: iris['sepalwidth', iris.sepalwidth.map(myfunc).rename('sepalwidth2')].head(5) |==========================================| 1 / 1 (100.00%) 18s Out[32]: sepalwidth sepalwidth2 0 3.5 1 1 3.0 0 2 3.2 1 3 3.1 0 4 3.6 1
可惜apply和聚合的自定义函数,暂时还不支持,期待吧!
延迟执行
DataFrame API的所有操作并不会立即执行,除非用户显式调用execute
方法或者一些立即执行的方法。在交互式界面下,打印或者repr对象的时候,内部也会调用execute
方法,方便用户使用。
执行优化
DataFrame框架在执行前会对整个查询进行优化,比如连续的projection合并。当用户查看原始表(或者选取某个分区)时,会使用tunnel来获取结果。
PyOdps DataFrame的下一步发展
好了,说了这么多,聊一聊我们DataFrame接下来要做的事情,首先,我们会实现多计算后端,包括pandas,当数据量比较小的时候,我们可以使用本地计算,而不需要等待ODPS的调度;其次,DataFrame框架和我们的机器学习部分会有更多的集成,从数据分析,到算法,一气呵成。
PyOdps非常年轻,才短短几个月的时间。我们的整个项目,在GitHub上开源。我个人非常希望大家能参与到开源的建设中来,能提个建议也是极好的。所以,我会写文章详述我们PyOdps的实现原理,希望大家一起把ODPS建设得更好。
github:https://github.com/aliyun/aliyun-odps-python-sdk
文档:http://pyodps.readthedocs.org/zh_CN/latest/