mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 00:40:40 +00:00
chore: remove unwrap
This commit is contained in:
@@ -29,6 +29,7 @@ use datatypes::schema::Schema;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::{MutableVector, UInt64VectorBuilder, UInt8VectorBuilder};
|
||||
use futures_util::StreamExt;
|
||||
use mito2::error::{OpenDalSnafu, ReadParquetSnafu};
|
||||
use mito2::read::{Batch, BatchColumn, BatchReader};
|
||||
use mito2::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, SparsePrimaryKeyCodec};
|
||||
use object_store::ObjectStore;
|
||||
@@ -50,16 +51,27 @@ impl OpenDALParquetReader {
|
||||
metadata: RegionMetadataRef,
|
||||
override_sequence: Option<SequenceNumber>,
|
||||
) -> Result<Self, BoxedError> {
|
||||
let reader = operator.reader_with(path).await.unwrap();
|
||||
let reader = operator
|
||||
.reader_with(path)
|
||||
.await
|
||||
.context(OpenDalSnafu)
|
||||
.map_err(BoxedError::new)?;
|
||||
|
||||
let content_len = operator.stat(path).await.unwrap().content_length();
|
||||
let content_len = operator
|
||||
.stat(path)
|
||||
.await
|
||||
.context(OpenDalSnafu)
|
||||
.map_err(BoxedError::new)?
|
||||
.content_length();
|
||||
|
||||
let reader = AsyncReader::new(reader, content_len).with_prefetch_footer_size(512 * 1024);
|
||||
let stream = ParquetRecordBatchStreamBuilder::new(reader)
|
||||
.await
|
||||
.unwrap()
|
||||
.context(ReadParquetSnafu { path })
|
||||
.map_err(BoxedError::new)?
|
||||
.build()
|
||||
.unwrap();
|
||||
.context(ReadParquetSnafu { path })
|
||||
.map_err(BoxedError::new)?;
|
||||
Ok(Self {
|
||||
inner: RawParquetReader::new(stream, metadata, override_sequence, path),
|
||||
})
|
||||
@@ -158,7 +170,7 @@ pub fn extract_to_batches(
|
||||
.map(|id| {
|
||||
metadata
|
||||
.column_by_id(*id)
|
||||
.unwrap()
|
||||
.expect("Can't find column by id")
|
||||
.column_schema
|
||||
.name
|
||||
.clone()
|
||||
@@ -195,7 +207,7 @@ pub fn extract_to_batches(
|
||||
let builder =
|
||||
SSTBatchBuilder::new(rb, metadata, override_sequence).map_err(BoxedError::new)?;
|
||||
pk_to_batchs.insert(cur_pk.clone(), builder);
|
||||
pk_to_batchs.get_mut(cur_pk).unwrap()
|
||||
pk_to_batchs.get_mut(cur_pk).expect("Just inserted")
|
||||
};
|
||||
builder.push_row(&row).map_err(BoxedError::new)?;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user