diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 47525d13ca..88ab366bca 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -34,7 +34,7 @@ use common_meta::peer::Peer; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{error, info, warn}; +use common_telemetry::{error, info}; use futures_util::future; use meter_macros::write_meter; use partition::manager::PartitionRuleManagerRef; @@ -338,50 +338,18 @@ impl Inserter { instant_requests, } = requests; - // Mirror requests for source table to flownode - match self - .mirror_flow_node_requests( - normal_requests - .requests - .iter() - .chain(instant_requests.requests.iter()), - ) - .await - { - Ok(flow_requests) => { - let node_manager = self.node_manager.clone(); - let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| { - let node_manager = node_manager.clone(); - common_runtime::spawn_global(async move { - node_manager - .flownode(&peer) - .await - .handle_inserts(inserts) - .await - .context(RequestInsertsSnafu) - }) - }); - - match future::try_join_all(flow_tasks) - .await - .context(JoinTaskSnafu) - { - Ok(ret) => { - let affected_rows = ret - .into_iter() - .map(|resp| resp.map(|r| r.affected_rows)) - .sum::>() - .unwrap_or(0); - crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows); - } - Err(err) => { - warn!(err; "Failed to insert data into flownode"); - } - } - } - Err(err) => warn!(err; "Failed to mirror request to flownode"), - } + // Mirror requests for source table to flownode asynchronously + let flow_mirror_task = FlowMirrorTask::new( + &self.table_flownode_set_cache, + normal_requests + .requests + .iter() + .chain(instant_requests.requests.iter()), + ) + .await?; + flow_mirror_task.detach(self.node_manager.clone())?; + // Write requests to datanode and wait for response let write_tasks = self .group_requests_by_peer(normal_requests) .await? @@ -412,72 +380,6 @@ impl Inserter { )) } - /// Mirror requests for source table to flownode - async fn mirror_flow_node_requests<'it, 'zelf: 'it>( - &'zelf self, - requests: impl Iterator, - ) -> Result> { - // store partial source table requests used by flow node(only store what's used) - let mut src_table_reqs: HashMap, RegionInsertRequests)>> = - HashMap::new(); - for req in requests { - let table_id = RegionId::from_u64(req.region_id).table_id(); - match src_table_reqs.get_mut(&table_id) { - Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()), - // already know this is not source table - Some(None) => continue, - _ => { - let peers = self - .table_flownode_set_cache - .get(table_id) - .await - .context(RequestInsertsSnafu)? - .unwrap_or_default() - .values() - .cloned() - .collect::>(); - - if !peers.is_empty() { - let mut reqs = RegionInsertRequests::default(); - reqs.requests.push(req.clone()); - src_table_reqs.insert(table_id, Some((peers, reqs))); - } else { - // insert a empty entry to avoid repeat query - src_table_reqs.insert(table_id, None); - } - } - } - } - - let mut inserts: HashMap = HashMap::new(); - - for (_table_id, (peers, reqs)) in src_table_reqs - .into_iter() - .filter_map(|(k, v)| v.map(|v| (k, v))) - { - if peers.len() == 1 { - // fast path, zero copy - inserts - .entry(peers[0].clone()) - .or_default() - .requests - .extend(reqs.requests); - continue; - } else { - // TODO(discord9): need to split requests to multiple flownodes - for flownode in peers { - inserts - .entry(flownode.clone()) - .or_default() - .requests - .extend(reqs.requests.clone()); - } - } - } - - Ok(inserts) - } - async fn group_requests_by_peer( &self, requests: RegionInsertRequests, @@ -915,3 +817,111 @@ struct CreateAlterTableResult { /// Table Info of the created tables. table_infos: HashMap>, } + +struct FlowMirrorTask { + requests: HashMap, + num_rows: usize, +} + +impl FlowMirrorTask { + async fn new( + cache: &TableFlownodeSetCacheRef, + requests: impl Iterator, + ) -> Result { + let mut src_table_reqs: HashMap, RegionInsertRequests)>> = + HashMap::new(); + let mut num_rows = 0; + + for req in requests { + let table_id = RegionId::from_u64(req.region_id).table_id(); + match src_table_reqs.get_mut(&table_id) { + Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()), + // already know this is not source table + Some(None) => continue, + _ => { + let peers = cache + .get(table_id) + .await + .context(RequestInsertsSnafu)? + .unwrap_or_default() + .values() + .cloned() + .collect::>(); + + if !peers.is_empty() { + let mut reqs = RegionInsertRequests::default(); + reqs.requests.push(req.clone()); + num_rows += reqs + .requests + .iter() + .map(|r| r.rows.as_ref().unwrap().rows.len()) + .sum::(); + src_table_reqs.insert(table_id, Some((peers, reqs))); + } else { + // insert a empty entry to avoid repeat query + src_table_reqs.insert(table_id, None); + } + } + } + } + + let mut inserts: HashMap = HashMap::new(); + + for (_table_id, (peers, reqs)) in src_table_reqs + .into_iter() + .filter_map(|(k, v)| v.map(|v| (k, v))) + { + if peers.len() == 1 { + // fast path, zero copy + inserts + .entry(peers[0].clone()) + .or_default() + .requests + .extend(reqs.requests); + continue; + } else { + // TODO(discord9): need to split requests to multiple flownodes + for flownode in peers { + inserts + .entry(flownode.clone()) + .or_default() + .requests + .extend(reqs.requests.clone()); + } + } + } + + Ok(Self { + requests: inserts, + num_rows, + }) + } + + fn detach(self, node_manager: NodeManagerRef) -> Result<()> { + crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64); + for (peer, inserts) in self.requests { + let node_manager = node_manager.clone(); + common_runtime::spawn_global(async move { + let result = node_manager + .flownode(&peer) + .await + .handle_inserts(inserts) + .await + .context(RequestInsertsSnafu); + + match result { + Ok(resp) => { + let affected_rows = resp.affected_rows; + crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows); + crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _); + } + Err(err) => { + error!(err; "Failed to insert data into flownode {}", peer); + } + } + }); + } + + Ok(()) + } +} diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs index 9a77f9844d..e6a4827e48 100644 --- a/src/operator/src/metrics.rs +++ b/src/operator/src/metrics.rs @@ -41,6 +41,11 @@ lazy_static! { "table operator mirror rows" ) .unwrap(); + pub static ref DIST_MIRROR_PENDING_ROW_COUNT: IntGauge = register_int_gauge!( + "greptime_table_operator_mirror_pending_rows", + "table operator mirror pending rows" + ) + .unwrap(); pub static ref DIST_DELETE_ROW_COUNT: IntCounter = register_int_counter!( "greptime_table_operator_delete_rows", "table operator delete rows"