让我先说一下,我对Scala还比较新手。然而,我觉得基于Actor的并发模型很有趣,所以我尝试用它来实现一个相对简单的应用程序。但是,我遇到的问题是,尽管我能让应用程序工作,但结果的效率(以实际时间、CPU时间和内存使用情况为衡量标准)远不如使用线程从ArrayBlockingQueue中获取消息的等效Java解决方案。我想了解其中的原因。我怀疑这很可能是由于我对Scala的了解不够,导致所有的低效率,但是在多次尝试重新设计应用程序失败后,我决定向社区寻求帮助。
我的问题是: 我有一个格式为gzipped的文件,其中包含许多行,格式如下:
SomeID 逗号分隔的值列表
例如:
1234 12,45,82
我想解析每一行,并统计逗号分隔列表中每个值出现的总次数。
这个文件可能相当大(压缩后几个GB),但每个文件中唯一值的数量相对较小(最多500个)。我认为这是一个很好的机会,尝试编写一个基于Actor的并发Scala应用程序。我的解决方案涉及一个创建解析器Actor池的主驱动程序。主驱动程序然后从stdin读取行,将行传递给解析器Actor,解析器Actor解析行并保留值的本地计数。当主驱动程序读取了最后一行时,它向每个Actor发送一个消息,指示所有行都已读取。当Actor接收到“完成”消息时,它们将其计数传递给汇总器,汇总器对所有Actor的计数求和。一旦来自所有解析器的计数已经被聚合,主驱动程序就会打印出统计信息。
问题是: 我遇到的主要问题是这个应用程序的效率非常低下。它使用的CPU和内存远远超过使用线程和ArrayBlockingQueue的“等效”Java应用程序。为了让你了解情况,以下是我针对1000万行测试输入文件收集的一些统计数据:
Scala 1 Actor(解析器):
此外,顶级报告称,Scala应用程序的常驻内存大小约为10倍。因此,我们谈论的是数量级更高的CPU和内存,性能也差了数量级,我无法确定是什么原因导致了这种情况。是GC问题还是我创建了比我意识到的对象副本还要多的东西?
以下是一些可能重要的详细信息:
- Scala应用程序被Java类包装,以便我可以提供自包含的可执行JAR文件(我没有在可能要运行该应用程序的每台机器上都有Scala jars)。 - 应用程序的调用方式如下:gunzip -c gzFilename | java -jar StatParser.jar 以下是代码:
主驱动程序:
解析器Actor:
聚合器Actor:
非常感谢能提供帮助理解这里正在发生什么的任何帮助。
提前致谢!
---- 编辑 ---
由于有很多人回复并要求Java代码,这里是我为了比较而创建的简单Java应用程序。我意识到这不是很好的Java代码,但是当我看到Scala应用程序的性能时,我只是迅速编写了一些东西来查看基于线程的Java实现将作为基准表现如何:
解析线程:
我的问题是: 我有一个格式为gzipped的文件,其中包含许多行,格式如下:
SomeID 逗号分隔的值列表
例如:
1234 12,45,82
我想解析每一行,并统计逗号分隔列表中每个值出现的总次数。
这个文件可能相当大(压缩后几个GB),但每个文件中唯一值的数量相对较小(最多500个)。我认为这是一个很好的机会,尝试编写一个基于Actor的并发Scala应用程序。我的解决方案涉及一个创建解析器Actor池的主驱动程序。主驱动程序然后从stdin读取行,将行传递给解析器Actor,解析器Actor解析行并保留值的本地计数。当主驱动程序读取了最后一行时,它向每个Actor发送一个消息,指示所有行都已读取。当Actor接收到“完成”消息时,它们将其计数传递给汇总器,汇总器对所有Actor的计数求和。一旦来自所有解析器的计数已经被聚合,主驱动程序就会打印出统计信息。
问题是: 我遇到的主要问题是这个应用程序的效率非常低下。它使用的CPU和内存远远超过使用线程和ArrayBlockingQueue的“等效”Java应用程序。为了让你了解情况,以下是我针对1000万行测试输入文件收集的一些统计数据:
Scala 1 Actor(解析器):
real 9m22.297s
user 235m31.070s
sys 21m51.420s
Java 1线程(解析器):
real 1m48.275s
user 1m58.630s
sys 0m33.540s
Scala 5个Actor:
real 2m25.267s
user 63m0.730s
sys 3m17.950s
Java 5 线程:
real 0m24.961s
user 1m52.650s
sys 0m20.920s
此外,顶级报告称,Scala应用程序的常驻内存大小约为10倍。因此,我们谈论的是数量级更高的CPU和内存,性能也差了数量级,我无法确定是什么原因导致了这种情况。是GC问题还是我创建了比我意识到的对象副本还要多的东西?
以下是一些可能重要的详细信息:
- Scala应用程序被Java类包装,以便我可以提供自包含的可执行JAR文件(我没有在可能要运行该应用程序的每台机器上都有Scala jars)。 - 应用程序的调用方式如下:gunzip -c gzFilename | java -jar StatParser.jar 以下是代码:
主驱动程序:
import scala.actors.Actor._
import scala.collection.{ immutable, mutable }
import scala.io.Source
class StatCollector (numParsers : Int ) {
private val parsers = new mutable.ArrayBuffer[StatParser]()
private val aggregator = new StatAggregator()
def generateParsers {
for ( i <- 1 to numParsers ) {
val parser = new StatParser( i, aggregator )
parser.start
parsers += parser
}
}
def readStdin {
var nextParserIdx = 0
var lineNo = 1
for ( line <- Source.stdin.getLines() ) {
parsers( nextParserIdx ) ! line
nextParserIdx += 1
if ( nextParserIdx >= numParsers ) {
nextParserIdx = 0
}
lineNo += 1
}
}
def informParsers {
for ( parser <- parsers ) {
parser ! true
}
}
def printCounts {
val countMap = aggregator.getCounts()
println( "ID,Count" )
/*
for ( key <- countMap.keySet ) {
println( key + "," + countMap.getOrElse( key, 0 ) )
//println( "Campaign '" + key + "': " + countMap.getOrElse( key, 0 ) )
}
*/
countMap.toList.sorted foreach {
case (key, value) =>
println( key + "," + value )
}
}
def processFromStdIn {
aggregator.start
generateParsers
readStdin
process
}
def process {
informParsers
var completedParserCount = aggregator.getNumParsersAggregated
while ( completedParserCount < numParsers ) {
Thread.sleep( 250 )
completedParserCount = aggregator.getNumParsersAggregated
}
printCounts
}
}
解析器Actor:
import scala.actors.Actor
import collection.mutable.HashMap
import scala.util.matching
class StatParser( val id: Int, val aggregator: StatAggregator ) extends Actor {
private var countMap = new HashMap[String, Int]()
private val sep1 = "\t"
private val sep2 = ","
def getCounts(): HashMap[String, Int] = {
return countMap
}
def act() {
loop {
react {
case line: String =>
{
val idx = line.indexOf( sep1 )
var currentCount = 0
if ( idx > 0 ) {
val tokens = line.substring( idx + 1 ).split( sep2 )
for ( token <- tokens ) {
if ( !token.equals( "" ) ) {
currentCount = countMap.getOrElse( token, 0 )
countMap( token ) = ( 1 + currentCount )
}
}
}
}
case doneProcessing: Boolean =>
{
if ( doneProcessing ) {
// Send my stats to Aggregator
aggregator ! this
}
}
}
}
}
}
聚合器Actor:
import scala.actors.Actor
import collection.mutable.HashMap
class StatAggregator extends Actor {
private var countMap = new HashMap[String, Int]()
private var parsersAggregated = 0
def act() {
loop {
react {
case parser: StatParser =>
{
val cm = parser.getCounts()
for ( key <- cm.keySet ) {
val currentCount = countMap.getOrElse( key, 0 )
val incAmt = cm.getOrElse( key, 0 )
countMap( key ) = ( currentCount + incAmt )
}
parsersAggregated += 1
}
}
}
}
def getNumParsersAggregated: Int = {
return parsersAggregated
}
def getCounts(): HashMap[String, Int] = {
return countMap
}
}
非常感谢能提供帮助理解这里正在发生什么的任何帮助。
提前致谢!
---- 编辑 ---
由于有很多人回复并要求Java代码,这里是我为了比较而创建的简单Java应用程序。我意识到这不是很好的Java代码,但是当我看到Scala应用程序的性能时,我只是迅速编写了一些东西来查看基于线程的Java实现将作为基准表现如何:
解析线程:
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class JStatParser extends Thread
{
private ArrayBlockingQueue<String> queue;
private Map<String, Integer> countMap;
private boolean done;
public JStatParser( ArrayBlockingQueue<String> q )
{
super( );
queue = q;
countMap = new Hashtable<String, Integer>( );
done = false;
}
public Map<String, Integer> getCountMap( )
{
return countMap;
}
public void alldone( )
{
done = true;
}
@Override
public void run( )
{
String line = null;
while( !done || queue.size( ) > 0 )
{
try
{
// line = queue.take( );
line = queue.poll( 100, TimeUnit.MILLISECONDS );
if( line != null )
{
int idx = line.indexOf( "\t" ) + 1;
for( String token : line.substring( idx ).split( "," ) )
{
if( !token.equals( "" ) )
{
if( countMap.containsKey( token ) )
{
Integer currentCount = countMap.get( token );
currentCount++;
countMap.put( token, currentCount );
}
else
{
countMap.put( token, new Integer( 1 ) );
}
}
}
}
}
catch( InterruptedException e )
{
// TODO Auto-generated catch block
System.err.println( "Failed to get something off the queue: "
+ e.getMessage( ) );
e.printStackTrace( );
}
}
}
}
驱动程序:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
public class JPS
{
public static void main( String[] args )
{
if( args.length <= 0 || args.length > 2 || args[0].equals( "-?" ) )
{
System.err.println( "Usage: JPS [filename]" );
System.exit( -1 );
}
int numParsers = Integer.parseInt( args[0] );
ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>( 1000 );
List<JStatParser> parsers = new ArrayList<JStatParser>( );
BufferedReader reader = null;
try
{
if( args.length == 2 )
{
reader = new BufferedReader( new FileReader( args[1] ) );
}
else
{
reader = new BufferedReader( new InputStreamReader( System.in ) );
}
for( int i = 0; i < numParsers; i++ )
{
JStatParser parser = new JStatParser( q );
parser.start( );
parsers.add( parser );
}
String line = null;
while( (line = reader.readLine( )) != null )
{
try
{
q.put( line );
}
catch( InterruptedException e )
{
// TODO Auto-generated catch block
System.err.println( "Failed to add line to q: "
+ e.getMessage( ) );
e.printStackTrace( );
}
}
// At this point, we've put everything on the queue, now we just
// need to wait for it to be processed.
while( q.size( ) > 0 )
{
try
{
Thread.sleep( 250 );
}
catch( InterruptedException e )
{
}
}
Map<String,Integer> countMap = new Hashtable<String,Integer>( );
for( JStatParser jsp : parsers )
{
jsp.alldone( );
Map<String,Integer> cm = jsp.getCountMap( );
for( String key : cm.keySet( ) )
{
if( countMap.containsKey( key ))
{
Integer currentCount = countMap.get( key );
currentCount += cm.get( key );
countMap.put( key, currentCount );
}
else
{
countMap.put( key, cm.get( key ) );
}
}
}
System.out.println( "ID,Count" );
for( String key : new TreeSet<String>(countMap.keySet( )) )
{
System.out.println( key + "," + countMap.get( key ) );
}
for( JStatParser parser : parsers )
{
try
{
parser.join( 100 );
}
catch( InterruptedException e )
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.exit( 0 );
}
catch( IOException e )
{
System.err.println( "Caught exception: " + e.getMessage( ) );
e.printStackTrace( );
}
}
}
ArrayBlockingQueue
中获取数据的Scala代码吗?发送消息给actor的开销确实比线程在ArrayBlockingQueue
上等待要高,但你可能会观察到Java和Scala代码效率上的差异。Scala的效率看起来还不错,但我不确定你在Java中做了什么。例如,Java哈希映射比Scala的快--这种差异完全可以归因于不可变Scala映射操作和可变Java映射操作的速度差异。 - Rex Kerrscala.Map
,然后将地图从解析器发送到聚合器,并删除那些getCount
方法--它们明显破坏了演员基础知识。然后我猜性能上存在两个缺陷:可能粒度太高(每行一个消息),其次没有控制每个演员的负载平衡。他们应该都使用相同的邮箱。听起来更像是并行集合(map/reduce)的工作。 - 0__