feat(pipeline): generate create table sql from pipeline config (#6930)

* chore: add expr_to_create function using cc and polish

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* feat: add get pipeline create table

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update test_expr_to_create

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: column extension

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add message hint for variant table name

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add message hint for existing table

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* test: create table for pipeline

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update comment

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: rename to query pipeline ddl

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2025-09-09 10:47:51 +08:00
committed by GitHub
parent 948a6578fa
commit ac051af201
8 changed files with 448 additions and 13 deletions

Cargo.lock generated
View File

@@ -11574,6 +11574,7 @@ dependencies = [
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto",
"operator",
"otel-arrow-rust",
"parking_lot 0.12.4",
"permutation",

View File

@@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_database_expr::Kind as AlterDatabaseKind;
use api::v1::alter_table_expr::Kind as AlterTableKind;
use api::v1::column_def::options_from_column_schema;
use api::v1::column_def::{options_from_column_schema, try_as_column_schema};
use api::v1::{
AddColumn, AddColumns, AlterDatabaseExpr, AlterTableExpr, Analyzer, ColumnDataType,
ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
@@ -36,7 +36,12 @@ use common_grpc_expr::util::ColumnExpr;
use common_time::Timezone;
use datafusion::sql::planner::object_name_to_table_reference;
use datatypes::schema::{
COMMENT_KEY, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema, SkippingIndexType,
COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
COLUMN_FULLTEXT_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE,
COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY, COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY,
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend, Schema,
SkippingIndexType,
};
use file_engine::FileOptions;
use query::sql::{
@@ -46,15 +51,21 @@ use query::sql::{
use session::context::QueryContextRef;
use session::table_name::table_idents_to_full_name;
use snafu::{OptionExt, ResultExt, ensure};
use sql::ast::{ColumnOption, ObjectName, ObjectNamePartExt};
use sql::ast::{
ColumnDef, ColumnOption, ColumnOptionDef, Expr, Ident, ObjectName, ObjectNamePartExt,
};
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;
use sql::statements::alter::{
AlterDatabase, AlterDatabaseOperation, AlterTable, AlterTableOperation,
};
use sql::statements::create::{
Column as SqlColumn, CreateExternalTable, CreateFlow, CreateTable, CreateView, TableConstraint,
Column as SqlColumn, ColumnExtensions, CreateExternalTable, CreateFlow, CreateTable,
CreateView, TableConstraint,
};
use sql::statements::{
column_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type,
OptionMap, column_to_schema, concrete_data_type_to_sql_data_type,
sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, value_to_sql_value,
};
use sql::util::extract_tables_from_query;
use table::requests::{FILE_TABLE_META_KEY, TableOptions};
@@ -65,9 +76,9 @@ pub use trigger::to_create_trigger_task_expr;
use crate::error::{
BuildCreateExprOnInsertionSnafu, ColumnDataTypeSnafu, ConvertColumnDefaultConstraintSnafu,
ConvertIdentifierSnafu, EncodeJsonSnafu, ExternalSnafu, FindNewColumnsOnInsertionSnafu,
IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidFlowNameSnafu, InvalidSqlSnafu,
NotSupportedSnafu, ParseSqlSnafu, PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu,
UnrecognizedTableOptionSnafu,
IllegalPrimaryKeysDefSnafu, InferFileTableSchemaSnafu, InvalidColumnDefSnafu,
InvalidFlowNameSnafu, InvalidSqlSnafu, NotSupportedSnafu, ParseSqlSnafu, ParseSqlValueSnafu,
PrepareFileTableSnafu, Result, SchemaIncompatibleSnafu, UnrecognizedTableOptionSnafu,
};
pub fn create_table_expr_by_column_schemas(
@@ -230,6 +241,181 @@ pub fn create_to_expr(
Ok(expr)
}
/// Converts a gRPC [`CreateTableExpr`] back into a `CreateTable` statement.
/// You can use `create_table_expr_by_column_schemas` to build a `CreateTableExpr` from column schemas.
///
/// # Parameters
///
/// * `expr` - The `CreateTableExpr` to convert
/// * `quote_style` - Optional quote style for identifiers (defaults to the MySQL-style backtick)
pub fn expr_to_create(expr: &CreateTableExpr, quote_style: Option<char>) -> Result<CreateTable> {
let quote_style = quote_style.unwrap_or('`');
// Convert table name
let table_name = ObjectName(vec![sql::ast::ObjectNamePart::Identifier(
sql::ast::Ident::with_quote(quote_style, &expr.table_name),
)]);
// Convert columns
let mut columns = Vec::with_capacity(expr.column_defs.len());
for column_def in &expr.column_defs {
let column_schema = try_as_column_schema(column_def).context(InvalidColumnDefSnafu {
column: &column_def.name,
})?;
let mut options = Vec::new();
// Add NULL/NOT NULL constraint
if column_def.is_nullable {
options.push(ColumnOptionDef {
name: None,
option: ColumnOption::Null,
});
} else {
options.push(ColumnOptionDef {
name: None,
option: ColumnOption::NotNull,
});
}
// Add DEFAULT constraint if present
if let Some(default_constraint) = column_schema.default_constraint() {
let expr = match default_constraint {
ColumnDefaultConstraint::Value(v) => {
Expr::Value(value_to_sql_value(v).context(ParseSqlValueSnafu)?.into())
}
ColumnDefaultConstraint::Function(func_expr) => {
ParserContext::parse_function(func_expr, &GreptimeDbDialect {})
.context(ParseSqlSnafu)?
}
};
options.push(ColumnOptionDef {
name: None,
option: ColumnOption::Default(expr),
});
}
// Add COMMENT if present
if !column_def.comment.is_empty() {
options.push(ColumnOptionDef {
name: None,
option: ColumnOption::Comment(column_def.comment.clone()),
});
}
// Note: we don't add inline PRIMARY KEY options here;
// all primary keys are emitted as table constraints instead, for consistency
// Handle column extensions (fulltext, inverted index, skipping index)
let mut extensions = ColumnExtensions::default();
// Add fulltext index options if present
if let Ok(Some(opt)) = column_schema.fulltext_options()
&& opt.enable
{
let mut map = HashMap::from([
(
COLUMN_FULLTEXT_OPT_KEY_ANALYZER.to_string(),
opt.analyzer.to_string(),
),
(
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE.to_string(),
opt.case_sensitive.to_string(),
),
(
COLUMN_FULLTEXT_OPT_KEY_BACKEND.to_string(),
opt.backend.to_string(),
),
]);
if opt.backend == FulltextBackend::Bloom {
map.insert(
COLUMN_FULLTEXT_OPT_KEY_GRANULARITY.to_string(),
opt.granularity.to_string(),
);
map.insert(
COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
opt.false_positive_rate().to_string(),
);
}
extensions.fulltext_index_options = Some(map.into());
}
// Add skipping index options if present
if let Ok(Some(opt)) = column_schema.skipping_index_options() {
let map = HashMap::from([
(
COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY.to_string(),
opt.granularity.to_string(),
),
(
COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE.to_string(),
opt.false_positive_rate().to_string(),
),
(
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE.to_string(),
opt.index_type.to_string(),
),
]);
extensions.skipping_index_options = Some(map.into());
}
// Add inverted index options if present
if column_schema.is_inverted_indexed() {
extensions.inverted_index_options = Some(HashMap::new().into());
}
let sql_column = SqlColumn {
column_def: ColumnDef {
name: Ident::with_quote(quote_style, &column_def.name),
data_type: concrete_data_type_to_sql_data_type(&column_schema.data_type)
.context(ParseSqlSnafu)?,
options,
},
extensions,
};
columns.push(sql_column);
}
// Convert constraints
let mut constraints = Vec::new();
// Add TIME INDEX constraint
constraints.push(TableConstraint::TimeIndex {
column: Ident::with_quote(quote_style, &expr.time_index),
});
// Add PRIMARY KEY constraint (always add as constraint for consistency)
if !expr.primary_keys.is_empty() {
let primary_key_columns: Vec<Ident> = expr
.primary_keys
.iter()
.map(|pk| Ident::with_quote(quote_style, pk))
.collect();
constraints.push(TableConstraint::PrimaryKey {
columns: primary_key_columns,
});
}
// Convert table options
let mut options = OptionMap::default();
for (key, value) in &expr.table_options {
options.insert(key.clone(), value.clone());
}
Ok(CreateTable {
if_not_exists: expr.create_if_not_exists,
table_id: expr.table_id.as_ref().map(|tid| tid.id).unwrap_or(0),
name: table_name,
columns,
engine: expr.engine.clone(),
constraints,
options,
partitions: None,
})
}
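A minimal usage sketch of the new helper from another workspace crate, assuming a `CreateTableExpr` has already been built (for example via `create_table_expr_by_column_schemas`, as the pipeline handler below does); `render_ddl` is an illustrative name and not part of this change:

```rust
// Sketch only: turns a populated CreateTableExpr into pretty-printed SQL text.
use operator::expr_helper::expr_to_create;

fn render_ddl(expr: &api::v1::CreateTableExpr) -> String {
    // `None` falls back to the MySQL-style backtick quote; Some('"') would
    // quote identifiers with double quotes instead.
    let create_table = expr_to_create(expr, None).expect("convertible expression");
    // The alternate (`{:#}`) Display form pretty-prints the statement across
    // multiple lines, which is what the handler and tests below rely on.
    format!("{:#}", create_table)
}
```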
/// Validate the [`CreateTableExpr`] request.
pub fn validate_create_expr(create: &CreateTableExpr) -> Result<()> {
// construct column list
@@ -1301,4 +1487,41 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;";
assert_eq!(columns, expr.columns);
assert_eq!(plan_columns, expr.plan_columns);
}
#[test]
fn test_expr_to_create() {
let sql = r#"CREATE TABLE IF NOT EXISTS `tt` (
`timestamp` TIMESTAMP(9) NOT NULL,
`ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),
`username` STRING NULL,
`http_method` STRING NULL INVERTED INDEX,
`request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),
`protocol` STRING NULL,
`status_code` INT NULL INVERTED INDEX,
`response_size` BIGINT NULL,
`message` STRING NULL,
TIME INDEX (`timestamp`),
PRIMARY KEY (`username`, `status_code`)
)
ENGINE=mito
WITH(
append_mode = 'true'
)"#;
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();
let Statement::CreateTable(original_create) = stmt else {
unreachable!()
};
// Convert CreateTable -> CreateTableExpr -> CreateTable
let expr = create_to_expr(&original_create, &QueryContext::arc()).unwrap();
let create_table = expr_to_create(&expr, Some('`')).unwrap();
let new_sql = format!("{:#}", create_table);
assert_eq!(sql, new_sql);
}
}

View File

@@ -342,6 +342,12 @@ impl Pipeline {
TransformerMode::AutoTransform(_, _) => None,
}
}
pub fn is_variant_table_name(&self) -> bool {
// Note: this only checks the static pipeline config; even without a dispatcher
// or table_suffix, the table name can still vary via a VRL processor and the
// table name hint.
self.dispatcher.is_some() || self.tablesuffix.is_some()
}
}
pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str) -> Result<usize> {

View File

@@ -86,6 +86,7 @@ socket2 = "0.5"
# 2. Use ring, instead of aws-lc-rs in https://github.com/databendlabs/opensrv/pull/72
opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "a1fb4da215c8693c7e4f62be249a01b7fec52997" }
opentelemetry-proto.workspace = true
operator.workspace = true
otel-arrow-rust.workspace = true
parking_lot.workspace = true
pgwire = { version = "0.32.1", default-features = false, features = ["server-api-ring"] }

View File

@@ -1027,6 +1027,10 @@ impl HttpServer {
"/pipelines/{pipeline_name}",
routing::get(event::query_pipeline),
)
.route(
"/pipelines/{pipeline_name}/ddl",
routing::get(event::query_pipeline_ddl),
)
.route(
"/pipelines/{pipeline_name}",
routing::post(event::add_pipeline),
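Assuming the pipeline routes are mounted under `/v1` as in the integration tests at the end of this diff, the new route can be exercised roughly like this (pipeline and table names are placeholders):

```rust
// Sketch: querying the new DDL route with the TestClient used in the
// integration tests; `table` is required, `version` is optional.
let res = client
    .get("/v1/pipelines/my_pipeline/ddl?table=logs")
    .send()
    .await;
assert_eq!(res.status(), StatusCode::OK);
// The body carries the generated CREATE TABLE statement under `sql.sql`, plus
// an optional `sql.message` hint (variant table name, or table already exists).
let body: serde_json::Value = res.json().await;
let ddl = body["sql"]["sql"].as_str().expect("generated SQL");
println!("{ddl}");
```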

View File

@@ -27,13 +27,15 @@ use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{Extension, Json};
use axum_extra::TypedHeader;
use common_error::ext::ErrorExt;
use common_catalog::consts::default_engine;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
use operator::expr_helper::{create_table_expr_by_column_schemas, expr_to_create};
use pipeline::util::to_pipeline_version;
use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition};
use serde::{Deserialize, Serialize};
@@ -41,18 +43,21 @@ use serde_json::{Deserializer, Map, Value as JsonValue, json};
use session::context::{Channel, QueryContext, QueryContextRef};
use simd_json::Buffers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::mito_engine_options::APPEND_MODE_KEY;
use strum::{EnumIter, IntoEnumIterator};
use table::table_reference::TableReference;
use vrl::value::{KeyString, Value as VrlValue};
use crate::error::{
Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, status_code_to_http_status,
CatalogSnafu, Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
status_code_to_http_status,
};
use crate::http::HttpResponse;
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{
CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_NDJSON_SUBTYPE_STR, CONTENT_TYPE_PROTOBUF_STR,
};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_manage_resp::{GreptimedbManageResponse, SqlOutput};
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use crate::metrics::{
@@ -65,6 +70,11 @@ use crate::query_handler::PipelineHandlerRef;
const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_PIPELINE_SKIP_ERROR_KEY: &str = "skip_error";
const CREATE_TABLE_SQL_SUFFIX_EXISTS: &str =
"the pipeline has dispatcher or table_suffix, the table name may not be fixed";
const CREATE_TABLE_SQL_TABLE_EXISTS: &str =
"table already exists, the CREATE TABLE SQL may be different";
lazy_static! {
pub static ref JSON_CONTENT_TYPE: ContentType = ContentType::json();
pub static ref TEXT_CONTENT_TYPE: ContentType = ContentType::text();
@@ -195,6 +205,90 @@ pub async fn query_pipeline(
))
}
/// Generate DDL from pipeline definition.
#[axum_macros::debug_handler]
pub async fn query_pipeline_ddl(
State(state): State<LogState>,
Extension(mut query_ctx): Extension<QueryContext>,
Query(query_params): Query<LogIngesterQueryParams>,
Path(pipeline_name): Path<String>,
) -> Result<GreptimedbManageResponse> {
let start = Instant::now();
let handler = state.log_handler;
ensure!(
!pipeline_name.is_empty(),
InvalidParameterSnafu {
reason: "pipeline_name is required in path",
}
);
ensure!(
!pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
InvalidParameterSnafu {
reason: "built-in pipelines don't have fixed table schema",
}
);
let table_name = query_params.table.context(InvalidParameterSnafu {
reason: "table name is required",
})?;
let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
query_ctx.set_channel(Channel::Log);
let query_ctx = Arc::new(query_ctx);
let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;
let schemas_def = pipeline.schemas().context(InvalidParameterSnafu {
reason: "auto transform doesn't have fixed table schema",
})?;
let schema = query_ctx.current_schema();
let table_name_ref = TableReference {
catalog: query_ctx.current_catalog(),
schema: &schema,
table: &table_name,
};
let mut create_table_expr =
create_table_expr_by_column_schemas(&table_name_ref, schemas_def, default_engine(), None)
.map_err(BoxedError::new)
.context(OtherSnafu)?;
// manually set the append_mode to true
create_table_expr
.table_options
.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
let expr = expr_to_create(&create_table_expr, None)
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let message = if handler
.get_table(&table_name, &query_ctx)
.await
.context(CatalogSnafu)?
.is_some()
{
Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
} else if pipeline.is_variant_table_name() {
Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())
} else {
None
};
let sql = SqlOutput {
sql: format!("{:#}", expr),
message,
};
Ok(GreptimedbManageResponse::from_sql(
sql,
start.elapsed().as_millis() as u64,
))
}
#[axum_macros::debug_handler]
pub async fn add_pipeline(
State(state): State<LogState>,

View File

@@ -55,6 +55,13 @@ impl GreptimedbManageResponse {
}
}
pub fn from_sql(sql: SqlOutput, execution_time_ms: u64) -> Self {
GreptimedbManageResponse {
manage_result: ManageResult::Sql { sql },
execution_time_ms,
}
}
pub fn with_execution_time(mut self, execution_time: u64) -> Self {
self.execution_time_ms = execution_time;
self
@@ -69,8 +76,7 @@ impl GreptimedbManageResponse {
#[serde(untagged)]
pub enum ManageResult {
Pipelines { pipelines: Vec<PipelineOutput> },
// todo(shuiyisong): refactor scripts api
Scripts(),
Sql { sql: SqlOutput },
}
#[derive(Serialize, Deserialize, Debug)]
@@ -81,6 +87,13 @@ pub struct PipelineOutput {
pipeline: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SqlOutput {
pub(crate) sql: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) message: Option<String>,
}
impl IntoResponse for GreptimedbManageResponse {
fn into_response(self) -> axum::response::Response {
let execution_time = self.execution_time_ms;
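A crate-internal sketch of how the new variant is built and what it is expected to serialize to, based on `SqlOutput` above and the shape the integration test below asserts on (the exact top-level envelope is an assumption):

```rust
// Sketch: construct the SQL management response (fields are pub(crate), so
// this only compiles inside the servers crate).
let output = SqlOutput {
    sql: "CREATE TABLE IF NOT EXISTS `logs1` (/* ... */)".to_string(),
    // `message` is omitted from the JSON entirely when it is None.
    message: Some("table already exists, the CREATE TABLE SQL may be different".to_string()),
};
let resp = GreptimedbManageResponse::from_sql(output, 3 /* execution time in ms */);
// With the untagged ManageResult::Sql variant, the integration test below reads
// the statement back via resp["sql"]["sql"], i.e. roughly:
// {"sql": {"sql": "CREATE TABLE ...", "message": "table already exists, ..."}, "execution_time_ms": 3}
let _ = resp;
```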

View File

@@ -124,6 +124,7 @@ macro_rules! http_tests {
test_pipeline_2,
test_pipeline_skip_error,
test_pipeline_filter,
test_pipeline_create_table,
test_otlp_metrics_new,
test_otlp_traces_v0,
@@ -2506,6 +2507,98 @@ transform:
guard.remove_all().await;
}
pub async fn test_pipeline_create_table(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_pipeline_create_table").await;
// handshake
let client = TestClient::new(app).await;
let pipeline_body = r#"
processors:
- dissect:
fields:
- message
patterns:
- '%{ip_address} - %{username} [%{timestamp}] "%{http_method} %{request_line} %{protocol}" %{status_code} %{response_size}'
ignore_missing: true
- date:
fields:
- timestamp
formats:
- "%d/%b/%Y:%H:%M:%S %z"
transform:
- fields:
- timestamp
type: time
index: timestamp
- fields:
- ip_address
type: string
index: skipping
- fields:
- username
type: string
tag: true
- fields:
- http_method
type: string
index: inverted
- fields:
- request_line
type: string
index: fulltext
- fields:
- protocol
type: string
- fields:
- status_code
type: int32
index: inverted
tag: true
- fields:
- response_size
type: int64
on_failure: default
default: 0
- fields:
- message
type: string
"#;
// 1. create pipeline
let res = client
.post("/v1/events/pipelines/test")
.header("Content-Type", "application/x-yaml")
.body(pipeline_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let res = client
.get("/v1/pipelines/test/ddl?table=logs1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let resp: serde_json::Value = res.json().await;
let sql = resp
.get("sql")
.unwrap()
.get("sql")
.unwrap()
.as_str()
.unwrap();
assert_eq!(
"CREATE TABLE IF NOT EXISTS `logs1` (\n `timestamp` TIMESTAMP(9) NOT NULL,\n `ip_address` STRING NULL SKIPPING INDEX WITH(false_positive_rate = '0.01', granularity = '10240', type = 'BLOOM'),\n `username` STRING NULL,\n `http_method` STRING NULL INVERTED INDEX,\n `request_line` STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', backend = 'bloom', case_sensitive = 'false', false_positive_rate = '0.01', granularity = '10240'),\n `protocol` STRING NULL,\n `status_code` INT NULL INVERTED INDEX,\n `response_size` BIGINT NULL,\n `message` STRING NULL,\n TIME INDEX (`timestamp`),\n PRIMARY KEY (`username`, `status_code`)\n)\nENGINE=mito\nWITH(\n append_mode = 'true'\n)",
sql
);
guard.remove_all().await;
}
pub async fn test_pipeline_dispatcher(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =