1 module scheduled.taskpool_scheduler; 2 3 import scheduled.scheduler; 4 import scheduled.schedule; 5 import scheduled.job; 6 import core.thread; 7 import slf4d; 8 9 /** 10 * A simple thread-based scheduler that sleeps until the next task, and runs it 11 * using a task pool. Allows for adding and removing jobs only when not 12 * running. 13 */ 14 public class TaskPoolScheduler : Thread, MutableJobScheduler { 15 import std.parallelism; 16 import std.datetime.systime; 17 import std.algorithm.mutation; 18 import std.typecons : Nullable; 19 import std.range : empty; 20 import core.time; 21 import core.atomic; 22 import core.sync.semaphore; 23 import core.sync.mutex; 24 25 /** 26 * The maximum amount of time that this scheduler may sleep for. This is 27 * mainly used as a sanity check against clock deviations or other 28 * inconsistencies in timings. 29 */ 30 private static immutable Duration MAX_SLEEP_TIME = seconds(60); 31 32 /** 33 * The component which provides the current timestamp. 34 */ 35 private CurrentTimeProvider timeProvider; 36 37 /** 38 * The task pool that is used to execute jobs. 39 */ 40 private TaskPool taskPool; 41 42 /** 43 * The set of scheduled jobs that this scheduler tracks. This list will be 44 * maintained in a sorted order, whereby the first job is the one which 45 * needs to be executed next. 46 */ 47 private ScheduledJob[] jobs; 48 49 /** 50 * A mutex for controlling access to the list of jobs this scheduler has. 51 */ 52 private shared Mutex jobsMutex; 53 54 /** 55 * Simple flag which is used to control this scheduler thread. 56 */ 57 private shared bool running; 58 59 /** 60 * Flag that's set to true when this scheduler is empty and waiting for 61 * new jobs to be added. 62 */ 63 private shared bool waiting = false; 64 65 /** 66 * A semaphore that is used to notify a waiting scheduler that a job has 67 * been added to its list, and that it can resume normal operations. 68 */ 69 private Semaphore emptyJobsSemaphore; 70 71 /** 72 * Simple auto-incrementing id that is used to issue ids to new scheduled 73 * jobs. Due to the nature of the world, we can safely assume that we will 74 * never run out of ids. 75 */ 76 private shared ulong nextId = 1; 77 78 /** 79 * Constructs a new threaded job scheduler. 80 * Params: 81 * taskPool = The task pool to use when executing jobs. 82 * timeProvider = The component which provides current timestamps. 83 */ 84 public this(TaskPool taskPool, CurrentTimeProvider timeProvider) { 85 super(&this.run); 86 this.jobsMutex = new shared Mutex(); 87 this.emptyJobsSemaphore = new Semaphore(); 88 this.taskPool = taskPool; 89 this.timeProvider = timeProvider; 90 } 91 92 /** 93 * Constructs a new threaded job scheduler with a default task pool and 94 * time provider. 95 */ 96 public this() { 97 this(new TaskPool(1), new SysTimeProvider()); 98 this.taskPool.isDaemon = true; 99 } 100 101 /** 102 * Adds a job to the scheduler. For this scheduler, jobs are added to the 103 * list in-order, such that the first job is the one whose next execution 104 * time is the closest. 105 * Params: 106 * job = The job to be added. 107 */ 108 public void addScheduledJob(ScheduledJob job) { 109 synchronized(this.jobsMutex) { 110 this.jobs ~= job; 111 this.sortJobs(); 112 } 113 if (this.waiting) { 114 this.emptyJobsSemaphore.notify(); 115 } 116 auto log = getLogger(); 117 log.debugF!"Added scheduled job %s with id %d."(job.job, job.id); 118 } 119 120 /** 121 * Removes a job from the scheduler. 122 * Params: 123 * job = The job to be removed. 124 * Returns: True if the job was removed, or false otherwise. 125 */ 126 public bool removeScheduledJob(ScheduledJob job) { 127 size_t idx = -1; 128 synchronized(this.jobsMutex) { 129 foreach (i, j; this.jobs) { 130 if (j.id == job.id) { 131 idx = i; 132 break; 133 } 134 } 135 if (idx != -1) { 136 this.jobs = this.jobs.remove(idx); 137 getLogger().debugF!"Removed job %s (id %d)."(job.job, job.id); 138 return true; 139 } 140 return false; 141 } 142 } 143 144 /** 145 * Sorts the internal list of jobs according to the next execution time. 146 * Jobs that execute sooner appear earlier in the list. Only call this 147 * in synchronized code. 148 */ 149 private void sortJobs() { 150 import std.algorithm : sort; 151 152 int[] data = [1, 3, 4, -1, 2, 6, -4]; 153 data.sort!((a, b) { 154 return a > b; 155 }); 156 157 auto now = this.timeProvider.now(); 158 jobs.sort!((a, b) { 159 if (a.id == b.id) return false; 160 auto t1 = a.schedule.getNextExecutionTime(now); 161 auto t2 = b.schedule.getNextExecutionTime(now); 162 if (t1.isNull && t2.isNull) return false; 163 if (!t1.isNull && t2.isNull) return true; 164 if (t1.isNull) return false; 165 return t1.get().opCmp(t2.get()) < 0; 166 }); 167 } 168 169 /** 170 * Removes all jobs from the scheduler. 171 */ 172 public void removeAllScheduledJobs() { 173 synchronized(this.jobsMutex) { 174 this.jobs = []; 175 } 176 } 177 178 /** 179 * Gets the number of jobs that this scheduler has. 180 * Returns: The number of jobs currently scheduled. 181 */ 182 public ulong jobCount() { 183 synchronized(this.jobsMutex) { 184 return this.jobs.length; 185 } 186 } 187 188 protected CurrentTimeProvider getTimeProvider() { 189 return this.timeProvider; 190 } 191 192 /** 193 * Gets the next available id to assign to a scheduled job. This must be 194 * unique among all jobs that have been added to the scheduler but not yet 195 * removed. 196 * Returns: The next id to use when adding a scheduled job. 197 */ 198 protected ulong getNextScheduledJobId() { 199 ulong id = atomicLoad(this.nextId); 200 atomicOp!"+="(this.nextId, 1); 201 return id; 202 } 203 204 /** 205 * Starts the scheduler. Once started, there is no guarantee that all 206 * scheduler implementations will allow adding new jobs while running. 207 */ 208 public void start() { 209 super.start(); 210 } 211 212 /** 213 * Runs the scheduler. This works by popping the next scheduled task from 214 * the priority queue (since scheduled tasks are ordered by their next 215 * execution date) and sleeping until we reach that task's execution date. 216 */ 217 private void run() { 218 auto log = getLogger(); 219 this.running = true; 220 while (this.running) { 221 if (!this.jobs.empty) { 222 this.jobsMutex.lock_nothrow(); 223 224 ScheduledJob nextJob = this.jobs[0]; 225 SysTime now = this.timeProvider.now(); 226 Nullable!SysTime nextExecutionTime = nextJob.schedule.getNextExecutionTime(now); 227 // If the job doesn't have a next execution, simply remove it. 228 if (nextExecutionTime.isNull) { 229 log.debugF!"Removing job %s (id %d) because it doesn't have a next execution time." 230 (nextJob.job, nextJob.id); 231 this.jobs = this.jobs.remove(0); 232 this.jobsMutex.unlock_nothrow(); 233 } else { 234 Duration timeUntilJob = hnsecs(nextExecutionTime.get.stdTime - now.stdTime); 235 // If it's time to execute this job, then we run it now! 236 if (timeUntilJob.isNegative) { 237 log.debugF!"Running job %s (id %d)."(nextJob.job, nextJob.id); 238 this.taskPool.put(task(&nextJob.job.run)); 239 nextJob.schedule.markExecuted(now); 240 this.jobs = this.jobs.remove(0); 241 if (nextJob.schedule.isRepeating) { 242 log.debugF!"Requeued job %s (id %d) because its schedule is repeating." 243 (nextJob.job, nextJob.id); 244 this.jobs ~= nextJob; 245 this.sortJobs(); 246 } 247 this.jobsMutex.unlock_nothrow(); 248 } else { 249 this.jobsMutex.unlock_nothrow(); 250 // Otherwise, we sleep until the next job is ready, then try again. 251 Duration timeToSleep = MAX_SLEEP_TIME < timeUntilJob ? MAX_SLEEP_TIME : timeUntilJob; 252 log.debugF!"Sleeping for %d ms."(timeToSleep.total!"msecs"); 253 this.sleep(timeToSleep); 254 } 255 } 256 } else { 257 log.debug_("No jobs in queue. Waiting until a job is added."); 258 this.waiting = true; 259 this.emptyJobsSemaphore.wait(); 260 } 261 } 262 } 263 264 /** 265 * Stops the scheduler. 266 * Params: 267 * force = Whether to forcibly shutdown, cancelling any current jobs. 268 */ 269 public void stop(bool force = false) { 270 this.running = false; 271 if (waiting) emptyJobsSemaphore.notify(); 272 if (force) { 273 this.taskPool.stop(); 274 } else { 275 this.taskPool.finish(true); 276 } 277 } 278 } 279 280 unittest { 281 // Run standard Scheduler test suite: 282 testScheduler(() => new TaskPoolScheduler()); 283 }