使用Scala Continuations与Netty/NIO监听器

9

我正在使用 Netty 库(GitHub 上的版本 4)。在 Scala 中它运行得很好,但我希望我的库能够使用 continuation passing style 来进行异步等待。

传统上,在 Netty 中您会执行类似以下操作(一个异步连接操作示例):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})

如果您正在实现一个库(我就是),那么基本上有三种简单的选择来允许库的用户在连接建立后执行操作:
1. 仅从connect方法返回ChannelFuture,让用户处理它 - 这不提供与netty很多抽象。 2. 将ChannelFutureListener作为connect方法的参数,并将其添加为ChannelFuture的监听器。 3. 将回调函数对象作为connect方法的参数,并从您创建的ChannelFutureListener中调用它(这将使其呈现类似于node.js的回调驱动样式)。
我要做的是第四个选项; 我没有将其包括在上面的计数中,因为它并不简单。
我想使用scala delimited continuations,使库的使用方式有点像阻塞库,但它在幕后是非阻塞的:
class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}

想象一下其他读写操作也按照同样的方式实现。这样做的目标是让用户的代码看起来更像这样:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}

换句话说,程序看起来像一个简单的阻塞式程序,但在幕后没有任何阻塞或线程。
我遇到的问题是,我不完全理解分界限定续体的类型工作原理。当我尝试以上述方式实现它时,编译器抱怨我的operationComplete实现实际上返回Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit]而不是Unit。我知道Scala的CPS中存在一种“gotcha”,即必须使用@suspendable注释shift方法的返回类型,并将其传递到调用堆栈直到reset,但似乎没有办法与没有分界限定的现有Java库相结合。
我觉得一定有办法解决这个问题——如果Swarm可以序列化续体并将它们压缩到网络中以在其他地方计算,则必须可能只从现有的Java类中调用续体。但我无法弄清楚如何做到这一点。为了使这种情况发生,我是否需要在Scala中重写Netty的整个部分?

我不知道如何修复Scala的问题,但我建议反对你的想法。让我告诉你原因。通过让用户“不知道”你库的异步性质,你会告诉他在监听器代码中进行“阻塞”调用是可以的。事实上,他甚至不知道自己是在监听器中编写代码。在监听器中进行阻塞调用可能会导致各种问题。最常见的问题是它会“减慢”其他IO任务,并限制吞吐量。 - Norman Maurer
1
你说得很有道理,但我不同意。我认为我的库的用户(如果除了我之外还有其他人)可能需要先了解reset的作用,因此他们会明白这些调用是非阻塞的。这只是一种更深入地了解分界限制和以更清晰的方式编写基本上是回调驱动的代码的实验方法。 - Jeremy
1个回答

4
我在刚开始学习Scala的continuations时,发现这篇解释非常有帮助。特别是要注意他解释shift[A, B, C]reset[B, C]的部分。在operationComplete的最后添加一个虚拟的null语句可能会有帮助。
顺便提一下,如果某个函数内含嵌套的shift,则需要在另一个reset内调用retrn()
编辑:这里有一个可行的例子。
import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
    println("Outside reset");
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
            retrn();
            null
          }
        });
      }
    }
  }
}

可能的输出为:
Outside reset
operationComplete starts
This will happen after the connection is finished

实际上,这确实使编译器满意,甚至似乎正常工作。我猜关键是你将shift移出了匿名的ChannelFutureListener并使用闭包从operationComplete内部调用了继续执行。我不确定为什么这种方式有效而另一种方式无效,但我接受它。谢谢! - Jeremy
这是一篇关于Scala的continuations非常好的阅读材料。他们应该从scala-lang.org页面中删除有关continuations的毫无价值的示例,并用您链接的文章替换它们。 - Jeremy
@Jeremy 顺便说一下,你的代码和我的区别在于我明确注释了一些方法的返回类型。 - shams
我知道我发布的代码看起来几乎和你的一模一样。但由于某种原因,这不是我真正的代码。我把shift块放错了位置,放在了operationComplete的实现内部,这导致编译器出错。我没有意识到我可以将shift放在外面并使用闭包来调用它。尽管我错误地在问题中正确地写了它 :) - Jeremy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接