feat: mirror insert request to flownode in async (#5444)

* feat: mirror insert request to flownode in async

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-01-24 21:42:27 +08:00
committed by GitHub
parent d870987a65
commit 7c5ead90ac
2 changed files with 125 additions and 110 deletions

View File

@@ -34,7 +34,7 @@ use common_meta::peer::Peer;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use common_telemetry::{error, info};
use futures_util::future;
use meter_macros::write_meter;
use partition::manager::PartitionRuleManagerRef;
@@ -338,50 +338,18 @@ impl Inserter {
instant_requests,
} = requests;
// Mirror requests for source table to flownode
match self
.mirror_flow_node_requests(
normal_requests
.requests
.iter()
.chain(instant_requests.requests.iter()),
)
.await
{
Ok(flow_requests) => {
let node_manager = self.node_manager.clone();
let flow_tasks = flow_requests.into_iter().map(|(peer, inserts)| {
let node_manager = node_manager.clone();
common_runtime::spawn_global(async move {
node_manager
.flownode(&peer)
.await
.handle_inserts(inserts)
.await
.context(RequestInsertsSnafu)
})
});
match future::try_join_all(flow_tasks)
.await
.context(JoinTaskSnafu)
{
Ok(ret) => {
let affected_rows = ret
.into_iter()
.map(|resp| resp.map(|r| r.affected_rows))
.sum::<Result<u64>>()
.unwrap_or(0);
crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
}
Err(err) => {
warn!(err; "Failed to insert data into flownode");
}
}
}
Err(err) => warn!(err; "Failed to mirror request to flownode"),
}
// Mirror requests for source table to flownode asynchronously
let flow_mirror_task = FlowMirrorTask::new(
&self.table_flownode_set_cache,
normal_requests
.requests
.iter()
.chain(instant_requests.requests.iter()),
)
.await?;
flow_mirror_task.detach(self.node_manager.clone())?;
// Write requests to datanode and wait for response
let write_tasks = self
.group_requests_by_peer(normal_requests)
.await?
@@ -412,72 +380,6 @@ impl Inserter {
))
}
/// Groups the insert requests that target flow source tables by the flownode
/// peer that should receive a mirrored copy.
///
/// The flownode peers of each table are looked up in
/// `table_flownode_set_cache` at most once per table; a table with no peers is
/// remembered as `None` and its requests are dropped from the result.
///
/// # Errors
///
/// Returns the cache lookup error wrapped with `RequestInsertsSnafu`.
async fn mirror_flow_node_requests<'it, 'zelf: 'it>(
    &'zelf self,
    requests: impl Iterator<Item = &'it RegionInsertRequest>,
) -> Result<HashMap<Peer, RegionInsertRequests>> {
    // Per table: `Some((peers, requests))` for a source table, `None` for a
    // table that is known to have no flownode.
    let mut per_table: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
        HashMap::new();

    for request in requests {
        let table_id = RegionId::from_u64(request.region_id).table_id();

        // Table already classified: append when it is a source table, then
        // move on without touching the cache again.
        if let Some(known) = per_table.get_mut(&table_id) {
            if let Some((_, batch)) = known {
                batch.requests.push(request.clone());
            }
            continue;
        }

        let flownodes: Vec<Peer> = self
            .table_flownode_set_cache
            .get(table_id)
            .await
            .context(RequestInsertsSnafu)?
            .unwrap_or_default()
            .values()
            .cloned()
            .collect();

        let classified = if flownodes.is_empty() {
            // Record the miss so later requests for this table skip the lookup.
            None
        } else {
            let mut batch = RegionInsertRequests::default();
            batch.requests.push(request.clone());
            Some((flownodes, batch))
        };
        per_table.insert(table_id, classified);
    }

    let mut by_peer: HashMap<Peer, RegionInsertRequests> = HashMap::new();
    for (flownodes, batch) in per_table.into_values().flatten() {
        if let [sole] = flownodes.as_slice() {
            // Single receiver: hand over the requests without copying them.
            by_peer
                .entry(sole.clone())
                .or_default()
                .requests
                .extend(batch.requests);
        } else {
            // TODO(discord9): need to split requests to multiple flownodes
            for flownode in &flownodes {
                by_peer
                    .entry(flownode.clone())
                    .or_default()
                    .requests
                    .extend(batch.requests.iter().cloned());
            }
        }
    }

    Ok(by_peer)
}
async fn group_requests_by_peer(
&self,
requests: RegionInsertRequests,
@@ -915,3 +817,111 @@ struct CreateAlterTableResult {
/// Table Info of the created tables.
table_infos: HashMap<TableId, Arc<TableInfo>>,
}
/// Insert requests that are to be mirrored to flownodes, prepared by
/// `FlowMirrorTask::new` and sent in the background by `FlowMirrorTask::detach`.
struct FlowMirrorTask {
    /// Insert requests grouped by the flownode peer they are sent to.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Row count gathered while building the task; `detach` adds it to the
    /// `DIST_MIRROR_PENDING_ROW_COUNT` gauge before spawning the sends.
    num_rows: usize,
}
impl FlowMirrorTask {
    /// Builds a mirror task from `requests`, keeping only the requests whose
    /// table has at least one flownode peer in `cache`.
    ///
    /// Each table is looked up in `cache` at most once; a table without peers
    /// is remembered as `None` so that its later requests are skipped without
    /// another lookup. Requests of a table with several peers are copied to
    /// every peer.
    ///
    /// `num_rows` is the total number of rows that will be sent, summed over
    /// all peers, so it matches what `detach` later removes from the pending
    /// gauge.
    ///
    /// # Errors
    ///
    /// Returns the cache lookup error wrapped with `RequestInsertsSnafu`.
    async fn new(
        cache: &TableFlownodeSetCacheRef,
        requests: impl Iterator<Item = &RegionInsertRequest>,
    ) -> Result<Self> {
        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
            HashMap::new();

        for req in requests {
            let table_id = RegionId::from_u64(req.region_id).table_id();
            match src_table_reqs.get_mut(&table_id) {
                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
                // already known not to be a source table
                Some(None) => continue,
                None => {
                    let peers = cache
                        .get(table_id)
                        .await
                        .context(RequestInsertsSnafu)?
                        .unwrap_or_default()
                        .values()
                        .cloned()
                        .collect::<Vec<_>>();
                    let entry = if peers.is_empty() {
                        // remember the miss to avoid repeated cache queries
                        None
                    } else {
                        let mut reqs = RegionInsertRequests::default();
                        reqs.requests.push(req.clone());
                        Some((peers, reqs))
                    };
                    src_table_reqs.insert(table_id, entry);
                }
            }
        }

        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
        for (peers, reqs) in src_table_reqs.into_values().flatten() {
            if let [peer] = peers.as_slice() {
                // fast path: move the requests instead of copying them
                inserts
                    .entry(peer.clone())
                    .or_default()
                    .requests
                    .extend(reqs.requests);
            } else {
                // TODO(discord9): need to split requests to multiple flownodes
                for flownode in &peers {
                    inserts
                        .entry(flownode.clone())
                        .or_default()
                        .requests
                        .extend(reqs.requests.iter().cloned());
                }
            }
        }

        // Count after fan-out so that every request (not only the first one
        // seen per table) and every receiving peer is accounted for.
        let num_rows = inserts.values().map(Self::count_rows).sum();

        Ok(Self {
            requests: inserts,
            num_rows,
        })
    }

    /// Number of rows carried by `requests`; a request without a row payload
    /// counts as zero rows instead of panicking.
    fn count_rows(requests: &RegionInsertRequests) -> usize {
        requests
            .requests
            .iter()
            .map(|req| req.rows.as_ref().map_or(0, |rows| rows.rows.len()))
            .sum()
    }

    /// Sends the mirrored requests to their flownodes in spawned background
    /// tasks and returns without waiting for them.
    ///
    /// The pending-rows gauge is raised by `num_rows` up front. Each task
    /// lowers it by the number of rows it was sent once the flownode call
    /// finishes, whether it succeeded or failed, so the gauge only reflects
    /// rows that are still in flight. Failures are logged and not propagated.
    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
        for (peer, inserts) in self.requests {
            let node_manager = node_manager.clone();
            // Captured before `inserts` is moved into the task.
            let pending_rows = Self::count_rows(&inserts) as i64;
            common_runtime::spawn_global(async move {
                let result = node_manager
                    .flownode(&peer)
                    .await
                    .handle_inserts(inserts)
                    .await
                    .context(RequestInsertsSnafu);

                // The rows are no longer pending regardless of the outcome.
                crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(pending_rows);

                match result {
                    Ok(resp) => {
                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(resp.affected_rows);
                    }
                    Err(err) => {
                        error!(err; "Failed to insert data into flownode {}", peer);
                    }
                }
            });
        }
        Ok(())
    }
}

View File

@@ -41,6 +41,11 @@ lazy_static! {
"table operator mirror rows"
)
.unwrap();
pub static ref DIST_MIRROR_PENDING_ROW_COUNT: IntGauge = register_int_gauge!(
"greptime_table_operator_mirror_pending_rows",
"table operator mirror pending rows"
)
.unwrap();
pub static ref DIST_DELETE_ROW_COUNT: IntCounter = register_int_counter!(
"greptime_table_operator_delete_rows",
"table operator delete rows"