Guzzle池:等待请求

11

是否可能让Guzzle池等待请求?

目前,我可以动态地向池中添加请求,但是一旦池为空,guzzle就会停止(显然)。

当我同时处理大约10个页面时,这是一个问题,因为我的请求数组将在处理完生成的HTML页面并添加新链接之前为空。

这是我的生成器:

$generator = function () {
  while ($request = array_shift($this->requests)) {
    if (isset($request['page'])) {
      $key = 'page_' . $request['page'];
    } else {
      $key = 'listing_' . $request['listing'];
    }

    yield $key => new Request('GET', $request['url']);                                          
  }
  echo "Exiting...\n";
  flush();
};

还有我的游泳池:

$pool = new Pool($this->client, $generator(), [
  'concurrency' => function() {
    return max(1, min(count($this->requests), 2));
  },
  'fulfilled' => function ($response, $index) {
      // new requests may be added to the $this->requests array here
  }
  //...
]);

$promise = $pool->promise();
$promise->wait();

@Alexey Shockov回答后编辑的代码:

$generator = function() use ($headers) {
  while ($request = array_shift($this->requests)) {
    echo 'Requesting ' . $request['id'] . ': ' . $request['url'] . "\r\n";

    $r = new Request('GET', $request['url'], $headers);

    yield 'id_' . $request['id'] => $this->client->sendAsync($r)->then(function($response, $index) {
      echo 'In promise fulfillment ' . $index . "\r\n";
    }, function($reason, $index) {
      echo 'in rejected: ' . $index . "\r\n";
    });
  }
};

$promise = \GuzzleHttp\Promise\each_limit($generator(), 10, function() {
  echo 'fullfilled' . "\r\n";
  flush();
}, function($err) {
  echo 'rejected' . "\r\n";
  echo $err->getMessage();
  flush();
});
$promise->wait();
5个回答

7

很遗憾,你不能使用生成器实现这个功能,只能使用自定义迭代器。

我准备了一个完整示例的代码片段,但主要思路就是创建一个迭代器,在两种状态下改变其状态(在结束后可以重新变为有效状态)。

psysh中使用ArrayIterator的示例:

>>> $a = new ArrayIterator([1, 2])
=> ArrayIterator {#186
     +0: 1,
     +1: 2,
   }
>>> $a->current()
=> 1
>>> $a->next()
=> null
>>> $a->current()
=> 2
>>> $a->next()
=> null
>>> $a->valid()
=> false
>>> $a[] = 2
=> 2
>>> $a->valid()
=> true
>>> $a->current()
=> 2

有了这个想法,我们可以将这样的动态迭代器传递给Guzzle,然后让它自己完成工作:

// MapIterator mainly needed for readability.
$generator = new MapIterator(
    // Initial data. This object will be always passed as the second parameter to the callback below
    new \ArrayIterator(['http://google.com']),
    function ($request, $array) use ($httpClient, $next) {
        return $httpClient->requestAsync('GET', $request)
            ->then(function (Response $response) use ($request, $array, $next) {
                // The status code for example.
                echo $request . ': ' . $response->getStatusCode() . PHP_EOL;
                // New requests.
                $array->append($next->shift());
                $array->append($next->shift());
            });
    }
);
// The "magic".
$generator = new ExpectingIterator($generator);
// And the concurrent runner.
$promise = \GuzzleHttp\Promise\each_limit($generator, 5);
$promise->wait();

如我之前所说,完整示例在此处的代码片段中,包括 MapIteratorExpectingIterator


好的,这个可行,谢谢你还提供了 Psysh 的链接(我之前不知道)。请确保不要删除你的 gist,因为它可能对未来的其他人有价值! - user429620
那就是目标 ;) 我会尝试添加更多的注释,因为从第一次查看中并不清楚为什么需要“ExpectingIterator”。 - Alexey Shokov
@ncla,不是这样的。Pool类内部使用与each_limit()相同的机制。我创建了一个单独的包(基于上面的代码)来尽可能简化事情,请看一下:https://github.com/alexeyshockov/guzzle-dynamic-pool/blob/master/example/app1.php - Alexey Shokov

1

正如我之前所说,完整的示例在gist中,其中包括MapIterator和ExpectingIterator

在php < 7上,迭代器不会再次变为有效状态,在您的示例中,arrayIterator和MapIterator示例都在初始池用尽后停止...

另一方面,如果您在迭代器上使用-append方法而不是[] push,它在早期版本的php上也可以工作。


谢谢,你说的 ->append() 是正确的。我已经修改了原来的答案,现在它可以与 PHP 5.x 兼容。 - Alexey Shokov

1
从问题来看,您似乎能够直接将聚合回调移动到查询中。在这种情况下,池将始终等待您的处理代码,因此您可以在任何时候添加新请求。
生成器可以返回请求或承诺,承诺可以以不同的方式组合在一起。
$generator = function () {
    while ($request = array_shift($this->requests)) {
        if (isset($request['page'])) {
            $key = 'page_' . $request['page'];
        } else {
            $key = 'listing_' . $request['listing'];
        }

        yield $this->client->sendAsync('GET', $request['url'])
            ->then(function (Response $response) use ($key) {
            /*
             * The fullfillment callback is now connected to the query, so the 
             * pool will wait for it.
             * 
             * $key is also available, because it's just a closure, so 
             * no $index needed as an argument.
             */
        });
    }
    echo "Exiting...\n";
    flush();
};

$promise = \GuzzleHttp\Promise\each_limit($generator(), [
    'concurrency' => function () {
        return max(1, min(count($this->requests), 2));
    },
    //...
]);

$promise->wait();

也尝试过创建一个新的请求,然后使用您提出的sendAsync方法(因为该方法需要一个请求,而不是像您的示例那样),但不幸的是它也不起作用。 - user429620
请查看我的编辑代码,必须对each_limit函数的参数进行一些编辑。但现在我遇到了错误:Scraper :: {closure}()函数的参数太少,传递了1个参数,在/vendor/guzzlehttp/promises/src/Promise.php的第203行,但需要2个参数。请求 - 有任何想法吗? - user429620
好的,我的错误。如果我只向我的fulfilled和rejected回调函数中添加1个参数,它就可以工作了。然而,我非常重要的是知道请求$index...有没有可能以某种方式获取该信息?这在Pool对象中是可能的。没有它,each_limit对我来说毫无用处。 - user429620
你定义了闭包,因此可以使用上下文中的所有变量。只需执行 ->then(function (...) use ($key) { ... } 并在闭包内部使用该键即可。无需将其作为参数获取。 - Alexey Shokov
同意,你的情况比我预期的要复杂一些。请看我的新答案以获得正确的解决方案。 - Alexey Shokov
显示剩余3条评论

0
答案是肯定的,你可以这样做。你只需要更多的生成器,并将你的请求解析和排队逻辑分离成异步设计。不要使用数组来存储你的池即将发出的请求并等待它们,而是应该使用一个生成器,从你的初始列表中产生新的请求,并从解析响应中添加请求,直到所有请求都被发送、解析和发送结果请求(循环)或遇到停止条件为止。

我原以为这正是我所拥有的。无论我向 $this->requests 数组中添加什么,都会自动被 $generator 使用。 - user429620
当你的数组为空时会发生什么?作为一种思考练习,当数组中只有一个请求时,请跟踪你的代码。当请求从数组中移除时,你的生成器是否有任何理由坐在那里等待任何东西被添加回去? - Jeremy Giberson
但是 Guzzle Pool 类只接受一个 $generator,所以当你说池本身需要成为生成器时,我不确定你在说什么。 - user429620

0
如果您可以使用postAsync/getAsync或类似的方法,您可以使用以下框架:
function postInBulk($inputs)
{
    $client = new Client([
        'base_uri' => 'https://a.b.com'
    ]);
    $headers = [
        'Authorization' => 'Bearer token_from_directus_user'
    ];

    $requests = function ($a) use ($client, $headers) {
        for ($i = 0; $i < count($a); $i++) {
            yield function() use ($client, $headers) {
                return $client->postAsync('https://a.com/project/items/collection', [
                    'headers' => $headers,
                    'json' => [
                        "snippet" => "snippet",
                        "rank" => "1",
                        "status" => "published"
                    ]        
                ]);
            };
        }
        
    };

    $pool = new Pool($client, $requests($inputs),[
        'concurrency' => 5,
        'fulfilled' => function (Response $response, $index) {
            // this is delivered each successful response
        },
        'rejected' => function (RequestException $reason, $index) {
            // this is delivered each failed request
        },
    ]);

    $pool->promise()->wait();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接