/**
 * Defines the scheduler interface, and a simple thread-based implementation
 * of the scheduler.
 */
module scheduled.scheduler;

import scheduled.job;
import scheduled.schedule;
import scheduled.schedules.cron_schedule;

/**
 * A scheduler is the core component of the library; you add jobs to the job
 * scheduler, and then it will execute these according to the job's schedule.
 */
public interface JobScheduler {
    /**
     * Adds a job to the scheduler.
     * Params:
     *   job = The job to be added.
     */
    protected void addScheduledJob(ScheduledJob job);

    /**
     * Adds a job to the scheduler, with the given schedule to define when it
     * should be run. The job is wrapped in a ScheduledJob that carries an id
     * obtained from getNextScheduledJobId().
     * Params:
     *   job = The job to be added.
     *   schedule = The schedule defining when the job is run.
     * Returns: The scheduled job.
     */
    public final ScheduledJob addJob(Job job, JobSchedule schedule) {
        auto sj = new ScheduledJob(job, schedule, getNextScheduledJobId());
        addScheduledJob(sj);
        return sj;
    }

    /**
     * Adds a simple job that executes the given function according to the
     * given schedule.
     * Params:
     *   fn = A function to execute.
     *   schedule = The schedule defining when to execute the function.
     * Returns: The scheduled job.
     */
    public final ScheduledJob addJob(void function() fn, JobSchedule schedule) {
        return addJob(new FunctionJob(fn), schedule);
    }

    /**
     * Adds a job to the scheduler, whose schedule is defined by the given cron
     * expression string.
     * Params:
     *   job = The job to be added.
     *   cronExpressionString = A Cron expression string defining when to run the job.
     * Returns: The scheduled job.
     */
    public final ScheduledJob addCronJob(Job job, string cronExpressionString) {
        return addJob(job, new CronSchedule(cronExpressionString));
    }

    /**
     * Gets the next available id to assign to a scheduled job. This must be
     * unique among all jobs that have been added to the scheduler but not yet
     * removed.
     * Returns: The next id to use when adding a scheduled job.
     */
    protected ulong getNextScheduledJobId();

    /**
     * Starts the scheduler. Once started, there is no guarantee that all
     * scheduler implementations will allow adding new jobs while running.
     */
    public void start();

    /**
     * Stops the scheduler.
     * Params:
     *   force = Whether to forcibly shutdown, cancelling any current jobs.
     */
    public void stop(bool force);

    /**
     * Stops the scheduler, and waits for any currently-executing jobs to
     * finish. Functionally equivalent to calling stop(false).
     */
    public final void stop() {
        stop(false);
    }
}

/**
 * A job scheduler which offers additional functionality for modifying the set
 * of scheduled jobs after they're submitted.
 */
public interface MutableJobScheduler : JobScheduler {
    /**
     * Removes a job from the scheduler.
     * Params:
     *   job = The job to remove.
     * Returns: True if the job was removed, or false otherwise.
     */
    public bool removeScheduledJob(ScheduledJob job);
}

import core.thread;

/**
 * A simple thread-based scheduler that waits until the next job is due, and
 * runs it using a task pool. Access to the job list is guarded by a mutex, and
 * the scheduler thread is woken whenever a job is added or the scheduler is
 * stopped, so jobs may be added and removed while it is running.
 */
public class ThreadedJobScheduler : Thread, MutableJobScheduler {
    import std.parallelism;
    import std.datetime.systime;
    import std.algorithm.mutation;
    import std.algorithm.sorting : sort;
    import std.typecons : Nullable;
    import std.range : empty;
    import core.time;
    import core.atomic;
    import core.sync.semaphore;
    import core.sync.mutex;

    /**
     * The maximum amount of time that this scheduler may sleep for. This is
     * mainly used as a sanity check against clock deviations or other
     * inconsistencies in timings.
     */
    private static immutable Duration MAX_SLEEP_TIME = seconds(60);

    /**
     * The component which provides the current timestamp.
     */
    private CurrentTimeProvider timeProvider;

    /**
     * The task pool that is used to execute jobs.
     */
    private TaskPool taskPool;

    /**
     * The set of scheduled jobs that this scheduler tracks. This list will be
     * maintained in a sorted order, whereby the first job is the one which
     * needs to be executed next.
     */
    private ScheduledJob[] jobs;

    /**
     * A mutex for controlling access to the list of jobs this scheduler has.
     */
    private shared Mutex jobsMutex;

    /**
     * Simple flag which is used to control this scheduler thread.
     */
    private shared bool running;

    /**
     * Flag that's set to true when the scheduler thread is about to block on
     * the wake-up semaphore (either because it has no jobs, or because the
     * next job is not due yet). It is only written while holding jobsMutex,
     * and tells addScheduledJob that a notification is needed.
     */
    private shared bool waiting = false;

    /**
     * A semaphore that is used to wake the scheduler thread when a job has
     * been added to its list or when the scheduler is stopped.
     */
    private Semaphore emptyJobsSemaphore;

    /**
     * Simple auto-incrementing id that is used to issue ids to new scheduled
     * jobs. Due to the nature of the world, we can safely assume that we will
     * never run out of ids.
     */
    private shared ulong nextId = 1;

    /**
     * Constructs a new threaded job scheduler.
     * Params:
     *   taskPool = The task pool to use when executing jobs.
     *   timeProvider = The component which provides current timestamps.
     */
    public this(TaskPool taskPool, CurrentTimeProvider timeProvider) {
        super(&this.run);
        this.jobsMutex = new shared Mutex();
        this.emptyJobsSemaphore = new Semaphore();
        this.taskPool = taskPool;
        this.timeProvider = timeProvider;
    }

    /**
     * Constructs a new threaded job scheduler with a default task pool and
     * time provider.
     */
    public this() {
        this(new TaskPool(1), new SysTimeProvider);
    }

    /**
     * Adds a job to the scheduler. For this scheduler, jobs are added to the
     * list in-order, such that the first job is the one whose next execution
     * time is the closest. If the scheduler thread is blocked waiting, it is
     * woken so that it re-evaluates which job is due next.
     * Params:
     *   job = The job to be added.
     */
    public void addScheduledJob(ScheduledJob job) {
        bool wake;
        {
            this.jobsMutex.lock_nothrow();
            scope (exit) this.jobsMutex.unlock_nothrow();
            this.jobs ~= job;
            this.jobs.sort!("a > b");
            // The flag is only changed under the lock, so each wait of the
            // scheduler thread receives at most one notification from here.
            wake = atomicLoad(this.waiting);
            if (wake) {
                atomicStore(this.waiting, false);
            }
        }
        if (wake) {
            this.emptyJobsSemaphore.notify();
        }
    }

    /**
     * Removes a job from the scheduler. Jobs are matched by their id.
     * Params:
     *   job = The job to be removed.
     * Returns: True if the job was removed, or false otherwise (including
     * when job is null).
     */
    public bool removeScheduledJob(ScheduledJob job) {
        if (job is null) {
            return false;
        }
        this.jobsMutex.lock_nothrow();
        scope (exit) this.jobsMutex.unlock_nothrow();
        foreach (i, j; this.jobs) {
            if (j.getId() == job.getId()) {
                // Returning right away, so reassigning the array being
                // iterated is safe here.
                this.jobs = this.jobs.remove(i);
                return true;
            }
        }
        return false;
    }

    /**
     * Gets the number of jobs that this scheduler has.
     * Returns: The number of jobs currently scheduled.
     */
    public ulong jobCount() {
        this.jobsMutex.lock_nothrow();
        scope (exit) this.jobsMutex.unlock_nothrow();
        return this.jobs.length;
    }

    /**
     * Gets the next available id to assign to a scheduled job. This must be
     * unique among all jobs that have been added to the scheduler but not yet
     * removed.
     * Returns: The next id to use when adding a scheduled job.
     */
    protected ulong getNextScheduledJobId() {
        // atomicOp returns the value after the increment; a single atomic
        // read-modify-write guarantees that no two callers get the same id.
        return atomicOp!"+="(this.nextId, 1) - 1;
    }

    /**
     * Starts the scheduler. Once started, there is no guarantee that all
     * scheduler implementations will allow adding new jobs while running.
     */
    public void start() {
        // Set the flag before the thread exists, so that a stop() issued
        // immediately after start() cannot be overwritten by the new thread.
        atomicStore(this.running, true);
        super.start();
    }

    /**
     * Runs the scheduler. This works by repeatedly inspecting the first job
     * in the sorted job list (the one due next), executing it if it is due,
     * and otherwise blocking until it becomes due, a job is added, or the
     * scheduler is stopped.
     */
    private void run() {
        while (atomicLoad(this.running)) {
            immutable Duration pause = this.processNextJob();
            if (pause == Duration.max) {
                // No jobs at all: block until a job is added or stop() is called.
                this.emptyJobsSemaphore.wait();
            } else if (pause > Duration.zero) {
                // Timed wait instead of a plain sleep, so that stop() and
                // newly added jobs can interrupt it.
                this.emptyJobsSemaphore.wait(pause);
            }
        }
    }

    /**
     * Performs a single scheduling step while holding the jobs mutex: drops
     * the next job if it has no further executions, or submits it to the task
     * pool if it is due.
     * Returns: Duration.zero if the scheduler should immediately perform
     * another step, Duration.max if there are no jobs and it should block
     * until notified, or otherwise the amount of time to wait before the next
     * step (never more than MAX_SLEEP_TIME).
     */
    private Duration processNextJob() {
        this.jobsMutex.lock_nothrow();
        scope (exit) this.jobsMutex.unlock_nothrow();

        if (this.jobs.empty) {
            atomicStore(this.waiting, true);
            return Duration.max;
        }

        ScheduledJob nextJob = this.jobs[0];
        SysTime now = this.timeProvider.now();
        Nullable!SysTime nextExecutionTime = nextJob.getSchedule().getNextExecutionTime(now);
        // If the job doesn't have a next execution, simply remove it.
        if (nextExecutionTime.isNull) {
            this.jobs = this.jobs.remove(0);
            return Duration.zero;
        }

        Duration timeUntilJob = nextExecutionTime.get - now;
        // If it's time to execute this job, then we run it now!
        if (timeUntilJob.isNegative) {
            this.taskPool.put(task(&nextJob.getJob.run));
            nextJob.getSchedule.markExecuted(now);
            this.jobs = this.jobs.remove(0);
            if (nextJob.getSchedule.isRepeating) {
                this.jobs ~= nextJob;
                this.jobs.sort!("a > b");
            }
            return Duration.zero;
        }

        // Otherwise, we wait until the next job is ready, then try again.
        Duration pause = timeUntilJob;
        if (MAX_SLEEP_TIME < timeUntilJob) {
            pause = MAX_SLEEP_TIME;
        }
        if (pause > Duration.zero) {
            atomicStore(this.waiting, true);
        }
        return pause;
    }

    /**
     * Stops the scheduler and shuts down its task pool.
     * Params:
     *   force = Whether to forcibly shutdown. When true, the task pool is
     *           stopped and queued jobs that have not started are dropped.
     *           When false, already submitted jobs are allowed to finish, and
     *           this call waits for them unless it is made from one of the
     *           task pool's own worker threads.
     */
    public void stop(bool force) {
        atomicStore(this.running, false);
        // Wake the scheduler thread in case it is blocked, so that it can
        // observe the cleared flag and exit.
        this.emptyJobsSemaphore.notify();
        if (force) {
            this.taskPool.stop();
        } else {
            // A blocking finish() from within a worker of the same pool would
            // deadlock (e.g. a job that stops its own scheduler).
            immutable bool calledFromWorker = this.taskPool.workerIndex != 0;
            this.taskPool.finish(!calledFromWorker);
        }
    }
}

/**
 * Tests the functionality of the threaded job scheduler, by running it for
 * some different simple jobs to ensure the jobs are executed properly.
 */
unittest {
    import core.thread;
    import core.atomic;
    import std.format;
    import std.experimental.logger;
    import scheduled.schedules.fixed_interval;
    import std.stdio;

    // Create a simple job which increments a variable by 1.
    class IncrementJob : Job {
        public uint x = 0;
        public string id;
        public this(string id) {
            this.id = id;
        }

        public void run() {
            x++;
            import std.stdio;
            writefln!"[%s] Incrementing x to %d"(id, x);
        }
    }

    void assertJobStatus(IncrementJob j, uint expected) {
        assert(j.x == expected, format("Job %s executed %d times instead of the expected %d.", j.id, j.x, expected));
    }

    // Test case 1: Scheduler with a single job.

    JobScheduler scheduler = new ThreadedJobScheduler;
    auto inc1 = new IncrementJob("1");
    scheduler.addJob(inc1, new FixedIntervalSchedule(msecs(50)));
    scheduler.start();
    Thread.sleep(msecs(130));
    // We expect the job to be executed at t = 0, 50, and 100 ms.
    assert(inc1.x == 3, "Job did not execute the expected number of times.");
    scheduler.stop();

    // Test case 2: Scheduler with multiple jobs.
    writeln("Scheduler 1 complete");

    ThreadedJobScheduler scheduler2 = new ThreadedJobScheduler;
    auto incA = new IncrementJob("A");
    auto incB = new IncrementJob("B");
    ScheduledJob sjA = scheduler2.addJob(incA, new FixedIntervalSchedule(msecs(50)));
    ScheduledJob sjB = scheduler2.addJob(incB, new FixedIntervalSchedule(msecs(80)));
    assert(scheduler2.jobCount == 2);
    scheduler2.start();
    writeln("Starting scheduler 2");
    Thread.sleep(msecs(180));
    // We expect job A to be executed at t = 0, 50, 100, and 150.
    assertJobStatus(incA, 4);
    // We expect job B to be executed at t = 0, 80, and 160.
    assertJobStatus(incB, 3);
    // Try and remove a job.
    writeln("Removing scheduled job A");
    assert(scheduler2.removeScheduledJob(sjA));
    assert(scheduler2.jobCount == 1);
    assert(!scheduler2.removeScheduledJob(sjA));
    Thread.sleep(msecs(170));
    // By now (t = ~350) we expect job B to have been executed at
    // t = 0, 80, 160, 240, and 320.
    assertJobStatus(incB, 5);
    // We expect job A to not be executed since its scheduled job was removed.
    assertJobStatus(incA, 4);

    // Remove all jobs, wait a bit, and add one back.
    writeln("Removing scheduled job B and waiting a while.");
    assert(scheduler2.removeScheduledJob(sjB));
    assert(scheduler2.jobCount == 0);
    Thread.sleep(msecs(100));
    writeln("Adding scheduled job C");
    auto incC = new IncrementJob("C");
    ScheduledJob sjC = scheduler2.addJob(incC, new FixedIntervalSchedule(msecs(30)));
    assert(scheduler2.jobCount == 1);
    Thread.sleep(msecs(100));
    // We expect job C to be executed at t = 0, 30, 60, 90 (relative to when
    // it was added).
    assertJobStatus(incC, 4);
    scheduler2.stop(false);
}