From 8796ddaf310c378b0592a391cefa00b492f9e83b Mon Sep 17 00:00:00 2001 From: discord9 Date: Sat, 8 Mar 2025 20:32:11 +0800 Subject: [PATCH] chore: remove unwrap --- src/sst-convert/src/reader/parquet.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/sst-convert/src/reader/parquet.rs b/src/sst-convert/src/reader/parquet.rs index b9aa6f16a5..8c847bc30d 100644 --- a/src/sst-convert/src/reader/parquet.rs +++ b/src/sst-convert/src/reader/parquet.rs @@ -29,6 +29,7 @@ use datatypes::schema::Schema; use datatypes::value::Value; use datatypes::vectors::{MutableVector, UInt64VectorBuilder, UInt8VectorBuilder}; use futures_util::StreamExt; +use mito2::error::{OpenDalSnafu, ReadParquetSnafu}; use mito2::read::{Batch, BatchColumn, BatchReader}; use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SparsePrimaryKeyCodec}; use object_store::ObjectStore; @@ -50,16 +51,27 @@ impl OpenDALParquetReader { metadata: RegionMetadataRef, override_sequence: Option, ) -> Result { - let reader = operator.reader_with(path).await.unwrap(); + let reader = operator + .reader_with(path) + .await + .context(OpenDalSnafu) + .map_err(BoxedError::new)?; - let content_len = operator.stat(path).await.unwrap().content_length(); + let content_len = operator + .stat(path) + .await + .context(OpenDalSnafu) + .map_err(BoxedError::new)? + .content_length(); let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024); let stream = ParquetRecordBatchStreamBuilder::new(reader) .await - .unwrap() + .context(ReadParquetSnafu { path }) + .map_err(BoxedError::new)? .build() - .unwrap(); + .context(ReadParquetSnafu { path }) + .map_err(BoxedError::new)?; Ok(Self { inner: RawParquetReader::new(stream, metadata, override_sequence, path), }) @@ -158,7 +170,7 @@ pub fn extract_to_batches( .map(|id| { metadata .column_by_id(*id) - .unwrap() + .expect("Can't find column by id") .column_schema .name .clone() @@ -195,7 +207,7 @@ pub fn extract_to_batches( let builder = SSTBatchBuilder::new(rb, metadata, override_sequence).map_err(BoxedError::new)?; pk_to_batchs.insert(cur_pk.clone(), builder); - pk_to_batchs.get_mut(cur_pk).unwrap() + pk_to_batchs.get_mut(cur_pk).expect("Just inserted") }; builder.push_row(&row).map_err(BoxedError::new)?; }