mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37    MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40    AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41    RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42    RegionCompactRequest, RegionCreateRequest, RegionFlushRequest, RegionOpenRequest,
43    RegionRequest, RegionTruncateRequest,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
50    FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
51    Result, UnexpectedSnafu,
52};
53use crate::manifest::action::{RegionEdit, TruncateKind};
54use crate::memtable::MemtableId;
55use crate::memtable::bulk::part::BulkPart;
56use crate::metrics::COMPACTION_ELAPSED_TOTAL;
57use crate::region::options::RegionOptions;
58use crate::sst::file::FileMeta;
59use crate::sst::index::IndexBuildType;
60use crate::wal::EntryId;
61use crate::wal::entry_distributor::WalEntryReceiver;
62
63/// Request to write a region.
64#[derive(Debug)]
65pub struct WriteRequest {
66    /// Region to write.
67    pub region_id: RegionId,
68    /// Type of the write request.
69    pub op_type: OpType,
70    /// Rows to write.
71    pub rows: Rows,
72    /// Map column name to column index in `rows`.
73    pub name_to_index: HashMap<String, usize>,
74    /// Whether each column has null.
75    pub has_null: Vec<bool>,
76    /// Write hint.
77    pub hint: Option<WriteHint>,
78    /// Region metadata on the time of this request is created.
79    pub(crate) region_metadata: Option<RegionMetadataRef>,
80}
81
82impl WriteRequest {
83    /// Creates a new request.
84    ///
85    /// Returns `Err` if `rows` are invalid.
86    pub fn new(
87        region_id: RegionId,
88        op_type: OpType,
89        rows: Rows,
90        region_metadata: Option<RegionMetadataRef>,
91    ) -> Result<WriteRequest> {
92        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
93        for (index, column) in rows.schema.iter().enumerate() {
94            ensure!(
95                name_to_index
96                    .insert(column.column_name.clone(), index)
97                    .is_none(),
98                InvalidRequestSnafu {
99                    region_id,
100                    reason: format!("duplicate column {}", column.column_name),
101                }
102            );
103        }
104
105        let mut has_null = vec![false; rows.schema.len()];
106        for row in &rows.rows {
107            ensure!(
108                row.values.len() == rows.schema.len(),
109                InvalidRequestSnafu {
110                    region_id,
111                    reason: format!(
112                        "row has {} columns but schema has {}",
113                        row.values.len(),
114                        rows.schema.len()
115                    ),
116                }
117            );
118
119            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
120                validate_proto_value(region_id, value, column_schema)?;
121
122                if value.value_data.is_none() {
123                    has_null[i] = true;
124                }
125            }
126        }
127
128        Ok(WriteRequest {
129            region_id,
130            op_type,
131            rows,
132            name_to_index,
133            has_null,
134            hint: None,
135            region_metadata,
136        })
137    }
138
139    /// Sets the write hint.
140    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
141        self.hint = hint;
142        self
143    }
144
145    /// Returns the encoding hint.
146    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
147        infer_primary_key_encoding_from_hint(self.hint.as_ref())
148    }
149
150    /// Returns estimated size of the request.
151    pub(crate) fn estimated_size(&self) -> usize {
152        let row_size = self
153            .rows
154            .rows
155            .first()
156            .map(|row| row.encoded_len())
157            .unwrap_or(0);
158        row_size * self.rows.rows.len()
159    }
160
161    /// Gets column index by name.
162    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
163        self.name_to_index.get(name).copied()
164    }
165
166    /// Checks schema of rows is compatible with schema of the region.
167    ///
168    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
169    /// error.
170    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
171        debug_assert_eq!(self.region_id, metadata.region_id);
172
173        let region_id = self.region_id;
174        // Index all columns in rows.
175        let mut rows_columns: HashMap<_, _> = self
176            .rows
177            .schema
178            .iter()
179            .map(|column| (&column.column_name, column))
180            .collect();
181
182        let mut need_fill_default = false;
183        // Checks all columns in this region.
184        for column in &metadata.column_metadatas {
185            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
186                // Check data type.
187                ensure!(
188                    is_column_type_value_eq(
189                        input_col.datatype,
190                        input_col.datatype_extension.clone(),
191                        &column.column_schema.data_type
192                    ),
193                    InvalidRequestSnafu {
194                        region_id,
195                        reason: format!(
196                            "column {} expect type {:?}, given: {}({})",
197                            column.column_schema.name,
198                            column.column_schema.data_type,
199                            ColumnDataType::try_from(input_col.datatype)
200                                .map(|v| v.as_str_name())
201                                .unwrap_or("Unknown"),
202                            input_col.datatype,
203                        )
204                    }
205                );
206
207                // Check semantic type.
208                ensure!(
209                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
210                    InvalidRequestSnafu {
211                        region_id,
212                        reason: format!(
213                            "column {} has semantic type {:?}, given: {}({})",
214                            column.column_schema.name,
215                            column.semantic_type,
216                            api::v1::SemanticType::try_from(input_col.semantic_type)
217                                .map(|v| v.as_str_name())
218                                .unwrap_or("Unknown"),
219                            input_col.semantic_type
220                        ),
221                    }
222                );
223
224                // Check nullable.
225                // Safety: `rows_columns` ensures this column exists.
226                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
227                ensure!(
228                    !has_null || column.column_schema.is_nullable(),
229                    InvalidRequestSnafu {
230                        region_id,
231                        reason: format!(
232                            "column {} is not null but input has null",
233                            column.column_schema.name
234                        ),
235                    }
236                );
237            } else {
238                // Rows don't have this column.
239                self.check_missing_column(column)?;
240
241                need_fill_default = true;
242            }
243        }
244
245        // Checks all columns in rows exist in the region.
246        if !rows_columns.is_empty() {
247            let names: Vec<_> = rows_columns.into_keys().collect();
248            return InvalidRequestSnafu {
249                region_id,
250                reason: format!("unknown columns: {:?}", names),
251            }
252            .fail();
253        }
254
255        // If we need to fill default values, return a special error.
256        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
257
258        Ok(())
259    }
260
261    /// Tries to fill missing columns.
262    ///
263    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
264    /// values.
265    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
266        debug_assert_eq!(self.region_id, metadata.region_id);
267
268        let mut columns_to_fill = vec![];
269        for column in &metadata.column_metadatas {
270            if !self.name_to_index.contains_key(&column.column_schema.name) {
271                columns_to_fill.push(column);
272            }
273        }
274        self.fill_columns(columns_to_fill)?;
275
276        Ok(())
277    }
278
279    /// Checks the schema and fill missing columns.
280    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
281        if let Err(e) = self.check_schema(metadata) {
282            if e.is_fill_default() {
283                // TODO(yingwen): Add metrics for this case.
284                // We need to fill default value. The write request may be a request
285                // sent before changing the schema.
286                self.fill_missing_columns(metadata)?;
287            } else {
288                return Err(e);
289            }
290        }
291
292        Ok(())
293    }
294
295    /// Fills default value for specific `columns`.
296    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
297        let mut default_values = Vec::with_capacity(columns.len());
298        let mut columns_to_fill = Vec::with_capacity(columns.len());
299        for column in columns {
300            let default_value = self.column_default_value(column)?;
301            if default_value.value_data.is_some() {
302                default_values.push(default_value);
303                columns_to_fill.push(column);
304            }
305        }
306
307        for row in &mut self.rows.rows {
308            row.values.extend(default_values.iter().cloned());
309        }
310
311        for column in columns_to_fill {
312            let (datatype, datatype_ext) =
313                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
314                    .with_context(|_| ConvertColumnDataTypeSnafu {
315                        reason: format!(
316                            "no protobuf type for column {} ({:?})",
317                            column.column_schema.name, column.column_schema.data_type
318                        ),
319                    })?
320                    .to_parts();
321            self.rows.schema.push(ColumnSchema {
322                column_name: column.column_schema.name.clone(),
323                datatype: datatype as i32,
324                semantic_type: column.semantic_type as i32,
325                datatype_extension: datatype_ext,
326                options: options_from_column_schema(&column.column_schema),
327            });
328        }
329
330        Ok(())
331    }
332
333    /// Checks whether we should allow a row doesn't provide this column.
334    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
335        if self.op_type == OpType::Delete {
336            if column.semantic_type == SemanticType::Field {
337                // For delete request, all tags and timestamp is required. We don't fill default
338                // tag or timestamp while deleting rows.
339                return Ok(());
340            } else {
341                return InvalidRequestSnafu {
342                    region_id: self.region_id,
343                    reason: format!("delete requests need column {}", column.column_schema.name),
344                }
345                .fail();
346            }
347        }
348
349        // Not a delete request. Checks whether they have default value.
350        ensure!(
351            column.column_schema.is_nullable()
352                || column.column_schema.default_constraint().is_some(),
353            InvalidRequestSnafu {
354                region_id: self.region_id,
355                reason: format!("missing column {}", column.column_schema.name),
356            }
357        );
358
359        Ok(())
360    }
361
362    /// Returns the default value for specific column.
363    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
364        let default_value = match self.op_type {
365            OpType::Delete => {
366                ensure!(
367                    column.semantic_type == SemanticType::Field,
368                    InvalidRequestSnafu {
369                        region_id: self.region_id,
370                        reason: format!(
371                            "delete requests need column {}",
372                            column.column_schema.name
373                        ),
374                    }
375                );
376
377                // For delete request, we need a default value for padding so we
378                // can delete a row even a field doesn't have a default value. So the
379                // value doesn't need to following the default value constraint of the
380                // column.
381                if column.column_schema.is_nullable() {
382                    datatypes::value::Value::Null
383                } else {
384                    column.column_schema.data_type.default_value()
385                }
386            }
387            OpType::Put => {
388                // For put requests, we use the default value from column schema.
389                if column.column_schema.is_default_impure() {
390                    UnexpectedSnafu {
391                        reason: format!(
392                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
393                            self.region_id,
394                            column.column_schema.name,
395                            column.column_schema.default_constraint(),
396                        ),
397                    }
398                    .fail()?
399                }
400                column
401                    .column_schema
402                    .create_default()
403                    .context(CreateDefaultSnafu {
404                        region_id: self.region_id,
405                        column: &column.column_schema.name,
406                    })?
407                    // This column doesn't have default value.
408                    .with_context(|| InvalidRequestSnafu {
409                        region_id: self.region_id,
410                        reason: format!(
411                            "column {} does not have default value",
412                            column.column_schema.name
413                        ),
414                    })?
415            }
416        };
417
418        // Convert default value into proto's value.
419        Ok(api::helper::to_grpc_value(default_value))
420    }
421}
422
423/// Validate proto value schema.
424pub(crate) fn validate_proto_value(
425    region_id: RegionId,
426    value: &Value,
427    column_schema: &ColumnSchema,
428) -> Result<()> {
429    if let Some(value_type) = proto_value_type(value) {
430        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
431            InvalidRequestSnafu {
432                region_id,
433                reason: format!(
434                    "column {} has unknown type {}",
435                    column_schema.column_name, column_schema.datatype
436                ),
437            }
438            .build()
439        })?;
440        ensure!(
441            proto_value_type_match(column_type, value_type),
442            InvalidRequestSnafu {
443                region_id,
444                reason: format!(
445                    "value has type {:?}, but column {} has type {:?}({})",
446                    value_type, column_schema.column_name, column_type, column_schema.datatype,
447                ),
448            }
449        );
450    }
451
452    Ok(())
453}
454
455fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
456    match (column_type, value_type) {
457        (ct, vt) if ct == vt => true,
458        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
459        (ColumnDataType::Json, ColumnDataType::Binary) => true,
460        _ => false,
461    }
462}
463
464/// Oneshot output result sender.
465#[derive(Debug)]
466pub struct OutputTx(Sender<Result<AffectedRows>>);
467
468impl OutputTx {
469    /// Creates a new output sender.
470    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
471        OutputTx(sender)
472    }
473
474    /// Sends the `result`.
475    pub(crate) fn send(self, result: Result<AffectedRows>) {
476        // Ignores send result.
477        let _ = self.0.send(result);
478    }
479}
480
481/// Optional output result sender.
482#[derive(Debug)]
483pub(crate) struct OptionOutputTx(Option<OutputTx>);
484
485impl OptionOutputTx {
486    /// Creates a sender.
487    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
488        OptionOutputTx(sender)
489    }
490
491    /// Creates an empty sender.
492    pub(crate) fn none() -> OptionOutputTx {
493        OptionOutputTx(None)
494    }
495
496    /// Sends the `result` and consumes the inner sender.
497    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
498        if let Some(sender) = self.0.take() {
499            sender.send(result);
500        }
501    }
502
503    /// Sends the `result` and consumes the sender.
504    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
505        if let Some(sender) = self.0.take() {
506            sender.send(result);
507        }
508    }
509
510    /// Takes the inner sender.
511    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
512        self.0.take()
513    }
514}
515
516impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
517    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
518        Self::new(Some(OutputTx::new(sender)))
519    }
520}
521
522impl OnFailure for OptionOutputTx {
523    fn on_failure(&mut self, err: Error) {
524        self.send_mut(Err(err));
525    }
526}
527
528/// Callback on failure.
529pub(crate) trait OnFailure {
530    /// Handles `err` on failure.
531    fn on_failure(&mut self, err: Error);
532}
533
534/// Sender and write request.
535#[derive(Debug)]
536pub(crate) struct SenderWriteRequest {
537    /// Result sender.
538    pub(crate) sender: OptionOutputTx,
539    pub(crate) request: WriteRequest,
540}
541
542pub(crate) struct SenderBulkRequest {
543    pub(crate) sender: OptionOutputTx,
544    pub(crate) region_id: RegionId,
545    pub(crate) request: BulkPart,
546    pub(crate) region_metadata: RegionMetadataRef,
547}
548
549/// Request sent to a worker with timestamp
550#[derive(Debug)]
551pub(crate) struct WorkerRequestWithTime {
552    pub(crate) request: WorkerRequest,
553    pub(crate) created_at: Instant,
554}
555
556impl WorkerRequestWithTime {
557    pub(crate) fn new(request: WorkerRequest) -> Self {
558        Self {
559            request,
560            created_at: Instant::now(),
561        }
562    }
563}
564
565/// Request sent to a worker
566#[derive(Debug)]
567pub(crate) enum WorkerRequest {
568    /// Write to a region.
569    Write(SenderWriteRequest),
570
571    /// Ddl request to a region.
572    Ddl(SenderDdlRequest),
573
574    /// Notifications from internal background jobs.
575    Background {
576        /// Id of the region to send.
577        region_id: RegionId,
578        /// Internal notification.
579        notify: BackgroundNotify,
580    },
581
582    /// The internal commands.
583    SetRegionRoleStateGracefully {
584        /// Id of the region to send.
585        region_id: RegionId,
586        /// The [SettableRegionRoleState].
587        region_role_state: SettableRegionRoleState,
588        /// The sender of [SetReadonlyResponse].
589        sender: Sender<SetRegionRoleStateResponse>,
590    },
591
592    /// Notify a worker to stop.
593    Stop,
594
595    /// Use [RegionEdit] to edit a region directly.
596    EditRegion(RegionEditRequest),
597
598    /// Keep the manifest of a region up to date.
599    SyncRegion(RegionSyncRequest),
600
601    /// Bulk inserts request and region metadata.
602    BulkInserts {
603        metadata: Option<RegionMetadataRef>,
604        request: RegionBulkInsertsRequest,
605        sender: OptionOutputTx,
606    },
607
608    /// Remap manifests request.
609    RemapManifests(RemapManifestsRequest),
610
611    /// Copy region from request.
612    CopyRegionFrom(CopyRegionFromRequest),
613}
614
615impl WorkerRequest {
616    /// Creates a new open region request.
617    pub(crate) fn new_open_region_request(
618        region_id: RegionId,
619        request: RegionOpenRequest,
620        entry_receiver: Option<WalEntryReceiver>,
621    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
622        let (sender, receiver) = oneshot::channel();
623
624        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
625            region_id,
626            sender: sender.into(),
627            request: DdlRequest::Open((request, entry_receiver)),
628        });
629
630        (worker_request, receiver)
631    }
632
633    /// Creates a new catchup region request.
634    pub(crate) fn new_catchup_region_request(
635        region_id: RegionId,
636        request: RegionCatchupRequest,
637        entry_receiver: Option<WalEntryReceiver>,
638    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
639        let (sender, receiver) = oneshot::channel();
640        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
641            region_id,
642            sender: sender.into(),
643            request: DdlRequest::Catchup((request, entry_receiver)),
644        });
645        (worker_request, receiver)
646    }
647
648    /// Converts request from a [RegionRequest].
649    pub(crate) fn try_from_region_request(
650        region_id: RegionId,
651        value: RegionRequest,
652        region_metadata: Option<RegionMetadataRef>,
653    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
654        let (sender, receiver) = oneshot::channel();
655        let worker_request = match value {
656            RegionRequest::Put(v) => {
657                let mut write_request =
658                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
659                        .with_hint(v.hint);
660                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
661                    && let Some(region_metadata) = &region_metadata
662                {
663                    write_request.maybe_fill_missing_columns(region_metadata)?;
664                }
665                WorkerRequest::Write(SenderWriteRequest {
666                    sender: sender.into(),
667                    request: write_request,
668                })
669            }
670            RegionRequest::Delete(v) => {
671                let mut write_request =
672                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
673                        .with_hint(v.hint);
674                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
675                    && let Some(region_metadata) = &region_metadata
676                {
677                    write_request.maybe_fill_missing_columns(region_metadata)?;
678                }
679                WorkerRequest::Write(SenderWriteRequest {
680                    sender: sender.into(),
681                    request: write_request,
682                })
683            }
684            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
685                region_id,
686                sender: sender.into(),
687                request: DdlRequest::Create(v),
688            }),
689            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
690                region_id,
691                sender: sender.into(),
692                request: DdlRequest::Drop,
693            }),
694            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
695                region_id,
696                sender: sender.into(),
697                request: DdlRequest::Open((v, None)),
698            }),
699            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
700                region_id,
701                sender: sender.into(),
702                request: DdlRequest::Close(v),
703            }),
704            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
705                region_id,
706                sender: sender.into(),
707                request: DdlRequest::Alter(v),
708            }),
709            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
710                region_id,
711                sender: sender.into(),
712                request: DdlRequest::Flush(v),
713            }),
714            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
715                region_id,
716                sender: sender.into(),
717                request: DdlRequest::Compact(v),
718            }),
719            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
720                region_id,
721                sender: sender.into(),
722                request: DdlRequest::BuildIndex(v),
723            }),
724            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
725                region_id,
726                sender: sender.into(),
727                request: DdlRequest::Truncate(v),
728            }),
729            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
730                region_id,
731                sender: sender.into(),
732                request: DdlRequest::Catchup((v, None)),
733            }),
734            RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
735                region_id,
736                sender: sender.into(),
737                request: DdlRequest::EnterStaging(v),
738            }),
739            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
740                metadata: region_metadata,
741                sender: sender.into(),
742                request: region_bulk_inserts_request,
743            },
744            RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
745                region_id,
746                sender: sender.into(),
747                request: DdlRequest::ApplyStagingManifest(v),
748            }),
749        };
750
751        Ok((worker_request, receiver))
752    }
753
754    pub(crate) fn new_set_readonly_gracefully(
755        region_id: RegionId,
756        region_role_state: SettableRegionRoleState,
757    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
758        let (sender, receiver) = oneshot::channel();
759
760        (
761            WorkerRequest::SetRegionRoleStateGracefully {
762                region_id,
763                region_role_state,
764                sender,
765            },
766            receiver,
767        )
768    }
769
770    pub(crate) fn new_sync_region_request(
771        region_id: RegionId,
772        manifest_version: ManifestVersion,
773    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
774        let (sender, receiver) = oneshot::channel();
775        (
776            WorkerRequest::SyncRegion(RegionSyncRequest {
777                region_id,
778                manifest_version,
779                sender,
780            }),
781            receiver,
782        )
783    }
784
785    /// Converts [RemapManifestsRequest] from a [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest).
786    ///
787    /// # Errors
788    ///
789    /// Returns an error if the partition expression is invalid or missing.
790    /// Returns an error if the new partition expressions are not found for some regions.
791    #[allow(clippy::type_complexity)]
792    pub(crate) fn try_from_remap_manifests_request(
793        store_api::region_engine::RemapManifestsRequest {
794            region_id,
795            input_regions,
796            region_mapping,
797            new_partition_exprs,
798        }: store_api::region_engine::RemapManifestsRequest,
799    ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
800        let (sender, receiver) = oneshot::channel();
801        let new_partition_exprs = new_partition_exprs
802            .into_iter()
803            .map(|(k, v)| {
804                Ok((
805                    k,
806                    PartitionExpr::from_json_str(&v)
807                        .context(InvalidPartitionExprSnafu { expr: v })?
808                        .context(MissingPartitionExprSnafu { region_id: k })?,
809                ))
810            })
811            .collect::<Result<HashMap<_, _>>>()?;
812
813        let request = RemapManifestsRequest {
814            region_id,
815            input_regions,
816            region_mapping,
817            new_partition_exprs,
818            sender,
819        };
820
821        Ok((WorkerRequest::RemapManifests(request), receiver))
822    }
823
824    /// Converts [CopyRegionFromRequest] from a [MitoCopyRegionFromRequest](store_api::region_engine::MitoCopyRegionFromRequest).
825    pub(crate) fn try_from_copy_region_from_request(
826        region_id: RegionId,
827        store_api::region_engine::MitoCopyRegionFromRequest {
828            source_region_id,
829            parallelism,
830        }: store_api::region_engine::MitoCopyRegionFromRequest,
831    ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
832        let (sender, receiver) = oneshot::channel();
833        let request = CopyRegionFromRequest {
834            region_id,
835            source_region_id,
836            parallelism,
837            sender,
838        };
839        Ok((WorkerRequest::CopyRegionFrom(request), receiver))
840    }
841}
842
/// DDL request to a region.
///
/// The payload is carried to a worker, together with the target region id and a
/// result sender, by [SenderDdlRequest].
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Create a region.
    Create(RegionCreateRequest),
    /// Drop the region. Carries no payload.
    Drop,
    /// Open a region.
    ///
    /// NOTE(review): the optional [WalEntryReceiver] presumably supplies WAL entries
    /// to the opening region — confirm against the open handler.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the region.
    Alter(RegionAlterRequest),
    /// Flush the region.
    Flush(RegionFlushRequest),
    /// Compact the region.
    Compact(RegionCompactRequest),
    /// Build indexes for the region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncate the region.
    Truncate(RegionTruncateRequest),
    /// Catch up the region.
    ///
    /// NOTE(review): as with `Open`, the optional [WalEntryReceiver] presumably
    /// supplies WAL entries — confirm against the catchup handler.
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
    /// Enter staging mode.
    EnterStaging(EnterStagingRequest),
    /// Apply a staging manifest.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
859
/// A [DdlRequest] together with the region it targets and the sender for its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Ddl request.
    pub(crate) request: DdlRequest,
}
870
/// Notification from a background job.
///
/// Each variant carries the outcome of one kind of background task.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Index build has finished.
    IndexBuildFinished(IndexBuildFinished),
    /// Index build has been stopped (aborted or succeeded).
    IndexBuildStopped(IndexBuildStopped),
    /// Index build has failed.
    IndexBuildFailed(IndexBuildFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Result of writing a region change (metadata/options) action.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
    /// Enter staging result.
    EnterStaging(EnterStagingResult),
    /// Copying from another region has finished.
    CopyRegionFromFinished(CopyRegionFromFinished),
}
899
/// Notifies a flush job is finished.
///
/// Carries the region edit to apply and the memtables to remove, plus the senders
/// waiting on the flush result.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer.
    ///
    /// Never read directly: a prometheus `HistogramTimer` records the elapsed time when
    /// it is dropped, i.e. when this notification is dropped.
    pub(crate) _timer: HistogramTimer,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
}
918
919impl FlushFinished {
920    /// Marks the flush job as successful and observes the timer.
921    pub(crate) fn on_success(self) {
922        for sender in self.senders {
923            sender.send(Ok(0));
924        }
925    }
926}
927
928impl OnFailure for FlushFinished {
929    fn on_failure(&mut self, err: Error) {
930        let err = Arc::new(err);
931        for sender in self.senders.drain(..) {
932            sender.send(Err(err.clone()).context(FlushRegionSnafu {
933                region_id: self.region_id,
934            }));
935        }
936    }
937}
938
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
945
/// Notifies an index build job has finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Region id.
    // `dead_code` is allowed because this field is not otherwise read; it still shows
    // up in the `Debug` output.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
952
/// Notifies an index build job has been stopped.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Region id.
    // `dead_code` is allowed because this field is not otherwise read; it still shows
    // up in the `Debug` output.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file whose index build was stopped.
    pub(crate) file_id: FileId,
}
960
/// Notifies an index build job has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
966
/// Notifies a compaction job has finished.
///
/// The worker still has to apply `edit`. See `on_success` / `on_failure` for how the
/// waiting senders are answered.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
979
980impl CompactionFinished {
981    pub fn on_success(self) {
982        // only update compaction time on success
983        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
984
985        for sender in self.senders {
986            sender.send(Ok(0));
987        }
988        info!("Successfully compacted region: {}", self.region_id);
989    }
990}
991
992impl OnFailure for CompactionFinished {
993    /// Compaction succeeded but failed to update manifest or region's already been dropped.
994    fn on_failure(&mut self, err: Error) {
995        let err = Arc::new(err);
996        for sender in self.senders.drain(..) {
997            sender.send(Err(err.clone()).context(CompactRegionSnafu {
998                region_id: self.region_id,
999            }));
1000        }
1001    }
1002}
1003
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
1011
/// Notifies the truncate result of a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Kind of the truncate operation (see [TruncateKind]).
    pub(crate) kind: TruncateKind,
}
1023
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Whether an index build is needed because of the schema change.
    pub(crate) need_index: bool,
    /// New options for the region.
    pub(crate) new_options: Option<RegionOptions>,
}
1040
/// Notifies the region of the result of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new partition expression to apply.
    pub(crate) partition_expr: String,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
1053
/// Notifies that copying from a source region into this region has finished.
#[derive(Debug)]
pub(crate) struct CopyRegionFromFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result sender.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1063
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// The sender to notify the result to the region engine.
    pub(crate) tx: Sender<Result<()>>,
}
1072
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Whether the region state needs to be set to Writable after handling this request.
    pub(crate) update_region_state: bool,
    /// Whether the region was in staging mode before handling this request.
    pub(crate) is_staging: bool,
}
1089
/// Request to build indexes for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Type of the index build (see [IndexBuildType]).
    pub(crate) build_type: IndexBuildType,
    /// Files that need their index built; an empty list means all files.
    pub(crate) file_metas: Vec<FileMeta>,
}
1097
/// Request to sync a region against a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Manifest version supplied by the caller.
    /// NOTE(review): presumably the version to sync up to — confirm with the handler.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
1105
/// Request to remap the manifests of `input_regions` onto new regions, following
/// `region_mapping`.
#[derive(Debug)]
pub(crate) struct RemapManifestsRequest {
    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
    pub(crate) region_id: RegionId,
    /// Regions to remap manifests from.
    pub(crate) input_regions: Vec<RegionId>,
    /// For each old region, the new regions that should receive its files.
    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions.
    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// Sender for the result of the remap operation.
    ///
    /// The result is a map from region IDs to their corresponding staging manifest paths.
    pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
}
1121
/// Request to copy the region `source_region_id` into the target region `region_id`.
#[derive(Debug)]
pub(crate) struct CopyRegionFromRequest {
    /// The [`RegionId`] of the target region.
    pub(crate) region_id: RegionId,
    /// The [`RegionId`] of the source region.
    pub(crate) source_region_id: RegionId,
    /// The parallelism of the copy operation.
    pub(crate) parallelism: usize,
    /// Result sender.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1133
// Unit tests for `WriteRequest` construction, schema checks and missing-column filling.
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{Row, SemanticType};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnDefaultConstraint;
    use mito_codec::test_util::i64_value;
    use store_api::metadata::RegionMetadataBuilder;

    use super::*;
    use crate::error::Error;
    use crate::test_util::ts_ms_value;

    /// Builds a [ColumnSchema] with the given name, data type and semantic type; all other
    /// fields take their default values.
    fn new_column_schema(
        name: &str,
        data_type: ColumnDataType,
        semantic_type: SemanticType,
    ) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: data_type as i32,
            semantic_type: semantic_type as i32,
            ..Default::default()
        }
    }

    /// Asserts that `err` is `Error::InvalidRequest` whose reason equals `expect`; panics
    /// on any other error variant.
    fn check_invalid_request(err: &Error, expect: &str) {
        if let Error::InvalidRequest {
            region_id: _,
            reason,
            location: _,
        } = err
        {
            assert_eq!(reason, expect);
        } else {
            panic!("Unexpected error {err}")
        }
    }

    #[test]
    fn test_write_request_duplicate_column() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![],
        };

        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
        check_invalid_request(&err, "duplicate column c0");
    }

    #[test]
    fn test_valid_write_request() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        assert_eq!(0, request.column_index_by_name("c0").unwrap());
        assert_eq!(1, request.column_index_by_name("c1").unwrap());
        assert_eq!(None, request.column_index_by_name("c2"));
    }

    #[test]
    fn test_write_request_column_num() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2), i64_value(3)],
            }],
        };

        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
        check_invalid_request(&err, "row has 3 columns but schema has 2");
    }

    /// Region metadata with a non-null millisecond timestamp `ts` (column id 1) and a
    /// nullable int64 tag `k0` (column id 2) that forms the primary key.
    fn new_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "k0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .primary_key(vec![2]);
        builder.build().unwrap()
    }

    #[test]
    fn test_check_schema() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        request.check_schema(&metadata).unwrap();
    }

    #[test]
    fn test_column_type() {
        let rows = Rows {
            schema: vec![
                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(
            &err,
            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
        );
    }

    #[test]
    fn test_semantic_type() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Tag,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
    }

    #[test]
    fn test_column_nullable() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![Value { value_data: None }, i64_value(2)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts is not null but input has null");
    }

    #[test]
    fn test_column_default() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "missing column ts");
    }

    #[test]
    fn test_unknown_column() {
        let rows = Rows {
            schema: vec![
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
            }],
        };
        let metadata = new_region_metadata();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
    }

    #[test]
    fn test_fill_impure_columns_err() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        // `ts` defaults to the impure function `now()`, which fill_missing_columns rejects.
        let metadata = {
            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
            builder
                .push_column_metadata(ColumnMetadata {
                    column_schema: datatypes::schema::ColumnSchema::new(
                        "ts",
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    )
                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
                        "now()".to_string(),
                    )))
                    .unwrap(),
                    semantic_type: SemanticType::Timestamp,
                    column_id: 1,
                })
                .push_column_metadata(ColumnMetadata {
                    column_schema: datatypes::schema::ColumnSchema::new(
                        "k0",
                        ConcreteDataType::int64_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 2,
                })
                .primary_key(vec![2]);
            builder.build().unwrap()
        };

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        assert!(
            request
                .fill_missing_columns(&metadata)
                .unwrap_err()
                .to_string()
                .contains("unexpected impure default value with region_id")
        );
    }

    #[test]
    fn test_fill_missing_columns() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        // Filling leaves the rows unchanged: the nullable tag k0 is not appended.
        let expect_rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    /// Unbuilt metadata builder with the same `ts` / `k0` columns as [new_region_metadata],
    /// so tests can append extra columns before building.
    fn builder_with_ts_tag() -> RegionMetadataBuilder {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "k0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .primary_key(vec![2]);
        builder
    }

    /// `ts` / `k0` plus two int64 fields: nullable `f0` and non-null `f1` with default 100.
    fn region_metadata_two_fields() -> RegionMetadata {
        let mut builder = builder_with_ts_tag();
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            // Column is not nullable.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f1",
                    ConcreteDataType::int64_datatype(),
                    false,
                )
                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
                    datatypes::value::Value::Int64(100),
                )))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 4,
            });
        builder.build().unwrap()
    }

    #[test]
    fn test_fill_missing_for_delete() {
        // A delete without the primary key column k0 is rejected by both checks.
        let rows = Rows {
            schema: vec![new_column_schema(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            )],
            rows: vec![Row {
                values: vec![ts_ms_value(1)],
            }],
        };
        let metadata = region_metadata_two_fields();

        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(&err, "delete requests need column k0");
        let err = request.fill_missing_columns(&metadata).unwrap_err();
        check_invalid_request(&err, "delete requests need column k0");

        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
            ],
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1)],
            }],
        };
        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        // Only f1 is appended; the nullable f0 is not filled.
        let expect_rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
            ],
            // Column f1 is not nullable and we use 0 for padding.
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    #[test]
    fn test_fill_missing_without_default_in_delete() {
        let mut builder = builder_with_ts_tag();
        builder
            // f0 is nullable.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            // f1 is not nullable and has no default value.
            .push_column_metadata(ColumnMetadata {
                column_schema: datatypes::schema::ColumnSchema::new(
                    "f1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            });
        let metadata = builder.build().unwrap();

        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
            ],
            // Missing f0 (nullable), f1 (not nullable).
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1)],
            }],
        };
        let mut request =
            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        assert!(err.is_fill_default());
        request.fill_missing_columns(&metadata).unwrap();

        let expect_rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
            ],
            // Column f1 is not nullable and we use 0 for padding.
            rows: vec![Row {
                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
            }],
        };
        assert_eq!(expect_rows, request.rows);
    }

    #[test]
    fn test_no_default() {
        let rows = Rows {
            schema: vec![new_column_schema(
                "k0",
                ColumnDataType::Int64,
                SemanticType::Tag,
            )],
            rows: vec![Row {
                values: vec![i64_value(1)],
            }],
        };
        let metadata = new_region_metadata();

        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.fill_missing_columns(&metadata).unwrap_err();
        check_invalid_request(&err, "column ts does not have default value");
    }

    #[test]
    fn test_missing_and_invalid() {
        // Missing f0 and f1 has invalid type (string).
        let rows = Rows {
            schema: vec![
                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema(
                    "ts",
                    ColumnDataType::TimestampMillisecond,
                    SemanticType::Timestamp,
                ),
                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
            ],
            rows: vec![Row {
                values: vec![
                    i64_value(100),
                    ts_ms_value(1),
                    Value {
                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
                    },
                ],
            }],
        };
        let metadata = region_metadata_two_fields();

        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
        let err = request.check_schema(&metadata).unwrap_err();
        check_invalid_request(
            &err,
            "column f1 expect type Int64(Int64Type), given: STRING(12)",
        );
    }

    #[test]
    fn test_write_request_metadata() {
        let rows = Rows {
            schema: vec![
                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
            ],
            rows: vec![Row {
                values: vec![i64_value(1), i64_value(2)],
            }],
        };

        let metadata = Arc::new(new_region_metadata());
        let request = WriteRequest::new(
            RegionId::new(1, 1),
            OpType::Put,
            rows,
            Some(metadata.clone()),
        )
        .unwrap();

        assert!(request.region_metadata.is_some());
        assert_eq!(
            request.region_metadata.unwrap().region_id,
            RegionId::new(1, 1)
        );
    }
}