How to quickly subtract one ushort array from another in C#?
Asked Answered
P

6

5

I need to quickly subtract each value in ushort arrayA from the corresponding index value in ushort arrayB which has an identical length.

In addition, if the difference is negative, I need to store a zero, not the negative difference.

(Length = 327680 to be exact, since I'm subtracting a 640x512 image from another image of identical size).

The code below is currently taking ~20ms and I'd like to get it down under ~5ms if possible. Unsafe code is ok, but please provide an example, as I'm not super-skilled at writing unsafe code.

Thank you!

public ushort[] Buffer { get; set; }

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
    sw.Start();

    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}

UPDATE: While it's not strictly C#, for the benefit of others who read this, I finally ended up adding a C++ CLR Class Library to my solution with the following code. It runs in ~3.1ms. If an unmanaged C++ library is used, it runs in ~2.2ms. I decided to go with the managed library since the time difference was small.

// SpeedCode.h
#pragma once
using namespace System;

namespace SpeedCode
{
    public ref class SpeedClass
    {
        public:
            static void SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength);
    };
}

// SpeedCode.cpp
// This is the main DLL file.
#include "stdafx.h"
#include "SpeedCode.h"

namespace SpeedCode
{
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength)
    {
        for (int index = 0; index < bufferLength; index++)
        {
            buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index]));
        }
    }
}

Then I call it like this:

    public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
    {
        System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length);

        Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
    }
Pulmonary answered 16/1, 2014 at 0:55 Comment(6)
~20ms sounds rather slow (maybe your machine is low spec?). Just in case, are you running a release build without debugging?Maelstrom
p/invoke and use PSUBW?Aecium
Out of interest but are you operating on a greyscale image?Raspberry
@GeoffBattye: My machine is Win7 64-bit i5. I was running a debug build.Pulmonary
@nick_w: Yes, I'm operating on grayscale images.Pulmonary
I've updated my answer with a new faster method that uses C++/CLI.Maelstrom
R
5

Some benchmarks.

  1. SubtractBackgroundFromBuffer: this is the original method from the question.
  2. SubtractBackgroundFromBufferWithCalcOpt: this is the original method augmented with TTat's idea for improving the calculation speed.
  3. SubtractBackgroundFromBufferParallelFor: the solution from Selman22's answer.
  4. SubtractBackgroundFromBufferBlockParallelFor: my answer. Similar to 3., but breaks the processing up into blocks of 4096 values.
  5. SubtractBackgroundFromBufferPartitionedParallelForEach: Geoff's first answer.
  6. SubtractBackgroundFromBufferPartitionedParallelForEachHack: Geoff's second answer.

Updates

Interestingly, I can get a small speed increase (~6%) for SubtractBackgroundFromBufferBlockParallelFor by using (as suggested by Bruno Costa)

Buffer[i] = (ushort)Math.Max(difference, 0);

instead of

if (difference >= 0)
    Buffer[i] = (ushort)difference;
else
    Buffer[i] = 0;

Results

Note that this is the total time for the 1000 iterations in each run.

SubtractBackgroundFromBuffer(ms):                                 2,062.23 
SubtractBackgroundFromBufferWithCalcOpt(ms):                      2,245.42
SubtractBackgroundFromBufferParallelFor(ms):                      4,021.58
SubtractBackgroundFromBufferBlockParallelFor(ms):                   769.74
SubtractBackgroundFromBufferPartitionedParallelForEach(ms):         827.48
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):     539.60

So it seems from those results that the best approach combines the calculation optimizations for a small gain and the makes use of Parallel.For to operate on chunks of the image. Your mileage will of course vary, and performance of parallel code is sensitive to the CPU you are running.

Test Harness

I ran this for each method in Release mode. I am starting and stopping the Stopwatch this way to ensure only processing time is measured.

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447);

for (int i = 0; i < 1000; i++)
{
    Buffer = GenerateRandomBuffer(327680, 128011992);                

    sw.Start();
    SubtractBackgroundFromBuffer(bgImg);
    sw.Stop();
}

Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));


public static ushort[] GenerateRandomBuffer(int size, int randomSeed)
{
    ushort[] buffer = new ushort[327680];
    Random random = new Random(randomSeed);

    for (int i = 0; i < size; i++)
    {
        buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue);
    }

    return buffer;
}

The Methods

public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }
}

public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        if (Buffer[index] < backgroundBuffer[index])
        {
            Buffer[index] = 0;
        }
        else
        {
            Buffer[index] -= backgroundBuffer[index];
        }
    }
}

public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer)
{
    Parallel.For(0, Buffer.Length, (i) =>
    {
        int difference = Buffer[i] - backgroundBuffer[i];
        if (difference >= 0)
            Buffer[i] = (ushort)difference;
        else
            Buffer[i] = 0;
    });
}        

public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer)
{
    int blockSize = 4096;

    Parallel.For(0, (int)Math.Ceiling(Buffer.Length / (double)blockSize), (j) =>
    {
        for (int i = j * blockSize; i < (j + 1) * blockSize; i++)
        {
            int difference = Buffer[i] - backgroundBuffer[i];

            Buffer[i] = (ushort)Math.Max(difference, 0);                    
        }
    });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            unsafe
            {
                var nonNegative = Buffer[i] > backgroundBuffer[i];
                Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                    *((int*)(&nonNegative)));
            }
        }
    });
}
Raspberry answered 16/1, 2014 at 1:53 Comment(6)
@BrunoCosta 1. I'm not sure I get your meaning. What do you mean by 'partitioned again'? 2. What makes you think this isn't operating on the whole array? Blocksize was a somewhat arbitrary choice that is perhaps worth further benchmarking on.Raspberry
I totally miss spotted the code... Still I had a believe that Parallel.Foreach would make a partition it self. By a partition I mean that many threads could be assigned to process your 4096 block. But maybe I'm mistaken..Nicki
@BrunoCosta Looking at the documentation, the delegate you pass into Parallel.For is executed once per iteration. I think from that we can be confident that each iteration is going to be run in a single thread.Raspberry
You should also try storing a reference to Buffer in a local variable (in the parallel versions, do this inside the delegate). AFAIK the optimizer doesn't do this for you because it's possible that the value of Buffer is changed by another thread. Also, if the JIT isn't sure that the array instance doesn't change, it can't eliminate redundant bounds checks.Rizzi
@Rizzi I just tried this, i.e., placing the code var localBuffer = Buffer; in the Parallel.For delegate, and for whatever reason the code ran considerably slower. Strange.Raspberry
Thanks to all of you for your great suggestions! I wish I could mark several of them as the solution, as many folks contributed to the overall solution. As it stands, I have to give the solution to nick_w for his test harness, listing of the various methods, benchmarking and overall completeness. Using the Parallel.ForEach with the Math.Max, I dropped the average execution time to ~9ms. Thanks to all!Pulmonary
M
5

This is an interesting question.

Only performing the subtraction after testing that the result won't be negative (as suggested by TTat and Maximum Cookie) has negligible impact, as this optimization can already be performed by the JIT compiler.

Parallelizing the task (as suggested by Selman22) is a good idea, but when the loop is as fast as it is in this case, the overhead ends up outwaying the gains, so Selman22's implementation actually runs slower in my testing. I suspect that nick_w's benchmarks were produced with debugger attached, hiding this fact.

Parallelizing the task in larger chunks (as suggested by nick_w) deals with the overhead problem, and can actually produce faster performance, but you don't have to calculate the chunks yourself - you can use Partitioner to do this for you:

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

The above method consistently outperforms nick_w's hand-rolled chunking in my testing.

But wait! There's more to it than that.

The real culprit in slowing down your code is not the assignment or arithmetic. It's the if statement. How it affects performance is going to be majorly impacted by the nature of the data you are processing.

nick_w's benchmarking generates random data of the same magnitude for both buffers. However, I suspect that it is very likely that you actually have lower average magnitude data in the background buffer. This detail can be significant due to branch prediction (as explained in this classic SO answer).

When the value in the background buffer is usually smaller than that in the buffer, the JIT compiler can notice this, and optimize for that branch accordingly. When the data in each buffer is from the same random population there is no way to guess the outcome of the if statement with greater than 50% accuracy. It is this latter scenario that nick_w is benchmarking, and under those conditions we could potentially further optimize your method by using unsafe code to convert a bool to an integer and avoid branching at all. (Note that the following code is relying on an implementation detail of how bool's are represented in memory, and while it works for your scenario in .NET 4.5, it is not necessarily a good idea, and is shown here for illustrative purposes.)

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                unsafe
                {
                    var nonNegative = Buffer[i] > backgroundBuffer[i];
                    Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                        *((int*)(&nonNegative)));
                }
            }
        });
}

If you are really looking to shave a bit more time off, then you can follow this approach in a safer manner by switching language to C++/CLI, as that will let you use a boolean in an arithmetic expression without resorting to unsafe code:

UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend)
{
    return (UInt16)((minuend - subtrahend) * (minuend > subtrahend));
}

You can create a purely managed DLL using C++/CLI exposing the above static method, and then use it in your C# code:

public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            Buffer[i] = 
                MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]);
        }
    });
}

This outperforms the hacky unsafe C# code above. In fact, it is so fast that you could write the whole method using C++/CLI forgetting about parallelization, and it would still out-perform the other techniques.

Using nick_w's test harness, the above method will outperform any of the other suggestions published here so far. Here are the results I get (1-4 are the cases he tried, and 5-7 are the ones outlined in this answer):

1. SubtractBackgroundFromBuffer(ms):                               2,021.37
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                    2,125.80
3. SubtractBackgroundFromBufferParallelFor(ms):                    3,431.58
4. SubtractBackgroundFromBufferBlockParallelFor(ms):               1,401.36
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):     1,197.76
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   742.72
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    499.27

However, in the scenario I expect you actually have, where background values are typically smaller, successful branch prediction improves results across the board, and the 'hack' to avoid the if statement is actually slower:

Here are the results I get using nick_w's test harness when I restrict values in the background buffer to the range 0-6500 (c. 10% of the buffer):

1. SubtractBackgroundFromBuffer(ms):                                 773.50
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                      915.91
3. SubtractBackgroundFromBufferParallelFor(ms):                    2,458.36
4. SubtractBackgroundFromBufferBlockParallelFor(ms):                 663.76
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):       658.05
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   762.11
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    494.12

You can see that results 1-5 have all dramatically improved since they are now benefiting from better branch prediction. Results 6 & 7 haven't changed much, since they have avoided branching.

This change in data has completely changes things. In this scenario, even the fastest all C# solution is now only 15% faster than your original code.

Bottom line: be sure to test any method you pick with representative data, or your results will be meaningless.

Maelstrom answered 16/1, 2014 at 8:51 Comment(7)
You're casting a bool* to an int* (invalid) and are assuming things about the numeric value of a true bool (invalid - is not guaranteed to be one). I like the general idea of the branch avoidance though.Taurine
@Taurine Yeah, that code does work, but you're right that it's not a good idea to rely on this implementation detail - I'll clarify that. I suspect that this hack will actually be slower with the OP's data anyway, as I say in the answer.Maelstrom
@GeoffBattye: Thank you for your benchmarking and great comments! I wish I could also mark your answer as a solution! By the way, the backgroundBuffer will almost always have smaller values than the Buffer, so hopefully the JIT compiler will notice and optimize appropriately, as you said.Pulmonary
@Taurine I've now added a safe version of the branch avoidance technique using C++/CLI.Maelstrom
Interesting result. This is still managed and JIT'ed, right? I wonder what instructions were emitted that make this so much faster.Taurine
@Taurine Yes, it's pure managed code, but compiling the C++/CLI with /clr:pure allows compilation of native types into IL. The crucial difference is the use of the IL instruction cgt.un instead of branching.Maelstrom
Good to know. The .NET JIT is incapable of doing that transformation itself and the C++ compiler is helping out.Taurine
C
1

You can try Parallel.For :

Parallel.For(0, Buffer.Length, (i) =>
{
    int difference = Buffer[i] - backgroundBuffer[i];
    if (difference >= 0)
          Buffer[i] = (ushort) difference;
    else
         Buffer[i] = 0;
}); 

Update: I have tried it and I see there is a minimal difference in your case,But when the array become bigger the difference is become bigger too

enter image description here

Chekiang answered 16/1, 2014 at 0:59 Comment(3)
@elgonzo Parallel.For does not create new task for every iteration: Does Parallel.For use one Task per iteration?Kip
This might save a few subtract and casting cycles: if (Buffer[i] <= backgroundBuffer[i]) { Buffer[i] = 0; } else { Buffer[i] -= backgroundBuffer[i]; }Cosmopolis
Not that may have a impact on performance but i would probably use Buffer[i] = Math.max(0, difference). (maybe you may benchmark that)Nicki
S
1

You may get a minor performance increase by checking first if the result is going to be negative before actually performing the subtraction. That way, there is no need to perform the subtraction if the result will be negative. Example:

if (Buffer[index] > backgroundBuffer[index])
    Buffer[index] = (ushort)(Buffer[index] - backgroundBuffer[index]);
else
    Buffer[index] = 0;
Subordinate answered 16/1, 2014 at 1:14 Comment(1)
This dependes on how jitter compiles il code to assembly. And even if it saves speed it wouldn't be more than some microseconds.Nicki
B
0

Here's a solution that uses Zip():

Buffer = Buffer.Zip<ushort, ushort, ushort>(backgroundBuffer, (x, y) =>
{
    return (ushort)Math.Max(0, x - y);
}).ToArray();

It doesn't perform as well as the other answers, but it's definitely the shortest solution.

Bernal answered 16/1, 2014 at 10:35 Comment(0)
S
0

What about,

Enumerable.Range(0, Buffer.Length).AsParalell().ForAll(i =>
    {
         unsafe
        {
            var nonNegative = Buffer[i] > backgroundBuffer[i];
            Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                *((int*)(&nonNegative)));
        }
    });
Salicylate answered 16/1, 2014 at 10:58 Comment(1)
This one comes out about 10 times slower than Parallel.Foreach with Partitioner. Surprising that it is that far behind.Maelstrom

© 2022 - 2024 — McMap. All rights reserved.