在pandas操作期间显示进度指示器

343

我经常在超过1500万行的数据框上执行pandas操作,我希望能够在特定操作中获得进度指示器。

是否存在适用于pandas split-apply-combine操作的文本进度指示器?

例如,在以下操作中:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

在这里,feature_rollup是一个相当复杂的函数,它通过各种方法使用许多DF列创建新的用户列。对于大型数据帧,这些操作可能需要一段时间,因此我想知道是否可以在iPython笔记本中有基于文本的输出,以便更新进度。

到目前为止,我已经尝试了Python的常规循环进度指示器,但它们没有与Pandas进行任何有意义的交互。

我希望我在Pandas库/文档中忽略了一些东西,可以让人们知道拆分-应用-组合的进度。一个简单的实现可能会查看正在处理apply函数的数据帧子集的总数,并报告完成这些子集的分数。

这可能是需要添加到库中的内容吗?


你在代码上做过 %prun(分析)了吗?有时候你可以在应用之前对整个框架进行操作以消除瓶颈。 - Jeff
@Jeff:没错,我之前就这么做了,为了尽可能地提高性能。问题实际上归结于我正在处理的伪映射-归约边界,因为行数达到了数百万,所以我并不指望速度会有很大提升,只是想要一些进展方面的反馈。 - cwharland
考虑使用Cython进行优化:http://pandas.pydata.org/pandas-docs/dev/enhancingperf.html#cython-writing-c-extensions-for-pandas - Andy Hayden
@AndyHayden - 正如我在您的答案中评论的那样,您的实现非常好,并且为整个作业增加了一小部分时间。我还将特征汇总中的三个操作进行了Cython优化,这使得现在专门用于报告进度的所有时间都被恢复。因此,最终如果我在整个函数上使用Cython,我打赌我会有进度条并减少总处理时间。 - cwharland
10个回答

709

应广大用户要求,我已在 tqdm 中添加了对 pandas 的支持(pip install "tqdm>=4.9.0")。与其他答案不同,这不会明显减慢 pandas 的运行速度 -- 下面是一个使用 DataFrameGroupBy.progress_apply 的示例:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

# Create new `pandas` methods which use `tqdm` progress
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

如果你对这个工作原理感兴趣(以及如何修改为自己的回调函数),请查看GitHub上的示例,在PyPI上查看完整文档,或导入模块并运行help(tqdm)。其他支持的函数包括mapapplymapaggregatetransform

编辑


直接回答原问题,请将以下内容替换:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

跟随着:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

注意:tqdm <= v4.8: 对于版本小于4.8的tqdm,您需要执行以下操作而不是tqdm.pandas()


对于版本小于4.8的tqdm,您需要执行以下操作而不是tqdm.pandas()

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

25
tqdm 最初是为普通的可迭代对象创建的:from tqdm import tqdm; for i in tqdm( range(int(1e8)) ): pass,pandas 的支持只是我最近添加的一个小技巧 :) - casper.dcl
14
顺便说一下,如果你在使用Jupyter笔记本,你也可以使用tqdm_notebook来得到一个更漂亮的进度条。如果和pandas一起使用,你需要像这样实例化它:from tqdm import tqdm_notebook; tqdm_notebook().pandas(*args, **kwargs)请看这里 - grinsbaeckchen
3
从版本4.8.1起 - 使用tqdm.pandas()代替。https://github.com/tqdm/tqdm/commit/ad9abcc63330ad5d22fd8fca83dcec3e9d37afe6 - mork
1
@Casimir,该警告在tqdm>=4.57.0中消失。 - casper.dcl
1
这太棒了!没有这样的答案,生活将会是什么样子! - sandeepsign
显示剩余17条评论

33

如果您需要在Jupyter/ipython笔记本中使用此工具的支持,就像我一样,这里有一个有用的指南和相关来源 relevant article

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

请注意在导入语句中使用了下划线_tqdm_notebook。正如参考文章提到的,该开发项目目前处于晚期测试阶段。

2021年11月12日更新:

我目前正在使用pandas == 1.3.4tqdm == 4.62.3,不确定tqdm作者是从哪个版本开始实施了这个更改,但上述导入语句已经被弃用。请使用以下导入语句:

 from tqdm.notebook import tqdm_notebook

截至2022年02月01日更新 现在可以简化导入.py和.ipynb文件的语句:

from tqdm.auto import tqdm
tqdm.pandas()

对于两种类型的开发环境,这应该按预期工作,并且应该适用于pandas数据帧或其他值得tqdm迭代的对象。

截至2022年5月27日的更新: 如果您在SageMaker上使用jupyter笔记本,则此组合效果很好:

from tqdm import tqdm
from tqdm.gui import tqdm as tqdm_gui
tqdm.pandas(ncols=50)

20
为了调整Jeff的答案(并将其作为可重复使用的函数),请参考以下内容:

To tweak Jeff's answer (and have this as a reuseable function).


def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

注意:应用程序进度百分比会在行内更新。如果您的函数有标准输出,则此方法不起作用。


Note: 应用程序进度百分比会在行内更新. 如果您的函数有标准输出,则此方法不起作用.
In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

像往常一样,您可以将此添加到您的groupBy对象作为一个方法:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

正如评论中提到的那样,这并不是核心pandas感兴趣的实现功能。但是Python允许您为许多pandas对象/方法创建这些(这样做需要相当大量的工作...尽管您应该能够将此方法概括)。


我说“相当多的工作”,但你可能可以将整个函数重写为(更通用的)装饰器。 - Andy Hayden
感谢对Jeff的帖子进行了进一步的扩展。我已经实现了两者,每种方法的速度减慢都非常小(对于一个需要27分钟才能完成的操作而言,总共增加了1.1分钟)。这样我就可以查看进度,并且考虑到这些操作的临时性质,我认为这种减速是可以接受的。 - cwharland
很好,很高兴能帮到你。实际上,当我尝试一个例子时,我对它的减速感到惊讶,我预计它会更糟。 - Andy Hayden
1
为了进一步提高发布方法的效率,我在数据导入方面有些懒惰(pandas 处理混乱的 csv 文件太好用了!),我的一些条目(约 1%)完全被插入到错误的字段中(整个记录插入到单个字段中)。消除这些问题可以显著加快特征汇总的速度,因为在拆分-应用-组合操作期间不会存在任何歧义。 - cwharland
1
我只剩下8分钟了...但我在功能汇总中添加了一些东西(更多功能->更好的AUC!)。这8分钟是每个块(目前总共两个块),每个块大约有1200万行。所以,使用HDFStore在2400万行上执行繁重操作需要16分钟(并且在功能汇总中还有nltk内容)。非常不错。希望互联网不会因为最初的无知或对混乱插入的冷漠而评判我 =) - cwharland
显示剩余4条评论

16

如果有人想在他们的自定义并行 pandas-apply 代码上应用 tqdm,以下是一些帮助:

多年来,我尝试过一些并行化库,但我从未找到100%的并行化解决方案,主要是应用函数,我总是不得不回到我的"手动"代码。

df_multi_core - 你需要调用它。它接受:

  1. 您的 df 对象
  2. 您想要调用的函数名称
  3. 可以执行该函数的列的子集(有助于减少时间/内存)
  4. 要并行运行的作业数(-1或省略以使用所有内核)
  5. df 函数可接受的任何其他 kwargs (如"axis")

_df_split - 这是一个内部辅助函数,必须全局定位到运行模块(Pool.map 是"放置依赖项"的),否则我会将其定位为内部函数。

以下是来自我的 gist 的代码(我将在那里添加更多的 pandas 函数测试):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

以下是使用tqdm "progress_apply"并行化apply的测试代码。

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))
在输出结果中,你可以看到一条进度条表示单线程运行的进度,而使用并行化时每个核心都有自己的进度条。虽然有时会出现轻微的卡顿和其他核心同时出现的情况,但即使这样,也很有用,因为你能看到每个核心的进度统计信息(例如每秒处理条目数和总记录数)。

enter image description here

感谢 @abcdaa 提供这个很棒的库!

2
感谢 @mork - 请随意添加到 https://github.com/tqdm/tqdm/wiki/How-to-make-a-great-Progress-Bar 或在 https://github.com/tqdm/tqdm/wiki 创建新页面。 - casper.dcl
谢谢,但必须更改这些部分:try: splits = np.array_split(df[subset], njobs) except ValueError: splits = np.array_split(df, njobs)因为出现了KeyError异常而不是ValueError异常,所以改为使用Exception来处理所有情况。 - Marius
1
谢谢 @mork - 这个答案应该排名更高。 - Ian

11
每个回答都使用了pandas.DataFrame.groupby。如果你想在没有groupby的情况下在pandas.Series.apply中使用进度条,以下是如何在jupyter-notebook中实现的:
from tqdm.notebook import tqdm
tqdm.pandas()


df['<applied-col-name>'] = df['<col-name>'].progress_apply(<your-manipulation-function>)

1
我必须为任何想尝试此解决方案的人添加以下内容: 你将需要(tqdm 版本:tqdm>=4.61.2),否则它不会工作。另外,在安装了新版本的 tqdm 之后,请确保重新启动你的 jupyternotebook 内核。(例如,我使用了 pip install tqdm==4.62.3) - Dr Neo

5
你可以用装饰器轻松实现这一点。
from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

那么只需使用修改后的函数(并在需要打印时进行更改)。

1
明显的警告是这会减慢您的函数!您甚至可以让它随着进度更新,例如将count/len转换为百分比。https://dev59.com/BW035IYBdhLWcg3wbPfc - Andy Hayden
是的 - 你将拥有订单(组数),因此根据瓶颈所在,这可能会产生影响。 - Jeff
也许直觉上的做法是将其包装在一个logged_apply(g, func)函数中,这样您将可以访问顺序,并从一开始就进行日志记录。 - Andy Hayden
我在我的回答中做了上述操作,还加了点调皮的百分比更新。实际上,我无法让你的代码正常工作...我认为是因为wraps部分有问题。如果你只是用它来应用,那么这并不是很重要。 - Andy Hayden

1
我已经修改了Jeff's answer,加入了一个总数,以便您可以跟踪进度,并添加了一个变量以仅在每X次迭代时打印(如果“print_at”相当高,则实际上会大大提高性能)。
def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

clear_output()函数来自于

from IPython.core.display import clear_output

如果不使用IPython,Andy Hayden的答案可以在没有它的情况下完成。

1
对于像合并、连接和拼接这样的操作,可以使用Dask显示进度条。
您可以将Pandas数据帧转换为Dask数据帧。然后,您可以显示Dask进度条。
以下代码显示了一个简单的示例:
创建并转换Pandas数据帧
import pandas as pd
import numpy as np
from tqdm import tqdm
import dask.dataframe as dd

n = 450000
maxa = 700

df1 = pd.DataFrame({'lkey': np.random.randint(0, maxa, n),'lvalue': np.random.randint(0,int(1e8),n)})
df2 = pd.DataFrame({'rkey': np.random.randint(0, maxa, n),'rvalue': np.random.randint(0, int(1e8),n)})

sd1 = dd.from_pandas(df1, npartitions=3)
sd2 = dd.from_pandas(df2, npartitions=3)

合并进度条
from tqdm.dask import TqdmCallback
from dask.diagnostics import ProgressBar
ProgressBar().register()

with TqdmCallback(desc="compute"):
    sd1.merge(sd2, left_on='lkey', right_on='rkey').compute()

Dask相比Pandas在相同操作下更快且需要更少的资源:

  • Pandas 74.7毫秒
  • Dask 20.2毫秒

更多细节请参见:

注意1:我已经测试了这个解决方案:https://dev59.com/7bTma4cB1Zd3GeqPzwj-#56257514,但对我无效。不能测量合并操作。

注意2:我已经检查了Pandas中“tqdm”的“open request”:


0

关于连接操作:

df = pd.concat(
    [
        get_data(f)
        for f in tqdm(files, total=len(files))
    ]
)

tqdm 只返回一个可迭代对象。


0

如果你想要遍历群组,这个方法可以帮到你

from tqdm import tqdm

groups = df.groupby(group_cols)
for keys, grouped_df in tqdm(groups, total=groups.ngroups)
    pass

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接