Scala演员的低效问题

6
让我先说一下,我对Scala还比较新手。然而,我觉得基于Actor的并发模型很有趣,所以我尝试用它来实现一个相对简单的应用程序。但是,我遇到的问题是,尽管我能让应用程序工作,但结果的效率(以实际时间、CPU时间和内存使用情况为衡量标准)远不如使用线程从ArrayBlockingQueue中获取消息的等效Java解决方案。我想了解其中的原因。我怀疑这很可能是由于我对Scala的了解不够,导致所有的低效率,但是在多次尝试重新设计应用程序失败后,我决定向社区寻求帮助。
我的问题是: 我有一个格式为gzipped的文件,其中包含许多行,格式如下:
SomeID 逗号分隔的值列表
例如:
1234 12,45,82
我想解析每一行,并统计逗号分隔列表中每个值出现的总次数。
这个文件可能相当大(压缩后几个GB),但每个文件中唯一值的数量相对较小(最多500个)。我认为这是一个很好的机会,尝试编写一个基于Actor的并发Scala应用程序。我的解决方案涉及一个创建解析器Actor池的主驱动程序。主驱动程序然后从stdin读取行,将行传递给解析器Actor,解析器Actor解析行并保留值的本地计数。当主驱动程序读取了最后一行时,它向每个Actor发送一个消息,指示所有行都已读取。当Actor接收到“完成”消息时,它们将其计数传递给汇总器,汇总器对所有Actor的计数求和。一旦来自所有解析器的计数已经被聚合,主驱动程序就会打印出统计信息。
问题是: 我遇到的主要问题是这个应用程序的效率非常低下。它使用的CPU和内存远远超过使用线程和ArrayBlockingQueue的“等效”Java应用程序。为了让你了解情况,以下是我针对1000万行测试输入文件收集的一些统计数据:
Scala 1 Actor(解析器):
    real    9m22.297s
    user    235m31.070s
    sys     21m51.420s

Java 1线程(解析器):

    real    1m48.275s
    user    1m58.630s
    sys     0m33.540s

Scala 5个Actor:

    real    2m25.267s
    user    63m0.730s
    sys     3m17.950s

Java 5 线程:

    real    0m24.961s
    user    1m52.650s
    sys     0m20.920s

此外,顶级报告称,Scala应用程序的常驻内存大小约为10倍。因此,我们谈论的是数量级更高的CPU和内存,性能也差了数量级,我无法确定是什么原因导致了这种情况。是GC问题还是我创建了比我意识到的对象副本还要多的东西?
以下是一些可能重要的详细信息:
- Scala应用程序被Java类包装,以便我可以提供自包含的可执行JAR文件(我没有在可能要运行该应用程序的每台机器上都有Scala jars)。 - 应用程序的调用方式如下:gunzip -c gzFilename | java -jar StatParser.jar 以下是代码:
主驱动程序:
import scala.actors.Actor._
import scala.collection.{ immutable, mutable }
import scala.io.Source

class StatCollector (numParsers : Int ) {
    private val parsers = new mutable.ArrayBuffer[StatParser]()
    private val aggregator = new StatAggregator()

    def generateParsers {
        for ( i <- 1 to numParsers ) {
            val parser = new StatParser( i, aggregator )
            parser.start
            parsers += parser
        }
    }


    def readStdin {
        var nextParserIdx = 0
        var lineNo = 1
        for ( line <- Source.stdin.getLines() ) {
            parsers( nextParserIdx ) ! line
            nextParserIdx += 1
            if ( nextParserIdx >= numParsers ) {
                nextParserIdx = 0
            }
            lineNo += 1
        }
    }

    def informParsers {
        for ( parser <- parsers ) {
            parser ! true
        }
    }

    def printCounts {
        val countMap = aggregator.getCounts()
        println( "ID,Count" )
        /*
        for ( key <- countMap.keySet ) {
            println( key + "," + countMap.getOrElse( key, 0 ) )
            //println( "Campaign '" + key + "': " + countMap.getOrElse( key, 0 ) )
        }
        */
        countMap.toList.sorted foreach {
            case (key, value) =>
                println( key + "," + value )
        }
    }

    def processFromStdIn {
        aggregator.start

        generateParsers

        readStdin
        process
    }

    def process {

        informParsers

        var completedParserCount = aggregator.getNumParsersAggregated
        while ( completedParserCount < numParsers ) {
            Thread.sleep( 250 )
            completedParserCount = aggregator.getNumParsersAggregated
        }

        printCounts
    }
}

解析器Actor:
import scala.actors.Actor
import collection.mutable.HashMap
import scala.util.matching

class StatParser( val id: Int, val aggregator: StatAggregator ) extends Actor {

    private var countMap = new HashMap[String, Int]()
    private val sep1 = "\t"
    private val sep2 = ","


    def getCounts(): HashMap[String, Int] = {
        return countMap
    }

    def act() {
        loop {
            react {
                case line: String =>
                    {
                        val idx = line.indexOf( sep1 )
                        var currentCount = 0
                        if ( idx > 0 ) {
                            val tokens = line.substring( idx + 1 ).split( sep2 )
                            for ( token <- tokens ) {
                                if ( !token.equals( "" ) ) {
                                    currentCount = countMap.getOrElse( token, 0 )
                                    countMap( token ) = ( 1 + currentCount )
                                }
                            }

                        }
                    }
                case doneProcessing: Boolean =>
                    {
                        if ( doneProcessing ) {
                            // Send my stats to Aggregator
                            aggregator ! this
                        }
                    }
            }
        }
    }
}

聚合器Actor:
import scala.actors.Actor
import collection.mutable.HashMap

class StatAggregator extends Actor {
    private var countMap = new HashMap[String, Int]()
    private var parsersAggregated = 0

    def act() {
        loop {
            react {
                case parser: StatParser =>
                    {
                        val cm = parser.getCounts()
                        for ( key <- cm.keySet ) {
                            val currentCount = countMap.getOrElse( key, 0 )
                            val incAmt = cm.getOrElse( key, 0 )
                            countMap( key ) = ( currentCount + incAmt )
                        }
                        parsersAggregated += 1
                    }
            }
        }
    }

    def getNumParsersAggregated: Int = {
        return parsersAggregated
    }

    def getCounts(): HashMap[String, Int] = {
        return countMap
    }
}

非常感谢能提供帮助理解这里正在发生什么的任何帮助。
提前致谢!
---- 编辑 ---
由于有很多人回复并要求Java代码,这里是我为了比较而创建的简单Java应用程序。我意识到这不是很好的Java代码,但是当我看到Scala应用程序的性能时,我只是迅速编写了一些东西来查看基于线程的Java实现将作为基准表现如何:
解析线程:
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class JStatParser extends Thread
{
    private ArrayBlockingQueue<String> queue;
    private Map<String, Integer> countMap;
    private boolean done;

    public JStatParser( ArrayBlockingQueue<String> q )
    {
        super( );
        queue = q;
        countMap = new Hashtable<String, Integer>( );
        done = false;
    }

    public Map<String, Integer> getCountMap( )
    {
        return countMap;
    }

    public void alldone( )
    {
        done = true;
    }

    @Override
    public void run( )
    {
        String line = null;
        while( !done || queue.size( ) > 0 )
        {
            try
            {
                // line = queue.take( );
                line = queue.poll( 100, TimeUnit.MILLISECONDS );
                if( line != null )
                {
                    int idx = line.indexOf( "\t" ) + 1;
                    for( String token : line.substring( idx ).split( "," ) )
                    {
                        if( !token.equals( "" ) )
                        {
                            if( countMap.containsKey( token ) )
                            {
                                Integer currentCount = countMap.get( token );
                                currentCount++;
                                countMap.put( token, currentCount );
                            }
                            else
                            {
                                countMap.put( token, new Integer( 1 ) );
                            }
                        }
                    }
                }
            }
            catch( InterruptedException e )
            {
                // TODO Auto-generated catch block
                System.err.println( "Failed to get something off the queue: "
                        + e.getMessage( ) );
                e.printStackTrace( );
            }
        }
    }
}

驱动程序:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;

public class JPS
{
    public static void main( String[] args )
    {
        if( args.length <= 0 || args.length > 2 || args[0].equals( "-?" ) )
        {
            System.err.println( "Usage: JPS [filename]" );
            System.exit( -1 );
        }

        int numParsers = Integer.parseInt( args[0] );
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>( 1000 );
        List<JStatParser> parsers = new ArrayList<JStatParser>( );

        BufferedReader reader = null;

        try
        {
            if( args.length == 2 )
            {
                reader = new BufferedReader( new FileReader( args[1] ) );
            }
            else
            {
                reader = new BufferedReader( new InputStreamReader( System.in ) );
            }

            for( int i = 0; i < numParsers; i++ )
            {
                JStatParser parser = new JStatParser( q );
                parser.start( );
                parsers.add( parser );
            }

            String line = null;
            while( (line = reader.readLine( )) != null )
            {
                try
                {
                    q.put( line );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    System.err.println( "Failed to add line to q: "
                            + e.getMessage( ) );
                    e.printStackTrace( );
                }
            }

            // At this point, we've put everything on the queue, now we just
            // need to wait for it to be processed.
            while( q.size( ) > 0 )
            {
                try
                {
                    Thread.sleep( 250 );
                }
                catch( InterruptedException e )
                {
                }
            }

            Map<String,Integer> countMap = new Hashtable<String,Integer>( );
            for( JStatParser jsp : parsers )
            {
                jsp.alldone( );
                Map<String,Integer> cm = jsp.getCountMap( );
                for( String key : cm.keySet( ) )
                {
                    if( countMap.containsKey( key ))
                    {
                        Integer currentCount = countMap.get(  key );
                        currentCount += cm.get( key );
                        countMap.put( key, currentCount );
                    }
                    else
                    {
                        countMap.put(  key, cm.get( key ) );
                    }
                }
            }

            System.out.println( "ID,Count" );
            for( String key : new TreeSet<String>(countMap.keySet( ))  )
            {
                System.out.println( key + "," + countMap.get( key ) );
            }

            for( JStatParser parser : parsers )
            {
                try
                {
                    parser.join( 100 );
                }
                catch( InterruptedException e )
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

            System.exit(  0  );
        }
        catch( IOException e )
        {
            System.err.println( "Caught exception: " + e.getMessage( ) );
            e.printStackTrace( );
        }
    }
}

1
你尝试过编写从ArrayBlockingQueue中获取数据的Scala代码吗?发送消息给actor的开销确实比线程在ArrayBlockingQueue上等待要高,但你可能会观察到Java和Scala代码效率上的差异。Scala的效率看起来还不错,但我不确定你在Java中做了什么。例如,Java哈希映射比Scala的快--这种差异完全可以归因于不可变Scala映射操作和可变Java映射操作的速度差异。 - Rex Kerr
首先,这里有几个奇怪的地方。演员的想法不是共享可变数据。我会先将可变哈希映射更改为普通的scala.Map,然后将地图从解析器发送到聚合器,并删除那些getCount方法--它们明显破坏了演员基础知识。然后我猜性能上存在两个缺陷:可能粒度太高(每行一个消息),其次没有控制每个演员的负载平衡。他们应该都使用相同的邮箱。听起来更像是并行集合(map/reduce)的工作。 - 0__
@FuriousGeorge 不要把那么多代码放在问题中,而是将其存储在其他地方(例如gist或pastebin)。 - ziggystar
谢谢,我会记住这个建议。我最初并没有打算发布超过3个小的Scala代码片段。 - FuriousGeorge
丹尼尔,为了真正测试这个,你需要一个至少有几百兆字节的文件。我没有任何地方可以发布那么大的东西。很容易用一个带有单个for循环的脚本生成测试文件。格式是:ID<制表符>逗号分隔列表。如果你需要实际的测试文件,我可以找到一个地方来存放一个小文件,但除非它相当大,否则你不可能看到线程和GC问题。 - FuriousGeorge
显示剩余4条评论
2个回答

7
我不确定这是否是演员的好测试案例。首先,演员之间几乎没有互动。这是一个简单的映射/归约,需要并行处理而不是并发处理。
演员的开销也相当大,我不知道有多少实际线程被分配。根据您拥有的处理器数量,您可能比Java程序拥有更少的线程--鉴于加速比为4倍而不是5倍,这似乎是事实。
您编写演员的方式也针对空闲演员进行了优化,这种情况下您可能会拥有数百或数千个演员,但是只有很少的演员在任何时候都在做实际工作。如果您使用while/receive而不是loop/react编写演员,则它们的性能将更好。
现在,演员可以轻松地将应用程序分布在许多计算机上,除非您违反了演员的十项原则:您正在调用演员对象上的方法。您不应该这样做,并且事实上,Akka防止您这样做。更具演员风格的做法是让聚合器询问每个演员的键集,计算它们的联合,然后对于每个键,要求所有演员发送该键的计数。
但是,我不确定您是否看到了演员的开销。您没有提供有关Java实现的任何信息,但我敢说您使用了可变映射,甚至可能是单个并发可变映射--这是与Scala中所做的非常不同的实现。
也没有关于如何读取文件(这样大的文件可能会有缓冲问题)或在Java中如何解析它的信息。由于大部分工作是读取和解析文件,而不是计算令牌,因此在这里的实现差异可能很容易克服其他任何问题。
最后,关于驻留内存大小,Scala有一个9 MB的库(除了JVM带来的),这可能是您看到的。当然,如果您在Java中使用单个并发映射而不是在Scala中使用6个不可变映射,则这肯定会对内存使用模式产生重大影响。

Daniel,非常感谢您的详细描述。就像我说的,我在这里是一个完全的新手。我几乎可以确定Scala应用程序中有更多的线程。我在运行Linux机器上,拥有8个CPU,每个CPU有4个核心。Top报告称Scala应用程序正在使用约3100%的CPU(31个核心几乎100%,无论actor的数量如何),而Java应用程序最多使用100%*线程数)。关于内存问题,Scala应用程序很容易达到8.2 GB的驻留大小,而Java应用程序的峰值不到一半。 - FuriousGeorge
请查看我的更新问题,其中包含我用于比较的Java代码。我没有在线程之间使用共享的Concurrent Mutable Map,因为我希望实现更类似于我在Scala中尝试的内容。 - FuriousGeorge
虽然我同意这是一个更加并行的任务,但我仍然认为我的原始问题仍然有效。为什么Scala版本在系统资源方面效率如此低下?即使只有1个解析actor,Scala应用程序也会利用机器的31个核心,并且比使用2个核心的Java版本慢了约6倍。所有这些线程都在做什么?如果传递消息的开销真的很高,那么这种模型似乎只对极大规模的分布式系统有用。我感觉我在这里遗漏了某些东西,或者在我的Scala应用程序中做错了什么。 - FuriousGeorge
@FuriousGeorge 如果只使用一个解析器,就占用了31个核心,那么肯定有很大的问题。我已经评论了我所看到的所有内容,但是我在等待Java版本以进一步研究此问题。我会在有更多时间的时候进行研究。 - Daniel C. Sobral
@FuriousGeorge 你说钥匙很少,所以地图重用率会很低。虽然我没想到会出现OOM错误。 - Daniel C. Sobral
显示剩余2条评论

-1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接