diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 214418bd44..dab7784172 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -15,8 +15,8 @@ use std::collections::HashMap; use datatypes::schema::{ - ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexType, - COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, + ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexOptions, + SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, }; use greptime_proto::v1::{Analyzer, SkippingIndexType as PbSkippingIndexType}; use snafu::ResultExt; @@ -103,6 +103,13 @@ pub fn contains_fulltext(options: &Option) -> bool { .is_some_and(|o| o.options.contains_key(FULLTEXT_GRPC_KEY)) } +/// Checks if the `ColumnOptions` contains skipping index options. +pub fn contains_skipping(options: &Option) -> bool { + options + .as_ref() + .is_some_and(|o| o.options.contains_key(SKIPPING_INDEX_GRPC_KEY)) +} + /// Tries to construct a `ColumnOptions` from the given `FulltextOptions`. pub fn options_from_fulltext(fulltext: &FulltextOptions) -> Result> { let mut options = ColumnOptions::default(); @@ -113,6 +120,18 @@ pub fn options_from_fulltext(fulltext: &FulltextOptions) -> Result Result> { + let mut options = ColumnOptions::default(); + + let v = serde_json::to_string(skipping).context(error::SerializeJsonSnafu)?; + options + .options + .insert(SKIPPING_INDEX_GRPC_KEY.to_string(), v); + + Ok((!options.options.is_empty()).then_some(options)) +} + /// Tries to construct a `FulltextAnalyzer` from the given analyzer. pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer { match analyzer { diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index 5092181f76..02ce391d97 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -111,9 +111,9 @@ pub enum Error { }, #[snafu(display( - "Fulltext index only supports string type, column: {column_name}, unexpected type: {column_type:?}" + "Fulltext or Skipping index only supports string type, column: {column_name}, unexpected type: {column_type:?}" ))] - InvalidFulltextColumnType { + InvalidStringIndexColumnType { column_name: String, column_type: ColumnDataType, #[snafu(implicit)] @@ -173,7 +173,7 @@ impl ErrorExt for Error { StatusCode::InvalidArguments } - Error::UnknownColumnDataType { .. } | Error::InvalidFulltextColumnType { .. } => { + Error::UnknownColumnDataType { .. } | Error::InvalidStringIndexColumnType { .. } => { StatusCode::InvalidArguments } Error::InvalidSetTableOptionRequest { .. } diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs index 360fab53b0..fe32bdcde9 100644 --- a/src/common/grpc-expr/src/util.rs +++ b/src/common/grpc-expr/src/util.rs @@ -15,7 +15,7 @@ use std::collections::HashSet; use api::v1::column_data_type_extension::TypeExt; -use api::v1::column_def::contains_fulltext; +use api::v1::column_def::{contains_fulltext, contains_skipping}; use api::v1::{ AddColumn, AddColumns, Column, ColumnDataType, ColumnDataTypeExtension, ColumnDef, ColumnOptions, ColumnSchema, CreateTableExpr, JsonTypeExtension, SemanticType, @@ -27,7 +27,7 @@ use table::table_reference::TableReference; use crate::error::{ self, DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, - InvalidFulltextColumnTypeSnafu, MissingTimestampColumnSnafu, Result, + InvalidStringIndexColumnTypeSnafu, MissingTimestampColumnSnafu, Result, UnknownColumnDataTypeSnafu, }; pub struct ColumnExpr<'a> { @@ -152,8 +152,9 @@ pub fn build_create_table_expr( let column_type = infer_column_datatype(datatype, datatype_extension)?; ensure!( - !contains_fulltext(options) || column_type == ColumnDataType::String, - InvalidFulltextColumnTypeSnafu { + (!contains_fulltext(options) && !contains_skipping(options)) + || column_type == ColumnDataType::String, + InvalidStringIndexColumnTypeSnafu { column_name, column_type, } diff --git a/src/pipeline/src/etl/transform/index.rs b/src/pipeline/src/etl/transform/index.rs index 6af41990a0..9b442ec220 100644 --- a/src/pipeline/src/etl/transform/index.rs +++ b/src/pipeline/src/etl/transform/index.rs @@ -18,6 +18,7 @@ const INDEX_TIMESTAMP: &str = "timestamp"; const INDEX_TIMEINDEX: &str = "time"; const INDEX_TAG: &str = "tag"; const INDEX_FULLTEXT: &str = "fulltext"; +const INDEX_SKIPPING: &str = "skipping"; #[derive(Debug, PartialEq, Eq, Clone, Copy)] #[allow(clippy::enum_variant_names)] @@ -25,6 +26,7 @@ pub enum Index { Time, Tag, Fulltext, + Skipping, } impl std::fmt::Display for Index { @@ -33,6 +35,7 @@ impl std::fmt::Display for Index { Index::Time => INDEX_TIMEINDEX, Index::Tag => INDEX_TAG, Index::Fulltext => INDEX_FULLTEXT, + Index::Skipping => INDEX_SKIPPING, }; write!(f, "{}", index) @@ -55,6 +58,7 @@ impl TryFrom<&str> for Index { INDEX_TIMESTAMP | INDEX_TIMEINDEX => Ok(Index::Time), INDEX_TAG => Ok(Index::Tag), INDEX_FULLTEXT => Ok(Index::Fulltext), + INDEX_SKIPPING => Ok(Index::Skipping), _ => UnsupportedIndexTypeSnafu { value }.fail(), } } diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index a796a816ec..e6e315e175 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -13,9 +13,9 @@ // limitations under the License. use api::v1::column_data_type_extension::TypeExt; -use api::v1::column_def::options_from_fulltext; +use api::v1::column_def::{options_from_fulltext, options_from_skipping}; use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension}; -use datatypes::schema::FulltextOptions; +use datatypes::schema::{FulltextOptions, SkippingIndexOptions}; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; use snafu::ResultExt; @@ -98,19 +98,21 @@ fn coerce_semantic_type(transform: &Transform) -> SemanticType { match transform.index { Some(Index::Tag) => SemanticType::Tag, Some(Index::Time) => SemanticType::Timestamp, - Some(Index::Fulltext) | None => SemanticType::Field, + Some(Index::Fulltext) | Some(Index::Skipping) | None => SemanticType::Field, } } fn coerce_options(transform: &Transform) -> Result> { - if let Some(Index::Fulltext) = transform.index { - options_from_fulltext(&FulltextOptions { + match transform.index { + Some(Index::Fulltext) => options_from_fulltext(&FulltextOptions { enable: true, ..Default::default() }) - .context(ColumnOptionsSnafu) - } else { - Ok(None) + .context(ColumnOptionsSnafu), + Some(Index::Skipping) => { + options_from_skipping(&SkippingIndexOptions::default()).context(ColumnOptionsSnafu) + } + _ => Ok(None), } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index e1bb8b962b..4053c179ac 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1237,10 +1237,14 @@ transform: - id2 type: int32 - fields: - - type - - log - logger type: string + - field: type + type: string + index: skipping + - field: log + type: string + index: fulltext - field: time type: time index: timestamp @@ -1314,11 +1318,22 @@ transform: .await; assert_eq!(res.status(), StatusCode::OK); - let encoded: String = url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect(); + // 3. check schema - // 3. remove pipeline + let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '0', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + validate_data( + "pipeline_schema", + &client, + "show create table logs1", + expected_schema, + ) + .await; + + // 4. remove pipeline + let encoded_ver_str: String = + url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect(); let res = client - .delete(format!("/v1/pipelines/test?version={}", encoded).as_str()) + .delete(format!("/v1/pipelines/test?version={}", encoded_ver_str).as_str()) .send() .await; @@ -1334,7 +1349,7 @@ transform: format!(r#"[{{"name":"test","version":"{}"}}]"#, version_str).as_str() ); - // 4. write data failed + // 5. write data failed let res = client .post("/v1/ingest?db=public&table=logs1&pipeline_name=test") .header("Content-Type", "application/json")