From 6442c968474a5aa24c784c50a55b295f19a661c2 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 25 Apr 2024 16:37:37 +0800 Subject: [PATCH] feat: render src/sink --- src/flow/src/compute/render/src_sink.rs | 97 +++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 src/flow/src/compute/render/src_sink.rs diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs new file mode 100644 index 0000000000..da3620b304 --- /dev/null +++ b/src/flow/src/compute/render/src_sink.rs @@ -0,0 +1,97 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Source and Sink for the dataflow + +use std::collections::{BTreeMap, VecDeque}; + +use hydroflow::scheduled::graph_ext::GraphExt; +use itertools::Itertools; +use snafu::OptionExt; +use tokio::sync::broadcast; + +use crate::adapter::error::{Error, PlanSnafu}; +use crate::compute::render::Context; +use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff}; +use crate::expr::GlobalId; +use crate::repr::{DiffRow, Row}; + +#[allow(clippy::mutable_key_type)] +impl<'referred, 'df> Context<'referred, 'df> { + /// Render a source which comes from brocast channel into the dataflow + /// will immediately send updates not greater than `now` and buffer the rest in arrangement + pub fn render_source( + &mut self, + mut src_recv: broadcast::Receiver, + ) -> Result { + let (send_port, recv_port) = self.df.make_edge::<_, Toff>("source"); + let arrange_handler = self.compute_state.new_arrange(None); + let arrange_handler_inner = + arrange_handler + .clone_future_only() + .with_context(|| PlanSnafu { + reason: "No write is expected at this point", + })?; + let now = self.compute_state.current_time_ref(); + let err_collector = self.err_collector.clone(); + + let sub = self + .df + .add_subgraph_source("source", send_port, move |_ctx, send| { + let now = *now.borrow(); + let arr = arrange_handler_inner.write().get_updates_in_range(..=now); + err_collector.run(|| arrange_handler_inner.write().compact_to(now)); + + let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d)); + let mut to_send = Vec::new(); + let mut to_arrange = Vec::new(); + // TODO(discord9): handling tokio broadcast error + while let Ok((r, t, d)) = src_recv.try_recv() { + if t <= now { + to_send.push((r, t, d)); + } else { + to_arrange.push(((r, Row::empty()), t, d)); + } + } + let all = prev_avail.chain(to_send); + err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange)); + send.give(all.collect_vec()); + }); + let arranged = Arranged::new(arrange_handler); + arranged.writer.borrow_mut().replace(sub); + let arranged = BTreeMap::from([(vec![], arranged)]); + Ok(CollectionBundle { + collection: Collection::from_port(recv_port), + arranged, + }) + } + + /// Render a sink which send updates to broadcast channel + pub fn render_sink(&mut self, bundle: CollectionBundle, sender: broadcast::Sender) { + let CollectionBundle { + collection, + arranged: _, + } = bundle; + let mut buf = VecDeque::with_capacity(1000); + self.df + .add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| { + let data = recv.take_inner(); + buf.extend(data.into_iter().flat_map(|i| i.into_iter())); + while let Some(row) = buf.pop_front() { + // TODO(discord9): handling tokio broadcast error + let _ = sender.send(row); + } + }); + } +}