1 module scheduled.taskpool_scheduler;
2 
3 import scheduled.scheduler;
4 import scheduled.schedule;
5 import scheduled.job;
6 import core.thread;
7 import slf4d;
8 
9 /** 
10  * A simple thread-based scheduler that sleeps until the next task, and runs it
11  * using a task pool. Allows for adding and removing jobs only when not
12  * running.
13  */
14 public class TaskPoolScheduler : Thread, MutableJobScheduler {
15     import std.parallelism;
16     import std.datetime.systime;
17     import std.algorithm.mutation;
18     import std.typecons : Nullable;
19     import std.range : empty;
20     import core.time;
21     import core.atomic;
22     import core.sync.semaphore;
23     import core.sync.mutex;
24 
25     /** 
26      * The maximum amount of time that this scheduler may sleep for. This is
27      * mainly used as a sanity check against clock deviations or other
28      * inconsistencies in timings.
29      */
30     private static immutable Duration MAX_SLEEP_TIME = seconds(60);
31 
32     /** 
33      * The component which provides the current timestamp.
34      */
35     private CurrentTimeProvider timeProvider;
36 
37     /** 
38      * The task pool that is used to execute jobs.
39      */
40     private TaskPool taskPool;
41 
42     /** 
43      * The set of scheduled jobs that this scheduler tracks. This list will be
44      * maintained in a sorted order, whereby the first job is the one which
45      * needs to be executed next.
46      */
47     private ScheduledJob[] jobs;
48 
49     /** 
50      * A mutex for controlling access to the list of jobs this scheduler has.
51      */
52     private shared Mutex jobsMutex;
53 
54     /** 
55      * Simple flag which is used to control this scheduler thread.
56      */
57     private shared bool running;
58 
59     /** 
60      * Flag that's set to true when this scheduler is empty and waiting for
61      * new jobs to be added.
62      */
63     private shared bool waiting = false;
64 
65     /** 
66      * A semaphore that is used to notify a waiting scheduler that a job has
67      * been added to its list, and that it can resume normal operations.
68      */
69     private Semaphore emptyJobsSemaphore;
70 
71     /** 
72      * Simple auto-incrementing id that is used to issue ids to new scheduled
73      * jobs. Due to the nature of the world, we can safely assume that we will
74      * never run out of ids.
75      */
76     private shared ulong nextId = 1;
77 
78     /** 
79      * Constructs a new threaded job scheduler.
80      * Params:
81      *   taskPool = The task pool to use when executing jobs.
82      *   timeProvider = The component which provides current timestamps.
83      */
84     public this(TaskPool taskPool, CurrentTimeProvider timeProvider) {
85         super(&this.run);
86         this.jobsMutex = new shared Mutex();
87         this.emptyJobsSemaphore = new Semaphore();
88         this.taskPool = taskPool;
89         this.timeProvider = timeProvider;
90     }
91 
92     /** 
93      * Constructs a new threaded job scheduler with a default task pool and
94      * time provider.
95      */
96     public this() {
97         this(new TaskPool(1), new SysTimeProvider());
98         this.taskPool.isDaemon = true;
99     }
100 
101     /** 
102      * Adds a job to the scheduler. For this scheduler, jobs are added to the
103      * list in-order, such that the first job is the one whose next execution
104      * time is the closest.
105      * Params:
106      *   job = The job to be added. 
107      */
108     public void addScheduledJob(ScheduledJob job) {
109         synchronized(this.jobsMutex) {
110             this.jobs ~= job;
111             this.sortJobs();
112         }
113         if (this.waiting) {
114             this.emptyJobsSemaphore.notify();
115         }
116         auto log = getLogger();
117         log.debugF!"Added scheduled job %s with id %d."(job.job, job.id);
118     }
119 
120     /** 
121      * Removes a job from the scheduler.
122      * Params:
123      *   job = The job to be removed.
124      * Returns: True if the job was removed, or false otherwise.
125      */
126     public bool removeScheduledJob(ScheduledJob job) {
127         size_t idx = -1;
128         synchronized(this.jobsMutex) {
129             foreach (i, j; this.jobs) {
130                 if (j.id == job.id) {
131                     idx = i;
132                     break;
133                 }
134             }
135             if (idx != -1) {
136                 this.jobs = this.jobs.remove(idx);
137                 getLogger().debugF!"Removed job %s (id %d)."(job.job, job.id);
138                 return true;
139             }
140             return false;
141         }
142     }
143 
144     /** 
145      * Sorts the internal list of jobs according to the next execution time.
146      * Jobs that execute sooner appear earlier in the list. Only call this
147      * in synchronized code.
148      */
149     private void sortJobs() {
150         import std.algorithm : sort;
151 
152         int[] data = [1, 3, 4, -1, 2, 6, -4];
153         data.sort!((a, b) {
154             return a > b;
155         });
156 
157         auto now = this.timeProvider.now();
158         jobs.sort!((a, b) {
159             if (a.id == b.id) return false;
160             auto t1 = a.schedule.getNextExecutionTime(now);
161             auto t2 = b.schedule.getNextExecutionTime(now);
162             if (t1.isNull && t2.isNull) return false;
163             if (!t1.isNull && t2.isNull) return true;
164             if (t1.isNull) return false;
165             return t1.get().opCmp(t2.get()) < 0;
166         });
167     }
168 
169     /** 
170      * Removes all jobs from the scheduler.
171      */
172     public void removeAllScheduledJobs() {
173         synchronized(this.jobsMutex) {
174             this.jobs = [];
175         }
176     }
177 
178     /** 
179      * Gets the number of jobs that this scheduler has.
180      * Returns: The number of jobs currently scheduled.
181      */
182     public ulong jobCount() {
183         synchronized(this.jobsMutex) {
184             return this.jobs.length;
185         }
186     }
187 
188     protected CurrentTimeProvider getTimeProvider() {
189         return this.timeProvider;
190     }
191 
192     /** 
193      * Gets the next available id to assign to a scheduled job. This must be
194      * unique among all jobs that have been added to the scheduler but not yet
195      * removed.
196      * Returns: The next id to use when adding a scheduled job.
197      */
198     protected ulong getNextScheduledJobId() {
199         ulong id = atomicLoad(this.nextId);
200         atomicOp!"+="(this.nextId, 1);
201         return id;
202     }
203 
204     /**
205      * Starts the scheduler. Once started, there is no guarantee that all
206      * scheduler implementations will allow adding new jobs while running.
207      */
208     public void start() {
209         super.start();
210     }
211 
212     /** 
213      * Runs the scheduler. This works by popping the next scheduled task from
214      * the priority queue (since scheduled tasks are ordered by their next
215      * execution date) and sleeping until we reach that task's execution date.
216      */
217     private void run() {
218         auto log = getLogger();
219         this.running = true;
220         while (this.running) {
221             if (!this.jobs.empty) {
222                 this.jobsMutex.lock_nothrow();
223 
224                 ScheduledJob nextJob = this.jobs[0];
225                 SysTime now = this.timeProvider.now();
226                 Nullable!SysTime nextExecutionTime = nextJob.schedule.getNextExecutionTime(now);
227                 // If the job doesn't have a next execution, simply remove it.
228                 if (nextExecutionTime.isNull) {
229                     log.debugF!"Removing job %s (id %d) because it doesn't have a next execution time."
230                         (nextJob.job, nextJob.id);
231                     this.jobs = this.jobs.remove(0);
232                     this.jobsMutex.unlock_nothrow();
233                 } else {
234                     Duration timeUntilJob = hnsecs(nextExecutionTime.get.stdTime - now.stdTime);
235                     // If it's time to execute this job, then we run it now!
236                     if (timeUntilJob.isNegative) {
237                         log.debugF!"Running job %s (id %d)."(nextJob.job, nextJob.id);
238                         this.taskPool.put(task(&nextJob.job.run));
239                         nextJob.schedule.markExecuted(now);
240                         this.jobs = this.jobs.remove(0);
241                         if (nextJob.schedule.isRepeating) {
242                             log.debugF!"Requeued job %s (id %d) because its schedule is repeating."
243                                 (nextJob.job, nextJob.id);
244                             this.jobs ~= nextJob;
245                             this.sortJobs();
246                         }
247                         this.jobsMutex.unlock_nothrow();
248                     } else {
249                         this.jobsMutex.unlock_nothrow();
250                         // Otherwise, we sleep until the next job is ready, then try again.
251                         Duration timeToSleep = MAX_SLEEP_TIME < timeUntilJob ? MAX_SLEEP_TIME : timeUntilJob;
252                         log.debugF!"Sleeping for %d ms."(timeToSleep.total!"msecs");
253                         this.sleep(timeToSleep);
254                     }
255                 }
256             } else {
257                 log.debug_("No jobs in queue. Waiting until a job is added.");
258                 this.waiting = true;
259                 this.emptyJobsSemaphore.wait();
260             }
261         }
262     }
263 
264     /**
265      * Stops the scheduler.
266      * Params:
267      *   force = Whether to forcibly shutdown, cancelling any current jobs.
268      */
269     public void stop(bool force = false) {
270         this.running = false;
271         if (waiting) emptyJobsSemaphore.notify();
272         if (force) {
273             this.taskPool.stop();
274         } else {
275             this.taskPool.finish(true);
276         }
277     }
278 }
279 
280 unittest {
281     // Run standard Scheduler test suite:
282     testScheduler(() => new TaskPoolScheduler());
283 }