Skip to main content

mito2/
flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use bytes::Bytes;
24use common_telemetry::{debug, error, info};
25use datatypes::arrow::datatypes::SchemaRef;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::storage::{RegionId, SequenceNumber};
30use strum::IntoStaticStr;
31use tokio::sync::{Semaphore, mpsc, watch};
32
33use crate::access_layer::{
34    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
35};
36use crate::cache::CacheManagerRef;
37use crate::config::MitoConfig;
38use crate::error::{
39    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
40    RegionTruncatedSnafu, Result,
41};
42use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
43use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
44use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
45use crate::metrics::{
46    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
47    INFLIGHT_FLUSH_COUNT,
48};
49use crate::read::FlatSource;
50use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
51use crate::read::flat_merge::FlatMergeIterator;
52use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
53use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
54use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
55use crate::request::{
56    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
57    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
58};
59use crate::schedule::scheduler::{Job, SchedulerRef};
60use crate::sst::file::FileMeta;
61use crate::sst::parquet::metadata::extract_primary_key_range;
62use crate::sst::parquet::{
63    DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
64};
65use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
66use crate::worker::WorkerListener;
67
/// Global write buffer (memtable) manager.
///
/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether the engine should trigger a flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns whether to stall write requests.
    fn should_stall(&self) -> bool;

    /// Reserves `mem` bytes.
    fn reserve_mem(&self, mem: usize);

    /// Tells the manager we are freeing `mem` bytes.
    ///
    /// We are in the process of freeing `mem` bytes, so it is not considered
    /// when checking the soft limit.
    fn schedule_free_mem(&self, mem: usize);

    /// We have freed `mem` bytes.
    fn free_mem(&self, mem: usize);

    /// Returns the total memory used by memtables.
    fn memory_usage(&self) -> usize;

    /// Returns the mutable memtable memory limit.
    ///
    /// The write buffer manager should flush memtables when the mutable memory usage
    /// reaches or exceeds this limit.
    fn flush_limit(&self) -> usize;
}
99
/// Shared, reference-counted handle to a [WriteBufferManager] trait object.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
101
/// Default [WriteBufferManager] implementation.
///
/// Inspired by RocksDB's WriteBufferManager.
/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
#[derive(Debug)]
pub struct WriteBufferManagerImpl {
    /// Write buffer size for the engine.
    global_write_buffer_size: usize,
    /// Mutable memtable memory size limit.
    mutable_limit: usize,
    /// Memory in use (e.g. used by mutable and immutable memtables).
    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
    memory_active: AtomicUsize,
    /// Optional notifier.
    /// The manager can wake up the worker once we free the write buffer.
    notifier: Option<watch::Sender<()>>,
}
120
121impl WriteBufferManagerImpl {
122    /// Returns a new manager with specific `global_write_buffer_size`.
123    pub fn new(global_write_buffer_size: usize) -> Self {
124        Self {
125            global_write_buffer_size,
126            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
127            memory_used: AtomicUsize::new(0),
128            memory_active: AtomicUsize::new(0),
129            notifier: None,
130        }
131    }
132
133    /// Attaches a notifier to the manager.
134    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
135        self.notifier = Some(notifier);
136        self
137    }
138
139    /// Returns memory usage of mutable memtables.
140    pub fn mutable_usage(&self) -> usize {
141        self.memory_active.load(Ordering::Relaxed)
142    }
143
144    /// Returns the size limit for mutable memtables.
145    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
146        // Reserves half of the write buffer for mutable memtable.
147        global_write_buffer_size / 2
148    }
149}
150
151impl WriteBufferManager for WriteBufferManagerImpl {
152    fn should_flush_engine(&self) -> bool {
153        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
154        if mutable_memtable_memory_usage >= self.mutable_limit {
155            debug!(
156                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
157                mutable_memtable_memory_usage,
158                self.memory_usage(),
159                self.mutable_limit,
160                self.global_write_buffer_size,
161            );
162            return true;
163        }
164
165        let memory_usage = self.memory_used.load(Ordering::Relaxed);
166        if memory_usage >= self.global_write_buffer_size {
167            return true;
168        }
169
170        false
171    }
172
173    fn should_stall(&self) -> bool {
174        self.memory_usage() >= self.global_write_buffer_size
175    }
176
177    fn reserve_mem(&self, mem: usize) {
178        self.memory_used.fetch_add(mem, Ordering::Relaxed);
179        self.memory_active.fetch_add(mem, Ordering::Relaxed);
180    }
181
182    fn schedule_free_mem(&self, mem: usize) {
183        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
184    }
185
186    fn free_mem(&self, mem: usize) {
187        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
188        if let Some(notifier) = &self.notifier {
189            // Notifies the worker after the memory usage is decreased. When we drop the memtable
190            // outside of the worker, the worker may still stall requests because the memory usage
191            // is not updated. So we need to notify the worker to handle stalled requests again.
192            let _ = notifier.send(());
193        }
194    }
195
196    fn memory_usage(&self) -> usize {
197        self.memory_used.load(Ordering::Relaxed)
198    }
199
200    fn flush_limit(&self) -> usize {
201        self.mutable_limit
202    }
203}
204
/// Reason of a flush task.
///
/// The derived `IntoStaticStr` conversion is what `FlushReason::as_str` relies on to
/// obtain a static string for each variant.
#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Other reasons.
    Others,
    /// Engine reaches flush threshold.
    EngineFull,
    /// Manual flush.
    Manual,
    /// Flush to alter table.
    Alter,
    /// Flush periodically.
    Periodically,
    /// Flush memtable during downgrading state.
    Downgrading,
    /// Enter staging mode.
    EnterStaging,
    /// Flush when region is closing.
    Closing,
}
225
226impl FlushReason {
227    /// Get flush reason as static str.
228    fn as_str(&self) -> &'static str {
229        self.into()
230    }
231}
232
/// Task to flush a region.
pub(crate) struct RegionFlushTask {
    /// Region to flush.
    pub(crate) region_id: RegionId,
    /// Reason to flush.
    pub(crate) reason: FlushReason,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Request sender to notify the worker.
    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,

    /// Access layer used to write (or put already encoded) SSTs during the flush.
    pub(crate) access_layer: AccessLayerRef,
    /// Listener that is told when the flush begins.
    pub(crate) listener: WorkerListener,
    /// Engine configuration; supplies the SST write buffer size, the default SST
    /// format and the index configurations for write requests.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Overrides the row group size of the write options when set.
    pub(crate) row_group_size: Option<usize>,
    /// Cache manager handed to SST write requests and encoded SST uploads.
    pub(crate) cache_manager: CacheManagerRef,
    /// Manifest context used to read the region state and to apply the flush edit.
    pub(crate) manifest_ctx: ManifestContextRef,

    /// Index options for the region.
    pub(crate) index_options: IndexOptions,
    /// Semaphore to control flush concurrency.
    pub(crate) flush_semaphore: Arc<Semaphore>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
    /// Partition expression of the region.
    ///
    /// This is used to generate the file meta.
    pub(crate) partition_expr: Option<String>,
}
262
263impl RegionFlushTask {
264    /// Push the sender if it is not none.
265    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
266        if let Some(sender) = sender.take_inner() {
267            self.senders.push(sender);
268        }
269    }
270
271    /// Consumes the task and notify the sender the job is success.
272    fn on_success(self) {
273        for sender in self.senders {
274            sender.send(Ok(0));
275        }
276    }
277
278    /// Send flush error to waiter.
279    fn on_failure(&mut self, err: Arc<Error>) {
280        for sender in self.senders.drain(..) {
281            sender.send(Err(err.clone()).context(FlushRegionSnafu {
282                region_id: self.region_id,
283            }));
284        }
285    }
286
287    /// Converts the flush task into a background job.
288    ///
289    /// We must call this in the region worker.
290    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
291        // Get a version of this region before creating a job to get current
292        // wal entry id, sequence and immutable memtables.
293        let version_data = version_control.current();
294
295        Box::pin(async move {
296            INFLIGHT_FLUSH_COUNT.inc();
297            self.do_flush(version_data).await;
298            INFLIGHT_FLUSH_COUNT.dec();
299        })
300    }
301
302    /// Runs the flush task.
303    async fn do_flush(&mut self, version_data: VersionControlData) {
304        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
305        self.listener.on_flush_begin(self.region_id).await;
306
307        let worker_request = match self.flush_memtables(&version_data).await {
308            Ok(edit) => {
309                let memtables_to_remove = version_data
310                    .version
311                    .memtables
312                    .immutables()
313                    .iter()
314                    .map(|m| m.id())
315                    .collect();
316                let flush_finished = FlushFinished {
317                    region_id: self.region_id,
318                    // The last entry has been flushed.
319                    flushed_entry_id: version_data.last_entry_id,
320                    senders: std::mem::take(&mut self.senders),
321                    _timer: timer,
322                    edit,
323                    memtables_to_remove,
324                    is_staging: self.is_staging,
325                    flush_reason: self.reason,
326                };
327                WorkerRequest::Background {
328                    region_id: self.region_id,
329                    notify: BackgroundNotify::FlushFinished(flush_finished),
330                }
331            }
332            Err(e) => {
333                error!(e; "Failed to flush region {}", self.region_id);
334                // Discard the timer.
335                timer.stop_and_discard();
336
337                let err = Arc::new(e);
338                self.on_failure(err.clone());
339                WorkerRequest::Background {
340                    region_id: self.region_id,
341                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
342                }
343            }
344        };
345        self.send_worker_request(worker_request).await;
346    }
347
    /// Flushes memtables to level 0 SSTs and updates the manifest.
    /// Returns the [RegionEdit] to apply.
    ///
    /// # Errors
    ///
    /// Returns an error if writing the SSTs fails or if the manifest update fails.
    /// In the latter case the SST files have already been written and are not removed here.
    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // We must use the immutable memtables list and entry ids from the `version_data`
        // for consistency as others might already modify the version in the `version_control`.
        let version = &version_data.version;
        // This timer is stopped when the summary is logged below, so it covers writing
        // the SSTs but not the manifest update.
        let timer = FLUSH_ELAPSED
            .with_label_values(&["flush_memtables"])
            .start_timer();

        // Start from the default write options, then apply the engine buffer size and
        // the optional per-task row group size.
        let mut write_opts = WriteOptions {
            write_buffer_size: self.engine_config.sst_write_buffer_size,
            ..Default::default()
        };
        if let Some(row_group_size) = self.row_group_size {
            write_opts.row_group_size = row_group_size;
        }

        let DoFlushMemtablesResult {
            file_metas,
            flushed_bytes,
            series_count,
            encoded_part_count,
            flush_metrics,
        } = self.do_flush_memtables(version, write_opts).await?;

        if !file_metas.is_empty() {
            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
        }

        // Collect per-file totals for the summary log.
        let mut file_ids = Vec::with_capacity(file_metas.len());
        let mut total_rows = 0;
        let mut total_bytes = 0;
        for meta in &file_metas {
            file_ids.push(meta.file_id);
            total_rows += meta.num_rows;
            total_bytes += meta.file_size;
        }
        info!(
            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
            self.region_id,
            self.reason.as_str(),
            file_ids,
            series_count,
            total_rows,
            total_bytes,
            timer.stop_and_record(),
            encoded_part_count,
            flush_metrics,
        );
        flush_metrics.observe();

        let edit = RegionEdit {
            files_to_add: file_metas,
            files_to_remove: Vec::new(),
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: None,
            // The last entry has been flushed.
            flushed_entry_id: Some(version_data.last_entry_id),
            flushed_sequence: Some(version_data.committed_sequence),
            committed_sequence: None,
        };
        info!(
            "Applying {edit:?} to region {}, is_staging: {}",
            self.region_id, self.is_staging
        );

        // The edit is cloned because the original is returned to the caller.
        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));

        // State the region is expected to be in when the manifest is updated. A
        // downgrading flush expects the downgrading state; otherwise the state is read
        // from the manifest context at this point.
        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
            RegionLeaderState::Downgrading
        } else {
            // Check if region is in staging mode
            let current_state = self.manifest_ctx.current_state();
            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
                RegionLeaderState::Staging
            } else {
                RegionLeaderState::Writable
            }
        };
        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
        // add a cleanup job to remove them later.
        let version = self
            .manifest_ctx
            .update_manifest(expected_state, action_list, self.is_staging)
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}",
            self.region_id,
            self.is_staging,
            self.reason.as_str()
        );

        Ok(edit)
    }
443
444    async fn do_flush_memtables(
445        &self,
446        version: &VersionRef,
447        write_opts: WriteOptions,
448    ) -> Result<DoFlushMemtablesResult> {
449        let memtables = version.memtables.immutables();
450        let mut file_metas = Vec::with_capacity(memtables.len());
451        let mut flushed_bytes = 0;
452        let mut series_count = 0;
453        let mut encoded_part_count = 0;
454        let mut flush_metrics = Metrics::new(WriteType::Flush);
455        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
456        for mem in memtables {
457            if mem.is_empty() {
458                // Skip empty memtables.
459                continue;
460            }
461
462            // Compact the memtable first, this waits the background compaction to finish.
463            let compact_start = std::time::Instant::now();
464            if let Err(e) = mem.compact(true) {
465                common_telemetry::error!(e; "Failed to compact memtable before flush");
466            }
467            let compact_cost = compact_start.elapsed();
468            flush_metrics.compact_memtable += compact_cost;
469
470            // Sets `for_flush` flag to true.
471            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
472            let num_mem_ranges = mem_ranges.ranges.len();
473
474            // Aggregate stats from all ranges
475            let num_mem_rows = mem_ranges.num_rows();
476            let memtable_series_count = mem_ranges.series_count();
477            let memtable_id = mem.id();
478            // Increases series count for each mem range. We consider each mem range has different series so
479            // the counter may have more series than the actual series count.
480            series_count += memtable_series_count;
481
482            let flush_start = Instant::now();
483            let FlushFlatMemResult {
484                num_encoded,
485                num_sources,
486                results,
487            } = self
488                .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
489                .await?;
490            encoded_part_count += num_encoded;
491            for (source_idx, result) in results.into_iter().enumerate() {
492                let (max_sequence, ssts_written, metrics) = result?;
493                if ssts_written.is_empty() {
494                    // No data written.
495                    continue;
496                }
497
498                common_telemetry::debug!(
499                    "Region {} flush one memtable {} {}/{}, metrics: {:?}",
500                    self.region_id,
501                    memtable_id,
502                    source_idx,
503                    num_sources,
504                    metrics
505                );
506
507                flush_metrics = flush_metrics.merge(metrics);
508
509                for sst_info in ssts_written {
510                    flushed_bytes += sst_info.file_size;
511                    let pk_range = sst_info
512                        .file_metadata
513                        .as_ref()
514                        .and_then(|meta| extract_primary_key_range(meta, &version.metadata));
515                    file_metas.push(Self::new_file_meta(
516                        self.region_id,
517                        max_sequence,
518                        sst_info,
519                        partition_expr.clone(),
520                        pk_range,
521                    ));
522                }
523            }
524
525            common_telemetry::debug!(
526                "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
527                self.region_id,
528                num_sources,
529                memtable_id,
530                num_mem_ranges,
531                num_encoded,
532                num_mem_rows,
533                flush_start.elapsed(),
534                compact_cost,
535            );
536        }
537
538        Ok(DoFlushMemtablesResult {
539            file_metas,
540            flushed_bytes,
541            series_count,
542            encoded_part_count,
543            flush_metrics,
544        })
545    }
546
    /// Flushes the ranges of one memtable by spawning one task per source.
    ///
    /// The ranges are first turned into [FlatSources]. Iterator-backed sources are
    /// written through `write_sst`; already encoded ranges are uploaded through
    /// `put_sst`. Every task acquires a permit from the flush semaphore before doing
    /// its work.
    ///
    /// The returned `results` keep the task order: all iterator-backed sources first,
    /// followed by all encoded ranges. A write failure is reported inside the
    /// corresponding entry of `results`.
    ///
    /// # Errors
    ///
    /// Fails if the sources cannot be built or if joining a spawned task fails.
    async fn flush_flat_mem_ranges(
        &self,
        version: &VersionRef,
        write_opts: &WriteOptions,
        mem_ranges: MemtableRanges,
    ) -> Result<FlushFlatMemResult> {
        // Arrow schema of the flat SST format for this region's metadata.
        let batch_schema = to_flat_sst_arrow_schema(
            &version.metadata,
            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
        );
        let field_column_start =
            flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
        let flat_sources = memtable_flat_sources(
            batch_schema,
            mem_ranges,
            &version.options,
            field_column_start,
        )?;
        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
        let num_encoded = flat_sources.encoded.len();
        // Iterator-backed sources: encode and write them as new SSTs.
        for (source, max_sequence) in flat_sources.sources {
            let write_request = self.new_write_request(version, max_sequence, source);
            let access_layer = self.access_layer.clone();
            let write_opts = write_opts.clone();
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                // NOTE(review): this panics if the semaphore has been closed; nothing in
                // this file closes it — confirm against the owner of the semaphore.
                let _permit = semaphore.acquire().await.unwrap();
                let mut metrics = Metrics::new(WriteType::Flush);
                let ssts = access_layer
                    .write_sst(write_request, &write_opts, &mut metrics)
                    .await?;
                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
                Ok((max_sequence, ssts, metrics))
            });
            tasks.push(task);
        }
        // Encoded ranges: the data is already encoded, so it is put as is.
        for (encoded, max_sequence) in flat_sources.encoded {
            let access_layer = self.access_layer.clone();
            let cache_manager = self.cache_manager.clone();
            let region_id = version.metadata.region_id;
            let semaphore = self.flush_semaphore.clone();
            let task = common_runtime::spawn_global(async move {
                let _permit = semaphore.acquire().await.unwrap();
                let metrics = access_layer
                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
                    .await?;
                FLUSH_FILE_TOTAL.inc();
                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
            });
            tasks.push(task);
        }
        let num_sources = tasks.len();
        // Only a join failure short-circuits here; per-task write results are returned
        // to the caller untouched.
        let results = futures::future::try_join_all(tasks)
            .await
            .context(JoinSnafu)?;
        Ok(FlushFlatMemResult {
            num_encoded,
            num_sources,
            results,
        })
    }
608
609    fn new_file_meta(
610        region_id: RegionId,
611        max_sequence: u64,
612        sst_info: SstInfo,
613        partition_expr: Option<PartitionExpr>,
614        primary_key_range: Option<(Bytes, Bytes)>,
615    ) -> FileMeta {
616        let (primary_key_min, primary_key_max) = match primary_key_range {
617            Some((min, max)) => (Some(min), Some(max)),
618            None => (None, None),
619        };
620        FileMeta {
621            region_id,
622            file_id: sst_info.file_id,
623            time_range: sst_info.time_range,
624            level: 0,
625            file_size: sst_info.file_size,
626            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
627            available_indexes: sst_info.index_metadata.build_available_indexes(),
628            indexes: sst_info.index_metadata.build_indexes(),
629            index_file_size: sst_info.index_metadata.file_size,
630            index_version: 0,
631            num_rows: sst_info.num_rows as u64,
632            num_row_groups: sst_info.num_row_groups,
633            sequence: NonZeroU64::new(max_sequence),
634            partition_expr,
635            num_series: sst_info.num_series,
636            primary_key_min,
637            primary_key_max,
638        }
639    }
640
641    fn new_write_request(
642        &self,
643        version: &VersionRef,
644        max_sequence: u64,
645        source: FlatSource,
646    ) -> SstWriteRequest {
647        let flat_format = version
648            .options
649            .sst_format
650            .map(|f| f == FormatType::Flat)
651            .unwrap_or(self.engine_config.default_flat_format);
652        SstWriteRequest {
653            op_type: OperationType::Flush,
654            metadata: version.metadata.clone(),
655            source,
656            cache_manager: self.cache_manager.clone(),
657            storage: version.options.storage.clone(),
658            max_sequence: Some(max_sequence),
659            sst_write_format: if flat_format {
660                FormatType::Flat
661            } else {
662                FormatType::PrimaryKey
663            },
664            index_options: self.index_options.clone(),
665            index_config: self.engine_config.index.clone(),
666            inverted_index_config: self.engine_config.inverted_index.clone(),
667            fulltext_index_config: self.engine_config.fulltext_index.clone(),
668            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
669            #[cfg(feature = "vector_index")]
670            vector_index_config: self.engine_config.vector_index.clone(),
671        }
672    }
673
674    /// Notify flush job status.
675    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
676        if let Err(e) = self
677            .request_sender
678            .send(WorkerRequestWithTime::new(request))
679            .await
680        {
681            error!(
682                "Failed to notify flush job status for region {}, request: {:?}",
683                self.region_id, e.0
684            );
685        }
686    }
687
688    /// Merge two flush tasks.
689    fn merge(&mut self, mut other: RegionFlushTask) {
690        assert_eq!(self.region_id, other.region_id);
691        // Now we only merge senders. They share the same flush reason.
692        self.senders.append(&mut other.senders);
693    }
694}
695
/// Outcome of flushing the ranges of one memtable.
struct FlushFlatMemResult {
    /// Number of ranges that were already encoded and uploaded as is.
    num_encoded: usize,
    /// Total number of spawned flush tasks (iterator-backed sources plus encoded ranges).
    num_sources: usize,
    /// Per-task result in task order: max sequence, written SSTs and write metrics.
    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
}
701
/// Aggregated outcome of flushing all immutable memtables of a version.
struct DoFlushMemtablesResult {
    /// Metas of all SST files written by the flush.
    file_metas: Vec<FileMeta>,
    /// Sum of the sizes of the written SST files.
    flushed_bytes: u64,
    /// Sum of the series counts reported by the flushed memtables.
    series_count: usize,
    /// Number of encoded ranges that were uploaded without re-encoding.
    encoded_part_count: usize,
    /// Write metrics merged over all flush tasks, including memtable compaction time.
    flush_metrics: Metrics,
}
709
/// Sources produced from the ranges of one memtable, each paired with its max sequence.
struct FlatSources {
    /// Iterator-backed sources that still have to be written as SSTs.
    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
    /// Ranges that are already encoded and can be uploaded as is.
    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
714
/// Builds the [FlatSources] for the ranges of one memtable.
///
/// Ranges that are already encoded are returned in `encoded`. All other ranges are
/// turned into record batch iterators and returned in `sources`, each paired with the
/// max sequence of the ranges it covers:
///
/// * A single range becomes one source, deduplicated according to the append mode and
///   merge mode of `options`.
/// * Multiple ranges are grouped into batches. A batch is closed once it holds at
///   least `ENCODE_ROW_THRESHOLD` rows, unless that would leave fewer than
///   `DEFAULT_ROW_GROUP_SIZE` (but more than zero) rows for the rest. The iterators of
///   a batch are merged and deduplicated into one source.
///
/// # Errors
///
/// Fails if building a record batch iterator or merging the iterators fails.
fn memtable_flat_sources(
    schema: SchemaRef,
    mem_ranges: MemtableRanges,
    options: &RegionOptions,
    field_column_start: usize,
) -> Result<FlatSources> {
    let MemtableRanges { ranges } = mem_ranges;
    let mut flat_sources = FlatSources {
        sources: SmallVec::new(),
        encoded: SmallVec::new(),
    };

    if ranges.len() == 1 {
        debug!("Flushing single flat range");

        // The length check above guarantees there is exactly one value.
        let only_range = ranges.into_values().next().unwrap();
        let max_sequence = only_range.stats().max_sequence();
        if let Some(encoded) = only_range.encoded() {
            flat_sources.encoded.push((encoded, max_sequence));
        } else {
            let iter = only_range.build_record_batch_iter(None, None)?;
            // Dedup according to append mode and merge mode.
            // Even single range may have duplicate rows.
            let iter = maybe_dedup_one(
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                iter,
            );
            flat_sources
                .sources
                .push((FlatSource::Iter(iter), max_sequence));
        };
    } else {
        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
        // Calculate total rows from non-encoded ranges.
        let total_rows: usize = ranges
            .values()
            .filter(|r| r.encoded().is_none())
            .map(|r| r.num_rows())
            .sum();
        debug!(
            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
            total_rows,
            min_flush_rows,
            ranges.len()
        );
        // Rows of non-encoded ranges that have not been assigned to a batch yet.
        let mut rows_remaining = total_rows;
        // Rows accumulated in the batch that is currently being built.
        let mut last_iter_rows = 0;
        let num_ranges = ranges.len();
        // Iterators and ranges of the current batch; both are filled and reset together.
        let mut input_iters = Vec::with_capacity(num_ranges);
        let mut current_ranges = Vec::new();
        for (_range_id, range) in ranges {
            // Encoded ranges bypass batching entirely.
            if let Some(encoded) = range.encoded() {
                let max_sequence = range.stats().max_sequence();
                flat_sources.encoded.push((encoded, max_sequence));
                continue;
            }

            let iter = range.build_record_batch_iter(None, None)?;
            input_iters.push(iter);
            let range_rows = range.num_rows();
            last_iter_rows += range_rows;
            // `total_rows` is the sum over exactly these non-encoded ranges, so this
            // subtraction cannot go below zero.
            rows_remaining -= range_rows;
            current_ranges.push(range);

            // Flush if we have enough rows, but don't flush if the remaining rows
            // would be less than DEFAULT_ROW_GROUP_SIZE (to avoid small last files).
            if last_iter_rows >= min_flush_rows
                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
            {
                debug!(
                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                    last_iter_rows,
                    min_flush_rows,
                    input_iters.len(),
                    rows_remaining
                );

                // Calculate max_sequence from all merged ranges
                let max_sequence = current_ranges
                    .iter()
                    .map(|r| r.stats().max_sequence())
                    .max()
                    .unwrap_or(0);

                // Hand the collected iterators over and start a fresh list for the next batch.
                let maybe_dedup = merge_and_dedup(
                    &schema,
                    options.append_mode,
                    options.merge_mode(),
                    field_column_start,
                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
                )?;

                flat_sources
                    .sources
                    .push((FlatSource::Iter(maybe_dedup), max_sequence));
                last_iter_rows = 0;
                current_ranges.clear();
            }
        }

        // Handle remaining iters.
        if !input_iters.is_empty() {
            debug!(
                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
                last_iter_rows,
                min_flush_rows,
                input_iters.len(),
                rows_remaining
            );
            // `current_ranges` holds one range per pending iterator, so the fallback to 0
            // is not expected to be taken here.
            let max_sequence = current_ranges
                .iter()
                .map(|r| r.stats().max_sequence())
                .max()
                .unwrap_or(0);

            let maybe_dedup = merge_and_dedup(
                &schema,
                options.append_mode,
                options.merge_mode(),
                field_column_start,
                input_iters,
            )?;

            flat_sources
                .sources
                .push((FlatSource::Iter(maybe_dedup), max_sequence));
        }
    }

    Ok(flat_sources)
}
849
850/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
851///
852/// This function is used during the flush process to combine data from multiple memtable ranges
853/// into a single stream while handling duplicate records according to the configured merge strategy.
854///
855/// # Arguments
856///
857/// * `schema` - The Arrow schema reference that defines the structure of the record batches
858/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
859///                  This is used for append-only workloads where duplicate handling is not required.
860/// * `merge_mode` - The strategy used for deduplication when not in append mode:
861///   - `MergeMode::LastRow`: Keeps the last record for each primary key
862///   - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
863/// * `field_column_start` - The starting column index for fields in the record batch.
864///                          Used when `MergeMode::LastNonNull` to identify which columns
865///                          contain field values versus primary key columns.
866/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
867///
868/// # Returns
869///
870/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
871/// record batches.
872///
873/// # Behavior
874///
875/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
876///    primary key and timestamp
877/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
878/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
879///    applies the specified merge mode:
880///    - `LastRow`: Removes duplicate rows, keeping only the last one
881///    - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
882///
883/// # Examples
884///
885/// ```ignore
886/// let merged_iter = merge_and_dedup(
887///     &schema,
888///     false,  // not append mode, apply dedup
889///     MergeMode::LastRow,
890///     2,  // fields start at column 2 after primary key columns
891///     vec![iter1, iter2, iter3],
892/// )?;
893/// ```
894pub fn merge_and_dedup(
895    schema: &SchemaRef,
896    append_mode: bool,
897    merge_mode: MergeMode,
898    field_column_start: usize,
899    input_iters: Vec<BoxedRecordBatchIterator>,
900) -> Result<BoxedRecordBatchIterator> {
901    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
902    let maybe_dedup = if append_mode {
903        // No dedup in append mode
904        Box::new(merge_iter) as _
905    } else {
906        // Dedup according to merge mode.
907        match merge_mode {
908            MergeMode::LastRow => {
909                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
910            }
911            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
912                merge_iter,
913                FlatLastNonNull::new(field_column_start, false),
914            )) as _,
915        }
916    };
917    Ok(maybe_dedup)
918}
919
920pub fn maybe_dedup_one(
921    append_mode: bool,
922    merge_mode: MergeMode,
923    field_column_start: usize,
924    input_iter: BoxedRecordBatchIterator,
925) -> BoxedRecordBatchIterator {
926    if append_mode {
927        // No dedup in append mode
928        input_iter
929    } else {
930        // Dedup according to merge mode.
931        match merge_mode {
932            MergeMode::LastRow => {
933                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
934            }
935            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
936                input_iter,
937                FlatLastNonNull::new(field_column_start, false),
938            )),
939        }
940    }
941}
942
/// Manages background flushes of a worker.
///
/// A region has an entry in `region_status` from the moment a flush job is scheduled for it
/// until that entry is removed (flush finished or failed, or the region was dropped, closed
/// or truncated). Flush requests and DDL/write requests that arrive in between are queued
/// in the region's [`FlushStatus`].
pub(crate) struct FlushScheduler {
    /// Flush status of each region that requested a flush, keyed by region id.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler that flush jobs are submitted to.
    scheduler: SchedulerRef,
}
950
951impl FlushScheduler {
952    /// Creates a new flush scheduler.
953    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
954        FlushScheduler {
955            region_status: HashMap::new(),
956            scheduler,
957        }
958    }
959
    /// Returns true if the region already requested flush.
    ///
    /// A region counts as requested while it has an entry in the status map, i.e. after a
    /// flush job has been scheduled for it and before that entry is removed again.
    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
        self.region_status.contains_key(&region_id)
    }
964
965    fn schedule_flush_task(
966        &mut self,
967        version_control: &VersionControlRef,
968        task: RegionFlushTask,
969    ) -> Result<()> {
970        let region_id = task.region_id;
971
972        // If current region doesn't have flush status, we can flush the region directly.
973        if let Err(e) = version_control.freeze_mutable() {
974            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
975
976            return Err(e);
977        }
978        // Submit a flush job.
979        let job = task.into_flush_job(version_control);
980        if let Err(e) = self.scheduler.schedule(job) {
981            // If scheduler returns error, senders in the job will be dropped and waiters
982            // can get recv errors.
983            error!(e; "Failed to schedule flush job for region {}", region_id);
984
985            return Err(e);
986        }
987        Ok(())
988    }
989
990    /// Schedules a flush `task` for specific `region`.
991    pub(crate) fn schedule_flush(
992        &mut self,
993        region_id: RegionId,
994        version_control: &VersionControlRef,
995        task: RegionFlushTask,
996    ) -> Result<()> {
997        debug_assert_eq!(region_id, task.region_id);
998
999        let version = version_control.current().version;
1000        if version.memtables.is_empty() {
1001            debug_assert!(!self.region_status.contains_key(&region_id));
1002            // The region has nothing to flush.
1003            task.on_success();
1004            return Ok(());
1005        }
1006
1007        // Don't increase the counter if a region has nothing to flush.
1008        FLUSH_REQUESTS_TOTAL
1009            .with_label_values(&[task.reason.as_str()])
1010            .inc();
1011
1012        // If current region has flush status, merge the task.
1013        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
1014            // Checks whether we can flush the region now.
1015            debug!("Merging flush task for region {}", region_id);
1016            flush_status.merge_task(task);
1017            return Ok(());
1018        }
1019
1020        self.schedule_flush_task(version_control, task)?;
1021
1022        // Add this region to status map.
1023        let _ = self.region_status.insert(
1024            region_id,
1025            FlushStatus::new(region_id, version_control.clone()),
1026        );
1027
1028        Ok(())
1029    }
1030
1031    /// Notifies the scheduler that the flush job is finished.
1032    ///
1033    /// Returns all pending requests if the region doesn't need to flush again.
1034    pub(crate) fn on_flush_success(
1035        &mut self,
1036        region_id: RegionId,
1037    ) -> Option<(
1038        Vec<SenderDdlRequest>,
1039        Vec<SenderWriteRequest>,
1040        Vec<SenderBulkRequest>,
1041    )> {
1042        let flush_status = self.region_status.get_mut(&region_id)?;
1043        // If region doesn't have any pending flush task, we need to remove it from the status.
1044        if flush_status.pending_task.is_none() {
1045            // The region doesn't have any pending flush task.
1046            // Safety: The flush status must exist.
1047            debug!(
1048                "Region {} doesn't have any pending flush task, removing it from the status",
1049                region_id
1050            );
1051            let flush_status = self.region_status.remove(&region_id).unwrap();
1052            return Some((
1053                flush_status.pending_ddls,
1054                flush_status.pending_writes,
1055                flush_status.pending_bulk_writes,
1056            ));
1057        }
1058
1059        // If region has pending task, but has nothing to flush, we need to remove it from the status.
1060        let version_data = flush_status.version_control.current();
1061        if version_data.version.memtables.is_empty() {
1062            // The region has nothing to flush, we also need to remove it from the status.
1063            // Safety: The pending task is not None.
1064            let task = flush_status.pending_task.take().unwrap();
1065            // The region has nothing to flush. We can notify pending task.
1066            task.on_success();
1067            debug!(
1068                "Region {} has nothing to flush, removing it from the status",
1069                region_id
1070            );
1071            // Safety: The flush status must exist.
1072            let flush_status = self.region_status.remove(&region_id).unwrap();
1073            return Some((
1074                flush_status.pending_ddls,
1075                flush_status.pending_writes,
1076                flush_status.pending_bulk_writes,
1077            ));
1078        }
1079
1080        // If region has pending task and has something to flush, we need to schedule it.
1081        debug!("Scheduling pending flush task for region {}", region_id);
1082        // Safety: The flush status must exist.
1083        let task = flush_status.pending_task.take().unwrap();
1084        let version_control = flush_status.version_control.clone();
1085        if let Err(err) = self.schedule_flush_task(&version_control, task) {
1086            error!(
1087                err;
1088                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
1089            );
1090        }
1091        // We can flush the region again, keep it in the region status.
1092        None
1093    }
1094
1095    /// Notifies the scheduler that the flush job is failed.
1096    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
1097        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
1098
1099        FLUSH_FAILURE_TOTAL.inc();
1100
1101        // Remove this region.
1102        let Some(flush_status) = self.region_status.remove(&region_id) else {
1103            return;
1104        };
1105
1106        // Fast fail: cancels all pending tasks and sends error to their waiters.
1107        flush_status.on_failure(err);
1108    }
1109
1110    /// Notifies the scheduler that the region is dropped.
1111    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
1112        self.remove_region_on_failure(
1113            region_id,
1114            Arc::new(RegionDroppedSnafu { region_id }.build()),
1115        );
1116    }
1117
1118    /// Notifies the scheduler that the region is closed.
1119    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1120        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1121    }
1122
1123    /// Notifies the scheduler that the region is truncated.
1124    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1125        self.remove_region_on_failure(
1126            region_id,
1127            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1128        );
1129    }
1130
1131    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1132        // Remove this region.
1133        let Some(flush_status) = self.region_status.remove(&region_id) else {
1134            return;
1135        };
1136
1137        // Notifies all pending tasks.
1138        flush_status.on_failure(err);
1139    }
1140
1141    /// Add ddl request to pending queue.
1142    ///
1143    /// # Panics
1144    /// Panics if region didn't request flush.
1145    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1146        let status = self.region_status.get_mut(&request.region_id).unwrap();
1147        status.pending_ddls.push(request);
1148    }
1149
1150    /// Add write request to pending queue.
1151    ///
1152    /// # Panics
1153    /// Panics if region didn't request flush.
1154    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1155        let status = self
1156            .region_status
1157            .get_mut(&request.request.region_id)
1158            .unwrap();
1159        status.pending_writes.push(request);
1160    }
1161
1162    /// Add bulk write request to pending queue.
1163    ///
1164    /// # Panics
1165    /// Panics if region didn't request flush.
1166    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1167        let status = self.region_status.get_mut(&request.region_id).unwrap();
1168        status.pending_bulk_writes.push(request);
1169    }
1170
1171    /// Returns true if the region has pending DDLs.
1172    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1173        self.region_status
1174            .get(&region_id)
1175            .map(|status| !status.pending_ddls.is_empty())
1176            .unwrap_or(false)
1177    }
1178}
1179
1180impl Drop for FlushScheduler {
1181    fn drop(&mut self) {
1182        for (region_id, flush_status) in self.region_status.drain() {
1183            // We are shutting down so notify all pending tasks.
1184            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1185        }
1186    }
1187}
1188
/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
/// The scheduler keeps one value per region in its status map.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Task waiting for next flush. Flush requests that arrive while the region is
    /// tracked are merged into this task.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Write requests queued while the region is tracked by the scheduler.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk write requests queued while the region is tracked by the scheduler.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1206
1207impl FlushStatus {
1208    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1209        FlushStatus {
1210            region_id,
1211            version_control,
1212            pending_task: None,
1213            pending_ddls: Vec::new(),
1214            pending_writes: Vec::new(),
1215            pending_bulk_writes: Vec::new(),
1216        }
1217    }
1218
1219    /// Merges the task to pending task.
1220    fn merge_task(&mut self, task: RegionFlushTask) {
1221        if let Some(pending) = &mut self.pending_task {
1222            pending.merge(task);
1223        } else {
1224            self.pending_task = Some(task);
1225        }
1226    }
1227
1228    fn on_failure(self, err: Arc<Error>) {
1229        if let Some(mut task) = self.pending_task {
1230            task.on_failure(err.clone());
1231        }
1232        for ddl in self.pending_ddls {
1233            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1234                region_id: self.region_id,
1235            }));
1236        }
1237        for write_req in self.pending_writes {
1238            write_req
1239                .sender
1240                .send(Err(err.clone()).context(FlushRegionSnafu {
1241                    region_id: self.region_id,
1242                }));
1243        }
1244    }
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249    use mito_codec::row_converter::build_primary_key_codec;
1250    use tokio::sync::oneshot;
1251
1252    use super::*;
1253    use crate::cache::CacheManager;
1254    use crate::memtable::bulk::part::BulkPartConverter;
1255    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1256    use crate::memtable::{Memtable, RangesOptions};
1257    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1258    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1259    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1260    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1261
1262    #[test]
1263    fn test_get_mutable_limit() {
1264        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1265        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1266        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1267        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1268    }
1269
1270    #[test]
1271    fn test_over_mutable_limit() {
1272        // Mutable limit is 500.
1273        let manager = WriteBufferManagerImpl::new(1000);
1274        manager.reserve_mem(400);
1275        assert!(!manager.should_flush_engine());
1276        assert!(!manager.should_stall());
1277
1278        // More than mutable limit.
1279        manager.reserve_mem(400);
1280        assert!(manager.should_flush_engine());
1281
1282        // Freezes mutable.
1283        manager.schedule_free_mem(400);
1284        assert!(!manager.should_flush_engine());
1285        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
1286        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1287
1288        // Releases immutable.
1289        manager.free_mem(400);
1290        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
1291        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1292    }
1293
    // Verifies that the manager keeps asking for a flush (and stalls) while the total
    // reserved memory exceeds the global limit, even after part of it is scheduled to be freed.
    #[test]
    fn test_over_global() {
        // Mutable limit is 500.
        let manager = WriteBufferManagerImpl::new(1000);
        // Reserve more than the global limit of 1000 right away.
        manager.reserve_mem(1100);
        assert!(manager.should_stall());
        // Global usage is still 1100.
        manager.schedule_free_mem(200);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
        manager.schedule_free_mem(450);
        assert!(manager.should_flush_engine());
        assert!(manager.should_stall());

        // Now mutable is enough.
        // Further reservations keep the flush signal on.
        manager.reserve_mem(50);
        assert!(manager.should_flush_engine());
        manager.reserve_mem(100);
        assert!(manager.should_flush_engine());
    }
1316
    // Verifies that the notifier attached to the manager only fires once memory is freed.
    #[test]
    fn test_manager_notify() {
        let (sender, receiver) = watch::channel(());
        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
        // Reserving memory does not notify the receiver.
        manager.reserve_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Scheduling memory to be freed does not notify it either.
        manager.schedule_free_mem(500);
        assert!(!receiver.has_changed().unwrap());
        // Freeing the memory marks the watch channel as changed.
        manager.free_mem(500);
        assert!(receiver.has_changed().unwrap());
    }
1328
1329    #[tokio::test]
1330    async fn test_schedule_empty() {
1331        let env = SchedulerEnv::new().await;
1332        let (tx, _rx) = mpsc::channel(4);
1333        let mut scheduler = env.mock_flush_scheduler();
1334        let builder = VersionControlBuilder::new();
1335
1336        let version_control = Arc::new(builder.build());
1337        let (output_tx, output_rx) = oneshot::channel();
1338        let mut task = RegionFlushTask {
1339            region_id: builder.region_id(),
1340            reason: FlushReason::Others,
1341            senders: Vec::new(),
1342            request_sender: tx,
1343            access_layer: env.access_layer.clone(),
1344            listener: WorkerListener::default(),
1345            engine_config: Arc::new(MitoConfig::default()),
1346            row_group_size: None,
1347            cache_manager: Arc::new(CacheManager::default()),
1348            manifest_ctx: env
1349                .mock_manifest_context(version_control.current().version.metadata.clone())
1350                .await,
1351            index_options: IndexOptions::default(),
1352            flush_semaphore: Arc::new(Semaphore::new(2)),
1353            is_staging: false,
1354            partition_expr: None,
1355        };
1356        task.push_sender(OptionOutputTx::from(output_tx));
1357        scheduler
1358            .schedule_flush(builder.region_id(), &version_control, task)
1359            .unwrap();
1360        assert!(scheduler.region_status.is_empty());
1361        let output = output_rx.await.unwrap().unwrap();
1362        assert_eq!(output, 0);
1363        assert!(scheduler.region_status.is_empty());
1364    }
1365
    // Flush requests that arrive while a flush job is scheduled are kept as a pending task.
    // When the job finishes and no memtable is left, the pending task completes without
    // submitting another job.
    #[tokio::test]
    async fn test_schedule_pending_request() {
        // `job_scheduler` is used below to count the submitted jobs.
        let job_scheduler = Arc::new(VecScheduler::default());
        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
        let (tx, _rx) = mpsc::channel(4);
        let mut scheduler = env.mock_flush_scheduler();
        let mut builder = VersionControlBuilder::new();
        // Overwrites the empty memtable builder.
        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
        let version_control = Arc::new(builder.build());
        // Writes data to the memtable so it is not empty.
        let version_data = version_control.current();
        write_rows_to_version(&version_data.version, "host0", 0, 10);
        let manifest_ctx = env
            .mock_manifest_context(version_data.version.metadata.clone())
            .await;
        // Creates 3 tasks.
        let mut tasks: Vec<_> = (0..3)
            .map(|_| RegionFlushTask {
                region_id: builder.region_id(),
                reason: FlushReason::Others,
                senders: Vec::new(),
                request_sender: tx.clone(),
                access_layer: env.access_layer.clone(),
                listener: WorkerListener::default(),
                engine_config: Arc::new(MitoConfig::default()),
                row_group_size: None,
                cache_manager: Arc::new(CacheManager::default()),
                manifest_ctx: manifest_ctx.clone(),
                index_options: IndexOptions::default(),
                flush_semaphore: Arc::new(Semaphore::new(2)),
                is_staging: false,
                partition_expr: None,
            })
            .collect();
        // Schedule first task.
        let task = tasks.pop().unwrap();
        scheduler
            .schedule_flush(builder.region_id(), &version_control, task)
            .unwrap();
        // Should schedule 1 flush.
        assert_eq!(1, scheduler.region_status.len());
        assert_eq!(1, job_scheduler.num_jobs());
        // Check the new version.
        // The memtable with id 0 must now be the first immutable memtable.
        let version_data = version_control.current();
        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
        // Schedule remaining tasks.
        // The region is already tracked, so these tasks are merged into its pending task.
        let output_rxs: Vec<_> = tasks
            .into_iter()
            .map(|mut task| {
                let (output_tx, output_rx) = oneshot::channel();
                task.push_sender(OptionOutputTx::from(output_tx));
                scheduler
                    .schedule_flush(builder.region_id(), &version_control, task)
                    .unwrap();
                output_rx
            })
            .collect();
        // Assumes the flush job is finished.
        // NOTE(review): `&[0]` presumably lists the memtable ids to remove — confirm
        // against `apply_edit`.
        version_control.apply_edit(
            Some(RegionEdit {
                files_to_add: Vec::new(),
                files_to_remove: Vec::new(),
                timestamp_ms: None,
                compaction_time_window: None,
                flushed_entry_id: None,
                flushed_sequence: None,
                committed_sequence: None,
            }),
            &[0],
            builder.file_purger(),
        );
        scheduler.on_flush_success(builder.region_id());
        // No new flush task.
        assert_eq!(1, job_scheduler.num_jobs());
        // The flush status is cleared.
        assert!(scheduler.region_status.is_empty());
        // Every waiter of the merged tasks is notified with a successful result.
        for output_rx in output_rxs {
            let output = output_rx.await.unwrap().unwrap();
            assert_eq!(output, 0);
        }
    }
1448
    // Verifies single-range flat flush path respects append_mode (no dedup) vs dedup when disabled.
    //
    // The same bulk part with two duplicate rows is flushed twice: once with dedup enabled
    // (one row expected) and once in append mode (both rows expected).
    #[test]
    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
        // Build test metadata and flat schema
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        // Prepare a bulk part containing duplicate rows for the same PK and timestamp
        // Two rows with identical keys and timestamps (ts = 1000), different field values
        let capacity = 16;
        let pk_codec = build_primary_key_codec(&metadata);
        let mut converter =
            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
        let kvs = build_key_values_with_ts_seq_values(
            &metadata,
            "dup_key".to_string(),
            1,
            vec![1000i64, 1000i64].into_iter(),
            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
            1,
        );
        converter.append_key_values(&kvs).unwrap();
        let part = converter.convert().unwrap();

        // Helper to build MemtableRanges with a single range from one bulk part.
        // We use BulkMemtable directly because it produces record batch iterators.
        let build_ranges = |append_mode: bool| -> MemtableRanges {
            let memtable = crate::memtable::bulk::BulkMemtable::new(
                1,
                crate::memtable::bulk::BulkMemtableConfig::default(),
                metadata.clone(),
                None,
                None,
                append_mode,
                MergeMode::LastRow,
            );
            memtable.write_bulk(part.clone()).unwrap();
            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
        };

        // Case 1: append_mode = false => dedup happens, total rows should be 1
        {
            let mem_ranges = build_ranges(false);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: false,
                merge_mode: Some(MergeMode::LastRow),
                ..Default::default()
            };

            let flat_sources = memtable_flat_sources(
                schema.clone(),
                mem_ranges,
                &options,
                metadata.primary_key.len(),
            )
            .unwrap();
            // The range is expected as one iterator source, not as an encoded range.
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Consume the iterator and count rows
            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(1, total_rows, "dedup should keep a single row");
        }

        // Case 2: append_mode = true => no dedup, total rows should be 2
        {
            let mem_ranges = build_ranges(true);
            assert_eq!(1, mem_ranges.ranges.len());

            let options = RegionOptions {
                append_mode: true,
                ..Default::default()
            };

            let flat_sources =
                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
                    .unwrap();
            // Same source layout as in case 1.
            assert!(flat_sources.encoded.is_empty());
            assert_eq!(1, flat_sources.sources.len());

            // Consume the iterator and count rows, exactly as in case 1.
            let mut total_rows = 0usize;
            for (source, _sequence) in flat_sources.sources {
                match source {
                    crate::read::FlatSource::Iter(iter) => {
                        for rb in iter {
                            total_rows += rb.unwrap().num_rows();
                        }
                    }
                    crate::read::FlatSource::Stream(_) => unreachable!(),
                }
            }
            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
        }
    }
1558
1559    #[tokio::test]
1560    async fn test_schedule_pending_request_on_flush_success() {
1561        common_telemetry::init_default_ut_logging();
1562        let job_scheduler = Arc::new(VecScheduler::default());
1563        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1564        let (tx, _rx) = mpsc::channel(4);
1565        let mut scheduler = env.mock_flush_scheduler();
1566        let mut builder = VersionControlBuilder::new();
1567        // Overwrites the empty memtable builder.
1568        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1569        let version_control = Arc::new(builder.build());
1570        // Writes data to the memtable so it is not empty.
1571        let version_data = version_control.current();
1572        write_rows_to_version(&version_data.version, "host0", 0, 10);
1573        let manifest_ctx = env
1574            .mock_manifest_context(version_data.version.metadata.clone())
1575            .await;
1576        // Creates 2 tasks.
1577        let mut tasks: Vec<_> = (0..2)
1578            .map(|_| RegionFlushTask {
1579                region_id: builder.region_id(),
1580                reason: FlushReason::Others,
1581                senders: Vec::new(),
1582                request_sender: tx.clone(),
1583                access_layer: env.access_layer.clone(),
1584                listener: WorkerListener::default(),
1585                engine_config: Arc::new(MitoConfig::default()),
1586                row_group_size: None,
1587                cache_manager: Arc::new(CacheManager::default()),
1588                manifest_ctx: manifest_ctx.clone(),
1589                index_options: IndexOptions::default(),
1590                flush_semaphore: Arc::new(Semaphore::new(2)),
1591                is_staging: false,
1592                partition_expr: None,
1593            })
1594            .collect();
1595        // Schedule first task.
1596        let task = tasks.pop().unwrap();
1597        scheduler
1598            .schedule_flush(builder.region_id(), &version_control, task)
1599            .unwrap();
1600        // Should schedule 1 flush.
1601        assert_eq!(1, scheduler.region_status.len());
1602        assert_eq!(1, job_scheduler.num_jobs());
1603        // Schedule second task.
1604        let task = tasks.pop().unwrap();
1605        scheduler
1606            .schedule_flush(builder.region_id(), &version_control, task)
1607            .unwrap();
1608        assert!(
1609            scheduler
1610                .region_status
1611                .get(&builder.region_id())
1612                .unwrap()
1613                .pending_task
1614                .is_some()
1615        );
1616
1617        // Check the new version.
1618        let version_data = version_control.current();
1619        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1620        // Assumes the flush job is finished.
1621        version_control.apply_edit(
1622            Some(RegionEdit {
1623                files_to_add: Vec::new(),
1624                files_to_remove: Vec::new(),
1625                timestamp_ms: None,
1626                compaction_time_window: None,
1627                flushed_entry_id: None,
1628                flushed_sequence: None,
1629                committed_sequence: None,
1630            }),
1631            &[0],
1632            builder.file_purger(),
1633        );
1634        write_rows_to_version(&version_data.version, "host1", 0, 10);
1635        scheduler.on_flush_success(builder.region_id());
1636        assert_eq!(2, job_scheduler.num_jobs());
1637        // The pending task is cleared.
1638        assert!(
1639            scheduler
1640                .region_status
1641                .get(&builder.region_id())
1642                .unwrap()
1643                .pending_task
1644                .is_none()
1645        );
1646    }
1647}