feat: table/column/flow COMMENT (#7060)

* initial impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* avoid unimplemented panic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* validate flow

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update sqlness result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix table column comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* table level comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify table info serde

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't txn

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove empty trait

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wip: procedure

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update proto

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* grpc support

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* try from pb struct

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* doc comment

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* check unchanged fast case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* tune errors

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix merge error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use try_as_raw_value

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
This commit is contained in:
Ruihang Xia
2025-12-10 23:08:47 +08:00
committed by GitHub
parent f1abe5d215
commit 564cc0c750
33 changed files with 1840 additions and 316 deletions

2
Cargo.lock generated
View File

@@ -5367,7 +5367,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0df99f09f1d6785055b2d9da96fc4ecc2bdf6803#0df99f09f1d6785055b2d9da96fc4ecc2bdf6803"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0423fa30203187c75e2937a668df1da699c8b96c#0423fa30203187c75e2937a668df1da699c8b96c"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",

View File

@@ -149,7 +149,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0df99f09f1d6785055b2d9da96fc4ecc2bdf6803" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0423fa30203187c75e2937a668df1da699c8b96c" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -708,6 +708,7 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::CreateView(_)) => "ddl.create_view",
Some(Expr::DropView(_)) => "ddl.drop_view",
Some(Expr::AlterDatabase(_)) => "ddl.alter_database",
Some(Expr::CommentOn(_)) => "ddl.comment_on",
None => "ddl.empty",
}
}

View File

@@ -31,6 +31,7 @@ use crate::region_registry::LeaderRegionRegistryRef;
pub mod alter_database;
pub mod alter_logical_tables;
pub mod alter_table;
pub mod comment_on;
pub mod create_database;
pub mod create_flow;
pub mod create_logical_tables;

View File

@@ -301,8 +301,8 @@ fn build_new_table_info(
| AlterKind::UnsetTableOptions { .. }
| AlterKind::SetIndexes { .. }
| AlterKind::UnsetIndexes { .. }
| AlterKind::DropDefaults { .. } => {}
AlterKind::SetDefaults { .. } => {}
| AlterKind::DropDefaults { .. }
| AlterKind::SetDefaults { .. } => {}
}
info!(

View File

@@ -0,0 +1,509 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use chrono::Utc;
use common_catalog::format_full_table_name;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing::info;
use datatypes::schema::COMMENT_KEY as COLUMN_COMMENT_KEY;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::storage::TableId;
use strum::AsRefStr;
use table::metadata::RawTableInfo;
use table::requests::COMMENT_KEY as TABLE_COMMENT_KEY;
use table::table_name::TableName;
use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::ddl::utils::map_to_procedure_error;
use crate::error::{ColumnNotFoundSnafu, FlowNotFoundSnafu, Result, TableNotFoundSnafu};
use crate::instruction::CacheIdent;
use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue};
use crate::key::table_info::{TableInfoKey, TableInfoValue};
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue};
use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock};
use crate::rpc::ddl::{CommentObjectType, CommentOnTask};
use crate::rpc::store::PutRequest;
pub struct CommentOnProcedure {
pub context: DdlContext,
pub data: CommentOnData,
}
impl CommentOnProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CommentOn";
pub fn new(task: CommentOnTask, context: DdlContext) -> Self {
Self {
context,
data: CommentOnData::new(task),
}
}
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}
pub async fn on_prepare(&mut self) -> Result<Status> {
match self.data.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
self.prepare_table_or_column().await?;
}
CommentObjectType::Flow => {
self.prepare_flow().await?;
}
}
// Fast path: if comment is unchanged, skip update
if self.data.is_unchanged {
let object_desc = match self.data.object_type {
CommentObjectType::Table => format!(
"table {}",
format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
)
),
CommentObjectType::Column => format!(
"column {}.{}",
format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
),
self.data.column_name.as_ref().unwrap()
),
CommentObjectType::Flow => {
format!("flow {}.{}", self.data.catalog_name, self.data.object_name)
}
};
info!("Comment unchanged for {}, skipping update", object_desc);
return Ok(Status::done());
}
self.data.state = CommentOnState::UpdateMetadata;
Ok(Status::executing(true))
}
async fn prepare_table_or_column(&mut self) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
);
let table_id = self
.context
.table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
),
})?
.table_id();
let table_info = self
.context
.table_metadata_manager
.table_info_manager()
.get(table_id)
.await?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
),
})?;
// For column comments, validate the column exists
if self.data.object_type == CommentObjectType::Column {
let column_name = self.data.column_name.as_ref().unwrap();
let column_exists = table_info
.table_info
.meta
.schema
.column_schemas
.iter()
.any(|col| &col.name == column_name);
ensure!(
column_exists,
ColumnNotFoundSnafu {
column_name,
column_id: 0u32, // column_id is not known here
}
);
}
self.data.table_id = Some(table_id);
// Check if comment is unchanged for early exit optimization
match self.data.object_type {
CommentObjectType::Table => {
let current_comment = &table_info.table_info.desc;
if &self.data.comment == current_comment {
self.data.is_unchanged = true;
}
}
CommentObjectType::Column => {
let column_name = self.data.column_name.as_ref().unwrap();
let column_schema = table_info
.table_info
.meta
.schema
.column_schemas
.iter()
.find(|col| &col.name == column_name)
.unwrap(); // Safe: validated above
let current_comment = column_schema.metadata().get(COLUMN_COMMENT_KEY);
if self.data.comment.as_deref() == current_comment.map(String::as_str) {
self.data.is_unchanged = true;
}
}
CommentObjectType::Flow => {
// this branch is handled in `prepare_flow`
}
}
self.data.table_info = Some(table_info);
Ok(())
}
async fn prepare_flow(&mut self) -> Result<()> {
let flow_name_value = self
.context
.flow_metadata_manager
.flow_name_manager()
.get(&self.data.catalog_name, &self.data.object_name)
.await?
.with_context(|| FlowNotFoundSnafu {
flow_name: &self.data.object_name,
})?;
let flow_id = flow_name_value.flow_id();
let flow_info = self
.context
.flow_metadata_manager
.flow_info_manager()
.get_raw(flow_id)
.await?
.with_context(|| FlowNotFoundSnafu {
flow_name: &self.data.object_name,
})?;
self.data.flow_id = Some(flow_id);
// Check if comment is unchanged for early exit optimization
let current_comment = &flow_info.get_inner_ref().comment;
let new_comment = self.data.comment.as_deref().unwrap_or("");
if new_comment == current_comment.as_str() {
self.data.is_unchanged = true;
}
self.data.flow_info = Some(flow_info);
Ok(())
}
pub async fn on_update_metadata(&mut self) -> Result<Status> {
match self.data.object_type {
CommentObjectType::Table => {
self.update_table_comment().await?;
}
CommentObjectType::Column => {
self.update_column_comment().await?;
}
CommentObjectType::Flow => {
self.update_flow_comment().await?;
}
}
self.data.state = CommentOnState::InvalidateCache;
Ok(Status::executing(true))
}
async fn update_table_comment(&mut self) -> Result<()> {
let table_info_value = self.data.table_info.as_ref().unwrap();
let mut new_table_info = table_info_value.table_info.clone();
new_table_info.desc = self.data.comment.clone();
// Sync comment to table options
sync_table_comment_option(
&mut new_table_info.meta.options,
new_table_info.desc.as_deref(),
);
self.update_table_info(table_info_value, new_table_info)
.await?;
info!(
"Updated comment for table {}.{}.{}",
self.data.catalog_name, self.data.schema_name, self.data.object_name
);
Ok(())
}
async fn update_column_comment(&mut self) -> Result<()> {
let table_info_value = self.data.table_info.as_ref().unwrap();
let mut new_table_info = table_info_value.table_info.clone();
let column_name = self.data.column_name.as_ref().unwrap();
let column_schema = new_table_info
.meta
.schema
.column_schemas
.iter_mut()
.find(|col| &col.name == column_name)
.unwrap(); // Safe: validated in prepare
update_column_comment_metadata(column_schema, self.data.comment.clone());
self.update_table_info(table_info_value, new_table_info)
.await?;
info!(
"Updated comment for column {}.{}.{}.{}",
self.data.catalog_name, self.data.schema_name, self.data.object_name, column_name
);
Ok(())
}
async fn update_flow_comment(&mut self) -> Result<()> {
let flow_id = self.data.flow_id.unwrap();
let flow_info_value = self.data.flow_info.as_ref().unwrap();
let mut new_flow_info = flow_info_value.get_inner_ref().clone();
new_flow_info.comment = self.data.comment.clone().unwrap_or_default();
new_flow_info.updated_time = Utc::now();
let raw_value = new_flow_info.try_as_raw_value()?;
self.context
.table_metadata_manager
.kv_backend()
.put(
PutRequest::new()
.with_key(FlowInfoKey::new(flow_id).to_bytes())
.with_value(raw_value),
)
.await?;
info!(
"Updated comment for flow {}.{}",
self.data.catalog_name, self.data.object_name
);
Ok(())
}
async fn update_table_info(
&self,
current_table_info: &DeserializedValueWithBytes<TableInfoValue>,
new_table_info: RawTableInfo,
) -> Result<()> {
let table_id = current_table_info.table_info.ident.table_id;
let new_table_info_value = current_table_info.update(new_table_info);
let raw_value = new_table_info_value.try_as_raw_value()?;
self.context
.table_metadata_manager
.kv_backend()
.put(
PutRequest::new()
.with_key(TableInfoKey::new(table_id).to_bytes())
.with_value(raw_value),
)
.await?;
Ok(())
}
pub async fn on_invalidate_cache(&mut self) -> Result<Status> {
let cache_invalidator = &self.context.cache_invalidator;
match self.data.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
let table_id = self.data.table_id.unwrap();
let table_name = TableName::new(
self.data.catalog_name.clone(),
self.data.schema_name.clone(),
self.data.object_name.clone(),
);
let cache_ident = vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(table_name),
];
cache_invalidator
.invalidate(&Context::default(), &cache_ident)
.await?;
}
CommentObjectType::Flow => {
let flow_id = self.data.flow_id.unwrap();
let cache_ident = vec![CacheIdent::FlowId(flow_id)];
cache_invalidator
.invalidate(&Context::default(), &cache_ident)
.await?;
}
}
Ok(Status::done())
}
}
#[async_trait]
impl Procedure for CommentOnProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
CommentOnState::Prepare => self.on_prepare().await,
CommentOnState::UpdateMetadata => self.on_update_metadata().await,
CommentOnState::InvalidateCache => self.on_invalidate_cache().await,
}
.map_err(map_to_procedure_error)
}
fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}
fn lock_key(&self) -> LockKey {
let catalog = &self.data.catalog_name;
let schema = &self.data.schema_name;
let lock_key = match self.data.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
vec![
CatalogLock::Read(catalog).into(),
SchemaLock::read(catalog, schema).into(),
TableNameLock::new(catalog, schema, &self.data.object_name).into(),
]
}
CommentObjectType::Flow => {
vec![
CatalogLock::Read(catalog).into(),
FlowNameLock::new(catalog, &self.data.object_name).into(),
]
}
};
LockKey::new(lock_key)
}
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]
enum CommentOnState {
Prepare,
UpdateMetadata,
InvalidateCache,
}
/// The data of comment on procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct CommentOnData {
state: CommentOnState,
catalog_name: String,
schema_name: String,
object_type: CommentObjectType,
object_name: String,
/// Column name (only for Column comments)
column_name: Option<String>,
comment: Option<String>,
/// Cached table ID (for Table/Column)
#[serde(skip_serializing_if = "Option::is_none")]
table_id: Option<TableId>,
/// Cached table info (for Table/Column)
#[serde(skip)]
table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
/// Cached flow ID (for Flow)
#[serde(skip_serializing_if = "Option::is_none")]
flow_id: Option<FlowId>,
/// Cached flow info (for Flow)
#[serde(skip)]
flow_info: Option<DeserializedValueWithBytes<FlowInfoValue>>,
/// Whether the comment is unchanged (optimization for early exit)
#[serde(skip)]
is_unchanged: bool,
}
impl CommentOnData {
pub fn new(task: CommentOnTask) -> Self {
Self {
state: CommentOnState::Prepare,
catalog_name: task.catalog_name,
schema_name: task.schema_name,
object_type: task.object_type,
object_name: task.object_name,
column_name: task.column_name,
comment: task.comment,
table_id: None,
table_info: None,
flow_id: None,
flow_info: None,
is_unchanged: false,
}
}
}
fn update_column_comment_metadata(
column_schema: &mut datatypes::schema::ColumnSchema,
comment: Option<String>,
) {
match comment {
Some(value) => {
column_schema
.mut_metadata()
.insert(COLUMN_COMMENT_KEY.to_string(), value);
}
None => {
column_schema.mut_metadata().remove(COLUMN_COMMENT_KEY);
}
}
}
fn sync_table_comment_option(options: &mut table::requests::TableOptions, comment: Option<&str>) {
match comment {
Some(value) => {
options
.extra_options
.insert(TABLE_COMMENT_KEY.to_string(), value.to_string());
}
None => {
options.extra_options.remove(TABLE_COMMENT_KEY);
}
}
}

View File

@@ -27,6 +27,7 @@ use store_api::storage::TableId;
use crate::ddl::alter_database::AlterDatabaseProcedure;
use crate::ddl::alter_logical_tables::AlterLogicalTablesProcedure;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::comment_on::CommentOnProcedure;
use crate::ddl::create_database::CreateDatabaseProcedure;
use crate::ddl::create_flow::CreateFlowProcedure;
use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure;
@@ -52,18 +53,18 @@ use crate::rpc::ddl::DdlTask::CreateTrigger;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::DropTrigger;
use crate::rpc::ddl::DdlTask::{
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
TruncateTable,
AlterDatabase, AlterLogicalTables, AlterTable, CommentOn, CreateDatabase, CreateFlow,
CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables,
DropTable, DropView, TruncateTable,
};
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::CreateTriggerTask;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::trigger::DropTriggerTask;
use crate::rpc::ddl::{
AlterDatabaseTask, AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask,
CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask, QueryContext,
SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
AlterDatabaseTask, AlterTableTask, CommentOnTask, CreateDatabaseTask, CreateFlowTask,
CreateTableTask, CreateViewTask, DropDatabaseTask, DropFlowTask, DropTableTask, DropViewTask,
QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
@@ -192,7 +193,8 @@ impl DdlManager {
TruncateTableProcedure,
CreateDatabaseProcedure,
DropDatabaseProcedure,
DropViewProcedure
DropViewProcedure,
CommentOnProcedure
);
for (type_name, loader_factory) in loaders {
@@ -408,6 +410,19 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}
/// Submits and executes a comment on task.
#[tracing::instrument(skip_all)]
pub async fn submit_comment_on_task(
&self,
comment_on_task: CommentOnTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CommentOnProcedure::new(comment_on_task, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
self.submit_procedure(procedure_with_id).await
}
async fn submit_procedure(
&self,
procedure_with_id: ProcedureWithId,
@@ -476,6 +491,7 @@ impl DdlManager {
handle_create_view_task(self, create_view_task).await
}
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
#[cfg(feature = "enterprise")]
CreateTrigger(create_trigger_task) => {
handle_create_trigger_task(
@@ -907,6 +923,26 @@ async fn handle_create_view_task(
})
}
async fn handle_comment_on_task(
ddl_manager: &DdlManager,
comment_on_task: CommentOnTask,
) -> Result<SubmitDdlTaskResponse> {
let (id, _) = ddl_manager
.submit_comment_on_task(comment_on_task.clone())
.await?;
let procedure_id = id.to_string();
info!(
"Comment on {}.{}.{} is updated via procedure_id {id:?}",
comment_on_task.catalog_name, comment_on_task.schema_name, comment_on_task.object_name
);
Ok(SubmitDdlTaskResponse {
key: procedure_id.into(),
..Default::default()
})
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

View File

@@ -94,7 +94,7 @@ impl TableInfoValue {
}
}
pub(crate) fn update(&self, new_table_info: RawTableInfo) -> Self {
pub fn update(&self, new_table_info: RawTableInfo) -> Self {
Self {
table_info: new_table_info,
version: self.version + 1,

View File

@@ -23,19 +23,20 @@ use api::v1::alter_database_expr::Kind as PbAlterDatabaseKind;
use api::v1::meta::ddl_task_request::Task;
use api::v1::meta::{
AlterDatabaseTask as PbAlterDatabaseTask, AlterTableTask as PbAlterTableTask,
AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask,
CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask,
CreateTableTasks as PbCreateTableTasks, CreateViewTask as PbCreateViewTask,
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks,
DropViewTask as PbDropViewTask, Partition, ProcedureId,
AlterTableTasks as PbAlterTableTasks, CommentOnTask as PbCommentOnTask,
CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask,
DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId,
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterDatabaseExpr, AlterTableExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr,
CreateViewExpr, DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval,
ExpireAfter, Option as PbOption, QueryContext as PbQueryContext, TruncateTableExpr,
AlterDatabaseExpr, AlterTableExpr, CommentObjectType as PbCommentObjectType, CommentOnExpr,
CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropDatabaseExpr,
DropFlowExpr, DropTableExpr, DropViewExpr, EvalInterval, ExpireAfter, Option as PbOption,
QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::Engine as _;
use base64::engine::general_purpose;
@@ -78,6 +79,7 @@ pub enum DdlTask {
DropView(DropViewTask),
#[cfg(feature = "enterprise")]
CreateTrigger(trigger::CreateTriggerTask),
CommentOn(CommentOnTask),
}
impl DdlTask {
@@ -200,6 +202,11 @@ impl DdlTask {
view_info,
})
}
/// Creates a [`DdlTask`] to comment on a table, column, or flow.
pub fn new_comment_on(task: CommentOnTask) -> Self {
DdlTask::CommentOn(task)
}
}
impl TryFrom<Task> for DdlTask {
@@ -278,6 +285,7 @@ impl TryFrom<Task> for DdlTask {
.fail()
}
}
Task::CommentOnTask(comment_on) => Ok(DdlTask::CommentOn(comment_on.try_into()?)),
}
}
}
@@ -332,6 +340,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.try_into()?),
#[cfg(feature = "enterprise")]
DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
DdlTask::CommentOn(task) => Task::CommentOnTask(task.into()),
};
Ok(Self {
@@ -1277,6 +1286,119 @@ impl From<DropFlowTask> for PbDropFlowTask {
}
}
/// Represents the ID of the object being commented on (Table or Flow).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectId {
Table(TableId),
Flow(FlowId),
}
/// Comment on table, column, or flow
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CommentOnTask {
pub catalog_name: String,
pub schema_name: String,
pub object_type: CommentObjectType,
pub object_name: String,
/// Column name (only for Column comments)
pub column_name: Option<String>,
/// Object ID (Table or Flow) for validation and cache invalidation
pub object_id: Option<CommentObjectId>,
pub comment: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum CommentObjectType {
Table,
Column,
Flow,
}
impl CommentOnTask {
pub fn table_ref(&self) -> TableReference<'_> {
TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: &self.object_name,
}
}
}
// Proto conversions for CommentObjectType
impl From<CommentObjectType> for PbCommentObjectType {
fn from(object_type: CommentObjectType) -> Self {
match object_type {
CommentObjectType::Table => PbCommentObjectType::Table,
CommentObjectType::Column => PbCommentObjectType::Column,
CommentObjectType::Flow => PbCommentObjectType::Flow,
}
}
}
impl TryFrom<i32> for CommentObjectType {
type Error = error::Error;
fn try_from(value: i32) -> Result<Self> {
match value {
0 => Ok(CommentObjectType::Table),
1 => Ok(CommentObjectType::Column),
2 => Ok(CommentObjectType::Flow),
_ => error::InvalidProtoMsgSnafu {
err_msg: format!(
"Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
value
),
}
.fail(),
}
}
}
// Proto conversions for CommentOnTask
impl TryFrom<PbCommentOnTask> for CommentOnTask {
type Error = error::Error;
fn try_from(pb: PbCommentOnTask) -> Result<Self> {
let comment_on = pb.comment_on.context(error::InvalidProtoMsgSnafu {
err_msg: "expected comment_on",
})?;
Ok(CommentOnTask {
catalog_name: comment_on.catalog_name,
schema_name: comment_on.schema_name,
object_type: comment_on.object_type.try_into()?,
object_name: comment_on.object_name,
column_name: if comment_on.column_name.is_empty() {
None
} else {
Some(comment_on.column_name)
},
comment: if comment_on.comment.is_empty() {
None
} else {
Some(comment_on.comment)
},
object_id: None,
})
}
}
impl From<CommentOnTask> for PbCommentOnTask {
fn from(task: CommentOnTask) -> Self {
let pb_object_type: PbCommentObjectType = task.object_type.into();
PbCommentOnTask {
comment_on: Some(CommentOnExpr {
catalog_name: task.catalog_name,
schema_name: task.schema_name,
object_type: pb_object_type as i32,
object_name: task.object_name,
column_name: task.column_name.unwrap_or_default(),
comment: task.comment.unwrap_or_default(),
}),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
pub(crate) current_catalog: String,

View File

@@ -84,6 +84,7 @@ use snafu::prelude::*;
use sql::ast::ObjectNamePartExt;
use sql::dialect::Dialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::comment::CommentObject;
use sql::statements::copy::{CopyDatabase, CopyTable};
use sql::statements::statement::Statement;
use sql::statements::tql::Tql;
@@ -897,7 +898,7 @@ pub fn check_permission(
validate_param(&stmt.table_name, query_ctx)?;
}
Statement::ShowCreateFlow(stmt) => {
validate_param(&stmt.flow_name, query_ctx)?;
validate_flow(&stmt.flow_name, query_ctx)?;
}
#[cfg(feature = "enterprise")]
Statement::ShowCreateTrigger(stmt) => {
@@ -930,6 +931,12 @@ pub fn check_permission(
// show charset and show collation won't be checked
Statement::ShowCharset(_) | Statement::ShowCollation(_) => {}
Statement::Comment(comment) => match &comment.object {
CommentObject::Table(table) => validate_param(table, query_ctx)?,
CommentObject::Column { table, .. } => validate_param(table, query_ctx)?,
CommentObject::Flow(flow) => validate_flow(flow, query_ctx)?,
},
Statement::Insert(insert) => {
let name = insert.table_name().context(ParseSqlSnafu)?;
validate_param(name, query_ctx)?;
@@ -1015,6 +1022,27 @@ fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()>
.context(SqlExecInterceptedSnafu)
}
fn validate_flow(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
let catalog = match &name.0[..] {
[_flow] => query_ctx.current_catalog().to_string(),
[catalog, _flow] => catalog.to_string_unquoted(),
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {name}",
),
}
.fail();
}
};
let schema = query_ctx.current_schema();
validate_catalog_and_schema(&catalog, &schema, query_ctx)
.map_err(BoxedError::new)
.context(SqlExecInterceptedSnafu)
}
fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
let (catalog, schema) = match &name.0[..] {
[schema] => (
@@ -1273,6 +1301,28 @@ mod tests {
// test describe table
let sql = "DESC TABLE {catalog}{schema}demo;";
replace_test(sql, plugins, &query_ctx);
replace_test(sql, plugins.clone(), &query_ctx);
let comment_flow_cases = [
("COMMENT ON FLOW my_flow IS 'comment';", true),
("COMMENT ON FLOW greptime.my_flow IS 'comment';", true),
("COMMENT ON FLOW wrongcatalog.my_flow IS 'comment';", false),
];
for (sql, is_ok) in comment_flow_cases {
let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
let result = check_permission(plugins.clone(), stmt, &query_ctx);
assert_eq!(result.is_ok(), is_ok);
}
let show_flow_cases = [
("SHOW CREATE FLOW my_flow;", true),
("SHOW CREATE FLOW greptime.my_flow;", true),
("SHOW CREATE FLOW wrongcatalog.my_flow;", false),
];
for (sql, is_ok) in show_flow_cases {
let stmt = &parse_stmt(sql, &GreptimeDbDialect {}).unwrap()[0];
let result = check_permission(plugins.clone(), stmt, &query_ctx);
assert_eq!(result.is_ok(), is_ok);
}
}
}

View File

@@ -234,6 +234,11 @@ impl GrpcQueryHandler for Instance {
DdlExpr::DropView(_) => {
todo!("implemented in the following PR")
}
DdlExpr::CommentOn(expr) => {
self.statement_executor
.comment_by_expr(expr, ctx.clone())
.await?
}
}
}
};
@@ -399,6 +404,9 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
Expr::DropView(expr) => {
check_and_fill!(expr);
}
Expr::CommentOn(expr) => {
check_and_fill!(expr);
}
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod admin;
mod comment;
mod copy_database;
mod copy_query_to;
mod copy_table_from;
@@ -428,6 +429,7 @@ impl StatementExecutor {
Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
Statement::Comment(stmt) => self.comment(stmt, query_ctx).await,
Statement::ShowColumns(show_columns) => {
self.show_columns(show_columns, query_ctx).await
}

View File

@@ -0,0 +1,176 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::CommentOnExpr;
use common_error::ext::BoxedError;
use common_meta::procedure_executor::ExecutorContext;
use common_meta::rpc::ddl::{CommentObjectType, CommentOnTask, DdlTask, SubmitDdlTaskRequest};
use common_query::Output;
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::ResultExt;
use sql::ast::ObjectNamePartExt;
use sql::statements::comment::{Comment, CommentObject};
use crate::error::{ExecuteDdlSnafu, ExternalSnafu, InvalidSqlSnafu, Result};
use crate::statement::StatementExecutor;
impl StatementExecutor {
/// Adds a comment to a database object (table, column, or flow).
///
/// # Arguments
///
/// * `stmt`: A `Comment` struct containing the object to comment on and the comment text.
/// * `query_ctx`: A `QueryContextRef` providing contextual information for the query.
///
/// # Returns
///
/// A `Result` containing the `Output` of the operation, or an error if the operation fails.
pub async fn comment(&self, stmt: Comment, query_ctx: QueryContextRef) -> Result<Output> {
let comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?;
let request = SubmitDdlTaskRequest {
task: DdlTask::new_comment_on(comment_on_task),
query_context: query_ctx,
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
.map(|_| Output::new_with_affected_rows(0))
}
pub async fn comment_by_expr(
&self,
expr: CommentOnExpr,
query_ctx: QueryContextRef,
) -> Result<Output> {
let comment_on_task = self.create_comment_on_task_from_expr(expr)?;
let request = SubmitDdlTaskRequest {
task: DdlTask::new_comment_on(comment_on_task),
query_context: query_ctx,
};
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
.map(|_| Output::new_with_affected_rows(0))
}
fn create_comment_on_task_from_expr(&self, expr: CommentOnExpr) -> Result<CommentOnTask> {
let object_type = match expr.object_type {
0 => CommentObjectType::Table,
1 => CommentObjectType::Column,
2 => CommentObjectType::Flow,
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"Invalid CommentObjectType value: {}. Valid values are: 0 (Table), 1 (Column), 2 (Flow)",
expr.object_type
),
}
.fail();
}
};
Ok(CommentOnTask {
catalog_name: expr.catalog_name,
schema_name: expr.schema_name,
object_type,
object_name: expr.object_name,
column_name: if expr.column_name.is_empty() {
None
} else {
Some(expr.column_name)
},
object_id: None,
comment: if expr.comment.is_empty() {
None
} else {
Some(expr.comment)
},
})
}
fn create_comment_on_task_from_stmt(
&self,
stmt: Comment,
query_ctx: &QueryContextRef,
) -> Result<CommentOnTask> {
match stmt.object {
CommentObject::Table(table) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&table, query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(CommentOnTask {
catalog_name,
schema_name,
object_type: CommentObjectType::Table,
object_name: table_name,
column_name: None,
object_id: None,
comment: stmt.comment,
})
}
CommentObject::Column { table, column } => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&table, query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(CommentOnTask {
catalog_name,
schema_name,
object_type: CommentObjectType::Column,
object_name: table_name,
column_name: Some(column.value),
object_id: None,
comment: stmt.comment,
})
}
CommentObject::Flow(flow_name) => {
let (catalog_name, flow_name_str) = match &flow_name.0[..] {
[flow] => (
query_ctx.current_catalog().to_string(),
flow.to_string_unquoted(),
),
[catalog, flow] => (catalog.to_string_unquoted(), flow.to_string_unquoted()),
_ => {
return InvalidSqlSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {flow_name}"
),
}
.fail();
}
};
Ok(CommentOnTask {
catalog_name,
schema_name: String::new(), // Flow doesn't use schema
object_type: CommentObjectType::Flow,
object_name: flow_name_str,
column_name: None,
object_id: None,
comment: stmt.comment,
})
}
}
}
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
@@ -120,7 +122,30 @@ impl StatementExecutor {
table: TableRef,
query_ctx: QueryContextRef,
) -> Result<Output> {
let table_info = table.table_info();
let mut table_info = table.table_info();
let partition_column_names: Vec<_> =
table_info.meta.partition_column_names().cloned().collect();
if let Some(latest) = self
.table_metadata_manager
.table_info_manager()
.get(table_info.table_id())
.await
.context(TableMetadataManagerSnafu)?
{
let mut latest_info = TableInfo::try_from(latest.into_inner().table_info)
.context(error::CreateTableInfoSnafu)?;
if !partition_column_names.is_empty() {
latest_info.meta.partition_key_indices = partition_column_names
.iter()
.filter_map(|name| latest_info.meta.schema.column_index_by_name(name.as_str()))
.collect();
}
table_info = Arc::new(latest_info);
}
if table_info.table_type != TableType::Base {
return error::ShowCreateTableBaseOnlySnafu {
table_name: table_name.to_string(),
@@ -150,7 +175,7 @@ impl StatementExecutor {
let partitions = create_partitions_stmt(&table_info, partitions)?;
query::sql::show_create_table(table, schema_options, partitions, query_ctx)
query::sql::show_create_table(table_info, schema_options, partitions, query_ctx)
.context(ExecuteStatementSnafu)
}

View File

@@ -65,6 +65,7 @@ use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
use table::TableRef;
use table::metadata::TableInfoRef;
use table::requests::{FILE_TABLE_LOCATION_KEY, FILE_TABLE_PATTERN_KEY};
use crate::QueryEngineRef;
@@ -821,13 +822,12 @@ pub fn show_create_database(database_name: &str, options: OptionMap) -> Result<O
}
pub fn show_create_table(
table: TableRef,
table_info: TableInfoRef,
schema_options: Option<SchemaOptions>,
partitions: Option<Partitions>,
query_ctx: QueryContextRef,
) -> Result<Output> {
let table_info = table.table_info();
let table_name = &table_info.name;
let table_name = table_info.name.clone();
let quote_style = query_ctx.quote_style();
@@ -838,7 +838,7 @@ pub fn show_create_table(
});
let sql = format!("{}", stmt);
let columns = vec![
Arc::new(StringVector::from(vec![table_name.clone()])) as _,
Arc::new(StringVector::from(vec![table_name])) as _,
Arc::new(StringVector::from(vec![sql])) as _,
];
let records = RecordBatches::try_from_columns(SHOW_CREATE_TABLE_OUTPUT_SCHEMA.clone(), columns)

View File

@@ -34,7 +34,9 @@ use sql::statements::create::{Column, ColumnExtensions, CreateTable, TableConstr
use sql::statements::{self, OptionMap};
use store_api::metric_engine_consts::{is_metric_engine, is_metric_engine_internal_column};
use table::metadata::{TableInfoRef, TableMeta};
use table::requests::{FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY};
use table::requests::{
COMMENT_KEY as TABLE_COMMENT_KEY, FILE_TABLE_META_KEY, TTL_KEY, WRITE_BUFFER_SIZE_KEY,
};
use crate::error::{
ConvertSqlTypeSnafu, ConvertSqlValueSnafu, GetFulltextOptionsSnafu,
@@ -249,6 +251,13 @@ pub fn create_table_stmt(
let constraints = create_table_constraints(&table_meta.engine, schema, table_meta, quote_style);
let mut options = create_sql_options(table_meta, schema_options);
if let Some(comment) = &table_info.desc
&& options.get(TABLE_COMMENT_KEY).is_none()
{
options.insert(format!("'{TABLE_COMMENT_KEY}'"), comment.clone());
}
Ok(CreateTable {
if_not_exists: true,
table_id: table_info.ident.table_id,
@@ -256,7 +265,7 @@ pub fn create_table_stmt(
columns,
engine: table_meta.engine.clone(),
constraints,
options: create_sql_options(table_meta, schema_options),
options,
partitions: None,
})
}

View File

@@ -163,6 +163,8 @@ impl ParserContext<'_> {
Keyword::TRUNCATE => self.parse_truncate(),
Keyword::COMMENT => self.parse_comment(),
Keyword::SET => self.parse_set_variables(),
Keyword::ADMIN => self.parse_admin_command(),

View File

@@ -14,6 +14,7 @@
pub(crate) mod admin_parser;
mod alter_parser;
pub(crate) mod comment_parser;
pub(crate) mod copy_parser;
pub(crate) mod create_parser;
pub(crate) mod cursor_parser;

View File

@@ -0,0 +1,196 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::{ResultExt, ensure};
use sqlparser::ast::ObjectName;
use sqlparser::keywords::Keyword;
use sqlparser::tokenizer::Token;
use crate::ast::{Ident, ObjectNamePart};
use crate::error::{self, InvalidSqlSnafu, Result};
use crate::parser::{FLOW, ParserContext};
use crate::statements::comment::{Comment, CommentObject};
use crate::statements::statement::Statement;
impl ParserContext<'_> {
pub(crate) fn parse_comment(&mut self) -> Result<Statement> {
let _ = self.parser.next_token(); // consume COMMENT
if !self.parser.parse_keyword(Keyword::ON) {
return self.expected("ON", self.parser.peek_token());
}
let target_token = self.parser.next_token();
let comment = match target_token.token {
Token::Word(word) if word.keyword == Keyword::TABLE => {
let raw_table =
self.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
expected: "a table name",
actual: self.peek_token_as_string(),
})?;
let table = Self::canonicalize_object_name(raw_table)?;
CommentObject::Table(table)
}
Token::Word(word) if word.keyword == Keyword::COLUMN => {
self.parse_column_comment_target()?
}
Token::Word(word)
if word.keyword == Keyword::NoKeyword && word.value.eq_ignore_ascii_case(FLOW) =>
{
let raw_flow =
self.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
expected: "a flow name",
actual: self.peek_token_as_string(),
})?;
let flow = Self::canonicalize_object_name(raw_flow)?;
CommentObject::Flow(flow)
}
_ => return self.expected("TABLE, COLUMN or FLOW", target_token),
};
if !self.parser.parse_keyword(Keyword::IS) {
return self.expected("IS", self.parser.peek_token());
}
let comment_value = if self.parser.parse_keyword(Keyword::NULL) {
None
} else {
Some(
self.parser
.parse_literal_string()
.context(error::SyntaxSnafu)?,
)
};
Ok(Statement::Comment(Comment {
object: comment,
comment: comment_value,
}))
}
fn parse_column_comment_target(&mut self) -> Result<CommentObject> {
let raw = self
.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
expected: "a column reference",
actual: self.peek_token_as_string(),
})?;
let canonical = Self::canonicalize_object_name(raw)?;
let mut parts = canonical.0;
ensure!(
parts.len() >= 2,
InvalidSqlSnafu {
msg: "COMMENT ON COLUMN expects <table>.<column>".to_string(),
}
);
let column_part = parts.pop().unwrap();
let ObjectNamePart::Identifier(column_ident) = column_part else {
unreachable!("canonicalized object name should only contain identifiers");
};
let column = ParserContext::canonicalize_identifier(column_ident);
let mut table_idents: Vec<Ident> = Vec::with_capacity(parts.len());
for part in parts {
match part {
ObjectNamePart::Identifier(ident) => table_idents.push(ident),
ObjectNamePart::Function(_) => {
unreachable!("canonicalized object name should only contain identifiers")
}
}
}
ensure!(
!table_idents.is_empty(),
InvalidSqlSnafu {
msg: "Table name is required before column name".to_string(),
}
);
let table = ObjectName::from(table_idents);
Ok(CommentObject::Column { table, column })
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use crate::dialect::GreptimeDbDialect;
use crate::parser::{ParseOptions, ParserContext};
use crate::statements::comment::CommentObject;
use crate::statements::statement::Statement;
fn parse(sql: &str) -> Statement {
let mut stmts =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
assert_eq!(stmts.len(), 1);
stmts.pop().unwrap()
}
#[test]
fn test_parse_comment_on_table() {
let stmt = parse("COMMENT ON TABLE mytable IS 'test';");
match stmt {
Statement::Comment(comment) => {
assert_matches!(comment.object, CommentObject::Table(ref name) if name.to_string() == "mytable");
assert_eq!(comment.comment.as_deref(), Some("test"));
}
_ => panic!("expected comment statement"),
}
let stmt = parse("COMMENT ON TABLE mytable IS NULL;");
match stmt {
Statement::Comment(comment) => {
assert_matches!(comment.object, CommentObject::Table(ref name) if name.to_string() == "mytable");
assert!(comment.comment.is_none());
}
_ => panic!("expected comment statement"),
}
}
#[test]
fn test_parse_comment_on_column() {
let stmt = parse("COMMENT ON COLUMN my_schema.my_table.my_col IS 'desc';");
match stmt {
Statement::Comment(comment) => match comment.object {
CommentObject::Column { table, column } => {
assert_eq!(table.to_string(), "my_schema.my_table");
assert_eq!(column.value, "my_col");
assert_eq!(comment.comment.as_deref(), Some("desc"));
}
_ => panic!("expected column comment"),
},
_ => panic!("expected comment statement"),
}
}
#[test]
fn test_parse_comment_on_flow() {
let stmt = parse("COMMENT ON FLOW my_flow IS 'desc';");
match stmt {
Statement::Comment(comment) => {
assert_matches!(comment.object, CommentObject::Flow(ref name) if name.to_string() == "my_flow");
assert_eq!(comment.comment.as_deref(), Some("desc"));
}
_ => panic!("expected comment statement"),
}
}
}

View File

@@ -14,6 +14,7 @@
pub mod admin;
pub mod alter;
pub mod comment;
pub mod copy;
pub mod create;
pub mod cursor;

View File

@@ -0,0 +1,67 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self, Display, Formatter};
use serde::Serialize;
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::{Ident, ObjectName};
/// Represents a SQL COMMENT statement for adding or removing comments on database objects.
///
/// # Examples
///
/// ```sql
/// COMMENT ON TABLE my_table IS 'This is a table comment';
/// COMMENT ON COLUMN my_table.my_column IS 'This is a column comment';
/// COMMENT ON FLOW my_flow IS NULL;
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub struct Comment {
pub object: CommentObject,
pub comment: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
pub enum CommentObject {
Table(ObjectName),
Column { table: ObjectName, column: Ident },
Flow(ObjectName),
}
impl Display for Comment {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "COMMENT ON {} IS ", self.object)?;
match &self.comment {
Some(comment) => {
let escaped = comment.replace('\'', "''");
write!(f, "'{}'", escaped)
}
None => f.write_str("NULL"),
}
}
}
impl Display for CommentObject {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CommentObject::Table(name) => write!(f, "TABLE {}", name),
CommentObject::Column { table, column } => {
write!(f, "COLUMN {}.{}", table, column)
}
CommentObject::Flow(name) => write!(f, "FLOW {}", name),
}
}
}

View File

@@ -22,6 +22,7 @@ use sqlparser_derive::{Visit, VisitMut};
use crate::error::{ConvertToDfStatementSnafu, Error};
use crate::statements::admin::Admin;
use crate::statements::alter::{AlterDatabase, AlterTable};
use crate::statements::comment::Comment;
use crate::statements::copy::Copy;
use crate::statements::create::{
CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView,
@@ -137,6 +138,8 @@ pub enum Statement {
SetVariables(SetVariables),
// SHOW VARIABLES
ShowVariables(ShowVariables),
// COMMENT ON
Comment(Comment),
// USE
Use(String),
// Admin statement(extension)
@@ -204,6 +207,7 @@ impl Statement {
| Statement::Copy(_)
| Statement::TruncateTable(_)
| Statement::SetVariables(_)
| Statement::Comment(_)
| Statement::Use(_)
| Statement::DeclareCursor(_)
| Statement::CloseCursor(_)
@@ -267,6 +271,7 @@ impl Display for Statement {
Statement::TruncateTable(s) => s.fmt(f),
Statement::SetVariables(s) => s.fmt(f),
Statement::ShowVariables(s) => s.fmt(f),
Statement::Comment(s) => s.fmt(f),
Statement::ShowCharset(kind) => {
write!(f, "SHOW CHARSET {kind}")
}

View File

@@ -1256,6 +1256,7 @@ CREATE TABLE {table_name} (
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Created on insertion', |
| | 'compaction.twcs.time_window' = '1d', |
| | 'compaction.type' = 'twcs' |
| | ) |

View File

@@ -514,9 +514,9 @@ async fn insert_with_hints_and_assert(db: &Database) {
let pretty = record_batches.pretty_print().unwrap();
let expected = "\
+-------+-------------------------------------+
+-------+---------------------------------------+
| Table | Create Table |
+-------+-------------------------------------+
+-------+---------------------------------------+
| demo | CREATE TABLE IF NOT EXISTS \"demo\" ( |
| | \"host\" STRING NULL, |
| | \"cpu\" DOUBLE NULL, |
@@ -528,9 +528,10 @@ async fn insert_with_hints_and_assert(db: &Database) {
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Created on insertion', |
| | append_mode = 'true' |
| | ) |
+-------+-------------------------------------+\
+-------+---------------------------------------+\
";
assert_eq!(pretty, expected);

View File

@@ -1783,7 +1783,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) {
expected,
)
.await;
let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'f1'\\n)\"]]";
let expected = "[[\"idc3_lo_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc3_lo_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'f1'\\n)\"]]";
validate_data(
"test_prometheus_remote_special_labels_idc3_show_create_table",
&client,
@@ -1809,7 +1809,7 @@ pub async fn test_prometheus_remote_special_labels(store_type: StorageType) {
expected,
)
.await;
let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'f2'\\n)\"]]";
let expected = "[[\"idc4_local_table\",\"CREATE TABLE IF NOT EXISTS \\\"idc4_local_table\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'f2'\\n)\"]]";
validate_data(
"test_prometheus_remote_special_labels_idc4_show_create_table",
&client,
@@ -2271,7 +2271,7 @@ transform:
assert_eq!(res.status(), StatusCode::OK);
// 3. check schema
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"id2\\\" INT NULL INVERTED INDEX,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\"),\\n PRIMARY KEY (\\\"type\\\", \\\"log\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"id2\\\" INT NULL INVERTED INDEX,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\"),\\n PRIMARY KEY (\\\"type\\\", \\\"log\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
validate_data(
"pipeline_schema",
&client,
@@ -3112,9 +3112,10 @@ table_suffix: _${type}
// )
// ENGINE=mito
// WITH(
// 'comment' = 'Created on insertion',
// append_mode = 'true'
// )
let expected = "[[\"d_table_db\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_db\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"d_table_db\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_db\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
validate_data(
"test_pipeline_context_db",
@@ -3129,11 +3130,12 @@ table_suffix: _${type}
// )
// ENGINE=mito
// WITH(
// 'comment' = 'Created on insertion',
// append_mode = 'true',
// skip_wal = 'true',
// ttl = '1day'
// )
let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n skip_wal = 'true',\\n ttl = '1day'\\n)\"]]";
let expected = "[[\"d_table_http\",\"CREATE TABLE IF NOT EXISTS \\\"d_table_http\\\" (\\n \\\"id1_root\\\" INT NULL,\\n \\\"id2_root\\\" INT NULL,\\n \\\"type\\\" STRING NULL,\\n \\\"log\\\" STRING NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true',\\n skip_wal = 'true',\\n ttl = '1day'\\n)\"]]";
validate_data(
"test_pipeline_context_http",
&client,
@@ -3491,13 +3493,14 @@ transform:
// )
// ENGINE=mito
// WITH(
// 'comment' = 'Created on insertion',
// append_mode = 'true'
// )
validate_data(
"test_pipeline_2_schema",
&client,
"show create table d_table",
"[[\"d_table\",\"CREATE TABLE IF NOT EXISTS \\\"d_table\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n \\\"id2\\\" STRING NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]",
"[[\"d_table\",\"CREATE TABLE IF NOT EXISTS \\\"d_table\\\" (\\n \\\"id1\\\" INT NULL INVERTED INDEX,\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n \\\"id2\\\" STRING NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]",
)
.await;
@@ -4460,10 +4463,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// )
// ENGINE=metric
// WITH(
// 'comment' = 'Created on insertion',
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"host_arch\\\" STRING NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"otel_scope_name\\\" STRING NULL,\\n \\\"otel_scope_schema_url\\\" STRING NULL,\\n \\\"otel_scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"host_arch\\\", \\\"job\\\", \\\"model\\\", \\\"os_version\\\", \\\"otel_scope_name\\\", \\\"otel_scope_schema_url\\\", \\\"otel_scope_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n \'comment\' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_all_show_create_table",
&client,
@@ -4532,10 +4536,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// )
// ENGINE=metric
// WITH(
// 'comment' = 'Created on insertion',
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"os_type\\\" STRING NULL,\\n \\\"os_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"os_type\\\", \\\"os_version\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_show_create_table",
&client,
@@ -4595,10 +4600,11 @@ pub async fn test_otlp_metrics_new(store_type: StorageType) {
// )
// ENGINE=metric
// WITH(
// 'comment' = 'Created on insertion',
// on_physical_table = 'greptime_physical_table',
// otlp_metric_compat = 'prom'
// )
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
let expected = "[[\"claude_code_cost_usage_USD_total\",\"CREATE TABLE IF NOT EXISTS \\\"claude_code_cost_usage_USD_total\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"greptime_value\\\" DOUBLE NULL,\\n \\\"job\\\" STRING NULL,\\n \\\"model\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL,\\n \\\"service_version\\\" STRING NULL,\\n \\\"session_id\\\" STRING NULL,\\n \\\"terminal_type\\\" STRING NULL,\\n \\\"user_id\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"job\\\", \\\"model\\\", \\\"service_name\\\", \\\"service_version\\\", \\\"session_id\\\", \\\"terminal_type\\\", \\\"user_id\\\")\\n)\\n\\nENGINE=metric\\nWITH(\\n 'comment' = 'Created on insertion',\\n on_physical_table = 'greptime_physical_table',\\n otlp_metric_compat = 'prom'\\n)\"]]";
validate_data(
"otlp_metrics_show_create_table_none",
&client,
@@ -4824,7 +4830,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
let expected = r#"[[1736480942444376000,1736480942444499000,123000,null,"c05d7a4ec8e1f231f02ed6e8da8655b4","d24f921c75f68e23","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444376000,1736480942444499000,123000,"d24f921c75f68e23","c05d7a4ec8e1f231f02ed6e8da8655b4","9630f2916e2f7909","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]],[1736480942444589000,1736480942444712000,123000,null,"cc9e0991a2e63d274984bd44ee669203","eba7be77e3558179","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-server",[],[]],[1736480942444589000,1736480942444712000,123000,"eba7be77e3558179","cc9e0991a2e63d274984bd44ee669203","8f847259b0f6e1ab","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","","telemetrygen","","telemetrygen","1.2.3.4","telemetrygen-client",[],[]]]"#;
validate_data("otlp_traces", &client, "select * from mytable;", expected).await;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
let expected_ddl = r#"[["mytable","CREATE TABLE IF NOT EXISTS \"mytable\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"timestamp_end\" TIMESTAMP(9) NULL,\n \"duration_nano\" BIGINT UNSIGNED NULL,\n \"parent_span_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"trace_id\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_id\" STRING NULL,\n \"span_kind\" STRING NULL,\n \"span_name\" STRING NULL,\n \"span_status_code\" STRING NULL,\n \"span_status_message\" STRING NULL,\n \"trace_state\" STRING NULL,\n \"scope_name\" STRING NULL,\n \"scope_version\" STRING NULL,\n \"service_name\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n \"span_attributes.net.peer.ip\" STRING NULL,\n \"span_attributes.peer.service\" STRING NULL,\n \"span_events\" JSON NULL,\n \"span_links\" JSON NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\nPARTITION ON COLUMNS (\"trace_id\") (\n trace_id < '1',\n trace_id >= '1' AND trace_id < '2',\n trace_id >= '2' AND trace_id < '3',\n trace_id >= '3' AND trace_id < '4',\n trace_id >= '4' AND trace_id < '5',\n trace_id >= '5' AND trace_id < '6',\n trace_id >= '6' AND trace_id < '7',\n trace_id >= '7' AND trace_id < '8',\n trace_id >= '8' AND trace_id < '9',\n trace_id >= '9' AND trace_id < 'a',\n trace_id >= 'a' AND trace_id < 'b',\n trace_id >= 'b' AND trace_id < 'c',\n trace_id >= 'c' AND trace_id < 'd',\n trace_id >= 'd' AND trace_id < 'e',\n trace_id >= 'e' AND trace_id < 'f',\n trace_id >= 'f'\n)\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'true',\n table_data_model = 'greptime_trace_v1'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
@@ -4833,7 +4839,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
)
.await;
let expected_ddl = r#"[["mytable_services","CREATE TABLE IF NOT EXISTS \"mytable_services\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"service_name\" STRING NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n append_mode = 'false'\n)"]]"#;
let expected_ddl = r#"[["mytable_services","CREATE TABLE IF NOT EXISTS \"mytable_services\" (\n \"timestamp\" TIMESTAMP(9) NOT NULL,\n \"service_name\" STRING NULL,\n TIME INDEX (\"timestamp\"),\n PRIMARY KEY (\"service_name\")\n)\n\nENGINE=mito\nWITH(\n 'comment' = 'Created on insertion',\n append_mode = 'false'\n)"]]"#;
validate_data(
"otlp_traces",
&client,
@@ -5076,7 +5082,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_pb_schema",
&client,
@@ -5208,9 +5214,10 @@ processors:
// )
// ENGINE=mito
// WITH(
// 'comment' = 'Created on insertion',
// append_mode = 'true'
// )
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_pb_schema",
&client,
@@ -5280,7 +5287,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n \'comment\' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_json_schema",
&client,
@@ -5381,9 +5388,10 @@ processors:
// )
// ENGINE=mito
// WITH(
// 'comment' = 'Created on insertion',
// append_mode = 'true'
// )
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_json_schema",
&client,
@@ -6352,7 +6360,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
.await;
assert_eq!(StatusCode::OK, res.status());
let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n append_mode = 'true',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]";
let trace_table_sql = "[[\"mytable\",\"CREATE TABLE IF NOT EXISTS \\\"mytable\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"timestamp_end\\\" TIMESTAMP(9) NULL,\\n \\\"duration_nano\\\" BIGINT UNSIGNED NULL,\\n \\\"parent_span_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"trace_id\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_id\\\" STRING NULL,\\n \\\"span_kind\\\" STRING NULL,\\n \\\"span_name\\\" STRING NULL,\\n \\\"span_status_code\\\" STRING NULL,\\n \\\"span_status_message\\\" STRING NULL,\\n \\\"trace_state\\\" STRING NULL,\\n \\\"scope_name\\\" STRING NULL,\\n \\\"scope_version\\\" STRING NULL,\\n \\\"service_name\\\" STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\\n \\\"span_attributes.operation.type\\\" STRING NULL,\\n \\\"span_attributes.net.peer.ip\\\" STRING NULL,\\n \\\"span_attributes.peer.service\\\" STRING NULL,\\n \\\"span_events\\\" JSON NULL,\\n \\\"span_links\\\" JSON NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\nPARTITION ON COLUMNS (\\\"trace_id\\\") (\\n trace_id < '1',\\n trace_id >= '1' AND trace_id < '2',\\n trace_id >= '2' AND trace_id < '3',\\n trace_id >= '3' AND trace_id < '4',\\n trace_id >= '4' AND trace_id < '5',\\n trace_id >= '5' AND trace_id < '6',\\n trace_id >= '6' AND trace_id < '7',\\n trace_id >= '7' AND trace_id < '8',\\n trace_id >= '8' AND trace_id < '9',\\n trace_id >= '9' AND trace_id < 'a',\\n trace_id >= 'a' AND trace_id < 'b',\\n trace_id >= 'b' AND trace_id < 'c',\\n trace_id >= 'c' AND trace_id < 'd',\\n trace_id >= 'd' AND trace_id < 'e',\\n trace_id >= 'e' AND trace_id < 'f',\\n trace_id >= 'f'\\n)\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'true',\\n table_data_model = 'greptime_trace_v1',\\n ttl = '7days'\\n)\"]]";
validate_data(
"trace_v1_create_table",
&client,
@@ -6361,7 +6369,7 @@ pub async fn test_jaeger_query_api_for_trace_v1(store_type: StorageType) {
)
.await;
let trace_meta_table_sql = "[[\"mytable_services\",\"CREATE TABLE IF NOT EXISTS \\\"mytable_services\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"service_name\\\" STRING NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'false'\\n)\"]]";
let trace_meta_table_sql = "[[\"mytable_services\",\"CREATE TABLE IF NOT EXISTS \\\"mytable_services\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"service_name\\\" STRING NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service_name\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n 'comment' = 'Created on insertion',\\n append_mode = 'false'\\n)\"]]";
validate_data(
"trace_v1_create_meta_table",
&client,

View File

@@ -15,9 +15,9 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
+----------+---------------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
+----------+---------------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
@@ -27,8 +27,10 @@ SHOW CREATE TABLE cnt_reqs;
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------+---------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
@@ -157,9 +159,9 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
+----------+---------------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
+----------+---------------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
@@ -169,8 +171,10 @@ SHOW CREATE TABLE cnt_reqs;
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------+---------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
@@ -258,7 +262,9 @@ SHOW CREATE TABLE rate_reqs;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable
@@ -337,7 +343,9 @@ SHOW CREATE TABLE rate_reqs;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable

View File

@@ -0,0 +1,184 @@
-- Test: COMMENT ON TABLE add & remove
CREATE TABLE comment_table_test (
pk INT,
val DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(pk)
);
Affected Rows: 0
-- Add table comment
COMMENT ON TABLE comment_table_test IS 'table level description';
Affected Rows: 0
SHOW CREATE TABLE comment_table_test;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| comment_table_test | CREATE TABLE IF NOT EXISTS "comment_table_test" ( |
| | "pk" INT NULL, |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("pk") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | comment = 'table level description' |
| | ) |
+--------------------+---------------------------------------------------+
-- Remove table comment
COMMENT ON TABLE comment_table_test IS NULL;
Affected Rows: 0
SHOW CREATE TABLE comment_table_test;
+--------------------+---------------------------------------------------+
| Table | Create Table |
+--------------------+---------------------------------------------------+
| comment_table_test | CREATE TABLE IF NOT EXISTS "comment_table_test" ( |
| | "pk" INT NULL, |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("pk") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+--------------------+---------------------------------------------------+
DROP TABLE comment_table_test;
Affected Rows: 0
-- Test: COMMENT ON COLUMN add & remove
CREATE TABLE comment_column_test (
pk INT,
val DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(pk)
);
Affected Rows: 0
-- Add column comment
COMMENT ON COLUMN comment_column_test.val IS 'value column description';
Affected Rows: 0
SHOW CREATE TABLE comment_column_test;
+---------------------+---------------------------------------------------------+
| Table | Create Table |
+---------------------+---------------------------------------------------------+
| comment_column_test | CREATE TABLE IF NOT EXISTS "comment_column_test" ( |
| | "pk" INT NULL, |
| | "val" DOUBLE NULL COMMENT 'value column description', |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("pk") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+---------------------+---------------------------------------------------------+
-- Remove column comment
COMMENT ON COLUMN comment_column_test.val IS NULL;
Affected Rows: 0
SHOW CREATE TABLE comment_column_test;
+---------------------+----------------------------------------------------+
| Table | Create Table |
+---------------------+----------------------------------------------------+
| comment_column_test | CREATE TABLE IF NOT EXISTS "comment_column_test" ( |
| | "pk" INT NULL, |
| | "val" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("pk") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+---------------------+----------------------------------------------------+
DROP TABLE comment_column_test;
Affected Rows: 0
-- Test: COMMENT ON FLOW add & remove
-- Prepare source & sink tables
CREATE TABLE flow_source_comment_test (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE TABLE flow_sink_comment_test (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE FLOW flow_comment_test
SINK TO flow_sink_comment_test
AS
SELECT desc_str, ts FROM flow_source_comment_test;
Affected Rows: 0
-- Add flow comment
COMMENT ON FLOW flow_comment_test IS 'flow level description';
Affected Rows: 0
SHOW CREATE FLOW flow_comment_test;
+-------------------+------------------------------------------------------+
| Flow | Create Flow |
+-------------------+------------------------------------------------------+
| flow_comment_test | CREATE FLOW IF NOT EXISTS flow_comment_test |
| | SINK TO flow_sink_comment_test |
| | COMMENT 'flow level description' |
| | AS SELECT desc_str, ts FROM flow_source_comment_test |
+-------------------+------------------------------------------------------+
-- Remove flow comment
COMMENT ON FLOW flow_comment_test IS NULL;
Affected Rows: 0
SHOW CREATE FLOW flow_comment_test;
+-------------------+------------------------------------------------------+
| Flow | Create Flow |
+-------------------+------------------------------------------------------+
| flow_comment_test | CREATE FLOW IF NOT EXISTS flow_comment_test |
| | SINK TO flow_sink_comment_test |
| | AS SELECT desc_str, ts FROM flow_source_comment_test |
+-------------------+------------------------------------------------------+
DROP FLOW flow_comment_test;
Affected Rows: 0
DROP TABLE flow_source_comment_test;
Affected Rows: 0
DROP TABLE flow_sink_comment_test;
Affected Rows: 0

View File

@@ -0,0 +1,65 @@
-- Test: COMMENT ON TABLE add & remove
CREATE TABLE comment_table_test (
pk INT,
val DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(pk)
);
-- Add table comment
COMMENT ON TABLE comment_table_test IS 'table level description';
SHOW CREATE TABLE comment_table_test;
-- Remove table comment
COMMENT ON TABLE comment_table_test IS NULL;
SHOW CREATE TABLE comment_table_test;
DROP TABLE comment_table_test;
-- Test: COMMENT ON COLUMN add & remove
CREATE TABLE comment_column_test (
pk INT,
val DOUBLE,
ts TIMESTAMP TIME INDEX,
PRIMARY KEY(pk)
);
-- Add column comment
COMMENT ON COLUMN comment_column_test.val IS 'value column description';
SHOW CREATE TABLE comment_column_test;
-- Remove column comment
COMMENT ON COLUMN comment_column_test.val IS NULL;
SHOW CREATE TABLE comment_column_test;
DROP TABLE comment_column_test;
-- Test: COMMENT ON FLOW add & remove
-- Prepare source & sink tables
CREATE TABLE flow_source_comment_test (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
CREATE TABLE flow_sink_comment_test (
desc_str STRING,
ts TIMESTAMP TIME INDEX
);
CREATE FLOW flow_comment_test
SINK TO flow_sink_comment_test
AS
SELECT desc_str, ts FROM flow_source_comment_test;
-- Add flow comment
COMMENT ON FLOW flow_comment_test IS 'flow level description';
SHOW CREATE FLOW flow_comment_test;
-- Remove flow comment
COMMENT ON FLOW flow_comment_test IS NULL;
SHOW CREATE FLOW flow_comment_test;
DROP FLOW flow_comment_test;
DROP TABLE flow_source_comment_test;
DROP TABLE flow_sink_comment_test;

View File

@@ -46,6 +46,7 @@ SHOW CREATE TABLE distinct_basic;
| | ) |
+----------------+-----------------------------------------------------------+
-- SQLNESS REPLACE \d{4} REDACTED
SHOW CREATE TABLE out_distinct_basic;
+--------------------+---------------------------------------------------+
@@ -60,7 +61,9 @@ SHOW CREATE TABLE out_distinct_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | 'comment' = 'Sink table for flow flow-id=REDACTED' |
| | ) |
+--------------------+---------------------------------------------------+
-- SQLNESS SLEEP 3s
@@ -242,7 +245,9 @@ SHOW CREATE TABLE out_distinct_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+--------------------+---------------------------------------------------+
SELECT

View File

@@ -20,6 +20,7 @@ SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS;
SHOW CREATE TABLE distinct_basic;
-- SQLNESS REPLACE \d{4} REDACTED
SHOW CREATE TABLE out_distinct_basic;
-- SQLNESS SLEEP 3s

View File

@@ -20,9 +20,9 @@ Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
@@ -31,8 +31,10 @@ SHOW CREATE TABLE out_num_cnt_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------------+---------------------------------------------------+
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('test_numbers_basic');
@@ -55,9 +57,9 @@ SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
@@ -66,8 +68,10 @@ SHOW CREATE TABLE out_num_cnt_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------------+---------------------------------------------------+
SHOW CREATE FLOW test_numbers_basic;
@@ -122,9 +126,9 @@ SELECT 1;
-- SQLNESS SLEEP 3s
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sumup" BIGINT NULL, |
| | "event_time" TIMESTAMP(3) NOT NULL, |
@@ -133,8 +137,10 @@ SHOW CREATE TABLE out_num_cnt_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------------+---------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
@@ -158,9 +164,9 @@ SHOW CREATE FLOW test_numbers_basic;
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sumup" BIGINT NULL, |
| | "event_time" TIMESTAMP(3) NOT NULL, |
@@ -169,8 +175,10 @@ SHOW CREATE TABLE out_num_cnt_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------------+---------------------------------------------------+
DROP FLOW test_numbers_basic;

View File

@@ -20,9 +20,9 @@ Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
@@ -31,8 +31,10 @@ SHOW CREATE TABLE out_num_cnt_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------------+---------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1
@@ -47,9 +49,9 @@ ADMIN FLUSH_FLOW('test_numbers_basic');
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "sum(numbers_input_basic.number)" BIGINT NULL, |
| | "time_window" TIMESTAMP(9) NOT NULL, |
@@ -58,8 +60,10 @@ SHOW CREATE TABLE out_num_cnt_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------------+---------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
@@ -172,9 +176,9 @@ Affected Rows: 0
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
+-----------+---------------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
+-----------+---------------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
@@ -183,8 +187,10 @@ SHOW CREATE TABLE out_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------+---------------------------------------------------+
DROP FLOW test_wildcard_basic;
@@ -200,9 +206,9 @@ Affected Rows: 0
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
+-----------+---------------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
+-----------+---------------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
@@ -211,8 +217,10 @@ SHOW CREATE TABLE out_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------+---------------------------------------------------+
-- SQLNESS ARG restart=true
SELECT 1;
@@ -243,9 +251,9 @@ ADMIN FLUSH_FLOW('test_wildcard_basic');
SHOW CREATE TABLE out_basic;
+-----------+---------------------------------------------+
+-----------+---------------------------------------------------+
| Table | Create Table |
+-----------+---------------------------------------------+
+-----------+---------------------------------------------------+
| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( |
| | "wildcard" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
@@ -254,8 +262,10 @@ SHOW CREATE TABLE out_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-----------+---------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------+---------------------------------------------------+
SELECT wildcard FROM out_basic;
@@ -309,7 +319,9 @@ SHOW CREATE TABLE out_distinct_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+--------------------+---------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
@@ -365,7 +377,9 @@ SHOW CREATE TABLE out_distinct_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+--------------------+---------------------------------------------------+
SELECT
@@ -637,9 +651,9 @@ Affected Rows: 0
SHOW CREATE TABLE ngx_country;
+-------------+---------------------------------------------+
+-------------+---------------------------------------------------+
| Table | Create Table |
+-------------+---------------------------------------------+
+-------------+---------------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "country" STRING NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
@@ -649,8 +663,10 @@ SHOW CREATE TABLE ngx_country;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+---------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------+---------------------------------------------------+
INSERT INTO
ngx_access_log
@@ -670,9 +686,9 @@ ADMIN FLUSH_FLOW('calc_ngx_country');
SHOW CREATE TABLE ngx_country;
+-------------+---------------------------------------------+
+-------------+---------------------------------------------------+
| Table | Create Table |
+-------------+---------------------------------------------+
+-------------+---------------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "country" STRING NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
@@ -682,8 +698,10 @@ SHOW CREATE TABLE ngx_country;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+---------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------+---------------------------------------------------+
SELECT
country
@@ -787,9 +805,9 @@ Affected Rows: 0
SHOW CREATE TABLE ngx_country;
+-------------+--------------------------------------------+
+-------------+---------------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
+-------------+---------------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "country" STRING NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
@@ -799,8 +817,10 @@ SHOW CREATE TABLE ngx_country;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------+---------------------------------------------------+
INSERT INTO
ngx_access_log
@@ -820,9 +840,9 @@ ADMIN FLUSH_FLOW('calc_ngx_country');
SHOW CREATE TABLE ngx_country;
+-------------+--------------------------------------------+
+-------------+---------------------------------------------------+
| Table | Create Table |
+-------------+--------------------------------------------+
+-------------+---------------------------------------------------+
| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( |
| | "country" STRING NULL, |
| | "time_window" TIMESTAMP(3) NOT NULL, |
@@ -832,8 +852,10 @@ SHOW CREATE TABLE ngx_country;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------+--------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------+---------------------------------------------------+
SELECT
country,
@@ -1673,9 +1695,9 @@ Affected Rows: 0
SHOW CREATE TABLE out_num_cnt_basic;
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
+-------------------+---------------------------------------------------+
| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( |
| | "avg_after_filter_num" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
@@ -1684,8 +1706,10 @@ SHOW CREATE TABLE out_num_cnt_basic;
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------------------+--------------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-------------------+---------------------------------------------------+
-- TODO(discord9): confirm if it's necessary to flush flow here?
-- because flush_flow result is at most 1

View File

@@ -15,9 +15,9 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
+----------+---------------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
+----------+---------------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
@@ -27,8 +27,10 @@ SHOW CREATE TABLE cnt_reqs;
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------+---------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
@@ -157,9 +159,9 @@ Affected Rows: 0
SHOW CREATE TABLE cnt_reqs;
+----------+-------------------------------------------+
+----------+---------------------------------------------------+
| Table | Create Table |
+----------+-------------------------------------------+
+----------+---------------------------------------------------+
| cnt_reqs | CREATE TABLE IF NOT EXISTS "cnt_reqs" ( |
| | "count(http_requests.val)" DOUBLE NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
@@ -169,8 +171,10 @@ SHOW CREATE TABLE cnt_reqs;
| | ) |
| | |
| | ENGINE=mito |
| | |
+----------+-------------------------------------------+
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+----------+---------------------------------------------------+
-- test if sink table is tql queryable
TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", cnt_reqs);
@@ -258,7 +262,9 @@ SHOW CREATE TABLE rate_reqs;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable
@@ -337,7 +343,9 @@ SHOW CREATE TABLE rate_reqs;
| | ) |
| | |
| | ENGINE=mito |
| | |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------+-----------------------------------------------------------+
-- test if sink table is tql queryable