Fastest de-interleave operation in C?

2019-03-25 11:21发布

问题:

I have a pointer to an array of bytes mixed that contains the interleaved bytes of two distinct arrays array1 and array2. Say mixed looks something like this:

a1b2c3d4...

What I need to do is de-interleave the bytes so I get array1 = abcd... and array2 = 1234.... I know the length of mixed ahead of time, and the lengths of array1 and array2 are equivalent, both equal to mixed / 2.

Here is my current implementation (array1 and array2 are already allocated):

int i, j;
int mixedLength_2 = mixedLength / 2;
for (i = 0, j = 0; i < mixedLength_2; i++, j += 2)
{
    array1[i] = mixed[j];
    array2[i] = mixed[j+1];
}

This avoids any expensive multiplication or division operations, but still doesn't run fast enough. I'm hoping there is something like memcpy that takes an indexer that can use low-level block copy operations to speed up the process. Is there a faster implementation than what I currently have?

Edit

The target platform is Objective-C for iOS and Mac. A fast operation is more important for iOS devices, so a solution targeting iOS specifically would be better than nothing.

Update

Thanks everyone for the responses, especially Stephen Canon, Graham Lee, and Mecki. Here is my "master" function that uses Stephen's NEON intrinsics if available and otherwise Graham's union cursors with a reduced number of iterations as suggested by Mecki.

void interleave(const uint8_t *srcA, const uint8_t *srcB, uint8_t *dstAB, size_t dstABLength)
{
#if defined __ARM_NEON__
    // attempt to use NEON intrinsics

    // iterate 32-bytes at a time
    div_t dstABLength_32 = div(dstABLength, 32);
    if (dstABLength_32.rem == 0)
    {
        while (dstABLength_32.quot --> 0)
        {
            const uint8x16_t a = vld1q_u8(srcA);
            const uint8x16_t b = vld1q_u8(srcB);
            const uint8x16x2_t ab = { a, b };
            vst2q_u8(dstAB, ab);
            srcA += 16;
            srcB += 16;
            dstAB += 32;
        }
        return;
    }

    // iterate 16-bytes at a time
    div_t dstABLength_16 = div(dstABLength, 16);
    if (dstABLength_16.rem == 0)
    {
        while (dstABLength_16.quot --> 0)
        {
            const uint8x8_t a = vld1_u8(srcA);
            const uint8x8_t b = vld1_u8(srcB);
            const uint8x8x2_t ab = { a, b };
            vst2_u8(dstAB, ab);
            srcA += 8;
            srcB += 8;
            dstAB += 16;
        }
        return;
    }
#endif

    // if the bytes were not aligned properly
    // or NEON is unavailable, fall back to
    // an optimized iteration

    // iterate 8-bytes at a time
    div_t dstABLength_8 = div(dstABLength, 8);
    if (dstABLength_8.rem == 0)
    {
        typedef union
        {
            uint64_t wide;
            struct { uint8_t a1; uint8_t b1; uint8_t a2; uint8_t b2; uint8_t a3; uint8_t b3; uint8_t a4; uint8_t b4; } narrow;
        } ab8x8_t;

        uint64_t *dstAB64 = (uint64_t *)dstAB;
        int j = 0;
        for (int i = 0; i < dstABLength_8.quot; i++)
        {
            ab8x8_t cursor;
            cursor.narrow.a1 = srcA[j  ];
            cursor.narrow.b1 = srcB[j++];
            cursor.narrow.a2 = srcA[j  ];
            cursor.narrow.b2 = srcB[j++];
            cursor.narrow.a3 = srcA[j  ];
            cursor.narrow.b3 = srcB[j++];
            cursor.narrow.a4 = srcA[j  ];
            cursor.narrow.b4 = srcB[j++];
            dstAB64[i] = cursor.wide;
        }
        return;
    }

    // iterate 4-bytes at a time
    div_t dstABLength_4 = div(dstABLength, 4);
    if (dstABLength_4.rem == 0)
    {
        typedef union
        {
            uint32_t wide;
            struct { uint8_t a1; uint8_t b1; uint8_t a2; uint8_t b2; } narrow;
        } ab8x4_t;

        uint32_t *dstAB32 = (uint32_t *)dstAB;
        int j = 0;
        for (int i = 0; i < dstABLength_4.quot; i++)
        {
            ab8x4_t cursor;
            cursor.narrow.a1 = srcA[j  ];
            cursor.narrow.b1 = srcB[j++];
            cursor.narrow.a2 = srcA[j  ];
            cursor.narrow.b2 = srcB[j++];
            dstAB32[i] = cursor.wide;
        }
        return;
    }

    // iterate 2-bytes at a time
    div_t dstABLength_2 = div(dstABLength, 2);
    typedef union
    {
        uint16_t wide;
        struct { uint8_t a; uint8_t b; } narrow;
    } ab8x2_t;

    uint16_t *dstAB16 = (uint16_t *)dstAB;
    for (int i = 0; i < dstABLength_2.quot; i++)
    {
        ab8x2_t cursor;
        cursor.narrow.a = srcA[i];
        cursor.narrow.b = srcB[i];
        dstAB16[i] = cursor.wide;
    }
}

void deinterleave(const uint8_t *srcAB, uint8_t *dstA, uint8_t *dstB, size_t srcABLength)
{
#if defined __ARM_NEON__
    // attempt to use NEON intrinsics

    // iterate 32-bytes at a time
    div_t srcABLength_32 = div(srcABLength, 32);
    if (srcABLength_32.rem == 0)
    {
        while (srcABLength_32.quot --> 0)
        {
            const uint8x16x2_t ab = vld2q_u8(srcAB);
            vst1q_u8(dstA, ab.val[0]);
            vst1q_u8(dstB, ab.val[1]);
            srcAB += 32;
            dstA += 16;
            dstB += 16;
        }
        return;
    }

    // iterate 16-bytes at a time
    div_t srcABLength_16 = div(srcABLength, 16);
    if (srcABLength_16.rem == 0)
    {
        while (srcABLength_16.quot --> 0)
        {
            const uint8x8x2_t ab = vld2_u8(srcAB);
            vst1_u8(dstA, ab.val[0]);
            vst1_u8(dstB, ab.val[1]);
            srcAB += 16;
            dstA += 8;
            dstB += 8;
        }
        return;
    }
#endif

    // if the bytes were not aligned properly
    // or NEON is unavailable, fall back to
    // an optimized iteration

    // iterate 8-bytes at a time
    div_t srcABLength_8 = div(srcABLength, 8);
    if (srcABLength_8.rem == 0)
    {
        typedef union
        {
            uint64_t wide;
            struct { uint8_t a1; uint8_t b1; uint8_t a2; uint8_t b2; uint8_t a3; uint8_t b3; uint8_t a4; uint8_t b4; } narrow;
        } ab8x8_t;

        uint64_t *srcAB64 = (uint64_t *)srcAB;
        int j = 0;
        for (int i = 0; i < srcABLength_8.quot; i++)
        {
            ab8x8_t cursor;
            cursor.wide = srcAB64[i];
            dstA[j  ] = cursor.narrow.a1;
            dstB[j++] = cursor.narrow.b1;
            dstA[j  ] = cursor.narrow.a2;
            dstB[j++] = cursor.narrow.b2;
            dstA[j  ] = cursor.narrow.a3;
            dstB[j++] = cursor.narrow.b3;
            dstA[j  ] = cursor.narrow.a4;
            dstB[j++] = cursor.narrow.b4;
        }
        return;
    }

    // iterate 4-bytes at a time
    div_t srcABLength_4 = div(srcABLength, 4);
    if (srcABLength_4.rem == 0)
    {
        typedef union
        {
            uint32_t wide;
            struct { uint8_t a1; uint8_t b1; uint8_t a2; uint8_t b2; } narrow;
        } ab8x4_t;

        uint32_t *srcAB32 = (uint32_t *)srcAB;
        int j = 0;
        for (int i = 0; i < srcABLength_4.quot; i++)
        {
            ab8x4_t cursor;
            cursor.wide = srcAB32[i];
            dstA[j  ] = cursor.narrow.a1;
            dstB[j++] = cursor.narrow.b1;
            dstA[j  ] = cursor.narrow.a2;
            dstB[j++] = cursor.narrow.b2;
        }
        return;
    }

    // iterate 2-bytes at a time
    div_t srcABLength_2 = div(srcABLength, 2);
    typedef union
    {
        uint16_t wide;
        struct { uint8_t a; uint8_t b; } narrow;
    } ab8x2_t;

    uint16_t *srcAB16 = (uint16_t *)srcAB;
    for (int i = 0; i < srcABLength_2.quot; i++)
    {
        ab8x2_t cursor;
        cursor.wide = srcAB16[i];
        dstA[i] = cursor.narrow.a;
        dstB[i] = cursor.narrow.b;
    }
}

回答1:

Off the top of my head, I don't know of a library function for de-interleaving 2 channel byte data. However it's worth filing a bug report with Apple to request such a function.

In the meantime, it's pretty easy to vectorize such a function using NEON or SSE intrinsics. Specifically, on ARM you will want to use vld1q_u8 to load a vector from each source array, vuzpq_u8 to de-interleave them, and vst1q_u8 to store the resulting vectors; here's a rough sketch that I haven't tested or even tried to build, but it should illustrate the general idea. More sophisticated implementations are definitely possible (in particular, NEON can load/store two 16B registers in a single instruction, which the compiler may not do with this, and some amount of pipelining and/or unrolling may be beneficial depending on how long your buffers are):

#if defined __ARM_NEON__
#   include <arm_neon.h>
#endif
#include <stdint.h>
#include <stddef.h>

void deinterleave(uint8_t *mixed, uint8_t *array1, uint8_t *array2, size_t mixedLength) {
#if defined __ARM_NEON__
    size_t vectors = mixedLength / 32;
    mixedLength %= 32;
    while (vectors --> 0) {
        const uint8x16_t src0 = vld1q_u8(mixed);
        const uint8x16_t src1 = vld1q_u8(mixed + 16);
        const uint8x16x2_t dst = vuzpq_u8(src0, src1);
        vst1q_u8(array1, dst.val[0]);
        vst1q_u8(array2, dst.val[1]);
        mixed += 32;
        array1 += 16;
        array2 += 16;
    }
#endif
    for (size_t i=0; i<mixedLength/2; ++i) {
        array1[i] = mixed[2*i];
        array2[i] = mixed[2*i + 1];
    }
}


回答2:

I've only tested this lightly but it seemed at least twice as fast as your version:

typedef union {
uint16_t wide;
struct { uint8_t top; uint8_t bottom; } narrow;
} my_union;

uint16_t *source = (uint16_t *)mixed;
for (int i = 0; i < mixedLength/2; i++)
{
    my_union cursor;
    cursor.wide = source[i];
    array1[i] = cursor.narrow.top;
    array2[i] = cursor.narrow.bottom;
}

Notice that I wasn't careful with structure packing, but that in this case on this architecture that isn't a problem. Notice also someone might complain at my choice of naming top and bottom; I assume you know which half of which integers you need.



回答3:

Okay, here is your original method:

static void simpleDeint (
    uint8_t * array1, uint8_t * array2, uint8_t * mixed, int mixedLength
) {
    int i, j;
    int mixedLength_2 = mixedLength / 2;
    for (i = 0, j = 0; i < mixedLength_2; i++, j += 2)
    {
        array1[i] = mixed[j];
        array2[i] = mixed[j+1];
    }
}

With 10 million entries and -O3 (compiler shall optimize for maximum speed), I can run this 154 times per second on my Mac.

Here is my first suggestion:

static void structDeint (
    uint8_t * array1, uint8_t * array2, uint8_t * mixed, int mixedLength
) {
    int i;
    int len;
    uint8_t * array1Ptr = (uint8_t *)array1;
    uint8_t * array2Ptr = (uint8_t *)array2;
    struct {
        uint8_t byte1;
        uint8_t byte2;
    } * tb = (void *)mixed;

    len = mixedLength / 2;
    for (i = 0; i < len; i++) {
      *(array1Ptr++) = tb->byte1;
      *(array2Ptr++) = tb->byte2;
      tb++;
    }
}

Same count and optimization as before, I get 193 runs per second.

Now the suggestion from Graham Lee:

static void unionDeint (
    uint8_t * array1, uint8_t * array2, uint8_t * mixed, int mixedLength
) {
    union my_union {
        uint16_t wide;
        struct { uint8_t top; uint8_t bottom; } narrow;
    };

    uint16_t * source = (uint16_t *)mixed;
    for (int i = 0; i < mixedLength/2; i++) {
        union my_union cursor;
        cursor.wide = source[i];
        array1[i] = cursor.narrow.top;
        array2[i] = cursor.narrow.bottom;
    }
}

Same setup as before, 198 runs per second (NOTE: This method is not endian safe, result depends on CPU endianess. In your case array1 and array2 are probably swapped since ARM is little endian, so you would have to swap them in the code).

Here's my best one so far:

static void uint32Deint (
    uint8_t * array1, uint8_t * array2, uint8_t * mixed, int mixedLength
) {
    int i;
    int count;
    uint32_t * fourBytes = (void *)mixed;
    uint8_t * array1Ptr = (uint8_t *)array1;
    uint8_t * array2Ptr = (uint8_t *)array2;


    count = mixedLength / 4;
    for (i = 0; i < count; i++) {
        uint32_t temp = *(fourBytes++);

#if __LITTLE_ENDIAN__
        *(array1Ptr++) = (uint8_t)(temp & 0xFF);
        temp >>= 8;
        *(array2Ptr++) = (uint8_t)(temp & 0xFF);
        temp >>= 8;
        *(array1Ptr++) = (uint8_t)(temp & 0xFF);
        temp >>= 8;
        *(array2Ptr++) = tb->byte2;

#else
        *(array1Ptr++) = (uint8_t)(temp >> 24);
        *(array2Ptr++) = (uint8_t)((temp >> 16) & 0xFF);
        *(array1Ptr++) = (uint8_t)((temp >>  8) & 0xFF);
        *(array2Ptr++) = (uint8_t)(temp & 0xFF);
#endif
    }
    // Either it is a multiple of 4 or a multiple of 2.
    // If it is a multiple of 2, 2 bytes are left over.
    if (count * 4 != mixedLength) {
        *(array1Ptr) = mixed[mixedLength - 2];
        *(array2Ptr) = mixed[mixedLength - 1];
    }
}

Same setup as above, 219 times a second and unless I made a mistake, should work with either endianess.



回答4:

I recommend Graham's solution, but if this is really speed critical and you are willing to go Assembler, you can get even faster.

The idea is this:

  1. Read an entire 32bit integer from mixed. You'll get 'a1b2'.

  2. Rotate the lower 16bit by 8 bits to get '1ab2'(we are using little endians, since this is the default in ARM and therefore Apple A#, so the first two bytes are the lower ones).

  3. Rotate the entire 32bit register right(I think it's right...) by 8 bits to get '21ab'.

  4. Rotate the lower 16bit by 8 bits to get '12ab'

  5. Write the lower 8 bits to array2.

  6. Rotate the entire 32bit register by 16bit.

  7. Write the lower 8 bits to array1

  8. Advance array1 by 16bit, array2 by 16bit, and mixed by 32bit.

  9. Repeat.

We have traded 2 memory reads(assuming we use the Graham's version or equivalent) and 4 memory with one memory read, two memory writes and 4 register operations. While the number of operations has gone up from 6 to 7, register operations are faster than memory operations, so it's more efficient that way. Also, since we read from mixed 32bit at a time instead of 16, we cut iteration management by half.

PS: Theoretically this can also be done for 64bit architecture, but doing all those rotations for 'a1b2c3d4' will drive you to madness.



回答5:

For x86 SSE, the pack and punpck instructions are what you need. Examples using AVX for the convenience of non-destructive 3-operand instructions. (Not using AVX2 256b-wide instructions, because the 256b pack/unpck instructions do two 128b unpacks in the low and high 128b lanes, so you'd need a shuffle to get things in the correct final order.)

An intrinsics version of the following would work the same. Asm instructions are shorter to type for just writing a quick answer.

Interleave: abcd and 1234 -> a1b2c3d4:

# loop body:
vmovdqu    (%rax), %xmm0  # load the sources
vmovdqu    (%rbx), %xmm1
vpunpcklbw %xmm0, %xmm1, %xmm2  # low  halves -> 128b reg
vpunpckhbw %xmm0, %xmm2, %xmm3  # high halves -> 128b reg
vmovdqu    %xmm2, (%rdi)   # store the results
vmovdqu    %xmm3, 16(%rdi)
# blah blah some loop structure.

`punpcklbw` interleaves the bytes in the low 64 of the two source `xmm` registers.  There are `..wd` (word->dword), and dword->qword versions which would be useful for 16 or 32bit elements.

De-interleave: a1b2c3d4 -> abcd and 1234

#outside the loop
vpcmpeqb    %xmm5, %xmm5   # set to all-1s
vpsrlw     $8, %xmm5, %xmm5   # every 16b word has low 8b = 0xFF, high 8b = 0.

# loop body
vmovdqu    (%rsi), %xmm2     # load two src chunks
vmovdqu    16(%rsi), %xmm3
vpand      %xmm2, %xmm5, %xmm0  # mask to leave only the odd bytes
vpand      %xmm3, %xmm5, %xmm1
vpackuswb  %xmm0, %xmm1, %xmm4
vmovdqu    %xmm4, (%rax)    # store 16B of a[]
vpsrlw     $8, %xmm2, %xmm6     # even bytes -> odd bytes
vpsrlw     $8, %xmm3, %xmm7
vpackuswb  %xmm6, %xmm7, %xmm4
vmovdqu    %xmm4, (%rbx)

This can of course use a lot fewer registers. I avoided reusing registers for readability, not performance. Hardware register renaming makes reuse a non-issue, as long as you start with something that doesn't depend on the previous value. (e.g. movd, not movss or pinsrd.)

Deinterleave is so much more work because the pack instructions do signed or unsigned saturation, so the upper 8b of each 16b element has to be zeroed first.

An alternative would be to use pshufb to pack the odd or even words of a single source reg into the low 64 of a register. However, outside of the AMD XOP instruction set's VPPERM, there isn't a shuffle that can select bytes from 2 registers at once (like Altivec's much-loved vperm). So with just SSE/AVX, you'd need 2 shuffles for every 128b of interleaved data. And since store-port usage could be the bottleneck, a punpck to combine two 64bit chunks of a into a single register to set up a 128b store.

With AMD XOP, deinterleave would be 2x128b loads, 2 VPPERM, and 2x128b stores.



回答6:

  1. premature optimisation is bad

  2. your compiler is probably better at optimising than you are.

That said, there are things you can do to help out the compiler because you have semantic knowledge of your data that a compiler cannot have:

  1. read and write as many bytes as you can, up to the native word size - memory operations are expensive, so do manipulations in registers where possible

  2. unroll loops - look into "Duff's Device".

FWIW, I produced two versions of your copy loop, one much the same as yours, the second using what most would consider "optimal" (albeit still simple) C code:

void test1(byte *p, byte *p1, byte *p2, int n)
{
    int i, j;
    for (i = 0, j = 0; i < n / 2; i++, j += 2) {
        p1[i] = p[j];
        p2[i] = p[j + 1];
    }
}

void test2(byte *p, byte *p1, byte *p2, int n)
{
    while (n) {
        *p1++ = *p++;
        *p2++ = *p++;
        n--; n--;
    }
}

With gcc -O3 -S on Intel x86 they both produced almost identical assembly code. Here are the inner loops:

LBB1_2:
    movb    -1(%rdi), %al
    movb    %al, (%rsi)
    movb    (%rdi), %al
    movb    %al, (%rdx)
    incq    %rsi
    addq    $2, %rdi
    incq    %rdx
    decq    %rcx
    jne LBB1_2

and

LBB2_2:
    movb    -1(%rdi), %al
    movb    %al, (%rsi)
    movb    (%rdi), %al
    movb    %al, (%rdx)
    incq    %rsi
    addq    $2, %rdi
    incq    %rdx
    addl    $-2, %ecx
    jne LBB2_2

Both have the same number of instructions, the difference accounted for solely because the first version counts up to n / 2, and the second counts down to zero.

EDIT here's a better version:

/* non-portable - assumes little endian */
void test3(byte *p, byte *p1, byte *p2, int n)
{
    ushort *ps = (ushort *)p;

    n /= 2;
    while (n) {
        ushort n = *ps++;
        *p1++ = n;
        *p2++ = n >> 8;
    }
}

resulting in:

LBB3_2:
    movzwl  (%rdi), %ecx
    movb    %cl, (%rsi)
    movb    %ch, (%rdx)  # NOREX
    addq    $2, %rdi
    incq    %rsi
    incq    %rdx
    decq    %rax
    jne LBB3_2

which is one fewer instruction because it takes advantage of the immediate access to %cl and %ch.