How to eat bytes flowing out of a stream?
Asked Answered
T

2

5

i am fixing a ZIP library class. Internally, nearly all ZIP implementations use DEFLATE compression (RFC1951).

The problem is that, in Delphi, i don't have access to any DEFLATE compression libraries. But the one thing we do have plenty of is ZLIB compression code (RFC1950). It even ships with Delphi, and there are a half-dozen other implementations floating around.

Internally, ZLIB also uses DEFLATE for compression. So i want to do what everyone has done - use the Delphi zlib library for its DEFLATE compression functionality.

The problem is that ZLIB adds a 2-byte prefix, and 4-byte trailer to the DEFLATED data:

[CMF]                1 byte
[FLG]                1 byte
[...deflate compressed data...]
[Adler-32 checksum]  4 bytes

So what i need is a way to use the standard TCompressionStream (or TZCompressionStream, or TZCompressionStreamEx depending on the source code you're using) stream to compress data:

procedure CompressDataToTargetStream(sourceStream: TStream; targetStream: TStream);
var
   compressor: TCompressionStream;
begin
   compressor := TCompressionStream.Create(clDefault, targetStream); //clDefault = CompressionLevel
   try
      compressor.CopyFrom(sourceStream, sourceStream.Length)
   finally
      compressor.Free; 
   end;
end;

And that works, except it wrote out the leading 2-bytes and trailing 4-bytes; i need to strip those.

So i wrote a TByteEaterStream:

TByteEaterStream = class(TStream)
public
   constructor Create(TargetStream: TStream; 
         LeadingBytesToEat, TrailingBytesToEat: Integer);
end;

for example

procedure CompressDataToTargetStream(sourceStream: TStream; targetStream: TStream);
var
   byteEaterStream: TByteEaterStream;
   compressor: TCompressionStream;
begin
   byteEaterStream := TByteEaterStream.Create(targetStream, 2, 4); //2 leading bytes, 4 trailing bytes
   try
      compressor := TCompressionStream.Create(clDefault, byteEaterStream); //clDefault = CompressionLevel
      try
         compressor.CopyFrom(sourceStream, sourceStream.Length)
      finally
         compressor.Free; 
      end;
   finally
      byteEaterStream.Free;
   end;
end;

This stream overrides the write method. It's trivial to eat the first 2 bytes. The trick was to eat the trailing 4 bytes.

The eater stream has a 4-byte array, and i always hold the last four bytes of every write in the buffer. When the EaterStream is destroyed, the trailing four bytes go with it.

The problem is that shuffling a few million writes through this buffer is killing performance. The typical use upstream is:

for each of a million data rows
    stream.Write(s, Length(s)); //30-90 character string

i definitely don't want the upstream user to have to indicate that "the end is near". i just want it to be faster.

The Question

Watching a stream of bytes flowing by, what is the best way to hold-back the last four bytes; given that you don't know at what moment the write will be the last.

The code that i'm fixing wrote the entire compressed version into a TStringStream, and then only grabbed the 900MB - 6 bytes to get at the internal DEFLATE data:

cs := TStringStream.Create('');
....write compressed data to cs
S := Copy(CS.DataString, 3, Length(CS.DataString) - 6);

Except that runs the user out of memory. Initially i changed it to write to a TFileStream, then i could perform the same trick.

But i want the better solution; the stream solution. i want the data to go into the final stream compressed, without any intermediate storage.

My implementation

Not that it helps anything; because i'm not necesarilly asking for a system that even uses an adapting stream to do the trimming

TByteEaterStream = class(TStream)
private
    FTargetStream: TStream;
    FTargetStreamOwnership: TStreamOwnership;
    FLeadingBytesToEat: Integer;
    FTrailingBytesToEat: Integer;
    FLeadingBytesRemaining: Integer;

    FBuffer: array of Byte;
    FValidBufferLength: Integer;
    function GetBufferValidLength: Integer;
public
    constructor Create(TargetStream: TStream; LeadingBytesToEat, TrailingBytesToEat: Integer; StreamOwnership: TStreamOwnership=soReference);
    destructor Destroy; override;

    class procedure SelfTest;

    procedure Flush;

    function Read(var Buffer; Count: Longint): Longint; override;
    function Write(const Buffer; Count: Longint): Longint; override;
    function Seek(Offset: Longint; Origin: Word): Longint; override;
end;

{ TByteEaterStream }

constructor TByteEaterStream.Create(TargetStream: TStream; LeadingBytesToEat, TrailingBytesToEat: Integer; StreamOwnership: TStreamOwnership=soReference);
begin
    inherited Create;

    //User requested state
    FTargetStream := TargetStream;
    FTargetStreamOwnership := StreamOwnership;
    FLeadingBytesToEat := LeadingBytesToEat;
    FTrailingBytesToEat := TrailingBytesToEat;

    //internal housekeeping
    FLeadingBytesRemaining := FLeadingBytesToEat;

    SetLength(FBuffer, FTrailingBytesToEat);
    FValidBufferLength := 0;
end;

destructor TByteEaterStream.Destroy;
begin
    if FTargetStreamOwnership = soOwned then
        FTargetStream.Free;
    FTargetStream := nil;

    inherited;
end;

procedure TByteEaterStream.Flush;
begin
    if FValidBufferLength > 0 then
    begin
        FTargetStream.Write(FBuffer[0], FValidBufferLength);
        FValidBufferLength  := 0;
    end;
end;

function TByteEaterStream.Write(const Buffer; Count: Integer): Longint;
var
    newStart: Pointer;
    totalCount: Integer;
    addIndex: Integer;
    bufferValidLength: Integer;
    bytesToWrite: Integer;
begin
    Result := Count;

    if Count = 0 then
        Exit;

    if FLeadingBytesRemaining > 0 then
    begin
        newStart := Addr(Buffer);
        Inc(Cardinal(newStart));
        Dec(Count);
        Dec(FLeadingBytesRemaining);
        Result := Self.Write(newStart^, Count)+1; //tell the upstream guy that we wrote it

        Exit;
    end;

    if FTrailingBytesToEat > 0 then
    begin
        if (Count < FTrailingBytesToEat) then
        begin
            //There's less bytes incoming than an entire buffer
            //But the buffer might overfloweth
            totalCount := FValidBufferLength+Count;

            //If it could all fit in the buffer, then let it
            if (totalCount <= FTrailingBytesToEat) then
            begin
                Move(Buffer, FBuffer[FValidBufferLength], Count);
                FValidBufferLength := totalCount;
            end
            else
            begin
                //We're going to overflow the buffer.

                //Purge from the buffer the amount that would get pushed
                FTargetStream.Write(FBuffer[0], totalCount-FTrailingBytesToEat);

                //Shuffle the buffer down (overlapped move)
                bufferValidLength := bufferValidLength - (totalCount-FTrailingBytesToEat);
                Move(FBuffer[totalCount-FTrailingBytesToEat], FBuffer[0], bufferValidLength);

                addIndex := bufferValidLength ; //where we will add the data to
                Move(Buffer, FBuffer[addIndex], Count);
            end;
        end
        else if (Count = FTrailingBytesToEat) then
        begin
            //The incoming bytes exactly fill the buffer. Flush what we have and eat the incoming amounts
            Flush;
            Move(Buffer, FBuffer[0], FTrailingBytesToEat);
            FValidBufferLength := FTrailingBytesToEat;
            Result := FTrailingBytesToEat; //we "wrote" n bytes
        end
        else
        begin
            //Count is greater than trailing buffer eat size
            Flush;

            //Write the data that definitely not to be eaten
            bytesToWrite := Count-FTrailingBytesToEat;
            FTargetStream.Write(Buffer, bytesToWrite);

            //Buffer the remainder
            newStart := Addr(Buffer);
            Inc(Cardinal(newStart), bytesToWrite);

            Move(newStart^, FBuffer[0], FTrailingBytesToEat);
            FValidBufferLength := 4;
        end;
    end;
end;

function TByteEaterStream.Seek(Offset: Integer; Origin: Word): Longint;
begin
    //what does it mean if they want to seek around when i'm supposed to be eating data?
    //i don't know; so results are, by definition, undefined. Don't use at your own risk
    Result := FTargetStream.Seek(Offset, Origin);
end;

function TByteEaterStream.Read(var Buffer; Count: Integer): Longint;
begin
    //what does it mean if they want to read back bytes when i'm supposed to be eating data?
    //i don't know; so results are, by definition, undefined. Don't use at your own risk
    Result := FTargetStream.Read({var}Buffer, Count);
end;

class procedure TByteEaterStream.SelfTest;

    procedure CheckEquals(Expected, Actual: string; Message: string);
    begin
        if Actual <> Expected then
            raise Exception.CreateFmt('TByteEaterStream self-test failed. Expected "%s", but was "%s". Message: %s', [Expected, Actual, Message]);
    end;

    procedure Test(const InputString: string; ExpectedString: string);
    var
        s: TStringStream;
        eater: TByteEaterStream;
    begin
        s := TStringStream.Create('');
        try
            eater := TByteEaterStream.Create(s, 2, 4, soReference);
            try
                eater.Write(InputString[1], Length(InputString));
            finally
                eater.Free;
            end;
            CheckEquals(ExpectedString, s.DataString, InputString);
        finally
            s.Free;
        end;
    end;
begin
    Test('1', '');
    Test('11', '');
    Test('113', '');
    Test('1133', '');
    Test('11333', '');
    Test('113333', '');
    Test('11H3333', 'H');
    Test('11He3333', 'He');
    Test('11Hel3333', 'Hel');
    Test('11Hell3333', 'Hell');
    Test('11Hello3333', 'Hello');
    Test('11Hello,3333', 'Hello,');
    Test('11Hello, 3333', 'Hello, ');
    Test('11Hello, W3333', 'Hello, W');
    Test('11Hello, Wo3333', 'Hello, Wo');
    Test('11Hello, Wor3333', 'Hello, Wor');
    Test('11Hello, Worl3333', 'Hello, Worl');
    Test('11Hello, World3333', 'Hello, World');
    Test('11Hello, World!3333', 'Hello, World!');
end;
Trisyllable answered 6/11, 2013 at 17:12 Comment(3)
Can we see your implementation?Cavafy
@DavidHeffernan i was thinking about that. i did start with your TBufferedFileStream and made a generic TBufferedStream (of which TBufferedFileStream is one drop-in replacement for TFileStream). If i had a 16k buffer it might make things easier.Trisyllable
@IanBoyd I cannot see any solution that does not involve buffering, for reasons that I outline in my answer.Stoical
S
2

You need to postpone writing until you are sure you know that the bytes to be written are not the trailing bytes that must be eaten. That observation leads you to the thought that buffering will provide a solution.

So, I'd suggest this:

  1. Use a stream adapter that uses buffering.
  2. Eating the lead bytes is easy. Just sent the first two bytes into oblivion.
  3. After that buffer the bytes to be written, and when it's time to flush, flush all but the final four bytes in the buffer.
  4. When you flush, copy the four bytes that you did not flush to the beginning of your buffer so that you don't lose them.
  5. When you close the stream, flush it, as you always would for a buffered stream. And use the same flushing technique as before, so that you hold on to the final four bytes. At this point you know that these are the final four bytes of the stream.

One requirement that the above approach demands is that your buffer must be larger in size than the number of trailing bytes to be stripped.

Stoical answered 6/11, 2013 at 18:5 Comment(5)
i was hunting through the "real" zlib documentation, and noticed the negative window size trick. It took some more digging to find an even more up-to-date version of the Delphi translation that supports it. But i am using that. However, i am a stickler that people answer the question, and not the example. (Someone who wants to eat bytes out of a flowing stream for another reason won't be helped by knowing that zlib supports omitting the gzip wrappers). But Mark's answer is what i am using now; even if it doesn't answer the question. Accepted to you.Trisyllable
Thank you Ian. I enjoyed thinking about this rather interesting problem. And well asked as always. Still can't get over your lower-case i. ;-)Stoical
What's scary to me is that all my rep these days comes from asking questions. i am getting behind in most technology. i don't use WPF, Objective-C, Ruby, Python, web-development in general. And we're stuck here at work with Delphi 5. So you Delphi XE4 guys, with your generics, and strict private and UnicodeString, my usefulness to answer questions is coming to an end. Thank god for my canonical answer on the proper use of IDisposable.Trisyllable
On the other hand, its amazing what you can wring out of Delphi 5 these days!Trisyllable
I know. Until recently we were on D6 here at work. Now we are on XE3 but visually there is no difference between the two apps. Of course, I would not care to give up generics or 64 bit!!Stoical
C
10

The entire problem can be avoided by simply asking zlib to not wrap the deflate stream. I don't see the interface to zlib in the code in the question, but somewhere there is an initialization using deflateInit() or deflateInit2(). If you use deflateInit2(), you can provide -15 instead of 15 for the windowBits parameter to ask for unwrapped deflate output.

Ceramics answered 6/11, 2013 at 19:10 Comment(1)
The seems to me to be the right solution to the underlying problem (as opposed to the answer to the question that was asked about eating bytes). For what it's worth, the Delphi Zlib code I can see exposes all the options that are needed. You can call the TZCompressionStream.Create overload that accepts a windowBits parameter and pass in -15, exactly as Mark describes. +1Stoical
S
2

You need to postpone writing until you are sure you know that the bytes to be written are not the trailing bytes that must be eaten. That observation leads you to the thought that buffering will provide a solution.

So, I'd suggest this:

  1. Use a stream adapter that uses buffering.
  2. Eating the lead bytes is easy. Just sent the first two bytes into oblivion.
  3. After that buffer the bytes to be written, and when it's time to flush, flush all but the final four bytes in the buffer.
  4. When you flush, copy the four bytes that you did not flush to the beginning of your buffer so that you don't lose them.
  5. When you close the stream, flush it, as you always would for a buffered stream. And use the same flushing technique as before, so that you hold on to the final four bytes. At this point you know that these are the final four bytes of the stream.

One requirement that the above approach demands is that your buffer must be larger in size than the number of trailing bytes to be stripped.

Stoical answered 6/11, 2013 at 18:5 Comment(5)
i was hunting through the "real" zlib documentation, and noticed the negative window size trick. It took some more digging to find an even more up-to-date version of the Delphi translation that supports it. But i am using that. However, i am a stickler that people answer the question, and not the example. (Someone who wants to eat bytes out of a flowing stream for another reason won't be helped by knowing that zlib supports omitting the gzip wrappers). But Mark's answer is what i am using now; even if it doesn't answer the question. Accepted to you.Trisyllable
Thank you Ian. I enjoyed thinking about this rather interesting problem. And well asked as always. Still can't get over your lower-case i. ;-)Stoical
What's scary to me is that all my rep these days comes from asking questions. i am getting behind in most technology. i don't use WPF, Objective-C, Ruby, Python, web-development in general. And we're stuck here at work with Delphi 5. So you Delphi XE4 guys, with your generics, and strict private and UnicodeString, my usefulness to answer questions is coming to an end. Thank god for my canonical answer on the proper use of IDisposable.Trisyllable
On the other hand, its amazing what you can wring out of Delphi 5 these days!Trisyllable
I know. Until recently we were on D6 here at work. Now we are on XE3 but visually there is no difference between the two apps. Of course, I would not care to give up generics or 64 bit!!Stoical

© 2022 - 2024 — McMap. All rights reserved.