From 648b2ae2931562152a0576823ee5d2eb1d5cd870 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sat, 2 Sep 2023 16:55:31 +0800 Subject: [PATCH] feat(mito): Flush region (#2291) * chore: call handle_flush_request * feat: alias SchedulerRef and clean scheduler on drop * feat: add scheduler to workers * feat: remove RegionMemtableStats * feat: pick regions to flush * feat: add more fields to region flush task * feat: smallvec workspace dep * feat: Use list to hold immutable memtables * feat: flush job wip * feat: use access layer to read write sst * feat: flush memtables to l0 * feat: write manifest * feat: schedule next flush on success * feat: schedule flush on success and failure * feat: add purger to region * feat: apply edit after flush * feat: collect stats for SSTs * feat: manual flush * test: test flush and fix manifest test * feat: remove flush scheduler job limit * fix: typo * style: clippy * feat: clean flushed files on failure * chore: address CR comment * refactor: Use put_rows * feat: Clean flush scheduler on drop * feat: remove region flush status on drop and close * chore: address CR comment --- Cargo.lock | 1 + Cargo.toml | 1 + src/common/procedure/Cargo.toml | 2 +- src/mito2/Cargo.toml | 3 +- src/mito2/src/config.rs | 15 +- src/mito2/src/engine/tests.rs | 52 ++- src/mito2/src/error.rs | 27 ++ src/mito2/src/flush.rs | 374 +++++++++++++++++---- src/mito2/src/manifest/action.rs | 45 ++- src/mito2/src/manifest/tests/checkpoint.rs | 6 +- src/mito2/src/memtable/version.rs | 50 ++- src/mito2/src/read.rs | 21 -- src/mito2/src/read/scan_region.rs | 10 + src/mito2/src/read/seq_scan.rs | 8 + src/mito2/src/region.rs | 21 +- src/mito2/src/region/version.rs | 60 ++-- src/mito2/src/request.rs | 54 ++- src/mito2/src/sst/parquet.rs | 11 + src/mito2/src/sst/parquet/writer.rs | 39 ++- src/mito2/src/sst/version.rs | 37 +- src/mito2/src/worker.rs | 16 +- src/mito2/src/worker/handle_close.rs | 4 +- src/mito2/src/worker/handle_drop.rs | 4 +- src/mito2/src/worker/handle_flush.rs | 179 +++++++--- src/mito2/src/worker/handle_write.rs | 63 +--- 25 files changed, 842 insertions(+), 261 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9ba55d339..cac1497afa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5467,6 +5467,7 @@ dependencies = [ "regex", "serde", "serde_json", + "smallvec", "snafu", "storage", "store-api", diff --git a/Cargo.toml b/Cargo.toml index c09abdc771..2ee30b714c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,6 +91,7 @@ rand = "0.8" regex = "1.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +smallvec = "1" snafu = { version = "0.7", features = ["backtraces"] } sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "296a4f6c73b129d6f565a42a2e5e53c6bc2b9da4", features = [ "visitor", diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index c09ed6e1e9..971653d136 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -16,7 +16,7 @@ humantime-serde.workspace = true object-store = { workspace = true } serde.workspace = true serde_json = "1.0" -smallvec = "1" +smallvec.workspace = true snafu.workspace = true tokio.workspace = true uuid.workspace = true diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 38b239f9a9..af1ae7e61b 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -45,7 +45,8 @@ paste.workspace = true prost.workspace = true regex = "1.5" serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json.workspace = true +smallvec.workspace = true snafu.workspace = true storage = { workspace = true } store-api = { workspace = true } diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 8371aac90d..99b9ff9ffb 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -14,6 +14,8 @@ //! Configurations. +use std::time::Duration; + use common_base::readable_size::ReadableSize; use common_datasource::compression::CompressionType; use common_telemetry::warn; @@ -44,8 +46,16 @@ pub struct MitoConfig { pub manifest_compress_type: CompressionType, // Background job configs: - /// Max number of running background jobs. + /// Max number of running background jobs (default 4). pub max_background_jobs: usize, + + // Flush configs: + /// Interval to auto flush a region if it has not flushed yet (default 30 min). + pub auto_flush_interval: Duration, + /// Global write buffer size threshold to trigger flush (default 512M). + pub global_write_buffer_size: ReadableSize, + /// Global write buffer size threshold to reject write requests (default 2G). + pub global_write_buffer_reject_size: ReadableSize, } impl Default for MitoConfig { @@ -57,6 +67,9 @@ impl Default for MitoConfig { manifest_checkpoint_distance: 10, manifest_compress_type: CompressionType::Uncompressed, max_background_jobs: DEFAULT_MAX_BG_JOB, + auto_flush_interval: Duration::from_secs(30 * 60), + global_write_buffer_size: ReadableSize::mb(512), + global_write_buffer_reject_size: ReadableSize::gb(2), } } } diff --git a/src/mito2/src/engine/tests.rs b/src/mito2/src/engine/tests.rs index 67a4e1c6cb..62457b65e5 100644 --- a/src/mito2/src/engine/tests.rs +++ b/src/mito2/src/engine/tests.rs @@ -22,7 +22,8 @@ use api::v1::{ColumnSchema, Row, Rows, SemanticType}; use common_recordbatch::RecordBatches; use store_api::metadata::ColumnMetadata; use store_api::region_request::{ - RegionCreateRequest, RegionDeleteRequest, RegionOpenRequest, RegionPutRequest, + RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionOpenRequest, + RegionPutRequest, }; use store_api::storage::RegionId; @@ -408,3 +409,52 @@ async fn test_put_overwrite() { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_manual_flush() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + let Output::AffectedRows(rows) = engine + .handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {})) + .await + .unwrap() + else { + unreachable!() + }; + assert_eq!(0, rows); + + let request = ScanRequest::default(); + let scanner = engine.handle_query(region_id, request).unwrap(); + assert_eq!(1, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index f84abfc654..1281381958 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -395,6 +395,30 @@ pub enum Error { source: object_store::Error, location: Location, }, + + #[snafu(display( + "Failed to flush region {}, location: {}, source: {}", + region_id, + location, + source + ))] + FlushRegion { + region_id: RegionId, + source: Arc, + location: Location, + }, + + #[snafu(display("Region {} is dropped, location: {}", region_id, location))] + RegionDropped { + region_id: RegionId, + location: Location, + }, + + #[snafu(display("Region {} is closed, location: {}", region_id, location))] + RegionClosed { + region_id: RegionId, + location: Location, + }, } pub type Result = std::result::Result; @@ -457,6 +481,9 @@ impl ErrorExt for Error { StopScheduler { .. } => StatusCode::Internal, BuildPredicate { source, .. } => source.status_code(), DeleteSst { .. } => StatusCode::StorageUnavailable, + FlushRegion { source, .. } => source.status_code(), + RegionDropped { .. } => StatusCode::Cancelled, + RegionClosed { .. } => StatusCode::Cancelled, } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 50a1f19e94..349628a20a 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -17,13 +17,25 @@ use std::collections::HashMap; use std::sync::Arc; -use store_api::storage::RegionId; -use tokio::sync::oneshot::Sender; +use common_query::Output; +use common_telemetry::{error, info}; +use snafu::ResultExt; +use store_api::storage::{RegionId, ScanRequest}; +use tokio::sync::{mpsc, oneshot}; -use crate::error::Result; +use crate::access_layer::AccessLayerRef; +use crate::error::{Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, Result}; +use crate::memtable::MemtableBuilderRef; +use crate::read::Source; +use crate::region::version::{VersionControlData, VersionRef}; use crate::region::MitoRegionRef; -use crate::request::{SenderDdlRequest, SenderWriteRequest}; -use crate::schedule::scheduler::SchedulerRef; +use crate::request::{ + BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, WorkerRequest, +}; +use crate::schedule::scheduler::{Job, SchedulerRef}; +use crate::sst::file::{FileId, FileMeta}; +use crate::sst::file_purger::FilePurgerRef; +use crate::sst::parquet::WriteOptions; /// Global write buffer (memtable) manager. /// @@ -32,9 +44,6 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug { /// Returns whether to trigger the engine. fn should_flush_engine(&self) -> bool; - /// Returns whether the mutable memtable of this region needs to flush. - fn should_flush_region(&self, stats: RegionMemtableStats) -> bool; - /// Reserves `mem` bytes. fn reserve_mem(&self, mem: usize); @@ -53,15 +62,6 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug { pub type WriteBufferManagerRef = Arc; -/// Statistics of a region's memtable. -#[derive(Debug)] -pub struct RegionMemtableStats { - /// Size of the mutable memtable. - pub bytes_mutable: usize, - /// Write buffer size of the region. - pub write_buffer_size: usize, -} - // TODO(yingwen): Implements the manager. #[derive(Debug)] pub struct WriteBufferManagerImpl {} @@ -71,10 +71,6 @@ impl WriteBufferManager for WriteBufferManagerImpl { false } - fn should_flush_region(&self, _stats: RegionMemtableStats) -> bool { - false - } - fn reserve_mem(&self, _mem: usize) {} fn schedule_free_mem(&self, _mem: usize) {} @@ -90,11 +86,11 @@ impl WriteBufferManager for WriteBufferManagerImpl { pub enum FlushReason { /// Other reasons. Others, - /// Memtable is full. - MemtableFull, /// Engine reaches flush threshold. EngineFull, - // TODO(yingwen): Alter, manually. + /// Manual flush. + Manual, + // TODO(yingwen): Alter. } /// Task to flush a region. @@ -103,17 +99,141 @@ pub(crate) struct RegionFlushTask { pub(crate) region_id: RegionId, /// Reason to flush. pub(crate) reason: FlushReason, - /// Flush result sender. - pub(crate) sender: Option>>, + /// Flush result senders. + pub(crate) senders: Vec>>, + /// Request sender to notify the worker. + pub(crate) request_sender: mpsc::Sender, + + pub(crate) access_layer: AccessLayerRef, + pub(crate) memtable_builder: MemtableBuilderRef, + pub(crate) file_purger: FilePurgerRef, } impl RegionFlushTask { /// Consumes the task and notify the sender the job is success. fn on_success(self) { - if let Some(sender) = self.sender { - let _ = sender.send(Ok(())); + for sender in self.senders { + let _ = sender.send(Ok(Output::AffectedRows(0))); } } + + /// Send flush error to waiter. + fn on_failure(&mut self, err: Arc) { + for sender in self.senders.drain(..) { + // Ignore send result. + let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu { + region_id: self.region_id, + })); + } + } + + /// Converts the flush task into a background job. + fn into_flush_job(mut self, region: &MitoRegionRef) -> Job { + // Get a version of this region before creating a job so we + // always have a consistent memtable list. + let version_data = region.version_control.current(); + + Box::pin(async move { + self.do_flush(version_data).await; + }) + } + + /// Runs the flush task. + async fn do_flush(&mut self, version_data: VersionControlData) { + let worker_request = match self.flush_memtables(&version_data.version).await { + Ok(file_metas) => { + let memtables_to_remove = version_data + .version + .memtables + .immutables() + .iter() + .map(|m| m.id()) + .collect(); + let flush_finished = FlushFinished { + region_id: self.region_id, + file_metas, + // The last entry has been flushed. + flushed_entry_id: version_data.last_entry_id, + memtables_to_remove, + senders: std::mem::take(&mut self.senders), + file_purger: self.file_purger.clone(), + }; + WorkerRequest::Background { + region_id: self.region_id, + notify: BackgroundNotify::FlushFinished(flush_finished), + } + } + Err(e) => { + error!(e; "Failed to flush region {}", self.region_id); + let err = Arc::new(e); + self.on_failure(err.clone()); + WorkerRequest::Background { + region_id: self.region_id, + notify: BackgroundNotify::FlushFailed(FlushFailed { err }), + } + } + }; + self.send_worker_request(worker_request).await; + } + + /// Flushes memtables to level 0 SSTs. + async fn flush_memtables(&self, version: &VersionRef) -> Result> { + // TODO(yingwen): Make it configurable. + let write_opts = WriteOptions::default(); + let memtables = version.memtables.immutables(); + let mut file_metas = Vec::with_capacity(memtables.len()); + + for mem in memtables { + if mem.is_empty() { + // Skip empty memtables. + continue; + } + + let file_id = FileId::random(); + let iter = mem.iter(ScanRequest::default()); + let source = Source::Iter(iter); + let mut writer = self + .access_layer + .write_sst(file_id, version.metadata.clone(), source); + let Some(sst_info) = writer.write_all(&write_opts).await? else { + // No data written. + continue; + }; + + file_metas.push(FileMeta { + region_id: version.metadata.region_id, + file_id, + time_range: sst_info.time_range, + level: 0, + file_size: sst_info.file_size, + }); + } + + let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect(); + info!( + "Successfully flush memtables, region: {}, files: {:?}", + version.metadata.region_id, file_ids + ); + + Ok(file_metas) + } + + /// Notify flush job status. + async fn send_worker_request(&self, request: WorkerRequest) { + if let Err(e) = self.request_sender.send(request).await { + error!( + "Failed to notify flush job status for region {}, request: {:?}", + self.region_id, e.0 + ); + } + } + + /// Merge two flush tasks. + fn merge(&mut self, mut other: RegionFlushTask) { + assert_eq!(self.region_id, other.region_id); + // Now we only merge senders. They share the same flush reason. + self.senders.append(&mut other.senders); + } } /// Manages background flushes of a worker. @@ -133,25 +253,25 @@ impl FlushScheduler { } } - /// Returns true if the region is stalling. - pub(crate) fn is_stalling(&self, region_id: RegionId) -> bool { - if let Some(status) = self.region_status.get(®ion_id) { - return status.stalling; - } - - false + /// Returns true if the region already requested flush. + pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool { + self.region_status.contains_key(®ion_id) } /// Schedules a flush `task` for specific `region`. - pub(crate) fn schedule_flush(&mut self, region: &MitoRegionRef, task: RegionFlushTask) { + pub(crate) fn schedule_flush( + &mut self, + region: &MitoRegionRef, + task: RegionFlushTask, + ) -> Result<()> { debug_assert_eq!(region.region_id, task.region_id); let version = region.version_control.current().version; - if version.memtables.mutable.is_empty() && version.memtables.immutable.is_none() { + if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() { debug_assert!(!self.region_status.contains_key(®ion.region_id)); // The region has nothing to flush. task.on_success(); - return; + return Ok(()); } // Add this region to status map. @@ -160,64 +280,157 @@ impl FlushScheduler { .entry(region.region_id) .or_insert_with(|| FlushStatus::new(region.clone())); // Checks whether we can flush the region now. - if flush_status.flushing_task.is_some() { + if flush_status.flushing { // There is already a flush job running. - flush_status.stalling = true; - return; + flush_status.push_task(task); + return Ok(()); } - todo!() + // If there are pending tasks, then we should push it to pending list. + if flush_status.pending_task.is_some() { + flush_status.push_task(task); + return Ok(()); + } + + // Now we can flush the region directly. + region + .version_control + .freeze_mutable(&task.memtable_builder); + // Submit a flush job. + let job = task.into_flush_job(region); + if let Err(e) = self.scheduler.schedule(job) { + // If scheduler returns error, senders in the job will be dropped and waiters + // can get recv errors. + error!(e; "Failed to schedule flush job for region {}", region.region_id); + + // Remove from region status if we can't submit the task. + self.region_status.remove(®ion.region_id); + return Err(e); + } + flush_status.flushing = true; + + Ok(()) } - /// Add write `request` to pending queue. + /// Notifies the scheduler that the flush job is finished. /// - /// Returns error if region is not stalling. - pub(crate) fn add_write_request_to_pending( + /// Returns all pending requests if the region doesn't need to flush again. + pub(crate) fn on_flush_success( &mut self, - request: SenderWriteRequest, - ) -> Result<(), SenderWriteRequest> { - if let Some(status) = self.region_status.get_mut(&request.request.region_id) { - if status.stalling { - status.pending_writes.push(request); - return Ok(()); - } + region_id: RegionId, + ) -> Option> { + let Some(flush_status) = self.region_status.get_mut(®ion_id) else { + return None; + }; + + // This region doesn't have running flush job. + flush_status.flushing = false; + + let pending_ddls = if flush_status.pending_task.is_none() { + // The region doesn't have any pending flush task. + // Safety: The flush status exists. + let flush_status = self.region_status.remove(®ion_id).unwrap(); + Some(flush_status.pending_ddls) + } else { + None + }; + + // Schedule next flush job. + if let Err(e) = self.schedule_next_flush() { + error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id); } - Err(request) + pending_ddls + } + + /// Notifies the scheduler that the flush job is finished. + pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc) { + error!(err; "Region {} failed to flush, cancel all pending tasks", region_id); + + // Remove this region. + let Some(flush_status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Fast fail: cancels all pending tasks and sends error to their waiters. + flush_status.on_failure(err); + + // Still tries to schedule a new flush. + if let Err(e) = self.schedule_next_flush() { + error!(e; "Failed to schedule next flush after region {} flush is failed", region_id); + } + } + + /// Notifies the scheduler that the region is dropped. + pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) { + // Remove this region. + let Some(flush_status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Notifies all pending tasks. + flush_status.on_failure(Arc::new(RegionDroppedSnafu { region_id }.build())); + } + + /// Notifies the scheduler that the region is closed. + pub(crate) fn on_region_closed(&mut self, region_id: RegionId) { + // Remove this region. + let Some(flush_status) = self.region_status.remove(®ion_id) else { + return; + }; + + // Notifies all pending tasks. + flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build())); } /// Add ddl request to pending queue. /// - /// Returns error if region is not stalling. + /// Returns error if region doesn't request flush. pub(crate) fn add_ddl_request_to_pending( &mut self, request: SenderDdlRequest, ) -> Result<(), SenderDdlRequest> { if let Some(status) = self.region_status.get_mut(&request.region_id) { - if status.stalling { - status.pending_ddls.push(request); - return Ok(()); - } + status.pending_ddls.push(request); + return Ok(()); } Err(request) } + + /// Schedules a new flush task when the scheduler can submit next task. + pub(crate) fn schedule_next_flush(&mut self) -> Result<()> { + debug_assert!(self + .region_status + .values() + .all(|status| !status.flushing && status.pending_task.is_some())); + + // Get the first region from status map. + let Some(flush_status) = self + .region_status + .values_mut() + .find(|status| status.pending_task.is_some()) + else { + return Ok(()); + }; + debug_assert!(!flush_status.flushing); + let task = flush_status.pending_task.take().unwrap(); + let region = flush_status.region.clone(); + + self.schedule_flush(®ion, task) + } } /// Flush status of a region scheduled by the [FlushScheduler]. /// -/// Tracks running and pending flusht tasks and all pending requests of a region. +/// Tracks running and pending flush tasks and all pending requests of a region. struct FlushStatus { /// Current region. region: MitoRegionRef, - /// Current running flush task. - flushing_task: Option, - /// The number of flush requests waiting in queue. - num_queueing: usize, - /// The region is stalling. - stalling: bool, - /// Pending write requests. - pending_writes: Vec, + /// There is a flush task running. + flushing: bool, + /// Task waiting for next flush. + pending_task: Option, /// Pending ddl requests. pending_ddls: Vec, } @@ -226,11 +439,30 @@ impl FlushStatus { fn new(region: MitoRegionRef) -> FlushStatus { FlushStatus { region, - flushing_task: None, - num_queueing: 0, - stalling: false, - pending_writes: Vec::new(), + flushing: false, + pending_task: None, pending_ddls: Vec::new(), } } + + fn push_task(&mut self, task: RegionFlushTask) { + if let Some(pending) = &mut self.pending_task { + pending.merge(task); + } else { + self.pending_task = Some(task); + } + } + + fn on_failure(self, err: Arc) { + if let Some(mut task) = self.pending_task { + task.on_failure(err.clone()); + } + for ddl in self.pending_ddls { + if let Some(sender) = ddl.sender { + let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu { + region_id: self.region.region_id, + })); + } + } + } } diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 689b0e877f..a82f27ee37 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -21,10 +21,11 @@ use snafu::{OptionExt, ResultExt}; use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; use store_api::manifest::ManifestVersion; use store_api::metadata::RegionMetadataRef; -use store_api::storage::{RegionId, SequenceNumber}; +use store_api::storage::RegionId; use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu}; use crate::sst::file::{FileId, FileMeta}; +use crate::wal::EntryId; /// Actions that can be applied to region manifest. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -50,7 +51,7 @@ pub struct RegionEdit { pub files_to_add: Vec, pub files_to_remove: Vec, pub compaction_time_window: Option, - pub flushed_sequence: Option, + pub flushed_entry_id: Option, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -65,6 +66,8 @@ pub struct RegionManifest { pub metadata: RegionMetadataRef, /// SST files. pub files: HashMap, + /// Last WAL entry id of flushed data. + pub flushed_entry_id: EntryId, /// Current manifest version. pub manifest_version: ManifestVersion, } @@ -73,6 +76,7 @@ pub struct RegionManifest { pub struct RegionManifestBuilder { metadata: Option, files: HashMap, + flushed_entry_id: EntryId, manifest_version: ManifestVersion, } @@ -83,6 +87,7 @@ impl RegionManifestBuilder { Self { metadata: Some(s.metadata), files: s.files, + flushed_entry_id: s.flushed_entry_id, manifest_version: s.manifest_version, } } else { @@ -103,6 +108,9 @@ impl RegionManifestBuilder { for file in edit.files_to_remove { self.files.remove(&file.file_id); } + if let Some(flushed_entry_id) = edit.flushed_entry_id { + self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id); + } } /// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata). @@ -115,6 +123,7 @@ impl RegionManifestBuilder { Ok(RegionManifest { metadata, files: self.files, + flushed_entry_id: self.flushed_entry_id, manifest_version: self.manifest_version, }) } @@ -217,10 +226,38 @@ mod tests { // modification to manifest-related structs is compatible with older manifests. #[test] fn test_region_manifest_compatibility() { - let region_edit = r#"{"region_version":0,"flushed_sequence":null,"files_to_add":[{"region_id":4402341478400,"file_name":"4b220a70-2b03-4641-9687-b65d94641208.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1}],"files_to_remove":[{"region_id":4402341478400,"file_name":"34b6ebb9-b8a5-4a4b-b744-56f67defad02.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0}]}"#; + let region_edit = r#"{ + "flushed_entry_id":null, + "compaction_time_window":null, + "files_to_add":[ + {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100} + ], + "files_to_remove":[ + {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100} + ] + }"#; let _ = serde_json::from_str::(region_edit).unwrap(); - let region_change = r#" {"committed_sequence":42,"metadata":{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"version":9,"primary_key":[1],"region_id":5299989648942}}"#; + let region_edit = r#"{ + "flushed_entry_id":10, + "compaction_time_window":null, + "files_to_add":[ + {"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100} + ], + "files_to_remove":[ + {"region_id":4402341478400,"file_id":"34b6ebb9-b8a5-4a4b-b744-56f67defad02","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0,"file_size":100} + ] + }"#; + let _ = serde_json::from_str::(region_edit).unwrap(); + + let region_change = r#" { + "metadata":{ + "column_metadatas":[ + {"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3} + ], + "primary_key":[1], + "region_id":5299989648942} + }"#; let _ = serde_json::from_str::(region_change).unwrap(); let region_remove = r#"{"region_id":42}"#; diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 4d2162af30..1c6987faa8 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -57,7 +57,7 @@ fn nop_action() -> RegionMetaActionList { files_to_add: vec![], files_to_remove: vec![], compaction_time_window: None, - flushed_sequence: None, + flushed_entry_id: None, })]) } @@ -149,7 +149,7 @@ async fn manager_with_checkpoint_distance_1() { .await .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); - let expected_json = "{\"size\":729,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; + let expected_json = "{\"size\":750,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}"; assert_eq!(expected_json, raw_json); // reopen the manager @@ -175,7 +175,7 @@ async fn checkpoint_with_different_compression_types() { files_to_add: vec![file_meta], files_to_remove: vec![], compaction_time_window: None, - flushed_sequence: None, + flushed_entry_id: None, })]); actions.push(action); } diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index aaea9ae33f..37e80108dd 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -16,6 +16,8 @@ use std::sync::Arc; +use smallvec::SmallVec; + use crate::memtable::MemtableRef; /// A version of current memtables in a region. @@ -23,8 +25,12 @@ use crate::memtable::MemtableRef; pub(crate) struct MemtableVersion { /// Mutable memtable. pub(crate) mutable: MemtableRef, - /// Immutable memtable. - pub(crate) immutable: Option, + /// Immutable memtables. + /// + /// We only allow one flush job per region but if a flush job failed, then we + /// might need to store more than one immutable memtable on the next time we + /// flush the region. + immutables: SmallVec<[MemtableRef; 2]>, } pub(crate) type MemtableVersionRef = Arc; @@ -34,38 +40,54 @@ impl MemtableVersion { pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion { MemtableVersion { mutable, - immutable: None, + immutables: SmallVec::new(), } } + /// Immutable memtables. + pub(crate) fn immutables(&self) -> &[MemtableRef] { + &self.immutables + } + /// Lists mutable and immutable memtables. pub(crate) fn list_memtables(&self) -> Vec { - if let Some(immutable) = &self.immutable { - vec![self.mutable.clone(), immutable.clone()] - } else { - vec![self.mutable.clone()] - } + let mut mems = Vec::with_capacity(self.immutables.len() + 1); + mems.push(self.mutable.clone()); + mems.extend_from_slice(&self.immutables); + mems } /// Returns a new [MemtableVersion] which switches the old mutable memtable to immutable /// memtable. /// - /// Returns `None` if immutable memtable is `Some`. + /// Returns `None` if the mutable memtable is empty. #[must_use] pub(crate) fn freeze_mutable(&self, mutable: MemtableRef) -> Option { - debug_assert!(self.mutable.is_empty()); - if self.immutable.is_some() { - // There is already an immutable memtable. + debug_assert!(mutable.is_empty()); + if self.mutable.is_empty() { + // No need to freeze the mutable memtable. return None; } // Marks the mutable memtable as immutable so it can free the memory usage from our // soft limit. self.mutable.mark_immutable(); - + // Pushes the mutable memtable to immutable list. + let immutables = self + .immutables + .iter() + .cloned() + .chain([self.mutable.clone()]) + .collect(); Some(MemtableVersion { mutable, - immutable: Some(self.mutable.clone()), + immutables, }) } + + /// Returns the memory usage of the mutable memtable. + pub(crate) fn mutable_bytes_usage(&self) -> usize { + // TODO(yingwen): Get memtable usage. + 0 + } } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index fb1629f61f..b420726241 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -541,21 +541,6 @@ impl BatchBuilder { } } -/// Collected [Source] statistics. -#[derive(Debug, Clone)] -pub struct SourceStats { - /// Number of rows fetched. - pub num_rows: usize, - /// Min timestamp from fetched batches. - /// - /// If no rows fetched, the value of the timestamp is i64::MIN. - pub min_timestamp: Timestamp, - /// Max timestamp from fetched batches. - /// - /// If no rows fetched, the value of the timestamp is i64::MAX. - pub max_timestamp: Timestamp, -} - /// Async [Batch] reader and iterator wrapper. /// /// This is the data source for SST writers or internal readers. @@ -574,12 +559,6 @@ impl Source { Source::Iter(iter) => iter.next().transpose(), } } - - // TODO(yingwen): Remove this method once we support collecting stats in the writer. - /// Returns statisics of fetched batches. - pub(crate) fn stats(&self) -> SourceStats { - unimplemented!() - } } /// Async batch reader. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5a05373faf..a421057c22 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -44,6 +44,16 @@ impl Scanner { } } +#[cfg(test)] +impl Scanner { + /// Returns number of files to scan. + pub(crate) fn num_files(&self) -> usize { + match self { + Scanner::Seq(seq_scan) => seq_scan.num_files(), + } + } +} + #[cfg_attr(doc, aquamarine::aquamarine)] /// Helper to scans a region by [ScanRequest]. /// diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index e2c42ab918..67168a63b0 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -137,3 +137,11 @@ impl SeqScan { Ok(stream) } } + +#[cfg(test)] +impl SeqScan { + /// Returns number of SST files to scan. + pub(crate) fn num_files(&self) -> usize { + self.files.len() + } +} diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 0b7edbd37b..920b5b5714 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -18,10 +18,11 @@ pub(crate) mod opener; pub(crate) mod version; use std::collections::HashMap; -use std::sync::atomic::AtomicI64; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; use common_telemetry::info; +use common_time::util::current_time_millis; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -62,11 +63,14 @@ pub(crate) struct MitoRegion { pub(crate) type MitoRegionRef = Arc; impl MitoRegion { - /// Stop background tasks for this region. + /// Stop background managers for this region. pub(crate) async fn stop(&self) -> Result<()> { self.manifest_manager.stop().await?; - info!("Stopped region, region_id: {}", self.region_id); + info!( + "Stopped region manifest manager, region_id: {}", + self.region_id + ); Ok(()) } @@ -82,6 +86,17 @@ impl MitoRegion { let version_data = self.version_control.current(); version_data.version } + + /// Returns last flush timestamp in millis. + pub(crate) fn last_flush_millis(&self) -> i64 { + self.last_flush_millis.load(Ordering::Relaxed) + } + + /// Update flush time to current time. + pub(crate) fn update_flush_millis(&self) { + let now = current_time_millis(); + self.last_flush_millis.store(now, Ordering::Relaxed); + } } /// Regions indexed by ids. diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index 149dbe7d24..fc6b281216 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -28,9 +28,10 @@ use std::sync::{Arc, RwLock}; use store_api::metadata::RegionMetadataRef; use store_api::storage::SequenceNumber; -use crate::flush::RegionMemtableStats; +use crate::manifest::action::RegionEdit; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; -use crate::memtable::{MemtableBuilderRef, MemtableId, MemtableRef}; +use crate::memtable::{MemtableBuilderRef, MemtableRef}; +use crate::sst::file_purger::FilePurgerRef; use crate::sst::version::{SstVersion, SstVersionRef}; use crate::wal::EntryId; @@ -68,16 +69,13 @@ impl VersionControl { data.last_entry_id = entry_id; } - /// Freezes the mutable memtable and returns the id of the frozen memtable. - /// - /// If the mutable memtable is empty or there is already an immutable memtable, returns `None`. - pub(crate) fn freeze_mutable(&self, builder: &MemtableBuilderRef) -> Option { + /// Freezes the mutable memtable if it is not empty. + pub(crate) fn freeze_mutable(&self, builder: &MemtableBuilderRef) { let version = self.current().version; - if version.memtables.mutable.is_empty() || version.memtables.immutable.is_some() { - return None; + if version.memtables.mutable.is_empty() { + return; } let new_mutable = builder.build(&version.metadata); - let mutable_id = version.memtables.mutable.id(); // Safety: Immutable memtable is None. let new_memtables = version.memtables.freeze_mutable(new_mutable).unwrap(); // Create a new version with memtable switched. @@ -89,7 +87,19 @@ impl VersionControl { let mut version_data = self.data.write().unwrap(); version_data.version = new_version; - Some(mutable_id) + } + + /// Apply edit to current version. + pub(crate) fn apply_edit(&self, edit: RegionEdit, purger: FilePurgerRef) { + let version = self.current().version; + let new_version = Arc::new( + VersionBuilder::from_version(version) + .apply_edit(edit, purger) + .build(), + ); + + let mut version_data = self.data.write().unwrap(); + version_data.version = new_version; } /// Mark all opened files as deleted and set the delete marker in [VersionControlData] @@ -136,17 +146,6 @@ pub(crate) struct Version { pub(crate) type VersionRef = Arc; -impl Version { - /// Returns statistics of the mutable memtable. - pub(crate) fn mutable_stats(&self) -> RegionMemtableStats { - // TODO(yingwen): Get from memtable. - RegionMemtableStats { - bytes_mutable: 0, - write_buffer_size: 0, - } - } -} - /// Version builder. pub(crate) struct VersionBuilder { metadata: RegionMetadataRef, @@ -182,6 +181,25 @@ impl VersionBuilder { self } + /// Apply edit to the builder. + pub(crate) fn apply_edit( + mut self, + edit: RegionEdit, + file_purger: FilePurgerRef, + ) -> VersionBuilder { + if let Some(flushed_entry_id) = edit.flushed_entry_id { + self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id); + } + if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() { + let mut ssts = (*self.ssts).clone(); + ssts.add_files(file_purger, edit.files_to_add.into_iter()); + ssts.remove_files(edit.files_to_remove.into_iter()); + self.ssts = Arc::new(ssts); + } + + self + } + /// Builds a new [Version] from the builder. pub(crate) fn build(self) -> Version { Version { diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 2d31c4bfdd..0540a7afb3 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -15,6 +15,7 @@ //! Worker requests. use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use api::helper::{ @@ -25,6 +26,7 @@ use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; use common_base::readable_size::ReadableSize; use common_query::Output; use datatypes::prelude::DataType; +use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_request::{ @@ -35,8 +37,13 @@ use store_api::storage::{CompactionStrategy, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; -use crate::error::{CreateDefaultSnafu, Error, FillDefaultSnafu, InvalidRequestSnafu, Result}; +use crate::error::{ + CreateDefaultSnafu, Error, FillDefaultSnafu, FlushRegionSnafu, InvalidRequestSnafu, Result, +}; +use crate::memtable::MemtableId; use crate::sst::file::FileMeta; +use crate::sst::file_purger::{FilePurgerRef, PurgeRequest}; +use crate::wal::EntryId; /// Options that affect the entire region. /// @@ -374,6 +381,7 @@ pub(crate) struct SenderWriteRequest { } /// Request sent to a worker +#[derive(Debug)] pub(crate) enum WorkerRequest { /// Write to a region. Write(SenderWriteRequest), @@ -491,15 +499,51 @@ pub(crate) enum BackgroundNotify { /// Notifies a flush job is finished. #[derive(Debug)] pub(crate) struct FlushFinished { - /// Meta of the flushed SST. - pub(crate) file_meta: FileMeta, + /// Region id. + pub(crate) region_id: RegionId, + /// Meta of the flushed SSTs. + pub(crate) file_metas: Vec, + /// Entry id of flushed data. + pub(crate) flushed_entry_id: EntryId, + /// Id of memtables to remove. + pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>, + /// Flush result senders. + pub(crate) senders: Vec>>, + /// File purger for cleaning files on failure. + pub(crate) file_purger: FilePurgerRef, +} + +impl FlushFinished { + pub(crate) fn on_failure(self, err: Error) { + let err = Arc::new(err); + for sender in self.senders { + // Ignore send result. + let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu { + region_id: self.region_id, + })); + } + // Clean flushed files. + for file in self.file_metas { + self.file_purger.send_request(PurgeRequest { + region_id: file.region_id, + file_id: file.file_id, + }); + } + } + + pub(crate) fn on_success(self) { + for sender in self.senders { + // Ignore send result. + let _ = sender.send(Ok(Output::AffectedRows(0))); + } + } } /// Notifies a flush job is failed. #[derive(Debug)] pub(crate) struct FlushFailed { - /// The reason of a failed flush job. - pub(crate) error: Error, + /// The error source of the failure. + pub(crate) err: Arc, } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index c8eb1dbea4..1f461735ee 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -24,6 +24,8 @@ use crate::sst::file::FileTimeRange; /// Key of metadata in parquet SST. pub const PARQUET_METADATA_KEY: &str = "greptime:metadata"; +const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8); +const DEFAULT_ROW_GROUP_SIZE: usize = 100000; /// Parquet write options. #[derive(Debug)] @@ -34,6 +36,15 @@ pub struct WriteOptions { pub row_group_size: usize, } +impl Default for WriteOptions { + fn default() -> Self { + WriteOptions { + write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, + row_group_size: DEFAULT_ROW_GROUP_SIZE, + } + } +} + /// Parquet SST info returned by the writer. pub struct SstInfo { /// Time range of the SST. diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index f5f9c04571..14d6a9da3e 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -15,6 +15,7 @@ //! Parquet writer. use common_telemetry::debug; +use common_time::Timestamp; use object_store::ObjectStore; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; @@ -25,7 +26,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; use crate::error::{InvalidMetadataSnafu, Result}; -use crate::read::Source; +use crate::read::{Batch, Source}; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; use crate::sst::stream_writer::BufferedWriter; @@ -95,13 +96,13 @@ impl ParquetWriter { ) .await?; + let mut stats = SourceStats::default(); while let Some(batch) = self.source.next_batch().await? { + stats.update(&batch); let arrow_batch = write_format.convert_batch(&batch)?; buffered_writer.write(&arrow_batch).await?; } - // Get stats from the source. - let stats = self.source.stats(); if stats.num_rows == 0 { debug!( @@ -114,7 +115,8 @@ impl ParquetWriter { } let (_file_meta, file_size) = buffered_writer.close().await?; - let time_range = (stats.min_timestamp, stats.max_timestamp); + // Safety: num rows > 0 so we must have min/max. + let time_range = stats.time_range.unwrap(); // object_store.write will make sure all bytes are written or an error is raised. Ok(Some(SstInfo { @@ -125,4 +127,33 @@ impl ParquetWriter { } } +#[derive(Default)] +struct SourceStats { + /// Number of rows fetched. + num_rows: usize, + /// Time range of fetched batches. + time_range: Option<(Timestamp, Timestamp)>, +} + +impl SourceStats { + fn update(&mut self, batch: &Batch) { + if batch.is_empty() { + return; + } + + self.num_rows += batch.num_rows(); + // Safety: batch is not empty. + let (min_in_batch, max_in_batch) = ( + batch.first_timestamp().unwrap(), + batch.last_timestamp().unwrap(), + ); + if let Some(time_range) = &mut self.time_range { + time_range.0 = time_range.0.min(min_in_batch); + time_range.1 = time_range.1.max(max_in_batch); + } else { + self.time_range = Some((min_in_batch, max_in_batch)); + } + } +} + // TODO(yingwen): Port tests. diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index 05c9d0f741..be71ebb1e2 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -17,10 +17,11 @@ use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use crate::sst::file::{FileHandle, FileId, Level, MAX_LEVEL}; +use crate::sst::file::{FileHandle, FileId, FileMeta, Level, MAX_LEVEL}; +use crate::sst::file_purger::FilePurgerRef; /// A version of all SSTs in a region. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct SstVersion { /// SST metadata organized by levels. levels: LevelMetaArray, @@ -41,6 +42,37 @@ impl SstVersion { &self.levels } + /// Add files to the version. + /// + /// # Panics + /// Panics if level of [FileMeta] is greater than [MAX_LEVEL]. + pub(crate) fn add_files( + &mut self, + file_purger: FilePurgerRef, + files_to_add: impl Iterator, + ) { + for file in files_to_add { + let level = file.level; + let handle = FileHandle::new(file, file_purger.clone()); + let file_id = handle.file_id(); + let old = self.levels[level as usize].files.insert(file_id, handle); + assert!(old.is_none(), "Adds an existing file: {file_id}"); + } + } + + /// Remove files from the version. + /// + /// # Panics + /// Panics if level of [FileMeta] is greater than [MAX_LEVEL]. + pub(crate) fn remove_files(&mut self, files_to_remove: impl Iterator) { + for file in files_to_remove { + let level = file.level; + if let Some(handle) = self.levels[level as usize].files.remove(&file.file_id) { + handle.mark_deleted(); + } + } + } + /// Mark all SSTs in this version as deleted. pub(crate) fn mark_all_deleted(&self) { for level_meta in self.levels.iter() { @@ -56,6 +88,7 @@ impl SstVersion { type LevelMetaArray = [LevelMeta; MAX_LEVEL as usize]; /// Metadata of files in the same SST level. +#[derive(Clone)] pub struct LevelMeta { /// Level number. pub level: Level, diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 44934bbab0..aed26de4fd 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -193,6 +193,7 @@ impl WorkerStarter { config: self.config, regions: regions.clone(), dropping_regions: Arc::new(RegionMap::default()), + sender: sender.clone(), receiver, wal: Wal::new(self.log_store), object_store: self.object_store, @@ -308,6 +309,8 @@ struct RegionWorkerLoop { regions: RegionMapRef, /// Regions that are not yet fully dropped. dropping_regions: RegionMapRef, + /// Request sender. + sender: Sender, /// Request receiver. receiver: Receiver, /// WAL of the engine. @@ -404,10 +407,15 @@ impl RegionWorkerLoop { for ddl in ddl_requests { let res = match ddl.request { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, + DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, DdlRequest::Open(req) => self.handle_open_request(ddl.region_id, req).await, DdlRequest::Close(_) => self.handle_close_request(ddl.region_id).await, - DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, - DdlRequest::Alter(_) | DdlRequest::Flush(_) | DdlRequest::Compact(_) => todo!(), + DdlRequest::Alter(_) => todo!(), + DdlRequest::Flush(_) => { + self.handle_flush_request(ddl.region_id, ddl.sender).await; + continue; + } + DdlRequest::Compact(_) => todo!(), }; if let Some(sender) = ddl.sender { @@ -416,9 +424,7 @@ impl RegionWorkerLoop { } } } -} -impl RegionWorkerLoop { /// Handles region background request async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) { match notify { @@ -428,7 +434,9 @@ impl RegionWorkerLoop { BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await, } } +} +impl RegionWorkerLoop { // Clean up the worker. async fn clean(&self) { // Closes remaining regions. diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index e95562c0e9..03899dd59f 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -31,8 +31,8 @@ impl RegionWorkerLoop { region.stop().await?; self.regions.remove_region(region_id); - - // TODO(yingwen): Clean flush status. + // Clean flush status. + self.flush_scheduler.on_region_closed(region_id); info!("Region {} closed", region_id); diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 7a7d07b981..f1f3076045 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -40,7 +40,6 @@ impl RegionWorkerLoop { }; info!("Try to drop region: {}", region_id); - region.stop().await?; // write dropping marker let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE); @@ -49,9 +48,12 @@ impl RegionWorkerLoop { .await .context(OpenDalSnafu)?; + region.stop().await?; // remove this region from region map to prevent other requests from accessing this region self.regions.remove_region(region_id); self.dropping_regions.insert_region(region.clone()); + // Notifies flush scheduler. + self.flush_scheduler.on_region_dropped(region_id); // mark region version as dropped region.version_control.mark_dropped(); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 6e523c2bc4..7d059da8e1 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -14,81 +14,168 @@ //! Handling flush related requests. -use store_api::region_request::RegionFlushRequest; +use common_query::Output; +use common_telemetry::{error, info}; +use common_time::util::current_time_millis; +use store_api::logstore::LogStore; use store_api::storage::RegionId; +use tokio::sync::oneshot; +use crate::error::{RegionNotFoundSnafu, Result}; use crate::flush::{FlushReason, RegionFlushTask}; +use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::region::MitoRegionRef; use crate::request::{FlushFailed, FlushFinished}; use crate::worker::RegionWorkerLoop; -impl RegionWorkerLoop { - /// Handles manual flush request. - pub(crate) async fn handle_flush( - &mut self, - _region_id: RegionId, - _request: RegionFlushRequest, - ) { - // TODO(yingwen): schedule flush. - unimplemented!() - } - +impl RegionWorkerLoop { /// On region flush job finished. pub(crate) async fn handle_flush_finished( &mut self, - _region_id: RegionId, - _request: FlushFinished, + region_id: RegionId, + mut request: FlushFinished, ) { - // TODO(yingwen): - // 1. check region existence - // 2. write manifest - // 3. update region metadata. - // 4. handle all pending requests. - // 5. remove flushed files if the region is dropped. - unimplemented!() + let Some(region) = self.regions.get_region(region_id) else { + // We may dropped or closed the region. + request.on_failure(RegionNotFoundSnafu { region_id }.build()); + return; + }; + + // Write region edit to manifest. + let edit = RegionEdit { + files_to_add: std::mem::take(&mut request.file_metas), + files_to_remove: Vec::new(), + compaction_time_window: None, + flushed_entry_id: Some(request.flushed_entry_id), + }; + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + if let Err(e) = region.manifest_manager.update(action_list).await { + error!(e; "Failed to write manifest, region: {}", region_id); + request.on_failure(e); + return; + } + + // Apply edit to region's version. + region + .version_control + .apply_edit(edit, region.file_purger.clone()); + + // Delete wal. + info!( + "Region {} flush finished, tries to bump wal to {}", + region_id, request.flushed_entry_id + ); + if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await { + error!(e; "Failed to write wal, region: {}", region_id); + request.on_failure(e); + return; + } + + // Handle pending requests of the region. + if let Some(ddl_requests) = self.flush_scheduler.on_flush_success(region_id) { + self.handle_ddl_requests(ddl_requests).await; + } + + // Notifies waiters. + request.on_success(); + } +} + +impl RegionWorkerLoop { + /// Handles manual flush request. + pub(crate) async fn handle_flush_request( + &mut self, + region_id: RegionId, + sender: Option>>, + ) { + let Some(region) = self.regions.get_region(region_id) else { + if let Some(sender) = sender { + let _ = sender.send(RegionNotFoundSnafu { region_id }.fail()); + } + return; + }; + + let mut task = self.new_flush_task(®ion, FlushReason::Manual); + if let Some(sender) = sender { + task.senders.push(sender); + } + if let Err(e) = self.flush_scheduler.schedule_flush(®ion, task) { + error!(e; "Failed to schedule flush task for region {}", region.region_id); + } } /// On region flush job failed. - pub(crate) async fn handle_flush_failed( - &mut self, - _region_id: RegionId, - _request: FlushFailed, - ) { - // TODO(yingwen): fail all pending requests. - unimplemented!() + pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) { + self.flush_scheduler.on_flush_failed(region_id, request.err); } /// Checks whether the engine reaches flush threshold. If so, finds regions in this /// worker to flush. - pub(crate) fn maybe_flush_worker(&self) { + pub(crate) fn maybe_flush_worker(&mut self) { if !self.write_buffer_manager.should_flush_engine() { // No need to flush worker. return; } + // If the engine needs flush, each worker will find some regions to flush. We might // flush more memory than expect but it should be acceptable. - self.find_regions_to_flush(); + if let Err(e) = self.flush_regions_on_engine_full() { + error!(e; "Failed to flush worker"); + } } /// Find some regions to flush to reduce write buffer usage. - pub(crate) fn find_regions_to_flush(&self) { - unimplemented!() + fn flush_regions_on_engine_full(&mut self) -> Result<()> { + let regions = self.regions.list_regions(); + let now = current_time_millis(); + let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64; + let mut max_mutable_size = 0; + // Region with max mutable memtable size. + let mut max_mem_region = None; + + for region in ®ions { + if self.flush_scheduler.is_flush_requested(region.region_id) { + // Already flushing. + continue; + } + + let version = region.version(); + let region_mutable_size = version.memtables.mutable_bytes_usage(); + // Tracks region with max mutable memtable size. + if region_mutable_size > max_mutable_size { + max_mem_region = Some(region); + max_mutable_size = region_mutable_size; + } + + if region.last_flush_millis() < min_last_flush_time { + // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. + let task = self.new_flush_task(region, FlushReason::EngineFull); + self.flush_scheduler.schedule_flush(region, task)?; + } + } + + // Flush memtable with max mutable memtable. + // TODO(yingwen): Maybe flush more tables to reduce write buffer size. + if let Some(region) = max_mem_region { + if !self.flush_scheduler.is_flush_requested(region.region_id) { + let task = self.new_flush_task(region, FlushReason::EngineFull); + self.flush_scheduler.schedule_flush(region, task)?; + } + } + + Ok(()) } - /// Flush a region if it meets flush requirements. - pub(crate) fn flush_region_if_full(&mut self, region: &MitoRegionRef) { - let version_data = region.version_control.current(); - if self - .write_buffer_manager - .should_flush_region(version_data.version.mutable_stats()) - { - // We need to flush this region. - let task = RegionFlushTask { - region_id: region.region_id, - reason: FlushReason::MemtableFull, - sender: None, - }; - self.flush_scheduler.schedule_flush(region, task); + fn new_flush_task(&self, region: &MitoRegionRef, reason: FlushReason) -> RegionFlushTask { + // TODO(yingwen): metrics for flush requested. + RegionFlushTask { + region_id: region.region_id, + reason, + senders: Vec::new(), + request_sender: self.sender.clone(), + access_layer: region.access_layer.clone(), + memtable_builder: self.memtable_builder.clone(), + file_purger: region.file_purger.clone(), } } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 8832764e8c..38098aa10d 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -48,21 +48,21 @@ impl RegionWorkerLoop { // Write to WAL. let mut wal_writer = self.wal.writer(); - for region_ctx in region_ctxs.values_mut().filter_map(|v| v.write_ctx_mut()) { + for region_ctx in region_ctxs.values_mut() { if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) { region_ctx.set_error(e); } } if let Err(e) = wal_writer.write_to_wal().await.map_err(Arc::new) { // Failed to write wal. - for mut region_ctx in region_ctxs.into_values().filter_map(|v| v.into_write_ctx()) { + for mut region_ctx in region_ctxs.into_values() { region_ctx.set_error(e.clone()); } return; } // Write to memtables. - for mut region_ctx in region_ctxs.into_values().filter_map(|v| v.into_write_ctx()) { + for mut region_ctx in region_ctxs.into_values() { region_ctx.write_memtable(); } } @@ -73,7 +73,7 @@ impl RegionWorkerLoop { fn prepare_region_write_ctx( &mut self, write_requests: Vec, - ) -> HashMap { + ) -> HashMap { // Initialize region write context map. let mut region_ctxs = HashMap::new(); for mut sender_req in write_requests { @@ -88,36 +88,13 @@ impl RegionWorkerLoop { continue; }; - // A new region to write, checks whether we need to flush this region. - self.flush_region_if_full(®ion); - // Checks whether the region is stalling. - let maybe_stalling = if self.flush_scheduler.is_stalling(region_id) { - // Region is stalling so there is no write context for it. - MaybeStalling::Stalling - } else { - // Initialize the context. - MaybeStalling::Writable(RegionWriteCtx::new( - region.region_id, - ®ion.version_control, - )) - }; + let region_ctx = RegionWriteCtx::new(region.region_id, ®ion.version_control); - e.insert(maybe_stalling); + e.insert(region_ctx); } // Safety: Now we ensure the region exists. - let maybe_stalling = region_ctxs.get_mut(®ion_id).unwrap(); - - // Get stalling status of a region. - let MaybeStalling::Writable(region_ctx) = maybe_stalling else { - // If this region is stalling, we need to add requests to pending queue - // and write to the region later. - // Safety: We have checked the region is stalling. - self.flush_scheduler - .add_write_request_to_pending(sender_req) - .unwrap(); - continue; - }; + let region_ctx = region_ctxs.get_mut(®ion_id).unwrap(); // Checks whether request schema is compatible with region schema. if let Err(e) = @@ -147,32 +124,6 @@ impl RegionWorkerLoop { } } -/// An entry to store the write context or stalling flag. -enum MaybeStalling { - /// The region is writable. - Writable(RegionWriteCtx), - /// The region is stalling and we should not write to it. - Stalling, -} - -impl MaybeStalling { - /// Converts itself to a [RegionWriteCtx] if it is writable. - fn into_write_ctx(self) -> Option { - match self { - MaybeStalling::Writable(v) => Some(v), - MaybeStalling::Stalling => None, - } - } - - /// Gets a mutable reference of [RegionWriteCtx] if it is writable. - fn write_ctx_mut(&mut self) -> Option<&mut RegionWriteCtx> { - match self { - MaybeStalling::Writable(v) => Some(v), - MaybeStalling::Stalling => None, - } - } -} - /// Send rejected error to all `write_requests`. fn reject_write_requests(_write_requests: Vec) { unimplemented!()