fix: skip empty parquet (#1236)

* fix: returns None if parquet file does not contain any rows

* fix: skip empty parquet file

* chore: add doc

* rebase develop

* fix: use flatten instead of filter_map with identity
This commit is contained in:
Lei, HUANG
2023-03-26 09:39:15 +08:00
committed by GitHub
parent 77f9383daf
commit 6f81717866
7 changed files with 92 additions and 43 deletions

View File

@@ -80,10 +80,11 @@ impl<S: LogStore> CompactionTaskImpl<S> {
});
}
let outputs = futures::future::join_all(futs)
.await
let outputs = futures::future::try_join_all(futs)
.await?
.into_iter()
.collect::<Result<_>>()?;
.flatten()
.collect();
let inputs = compacted_inputs.into_iter().collect();
Ok((outputs, inputs))
}
@@ -162,7 +163,7 @@ impl CompactionOutput {
region_id: RegionId,
schema: RegionSchemaRef,
sst_layer: AccessLayerRef,
) -> Result<FileMeta> {
) -> Result<Option<FileMeta>> {
let reader = build_sst_reader(
schema,
sst_layer.clone(),
@@ -175,20 +176,21 @@ impl CompactionOutput {
let output_file_id = FileId::random();
let opts = WriteOptions {};
let SstInfo {
time_range,
file_size,
} = sst_layer
Ok(sst_layer
.write_sst(output_file_id, Source::Reader(reader), &opts)
.await?;
Ok(FileMeta {
region_id,
file_id: output_file_id,
time_range,
level: self.output_level,
file_size,
})
.await?
.map(
|SstInfo {
time_range,
file_size,
}| FileMeta {
region_id,
file_id: output_file_id,
time_range,
level: self.output_level,
file_size,
},
))
}
}

View File

@@ -227,6 +227,7 @@ mod tests {
} = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap()
.unwrap();
let handle = FileHandle::new(
FileMeta {
@@ -415,6 +416,7 @@ mod tests {
)
.write_sst(&opts)
.await
.unwrap()
.unwrap();
assert_eq!(
Some((
@@ -431,6 +433,7 @@ mod tests {
)
.write_sst(&opts)
.await
.unwrap()
.unwrap();
assert_eq!(
Some((
@@ -447,6 +450,7 @@ mod tests {
)
.write_sst(&opts)
.await
.unwrap()
.unwrap();
assert_eq!(

View File

@@ -145,6 +145,7 @@ mod tests {
let sst_info = layer
.write_sst(sst_file_id, Source::Iter(iter), &WriteOptions {})
.await
.unwrap()
.unwrap();
(

View File

@@ -202,28 +202,28 @@ impl<S: LogStore> FlushJob<S> {
let sst_layer = self.sst_layer.clone();
futures.push(async move {
let SstInfo {
time_range,
file_size,
} = sst_layer
Ok(sst_layer
.write_sst(file_id, Source::Iter(iter), &WriteOptions::default())
.await?;
Ok(FileMeta {
region_id,
file_id,
time_range,
level: 0,
file_size,
})
.await?
.map(
|SstInfo {
time_range,
file_size,
}| FileMeta {
region_id,
file_id,
time_range,
level: 0,
file_size,
},
))
});
}
let metas = futures_util::future::join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?
let metas = futures_util::future::try_join_all(futures)
.await?
.into_iter()
.flatten()
.collect();
logging::info!("Successfully flush memtables to files: {:?}", metas);

View File

@@ -283,7 +283,7 @@ impl Drop for FileHandleInner {
);
}
Err(e) => {
error!(e; "Failed to schedule SST purge task, region: {}, name: {}",
error!(e; "Failed to schedule SST purge task, region: {}, name: {}",
self.meta.region_id, self.meta.file_id.as_parquet());
}
}
@@ -402,13 +402,14 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug {
/// Returns the sst file path.
fn sst_file_path(&self, file_name: &str) -> String;
/// Writes SST file with given `file_id`.
/// Writes SST file with given `file_id` and returns the SST info.
/// If source does not contain any data, `write_sst` will return `Ok(None)`.
async fn write_sst(
&self,
file_id: FileId,
source: Source,
opts: &WriteOptions,
) -> Result<SstInfo>;
) -> Result<Option<SstInfo>>;
/// Read SST file with given `file_handle` and schema.
async fn read_sst(
@@ -478,7 +479,7 @@ impl AccessLayer for FsAccessLayer {
file_id: FileId,
source: Source,
opts: &WriteOptions,
) -> Result<SstInfo> {
) -> Result<Option<SstInfo>> {
// Now we only support parquet format. We may allow caller to specify SST format in
// WriteOptions in the future.
let file_path = self.sst_file_path(&file_id.as_parquet());

View File

@@ -75,14 +75,17 @@ impl<'a> ParquetWriter<'a> {
}
}
pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<SstInfo> {
pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result<Option<SstInfo>> {
self.write_rows(None).await
}
/// Iterates memtable and writes rows to Parquet file.
/// A chunk of records yielded from each iteration with a size given
/// in config will be written to a single row group.
async fn write_rows(mut self, extra_meta: Option<HashMap<String, String>>) -> Result<SstInfo> {
async fn write_rows(
mut self,
extra_meta: Option<HashMap<String, String>>,
) -> Result<Option<SstInfo>> {
let projected_schema = self.source.projected_schema();
let store_schema = projected_schema.schema_to_read();
let schema = store_schema.arrow_schema().clone();
@@ -106,6 +109,7 @@ impl<'a> ParquetWriter<'a> {
let mut arrow_writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props))
.context(WriteParquetSnafu)?;
let mut batches_written = 0;
while let Some(batch) = self.source.next_batch().await? {
let arrow_batch = RecordBatch::try_new(
schema.clone(),
@@ -119,8 +123,13 @@ impl<'a> ParquetWriter<'a> {
arrow_writer
.write(&arrow_batch)
.context(WriteParquetSnafu)?;
batches_written += 1;
}
if batches_written == 0 {
// if the source does not contain any batch, we skip writing an empty parquet file.
return Ok(None);
}
let file_meta = arrow_writer.close().context(WriteParquetSnafu)?;
let time_range = decode_timestamp_range(&file_meta, store_schema)
@@ -137,10 +146,10 @@ impl<'a> ParquetWriter<'a> {
path: object.path(),
})?
.content_length();
Ok(SstInfo {
Ok(Some(SstInfo {
time_range,
file_size,
})
}))
}
}
@@ -210,6 +219,13 @@ fn decode_timestamp_range_inner(
end = end.max(max);
}
assert!(
start <= end,
"Illegal timestamp range decoded from SST file {:?}, start: {}, end: {}",
file_meta,
start,
end
);
Ok(Some((
Timestamp::new(start, unit),
Timestamp::new(end, unit),
@@ -682,6 +698,7 @@ mod tests {
} = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap()
.unwrap();
assert_eq!(
@@ -779,6 +796,7 @@ mod tests {
} = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap()
.unwrap();
assert_eq!(
@@ -898,6 +916,7 @@ mod tests {
} = writer
.write_sst(&sst::WriteOptions::default())
.await
.unwrap()
.unwrap();
assert_eq!(
@@ -971,6 +990,28 @@ mod tests {
)
}
/// Writing from a source that yields no batches must not report an SST:
/// `write_sst` is expected to return `Ok(None)`.
#[tokio::test]
async fn test_write_empty_file() {
    common_telemetry::init_default_ut_logging();

    // Build a memtable and never insert into it; its iterator is the (empty) source.
    let region_schema = memtable_tests::schema_for_test();
    let empty_memtable = DefaultMemtableBuilder::default().build(region_schema.clone());

    // Object store rooted at a temporary directory. The `temp_dir` binding is held
    // until the end of the test (presumably the directory is cleaned up on drop).
    let temp_dir = create_temp_dir("read-parquet-by-range");
    let root = temp_dir.path().to_str().unwrap();
    let store = ObjectStore::new(Fs::default().root(root).build().unwrap()).finish();

    let file_name = "test-read.parquet";
    let empty_iter = empty_memtable.iter(&IterContext::default()).unwrap();
    let parquet_writer = ParquetWriter::new(file_name, Source::Iter(empty_iter), store.clone());

    let written = parquet_writer
        .write_sst(&sst::WriteOptions::default())
        .await
        .unwrap();
    assert!(written.is_none());
}
#[test]
fn test_time_unit_lossy() {
// converting a range with unit second to millisecond will not cause rounding error

View File

@@ -29,7 +29,7 @@ impl AccessLayer for MockAccessLayer {
_file_id: FileId,
_source: Source,
_opts: &WriteOptions,
) -> crate::error::Result<SstInfo> {
) -> crate::error::Result<Option<SstInfo>> {
unimplemented!()
}