From 9fa9156bde0c96179032a0cd0252cf140d3bea15 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Fri, 19 Jul 2024 17:29:36 +0800
Subject: [PATCH] feat: `FLOWS` table in information_schema&SHOW FLOWS (#4386)

* feat(WIP): flow info table

refactor: better err handling&log

feat: add flow metadata to info schema provider

feat(WIP): info_schema.flows

feat: info_schema.flows table

* fix: err after rebase

* fix: wrong comparison op

* feat: SHOW FLOWS&tests

* refactor: per review

* chore: unused

* refactor: json error

* chore: per review

* test: sqlness

* chore: rm inline error

* refactor: per review
---
 src/catalog/src/error.rs                      |  33 +-
 src/catalog/src/kvbackend/manager.rs          |   7 +-
 src/catalog/src/memory/manager.rs             |   3 +
 .../src/system_schema/information_schema.rs   |  16 +-
 .../system_schema/information_schema/flows.rs | 305 ++++++++++++++++++
 .../information_schema/table_names.rs         |   1 +
 src/common/catalog/src/consts.rs              |   2 +
 src/common/meta/src/key/flow/flow_info.rs     |   8 +
 src/common/meta/src/key/flow/flow_name.rs     |  56 +++-
 src/flow/src/adapter.rs                       |   6 +-
 src/flow/src/adapter/worker.rs                |   6 +-
 src/flow/src/expr/linear.rs                   |  15 +-
 src/frontend/src/instance.rs                  |   3 +
 src/operator/src/statement.rs                 |   2 +
 src/operator/src/statement/show.rs            |  13 +-
 src/query/src/sql.rs                          |  34 +-
 src/sql/src/parsers/show_parser.rs            |  64 +++-
 src/sql/src/statements/show.rs                |  62 ++++
 src/sql/src/statements/statement.rs           |   7 +-
 .../common/flow/show_create_flow.result       |  40 +++
 .../common/flow/show_create_flow.sql          |  13 +
 .../common/show/show_databases_tables.result  |   3 +
 .../common/system/information_schema.result   |  11 +
 .../standalone/common/view/create.result      |   1 +
 24 files changed, 690 insertions(+), 21 deletions(-)
 create mode 100644 src/catalog/src/system_schema/information_schema/flows.rs

diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs
index 863c8c1151..fa4b469cce 100644
--- a/src/catalog/src/error.rs
+++ b/src/catalog/src/error.rs
@@ -57,6 +57,31 @@ pub enum Error {
         source: BoxedError,
     },
 
+    #[snafu(display("Failed to list flows in catalog {catalog}"))]
+    ListFlows {
+        #[snafu(implicit)]
+        location: Location,
+        catalog: String,
+        source: BoxedError,
+    },
+
+    #[snafu(display("Flow info not found: {flow_name} in catalog {catalog_name}"))]
+    FlowInfoNotFound {
+        flow_name: String,
+        catalog_name: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Can't convert value to json, input={input}"))]
+    Json {
+        input: String,
+        #[snafu(source)]
+        error: serde_json::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to re-compile script due to internal error"))]
     CompileScriptInternal {
         #[snafu(implicit)]
@@ -254,12 +279,15 @@ impl ErrorExt for Error {
            | Error::FindPartitions { .. }
            | Error::FindRegionRoutes { .. }
            | Error::CacheNotFound { .. }
-           | Error::CastManager { .. } => StatusCode::Unexpected,
+           | Error::CastManager { .. }
+           | Error::Json { .. } => StatusCode::Unexpected,
 
            Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
 
            Error::ViewInfoNotFound { .. } => StatusCode::TableNotFound,
 
+           Error::FlowInfoNotFound { .. } => StatusCode::FlowNotFound,
+
            Error::SystemCatalog { .. } => StatusCode::StorageUnavailable,
 
            Error::UpgradeWeakCatalogManagerRef { .. } => StatusCode::Internal,
@@ -270,7 +298,8 @@ impl ErrorExt for Error {
            Error::ListCatalogs { source, .. }
            | Error::ListNodes { source, .. }
            | Error::ListSchemas { source, .. }
-           | Error::ListTables { source, .. } => source.status_code(),
+           | Error::ListTables { source, .. }
+           | Error::ListFlows { source, .. } => source.status_code(),
 
            Error::CreateTable { source, .. } => source.status_code(),
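Note on the error plumbing: `ListFlows` and `Json` follow the crate's usual snafu
pattern, where a lower-level error is boxed and attached through a context selector.
A minimal self-contained sketch of that pattern, assuming snafu 0.7+ (`ListFlowsError`
and the `std::io::Error` stand-in are illustrative, not the crate's real types):

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("Failed to list flows in catalog {catalog}"))]
struct ListFlowsError {
    catalog: String,
    // In the patch this field is `source: BoxedError`; io::Error stands in here.
    source: std::io::Error,
}

fn list_flows(catalog: &str) -> Result<Vec<String>, ListFlowsError> {
    // Pretend this is a kv-backend scan that can fail.
    let scanned: Result<Vec<String>, std::io::Error> = Ok(vec!["filter_numbers".into()]);
    // `.context(...)` boxes the io::Error into the variant, as the real code does
    // with `.map_err(BoxedError::new).context(ListFlowsSnafu { .. })`.
    scanned.context(ListFlowsSnafu { catalog })
}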
diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs
index c583786988..6b44b2459c 100644
--- a/src/catalog/src/kvbackend/manager.rs
+++ b/src/catalog/src/kvbackend/manager.rs
@@ -25,6 +25,7 @@ use common_config::Mode;
 use common_error::ext::BoxedError;
 use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
 use common_meta::key::catalog_name::CatalogNameKey;
+use common_meta::key::flow::FlowMetadataManager;
 use common_meta::key::schema_name::SchemaNameKey;
 use common_meta::key::table_info::TableInfoValue;
 use common_meta::key::table_name::TableNameKey;
@@ -85,7 +86,7 @@ impl KvBackendCatalogManager {
                     .get()
                     .expect("Failed to get table_route_cache"),
             )),
-            table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
+            table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
             system_catalog: SystemCatalog {
                 catalog_manager: me.clone(),
                 catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
@@ -93,11 +94,13 @@ impl KvBackendCatalogManager {
                 information_schema_provider: Arc::new(InformationSchemaProvider::new(
                     DEFAULT_CATALOG_NAME.to_string(),
                     me.clone(),
+                    Arc::new(FlowMetadataManager::new(backend.clone())),
                 )),
                 pg_catalog_provider: Arc::new(PGCatalogProvider::new(
                     DEFAULT_CATALOG_NAME.to_string(),
                     me.clone(),
                 )),
+                backend,
             },
             cache_registry,
         })
@@ -313,6 +316,7 @@ struct SystemCatalog {
     // system_schema_provier for default catalog
     information_schema_provider: Arc<InformationSchemaProvider>,
     pg_catalog_provider: Arc<PGCatalogProvider>,
+    backend: KvBackendRef,
 }
 
 impl SystemCatalog {
@@ -358,6 +362,7 @@ impl SystemCatalog {
             Arc::new(InformationSchemaProvider::new(
                 catalog.to_string(),
                 self.catalog_manager.clone(),
+                Arc::new(FlowMetadataManager::new(self.backend.clone())),
             ))
         });
         information_schema_provider.table(table_name)

diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs
index a236a6b4b7..3c27d4736b 100644
--- a/src/catalog/src/memory/manager.rs
+++ b/src/catalog/src/memory/manager.rs
@@ -23,6 +23,8 @@ use common_catalog::consts::{
     DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
     INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
 };
+use common_meta::key::flow::FlowMetadataManager;
+use common_meta::kv_backend::memory::MemoryKvBackend;
 use futures_util::stream::BoxStream;
 use snafu::OptionExt;
 use table::TableRef;
@@ -298,6 +300,7 @@ impl MemoryCatalogManager {
         let information_schema_provider = InformationSchemaProvider::new(
             catalog,
             Arc::downgrade(self) as Weak<dyn CatalogManager>,
+            Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
         );
         let information_schema = information_schema_provider.tables().clone();
diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs
index cf2607271e..bc4f3167c7 100644
--- a/src/catalog/src/system_schema/information_schema.rs
+++ b/src/catalog/src/system_schema/information_schema.rs
@@ -14,6 +14,7 @@
 
 mod cluster_info;
 pub mod columns;
+pub mod flows;
 mod information_memory_table;
 pub mod key_column_usage;
 mod partitions;
@@ -31,6 +32,7 @@ use std::collections::HashMap;
 use std::sync::{Arc, Weak};
 
 use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
+use common_meta::key::flow::FlowMetadataManager;
 use common_recordbatch::SendableRecordBatchStream;
 use datatypes::schema::SchemaRef;
 use lazy_static::lazy_static;
@@ -46,6 +48,7 @@ use self::columns::InformationSchemaColumns;
 use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef};
 use crate::error::Result;
 use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
+use crate::system_schema::information_schema::flows::InformationSchemaFlows;
 use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
 use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
 use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
@@ -104,6 +107,7 @@ macro_rules! setup_memory_table {
 pub struct InformationSchemaProvider {
     catalog_name: String,
     catalog_manager: Weak<dyn CatalogManager>,
+    flow_metadata_manager: Arc<FlowMetadataManager>,
     tables: HashMap<String, TableRef>,
 }
 
@@ -182,16 +186,25 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
                 self.catalog_name.clone(),
                 self.catalog_manager.clone(),
             )) as _),
+            FLOWS => Some(Arc::new(InformationSchemaFlows::new(
+                self.catalog_name.clone(),
+                self.flow_metadata_manager.clone(),
+            )) as _),
             _ => None,
         }
     }
 }
 
 impl InformationSchemaProvider {
-    pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
+    pub fn new(
+        catalog_name: String,
+        catalog_manager: Weak<dyn CatalogManager>,
+        flow_metadata_manager: Arc<FlowMetadataManager>,
+    ) -> Self {
         let mut provider = Self {
             catalog_name,
             catalog_manager,
+            flow_metadata_manager,
             tables: HashMap::new(),
         };
 
@@ -238,6 +251,7 @@ impl InformationSchemaProvider {
             TABLE_CONSTRAINTS.to_string(),
             self.build_table(TABLE_CONSTRAINTS).unwrap(),
         );
+        tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
 
         // Add memory tables
         for name in MEMORY_TABLES.iter() {
diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs
new file mode 100644
index 0000000000..15a4205ae2
--- /dev/null
+++ b/src/catalog/src/system_schema/information_schema/flows.rs
@@ -0,0 +1,305 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
+use common_error::ext::BoxedError;
+use common_meta::key::flow::flow_info::FlowInfoValue;
+use common_meta::key::flow::FlowMetadataManager;
+use common_meta::key::FlowId;
+use common_recordbatch::adapter::RecordBatchStreamAdapter;
+use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
+use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
+use datatypes::prelude::ConcreteDataType as CDT;
+use datatypes::scalars::ScalarVectorBuilder;
+use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use datatypes::value::Value;
+use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
+use futures::TryStreamExt;
+use snafu::{OptionExt, ResultExt};
+use store_api::storage::{ScanRequest, TableId};
+
+use crate::error::{
+    CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu, Result,
+};
+use crate::information_schema::{Predicates, FLOWS};
+use crate::system_schema::information_schema::InformationTable;
+
+const INIT_CAPACITY: usize = 42;
+
+// rows of information_schema.flows
+// pk is (flow_name, flow_id, table_catalog)
+pub const FLOW_NAME: &str = "flow_name";
+pub const FLOW_ID: &str = "flow_id";
+pub const TABLE_CATALOG: &str = "table_catalog";
+pub const FLOW_DEFINITION: &str = "flow_definition";
+pub const COMMENT: &str = "comment";
+pub const EXPIRE_AFTER: &str = "expire_after";
+pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
+pub const SINK_TABLE_NAME: &str = "sink_table_name";
+pub const FLOWNODE_IDS: &str = "flownode_ids";
+pub const OPTIONS: &str = "options";
+
+/// The `information_schema.flows` table provides information about flows in databases.
+pub(super) struct InformationSchemaFlows {
+    schema: SchemaRef,
+    catalog_name: String,
+    flow_metadata_manager: Arc<FlowMetadataManager>,
+}
+
+impl InformationSchemaFlows {
+    pub(super) fn new(
+        catalog_name: String,
+        flow_metadata_manager: Arc<FlowMetadataManager>,
+    ) -> Self {
+        Self {
+            schema: Self::schema(),
+            catalog_name,
+            flow_metadata_manager,
+        }
+    }
+
+    /// Complex fields (including [`SOURCE_TABLE_IDS`], [`FLOWNODE_IDS`] and [`OPTIONS`]) are serialized to JSON strings for now.
+    /// TODO(discord9): use a better way to store complex fields like json type
+    pub(crate) fn schema() -> SchemaRef {
+        Arc::new(Schema::new(
+            vec![
+                (FLOW_NAME, CDT::string_datatype(), false),
+                (FLOW_ID, CDT::uint32_datatype(), false),
+                (TABLE_CATALOG, CDT::string_datatype(), false),
+                (FLOW_DEFINITION, CDT::string_datatype(), false),
+                (COMMENT, CDT::string_datatype(), true),
+                (EXPIRE_AFTER, CDT::int64_datatype(), true),
+                (SOURCE_TABLE_IDS, CDT::string_datatype(), true),
+                (SINK_TABLE_NAME, CDT::string_datatype(), false),
+                (FLOWNODE_IDS, CDT::string_datatype(), true),
+                (OPTIONS, CDT::string_datatype(), true),
+            ]
+            .into_iter()
+            .map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
+            .collect(),
+        ))
+    }
+
+    fn builder(&self) -> InformationSchemaFlowsBuilder {
+        InformationSchemaFlowsBuilder::new(
+            self.schema.clone(),
+            self.catalog_name.clone(),
+            &self.flow_metadata_manager,
+        )
+    }
+}
+
+impl InformationTable for InformationSchemaFlows {
+    fn table_id(&self) -> TableId {
+        INFORMATION_SCHEMA_FLOW_TABLE_ID
+    }
+
+    fn table_name(&self) -> &'static str {
+        FLOWS
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
+        let schema = self.schema.arrow_schema().clone();
+        let mut builder = self.builder();
+        let stream = Box::pin(DfRecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::once(async move {
+                builder
+                    .make_flows(Some(request))
+                    .await
+                    .map(|x| x.into_df_record_batch())
+                    .map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
+            }),
+        ));
+        Ok(Box::pin(
+            RecordBatchStreamAdapter::try_new(stream)
+                .map_err(BoxedError::new)
+                .context(InternalSnafu)?,
+        ))
+    }
+}
+
+/// Builds the `information_schema.FLOWS` table row by row
+///
+/// columns are based on [`FlowInfoValue`]
+struct InformationSchemaFlowsBuilder {
+    schema: SchemaRef,
+    catalog_name: String,
+    flow_metadata_manager: Arc<FlowMetadataManager>,
+
+    flow_names: StringVectorBuilder,
+    flow_ids: UInt32VectorBuilder,
+    table_catalogs: StringVectorBuilder,
+    raw_sqls: StringVectorBuilder,
+    comments: StringVectorBuilder,
+    expire_afters: Int64VectorBuilder,
+    source_table_id_groups: StringVectorBuilder,
+    sink_table_names: StringVectorBuilder,
+    flownode_id_groups: StringVectorBuilder,
+    option_groups: StringVectorBuilder,
+}
+
+impl InformationSchemaFlowsBuilder {
+    fn new(
+        schema: SchemaRef,
+        catalog_name: String,
+        flow_metadata_manager: &Arc<FlowMetadataManager>,
+    ) -> Self {
+        Self {
+            schema,
+            catalog_name,
+            flow_metadata_manager: flow_metadata_manager.clone(),
+
+            flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+            flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
+            table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+            raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+            comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+            expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
+            source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+            sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+            flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+            option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
+        }
+    }
+
+    /// Construct the `information_schema.flows` virtual table
+    async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
+        let catalog_name = self.catalog_name.clone();
+        let predicates = Predicates::from_scan_request(&request);
+
+        let flow_info_manager = self.flow_metadata_manager.clone();
+
+        // TODO(discord9): use `AsyncIterator` once it's stable-ish
+        let mut stream = flow_info_manager
+            .flow_name_manager()
+            .flow_names(&catalog_name)
+            .await;
+
+        while let Some((flow_name, flow_id)) = stream
+            .try_next()
+            .await
+            .map_err(BoxedError::new)
+            .context(ListFlowsSnafu {
+                catalog: &catalog_name,
+            })?
+        {
+            let flow_info = flow_info_manager
+                .flow_info_manager()
+                .get(flow_id.flow_id())
+                .await
+                .map_err(BoxedError::new)
+                .context(InternalSnafu)?
+                .context(FlowInfoNotFoundSnafu {
+                    catalog_name: catalog_name.to_string(),
+                    flow_name: flow_name.to_string(),
+                })?;
+            self.add_flow(&predicates, flow_id.flow_id(), flow_info)?;
+        }
+
+        self.finish()
+    }
+
+    fn add_flow(
+        &mut self,
+        predicates: &Predicates,
+        flow_id: FlowId,
+        flow_info: FlowInfoValue,
+    ) -> Result<()> {
+        let row = [
+            (FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
+            (FLOW_ID, &Value::from(flow_id)),
+            (
+                TABLE_CATALOG,
+                &Value::from(flow_info.catalog_name().to_string()),
+            ),
+        ];
+        if !predicates.eval(&row) {
+            return Ok(());
+        }
+        self.flow_names.push(Some(flow_info.flow_name()));
+        self.flow_ids.push(Some(flow_id));
+        self.table_catalogs.push(Some(flow_info.catalog_name()));
+        self.raw_sqls.push(Some(flow_info.raw_sql()));
+        self.comments.push(Some(flow_info.comment()));
+        self.expire_afters.push(flow_info.expire_after());
+        self.source_table_id_groups.push(Some(
+            &serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
+                input: format!("{:?}", flow_info.source_table_ids()),
+            })?,
+        ));
+        self.sink_table_names
+            .push(Some(&flow_info.sink_table_name().to_string()));
+        self.flownode_id_groups.push(Some(
+            &serde_json::to_string(flow_info.flownode_ids()).context({
+                JsonSnafu {
+                    input: format!("{:?}", flow_info.flownode_ids()),
+                }
+            })?,
+        ));
+        self.option_groups
+            .push(Some(&serde_json::to_string(flow_info.options()).context(
+                JsonSnafu {
+                    input: format!("{:?}", flow_info.options()),
+                },
+            )?));
+
+        Ok(())
+    }
+
+    fn finish(&mut self) -> Result<RecordBatch> {
+        let columns: Vec<VectorRef> = vec![
+            Arc::new(self.flow_names.finish()),
+            Arc::new(self.flow_ids.finish()),
+            Arc::new(self.table_catalogs.finish()),
+            Arc::new(self.raw_sqls.finish()),
+            Arc::new(self.comments.finish()),
+            Arc::new(self.expire_afters.finish()),
+            Arc::new(self.source_table_id_groups.finish()),
+            Arc::new(self.sink_table_names.finish()),
+            Arc::new(self.flownode_id_groups.finish()),
+            Arc::new(self.option_groups.finish()),
+        ];
+        RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
+    }
+}
+
+impl DfPartitionStream for InformationSchemaFlows {
+    fn schema(&self) -> &arrow_schema::SchemaRef {
+        self.schema.arrow_schema()
+    }
+
+    fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
+        let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
+        let mut builder = self.builder();
+        Box::pin(DfRecordBatchStreamAdapter::new(
+            schema,
+            futures::stream::once(async move {
+                builder
+                    .make_flows(None)
+                    .await
+                    .map(|x| x.into_df_record_batch())
+                    .map_err(Into::into)
+            }),
+        ))
+    }
+}
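The complex columns above are deliberately stringly-typed: `source_table_ids`,
`flownode_ids` and `options` are flattened with `serde_json::to_string`, the same
call `add_flow` wraps in `JsonSnafu`. A small standalone sketch of that encoding
(the concrete key/value types are illustrative; `FlowInfoValue` stores richer types):

use std::collections::BTreeMap;

fn encode_flownode_ids() -> serde_json::Result<String> {
    // partition id -> flownode id, emitted as a JSON object with stringified keys
    let flownode_ids: BTreeMap<u32, u64> = BTreeMap::from([(0u32, 1u64)]);
    serde_json::to_string(&flownode_ids) // Ok("{\"0\":1}")
}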
diff --git a/src/catalog/src/system_schema/information_schema/table_names.rs b/src/catalog/src/system_schema/information_schema/table_names.rs
index 5d291bc678..c2c9eeff24 100644
--- a/src/catalog/src/system_schema/information_schema/table_names.rs
+++ b/src/catalog/src/system_schema/information_schema/table_names.rs
@@ -44,3 +44,4 @@ pub const REGION_PEERS: &str = "region_peers";
 pub const TABLE_CONSTRAINTS: &str = "table_constraints";
 pub const CLUSTER_INFO: &str = "cluster_info";
 pub const VIEWS: &str = "views";
+pub const FLOWS: &str = "flows";

diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs
index fbd31b8efd..ac1b4de5be 100644
--- a/src/common/catalog/src/consts.rs
+++ b/src/common/catalog/src/consts.rs
@@ -96,6 +96,8 @@ pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30;
 pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31;
 /// id for information_schema.VIEWS
 pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
+/// id for information_schema.FLOWS
+pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
 /// ----- End of information_schema tables -----
 
 /// ----- Begin of pg_catalog tables -----

diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs
index 86b4d29641..1354ec6cf4 100644
--- a/src/common/meta/src/key/flow/flow_info.rs
+++ b/src/common/meta/src/key/flow/flow_info.rs
@@ -142,6 +142,10 @@ impl FlowInfoValue {
         &self.source_table_ids
     }
 
+    pub fn catalog_name(&self) -> &String {
+        &self.catalog_name
+    }
+
     pub fn flow_name(&self) -> &String {
         &self.flow_name
     }
@@ -161,6 +165,10 @@ impl FlowInfoValue {
     pub fn comment(&self) -> &String {
         &self.comment
     }
+
+    pub fn options(&self) -> &HashMap<String, String> {
+        &self.options
+    }
 }
 
 pub type FlowInfoManagerRef = Arc<FlowInfoManager>;

diff --git a/src/common/meta/src/key/flow/flow_name.rs b/src/common/meta/src/key/flow/flow_name.rs
index ba877d4fc1..e7e763afa5 100644
--- a/src/common/meta/src/key/flow/flow_name.rs
+++ b/src/common/meta/src/key/flow/flow_name.rs
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
+
+use futures::stream::BoxStream;
 use lazy_static::lazy_static;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
@@ -20,9 +23,14 @@ use snafu::OptionExt;
 use crate::error::{self, Result};
 use crate::key::flow::FlowScoped;
 use crate::key::txn_helper::TxnOpGetResponseSet;
-use crate::key::{DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN};
+use crate::key::{
+    BytesAdapter, DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN,
+};
 use crate::kv_backend::txn::Txn;
 use crate::kv_backend::KvBackendRef;
+use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
+use crate::rpc::store::RangeRequest;
+use crate::rpc::KeyValue;
 
 const FLOW_NAME_KEY_PREFIX: &str = "name";
 
@@ -38,6 +46,7 @@ lazy_static! {
 /// The layout: `__flow/name/{catalog_name}/{flow_name}`.
 pub struct FlowNameKey<'a>(FlowScoped<FlowNameKeyInner<'a>>);
 
+#[allow(dead_code)]
 impl<'a> FlowNameKey<'a> {
     /// Returns the [FlowNameKey]
     pub fn new(catalog: &'a str, flow_name: &'a str) -> FlowNameKey<'a> {
@@ -45,13 +54,22 @@ impl<'a> FlowNameKey<'a> {
         FlowNameKey(FlowScoped::new(inner))
     }
 
-    #[cfg(test)]
+    pub fn range_start_key(catalog: &str) -> Vec<u8> {
+        let inner = BytesAdapter::from(Self::prefix(catalog).into_bytes());
+
+        FlowScoped::new(inner).to_bytes()
+    }
+
+    /// Return `name/{catalog}/` as prefix
+    pub fn prefix(catalog: &str) -> String {
+        format!("{}/{}/", FLOW_NAME_KEY_PREFIX, catalog)
+    }
+
     /// Returns the catalog.
     pub fn catalog(&self) -> &str {
         self.0.catalog_name
     }
 
-    #[cfg(test)]
     /// Return the `flow_name`
     pub fn flow_name(&self) -> &str {
         self.0.flow_name
@@ -140,6 +158,12 @@ impl FlowNameValue {
     }
 }
 
+pub fn flow_name_decoder(kv: KeyValue) -> Result<(String, FlowNameValue)> {
+    let flow_name = FlowNameKey::from_bytes(&kv.key)?;
+    let flow_id = FlowNameValue::try_from_raw_value(&kv.value)?;
+    Ok((flow_name.flow_name().to_string(), flow_id))
+}
+
 /// The manager of [FlowNameKey].
 pub struct FlowNameManager {
     kv_backend: KvBackendRef,
@@ -162,6 +186,25 @@ impl FlowNameManager {
             .transpose()
     }
 
+    /// Return all flows' names and ids
+    pub async fn flow_names(
+        &self,
+        catalog: &str,
+    ) -> BoxStream<'static, Result<(String, FlowNameValue)>> {
+        let start_key = FlowNameKey::range_start_key(catalog);
+        common_telemetry::debug!("flow_names: start_key: {:?}", start_key);
+        let req = RangeRequest::new().with_prefix(start_key);
+
+        let stream = PaginationStream::new(
+            self.kv_backend.clone(),
+            req,
+            DEFAULT_PAGE_SIZE,
+            Arc::new(flow_name_decoder),
+        );
+
+        Box::pin(stream)
+    }
+
     /// Returns true if the `flow` exists.
     pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
         let key = FlowNameKey::new(catalog, flow);
@@ -212,4 +255,11 @@ mod tests {
         assert_eq!(key.catalog(), "my_catalog");
         assert_eq!(key.flow_name(), "my_task");
     }
+    #[test]
+    fn test_key_start_range() {
+        assert_eq!(
+            b"__flow/name/greptime/".to_vec(),
+            FlowNameKey::range_start_key("greptime")
+        );
+    }
 }
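The listing path relies only on the key layout documented above: every flow name
lives under `__flow/name/{catalog_name}/{flow_name}`, so `flow_names` can issue a
single prefix range scan and decode each returned key. A sketch of just the decode
step (`PaginationStream` and `flow_name_decoder` handle this internally; the helper
below is illustrative):

fn decode_flow_name(key: &[u8], catalog: &str) -> Option<String> {
    // `__flow/name/greptime/` is exactly what range_start_key("greptime") produces
    let prefix = format!("__flow/name/{}/", catalog);
    std::str::from_utf8(key)
        .ok()?
        .strip_prefix(prefix.as_str())
        .map(str::to_string)
}

fn main() {
    assert_eq!(
        decode_flow_name(b"__flow/name/greptime/filter_numbers", "greptime"),
        Some("filter_numbers".to_string())
    );
}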
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 80491a6861..6593785ac7 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
 use common_meta::key::TableMetadataManagerRef;
 use common_runtime::JoinHandle;
 use common_telemetry::logging::{LoggingOptions, TracingOptions};
-use common_telemetry::{debug, info};
+use common_telemetry::{debug, info, trace};
 use datatypes::schema::ColumnSchema;
 use datatypes::value::Value;
 use greptime_proto::v1;
@@ -535,10 +535,10 @@ impl FlowWorkerManager {
             } else {
                 (9 * avg_spd + cur_spd) / 10
             };
-            debug!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
+            trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
             let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
             let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
-            debug!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
+            trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
            since_last_run = tokio::time::Instant::now();
            tokio::time::sleep(new_wait).await;
        }
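The two log lines demoted to trace! sit in the flow worker's pacing loop, which
smooths the observed processing speed with a 9:1 weighted average and sizes the
next sleep from it. The arithmetic, extracted into a sketch (the function name and
zero-speed guard are illustrative; only the two formulas appear in the hunk above):

fn next_wait_ms(avg_spd: usize, cur_spd: usize, batch_size: usize) -> usize {
    // 9 parts history, 1 part current observation
    let avg_spd = (9 * avg_spd + cur_spd) / 10;
    // time (ms) one batch is expected to take at the smoothed speed
    batch_size * 1000 / avg_spd.max(1)
}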
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index e5819a7f04..341923a7b4 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -436,7 +436,11 @@ impl InterThreadCallServer {
 
 fn from_send_error<T>(err: mpsc::error::SendError<T>) -> Error {
     InternalSnafu {
-        reason: format!("InterThreadCallServer resp failed: {}", err),
+        // this `err` will simply display `channel closed`
+        reason: format!(
+            "Worker's receiver channel has been closed unexpectedly: {}",
+            err
+        ),
     }
     .build()
 }

diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs
index 0a2ea7a141..b61ff944da 100644
--- a/src/flow/src/expr/linear.rs
+++ b/src/flow/src/expr/linear.rs
@@ -16,13 +16,14 @@
 
 use std::collections::{BTreeMap, BTreeSet, VecDeque};
 
+use common_telemetry::debug;
 use datatypes::value::Value;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt};
 
 use crate::error::{Error, InvalidQuerySnafu};
-use crate::expr::error::EvalError;
+use crate::expr::error::{EvalError, InternalSnafu};
 use crate::expr::{Id, InvalidArgumentSnafu, LocalId, ScalarExpr};
 use crate::repr::{self, value_to_internal_ts, Diff, Row};
 
@@ -709,6 +710,18 @@ impl MfpPlan {
             }
 
             if Some(lower_bound) != upper_bound && !null_eval {
+                if self.mfp.mfp.projection.iter().any(|c| values.len() <= *c) {
+                    debug!("values={:?}, mfp={:?}", &values, &self.mfp.mfp);
+                    let err = InternalSnafu {
+                        reason: format!(
+                            "Index out of bound for mfp={:?} and values={:?}",
+                            &self.mfp.mfp, &values
+                        ),
+                    }
+                    .build();
+                    return ret_err(err);
+                }
+                // safety: already checked that `projection` is not out of bound
                 let res_row = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone()));
                 let upper_opt =
                     upper_bound.map(|upper_bound| Ok((res_row.clone(), upper_bound, -diff)));
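The guard added to `MfpPlan` turns what used to be a potential panic on
`values[*c]` into a recoverable `EvalError`. The shape of the check, reduced to a
standalone sketch (plain `String` errors stand in for `InternalSnafu`):

fn project_row(values: &[i64], projection: &[usize]) -> Result<Vec<i64>, String> {
    if projection.iter().any(|c| values.len() <= *c) {
        return Err(format!(
            "Index out of bound for projection={:?} and values={:?}",
            projection, values
        ));
    }
    // safe now: every index was bounds-checked above
    Ok(projection.iter().map(|c| values[*c]).collect())
}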
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index e59eccc972..5f027ebb7f 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -509,6 +509,9 @@ pub fn check_permission(
         Statement::ShowViews(stmt) => {
             validate_db_permission!(stmt, query_ctx);
         }
+        Statement::ShowFlows(stmt) => {
+            validate_db_permission!(stmt, query_ctx);
+        }
         Statement::ShowStatus(_stmt) => {}
         Statement::DescribeTable(stmt) => {
             validate_param(stmt.name(), query_ctx)?;

diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index 348cd9beae..123f794cd2 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -138,6 +138,8 @@ impl StatementExecutor {
 
             Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
 
+            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
+
             Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
                 let req = to_copy_table_request(stmt, query_ctx.clone())?;
                 match req.direction {

diff --git a/src/operator/src/statement/show.rs b/src/operator/src/statement/show.rs
index 74d843a6af..8abb046b6c 100644
--- a/src/operator/src/statement/show.rs
+++ b/src/operator/src/statement/show.rs
@@ -23,7 +23,7 @@ use snafu::{OptionExt, ResultExt};
 use sql::ast::Ident;
 use sql::statements::create::Partitions;
 use sql::statements::show::{
-    ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowIndex, ShowKind,
+    ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
     ShowTableStatus, ShowTables, ShowVariables, ShowViews,
 };
 use table::metadata::TableType;
@@ -163,6 +163,17 @@ impl StatementExecutor {
             .context(ExecuteStatementSnafu)
     }
 
+    #[tracing::instrument(skip_all)]
+    pub(super) async fn show_flows(
+        &self,
+        stmt: ShowFlows,
+        query_ctx: QueryContextRef,
+    ) -> Result<Output> {
+        query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
+            .await
+            .context(ExecuteStatementSnafu)
+    }
+
     #[tracing::instrument(skip_all)]
     pub async fn show_create_flow(
         &self,

diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs
index ca9524b687..6b2e565c1f 100644
--- a/src/query/src/sql.rs
+++ b/src/query/src/sql.rs
@@ -18,7 +18,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use catalog::information_schema::{
-    columns, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS,
+    columns, flows, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS,
     KEY_COLUMN_USAGE, SCHEMATA, TABLES, VIEWS,
 };
 use catalog::CatalogManagerRef;
@@ -54,8 +54,8 @@ use sql::ast::Ident;
 use sql::parser::ParserContext;
 use sql::statements::create::{CreateFlow, CreateView, Partitions};
 use sql::statements::show::{
-    ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTableStatus, ShowTables, ShowVariables,
-    ShowViews,
+    ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowTableStatus, ShowTables,
+    ShowVariables, ShowViews,
 };
 use sql::statements::statement::Statement;
 use sqlparser::ast::ObjectName;
@@ -71,6 +71,7 @@ const SCHEMAS_COLUMN: &str = "Database";
 const OPTIONS_COLUMN: &str = "Options";
 const TABLES_COLUMN: &str = "Tables";
 const VIEWS_COLUMN: &str = "Views";
+const FLOWS_COLUMN: &str = "Flows";
 const FIELD_COLUMN: &str = "Field";
 const TABLE_TYPE_COLUMN: &str = "Table_type";
 const COLUMN_NAME_COLUMN: &str = "Column";
@@ -763,6 +764,33 @@ pub async fn show_views(
     .await
 }
 
+/// Execute [`ShowFlows`] statement and return the [`Output`] if success.
+pub async fn show_flows(
+    stmt: ShowFlows,
+    query_engine: &QueryEngineRef,
+    catalog_manager: &CatalogManagerRef,
+    query_ctx: QueryContextRef,
+) -> Result<Output> {
+    let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
+    let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
+    let like_field = Some(flows::FLOW_NAME);
+    let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
+
+    query_from_information_schema_table(
+        query_engine,
+        catalog_manager,
+        query_ctx,
+        FLOWS,
+        vec![],
+        projects,
+        filters,
+        like_field,
+        sort,
+        stmt.kind,
+    )
+    .await
+}
+
 pub fn show_create_flow(
     flow_name: ObjectName,
     flow_val: FlowInfoValue,
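`show_flows` answers `SHOW FLOWS` by querying `information_schema.flows` with a
fixed projection and a catalog filter; the optional `LIKE`/`WHERE` from `stmt.kind`
is layered on top by `query_from_information_schema_table`. The catalog filter is
an ordinary datafusion expression, e.g. (illustrative wrapper around the same
`col`/`lit` calls used above):

use datafusion::prelude::{col, lit, Expr};

fn flows_catalog_filter(current_catalog: &str) -> Expr {
    // keeps only this session's catalog, mirroring the `filters` vec above
    col("table_catalog").eq(lit(current_catalog))
}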
diff --git a/src/sql/src/parsers/show_parser.rs b/src/sql/src/parsers/show_parser.rs
index 41f507b44d..1fe3b137aa 100644
--- a/src/sql/src/parsers/show_parser.rs
+++ b/src/sql/src/parsers/show_parser.rs
@@ -21,8 +21,8 @@ use crate::error::{
 };
 use crate::parser::ParserContext;
 use crate::statements::show::{
-    ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowIndex,
-    ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
+    ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
+    ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
 };
 use crate::statements::statement::Statement;
 
@@ -46,6 +46,8 @@ impl<'a> ParserContext<'a> {
             }
         } else if self.consume_token("VIEWS") {
             self.parse_show_views()
+        } else if self.consume_token("FLOWS") {
+            self.parse_show_flows()
         } else if self.matches_keyword(Keyword::CHARSET) {
             self.parser.next_token();
             Ok(Statement::ShowCharset(self.parse_show_kind()?))
@@ -442,7 +444,7 @@ impl<'a> ParserContext<'a> {
                 }));
             }
 
-            // SHOW VIEWS [in | FROM] [DATABASE]
             Token::Word(w) => match w.keyword {
                 Keyword::IN | Keyword::FROM => self.parse_db_name()?,
                 _ => None,
@@ -454,6 +456,28 @@ impl<'a> ParserContext<'a> {
 
         Ok(Statement::ShowViews(ShowViews { kind, database }))
     }
+
+    fn parse_show_flows(&mut self) -> Result<Statement> {
+        let database = match self.parser.peek_token().token {
+            Token::EOF | Token::SemiColon => {
+                return Ok(Statement::ShowFlows(ShowFlows {
+                    kind: ShowKind::All,
+                    database: None,
+                }));
+            }
+
+            // SHOW FLOWS [in | FROM] [DATABASE]
+            Token::Word(w) => match w.keyword {
+                Keyword::IN | Keyword::FROM => self.parse_db_name()?,
+                _ => None,
+            },
+            _ => None,
+        };
+
+        let kind = self.parse_show_kind()?;
+
+        Ok(Statement::ShowFlows(ShowFlows { kind, database }))
+    }
 }
 
 #[cfg(test)]
@@ -1000,4 +1024,38 @@ mod tests {
         );
         assert_eq!(sql, stmts[0].to_string());
     }
+
+    #[test]
+    pub fn test_show_flows() {
+        let sql = "SHOW FLOWS";
+        let result =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
+        let stmts = result.unwrap();
+        assert_eq!(1, stmts.len());
+        assert_eq!(
+            stmts[0],
+            Statement::ShowFlows(ShowFlows {
+                kind: ShowKind::All,
+                database: None,
+            })
+        );
+        assert_eq!(sql, stmts[0].to_string());
+    }
+
+    #[test]
+    pub fn test_show_flows_in_db() {
+        let sql = "SHOW FLOWS IN d1";
+        let result =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
+        let stmts = result.unwrap();
+        assert_eq!(1, stmts.len());
+        assert_eq!(
+            stmts[0],
+            Statement::ShowFlows(ShowFlows {
+                kind: ShowKind::All,
+                database: Some("d1".to_string()),
+            })
+        );
+        assert_eq!(sql, stmts[0].to_string());
+    }
 }
diff --git a/src/sql/src/statements/show.rs b/src/sql/src/statements/show.rs
index abb3488dbc..c76461e4bb 100644
--- a/src/sql/src/statements/show.rs
+++ b/src/sql/src/statements/show.rs
@@ -187,6 +187,25 @@ impl Display for ShowCreateFlow {
     }
 }
 
+/// SQL structure for `SHOW FLOWS`.
+#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
+pub struct ShowFlows {
+    pub kind: ShowKind,
+    pub database: Option<String>,
+}
+
+impl Display for ShowFlows {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "SHOW FLOWS")?;
+        if let Some(database) = &self.database {
+            write!(f, " IN {database}")?;
+        }
+        format_kind!(self, f);
+
+        Ok(())
+    }
+}
+
 /// SQL structure for `SHOW CREATE VIEW`.
 #[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
 pub struct ShowCreateView {
@@ -535,6 +554,49 @@ SHOW VIEWS"#,
         }
     }
 
+    #[test]
+    fn test_display_show_flows() {
+        let sql = r"show flows in d1;";
+        let stmts: Vec<Statement> =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap();
+        assert_eq!(1, stmts.len());
+        assert_matches!(&stmts[0], Statement::ShowFlows { .. });
+
+        match &stmts[0] {
+            Statement::ShowFlows(show) => {
+                let new_sql = format!("\n{}", show);
+                assert_eq!(
+                    r#"
+SHOW FLOWS IN d1"#,
+                    &new_sql
+                );
+            }
+            _ => {
+                unreachable!();
+            }
+        }
+
+        let sql = r"show flows;";
+        let stmts: Vec<Statement> =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap();
+        assert_eq!(1, stmts.len());
+        assert_matches!(&stmts[0], Statement::ShowFlows { .. });
+
+        match &stmts[0] {
+            Statement::ShowFlows(show) => {
+                let new_sql = format!("\n{}", show);
+                assert_eq!(
+                    r#"
+SHOW FLOWS"#,
+                    &new_sql
+                );
+            }
+            _ => {
+                unreachable!("{:?}", &stmts[0]);
+            }
+        }
+    }
+
     #[test]
     fn test_display_show_databases() {
         let sql = r"show databases;";

diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs
index 5dbe243a54..9115902bf5 100644
--- a/src/sql/src/statements/statement.rs
+++ b/src/sql/src/statements/statement.rs
@@ -31,8 +31,8 @@ use crate::statements::insert::Insert;
 use crate::statements::query::Query;
 use crate::statements::set_variables::SetVariables;
 use crate::statements::show::{
-    ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowIndex,
-    ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
+    ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
+    ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
 };
 use crate::statements::tql::Tql;
 use crate::statements::truncate::TruncateTable;
@@ -87,6 +87,8 @@ pub enum Statement {
     ShowCreateTable(ShowCreateTable),
     // SHOW CREATE FLOW
     ShowCreateFlow(ShowCreateFlow),
+    /// SHOW FLOWS
+    ShowFlows(ShowFlows),
     // SHOW CREATE VIEW
     ShowCreateView(ShowCreateView),
     // SHOW STATUS
@@ -133,6 +135,7 @@ impl Display for Statement {
             Statement::ShowIndex(s) => s.fmt(f),
             Statement::ShowCreateTable(s) => s.fmt(f),
             Statement::ShowCreateFlow(s) => s.fmt(f),
+            Statement::ShowFlows(s) => s.fmt(f),
             Statement::ShowCreateView(s) => s.fmt(f),
             Statement::ShowViews(s) => s.fmt(f),
             Statement::ShowStatus(s) => s.fmt(f),
diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result
index b09d026b6e..004d614f97 100644
--- a/tests/cases/standalone/common/flow/show_create_flow.result
+++ b/tests/cases/standalone/common/flow/show_create_flow.result
@@ -13,6 +13,18 @@ create table out_num_cnt (
 
 Affected Rows: 0
 
+SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+
++-----------+---------------+-----------------+
+| flow_name | table_catalog | flow_definition |
++-----------+---------------+-----------------+
++-----------+---------------+-----------------+
+
+SHOW FLOWS;
+
+++
+++
+
 CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
 
 Affected Rows: 0
@@ -27,10 +39,38 @@ SHOW CREATE FLOW filter_numbers;
 |                | AS SELECT number FROM numbers_input WHERE number > 10 |
 +----------------+-------------------------------------------------------+
 
+SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+
++----------------+---------------+----------------------------------------------------+
+| flow_name      | table_catalog | flow_definition                                    |
++----------------+---------------+----------------------------------------------------+
+| filter_numbers | greptime      | SELECT number FROM numbers_input WHERE number > 10 |
++----------------+---------------+----------------------------------------------------+
+
+SHOW FLOWS;
+
++----------------+
+| Flows          |
++----------------+
+| filter_numbers |
++----------------+
+
 drop flow filter_numbers;
 
 Affected Rows: 0
 
+SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+
++-----------+---------------+-----------------+
+| flow_name | table_catalog | flow_definition |
++-----------+---------------+-----------------+
++-----------+---------------+-----------------+
+
+SHOW FLOWS;
+
+++
+++
+
 drop table out_num_cnt;
 
 Affected Rows: 0

diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql
index d30557f4c4..0f5907877f 100644
--- a/tests/cases/standalone/common/flow/show_create_flow.sql
+++ b/tests/cases/standalone/common/flow/show_create_flow.sql
@@ -8,12 +8,25 @@ create table out_num_cnt (
     number INT,
     ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
 
+
+SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+
+SHOW FLOWS;
+
 CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
 
 SHOW CREATE FLOW filter_numbers;
 
+SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+
+SHOW FLOWS;
+
 drop flow filter_numbers;
 
+SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+
+SHOW FLOWS;
+
 drop table out_num_cnt;
 
 drop table numbers_input;
diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result
index 3f7524eb29..0b32396cd7 100644
--- a/tests/cases/standalone/common/show/show_databases_tables.result
+++ b/tests/cases/standalone/common/show/show_databases_tables.result
@@ -41,6 +41,7 @@ SHOW TABLES;
 | engines |
 | events |
 | files |
+| flows |
 | global_status |
 | key_column_usage |
 | optimizer_trace |
@@ -86,6 +87,7 @@ SHOW FULL TABLES;
 | engines | LOCAL TEMPORARY |
 | events | LOCAL TEMPORARY |
 | files | LOCAL TEMPORARY |
+| flows | LOCAL TEMPORARY |
 | global_status | LOCAL TEMPORARY |
 | key_column_usage | LOCAL TEMPORARY |
 | optimizer_trace | LOCAL TEMPORARY |
@@ -125,6 +127,7 @@ SHOW TABLE STATUS;
 |engines||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
 |events||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
 |files||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
+|flows||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
 |global_status||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
 |key_column_usage||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
 |optimizer_trace||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||

diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result
index 2d7cb914ed..9dc0c0fc07 100644
--- a/tests/cases/standalone/common/system/information_schema.result
+++ b/tests/cases/standalone/common/system/information_schema.result
@@ -26,6 +26,7 @@ order by table_schema, table_name;
 |greptime|information_schema|engines|LOCALTEMPORARY|5|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
 |greptime|information_schema|events|LOCALTEMPORARY|13|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
 |greptime|information_schema|files|LOCALTEMPORARY|14|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
+|greptime|information_schema|flows|LOCALTEMPORARY|33|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
 |greptime|information_schema|global_status|LOCALTEMPORARY|25|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
 |greptime|information_schema|key_column_usage|LOCALTEMPORARY|16|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
 |greptime|information_schema|optimizer_trace|LOCALTEMPORARY|17|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
@@ -185,6 +186,16 @@ select * from information_schema.columns order by table_schema, table_name, column_name;
 | greptime | information_schema | files | update_count | 13 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
 | greptime | information_schema | files | update_time | 34 | | | | | 3 | | | | | select,insert | | DateTime | datetime | FIELD | | No | datetime | | |
 | greptime | information_schema | files | version | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
+| greptime | information_schema | flows | comment | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
+| greptime | information_schema | flows | expire_after | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
+| greptime | information_schema | flows | flow_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
+| greptime | information_schema | flows | flow_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
+| greptime | information_schema | flows | flow_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
+| greptime | information_schema | flows | flownode_ids | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
+| greptime | information_schema | flows | options | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
+| greptime | information_schema | flows | sink_table_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
+| greptime | information_schema | flows | source_table_ids | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
+| greptime | information_schema | flows | table_catalog | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
 | greptime | information_schema | global_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
 | greptime | information_schema | global_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
 | greptime | information_schema | key_column_usage | column_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result
index 15b1910ed6..05480b3969 100644
--- a/tests/cases/standalone/common/view/create.result
+++ b/tests/cases/standalone/common/view/create.result
@@ -83,6 +83,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
 |greptime|information_schema|engines|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
 |greptime|information_schema|events|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
 |greptime|information_schema|files|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
+|greptime|information_schema|flows|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
 |greptime|information_schema|global_status|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
 |greptime|information_schema|key_column_usage|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
 |greptime|public|numbers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID|test_engine|ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|