1 /**
2  * Defines the scheduler interface, and a simple thread-based implementation
3  * of the scheduler.
4  */
5 module scheduled.scheduler;
6 
7 import scheduled.job;
8 import scheduled.schedule;
9 import scheduled.schedules.cron_schedule;
10 
11 /** 
12  * A pairing of a Job with a JobSchedule that tells it when to execute. This is
13  * the basic operational unit that JobSchedulers will deal with.
14  */
15 public struct ScheduledJob {
16     public CurrentTimeProvider timeProvider;
17     public JobSchedule schedule;
18     public Job job;
19     public long id;
20 }
21 
22 /** 
23  * A scheduler is the core component of the library; you add jobs to the job
24  * scheduler, and then it will execute these according to the job's schedule.
25  */
26 public interface JobScheduler {
27     /** 
28      * Gets a default job scheduler that's recommended to use for most cases.
29      * Returns: A job scheduler.
30      */
31     public static JobScheduler getDefault() {
32         import scheduled.taskpool_scheduler : TaskPoolScheduler;
33         return new TaskPoolScheduler();
34     }
35 
36     /** 
37      * Adds a job to the scheduler.
38      * Params:
39      *   job = The job to be added. 
40      */
41     protected void addScheduledJob(ScheduledJob job);
42 
43     /**
44      * Adds a job to the scheduler, with the given schedule to define when it
45      * should be run.
46      * Params:
47      *   job = The job to be added.
48      *   schedule = The schedule defining when the job is run.
49      * Returns: The scheduled job.
50      */
51     public final ScheduledJob addJob(Job job, JobSchedule schedule) {
52         auto sj = ScheduledJob(this.getTimeProvider(), schedule, job, this.getNextScheduledJobId());
53         addScheduledJob(sj);
54         return sj;
55     }
56 
57     /** 
58      * Adds a simple job that executes the given function according to the
59      * given schedule.
60      * Params:
61      *   fn = A function to execute.
62      *   schedule = The schedule defining when to execute the function.
63      * Returns: The scheduled job.
64      */
65     public final ScheduledJob addJob(void function() fn, JobSchedule schedule) {
66         return addJob(new FunctionJob(fn), schedule);
67     }
68 
69     /** 
70      * Adds a job to the scheduler, whose schedule is defined by the given cron
71      * expression string.
72      * Params:
73      *   job = The job to be added.
74      *   cronExpressionString = A Cron expression string defining when to run the job.
75      * Returns: The scheduled job.
76      */
77     public final ScheduledJob addCronJob(Job job, string cronExpressionString) {
78         return addJob(job, new CronSchedule(cronExpressionString));
79     }
80 
81     /** 
82      * Adds a job to the scheduler, whose schedule is defined by the given cron
83      * expression string.
84      * Params:
85      *   fn = A function to execute.
86      *   cronExpressionString = A Cron expression string defining when to run the job.
87      * Returns: The scheduled job.
88      */
89     public final ScheduledJob addCronJob(void function() fn, string cronExpressionString) {
90         return addJob(new FunctionJob(fn), new CronSchedule(cronExpressionString));
91     }
92 
93     /** 
94      * Gets this scheduler's time provider.
95      * Returns: The scheduler's time provider.
96      */
97     protected CurrentTimeProvider getTimeProvider();
98 
99     /** 
100      * Gets the next available id to assign to a scheduled job. This must be
101      * unique among all jobs that have been added to the scheduler but not yet
102      * removed.
103      * Returns: The next id to use when adding a scheduled job.
104      */
105     protected ulong getNextScheduledJobId();
106 
107     /**
108      * Starts the scheduler. Once started, there is no guarantee that all
109      * scheduler implementations will allow adding new jobs while running.
110      */
111     public void start();
112 
113     /**
114      * Stops the scheduler. This method blocks until shutdown is complete.
115      * Params:
116      *   force = Whether to forcibly shutdown, cancelling any current jobs.
117      */
118     public void stop(bool force);
119 
120     /** 
121      * Stops the scheduler, and waits for any currently-executing jobs to
122      * finish. Equivalent to calling stop(false).
123      */
124     public final void stop() {
125         stop(false);
126     }
127 }
128 
129 /** 
130  * A job scheduler which offers additional functionality for modifying the set
131  * of scheduled jobs after they're submitted.
132  */
133 public interface MutableJobScheduler : JobScheduler {
134     /** 
135      * Removes a job from the scheduler.
136      * Params:
137      *   job = The job to remove.
138      * Returns: True if the job was removed, or false otherwise.
139      */
140     public bool removeScheduledJob(ScheduledJob job);
141 }
142 
143 // For testing, we provide a standardized test suite for a Scheduler to ensure
144 // that it is compliant to the interface(s).
145 // It is therefore recommended that all implementations of JobScheduler call
146 // `testScheduler(() => new MyCustomScheduler());` in their unit test.
147 version(unittest) {
148     import scheduled.schedules;
149     import std.datetime;
150     import slf4d;
151 
152     alias SchedulerFactory = JobScheduler delegate();
153 
154     public void testScheduler(SchedulerFactory factory) {
155         import std.string : format;
156         auto sampleScheduler = factory();
157         string name = format!"%s"(sampleScheduler);
158         sampleScheduler.stop();
159         auto log = getLogger();
160 
161         log.infoF!"Testing %s.addJob(...)"(name);
162         testAddJob(factory);
163         log.infoF!"Testing %s.getNextScheduledJobId()"(name);
164         testGetNextScheduledJobId(factory);
165         log.infoF!"Testing %s.start()"(name);
166         testStart(factory);
167         log.infoF!"Testing %s.stop()"(name);
168         testStop(factory);
169         log.infoF!"Testing of %s is successful."(name);
170     }
171 
172     private void testAddJob(SchedulerFactory factory) {
173         auto scheduler = factory();
174         IncrementJob job = new IncrementJob();
175         auto scheduledJob = scheduler.addJob(job, new FixedIntervalSchedule(msecs(10)));
176         assert(job.x == 0);
177         assert(scheduledJob.job == job);
178 
179         auto anotherScheduledJob = scheduler.addJob(job, new FixedIntervalSchedule(msecs(20)));
180         assert(anotherScheduledJob.job == job);
181         assert(anotherScheduledJob.id != scheduledJob.id);
182     }
183 
184     private void testGetNextScheduledJobId(SchedulerFactory factory) {
185         import std.algorithm : canFind;
186         auto scheduler = factory();
187         IncrementJob job = new IncrementJob();
188         ulong[] ids = [];
189         for (int i = 0; i < 1000; i++) {
190             ulong newId = scheduler.getNextScheduledJobId();
191             assert(!canFind(ids, newId));
192             ids ~= newId;
193             scheduler.addJob(job, new FixedIntervalSchedule(msecs(5)));
194         }
195     }
196 
197     /** 
198      * Tests the `start()` function of the scheduler, and therefore also the
199      * main operation of the scheduler to execute jobs after starting.
200      * Params:
201      *   factory = The scheduler factory.
202      */
203     private void testStart(SchedulerFactory factory) {
204         import core.thread : Thread;
205 
206         auto scheduler = factory();
207         IncrementJob jobA = new IncrementJob();
208         scheduler.addJob(jobA, new FixedIntervalSchedule(msecs(5)));
209         IncrementJob jobB = new IncrementJob();
210         scheduler.addJob(jobB, new FixedIntervalSchedule(msecs(10)));
211         IncrementJob jobC = new IncrementJob();
212         scheduler.addJob(jobC, new OneTimeSchedule(msecs(5)));
213 
214         // Before starting, all jobs should not have ran.
215         assert(jobA.x == 0);
216         assert(jobB.x == 0);
217         assert(jobC.x == 0);
218 
219         scheduler.start();
220         Thread.sleep(msecs(1));
221         // After starting and running a bit, both interval jobs should now have ran once.
222         assert(jobA.x == 1);
223         assert(jobB.x == 1);
224 
225         Thread.sleep(msecs(11));
226         // After a total of 12ms, jobA should have ran 3 times at t=0, 5, 10, and jobB 2 times at t=0, 10.
227         assert(jobA.x == 3);
228         assert(jobB.x == 2);
229         // jobC should have ran once, and not been re-queued.
230         assert(jobC.x == 1);
231 
232         scheduler.stop();
233     }
234 
235     private void testStop(SchedulerFactory factory) {
236         import core.thread : Thread;
237 
238         // Test stopping and waiting for tasks to finish.
239         auto scheduler = factory();
240         LongIncrementJob jobA = new LongIncrementJob(msecs(5));
241         scheduler.addJob(jobA, new FixedIntervalSchedule(msecs(10)));
242         assert(jobA.x == 0);
243         scheduler.start();
244         Thread.sleep(msecs(1));
245         scheduler.stop(false);
246         assert(jobA.x == 1);
247 
248         // Test forcefully stopping.
249         scheduler = factory();
250         LongIncrementJob jobB = new LongIncrementJob(msecs(1000));
251         scheduler.addJob(jobB, new FixedIntervalSchedule(seconds(5)));
252         assert(jobB.x == 0);
253         scheduler.start();
254         Thread.sleep(msecs(1));
255         scheduler.stop(true);
256         assert(jobB.x == 0);
257     }
258 
259     private class IncrementJob : Job {
260         public uint x = 0;
261         public void run() {
262             x++;
263         }
264     }
265 
266     private class LongIncrementJob : IncrementJob {
267         private Duration dur;
268 
269         public this(Duration dur = msecs(5)) {
270             this.dur = dur;
271         }
272 
273         public override void run() {
274             import core.thread : Thread;
275             Thread.sleep(this.dur);
276             super.run();
277         }
278     }
279 }