feat(mito): checkpoint for mito2 (#2142)

* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adjust dir structure

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix styles

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sort result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* downgrade log level

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply CR sugg.

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add region id to log

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-08-13 17:26:01 +08:00
committed by GitHub
parent e6090a8d5b
commit 6d64e1c296
16 changed files with 462 additions and 228 deletions

58
Cargo.lock generated
View File

@@ -749,7 +749,7 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62"
dependencies = [
"heck",
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.28",
@@ -1456,7 +1456,7 @@ version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008"
dependencies = [
"heck",
"heck 0.4.1",
"proc-macro-error",
"proc-macro2",
"quote",
@@ -1469,7 +1469,7 @@ version = "4.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050"
dependencies = [
"heck",
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.28",
@@ -1668,6 +1668,7 @@ dependencies = [
"paste",
"regex",
"snafu",
"strum 0.21.0",
"tokio",
"tokio-util",
"url",
@@ -4245,6 +4246,15 @@ dependencies = [
"http",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -5522,6 +5532,7 @@ dependencies = [
"snafu",
"storage",
"store-api",
"strum 0.21.0",
"table",
"tokio",
"uuid",
@@ -5600,7 +5611,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56b0d8a0db9bf6d2213e11f2c701cb91387b0614361625ab7b9743b41aa4938f"
dependencies = [
"darling 0.20.3",
"heck",
"heck 0.4.1",
"num-bigint",
"proc-macro-crate 1.3.1",
"proc-macro-error",
@@ -7035,7 +7046,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
dependencies = [
"bytes",
"heck",
"heck 0.4.1",
"itertools 0.10.5",
"lazy_static",
"log",
@@ -9102,7 +9113,7 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf"
dependencies = [
"heck",
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 1.0.109",
@@ -9354,7 +9365,7 @@ checksum = "9966e64ae989e7e575b19d7265cb79d7fc3cbbdf179835cb0d716f294c2049c9"
dependencies = [
"dotenvy",
"either",
"heck",
"heck 0.4.1",
"once_cell",
"proc-macro2",
"quote",
@@ -9556,6 +9567,15 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strum"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2"
dependencies = [
"strum_macros 0.21.1",
]
[[package]]
name = "strum"
version = "0.24.1"
@@ -9574,13 +9594,25 @@ dependencies = [
"strum_macros 0.25.1",
]
[[package]]
name = "strum_macros"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec"
dependencies = [
"heck 0.3.3",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "strum_macros"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [
"heck",
"heck 0.4.1",
"proc-macro2",
"quote",
"rustversion",
@@ -9593,7 +9625,7 @@ version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232"
dependencies = [
"heck",
"heck 0.4.1",
"proc-macro2",
"quote",
"rustversion",
@@ -9643,7 +9675,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3ae64fb7ad0670c7d6d53d57b1b91beb2212afc30e164cc8edb02d6b2cff32a"
dependencies = [
"gix",
"heck",
"heck 0.4.1",
"prettyplease 0.2.12",
"prost",
"prost-build",
@@ -9665,7 +9697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ac1ce8315086b127ca0abf162c62279550942bb26ebf7946fe17fe114446472"
dependencies = [
"git2",
"heck",
"heck 0.4.1",
"prettyplease 0.2.12",
"prost",
"prost-build",
@@ -10704,7 +10736,7 @@ version = "0.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95d27d749378ceab6ec22188ed7ad102205c89ddb92ab662371c850ffc71aa1a"
dependencies = [
"heck",
"heck 0.4.1",
"log",
"proc-macro2",
"quote",
@@ -10722,7 +10754,7 @@ version = "0.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c8d9ecedde2fd77e975c38eeb9ca40b34ad0247b2259c6e6bbd2a8d6cc2444f"
dependencies = [
"heck",
"heck 0.4.1",
"log",
"proc-macro2",
"quote",

View File

@@ -27,6 +27,7 @@ orc-rust = "0.2"
paste = "1.0"
regex = "1.7"
snafu.workspace = true
strum = { version = "0.21", features = ["derive"] }
tokio-util.workspace = true
tokio.workspace = true
url = "2.3"

View File

@@ -20,11 +20,12 @@ use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdD
use async_compression::tokio::write;
use bytes::Bytes;
use futures::Stream;
use strum::EnumIter;
use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
use tokio_util::io::{ReaderStream, StreamReader};
use crate::error::{self, Error, Result};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)]
pub enum CompressionType {
/// Gzip-ed file
Gzip,

View File

@@ -46,6 +46,7 @@ serde_json = "1.0"
snafu.workspace = true
storage = { workspace = true }
store-api = { workspace = true }
strum = "0.21"
table = { workspace = true }
tokio.workspace = true
uuid.workspace = true

View File

@@ -37,7 +37,7 @@ pub struct MitoConfig {
// Manifest configs:
/// Number of meta action updated to trigger a new checkpoint
/// for the manifest (default 10).
pub manifest_checkpoint_interval: u64,
pub manifest_checkpoint_distance: u64,
/// Manifest compression type (default uncompressed).
pub manifest_compress_type: CompressionType,
}
@@ -48,7 +48,7 @@ impl Default for MitoConfig {
num_workers: DEFAULT_NUM_WORKERS,
worker_channel_size: 128,
worker_request_batch_size: 64,
manifest_checkpoint_interval: 10,
manifest_checkpoint_distance: 10,
manifest_compress_type: CompressionType::Uncompressed,
}
}

View File

@@ -15,9 +15,8 @@
//! manifest storage
pub mod action;
pub mod gc_task;
pub mod helper;
#[allow(unused_variables)]
pub mod manager;
pub mod options;
pub mod storage;
#[cfg(test)]
mod tests;

View File

@@ -12,18 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use storage::metadata::VersionNumber;
use storage::sst::{FileId, FileMeta};
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::ManifestVersion;
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
use crate::metadata::RegionMetadataRef;
use crate::sst::file::{FileId, FileMeta};
/// Actions that can be applied to region manifest.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -46,7 +47,6 @@ pub struct RegionChange {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
pub region_version: VersionNumber,
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
@@ -123,8 +123,6 @@ impl RegionManifestBuilder {
// The checkpoint of region manifest, generated by checkpointer.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct RegionCheckpoint {
/// The snasphot protocol
pub protocol: ProtocolAction,
/// The last manifest version that this checkpoint compacts(inclusive).
pub last_version: ManifestVersion,
// The number of manifest actions that this checkpoint compacts.
@@ -134,21 +132,20 @@ pub struct RegionCheckpoint {
}
impl RegionCheckpoint {
pub fn set_protocol(&mut self, action: ProtocolAction) {
self.protocol = action;
}
pub fn last_version(&self) -> ManifestVersion {
self.last_version
}
pub fn encode(&self) -> Result<Vec<u8>> {
todo!()
let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
Ok(json.into_bytes())
}
pub fn decode(bs: &[u8]) -> Result<Self> {
// helper::decode_checkpoint(bs, reader_version)
todo!()
pub fn decode(bytes: &[u8]) -> Result<Self> {
let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
serde_json::from_str(data).context(SerdeJsonSnafu)
}
}
@@ -207,7 +204,6 @@ impl RegionMetaActionIter {
#[cfg(test)]
mod tests {
use storage::sst::FileId;
use super::*;
@@ -238,7 +234,7 @@ mod tests {
FileMeta {
region_id: 0.into(),
file_id: FileId::random(),
time_range: None,
time_range: (0.into(), 10000.into()),
level: 0,
file_size: 1024,
}

View File

@@ -1,36 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::Serialize;
use store_api::manifest::action::ProtocolVersion;
use store_api::manifest::ManifestVersion;
use crate::error::Result;
use crate::manifest::action::RegionCheckpoint;
pub const NEWLINE: &[u8] = b"\n";
pub fn encode_actions<T: Serialize>(
prev_version: ManifestVersion,
actions: &[T],
) -> Result<Vec<u8>> {
todo!()
}
pub fn encode_checkpoint(snasphot: &RegionCheckpoint) -> Result<Vec<u8>> {
todo!()
}
pub fn decode_checkpoint(bs: &[u8], reader_version: ProtocolVersion) -> Result<RegionCheckpoint> {
todo!()
}

View File

@@ -14,20 +14,32 @@
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_telemetry::{debug, info};
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use object_store::ObjectStore;
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
use tokio::sync::RwLock;
use crate::error::Result;
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionIter, RegionMetaActionList,
RegionMetaActionList,
};
use crate::manifest::options::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
use crate::metadata::RegionMetadataRef;
/// Options for [RegionManifestManager].
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
/// Directory to store manifest.
pub manifest_dir: String,
/// Object store handle; presumably the backend the manifest files live in
/// (NOTE(review): confirm against `ManifestObjectStore`).
pub object_store: ObjectStore,
/// Compression type for the manifest; presumably applied to the delta and
/// checkpoint files when they are written (NOTE(review): confirm).
pub compress_type: CompressionType,
/// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
/// Set to 0 to disable checkpoint.
pub checkpoint_distance: u64,
}
// rewrite note:
// trait Checkpoint -> struct RegionCheckpoint
// trait MetaAction -> struct RegionMetaActionList
@@ -136,6 +148,12 @@ impl RegionManifestManager {
let inner = self.inner.read().await;
inner.manifest.clone()
}
/// Test-only accessor: returns a clone of the inner [ManifestObjectStore].
/// The read lock on the inner state is only held while cloning.
#[cfg(test)]
pub async fn store(&self) -> ManifestObjectStore {
let inner = self.inner.read().await;
inner.store.clone()
}
}
#[cfg(test)]
@@ -155,10 +173,12 @@ impl RegionManifestManager {
}
#[derive(Debug)]
struct RegionManifestManagerInner {
pub(crate) struct RegionManifestManagerInner {
store: ManifestObjectStore,
options: RegionManifestOptions,
last_version: ManifestVersion,
/// The last version included in checkpoint file.
last_checkpoint_version: ManifestVersion,
manifest: Arc<RegionManifest>,
}
@@ -198,12 +218,11 @@ impl RegionManifestManagerInner {
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
store.save(version, &action_list.encode()?).await?;
// todo: start gc task
Ok(Self {
store,
options,
last_version: version,
last_checkpoint_version: MIN_VERSION,
manifest: Arc::new(manifest),
})
}
@@ -222,10 +241,11 @@ impl RegionManifestManagerInner {
// recover from storage
// construct manifest builder
let mut version = MIN_VERSION;
let last_checkpoint = store.load_last_checkpoint().await?;
let checkpoint = last_checkpoint
.map(|(_, raw_checkpoint)| RegionCheckpoint::decode(&raw_checkpoint))
.transpose()?;
let checkpoint = Self::last_checkpoint(&store).await?;
let last_checkpoint_version = checkpoint
.as_ref()
.map(|checkpoint| checkpoint.last_version)
.unwrap_or(MIN_VERSION);
let mut manifest_builder = if let Some(checkpoint) = checkpoint {
info!(
"Recover region manifest {} from checkpoint version {}",
@@ -276,18 +296,16 @@ impl RegionManifestManagerInner {
);
let version = manifest.manifest_version;
// todo: start gc task
Ok(Some(Self {
store,
options,
last_version: version,
last_checkpoint_version,
manifest: Arc::new(manifest),
}))
}
async fn stop(&mut self) -> Result<()> {
// todo: stop gc task
Ok(())
}
@@ -315,6 +333,7 @@ impl RegionManifestManagerInner {
}
let new_manifest = manifest_builder.try_build()?;
self.manifest = Arc::new(new_manifest);
self.may_do_checkpoint(version).await?;
Ok(version)
}
@@ -327,68 +346,102 @@ impl RegionManifestManagerInner {
self.last_version
}
// pub (crate) fn checkpointer(&self) -> Checkpointer {
// todo!()
// }
pub(crate) async fn may_do_checkpoint(&mut self, version: ManifestVersion) -> Result<()> {
if version - self.last_checkpoint_version >= self.options.checkpoint_distance
&& self.options.checkpoint_distance != 0
{
debug!(
"Going to do checkpoint for version [{} ~ {}]",
self.last_checkpoint_version, version
);
if let Some(checkpoint) = self.do_checkpoint().await? {
self.last_checkpoint_version = checkpoint.last_version();
}
}
pub(crate) fn set_last_checkpoint_version(&self, _version: ManifestVersion) {
todo!()
}
/// Update inner state.
pub fn update_state(&self, _version: ManifestVersion, _protocol: Option<ProtocolAction>) {
todo!()
}
pub(crate) async fn save_checkpoint(&self, checkpoint: &RegionCheckpoint) -> Result<()> {
todo!()
}
pub(crate) async fn may_do_checkpoint(&self, version: ManifestVersion) -> Result<()> {
todo!()
}
// pub(crate) fn manifest_store(&self) -> &Arc<ManifestObjectStore> {
// todo!()
// }
// from Manifest
async fn scan(
&self,
start: ManifestVersion,
end: ManifestVersion,
) -> Result<RegionMetaActionIter> {
todo!()
Ok(())
}
/// Make a new checkpoint. Return the fresh one if there are some actions to compact.
async fn do_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
todo!()
let last_checkpoint = Self::last_checkpoint(&self.store).await?;
let current_version = self.last_version;
let (start_version, mut manifest_builder) = if let Some(checkpoint) = last_checkpoint {
(
checkpoint.last_version + 1,
RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint),
)
} else {
(MIN_VERSION, RegionManifestBuilder::default())
};
let end_version = current_version;
if start_version >= end_version {
return Ok(None);
}
let mut iter = self.store.scan(start_version, end_version).await?;
let mut last_version = start_version;
let mut compacted_actions = 0;
while let Some((version, raw_action_list)) = iter.next_log().await? {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
manifest_builder.apply_change(version, action);
}
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(version, action);
}
RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => {
debug!(
"Unhandled action for region {}, action: {:?}",
self.manifest.metadata.region_id, action
);
}
}
}
last_version = version;
compacted_actions += 1;
}
if compacted_actions == 0 {
return Ok(None);
}
let region_manifest = manifest_builder.try_build()?;
let checkpoint = RegionCheckpoint {
last_version,
compacted_actions,
checkpoint: Some(region_manifest),
};
self.store
.save_checkpoint(last_version, &checkpoint.encode()?)
.await?;
// TODO(ruihang): this task can be detached
self.store.delete_until(last_version, true).await?;
info!(
"Done manifest checkpoint for region {}, version: [{}, {}], current latest version: {}, compacted {} actions.",
self.manifest.metadata.region_id, start_version, end_version, last_version, compacted_actions
);
Ok(Some(checkpoint))
}
async fn last_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
todo!()
}
/// Fetch the last [RegionCheckpoint] from storage.
pub(crate) async fn last_checkpoint(
store: &ManifestObjectStore,
) -> Result<Option<RegionCheckpoint>> {
let last_checkpoint = store.load_last_checkpoint().await?;
// from Checkpoint
/// Set a protocol action into checkpoint
pub fn set_protocol(&mut self, _action: ProtocolAction) {
todo!()
}
/// The last compacted action's version of checkpoint
pub fn last_version(&self) -> ManifestVersion {
todo!()
}
/// Encode this checkpoint into a byte vector
pub fn encode(&self) -> Result<Vec<u8>> {
todo!()
}
pub fn decode(_bytes: &[u8], _reader_version: ProtocolVersion) -> Result<Self> {
todo!()
if let Some((version, bytes)) = last_checkpoint {
let checkpoint = RegionCheckpoint::decode(&bytes)?;
Ok(Some(checkpoint))
} else {
Ok(None)
}
}
}
@@ -397,43 +450,13 @@ mod test {
use common_datasource::compression::CompressionType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::storage::RegionId;
use super::*;
use crate::manifest::action::RegionChange;
use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType};
use crate::manifest::tests::utils::basic_region_metadata;
use crate::metadata::{ColumnMetadata, RegionMetadataBuilder, SemanticType};
use crate::test_util::TestEnv;
fn basic_region_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 45,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 36,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"val",
ConcreteDataType::float64_datatype(),
false,
),
semantic_type: SemanticType::Field,
column_id: 251,
})
.primary_key(vec![36]);
builder.build().unwrap()
}
#[tokio::test]
async fn create_manifest_manager() {
let metadata = Arc::new(basic_region_metadata());

View File

@@ -1,29 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Options for [RegionManifestManager](crate::manifest::manager::RegionManifestManager).
use common_datasource::compression::CompressionType;
use object_store::ObjectStore;
/// Options for manifest.
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
/// Directory to store manifest.
pub manifest_dir: String,
pub object_store: ObjectStore,
pub compress_type: CompressionType,
/// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
pub checkpoint_interval: u64,
}

View File

@@ -157,13 +157,13 @@ impl ManifestObjectStore {
/// Returns the path of the last-checkpoint file. That file is never compressed,
/// so its name does not depend on the compression algorithm used by `ManifestObjectStore`.
fn last_checkpoint_path(&self) -> String {
pub(crate) fn last_checkpoint_path(&self) -> String {
format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
}
/// Return all `R`s in the root directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`),
/// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`)
async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
where
F: Fn(Entry) -> Option<R>,
{
@@ -211,7 +211,7 @@ impl ManifestObjectStore {
})
}
async fn delete_until(
pub async fn delete_until(
&self,
end: ManifestVersion,
keep_last_checkpoint: bool,
@@ -370,7 +370,7 @@ impl ManifestObjectStore {
Ok(())
}
async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
pub async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
.compress_type
@@ -409,7 +409,7 @@ impl ManifestObjectStore {
Ok(())
}
async fn load_checkpoint(
pub async fn load_checkpoint(
&self,
version: ManifestVersion,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
@@ -510,6 +510,11 @@ impl ManifestObjectStore {
self.load_checkpoint(checkpoint_metadata.version).await
}
/// Test-only helper: reads the raw bytes stored at `path` directly from the
/// underlying object store, mapping any failure into an `OpenDal` error.
#[cfg(test)]
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
self.object_store.read(path).await.context(OpenDalSnafu)
}
}
#[derive(Serialize, Deserialize, Debug)]

View File

@@ -12,19 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_runtime::TaskFunction;
struct ManifestGcTask {}
#[async_trait::async_trait]
impl TaskFunction<()> for ManifestGcTask {
/// Invoke the task.
async fn call(&mut self) -> std::result::Result<(), ()> {
todo!()
}
/// Name of the task.
fn name(&self) -> &str {
todo!()
}
}
mod checkpoint;
pub mod utils;

View File

@@ -0,0 +1,207 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use store_api::storage::RegionId;
use strum::IntoEnumIterator;
use crate::manifest::action::{
RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::{RegionManifestManager, RegionManifestManagerInner};
use crate::manifest::tests::utils::basic_region_metadata;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::TestEnv;
/// Creates a fresh [TestEnv] together with a manifest manager initialised
/// from [basic_region_metadata].
///
/// The environment is returned alongside the manager so the caller can keep
/// it alive and later reopen the manifest through it.
///
/// Panics if the manager cannot be created.
async fn build_manager(
    checkpoint_distance: u64,
    compress_type: CompressionType,
) -> (TestEnv, RegionManifestManager) {
    let initial_metadata = Arc::new(basic_region_metadata());
    let env = TestEnv::new();
    let created = env
        .create_manifest_manager(compress_type, checkpoint_distance, Some(initial_metadata))
        .await;
    let manager = created.unwrap().unwrap();
    (env, manager)
}
/// Opens a manifest manager on an existing [TestEnv] without passing initial
/// metadata, i.e. the manager is expected to recover from what is already
/// stored.
///
/// Panics if opening fails or yields no manager.
async fn reopen_manager(
    env: &TestEnv,
    checkpoint_distance: u64,
    compress_type: CompressionType,
) -> RegionManifestManager {
    let reopened = env
        .create_manifest_manager(compress_type, checkpoint_distance, None)
        .await;
    reopened.unwrap().unwrap()
}
/// Builds an action list holding a single edit that adds no files, removes
/// no files and carries neither a compaction window nor a flushed sequence.
/// Tests use it to bump the manifest version.
fn nop_action() -> RegionMetaActionList {
    let empty_edit = RegionEdit {
        files_to_add: vec![],
        files_to_remove: vec![],
        compaction_time_window: None,
        flushed_sequence: None,
    };
    RegionMetaActionList::new(vec![RegionMetaAction::Edit(empty_edit)])
}
/// With a checkpoint distance of 0 (checkpointing disabled) no checkpoint
/// may ever be written, and every delta file must still be present.
#[tokio::test]
async fn manager_without_checkpoint() {
    let (_env, manager) = build_manager(0, CompressionType::Uncompressed).await;

    // apply 10 actions
    for _ in 0..10 {
        manager.update(nop_action()).await.unwrap();
    }

    // Take one handle to the store and reuse it for every check below.
    let store = manager.store().await;

    // no checkpoint
    assert!(store.load_last_checkpoint().await.unwrap().is_none());

    // check files: the 10 updates plus one more delta file (version 0),
    // presumably written when the manager was created.
    let mut expected = vec![
        "00000000000000000010.json",
        "00000000000000000009.json",
        "00000000000000000008.json",
        "00000000000000000007.json",
        "00000000000000000006.json",
        "00000000000000000005.json",
        "00000000000000000004.json",
        "00000000000000000003.json",
        "00000000000000000002.json",
        "00000000000000000001.json",
        "00000000000000000000.json",
    ];
    expected.sort_unstable();
    let mut paths = store
        .get_paths(|e| Some(e.name().to_string()))
        .await
        .unwrap();
    // Listing order is not relied upon; compare sorted.
    paths.sort_unstable();
    assert_eq!(expected, paths);
}
/// With a checkpoint distance of 1 a checkpoint is attempted after every
/// update. Verifies the files left on storage, the content of the
/// `_last_checkpoint` marker, and that a reopened manager recovers the
/// latest manifest version.
#[tokio::test]
async fn manager_with_checkpoint_distance_1() {
    common_telemetry::init_default_ut_logging();
    let (env, manager) = build_manager(1, CompressionType::Uncompressed).await;

    // apply 10 actions
    for _ in 0..10 {
        manager.update(nop_action()).await.unwrap();
    }

    // Take one handle to the store and reuse it for every check below.
    let store = manager.store().await;

    // has checkpoint
    assert!(store.load_last_checkpoint().await.unwrap().is_some());

    // check files
    let mut expected = vec![
        "00000000000000000009.checkpoint",
        "00000000000000000010.json",
        "00000000000000000008.checkpoint",
        "00000000000000000009.json",
        "_last_checkpoint",
    ];
    expected.sort_unstable();
    let mut paths = store
        .get_paths(|e| Some(e.name().to_string()))
        .await
        .unwrap();
    // Listing order is not relied upon; compare sorted.
    paths.sort_unstable();
    assert_eq!(expected, paths);

    // check content in `_last_checkpoint`
    let raw_bytes = store
        .read_file(&store.last_checkpoint_path())
        .await
        .unwrap();
    let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
    let expected_json = "{\"size\":741,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
    assert_eq!(expected_json, raw_json);

    // reopen the manager
    manager.stop().await.unwrap();
    let manager = reopen_manager(&env, 1, CompressionType::Uncompressed).await;
    assert_eq!(10, manager.manifest().await.manifest_version);
}
#[tokio::test]
async fn checkpoint_with_different_compression_types() {
common_telemetry::init_default_ut_logging();
let mut actions = vec![];
for _ in 0..10 {
let file_meta = FileMeta {
region_id: RegionId::new(123, 456),
file_id: FileId::random(),
time_range: (0.into(), 10000000.into()),
level: 0,
file_size: 1024000,
};
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
files_to_remove: vec![],
compaction_time_window: None,
flushed_sequence: None,
})]);
actions.push(action);
}
// collect and check all compression types
let mut checkpoints = vec![];
for compress_type in CompressionType::iter() {
checkpoints
.push(generate_checkpoint_with_compression_types(compress_type, actions.clone()).await);
}
let last = checkpoints.last().unwrap().clone();
assert!(checkpoints.into_iter().all(|ckpt| last.eq(&ckpt)));
}
/// Applies `actions` to a fresh manifest manager that uses `compress_type`
/// and a checkpoint distance of 1, then returns the last checkpoint read
/// back from storage.
///
/// Panics if any update fails or if no checkpoint was written.
async fn generate_checkpoint_with_compression_types(
    compress_type: CompressionType,
    actions: Vec<RegionMetaActionList>,
) -> RegionCheckpoint {
    // Build the manager with the requested compression type. It was previously
    // hard-coded to `Uncompressed`, so callers iterating over all compression
    // types never actually exercised them.
    // `_env` stays bound until the end of the function so the test
    // environment outlives every access to the store.
    let (_env, manager) = build_manager(1, compress_type).await;

    for action in actions {
        manager.update(action).await.unwrap();
    }

    RegionManifestManagerInner::last_checkpoint(&manager.store().await)
        .await
        .unwrap()
        .unwrap()
}

View File

@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::storage::RegionId;
use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType};
/// Builds the region metadata shared by the manifest tests.
///
/// The region id is `RegionId::new(23, 33)`, the builder is created with `0`
/// as its second argument, and the primary key is column id 36. Columns
/// (all non-nullable):
/// - ts: timestamp millisecond, semantic type: `Timestamp`, column id: 45
/// - pk: string, semantic type: `Tag`, column id: 36
/// - val: float64, semantic type: `Field`, column id: 251
pub fn basic_region_metadata() -> RegionMetadata {
    let ts_column = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 45,
    };
    let pk_column = ColumnMetadata {
        column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 36,
    };
    let val_column = ColumnMetadata {
        column_schema: ColumnSchema::new("val", ConcreteDataType::float64_datatype(), false),
        semantic_type: SemanticType::Field,
        column_id: 251,
    };

    let mut builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
    builder.push_column_metadata(ts_column);
    builder.push_column_metadata(pk_column);
    builder.push_column_metadata(val_column);
    builder.primary_key(vec![36]);
    builder.build().unwrap()
}

View File

@@ -23,8 +23,7 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RegionCorruptedSnafu, RegionNotFoundSnafu, Result};
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::options::RegionManifestOptions;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::memtable::MemtableBuilderRef;
use crate::metadata::RegionMetadata;
use crate::region::version::{VersionBuilder, VersionControl};
@@ -80,7 +79,7 @@ impl RegionOpener {
manifest_dir: new_manifest_dir(&self.region_dir),
object_store: self.object_store,
compress_type: config.manifest_compress_type,
checkpoint_interval: config.manifest_checkpoint_interval,
checkpoint_distance: config.manifest_checkpoint_distance,
};
// Writes regions to the manifest file.
let manifest_manager = RegionManifestManager::new(metadata.clone(), options).await?;
@@ -105,7 +104,7 @@ impl RegionOpener {
manifest_dir: new_manifest_dir(&self.region_dir),
object_store: self.object_store,
compress_type: config.manifest_compress_type,
checkpoint_interval: config.manifest_checkpoint_interval,
checkpoint_distance: config.manifest_checkpoint_distance,
};
let manifest_manager =
RegionManifestManager::open(options)

View File

@@ -29,8 +29,7 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::options::RegionManifestOptions;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::metadata::{ColumnMetadata, RegionMetadataRef, SemanticType};
use crate::request::{CreateRequest, RegionOptions};
use crate::worker::WorkerGroup;
@@ -95,7 +94,7 @@ impl TestEnv {
pub async fn create_manifest_manager(
&self,
compress_type: CompressionType,
checkpoint_interval: u64,
checkpoint_distance: u64,
initial_metadata: Option<RegionMetadataRef>,
) -> Result<Option<RegionManifestManager>> {
let data_home = self.data_home.path();
@@ -116,7 +115,7 @@ impl TestEnv {
manifest_dir,
object_store,
compress_type,
checkpoint_interval,
checkpoint_distance,
};
if let Some(metadata) = initial_metadata {