feat: FLOWS table in information_schema&SHOW FLOWS (#4386)

* feat(WIP): flow info table

refactor: better err handling&log

feat: add flow metadata to info schema provider

feat(WIP): info_schema.flows

feat: info_schema.flows table

* fix: err after rebase

* fix: wrong comparsion op

* feat: SHOW FLOWS&tests

* refactor: per review

* chore: unused

* refactor: json error

* chore: per review

* test: sqlness

* chore: rm inline error

* refactor: per review
This commit is contained in:
discord9
2024-07-19 17:29:36 +08:00
committed by GitHub
parent ce900e850a
commit 9fa9156bde
24 changed files with 690 additions and 21 deletions

View File

@@ -57,6 +57,31 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to list flows in catalog {catalog}"))]
ListFlows {
#[snafu(implicit)]
location: Location,
catalog: String,
source: BoxedError,
},
#[snafu(display("Flow info not found: {flow_name} in catalog {catalog_name}"))]
FlowInfoNotFound {
flow_name: String,
catalog_name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Can't convert value to json, input={input}"))]
Json {
input: String,
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to re-compile script due to internal error"))]
CompileScriptInternal {
#[snafu(implicit)]
@@ -254,12 +279,15 @@ impl ErrorExt for Error {
| Error::FindPartitions { .. }
| Error::FindRegionRoutes { .. }
| Error::CacheNotFound { .. }
| Error::CastManager { .. } => StatusCode::Unexpected,
| Error::CastManager { .. }
| Error::Json { .. } => StatusCode::Unexpected,
Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
Error::ViewInfoNotFound { .. } => StatusCode::TableNotFound,
Error::FlowInfoNotFound { .. } => StatusCode::FlowNotFound,
Error::SystemCatalog { .. } => StatusCode::StorageUnavailable,
Error::UpgradeWeakCatalogManagerRef { .. } => StatusCode::Internal,
@@ -270,7 +298,8 @@ impl ErrorExt for Error {
Error::ListCatalogs { source, .. }
| Error::ListNodes { source, .. }
| Error::ListSchemas { source, .. }
| Error::ListTables { source, .. } => source.status_code(),
| Error::ListTables { source, .. }
| Error::ListFlows { source, .. } => source.status_code(),
Error::CreateTable { source, .. } => source.status_code(),

View File

@@ -25,6 +25,7 @@ use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
@@ -85,7 +86,7 @@ impl KvBackendCatalogManager {
.get()
.expect("Failed to get table_route_cache"),
)),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
@@ -93,11 +94,13 @@ impl KvBackendCatalogManager {
information_schema_provider: Arc::new(InformationSchemaProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
Arc::new(FlowMetadataManager::new(backend.clone())),
)),
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
backend,
},
cache_registry,
})
@@ -313,6 +316,7 @@ struct SystemCatalog {
// system_schema_provier for default catalog
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,
}
impl SystemCatalog {
@@ -358,6 +362,7 @@ impl SystemCatalog {
Arc::new(InformationSchemaProvider::new(
catalog.to_string(),
self.catalog_manager.clone(),
Arc::new(FlowMetadataManager::new(self.backend.clone())),
))
});
information_schema_provider.table(table_name)

View File

@@ -23,6 +23,8 @@ use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use futures_util::stream::BoxStream;
use snafu::OptionExt;
use table::TableRef;
@@ -298,6 +300,7 @@ impl MemoryCatalogManager {
let information_schema_provider = InformationSchemaProvider::new(
catalog,
Arc::downgrade(self) as Weak<dyn CatalogManager>,
Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
);
let information_schema = information_schema_provider.tables().clone();

View File

@@ -14,6 +14,7 @@
mod cluster_info;
pub mod columns;
pub mod flows;
mod information_memory_table;
pub mod key_column_usage;
mod partitions;
@@ -31,6 +32,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Weak};
use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
use common_meta::key::flow::FlowMetadataManager;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
@@ -46,6 +48,7 @@ use self::columns::InformationSchemaColumns;
use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef};
use crate::error::Result;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
@@ -104,6 +107,7 @@ macro_rules! setup_memory_table {
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
tables: HashMap<String, TableRef>,
}
@@ -182,16 +186,25 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
FLOWS => Some(Arc::new(InformationSchemaFlows::new(
self.catalog_name.clone(),
self.flow_metadata_manager.clone(),
)) as _),
_ => None,
}
}
}
impl InformationSchemaProvider {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
pub fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
) -> Self {
let mut provider = Self {
catalog_name,
catalog_manager,
flow_metadata_manager,
tables: HashMap::new(),
};
@@ -238,6 +251,7 @@ impl InformationSchemaProvider {
TABLE_CONSTRAINTS.to_string(),
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
// Add memory tables
for name in MEMORY_TABLES.iter() {

View File

@@ -0,0 +1,305 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::error::{
CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu, Result,
};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
const INIT_CAPACITY: usize = 42;
// rows of information_schema.flows
// pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
pub const EXPIRE_AFTER: &str = "expire_after";
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
/// The `information_schema.flows` to provides information about flows in databases.
pub(super) struct InformationSchemaFlows {
schema: SchemaRef,
catalog_name: String,
flow_metadata_manager: Arc<FlowMetadataManager>,
}
impl InformationSchemaFlows {
pub(super) fn new(
catalog_name: String,
flow_metadata_manager: Arc<FlowMetadataManager>,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
flow_metadata_manager,
}
}
/// for complex fields(including [`SOURCE_TABLE_IDS`], [`FLOWNODE_IDS`] and [`OPTIONS`]), it will be serialized to json string for now
/// TODO(discord9): use a better way to store complex fields like json type
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(
vec![
(FLOW_NAME, CDT::string_datatype(), false),
(FLOW_ID, CDT::uint32_datatype(), false),
(TABLE_CATALOG, CDT::string_datatype(), false),
(FLOW_DEFINITION, CDT::string_datatype(), false),
(COMMENT, CDT::string_datatype(), true),
(EXPIRE_AFTER, CDT::int64_datatype(), true),
(SOURCE_TABLE_IDS, CDT::string_datatype(), true),
(SINK_TABLE_NAME, CDT::string_datatype(), false),
(FLOWNODE_IDS, CDT::string_datatype(), true),
(OPTIONS, CDT::string_datatype(), true),
]
.into_iter()
.map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
.collect(),
))
}
fn builder(&self) -> InformationSchemaFlowsBuilder {
InformationSchemaFlowsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
&self.flow_metadata_manager,
)
}
}
impl InformationTable for InformationSchemaFlows {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_FLOW_TABLE_ID
}
fn table_name(&self) -> &'static str {
FLOWS
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_flows(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
/// Builds the `information_schema.FLOWS` table row by row
///
/// columns are based on [`FlowInfoValue`]
struct InformationSchemaFlowsBuilder {
schema: SchemaRef,
catalog_name: String,
flow_metadata_manager: Arc<FlowMetadataManager>,
flow_names: StringVectorBuilder,
flow_ids: UInt32VectorBuilder,
table_catalogs: StringVectorBuilder,
raw_sqls: StringVectorBuilder,
comments: StringVectorBuilder,
expire_afters: Int64VectorBuilder,
source_table_id_groups: StringVectorBuilder,
sink_table_names: StringVectorBuilder,
flownode_id_groups: StringVectorBuilder,
option_groups: StringVectorBuilder,
}
impl InformationSchemaFlowsBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
flow_metadata_manager: &Arc<FlowMetadataManager>,
) -> Self {
Self {
schema,
catalog_name,
flow_metadata_manager: flow_metadata_manager.clone(),
flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
/// Construct the `information_schema.flows` virtual table
async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let predicates = Predicates::from_scan_request(&request);
let flow_info_manager = self.flow_metadata_manager.clone();
// TODO(discord9): use `AsyncIterator` once it's stable-ish
let mut stream = flow_info_manager
.flow_name_manager()
.flow_names(&catalog_name)
.await;
while let Some((flow_name, flow_id)) = stream
.try_next()
.await
.map_err(BoxedError::new)
.context(ListFlowsSnafu {
catalog: &catalog_name,
})?
{
let flow_info = flow_info_manager
.flow_info_manager()
.get(flow_id.flow_id())
.await
.map_err(BoxedError::new)
.context(InternalSnafu)?
.context(FlowInfoNotFoundSnafu {
catalog_name: catalog_name.to_string(),
flow_name: flow_name.to_string(),
})?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info)?;
}
self.finish()
}
fn add_flow(
&mut self,
predicates: &Predicates,
flow_id: FlowId,
flow_info: FlowInfoValue,
) -> Result<()> {
let row = [
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
(FLOW_ID, &Value::from(flow_id)),
(
TABLE_CATALOG,
&Value::from(flow_info.catalog_name().to_string()),
),
];
if !predicates.eval(&row) {
return Ok(());
}
self.flow_names.push(Some(flow_info.flow_name()));
self.flow_ids.push(Some(flow_id));
self.table_catalogs.push(Some(flow_info.catalog_name()));
self.raw_sqls.push(Some(flow_info.raw_sql()));
self.comments.push(Some(flow_info.comment()));
self.expire_afters.push(flow_info.expire_after());
self.source_table_id_groups.push(Some(
&serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
input: format!("{:?}", flow_info.source_table_ids()),
})?,
));
self.sink_table_names
.push(Some(&flow_info.sink_table_name().to_string()));
self.flownode_id_groups.push(Some(
&serde_json::to_string(flow_info.flownode_ids()).context({
JsonSnafu {
input: format!("{:?}", flow_info.flownode_ids()),
}
})?,
));
self.option_groups
.push(Some(&serde_json::to_string(flow_info.options()).context(
JsonSnafu {
input: format!("{:?}", flow_info.options()),
},
)?));
Ok(())
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.flow_names.finish()),
Arc::new(self.flow_ids.finish()),
Arc::new(self.table_catalogs.finish()),
Arc::new(self.raw_sqls.finish()),
Arc::new(self.comments.finish()),
Arc::new(self.expire_afters.finish()),
Arc::new(self.source_table_id_groups.finish()),
Arc::new(self.sink_table_names.finish()),
Arc::new(self.flownode_id_groups.finish()),
Arc::new(self.option_groups.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
impl DfPartitionStream for InformationSchemaFlows {
fn schema(&self) -> &arrow_schema::SchemaRef {
self.schema.arrow_schema()
}
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_flows(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}

View File

@@ -44,3 +44,4 @@ pub const REGION_PEERS: &str = "region_peers";
pub const TABLE_CONSTRAINTS: &str = "table_constraints";
pub const CLUSTER_INFO: &str = "cluster_info";
pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";

View File

@@ -96,6 +96,8 @@ pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30;
pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31;
/// id for information_schema.VIEWS
pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
/// id for information_schema.FLOWS
pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
/// ----- End of information_schema tables -----
/// ----- Begin of pg_catalog tables -----

View File

@@ -142,6 +142,10 @@ impl FlowInfoValue {
&self.source_table_ids
}
pub fn catalog_name(&self) -> &String {
&self.catalog_name
}
pub fn flow_name(&self) -> &String {
&self.flow_name
}
@@ -161,6 +165,10 @@ impl FlowInfoValue {
pub fn comment(&self) -> &String {
&self.comment
}
pub fn options(&self) -> &HashMap<String, String> {
&self.options
}
}
pub type FlowInfoManagerRef = Arc<FlowInfoManager>;

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -20,9 +23,14 @@ use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN};
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
const FLOW_NAME_KEY_PREFIX: &str = "name";
@@ -38,6 +46,7 @@ lazy_static! {
/// The layout: `__flow/name/{catalog_name}/{flow_name}`.
pub struct FlowNameKey<'a>(FlowScoped<FlowNameKeyInner<'a>>);
#[allow(dead_code)]
impl<'a> FlowNameKey<'a> {
/// Returns the [FlowNameKey]
pub fn new(catalog: &'a str, flow_name: &'a str) -> FlowNameKey<'a> {
@@ -45,13 +54,22 @@ impl<'a> FlowNameKey<'a> {
FlowNameKey(FlowScoped::new(inner))
}
#[cfg(test)]
pub fn range_start_key(catalog: &str) -> Vec<u8> {
let inner = BytesAdapter::from(Self::prefix(catalog).into_bytes());
FlowScoped::new(inner).to_bytes()
}
/// Return `name/{catalog}/` as prefix
pub fn prefix(catalog: &str) -> String {
format!("{}/{}/", FLOW_NAME_KEY_PREFIX, catalog)
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog_name
}
#[cfg(test)]
/// Return the `flow_name`
pub fn flow_name(&self) -> &str {
self.0.flow_name
@@ -140,6 +158,12 @@ impl FlowNameValue {
}
}
pub fn flow_name_decoder(kv: KeyValue) -> Result<(String, FlowNameValue)> {
let flow_name = FlowNameKey::from_bytes(&kv.key)?;
let flow_id = FlowNameValue::try_from_raw_value(&kv.value)?;
Ok((flow_name.flow_name().to_string(), flow_id))
}
/// The manager of [FlowNameKey].
pub struct FlowNameManager {
kv_backend: KvBackendRef,
@@ -162,6 +186,25 @@ impl FlowNameManager {
.transpose()
}
/// Return all flows' names and ids
pub async fn flow_names(
&self,
catalog: &str,
) -> BoxStream<'static, Result<(String, FlowNameValue)>> {
let start_key = FlowNameKey::range_start_key(catalog);
common_telemetry::debug!("flow_names: start_key: {:?}", start_key);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
Arc::new(flow_name_decoder),
);
Box::pin(stream)
}
/// Returns true if the `flow` exists.
pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
let key = FlowNameKey::new(catalog, flow);
@@ -212,4 +255,11 @@ mod tests {
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_name(), "my_task");
}
#[test]
fn test_key_start_range() {
assert_eq!(
b"__flow/name/greptime/".to_vec(),
FlowNameKey::range_start_key("greptime")
);
}
}

View File

@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
@@ -535,10 +535,10 @@ impl FlowWorkerManager {
} else {
(9 * avg_spd + cur_spd) / 10
};
debug!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
debug!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
}

View File

@@ -436,7 +436,11 @@ impl InterThreadCallServer {
fn from_send_error<T>(err: mpsc::error::SendError<T>) -> Error {
InternalSnafu {
reason: format!("InterThreadCallServer resp failed: {}", err),
// this `err` will simply display `channel closed`
reason: format!(
"Worker's receiver channel have been closed unexpected: {}",
err
),
}
.build()
}

View File

@@ -16,13 +16,14 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use common_telemetry::debug;
use datatypes::value::Value;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};
use crate::error::{Error, InvalidQuerySnafu};
use crate::expr::error::EvalError;
use crate::expr::error::{EvalError, InternalSnafu};
use crate::expr::{Id, InvalidArgumentSnafu, LocalId, ScalarExpr};
use crate::repr::{self, value_to_internal_ts, Diff, Row};
@@ -709,6 +710,18 @@ impl MfpPlan {
}
if Some(lower_bound) != upper_bound && !null_eval {
if self.mfp.mfp.projection.iter().any(|c| values.len() <= *c) {
debug!("values={:?}, mfp={:?}", &values, &self.mfp.mfp);
let err = InternalSnafu {
reason: format!(
"Index out of bound for mfp={:?} and values={:?}",
&self.mfp.mfp, &values
),
}
.build();
return ret_err(err);
}
// safety: already checked that `projection` is not out of bound
let res_row = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone()));
let upper_opt =
upper_bound.map(|upper_bound| Ok((res_row.clone(), upper_bound, -diff)));

View File

@@ -509,6 +509,9 @@ pub fn check_permission(
Statement::ShowViews(stmt) => {
validate_db_permission!(stmt, query_ctx);
}
Statement::ShowFlows(stmt) => {
validate_db_permission!(stmt, query_ctx);
}
Statement::ShowStatus(_stmt) => {}
Statement::DescribeTable(stmt) => {
validate_param(stmt.name(), query_ctx)?;

View File

@@ -138,6 +138,8 @@ impl StatementExecutor {
Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
let req = to_copy_table_request(stmt, query_ctx.clone())?;
match req.direction {

View File

@@ -23,7 +23,7 @@ use snafu::{OptionExt, ResultExt};
use sql::ast::Ident;
use sql::statements::create::Partitions;
use sql::statements::show::{
ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowIndex, ShowKind,
ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
ShowTableStatus, ShowTables, ShowVariables, ShowViews,
};
use table::metadata::TableType;
@@ -163,6 +163,17 @@ impl StatementExecutor {
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub(super) async fn show_flows(
&self,
stmt: ShowFlows,
query_ctx: QueryContextRef,
) -> Result<Output> {
query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
.await
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub async fn show_create_flow(
&self,

View File

@@ -18,7 +18,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use catalog::information_schema::{
columns, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS,
columns, flows, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS,
KEY_COLUMN_USAGE, SCHEMATA, TABLES, VIEWS,
};
use catalog::CatalogManagerRef;
@@ -54,8 +54,8 @@ use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::create::{CreateFlow, CreateView, Partitions};
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTableStatus, ShowTables, ShowVariables,
ShowViews,
ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowTableStatus, ShowTables,
ShowVariables, ShowViews,
};
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
@@ -71,6 +71,7 @@ const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const TABLES_COLUMN: &str = "Tables";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
@@ -763,6 +764,33 @@ pub async fn show_views(
.await
}
/// Execute [`ShowFlows`] statement and return the [`Output`] if success.
pub async fn show_flows(
stmt: ShowFlows,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
let like_field = Some(flows::FLOW_NAME);
let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
FLOWS,
vec![],
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
pub fn show_create_flow(
flow_name: ObjectName,
flow_val: FlowInfoValue,

View File

@@ -21,8 +21,8 @@ use crate::error::{
};
use crate::parser::ParserContext;
use crate::statements::show::{
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowIndex,
ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
};
use crate::statements::statement::Statement;
@@ -46,6 +46,8 @@ impl<'a> ParserContext<'a> {
}
} else if self.consume_token("VIEWS") {
self.parse_show_views()
} else if self.consume_token("FLOWS") {
self.parse_show_flows()
} else if self.matches_keyword(Keyword::CHARSET) {
self.parser.next_token();
Ok(Statement::ShowCharset(self.parse_show_kind()?))
@@ -442,7 +444,7 @@ impl<'a> ParserContext<'a> {
}));
}
// SHOW VIEWS [in | FROM] [DATABASE]
// SHOW FLOWS [in | FROM] [DATABASE]
Token::Word(w) => match w.keyword {
Keyword::IN | Keyword::FROM => self.parse_db_name()?,
_ => None,
@@ -454,6 +456,28 @@ impl<'a> ParserContext<'a> {
Ok(Statement::ShowViews(ShowViews { kind, database }))
}
fn parse_show_flows(&mut self) -> Result<Statement> {
let database = match self.parser.peek_token().token {
Token::EOF | Token::SemiColon => {
return Ok(Statement::ShowFlows(ShowFlows {
kind: ShowKind::All,
database: None,
}));
}
// SHOW FLOWS [in | FROM] [DATABASE]
Token::Word(w) => match w.keyword {
Keyword::IN | Keyword::FROM => self.parse_db_name()?,
_ => None,
},
_ => None,
};
let kind = self.parse_show_kind()?;
Ok(Statement::ShowFlows(ShowFlows { kind, database }))
}
}
#[cfg(test)]
@@ -1000,4 +1024,38 @@ mod tests {
);
assert_eq!(sql, stmts[0].to_string());
}
#[test]
pub fn test_show_flows() {
let sql = "SHOW FLOWS";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let stmts = result.unwrap();
assert_eq!(1, stmts.len());
assert_eq!(
stmts[0],
Statement::ShowFlows(ShowFlows {
kind: ShowKind::All,
database: None,
})
);
assert_eq!(sql, stmts[0].to_string());
}
#[test]
pub fn test_show_flows_in_db() {
let sql = "SHOW FLOWS IN d1";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let stmts = result.unwrap();
assert_eq!(1, stmts.len());
assert_eq!(
stmts[0],
Statement::ShowFlows(ShowFlows {
kind: ShowKind::All,
database: Some("d1".to_string()),
})
);
assert_eq!(sql, stmts[0].to_string());
}
}

View File

@@ -187,6 +187,25 @@ impl Display for ShowCreateFlow {
}
}
/// SQL structure for `SHOW FLOWS`.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct ShowFlows {
pub kind: ShowKind,
pub database: Option<String>,
}
impl Display for ShowFlows {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SHOW FLOWS")?;
if let Some(database) = &self.database {
write!(f, " IN {database}")?;
}
format_kind!(self, f);
Ok(())
}
}
/// SQL structure for `SHOW CREATE VIEW`.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct ShowCreateView {
@@ -535,6 +554,49 @@ SHOW VIEWS"#,
}
}
#[test]
fn test_display_show_flows() {
let sql = r"show flows in d1;";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::ShowFlows { .. });
match &stmts[0] {
Statement::ShowFlows(show) => {
let new_sql = format!("\n{}", show);
assert_eq!(
r#"
SHOW FLOWS IN d1"#,
&new_sql
);
}
_ => {
unreachable!();
}
}
let sql = r"show flows;";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::ShowFlows { .. });
match &stmts[0] {
Statement::ShowFlows(show) => {
let new_sql = format!("\n{}", show);
assert_eq!(
r#"
SHOW FLOWS"#,
&new_sql
);
}
_ => {
unreachable!("{:?}", &stmts[0]);
}
}
}
#[test]
fn test_display_show_databases() {
let sql = r"show databases;";

View File

@@ -31,8 +31,8 @@ use crate::statements::insert::Insert;
use crate::statements::query::Query;
use crate::statements::set_variables::SetVariables;
use crate::statements::show::{
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowIndex,
ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
};
use crate::statements::tql::Tql;
use crate::statements::truncate::TruncateTable;
@@ -87,6 +87,8 @@ pub enum Statement {
ShowCreateTable(ShowCreateTable),
// SHOW CREATE FLOW
ShowCreateFlow(ShowCreateFlow),
/// SHOW FLOWS
ShowFlows(ShowFlows),
// SHOW CREATE VIEW
ShowCreateView(ShowCreateView),
// SHOW STATUS
@@ -133,6 +135,7 @@ impl Display for Statement {
Statement::ShowIndex(s) => s.fmt(f),
Statement::ShowCreateTable(s) => s.fmt(f),
Statement::ShowCreateFlow(s) => s.fmt(f),
Statement::ShowFlows(s) => s.fmt(f),
Statement::ShowCreateView(s) => s.fmt(f),
Statement::ShowViews(s) => s.fmt(f),
Statement::ShowStatus(s) => s.fmt(f),

View File

@@ -13,6 +13,18 @@ create table out_num_cnt (
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+-----------+---------------+-----------------+
| flow_name | table_catalog | flow_definition |
+-----------+---------------+-----------------+
+-----------+---------------+-----------------+
SHOW FLOWS;
++
++
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
Affected Rows: 0
@@ -27,10 +39,38 @@ SHOW CREATE FLOW filter_numbers;
| | AS SELECT number FROM numbers_input WHERE number > 10 |
+----------------+-------------------------------------------------------+
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+----------------+---------------+----------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+----------------+---------------+----------------------------------------------------+
| filter_numbers | greptime | SELECT number FROM numbers_input WHERE number > 10 |
+----------------+---------------+----------------------------------------------------+
SHOW FLOWS;
+----------------+
| Flows |
+----------------+
| filter_numbers |
+----------------+
drop flow filter_numbers;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+-----------+---------------+-----------------+
| flow_name | table_catalog | flow_definition |
+-----------+---------------+-----------------+
+-----------+---------------+-----------------+
SHOW FLOWS;
++
++
drop table out_num_cnt;
Affected Rows: 0

View File

@@ -8,12 +8,25 @@ create table out_num_cnt (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SHOW FLOWS;
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
SHOW CREATE FLOW filter_numbers;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SHOW FLOWS;
drop flow filter_numbers;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SHOW FLOWS;
drop table out_num_cnt;
drop table numbers_input;

View File

@@ -41,6 +41,7 @@ SHOW TABLES;
| engines |
| events |
| files |
| flows |
| global_status |
| key_column_usage |
| optimizer_trace |
@@ -86,6 +87,7 @@ SHOW FULL TABLES;
| engines | LOCAL TEMPORARY |
| events | LOCAL TEMPORARY |
| files | LOCAL TEMPORARY |
| flows | LOCAL TEMPORARY |
| global_status | LOCAL TEMPORARY |
| key_column_usage | LOCAL TEMPORARY |
| optimizer_trace | LOCAL TEMPORARY |
@@ -125,6 +127,7 @@ SHOW TABLE STATUS;
|engines||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|events||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|files||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|flows||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|global_status||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|key_column_usage||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|optimizer_trace||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||

View File

@@ -26,6 +26,7 @@ order by table_schema, table_name;
|greptime|information_schema|engines|LOCALTEMPORARY|5|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|events|LOCALTEMPORARY|13|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|files|LOCALTEMPORARY|14|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|flows|LOCALTEMPORARY|33|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|global_status|LOCALTEMPORARY|25|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|key_column_usage|LOCALTEMPORARY|16|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|optimizer_trace|LOCALTEMPORARY|17|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
@@ -185,6 +186,16 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | files | update_count | 13 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | files | update_time | 34 | | | | | 3 | | | | | select,insert | | DateTime | datetime | FIELD | | No | datetime | | |
| greptime | information_schema | files | version | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | comment | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | expire_after | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
| greptime | information_schema | flows | flow_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | flow_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | flows | flow_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | flownode_ids | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | options | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | sink_table_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | source_table_ids | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | table_catalog | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | global_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | global_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | key_column_usage | column_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |

View File

@@ -83,6 +83,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
|greptime|information_schema|engines|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|events|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|files|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|flows|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|global_status|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|key_column_usage|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|public|numbers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID|test_engine|ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|