mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 22:32:55 +00:00
refactor: move mirror insertion tasks to the background runtime (#3879)
This commit is contained in:
@@ -32,7 +32,7 @@ use common_meta::peer::Peer;
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use common_query::Output;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use common_telemetry::{error, info};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use datatypes::schema::Schema;
|
||||
use futures_util::{future, TryStreamExt};
|
||||
use meter_macros::write_meter;
|
||||
@@ -194,6 +194,41 @@ impl Inserter {
|
||||
}
|
||||
|
||||
impl Inserter {
|
||||
fn post_request(&self, requests: RegionInsertRequests) {
|
||||
let node_manager = self.node_manager.clone();
|
||||
let table_flow_manager = self.table_flow_manager.clone();
|
||||
// Spawn all tasks that do job for mirror insert requests for flownode
|
||||
common_runtime::spawn_bg(async move {
|
||||
match Self::mirror_flow_node_requests(table_flow_manager, requests).await {
|
||||
Ok(flow_tasks) => {
|
||||
let flow_tasks = flow_tasks.into_iter().map(|(peer, inserts)| {
|
||||
let node_manager = node_manager.clone();
|
||||
common_runtime::spawn_write(async move {
|
||||
node_manager
|
||||
.flownode(&peer)
|
||||
.await
|
||||
.handle_inserts(inserts)
|
||||
.await
|
||||
.map(|flow_response| RegionResponse {
|
||||
affected_rows: flow_response.affected_rows as AffectedRows,
|
||||
extension: flow_response.extension,
|
||||
})
|
||||
.context(RequestInsertsSnafu)
|
||||
})
|
||||
});
|
||||
|
||||
if let Err(err) = future::try_join_all(flow_tasks)
|
||||
.await
|
||||
.context(JoinTaskSnafu)
|
||||
{
|
||||
warn!(err; "Failed to insert data into flownode");
|
||||
}
|
||||
}
|
||||
Err(err) => warn!(err; "Failed to mirror request to flownode"),
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn do_request(
|
||||
&self,
|
||||
requests: RegionInsertRequests,
|
||||
@@ -206,29 +241,8 @@ impl Inserter {
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
// spawn all tasks that do job for mirror insert requests for flownode
|
||||
let flow_tasks = self
|
||||
.mirror_flow_node_requests(&requests)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(peer, inserts)| {
|
||||
let node_manager = self.node_manager.clone();
|
||||
common_runtime::spawn_write(async move {
|
||||
node_manager
|
||||
.flownode(&peer)
|
||||
.await
|
||||
.handle_inserts(inserts)
|
||||
.await
|
||||
.map(|flow_response| RegionResponse {
|
||||
affected_rows: flow_response.affected_rows as AffectedRows,
|
||||
extension: flow_response.extension,
|
||||
})
|
||||
.context(RequestInsertsSnafu)
|
||||
})
|
||||
});
|
||||
|
||||
let tasks = self
|
||||
.group_requests_by_peer(requests)
|
||||
.group_requests_by_peer(requests.clone())
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(peer, inserts)| {
|
||||
@@ -242,10 +256,9 @@ impl Inserter {
|
||||
.await
|
||||
.context(RequestInsertsSnafu)
|
||||
})
|
||||
})
|
||||
.chain(flow_tasks);
|
||||
});
|
||||
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
|
||||
|
||||
self.post_request(requests);
|
||||
let affected_rows = results
|
||||
.into_iter()
|
||||
.map(|resp| resp.map(|r| r.affected_rows))
|
||||
@@ -259,21 +272,20 @@ impl Inserter {
|
||||
|
||||
/// Mirror requests for source table to flownode
|
||||
async fn mirror_flow_node_requests(
|
||||
&self,
|
||||
requests: &RegionInsertRequests,
|
||||
table_flow_manager: TableFlowManagerRef,
|
||||
requests: RegionInsertRequests,
|
||||
) -> Result<HashMap<Peer, RegionInsertRequests>> {
|
||||
// store partial source table requests used by flow node(only store what's used)
|
||||
let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
|
||||
HashMap::new();
|
||||
for req in &requests.requests {
|
||||
for req in requests.requests {
|
||||
match src_table_reqs.get_mut(&RegionId::from_u64(req.region_id).table_id()) {
|
||||
Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
|
||||
Some(Some((_peers, reqs))) => reqs.requests.push(req),
|
||||
// already know this is not source table
|
||||
Some(None) => continue,
|
||||
_ => {
|
||||
let table_id = RegionId::from_u64(req.region_id).table_id();
|
||||
let peers = self
|
||||
.table_flow_manager
|
||||
let peers = table_flow_manager
|
||||
.flows(table_id)
|
||||
// TODO(discord9): determine where to store the flow node address in distributed mode
|
||||
.map_ok(|key| Peer::new(key.flownode_id(), ""))
|
||||
@@ -287,7 +299,7 @@ impl Inserter {
|
||||
|
||||
if !peers.is_empty() {
|
||||
let mut reqs = RegionInsertRequests::default();
|
||||
reqs.requests.push(req.clone());
|
||||
reqs.requests.push(req);
|
||||
src_table_reqs.insert(table_id, Some((peers, reqs)));
|
||||
} else {
|
||||
// insert a empty entry to avoid repeat query
|
||||
|
||||
Reference in New Issue
Block a user