refactor(flow): refine comments and code (#3785)

* refactor(flow): refine comments and code

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* doc: description of the properties of removed keys

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: `get`'s fast path for cur val

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: discord9 <discord9@163.com>
This commit is contained in:
Zhenchi
2024-04-24 15:09:53 +08:00
committed by GitHub
parent 62037ee4c8
commit 659d34a170
4 changed files with 470 additions and 442 deletions

View File

@@ -124,7 +124,6 @@ fn mfp_subgraph(
// 1. Read all updates that were emitted between the last time this arrangement had updates and the current time.
// 2. Output the updates.
// 3. Truncate all updates within that range.
let from = arrange.read().last_compaction_time().map(|n| n + 1);
let from = from.unwrap_or(repr::Timestamp::MIN);
let output_kv = arrange.read().get_updates_in_range(from..=now);
@@ -135,7 +134,7 @@ fn mfp_subgraph(
.collect_vec();
send.give(output);
let run_compaction = || {
arrange.write().compaction_to(now)?;
arrange.write().compact_to(now)?;
Ok(())
};
err_collector.run(run_compaction);

View File

@@ -46,9 +46,7 @@ pub struct DataflowState {
impl DataflowState {
pub fn new_arrange(&mut self, name: Option<Vec<String>>) -> ArrangeHandler {
let arrange = name
.map(Arrangement::new_with_name)
.unwrap_or_else(Arrangement::new);
let arrange = name.map(Arrangement::new_with_name).unwrap_or_default();
let arr = ArrangeHandler::from(arrange);
// mark this arrange as used in this dataflow

View File

@@ -16,6 +16,7 @@
//! It can transform substrait plan into it's own plan and execute it.
//! It also contains definition of expression, adapter and plan, and internal state management.
#![feature(let_chains)]
#![allow(dead_code)]
#![allow(unused_imports)]
#![warn(missing_docs)]

File diff suppressed because it is too large Load Diff