1 module scheduled.scheduler;
2 
3 import scheduled.job;
4 import scheduled.schedule;
5 import scheduled.schedules.cron_schedule;
6 
7 /** 
8  * A scheduler is the core component of the library; you add jobs to the job
9  * scheduler, and then it will execute these according to the job's schedule.
10  */
11 public interface JobScheduler {
12     /** 
13      * Adds a job to the scheduler.
14      * Params:
15      *   job = The job to be added. 
16      */
17     void addJob(ScheduledJob job);
18 
19     /**
20      * Adds a job to the scheduler, with the given schedule to define when it
21      * should be run.
22      * Params:
23      *   job = The job to be added.
24      *   schedule = The schedule defining when the job is run.
25      */
26     final void addJob(Job job, JobSchedule schedule) {
27         addJob(new ScheduledJob(job, schedule));
28     }
29 
30     /** 
31      * Adds a simple job that executes the given function according to the
32      * given schedule.
33      * Params:
34      *   fn = A function to execute.
35      *   schedule = The schedule defining when to execute the function.
36      */
37     final void addJob(void function() fn, JobSchedule schedule) {
38         addJob(new FunctionJob(fn), schedule);
39     }
40 
41     /** 
42      * Adds a job to the scheduler, whose schedule is defined by the given cron
43      * expression string.
44      * Params:
45      *   job = The job to be added.
46      *   cronExpressionString = A Cron expression string defining when to run the job.
47      */
48     final void addCronJob(Job job, string cronExpressionString) {
49         addJob(job, new CronSchedule(cronExpressionString));
50     }
51 
52     /**
53      * Starts the scheduler. Once started, there is no guarantee that all
54      * scheduler implementations will allow adding new jobs while running.
55      */
56     void start();
57 
58     /**
59      * Stops the scheduler.
60      * Params:
61      *   force = Whether to forcibly shutdown, cancelling any current jobs.
62      */
63     void stop(bool force);
64 
65     /** 
66      * Stops the scheduler, and waits for any currently-executing jobs to
67      * finish. Functionally equivalent to calling stop(false).
68      */
69     final void stop() {
70         stop(false);
71     }
72 }
73 
74 import core.thread;
75 
76 /** 
77  * A simple thread-based scheduler that sleeps until the next task, and runs it
78  * using a task pool.
79  */
80 public class ThreadedJobScheduler : Thread, JobScheduler {
81     import std.parallelism;
82     import std.container.binaryheap;
83     import std.datetime.systime;
84     import core.time;
85 
86     /** 
87      * The maximum amount of time that this scheduler may sleep for. This is
88      * mainly used as a sanity check against clock deviations or other
89      * inconsistencies in timings.
90      */
91     private static immutable Duration MAX_SLEEP_TIME = seconds(60);
92 
93     private CurrentTimeProvider timeProvider;
94     private TaskPool taskPool;
95     private BinaryHeap!(ScheduledJob[]) jobPriorityQueue;
96     private shared bool running;
97 
98     public this(TaskPool taskPool, CurrentTimeProvider timeProvider) {
99         super(&this.run);
100         this.taskPool = taskPool;
101         this.timeProvider = timeProvider;
102         this.jobPriorityQueue = BinaryHeap!(ScheduledJob[])([]);
103     }
104 
105     public this() {
106         this(std.parallelism.taskPool(), new SysTimeProvider);
107     }
108 
109     void addJob(ScheduledJob job) {
110         if (this.running) throw new Exception("Cannot add tasks while the scheduler is running.");
111         this.jobPriorityQueue.insert(job);
112     }
113 
114     void start() {
115         super.start();
116     }
117 
118     /** 
119      * Runs the scheduler. This works by popping the next scheduled task from
120      * the priority queue (since scheduled tasks are ordered by their next
121      * execution date) and sleeping until we reach that task's execution date.
122      */
123     void run() {
124         this.running = true;
125         while (this.running && !this.jobPriorityQueue.empty) {
126             ScheduledJob job = this.jobPriorityQueue.front;
127             this.jobPriorityQueue.removeFront;
128             SysTime now = this.timeProvider.now;
129             auto nextExecutionTime = job.getSchedule.getNextExecutionTime(now);
130             // If the job doesn't have a next execution, skip it, don't requeue it, and try again.
131             if (nextExecutionTime.isNull) continue;
132             Duration timeUntilJob = hnsecs(nextExecutionTime.get.stdTime - now.stdTime);
133             
134             // If the time until the next job is longer than our max sleep time, requeue the job and sleep as long as possible.
135             if (MAX_SLEEP_TIME < timeUntilJob) {
136                 this.jobPriorityQueue.insert(job);
137                 this.sleep(MAX_SLEEP_TIME);
138             } else {
139                 // The time until the next job is close enough that we can sleep directly to it.
140                 if (timeUntilJob > hnsecs(0)) {
141                     this.sleep(timeUntilJob);
142                 }
143                 // Queue up running the job, and process all other aspects of it.
144                 this.taskPool.put(task(&job.getJob.run));
145                 job.getSchedule.markExecuted(this.timeProvider.now);
146                 if (job.getSchedule.isRepeating) {
147                     this.jobPriorityQueue.insert(job);
148                 }
149             }
150         }
151     }
152 
153     void stop(bool force) {
154         this.running = false;
155         this.taskPool.finish(true);
156     }
157 }