diff --git a/Cargo.lock b/Cargo.lock index 3bd9678485..b10396cb7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5901,6 +5901,7 @@ dependencies = [ "rskafka", "rstest", "rstest_reuse", + "scopeguard", "serde", "serde_json", "serde_with", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 3994ebb439..4676beaeff 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -58,6 +58,7 @@ regex = "1.5" rskafka = { workspace = true, optional = true } rstest = { workspace = true, optional = true } rstest_reuse = { workspace = true, optional = true } +scopeguard = "1.2" serde.workspace = true serde_json.workspace = true serde_with.workspace = true diff --git a/src/mito2/src/manifest.rs b/src/mito2/src/manifest.rs index 0603c9c9af..99cd93282c 100644 --- a/src/mito2/src/manifest.rs +++ b/src/mito2/src/manifest.rs @@ -15,6 +15,7 @@ //! manifest storage pub mod action; +mod checkpointer; pub mod manager; pub mod storage; #[cfg(test)] diff --git a/src/mito2/src/manifest/checkpointer.rs b/src/mito2/src/manifest/checkpointer.rs new file mode 100644 index 0000000000..c0e5b0d35a --- /dev/null +++ b/src/mito2/src/manifest/checkpointer.rs @@ -0,0 +1,177 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; + +use common_telemetry::{error, info}; +use store_api::manifest::{ManifestVersion, MIN_VERSION}; +use store_api::storage::RegionId; + +use crate::manifest::action::{RegionCheckpoint, RegionManifest}; +use crate::manifest::manager::RegionManifestOptions; +use crate::manifest::storage::ManifestObjectStore; +use crate::metrics::MANIFEST_OP_ELAPSED; + +/// [`Checkpointer`] is responsible for doing checkpoint for a region, in an asynchronous way. +#[derive(Debug)] +pub(crate) struct Checkpointer { + manifest_options: RegionManifestOptions, + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + region_id: RegionId, + manifest_store: ManifestObjectStore, + last_checkpoint_version: AtomicU64, + is_doing_checkpoint: AtomicBool, +} + +impl Inner { + async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) { + let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| { + x.store(false, Ordering::Relaxed); + }); + + let _t = MANIFEST_OP_ELAPSED + .with_label_values(&["checkpoint"]) + .start_timer(); + + let region_id = self.region_id(); + let version = checkpoint.last_version(); + let checkpoint = match checkpoint.encode() { + Ok(checkpoint) => checkpoint, + Err(e) => { + error!(e; "Failed to encode checkpoint {:?}", checkpoint); + return; + } + }; + if let Err(e) = self + .manifest_store + .save_checkpoint(version, &checkpoint) + .await + { + error!(e; "Failed to save checkpoint for region {}", region_id); + return; + } + + if let Err(e) = self.manifest_store.delete_until(version, true).await { + error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id); + return; + } + + self.last_checkpoint_version + .store(version, Ordering::Relaxed); + + info!( + "Checkpoint for region {} success, version: {}", + region_id, version + ); + } + + fn region_id(&self) -> RegionId { + self.region_id + } + + fn is_doing_checkpoint(&self) -> bool { + self.is_doing_checkpoint.load(Ordering::Relaxed) + } + + fn set_doing_checkpoint(&self) { + self.is_doing_checkpoint.store(true, Ordering::Relaxed); + } +} + +impl Checkpointer { + pub(crate) fn new( + region_id: RegionId, + manifest_options: RegionManifestOptions, + manifest_store: ManifestObjectStore, + last_checkpoint_version: ManifestVersion, + ) -> Self { + Self { + manifest_options, + inner: Arc::new(Inner { + region_id, + manifest_store, + last_checkpoint_version: AtomicU64::new(last_checkpoint_version), + is_doing_checkpoint: AtomicBool::new(false), + }), + } + } + + pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion { + self.inner.last_checkpoint_version.load(Ordering::Relaxed) + } + + /// Check if it's needed to do checkpoint for the region by the checkpoint distance. + /// If needed, and there's no currently running checkpoint task, it will start a new checkpoint + /// task running in the background. + pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) { + if self.manifest_options.checkpoint_distance == 0 { + return; + } + + let last_checkpoint_version = self.last_checkpoint_version(); + if manifest.manifest_version - last_checkpoint_version + < self.manifest_options.checkpoint_distance + { + return; + } + + // We can simply check whether there's a running checkpoint task like this, all because of + // the caller of this function is ran single threaded, inside the lock of RegionManifestManager. + if self.inner.is_doing_checkpoint() { + return; + } + + let start_version = if last_checkpoint_version == 0 { + // Checkpoint version can't be zero by implementation. + // So last checkpoint version is zero means no last checkpoint. + MIN_VERSION + } else { + last_checkpoint_version + 1 + }; + let end_version = manifest.manifest_version; + info!( + "Start doing checkpoint for region {}, compacted version: [{}, {}]", + self.inner.region_id(), + start_version, + end_version, + ); + + let checkpoint = RegionCheckpoint { + last_version: end_version, + compacted_actions: (end_version - start_version + 1) as usize, + checkpoint: Some(manifest.clone()), + }; + self.do_checkpoint(checkpoint); + } + + fn do_checkpoint(&self, checkpoint: RegionCheckpoint) { + self.inner.set_doing_checkpoint(); + + let inner = self.inner.clone(); + common_runtime::spawn_bg(async move { + inner.do_checkpoint(checkpoint).await; + }); + } + + #[cfg(test)] + pub(crate) fn is_doing_checkpoint(&self) -> bool { + self.inner.is_doing_checkpoint() + } +} diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 777f9a47e4..1ef6980541 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -28,6 +28,7 @@ use crate::manifest::action::{ RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, RegionMetaActionList, }; +use crate::manifest::checkpointer::Checkpointer; use crate::manifest::storage::{file_version, is_delta_file, ManifestObjectStore}; use crate::metrics::MANIFEST_OP_ELAPSED; @@ -112,10 +113,8 @@ pub struct RegionManifestOptions { #[derive(Debug)] pub struct RegionManifestManager { store: ManifestObjectStore, - options: RegionManifestOptions, last_version: ManifestVersion, - /// The last version included in checkpoint file. - last_checkpoint_version: ManifestVersion, + checkpointer: Checkpointer, manifest: Arc, stopped: bool, } @@ -150,6 +149,7 @@ impl RegionManifestManager { }, ); let manifest = manifest_builder.try_build()?; + let region_id = metadata.region_id; debug!( "Build region manifest in {}, manifest: {:?}", @@ -161,11 +161,11 @@ impl RegionManifestManager { RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata })); store.save(version, &action_list.encode()?).await?; + let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION); Ok(Self { store, - options, last_version: version, - last_checkpoint_version: MIN_VERSION, + checkpointer, manifest: Arc::new(manifest), stopped: false, }) @@ -215,8 +215,7 @@ impl RegionManifestManager { }; // apply actions from storage - let manifests = store.scan(version, MAX_VERSION).await?; - let manifests = store.fetch_manifests(&manifests).await?; + let manifests = store.fetch_manifests(version, MAX_VERSION).await?; for (manifest_version, raw_action_list) in manifests { let action_list = RegionMetaActionList::decode(&raw_action_list)?; @@ -256,11 +255,16 @@ impl RegionManifestManager { ); let version = manifest.manifest_version; + let checkpointer = Checkpointer::new( + manifest.metadata.region_id, + options, + store.clone(), + last_checkpoint_version, + ); Ok(Some(Self { store, - options, last_version: version, - last_checkpoint_version, + checkpointer, manifest: Arc::new(manifest), stopped: false, })) @@ -310,7 +314,9 @@ impl RegionManifestManager { } let new_manifest = manifest_builder.try_build()?; self.manifest = Arc::new(new_manifest); - self.may_do_checkpoint(version).await?; + + self.checkpointer + .maybe_do_checkpoint(self.manifest.as_ref()); Ok(version) } @@ -364,100 +370,6 @@ impl RegionManifestManager { self.last_version } - async fn may_do_checkpoint(&mut self, version: ManifestVersion) -> Result<()> { - if version - self.last_checkpoint_version >= self.options.checkpoint_distance - && self.options.checkpoint_distance != 0 - { - debug!( - "Going to do checkpoint for version [{} ~ {}]", - self.last_checkpoint_version, version - ); - if let Some(checkpoint) = self.do_checkpoint().await? { - self.last_checkpoint_version = checkpoint.last_version(); - } - } - - Ok(()) - } - - /// Makes a new checkpoint. Return the fresh one if there are some actions to compact. - async fn do_checkpoint(&mut self) -> Result> { - let _t = MANIFEST_OP_ELAPSED - .with_label_values(&["checkpoint"]) - .start_timer(); - - let last_checkpoint = Self::last_checkpoint(&mut self.store).await?; - let current_version = self.last_version; - - let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint { - ( - checkpoint.last_version + 1, - RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint), - ) - } else { - (MIN_VERSION, RegionManifestBuilder::default()) - }; - let end_version = current_version; - - if start_version >= end_version { - return Ok(None); - } - - let manifests = self.store.scan(start_version, end_version).await?; - let mut last_version = start_version; - let mut compacted_actions = 0; - - let manifests = self.store.fetch_manifests(&manifests).await?; - - for (version, raw_action_list) in manifests { - let action_list = RegionMetaActionList::decode(&raw_action_list)?; - for action in action_list.actions { - match action { - RegionMetaAction::Change(action) => { - manifest_builder.apply_change(version, action); - } - RegionMetaAction::Edit(action) => { - manifest_builder.apply_edit(version, action); - } - RegionMetaAction::Remove(_) => { - debug!( - "Unhandled action for region {}, action: {:?}", - self.manifest.metadata.region_id, action - ); - } - RegionMetaAction::Truncate(action) => { - manifest_builder.apply_truncate(version, action); - } - } - } - last_version = version; - compacted_actions += 1; - } - - if compacted_actions == 0 { - return Ok(None); - } - - let region_manifest = manifest_builder.try_build()?; - let checkpoint = RegionCheckpoint { - last_version, - compacted_actions, - checkpoint: Some(region_manifest), - }; - - self.store - .save_checkpoint(last_version, &checkpoint.encode()?) - .await?; - // TODO(ruihang): this task can be detached - self.store.delete_until(last_version, true).await?; - - info!( - "Done manifest checkpoint for region {}, version: [{}, {}], current latest version: {}, compacted {} actions.", - self.manifest.metadata.region_id, start_version, end_version, last_version, compacted_actions - ); - Ok(Some(checkpoint)) - } - /// Fetches the last [RegionCheckpoint] from storage. pub(crate) async fn last_checkpoint( store: &mut ManifestObjectStore, @@ -471,6 +383,11 @@ impl RegionManifestManager { Ok(None) } } + + #[cfg(test)] + pub(crate) fn checkpointer(&self) -> &Checkpointer { + &self.checkpointer + } } #[cfg(test)] @@ -489,6 +406,7 @@ impl RegionManifestManager { #[cfg(test)] mod test { + use std::time::Duration; use api::v1::SemanticType; use common_datasource::compression::CompressionType; @@ -654,6 +572,10 @@ mod test { .unwrap(); } + while manager.checkpointer.is_doing_checkpoint() { + tokio::time::sleep(Duration::from_millis(10)).await; + } + // check manifest size again let manifest_size = manager.manifest_usage(); assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); @@ -670,6 +592,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_usage(); - assert_eq!(manifest_size, 1312); + assert_eq!(manifest_size, 1173); } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 88450a21bd..d470f2050b 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::iter::Iterator; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use common_datasource::compression::CompressionType; use common_telemetry::debug; @@ -134,7 +134,7 @@ pub struct ManifestObjectStore { compress_type: CompressionType, path: String, /// Stores the size of each manifest file. - manifest_size_map: HashMap, + manifest_size_map: Arc>>, total_manifest_size: Arc, } @@ -149,7 +149,7 @@ impl ManifestObjectStore { object_store, compress_type, path: util::normalize_dir(path), - manifest_size_map: HashMap::new(), + manifest_size_map: Arc::new(RwLock::new(HashMap::new())), total_manifest_size, } } @@ -239,8 +239,11 @@ impl ManifestObjectStore { /// Fetch all manifests in concurrent. pub async fn fetch_manifests( &self, - manifests: &[(u64, Entry)], + start_version: ManifestVersion, + end_version: ManifestVersion, ) -> Result)>> { + let manifests = self.scan(start_version, end_version).await?; + // TODO(weny): Make it configurable. let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM); @@ -272,7 +275,7 @@ impl ManifestObjectStore { /// ### Return /// The number of deleted files. pub async fn delete_until( - &mut self, + &self, end: ManifestVersion, keep_last_checkpoint: bool, ) -> Result { @@ -378,7 +381,11 @@ impl ManifestObjectStore { } /// Save the checkpoint manifest file. - pub async fn save_checkpoint(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + pub(crate) async fn save_checkpoint( + &self, + version: ManifestVersion, + bytes: &[u8], + ) -> Result<()> { let path = self.checkpoint_file_path(version); let data = self .compress_type @@ -566,24 +573,28 @@ impl ManifestObjectStore { /// Compute the size(Byte) in manifest size map. pub(crate) fn total_manifest_size(&self) -> u64 { - self.manifest_size_map.values().sum() + self.manifest_size_map.read().unwrap().values().sum() } /// Set the size of the delta file by delta version. pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) { - self.manifest_size_map.insert(FileKey::Delta(version), size); + let mut m = self.manifest_size_map.write().unwrap(); + m.insert(FileKey::Delta(version), size); + self.inc_total_manifest_size(size); } /// Set the size of the checkpoint file by checkpoint version. - pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) { - self.manifest_size_map - .insert(FileKey::Checkpoint(version), size); + fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) { + let mut m = self.manifest_size_map.write().unwrap(); + m.insert(FileKey::Checkpoint(version), size); + self.inc_total_manifest_size(size); } - fn unset_file_size(&mut self, key: &FileKey) { - if let Some(val) = self.manifest_size_map.remove(key) { + fn unset_file_size(&self, key: &FileKey) { + let mut m = self.manifest_size_map.write().unwrap(); + if let Some(val) = m.remove(key) { self.dec_total_manifest_size(val); } } @@ -682,8 +693,7 @@ mod tests { .unwrap(); } - let manifests = log_store.scan(1, 4).await.unwrap(); - let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let manifests = log_store.fetch_manifests(1, 4).await.unwrap(); let mut it = manifests.into_iter(); for v in 1..4 { let (version, bytes) = it.next().unwrap(); @@ -692,8 +702,7 @@ mod tests { } assert!(it.next().is_none()); - let manifests = log_store.scan(0, 11).await.unwrap(); - let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let manifests = log_store.fetch_manifests(0, 11).await.unwrap(); let mut it = manifests.into_iter(); for v in 0..5 { let (version, bytes) = it.next().unwrap(); @@ -721,8 +730,7 @@ mod tests { .unwrap() .unwrap(); let _ = log_store.load_last_checkpoint().await.unwrap().unwrap(); - let manifests = log_store.scan(0, 11).await.unwrap(); - let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let manifests = log_store.fetch_manifests(0, 11).await.unwrap(); let mut it = manifests.into_iter(); let (version, bytes) = it.next().unwrap(); @@ -738,8 +746,7 @@ mod tests { .unwrap() .is_none()); assert!(log_store.load_last_checkpoint().await.unwrap().is_none()); - let manifests = log_store.scan(0, 11).await.unwrap(); - let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let manifests = log_store.fetch_manifests(0, 11).await.unwrap(); let mut it = manifests.into_iter(); assert!(it.next().is_none()); @@ -784,8 +791,7 @@ mod tests { .unwrap(); // test data reading - let manifests = log_store.scan(0, 10).await.unwrap(); - let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let manifests = log_store.fetch_manifests(0, 10).await.unwrap(); let mut it = manifests.into_iter(); for v in 0..10 { @@ -807,8 +813,7 @@ mod tests { // Delete util 10, contain uncompressed/compressed data // log 0, 1, 2, 7, 8, 9 will be delete assert_eq!(11, log_store.delete_until(10, false).await.unwrap()); - let manifests = log_store.scan(0, 10).await.unwrap(); - let manifests = log_store.fetch_manifests(&manifests).await.unwrap(); + let manifests = log_store.fetch_manifests(0, 10).await.unwrap(); let mut it = manifests.into_iter(); assert!(it.next().is_none()); } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index c79cfab43a..f4de59e46c 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -14,6 +14,7 @@ use std::assert_matches::assert_matches; use std::sync::Arc; +use std::time::Duration; use common_datasource::compression::CompressionType; use store_api::storage::RegionId; @@ -113,6 +114,10 @@ async fn manager_with_checkpoint_distance_1() { // apply 10 actions for _ in 0..10 { manager.update(nop_action()).await.unwrap(); + + while manager.checkpointer().is_doing_checkpoint() { + tokio::time::sleep(Duration::from_millis(10)).await; + } } // has checkpoint @@ -126,9 +131,8 @@ async fn manager_with_checkpoint_distance_1() { // check files let mut expected = vec![ "00000000000000000009.checkpoint", + "00000000000000000010.checkpoint", "00000000000000000010.json", - "00000000000000000008.checkpoint", - "00000000000000000009.json", "_last_checkpoint", ]; expected.sort_unstable(); @@ -148,7 +152,7 @@ async fn manager_with_checkpoint_distance_1() { .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); let expected_json = - "{\"size\":846,\"version\":9,\"checksum\":1218259706,\"extend_metadata\":{}}"; + "{\"size\":848,\"version\":10,\"checksum\":4186457347,\"extend_metadata\":{}}"; assert_eq!(expected_json, raw_json); // reopen the manager @@ -241,6 +245,10 @@ async fn generate_checkpoint_with_compression_types( for action in actions { manager.update(action).await.unwrap(); + + while manager.checkpointer().is_doing_checkpoint() { + tokio::time::sleep(Duration::from_millis(10)).await; + } } RegionManifestManager::last_checkpoint(&mut manager.store())