feat: avoid some cloning when mirroring requests to flownode (#4068)

* feat: refactor how requests are mirrored to flownode

* feat: use spawn_bg to avoid impacting foreground writes

* feat: add mirror row count metric
This commit is contained in:
Jeremyhi
2024-05-30 15:29:13 +08:00
committed by GitHub
parent eab309ff7e
commit 8b6596faa0
2 changed files with 70 additions and 51 deletions

View File

@@ -15,7 +15,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::alter_expr::Kind;
use api::v1::region::{InsertRequests as RegionInsertRequests, RegionRequestHeader};
use api::v1::{
@@ -191,41 +190,6 @@ impl Inserter {
}
impl Inserter {
/// Mirrors `requests` to the flownodes from a spawned task.
///
/// Clones `node_manager` and `table_flownode_set_cache`, then hands an async block to
/// `common_runtime::spawn_bg`. That block calls `Self::mirror_flow_node_requests` to obtain
/// the per-peer insert requests and sends each peer's requests from its own
/// `common_runtime::spawn_write` task.
///
/// Returns nothing: the result of `spawn_bg` is discarded, and failures at either stage are
/// only logged with `warn!`.
fn post_request(&self, requests: RegionInsertRequests) {
// Clone the handles so the `async move` block below can own them.
let node_manager = self.node_manager.clone();
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
// Spawn the task that mirrors the insert requests to the flownodes.
common_runtime::spawn_bg(async move {
match Self::mirror_flow_node_requests(table_flownode_set_cache, requests).await {
Ok(flow_tasks) => {
// One write task per flownode peer; each task gets its own `node_manager` clone.
let flow_tasks = flow_tasks.into_iter().map(|(peer, inserts)| {
let node_manager = node_manager.clone();
common_runtime::spawn_write(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
// Repackage the flownode response as a `RegionResponse`.
.map(|flow_response| RegionResponse {
affected_rows: flow_response.affected_rows as AffectedRows,
extension: flow_response.extension,
})
.context(RequestInsertsSnafu)
})
});
// Wait for all per-peer tasks and log a join failure instead of propagating it.
// NOTE(review): the `Ok` value is discarded here, so the individual task results
// (presumably including `RequestInsertsSnafu` errors) do not appear to be logged —
// confirm whether that is intended.
if let Err(err) = future::try_join_all(flow_tasks)
.await
.context(JoinTaskSnafu)
{
warn!(err; "Failed to insert data into flownode");
}
}
Err(err) => warn!(err; "Failed to mirror request to flownode"),
}
});
}
async fn do_request(
&self,
requests: RegionInsertRequests,
@@ -238,8 +202,44 @@ impl Inserter {
..Default::default()
});
let tasks = self
.group_requests_by_peer(requests.clone())
// Mirror requests for source table to flownode
match self.mirror_flow_node_requests(&requests).await {
Ok(flow_requests) => {
let node_manager = self.node_manager.clone();
let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| {
let node_manager = node_manager.clone();
common_runtime::spawn_bg(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
.context(RequestInsertsSnafu)
})
});
match future::try_join_all(flow_tasks)
.await
.context(JoinTaskSnafu)
{
Ok(ret) => {
let affected_rows = ret
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
.sum::<Result<u64>>()
.unwrap_or(0);
crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
}
Err(err) => {
warn!(err; "Failed to insert data into flownode");
}
}
}
Err(err) => warn!(err; "Failed to mirror request to flownode"),
}
let write_tasks = self
.group_requests_by_peer(requests)
.await?
.into_iter()
.map(|(peer, inserts)| {
@@ -254,8 +254,9 @@ impl Inserter {
.context(RequestInsertsSnafu)
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
self.post_request(requests);
let results = future::try_join_all(write_tasks)
.await
.context(JoinTaskSnafu)?;
let affected_rows = results
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
@@ -269,21 +270,22 @@ impl Inserter {
/// Mirror requests for source table to flownode
async fn mirror_flow_node_requests(
table_flownode_set_cache: TableFlownodeSetCacheRef,
requests: RegionInsertRequests,
&self,
requests: &RegionInsertRequests,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
// store partial source table requests used by flow node(only store what's used)
let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
HashMap::new();
for req in requests.requests {
match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) {
Some(Some((_peers, reqs))) => reqs.requests.push(req),
for req in &requests.requests {
let table_id = RegionId::from_u64(req.region_id).table_id();
match src_table_reqs.get_mut(&table_id) {
Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
// already know this is not source table
Some(None) => continue,
_ => {
let table_id = RegionId::from_u64(req.region_id).table_id();
// TODO(discord9): determine where to store the flow node address in distributed mode
let peers = table_flownode_set_cache
let peers = self
.table_flownode_set_cache
.get(table_id)
.await
.context(RequestInsertsSnafu)?
@@ -294,7 +296,7 @@ impl Inserter {
if !peers.is_empty() {
let mut reqs = RegionInsertRequests::default();
reqs.requests.push(req);
reqs.requests.push(req.clone());
src_table_reqs.insert(table_id, Some((peers, reqs)));
} else {
// insert an empty entry to avoid repeated queries
@@ -310,14 +312,26 @@ impl Inserter {
.into_iter()
.filter_map(|(k, v)| v.map(|v| (k, v)))
{
for flownode in peers {
if peers.len() == 1 {
// fast path, zero copy
inserts
.entry(flownode.clone())
.entry(peers[0].clone())
.or_default()
.requests
.extend(reqs.requests.clone());
.extend(reqs.requests);
continue;
} else {
// TODO(discord9): need to split requests to multiple flownodes
for flownode in peers {
inserts
.entry(flownode.clone())
.or_default()
.requests
.extend(reqs.requests.clone());
}
}
}
Ok(inserts)
}

View File

@@ -36,6 +36,11 @@ lazy_static! {
"table operator ingest rows"
)
.unwrap();
pub static ref DIST_MIRROR_ROW_COUNT: IntCounter = register_int_counter!(
"greptime_table_operator_mirror_rows",
"table operator mirror rows"
)
.unwrap();
pub static ref DIST_DELETE_ROW_COUNT: IntCounter = register_int_counter!(
"greptime_table_operator_delete_rows",
"table operator delete rows"