From 0b324563acb0fd2c90aa0c59d3ccabbb173bc90e Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 24 May 2024 16:01:54 +0800 Subject: [PATCH] fix: range begin>range end --- src/flow/src/adapter.rs | 5 +++-- src/flow/src/compute/render/map.rs | 8 ++++++-- src/flow/src/compute/render/reduce.rs | 8 ++++++-- src/flow/src/compute/render/src_sink.rs | 9 ++++++--- src/flow/src/utils.rs | 4 ++++ 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index e0d0428fc3..9e3bcea199 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -513,9 +513,8 @@ impl FlownodeManager { /// However this is not blocking and can sometimes return while actual computation is still running in worker thread /// TODO(discord9): add flag for subgraph that have input since last run pub async fn run_available(&self) -> Result<(), Error> { - let now = self.tick_manager.tick(); - loop { + let now = self.tick_manager.tick(); for worker in self.worker_handles.iter() { // TODO(discord9): consider how to handle error in individual worker worker.lock().await.run_available(now).await.unwrap(); @@ -553,6 +552,8 @@ impl FlownodeManager { ); let table_id = region_id.table_id(); self.node_context.lock().await.send(table_id, rows)?; + // TODO(discord9): put it in a background task? + self.run_available().await?; Ok(()) } } diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 2261f4de14..50bd48f5fb 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -124,9 +124,13 @@ fn mfp_subgraph( // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time. // 2. Output the updates. // 3. Truncate all updates within that range. - let from = arrange.read().last_compaction_time().map(|n| n + 1); + let from = arrange.read().last_compaction_time(); let from = from.unwrap_or(repr::Timestamp::MIN); - let output_kv = arrange.read().get_updates_in_range(from..=now); + let range = ( + std::ops::Bound::Excluded(from), + std::ops::Bound::Included(now), + ); + let output_kv = arrange.read().get_updates_in_range(range); // the output is expected to be key -> empty val let output = output_kv .into_iter() diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 7a443e9991..8321f10187 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -301,9 +301,13 @@ fn update_reduce_distinct_arrange( // Deal with output: // 1. Read all updates that were emitted between the last time this arrangement had updates and the current time. - let from = arrange.read().last_compaction_time().map(|n| n + 1); + let from = arrange.read().last_compaction_time(); let from = from.unwrap_or(repr::Timestamp::MIN); - let output_kv = arrange.read().get_updates_in_range(from..=now); + let range = ( + std::ops::Bound::Excluded(from), + std::ops::Bound::Included(now), + ); + let output_kv = arrange.read().get_updates_in_range(range); // 2. Truncate all updates stored in arrangement within that range. let run_compaction = || { diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 2371a98edd..3d5a6a7bd2 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -55,9 +55,12 @@ impl<'referred, 'df> Context<'referred, 'df> { .df .add_subgraph_source("source", send_port, move |_ctx, send| { let now = *now.borrow(); - let arr = arrange_handler_inner.write().get_updates_in_range(..=now); - err_collector.run(|| arrange_handler_inner.write().compact_to(now)); + // write lock to prevent unexpected mutation + let mut arranged = arrange_handler_inner.write(); + let arr = arranged.get_updates_in_range(..=now); + err_collector.run(|| arranged.compact_to(now)); + debug!("Call source"); let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d)); let mut to_send = Vec::new(); let mut to_arrange = Vec::new(); @@ -77,7 +80,7 @@ impl<'referred, 'df> Context<'referred, 'df> { to_arrange.len() ); } - err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange)); + err_collector.run(|| arranged.apply_updates(now, to_arrange)); send.give(all); // always schedule source to run at now so we can repeatedly run source if needed inner_schd.schedule_at(now); diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index ea1885e506..1762d02552 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -567,6 +567,10 @@ impl ArrangeHandler { pub fn set_full_arrangement(&self, full: bool) { self.write().full_arrangement = full; } + + pub fn is_full_arrangement(&self) -> bool { + self.read().full_arrangement + } } #[cfg(test)]