使用locust.io提供用户列表的方式

34

我需要对一个系统进行压力测试,http://locust.io看起来是这样做的最佳方式。然而,它似乎设置为每次使用相同的用户。我需要每个生成的用户登录不同的用户。我该如何设置?或者,是否有其他可以使用的系统?

4个回答

63

Locust的作者在这里。

默认情况下,每个HttpLocust用户实例都有一个HTTP客户端,它有它自己独立的会话

Locust没有提供提供用户凭据列表或类似功能的特性。但是,您的负载测试脚本只是Python代码,幸运的是,自己实现这个功能非常简单。

这里是一个简短的示例:

# locustfile.py

from locust import HttpLocust, TaskSet, task

USER_CREDENTIALS = [
    ("user1", "password"),
    ("user2", "password"),
    ("user3", "password"),
]

class UserBehaviour(TaskSet):
    def on_start(self):
        if len(USER_CREDENTIALS) > 0:
            user, passw = USER_CREDENTIALS.pop()
            self.client.post("/login", {"username":user, "password":passw})

    @task
    def some_task(self):
        # user should be logged in here (unless the USER_CREDENTIALS ran out)
        self.client.get("/protected/resource")

class User(HttpLocust):
    task_set = UserBehaviour
    min_wait = 5000
    max_wait = 60000

以上代码在运行Locust分布式时无法工作,因为相同的代码在每个从节点上运行,它们不共享任何状态。因此,您需要引入一些外部数据存储库,供从节点使用以共享状态(例如PostgreSQL、redis、memcached或其他)。


1
如果列表为空,会发生什么行为?也就是说,如果我们指定要生成100个用户,但我们的用户列表只有50个名称,那么会生成100个用户吗? - bearrito
扩展我的上一个问题。如果我们生成50个用户,但名单中有100个名称,那么蝗虫是否可能被杀死。似乎self.interrupt()可以实现这一点,或者我可以抛出异常。 - bearrito
在上面的例子中,如果您生成的用户数超过了您拥有的用户凭据,则会收到错误。Locust实例(用户)在测试运行期间不应该死亡。如果您想模拟一个用户离开,另一个用户进来的情况,最好重新使用正在运行的Locust实例。 - heyman
1
我认为最好的方法是在需要登录的页面中,在头部发送用户信息。 - Mesut GUNES
@raitisd 请检查我下面的解决方案。您可以持续使用它。 - Mesut GUNES
显示剩余3条评论

14

或者,您可以创建users.py模块来保存测试用例中所需的用户信息,在我的示例中,它保存了电子邮件cookies。然后您可以在任务中随机调用它们。请参见以下内容:

# locustfile.py
from locust import HttpLocust, TaskSet, task
from user_agent import *
from users import users_info


class UserBehaviour(TaskSet):
    def get_user(self):
        user = random.choice(users_info)
        return user

    @task(10)
    def get_siparislerim(self):
        user = self.get_user()
        user_agent = self.get_user_agent()
        r = self.client.get("/orders", headers = {"Cookie": user[1], 'User-Agent': user_agent})

class User(HttpLocust):
    task_set = UserBehaviour
    min_wait = 5000
    max_wait = 60000

用户和用户代理可以通过函数调用。这种方式,我们可以使用许多用户和不同的用户代理来分发测试。

# users.py

users_info = [
['performancetest.1441926507@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926506@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926501@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926499@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926494@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926493@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926492@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926491@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926490@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926489@gmail.com', 'cookies_created_by_each_user'], 
['performancetest.1441926487@gmail.com', 'cookies_created_by_each_user']] 

1
好的建议。不知道如果需要按顺序使用数据,您会提出什么建议。像其他建议中那样从数组中弹出? - David
@David,如果你需要的话,最好弹出来,因为你已经完成了它,然后再重新插入。 - Mesut GUNES

5
借鉴@heyman的答案。示例代码可以工作,但是继续启动/停止测试最终会清除USER_CREDENTIALS列表,并开始抛出错误。
我最终添加了以下内容:

2023更新

从v1.3.0开始,事件API已更改。
USER_CREDENTIALS = generate_users()


@events.spawning_complete.add_listener
def spawn_complete_handler(**kw):
    global USER_CREDENTIALS
    USER_CREDENTIALS = generate_users()

原始答案

from locust import events # in addition to the other locust modules needed

def hatch_complete_handler(**kw):
    global USER_CREDENTIALS
    USER_CREDENTIALS = generate_users() # some function here to regenerate your list

events.hatch_complete += hatch_complete_handler

当你的群集孵化完成后,这将刷新您的用户列表。

还要记住,您需要一个比您希望生成的用户数量更长的列表。


1
events.hatch_complete在Locust 1.3.0中已被弃用。请使用events.spawning_complete。USER_LIST = generate_users() @events.spawning_complete.add_listener def spawn_complete_handler(**kw): global USER_LIST USER_LIST = generate_users() - Gh0sT

4

当我在实现这个分布式系统时,我采用了稍微不同的方法。在TaskSet的on_start部分中,我使用了一个非常简单的Flask服务器,通过get调用它。

from flask import Flask, jsonify
app = Flask(__name__)

count = 0    #Shared Variable

@app.route("/")
def counter():
    global count

    count = count+1
    tenant = count // 5 + 1
    user = count % 5 + 1

    return jsonify({'count':count,'tenant':"load_tenant_{}".format(str(tenant)),'admin':"admin",'user':"load_user_{}".format(str(user))})

if __name__ == "__main__":
    app.run()

这样一来,我现在有了一个端点,无论在哪个主机上运行,我都可以访问http://localhost:5000/。我只需要使这个端点对从系统可访问,就不必担心重复用户或由于具有有限的user_info集而导致的某种轮询效应。


除了分布式模式之外,如果能够内置到 Locust 中,避免必须运行单独的服务器,那就更好了。虽然 locust 已经有了内置的 Web 服务,但我不想必须调用本地 REST 来获取数据。希望能够获取全局孵化计数器以进行参考。 - David
此外,对于共享数据集和分布式测试,如果数据集足够大,可以通过使用起始偏移量来避免单个数据集的重复,并且不需要使用您在此处提供的解决方案。每个从节点可以使用不同的(行/键)偏移量从数据集中提取数据,确保它们不会同时开始。如果数据集不够大,则至少它们会以不同的重叠方式轮流执行。 - David
我可能错了,但我认为这种方法只适用于开发/调试模式下的Flask服务器(即单线程),因为如果不是这样,使用全局变量就不是线程安全的,可能会导致问题。 - cjauvin
@David 孵化批次号的想法正在讨论中,该问题在 Expose an on_hatch event? And/or hatch number (e.g. we just hatched user number X) as well #634 中提出。我也很感兴趣,但直到现在没有人在开发它。 - Yushin Washio
@YushinWashio,那其实是我提出使用孵化器计数器的on_hatch事件的建议;-) 如果我有时间深入研究Locust源代码,我会自己查看实现。 - David

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接