ConcurrentBag<T> getting duplicates (seems not to be thread safe)
I must be doing something wrong somewhere because I am getting duplicate items in my ConcurrentBag. Here is the chain of events:

    var listings = new ConcurrentBag<JSonListing>();
    Parallel.ForEach(Types, ParallelOptions, (type, state) =>
    {
        ...
        status = ProcessType(listings, status, type, state);
        ...
    });

    private GeocodeResults ProcessType(ConcurrentBag<JSonListing> listings, GeocodeResults status, XElement type, ParallelLoopState state)
    {
        ...
        AddListingsToList(results, type, listings);
        ...
    }

    private void AddListingsToList(dynamic results, XElement type, ConcurrentBag<JSonListing> listings)
    {
        var typeMustMatch = type.Attribute("TypeMustMatch").Value;
        var typeID = Convert.ToInt32(type.Attribute("ID").Value);

        foreach (var result in results.results)
        {
            var foundListing = listings.SingleOrDefault(x => x.ID == result.id);
            if (foundListing != null)
            {
                var typeIDs = foundListing.TypeIDs.Split("|".ToCharArray(), StringSplitOptions.RemoveEmptyEntries).ToList();
                if (!typeIDs.Contains(typeID.ToString()))
                {
                    foundListing.TypeIDs += "|" + typeID;
                }
            }
            else
            {
                var listing = new JSonListing { ID = result.id, ReferenceNumber = result.reference, TypeIDs = typeID.ToString(), TypeMustMatch = typeMustMatch };
                listings.Add(listing);
            }
        }
    }

AddListingsToList should guarantee that if an element with the same ID already exists, no second copy is added; instead, a property on the existing item is updated. But the error I get is:

    System.InvalidOperationException: Sequence contains more than one matching element
    at System.Linq.Enumerable.SingleOrDefault[TSource](IEnumerable`1 source, Func`2 predicate)
    at LocalSearch.Processor.CityProcessor.AddListingsToList(Object results, XElement type, ConcurrentBag`1 listings) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 310
    at CallSite.Target(Closure , CallSite , CityProcessor , Object , XElement , ConcurrentBag`1 )
    at LocalSearch.Processor.CityProcessor.ProcessType(ConcurrentBag`1 listings, GeocodeResults status, XElement type, ParallelLoopState state) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 249
    at LocalSearch.Processor.CityProcessor.<>c__DisplayClass4.b__0(XElement type, ParallelLoopState state) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 137

Armyn answered 7/2, 2014 at 12:2 Comment(3)
This would be solved more cleanly with PLINQ.Autoxidation
@spender: It looks like ProcessType would have significant side effects which have been omitted from the example though.Emmenagogue
@Autoxidation The gist of ProcessType is a call (or multiple calls) to the Google Places API, which returns a dynamic list of available places matching the search query. PLINQ is not suitable for this, as these are not in-memory items.Armyn
ConcurrentBag guarantees that each operation on it is thread-safe when considered on its own. It does not guarantee that multiple operations in succession will be treated as an atomic group.

As a result your code has a race condition: you check if the bag already contains some item X, but two threads can run the test concurrently, decide that the item isn't there, and proceed to add it. End result: two copies of the item end up being in the bag.

It looks like your use case would be better implemented by using a ConcurrentDictionary instead and leveraging the TryAdd method, which is atomic. Alternatively you could put a lock() around the bag to make everything inside the block operate atomically, but then you don't really need a concurrent collection and can use a straight List instead.
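A minimal sketch of the ConcurrentDictionary approach, keyed on the listing ID. (`JSonListing` here is a simplified stand-in for the asker's type, with only the fields needed for the demo.)

```csharp
using System;
using System.Collections.Concurrent;

// Simplified stand-in for the asker's JSonListing.
class JSonListing
{
    public int ID;
    public string TypeIDs;
}

class Demo
{
    static void Main()
    {
        var listings = new ConcurrentDictionary<int, JSonListing>();

        // TryAdd is atomic: exactly one caller wins for a given key,
        // so two threads can never both insert listing 42.
        bool first  = listings.TryAdd(42, new JSonListing { ID = 42, TypeIDs = "1" });
        bool second = listings.TryAdd(42, new JSonListing { ID = 42, TypeIDs = "2" });

        Console.WriteLine(first);          // True
        Console.WriteLine(second);         // False - key 42 already present
        Console.WriteLine(listings.Count); // 1
    }
}
```

The check and the insert are a single operation here, so the check-then-add race described above cannot occur.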

Emmenagogue answered 7/2, 2014 at 12:5 Comment(4)
How can I ensure that no duplicates make it into the bag?Armyn
I think the best approach would be to use a List<T>, let it collect duplicates, and deduplicate once it's done. Most of these lists will have approximately 4k items, so it won't be a significant performance hit (at least I don't think so, but I will try it).Armyn
@Zoinky: You can try lots of things; the best choice depends on your exact requirements (we can only guess). Instead of allowing dupes, why not just use a HashSet<T> with an appropriate equality comparer? Better yet, since there is a natural key to use here, why not a ConcurrentDictionary?Emmenagogue
I believe ConcurrentDictionary is what I am looking for; my requirements are pretty simple. The main Parallel.ForEach has about 20 items, each of which makes a different call to the Google Places API, and each of these types can return up to 200 results. Type 1 and type 2 could potentially return the same item, but instead of discarding one of them, I simply want to keep track of it by appending the type ID to the item already in the collection. So if type 1 and type 2 both bring back the same item, there should be only one item, with a TypeIDs string of "1|2" denoting that both found it.Armyn
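The append-on-duplicate behaviour described in this last comment maps directly onto `ConcurrentDictionary.AddOrUpdate`. A hedged sketch under the same simplified `JSonListing` as above (the `Record` helper is illustrative, not from the asker's code):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class JSonListing
{
    public int ID;
    public string TypeIDs;
}

class Demo
{
    static readonly ConcurrentDictionary<int, JSonListing> Listings =
        new ConcurrentDictionary<int, JSonListing>();

    static void Record(int id, int typeID)
    {
        Listings.AddOrUpdate(
            id,
            // Add path: the first type to see this ID creates the listing.
            _ => new JSonListing { ID = id, TypeIDs = typeID.ToString() },
            // Update path: append the type ID if it is not already recorded.
            // This delegate may run more than once under contention, so it
            // returns a new object instead of mutating the existing one.
            (_, existing) =>
            {
                var ids = existing.TypeIDs.Split('|');
                return ids.Contains(typeID.ToString())
                    ? existing
                    : new JSonListing { ID = id, TypeIDs = existing.TypeIDs + "|" + typeID };
            });
    }

    static void Main()
    {
        // Types 1 and 2 both return listing 42, in parallel.
        Parallel.Invoke(() => Record(42, 1), () => Record(42, 2));
        Console.WriteLine(Listings.Count);       // 1 - no duplicate entries
        Console.WriteLine(Listings[42].TypeIDs); // "1|2" or "2|1", depending on timing
    }
}
```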
This doesn't mean that ConcurrentBag isn't thread-safe; it simply means that your code isn't.

You are checking for a value in the ConcurrentBag and then adding a new item when the previous check found nothing.

However, because there are no locks in your code, two threads could simultaneously perform the following:

THREAD 1        THREAD 2
=-=-=-=-=-=-=-=-=-=-=-=-
Check Exists
                Check Exists
Add New
                Add New

You need to lock around your check-and-add routine.
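A hedged sketch of the lock-based fix, assuming a plain List<JSonListing> guarded by one lock object (once everything runs under a lock, ConcurrentBag buys you nothing; `ListingStore` and its members are illustrative names):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class JSonListing
{
    public int ID;
    public string TypeIDs;
}

class ListingStore
{
    private readonly object _sync = new object();
    private readonly List<JSonListing> _listings = new List<JSonListing>();

    public int Count
    {
        get { lock (_sync) return _listings.Count; }
    }

    public void AddOrAppendType(int id, int typeID)
    {
        // The check and the add sit inside the same lock, so no other
        // thread can slip in a duplicate between the two steps.
        lock (_sync)
        {
            var found = _listings.SingleOrDefault(x => x.ID == id);
            if (found != null)
            {
                if (!found.TypeIDs.Split('|').Contains(typeID.ToString()))
                    found.TypeIDs += "|" + typeID;
            }
            else
            {
                _listings.Add(new JSonListing { ID = id, TypeIDs = typeID.ToString() });
            }
        }
    }
}
```

The trade-off is that every caller serializes on `_sync`, which is fine for a few thousand items but makes the ConcurrentDictionary approach the better fit when contention matters.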

Aldredge answered 7/2, 2014 at 12:7 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.