Compressing row set using CLR and GZIP

I want to compress some large tables containing historical data that are rarely, if ever, read. I first tried the built-in compression options (row, page, columnstore, columnstore archive), but none of them can compress out-of-row values (varchar(max), nvarchar(max)), so I ended up trying a CLR solution.

The SQL Server Compressed Rowset Sample compresses the whole row set returned by a given query into a single value of a user-defined CLR type.

For example:

CREATE TABLE Archive
(
     [Date] DATETIME2 DEFAULT(GETUTCDATE())
    ,[Data] [dbo].[CompressedRowset]
)

INSERT INTO Archive([Data])
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]')

It works, but I ran into the following issues:

  • when I try to compress a large row set, I get the following error:

    Msg 0, Level 11, State 0, Line 0 A severe error occurred on the current command. The results, if any, should be discarded.

    The following statement on its own works:

    SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
    

    but these do not:

    INSERT INTO Archive
    SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
    
    DECLARE @A [dbo].[CompressedRowset]
    SELECT @A = [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
    
  • in order to compress a row set, the T-SQL types have to be mapped to .NET types; unfortunately, such a mapping does not exist for every SQL type (see Mapping CLR Parameter Data). I have already expanded the following function to handle more types, but how do I handle types like geography, for example (see the sketch after this list):

    static SqlDbType ToSqlType(Type t){
        if (t == typeof(int)){
            return SqlDbType.Int;
        }
    
        ...
    
        if (t == typeof(Byte[])){
            return SqlDbType.VarBinary;
        } else {
            throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
        }
    }
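
One possible direction for handling UDT columns such as geography is sketched below. This is an untested sketch, not part of the original sample: ToSqlMetaData is a made-up helper name, it assumes the column's DataType is the CLR UDT type (e.g. Microsoft.SqlServer.Types.SqlGeography, which requires referencing Microsoft.SqlServer.Types), and it relies on the SqlMetaData constructor overload that takes a Type for SqlDbType.Udt:

static SqlMetaData ToSqlMetaData(DataColumn col)
{
    Type t = col.DataType;

    // CLR user-defined types (geography, geometry, hierarchyid, custom UDTs)
    // carry the SqlUserDefinedType attribute and have to be described to
    // SqlDataRecord as SqlDbType.Udt together with their CLR type.
    if (t.IsDefined(typeof(SqlUserDefinedTypeAttribute), false))
    {
        return new SqlMetaData(col.ColumnName, SqlDbType.Udt, t);
    }

    // fall back to the existing scalar mapping
    SqlDbType sqlType = ToSqlType(t);
    if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary)
    {
        return new SqlMetaData(col.ColumnName, sqlType, col.MaxLength);
    }
    return new SqlMetaData(col.ColumnName, sqlType);
}

Whether BinaryFormatter can actually round-trip such a column inside the DataTable is a separate concern.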
    

Here is the whole .NET code:

using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO.Compression;
using System.Xml.Serialization;
using System.Xml;

[Serializable]
[Microsoft.SqlServer.Server.SqlUserDefinedType
    (
        Format.UserDefined
        ,IsByteOrdered = false
        ,IsFixedLength = false
        ,MaxByteSize = -1
    )
]
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable
{
    DataTable rowset;

    public DataTable Data
    {
        get { return this.rowset; }
        set { this.rowset = value; }
    }

    public override string ToString()
    {
        using (var sw = new StringWriter())
        using (var xw = new XmlTextWriter(sw))
        {
            WriteXml(xw);
            xw.Flush();
            sw.Flush();
            return sw.ToString();
        }
    }

    public bool IsNull
    {
        get { return (this.rowset == null);}
    }

    public static CompressedRowset Null
    {
        get
        {
            CompressedRowset h = new CompressedRowset();
            return h;
        }
    }

    public static CompressedRowset Parse(SqlString s)
    {
        using (var sr = new StringReader(s.Value))
        using (var xr = new XmlTextReader(sr))
        {
            var c = new CompressedRowset();
            c.ReadXml(xr);
            return c;
        }
    }


    #region "Stream Wrappers"
    abstract class WrapperStream : Stream
    {
        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush()
        {

        }

        public override long Length
        {
            get { throw new NotImplementedException(); }
        }

        public override long Position
        {
            get
            {
                throw new NotImplementedException();
            }
            set
            {
                throw new NotImplementedException();
            }
        }


        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }


    }

    class BinaryWriterStream : WrapperStream
    {
        BinaryWriter br;
        public BinaryWriterStream(BinaryWriter br)
        {
            this.br = br;
        }
        public override bool CanRead
        {
            get { return false; }
        }
        public override bool CanWrite
        {
            get { return true; }
        }
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }
        public override void Write(byte[] buffer, int offset, int count)
        {
            br.Write(buffer, offset, count);
        }
    }

    class BinaryReaderStream : WrapperStream
    {
        BinaryReader br;
        public BinaryReaderStream(BinaryReader br)
        {
            this.br = br;
        }
        public override bool CanRead
        {
            get { return true; }
        }
        public override bool CanWrite
        {
            get { return false; }
        }
        public override int Read(byte[] buffer, int offset, int count)
        {
            return br.Read(buffer, offset, count);
        }
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }
    }
    #endregion

    #region "IBinarySerialize"
    public void Read(System.IO.BinaryReader r)
    {
        using (var rs = new BinaryReaderStream(r))
        using (var cs = new GZipStream(rs, CompressionMode.Decompress))
        {
            var ser = new BinaryFormatter();
            this.rowset = (DataTable)ser.Deserialize(cs);
        }
    }
    public void Write(System.IO.BinaryWriter w)
    {
        if (this.IsNull)
            return;

        rowset.RemotingFormat = SerializationFormat.Binary;
        var ser = new BinaryFormatter();
        using (var binaryWriterStream = new BinaryWriterStream(w))
        using (var compressionStream = new GZipStream(binaryWriterStream, CompressionMode.Compress))
        {
            ser.Serialize(compressionStream, rowset);
        }

    }

    #endregion

    /// <summary>
    /// This procedure takes an arbitrary query, runs it and compresses the results into a varbinary(max) blob.
    /// If the query has a large result set, then this procedure will use a large amount of memory to buffer the results in 
    /// a DataTable, and more to copy it into a compressed buffer to return. 
    /// </summary>
    /// <param name="query"></param>
    /// <param name="results"></param>
    //[Microsoft.SqlServer.Server.SqlProcedure]
    [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)]
    public static CompressedRowset CompressQueryResults(string query)
    {
        //open a context connection
        using (var con = new SqlConnection("Context Connection=true"))
        {
            con.Open();
            var cmd = new SqlCommand(query, con);
            var dt = new DataTable();
            using (var rdr = cmd.ExecuteReader())
            {
                dt.Load(rdr);
            }
            //configure the DataTable for binary serialization
            dt.RemotingFormat = SerializationFormat.Binary;

            var cdt = new CompressedRowset();
            cdt.rowset = dt;
            return cdt;


        }
    }

    /// <summary>
    /// partial Type mapping between SQL and .NET
    /// </summary>
    /// <param name="t"></param>
    /// <returns></returns>
    static SqlDbType ToSqlType(Type t)
    {
        if (t == typeof(int))
        {
            return SqlDbType.Int;
        }
        if (t == typeof(string))
        {
            return SqlDbType.NVarChar;
        }
        if (t == typeof(Boolean))
        {
            return SqlDbType.Bit;
        }
        if (t == typeof(decimal))
        {
            return SqlDbType.Decimal;
        }
        if (t == typeof(float))
        {
            return SqlDbType.Real;
        }
        if (t == typeof(double))
        {
            return SqlDbType.Float;
        }
        if (t == typeof(DateTime))
        {
            return SqlDbType.DateTime;
        }
        if (t == typeof(Int64))
        {
            return SqlDbType.BigInt;
        }
        if (t == typeof(Int16))
        {
            return SqlDbType.SmallInt;
        }
        if (t == typeof(byte))
        {
            return SqlDbType.TinyInt;
        }
        if ( t == typeof(Guid))
        {
            return SqlDbType.UniqueIdentifier;
        }
        // any CLR type not mapped above falls through to the exception below
        if (t == typeof(Byte[]))
        {
            return SqlDbType.VarBinary;
        }   
        else
        {
            throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
        }

    }

    /// <summary>
    /// This stored procedure takes a compressed DataTable and returns it as a result set to the client,
    /// or into a table using INSERT ... EXEC.
    /// </summary>
    /// <param name="results"></param>
    [Microsoft.SqlServer.Server.SqlProcedure]
    public static void UnCompressRowset(CompressedRowset results)
    {
        if (results.IsNull)
            return;

        DataTable dt = results.rowset;
        var fields = new SqlMetaData[dt.Columns.Count];
        for (int i = 0; i < dt.Columns.Count; i++)
        {
            var col = dt.Columns[i];
            var sqlType = ToSqlType(col.DataType);
            var colName = col.ColumnName;
            if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary)
            {
                fields[i] = new SqlMetaData(colName, sqlType, col.MaxLength);
            }
            else
            {
                fields[i] = new SqlMetaData(colName, sqlType);
            }
        }
        var record = new SqlDataRecord(fields);

        SqlContext.Pipe.SendResultsStart(record);
        foreach (DataRow row in dt.Rows)
        {
            record.SetValues(row.ItemArray);
            SqlContext.Pipe.SendResultsRow(record);
        }
        SqlContext.Pipe.SendResultsEnd();

    }

    public System.Xml.Schema.XmlSchema GetSchema()
    {
        return null;
    }

    public void ReadXml(System.Xml.XmlReader reader)
    {
        if (rowset != null)
        {
            throw new InvalidOperationException("rowset already read");
        }
        var ser = new XmlSerializer(typeof(DataTable));
        rowset = (DataTable)ser.Deserialize(reader);
    }

    public void WriteXml(System.Xml.XmlWriter writer)
    {
        if (String.IsNullOrEmpty(rowset.TableName))
            rowset.TableName = "Rows";

        var ser = new XmlSerializer(typeof(DataTable));
        ser.Serialize(writer, rowset);
    }
}

and here is the creation of the SQL objects:

CREATE TYPE [dbo].[CompressedRowset]
     EXTERNAL NAME [CompressedRowset].[CompressedRowset];

GO

CREATE FUNCTION [dbo].[CompressQueryResults] (@query [nvarchar](4000))
RETURNS [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults];

GO

CREATE PROCEDURE [dbo].[UnCompressRowset] @results [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset];

GO
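
For reference, a round trip back out of the archive might look like this (just a sketch: [dbo].[A_Restored] is a hypothetical table with the same columns as [dbo].[A]):

DECLARE @d [dbo].[CompressedRowset];

SELECT TOP (1) @d = [Data]
FROM [dbo].[Archive]
ORDER BY [Date] DESC;

-- send the rows back to the client, or capture them into a table:
INSERT INTO [dbo].[A_Restored]
EXEC [dbo].[UnCompressRowset] @results = @d;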
Stucker answered 22/1, 2015 at 9:48 Comment(10)
Serialization produces quite big output and the built-in GZipStream compresses poorly, so no wonder the results are not exactly outstanding. I'd try to serialize object[]'s as rows to at least get rid of the DataTable. – Reenter
I guess the idea in this example is to compress the whole row set in order to have more data to compress (and hence a better compression ratio), right? Also, I can't find any GZip alternatives; could you suggest one? – Stucker
I cannot close this question because it has an open bounty, but I'd close it with this reason: Too broad: there are either too many possible answers, or good answers would be too long for this format. Please add details to narrow the answer set or to isolate an issue that can be answered in a few paragraphs. So please, ask more concrete questions. I have answers for some of the topics in this huge, unordered sample of code, opinions and worries... Please, improve it. – Jurkoic
Hard drives are cheap. Buy a bigger one instead. – Yellow
@Yellow That simply cannot be the solution - the backups get bigger, Always On has to ship more data, and so on. It is really annoying that there is no built-in compression for large objects in SQL Server. – Stucker
How large are these rowsets that you are trying to compress? – Aurar
@Aurar It is ~1.6 GB. Is the size the issue? If it is, I can use small chunks, but I need to know the maximum size I can compress, or whether there is a setting I can use to increase/decrease it. Also, the impact on other database activity is not critical. – Stucker
The stated limit is 2 GB. Unfortunately I am not sure how XmlSerialization might play into that, but if it does, a 1.6 GB input could easily go over that, even with compression. Try setting the limit to something like 10 MB and see if the problem goes away. – Aurar
Also, it's critically important to come up with some way to trace/debug this code, or at the very least get better error messages. Unfortunately, I haven't worked with SQLCLR in a while, and I cannot remember how I used to do that. You might try dba.stackexchange.com for help on this kind of thing. – Aurar
1) Reduce the size of the result set, as 1.6 GB, if that is the raw data size, is too large. Whatever result set is being compressed will take up its full size in memory, plus some overhead for the DataTable, plus more; read the comment above the CompressQueryResults method. 2) Look at the SQL Server logs to see if there is more detail there on the error. 3) I don't think that code from MS is "shippable", as it doesn't correctly handle decimal, datetime2, datetimeoffset, time, float, and real. 4) Why not just use the GZIP function you were working on for LOB types, along with row/page compression? – Cyclamate

Probably too late for the original question, but this might be worth considering for others stumbling by: SQL Server 2016 has built-in COMPRESS and DECOMPRESS functions (see here and here), which might be useful if the data you are trying to archive contains large values in [N]VARCHAR and VARBINARY columns.

You'd need to bake this into your business logic layer, or set up an arrangement in SQL Server whereby you replace your uncompressed table with a view over a backing table (where the compressed values live), deriving the uncompressed data via DECOMPRESS, with INSTEAD OF triggers that update the backing table (so the view behaves like the original table for select/insert/update/delete, aside from performance differences). A bit hacky, but it would work...
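
A rough sketch of that arrangement (names are invented for illustration, only the INSERT trigger is shown, and the compressed column is assumed to hold NVARCHAR data):

CREATE TABLE dbo.ArchiveBacking
(
    Id      INT IDENTITY PRIMARY KEY,
    Payload VARBINARY(MAX)      -- COMPRESS()ed value
);
GO

-- the object the rest of the code keeps querying
CREATE VIEW dbo.ArchiveData
AS
SELECT Id,
       CAST(DECOMPRESS(Payload) AS NVARCHAR(MAX)) AS Payload
FROM dbo.ArchiveBacking;
GO

-- writes through the view are compressed into the backing table
CREATE TRIGGER dbo.ArchiveData_Insert ON dbo.ArchiveData
INSTEAD OF INSERT
AS
BEGIN
    INSERT INTO dbo.ArchiveBacking (Payload)
    SELECT COMPRESS(Payload) FROM inserted;
END;
GO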

For older SQL versions you could probably write a CLR function to do the job too.

This method won't work for data sets composed of small fields, of course; this style of compression simply won't achieve anything on small values (in fact it will make them larger).

Renin answered 18/5, 2016 at 11:19 Comment(0)

Have you instead considered creating a new 'Archive' database (perhaps set to the simple recovery model), where you dump all your old data? It could easily be accessed in queries, so there's no pain there, e.g.

SELECT * FROM archive..olddata

When you create the db, place it on another disk and handle it differently in your backup procedure - perhaps you run the archiving procedure once a week, so the archive database only needs to be backed up after each run - and after you've squashed the backup to almost zero with 7zip/rar.

Don't try to compress the db using NTFS compression, though; SQL Server doesn't support it.

Kurtz answered 29/1, 2015 at 12:2 Comment(0)