在Java中进行位操作——将长整型分解为长整型数组的位掩码

5

我正在将一个长整型分解为由单个位长的长整型组成的long[]数组

public static int decompose(long[] buffer, long base) {
  int count = Long.bitCount(base);
  for (int i=0; i<count; i++) {
    base ^= (buffer[i] = Long.lowestOneBit(base));
  }
  return count;
}

但我感觉可能有更快的方法来完成这个任务,因为似乎有一些重复的步骤。例如,计算位数应该已经非常接近获取填充结果所需的所有信息。

有什么建议吗?我熟悉“过早优化”的口号,因此我不会在我的时间上再推进我的解决方案,但也许其他人以前见过这种情况或者沉迷于优化……编辑:请通过Mark提供的测试工具运行任何建议。我非常惊讶我的第一个想法实际上是可行的。

测试代码,它在JUnit方法内部:

Random rng = new Random();
long size = 0;
long[] hold = new long[Long.SIZE];
System.out.println("init:"+Long.toBinaryString(BitTwiddling.bitmask(rng.nextInt(Long.SIZE)))); //initialize BitTwiddling internals
long start = System.currentTimeMillis();
for (int i=0; i<compareIterations; i++) size+= BitTwiddling.decompose(hold,rng.nextInt(Integer.MAX_VALUE));
long delta = System.currentTimeMillis() - start;
double base = (double)size/delta;

size = 0;
start = System.currentTimeMillis();
for (int i=0; i<compareIterations; i++) size += BitTwiddling.decomposeAlt(hold, rng.nextInt(Integer.MAX_VALUE));
long delta2 = System.currentTimeMillis() - start;
double base2 = (double)size/delta2;

然后比较base和base2。

你能发布你的测试工具吗? - Mark Peters
@Mark:编辑中;BitTwiddling是静态实用程序类,具有分解方法。 - Carl
这不是一个很好的测试工具。它没有隔离应该被隔离的很多东西。它也没有连续运行它们多次以忽略热点编译的影响。看看我发布的那个。 - Mark Peters
此外,在测试类中设置compareIterations = Integer.MAX_VALUE >> 2 - Carl
5个回答

3

经过长时间的尝试,我终于找到了一段代码,通过无耻地盗用你的代码并进行微调,在我的环境中始终能够击败你的原始代码。

    protected int decompose(long[] buffer, long base) {
        int count = Long.bitCount(base);
        for (int i = 0; i < count; i++) {
            base -= (buffer[i] = Long.lowestOneBit(base));
        }
        return count;
    }

唯一的区别:使用减法而不是异或。我认为没有任何边缘情况;我肯定没有发现任何问题。不知道为什么这些不会被优化成相同的东西(假设没有边缘情况),但这确实使我的更快(再次在我的机器上)。我已经将此版本添加到测试答案中。
编辑,我猜它不能被优化的原因是编译器不知道操作数始终小于基数(因为我们从低位到高位)。
编辑#2,在设置了-server标志的情况下,卡尔的速度比我的稍微快一点。
编辑#3,没错,当在64位JVM上运行时,这确实更快:
    protected int decompose(long[] buffer, long base) {
        int count = 0;
        while ( 0 != (base -= (buffer[count++] = Long.lowestOneBit(base))));
        return count;
    }

(或等效的异或版本),因为它可以使用本地处理器操作执行64位比较。

我试着将我的代码优化了一下,通过简化for循环语句,使程序的执行速度得到了显著提高(比原来快了几十毫秒),尽管我不知道为什么。以下是代码示例:for (int i=0; i<count; base ^= (buffer[i++] = Long.lowestOneBit(base))); 我相信对于-=操作符同样适用。 - Carl
没有抄袭的羞耻感 - 这正是我希望通过我的问题来做的! - Carl
@Carl:在服务器模式下似乎没有什么区别。我也试图找出为什么使用Long.bitCount()预先计算计数比使用while ( 0 != (base -= (buffer[count++] = Long.lowestOneBitSet(base))) );更好。最后发现这是因为后者需要长比较,而你的需要整数比较。没想到这会弥补不必要的计算。 - Mark Peters
@Carl:是的,我确认了我上面的评论,并发现在64位JVM上放弃Long.bitCount()调用可以节省时间,因为长64位比较可以在64位架构上本地执行。 - Mark Peters
啊,这很有道理——我猜这使得 @mikera 的代码变得更快了(因为你最后一次编辑时使用了库调用而不是直接计算最低位的 base & (-base))?对于 32 位和 64 位 JVM 的学习经验非常棒。 - Carl
显示剩余3条评论

2

既然我喜欢优化*,这里提供了一个你可以尝试的版本:

public static int decompose(long[] buffer, long base) {
    int count = 0;
    while (base != 0) {
        long mask = base & (-base);
        base &= ~mask;
        buffer[count++] = mask;
    }
    return count;
}

我所做的主要事情包括:

  • 内联最低位计算,避免方法调用开销。在编译器/JVM没有足够聪明地为您执行此操作的(罕见但可能的)情况下,这可能是一种胜利...
  • 将长整型数组作为输入参数传递,以避免内存分配和需要计算位数以确定数组大小。函数现在返回找到的位掩码数量。
  • 在进行过程中清零位,以便您可以尽早从循环中退出

*因为这很有趣,无论是否为时过早


经过一些比较(为了让原始数据更具可比性而消除了缓冲区分配),我得出的答案几乎与你的答案相同。消除分配可以提高速度约五分之一,但只有在可以重复使用缓冲区时才有用。 - Carl
此外,将while循环转换为单个语句(while ((base &= ~(buffer[count++] = base & (-base))) != 0);)似乎可以提高性能(在运行该方法一分钟后,性能提高了几个百分点)。也许是因为mask赋值被消除了? - Carl

2

这是我目前正在使用的工具。当前,Mark Peters的答案通常是最快的,但使用-server标志的话,Carl的更快。另外,mikera在服务器模式下表现得更具有竞争力(虽然仍然比Carl/Mark的慢)。

import java.util.Random;

public abstract class Harness {
    public static void main(String[] args) {
        while (true) {
            long[] randomData = new long[4096];
            Random r = new Random();
            for (int i = 0; i < randomData.length; i++) {
                randomData[i] = r.nextLong();
            }
            long[] buffer = new long[64];

            Harness[] versions = new Harness[] {
                    new Control(randomData, buffer),
                    new Carl(randomData, buffer),
                    new MarkPeters(randomData, buffer),
                    new MarkPetersServer64Bit(randomData, buffer),
//                    new Rsp1(randomData, buffer), 
                    new Rsp2(randomData, buffer),
                    new Mikera(randomData, buffer)
                    };
            for (Harness v : versions) {
                v.doTest();
            }
        }
    }

    private final long[] buffer;
    private final long[] randomData;

    protected Harness(long[] randomData, long[] buffer) {
        this.randomData = randomData;
        this.buffer = buffer;
    }

    public void doTest() {
        long start = System.nanoTime();
        long count = 0;
        for (int times = 0; times < 1000; times++) {
            for (int i = 0; i < randomData.length; i++) {
                count = decompose(buffer, randomData[i]);
            }
        }
        long end = System.nanoTime();

        //verify decomposition of last item
        long l = 0;
        for ( int i = 0; i < count; i++ ) {
            l |= buffer[i];
        }

        System.out.println(getClass().getSimpleName() + " took " + (end - start)
                / 1000000.0 + " ms - last base: " + l);
    }

    protected abstract int decompose(long[] buffer, long base);

    private static class Control extends Harness {
        protected Control(long[] randomData, long[] buffer) {
            super(randomData, buffer);
        }

        @Override
        protected int decompose(long[] buffer, long base) {
            return 0;
        }
    }

    private static class Carl extends Harness {
        protected Carl(long[] randomData, long[] buffer) {
            super(randomData, buffer);
        }

        @Override
        protected int decompose(long[] buffer, long base) {
            final int count = Long.bitCount(base);
            for (int i = 0; i < count; i++) {
                base ^= (buffer[i] = Long.lowestOneBit(base));
            }
            return count;
        }
    }

    private static class Mikera extends Harness {
        protected Mikera(long[] randomData, long[] buffer) {
            super(randomData, buffer);
        }

        @Override
        protected int decompose(long[] buffer, long base) {
            int count = 0;
            while (base != 0) {
                long mask = base & (-base);
                base &= ~mask;
                buffer[count++] = mask;
            }
            return count;
        }
    }

    private static class Rsp1 extends Harness {
        protected Rsp1(long[] randomData, long[] buffer) {
            super(randomData, buffer);
        }

        @Override
        protected int decompose(long[] buffer, long base) {
            int count = 0;

            if (0 != (base & 1)) {
                buffer[count++] = 1;
            }

            base >>>= 1;

            for (long bit = 1; 0 < bit && bit <= base; ) {
                if (0 < (base & bit)) {
                    buffer[count++] = (bit <<= 1);
                }
            }
            return count;
        }
    }

    private static class Rsp2 extends Harness {
        protected Rsp2(long[] randomData, long[] buffer) {
            super(randomData, buffer);
        }

        @Override
        protected int decompose(long[] buffer, long base) {
            int count = 0;

            for (long bit = 1; 0 != base; bit <<= 1, base >>>= 1) {
                if (0 < (base & 1)) {
                    buffer[count++] = bit;
                }
            }
            return count;
        }
    }

    private static class MarkPeters extends Harness {
        protected MarkPeters(long[] randomData, long[] buffer) {
            super(randomData, buffer);
        }

        @Override
        protected int decompose(long[] buffer, long base) {
            int count = Long.bitCount(base);
            for (int i = 0; i < count; i++) {
                base -= (buffer[i] = Long.lowestOneBit(base));
            }
            return count;

        }
    }

    private static class MarkPetersServer64Bit extends Harness {
        protected MarkPetersServer64Bit(long[] randomData, long[] buffer) {
            super(randomData, buffer);
        }

        @Override
        protected int decompose(long[] buffer, long base) {
            int count = 0;
            while ( 0 != (base ^= (buffer[count++] = Long.lowestOneBit(base))));
            return count;
        }
    }
}

样例输出

哪种方法最好取决于情况。

非服务器,32位JVM

Control took 41.175272 ms - last base: 0
Carl took 691.966919 ms - last base: 5852835112840111303
MarkPeters took 642.230253 ms - last base: 5852835112840111303
MarkPetersServer64Bit took 742.594626 ms - last base: 5852835112840111303
Rsp2 took 3886.203787 ms - last base: 5852835112840111303
Mikera took 1044.451494 ms - last base: 5852835112840111303

获胜者:MarkPeters

服务器,32位JVM

Control took 2.354383 ms - last base: 0
Carl took 508.687401 ms - last base: 338317437500027646
MarkPeters took 521.831297 ms - last base: 338317437500027646
MarkPetersServer64Bit took 727.052206 ms - last base: 338317437500027646
Rsp2 took 3811.75662 ms - last base: 338317437500027646
Mikera took 665.252599 ms - last base: 338317437500027646

获胜者: 卡尔

服务器(隐式或显式),64位JVM

Control took 0.007186 ms - last base: 0
Carl took 543.701859 ms - last base: -8898262206218882664
MarkPeters took 439.706079 ms - last base: -8898262206218882664
MarkPetersServer64Bit took 391.831055 ms - last base: -8898262206218882664
Rsp2 took 1861.40449 ms - last base: -8898262206218882664
Mikera took 435.964319 ms - last base: -8898262206218882664

胜者: MarkPetersServer64Bit

注意:据我所知,目前没有可在非服务器模式下运行的可比较的64位JVM。请参见这里

64位Java中是否同时支持-client和-server VM模式?

目前只有Java HotSpot Server VM支持64位操作,并且使用-d64时,-server选项是隐含的。这可能会在将来的版本中更改。


感谢您发布这个问题;我有一个问题 - 由于buffer从未被读取(只被覆盖),编译器是否会优化掉它的任何副作用?还是通过与控制方法进行比较来解决? - Carl
@Carl:我做了一些更改,包括在最后一个小迭代之后读取缓冲区,并在大迭代之间随机化数据。我认为现在没有什么可以被合理地优化掉了。是的,这个控制是一个很好的指示,说明没有什么可以被优化掉。 - Mark Peters

1
我会使用一个掩码,在每个循环中通过移位运算符计算:
for (int i= 0; i < result.length; i++)
    result[i]= base & (1<<i);

应该既清晰又快速。


这对于base中任意位位置都不起作用(例如,100101101的result长度为5)。 - Carl

1

这个版本怎么样?

public static int decompose(long[] buffer, long base) {
    int count = 0;

    if (0 != (base & 1)) {
        buffer[count++] = 1;
    }

    base >>>= 1;

    for (long bit = 1; 0 < bit && bit <= base; ) {
        if (0 < (base & bit)) {
            buffer[count++] = (bit <<= 1);
        }
    }
    return count;
}

在这种情况下,基数的符号位会使结束条件变得复杂,因此我们通过在开始时“调整”基数值来防止发生这种情况。

或者,我们可以将基数本身进行移位,而不是检查掩码与基数的比较:

public static int decompose(long[] buffer, long base) {
    int count = 0;

    for (long bit = 1; 0 != base; bit <<= 1, base >>>= 1) {
        if (0 < (base & 1)) {
            buffer[count++] = bit;
        }
    }
    return count;
}

我会把这个加入测试,但它似乎永远也不会结束……除非它真的很糟糕。我认为你一定是有一个无限循环或者类似的问题。 - Mark Peters
@Mark Peters,是的,你是正确的;对于最高位设置且缓冲区未耗尽的情况,第一个版本不会终止...第二个版本没有这个问题。我将编辑第一个版本以使其终止。 - rsp
@Carl:我已经将它添加到测试中,但正如你在输出中看到的那样,有时会得到错误的结果。 - Mark Peters
@Carl,最新版本中符号位不应该是问题。 - rsp
@rsp 第一个似乎仍然存在准确性问题;第二个返回正确的结果。 - Mark Peters
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接