Hangfire - Prevent multiples of the same job being enqueued

Scenario:

Job 1 is scheduled to run every 5 minutes, and takes ~1 minute to complete.

A lot of work piles up and Job 1 takes 15 minutes to run.

There are now three Job 1's being processed concurrently - I don't want this.


How do I prevent Job 1 being added to the queue again if it is already there?

Is there a Hangfire setting, or do I need to poll job statuses manually?

Immune answered 18/7, 2017 at 10:37 Comment(0)

You can use the DisableConcurrentExecution attribute to prevent multiple concurrent executions of a method. Just put this attribute above your method:

[DisableConcurrentExecution(timeoutInSeconds: 10 * 60)]
public void Job1()
{
    // Method body
}
Jessjessa answered 18/7, 2017 at 10:56 Comment(4)
Someone reported a bug about the DisableConcurrentExecution attribute on 10 November 2016. Something could be wrong.Paleobiology
What if custom input parameters are passed into the Job1() method?Urethroscope
Note that Hangfire may cause database connection pool leaks this way: codewithstyle.info/real-bug-story-long-running-jobs-hangfireCheshire
How do you avoid it throwing a timeout exception?Silverfish

A bit late, but I was using this class to prevent duplicate jobs from running concurrently:

using System;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.States;
using Hangfire.Storage;

public class SkipConcurrentExecutionAttribute : JobFilterAttribute, IServerFilter, IElectStateFilter
{
    private readonly int _timeoutSeconds;
    private const string DistributedLock = "DistributedLock";

    public SkipConcurrentExecutionAttribute(int timeOutSeconds)
    {
        if (timeOutSeconds < 0) throw new ArgumentException("Timeout argument value should not be negative.");
        this._timeoutSeconds = timeOutSeconds;
    }

    public void OnPerformed(PerformedContext filterContext)
    {
        // Release the lock if this filter acquired one; a canceled run never did.
        if (filterContext.Items.TryGetValue(DistributedLock, out var acquired) && acquired is IDisposable distributedLock)
        {
            distributedLock.Dispose();
        }
    }

    public void OnPerforming(PerformingContext filterContext)
    {
        filterContext.WriteLine("Job Started");

        var resource = String.Format(
                           "{0}.{1}",
                          filterContext.BackgroundJob.Job.Type.FullName,
                          filterContext.BackgroundJob.Job.Method.Name);

        var timeOut = TimeSpan.FromSeconds(_timeoutSeconds);

        filterContext.WriteLine($"Waiting for running jobs to complete. (timeout: { _timeoutSeconds })");

        try
        {
            var distributedLock = filterContext.Connection.AcquireDistributedLock(resource, timeOut);
            filterContext.Items[DistributedLock] = distributedLock;
        }
        catch (Exception ex)
        {
            filterContext.WriteLine(ex);
            filterContext.WriteLine("Another job is already running, aborted.");
            filterContext.Canceled = true; 
        }

    }

    public void OnStateElection(ElectStateContext context)
    {
        // No-op; only here because IElectStateFilter requires it.
    }
}
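
Usage is the same shape as with the built-in attribute; for example (a sketch, with Job1 standing in for your own method):

// Pass 0 to skip immediately when another instance holds the lock,
// or a positive number of seconds to wait before giving up.
[SkipConcurrentExecution(timeOutSeconds: 0)]
public void Job1()
{
    // Method body
}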

Hope that helps, thx!

Dimmick answered 16/3, 2020 at 1:7 Comment(3)
Cancelling the job does the trick, yes. You could also add the method parameters to the lock name, so the same method can run with different arguments, each just once.Morrow
Also, if you use job continuations, cancelling a job will still trigger the jobs that depend on it, as if the parent job had succeeded. Keep that in mind.Morrow
To avoid that, throw new JobAbortedException(); instead of filterContext.Canceled = true; this tells the worker to skip the duplicate job run.Morrow
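
Following that comment, the catch block in OnPerforming above would become something like this (a sketch; JobAbortedException ships with the Hangfire package, but check its namespace in your Hangfire version):

catch (Exception ex)
{
    filterContext.WriteLine(ex);
    filterContext.WriteLine("Another job is already running, aborted.");
    // Abandon this run without reporting success,
    // so continuations are not triggered for the duplicate.
    throw new JobAbortedException();
}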

Sounds like this could be something that you might be interested in: https://discuss.hangfire.io/t/job-reentrancy-avoidance-proposal/607/8

The discussion is about skipping jobs that would execute concurrently with an already running job.

Durand answered 18/7, 2017 at 10:54 Comment(0)

I solved this problem as follows; I hope it works for you:

using System.Linq;
using Hangfire;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.Server;

public class PreventConcurrentExecutionJobFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
    public void OnCreating(CreatingContext filterContext)
    {
        // Cancel creation when a job of the same type with the same arguments is already processing.
        var jobs = JobStorage.Current.GetMonitoringApi().ProcessingJobs(0, 100);
        if (jobs.Any(x => x.Value.Job.Type == filterContext.Job.Type && string.Join(".", x.Value.Job.Args) == string.Join(".", filterContext.Job.Args)))
        {
            filterContext.Canceled = true;
        }
    }

    public void OnPerformed(PerformedContext filterContext) { }

    void IClientFilter.OnCreated(CreatedContext filterContext) { }

    void IServerFilter.OnPerforming(PerformingContext filterContext) { }
}

Usage:

1. Add it to the global filters:

GlobalJobFilters.Filters.Add(new PreventConcurrentExecutionJobFilter());

2. Or apply it to an abstract base job class:

[PreventConcurrentExecutionJobFilter]
public abstract class IHangfireJob {

}

3. Or apply it to a single job:

[PreventConcurrentExecutionJobFilter]
public class MyJob {

}
Facultative answered 6/11, 2022 at 7:48 Comment(1)
Clean and nice, but there seem to be two issues here. One is that it does not check the method name, only the type and arguments. The other is a possible race condition: with a recurring job I managed to get it into a state where two jobs were scheduled at the same time (recurring each second). Since neither was in a running state, both started. If I moved the logic to OnPerforming, then both would be canceled. If the same job is enqueued twice it's simple to reproduce; not as simple with a recurring job.Unsay

There is an attribute called DisableConcurrentExecution that prevents two jobs of the same type from running concurrently.

Though in your case it might be best to check whether a task is already running and skip accordingly.

Teutonic answered 18/7, 2017 at 10:48 Comment(2)
From my understanding this would prevent them from executing concurrently but they will still be queued to run. This is a step in the right direction but ideally they wouldn't be queued at all. Thanks for the answer!Immune
That's what I meant with my second sentence. I don't think it is right to not queue the jobs; rather, let them start, check whether another job is currently running, and then skip right there...Teutonic
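
For the "not queued at all" variant, here is a minimal sketch using the monitoring API (assuming a hypothetical MyJobs class, the default queue, and that scanning the first 100 waiting/running jobs is enough). Note there is still a small race window between the check and the Enqueue call, which the distributed-lock answers above avoid:

using System.Linq;
using Hangfire;

public class MyJobs   // hypothetical job class for illustration
{
    public void Job1() { /* work */ }

    public static void EnqueueJob1IfIdle()
    {
        var api = JobStorage.Current.GetMonitoringApi();

        // Scan the first 100 waiting and running jobs for the same method.
        bool alreadyThere =
            api.EnqueuedJobs("default", 0, 100).Any(j => j.Value.Job?.Method.Name == nameof(Job1)) ||
            api.ProcessingJobs(0, 100).Any(j => j.Value.Job?.Method.Name == nameof(Job1));

        if (!alreadyThere)
        {
            BackgroundJob.Enqueue<MyJobs>(x => x.Job1());
        }
    }
}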

I was using the DisableConcurrentExecution attribute for my RecurringJob, but it was not working for me.

My mistake was that I was using it on my method and not on my interface.

[DisableConcurrentExecution(timeoutInSeconds: 10 * 60)]
Task SyncAllMyDataAsync();



RecurringJob.AddOrUpdate<IMySyncJobs>("Sync my data", x => x.SyncAllMyDataAsync(), "0 0 * * * *");
Selmner answered 2/12, 2022 at 13:57 Comment(0)

Yes, it is possible, as below:

            RecurringJob.AddOrUpdate(Environment.MachineName, () => MyJob(Environment.MachineName), Cron.HourInterval(2));

and MyJob should be defined like this:

    public void MyJob(string taskId)
    {
        if (!taskId.Equals(Environment.MachineName))
        {
            return;
        }
        //Do whatever you job should do.
    }
Gautier answered 9/8, 2020 at 18:45 Comment(2)
That doesn't solve OP's problem. Your answer only prevents a job from being executed on a machine other than the one where it was queued, but it doesn't prevent reentrancy. In your example, if your job takes more than 2 hours to complete, then Hangfire would enqueue a new job, on the same machine, and you'd have two jobs of the same type running at the same time.Ratsbane
If a different replica than the intended one picks up a job, the job will simply complete "successfully" without ever actually doing anything.Titicaca

In my case, I have many methods running in parallel, but I can't run the same method more than once.

Using a solution from this topic, I just edited the query: if there is already a method in the processing list with the same name as the current method, the execution is cancelled.

I believe that resolves a lot of cases.

  1. Create this class:
    using System;
    using System.Linq;
    using Hangfire;
    using Hangfire.Client;
    using Hangfire.Common;
    using Hangfire.Server;
    
    public class PreventConcurrentExecutionJobFilter : JobFilterAttribute, IClientFilter, IServerFilter
    {
        public void OnCreating(CreatingContext filterContext)
        {
            var jobs = JobStorage.Current.GetMonitoringApi().ProcessingJobs(0, 100);
    
            var methodAlreadyProcessing = jobs.Any(x => x.Value.Job.Method.Name == filterContext.Job.Method.Name);
    
            if (methodAlreadyProcessing)
            {
                Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Job {filterContext.Job.Method.Name} cancelled because it already exists in the processing list!");
                filterContext.Canceled = true;
            }
        }
    
        public void OnPerformed(PerformedContext filterContext) { }
    
        void IClientFilter.OnCreated(CreatedContext filterContext) { }
    
        void IServerFilter.OnPerforming(PerformingContext filterContext) { }
    }

  2. Put the attribute on your method:
    [PreventConcurrentExecutionJobFilter]
    public async Task MyTopTask()
    {
      ...
    }
Midinette answered 5/12, 2023 at 12:19 Comment(0)
using System.Linq;
using Hangfire;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.Storage.Monitoring;

public class PreventConcurrentExecutionJobFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
    public void OnCreating(CreatingContext filterContext)
    {
        // Cancel creation when a job with the same type, method and arguments is already processing.
        JobList<ProcessingJobDto> jobs = JobStorage.Current.GetMonitoringApi().ProcessingJobs(0, 100);
        if (jobs.Any(x =>
                        x.Value.Job.Type == filterContext.Job.Type &&
                        x.Value.Job.Method.Name == filterContext.Job.Method.Name &&
                        x.Value.Job.Args.Count == filterContext.Job.Args.Count &&
                        string.Join(",", x.Value.Job.Args) == string.Join(",", filterContext.Job.Args)))
        {
            filterContext.Canceled = true;
        }
    }

    // Empty members to satisfy the client/server filter interfaces.
    public void OnCreated(CreatedContext filterContext) { }
    public void OnPerforming(PerformingContext filterContext) { }
    public void OnPerformed(PerformedContext filterContext) { }
}

This filter checks the job type, method name, parameter count, and argument equality.
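
For completeness, it can be applied the same way as the identically named filter above, e.g. registered globally once at startup:

GlobalJobFilters.Filters.Add(new PreventConcurrentExecutionJobFilter());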

Compost answered 19/3 at 0:32 Comment(0)

If you want to discard attempts to run something twice when it's already running, you can always just do this (note: no attributes applied):

    private static bool _isRunningUpdateOrders;

    public void UpdateOrders()
    {
        // Check before entering the try/finally, so the finally block
        // can't clear a flag that a concurrently running invocation set.
        if (_isRunningUpdateOrders)
        {
            return;
        }

        _isRunningUpdateOrders = true;
        try
        {
            // Logic...
        }
        finally
        {
            _isRunningUpdateOrders = false;
        }
    }

Edit: Please only use something like this as a quick fix, like if you've just discovered you have an issue and you're still evaluating better solutions :-) Or if you're lazy and you just want to 'sort of fix' the problem ;-)

Edict answered 7/5, 2018 at 16:22 Comment(7)
Bad answer, because you assume that no one can call UpdateOrders without a lock? OK, you can use lock, but there will still be an issue, because you cannot handle the semaphore.Paleobiology
@NuriYILMAZ this would only ever be called by Hangfire. I realize this may not have been made clear. This should not be called by your ‘normal’ website. Call it Hangfire_UpdateOrders or put it in a class HangfireTasks (that’s what I do).Edict
@NuriYILMAZ what do you mean about semaphore?Edict
This would fail when having multiple servers since statics will be server local. If you only have a single hangfire instance it could work.Hoey
In order to avoid race conditions I would use if (Interlocked.Exchange(ref _isRunningUpdateOrders, 1) == 1) return; before the try statement and Interlocked.Exchange(ref _isRunningUpdateOrders, 0); inside the finally section. By the way, Hangfire documentation also mentions that sometimes people may instead prefer their own locking mechanism - in cases where there is single server processing the jobs.Cheshire
This solution may even be the only possible solution since Hangfire may cause database connection pool leaks with alternative solution using the DisableConcurrentExecution attribute: codewithstyle.info/real-bug-story-long-running-jobs-hangfireCheshire
This answer assumes there is only 1 server processing jobs. This solution does not work across multiple servers.Cousteau
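
Putting the Interlocked suggestion from the comments into code (a sketch; the flag becomes an int because Interlocked.Exchange has no bool overload, and as noted above this still only works with a single Hangfire server):

    // requires: using System.Threading;
    private static int _isRunningUpdateOrders; // 0 = idle, 1 = running

    public void UpdateOrders()
    {
        // Atomically set the flag; if it was already 1, another run is in progress.
        if (Interlocked.Exchange(ref _isRunningUpdateOrders, 1) == 1)
        {
            return;
        }

        try
        {
            // Logic...
        }
        finally
        {
            Interlocked.Exchange(ref _isRunningUpdateOrders, 0);
        }
    }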
