123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using Hangfire.Common;
- using Hangfire.States;
- using Hangfire.Storage;
- namespace Hangfire.Pro
- {
- /// <summary>
- /// Represents a background job filter that helps to disable concurrent execution
- /// without causing worker to wait as in <see cref="Hangfire.DisableConcurrentExecutionAttribute"/>.
- /// </summary>
- public class MutexAttribute : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
- {
- private static readonly TimeSpan DistributedLockTimeout = TimeSpan.FromMinutes(1);
- private readonly string _resource;
- public MutexAttribute(string resource)
- {
- _resource = resource;
- RetryInSeconds = 15;
- }
- public int RetryInSeconds { get; set; }
- public int MaxAttempts { get; set; }
- public void OnStateElection(ElectStateContext context)
- {
- // We are intercepting transitions to the Processed state, that is performed by
- // a worker just before processing a job. During the state election phase we can
- // change the target state to another one, causing a worker not to process the
- // backgorund job.
- if (context.CandidateState.Name != ProcessingState.StateName ||
- context.BackgroundJob.Job == null)
- {
- return;
- }
- // This filter requires an extended set of storage operations. It's supported
- // by all the official storages, and many of the community-based ones.
- var storageConnection = context.Connection as JobStorageConnection;
- if (storageConnection == null)
- {
- throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
- }
- string blockedBy;
- try
- {
- // Distributed lock is needed here only to prevent a race condition, when another
- // worker picks up a background job with the same resource between GET and SET
- // operations.
- // There will be no race condition, when two or more workers pick up background job
- // with the same id, because state transitions are protected with distributed lock
- // themselves.
- using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
- {
- // Resource set contains a background job id that acquired a mutex for the resource.
- // We are getting only one element to see what background job blocked the invocation.
- var range = storageConnection.GetRangeFromSet(
- GetResourceKey(context.BackgroundJob.Job.Args),
- 0,
- 0);
- blockedBy = range.Count > 0 ? range[0] : null;
- // We should permit an invocation only when the set is empty, or if current background
- // job is already owns a resource. This may happen, when the localTransaction succeeded,
- // but outer transaction was failed.
- if (blockedBy == null || blockedBy == context.BackgroundJob.Id)
- {
- // We need to commit the changes inside a distributed lock, otherwise it's
- // useless. So we create a local transaction instead of using the
- // context.Transaction property.
- var localTransaction = context.Connection.CreateWriteTransaction();
- // Add the current background job identifier to a resource set. This means
- // that resource is owned by the current background job. Identifier will be
- // removed only on failed state, or in one of final states (succeeded or
- // deleted).
- localTransaction.AddToSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);
- localTransaction.Commit();
- // Invocation is permitted, and we did all the required things.
- return;
- }
- }
- }
- catch (DistributedLockTimeoutException)
- {
- // We weren't able to acquire a distributed lock within a specified window. This may
- // be caused by network delays, storage outages or abandoned locks in some storages.
- // Since it is required to expire abandoned locks after some time, we can simply
- // postpone the invocation.
- context.CandidateState = new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
- {
- Reason = "Couldn't acquire a distributed lock for mutex: timeout exceeded"
- };
- return;
- }
- // Background job execution is blocked. We should change the target state either to
- // the Scheduled or to the Deleted one, depending on current retry attempt number.
- var currentAttempt = context.GetJobParameter<int>("MutexAttempt") + 1;
- context.SetJobParameter("MutexAttempt", currentAttempt);
- context.CandidateState = MaxAttempts == 0 || currentAttempt <= MaxAttempts
- ? CreateScheduledState(blockedBy, currentAttempt)
- : CreateDeletedState(blockedBy);
- }
- public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
- {
- if (context.BackgroundJob.Job == null) return;
- if (context.OldStateName == ProcessingState.StateName)
- {
- using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
- {
- var localTransaction = context.Connection.CreateWriteTransaction();
- localTransaction.RemoveFromSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);
- localTransaction.Commit();
- }
- }
- }
- public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
- {
- }
- private static DeletedState CreateDeletedState(string blockedBy)
- {
- return new DeletedState
- {
- Reason = $"Execution was blocked by background job {blockedBy}, all attempts exhausted"
- };
- }
- private IState CreateScheduledState(string blockedBy, int currentAttempt)
- {
- var reason = $"Execution is blocked by background job {blockedBy}, retry attempt: {currentAttempt}";
- if (MaxAttempts > 0)
- {
- reason += $"/{MaxAttempts}";
- }
- return new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
- {
- Reason = reason
- };
- }
- private IDisposable AcquireDistributedSetLock(IStorageConnection connection, IEnumerable<object> args)
- {
- return connection.AcquireDistributedLock(GetDistributedLockKey(args), DistributedLockTimeout);
- }
- private string GetDistributedLockKey(IEnumerable<object> args)
- {
- return $"extension:job-mutex:lock:{GetKeyFormat(args, _resource)}";
- }
- private string GetResourceKey(IEnumerable<object> args)
- {
- return $"extension:job-mutex:set:{GetKeyFormat(args, _resource)}";
- }
- private static string GetKeyFormat(IEnumerable<object> args, string keyFormat)
- {
- return String.Format(keyFormat, args.ToArray());
- }
- }
- }
|