If you need to benefit from parallel processing option then you cannot force to stop the rollback for the all processed partitions.
One of my preferred ways to solve a similar issue is processing partitions in batches; instead of processing all partitions in one operation, you can automate processing each n partition in parallel. (After many experience I found that on my machine configuring the MaxParallel
option to 10 was optimal solution).
Then if an error occured, only the current batch will rollback.
In this answer, I will try to provide a step-by-step guide to automate processing partitions in batches using SSIS.
Package overview
- Building dimensions in one batch
- Get unprocessed partitions count
- Loop over partitions (read 10 paritions each loop)
- Process Data
- Process Indexes
Package details
Creating Variables
First of all we have to add some variables that we will need in our process:
- intCount, intCurrent: to be used in the forloop container
- p_Cube: The Cube object id
- p_Database: The Analysis Database id
- p_MaxParallel: Number of partitions to be processed in one batch
- p_MeasureGroup: The Measure Group object id
- p_ServerName: Analysis Service Instance name
<Machine Name>\<Instance Name>
- strProcessData, strProcessDimensions and strProcessIndexes: Used to store XMLA queries related to processing Data, Indexes and dimensions
All variables that their names starts with p_
are required and can be added as parameters.
Adding a connection manager for Analysis Services
After adding variables, we have to create a connection manager to connect to the SQL Server Analysis Service Intance:
- First we have to configure the connection manager manually:
- Then we have to assign the Server name and Initial Catalog expression as showed in the image below:
- Rename the connection manager to
ssas
:
Processing Dimensions
First, add a Sequence Container to isolate the dimension processing within the package, then add a Script Task and an Analysis Services Processing Task:
Open the Script Task and select p_Database
, p_MaxParallel
as ReadOnly Variables and strProcessDimensions
as ReadWrite variable:
Now, Open the Script editor and use the following code:
The code is to prepare the XMLA command to process the dimensions, this XMLA query will be used in the Analysis Services Processing Task
#region Namespaces
using System;
using System.Data;
using System.Data.SqlClient;
using Microsoft.SqlServer.Dts.Runtime;
using System.Linq;
using System.Windows.Forms;
using Microsoft.AnalysisServices;
#endregion
namespace ST_00ad89f595124fa7bee9beb04b6ad3d9
{
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
public void Main()
{
Server myServer = new Server();
string ConnStr = Dts.Connections["ssas"].ConnectionString;
myServer.Connect(ConnStr);
Database db = myServer.Databases.GetByName(Dts.Variables["p_Database"].Value.ToString());
int maxparallel = (int)Dts.Variables["p_MaxParallel"].Value;
var dimensions = db.Dimensions;
string strData;
strData = "<Batch xmlns=\"http://schemas.microsoft.com/analysisservices/2003/engine\"> \r\n <Parallel MaxParallel=\"" + maxparallel.ToString() + "\"> \r\n";
foreach (Dimension dim in dimensions)
{
strData +=
" <Process xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:ddl2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2\" xmlns:ddl2_2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2/2\" xmlns:ddl100_100=\"http://schemas.microsoft.com/analysisservices/2008/engine/100/100\" xmlns:ddl200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200\" xmlns:ddl200_200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200/200\" xmlns:ddl300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300\" xmlns:ddl300_300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300/300\" xmlns:ddl400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400\" xmlns:ddl400_400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400/400\"> \r\n" +
" <Object> \r\n" +
" <DatabaseID>" + db.ID + "</DatabaseID> \r\n" +
" <DimensionID>" + dim.ID + "</DimensionID> \r\n" +
" </Object> \r\n" +
" <Type>ProcessFull</Type> \r\n" +
" <WriteBackTableCreation>UseExisting</WriteBackTableCreation> \r\n" +
" </Process> \r\n";
}
//}
strData += " </Parallel> \r\n</Batch>";
Dts.Variables["strProcessDimensions"].Value = strData;
Dts.TaskResult = (int)ScriptResults.Success;
}
#region ScriptResults declaration
enum ScriptResults
{
Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
};
#endregion
}
}
Now, Open the Analysis Services Processing Task and define any task manually, then Go To expression and assign the strProcessDimensions
variable to ProcessingCommands
property:
Get the unprocessed partitions count
In order to loop over partitions in chunks we have first to get the unprocessed partitions count. To do that, you have to add a Script Task. Select p_Cube
, p_Database
, p_MeasureGroup
, p_ServerName
variables as ReadOnly Variables and intCount
as ReadWrite variable.
Inside the Script Editor write the following script:
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using Microsoft.AnalysisServices;
using System.Linq;
#endregion
namespace ST_e3da217e491640eca297900d57f46a85
{
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
public void Main()
{
// TODO: Add your code here
Server myServer = new Server();
string ConnStr = Dts.Connections["ssas"].ConnectionString;
myServer.Connect(ConnStr);
Database db = myServer.Databases.GetByName(Dts.Variables["p_Database"].Value.ToString());
Cube objCube = db.Cubes.FindByName(Dts.Variables["p_Cube"].Value.ToString());
MeasureGroup objMeasureGroup = objCube.MeasureGroups[Dts.Variables["p_MeasureGroup"].Value.ToString()];
Dts.Variables["intCount"].Value = objMeasureGroup.Partitions.Cast<Partition>().Where(x => x.State != AnalysisState.Processed).Count();
Dts.TaskResult = (int)ScriptResults.Success;
}
#region ScriptResults declaration
enum ScriptResults
{
Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
};
#endregion
}
}
Process Partitions in chunks
Last step is to create a Forloop container and configure it as shown in the image below:
- InitExpression: @intCurrent = 0
- EvalExpression: @intCurrent < @intCount
- AssignExpression = @intCurrent + @p_MaxParallel
Inside the For Loop container add a Script Task to prepare XMLA queries and add two Analysis Services Processing Task as shown in the image below:
In the Script Task, select p_Cube
, p_Database
, p_MaxParallel
, p_MeasureGroup
as ReadOnly Variables, and select strProcessData
, strProcessIndexes
as ReadWrite Variables.
In the script editor write the following script:
The Script is to prepare the XMLA commands needed to process the partitions Data and Indexes separately
#region Namespaces
using System;
using System.Data;
using System.Data.SqlClient;
using Microsoft.SqlServer.Dts.Runtime;
using System.Linq;
using System.Windows.Forms;
using Microsoft.AnalysisServices;
#endregion
namespace ST_00ad89f595124fa7bee9beb04b6ad3d9
{
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{
public void Main()
{
Server myServer = new Server();
string ConnStr = Dts.Connections["ssas"].ConnectionString;
myServer.Connect(ConnStr);
Database db = myServer.Databases.GetByName(Dts.Variables["p_Database"].Value.ToString());
Cube objCube = db.Cubes.FindByName(Dts.Variables["p_Cube"].Value.ToString());
MeasureGroup objMeasureGroup = objCube.MeasureGroups[Dts.Variables["p_MeasureGroup"].Value.ToString()];
int maxparallel = (int)Dts.Variables["p_MaxParallel"].Value;
int intcount = objMeasureGroup.Partitions.Cast<Partition>().Where(x => x.State != AnalysisState.Processed).Count();
if (intcount > maxparallel)
{
intcount = maxparallel;
}
var partitions = objMeasureGroup.Partitions.Cast<Partition>().Where(x => x.State != AnalysisState.Processed).OrderBy(y => y.Name).Take(intcount);
string strData, strIndexes;
strData = "<Batch xmlns=\"http://schemas.microsoft.com/analysisservices/2003/engine\"> \r\n <Parallel MaxParallel=\"" + maxparallel.ToString() + "\"> \r\n";
strIndexes = "<Batch xmlns=\"http://schemas.microsoft.com/analysisservices/2003/engine\"> \r\n <Parallel MaxParallel=\"" + maxparallel.ToString() + "\"> \r\n";
string SQLConnStr = Dts.Variables["User::p_DatabaseConnection"].Value.ToString();
foreach (Partition prt in partitions)
{
strData +=
" <Process xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:ddl2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2\" xmlns:ddl2_2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2/2\" xmlns:ddl100_100=\"http://schemas.microsoft.com/analysisservices/2008/engine/100/100\" xmlns:ddl200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200\" xmlns:ddl200_200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200/200\" xmlns:ddl300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300\" xmlns:ddl300_300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300/300\" xmlns:ddl400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400\" xmlns:ddl400_400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400/400\"> \r\n " +
" <Object> \r\n " +
" <DatabaseID>" + db.Name + "</DatabaseID> \r\n " +
" <CubeID>" + objCube.ID + "</CubeID> \r\n " +
" <MeasureGroupID>" + objMeasureGroup.ID + "</MeasureGroupID> \r\n " +
" <PartitionID>" + prt.ID + "</PartitionID> \r\n " +
" </Object> \r\n " +
" <Type>ProcessData</Type> \r\n " +
" <WriteBackTableCreation>UseExisting</WriteBackTableCreation> \r\n " +
" </Process> \r\n";
strIndexes +=
" <Process xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:ddl2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2\" xmlns:ddl2_2=\"http://schemas.microsoft.com/analysisservices/2003/engine/2/2\" xmlns:ddl100_100=\"http://schemas.microsoft.com/analysisservices/2008/engine/100/100\" xmlns:ddl200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200\" xmlns:ddl200_200=\"http://schemas.microsoft.com/analysisservices/2010/engine/200/200\" xmlns:ddl300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300\" xmlns:ddl300_300=\"http://schemas.microsoft.com/analysisservices/2011/engine/300/300\" xmlns:ddl400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400\" xmlns:ddl400_400=\"http://schemas.microsoft.com/analysisservices/2012/engine/400/400\"> \r\n " +
" <Object> \r\n " +
" <DatabaseID>" + db.Name + "</DatabaseID> \r\n " +
" <CubeID>" + objCube.ID + "</CubeID> \r\n " +
" <MeasureGroupID>" + objMeasureGroup.ID + "</MeasureGroupID> \r\n " +
" <PartitionID>" + prt.ID + "</PartitionID> \r\n " +
" </Object> \r\n " +
" <Type>ProcessIndexes</Type> \r\n " +
" <WriteBackTableCreation>UseExisting</WriteBackTableCreation> \r\n " +
" </Process> \r\n";
}
strData += " </Parallel> \r\n</Batch>";
strIndexes += " </Parallel> \r\n</Batch>";
Dts.Variables["strProcessData"].Value = strData;
Dts.Variables["strProcessIndexes"].Value = strIndexes;
Dts.TaskResult = (int)ScriptResults.Success;
}
#region ScriptResults declaration
enum ScriptResults
{
Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
};
#endregion
}
}
Now Open both Analysis Services Processing Task and define any task manually (just to validate the task). Then Go to expression and assign the strProcessData
variable to ProcessingCommands
property in the First Task and strProcessIndexes
variable to ProcessingCommands
.
Now you can execute the package, if an error occurred only the current batch will rollback (10 partitions).
Possible improvement
You can add some logging tasks to track the package progress especially if you are dealing with a huge number of partitions.
Since it contains helpful details, I posted this answer on my personal Blog:
Also I published an article with more details on SQLShack:
Parallel.ForEach
. Getting it all to work with AMO and SSIS may be a bit more challenging due to how AMO manages connections; you need to make sure the commands don't share connections. Generating XMLA yourself and using ADOMD may be simpler. – RanunculaceousParallel.Foreach
while the user can process partitions in batches? – Knowlton