Cassandra throws NoHostAvailableException

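I am using the code below to connect my .NET client (CQL-based) to a 3-node Cassandra cluster. Data arrives from RabbitMQ at about 30 records per second and is stored in Cassandra without problems for the first 800-900 rows. After that, I get the exception below. What optimizations or changes would avoid it? I have not been able to find a specific solution to this problem anywhere.
Error message: ERROR ErrorLog - error in Cassandra GetCWCRow Function Connection :None of the hosts tried for query are available (tried: X.X.X.201:9042, X.X.X.200:9042, X.X.X.X:9042)
The code is as follows: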
using Cassandra;
using Consumer;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;



namespace RabbitMqCarWaleUserTracking
{
    class DataAccessCassandra
    {

        public bool InsertCookieLogData(string cwc, string page_uri)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra InsertCookieLogData a Function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                string pageCategory = string.Empty;
                try
                {

                    if ((Regex.IsMatch(page_uri, "/newcars/upcomingcars", RegexOptions.IgnoreCase)))
                    {
                        pageCategory = "upcomingCars";
                    }
                    else
                        if ((Regex.IsMatch(page_uri, "/newcars/dealers/newCarDealerShowroom", RegexOptions.IgnoreCase)) || (Regex.IsMatch(page_uri, "/newcars/dealers/listnewcardealersbycity", RegexOptions.IgnoreCase)
                            || (Regex.IsMatch(page_uri, "/newcars/dealers/dealerdetails", RegexOptions.IgnoreCase))))
                        {
                            pageCategory = "newcarsDealers";
                        }
                        else
                            if ((Regex.IsMatch(page_uri, "/offers", RegexOptions.IgnoreCase)) || (Regex.IsMatch(page_uri, "/alloffers", RegexOptions.IgnoreCase)))
                            {
                                pageCategory = "offers";
                            }
                            else
                                if ((Regex.IsMatch(page_uri, "/dealer/testdrive", RegexOptions.IgnoreCase)))
                                {
                                    pageCategory = "dealerTestDrive";
                                }

                    if (pageCategory != string.Empty)
                    {
                        Row result = session.Execute("select logdate from pageWiseCookieLog where cwc ='" + cwc + "' and page_uri ='" + pageCategory + "' and logdate= '" + DateTime.Today.ToString("yyyy-MM-dd") + "'").FirstOrDefault();
                        if (result == null)
                        {
                            session.Execute("insert into pageWiseCookieLog (cwc, page_uri, logdate) values ('" + cwc + "' , '" + pageCategory + "' , '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                            session.Execute("insert into pageWiseCookieLogByld (cwc, page_uri, logdate) values ('" + cwc + "' , '" + pageCategory + "' , '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                            session.Dispose();
                            cluster.Dispose();
                            return true;
                        }
                    }
                    else
                    {
                        //don't want to store the data for rest of the page category but need to return true  
                        session.Dispose();
                        cluster.Dispose();
                        return true;
                    }
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra InsertCookieLogData function with cwc :" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra InsertCookieLogData function connection :" + ex.Message);
                SendMail.HandleException(ex, subject);              
            }

            return false;
        }

        public string GetCWCRow(string cwc, int index, string mobileId)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra GetCWCRow Function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                try
                {
                    Row result = session.Execute("select cur_visit_id from usertracking where cwc ='" + cwc + "'").FirstOrDefault();

                    if (result != null)
                    {
                        session.Dispose();
                        cluster.Dispose();
                        return result[0].ToString();
                    }

                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra GetCWCRow function with cwc :" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                }

            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra GetCWCRow Function Connection :" + ex.Message);
                SendMail.HandleException(ex, subject);
            }

            return string.Empty;
        }

        public bool InsertCWCRecords(string cwv, Cut_Case caseType, int index, string mobileId)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra InsertCWCRecords function called for case:" + caseType);
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                try
                {

                    bool _isProcessed = false;
                    string visitCount = "";
                    string[] leadParameters = cwv.Split('.');
                    string cwc = leadParameters[0];
                    string visitId = leadParameters[1];
                    string visitStartTime = leadParameters[2];
                    string visitPrevPageTime = leadParameters[3];
                    string visitLastPageTime = leadParameters[4];

                    if (leadParameters.Length == 6)
                    {
                        visitCount = leadParameters[5];
                    }
                    string TOT_TIME_SPENT = (Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime)).ToString();

                    if ((int)caseType == 1) //to enter new cwc data in summary table
                    {
                        session.Execute("insert into usertracking (cwc, cur_visit_id, cur_visit_last_ts,  tot_page_view, tot_time_spent, tot_visit_count, cur_visit_datetime) values ('" + cwc + "' , '" + visitId + "' ," + visitStartTime + "," + "1" + "," + TOT_TIME_SPENT + "," + "1" + ", '" + DateTime.Today.ToString("yyyy-MM-dd") + "' )");
                        _isProcessed = true;
                    }
                    if ((int)caseType == 2) //if cwc exits and visit id is same
                    {
                        Row result = session.Execute("select tot_page_view, tot_time_spent from usertracking where cwc ='" + cwc + "'").FirstOrDefault();
                        int page_cnt_val = int.Parse(result[0].ToString()) + 1;
                        Int64 time_spt_val = Int64.Parse(result[1].ToString()) + Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime);
                        session.Execute("update usertracking SET cur_visit_last_ts = " + visitLastPageTime + ", tot_page_view = " + page_cnt_val + ", tot_time_spent = " + time_spt_val + " WHERE cwc = '" + cwc.Trim() + "'");
                        _isProcessed = true;
                    }
                    if ((int)caseType == 3) //if cwc exits ans visit id is different
                    {
                        Row result = session.Execute("select tot_page_view, tot_time_spent, tot_visit_count, cur_visit_last_ts, cur_visit_datetime from usertracking where cwc = '" + cwc + "'").First();
                        int page_cnt_val = int.Parse(result[0].ToString()) + 1;
                        Int64 time_spt_val = Int64.Parse(result[1].ToString()) + Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime);
                        int visit_val = int.Parse(result[2].ToString()) + 1;
                        Int64 prev_visit_ts_val = Int64.Parse(result[3].ToString());
                        String prev_visit_datetime_val = Convert.ToDateTime(result[4].ToString()).ToString("yyyy-MM-dd");

                        session.Execute("update usertracking SET  cur_visit_id = '" + visitId + "' , tot_visit_count= " + visit_val
                            + " , prev_visit_last_ts= " + prev_visit_ts_val + ", prev_visit_datetime = '" + prev_visit_datetime_val
                            + "' , cur_visit_last_ts = " + visitLastPageTime
                            + ", tot_page_view = " + page_cnt_val + ", tot_time_spent = " + time_spt_val
                            + ", cur_visit_datetime='" + DateTime.Today.ToString("yyyy-MM-dd")
                            + "' WHERE cwc = '" + cwc.Trim() + "'");
                        _isProcessed = true;
                    }
                    session.Dispose();
                    cluster.Dispose();
                    return _isProcessed;
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra InsertCWCRecords function with cwv :" + cwv + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                    return false;
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra InsertCWCRecords function connection :" + ex.Message);
                SendMail.HandleException(ex, subject);
                return false;
            }

        }

        public bool UpdateReferrerTimeSpent(string cwc, int referrerCategoryId, double referrerTimeSpent, int index, string mobileId)
        {
            bool _isUpdated = false;

            try
            {              
                Logs.WriteInfoLog("Cassandra UpdateReferrerTimeSpent function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect("cw");

                try
                {
                    Row result = session.Execute("select time_spent_in_sec from userTimeSpentPage WHERE cwc = '" + cwc.Trim() + "' And logdate = '" + DateTime.Today.ToString("yyyy-MM-dd") + "' And page_category_id =" + referrerCategoryId).FirstOrDefault();
                    if (result != null)
                    {
                        if (result[0].ToString().Trim() != string.Empty)
                        {

                            Int64 page_time_spent_val = Int64.Parse(result[0].ToString());
                            Int64 tot_time_spt_val = page_time_spent_val + Int64.Parse(referrerTimeSpent.ToString());
                            session.Execute("update userTimeSpentPage set time_spent_in_sec= " + tot_time_spt_val + "WHERE cwc = '" + cwc.Trim() + "' And logdate = '" + DateTime.Today.ToString("yyyy-MM-dd") + "' And page_category_id=" + referrerCategoryId);
                        }
                    }
                    else
                    {
                        session.Execute("insert into userTimeSpentPage (cwc, page_category_id, time_spent_in_sec, logdate) values ('" + cwc + "' ," + referrerCategoryId + "," + referrerTimeSpent + ", '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                    }
                    _isUpdated = true;
                    session.Dispose();
                    cluster.Dispose();
                    return _isUpdated;
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra UpdateReferrerTimeSpent function with cwc:" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                    return _isUpdated;
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra UpdateReferrerTimeSpent function connection" + ex.Message);
                SendMail.HandleException(ex, subject);
                return _isUpdated;
            }

        }
    }
}

Edited question:

output of netstat -an | awk '/^tcp/ {print $NF}' | sort | uniq -c | sort -rn

On Machine 1  While cassandra running :
      773 ESTABLISHED
      36 LISTEN
      1 CLOSE_WAIT

After cassandra stopped :
      274 ESTABLISHED
      36 LISTEN
      1 CLOSE_WAIT

Machine 2 while cassandra running :
       3941 ESTABLISHED
       26 LISTEN
       7 CLOSE_WAIT

After cassandra stopped :
       26 LISTEN
       9 ESTABLISHED

On machine 3 while cassandra running :
       500 ESTABLISHED
       21 LISTEN

After cassandra stopped :    
      21 LISTEN
      13 ESTABLISHED
1 Answer

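NoHostAvailableException can be caused by a number of things. However, they all come down to the problem described in the driver documentation:
This exception is thrown when a query cannot be executed because no host is available. It is thrown in either of the following cases:
- there is no live host in the cluster at the time of the query
- all hosts that were tried failed because of a connection problem
So why would this happen? There are a few possible reasons.
1. All three nodes may be performing garbage collection at the same time. I personally don't think that is the problem here, but you should read the relevant documentation to see whether it applies to your case. Here is a link to a very good document on how to tune GC in Cassandra. In fact, creating the cluster and session objects on every call, instead of storing them as singletons and reusing them, may be making things worse.
2. After looking at the function that raised the error, I am more convinced that this is the problem: after many inserts, your nodes simply time out while reading a wide row in this statement: Row result = session.Execute("select cur_visit_id from usertracking where cwc ='" + cwc + "'").FirstOrDefault();. Later in the process you perform a large number of updates for the same cwc key. Because SSTables are immutable, this produces many versions of the data, which increases read time, since the cluster has to merge all of those versions for each request.
Without the table schema it is hard to make suggestions, and reverse engineering does not help much either, but I would suggest making use of a composite primary key in some way to speed up lookups. Tune the JVM, make the session and cluster singletons, and reuse them throughout your code. See whether that helps.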
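The singleton suggestion can be sketched roughly as follows. This is a minimal illustration, not the code the asker ended up with: the class name CassandraConnection, the ParseContactPoints helper, and the Shutdown method are hypothetical, and the sketch assumes the same cassandraCluster and cassandraKeySpace app settings used in the question.

```csharp
using System;
using System.Configuration;
using System.Linq;
using Cassandra;

// Hypothetical helper (not part of the original post): builds the Cluster and
// ISession once per process and hands the same instances to every caller.
public static class CassandraConnection
{
    // Lazy<T> is thread-safe by default, so concurrent consumers
    // trigger Build() and Connect() only once.
    private static readonly Lazy<Cluster> SharedCluster = new Lazy<Cluster>(() =>
        Cluster.Builder()
            .AddContactPoints(ParseContactPoints(ConfigurationManager.AppSettings["cassandraCluster"]))
            .Build());

    private static readonly Lazy<ISession> SharedSession = new Lazy<ISession>(() =>
        SharedCluster.Value.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"]));

    // The one session that all data-access methods reuse.
    public static ISession Session
    {
        get { return SharedSession.Value; }
    }

    // Splits the comma-separated host list from configuration, dropping blanks.
    public static string[] ParseContactPoints(string csv)
    {
        return csv.Split(',')
            .Select(host => host.Trim())
            .Where(host => host.Length > 0)
            .ToArray();
    }

    // Call once at application shutdown, not after each query.
    public static void Shutdown()
    {
        if (SharedCluster.IsValueCreated)
        {
            SharedCluster.Value.Dispose();
        }
    }
}
```

With something like this in place, a method such as GetCWCRow would call CassandraConnection.Session.Execute(...) directly and drop its per-call session.Dispose() and cluster.Dispose() calls. One caveat of this particular sketch: Lazy<T> in its default mode caches an exception thrown by the factory, so a production version would probably need its own handling for a failed first connection.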
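Read the Cassandra cluster logs, focusing on the time when the problem occurred. See whether they contain any clues, such as garbage collection activity or timeout errors.
Hope this helps.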
Roman

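First of all, thank you very much for your help. I implemented your suggestion of storing the cluster and session objects as singletons, and it works perfectly for me. I have one more question, and I have edited my original question accordingly. Before making the singleton change, I had a large number of TCP connections even though I was disposing of the cluster and session objects properly. Now there are only 3-4 TCP connections across the whole cluster (2 on node 1 and the rest on the other nodes). Could you explain why the connections were not being closed? - Naresh
If I guessed that those connections were due to connection pooling, I would not be far off. - Roman Tumaykin
Hi Roman, the "tune GC in cassandra" link is dead. Could you provide another one? - Hitesh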
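
On the connection-pool guess in the comments: each Cluster and ISession pair keeps its own pooled connections to the hosts, so building a new pair for every call multiplies the number of sockets, while one shared pair keeps only a handful. The sketch below shows how the pool size could be bounded explicitly through the driver's PoolingOptions. The class name PooledClusterFactory and the values 1 and 2 are assumptions chosen for illustration, not settings taken from the thread.

```csharp
using Cassandra;

// Hypothetical factory (not part of the original post) that caps how many
// connections the driver keeps open to each host.
public static class PooledClusterFactory
{
    // Illustrative values only; they are assumptions, not recommendations.
    public static PoolingOptions CreatePoolingOptions()
    {
        return new PoolingOptions()
            .SetCoreConnectionsPerHost(HostDistance.Local, 1)
            .SetMaxConnectionsPerHost(HostDistance.Local, 2);
    }

    // Builds a Cluster that uses the bounded pool; no connection is opened
    // until Connect() is called on the returned instance.
    public static Cluster Build(string[] contactPoints)
    {
        return Cluster.Builder()
            .AddContactPoints(contactPoints)
            .WithPoolingOptions(CreatePoolingOptions())
            .Build();
    }
}
```

Whatever limits are chosen apply per Cluster instance, so they only keep the total connection count low when that instance is created once and reused.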
