Compare commits


10 Commits

Author SHA1 Message Date
Lei, HUANG
97e9b97a57 feat/bulk-support-flow-batch:
Update `greptime-proto` Dependency

 - Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`.
   - `Cargo.lock`: Changed the source revision from `f0913f179ee1d2ce428f8b85a9ea12b5f69ad636` to `17971523673f4fbc982510d3c9d6647ff642e16f`.
   - `Cargo.toml`: Updated the `greptime-proto` git revision to `17971523673f4fbc982510d3c9d6647ff642e16f`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:28:12 +00:00
Lei, HUANG
7fc74e2928 feat/bulk-support-flow-batch:
## Update `handle_mark_window_dirty` Method in `flownode_impl.rs`

 - Replaced `unimplemented!()` with `unreachable!()` in the `handle_mark_window_dirty` method for both `FlowDualEngine` and `StreamingEngine` implementations in `flownode_impl.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:25 +00:00
Lei, HUANG
77f20ede7a feat/bulk-support-flow-batch:
**Refactor and Enhance Timestamp Handling in `bulk_insert.rs`**

 - **Refactored Timestamp Extraction**: Moved timestamp extraction logic to a new method `maybe_update_flow_dirty_window` to improve code readability and maintainability.
 - **Enhanced Flow Update Logic**: Updated the flow dirty window update mechanism to conditionally notify flownodes only if they are configured, using `table_info` and `record_batch`.
 - **Imports Adjusted**: Updated imports to reflect changes in table metadata handling, replacing `TableId` with `TableInfoRef`.

 Files affected:
 - `src/operator/src/bulk_insert.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:24 +00:00
Lei, HUANG
ced018fce0 feat/bulk-support-flow-batch:
### Update Metrics in Batching Mode Engine

 - **Modified Metrics**: Replaced `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE` with `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW` to track the count of time windows instead of their range.
   - Files affected: `engine.rs`, `metrics.rs`
 - **New Method**: Added `len()` method to `DirtyTimeWindows` to return the number of dirty windows.
   - File affected: `state.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:24 +00:00
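
A brief aside on the metric change above: the gauge now reports how many dirty windows a flow has accumulated rather than a time range. A minimal, self-contained sketch of that bookkeeping (the struct below is a simplified stand-in for the real `DirtyTimeWindows` in `state.rs`, not its actual layout):

```rust
use std::collections::BTreeMap;

/// Simplified stand-in: maps a window start to an optional window end.
#[derive(Default)]
struct DirtyTimeWindows {
    windows: BTreeMap<i64, Option<i64>>,
}

impl DirtyTimeWindows {
    fn add_window(&mut self, start: i64, end: Option<i64>) {
        self.windows.insert(start, end);
    }

    /// Number of dirty windows; this is the value the per-flow gauge now reports.
    fn len(&self) -> usize {
        self.windows.len()
    }
}

fn main() {
    let mut windows = DirtyTimeWindows::default();
    windows.add_window(0, None);
    windows.add_window(60_000, None);
    windows.add_window(60_000, None); // marking the same window twice does not grow the count
    assert_eq!(windows.len(), 2);
}
```
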
Lei, HUANG
41dacff283 feat/bulk-support-flow-batch:
**Refactor Timestamp Handling and Update Dependencies**

 - **Dependency Update**: Updated `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision.
 - **Batching Engine Refactor**: Modified `src/flow/src/batching_mode/engine.rs` to replace `dirty_time_ranges` with `timestamps` for improved timestamp handling.
 - **Bulk Insert Refactor**: Updated `src/operator/src/bulk_insert.rs` to refactor timestamp extraction and handling. Replaced `compute_timestamp_range` with `extract_timestamps` and adjusted related logic to handle timestamps directly.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:22 +00:00
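
For context on the switch from a timestamp range to a plain list of timestamps: `extract_timestamps` (shown in the `bulk_insert.rs` diff further down) downcasts the time-index column to its concrete Arrow timestamp array, reinterprets it as `i64`, and drops nulls. A small standalone illustration of that behavior with the `arrow` crate (the input values are made up):

```rust
use arrow::array::TimestampMillisecondArray;
use arrow::datatypes::Int64Type;

fn main() {
    // A millisecond time-index column with one null row.
    let col = TimestampMillisecondArray::from(vec![Some(1_000), None, Some(2_000)]);

    // Same-layout cast to plain i64, as done per time unit in `extract_timestamps`.
    let as_i64 = col.reinterpret_cast::<Int64Type>();

    // Nulls are skipped, matching `primitive.iter().flatten().collect()`.
    let timestamps: Vec<i64> = as_i64.iter().flatten().collect();
    assert_eq!(timestamps, vec![1_000, 2_000]);
}
```
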
discord9
94a14b6da7 metrics: more useful metrics batching mode 2025-06-16 06:22:06 +00:00
discord9
6ad3a32cb2 feat: metrics 2025-06-16 06:22:06 +00:00
discord9
ac00314578 feat: mark dirty time window 2025-06-16 06:22:05 +00:00
Lei, HUANG
2f08bee08f feat/bulk-support-flow-batch:
### Add Dirty Window Handling in Flow Module

 - **Updated `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Flow Module Enhancements**:
   - Added `DirtyWindowRequest` handling in `flow.rs`, `node_manager.rs`, `test_util.rs`, `flownode_impl.rs`, and `server.rs`.
   - Implemented `handle_mark_window_dirty` function to manage dirty time windows.
 - **Bulk Insert Enhancements**:
   - Modified `bulk_insert.rs` to notify flownodes about dirty time windows using `update_flow_dirty_window`.
 - **Removed Unused Imports**: Cleaned up unused imports in `greptime_handler.rs`, `grpc.rs`, and `mod.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:22:03 +00:00
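
The essence of the dirty-window mechanism introduced here: raw insert timestamps are aligned to the start of the time window they fall into, and the deduplicated window starts are what the batching engine later re-queries. A self-contained sketch of that alignment, assuming a hypothetical fixed 60-second window (the real engine evaluates each flow's `time_window_expr` instead of a constant):

```rust
use std::collections::BTreeSet;

/// Hypothetical fixed window size in milliseconds; real flows derive this
/// from their time-window expression.
const WINDOW_MS: i64 = 60_000;

/// Align a raw timestamp to the start of the window containing it.
fn align_to_window_start(ts_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(WINDOW_MS)
}

fn main() {
    // Timestamps arriving in one bulk insert (illustrative values).
    let timestamps = [1_001, 59_999, 60_001, 185_000];

    // Deduplicated window starts that would be marked dirty.
    let dirty: BTreeSet<i64> = timestamps.iter().map(|&t| align_to_window_start(t)).collect();
    assert_eq!(dirty.into_iter().collect::<Vec<_>>(), vec![0, 60_000, 180_000]);
}
```
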
Lei, HUANG
8ebb31cdcd feat/bulk-support-flow-batch:
### Refactor and Enhance Timestamp Handling in gRPC and Bulk Insert

 - **Refactor Table Handling**:
   - Updated `put_record_batch` method to use `TableRef` instead of `TableId` in `grpc.rs`, `greptime_handler.rs`, and `grpc.rs`.
   - Modified `handle_bulk_insert` to accept `TableRef` and extract `TableId` internally in `bulk_insert.rs`.

 - **Enhance Timestamp Processing**:
   - Added `compute_timestamp_range` function to calculate timestamp range in `bulk_insert.rs`.
   - Introduced error handling for invalid time index types in `error.rs`.

 - **Test Adjustments**:
   - Updated `DummyInstance` implementation in `tests/mod.rs` to align with new method signatures.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-06-16 06:21:32 +00:00
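
The signature change above means the resolved table handle is cached across a bulk-insert stream, so the catalog lookup happens once per stream instead of once per `FlightData` message. A simplified standalone sketch of that caching pattern (the `Table` struct and `lookup_table` below are illustrative stand-ins, not the real `TableRef` or catalog API):

```rust
use std::sync::Arc;

// Illustrative stand-ins for the real table handle and catalog lookup.
struct Table {
    id: u32,
}
type TableRef = Arc<Table>;

fn lookup_table(name: &str) -> TableRef {
    // The real code performs an async catalog lookup here.
    Arc::new(Table {
        id: name.len() as u32,
    })
}

fn put_record_batch(name: &str, cached: &mut Option<TableRef>) -> u32 {
    // Resolve the table only on the first call; later batches reuse the cached handle.
    let table = cached.get_or_insert_with(|| lookup_table(name)).clone();
    table.id
}

fn main() {
    let mut cached: Option<TableRef> = None;
    for _ in 0..3 {
        put_record_batch("metrics.cpu", &mut cached);
    }
    assert!(cached.is_some());
}
```
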
19 changed files with 390 additions and 47 deletions

Cargo.lock generated
View File

@@ -5143,7 +5143,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f0913f179ee1d2ce428f8b85a9ea12b5f69ad636#f0913f179ee1d2ce428f8b85a9ea12b5f69ad636"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17971523673f4fbc982510d3c9d6647ff642e16f#17971523673f4fbc982510d3c9d6647ff642e16f"
 dependencies = [
  "prost 0.13.5",
  "serde",

View File

@@ -134,7 +134,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f0913f179ee1d2ce428f8b85a9ea12b5f69ad636" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17971523673f4fbc982510d3c9d6647ff642e16f" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"

View File

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
 use api::v1::region::InsertRequests;
 use common_error::ext::BoxedError;
 use common_meta::node_manager::Flownode;
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
             .map_err(BoxedError::new)
             .context(common_meta::error::ExternalSnafu)
     }
+
+    async fn handle_mark_window_dirty(
+        &self,
+        req: DirtyWindowRequest,
+    ) -> common_meta::error::Result<FlowResponse> {
+        self.handle_mark_window_dirty(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(common_meta::error::ExternalSnafu)
+    }
 }
 
 impl FlowRequester {
@@ -91,4 +101,20 @@
             .into_inner();
         Ok(response)
     }
+
+    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
+        let (addr, mut client) = self.client.raw_flow_client()?;
+        let response = client
+            .handle_mark_dirty_time_window(DirtyWindowRequests {
+                requests: vec![req],
+            })
+            .await
+            .or_else(|e| {
+                let code = e.code();
+                let err: crate::error::Error = e.into();
+                Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
+            })?
+            .into_inner();
+        Ok(response)
+    }
 }

View File

@@ -15,7 +15,7 @@
 use std::sync::Arc;
 
 use api::region::RegionResponse;
-use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
 use api::v1::region::{InsertRequests, RegionRequest};
 pub use common_base::AffectedRows;
 use common_query::request::QueryRequest;
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
     async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
 
     async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
+
+    /// Handles requests to mark time window as dirty.
+    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
 }
 
 pub type FlownodeRef = Arc<dyn Flownode>;

View File

@@ -15,7 +15,7 @@
 use std::sync::Arc;
 
 use api::region::RegionResponse;
-use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
 use api::v1::region::{InsertRequests, RegionRequest};
 pub use common_base::AffectedRows;
 use common_query::request::QueryRequest;
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
     ) -> Result<FlowResponse> {
         unimplemented!()
     }
+
+    async fn handle_mark_window_dirty(
+        &self,
+        _peer: &Peer,
+        _req: DirtyWindowRequest,
+    ) -> Result<FlowResponse> {
+        unimplemented!()
+    }
 }
 
 /// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
     async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
         self.handler.handle_inserts(&self.peer, requests).await
     }
+
+    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
+        self.handler.handle_mark_window_dirty(&self.peer, req).await
+    }
 }
 
 #[async_trait::async_trait]

View File

@@ -316,7 +316,7 @@ impl StreamingEngine {
         );
         METRIC_FLOW_ROWS
-            .with_label_values(&["out"])
+            .with_label_values(&["out-streaming"])
             .inc_by(total_rows as u64);
 
         let now = self.tick_manager.tick();

View File

@@ -31,6 +31,7 @@ use common_runtime::JoinHandle;
 use common_telemetry::{error, info, trace, warn};
 use datatypes::value::Value;
 use futures::TryStreamExt;
+use greptime_proto::v1::flow::DirtyWindowRequest;
 use itertools::Itertools;
 use session::context::QueryContextBuilder;
 use snafu::{ensure, IntoError, OptionExt, ResultExt};
@@ -46,7 +47,7 @@ use crate::error::{
     IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
     NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
 };
-use crate::metrics::METRIC_FLOW_TASK_COUNT;
+use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
 use crate::repr::{self, DiffRow};
 use crate::{Error, FlowId};
@@ -689,6 +690,9 @@ impl FlowEngine for FlowDualEngine {
         let mut to_stream_engine = Vec::with_capacity(request.requests.len());
         let mut to_batch_engine = request.requests;
+        let mut batching_row_cnt = 0;
+        let mut streaming_row_cnt = 0;
         {
             // not locking this, or recover flows will be starved when also handling flow inserts
             let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +702,11 @@
                 let is_in_stream = src_table2flow.in_stream(table_id);
                 let is_in_batch = src_table2flow.in_batch(table_id);
                 if is_in_stream {
+                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                     to_stream_engine.push(req.clone());
                 }
                 if is_in_batch {
+                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                     return true;
                 }
                 if !is_in_batch && !is_in_stream {
@@ -713,6 +719,14 @@
             // can't use drop due to https://github.com/rust-lang/rust/pull/128846
         }
 
+        METRIC_FLOW_ROWS
+            .with_label_values(&["in-streaming"])
+            .inc_by(streaming_row_cnt as u64);
+        METRIC_FLOW_ROWS
+            .with_label_values(&["in-batching"])
+            .inc_by(batching_row_cnt as u64);
+
         let streaming_engine = self.streaming_engine.clone();
         let stream_handler: JoinHandle<Result<(), Error>> =
             common_runtime::spawn_global(async move {
@@ -819,6 +833,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
             .map(|_| Default::default())
             .map_err(to_meta_err(snafu::location!()))
     }
+
+    async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
+        unreachable!()
+    }
 }
 
 /// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -926,6 +944,10 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
             .map(|_| Default::default())
             .map_err(to_meta_err(snafu::location!()))
     }
+
+    async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
+        unreachable!()
+    }
 }
 
 impl FlowEngine for StreamingEngine {

View File

@@ -17,6 +17,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 
+use api::v1::flow::{DirtyWindowRequests, FlowResponse};
 use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
 use common_meta::ddl::create_flow::FlowType;
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
 use common_time::TimeToLive;
 use query::QueryEngineRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use store_api::storage::RegionId;
-use table::metadata::TableId;
+use store_api::storage::{RegionId, TableId};
 use tokio::sync::{oneshot, RwLock};
 
 use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,6 +42,7 @@ use crate::error::{
     ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
     UnexpectedSnafu, UnsupportedSnafu,
 };
+use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
 use crate::{CreateFlowArgs, Error, FlowId, TableName};
 
 /// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -77,6 +78,116 @@ impl BatchingEngine {
         }
     }
 
+    pub async fn handle_mark_dirty_time_window(
+        &self,
+        reqs: DirtyWindowRequests,
+    ) -> Result<FlowResponse, Error> {
+        let table_info_mgr = self.table_meta.table_info_manager();
+
+        let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
+        for r in reqs.requests {
+            let tid = TableId::from(r.table_id);
+            let entry = group_by_table_id.entry(tid).or_default();
+            entry.extend(r.timestamps);
+        }
+
+        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
+        let table_infos =
+            table_info_mgr
+                .batch_get(&tids)
+                .await
+                .with_context(|_| TableNotFoundMetaSnafu {
+                    msg: format!("Failed to get table info for table ids: {:?}", tids),
+                })?;
+
+        let group_by_table_name = group_by_table_id
+            .into_iter()
+            .filter_map(|(id, timestamps)| {
+                let table_name = table_infos.get(&id).map(|info| info.table_name());
+                let Some(table_name) = table_name else {
+                    warn!("Failed to get table infos for table id: {:?}", id);
+                    return None;
+                };
+                let table_name = [
+                    table_name.catalog_name,
+                    table_name.schema_name,
+                    table_name.table_name,
+                ];
+                let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
+                let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
+                    .data_type
+                    .as_timestamp()
+                    .unwrap()
+                    .unit();
+                Some((table_name, (timestamps, time_index_unit)))
+            })
+            .collect::<HashMap<_, _>>();
+        let group_by_table_name = Arc::new(group_by_table_name);
+
+        let mut handles = Vec::new();
+        let tasks = self.tasks.read().await;
+        for (_flow_id, task) in tasks.iter() {
+            let src_table_names = &task.config.source_table_names;
+            if src_table_names
+                .iter()
+                .all(|name| !group_by_table_name.contains_key(name))
+            {
+                continue;
+            }
+            let group_by_table_name = group_by_table_name.clone();
+            let task = task.clone();
+            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
+                let src_table_names = &task.config.source_table_names;
+                let mut all_dirty_windows = vec![];
+                for src_table_name in src_table_names {
+                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
+                        let Some(expr) = &task.config.time_window_expr else {
+                            continue;
+                        };
+                        for timestamp in timestamps {
+                            let align_start = expr
+                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
+                                .0
+                                .context(UnexpectedSnafu {
+                                    reason: "Failed to eval start value",
+                                })?;
+                            all_dirty_windows.push(align_start);
+                        }
+                    }
+                }
+                let mut state = task.state.write().unwrap();
+                let flow_id_label = task.config.flow_id.to_string();
+                for timestamp in all_dirty_windows {
+                    state.dirty_time_windows.add_window(timestamp, None);
+                }
+                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
+                    .with_label_values(&[&flow_id_label])
+                    .set(state.dirty_time_windows.len() as f64);
+                Ok(())
+            });
+            handles.push(handle);
+        }
+        drop(tasks);
+
+        for handle in handles {
+            match handle.await {
+                Err(e) => {
+                    warn!("Failed to handle inserts: {e}");
+                }
+                Ok(Ok(())) => (),
+                Ok(Err(e)) => {
+                    warn!("Failed to handle inserts: {e}");
+                }
+            }
+        }
+
+        Ok(Default::default())
+    }
+
     pub async fn handle_inserts_inner(
         &self,
         request: api::v1::region::InsertRequests,

View File

@@ -156,6 +156,11 @@ impl DirtyTimeWindows {
         self.windows.clear();
     }
 
+    /// Number of dirty windows.
+    pub fn len(&self) -> usize {
+        self.windows.len()
+    }
+
     /// Generate all filter expressions consuming all time windows
     ///
     /// there is two limits:

View File

@@ -61,7 +61,9 @@ use crate::error::{
     SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
 };
 use crate::metrics::{
-    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
+    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
+    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
+    METRIC_FLOW_ROWS,
 };
 use crate::{Error, FlowId};
@@ -371,6 +373,9 @@ impl BatchingTask {
                     "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                     elapsed
                 );
+                METRIC_FLOW_ROWS
+                    .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
+                    .inc_by(*affected_rows as _);
             } else if let Err(err) = &res {
                 warn!(
                     "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -410,6 +415,7 @@
         engine: QueryEngineRef,
         frontend_client: Arc<FrontendClient>,
     ) {
+        let flow_id_str = self.config.flow_id.to_string();
         loop {
             // first check if shutdown signal is received
             // if so, break the loop
@@ -427,6 +433,9 @@
                     Err(TryRecvError::Empty) => (),
                 }
             }
+            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
+                .with_label_values(&[&flow_id_str])
+                .inc();
 
             let new_query = match self.gen_insert_plan(&engine).await {
                 Ok(new_query) => new_query,
@@ -473,6 +482,9 @@
                 }
                 // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
                 Err(err) => {
+                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
+                        .with_label_values(&[&flow_id_str])
+                        .inc();
                     match new_query {
                         Some(query) => {
                             common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)

View File

@@ -58,11 +58,32 @@ lazy_static! {
         vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
     )
     .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec =
+        register_gauge_vec!(
+            "greptime_flow_batching_engine_bulk_mark_time_window",
+            "flow batching engine query time window count marked by bulk inserts",
+            &["flow_id"],
+        )
+        .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
+        register_int_counter_vec!(
+            "greptime_flow_batching_start_query_count",
+            "flow batching engine started query count",
+            &["flow_id"],
+        )
+        .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
+        register_int_counter_vec!(
+            "greptime_flow_batching_error_count",
+            "flow batching engine error count per flow id",
+            &["flow_id"],
+        )
+        .unwrap();
     pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
         register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
     pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
         "greptime_flow_processed_rows",
-        "Count of rows flowing through the system",
+        "Count of rows flowing through the system.",
         &["direction"]
     )
     .unwrap();

View File

@@ -17,6 +17,7 @@
 use std::net::SocketAddr;
 use std::sync::Arc;
 
+use api::v1::flow::DirtyWindowRequests;
 use api::v1::{RowDeleteRequests, RowInsertRequests};
 use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
 use catalog::CatalogManagerRef;
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
             .map(Response::new)
             .map_err(to_status_with_last_err)
     }
+
+    async fn handle_mark_dirty_time_window(
+        &self,
+        reqs: Request<DirtyWindowRequests>,
+    ) -> Result<Response<FlowResponse>, Status> {
+        self.dual_engine
+            .batching_engine()
+            .handle_mark_dirty_time_window(reqs.into_inner())
+            .await
+            .map(Response::new)
+            .map_err(to_status_with_last_err)
+    }
 }
 
 #[derive(Clone)]

View File

@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
 use servers::query_handler::sql::SqlQueryHandler;
 use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use table::metadata::TableId;
 use table::table_name::TableName;
+use table::TableRef;
 
 use crate::error::{
     CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> Result<AffectedRows> {
-        let table_id = if let Some(table_id) = table_id {
-            *table_id
+        let table = if let Some(table) = table_ref {
+            table.clone()
         } else {
             let table = self
                 .catalog_manager()
                 .table(
-                    &table.catalog_name,
-                    &table.schema_name,
-                    &table.table_name,
+                    &table_name.catalog_name,
+                    &table_name.schema_name,
+                    &table_name.table_name,
                     None,
                 )
                 .await
                 .context(CatalogSnafu)?
                 .with_context(|| TableNotFoundSnafu {
-                    table_name: table.to_string(),
+                    table_name: table_name.to_string(),
                 })?;
-            let id = table.table_info().table_id();
-            *table_id = Some(id);
-            id
+            *table_ref = Some(table.clone());
+            table
        };
 
         self.inserter
-            .handle_bulk_insert(table_id, decoder, data)
+            .handle_bulk_insert(table, decoder, data)
             .await
             .context(TableOperationSnafu)
     }

View File

@@ -12,18 +12,29 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
+
 use ahash::{HashMap, HashMapExt};
+use api::v1::flow::DirtyWindowRequest;
 use api::v1::region::{
     bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
 };
 use api::v1::ArrowIpc;
+use arrow::array::{
+    Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray,
+};
+use arrow::datatypes::{DataType, Int64Type, TimeUnit};
+use arrow::record_batch::RecordBatch;
 use common_base::AffectedRows;
 use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
 use common_grpc::FlightData;
+use common_telemetry::error;
 use common_telemetry::tracing_context::TracingContext;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
-use table::metadata::TableId;
+use table::metadata::TableInfoRef;
+use table::TableRef;
 
 use crate::insert::Inserter;
 use crate::{error, metrics};
@@ -32,10 +43,12 @@ impl Inserter {
     /// Handle bulk insert request.
     pub async fn handle_bulk_insert(
         &self,
-        table_id: TableId,
+        table: TableRef,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> error::Result<AffectedRows> {
+        let table_info = table.table_info();
+        let table_id = table_info.table_id();
         let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["decode_request"])
             .start_timer();
@@ -48,6 +61,10 @@
             return Ok(0);
         };
         decode_timer.observe_duration();
+
+        // notify flownode to update dirty timestamps if flow is configured.
+        self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
+
         metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
         metrics::BULK_REQUEST_ROWS
             .with_label_values(&["raw"])
@@ -216,4 +233,103 @@
         crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
         Ok(rows_inserted)
     }
+
+    fn maybe_update_flow_dirty_window(&self, table_info: TableInfoRef, record_batch: RecordBatch) {
+        let table_id = table_info.table_id();
+        let table_flownode_set_cache = self.table_flownode_set_cache.clone();
+        let node_manager = self.node_manager.clone();
+        common_runtime::spawn_global(async move {
+            let result = table_flownode_set_cache
+                .get(table_id)
+                .await
+                .context(error::RequestInsertsSnafu);
+            let flownodes = match result {
+                Ok(flownodes) => flownodes.unwrap_or_default(),
+                Err(e) => {
+                    error!(e; "Failed to get flownodes for table id: {}", table_id);
+                    return;
+                }
+            };
+            let peers: HashSet<_> = flownodes.values().cloned().collect();
+            if peers.is_empty() {
+                return;
+            }
+            let Ok(timestamps) = extract_timestamps(
+                &record_batch,
+                &table_info
+                    .meta
+                    .schema
+                    .timestamp_column()
+                    .as_ref()
+                    .unwrap()
+                    .name,
+            )
+            .inspect_err(|e| {
+                error!(e; "Failed to extract timestamps from record batch");
+            }) else {
+                return;
+            };
+            for peer in peers {
+                let node_manager = node_manager.clone();
+                let timestamps = timestamps.clone();
+                common_runtime::spawn_global(async move {
+                    if let Err(e) = node_manager
+                        .flownode(&peer)
+                        .await
+                        .handle_mark_window_dirty(DirtyWindowRequest {
+                            table_id,
+                            timestamps,
+                        })
+                        .await
+                        .context(error::RequestInsertsSnafu)
+                    {
+                        error!(e; "Failed to mark timestamps as dirty, table: {}", table_id);
+                    }
+                });
+            }
+        });
+    }
 }
+
+/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
+fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Result<Vec<i64>> {
+    let ts_col = rb
+        .column_by_name(timestamp_index_name)
+        .context(error::ColumnNotFoundSnafu {
+            msg: timestamp_index_name,
+        })?;
+    if rb.num_rows() == 0 {
+        return Ok(vec![]);
+    }
+    let primitive = match ts_col.data_type() {
+        DataType::Timestamp(unit, _) => match unit {
+            TimeUnit::Second => ts_col
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Millisecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Microsecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Nanosecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+        },
+        t => {
+            return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
+        }
+    };
+    Ok(primitive.iter().flatten().collect())
+}

View File

@@ -838,6 +838,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Invalid time index type: {}", ty))]
+    InvalidTimeIndexType {
+        ty: arrow::datatypes::DataType,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Invalid process id: {}", id))]
     InvalidProcessId { id: String },
@@ -973,6 +980,7 @@ impl ErrorExt for Error {
             Error::ColumnOptions { source, .. } => source.status_code(),
             Error::DecodeFlightData { source, .. } => source.status_code(),
             Error::ComputeArrow { .. } => StatusCode::Internal,
+            Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
             Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
             Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
         }

View File

@@ -78,7 +78,7 @@ pub struct Inserter {
     catalog_manager: CatalogManagerRef,
     pub(crate) partition_manager: PartitionRuleManagerRef,
     pub(crate) node_manager: NodeManagerRef,
-    table_flownode_set_cache: TableFlownodeSetCacheRef,
+    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
 }
 
 pub type InserterRef = Arc<Inserter>;

View File

@@ -40,7 +40,7 @@ use futures_util::StreamExt;
 use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
 use session::hints::READ_PREFERENCE_HINT;
 use snafu::{OptionExt, ResultExt};
-use table::metadata::TableId;
+use table::TableRef;
 use tokio::sync::mpsc;
 
 use crate::error::Error::UnsupportedAuthScheme;
@@ -149,8 +149,8 @@ impl GreptimeRequestHandler {
                 .clone()
                 .unwrap_or_else(common_runtime::global_runtime);
             runtime.spawn(async move {
-                // Cached table id
-                let mut table_id: Option<TableId> = None;
+                // Cached table ref
+                let mut table_ref: Option<TableRef> = None;
                 let mut decoder = FlightDecoder::default();
 
                 while let Some(request) = stream.next().await {
@@ -169,7 +169,7 @@
                         let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
                         let result = handler
-                            .put_record_batch(&table_name, &mut table_id, &mut decoder, data)
+                            .put_record_batch(&table_name, &mut table_ref, &mut decoder, data)
                             .await
                             .inspect_err(|e| error!(e; "Failed to handle flight record batches"));
                         timer.observe_duration();

View File

@@ -23,8 +23,8 @@ use common_grpc::flight::FlightDecoder;
 use common_query::Output;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
-use table::metadata::TableId;
 use table::table_name::TableName;
+use table::TableRef;
 
 use crate::error::{self, Result};
@@ -45,8 +45,8 @@ pub trait GrpcQueryHandler {
     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         flight_data: FlightData,
     ) -> std::result::Result<AffectedRows, Self::Error>;
@@ -77,13 +77,13 @@ where
     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> Result<AffectedRows> {
         self.0
-            .put_record_batch(table, table_id, decoder, data)
+            .put_record_batch(table_name, table_ref, decoder, data)
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcRequestSnafu)

View File

@@ -34,7 +34,6 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
 use session::context::QueryContextRef;
 use snafu::ensure;
 use sql::statements::statement::Statement;
-use table::metadata::TableId;
 use table::table_name::TableName;
 use table::TableRef;
@@ -160,15 +159,11 @@ impl GrpcQueryHandler for DummyInstance {
     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
-        decoder: &mut FlightDecoder,
-        data: FlightData,
+        _table_name: &TableName,
+        _table_ref: &mut Option<TableRef>,
+        _decoder: &mut FlightDecoder,
+        _data: FlightData,
     ) -> std::result::Result<AffectedRows, Self::Error> {
-        let _ = table;
-        let _ = data;
-        let _ = table_id;
-        let _ = decoder;
         unimplemented!()
     }
 }