Skip to main content

mito2/
flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use bytes::Bytes;
24use common_telemetry::{debug, error, info};
25use datatypes::arrow::datatypes::SchemaRef;
26use datatypes::extension::json::is_structured_json_field;
27use partition::expr::PartitionExpr;
28use smallvec::{SmallVec, smallvec};
29use snafu::ResultExt;
30use store_api::region_request::RegionFlushReason;
31use store_api::storage::{RegionId, SequenceNumber};
32use strum::IntoStaticStr;
33use tokio::sync::{Semaphore, mpsc, watch};
34
35use crate::access_layer::{
36    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
37};
38use crate::cache::CacheManagerRef;
39use crate::config::MitoConfig;
40use crate::engine::region_hook::SstFileInfo;
41use crate::error::{
42    Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu,
43    RegionTruncatedSnafu, Result,
44};
45use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
46use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
47use crate::memtable::bulk::json_align::Json2Aligner;
48use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions};
49use crate::metrics::{
50    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL,
51    INFLIGHT_FLUSH_COUNT,
52};
53use crate::read::FlatSource;
54use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
55use crate::read::flat_merge::FlatMergeIterator;
56use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
57use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
58use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr};
59use crate::request::{
60    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
61    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
62};
63use crate::schedule::scheduler::{Job, SchedulerRef};
64use crate::sst::file::FileMeta;
65use crate::sst::parquet::metadata::extract_primary_key_range;
66use crate::sst::parquet::{
67    DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format,
68};
69use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema};
70use crate::worker::WorkerListener;
71
/// Engine-wide accountant for write buffer (memtable) memory.
///
/// An implementation keeps track of how much memory the memtables occupy and
/// answers whether the engine should flush or stall incoming writes.
pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns `true` when the engine should trigger a flush.
    fn should_flush_engine(&self) -> bool;

    /// Returns `true` when write requests should be stalled.
    fn should_stall(&self) -> bool;

    /// Accounts for `mem` newly reserved bytes.
    fn reserve_mem(&self, mem: usize);

    /// Announces that `mem` bytes are about to be freed.
    ///
    /// Bytes that are already on their way out are excluded when the soft
    /// limit is checked.
    fn schedule_free_mem(&self, mem: usize);

    /// Records that `mem` bytes have actually been freed.
    fn free_mem(&self, mem: usize);

    /// Returns the total number of bytes used by memtables.
    fn memory_usage(&self) -> usize;

    /// Returns the memory limit for mutable memtables.
    ///
    /// Once the mutable memory usage goes beyond this limit, the write buffer
    /// manager should flush memtables.
    fn flush_limit(&self) -> usize;
}

/// Shared, reference-counted handle to a [WriteBufferManager] trait object.
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
105
106/// Default [WriteBufferManager] implementation.
107///
108/// Inspired by RocksDB's WriteBufferManager.
109/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
110#[derive(Debug)]
111pub struct WriteBufferManagerImpl {
112    /// Write buffer size for the engine.
113    global_write_buffer_size: usize,
114    /// Mutable memtable memory size limit.
115    mutable_limit: usize,
116    /// Memory in used (e.g. used by mutable and immutable memtables).
117    memory_used: AtomicUsize,
118    /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
119    memory_active: AtomicUsize,
120    /// Optional notifier.
121    /// The manager can wake up the worker once we free the write buffer.
122    notifier: Option<watch::Sender<()>>,
123}
124
125impl WriteBufferManagerImpl {
126    /// Returns a new manager with specific `global_write_buffer_size`.
127    pub fn new(global_write_buffer_size: usize) -> Self {
128        Self {
129            global_write_buffer_size,
130            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
131            memory_used: AtomicUsize::new(0),
132            memory_active: AtomicUsize::new(0),
133            notifier: None,
134        }
135    }
136
137    /// Attaches a notifier to the manager.
138    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
139        self.notifier = Some(notifier);
140        self
141    }
142
143    /// Returns memory usage of mutable memtables.
144    pub fn mutable_usage(&self) -> usize {
145        self.memory_active.load(Ordering::Relaxed)
146    }
147
148    /// Returns the size limit for mutable memtables.
149    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
150        // Reserves half of the write buffer for mutable memtable.
151        global_write_buffer_size / 2
152    }
153}
154
155impl WriteBufferManager for WriteBufferManagerImpl {
156    fn should_flush_engine(&self) -> bool {
157        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
158        if mutable_memtable_memory_usage >= self.mutable_limit {
159            debug!(
160                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
161                mutable_memtable_memory_usage,
162                self.memory_usage(),
163                self.mutable_limit,
164                self.global_write_buffer_size,
165            );
166            return true;
167        }
168
169        let memory_usage = self.memory_used.load(Ordering::Relaxed);
170        if memory_usage >= self.global_write_buffer_size {
171            return true;
172        }
173
174        false
175    }
176
177    fn should_stall(&self) -> bool {
178        self.memory_usage() >= self.global_write_buffer_size
179    }
180
181    fn reserve_mem(&self, mem: usize) {
182        self.memory_used.fetch_add(mem, Ordering::Relaxed);
183        self.memory_active.fetch_add(mem, Ordering::Relaxed);
184    }
185
186    fn schedule_free_mem(&self, mem: usize) {
187        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
188    }
189
190    fn free_mem(&self, mem: usize) {
191        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
192        if let Some(notifier) = &self.notifier {
193            // Notifies the worker after the memory usage is decreased. When we drop the memtable
194            // outside of the worker, the worker may still stall requests because the memory usage
195            // is not updated. So we need to notify the worker to handle stalled requests again.
196            let _ = notifier.send(());
197        }
198    }
199
200    fn memory_usage(&self) -> usize {
201        self.memory_used.load(Ordering::Relaxed)
202    }
203
204    fn flush_limit(&self) -> usize {
205        self.mutable_limit
206    }
207}
208
209/// Reason of a flush task.
210#[derive(Debug, IntoStaticStr, Clone, Copy, PartialEq, Eq)]
211pub enum FlushReason {
212    /// Engine reaches flush threshold.
213    EngineFull,
214    /// Manual flush.
215    Manual,
216    /// Flush to alter table.
217    Alter,
218    /// Flush periodically.
219    Periodically,
220    /// Flush memtable during downgrading state.
221    Downgrading,
222    /// Enter staging mode.
223    EnterStaging,
224    /// Flush when region is closing.
225    Closing,
226    /// Flush triggered before region migration.
227    RegionMigration,
228    /// Flush triggered by repartition procedure.
229    Repartition,
230    /// Flush triggered by remote WAL pruning.
231    RemoteWalPrune,
232}
233
234impl FlushReason {
235    /// Get flush reason as static str.
236    fn as_str(&self) -> &'static str {
237        self.into()
238    }
239}
240
241impl From<RegionFlushReason> for FlushReason {
242    fn from(reason: RegionFlushReason) -> Self {
243        match reason {
244            RegionFlushReason::RegionMigration => FlushReason::RegionMigration,
245            RegionFlushReason::Repartition => FlushReason::Repartition,
246            RegionFlushReason::RemoteWalPrune => FlushReason::RemoteWalPrune,
247            RegionFlushReason::Closing => FlushReason::Closing,
248            RegionFlushReason::Downgrading => FlushReason::Downgrading,
249        }
250    }
251}
252
253/// Task to flush a region.
254pub(crate) struct RegionFlushTask {
255    /// Region to flush.
256    pub(crate) region_id: RegionId,
257    /// Reason to flush.
258    pub(crate) reason: FlushReason,
259    /// Flush result senders.
260    pub(crate) senders: Vec<OutputTx>,
261    /// Request sender to notify the worker.
262    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
263
264    pub(crate) access_layer: AccessLayerRef,
265    pub(crate) listener: WorkerListener,
266    pub(crate) engine_config: Arc<MitoConfig>,
267    pub(crate) row_group_size: Option<usize>,
268    pub(crate) cache_manager: CacheManagerRef,
269    pub(crate) manifest_ctx: ManifestContextRef,
270
271    /// Index options for the region.
272    pub(crate) index_options: IndexOptions,
273    /// Semaphore to control flush concurrency.
274    pub(crate) flush_semaphore: Arc<Semaphore>,
275    /// Whether the region is in staging mode.
276    pub(crate) is_staging: bool,
277    /// Partition expression of the region.
278    ///
279    /// This is used to generate the file meta.
280    pub(crate) partition_expr: Option<String>,
281}
282
283impl RegionFlushTask {
284    /// Push the sender if it is not none.
285    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
286        if let Some(sender) = sender.take_inner() {
287            self.senders.push(sender);
288        }
289    }
290
291    /// Consumes the task and notify the sender the job is success.
292    fn on_success(self) {
293        for sender in self.senders {
294            sender.send(Ok(0));
295        }
296    }
297
298    /// Send flush error to waiter.
299    fn on_failure(&mut self, err: Arc<Error>) {
300        for sender in self.senders.drain(..) {
301            sender.send(Err(err.clone()).context(FlushRegionSnafu {
302                region_id: self.region_id,
303            }));
304        }
305    }
306
307    /// Converts the flush task into a background job.
308    ///
309    /// We must call this in the region worker.
310    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
311        // Get a version of this region before creating a job to get current
312        // wal entry id, sequence and immutable memtables.
313        let version_data = version_control.current();
314
315        Box::pin(async move {
316            INFLIGHT_FLUSH_COUNT.inc();
317            self.do_flush(version_data).await;
318            INFLIGHT_FLUSH_COUNT.dec();
319        })
320    }
321
322    /// Runs the flush task.
323    async fn do_flush(&mut self, version_data: VersionControlData) {
324        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
325        self.listener.on_flush_begin(self.region_id).await;
326
327        let worker_request = match self.flush_memtables(&version_data).await {
328            Ok(edit) => {
329                let memtables_to_remove = version_data
330                    .version
331                    .memtables
332                    .immutables()
333                    .iter()
334                    .map(|m| m.id())
335                    .collect();
336                let flush_finished = FlushFinished {
337                    region_id: self.region_id,
338                    // The last entry has been flushed.
339                    flushed_entry_id: version_data.last_entry_id,
340                    senders: std::mem::take(&mut self.senders),
341                    _timer: timer,
342                    edit,
343                    memtables_to_remove,
344                    is_staging: self.is_staging,
345                    flush_reason: self.reason,
346                };
347                WorkerRequest::Background {
348                    region_id: self.region_id,
349                    notify: BackgroundNotify::FlushFinished(flush_finished),
350                }
351            }
352            Err(e) => {
353                error!(e; "Failed to flush region {}", self.region_id);
354                // Discard the timer.
355                timer.stop_and_discard();
356
357                let err = Arc::new(e);
358                self.on_failure(err.clone());
359                WorkerRequest::Background {
360                    region_id: self.region_id,
361                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
362                }
363            }
364        };
365        self.send_worker_request(worker_request).await;
366    }
367
368    /// Flushes memtables to level 0 SSTs and updates the manifest.
369    /// Returns the [RegionEdit] to apply.
370    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
371        // We must use the immutable memtables list and entry ids from the `version_data`
372        // for consistency as others might already modify the version in the `version_control`.
373        let version = &version_data.version;
374        let timer = FLUSH_ELAPSED
375            .with_label_values(&["flush_memtables"])
376            .start_timer();
377
378        let mut write_opts = WriteOptions {
379            write_buffer_size: self.engine_config.sst_write_buffer_size,
380            ..Default::default()
381        };
382        if let Some(row_group_size) = self.row_group_size {
383            write_opts.row_group_size = row_group_size;
384        }
385
386        let DoFlushMemtablesResult {
387            file_metas,
388            flushed_bytes,
389            series_count,
390            encoded_part_count,
391            flush_metrics,
392            sst_infos,
393        } = self.do_flush_memtables(version, write_opts).await?;
394
395        if !file_metas.is_empty() {
396            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
397        }
398
399        let mut file_ids = Vec::with_capacity(file_metas.len());
400        let mut total_rows = 0;
401        let mut total_bytes = 0;
402        for meta in &file_metas {
403            file_ids.push(meta.file_id);
404            total_rows += meta.num_rows;
405            total_bytes += meta.file_size;
406        }
407        info!(
408            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, encoded_part_count: {}, metrics: {:?}",
409            self.region_id,
410            self.reason.as_str(),
411            file_ids,
412            series_count,
413            total_rows,
414            total_bytes,
415            timer.stop_and_record(),
416            encoded_part_count,
417            flush_metrics,
418        );
419        flush_metrics.observe();
420
421        let hook = self.manifest_ctx.hook();
422        if let Some(hook) = &hook {
423            let files: Vec<SstFileInfo<'_>> = sst_infos
424                .iter()
425                .zip(file_metas.iter())
426                .map(|(sst_info, file_meta)| SstFileInfo {
427                    sst_info_ref: sst_info,
428                    file_meta,
429                })
430                .collect();
431            hook.on_sst_files_written(self.region_id, &version.metadata, &files)
432                .await;
433        }
434
435        let edit = RegionEdit {
436            files_to_add: file_metas,
437            files_to_remove: Vec::new(),
438            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
439            compaction_time_window: None,
440            // The last entry has been flushed.
441            flushed_entry_id: Some(version_data.last_entry_id),
442            flushed_sequence: Some(version_data.committed_sequence),
443            committed_sequence: None,
444        };
445        info!(
446            "Applying {edit:?} to region {}, is_staging: {}",
447            self.region_id, self.is_staging
448        );
449
450        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
451
452        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
453            RegionLeaderState::Downgrading
454        } else {
455            // Check if region is in staging mode
456            let current_state = self.manifest_ctx.current_state();
457            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
458                RegionLeaderState::Staging
459            } else {
460                RegionLeaderState::Writable
461            }
462        };
463        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
464        // add a cleanup job to remove them later.
465        let manifest_version = self
466            .manifest_ctx
467            .update_manifest(expected_state, action_list, self.is_staging)
468            .await?;
469        info!(
470            "Successfully update manifest version to {manifest_version}, region: {}, is_staging: {}, reason: {}",
471            self.region_id,
472            self.is_staging,
473            self.reason.as_str()
474        );
475
476        Ok(edit)
477    }
478
479    async fn do_flush_memtables(
480        &self,
481        version: &VersionRef,
482        write_opts: WriteOptions,
483    ) -> Result<DoFlushMemtablesResult> {
484        let memtables = version.memtables.immutables();
485        let mut file_metas = Vec::with_capacity(memtables.len());
486        let mut flushed_bytes = 0;
487        let mut series_count = 0;
488        let mut encoded_part_count = 0;
489        let mut flush_metrics = Metrics::new(WriteType::Flush);
490        let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?;
491        let hook = self.manifest_ctx.hook();
492        let mut all_sst_infos = Vec::new();
493        for mem in memtables {
494            if mem.is_empty() {
495                // Skip empty memtables.
496                continue;
497            }
498
499            // Compact the memtable first, this waits the background compaction to finish.
500            let compact_start = std::time::Instant::now();
501            if let Err(e) = mem.compact(true) {
502                common_telemetry::error!(e; "Failed to compact memtable before flush");
503            }
504            let compact_cost = compact_start.elapsed();
505            flush_metrics.compact_memtable += compact_cost;
506
507            // Sets `for_flush` flag to true.
508            let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?;
509            let num_mem_ranges = mem_ranges.ranges.len();
510
511            // Aggregate stats from all ranges
512            let num_mem_rows = mem_ranges.num_rows();
513            let memtable_series_count = mem_ranges.series_count();
514            let memtable_id = mem.id();
515            // Increases series count for each mem range. We consider each mem range has different series so
516            // the counter may have more series than the actual series count.
517            series_count += memtable_series_count;
518
519            let flush_start = Instant::now();
520            let FlushFlatMemResult {
521                num_encoded,
522                num_sources,
523                results,
524            } = self
525                .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
526                .await?;
527            encoded_part_count += num_encoded;
528            for (source_idx, result) in results.into_iter().enumerate() {
529                let (max_sequence, ssts_written, metrics) = result?;
530                if ssts_written.is_empty() {
531                    // No data written.
532                    continue;
533                }
534
535                common_telemetry::debug!(
536                    "Region {} flush one memtable {} {}/{}, metrics: {:?}",
537                    self.region_id,
538                    memtable_id,
539                    source_idx,
540                    num_sources,
541                    metrics
542                );
543
544                flush_metrics = flush_metrics.merge(metrics);
545
546                for sst_info in &ssts_written {
547                    flushed_bytes += sst_info.file_size;
548                    let pk_range = sst_info
549                        .file_metadata
550                        .as_ref()
551                        .and_then(|meta| extract_primary_key_range(meta, &version.metadata));
552                    file_metas.push(Self::new_file_meta(
553                        self.region_id,
554                        max_sequence,
555                        sst_info,
556                        partition_expr.clone(),
557                        pk_range,
558                    ));
559                }
560                if hook.is_some() {
561                    all_sst_infos.extend(ssts_written);
562                }
563            }
564
565            common_telemetry::debug!(
566                "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
567                self.region_id,
568                num_sources,
569                memtable_id,
570                num_mem_ranges,
571                num_encoded,
572                num_mem_rows,
573                flush_start.elapsed(),
574                compact_cost,
575            );
576        }
577
578        Ok(DoFlushMemtablesResult {
579            file_metas,
580            flushed_bytes,
581            series_count,
582            encoded_part_count,
583            flush_metrics,
584            sst_infos: all_sst_infos,
585        })
586    }
587
588    async fn flush_flat_mem_ranges(
589        &self,
590        version: &VersionRef,
591        write_opts: &WriteOptions,
592        mem_ranges: MemtableRanges,
593    ) -> Result<FlushFlatMemResult> {
594        let batch_schema = to_flat_sst_arrow_schema(
595            &version.metadata,
596            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
597        );
598        let field_column_start =
599            flat_format::field_column_start(&version.metadata, batch_schema.fields().len());
600        let flat_sources = memtable_flat_sources(
601            batch_schema,
602            mem_ranges,
603            &version.options,
604            field_column_start,
605        )?;
606        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
607        let num_encoded = flat_sources.encoded.len();
608        for (source, max_sequence) in flat_sources.sources {
609            let write_request = self.new_write_request(version, max_sequence, source);
610            let access_layer = self.access_layer.clone();
611            let write_opts = write_opts.clone();
612            let semaphore = self.flush_semaphore.clone();
613            let task = common_runtime::spawn_global(async move {
614                let _permit = semaphore.acquire().await.unwrap();
615                let mut metrics = Metrics::new(WriteType::Flush);
616                let ssts = access_layer
617                    .write_sst(write_request, &write_opts, &mut metrics)
618                    .await?;
619                FLUSH_FILE_TOTAL.inc_by(ssts.len() as u64);
620                Ok((max_sequence, ssts, metrics))
621            });
622            tasks.push(task);
623        }
624        for (encoded, max_sequence) in flat_sources.encoded {
625            let access_layer = self.access_layer.clone();
626            let cache_manager = self.cache_manager.clone();
627            let region_id = version.metadata.region_id;
628            let semaphore = self.flush_semaphore.clone();
629            let task = common_runtime::spawn_global(async move {
630                let _permit = semaphore.acquire().await.unwrap();
631                let metrics = access_layer
632                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
633                    .await?;
634                FLUSH_FILE_TOTAL.inc();
635                Ok((max_sequence, smallvec![encoded.sst_info], metrics))
636            });
637            tasks.push(task);
638        }
639        let num_sources = tasks.len();
640        let results = futures::future::try_join_all(tasks)
641            .await
642            .context(JoinSnafu)?;
643        Ok(FlushFlatMemResult {
644            num_encoded,
645            num_sources,
646            results,
647        })
648    }
649
650    fn new_file_meta(
651        region_id: RegionId,
652        max_sequence: u64,
653        sst_info: &SstInfo,
654        partition_expr: Option<PartitionExpr>,
655        primary_key_range: Option<(Bytes, Bytes)>,
656    ) -> FileMeta {
657        let (primary_key_min, primary_key_max) = match primary_key_range {
658            Some((min, max)) => (Some(min), Some(max)),
659            None => (None, None),
660        };
661        FileMeta {
662            region_id,
663            file_id: sst_info.file_id,
664            time_range: sst_info.time_range,
665            level: 0,
666            file_size: sst_info.file_size,
667            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
668            available_indexes: sst_info.index_metadata.build_available_indexes(),
669            indexes: sst_info.index_metadata.build_indexes(),
670            index_file_size: sst_info.index_metadata.file_size,
671            index_version: 0,
672            num_rows: sst_info.num_rows as u64,
673            num_row_groups: sst_info.num_row_groups,
674            sequence: NonZeroU64::new(max_sequence),
675            partition_expr,
676            num_series: sst_info.num_series,
677            primary_key_min,
678            primary_key_max,
679        }
680    }
681
682    fn new_write_request(
683        &self,
684        version: &VersionRef,
685        max_sequence: u64,
686        source: FlatSource,
687    ) -> SstWriteRequest {
688        let flat_format = version
689            .options
690            .sst_format
691            .map(|f| f == FormatType::Flat)
692            .unwrap_or(self.engine_config.default_flat_format);
693        SstWriteRequest {
694            op_type: OperationType::Flush,
695            metadata: version.metadata.clone(),
696            source,
697            cache_manager: self.cache_manager.clone(),
698            storage: version.options.storage.clone(),
699            max_sequence: Some(max_sequence),
700            sst_write_format: if flat_format {
701                FormatType::Flat
702            } else {
703                FormatType::PrimaryKey
704            },
705            index_options: self.index_options.clone(),
706            index_config: self.engine_config.index.clone(),
707            inverted_index_config: self.engine_config.inverted_index.clone(),
708            fulltext_index_config: self.engine_config.fulltext_index.clone(),
709            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
710            #[cfg(feature = "vector_index")]
711            vector_index_config: self.engine_config.vector_index.clone(),
712        }
713    }
714
715    /// Notify flush job status.
716    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
717        if let Err(e) = self
718            .request_sender
719            .send(WorkerRequestWithTime::new(request))
720            .await
721        {
722            error!(
723                "Failed to notify flush job status for region {}, request: {:?}",
724                self.region_id, e.0
725            );
726        }
727    }
728
729    /// Merge two flush tasks.
730    fn merge(&mut self, mut other: RegionFlushTask) {
731        assert_eq!(self.region_id, other.region_id);
732        // Now we only merge senders. They share the same flush reason.
733        self.senders.append(&mut other.senders);
734    }
735}
736
737struct FlushFlatMemResult {
738    num_encoded: usize,
739    num_sources: usize,
740    results: Vec<Result<(SequenceNumber, SstInfoArray, Metrics)>>,
741}
742
743struct DoFlushMemtablesResult {
744    file_metas: Vec<FileMeta>,
745    flushed_bytes: u64,
746    series_count: usize,
747    encoded_part_count: usize,
748    flush_metrics: Metrics,
749    sst_infos: Vec<SstInfo>,
750}
751
752struct FlatSources {
753    sources: SmallVec<[(FlatSource, SequenceNumber); 4]>,
754    encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
755}
756
757/// Returns the max sequence and [FlatSource] for the given memtable.
758fn memtable_flat_sources(
759    schema: SchemaRef,
760    mem_ranges: MemtableRanges,
761    options: &RegionOptions,
762    field_column_start: usize,
763) -> Result<FlatSources> {
764    let MemtableRanges { ranges } = mem_ranges;
765    let mut flat_sources = FlatSources {
766        sources: SmallVec::new(),
767        encoded: SmallVec::new(),
768    };
769
770    if ranges.len() == 1 {
771        debug!("Flushing single flat range");
772
773        let only_range = ranges.into_values().next().unwrap();
774        let max_sequence = only_range.stats().max_sequence();
775        if let Some(encoded) = only_range.encoded() {
776            flat_sources.encoded.push((encoded, max_sequence));
777        } else {
778            let iter = only_range.build_record_batch_iter(None, None)?;
779            // Dedup according to append mode and merge mode.
780            // Even single range may have duplicate rows.
781            let iter = maybe_dedup_one(
782                options.append_mode,
783                options.merge_mode(),
784                field_column_start,
785                iter,
786            );
787            flat_sources
788                .sources
789                .push((FlatSource::new_iter(schema, iter), max_sequence));
790        };
791    } else {
792        let min_flush_rows = *ENCODE_ROW_THRESHOLD;
793        // Calculate total rows from non-encoded ranges.
794        let total_rows: usize = ranges
795            .values()
796            .filter(|r| r.encoded().is_none())
797            .map(|r| r.num_rows())
798            .sum();
799        debug!(
800            "Flushing multiple flat ranges, total_rows: {}, min_flush_rows: {}, num_ranges: {}",
801            total_rows,
802            min_flush_rows,
803            ranges.len()
804        );
805        let mut rows_remaining = total_rows;
806        let mut last_iter_rows = 0;
807        let num_ranges = ranges.len();
808        let mut input_iters = Vec::with_capacity(num_ranges);
809        let mut current_ranges = Vec::new();
810
811        let has_json2 = schema.fields().iter().any(is_structured_json_field);
812        let mut json_align_schemas = if has_json2 {
813            Some(Vec::with_capacity(num_ranges))
814        } else {
815            None
816        };
817
818        for (_range_id, range) in ranges {
819            if let Some(encoded) = range.encoded() {
820                let max_sequence = range.stats().max_sequence();
821                flat_sources.encoded.push((encoded, max_sequence));
822                continue;
823            }
824
825            // Collect schemas if has json2 field.
826            if let Some(schemas) = json_align_schemas.as_mut() {
827                let schema = range
828                    .record_batch_schema_hint()
829                    .unwrap_or_else(|| schema.clone());
830                schemas.push(schema);
831            }
832
833            let iter = range.build_record_batch_iter(None, None)?;
834            input_iters.push(iter);
835            let range_rows = range.num_rows();
836            last_iter_rows += range_rows;
837            rows_remaining -= range_rows;
838            current_ranges.push(range);
839
840            // Flush if we have enough rows, but don't flush if the remaining rows
841            // would be less than DEFAULT_ROW_GROUP_SIZE (to avoid small last files).
842            if last_iter_rows >= min_flush_rows
843                && (rows_remaining == 0 || rows_remaining >= DEFAULT_ROW_GROUP_SIZE)
844            {
845                debug!(
846                    "Flush batch ready, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
847                    last_iter_rows,
848                    min_flush_rows,
849                    input_iters.len(),
850                    rows_remaining
851                );
852
853                // Calculate max_sequence from all merged ranges
854                let max_sequence = current_ranges
855                    .iter()
856                    .map(|r| r.stats().max_sequence())
857                    .max()
858                    .unwrap_or(0);
859
860                let input_iters =
861                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges));
862                let (schema, input_iters) = maybe_align_json2_iters(
863                    schema.clone(),
864                    json_align_schemas.take(),
865                    input_iters,
866                )?;
867
868                let maybe_dedup = merge_and_dedup(
869                    &schema,
870                    options.append_mode,
871                    options.merge_mode(),
872                    field_column_start,
873                    input_iters,
874                )?;
875
876                flat_sources
877                    .sources
878                    .push((FlatSource::new_iter(schema, maybe_dedup), max_sequence));
879                last_iter_rows = 0;
880                current_ranges.clear();
881
882                json_align_schemas = if has_json2 {
883                    Some(Vec::with_capacity(num_ranges))
884                } else {
885                    None
886                };
887            }
888        }
889
890        // Handle remaining iters.
891        if !input_iters.is_empty() {
892            debug!(
893                "Flush remaining batch, rows: {}, min_rows: {}, num_iters: {}, remaining: {}",
894                last_iter_rows,
895                min_flush_rows,
896                input_iters.len(),
897                rows_remaining
898            );
899
900            let (schema, input_iters) =
901                maybe_align_json2_iters(schema, json_align_schemas, input_iters)?;
902
903            let max_sequence = current_ranges
904                .iter()
905                .map(|r| r.stats().max_sequence())
906                .max()
907                .unwrap_or(0);
908
909            let maybe_dedup = merge_and_dedup(
910                &schema,
911                options.append_mode,
912                options.merge_mode(),
913                field_column_start,
914                input_iters,
915            )?;
916
917            flat_sources
918                .sources
919                .push((FlatSource::new_iter(schema, maybe_dedup), max_sequence));
920        }
921    }
922
923    Ok(flat_sources)
924}
925
926fn maybe_align_json2_iters(
927    schema: SchemaRef,
928    schemas: Option<Vec<SchemaRef>>,
929    input_iters: Vec<BoxedRecordBatchIterator>,
930) -> Result<(SchemaRef, Vec<BoxedRecordBatchIterator>)> {
931    let Some(schemas) = schemas else {
932        return Ok((schema, input_iters));
933    };
934
935    let aligner = Json2Aligner::try_new(schemas)?;
936    let input_iters = input_iters
937        .into_iter()
938        .map(|input_iter| aligner.wrap_iter(input_iter))
939        .collect();
940
941    Ok((aligner.schema().clone(), input_iters))
942}
943
944/// Merges multiple record batch iterators and applies deduplication based on the specified mode.
945///
946/// This function is used during the flush process to combine data from multiple memtable ranges
947/// into a single stream while handling duplicate records according to the configured merge strategy.
948///
949/// # Arguments
950///
951/// * `schema` - The Arrow schema reference that defines the structure of the record batches
952/// * `append_mode` - When true, no deduplication is performed and all records are preserved.
953///                  This is used for append-only workloads where duplicate handling is not required.
954/// * `merge_mode` - The strategy used for deduplication when not in append mode:
955///   - `MergeMode::LastRow`: Keeps the last record for each primary key
956///   - `MergeMode::LastNonNull`: Keeps the last non-null values for each field
957/// * `field_column_start` - The starting column index for fields in the record batch.
958///                          Used when `MergeMode::LastNonNull` to identify which columns
959///                          contain field values versus primary key columns.
960/// * `input_iters` - A vector of record batch iterators to be merged and deduplicated
961///
962/// # Returns
963///
964/// Returns a boxed record batch iterator that yields the merged and potentially deduplicated
965/// record batches.
966///
967/// # Behavior
968///
969/// 1. Creates a `FlatMergeIterator` to merge all input iterators in sorted order based on
970///    primary key and timestamp
971/// 2. If `append_mode` is true, returns the merge iterator directly without deduplication
972/// 3. If `append_mode` is false, wraps the merge iterator with a `FlatDedupIterator` that
973///    applies the specified merge mode:
974///    - `LastRow`: Removes duplicate rows, keeping only the last one
975///    - `LastNonNull`: Removes duplicates but preserves the last non-null value for each field
976///
977/// # Examples
978///
979/// ```ignore
980/// let merged_iter = merge_and_dedup(
981///     &schema,
982///     false,  // not append mode, apply dedup
983///     MergeMode::LastRow,
984///     2,  // fields start at column 2 after primary key columns
985///     vec![iter1, iter2, iter3],
986/// )?;
987/// ```
988pub fn merge_and_dedup(
989    schema: &SchemaRef,
990    append_mode: bool,
991    merge_mode: MergeMode,
992    field_column_start: usize,
993    input_iters: Vec<BoxedRecordBatchIterator>,
994) -> Result<BoxedRecordBatchIterator> {
995    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
996    let maybe_dedup = if append_mode {
997        // No dedup in append mode
998        Box::new(merge_iter) as _
999    } else {
1000        // Dedup according to merge mode.
1001        match merge_mode {
1002            MergeMode::LastRow => {
1003                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
1004            }
1005            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
1006                merge_iter,
1007                FlatLastNonNull::new(field_column_start, false),
1008            )) as _,
1009        }
1010    };
1011    Ok(maybe_dedup)
1012}
1013
1014pub fn maybe_dedup_one(
1015    append_mode: bool,
1016    merge_mode: MergeMode,
1017    field_column_start: usize,
1018    input_iter: BoxedRecordBatchIterator,
1019) -> BoxedRecordBatchIterator {
1020    if append_mode {
1021        // No dedup in append mode
1022        input_iter
1023    } else {
1024        // Dedup according to merge mode.
1025        match merge_mode {
1026            MergeMode::LastRow => {
1027                Box::new(FlatDedupIterator::new(input_iter, FlatLastRow::new(false)))
1028            }
1029            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
1030                input_iter,
1031                FlatLastNonNull::new(field_column_start, false),
1032            )),
1033        }
1034    }
1035}
1036
/// Manages background flushes of a worker.
///
/// A region has an entry in `region_status` from the moment its flush job is scheduled
/// until the scheduler is notified that the flush finished, failed, or the region was
/// dropped, closed or truncated.
pub(crate) struct FlushScheduler {
    /// Flush status of every region that currently has a flush request.
    region_status: HashMap<RegionId, FlushStatus>,
    /// Background job scheduler the flush jobs are submitted to.
    scheduler: SchedulerRef,
}
1044
1045impl FlushScheduler {
1046    /// Creates a new flush scheduler.
1047    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
1048        FlushScheduler {
1049            region_status: HashMap::new(),
1050            scheduler,
1051        }
1052    }
1053
1054    /// Returns true if the region already requested flush.
1055    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
1056        self.region_status.contains_key(&region_id)
1057    }
1058
1059    fn schedule_flush_task(
1060        &mut self,
1061        version_control: &VersionControlRef,
1062        task: RegionFlushTask,
1063    ) -> Result<()> {
1064        let region_id = task.region_id;
1065
1066        // If current region doesn't have flush status, we can flush the region directly.
1067        if let Err(e) = version_control.freeze_mutable() {
1068            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
1069
1070            return Err(e);
1071        }
1072        // Submit a flush job.
1073        let job = task.into_flush_job(version_control);
1074        if let Err(e) = self.scheduler.schedule(job) {
1075            // If scheduler returns error, senders in the job will be dropped and waiters
1076            // can get recv errors.
1077            error!(e; "Failed to schedule flush job for region {}", region_id);
1078
1079            return Err(e);
1080        }
1081        Ok(())
1082    }
1083
1084    /// Schedules a flush `task` for specific `region`.
1085    pub(crate) fn schedule_flush(
1086        &mut self,
1087        region_id: RegionId,
1088        version_control: &VersionControlRef,
1089        task: RegionFlushTask,
1090    ) -> Result<()> {
1091        debug_assert_eq!(region_id, task.region_id);
1092
1093        let version = version_control.current().version;
1094        if version.memtables.is_empty() {
1095            debug_assert!(!self.region_status.contains_key(&region_id));
1096            // The region has nothing to flush.
1097            task.on_success();
1098            return Ok(());
1099        }
1100
1101        // Don't increase the counter if a region has nothing to flush.
1102        FLUSH_REQUESTS_TOTAL
1103            .with_label_values(&[task.reason.as_str()])
1104            .inc();
1105
1106        // If current region has flush status, merge the task.
1107        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
1108            // Checks whether we can flush the region now.
1109            debug!("Merging flush task for region {}", region_id);
1110            flush_status.merge_task(task);
1111            return Ok(());
1112        }
1113
1114        self.schedule_flush_task(version_control, task)?;
1115
1116        // Add this region to status map.
1117        let _ = self.region_status.insert(
1118            region_id,
1119            FlushStatus::new(region_id, version_control.clone()),
1120        );
1121
1122        Ok(())
1123    }
1124
1125    /// Notifies the scheduler that the flush job is finished.
1126    ///
1127    /// Returns all pending requests if the region doesn't need to flush again.
1128    pub(crate) fn on_flush_success(
1129        &mut self,
1130        region_id: RegionId,
1131    ) -> Option<(
1132        Vec<SenderDdlRequest>,
1133        Vec<SenderWriteRequest>,
1134        Vec<SenderBulkRequest>,
1135    )> {
1136        let flush_status = self.region_status.get_mut(&region_id)?;
1137        // If region doesn't have any pending flush task, we need to remove it from the status.
1138        if flush_status.pending_task.is_none() {
1139            // The region doesn't have any pending flush task.
1140            // Safety: The flush status must exist.
1141            debug!(
1142                "Region {} doesn't have any pending flush task, removing it from the status",
1143                region_id
1144            );
1145            let flush_status = self.region_status.remove(&region_id).unwrap();
1146            return Some((
1147                flush_status.pending_ddls,
1148                flush_status.pending_writes,
1149                flush_status.pending_bulk_writes,
1150            ));
1151        }
1152
1153        // If region has pending task, but has nothing to flush, we need to remove it from the status.
1154        let version_data = flush_status.version_control.current();
1155        if version_data.version.memtables.is_empty() {
1156            // The region has nothing to flush, we also need to remove it from the status.
1157            // Safety: The pending task is not None.
1158            let task = flush_status.pending_task.take().unwrap();
1159            // The region has nothing to flush. We can notify pending task.
1160            task.on_success();
1161            debug!(
1162                "Region {} has nothing to flush, removing it from the status",
1163                region_id
1164            );
1165            // Safety: The flush status must exist.
1166            let flush_status = self.region_status.remove(&region_id).unwrap();
1167            return Some((
1168                flush_status.pending_ddls,
1169                flush_status.pending_writes,
1170                flush_status.pending_bulk_writes,
1171            ));
1172        }
1173
1174        // If region has pending task and has something to flush, we need to schedule it.
1175        debug!("Scheduling pending flush task for region {}", region_id);
1176        // Safety: The flush status must exist.
1177        let task = flush_status.pending_task.take().unwrap();
1178        let version_control = flush_status.version_control.clone();
1179        if let Err(err) = self.schedule_flush_task(&version_control, task) {
1180            error!(
1181                err;
1182                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
1183            );
1184        }
1185        // We can flush the region again, keep it in the region status.
1186        None
1187    }
1188
1189    /// Notifies the scheduler that the flush job is failed.
1190    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
1191        error!(err; "Region {} failed to flush, cancel all pending tasks", region_id);
1192
1193        FLUSH_FAILURE_TOTAL.inc();
1194
1195        // Remove this region.
1196        let Some(flush_status) = self.region_status.remove(&region_id) else {
1197            return;
1198        };
1199
1200        // Fast fail: cancels all pending tasks and sends error to their waiters.
1201        flush_status.on_failure(err);
1202    }
1203
1204    /// Notifies the scheduler that the region is dropped.
1205    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
1206        self.remove_region_on_failure(
1207            region_id,
1208            Arc::new(RegionDroppedSnafu { region_id }.build()),
1209        );
1210    }
1211
1212    /// Notifies the scheduler that the region is closed.
1213    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1214        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1215    }
1216
1217    /// Notifies the scheduler that the region is truncated.
1218    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1219        self.remove_region_on_failure(
1220            region_id,
1221            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1222        );
1223    }
1224
1225    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1226        // Remove this region.
1227        let Some(flush_status) = self.region_status.remove(&region_id) else {
1228            return;
1229        };
1230
1231        // Notifies all pending tasks.
1232        flush_status.on_failure(err);
1233    }
1234
1235    /// Add ddl request to pending queue.
1236    ///
1237    /// # Panics
1238    /// Panics if region didn't request flush.
1239    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1240        let status = self.region_status.get_mut(&request.region_id).unwrap();
1241        status.pending_ddls.push(request);
1242    }
1243
1244    /// Add write request to pending queue.
1245    ///
1246    /// # Panics
1247    /// Panics if region didn't request flush.
1248    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1249        let status = self
1250            .region_status
1251            .get_mut(&request.request.region_id)
1252            .unwrap();
1253        status.pending_writes.push(request);
1254    }
1255
1256    /// Add bulk write request to pending queue.
1257    ///
1258    /// # Panics
1259    /// Panics if region didn't request flush.
1260    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1261        let status = self.region_status.get_mut(&request.region_id).unwrap();
1262        status.pending_bulk_writes.push(request);
1263    }
1264
1265    /// Returns true if the region has pending DDLs.
1266    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1267        self.region_status
1268            .get(&region_id)
1269            .map(|status| !status.pending_ddls.is_empty())
1270            .unwrap_or(false)
1271    }
1272}
1273
1274impl Drop for FlushScheduler {
1275    fn drop(&mut self) {
1276        for (region_id, flush_status) in self.region_status.drain() {
1277            // We are shutting down so notify all pending tasks.
1278            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1279        }
1280    }
1281}
1282
/// Flush status of a region scheduled by the [FlushScheduler].
///
/// Tracks running and pending flush tasks and all pending requests of a region.
struct FlushStatus {
    /// Current region.
    region_id: RegionId,
    /// Version control of the region.
    version_control: VersionControlRef,
    /// Task waiting for the next flush. Flush tasks that arrive while the region already
    /// has a flush status are merged into this task.
    pending_task: Option<RegionFlushTask>,
    /// Pending ddl requests.
    pending_ddls: Vec<SenderDdlRequest>,
    /// Requests waiting to write after altering the region.
    pending_writes: Vec<SenderWriteRequest>,
    /// Bulk requests waiting to write after altering the region.
    pending_bulk_writes: Vec<SenderBulkRequest>,
}
1300
1301impl FlushStatus {
1302    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1303        FlushStatus {
1304            region_id,
1305            version_control,
1306            pending_task: None,
1307            pending_ddls: Vec::new(),
1308            pending_writes: Vec::new(),
1309            pending_bulk_writes: Vec::new(),
1310        }
1311    }
1312
1313    /// Merges the task to pending task.
1314    fn merge_task(&mut self, task: RegionFlushTask) {
1315        if let Some(pending) = &mut self.pending_task {
1316            pending.merge(task);
1317        } else {
1318            self.pending_task = Some(task);
1319        }
1320    }
1321
1322    fn on_failure(self, err: Arc<Error>) {
1323        if let Some(mut task) = self.pending_task {
1324            task.on_failure(err.clone());
1325        }
1326        for ddl in self.pending_ddls {
1327            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1328                region_id: self.region_id,
1329            }));
1330        }
1331        for write_req in self.pending_writes {
1332            write_req
1333                .sender
1334                .send(Err(err.clone()).context(FlushRegionSnafu {
1335                    region_id: self.region_id,
1336                }));
1337        }
1338    }
1339}
1340
1341#[cfg(test)]
1342mod tests {
1343    use mito_codec::row_converter::build_primary_key_codec;
1344    use tokio::sync::oneshot;
1345
1346    use super::*;
1347    use crate::cache::CacheManager;
1348    use crate::memtable::bulk::part::BulkPartConverter;
1349    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1350    use crate::memtable::{Memtable, RangesOptions};
1351    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1352    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1353    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1354    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1355
1356    #[test]
1357    fn test_get_mutable_limit() {
1358        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1359        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1360        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1361        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1362    }
1363
1364    #[test]
1365    fn test_over_mutable_limit() {
1366        // Mutable limit is 500.
1367        let manager = WriteBufferManagerImpl::new(1000);
1368        manager.reserve_mem(400);
1369        assert!(!manager.should_flush_engine());
1370        assert!(!manager.should_stall());
1371
1372        // More than mutable limit.
1373        manager.reserve_mem(400);
1374        assert!(manager.should_flush_engine());
1375
1376        // Freezes mutable.
1377        manager.schedule_free_mem(400);
1378        assert!(!manager.should_flush_engine());
1379        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
1380        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1381
1382        // Releases immutable.
1383        manager.free_mem(400);
1384        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
1385        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1386    }
1387
1388    #[test]
1389    fn test_over_global() {
1390        // Mutable limit is 500.
1391        let manager = WriteBufferManagerImpl::new(1000);
1392        manager.reserve_mem(1100);
1393        assert!(manager.should_stall());
1394        // Global usage is still 1100.
1395        manager.schedule_free_mem(200);
1396        assert!(manager.should_flush_engine());
1397        assert!(manager.should_stall());
1398
1399        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
1400        manager.schedule_free_mem(450);
1401        assert!(manager.should_flush_engine());
1402        assert!(manager.should_stall());
1403
1404        // Now mutable is enough.
1405        manager.reserve_mem(50);
1406        assert!(manager.should_flush_engine());
1407        manager.reserve_mem(100);
1408        assert!(manager.should_flush_engine());
1409    }
1410
1411    #[test]
1412    fn test_manager_notify() {
1413        let (sender, receiver) = watch::channel(());
1414        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
1415        manager.reserve_mem(500);
1416        assert!(!receiver.has_changed().unwrap());
1417        manager.schedule_free_mem(500);
1418        assert!(!receiver.has_changed().unwrap());
1419        manager.free_mem(500);
1420        assert!(receiver.has_changed().unwrap());
1421    }
1422
1423    #[tokio::test]
1424    async fn test_schedule_empty() {
1425        let env = SchedulerEnv::new().await;
1426        let (tx, _rx) = mpsc::channel(4);
1427        let mut scheduler = env.mock_flush_scheduler();
1428        let builder = VersionControlBuilder::new();
1429
1430        let version_control = Arc::new(builder.build());
1431        let (output_tx, output_rx) = oneshot::channel();
1432        let mut task = RegionFlushTask {
1433            region_id: builder.region_id(),
1434            reason: FlushReason::Manual,
1435            senders: Vec::new(),
1436            request_sender: tx,
1437            access_layer: env.access_layer.clone(),
1438            listener: WorkerListener::default(),
1439            engine_config: Arc::new(MitoConfig::default()),
1440            row_group_size: None,
1441            cache_manager: Arc::new(CacheManager::default()),
1442            manifest_ctx: env
1443                .mock_manifest_context(version_control.current().version.metadata.clone())
1444                .await,
1445            index_options: IndexOptions::default(),
1446            flush_semaphore: Arc::new(Semaphore::new(2)),
1447            is_staging: false,
1448            partition_expr: None,
1449        };
1450        task.push_sender(OptionOutputTx::from(output_tx));
1451        scheduler
1452            .schedule_flush(builder.region_id(), &version_control, task)
1453            .unwrap();
1454        assert!(scheduler.region_status.is_empty());
1455        let output = output_rx.await.unwrap().unwrap();
1456        assert_eq!(output, 0);
1457        assert!(scheduler.region_status.is_empty());
1458    }
1459
1460    #[tokio::test]
1461    async fn test_schedule_pending_request() {
1462        let job_scheduler = Arc::new(VecScheduler::default());
1463        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1464        let (tx, _rx) = mpsc::channel(4);
1465        let mut scheduler = env.mock_flush_scheduler();
1466        let mut builder = VersionControlBuilder::new();
1467        // Overwrites the empty memtable builder.
1468        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1469        let version_control = Arc::new(builder.build());
1470        // Writes data to the memtable so it is not empty.
1471        let version_data = version_control.current();
1472        write_rows_to_version(&version_data.version, "host0", 0, 10);
1473        let manifest_ctx = env
1474            .mock_manifest_context(version_data.version.metadata.clone())
1475            .await;
1476        // Creates 3 tasks.
1477        let mut tasks: Vec<_> = (0..3)
1478            .map(|_| RegionFlushTask {
1479                region_id: builder.region_id(),
1480                reason: FlushReason::Manual,
1481                senders: Vec::new(),
1482                request_sender: tx.clone(),
1483                access_layer: env.access_layer.clone(),
1484                listener: WorkerListener::default(),
1485                engine_config: Arc::new(MitoConfig::default()),
1486                row_group_size: None,
1487                cache_manager: Arc::new(CacheManager::default()),
1488                manifest_ctx: manifest_ctx.clone(),
1489                index_options: IndexOptions::default(),
1490                flush_semaphore: Arc::new(Semaphore::new(2)),
1491                is_staging: false,
1492                partition_expr: None,
1493            })
1494            .collect();
1495        // Schedule first task.
1496        let task = tasks.pop().unwrap();
1497        scheduler
1498            .schedule_flush(builder.region_id(), &version_control, task)
1499            .unwrap();
1500        // Should schedule 1 flush.
1501        assert_eq!(1, scheduler.region_status.len());
1502        assert_eq!(1, job_scheduler.num_jobs());
1503        // Check the new version.
1504        let version_data = version_control.current();
1505        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1506        // Schedule remaining tasks.
1507        let output_rxs: Vec<_> = tasks
1508            .into_iter()
1509            .map(|mut task| {
1510                let (output_tx, output_rx) = oneshot::channel();
1511                task.push_sender(OptionOutputTx::from(output_tx));
1512                scheduler
1513                    .schedule_flush(builder.region_id(), &version_control, task)
1514                    .unwrap();
1515                output_rx
1516            })
1517            .collect();
1518        // Assumes the flush job is finished.
1519        version_control.apply_edit(
1520            Some(RegionEdit {
1521                files_to_add: Vec::new(),
1522                files_to_remove: Vec::new(),
1523                timestamp_ms: None,
1524                compaction_time_window: None,
1525                flushed_entry_id: None,
1526                flushed_sequence: None,
1527                committed_sequence: None,
1528            }),
1529            &[0],
1530            builder.file_purger(),
1531        );
1532        scheduler.on_flush_success(builder.region_id());
1533        // No new flush task.
1534        assert_eq!(1, job_scheduler.num_jobs());
1535        // The flush status is cleared.
1536        assert!(scheduler.region_status.is_empty());
1537        for output_rx in output_rxs {
1538            let output = output_rx.await.unwrap().unwrap();
1539            assert_eq!(output, 0);
1540        }
1541    }
1542
1543    // Verifies single-range flat flush path respects append_mode (no dedup) vs dedup when disabled.
        //
        // The same bulk part is written to a fresh `BulkMemtable` for each case and the rows
        // yielded by the sources returned from `memtable_flat_sources` are counted:
        // 1 row when append_mode is false, 2 rows when append_mode is true.
1544    #[test]
1545    fn test_memtable_flat_sources_single_range_append_mode_behavior() {
1546        // Build test metadata and flat schema
1547        let metadata = metadata_for_test();
1548        let schema = to_flat_sst_arrow_schema(
1549            &metadata,
1550            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1551        );
1552
1553        // Prepare a bulk part containing duplicate rows for the same PK and timestamp
1554        // Two rows with identical keys and timestamps (ts = 1000), different field values
1555        let capacity = 16;
1556        let pk_codec = build_primary_key_codec(&metadata);
1557        let mut converter =
1558            BulkPartConverter::new(&metadata, schema.clone(), capacity, pk_codec, true);
1559        let kvs = build_key_values_with_ts_seq_values(
1560            &metadata,
1561            "dup_key".to_string(),
1562            1,
1563            vec![1000i64, 1000i64].into_iter(),
1564            vec![Some(1.0f64), Some(2.0f64)].into_iter(),
1565            1,
1566        );
1567        converter.append_key_values(&kvs).unwrap();
1568        let part = converter.convert().unwrap();
1569
1570        // Helper to build MemtableRanges with a single range from one bulk part.
1571        // We use BulkMemtable directly because it produces record batch iterators.
            // Every call creates a new memtable, so the two cases below do not share state.
1572        let build_ranges = |append_mode: bool| -> MemtableRanges {
1573            let memtable = crate::memtable::bulk::BulkMemtable::new(
1574                1,
1575                crate::memtable::bulk::BulkMemtableConfig::default(),
1576                metadata.clone(),
1577                None,
1578                None,
1579                append_mode,
1580                MergeMode::LastRow,
1581            );
                // The part is cloned because the closure is called once per case.
1582            memtable.write_bulk(part.clone()).unwrap();
                // Ranges are requested with the flush-specific options.
1583            memtable.ranges(None, RangesOptions::for_flush()).unwrap()
1584        };
1585
1586        // Case 1: append_mode = false => dedup happens, total rows should be 1
1587        {
1588            let mem_ranges = build_ranges(false);
1589            assert_eq!(1, mem_ranges.ranges.len());
1590
                // The region options mirror the memtable settings used in `build_ranges`.
1591            let options = RegionOptions {
1592                append_mode: false,
1593                merge_mode: Some(MergeMode::LastRow),
1594                ..Default::default()
1595            };
1596
                // `schema` is cloned here because case 2 still needs it.
1597            let flat_sources = memtable_flat_sources(
1598                schema.clone(),
1599                mem_ranges,
1600                &options,
1601                metadata.primary_key.len(),
1602            )
1603            .unwrap();
                // Expect exactly one iterator source and nothing in `encoded`.
1604            assert!(flat_sources.encoded.is_empty());
1605            assert_eq!(1, flat_sources.sources.len());
1606
1607            // Consume the iterator and count rows
1608            let mut total_rows = 0usize;
1609            for (source, _sequence) in flat_sources.sources {
1610                total_rows += source
1611                    .take_iter()
1612                    .map(|x| x.unwrap().num_rows())
1613                    .sum::<usize>();
1614            }
1615            assert_eq!(1, total_rows, "dedup should keep a single row");
1616        }
1617
1618        // Case 2: append_mode = true => no dedup, total rows should be 2
1619        {
1620            let mem_ranges = build_ranges(true);
1621            assert_eq!(1, mem_ranges.ranges.len());
1622
                // Only append_mode is set; merge_mode keeps its default value.
1623            let options = RegionOptions {
1624                append_mode: true,
1625                ..Default::default()
1626            };
1627
                // Last use of `schema`, so it is moved instead of cloned.
1628            let flat_sources =
1629                memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len())
1630                    .unwrap();
                // Same shape as in case 1: one iterator source and nothing in `encoded`.
1631            assert!(flat_sources.encoded.is_empty());
1632            assert_eq!(1, flat_sources.sources.len());
1633
                // Consume every source and sum the rows of all yielded batches.
1634            let mut total_rows = 0usize;
1635            for (source, _sequence) in flat_sources.sources {
1636                total_rows += source
1637                    .take_iter()
1638                    .map(|x| x.unwrap().num_rows())
1639                    .sum::<usize>();
1640            }
1641            assert_eq!(2, total_rows, "append_mode should preserve duplicates");
1642        }
1643    }
1644
1645    #[tokio::test]
1646    async fn test_schedule_pending_request_on_flush_success() {
            // Verifies that a flush task recorded as pending for a region is submitted as a job
            // when `on_flush_success` is called for that region.
            //
            // Steps: schedule one task (one job is submitted), schedule a second task for the
            // same region (it is stored in `pending_task`), apply an edit as if the first flush
            // finished, write new rows, then call `on_flush_success` and expect a second job
            // and an empty `pending_task`.
1647        common_telemetry::init_default_ut_logging();
            // The job scheduler is kept in a local so the test can read `num_jobs()` later.
1648        let job_scheduler = Arc::new(VecScheduler::default());
1649        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
            // `_rx` is bound to a name, so the receiver is not dropped until the test ends.
1650        let (tx, _rx) = mpsc::channel(4);
1651        let mut scheduler = env.mock_flush_scheduler();
1652        let mut builder = VersionControlBuilder::new();
1653        // Overwrites the empty memtable builder.
1654        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1655        let version_control = Arc::new(builder.build());
1656        // Writes data to the memtable so it is not empty.
1657        let version_data = version_control.current();
1658        write_rows_to_version(&version_data.version, "host0", 0, 10);
1659        let manifest_ctx = env
1660            .mock_manifest_context(version_data.version.metadata.clone())
1661            .await;
1662        // Creates 2 tasks.
            // Both tasks are built by the same closure; they share `tx`, the access layer and
            // the manifest context through clones, and each gets its own semaphore.
1663        let mut tasks: Vec<_> = (0..2)
1664            .map(|_| RegionFlushTask {
1665                region_id: builder.region_id(),
1666                reason: FlushReason::Manual,
1667                senders: Vec::new(),
1668                request_sender: tx.clone(),
1669                access_layer: env.access_layer.clone(),
1670                listener: WorkerListener::default(),
1671                engine_config: Arc::new(MitoConfig::default()),
1672                row_group_size: None,
1673                cache_manager: Arc::new(CacheManager::default()),
1674                manifest_ctx: manifest_ctx.clone(),
1675                index_options: IndexOptions::default(),
1676                flush_semaphore: Arc::new(Semaphore::new(2)),
1677                is_staging: false,
1678                partition_expr: None,
1679            })
1680            .collect();
1681        // Schedule first task.
1682        let task = tasks.pop().unwrap();
1683        scheduler
1684            .schedule_flush(builder.region_id(), &version_control, task)
1685            .unwrap();
1686        // Should schedule 1 flush.
1687        assert_eq!(1, scheduler.region_status.len());
1688        assert_eq!(1, job_scheduler.num_jobs());
1689        // Schedule second task.
1690        let task = tasks.pop().unwrap();
1691        scheduler
1692            .schedule_flush(builder.region_id(), &version_control, task)
1693            .unwrap();
            // The second task must be stored as the region's pending task.
1694        assert!(
1695            scheduler
1696                .region_status
1697                .get(&builder.region_id())
1698                .unwrap()
1699                .pending_task
1700                .is_some()
1701        );
1702
1703        // Check the new version.
            // The first immutable memtable is expected to have id 0, the same id that is
            // passed to `apply_edit` below.
1704        let version_data = version_control.current();
1705        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1706        // Assumes the flush job is finished.
            // The edit adds and removes no files and sets no optional fields.
            // NOTE(review): `&[0]` is presumably the list of memtable ids to remove - confirm
            // against `apply_edit`.
1707        version_control.apply_edit(
1708            Some(RegionEdit {
1709                files_to_add: Vec::new(),
1710                files_to_remove: Vec::new(),
1711                timestamp_ms: None,
1712                compaction_time_window: None,
1713                flushed_entry_id: None,
1714                flushed_sequence: None,
1715                committed_sequence: None,
1716            }),
1717            &[0],
1718            builder.file_purger(),
1719        );
            // NOTE(review): this writes through the `version_data` snapshot taken before
            // `apply_edit`; presumably it gives the pending flush data to flush - confirm.
1720        write_rows_to_version(&version_data.version, "host1", 0, 10);
1721        scheduler.on_flush_success(builder.region_id());
            // A second job must have been submitted by now.
1722        assert_eq!(2, job_scheduler.num_jobs());
1723        // The pending task is cleared.
1724        assert!(
1725            scheduler
1726                .region_status
1727                .get(&builder.region_id())
1728                .unwrap()
1729                .pending_task
1730                .is_none()
1731        );
1732    }
1733}