Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-06 05:12:54 +00:00)

Compare commits: v0.9.0...v0.10.0-ni (10 commits)
Commits (SHA1):
2ae2a6674e
c8cf3b1677
7aae19aa8b
b90267dd80
9fa9156bde
ce900e850a
5274c5a407
0b13ac6e16
8ab6136d1c
e39f49fe56
.coderabbit.yaml (new file, 15 lines)
@@ -0,0 +1,15 @@
# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
language: "en-US"
early_access: false
reviews:
  profile: "chill"
  request_changes_workflow: false
  high_level_summary: true
  poem: true
  review_status: true
  collapse_walkthrough: false
  auto_review:
    enabled: false
    drafts: false
chat:
  auto_reply: true
@@ -2,7 +2,7 @@ name: Setup Etcd cluster
description: Deploy Etcd cluster on Kubernetes
inputs:
  etcd-replicas:
    default: 3
    default: 1
    description: "Etcd replicas"
  namespace:
    default: "etcd-cluster"
Cargo.lock (generated; 1 line)
@@ -10899,6 +10899,7 @@ dependencies = [
 "common-base",
 "common-error",
 "common-macro",
 "common-meta",
 "common-recordbatch",
 "common-time",
 "common-wal",
@@ -57,6 +57,31 @@ pub enum Error {
        source: BoxedError,
    },

    #[snafu(display("Failed to list flows in catalog {catalog}"))]
    ListFlows {
        #[snafu(implicit)]
        location: Location,
        catalog: String,
        source: BoxedError,
    },

    #[snafu(display("Flow info not found: {flow_name} in catalog {catalog_name}"))]
    FlowInfoNotFound {
        flow_name: String,
        catalog_name: String,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Can't convert value to json, input={input}"))]
    Json {
        input: String,
        #[snafu(source)]
        error: serde_json::error::Error,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to re-compile script due to internal error"))]
    CompileScriptInternal {
        #[snafu(implicit)]

@@ -254,12 +279,15 @@ impl ErrorExt for Error {
            | Error::FindPartitions { .. }
            | Error::FindRegionRoutes { .. }
            | Error::CacheNotFound { .. }
            | Error::CastManager { .. } => StatusCode::Unexpected,
            | Error::CastManager { .. }
            | Error::Json { .. } => StatusCode::Unexpected,

            Error::ViewPlanColumnsChanged { .. } => StatusCode::InvalidArguments,

            Error::ViewInfoNotFound { .. } => StatusCode::TableNotFound,

            Error::FlowInfoNotFound { .. } => StatusCode::FlowNotFound,

            Error::SystemCatalog { .. } => StatusCode::StorageUnavailable,

            Error::UpgradeWeakCatalogManagerRef { .. } => StatusCode::Internal,

@@ -270,7 +298,8 @@ impl ErrorExt for Error {
            Error::ListCatalogs { source, .. }
            | Error::ListNodes { source, .. }
            | Error::ListSchemas { source, .. }
            | Error::ListTables { source, .. } => source.status_code(),
            | Error::ListTables { source, .. }
            | Error::ListFlows { source, .. } => source.status_code(),

            Error::CreateTable { source, .. } => source.status_code(),
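The new variants above follow the crate's snafu pattern: a generated context selector (for example `ListFlowsSnafu`) carries the extra fields while `.context(...)` wraps the underlying error, exactly as the flows table builder does further down. A self-contained sketch of that pattern, with `std::io::Error` as a stand-in source and a hypothetical `list_flows` helper (the real variant additionally records a `Location` and boxes its source):

use snafu::{ResultExt, Snafu};

// Stand-in for the real `Error::ListFlows` variant.
#[derive(Debug, Snafu)]
#[snafu(display("Failed to list flows in catalog {catalog}"))]
struct ListFlowsError {
    catalog: String,
    source: std::io::Error,
}

// Hypothetical helper: attach the catalog name when the lookup fails.
fn list_flows(catalog: &str) -> Result<Vec<String>, ListFlowsError> {
    let lookup: Result<Vec<String>, std::io::Error> = Ok(vec!["my_flow".to_string()]);
    lookup.context(ListFlowsSnafu { catalog })
}

fn main() {
    println!("{:?}", list_flows("greptime"));
}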
@@ -25,6 +25,7 @@ use common_config::Mode;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache::{LayeredCacheRegistryRef, ViewInfoCacheRef};
|
||||
use common_meta::key::catalog_name::CatalogNameKey;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::key::schema_name::SchemaNameKey;
|
||||
use common_meta::key::table_info::TableInfoValue;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
@@ -85,7 +86,7 @@ impl KvBackendCatalogManager {
|
||||
.get()
|
||||
.expect("Failed to get table_route_cache"),
|
||||
)),
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
|
||||
system_catalog: SystemCatalog {
|
||||
catalog_manager: me.clone(),
|
||||
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
|
||||
@@ -93,11 +94,13 @@ impl KvBackendCatalogManager {
|
||||
information_schema_provider: Arc::new(InformationSchemaProvider::new(
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
me.clone(),
|
||||
Arc::new(FlowMetadataManager::new(backend.clone())),
|
||||
)),
|
||||
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
|
||||
DEFAULT_CATALOG_NAME.to_string(),
|
||||
me.clone(),
|
||||
)),
|
||||
backend,
|
||||
},
|
||||
cache_registry,
|
||||
})
|
||||
@@ -313,6 +316,7 @@ struct SystemCatalog {
|
||||
// system_schema_provider for the default catalog
|
||||
information_schema_provider: Arc<InformationSchemaProvider>,
|
||||
pg_catalog_provider: Arc<PGCatalogProvider>,
|
||||
backend: KvBackendRef,
|
||||
}
|
||||
|
||||
impl SystemCatalog {
|
||||
@@ -358,6 +362,7 @@ impl SystemCatalog {
|
||||
Arc::new(InformationSchemaProvider::new(
|
||||
catalog.to_string(),
|
||||
self.catalog_manager.clone(),
|
||||
Arc::new(FlowMetadataManager::new(self.backend.clone())),
|
||||
))
|
||||
});
|
||||
information_schema_provider.table(table_name)
|
||||
|
||||
@@ -23,6 +23,8 @@ use common_catalog::consts::{
|
||||
DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME, DEFAULT_SCHEMA_NAME,
|
||||
INFORMATION_SCHEMA_NAME, PG_CATALOG_NAME,
|
||||
};
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use futures_util::stream::BoxStream;
|
||||
use snafu::OptionExt;
|
||||
use table::TableRef;
|
||||
@@ -298,6 +300,7 @@ impl MemoryCatalogManager {
|
||||
let information_schema_provider = InformationSchemaProvider::new(
|
||||
catalog,
|
||||
Arc::downgrade(self) as Weak<dyn CatalogManager>,
|
||||
Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
|
||||
);
|
||||
let information_schema = information_schema_provider.tables().clone();
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
mod cluster_info;
|
||||
pub mod columns;
|
||||
pub mod flows;
|
||||
mod information_memory_table;
|
||||
pub mod key_column_usage;
|
||||
mod partitions;
|
||||
@@ -31,6 +32,7 @@ use std::collections::HashMap;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, INFORMATION_SCHEMA_NAME};
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use lazy_static::lazy_static;
|
||||
@@ -46,6 +48,7 @@ use self::columns::InformationSchemaColumns;
|
||||
use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef};
|
||||
use crate::error::Result;
|
||||
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
|
||||
use crate::system_schema::information_schema::flows::InformationSchemaFlows;
|
||||
use crate::system_schema::information_schema::information_memory_table::get_schema_columns;
|
||||
use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
|
||||
use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
|
||||
@@ -104,6 +107,7 @@ macro_rules! setup_memory_table {
|
||||
pub struct InformationSchemaProvider {
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
tables: HashMap<String, TableRef>,
|
||||
}
|
||||
|
||||
@@ -182,16 +186,25 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _),
|
||||
FLOWS => Some(Arc::new(InformationSchemaFlows::new(
|
||||
self.catalog_name.clone(),
|
||||
self.flow_metadata_manager.clone(),
|
||||
)) as _),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InformationSchemaProvider {
|
||||
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
|
||||
pub fn new(
|
||||
catalog_name: String,
|
||||
catalog_manager: Weak<dyn CatalogManager>,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
) -> Self {
|
||||
let mut provider = Self {
|
||||
catalog_name,
|
||||
catalog_manager,
|
||||
flow_metadata_manager,
|
||||
tables: HashMap::new(),
|
||||
};
|
||||
|
||||
@@ -238,6 +251,7 @@ impl InformationSchemaProvider {
|
||||
TABLE_CONSTRAINTS.to_string(),
|
||||
self.build_table(TABLE_CONSTRAINTS).unwrap(),
|
||||
);
|
||||
tables.insert(FLOWS.to_string(), self.build_table(FLOWS).unwrap());
|
||||
|
||||
// Add memory tables
|
||||
for name in MEMORY_TABLES.iter() {
|
||||
|
||||
src/catalog/src/system_schema/information_schema/flows.rs (new file, 305 lines)
@@ -0,0 +1,305 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::INFORMATION_SCHEMA_FLOW_TABLE_ID;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::key::flow::flow_info::FlowInfoValue;
|
||||
use common_meta::key::flow::FlowMetadataManager;
|
||||
use common_meta::key::FlowId;
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
|
||||
use datafusion::execution::TaskContext;
|
||||
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
|
||||
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
|
||||
use datatypes::prelude::ConcreteDataType as CDT;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt32VectorBuilder, VectorRef};
|
||||
use futures::TryStreamExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
|
||||
use crate::error::{
|
||||
CreateRecordBatchSnafu, FlowInfoNotFoundSnafu, InternalSnafu, JsonSnafu, ListFlowsSnafu, Result,
|
||||
};
|
||||
use crate::information_schema::{Predicates, FLOWS};
|
||||
use crate::system_schema::information_schema::InformationTable;
|
||||
|
||||
const INIT_CAPACITY: usize = 42;
|
||||
|
||||
// rows of information_schema.flows
|
||||
// pk is (flow_name, flow_id, table_catalog)
|
||||
pub const FLOW_NAME: &str = "flow_name";
|
||||
pub const FLOW_ID: &str = "flow_id";
|
||||
pub const TABLE_CATALOG: &str = "table_catalog";
|
||||
pub const FLOW_DEFINITION: &str = "flow_definition";
|
||||
pub const COMMENT: &str = "comment";
|
||||
pub const EXPIRE_AFTER: &str = "expire_after";
|
||||
pub const SOURCE_TABLE_IDS: &str = "source_table_ids";
|
||||
pub const SINK_TABLE_NAME: &str = "sink_table_name";
|
||||
pub const FLOWNODE_IDS: &str = "flownode_ids";
|
||||
pub const OPTIONS: &str = "options";
|
||||
|
||||
/// The `information_schema.flows` table provides information about flows in databases.
|
||||
pub(super) struct InformationSchemaFlows {
|
||||
schema: SchemaRef,
|
||||
catalog_name: String,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
}
|
||||
|
||||
impl InformationSchemaFlows {
|
||||
pub(super) fn new(
|
||||
catalog_name: String,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
) -> Self {
|
||||
Self {
|
||||
schema: Self::schema(),
|
||||
catalog_name,
|
||||
flow_metadata_manager,
|
||||
}
|
||||
}
|
||||
|
||||
/// Complex fields (including [`SOURCE_TABLE_IDS`], [`FLOWNODE_IDS`] and [`OPTIONS`]) are serialized to JSON strings for now.
|
||||
/// TODO(discord9): use a better way to store complex fields like json type
|
||||
pub(crate) fn schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(
|
||||
vec![
|
||||
(FLOW_NAME, CDT::string_datatype(), false),
|
||||
(FLOW_ID, CDT::uint32_datatype(), false),
|
||||
(TABLE_CATALOG, CDT::string_datatype(), false),
|
||||
(FLOW_DEFINITION, CDT::string_datatype(), false),
|
||||
(COMMENT, CDT::string_datatype(), true),
|
||||
(EXPIRE_AFTER, CDT::int64_datatype(), true),
|
||||
(SOURCE_TABLE_IDS, CDT::string_datatype(), true),
|
||||
(SINK_TABLE_NAME, CDT::string_datatype(), false),
|
||||
(FLOWNODE_IDS, CDT::string_datatype(), true),
|
||||
(OPTIONS, CDT::string_datatype(), true),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(name, ty, nullable)| ColumnSchema::new(name, ty, nullable))
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
|
||||
fn builder(&self) -> InformationSchemaFlowsBuilder {
|
||||
InformationSchemaFlowsBuilder::new(
|
||||
self.schema.clone(),
|
||||
self.catalog_name.clone(),
|
||||
&self.flow_metadata_manager,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl InformationTable for InformationSchemaFlows {
|
||||
fn table_id(&self) -> TableId {
|
||||
INFORMATION_SCHEMA_FLOW_TABLE_ID
|
||||
}
|
||||
|
||||
fn table_name(&self) -> &'static str {
|
||||
FLOWS
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
|
||||
let schema = self.schema.arrow_schema().clone();
|
||||
let mut builder = self.builder();
|
||||
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
|
||||
schema,
|
||||
futures::stream::once(async move {
|
||||
builder
|
||||
.make_flows(Some(request))
|
||||
.await
|
||||
.map(|x| x.into_df_record_batch())
|
||||
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
|
||||
}),
|
||||
));
|
||||
Ok(Box::pin(
|
||||
RecordBatchStreamAdapter::try_new(stream)
|
||||
.map_err(BoxedError::new)
|
||||
.context(InternalSnafu)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the `information_schema.FLOWS` table row by row
|
||||
///
|
||||
/// columns are based on [`FlowInfoValue`]
|
||||
struct InformationSchemaFlowsBuilder {
|
||||
schema: SchemaRef,
|
||||
catalog_name: String,
|
||||
flow_metadata_manager: Arc<FlowMetadataManager>,
|
||||
|
||||
flow_names: StringVectorBuilder,
|
||||
flow_ids: UInt32VectorBuilder,
|
||||
table_catalogs: StringVectorBuilder,
|
||||
raw_sqls: StringVectorBuilder,
|
||||
comments: StringVectorBuilder,
|
||||
expire_afters: Int64VectorBuilder,
|
||||
source_table_id_groups: StringVectorBuilder,
|
||||
sink_table_names: StringVectorBuilder,
|
||||
flownode_id_groups: StringVectorBuilder,
|
||||
option_groups: StringVectorBuilder,
|
||||
}
|
||||
|
||||
impl InformationSchemaFlowsBuilder {
|
||||
fn new(
|
||||
schema: SchemaRef,
|
||||
catalog_name: String,
|
||||
flow_metadata_manager: &Arc<FlowMetadataManager>,
|
||||
) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
catalog_name,
|
||||
flow_metadata_manager: flow_metadata_manager.clone(),
|
||||
|
||||
flow_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
flow_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
table_catalogs: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
raw_sqls: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
comments: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
expire_afters: Int64VectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
source_table_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
sink_table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
flownode_id_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
option_groups: StringVectorBuilder::with_capacity(INIT_CAPACITY),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct the `information_schema.flows` virtual table
|
||||
async fn make_flows(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
|
||||
let catalog_name = self.catalog_name.clone();
|
||||
let predicates = Predicates::from_scan_request(&request);
|
||||
|
||||
let flow_info_manager = self.flow_metadata_manager.clone();
|
||||
|
||||
// TODO(discord9): use `AsyncIterator` once it's stable-ish
|
||||
let mut stream = flow_info_manager
|
||||
.flow_name_manager()
|
||||
.flow_names(&catalog_name)
|
||||
.await;
|
||||
|
||||
while let Some((flow_name, flow_id)) = stream
|
||||
.try_next()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ListFlowsSnafu {
|
||||
catalog: &catalog_name,
|
||||
})?
|
||||
{
|
||||
let flow_info = flow_info_manager
|
||||
.flow_info_manager()
|
||||
.get(flow_id.flow_id())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(InternalSnafu)?
|
||||
.context(FlowInfoNotFoundSnafu {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
flow_name: flow_name.to_string(),
|
||||
})?;
|
||||
self.add_flow(&predicates, flow_id.flow_id(), flow_info)?;
|
||||
}
|
||||
|
||||
self.finish()
|
||||
}
|
||||
|
||||
fn add_flow(
|
||||
&mut self,
|
||||
predicates: &Predicates,
|
||||
flow_id: FlowId,
|
||||
flow_info: FlowInfoValue,
|
||||
) -> Result<()> {
|
||||
let row = [
|
||||
(FLOW_NAME, &Value::from(flow_info.flow_name().to_string())),
|
||||
(FLOW_ID, &Value::from(flow_id)),
|
||||
(
|
||||
TABLE_CATALOG,
|
||||
&Value::from(flow_info.catalog_name().to_string()),
|
||||
),
|
||||
];
|
||||
if !predicates.eval(&row) {
|
||||
return Ok(());
|
||||
}
|
||||
self.flow_names.push(Some(flow_info.flow_name()));
|
||||
self.flow_ids.push(Some(flow_id));
|
||||
self.table_catalogs.push(Some(flow_info.catalog_name()));
|
||||
self.raw_sqls.push(Some(flow_info.raw_sql()));
|
||||
self.comments.push(Some(flow_info.comment()));
|
||||
self.expire_afters.push(flow_info.expire_after());
|
||||
self.source_table_id_groups.push(Some(
|
||||
&serde_json::to_string(flow_info.source_table_ids()).context(JsonSnafu {
|
||||
input: format!("{:?}", flow_info.source_table_ids()),
|
||||
})?,
|
||||
));
|
||||
self.sink_table_names
|
||||
.push(Some(&flow_info.sink_table_name().to_string()));
|
||||
self.flownode_id_groups.push(Some(
|
||||
&serde_json::to_string(flow_info.flownode_ids()).context({
|
||||
JsonSnafu {
|
||||
input: format!("{:?}", flow_info.flownode_ids()),
|
||||
}
|
||||
})?,
|
||||
));
|
||||
self.option_groups
|
||||
.push(Some(&serde_json::to_string(flow_info.options()).context(
|
||||
JsonSnafu {
|
||||
input: format!("{:?}", flow_info.options()),
|
||||
},
|
||||
)?));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> Result<RecordBatch> {
|
||||
let columns: Vec<VectorRef> = vec![
|
||||
Arc::new(self.flow_names.finish()),
|
||||
Arc::new(self.flow_ids.finish()),
|
||||
Arc::new(self.table_catalogs.finish()),
|
||||
Arc::new(self.raw_sqls.finish()),
|
||||
Arc::new(self.comments.finish()),
|
||||
Arc::new(self.expire_afters.finish()),
|
||||
Arc::new(self.source_table_id_groups.finish()),
|
||||
Arc::new(self.sink_table_names.finish()),
|
||||
Arc::new(self.flownode_id_groups.finish()),
|
||||
Arc::new(self.option_groups.finish()),
|
||||
];
|
||||
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl DfPartitionStream for InformationSchemaFlows {
|
||||
fn schema(&self) -> &arrow_schema::SchemaRef {
|
||||
self.schema.arrow_schema()
|
||||
}
|
||||
|
||||
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
|
||||
let schema: Arc<arrow_schema::Schema> = self.schema.arrow_schema().clone();
|
||||
let mut builder = self.builder();
|
||||
Box::pin(DfRecordBatchStreamAdapter::new(
|
||||
schema,
|
||||
futures::stream::once(async move {
|
||||
builder
|
||||
.make_flows(None)
|
||||
.await
|
||||
.map(|x| x.into_df_record_batch())
|
||||
.map_err(Into::into)
|
||||
}),
|
||||
))
|
||||
}
|
||||
}
|
||||
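As the builder above shows, the complex columns (`source_table_ids`, `flownode_ids`, `options`) are stored as JSON strings produced by `serde_json::to_string`. A minimal, runnable illustration of that encoding; the values are made up, and a `BTreeMap` stands in for the real options map so the printed order is stable:

use std::collections::BTreeMap;

fn main() {
    let source_table_ids: Vec<u32> = vec![1024, 1025];
    let mut options = BTreeMap::new();
    options.insert("example_key", "example_value");

    // What ends up in the string columns of information_schema.flows:
    println!("{}", serde_json::to_string(&source_table_ids).unwrap()); // [1024,1025]
    println!("{}", serde_json::to_string(&options).unwrap());          // {"example_key":"example_value"}
}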
@@ -44,3 +44,4 @@ pub const REGION_PEERS: &str = "region_peers";
pub const TABLE_CONSTRAINTS: &str = "table_constraints";
pub const CLUSTER_INFO: &str = "cluster_info";
pub const VIEWS: &str = "views";
pub const FLOWS: &str = "flows";
@@ -46,7 +46,10 @@ enum ExportTarget {
|
||||
#[default]
|
||||
CreateTable,
|
||||
/// Corresponding to `EXPORT TABLE`
|
||||
#[deprecated(note = "Please use `DatabaseData` instead.")]
|
||||
TableData,
|
||||
/// Corresponding to `EXPORT DATABASE`
|
||||
DatabaseData,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Parser)]
|
||||
@@ -75,7 +78,17 @@ pub struct ExportCommand {
|
||||
#[clap(long, short = 't', value_enum)]
|
||||
target: ExportTarget,
|
||||
|
||||
/// basic authentication for connecting to the server
|
||||
/// A half-open time range: [start_time, end_time).
|
||||
/// The start of the time range (time-index column) for data export.
|
||||
#[clap(long)]
|
||||
start_time: Option<String>,
|
||||
|
||||
/// A half-open time range: [start_time, end_time).
|
||||
/// The end of the time range (time-index column) for data export.
|
||||
#[clap(long)]
|
||||
end_time: Option<String>,
|
||||
|
||||
/// The basic authentication for connecting to the server
|
||||
#[clap(long)]
|
||||
auth_basic: Option<String>,
|
||||
}
|
||||
@@ -99,6 +112,8 @@ impl ExportCommand {
|
||||
output_dir: self.output_dir.clone(),
|
||||
parallelism: self.export_jobs,
|
||||
target: self.target.clone(),
|
||||
start_time: self.start_time.clone(),
|
||||
end_time: self.end_time.clone(),
|
||||
auth_header,
|
||||
}),
|
||||
guard,
|
||||
@@ -113,6 +128,8 @@ pub struct Export {
|
||||
output_dir: String,
|
||||
parallelism: usize,
|
||||
target: ExportTarget,
|
||||
start_time: Option<String>,
|
||||
end_time: Option<String>,
|
||||
auth_header: Option<String>,
|
||||
}
|
||||
|
||||
@@ -167,7 +184,7 @@ impl Export {
|
||||
};
|
||||
let mut result = Vec::with_capacity(records.len());
|
||||
for value in records {
|
||||
let serde_json::Value::String(schema) = &value[0] else {
|
||||
let Value::String(schema) = &value[0] else {
|
||||
unreachable!()
|
||||
};
|
||||
if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
|
||||
@@ -256,7 +273,7 @@ impl Export {
|
||||
let Some(records) = result else {
|
||||
EmptyResultSnafu.fail()?
|
||||
};
|
||||
let serde_json::Value::String(create_table) = &records[0][1] else {
|
||||
let Value::String(create_table) = &records[0][1] else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
@@ -276,11 +293,13 @@ impl Export {
|
||||
let (metric_physical_tables, remaining_tables) =
|
||||
self.get_table_list(&catalog, &schema).await?;
|
||||
let table_count = metric_physical_tables.len() + remaining_tables.len();
|
||||
tokio::fs::create_dir_all(&self.output_dir)
|
||||
let output_dir = Path::new(&self.output_dir)
|
||||
.join(&catalog)
|
||||
.join(format!("{schema}/"));
|
||||
tokio::fs::create_dir_all(&output_dir)
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
let output_file =
|
||||
Path::new(&self.output_dir).join(format!("{catalog}-{schema}.sql"));
|
||||
let output_file = Path::new(&output_dir).join("create_tables.sql");
|
||||
let mut file = File::create(output_file).await.context(FileIoSnafu)?;
|
||||
for (c, s, t) in metric_physical_tables.into_iter().chain(remaining_tables) {
|
||||
match self.show_create_table(&c, &s, &t).await {
|
||||
@@ -294,7 +313,12 @@ impl Export {
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("finished exporting {catalog}.{schema} with {table_count} tables",);
|
||||
|
||||
info!(
|
||||
"Finished exporting {catalog}.{schema} with {table_count} table schemas to path: {}",
|
||||
output_dir.to_string_lossy()
|
||||
);
|
||||
|
||||
Ok::<(), Error>(())
|
||||
});
|
||||
}
|
||||
@@ -409,14 +433,106 @@ impl Export {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn export_database_data(&self) -> Result<()> {
|
||||
let timer = Instant::now();
|
||||
let semaphore = Arc::new(Semaphore::new(self.parallelism));
|
||||
let db_names = self.iter_db_names().await?;
|
||||
let db_count = db_names.len();
|
||||
let mut tasks = Vec::with_capacity(db_names.len());
|
||||
for (catalog, schema) in db_names {
|
||||
let semaphore_moved = semaphore.clone();
|
||||
tasks.push(async move {
|
||||
let _permit = semaphore_moved.acquire().await.unwrap();
|
||||
let output_dir = Path::new(&self.output_dir)
|
||||
.join(&catalog)
|
||||
.join(format!("{schema}/"));
|
||||
tokio::fs::create_dir_all(&output_dir)
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
|
||||
let with_options = match (&self.start_time, &self.end_time) {
|
||||
(Some(start_time), Some(end_time)) => {
|
||||
format!(
|
||||
"WITH (FORMAT='parquet', start_time='{}', end_time='{}')",
|
||||
start_time, end_time
|
||||
)
|
||||
}
|
||||
(Some(start_time), None) => {
|
||||
format!("WITH (FORMAT='parquet', start_time='{}')", start_time)
|
||||
}
|
||||
(None, Some(end_time)) => {
|
||||
format!("WITH (FORMAT='parquet', end_time='{}')", end_time)
|
||||
}
|
||||
(None, None) => "WITH (FORMAT='parquet')".to_string(),
|
||||
};
|
||||
|
||||
let sql = format!(
|
||||
r#"COPY DATABASE "{}"."{}" TO '{}' {};"#,
|
||||
catalog,
|
||||
schema,
|
||||
output_dir.to_str().unwrap(),
|
||||
with_options
|
||||
);
|
||||
|
||||
info!("Executing sql: {sql}");
|
||||
|
||||
self.sql(&sql).await?;
|
||||
|
||||
info!(
|
||||
"Finished exporting {catalog}.{schema} data into path: {}",
|
||||
output_dir.to_string_lossy()
|
||||
);
|
||||
|
||||
// Also write the matching `COPY DATABASE ... FROM` statement so the exported data can be re-imported later.
|
||||
let copy_from_file = output_dir.join("copy_from.sql");
|
||||
let mut writer =
|
||||
BufWriter::new(File::create(copy_from_file).await.context(FileIoSnafu)?);
|
||||
let copy_database_from_sql = format!(
|
||||
r#"COPY DATABASE "{}"."{}" FROM '{}' WITH (FORMAT='parquet');"#,
|
||||
catalog,
|
||||
schema,
|
||||
output_dir.to_str().unwrap()
|
||||
);
|
||||
writer
|
||||
.write(copy_database_from_sql.as_bytes())
|
||||
.await
|
||||
.context(FileIoSnafu)?;
|
||||
writer.flush().await.context(FileIoSnafu)?;
|
||||
|
||||
info!("Finished exporting {catalog}.{schema} copy_from.sql");
|
||||
|
||||
Ok::<(), Error>(())
|
||||
})
|
||||
}
|
||||
|
||||
let success = futures::future::join_all(tasks)
|
||||
.await
|
||||
.into_iter()
|
||||
.filter(|r| match r {
|
||||
Ok(_) => true,
|
||||
Err(e) => {
|
||||
error!(e; "export database job failed");
|
||||
false
|
||||
}
|
||||
})
|
||||
.count();
|
||||
let elapsed = timer.elapsed();
|
||||
|
||||
info!("Success {success}/{db_count} jobs, costs: {:?}", elapsed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
#[async_trait]
|
||||
impl Tool for Export {
|
||||
async fn do_work(&self) -> Result<()> {
|
||||
match self.target {
|
||||
ExportTarget::CreateTable => self.export_create_table().await,
|
||||
ExportTarget::TableData => self.export_table_data().await,
|
||||
ExportTarget::DatabaseData => self.export_database_data().await,
|
||||
}
|
||||
}
|
||||
}
|
||||
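The export logic above assembles a `COPY DATABASE ... TO ...` statement whose `WITH` clause depends on the optional `--start-time`/`--end-time` flags. A standalone sketch of that clause selection, with one example of the resulting SQL (paths and timestamps are illustrative only):

fn with_options(start: Option<&str>, end: Option<&str>) -> String {
    match (start, end) {
        (Some(s), Some(e)) => format!("WITH (FORMAT='parquet', start_time='{s}', end_time='{e}')"),
        (Some(s), None) => format!("WITH (FORMAT='parquet', start_time='{s}')"),
        (None, Some(e)) => format!("WITH (FORMAT='parquet', end_time='{e}')"),
        (None, None) => "WITH (FORMAT='parquet')".to_string(),
    }
}

fn main() {
    // COPY DATABASE "greptime"."public" TO '/backup/greptime/public/'
    //     WITH (FORMAT='parquet', start_time='2024-01-01 00:00:00');
    println!(
        r#"COPY DATABASE "greptime"."public" TO '/backup/greptime/public/' {};"#,
        with_options(Some("2024-01-01 00:00:00"), None)
    );
}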
@@ -496,7 +612,9 @@ mod tests {
|
||||
|
||||
let output_file = output_dir
|
||||
.path()
|
||||
.join("greptime-cli.export.create_table.sql");
|
||||
.join("greptime")
|
||||
.join("cli.export.create_table")
|
||||
.join("create_tables.sql");
|
||||
let res = std::fs::read_to_string(output_file).unwrap();
|
||||
let expect = r#"CREATE TABLE IF NOT EXISTS "a.b.c" (
|
||||
"ts" TIMESTAMP(3) NOT NULL,
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use catalog::kvbackend::MetaKvBackend;
|
||||
use clap::Parser;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
use common_telemetry::{info, warn};
|
||||
@@ -271,8 +272,9 @@ impl StartCommand {
|
||||
info!("Datanode start command: {:#?}", self);
|
||||
info!("Datanode options: {:#?}", opts);
|
||||
|
||||
let mut opts = opts.component;
|
||||
let plugins = plugins::setup_datanode_plugins(&mut opts)
|
||||
let opts = opts.component;
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_datanode_plugins(&mut plugins, &opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ use cache::{build_fundamental_cache_registry, with_default_composite_cache_regis
|
||||
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_grpc::channel_manager::ChannelConfig;
|
||||
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
|
||||
@@ -245,7 +246,9 @@ impl StartCommand {
|
||||
opts.mode = Mode::Distributed;
|
||||
}
|
||||
|
||||
opts.user_provider.clone_from(&self.user_provider);
|
||||
if let Some(user_provider) = &self.user_provider {
|
||||
opts.user_provider = Some(user_provider.clone());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -264,9 +267,9 @@ impl StartCommand {
|
||||
info!("Frontend start command: {:#?}", self);
|
||||
info!("Frontend options: {:#?}", opts);
|
||||
|
||||
let mut opts = opts.component;
|
||||
#[allow(clippy::unnecessary_mut_passed)]
|
||||
let plugins = plugins::setup_frontend_plugins(&mut opts)
|
||||
let opts = opts.component;
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_frontend_plugins(&mut plugins, &opts)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
@@ -458,7 +461,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_try_from_start_command_to_anymap() {
|
||||
let mut fe_opts = frontend::frontend::FrontendOptions {
|
||||
let fe_opts = frontend::frontend::FrontendOptions {
|
||||
http: HttpOptions {
|
||||
disable_dashboard: false,
|
||||
..Default::default()
|
||||
@@ -467,8 +470,10 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
#[allow(clippy::unnecessary_mut_passed)]
|
||||
let plugins = plugins::setup_frontend_plugins(&mut fe_opts).await.unwrap();
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let provider = plugins.get::<UserProviderRef>().unwrap();
|
||||
let result = provider
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use clap::Parser;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::logging::TracingOptions;
|
||||
@@ -238,8 +239,9 @@ impl StartCommand {
|
||||
info!("Metasrv start command: {:#?}", self);
|
||||
info!("Metasrv options: {:#?}", opts);
|
||||
|
||||
let mut opts = opts.component;
|
||||
let plugins = plugins::setup_metasrv_plugins(&mut opts)
|
||||
let opts = opts.component;
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_metasrv_plugins(&mut plugins, &opts)
|
||||
.await
|
||||
.context(StartMetaServerSnafu)?;
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ use async_trait::async_trait;
|
||||
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
|
||||
use catalog::kvbackend::KvBackendCatalogManager;
|
||||
use clap::Parser;
|
||||
use common_base::Plugins;
|
||||
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
|
||||
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
|
||||
use common_error::ext::BoxedError;
|
||||
@@ -396,7 +397,9 @@ impl StartCommand {
|
||||
opts.influxdb.enable = self.influxdb_enable;
|
||||
}
|
||||
|
||||
opts.user_provider.clone_from(&self.user_provider);
|
||||
if let Some(user_provider) = &self.user_provider {
|
||||
opts.user_provider = Some(user_provider.clone());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -418,14 +421,18 @@ impl StartCommand {
|
||||
info!("Standalone start command: {:#?}", self);
|
||||
info!("Standalone options: {opts:#?}");
|
||||
|
||||
let mut plugins = Plugins::new();
|
||||
let opts = opts.component;
|
||||
let mut fe_opts = opts.frontend_options();
|
||||
#[allow(clippy::unnecessary_mut_passed)]
|
||||
let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts) // mut ref is MUST, DO NOT change it
|
||||
let fe_opts = opts.frontend_options();
|
||||
let dn_opts = opts.datanode_options();
|
||||
|
||||
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
let dn_opts = opts.datanode_options();
|
||||
plugins::setup_datanode_plugins(&mut plugins, &dn_opts)
|
||||
.await
|
||||
.context(StartDatanodeSnafu)?;
|
||||
|
||||
set_default_timezone(fe_opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
|
||||
|
||||
@@ -464,7 +471,7 @@ impl StartCommand {
|
||||
let table_metadata_manager =
|
||||
Self::create_table_metadata_manager(kv_backend.clone()).await?;
|
||||
|
||||
let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
|
||||
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone())
|
||||
.with_kv_backend(kv_backend.clone())
|
||||
.build()
|
||||
.await
|
||||
@@ -472,7 +479,7 @@ impl StartCommand {
|
||||
|
||||
let flow_builder = FlownodeBuilder::new(
|
||||
Default::default(),
|
||||
fe_plugins.clone(),
|
||||
plugins.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
);
|
||||
@@ -533,7 +540,7 @@ impl StartCommand {
|
||||
node_manager.clone(),
|
||||
ddl_task_executor.clone(),
|
||||
)
|
||||
.with_plugin(fe_plugins.clone())
|
||||
.with_plugin(plugins.clone())
|
||||
.try_build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
@@ -554,7 +561,7 @@ impl StartCommand {
|
||||
|
||||
let (tx, _rx) = broadcast::channel(1);
|
||||
|
||||
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), fe_plugins)
|
||||
let servers = Services::new(fe_opts, Arc::new(frontend.clone()), plugins)
|
||||
.build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
@@ -636,13 +643,15 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_try_from_start_command_to_anymap() {
|
||||
let mut fe_opts = FrontendOptions {
|
||||
let fe_opts = FrontendOptions {
|
||||
user_provider: Some("static_user_provider:cmd:test=test".to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
#[allow(clippy::unnecessary_mut_passed)]
|
||||
let plugins = plugins::setup_frontend_plugins(&mut fe_opts).await.unwrap();
|
||||
let mut plugins = Plugins::new();
|
||||
plugins::setup_frontend_plugins(&mut plugins, &fe_opts)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let provider = plugins.get::<UserProviderRef>().unwrap();
|
||||
let result = provider
|
||||
|
||||
@@ -96,6 +96,8 @@ pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30;
pub const INFORMATION_SCHEMA_CLUSTER_INFO_TABLE_ID: u32 = 31;
/// id for information_schema.VIEWS
pub const INFORMATION_SCHEMA_VIEW_TABLE_ID: u32 = 32;
/// id for information_schema.FLOWS
pub const INFORMATION_SCHEMA_FLOW_TABLE_ID: u32 = 33;
/// ----- End of information_schema tables -----

/// ----- Begin of pg_catalog tables -----
@@ -55,6 +55,12 @@ pub fn region_storage_path(catalog: &str, schema: &str) -> String {
|
||||
format!("{}/{}", catalog, schema)
|
||||
}
|
||||
|
||||
/// Extracts catalog and schema from the path that was created by [region_storage_path].
|
||||
pub fn get_catalog_and_schema(path: &str) -> Option<(String, String)> {
|
||||
let mut split = path.split('/');
|
||||
Some((split.next()?.to_string(), split.next()?.to_string()))
|
||||
}
|
||||
|
||||
pub async fn check_and_get_physical_table_id(
|
||||
table_metadata_manager: &TableMetadataManagerRef,
|
||||
tasks: &[CreateTableTask],
|
||||
@@ -145,3 +151,18 @@ pub fn convert_region_routes_to_detecting_regions(
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_get_catalog_and_schema() {
|
||||
let test_catalog = "my_catalog";
|
||||
let test_schema = "my_schema";
|
||||
let path = region_storage_path(test_catalog, test_schema);
|
||||
let (catalog, schema) = get_catalog_and_schema(&path).unwrap();
|
||||
assert_eq!(catalog, test_catalog);
|
||||
assert_eq!(schema, test_schema);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;

/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 3;
pub const META_LEASE_SECS: u64 = 5;

/// In a lease, there are two opportunities for renewal.
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
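With the leader lease raised from 3 to 5 seconds, the keep-alive interval (lease / 2, in integer arithmetic) becomes 2 seconds, so two renewal attempts fit inside one lease period. A trivial check of that arithmetic:

const META_LEASE_SECS: u64 = 5;
const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

fn main() {
    // 5 / 2 == 2: the lease is refreshed every 2 seconds, twice per 5-second lease.
    assert_eq!(META_KEEP_ALIVE_INTERVAL_SECS, 2);
}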
@@ -142,6 +142,10 @@ impl FlowInfoValue {
|
||||
&self.source_table_ids
|
||||
}
|
||||
|
||||
pub fn catalog_name(&self) -> &String {
|
||||
&self.catalog_name
|
||||
}
|
||||
|
||||
pub fn flow_name(&self) -> &String {
|
||||
&self.flow_name
|
||||
}
|
||||
@@ -161,6 +165,10 @@ impl FlowInfoValue {
|
||||
pub fn comment(&self) -> &String {
|
||||
&self.comment
|
||||
}
|
||||
|
||||
pub fn options(&self) -> &HashMap<String, String> {
|
||||
&self.options
|
||||
}
|
||||
}
|
||||
|
||||
pub type FlowInfoManagerRef = Arc<FlowInfoManager>;
|
||||
|
||||
@@ -12,6 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::stream::BoxStream;
|
||||
use lazy_static::lazy_static;
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -20,9 +23,14 @@ use snafu::OptionExt;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::flow::FlowScoped;
|
||||
use crate::key::txn_helper::TxnOpGetResponseSet;
|
||||
use crate::key::{DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN};
|
||||
use crate::key::{
|
||||
BytesAdapter, DeserializedValueWithBytes, FlowId, MetaKey, TableMetaValue, NAME_PATTERN,
|
||||
};
|
||||
use crate::kv_backend::txn::Txn;
|
||||
use crate::kv_backend::KvBackendRef;
|
||||
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
|
||||
use crate::rpc::store::RangeRequest;
|
||||
use crate::rpc::KeyValue;
|
||||
|
||||
const FLOW_NAME_KEY_PREFIX: &str = "name";
|
||||
|
||||
@@ -38,6 +46,7 @@ lazy_static! {
|
||||
/// The layout: `__flow/name/{catalog_name}/{flow_name}`.
|
||||
pub struct FlowNameKey<'a>(FlowScoped<FlowNameKeyInner<'a>>);
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl<'a> FlowNameKey<'a> {
|
||||
/// Returns the [FlowNameKey]
|
||||
pub fn new(catalog: &'a str, flow_name: &'a str) -> FlowNameKey<'a> {
|
||||
@@ -45,13 +54,22 @@ impl<'a> FlowNameKey<'a> {
|
||||
FlowNameKey(FlowScoped::new(inner))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn range_start_key(catalog: &str) -> Vec<u8> {
|
||||
let inner = BytesAdapter::from(Self::prefix(catalog).into_bytes());
|
||||
|
||||
FlowScoped::new(inner).to_bytes()
|
||||
}
|
||||
|
||||
/// Return `name/{catalog}/` as prefix
|
||||
pub fn prefix(catalog: &str) -> String {
|
||||
format!("{}/{}/", FLOW_NAME_KEY_PREFIX, catalog)
|
||||
}
|
||||
|
||||
/// Returns the catalog.
|
||||
pub fn catalog(&self) -> &str {
|
||||
self.0.catalog_name
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
/// Return the `flow_name`
|
||||
pub fn flow_name(&self) -> &str {
|
||||
self.0.flow_name
|
||||
@@ -140,6 +158,12 @@ impl FlowNameValue {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn flow_name_decoder(kv: KeyValue) -> Result<(String, FlowNameValue)> {
|
||||
let flow_name = FlowNameKey::from_bytes(&kv.key)?;
|
||||
let flow_id = FlowNameValue::try_from_raw_value(&kv.value)?;
|
||||
Ok((flow_name.flow_name().to_string(), flow_id))
|
||||
}
|
||||
|
||||
/// The manager of [FlowNameKey].
|
||||
pub struct FlowNameManager {
|
||||
kv_backend: KvBackendRef,
|
||||
@@ -162,6 +186,25 @@ impl FlowNameManager {
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// Return all flows' names and ids
|
||||
pub async fn flow_names(
|
||||
&self,
|
||||
catalog: &str,
|
||||
) -> BoxStream<'static, Result<(String, FlowNameValue)>> {
|
||||
let start_key = FlowNameKey::range_start_key(catalog);
|
||||
common_telemetry::debug!("flow_names: start_key: {:?}", start_key);
|
||||
let req = RangeRequest::new().with_prefix(start_key);
|
||||
|
||||
let stream = PaginationStream::new(
|
||||
self.kv_backend.clone(),
|
||||
req,
|
||||
DEFAULT_PAGE_SIZE,
|
||||
Arc::new(flow_name_decoder),
|
||||
);
|
||||
|
||||
Box::pin(stream)
|
||||
}
|
||||
|
||||
/// Returns true if the `flow` exists.
|
||||
pub async fn exists(&self, catalog: &str, flow: &str) -> Result<bool> {
|
||||
let key = FlowNameKey::new(catalog, flow);
|
||||
@@ -212,4 +255,11 @@ mod tests {
|
||||
assert_eq!(key.catalog(), "my_catalog");
|
||||
assert_eq!(key.flow_name(), "my_task");
|
||||
}
|
||||
#[test]
|
||||
fn test_key_start_range() {
|
||||
assert_eq!(
|
||||
b"__flow/name/greptime/".to_vec(),
|
||||
FlowNameKey::range_start_key("greptime")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
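The new `FlowNameManager::flow_names` returns a paginated `BoxStream` of `(flow_name, FlowNameValue)` pairs, which the `information_schema.flows` builder drains with `try_next`. A hedged sketch of the same consumption pattern; the module path, result type, and the `manager` value are assumptions based on the diff layout, and error plumbing is simplified:

use common_meta::key::flow::flow_name::FlowNameManager; // path assumed from the diff layout
use futures::TryStreamExt;

async fn print_flow_names(manager: &FlowNameManager, catalog: &str) -> common_meta::error::Result<()> {
    let mut stream = manager.flow_names(catalog).await;
    while let Some((flow_name, value)) = stream.try_next().await? {
        // `flow_id()` comes from FlowNameValue, as used by the flows table builder above.
        println!("flow {} -> id {}", flow_name, value.flow_id());
    }
    Ok(())
}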
@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_runtime::JoinHandle;
|
||||
use common_telemetry::logging::{LoggingOptions, TracingOptions};
|
||||
use common_telemetry::{debug, info};
|
||||
use common_telemetry::{debug, info, trace};
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::value::Value;
|
||||
use greptime_proto::v1;
|
||||
@@ -535,10 +535,10 @@ impl FlowWorkerManager {
|
||||
} else {
|
||||
(9 * avg_spd + cur_spd) / 10
|
||||
};
|
||||
debug!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
|
||||
trace!("avg_spd={} r/s, cur_spd={} r/s", avg_spd, cur_spd);
|
||||
let new_wait = BATCH_SIZE * 1000 / avg_spd.max(1); //in ms
|
||||
let new_wait = Duration::from_millis(new_wait as u64).min(default_interval);
|
||||
debug!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
|
||||
trace!("Wait for {} ms, row_cnt={}", new_wait.as_millis(), row_cnt);
|
||||
since_last_run = tokio::time::Instant::now();
|
||||
tokio::time::sleep(new_wait).await;
|
||||
}
|
||||
|
||||
@@ -436,7 +436,11 @@ impl InterThreadCallServer {
|
||||
|
||||
fn from_send_error<T>(err: mpsc::error::SendError<T>) -> Error {
|
||||
InternalSnafu {
|
||||
reason: format!("InterThreadCallServer resp failed: {}", err),
|
||||
// this `err` will simply display `channel closed`
|
||||
reason: format!(
|
||||
"Worker's receiver channel have been closed unexpected: {}",
|
||||
err
|
||||
),
|
||||
}
|
||||
.build()
|
||||
}
|
||||
|
||||
@@ -16,13 +16,14 @@
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet, VecDeque};
|
||||
|
||||
use common_telemetry::debug;
|
||||
use datatypes::value::Value;
|
||||
use itertools::Itertools;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{ensure, OptionExt};
|
||||
|
||||
use crate::error::{Error, InvalidQuerySnafu};
|
||||
use crate::expr::error::EvalError;
|
||||
use crate::expr::error::{EvalError, InternalSnafu};
|
||||
use crate::expr::{Id, InvalidArgumentSnafu, LocalId, ScalarExpr};
|
||||
use crate::repr::{self, value_to_internal_ts, Diff, Row};
|
||||
|
||||
@@ -709,6 +710,18 @@ impl MfpPlan {
|
||||
}
|
||||
|
||||
if Some(lower_bound) != upper_bound && !null_eval {
|
||||
if self.mfp.mfp.projection.iter().any(|c| values.len() <= *c) {
|
||||
debug!("values={:?}, mfp={:?}", &values, &self.mfp.mfp);
|
||||
let err = InternalSnafu {
|
||||
reason: format!(
|
||||
"Index out of bound for mfp={:?} and values={:?}",
|
||||
&self.mfp.mfp, &values
|
||||
),
|
||||
}
|
||||
.build();
|
||||
return ret_err(err);
|
||||
}
|
||||
// safety: already checked that `projection` is not out of bound
|
||||
let res_row = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone()));
|
||||
let upper_opt =
|
||||
upper_bound.map(|upper_bound| Ok((res_row.clone(), upper_bound, -diff)));
|
||||
|
||||
@@ -509,6 +509,9 @@ pub fn check_permission(
|
||||
Statement::ShowViews(stmt) => {
|
||||
validate_db_permission!(stmt, query_ctx);
|
||||
}
|
||||
Statement::ShowFlows(stmt) => {
|
||||
validate_db_permission!(stmt, query_ctx);
|
||||
}
|
||||
Statement::ShowStatus(_stmt) => {}
|
||||
Statement::DescribeTable(stmt) => {
|
||||
validate_param(stmt.name(), query_ctx)?;
|
||||
|
||||
@@ -18,11 +18,12 @@ use std::time::Duration;
|
||||
|
||||
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
|
||||
use common_telemetry::{error, info, warn};
|
||||
use etcd_client::{Client, GetOptions, PutOptions};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use etcd_client::{Client, GetOptions, LeaderKey, LeaseKeepAliveStream, LeaseKeeper, PutOptions};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
use tokio::time::{timeout, MissedTickBehavior};
|
||||
|
||||
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
|
||||
use crate::error;
|
||||
@@ -231,44 +232,43 @@ impl Election for EtcdElection {
|
||||
.await
|
||||
.context(error::EtcdFailedSnafu)?;
|
||||
|
||||
let mut keep_alive_interval =
|
||||
tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
|
||||
let keep_lease_duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS);
|
||||
let mut keep_alive_interval = tokio::time::interval(keep_lease_duration);
|
||||
keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||
loop {
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
|
||||
|
||||
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
|
||||
if res.ttl() > 0 {
|
||||
// Only after a successful `keep_alive` is the leader considered official.
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
self.infancy.store(true, Ordering::Relaxed);
|
||||
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if self.is_leader.load(Ordering::Relaxed) {
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
|
||||
{
|
||||
error!("Failed to send leader change message, error: {e}");
|
||||
}
|
||||
}
|
||||
// The keep alive operation MUST be done in `META_KEEP_ALIVE_INTERVAL_SECS`.
|
||||
match timeout(
|
||||
keep_lease_duration,
|
||||
self.keep_alive(&mut keeper, &mut receiver, leader),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(())) => {
|
||||
let _ = keep_alive_interval.tick().await;
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
error!(err; "Failed to keep alive");
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Refresh lease timeout");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.is_leader.store(false, Ordering::Relaxed);
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::StepDown(Arc::new(leader.clone())))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -297,3 +297,40 @@ impl Election for EtcdElection {
|
||||
self.leader_watcher.subscribe()
|
||||
}
|
||||
}
|
||||
|
||||
impl EtcdElection {
|
||||
async fn keep_alive(
|
||||
&self,
|
||||
keeper: &mut LeaseKeeper,
|
||||
receiver: &mut LeaseKeepAliveStream,
|
||||
leader: &LeaderKey,
|
||||
) -> Result<()> {
|
||||
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
|
||||
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
|
||||
ensure!(
|
||||
res.ttl() > 0,
|
||||
error::UnexpectedSnafu {
|
||||
violated: "Failed to refresh the lease",
|
||||
}
|
||||
);
|
||||
|
||||
// Only after a successful `keep_alive` is the leader considered official.
|
||||
if self
|
||||
.is_leader
|
||||
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
self.infancy.store(true, Ordering::Relaxed);
|
||||
|
||||
if let Err(e) = self
|
||||
.leader_watcher
|
||||
.send(LeaderChangeMessage::Elected(Arc::new(leader.clone())))
|
||||
{
|
||||
error!(e; "Failed to send leader change message");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
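The refactored election loop above wraps each keep-alive round in `tokio::time::timeout`, so a stalled etcd call cannot outlive the keep-alive interval; on either an error or a timeout the loop breaks and leadership is surrendered. The same pattern in self-contained form, where the dummy future stands in for `keeper.keep_alive()` plus reading the lease response:

use std::time::Duration;
use tokio::time::timeout;

async fn keep_alive_once() -> Result<(), String> {
    // Placeholder for refreshing the etcd lease and checking `res.ttl() > 0`.
    Ok(())
}

#[tokio::main]
async fn main() {
    match timeout(Duration::from_secs(2), keep_alive_once()).await {
        Ok(Ok(())) => println!("lease refreshed"),
        Ok(Err(err)) => eprintln!("failed to keep alive: {err}"),
        Err(_) => eprintln!("refresh lease timeout"),
    }
}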
@@ -450,7 +450,7 @@ impl Metasrv {
|
||||
{
|
||||
let election = election.clone();
|
||||
let started = self.started.clone();
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
let _handle = common_runtime::spawn_write(async move {
|
||||
while started.load(Ordering::Relaxed) {
|
||||
let res = election.campaign().await;
|
||||
if let Err(e) = res {
|
||||
|
||||
@@ -22,11 +22,11 @@ use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::{
|
||||
Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
|
||||
TimestampMillisecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
|
||||
UInt8Array, UInt8Builder,
|
||||
TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
|
||||
UInt8Builder,
|
||||
};
|
||||
use datatypes::arrow::compute::TakeOptions;
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef, UInt16Type};
|
||||
use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
|
||||
use datatypes::arrow_array::BinaryArray;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
|
||||
@@ -40,6 +40,7 @@ use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu,
|
||||
use crate::memtable::key_values::KeyValuesRef;
|
||||
use crate::read::Batch;
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec};
|
||||
use crate::sst::parquet::format::PrimaryKeyArray;
|
||||
use crate::sst::to_sst_arrow_schema;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -344,17 +345,17 @@ fn timestamp_array_to_iter(
|
||||
}
|
||||
|
||||
/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
|
||||
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt16Type>> {
|
||||
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
|
||||
if input.is_empty() {
|
||||
return Ok(DictionaryArray::new(
|
||||
UInt16Array::from(Vec::<u16>::new()),
|
||||
UInt32Array::from(Vec::<u32>::new()),
|
||||
Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
|
||||
));
|
||||
}
|
||||
let mut keys = Vec::with_capacity(16);
|
||||
let mut values = BinaryBuilder::new();
|
||||
let mut prev: usize = 0;
|
||||
keys.push(prev as u16);
|
||||
keys.push(prev as u32);
|
||||
values.append_value(input.value(prev));
|
||||
|
||||
for current_bytes in input.iter().skip(1) {
|
||||
@@ -365,11 +366,11 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UIn
|
||||
values.append_value(current_bytes);
|
||||
prev += 1;
|
||||
}
|
||||
keys.push(prev as u16);
|
||||
keys.push(prev as u32);
|
||||
}
|
||||
|
||||
Ok(DictionaryArray::new(
|
||||
UInt16Array::from(keys),
|
||||
UInt32Array::from(keys),
|
||||
Arc::new(values.finish()) as ArrayRef,
|
||||
))
|
||||
}
|
||||
@@ -387,7 +388,7 @@ mod tests {
|
||||
|
||||
fn check_binary_array_to_dictionary(
|
||||
input: &[&[u8]],
|
||||
expected_keys: &[u16],
|
||||
expected_keys: &[u32],
|
||||
expected_values: &[&[u8]],
|
||||
) {
|
||||
let input = BinaryArray::from_iter_values(input.iter());
|
||||
|
||||
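Both this memtable encoder and the parquet `ReadFormat` changes below widen the `__primary_key` dictionary keys from `u16` to `u32`. A small sketch of the resulting array layout, following the constructors used in the diff; note that `datatypes::arrow` is GreptimeDB's re-export, so outside the repo the same types come from the `arrow` crate directly:

use std::sync::Arc;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
use datatypes::arrow::datatypes::UInt32Type;

fn example_pk_array() -> DictionaryArray<UInt32Type> {
    // Two rows share primary key "a", one row has primary key "b".
    let keys = UInt32Array::from(vec![0u32, 0, 1]);
    let values = Arc::new(BinaryArray::from_iter_values([b"a".as_ref(), b"b".as_ref()])) as ArrayRef;
    DictionaryArray::new(keys, values)
}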
@@ -69,7 +69,7 @@ fn internal_fields() -> [FieldRef; 3] {
|
||||
[
|
||||
Arc::new(Field::new_dictionary(
|
||||
PRIMARY_KEY_COLUMN_NAME,
|
||||
ArrowDataType::UInt16,
|
||||
ArrowDataType::UInt32,
|
||||
ArrowDataType::Binary,
|
||||
false,
|
||||
)),
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
//! Format to store in parquet.
|
||||
//!
|
||||
//! We store three internal columns in parquet:
|
||||
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint16, binary)
|
||||
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
|
||||
//! - `__sequence`, the sequence number of a row. Type: uint64
|
||||
//! - `__op_type`, the op type of the row. Type: uint8
|
||||
//!
|
||||
@@ -32,8 +32,8 @@ use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array};
|
||||
use datatypes::arrow::datatypes::{SchemaRef, UInt16Type};
|
||||
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
|
||||
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::vectors::{Helper, Vector};
|
||||
@@ -50,6 +50,9 @@ use crate::read::{Batch, BatchBuilder, BatchColumn};
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
|
||||
use crate::sst::to_sst_arrow_schema;
|
||||
|
||||
/// Arrow array type for the primary key dictionary.
|
||||
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
|
||||
|
||||
/// Number of columns that have fixed positions.
|
||||
///
|
||||
/// Contains: time index and internal columns.
|
||||
@@ -230,7 +233,7 @@ impl ReadFormat {
|
||||
// Compute primary key offsets.
|
||||
let pk_dict_array = pk_array
|
||||
.as_any()
|
||||
.downcast_ref::<DictionaryArray<UInt16Type>>()
|
||||
.downcast_ref::<PrimaryKeyArray>()
|
||||
.with_context(|| InvalidRecordBatchSnafu {
|
||||
reason: format!("primary key array should not be {:?}", pk_array.data_type()),
|
||||
})?;
|
||||
@@ -255,7 +258,7 @@ impl ReadFormat {
|
||||
let end = offsets[i + 1];
|
||||
let rows_in_batch = end - start;
|
||||
let dict_key = keys.value(*start);
|
||||
let primary_key = pk_values.value(dict_key.into()).to_vec();
|
||||
let primary_key = pk_values.value(dict_key as usize).to_vec();
|
||||
|
||||
let mut builder = BatchBuilder::new(primary_key);
|
||||
builder
|
||||
@@ -524,7 +527,7 @@ impl ReadFormat {
|
||||
}
|
||||
|
||||
/// Compute offsets of different primary keys in the array.
|
||||
fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Vec<usize>> {
|
||||
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
|
||||
if pk_dict_array.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
@@ -549,7 +552,7 @@ fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Ve
|
||||
/// Creates a new array for specific `primary_key`.
|
||||
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
|
||||
let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
|
||||
let keys = UInt16Array::from_value(0, num_rows);
|
||||
let keys = UInt32Array::from_value(0, num_rows);
|
||||
|
||||
// Safety: The key index is valid.
|
||||
Arc::new(DictionaryArray::new(keys, values))
|
||||
@@ -627,7 +630,7 @@ mod tests {
|
||||
Field::new(
|
||||
"__primary_key",
|
||||
ArrowDataType::Dictionary(
|
||||
Box::new(ArrowDataType::UInt16),
|
||||
Box::new(ArrowDataType::UInt32),
|
||||
Box::new(ArrowDataType::Binary),
|
||||
),
|
||||
false,
|
||||
@@ -674,15 +677,15 @@ mod tests {
|
||||
assert_eq!(&expect, &array);
|
||||
}
|
||||
|
||||
fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<DictionaryArray<UInt16Type>> {
|
||||
fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
|
||||
let values = Arc::new(BinaryArray::from_iter_values(
|
||||
pk_row_nums.iter().map(|v| &v.0),
|
||||
));
|
||||
let mut keys = vec![];
|
||||
for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
|
||||
keys.extend(std::iter::repeat(index as u16).take(num_rows));
|
||||
keys.extend(std::iter::repeat(index as u32).take(num_rows));
|
||||
}
|
||||
let keys = UInt16Array::from(keys);
|
||||
let keys = UInt32Array::from(keys);
|
||||
Arc::new(DictionaryArray::new(keys, values))
|
||||
}
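A note on the change above: the primary-key dictionary switches from `UInt16` to `UInt32` keys. The following is a minimal, self-contained sketch (using the `arrow` crate directly; the variable names and the standalone `main` are illustrative, not GreptimeDB's API) that builds a `DictionaryArray<UInt32Type>` over binary primary keys and derives per-key offsets the same way `primary_key_offsets` does:

```rust
use std::sync::Arc;

use arrow::array::{Array, BinaryArray, DictionaryArray, UInt32Array};
use arrow::datatypes::UInt32Type;

fn main() {
    // Two distinct primary keys; rows 0-1 use key 0, row 2 uses key 1.
    let values = Arc::new(BinaryArray::from_iter_values([b"pk-a".as_ref(), b"pk-b".as_ref()]));
    let keys = UInt32Array::from(vec![0u32, 0, 1]);
    let pk_array: DictionaryArray<UInt32Type> = DictionaryArray::new(keys, values);

    // Offsets where the dictionary key changes, plus the total length,
    // mirroring what `primary_key_offsets` computes for a batch.
    let keys = pk_array.keys();
    let mut offsets = vec![0];
    for i in 1..keys.len() {
        if keys.value(i) != keys.value(i - 1) {
            offsets.push(i);
        }
    }
    offsets.push(keys.len());
    assert_eq!(offsets, vec![0, 2, 3]);
}
```

With `UInt32` keys a batch can address far more distinct primary keys than the 65,536 possible key values of the previous `u16` encoding.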
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use futures::TryStreamExt;
|
||||
use object_store::util::join_path;
|
||||
use object_store::{EntryMode, ObjectStore};
|
||||
use snafu::ResultExt;
|
||||
use store_api::logstore::LogStore;
|
||||
use store_api::region_request::AffectedRows;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::time::sleep;
|
||||
@@ -34,7 +35,10 @@ use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
|
||||
const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes
|
||||
const MAX_RETRY_TIMES: u64 = 288; // 24 hours (5m * 288)
|
||||
|
||||
impl<S> RegionWorkerLoop<S> {
|
||||
impl<S> RegionWorkerLoop<S>
|
||||
where
|
||||
S: LogStore,
|
||||
{
|
||||
pub(crate) async fn handle_drop_request(
|
||||
&mut self,
|
||||
region_id: RegionId,
|
||||
@@ -58,7 +62,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
error!(e; "Failed to write the drop marker file for region {}", region_id);
|
||||
|
||||
// Sets the state back to writable. It's possible that the marker file has been written.
|
||||
// We sets the state back to writable so we can retry the drop operation.
|
||||
// We set the state back to writable so we can retry the drop operation.
|
||||
region.switch_state_to_writable(RegionState::Dropping);
|
||||
})?;
|
||||
|
||||
@@ -66,6 +70,15 @@ impl<S> RegionWorkerLoop<S> {
|
||||
// Removes this region from region map to prevent other requests from accessing this region
|
||||
self.regions.remove_region(region_id);
|
||||
self.dropping_regions.insert_region(region.clone());
|
||||
|
||||
// Delete region data in WAL.
|
||||
self.wal
|
||||
.obsolete(
|
||||
region_id,
|
||||
region.version_control.current().last_entry_id,
|
||||
&region.provider,
|
||||
)
|
||||
.await?;
|
||||
// Notifies flush scheduler.
|
||||
self.flush_scheduler.on_region_dropped(region_id);
|
||||
// Notifies compaction scheduler.
|
||||
|
||||
@@ -138,6 +138,8 @@ impl StatementExecutor {
|
||||
|
||||
Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,
|
||||
|
||||
Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,
|
||||
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
|
||||
let req = to_copy_table_request(stmt, query_ctx.clone())?;
|
||||
match req.direction {
|
||||
|
||||
@@ -23,7 +23,7 @@ use snafu::{OptionExt, ResultExt};
|
||||
use sql::ast::Ident;
|
||||
use sql::statements::create::Partitions;
|
||||
use sql::statements::show::{
|
||||
ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowIndex, ShowKind,
|
||||
ShowColumns, ShowCreateFlow, ShowCreateView, ShowDatabases, ShowFlows, ShowIndex, ShowKind,
|
||||
ShowTableStatus, ShowTables, ShowVariables, ShowViews,
|
||||
};
|
||||
use table::metadata::TableType;
|
||||
@@ -163,6 +163,17 @@ impl StatementExecutor {
|
||||
.context(ExecuteStatementSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn show_flows(
|
||||
&self,
|
||||
stmt: ShowFlows,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
query::sql::show_flows(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
|
||||
.await
|
||||
.context(ExecuteStatementSnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn show_create_flow(
|
||||
&self,
|
||||
|
||||
@@ -19,8 +19,8 @@ use itertools::Itertools;
|
||||
|
||||
use crate::etl::field::{Field, Fields};
|
||||
use crate::etl::processor::{
|
||||
yaml_bool, yaml_field, yaml_fields, yaml_parse_strings, yaml_string, Processor, FIELDS_NAME,
|
||||
FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME,
|
||||
yaml_bool, yaml_field, yaml_fields, yaml_parse_string, yaml_parse_strings, yaml_string,
|
||||
Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, PATTERNS_NAME, PATTERN_NAME,
|
||||
};
|
||||
use crate::etl::value::{Map, Value};
|
||||
|
||||
@@ -559,6 +559,10 @@ impl TryFrom<&yaml_rust::yaml::Hash> for DissectProcessor {
|
||||
match key {
|
||||
FIELD_NAME => processor.with_fields(Fields::one(yaml_field(v, FIELD_NAME)?)),
|
||||
FIELDS_NAME => processor.with_fields(yaml_fields(v, FIELDS_NAME)?),
|
||||
PATTERN_NAME => {
|
||||
let pattern: Pattern = yaml_parse_string(v, PATTERN_NAME)?;
|
||||
processor.with_patterns(vec![pattern]);
|
||||
}
|
||||
PATTERNS_NAME => {
|
||||
let patterns = yaml_parse_strings(v, PATTERNS_NAME)?;
|
||||
processor.with_patterns(patterns);
|
||||
|
||||
@@ -23,8 +23,8 @@ use regex::Regex;
|
||||
|
||||
use crate::etl::field::Fields;
|
||||
use crate::etl::processor::{
|
||||
yaml_bool, yaml_field, yaml_fields, yaml_strings, Field, Processor, FIELDS_NAME, FIELD_NAME,
|
||||
IGNORE_MISSING_NAME,
|
||||
yaml_bool, yaml_field, yaml_fields, yaml_string, yaml_strings, Field, Processor, FIELDS_NAME,
|
||||
FIELD_NAME, IGNORE_MISSING_NAME, PATTERN_NAME,
|
||||
};
|
||||
use crate::etl::value::{Map, Value};
|
||||
|
||||
@@ -157,6 +157,9 @@ impl TryFrom<&yaml_rust::yaml::Hash> for RegexProcessor {
|
||||
FIELDS_NAME => {
|
||||
processor.with_fields(yaml_fields(v, FIELDS_NAME)?);
|
||||
}
|
||||
PATTERN_NAME => {
|
||||
processor.try_with_patterns(vec![yaml_string(v, PATTERN_NAME)?])?;
|
||||
}
|
||||
PATTERNS_NAME => {
|
||||
processor.try_with_patterns(yaml_strings(v, PATTERNS_NAME)?)?;
|
||||
}
|
||||
@@ -210,6 +213,35 @@ mod tests {
|
||||
use crate::etl::processor::Processor;
|
||||
use crate::etl::value::{Map, Value};
|
||||
|
||||
#[test]
|
||||
fn test_simple_parse() {
|
||||
let mut processor = RegexProcessor::default();
|
||||
|
||||
// single field (with prefix), multiple patterns
|
||||
let f = ["a"].iter().map(|f| f.parse().unwrap()).collect();
|
||||
processor.with_fields(Fields::new(f).unwrap());
|
||||
|
||||
let ar = "(?<ar>\\d)";
|
||||
|
||||
let patterns = [ar].iter().map(|p| p.to_string()).collect();
|
||||
processor.try_with_patterns(patterns).unwrap();
|
||||
|
||||
let mut map = Map::default();
|
||||
map.insert("a", Value::String("123".to_string()));
|
||||
let processed_val = processor.exec_map(map).unwrap();
|
||||
|
||||
let v = Value::Map(Map {
|
||||
values: vec![
|
||||
("a_ar".to_string(), Value::String("1".to_string())),
|
||||
("a".to_string(), Value::String("123".to_string())),
|
||||
]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
});
|
||||
|
||||
assert_eq!(v, processed_val);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process() {
|
||||
let mut processor = RegexProcessor::default();
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use greptime_proto::v1::Rows;
|
||||
use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
|
||||
use pipeline::{parse, Content, GreptimeTransformer, Pipeline, Value};
|
||||
|
||||
/// Test util function to parse and execute a pipeline.
|
||||
@@ -28,3 +28,17 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
|
||||
|
||||
pipeline.exec(input_value).expect("failed to exec pipeline")
|
||||
}
|
||||
|
||||
/// Test util function to create a column schema.
|
||||
pub fn make_column_schema(
|
||||
column_name: String,
|
||||
datatype: ColumnDataType,
|
||||
semantic_type: SemanticType,
|
||||
) -> ColumnSchema {
|
||||
ColumnSchema {
|
||||
column_name,
|
||||
datatype: datatype.into(),
|
||||
semantic_type: semantic_type.into(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
113
src/pipeline/tests/dissect.rs
Normal file
@@ -0,0 +1,113 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod common;
|
||||
|
||||
use greptime_proto::v1::value::ValueData::StringValue;
|
||||
use greptime_proto::v1::{ColumnDataType, SemanticType};
|
||||
|
||||
#[test]
|
||||
fn test_dissect_pattern() {
|
||||
let input_value_str = r#"
|
||||
[
|
||||
{
|
||||
"str": "123 456"
|
||||
}
|
||||
]
|
||||
"#;
|
||||
|
||||
let pipeline_yaml = r#"
|
||||
processors:
|
||||
- dissect:
|
||||
field: str
|
||||
pattern: "%{a} %{b}"
|
||||
|
||||
transform:
|
||||
- fields:
|
||||
- a
|
||||
- b
|
||||
type: string
|
||||
"#;
|
||||
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![
|
||||
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
|
||||
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
|
||||
assert_eq!(
|
||||
output.rows[0].values[0].value_data,
|
||||
Some(StringValue("123".to_string()))
|
||||
);
|
||||
assert_eq!(
|
||||
output.rows[0].values[1].value_data,
|
||||
Some(StringValue("456".to_string()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_dissect_patterns() {
|
||||
let input_value_str = r#"
|
||||
[
|
||||
{
|
||||
"str": "123 456"
|
||||
}
|
||||
]
|
||||
"#;
|
||||
|
||||
let pipeline_yaml = r#"
|
||||
processors:
|
||||
- dissect:
|
||||
field: str
|
||||
patterns:
|
||||
- "%{a} %{b}"
|
||||
|
||||
transform:
|
||||
- fields:
|
||||
- a
|
||||
- b
|
||||
type: string
|
||||
"#;
|
||||
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![
|
||||
common::make_column_schema("a".to_string(), ColumnDataType::String, SemanticType::Field),
|
||||
common::make_column_schema("b".to_string(), ColumnDataType::String, SemanticType::Field),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
|
||||
assert_eq!(
|
||||
output.rows[0].values[0].value_data,
|
||||
Some(StringValue("123".to_string()))
|
||||
);
|
||||
assert_eq!(
|
||||
output.rows[0].values[1].value_data,
|
||||
Some(StringValue("456".to_string()))
|
||||
);
|
||||
}
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use greptime_proto::v1::value::ValueData::TimestampMillisecondValue;
|
||||
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
|
||||
use greptime_proto::v1::{ColumnDataType, SemanticType};
|
||||
|
||||
mod common;
|
||||
|
||||
@@ -49,13 +49,11 @@ transform:
|
||||
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![ColumnSchema {
|
||||
column_name: "reqTimeSec".to_string(),
|
||||
datatype: ColumnDataType::TimestampMillisecond.into(),
|
||||
semantic_type: SemanticType::Timestamp.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
}];
|
||||
let expected_schema = vec![common::make_column_schema(
|
||||
"reqTimeSec".to_string(),
|
||||
ColumnDataType::TimestampMillisecond,
|
||||
SemanticType::Timestamp,
|
||||
)];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
assert_eq!(
|
||||
|
||||
@@ -32,20 +32,16 @@ transform:
|
||||
|
||||
lazy_static! {
|
||||
pub static ref EXPECTED_SCHEMA: Vec<ColumnSchema> = vec![
|
||||
ColumnSchema {
|
||||
column_name: "join_test".to_string(),
|
||||
datatype: ColumnDataType::String.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: "greptime_timestamp".to_string(),
|
||||
datatype: ColumnDataType::TimestampNanosecond.into(),
|
||||
semantic_type: SemanticType::Timestamp.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
common::make_column_schema(
|
||||
"join_test".to_string(),
|
||||
ColumnDataType::String,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
}
|
||||
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use greptime_proto::v1::value::ValueData::{U16Value, U8Value};
|
||||
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
|
||||
use greptime_proto::v1::{ColumnDataType, SemanticType};
|
||||
|
||||
mod common;
|
||||
|
||||
@@ -40,20 +40,16 @@ transform:
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![
|
||||
ColumnSchema {
|
||||
column_name: "version".to_string(),
|
||||
datatype: ColumnDataType::Uint8.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: "greptime_timestamp".to_string(),
|
||||
datatype: ColumnDataType::TimestampNanosecond.into(),
|
||||
semantic_type: SemanticType::Timestamp.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
common::make_column_schema(
|
||||
"version".to_string(),
|
||||
ColumnDataType::Uint8,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
@@ -85,20 +81,16 @@ transform:
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![
|
||||
ColumnSchema {
|
||||
column_name: "version".to_string(),
|
||||
datatype: ColumnDataType::Uint8.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: "greptime_timestamp".to_string(),
|
||||
datatype: ColumnDataType::TimestampNanosecond.into(),
|
||||
semantic_type: SemanticType::Timestamp.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
common::make_column_schema(
|
||||
"version".to_string(),
|
||||
ColumnDataType::Uint8,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
@@ -125,20 +117,16 @@ transform:
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![
|
||||
ColumnSchema {
|
||||
column_name: "version".to_string(),
|
||||
datatype: ColumnDataType::Uint8.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: "greptime_timestamp".to_string(),
|
||||
datatype: ColumnDataType::TimestampNanosecond.into(),
|
||||
semantic_type: SemanticType::Timestamp.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
common::make_column_schema(
|
||||
"version".to_string(),
|
||||
ColumnDataType::Uint8,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
@@ -176,27 +164,21 @@ transform:
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![
|
||||
ColumnSchema {
|
||||
column_name: "version".to_string(),
|
||||
datatype: ColumnDataType::Uint8.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: "spec_version".to_string(),
|
||||
datatype: ColumnDataType::Uint16.into(),
|
||||
semantic_type: SemanticType::Field.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
ColumnSchema {
|
||||
column_name: "greptime_timestamp".to_string(),
|
||||
datatype: ColumnDataType::TimestampNanosecond.into(),
|
||||
semantic_type: SemanticType::Timestamp.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
common::make_column_schema(
|
||||
"version".to_string(),
|
||||
ColumnDataType::Uint8,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"spec_version".to_string(),
|
||||
ColumnDataType::Uint16,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
|
||||
109
src/pipeline/tests/regex.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod common;
|
||||
|
||||
use greptime_proto::v1::value::ValueData::StringValue;
|
||||
use greptime_proto::v1::{ColumnDataType, SemanticType};
|
||||
|
||||
#[test]
|
||||
fn test_regex_pattern() {
|
||||
let input_value_str = r#"
|
||||
[
|
||||
{
|
||||
"str": "123 456"
|
||||
}
|
||||
]
|
||||
"#;
|
||||
|
||||
let pipeline_yaml = r#"
|
||||
processors:
|
||||
- regex:
|
||||
fields:
|
||||
- str
|
||||
pattern: "(?<id>\\d+)"
|
||||
|
||||
transform:
|
||||
- field: str_id
|
||||
type: string
|
||||
"#;
|
||||
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![
|
||||
common::make_column_schema(
|
||||
"str_id".to_string(),
|
||||
ColumnDataType::String,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
|
||||
assert_eq!(
|
||||
output.rows[0].values[0].value_data,
|
||||
Some(StringValue("123".to_string()))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_regex_patterns() {
|
||||
let input_value_str = r#"
|
||||
[
|
||||
{
|
||||
"str": "123 456"
|
||||
}
|
||||
]
|
||||
"#;
|
||||
|
||||
let pipeline_yaml = r#"
|
||||
processors:
|
||||
- regex:
|
||||
fields:
|
||||
- str
|
||||
patterns:
|
||||
- "(?<id>\\d+)"
|
||||
|
||||
transform:
|
||||
- field: str_id
|
||||
type: string
|
||||
"#;
|
||||
|
||||
let output = common::parse_and_exec(input_value_str, pipeline_yaml);
|
||||
|
||||
let expected_schema = vec![
|
||||
common::make_column_schema(
|
||||
"str_id".to_string(),
|
||||
ColumnDataType::String,
|
||||
SemanticType::Field,
|
||||
),
|
||||
common::make_column_schema(
|
||||
"greptime_timestamp".to_string(),
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
SemanticType::Timestamp,
|
||||
),
|
||||
];
|
||||
|
||||
assert_eq!(output.schema, expected_schema);
|
||||
|
||||
assert_eq!(
|
||||
output.rows[0].values[0].value_data,
|
||||
Some(StringValue("123".to_string()))
|
||||
);
|
||||
}
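To make the `str` → `str_id` column naming in these tests concrete: the regex processor captures named groups and emits one output column per `<field>_<group>` pair. Below is a minimal sketch of that mechanic using the `regex` crate (the naming rule is inferred from the tests above, not quoted from the processor's implementation):

```rust
use regex::Regex;

fn main() {
    // The processors' "(?<id>\d+)" pattern, written with the explicit P-syntax.
    let re = Regex::new(r"(?P<id>\d+)").unwrap();
    let field = "str";
    let input = "123 456";

    // Take the first match only, matching the single `str_id` value asserted above.
    let caps = re.captures(input).unwrap();
    let column = format!("{}_{}", field, "id");
    let value = caps.name("id").unwrap().as_str();

    assert_eq!(column, "str_id");
    assert_eq!(value, "123");
}
```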
|
||||
@@ -16,8 +16,13 @@ use common_base::Plugins;
|
||||
use datanode::config::DatanodeOptions;
|
||||
use datanode::error::Result;
|
||||
|
||||
pub async fn setup_datanode_plugins(_opts: &mut DatanodeOptions) -> Result<Plugins> {
|
||||
Ok(Plugins::new())
|
||||
#[allow(unused_variables)]
|
||||
#[allow(unused_mut)]
|
||||
pub async fn setup_datanode_plugins(
|
||||
plugins: &mut Plugins,
|
||||
dn_opts: &DatanodeOptions,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start_datanode_plugins(_plugins: Plugins) -> Result<()> {
|
||||
|
||||
@@ -18,16 +18,17 @@ use frontend::error::{IllegalAuthConfigSnafu, Result};
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use snafu::ResultExt;
|
||||
|
||||
pub async fn setup_frontend_plugins(opts: &FrontendOptions) -> Result<Plugins> {
|
||||
let plugins = Plugins::new();
|
||||
|
||||
if let Some(user_provider) = opts.user_provider.as_ref() {
|
||||
#[allow(unused_mut)]
|
||||
pub async fn setup_frontend_plugins(
|
||||
plugins: &mut Plugins,
|
||||
fe_opts: &FrontendOptions,
|
||||
) -> Result<()> {
|
||||
if let Some(user_provider) = fe_opts.user_provider.as_ref() {
|
||||
let provider =
|
||||
auth::user_provider_from_option(user_provider).context(IllegalAuthConfigSnafu)?;
|
||||
plugins.insert::<UserProviderRef>(provider);
|
||||
}
|
||||
|
||||
Ok(plugins)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> {
|
||||
|
||||
@@ -16,8 +16,12 @@ use common_base::Plugins;
|
||||
use meta_srv::error::Result;
|
||||
use meta_srv::metasrv::MetasrvOptions;
|
||||
|
||||
pub async fn setup_metasrv_plugins(_opts: &mut MetasrvOptions) -> Result<Plugins> {
|
||||
Ok(Plugins::new())
|
||||
#[allow(unused_variables)]
|
||||
pub async fn setup_metasrv_plugins(
|
||||
_plugins: &mut Plugins,
|
||||
metasrv_opts: &MetasrvOptions,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start_metasrv_plugins(_plugins: Plugins) -> Result<()> {
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use catalog::information_schema::{
|
||||
columns, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS,
|
||||
columns, flows, key_column_usage, schemata, tables, CHARACTER_SETS, COLLATIONS, COLUMNS, FLOWS,
|
||||
KEY_COLUMN_USAGE, SCHEMATA, TABLES, VIEWS,
|
||||
};
|
||||
use catalog::CatalogManagerRef;
|
||||
@@ -54,8 +54,8 @@ use sql::ast::Ident;
|
||||
use sql::parser::ParserContext;
|
||||
use sql::statements::create::{CreateFlow, CreateView, Partitions};
|
||||
use sql::statements::show::{
|
||||
ShowColumns, ShowDatabases, ShowIndex, ShowKind, ShowTableStatus, ShowTables, ShowVariables,
|
||||
ShowViews,
|
||||
ShowColumns, ShowDatabases, ShowFlows, ShowIndex, ShowKind, ShowTableStatus, ShowTables,
|
||||
ShowVariables, ShowViews,
|
||||
};
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::ObjectName;
|
||||
@@ -71,6 +71,7 @@ const SCHEMAS_COLUMN: &str = "Database";
|
||||
const OPTIONS_COLUMN: &str = "Options";
|
||||
const TABLES_COLUMN: &str = "Tables";
|
||||
const VIEWS_COLUMN: &str = "Views";
|
||||
const FLOWS_COLUMN: &str = "Flows";
|
||||
const FIELD_COLUMN: &str = "Field";
|
||||
const TABLE_TYPE_COLUMN: &str = "Table_type";
|
||||
const COLUMN_NAME_COLUMN: &str = "Column";
|
||||
@@ -763,6 +764,33 @@ pub async fn show_views(
|
||||
.await
|
||||
}
|
||||
|
||||
/// Execute [`ShowFlows`] statement and return the [`Output`] if success.
|
||||
pub async fn show_flows(
|
||||
stmt: ShowFlows,
|
||||
query_engine: &QueryEngineRef,
|
||||
catalog_manager: &CatalogManagerRef,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let projects = vec![(flows::FLOW_NAME, FLOWS_COLUMN)];
|
||||
let filters = vec![col(flows::TABLE_CATALOG).eq(lit(query_ctx.current_catalog()))];
|
||||
let like_field = Some(flows::FLOW_NAME);
|
||||
let sort = vec![col(flows::FLOW_NAME).sort(true, true)];
|
||||
|
||||
query_from_information_schema_table(
|
||||
query_engine,
|
||||
catalog_manager,
|
||||
query_ctx,
|
||||
FLOWS,
|
||||
vec![],
|
||||
projects,
|
||||
filters,
|
||||
like_field,
|
||||
sort,
|
||||
stmt.kind,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn show_create_flow(
|
||||
flow_name: ObjectName,
|
||||
flow_val: FlowInfoValue,
|
||||
|
||||
@@ -21,8 +21,8 @@ use crate::error::{
|
||||
};
|
||||
use crate::parser::ParserContext;
|
||||
use crate::statements::show::{
|
||||
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowIndex,
|
||||
ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
|
||||
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
|
||||
ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
|
||||
};
|
||||
use crate::statements::statement::Statement;
|
||||
|
||||
@@ -46,6 +46,8 @@ impl<'a> ParserContext<'a> {
|
||||
}
|
||||
} else if self.consume_token("VIEWS") {
|
||||
self.parse_show_views()
|
||||
} else if self.consume_token("FLOWS") {
|
||||
self.parse_show_flows()
|
||||
} else if self.matches_keyword(Keyword::CHARSET) {
|
||||
self.parser.next_token();
|
||||
Ok(Statement::ShowCharset(self.parse_show_kind()?))
|
||||
@@ -442,7 +444,7 @@ impl<'a> ParserContext<'a> {
|
||||
}));
|
||||
}
|
||||
|
||||
// SHOW VIEWS [in | FROM] [DATABASE]
|
||||
// SHOW FLOWS [in | FROM] [DATABASE]
|
||||
Token::Word(w) => match w.keyword {
|
||||
Keyword::IN | Keyword::FROM => self.parse_db_name()?,
|
||||
_ => None,
|
||||
@@ -454,6 +456,28 @@ impl<'a> ParserContext<'a> {
|
||||
|
||||
Ok(Statement::ShowViews(ShowViews { kind, database }))
|
||||
}
|
||||
|
||||
fn parse_show_flows(&mut self) -> Result<Statement> {
|
||||
let database = match self.parser.peek_token().token {
|
||||
Token::EOF | Token::SemiColon => {
|
||||
return Ok(Statement::ShowFlows(ShowFlows {
|
||||
kind: ShowKind::All,
|
||||
database: None,
|
||||
}));
|
||||
}
|
||||
|
||||
// SHOW FLOWS [in | FROM] [DATABASE]
|
||||
Token::Word(w) => match w.keyword {
|
||||
Keyword::IN | Keyword::FROM => self.parse_db_name()?,
|
||||
_ => None,
|
||||
},
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let kind = self.parse_show_kind()?;
|
||||
|
||||
Ok(Statement::ShowFlows(ShowFlows { kind, database }))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -1000,4 +1024,38 @@ mod tests {
|
||||
);
|
||||
assert_eq!(sql, stmts[0].to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_show_flows() {
|
||||
let sql = "SHOW FLOWS";
|
||||
let result =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
|
||||
let stmts = result.unwrap();
|
||||
assert_eq!(1, stmts.len());
|
||||
assert_eq!(
|
||||
stmts[0],
|
||||
Statement::ShowFlows(ShowFlows {
|
||||
kind: ShowKind::All,
|
||||
database: None,
|
||||
})
|
||||
);
|
||||
assert_eq!(sql, stmts[0].to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
pub fn test_show_flows_in_db() {
|
||||
let sql = "SHOW FLOWS IN d1";
|
||||
let result =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default());
|
||||
let stmts = result.unwrap();
|
||||
assert_eq!(1, stmts.len());
|
||||
assert_eq!(
|
||||
stmts[0],
|
||||
Statement::ShowFlows(ShowFlows {
|
||||
kind: ShowKind::All,
|
||||
database: Some("d1".to_string()),
|
||||
})
|
||||
);
|
||||
assert_eq!(sql, stmts[0].to_string());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,6 +187,25 @@ impl Display for ShowCreateFlow {
|
||||
}
|
||||
}
|
||||
|
||||
/// SQL structure for `SHOW FLOWS`.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub struct ShowFlows {
|
||||
pub kind: ShowKind,
|
||||
pub database: Option<String>,
|
||||
}
|
||||
|
||||
impl Display for ShowFlows {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "SHOW FLOWS")?;
|
||||
if let Some(database) = &self.database {
|
||||
write!(f, " IN {database}")?;
|
||||
}
|
||||
format_kind!(self, f);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// SQL structure for `SHOW CREATE VIEW`.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub struct ShowCreateView {
|
||||
@@ -535,6 +554,49 @@ SHOW VIEWS"#,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_display_show_flows() {
|
||||
let sql = r"show flows in d1;";
|
||||
let stmts: Vec<Statement> =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap();
|
||||
assert_eq!(1, stmts.len());
|
||||
assert_matches!(&stmts[0], Statement::ShowFlows { .. });
|
||||
match &stmts[0] {
|
||||
Statement::ShowFlows(show) => {
|
||||
let new_sql = format!("\n{}", show);
|
||||
assert_eq!(
|
||||
r#"
|
||||
SHOW FLOWS IN d1"#,
|
||||
&new_sql
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
|
||||
let sql = r"show flows;";
|
||||
let stmts: Vec<Statement> =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap();
|
||||
assert_eq!(1, stmts.len());
|
||||
assert_matches!(&stmts[0], Statement::ShowFlows { .. });
|
||||
match &stmts[0] {
|
||||
Statement::ShowFlows(show) => {
|
||||
let new_sql = format!("\n{}", show);
|
||||
assert_eq!(
|
||||
r#"
|
||||
SHOW FLOWS"#,
|
||||
&new_sql
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
unreachable!("{:?}", &stmts[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_display_show_databases() {
|
||||
let sql = r"show databases;";
|
||||
|
||||
@@ -31,8 +31,8 @@ use crate::statements::insert::Insert;
|
||||
use crate::statements::query::Query;
|
||||
use crate::statements::set_variables::SetVariables;
|
||||
use crate::statements::show::{
|
||||
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowIndex,
|
||||
ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
|
||||
ShowColumns, ShowCreateFlow, ShowCreateTable, ShowCreateView, ShowDatabases, ShowFlows,
|
||||
ShowIndex, ShowKind, ShowStatus, ShowTableStatus, ShowTables, ShowVariables, ShowViews,
|
||||
};
|
||||
use crate::statements::tql::Tql;
|
||||
use crate::statements::truncate::TruncateTable;
|
||||
@@ -87,6 +87,8 @@ pub enum Statement {
|
||||
ShowCreateTable(ShowCreateTable),
|
||||
// SHOW CREATE FLOW
|
||||
ShowCreateFlow(ShowCreateFlow),
|
||||
/// SHOW FLOWS
|
||||
ShowFlows(ShowFlows),
|
||||
// SHOW CREATE VIEW
|
||||
ShowCreateView(ShowCreateView),
|
||||
// SHOW STATUS
|
||||
@@ -133,6 +135,7 @@ impl Display for Statement {
|
||||
Statement::ShowIndex(s) => s.fmt(f),
|
||||
Statement::ShowCreateTable(s) => s.fmt(f),
|
||||
Statement::ShowCreateFlow(s) => s.fmt(f),
|
||||
Statement::ShowFlows(s) => s.fmt(f),
|
||||
Statement::ShowCreateView(s) => s.fmt(f),
|
||||
Statement::ShowViews(s) => s.fmt(f),
|
||||
Statement::ShowStatus(s) => s.fmt(f),
|
||||
|
||||
@@ -30,4 +30,5 @@ tokio.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
async-stream.workspace = true
|
||||
common-meta.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -45,10 +45,48 @@ pub fn region_dir(path: &str, region_id: RegionId) -> String {
|
||||
)
|
||||
}
|
||||
|
||||
/// `get_storage_path` returns the storage path extracted from the `region_dir`; it returns `None` if the `region_dir` is not a valid region directory.
|
||||
/// The storage path is constructed from the catalog and schema, which are generated by `common_meta::ddl::utils::region_storage_path`.
|
||||
/// We can extract the catalog and schema from the region_dir as in the following example:
|
||||
/// ```
|
||||
/// use common_meta::ddl::utils::get_catalog_and_schema;
|
||||
///
|
||||
/// fn catalog_and_schema(region_dir: &str, region_id: RegionId) -> Option<(String, String)> {
|
||||
/// get_catalog_and_schema(&get_storage_path(region_dir, region_id)?)
|
||||
/// }
|
||||
/// ```
|
||||
pub fn get_storage_path(region_dir: &str, region_id: RegionId) -> Option<String> {
|
||||
if !region_dir.starts_with(DATA_DIR) {
|
||||
return None;
|
||||
}
|
||||
|
||||
// For example, if region_dir is "data/my_catalog/my_schema/42/42_0000000001/", the parts will be '42/42_0000000001'.
|
||||
let parts = format!(
|
||||
"{}/{}",
|
||||
region_id.table_id(),
|
||||
region_name(region_id.table_id(), region_id.region_number())
|
||||
);
|
||||
|
||||
// Ignore the last '/'. The original path will be like "${DATA_DIR}${catalog}/${schema}".
|
||||
let pos = region_dir.rfind(&parts)? - 1;
|
||||
|
||||
if pos < DATA_DIR.len() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(region_dir[DATA_DIR.len()..pos].to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_meta::ddl::utils::{get_catalog_and_schema, region_storage_path};
|
||||
|
||||
use super::*;
|
||||
|
||||
fn catalog_and_schema(region_dir: &str, region_id: RegionId) -> Option<(String, String)> {
|
||||
get_catalog_and_schema(&get_storage_path(region_dir, region_id)?)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_dir() {
|
||||
let region_id = RegionId::new(42, 1);
|
||||
@@ -57,4 +95,32 @@ mod tests {
|
||||
"data/my_catalog/my_schema/42/42_0000000001/"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_catalog_and_schema_from_region_dir() {
|
||||
let tests = [
|
||||
(RegionId::new(42, 1), "my_catalog", "my_schema"),
|
||||
(RegionId::new(1234, 1), "my_catalog_1234", "my_schema_1234"),
|
||||
(RegionId::new(5678, 1), "my_catalog_5678", "my_schema"),
|
||||
(RegionId::new(5678, 1), "my_catalog", "my_schema_5678"),
|
||||
];
|
||||
|
||||
for (region_id, test_catalog, test_schema) in tests.iter() {
|
||||
let region_dir = region_dir(
|
||||
region_storage_path(test_catalog, test_schema).as_str(),
|
||||
*region_id,
|
||||
);
|
||||
let (catalog, schema) = catalog_and_schema(&region_dir, *region_id).unwrap();
|
||||
assert_eq!(catalog, *test_catalog);
|
||||
assert_eq!(schema, *test_schema);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_catalog_and_schema_from_invalid_region_dir() {
|
||||
assert_eq!(
|
||||
catalog_and_schema("invalid_data", RegionId::new(42, 1)),
|
||||
None
|
||||
);
|
||||
}
|
||||
}
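For reference, the slicing performed by `get_storage_path` can be reproduced in a standalone sketch. The `DATA_DIR` value and the `storage_path` helper below are illustrative stand-ins for the crate's `DATA_DIR` constant and `region_name` helper, not its actual items:

```rust
/// Illustrative stand-in for the data directory prefix of the region dir layout.
const DATA_DIR: &str = "data/";

/// Extracts "catalog/schema" from a region dir such as
/// "data/my_catalog/my_schema/42/42_0000000001/".
fn storage_path(region_dir: &str, table_id: u32, region_number: u32) -> Option<String> {
    if !region_dir.starts_with(DATA_DIR) {
        return None;
    }
    // e.g. "42/42_0000000001" for table 42, region 1 (region number zero-padded to 10 digits).
    let parts = format!("{table_id}/{table_id}_{region_number:010}");
    // Position of the '/' right before the table id part.
    let pos = region_dir.rfind(&parts)? - 1;
    (pos >= DATA_DIR.len()).then(|| region_dir[DATA_DIR.len()..pos].to_string())
}

fn main() {
    assert_eq!(
        storage_path("data/my_catalog/my_schema/42/42_0000000001/", 42, 1),
        Some("my_catalog/my_schema".to_string())
    );
}
```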
|
||||
|
||||
@@ -13,6 +13,18 @@ create table out_num_cnt (
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
+-----------+---------------+-----------------+
|
||||
| flow_name | table_catalog | flow_definition |
|
||||
+-----------+---------------+-----------------+
|
||||
+-----------+---------------+-----------------+
|
||||
|
||||
SHOW FLOWS;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
|
||||
|
||||
Affected Rows: 0
|
||||
@@ -27,10 +39,38 @@ SHOW CREATE FLOW filter_numbers;
|
||||
| | AS SELECT number FROM numbers_input WHERE number > 10 |
|
||||
+----------------+-------------------------------------------------------+
|
||||
|
||||
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
+----------------+---------------+----------------------------------------------------+
|
||||
| flow_name | table_catalog | flow_definition |
|
||||
+----------------+---------------+----------------------------------------------------+
|
||||
| filter_numbers | greptime | SELECT number FROM numbers_input WHERE number > 10 |
|
||||
+----------------+---------------+----------------------------------------------------+
|
||||
|
||||
SHOW FLOWS;
|
||||
|
||||
+----------------+
|
||||
| Flows |
|
||||
+----------------+
|
||||
| filter_numbers |
|
||||
+----------------+
|
||||
|
||||
drop flow filter_numbers;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
+-----------+---------------+-----------------+
|
||||
| flow_name | table_catalog | flow_definition |
|
||||
+-----------+---------------+-----------------+
|
||||
+-----------+---------------+-----------------+
|
||||
|
||||
SHOW FLOWS;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
drop table out_num_cnt;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -8,12 +8,25 @@ create table out_num_cnt (
|
||||
number INT,
|
||||
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP TIME INDEX);
|
||||
|
||||
|
||||
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
SHOW FLOWS;
|
||||
|
||||
CREATE FLOW filter_numbers SINK TO out_num_cnt AS SELECT number FROM numbers_input where number > 10;
|
||||
|
||||
SHOW CREATE FLOW filter_numbers;
|
||||
|
||||
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
SHOW FLOWS;
|
||||
|
||||
drop flow filter_numbers;
|
||||
|
||||
SELECT flow_name, table_catalog, flow_definition FROM INFORMATION_SCHEMA.FLOWS;
|
||||
|
||||
SHOW FLOWS;
|
||||
|
||||
drop table out_num_cnt;
|
||||
|
||||
drop table numbers_input;
|
||||
|
||||
@@ -41,6 +41,7 @@ SHOW TABLES;
|
||||
| engines |
|
||||
| events |
|
||||
| files |
|
||||
| flows |
|
||||
| global_status |
|
||||
| key_column_usage |
|
||||
| optimizer_trace |
|
||||
@@ -86,6 +87,7 @@ SHOW FULL TABLES;
|
||||
| engines | LOCAL TEMPORARY |
|
||||
| events | LOCAL TEMPORARY |
|
||||
| files | LOCAL TEMPORARY |
|
||||
| flows | LOCAL TEMPORARY |
|
||||
| global_status | LOCAL TEMPORARY |
|
||||
| key_column_usage | LOCAL TEMPORARY |
|
||||
| optimizer_trace | LOCAL TEMPORARY |
|
||||
@@ -125,6 +127,7 @@ SHOW TABLE STATUS;
|
||||
|engines||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|
||||
|events||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|
||||
|files||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|
||||
|flows||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|
||||
|global_status||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|
||||
|key_column_usage||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|
||||
|optimizer_trace||11|Fixed|0|0|0|0|0|0|0|DATETIME||||0|||
|
||||
|
||||
@@ -26,6 +26,7 @@ order by table_schema, table_name;
|
||||
|greptime|information_schema|engines|LOCALTEMPORARY|5|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|
||||
|greptime|information_schema|events|LOCALTEMPORARY|13|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|
||||
|greptime|information_schema|files|LOCALTEMPORARY|14|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|
||||
|greptime|information_schema|flows|LOCALTEMPORARY|33|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|
||||
|greptime|information_schema|global_status|LOCALTEMPORARY|25|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|
||||
|greptime|information_schema|key_column_usage|LOCALTEMPORARY|16|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|
||||
|greptime|information_schema|optimizer_trace|LOCALTEMPORARY|17|0|0|0|0|0||11|Fixed|0|0|0|DATETIME||||0|||Y|
|
||||
@@ -185,6 +186,16 @@ select * from information_schema.columns order by table_schema, table_name, colu
|
||||
| greptime | information_schema | files | update_count | 13 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | No | bigint | | |
|
||||
| greptime | information_schema | files | update_time | 34 | | | | | 3 | | | | | select,insert | | DateTime | datetime | FIELD | | No | datetime | | |
|
||||
| greptime | information_schema | files | version | 25 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | flows | comment | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | flows | expire_after | 6 | | | 19 | 0 | | | | | | select,insert | | Int64 | bigint | FIELD | | Yes | bigint | | |
|
||||
| greptime | information_schema | flows | flow_definition | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | flows | flow_id | 2 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
|
||||
| greptime | information_schema | flows | flow_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | flows | flownode_ids | 9 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | flows | options | 10 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | flows | sink_table_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | flows | source_table_ids | 7 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | flows | table_catalog | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | global_status | variable_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | global_status | variable_value | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | key_column_usage | column_name | 8 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
|
||||
@@ -83,6 +83,7 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
|
||||
|greptime|information_schema|engines|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|
||||
|greptime|information_schema|events|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|
||||
|greptime|information_schema|files|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|
||||
|greptime|information_schema|flows|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|
||||
|greptime|information_schema|global_status|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|
||||
|greptime|information_schema|key_column_usage|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|
||||
|greptime|public|numbers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID|test_engine|ID|Fixed|ID|ID|ID|DATETIME||||ID|||Y|
|
||||
|
||||