Skip to main content

mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37    MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40    AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41    RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42    RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43    RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49    CompactRegionSnafu, CompactionCancelledSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
50    Error, FillDefaultSnafu, FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu,
51    MissingPartitionExprSnafu, Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
64/// Request to write a region.
65#[derive(Debug)]
66pub struct WriteRequest {
67    /// Region to write.
68    pub region_id: RegionId,
69    /// Type of the write request.
70    pub op_type: OpType,
71    /// Rows to write.
72    pub rows: Rows,
73    /// Map column name to column index in `rows`.
74    pub name_to_index: HashMap<String, usize>,
75    /// Whether each column has null.
76    pub has_null: Vec<bool>,
77    /// Write hint.
78    pub hint: Option<WriteHint>,
79    /// Region metadata on the time of this request is created.
80    pub(crate) region_metadata: Option<RegionMetadataRef>,
81    /// Partition expression version for the region.
82    pub partition_expr_version: Option<u64>,
83}
84
85impl WriteRequest {
86    /// Creates a new request.
87    ///
88    /// Returns `Err` if `rows` are invalid.
89    pub fn new(
90        region_id: RegionId,
91        op_type: OpType,
92        rows: Rows,
93        region_metadata: Option<RegionMetadataRef>,
94    ) -> Result<WriteRequest> {
95        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
96        for (index, column) in rows.schema.iter().enumerate() {
97            ensure!(
98                name_to_index
99                    .insert(column.column_name.clone(), index)
100                    .is_none(),
101                InvalidRequestSnafu {
102                    region_id,
103                    reason: format!("duplicate column {}", column.column_name),
104                }
105            );
106        }
107
108        let mut has_null = vec![false; rows.schema.len()];
109        for row in &rows.rows {
110            ensure!(
111                row.values.len() == rows.schema.len(),
112                InvalidRequestSnafu {
113                    region_id,
114                    reason: format!(
115                        "row has {} columns but schema has {}",
116                        row.values.len(),
117                        rows.schema.len()
118                    ),
119                }
120            );
121
122            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
123                validate_proto_value(region_id, value, column_schema)?;
124
125                if value.value_data.is_none() {
126                    has_null[i] = true;
127                }
128            }
129        }
130
131        Ok(WriteRequest {
132            region_id,
133            op_type,
134            rows,
135            name_to_index,
136            has_null,
137            hint: None,
138            region_metadata,
139            partition_expr_version: None,
140        })
141    }
142
143    /// Sets the write hint.
144    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
145        self.hint = hint;
146        self
147    }
148
149    pub fn with_partition_expr_version(mut self, partition_expr_version: Option<u64>) -> Self {
150        self.partition_expr_version = partition_expr_version;
151        self
152    }
153
154    /// Returns the encoding hint.
155    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
156        infer_primary_key_encoding_from_hint(self.hint.as_ref())
157    }
158
159    /// Returns estimated size of the request.
160    pub(crate) fn estimated_size(&self) -> usize {
161        let row_size = self
162            .rows
163            .rows
164            .first()
165            .map(|row| row.encoded_len())
166            .unwrap_or(0);
167        row_size * self.rows.rows.len()
168    }
169
170    /// Gets column index by name.
171    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
172        self.name_to_index.get(name).copied()
173    }
174
175    /// Checks schema of rows is compatible with schema of the region.
176    ///
177    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
178    /// error.
179    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
180        debug_assert_eq!(self.region_id, metadata.region_id);
181
182        let region_id = self.region_id;
183        // Index all columns in rows.
184        let mut rows_columns: HashMap<_, _> = self
185            .rows
186            .schema
187            .iter()
188            .map(|column| (&column.column_name, column))
189            .collect();
190
191        let mut need_fill_default = false;
192        // Checks all columns in this region.
193        for column in &metadata.column_metadatas {
194            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
195                // Check data type.
196                ensure!(
197                    is_column_type_value_eq(
198                        input_col.datatype,
199                        input_col.datatype_extension.clone(),
200                        &column.column_schema.data_type
201                    ),
202                    InvalidRequestSnafu {
203                        region_id,
204                        reason: format!(
205                            "column {} expect type {:?}, given: {}({})",
206                            column.column_schema.name,
207                            column.column_schema.data_type,
208                            ColumnDataType::try_from(input_col.datatype)
209                                .map(|v| v.as_str_name())
210                                .unwrap_or("Unknown"),
211                            input_col.datatype,
212                        )
213                    }
214                );
215
216                // Check semantic type.
217                ensure!(
218                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
219                    InvalidRequestSnafu {
220                        region_id,
221                        reason: format!(
222                            "column {} has semantic type {:?}, given: {}({})",
223                            column.column_schema.name,
224                            column.semantic_type,
225                            api::v1::SemanticType::try_from(input_col.semantic_type)
226                                .map(|v| v.as_str_name())
227                                .unwrap_or("Unknown"),
228                            input_col.semantic_type
229                        ),
230                    }
231                );
232
233                // Check nullable.
234                // Safety: `rows_columns` ensures this column exists.
235                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
236                ensure!(
237                    !has_null || column.column_schema.is_nullable(),
238                    InvalidRequestSnafu {
239                        region_id,
240                        reason: format!(
241                            "column {} is not null but input has null",
242                            column.column_schema.name
243                        ),
244                    }
245                );
246            } else {
247                // Rows don't have this column.
248                self.check_missing_column(column)?;
249
250                need_fill_default = true;
251            }
252        }
253
254        // Checks all columns in rows exist in the region.
255        if !rows_columns.is_empty() {
256            let names: Vec<_> = rows_columns.into_keys().collect();
257            return InvalidRequestSnafu {
258                region_id,
259                reason: format!("unknown columns: {:?}", names),
260            }
261            .fail();
262        }
263
264        // If we need to fill default values, return a special error.
265        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
266
267        Ok(())
268    }
269
270    /// Tries to fill missing columns.
271    ///
272    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
273    /// values.
274    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
275        debug_assert_eq!(self.region_id, metadata.region_id);
276
277        let mut columns_to_fill = vec![];
278        for column in &metadata.column_metadatas {
279            if !self.name_to_index.contains_key(&column.column_schema.name) {
280                columns_to_fill.push(column);
281            }
282        }
283        self.fill_columns(columns_to_fill)?;
284
285        Ok(())
286    }
287
288    /// Checks the schema and fill missing columns.
289    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
290        if let Err(e) = self.check_schema(metadata) {
291            if e.is_fill_default() {
292                // TODO(yingwen): Add metrics for this case.
293                // We need to fill default value. The write request may be a request
294                // sent before changing the schema.
295                self.fill_missing_columns(metadata)?;
296            } else {
297                return Err(e);
298            }
299        }
300
301        Ok(())
302    }
303
304    /// Fills default value for specific `columns`.
305    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
306        let mut default_values = Vec::with_capacity(columns.len());
307        let mut columns_to_fill = Vec::with_capacity(columns.len());
308        for column in columns {
309            let default_value = self.column_default_value(column)?;
310            if default_value.value_data.is_some() {
311                default_values.push(default_value);
312                columns_to_fill.push(column);
313            }
314        }
315
316        for row in &mut self.rows.rows {
317            row.values.extend(default_values.iter().cloned());
318        }
319
320        for column in columns_to_fill {
321            let (datatype, datatype_ext) =
322                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
323                    .with_context(|_| ConvertColumnDataTypeSnafu {
324                        reason: format!(
325                            "no protobuf type for column {} ({:?})",
326                            column.column_schema.name, column.column_schema.data_type
327                        ),
328                    })?
329                    .to_parts();
330            self.rows.schema.push(ColumnSchema {
331                column_name: column.column_schema.name.clone(),
332                datatype: datatype as i32,
333                semantic_type: column.semantic_type as i32,
334                datatype_extension: datatype_ext,
335                options: options_from_column_schema(&column.column_schema),
336            });
337        }
338
339        Ok(())
340    }
341
342    /// Checks whether we should allow a row doesn't provide this column.
343    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
344        if self.op_type == OpType::Delete {
345            if column.semantic_type == SemanticType::Field {
346                // For delete request, all tags and timestamp is required. We don't fill default
347                // tag or timestamp while deleting rows.
348                return Ok(());
349            } else {
350                return InvalidRequestSnafu {
351                    region_id: self.region_id,
352                    reason: format!("delete requests need column {}", column.column_schema.name),
353                }
354                .fail();
355            }
356        }
357
358        // Not a delete request. Checks whether they have default value.
359        ensure!(
360            column.column_schema.is_nullable()
361                || column.column_schema.default_constraint().is_some(),
362            InvalidRequestSnafu {
363                region_id: self.region_id,
364                reason: format!("missing column {}", column.column_schema.name),
365            }
366        );
367
368        Ok(())
369    }
370
371    /// Returns the default value for specific column.
372    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
373        let default_value = match self.op_type {
374            OpType::Delete => {
375                ensure!(
376                    column.semantic_type == SemanticType::Field,
377                    InvalidRequestSnafu {
378                        region_id: self.region_id,
379                        reason: format!(
380                            "delete requests need column {}",
381                            column.column_schema.name
382                        ),
383                    }
384                );
385
386                // For delete request, we need a default value for padding so we
387                // can delete a row even a field doesn't have a default value. So the
388                // value doesn't need to following the default value constraint of the
389                // column.
390                if column.column_schema.is_nullable() {
391                    datatypes::value::Value::Null
392                } else {
393                    column.column_schema.data_type.default_value()
394                }
395            }
396            OpType::Put => {
397                // For put requests, we use the default value from column schema.
398                if column.column_schema.is_default_impure() {
399                    UnexpectedSnafu {
400                        reason: format!(
401                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
402                            self.region_id,
403                            column.column_schema.name,
404                            column.column_schema.default_constraint(),
405                        ),
406                    }
407                    .fail()?
408                }
409                column
410                    .column_schema
411                    .create_default()
412                    .context(CreateDefaultSnafu {
413                        region_id: self.region_id,
414                        column: &column.column_schema.name,
415                    })?
416                    // This column doesn't have default value.
417                    .with_context(|| InvalidRequestSnafu {
418                        region_id: self.region_id,
419                        reason: format!(
420                            "column {} does not have default value",
421                            column.column_schema.name
422                        ),
423                    })?
424            }
425        };
426
427        // Convert default value into proto's value.
428        Ok(api::helper::to_grpc_value(default_value))
429    }
430}
431
432/// Validate proto value schema.
433pub(crate) fn validate_proto_value(
434    region_id: RegionId,
435    value: &Value,
436    column_schema: &ColumnSchema,
437) -> Result<()> {
438    if let Some(value_type) = proto_value_type(value) {
439        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
440            InvalidRequestSnafu {
441                region_id,
442                reason: format!(
443                    "column {} has unknown type {}",
444                    column_schema.column_name, column_schema.datatype
445                ),
446            }
447            .build()
448        })?;
449        ensure!(
450            proto_value_type_match(column_type, value_type),
451            InvalidRequestSnafu {
452                region_id,
453                reason: format!(
454                    "value has type {:?}, but column {} has type {:?}({})",
455                    value_type, column_schema.column_name, column_type, column_schema.datatype,
456                ),
457            }
458        );
459    }
460
461    Ok(())
462}
463
464fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
465    match (column_type, value_type) {
466        (ct, vt) if ct == vt => true,
467        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
468        (ColumnDataType::Json, ColumnDataType::Binary) => true,
469        _ => false,
470    }
471}
472
473/// Oneshot output result sender.
474#[derive(Debug)]
475pub struct OutputTx(Sender<Result<AffectedRows>>);
476
477impl OutputTx {
478    /// Creates a new output sender.
479    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
480        OutputTx(sender)
481    }
482
483    /// Sends the `result`.
484    pub(crate) fn send(self, result: Result<AffectedRows>) {
485        // Ignores send result.
486        let _ = self.0.send(result);
487    }
488}
489
490/// Optional output result sender.
491#[derive(Debug)]
492pub(crate) struct OptionOutputTx(Option<OutputTx>);
493
494impl OptionOutputTx {
495    /// Creates a sender.
496    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
497        OptionOutputTx(sender)
498    }
499
500    /// Creates an empty sender.
501    pub(crate) fn none() -> OptionOutputTx {
502        OptionOutputTx(None)
503    }
504
505    /// Sends the `result` and consumes the inner sender.
506    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
507        if let Some(sender) = self.0.take() {
508            sender.send(result);
509        }
510    }
511
512    /// Sends the `result` and consumes the sender.
513    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
514        if let Some(sender) = self.0.take() {
515            sender.send(result);
516        }
517    }
518
519    /// Takes the inner sender.
520    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
521        self.0.take()
522    }
523}
524
525impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
526    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
527        Self::new(Some(OutputTx::new(sender)))
528    }
529}
530
531impl OnFailure for OptionOutputTx {
532    fn on_failure(&mut self, err: Error) {
533        self.send_mut(Err(err));
534    }
535}
536
537/// Callback on failure.
538pub(crate) trait OnFailure {
539    /// Handles `err` on failure.
540    fn on_failure(&mut self, err: Error);
541}
542
543/// Sender and write request.
544#[derive(Debug)]
545pub(crate) struct SenderWriteRequest {
546    /// Result sender.
547    pub(crate) sender: OptionOutputTx,
548    pub(crate) request: WriteRequest,
549}
550
551pub(crate) struct SenderBulkRequest {
552    pub(crate) sender: OptionOutputTx,
553    pub(crate) region_id: RegionId,
554    pub(crate) request: BulkPart,
555    pub(crate) region_metadata: RegionMetadataRef,
556    pub(crate) partition_expr_version: Option<u64>,
557}
558
559#[derive(Debug)]
560pub(crate) struct BulkInsertRequest {
561    pub(crate) metadata: Option<RegionMetadataRef>,
562    pub(crate) request: RegionBulkInsertsRequest,
563    pub(crate) sender: OptionOutputTx,
564}
565
566/// Request sent to a worker with timestamp
567#[derive(Debug)]
568pub(crate) struct WorkerRequestWithTime {
569    pub(crate) request: WorkerRequest,
570    pub(crate) created_at: Instant,
571}
572
573impl WorkerRequestWithTime {
574    pub(crate) fn new(request: WorkerRequest) -> Self {
575        Self {
576            request,
577            created_at: Instant::now(),
578        }
579    }
580}
581
582/// Request sent to a worker
583#[derive(Debug)]
584pub(crate) enum WorkerRequest {
585    /// Write to a region.
586    Write(SenderWriteRequest),
587
588    /// Ddl request to a region.
589    Ddl(SenderDdlRequest),
590
591    /// Notifications from internal background jobs.
592    Background {
593        /// Id of the region to send.
594        region_id: RegionId,
595        /// Internal notification.
596        notify: BackgroundNotify,
597    },
598
599    /// The internal commands.
600    SetRegionRoleStateGracefully {
601        /// Id of the region to send.
602        region_id: RegionId,
603        /// The [SettableRegionRoleState].
604        region_role_state: SettableRegionRoleState,
605        /// The sender of [SetReadonlyResponse].
606        sender: Sender<SetRegionRoleStateResponse>,
607    },
608
609    /// Notify a worker to stop.
610    Stop,
611
612    /// Use [RegionEdit] to edit a region directly.
613    EditRegion(RegionEditRequest),
614
615    /// Keep the manifest of a region up to date.
616    SyncRegion(RegionSyncRequest),
617
618    /// Bulk inserts request and region metadata.
619    BulkInserts(BulkInsertRequest),
620
621    /// Remap manifests request.
622    RemapManifests(RemapManifestsRequest),
623
624    /// Copy region from request.
625    CopyRegionFrom(CopyRegionFromRequest),
626}
627
628impl WorkerRequest {
629    /// Creates a new open region request.
630    pub(crate) fn new_open_region_request(
631        region_id: RegionId,
632        request: RegionOpenRequest,
633        entry_receiver: Option<WalEntryReceiver>,
634    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
635        let (sender, receiver) = oneshot::channel();
636
637        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
638            region_id,
639            sender: sender.into(),
640            request: DdlRequest::Open((request, entry_receiver)),
641        });
642
643        (worker_request, receiver)
644    }
645
646    /// Creates a new catchup region request.
647    pub(crate) fn new_catchup_region_request(
648        region_id: RegionId,
649        request: RegionCatchupRequest,
650        entry_receiver: Option<WalEntryReceiver>,
651    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
652        let (sender, receiver) = oneshot::channel();
653        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
654            region_id,
655            sender: sender.into(),
656            request: DdlRequest::Catchup((request, entry_receiver)),
657        });
658        (worker_request, receiver)
659    }
660
661    /// Converts request from a [RegionRequest].
662    pub(crate) fn try_from_region_request(
663        region_id: RegionId,
664        value: RegionRequest,
665        region_metadata: Option<RegionMetadataRef>,
666    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
667        let (sender, receiver) = oneshot::channel();
668        let worker_request = match value {
669            RegionRequest::Put(v) => {
670                let mut write_request =
671                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
672                        .with_hint(v.hint)
673                        .with_partition_expr_version(v.partition_expr_version);
674                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
675                    && let Some(region_metadata) = &region_metadata
676                {
677                    write_request.maybe_fill_missing_columns(region_metadata)?;
678                }
679                WorkerRequest::Write(SenderWriteRequest {
680                    sender: sender.into(),
681                    request: write_request,
682                })
683            }
684            RegionRequest::Delete(v) => {
685                let mut write_request =
686                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
687                        .with_hint(v.hint)
688                        .with_partition_expr_version(v.partition_expr_version);
689                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
690                    && let Some(region_metadata) = &region_metadata
691                {
692                    write_request.maybe_fill_missing_columns(region_metadata)?;
693                }
694                WorkerRequest::Write(SenderWriteRequest {
695                    sender: sender.into(),
696                    request: write_request,
697                })
698            }
699            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
700                region_id,
701                sender: sender.into(),
702                request: DdlRequest::Create(v),
703            }),
704            RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
705                region_id,
706                sender: sender.into(),
707                request: DdlRequest::Drop(v),
708            }),
709            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
710                region_id,
711                sender: sender.into(),
712                request: DdlRequest::Open((v, None)),
713            }),
714            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
715                region_id,
716                sender: sender.into(),
717                request: DdlRequest::Close(v),
718            }),
719            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
720                region_id,
721                sender: sender.into(),
722                request: DdlRequest::Alter(v),
723            }),
724            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
725                region_id,
726                sender: sender.into(),
727                request: DdlRequest::Flush(v),
728            }),
729            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
730                region_id,
731                sender: sender.into(),
732                request: DdlRequest::Compact(v),
733            }),
734            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
735                region_id,
736                sender: sender.into(),
737                request: DdlRequest::BuildIndex(v),
738            }),
739            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
740                region_id,
741                sender: sender.into(),
742                request: DdlRequest::Truncate(v),
743            }),
744            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
745                region_id,
746                sender: sender.into(),
747                request: DdlRequest::Catchup((v, None)),
748            }),
749            RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
750                region_id,
751                sender: sender.into(),
752                request: DdlRequest::EnterStaging(v),
753            }),
754            RegionRequest::BulkInserts(region_bulk_inserts_request) => {
755                WorkerRequest::BulkInserts(BulkInsertRequest {
756                    metadata: region_metadata,
757                    sender: sender.into(),
758                    request: region_bulk_inserts_request,
759                })
760            }
761            RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
762                region_id,
763                sender: sender.into(),
764                request: DdlRequest::ApplyStagingManifest(v),
765            }),
766        };
767
768        Ok((worker_request, receiver))
769    }
770
771    pub(crate) fn new_set_readonly_gracefully(
772        region_id: RegionId,
773        region_role_state: SettableRegionRoleState,
774    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
775        let (sender, receiver) = oneshot::channel();
776
777        (
778            WorkerRequest::SetRegionRoleStateGracefully {
779                region_id,
780                region_role_state,
781                sender,
782            },
783            receiver,
784        )
785    }
786
787    pub(crate) fn new_sync_region_request(
788        region_id: RegionId,
789        manifest_version: ManifestVersion,
790    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
791        let (sender, receiver) = oneshot::channel();
792        (
793            WorkerRequest::SyncRegion(RegionSyncRequest {
794                region_id,
795                manifest_version,
796                sender,
797            }),
798            receiver,
799        )
800    }
801
802    /// Converts [RemapManifestsRequest] from a [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest).
803    ///
804    /// # Errors
805    ///
806    /// Returns an error if the partition expression is invalid or missing.
807    /// Returns an error if the new partition expressions are not found for some regions.
808    #[allow(clippy::type_complexity)]
809    pub(crate) fn try_from_remap_manifests_request(
810        store_api::region_engine::RemapManifestsRequest {
811            region_id,
812            input_regions,
813            region_mapping,
814            new_partition_exprs,
815        }: store_api::region_engine::RemapManifestsRequest,
816    ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
817        let (sender, receiver) = oneshot::channel();
818        let new_partition_exprs = new_partition_exprs
819            .into_iter()
820            .map(|(k, v)| {
821                Ok((
822                    k,
823                    PartitionExpr::from_json_str(&v)
824                        .context(InvalidPartitionExprSnafu { expr: v })?
825                        .context(MissingPartitionExprSnafu { region_id: k })?,
826                ))
827            })
828            .collect::<Result<HashMap<_, _>>>()?;
829
830        let request = RemapManifestsRequest {
831            region_id,
832            input_regions,
833            region_mapping,
834            new_partition_exprs,
835            sender,
836        };
837
838        Ok((WorkerRequest::RemapManifests(request), receiver))
839    }
840
841    /// Converts [CopyRegionFromRequest] from a [MitoCopyRegionFromRequest](store_api::region_engine::MitoCopyRegionFromRequest).
842    pub(crate) fn try_from_copy_region_from_request(
843        region_id: RegionId,
844        store_api::region_engine::MitoCopyRegionFromRequest {
845            source_region_id,
846            parallelism,
847        }: store_api::region_engine::MitoCopyRegionFromRequest,
848    ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
849        let (sender, receiver) = oneshot::channel();
850        let request = CopyRegionFromRequest {
851            region_id,
852            source_region_id,
853            parallelism,
854            sender,
855        };
856        Ok((WorkerRequest::CopyRegionFrom(request), receiver))
857    }
858}
859
/// DDL request to a region.
///
/// Each variant wraps the corresponding request type from
/// `store_api::region_request`. `Open` and `Catchup` additionally carry an
/// optional [WalEntryReceiver].
// NOTE(review): the receiver presumably supplies WAL entries to replay instead
// of reading them from the WAL directly — confirm against the open/catchup handlers.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    Create(RegionCreateRequest),
    Drop(RegionDropRequest),
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    Close(RegionCloseRequest),
    Alter(RegionAlterRequest),
    Flush(RegionFlushRequest),
    Compact(RegionCompactRequest),
    BuildIndex(RegionBuildIndexRequest),
    Truncate(RegionTruncateRequest),
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
    EnterStaging(EnterStagingRequest),
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
876
/// A [DdlRequest] paired with the sender used to reply to its caller.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Ddl request.
    pub(crate) request: DdlRequest,
}
887
/// Notification from a background job.
///
/// Each variant carries the payload describing how that job ended.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Index build has finished.
    IndexBuildFinished(IndexBuildFinished),
    /// Index build has been stopped (aborted or succeeded).
    IndexBuildStopped(IndexBuildStopped),
    /// Index build has failed.
    IndexBuildFailed(IndexBuildFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has been cancelled cooperatively.
    CompactionCancelled(CompactionCancelled),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Region change result.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
    /// Enter staging result.
    EnterStaging(EnterStagingResult),
    /// Copying a region from another region has finished.
    CopyRegionFromFinished(CopyRegionFromFinished),
}
918
/// Notifies that a flush job has finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer.
    ///
    /// Held only for its drop side effect: a prometheus [HistogramTimer]
    /// records the elapsed time when it is dropped, i.e. when this struct goes away.
    pub(crate) _timer: HistogramTimer,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Reason for flush.
    pub(crate) flush_reason: FlushReason,
}
939
940impl FlushFinished {
941    /// Marks the flush job as successful and observes the timer.
942    pub(crate) fn on_success(self) {
943        for sender in self.senders {
944            sender.send(Ok(0));
945        }
946    }
947}
948
949impl OnFailure for FlushFinished {
950    fn on_failure(&mut self, err: Error) {
951        let err = Arc::new(err);
952        for sender in self.senders.drain(..) {
953            sender.send(Err(err.clone()).context(FlushRegionSnafu {
954                region_id: self.region_id,
955            }));
956        }
957    }
958}
959
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure, shared through an [Arc].
    pub(crate) err: Arc<Error>,
}
966
/// Notifies that an index build job has finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    // `region_id` is not read anywhere (a derived `Debug` does not count as a
    // use), hence the `dead_code` allowance.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
973
/// Notifies that an index build job has been stopped.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    // `region_id` is not read anywhere (a derived `Debug` does not count as a
    // use), hence the `dead_code` allowance.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// The file whose index build was stopped.
    pub(crate) file_id: FileId,
}
981
/// Notifies that an index build job has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The error source of the failure, shared through an [Arc].
    pub(crate) err: Arc<Error>,
}
987
/// Notifies that a compaction job has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of compaction task; used to report the elapsed time on success.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
1000
/// Notifies that a compaction job has been cancelled cooperatively.
#[derive(Debug)]
pub(crate) struct CompactionCancelled {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Waiters to wake once the cancellation has been observed by the worker.
    /// They receive a compaction-cancelled error rather than a success.
    pub(crate) senders: Vec<OutputTx>,
}
1009
1010impl CompactionCancelled {
1011    pub(crate) fn on_success(self) {
1012        for sender in self.senders {
1013            sender.send(CompactionCancelledSnafu {}.fail());
1014        }
1015        info!("Compaction cancelled for region: {}", self.region_id);
1016    }
1017}
1018
1019impl CompactionFinished {
1020    pub fn on_success(self) {
1021        // only update compaction time on success
1022        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
1023
1024        for sender in self.senders {
1025            sender.send(Ok(0));
1026        }
1027        info!("Successfully compacted region: {}", self.region_id);
1028    }
1029}
1030
1031impl OnFailure for CompactionFinished {
1032    /// Compaction succeeded but failed to update manifest or region's already been dropped.
1033    fn on_failure(&mut self, err: Error) {
1034        let err = Arc::new(err);
1035        for sender in self.senders.drain(..) {
1036            sender.send(Err(err.clone()).context(CompactRegionSnafu {
1037                region_id: self.region_id,
1038            }));
1039        }
1040    }
1041}
1042
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
1050
/// Notifies the truncate result of a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// The [TruncateKind] of this truncate operation.
    pub(crate) kind: TruncateKind,
}
1062
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Whether an index build is needed for this schema change.
    pub(crate) need_index: bool,
    /// New options for the region.
    pub(crate) new_options: Option<RegionOptions>,
}
1079
/// Notifies the region of the result of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new staging partition directive to apply.
    pub(crate) partition_directive: StagingPartitionDirective,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
1092
/// Notifies that copying a region from another region has finished.
#[derive(Debug)]
pub(crate) struct CopyRegionFromFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result sender.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1102
/// A group of senders that all wait for the same `Result<()>`.
///
/// The inline capacity of one lets a single waiter be stored without a heap
/// allocation.
#[derive(Debug, Default)]
pub(crate) struct Waiters(SmallVec<[Sender<Result<()>>; 1]>);
1105
1106impl Waiters {
1107    pub(crate) fn one(waiter: Sender<Result<()>>) -> Self {
1108        let mut waiters = SmallVec::new();
1109        waiters.push(waiter);
1110        Self(waiters)
1111    }
1112
1113    pub(crate) fn reply_with<F: Fn() -> Result<()>>(self, f: F) {
1114        for tx in self.0 {
1115            let _ = tx.send(f());
1116        }
1117    }
1118
1119    pub(crate) fn merge(&mut self, other: Self) {
1120        self.0.extend(other.0);
1121    }
1122}
1123
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Whether to preload SST files into the write cache.
    pub(crate) preload_sst_cache: bool,
    /// The waiters that are waiting for this region edit's result.
    pub(crate) waiters: Waiters,
}
1134
1135impl RegionEditRequest {
1136    pub(crate) fn new(
1137        region_id: RegionId,
1138        edit: RegionEdit,
1139        preload_sst_cache: bool,
1140        waiter: Sender<Result<()>>,
1141    ) -> Self {
1142        Self {
1143            region_id,
1144            edit,
1145            preload_sst_cache,
1146            waiters: Waiters::one(waiter),
1147        }
1148    }
1149}
1150
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result waiters.
    pub(crate) waiters: Waiters,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: std::result::Result<(), Arc<Error>>,
    /// Whether the region state needs to be set to Writable after handling this request.
    pub(crate) update_region_state: bool,
    /// Whether the region was in staging mode before handling this request.
    pub(crate) is_staging: bool,
}
1167
/// Request to build indexes for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Region whose files are indexed.
    pub(crate) region_id: RegionId,
    /// Type of the index build.
    pub(crate) build_type: IndexBuildType,
    /// Files that need their index built; empty means all files.
    pub(crate) file_metas: Vec<FileMeta>,
}
1175
/// Request to sync a region against the requested manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region to sync.
    pub(crate) region_id: RegionId,
    /// The requested manifest version.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
1183
/// Request to remap the manifests of `input_regions` onto new regions.
///
/// Files are distributed according to `region_mapping`.
#[derive(Debug)]
pub(crate) struct RemapManifestsRequest {
    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
    pub(crate) region_id: RegionId,
    /// Regions to remap manifests from.
    pub(crate) input_regions: Vec<RegionId>,
    /// For each old region, which new regions should receive its files.
    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions.
    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// Sender for the result of the remap operation.
    ///
    /// The result is a map from region IDs to their corresponding staging manifest paths.
    pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
}
1199
/// Request to copy a source region into a target region.
#[derive(Debug)]
pub(crate) struct CopyRegionFromRequest {
    /// The [`RegionId`] of the target region.
    pub(crate) region_id: RegionId,
    /// The [`RegionId`] of the source region.
    pub(crate) source_region_id: RegionId,
    /// The parallelism of the copy operation.
    pub(crate) parallelism: usize,
    /// Result sender.
    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
1211
1212#[cfg(test)]
1213mod tests {
1214    use api::v1::value::ValueData;
1215    use api::v1::{Row, SemanticType};
1216    use common_error::ext::ErrorExt;
1217    use common_error::status_code::StatusCode;
1218    use datatypes::prelude::ConcreteDataType;
1219    use datatypes::schema::ColumnDefaultConstraint;
1220    use mito_codec::test_util::i64_value;
1221    use store_api::metadata::RegionMetadataBuilder;
1222    use tokio::sync::oneshot;
1223
1224    use super::*;
1225    use crate::error::Error;
1226    use crate::test_util::ts_ms_value;
1227
1228    fn new_column_schema(
1229        name: &str,
1230        data_type: ColumnDataType,
1231        semantic_type: SemanticType,
1232    ) -> ColumnSchema {
1233        ColumnSchema {
1234            column_name: name.to_string(),
1235            datatype: data_type as i32,
1236            semantic_type: semantic_type as i32,
1237            ..Default::default()
1238        }
1239    }
1240
1241    fn check_invalid_request(err: &Error, expect: &str) {
1242        if let Error::InvalidRequest {
1243            region_id: _,
1244            reason,
1245            location: _,
1246        } = err
1247        {
1248            assert_eq!(reason, expect);
1249        } else {
1250            panic!("Unexpected error {err}")
1251        }
1252    }
1253
1254    fn waiter() -> (Sender<Result<()>>, Receiver<Result<()>>) {
1255        oneshot::channel()
1256    }
1257
1258    fn assert_waiter_ok(rx: &mut Receiver<Result<()>>) {
1259        rx.try_recv().unwrap().unwrap();
1260    }
1261
1262    #[test]
1263    fn test_waiters_reply_with_single_waiter() {
1264        let (tx, mut rx) = waiter();
1265        Waiters::one(tx).reply_with(|| Ok(()));
1266        assert_waiter_ok(&mut rx);
1267    }
1268
1269    #[test]
1270    fn test_waiters_reply_with_many_waiters() {
1271        let (tx1, mut rx1) = waiter();
1272        let (tx2, mut rx2) = waiter();
1273        let (tx3, mut rx3) = waiter();
1274
1275        let waiters = Waiters(vec![tx1, tx2, tx3].into());
1276        waiters.reply_with(|| Ok(()));
1277
1278        assert_waiter_ok(&mut rx1);
1279        assert_waiter_ok(&mut rx2);
1280        assert_waiter_ok(&mut rx3);
1281    }
1282
1283    #[test]
1284    fn test_waiters_merge() {
1285        let (tx1, mut rx1) = waiter();
1286        let (tx2, mut rx2) = waiter();
1287        let (tx3, mut rx3) = waiter();
1288        let (tx4, mut rx4) = waiter();
1289
1290        let mut waiters = Waiters::one(tx1);
1291        waiters.merge(Waiters::one(tx2));
1292        waiters.merge(Waiters(vec![tx3, tx4].into()));
1293        assert_eq!(4, waiters.0.len());
1294
1295        waiters.reply_with(|| Ok(()));
1296
1297        assert_waiter_ok(&mut rx1);
1298        assert_waiter_ok(&mut rx2);
1299        assert_waiter_ok(&mut rx3);
1300        assert_waiter_ok(&mut rx4);
1301    }
1302
1303    #[test]
1304    fn test_write_request_duplicate_column() {
1305        let rows = Rows {
1306            schema: vec![
1307                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1308                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1309            ],
1310            rows: vec![],
1311        };
1312
1313        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1314        check_invalid_request(&err, "duplicate column c0");
1315    }
1316
1317    #[test]
1318    fn test_valid_write_request() {
1319        let rows = Rows {
1320            schema: vec![
1321                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1322                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1323            ],
1324            rows: vec![Row {
1325                values: vec![i64_value(1), i64_value(2)],
1326            }],
1327        };
1328
1329        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1330        assert_eq!(0, request.column_index_by_name("c0").unwrap());
1331        assert_eq!(1, request.column_index_by_name("c1").unwrap());
1332        assert_eq!(None, request.column_index_by_name("c2"));
1333    }
1334
1335    #[test]
1336    fn test_compaction_cancelled_sends_cancelled_error() {
1337        let (tx, rx) = oneshot::channel();
1338        let request = CompactionCancelled {
1339            region_id: RegionId::new(1, 1),
1340            senders: vec![OutputTx::new(tx)],
1341        };
1342
1343        request.on_success();
1344
1345        let err = rx.blocking_recv().unwrap().unwrap_err();
1346        assert!(matches!(err, Error::CompactionCancelled { .. }));
1347        assert_eq!(err.status_code(), StatusCode::Cancelled);
1348    }
1349
1350    #[test]
1351    fn test_write_request_column_num() {
1352        let rows = Rows {
1353            schema: vec![
1354                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1355                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1356            ],
1357            rows: vec![Row {
1358                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1359            }],
1360        };
1361
1362        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1363        check_invalid_request(&err, "row has 3 columns but schema has 2");
1364    }
1365
1366    fn new_region_metadata() -> RegionMetadata {
1367        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1368        builder
1369            .push_column_metadata(ColumnMetadata {
1370                column_schema: datatypes::schema::ColumnSchema::new(
1371                    "ts",
1372                    ConcreteDataType::timestamp_millisecond_datatype(),
1373                    false,
1374                ),
1375                semantic_type: SemanticType::Timestamp,
1376                column_id: 1,
1377            })
1378            .push_column_metadata(ColumnMetadata {
1379                column_schema: datatypes::schema::ColumnSchema::new(
1380                    "k0",
1381                    ConcreteDataType::int64_datatype(),
1382                    true,
1383                ),
1384                semantic_type: SemanticType::Tag,
1385                column_id: 2,
1386            })
1387            .primary_key(vec![2]);
1388        builder.build().unwrap()
1389    }
1390
1391    #[test]
1392    fn test_check_schema() {
1393        let rows = Rows {
1394            schema: vec![
1395                new_column_schema(
1396                    "ts",
1397                    ColumnDataType::TimestampMillisecond,
1398                    SemanticType::Timestamp,
1399                ),
1400                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1401            ],
1402            rows: vec![Row {
1403                values: vec![ts_ms_value(1), i64_value(2)],
1404            }],
1405        };
1406        let metadata = new_region_metadata();
1407
1408        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1409        request.check_schema(&metadata).unwrap();
1410    }
1411
1412    #[test]
1413    fn test_column_type() {
1414        let rows = Rows {
1415            schema: vec![
1416                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1417                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1418            ],
1419            rows: vec![Row {
1420                values: vec![i64_value(1), i64_value(2)],
1421            }],
1422        };
1423        let metadata = new_region_metadata();
1424
1425        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1426        let err = request.check_schema(&metadata).unwrap_err();
1427        check_invalid_request(
1428            &err,
1429            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1430        );
1431    }
1432
1433    #[test]
1434    fn test_semantic_type() {
1435        let rows = Rows {
1436            schema: vec![
1437                new_column_schema(
1438                    "ts",
1439                    ColumnDataType::TimestampMillisecond,
1440                    SemanticType::Tag,
1441                ),
1442                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1443            ],
1444            rows: vec![Row {
1445                values: vec![ts_ms_value(1), i64_value(2)],
1446            }],
1447        };
1448        let metadata = new_region_metadata();
1449
1450        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1451        let err = request.check_schema(&metadata).unwrap_err();
1452        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1453    }
1454
1455    #[test]
1456    fn test_column_nullable() {
1457        let rows = Rows {
1458            schema: vec![
1459                new_column_schema(
1460                    "ts",
1461                    ColumnDataType::TimestampMillisecond,
1462                    SemanticType::Timestamp,
1463                ),
1464                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1465            ],
1466            rows: vec![Row {
1467                values: vec![Value { value_data: None }, i64_value(2)],
1468            }],
1469        };
1470        let metadata = new_region_metadata();
1471
1472        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1473        let err = request.check_schema(&metadata).unwrap_err();
1474        check_invalid_request(&err, "column ts is not null but input has null");
1475    }
1476
1477    #[test]
1478    fn test_column_default() {
1479        let rows = Rows {
1480            schema: vec![new_column_schema(
1481                "k0",
1482                ColumnDataType::Int64,
1483                SemanticType::Tag,
1484            )],
1485            rows: vec![Row {
1486                values: vec![i64_value(1)],
1487            }],
1488        };
1489        let metadata = new_region_metadata();
1490
1491        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1492        let err = request.check_schema(&metadata).unwrap_err();
1493        check_invalid_request(&err, "missing column ts");
1494    }
1495
1496    #[test]
1497    fn test_unknown_column() {
1498        let rows = Rows {
1499            schema: vec![
1500                new_column_schema(
1501                    "ts",
1502                    ColumnDataType::TimestampMillisecond,
1503                    SemanticType::Timestamp,
1504                ),
1505                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1506                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1507            ],
1508            rows: vec![Row {
1509                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1510            }],
1511        };
1512        let metadata = new_region_metadata();
1513
1514        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1515        let err = request.check_schema(&metadata).unwrap_err();
1516        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1517    }
1518
1519    #[test]
1520    fn test_fill_impure_columns_err() {
1521        let rows = Rows {
1522            schema: vec![new_column_schema(
1523                "k0",
1524                ColumnDataType::Int64,
1525                SemanticType::Tag,
1526            )],
1527            rows: vec![Row {
1528                values: vec![i64_value(1)],
1529            }],
1530        };
1531        let metadata = {
1532            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1533            builder
1534                .push_column_metadata(ColumnMetadata {
1535                    column_schema: datatypes::schema::ColumnSchema::new(
1536                        "ts",
1537                        ConcreteDataType::timestamp_millisecond_datatype(),
1538                        false,
1539                    )
1540                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1541                        "now()".to_string(),
1542                    )))
1543                    .unwrap(),
1544                    semantic_type: SemanticType::Timestamp,
1545                    column_id: 1,
1546                })
1547                .push_column_metadata(ColumnMetadata {
1548                    column_schema: datatypes::schema::ColumnSchema::new(
1549                        "k0",
1550                        ConcreteDataType::int64_datatype(),
1551                        true,
1552                    ),
1553                    semantic_type: SemanticType::Tag,
1554                    column_id: 2,
1555                })
1556                .primary_key(vec![2]);
1557            builder.build().unwrap()
1558        };
1559
1560        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1561        let err = request.check_schema(&metadata).unwrap_err();
1562        assert!(err.is_fill_default());
1563        assert!(
1564            request
1565                .fill_missing_columns(&metadata)
1566                .unwrap_err()
1567                .to_string()
1568                .contains("unexpected impure default value with region_id")
1569        );
1570    }
1571
1572    #[test]
1573    fn test_fill_missing_columns() {
1574        let rows = Rows {
1575            schema: vec![new_column_schema(
1576                "ts",
1577                ColumnDataType::TimestampMillisecond,
1578                SemanticType::Timestamp,
1579            )],
1580            rows: vec![Row {
1581                values: vec![ts_ms_value(1)],
1582            }],
1583        };
1584        let metadata = new_region_metadata();
1585
1586        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1587        let err = request.check_schema(&metadata).unwrap_err();
1588        assert!(err.is_fill_default());
1589        request.fill_missing_columns(&metadata).unwrap();
1590
1591        let expect_rows = Rows {
1592            schema: vec![new_column_schema(
1593                "ts",
1594                ColumnDataType::TimestampMillisecond,
1595                SemanticType::Timestamp,
1596            )],
1597            rows: vec![Row {
1598                values: vec![ts_ms_value(1)],
1599            }],
1600        };
1601        assert_eq!(expect_rows, request.rows);
1602    }
1603
1604    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1605        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1606        builder
1607            .push_column_metadata(ColumnMetadata {
1608                column_schema: datatypes::schema::ColumnSchema::new(
1609                    "ts",
1610                    ConcreteDataType::timestamp_millisecond_datatype(),
1611                    false,
1612                ),
1613                semantic_type: SemanticType::Timestamp,
1614                column_id: 1,
1615            })
1616            .push_column_metadata(ColumnMetadata {
1617                column_schema: datatypes::schema::ColumnSchema::new(
1618                    "k0",
1619                    ConcreteDataType::int64_datatype(),
1620                    true,
1621                ),
1622                semantic_type: SemanticType::Tag,
1623                column_id: 2,
1624            })
1625            .primary_key(vec![2]);
1626        builder
1627    }
1628
1629    fn region_metadata_two_fields() -> RegionMetadata {
1630        let mut builder = builder_with_ts_tag();
1631        builder
1632            .push_column_metadata(ColumnMetadata {
1633                column_schema: datatypes::schema::ColumnSchema::new(
1634                    "f0",
1635                    ConcreteDataType::int64_datatype(),
1636                    true,
1637                ),
1638                semantic_type: SemanticType::Field,
1639                column_id: 3,
1640            })
1641            // Column is not nullable.
1642            .push_column_metadata(ColumnMetadata {
1643                column_schema: datatypes::schema::ColumnSchema::new(
1644                    "f1",
1645                    ConcreteDataType::int64_datatype(),
1646                    false,
1647                )
1648                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1649                    datatypes::value::Value::Int64(100),
1650                )))
1651                .unwrap(),
1652                semantic_type: SemanticType::Field,
1653                column_id: 4,
1654            });
1655        builder.build().unwrap()
1656    }
1657
1658    #[test]
1659    fn test_fill_missing_for_delete() {
1660        let rows = Rows {
1661            schema: vec![new_column_schema(
1662                "ts",
1663                ColumnDataType::TimestampMillisecond,
1664                SemanticType::Timestamp,
1665            )],
1666            rows: vec![Row {
1667                values: vec![ts_ms_value(1)],
1668            }],
1669        };
1670        let metadata = region_metadata_two_fields();
1671
1672        let mut request =
1673            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1674        let err = request.check_schema(&metadata).unwrap_err();
1675        check_invalid_request(&err, "delete requests need column k0");
1676        let err = request.fill_missing_columns(&metadata).unwrap_err();
1677        check_invalid_request(&err, "delete requests need column k0");
1678
1679        let rows = Rows {
1680            schema: vec![
1681                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1682                new_column_schema(
1683                    "ts",
1684                    ColumnDataType::TimestampMillisecond,
1685                    SemanticType::Timestamp,
1686                ),
1687            ],
1688            rows: vec![Row {
1689                values: vec![i64_value(100), ts_ms_value(1)],
1690            }],
1691        };
1692        let mut request =
1693            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1694        let err = request.check_schema(&metadata).unwrap_err();
1695        assert!(err.is_fill_default());
1696        request.fill_missing_columns(&metadata).unwrap();
1697
1698        let expect_rows = Rows {
1699            schema: vec![
1700                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1701                new_column_schema(
1702                    "ts",
1703                    ColumnDataType::TimestampMillisecond,
1704                    SemanticType::Timestamp,
1705                ),
1706                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1707            ],
1708            // Column f1 is not nullable and we use 0 for padding.
1709            rows: vec![Row {
1710                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1711            }],
1712        };
1713        assert_eq!(expect_rows, request.rows);
1714    }
1715
1716    #[test]
1717    fn test_fill_missing_without_default_in_delete() {
1718        let mut builder = builder_with_ts_tag();
1719        builder
1720            // f0 is nullable.
1721            .push_column_metadata(ColumnMetadata {
1722                column_schema: datatypes::schema::ColumnSchema::new(
1723                    "f0",
1724                    ConcreteDataType::int64_datatype(),
1725                    true,
1726                ),
1727                semantic_type: SemanticType::Field,
1728                column_id: 3,
1729            })
1730            // f1 is not nullable and don't has default.
1731            .push_column_metadata(ColumnMetadata {
1732                column_schema: datatypes::schema::ColumnSchema::new(
1733                    "f1",
1734                    ConcreteDataType::int64_datatype(),
1735                    false,
1736                ),
1737                semantic_type: SemanticType::Field,
1738                column_id: 4,
1739            });
1740        let metadata = builder.build().unwrap();
1741
1742        let rows = Rows {
1743            schema: vec![
1744                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1745                new_column_schema(
1746                    "ts",
1747                    ColumnDataType::TimestampMillisecond,
1748                    SemanticType::Timestamp,
1749                ),
1750            ],
1751            // Missing f0 (nullable), f1 (not nullable).
1752            rows: vec![Row {
1753                values: vec![i64_value(100), ts_ms_value(1)],
1754            }],
1755        };
1756        let mut request =
1757            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1758        let err = request.check_schema(&metadata).unwrap_err();
1759        assert!(err.is_fill_default());
1760        request.fill_missing_columns(&metadata).unwrap();
1761
1762        let expect_rows = Rows {
1763            schema: vec![
1764                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1765                new_column_schema(
1766                    "ts",
1767                    ColumnDataType::TimestampMillisecond,
1768                    SemanticType::Timestamp,
1769                ),
1770                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1771            ],
1772            // Column f1 is not nullable and we use 0 for padding.
1773            rows: vec![Row {
1774                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1775            }],
1776        };
1777        assert_eq!(expect_rows, request.rows);
1778    }
1779
1780    #[test]
1781    fn test_no_default() {
1782        let rows = Rows {
1783            schema: vec![new_column_schema(
1784                "k0",
1785                ColumnDataType::Int64,
1786                SemanticType::Tag,
1787            )],
1788            rows: vec![Row {
1789                values: vec![i64_value(1)],
1790            }],
1791        };
1792        let metadata = new_region_metadata();
1793
1794        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1795        let err = request.fill_missing_columns(&metadata).unwrap_err();
1796        check_invalid_request(&err, "column ts does not have default value");
1797    }
1798
1799    #[test]
1800    fn test_missing_and_invalid() {
1801        // Missing f0 and f1 has invalid type (string).
1802        let rows = Rows {
1803            schema: vec![
1804                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1805                new_column_schema(
1806                    "ts",
1807                    ColumnDataType::TimestampMillisecond,
1808                    SemanticType::Timestamp,
1809                ),
1810                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1811            ],
1812            rows: vec![Row {
1813                values: vec![
1814                    i64_value(100),
1815                    ts_ms_value(1),
1816                    Value {
1817                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1818                    },
1819                ],
1820            }],
1821        };
1822        let metadata = region_metadata_two_fields();
1823
1824        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1825        let err = request.check_schema(&metadata).unwrap_err();
1826        check_invalid_request(
1827            &err,
1828            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1829        );
1830    }
1831
1832    #[test]
1833    fn test_write_request_metadata() {
1834        let rows = Rows {
1835            schema: vec![
1836                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1837                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1838            ],
1839            rows: vec![Row {
1840                values: vec![i64_value(1), i64_value(2)],
1841            }],
1842        };
1843
1844        let metadata = Arc::new(new_region_metadata());
1845        let request = WriteRequest::new(
1846            RegionId::new(1, 1),
1847            OpType::Put,
1848            rows,
1849            Some(metadata.clone()),
1850        )
1851        .unwrap();
1852
1853        assert!(request.region_metadata.is_some());
1854        assert_eq!(
1855            request.region_metadata.unwrap().region_id,
1856            RegionId::new(1, 1)
1857        );
1858    }
1859}