如何在Rust中使用`sqlx`和`juniper`订阅?

8

背景:

我在将sqlxjuniper的订阅功能集成时遇到了困难。

sqlx::query::QueryAs::fetch()获取了一个Pin<Box<dyn Stream<Item = Result<User, sqlx::Error>> + 'e + Send>>

juniper需要将订阅返回为Pin<Box<dyn Stream<Item = Result<User, juniper::FieldError>> + Send>>

注意从Result<User, sqlx::Error>Result<User, juniper::FieldError>的更改。使用futures::TryStreamExt中的map_err(),我创建了以下代码来执行查询并转换错误类型。

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        let sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&context.pool)
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}

这个编译时出现以下错误:

error[E0759]: `executor` has lifetime `'ref_e` but it needs to satisfy a `'static` lifetime requirement
  --> server/src/graphql/subscription.rs:27:1
   |
27 |   #[juniper::graphql_subscription(Context = Context)]
   |   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |   |
   |   this data with lifetime `'ref_e`...
   |   ...is captured here...
...
63 | /         sqlx::query_as!(User, "SELECT * FROM users")
64 | |             .fetch(&context.pool)
65 | |             .map_err(|e| {
66 | |                 FieldError::new(
...  |
69 | |             })
70 | |             .boxed()
   | |____________________- ...and is required to live as long as `'static` here
   |
   = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

error: aborting due to previous error

我对Stream和生命周期不够熟悉,无法理解这个错误的影响。在查看了更多信息后,似乎ref_e是订阅对juniperExecutor引用的生命周期。

尝试:

版本:

  • sqlx-0.4.1
  • juniper固定于master上的提交cd66bdb

我刚刚为一个个人项目实现了一个玩具GraphQL服务器,决定不使用sqlx,因为我无法集成它,所以我期待这个答案。我认为也有益于发布您的上下文是什么样子的,因为在定义我的时候遇到了一些“借用检查”问题。 - fvall
@fvall,你用了什么替代sqlx? - Mathieu
1
我已经创建了一个小的代码库来重现这个问题: https://github.com/mathroc/juniper-sqlx-subscriptions - Mathieu
2个回答

1

您的代码与我的不完全相同,但我认为解决方案也适用于这里,请尝试在使用之前克隆池:

type UsersStream =
    Pin<Box<dyn Stream<Item = Result<User, FieldError>> + Send>>;

#[juniper::graphql_subscription(Context = Context)]
impl SubscriptionRoot {
    async fn users(context: &Context) -> UsersStream {
        let pool = context.pool.clone();
        let sqlx::query_as!(User, "SELECT * FROM users")
            .fetch(&pool)
            .map_err(|e| {
                FieldError::new(
                    "Database error",
                    graphql_value!(format!("{}", e)))
            })
            .boxed()
    }
}

1
Mathieu的答案是正确的。所以我将解释错误背后的原因。
根本问题在于当您返回UsersStream时,您正在将数据移出函数fn users(..)。现在users(..)函数的调用者拥有了返回的数据,并且(理论上)可以做任何它想做的事情,包括将数据保留到应用程序的生命周期中,即给数据分配“静态”生命周期。但是,如果UsersStream引用具有有限生命周期的数据(context.pool),则当pool的所有者删除数据时会发生什么?它将引用空指针,因为数据已经被删除。因此,编译器会抛出错误以防止这种情况发生。
因此,在这里您可以做的是以某种方式传递所拥有的数据(pool)而不是引用,确保pool具有与UsersStream相同的生命周期,因为它现在拥有数据。clone()正是这样做的,它创建了数据的一个拥有副本(无论是引用计数还是字节复制)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接