使用PHP和MySQL实现简单队列?

18

我有一个PHP脚本,从数据库检索行,然后根据内容执行工作。 这项工作可能很耗时(但不一定是计算高昂的),因此我需要允许多个脚本并行运行。

数据库中的行大致如下:

+---------------------+---------------+------+-----+---------------------+----------------+
| Field               | Type          | Null | Key | Default             | Extra          |
+---------------------+---------------+------+-----+---------------------+----------------+
| id                  | bigint(11)    | NO   | PRI | NULL                | auto_increment |
.....
| date_update_started | datetime      | NO   |     | 0000-00-00 00:00:00 |                |
| date_last_updated   | datetime      | NO   |     | 0000-00-00 00:00:00 |                |
+---------------------+---------------+------+-----+---------------------+----------------+

我的脚本目前选择最旧的日期在date_last_updated中的行(一旦完成工作就会更新),并且不使用date_update_started

如果我现在同时运行多个脚本实例,它们将选择相同的行(至少有些时间)并且会重复执行工作。

我想做的是使用事务来选择行,更新date_update_started列,然后向选择行的SQL语句添加一个WHERE条件,只选择具有date_update_started大于某个值的行(以确保另一个脚本没有在处理它)。例如:

$sth = $dbh->prepare('
    START TRANSACTION;
    SELECT * FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;
    UPDATE table DAY SET date_update_started = UTC_TIMESTAMP() WHERE id IN (SELECT id FROM table WHERE date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;);
    COMMIT;
');
$sth->execute(); // in real code some values will be bound
$rows = $sth->fetchAll(PDO::FETCH_ASSOC);

根据我所了解的,这本质上是队列实现方式,在MySQL中似乎不被看好。尽管如此,我仍然需要找到一种允许多个脚本并行运行的方法,而在我所做的研究中,这就是我想出来的方案。

这种方法会有效吗?是否有更好的方法?


你如何运行并行脚本? - Lupin
@Lupin 目前脚本每15分钟通过cron job执行一次。脚本会检查是否有另一个实例正在运行,如果是,则终止。我还不确定如何管理多个正在运行的脚本 - 我可能会在数据库中使用计数器来查看有多少个正在运行并限制实例的数量,但一次只解决一个问题 :-) - Nate
好的,有一些额外的问题需要我完全理解:
  1. 您有一个脚本,选择行并对其进行操作,然后将其更新回数据库,是吗?
  2. 您希望能够运行并执行相同操作但在不同行上的并行脚本,对吗?
  3. 每次运行脚本时,所选行是否连续,即它们是1-100、101-200等,还是根据id随机选择,并且只选择那些date_update_started大于1的行?
- Lupin
@Lupin 1. 是的,2. 是的,3. 行是基于日期字段和另一个未在示例中显示的字段进行选择的。因此它们不是严格意义上的“连续的”,但它们按照两个字段排序。 - Nate
另一种方法是使用某种主脚本获取一些行(例如 SELECT ... LIMIT 5),然后为每个行启动一个单独的处理脚本实例。您甚至可以使用第二个表来跟踪当前运行的处理实例数量,因此每当 cron 启动您的主脚本时,它都会知道要获取多少行。但由于这与您所要求的完全不同,因此我决定将其作为评论而不是答案添加。 - Patrick Echterbruch
6个回答

8
我认为你的方法可行,只要你还在所选行中添加某种标识符,表明它们当前正在被处理。正如@JuniusRendel建议的那样,可以使用另一个字符串键(随机或实例ID)来处理脚本导致错误并且未能完美完成的情况,因为在你的工作完成后,你需要清理这些字段。但是,我认为这种方法存在的问题在于,有可能会有2个脚本在同一时间运行,并在被标记为锁定之前选择相同的行。在我看来,这主要取决于你对这些行的操作类型,如果这两个脚本的最终结果相同,我认为你唯一的问题就是浪费时间和服务器内存(这不是小问题,但现在我会把它们放在一边...)。如果你的工作将在两个脚本上产生不同的更新,则可能会出现在TB中最终更新错误的问题。

@Jean提到了第二种方法,它涉及使用MySql锁。我不是这个主题的专家,但似乎是一个不错的方法,并且使用“Select .... FOR UPDATE”语句可以给你想要的结果,因为你可以在同一次调用中完成选择和更新,这比两个单独的查询更快,并且可以减少其他实例选择这些行的风险,因为它们将被锁定。

“SELECT .... FOR UPDATE”允许您运行选择语句并锁定那些特定的行以进行更新,因此您的语句可能如下所示:

START TRANSACTION;
   SELECT * FROM tb where field='value' LIMIT 1000 FOR UPDATE;
   UPDATE tb SET lock_field='1' WHERE field='value' LIMIT 1000;
COMMIT;

锁定功能很强大,但要小心,不要影响应用程序的其他部分。检查一下当前被锁定以进行更新的选定行是否在应用程序的其他地方(可能是面向最终用户)被请求,并且在这种情况下会发生什么。

此外,表必须为InnoDB,并建议您使用Mysql索引来检查where子句的字段,否则可能会锁定整个表或遇到“间隙锁定”。

还有可能锁定过程,特别是在运行并行脚本时,会对CPU和内存造成负担。

关于这个问题,这里有另一篇文章可以阅读:http://www.percona.com/blog/2006/08/06/select-lock-in-share-mode-and-for-update/

希望这能有所帮助,并想听听您的进展。


5

我们在生产环境中实现了类似于这样的功能。

为了避免重复,我们执行类似于下面这样的MySQL UPDATE语句(我修改了查询语句以类似于您的表):

UPDATE queue SET id = LAST_INSERT_ID(id), date_update_started = ... 
WHERE date_update_started IS NULL AND ...
LIMIT 1;

我们需要在单个事务中执行此UPDATE操作,并利用LAST_INSERT_ID函数。当以带有参数的方式使用时,它会将参数写入事务会话中,在本例中,它是更新过的(如果有)单个(LIMIT 1)队列的ID。
之后,我们执行以下操作:
SELECT LAST_INSERT_ID();

当没有参数时使用,它会检索先前存储的值,获取需要执行的队列项的ID。


你能详细说明一下你所说的“写锁”是什么吗?可以举个代码例子吗? - Nate
@Nate,已经编辑和扩展了一下 ;) 顺便提一下,我建议使用RabbitMQ。我们梦想着使用它 :D - Alessandro Lai

1
每次运行脚本时,我都希望脚本生成一个唯一标识符。
$sctiptInstance = uniqid();

我会在表中添加一个脚本实例列来保存该值作为varchar,并在其上放置一个索引。当脚本运行时,我会在事务内使用select for update来选择基于任何逻辑的行,不包括具有脚本实例的行,然后使用脚本实例更新这些行。类似这样:
START TRANSACTION;
SELECT * FROM table WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000 FOR UPDATE;
UPDATE table SET date_update_started = UTC_TIMESTAMP(), script_instance = '{$scriptInstance}' WHERE script_instance = '' AND date_update_started > 1 DAY ORDER BY date_last_updated LIMIT 1000;
COMMIT;

现在这些行将从脚本的其他实例中排除。执行任务后,请更新行以将脚本实例设置为null或空白,并更新您的上次更新列。
您还可以使用脚本实例写入另一个名为“当前实例”之类的表,并让脚本检查该表以获取正在运行的脚本计数,以控制并发脚本的数量。我还会将脚本的PID添加到表中。然后,您可以使用该信息创建一个定期从cron运行的清理脚本,以检查长时间运行或流氓进程并杀死它们等。

1

过去我曾经出于类似的原因使用过存储过程。我们使用了FOR UPDATE读锁来在选中标志被更新以从任何未来的查询中删除该条目时锁定表格。它看起来像这样:

CREATE PROCEDURE `select_and_lock`()
 BEGIN
  START TRANSACTION;
  SELECT your_fields FROM a_table WHERE some_stuff=something 
   AND selected = 0 FOR UPDATE;
  UPDATE a_table SET selected = 1;
  COMMIT;
 END$$

不过想一想,这个操作并不一定要在存储过程中完成。


1

编辑:对不起,我完全误解了你的问题。

你只需要在表格上添加一个“锁定”列,在你的脚本正在处理的条目上将值设置为true,完成后将其设置为false。

在我的情况下,我还添加了3个其他时间戳(整数)列:target_ts,start_ts,done_ts。

UPDATE table SET locked = TRUE WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(done_ts) AND ISNULL(start_ts);

然后

SELECT * FROM table WHERE target_ts<=UNIX_TIMESTAMP() AND ISNULL(start_ts) AND locked=TRUE;

请逐个更新每个条目(以避免数据不一致),将完成时间(done_ts)属性设置为当前时间戳(您也可以现在解锁它们)。您可以将目标时间戳(target_ts)更新为您希望的下一个更新时间,或者您可以忽略此列并只使用完成时间戳(done_ts)进行选择。


我认为PHP实际上不支持多线程,但无论如何,运行多个脚本实例并不是问题所在。问题主要是如何处理从数据库检索行的情况。 - Nate
我更新了,抱歉可能当时有点喝醉了 :)。至于线程,我不知道,这是PECL扩展声称的,但我没有测试过,所以... - n00dl3

1
我在生产环境中有一个与此完全相同的系统。我们每分钟运行一次脚本进行一些处理,有时运行时间可能超过一分钟。
我们有一个状态表列,其中0表示尚未运行,1表示已完成,其他值表示正在进行中。
脚本的第一件事是更新表格,将一个或多个行设置为一个值,表示我们正在处理该行。我们使用getmypid()更新要处理但仍未处理的行。
当我们完成处理后,脚本会更新具有相同进程ID的行,并将它们标记为已完成(状态1)。
这样我们就避免了每个脚本尝试处理已经在处理中的行的情况,而且效果非常好。这并不意味着没有更好的方法,但这种方法确实能够完成工作。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接