1use std::any::Any;
20use std::collections::{HashMap, HashSet};
21use std::fmt;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use api::v1::column_def::try_as_column_schema;
26use api::v1::region::RegionColumnDef;
27use common_error::ext::ErrorExt;
28use common_error::status_code::StatusCode;
29use common_macro::stack_trace_debug;
30use datatypes::arrow;
31use datatypes::arrow::datatypes::FieldRef;
32use datatypes::schema::{ColumnSchema, FulltextOptions, Schema, SchemaRef, VectorIndexOptions};
33use datatypes::types::TimestampType;
34use itertools::Itertools;
35use serde::de::Error;
36use serde::{Deserialize, Deserializer, Serialize};
37use snafu::{Location, OptionExt, ResultExt, Snafu, ensure};
38
39use crate::codec::PrimaryKeyEncoding;
40use crate::region_request::{
41 AddColumn, AddColumnLocation, AlterKind, ModifyColumnType, SetIndexOption, UnsetIndexOption,
42};
43use crate::storage::consts::is_internal_column;
44use crate::storage::{ColumnId, RegionId};
45
/// Result type used by this module; the error type is always [`MetadataError`].
pub type Result<T> = std::result::Result<T, MetadataError>;
47
/// Metadata of one column: its schema, its semantic type and its id.
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
    /// Schema of the column (name, data type, default constraint, index options, ...).
    pub column_schema: ColumnSchema,
    /// Semantic role of the column. The code in this file distinguishes
    /// `Tag`, `Field` and `Timestamp`.
    pub semantic_type: SemanticType,
    /// Id of the column. `RegionMetadata::validate` rejects metadata in which
    /// two columns share the same id.
    pub column_id: ColumnId,
}
58
59impl fmt::Debug for ColumnMetadata {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 write!(
62 f,
63 "[{:?} {:?} {:?}]",
64 self.column_schema, self.semantic_type, self.column_id,
65 )
66 }
67}
68
69impl ColumnMetadata {
70 pub fn try_from_column_def(column_def: RegionColumnDef) -> Result<Self> {
72 let column_id = column_def.column_id;
73 let column_def = column_def
74 .column_def
75 .context(InvalidRawRegionRequestSnafu {
76 err: "column_def is absent",
77 })?;
78 let semantic_type = column_def.semantic_type();
79 let column_schema = try_as_column_schema(&column_def).context(ConvertColumnSchemaSnafu)?;
80
81 Ok(Self {
82 column_schema,
83 semantic_type,
84 column_id,
85 })
86 }
87
88 pub fn encode_list(columns: &[Self]) -> serde_json::Result<Vec<u8>> {
90 serde_json::to_vec(columns)
91 }
92
93 pub fn decode_list(bytes: &[u8]) -> serde_json::Result<Vec<Self>> {
95 serde_json::from_slice(bytes)
96 }
97
98 pub fn is_same_datatype(&self, other: &Self) -> bool {
99 self.column_schema.data_type == other.column_schema.data_type
100 }
101}
102
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Static metadata of a region: its columns, primary key and a few
/// region-level attributes.
///
/// Only the public fields are serialized. The three `#[serde(skip)]` fields
/// are derived from `column_metadatas` (see `SkippedFields`) and are rebuilt
/// whenever the metadata is built or deserialized.
#[derive(Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
    /// Schema built from the column schemas of `column_metadatas`, in the
    /// same order. Not serialized.
    #[serde(skip)]
    pub schema: SchemaRef,

    /// Id of the time index column, i.e. the first column whose semantic type
    /// is `Timestamp`. Not serialized.
    #[serde(skip)]
    time_index: ColumnId,
    /// Maps a column id to the position of that column in
    /// `column_metadatas`. Not serialized.
    #[serde(skip)]
    id_to_index: HashMap<ColumnId, usize>,

    /// All columns of the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns. Validation requires every entry to be
    /// a distinct `Tag` column and the number of entries to equal the number
    /// of `Tag` columns.
    pub primary_key: Vec<ColumnId>,

    /// Id of the region this metadata belongs to.
    pub region_id: RegionId,
    /// Version of the schema; incremented by `RegionMetadataBuilder::bump_version`.
    pub schema_version: u64,

    /// Encoding of the primary key. Falls back to the type's default when the
    /// field is missing from the serialized form.
    pub primary_key_encoding: PrimaryKeyEncoding,

    /// Optional partition expression of the region. Presumably a JSON string
    /// (the builder setter is named `partition_expr_json`) — confirm with callers.
    pub partition_expr: Option<String>,
}
164
165impl fmt::Debug for RegionMetadata {
166 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167 f.debug_struct("RegionMetadata")
168 .field("column_metadatas", &self.column_metadatas)
169 .field("time_index", &self.time_index)
170 .field("primary_key", &self.primary_key)
171 .field("region_id", &self.region_id)
172 .field("schema_version", &self.schema_version)
173 .field("partition_expr", &self.partition_expr)
174 .finish()
175 }
176}
177
/// Shared, reference-counted pointer to a [`RegionMetadata`].
pub type RegionMetadataRef = Arc<RegionMetadata>;
179
180impl<'de> Deserialize<'de> for RegionMetadata {
181 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
182 where
183 D: Deserializer<'de>,
184 {
185 #[derive(Deserialize)]
187 struct RegionMetadataWithoutSchema {
188 column_metadatas: Vec<ColumnMetadata>,
189 primary_key: Vec<ColumnId>,
190 region_id: RegionId,
191 schema_version: u64,
192 #[serde(default)]
193 primary_key_encoding: PrimaryKeyEncoding,
194 #[serde(default)]
195 partition_expr: Option<String>,
196 }
197
198 let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
199 let skipped =
200 SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
201
202 Ok(Self {
203 schema: skipped.schema,
204 time_index: skipped.time_index,
205 id_to_index: skipped.id_to_index,
206 column_metadatas: without_schema.column_metadatas,
207 primary_key: without_schema.primary_key,
208 region_id: without_schema.region_id,
209 schema_version: without_schema.schema_version,
210 primary_key_encoding: without_schema.primary_key_encoding,
211 partition_expr: without_schema.partition_expr,
212 })
213 }
214}
215
216impl RegionMetadata {
217 pub fn from_json(s: &str) -> Result<Self> {
219 serde_json::from_str(s).context(SerdeJsonSnafu)
220 }
221
222 pub fn to_json(&self) -> Result<String> {
224 serde_json::to_string(&self).context(SerdeJsonSnafu)
225 }
226
227 pub fn column_by_id(&self, column_id: ColumnId) -> Option<&ColumnMetadata> {
229 self.id_to_index
230 .get(&column_id)
231 .map(|index| &self.column_metadatas[*index])
232 }
233
234 pub fn column_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
236 self.id_to_index.get(&column_id).copied()
237 }
238
239 pub fn column_index_by_name(&self, column_name: &str) -> Option<usize> {
241 self.column_metadatas
242 .iter()
243 .position(|col| col.column_schema.name == column_name)
244 }
245
246 pub fn time_index_column(&self) -> &ColumnMetadata {
251 let index = self.id_to_index[&self.time_index];
252 &self.column_metadatas[index]
253 }
254
255 pub fn time_index_type(&self) -> TimestampType {
260 let index = self.id_to_index[&self.time_index];
261 self.column_metadatas[index]
262 .column_schema
263 .data_type
264 .as_timestamp()
265 .unwrap()
266 }
267
268 pub fn time_index_column_pos(&self) -> usize {
270 self.id_to_index[&self.time_index]
271 }
272
273 pub fn time_index_field(&self) -> FieldRef {
275 let index = self.id_to_index[&self.time_index];
276 self.schema.arrow_schema().fields[index].clone()
277 }
278
279 pub fn column_by_name(&self, name: &str) -> Option<&ColumnMetadata> {
281 self.schema
282 .column_index_by_name(name)
283 .map(|index| &self.column_metadatas[index])
284 }
285
286 pub fn primary_key_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
288 self.primary_key
290 .iter()
291 .map(|id| self.column_by_id(*id).unwrap())
292 }
293
294 pub fn field_columns(&self) -> impl Iterator<Item = &ColumnMetadata> {
299 self.column_metadatas
300 .iter()
301 .filter(|column| column.semantic_type == SemanticType::Field)
302 }
303
304 pub fn primary_key_index(&self, column_id: ColumnId) -> Option<usize> {
308 self.primary_key.iter().position(|id| *id == column_id)
309 }
310
311 pub fn project(&self, projection: &[ColumnId]) -> Result<RegionMetadata> {
315 ensure!(
317 projection.contains(&self.time_index),
318 TimeIndexNotFoundSnafu
319 );
320
321 let indices_to_preserve = projection
323 .iter()
324 .map(|id| {
325 self.column_index_by_id(*id)
326 .with_context(|| InvalidRegionRequestSnafu {
327 region_id: self.region_id,
328 err: format!("column id {} not found", id),
329 })
330 })
331 .collect::<Result<Vec<_>>>()?;
332
333 let projected_schema =
335 self.schema
336 .try_project(&indices_to_preserve)
337 .with_context(|_| SchemaProjectSnafu {
338 origin_schema: self.schema.clone(),
339 projection: projection.to_vec(),
340 })?;
341
342 let mut projected_column_metadatas = Vec::with_capacity(indices_to_preserve.len());
344 let mut projected_primary_key = vec![];
345 let mut projected_id_to_index = HashMap::with_capacity(indices_to_preserve.len());
346 for index in indices_to_preserve {
347 let col = self.column_metadatas[index].clone();
348 if col.semantic_type == SemanticType::Tag {
349 projected_primary_key.push(col.column_id);
350 }
351 projected_id_to_index.insert(col.column_id, projected_column_metadatas.len());
352 projected_column_metadatas.push(col);
353 }
354
355 Ok(RegionMetadata {
356 schema: Arc::new(projected_schema),
357 time_index: self.time_index,
358 id_to_index: projected_id_to_index,
359 column_metadatas: projected_column_metadatas,
360 primary_key: projected_primary_key,
361 region_id: self.region_id,
362 schema_version: self.schema_version,
363 primary_key_encoding: self.primary_key_encoding,
364 partition_expr: self.partition_expr.clone(),
365 })
366 }
367
368 pub fn inverted_indexed_column_ids<'a>(
370 &self,
371 ignore_column_ids: impl Iterator<Item = &'a ColumnId>,
372 ) -> HashSet<ColumnId> {
373 let mut inverted_index = self
374 .column_metadatas
375 .iter()
376 .filter(|column| column.column_schema.is_inverted_indexed())
377 .map(|column| column.column_id)
378 .collect::<HashSet<_>>();
379
380 for ignored in ignore_column_ids {
381 inverted_index.remove(ignored);
382 }
383
384 inverted_index
385 }
386
387 pub fn vector_indexed_column_ids(&self) -> HashMap<ColumnId, VectorIndexOptions> {
390 self.column_metadatas
391 .iter()
392 .filter_map(|column| {
393 column
394 .column_schema
395 .vector_index_options()
396 .ok()
397 .flatten()
398 .map(|options| (column.column_id, options))
399 })
400 .collect()
401 }
402
403 fn validate(&self) -> Result<()> {
405 let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
407 for col in &self.column_metadatas {
408 Self::validate_column_metadata(col)?;
410
411 ensure!(
414 !id_names.contains_key(&col.column_id),
415 InvalidMetaSnafu {
416 reason: format!(
417 "column {} and {} have the same column id {}",
418 id_names[&col.column_id], col.column_schema.name, col.column_id,
419 ),
420 }
421 );
422 id_names.insert(col.column_id, &col.column_schema.name);
423 }
424
425 let time_indexes = self
427 .column_metadatas
428 .iter()
429 .filter(|col| col.semantic_type == SemanticType::Timestamp)
430 .collect::<Vec<_>>();
431 ensure!(
432 time_indexes.len() == 1,
433 InvalidMetaSnafu {
434 reason: format!(
435 "expect only one time index, found {}: {}",
436 time_indexes.len(),
437 time_indexes
438 .iter()
439 .map(|c| &c.column_schema.name)
440 .join(", ")
441 ),
442 }
443 );
444
445 ensure!(
447 !self.time_index_column().column_schema.is_nullable(),
448 InvalidMetaSnafu {
449 reason: format!(
450 "time index column {} must be NOT NULL",
451 self.time_index_column().column_schema.name
452 ),
453 }
454 );
455
456 if !self.primary_key.is_empty() {
457 let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
458 for column_id in &self.primary_key {
460 ensure!(
462 id_names.contains_key(column_id),
463 InvalidMetaSnafu {
464 reason: format!("unknown column id {}", column_id),
465 }
466 );
467
468 let column = self.column_by_id(*column_id).unwrap();
470 ensure!(
472 !pk_ids.contains(&column_id),
473 InvalidMetaSnafu {
474 reason: format!(
475 "duplicate column {} in primary key",
476 column.column_schema.name
477 ),
478 }
479 );
480
481 ensure!(
483 *column_id != self.time_index,
484 InvalidMetaSnafu {
485 reason: format!(
486 "column {} is already a time index column",
487 column.column_schema.name,
488 ),
489 }
490 );
491
492 ensure!(
494 column.semantic_type == SemanticType::Tag,
495 InvalidMetaSnafu {
496 reason: format!(
497 "semantic type of column {} should be Tag, not {:?}",
498 column.column_schema.name, column.semantic_type
499 ),
500 }
501 );
502
503 pk_ids.insert(column_id);
504 }
505 }
506
507 let num_tag = self
509 .column_metadatas
510 .iter()
511 .filter(|col| col.semantic_type == SemanticType::Tag)
512 .count();
513 ensure!(
514 num_tag == self.primary_key.len(),
515 InvalidMetaSnafu {
516 reason: format!(
517 "number of primary key columns {} not equal to tag columns {}",
518 self.primary_key.len(),
519 num_tag
520 ),
521 }
522 );
523
524 Ok(())
525 }
526
527 fn validate_column_metadata(column_metadata: &ColumnMetadata) -> Result<()> {
529 if column_metadata.semantic_type == SemanticType::Timestamp {
530 ensure!(
531 column_metadata.column_schema.data_type.is_timestamp(),
532 InvalidMetaSnafu {
533 reason: format!(
534 "column `{}` is not timestamp type",
535 column_metadata.column_schema.name
536 ),
537 }
538 );
539 }
540
541 ensure!(
542 !is_internal_column(&column_metadata.column_schema.name),
543 InvalidMetaSnafu {
544 reason: format!(
545 "{} is internal column name that can not be used",
546 column_metadata.column_schema.name
547 ),
548 }
549 );
550
551 Ok(())
552 }
553}
554
/// Builder that assembles a [`RegionMetadata`], optionally validating it.
pub struct RegionMetadataBuilder {
    /// Id of the region the metadata is built for.
    region_id: RegionId,
    /// Columns collected so far, in schema order.
    column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns.
    primary_key: Vec<ColumnId>,
    /// Schema version of the metadata to build.
    schema_version: u64,
    /// Encoding of the primary key.
    primary_key_encoding: PrimaryKeyEncoding,
    /// Optional partition expression of the region.
    partition_expr: Option<String>,
}
564
565impl RegionMetadataBuilder {
566 pub fn new(id: RegionId) -> Self {
568 Self {
569 region_id: id,
570 column_metadatas: vec![],
571 primary_key: vec![],
572 schema_version: 0,
573 primary_key_encoding: PrimaryKeyEncoding::Dense,
574 partition_expr: None,
575 }
576 }
577
578 pub fn from_existing(existing: RegionMetadata) -> Self {
580 Self {
581 column_metadatas: existing.column_metadatas,
582 primary_key: existing.primary_key,
583 region_id: existing.region_id,
584 schema_version: existing.schema_version,
585 primary_key_encoding: existing.primary_key_encoding,
586 partition_expr: existing.partition_expr,
587 }
588 }
589
590 pub fn primary_key_encoding(&mut self, encoding: PrimaryKeyEncoding) -> &mut Self {
592 self.primary_key_encoding = encoding;
593 self
594 }
595
596 pub fn partition_expr_json(&mut self, expr_json: Option<String>) -> &mut Self {
598 self.partition_expr = expr_json;
599 self
600 }
601
602 pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
604 self.column_metadatas.push(column_metadata);
605 self
606 }
607
608 pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
610 self.primary_key = key;
611 self
612 }
613
614 pub fn bump_version(&mut self) -> &mut Self {
616 self.schema_version += 1;
617 self
618 }
619
620 pub fn alter(&mut self, kind: AlterKind) -> Result<&mut Self> {
624 match kind {
625 AlterKind::AddColumns { columns } => self.add_columns(columns)?,
626 AlterKind::DropColumns { names } => self.drop_columns(&names),
627 AlterKind::ModifyColumnTypes { columns } => self.modify_column_types(columns)?,
628 AlterKind::SetIndexes { options } => self.set_indexes(options)?,
629 AlterKind::UnsetIndexes { options } => self.unset_indexes(options)?,
630 AlterKind::SetRegionOptions { options: _ } => {
631 }
633 AlterKind::UnsetRegionOptions { keys: _ } => {
634 }
636 AlterKind::DropDefaults { names } => {
637 self.drop_defaults(names)?;
638 }
639 AlterKind::SetDefaults { columns } => self.set_defaults(&columns)?,
640 AlterKind::SyncColumns { column_metadatas } => {
641 self.primary_key = column_metadatas
642 .iter()
643 .filter_map(|column_metadata| {
644 if column_metadata.semantic_type == SemanticType::Tag {
645 Some(column_metadata.column_id)
646 } else {
647 None
648 }
649 })
650 .collect::<Vec<_>>();
651 self.column_metadatas = column_metadatas;
652 }
653 }
654 Ok(self)
655 }
656
657 pub fn build(self) -> Result<RegionMetadata> {
659 self.build_with_options(true)
660 }
661
662 pub fn build_without_validation(self) -> Result<RegionMetadata> {
667 self.build_with_options(false)
668 }
669
670 fn build_with_options(self, validate: bool) -> Result<RegionMetadata> {
671 let skipped = SkippedFields::new(&self.column_metadatas)?;
672
673 let meta = RegionMetadata {
674 schema: skipped.schema,
675 time_index: skipped.time_index,
676 id_to_index: skipped.id_to_index,
677 column_metadatas: self.column_metadatas,
678 primary_key: self.primary_key,
679 region_id: self.region_id,
680 schema_version: self.schema_version,
681 primary_key_encoding: self.primary_key_encoding,
682 partition_expr: self.partition_expr,
683 };
684
685 if validate {
686 meta.validate()?;
687 }
688
689 Ok(meta)
690 }
691
692 fn add_columns(&mut self, columns: Vec<AddColumn>) -> Result<()> {
694 let mut names: HashSet<_> = self
695 .column_metadatas
696 .iter()
697 .map(|col| col.column_schema.name.clone())
698 .collect();
699
700 for add_column in columns {
701 if names.contains(&add_column.column_metadata.column_schema.name) {
702 continue;
704 }
705
706 let column_id = add_column.column_metadata.column_id;
707 let semantic_type = add_column.column_metadata.semantic_type;
708 let column_name = add_column.column_metadata.column_schema.name.clone();
709 match add_column.location {
710 None => {
711 self.column_metadatas.push(add_column.column_metadata);
712 }
713 Some(AddColumnLocation::First) => {
714 self.column_metadatas.insert(0, add_column.column_metadata);
715 }
716 Some(AddColumnLocation::After { column_name }) => {
717 let pos = self
718 .column_metadatas
719 .iter()
720 .position(|col| col.column_schema.name == column_name)
721 .context(InvalidRegionRequestSnafu {
722 region_id: self.region_id,
723 err: format!(
724 "column {} not found, failed to add column {} after it",
725 column_name, add_column.column_metadata.column_schema.name
726 ),
727 })?;
728 self.column_metadatas
730 .insert(pos + 1, add_column.column_metadata);
731 }
732 }
733 names.insert(column_name);
734 if semantic_type == SemanticType::Tag {
735 self.primary_key.push(column_id);
737 }
738 }
739
740 Ok(())
741 }
742
743 fn drop_columns(&mut self, names: &[String]) {
745 let name_set: HashSet<_> = names.iter().collect();
746 self.column_metadatas
747 .retain(|col| !name_set.contains(&col.column_schema.name));
748 }
749
750 fn modify_column_types(&mut self, columns: Vec<ModifyColumnType>) -> Result<()> {
752 let mut change_type_map: HashMap<_, _> = columns
753 .into_iter()
754 .map(
755 |ModifyColumnType {
756 column_name,
757 target_type,
758 }| (column_name, target_type),
759 )
760 .collect();
761
762 for column_meta in self.column_metadatas.iter_mut() {
763 if let Some(target_type) = change_type_map.remove(&column_meta.column_schema.name) {
764 column_meta.column_schema.data_type = target_type.clone();
765 let new_default =
767 if let Some(default_value) = column_meta.column_schema.default_constraint() {
768 Some(
769 default_value
770 .cast_to_datatype(&target_type)
771 .with_context(|_| CastDefaultValueSnafu {
772 reason: format!(
773 "Failed to cast default value from {:?} to type {:?}",
774 default_value, target_type
775 ),
776 })?,
777 )
778 } else {
779 None
780 };
781 column_meta.column_schema = column_meta
782 .column_schema
783 .clone()
784 .with_default_constraint(new_default.clone())
785 .with_context(|_| CastDefaultValueSnafu {
786 reason: format!("Failed to set new default: {:?}", new_default),
787 })?;
788 }
789 }
790
791 Ok(())
792 }
793
794 fn set_indexes(&mut self, options: Vec<SetIndexOption>) -> Result<()> {
795 let mut set_index_map: HashMap<_, Vec<_>> = HashMap::new();
796 for option in &options {
797 set_index_map
798 .entry(option.column_name())
799 .or_default()
800 .push(option);
801 }
802
803 for column_metadata in self.column_metadatas.iter_mut() {
804 if let Some(options) = set_index_map.remove(&column_metadata.column_schema.name) {
805 for option in options {
806 Self::set_index(column_metadata, option)?;
807 }
808 }
809 }
810
811 Ok(())
812 }
813
814 fn unset_indexes(&mut self, options: Vec<UnsetIndexOption>) -> Result<()> {
815 let mut unset_index_map: HashMap<_, Vec<_>> = HashMap::new();
816 for option in &options {
817 unset_index_map
818 .entry(option.column_name())
819 .or_default()
820 .push(option);
821 }
822
823 for column_metadata in self.column_metadatas.iter_mut() {
824 if let Some(options) = unset_index_map.remove(&column_metadata.column_schema.name) {
825 for option in options {
826 Self::unset_index(column_metadata, option)?;
827 }
828 }
829 }
830
831 Ok(())
832 }
833
834 fn set_index(column_metadata: &mut ColumnMetadata, options: &SetIndexOption) -> Result<()> {
835 match options {
836 SetIndexOption::Fulltext {
837 column_name,
838 options,
839 } => {
840 ensure!(
841 column_metadata.column_schema.data_type.is_string(),
842 InvalidColumnOptionSnafu {
843 column_name,
844 msg: "FULLTEXT index only supports string type".to_string(),
845 }
846 );
847 let current_fulltext_options = column_metadata
848 .column_schema
849 .fulltext_options()
850 .with_context(|_| GetFulltextOptionsSnafu {
851 column_name: column_name.clone(),
852 })?;
853 set_column_fulltext_options(
854 column_metadata,
855 column_name,
856 options,
857 current_fulltext_options,
858 )?;
859 }
860 SetIndexOption::Inverted { .. } => {
861 column_metadata.column_schema.set_inverted_index(true)
862 }
863 SetIndexOption::Skipping {
864 column_name,
865 options,
866 } => {
867 column_metadata
868 .column_schema
869 .set_skipping_options(options)
870 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
871 }
872 }
873
874 Ok(())
875 }
876
877 fn unset_index(column_metadata: &mut ColumnMetadata, options: &UnsetIndexOption) -> Result<()> {
878 match options {
879 UnsetIndexOption::Fulltext { column_name } => {
880 ensure!(
881 column_metadata.column_schema.data_type.is_string(),
882 InvalidColumnOptionSnafu {
883 column_name,
884 msg: "FULLTEXT index only supports string type".to_string(),
885 }
886 );
887
888 let current_fulltext_options = column_metadata
889 .column_schema
890 .fulltext_options()
891 .with_context(|_| GetFulltextOptionsSnafu {
892 column_name: column_name.clone(),
893 })?;
894
895 unset_column_fulltext_options(
896 column_metadata,
897 column_name,
898 current_fulltext_options,
899 )?;
900 }
901 UnsetIndexOption::Inverted { .. } => {
902 column_metadata.column_schema.set_inverted_index(false)
903 }
904 UnsetIndexOption::Skipping { column_name } => {
905 column_metadata
906 .column_schema
907 .unset_skipping_options()
908 .context(UnsetSkippingIndexOptionsSnafu { column_name })?;
909 }
910 }
911
912 Ok(())
913 }
914
915 fn drop_defaults(&mut self, column_names: Vec<String>) -> Result<()> {
916 for name in column_names.iter() {
917 let meta = self
918 .column_metadatas
919 .iter_mut()
920 .find(|col| col.column_schema.name == *name);
921 if let Some(meta) = meta {
922 if !meta.column_schema.is_nullable() {
923 return InvalidRegionRequestSnafu {
924 region_id: self.region_id,
925 err: format!(
926 "column {name} is not nullable and `default` cannot be dropped",
927 ),
928 }
929 .fail();
930 }
931 meta.column_schema = meta
932 .column_schema
933 .clone()
934 .with_default_constraint(None)
935 .with_context(|_| CastDefaultValueSnafu {
936 reason: format!("Failed to drop default : {name:?}"),
937 })?;
938 } else {
939 return InvalidRegionRequestSnafu {
940 region_id: self.region_id,
941 err: format!("column {name} not found",),
942 }
943 .fail();
944 }
945 }
946 Ok(())
947 }
948
949 fn set_defaults(&mut self, set_defaults: &[crate::region_request::SetDefault]) -> Result<()> {
950 for set_default in set_defaults.iter() {
951 let meta = self
952 .column_metadatas
953 .iter_mut()
954 .find(|col| col.column_schema.name == set_default.name);
955 if let Some(meta) = meta {
956 let default_constraint = common_sql::convert::deserialize_default_constraint(
957 set_default.default_constraint.as_slice(),
958 &meta.column_schema.name,
959 &meta.column_schema.data_type,
960 )
961 .context(SqlCommonSnafu)?;
962
963 meta.column_schema = meta
964 .column_schema
965 .clone()
966 .with_default_constraint(default_constraint)
967 .with_context(|_| CastDefaultValueSnafu {
968 reason: format!("Failed to set default : {set_default:?}"),
969 })?;
970 } else {
971 return InvalidRegionRequestSnafu {
972 region_id: self.region_id,
973 err: format!("column {} not found", set_default.name),
974 }
975 .fail();
976 }
977 }
978 Ok(())
979 }
980}
981
/// The fields of [`RegionMetadata`] that are skipped during serialization and
/// recomputed from the column metadata.
struct SkippedFields {
    /// Schema built from the column schemas.
    schema: SchemaRef,
    /// Id of the first column whose semantic type is `Timestamp`.
    time_index: ColumnId,
    /// Maps a column id to the position of the column in the column list.
    id_to_index: HashMap<ColumnId, usize>,
}
991
992impl SkippedFields {
993 fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
995 let column_schemas = column_metadatas
996 .iter()
997 .map(|column_metadata| column_metadata.column_schema.clone())
998 .collect();
999 let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
1000 let time_index = column_metadatas
1001 .iter()
1002 .find_map(|col| {
1003 if col.semantic_type == SemanticType::Timestamp {
1004 Some(col.column_id)
1005 } else {
1006 None
1007 }
1008 })
1009 .context(InvalidMetaSnafu {
1010 reason: "time index not found",
1011 })?;
1012 let id_to_index = column_metadatas
1013 .iter()
1014 .enumerate()
1015 .map(|(idx, col)| (col.column_id, idx))
1016 .collect();
1017
1018 Ok(SkippedFields {
1019 schema,
1020 time_index,
1021 id_to_index,
1022 })
1023 }
1024}
1025
1026#[derive(Snafu)]
1027#[snafu(visibility(pub))]
1028#[stack_trace_debug]
1029pub enum MetadataError {
1030 #[snafu(display("Invalid schema"))]
1031 InvalidSchema {
1032 source: datatypes::error::Error,
1033 #[snafu(implicit)]
1034 location: Location,
1035 },
1036
1037 #[snafu(display("Invalid metadata, {}", reason))]
1038 InvalidMeta {
1039 reason: String,
1040 #[snafu(implicit)]
1041 location: Location,
1042 },
1043
1044 #[snafu(display("Failed to ser/de json object"))]
1045 SerdeJson {
1046 #[snafu(implicit)]
1047 location: Location,
1048 #[snafu(source)]
1049 error: serde_json::Error,
1050 },
1051
1052 #[snafu(display("Invalid raw region request, err: {}", err))]
1053 InvalidRawRegionRequest {
1054 err: String,
1055 #[snafu(implicit)]
1056 location: Location,
1057 },
1058
1059 #[snafu(display("Invalid region request, region_id: {}, err: {}", region_id, err))]
1060 InvalidRegionRequest {
1061 region_id: RegionId,
1062 err: String,
1063 #[snafu(implicit)]
1064 location: Location,
1065 },
1066
1067 #[snafu(display("Unexpected schema error during project"))]
1068 SchemaProject {
1069 origin_schema: SchemaRef,
1070 projection: Vec<ColumnId>,
1071 #[snafu(implicit)]
1072 location: Location,
1073 source: datatypes::Error,
1074 },
1075
1076 #[snafu(display("Time index column not found"))]
1077 TimeIndexNotFound {
1078 #[snafu(implicit)]
1079 location: Location,
1080 },
1081
1082 #[snafu(display("Change column {} not exists in region: {}", column_name, region_id))]
1083 ChangeColumnNotFound {
1084 column_name: String,
1085 region_id: RegionId,
1086 #[snafu(implicit)]
1087 location: Location,
1088 },
1089
1090 #[snafu(display("Failed to convert column schema"))]
1091 ConvertColumnSchema {
1092 source: api::error::Error,
1093 #[snafu(implicit)]
1094 location: Location,
1095 },
1096
1097 #[snafu(display("Failed to convert TimeRanges"))]
1098 ConvertTimeRanges {
1099 source: api::error::Error,
1100 #[snafu(implicit)]
1101 location: Location,
1102 },
1103
1104 #[snafu(display("Invalid set region option request, key: {}, value: {}", key, value))]
1105 InvalidSetRegionOptionRequest {
1106 key: String,
1107 value: String,
1108 #[snafu(implicit)]
1109 location: Location,
1110 },
1111
1112 #[snafu(display("Invalid set region option request, key: {}", key))]
1113 InvalidUnsetRegionOptionRequest {
1114 key: String,
1115 #[snafu(implicit)]
1116 location: Location,
1117 },
1118
1119 #[snafu(display("Failed to decode protobuf"))]
1120 DecodeProto {
1121 #[snafu(source)]
1122 error: prost::UnknownEnumValue,
1123 #[snafu(implicit)]
1124 location: Location,
1125 },
1126
1127 #[snafu(display("Invalid column option, column name: {}, error: {}", column_name, msg))]
1128 InvalidColumnOption {
1129 column_name: String,
1130 msg: String,
1131 #[snafu(implicit)]
1132 location: Location,
1133 },
1134
1135 #[snafu(display("Failed to set fulltext options for column {}", column_name))]
1136 SetFulltextOptions {
1137 column_name: String,
1138 source: datatypes::Error,
1139 #[snafu(implicit)]
1140 location: Location,
1141 },
1142
1143 #[snafu(display("Failed to get fulltext options for column {}", column_name))]
1144 GetFulltextOptions {
1145 column_name: String,
1146 source: datatypes::Error,
1147 #[snafu(implicit)]
1148 location: Location,
1149 },
1150
1151 #[snafu(display("Failed to set skipping index options for column {}", column_name))]
1152 SetSkippingIndexOptions {
1153 column_name: String,
1154 source: datatypes::Error,
1155 #[snafu(implicit)]
1156 location: Location,
1157 },
1158
1159 #[snafu(display("Failed to unset skipping index options for column {}", column_name))]
1160 UnsetSkippingIndexOptions {
1161 column_name: String,
1162 source: datatypes::Error,
1163 #[snafu(implicit)]
1164 location: Location,
1165 },
1166
1167 #[snafu(display("Failed to decode arrow ipc record batches"))]
1168 DecodeArrowIpc {
1169 #[snafu(source)]
1170 error: arrow::error::ArrowError,
1171 #[snafu(implicit)]
1172 location: Location,
1173 },
1174
1175 #[snafu(display("Failed to cast default value, reason: {}", reason))]
1176 CastDefaultValue {
1177 reason: String,
1178 source: datatypes::Error,
1179 #[snafu(implicit)]
1180 location: Location,
1181 },
1182
1183 #[snafu(display("Unexpected: {}", reason))]
1184 Unexpected {
1185 reason: String,
1186 #[snafu(implicit)]
1187 location: Location,
1188 },
1189
1190 #[snafu(display("Failed to encode/decode flight message"))]
1191 FlightCodec {
1192 source: common_grpc::Error,
1193 #[snafu(implicit)]
1194 location: Location,
1195 },
1196
1197 #[snafu(display("Invalid index option"))]
1198 InvalidIndexOption {
1199 #[snafu(implicit)]
1200 location: Location,
1201 #[snafu(source)]
1202 error: datatypes::error::Error,
1203 },
1204
1205 #[snafu(display("Sql common error"))]
1206 SqlCommon {
1207 source: common_sql::error::Error,
1208 #[snafu(implicit)]
1209 location: Location,
1210 },
1211}
1212
1213impl ErrorExt for MetadataError {
1214 fn status_code(&self) -> StatusCode {
1215 match self {
1216 Self::SqlCommon { source, .. } => source.status_code(),
1217 _ => StatusCode::InvalidArguments,
1218 }
1219 }
1220
1221 fn as_any(&self) -> &dyn Any {
1222 self
1223 }
1224}
1225
1226fn set_column_fulltext_options(
1235 column_meta: &mut ColumnMetadata,
1236 column_name: &str,
1237 options: &FulltextOptions,
1238 current_options: Option<FulltextOptions>,
1239) -> Result<()> {
1240 if let Some(current_options) = current_options {
1241 ensure!(
1242 current_options.analyzer == options.analyzer
1243 && current_options.case_sensitive == options.case_sensitive,
1244 InvalidColumnOptionSnafu {
1245 column_name,
1246 msg: format!(
1247 "Cannot change analyzer or case_sensitive if FULLTEXT index is set before. Previous analyzer: {}, previous case_sensitive: {}",
1248 current_options.analyzer, current_options.case_sensitive
1249 ),
1250 }
1251 );
1252 }
1253
1254 column_meta
1255 .column_schema
1256 .set_fulltext_options(options)
1257 .context(SetFulltextOptionsSnafu { column_name })?;
1258
1259 Ok(())
1260}
1261
1262fn unset_column_fulltext_options(
1263 column_meta: &mut ColumnMetadata,
1264 column_name: &str,
1265 current_options: Option<FulltextOptions>,
1266) -> Result<()> {
1267 if let Some(mut current_options) = current_options
1268 && current_options.enable
1269 {
1270 current_options.enable = false;
1271 column_meta
1272 .column_schema
1273 .set_fulltext_options(¤t_options)
1274 .context(SetFulltextOptionsSnafu { column_name })?;
1275 } else {
1276 return InvalidColumnOptionSnafu {
1277 column_name,
1278 msg: "FULLTEXT index already disabled",
1279 }
1280 .fail();
1281 }
1282
1283 Ok(())
1284}
1285
1286#[cfg(test)]
1287mod test {
1288 use datatypes::prelude::ConcreteDataType;
1289 use datatypes::schema::{
1290 ColumnDefaultConstraint, ColumnSchema, FulltextAnalyzer, FulltextBackend,
1291 };
1292 use datatypes::value::Value;
1293
1294 use super::*;
1295
1296 fn create_builder() -> RegionMetadataBuilder {
1297 RegionMetadataBuilder::new(RegionId::new(1234, 5678))
1298 }
1299
1300 fn build_test_region_metadata() -> RegionMetadata {
1301 let mut builder = create_builder();
1302 builder
1303 .push_column_metadata(ColumnMetadata {
1304 column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
1305 semantic_type: SemanticType::Tag,
1306 column_id: 1,
1307 })
1308 .push_column_metadata(ColumnMetadata {
1309 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1310 semantic_type: SemanticType::Field,
1311 column_id: 2,
1312 })
1313 .push_column_metadata(ColumnMetadata {
1314 column_schema: ColumnSchema::new(
1315 "c",
1316 ConcreteDataType::timestamp_millisecond_datatype(),
1317 false,
1318 ),
1319 semantic_type: SemanticType::Timestamp,
1320 column_id: 3,
1321 })
1322 .primary_key(vec![1])
1323 .partition_expr_json(Some("".to_string()));
1324 builder.build().unwrap()
1325 }
1326
/// Checks the time index lookup and lookups by column id on the fixture.
#[test]
fn test_region_metadata() {
    let metadata = build_test_region_metadata();

    let time_index = metadata.time_index_column();
    assert_eq!("c", time_index.column_schema.name);

    let first_column = metadata.column_by_id(1).unwrap();
    assert_eq!("a", first_column.column_schema.name);

    // An id that was never registered yields no column.
    assert_eq!(None, metadata.column_by_id(10));
}
1337
/// Round-trips the fixture through JSON and expects an equal value back.
#[test]
fn test_region_metadata_serde() {
    let region_metadata = build_test_region_metadata();
    let serialized = serde_json::to_string(&region_metadata).unwrap();
    let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
    assert_eq!(region_metadata, deserialized);
}
1345
/// A column flagged as `Timestamp` but typed as string must be rejected.
#[test]
fn test_column_metadata_validate() {
    let string_time_index = ColumnMetadata {
        column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(string_time_index);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("column `ts` is not timestamp type"),
        "unexpected err: {message}",
    );
}
1363
/// Building without any column fails because no time index exists.
#[test]
fn test_empty_region_metadata() {
    let message = create_builder().build().unwrap_err().to_string();
    assert!(
        message.contains("time index not found"),
        "unexpected err: {message}",
    );
}
1374
/// Two columns that share the same column id must be rejected.
#[test]
fn test_same_column_id() {
    let tag = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    // Deliberately reuses id 1.
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(tag);
    builder.push_column_metadata(time_index);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("column a and b have the same column id"),
        "unexpected err: {message}",
    );
}
1400
/// Only one column may carry the `Timestamp` semantic type.
#[test]
fn test_duplicate_time_index() {
    // Builds a non-nullable millisecond timestamp column marked as time index.
    let time_index = |name: &str, column_id: ColumnId| ColumnMetadata {
        column_schema: ColumnSchema::new(
            name,
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index("a", 1));
    builder.push_column_metadata(time_index("b", 2));

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("expect only one time index"),
        "unexpected err: {message}",
    );
}
1429
/// A primary key that references an unregistered column id must be rejected.
#[test]
fn test_unknown_primary_key() {
    let tag = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(tag);
    builder.push_column_metadata(time_index);
    // Id 3 does not belong to any pushed column.
    builder.primary_key(vec![3]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("unknown column id 3"),
        "unexpected err: {message}",
    );
}
1455
/// Listing the same column twice in the primary key must be rejected.
#[test]
fn test_same_primary_key() {
    let tag = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "b",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(tag);
    builder.push_column_metadata(time_index);
    builder.primary_key(vec![1, 1]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("duplicate column a in primary key"),
        "unexpected err: {message}",
    );
}
1482
/// The time index column cannot also be part of the primary key.
#[test]
fn test_in_time_index() {
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index);
    builder.primary_key(vec![1]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("column ts is already a time index column"),
        "unexpected err: {message}",
    );
}
1504
/// A nullable time index column must be rejected.
#[test]
fn test_nullable_time_index() {
    // The third argument marks the column as nullable.
    let nullable_ts = ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        true,
    );

    let mut builder = create_builder();
    builder.push_column_metadata(ColumnMetadata {
        column_schema: nullable_ts,
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    });

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("time index column ts must be NOT NULL"),
        "unexpected err: {message}",
    );
}
1524
/// A primary key column must have the `Tag` semantic type.
#[test]
fn test_primary_key_semantic_type() {
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };
    let field = ColumnMetadata {
        column_schema: ColumnSchema::new("a", ConcreteDataType::float64_datatype(), true),
        semantic_type: SemanticType::Field,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index);
    builder.push_column_metadata(field);
    // Points the primary key at the field column.
    builder.primary_key(vec![2]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("semantic type of column a should be Tag, not Field"),
        "unexpected err: {message}",
    );
}
1551
/// The primary key must list every tag column.
#[test]
fn test_primary_key_tag_num() {
    // Builds a nullable string tag column.
    let tag = |name: &str, column_id: ColumnId| ColumnMetadata {
        column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
        semantic_type: SemanticType::Tag,
        column_id,
    };
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(time_index);
    builder.push_column_metadata(tag("a", 2));
    builder.push_column_metadata(tag("b", 3));
    // Two tags exist but only one is listed in the primary key.
    builder.primary_key(vec![2]);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("number of primary key columns 1 not equal to tag columns 2"),
        "unexpected err: {message}",
    );
}
1583
/// `bump_version` increments `schema_version` by one and changes nothing else.
#[test]
fn test_bump_version() {
    let mut expected = build_test_region_metadata();

    let mut builder = RegionMetadataBuilder::from_existing(expected.clone());
    builder.bump_version();
    let bumped = builder.build().unwrap();

    expected.schema_version += 1;
    assert_eq!(expected, bumped);
}
1593
1594 fn new_column_metadata(name: &str, is_tag: bool, column_id: ColumnId) -> ColumnMetadata {
1595 let semantic_type = if is_tag {
1596 SemanticType::Tag
1597 } else {
1598 SemanticType::Field
1599 };
1600 ColumnMetadata {
1601 column_schema: ColumnSchema::new(name, ConcreteDataType::string_datatype(), true),
1602 semantic_type,
1603 column_id,
1604 }
1605 }
1606
1607 fn check_columns(metadata: &RegionMetadata, names: &[&str]) {
1608 let actual: Vec<_> = metadata
1609 .column_metadatas
1610 .iter()
1611 .map(|col| &col.column_schema.name)
1612 .collect();
1613 assert_eq!(names, actual);
1614 }
1615
1616 fn get_columns_default_constraint(
1617 metadata: &RegionMetadata,
1618 name: String,
1619 ) -> Option<Option<&ColumnDefaultConstraint>> {
1620 metadata.column_metadatas.iter().find_map(|col| {
1621 if col.column_schema.name == name {
1622 Some(col.column_schema.default_constraint())
1623 } else {
1624 None
1625 }
1626 })
1627 }
1628
/// Walks the fixture through a sequence of alterations (add, drop, defaults,
/// type change, fulltext index) and checks the metadata after each step.
#[test]
fn test_alter() {
    /// Applies `kind` on top of `metadata` and builds the resulting metadata.
    fn apply(metadata: RegionMetadata, kind: AlterKind) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::from_existing(metadata);
        builder.alter(kind).unwrap();
        builder.build().unwrap()
    }

    /// Wraps a single column into an `AddColumns` request.
    fn add_one(column_metadata: ColumnMetadata, location: Option<AddColumnLocation>) -> AlterKind {
        AlterKind::AddColumns {
            columns: vec![AddColumn {
                column_metadata,
                location,
            }],
        }
    }

    /// Builds a `DropColumns` request for the given column names.
    fn drop_columns(names: &[&str]) -> AlterKind {
        AlterKind::DropColumns {
            names: names.iter().map(|name| name.to_string()).collect(),
        }
    }

    /// Shorthand for `AddColumnLocation::After`.
    fn after(column_name: &str) -> Option<AddColumnLocation> {
        Some(AddColumnLocation::After {
            column_name: column_name.to_string(),
        })
    }

    // Add tag `d` without a location: it is appended and joins the primary key.
    let metadata = apply(
        build_test_region_metadata(),
        add_one(new_column_metadata("d", true, 4), None),
    );
    check_columns(&metadata, &["a", "b", "c", "d"]);
    assert_eq!(vec![1, 4], metadata.primary_key);

    // Add field `e` at the first position.
    let metadata = apply(
        metadata,
        add_one(
            new_column_metadata("e", false, 5),
            Some(AddColumnLocation::First),
        ),
    );
    check_columns(&metadata, &["e", "a", "b", "c", "d"]);

    // Add field `f` right after `b`.
    let metadata = apply(
        metadata,
        add_one(new_column_metadata("f", false, 6), after("b")),
    );
    check_columns(&metadata, &["e", "a", "b", "f", "c", "d"]);

    // Add field `g` after the last column `d`.
    let metadata = apply(
        metadata,
        add_one(new_column_metadata("g", false, 7), after("d")),
    );
    check_columns(&metadata, &["e", "a", "b", "f", "c", "d", "g"]);

    // Drop two fields in one request.
    let metadata = apply(metadata, drop_columns(&["g", "e"]));
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);

    // Dropping tag `a` is accepted by `alter` but rejected when building.
    let mut failing_builder = RegionMetadataBuilder::from_existing(metadata.clone());
    failing_builder.alter(drop_columns(&["a"])).unwrap();
    let build_error = failing_builder.build().unwrap_err();
    assert_eq!(StatusCode::InvalidArguments, build_error.status_code());

    // Add field `g` again, this time carrying a default value.
    let default_constraint = Some(ColumnDefaultConstraint::Value(Value::from("g")));
    let mut column_with_default = new_column_metadata("g", false, 8);
    column_with_default.column_schema = column_with_default
        .column_schema
        .with_default_constraint(default_constraint.clone())
        .unwrap();
    let metadata = apply(metadata, add_one(column_with_default, None));
    assert_eq!(
        get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
        default_constraint.as_ref()
    );
    check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);

    // Drop the default of `g`; the column itself stays.
    let metadata = apply(
        metadata,
        AlterKind::DropDefaults {
            names: vec!["g".to_string()],
        },
    );
    assert_eq!(
        get_columns_default_constraint(&metadata, "g".to_string()).unwrap(),
        None
    );
    check_columns(&metadata, &["a", "b", "f", "c", "d", "g"]);

    // Drop `g` itself.
    let metadata = apply(metadata, drop_columns(&["g"]));
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);

    // Change the type of field `b` to string.
    let metadata = apply(
        metadata,
        AlterKind::ModifyColumnTypes {
            columns: vec![ModifyColumnType {
                column_name: "b".to_string(),
                target_type: ConcreteDataType::string_datatype(),
            }],
        },
    );
    check_columns(&metadata, &["a", "b", "f", "c", "d"]);
    let b_schema = &metadata.column_by_name("b").unwrap().column_schema;
    assert_eq!(ConcreteDataType::string_datatype(), b_schema.data_type);

    // Enable a fulltext index on `b`.
    let metadata = apply(
        metadata,
        AlterKind::SetIndexes {
            options: vec![SetIndexOption::Fulltext {
                column_name: "b".to_string(),
                options: FulltextOptions::new_unchecked(
                    true,
                    FulltextAnalyzer::Chinese,
                    true,
                    FulltextBackend::Bloom,
                    1000,
                    0.01,
                ),
            }],
        },
    );
    let b_fulltext_options = metadata
        .column_by_name("b")
        .unwrap()
        .column_schema
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(b_fulltext_options.enable);
    assert_eq!(FulltextAnalyzer::Chinese, b_fulltext_options.analyzer);
    assert!(b_fulltext_options.case_sensitive);

    // Unset the index: only `enable` flips, the other options are kept.
    let metadata = apply(
        metadata,
        AlterKind::UnsetIndexes {
            options: vec![UnsetIndexOption::Fulltext {
                column_name: "b".to_string(),
            }],
        },
    );
    let b_fulltext_options = metadata
        .column_by_name("b")
        .unwrap()
        .column_schema
        .fulltext_options()
        .unwrap()
        .unwrap();
    assert!(!b_fulltext_options.enable);
    assert_eq!(FulltextAnalyzer::Chinese, b_fulltext_options.analyzer);
    assert!(b_fulltext_options.case_sensitive);
}
1822
/// Adding a column that is duplicated in the request, or that already exists,
/// does not produce a second copy.
#[test]
fn test_add_if_not_exists() {
    // The same new tag column appears twice in a single request.
    let duplicated_request: Vec<AddColumn> = (0..2)
        .map(|_| AddColumn {
            column_metadata: new_column_metadata("d", true, 4),
            location: None,
        })
        .collect();

    let mut builder = RegionMetadataBuilder::from_existing(build_test_region_metadata());
    builder
        .alter(AlterKind::AddColumns {
            columns: duplicated_request,
        })
        .unwrap();
    let with_d = builder.build().unwrap();
    check_columns(&with_d, &["a", "b", "c", "d"]);
    assert_eq!(vec![1, 4], with_d.primary_key);

    // Re-adding the existing column `b` leaves the column list untouched.
    let existing_column = AddColumn {
        column_metadata: new_column_metadata("b", false, 2),
        location: None,
    };
    let mut builder = RegionMetadataBuilder::from_existing(with_d);
    builder
        .alter(AlterKind::AddColumns {
            columns: vec![existing_column],
        })
        .unwrap();
    let unchanged = builder.build().unwrap();
    check_columns(&unchanged, &["a", "b", "c", "d"]);
}
1860
/// Adds two tag columns, only one of which has the inverted index flag set,
/// and checks that the flag ends up on exactly that column.
#[test]
fn test_add_column_with_inverted_index() {
    let mut indexed_tag = new_column_metadata("d", true, 4);
    indexed_tag.column_schema.set_inverted_index(true);
    let plain_tag = new_column_metadata("e", true, 5);

    let new_columns = vec![
        AddColumn {
            column_metadata: indexed_tag,
            location: None,
        },
        AddColumn {
            column_metadata: plain_tag,
            location: None,
        },
    ];

    let mut builder = RegionMetadataBuilder::from_existing(build_test_region_metadata());
    builder
        .alter(AlterKind::AddColumns {
            columns: new_columns,
        })
        .unwrap();
    let metadata = builder.build().unwrap();

    check_columns(&metadata, &["a", "b", "c", "d", "e"]);
    assert_eq!(vec![1, 4, 5], metadata.primary_key);

    // Only `d` was explicitly marked as inverted-indexed.
    let expectations = [
        ("a", false),
        ("b", false),
        ("c", false),
        ("d", true),
        ("e", false),
    ];
    for (name, expected) in expectations {
        let column = metadata.column_by_name(name).unwrap();
        assert_eq!(
            expected,
            column.column_schema.is_inverted_indexed(),
            "column {name}",
        );
    }
}
1899
/// Dropping a column listed twice, or one that is already gone, succeeds and
/// removes only the columns that exist.
#[test]
fn test_drop_if_exists() {
    // Start from the fixture extended with two extra fields `d` and `e`.
    let extra_fields = vec![
        AddColumn {
            column_metadata: new_column_metadata("d", false, 4),
            location: None,
        },
        AddColumn {
            column_metadata: new_column_metadata("e", false, 5),
            location: None,
        },
    ];
    let mut builder = RegionMetadataBuilder::from_existing(build_test_region_metadata());
    builder
        .alter(AlterKind::AddColumns {
            columns: extra_fields,
        })
        .unwrap();
    let extended = builder.build().unwrap();
    check_columns(&extended, &["a", "b", "c", "d", "e"]);

    // `b` is listed twice in the same request.
    let mut builder = RegionMetadataBuilder::from_existing(extended);
    builder
        .alter(AlterKind::DropColumns {
            names: vec!["b".to_string(), "b".to_string()],
        })
        .unwrap();
    let without_b = builder.build().unwrap();
    check_columns(&without_b, &["a", "c", "d", "e"]);

    // `b` no longer exists; `e` is still dropped.
    let mut builder = RegionMetadataBuilder::from_existing(without_b);
    builder
        .alter(AlterKind::DropColumns {
            names: vec!["b".to_string(), "e".to_string()],
        })
        .unwrap();
    let without_e = builder.build().unwrap();
    check_columns(&without_e, &["a", "c", "d"]);
}
1941
/// A column named `__sequence` is refused by the validating `build`.
#[test]
fn test_invalid_column_name() {
    let reserved_name_column = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "__sequence",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 1,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(reserved_name_column);

    let message = builder.build().unwrap_err().to_string();
    assert!(
        message.contains("internal column name that can not be used"),
        "unexpected err: {message}",
    );
}
1961
/// `build_without_validation` accepts a column named `__primary_key`.
#[test]
fn test_allow_internal_column_name() {
    let internal_tag = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "__primary_key",
            ConcreteDataType::string_datatype(),
            false,
        ),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let time_index = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    let mut builder = create_builder();
    builder.push_column_metadata(internal_tag);
    builder.push_column_metadata(time_index);
    builder.primary_key(vec![1]);

    let metadata = builder.build_without_validation().unwrap();
    let first_column = &metadata.column_metadatas[0];
    assert_eq!("__primary_key", first_column.column_schema.name);
}
1992
/// The same definition (primary key pointing at a field column) is accepted by
/// `build_without_validation` and rejected by `build`.
#[test]
fn test_build_without_validation() {
    // Produces a fresh builder whose primary key references the field column.
    let primary_key_on_field = || {
        let mut builder = create_builder();
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field",
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .primary_key(vec![2]);
        builder
    };

    // Without validation the definition is taken as is.
    let metadata = primary_key_on_field()
        .build_without_validation()
        .unwrap();
    assert_eq!(vec![2], metadata.primary_key);

    // With validation the semantic type check fails.
    let message = primary_key_on_field().build().unwrap_err().to_string();
    assert!(
        message.contains("semantic type of column field should be Tag"),
        "unexpected err: {message}"
    );
}
2051
/// Pins the `Debug` rendering of the fixture, including the compact
/// `[schema semantic_type id]` form used for each column.
#[test]
fn test_debug_for_column_metadata() {
    let expected = "RegionMetadata { column_metadatas: [[a Int64 not null Tag 1], [b Float64 not null Field 2], [c TimestampMillisecond not null Timestamp 3]], time_index: 3, primary_key: [1], region_id: 5299989648942(1234, 5678), schema_version: 0, partition_expr: Some(\"\") }";

    let region_metadata = build_test_region_metadata();
    let rendered = format!("{region_metadata:?}");

    assert_eq!(rendered, expected);
}
2061
/// Deserializing JSON that omits `primary_key_encoding` yields `Dense`, while
/// an explicit `"sparse"` value yields `Sparse`.
#[test]
fn test_region_metadata_deserialize_default_primary_key_encoding() {
    // Payload without the `primary_key_encoding` field.
    let without_encoding = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0}"#;
    // Same payload with `primary_key_encoding` set to `sparse`.
    let with_sparse_encoding = r#"{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"primary_key":[1],"region_id":5299989648942,"schema_version":0,"primary_key_encoding":"sparse"}"#;

    let cases = [
        (without_encoding, PrimaryKeyEncoding::Dense),
        (with_sparse_encoding, PrimaryKeyEncoding::Sparse),
    ];
    for (json, expected_encoding) in cases {
        let metadata: RegionMetadata = serde_json::from_str(json).unwrap();
        assert_eq!(metadata.primary_key_encoding, expected_encoding);
    }
}
2075}