mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
10 Commits
feature/df
...
feat/bulk-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
97e9b97a57 | ||
|
|
7fc74e2928 | ||
|
|
77f20ede7a | ||
|
|
ced018fce0 | ||
|
|
41dacff283 | ||
|
|
94a14b6da7 | ||
|
|
6ad3a32cb2 | ||
|
|
ac00314578 | ||
|
|
2f08bee08f | ||
|
|
8ebb31cdcd |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5143,7 +5143,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "greptime-proto"
|
name = "greptime-proto"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f0913f179ee1d2ce428f8b85a9ea12b5f69ad636#f0913f179ee1d2ce428f8b85a9ea12b5f69ad636"
|
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17971523673f4fbc982510d3c9d6647ff642e16f#17971523673f4fbc982510d3c9d6647ff642e16f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"prost 0.13.5",
|
"prost 0.13.5",
|
||||||
"serde",
|
"serde",
|
||||||
|
|||||||
@@ -134,7 +134,7 @@ etcd-client = "0.14"
|
|||||||
fst = "0.4.7"
|
fst = "0.4.7"
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f0913f179ee1d2ce428f8b85a9ea12b5f69ad636" }
|
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17971523673f4fbc982510d3c9d6647ff642e16f" }
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
http = "1"
|
http = "1"
|
||||||
humantime = "2.1"
|
humantime = "2.1"
|
||||||
|
|||||||
@@ -12,7 +12,7 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
|
||||||
use api::v1::region::InsertRequests;
|
use api::v1::region::InsertRequests;
|
||||||
use common_error::ext::BoxedError;
|
use common_error::ext::BoxedError;
|
||||||
use common_meta::node_manager::Flownode;
|
use common_meta::node_manager::Flownode;
|
||||||
@@ -44,6 +44,16 @@ impl Flownode for FlowRequester {
|
|||||||
.map_err(BoxedError::new)
|
.map_err(BoxedError::new)
|
||||||
.context(common_meta::error::ExternalSnafu)
|
.context(common_meta::error::ExternalSnafu)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_mark_window_dirty(
|
||||||
|
&self,
|
||||||
|
req: DirtyWindowRequest,
|
||||||
|
) -> common_meta::error::Result<FlowResponse> {
|
||||||
|
self.handle_mark_window_dirty(req)
|
||||||
|
.await
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(common_meta::error::ExternalSnafu)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowRequester {
|
impl FlowRequester {
|
||||||
@@ -91,4 +101,20 @@ impl FlowRequester {
|
|||||||
.into_inner();
|
.into_inner();
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||||
|
let (addr, mut client) = self.client.raw_flow_client()?;
|
||||||
|
let response = client
|
||||||
|
.handle_mark_dirty_time_window(DirtyWindowRequests {
|
||||||
|
requests: vec![req],
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.or_else(|e| {
|
||||||
|
let code = e.code();
|
||||||
|
let err: crate::error::Error = e.into();
|
||||||
|
Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
|
||||||
|
})?
|
||||||
|
.into_inner();
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,7 +15,7 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use api::region::RegionResponse;
|
use api::region::RegionResponse;
|
||||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||||
use api::v1::region::{InsertRequests, RegionRequest};
|
use api::v1::region::{InsertRequests, RegionRequest};
|
||||||
pub use common_base::AffectedRows;
|
pub use common_base::AffectedRows;
|
||||||
use common_query::request::QueryRequest;
|
use common_query::request::QueryRequest;
|
||||||
@@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync {
|
|||||||
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
|
async fn handle(&self, request: FlowRequest) -> Result<FlowResponse>;
|
||||||
|
|
||||||
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
|
async fn handle_inserts(&self, request: InsertRequests) -> Result<FlowResponse>;
|
||||||
|
|
||||||
|
/// Handles requests to mark time window as dirty.
|
||||||
|
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type FlownodeRef = Arc<dyn Flownode>;
|
pub type FlownodeRef = Arc<dyn Flownode>;
|
||||||
|
|||||||
@@ -15,7 +15,7 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use api::region::RegionResponse;
|
use api::region::RegionResponse;
|
||||||
use api::v1::flow::{FlowRequest, FlowResponse};
|
use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse};
|
||||||
use api::v1::region::{InsertRequests, RegionRequest};
|
use api::v1::region::{InsertRequests, RegionRequest};
|
||||||
pub use common_base::AffectedRows;
|
pub use common_base::AffectedRows;
|
||||||
use common_query::request::QueryRequest;
|
use common_query::request::QueryRequest;
|
||||||
@@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone {
|
|||||||
) -> Result<FlowResponse> {
|
) -> Result<FlowResponse> {
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_mark_window_dirty(
|
||||||
|
&self,
|
||||||
|
_peer: &Peer,
|
||||||
|
_req: DirtyWindowRequest,
|
||||||
|
) -> Result<FlowResponse> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A mock struct implements [NodeManager] only implement the `datanode` method.
|
/// A mock struct implements [NodeManager] only implement the `datanode` method.
|
||||||
@@ -134,6 +142,10 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
|
|||||||
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
|
async fn handle_inserts(&self, requests: InsertRequests) -> Result<FlowResponse> {
|
||||||
self.handler.handle_inserts(&self.peer, requests).await
|
self.handler.handle_inserts(&self.peer, requests).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
|
||||||
|
self.handler.handle_mark_window_dirty(&self.peer, req).await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
|
|||||||
@@ -316,7 +316,7 @@ impl StreamingEngine {
|
|||||||
);
|
);
|
||||||
|
|
||||||
METRIC_FLOW_ROWS
|
METRIC_FLOW_ROWS
|
||||||
.with_label_values(&["out"])
|
.with_label_values(&["out-streaming"])
|
||||||
.inc_by(total_rows as u64);
|
.inc_by(total_rows as u64);
|
||||||
|
|
||||||
let now = self.tick_manager.tick();
|
let now = self.tick_manager.tick();
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ use common_runtime::JoinHandle;
|
|||||||
use common_telemetry::{error, info, trace, warn};
|
use common_telemetry::{error, info, trace, warn};
|
||||||
use datatypes::value::Value;
|
use datatypes::value::Value;
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
|
use greptime_proto::v1::flow::DirtyWindowRequest;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use session::context::QueryContextBuilder;
|
use session::context::QueryContextBuilder;
|
||||||
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
use snafu::{ensure, IntoError, OptionExt, ResultExt};
|
||||||
@@ -46,7 +47,7 @@ use crate::error::{
|
|||||||
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
|
||||||
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
|
||||||
};
|
};
|
||||||
use crate::metrics::METRIC_FLOW_TASK_COUNT;
|
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
|
||||||
use crate::repr::{self, DiffRow};
|
use crate::repr::{self, DiffRow};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -689,6 +690,9 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
|
||||||
let mut to_batch_engine = request.requests;
|
let mut to_batch_engine = request.requests;
|
||||||
|
|
||||||
|
let mut batching_row_cnt = 0;
|
||||||
|
let mut streaming_row_cnt = 0;
|
||||||
|
|
||||||
{
|
{
|
||||||
// not locking this, or recover flows will be starved when also handling flow inserts
|
// not locking this, or recover flows will be starved when also handling flow inserts
|
||||||
let src_table2flow = self.src_table2flow.read().await;
|
let src_table2flow = self.src_table2flow.read().await;
|
||||||
@@ -698,9 +702,11 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
let is_in_stream = src_table2flow.in_stream(table_id);
|
let is_in_stream = src_table2flow.in_stream(table_id);
|
||||||
let is_in_batch = src_table2flow.in_batch(table_id);
|
let is_in_batch = src_table2flow.in_batch(table_id);
|
||||||
if is_in_stream {
|
if is_in_stream {
|
||||||
|
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||||
to_stream_engine.push(req.clone());
|
to_stream_engine.push(req.clone());
|
||||||
}
|
}
|
||||||
if is_in_batch {
|
if is_in_batch {
|
||||||
|
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if !is_in_batch && !is_in_stream {
|
if !is_in_batch && !is_in_stream {
|
||||||
@@ -713,6 +719,14 @@ impl FlowEngine for FlowDualEngine {
|
|||||||
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
|
||||||
}
|
}
|
||||||
|
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&["in-streaming"])
|
||||||
|
.inc_by(streaming_row_cnt as u64);
|
||||||
|
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&["in-batching"])
|
||||||
|
.inc_by(batching_row_cnt as u64);
|
||||||
|
|
||||||
let streaming_engine = self.streaming_engine.clone();
|
let streaming_engine = self.streaming_engine.clone();
|
||||||
let stream_handler: JoinHandle<Result<(), Error>> =
|
let stream_handler: JoinHandle<Result<(), Error>> =
|
||||||
common_runtime::spawn_global(async move {
|
common_runtime::spawn_global(async move {
|
||||||
@@ -819,6 +833,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
|
|||||||
.map(|_| Default::default())
|
.map(|_| Default::default())
|
||||||
.map_err(to_meta_err(snafu::location!()))
|
.map_err(to_meta_err(snafu::location!()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
|
/// return a function to convert `crate::error::Error` to `common_meta::error::Error`
|
||||||
@@ -926,6 +944,10 @@ impl common_meta::node_manager::Flownode for StreamingEngine {
|
|||||||
.map(|_| Default::default())
|
.map(|_| Default::default())
|
||||||
.map_err(to_meta_err(snafu::location!()))
|
.map_err(to_meta_err(snafu::location!()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult<FlowResponse> {
|
||||||
|
unreachable!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowEngine for StreamingEngine {
|
impl FlowEngine for StreamingEngine {
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
|
||||||
use catalog::CatalogManagerRef;
|
use catalog::CatalogManagerRef;
|
||||||
use common_error::ext::BoxedError;
|
use common_error::ext::BoxedError;
|
||||||
use common_meta::ddl::create_flow::FlowType;
|
use common_meta::ddl::create_flow::FlowType;
|
||||||
@@ -29,8 +30,7 @@ use common_telemetry::{debug, info};
|
|||||||
use common_time::TimeToLive;
|
use common_time::TimeToLive;
|
||||||
use query::QueryEngineRef;
|
use query::QueryEngineRef;
|
||||||
use snafu::{ensure, OptionExt, ResultExt};
|
use snafu::{ensure, OptionExt, ResultExt};
|
||||||
use store_api::storage::RegionId;
|
use store_api::storage::{RegionId, TableId};
|
||||||
use table::metadata::TableId;
|
|
||||||
use tokio::sync::{oneshot, RwLock};
|
use tokio::sync::{oneshot, RwLock};
|
||||||
|
|
||||||
use crate::batching_mode::frontend_client::FrontendClient;
|
use crate::batching_mode::frontend_client::FrontendClient;
|
||||||
@@ -42,6 +42,7 @@ use crate::error::{
|
|||||||
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
|
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
|
||||||
UnexpectedSnafu, UnsupportedSnafu,
|
UnexpectedSnafu, UnsupportedSnafu,
|
||||||
};
|
};
|
||||||
|
use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW;
|
||||||
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
use crate::{CreateFlowArgs, Error, FlowId, TableName};
|
||||||
|
|
||||||
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
/// Batching mode Engine, responsible for driving all the batching mode tasks
|
||||||
@@ -77,6 +78,116 @@ impl BatchingEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn handle_mark_dirty_time_window(
|
||||||
|
&self,
|
||||||
|
reqs: DirtyWindowRequests,
|
||||||
|
) -> Result<FlowResponse, Error> {
|
||||||
|
let table_info_mgr = self.table_meta.table_info_manager();
|
||||||
|
|
||||||
|
let mut group_by_table_id: HashMap<u32, Vec<_>> = HashMap::new();
|
||||||
|
for r in reqs.requests {
|
||||||
|
let tid = TableId::from(r.table_id);
|
||||||
|
let entry = group_by_table_id.entry(tid).or_default();
|
||||||
|
entry.extend(r.timestamps);
|
||||||
|
}
|
||||||
|
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
|
||||||
|
let table_infos =
|
||||||
|
table_info_mgr
|
||||||
|
.batch_get(&tids)
|
||||||
|
.await
|
||||||
|
.with_context(|_| TableNotFoundMetaSnafu {
|
||||||
|
msg: format!("Failed to get table info for table ids: {:?}", tids),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let group_by_table_name = group_by_table_id
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|(id, timestamps)| {
|
||||||
|
let table_name = table_infos.get(&id).map(|info| info.table_name());
|
||||||
|
let Some(table_name) = table_name else {
|
||||||
|
warn!("Failed to get table infos for table id: {:?}", id);
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
let table_name = [
|
||||||
|
table_name.catalog_name,
|
||||||
|
table_name.schema_name,
|
||||||
|
table_name.table_name,
|
||||||
|
];
|
||||||
|
let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
|
||||||
|
let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
|
||||||
|
.data_type
|
||||||
|
.as_timestamp()
|
||||||
|
.unwrap()
|
||||||
|
.unit();
|
||||||
|
Some((table_name, (timestamps, time_index_unit)))
|
||||||
|
})
|
||||||
|
.collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
|
let group_by_table_name = Arc::new(group_by_table_name);
|
||||||
|
|
||||||
|
let mut handles = Vec::new();
|
||||||
|
let tasks = self.tasks.read().await;
|
||||||
|
|
||||||
|
for (_flow_id, task) in tasks.iter() {
|
||||||
|
let src_table_names = &task.config.source_table_names;
|
||||||
|
|
||||||
|
if src_table_names
|
||||||
|
.iter()
|
||||||
|
.all(|name| !group_by_table_name.contains_key(name))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let group_by_table_name = group_by_table_name.clone();
|
||||||
|
let task = task.clone();
|
||||||
|
|
||||||
|
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
|
||||||
|
let src_table_names = &task.config.source_table_names;
|
||||||
|
let mut all_dirty_windows = vec![];
|
||||||
|
for src_table_name in src_table_names {
|
||||||
|
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
|
||||||
|
let Some(expr) = &task.config.time_window_expr else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
for timestamp in timestamps {
|
||||||
|
let align_start = expr
|
||||||
|
.eval(common_time::Timestamp::new(*timestamp, *unit))?
|
||||||
|
.0
|
||||||
|
.context(UnexpectedSnafu {
|
||||||
|
reason: "Failed to eval start value",
|
||||||
|
})?;
|
||||||
|
all_dirty_windows.push(align_start);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut state = task.state.write().unwrap();
|
||||||
|
let flow_id_label = task.config.flow_id.to_string();
|
||||||
|
for timestamp in all_dirty_windows {
|
||||||
|
state.dirty_time_windows.add_window(timestamp, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW
|
||||||
|
.with_label_values(&[&flow_id_label])
|
||||||
|
.set(state.dirty_time_windows.len() as f64);
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
handles.push(handle);
|
||||||
|
}
|
||||||
|
drop(tasks);
|
||||||
|
for handle in handles {
|
||||||
|
match handle.await {
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Failed to handle inserts: {e}");
|
||||||
|
}
|
||||||
|
Ok(Ok(())) => (),
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
warn!("Failed to handle inserts: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Default::default())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn handle_inserts_inner(
|
pub async fn handle_inserts_inner(
|
||||||
&self,
|
&self,
|
||||||
request: api::v1::region::InsertRequests,
|
request: api::v1::region::InsertRequests,
|
||||||
|
|||||||
@@ -156,6 +156,11 @@ impl DirtyTimeWindows {
|
|||||||
self.windows.clear();
|
self.windows.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Number of dirty windows.
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.windows.len()
|
||||||
|
}
|
||||||
|
|
||||||
/// Generate all filter expressions consuming all time windows
|
/// Generate all filter expressions consuming all time windows
|
||||||
///
|
///
|
||||||
/// there is two limits:
|
/// there is two limits:
|
||||||
|
|||||||
@@ -61,7 +61,9 @@ use crate::error::{
|
|||||||
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
|
||||||
};
|
};
|
||||||
use crate::metrics::{
|
use crate::metrics::{
|
||||||
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
|
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
|
||||||
|
METRIC_FLOW_ROWS,
|
||||||
};
|
};
|
||||||
use crate::{Error, FlowId};
|
use crate::{Error, FlowId};
|
||||||
|
|
||||||
@@ -371,6 +373,9 @@ impl BatchingTask {
|
|||||||
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
|
||||||
elapsed
|
elapsed
|
||||||
);
|
);
|
||||||
|
METRIC_FLOW_ROWS
|
||||||
|
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
|
||||||
|
.inc_by(*affected_rows as _);
|
||||||
} else if let Err(err) = &res {
|
} else if let Err(err) = &res {
|
||||||
warn!(
|
warn!(
|
||||||
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
|
||||||
@@ -410,6 +415,7 @@ impl BatchingTask {
|
|||||||
engine: QueryEngineRef,
|
engine: QueryEngineRef,
|
||||||
frontend_client: Arc<FrontendClient>,
|
frontend_client: Arc<FrontendClient>,
|
||||||
) {
|
) {
|
||||||
|
let flow_id_str = self.config.flow_id.to_string();
|
||||||
loop {
|
loop {
|
||||||
// first check if shutdown signal is received
|
// first check if shutdown signal is received
|
||||||
// if so, break the loop
|
// if so, break the loop
|
||||||
@@ -427,6 +433,9 @@ impl BatchingTask {
|
|||||||
Err(TryRecvError::Empty) => (),
|
Err(TryRecvError::Empty) => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
|
||||||
|
.with_label_values(&[&flow_id_str])
|
||||||
|
.inc();
|
||||||
|
|
||||||
let new_query = match self.gen_insert_plan(&engine).await {
|
let new_query = match self.gen_insert_plan(&engine).await {
|
||||||
Ok(new_query) => new_query,
|
Ok(new_query) => new_query,
|
||||||
@@ -473,6 +482,9 @@ impl BatchingTask {
|
|||||||
}
|
}
|
||||||
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
|
||||||
|
.with_label_values(&[&flow_id_str])
|
||||||
|
.inc();
|
||||||
match new_query {
|
match new_query {
|
||||||
Some(query) => {
|
Some(query) => {
|
||||||
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
|
||||||
|
|||||||
@@ -58,11 +58,32 @@ lazy_static! {
|
|||||||
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec =
|
||||||
|
register_gauge_vec!(
|
||||||
|
"greptime_flow_batching_engine_bulk_mark_time_window",
|
||||||
|
"flow batching engine query time window count marked by bulk inserts",
|
||||||
|
&["flow_id"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"greptime_flow_batching_start_query_count",
|
||||||
|
"flow batching engine started query count",
|
||||||
|
&["flow_id"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
|
||||||
|
register_int_counter_vec!(
|
||||||
|
"greptime_flow_batching_error_count",
|
||||||
|
"flow batching engine error count per flow id",
|
||||||
|
&["flow_id"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
|
||||||
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
|
||||||
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
|
||||||
"greptime_flow_processed_rows",
|
"greptime_flow_processed_rows",
|
||||||
"Count of rows flowing through the system",
|
"Count of rows flowing through the system.",
|
||||||
&["direction"]
|
&["direction"]
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use api::v1::flow::DirtyWindowRequests;
|
||||||
use api::v1::{RowDeleteRequests, RowInsertRequests};
|
use api::v1::{RowDeleteRequests, RowInsertRequests};
|
||||||
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
|
use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
|
||||||
use catalog::CatalogManagerRef;
|
use catalog::CatalogManagerRef;
|
||||||
@@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService {
|
|||||||
.map(Response::new)
|
.map(Response::new)
|
||||||
.map_err(to_status_with_last_err)
|
.map_err(to_status_with_last_err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_mark_dirty_time_window(
|
||||||
|
&self,
|
||||||
|
reqs: Request<DirtyWindowRequests>,
|
||||||
|
) -> Result<Response<FlowResponse>, Status> {
|
||||||
|
self.dual_engine
|
||||||
|
.batching_engine()
|
||||||
|
.handle_mark_dirty_time_window(reqs.into_inner())
|
||||||
|
.await
|
||||||
|
.map(Response::new)
|
||||||
|
.map_err(to_status_with_last_err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|||||||
@@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler;
|
|||||||
use servers::query_handler::sql::SqlQueryHandler;
|
use servers::query_handler::sql::SqlQueryHandler;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::{ensure, OptionExt, ResultExt};
|
use snafu::{ensure, OptionExt, ResultExt};
|
||||||
use table::metadata::TableId;
|
|
||||||
use table::table_name::TableName;
|
use table::table_name::TableName;
|
||||||
|
use table::TableRef;
|
||||||
|
|
||||||
use crate::error::{
|
use crate::error::{
|
||||||
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
|
CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu,
|
||||||
@@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance {
|
|||||||
|
|
||||||
async fn put_record_batch(
|
async fn put_record_batch(
|
||||||
&self,
|
&self,
|
||||||
table: &TableName,
|
table_name: &TableName,
|
||||||
table_id: &mut Option<TableId>,
|
table_ref: &mut Option<TableRef>,
|
||||||
decoder: &mut FlightDecoder,
|
decoder: &mut FlightDecoder,
|
||||||
data: FlightData,
|
data: FlightData,
|
||||||
) -> Result<AffectedRows> {
|
) -> Result<AffectedRows> {
|
||||||
let table_id = if let Some(table_id) = table_id {
|
let table = if let Some(table) = table_ref {
|
||||||
*table_id
|
table.clone()
|
||||||
} else {
|
} else {
|
||||||
let table = self
|
let table = self
|
||||||
.catalog_manager()
|
.catalog_manager()
|
||||||
.table(
|
.table(
|
||||||
&table.catalog_name,
|
&table_name.catalog_name,
|
||||||
&table.schema_name,
|
&table_name.schema_name,
|
||||||
&table.table_name,
|
&table_name.table_name,
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.context(CatalogSnafu)?
|
.context(CatalogSnafu)?
|
||||||
.with_context(|| TableNotFoundSnafu {
|
.with_context(|| TableNotFoundSnafu {
|
||||||
table_name: table.to_string(),
|
table_name: table_name.to_string(),
|
||||||
})?;
|
})?;
|
||||||
let id = table.table_info().table_id();
|
*table_ref = Some(table.clone());
|
||||||
*table_id = Some(id);
|
table
|
||||||
id
|
|
||||||
};
|
};
|
||||||
|
|
||||||
self.inserter
|
self.inserter
|
||||||
.handle_bulk_insert(table_id, decoder, data)
|
.handle_bulk_insert(table, decoder, data)
|
||||||
.await
|
.await
|
||||||
.context(TableOperationSnafu)
|
.context(TableOperationSnafu)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,18 +12,29 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use ahash::{HashMap, HashMapExt};
|
use ahash::{HashMap, HashMapExt};
|
||||||
|
use api::v1::flow::DirtyWindowRequest;
|
||||||
use api::v1::region::{
|
use api::v1::region::{
|
||||||
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
|
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
|
||||||
};
|
};
|
||||||
use api::v1::ArrowIpc;
|
use api::v1::ArrowIpc;
|
||||||
|
use arrow::array::{
|
||||||
|
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
|
||||||
|
TimestampSecondArray,
|
||||||
|
};
|
||||||
|
use arrow::datatypes::{DataType, Int64Type, TimeUnit};
|
||||||
|
use arrow::record_batch::RecordBatch;
|
||||||
use common_base::AffectedRows;
|
use common_base::AffectedRows;
|
||||||
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
|
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
|
||||||
use common_grpc::FlightData;
|
use common_grpc::FlightData;
|
||||||
|
use common_telemetry::error;
|
||||||
use common_telemetry::tracing_context::TracingContext;
|
use common_telemetry::tracing_context::TracingContext;
|
||||||
use snafu::ResultExt;
|
use snafu::{OptionExt, ResultExt};
|
||||||
use store_api::storage::RegionId;
|
use store_api::storage::RegionId;
|
||||||
use table::metadata::TableId;
|
use table::metadata::TableInfoRef;
|
||||||
|
use table::TableRef;
|
||||||
|
|
||||||
use crate::insert::Inserter;
|
use crate::insert::Inserter;
|
||||||
use crate::{error, metrics};
|
use crate::{error, metrics};
|
||||||
@@ -32,10 +43,12 @@ impl Inserter {
|
|||||||
/// Handle bulk insert request.
|
/// Handle bulk insert request.
|
||||||
pub async fn handle_bulk_insert(
|
pub async fn handle_bulk_insert(
|
||||||
&self,
|
&self,
|
||||||
table_id: TableId,
|
table: TableRef,
|
||||||
decoder: &mut FlightDecoder,
|
decoder: &mut FlightDecoder,
|
||||||
data: FlightData,
|
data: FlightData,
|
||||||
) -> error::Result<AffectedRows> {
|
) -> error::Result<AffectedRows> {
|
||||||
|
let table_info = table.table_info();
|
||||||
|
let table_id = table_info.table_id();
|
||||||
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||||
.with_label_values(&["decode_request"])
|
.with_label_values(&["decode_request"])
|
||||||
.start_timer();
|
.start_timer();
|
||||||
@@ -48,6 +61,10 @@ impl Inserter {
|
|||||||
return Ok(0);
|
return Ok(0);
|
||||||
};
|
};
|
||||||
decode_timer.observe_duration();
|
decode_timer.observe_duration();
|
||||||
|
|
||||||
|
// notify flownode to update dirty timestamps if flow is configured.
|
||||||
|
self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
|
||||||
|
|
||||||
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
|
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
|
||||||
metrics::BULK_REQUEST_ROWS
|
metrics::BULK_REQUEST_ROWS
|
||||||
.with_label_values(&["raw"])
|
.with_label_values(&["raw"])
|
||||||
@@ -216,4 +233,103 @@ impl Inserter {
|
|||||||
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
|
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
|
||||||
Ok(rows_inserted)
|
Ok(rows_inserted)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn maybe_update_flow_dirty_window(&self, table_info: TableInfoRef, record_batch: RecordBatch) {
|
||||||
|
let table_id = table_info.table_id();
|
||||||
|
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
|
||||||
|
let node_manager = self.node_manager.clone();
|
||||||
|
common_runtime::spawn_global(async move {
|
||||||
|
let result = table_flownode_set_cache
|
||||||
|
.get(table_id)
|
||||||
|
.await
|
||||||
|
.context(error::RequestInsertsSnafu);
|
||||||
|
let flownodes = match result {
|
||||||
|
Ok(flownodes) => flownodes.unwrap_or_default(),
|
||||||
|
Err(e) => {
|
||||||
|
error!(e; "Failed to get flownodes for table id: {}", table_id);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let peers: HashSet<_> = flownodes.values().cloned().collect();
|
||||||
|
if peers.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let Ok(timestamps) = extract_timestamps(
|
||||||
|
&record_batch,
|
||||||
|
&table_info
|
||||||
|
.meta
|
||||||
|
.schema
|
||||||
|
.timestamp_column()
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.name,
|
||||||
|
)
|
||||||
|
.inspect_err(|e| {
|
||||||
|
error!(e; "Failed to extract timestamps from record batch");
|
||||||
|
}) else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
for peer in peers {
|
||||||
|
let node_manager = node_manager.clone();
|
||||||
|
let timestamps = timestamps.clone();
|
||||||
|
common_runtime::spawn_global(async move {
|
||||||
|
if let Err(e) = node_manager
|
||||||
|
.flownode(&peer)
|
||||||
|
.await
|
||||||
|
.handle_mark_window_dirty(DirtyWindowRequest {
|
||||||
|
table_id,
|
||||||
|
timestamps,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.context(error::RequestInsertsSnafu)
|
||||||
|
{
|
||||||
|
error!(e; "Failed to mark timestamps as dirty, table: {}", table_id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
|
||||||
|
fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Result<Vec<i64>> {
|
||||||
|
let ts_col = rb
|
||||||
|
.column_by_name(timestamp_index_name)
|
||||||
|
.context(error::ColumnNotFoundSnafu {
|
||||||
|
msg: timestamp_index_name,
|
||||||
|
})?;
|
||||||
|
if rb.num_rows() == 0 {
|
||||||
|
return Ok(vec![]);
|
||||||
|
}
|
||||||
|
let primitive = match ts_col.data_type() {
|
||||||
|
DataType::Timestamp(unit, _) => match unit {
|
||||||
|
TimeUnit::Second => ts_col
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<TimestampSecondArray>()
|
||||||
|
.unwrap()
|
||||||
|
.reinterpret_cast::<Int64Type>(),
|
||||||
|
TimeUnit::Millisecond => ts_col
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<TimestampMillisecondArray>()
|
||||||
|
.unwrap()
|
||||||
|
.reinterpret_cast::<Int64Type>(),
|
||||||
|
TimeUnit::Microsecond => ts_col
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<TimestampMicrosecondArray>()
|
||||||
|
.unwrap()
|
||||||
|
.reinterpret_cast::<Int64Type>(),
|
||||||
|
TimeUnit::Nanosecond => ts_col
|
||||||
|
.as_any()
|
||||||
|
.downcast_ref::<TimestampNanosecondArray>()
|
||||||
|
.unwrap()
|
||||||
|
.reinterpret_cast::<Int64Type>(),
|
||||||
|
},
|
||||||
|
t => {
|
||||||
|
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(primitive.iter().flatten().collect())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -838,6 +838,13 @@ pub enum Error {
|
|||||||
location: Location,
|
location: Location,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
#[snafu(display("Invalid time index type: {}", ty))]
|
||||||
|
InvalidTimeIndexType {
|
||||||
|
ty: arrow::datatypes::DataType,
|
||||||
|
#[snafu(implicit)]
|
||||||
|
location: Location,
|
||||||
|
},
|
||||||
|
|
||||||
#[snafu(display("Invalid process id: {}", id))]
|
#[snafu(display("Invalid process id: {}", id))]
|
||||||
InvalidProcessId { id: String },
|
InvalidProcessId { id: String },
|
||||||
|
|
||||||
@@ -973,6 +980,7 @@ impl ErrorExt for Error {
|
|||||||
Error::ColumnOptions { source, .. } => source.status_code(),
|
Error::ColumnOptions { source, .. } => source.status_code(),
|
||||||
Error::DecodeFlightData { source, .. } => source.status_code(),
|
Error::DecodeFlightData { source, .. } => source.status_code(),
|
||||||
Error::ComputeArrow { .. } => StatusCode::Internal,
|
Error::ComputeArrow { .. } => StatusCode::Internal,
|
||||||
|
Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
|
||||||
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
|
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
|
||||||
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
|
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ pub struct Inserter {
|
|||||||
catalog_manager: CatalogManagerRef,
|
catalog_manager: CatalogManagerRef,
|
||||||
pub(crate) partition_manager: PartitionRuleManagerRef,
|
pub(crate) partition_manager: PartitionRuleManagerRef,
|
||||||
pub(crate) node_manager: NodeManagerRef,
|
pub(crate) node_manager: NodeManagerRef,
|
||||||
table_flownode_set_cache: TableFlownodeSetCacheRef,
|
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type InserterRef = Arc<Inserter>;
|
pub type InserterRef = Arc<Inserter>;
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ use futures_util::StreamExt;
|
|||||||
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
|
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
|
||||||
use session::hints::READ_PREFERENCE_HINT;
|
use session::hints::READ_PREFERENCE_HINT;
|
||||||
use snafu::{OptionExt, ResultExt};
|
use snafu::{OptionExt, ResultExt};
|
||||||
use table::metadata::TableId;
|
use table::TableRef;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::error::Error::UnsupportedAuthScheme;
|
use crate::error::Error::UnsupportedAuthScheme;
|
||||||
@@ -149,8 +149,8 @@ impl GreptimeRequestHandler {
|
|||||||
.clone()
|
.clone()
|
||||||
.unwrap_or_else(common_runtime::global_runtime);
|
.unwrap_or_else(common_runtime::global_runtime);
|
||||||
runtime.spawn(async move {
|
runtime.spawn(async move {
|
||||||
// Cached table id
|
// Cached table ref
|
||||||
let mut table_id: Option<TableId> = None;
|
let mut table_ref: Option<TableRef> = None;
|
||||||
|
|
||||||
let mut decoder = FlightDecoder::default();
|
let mut decoder = FlightDecoder::default();
|
||||||
while let Some(request) = stream.next().await {
|
while let Some(request) = stream.next().await {
|
||||||
@@ -169,7 +169,7 @@ impl GreptimeRequestHandler {
|
|||||||
|
|
||||||
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
|
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
|
||||||
let result = handler
|
let result = handler
|
||||||
.put_record_batch(&table_name, &mut table_id, &mut decoder, data)
|
.put_record_batch(&table_name, &mut table_ref, &mut decoder, data)
|
||||||
.await
|
.await
|
||||||
.inspect_err(|e| error!(e; "Failed to handle flight record batches"));
|
.inspect_err(|e| error!(e; "Failed to handle flight record batches"));
|
||||||
timer.observe_duration();
|
timer.observe_duration();
|
||||||
|
|||||||
@@ -23,8 +23,8 @@ use common_grpc::flight::FlightDecoder;
|
|||||||
use common_query::Output;
|
use common_query::Output;
|
||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
use table::metadata::TableId;
|
|
||||||
use table::table_name::TableName;
|
use table::table_name::TableName;
|
||||||
|
use table::TableRef;
|
||||||
|
|
||||||
use crate::error::{self, Result};
|
use crate::error::{self, Result};
|
||||||
|
|
||||||
@@ -45,8 +45,8 @@ pub trait GrpcQueryHandler {
|
|||||||
|
|
||||||
async fn put_record_batch(
|
async fn put_record_batch(
|
||||||
&self,
|
&self,
|
||||||
table: &TableName,
|
table_name: &TableName,
|
||||||
table_id: &mut Option<TableId>,
|
table_ref: &mut Option<TableRef>,
|
||||||
decoder: &mut FlightDecoder,
|
decoder: &mut FlightDecoder,
|
||||||
flight_data: FlightData,
|
flight_data: FlightData,
|
||||||
) -> std::result::Result<AffectedRows, Self::Error>;
|
) -> std::result::Result<AffectedRows, Self::Error>;
|
||||||
@@ -77,13 +77,13 @@ where
|
|||||||
|
|
||||||
async fn put_record_batch(
|
async fn put_record_batch(
|
||||||
&self,
|
&self,
|
||||||
table: &TableName,
|
table_name: &TableName,
|
||||||
table_id: &mut Option<TableId>,
|
table_ref: &mut Option<TableRef>,
|
||||||
decoder: &mut FlightDecoder,
|
decoder: &mut FlightDecoder,
|
||||||
data: FlightData,
|
data: FlightData,
|
||||||
) -> Result<AffectedRows> {
|
) -> Result<AffectedRows> {
|
||||||
self.0
|
self.0
|
||||||
.put_record_batch(table, table_id, decoder, data)
|
.put_record_batch(table_name, table_ref, decoder, data)
|
||||||
.await
|
.await
|
||||||
.map_err(BoxedError::new)
|
.map_err(BoxedError::new)
|
||||||
.context(error::ExecuteGrpcRequestSnafu)
|
.context(error::ExecuteGrpcRequestSnafu)
|
||||||
|
|||||||
@@ -34,7 +34,6 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
|
|||||||
use session::context::QueryContextRef;
|
use session::context::QueryContextRef;
|
||||||
use snafu::ensure;
|
use snafu::ensure;
|
||||||
use sql::statements::statement::Statement;
|
use sql::statements::statement::Statement;
|
||||||
use table::metadata::TableId;
|
|
||||||
use table::table_name::TableName;
|
use table::table_name::TableName;
|
||||||
use table::TableRef;
|
use table::TableRef;
|
||||||
|
|
||||||
@@ -160,15 +159,11 @@ impl GrpcQueryHandler for DummyInstance {
|
|||||||
|
|
||||||
async fn put_record_batch(
|
async fn put_record_batch(
|
||||||
&self,
|
&self,
|
||||||
table: &TableName,
|
_table_name: &TableName,
|
||||||
table_id: &mut Option<TableId>,
|
_table_ref: &mut Option<TableRef>,
|
||||||
decoder: &mut FlightDecoder,
|
_decoder: &mut FlightDecoder,
|
||||||
data: FlightData,
|
_data: FlightData,
|
||||||
) -> std::result::Result<AffectedRows, Self::Error> {
|
) -> std::result::Result<AffectedRows, Self::Error> {
|
||||||
let _ = table;
|
|
||||||
let _ = data;
|
|
||||||
let _ = table_id;
|
|
||||||
let _ = decoder;
|
|
||||||
unimplemented!()
|
unimplemented!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user