fix: range begin>range end

This commit is contained in:
discord9
2024-05-24 16:01:54 +08:00
parent ffecb6882e
commit 0b324563ac
5 changed files with 25 additions and 9 deletions

View File

@@ -513,9 +513,8 @@ impl FlownodeManager {
/// However this is not blocking and can sometimes return while actual computation is still running in worker thread
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self) -> Result<(), Error> {
let now = self.tick_manager.tick();
loop {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
// TODO(discord9): consider how to handle error in individual worker
worker.lock().await.run_available(now).await.unwrap();
@@ -553,6 +552,8 @@ impl FlownodeManager {
);
let table_id = region_id.table_id();
self.node_context.lock().await.send(table_id, rows)?;
// TODO(discord9): put it in a background task?
self.run_available().await?;
Ok(())
}
}

View File

@@ -124,9 +124,13 @@ fn mfp_subgraph(
// 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
// 2. Output the updates.
// 3. Truncate all updates within that range.
let from = arrange.read().last_compaction_time().map(|n| n + 1);
let from = arrange.read().last_compaction_time();
let from = from.unwrap_or(repr::Timestamp::MIN);
let output_kv = arrange.read().get_updates_in_range(from..=now);
let range = (
std::ops::Bound::Excluded(from),
std::ops::Bound::Included(now),
);
let output_kv = arrange.read().get_updates_in_range(range);
// the output is expected to be key -> empty val
let output = output_kv
.into_iter()

View File

@@ -301,9 +301,13 @@ fn update_reduce_distinct_arrange(
// Deal with output:
// 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
let from = arrange.read().last_compaction_time().map(|n| n + 1);
let from = arrange.read().last_compaction_time();
let from = from.unwrap_or(repr::Timestamp::MIN);
let output_kv = arrange.read().get_updates_in_range(from..=now);
let range = (
std::ops::Bound::Excluded(from),
std::ops::Bound::Included(now),
);
let output_kv = arrange.read().get_updates_in_range(range);
// 2. Truncate all updates stored in arrangement within that range.
let run_compaction = || {

View File

@@ -55,9 +55,12 @@ impl<'referred, 'df> Context<'referred, 'df> {
.df
.add_subgraph_source("source", send_port, move |_ctx, send| {
let now = *now.borrow();
let arr = arrange_handler_inner.write().get_updates_in_range(..=now);
err_collector.run(|| arrange_handler_inner.write().compact_to(now));
// write lock to prevent unexpected mutation
let mut arranged = arrange_handler_inner.write();
let arr = arranged.get_updates_in_range(..=now);
err_collector.run(|| arranged.compact_to(now));
debug!("Call source");
let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
let mut to_send = Vec::new();
let mut to_arrange = Vec::new();
@@ -77,7 +80,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
to_arrange.len()
);
}
err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
err_collector.run(|| arranged.apply_updates(now, to_arrange));
send.give(all);
// always schedule source to run at now so we can repeatedly run source if needed
inner_schd.schedule_at(now);

View File

@@ -567,6 +567,10 @@ impl ArrangeHandler {
pub fn set_full_arrangement(&self, full: bool) {
self.write().full_arrangement = full;
}
pub fn is_full_arrangement(&self) -> bool {
self.read().full_arrangement
}
}
#[cfg(test)]