store_api/
metadata.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Metadata of region and column.
16//!
17//! This mod has its own error type [MetadataError] for validation and codec exceptions.
18
19use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, VectorIndexOptions};
33use datatypes::types::TimestampType;
34use itertools::Itertools;
35use serde::de::Error;
36use serde::{Deserialize, Deserializer, Serialize};
37use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
38
39use crate::codec::PrimaryKeyEncoding;
40use crate::region_request::{
41    AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
/// Result type used throughout this module; the error type is [MetadataError].
pub type Result<T> = std::result::Result<T, MetadataError>;
47
/// Metadata of a column.
///
/// Bundles the schema of a column with its semantic type and its column id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of this column. Is the same as `column_schema` in [SchemaRef].
    pub column_schema: ColumnSchema,
    /// Semantic type of this column (e.g. tag or timestamp).
    pub semantic_type: SemanticType,
    /// Id of this column. Immutable, and unique among the columns of a region.
    pub column_id: ColumnId,
}
58
59impl fmt::Debug for ColumnMetadata {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        write!(
62            f,
63            "[{:?} {:?} {:?}]",
64            self.column_schema, self.semantic_type, self.column_id,
65        )
66    }
67}
68
69impl ColumnMetadata {
70    /// Construct `Self` from protobuf struct [RegionColumnDef]
71    pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72        let column_id = column_def.column_id;
73        let column_def = column_def
74            .column_def
75            .context(InvalidRawRegionRequestSnafu {
76                err: "column_def is absent",
77            })?;
78        let semantic_type = column_def.semantic_type();
79        let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81        Ok(Self {
82            column_schema,
83            semantic_type,
84            column_id,
85        })
86    }
87
88    /// Encodes a vector of `ColumnMetadata` into a JSON byte vector.
89    pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90        serde_json::to_vec(columns)
91    }
92
93    /// Decodes a JSON byte vector into a vector of `ColumnMetadata`.
94    pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95        serde_json::from_slice(bytes)
96    }
97
98    pub fn is_same_datatype(&self, other: &Self) -> bool {
99        self.column_schema.data_type == other.column_schema.data_type
100    }
101}
102
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
///     +RegionId region_id
///     +SchemaRef schema
///     +Vec&lt;ColumnMetadata&gt; column_metadatas
///     +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
///     +ColumnSchema column_schema
///     +SemanticType semantic_type
///     +ColumnId column_id
/// }
/// class SemanticType
/// RegionMetadata o-- Schema
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
    ///
    /// Skipped during serialization.
    #[serde(skip)]
    pub schema: SchemaRef,

    // We don't pub `time_index` and `id_to_index` and always construct them via [SkippedFields]
    // so we can assume they are valid.
    /// Id of the time index column.
    #[serde(skip)]
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// Columns in the region. Has the same order as columns
    /// in [schema](RegionMetadata::schema).
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Maintains an ordered list of primary keys
    pub primary_key: Vec<ColumnId>,

    /// Immutable and unique id of a region.
    pub region_id: RegionId,
    /// Current version of the region schema.
    ///
    /// The version starts from 0. Altering the schema bumps the version.
    pub schema_version: u64,

    /// Primary key encoding mode.
    pub primary_key_encoding: PrimaryKeyEncoding,

    /// Partition expression serialized as a JSON string.
    /// Compatibility behavior:
    /// - None: no partition expr was ever set in the manifest (legacy regions).
    /// - Some(""): an explicit “single-region/no-partition” designation. This is distinct from None and should be preserved as-is.
    pub partition_expr: Option<String>,
}
164
165impl fmt::Debug for RegionMetadata {
166    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167        f.debug_struct("RegionMetadata")
168            .field("column_metadatas", &self.column_metadatas)
169            .field("time_index", &self.time_index)
170            .field("primary_key", &self.primary_key)
171            .field("region_id", &self.region_id)
172            .field("schema_version", &self.schema_version)
173            .field("partition_expr", &self.partition_expr)
174            .finish()
175    }
176}
177
/// Reference-counted ([Arc]) pointer to a [RegionMetadata].
pub type RegionMetadataRef = Arc<RegionMetadata>;
179
180impl<'de> Deserialize<'de> for RegionMetadata {
181    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
182    where
183        D: Deserializer<'de>,
184    {
185        // helper internal struct for deserialization
186        #[derive(Deserialize)]
187        struct RegionMetadataWithoutSchema {
188            column_metadatas: Vec<ColumnMetadata>,
189            primary_key: Vec<ColumnId>,
190            region_id: RegionId,
191            schema_version: u64,
192            #[serde(default)]
193            primary_key_encoding: PrimaryKeyEncoding,
194            #[serde(default)]
195            partition_expr: Option<String>,
196        }
197
198        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
199        let skipped =
200            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
201
202        Ok(Self {
203            schema: skipped.schema,
204            time_index: skipped.time_index,
205            id_to_index: skipped.id_to_index,
206            column_metadatas: without_schema.column_metadatas,
207            primary_key: without_schema.primary_key,
208            region_id: without_schema.region_id,
209            schema_version: without_schema.schema_version,
210            primary_key_encoding: without_schema.primary_key_encoding,
211            partition_expr: without_schema.partition_expr,
212        })
213    }
214}
215
216impl RegionMetadata {
217    /// Decode the metadata from a JSON str.
218    pub fn from_json(s: &str) -> Result<Self> {
219        serde_json::from_str(s).context(SerdeJsonSnafu)
220    }
221
222    /// Encode the metadata to a JSON string.
223    pub fn to_json(&self) -> Result<String> {
224        serde_json::to_string(&self).context(SerdeJsonSnafu)
225    }
226
227    /// Find column by id.
228    pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
229        self.id_to_index
230            .get(&column_id)
231            .map(|index| &self.column_metadatas[*index])
232    }
233
234    /// Find column index by id.
235    pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
236        self.id_to_index.get(&column_id).copied()
237    }
238
239    /// Find column index by name.
240    pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
241        self.column_metadatas
242            .iter()
243            .position(|col| col.column_schema.name == column_name)
244    }
245
246    /// Returns the time index column
247    ///
248    /// # Panics
249    /// Panics if the time index column id is invalid.
250    pub fn time_index_column(&self) -> &ColumnMetadata {
251        let index = self.id_to_index[&self.time_index];
252        &self.column_metadatas[index]
253    }
254
255    /// Returns timestamp type of time index column
256    ///
257    /// # Panics
258    /// Panics if the time index column id is invalid.
259    pub fn time_index_type(&self) -> TimestampType {
260        let index = self.id_to_index[&self.time_index];
261        self.column_metadatas[index]
262            .column_schema
263            .data_type
264            .as_timestamp()
265            .unwrap()
266    }
267
268    /// Returns the position of the time index.
269    pub fn time_index_column_pos(&self) -> usize {
270        self.id_to_index[&self.time_index]
271    }
272
273    /// Returns the arrow field of the time index column.
274    pub fn time_index_field(&self) -> FieldRef {
275        let index = self.id_to_index[&self.time_index];
276        self.schema.arrow_schema().fields[index].clone()
277    }
278
279    /// Finds a column by name.
280    pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
281        self.schema
282            .column_index_by_name(name)
283            .map(|index| &self.column_metadatas[index])
284    }
285
286    /// Returns all primary key columns.
287    pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
288        // safety: RegionMetadata::validate ensures every primary key exists.
289        self.primary_key
290            .iter()
291            .map(|id| self.column_by_id(*id).unwrap())
292    }
293
294    /// Returns all field columns before projection.
295    ///
296    /// **Use with caution**. On read path where might have projection, this method
297    /// can return columns that not present in data batch.
298    pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
299        self.column_metadatas
300            .iter()
301            .filter(|column| column.semantic_type == SemanticType::Field)
302    }
303
304    /// Returns a column's index in primary key if it is a primary key column.
305    ///
306    /// This does a linear search.
307    pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
308        self.primary_key.iter().position(|id| *id == column_id)
309    }
310
311    /// Project the metadata to a new one using specified column ids.
312    ///
313    /// [RegionId] and schema version are preserved.
314    pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
315        // check time index
316        ensure!(
317            projection.contains(&self.time_index),
318            TimeIndexNotFoundSnafu
319        );
320
321        // prepare new indices
322        let indices_to_preserve = projection
323            .iter()
324            .map(|id| {
325                self.column_index_by_id(*id)
326                    .with_context(|| InvalidRegionRequestSnafu {
327                        region_id: self.region_id,
328                        err: format!("column id {} not found", id),
329                    })
330            })
331            .collect::<Result<Vec<_>>>()?;
332
333        // project schema
334        let projected_schema =
335            self.schema
336                .try_project(&indices_to_preserve)
337                .with_context(|_| SchemaProjectSnafu {
338                    origin_schema: self.schema.clone(),
339                    projection: projection.to_vec(),
340                })?;
341
342        // project columns, generate projected primary key and new id_to_index
343        let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
344        let mut projected_primary_key = vec![];
345        let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
346        for index in indices_to_preserve {
347            let col = self.column_metadatas[index].clone();
348            if col.semantic_type == SemanticType::Tag {
349                projected_primary_key.push(col.column_id);
350            }
351            projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
352            projected_column_metadatas.push(col);
353        }
354
355        Ok(RegionMetadata {
356            schema: Arc::new(projected_schema),
357            time_index: self.time_index,
358            id_to_index: projected_id_to_index,
359            column_metadatas: projected_column_metadatas,
360            primary_key: projected_primary_key,
361            region_id: self.region_id,
362            schema_version: self.schema_version,
363            primary_key_encoding: self.primary_key_encoding,
364            partition_expr: self.partition_expr.clone(),
365        })
366    }
367
368    /// Gets the column ids to be indexed by inverted index.
369    pub fn inverted_indexed_column_ids<'a>(
370        &self,
371        ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
372    ) -> HashSet<ColumnId> {
373        let mut inverted_index = self
374            .column_metadatas
375            .iter()
376            .filter(|column| column.column_schema.is_inverted_indexed())
377            .map(|column| column.column_id)
378            .collect::<HashSet<_>>();
379
380        for ignored in ignore_column_ids {
381            inverted_index.remove(ignored);
382        }
383
384        inverted_index
385    }
386
387    /// Gets the column IDs that have vector indexes along with their options.
388    /// Returns a map from column ID to the vector index options.
389    pub fn vector_indexed_column_ids(&self) -> HashMap<ColumnId, VectorIndexOptions> {
390        self.column_metadatas
391            .iter()
392            .filter_map(|column| {
393                column
394                    .column_schema
395                    .vector_index_options()
396                    .ok()
397                    .flatten()
398                    .map(|options| (column.column_id, options))
399            })
400            .collect()
401    }
402
403    /// Checks whether the metadata is valid.
404    fn validate(&self) -> Result<()> {
405        // Id to name.
406        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
407        for col in &self.column_metadatas {
408            // Validate each column.
409            Self::validate_column_metadata(col)?;
410
411            // Check whether column id is duplicated. We already check column name
412            // is unique in `Schema` so we only check column id here.
413            ensure!(
414                !id_names.contains_key(&col.column_id),
415                InvalidMetaSnafu {
416                    reason: format!(
417                        "column {} and {} have the same column id {}",
418                        id_names[&col.column_id], col.column_schema.name, col.column_id,
419                    ),
420                }
421            );
422            id_names.insert(col.column_id, &col.column_schema.name);
423        }
424
425        // Checks there is only one time index.
426        let time_indexes = self
427            .column_metadatas
428            .iter()
429            .filter(|col| col.semantic_type == SemanticType::Timestamp)
430            .collect::<Vec<_>>();
431        ensure!(
432            time_indexes.len() == 1,
433            InvalidMetaSnafu {
434                reason: format!(
435                    "expect only one time index, found {}: {}",
436                    time_indexes.len(),
437                    time_indexes
438                        .iter()
439                        .map(|c| &c.column_schema.name)
440                        .join(", ")
441                ),
442            }
443        );
444
445        // Checks the time index column is not nullable.
446        ensure!(
447            !self.time_index_column().column_schema.is_nullable(),
448            InvalidMetaSnafu {
449                reason: format!(
450                    "time index column {} must be NOT NULL",
451                    self.time_index_column().column_schema.name
452                ),
453            }
454        );
455
456        if !self.primary_key.is_empty() {
457            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
458            // Checks column ids in the primary key is valid.
459            for column_id in &self.primary_key {
460                // Checks whether the column id exists.
461                ensure!(
462                    id_names.contains_key(column_id),
463                    InvalidMetaSnafu {
464                        reason: format!("unknown column id {}", column_id),
465                    }
466                );
467
468                // Safety: Column with specific id must exist.
469                let column = self.column_by_id(*column_id).unwrap();
470                // Checks duplicate.
471                ensure!(
472                    !pk_ids.contains(&column_id),
473                    InvalidMetaSnafu {
474                        reason: format!(
475                            "duplicate column {} in primary key",
476                            column.column_schema.name
477                        ),
478                    }
479                );
480
481                // Checks this is not a time index column.
482                ensure!(
483                    *column_id != self.time_index,
484                    InvalidMetaSnafu {
485                        reason: format!(
486                            "column {} is already a time index column",
487                            column.column_schema.name,
488                        ),
489                    }
490                );
491
492                // Checks semantic type.
493                ensure!(
494                    column.semantic_type == SemanticType::Tag,
495                    InvalidMetaSnafu {
496                        reason: format!(
497                            "semantic type of column {} should be Tag, not {:?}",
498                            column.column_schema.name, column.semantic_type
499                        ),
500                    }
501                );
502
503                pk_ids.insert(column_id);
504            }
505        }
506
507        // Checks tag semantic type.
508        let num_tag = self
509            .column_metadatas
510            .iter()
511            .filter(|col| col.semantic_type == SemanticType::Tag)
512            .count();
513        ensure!(
514            num_tag == self.primary_key.len(),
515            InvalidMetaSnafu {
516                reason: format!(
517                    "number of primary key columns {} not equal to tag columns {}",
518                    self.primary_key.len(),
519                    num_tag
520                ),
521            }
522        );
523
524        Ok(())
525    }
526
527    /// Checks whether it is a valid column.
528    fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
529        if column_metadata.semantic_type == SemanticType::Timestamp {
530            ensure!(
531                column_metadata.column_schema.data_type.is_timestamp(),
532                InvalidMetaSnafu {
533                    reason: format!(
534                        "column `{}` is not timestamp type",
535                        column_metadata.column_schema.name
536                    ),
537                }
538            );
539        }
540
541        ensure!(
542            !is_internal_column(&column_metadata.column_schema.name),
543            InvalidMetaSnafu {
544                reason: format!(
545                    "{} is internal column name that can not be used",
546                    column_metadata.column_schema.name
547                ),
548            }
549        );
550
551        Ok(())
552    }
553}
554
/// Builder to build [RegionMetadata].
///
/// Holds the serialized fields of [RegionMetadata]; the remaining fields
/// (`schema`, `time_index`, `id_to_index`) are derived from the column
/// metadatas when the metadata is built.
pub struct RegionMetadataBuilder {
    /// Id of the region the metadata belongs to.
    region_id: RegionId,
    /// Columns of the region, in order.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ordered column ids of the primary key.
    primary_key: Vec<ColumnId>,
    /// Version of the region schema.
    schema_version: u64,
    /// Primary key encoding mode.
    primary_key_encoding: PrimaryKeyEncoding,
    /// Partition expression in JSON string form, if any.
    partition_expr: Option<String>,
}
564
565impl RegionMetadataBuilder {
566    /// Returns a new builder.
567    pub fn new(id: RegionId) -> Self {
568        Self {
569            region_id: id,
570            column_metadatas: vec![],
571            primary_key: vec![],
572            schema_version: 0,
573            primary_key_encoding: PrimaryKeyEncoding::Dense,
574            partition_expr: None,
575        }
576    }
577
578    /// Creates a builder from existing [RegionMetadata].
579    pub fn from_existing(existing: RegionMetadata) -> Self {
580        Self {
581            column_metadatas: existing.column_metadatas,
582            primary_key: existing.primary_key,
583            region_id: existing.region_id,
584            schema_version: existing.schema_version,
585            primary_key_encoding: existing.primary_key_encoding,
586            partition_expr: existing.partition_expr,
587        }
588    }
589
    /// Sets the primary key encoding mode.
    ///
    /// Replaces the current encoding and returns the builder for chaining.
    pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
        self.primary_key_encoding = encoding;
        self
    }
595
    /// Sets the partition expression in JSON string form.
    ///
    /// The value is stored as given, including `None`, and the builder is
    /// returned for chaining.
    pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
        self.partition_expr = expr_json;
        self
    }
601
    /// Pushes a new column metadata to this region's metadata.
    ///
    /// The column is appended after the existing columns. The primary key is
    /// not touched; returns the builder for chaining.
    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
        self.column_metadatas.push(column_metadata);
        self
    }
607
    /// Sets the primary key of the region.
    ///
    /// Replaces any previously set primary key with `key` and returns the
    /// builder for chaining.
    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
        self.primary_key = key;
        self
    }
613
    /// Increases the schema version by 1.
    ///
    /// Returns the builder for chaining.
    pub fn bump_version(&mut self) -> &mut Self {
        self.schema_version += 1;
        self
    }
619
620    /// Applies the alter `kind` to the builder.
621    ///
622    /// The `kind` should be valid.
623    pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
624        match kind {
625            AlterKind::AddColumns { columns } => self.add_columns(columns)?,
626            AlterKind::DropColumns { names } => self.drop_columns(&names),
627            AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
628            AlterKind::SetIndexes { options } => self.set_indexes(options)?,
629            AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
630            AlterKind::SetRegionOptions { options: _ } => {
631                // nothing to be done with RegionMetadata
632            }
633            AlterKind::UnsetRegionOptions { keys: _ } => {
634                // nothing to be done with RegionMetadata
635            }
636            AlterKind::DropDefaults { names } => {
637                self.drop_defaults(names)?;
638            }
639            AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
640            AlterKind::SyncColumns { column_metadatas } => {
641                self.primary_key = column_metadatas
642                    .iter()
643                    .filter_map(|column_metadata| {
644                        if column_metadata.semantic_type == SemanticType::Tag {
645                            Some(column_metadata.column_id)
646                        } else {
647                            None
648                        }
649                    })
650                    .collect::<Vec<_>>();
651                self.column_metadatas = column_metadatas;
652            }
653        }
654        Ok(self)
655    }
656
657    /// Consumes the builder and build a [RegionMetadata].
658    pub fn build(self) -> Result<RegionMetadata> {
659        self.build_with_options(true)
660    }
661
662    /// Builds metadata without running validation.
663    ///
664    /// Intended for file/external engines that should accept arbitrary schemas
665    /// coming from files.
666    pub fn build_without_validation(self) -> Result<RegionMetadata> {
667        self.build_with_options(false)
668    }
669
670    fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
671        let skipped = SkippedFields::new(&self.column_metadatas)?;
672
673        let meta = RegionMetadata {
674            schema: skipped.schema,
675            time_index: skipped.time_index,
676            id_to_index: skipped.id_to_index,
677            column_metadatas: self.column_metadatas,
678            primary_key: self.primary_key,
679            region_id: self.region_id,
680            schema_version: self.schema_version,
681            primary_key_encoding: self.primary_key_encoding,
682            partition_expr: self.partition_expr,
683        };
684
685        if validate {
686            meta.validate()?;
687        }
688
689        Ok(meta)
690    }
691
692    /// Adds columns to the metadata if not exist.
693    fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
694        let mut names: HashSet<_> = self
695            .column_metadatas
696            .iter()
697            .map(|col| col.column_schema.name.clone())
698            .collect();
699
700        for add_column in columns {
701            if names.contains(&add_column.column_metadata.column_schema.name) {
702                // Column already exists.
703                continue;
704            }
705
706            let column_id = add_column.column_metadata.column_id;
707            let semantic_type = add_column.column_metadata.semantic_type;
708            let column_name = add_column.column_metadata.column_schema.name.clone();
709            match add_column.location {
710                None => {
711                    self.column_metadatas.push(add_column.column_metadata);
712                }
713                Some(AddColumnLocation::First) => {
714                    self.column_metadatas.insert(0, add_column.column_metadata);
715                }
716                Some(AddColumnLocation::After { column_name }) => {
717                    let pos = self
718                        .column_metadatas
719                        .iter()
720                        .position(|col| col.column_schema.name == column_name)
721                        .context(InvalidRegionRequestSnafu {
722                            region_id: self.region_id,
723                            err: format!(
724                                "column {} not found, failed to add column {} after it",
725                                column_name, add_column.column_metadata.column_schema.name
726                            ),
727                        })?;
728                    // Insert after pos.
729                    self.column_metadatas
730                        .insert(pos + 1, add_column.column_metadata);
731                }
732            }
733            names.insert(column_name);
734            if semantic_type == SemanticType::Tag {
735                // For a new tag, we extend the primary key.
736                self.primary_key.push(column_id);
737            }
738        }
739
740        Ok(())
741    }
742
743    /// Drops columns from the metadata if exist.
744    fn drop_columns(&mut self, names: &[String]) {
745        let name_set: HashSet<_> = names.iter().collect();
746        self.column_metadatas
747            .retain(|col| !name_set.contains(&col.column_schema.name));
748    }
749
750    /// Changes columns type to the metadata if exist.
751    fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
752        let mut change_type_map: HashMap<_, _> = columns
753            .into_iter()
754            .map(
755                |ModifyColumnType {
756                     column_name,
757                     target_type,
758                 }| (column_name, target_type),
759            )
760            .collect();
761
762        for column_meta in self.column_metadatas.iter_mut() {
763            if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
764                column_meta.column_schema.data_type = target_type.clone();
765                // also cast default value to target_type if default value exist
766                let new_default =
767                    if let Some(default_value) = column_meta.column_schema.default_constraint() {
768                        Some(
769                            default_value
770                                .cast_to_datatype(&target_type)
771                                .with_context(|_| CastDefaultValueSnafu {
772                                    reason: format!(
773                                        "Failed to cast default value from {:?} to type {:?}",
774                                        default_value, target_type
775                                    ),
776                                })?,
777                        )
778                    } else {
779                        None
780                    };
781                column_meta.column_schema = column_meta
782                    .column_schema
783                    .clone()
784                    .with_default_constraint(new_default.clone())
785                    .with_context(|_| CastDefaultValueSnafu {
786                        reason: format!("Failed to set new default: {:?}", new_default),
787                    })?;
788            }
789        }
790
791        Ok(())
792    }
793
794    fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
795        let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
796        for option in &options {
797            set_index_map
798                .entry(option.column_name())
799                .or_default()
800                .push(option);
801        }
802
803        for column_metadata in self.column_metadatas.iter_mut() {
804            if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
805                for option in options {
806                    Self::set_index(column_metadata, option)?;
807                }
808            }
809        }
810
811        Ok(())
812    }
813
814    fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
815        let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
816        for option in &options {
817            unset_index_map
818                .entry(option.column_name())
819                .or_default()
820                .push(option);
821        }
822
823        for column_metadata in self.column_metadatas.iter_mut() {
824            if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
825                for option in options {
826                    Self::unset_index(column_metadata, option)?;
827                }
828            }
829        }
830
831        Ok(())
832    }
833
834    fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
835        match options {
836            SetIndexOption::Fulltext {
837                column_name,
838                options,
839            } => {
840                ensure!(
841                    column_metadata.column_schema.data_type.is_string(),
842                    InvalidColumnOptionSnafu {
843                        column_name,
844                        msg: "FULLTEXT index only supports string type".to_string(),
845                    }
846                );
847                let current_fulltext_options = column_metadata
848                    .column_schema
849                    .fulltext_options()
850                    .with_context(|_| GetFulltextOptionsSnafu {
851                        column_name: column_name.clone(),
852                    })?;
853                set_column_fulltext_options(
854                    column_metadata,
855                    column_name,
856                    options,
857                    current_fulltext_options,
858                )?;
859            }
860            SetIndexOption::Inverted { .. } => {
861                column_metadata.column_schema.set_inverted_index(true)
862            }
863            SetIndexOption::Skipping {
864                column_name,
865                options,
866            } => {
867                column_metadata
868                    .column_schema
869                    .set_skipping_options(options)
870                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
871            }
872        }
873
874        Ok(())
875    }
876
877    fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
878        match options {
879            UnsetIndexOption::Fulltext { column_name } => {
880                ensure!(
881                    column_metadata.column_schema.data_type.is_string(),
882                    InvalidColumnOptionSnafu {
883                        column_name,
884                        msg: "FULLTEXT index only supports string type".to_string(),
885                    }
886                );
887
888                let current_fulltext_options = column_metadata
889                    .column_schema
890                    .fulltext_options()
891                    .with_context(|_| GetFulltextOptionsSnafu {
892                        column_name: column_name.clone(),
893                    })?;
894
895                unset_column_fulltext_options(
896                    column_metadata,
897                    column_name,
898                    current_fulltext_options,
899                )?;
900            }
901            UnsetIndexOption::Inverted { .. } => {
902                column_metadata.column_schema.set_inverted_index(false)
903            }
904            UnsetIndexOption::Skipping { column_name } => {
905                column_metadata
906                    .column_schema
907                    .unset_skipping_options()
908                    .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
909            }
910        }
911
912        Ok(())
913    }
914
915    fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
916        for name in column_names.iter() {
917            let meta = self
918                .column_metadatas
919                .iter_mut()
920                .find(|col| col.column_schema.name == *name);
921            if let Some(meta) = meta {
922                if !meta.column_schema.is_nullable() {
923                    return InvalidRegionRequestSnafu {
924                        region_id: self.region_id,
925                        err: format!(
926                            "column {name} is not nullable and `default` cannot be dropped",
927                        ),
928                    }
929                    .fail();
930                }
931                meta.column_schema = meta
932                    .column_schema
933                    .clone()
934                    .with_default_constraint(None)
935                    .with_context(|_| CastDefaultValueSnafu {
936                        reason: format!("Failed to drop default : {name:?}"),
937                    })?;
938            } else {
939                return InvalidRegionRequestSnafu {
940                    region_id: self.region_id,
941                    err: format!("column {name} not found",),
942                }
943                .fail();
944            }
945        }
946        Ok(())
947    }
948
949    fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
950        for set_default in set_defaults.iter() {
951            let meta = self
952                .column_metadatas
953                .iter_mut()
954                .find(|col| col.column_schema.name == set_default.name);
955            if let Some(meta) = meta {
956                let default_constraint = common_sql::convert::deserialize_default_constraint(
957                    set_default.default_constraint.as_slice(),
958                    &meta.column_schema.name,
959                    &meta.column_schema.data_type,
960                )
961                .context(SqlCommonSnafu)?;
962
963                meta.column_schema = meta
964                    .column_schema
965                    .clone()
966                    .with_default_constraint(default_constraint)
967                    .with_context(|_| CastDefaultValueSnafu {
968                        reason: format!("Failed to set default : {set_default:?}"),
969                    })?;
970            } else {
971                return InvalidRegionRequestSnafu {
972                    region_id: self.region_id,
973                    err: format!("column {} not found", set_default.name),
974                }
975                .fail();
976            }
977        }
978        Ok(())
979    }
980}
981
/// Fields skipped in serialization.
///
/// All three fields are derived from a slice of column metadatas by
/// `SkippedFields::new`, so they can be rebuilt instead of being stored.
struct SkippedFields {
    /// Last schema, built from the column schemas of all column metadatas
    /// in their original order.
    schema: SchemaRef,
    /// Id of the time index column, i.e. the first column whose semantic
    /// type is `SemanticType::Timestamp`.
    time_index: ColumnId,
    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
    id_to_index: HashMap<ColumnId, usize>,
}
991
992impl SkippedFields {
993    /// Constructs skipped fields from `column_metadatas`.
994    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
995        let column_schemas = column_metadatas
996            .iter()
997            .map(|column_metadata| column_metadata.column_schema.clone())
998            .collect();
999        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
1000        let time_index = column_metadatas
1001            .iter()
1002            .find_map(|col| {
1003                if col.semantic_type == SemanticType::Timestamp {
1004                    Some(col.column_id)
1005                } else {
1006                    None
1007                }
1008            })
1009            .context(InvalidMetaSnafu {
1010                reason: "time index not found",
1011            })?;
1012        let id_to_index = column_metadatas
1013            .iter()
1014            .enumerate()
1015            .map(|(idx, col)| (col.column_id, idx))
1016            .collect();
1017
1018        Ok(SkippedFields {
1019            schema,
1020            time_index,
1021            id_to_index,
1022        })
1023    }
1024}
1025
1026#[derive(Snafu)]
1027#[snafu(visibility(pub))]
1028#[stack_trace_debug]
1029pub enum MetadataError {
1030    #[snafu(display("Invalid schema"))]
1031    InvalidSchema {
1032        source: datatypes::error::Error,
1033        #[snafu(implicit)]
1034        location: Location,
1035    },
1036
1037    #[snafu(display("Invalid metadata, {}", reason))]
1038    InvalidMeta {
1039        reason: String,
1040        #[snafu(implicit)]
1041        location: Location,
1042    },
1043
1044    #[snafu(display("Failed to ser/de json object"))]
1045    SerdeJson {
1046        #[snafu(implicit)]
1047        location: Location,
1048        #[snafu(source)]
1049        error: serde_json::Error,
1050    },
1051
1052    #[snafu(display("Invalid raw region request, err: {}", err))]
1053    InvalidRawRegionRequest {
1054        err: String,
1055        #[snafu(implicit)]
1056        location: Location,
1057    },
1058
1059    #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1060    InvalidRegionRequest {
1061        region_id: RegionId,
1062        err: String,
1063        #[snafu(implicit)]
1064        location: Location,
1065    },
1066
1067    #[snafu(display("Unexpected schema error during project"))]
1068    SchemaProject {
1069        origin_schema: SchemaRef,
1070        projection: Vec<ColumnId>,
1071        #[snafu(implicit)]
1072        location: Location,
1073        source: datatypes::Error,
1074    },
1075
1076    #[snafu(display("Time index column not found"))]
1077    TimeIndexNotFound {
1078        #[snafu(implicit)]
1079        location: Location,
1080    },
1081
1082    #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1083    ChangeColumnNotFound {
1084        column_name: String,
1085        region_id: RegionId,
1086        #[snafu(implicit)]
1087        location: Location,
1088    },
1089
1090    #[snafu(display("Failed to convert column schema"))]
1091    ConvertColumnSchema {
1092        source: api::error::Error,
1093        #[snafu(implicit)]
1094        location: Location,
1095    },
1096
1097    #[snafu(display("Failed to convert TimeRanges"))]
1098    ConvertTimeRanges {
1099        source: api::error::Error,
1100        #[snafu(implicit)]
1101        location: Location,
1102    },
1103
1104    #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1105    InvalidSetRegionOptionRequest {
1106        key: String,
1107        value: String,
1108        #[snafu(implicit)]
1109        location: Location,
1110    },
1111
1112    #[snafu(display("Invalid set region option request, key: {}", key))]
1113    InvalidUnsetRegionOptionRequest {
1114        key: String,
1115        #[snafu(implicit)]
1116        location: Location,
1117    },
1118
1119    #[snafu(display("Failed to decode protobuf"))]
1120    DecodeProto {
1121        #[snafu(source)]
1122        error: prost::UnknownEnumValue,
1123        #[snafu(implicit)]
1124        location: Location,
1125    },
1126
1127    #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1128    InvalidColumnOption {
1129        column_name: String,
1130        msg: String,
1131        #[snafu(implicit)]
1132        location: Location,
1133    },
1134
1135    #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1136    SetFulltextOptions {
1137        column_name: String,
1138        source: datatypes::Error,
1139        #[snafu(implicit)]
1140        location: Location,
1141    },
1142
1143    #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1144    GetFulltextOptions {
1145        column_name: String,
1146        source: datatypes::Error,
1147        #[snafu(implicit)]
1148        location: Location,
1149    },
1150
1151    #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1152    SetSkippingIndexOptions {
1153        column_name: String,
1154        source: datatypes::Error,
1155        #[snafu(implicit)]
1156        location: Location,
1157    },
1158
1159    #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1160    UnsetSkippingIndexOptions {
1161        column_name: String,
1162        source: datatypes::Error,
1163        #[snafu(implicit)]
1164        location: Location,
1165    },
1166
1167    #[snafu(display("Failed to decode arrow ipc record batches"))]
1168    DecodeArrowIpc {
1169        #[snafu(source)]
1170        error: arrow::error::ArrowError,
1171        #[snafu(implicit)]
1172        location: Location,
1173    },
1174
1175    #[snafu(display("Failed to cast default value, reason: {}", reason))]
1176    CastDefaultValue {
1177        reason: String,
1178        source: datatypes::Error,
1179        #[snafu(implicit)]
1180        location: Location,
1181    },
1182
1183    #[snafu(display("Unexpected: {}", reason))]
1184    Unexpected {
1185        reason: String,
1186        #[snafu(implicit)]
1187        location: Location,
1188    },
1189
1190    #[snafu(display("Failed to encode/decode flight message"))]
1191    FlightCodec {
1192        source: common_grpc::Error,
1193        #[snafu(implicit)]
1194        location: Location,
1195    },
1196
1197    #[snafu(display("Invalid index option"))]
1198    InvalidIndexOption {
1199        #[snafu(implicit)]
1200        location: Location,
1201        #[snafu(source)]
1202        error: datatypes::error::Error,
1203    },
1204
1205    #[snafu(display("Sql common error"))]
1206    SqlCommon {
1207        source: common_sql::error::Error,
1208        #[snafu(implicit)]
1209        location: Location,
1210    },
1211}
1212
1213impl ErrorExt for MetadataError {
1214    fn status_code(&self) -> StatusCode {
1215        match self {
1216            Self::SqlCommon { source, .. } => source.status_code(),
1217            _ => StatusCode::InvalidArguments,
1218        }
1219    }
1220
1221    fn as_any(&self) -> &dyn Any {
1222        self
1223    }
1224}
1225
1226/// Set column fulltext options if it passed the validation.
1227///
1228/// Options allowed to modify:
1229/// * backend
1230///
1231/// Options not allowed to modify:
1232/// * analyzer
1233/// * case_sensitive
1234fn set_column_fulltext_options(
1235    column_meta: &mut ColumnMetadata,
1236    column_name: &str,
1237    options: &FulltextOptions,
1238    current_options: Option<FulltextOptions>,
1239) -> Result<()> {
1240    if let Some(current_options) = current_options {
1241        ensure!(
1242            current_options.analyzer == options.analyzer
1243                && current_options.case_sensitive == options.case_sensitive,
1244            InvalidColumnOptionSnafu {
1245                column_name,
1246                msg: format!(
1247                    "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1248                    current_options.analyzer, current_options.case_sensitive
1249                ),
1250            }
1251        );
1252    }
1253
1254    column_meta
1255        .column_schema
1256        .set_fulltext_options(options)
1257        .context(SetFulltextOptionsSnafu { column_name })?;
1258
1259    Ok(())
1260}
1261
1262fn unset_column_fulltext_options(
1263    column_meta: &mut ColumnMetadata,
1264    column_name: &str,
1265    current_options: Option<FulltextOptions>,
1266) -> Result<()> {
1267    if let Some(mut current_options) = current_options
1268        && current_options.enable
1269    {
1270        current_options.enable = false;
1271        column_meta
1272            .column_schema
1273            .set_fulltext_options(&current_options)
1274            .context(SetFulltextOptionsSnafu { column_name })?;
1275    } else {
1276        return InvalidColumnOptionSnafu {
1277            column_name,
1278            msg: "FULLTEXT index already disabled",
1279        }
1280        .fail();
1281    }
1282
1283    Ok(())
1284}
1285
1286#[cfg(test)]
1287mod test {
1288    use datatypes::prelude::ConcreteDataType;
1289    use datatypes::schema::{
1290        ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1291    };
1292    use datatypes::value::Value;
1293
1294    use super::*;
1295
1296    fn create_builder() -> RegionMetadataBuilder {
1297        RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1298    }
1299
1300    fn build_test_region_metadata() -> RegionMetadata {
1301        let mut builder = create_builder();
1302        builder
1303            .push_column_metadata(ColumnMetadata {
1304                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1305                semantic_type: SemanticType::Tag,
1306                column_id: 1,
1307            })
1308            .push_column_metadata(ColumnMetadata {
1309                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1310                semantic_type: SemanticType::Field,
1311                column_id: 2,
1312            })
1313            .push_column_metadata(ColumnMetadata {
1314                column_schema: ColumnSchema::new(
1315                    "c",
1316                    ConcreteDataType::timestamp_millisecond_datatype(),
1317                    false,
1318                ),
1319                semantic_type: SemanticType::Timestamp,
1320                column_id: 3,
1321            })
1322            .primary_key(vec![1])
1323            .partition_expr_json(Some("".to_string()));
1324        builder.build().unwrap()
1325    }
1326
1327    #[test]
1328    fn test_region_metadata() {
1329        let region_metadata = build_test_region_metadata();
1330        assert_eq!("c", region_metadata.time_index_column().column_schema.name);
1331        assert_eq!(
1332            "a",
1333            region_metadata.column_by_id(1).unwrap().column_schema.name
1334        );
1335        assert_eq!(None, region_metadata.column_by_id(10));
1336    }
1337
1338    #[test]
1339    fn test_region_metadata_serde() {
1340        let region_metadata = build_test_region_metadata();
1341        let serialized = serde_json::to_string(&region_metadata).unwrap();
1342        let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
1343        assert_eq!(region_metadata, deserialized);
1344    }
1345
1346    #[test]
1347    fn test_column_metadata_validate() {
1348        let mut builder = create_builder();
1349        let col = ColumnMetadata {
1350            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
1351            semantic_type: SemanticType::Timestamp,
1352            column_id: 1,
1353        };
1354
1355        builder.push_column_metadata(col);
1356        let err = builder.build().unwrap_err();
1357        assert!(
1358            err.to_string()
1359                .contains("column `ts` is not timestamp type"),
1360            "unexpected err: {err}",
1361        );
1362    }
1363
1364    #[test]
1365    fn test_empty_region_metadata() {
1366        let builder = create_builder();
1367        let err = builder.build().unwrap_err();
1368        // A region must have a time index.
1369        assert!(
1370            err.to_string().contains("time index not found"),
1371            "unexpected err: {err}",
1372        );
1373    }
1374
1375    #[test]
1376    fn test_same_column_id() {
1377        let mut builder = create_builder();
1378        builder
1379            .push_column_metadata(ColumnMetadata {
1380                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1381                semantic_type: SemanticType::Tag,
1382                column_id: 1,
1383            })
1384            .push_column_metadata(ColumnMetadata {
1385                column_schema: ColumnSchema::new(
1386                    "b",
1387                    ConcreteDataType::timestamp_millisecond_datatype(),
1388                    false,
1389                ),
1390                semantic_type: SemanticType::Timestamp,
1391                column_id: 1,
1392            });
1393        let err = builder.build().unwrap_err();
1394        assert!(
1395            err.to_string()
1396                .contains("column a and b have the same column id"),
1397            "unexpected err: {err}",
1398        );
1399    }
1400
1401    #[test]
1402    fn test_duplicate_time_index() {
1403        let mut builder = create_builder();
1404        builder
1405            .push_column_metadata(ColumnMetadata {
1406                column_schema: ColumnSchema::new(
1407                    "a",
1408                    ConcreteDataType::timestamp_millisecond_datatype(),
1409                    false,
1410                ),
1411                semantic_type: SemanticType::Timestamp,
1412                column_id: 1,
1413            })
1414            .push_column_metadata(ColumnMetadata {
1415                column_schema: ColumnSchema::new(
1416                    "b",
1417                    ConcreteDataType::timestamp_millisecond_datatype(),
1418                    false,
1419                ),
1420                semantic_type: SemanticType::Timestamp,
1421                column_id: 2,
1422            });
1423        let err = builder.build().unwrap_err();
1424        assert!(
1425            err.to_string().contains("expect only one time index"),
1426            "unexpected err: {err}",
1427        );
1428    }
1429
1430    #[test]
1431    fn test_unknown_primary_key() {
1432        let mut builder = create_builder();
1433        builder
1434            .push_column_metadata(ColumnMetadata {
1435                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1436                semantic_type: SemanticType::Tag,
1437                column_id: 1,
1438            })
1439            .push_column_metadata(ColumnMetadata {
1440                column_schema: ColumnSchema::new(
1441                    "b",
1442                    ConcreteDataType::timestamp_millisecond_datatype(),
1443                    false,
1444                ),
1445                semantic_type: SemanticType::Timestamp,
1446                column_id: 2,
1447            })
1448            .primary_key(vec![3]);
1449        let err = builder.build().unwrap_err();
1450        assert!(
1451            err.to_string().contains("unknown column id 3"),
1452            "unexpected err: {err}",
1453        );
1454    }
1455
1456    #[test]
1457    fn test_same_primary_key() {
1458        let mut builder = create_builder();
1459        builder
1460            .push_column_metadata(ColumnMetadata {
1461                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
1462                semantic_type: SemanticType::Tag,
1463                column_id: 1,
1464            })
1465            .push_column_metadata(ColumnMetadata {
1466                column_schema: ColumnSchema::new(
1467                    "b",
1468                    ConcreteDataType::timestamp_millisecond_datatype(),
1469                    false,
1470                ),
1471                semantic_type: SemanticType::Timestamp,
1472                column_id: 2,
1473            })
1474            .primary_key(vec![1, 1]);
1475        let err = builder.build().unwrap_err();
1476        assert!(
1477            err.to_string()
1478                .contains("duplicate column a in primary key"),
1479            "unexpected err: {err}",
1480        );
1481    }
1482
1483    #[test]
1484    fn test_in_time_index() {
1485        let mut builder = create_builder();
1486        builder
1487            .push_column_metadata(ColumnMetadata {
1488                column_schema: ColumnSchema::new(
1489                    "ts",
1490                    ConcreteDataType::timestamp_millisecond_datatype(),
1491                    false,
1492                ),
1493                semantic_type: SemanticType::Timestamp,
1494                column_id: 1,
1495            })
1496            .primary_key(vec![1]);
1497        let err = builder.build().unwrap_err();
1498        assert!(
1499            err.to_string()
1500                .contains("column ts is already a time index column"),
1501            "unexpected err: {err}",
1502        );
1503    }
1504
1505    #[test]
1506    fn test_nullable_time_index() {
1507        let mut builder = create_builder();
1508        builder.push_column_metadata(ColumnMetadata {
1509            column_schema: ColumnSchema::new(
1510                "ts",
1511                ConcreteDataType::timestamp_millisecond_datatype(),
1512                true,
1513            ),
1514            semantic_type: SemanticType::Timestamp,
1515            column_id: 1,
1516        });
1517        let err = builder.build().unwrap_err();
1518        assert!(
1519            err.to_string()
1520                .contains("time index column ts must be NOT NULL"),
1521            "unexpected err: {err}",
1522        );
1523    }
1524
1525    #[test]
1526    fn test_primary_key_semantic_type() {
1527        let mut builder = create_builder();
1528        builder
1529            .push_column_metadata(ColumnMetadata {
1530                column_schema: ColumnSchema::new(
1531                    "ts",
1532                    ConcreteDataType::timestamp_millisecond_datatype(),
1533                    false,
1534                ),
1535                semantic_type: SemanticType::Timestamp,
1536                column_id: 1,
1537            })
1538            .push_column_metadata(ColumnMetadata {
1539                column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
1540                semantic_type: SemanticType::Field,
1541                column_id: 2,
1542            })
1543            .primary_key(vec![2]);
1544        let err = builder.build().unwrap_err();
1545        assert!(
1546            err.to_string()
1547                .contains("semantic type of column a should be Tag, not Field"),
1548            "unexpected err: {err}",
1549        );
1550    }
1551
1552    #[test]
1553    fn test_primary_key_tag_num() {
1554        let mut builder = create_builder();
1555        builder
1556            .push_column_metadata(ColumnMetadata {
1557                column_schema: ColumnSchema::new(
1558                    "ts",
1559                    ConcreteDataType::timestamp_millisecond_datatype(),
1560                    false,
1561                ),
1562                semantic_type: SemanticType::Timestamp,
1563                column_id: 1,
1564            })
1565            .push_column_metadata(ColumnMetadata {
1566                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), true),
1567                semantic_type: SemanticType::Tag,
1568                column_id: 2,
1569            })
1570            .push_column_metadata(ColumnMetadata {
1571                column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), true),
1572                semantic_type: SemanticType::Tag,
1573                column_id: 3,
1574            })
1575            .primary_key(vec![2]);
1576        let err = builder.build().unwrap_err();
1577        assert!(
1578            err.to_string()
1579                .contains("number of primary key columns 1 not equal to tag columns 2"),
1580            "unexpected err: {err}",
1581        );
1582    }
1583
1584    #[test]
1585    fn test_bump_version() {
1586        let mut region_metadata = build_test_region_metadata();
1587        let mut builder = RegionMetadataBuilder::from_existing(region_metadata.clone());
1588        builder.bump_version();
1589        let new_meta = builder.build().unwrap();
1590        region_metadata.schema_version += 1;
1591        assert_eq!(region_metadata, new_meta);
1592    }
1593
1594    fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1595        let semantic_type = if is_tag {
1596            SemanticType::Tag
1597        } else {
1598            SemanticType::Field
1599        };
1600        ColumnMetadata {
1601            column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1602            semantic_type,
1603            column_id,
1604        }
1605    }
1606
1607    fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1608        let actual: Vec<_> = metadata
1609            .column_metadatas
1610            .iter()
1611            .map(|col| &col.column_schema.name)
1612            .collect();
1613        assert_eq!(names, actual);
1614    }
1615
1616    fn get_columns_default_constraint(
1617        metadata: &RegionMetadata,
1618        name: String,
1619    ) -> Option<Option<&ColumnDefaultConstraint>> {
1620        metadata.column_metadatas.iter().find_map(|col| {
1621            if col.column_schema.name == name {
1622                Some(col.column_schema.default_constraint())
1623            } else {
1624                None
1625            }
1626        })
1627    }
1628
1629    #[test]
1630    fn test_alter() {
1631        // a (tag), b (field), c (ts)
1632        let metadata = build_test_region_metadata();
1633        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1634        // tag d
1635        builder
1636            .alter(AlterKind::AddColumns {
1637                columns: vec![AddColumn {
1638                    column_metadata: new_column_metadata("d", true, 4),
1639                    location: None,
1640                }],
1641            })
1642            .unwrap();
1643        let metadata = builder.build().unwrap();
1644        check_columns(&metadata, &["a", "b", "c", "d"]);
1645        assert_eq!([1, 4], &metadata.primary_key[..]);
1646
1647        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1648        builder
1649            .alter(AlterKind::AddColumns {
1650                columns: vec![AddColumn {
1651                    column_metadata: new_column_metadata("e", false, 5),
1652                    location: Some(AddColumnLocation::First),
1653                }],
1654            })
1655            .unwrap();
1656        let metadata = builder.build().unwrap();
1657        check_columns(&metadata, &["e", "a", "b", "c", "d"]);
1658
1659        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1660        builder
1661            .alter(AlterKind::AddColumns {
1662                columns: vec![AddColumn {
1663                    column_metadata: new_column_metadata("f", false, 6),
1664                    location: Some(AddColumnLocation::After {
1665                        column_name: "b".to_string(),
1666                    }),
1667                }],
1668            })
1669            .unwrap();
1670        let metadata = builder.build().unwrap();
1671        check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);
1672
1673        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1674        builder
1675            .alter(AlterKind::AddColumns {
1676                columns: vec![AddColumn {
1677                    column_metadata: new_column_metadata("g", false, 7),
1678                    location: Some(AddColumnLocation::After {
1679                        column_name: "d".to_string(),
1680                    }),
1681                }],
1682            })
1683            .unwrap();
1684        let metadata = builder.build().unwrap();
1685        check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);
1686
1687        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1688        builder
1689            .alter(AlterKind::DropColumns {
1690                names: vec!["g".to_string(), "e".to_string()],
1691            })
1692            .unwrap();
1693        let metadata = builder.build().unwrap();
1694        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1695
1696        let mut builder = RegionMetadataBuilder::from_existing(metadata.clone());
1697        builder
1698            .alter(AlterKind::DropColumns {
1699                names: vec!["a".to_string()],
1700            })
1701            .unwrap();
1702        // Build returns error as the primary key contains a.
1703        let err = builder.build().unwrap_err();
1704        assert_eq!(StatusCode::InvalidArguments, err.status_code());
1705
1706        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1707        let mut column_metadata = new_column_metadata("g", false, 8);
1708        let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
1709        column_metadata.column_schema = column_metadata
1710            .column_schema
1711            .with_default_constraint(default_constraint.clone())
1712            .unwrap();
1713        builder
1714            .alter(AlterKind::AddColumns {
1715                columns: vec![AddColumn {
1716                    column_metadata,
1717                    location: None,
1718                }],
1719            })
1720            .unwrap();
1721        let metadata = builder.build().unwrap();
1722        assert_eq!(
1723            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1724            default_constraint.as_ref()
1725        );
1726        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1727
1728        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1729        builder
1730            .alter(AlterKind::DropDefaults {
1731                names: vec!["g".to_string()],
1732            })
1733            .unwrap();
1734        let metadata = builder.build().unwrap();
1735        assert_eq!(
1736            get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
1737            None
1738        );
1739        check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);
1740
1741        let mut builder: RegionMetadataBuilder = RegionMetadataBuilder::from_existing(metadata);
1742        builder
1743            .alter(AlterKind::DropColumns {
1744                names: vec!["g".to_string()],
1745            })
1746            .unwrap();
1747        let metadata = builder.build().unwrap();
1748        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1749
1750        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1751        builder
1752            .alter(AlterKind::ModifyColumnTypes {
1753                columns: vec![ModifyColumnType {
1754                    column_name: "b".to_string(),
1755                    target_type: ConcreteDataType::string_datatype(),
1756                }],
1757            })
1758            .unwrap();
1759        let metadata = builder.build().unwrap();
1760        check_columns(&metadata, &["a", "b", "f", "c", "d"]);
1761        let b_type = &metadata
1762            .column_by_name("b")
1763            .unwrap()
1764            .column_schema
1765            .data_type;
1766        assert_eq!(ConcreteDataType::string_datatype(), *b_type);
1767
1768        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1769        builder
1770            .alter(AlterKind::SetIndexes {
1771                options: vec![SetIndexOption::Fulltext {
1772                    column_name: "b".to_string(),
1773                    options: FulltextOptions::new_unchecked(
1774                        true,
1775                        FulltextAnalyzer::Chinese,
1776                        true,
1777                        FulltextBackend::Bloom,
1778                        1000,
1779                        0.01,
1780                    ),
1781                }],
1782            })
1783            .unwrap();
1784        let metadata = builder.build().unwrap();
1785        let a_fulltext_options = metadata
1786            .column_by_name("b")
1787            .unwrap()
1788            .column_schema
1789            .fulltext_options()
1790            .unwrap()
1791            .unwrap();
1792        assert!(a_fulltext_options.enable);
1793        assert_eq!(
1794            datatypes::schema::FulltextAnalyzer::Chinese,
1795            a_fulltext_options.analyzer
1796        );
1797        assert!(a_fulltext_options.case_sensitive);
1798
1799        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1800        builder
1801            .alter(AlterKind::UnsetIndexes {
1802                options: vec![UnsetIndexOption::Fulltext {
1803                    column_name: "b".to_string(),
1804                }],
1805            })
1806            .unwrap();
1807        let metadata = builder.build().unwrap();
1808        let a_fulltext_options = metadata
1809            .column_by_name("b")
1810            .unwrap()
1811            .column_schema
1812            .fulltext_options()
1813            .unwrap()
1814            .unwrap();
1815        assert!(!a_fulltext_options.enable);
1816        assert_eq!(
1817            datatypes::schema::FulltextAnalyzer::Chinese,
1818            a_fulltext_options.analyzer
1819        );
1820        assert!(a_fulltext_options.case_sensitive);
1821    }
1822
1823    #[test]
1824    fn test_add_if_not_exists() {
1825        // a (tag), b (field), c (ts)
1826        let metadata = build_test_region_metadata();
1827        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1828        // tag d
1829        builder
1830            .alter(AlterKind::AddColumns {
1831                columns: vec![
1832                    AddColumn {
1833                        column_metadata: new_column_metadata("d", true, 4),
1834                        location: None,
1835                    },
1836                    AddColumn {
1837                        column_metadata: new_column_metadata("d", true, 4),
1838                        location: None,
1839                    },
1840                ],
1841            })
1842            .unwrap();
1843        let metadata = builder.build().unwrap();
1844        check_columns(&metadata, &["a", "b", "c", "d"]);
1845        assert_eq!([1, 4], &metadata.primary_key[..]);
1846
1847        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1848        // field b.
1849        builder
1850            .alter(AlterKind::AddColumns {
1851                columns: vec![AddColumn {
1852                    column_metadata: new_column_metadata("b", false, 2),
1853                    location: None,
1854                }],
1855            })
1856            .unwrap();
1857        let metadata = builder.build().unwrap();
1858        check_columns(&metadata, &["a", "b", "c", "d"]);
1859    }
1860
1861    #[test]
1862    fn test_add_column_with_inverted_index() {
1863        // only set inverted index to true explicitly will this column be inverted indexed
1864
1865        // a (tag), b (field), c (ts)
1866        let metadata = build_test_region_metadata();
1867        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1868        // tag d, e
1869        let mut col = new_column_metadata("d", true, 4);
1870        col.column_schema.set_inverted_index(true);
1871        builder
1872            .alter(AlterKind::AddColumns {
1873                columns: vec![
1874                    AddColumn {
1875                        column_metadata: col,
1876                        location: None,
1877                    },
1878                    AddColumn {
1879                        column_metadata: new_column_metadata("e", true, 5),
1880                        location: None,
1881                    },
1882                ],
1883            })
1884            .unwrap();
1885        let metadata = builder.build().unwrap();
1886        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1887        assert_eq!([1, 4, 5], &metadata.primary_key[..]);
1888        let column_metadata = metadata.column_by_name("a").unwrap();
1889        assert!(!column_metadata.column_schema.is_inverted_indexed());
1890        let column_metadata = metadata.column_by_name("b").unwrap();
1891        assert!(!column_metadata.column_schema.is_inverted_indexed());
1892        let column_metadata = metadata.column_by_name("c").unwrap();
1893        assert!(!column_metadata.column_schema.is_inverted_indexed());
1894        let column_metadata = metadata.column_by_name("d").unwrap();
1895        assert!(column_metadata.column_schema.is_inverted_indexed());
1896        let column_metadata = metadata.column_by_name("e").unwrap();
1897        assert!(!column_metadata.column_schema.is_inverted_indexed());
1898    }
1899
1900    #[test]
1901    fn test_drop_if_exists() {
1902        // a (tag), b (field), c (ts)
1903        let metadata = build_test_region_metadata();
1904        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1905        // field d, e
1906        builder
1907            .alter(AlterKind::AddColumns {
1908                columns: vec![
1909                    AddColumn {
1910                        column_metadata: new_column_metadata("d", false, 4),
1911                        location: None,
1912                    },
1913                    AddColumn {
1914                        column_metadata: new_column_metadata("e", false, 5),
1915                        location: None,
1916                    },
1917                ],
1918            })
1919            .unwrap();
1920        let metadata = builder.build().unwrap();
1921        check_columns(&metadata, &["a", "b", "c", "d", "e"]);
1922
1923        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1924        builder
1925            .alter(AlterKind::DropColumns {
1926                names: vec!["b".to_string(), "b".to_string()],
1927            })
1928            .unwrap();
1929        let metadata = builder.build().unwrap();
1930        check_columns(&metadata, &["a", "c", "d", "e"]);
1931
1932        let mut builder = RegionMetadataBuilder::from_existing(metadata);
1933        builder
1934            .alter(AlterKind::DropColumns {
1935                names: vec!["b".to_string(), "e".to_string()],
1936            })
1937            .unwrap();
1938        let metadata = builder.build().unwrap();
1939        check_columns(&metadata, &["a", "c", "d"]);
1940    }
1941
1942    #[test]
1943    fn test_invalid_column_name() {
1944        let mut builder = create_builder();
1945        builder.push_column_metadata(ColumnMetadata {
1946            column_schema: ColumnSchema::new(
1947                "__sequence",
1948                ConcreteDataType::timestamp_millisecond_datatype(),
1949                false,
1950            ),
1951            semantic_type: SemanticType::Timestamp,
1952            column_id: 1,
1953        });
1954        let err = builder.build().unwrap_err();
1955        assert!(
1956            err.to_string()
1957                .contains("internal column name that can not be used"),
1958            "unexpected err: {err}",
1959        );
1960    }
1961
1962    #[test]
1963    fn test_allow_internal_column_name() {
1964        let mut builder = create_builder();
1965        builder
1966            .push_column_metadata(ColumnMetadata {
1967                column_schema: ColumnSchema::new(
1968                    "__primary_key",
1969                    ConcreteDataType::string_datatype(),
1970                    false,
1971                ),
1972                semantic_type: SemanticType::Tag,
1973                column_id: 1,
1974            })
1975            .push_column_metadata(ColumnMetadata {
1976                column_schema: ColumnSchema::new(
1977                    "ts",
1978                    ConcreteDataType::timestamp_millisecond_datatype(),
1979                    false,
1980                ),
1981                semantic_type: SemanticType::Timestamp,
1982                column_id: 2,
1983            })
1984            .primary_key(vec![1]);
1985
1986        let metadata = builder.build_without_validation().unwrap();
1987        assert_eq!(
1988            "__primary_key",
1989            metadata.column_metadatas[0].column_schema.name
1990        );
1991    }
1992
1993    #[test]
1994    fn test_build_without_validation() {
1995        // Primary key points to a Field column, which would normally fail validation.
1996        let mut builder = create_builder();
1997        builder
1998            .push_column_metadata(ColumnMetadata {
1999                column_schema: ColumnSchema::new(
2000                    "ts",
2001                    ConcreteDataType::timestamp_millisecond_datatype(),
2002                    false,
2003                ),
2004                semantic_type: SemanticType::Timestamp,
2005                column_id: 1,
2006            })
2007            .push_column_metadata(ColumnMetadata {
2008                column_schema: ColumnSchema::new(
2009                    "field",
2010                    ConcreteDataType::string_datatype(),
2011                    true,
2012                ),
2013                semantic_type: SemanticType::Field,
2014                column_id: 2,
2015            })
2016            .primary_key(vec![2]);
2017
2018        // Unvalidated build should succeed.
2019        let metadata = builder.build_without_validation().unwrap();
2020        assert_eq!(vec![2], metadata.primary_key);
2021
2022        // Validated build still rejects it.
2023        let mut builder = create_builder();
2024        builder
2025            .push_column_metadata(ColumnMetadata {
2026                column_schema: ColumnSchema::new(
2027                    "ts",
2028                    ConcreteDataType::timestamp_millisecond_datatype(),
2029                    false,
2030                ),
2031                semantic_type: SemanticType::Timestamp,
2032                column_id: 1,
2033            })
2034            .push_column_metadata(ColumnMetadata {
2035                column_schema: ColumnSchema::new(
2036                    "field",
2037                    ConcreteDataType::string_datatype(),
2038                    true,
2039                ),
2040                semantic_type: SemanticType::Field,
2041                column_id: 2,
2042            })
2043            .primary_key(vec![2]);
2044        let err = builder.build().unwrap_err();
2045        assert!(
2046            err.to_string()
2047                .contains("semantic type of column field should be Tag"),
2048            "unexpected err: {err}"
2049        );
2050    }
2051
2052    #[test]
2053    fn test_debug_for_column_metadata() {
2054        let region_metadata = build_test_region_metadata();
2055        let formatted = format!("{:?}", region_metadata);
2056        assert_eq!(
2057            formatted,
2058            "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }"
2059        );
2060    }
2061
2062    #[test]
2063    fn test_region_metadata_deserialize_default_primary_key_encoding() {
2064        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
2065        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2066        assert_eq!(deserialized.primary_key_encoding, PrimaryKeyEncoding::Dense);
2067
2068        let serialize = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;
2069        let deserialized: RegionMetadata = serde_json::from_str(serialize).unwrap();
2070        assert_eq!(
2071            deserialized.primary_key_encoding,
2072            PrimaryKeyEncoding::Sparse
2073        );
2074    }
2075}