Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00

Compare commits: v0.18.0-ni...feat/bulk- (10 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 97e9b97a57 | |
| | 7fc74e2928 | |
| | 77f20ede7a | |
| | ced018fce0 | |
| | 41dacff283 | |
| | 94a14b6da7 | |
| | 6ad3a32cb2 | |
| | ac00314578 | |
| | 2f08bee08f | |
| | 8ebb31cdcd | |
Cargo.lock (generated, 2 changed lines):
```diff
@@ -5143,7 +5143,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f0913f179ee1d2ce428f8b85a9ea12b5f69ad636#f0913f179ee1d2ce428f8b85a9ea12b5f69ad636"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17971523673f4fbc982510d3c9d6647ff642e16f#17971523673f4fbc982510d3c9d6647ff642e16f"
 dependencies = [
  "prost 0.13.5",
  "serde",
```

Cargo.toml:

```diff
@@ -134,7 +134,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f0913f179ee1d2ce428f8b85a9ea12b5f69ad636" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17971523673f4fbc982510d3c9d6647ff642e16f" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
```
```diff
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
 use api::v1::region::InsertRequests;
 use common_error::ext::BoxedError;
 use common_meta::node_manager::Flownode;
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
             .map_err(BoxedError::new)
             .context(common_meta::error::ExternalSnafu)
     }
+
+    async fn handle_mark_window_dirty(
+        &self,
+        req: DirtyWindowRequest,
+    ) -> common_meta::error::Result<FlowResponse> {
+        self.handle_mark_window_dirty(req)
+            .await
+            .map_err(BoxedError::new)
+            .context(common_meta::error::ExternalSnafu)
+    }
 }

 impl FlowRequester {
@@ -91,4 +101,20 @@ impl FlowRequester {
             .into_inner();
         Ok(response)
     }
+
+    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
+        let (addr, mut client) = self.client.raw_flow_client()?;
+        let response = client
+            .handle_mark_dirty_time_window(DirtyWindowRequests {
+                requests: vec![req],
+            })
+            .await
+            .or_else(|e| {
+                let code = e.code();
+                let err: crate::error::Error = e.into();
+                Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
+            })?
+            .into_inner();
+        Ok(response)
+    }
 }
```
```diff
@@ -15,7 +15,7 @@
 use std::sync::Arc;

 use api::region::RegionResponse;
-use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
 use api::v1::region::{InsertRequests, RegionRequest};
 pub use common_base::AffectedRows;
 use common_query::request::QueryRequest;
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
     async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;

     async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
+
+    /// Handles requests to mark time window as dirty.
+    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
 }

 pub type FlownodeRef = Arc<dyn Flownode>;
```
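A minimal sketch of driving the new trait method through a `Flownode` trait object (standalone Rust with `async-trait` and `tokio`; `DirtyWindowRequest` and `FlowResponse` here are simplified stand-ins for the proto messages, not the real `api::v1::flow` types):

```rust
use std::sync::Arc;

// Stand-ins for the proto messages; the real ones live in api::v1::flow.
#[derive(Default)]
struct DirtyWindowRequest {
    table_id: u32,
    timestamps: Vec<i64>,
}
#[derive(Debug, Default)]
struct FlowResponse;

#[async_trait::async_trait]
trait Flownode: Send + Sync {
    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse, String>;
}

struct NoopFlownode;

#[async_trait::async_trait]
impl Flownode for NoopFlownode {
    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse, String> {
        // A real flownode would align these timestamps to time windows and mark them dirty.
        println!("marking {} timestamps dirty for table {}", req.timestamps.len(), req.table_id);
        Ok(FlowResponse)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let node: Arc<dyn Flownode> = Arc::new(NoopFlownode);
    node.handle_mark_window_dirty(DirtyWindowRequest {
        table_id: 1024,
        timestamps: vec![1_700_000_000_000],
    })
    .await?;
    Ok(())
}
```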
```diff
@@ -15,7 +15,7 @@
 use std::sync::Arc;

 use api::region::RegionResponse;
-use api::v1::flow::{FlowRequest, FlowResponse};
+use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
 use api::v1::region::{InsertRequests, RegionRequest};
 pub use common_base::AffectedRows;
 use common_query::request::QueryRequest;
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
     ) -> Result<FlowResponse> {
         unimplemented!()
     }
+
+    async fn handle_mark_window_dirty(
+        &self,
+        _peer: &Peer,
+        _req: DirtyWindowRequest,
+    ) -> Result<FlowResponse> {
+        unimplemented!()
+    }
 }

 /// A mock struct implements [NodeManager] only implement the `datanode` method.
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
     async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
         self.handler.handle_inserts(&self.peer, requests).await
     }
+
+    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
+        self.handler.handle_mark_window_dirty(&self.peer, req).await
+    }
 }

 #[async_trait::async_trait]
```
```diff
@@ -316,7 +316,7 @@ impl StreamingEngine {
         );

         METRIC_FLOW_ROWS
-            .with_label_values(&["out"])
+            .with_label_values(&["out-streaming"])
             .inc_by(total_rows as u64);

         let now = self.tick_manager.tick();
```

```diff
@@ -31,6 +31,7 @@ use common_runtime::JoinHandle;
 use common_telemetry::{error, info, trace, warn};
 use datatypes::value::Value;
 use futures::TryStreamExt;
+use greptime_proto::v1::flow::DirtyWindowRequest;
 use itertools::Itertools;
 use session::context::QueryContextBuilder;
 use snafu::{ensure, IntoError, OptionExt, ResultExt};
@@ -46,7 +47,7 @@ use crate::error::{
     IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
     NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
 };
-use crate::metrics::METRIC_FLOW_TASK_COUNT;
+use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
 use crate::repr::{self, DiffRow};
 use crate::{Error, FlowId};

@@ -689,6 +690,9 @@ impl FlowEngine for FlowDualEngine {
         let mut to_stream_engine = Vec::with_capacity(request.requests.len());
         let mut to_batch_engine = request.requests;

+        let mut batching_row_cnt = 0;
+        let mut streaming_row_cnt = 0;
+
         {
             // not locking this, or recover flows will be starved when also handling flow inserts
             let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +702,11 @@
                 let is_in_stream = src_table2flow.in_stream(table_id);
                 let is_in_batch = src_table2flow.in_batch(table_id);
                 if is_in_stream {
+                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                     to_stream_engine.push(req.clone());
                 }
                 if is_in_batch {
+                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                     return true;
                 }
                 if !is_in_batch && !is_in_stream {
@@ -713,6 +719,14 @@
             // can't use drop due to https://github.com/rust-lang/rust/pull/128846
         }

+        METRIC_FLOW_ROWS
+            .with_label_values(&["in-streaming"])
+            .inc_by(streaming_row_cnt as u64);
+
+        METRIC_FLOW_ROWS
+            .with_label_values(&["in-batching"])
+            .inc_by(batching_row_cnt as u64);
+
         let streaming_engine = self.streaming_engine.clone();
         let stream_handler: JoinHandle<Result<(), Error>> =
             common_runtime::spawn_global(async move {
@@ -819,6 +833,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
             .map(|_| Default::default())
             .map_err(to_meta_err(snafu::location!()))
     }
+
+    async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
+        unreachable!()
+    }
 }

 /// return a function to convert `crate::error::Error` to `common_meta::error::Error`
@@ -926,6 +944,10 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
             .map(|_| Default::default())
             .map_err(to_meta_err(snafu::location!()))
     }
+
+    async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
+        unreachable!()
+    }
 }

 impl FlowEngine for StreamingEngine {
```
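The split-and-count logic above feeds one counter family labeled by direction. A self-contained sketch of that accounting (prometheus crate; the metric name, help text, and labels are taken from the hunks, while the request type is a simplified stand-in):

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    // Mirrors METRIC_FLOW_ROWS from the diff: one counter family, labeled by direction.
    static ref FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
        "greptime_flow_processed_rows",
        "Count of rows flowing through the system.",
        &["direction"]
    )
    .unwrap();
}

// Simplified stand-in for a region insert request: just an optional row payload.
struct InsertReq {
    rows: Option<Vec<()>>,
}

fn account_rows(reqs: &[(InsertReq, bool /* streaming */, bool /* batching */)]) {
    let (mut streaming_row_cnt, mut batching_row_cnt) = (0usize, 0usize);
    for (req, in_stream, in_batch) in reqs {
        // Same row-count expression as the diff: missing payload counts as zero rows.
        let n = req.rows.as_ref().map(|rs| rs.len()).unwrap_or(0);
        if *in_stream {
            streaming_row_cnt += n;
        }
        if *in_batch {
            batching_row_cnt += n;
        }
    }
    FLOW_ROWS
        .with_label_values(&["in-streaming"])
        .inc_by(streaming_row_cnt as u64);
    FLOW_ROWS
        .with_label_values(&["in-batching"])
        .inc_by(batching_row_cnt as u64);
}
```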
```diff
@@ -17,6 +17,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;

+use api::v1::flow::{DirtyWindowRequests, FlowResponse};
 use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
 use common_meta::ddl::create_flow::FlowType;
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
 use common_time::TimeToLive;
 use query::QueryEngineRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use store_api::storage::RegionId;
-use table::metadata::TableId;
+use store_api::storage::{RegionId, TableId};
 use tokio::sync::{oneshot, RwLock};

 use crate::batching_mode::frontend_client::FrontendClient;
@@ -42,6 +42,7 @@ use crate::error::{
     ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
     UnexpectedSnafu, UnsupportedSnafu,
 };
+use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
 use crate::{CreateFlowArgs, Error, FlowId, TableName};

 /// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -77,6 +78,116 @@ impl BatchingEngine {
         }
     }

+    pub async fn handle_mark_dirty_time_window(
+        &self,
+        reqs: DirtyWindowRequests,
+    ) -> Result<FlowResponse, Error> {
+        let table_info_mgr = self.table_meta.table_info_manager();
+
+        let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
+        for r in reqs.requests {
+            let tid = TableId::from(r.table_id);
+            let entry = group_by_table_id.entry(tid).or_default();
+            entry.extend(r.timestamps);
+        }
+        let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
+        let table_infos =
+            table_info_mgr
+                .batch_get(&tids)
+                .await
+                .with_context(|_| TableNotFoundMetaSnafu {
+                    msg: format!("Failed to get table info for table ids: {:?}", tids),
+                })?;
+
+        let group_by_table_name = group_by_table_id
+            .into_iter()
+            .filter_map(|(id, timestamps)| {
+                let table_name = table_infos.get(&id).map(|info| info.table_name());
+                let Some(table_name) = table_name else {
+                    warn!("Failed to get table infos for table id: {:?}", id);
+                    return None;
+                };
+                let table_name = [
+                    table_name.catalog_name,
+                    table_name.schema_name,
+                    table_name.table_name,
+                ];
+                let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
+                let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
+                    .data_type
+                    .as_timestamp()
+                    .unwrap()
+                    .unit();
+                Some((table_name, (timestamps, time_index_unit)))
+            })
+            .collect::<HashMap<_, _>>();
+
+        let group_by_table_name = Arc::new(group_by_table_name);
+
+        let mut handles = Vec::new();
+        let tasks = self.tasks.read().await;
+
+        for (_flow_id, task) in tasks.iter() {
+            let src_table_names = &task.config.source_table_names;
+
+            if src_table_names
+                .iter()
+                .all(|name| !group_by_table_name.contains_key(name))
+            {
+                continue;
+            }
+
+            let group_by_table_name = group_by_table_name.clone();
+            let task = task.clone();
+
+            let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
+                let src_table_names = &task.config.source_table_names;
+                let mut all_dirty_windows = vec![];
+                for src_table_name in src_table_names {
+                    if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
+                        let Some(expr) = &task.config.time_window_expr else {
+                            continue;
+                        };
+                        for timestamp in timestamps {
+                            let align_start = expr
+                                .eval(common_time::Timestamp::new(*timestamp, *unit))?
+                                .0
+                                .context(UnexpectedSnafu {
+                                    reason: "Failed to eval start value",
+                                })?;
+                            all_dirty_windows.push(align_start);
+                        }
+                    }
+                }
+                let mut state = task.state.write().unwrap();
+                let flow_id_label = task.config.flow_id.to_string();
+                for timestamp in all_dirty_windows {
+                    state.dirty_time_windows.add_window(timestamp, None);
+                }
+
+                METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
+                    .with_label_values(&[&flow_id_label])
+                    .set(state.dirty_time_windows.len() as f64);
+                Ok(())
+            });
+            handles.push(handle);
+        }
+        drop(tasks);
+        for handle in handles {
+            match handle.await {
+                Err(e) => {
+                    warn!("Failed to handle inserts: {e}");
+                }
+                Ok(Ok(())) => (),
+                Ok(Err(e)) => {
+                    warn!("Failed to handle inserts: {e}");
+                }
+            }
+        }
+
+        Ok(Default::default())
+    }
+
     pub async fn handle_inserts_inner(
         &self,
         request: api::v1::region::InsertRequests,
```
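The first step of `handle_mark_dirty_time_window` above folds all requests into one timestamp list per table, so table metadata is fetched only once per id. A standalone sketch of that grouping step (plain `u32` ids and std `HashMap` instead of the ahash map used in the diff):

```rust
use std::collections::HashMap;

/// Fold (table_id, timestamps) pairs into one timestamp list per table,
/// so downstream metadata lookups happen once per table id.
fn group_by_table(requests: Vec<(u32, Vec<i64>)>) -> HashMap<u32, Vec<i64>> {
    let mut grouped: HashMap<u32, Vec<i64>> = HashMap::new();
    for (table_id, timestamps) in requests {
        grouped.entry(table_id).or_default().extend(timestamps);
    }
    grouped
}

fn main() {
    let grouped = group_by_table(vec![(1, vec![10, 20]), (1, vec![30]), (2, vec![40])]);
    assert_eq!(grouped[&1], vec![10, 20, 30]);
    assert_eq!(grouped[&2], vec![40]);
}
```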
```diff
@@ -156,6 +156,11 @@ impl DirtyTimeWindows {
         self.windows.clear();
     }

+    /// Number of dirty windows.
+    pub fn len(&self) -> usize {
+        self.windows.len()
+    }
+
     /// Generate all filter expressions consuming all time windows
     ///
     /// there is two limits:
```
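For orientation, a simplified stand-in for `DirtyTimeWindows` (assuming a `BTreeMap`-backed store keyed by window start, which is an assumption about the real type in the flow crate) showing what the new `len()` accessor exposes to the gauge update:

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for DirtyTimeWindows: window start -> optional window end.
#[derive(Default)]
struct DirtyWindows {
    windows: BTreeMap<i64, Option<i64>>,
}

impl DirtyWindows {
    fn add_window(&mut self, start: i64, end: Option<i64>) {
        self.windows.insert(start, end);
    }

    /// The new accessor: lets callers (here, the backlog gauge) observe how many
    /// windows are waiting to be re-queried.
    fn len(&self) -> usize {
        self.windows.len()
    }
}

fn main() {
    let mut w = DirtyWindows::default();
    w.add_window(0, None);
    w.add_window(60, None);
    assert_eq!(w.len(), 2);
}
```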
```diff
@@ -61,7 +61,9 @@ use crate::error::{
     SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
 };
 use crate::metrics::{
-    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
+    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
+    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
+    METRIC_FLOW_ROWS,
 };
 use crate::{Error, FlowId};

@@ -371,6 +373,9 @@ impl BatchingTask {
                 "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
                 elapsed
             );
+            METRIC_FLOW_ROWS
+                .with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
+                .inc_by(*affected_rows as _);
         } else if let Err(err) = &res {
             warn!(
                 "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -410,6 +415,7 @@ impl BatchingTask {
         engine: QueryEngineRef,
         frontend_client: Arc<FrontendClient>,
     ) {
+        let flow_id_str = self.config.flow_id.to_string();
         loop {
             // first check if shutdown signal is received
             // if so, break the loop
@@ -427,6 +433,9 @@ impl BatchingTask {
                     Err(TryRecvError::Empty) => (),
                 }
             }
+            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
+                .with_label_values(&[&flow_id_str])
+                .inc();

             let new_query = match self.gen_insert_plan(&engine).await {
                 Ok(new_query) => new_query,
@@ -473,6 +482,9 @@ impl BatchingTask {
                 }
                 // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
                 Err(err) => {
+                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
+                        .with_label_values(&[&flow_id_str])
+                        .inc();
                     match new_query {
                         Some(query) => {
                             common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
```
```diff
@@ -58,11 +58,32 @@ lazy_static! {
             vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
         )
         .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec =
+        register_gauge_vec!(
+            "greptime_flow_batching_engine_bulk_mark_time_window",
+            "flow batching engine query time window count marked by bulk inserts",
+            &["flow_id"],
+        )
+        .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
+        register_int_counter_vec!(
+            "greptime_flow_batching_start_query_count",
+            "flow batching engine started query count",
+            &["flow_id"],
+        )
+        .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
+        register_int_counter_vec!(
+            "greptime_flow_batching_error_count",
+            "flow batching engine error count per flow id",
+            &["flow_id"],
+        )
+        .unwrap();
     pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
         register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
     pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
         "greptime_flow_processed_rows",
-        "Count of rows flowing through the system",
+        "Count of rows flowing through the system.",
         &["direction"]
     )
     .unwrap();
```
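Note that the new bulk-mark metric is a gauge that gets `set`, not a counter that gets incremented: it reports the current dirty-window backlog per flow, which shrinks as windows are consumed. A minimal sketch of the registration and update (prometheus crate; metric name and label from the hunk, surrounding state simplified):

```rust
use lazy_static::lazy_static;
use prometheus::{register_gauge_vec, GaugeVec};

lazy_static! {
    static ref BULK_MARK_TIME_WINDOW: GaugeVec = register_gauge_vec!(
        "greptime_flow_batching_engine_bulk_mark_time_window",
        "flow batching engine query time window count marked by bulk inserts",
        &["flow_id"]
    )
    .unwrap();
}

fn report_backlog(flow_id: u64, dirty_window_count: usize) {
    // A gauge (not a counter) because the backlog goes down when windows are queried away.
    BULK_MARK_TIME_WINDOW
        .with_label_values(&[&flow_id.to_string()])
        .set(dirty_window_count as f64);
}

fn main() {
    report_backlog(42, 3);
}
```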
```diff
@@ -17,6 +17,7 @@
 use std::net::SocketAddr;
 use std::sync::Arc;

+use api::v1::flow::DirtyWindowRequests;
 use api::v1::{RowDeleteRequests, RowInsertRequests};
 use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
 use catalog::CatalogManagerRef;
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
             .map(Response::new)
             .map_err(to_status_with_last_err)
     }
+
+    async fn handle_mark_dirty_time_window(
+        &self,
+        reqs: Request<DirtyWindowRequests>,
+    ) -> Result<Response<FlowResponse>, Status> {
+        self.dual_engine
+            .batching_engine()
+            .handle_mark_dirty_time_window(reqs.into_inner())
+            .await
+            .map(Response::new)
+            .map_err(to_status_with_last_err)
+    }
 }

 #[derive(Clone)]
```
```diff
@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
 use servers::query_handler::sql::SqlQueryHandler;
 use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use table::metadata::TableId;
 use table::table_name::TableName;
+use table::TableRef;

 use crate::error::{
     CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {

     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> Result<AffectedRows> {
-        let table_id = if let Some(table_id) = table_id {
-            *table_id
+        let table = if let Some(table) = table_ref {
+            table.clone()
         } else {
             let table = self
                 .catalog_manager()
                 .table(
-                    &table.catalog_name,
-                    &table.schema_name,
-                    &table.table_name,
+                    &table_name.catalog_name,
+                    &table_name.schema_name,
+                    &table_name.table_name,
                     None,
                 )
                 .await
                 .context(CatalogSnafu)?
                 .with_context(|| TableNotFoundSnafu {
-                    table_name: table.to_string(),
+                    table_name: table_name.to_string(),
                })?;
-            let id = table.table_info().table_id();
-            *table_id = Some(id);
-            id
+            *table_ref = Some(table.clone());
+            table
         };

         self.inserter
-            .handle_bulk_insert(table_id, decoder, data)
+            .handle_bulk_insert(table, decoder, data)
             .await
             .context(TableOperationSnafu)
     }
```
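The hunk above swaps the cached `TableId` for a cached `TableRef`, so the catalog is consulted once per Flight stream and every later message reuses the resolved handle (the bulk-insert path then derives the id from the handle instead of doing its own lookup). A minimal sketch of the caching pattern (the `Table`/`TableRef` types here are simplified stand-ins):

```rust
use std::sync::Arc;

#[derive(Debug)]
struct Table {
    name: String,
}
type TableRef = Arc<Table>;

// Resolve once, then reuse the cached handle for every later message in the stream.
fn resolve_cached(cache: &mut Option<TableRef>, lookup: impl FnOnce() -> TableRef) -> TableRef {
    match cache {
        Some(table) => table.clone(),
        None => {
            let table = lookup();
            *cache = Some(table.clone());
            table
        }
    }
}

fn main() {
    let mut cache: Option<TableRef> = None;
    let t1 = resolve_cached(&mut cache, || Arc::new(Table { name: "metrics".into() }));
    // Second call hits the cache; the lookup closure is not invoked again.
    let t2 = resolve_cached(&mut cache, || unreachable!());
    assert!(Arc::ptr_eq(&t1, &t2));
}
```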
```diff
@@ -12,18 +12,29 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::collections::HashSet;
+
 use ahash::{HashMap, HashMapExt};
+use api::v1::flow::DirtyWindowRequest;
 use api::v1::region::{
     bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
 };
 use api::v1::ArrowIpc;
+use arrow::array::{
+    Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray,
+};
+use arrow::datatypes::{DataType, Int64Type, TimeUnit};
+use arrow::record_batch::RecordBatch;
 use common_base::AffectedRows;
 use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
 use common_grpc::FlightData;
+use common_telemetry::error;
 use common_telemetry::tracing_context::TracingContext;
-use snafu::ResultExt;
+use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
-use table::metadata::TableId;
+use table::metadata::TableInfoRef;
+use table::TableRef;

 use crate::insert::Inserter;
 use crate::{error, metrics};
@@ -32,10 +43,12 @@ impl Inserter {
     /// Handle bulk insert request.
     pub async fn handle_bulk_insert(
         &self,
-        table_id: TableId,
+        table: TableRef,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> error::Result<AffectedRows> {
+        let table_info = table.table_info();
+        let table_id = table_info.table_id();
         let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["decode_request"])
             .start_timer();
@@ -48,6 +61,10 @@ impl Inserter {
             return Ok(0);
         };
         decode_timer.observe_duration();

+        // notify flownode to update dirty timestamps if flow is configured.
+        self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
+
         metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
         metrics::BULK_REQUEST_ROWS
             .with_label_values(&["raw"])
@@ -216,4 +233,103 @@ impl Inserter {
         crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
         Ok(rows_inserted)
     }
+
+    fn maybe_update_flow_dirty_window(&self, table_info: TableInfoRef, record_batch: RecordBatch) {
+        let table_id = table_info.table_id();
+        let table_flownode_set_cache = self.table_flownode_set_cache.clone();
+        let node_manager = self.node_manager.clone();
+        common_runtime::spawn_global(async move {
+            let result = table_flownode_set_cache
+                .get(table_id)
+                .await
+                .context(error::RequestInsertsSnafu);
+            let flownodes = match result {
+                Ok(flownodes) => flownodes.unwrap_or_default(),
+                Err(e) => {
+                    error!(e; "Failed to get flownodes for table id: {}", table_id);
+                    return;
+                }
+            };
+
+            let peers: HashSet<_> = flownodes.values().cloned().collect();
+            if peers.is_empty() {
+                return;
+            }
+
+            let Ok(timestamps) = extract_timestamps(
+                &record_batch,
+                &table_info
+                    .meta
+                    .schema
+                    .timestamp_column()
+                    .as_ref()
+                    .unwrap()
+                    .name,
+            )
+            .inspect_err(|e| {
+                error!(e; "Failed to extract timestamps from record batch");
+            }) else {
+                return;
+            };
+
+            for peer in peers {
+                let node_manager = node_manager.clone();
+                let timestamps = timestamps.clone();
+                common_runtime::spawn_global(async move {
+                    if let Err(e) = node_manager
+                        .flownode(&peer)
+                        .await
+                        .handle_mark_window_dirty(DirtyWindowRequest {
+                            table_id,
+                            timestamps,
+                        })
+                        .await
+                        .context(error::RequestInsertsSnafu)
+                    {
+                        error!(e; "Failed to mark timestamps as dirty, table: {}", table_id);
+                    }
+                });
+            }
+        });
+    }
+}
+
+/// Extract all timestamps from the record batch's time index column.
+/// Returns an empty vector if the record batch is empty.
+fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Result<Vec<i64>> {
+    let ts_col = rb
+        .column_by_name(timestamp_index_name)
+        .context(error::ColumnNotFoundSnafu {
+            msg: timestamp_index_name,
+        })?;
+    if rb.num_rows() == 0 {
+        return Ok(vec![]);
+    }
+    let primitive = match ts_col.data_type() {
+        DataType::Timestamp(unit, _) => match unit {
+            TimeUnit::Second => ts_col
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Millisecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Microsecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+            TimeUnit::Nanosecond => ts_col
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .reinterpret_cast::<Int64Type>(),
+        },
+        t => {
+            return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
+        }
+    };
+    Ok(primitive.iter().flatten().collect())
+}
```
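A standalone check of the extraction approach used by `extract_timestamps` (arrow crate; `reinterpret_cast` exposes a timestamp array's underlying `i64` values without copying), shown for the millisecond unit:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Int64Type, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;

fn main() {
    // Build a one-column record batch with a millisecond time index.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    )]));
    let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1_000, 2_000, 3_000]));
    let rb = RecordBatch::try_new(schema, vec![ts]).unwrap();

    // Same downcast-and-reinterpret step as extract_timestamps above, for one unit.
    let values: Vec<i64> = rb
        .column_by_name("ts")
        .unwrap()
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap()
        .reinterpret_cast::<Int64Type>()
        .iter()
        .flatten()
        .collect();
    assert_eq!(values, vec![1_000, 2_000, 3_000]);
}
```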
```diff
@@ -838,6 +838,13 @@ pub enum Error {
         location: Location,
     },

+    #[snafu(display("Invalid time index type: {}", ty))]
+    InvalidTimeIndexType {
+        ty: arrow::datatypes::DataType,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Invalid process id: {}", id))]
     InvalidProcessId { id: String },

@@ -973,6 +980,7 @@ impl ErrorExt for Error {
             Error::ColumnOptions { source, .. } => source.status_code(),
             Error::DecodeFlightData { source, .. } => source.status_code(),
             Error::ComputeArrow { .. } => StatusCode::Internal,
+            Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
             Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
             Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
         }
```
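For reference, a minimal sketch of the snafu pattern behind `InvalidTimeIndexTypeSnafu` (a simplified variant without the implicit location field): the derive generates a context selector whose `fail()` constructs the error.

```rust
use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Invalid time index type: {}", ty))]
    InvalidTimeIndexType { ty: String },
}

fn ensure_timestamp(ty: &str) -> Result<(), Error> {
    if ty != "timestamp" {
        // The derive generates the `InvalidTimeIndexTypeSnafu` selector;
        // `fail()` builds Err(Error::InvalidTimeIndexType { .. }).
        return InvalidTimeIndexTypeSnafu { ty: ty.to_string() }.fail();
    }
    Ok(())
}

fn main() {
    assert!(ensure_timestamp("int64").is_err());
    assert!(ensure_timestamp("timestamp").is_ok());
}
```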
```diff
@@ -78,7 +78,7 @@ pub struct Inserter {
     catalog_manager: CatalogManagerRef,
     pub(crate) partition_manager: PartitionRuleManagerRef,
     pub(crate) node_manager: NodeManagerRef,
-    table_flownode_set_cache: TableFlownodeSetCacheRef,
+    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
 }

 pub type InserterRef = Arc<Inserter>;
```
```diff
@@ -40,7 +40,7 @@ use futures_util::StreamExt;
 use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
 use session::hints::READ_PREFERENCE_HINT;
 use snafu::{OptionExt, ResultExt};
-use table::metadata::TableId;
+use table::TableRef;
 use tokio::sync::mpsc;

 use crate::error::Error::UnsupportedAuthScheme;
@@ -149,8 +149,8 @@ impl GreptimeRequestHandler {
             .clone()
             .unwrap_or_else(common_runtime::global_runtime);
         runtime.spawn(async move {
-            // Cached table id
-            let mut table_id: Option<TableId> = None;
+            // Cached table ref
+            let mut table_ref: Option<TableRef> = None;

             let mut decoder = FlightDecoder::default();
             while let Some(request) = stream.next().await {
@@ -169,7 +169,7 @@ impl GreptimeRequestHandler {

                 let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
                 let result = handler
-                    .put_record_batch(&table_name, &mut table_id, &mut decoder, data)
+                    .put_record_batch(&table_name, &mut table_ref, &mut decoder, data)
                     .await
                     .inspect_err(|e| error!(e; "Failed to handle flight record batches"));
                 timer.observe_duration();
```
```diff
@@ -23,8 +23,8 @@ use common_grpc::flight::FlightDecoder;
 use common_query::Output;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
-use table::metadata::TableId;
 use table::table_name::TableName;
+use table::TableRef;

 use crate::error::{self, Result};

@@ -45,8 +45,8 @@ pub trait GrpcQueryHandler {

     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         flight_data: FlightData,
     ) -> std::result::Result<AffectedRows, Self::Error>;
@@ -77,13 +77,13 @@ where

     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
+        table_name: &TableName,
+        table_ref: &mut Option<TableRef>,
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> Result<AffectedRows> {
         self.0
-            .put_record_batch(table, table_id, decoder, data)
+            .put_record_batch(table_name, table_ref, decoder, data)
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcRequestSnafu)
```
```diff
@@ -34,7 +34,6 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
 use session::context::QueryContextRef;
 use snafu::ensure;
 use sql::statements::statement::Statement;
-use table::metadata::TableId;
 use table::table_name::TableName;
 use table::TableRef;

@@ -160,15 +159,11 @@ impl GrpcQueryHandler for DummyInstance {

     async fn put_record_batch(
         &self,
-        table: &TableName,
-        table_id: &mut Option<TableId>,
-        decoder: &mut FlightDecoder,
-        data: FlightData,
+        _table_name: &TableName,
+        _table_ref: &mut Option<TableRef>,
+        _decoder: &mut FlightDecoder,
+        _data: FlightData,
     ) -> std::result::Result<AffectedRows, Self::Error> {
-        let _ = table;
-        let _ = data;
-        let _ = table_id;
-        let _ = decoder;
         unimplemented!()
     }
 }
```