refactor(SST): UUID as id in FileMeta (#1116)

* feat(SST): use a newtype named FileId for FileMeta

* chore: rename some functions

* fix: keep compatibility with the previous FileMeta format

* fix: add an alias for file_id when deserializing
Author: Bohan Wu
Date: 2023-03-08 14:27:20 +08:00 (committed by GitHub)
Parent: b31a6cb506
Commit: ba3ce436df
12 changed files with 306 additions and 186 deletions
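The core of the refactor, condensed from the diffs below into a minimal, self-contained sketch: a `FileId` newtype around `Uuid` replaces the `file_name: String` field of `FileMeta`, the on-disk name is derived via `as_parquet()`, and a custom deserializer plus a `file_name` serde alias keeps previously written manifests readable. Note the simplifications in this sketch: `region_id: u64` and `level: u8` stand in for the real `RegionId` and `Level` types, `time_range` is omitted, and a plain `uuid::Error` replaces the snafu-based `ParseIdError` from the actual change.

```rust
use std::str::FromStr;

use serde::{Deserialize, Deserializer, Serialize};
use uuid::Uuid;

/// Unique id of an SST file; the on-disk name is derived from it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);

impl FileId {
    /// Returns a new random id (the old code generated `{uuid}.parquet` names directly).
    pub fn random() -> FileId {
        FileId(Uuid::new_v4())
    }

    /// Appends `.parquet` to the id to build the complete file name.
    pub fn as_parquet(&self) -> String {
        format!("{}.parquet", self.0.hyphenated())
    }
}

impl FromStr for FileId {
    type Err = uuid::Error; // the real diff wraps this in a snafu `ParseIdError`

    fn from_str(s: &str) -> Result<FileId, Self::Err> {
        Uuid::parse_str(s).map(FileId)
    }
}

/// Accepts both the new `file_id` value and the legacy `file_name`
/// (with or without the `.parquet` suffix) when deserializing.
fn deserialize_from_string<'de, D>(deserializer: D) -> Result<FileId, D::Error>
where
    D: Deserializer<'de>,
{
    let s: &str = Deserialize::deserialize(deserializer)?;
    let stripped = s.strip_suffix(".parquet").unwrap_or(s);
    FileId::from_str(stripped).map_err(serde::de::Error::custom)
}

/// Simplified `FileMeta`: `region_id`/`level` stand in for the real
/// `RegionId`/`Level` aliases, and `time_range` is omitted.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    pub region_id: u64,
    #[serde(deserialize_with = "deserialize_from_string")]
    #[serde(alias = "file_name")]
    pub file_id: FileId,
    pub level: u8,
    pub file_size: u64,
}

fn main() {
    // Old manifests stored `file_name` with the `.parquet` suffix; both forms parse.
    let legacy = r#"{"region_id":0,"file_name":"bc5896ec-e4d8-4017-a80d-f2de73188d55.parquet","level":0,"file_size":1024}"#;
    let meta: FileMeta = serde_json::from_str(legacy).unwrap();
    assert_eq!(
        "bc5896ec-e4d8-4017-a80d-f2de73188d55.parquet",
        meta.file_id.as_parquet()
    );
}
```

File names were already random UUIDs before this change (see the removed `generate_sst_file_name`), so switching the identifier to a typed `FileId` mostly centralizes the `.parquet` suffix handling and lets file handles, purge requests, and level metadata key on a `Copy` id instead of cloning strings.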

View File

@@ -187,10 +187,7 @@ impl ChunkReaderBuilder {
);
continue;
}
let reader = self
.sst_layer
.read_sst(file.file_name(), &read_opts)
.await?;
let reader = self.sst_layer.read_sst(file.file_id(), &read_opts).await?;
reader_builder = reader_builder.push_batch_reader(reader);
}

View File

@@ -169,7 +169,7 @@ mod tests {
use super::*;
use crate::file_purger::noop::new_noop_file_purger;
use crate::sst::FileMeta;
use crate::sst::{FileId, FileMeta};
#[test]
fn test_time_bucket_span() {
@@ -221,19 +221,19 @@ mod tests {
assert_eq!(
TIME_BUCKETS[0],
infer_time_bucket(&[
new_file_handle("a", 0, TIME_BUCKETS[0] * 1000 - 1),
new_file_handle("b", 1, 10_000)
new_file_handle(FileId::random(), 0, TIME_BUCKETS[0] * 1000 - 1),
new_file_handle(FileId::random(), 1, 10_000)
])
);
}
fn new_file_handle(name: &str, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle {
fn new_file_handle(file_id: FileId, start_ts_millis: i64, end_ts_millis: i64) -> FileHandle {
let file_purger = new_noop_file_purger();
let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
FileHandle::new(
FileMeta {
region_id: 0,
file_name: name.to_string(),
file_id,
time_range: Some((
Timestamp::new_millisecond(start_ts_millis),
Timestamp::new_millisecond(end_ts_millis),
@@ -246,75 +246,70 @@ mod tests {
)
}
fn new_file_handles(input: &[(&str, i64, i64)]) -> Vec<FileHandle> {
fn new_file_handles(input: &[(FileId, i64, i64)]) -> Vec<FileHandle> {
input
.iter()
.map(|(name, start, end)| new_file_handle(name, *start, *end))
.map(|(file_id, start, end)| new_file_handle(*file_id, *start, *end))
.collect()
}
fn check_bucket_calculation(
bucket_sec: i64,
files: Vec<FileHandle>,
expected: &[(i64, &[&str])],
expected: &[(i64, &[FileId])],
) {
let res = calculate_time_buckets(bucket_sec, &files);
let expected = expected
.iter()
.map(|(bucket, file_names)| {
(
*bucket,
file_names
.iter()
.map(|s| s.to_string())
.collect::<HashSet<_>>(),
)
})
.map(|(bucket, file_ids)| (*bucket, file_ids.iter().copied().collect::<HashSet<_>>()))
.collect::<HashMap<_, _>>();
for (bucket, file_names) in expected {
for (bucket, file_ids) in expected {
let actual = res
.get(&bucket)
.unwrap()
.iter()
.map(|f| f.file_name().to_string())
.map(|f| f.file_id())
.collect();
assert_eq!(
file_names, actual,
"bucket: {bucket}, expected: {file_names:?}, actual: {actual:?}",
file_ids, actual,
"bucket: {bucket}, expected: {file_ids:?}, actual: {actual:?}",
);
}
}
#[test]
fn test_calculate_time_buckets() {
let file_id_a = FileId::random();
let file_id_b = FileId::random();
// simple case: files with disjoint time ranges
check_bucket_calculation(
10,
new_file_handles(&[("a", 0, 9000), ("b", 10000, 19000)]),
&[(0, &["a"]), (10, &["b"])],
new_file_handles(&[(file_id_a, 0, 9000), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_b])],
);
// files across buckets
check_bucket_calculation(
10,
new_file_handles(&[("a", 0, 10001), ("b", 10000, 19000)]),
&[(0, &["a"]), (10, &["a", "b"])],
new_file_handles(&[(file_id_a, 0, 10001), (file_id_b, 10000, 19000)]),
&[(0, &[file_id_a]), (10, &[file_id_a, file_id_b])],
);
check_bucket_calculation(
10,
new_file_handles(&[("a", 0, 10000)]),
&[(0, &["a"]), (10, &["a"])],
new_file_handles(&[(file_id_a, 0, 10000)]),
&[(0, &[file_id_a]), (10, &[file_id_a])],
);
// file with a large time range
let file_id_array = &[file_id_a];
let expected = (0..(TIME_BUCKETS[4] / TIME_BUCKETS[0]))
.map(|b| (b * TIME_BUCKETS[0], &["a"] as _))
.map(|b| (b * TIME_BUCKETS[0], file_id_array as _))
.collect::<Vec<_>>();
check_bucket_calculation(
TIME_BUCKETS[0],
new_file_handles(&[("a", 0, TIME_BUCKETS[4] * 1000)]),
new_file_handles(&[(file_id_a, 0, TIME_BUCKETS[4] * 1000)]),
&expected,
);
}

View File

@@ -18,7 +18,6 @@ use std::fmt::{Debug, Formatter};
use common_telemetry::{error, info};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::compaction::writer::build_sst_reader;
use crate::error::Result;
@@ -26,7 +25,9 @@ use crate::manifest::action::RegionEdit;
use crate::manifest::region::RegionManifest;
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::schema::RegionSchemaRef;
use crate::sst::{AccessLayerRef, FileHandle, FileMeta, Level, Source, SstInfo, WriteOptions};
use crate::sst::{
AccessLayerRef, FileHandle, FileId, FileMeta, Level, Source, SstInfo, WriteOptions,
};
use crate::wal::Wal;
#[async_trait::async_trait]
@@ -170,19 +171,20 @@ impl CompactionOutput {
self.bucket_bound + self.bucket,
)
.await?;
let output_file_name = format!("{}.parquet", Uuid::new_v4().hyphenated());
let output_file_id = FileId::random();
let opts = WriteOptions {};
let SstInfo {
time_range,
file_size,
} = sst_layer
.write_sst(&output_file_name, Source::Reader(reader), &opts)
.write_sst(output_file_id, Source::Reader(reader), &opts)
.await?;
Ok(FileMeta {
region_id,
file_name: output_file_name,
file_id: output_file_id,
time_range,
level: self.output_level,
file_size,

View File

@@ -102,9 +102,8 @@ mod tests {
DefaultMemtableBuilder, IterContext, KeyValues, Memtable, MemtableBuilder,
};
use crate::metadata::RegionMetadata;
use crate::sst;
use crate::sst::parquet::ParquetWriter;
use crate::sst::{FileMeta, FsAccessLayer, Source, SstInfo, WriteOptions};
use crate::sst::{self, FileId, FileMeta, FsAccessLayer, Source, SstInfo, WriteOptions};
use crate::test_util::descriptor_util::RegionDescBuilder;
fn schema_for_test() -> RegionSchemaRef {
@@ -165,7 +164,7 @@ mod tests {
}
async fn write_sst(
sst_file_name: &str,
sst_file_id: FileId,
schema: RegionSchemaRef,
seq: &AtomicU64,
object_store: ObjectStore,
@@ -219,7 +218,8 @@ mod tests {
}
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
let file_path = sst_file_id.as_parquet();
let writer = ParquetWriter::new(&file_path, Source::Iter(iter), object_store.clone());
let SstInfo {
time_range,
@@ -231,7 +231,7 @@ mod tests {
let handle = FileHandle::new(
FileMeta {
region_id: 0,
file_name: sst_file_name.to_string(),
file_id: sst_file_id,
time_range,
level: 0,
file_size,
@@ -282,7 +282,7 @@ mod tests {
let seq = AtomicU64::new(0);
let schema = schema_for_test();
let file1 = write_sst(
"a.parquet",
FileId::random(),
schema.clone(),
&seq,
object_store.clone(),
@@ -297,7 +297,7 @@ mod tests {
)
.await;
let file2 = write_sst(
"b.parquet",
FileId::random(),
schema.clone(),
&seq,
object_store.clone(),
@@ -358,8 +358,12 @@ mod tests {
let schema = schema_for_test();
let seq = AtomicU64::new(0);
let input_file_ids = [FileId::random(), FileId::random()];
let output_file_ids = [FileId::random(), FileId::random(), FileId::random()];
let file1 = write_sst(
"i1.parquet",
input_file_ids[0],
schema.clone(),
&seq,
object_store.clone(),
@@ -376,7 +380,7 @@ mod tests {
// in file2 we delete the row with timestamp 1000.
let file2 = write_sst(
"i2.parquet",
input_file_ids[1],
schema.clone(),
&seq,
object_store.clone(),
@@ -405,7 +409,7 @@ mod tests {
let opts = WriteOptions {};
let s1 = ParquetWriter::new(
"./o1.parquet",
&output_file_ids[0].as_parquet(),
Source::Reader(reader1),
object_store.clone(),
)
@@ -421,7 +425,7 @@ mod tests {
);
let s2 = ParquetWriter::new(
"./o2.parquet",
&output_file_ids[1].as_parquet(),
Source::Reader(reader2),
object_store.clone(),
)
@@ -437,7 +441,7 @@ mod tests {
);
let s3 = ParquetWriter::new(
"./o3.parquet",
&output_file_ids[2].as_parquet(),
Source::Reader(reader3),
object_store.clone(),
)
@@ -453,13 +457,13 @@ mod tests {
s3.time_range
);
let output_files = ["o1.parquet", "o2.parquet", "o3.parquet"]
let output_files = output_file_ids
.into_iter()
.map(|f| {
FileHandle::new(
FileMeta {
region_id: 0,
file_name: f.to_string(),
file_id: f,
level: 1,
time_range: None,
file_size: 0,

View File

@@ -20,11 +20,11 @@ use tokio::sync::Notify;
use crate::scheduler::rate_limit::{BoxedRateLimitToken, RateLimitToken};
use crate::scheduler::{Handler, LocalScheduler, Request};
use crate::sst::AccessLayerRef;
use crate::sst::{AccessLayerRef, FileId};
pub struct FilePurgeRequest {
pub region_id: RegionId,
pub file_name: String,
pub file_id: FileId,
pub sst_layer: AccessLayerRef,
}
@@ -32,7 +32,7 @@ impl Request for FilePurgeRequest {
type Key = String;
fn key(&self) -> Self::Key {
format!("{}/{}", self.region_id, self.file_name)
format!("{}/{}", self.region_id, self.file_id)
}
}
@@ -48,16 +48,15 @@ impl Handler for FilePurgeHandler {
token: BoxedRateLimitToken,
finish_notifier: Arc<Notify>,
) -> crate::error::Result<()> {
req.sst_layer
.delete_sst(&req.file_name)
.await
.map_err(|e| {
error!(e; "Failed to delete SST file, file: {}, region: {}", req.file_name, req.region_id);
e
})?;
req.sst_layer.delete_sst(req.file_id).await.map_err(|e| {
error!(e; "Failed to delete SST file, file: {}, region: {}",
req.file_id.as_parquet(), req.region_id);
e
})?;
debug!(
"Successfully deleted SST file: {}, region: {}",
req.file_name, req.region_id
req.file_id.as_parquet(),
req.region_id
);
token.try_release();
finish_notifier.notify_one();
@@ -126,7 +125,7 @@ mod tests {
async fn create_sst_file(
os: ObjectStore,
sst_file_name: &str,
sst_file_id: FileId,
file_purger: FilePurgerRef,
) -> (FileHandle, String, AccessLayerRef) {
let schema = schema_for_test();
@@ -144,7 +143,7 @@ mod tests {
let sst_path = "table1";
let layer = Arc::new(FsAccessLayer::new(sst_path, os.clone()));
let sst_info = layer
.write_sst(sst_file_name, Source::Iter(iter), &WriteOptions {})
.write_sst(sst_file_id, Source::Iter(iter), &WriteOptions {})
.await
.unwrap();
@@ -152,7 +151,7 @@ mod tests {
FileHandle::new(
FileMeta {
region_id: 0,
file_name: sst_file_name.to_string(),
file_id: sst_file_id,
time_range: None,
level: 0,
file_size: sst_info.file_size,
@@ -175,17 +174,18 @@ mod tests {
.unwrap(),
)
.finish();
let sst_file_name = "test-file-purge-handler.parquet";
let sst_file_id = FileId::random();
let noop_file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
NoopFilePurgeHandler,
));
let (file, path, layer) =
create_sst_file(object_store.clone(), sst_file_name, noop_file_purger).await;
let (_file, path, layer) =
create_sst_file(object_store.clone(), sst_file_id, noop_file_purger).await;
let request = FilePurgeRequest {
region_id: 0,
file_name: file.file_name().to_string(),
file_id: sst_file_id,
sst_layer: layer,
};
@@ -198,7 +198,7 @@ mod tests {
notify.notified().await;
let object = object_store.object(&format!("{}/{}", path, sst_file_name));
let object = object_store.object(&format!("{}/{}", path, sst_file_id.as_parquet()));
assert!(!object.is_exist().await.unwrap());
}
@@ -213,13 +213,13 @@ mod tests {
.unwrap(),
)
.finish();
let sst_file_name = "test-file-purger.parquet";
let sst_file_id = FileId::random();
let scheduler = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
FilePurgeHandler,
));
let (handle, path, _layer) =
create_sst_file(object_store.clone(), sst_file_name, scheduler.clone()).await;
create_sst_file(object_store.clone(), sst_file_id, scheduler.clone()).await;
{
// Mark the file as deleted and drop the handle; we expect the file to be deleted.
@@ -228,7 +228,7 @@ mod tests {
}
scheduler.stop(true).await.unwrap();
assert!(!object_store
.object(&format!("{}/{}", path, sst_file_name))
.object(&format!("{}/{}", path, sst_file_id.as_parquet()))
.is_exist()
.await
.unwrap());

View File

@@ -21,7 +21,6 @@ use common_telemetry::logging;
use store_api::logstore::LogStore;
use store_api::storage::consts::WRITE_ROW_GROUP_SIZE;
use store_api::storage::SequenceNumber;
use uuid::Uuid;
use crate::background::{Context, Job, JobHandle, JobPoolRef};
use crate::error::{CancelledSnafu, Result};
@@ -29,7 +28,7 @@ use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
use crate::memtable::{IterContext, MemtableId, MemtableRef};
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::sst::{AccessLayerRef, FileMeta, Source, SstInfo, WriteOptions};
use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions};
use crate::wal::Wal;
/// Default write buffer size (32M).
@@ -197,7 +196,7 @@ impl<S: LogStore> FlushJob<S> {
continue;
}
let file_name = Self::generate_sst_file_name();
let file_id = FileId::random();
// TODO(hl): Check if random file name already exists in meta.
let iter = m.iter(&iter_ctx)?;
let sst_layer = self.sst_layer.clone();
@@ -207,12 +206,12 @@ impl<S: LogStore> FlushJob<S> {
time_range,
file_size,
} = sst_layer
.write_sst(&file_name, Source::Iter(iter), &WriteOptions::default())
.write_sst(file_id, Source::Iter(iter), &WriteOptions::default())
.await?;
Ok(FileMeta {
region_id,
file_name,
file_id,
time_range,
level: 0,
file_size,
@@ -250,11 +249,6 @@ impl<S: LogStore> FlushJob<S> {
.await?;
self.wal.obsolete(self.flush_sequence).await
}
/// Generates random SST file name in format: `^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$`
fn generate_sst_file_name() -> String {
format!("{}.parquet", Uuid::new_v4().hyphenated())
}
}
#[async_trait]
@@ -273,9 +267,6 @@ impl<S: LogStore> Job for FlushJob<S> {
#[cfg(test)]
mod tests {
use log_store::NoopLogStore;
use regex::Regex;
use super::*;
#[test]
@@ -284,14 +275,4 @@ mod tests {
assert_eq!(8, get_mutable_limitation(10));
assert_eq!(56, get_mutable_limitation(64));
}
#[test]
pub fn test_uuid_generate() {
let file_name = FlushJob::<NoopLogStore>::generate_sst_file_name();
let regex = Regex::new(r"^[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}.parquet$").unwrap();
assert!(
regex.is_match(&file_name),
"Illegal sst file name: {file_name}",
);
}
}

View File

@@ -184,6 +184,7 @@ mod tests {
use super::*;
use crate::manifest::test_utils;
use crate::sst::FileId;
#[test]
fn test_encode_decode_action_list() {
@@ -194,8 +195,8 @@ mod tests {
RegionMetaAction::Protocol(protocol.clone()),
RegionMetaAction::Edit(test_utils::build_region_edit(
99,
&["test1", "test2"],
&["test3"],
&[FileId::random(), FileId::random()],
&[FileId::random()],
)),
]);
action_list.set_prev_version(3);

View File

@@ -31,6 +31,7 @@ mod tests {
use super::*;
use crate::manifest::test_utils::*;
use crate::metadata::RegionMetadata;
use crate::sst::FileId;
#[tokio::test]
async fn test_region_manifest() {
@@ -94,8 +95,12 @@ mod tests {
// Save some actions
manifest
.update(RegionMetaActionList::new(vec![
RegionMetaAction::Edit(build_region_edit(1, &["f1"], &[])),
RegionMetaAction::Edit(build_region_edit(2, &["f2", "f3"], &[])),
RegionMetaAction::Edit(build_region_edit(1, &[FileId::random()], &[])),
RegionMetaAction::Edit(build_region_edit(
2,
&[FileId::random(), FileId::random()],
&[],
)),
]))
.await
.unwrap();

View File

@@ -17,7 +17,7 @@ use store_api::storage::SequenceNumber;
use crate::manifest::action::*;
use crate::metadata::RegionMetadata;
use crate::sst::FileMeta;
use crate::sst::{FileId, FileMeta};
use crate::test_util::descriptor_util::RegionDescBuilder;
pub const DEFAULT_TEST_FILE_SIZE: u64 = 1024;
@@ -34,8 +34,8 @@ pub fn build_region_meta() -> RegionMetadata {
pub fn build_region_edit(
sequence: SequenceNumber,
files_to_add: &[&str],
files_to_remove: &[&str],
files_to_add: &[FileId],
files_to_remove: &[FileId],
) -> RegionEdit {
RegionEdit {
region_version: 0,
@@ -44,7 +44,7 @@ pub fn build_region_edit(
.iter()
.map(|f| FileMeta {
region_id: 0,
file_name: f.to_string(),
file_id: *f,
time_range: None,
level: 0,
file_size: DEFAULT_TEST_FILE_SIZE,
@@ -54,7 +54,7 @@ pub fn build_region_edit(
.iter()
.map(|f| FileMeta {
region_id: 0,
file_name: f.to_string(),
file_id: *f,
time_range: None,
level: 0,
file_size: DEFAULT_TEST_FILE_SIZE,

View File

@@ -42,7 +42,7 @@ use crate::manifest::action::{RegionChange, RegionMetaActionList};
use crate::manifest::test_utils::*;
use crate::memtable::DefaultMemtableBuilder;
use crate::scheduler::{LocalScheduler, SchedulerConfig};
use crate::sst::FsAccessLayer;
use crate::sst::{FileId, FsAccessLayer};
use crate::test_util::descriptor_util::RegionDescBuilder;
use crate::test_util::{self, config_util, schema_util, write_batch_util};
@@ -306,6 +306,10 @@ async fn test_recover_region_manifets() {
.0
.is_none());
let file_id_a = FileId::random();
let file_id_b = FileId::random();
let file_id_c = FileId::random();
{
// save some actions into region_meta
manifest
@@ -320,8 +324,8 @@ async fn test_recover_region_manifets() {
manifest
.update(RegionMetaActionList::new(vec![
RegionMetaAction::Edit(build_region_edit(1, &["f1"], &[])),
RegionMetaAction::Edit(build_region_edit(2, &["f2", "f3"], &[])),
RegionMetaAction::Edit(build_region_edit(1, &[file_id_a], &[])),
RegionMetaAction::Edit(build_region_edit(2, &[file_id_b, file_id_c], &[])),
]))
.await
.unwrap();
@@ -355,11 +359,15 @@ async fn test_recover_region_manifets() {
let ssts = version.ssts();
let files = ssts.levels()[0]
.files()
.map(|f| f.file_name().to_string())
.map(|f| f.file_name())
.collect::<HashSet<_>>();
assert_eq!(3, files.len());
assert_eq!(
HashSet::from(["f1".to_string(), "f2".to_string(), "f3".to_string()]),
HashSet::from([
file_id_a.as_parquet(),
file_id_b.as_parquet(),
file_id_c.as_parquet()
]),
files
);

View File

@@ -15,6 +15,8 @@
pub(crate) mod parquet;
use std::collections::HashMap;
use std::fmt;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -23,10 +25,11 @@ use common_telemetry::{error, info};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use object_store::{util, ObjectStore};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ResultExt, Snafu};
use store_api::storage::{ChunkReader, RegionId};
use table::predicate::Predicate;
use uuid::Uuid;
use crate::chunk::ChunkReaderImpl;
use crate::error::{DeleteSstSnafu, Result};
@@ -95,7 +98,7 @@ impl LevelMetas {
for file in files_to_remove {
let level = file.level;
if let Some(removed_file) = merged.levels[level as usize].remove_file(&file.file_name) {
if let Some(removed_file) = merged.levels[level as usize].remove_file(file.file_id) {
removed_file.mark_deleted();
}
}
@@ -114,7 +117,7 @@ pub struct LevelMeta {
/// Handles to the files in this level.
// TODO(yingwen): Now for simplicity, files are unordered, maybe sort the files by time range
// or use another structure to hold them.
files: HashMap<String, FileHandle>,
files: HashMap<FileId, FileHandle>,
}
impl LevelMeta {
@@ -126,11 +129,11 @@ impl LevelMeta {
}
fn add_file(&mut self, file: FileHandle) {
self.files.insert(file.file_name().to_string(), file);
self.files.insert(file.file_id(), file);
}
fn remove_file(&mut self, file_to_remove: &str) -> Option<FileHandle> {
self.files.remove(file_to_remove)
fn remove_file(&mut self, file_to_remove: FileId) -> Option<FileHandle> {
self.files.remove(&file_to_remove)
}
/// Returns the level of level meta.
@@ -173,7 +176,6 @@ fn new_level_meta_vec() -> LevelMetaVec {
.unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL
}
/// In-memory handle to a file.
#[derive(Debug, Clone)]
pub struct FileHandle {
inner: Arc<FileHandleInner>,
@@ -197,8 +199,13 @@ impl FileHandle {
}
#[inline]
pub fn file_name(&self) -> &str {
&self.inner.meta.file_name
pub fn file_name(&self) -> String {
self.inner.meta.file_id.as_parquet()
}
#[inline]
pub fn file_id(&self) -> FileId {
self.inner.meta.file_id
}
#[inline]
@@ -256,18 +263,21 @@ impl Drop for FileHandleInner {
if self.deleted.load(Ordering::Relaxed) {
let request = FilePurgeRequest {
sst_layer: self.sst_layer.clone(),
file_name: self.meta.file_name.clone(),
file_id: self.meta.file_id,
region_id: self.meta.region_id,
};
match self.file_purger.schedule(request) {
Ok(res) => {
info!(
"Scheduled SST purge task, region: {}, name: {}, res: {}",
self.meta.region_id, self.meta.file_name, res
self.meta.region_id,
self.meta.file_id.as_parquet(),
res
);
}
Err(e) => {
error!(e; "Failed to schedule SST purge task, region: {}, name: {}", self.meta.region_id, self.meta.file_name);
error!(e; "Failed to schedule SST purge task, region: {}, name: {}",
self.meta.region_id, self.meta.file_id.as_parquet());
}
}
}
@@ -290,14 +300,56 @@ impl FileHandleInner {
}
}
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
source: uuid::Error,
}
/// Unique id for [SST File].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);
impl FileId {
/// Returns a new unique [FileId] randomly.
pub fn random() -> FileId {
FileId(Uuid::new_v4())
}
/// Parses id from string.
pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
}
/// Appends `.parquet` to the file id to build the complete file name.
pub fn as_parquet(&self) -> String {
format!("{}{}", self.0.hyphenated(), ".parquet")
}
}
impl fmt::Display for FileId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl FromStr for FileId {
type Err = ParseIdError;
fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
FileId::parse_str(s)
}
}
/// Immutable metadata of an SST file.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
/// Region of file.
pub region_id: RegionId,
/// File name
pub file_name: String,
/// Compared to a normal file name, FileId ignores the extension
#[serde(deserialize_with = "deserialize_from_string")]
#[serde(alias = "file_name")]
pub file_id: FileId,
/// Timestamp range of file.
pub time_range: Option<(Timestamp, Timestamp)>,
/// SST level of the file.
@@ -306,6 +358,15 @@ pub struct FileMeta {
pub file_size: u64,
}
fn deserialize_from_string<'de, D>(deserializer: D) -> std::result::Result<FileId, D::Error>
where
D: Deserializer<'de>,
{
let s: &str = Deserialize::deserialize(deserializer)?;
let stripped = s.strip_suffix(".parquet").unwrap_or(s); // strip parquet suffix if needed.
FileId::from_str(stripped).map_err(<D::Error as serde::de::Error>::custom)
}
#[derive(Debug, Default)]
pub struct WriteOptions {
// TODO(yingwen): [flush] row group size.
@@ -334,16 +395,16 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
/// Writes SST file with given `file_name`.
async fn write_sst(
&self,
file_name: &str,
file_id: FileId,
source: Source,
opts: &WriteOptions,
) -> Result<SstInfo>;
/// Read SST file with given `file_name` and schema.
async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader>;
async fn read_sst(&self, file_id: FileId, opts: &ReadOptions) -> Result<BoxedBatchReader>;
/// Deletes a SST file with given name.
async fn delete_sst(&self, file_name: &str) -> Result<()>;
async fn delete_sst(&self, file_id: FileId) -> Result<()>;
}
pub type AccessLayerRef = Arc<dyn AccessLayer>;
@@ -400,19 +461,19 @@ impl FsAccessLayer {
impl AccessLayer for FsAccessLayer {
async fn write_sst(
&self,
file_name: &str,
file_id: FileId,
source: Source,
opts: &WriteOptions,
) -> Result<SstInfo> {
// Now we only support the parquet format. We may allow callers to specify the SST format in
// WriteOptions in the future.
let file_path = self.sst_file_path(file_name);
let file_path = self.sst_file_path(&file_id.as_parquet());
let writer = ParquetWriter::new(&file_path, source, self.object_store.clone());
writer.write_sst(opts).await
}
async fn read_sst(&self, file_name: &str, opts: &ReadOptions) -> Result<BoxedBatchReader> {
let file_path = self.sst_file_path(file_name);
async fn read_sst(&self, file_id: FileId, opts: &ReadOptions) -> Result<BoxedBatchReader> {
let file_path = self.sst_file_path(&file_id.as_parquet());
let reader = ParquetReader::new(
&file_path,
self.object_store.clone(),
@@ -425,8 +486,8 @@ impl AccessLayer for FsAccessLayer {
Ok(Box::new(stream))
}
async fn delete_sst(&self, file_name: &str) -> Result<()> {
let path = self.sst_file_path(file_name);
async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
let object = self.object_store.object(&path);
object.delete().await.context(DeleteSstSnafu)
}
@@ -440,10 +501,81 @@ mod tests {
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::scheduler::{LocalScheduler, SchedulerConfig};
fn create_file_meta(name: &str, level: Level) -> FileMeta {
#[test]
fn test_file_id() {
let id = FileId::random();
let uuid_str = id.to_string();
assert_eq!(id.0.to_string(), uuid_str);
let parsed = FileId::parse_str(&uuid_str).unwrap();
assert_eq!(id, parsed);
let parsed = uuid_str.parse().unwrap();
assert_eq!(id, parsed);
}
#[test]
fn test_file_id_serialization() {
let id = FileId::random();
let json = serde_json::to_string(&id).unwrap();
assert_eq!(format!("\"{id}\""), json);
let parsed = serde_json::from_str(&json).unwrap();
assert_eq!(id, parsed);
}
#[test]
fn test_deserialize_file_meta() {
let file_meta = create_file_meta(FileId::random(), 0);
let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
assert_eq!(file_meta, deserialized_file_meta.unwrap());
}
#[test]
fn test_deserialize_from_string() {
let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\"time_range\":null,\"level\":0}";
let file_meta = create_file_meta(
FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
0,
);
let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
assert_eq!(file_meta, deserialized_file_meta);
}
#[test]
fn test_deserialize_from_string_parquet() {
let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55.parquet\",\"time_range\":null,\"level\":0}";
let file_meta = create_file_meta(
FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
0,
);
let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
assert_eq!(file_meta, deserialized_file_meta);
}
#[test]
fn test_deserialize_from_string_parquet_file_name() {
let json_file_meta = "{\"region_id\":0,\"file_name\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55.parquet\",\"time_range\":null,\"level\":0}";
let file_meta = create_file_meta(
FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
0,
);
let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
assert_eq!(file_meta, deserialized_file_meta);
}
#[test]
fn test_file_id_as_parquet() {
let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
assert_eq!(
"67e55044-10b1-426f-9247-bb680e5fe0c8.parquet",
id.as_parquet()
);
}
fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
FileMeta {
region_id: 0,
file_name: name.to_string(),
file_id,
time_range: None,
level,
file_size: 0,
@@ -457,85 +589,80 @@ mod tests {
SchedulerConfig::default(),
NoopFilePurgeHandler,
));
let file_ids = [
FileId::random(),
FileId::random(),
FileId::random(),
FileId::random(),
];
let metas = LevelMetas::new(layer, purger);
let merged = metas.merge(
vec![create_file_meta("a", 0), create_file_meta("b", 0)].into_iter(),
vec![
create_file_meta(file_ids[0], 0),
create_file_meta(file_ids[1], 0),
]
.into_iter(),
vec![].into_iter(),
);
assert_eq!(
HashSet::from(["a".to_string(), "b".to_string()]),
merged
.level(0)
.files()
.map(|f| f.file_name().to_string())
.collect()
HashSet::from([file_ids[0], file_ids[1]]),
merged.level(0).files().map(|f| f.file_id()).collect()
);
let merged1 = merged.merge(
vec![create_file_meta("c", 1), create_file_meta("d", 1)].into_iter(),
vec![
create_file_meta(file_ids[2], 1),
create_file_meta(file_ids[3], 1),
]
.into_iter(),
vec![].into_iter(),
);
assert_eq!(
HashSet::from(["a".to_string(), "b".to_string()]),
merged1
.level(0)
.files()
.map(|f| f.file_name().to_string())
.collect()
HashSet::from([file_ids[0], file_ids[1]]),
merged1.level(0).files().map(|f| f.file_id()).collect()
);
assert_eq!(
HashSet::from(["c".to_string(), "d".to_string()]),
merged1
.level(1)
.files()
.map(|f| f.file_name().to_string())
.collect()
HashSet::from([file_ids[2], file_ids[3]]),
merged1.level(1).files().map(|f| f.file_id()).collect()
);
let removed1 = merged1.merge(
vec![].into_iter(),
vec![create_file_meta("a", 0), create_file_meta("c", 0)].into_iter(),
vec![
create_file_meta(file_ids[0], 0),
create_file_meta(file_ids[2], 0),
]
.into_iter(),
);
assert_eq!(
HashSet::from(["b".to_string()]),
removed1
.level(0)
.files()
.map(|f| f.file_name().to_string())
.collect()
HashSet::from([file_ids[1]]),
removed1.level(0).files().map(|f| f.file_id()).collect()
);
assert_eq!(
HashSet::from(["c".to_string(), "d".to_string()]),
removed1
.level(1)
.files()
.map(|f| f.file_name().to_string())
.collect()
HashSet::from([file_ids[2], file_ids[3]]),
removed1.level(1).files().map(|f| f.file_id()).collect()
);
let removed2 = removed1.merge(
vec![].into_iter(),
vec![create_file_meta("c", 1), create_file_meta("d", 1)].into_iter(),
vec![
create_file_meta(file_ids[2], 1),
create_file_meta(file_ids[3], 1),
]
.into_iter(),
);
assert_eq!(
HashSet::from(["b".to_string()]),
removed2
.level(0)
.files()
.map(|f| f.file_name().to_string())
.collect()
HashSet::from([file_ids[1]]),
removed2.level(0).files().map(|f| f.file_id()).collect()
);
assert_eq!(
HashSet::new(),
removed2
.level(1)
.files()
.map(|f| f.file_name().to_string())
.collect()
removed2.level(1).files().map(|f| f.file_id()).collect()
);
}
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use crate::read::BoxedBatchReader;
use crate::sst::{AccessLayer, ReadOptions, Source, SstInfo, WriteOptions};
use crate::sst::{AccessLayer, FileId, ReadOptions, Source, SstInfo, WriteOptions};
#[derive(Debug)]
pub struct MockAccessLayer;
@@ -22,7 +22,7 @@ pub struct MockAccessLayer;
impl AccessLayer for MockAccessLayer {
async fn write_sst(
&self,
_file_name: &str,
_file_id: FileId,
_source: Source,
_opts: &WriteOptions,
) -> crate::error::Result<SstInfo> {
@@ -31,13 +31,13 @@ impl AccessLayer for MockAccessLayer {
async fn read_sst(
&self,
_file_name: &str,
_file_id: FileId,
_opts: &ReadOptions,
) -> crate::error::Result<BoxedBatchReader> {
unimplemented!()
}
async fn delete_sst(&self, _file_name: &str) -> crate::error::Result<()> {
async fn delete_sst(&self, _file_id: FileId) -> crate::error::Result<()> {
Ok(())
}
}