chore: support specifying skipping index in pipeline (#5635)

* chore: support setting skipping index in pipeline

* chore: fix typo key

* chore: add test

* chore: fix typo
This commit is contained in:
shuiyisong
2025-03-04 02:37:13 +08:00
committed by GitHub
parent 4a277c21ef
commit 31f29d8a77
6 changed files with 64 additions and 23 deletions

View File

@@ -15,8 +15,8 @@
use std::collections::HashMap;
use datatypes::schema::{
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexType,
COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextOptions, SkippingIndexOptions,
SkippingIndexType, COMMENT_KEY, FULLTEXT_KEY, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY,
};
use greptime_proto::v1::{Analyzer, SkippingIndexType as PbSkippingIndexType};
use snafu::ResultExt;
@@ -103,6 +103,13 @@ pub fn contains_fulltext(options: &Option<ColumnOptions>) -> bool {
.is_some_and(|o| o.options.contains_key(FULLTEXT_GRPC_KEY))
}
/// Checks if the `ColumnOptions` contains skipping index options.
pub fn contains_skipping(options: &Option<ColumnOptions>) -> bool {
options
.as_ref()
.is_some_and(|o| o.options.contains_key(SKIPPING_INDEX_GRPC_KEY))
}
/// Tries to construct a `ColumnOptions` from the given `FulltextOptions`.
pub fn options_from_fulltext(fulltext: &FulltextOptions) -> Result<Option<ColumnOptions>> {
let mut options = ColumnOptions::default();
@@ -113,6 +120,18 @@ pub fn options_from_fulltext(fulltext: &FulltextOptions) -> Result<Option<Column
Ok((!options.options.is_empty()).then_some(options))
}
/// Tries to construct a `ColumnOptions` from the given `SkippingIndexOptions`.
pub fn options_from_skipping(skipping: &SkippingIndexOptions) -> Result<Option<ColumnOptions>> {
let mut options = ColumnOptions::default();
let v = serde_json::to_string(skipping).context(error::SerializeJsonSnafu)?;
options
.options
.insert(SKIPPING_INDEX_GRPC_KEY.to_string(), v);
Ok((!options.options.is_empty()).then_some(options))
}
/// Tries to construct a `FulltextAnalyzer` from the given analyzer.
pub fn as_fulltext_option(analyzer: Analyzer) -> FulltextAnalyzer {
match analyzer {

View File

@@ -111,9 +111,9 @@ pub enum Error {
},
#[snafu(display(
"Fulltext index only supports string type, column: {column_name}, unexpected type: {column_type:?}"
"Fulltext or Skipping index only supports string type, column: {column_name}, unexpected type: {column_type:?}"
))]
InvalidFulltextColumnType {
InvalidStringIndexColumnType {
column_name: String,
column_type: ColumnDataType,
#[snafu(implicit)]
@@ -173,7 +173,7 @@ impl ErrorExt for Error {
StatusCode::InvalidArguments
}
Error::UnknownColumnDataType { .. } | Error::InvalidFulltextColumnType { .. } => {
Error::UnknownColumnDataType { .. } | Error::InvalidStringIndexColumnType { .. } => {
StatusCode::InvalidArguments
}
Error::InvalidSetTableOptionRequest { .. }

View File

@@ -15,7 +15,7 @@
use std::collections::HashSet;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::column_def::contains_fulltext;
use api::v1::column_def::{contains_fulltext, contains_skipping};
use api::v1::{
AddColumn, AddColumns, Column, ColumnDataType, ColumnDataTypeExtension, ColumnDef,
ColumnOptions, ColumnSchema, CreateTableExpr, JsonTypeExtension, SemanticType,
@@ -27,7 +27,7 @@ use table::table_reference::TableReference;
use crate::error::{
self, DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu,
InvalidFulltextColumnTypeSnafu, MissingTimestampColumnSnafu, Result,
InvalidStringIndexColumnTypeSnafu, MissingTimestampColumnSnafu, Result,
UnknownColumnDataTypeSnafu,
};
pub struct ColumnExpr<'a> {
@@ -152,8 +152,9 @@ pub fn build_create_table_expr(
let column_type = infer_column_datatype(datatype, datatype_extension)?;
ensure!(
!contains_fulltext(options) || column_type == ColumnDataType::String,
InvalidFulltextColumnTypeSnafu {
(!contains_fulltext(options) && !contains_skipping(options))
|| column_type == ColumnDataType::String,
InvalidStringIndexColumnTypeSnafu {
column_name,
column_type,
}

View File

@@ -18,6 +18,7 @@ const INDEX_TIMESTAMP: &str = "timestamp";
const INDEX_TIMEINDEX: &str = "time";
const INDEX_TAG: &str = "tag";
const INDEX_FULLTEXT: &str = "fulltext";
const INDEX_SKIPPING: &str = "skipping";
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[allow(clippy::enum_variant_names)]
@@ -25,6 +26,7 @@ pub enum Index {
Time,
Tag,
Fulltext,
Skipping,
}
impl std::fmt::Display for Index {
@@ -33,6 +35,7 @@ impl std::fmt::Display for Index {
Index::Time => INDEX_TIMEINDEX,
Index::Tag => INDEX_TAG,
Index::Fulltext => INDEX_FULLTEXT,
Index::Skipping => INDEX_SKIPPING,
};
write!(f, "{}", index)
@@ -55,6 +58,7 @@ impl TryFrom<&str> for Index {
INDEX_TIMESTAMP | INDEX_TIMEINDEX => Ok(Index::Time),
INDEX_TAG => Ok(Index::Tag),
INDEX_FULLTEXT => Ok(Index::Fulltext),
INDEX_SKIPPING => Ok(Index::Skipping),
_ => UnsupportedIndexTypeSnafu { value }.fail(),
}
}

View File

@@ -13,9 +13,9 @@
// limitations under the License.
use api::v1::column_data_type_extension::TypeExt;
use api::v1::column_def::options_from_fulltext;
use api::v1::column_def::{options_from_fulltext, options_from_skipping};
use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension};
use datatypes::schema::FulltextOptions;
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use snafu::ResultExt;
@@ -98,19 +98,21 @@ fn coerce_semantic_type(transform: &Transform) -> SemanticType {
match transform.index {
Some(Index::Tag) => SemanticType::Tag,
Some(Index::Time) => SemanticType::Timestamp,
Some(Index::Fulltext) | None => SemanticType::Field,
Some(Index::Fulltext) | Some(Index::Skipping) | None => SemanticType::Field,
}
}
fn coerce_options(transform: &Transform) -> Result<Option<ColumnOptions>> {
if let Some(Index::Fulltext) = transform.index {
options_from_fulltext(&FulltextOptions {
match transform.index {
Some(Index::Fulltext) => options_from_fulltext(&FulltextOptions {
enable: true,
..Default::default()
})
.context(ColumnOptionsSnafu)
} else {
Ok(None)
.context(ColumnOptionsSnafu),
Some(Index::Skipping) => {
options_from_skipping(&SkippingIndexOptions::default()).context(ColumnOptionsSnafu)
}
_ => Ok(None),
}
}

View File

@@ -1237,10 +1237,14 @@ transform:
- id2
type: int32
- fields:
- type
- log
- logger
type: string
- field: type
type: string
index: skipping
- field: log
type: string
index: fulltext
- field: time
type: time
index: timestamp
@@ -1314,11 +1318,22 @@ transform:
.await;
assert_eq!(res.status(), StatusCode::OK);
let encoded: String = url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect();
// 3. check schema
// 3. remove pipeline
let expected_schema = "[[\"logs1\",\"CREATE TABLE IF NOT EXISTS \\\"logs1\\\" (\\n \\\"id1\\\" INT NULL,\\n \\\"id2\\\" INT NULL,\\n \\\"logger\\\" STRING NULL,\\n \\\"type\\\" STRING NULL SKIPPING INDEX WITH(granularity = '0', type = 'BLOOM'),\\n \\\"log\\\" STRING NULL FULLTEXT INDEX WITH(analyzer = 'English', case_sensitive = 'false'),\\n \\\"time\\\" TIMESTAMP(9) NOT NULL,\\n TIME INDEX (\\\"time\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"pipeline_schema",
&client,
"show create table logs1",
expected_schema,
)
.await;
// 4. remove pipeline
let encoded_ver_str: String =
url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect();
let res = client
.delete(format!("/v1/pipelines/test?version={}", encoded).as_str())
.delete(format!("/v1/pipelines/test?version={}", encoded_ver_str).as_str())
.send()
.await;
@@ -1334,7 +1349,7 @@ transform:
format!(r#"[{{"name":"test","version":"{}"}}]"#, version_str).as_str()
);
// 4. write data failed
// 5. write data failed
let res = client
.post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
.header("Content-Type", "application/json")