Add metadata

This commit is contained in:
Bojan Serafimov
2022-05-24 14:12:53 -04:00
parent 98d8d65c83
commit 3975417cae

View File

@@ -13,8 +13,13 @@ enum JobError<J: Job> {
#[derive(Debug)]
enum JobStatus<J: Job> where J::ErrorType: Debug {
Ready,
Running(Instant), // TODO add worker id
Ready {
scheduled_for: Instant,
},
Running {
worker_name: String,
started_at: Instant,
},
Stuck(JobError<J>),
}
@@ -47,7 +52,10 @@ impl<J: Job> Eq for Deadline<J> where J::ErrorType: Debug { }
#[derive(Debug)]
struct JobStatusTable<J: Job> where J::ErrorType: Debug {
/// Complete summary of current state
status: HashMap<J, JobStatus<J>>,
/// Index over status for finding the next scheduled job
queue: BinaryHeap<Deadline<J>>,
}
@@ -84,11 +92,14 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
}
}
fn worker_main(&self) -> anyhow::Result<()> {
fn worker_main(&self, worker_name: String) -> anyhow::Result<()> {
let mut job_table = self.job_table.lock().unwrap();
loop {
if let Some(Deadline {job, ..}) = job_table.pop_due() {
job_table.set_status(&job, JobStatus::Running(Instant::now()));
job_table.set_status(&job, JobStatus::Running {
worker_name,
started_at: Instant::now(),
});
// Run job without holding lock
drop(job_table);
@@ -100,14 +111,16 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
// Update job status
match result {
Ok(Ok(Some(reschedule_for))) => {
job_table.set_status(&job, JobStatus::Ready);
job_table.set_status(&job, JobStatus::Ready {
scheduled_for: reschedule_for,
});
job_table.queue.push(Deadline {
job: job.clone(),
start_by: reschedule_for,
})
},
Ok(Ok(None)) => {
// TODO remove from job table
job_table.status.remove(&job);
},
Ok(Err(e)) => {
job_table.set_status(&job, JobStatus::Stuck(JobError::Error(e)));
@@ -133,12 +146,14 @@ impl<J: Job> Pool<J> where J::ErrorType: Debug {
}
fn queue_job(&self, job: J) {
// Add the job to the back of the queue
let mut job_table = self.job_table.lock().unwrap();
job_table.status.insert(job.clone(), JobStatus::Ready);
let scheduled_for = Instant::now();
job_table.status.insert(job.clone(), JobStatus::Ready {
scheduled_for,
});
job_table.queue.push(Deadline {
job: job.clone(),
start_by: Instant::now(),
start_by: scheduled_for,
});
self.condvar.notify_all();
@@ -184,7 +199,7 @@ mod tests {
"test_worker_1",
true,
move || {
TEST_POOL.get().unwrap().worker_main()
TEST_POOL.get().unwrap().worker_main("test_worker_1".into())
},
).unwrap();
@@ -195,7 +210,7 @@ mod tests {
"test_worker_2",
true,
move || {
TEST_POOL.get().unwrap().worker_main()
TEST_POOL.get().unwrap().worker_main("test_worker_2".into())
},
).unwrap();