diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs index 6c8a3ee744..d5a95c6691 100644 --- a/src/storage/src/compaction/task.rs +++ b/src/storage/src/compaction/task.rs @@ -80,10 +80,11 @@ impl CompactionTaskImpl { }); } - let outputs = futures::future::join_all(futs) - .await + let outputs = futures::future::try_join_all(futs) + .await? .into_iter() - .collect::>()?; + .flatten() + .collect(); let inputs = compacted_inputs.into_iter().collect(); Ok((outputs, inputs)) } @@ -162,7 +163,7 @@ impl CompactionOutput { region_id: RegionId, schema: RegionSchemaRef, sst_layer: AccessLayerRef, - ) -> Result { + ) -> Result> { let reader = build_sst_reader( schema, sst_layer.clone(), @@ -175,20 +176,21 @@ impl CompactionOutput { let output_file_id = FileId::random(); let opts = WriteOptions {}; - let SstInfo { - time_range, - file_size, - } = sst_layer + Ok(sst_layer .write_sst(output_file_id, Source::Reader(reader), &opts) - .await?; - - Ok(FileMeta { - region_id, - file_id: output_file_id, - time_range, - level: self.output_level, - file_size, - }) + .await? + .map( + |SstInfo { + time_range, + file_size, + }| FileMeta { + region_id, + file_id: output_file_id, + time_range, + level: self.output_level, + file_size, + }, + )) } } diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index fa8ed0e836..0b9cd12e00 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -227,6 +227,7 @@ mod tests { } = writer .write_sst(&sst::WriteOptions::default()) .await + .unwrap() .unwrap(); let handle = FileHandle::new( FileMeta { @@ -415,6 +416,7 @@ mod tests { ) .write_sst(&opts) .await + .unwrap() .unwrap(); assert_eq!( Some(( @@ -431,6 +433,7 @@ mod tests { ) .write_sst(&opts) .await + .unwrap() .unwrap(); assert_eq!( Some(( @@ -447,6 +450,7 @@ mod tests { ) .write_sst(&opts) .await + .unwrap() .unwrap(); assert_eq!( diff --git a/src/storage/src/file_purger.rs b/src/storage/src/file_purger.rs index a0f65695f8..c0a460d916 100644 --- a/src/storage/src/file_purger.rs +++ b/src/storage/src/file_purger.rs @@ -145,6 +145,7 @@ mod tests { let sst_info = layer .write_sst(sst_file_id, Source::Iter(iter), &WriteOptions {}) .await + .unwrap() .unwrap(); ( diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 90ea58678f..0759b79d49 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -202,28 +202,28 @@ impl FlushJob { let sst_layer = self.sst_layer.clone(); futures.push(async move { - let SstInfo { - time_range, - file_size, - } = sst_layer + Ok(sst_layer .write_sst(file_id, Source::Iter(iter), &WriteOptions::default()) - .await?; - - Ok(FileMeta { - region_id, - file_id, - time_range, - level: 0, - file_size, - }) + .await? + .map( + |SstInfo { + time_range, + file_size, + }| FileMeta { + region_id, + file_id, + time_range, + level: 0, + file_size, + }, + )) }); } - let metas = futures_util::future::join_all(futures) - .await - .into_iter() - .collect::>>()? + let metas = futures_util::future::try_join_all(futures) + .await? .into_iter() + .flatten() .collect(); logging::info!("Successfully flush memtables to files: {:?}", metas); diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index 511f4dfdfe..f74c020f50 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -283,7 +283,7 @@ impl Drop for FileHandleInner { ); } Err(e) => { - error!(e; "Failed to schedule SST purge task, region: {}, name: {}", + error!(e; "Failed to schedule SST purge task, region: {}, name: {}", self.meta.region_id, self.meta.file_id.as_parquet()); } } @@ -402,13 +402,14 @@ pub trait AccessLayer: Send + Sync + std::fmt::Debug { /// Returns the sst file path. fn sst_file_path(&self, file_name: &str) -> String; - /// Writes SST file with given `file_id`. + /// Writes SST file with given `file_id` and returns the SST info. + /// If source does not contain any data, `write_sst` will return `Ok(None)`. async fn write_sst( &self, file_id: FileId, source: Source, opts: &WriteOptions, - ) -> Result; + ) -> Result>; /// Read SST file with given `file_handle` and schema. async fn read_sst( @@ -478,7 +479,7 @@ impl AccessLayer for FsAccessLayer { file_id: FileId, source: Source, opts: &WriteOptions, - ) -> Result { + ) -> Result> { // Now we only supports parquet format. We may allow caller to specific SST format in // WriteOptions in the future. let file_path = self.sst_file_path(&file_id.as_parquet()); diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index d0553e4148..76701f85d0 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -75,14 +75,17 @@ impl<'a> ParquetWriter<'a> { } } - pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result { + pub async fn write_sst(self, _opts: &sst::WriteOptions) -> Result> { self.write_rows(None).await } /// Iterates memtable and writes rows to Parquet file. /// A chunk of records yielded from each iteration with a size given /// in config will be written to a single row group. - async fn write_rows(mut self, extra_meta: Option>) -> Result { + async fn write_rows( + mut self, + extra_meta: Option>, + ) -> Result> { let projected_schema = self.source.projected_schema(); let store_schema = projected_schema.schema_to_read(); let schema = store_schema.arrow_schema().clone(); @@ -106,6 +109,7 @@ impl<'a> ParquetWriter<'a> { let mut arrow_writer = ArrowWriter::try_new(&mut buf, schema.clone(), Some(writer_props)) .context(WriteParquetSnafu)?; + let mut batches_written = 0; while let Some(batch) = self.source.next_batch().await? { let arrow_batch = RecordBatch::try_new( schema.clone(), @@ -119,8 +123,13 @@ impl<'a> ParquetWriter<'a> { arrow_writer .write(&arrow_batch) .context(WriteParquetSnafu)?; + batches_written += 1; } + if batches_written == 0 { + // if the source does not contain any batch, we skip writing an empty parquet file. + return Ok(None); + } let file_meta = arrow_writer.close().context(WriteParquetSnafu)?; let time_range = decode_timestamp_range(&file_meta, store_schema) @@ -137,10 +146,10 @@ impl<'a> ParquetWriter<'a> { path: object.path(), })? .content_length(); - Ok(SstInfo { + Ok(Some(SstInfo { time_range, file_size, - }) + })) } } @@ -210,6 +219,13 @@ fn decode_timestamp_range_inner( end = end.max(max); } + assert!( + start <= end, + "Illegal timestamp range decoded from SST file {:?}, start: {}, end: {}", + file_meta, + start, + end + ); Ok(Some(( Timestamp::new(start, unit), Timestamp::new(end, unit), @@ -682,6 +698,7 @@ mod tests { } = writer .write_sst(&sst::WriteOptions::default()) .await + .unwrap() .unwrap(); assert_eq!( @@ -779,6 +796,7 @@ mod tests { } = writer .write_sst(&sst::WriteOptions::default()) .await + .unwrap() .unwrap(); assert_eq!( @@ -898,6 +916,7 @@ mod tests { } = writer .write_sst(&sst::WriteOptions::default()) .await + .unwrap() .unwrap(); assert_eq!( @@ -971,6 +990,28 @@ mod tests { ) } + #[tokio::test] + async fn test_write_empty_file() { + common_telemetry::init_default_ut_logging(); + let schema = memtable_tests::schema_for_test(); + let memtable = DefaultMemtableBuilder::default().build(schema.clone()); + + let dir = create_temp_dir("read-parquet-by-range"); + let path = dir.path().to_str().unwrap(); + let backend = Fs::default().root(path).build().unwrap(); + let object_store = ObjectStore::new(backend).finish(); + let sst_file_name = "test-read.parquet"; + let iter = memtable.iter(&IterContext::default()).unwrap(); + let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); + + let sst_info_opt = writer + .write_sst(&sst::WriteOptions::default()) + .await + .unwrap(); + + assert!(sst_info_opt.is_none()); + } + #[test] fn test_time_unit_lossy() { // converting a range with unit second to millisecond will not cause rounding error diff --git a/src/storage/src/test_util/access_layer_util.rs b/src/storage/src/test_util/access_layer_util.rs index ad1f04c8f0..faad942cb7 100644 --- a/src/storage/src/test_util/access_layer_util.rs +++ b/src/storage/src/test_util/access_layer_util.rs @@ -29,7 +29,7 @@ impl AccessLayer for MockAccessLayer { _file_id: FileId, _source: Source, _opts: &WriteOptions, - ) -> crate::error::Result { + ) -> crate::error::Result> { unimplemented!() }