I want to compress some large tables containing historical data that are rarely read, or not read at all. I first tried to use the built-in compression options (row, page, columnstore, columnstore archive), but none of them can compress out-of-row values (varchar(max), nvarchar(max)), so I finally ended up trying a CLR solution.
The SQL Server Compressed Rowset Sample solution compresses the whole row set returned by a given query using a user-defined CLR type.
For example:
CREATE TABLE Archive
(
[Date] DATETIME2 DEFAULT(GETUTCDATE())
,[Data] [dbo].[CompressedRowset]
)
INSERT INTO Archive([Data])
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]')
It is working, but I met the following issues:
when I try to compress a large result row set, I am getting the following error:
Msg 0, Level 11, State 0, Line 0 A severe error occurred on the current command. The results, if any, should be discarded.
Also, the following statement is working:
SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
but these are not:
INSERT INTO Archive SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[LargeA]')

DECLARE @A [dbo].[CompressedRowset]
SELECT @A = [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[LargeA]')
In order to compress a row set, each T-SQL type should be mapped to a .NET type; unfortunately, this is not true for all SQL types — see Mapping CLR Parameter Data. I have already expanded the following function to handle more types, but how do I handle types like geography,
for example:static SqlDbType ToSqlType(Type t){ if (t == typeof(int)){ return SqlDbType.Int; } ... if (t == typeof(Byte[])){ return SqlDbType.VarBinary; } else { throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion"); } }
Here is the whole .net
code:
using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO.Compression;
using System.Xml.Serialization;
using System.Xml;
/// <summary>
/// SQL CLR user-defined type that wraps an entire query result set (a
/// DataTable) and persists it as a GZip-compressed binary blob.
/// Format.UserDefined with MaxByteSize = -1 stores up to 2 GB, like
/// varbinary(max).
/// NOTE(review): the whole row set is buffered in memory both before and
/// after compression, so very large results can exhaust SQLCLR memory — a
/// plausible cause of the reported "severe error" on large inputs; confirm
/// against the SQL Server error log.
/// </summary>
[Serializable]
[Microsoft.SqlServer.Server.SqlUserDefinedType
(
    Format.UserDefined
    ,IsByteOrdered = false
    ,IsFixedLength = false
    ,MaxByteSize = -1
)
]
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable
{
    // Backing row set; null encodes the SQL NULL value of this type.
    DataTable rowset;

    /// <summary>The wrapped, uncompressed row set.</summary>
    public DataTable Data
    {
        get { return this.rowset; }
        set { this.rowset = value; }
    }

    /// <summary>
    /// XML text rendering of the row set; SQL Server uses this when the value
    /// is converted to a string.
    /// </summary>
    public override string ToString()
    {
        using (var sw = new StringWriter())
        using (var xw = new XmlTextWriter(sw))
        {
            WriteXml(xw);
            xw.Flush();
            sw.Flush();
            return sw.ToString();
        }
    }

    /// <summary>True when this instance represents SQL NULL.</summary>
    public bool IsNull
    {
        get { return (this.rowset == null); }
    }

    /// <summary>The SQL NULL instance required by the UDT contract.</summary>
    public static CompressedRowset Null
    {
        get
        {
            CompressedRowset h = new CompressedRowset();
            return h;
        }
    }

    /// <summary>
    /// Re-creates an instance from the XML produced by <see cref="ToString"/>
    /// (required by the UDT contract).
    /// </summary>
    public static CompressedRowset Parse(SqlString s)
    {
        // Fix: a SQL NULL input must yield the Null instance; the original
        // dereferenced s.Value and would throw SqlNullValueException.
        if (s.IsNull)
            return Null;
        using (var sr = new StringReader(s.Value))
        using (var xr = new XmlTextReader(sr))
        {
            var c = new CompressedRowset();
            c.ReadXml(xr);
            return c;
        }
    }

    #region "Stream Wrappers"
    /// <summary>
    /// Minimal non-seekable Stream base used to adapt the BinaryReader/Writer
    /// supplied by IBinarySerialize to the stream-based GZipStream API.
    /// Seeking and length are never required by GZipStream, so those members
    /// deliberately throw.
    /// </summary>
    abstract class WrapperStream : Stream
    {
        public override bool CanSeek
        {
            get { return false; }
        }
        public override bool CanWrite
        {
            get { return false; }
        }
        public override void Flush()
        {
        }
        public override long Length
        {
            get { throw new NotImplementedException(); }
        }
        public override long Position
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }
        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }
    }

    /// <summary>Write-only stream that forwards all writes to a BinaryWriter.</summary>
    class BinaryWriterStream : WrapperStream
    {
        BinaryWriter br;
        public BinaryWriterStream(BinaryWriter br)
        {
            this.br = br;
        }
        public override bool CanRead
        {
            get { return false; }
        }
        public override bool CanWrite
        {
            get { return true; }
        }
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }
        public override void Write(byte[] buffer, int offset, int count)
        {
            br.Write(buffer, offset, count);
        }
    }

    /// <summary>Read-only stream that forwards all reads to a BinaryReader.</summary>
    class BinaryReaderStream : WrapperStream
    {
        BinaryReader br;
        public BinaryReaderStream(BinaryReader br)
        {
            this.br = br;
        }
        public override bool CanRead
        {
            get { return true; }
        }
        public override bool CanWrite
        {
            get { return false; }
        }
        public override int Read(byte[] buffer, int offset, int count)
        {
            return br.Read(buffer, offset, count);
        }
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }
    }
    #endregion

    #region "IBinarySerialize"
    /// <summary>
    /// Deserializes the row set from its GZip-compressed binary form.
    /// SECURITY NOTE(review): BinaryFormatter is unsafe on untrusted data and
    /// is removed in .NET 9+; it is kept here only because the persisted
    /// format of already-stored values depends on it. Do not feed it bytes
    /// from outside this type.
    /// </summary>
    public void Read(System.IO.BinaryReader r)
    {
        using (var rs = new BinaryReaderStream(r))
        using (var cs = new GZipStream(rs, CompressionMode.Decompress))
        {
            var ser = new BinaryFormatter();
            this.rowset = (DataTable)ser.Deserialize(cs);
        }
    }

    /// <summary>
    /// Serializes the row set as a GZip-compressed BinaryFormatter payload.
    /// SQL Server checks IsNull before calling Write, so the early return is
    /// only a defensive guard.
    /// </summary>
    public void Write(System.IO.BinaryWriter w)
    {
        if (this.IsNull)
            return;
        // Binary remoting format is far more compact than the default XML form.
        rowset.RemotingFormat = SerializationFormat.Binary;
        var ser = new BinaryFormatter();
        using (var binaryWriterStream = new BinaryWriterStream(w))
        using (var compressionStream = new GZipStream(binaryWriterStream, CompressionMode.Compress))
        {
            // Disposing the GZipStream (inner using) flushes the final
            // compressed block before the wrapper stream is released.
            ser.Serialize(compressionStream, rowset);
        }
    }
    #endregion

    /// <summary>
    /// Runs an arbitrary query on the context connection and compresses the
    /// results into a CompressedRowset. For a large result set this buffers
    /// everything in a DataTable (and again while compressing), so memory use
    /// is roughly proportional to the result size.
    /// </summary>
    /// <param name="query">T-SQL text to execute on the caller's session.</param>
    /// <returns>The compressed row set.</returns>
    [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)]
    public static CompressedRowset CompressQueryResults(string query)
    {
        // Open a context connection: the query runs in the calling session.
        using (var con = new SqlConnection("Context Connection=true"))
        {
            con.Open();
            // Fix: SqlCommand is IDisposable and was previously never disposed.
            using (var cmd = new SqlCommand(query, con))
            {
                var dt = new DataTable();
                using (var rdr = cmd.ExecuteReader())
                {
                    dt.Load(rdr);
                }
                // Configure the DataTable for compact binary serialization.
                // (An unused BinaryFormatter local was removed here.)
                dt.RemotingFormat = SerializationFormat.Binary;
                var cdt = new CompressedRowset();
                cdt.rowset = dt;
                return cdt;
            }
        }
    }

    /// <summary>
    /// Partial type mapping between .NET column types and SqlDbType.
    /// NOTE(review): several SQL types have no mapping here (geography,
    /// datetimeoffset, time, datetime2 precision, ...); any unmapped .NET
    /// type throws NotImplementedException.
    /// </summary>
    /// <param name="t">.NET column type taken from DataColumn.DataType.</param>
    /// <returns>The corresponding SqlDbType.</returns>
    static SqlDbType ToSqlType(Type t)
    {
        if (t == typeof(int))
        {
            return SqlDbType.Int;
        }
        if (t == typeof(string))
        {
            return SqlDbType.NVarChar;
        }
        if (t == typeof(Boolean))
        {
            return SqlDbType.Bit;
        }
        if (t == typeof(decimal))
        {
            return SqlDbType.Decimal;
        }
        if (t == typeof(float))
        {
            return SqlDbType.Real;
        }
        if (t == typeof(double))
        {
            return SqlDbType.Float;
        }
        if (t == typeof(DateTime))
        {
            return SqlDbType.DateTime;
        }
        if (t == typeof(Int64))
        {
            return SqlDbType.BigInt;
        }
        if (t == typeof(Int16))
        {
            return SqlDbType.SmallInt;
        }
        if (t == typeof(byte))
        {
            return SqlDbType.TinyInt;
        }
        if (t == typeof(Guid))
        {
            return SqlDbType.UniqueIdentifier;
        }
        if (t == typeof(Byte[]))
        {
            return SqlDbType.VarBinary;
        }
        throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
    }

    /// <summary>
    /// Streams a compressed DataTable back to the client as a result set,
    /// usable with INSERT ... EXEC to restore rows into a table.
    /// </summary>
    /// <param name="results">The compressed row set; SQL NULL sends nothing.</param>
    [Microsoft.SqlServer.Server.SqlProcedure]
    public static void UnCompressRowset(CompressedRowset results)
    {
        if (results.IsNull)
            return;
        DataTable dt = results.rowset;
        var fields = new SqlMetaData[dt.Columns.Count];
        for (int i = 0; i < dt.Columns.Count; i++)
        {
            var col = dt.Columns[i];
            var sqlType = ToSqlType(col.DataType);
            var colName = col.ColumnName;
            // Variable-length types require an explicit length; DataColumn.MaxLength
            // defaults to -1, which SqlMetaData interprets as MAX.
            if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary)
            {
                fields[i] = new SqlMetaData(colName, sqlType, col.MaxLength);
            }
            else
            {
                fields[i] = new SqlMetaData(colName, sqlType);
            }
        }
        var record = new SqlDataRecord(fields);
        SqlContext.Pipe.SendResultsStart(record);
        foreach (DataRow row in dt.Rows)
        {
            record.SetValues(row.ItemArray);
            SqlContext.Pipe.SendResultsRow(record);
        }
        SqlContext.Pipe.SendResultsEnd();
    }

    /// <summary>IXmlSerializable: no schema is exposed.</summary>
    public System.Xml.Schema.XmlSchema GetSchema()
    {
        return null;
    }

    /// <summary>
    /// IXmlSerializable: populates the row set from XML. Refuses to overwrite
    /// an already-populated instance.
    /// </summary>
    public void ReadXml(System.Xml.XmlReader reader)
    {
        if (rowset != null)
        {
            throw new InvalidOperationException("rowset already read");
        }
        var ser = new XmlSerializer(typeof(DataTable));
        rowset = (DataTable)ser.Deserialize(reader);
    }

    /// <summary>
    /// IXmlSerializable: writes the row set as XML. XmlSerializer requires a
    /// table name, so an unnamed table is given a default one.
    /// </summary>
    public void WriteXml(System.Xml.XmlWriter writer)
    {
        if (String.IsNullOrEmpty(rowset.TableName))
            rowset.TableName = "Rows";
        var ser = new XmlSerializer(typeof(DataTable));
        ser.Serialize(writer, rowset);
    }
}
and here is the creation of the SQL objects:
CREATE TYPE [dbo].[CompressedRowset]
EXTERNAL NAME [CompressedRowset].[CompressedRowset];
GO
CREATE FUNCTION [dbo].[CompressQueryResults] (@query [nvarchar](4000))
RETURNS [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults];
GO
CREATE PROCEDURE [dbo].[UnCompressRowset] @results [dbo].[CompressedRowset]
AS EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset];
GO
DataTable
, plus more. Read the comment above the CompressQueryResults
method; 2) look at the SQL Server logs to see if there is more detail there on the error; 3) I don't think that code from MS is "shippable" as it doesn't correctly handle: decimal, datetime2, datetimeoffset, time, float, and real; 4) why not just use the GZIP function you were working on for LOB types along with Row/Page compression? – Cyclamate