ThreadedJobScheduler

A simple thread-based scheduler that sleeps until the next task is due, then runs it using a task pool. Allows for adding and removing jobs only when not running.
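The sleep-then-dispatch idea described above can be sketched with Phobos alone. This is a minimal illustration under stated assumptions, not the library's implementation: `runDueTasks`, `recordRun`, and `completedRuns` are hypothetical names, the due times are plain offsets from the moment of the call, and the pool size of 2 is arbitrary.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import core.thread : Thread;
import core.time : Duration, MonoTime, msecs;
import std.parallelism : TaskPool, task;

// Hypothetical counter, incremented from the pool's worker threads.
shared int completedRuns;

// Hypothetical unit of work standing in for a job's run() method.
void recordRun() {
    atomicOp!"+="(completedRuns, 1);
}

// Sleeps until each due time (an offset from the moment of the call) and
// then hands the work to a task pool instead of running it inline.
// Assumes `dueTimes` is already sorted in ascending order.
int runDueTasks(Duration[] dueTimes) {
    atomicStore(completedRuns, 0);
    auto pool = new TaskPool(2);
    immutable start = MonoTime.currTime;
    foreach (due; dueTimes) {
        immutable wait = (start + due) - MonoTime.currTime;
        if (wait > Duration.zero) {
            Thread.sleep(wait); // Nothing to do until the next task is due.
        }
        pool.put(task(&recordRun));
    }
    pool.finish(true); // Block until every submitted task has completed.
    return atomicLoad(completedRuns);
}
```

Handing each task to the pool keeps the sleeping thread free to compute the next wait, so a slow job does not delay the ones scheduled after it.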

Constructors

this
this(TaskPool taskPool, CurrentTimeProvider timeProvider)

Constructs a new threaded job scheduler.

this
this()

Constructs a new threaded job scheduler with a default task pool and time provider.

Members

Functions

addScheduledJob
void addScheduledJob(ScheduledJob job)

Adds a job to the scheduler. For this scheduler, jobs are added to the list in-order, such that the first job is the one whose next execution time is the closest.
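The in-order insertion described above could look roughly like the following. This is a sketch only: `PendingJob` and `insertInOrder` are hypothetical stand-ins, and the next execution time is modelled as a plain millisecond count rather than whatever type the scheduler actually stores.

```d
import std.algorithm.searching : countUntil;
import std.array : insertInPlace;

// Hypothetical stand-in for a scheduled job: only the fields this sketch needs.
struct PendingJob {
    ulong id;
    long nextExecutionMsecs;
}

// Inserts `job` so that `jobs` stays sorted by ascending next execution time,
// which keeps the job that is due soonest at index 0.
void insertInOrder(ref PendingJob[] jobs, PendingJob job) {
    // Index of the first existing job that runs strictly later than the new one.
    auto idx = jobs.countUntil!(j => j.nextExecutionMsecs > job.nextExecutionMsecs);
    if (idx < 0) {
        jobs ~= job; // No job runs later, so append at the end.
    } else {
        jobs.insertInPlace(cast(size_t) idx, job);
    }
}
```

With the list kept sorted, the scheduler only ever needs to look at the first element to decide how long to sleep.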

getNextScheduledJobId
ulong getNextScheduledJobId()

Gets the next available id to assign to a scheduled job. This must be unique among all jobs that have been added to the scheduler but not yet removed.
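One simple way to satisfy the uniqueness requirement is a counter that never hands out the same value twice. The sketch below assumes a hypothetical `JobIdSource` type; the scheduler itself may generate ids differently, and this version is not thread-safe.

```d
// Hypothetical id source: each call returns a value one higher than the last,
// so no two jobs that are live at the same time can share an id.
struct JobIdSource {
    private ulong nextId = 1;

    ulong next() {
        return nextId++;
    }
}
```

Never reusing ids is stricter than the documented contract, which only requires uniqueness among jobs that have been added but not yet removed.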

jobCount
ulong jobCount()

Gets the number of jobs that this scheduler has.

removeScheduledJob
bool removeScheduledJob(ScheduledJob job)

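Removes a job from the scheduler. In the example on this page, the call returns true when the job was removed and false when the job had already been removed.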

start
void start()

Starts the scheduler. Not every scheduler implementation is guaranteed to allow adding new jobs once it is running.

stop
void stop(bool force)

Stops the scheduler.

Inherited Members

From MutableJobScheduler

removeScheduledJob
bool removeScheduledJob(ScheduledJob job)

Removes a job from the scheduler.

Examples

Tests the threaded job scheduler by running it with a few simple jobs and checking that each job executes the expected number of times.

import core.thread;
import core.atomic;
import std.format;
import std.experimental.logger;
import scheduled.schedules.fixed_interval;
import std.stdio;

// Create a simple job which increments a variable by 1.
class IncrementJob : Job {
    public uint x = 0;
    public string id;
    public this(string id) {
        this.id = id;
    }

    public void run() {
        x++;
        writefln!"[%s] Incrementing x to %d"(id, x);
    }
}

// Fails with a descriptive message if the job has not run exactly `expected` times.
void assertJobStatus(IncrementJob j, uint expected) {
    assert(j.x == expected, format("Job %s executed %d times instead of the expected %d.", j.id, j.x, expected));
}

// Test case 1: Scheduler with a single job.

JobScheduler scheduler = new ThreadedJobScheduler;
auto inc1 = new IncrementJob("1");
scheduler.addJob(inc1, new FixedIntervalSchedule(msecs(50)));
scheduler.start();
Thread.sleep(msecs(130));
// We expect the job to be executed at t = 0, 50, and 100 ms.
assert(inc1.x == 3, "Job did not execute the expected number of times.");
scheduler.stop();
writeln("Scheduler 1 complete");

// Test case 2: Scheduler with multiple jobs.

ThreadedJobScheduler scheduler2 = new ThreadedJobScheduler;
auto incA = new IncrementJob("A");
auto incB = new IncrementJob("B");
ScheduledJob sjA = scheduler2.addJob(incA, new FixedIntervalSchedule(msecs(50)));
ScheduledJob sjB = scheduler2.addJob(incB, new FixedIntervalSchedule(msecs(80)));
assert(scheduler2.jobCount == 2);
scheduler2.start();
writeln("Starting scheduler 2");
Thread.sleep(msecs(180));
// We expect job A to be executed at t = 0, 50, 100, and 150 ms.
assertJobStatus(incA, 4);
// We expect job B to be executed at t = 0, 80, and 160 ms.
assertJobStatus(incB, 3);
// Try to remove a job.
writeln("Removing scheduled job A");
assert(scheduler2.removeScheduledJob(sjA));
assert(scheduler2.jobCount == 1);
// Removing the same job a second time should fail.
assert(!scheduler2.removeScheduledJob(sjA));
Thread.sleep(msecs(170));
// About 350 ms have now elapsed, so we expect job B to have also run
// at t = 240 and 320 ms, for 5 executions in total.
assertJobStatus(incB, 5);
// We expect job A to not be executed since its scheduled job was removed.
assertJobStatus(incA, 4);

// Remove all jobs, wait a bit, and add one back.
writeln("Removing scheduled job B and waiting a while.");
assert(scheduler2.removeScheduledJob(sjB));
assert(scheduler2.jobCount == 0);
Thread.sleep(msecs(100));
writeln("Adding scheduled job C");
auto incC = new IncrementJob("C");
ScheduledJob sjC = scheduler2.addJob(incC, new FixedIntervalSchedule(msecs(30)));
assert(scheduler2.jobCount == 1);
Thread.sleep(msecs(100));
// We expect job C to be executed at t = 0, 30, 60, and 90 ms after it was added.
assertJobStatus(incC, 4);
scheduler2.stop(false);
