refactor: make region manifest checkpoint ran in background (#4133)

* refactor: use in-database manifest as checkpoint instead of merging incremental files in object store

* refactor: make region manifest checkpoint ran in background

* reduce unnecessary metrics

* Update src/mito2/src/manifest/checkpointer.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* resolve PR comments

* resolve PR comments

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
LFC
2024-06-17 11:47:18 +08:00
committed by GitHub
parent 5390603855
commit f4a5a44549
7 changed files with 248 additions and 133 deletions

1
Cargo.lock generated
View File

@@ -5901,6 +5901,7 @@ dependencies = [
"rskafka",
"rstest",
"rstest_reuse",
"scopeguard",
"serde",
"serde_json",
"serde_with",

View File

@@ -58,6 +58,7 @@ regex = "1.5"
rskafka = { workspace = true, optional = true }
rstest = { workspace = true, optional = true }
rstest_reuse = { workspace = true, optional = true }
scopeguard = "1.2"
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true

View File

@@ -15,6 +15,7 @@
//! manifest storage
pub mod action;
mod checkpointer;
pub mod manager;
pub mod storage;
#[cfg(test)]

View File

@@ -0,0 +1,177 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use common_telemetry::{error, info};
use store_api::manifest::{ManifestVersion, MIN_VERSION};
use store_api::storage::RegionId;
use crate::manifest::action::{RegionCheckpoint, RegionManifest};
use crate::manifest::manager::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
use crate::metrics::MANIFEST_OP_ELAPSED;
/// [`Checkpointer`] is responsible for doing checkpoint for a region, in an asynchronous way.
#[derive(Debug)]
pub(crate) struct Checkpointer {
manifest_options: RegionManifestOptions,
inner: Arc<Inner>,
}
#[derive(Debug)]
struct Inner {
region_id: RegionId,
manifest_store: ManifestObjectStore,
last_checkpoint_version: AtomicU64,
is_doing_checkpoint: AtomicBool,
}
impl Inner {
async fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
let _guard = scopeguard::guard(&self.is_doing_checkpoint, |x| {
x.store(false, Ordering::Relaxed);
});
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["checkpoint"])
.start_timer();
let region_id = self.region_id();
let version = checkpoint.last_version();
let checkpoint = match checkpoint.encode() {
Ok(checkpoint) => checkpoint,
Err(e) => {
error!(e; "Failed to encode checkpoint {:?}", checkpoint);
return;
}
};
if let Err(e) = self
.manifest_store
.save_checkpoint(version, &checkpoint)
.await
{
error!(e; "Failed to save checkpoint for region {}", region_id);
return;
}
if let Err(e) = self.manifest_store.delete_until(version, true).await {
error!(e; "Failed to delete manifest actions until version {} for region {}", version, region_id);
return;
}
self.last_checkpoint_version
.store(version, Ordering::Relaxed);
info!(
"Checkpoint for region {} success, version: {}",
region_id, version
);
}
fn region_id(&self) -> RegionId {
self.region_id
}
fn is_doing_checkpoint(&self) -> bool {
self.is_doing_checkpoint.load(Ordering::Relaxed)
}
fn set_doing_checkpoint(&self) {
self.is_doing_checkpoint.store(true, Ordering::Relaxed);
}
}
impl Checkpointer {
pub(crate) fn new(
region_id: RegionId,
manifest_options: RegionManifestOptions,
manifest_store: ManifestObjectStore,
last_checkpoint_version: ManifestVersion,
) -> Self {
Self {
manifest_options,
inner: Arc::new(Inner {
region_id,
manifest_store,
last_checkpoint_version: AtomicU64::new(last_checkpoint_version),
is_doing_checkpoint: AtomicBool::new(false),
}),
}
}
pub(crate) fn last_checkpoint_version(&self) -> ManifestVersion {
self.inner.last_checkpoint_version.load(Ordering::Relaxed)
}
/// Check if it's needed to do checkpoint for the region by the checkpoint distance.
/// If needed, and there's no currently running checkpoint task, it will start a new checkpoint
/// task running in the background.
pub(crate) fn maybe_do_checkpoint(&self, manifest: &RegionManifest) {
if self.manifest_options.checkpoint_distance == 0 {
return;
}
let last_checkpoint_version = self.last_checkpoint_version();
if manifest.manifest_version - last_checkpoint_version
< self.manifest_options.checkpoint_distance
{
return;
}
// We can simply check whether there's a running checkpoint task like this, all because of
// the caller of this function is ran single threaded, inside the lock of RegionManifestManager.
if self.inner.is_doing_checkpoint() {
return;
}
let start_version = if last_checkpoint_version == 0 {
// Checkpoint version can't be zero by implementation.
// So last checkpoint version is zero means no last checkpoint.
MIN_VERSION
} else {
last_checkpoint_version + 1
};
let end_version = manifest.manifest_version;
info!(
"Start doing checkpoint for region {}, compacted version: [{}, {}]",
self.inner.region_id(),
start_version,
end_version,
);
let checkpoint = RegionCheckpoint {
last_version: end_version,
compacted_actions: (end_version - start_version + 1) as usize,
checkpoint: Some(manifest.clone()),
};
self.do_checkpoint(checkpoint);
}
fn do_checkpoint(&self, checkpoint: RegionCheckpoint) {
self.inner.set_doing_checkpoint();
let inner = self.inner.clone();
common_runtime::spawn_bg(async move {
inner.do_checkpoint(checkpoint).await;
});
}
#[cfg(test)]
pub(crate) fn is_doing_checkpoint(&self) -> bool {
self.inner.is_doing_checkpoint()
}
}

View File

@@ -28,6 +28,7 @@ use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionList,
};
use crate::manifest::checkpointer::Checkpointer;
use crate::manifest::storage::{file_version, is_delta_file, ManifestObjectStore};
use crate::metrics::MANIFEST_OP_ELAPSED;
@@ -112,10 +113,8 @@ pub struct RegionManifestOptions {
#[derive(Debug)]
pub struct RegionManifestManager {
store: ManifestObjectStore,
options: RegionManifestOptions,
last_version: ManifestVersion,
/// The last version included in checkpoint file.
last_checkpoint_version: ManifestVersion,
checkpointer: Checkpointer,
manifest: Arc<RegionManifest>,
stopped: bool,
}
@@ -150,6 +149,7 @@ impl RegionManifestManager {
},
);
let manifest = manifest_builder.try_build()?;
let region_id = metadata.region_id;
debug!(
"Build region manifest in {}, manifest: {:?}",
@@ -161,11 +161,11 @@ impl RegionManifestManager {
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
store.save(version, &action_list.encode()?).await?;
let checkpointer = Checkpointer::new(region_id, options, store.clone(), MIN_VERSION);
Ok(Self {
store,
options,
last_version: version,
last_checkpoint_version: MIN_VERSION,
checkpointer,
manifest: Arc::new(manifest),
stopped: false,
})
@@ -215,8 +215,7 @@ impl RegionManifestManager {
};
// apply actions from storage
let manifests = store.scan(version, MAX_VERSION).await?;
let manifests = store.fetch_manifests(&manifests).await?;
let manifests = store.fetch_manifests(version, MAX_VERSION).await?;
for (manifest_version, raw_action_list) in manifests {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
@@ -256,11 +255,16 @@ impl RegionManifestManager {
);
let version = manifest.manifest_version;
let checkpointer = Checkpointer::new(
manifest.metadata.region_id,
options,
store.clone(),
last_checkpoint_version,
);
Ok(Some(Self {
store,
options,
last_version: version,
last_checkpoint_version,
checkpointer,
manifest: Arc::new(manifest),
stopped: false,
}))
@@ -310,7 +314,9 @@ impl RegionManifestManager {
}
let new_manifest = manifest_builder.try_build()?;
self.manifest = Arc::new(new_manifest);
self.may_do_checkpoint(version).await?;
self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref());
Ok(version)
}
@@ -364,100 +370,6 @@ impl RegionManifestManager {
self.last_version
}
async fn may_do_checkpoint(&mut self, version: ManifestVersion) -> Result<()> {
if version - self.last_checkpoint_version >= self.options.checkpoint_distance
&& self.options.checkpoint_distance != 0
{
debug!(
"Going to do checkpoint for version [{} ~ {}]",
self.last_checkpoint_version, version
);
if let Some(checkpoint) = self.do_checkpoint().await? {
self.last_checkpoint_version = checkpoint.last_version();
}
}
Ok(())
}
/// Makes a new checkpoint. Return the fresh one if there are some actions to compact.
async fn do_checkpoint(&mut self) -> Result<Option<RegionCheckpoint>> {
let _t = MANIFEST_OP_ELAPSED
.with_label_values(&["checkpoint"])
.start_timer();
let last_checkpoint = Self::last_checkpoint(&mut self.store).await?;
let current_version = self.last_version;
let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint {
(
checkpoint.last_version + 1,
RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint),
)
} else {
(MIN_VERSION, RegionManifestBuilder::default())
};
let end_version = current_version;
if start_version >= end_version {
return Ok(None);
}
let manifests = self.store.scan(start_version, end_version).await?;
let mut last_version = start_version;
let mut compacted_actions = 0;
let manifests = self.store.fetch_manifests(&manifests).await?;
for (version, raw_action_list) in manifests {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
manifest_builder.apply_change(version, action);
}
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(version, action);
}
RegionMetaAction::Remove(_) => {
debug!(
"Unhandled action for region {}, action: {:?}",
self.manifest.metadata.region_id, action
);
}
RegionMetaAction::Truncate(action) => {
manifest_builder.apply_truncate(version, action);
}
}
}
last_version = version;
compacted_actions += 1;
}
if compacted_actions == 0 {
return Ok(None);
}
let region_manifest = manifest_builder.try_build()?;
let checkpoint = RegionCheckpoint {
last_version,
compacted_actions,
checkpoint: Some(region_manifest),
};
self.store
.save_checkpoint(last_version, &checkpoint.encode()?)
.await?;
// TODO(ruihang): this task can be detached
self.store.delete_until(last_version, true).await?;
info!(
"Done manifest checkpoint for region {}, version: [{}, {}], current latest version: {}, compacted {} actions.",
self.manifest.metadata.region_id, start_version, end_version, last_version, compacted_actions
);
Ok(Some(checkpoint))
}
/// Fetches the last [RegionCheckpoint] from storage.
pub(crate) async fn last_checkpoint(
store: &mut ManifestObjectStore,
@@ -471,6 +383,11 @@ impl RegionManifestManager {
Ok(None)
}
}
#[cfg(test)]
pub(crate) fn checkpointer(&self) -> &Checkpointer {
&self.checkpointer
}
}
#[cfg(test)]
@@ -489,6 +406,7 @@ impl RegionManifestManager {
#[cfg(test)]
mod test {
use std::time::Duration;
use api::v1::SemanticType;
use common_datasource::compression::CompressionType;
@@ -654,6 +572,10 @@ mod test {
.unwrap();
}
while manager.checkpointer.is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
}
// check manifest size again
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await);
@@ -670,6 +592,6 @@ mod test {
// get manifest size again
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, 1312);
assert_eq!(manifest_size, 1173);
}
}

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use common_datasource::compression::CompressionType;
use common_telemetry::debug;
@@ -134,7 +134,7 @@ pub struct ManifestObjectStore {
compress_type: CompressionType,
path: String,
/// Stores the size of each manifest file.
manifest_size_map: HashMap<FileKey, u64>,
manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
total_manifest_size: Arc<AtomicU64>,
}
@@ -149,7 +149,7 @@ impl ManifestObjectStore {
object_store,
compress_type,
path: util::normalize_dir(path),
manifest_size_map: HashMap::new(),
manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
total_manifest_size,
}
}
@@ -239,8 +239,11 @@ impl ManifestObjectStore {
/// Fetch all manifests in concurrent.
pub async fn fetch_manifests(
&self,
manifests: &[(u64, Entry)],
start_version: ManifestVersion,
end_version: ManifestVersion,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifests = self.scan(start_version, end_version).await?;
// TODO(weny): Make it configurable.
let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
@@ -272,7 +275,7 @@ impl ManifestObjectStore {
/// ### Return
/// The number of deleted files.
pub async fn delete_until(
&mut self,
&self,
end: ManifestVersion,
keep_last_checkpoint: bool,
) -> Result<usize> {
@@ -378,7 +381,11 @@ impl ManifestObjectStore {
}
/// Save the checkpoint manifest file.
pub async fn save_checkpoint(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
pub(crate) async fn save_checkpoint(
&self,
version: ManifestVersion,
bytes: &[u8],
) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
.compress_type
@@ -566,24 +573,28 @@ impl ManifestObjectStore {
/// Compute the size(Byte) in manifest size map.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.manifest_size_map.values().sum()
self.manifest_size_map.read().unwrap().values().sum()
}
/// Set the size of the delta file by delta version.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map.insert(FileKey::Delta(version), size);
let mut m = self.manifest_size_map.write().unwrap();
m.insert(FileKey::Delta(version), size);
self.inc_total_manifest_size(size);
}
/// Set the size of the checkpoint file by checkpoint version.
pub(crate) fn set_checkpoint_file_size(&mut self, version: ManifestVersion, size: u64) {
self.manifest_size_map
.insert(FileKey::Checkpoint(version), size);
fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
let mut m = self.manifest_size_map.write().unwrap();
m.insert(FileKey::Checkpoint(version), size);
self.inc_total_manifest_size(size);
}
fn unset_file_size(&mut self, key: &FileKey) {
if let Some(val) = self.manifest_size_map.remove(key) {
fn unset_file_size(&self, key: &FileKey) {
let mut m = self.manifest_size_map.write().unwrap();
if let Some(val) = m.remove(key) {
self.dec_total_manifest_size(val);
}
}
@@ -682,8 +693,7 @@ mod tests {
.unwrap();
}
let manifests = log_store.scan(1, 4).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let manifests = log_store.fetch_manifests(1, 4).await.unwrap();
let mut it = manifests.into_iter();
for v in 1..4 {
let (version, bytes) = it.next().unwrap();
@@ -692,8 +702,7 @@ mod tests {
}
assert!(it.next().is_none());
let manifests = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
let mut it = manifests.into_iter();
for v in 0..5 {
let (version, bytes) = it.next().unwrap();
@@ -721,8 +730,7 @@ mod tests {
.unwrap()
.unwrap();
let _ = log_store.load_last_checkpoint().await.unwrap().unwrap();
let manifests = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
let mut it = manifests.into_iter();
let (version, bytes) = it.next().unwrap();
@@ -738,8 +746,7 @@ mod tests {
.unwrap()
.is_none());
assert!(log_store.load_last_checkpoint().await.unwrap().is_none());
let manifests = log_store.scan(0, 11).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let manifests = log_store.fetch_manifests(0, 11).await.unwrap();
let mut it = manifests.into_iter();
assert!(it.next().is_none());
@@ -784,8 +791,7 @@ mod tests {
.unwrap();
// test data reading
let manifests = log_store.scan(0, 10).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
let mut it = manifests.into_iter();
for v in 0..10 {
@@ -807,8 +813,7 @@ mod tests {
// Delete util 10, contain uncompressed/compressed data
// log 0, 1, 2, 7, 8, 9 will be delete
assert_eq!(11, log_store.delete_until(10, false).await.unwrap());
let manifests = log_store.scan(0, 10).await.unwrap();
let manifests = log_store.fetch_manifests(&manifests).await.unwrap();
let manifests = log_store.fetch_manifests(0, 10).await.unwrap();
let mut it = manifests.into_iter();
assert!(it.next().is_none());
}

View File

@@ -14,6 +14,7 @@
use std::assert_matches::assert_matches;
use std::sync::Arc;
use std::time::Duration;
use common_datasource::compression::CompressionType;
use store_api::storage::RegionId;
@@ -113,6 +114,10 @@ async fn manager_with_checkpoint_distance_1() {
// apply 10 actions
for _ in 0..10 {
manager.update(nop_action()).await.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
// has checkpoint
@@ -126,9 +131,8 @@ async fn manager_with_checkpoint_distance_1() {
// check files
let mut expected = vec![
"00000000000000000009.checkpoint",
"00000000000000000010.checkpoint",
"00000000000000000010.json",
"00000000000000000008.checkpoint",
"00000000000000000009.json",
"_last_checkpoint",
];
expected.sort_unstable();
@@ -148,7 +152,7 @@ async fn manager_with_checkpoint_distance_1() {
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json =
"{\"size\":846,\"version\":9,\"checksum\":1218259706,\"extend_metadata\":{}}";
"{\"size\":848,\"version\":10,\"checksum\":4186457347,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);
// reopen the manager
@@ -241,6 +245,10 @@ async fn generate_checkpoint_with_compression_types(
for action in actions {
manager.update(action).await.unwrap();
while manager.checkpointer().is_doing_checkpoint() {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
RegionManifestManager::last_checkpoint(&mut manager.store())