如何在Akka Actor中使用MYSQL jdbc实现

3

嘿,我读了这篇jdbc文档https://www.playframework.com/documentation/2.1.0/ScalaDatabase和这个问题Is it good to put jdbc operations in actors?

现在我有一个用于mysql事务的ActorClass,并且每当请求到来时,该actor实例化多次。因此,每个请求都将实例化新的actor。连接池是否安全?

我可以使用

val connection = DB.getConnection()

连接对象能够处理异步事务吗?因此我可以使用单例来处理mysql连接,并在所有实例化的actors中使用它。另外,如果我想使用anorm,我该如何创建一个隐式的连接对象?
谢谢。

你为什么想要在每个MySQL事务中使用一个actor? - Reza Same'ei
我想使用MySQL的Actor来处理所有交易,这样我就可以仅异步传递消息。 - ans4175
4个回答

3
您的DB.getConnection()应该是一个promise[Connection]或者future[Connection],这样您就不会阻塞actor。如果您的DB.getConnection()是同步的(仅返回连接而不包装类型),则在处理实际消息时,您的actor将挂起直到它实际从池中获得连接。无论您的DB是否是单例,最终都会命中连接池。
话虽如此,您可以创建用于处理消息的actors和用于处理数据库持久化的其他actors,并将它们放在不同的线程调度程序中,为数据库密集型任务分配更多线程。这也是PlayFramework中建议的做法。
注意事项:如果您在actor内部运行futures,则不能确保它将在哪个线程/时间运行,我假设您已经按以下方式进行操作(请阅读注释)。
def receive = {
  case aMessage => 
    val aFuture = future(db.getConnection)
    aFuture.map { theConn => //from previous line when you acquire the conn and when you execute the next line
                             //it could pass a long time they run in different threads/time 
                             //that's why you should better create an actor that handles this sync and let
                             //akka do the async part
      theConn.prepareStatemnt(someSQL)
      //omitted code...
    }
}

因此,我的建议是:
//actor A receives, 
//actor B proccess db (and have multiple instances of this one due to slowness from db)

class ActorA(routerOfB : ActorRef) extends Actor {
  def recieve = {
    case aMessage =>
      routerOfB ! aMessage
  }
}

class ActorB(db : DB) extends Actor {
  def receive = {
    case receive = {
      val conn = db.getConnection //this blocks but we have multiple instances 
                                  //and enforces to run in same thread
      val ps = conn.prepareStatement(someSQL)
    }
  }
}

您需要使用路由:http://doc.akka.io/docs/akka/2.4.1/scala/routing.html


如何处理连接关闭? java.sql.SQLException: 连接已关闭! - ans4175
我改进了答案,请查看一下。之前的方法存在问题,当另一个线程运行future时,你的连接可能会关闭。 - Luis Ramirez-Monterosa
啊,好的要点。所以它是在Actor内部实例化,但通过路由限制到少量Actor实例? - ans4175
你可以使用以下代码实例化一组Actor:val workers = context.actorOf(Props[ItemProcessingWorker].withRouter(RoundRobinRouter(100)))。workers是一个Actor引用,你可以向它发送消息:workers ! aMessage,这意味着消息将以轮询的方式发送给其中一个worker。了解更多关于路由和调度程序的知识,Akka可以帮助你隔离组件,你不希望你的消息处理程序与数据库处理程序共享线程,因为数据库需要更多的线程,每次获取连接时都会阻塞。 - Luis Ramirez-Monterosa

1
  • 据我所知,在关系型数据库中,无法在单个连接上运行多个并发查询(即使在C-API中也没有看到任何有关mysql的异步/非阻塞调用的资源/参考)。要并发运行查询,您必须拥有多个连接实例。

  • 当您拥有多个连接实例时,DB.getConnection 并不昂贵。与数据库交互最昂贵的区域是运行 SQL 查询并等待其响应。

  • 为了使您的 DB 调用异步化,您应该在其他线程中运行它们(而不是在 Akka 或 Play 的主线程池中);Slick 会为您完成这项工作。它管理一个线程池并在其中运行您的 DB 调用,然后您的主线程将空闲以处理传入请求。因此,您不需要将 DB 调用包装在 actor 中以进行异步操作。


很好的点子,我用这样的模式来管理它: MysqlConn(单例)
一次处理一个连接,然后在调用时返回对象连接; ActorMysql(Actor类) 每当另一个actor或模块想要存储或检索数据时实例化,在此actor中将调用MysqlConn.getConnection;
到目前为止,它仍然是可靠的。
- ans4175
它很可靠,但并发性如何?您的单个连接是瓶颈,不允许您同时运行多个查询。 - Reza Same'ei

0

如果想要更多功能性的数据库访问方式,我建议查看slick。它有一个很好的API可以与普通的actors集成,进一步使用streams


0

我认为你应该从连接池中获取连接,并在完成后返回。如果我们按照每个演员单独使用一个连接,那么如果该连接断开,您可能需要重新初始化它。

对于事务,您可能想要尝试

DB.withTransaction { conn =>  // do whatever you need with the connection}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接