diff --git a/Cargo.lock b/Cargo.lock
index e8a5a34460..7b41073e6e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11574,6 +11574,7 @@ dependencies = [
  "openmetrics-parser",
  "opensrv-mysql",
  "opentelemetry-proto",
+ "operator",
  "otel-arrow-rust",
  "parking_lot 0.12.4",
  "permutation",
diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs
index 03ef696102..8658a7fdb2 100644
--- a/src/operator/src/expr_helper.rs
+++ b/src/operator/src/expr_helper.rs
@@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
 use api::v1::alter_table_expr::Kind as AlterTableKind;
-use api::v1::column_def::options_from_column_schema;
+use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
 use api::v1::{
     AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
     ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
@@ -36,7 +36,12 @@ use common_grpc_expr::util::ColumnExpr;
 use common_time::Timezone;
 use datafusion::sql::planner::object_name_to_table_reference;
 use datatypes::schema::{
-    COMMENT_KEY, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SkippingIndexType,
+    COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
+    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
+    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
+    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
+    ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
+    SkippingIndexType,
 };
 use file_engine::FileOptions;
 use query::sql::{
@@ -46,15 +51,21 @@ use query::sql::{
 use session::context::QueryContextRef;
 use session::table_name::table_idents_to_full_name;
 use snafu::{OptionExt, ResultExt, ensure};
-use sql::ast::{ColumnOption, ObjectName, ObjectNamePartExt};
+use sql::ast::{
+    ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
+};
+use sql::dialect::GreptimeDbDialect;
+use sql::parser::ParserContext;
 use sql::statements::alter::{
     AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
 };
 use sql::statements::create::{
-    Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
+    Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
+    CreateView, TableConstraint,
 };
 use sql::statements::{
-    column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
+    OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
+    sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
 };
 use sql::util::extract_tables_from_query;
 use table::requests::{FILE_TABLE_META_KEY, TableOptions};
@@ -65,9 +76,9 @@ pub use trigger::to_create_trigger_task_expr;
 use crate::error::{
     BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
     ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
-    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu,
-    NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
-    UnrecognizedTableOptionSnafu,
+    IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
+    InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
+    PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
 };

 pub fn create_table_expr_by_column_schemas(
@@ -230,6 +241,181 @@ pub fn create_to_expr(
     Ok(expr)
 }

+/// Convert a gRPC [`CreateTableExpr`] back to a `CreateTable` statement.
+/// You can use `create_table_expr_by_column_schemas` to create a `CreateTableExpr` from column schemas.
+///
+/// # Parameters
+///
+/// * `expr` - The `CreateTableExpr` to convert
+/// * `quote_style` - Optional quote style for identifiers (defaults to the MySQL-style backtick)
+pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
+    let quote_style = quote_style.unwrap_or('`');
+
+    // Convert table name
+    let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
+        sql::ast::Ident::with_quote(quote_style, &expr.table_name),
+    )]);
+
+    // Convert columns
+    let mut columns = Vec::with_capacity(expr.column_defs.len());
+    for column_def in &expr.column_defs {
+        let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
+            column: &column_def.name,
+        })?;
+
+        let mut options = Vec::new();
+
+        // Add NULL/NOT NULL constraint
+        if column_def.is_nullable {
+            options.push(ColumnOptionDef {
+                name: None,
+                option: ColumnOption::Null,
+            });
+        } else {
+            options.push(ColumnOptionDef {
+                name: None,
+                option: ColumnOption::NotNull,
+            });
+        }
+
+        // Add DEFAULT constraint if present
+        if let Some(default_constraint) = column_schema.default_constraint() {
+            let expr = match default_constraint {
+                ColumnDefaultConstraint::Value(v) => {
+                    Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
+                }
+                ColumnDefaultConstraint::Function(func_expr) => {
+                    ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
+                        .context(ParseSqlSnafu)?
+                }
+            };
+            options.push(ColumnOptionDef {
+                name: None,
+                option: ColumnOption::Default(expr),
+            });
+        }
+
+        // Add COMMENT if present
+        if !column_def.comment.is_empty() {
+            options.push(ColumnOptionDef {
+                name: None,
+                option: ColumnOption::Comment(column_def.comment.clone()),
+            });
+        }
+
+        // Note: we don't add inline PRIMARY KEY options here; all primary keys
+        // are handled as table constraints instead, for consistency.
+
+        // Handle column extensions (fulltext, inverted index, skipping index)
+        let mut extensions = ColumnExtensions::default();
+
+        // Add fulltext index options if present
+        if let Ok(Some(opt)) = column_schema.fulltext_options()
+            && opt.enable
+        {
+            let mut map = HashMap::from([
+                (
+                    COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
+                    opt.analyzer.to_string(),
+                ),
+                (
+                    COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
+                    opt.case_sensitive.to_string(),
+                ),
+                (
+                    COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
+                    opt.backend.to_string(),
+                ),
+            ]);
+            if opt.backend == FulltextBackend::Bloom {
+                map.insert(
+                    COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
+                    opt.granularity.to_string(),
+                );
+                map.insert(
+                    COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
+                    opt.false_positive_rate().to_string(),
+                );
+            }
+            extensions.fulltext_index_options = Some(map.into());
+        }
+
+        // Add skipping index options if present
+        if let Ok(Some(opt)) = column_schema.skipping_index_options() {
+            let map = HashMap::from([
+                (
+                    COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
+                    opt.granularity.to_string(),
+                ),
+                (
+                    COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
+                    opt.false_positive_rate().to_string(),
+                ),
+                (
+                    COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
+                    opt.index_type.to_string(),
+                ),
+            ]);
+            extensions.skipping_index_options = Some(map.into());
+        }
+
+        // Add inverted index options if present
+        if column_schema.is_inverted_indexed() {
+            extensions.inverted_index_options = Some(HashMap::new().into());
+        }
+
+        let sql_column = SqlColumn {
+            column_def: ColumnDef {
+                name: Ident::with_quote(quote_style, &column_def.name),
+                data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
+                    .context(ParseSqlSnafu)?,
+                options,
+            },
+            extensions,
+        };
+
+        columns.push(sql_column);
+    }
+
+    // Convert constraints
+    let mut constraints = Vec::new();
+
+    // Add TIME INDEX constraint
+    constraints.push(TableConstraint::TimeIndex {
+        column: Ident::with_quote(quote_style, &expr.time_index),
+    });
+
+    // Add PRIMARY KEY constraint (always added as a table constraint for consistency)
+    if !expr.primary_keys.is_empty() {
+        let primary_key_columns: Vec<Ident> = expr
+            .primary_keys
+            .iter()
+            .map(|pk| Ident::with_quote(quote_style, pk))
+            .collect();
+
+        constraints.push(TableConstraint::PrimaryKey {
+            columns: primary_key_columns,
+        });
+    }
+
+    // Convert table options
+    let mut options = OptionMap::default();
+    for (key, value) in &expr.table_options {
+        options.insert(key.clone(), value.clone());
+    }
+
+    Ok(CreateTable {
+        if_not_exists: expr.create_if_not_exists,
+        table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
+        name: table_name,
+        columns,
+        engine: expr.engine.clone(),
+        constraints,
+        options,
+        partitions: None,
+    })
+}
+
 /// Validate the [`CreateTableExpr`] request.
 pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
     // construct column list
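For orientation, the round trip this new function enables looks like the following. This is a sketch, not part of the patch: the import paths for `ParseOptions` and `Statement` are assumptions inferred from their use in the new test below, and error handling is collapsed to `unwrap()` for brevity.

    // Sketch: SQL text -> CreateTableExpr -> CreateTable -> SQL text again.
    use operator::expr_helper::{create_to_expr, expr_to_create};
    use session::context::QueryContext;
    use sql::dialect::GreptimeDbDialect;
    use sql::parser::{ParseOptions, ParserContext};
    use sql::statements::statement::Statement;

    fn roundtrip(sql: &str) -> String {
        let stmt =
            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
                .unwrap()
                .pop()
                .unwrap();
        let Statement::CreateTable(create) = stmt else {
            panic!("expected a CREATE TABLE statement");
        };
        // Forward conversion: SQL statement to gRPC expression.
        let expr = create_to_expr(&create, &QueryContext::arc()).unwrap();
        // Backward conversion: gRPC expression to SQL statement, backtick-quoted.
        let create_again = expr_to_create(&expr, Some('`')).unwrap();
        // The alternate Display impl pretty-prints the statement.
        format!("{:#}", create_again)
    }

The new `test_expr_to_create` test below asserts exactly this property: the pretty-printed output equals the original input.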
@@ -1301,4 +1487,41 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
         assert_eq!(columns, expr.columns);
         assert_eq!(plan_columns, expr.plan_columns);
     }
+
+    #[test]
+    fn test_expr_to_create() {
+        let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
+  `timestamp` TIMESTAMP(9) NOT NULL,
+  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
+  `username` STRING NULL,
+  `http_method` STRING NULL INVERTED INDEX,
+  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
+  `protocol` STRING NULL,
+  `status_code` INT NULL INVERTED INDEX,
+  `response_size` BIGINT NULL,
+  `message` STRING NULL,
+  TIME INDEX (`timestamp`),
+  PRIMARY KEY (`username`, `status_code`)
+)
+ENGINE=mito
+WITH(
+  append_mode = 'true'
+)"#;
+        let stmt =
+            ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
+                .unwrap()
+                .pop()
+                .unwrap();
+
+        let Statement::CreateTable(original_create) = stmt else {
+            unreachable!()
+        };
+
+        // Convert CreateTable -> CreateTableExpr -> CreateTable
+        let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
+
+        let create_table = expr_to_create(&expr, Some('`')).unwrap();
+        let new_sql = format!("{:#}", create_table);
+        assert_eq!(sql, new_sql);
+    }
 }
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index 692b509f6e..1c7336dc7b 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -342,6 +342,12 @@ impl Pipeline {
             TransformerMode::AutoTransform(_, _) => None,
         }
     }
+
+    pub fn is_variant_table_name(&self) -> bool {
+        // Even if the pipeline has no dispatcher or table_suffix, the table
+        // name can still vary because of VRL processors and hints.
+        self.dispatcher.is_some() || self.tablesuffix.is_some()
+    }
 }

 pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {
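The predicate above is deliberately conservative: a dispatcher or a table_suffix makes the target table name data-dependent, so one generated CREATE TABLE statement cannot cover every destination. A standalone sketch of the behavior; `PipelineLike` is illustrative only, not the real `pipeline::Pipeline` type:

    // Illustrative stand-in for Pipeline; the field names follow the patch.
    struct PipelineLike {
        dispatcher: Option<String>,  // dispatcher rules, if configured
        tablesuffix: Option<String>, // table_suffix template, if configured
    }

    impl PipelineLike {
        fn is_variant_table_name(&self) -> bool {
            self.dispatcher.is_some() || self.tablesuffix.is_some()
        }
    }

    fn main() {
        let pipeline = PipelineLike {
            dispatcher: None,
            tablesuffix: Some("_%Y%m%d".into()), // hypothetical suffix template
        };
        // The /ddl endpoint added below uses this check to attach a warning:
        // ingestion may route rows to suffixed tables the DDL doesn't cover.
        assert!(pipeline.is_variant_table_name());
    }

As the comment in the method notes, a VRL processor can also vary the table name through hints, which this check cannot see, so a `false` result is not a hard guarantee.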
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 6ba0bdb014..2f1023ed40 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -86,6 +86,7 @@ socket2 = "0.5"
 # 2. Use ring, instead of aws-lc-rs in https://github.com/databendlabs/opensrv/pull/72
 opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "a1fb4da215c8693c7e4f62be249a01b7fec52997" }
 opentelemetry-proto.workspace = true
+operator.workspace = true
 otel-arrow-rust.workspace = true
 parking_lot.workspace = true
 pgwire = { version = "0.32.1", default-features = false, features = ["server-api-ring"] }
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index 8319541248..596f6cee08 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -1027,6 +1027,10 @@ impl HttpServer {
                 "/pipelines/{pipeline_name}",
                 routing::get(event::query_pipeline),
             )
+            .route(
+                "/pipelines/{pipeline_name}/ddl",
+                routing::get(event::query_pipeline_ddl),
+            )
             .route(
                 "/pipelines/{pipeline_name}",
                 routing::post(event::add_pipeline),
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 6f31a8b41a..0b43dc7a32 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -27,13 +27,15 @@ use axum::http::{HeaderMap, StatusCode};
 use axum::response::{IntoResponse, Response};
 use axum::{Extension, Json};
 use axum_extra::TypedHeader;
-use common_error::ext::ErrorExt;
+use common_catalog::consts::default_engine;
+use common_error::ext::{BoxedError, ErrorExt};
 use common_query::{Output, OutputData};
 use common_telemetry::{error, warn};
 use datatypes::value::column_data_to_json;
 use headers::ContentType;
 use lazy_static::lazy_static;
 use mime_guess::mime;
+use operator::expr_helper::{create_table_expr_by_column_schemas, expr_to_create};
 use pipeline::util::to_pipeline_version;
 use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
 use serde::{Deserialize, Serialize};
@@ -41,18 +43,21 @@ use serde_json::{Deserializer, Map, Value as JsonValue, json};
 use session::context::{Channel, QueryContext, QueryContextRef};
 use simd_json::Buffers;
 use snafu::{OptionExt, ResultExt, ensure};
+use store_api::mito_engine_options::APPEND_MODE_KEY;
 use strum::{EnumIter, IntoEnumIterator};
+use table::table_reference::TableReference;
 use vrl::value::{KeyString, Value as VrlValue};

 use crate::error::{
-    Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, status_code_to_http_status,
+    CatalogSnafu, Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
+    status_code_to_http_status,
 };
 use crate::http::HttpResponse;
 use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
 use crate::http::header::{
     CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
 };
-use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
+use crate::http::result::greptime_manage_resp::{GreptimedbManageResponse, SqlOutput};
 use crate::http::result::greptime_result_v1::GreptimedbV1Response;
 use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
 use crate::metrics::{
@@ -65,6 +70,11 @@ use crate::query_handler::PipelineHandlerRef;

 const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
 const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";
+
+const CREATE_TABLE_SQL_SUFFIX_EXISTS: &str =
+    "the pipeline has dispatcher or table_suffix, the table name may not be fixed";
+const CREATE_TABLE_SQL_TABLE_EXISTS: &str =
+    "table already exists, the CREATE TABLE SQL may be different";

 lazy_static! {
     pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
     pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
@@ -195,6 +205,90 @@ pub async fn query_pipeline(
     ))
 }

+/// Generate the CREATE TABLE DDL from a pipeline definition.
+#[axum_macros::debug_handler]
+pub async fn query_pipeline_ddl(
+    State(state): State<LogState>,
+    Extension(mut query_ctx): Extension<QueryContext>,
+    Query(query_params): Query<LogIngesterQueryParams>,
+    Path(pipeline_name): Path<String>,
+) -> Result<GreptimedbManageResponse> {
+    let start = Instant::now();
+    let handler = state.log_handler;
+    ensure!(
+        !pipeline_name.is_empty(),
+        InvalidParameterSnafu {
+            reason: "pipeline_name is required in path",
+        }
+    );
+    ensure!(
+        !pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
+        InvalidParameterSnafu {
+            reason: "built-in pipelines don't have fixed table schema",
+        }
+    );
+    let table_name = query_params.table.context(InvalidParameterSnafu {
+        reason: "table name is required",
+    })?;
+
+    let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
+
+    query_ctx.set_channel(Channel::Log);
+    let query_ctx = Arc::new(query_ctx);
+
+    let pipeline = handler
+        .get_pipeline(&pipeline_name, version, query_ctx.clone())
+        .await?;
+
+    let schemas_def = pipeline.schemas().context(InvalidParameterSnafu {
+        reason: "auto transform doesn't have fixed table schema",
+    })?;
+
+    let schema = query_ctx.current_schema();
+    let table_name_ref = TableReference {
+        catalog: query_ctx.current_catalog(),
+        schema: &schema,
+        table: &table_name,
+    };
+
+    let mut create_table_expr =
+        create_table_expr_by_column_schemas(&table_name_ref, schemas_def, default_engine(), None)
+            .map_err(BoxedError::new)
+            .context(OtherSnafu)?;
+
+    // Manually set the append_mode to true
+    create_table_expr
+        .table_options
+        .insert(APPEND_MODE_KEY.to_string(), "true".to_string());
+
+    let expr = expr_to_create(&create_table_expr, None)
+        .map_err(BoxedError::new)
+        .context(OtherSnafu)?;
+
+    let message = if handler
+        .get_table(&table_name, &query_ctx)
+        .await
+        .context(CatalogSnafu)?
+        .is_some()
+    {
+        Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
+    } else if pipeline.is_variant_table_name() {
+        Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())
+    } else {
+        None
+    };
+
+    let sql = SqlOutput {
+        sql: format!("{:#}", expr),
+        message,
+    };
+
+    Ok(GreptimedbManageResponse::from_sql(
+        sql,
+        start.elapsed().as_millis() as u64,
+    ))
+}
+
 #[axum_macros::debug_handler]
 pub async fn add_pipeline(
     State(state): State<LogState>,
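For reference, here is the shape of the response body the new handler produces, assuming the serialization changes in the next file (the untagged `ManageResult::Sql` variant is flattened, so the payload appears under a top-level `sql` key, and `message` is omitted when it is `None`). The values below are placeholders, not captured output:

    // Sketch of the expected JSON for GET /v1/pipelines/{pipeline_name}/ddl?table=logs1.
    use serde_json::json;

    let expected_shape = json!({
        "sql": {
            // The generated CREATE TABLE statement (truncated here).
            "sql": "CREATE TABLE IF NOT EXISTS `logs1` (...)",
            // Present only when the table already exists or the pipeline has a
            // dispatcher/table_suffix; omitted otherwise (skip_serializing_if).
            "message": "table already exists, the CREATE TABLE SQL may be different"
        },
        "execution_time_ms": 1
    });

The integration test at the end of this patch reads the statement back via `resp["sql"]["sql"]`, matching this shape.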
diff --git a/src/servers/src/http/result/greptime_manage_resp.rs b/src/servers/src/http/result/greptime_manage_resp.rs
index a875204082..3f7f3c6eec 100644
--- a/src/servers/src/http/result/greptime_manage_resp.rs
+++ b/src/servers/src/http/result/greptime_manage_resp.rs
@@ -55,6 +55,13 @@ impl GreptimedbManageResponse {
         }
     }

+    pub fn from_sql(sql: SqlOutput, execution_time_ms: u64) -> Self {
+        GreptimedbManageResponse {
+            manage_result: ManageResult::Sql { sql },
+            execution_time_ms,
+        }
+    }
+
     pub fn with_execution_time(mut self, execution_time: u64) -> Self {
         self.execution_time_ms = execution_time;
         self
@@ -69,8 +76,7 @@ impl GreptimedbManageResponse {
 #[serde(untagged)]
 pub enum ManageResult {
     Pipelines { pipelines: Vec<PipelineOutput> },
-    // todo(shuiyisong): refactor scripts api
-    Scripts(),
+    Sql { sql: SqlOutput },
 }

 #[derive(Serialize, Deserialize, Debug)]
@@ -81,6 +87,13 @@ pub struct PipelineOutput {
     pipeline: Option<String>,
 }

+#[derive(Serialize, Deserialize, Debug)]
+pub struct SqlOutput {
+    pub(crate) sql: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub(crate) message: Option<String>,
+}
+
 impl IntoResponse for GreptimedbManageResponse {
     fn into_response(self) -> axum::response::Response {
         let execution_time = self.execution_time_ms;
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index d02ed2ecd6..8e965dd9b7 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -124,6 +124,7 @@ macro_rules! http_tests {
         test_pipeline_2,
         test_pipeline_skip_error,
         test_pipeline_filter,
+        test_pipeline_create_table,

         test_otlp_metrics_new,
         test_otlp_traces_v0,
@@ -2506,6 +2507,98 @@ transform:
     guard.remove_all().await;
 }

+pub async fn test_pipeline_create_table(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_pipeline_create_table").await;
+
+    // handshake
+    let client = TestClient::new(app).await;
+
+    let pipeline_body = r#"
+processors:
+- dissect:
+    fields:
+      - message
+    patterns:
+      - '%{ip_address} - %{username} [%{timestamp}] "%{http_method} %{request_line} %{protocol}" %{status_code} %{response_size}'
+    ignore_missing: true
+- date:
+    fields:
+      - timestamp
+    formats:
+      - "%d/%b/%Y:%H:%M:%S %z"
+
+transform:
+  - fields:
+      - timestamp
+    type: time
+    index: timestamp
+  - fields:
+      - ip_address
+    type: string
+    index: skipping
+  - fields:
+      - username
+    type: string
+    tag: true
+  - fields:
+      - http_method
+    type: string
+    index: inverted
+  - fields:
+      - request_line
+    type: string
+    index: fulltext
+  - fields:
+      - protocol
+    type: string
+  - fields:
+      - status_code
+    type: int32
+    index: inverted
+    tag: true
+  - fields:
+      - response_size
+    type: int64
+    on_failure: default
+    default: 0
+  - fields:
+      - message
+    type: string
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/test")
+        .header("Content-Type", "application/x-yaml")
+        .body(pipeline_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 2. fetch the generated CREATE TABLE statement
+    let res = client
+        .get("/v1/pipelines/test/ddl?table=logs1")
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+    let resp: serde_json::Value = res.json().await;
+    let sql = resp
+        .get("sql")
+        .unwrap()
+        .get("sql")
+        .unwrap()
+        .as_str()
+        .unwrap();
+
+    assert_eq!(
+        "CREATE TABLE IF NOT EXISTS `logs1` (\n  `timestamp` TIMESTAMP(9) NOT NULL,\n  `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n  `username` STRING NULL,\n  `http_method` STRING NULL INVERTED INDEX,\n  `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),\n  `protocol` STRING NULL,\n  `status_code` INT NULL INVERTED INDEX,\n  `response_size` BIGINT NULL,\n  `message` STRING NULL,\n  TIME INDEX (`timestamp`),\n  PRIMARY KEY (`username`, `status_code`)\n)\nENGINE=mito\nWITH(\n  append_mode = 'true'\n)",
+        sql
+    );
+
+    guard.remove_all().await;
+}
+
 pub async fn test_pipeline_dispatcher(storage_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) =