1 /**
2  * Defines the scheduler interface, and a simple thread-based implementation
3  * of the scheduler.
4  */
5 module scheduled.scheduler;
6 
7 import scheduled.job;
8 import scheduled.schedule;
9 import scheduled.schedules.cron_schedule;
10 
/**
 * The central abstraction of the library. Jobs are registered with a
 * scheduler together with a schedule, and the scheduler is responsible for
 * executing each job whenever its schedule says it is due.
 */
public interface JobScheduler {
    /**
     * Registers an already-constructed scheduled job with this scheduler.
     * Implementations decide how the job is stored and ordered.
     * Params:
     *   job = The scheduled job to register.
     */
    protected void addScheduledJob(ScheduledJob job);

    /**
     * Registers a job that should be run according to the given schedule.
     * A fresh id is obtained from getNextScheduledJobId() and the resulting
     * scheduled job is passed on to addScheduledJob().
     * Params:
     *   job = The job to run.
     *   schedule = The schedule that determines when the job runs.
     * Returns: The scheduled job that was registered.
     */
    public final ScheduledJob addJob(Job job, JobSchedule schedule) {
        ulong id = getNextScheduledJobId();
        ScheduledJob scheduled = new ScheduledJob(job, schedule, id);
        addScheduledJob(scheduled);
        return scheduled;
    }

    /**
     * Convenience overload which wraps a plain function in a FunctionJob and
     * registers it with the given schedule.
     * Params:
     *   fn = The function to call each time the job runs.
     *   schedule = The schedule that determines when the function is called.
     * Returns: The scheduled job that was registered.
     */
    public final ScheduledJob addJob(void function() fn, JobSchedule schedule) {
        auto wrapped = new FunctionJob(fn);
        return addJob(wrapped, schedule);
    }

    /**
     * Registers a job whose schedule is built from a cron expression string.
     * Params:
     *   job = The job to run.
     *   cronExpressionString = The cron expression used to construct the
     *                          job's CronSchedule.
     * Returns: The scheduled job that was registered.
     */
    public final ScheduledJob addCronJob(Job job, string cronExpressionString) {
        auto cronSchedule = new CronSchedule(cronExpressionString);
        return addJob(job, cronSchedule);
    }

    /**
     * Provides the id to assign to the next scheduled job. The id must be
     * unique among all jobs that have been added to the scheduler and not
     * yet removed.
     * Returns: The id to use for the next scheduled job.
     */
    protected ulong getNextScheduledJobId();

    /**
     * Starts the scheduler. Implementations are not required to support
     * adding new jobs once the scheduler has been started.
     */
    public void start();

    /**
     * Stops the scheduler.
     * Params:
     *   force = Whether to forcibly shut down, cancelling any current jobs.
     */
    public void stop(bool force);

    /**
     * Stops the scheduler without forcing, waiting for any jobs that are
     * currently executing to finish. Shorthand for stop(false).
     */
    public final void stop() {
        this.stop(false);
    }
}
90 
/** 
 * A job scheduler which additionally allows scheduled jobs to be removed
 * after they have been submitted. It inherits all of the operations of
 * JobScheduler.
 */
public interface MutableJobScheduler : JobScheduler {
    /** 
     * Removes a previously added scheduled job from the scheduler.
     * Params:
     *   job = The scheduled job to remove.
     * Returns: True if the job was removed, or false otherwise.
     */
    public bool removeScheduledJob(ScheduledJob job);
}
104 
105 import core.thread;
106 
/** 
 * A simple thread-based scheduler that waits until the next job is due, and
 * then submits it to a task pool for execution. Jobs may be added and removed
 * from any thread, both before the scheduler is started and while it is
 * running; access to the job list is serialized by an internal mutex.
 */
public class ThreadedJobScheduler : Thread, MutableJobScheduler {
    import std.parallelism;
    import std.datetime.systime;
    import std.algorithm.mutation;
    import std.algorithm.sorting : sort;
    import std.typecons : Nullable;
    import std.range : empty;
    import core.time;
    import core.atomic;
    import core.sync.semaphore;
    import core.sync.mutex;

    /** 
     * The maximum amount of time that this scheduler may wait before it
     * re-evaluates its job list. This is mainly used as a sanity check
     * against clock deviations or other inconsistencies in timings.
     */
    private static immutable Duration MAX_SLEEP_TIME = seconds(60);

    /** 
     * The component which provides the current timestamp.
     */
    private CurrentTimeProvider timeProvider;

    /** 
     * The task pool that is used to execute jobs.
     */
    private TaskPool taskPool;

    /** 
     * The set of scheduled jobs that this scheduler tracks. The list is
     * re-sorted after every insertion so that the first element is the job
     * the scheduler thread looks at next. Guarded by jobsMutex.
     */
    private ScheduledJob[] jobs;

    /** 
     * A mutex for controlling access to the list of jobs this scheduler has.
     * It also guards the hand-off of the waiting and stopRequested flags
     * between the scheduler thread and its callers.
     */
    private shared Mutex jobsMutex;

    /** 
     * Set once stop() has been called. The scheduler thread checks this flag
     * while holding jobsMutex and exits its loop when it is set, so no job is
     * submitted to the task pool after stop() has begun shutting it down.
     */
    private shared bool stopRequested = false;

    /** 
     * Flag that's set to true when the scheduler thread is about to block on
     * the semaphore, either because it has no jobs or because the next job
     * is not yet due. It indicates to addScheduledJob() that the scheduler
     * thread must be woken up so that it can re-evaluate its job list.
     */
    private shared bool waiting = false;

    /** 
     * A semaphore that is used to wake the scheduler thread when a job has
     * been added to its list or when the scheduler has been asked to stop.
     */
    private Semaphore emptyJobsSemaphore;

    /** 
     * Simple auto-incrementing id that is used to issue ids to new scheduled
     * jobs. Due to the nature of the world, we can safely assume that we will
     * never run out of ids.
     */
    private shared ulong nextId = 1;

    /** 
     * Constructs a new threaded job scheduler.
     * Params:
     *   taskPool = The task pool to use when executing jobs.
     *   timeProvider = The component which provides current timestamps.
     */
    public this(TaskPool taskPool, CurrentTimeProvider timeProvider) {
        super(&this.run);
        this.jobsMutex = new shared Mutex();
        this.emptyJobsSemaphore = new Semaphore();
        this.taskPool = taskPool;
        this.timeProvider = timeProvider;
    }

    /** 
     * Constructs a new threaded job scheduler with a default task pool and
     * time provider.
     */
    public this() {
        this(new TaskPool(1), new SysTimeProvider);
    }

    /** 
     * Adds a job to the scheduler and re-sorts the job list. If the
     * scheduler thread is currently blocked (idle, or waiting for a later
     * job), it is woken so that the new job is taken into account
     * immediately.
     * Params:
     *   job = The job to be added. 
     */
    public void addScheduledJob(ScheduledJob job) {
        bool wakeScheduler;
        {
            this.jobsMutex.lock_nothrow();
            // Release the lock even if appending or sorting throws.
            scope (exit) this.jobsMutex.unlock_nothrow();

            this.jobs ~= job;
            this.jobs.sort!("a > b");

            // Claim the wake-up while holding the lock, so that exactly one
            // notification is sent for each time the scheduler blocks.
            wakeScheduler = atomicLoad(this.waiting);
            if (wakeScheduler) {
                atomicStore(this.waiting, false);
            }
        }
        if (wakeScheduler) {
            this.emptyJobsSemaphore.notify();
        }
    }

    /** 
     * Removes a job from the scheduler. Jobs are matched by their id.
     * Params:
     *   job = The job to be removed.
     * Returns: True if the job was removed, or false otherwise.
     */
    public bool removeScheduledJob(ScheduledJob job) {
        this.jobsMutex.lock_nothrow();
        scope (exit) this.jobsMutex.unlock_nothrow();

        foreach (i, candidate; this.jobs) {
            if (candidate.getId() == job.getId()) {
                // Returning immediately, so mutating the array here is safe.
                this.jobs = this.jobs.remove(i);
                return true;
            }
        }
        return false;
    }

    /** 
     * Gets the number of jobs that this scheduler has.
     * Returns: The number of jobs currently scheduled.
     */
    public ulong jobCount() {
        this.jobsMutex.lock_nothrow();
        scope (exit) this.jobsMutex.unlock_nothrow();
        return this.jobs.length;
    }

    /** 
     * Gets the next available id to assign to a scheduled job. This must be
     * unique among all jobs that have been added to the scheduler but not yet
     * removed.
     * Returns: The next id to use when adding a scheduled job.
     */
    protected ulong getNextScheduledJobId() {
        // A single atomic read-modify-write, so that two threads can never be
        // handed the same id. atomicOp yields the incremented value, so the
        // id being issued is one less than that.
        return atomicOp!"+="(this.nextId, 1) - 1;
    }

    /**
     * Starts the scheduler thread.
     */
    public void start() {
        super.start();
    }

    /** 
     * Runs the scheduler. On each iteration the first job in the list is
     * inspected: if it has no further executions it is dropped; if it is due
     * it is submitted to the task pool and, when repeating, re-queued;
     * otherwise the thread blocks until the job is due (capped at
     * MAX_SLEEP_TIME), a new job is added, or the scheduler is stopped.
     */
    private void run() {
        while (true) {
            // True when there is nothing scheduled and we block indefinitely.
            bool idle = false;
            // How long to block when the next job is not yet due.
            Duration pause = Duration.zero;

            {
                this.jobsMutex.lock_nothrow();
                scope (exit) this.jobsMutex.unlock_nothrow();

                // We are awake now, whatever the reason was.
                atomicStore(this.waiting, false);

                // Checked under the lock so that it is ordered with stop().
                if (atomicLoad(this.stopRequested)) {
                    break;
                }

                if (this.jobs.empty) {
                    idle = true;
                } else {
                    ScheduledJob nextJob = this.jobs[0];
                    SysTime now = this.timeProvider.now();
                    Nullable!SysTime nextExecutionTime = nextJob.getSchedule().getNextExecutionTime(now);

                    // If the job doesn't have a next execution, simply remove it.
                    if (nextExecutionTime.isNull) {
                        this.jobs = this.jobs.remove(0);
                        continue;
                    }

                    Duration timeUntilJob = hnsecs(nextExecutionTime.get.stdTime - now.stdTime);

                    // If it's time to execute this job, then we run it now!
                    if (timeUntilJob.isNegative) {
                        this.taskPool.put(task(&nextJob.getJob.run));
                        nextJob.getSchedule.markExecuted(now);
                        this.jobs = this.jobs.remove(0);
                        if (nextJob.getSchedule.isRepeating) {
                            this.jobs ~= nextJob;
                            this.jobs.sort!("a > b");
                        }
                        continue;
                    }

                    // Otherwise, wait until the next job is ready, then try again.
                    if (MAX_SLEEP_TIME < timeUntilJob) {
                        pause = MAX_SLEEP_TIME;
                    } else {
                        pause = timeUntilJob;
                    }
                }

                // Announce that we are about to block. This is done while
                // still holding the lock so that a concurrent
                // addScheduledJob() cannot miss it.
                atomicStore(this.waiting, true);
            }

            // Block on the semaphore instead of sleeping, so that adding a
            // job or stopping the scheduler interrupts the wait. A stale
            // notification only causes one extra pass through the loop.
            if (idle) {
                this.emptyJobsSemaphore.wait();
            } else {
                this.emptyJobsSemaphore.wait(pause);
            }
        }
    }

    /**
     * Stops the scheduler. The scheduler thread is woken up if it is blocked
     * and exits its loop without submitting any further jobs.
     *
     * NOTE(review): a graceful stop blocks until the task pool's worker
     * threads have terminated; calling it from inside a job running on that
     * same pool is presumably unsafe - confirm against std.parallelism.
     * Params:
     *   force = If true, the task pool is stopped immediately and jobs that
     *           are still queued in it are discarded. If false, the task
     *           pool is allowed to finish its outstanding jobs, and this
     *           call waits for them.
     */
    public void stop(bool force) {
        {
            this.jobsMutex.lock_nothrow();
            scope (exit) this.jobsMutex.unlock_nothrow();
            // Set under the lock: once we release it, the scheduler thread
            // can no longer reach taskPool.put().
            atomicStore(this.stopRequested, true);
        }
        // Wake the scheduler thread in case it is blocked on the semaphore.
        this.emptyJobsSemaphore.notify();

        if (force) {
            this.taskPool.stop();
        } else {
            this.taskPool.finish(true);
        }
    }
}
324 
/**
 * Tests the functionality of the threaded job scheduler, by running it for
 * some different simple jobs to ensure the jobs are executed properly. The
 * expectations below are timing-based: each job is expected to run once
 * immediately and then once per interval.
 */
unittest {
    import core.thread;
    import core.atomic;
    import std.format;
    import scheduled.schedules.fixed_interval;
    import std.stdio;

    // Create a simple job which increments a variable by 1. The counter is
    // updated on a task pool thread and read from the test thread, so it is
    // shared and only accessed atomically.
    class IncrementJob : Job {
        public shared uint x = 0;
        public string id;
        public this(string id) {
            this.id = id;
        }

        public void run() {
            immutable uint count = atomicOp!"+="(this.x, 1);
            writefln!"[%s] Incrementing x to %d"(id, count);
        }
    }

    void assertJobStatus(IncrementJob j, uint expected) {
        immutable uint actual = atomicLoad(j.x);
        assert(actual == expected, format("Job %s executed %d times instead of the expected %d.", j.id, actual, expected));
    }

    // Test case 1: Scheduler with a single job.
    {
        JobScheduler scheduler = new ThreadedJobScheduler;
        // Always stop the scheduler, even if an assertion below fails.
        scope (exit) scheduler.stop();

        auto inc1 = new IncrementJob("1");
        scheduler.addJob(inc1, new FixedIntervalSchedule(msecs(50)));
        scheduler.start();
        Thread.sleep(msecs(130));
        // We expect the job to be executed at t = 0, 50, and 100 ms.
        assertJobStatus(inc1, 3);
    }
    writeln("Scheduler 1 complete");

    // Test case 2: Scheduler with multiple jobs.
    {
        ThreadedJobScheduler scheduler2 = new ThreadedJobScheduler;
        // Always stop the scheduler, even if an assertion below fails.
        scope (exit) scheduler2.stop(false);

        auto incA = new IncrementJob("A");
        auto incB = new IncrementJob("B");
        ScheduledJob sjA = scheduler2.addJob(incA, new FixedIntervalSchedule(msecs(50)));
        ScheduledJob sjB = scheduler2.addJob(incB, new FixedIntervalSchedule(msecs(80)));
        assert(scheduler2.jobCount == 2);
        scheduler2.start();
        writeln("Starting scheduler 2");
        Thread.sleep(msecs(180));
        // We expect job A to be executed at t = 0, 50, 100, and 150.
        assertJobStatus(incA, 4);
        // We expect job B to be executed at t = 0, 80, and 160.
        assertJobStatus(incB, 3);

        // Try and remove a job.
        writeln("Removing scheduled job A");
        assert(scheduler2.removeScheduledJob(sjA));
        assert(scheduler2.jobCount == 1);
        // Removing the same job a second time must report failure.
        assert(!scheduler2.removeScheduledJob(sjA));
        Thread.sleep(msecs(170));
        // Roughly 350 ms have now passed since the scheduler was started, so
        // we expect job B to have been executed at t = 0, 80, 160, 240, and 320.
        assertJobStatus(incB, 5);
        // We expect job A to not be executed since its scheduled job was removed.
        assertJobStatus(incA, 4);

        // Remove all jobs, wait a bit, and add one back.
        writeln("Removing scheduled job B and waiting a while.");
        assert(scheduler2.removeScheduledJob(sjB));
        assert(scheduler2.jobCount == 0);
        Thread.sleep(msecs(100));
        writeln("Adding scheduled job C");
        auto incC = new IncrementJob("C");
        scheduler2.addJob(incC, new FixedIntervalSchedule(msecs(30)));
        assert(scheduler2.jobCount == 1);
        Thread.sleep(msecs(100));
        // Relative to the moment it was added, we expect job C to be
        // executed at t = 0, 30, 60, 90.
        assertJobStatus(incC, 4);
    }
}