JanusGraph,Spark集群无法连接到Cassandra

3

我正在尝试在集群上运行一个创建JanusGraph的Spark作业。

我在一台单机上运行了JanusGraph服务器实例、Cassandra和ES,只有Spark计算发生在集群上。(基本上,我在机器上执行了janusgraph.sh start)

我的配置如下(x是我在上述实例上运行的机器的IP):

def getGraph(): JanusGraph = {
    val config = JanusGraphFactory.build()
    config.set("storage.backend", "cassandrathrift")
    config.set("storage.cassandrathrift.keyspace", "jgex")
    config.set("storage.hostname", "x")
    config.set("index.jgex.backend", "elasticsearch")
    config.set("index.jgex.index-name", "jgex")
    config.set("jgex.hostname", "x")
    config.open()
  }

但是当我在集群上使用 spark-submit 提交打包好的jar包时,会出现以下错误:

    java.lang.IllegalArgumentException: Could not instantiate implementation: org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftStoreManager
    at org.janusgraph.util.system.ConfigurationUtil.instantiate(ConfigurationUtil.java:69)
    at org.janusgraph.diskstorage.Backend.getImplementationClass(Backend.java:477)
    at org.janusgraph.diskstorage.Backend.getStorageManager(Backend.java:409)
    at org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.<init>(GraphDatabaseConfiguration.java:1376)
    at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:164)
    at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:133)
    at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:123)
    at org.janusgraph.core.JanusGraphFactory$Builder.open(JanusGraphFactory.java:264)
    at janus_create$.getGraph(janus_create.scala:66)
    at janus_create$.makePropertiesandIndexes(janus_create.scala:830)
    at janus_create$.main(janus_create.scala:921)
    at janus_create.main(janus_create.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.janusgraph.util.system.ConfigurationUtil.instantiate(ConfigurationUtil.java:58)
    ... 16 more
Caused by: org.janusgraph.diskstorage.TemporaryBackendException: Temporary failure in storage backend
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftStoreManager.getCassandraPartitioner(CassandraThriftStoreManager.java:219)
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftStoreManager.<init>(CassandraThriftStoreManager.java:198)
    ... 21 more
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:187)
    at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
    at org.janusgraph.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory.makeRawConnection(CTConnectionFactory.java:110)
    at org.janusgraph.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory.makeObject(CTConnectionFactory.java:74)
    at org.janusgraph.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory.makeObject(CTConnectionFactory.java:43)
    at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1179)
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftStoreManager.getCassandraPartitioner(CassandraThriftStoreManager.java:216)
    ... 22 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
    ... 28 more

我尝试在Cassandra和Cassandra Thrift之间切换,但都无法工作。另外,我在哪里指定我的Gremlin运行位置?这个是否相关?


您是使用预打包的JanusGraph分发来配置单个Cassandra节点,还是使用独立的Cassandra节点?并且为了澄清,您正在不同于Cassandra/Elasticsearch/JanusGraph的机器上运行Spark集群吗? - Jason Plurad
抱歉回复晚了。我已经找出问题所在,Cassandra和ES没有响应远程请求。我正在运行预打包的JanusGraph分发版,并且是的,我有一个在不同机器上运行的Spark集群。设置ES/Cassandra远程需要特殊配置吗?另外,如何运行多个Janusgraph服务器实例?谢谢。 - J.Doe
我尝试修复了很多东西,但是似乎与此相关的稀疏文档使事情变得比应该更困难。Janusgraph服务器本质上是一个Gremlin服务器,我可以进行配置更改以使Gremlin服务器指向正确的Cassandra和ES实例。但是,我该如何配置我的Spark作业以指向正确的Gremlin/Janusgraph服务器呢?提前感谢您的帮助。 - J.Doe
1个回答

3
预打包的分发假定每个Cassandra、Elasticsearch和Gremlin Server节点只有一个本地主机。请注意堆栈跟踪底部的java.net.ConnectException: Connection refused。如果您在远程集群上运行Spark,则需要确保服务器可用于非本地主机地址。
停止分发,使用bin/janusgraph.sh stop 使用机器的IP地址更新$JANUSGRAPH_HOME/conf/cassandra/cassandra.yaml中的listen_addressrpc_address(参见Cassandra文档
$JANUSGRAPH_HOME/elasticsearch/config/elasticsearch.yml中添加network.host,使用机器的IP地址(参见Elasticsearch文档
使用机器的IP地址在$JANUSGRAPH_HOME/conf/gremlin-server/gremlin-server.yaml中更新host(参见JanusGraph文档
假设您正在使用默认的Gremlin Server配置gremlin-server.yaml,则需要使用机器的IP地址更新$JANUSGRAPH_HOME/conf/gremlin-server/janusgraph-cassandra-es-server.properties中的属性文件。使用机器的IP地址更新storage.hostnameindex.search.hostname,并匹配上述服务器设置。
更新:似乎您的图连接属性中存在错误,这可能会导致问题: config.set("storage.cassandrathrift.keyspace", "jgex") -- 应为 "storage.cassandra.keyspace" config.set("jgex.hostname", "x") -- 应为 "index.jgex.hostname"

谢谢,我按照你提到的所有步骤都做了,现在出现了类似的错误,只是针对ElasticSearch。 另外,我想问一下,据我所知,Spark作业应该与Gremlin服务器通信,然后它们再与后端/索引后端通信。 但是在我的Spark代码中,我需要指定Cassandra和ES的位置。那么,Spark作业如何知道我的Gremlin在哪里? 我需要在我的Spark作业中指定吗? - J.Doe
请查看更新的答案。您上面的代码直接连接到存储后端和索引后端,这是一种有效的方法。如果您想通过Gremlin服务器连接,则需要使用Gremlin Driver方法。 - Jason Plurad
谢谢,它完美地运行了! 另外,如果我想要添加一个 Array[Double] 属性,在 Scala 中应该如何定义呢? mgmt.makePropertyKey("prop1").dataType(classOf[java.util.Arrays]).make() 不起作用。 - J.Doe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接