1 module scheduled.scheduler; 2 3 import scheduled.job; 4 import scheduled.schedule; 5 import scheduled.schedules.cron_schedule; 6 7 /** 8 * A scheduler is the core component of the library; you add jobs to the job 9 * scheduler, and then it will execute these according to the job's schedule. 10 */ 11 public interface JobScheduler { 12 /** 13 * Adds a job to the scheduler. 14 * Params: 15 * job = The job to be added. 16 */ 17 void addJob(ScheduledJob job); 18 19 /** 20 * Adds a job to the scheduler, with the given schedule to define when it 21 * should be run. 22 * Params: 23 * job = The job to be added. 24 * schedule = The schedule defining when the job is run. 25 */ 26 final void addJob(Job job, JobSchedule schedule) { 27 addJob(new ScheduledJob(job, schedule)); 28 } 29 30 /** 31 * Adds a simple job that executes the given function according to the 32 * given schedule. 33 * Params: 34 * fn = A function to execute. 35 * schedule = The schedule defining when to execute the function. 36 */ 37 final void addJob(void function() fn, JobSchedule schedule) { 38 addJob(new FunctionJob(fn), schedule); 39 } 40 41 /** 42 * Adds a job to the scheduler, whose schedule is defined by the given cron 43 * expression string. 44 * Params: 45 * job = The job to be added. 46 * cronExpressionString = A Cron expression string defining when to run the job. 47 */ 48 final void addCronJob(Job job, string cronExpressionString) { 49 addJob(job, new CronSchedule(cronExpressionString)); 50 } 51 52 /** 53 * Starts the scheduler. Once started, there is no guarantee that all 54 * scheduler implementations will allow adding new jobs while running. 55 */ 56 void start(); 57 58 /** 59 * Stops the scheduler. 60 * Params: 61 * force = Whether to forcibly shutdown, cancelling any current jobs. 62 */ 63 void stop(bool force); 64 65 /** 66 * Stops the scheduler, and waits for any currently-executing jobs to 67 * finish. Functionally equivalent to calling stop(false). 68 */ 69 final void stop() { 70 stop(false); 71 } 72 } 73 74 import core.thread; 75 76 /** 77 * A simple thread-based scheduler that sleeps until the next task, and runs it 78 * using a task pool. 79 */ 80 public class ThreadedJobScheduler : Thread, JobScheduler { 81 import std.parallelism; 82 import std.container.binaryheap; 83 import std.datetime.systime; 84 import core.time; 85 86 /** 87 * The maximum amount of time that this scheduler may sleep for. This is 88 * mainly used as a sanity check against clock deviations or other 89 * inconsistencies in timings. 90 */ 91 private static immutable Duration MAX_SLEEP_TIME = seconds(60); 92 93 private CurrentTimeProvider timeProvider; 94 private TaskPool taskPool; 95 private BinaryHeap!(ScheduledJob[]) jobPriorityQueue; 96 private shared bool running; 97 98 public this(TaskPool taskPool, CurrentTimeProvider timeProvider) { 99 super(&this.run); 100 this.taskPool = taskPool; 101 this.timeProvider = timeProvider; 102 this.jobPriorityQueue = BinaryHeap!(ScheduledJob[])([]); 103 } 104 105 public this() { 106 this(std.parallelism.taskPool(), new SysTimeProvider); 107 } 108 109 void addJob(ScheduledJob job) { 110 if (this.running) throw new Exception("Cannot add tasks while the scheduler is running."); 111 this.jobPriorityQueue.insert(job); 112 } 113 114 void start() { 115 super.start(); 116 } 117 118 /** 119 * Runs the scheduler. This works by popping the next scheduled task from 120 * the priority queue (since scheduled tasks are ordered by their next 121 * execution date) and sleeping until we reach that task's execution date. 122 */ 123 void run() { 124 this.running = true; 125 while (this.running && !this.jobPriorityQueue.empty) { 126 ScheduledJob job = this.jobPriorityQueue.front; 127 this.jobPriorityQueue.removeFront; 128 SysTime now = this.timeProvider.now; 129 auto nextExecutionTime = job.getSchedule.getNextExecutionTime(now); 130 // If the job doesn't have a next execution, skip it, don't requeue it, and try again. 131 if (nextExecutionTime.isNull) continue; 132 Duration timeUntilJob = hnsecs(nextExecutionTime.get.stdTime - now.stdTime); 133 134 // If the time until the next job is longer than our max sleep time, requeue the job and sleep as long as possible. 135 if (MAX_SLEEP_TIME < timeUntilJob) { 136 this.jobPriorityQueue.insert(job); 137 this.sleep(MAX_SLEEP_TIME); 138 } else { 139 // The time until the next job is close enough that we can sleep directly to it. 140 if (timeUntilJob > hnsecs(0)) { 141 this.sleep(timeUntilJob); 142 } 143 // Queue up running the job, and process all other aspects of it. 144 this.taskPool.put(task(&job.getJob.run)); 145 job.getSchedule.markExecuted(this.timeProvider.now); 146 if (job.getSchedule.isRepeating) { 147 this.jobPriorityQueue.insert(job); 148 } 149 } 150 } 151 } 152 153 void stop(bool force) { 154 this.running = false; 155 this.taskPool.finish(true); 156 } 157 }