From 986f3bb07d088fe42010751cb74decd2d60d16d9 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Fri, 19 Jan 2024 21:26:38 +0900
Subject: [PATCH] refactor: reduce number of parquet metadata reads and enable reader buffer (#3197)

refactor: reduce the number of parquet metadata reads and enable the read buffer
---
 src/operator/src/error.rs                     |  10 +-
 src/operator/src/statement/copy_table_from.rs | 171 +++++++++++-------
 2 files changed, 111 insertions(+), 70 deletions(-)

diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs
index 52956e8055..0492be12db 100644
--- a/src/operator/src/error.rs
+++ b/src/operator/src/error.rs
@@ -351,8 +351,8 @@ pub enum Error {
         location: Location,
     },
-    #[snafu(display("Failed to read parquet file"))]
-    ReadParquet {
+    #[snafu(display("Failed to read parquet file metadata"))]
+    ReadParquetMetadata {
         #[snafu(source)]
         error: parquet::errors::ParquetError,
         location: Location,
     },
@@ -587,9 +587,9 @@ impl ErrorExt for Error {
             Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
 
-            Error::ReadObject { .. } | Error::ReadParquet { .. } | Error::ReadOrc { .. } => {
-                StatusCode::StorageUnavailable
-            }
+            Error::ReadObject { .. }
+            | Error::ReadParquetMetadata { .. }
+            | Error::ReadOrc { .. } => StatusCode::StorageUnavailable,
 
             Error::ListObjects { source, .. }
             | Error::ParseUrl { source, .. }
diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs
index 2aedca15a4..61aeb5e603 100644
--- a/src/operator/src/statement/copy_table_from.rs
+++ b/src/operator/src/statement/copy_table_from.rs
@@ -17,8 +17,8 @@ use std::future::Future;
 use std::sync::Arc;
 
 use common_base::readable_size::ReadableSize;
-use common_datasource::file_format::csv::{CsvConfigBuilder, CsvOpener};
-use common_datasource::file_format::json::JsonOpener;
+use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
+use common_datasource::file_format::json::{JsonFormat, JsonOpener};
 use common_datasource::file_format::orc::{
     infer_orc_schema, new_orc_stream_reader, OrcArrowStreamReaderAdapter,
 };
@@ -32,6 +32,7 @@ use common_telemetry::{debug, tracing};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream};
+use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
 use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datatypes::arrow::compute::can_cast_types;
@@ -44,12 +45,46 @@ use session::context::QueryContextRef;
 use snafu::ResultExt;
 use table::requests::{CopyTableRequest, InsertRequest};
 use table::table_reference::TableReference;
-use tokio::io::BufReader;
 
 use crate::error::{self, IntoVectorsSnafu, Result};
 use crate::statement::StatementExecutor;
 
 const DEFAULT_BATCH_SIZE: usize = 8192;
+const DEFAULT_READ_BUFFER: usize = 256 * 1024;
+
+enum FileMetadata {
+    Parquet {
+        schema: SchemaRef,
+        metadata: ArrowReaderMetadata,
+        path: String,
+    },
+    Orc {
+        schema: SchemaRef,
+        path: String,
+    },
+    Json {
+        schema: SchemaRef,
+        format: JsonFormat,
+        path: String,
+    },
+    Csv {
+        schema: SchemaRef,
+        format: CsvFormat,
+        path: String,
+    },
+}
+
+impl FileMetadata {
+    /// Returns the [SchemaRef]
+    pub fn schema(&self) -> &SchemaRef {
+        match self {
+            FileMetadata::Parquet { schema, .. } => schema,
+            FileMetadata::Orc { schema, .. } => schema,
+            FileMetadata::Json { schema, .. } => schema,
+            FileMetadata::Csv { schema, .. } => schema,
+        }
+    }
+}
 
 impl StatementExecutor {
     async fn list_copy_from_entries(
@@ -82,49 +117,62 @@ impl StatementExecutor {
         Ok((object_store, entries))
     }
 
-    async fn infer_schema(
+    async fn collect_metadata(
         &self,
-        format: &Format,
-        object_store: ObjectStore,
-        path: &str,
-    ) -> Result<SchemaRef> {
+        object_store: &ObjectStore,
+        format: Format,
+        path: String,
+    ) -> Result<FileMetadata> {
         match format {
-            Format::Csv(format) => Ok(Arc::new(
-                format
-                    .infer_schema(&object_store, path)
-                    .await
-                    .context(error::InferSchemaSnafu { path })?,
-            )),
-            Format::Json(format) => Ok(Arc::new(
-                format
-                    .infer_schema(&object_store, path)
-                    .await
-                    .context(error::InferSchemaSnafu { path })?,
-            )),
+            Format::Csv(format) => Ok(FileMetadata::Csv {
+                schema: Arc::new(
+                    format
+                        .infer_schema(object_store, &path)
+                        .await
+                        .context(error::InferSchemaSnafu { path: &path })?,
+                ),
+                format,
+                path,
+            }),
+            Format::Json(format) => Ok(FileMetadata::Json {
+                schema: Arc::new(
+                    format
+                        .infer_schema(object_store, &path)
+                        .await
+                        .context(error::InferSchemaSnafu { path: &path })?,
+                ),
+                format,
+                path,
+            }),
             Format::Parquet(_) => {
-                let reader = object_store
-                    .reader(path)
+                let mut reader = object_store
+                    .reader(&path)
                     .await
-                    .context(error::ReadObjectSnafu { path })?;
-
-                let buf_reader = BufReader::new(reader);
-
-                let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
+                    .context(error::ReadObjectSnafu { path: &path })?;
+                let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
                     .await
-                    .context(error::ReadParquetSnafu)?;
-                Ok(builder.schema().clone())
+                    .context(error::ReadParquetMetadataSnafu)?;
+
+                Ok(FileMetadata::Parquet {
+                    schema: metadata.schema().clone(),
+                    metadata,
+                    path,
+                })
             }
             Format::Orc(_) => {
                 let reader = object_store
-                    .reader(path)
+                    .reader(&path)
                     .await
-                    .context(error::ReadObjectSnafu { path })?;
+                    .context(error::ReadObjectSnafu { path: &path })?;
 
                 let schema = infer_orc_schema(reader)
                     .await
                     .context(error::ReadOrcSnafu)?;
 
-                Ok(Arc::new(schema))
+                Ok(FileMetadata::Orc {
+                    schema: Arc::new(schema),
+                    path,
+                })
             }
         }
     }
@@ -158,14 +206,13 @@ impl StatementExecutor {
 
     async fn build_read_stream(
         &self,
-        format: &Format,
-        object_store: ObjectStore,
-        path: &str,
         schema: SchemaRef,
+        object_store: &ObjectStore,
+        file_metadata: &FileMetadata,
         projection: Vec<usize>,
     ) -> Result {
-        match format {
-            Format::Csv(format) => {
+        match file_metadata {
+            FileMetadata::Csv { format, path, .. } => {
                 let csv_conf = CsvConfigBuilder::default()
                     .batch_size(DEFAULT_BATCH_SIZE)
                     .file_schema(schema.clone())
                     .build()
                     .context(error::BuildCsvConfigSnafu)?;
 
                 self.build_file_stream(
-                    CsvOpener::new(csv_conf, object_store, format.compression_type),
+                    CsvOpener::new(csv_conf, object_store.clone(), format.compression_type),
                     path,
                     schema,
                 )
                 .await
             }
-            Format::Json(format) => {
+            FileMetadata::Json { format, path, .. } => {
                 let projected_schema = Arc::new(
                     schema
                         .project(&projection)
                         .context(error::ProjectSchemaSnafu)?,
                 );
-
                 self.build_file_stream(
                     JsonOpener::new(
                         DEFAULT_BATCH_SIZE,
                         projected_schema,
-                        object_store,
+                        object_store.clone(),
                         format.compression_type,
                     ),
                     path,
@@ -199,17 +245,14 @@ impl StatementExecutor {
                 )
                 .await
             }
-            Format::Parquet(_) => {
+            FileMetadata::Parquet { metadata, path, .. } => {
                 let reader = object_store
-                    .reader(path)
+                    .reader_with(path)
+                    .buffer(DEFAULT_READ_BUFFER)
                     .await
                     .context(error::ReadObjectSnafu { path })?;
-                let buf_reader = BufReader::new(reader);
-
-                let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
-                    .await
-                    .context(error::ReadParquetSnafu)?;
-
+                let builder =
+                    ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
                 let upstream = builder
                     .build()
                     .context(error::BuildParquetRecordBatchStreamSnafu)?;
@@ -220,9 +263,10 @@ impl StatementExecutor {
                     Some(projection),
                 )))
             }
-            Format::Orc(_) => {
+            FileMetadata::Orc { path, .. } => {
                 let reader = object_store
-                    .reader(path)
+                    .reader_with(path)
+                    .buffer(DEFAULT_READ_BUFFER)
                     .await
                     .context(error::ReadObjectSnafu { path })?;
                 let stream = new_orc_stream_reader(reader)
                     .await
                     .context(error::ReadOrcSnafu)?;
@@ -247,11 +291,8 @@ impl StatementExecutor {
             table: &req.table_name,
         };
         let table = self.get_table(&table_ref).await?;
-
        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
-
        let (object_store, entries) = self.list_copy_from_entries(&req).await?;
-
        let mut files = Vec::with_capacity(entries.len());
 
         let table_schema = table.schema().arrow_schema().clone();
@@ -260,12 +301,13 @@ impl StatementExecutor {
                 continue;
             }
             let path = entry.path();
-            let file_schema = self
-                .infer_schema(&format, object_store.clone(), path)
+            let file_metadata = self
+                .collect_metadata(&object_store, format, path.to_string())
                 .await?;
-            let (file_schema_projection, table_schema_projection, compat_schema) =
-                generated_schema_projection_and_compatible_file_schema(&file_schema, &table_schema);
+            let file_schema = file_metadata.schema();
+            let (file_schema_projection, table_schema_projection, compat_schema) =
+                generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
             let projected_file_schema = Arc::new(
                 file_schema
                     .project(&file_schema_projection)
@@ -276,25 +318,24 @@ impl StatementExecutor {
                     .project(&table_schema_projection)
                     .context(error::ProjectSchemaSnafu)?,
             );
-
             ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;
             files.push((
                 Arc::new(compat_schema),
                 file_schema_projection,
                 projected_table_schema,
-                path,
+                file_metadata,
             ))
         }
 
         let mut rows_inserted = 0;
-        for (schema, file_schema_projection, projected_table_schema, path) in files {
+        for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
+        {
            let mut stream = self
                 .build_read_stream(
-                    &format,
-                    object_store.clone(),
-                    path,
-                    schema,
+                    compat_schema,
+                    &object_store,
+                    &file_metadata,
                     file_schema_projection,
                 )
                 .await?;
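
Note: the core of the parquet change above is that ArrowReaderMetadata is loaded once per file in collect_metadata and then handed to ParquetRecordBatchStreamBuilder::new_with_metadata in build_read_stream, so the footer is not fetched and parsed a second time when the read stream is built. The sketch below shows the same pattern in isolation; it is a minimal sketch assuming the parquet crate with its "async" feature plus tokio and futures, and the file name and error handling are illustrative, not part of this patch.

// Minimal sketch of the metadata-reuse pattern (not GreptimeDB code).
// Assumes the `parquet` crate with the "async" feature, plus `tokio` and `futures`;
// "example.parquet" is a placeholder path.
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Single footer read: fetches and parses the parquet metadata, including the Arrow schema.
    let mut file = File::open("example.parquet").await?;
    let metadata = ArrowReaderMetadata::load_async(&mut file, ArrowReaderOptions::default()).await?;

    // Schema inspection needs no further I/O.
    println!("schema: {:?}", metadata.schema());

    // Reuse the parsed metadata to build the record batch stream;
    // `new_with_metadata` performs no additional footer read.
    let file = File::open("example.parquet").await?;
    let stream = ParquetRecordBatchStreamBuilder::new_with_metadata(file, metadata).build()?;

    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}

The other half of the change, reader_with(path).buffer(DEFAULT_READ_BUFFER), asks the object store for a buffered reader so the parquet and ORC streams read through a 256 KiB buffer instead of issuing many small range reads.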