Compare commits

...

10 Commits

Author SHA1 Message Date
zyy17
2ae2a6674e refactor: add get_storage_path() and get_catalog_and_schema() (#4397)
refactor: add get_storage_path() and get_catalog_and_schema()
2024-07-20 01:55:48 +00:00
Lei, HUANG
c8cf3b1677 fix(wal): handle WAL deletion on region drop (#4400)
Add LogStore trait bound to RegionWorkerLoop and handle WAL deletion on region drop.
2024-07-19 13:24:10 +00:00
Yingwen
7aae19aa8b fix: dictionary key type use u32 (#4396)
* fix: dictionary key type use u32

* fix: fix error while reading content

* fix: bulk memtable dictionary type
2024-07-19 09:51:29 +00:00
Jeremyhi
b90267dd80 feat: export database data (#4382)
* feat: export database data

* feat: export data with time range

* feat: refactor the data dir

* feat: by comment
2024-07-19 09:29:45 +00:00
discord9
9fa9156bde feat: FLOWS table in information_schema&SHOW FLOWS (#4386)
* feat(WIP): flow info table

refactor: better err handling&log

feat: add flow metadata to info schema provider

feat(WIP): info_schema.flows

feat: info_schema.flows table

* fix: err after rebase

* fix: wrong comparison op

* feat: SHOW FLOWS&tests

* refactor: per review

* chore: unused

* refactor: json error

* chore: per review

* test: sqlness

* chore: rm inline error

* refactor: per review
2024-07-19 09:29:36 +00:00
zyy17
ce900e850a fix: user provider can't be configured by config file or environment variables (#4398) 2024-07-19 08:41:29 +00:00
zyy17
5274c5a407 refactor: add &mut Plugins argument in plugins setup api and remove unnecessary mut (#4389)
refactor: add '&mut Plugins' argument in plugins setup api and remove unnecessary mut

Signed-off-by: zyy17 <zyylsxm@gmail.com>
2024-07-19 08:12:06 +00:00
Yingwen
0b13ac6e16 ci: disable auto review (#4387) 2024-07-18 08:03:37 +00:00
shuiyisong
8ab6136d1c chore: support pattern as pipeline key name (#4368)
* chore: add pattern to processor key name

* fix: typo

* refactor: test
2024-07-18 03:32:26 +00:00
Weny Xu
e39f49fe56 fix: ensure keep alive is completed in time (#4349)
* fix: ensure keep alive is completed in time

* chore: apply suggestions from CR

* chore: use write runtime

* refactor: set META_LEASE_SECS to 5

* chore: set etcd replicas to 1

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* fix: set `MissedTickBehavior::Delay`

* chore: apply suggestions from CR
2024-07-17 06:14:45 +00:00
53 changed files with 1434 additions and 213 deletions

15
.coderabbit.yaml Normal file
View File

@@ -0,0 +1,15 @@
# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
language: "en-US"
early_access: false
reviews:
profile: "chill"
request_changes_workflow: false
high_level_summary: true
poem: true
review_status: true
collapse_walkthrough: false
auto_review:
enabled: false
drafts: false
chat:
auto_reply: true

View File

@@ -2,7 +2,7 @@ name: Setup Etcd cluster
description: Deploy Etcd cluster on Kubernetes
inputs:
etcd-replicas:
default: 3
default: 1
description: "Etcd replicas"
namespace:
default: "etcd-cluster"

1
Cargo.lock generated
View File

@@ -10899,6 +10899,7 @@ dependencies = [
"common-base",
"common-error",
"common-macro",
"common-meta",
"common-recordbatch",
"common-time",
"common-wal",

View File

@@ -57,6 +57,31 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to list flows in catalog {catalog}"))]
ListFlows {
#[snafu(implicit)]
location: Location,
catalog: String,
source: BoxedError,
},
#[snafu(display("Flow info not found: {flow_name} in catalog {catalog_name}"))]
FlowInfoNotFound {
flow_name: String,
catalog_name: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Can't convert value to json, input={input}"))]
Json {
input: String,
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to re-compile script due to internal error"))]
CompileScriptInternal {
#[snafu(implicit)]
@@ -254,12 +279,15 @@ impl ErrorExt for Error {
| Error::FindPartitions { .. }
| Error::FindRegionRoutes { .. }
| Error::CacheNotFound { .. }
| Error::CastManager { .. } => StatusCode::Unexpected,
| Error::CastManager { .. }
| Error::Json { .. } => StatusCode::Unexpected,
Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,
Error::ViewInfoNotFound { .. } => StatusCode::TableNotFound,
Error::FlowInfoNotFound { .. } => StatusCode::FlowNotFound,
Error::SystemCatalog { .. } => StatusCode::StorageUnavailable,
Error::UpgradeWeakCatalogManagerRef { .. } => StatusCode::Internal,
@@ -270,7 +298,8 @@ impl ErrorExt for Error {
Error::ListCatalogs { source, .. }
| Error::ListNodes { source, .. }
| Error::ListSchemas { source, .. }
| Error::ListTables { source, .. } => source.status_code(),
| Error::ListTables { source, .. }
| Error::ListFlows { source, .. } => source.status_code(),
Error::CreateTable { source, .. } => source.status_code(),

View File

@@ -25,6 +25,7 @@ use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
@@ -85,7 +86,7 @@ impl KvBackendCatalogManager {
.get()
.expect("Failed to get table_route_cache"),
)),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
@@ -93,11 +94,13 @@ impl KvBackendCatalogManager {
information_schema_provider: Arc::new(InformationSchemaProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
Arc::new(FlowMetadataManager::new(backend.clone())),
)),
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
backend,
},
cache_registry,
})
@@ -313,6 +316,7 @@ struct SystemCatalog {
// system_schema_provier for default catalog
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,
}
impl SystemCatalog {
@@ -358,6 +362,7 @@ impl SystemCatalog {
Arc::new(InformationSchemaProvider::new(
catalog.to_string(),
self.catalog_manager.clone(),
Arc::new(FlowMetadataManager::new(self.backend.clone())),
))
});
information_schema_provider.table(table_name)

View File

@@ -23,6 +23,8 @@ use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use futures_util::stream::BoxStream;
use snafu::OptionExt;
use table::TableRef;
@@ -298,6 +300,7 @@ impl MemoryCatalogManager {
let information_schema_provider = InformationSchemaProvider::new(
catalog,
Arc::downgrade(self) as Weak<dyn CatalogManager>,
Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
);
let information_schema = information_schema_provider.tables().clone();

View File

@@ -14,6 +14,7 @@
mod cluster_info;
pub mod columns;
pub mod flows;
mod information_memory_table;
pub mod key_column_usage;
mod partitions;
@@ -31,6 +32,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Weak};
use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
use common_meta::key::flow::FlowMetadataManager;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
@@ -46,6 +48,7 @@ use self::columns::InformationSchemaColumns;
use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef};
use crate::error::Result;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
@@ -104,6 +107,7 @@ macro_rules! setup_memory_table {
pub struct InformationSchemaProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
tables: HashMap<String, TableRef>,
}
@@ -182,16 +186,25 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
FLOWS => Some(Arc::new(InformationSchemaFlows::new(
self.catalog_name.clone(),
self.flow_metadata_manager.clone(),
)) as _),
_ => None,
}
}
}
impl InformationSchemaProvider {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
pub fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
) -> Self {
let mut provider = Self {
catalog_name,
catalog_manager,
flow_metadata_manager,
tables: HashMap::new(),
};
@@ -238,6 +251,7 @@ impl InformationSchemaProvider {
TABLE_CONSTRAINTS.to_string(),
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
// Add memory tables
for name in MEMORY_TABLES.iter() {

View File

@@ -0,0 +1,305 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::key::flow::flow_info::FlowInfoValue;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::FlowId;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::prelude::ConcreteDataType as CDT;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use crate::error::{
CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu, Result,
};
use crate::information_schema::{Predicates, FLOWS};
use crate::system_schema::information_schema::InformationTable;
const INIT_CAPACITY: usize = 42;
// rows of information_schema.flows
// pk is (flow_name, flow_id, table_catalog)
pub const FLOW_NAME: &str = "flow_name";
pub const FLOW_ID: &str = "flow_id";
pub const TABLE_CATALOG: &str = "table_catalog";
pub const FLOW_DEFINITION: &str = "flow_definition";
pub const COMMENT: &str = "comment";
pub const EXPIRE_AFTER: &str = "expire_after";
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
pub const SINK_TABLE_NAME: &str = "sink_table_name";
pub const FLOWNODE_IDS: &str = "flownode_ids";
pub const OPTIONS: &str = "options";
/// The `information_schema.flows` to provides information about flows in databases.
pub(super) struct InformationSchemaFlows {
schema: SchemaRef,
catalog_name: String,
flow_metadata_manager: Arc<FlowMetadataManager>,
}
impl InformationSchemaFlows {
pub(super) fn new(
catalog_name: String,
flow_metadata_manager: Arc<FlowMetadataManager>,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
flow_metadata_manager,
}
}
/// for complex fields(including [`SOURCE_TABLE_IDS`], [`FLOWNODE_IDS`] and [`OPTIONS`]), it will be serialized to json string for now
/// TODO(discord9): use a better way to store complex fields like json type
pub(crate) fn schema() -> SchemaRef {
Arc::new(Schema::new(
vec![
(FLOW_NAME, CDT::string_datatype(), false),
(FLOW_ID, CDT::uint32_datatype(), false),
(TABLE_CATALOG, CDT::string_datatype(), false),
(FLOW_DEFINITION, CDT::string_datatype(), false),
(COMMENT, CDT::string_datatype(), true),
(EXPIRE_AFTER, CDT::int64_datatype(), true),
(SOURCE_TABLE_IDS, CDT::string_datatype(), true),
(SINK_TABLE_NAME, CDT::string_datatype(), false),
(FLOWNODE_IDS, CDT::string_datatype(), true),
(OPTIONS, CDT::string_datatype(), true),
]
.into_iter()
.map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
.collect(),
))
}
fn builder(&self) -> InformationSchemaFlowsBuilder {
InformationSchemaFlowsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
&self.flow_metadata_manager,
)
}
}
impl InformationTable for InformationSchemaFlows {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_FLOW_TABLE_ID
}
fn table_name(&self) -> &'static str {
FLOWS
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_flows(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
/// Builds the `information_schema.FLOWS` table row by row
///
/// columns are based on [`FlowInfoValue`]
struct InformationSchemaFlowsBuilder {
schema: SchemaRef,
catalog_name: String,
flow_metadata_manager: Arc<FlowMetadataManager>,
flow_names: StringVectorBuilder,
flow_ids: UInt32VectorBuilder,
table_catalogs: StringVectorBuilder,
raw_sqls: StringVectorBuilder,
comments: StringVectorBuilder,
expire_afters: Int64VectorBuilder,
source_table_id_groups: StringVectorBuilder,
sink_table_names: StringVectorBuilder,
flownode_id_groups: StringVectorBuilder,
option_groups: StringVectorBuilder,
}
impl InformationSchemaFlowsBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
flow_metadata_manager: &Arc<FlowMetadataManager>,
) -> Self {
Self {
schema,
catalog_name,
flow_metadata_manager: flow_metadata_manager.clone(),
flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
/// Construct the `information_schema.flows` virtual table
async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let predicates = Predicates::from_scan_request(&request);
let flow_info_manager = self.flow_metadata_manager.clone();
// TODO(discord9): use `AsyncIterator` once it's stable-ish
let mut stream = flow_info_manager
.flow_name_manager()
.flow_names(&catalog_name)
.await;
while let Some((flow_name, flow_id)) = stream
.try_next()
.await
.map_err(BoxedError::new)
.context(ListFlowsSnafu {
catalog: &catalog_name,
})?
{
let flow_info = flow_info_manager
.flow_info_manager()
.get(flow_id.flow_id())
.await
.map_err(BoxedError::new)
.context(InternalSnafu)?
.context(FlowInfoNotFoundSnafu {
catalog_name: catalog_name.to_string(),
flow_name: flow_name.to_string(),
})?;
self.add_flow(&predicates, flow_id.flow_id(), flow_info)?;
}
self.finish()
}
fn add_flow(
&mut self,
predicates: &Predicates,
flow_id: FlowId,
flow_info: FlowInfoValue,
) -> Result<()> {
let row = [
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
(FLOW_ID, &Value::from(flow_id)),
(
TABLE_CATALOG,
&Value::from(flow_info.catalog_name().to_string()),
),
];
if !predicates.eval(&row) {
return Ok(());
}
self.flow_names.push(Some(flow_info.flow_name()));
self.flow_ids.push(Some(flow_id));
self.table_catalogs.push(Some(flow_info.catalog_name()));
self.raw_sqls.push(Some(flow_info.raw_sql()));
self.comments.push(Some(flow_info.comment()));
self.expire_afters.push(flow_info.expire_after());
self.source_table_id_groups.push(Some(
&serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
input: format!("{:?}", flow_info.source_table_ids()),
})?,
));
self.sink_table_names
.push(Some(&flow_info.sink_table_name().to_string()));
self.flownode_id_groups.push(Some(
&serde_json::to_string(flow_info.flownode_ids()).context({
JsonSnafu {
input: format!("{:?}", flow_info.flownode_ids()),
}
})?,
));
self.option_groups
.push(Some(&serde_json::to_string(flow_info.options()).context(
JsonSnafu {
input: format!("{:?}", flow_info.options()),
},
)?));
Ok(())
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.flow_names.finish()),
Arc::new(self.flow_ids.finish()),
Arc::new(self.table_catalogs.finish()),
Arc::new(self.raw_sqls.finish()),
Arc::new(self.comments.finish()),
Arc::new(self.expire_afters.finish()),
Arc::new(self.source_table_id_groups.finish()),
Arc::new(self.sink_table_names.finish()),
Arc::new(self.flownode_id_groups.finish()),
Arc::new(self.option_groups.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
impl DfPartitionStream for InformationSchemaFlows {
fn schema(&self) -> &arrow_schema::SchemaRef {
self.schema.arrow_schema()
}
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_flows(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
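Editor's note: the table defined above is meant to be queried like any other information_schema table, and the same PR wires up a SHOW FLOWS statement (see the check_permission and statement-executor hunks further down). A minimal, illustrative SQL sketch — the column names come from the schema defined above, while the flows any real deployment would return are hypothetical:

-- List flows through the new system table (columns per the schema above).
SELECT flow_name, flow_id, table_catalog, sink_table_name, expire_after
FROM INFORMATION_SCHEMA.FLOWS;

-- Shorthand form added in the same PR.
SHOW FLOWS;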

View File

@@ -44,3 +44,4 @@ pub const REGION_PEERS: &str = "region_peers";
pub const TABLE_CONSTRAINTS: &str = "table_constraints";
pub const CLUSTER_INFO: &str = "cluster_info";
pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";

View File

@@ -46,7 +46,10 @@ enum ExportTarget {
#[default]
CreateTable,
/// Corresponding to `EXPORT TABLE`
#[deprecated(note = "Please use `DatabaseData` instead.")]
TableData,
/// Corresponding to `EXPORT DATABASE`
DatabaseData,
}
#[derive(Debug, Default, Parser)]
@@ -75,7 +78,17 @@ pub struct ExportCommand {
#[clap(long, short = 't', value_enum)]
target: ExportTarget,
/// basic authentication for connecting to the server
/// A half-open time range: [start_time, end_time).
/// The start of the time range (time-index column) for data export.
#[clap(long)]
start_time: Option<String>,
/// A half-open time range: [start_time, end_time).
/// The end of the time range (time-index column) for data export.
#[clap(long)]
end_time: Option<String>,
/// The basic authentication for connecting to the server
#[clap(long)]
auth_basic: Option<String>,
}
@@ -99,6 +112,8 @@ impl ExportCommand {
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
auth_header,
}),
guard,
@@ -113,6 +128,8 @@ pub struct Export {
output_dir: String,
parallelism: usize,
target: ExportTarget,
start_time: Option<String>,
end_time: Option<String>,
auth_header: Option<String>,
}
@@ -167,7 +184,7 @@ impl Export {
};
let mut result = Vec::with_capacity(records.len());
for value in records {
let serde_json::Value::String(schema) = &value[0] else {
let Value::String(schema) = &value[0] else {
unreachable!()
};
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
@@ -256,7 +273,7 @@ impl Export {
let Some(records) = result else {
EmptyResultSnafu.fail()?
};
let serde_json::Value::String(create_table) = &records[0][1] else {
let Value::String(create_table) = &records[0][1] else {
unreachable!()
};
@@ -276,11 +293,13 @@ impl Export {
let (metric_physical_tables, remaining_tables) =
self.get_table_list(&catalog, &schema).await?;
let table_count = metric_physical_tables.len() + remaining_tables.len();
tokio::fs::create_dir_all(&self.output_dir)
let output_dir = Path::new(&self.output_dir)
.join(&catalog)
.join(format!("{schema}/"));
tokio::fs::create_dir_all(&output_dir)
.await
.context(FileIoSnafu)?;
let output_file =
Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
let output_file = Path::new(&output_dir).join("create_tables.sql");
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
match self.show_create_table(&c, &s, &t).await {
@@ -294,7 +313,12 @@ impl Export {
}
}
}
info!("finished exporting {catalog}.{schema} with {table_count} tables",);
info!(
"Finished exporting {catalog}.{schema} with {table_count} table schemas to path: {}",
output_dir.to_string_lossy()
);
Ok::<(), Error>(())
});
}
@@ -409,14 +433,106 @@ impl Export {
Ok(())
}
async fn export_database_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let db_names = self.iter_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_names.len());
for (catalog, schema) in db_names {
let semaphore_moved = semaphore.clone();
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
let output_dir = Path::new(&self.output_dir)
.join(&catalog)
.join(format!("{schema}/"));
tokio::fs::create_dir_all(&output_dir)
.await
.context(FileIoSnafu)?;
let with_options = match (&self.start_time, &self.end_time) {
(Some(start_time), Some(end_time)) => {
format!(
"WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
start_time, end_time
)
}
(Some(start_time), None) => {
format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
}
(None, Some(end_time)) => {
format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
}
(None, None) => "WITH (FORMAT='parquet')".to_string(),
};
let sql = format!(
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
catalog,
schema,
output_dir.to_str().unwrap(),
with_options
);
info!("Executing sql: {sql}");
self.sql(&sql).await?;
info!(
"Finished exporting {catalog}.{schema} data into path: {}",
output_dir.to_string_lossy()
);
// The export copy from sql
let copy_from_file = output_dir.join("copy_from.sql");
let mut writer =
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
let copy_database_from_sql = format!(
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
catalog,
schema,
output_dir.to_str().unwrap()
);
writer
.write(copy_database_from_sql.as_bytes())
.await
.context(FileIoSnafu)?;
writer.flush().await.context(FileIoSnafu)?;
info!("Finished exporting {catalog}.{schema} copy_from.sql");
Ok::<(), Error>(())
})
}
let success = futures::future::join_all(tasks)
.await
.into_iter()
.filter(|r| match r {
Ok(_) => true,
Err(e) => {
error!(e; "export database job failed");
false
}
})
.count();
let elapsed = timer.elapsed();
info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);
Ok(())
}
}
#[allow(deprecated)]
#[async_trait]
impl Tool for Export {
async fn do_work(&self) -> Result<()> {
match self.target {
ExportTarget::CreateTable => self.export_create_table().await,
ExportTarget::TableData => self.export_table_data().await,
ExportTarget::DatabaseData => self.export_database_data().await,
}
}
}
@@ -496,7 +612,9 @@ mod tests {
let output_file = output_dir
.path()
.join("greptime-cli.export.create_table.sql");
.join("greptime")
.join("cli.export.create_table")
.join("create_tables.sql");
let res = std::fs::read_to_string(output_file).unwrap();
let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
"ts" TIMESTAMP(3) NOT NULL,

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn};
@@ -271,8 +272,9 @@ impl StartCommand {
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);
let mut opts = opts.component;
let plugins = plugins::setup_datanode_plugins(&mut opts)
let opts = opts.component;
let mut plugins = Plugins::new();
plugins::setup_datanode_plugins(&mut plugins, &opts)
.await
.context(StartDatanodeSnafu)?;

View File

@@ -20,6 +20,7 @@ use cache::{build_fundamental_cache_registry, with_default_composite_cache_regis
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
@@ -245,7 +246,9 @@ impl StartCommand {
opts.mode = Mode::Distributed;
}
opts.user_provider.clone_from(&self.user_provider);
if let Some(user_provider) = &self.user_provider {
opts.user_provider = Some(user_provider.clone());
}
Ok(())
}
@@ -264,9 +267,9 @@ impl StartCommand {
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);
let mut opts = opts.component;
#[allow(clippy::unnecessary_mut_passed)]
let plugins = plugins::setup_frontend_plugins(&mut opts)
let opts = opts.component;
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &opts)
.await
.context(StartFrontendSnafu)?;
@@ -458,7 +461,7 @@ mod tests {
#[tokio::test]
async fn test_try_from_start_command_to_anymap() {
let mut fe_opts = frontend::frontend::FrontendOptions {
let fe_opts = frontend::frontend::FrontendOptions {
http: HttpOptions {
disable_dashboard: false,
..Default::default()
@@ -467,8 +470,10 @@ mod tests {
..Default::default()
};
#[allow(clippy::unnecessary_mut_passed)]
let plugins = plugins::setup_frontend_plugins(&mut fe_opts).await.unwrap();
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
.await
.unwrap();
let provider = plugins.get::<UserProviderRef>().unwrap();
let result = provider

View File

@@ -16,6 +16,7 @@ use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
@@ -238,8 +239,9 @@ impl StartCommand {
info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);
let mut opts = opts.component;
let plugins = plugins::setup_metasrv_plugins(&mut opts)
let opts = opts.component;
let mut plugins = Plugins::new();
plugins::setup_metasrv_plugins(&mut plugins, &opts)
.await
.context(StartMetaServerSnafu)?;

View File

@@ -19,6 +19,7 @@ use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
use common_error::ext::BoxedError;
@@ -396,7 +397,9 @@ impl StartCommand {
opts.influxdb.enable = self.influxdb_enable;
}
opts.user_provider.clone_from(&self.user_provider);
if let Some(user_provider) = &self.user_provider {
opts.user_provider = Some(user_provider.clone());
}
Ok(())
}
@@ -418,14 +421,18 @@ impl StartCommand {
info!("Standalone start command: {:#?}", self);
info!("Standalone options: {opts:#?}");
let mut plugins = Plugins::new();
let opts = opts.component;
let mut fe_opts = opts.frontend_options();
#[allow(clippy::unnecessary_mut_passed)]
let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts) // mut ref is MUST, DO NOT change it
let fe_opts = opts.frontend_options();
let dn_opts = opts.datanode_options();
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
.await
.context(StartFrontendSnafu)?;
let dn_opts = opts.datanode_options();
plugins::setup_datanode_plugins(&mut plugins, &dn_opts)
.await
.context(StartDatanodeSnafu)?;
set_default_timezone(fe_opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
@@ -464,7 +471,7 @@ impl StartCommand {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
@@ -472,7 +479,7 @@ impl StartCommand {
let flow_builder = FlownodeBuilder::new(
Default::default(),
fe_plugins.clone(),
plugins.clone(),
table_metadata_manager.clone(),
catalog_manager.clone(),
);
@@ -533,7 +540,7 @@ impl StartCommand {
node_manager.clone(),
ddl_task_executor.clone(),
)
.with_plugin(fe_plugins.clone())
.with_plugin(plugins.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;
@@ -554,7 +561,7 @@ impl StartCommand {
let (tx, _rx) = broadcast::channel(1);
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), fe_plugins)
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), plugins)
.build()
.await
.context(StartFrontendSnafu)?;
@@ -636,13 +643,15 @@ mod tests {
#[tokio::test]
async fn test_try_from_start_command_to_anymap() {
let mut fe_opts = FrontendOptions {
let fe_opts = FrontendOptions {
user_provider: Some("static_user_provider:cmd:test=test".to_string()),
..Default::default()
};
#[allow(clippy::unnecessary_mut_passed)]
let plugins = plugins::setup_frontend_plugins(&mut fe_opts).await.unwrap();
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
.await
.unwrap();
let provider = plugins.get::<UserProviderRef>().unwrap();
let result = provider

View File

@@ -96,6 +96,8 @@ pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30;
pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31;
/// id for information_schema.VIEWS
pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
/// id for information_schema.FLOWS
pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
/// ----- End of information_schema tables -----
/// ----- Begin of pg_catalog tables -----

View File

@@ -55,6 +55,12 @@ pub fn region_storage_path(catalog: &str, schema: &str) -> String {
format!("{}/{}", catalog, schema)
}
/// Extracts catalog and schema from the path that created by [region_storage_path].
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
let mut split = path.split('/');
Some((split.next()?.to_string(), split.next()?.to_string()))
}
pub async fn check_and_get_physical_table_id(
table_metadata_manager: &TableMetadataManagerRef,
tasks: &[CreateTableTask],
@@ -145,3 +151,18 @@ pub fn convert_region_routes_to_detecting_regions(
})
.collect::<Vec<_>>()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_catalog_and_schema() {
let test_catalog = "my_catalog";
let test_schema = "my_schema";
let path = region_storage_path(test_catalog, test_schema);
let (catalog, schema) = get_catalog_and_schema(&path).unwrap();
assert_eq!(catalog, test_catalog);
assert_eq!(schema, test_schema);
}
}

View File

@@ -33,7 +33,7 @@ pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 3;
pub const META_LEASE_SECS: u64 = 5;
/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

View File

@@ -142,6 +142,10 @@ impl FlowInfoValue {
&self.source_table_ids
}
pub fn catalog_name(&self) -> &String {
&self.catalog_name
}
pub fn flow_name(&self) -> &String {
&self.flow_name
}
@@ -161,6 +165,10 @@ impl FlowInfoValue {
pub fn comment(&self) -> &String {
&self.comment
}
pub fn options(&self) -> &HashMap<String, String> {
&self.options
}
}
pub type FlowInfoManagerRef = Arc<FlowInfoManager>;

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::stream::BoxStream;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
@@ -20,9 +23,14 @@ use snafu::OptionExt;
use crate::error::{self, Result};
use crate::key::flow::FlowScoped;
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN};
use crate::key::{
BytesAdapter, DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN,
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::store::RangeRequest;
use crate::rpc::KeyValue;
const FLOW_NAME_KEY_PREFIX: &str = "name";
@@ -38,6 +46,7 @@ lazy_static! {
/// The layout: `__flow/name/{catalog_name}/{flow_name}`.
pub struct FlowNameKey<'a>(FlowScoped<FlowNameKeyInner<'a>>);
#[allow(dead_code)]
impl<'a> FlowNameKey<'a> {
/// Returns the [FlowNameKey]
pub fn new(catalog: &'a str, flow_name: &'a str) -> FlowNameKey<'a> {
@@ -45,13 +54,22 @@ impl<'a> FlowNameKey<'a> {
FlowNameKey(FlowScoped::new(inner))
}
#[cfg(test)]
pub fn range_start_key(catalog: &str) -> Vec<u8> {
let inner = BytesAdapter::from(Self::prefix(catalog).into_bytes());
FlowScoped::new(inner).to_bytes()
}
/// Return `name/{catalog}/` as prefix
pub fn prefix(catalog: &str) -> String {
format!("{}/{}/", FLOW_NAME_KEY_PREFIX, catalog)
}
/// Returns the catalog.
pub fn catalog(&self) -> &str {
self.0.catalog_name
}
#[cfg(test)]
/// Return the `flow_name`
pub fn flow_name(&self) -> &str {
self.0.flow_name
@@ -140,6 +158,12 @@ impl FlowNameValue {
}
}
pub fn flow_name_decoder(kv: KeyValue) -> Result<(String, FlowNameValue)> {
let flow_name = FlowNameKey::from_bytes(&kv.key)?;
let flow_id = FlowNameValue::try_from_raw_value(&kv.value)?;
Ok((flow_name.flow_name().to_string(), flow_id))
}
/// The manager of [FlowNameKey].
pub struct FlowNameManager {
kv_backend: KvBackendRef,
@@ -162,6 +186,25 @@ impl FlowNameManager {
.transpose()
}
/// Return all flows' names and ids
pub async fn flow_names(
&self,
catalog: &str,
) -> BoxStream<'static, Result<(String, FlowNameValue)>> {
let start_key = FlowNameKey::range_start_key(catalog);
common_telemetry::debug!("flow_names: start_key: {:?}", start_key);
let req = RangeRequest::new().with_prefix(start_key);
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
DEFAULT_PAGE_SIZE,
Arc::new(flow_name_decoder),
);
Box::pin(stream)
}
/// Returns true if the `flow` exists.
pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
let key = FlowNameKey::new(catalog, flow);
@@ -212,4 +255,11 @@ mod tests {
assert_eq!(key.catalog(), "my_catalog");
assert_eq!(key.flow_name(), "my_task");
}
#[test]
fn test_key_start_range() {
assert_eq!(
b"__flow/name/greptime/".to_vec(),
FlowNameKey::range_start_key("greptime")
);
}
}

View File

@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_runtime::JoinHandle;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{debug, info};
use common_telemetry::{debug, info, trace};
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use greptime_proto::v1;
@@ -535,10 +535,10 @@ impl FlowWorkerManager {
} else {
(9 * avg_spd + cur_spd) / 10
};
debug!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
debug!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
since_last_run = tokio::time::Instant::now();
tokio::time::sleep(new_wait).await;
}

View File

@@ -436,7 +436,11 @@ impl InterThreadCallServer {
fn from_send_error<T>(err: mpsc::error::SendError<T>) -> Error {
InternalSnafu {
reason: format!("InterThreadCallServer resp failed: {}", err),
// this `err` will simply display `channel closed`
reason: format!(
"Worker's receiver channel have been closed unexpected: {}",
err
),
}
.build()
}

View File

@@ -16,13 +16,14 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use common_telemetry::debug;
use datatypes::value::Value;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};
use crate::error::{Error, InvalidQuerySnafu};
use crate::expr::error::EvalError;
use crate::expr::error::{EvalError, InternalSnafu};
use crate::expr::{Id, InvalidArgumentSnafu, LocalId, ScalarExpr};
use crate::repr::{self, value_to_internal_ts, Diff, Row};
@@ -709,6 +710,18 @@ impl MfpPlan {
}
if Some(lower_bound) != upper_bound && !null_eval {
if self.mfp.mfp.projection.iter().any(|c| values.len() <= *c) {
debug!("values={:?}, mfp={:?}", &values, &self.mfp.mfp);
let err = InternalSnafu {
reason: format!(
"Index out of bound for mfp={:?} and values={:?}",
&self.mfp.mfp, &values
),
}
.build();
return ret_err(err);
}
// safety: already checked that `projection` is not out of bound
let res_row = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone()));
let upper_opt =
upper_bound.map(|upper_bound| Ok((res_row.clone(), upper_bound, -diff)));

View File

@@ -509,6 +509,9 @@ pub fn check_permission(
Statement::ShowViews(stmt) => {
validate_db_permission!(stmt, query_ctx);
}
Statement::ShowFlows(stmt) => {
validate_db_permission!(stmt, query_ctx);
}
Statement::ShowStatus(_stmt) => {}
Statement::DescribeTable(stmt) => {
validate_param(stmt.name(), query_ctx)?;

View File

@@ -18,11 +18,12 @@ use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::{Client, GetOptions, PutOptions};
use snafu::{OptionExt, ResultExt};
use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use tokio::time::{timeout, MissedTickBehavior};
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error;
@@ -231,44 +232,43 @@ impl Election for EtcdElection {
.await
.context(error::EtcdFailedSnafu)?;
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
let _ = keep_alive_interval.tick().await;
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
if res.ttl() > 0 {
// Only after a successful `keep_alive` is the leader considered official.
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.infancy.store(true, Ordering::Relaxed);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
} else {
if self.is_leader.load(Ordering::Relaxed) {
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!("Failed to send leader change message, error: {e}");
}
}
// The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
match timeout(
keep_lease_duration,
self.keep_alive(&mut keeper, &mut receiver, leader),
)
.await
{
Ok(Ok(())) => {
let _ = keep_alive_interval.tick().await;
}
Ok(Err(err)) => {
error!(err; "Failed to keep alive");
break;
}
Err(_) => {
error!("Refresh lease timeout");
break;
}
}
}
self.is_leader.store(false, Ordering::Relaxed);
if self
.is_leader
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
}
Ok(())
@@ -297,3 +297,40 @@ impl Election for EtcdElection {
self.leader_watcher.subscribe()
}
}
impl EtcdElection {
async fn keep_alive(
&self,
keeper: &mut LeaseKeeper,
receiver: &mut LeaseKeepAliveStream,
leader: &LeaderKey,
) -> Result<()> {
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
ensure!(
res.ttl() > 0,
error::UnexpectedSnafu {
violated: "Failed to refresh the lease",
}
);
// Only after a successful `keep_alive` is the leader considered official.
if self
.is_leader
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
self.infancy.store(true, Ordering::Relaxed);
if let Err(e) = self
.leader_watcher
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
{
error!(e; "Failed to send leader change message");
}
}
}
Ok(())
}
}

View File

@@ -450,7 +450,7 @@ impl Metasrv {
{
let election = election.clone();
let started = self.started.clone();
let _handle = common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_write(async move {
while started.load(Ordering::Relaxed) {
let res = election.campaign().await;
if let Err(e) = res {

View File

@@ -22,11 +22,11 @@ use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
UInt8Array, UInt8Builder,
TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef, UInt16Type};
use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
@@ -40,6 +40,7 @@ use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu,
use crate::memtable::key_values::KeyValuesRef;
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::sst::parquet::format::PrimaryKeyArray;
use crate::sst::to_sst_arrow_schema;
#[derive(Debug)]
@@ -344,17 +345,17 @@ fn timestamp_array_to_iter(
}
/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt16Type>> {
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
if input.is_empty() {
return Ok(DictionaryArray::new(
UInt16Array::from(Vec::<u16>::new()),
UInt32Array::from(Vec::<u32>::new()),
Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
));
}
let mut keys = Vec::with_capacity(16);
let mut values = BinaryBuilder::new();
let mut prev: usize = 0;
keys.push(prev as u16);
keys.push(prev as u32);
values.append_value(input.value(prev));
for current_bytes in input.iter().skip(1) {
@@ -365,11 +366,11 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UIn
values.append_value(current_bytes);
prev += 1;
}
keys.push(prev as u16);
keys.push(prev as u32);
}
Ok(DictionaryArray::new(
UInt16Array::from(keys),
UInt32Array::from(keys),
Arc::new(values.finish()) as ArrayRef,
))
}
@@ -387,7 +388,7 @@ mod tests {
fn check_binary_array_to_dictionary(
input: &[&[u8]],
expected_keys: &[u16],
expected_keys: &[u32],
expected_values: &[&[u8]],
) {
let input = BinaryArray::from_iter_values(input.iter());

View File

@@ -69,7 +69,7 @@ fn internal_fields() -> [FieldRef; 3] {
[
Arc::new(Field::new_dictionary(
PRIMARY_KEY_COLUMN_NAME,
ArrowDataType::UInt16,
ArrowDataType::UInt32,
ArrowDataType::Binary,
false,
)),

View File

@@ -15,7 +15,7 @@
//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint16, binary)
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
@@ -32,8 +32,8 @@ use std::sync::Arc;
use api::v1::SemanticType;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array};
use datatypes::arrow::datatypes::{SchemaRef, UInt16Type};
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
@@ -50,6 +50,9 @@ use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::to_sst_arrow_schema;
/// Arrow array type for the primary key dictionary.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
@@ -230,7 +233,7 @@ impl ReadFormat {
// Compute primary key offsets.
let pk_dict_array = pk_array
.as_any()
.downcast_ref::<DictionaryArray<UInt16Type>>()
.downcast_ref::<PrimaryKeyArray>()
.with_context(|| InvalidRecordBatchSnafu {
reason: format!("primary key array should not be {:?}", pk_array.data_type()),
})?;
@@ -255,7 +258,7 @@ impl ReadFormat {
let end = offsets[i + 1];
let rows_in_batch = end - start;
let dict_key = keys.value(*start);
let primary_key = pk_values.value(dict_key.into()).to_vec();
let primary_key = pk_values.value(dict_key as usize).to_vec();
let mut builder = BatchBuilder::new(primary_key);
builder
@@ -524,7 +527,7 @@ impl ReadFormat {
}
/// Compute offsets of different primary keys in the array.
fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Vec<usize>> {
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
if pk_dict_array.is_empty() {
return Ok(Vec::new());
}
@@ -549,7 +552,7 @@ fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Ve
/// Creates a new array for specific `primary_key`.
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
let keys = UInt16Array::from_value(0, num_rows);
let keys = UInt32Array::from_value(0, num_rows);
// Safety: The key index is valid.
Arc::new(DictionaryArray::new(keys, values))
@@ -627,7 +630,7 @@ mod tests {
Field::new(
"__primary_key",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt16),
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
),
false,
@@ -674,15 +677,15 @@ mod tests {
assert_eq!(&expect, &array);
}
fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<DictionaryArray<UInt16Type>> {
fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
let values = Arc::new(BinaryArray::from_iter_values(
pk_row_nums.iter().map(|v| &v.0),
));
let mut keys = vec![];
for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
keys.extend(std::iter::repeat(index as u16).take(num_rows));
keys.extend(std::iter::repeat(index as u32).take(num_rows));
}
let keys = UInt16Array::from(keys);
let keys = UInt32Array::from(keys);
Arc::new(DictionaryArray::new(keys, values))
}

View File

@@ -22,6 +22,7 @@ use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::region_request::AffectedRows;
use store_api::storage::RegionId;
use tokio::time::sleep;
@@ -34,7 +35,10 @@ use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes
const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288)
impl<S> RegionWorkerLoop<S> {
impl<S> RegionWorkerLoop<S>
where
S: LogStore,
{
pub(crate) async fn handle_drop_request(
&mut self,
region_id: RegionId,
@@ -58,7 +62,7 @@ impl<S> RegionWorkerLoop<S> {
error!(e; "Failed to write the drop marker file for region {}", region_id);
// Sets the state back to writable. It's possible that the marker file has been written.
// We sets the state back to writable so we can retry the drop operation.
// We set the state back to writable so we can retry the drop operation.
region.switch_state_to_writable(RegionState::Dropping);
})?;
@@ -66,6 +70,15 @@ impl<S> RegionWorkerLoop<S> {
// Removes this region from region map to prevent other requests from accessing this region
self.regions.remove_region(region_id);
self.dropping_regions.insert_region(region.clone());
// Delete region data in WAL.
self.wal
.obsolete(
region_id,
region.version_control.current().last_entry_id,
&region.provider,
)
.await?;
// Notifies flush scheduler.
self.flush_scheduler.on_region_dropped(region_id);
// Notifies compaction scheduler.

View File

@@ -138,6 +138,8 @@ impl StatementExecutor {
Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
let req = to_copy_table_request(stmt, query_ctx.clone())?;
match req.direction {

View File

@@ -23,7 +23,7 @@ use snafu::{OptionExt, ResultExt};
use sql::ast::Ident;
use sql::statements::create::Partitions;
use sql::statements::show::{
ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowIndex, ShowKind,
ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
ShowTableStatus, ShowTables, ShowVariables, ShowViews,
};
use table::metadata::TableType;
@@ -163,6 +163,17 @@ impl StatementExecutor {
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub(super) async fn show_flows(
&self,
stmt: ShowFlows,
query_ctx: QueryContextRef,
) -> Result<Output> {
query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
.await
.context(ExecuteStatementSnafu)
}
#[tracing::instrument(skip_all)]
pub async fn show_create_flow(
&self,

View File

@@ -19,8 +19,8 @@ use itertools::Itertools;
use crate::etl::field::{Field, Fields};
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_parse_strings, yaml_string, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME,
yaml_bool, yaml_field, yaml_fields, yaml_parse_string, yaml_parse_strings, yaml_string,
Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
};
use crate::etl::value::{Map, Value};
@@ -559,6 +559,10 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessor {
match key {
FIELD_NAME => processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?)),
FIELDS_NAME => processor.with_fields(yaml_fields(v, FIELDS_NAME)?),
PATTERN_NAME => {
let pattern: Pattern = yaml_parse_string(v, PATTERN_NAME)?;
processor.with_patterns(vec![pattern]);
}
PATTERNS_NAME => {
let patterns = yaml_parse_strings(v, PATTERNS_NAME)?;
processor.with_patterns(patterns);

View File

@@ -23,8 +23,8 @@ use regex::Regex;
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_field, yaml_fields, yaml_strings, Field, Processor, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME,
yaml_bool, yaml_field, yaml_fields, yaml_string, yaml_strings, Field, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
};
use crate::etl::value::{Map, Value};
@@ -157,6 +157,9 @@ impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
FIELDS_NAME => {
processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
}
PATTERN_NAME => {
processor.try_with_patterns(vec![yaml_string(v, PATTERN_NAME)?])?;
}
PATTERNS_NAME => {
processor.try_with_patterns(yaml_strings(v, PATTERNS_NAME)?)?;
}
@@ -210,6 +213,35 @@ mod tests {
use crate::etl::processor::Processor;
use crate::etl::value::{Map, Value};
#[test]
fn test_simple_parse() {
let mut processor = RegexProcessor::default();
// single field (with prefix), multiple patterns
let f = ["a"].iter().map(|f| f.parse().unwrap()).collect();
processor.with_fields(Fields::new(f).unwrap());
let ar = "(?<ar>\\d)";
let patterns = [ar].iter().map(|p| p.to_string()).collect();
processor.try_with_patterns(patterns).unwrap();
let mut map = Map::default();
map.insert("a", Value::String("123".to_string()));
let processed_val = processor.exec_map(map).unwrap();
let v = Value::Map(Map {
values: vec![
("a_ar".to_string(), Value::String("1".to_string())),
("a".to_string(), Value::String("123".to_string())),
]
.into_iter()
.collect(),
});
assert_eq!(v, processed_val);
}
#[test]
fn test_process() {
let mut processor = RegexProcessor::default();

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use greptime_proto::v1::Rows;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
/// test util function to parse and execute pipeline
@@ -28,3 +28,17 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
pipeline.exec(input_value).expect("failed to exec pipeline")
}
/// test util function to create column schema
pub fn make_column_schema(
column_name: String,
datatype: ColumnDataType,
semantic_type: SemanticType,
) -> ColumnSchema {
ColumnSchema {
column_name,
datatype: datatype.into(),
semantic_type: semantic_type.into(),
..Default::default()
}
}

View File

@@ -0,0 +1,113 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
#[test]
fn test_dissect_pattern() {
let input_value_str = r#"
[
{
"str": "123 456"
}
]
"#;
let pipeline_yaml = r#"
processors:
- dissect:
field: str
pattern: "%{a} %{b}"
transform:
- fields:
- a
- b
type: string
"#;
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("123".to_string()))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(StringValue("456".to_string()))
);
}
#[test]
fn test_dissect_patterns() {
let input_value_str = r#"
[
{
"str": "123 456"
}
]
"#;
let pipeline_yaml = r#"
processors:
- dissect:
field: str
patterns:
- "%{a} %{b}"
transform:
- fields:
- a
- b
type: string
"#;
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("123".to_string()))
);
assert_eq!(
output.rows[0].values[1].value_data,
Some(StringValue("456".to_string()))
);
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use greptime_proto::v1::value::ValueData::TimestampMillisecondValue;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use greptime_proto::v1::{ColumnDataType, SemanticType};
mod common;
@@ -49,13 +49,11 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![ColumnSchema {
column_name: "reqTimeSec".to_string(),
datatype: ColumnDataType::TimestampMillisecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
}];
let expected_schema = vec![common::make_column_schema(
"reqTimeSec".to_string(),
ColumnDataType::TimestampMillisecond,
SemanticType::Timestamp,
)];
assert_eq!(output.schema, expected_schema);
assert_eq!(

View File

@@ -32,20 +32,16 @@ transform:
lazy_static! {
pub static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
ColumnSchema {
column_name: "join_test".to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
common::make_column_schema(
"join_test".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use greptime_proto::v1::value::ValueData::{U16Value, U8Value};
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use greptime_proto::v1::{ColumnDataType, SemanticType};
mod common;
@@ -40,20 +40,16 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
ColumnSchema {
column_name: "version".to_string(),
datatype: ColumnDataType::Uint8.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
common::make_column_schema(
"version".to_string(),
ColumnDataType::Uint8,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
@@ -85,20 +81,16 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
ColumnSchema {
column_name: "version".to_string(),
datatype: ColumnDataType::Uint8.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
common::make_column_schema(
"version".to_string(),
ColumnDataType::Uint8,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
@@ -125,20 +117,16 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
ColumnSchema {
column_name: "version".to_string(),
datatype: ColumnDataType::Uint8.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
common::make_column_schema(
"version".to_string(),
ColumnDataType::Uint8,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
@@ -176,27 +164,21 @@ transform:
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
ColumnSchema {
column_name: "version".to_string(),
datatype: ColumnDataType::Uint8.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: "spec_version".to_string(),
datatype: ColumnDataType::Uint16.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: "greptime_timestamp".to_string(),
datatype: ColumnDataType::TimestampNanosecond.into(),
semantic_type: SemanticType::Timestamp.into(),
datatype_extension: None,
options: None,
},
common::make_column_schema(
"version".to_string(),
ColumnDataType::Uint8,
SemanticType::Field,
),
common::make_column_schema(
"spec_version".to_string(),
ColumnDataType::Uint16,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);

109
src/pipeline/tests/regex.rs Normal file
View File

@@ -0,0 +1,109 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
use greptime_proto::v1::value::ValueData::StringValue;
use greptime_proto::v1::{ColumnDataType, SemanticType};
#[test]
fn test_regex_pattern() {
let input_value_str = r#"
[
{
"str": "123 456"
}
]
"#;
let pipeline_yaml = r#"
processors:
- regex:
fields:
- str
pattern: "(?<id>\\d+)"
transform:
- field: str_id
type: string
"#;
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema(
"str_id".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("123".to_string()))
);
}
#[test]
fn test_regex_patterns() {
let input_value_str = r#"
[
{
"str": "123 456"
}
]
"#;
let pipeline_yaml = r#"
processors:
- regex:
fields:
- str
patterns:
- "(?<id>\\d+)"
transform:
- field: str_id
type: string
"#;
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
let expected_schema = vec![
common::make_column_schema(
"str_id".to_string(),
ColumnDataType::String,
SemanticType::Field,
),
common::make_column_schema(
"greptime_timestamp".to_string(),
ColumnDataType::TimestampNanosecond,
SemanticType::Timestamp,
),
];
assert_eq!(output.schema, expected_schema);
assert_eq!(
output.rows[0].values[0].value_data,
Some(StringValue("123".to_string()))
);
}

View File

@@ -16,8 +16,13 @@ use common_base::Plugins;
use datanode::config::DatanodeOptions;
use datanode::error::Result;
pub async fn setup_datanode_plugins(_opts: &mut DatanodeOptions) -> Result<Plugins> {
Ok(Plugins::new())
#[allow(unused_variables)]
#[allow(unused_mut)]
pub async fn setup_datanode_plugins(
plugins: &mut Plugins,
dn_opts: &DatanodeOptions,
) -> Result<()> {
Ok(())
}
pub async fn start_datanode_plugins(_plugins: Plugins) -> Result<()> {

View File

@@ -18,16 +18,17 @@ use frontend::error::{IllegalAuthConfigSnafu, Result};
use frontend::frontend::FrontendOptions;
use snafu::ResultExt;
pub async fn setup_frontend_plugins(opts: &FrontendOptions) -> Result<Plugins> {
let plugins = Plugins::new();
if let Some(user_provider) = opts.user_provider.as_ref() {
#[allow(unused_mut)]
pub async fn setup_frontend_plugins(
plugins: &mut Plugins,
fe_opts: &FrontendOptions,
) -> Result<()> {
if let Some(user_provider) = fe_opts.user_provider.as_ref() {
let provider =
auth::user_provider_from_option(user_provider).context(IllegalAuthConfigSnafu)?;
plugins.insert::<UserProviderRef>(provider);
}
Ok(plugins)
Ok(())
}
pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> {

View File

@@ -16,8 +16,12 @@ use common_base::Plugins;
use meta_srv::error::Result;
use meta_srv::metasrv::MetasrvOptions;
pub async fn setup_metasrv_plugins(_opts: &mut MetasrvOptions) -> Result<Plugins> {
Ok(Plugins::new())
#[allow(unused_variables)]
pub async fn setup_metasrv_plugins(
_plugins: &mut Plugins,
metasrv_opts: &MetasrvOptions,
) -> Result<()> {
Ok(())
}
pub async fn start_metasrv_plugins(_plugins: Plugins) -> Result<()> {

View File

@@ -18,7 +18,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use catalog::information_schema::{
columns, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS,
columns, flows, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS,
KEY_COLUMN_USAGE, SCHEMATA, TABLES, VIEWS,
};
use catalog::CatalogManagerRef;
@@ -54,8 +54,8 @@ use sql::ast::Ident;
use sql::parser::ParserContext;
use sql::statements::create::{CreateFlow, CreateView, Partitions};
use sql::statements::show::{
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTableStatus, ShowTables, ShowVariables,
ShowViews,
ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowTableStatus, ShowTables,
ShowVariables, ShowViews,
};
use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
@@ -71,6 +71,7 @@ const SCHEMAS_COLUMN: &str = "Database";
const OPTIONS_COLUMN: &str = "Options";
const TABLES_COLUMN: &str = "Tables";
const VIEWS_COLUMN: &str = "Views";
const FLOWS_COLUMN: &str = "Flows";
const FIELD_COLUMN: &str = "Field";
const TABLE_TYPE_COLUMN: &str = "Table_type";
const COLUMN_NAME_COLUMN: &str = "Column";
@@ -763,6 +764,33 @@ pub async fn show_views(
.await
}
/// Execute the [`ShowFlows`] statement and return the [`Output`] on success.
pub async fn show_flows(
stmt: ShowFlows,
query_engine: &QueryEngineRef,
catalog_manager: &CatalogManagerRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
let like_field = Some(flows::FLOW_NAME);
let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
query_from_information_schema_table(
query_engine,
catalog_manager,
query_ctx,
FLOWS,
vec![],
projects,
filters,
like_field,
sort,
stmt.kind,
)
.await
}
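For illustration, here is a rough SQL equivalent of the query that show_flows assembles above, assuming the session's current catalog is the default greptime catalog; the LIKE predicate only applies when the statement carries a LIKE kind, and the "Flows" alias mirrors the FLOWS_COLUMN constant:
SELECT flow_name AS "Flows"
FROM information_schema.flows
WHERE table_catalog = 'greptime'
  -- AND flow_name LIKE 'pattern'   (only when a LIKE kind is given)
ORDER BY flow_name ASC;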
pub fn show_create_flow(
flow_name: ObjectName,
flow_val: FlowInfoValue,

View File

@@ -21,8 +21,8 @@ use crate::error::{
};
use crate::parser::ParserContext;
use crate::statements::show::{
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowIndex,
ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
};
use crate::statements::statement::Statement;
@@ -46,6 +46,8 @@ impl<'a> ParserContext<'a> {
}
} else if self.consume_token("VIEWS") {
self.parse_show_views()
} else if self.consume_token("FLOWS") {
self.parse_show_flows()
} else if self.matches_keyword(Keyword::CHARSET) {
self.parser.next_token();
Ok(Statement::ShowCharset(self.parse_show_kind()?))
@@ -442,7 +444,7 @@ impl<'a> ParserContext<'a> {
}));
}
// SHOW VIEWS [in | FROM] [DATABASE]
// SHOW FLOWS [in | FROM] [DATABASE]
Token::Word(w) => match w.keyword {
Keyword::IN | Keyword::FROM => self.parse_db_name()?,
_ => None,
@@ -454,6 +456,28 @@ impl<'a> ParserContext<'a> {
Ok(Statement::ShowViews(ShowViews { kind, database }))
}
fn parse_show_flows(&mut self) -> Result<Statement> {
let database = match self.parser.peek_token().token {
Token::EOF | Token::SemiColon => {
return Ok(Statement::ShowFlows(ShowFlows {
kind: ShowKind::All,
database: None,
}));
}
// SHOW FLOWS [in | FROM] [DATABASE]
Token::Word(w) => match w.keyword {
Keyword::IN | Keyword::FROM => self.parse_db_name()?,
_ => None,
},
_ => None,
};
let kind = self.parse_show_kind()?;
Ok(Statement::ShowFlows(ShowFlows { kind, database }))
}
}
#[cfg(test)]
@@ -1000,4 +1024,38 @@ mod tests {
);
assert_eq!(sql, stmts[0].to_string());
}
#[test]
pub fn test_show_flows() {
let sql = "SHOW FLOWS";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let stmts = result.unwrap();
assert_eq!(1, stmts.len());
assert_eq!(
stmts[0],
Statement::ShowFlows(ShowFlows {
kind: ShowKind::All,
database: None,
})
);
assert_eq!(sql, stmts[0].to_string());
}
#[test]
pub fn test_show_flows_in_db() {
let sql = "SHOW FLOWS IN d1";
let result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
let stmts = result.unwrap();
assert_eq!(1, stmts.len());
assert_eq!(
stmts[0],
Statement::ShowFlows(ShowFlows {
kind: ShowKind::All,
database: Some("d1".to_string()),
})
);
assert_eq!(sql, stmts[0].to_string());
}
}
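Taken together, the parser changes above accept the following statement forms; the LIKE variant is assumed to work through the shared parse_show_kind path, as it does for the other SHOW statements:
SHOW FLOWS;
SHOW FLOWS IN d1;
SHOW FLOWS FROM d1;
SHOW FLOWS LIKE 'filter%';   -- assumed via parse_show_kind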

View File

@@ -187,6 +187,25 @@ impl Display for ShowCreateFlow {
}
}
/// SQL structure for `SHOW FLOWS`.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct ShowFlows {
pub kind: ShowKind,
pub database: Option<String>,
}
impl Display for ShowFlows {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SHOW FLOWS")?;
if let Some(database) = &self.database {
write!(f, " IN {database}")?;
}
format_kind!(self, f);
Ok(())
}
}
/// SQL structure for `SHOW CREATE VIEW`.
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
pub struct ShowCreateView {
@@ -535,6 +554,49 @@ SHOW VIEWS"#,
}
}
#[test]
fn test_display_show_flows() {
let sql = r"show flows in d1;";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::ShowFlows { .. });
match &stmts[0] {
Statement::ShowFlows(show) => {
let new_sql = format!("\n{}", show);
assert_eq!(
r#"
SHOW FLOWS IN d1"#,
&new_sql
);
}
_ => {
unreachable!();
}
}
let sql = r"show flows;";
let stmts: Vec<Statement> =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(1, stmts.len());
assert_matches!(&stmts[0], Statement::ShowFlows { .. });
match &stmts[0] {
Statement::ShowFlows(show) => {
let new_sql = format!("\n{}", show);
assert_eq!(
r#"
SHOW FLOWS"#,
&new_sql
);
}
_ => {
unreachable!("{:?}", &stmts[0]);
}
}
}
#[test]
fn test_display_show_databases() {
let sql = r"show databases;";

View File

@@ -31,8 +31,8 @@ use crate::statements::insert::Insert;
use crate::statements::query::Query;
use crate::statements::set_variables::SetVariables;
use crate::statements::show::{
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowIndex,
ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
};
use crate::statements::tql::Tql;
use crate::statements::truncate::TruncateTable;
@@ -87,6 +87,8 @@ pub enum Statement {
ShowCreateTable(ShowCreateTable),
// SHOW CREATE FLOW
ShowCreateFlow(ShowCreateFlow),
/// SHOW FLOWS
ShowFlows(ShowFlows),
// SHOW CREATE VIEW
ShowCreateView(ShowCreateView),
// SHOW STATUS
@@ -133,6 +135,7 @@ impl Display for Statement {
Statement::ShowIndex(s) => s.fmt(f),
Statement::ShowCreateTable(s) => s.fmt(f),
Statement::ShowCreateFlow(s) => s.fmt(f),
Statement::ShowFlows(s) => s.fmt(f),
Statement::ShowCreateView(s) => s.fmt(f),
Statement::ShowViews(s) => s.fmt(f),
Statement::ShowStatus(s) => s.fmt(f),

View File

@@ -30,4 +30,5 @@ tokio.workspace = true
[dev-dependencies]
async-stream.workspace = true
common-meta.workspace = true
serde_json.workspace = true

View File

@@ -45,10 +45,48 @@ pub fn region_dir(path: &str, region_id: RegionId) -> String {
)
}
/// get_storage_path returns the storage path extracted from the region_dir. It returns the storage path when the region_dir is valid, otherwise None.
/// The storage path is constructed from the catalog and schema, which are generated by `common_meta::ddl::utils::region_storage_path`.
/// The catalog and schema can be extracted from the region_dir as in the following example:
/// ```
/// use common_meta::ddl::utils::get_catalog_and_schema;
///
/// fn catalog_and_schema(region_dir: &str, region_id: RegionId) -> Option<(String, String)> {
/// get_catalog_and_schema(&get_storage_path(region_dir, region_id)?)
/// }
/// ```
pub fn get_storage_path(region_dir: &str, region_id: RegionId) -> Option<String> {
if !region_dir.starts_with(DATA_DIR) {
return None;
}
// For example, if region_dir is "data/my_catalog/my_schema/42/42_0000000001/", the parts will be '42/42_0000000001'.
let parts = format!(
"{}/{}",
region_id.table_id(),
region_name(region_id.table_id(), region_id.region_number())
);
// Exclude the '/' that precedes the table part; the prefix up to this position looks like "${DATA_DIR}${catalog}/${schema}".
let pos = region_dir.rfind(&parts)? - 1;
if pos < DATA_DIR.len() {
return None;
}
Some(region_dir[DATA_DIR.len()..pos].to_string())
}
#[cfg(test)]
mod tests {
use common_meta::ddl::utils::{get_catalog_and_schema, region_storage_path};
use super::*;
fn catalog_and_schema(region_dir: &str, region_id: RegionId) -> Option<(String, String)> {
get_catalog_and_schema(&get_storage_path(region_dir, region_id)?)
}
#[test]
fn test_region_dir() {
let region_id = RegionId::new(42, 1);
@@ -57,4 +95,32 @@ mod tests {
"data/my_catalog/my_schema/42/42_0000000001/"
);
}
#[test]
fn test_get_catalog_and_schema_from_region_dir() {
let tests = [
(RegionId::new(42, 1), "my_catalog", "my_schema"),
(RegionId::new(1234, 1), "my_catalog_1234", "my_schema_1234"),
(RegionId::new(5678, 1), "my_catalog_5678", "my_schema"),
(RegionId::new(5678, 1), "my_catalog", "my_schema_5678"),
];
for (region_id, test_catalog, test_schema) in tests.iter() {
let region_dir = region_dir(
region_storage_path(test_catalog, test_schema).as_str(),
*region_id,
);
let (catalog, schema) = catalog_and_schema(&region_dir, *region_id).unwrap();
assert_eq!(catalog, *test_catalog);
assert_eq!(schema, *test_schema);
}
}
#[test]
fn test_get_catalog_and_schema_from_invalid_region_dir() {
assert_eq!(
catalog_and_schema("invalid_data", RegionId::new(42, 1)),
None
);
}
}

View File

@@ -13,6 +13,18 @@ create table out_num_cnt (
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+-----------+---------------+-----------------+
| flow_name | table_catalog | flow_definition |
+-----------+---------------+-----------------+
+-----------+---------------+-----------------+
SHOW FLOWS;
++
++
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
Affected Rows: 0
@@ -27,10 +39,38 @@ SHOW CREATE FLOW filter_numbers;
| | AS SELECT number FROM numbers_input WHERE number > 10 |
+----------------+-------------------------------------------------------+
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+----------------+---------------+----------------------------------------------------+
| flow_name | table_catalog | flow_definition |
+----------------+---------------+----------------------------------------------------+
| filter_numbers | greptime | SELECT number FROM numbers_input WHERE number > 10 |
+----------------+---------------+----------------------------------------------------+
SHOW FLOWS;
+----------------+
| Flows |
+----------------+
| filter_numbers |
+----------------+
drop flow filter_numbers;
Affected Rows: 0
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
+-----------+---------------+-----------------+
| flow_name | table_catalog | flow_definition |
+-----------+---------------+-----------------+
+-----------+---------------+-----------------+
SHOW FLOWS;
++
++
drop table out_num_cnt;
Affected Rows: 0

View File

@@ -8,12 +8,25 @@ create table out_num_cnt (
number INT,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SHOW FLOWS;
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
SHOW CREATE FLOW filter_numbers;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SHOW FLOWS;
drop flow filter_numbers;
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
SHOW FLOWS;
drop table out_num_cnt;
drop table numbers_input;

View File

@@ -41,6 +41,7 @@ SHOW TABLES;
| engines |
| events |
| files |
| flows |
| global_status |
| key_column_usage |
| optimizer_trace |
@@ -86,6 +87,7 @@ SHOW FULL TABLES;
| engines | LOCAL TEMPORARY |
| events | LOCAL TEMPORARY |
| files | LOCAL TEMPORARY |
| flows | LOCAL TEMPORARY |
| global_status | LOCAL TEMPORARY |
| key_column_usage | LOCAL TEMPORARY |
| optimizer_trace | LOCAL TEMPORARY |
@@ -125,6 +127,7 @@ SHOW TABLE STATUS;
|engines||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|events||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|files||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|flows||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|global_status||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|key_column_usage||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|optimizer_trace||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||

View File

@@ -26,6 +26,7 @@ order by table_schema, table_name;
|greptime|information_schema|engines|LOCALTEMPORARY|5|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|events|LOCALTEMPORARY|13|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|files|LOCALTEMPORARY|14|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|flows|LOCALTEMPORARY|33|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|global_status|LOCALTEMPORARY|25|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|key_column_usage|LOCALTEMPORARY|16|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|greptime|information_schema|optimizer_trace|LOCALTEMPORARY|17|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
@@ -185,6 +186,16 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | information_schema | files | update_count | 13 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
| greptime | information_schema | files | update_time | 34 | | | | | 3 | | | | | select,insert | | DateTime | datetime | FIELD | | No | datetime | | |
| greptime | information_schema | files | version | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | comment | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | expire_after | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
| greptime | information_schema | flows | flow_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | flow_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | information_schema | flows | flow_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | flownode_ids | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | options | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | sink_table_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | flows | source_table_ids | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
| greptime | information_schema | flows | table_catalog | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | global_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | global_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | information_schema | key_column_usage | column_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |

View File

@@ -83,6 +83,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
|greptime|information_schema|engines|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|events|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|files|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|flows|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|global_status|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|information_schema|key_column_usage|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|greptime|public|numbers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID|test_engine|ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|