feat(flow): query table schema&refactor (#3943)
* feat: get table info
* feat: remove new&unwrap
* chore: per PR advices
* chore: per review
@@ -128,6 +128,7 @@ impl TableInfoValue {
 }
 
 pub type TableInfoManagerRef = Arc<TableInfoManager>;
 
+#[derive(Clone)]
 pub struct TableInfoManager {
     kv_backend: KvBackendRef,
 }
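An illustration of what the Clone derive buys (hypothetical wiring, not part of this diff; `TableInfoManager::new` is the existing constructor over a KV backend):

    use std::sync::Arc;

    // Hypothetical: hand one manager to a consumer by value while keeping a
    // shared Arc-wrapped handle around via the TableInfoManagerRef alias.
    fn share_table_info(kv_backend: KvBackendRef) -> (TableInfoManager, TableInfoManagerRef) {
        let manager = TableInfoManager::new(kv_backend);
        let shared: TableInfoManagerRef = Arc::new(manager.clone());
        (manager, shared)
    }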
@@ -17,9 +17,16 @@
 
 pub(crate) mod error;
 pub(crate) mod node_context;
+mod table_source;
+mod util;
 
-pub(crate) use node_context::{FlowId, FlownodeContext, TableName};
+pub(crate) use node_context::FlownodeContext;
+pub(crate) use table_source::TableSource;
 
 mod worker;
+
+pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
+
+// TODO: refactor common types for flow to a separate module
+/// FlowId is a unique identifier for a flow task
+pub type FlowId = u32;
+pub type TableName = [String; 3];
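For orientation, a `TableName` is the fully qualified `[catalog, schema, table]` triple (see `get_table_id_from_name` in the new table_source.rs below). An illustrative value, using GreptimeDB's default catalog and schema:

    // Illustrative only: catalog, schema, and table parts of a TableName.
    let numbers: TableName = [
        "greptime".to_string(),
        "public".to_string(),
        "numbers".to_string(),
    ];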
@@ -98,13 +98,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Invalid query plan: {source}"))]
-    InvalidQueryPlan {
-        source: query::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Invalid query: prost can't decode substrait plan: {inner}"))]
     InvalidQueryProst {
         inner: api::DecodeError,
@@ -112,13 +105,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Invalid query, can't transform to substrait: {source}"))]
-    InvalidQuerySubstrait {
-        source: substrait::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Invalid query: {reason}"))]
     InvalidQuery {
         reason: String,
@@ -193,9 +179,7 @@ impl ErrorExt for Error {
             Self::TableNotFound { .. }
             | Self::TableNotFoundMeta { .. }
             | Self::FlowNotFound { .. } => StatusCode::TableNotFound,
-            Self::InvalidQueryPlan { .. }
-            | Self::InvalidQuerySubstrait { .. }
-            | Self::InvalidQueryProst { .. }
+            Self::InvalidQueryProst { .. }
             | Self::InvalidQuery { .. }
             | Self::Plan { .. }
             | Self::Datatypes { .. } => StatusCode::PlanQuery,
@@ -17,32 +17,18 @@
 use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
 use std::sync::Arc;
 
+use common_telemetry::debug;
 use session::context::QueryContext;
 use snafu::{OptionExt, ResultExt};
 use table::metadata::TableId;
 use tokio::sync::{broadcast, mpsc};
 
 use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu};
+use crate::adapter::{FlowId, TableName, TableSource};
 use crate::expr::error::InternalSnafu;
 use crate::expr::GlobalId;
 use crate::repr::{DiffRow, RelationType, BROADCAST_CAP};
 
-// TODO: refactor common types for flow to a separate module
-/// FlowId is a unique identifier for a flow task
-pub type FlowId = u64;
-pub type TableName = [String; 3];
-
-pub struct TableSource {}
-
-impl TableSource {
-    pub async fn get_table_name_schema(
-        &self,
-        _table_id: &TableId,
-    ) -> Result<(TableName, RelationType), Error> {
-        todo!()
-    }
-}
-
 /// A context that holds the information of the dataflow
 #[derive(Default)]
 pub struct FlownodeContext {
@@ -53,7 +39,7 @@ pub struct FlownodeContext {
     /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
     ///
     /// Note that we are getting insert requests with table id, so we should use table id as the key
-    pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
+    pub source_sender: BTreeMap<TableId, SourceSender>,
     /// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
     ///
     /// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
@@ -74,39 +60,91 @@ pub struct FlownodeContext {
     pub query_context: Option<Arc<QueryContext>>,
 }
 
-impl FlownodeContext {
-    // return number of rows it actual send(including what's in the buffer)
-    pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
-        let sender = self
-            .source_sender
-            .get(&table_id)
-            .with_context(|| TableNotFoundSnafu {
-                name: table_id.to_string(),
-            })?;
-        let send_buffer = self.send_buffer.entry(table_id).or_default();
-        send_buffer.extend(rows);
-        let mut row_cnt = 0;
-        while let Some(row) = send_buffer.pop_front() {
-            if sender.len() >= BROADCAST_CAP {
-                break;
-            }
-            row_cnt += 1;
-            sender
-                .send(row)
-                .map_err(|err| {
-                    InternalSnafu {
-                        reason: format!(
-                            "Failed to send row to table_id = {:?}, error = {:?}",
-                            table_id, err
-                        ),
-                    }
-                    .build()
-                })
-                .with_context(|_| EvalSnafu)?;
-        }
-        Ok(row_cnt)
-    }
-}
+/// a simple broadcast sender with backpressure and unbounded capacity
+///
+/// the receiver still uses a tokio broadcast channel, since only the sender side
+/// needs to observe backpressure and adjust the dataflow's running duration to
+/// avoid blocking
+pub struct SourceSender {
+    sender: broadcast::Sender<DiffRow>,
+    send_buf: VecDeque<DiffRow>,
+}
+
+impl Default for SourceSender {
+    fn default() -> Self {
+        Self {
+            sender: broadcast::Sender::new(BROADCAST_CAP),
+            send_buf: Default::default(),
+        }
+    }
+}
+
+impl SourceSender {
+    pub fn get_receiver(&self) -> broadcast::Receiver<DiffRow> {
+        self.sender.subscribe()
+    }
+
+    /// send as many rows as possible from the send buffer,
+    /// until the buffer is empty or the broadcast channel is full
+    pub fn try_send_all(&mut self) -> Result<usize, Error> {
+        let mut row_cnt = 0;
+        loop {
+            // if the inner sender channel is full or the send buffer is empty,
+            // there is nothing to do for now, just break
+            if self.sender.len() >= BROADCAST_CAP || self.send_buf.is_empty() {
+                break;
+            }
+            if let Some(row) = self.send_buf.pop_front() {
+                self.sender
+                    .send(row)
+                    .map_err(|err| {
+                        InternalSnafu {
+                            reason: format!("Failed to send row, error = {:?}", err),
+                        }
+                        .build()
+                    })
+                    .with_context(|_| EvalSnafu)?;
+                row_cnt += 1;
+            }
+        }
+        if row_cnt > 0 {
+            debug!("Send {} rows", row_cnt);
+        }
+
+        Ok(row_cnt)
+    }
+
+    /// return the number of rows actually sent (including what was in the buffer)
+    pub fn send_rows(&mut self, rows: Vec<DiffRow>) -> Result<usize, Error> {
+        self.send_buf.extend(rows);
+
+        let row_cnt = self.try_send_all()?;
+
+        Ok(row_cnt)
+    }
+}
+
+impl FlownodeContext {
+    /// return the number of rows actually sent (including what was in the buffer)
+    ///
+    /// TODO(discord9): make this concurrent
+    pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
+        let sender = self
+            .source_sender
+            .get_mut(&table_id)
+            .with_context(|| TableNotFoundSnafu {
+                name: table_id.to_string(),
+            })?;
+        // debug!("FlownodeContext::send: trying to send {} rows", rows.len());
+        sender.send_rows(rows)
+    }
+
+    /// flush every sender's buffer
+    pub fn flush_all_sender(&mut self) -> Result<usize, Error> {
+        self.source_sender
+            .iter_mut()
+            .map(|(_table_id, src_sender)| src_sender.try_send_all())
+            .try_fold(0, |acc, x| x.map(|x| x + acc))
+    }
+}
 
 impl FlownodeContext {
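A minimal usage sketch of the new sender (hypothetical glue code, not from this diff; `ctx`, `global_id`, `table_id`, and `rows` are assumed to come from the surrounding flownode setup): writes go through `FlownodeContext::send`, which buffers rows in the per-table `SourceSender`, and a later `flush_all_sender` drains whatever the bounded broadcast channel could not accept:

    // Hypothetical driver: subscribe the dataflow side first, then feed writes.
    let mut receiver = ctx.get_source_by_global_id(&global_id)?.get_receiver();
    ctx.send(table_id, rows)?;   // buffers everything, sends what fits in the channel
    ctx.flush_all_sender()?;     // called again on later ticks to drain the buffer
    while let Ok((row, ts, diff)) = receiver.try_recv() {
        // feed the dataflow with (Row, Timestamp, Diff) updates
    }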
@@ -120,7 +158,7 @@ impl FlownodeContext {
         sink_table_name: TableName,
     ) {
         for source_table_id in source_table_ids {
-            self.add_source_sender(*source_table_id);
+            self.add_source_sender_if_not_exist(*source_table_id);
             self.source_to_tasks
                 .entry(*source_table_id)
                 .or_default()
@@ -131,10 +169,9 @@ impl FlownodeContext {
         self.flow_to_sink.insert(task_id, sink_table_name);
     }
 
-    pub fn add_source_sender(&mut self, table_id: TableId) {
-        self.source_sender
-            .entry(table_id)
-            .or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
+    /// try to add a source sender; if one already exists, do nothing
+    pub fn add_source_sender_if_not_exist(&mut self, table_id: TableId) {
+        let _sender = self.source_sender.entry(table_id).or_default();
     }
 
     pub fn add_sink_receiver(&mut self, table_name: TableName) {
@@ -143,10 +180,7 @@ impl FlownodeContext {
             .or_insert_with(mpsc::unbounded_channel::<DiffRow>);
     }
 
-    pub fn get_source_by_global_id(
-        &self,
-        id: &GlobalId,
-    ) -> Result<&broadcast::Sender<DiffRow>, Error> {
+    pub fn get_source_by_global_id(&self, id: &GlobalId) -> Result<&SourceSender, Error> {
         let table_id = self
             .table_repr
             .get_by_global_id(id)
src/flow/src/adapter/table_source.rs (new file, 148 lines)
@@ -0,0 +1,148 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! How to query table information from the database

use common_error::ext::BoxedError;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;

use crate::adapter::error::{
    Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::adapter::TableName;
use crate::repr::{self, ColumnType, RelationType};

/// Table name <-> table id mappings are queried from the table info manager.
pub struct TableSource {
    /// for querying the `TableId -> TableName` mapping
    table_info_manager: TableInfoManager,
    table_name_manager: TableNameManager,
}

impl TableSource {
    pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self {
        TableSource {
            table_info_manager,
            table_name_manager,
        }
    }

    pub async fn get_table_id_from_proto_name(
        &self,
        name: &greptime_proto::v1::TableName,
    ) -> Result<TableId, Error> {
        self.table_name_manager
            .get(TableNameKey::new(
                &name.catalog_name,
                &name.schema_name,
                &name.table_name,
            ))
            .await
            .with_context(|_| TableNotFoundMetaSnafu {
                msg: format!("Table name = {:?}, couldn't find table id", name),
            })?
            .with_context(|| UnexpectedSnafu {
                reason: format!("Table name = {:?}, couldn't find table id", name),
            })
            .map(|id| id.table_id())
    }

    /// If the table hasn't been created in the database yet, the returned table id is `None`.
    pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
        let ret = self
            .table_name_manager
            .get(TableNameKey::new(&name[0], &name[1], &name[2]))
            .await
            .with_context(|_| TableNotFoundMetaSnafu {
                msg: format!("Table name = {:?}, couldn't find table id", name),
            })?
            .map(|id| id.table_id());
        Ok(ret)
    }

    /// query metasrv for the table name of the given table id
    pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
        self.table_info_manager
            .get(*table_id)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?
            .with_context(|| UnexpectedSnafu {
                reason: format!("Table id = {:?}, couldn't find table name", table_id),
            })
            .map(|name| name.table_name())
            .map(|name| [name.catalog_name, name.schema_name, name.table_name])
    }

    /// query metasrv for the `TableInfoValue` of the given table id
    pub async fn get_table_info_value(
        &self,
        table_id: &TableId,
    ) -> Result<Option<TableInfoValue>, Error> {
        Ok(self
            .table_info_manager
            .get(*table_id)
            .await
            .with_context(|_| TableNotFoundMetaSnafu {
                msg: format!("TableId = {:?}, couldn't find table name", table_id),
            })?
            .map(|v| v.into_inner()))
    }

    pub async fn get_table_name_schema(
        &self,
        table_id: &TableId,
    ) -> Result<(TableName, RelationType), Error> {
        let table_info_value = self
            .get_table_info_value(table_id)
            .await?
            .with_context(|| TableNotFoundSnafu {
                name: format!("TableId = {:?}, can't find table info", table_id),
            })?;

        let table_name = table_info_value.table_name();
        let table_name = [
            table_name.catalog_name,
            table_name.schema_name,
            table_name.table_name,
        ];

        let raw_schema = table_info_value.table_info.meta.schema;
        let column_types = raw_schema
            .column_schemas
            .into_iter()
            .map(|col| ColumnType {
                nullable: col.is_nullable(),
                scalar_type: col.data_type,
            })
            .collect_vec();

        let key = table_info_value.table_info.meta.primary_key_indices;
        let keys = vec![repr::Key::from(key)];

        let time_index = raw_schema.timestamp_index;
        Ok((
            table_name,
            RelationType {
                column_types,
                keys,
                time_index,
            },
        ))
    }
}
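To illustrate how this resolver is meant to be driven (a hedged sketch; the calling context is assumed and not part of the diff):

    use table::metadata::TableId;

    // Hypothetical caller: resolve name + schema for one source table of a flow.
    async fn describe_source(source: &TableSource, table_id: TableId) -> Result<(), Error> {
        let (name, schema) = source.get_table_name_schema(&table_id).await?;
        // `name` is [catalog, schema, table]; `schema` carries column types,
        // key columns, and the time index for the flow planner.
        let [catalog, schema_name, table] = name;
        println!(
            "resolved {catalog}.{schema_name}.{table}: {} columns",
            schema.column_types.len()
        );
        Ok(())
    }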
src/flow/src/adapter/util.rs (new file, 60 lines)
@@ -0,0 +1,60 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::helper::ColumnDataTypeWrapper;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
use common_error::ext::BoxedError;
use datatypes::schema::ColumnSchema;
use itertools::Itertools;
use snafu::ResultExt;

use crate::adapter::error::{Error, ExternalSnafu};

/// convert a list of `ColumnSchema`s to their corresponding proto type
pub fn column_schemas_to_proto(
    column_schemas: Vec<ColumnSchema>,
    primary_keys: &[String],
) -> Result<Vec<api::v1::ColumnSchema>, Error> {
    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
        .iter()
        .map(|c| {
            ColumnDataTypeWrapper::try_from(c.data_type.clone())
                .map(|w| w.to_parts())
                .map_err(BoxedError::new)
                .context(ExternalSnafu)
        })
        .try_collect()?;

    let ret = column_schemas
        .iter()
        .zip(column_datatypes)
        .map(|(schema, datatype)| {
            // time index -> Timestamp, primary key -> Tag, otherwise -> Field
            let semantic_type = if schema.is_time_index() {
                SemanticType::Timestamp
            } else if primary_keys.contains(&schema.name) {
                SemanticType::Tag
            } else {
                SemanticType::Field
            } as i32;

            api::v1::ColumnSchema {
                column_name: schema.name.clone(),
                datatype: datatype.0 as i32,
                semantic_type,
                datatype_extension: datatype.1,
            }
        })
        .collect();
    Ok(ret)
}
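A short usage sketch for the converter (column names and types invented for illustration; assumes GreptimeDB's `datatypes` API as used elsewhere in the codebase):

    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    fn example_proto_schema() -> Result<Vec<api::v1::ColumnSchema>, Error> {
        // invented columns: a time index, one tag ("host"), one field ("cpu")
        let columns = vec![
            ColumnSchema::new("ts", ConcreteDataType::timestamp_millisecond_datatype(), false)
                .with_time_index(true),
            ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
            ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
        ];
        // "host" is the only primary key, so it becomes SemanticType::Tag
        column_schemas_to_proto(columns, &["host".to_string()])
    }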
@@ -20,10 +20,10 @@ use std::sync::Arc;
 
 use enum_as_inner::EnumAsInner;
 use hydroflow::scheduled::graph::Hydroflow;
-use snafu::{ensure, OptionExt, ResultExt};
+use snafu::{ensure, OptionExt};
 use tokio::sync::{broadcast, mpsc, Mutex};
 
-use crate::adapter::error::{Error, EvalSnafu, FlowAlreadyExistSnafu, InternalSnafu};
+use crate::adapter::error::{Error, FlowAlreadyExistSnafu, InternalSnafu};
 use crate::adapter::FlowId;
 use crate::compute::{Context, DataflowState, ErrCollector};
 use crate::expr::GlobalId;
@@ -151,6 +151,8 @@ impl WorkerHandle {
     /// trigger running the worker; this does not block, and the worker runs in parallel
     ///
     /// will set the current timestamp to `now` for all dataflows before running them
+    ///
+    /// the returned error is unrecoverable, and the worker should be shut down/rebooted
     pub async fn run_available(&self, now: repr::Timestamp) -> Result<(), Error> {
         self.itc_client
             .lock()
@@ -51,8 +51,9 @@ pub type DiffRow = (Row, Timestamp, Diff);
 /// Row with key-value pair, timestamp and diff
 pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
 
-/// broadcast channel capacity
-pub const BROADCAST_CAP: usize = 1024;
+/// broadcast channel capacity; this matters for memory consumption, since it bounds
+/// how many updates can be buffered in memory across the entire dataflow
+pub const BROADCAST_CAP: usize = 8192;
 
 /// Convert a value that is or can be converted to Datetime to internal timestamp
 ///
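For intuition on why `SourceSender` checks `len() >= BROADCAST_CAP` instead of just calling `send` (a standalone tokio sketch, not GreptimeDB code): a full tokio broadcast channel never blocks the sender; it evicts the oldest value, and a lagging receiver then sees `RecvError::Lagged`, i.e. data loss rather than backpressure.

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = broadcast::channel::<u64>(2); // tiny capacity for the demo
        for i in 0..4 {
            tx.send(i).unwrap(); // never blocks; values 0 and 1 get evicted
        }
        // the receiver observes the overflow as a Lagged error, not as backpressure
        assert!(matches!(
            rx.recv().await,
            Err(broadcast::error::RecvError::Lagged(2))
        ));
        assert_eq!(rx.recv().await.unwrap(), 2);
    }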
@@ -35,8 +35,8 @@ use substrait_proto::proto::extensions::simple_extension_declaration::MappingType;
 use substrait_proto::proto::extensions::SimpleExtensionDeclaration;
 
 use crate::adapter::error::{
-    Error, ExternalSnafu, InvalidQueryPlanSnafu, InvalidQueryProstSnafu,
-    InvalidQuerySubstraitSnafu, NotImplementedSnafu, TableNotFoundSnafu, UnexpectedSnafu,
+    Error, ExternalSnafu, InvalidQueryProstSnafu, NotImplementedSnafu, TableNotFoundSnafu,
+    UnexpectedSnafu,
 };
 use crate::adapter::FlownodeContext;
 use crate::expr::GlobalId;
@@ -110,12 +110,15 @@ pub async fn sql_to_flow_plan(
         }
         .build()
     })?;
-    let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).context(InvalidQueryPlanSnafu)?;
+    let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx)
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?;
     let plan = engine
         .planner()
         .plan(stmt, query_ctx)
         .await
-        .context(InvalidQueryPlanSnafu)?;
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?;
     let LogicalPlan::DfPlan(plan) = plan;
     let sub_plan = DFLogicalSubstraitConvertor {}
        .to_sub_plan(&plan)
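The shape of this refactor is generic: instead of one error variant per upstream crate (`InvalidQueryPlan`, `InvalidQuerySubstrait`), the source error is boxed and funneled into a single external variant. A minimal self-contained sketch of the pattern (using plain std boxing, whereas the codebase uses `common_error::ext::BoxedError`):

    use snafu::{ResultExt, Snafu};

    type BoxedError = Box<dyn std::error::Error + Send + Sync>;

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("External error: {source}"))]
        External { source: BoxedError },
    }

    fn parse_port(input: &str) -> Result<u16, Error> {
        // any concrete parse error is boxed into the one External variant
        input
            .trim()
            .parse::<u16>()
            .map_err(|e| Box::new(e) as BoxedError)
            .context(ExternalSnafu)
    }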