refactor: reduce number of parquet metadata reads and enable reader buffer (#3197)

Author: Weny Xu
Date: 2024-01-19 21:26:38 +09:00
Committed by: GitHub
Parent: 440cd00ad0
Commit: 986f3bb07d
2 changed files with 111 additions and 70 deletions
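Overview: `COPY TABLE FROM` previously opened every parquet file twice, once in `infer_schema`, where `ParquetRecordBatchStreamBuilder::new` parsed the footer just to obtain the schema, and again in `build_read_stream`, which parsed the same footer a second time to construct the record-batch stream. This commit loads the footer once with `ArrowReaderMetadata::load_async`, caches it (together with the schema, format options, and path) in a new `FileMetadata` enum, and later rebuilds the stream with `ParquetRecordBatchStreamBuilder::new_with_metadata`, which needs no additional footer I/O. Separately, the `tokio::io::BufReader` wrapper is dropped in favor of the object store's own buffered reader, created via `reader_with(path).buffer(DEFAULT_READ_BUFFER)` with a 256 KiB buffer, for both parquet and ORC. The first changed file renames the corresponding error variant; the second contains the refactor itself.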


@@ -351,8 +351,8 @@ pub enum Error {
         location: Location,
     },
-    #[snafu(display("Failed to read parquet file"))]
-    ReadParquet {
+    #[snafu(display("Failed to read parquet file metadata"))]
+    ReadParquetMetadata {
         #[snafu(source)]
         error: parquet::errors::ParquetError,
         location: Location,
@@ -587,9 +587,9 @@ impl ErrorExt for Error {
             Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
-            Error::ReadObject { .. } | Error::ReadParquet { .. } | Error::ReadOrc { .. } => {
-                StatusCode::StorageUnavailable
-            }
+            Error::ReadObject { .. }
+            | Error::ReadParquetMetadata { .. }
+            | Error::ReadOrc { .. } => StatusCode::StorageUnavailable,
             Error::ListObjects { source, .. }
             | Error::ParseUrl { source, .. }
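With metadata loading now the only parquet operation in this module that surfaces a raw `ParquetError`, the variant is renamed accordingly. For readers unfamiliar with the `snafu` pattern used throughout, here is a minimal, self-contained sketch of how such a variant wraps a source error (illustrative only: the real enum also carries a `Location` field and many more variants):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to read parquet file metadata"))]
    ReadParquetMetadata {
        #[snafu(source)]
        error: parquet::errors::ParquetError,
    },
}

fn load_footer() -> Result<(), Error> {
    // Stand-in for a fallible parquet call; `.context(...)` wraps the
    // `ParquetError` in the `ReadParquetMetadata` variant, whose generated
    // context selector is named `ReadParquetMetadataSnafu`.
    let result: Result<(), parquet::errors::ParquetError> =
        Err(parquet::errors::ParquetError::EOF("truncated footer".into()));
    result.context(ReadParquetMetadataSnafu)
}
```

The second changed file, below, is where the refactor itself lives: the statement executor's `COPY TABLE FROM` path.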


@@ -17,8 +17,8 @@ use std::future::Future;
 use std::sync::Arc;

 use common_base::readable_size::ReadableSize;
-use common_datasource::file_format::csv::{CsvConfigBuilder, CsvOpener};
-use common_datasource::file_format::json::JsonOpener;
+use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
+use common_datasource::file_format::json::{JsonFormat, JsonOpener};
 use common_datasource::file_format::orc::{
     infer_orc_schema, new_orc_stream_reader, OrcArrowStreamReaderAdapter,
 };
@@ -32,6 +32,7 @@ use common_telemetry::{debug, tracing};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream};
+use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
 use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datatypes::arrow::compute::can_cast_types;
@@ -44,12 +45,46 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;
 use table::requests::{CopyTableRequest, InsertRequest};
 use table::table_reference::TableReference;
-use tokio::io::BufReader;

 use crate::error::{self, IntoVectorsSnafu, Result};
 use crate::statement::StatementExecutor;

 const DEFAULT_BATCH_SIZE: usize = 8192;
+const DEFAULT_READ_BUFFER: usize = 256 * 1024;
+
+enum FileMetadata {
+    Parquet {
+        schema: SchemaRef,
+        metadata: ArrowReaderMetadata,
+        path: String,
+    },
+    Orc {
+        schema: SchemaRef,
+        path: String,
+    },
+    Json {
+        schema: SchemaRef,
+        format: JsonFormat,
+        path: String,
+    },
+    Csv {
+        schema: SchemaRef,
+        format: CsvFormat,
+        path: String,
+    },
+}
+
+impl FileMetadata {
+    /// Returns the [SchemaRef]
+    pub fn schema(&self) -> &SchemaRef {
+        match self {
+            FileMetadata::Parquet { schema, .. } => schema,
+            FileMetadata::Orc { schema, .. } => schema,
+            FileMetadata::Json { schema, .. } => schema,
+            FileMetadata::Csv { schema, .. } => schema,
+        }
+    }
+}

 impl StatementExecutor {
     async fn list_copy_from_entries(
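The `FileMetadata` enum added above is the heart of the refactor: it is collected once per input file and carries everything the read stage later needs, namely the inferred Arrow schema for every format, the format options for CSV and JSON, the object path, and, for parquet, the parsed footer as `ArrowReaderMetadata`. Because `build_read_stream` will match on this enum instead of re-probing the file, no format re-derives its schema, and parquet avoids a second footer read entirely; see the sketch after the next hunk.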
@@ -82,49 +117,62 @@ impl StatementExecutor {
         Ok((object_store, entries))
     }

-    async fn infer_schema(
+    async fn collect_metadata(
         &self,
-        format: &Format,
-        object_store: ObjectStore,
-        path: &str,
-    ) -> Result<SchemaRef> {
+        object_store: &ObjectStore,
+        format: Format,
+        path: String,
+    ) -> Result<FileMetadata> {
         match format {
-            Format::Csv(format) => Ok(Arc::new(
-                format
-                    .infer_schema(&object_store, path)
-                    .await
-                    .context(error::InferSchemaSnafu { path })?,
-            )),
-            Format::Json(format) => Ok(Arc::new(
-                format
-                    .infer_schema(&object_store, path)
-                    .await
-                    .context(error::InferSchemaSnafu { path })?,
-            )),
+            Format::Csv(format) => Ok(FileMetadata::Csv {
+                schema: Arc::new(
+                    format
+                        .infer_schema(object_store, &path)
+                        .await
+                        .context(error::InferSchemaSnafu { path: &path })?,
+                ),
+                format,
+                path,
+            }),
+            Format::Json(format) => Ok(FileMetadata::Json {
+                schema: Arc::new(
+                    format
+                        .infer_schema(object_store, &path)
+                        .await
+                        .context(error::InferSchemaSnafu { path: &path })?,
+                ),
+                format,
+                path,
+            }),
             Format::Parquet(_) => {
-                let reader = object_store
-                    .reader(path)
+                let mut reader = object_store
+                    .reader(&path)
                     .await
-                    .context(error::ReadObjectSnafu { path })?;
-                let buf_reader = BufReader::new(reader);
-                let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
+                    .context(error::ReadObjectSnafu { path: &path })?;
+                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                     .await
-                    .context(error::ReadParquetSnafu)?;
-                Ok(builder.schema().clone())
+                    .context(error::ReadParquetMetadataSnafu)?;
+                Ok(FileMetadata::Parquet {
+                    schema: metadata.schema().clone(),
+                    metadata,
+                    path,
+                })
             }
             Format::Orc(_) => {
                 let reader = object_store
-                    .reader(path)
+                    .reader(&path)
                     .await
-                    .context(error::ReadObjectSnafu { path })?;
+                    .context(error::ReadObjectSnafu { path: &path })?;
                 let schema = infer_orc_schema(reader)
                     .await
                     .context(error::ReadOrcSnafu)?;
-                Ok(Arc::new(schema))
+                Ok(FileMetadata::Orc {
+                    schema: Arc::new(schema),
+                    path,
+                })
             }
         }
     }
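A minimal, self-contained sketch of the load-once, reuse-later pattern that `collect_metadata` sets up, assuming the parquet crate's `async` feature and tokio (a local file stands in for the object-store reader; the path is hypothetical):

```rust
use parquet::arrow::arrow_reader::ArrowReaderMetadata;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

async fn schema_then_stream(path: &str) -> parquet::errors::Result<()> {
    // Pass 1: parse the footer once. `ArrowReaderMetadata` retains both the
    // parquet metadata and the Arrow schema derived from it.
    let mut reader = File::open(path).await?;
    let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
    println!("inferred schema: {:?}", metadata.schema());

    // Pass 2, possibly much later and with a fresh reader: reuse the cached
    // metadata. `new_with_metadata` is synchronous and does no footer I/O.
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
        File::open(path).await?,
        metadata.clone(),
    )
    .build()?;
    let _ = stream; // drive with `futures::StreamExt::next` in real code
    Ok(())
}
```

This is exactly the split the commit performs: pass 1 happens in `collect_metadata`, pass 2 in `build_read_stream` below.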
@@ -158,14 +206,13 @@ impl StatementExecutor {
async fn build_read_stream(
&self,
format: &Format,
object_store: ObjectStore,
path: &str,
schema: SchemaRef,
object_store: &ObjectStore,
file_metadata: &FileMetadata,
projection: Vec<usize>,
) -> Result<DfSendableRecordBatchStream> {
match format {
Format::Csv(format) => {
match file_metadata {
FileMetadata::Csv { format, path, .. } => {
let csv_conf = CsvConfigBuilder::default()
.batch_size(DEFAULT_BATCH_SIZE)
.file_schema(schema.clone())
@@ -174,24 +221,23 @@ impl StatementExecutor {
                     .build()
                     .context(error::BuildCsvConfigSnafu)?;
                 self.build_file_stream(
-                    CsvOpener::new(csv_conf, object_store, format.compression_type),
+                    CsvOpener::new(csv_conf, object_store.clone(), format.compression_type),
                     path,
                     schema,
                 )
                 .await
             }
-            Format::Json(format) => {
+            FileMetadata::Json { format, path, .. } => {
                 let projected_schema = Arc::new(
                     schema
                         .project(&projection)
                         .context(error::ProjectSchemaSnafu)?,
                 );
                 self.build_file_stream(
                     JsonOpener::new(
                         DEFAULT_BATCH_SIZE,
                         projected_schema,
-                        object_store,
+                        object_store.clone(),
                         format.compression_type,
                     ),
                     path,
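Note the calling-convention change running through these arms: `build_read_stream` now borrows `&ObjectStore`, and each opener receives `object_store.clone()`. OpenDAL's `Operator` is a cheap, reference-counted handle, so these clones copy a pointer, not connection state.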
@@ -199,17 +245,14 @@ impl StatementExecutor {
                 )
                 .await
             }
-            Format::Parquet(_) => {
+            FileMetadata::Parquet { metadata, path, .. } => {
                 let reader = object_store
-                    .reader(path)
+                    .reader_with(path)
+                    .buffer(DEFAULT_READ_BUFFER)
                     .await
                     .context(error::ReadObjectSnafu { path })?;
-                let buf_reader = BufReader::new(reader);
-                let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
-                    .await
-                    .context(error::ReadParquetSnafu)?;
+                let builder =
+                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                 let upstream = builder
                     .build()
                     .context(error::BuildParquetRecordBatchStreamSnafu)?;
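This arm delivers the "reader buffer" half of the commit: instead of wrapping the object-store reader in `tokio::io::BufReader`, the reader is created with a built-in 256 KiB buffer, and the footer metadata cached earlier is handed straight to `new_with_metadata`. A hedged sketch of the buffered read against an in-memory backend follows; the `buffer(..)` option on `reader_with` matches the opendal generation this commit builds against, and later opendal releases reworked this part of the API:

```rust
use opendal::{services::Memory, Operator};
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let op = Operator::new(Memory::default())?.finish();
    op.write("demo.bin", vec![42u8; 1024 * 1024]).await?;

    // Without a buffer, every small read a decoder issues becomes its own
    // range request to the backend; with `buffer`, reads are served from
    // 256 KiB prefetched chunks, which matters most for remote object stores.
    let mut reader = op.reader_with("demo.bin").buffer(256 * 1024).await?;
    let mut data = Vec::new();
    reader.read_to_end(&mut data).await?;
    assert_eq!(data.len(), 1024 * 1024);
    Ok(())
}
```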
@@ -220,9 +263,10 @@ impl StatementExecutor {
                     Some(projection),
                 )))
             }
-            Format::Orc(_) => {
+            FileMetadata::Orc { path, .. } => {
                 let reader = object_store
-                    .reader(path)
+                    .reader_with(path)
+                    .buffer(DEFAULT_READ_BUFFER)
                     .await
                     .context(error::ReadObjectSnafu { path })?;
                 let stream = new_orc_stream_reader(reader)
@@ -247,11 +291,8 @@ impl StatementExecutor {
             table: &req.table_name,
         };
         let table = self.get_table(&table_ref).await?;
         let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
         let (object_store, entries) = self.list_copy_from_entries(&req).await?;
         let mut files = Vec::with_capacity(entries.len());
         let table_schema = table.schema().arrow_schema().clone();
@@ -260,12 +301,13 @@ impl StatementExecutor {
                 continue;
             }

             let path = entry.path();
-            let file_schema = self
-                .infer_schema(&format, object_store.clone(), path)
+            let file_metadata = self
+                .collect_metadata(&object_store, format, path.to_string())
                 .await?;
-            let (file_schema_projection, table_schema_projection, compat_schema) =
-                generated_schema_projection_and_compatible_file_schema(&file_schema, &table_schema);
+            let file_schema = file_metadata.schema();
+            let (file_schema_projection, table_schema_projection, compat_schema) =
+                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
             let projected_file_schema = Arc::new(
                 file_schema
                     .project(&file_schema_projection)
@@ -276,25 +318,24 @@ impl StatementExecutor {
                     .project(&table_schema_projection)
                     .context(error::ProjectSchemaSnafu)?,
             );

             ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;

             files.push((
                 Arc::new(compat_schema),
                 file_schema_projection,
                 projected_table_schema,
-                path,
+                file_metadata,
             ))
         }

         let mut rows_inserted = 0;
-        for (schema, file_schema_projection, projected_table_schema, path) in files {
+        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
+        {
             let mut stream = self
                 .build_read_stream(
-                    &format,
-                    object_store.clone(),
-                    path,
-                    schema,
+                    compat_schema,
+                    &object_store,
+                    &file_metadata,
                     file_schema_projection,
                 )
                 .await?;