/**
 * Defines the job scheduler interfaces, along with a standardized test suite
 * (compiled only for unit tests) that scheduler implementations can use to
 * check that they comply with those interfaces.
 */
module scheduled.scheduler;

import scheduled.job;
import scheduled.schedule;
import scheduled.schedules.cron_schedule;

/**
 * A pairing of a Job with a JobSchedule that tells it when to execute. This is
 * the basic operational unit that JobSchedulers will deal with.
 */
public struct ScheduledJob {
    /// The time provider obtained from the scheduler that created this job.
    public CurrentTimeProvider timeProvider;
    /// The schedule that defines when the job should run.
    public JobSchedule schedule;
    /// The job to execute.
    public Job job;
    /**
     * The id assigned by the scheduler when the job was added.
     * NOTE(review): this is `long`, whereas
     * `JobScheduler.getNextScheduledJobId()` returns `ulong`; the value is
     * implicitly converted when the job is constructed in `addJob`.
     */
    public long id;
}

/**
 * A scheduler is the core component of the library; you add jobs to the job
 * scheduler, and then it will execute these according to the job's schedule.
 */
public interface JobScheduler {
    /**
     * Gets a default job scheduler that's recommended to use for most cases.
     * Returns: A job scheduler.
     */
    public static JobScheduler getDefault() {
        // Imported locally so the dependency is only needed by this function.
        import scheduled.taskpool_scheduler : TaskPoolScheduler;
        return new TaskPoolScheduler();
    }

    /**
     * Adds a job to the scheduler.
     * Params:
     *   job = The job to be added.
     */
    protected void addScheduledJob(ScheduledJob job);

    /**
     * Adds a job to the scheduler, with the given schedule to define when it
     * should be run.
     * Params:
     *   job = The job to be added.
     *   schedule = The schedule defining when the job is run.
     * Returns: The scheduled job.
     */
    public final ScheduledJob addJob(Job job, JobSchedule schedule) {
        auto sj = ScheduledJob(this.getTimeProvider(), schedule, job, this.getNextScheduledJobId());
        addScheduledJob(sj);
        return sj;
    }

    /**
     * Adds a simple job that executes the given function according to the
     * given schedule.
     * Params:
     *   fn = A function to execute.
     *   schedule = The schedule defining when to execute the function.
     * Returns: The scheduled job.
     */
    public final ScheduledJob addJob(void function() fn, JobSchedule schedule) {
        return addJob(new FunctionJob(fn), schedule);
    }

    /**
     * Adds a job to the scheduler, whose schedule is defined by the given cron
     * expression string.
     * Params:
     *   job = The job to be added.
     *   cronExpressionString = A Cron expression string defining when to run the job.
     * Returns: The scheduled job.
     */
    public final ScheduledJob addCronJob(Job job, string cronExpressionString) {
        return addJob(job, new CronSchedule(cronExpressionString));
    }

    /**
     * Adds a simple job that executes the given function, whose schedule is
     * defined by the given cron expression string.
     * Params:
     *   fn = A function to execute.
     *   cronExpressionString = A Cron expression string defining when to run the job.
     * Returns: The scheduled job.
     */
    public final ScheduledJob addCronJob(void function() fn, string cronExpressionString) {
        // Wrap the function and reuse the Job-based overload, mirroring how
        // addJob(void function(), JobSchedule) delegates to addJob(Job, JobSchedule).
        return addCronJob(new FunctionJob(fn), cronExpressionString);
    }

    /**
     * Gets this scheduler's time provider.
     * Returns: The scheduler's time provider.
     */
    protected CurrentTimeProvider getTimeProvider();

    /**
     * Gets the next available id to assign to a scheduled job. This must be
     * unique among all jobs that have been added to the scheduler but not yet
     * removed.
     * Returns: The next id to use when adding a scheduled job.
     */
    protected ulong getNextScheduledJobId();

    /**
     * Starts the scheduler. Once started, there is no guarantee that all
     * scheduler implementations will allow adding new jobs while running.
     */
    public void start();

    /**
     * Stops the scheduler. This method blocks until shutdown is complete.
     * Params:
     *   force = Whether to forcibly shutdown, cancelling any current jobs.
     */
    public void stop(bool force);

    /**
     * Stops the scheduler, and waits for any currently-executing jobs to
     * finish. Equivalent to calling stop(false).
     */
    public final void stop() {
        stop(false);
    }
}

/**
 * A job scheduler which offers additional functionality for modifying the set
 * of scheduled jobs after they're submitted.
 */
public interface MutableJobScheduler : JobScheduler {
    /**
     * Removes a job from the scheduler.
     * Params:
     *   job = The job to remove.
     * Returns: True if the job was removed, or false otherwise.
     */
    public bool removeScheduledJob(ScheduledJob job);
}

// For testing, we provide a standardized test suite for a Scheduler to ensure
// that it is compliant with the interface(s).
// It is therefore recommended that all implementations of JobScheduler call
// `testScheduler(() => new MyCustomScheduler());` in their unit test.
version(unittest) {
    import scheduled.schedules;
    import std.datetime;
    import slf4d;

    /// Produces a fresh scheduler instance for each test case.
    alias SchedulerFactory = JobScheduler delegate();

    /**
     * Runs the full compliance test suite against schedulers produced by the
     * given factory, logging the name of each test before it runs.
     * Params:
     *   factory = The scheduler factory.
     */
    public void testScheduler(SchedulerFactory factory) {
        import std.string : format;

        // A throwaway instance is created only to obtain a printable name for
        // the log messages; it is stopped immediately afterwards.
        auto sampleScheduler = factory();
        string name = format!"%s"(sampleScheduler);
        sampleScheduler.stop();
        auto log = getLogger();

        log.infoF!"Testing %s.addJob(...)"(name);
        testAddJob(factory);
        log.infoF!"Testing %s.getNextScheduledJobId()"(name);
        testGetNextScheduledJobId(factory);
        log.infoF!"Testing %s.start()"(name);
        testStart(factory);
        log.infoF!"Testing %s.stop()"(name);
        testStop(factory);
        log.infoF!"Testing of %s is successful."(name);
    }

    /**
     * Tests that adding a job returns a ScheduledJob referring to that job,
     * does not run the job, and assigns distinct ids to separately added jobs.
     * Params:
     *   factory = The scheduler factory.
     */
    private void testAddJob(SchedulerFactory factory) {
        auto scheduler = factory();
        IncrementJob job = new IncrementJob();
        auto scheduledJob = scheduler.addJob(job, new FixedIntervalSchedule(msecs(10)));
        // The scheduler has not been started, so the job must not have run.
        assert(job.x == 0);
        assert(scheduledJob.job == job);

        auto anotherScheduledJob = scheduler.addJob(job, new FixedIntervalSchedule(msecs(20)));
        assert(anotherScheduledJob.job == job);
        assert(anotherScheduledJob.id != scheduledJob.id);
    }

    /**
     * Tests that `getNextScheduledJobId()` never returns an id it has
     * returned before, while jobs are repeatedly added to the scheduler.
     * Params:
     *   factory = The scheduler factory.
     */
    private void testGetNextScheduledJobId(SchedulerFactory factory) {
        import std.algorithm : canFind;

        auto scheduler = factory();
        IncrementJob job = new IncrementJob();
        ulong[] ids = [];
        for (int i = 0; i < 1000; i++) {
            ulong newId = scheduler.getNextScheduledJobId();
            assert(!canFind(ids, newId));
            ids ~= newId;
            scheduler.addJob(job, new FixedIntervalSchedule(msecs(5)));
        }
    }

    /**
     * Tests the `start()` function of the scheduler, and therefore also the
     * main operation of the scheduler to execute jobs after starting.
     *
     * NOTE(review): the assertions below depend on wall-clock timing with
     * margins of a few milliseconds, so they may be sensitive to scheduling
     * delays on a loaded machine.
     * Params:
     *   factory = The scheduler factory.
     */
    private void testStart(SchedulerFactory factory) {
        import core.thread : Thread;

        auto scheduler = factory();
        IncrementJob jobA = new IncrementJob();
        scheduler.addJob(jobA, new FixedIntervalSchedule(msecs(5)));
        IncrementJob jobB = new IncrementJob();
        scheduler.addJob(jobB, new FixedIntervalSchedule(msecs(10)));
        IncrementJob jobC = new IncrementJob();
        scheduler.addJob(jobC, new OneTimeSchedule(msecs(5)));

        // Before starting, none of the jobs should have run.
        assert(jobA.x == 0);
        assert(jobB.x == 0);
        assert(jobC.x == 0);

        scheduler.start();
        // Always stop the scheduler when leaving this function, so that a
        // failing assertion below does not leave it running in the background.
        scope(exit) scheduler.stop();

        Thread.sleep(msecs(1));
        // After starting and running a bit, both interval jobs should now have run once.
        assert(jobA.x == 1);
        assert(jobB.x == 1);

        Thread.sleep(msecs(11));
        // After a total of 12ms, jobA should have run 3 times at t=0, 5, 10, and jobB 2 times at t=0, 10.
        assert(jobA.x == 3);
        assert(jobB.x == 2);
        // jobC should have run once, and not been re-queued.
        assert(jobC.x == 1);
    }

    /**
     * Tests both modes of `stop(bool)`: a non-forced stop is expected to let
     * the currently-executing job finish, while a forced stop is expected to
     * return before a long-running job has completed.
     * Params:
     *   factory = The scheduler factory.
     */
    private void testStop(SchedulerFactory factory) {
        import core.thread : Thread;

        // Test stopping and waiting for tasks to finish.
        auto scheduler = factory();
        LongIncrementJob jobA = new LongIncrementJob(msecs(5));
        scheduler.addJob(jobA, new FixedIntervalSchedule(msecs(10)));
        assert(jobA.x == 0);
        scheduler.start();
        Thread.sleep(msecs(1));
        scheduler.stop(false);
        // The job sleeps for 5ms before incrementing, so the increment is only
        // visible here if stop(false) waited for the running job.
        assert(jobA.x == 1);

        // Test forcefully stopping.
        scheduler = factory();
        LongIncrementJob jobB = new LongIncrementJob(msecs(1000));
        scheduler.addJob(jobB, new FixedIntervalSchedule(seconds(5)));
        assert(jobB.x == 0);
        scheduler.start();
        Thread.sleep(msecs(1));
        scheduler.stop(true);
        // The job needs a full second before it increments, so a forced stop
        // must return with the counter untouched.
        assert(jobB.x == 0);
    }

    /// A test job that counts how many times it has been run.
    private class IncrementJob : Job {
        /// The number of times `run()` has been called.
        public uint x = 0;

        public void run() {
            x++;
        }
    }

    /// A test job that sleeps for a fixed duration before counting the run.
    private class LongIncrementJob : IncrementJob {
        private Duration dur;

        /**
         * Constructs the job.
         * Params:
         *   dur = How long each run sleeps before incrementing the counter.
         */
        public this(Duration dur = msecs(5)) {
            this.dur = dur;
        }

        public override void run() {
            import core.thread : Thread;
            Thread.sleep(this.dur);
            super.run();
        }
    }
}