So this is a fairly broad question, but I have run out of ideas. We are currently running two worker role instances that do the following:
- Monitor and process IoT Hub events, spawning N threads for each batch.
- Monitor and process Connect/Disconnect (operations monitoring) messages from IoT Hub.
- Do some Service Bus work (topics and queues).
- Write to SQL, DocDB (Mongo API) and Azure Table storage for logging via NLog.
- Send cloud-to-device messages via IoT Hub.
The problem is that during load peaks our CPU usage increases, as expected, but it never comes back down. It often shoots up to 100% and stays there until I restart the instances. I keep looking at the threads because I still suspect a "while(1)"-type scenario, even though I can't see why. Let's get into the code.
In WorkerRole.cs:
class WorkerRole : RoleEntryPoint
{
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

    public override void Run()
    {
        _eventprocessor.Start(instanceId, instanceIndex);//.Wait(-1);
        //Wait for shutdown to be called, else the role will recycle
        this.runCompleteEvent.WaitOne();
    }
}
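In the code above, `Run()` discards the `Task` returned by the async `Start()`, and `Start()` in EventProcessor.cs then blocks on its own `ManualResetEvent`. At least one thread therefore sits in `WaitOne()` for the life of the role (a thread-pool thread, if the setup code awaits anything before reaching it), and any exception thrown by the setup code is silently lost. This is probably not the CPU spike itself, but it is cheap to rule out. The following is a minimal sketch, assuming `Start` can be changed to accept a `CancellationToken` and to complete only when its background work has stopped; `HostedWorker` and the `startAsync` delegate are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for WorkerRole: a real role would derive from
// RoleEntryPoint and override Run() and OnStop() in the same way.
public class HostedWorker
{
    private readonly CancellationTokenSource _stopping = new CancellationTokenSource();
    private readonly Func<CancellationToken, Task> _startAsync;

    // startAsync stands in for an EventProcessor.Start that accepts a token
    // (an assumption; the posted Start takes no token).
    public HostedWorker(Func<CancellationToken, Task> startAsync)
    {
        _startAsync = startAsync ?? throw new ArgumentNullException(nameof(startAsync));
    }

    public void Run()
    {
        try
        {
            // Block the Run thread once, on the task that represents the work,
            // instead of discarding that task and parking threads in WaitOne().
            _startAsync(_stopping.Token).GetAwaiter().GetResult();
        }
        catch (OperationCanceledException) when (_stopping.IsCancellationRequested)
        {
            // Expected during shutdown.
        }
    }

    // Signals shutdown so everything observing the token can unwind.
    public void OnStop() => _stopping.Cancel();
}
```

Because `Run()` now waits on the real task, a failure during setup surfaces as an exception in `Run()` (and a role recycle) rather than disappearing.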
In EventProcessor.cs (I have tried to leave out most of the detail but keep what I feel might be relevant, using pseudo-code comments where possible):
public class EventProcessor : IEventProcessor
{
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

    public async Task Start(string serviceId, int InstanceIndex)
    {
        //Setup Topic
        //Setup Queue
        //Setup EventProcessorHost for receiving events and operations monitoring and start listening
        //Receiving cloud to device feedback from service
        ReceiveFeedbackAsync();
        runCompleteEvent.WaitOne();
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        if (messages.Count() > 0)
        {
            if (!_cancellationSource.IsCancellationRequested)
            {
                await ProcessEventsBulk(context, messages);
            }
        }
        if (messages.Count() > 0)
        {
            await context.CheckpointAsync();
        }
    }

    async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
    {
        List<Task> TaskList = new List<Task>();
        foreach (EventData message in messages)
        {
            var LastTask = Task.Run(() => GoBoy(context, message));
            TaskList.Add(LastTask);
        }
        await Task.WhenAll(TaskList);
    }

    async Task GoBoy(PartitionContext context, EventData message)
    {
        try
        {
            using (var db = new AppDbContext(_dbContextConnectionString))
            {
                await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
                await db.SaveChangesAsync();
            }
        }
        catch (Exception e)
        {
            //Do Some stuff...
        }
    }

    private async void ReceiveFeedbackAsync()
    {
        var feedbackReceiver = serviceClientReceiver.GetFeedbackReceiver();
        while (true)
        {
            try
            {
                var feedbackBatch = await feedbackReceiver.ReceiveAsync();
                if (feedbackBatch == null) continue;
                foreach (var records in feedbackBatch.Records)
                {
                }
                await feedbackReceiver.CompleteAsync(feedbackBatch);
            }
            catch (Exception)
            {
                Thread.Sleep(30000);
            }
        }
    }
}
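`ProcessEventsBulk` queues one `Task.Run` per message and waits for all of them. The number of concurrent `GoBoy` calls, each with its own `AppDbContext`, is therefore bounded only by the batch size times the number of partitions being processed at once. Separately, `ProcessEventsAsync` enumerates `messages` twice via `Count()`. One way to cap the concurrency is a `SemaphoreSlim` gate. The following is a generic sketch under that assumption, not a confirmed fix for the CPU issue; `BatchProcessor`, `ProcessBatchAsync` and `maxConcurrency` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BatchProcessor
{
    // Runs handler for every item with at most maxConcurrency handlers in flight.
    // No Task.Run: the handler is already asynchronous, so wrapping it only adds
    // one extra thread-pool work item per message.
    public static async Task ProcessBatchAsync<T>(
        IEnumerable<T> items, Func<T, Task> handler, int maxConcurrency)
    {
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));

        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            // ToList() enumerates the batch exactly once and starts each wrapper.
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync().ConfigureAwait(false);
                try
                {
                    await handler(item).ConfigureAwait(false);
                }
                finally
                {
                    gate.Release(); // free the slot even if the handler throws
                }
            }).ToList();

            // Every task has finished once this returns, so disposing the gate is safe.
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }
}
```

Inside `ProcessEventsAsync`, this would mean materializing the batch once with `var batch = messages.ToList();`, returning early if `batch.Count == 0`, and then calling `await BatchProcessor.ProcessBatchAsync(batch, m => GoBoy(context, m), maxConcurrency);` before `CheckpointAsync()`. A suitable `maxConcurrency` would have to be measured against the SQL and DocDB limits. Dropping `Task.Run` assumes `GoBoy` does little synchronous work before its first `await`, because that part now runs on the calling thread.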
If anyone needs anything else, please don't hesitate to ask. I really appreciate any help.
[Image: CPU usage dropping once the worker instances were restarted]
Microsoft support helped by asking me to capture some PerfView traces and ProcDump dumps. Their conclusion was that we should look into the thread calling our hub at "https://abcxyz.azure-devices.net:443/$iothub/websocket". This is why I decided to include the ReceiveFeedbackAsync() method here, since I know it relies on a permanent connection to our hub to gather feedback.
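Two details in `ReceiveFeedbackAsync` stand out. First, if `feedbackReceiver.ReceiveAsync()` ever completes immediately with `null` (the posted code doesn't show when, or whether, the SDK does this), the `continue` sends the loop straight back into another receive with no pause, which is a "while(1)"-style spin. Second, the `catch` calls `Thread.Sleep(30000)` inside an `async` method, blocking a thread-pool thread for 30 seconds per failure, and the method is `async void`, so nothing can observe or stop it. The following is a minimal sketch of the same loop with a pause on empty results, `await Task.Delay` instead of `Thread.Sleep`, logged exceptions and a cancellation token. The receive, complete, handle and log steps are passed in as delegates so it compiles without the IoT Hub SDK; `FeedbackLoop` and all of its parameters are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FeedbackLoop
{
    // receive stands in for feedbackReceiver.ReceiveAsync() and may yield null;
    // complete stands in for feedbackReceiver.CompleteAsync(batch).
    public static async Task RunAsync<TBatch>(
        Func<CancellationToken, Task<TBatch>> receive,
        Func<TBatch, Task> complete,
        Action<TBatch> handle,
        Action<Exception> log,
        TimeSpan emptyDelay,
        TimeSpan errorDelay,
        CancellationToken token) where TBatch : class
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                TBatch batch = await receive(token).ConfigureAwait(false);
                if (batch == null)
                {
                    // Pause instead of looping straight back, so a receiver that
                    // returns null immediately cannot pin a core.
                    await Task.Delay(emptyDelay, token).ConfigureAwait(false);
                    continue;
                }
                handle(batch);
                await complete(batch).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                break; // normal shutdown
            }
            catch (Exception ex)
            {
                log(ex); // surface the error instead of swallowing it
                try
                {
                    // Non-blocking wait; Thread.Sleep would hold a pool thread.
                    await Task.Delay(errorDelay, token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }
    }
}
```

Wired to the original code, `receive` would wrap `feedbackReceiver.ReceiveAsync()` and `complete` would wrap `feedbackReceiver.CompleteAsync(batch)`. `Start` would also keep and await the returned `Task` instead of firing an `async void` method. The two delay values are placeholders to tune.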
From what I can see, we are registering our EventProcessorHost correctly, but let me know if anyone would like to see that code as well.
… catch (Exception) … Otherwise, some important information/errors might be hidden from you. – Naughton