fix: Hold FileHandle in ParquetReader to prevent the purger from purging it (#1224)

Author: Yingwen
Date: 2023-03-23 22:24:25 +08:00
Committed by: GitHub
Parent: 4e552245b1
Commit: f1139fba59
4 changed files with 98 additions and 54 deletions
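Why this fix works: ParquetReader previously borrowed only a file path, so the file purger could delete the SST file while a read was still in flight. After this change the reader owns a FileHandle for its whole lifetime, and a file can only be purged once every handle to it is gone. Below is a minimal, self-contained sketch of that ownership pattern in Rust; the types are hypothetical simplifications (the real FileHandle and purger carry far more state):

use std::sync::Arc;

// Hypothetical stand-in for the purgeable SST file state.
struct FileHandleInner {
    path: String,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        // Runs only when the last handle is gone; this is the earliest
        // point at which a purger could safely delete the file.
        println!("purge {}", self.path);
    }
}

// Cloning is cheap: it only bumps the Arc's reference count.
#[derive(Clone)]
struct FileHandle {
    inner: Arc<FileHandleInner>,
}

// The reader owns a handle, pinning the file for its whole lifetime.
struct ParquetReaderSketch {
    _file_handle: FileHandle,
}

fn main() {
    let handle = FileHandle {
        inner: Arc::new(FileHandleInner {
            path: "example.parquet".to_string(),
        }),
    };
    let reader = ParquetReaderSketch {
        _file_handle: handle.clone(),
    };
    drop(handle); // the original handle goes away...
    drop(reader); // ...but "purge" prints only here, after the read ends
}

Running the sketch prints the purge line only after the reader is dropped, mirroring how holding the handle defers purging.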

View File

@@ -187,7 +187,7 @@ impl ChunkReaderBuilder {
);
continue;
}
- let reader = self.sst_layer.read_sst(file.file_id(), &read_opts).await?;
+ let reader = self.sst_layer.read_sst(file.clone(), &read_opts).await?;
reader_builder = reader_builder.push_batch_reader(reader);
}

View File

@@ -203,6 +203,13 @@ impl FileHandle {
self.inner.meta.file_id.as_parquet()
}
+ #[inline]
+ pub fn file_path(&self) -> String {
+ self.inner
+ .sst_layer
+ .sst_file_path(&self.inner.meta.file_id.as_parquet())
+ }
#[inline]
pub fn file_id(&self) -> FileId {
self.inner.meta.file_id
@@ -392,7 +399,10 @@ pub struct SstInfo {
/// SST access layer.
#[async_trait]
pub trait AccessLayer: Send + Sync + std::fmt::Debug {
- /// Writes SST file with given `file_name`.
+ /// Returns the sst file path.
+ fn sst_file_path(&self, file_name: &str) -> String;
+ /// Writes SST file with given `file_id`.
async fn write_sst(
&self,
file_id: FileId,
@@ -400,8 +410,12 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
opts: &WriteOptions,
) -> Result<SstInfo>;
- /// Read SST file with given `file_name` and schema.
- async fn read_sst(&self, file_id: FileId, opts: &ReadOptions) -> Result<BoxedBatchReader>;
+ /// Read SST file with given `file_handle` and schema.
+ async fn read_sst(
+ &self,
+ file_handle: FileHandle,
+ opts: &ReadOptions,
+ ) -> Result<BoxedBatchReader>;
/// Deletes a SST file with given name.
async fn delete_sst(&self, file_id: FileId) -> Result<()>;
@@ -436,7 +450,7 @@ impl Source {
}
}
- /// Sst access layer based on local file system.
+ /// Sst access layer.
#[derive(Debug)]
pub struct FsAccessLayer {
sst_dir: String,
@@ -450,15 +464,15 @@ impl FsAccessLayer {
object_store,
}
}
- #[inline]
- fn sst_file_path(&self, file_name: &str) -> String {
- format!("{}{}", self.sst_dir, file_name)
- }
}
#[async_trait]
impl AccessLayer for FsAccessLayer {
+ fn sst_file_path(&self, file_name: &str) -> String {
+ format!("{}{}", self.sst_dir, file_name)
+ }
+ /// Writes SST file with given `file_id`.
async fn write_sst(
&self,
file_id: FileId,
@@ -472,10 +486,14 @@ impl AccessLayer for FsAccessLayer {
writer.write_sst(opts).await
}
- async fn read_sst(&self, file_id: FileId, opts: &ReadOptions) -> Result<BoxedBatchReader> {
- let file_path = self.sst_file_path(&file_id.as_parquet());
+ /// Read SST file with given `file_handle` and schema.
+ async fn read_sst(
+ &self,
+ file_handle: FileHandle,
+ opts: &ReadOptions,
+ ) -> Result<BoxedBatchReader> {
let reader = ParquetReader::new(
- &file_path,
+ file_handle,
self.object_store.clone(),
opts.projected_schema.clone(),
opts.predicate.clone(),
@@ -486,6 +504,7 @@ impl AccessLayer for FsAccessLayer {
Ok(Box::new(stream))
}
+ /// Deletes a SST file with given file id.
async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
let object = self.object_store.object(&path);
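
Note on the API change above: AccessLayer::read_sst now takes the whole FileHandle rather than a bare FileId, and sst_file_path moves into the trait so a handle can compute its own path. A hedged call-site sketch follows, using only names that appear in this diff; the helper function itself is hypothetical and would live inside the crate:

use crate::error::Result;
use crate::read::BoxedBatchReader;
use crate::sst::{AccessLayer, FileHandle, ReadOptions};

// Hypothetical helper mirroring the ChunkReaderBuilder call site above:
// cloning the handle only bumps a reference count, and the clone moved
// into the returned reader keeps the SST file un-purgeable until that
// reader is dropped.
async fn read_one_sst(
    layer: &dyn AccessLayer,
    file: &FileHandle,
    read_opts: &ReadOptions,
) -> Result<BoxedBatchReader> {
    layer.read_sst(file.clone(), read_opts).await
}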

View File

@@ -56,7 +56,7 @@ use crate::read::{Batch, BatchReader};
use crate::schema::compat::ReadAdapter;
use crate::schema::{ProjectedSchemaRef, StoreSchema, StoreSchemaRef};
use crate::sst;
- use crate::sst::{Source, SstInfo};
+ use crate::sst::{FileHandle, Source, SstInfo};
/// Parquet sst writer.
pub struct ParquetWriter<'a> {
file_path: &'a str,
@@ -216,24 +216,25 @@ fn decode_timestamp_range_inner(
)))
}
- pub struct ParquetReader<'a> {
- file_path: &'a str,
+ pub struct ParquetReader {
+ // Holds the file handle to prevent the file purger from purging it.
+ file_handle: FileHandle,
object_store: ObjectStore,
projected_schema: ProjectedSchemaRef,
predicate: Predicate,
time_range: TimestampRange,
}
- impl<'a> ParquetReader<'a> {
+ impl ParquetReader {
pub fn new(
- file_path: &str,
+ file_handle: FileHandle,
object_store: ObjectStore,
projected_schema: ProjectedSchemaRef,
predicate: Predicate,
time_range: TimestampRange,
) -> ParquetReader {
ParquetReader {
- file_path,
+ file_handle,
object_store,
projected_schema,
predicate,
@@ -242,28 +243,24 @@ impl<'a> ParquetReader<'a> {
}
pub async fn chunk_stream(&self) -> Result<ChunkStream> {
+ let file_path = self.file_handle.file_path();
let operator = self.object_store.clone();
let reader = operator
- .object(self.file_path)
+ .object(&file_path)
.reader()
.await
- .context(ReadObjectSnafu {
- path: self.file_path,
- })?
+ .context(ReadObjectSnafu { path: &file_path })?
.compat();
let buf_reader = BufReader::new(reader);
let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
.await
- .context(ReadParquetSnafu {
- file: self.file_path,
- })?;
+ .context(ReadParquetSnafu { file: &file_path })?;
let arrow_schema = builder.schema().clone();
- let store_schema = Arc::new(StoreSchema::try_from(arrow_schema).context(
- error::ConvertStoreSchemaSnafu {
- file: self.file_path,
- },
- )?);
+ let store_schema = Arc::new(
+ StoreSchema::try_from(arrow_schema)
+ .context(error::ConvertStoreSchemaSnafu { file: &file_path })?,
+ );
let adapter = ReadAdapter::new(store_schema.clone(), self.projected_schema.clone())?;
@@ -290,14 +287,13 @@ impl<'a> ParquetReader<'a> {
builder = builder.with_row_filter(row_filter);
}
- let mut stream = builder.build().context(ReadParquetSnafu {
- file: self.file_path,
- })?;
+ let mut stream = builder
+ .build()
+ .context(ReadParquetSnafu { file: &file_path })?;
- let file_name = self.file_path.to_string();
let chunk_stream = try_stream!({
while let Some(res) = stream.next().await {
- yield res.context(ReadParquetSnafu { file: &file_name })?
+ yield res.context(ReadParquetSnafu { file: &file_path })?
}
});
@@ -539,10 +535,12 @@ mod tests {
use store_api::storage::OpType;
use super::*;
+ use crate::file_purger::noop::new_noop_file_purger;
use crate::memtable::{
tests as memtable_tests, DefaultMemtableBuilder, IterContext, MemtableBuilder,
};
use crate::schema::ProjectedSchema;
+ use crate::sst::{FileId, FileMeta};
#[tokio::test]
async fn test_parquet_writer() {
@@ -673,9 +671,10 @@ mod tests {
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let sst_file_name = "test-read-large.parquet";
let sst_file_handle = new_file_handle(FileId::random());
let sst_file_name = sst_file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
- let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
+ let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
let SstInfo {
time_range,
@@ -703,7 +702,7 @@ mod tests {
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let reader = ParquetReader::new(
"test-read-large.parquet",
sst_file_handle,
operator,
projected_schema,
Predicate::empty(),
@@ -718,6 +717,25 @@ mod tests {
assert_eq!(rows_total, rows_fetched);
}
+ fn new_file_handle(file_id: FileId) -> FileHandle {
+ let file_purger = new_noop_file_purger();
+ let layer = Arc::new(crate::test_util::access_layer_util::MockAccessLayer {});
+ FileHandle::new(
+ FileMeta {
+ region_id: 0,
+ file_id,
+ time_range: Some((
+ Timestamp::new_millisecond(0),
+ Timestamp::new_millisecond(1000),
+ )),
+ level: 0,
+ file_size: 0,
+ },
+ layer,
+ file_purger,
+ )
+ }
#[tokio::test]
async fn test_parquet_reader() {
common_telemetry::init_default_ut_logging();
@@ -750,9 +768,10 @@ mod tests {
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let sst_file_name = "test-read.parquet";
let file_handle = new_file_handle(FileId::random());
let sst_file_name = file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
- let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
+ let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
let SstInfo {
time_range,
@@ -780,7 +799,7 @@ mod tests {
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let reader = ParquetReader::new(
"test-read.parquet",
file_handle,
operator,
projected_schema,
Predicate::empty(),
@@ -801,17 +820,18 @@ mod tests {
}
async fn check_range_read(
- file_name: &str,
+ file_handle: FileHandle,
object_store: ObjectStore,
schema: ProjectedSchemaRef,
range: TimestampRange,
expect: Vec<i64>,
) {
- let reader = ParquetReader::new(file_name, object_store, schema, Predicate::empty(), range);
+ let reader =
+ ParquetReader::new(file_handle, object_store, schema, Predicate::empty(), range);
let mut stream = reader.chunk_stream().await.unwrap();
let result = stream.next_batch().await;
let Some(batch) = result.unwrap() else {
// if batch does not contain any row
assert!(expect.is_empty());
return;
@@ -867,9 +887,10 @@ mod tests {
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let sst_file_name = "test-read.parquet";
let sst_file_handle = new_file_handle(FileId::random());
let sst_file_name = sst_file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
- let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
+ let writer = ParquetWriter::new(&sst_file_name, Source::Iter(iter), object_store.clone());
let SstInfo {
time_range,
@@ -892,7 +913,7 @@ mod tests {
Arc::new(ProjectedSchema::new(schema, Some(vec![1, 0, 3, 2])).unwrap());
check_range_read(
- sst_file_name,
+ sst_file_handle.clone(),
object_store.clone(),
projected_schema.clone(),
TimestampRange::with_unit(1000, 2003, TimeUnit::Millisecond).unwrap(),
@@ -901,7 +922,7 @@ mod tests {
.await;
check_range_read(
- sst_file_name,
+ sst_file_handle.clone(),
object_store.clone(),
projected_schema.clone(),
TimestampRange::with_unit(2002, 3001, TimeUnit::Millisecond).unwrap(),
@@ -911,7 +932,7 @@ mod tests {
// read a range without any rows.
check_range_read(
- sst_file_name,
+ sst_file_handle.clone(),
object_store.clone(),
projected_schema.clone(),
TimestampRange::with_unit(3002, 3003, TimeUnit::Millisecond).unwrap(),
@@ -921,7 +942,7 @@ mod tests {
//
check_range_read(
- sst_file_name,
+ sst_file_handle.clone(),
object_store.clone(),
projected_schema.clone(),
TimestampRange::with_unit(1000, 3000, TimeUnit::Millisecond).unwrap(),
@@ -931,7 +952,7 @@ mod tests {
// read full range
check_range_read(
- sst_file_name,
+ sst_file_handle,
object_store,
projected_schema,
TimestampRange::min_to_max(),

View File

@@ -13,13 +13,17 @@
// limitations under the License.
use crate::read::BoxedBatchReader;
- use crate::sst::{AccessLayer, FileId, ReadOptions, Source, SstInfo, WriteOptions};
+ use crate::sst::{AccessLayer, FileHandle, FileId, ReadOptions, Source, SstInfo, WriteOptions};
#[derive(Debug)]
pub struct MockAccessLayer;
#[async_trait::async_trait]
impl AccessLayer for MockAccessLayer {
+ fn sst_file_path(&self, file_name: &str) -> String {
+ file_name.to_string()
+ }
async fn write_sst(
&self,
_file_id: FileId,
@@ -31,7 +35,7 @@ impl AccessLayer for MockAccessLayer {
async fn read_sst(
&self,
- _file_id: FileId,
+ _file_handle: FileHandle,
_opts: &ReadOptions,
) -> crate::error::Result<BoxedBatchReader> {
unimplemented!()