From d99746385b8da491d31a953b1d26a348cf00bc73 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 8 May 2024 16:41:06 +0900 Subject: [PATCH] refactor: move mirror insertion tasks to the background runtime (#3879) --- src/operator/src/insert.rs | 78 ++++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 33 deletions(-) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 63727b663f..d2d2cb9e9f 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -32,7 +32,7 @@ use common_meta::peer::Peer; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{error, info}; +use common_telemetry::{error, info, warn}; use datatypes::schema::Schema; use futures_util::{future, TryStreamExt}; use meter_macros::write_meter; @@ -194,6 +194,41 @@ impl Inserter { } impl Inserter { + fn post_request(&self, requests: RegionInsertRequests) { + let node_manager = self.node_manager.clone(); + let table_flow_manager = self.table_flow_manager.clone(); + // Spawn all tasks that do job for mirror insert requests for flownode + common_runtime::spawn_bg(async move { + match Self::mirror_flow_node_requests(table_flow_manager, requests).await { + Ok(flow_tasks) => { + let flow_tasks = flow_tasks.into_iter().map(|(peer, inserts)| { + let node_manager = node_manager.clone(); + common_runtime::spawn_write(async move { + node_manager + .flownode(&peer) + .await + .handle_inserts(inserts) + .await + .map(|flow_response| RegionResponse { + affected_rows: flow_response.affected_rows as AffectedRows, + extension: flow_response.extension, + }) + .context(RequestInsertsSnafu) + }) + }); + + if let Err(err) = future::try_join_all(flow_tasks) + .await + .context(JoinTaskSnafu) + { + warn!(err; "Failed to insert data into flownode"); + } + } + Err(err) => warn!(err; "Failed to mirror request to flownode"), + } + }); + } + async fn do_request( &self, requests: RegionInsertRequests, @@ -206,29 +241,8 @@ impl Inserter { ..Default::default() }); - // spawn all tasks that do job for mirror insert requests for flownode - let flow_tasks = self - .mirror_flow_node_requests(&requests) - .await? - .into_iter() - .map(|(peer, inserts)| { - let node_manager = self.node_manager.clone(); - common_runtime::spawn_write(async move { - node_manager - .flownode(&peer) - .await - .handle_inserts(inserts) - .await - .map(|flow_response| RegionResponse { - affected_rows: flow_response.affected_rows as AffectedRows, - extension: flow_response.extension, - }) - .context(RequestInsertsSnafu) - }) - }); - let tasks = self - .group_requests_by_peer(requests) + .group_requests_by_peer(requests.clone()) .await? .into_iter() .map(|(peer, inserts)| { @@ -242,10 +256,9 @@ impl Inserter { .await .context(RequestInsertsSnafu) }) - }) - .chain(flow_tasks); + }); let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; - + self.post_request(requests); let affected_rows = results .into_iter() .map(|resp| resp.map(|r| r.affected_rows)) @@ -259,21 +272,20 @@ impl Inserter { /// Mirror requests for source table to flownode async fn mirror_flow_node_requests( - &self, - requests: &RegionInsertRequests, + table_flow_manager: TableFlowManagerRef, + requests: RegionInsertRequests, ) -> Result> { // store partial source table requests used by flow node(only store what's used) let mut src_table_reqs: HashMap, RegionInsertRequests)>> = HashMap::new(); - for req in &requests.requests { + for req in requests.requests { match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) { - Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()), + Some(Some((_peers, reqs))) => reqs.requests.push(req), // already know this is not source table Some(None) => continue, _ => { let table_id = RegionId::from_u64(req.region_id).table_id(); - let peers = self - .table_flow_manager + let peers = table_flow_manager .flows(table_id) // TODO(discord9): determine where to store the flow node address in distributed mode .map_ok(|key| Peer::new(key.flownode_id(), "")) @@ -287,7 +299,7 @@ impl Inserter { if !peers.is_empty() { let mut reqs = RegionInsertRequests::default(); - reqs.requests.push(req.clone()); + reqs.requests.push(req); src_table_reqs.insert(table_id, Some((peers, reqs))); } else { // insert a empty entry to avoid repeat query