Azure TableQuery thread safety with Parallel.ForEach

I have some basic Azure tables that I've been querying serially:

var query = new TableQuery<DynamicTableEntity>()
  .Where(TableQuery.GenerateFilterCondition("PartitionKey",
    QueryComparisons.Equal, myPartitionKey));

foreach (DynamicTableEntity entity in myTable.ExecuteQuery(query)) {
  // Process entity here.
}

To speed this up, I parallelized this like so:

Parallel.ForEach(myTable.ExecuteQuery(query), (entity, loopState) => {
  // Process entity here in a thread-safe manner.

  // Edited to add: Details of the loop body below:

  // This is the essence of the fixed loop body:
  lock (myLock) {
    DataRow myRow = myDataTable.NewRow();
    // [Add entity data to myRow.]
    myDataTable.Rows.Add(myRow);
  }

  // Old code (apparently not thread-safe, though NewRow() is supposed to create
  // a DataRow based on the table's schema without changing the table state):
  /*
    DataRow myRow = myDataTable.NewRow();
    lock (myLock) {
      // [Add entity data to myRow.]
      myDataTable.Rows.Add(myRow);
    }
  */
});

This produces significant speedup, but the results tend to be slightly different between runs (i.e., some of the entities differ occasionally, though the number of entities returned is exactly the same).

From this and some web searching, I conclude that the enumerator above is not always thread-safe. The documentation appears to suggest that thread safety is guaranteed only if the table objects are public static, but that hasn't made a difference for me.

Could someone suggest how to resolve this? Is there a standard pattern for parallelizing Azure table queries?

Rawlinson answered 5/10, 2014 at 6:37 Comment(5)
Rn: The enumerator doesn't have to be thread-safe; Parallel.ForEach() can handle that. A problem could arise if the entities shared some state.
Splashy: Can you clarify what "slightly different results" means? If you log all entities in Parallel.ForEach, are you getting the same set of entities in a different order?
Rawlinson: I've sorted the entities to determine the exact differences, and the sets are nearly identical. However, occasionally one specific entity is missing while another one is duplicated (as compared to the results I'm getting serially, which are always identical and presumably comprise the ground truth for the table contents). That seems to be the general pattern: some entities are missing, but others are duplicated to keep the entity count the same. It's almost as though some index is not incremented in a thread-safe manner, leading to a race condition when reading the entities from memory.
Ledger: Did you try taking the ExecuteQuery call out of the picture by doing an ExecuteQuery().ToList() first and then passing that list in as the source to Parallel.ForEach? Does that fix the problem? If not, could you please share more details about the loop body?
Rawlinson: I haven't tried that yet, but a slight change to my loop body seems to have resolved the issue. Basically, I moved a DataTable.NewRow() call inside my critical section. I don't see why this is necessary, since that call is supposed only to create a new row based on the table's schema without affecting any table state (.NET DataTable, not Azure table). Thus, I'm not sure the problem is truly solved, but the code has worked every time so far.

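Your comment is correct: DataTable is not safe for concurrent writes, and that is the source of the duplicate entries. (The type is documented as safe for multithreaded reads only; write operations must be synchronized.) Holding a lock around both row creation and row insertion resolves the issue. In the snippets below, myTable is the .NET DataTable (myDataTable in the question), not the Azure table: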

 lock (myTable)
 {
    DataRow myRow = myTable.NewRow();
    myRow.SetField<int>("c1", (int)value);
    myTable.Rows.Add(myRow);
 }

Putting NewRow() outside the lock intermittently results in duplicate row entries in the table, or in "An unhandled exception of type 'System.ArgumentException' occurred in System.Data.dll" on the NewRow() line. Although NewRow() does not add the row to the Rows collection, it evidently still touches state shared with the table, so it has to be treated as a write. For additional details and alternatives for concurrent DataTable usage, see "Thread safety for DataTable".
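One alternative that avoids locking the DataTable at all is to keep it out of the parallel section: do the per-entity work in parallel, collect the results in a thread-safe collection, and then create and add the rows from a single thread. The following is only a sketch under assumptions: CollectThenAddSketch, BuildTable and Process are hypothetical names, the int sequence stands in for the entities returned by the query, and Process stands in for whatever the real loop body computes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

static class CollectThenAddSketch
{
    // Hypothetical stand-in for the per-entity work done in the loop body.
    static int Process(int item) { return item * 2; }

    public static DataTable BuildTable(IEnumerable<int> source)
    {
        // Parallel phase: workers write only to a thread-safe collection.
        ConcurrentBag<int> processed = new ConcurrentBag<int>();
        Parallel.ForEach(source, item => processed.Add(Process(item)));

        // Serial phase: a single thread creates and adds every row,
        // so the DataTable is never written to concurrently.
        DataTable table = new DataTable();
        table.Columns.Add("c1", typeof(int));
        foreach (int value in processed)
        {
            DataRow row = table.NewRow();
            row["c1"] = value;
            table.Rows.Add(row);
        }
        return table;
    }

    static void Main()
    {
        DataTable table = BuildTable(Enumerable.Range(0, 1000));
        Console.WriteLine("rows={0}", table.Rows.Count);
    }
}
```

Note that ConcurrentBag does not preserve insertion order, so the rows come out in an unspecified order; sort afterwards if order matters. This approach only pays off when the per-entity work, rather than the row insertion, dominates the run time.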

To reproduce the error condition, use this code. Some runs will be clean, some will contain duplicate entries, and some will encounter exceptions.

   using System;
   using System.Collections.Generic;
   using System.Data;
   using System.Threading;

   class Program
   {
      static DataTable myTable = GetTable();
      static ManualResetEvent waitHandle = new ManualResetEvent(false);

      static void Main(string[] args)
      {
         const int threadCount = 10;
         List<Thread> threads = new List<Thread>();
         for (int i = 0; i < threadCount; ++i) 
         {
            threads.Add(new Thread(new ParameterizedThreadStart(AddRowThread)));
            threads[i].Start(i);
         }
         waitHandle.Set(); // Release all the threads at once
         for (int i = 0; i < threadCount; ++i) 
         {
            threads[i].Join();
         }

         // Print results once threads return
         for (int i = 0; i < myTable.Rows.Count; ++i)
         {
            Console.WriteLine(myTable.Rows[i].Field<int>(0));
         }
         Console.WriteLine("---Processing Complete---");
         Console.ReadKey();
      }

      static void AddRowThread(object value)
      {
         waitHandle.WaitOne();
         DataRow myRow = myTable.NewRow(); // THIS RESULTS IN INTERMITTENT ERRORS
         lock (myTable)
         {
            //DataRow myRow = myTable.NewRow(); // MOVE NewRow() CALL HERE TO RESOLVE ISSUE
            myRow.SetField<int>("c1", (int)value);
            myTable.Rows.Add(myRow);
         }
      }

      static DataTable GetTable()
      {
         // Create a DataTable with a single int column, "c1".
         DataTable table = new DataTable();
         table.Columns.Add("c1", typeof(int));       
         return table;
      }
   }
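
To tie the fix back to the Parallel.ForEach loop in the question, here is a minimal sketch of the locked loop body together with a duplicate check. The names LockedAddSketch, Fill, HasNoDuplicates and gate are hypothetical, and the int sequence is a stand-in for the query results. A dedicated lock object is used here as an assumption; the answer's code locks the table itself, which works equally well as long as every writer uses the same lock.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;

static class LockedAddSketch
{
    // Adds one row per input value from parallel workers, with NewRow()
    // and Rows.Add() both inside the lock, then returns the filled table.
    public static DataTable Fill(IEnumerable<int> values)
    {
        DataTable table = new DataTable();
        table.Columns.Add("c1", typeof(int));
        object gate = new object(); // hypothetical dedicated lock object

        Parallel.ForEach(values, value =>
        {
            lock (gate)
            {
                DataRow row = table.NewRow();
                row["c1"] = value;
                table.Rows.Add(row);
            }
        });
        return table;
    }

    // True when no value appears in more than one row.
    public static bool HasNoDuplicates(DataTable table)
    {
        HashSet<int> seen = new HashSet<int>();
        foreach (DataRow row in table.Rows)
        {
            if (!seen.Add((int)row["c1"])) return false;
        }
        return true;
    }

    static void Main()
    {
        DataTable table = Fill(Enumerable.Range(0, 10000));
        Console.WriteLine("rows={0}, noDuplicates={1}",
            table.Rows.Count, HasNoDuplicates(table));
    }
}
```

Because the whole row operation is serialized, this gains speed only if the work done outside the lock (for example, decoding the entity) is the expensive part.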
Terylene answered 10/7, 2015 at 20:51 Comment(0)