From 16be56a7438fd08175601c4c7ec2d9ff5a848860 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 12 Jul 2023 19:21:11 +0800 Subject: [PATCH] refactor(mito): port manifest storage to mito2 (#1948) * refactor(mito): port manifest storage to mito2 Signed-off-by: Ruihang Xia * remove deadcode Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/mito2/src/error.rs | 58 ++- src/mito2/src/manifest.rs | 1 + src/mito2/src/manifest/storage.rs | 713 ++++++++++++++++++++++++++++++ 3 files changed, 771 insertions(+), 1 deletion(-) create mode 100644 src/mito2/src/manifest/storage.rs diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 2722afa956..b99734b103 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -14,11 +14,67 @@ use std::any::Any; +use common_datasource::compression::CompressionType; use common_error::prelude::*; +use snafu::Location; +use store_api::manifest::ManifestVersion; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] -pub enum Error {} +pub enum Error { + #[snafu(display("OpenDAL operator failed. Location: {}, source: {}", location, source))] + OpenDal { + location: Location, + source: object_store::Error, + }, + + #[snafu(display( + "Fail to compress object by {}, path: {}, source: {}", + compress_type, + path, + source + ))] + CompressObject { + compress_type: CompressionType, + path: String, + source: std::io::Error, + }, + + #[snafu(display( + "Fail to decompress object by {}, path: {}, source: {}", + compress_type, + path, + source + ))] + DecompressObject { + compress_type: CompressionType, + path: String, + source: std::io::Error, + }, + + #[snafu(display( + "Failed to ser/de json object. Location: {}, source: {}", + location, + source + ))] + SerdeJson { + location: Location, + source: serde_json::Error, + }, + + #[snafu(display("Invalid scan index, start: {}, end: {}", start, end))] + InvalidScanIndex { + start: ManifestVersion, + end: ManifestVersion, + location: Location, + }, + + #[snafu(display("Invalid UTF-8 content. Location: {}, source: {}", location, source))] + Utf8 { + location: Location, + source: std::str::Utf8Error, + }, +} pub type Result = std::result::Result; diff --git a/src/mito2/src/manifest.rs b/src/mito2/src/manifest.rs index 47bd9a3cea..3b51310f1b 100644 --- a/src/mito2/src/manifest.rs +++ b/src/mito2/src/manifest.rs @@ -19,3 +19,4 @@ mod gc_task; mod helper; #[allow(unused_variables)] mod impl_; +mod storage; diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs new file mode 100644 index 0000000000..6a4cadfb4c --- /dev/null +++ b/src/mito2/src/manifest/storage.rs @@ -0,0 +1,713 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::iter::Iterator; +use std::str::FromStr; + +use common_datasource::compression::CompressionType; +use common_telemetry::logging; +use futures::TryStreamExt; +use lazy_static::lazy_static; +use object_store::{raw_normalize_path, util, Entry, ErrorKind, ObjectStore}; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, ResultExt}; +use store_api::manifest::ManifestVersion; + +use crate::error::{ + CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result, + SerdeJsonSnafu, Utf8Snafu, +}; + +lazy_static! { + static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap(); + static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap(); +} + +const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint"; +const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip; +/// Due to backward compatibility, it is possible that the user's manifest file has not been compressed. +/// So when we encounter problems, we need to fall back to `FALL_BACK_COMPRESS_TYPE` for processing. +const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed; + +#[inline] +pub const fn manifest_compress_type(compress: bool) -> CompressionType { + if compress { + DEFAULT_MANIFEST_COMPRESSION_TYPE + } else { + FALL_BACK_COMPRESS_TYPE + } +} + +#[inline] +pub fn delta_file(version: ManifestVersion) -> String { + format!("{version:020}.json") +} + +#[inline] +pub fn checkpoint_file(version: ManifestVersion) -> String { + format!("{version:020}.checkpoint") +} + +#[inline] +pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String { + if compress_type == CompressionType::Uncompressed { + format!("{}{}", path, file) + } else { + format!("{}{}.{}", path, file, compress_type.file_extension()) + } +} + +/// Return's the file manifest version from path +/// +/// # Panics +/// Panics if the file path is not a valid delta or checkpoint file. +#[inline] +pub fn file_version(path: &str) -> ManifestVersion { + let s = path.split('.').next().unwrap(); + s.parse().unwrap_or_else(|_| panic!("Invalid file: {path}")) +} + +/// Return's the file compress algorithm by file extension. +/// +/// for example file +/// `00000000000000000000.json.gz` -> `CompressionType::GZIP` +#[inline] +pub fn file_compress_type(path: &str) -> CompressionType { + let s = path.rsplit('.').next().unwrap_or(""); + CompressionType::from_str(s).unwrap_or(CompressionType::Uncompressed) +} + +#[inline] +pub fn is_delta_file(file_name: &str) -> bool { + DELTA_RE.is_match(file_name) +} + +#[inline] +pub fn is_checkpoint_file(file_name: &str) -> bool { + CHECKPOINT_RE.is_match(file_name) +} + +pub struct ObjectStoreLogIterator { + object_store: ObjectStore, + iter: Box + Send + Sync>, +} + +impl ObjectStoreLogIterator { + async fn next_log(&mut self) -> Result)>> { + match self.iter.next() { + Some((v, entry)) => { + let compress_type = file_compress_type(entry.name()); + let bytes = self + .object_store + .read(entry.path()) + .await + .context(OpenDalSnafu)?; + let data = compress_type + .decode(bytes) + .await + .context(DecompressObjectSnafu { + compress_type, + path: entry.path(), + })?; + Ok(Some((v, data))) + } + None => Ok(None), + } + } +} + +#[derive(Clone, Debug)] +pub struct ManifestObjectStore { + object_store: ObjectStore, + compress_type: CompressionType, + path: String, +} + +impl ManifestObjectStore { + pub fn new(path: &str, object_store: ObjectStore, compress_type: CompressionType) -> Self { + Self { + object_store, + compress_type, + path: util::normalize_dir(path), + } + } + + /// Returns the delta file path under the **current** compression algorithm + fn delta_file_path(&self, version: ManifestVersion) -> String { + gen_path(&self.path, &delta_file(version), self.compress_type) + } + + /// Returns the checkpoint file path under the **current** compression algorithm + fn checkpoint_file_path(&self, version: ManifestVersion) -> String { + gen_path(&self.path, &checkpoint_file(version), self.compress_type) + } + + /// Returns the last checkpoint path, because the last checkpoint is not compressed, + /// so its path name has nothing to do with the compression algorithm used by `ManifestObjectStore` + fn last_checkpoint_path(&self) -> String { + format!("{}{}", self.path, LAST_CHECKPOINT_FILE) + } + + /// Return all `R`s in the root directory that meet the `filter` conditions (that is, the `filter` closure returns `Some(R)`), + /// and discard `R` that does not meet the conditions (that is, the `filter` closure returns `None`) + async fn get_paths(&self, filter: F) -> Result> + where + F: Fn(Entry) -> Option, + { + let streamer = self + .object_store + .list(&self.path) + .await + .context(OpenDalSnafu)?; + streamer + .try_filter_map(|e| async { Ok(filter(e)) }) + .try_collect::>() + .await + .context(OpenDalSnafu) + } + + pub(crate) fn path(&self) -> &str { + &self.path + } +} + +#[derive(Serialize, Deserialize, Debug)] +struct CheckpointMetadata { + pub size: usize, + /// The latest version this checkpoint contains. + pub version: ManifestVersion, + pub checksum: Option, + pub extend_metadata: Option>, +} + +impl CheckpointMetadata { + fn encode(&self) -> Result> { + serde_json::to_string(self).context(SerdeJsonSnafu) + } + + fn decode(bs: &[u8]) -> Result { + let data = std::str::from_utf8(bs).context(Utf8Snafu)?; + + serde_json::from_str(data).context(SerdeJsonSnafu) + } +} + +impl ManifestObjectStore { + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result { + ensure!(start <= end, InvalidScanIndexSnafu { start, end }); + + let mut entries: Vec<(ManifestVersion, Entry)> = self + .get_paths(|entry| { + let file_name = entry.name(); + if is_delta_file(file_name) { + let version = file_version(file_name); + if start <= version && version < end { + return Some((version, entry)); + } + } + None + }) + .await?; + + entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2)); + + Ok(ObjectStoreLogIterator { + object_store: self.object_store.clone(), + iter: Box::new(entries.into_iter()), + }) + } + + async fn delete_until( + &self, + end: ManifestVersion, + keep_last_checkpoint: bool, + ) -> Result { + // Stores (entry, is_checkpoint, version) in a Vec. + let entries: Vec<_> = self + .get_paths(|entry| { + let file_name = entry.name(); + let is_checkpoint = is_checkpoint_file(file_name); + if is_delta_file(file_name) || is_checkpoint_file(file_name) { + let version = file_version(file_name); + if version < end { + return Some((entry, is_checkpoint, version)); + } + } + None + }) + .await?; + let checkpoint_version = if keep_last_checkpoint { + // Note that the order of entries is unspecific. + entries + .iter() + .filter_map( + |(_e, is_checkpoint, version)| { + if *is_checkpoint { + Some(version) + } else { + None + } + }, + ) + .max() + } else { + None + }; + let paths: Vec<_> = entries + .iter() + .filter(|(_e, is_checkpoint, version)| { + if let Some(max_version) = checkpoint_version { + if *is_checkpoint { + // We need to keep the checkpoint file. + version < max_version + } else { + // We can delete the log file with max_version as the checkpoint + // file contains the log file's content. + version <= max_version + } + } else { + true + } + }) + .map(|e| e.0.path().to_string()) + .collect(); + let ret = paths.len(); + + logging::debug!( + "Deleting {} logs from manifest storage path {} until {}, checkpoint: {:?}, paths: {:?}", + ret, + self.path, + end, + checkpoint_version, + paths, + ); + + self.object_store + .remove(paths) + .await + .context(OpenDalSnafu)?; + + Ok(ret) + } + + async fn delete_all(&self, remove_action_manifest: ManifestVersion) -> Result<()> { + let entries: Vec = self.get_paths(Some).await?; + + // Filter out the latest delta file. + let paths: Vec<_> = entries + .iter() + .filter(|e| { + let name = e.name(); + if is_delta_file(name) && file_version(name) == remove_action_manifest { + return false; + } + true + }) + .map(|e| e.path().to_string()) + .collect(); + + logging::info!( + "Deleting {} from manifest storage path {} paths: {:?}", + paths.len(), + self.path, + paths, + ); + + // Delete all files except the latest delta file. + self.object_store + .remove(paths) + .await + .context(OpenDalSnafu)?; + + // Delete the latest delta file and the manifest directory. + self.object_store + .remove_all(&self.path) + .await + .context(OpenDalSnafu)?; + logging::info!("Deleted manifest storage path {}", self.path); + + Ok(()) + } + + async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + let path = self.delta_file_path(version); + logging::debug!("Save log to manifest storage, version: {}", version); + let data = self + .compress_type + .encode(bytes) + .await + .context(CompressObjectSnafu { + compress_type: self.compress_type, + path: &path, + })?; + self.object_store + .write(&path, data) + .await + .context(OpenDalSnafu) + } + + async fn delete(&self, start: ManifestVersion, end: ManifestVersion) -> Result<()> { + ensure!(start <= end, InvalidScanIndexSnafu { start, end }); + + // Due to backward compatibility, it is possible that the user's log between start and end has not been compressed, + // so we need to delete the uncompressed file corresponding to that version, even if the uncompressed file in that version do not exist. + let mut paths = Vec::with_capacity(((end - start) * 2) as usize); + for version in start..end { + paths.push(raw_normalize_path(&self.delta_file_path(version))); + if self.compress_type != FALL_BACK_COMPRESS_TYPE { + paths.push(raw_normalize_path(&gen_path( + &self.path, + &delta_file(version), + FALL_BACK_COMPRESS_TYPE, + ))); + } + } + + logging::debug!( + "Deleting logs from manifest storage, start: {}, end: {}", + start, + end + ); + + self.object_store + .remove(paths.clone()) + .await + .context(OpenDalSnafu)?; + + Ok(()) + } + + async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + let path = self.checkpoint_file_path(version); + let data = self + .compress_type + .encode(bytes) + .await + .context(CompressObjectSnafu { + compress_type: self.compress_type, + path: &path, + })?; + self.object_store + .write(&path, data) + .await + .context(OpenDalSnafu)?; + + // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it. + let last_checkpoint_path = self.last_checkpoint_path(); + + let checkpoint_metadata = CheckpointMetadata { + size: bytes.len(), + version, + checksum: None, + extend_metadata: None, + }; + + logging::debug!( + "Save checkpoint in path: {}, metadata: {:?}", + last_checkpoint_path, + checkpoint_metadata + ); + + let bs = checkpoint_metadata.encode()?; + self.object_store + .write(&last_checkpoint_path, bs.as_ref().to_vec()) + .await + .context(OpenDalSnafu)?; + + Ok(()) + } + + async fn load_checkpoint( + &self, + version: ManifestVersion, + ) -> Result)>> { + let path = self.checkpoint_file_path(version); + // Due to backward compatibility, it is possible that the user's checkpoint not compressed, + // so if we don't find file by compressed type. fall back to checkpoint not compressed find again. + let checkpoint_data = + match self.object_store.read(&path).await { + Ok(checkpoint) => { + let decompress_data = self.compress_type.decode(checkpoint).await.context( + DecompressObjectSnafu { + compress_type: self.compress_type, + path, + }, + )?; + Ok(Some(decompress_data)) + } + Err(e) => { + if e.kind() == ErrorKind::NotFound { + if self.compress_type != FALL_BACK_COMPRESS_TYPE { + let fall_back_path = gen_path( + &self.path, + &checkpoint_file(version), + FALL_BACK_COMPRESS_TYPE, + ); + logging::debug!( + "Failed to load checkpoint from path: {}, fall back to path: {}", + path, + fall_back_path + ); + match self.object_store.read(&fall_back_path).await { + Ok(checkpoint) => { + let decompress_data = FALL_BACK_COMPRESS_TYPE + .decode(checkpoint) + .await + .context(DecompressObjectSnafu { + compress_type: FALL_BACK_COMPRESS_TYPE, + path, + })?; + Ok(Some(decompress_data)) + } + Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(OpenDalSnafu), + } + } else { + Ok(None) + } + } else { + Err(e).context(OpenDalSnafu) + } + } + }?; + Ok(checkpoint_data.map(|data| (version, data))) + } + + async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> { + // Due to backward compatibility, it is possible that the user's checkpoint file has not been compressed, + // so we need to delete the uncompressed checkpoint file corresponding to that version, even if the uncompressed checkpoint file in that version do not exist. + let paths = if self.compress_type != FALL_BACK_COMPRESS_TYPE { + vec![ + raw_normalize_path(&self.checkpoint_file_path(version)), + raw_normalize_path(&gen_path( + &self.path, + &checkpoint_file(version), + FALL_BACK_COMPRESS_TYPE, + )), + ] + } else { + vec![raw_normalize_path(&self.checkpoint_file_path(version))] + }; + + self.object_store + .remove(paths.clone()) + .await + .context(OpenDalSnafu)?; + Ok(()) + } + + async fn load_last_checkpoint(&self) -> Result)>> { + let last_checkpoint_path = self.last_checkpoint_path(); + let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await { + Ok(data) => data, + Err(e) if e.kind() == ErrorKind::NotFound => { + return Ok(None); + } + Err(e) => { + return Err(e).context(OpenDalSnafu)?; + } + }; + + let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?; + + logging::debug!( + "Load checkpoint in path: {}, metadata: {:?}", + last_checkpoint_path, + checkpoint_metadata + ); + + self.load_checkpoint(checkpoint_metadata.version).await + } +} + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::create_temp_dir; + use object_store::services::Fs; + use object_store::ObjectStore; + + use super::*; + + fn new_test_manifest_store() -> ManifestObjectStore { + common_telemetry::init_default_ut_logging(); + let tmp_dir = create_temp_dir("test_manifest_log_store"); + let mut builder = Fs::default(); + let _ = builder.root(&tmp_dir.path().to_string_lossy()); + let object_store = ObjectStore::new(builder).unwrap().finish(); + ManifestObjectStore::new("/", object_store, CompressionType::Uncompressed) + } + + #[test] + // Define this test mainly to prevent future unintentional changes may break the backward compatibility. + fn test_compress_file_path_generation() { + let path = "/foo/bar/"; + let version: ManifestVersion = 0; + let file_path = gen_path(path, &delta_file(version), CompressionType::Gzip); + assert_eq!(file_path.as_str(), "/foo/bar/00000000000000000000.json.gz") + } + + #[tokio::test] + async fn test_manifest_log_store_uncompress() { + let mut log_store = new_test_manifest_store(); + log_store.compress_type = CompressionType::Uncompressed; + test_manifest_log_store_case(log_store).await; + } + + #[tokio::test] + async fn test_manifest_log_store_compress() { + let mut log_store = new_test_manifest_store(); + log_store.compress_type = CompressionType::Gzip; + test_manifest_log_store_case(log_store).await; + } + + async fn test_manifest_log_store_case(log_store: ManifestObjectStore) { + for v in 0..5 { + log_store + .save(v, format!("hello, {v}").as_bytes()) + .await + .unwrap(); + } + + let mut it = log_store.scan(1, 4).await.unwrap(); + for v in 1..4 { + let (version, bytes) = it.next_log().await.unwrap().unwrap(); + assert_eq!(v, version); + assert_eq!(format!("hello, {v}").as_bytes(), bytes); + } + assert!(it.next_log().await.unwrap().is_none()); + + let mut it = log_store.scan(0, 11).await.unwrap(); + for v in 0..5 { + let (version, bytes) = it.next_log().await.unwrap().unwrap(); + assert_eq!(v, version); + assert_eq!(format!("hello, {v}").as_bytes(), bytes); + } + assert!(it.next_log().await.unwrap().is_none()); + + // Delete [0, 3) + log_store.delete(0, 3).await.unwrap(); + + // [3, 5) remains + let mut it = log_store.scan(0, 11).await.unwrap(); + for v in 3..5 { + let (version, bytes) = it.next_log().await.unwrap().unwrap(); + assert_eq!(v, version); + assert_eq!(format!("hello, {v}").as_bytes(), bytes); + } + assert!(it.next_log().await.unwrap().is_none()); + + // test checkpoint + assert!(log_store.load_last_checkpoint().await.unwrap().is_none()); + log_store + .save_checkpoint(3, "checkpoint".as_bytes()) + .await + .unwrap(); + + let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap(); + assert_eq!(checkpoint, "checkpoint".as_bytes()); + assert_eq!(3, v); + + //delete (,4) logs and keep checkpoint 3. + let _ = log_store.delete_until(4, true).await.unwrap(); + let _ = log_store.load_checkpoint(3).await.unwrap().unwrap(); + let _ = log_store.load_last_checkpoint().await.unwrap().unwrap(); + let mut it = log_store.scan(0, 11).await.unwrap(); + let (version, bytes) = it.next_log().await.unwrap().unwrap(); + assert_eq!(4, version); + assert_eq!("hello, 4".as_bytes(), bytes); + assert!(it.next_log().await.unwrap().is_none()); + + // delete all logs and checkpoints + let _ = log_store.delete_until(11, false).await.unwrap(); + assert!(log_store.load_checkpoint(3).await.unwrap().is_none()); + assert!(log_store.load_last_checkpoint().await.unwrap().is_none()); + let mut it = log_store.scan(0, 11).await.unwrap(); + assert!(it.next_log().await.unwrap().is_none()); + } + + #[tokio::test] + // test ManifestObjectStore can read/delete previously uncompressed data correctly + async fn test_compress_backward_compatible() { + let mut log_store = new_test_manifest_store(); + + // write uncompress data to stimulate previously uncompressed data + log_store.compress_type = CompressionType::Uncompressed; + for v in 0..5 { + log_store + .save(v, format!("hello, {v}").as_bytes()) + .await + .unwrap(); + } + log_store + .save_checkpoint(5, "checkpoint_uncompressed".as_bytes()) + .await + .unwrap(); + + // change compress type + log_store.compress_type = CompressionType::Gzip; + + // test load_last_checkpoint work correctly for previously uncompressed data + let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap(); + assert_eq!(v, 5); + assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes()); + + // write compressed data to stimulate compress alogorithom take effect + for v in 5..10 { + log_store + .save(v, format!("hello, {v}").as_bytes()) + .await + .unwrap(); + } + log_store + .save_checkpoint(10, "checkpoint_compressed".as_bytes()) + .await + .unwrap(); + + // test data reading + let mut it = log_store.scan(0, 10).await.unwrap(); + for v in 0..10 { + let (version, bytes) = it.next_log().await.unwrap().unwrap(); + assert_eq!(v, version); + assert_eq!(format!("hello, {v}").as_bytes(), bytes); + } + let (v, checkpoint) = log_store.load_checkpoint(5).await.unwrap().unwrap(); + assert_eq!(v, 5); + assert_eq!(checkpoint, "checkpoint_uncompressed".as_bytes()); + let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap(); + assert_eq!(v, 10); + assert_eq!(checkpoint, "checkpoint_compressed".as_bytes()); + + // Delete previously uncompressed checkpoint + log_store.delete_checkpoint(5).await.unwrap(); + assert!(log_store.load_checkpoint(5).await.unwrap().is_none()); + + // Delete [3, 7), contain uncompressed/compressed data + log_store.delete(3, 7).await.unwrap(); + // [3, 7) deleted + let mut it = log_store.scan(3, 7).await.unwrap(); + assert!(it.next_log().await.unwrap().is_none()); + + // Delete util 10, contain uncompressed/compressed data + // log 0, 1, 2, 7, 8, 9 will be delete + assert_eq!(6, log_store.delete_until(10, false).await.unwrap()); + let mut it = log_store.scan(0, 10).await.unwrap(); + assert!(it.next_log().await.unwrap().is_none()); + } +}