// Mutex.cs
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using Hangfire.Common;
  5. using Hangfire.States;
  6. using Hangfire.Storage;
  7. namespace Hangfire.Pro
  8. {
  /// <summary>
  /// Represents a background job filter that helps to disable concurrent execution
  /// of background jobs that share the same resource, without causing a worker to
  /// wait for the running job as <see cref="Hangfire.DisableConcurrentExecutionAttribute"/>
  /// does. A blocked background job is moved to the Scheduled state to be retried
  /// later, or to the Deleted state once <see cref="MaxAttempts"/> is exceeded.
  /// </summary>
  public class MutexAttribute : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
  {
    private const string AttemptParameterName = "MutexAttempt";

    private static readonly TimeSpan DistributedLockTimeout = TimeSpan.FromMinutes(1);

    private readonly string _resource;

    /// <summary>
    /// Initializes a new instance of the <see cref="MutexAttribute"/> class.
    /// </summary>
    /// <param name="resource">
    /// Resource name used to build the storage keys. It is passed to
    /// <see cref="String.Format(IFormatProvider, string, object[])"/> as a composite
    /// format string together with the job arguments, so placeholders such as
    /// <c>{0}</c> are replaced with the corresponding argument.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="resource"/> is null.</exception>
    public MutexAttribute(string resource)
    {
      // Fail fast: a null format string would otherwise only surface later,
      // as an exception thrown from String.Format during state election.
      if (resource == null) throw new ArgumentNullException(nameof(resource));

      _resource = resource;
      RetryInSeconds = 15;
    }

    /// <summary>
    /// Gets or sets the delay, in seconds, after which a blocked background job
    /// is scheduled to be retried. The constructor sets it to 15.
    /// </summary>
    public int RetryInSeconds { get; set; }

    /// <summary>
    /// Gets or sets the maximum number of blocked attempts before the background
    /// job is moved to the Deleted state. Zero means the number of attempts is
    /// not limited.
    /// </summary>
    public int MaxAttempts { get; set; }

    /// <summary>
    /// Intercepts transitions to the Processing state and either takes ownership of
    /// the resource for the current background job, or replaces the candidate state
    /// so that the job is not processed right now.
    /// </summary>
    /// <param name="context">The state election context.</param>
    /// <exception cref="NotSupportedException">
    /// The storage connection does not derive from <see cref="JobStorageConnection"/>.
    /// </exception>
    public void OnStateElection(ElectStateContext context)
    {
      // We are intercepting transitions to the Processing state, that are performed by
      // a worker just before processing a job. During the state election phase we can
      // change the target state to another one, causing a worker not to process the
      // background job.
      if (context.CandidateState.Name != ProcessingState.StateName ||
          context.BackgroundJob.Job == null)
      {
        return;
      }

      // This filter requires an extended set of storage operations. It's supported
      // by all the official storages, and many of the community-based ones.
      var storageConnection = context.Connection as JobStorageConnection;
      if (storageConnection == null)
      {
        throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
      }

      var jobArgs = context.BackgroundJob.Job.Args;
      string blockedBy;

      try
      {
        // Distributed lock is needed here only to prevent a race condition, when another
        // worker picks up a background job with the same resource between GET and SET
        // operations.
        // There will be no race condition, when two or more workers pick up a background
        // job with the same id, because state transitions are protected with a distributed
        // lock themselves.
        using (AcquireDistributedSetLock(context.Connection, jobArgs))
        {
          var resourceKey = GetResourceKey(jobArgs);

          // Resource set contains a background job id that acquired a mutex for the resource.
          // We are getting only one element to see what background job blocked the invocation.
          var range = storageConnection.GetRangeFromSet(resourceKey, 0, 0);
          blockedBy = range.Count > 0 ? range[0] : null;

          // We should permit an invocation only when the set is empty, or if the current
          // background job already owns the resource. This may happen when the local
          // transaction succeeded, but the outer transaction failed.
          if (blockedBy == null || blockedBy == context.BackgroundJob.Id)
          {
            // We need to commit the changes inside a distributed lock, otherwise it's
            // useless. So we create a local transaction instead of using the
            // context.Transaction property. The transaction is disposable, so it is
            // released even when AddToSet or Commit throws.
            using (var localTransaction = context.Connection.CreateWriteTransaction())
            {
              // Add the current background job identifier to the resource set. This means
              // that the resource is owned by the current background job. The identifier
              // is removed again when the job leaves the Processing state.
              localTransaction.AddToSet(resourceKey, context.BackgroundJob.Id);
              localTransaction.Commit();
            }

            // Invocation is permitted, and we did all the required things.
            return;
          }
        }
      }
      catch (DistributedLockTimeoutException)
      {
        // We weren't able to acquire a distributed lock within a specified window. This may
        // be caused by network delays, storage outages or abandoned locks in some storages.
        // Since it is required to expire abandoned locks after some time, we can simply
        // postpone the invocation.
        context.CandidateState = new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
        {
          Reason = "Couldn't acquire a distributed lock for mutex: timeout exceeded"
        };

        return;
      }

      // Background job execution is blocked. We should change the target state either to
      // the Scheduled or to the Deleted one, depending on the current retry attempt number.
      var currentAttempt = context.GetJobParameter<int>(AttemptParameterName) + 1;
      context.SetJobParameter(AttemptParameterName, currentAttempt);

      context.CandidateState = MaxAttempts == 0 || currentAttempt <= MaxAttempts
        ? CreateScheduledState(blockedBy, currentAttempt)
        : CreateDeletedState(blockedBy);
    }

    /// <summary>
    /// Releases the resource when a background job leaves the Processing state by
    /// removing its identifier from the resource set.
    /// </summary>
    /// <param name="context">The apply state context.</param>
    /// <param name="transaction">
    /// The outer transaction. It is not used: the removal is committed through a
    /// local transaction while the distributed lock is held.
    /// </param>
    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
      if (context.BackgroundJob.Job == null) return;
      if (context.OldStateName != ProcessingState.StateName) return;

      var jobArgs = context.BackgroundJob.Job.Args;

      using (AcquireDistributedSetLock(context.Connection, jobArgs))
      using (var localTransaction = context.Connection.CreateWriteTransaction())
      {
        localTransaction.RemoveFromSet(GetResourceKey(jobArgs), context.BackgroundJob.Id);
        localTransaction.Commit();
      }
    }

    /// <summary>
    /// Does nothing; present only to satisfy <see cref="IApplyStateFilter"/>.
    /// </summary>
    /// <param name="context">The apply state context.</param>
    /// <param name="transaction">The outer transaction.</param>
    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
    }

    /// <summary>
    /// Creates the Deleted state used when all blocked attempts are exhausted.
    /// </summary>
    /// <param name="blockedBy">Identifier of the background job that owns the resource.</param>
    private static DeletedState CreateDeletedState(string blockedBy)
    {
      return new DeletedState
      {
        Reason = $"Execution was blocked by background job {blockedBy}, all attempts exhausted"
      };
    }

    /// <summary>
    /// Creates the Scheduled state that postpones a blocked background job by
    /// <see cref="RetryInSeconds"/> seconds.
    /// </summary>
    /// <param name="blockedBy">Identifier of the background job that owns the resource.</param>
    /// <param name="currentAttempt">Number of the current blocked attempt.</param>
    private IState CreateScheduledState(string blockedBy, int currentAttempt)
    {
      var reason = $"Execution is blocked by background job {blockedBy}, retry attempt: {currentAttempt}";

      // The attempt limit is only shown when one is configured.
      if (MaxAttempts > 0)
      {
        reason += $"/{MaxAttempts}";
      }

      return new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
      {
        Reason = reason
      };
    }

    /// <summary>
    /// Acquires the distributed lock that guards the resource set, waiting up to
    /// one minute (<c>DistributedLockTimeout</c>).
    /// </summary>
    /// <param name="connection">Storage connection used to acquire the lock.</param>
    /// <param name="args">Job arguments substituted into the resource name.</param>
    /// <returns>A handle that releases the lock when disposed.</returns>
    private IDisposable AcquireDistributedSetLock(IStorageConnection connection, IEnumerable<object> args)
    {
      return connection.AcquireDistributedLock(GetDistributedLockKey(args), DistributedLockTimeout);
    }

    /// <summary>
    /// Builds the name of the distributed lock for the formatted resource.
    /// </summary>
    /// <param name="args">Job arguments substituted into the resource name.</param>
    private string GetDistributedLockKey(IEnumerable<object> args)
    {
      return $"extension:job-mutex:lock:{GetKeyFormat(args, _resource)}";
    }

    /// <summary>
    /// Builds the key of the set that holds the identifier of the owning background job.
    /// </summary>
    /// <param name="args">Job arguments substituted into the resource name.</param>
    private string GetResourceKey(IEnumerable<object> args)
    {
      return $"extension:job-mutex:set:{GetKeyFormat(args, _resource)}";
    }

    /// <summary>
    /// Substitutes the job arguments into the resource format string.
    /// </summary>
    /// <param name="args">Job arguments used as format items.</param>
    /// <param name="keyFormat">Composite format string.</param>
    /// <returns>The formatted resource name.</returns>
    private static string GetKeyFormat(IEnumerable<object> args, string keyFormat)
    {
      // Keys are machine-readable identifiers shared between servers, so they must not
      // depend on the culture of the thread that happens to format them.
      return String.Format(System.Globalization.CultureInfo.InvariantCulture, keyFormat, args.ToArray());
    }
  }
  152. }