refactor(mito2): reorganize manifest storage into modular components (#7483)

* refactor(mito2): reorganize manifest storage into modular components

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: sort

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: fmt

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Weny Xu
2025-12-26 10:24:27 +08:00
committed by GitHub
parent fffad499ca
commit 518a4e013b
7 changed files with 1024 additions and 579 deletions

View File

@@ -12,43 +12,46 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
pub(crate) mod checkpoint;
pub(crate) mod delta;
pub(crate) mod size_tracker;
pub(crate) mod staging;
pub(crate) mod utils;
use std::iter::Iterator;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use crc32fast::Hasher;
use futures::TryStreamExt;
use futures::future::try_join_all;
use lazy_static::lazy_static;
use object_store::util::join_dir;
use object_store::{Entry, ErrorKind, Lister, ObjectStore, util};
use object_store::{Lister, ObjectStore, util};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::storage::RegionId;
use tokio::sync::Semaphore;
use crate::cache::manifest_cache::ManifestCache;
use crate::error::{
ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};
use crate::error::{ChecksumMismatchSnafu, OpenDalSnafu, Result};
use crate::manifest::storage::checkpoint::CheckpointStorage;
use crate::manifest::storage::delta::DeltaStorage;
use crate::manifest::storage::size_tracker::{CheckpointTracker, DeltaTracker, SizeTracker};
use crate::manifest::storage::staging::StagingStorage;
use crate::manifest::storage::utils::remove_from_cache;
lazy_static! {
static ref DELTA_RE: Regex = Regex::new("^\\d+\\.json").unwrap();
static ref CHECKPOINT_RE: Regex = Regex::new("^\\d+\\.checkpoint").unwrap();
}
const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
pub const LAST_CHECKPOINT_FILE: &str = "_last_checkpoint";
const DEFAULT_MANIFEST_COMPRESSION_TYPE: CompressionType = CompressionType::Gzip;
/// For backward compatibility, the user's manifest files may be uncompressed,
/// so when reading with the current compression type fails, we fall back to `FALL_BACK_COMPRESS_TYPE`.
const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
pub(crate) const FALL_BACK_COMPRESS_TYPE: CompressionType = CompressionType::Uncompressed;
const FETCH_MANIFEST_PARALLELISM: usize = 16;
/// Returns the directory to the manifest files.
@@ -81,13 +84,13 @@ pub fn gen_path(path: &str, file: &str, compress_type: CompressionType) -> String
}
}
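// Editorial sketch (not part of the diff): under the fallback rule above, the same delta
// version maps to two candidate paths. The file names below are hypothetical; the exact
// padding and compression suffix come from `delta_file` and `CompressionType`.
//
// let current = gen_path("region/manifest/", &delta_file(7), CompressionType::Gzip);
// // e.g. "region/manifest/...7.json.gz"
// let fallback = gen_path("region/manifest/", &delta_file(7), FALL_BACK_COMPRESS_TYPE);
// // the plain "...7.json" name written by deployments that predate compression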
fn checkpoint_checksum(data: &[u8]) -> u32 {
pub(crate) fn checkpoint_checksum(data: &[u8]) -> u32 {
let mut hasher = Hasher::new();
hasher.update(data);
hasher.finalize()
}
fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
pub(crate) fn verify_checksum(data: &[u8], wanted: Option<u32>) -> Result<()> {
if let Some(checksum) = wanted {
let calculated_checksum = checkpoint_checksum(data);
ensure!(
@@ -127,26 +130,20 @@ pub fn is_checkpoint_file(file_name: &str) -> bool {
CHECKPOINT_RE.is_match(file_name)
}
/// Key to identify a manifest file.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum FileKey {
/// A delta file (`.json`).
Delta(ManifestVersion),
/// A checkpoint file (`.checkpoint`).
Checkpoint(ManifestVersion),
}
#[derive(Clone, Debug)]
pub struct ManifestObjectStore {
object_store: ObjectStore,
compress_type: CompressionType,
path: String,
staging_path: String,
/// Stores the size of each manifest file.
manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
total_manifest_size: Arc<AtomicU64>,
/// Optional manifest cache for local caching.
manifest_cache: Option<ManifestCache>,
// Tracks the size of each file in the manifest directory.
size_tracker: SizeTracker,
// The checkpoint file storage.
checkpoint_storage: CheckpointStorage<CheckpointTracker>,
// The delta file storage.
delta_storage: DeltaStorage<DeltaTracker>,
/// The staging file storage.
staging_storage: StagingStorage,
}
impl ManifestObjectStore {
@@ -160,43 +157,37 @@ impl ManifestObjectStore {
common_telemetry::info!("Create manifest store, cache: {}", manifest_cache.is_some());
let path = util::normalize_dir(path);
let staging_path = {
// Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
};
let size_tracker = SizeTracker::new(total_manifest_size);
let checkpoint_tracker = Arc::new(size_tracker.checkpoint_tracker());
let delta_tracker = Arc::new(size_tracker.manifest_tracker());
let checkpoint_storage = CheckpointStorage::new(
path.clone(),
object_store.clone(),
compress_type,
manifest_cache.clone(),
checkpoint_tracker,
);
let delta_storage = DeltaStorage::new(
path.clone(),
object_store.clone(),
compress_type,
manifest_cache.clone(),
delta_tracker,
);
let staging_storage =
StagingStorage::new(path.clone(), object_store.clone(), compress_type);
Self {
object_store,
compress_type,
path,
staging_path,
manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
total_manifest_size,
manifest_cache,
size_tracker,
checkpoint_storage,
delta_storage,
staging_storage,
}
}
/// Returns the delta file path under the **current** compression algorithm
fn delta_file_path(&self, version: ManifestVersion, is_staging: bool) -> String {
let base_path = if is_staging {
&self.staging_path
} else {
&self.path
};
gen_path(base_path, &delta_file(version), self.compress_type)
}
/// Returns the checkpoint file path under the **current** compression algorithm
fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
gen_path(&self.path, &checkpoint_file(version), self.compress_type)
}
/// Returns the last checkpoint path. Because the last checkpoint is never compressed,
/// its name is independent of the compression algorithm used by `ManifestObjectStore`.
pub(crate) fn last_checkpoint_path(&self) -> String {
format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
}
/// Returns the manifest dir
pub(crate) fn manifest_dir(&self) -> &str {
&self.path
@@ -204,75 +195,14 @@ impl ManifestObjectStore {
/// Returns an iterator of manifests from the normal or staging directory.
pub(crate) async fn manifest_lister(&self, is_staging: bool) -> Result<Option<Lister>> {
let path = if is_staging {
&self.staging_path
if is_staging {
self.staging_storage.manifest_lister().await
} else {
&self.path
};
match self.object_store.lister_with(path).await {
Ok(streamer) => Ok(Some(streamer)),
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exist: {}", path);
Ok(None)
}
Err(e) => Err(e).context(OpenDalSnafu)?,
self.delta_storage.manifest_lister().await
}
}
/// Returns every `R` for which the `filter` closure returns `Some(R)`,
/// discarding entries for which it returns `None`.
/// Returns an empty vector when the directory is not found.
pub async fn get_paths<F, R>(&self, filter: F, is_staging: bool) -> Result<Vec<R>>
where
F: Fn(Entry) -> Option<R>,
{
let Some(streamer) = self.manifest_lister(is_staging).await? else {
return Ok(vec![]);
};
streamer
.try_filter_map(|e| async { Ok(filter(e)) })
.try_collect::<Vec<_>>()
.await
.context(OpenDalSnafu)
}
/// Sorts the manifest files.
fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
}
/// Scans the manifest files in the range of [start, end) and returns all manifest entries.
pub async fn scan(
&self,
start: ManifestVersion,
end: ManifestVersion,
) -> Result<Vec<(ManifestVersion, Entry)>> {
ensure!(start <= end, InvalidScanIndexSnafu { start, end });
let mut entries: Vec<(ManifestVersion, Entry)> = self
.get_paths(
|entry| {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
if start <= version && version < end {
return Some((version, entry));
}
}
None
},
false,
)
.await?;
Self::sort_manifests(&mut entries);
Ok(entries)
}
/// Fetches manifests in range [start_version, end_version).
///
/// This function is guaranteed to return manifests starting exactly at `start_version`; if `start_version` is not present, it returns an empty vector.
pub async fn fetch_manifests_strict_from(
&self,
@@ -280,70 +210,9 @@ impl ManifestObjectStore {
end_version: ManifestVersion,
region_id: RegionId,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let mut manifests = self.fetch_manifests(start_version, end_version).await?;
let start_index = manifests.iter().position(|(v, _)| *v == start_version);
debug!(
"Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
start_version,
end_version,
start_index,
region_id,
manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
);
if let Some(start_index) = start_index {
Ok(manifests.split_off(start_index))
} else {
Ok(vec![])
}
}
/// Common implementation for fetching manifests from entries in parallel.
/// If `is_staging` is true, cache is skipped.
async fn fetch_manifests_from_entries(
&self,
entries: Vec<(ManifestVersion, Entry)>,
is_staging: bool,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
if entries.is_empty() {
return Ok(vec![]);
}
// TODO(weny): Make it configurable.
let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
let tasks = entries.iter().map(|(v, entry)| async {
// Safety: semaphore must exist.
let _permit = semaphore.acquire().await.unwrap();
let cache_key = entry.path();
// Try to get from cache first
if let Some(data) = self.get_from_cache(cache_key, is_staging).await {
return Ok((*v, data));
}
// Fetch from remote object store
let compress_type = file_compress_type(entry.name());
let bytes = self
.object_store
.read(entry.path())
.await
.context(OpenDalSnafu)?;
let data = compress_type
.decode(bytes)
.await
.context(DecompressObjectSnafu {
compress_type,
path: entry.path(),
})?;
// Add to cache
self.put_to_cache(cache_key.to_string(), &data, is_staging)
.await;
Ok((*v, data))
});
try_join_all(tasks).await
self.delta_storage
.fetch_manifests_strict_from(start_version, end_version, region_id)
.await
}
/// Fetches all manifests concurrently and returns those in range [start_version, end_version)
@@ -355,8 +224,9 @@ impl ManifestObjectStore {
start_version: ManifestVersion,
end_version: ManifestVersion,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifests = self.scan(start_version, end_version).await?;
self.fetch_manifests_from_entries(manifests, false).await
self.delta_storage
.fetch_manifests(start_version, end_version)
.await
}
/// Deletes manifest files whose version is less than `end`.
@@ -370,20 +240,18 @@ impl ManifestObjectStore {
) -> Result<usize> {
// Stores (entry, is_checkpoint, version) in a Vec.
let entries: Vec<_> = self
.get_paths(
|entry| {
let file_name = entry.name();
let is_checkpoint = is_checkpoint_file(file_name);
if is_delta_file(file_name) || is_checkpoint_file(file_name) {
let version = file_version(file_name);
if version < end {
return Some((entry, is_checkpoint, version));
}
.delta_storage
.get_paths(|entry| {
let file_name = entry.name();
let is_checkpoint = is_checkpoint_file(file_name);
if is_delta_file(file_name) || is_checkpoint_file(file_name) {
let version = file_version(file_name);
if version < end {
return Some((entry, is_checkpoint, version));
}
None
},
false,
)
}
None
})
.await?;
let checkpoint_version = if keep_last_checkpoint {
// Note that the order of entries is unspecified.
@@ -428,7 +296,7 @@ impl ManifestObjectStore {
// Remove from cache first
for (entry, _, _) in &del_entries {
self.remove_from_cache(entry.path()).await;
remove_from_cache(self.manifest_cache.as_ref(), entry.path()).await;
}
self.object_store
@@ -439,9 +307,11 @@ impl ManifestObjectStore {
// delete manifest sizes
for (_, is_checkpoint, version) in &del_entries {
if *is_checkpoint {
self.unset_file_size(&FileKey::Checkpoint(*version));
self.size_tracker
.remove(&size_tracker::FileKey::Checkpoint(*version));
} else {
self.unset_file_size(&FileKey::Delta(*version));
self.size_tracker
.remove(&size_tracker::FileKey::Delta(*version));
}
}
@@ -455,22 +325,11 @@ impl ManifestObjectStore {
bytes: &[u8],
is_staging: bool,
) -> Result<()> {
let path = self.delta_file_path(version, is_staging);
debug!("Save log to manifest storage, version: {}", version);
let data = self
.compress_type
.encode(bytes)
.await
.context(CompressObjectSnafu {
compress_type: self.compress_type,
path: &path,
})?;
let delta_size = data.len();
self.write_and_put_cache(&path, data, is_staging).await?;
self.set_delta_file_size(version, delta_size as u64);
Ok(())
if is_staging {
self.staging_storage.save(version, bytes).await
} else {
self.delta_storage.save(version, bytes).await
}
}
/// Save the checkpoint manifest file.
@@ -479,155 +338,50 @@ impl ManifestObjectStore {
version: ManifestVersion,
bytes: &[u8],
) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
.compress_type
.encode(bytes)
self.checkpoint_storage
.save_checkpoint(version, bytes)
.await
.context(CompressObjectSnafu {
compress_type: self.compress_type,
path: &path,
})?;
let checkpoint_size = data.len();
let checksum = checkpoint_checksum(bytes);
self.write_and_put_cache(&path, data, false).await?;
self.set_checkpoint_file_size(version, checkpoint_size as u64);
// The last checkpoint file only contains the size and version, so it is tiny and we don't compress it.
let last_checkpoint_path = self.last_checkpoint_path();
let checkpoint_metadata = CheckpointMetadata {
size: bytes.len(),
version,
checksum: Some(checksum),
extend_metadata: HashMap::new(),
};
debug!(
"Save checkpoint in path: {}, metadata: {:?}",
last_checkpoint_path, checkpoint_metadata
);
let bytes = checkpoint_metadata.encode()?;
self.object_store
.write(&last_checkpoint_path, bytes)
.await
.context(OpenDalSnafu)?;
Ok(())
}
async fn load_checkpoint(
&mut self,
metadata: CheckpointMetadata,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let version = metadata.version;
let path = self.checkpoint_file_path(version);
// Try to get from cache first
if let Some(data) = self.get_from_cache(&path, false).await {
verify_checksum(&data, metadata.checksum)?;
return Ok(Some((version, data)));
}
// For backward compatibility, the user's checkpoint may be uncompressed,
// so if the file is not found under the current compression type, fall back and look for an uncompressed checkpoint.
let checkpoint_data = match self.object_store.read(&path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data =
self.compress_type
.decode(checkpoint)
.await
.with_context(|_| DecompressObjectSnafu {
compress_type: self.compress_type,
path: path.clone(),
})?;
verify_checksum(&decompress_data, metadata.checksum)?;
// set the checkpoint size
self.set_checkpoint_file_size(version, checkpoint_size as u64);
// Add to cache
self.put_to_cache(path, &decompress_data, false).await;
Ok(Some(decompress_data))
}
Err(e) => {
if e.kind() == ErrorKind::NotFound {
if self.compress_type != FALL_BACK_COMPRESS_TYPE {
let fall_back_path = gen_path(
&self.path,
&checkpoint_file(version),
FALL_BACK_COMPRESS_TYPE,
);
debug!(
"Failed to load checkpoint from path: {}, fall back to path: {}",
path, fall_back_path
);
// Try to get fallback from cache first
if let Some(data) = self.get_from_cache(&fall_back_path, false).await {
verify_checksum(&data, metadata.checksum)?;
return Ok(Some((version, data)));
}
match self.object_store.read(&fall_back_path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data = FALL_BACK_COMPRESS_TYPE
.decode(checkpoint)
.await
.with_context(|_| DecompressObjectSnafu {
compress_type: FALL_BACK_COMPRESS_TYPE,
path: fall_back_path.clone(),
})?;
verify_checksum(&decompress_data, metadata.checksum)?;
self.set_checkpoint_file_size(version, checkpoint_size as u64);
// Add fallback to cache
self.put_to_cache(fall_back_path, &decompress_data, false)
.await;
Ok(Some(decompress_data))
}
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(e).context(OpenDalSnafu),
}
} else {
Ok(None)
}
} else {
Err(e).context(OpenDalSnafu)
}
}
}?;
Ok(checkpoint_data.map(|data| (version, data)))
}
/// Load the latest checkpoint.
/// Returns the manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content, if any.
pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let last_checkpoint_path = self.last_checkpoint_path();
// Fetch from remote object store without cache
let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
Ok(data) => data.to_vec(),
Err(e) if e.kind() == ErrorKind::NotFound => {
return Ok(None);
}
Err(e) => {
return Err(e).context(OpenDalSnafu)?;
}
};
let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
debug!(
"Load checkpoint in path: {}, metadata: {:?}",
last_checkpoint_path, checkpoint_metadata
);
self.load_checkpoint(checkpoint_metadata).await
self.checkpoint_storage.load_last_checkpoint().await
}
#[cfg(test)]
/// Computes the total size (in bytes) of tracked manifest files.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.size_tracker.total()
}
/// Resets the size of all files.
pub(crate) fn reset_manifest_size(&mut self) {
self.size_tracker.reset();
}
/// Set the size of the delta file by delta version.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
self.size_tracker.record_delta(version, size);
}
/// Set the size of the checkpoint file by checkpoint version.
pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
self.size_tracker.record_checkpoint(version, size);
}
/// Fetch all staging manifest files and return them as (version, action_list) pairs.
pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
self.staging_storage.fetch_manifests().await
}
/// Clear all staging manifest files.
pub async fn clear_staging_manifests(&mut self) -> Result<()> {
self.staging_storage.clear().await
}
}
#[cfg(test)]
impl ManifestObjectStore {
pub async fn read_file(&self, path: &str) -> Result<Vec<u8>> {
self.object_store
.read(path)
@@ -636,214 +390,18 @@ impl ManifestObjectStore {
.map(|v| v.to_vec())
}
#[cfg(test)]
pub async fn write_last_checkpoint(
&mut self,
version: ManifestVersion,
bytes: &[u8],
) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
.compress_type
.encode(bytes)
.await
.context(CompressObjectSnafu {
compress_type: self.compress_type,
path: &path,
})?;
let checkpoint_size = data.len();
self.object_store
.write(&path, data)
.await
.context(OpenDalSnafu)?;
self.set_checkpoint_file_size(version, checkpoint_size as u64);
let last_checkpoint_path = self.last_checkpoint_path();
let checkpoint_metadata = CheckpointMetadata {
size: bytes.len(),
version,
checksum: Some(1218259706),
extend_metadata: HashMap::new(),
};
debug!(
"Rewrite checkpoint in path: {}, metadata: {:?}",
last_checkpoint_path, checkpoint_metadata
);
let bytes = checkpoint_metadata.encode()?;
// Overwrite the last checkpoint with the modified content
self.object_store
.write(&last_checkpoint_path, bytes.clone())
.await
.context(OpenDalSnafu)?;
Ok(())
pub(crate) fn checkpoint_storage(&self) -> &CheckpointStorage<CheckpointTracker> {
&self.checkpoint_storage
}
/// Compute the total size (in bytes) in the manifest size map.
pub(crate) fn total_manifest_size(&self) -> u64 {
self.manifest_size_map.read().unwrap().values().sum()
pub(crate) fn delta_storage(&self) -> &DeltaStorage<DeltaTracker> {
&self.delta_storage
}
/// Resets the size of all files.
pub(crate) fn reset_manifest_size(&mut self) {
self.manifest_size_map.write().unwrap().clear();
self.total_manifest_size.store(0, Ordering::Relaxed);
}
/// Set the size of the delta file by delta version.
pub(crate) fn set_delta_file_size(&mut self, version: ManifestVersion, size: u64) {
let mut m = self.manifest_size_map.write().unwrap();
m.insert(FileKey::Delta(version), size);
self.inc_total_manifest_size(size);
}
/// Set the size of the checkpoint file by checkpoint version.
pub(crate) fn set_checkpoint_file_size(&self, version: ManifestVersion, size: u64) {
let mut m = self.manifest_size_map.write().unwrap();
m.insert(FileKey::Checkpoint(version), size);
self.inc_total_manifest_size(size);
}
fn unset_file_size(&self, key: &FileKey) {
let mut m = self.manifest_size_map.write().unwrap();
if let Some(val) = m.remove(key) {
debug!("Unset file size: {:?}, size: {}", key, val);
self.dec_total_manifest_size(val);
}
}
fn inc_total_manifest_size(&self, val: u64) {
self.total_manifest_size.fetch_add(val, Ordering::Relaxed);
}
fn dec_total_manifest_size(&self, val: u64) {
self.total_manifest_size.fetch_sub(val, Ordering::Relaxed);
}
/// Fetch all staging manifest files and return them as (version, action_list) pairs.
pub async fn fetch_staging_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifest_entries = self
.get_paths(
|entry| {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
Some((version, entry))
} else {
None
}
},
true,
)
.await?;
let mut sorted_entries = manifest_entries;
Self::sort_manifests(&mut sorted_entries);
self.fetch_manifests_from_entries(sorted_entries, true)
.await
}
/// Clear all staging manifest files.
pub async fn clear_staging_manifests(&mut self) -> Result<()> {
self.object_store
.remove_all(&self.staging_path)
.await
.context(OpenDalSnafu)?;
debug!(
"Cleared all staging manifest files from {}",
self.staging_path
);
Ok(())
}
/// Gets a manifest file from cache.
/// Returns the file data if found in cache, None otherwise.
/// If `is_staging` is true, always returns None.
async fn get_from_cache(&self, key: &str, is_staging: bool) -> Option<Vec<u8>> {
if is_staging {
return None;
}
let cache = self.manifest_cache.as_ref()?;
cache.get_file(key).await
}
/// Puts a manifest file into cache.
/// If `is_staging` is true, does nothing.
async fn put_to_cache(&self, key: String, data: &[u8], is_staging: bool) {
if is_staging {
return;
}
let Some(cache) = &self.manifest_cache else {
return;
};
cache.put_file(key, data.to_vec()).await;
}
/// Writes data to object store and puts it into cache.
/// If `is_staging` is true, cache is skipped.
async fn write_and_put_cache(&self, path: &str, data: Vec<u8>, is_staging: bool) -> Result<()> {
// Clone data for cache before writing, only if cache is enabled and not staging
let cache_data = if !is_staging && self.manifest_cache.is_some() {
Some(data.clone())
} else {
None
};
// Write to object store
self.object_store
.write(path, data)
.await
.context(OpenDalSnafu)?;
// Put to cache if we cloned the data
if let Some(data) = cache_data {
self.put_to_cache(path.to_string(), &data, is_staging).await;
}
Ok(())
}
/// Removes a manifest file from cache.
async fn remove_from_cache(&self, key: &str) {
let Some(cache) = &self.manifest_cache else {
return;
};
cache.remove(key).await;
}
}
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
pub size: usize,
/// The latest version this checkpoint contains.
pub version: ManifestVersion,
pub checksum: Option<u32>,
pub extend_metadata: HashMap<String, String>,
}
impl CheckpointMetadata {
fn encode(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_string(self)
.context(SerdeJsonSnafu)?
.into_bytes())
}
fn decode(bs: &[u8]) -> Result<Self> {
let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
serde_json::from_str(data).context(SerdeJsonSnafu)
pub(crate) fn set_compress_type(&mut self, compress_type: CompressionType) {
self.checkpoint_storage.set_compress_type(compress_type);
self.delta_storage.set_compress_type(compress_type);
self.staging_storage.set_compress_type(compress_type);
}
}
@@ -854,6 +412,7 @@ mod tests {
use object_store::services::Fs;
use super::*;
use crate::manifest::storage::checkpoint::CheckpointMetadata;
fn new_test_manifest_store() -> ManifestObjectStore {
common_telemetry::init_default_ut_logging();
@@ -890,14 +449,14 @@ mod tests {
#[tokio::test]
async fn test_manifest_log_store_uncompress() {
let mut log_store = new_test_manifest_store();
log_store.compress_type = CompressionType::Uncompressed;
log_store.set_compress_type(CompressionType::Uncompressed);
test_manifest_log_store_case(log_store).await;
}
#[tokio::test]
async fn test_manifest_log_store_compress() {
let mut log_store = new_test_manifest_store();
log_store.compress_type = CompressionType::Gzip;
log_store.set_compress_type(CompressionType::Gzip);
test_manifest_log_store_case(log_store).await;
}
@@ -941,6 +500,7 @@ mod tests {
//delete (,4) logs and keep checkpoint 3.
let _ = log_store.delete_until(4, true).await.unwrap();
let _ = log_store
.checkpoint_storage
.load_checkpoint(new_checkpoint_metadata_with_version(3))
.await
.unwrap()
@@ -958,6 +518,7 @@ mod tests {
let _ = log_store.delete_until(11, false).await.unwrap();
assert!(
log_store
.checkpoint_storage
.load_checkpoint(new_checkpoint_metadata_with_version(3))
.await
.unwrap()
@@ -976,7 +537,7 @@ mod tests {
let mut log_store = new_test_manifest_store();
// write uncompressed data to simulate previously uncompressed data
log_store.compress_type = CompressionType::Uncompressed;
log_store.set_compress_type(CompressionType::Uncompressed);
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes(), false)
@@ -989,7 +550,7 @@ mod tests {
.unwrap();
// change compress type
log_store.compress_type = CompressionType::Gzip;
log_store.set_compress_type(CompressionType::Gzip);
// test load_last_checkpoint works correctly for previously uncompressed data
let (v, checkpoint) = log_store.load_last_checkpoint().await.unwrap().unwrap();
@@ -1018,6 +579,7 @@ mod tests {
assert_eq!(format!("hello, {v}").as_bytes(), bytes);
}
let (v, checkpoint) = log_store
.checkpoint_storage
.load_checkpoint(new_checkpoint_metadata_with_version(5))
.await
.unwrap()
@@ -1052,7 +614,7 @@ mod tests {
async fn test_uncompressed_manifest_files_size() {
let mut log_store = new_test_manifest_store();
// write 5 manifest files, 8 bytes of uncompressed data per file
log_store.compress_type = CompressionType::Uncompressed;
log_store.set_compress_type(CompressionType::Uncompressed);
for v in 0..5 {
log_store
.save(v, format!("hello, {v}").as_bytes(), false)
@@ -1090,7 +652,7 @@ mod tests {
async fn test_compressed_manifest_files_size() {
let mut log_store = new_test_manifest_store();
// Test with compressed manifest files
log_store.compress_type = CompressionType::Gzip;
log_store.set_compress_type(CompressionType::Gzip);
// write 5 manifest files
for v in 0..5 {
log_store

View File

@@ -0,0 +1,316 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use object_store::{ErrorKind, ObjectStore};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::ManifestVersion;
use crate::cache::manifest_cache::ManifestCache;
use crate::error::{
CompressObjectSnafu, DecompressObjectSnafu, OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};
use crate::manifest::storage::size_tracker::Tracker;
use crate::manifest::storage::utils::{get_from_cache, put_to_cache, write_and_put_cache};
use crate::manifest::storage::{
FALL_BACK_COMPRESS_TYPE, LAST_CHECKPOINT_FILE, checkpoint_checksum, checkpoint_file, gen_path,
verify_checksum,
};
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct CheckpointMetadata {
pub size: usize,
/// The latest version this checkpoint contains.
pub version: ManifestVersion,
pub checksum: Option<u32>,
pub extend_metadata: HashMap<String, String>,
}
impl CheckpointMetadata {
fn encode(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_string(self)
.context(SerdeJsonSnafu)?
.into_bytes())
}
fn decode(bs: &[u8]) -> Result<Self> {
let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
serde_json::from_str(data).context(SerdeJsonSnafu)
}
}
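// Editorial sketch (not part of the diff): `CheckpointMetadata` round-trips through plain
// JSON, so the `_last_checkpoint` file looks like the following (illustrative values):
//
// let meta = CheckpointMetadata {
//     size: 1024,
//     version: 7,
//     checksum: Some(0xDEAD_BEEF),
//     extend_metadata: HashMap::new(),
// };
// let bytes = meta.encode()?;
// // => {"size":1024,"version":7,"checksum":3735928559,"extend_metadata":{}}
// assert_eq!(CheckpointMetadata::decode(&bytes)?.version, 7);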
/// Handles checkpoint storage operations.
#[derive(Debug, Clone)]
pub(crate) struct CheckpointStorage<T: Tracker> {
object_store: ObjectStore,
compress_type: CompressionType,
path: String,
manifest_cache: Option<ManifestCache>,
size_tracker: Arc<T>,
}
impl<T: Tracker> CheckpointStorage<T> {
pub fn new(
path: String,
object_store: ObjectStore,
compress_type: CompressionType,
manifest_cache: Option<ManifestCache>,
size_tracker: Arc<T>,
) -> Self {
Self {
object_store,
compress_type,
path,
manifest_cache,
size_tracker,
}
}
/// Returns the last checkpoint path. Because the last checkpoint is never compressed,
/// its name is independent of the compression algorithm used by `ManifestObjectStore`.
pub(crate) fn last_checkpoint_path(&self) -> String {
format!("{}{}", self.path, LAST_CHECKPOINT_FILE)
}
/// Returns the checkpoint file path under the **current** compression algorithm
fn checkpoint_file_path(&self, version: ManifestVersion) -> String {
gen_path(&self.path, &checkpoint_file(version), self.compress_type)
}
pub(crate) async fn load_checkpoint(
&mut self,
metadata: CheckpointMetadata,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let version = metadata.version;
let path = self.checkpoint_file_path(version);
// Try to get from cache first
if let Some(data) = get_from_cache(self.manifest_cache.as_ref(), &path).await {
verify_checksum(&data, metadata.checksum)?;
return Ok(Some((version, data)));
}
// For backward compatibility, the user's checkpoint may be uncompressed,
// so if the file is not found under the current compression type, fall back and look for an uncompressed checkpoint.
let checkpoint_data = match self.object_store.read(&path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data =
self.compress_type
.decode(checkpoint)
.await
.with_context(|_| DecompressObjectSnafu {
compress_type: self.compress_type,
path: path.clone(),
})?;
verify_checksum(&decompress_data, metadata.checksum)?;
// set the checkpoint size
self.size_tracker.record(version, checkpoint_size as u64);
// Add to cache
put_to_cache(self.manifest_cache.as_ref(), path, &decompress_data).await;
Ok(Some(decompress_data))
}
Err(e) => {
if e.kind() == ErrorKind::NotFound {
if self.compress_type != FALL_BACK_COMPRESS_TYPE {
let fall_back_path = gen_path(
&self.path,
&checkpoint_file(version),
FALL_BACK_COMPRESS_TYPE,
);
debug!(
"Failed to load checkpoint from path: {}, fall back to path: {}",
path, fall_back_path
);
// Try to get fallback from cache first
if let Some(data) =
get_from_cache(self.manifest_cache.as_ref(), &fall_back_path).await
{
verify_checksum(&data, metadata.checksum)?;
return Ok(Some((version, data)));
}
match self.object_store.read(&fall_back_path).await {
Ok(checkpoint) => {
let checkpoint_size = checkpoint.len();
let decompress_data = FALL_BACK_COMPRESS_TYPE
.decode(checkpoint)
.await
.with_context(|_| DecompressObjectSnafu {
compress_type: FALL_BACK_COMPRESS_TYPE,
path: fall_back_path.clone(),
})?;
verify_checksum(&decompress_data, metadata.checksum)?;
self.size_tracker.record(version, checkpoint_size as u64);
// Add fallback to cache
put_to_cache(
self.manifest_cache.as_ref(),
fall_back_path,
&decompress_data,
)
.await;
Ok(Some(decompress_data))
}
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => return Err(e).context(OpenDalSnafu),
}
} else {
Ok(None)
}
} else {
Err(e).context(OpenDalSnafu)
}
}
}?;
Ok(checkpoint_data.map(|data| (version, data)))
}
/// Load the latest checkpoint.
/// Returns the manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content, if any.
pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let last_checkpoint_path = self.last_checkpoint_path();
// Fetch from remote object store without cache
let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
Ok(data) => data.to_vec(),
Err(e) if e.kind() == ErrorKind::NotFound => {
return Ok(None);
}
Err(e) => {
return Err(e).context(OpenDalSnafu)?;
}
};
let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
debug!(
"Load checkpoint in path: {}, metadata: {:?}",
last_checkpoint_path, checkpoint_metadata
);
self.load_checkpoint(checkpoint_metadata).await
}
/// Save the checkpoint manifest file.
pub(crate) async fn save_checkpoint(
&self,
version: ManifestVersion,
bytes: &[u8],
) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
.compress_type
.encode(bytes)
.await
.context(CompressObjectSnafu {
compress_type: self.compress_type,
path: &path,
})?;
let checkpoint_size = data.len();
let checksum = checkpoint_checksum(bytes);
write_and_put_cache(
&self.object_store,
self.manifest_cache.as_ref(),
&path,
data,
)
.await?;
self.size_tracker.record(version, checkpoint_size as u64);
// The last checkpoint file only contains the size and version, so it is tiny and we don't compress it.
let last_checkpoint_path = self.last_checkpoint_path();
let checkpoint_metadata = CheckpointMetadata {
size: bytes.len(),
version,
checksum: Some(checksum),
extend_metadata: HashMap::new(),
};
debug!(
"Save checkpoint in path: {}, metadata: {:?}",
last_checkpoint_path, checkpoint_metadata
);
let bytes = checkpoint_metadata.encode()?;
self.object_store
.write(&last_checkpoint_path, bytes)
.await
.context(OpenDalSnafu)?;
Ok(())
}
}
#[cfg(test)]
impl<T: Tracker> CheckpointStorage<T> {
pub async fn write_last_checkpoint(
&self,
version: ManifestVersion,
bytes: &[u8],
) -> Result<()> {
let path = self.checkpoint_file_path(version);
let data = self
.compress_type
.encode(bytes)
.await
.context(CompressObjectSnafu {
compress_type: self.compress_type,
path: &path,
})?;
let checkpoint_size = data.len();
self.object_store
.write(&path, data)
.await
.context(OpenDalSnafu)?;
self.size_tracker.record(version, checkpoint_size as u64);
let last_checkpoint_path = self.last_checkpoint_path();
let checkpoint_metadata = CheckpointMetadata {
size: bytes.len(),
version,
checksum: Some(1218259706),
extend_metadata: HashMap::new(),
};
debug!(
"Rewrite checkpoint in path: {}, metadata: {:?}",
last_checkpoint_path, checkpoint_metadata
);
let bytes = checkpoint_metadata.encode()?;
// Overwrite the last checkpoint with the modified content
self.object_store
.write(&last_checkpoint_path, bytes.clone())
.await
.context(OpenDalSnafu)?;
Ok(())
}
pub fn set_compress_type(&mut self, compress_type: CompressionType) {
self.compress_type = compress_type;
}
}
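// Editorial sketch: wiring a `CheckpointStorage` by hand, assuming an `ObjectStore` named
// `store`, no local cache, and the `NoopTracker` from the size_tracker module (hypothetical
// setup, not part of this diff):
//
// let mut ckpts = CheckpointStorage::new(
//     "region/manifest/".to_string(),
//     store,
//     CompressionType::Gzip,
//     None,                  // no ManifestCache
//     Arc::new(NoopTracker), // sizes not tracked in this sketch
// );
// ckpts.save_checkpoint(9, b"{...}").await?;
// let latest = ckpts.load_last_checkpoint().await?; // Some((9, bytes)) once saved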

View File

@@ -0,0 +1,251 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use futures::TryStreamExt;
use futures::future::try_join_all;
use object_store::{Entry, ErrorKind, Lister, ObjectStore};
use snafu::{ResultExt, ensure};
use store_api::ManifestVersion;
use store_api::storage::RegionId;
use tokio::sync::Semaphore;
use crate::cache::manifest_cache::ManifestCache;
use crate::error::{
CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu, OpenDalSnafu, Result,
};
use crate::manifest::storage::size_tracker::Tracker;
use crate::manifest::storage::utils::{
get_from_cache, put_to_cache, sort_manifests, write_and_put_cache,
};
use crate::manifest::storage::{
FETCH_MANIFEST_PARALLELISM, delta_file, file_compress_type, file_version, gen_path,
is_delta_file,
};
#[derive(Debug, Clone)]
pub(crate) struct DeltaStorage<T: Tracker> {
object_store: ObjectStore,
compress_type: CompressionType,
path: String,
delta_tracker: Arc<T>,
manifest_cache: Option<ManifestCache>,
}
impl<T: Tracker> DeltaStorage<T> {
pub(crate) fn new(
path: String,
object_store: ObjectStore,
compress_type: CompressionType,
manifest_cache: Option<ManifestCache>,
delta_tracker: Arc<T>,
) -> Self {
Self {
object_store,
compress_type,
path,
delta_tracker,
manifest_cache,
}
}
pub(crate) fn path(&self) -> &str {
&self.path
}
pub(crate) fn object_store(&self) -> &ObjectStore {
&self.object_store
}
fn delta_file_path(&self, version: ManifestVersion) -> String {
gen_path(&self.path, &delta_file(version), self.compress_type)
}
/// Returns an iterator of manifests from the `path` directory.
pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
match self.object_store.lister_with(&self.path).await {
Ok(streamer) => Ok(Some(streamer)),
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exist: {}", self.path);
Ok(None)
}
Err(e) => Err(e).context(OpenDalSnafu)?,
}
}
/// Returns every `R` for which the `filter` closure returns `Some(R)`,
/// discarding entries for which it returns `None`.
/// Returns an empty vector when the directory is not found.
pub async fn get_paths<F, R>(&self, filter: F) -> Result<Vec<R>>
where
F: Fn(Entry) -> Option<R>,
{
let Some(streamer) = self.manifest_lister().await? else {
return Ok(vec![]);
};
streamer
.try_filter_map(|e| async { Ok(filter(e)) })
.try_collect::<Vec<_>>()
.await
.context(OpenDalSnafu)
}
/// Scans the manifest files in the range of [start, end) and returns all manifest entries.
pub async fn scan(
&self,
start: ManifestVersion,
end: ManifestVersion,
) -> Result<Vec<(ManifestVersion, Entry)>> {
ensure!(start <= end, InvalidScanIndexSnafu { start, end });
let mut entries: Vec<(ManifestVersion, Entry)> = self
.get_paths(|entry| {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
if start <= version && version < end {
return Some((version, entry));
}
}
None
})
.await?;
sort_manifests(&mut entries);
Ok(entries)
}
/// Fetches manifests in range [start_version, end_version).
///
/// This function is guaranteed to return manifests starting exactly at `start_version`; if `start_version` is not present, it returns an empty vector.
pub async fn fetch_manifests_strict_from(
&self,
start_version: ManifestVersion,
end_version: ManifestVersion,
region_id: RegionId,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let mut manifests = self.fetch_manifests(start_version, end_version).await?;
let start_index = manifests.iter().position(|(v, _)| *v == start_version);
debug!(
"Fetches manifests in range [{},{}), start_index: {:?}, region_id: {}, manifests: {:?}",
start_version,
end_version,
start_index,
region_id,
manifests.iter().map(|(v, _)| *v).collect::<Vec<_>>()
);
if let Some(start_index) = start_index {
Ok(manifests.split_off(start_index))
} else {
Ok(vec![])
}
}
/// Common implementation for fetching manifests from entries in parallel.
pub(crate) async fn fetch_manifests_from_entries(
&self,
entries: Vec<(ManifestVersion, Entry)>,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
if entries.is_empty() {
return Ok(vec![]);
}
// TODO(weny): Make it configurable.
let semaphore = Semaphore::new(FETCH_MANIFEST_PARALLELISM);
let tasks = entries.iter().map(|(v, entry)| async {
// Safety: semaphore must exist.
let _permit = semaphore.acquire().await.unwrap();
let cache_key = entry.path();
// Try to get from cache first
if let Some(data) = get_from_cache(self.manifest_cache.as_ref(), cache_key).await {
return Ok((*v, data));
}
// Fetch from remote object store
let compress_type = file_compress_type(entry.name());
let bytes = self
.object_store
.read(entry.path())
.await
.context(OpenDalSnafu)?;
let data = compress_type
.decode(bytes)
.await
.context(DecompressObjectSnafu {
compress_type,
path: entry.path(),
})?;
// Add to cache
put_to_cache(self.manifest_cache.as_ref(), cache_key.to_string(), &data).await;
Ok((*v, data))
});
try_join_all(tasks).await
}
/// Fetches all manifests concurrently and returns those in range [start_version, end_version)
///
/// **Note**: this function does not guarantee that the result starts at `start_version`.
/// Use [fetch_manifests_strict_from](DeltaStorage::fetch_manifests_strict_from) to fetch manifests strictly from `start_version`.
pub async fn fetch_manifests(
&self,
start_version: ManifestVersion,
end_version: ManifestVersion,
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifests = self.scan(start_version, end_version).await?;
self.fetch_manifests_from_entries(manifests).await
}
/// Save the delta manifest file.
pub async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.delta_file_path(version);
debug!("Save log to manifest storage, version: {}", version);
let data = self
.compress_type
.encode(bytes)
.await
.context(CompressObjectSnafu {
compress_type: self.compress_type,
path: &path,
})?;
let delta_size = data.len();
write_and_put_cache(
&self.object_store,
self.manifest_cache.as_ref(),
&path,
data,
)
.await?;
self.delta_tracker.record(version, delta_size as u64);
Ok(())
}
}
#[cfg(test)]
impl<T: Tracker> DeltaStorage<T> {
pub fn set_compress_type(&mut self, compress_type: CompressionType) {
self.compress_type = compress_type;
}
}
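// Editorial sketch: the typical write/read path through `DeltaStorage`, assuming a store
// wired as in the `CheckpointStorage` sketch above (hypothetical values):
//
// let mut deltas = DeltaStorage::new(path, store, CompressionType::Gzip, None, Arc::new(NoopTracker));
// deltas.save(0, b"action-list-0").await?;
// deltas.save(1, b"action-list-1").await?;
// // Decompressed payloads for versions in [0, 2):
// let manifests = deltas.fetch_manifests(0, 2).await?;
// assert_eq!(manifests.len(), 2);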

View File

@@ -0,0 +1,130 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use store_api::ManifestVersion;
/// Key to identify a manifest file.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub(crate) enum FileKey {
/// A delta file (`.json`).
Delta(ManifestVersion),
/// A checkpoint file (`.checkpoint`).
Checkpoint(ManifestVersion),
}
pub(crate) trait Tracker: Send + Sync + Debug {
fn record(&self, version: ManifestVersion, size: u64);
}
#[derive(Debug, Clone)]
pub struct CheckpointTracker {
size_tracker: SizeTracker,
}
impl Tracker for CheckpointTracker {
fn record(&self, version: ManifestVersion, size: u64) {
self.size_tracker.record(FileKey::Checkpoint(version), size);
}
}
#[derive(Debug, Clone)]
pub struct DeltaTracker {
size_tracker: SizeTracker,
}
impl Tracker for DeltaTracker {
fn record(&self, version: ManifestVersion, size: u64) {
self.size_tracker.record(FileKey::Delta(version), size);
}
}
#[derive(Debug, Clone)]
pub struct NoopTracker;
impl Tracker for NoopTracker {
fn record(&self, _version: ManifestVersion, _size: u64) {
// noop
}
}
#[derive(Debug, Clone, Default)]
pub(crate) struct SizeTracker {
file_sizes: Arc<RwLock<HashMap<FileKey, u64>>>,
total_size: Arc<AtomicU64>,
}
impl SizeTracker {
/// Returns a new [SizeTracker].
pub fn new(total_size: Arc<AtomicU64>) -> Self {
Self {
file_sizes: Arc::new(RwLock::new(HashMap::new())),
total_size,
}
}
/// Returns the manifest tracker.
pub(crate) fn manifest_tracker(&self) -> DeltaTracker {
DeltaTracker {
size_tracker: self.clone(),
}
}
/// Returns the checkpoint tracker.
pub(crate) fn checkpoint_tracker(&self) -> CheckpointTracker {
CheckpointTracker {
size_tracker: self.clone(),
}
}
/// Records a delta file size.
pub(crate) fn record_delta(&self, version: ManifestVersion, size: u64) {
self.record(FileKey::Delta(version), size);
}
/// Records a checkpoint file size.
pub(crate) fn record_checkpoint(&self, version: ManifestVersion, size: u64) {
self.record(FileKey::Checkpoint(version), size);
}
/// Removes a file from tracking.
pub(crate) fn remove(&self, key: &FileKey) {
if let Some(size) = self.file_sizes.write().unwrap().remove(key) {
self.total_size.fetch_sub(size, Ordering::Relaxed);
}
}
/// Returns the total tracked size.
pub(crate) fn total(&self) -> u64 {
self.total_size.load(Ordering::Relaxed)
}
/// Resets all tracking.
pub(crate) fn reset(&self) {
self.file_sizes.write().unwrap().clear();
self.total_size.store(0, Ordering::Relaxed);
}
fn record(&self, key: FileKey, size: u64) {
// Remove the old size if present
if let Some(old_size) = self.file_sizes.write().unwrap().insert(key, size) {
self.total_size.fetch_sub(old_size, Ordering::Relaxed);
}
self.total_size.fetch_add(size, Ordering::Relaxed);
}
}
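// Editorial sketch: how the tracker pieces compose (APIs as defined above):
//
// let total = Arc::new(AtomicU64::new(0));
// let tracker = SizeTracker::new(total);
// let deltas = tracker.manifest_tracker();  // a DeltaTracker sharing the same state
// deltas.record(1, 128);                    // records FileKey::Delta(1) -> 128 bytes
// tracker.record_checkpoint(1, 512);        // records FileKey::Checkpoint(1) -> 512 bytes
// assert_eq!(tracker.total(), 640);
// tracker.remove(&FileKey::Delta(1));
// assert_eq!(tracker.total(), 512);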

View File

@@ -0,0 +1,109 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_telemetry::debug;
use object_store::{Lister, ObjectStore, util};
use snafu::ResultExt;
use store_api::ManifestVersion;
use crate::error::{OpenDalSnafu, Result};
use crate::manifest::storage::delta::DeltaStorage;
use crate::manifest::storage::size_tracker::NoopTracker;
use crate::manifest::storage::utils::sort_manifests;
use crate::manifest::storage::{file_version, is_delta_file};
#[derive(Debug, Clone)]
pub(crate) struct StagingStorage {
delta_storage: DeltaStorage<NoopTracker>,
}
impl StagingStorage {
pub fn new(path: String, object_store: ObjectStore, compress_type: CompressionType) -> Self {
let staging_path = {
// Convert "region_dir/manifest/" to "region_dir/staging/manifest/"
let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
util::normalize_dir(&format!("{}/staging/manifest", parent_dir))
};
let delta_storage = DeltaStorage::new(
staging_path.clone(),
object_store.clone(),
compress_type,
// StagingStorage does not use a manifest cache; set to None.
None,
// StagingStorage does not track file sizes, since all staging files are
// deleted after exiting staging mode.
Arc::new(NoopTracker),
);
Self { delta_storage }
}
/// Returns an iterator of manifests from the staging directory.
pub(crate) async fn manifest_lister(&self) -> Result<Option<Lister>> {
self.delta_storage.manifest_lister().await
}
/// Fetch all staging manifest files and return them as (version, action_list) pairs.
pub(crate) async fn fetch_manifests(&self) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
let manifest_entries = self
.delta_storage
.get_paths(|entry| {
let file_name = entry.name();
if is_delta_file(file_name) {
let version = file_version(file_name);
Some((version, entry))
} else {
None
}
})
.await?;
let mut sorted_entries = manifest_entries;
sort_manifests(&mut sorted_entries);
self.delta_storage
.fetch_manifests_from_entries(sorted_entries)
.await
}
/// Save the delta manifest file.
pub(crate) async fn save(&mut self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
self.delta_storage.save(version, bytes).await
}
/// Clears all staging manifest files.
pub(crate) async fn clear(&self) -> Result<()> {
self.delta_storage
.object_store()
.remove_all(self.delta_storage.path())
.await
.context(OpenDalSnafu)?;
debug!(
"Cleared all staging manifest files from {}",
self.delta_storage.path()
);
Ok(())
}
}
#[cfg(test)]
impl StagingStorage {
pub fn set_compress_type(&mut self, compress_type: CompressionType) {
self.delta_storage.set_compress_type(compress_type);
}
}
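// Editorial sketch: the path rewrite performed in `StagingStorage::new`
// (hypothetical region directory):
//
// let path = "data/region_42/manifest/";
// let parent_dir = path.trim_end_matches("manifest/").trim_end_matches('/');
// assert_eq!(parent_dir, "data/region_42");
// // normalize_dir appends the trailing separator:
// assert_eq!(
//     object_store::util::normalize_dir(&format!("{}/staging/manifest", parent_dir)),
//     "data/region_42/staging/manifest/"
// );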

View File

@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use object_store::{Entry, ObjectStore};
use snafu::ResultExt;
use store_api::ManifestVersion;
use crate::cache::manifest_cache::ManifestCache;
use crate::error::{OpenDalSnafu, Result};
/// Gets a manifest file from cache.
/// Returns the file data if found in cache, None otherwise.
pub(crate) async fn get_from_cache(cache: Option<&ManifestCache>, key: &str) -> Option<Vec<u8>> {
let cache = cache?;
cache.get_file(key).await
}
/// Puts a manifest file into cache.
pub(crate) async fn put_to_cache(cache: Option<&ManifestCache>, key: String, data: &[u8]) {
let Some(cache) = cache else {
return;
};
cache.put_file(key, data.to_vec()).await
}
/// Removes a manifest file from cache.
pub(crate) async fn remove_from_cache(cache: Option<&ManifestCache>, key: &str) {
let Some(cache) = cache else {
return;
};
cache.remove(key).await
}
/// Writes data to object store and puts it into cache.
pub(crate) async fn write_and_put_cache(
object_store: &ObjectStore,
cache: Option<&ManifestCache>,
path: &str,
data: Vec<u8>,
) -> Result<()> {
// Clone data for cache before writing, only if cache is enabled.
let cache_data = if cache.is_some() {
Some(data.clone())
} else {
None
};
// Write to object store
object_store.write(path, data).await.context(OpenDalSnafu)?;
// Put to cache if we cloned the data
if let Some(data) = cache_data {
put_to_cache(cache, path.to_string(), &data).await;
}
Ok(())
}
/// Sorts the manifest files.
pub(crate) fn sort_manifests(entries: &mut [(ManifestVersion, Entry)]) {
entries.sort_unstable_by_key(|(version, _)| *version);
}
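// Editorial sketch: `write_and_put_cache` is "write to the object store, then mirror into
// the cache when one is configured" (the file name below is hypothetical):
//
// write_and_put_cache(&store, cache.as_ref(), "manifest/00000000000000000001.json", data).await?;
// // Later reads can then be served by `get_from_cache` without touching the object store.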

View File

@@ -25,7 +25,7 @@ use crate::manifest::action::{
RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::storage::CheckpointMetadata;
use crate::manifest::storage::checkpoint::CheckpointMetadata;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::sst::file::FileMeta;
use crate::test_util::TestEnv;
@@ -117,7 +117,8 @@ async fn manager_without_checkpoint() {
expected.sort_unstable();
let mut paths = manager
.store()
.get_paths(|e| Some(e.name().to_string()), false)
.delta_storage()
.get_paths(|e| Some(e.name().to_string()))
.await
.unwrap();
paths.sort_unstable();
@@ -159,7 +160,8 @@ async fn manager_with_checkpoint_distance_1() {
expected.sort_unstable();
let mut paths = manager
.store()
.get_paths(|e| Some(e.name().to_string()), false)
.delta_storage()
.get_paths(|e| Some(e.name().to_string()))
.await
.unwrap();
paths.sort_unstable();
@@ -168,7 +170,7 @@ async fn manager_with_checkpoint_distance_1() {
// check content in `_last_checkpoint`
let raw_bytes = manager
.store()
.read_file(&manager.store().last_checkpoint_path())
.read_file(&manager.store().checkpoint_storage().last_checkpoint_path())
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
@@ -213,7 +215,7 @@ async fn test_corrupted_data_causing_checksum_error() {
// Corrupt the last checkpoint data
let mut corrupted_bytes = manager
.store()
.read_file(&manager.store().last_checkpoint_path())
.read_file(&manager.store().checkpoint_storage().last_checkpoint_path())
.await
.unwrap();
corrupted_bytes[0] ^= 1;
@@ -221,6 +223,7 @@ async fn test_corrupted_data_causing_checksum_error() {
// Overwrite the latest checkpoint data
manager
.store()
.checkpoint_storage()
.write_last_checkpoint(9, &corrupted_bytes)
.await
.unwrap();
@@ -410,7 +413,8 @@ async fn manifest_install_manifest_to_with_checkpoint() {
expected.sort_unstable();
let mut paths = manager
.store()
.get_paths(|e| Some(e.name().to_string()), false)
.delta_storage()
.get_paths(|e| Some(e.name().to_string()))
.await
.unwrap();