mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
feat/bulk-support-flow-batch:
### Add Dirty Window Handling in Flow Module - **Updated `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`. - **Flow Module Enhancements**: - Added `DirtyWindowRequest` handling in `flow.rs`, `node_manager.rs`, `test_util.rs`, `flownode_impl.rs`, and `server.rs`. - Implemented `handle_mark_window_dirty` function to manage dirty time windows. - **Bulk Insert Enhancements**: - Modified `bulk_insert.rs` to notify flownodes about dirty time windows using `update_flow_dirty_window`. - **Removed Unused Imports**: Cleaned up unused imports in `greptime_handler.rs`, `grpc.rs`, and `mod.rs`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
|
||||
use api::v1::region::InsertRequests;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::node_manager::Flownode;
|
||||
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
req: DirtyWindowRequest,
|
||||
) -> common_meta::error::Result<FlowResponse> {
|
||||
self.handle_mark_window_dirty(req)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(common_meta::error::ExternalSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowRequester {
|
||||
@@ -91,4 +101,20 @@ impl FlowRequester {
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
let (addr, mut client) = self.client.raw_flow_client()?;
|
||||
let response = client
|
||||
.handle_mark_dirty_time_window(DirtyWindowRequests {
|
||||
requests: vec![req],
|
||||
})
|
||||
.await
|
||||
.or_else(|e| {
|
||||
let code = e.code();
|
||||
let err: crate::error::Error = e.into();
|
||||
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
|
||||
})?
|
||||
.into_inner();
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
|
||||
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
|
||||
|
||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
|
||||
|
||||
/// Handles requests to mark time window as dirty.
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
|
||||
}
|
||||
|
||||
pub type FlownodeRef = Arc<dyn Flownode>;
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::region::RegionResponse;
|
||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
||||
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||
use api::v1::region::{InsertRequests, RegionRequest};
|
||||
pub use common_base::AffectedRows;
|
||||
use common_query::request::QueryRequest;
|
||||
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
|
||||
) -> Result<FlowResponse> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(
|
||||
&self,
|
||||
_peer: &Peer,
|
||||
_req: DirtyWindowRequest,
|
||||
) -> Result<FlowResponse> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// A mock struct implements [NodeManager] only implement the `datanode` method.
|
||||
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
|
||||
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
|
||||
self.handler.handle_inserts(&self.peer, requests).await
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||
self.handler.handle_mark_window_dirty(&self.peer, req).await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -31,6 +31,7 @@ use common_runtime::JoinHandle;
|
||||
use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use greptime_proto::v1::flow::DirtyWindowRequest;
|
||||
use itertools::Itertools;
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||
@@ -819,6 +820,11 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||
// todo: implement
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
|
||||
@@ -926,6 +932,11 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
|
||||
.map(|_| Default::default())
|
||||
.map_err(to_meta_err(snafu::location!()))
|
||||
}
|
||||
|
||||
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||
// todo: implement
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowEngine for StreamingEngine {
|
||||
|
||||
@@ -32,7 +32,9 @@ use common_query::Output;
|
||||
use common_runtime::JoinHandle;
|
||||
use common_telemetry::tracing::info;
|
||||
use futures::{FutureExt, TryStreamExt};
|
||||
use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
|
||||
use greptime_proto::v1::flow::{
|
||||
flow_server, DirtyWindowRequests, FlowRequest, FlowResponse, InsertRequests,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use operator::delete::Deleter;
|
||||
use operator::insert::Inserter;
|
||||
@@ -136,6 +138,21 @@ impl flow_server::Flow for FlowService {
|
||||
.map(Response::new)
|
||||
.map_err(to_status_with_last_err)
|
||||
}
|
||||
|
||||
async fn handle_mark_dirty_time_window(
|
||||
&self,
|
||||
request: Request<DirtyWindowRequests>,
|
||||
) -> Result<Response<FlowResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
for req in req.requests {
|
||||
self.dual_engine
|
||||
.handle_mark_window_dirty(req)
|
||||
.await
|
||||
.map(Response::new)
|
||||
.map_err(to_status_with_last_err)?;
|
||||
}
|
||||
Ok(Response::new(FlowResponse::default()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
||||
@@ -12,7 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::flow::{DirtyWindowRequest, WindowRange};
|
||||
use api::v1::region::{
|
||||
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
|
||||
};
|
||||
@@ -26,10 +29,10 @@ use arrow::record_batch::RecordBatch;
|
||||
use common_base::AffectedRows;
|
||||
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
|
||||
use common_grpc::FlightData;
|
||||
use common_telemetry::tracing::Instrument;
|
||||
use common_telemetry::error;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::insert::Inserter;
|
||||
@@ -67,7 +70,8 @@ impl Inserter {
|
||||
.unwrap()
|
||||
.name,
|
||||
)? {
|
||||
// notify flownode.
|
||||
// notify flownode to update dirty time windows.
|
||||
self.update_flow_dirty_window(table_id, min, max);
|
||||
}
|
||||
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
|
||||
metrics::BULK_REQUEST_ROWS
|
||||
@@ -237,6 +241,46 @@ impl Inserter {
|
||||
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
|
||||
Ok(rows_inserted)
|
||||
}
|
||||
|
||||
fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) {
|
||||
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
|
||||
let node_manager = self.node_manager.clone();
|
||||
common_runtime::spawn_global(async move {
|
||||
let result = table_flownode_set_cache
|
||||
.get(table_id)
|
||||
.await
|
||||
.context(error::RequestInsertsSnafu);
|
||||
let flownodes = match result {
|
||||
Ok(flownodes) => flownodes.unwrap_or_default(),
|
||||
Err(e) => {
|
||||
error!(e; "Failed to get flownodes for table id: {}", table_id);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let peers: HashSet<_> = flownodes.values().cloned().collect();
|
||||
for peer in peers {
|
||||
let node_manager = node_manager.clone();
|
||||
common_runtime::spawn_global(async move {
|
||||
if let Err(e) = node_manager
|
||||
.flownode(&peer)
|
||||
.await
|
||||
.handle_mark_window_dirty(DirtyWindowRequest {
|
||||
table_id,
|
||||
dirty_time_ranges: vec![WindowRange {
|
||||
start_value: min,
|
||||
end_value: max,
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.context(error::RequestInsertsSnafu)
|
||||
{
|
||||
error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
|
||||
@@ -275,7 +319,7 @@ fn compute_timestamp_range(
|
||||
.unwrap()
|
||||
.reinterpret_cast::<Int64Type>(),
|
||||
},
|
||||
t @ _ => {
|
||||
t => {
|
||||
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -78,7 +78,7 @@ pub struct Inserter {
|
||||
catalog_manager: CatalogManagerRef,
|
||||
pub(crate) partition_manager: PartitionRuleManagerRef,
|
||||
pub(crate) node_manager: NodeManagerRef,
|
||||
table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
}
|
||||
|
||||
pub type InserterRef = Arc<Inserter>;
|
||||
|
||||
@@ -40,7 +40,6 @@ use futures_util::StreamExt;
|
||||
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
|
||||
use session::hints::READ_PREFERENCE_HINT;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::metadata::TableId;
|
||||
use table::TableRef;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ use common_grpc::flight::FlightDecoder;
|
||||
use common_query::Output;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
|
||||
|
||||
@@ -34,7 +34,6 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ensure;
|
||||
use sql::statements::statement::Statement;
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
use table::TableRef;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user