Using the Pool class in the pthreads extension for PHP7

3
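I took the most basic demo of the pthreads PHP7 extension that uses the Pool class (this demo: https://github.com/krakjoe/pthreads#polyfill) and extended it a little so that I can get results back from the threads (or at least I think I can):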
$pool = new Pool(4);

foreach (range(1, 8) as $i) {
    $pool->submit(new class($i) extends Threaded
    {
        public $i;
        private $garbage = false;

        public function __construct($i)
        {
            $this->i = $i;
        }

        public function run()
        {
            echo "Hello World\n";
            $this->result = $this->i * 2;
            $this->garbage = true;
        }

        public function isGarbage() : bool
        {
            return $this->garbage;
        }
    });
}

while ($pool->collect(function(Collectable $task) {
    if ($task->isGarbage()) {
        echo $task->i . ' ' . $task->result . "\n";
    }
    return $task->isGarbage();
})) continue;

$pool->shutdown();

What confuses me is that it sometimes fails to return results for all of the tasks:
Hello World
Hello World
Hello World
Hello World
Hello World
1 2
2 4
Hello World
Hello World
3 6
Hello World
7 14
4 8
8 16

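Two lines are missing here, "5 10" and "6 12", and I don't understand why. It only happens occasionally (maybe 1 run in 10).
It looks like the original demo targets an older version of pthreads, because it uses a Collectable interface which, if I'm not mistaken, Threaded now implements automatically.
The readme also says:
"The Pool::collect mechanism was moved from Pool to Worker for a more robust Worker and simpler Pool inheritance."
So I guess I'm doing something wrong.
Edit: I took the example from "How does Pool::collect works?" and updated it to work with the latest pthreads and current PHP7, but the result is the same. It looks like it fails to collect the results of the last few threads that get executed.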
$pool = new Pool(4);

while (@$i++<10) {
    $pool->submit(new class($i) extends Thread implements Collectable {
        public $id;
        private $garbage;

        public function __construct($id) {
            $this->id = $id;
        }

        public function run() {
            sleep(1);
            printf(
                "Hello World from %d\n", $this->id);
            $this->setGarbage();
        }

        public function setGarbage() {
            $this->garbage = true;
        }

        public function isGarbage(): bool {
            return $this->garbage;
        }

    });
}

while ($pool->collect(function(Collectable $work){
    printf(
        "Collecting %d\n", $work->id);
    return $work->isGarbage();
})) continue;

$pool->shutdown();

This outputs the following, and clearly not all of the threads were collected:
Hello World from 1
Collecting 1
Hello World from 2
Collecting 2
Hello World from 3
Collecting 3
Hello World from 4
Collecting 4
Hello World from 5
Collecting 5
Hello World from 6
Hello World from 7
Collecting 6
Collecting 7
Hello World from 8
Hello World from 9
Hello World from 10
2 Answers

1
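As you correctly noted, the code you copied targets pthreads v2 (for PHP 5.x).
The problem boils down to the garbage collector in pthreads not being deterministic. Its behavior cannot be predicted, so it cannot be relied on to fetch data from tasks that the pool has executed.
One way to get at that data is to pass a Threaded object into each task submitted to the pool: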
<?php

$pool = new Pool(4);
$data = [];

foreach (range(1, 8) as $i) {
    $dataN = new Threaded();
    $dataN->i = $i;

    $data[] = $dataN;

    $pool->submit(new class($dataN) extends Threaded {
        public $data;

        public function __construct($data)
        {
            $this->data = $data;
        }

        public function run()
        {
            echo "Hello World\n";
            $this->data->i *= 2;
        }
    });
}

while ($pool->collect());

$pool->shutdown();

foreach ($data as $dataN) {
    var_dump($dataN->i);
}

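A few things to note about the code above:
  • Collectable (now an interface in pthreads v3) is already implemented by the Threaded class, so there is no need to implement it yourself.
  • Once a task has been submitted to the pool it is already considered garbage, so you do not need to handle that part yourself. You can still override the default garbage collector, but in the vast majority of cases (including yours) that is not needed.
  • I still call the collect method (in a loop that blocks the main thread) so that tasks can be garbage collected, using pthreads' default collector, while the pool is executing them, which frees memory.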

0

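I ran into a similar problem, where collect would return right away. It turned out that collect only waits until all of the work is in process, not until all of the work has completed. In some cases it does not even get to process a task, so the "Collecting" line never appears.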

So if my pool size is 4 and I submit only 3 tasks, the collector never runs and execution continues immediately. For example:

define ("CRLF", "\r\n");

class AsyncWork extends Thread {
  private $done = false;
  private $id;

  public function __construct($id) {
    $this->id = $id;
  }

  public function id() {
    return $this->id;
  }

  public function isCompleted() {
    return $this->done;
  }

  public function run() {
    echo '[AsyncWork] ' . $this->id . CRLF;
    sleep(rand(1,5));
    echo '[AsyncWork] sleep done ' . $this->id . CRLF;
    $this->done = true;
  }
}

$pool = new Pool(4);

for($i=1;$i<=3;$i++) {
  $pool->submit(new AsyncWork($i));
}

while ($pool->collect(function(AsyncWork $work){
    echo 'Collecting ['.$work->id().']: ' . ($work->isCompleted()?1:0) . CRLF;
    return $work->isGarbage();
})) continue;

echo 'ALL DONE' . CRLF;

$pool->shutdown();

This outputs:

[AsyncWork] 1
[AsyncWork] 2
ALL DONE
[AsyncWork] 3
[AsyncWork] sleep done 2
[AsyncWork] sleep done 3
[AsyncWork] sleep done 1

If I change the code above to submit more work, it keeps collecting until all of the work is in process. For example:
for($i=1;$i<=10;$i++) {
  $pool->submit(new AsyncWork($i));
}

//results:

[AsyncWork] 1
[AsyncWork] 2
[AsyncWork] 3
[AsyncWork] 4
[AsyncWork] sleep done 4
[AsyncWork] 8
Collecting [4]: 1
[AsyncWork] sleep done 1
Collecting [1]: 1
[AsyncWork] 5
[AsyncWork] sleep done 3
Collecting [3]: 1
[AsyncWork] 7
[AsyncWork] sleep done 2
Collecting [2]: 1
[AsyncWork] 6
[AsyncWork] sleep done 6
Collecting [6]: 1
[AsyncWork] 10
[AsyncWork] sleep done 7
Collecting [7]: 1
[AsyncWork] sleep done 8
Collecting [8]: 1
[AsyncWork] sleep done 5
Collecting [5]: 1
ALL DONE
[AsyncWork] 9
[AsyncWork] sleep done 9
[AsyncWork] sleep done 10

As you can see, it never collects the last tasks, and it returns before the work has finished.

The only way I was able to solve this was to track completion myself by keeping a list of the submitted tasks.

$pool = new Pool(4);

$worklist = [];
for($i=1;$i<=10;$i++) {
  $work = new AsyncWork($i);
  $worklist[] = $work;
  $pool->submit($work);
}

do {
  $alldone = true;
  foreach($worklist as $i=>$work) {
    if (!$work->isCompleted()) {
      $alldone = false;
    } else {
      echo 'Completed: '. $work->id(). CRLF;
      unset($worklist[$i]);
    }
  }

  if ($alldone) {
    break;
  }
} while(true);

while ($pool->collect(function(AsyncWork $work){
    echo 'Collecting ['.$work->id().']: ' . ($work->isCompleted()?1:0) . CRLF;
    return $work->isGarbage();
})) continue;

echo 'ALL DONE' . CRLF;

$pool->shutdown();

This is the only way I could make sure that ALL DONE is printed only once all of the work has finished.

[AsyncWork] 1
[AsyncWork] 2
[AsyncWork] 3
[AsyncWork] 4
[AsyncWork] sleep done 1
[AsyncWork] 5
Completed: 1
[AsyncWork] sleep done 2
Completed: 2
[AsyncWork] 6
[AsyncWork] sleep done 4
[AsyncWork] 8
Completed: 4
[AsyncWork] sleep done 6
[AsyncWork] sleep done 3
[AsyncWork] 7
Completed: 6
Completed: 3
[AsyncWork] sleep done 5
Completed: 5
[AsyncWork] 10
[AsyncWork] 9
[AsyncWork] sleep done 9
Completed: 9
[AsyncWork] sleep done 8
Completed: 8
[AsyncWork] sleep done 7
Completed: 7
[AsyncWork] sleep done 10
Completed: 10
Collecting [1]: 1
Collecting [5]: 1
Collecting [9]: 1
Collecting [2]: 1
Collecting [6]: 1
Collecting [10]: 1
Collecting [3]: 1
Collecting [7]: 1
Collecting [4]: 1
Collecting [8]: 1
ALL DONE
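
One thing worth noting about the polling loop above is that it spins without pausing, so the main thread keeps a CPU core busy while it waits. A minimal sketch of the same idea with a short sleep between passes might look like the following. The helper name waitForAll, its parameters and the 10 ms default are assumptions made for illustration and are not part of pthreads; the stand-in tasks are plain strings so that the sketch runs without the extension.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not a pthreads API): polls $tasks until $isDone
 * reports true for every one of them, sleeping between passes instead of
 * spinning. Returns the array keys in the order the tasks were seen to finish.
 */
function waitForAll(array $tasks, callable $isDone, int $sleepMicros = 10000): array
{
    $completed = [];
    while ($tasks) {
        // foreach iterates over a copy, so unsetting entries here is safe.
        foreach ($tasks as $key => $task) {
            if ($isDone($task)) {
                $completed[] = $key;
                unset($tasks[$key]);
            }
        }
        if ($tasks) {
            usleep($sleepMicros); // give up the CPU before the next pass
        }
    }
    return $completed;
}

// Stand-in tasks: each name needs this many extra polls before it reports done.
$remaining = ['a' => 2, 'b' => 0, 'c' => 1];
$order = waitForAll(
    array_keys($remaining),
    function (string $name) use (&$remaining): bool {
        return $remaining[$name]-- <= 0;
    },
    1000
);
print_r($order);
```

With the AsyncWork class from this answer, the call would presumably be waitForAll($worklist, function (AsyncWork $work) { return $work->isCompleted(); }), followed by the same collect loop and shutdown as before.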
