1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37 MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40 AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41 RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42 RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
43 RegionRequest, RegionTruncateRequest,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
50 FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
51 Result, UnexpectedSnafu,
52};
53use crate::manifest::action::{RegionEdit, TruncateKind};
54use crate::memtable::MemtableId;
55use crate::memtable::bulk::part::BulkPart;
56use crate::metrics::COMPACTION_ELAPSED_TOTAL;
57use crate::region::options::RegionOptions;
58use crate::sst::file::FileMeta;
59use crate::sst::index::IndexBuildType;
60use crate::wal::EntryId;
61use crate::wal::entry_distributor::WalEntryReceiver;
62
63#[derive(Debug)]
65pub struct WriteRequest {
66 pub region_id: RegionId,
68 pub op_type: OpType,
70 pub rows: Rows,
72 pub name_to_index: HashMap<String, usize>,
74 pub has_null: Vec<bool>,
76 pub hint: Option<WriteHint>,
78 pub(crate) region_metadata: Option<RegionMetadataRef>,
80}
81
82impl WriteRequest {
83 pub fn new(
87 region_id: RegionId,
88 op_type: OpType,
89 rows: Rows,
90 region_metadata: Option<RegionMetadataRef>,
91 ) -> Result<WriteRequest> {
92 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
93 for (index, column) in rows.schema.iter().enumerate() {
94 ensure!(
95 name_to_index
96 .insert(column.column_name.clone(), index)
97 .is_none(),
98 InvalidRequestSnafu {
99 region_id,
100 reason: format!("duplicate column {}", column.column_name),
101 }
102 );
103 }
104
105 let mut has_null = vec![false; rows.schema.len()];
106 for row in &rows.rows {
107 ensure!(
108 row.values.len() == rows.schema.len(),
109 InvalidRequestSnafu {
110 region_id,
111 reason: format!(
112 "row has {} columns but schema has {}",
113 row.values.len(),
114 rows.schema.len()
115 ),
116 }
117 );
118
119 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
120 validate_proto_value(region_id, value, column_schema)?;
121
122 if value.value_data.is_none() {
123 has_null[i] = true;
124 }
125 }
126 }
127
128 Ok(WriteRequest {
129 region_id,
130 op_type,
131 rows,
132 name_to_index,
133 has_null,
134 hint: None,
135 region_metadata,
136 })
137 }
138
139 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
141 self.hint = hint;
142 self
143 }
144
145 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
147 infer_primary_key_encoding_from_hint(self.hint.as_ref())
148 }
149
150 pub(crate) fn estimated_size(&self) -> usize {
152 let row_size = self
153 .rows
154 .rows
155 .first()
156 .map(|row| row.encoded_len())
157 .unwrap_or(0);
158 row_size * self.rows.rows.len()
159 }
160
161 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
163 self.name_to_index.get(name).copied()
164 }
165
166 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
171 debug_assert_eq!(self.region_id, metadata.region_id);
172
173 let region_id = self.region_id;
174 let mut rows_columns: HashMap<_, _> = self
176 .rows
177 .schema
178 .iter()
179 .map(|column| (&column.column_name, column))
180 .collect();
181
182 let mut need_fill_default = false;
183 for column in &metadata.column_metadatas {
185 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
186 ensure!(
188 is_column_type_value_eq(
189 input_col.datatype,
190 input_col.datatype_extension.clone(),
191 &column.column_schema.data_type
192 ),
193 InvalidRequestSnafu {
194 region_id,
195 reason: format!(
196 "column {} expect type {:?}, given: {}({})",
197 column.column_schema.name,
198 column.column_schema.data_type,
199 ColumnDataType::try_from(input_col.datatype)
200 .map(|v| v.as_str_name())
201 .unwrap_or("Unknown"),
202 input_col.datatype,
203 )
204 }
205 );
206
207 ensure!(
209 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
210 InvalidRequestSnafu {
211 region_id,
212 reason: format!(
213 "column {} has semantic type {:?}, given: {}({})",
214 column.column_schema.name,
215 column.semantic_type,
216 api::v1::SemanticType::try_from(input_col.semantic_type)
217 .map(|v| v.as_str_name())
218 .unwrap_or("Unknown"),
219 input_col.semantic_type
220 ),
221 }
222 );
223
224 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
227 ensure!(
228 !has_null || column.column_schema.is_nullable(),
229 InvalidRequestSnafu {
230 region_id,
231 reason: format!(
232 "column {} is not null but input has null",
233 column.column_schema.name
234 ),
235 }
236 );
237 } else {
238 self.check_missing_column(column)?;
240
241 need_fill_default = true;
242 }
243 }
244
245 if !rows_columns.is_empty() {
247 let names: Vec<_> = rows_columns.into_keys().collect();
248 return InvalidRequestSnafu {
249 region_id,
250 reason: format!("unknown columns: {:?}", names),
251 }
252 .fail();
253 }
254
255 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
257
258 Ok(())
259 }
260
261 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
266 debug_assert_eq!(self.region_id, metadata.region_id);
267
268 let mut columns_to_fill = vec![];
269 for column in &metadata.column_metadatas {
270 if !self.name_to_index.contains_key(&column.column_schema.name) {
271 columns_to_fill.push(column);
272 }
273 }
274 self.fill_columns(columns_to_fill)?;
275
276 Ok(())
277 }
278
279 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
281 if let Err(e) = self.check_schema(metadata) {
282 if e.is_fill_default() {
283 self.fill_missing_columns(metadata)?;
287 } else {
288 return Err(e);
289 }
290 }
291
292 Ok(())
293 }
294
295 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
297 let mut default_values = Vec::with_capacity(columns.len());
298 let mut columns_to_fill = Vec::with_capacity(columns.len());
299 for column in columns {
300 let default_value = self.column_default_value(column)?;
301 if default_value.value_data.is_some() {
302 default_values.push(default_value);
303 columns_to_fill.push(column);
304 }
305 }
306
307 for row in &mut self.rows.rows {
308 row.values.extend(default_values.iter().cloned());
309 }
310
311 for column in columns_to_fill {
312 let (datatype, datatype_ext) =
313 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
314 .with_context(|_| ConvertColumnDataTypeSnafu {
315 reason: format!(
316 "no protobuf type for column {} ({:?})",
317 column.column_schema.name, column.column_schema.data_type
318 ),
319 })?
320 .to_parts();
321 self.rows.schema.push(ColumnSchema {
322 column_name: column.column_schema.name.clone(),
323 datatype: datatype as i32,
324 semantic_type: column.semantic_type as i32,
325 datatype_extension: datatype_ext,
326 options: options_from_column_schema(&column.column_schema),
327 });
328 }
329
330 Ok(())
331 }
332
333 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
335 if self.op_type == OpType::Delete {
336 if column.semantic_type == SemanticType::Field {
337 return Ok(());
340 } else {
341 return InvalidRequestSnafu {
342 region_id: self.region_id,
343 reason: format!("delete requests need column {}", column.column_schema.name),
344 }
345 .fail();
346 }
347 }
348
349 ensure!(
351 column.column_schema.is_nullable()
352 || column.column_schema.default_constraint().is_some(),
353 InvalidRequestSnafu {
354 region_id: self.region_id,
355 reason: format!("missing column {}", column.column_schema.name),
356 }
357 );
358
359 Ok(())
360 }
361
362 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
364 let default_value = match self.op_type {
365 OpType::Delete => {
366 ensure!(
367 column.semantic_type == SemanticType::Field,
368 InvalidRequestSnafu {
369 region_id: self.region_id,
370 reason: format!(
371 "delete requests need column {}",
372 column.column_schema.name
373 ),
374 }
375 );
376
377 if column.column_schema.is_nullable() {
382 datatypes::value::Value::Null
383 } else {
384 column.column_schema.data_type.default_value()
385 }
386 }
387 OpType::Put => {
388 if column.column_schema.is_default_impure() {
390 UnexpectedSnafu {
391 reason: format!(
392 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
393 self.region_id,
394 column.column_schema.name,
395 column.column_schema.default_constraint(),
396 ),
397 }
398 .fail()?
399 }
400 column
401 .column_schema
402 .create_default()
403 .context(CreateDefaultSnafu {
404 region_id: self.region_id,
405 column: &column.column_schema.name,
406 })?
407 .with_context(|| InvalidRequestSnafu {
409 region_id: self.region_id,
410 reason: format!(
411 "column {} does not have default value",
412 column.column_schema.name
413 ),
414 })?
415 }
416 };
417
418 Ok(api::helper::to_grpc_value(default_value))
420 }
421}
422
423pub(crate) fn validate_proto_value(
425 region_id: RegionId,
426 value: &Value,
427 column_schema: &ColumnSchema,
428) -> Result<()> {
429 if let Some(value_type) = proto_value_type(value) {
430 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
431 InvalidRequestSnafu {
432 region_id,
433 reason: format!(
434 "column {} has unknown type {}",
435 column_schema.column_name, column_schema.datatype
436 ),
437 }
438 .build()
439 })?;
440 ensure!(
441 proto_value_type_match(column_type, value_type),
442 InvalidRequestSnafu {
443 region_id,
444 reason: format!(
445 "value has type {:?}, but column {} has type {:?}({})",
446 value_type, column_schema.column_name, column_type, column_schema.datatype,
447 ),
448 }
449 );
450 }
451
452 Ok(())
453}
454
455fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
456 match (column_type, value_type) {
457 (ct, vt) if ct == vt => true,
458 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
459 (ColumnDataType::Json, ColumnDataType::Binary) => true,
460 _ => false,
461 }
462}
463
464#[derive(Debug)]
466pub struct OutputTx(Sender<Result<AffectedRows>>);
467
468impl OutputTx {
469 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
471 OutputTx(sender)
472 }
473
474 pub(crate) fn send(self, result: Result<AffectedRows>) {
476 let _ = self.0.send(result);
478 }
479}
480
481#[derive(Debug)]
483pub(crate) struct OptionOutputTx(Option<OutputTx>);
484
485impl OptionOutputTx {
486 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
488 OptionOutputTx(sender)
489 }
490
491 pub(crate) fn none() -> OptionOutputTx {
493 OptionOutputTx(None)
494 }
495
496 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
498 if let Some(sender) = self.0.take() {
499 sender.send(result);
500 }
501 }
502
503 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
505 if let Some(sender) = self.0.take() {
506 sender.send(result);
507 }
508 }
509
510 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
512 self.0.take()
513 }
514}
515
516impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
517 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
518 Self::new(Some(OutputTx::new(sender)))
519 }
520}
521
522impl OnFailure for OptionOutputTx {
523 fn on_failure(&mut self, err: Error) {
524 self.send_mut(Err(err));
525 }
526}
527
528pub(crate) trait OnFailure {
530 fn on_failure(&mut self, err: Error);
532}
533
534#[derive(Debug)]
536pub(crate) struct SenderWriteRequest {
537 pub(crate) sender: OptionOutputTx,
539 pub(crate) request: WriteRequest,
540}
541
542pub(crate) struct SenderBulkRequest {
543 pub(crate) sender: OptionOutputTx,
544 pub(crate) region_id: RegionId,
545 pub(crate) request: BulkPart,
546 pub(crate) region_metadata: RegionMetadataRef,
547}
548
549#[derive(Debug)]
551pub(crate) struct WorkerRequestWithTime {
552 pub(crate) request: WorkerRequest,
553 pub(crate) created_at: Instant,
554}
555
556impl WorkerRequestWithTime {
557 pub(crate) fn new(request: WorkerRequest) -> Self {
558 Self {
559 request,
560 created_at: Instant::now(),
561 }
562 }
563}
564
565#[derive(Debug)]
567pub(crate) enum WorkerRequest {
568 Write(SenderWriteRequest),
570
571 Ddl(SenderDdlRequest),
573
574 Background {
576 region_id: RegionId,
578 notify: BackgroundNotify,
580 },
581
582 SetRegionRoleStateGracefully {
584 region_id: RegionId,
586 region_role_state: SettableRegionRoleState,
588 sender: Sender<SetRegionRoleStateResponse>,
590 },
591
592 Stop,
594
595 EditRegion(RegionEditRequest),
597
598 SyncRegion(RegionSyncRequest),
600
601 BulkInserts {
603 metadata: Option<RegionMetadataRef>,
604 request: RegionBulkInsertsRequest,
605 sender: OptionOutputTx,
606 },
607
608 RemapManifests(RemapManifestsRequest),
610
611 CopyRegionFrom(CopyRegionFromRequest),
613}
614
615impl WorkerRequest {
616 pub(crate) fn new_open_region_request(
618 region_id: RegionId,
619 request: RegionOpenRequest,
620 entry_receiver: Option<WalEntryReceiver>,
621 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
622 let (sender, receiver) = oneshot::channel();
623
624 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
625 region_id,
626 sender: sender.into(),
627 request: DdlRequest::Open((request, entry_receiver)),
628 });
629
630 (worker_request, receiver)
631 }
632
633 pub(crate) fn new_catchup_region_request(
635 region_id: RegionId,
636 request: RegionCatchupRequest,
637 entry_receiver: Option<WalEntryReceiver>,
638 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
639 let (sender, receiver) = oneshot::channel();
640 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
641 region_id,
642 sender: sender.into(),
643 request: DdlRequest::Catchup((request, entry_receiver)),
644 });
645 (worker_request, receiver)
646 }
647
648 pub(crate) fn try_from_region_request(
650 region_id: RegionId,
651 value: RegionRequest,
652 region_metadata: Option<RegionMetadataRef>,
653 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
654 let (sender, receiver) = oneshot::channel();
655 let worker_request = match value {
656 RegionRequest::Put(v) => {
657 let mut write_request =
658 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
659 .with_hint(v.hint);
660 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
661 && let Some(region_metadata) = ®ion_metadata
662 {
663 write_request.maybe_fill_missing_columns(region_metadata)?;
664 }
665 WorkerRequest::Write(SenderWriteRequest {
666 sender: sender.into(),
667 request: write_request,
668 })
669 }
670 RegionRequest::Delete(v) => {
671 let mut write_request =
672 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
673 .with_hint(v.hint);
674 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
675 && let Some(region_metadata) = ®ion_metadata
676 {
677 write_request.maybe_fill_missing_columns(region_metadata)?;
678 }
679 WorkerRequest::Write(SenderWriteRequest {
680 sender: sender.into(),
681 request: write_request,
682 })
683 }
684 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
685 region_id,
686 sender: sender.into(),
687 request: DdlRequest::Create(v),
688 }),
689 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
690 region_id,
691 sender: sender.into(),
692 request: DdlRequest::Drop,
693 }),
694 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
695 region_id,
696 sender: sender.into(),
697 request: DdlRequest::Open((v, None)),
698 }),
699 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
700 region_id,
701 sender: sender.into(),
702 request: DdlRequest::Close(v),
703 }),
704 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
705 region_id,
706 sender: sender.into(),
707 request: DdlRequest::Alter(v),
708 }),
709 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
710 region_id,
711 sender: sender.into(),
712 request: DdlRequest::Flush(v),
713 }),
714 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
715 region_id,
716 sender: sender.into(),
717 request: DdlRequest::Compact(v),
718 }),
719 RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
720 region_id,
721 sender: sender.into(),
722 request: DdlRequest::BuildIndex(v),
723 }),
724 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
725 region_id,
726 sender: sender.into(),
727 request: DdlRequest::Truncate(v),
728 }),
729 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
730 region_id,
731 sender: sender.into(),
732 request: DdlRequest::Catchup((v, None)),
733 }),
734 RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
735 region_id,
736 sender: sender.into(),
737 request: DdlRequest::EnterStaging(v),
738 }),
739 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
740 metadata: region_metadata,
741 sender: sender.into(),
742 request: region_bulk_inserts_request,
743 },
744 RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
745 region_id,
746 sender: sender.into(),
747 request: DdlRequest::ApplyStagingManifest(v),
748 }),
749 };
750
751 Ok((worker_request, receiver))
752 }
753
754 pub(crate) fn new_set_readonly_gracefully(
755 region_id: RegionId,
756 region_role_state: SettableRegionRoleState,
757 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
758 let (sender, receiver) = oneshot::channel();
759
760 (
761 WorkerRequest::SetRegionRoleStateGracefully {
762 region_id,
763 region_role_state,
764 sender,
765 },
766 receiver,
767 )
768 }
769
770 pub(crate) fn new_sync_region_request(
771 region_id: RegionId,
772 manifest_version: ManifestVersion,
773 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
774 let (sender, receiver) = oneshot::channel();
775 (
776 WorkerRequest::SyncRegion(RegionSyncRequest {
777 region_id,
778 manifest_version,
779 sender,
780 }),
781 receiver,
782 )
783 }
784
785 #[allow(clippy::type_complexity)]
792 pub(crate) fn try_from_remap_manifests_request(
793 store_api::region_engine::RemapManifestsRequest {
794 region_id,
795 input_regions,
796 region_mapping,
797 new_partition_exprs,
798 }: store_api::region_engine::RemapManifestsRequest,
799 ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
800 let (sender, receiver) = oneshot::channel();
801 let new_partition_exprs = new_partition_exprs
802 .into_iter()
803 .map(|(k, v)| {
804 Ok((
805 k,
806 PartitionExpr::from_json_str(&v)
807 .context(InvalidPartitionExprSnafu { expr: v })?
808 .context(MissingPartitionExprSnafu { region_id: k })?,
809 ))
810 })
811 .collect::<Result<HashMap<_, _>>>()?;
812
813 let request = RemapManifestsRequest {
814 region_id,
815 input_regions,
816 region_mapping,
817 new_partition_exprs,
818 sender,
819 };
820
821 Ok((WorkerRequest::RemapManifests(request), receiver))
822 }
823
824 pub(crate) fn try_from_copy_region_from_request(
826 region_id: RegionId,
827 store_api::region_engine::MitoCopyRegionFromRequest {
828 source_region_id,
829 parallelism,
830 }: store_api::region_engine::MitoCopyRegionFromRequest,
831 ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
832 let (sender, receiver) = oneshot::channel();
833 let request = CopyRegionFromRequest {
834 region_id,
835 source_region_id,
836 parallelism,
837 sender,
838 };
839 Ok((WorkerRequest::CopyRegionFrom(request), receiver))
840 }
841}
842
843#[derive(Debug)]
845pub(crate) enum DdlRequest {
846 Create(RegionCreateRequest),
847 Drop,
848 Open((RegionOpenRequest, Option<WalEntryReceiver>)),
849 Close(RegionCloseRequest),
850 Alter(RegionAlterRequest),
851 Flush(RegionFlushRequest),
852 Compact(RegionCompactRequest),
853 BuildIndex(RegionBuildIndexRequest),
854 Truncate(RegionTruncateRequest),
855 Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
856 EnterStaging(EnterStagingRequest),
857 ApplyStagingManifest(ApplyStagingManifestRequest),
858}
859
860#[derive(Debug)]
862pub(crate) struct SenderDdlRequest {
863 pub(crate) region_id: RegionId,
865 pub(crate) sender: OptionOutputTx,
867 pub(crate) request: DdlRequest,
869}
870
871#[derive(Debug)]
873pub(crate) enum BackgroundNotify {
874 FlushFinished(FlushFinished),
876 FlushFailed(FlushFailed),
878 IndexBuildFinished(IndexBuildFinished),
880 IndexBuildStopped(IndexBuildStopped),
882 IndexBuildFailed(IndexBuildFailed),
884 CompactionFinished(CompactionFinished),
886 CompactionFailed(CompactionFailed),
888 Truncate(TruncateResult),
890 RegionChange(RegionChangeResult),
892 RegionEdit(RegionEditResult),
894 EnterStaging(EnterStagingResult),
896 CopyRegionFromFinished(CopyRegionFromFinished),
898}
899
900#[derive(Debug)]
902pub(crate) struct FlushFinished {
903 pub(crate) region_id: RegionId,
905 pub(crate) flushed_entry_id: EntryId,
907 pub(crate) senders: Vec<OutputTx>,
909 pub(crate) _timer: HistogramTimer,
911 pub(crate) edit: RegionEdit,
913 pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
915 pub(crate) is_staging: bool,
917}
918
919impl FlushFinished {
920 pub(crate) fn on_success(self) {
922 for sender in self.senders {
923 sender.send(Ok(0));
924 }
925 }
926}
927
928impl OnFailure for FlushFinished {
929 fn on_failure(&mut self, err: Error) {
930 let err = Arc::new(err);
931 for sender in self.senders.drain(..) {
932 sender.send(Err(err.clone()).context(FlushRegionSnafu {
933 region_id: self.region_id,
934 }));
935 }
936 }
937}
938
939#[derive(Debug)]
941pub(crate) struct FlushFailed {
942 pub(crate) err: Arc<Error>,
944}
945
946#[derive(Debug)]
947pub(crate) struct IndexBuildFinished {
948 #[allow(dead_code)]
949 pub(crate) region_id: RegionId,
950 pub(crate) edit: RegionEdit,
951}
952
953#[derive(Debug)]
955pub(crate) struct IndexBuildStopped {
956 #[allow(dead_code)]
957 pub(crate) region_id: RegionId,
958 pub(crate) file_id: FileId,
959}
960
961#[derive(Debug)]
963pub(crate) struct IndexBuildFailed {
964 pub(crate) err: Arc<Error>,
965}
966
967#[derive(Debug)]
969pub(crate) struct CompactionFinished {
970 pub(crate) region_id: RegionId,
972 pub(crate) senders: Vec<OutputTx>,
974 pub(crate) start_time: Instant,
976 pub(crate) edit: RegionEdit,
978}
979
980impl CompactionFinished {
981 pub fn on_success(self) {
982 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
984
985 for sender in self.senders {
986 sender.send(Ok(0));
987 }
988 info!("Successfully compacted region: {}", self.region_id);
989 }
990}
991
992impl OnFailure for CompactionFinished {
993 fn on_failure(&mut self, err: Error) {
995 let err = Arc::new(err);
996 for sender in self.senders.drain(..) {
997 sender.send(Err(err.clone()).context(CompactRegionSnafu {
998 region_id: self.region_id,
999 }));
1000 }
1001 }
1002}
1003
1004#[derive(Debug)]
1006pub(crate) struct CompactionFailed {
1007 pub(crate) region_id: RegionId,
1008 pub(crate) err: Arc<Error>,
1010}
1011
1012#[derive(Debug)]
1014pub(crate) struct TruncateResult {
1015 pub(crate) region_id: RegionId,
1017 pub(crate) sender: OptionOutputTx,
1019 pub(crate) result: Result<()>,
1021 pub(crate) kind: TruncateKind,
1022}
1023
1024#[derive(Debug)]
1026pub(crate) struct RegionChangeResult {
1027 pub(crate) region_id: RegionId,
1029 pub(crate) new_meta: RegionMetadataRef,
1031 pub(crate) sender: OptionOutputTx,
1033 pub(crate) result: Result<()>,
1035 pub(crate) need_index: bool,
1037 pub(crate) new_options: Option<RegionOptions>,
1039}
1040
1041#[derive(Debug)]
1043pub(crate) struct EnterStagingResult {
1044 pub(crate) region_id: RegionId,
1046 pub(crate) partition_expr: String,
1048 pub(crate) sender: OptionOutputTx,
1050 pub(crate) result: Result<()>,
1052}
1053
1054#[derive(Debug)]
1055pub(crate) struct CopyRegionFromFinished {
1056 pub(crate) region_id: RegionId,
1058 pub(crate) edit: RegionEdit,
1060 pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1062}
1063
1064#[derive(Debug)]
1066pub(crate) struct RegionEditRequest {
1067 pub(crate) region_id: RegionId,
1068 pub(crate) edit: RegionEdit,
1069 pub(crate) tx: Sender<Result<()>>,
1071}
1072
1073#[derive(Debug)]
1075pub(crate) struct RegionEditResult {
1076 pub(crate) region_id: RegionId,
1078 pub(crate) sender: Sender<Result<()>>,
1080 pub(crate) edit: RegionEdit,
1082 pub(crate) result: Result<()>,
1084 pub(crate) update_region_state: bool,
1086 pub(crate) is_staging: bool,
1088}
1089
1090#[derive(Debug)]
1091pub(crate) struct BuildIndexRequest {
1092 pub(crate) region_id: RegionId,
1093 pub(crate) build_type: IndexBuildType,
1094 pub(crate) file_metas: Vec<FileMeta>,
1096}
1097
1098#[derive(Debug)]
1099pub(crate) struct RegionSyncRequest {
1100 pub(crate) region_id: RegionId,
1101 pub(crate) manifest_version: ManifestVersion,
1102 pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
1104}
1105
1106#[derive(Debug)]
1107pub(crate) struct RemapManifestsRequest {
1108 pub(crate) region_id: RegionId,
1110 pub(crate) input_regions: Vec<RegionId>,
1112 pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
1114 pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
1116 pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
1120}
1121
1122#[derive(Debug)]
1123pub(crate) struct CopyRegionFromRequest {
1124 pub(crate) region_id: RegionId,
1126 pub(crate) source_region_id: RegionId,
1128 pub(crate) parallelism: usize,
1130 pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1132}
1133
1134#[cfg(test)]
1135mod tests {
1136 use api::v1::value::ValueData;
1137 use api::v1::{Row, SemanticType};
1138 use datatypes::prelude::ConcreteDataType;
1139 use datatypes::schema::ColumnDefaultConstraint;
1140 use mito_codec::test_util::i64_value;
1141 use store_api::metadata::RegionMetadataBuilder;
1142
1143 use super::*;
1144 use crate::error::Error;
1145 use crate::test_util::ts_ms_value;
1146
1147 fn new_column_schema(
1148 name: &str,
1149 data_type: ColumnDataType,
1150 semantic_type: SemanticType,
1151 ) -> ColumnSchema {
1152 ColumnSchema {
1153 column_name: name.to_string(),
1154 datatype: data_type as i32,
1155 semantic_type: semantic_type as i32,
1156 ..Default::default()
1157 }
1158 }
1159
1160 fn check_invalid_request(err: &Error, expect: &str) {
1161 if let Error::InvalidRequest {
1162 region_id: _,
1163 reason,
1164 location: _,
1165 } = err
1166 {
1167 assert_eq!(reason, expect);
1168 } else {
1169 panic!("Unexpected error {err}")
1170 }
1171 }
1172
1173 #[test]
1174 fn test_write_request_duplicate_column() {
1175 let rows = Rows {
1176 schema: vec![
1177 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1178 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1179 ],
1180 rows: vec![],
1181 };
1182
1183 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1184 check_invalid_request(&err, "duplicate column c0");
1185 }
1186
1187 #[test]
1188 fn test_valid_write_request() {
1189 let rows = Rows {
1190 schema: vec![
1191 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1192 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1193 ],
1194 rows: vec![Row {
1195 values: vec![i64_value(1), i64_value(2)],
1196 }],
1197 };
1198
1199 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1200 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1201 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1202 assert_eq!(None, request.column_index_by_name("c2"));
1203 }
1204
1205 #[test]
1206 fn test_write_request_column_num() {
1207 let rows = Rows {
1208 schema: vec![
1209 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1210 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1211 ],
1212 rows: vec![Row {
1213 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1214 }],
1215 };
1216
1217 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1218 check_invalid_request(&err, "row has 3 columns but schema has 2");
1219 }
1220
1221 fn new_region_metadata() -> RegionMetadata {
1222 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1223 builder
1224 .push_column_metadata(ColumnMetadata {
1225 column_schema: datatypes::schema::ColumnSchema::new(
1226 "ts",
1227 ConcreteDataType::timestamp_millisecond_datatype(),
1228 false,
1229 ),
1230 semantic_type: SemanticType::Timestamp,
1231 column_id: 1,
1232 })
1233 .push_column_metadata(ColumnMetadata {
1234 column_schema: datatypes::schema::ColumnSchema::new(
1235 "k0",
1236 ConcreteDataType::int64_datatype(),
1237 true,
1238 ),
1239 semantic_type: SemanticType::Tag,
1240 column_id: 2,
1241 })
1242 .primary_key(vec![2]);
1243 builder.build().unwrap()
1244 }
1245
1246 #[test]
1247 fn test_check_schema() {
1248 let rows = Rows {
1249 schema: vec![
1250 new_column_schema(
1251 "ts",
1252 ColumnDataType::TimestampMillisecond,
1253 SemanticType::Timestamp,
1254 ),
1255 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1256 ],
1257 rows: vec![Row {
1258 values: vec![ts_ms_value(1), i64_value(2)],
1259 }],
1260 };
1261 let metadata = new_region_metadata();
1262
1263 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1264 request.check_schema(&metadata).unwrap();
1265 }
1266
1267 #[test]
1268 fn test_column_type() {
1269 let rows = Rows {
1270 schema: vec![
1271 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1272 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1273 ],
1274 rows: vec![Row {
1275 values: vec![i64_value(1), i64_value(2)],
1276 }],
1277 };
1278 let metadata = new_region_metadata();
1279
1280 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1281 let err = request.check_schema(&metadata).unwrap_err();
1282 check_invalid_request(
1283 &err,
1284 "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1285 );
1286 }
1287
1288 #[test]
1289 fn test_semantic_type() {
1290 let rows = Rows {
1291 schema: vec![
1292 new_column_schema(
1293 "ts",
1294 ColumnDataType::TimestampMillisecond,
1295 SemanticType::Tag,
1296 ),
1297 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1298 ],
1299 rows: vec![Row {
1300 values: vec![ts_ms_value(1), i64_value(2)],
1301 }],
1302 };
1303 let metadata = new_region_metadata();
1304
1305 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1306 let err = request.check_schema(&metadata).unwrap_err();
1307 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1308 }
1309
1310 #[test]
1311 fn test_column_nullable() {
1312 let rows = Rows {
1313 schema: vec![
1314 new_column_schema(
1315 "ts",
1316 ColumnDataType::TimestampMillisecond,
1317 SemanticType::Timestamp,
1318 ),
1319 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1320 ],
1321 rows: vec![Row {
1322 values: vec![Value { value_data: None }, i64_value(2)],
1323 }],
1324 };
1325 let metadata = new_region_metadata();
1326
1327 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1328 let err = request.check_schema(&metadata).unwrap_err();
1329 check_invalid_request(&err, "column ts is not null but input has null");
1330 }
1331
1332 #[test]
1333 fn test_column_default() {
1334 let rows = Rows {
1335 schema: vec![new_column_schema(
1336 "k0",
1337 ColumnDataType::Int64,
1338 SemanticType::Tag,
1339 )],
1340 rows: vec![Row {
1341 values: vec![i64_value(1)],
1342 }],
1343 };
1344 let metadata = new_region_metadata();
1345
1346 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1347 let err = request.check_schema(&metadata).unwrap_err();
1348 check_invalid_request(&err, "missing column ts");
1349 }
1350
1351 #[test]
1352 fn test_unknown_column() {
1353 let rows = Rows {
1354 schema: vec![
1355 new_column_schema(
1356 "ts",
1357 ColumnDataType::TimestampMillisecond,
1358 SemanticType::Timestamp,
1359 ),
1360 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1361 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1362 ],
1363 rows: vec![Row {
1364 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1365 }],
1366 };
1367 let metadata = new_region_metadata();
1368
1369 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1370 let err = request.check_schema(&metadata).unwrap_err();
1371 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1372 }
1373
1374 #[test]
1375 fn test_fill_impure_columns_err() {
1376 let rows = Rows {
1377 schema: vec![new_column_schema(
1378 "k0",
1379 ColumnDataType::Int64,
1380 SemanticType::Tag,
1381 )],
1382 rows: vec![Row {
1383 values: vec![i64_value(1)],
1384 }],
1385 };
1386 let metadata = {
1387 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1388 builder
1389 .push_column_metadata(ColumnMetadata {
1390 column_schema: datatypes::schema::ColumnSchema::new(
1391 "ts",
1392 ConcreteDataType::timestamp_millisecond_datatype(),
1393 false,
1394 )
1395 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1396 "now()".to_string(),
1397 )))
1398 .unwrap(),
1399 semantic_type: SemanticType::Timestamp,
1400 column_id: 1,
1401 })
1402 .push_column_metadata(ColumnMetadata {
1403 column_schema: datatypes::schema::ColumnSchema::new(
1404 "k0",
1405 ConcreteDataType::int64_datatype(),
1406 true,
1407 ),
1408 semantic_type: SemanticType::Tag,
1409 column_id: 2,
1410 })
1411 .primary_key(vec![2]);
1412 builder.build().unwrap()
1413 };
1414
1415 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1416 let err = request.check_schema(&metadata).unwrap_err();
1417 assert!(err.is_fill_default());
1418 assert!(
1419 request
1420 .fill_missing_columns(&metadata)
1421 .unwrap_err()
1422 .to_string()
1423 .contains("unexpected impure default value with region_id")
1424 );
1425 }
1426
1427 #[test]
1428 fn test_fill_missing_columns() {
1429 let rows = Rows {
1430 schema: vec![new_column_schema(
1431 "ts",
1432 ColumnDataType::TimestampMillisecond,
1433 SemanticType::Timestamp,
1434 )],
1435 rows: vec![Row {
1436 values: vec![ts_ms_value(1)],
1437 }],
1438 };
1439 let metadata = new_region_metadata();
1440
1441 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1442 let err = request.check_schema(&metadata).unwrap_err();
1443 assert!(err.is_fill_default());
1444 request.fill_missing_columns(&metadata).unwrap();
1445
1446 let expect_rows = Rows {
1447 schema: vec![new_column_schema(
1448 "ts",
1449 ColumnDataType::TimestampMillisecond,
1450 SemanticType::Timestamp,
1451 )],
1452 rows: vec![Row {
1453 values: vec![ts_ms_value(1)],
1454 }],
1455 };
1456 assert_eq!(expect_rows, request.rows);
1457 }
1458
1459 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1460 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1461 builder
1462 .push_column_metadata(ColumnMetadata {
1463 column_schema: datatypes::schema::ColumnSchema::new(
1464 "ts",
1465 ConcreteDataType::timestamp_millisecond_datatype(),
1466 false,
1467 ),
1468 semantic_type: SemanticType::Timestamp,
1469 column_id: 1,
1470 })
1471 .push_column_metadata(ColumnMetadata {
1472 column_schema: datatypes::schema::ColumnSchema::new(
1473 "k0",
1474 ConcreteDataType::int64_datatype(),
1475 true,
1476 ),
1477 semantic_type: SemanticType::Tag,
1478 column_id: 2,
1479 })
1480 .primary_key(vec![2]);
1481 builder
1482 }
1483
1484 fn region_metadata_two_fields() -> RegionMetadata {
1485 let mut builder = builder_with_ts_tag();
1486 builder
1487 .push_column_metadata(ColumnMetadata {
1488 column_schema: datatypes::schema::ColumnSchema::new(
1489 "f0",
1490 ConcreteDataType::int64_datatype(),
1491 true,
1492 ),
1493 semantic_type: SemanticType::Field,
1494 column_id: 3,
1495 })
1496 .push_column_metadata(ColumnMetadata {
1498 column_schema: datatypes::schema::ColumnSchema::new(
1499 "f1",
1500 ConcreteDataType::int64_datatype(),
1501 false,
1502 )
1503 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1504 datatypes::value::Value::Int64(100),
1505 )))
1506 .unwrap(),
1507 semantic_type: SemanticType::Field,
1508 column_id: 4,
1509 });
1510 builder.build().unwrap()
1511 }
1512
1513 #[test]
1514 fn test_fill_missing_for_delete() {
1515 let rows = Rows {
1516 schema: vec![new_column_schema(
1517 "ts",
1518 ColumnDataType::TimestampMillisecond,
1519 SemanticType::Timestamp,
1520 )],
1521 rows: vec![Row {
1522 values: vec![ts_ms_value(1)],
1523 }],
1524 };
1525 let metadata = region_metadata_two_fields();
1526
1527 let mut request =
1528 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1529 let err = request.check_schema(&metadata).unwrap_err();
1530 check_invalid_request(&err, "delete requests need column k0");
1531 let err = request.fill_missing_columns(&metadata).unwrap_err();
1532 check_invalid_request(&err, "delete requests need column k0");
1533
1534 let rows = Rows {
1535 schema: vec![
1536 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1537 new_column_schema(
1538 "ts",
1539 ColumnDataType::TimestampMillisecond,
1540 SemanticType::Timestamp,
1541 ),
1542 ],
1543 rows: vec![Row {
1544 values: vec![i64_value(100), ts_ms_value(1)],
1545 }],
1546 };
1547 let mut request =
1548 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1549 let err = request.check_schema(&metadata).unwrap_err();
1550 assert!(err.is_fill_default());
1551 request.fill_missing_columns(&metadata).unwrap();
1552
1553 let expect_rows = Rows {
1554 schema: vec![
1555 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1556 new_column_schema(
1557 "ts",
1558 ColumnDataType::TimestampMillisecond,
1559 SemanticType::Timestamp,
1560 ),
1561 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1562 ],
1563 rows: vec![Row {
1565 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1566 }],
1567 };
1568 assert_eq!(expect_rows, request.rows);
1569 }
1570
1571 #[test]
1572 fn test_fill_missing_without_default_in_delete() {
1573 let mut builder = builder_with_ts_tag();
1574 builder
1575 .push_column_metadata(ColumnMetadata {
1577 column_schema: datatypes::schema::ColumnSchema::new(
1578 "f0",
1579 ConcreteDataType::int64_datatype(),
1580 true,
1581 ),
1582 semantic_type: SemanticType::Field,
1583 column_id: 3,
1584 })
1585 .push_column_metadata(ColumnMetadata {
1587 column_schema: datatypes::schema::ColumnSchema::new(
1588 "f1",
1589 ConcreteDataType::int64_datatype(),
1590 false,
1591 ),
1592 semantic_type: SemanticType::Field,
1593 column_id: 4,
1594 });
1595 let metadata = builder.build().unwrap();
1596
1597 let rows = Rows {
1598 schema: vec![
1599 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1600 new_column_schema(
1601 "ts",
1602 ColumnDataType::TimestampMillisecond,
1603 SemanticType::Timestamp,
1604 ),
1605 ],
1606 rows: vec![Row {
1608 values: vec![i64_value(100), ts_ms_value(1)],
1609 }],
1610 };
1611 let mut request =
1612 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1613 let err = request.check_schema(&metadata).unwrap_err();
1614 assert!(err.is_fill_default());
1615 request.fill_missing_columns(&metadata).unwrap();
1616
1617 let expect_rows = Rows {
1618 schema: vec![
1619 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1620 new_column_schema(
1621 "ts",
1622 ColumnDataType::TimestampMillisecond,
1623 SemanticType::Timestamp,
1624 ),
1625 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1626 ],
1627 rows: vec![Row {
1629 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1630 }],
1631 };
1632 assert_eq!(expect_rows, request.rows);
1633 }
1634
1635 #[test]
1636 fn test_no_default() {
1637 let rows = Rows {
1638 schema: vec![new_column_schema(
1639 "k0",
1640 ColumnDataType::Int64,
1641 SemanticType::Tag,
1642 )],
1643 rows: vec![Row {
1644 values: vec![i64_value(1)],
1645 }],
1646 };
1647 let metadata = new_region_metadata();
1648
1649 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1650 let err = request.fill_missing_columns(&metadata).unwrap_err();
1651 check_invalid_request(&err, "column ts does not have default value");
1652 }
1653
1654 #[test]
1655 fn test_missing_and_invalid() {
1656 let rows = Rows {
1658 schema: vec![
1659 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1660 new_column_schema(
1661 "ts",
1662 ColumnDataType::TimestampMillisecond,
1663 SemanticType::Timestamp,
1664 ),
1665 new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1666 ],
1667 rows: vec![Row {
1668 values: vec![
1669 i64_value(100),
1670 ts_ms_value(1),
1671 Value {
1672 value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1673 },
1674 ],
1675 }],
1676 };
1677 let metadata = region_metadata_two_fields();
1678
1679 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1680 let err = request.check_schema(&metadata).unwrap_err();
1681 check_invalid_request(
1682 &err,
1683 "column f1 expect type Int64(Int64Type), given: STRING(12)",
1684 );
1685 }
1686
1687 #[test]
1688 fn test_write_request_metadata() {
1689 let rows = Rows {
1690 schema: vec![
1691 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1692 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1693 ],
1694 rows: vec![Row {
1695 values: vec![i64_value(1), i64_value(2)],
1696 }],
1697 };
1698
1699 let metadata = Arc::new(new_region_metadata());
1700 let request = WriteRequest::new(
1701 RegionId::new(1, 1),
1702 OpType::Put,
1703 rows,
1704 Some(metadata.clone()),
1705 )
1706 .unwrap();
1707
1708 assert!(request.region_metadata.is_some());
1709 assert_eq!(
1710 request.region_metadata.unwrap().region_id,
1711 RegionId::new(1, 1)
1712 );
1713 }
1714}