如何在Python中高效处理连续追加新项的列表

3

目标:

可视化特定生物种群在有限时间内的大小。

假设:

  • 该生物的寿命为age_limit
  • 只有年龄为day_lay_egg天的雌性才能产卵,每个雌性最多可以产下max_lay_egg次。每个繁殖期最多只能产下egg_no个蛋,并有50%的概率产生公共后代。
  • 初始种群由2个雌性和1个雄性组成,共3个生物。

代码片段:

当前,以下代码应该产生预期输出。

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns


def get_breeding(d,**kwargs):

    if d['lay_egg'] <= kwargs['max_lay_egg'] and d['dborn'] > kwargs['day_lay_egg'] and d['s'] == 1:
            nums = np.random.choice([0, 1], size=kwargs['egg_no'], p=[.5, .5]).tolist()
            npol=[dict(s=x,d=d['d'], lay_egg=0, dborn=0) for x in nums]
            d['lay_egg'] = d['lay_egg'] + 1
            return d,npol

    return d,None



def to_loop_initial_population(**kwargs):

    npol=kwargs['ipol']
    nday = 0
    total_population_per_day = []
    while nday < kwargs['nday_limit']:
        # print(f'Executing day {nday}')

        k = []
        for dpol in npol:
            dpol['d'] += 1
            dpol['dborn'] += 1
            dpol,h = get_breeding(dpol,**kwargs)

            if h is None and dpol['dborn'] <= kwargs['age_limit']:
                # If beyond the age limit, ignore the parent and update only the decedent 
                k.append(dpol)
            elif isinstance(h, list) and dpol['dborn'] <= kwargs['age_limit']:
                # If below age limit, append the parent and its offspring
                h.extend([dpol])
                k.extend(h)

        total_population_per_day.append(dict(nsize=len(k), day=nday))
        nday += 1
        npol = k

    return total_population_per_day


## Some spec and store all  setting in a dict   
numsex=[1,1,0] # 0: Male, 1: Female

# s: sex, d: day, lay_egg: Number of time the female lay an egg, dborn: The organism age
ipol=[dict(s=x,d=0, lay_egg=0, dborn=0) for x in numsex] # The initial population
age_limit = 45 # Age limit for the species
egg_no=3 # Number of eggs
day_lay_egg = 30  # Matured age for egg laying
nday_limit=360
max_lay_egg=2
para=dict(nday_limit=nday_limit,ipol=ipol,age_limit=age_limit,
          egg_no=egg_no,day_lay_egg=day_lay_egg,max_lay_egg=max_lay_egg)


dpopulation = to_loop_initial_population(**para)


### make some plot
df = pd.DataFrame(dpopulation)
sns.lineplot(x="day", y="nsize", data=df)
plt.xticks(rotation=15)
plt.title('Day vs population')
plt.show()

输出:

问题/疑问:

随着nday_limit的增加,执行时间呈指数增长。我需要提高代码效率。如何缩短运行时间?

其他想法:

我想尝试使用joblib,但令我惊讶的是,执行时间更差了。

def djob(dpol,k,**kwargs):
    dpol['d'] = dpol['d'] + 1
    dpol['dborn'] = dpol['dborn'] + 1
    dpol,h = get_breeding(dpol,**kwargs)

    if h is None and dpol['dborn'] <= kwargs['age_limit']:
        # If beyond the age limit, ignore the that particular subject
        k.append(dpol)
    elif isinstance(h, list) and dpol['dborn'] <= kwargs['age_limit']:
        # If below age limit, append the parent and its offspring
        h.extend([dpol])
        k.extend(h)

    return k
def to_loop_initial_population(**kwargs):

    npol=kwargs['ipol']
    nday = 0
    total_population_per_day = []
    while nday < kwargs['nday_limit']:


        k = []


        njob=1 if len(npol)<=50 else 4
        if njob==1:
            print(f'Executing day {nday} with single cpu')
            for dpols in npol:
                k=djob(dpols,k,**kwargs)
        else:
            print(f'Executing day {nday} with single parallel')
            k=Parallel(n_jobs=-1)(delayed(djob)(dpols,k,**kwargs) for dpols in npol)
            k = list(itertools.chain(*k))
            ll=1


        total_population_per_day.append(dict(nsize=len(k), day=nday))
        nday += 1
        npol = k

    return total_population_per_day

对于

nday_limit=365

2
请提供最小可重现的示例以便于理解和提供帮助。参考此链接:https://stackoverflow.com/help/minimal-reproducible-example - Abhi
每天只有一次繁殖会议?所有生物的寿命都是恰好age_limit?什么是age_limit?没有资源限制(如果有的话,我可以告诉你,人口要么呈指数增长,要么就会灭绝)。 - Bob
只是顺便提一下:当您调用 to_loop_initial_population(**para) 时,必须将 para 的所有值提取出来,并仅作为关键字值传递,以便再次重组成字典,因为您正在将单个参数声明为 **kwargs。我认为,如果您将此函数声明为 to_loop_initial_population(kwargs) 或更好地声明为 to_loop_initial_population(para) 并使用 to_loop_initial_population(para) 调用它会更有效率。也就是说,显式地传递一个 dict 实例。 - Booboo
1
目前需要多长时间,您认为可以接受的最短运行时间是多少? - TylerH
@rpb,尽可能使用numpy数组操作而不是循环可以提高性能,请参见我下面的答案。 - gftea
显示剩余3条评论
3个回答

2
您的代码整体看起来还不错,但我可以看到几个需要改进的地方,这些地方显著地拖慢了您的代码速度。
需要注意的是,随着nday值的增加,您需要跟踪的人口数量不断增长,并且您不断重新填充列表以进行跟踪。当对象数量增加时,预计循环所需的时间会变长,但您可以减少完成单个循环所需的时间。
elif isinstance(h, list) and dpol['dborn'] <= kwargs['age_limit']:

在每个循环中,您都会询问h的实例,确认它是否为None。您知道h肯定是一个列表,如果不是,即使在到达该行之前,您的代码也会出错,因为无法创建列表。
此外,您对dpolage进行了冗余条件检查,然后先通过dpol扩展h,然后通过hk扩展。这可以与前面的问题一起简化为以下内容:
if dpol['dborn'] <= kwargs['age_limit']:
    k.append(dpol)

if h:
    k.extend(h)

结果完全相同。
此外,您正在传递许多`**kwargs`。这表明您的代码应该是一个类,其中一些不变的参数通过`self.parameter`保存。您甚至可以在这里使用数据类(https://docs.python.org/3/library/dataclasses.html
另外,您混合了函数的职责,这是不必要的,并使您的代码更加混乱。例如:
def get_breeding(d,**kwargs):

    if d['lay_egg'] <= kwargs['max_lay_egg'] and d['dborn'] > kwargs['day_lay_egg'] and d['s'] == 1:
            nums = np.random.choice([0, 1], size=kwargs['egg_no'], p=[.5, .5]).tolist()
            npol=[dict(s=x,d=d['d'], lay_egg=0, dborn=0) for x in nums]
            d['lay_egg'] = d['lay_egg'] + 1
            return d,npol

    return d,None

这段代码有两个职责:如果条件符合,则生成一个新的个体并检查这些条件,并根据它们返回两个不同的结果。
最好通过两个单独的函数来完成,一个仅检查条件,另一个则按以下方式生成一个新的个体:
def check_breeding(d, max_lay_egg, day_lay_egg):
    return d['lay_egg'] <= max_lay_egg and d['dborn'] > day_lay_egg and d['s'] == 1


def get_breeding(d, egg_no):
    nums = np.random.choice([0, 1], size=egg_no, p=[.5, .5]).tolist()
    npol=[dict(s=x, d=d['d'], lay_egg=0, dborn=0) for x in nums]
    return npol

如果满足条件,当迭代列表时,d['lay_egg'] 可以被原地更新。

如果您在迭代列表时编辑它(通常不建议这样做,但如果您知道自己在做什么,那么完全可以这样做),可以进一步加快代码速度。确保使用索引进行操作,并将其限制在列表长度的先前边界,并在删除元素时递减索引。

示例:

i = 0
maxiter = len(npol)
while i < maxiter:
    if check_breeding(npol[i], max_lay_egg, day_lay_egg):
        npol.extend(get_breeding(npol[i], egg_no))
    
    if npol[i]['dborn'] > age_limit:
            npol.pop(i)
            i -= 1
            maxiter -= 1

“这可以显著减少处理时间,因为您不需要在每次迭代中创建新列表并附加所有元素。”
“最后,您可以检查一些人口增长方程和统计方法,甚至可以将整个代码简化为迭代的计算问题,尽管这不再是模拟。”
“编辑”
“我已经完全实现了我的改进建议,并在Jupyter笔记本中使用'%%time'进行了计时。我从两个版本中分离出了函数定义,以便它们不会贡献时间,结果很明显。我还使雌性100%生产另一个雌性,以消除随机性,否则速度会更快。我比较了两者的结果,以验证它们产生相同的结果(它们确实如此,但我删除了“d_born”参数,因为它除了设置外没有在代码中使用)。”
“您的实现,使用'nday_limit=100'和'day_lay_egg=15':” “Wall time 23.5s”
“我的实现与相同的参数:” “Wall time 18.9s”
“因此,您可以看出差异非常显着,对于更大的'nday_limit'值,差距会进一步扩大。”
编辑后代码的完整实现:
from dataclasses import dataclass
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns


@dataclass
class Organism:
    sex: int
    times_laid_eggs: int = 0
    age: int = 0

    def __init__(self, sex):
        self.sex = sex


def check_breeding(d, max_lay_egg, day_lay_egg):
    return d.times_laid_eggs <= max_lay_egg and d.age > day_lay_egg and d.sex == 1


def get_breeding(egg_no): # Make sure to change probabilities back to 0.5 and 0.5 before using it
    nums = np.random.choice([0, 1], size=egg_no, p=[0.0, 1.0]).tolist()
    npol = [Organism(x) for x in nums]
    return npol


def simulate(organisms, age_limit, egg_no, day_lay_egg, max_lay_egg, nday_limit):
    npol = organisms
    nday = 0
    total_population_per_day = []

    while nday < nday_limit:
        i = 0
        maxiter = len(npol)
        while i < maxiter:
            npol[i].age += 1
            
            if check_breeding(npol[i], max_lay_egg, day_lay_egg):
                npol.extend(get_breeding(egg_no))
                npol[i].times_laid_eggs += 1

            if npol[i].age > age_limit:
                npol.pop(i)
                maxiter -= 1
                continue

            i += 1

        total_population_per_day.append(dict(nsize=len(npol), day=nday))
        nday += 1

    return total_population_per_day


if __name__ == "__main__":
    numsex = [1, 1, 0]  # 0: Male, 1: Female

    ipol = [Organism(x) for x in numsex]  # The initial population
    age_limit = 45  # Age limit for the species
    egg_no = 3  # Number of eggs
    day_lay_egg = 15  # Matured age for egg laying
    nday_limit = 100
    max_lay_egg = 2

    dpopulation = simulate(ipol, age_limit, egg_no, day_lay_egg, max_lay_egg, nday_limit)

    df = pd.DataFrame(dpopulation)
    sns.lineplot(x="day", y="nsize", data=df)
    plt.xticks(rotation=15)
    plt.title('Day vs population')
    plt.show()

你能定义“显著减少”吗? - mpx
1
@rpb,您现在可以看到我所发布的执行时间差异以及完整重新实现。 - Jack Avante
最佳实践是深度复制输入到模拟中的npol/organisms列表。我正在使用相同的可变列表参数计时您的实现。 - JadeSpy

0

尽可能使用numpy数组操作,而不是使用循环,可以提高性能。请参见以下在笔记本中测试的代码- https://www.kaggle.com/gfteafun/notebook03118c731b

请注意,在比较时间时,nsize规模很重要。

%%time​
​
# s: sex, d: day, lay_egg: Number of time the female lay an egg, dborn: The organism age
x = np.array([(x, 0, 0, 0) for x in numsex ] )
iparam = np.array([0, 1, 0, 1])
​
total_population_per_day = []
for nday in range(nday_limit):
    x = x + iparam
    c = np.all(x < np.array([2, nday_limit, max_lay_egg, age_limit]), axis=1) & np.all(x >= np.array([1, day_lay_egg, 0, day_lay_egg]), axis=1)
    total_population_per_day.append(dict(nsize=len(x[x[:,3]<age_limit, :]), day=nday))
    n = x[c, 2].shape[0]
​
    if n > 0:
        x[c, 2] = x[c, 2] + 1
        newborns = np.array([(x, nday, 0, 0) for x in np.random.choice([0, 1], size=egg_no, p=[.5, .5]) for i in range(n)])
        x = np.vstack((x, newborns))
​
​
df = pd.DataFrame(total_population_per_day)
sns.lineplot(x="day", y="nsize", data=df)
plt.xticks(rotation=15)
plt.title('Day vs population')
plt.show()

enter image description here


使用numpy的建议很棒,其操作非常快速高效。如果结合使用矩阵的方法,它可能会变得极快。尽管我担心随着时间的推移,内存使用会变得更加爆炸。请注意,您提供的Kaggle笔记本链接无法使用。 - Jack Avante
笔记本也运行了您的实现,大约2900秒后,您的实现耗尽了内存。 - gftea
现在我将nday_limit更改为200,笔记本现在应该可以访问了。 - gftea

0

尝试将您的代码结构化为类似于state[age][eggs_remaining] = count的矩阵。它将具有age_limit行和max_lay_egg列。

雄性从0 eggs_remaining列开始,每当雌性下蛋时,它们就会向下移动一个位置(3->2->1->0,如上所述)。

对于每个周期,您只需删除最后一行,迭代所有age_limit之后的行,并插入一个新的第一行,其中包含雄性和雌性数量。

如果(如您的示例中所示),女性在产完所有蛋之前死亡的可能性非常小,那么您可以将所有内容折叠成state_alive[age][gender] = countstate_eggs[eggs_remaining] = count,但除非年龄非常高或您想运行数千次模拟,否则不应该是必要的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接