diff --git a/src/storage/src/chunk.rs b/src/storage/src/chunk.rs
index 7cfc32dd8d..80416a8106 100644
--- a/src/storage/src/chunk.rs
+++ b/src/storage/src/chunk.rs
@@ -187,7 +187,7 @@ impl ChunkReaderBuilder {
                 );
                 continue;
             }
-            let reader = self.sst_layer.read_sst(file.file_id(), &read_opts).await?;
+            let reader = self.sst_layer.read_sst(file.clone(), &read_opts).await?;
             reader_builder = reader_builder.push_batch_reader(reader);
         }
 
diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs
index a2769af3e2..511f4dfdfe 100644
--- a/src/storage/src/sst.rs
+++ b/src/storage/src/sst.rs
@@ -203,6 +203,13 @@ impl FileHandle {
         self.inner.meta.file_id.as_parquet()
     }
 
+    #[inline]
+    pub fn file_path(&self) -> String {
+        self.inner
+            .sst_layer
+            .sst_file_path(&self.inner.meta.file_id.as_parquet())
+    }
+
     #[inline]
     pub fn file_id(&self) -> FileId {
         self.inner.meta.file_id
@@ -392,7 +399,10 @@ pub struct SstInfo {
 /// SST access layer.
 #[async_trait]
 pub trait AccessLayer: Send + Sync + std::fmt::Debug {
-    /// Writes SST file with given `file_name`.
+    /// Returns the sst file path.
+    fn sst_file_path(&self, file_name: &str) -> String;
+
+    /// Writes SST file with given `file_id`.
     async fn write_sst(
         &self,
         file_id: FileId,
@@ -400,8 +410,12 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
         opts: &WriteOptions,
     ) -> Result<SstInfo>;
 
-    /// Read SST file with given `file_name` and schema.
-    async fn read_sst(&self, file_id: FileId, opts: &ReadOptions) -> Result<BoxedBatchReader>;
+    /// Reads SST file with given `file_handle` and schema.
+    async fn read_sst(
+        &self,
+        file_handle: FileHandle,
+        opts: &ReadOptions,
+    ) -> Result<BoxedBatchReader>;
 
     /// Deletes a SST file with given name.
     async fn delete_sst(&self, file_id: FileId) -> Result<()>;
@@ -436,7 +450,7 @@ impl Source {
     }
 }
 
-/// Sst access layer based on local file system.
+/// Sst access layer.
 #[derive(Debug)]
 pub struct FsAccessLayer {
     sst_dir: String,
@@ -450,15 +464,15 @@ impl FsAccessLayer {
             object_store,
         }
     }
-
-    #[inline]
-    fn sst_file_path(&self, file_name: &str) -> String {
-        format!("{}{}", self.sst_dir, file_name)
-    }
 }
 
 #[async_trait]
 impl AccessLayer for FsAccessLayer {
+    fn sst_file_path(&self, file_name: &str) -> String {
+        format!("{}{}", self.sst_dir, file_name)
+    }
+
+    /// Writes SST file with given `file_id`.
     async fn write_sst(
         &self,
         file_id: FileId,
@@ -472,10 +486,14 @@ impl AccessLayer for FsAccessLayer {
         writer.write_sst(opts).await
     }
 
-    async fn read_sst(&self, file_id: FileId, opts: &ReadOptions) -> Result<BoxedBatchReader> {
-        let file_path = self.sst_file_path(&file_id.as_parquet());
+    /// Reads SST file with given `file_handle` and schema.
+    async fn read_sst(
+        &self,
+        file_handle: FileHandle,
+        opts: &ReadOptions,
+    ) -> Result<BoxedBatchReader> {
         let reader = ParquetReader::new(
-            &file_path,
+            file_handle,
             self.object_store.clone(),
             opts.projected_schema.clone(),
             opts.predicate.clone(),
@@ -486,6 +504,7 @@ impl AccessLayer for FsAccessLayer {
         Ok(Box::new(stream))
     }
 
+    /// Deletes a SST file with given file id.
     async fn delete_sst(&self, file_id: FileId) -> Result<()> {
         let path = self.sst_file_path(&file_id.as_parquet());
         let object = self.object_store.object(&path);
diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index fe08d59920..d0553e4148 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -56,7 +56,7 @@ use crate::read::{Batch, BatchReader};
 use crate::schema::compat::ReadAdapter;
 use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef};
 use crate::sst;
-use crate::sst::{Source, SstInfo};
+use crate::sst::{FileHandle, Source, SstInfo};
 
 /// Parquet sst writer.
 pub struct ParquetWriter<'a> {
     file_path: &'a str,
@@ -216,24 +216,25 @@ fn decode_timestamp_range_inner(
     )))
 }
 
-pub struct ParquetReader<'a> {
-    file_path: &'a str,
+pub struct ParquetReader {
+    // Holds the file handle to keep the file purger from purging the file while it is being read.
+    file_handle: FileHandle,
     object_store: ObjectStore,
     projected_schema: ProjectedSchemaRef,
     predicate: Predicate,
     time_range: TimestampRange,
 }
 
-impl<'a> ParquetReader<'a> {
+impl ParquetReader {
     pub fn new(
-        file_path: &str,
+        file_handle: FileHandle,
         object_store: ObjectStore,
         projected_schema: ProjectedSchemaRef,
         predicate: Predicate,
         time_range: TimestampRange,
     ) -> ParquetReader {
         ParquetReader {
-            file_path,
+            file_handle,
             object_store,
             projected_schema,
             predicate,
@@ -242,28 +243,24 @@
     }
 
     pub async fn chunk_stream(&self) -> Result<ChunkStream> {
+        let file_path = self.file_handle.file_path();
         let operator = self.object_store.clone();
         let reader = operator
-            .object(self.file_path)
+            .object(&file_path)
             .reader()
             .await
-            .context(ReadObjectSnafu {
-                path: self.file_path,
-            })?
+            .context(ReadObjectSnafu { path: &file_path })?
             .compat();
         let buf_reader = BufReader::new(reader);
         let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
             .await
-            .context(ReadParquetSnafu {
-                file: self.file_path,
-            })?;
+            .context(ReadParquetSnafu { file: &file_path })?;
         let arrow_schema = builder.schema().clone();
-        let store_schema = Arc::new(StoreSchema::try_from(arrow_schema).context(
-            error::ConvertStoreSchemaSnafu {
-                file: self.file_path,
-            },
-        )?);
+        let store_schema = Arc::new(
+            StoreSchema::try_from(arrow_schema)
+                .context(error::ConvertStoreSchemaSnafu { file: &file_path })?,
+        );
 
         let adapter = ReadAdapter::new(store_schema.clone(), self.projected_schema.clone())?;
@@ -290,14 +287,13 @@
             builder = builder.with_row_filter(row_filter);
         }
 
-        let mut stream = builder.build().context(ReadParquetSnafu {
-            file: self.file_path,
-        })?;
+        let mut stream = builder
+            .build()
+            .context(ReadParquetSnafu { file: &file_path })?;
 
-        let file_name = self.file_path.to_string();
         let chunk_stream = try_stream!({
             while let Some(res) = stream.next().await {
-                yield res.context(ReadParquetSnafu { file: &file_name })?
+                yield res.context(ReadParquetSnafu { file: &file_path })?
             }
         });
 
@@ -539,10 +535,12 @@ mod tests {
     use store_api::storage::OpType;
 
     use super::*;
+    use crate::file_purger::noop::new_noop_file_purger;
     use crate::memtable::{
         tests as memtable_tests, DefaultMemtableBuilder, IterContext, MemtableBuilder,
    };
     use crate::schema::ProjectedSchema;
+    use crate::sst::{FileId, FileMeta};
 
     #[tokio::test]
     async fn test_parquet_writer() {
@@ -673,9 +671,10 @@
         let path = dir.path().to_str().unwrap();
         let backend = Fs::default().root(path).build().unwrap();
         let object_store = ObjectStore::new(backend).finish();
-        let sst_file_name = "test-read-large.parquet";
+        let sst_file_handle = new_file_handle(FileId::random());
+        let sst_file_name = sst_file_handle.file_name();
         let iter = memtable.iter(&IterContext::default()).unwrap();
-        let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
+        let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
 
         let SstInfo {
             time_range,
@@ -703,7 +702,7 @@
         let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
 
         let reader = ParquetReader::new(
-            "test-read-large.parquet",
+            sst_file_handle,
             operator,
             projected_schema,
             Predicate::empty(),
@@ -718,6 +717,25 @@
         assert_eq!(rows_total, rows_fetched);
     }
 
+    fn new_file_handle(file_id: FileId) -> FileHandle {
+        let file_purger = new_noop_file_purger();
+        let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
+        FileHandle::new(
+            FileMeta {
+                region_id: 0,
+                file_id,
+                time_range: Some((
+                    Timestamp::new_millisecond(0),
+                    Timestamp::new_millisecond(1000),
+                )),
+                level: 0,
+                file_size: 0,
+            },
+            layer,
+            file_purger,
+        )
+    }
+
     #[tokio::test]
     async fn test_parquet_reader() {
         common_telemetry::init_default_ut_logging();
@@ -750,9 +768,10 @@
         let path = dir.path().to_str().unwrap();
         let backend = Fs::default().root(path).build().unwrap();
         let object_store = ObjectStore::new(backend).finish();
-        let sst_file_name = "test-read.parquet";
+        let file_handle = new_file_handle(FileId::random());
+        let sst_file_name = file_handle.file_name();
         let iter = memtable.iter(&IterContext::default()).unwrap();
-        let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
+        let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
 
         let SstInfo {
             time_range,
@@ -780,7 +799,7 @@
         let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
 
         let reader = ParquetReader::new(
-            "test-read.parquet",
+            file_handle,
             operator,
             projected_schema,
             Predicate::empty(),
@@ -801,17 +820,18 @@
     }
 
     async fn check_range_read(
-        file_name: &str,
+        file_handle: FileHandle,
         object_store: ObjectStore,
         schema: ProjectedSchemaRef,
         range: TimestampRange,
         expect: Vec<i64>,
     ) {
-        let reader = ParquetReader::new(file_name, object_store, schema, Predicate::empty(), range);
+        let reader =
+            ParquetReader::new(file_handle, object_store, schema, Predicate::empty(), range);
         let mut stream = reader.chunk_stream().await.unwrap();
         let result = stream.next_batch().await;
 
-        let Some(batch) = result.unwrap() else {
+        let Some(batch) = result.unwrap() else { // if the batch does not contain any rows
             assert!(expect.is_empty());
             return;
         };
@@ -867,9 +887,10 @@
         let path = dir.path().to_str().unwrap();
         let backend = Fs::default().root(path).build().unwrap();
         let object_store = ObjectStore::new(backend).finish();
-        let sst_file_name = "test-read.parquet";
+        let sst_file_handle = new_file_handle(FileId::random());
+        let sst_file_name = sst_file_handle.file_name();
         let iter = memtable.iter(&IterContext::default()).unwrap();
-        let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
+        let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
 
         let SstInfo {
             time_range,
@@ -892,7 +913,7 @@
             Arc::new(ProjectedSchema::new(schema, Some(vec![1, 0, 3, 2])).unwrap());
 
         check_range_read(
-            sst_file_name,
+            sst_file_handle.clone(),
             object_store.clone(),
             projected_schema.clone(),
             TimestampRange::with_unit(1000, 2003, TimeUnit::Millisecond).unwrap(),
@@ -901,7 +922,7 @@
         .await;
 
         check_range_read(
-            sst_file_name,
+            sst_file_handle.clone(),
             object_store.clone(),
             projected_schema.clone(),
             TimestampRange::with_unit(2002, 3001, TimeUnit::Millisecond).unwrap(),
@@ -911,7 +932,7 @@
 
         // read a range without any rows.
         check_range_read(
-            sst_file_name,
+            sst_file_handle.clone(),
             object_store.clone(),
             projected_schema.clone(),
             TimestampRange::with_unit(3002, 3003, TimeUnit::Millisecond).unwrap(),
@@ -921,7 +942,7 @@
 
         //
         check_range_read(
-            sst_file_name,
+            sst_file_handle.clone(),
             object_store.clone(),
             projected_schema.clone(),
             TimestampRange::with_unit(1000, 3000, TimeUnit::Millisecond).unwrap(),
@@ -931,7 +952,7 @@
 
         // read full range
         check_range_read(
-            sst_file_name,
+            sst_file_handle,
             object_store,
             projected_schema,
             TimestampRange::min_to_max(),
diff --git a/src/storage/src/test_util/access_layer_util.rs b/src/storage/src/test_util/access_layer_util.rs
index 1308e5665a..ad1f04c8f0 100644
--- a/src/storage/src/test_util/access_layer_util.rs
+++ b/src/storage/src/test_util/access_layer_util.rs
@@ -13,13 +13,17 @@
 // limitations under the License.
 
 use crate::read::BoxedBatchReader;
-use crate::sst::{AccessLayer, FileId, ReadOptions, Source, SstInfo, WriteOptions};
+use crate::sst::{AccessLayer, FileHandle, FileId, ReadOptions, Source, SstInfo, WriteOptions};
 
 #[derive(Debug)]
 pub struct MockAccessLayer;
 
 #[async_trait::async_trait]
 impl AccessLayer for MockAccessLayer {
+    fn sst_file_path(&self, file_name: &str) -> String {
+        file_name.to_string()
+    }
+
     async fn write_sst(
         &self,
         _file_id: FileId,
@@ -31,7 +35,7 @@
 
     async fn read_sst(
         &self,
-        _file_id: FileId,
+        _file_handle: FileHandle,
         _opts: &ReadOptions,
     ) -> crate::error::Result<BoxedBatchReader> {
         unimplemented!()
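
Note on the intent of this patch: `ParquetReader` used to borrow a `&str` file path, so nothing tied an in-flight scan to the SST file on disk, and the file purger could remove the file while a `ChunkStream` was still being polled. Storing a `FileHandle` (clonable and shared through `self.inner`) makes every open reader a co-owner of the file, so the purge can only happen once the last handle is gone. Below is a minimal sketch of that ownership pattern under stated assumptions: the names `Handle`, `HandleInner`, and `Reader` are hypothetical stand-ins for `FileHandle`/`ParquetReader`, and the inline delete in `Drop` simplifies the real purge scheduling.

    // Sketch only: illustrates handle-based lifetime management,
    // not the actual GreptimeDB types.
    use std::sync::Arc;

    #[derive(Debug)]
    struct HandleInner {
        file_name: String,
        deleted: bool, // set when compaction marks the file obsolete
    }

    impl Drop for HandleInner {
        fn drop(&mut self) {
            // Runs only when the LAST clone of the handle is dropped; the
            // real engine would schedule a purge task here, not delete inline.
            if self.deleted {
                println!("purging {}", self.file_name);
            }
        }
    }

    // Cheap to clone; each clone keeps the underlying file alive.
    #[derive(Debug, Clone)]
    struct Handle(Arc<HandleInner>);

    // Stand-in for ParquetReader: owns a handle, not a borrowed path.
    struct Reader {
        handle: Handle,
    }

    impl Reader {
        fn read(&self) {
            println!("reading {}", self.handle.0.file_name);
        }
    }

    fn main() {
        let handle = Handle(Arc::new(HandleInner {
            file_name: "0000.parquet".to_string(),
            deleted: true, // pretend compaction already dropped this file
        }));
        let reader = Reader { handle: handle.clone() };
        drop(handle); // the table's own reference goes away...
        reader.read(); // ...but the reader still co-owns the file: no purge yet
    } // reader dropped here -> last handle gone -> file is purged

This is also why `read_sst` now takes the `FileHandle` by value and why `chunk_stream` resolves the path via `self.file_handle.file_path()` at call time: the handle must live inside the reader (and, through it, the stream) rather than being reduced to a path up front.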