Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-05 21:02:58 +00:00
feat: support to copy table from/to CSV and JSON format file (#1475)
* refactor: refactor copy from executor
* feat: support to copy from CSV and JSON format files
* feat: support to copy table to the CSV and JSON format file
* test: add tests copy from/to
* chore: apply suggestions from CR
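
A minimal usage sketch of the feature this commit adds, drawn from the SQL test cases included below (the /tmp paths are the tests' own fixtures, and the "imported" table name is illustrative, not part of the change):

CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);

-- export in CSV or JSON instead of the default Parquet
Copy demo TO '/tmp/demo/export/csv/demo.csv' with (format='csv');
Copy demo TO '/tmp/demo/export/json/demo.json' with (format='json');

-- import a single file, or a whole directory optionally filtered by a pattern
CREATE TABLE imported(host string, cpu double, memory double, ts TIMESTAMP time index);
Copy imported FROM '/tmp/demo/export/csv/demo.csv' with (format='csv');
Copy imported FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*', format='json');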
@@ -113,9 +113,8 @@ pub enum Error {
         source: datafusion::parquet::errors::ParquetError,
     },

-    #[snafu(display("Failed to infer schema from file: {}, source: {}", path, source))]
+    #[snafu(display("Failed to infer schema from file, source: {}", source))]
     InferSchema {
-        path: String,
         location: Location,
         source: arrow_schema::ArrowError,
     },
@@ -78,7 +78,7 @@ impl TryFrom<&HashMap<String, String>> for Format {

 #[async_trait]
 pub trait FileFormat: Send + Sync + std::fmt::Debug {
-    async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<ArrowSchema>;
+    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<ArrowSchema>;
 }

 pub trait ArrowDecoder: Send + 'static {
@@ -167,7 +167,7 @@ pub async fn infer_schemas(
 ) -> Result<ArrowSchema> {
     let mut schemas = Vec::with_capacity(files.len());
     for file in files {
-        schemas.push(file_format.infer_schema(store, file.to_string()).await?)
+        schemas.push(file_format.infer_schema(store, file).await?)
     }
     ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
 }
@@ -175,14 +175,14 @@ pub async fn infer_schemas(
 pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
     mut stream: SendableRecordBatchStream,
     store: ObjectStore,
-    path: String,
+    path: &str,
     threshold: usize,
     encoder_factory: U,
 ) -> Result<usize> {
     let writer = store
-        .writer(&path)
+        .writer(path)
         .await
-        .context(error::WriteObjectSnafu { path: &path })?
+        .context(error::WriteObjectSnafu { path })?
         .compat_write();

     let buffer = SharedBuffer::with_capacity(threshold);
@@ -162,11 +162,11 @@ impl FileOpener for CsvOpener {

 #[async_trait]
 impl FileFormat for CsvFormat {
-    async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<Schema> {
+    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
         let reader = store
-            .reader(&path)
+            .reader(path)
             .await
-            .context(error::ReadObjectSnafu { path: &path })?;
+            .context(error::ReadObjectSnafu { path })?;

         let decoded = self.compression_type.convert_async_read(reader);

@@ -179,7 +179,7 @@ impl FileFormat for CsvFormat {

             let (schema, _records_read) =
                 infer_csv_schema(reader, delimiter, schema_infer_max_record, has_header)
-                    .context(error::InferSchemaSnafu { path: &path })?;
+                    .context(error::InferSchemaSnafu)?;
             Ok(schema)
         })
         .await
@@ -190,7 +190,7 @@ impl FileFormat for CsvFormat {
 pub async fn stream_to_csv(
     stream: SendableRecordBatchStream,
     store: ObjectStore,
-    path: String,
+    path: &str,
     threshold: usize,
 ) -> Result<usize> {
     stream_to_file(stream, store, path, threshold, |buffer| {
@@ -223,10 +223,7 @@ mod tests {
     async fn infer_schema_basic() {
         let csv = CsvFormat::default();
         let store = test_store(&test_data_root());
-        let schema = csv
-            .infer_schema(&store, "simple.csv".to_string())
-            .await
-            .unwrap();
+        let schema = csv.infer_schema(&store, "simple.csv").await.unwrap();
         let formatted: Vec<_> = format_schema(schema);

         assert_eq!(
@@ -257,7 +254,7 @@ mod tests {
         };
         let store = test_store(&test_data_root());
         let schema = json
-            .infer_schema(&store, "schema_infer_limit.csv".to_string())
+            .infer_schema(&store, "schema_infer_limit.csv")
             .await
             .unwrap();
         let formatted: Vec<_> = format_schema(schema);
@@ -275,7 +272,7 @@ mod tests {
         let json = CsvFormat::default();
         let store = test_store(&test_data_root());
         let schema = json
-            .infer_schema(&store, "schema_infer_limit.csv".to_string())
+            .infer_schema(&store, "schema_infer_limit.csv")
             .await
             .unwrap();
         let formatted: Vec<_> = format_schema(schema);
@@ -80,11 +80,11 @@ impl Default for JsonFormat {

 #[async_trait]
 impl FileFormat for JsonFormat {
-    async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<Schema> {
+    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
         let reader = store
-            .reader(&path)
+            .reader(path)
             .await
-            .context(error::ReadObjectSnafu { path: &path })?;
+            .context(error::ReadObjectSnafu { path })?;

         let decoded = self.compression_type.convert_async_read(reader);

@@ -95,8 +95,7 @@ impl FileFormat for JsonFormat {

             let iter = ValueIter::new(&mut reader, schema_infer_max_record);

-            let schema = infer_json_schema_from_iterator(iter)
-                .context(error::InferSchemaSnafu { path: &path })?;
+            let schema = infer_json_schema_from_iterator(iter).context(error::InferSchemaSnafu)?;

             Ok(schema)
         })
@@ -149,7 +148,7 @@ impl FileOpener for JsonOpener {
 pub async fn stream_to_json(
     stream: SendableRecordBatchStream,
     store: ObjectStore,
-    path: String,
+    path: &str,
     threshold: usize,
 ) -> Result<usize> {
     stream_to_file(stream, store, path, threshold, |buffer| {
@@ -179,10 +178,7 @@ mod tests {
     async fn infer_schema_basic() {
         let json = JsonFormat::default();
         let store = test_store(&test_data_root());
-        let schema = json
-            .infer_schema(&store, "simple.json".to_string())
-            .await
-            .unwrap();
+        let schema = json.infer_schema(&store, "simple.json").await.unwrap();
         let formatted: Vec<_> = format_schema(schema);

         assert_eq!(
@@ -204,7 +200,7 @@ mod tests {
         };
         let store = test_store(&test_data_root());
         let schema = json
-            .infer_schema(&store, "schema_infer_limit.json".to_string())
+            .infer_schema(&store, "schema_infer_limit.json")
             .await
             .unwrap();
         let formatted: Vec<_> = format_schema(schema);
@@ -40,11 +40,11 @@ pub struct ParquetFormat {}

 #[async_trait]
 impl FileFormat for ParquetFormat {
-    async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<Schema> {
+    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
         let mut reader = store
-            .reader(&path)
+            .reader(path)
             .await
-            .context(error::ReadObjectSnafu { path: &path })?;
+            .context(error::ReadObjectSnafu { path })?;

         let metadata = reader
             .get_metadata()
@@ -171,10 +171,7 @@ mod tests {
    async fn infer_schema_basic() {
        let json = ParquetFormat::default();
        let store = test_store(&test_data_root());
-        let schema = json
-            .infer_schema(&store, "basic.parquet".to_string())
-            .await
-            .unwrap();
+        let schema = json.infer_schema(&store, "basic.parquet").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);

        assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
@@ -116,7 +116,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
     stream_to_json(
         Box::pin(stream),
         tmp_store.clone(),
-        output_path.clone(),
+        &output_path,
         threshold(size),
     )
     .await
@@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
     stream_to_csv(
         Box::pin(stream),
         tmp_store.clone(),
-        output_path.clone(),
+        &output_path,
         threshold(size),
     )
     .await
@@ -19,6 +19,7 @@ use std::task::{Context, Poll};

 use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
 use datafusion::error::Result as DfResult;
+use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
 use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
 use datafusion_common::DataFusionError;
 use datatypes::schema::{Schema, SchemaRef};
@@ -39,6 +40,40 @@ type FutureStream = Pin<
     >,
 >;

+/// ParquetRecordBatchStream -> DataFusion RecordBatchStream
+pub struct ParquetRecordBatchStreamAdapter<T> {
+    stream: ParquetRecordBatchStream<T>,
+}
+
+impl<T: Unpin + AsyncFileReader + Send + 'static> ParquetRecordBatchStreamAdapter<T> {
+    pub fn new(stream: ParquetRecordBatchStream<T>) -> Self {
+        Self { stream }
+    }
+}
+
+impl<T: Unpin + AsyncFileReader + Send + 'static> DfRecordBatchStream
+    for ParquetRecordBatchStreamAdapter<T>
+{
+    fn schema(&self) -> DfSchemaRef {
+        self.stream.schema().clone()
+    }
+}
+
+impl<T: Unpin + AsyncFileReader + Send + 'static> Stream for ParquetRecordBatchStreamAdapter<T> {
+    type Item = DfResult<DfRecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
+            .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
+        Poll::Ready(batch)
+    }
+
+    #[inline]
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.size_hint()
+    }
+}
+
 /// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream
 pub struct DfRecordBatchStreamAdapter {
     stream: SendableRecordBatchStream,
@@ -435,6 +435,12 @@ pub enum Error {
         source: common_datasource::error::Error,
     },

+    #[snafu(display("Failed to parse file format, source: {}", source))]
+    ParseFileFormat {
+        #[snafu(backtrace)]
+        source: common_datasource::error::Error,
+    },
+
     #[snafu(display("Failed to build data source backend, source: {}", source))]
     BuildBackend {
         #[snafu(backtrace)]
@@ -447,6 +453,26 @@ pub enum Error {
         source: common_datasource::error::Error,
     },

+    #[snafu(display("Failed to infer schema from path: {}, source: {}", path, source))]
+    InferSchema {
+        path: String,
+        #[snafu(backtrace)]
+        source: common_datasource::error::Error,
+    },
+
+    #[snafu(display("Failed to build csv config: {}", source))]
+    BuildCsvConfig {
+        source: common_datasource::file_format::csv::CsvConfigBuilderError,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to write stream to path: {}, source: {}", path, source))]
+    WriteStreamToFile {
+        path: String,
+        #[snafu(backtrace)]
+        source: common_datasource::error::Error,
+    },
+
     #[snafu(display("Failed to read object in path: {}, source: {}", path, source))]
     ReadObject {
         path: String,
@@ -454,6 +480,12 @@ pub enum Error {
         source: object_store::Error,
     },

+    #[snafu(display("Failed to read record batch, source: {}", source))]
+    ReadRecordBatch {
+        source: datafusion::error::DataFusionError,
+        location: Location,
+    },
+
     #[snafu(display("Failed to read parquet file, source: {}", source))]
     ReadParquet {
         source: parquet::errors::ParquetError,
@@ -466,6 +498,12 @@ pub enum Error {
         source: parquet::errors::ParquetError,
     },

+    #[snafu(display("Failed to build file stream, source: {}", source))]
+    BuildFileStream {
+        location: Location,
+        source: datafusion::error::DataFusionError,
+    },
+
     #[snafu(display("Failed to write parquet file, source: {}", source))]
     WriteParquet {
         #[snafu(backtrace)]
@@ -517,7 +555,8 @@ impl ErrorExt for Error {
             | Error::ColumnNoneDefaultValue { .. }
             | Error::BuildRegex { .. }
             | Error::InvalidSchema { .. }
-            | Error::PrepareImmutableTable { .. } => StatusCode::InvalidArguments,
+            | Error::PrepareImmutableTable { .. }
+            | Error::BuildCsvConfig { .. } => StatusCode::InvalidArguments,

             Error::NotSupported { .. } => StatusCode::Unsupported,

@@ -532,6 +571,10 @@ impl ErrorExt for Error {
                 source.status_code()
             }

+            Error::ParseFileFormat { source, .. } | Error::InferSchema { source, .. } => {
+                source.status_code()
+            }
+
             Error::Table { source }
             | Error::CopyTable { source, .. }
             | Error::Insert { source, .. } => source.status_code(),
@@ -560,7 +603,12 @@ impl ErrorExt for Error {
             Error::TableNotFound { .. } => StatusCode::TableNotFound,
             Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,

-            Error::JoinTask { .. } => StatusCode::Unexpected,
+            Error::JoinTask { .. }
+            | Error::BuildParquetRecordBatchStream { .. }
+            | Error::ReadRecordBatch { .. }
+            | Error::BuildFileStream { .. }
+            | Error::WriteStreamToFile { .. } => StatusCode::Unexpected,

             Error::Catalog { source, .. } => source.status_code(),
             Error::CatalogEntrySerde { source, .. } => source.status_code(),
@@ -594,9 +642,7 @@ impl ErrorExt for Error {

             Error::TableScanExec { source, .. } => source.status_code(),

-            Error::ReadObject { .. }
-            | Error::ReadParquet { .. }
-            | Error::BuildParquetRecordBatchStream { .. } => StatusCode::StorageUnavailable,
+            Error::ReadObject { .. } | Error::ReadParquet { .. } => StatusCode::StorageUnavailable,

             Error::ListObjects { source }
             | Error::ParseUrl { source }
@@ -14,17 +14,28 @@

 use std::collections::HashMap;
 use std::future::Future;
 use std::sync::Arc;

 use async_compat::CompatExt;
 use common_base::readable_size::ReadableSize;
 use common_datasource::file_format::csv::{CsvConfigBuilder, CsvOpener};
 use common_datasource::file_format::json::JsonOpener;
 use common_datasource::file_format::{FileFormat, Format};
 use common_datasource::lister::{Lister, Source};
 use common_datasource::object_store::{build_backend, parse_url};
 use common_datasource::util::find_dir_and_filename;
 use common_query::Output;
 use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
 use common_recordbatch::DfSendableRecordBatchStream;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
 use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datatypes::arrow::datatypes::{DataType, SchemaRef};
 use datatypes::vectors::Helper;
 use futures_util::StreamExt;
 use object_store::{Entry, EntryMode, Metakey, ObjectStore};
 use regex::Regex;
 use snafu::ResultExt;
 use table::engine::TableReference;
@@ -34,15 +45,13 @@ use tokio::io::BufReader;

 use crate::error::{self, IntoVectorsSnafu, Result};
 use crate::statement::StatementExecutor;

-impl StatementExecutor {
-    pub(crate) async fn copy_table_from(&self, req: CopyTableRequest) -> Result<Output> {
-        let table_ref = TableReference {
-            catalog: &req.catalog_name,
-            schema: &req.schema_name,
-            table: &req.table_name,
-        };
-        let table = self.get_table(&table_ref).await?;
+const DEFAULT_BATCH_SIZE: usize = 8192;
+
+impl StatementExecutor {
+    async fn list_copy_from_entries(
+        &self,
+        req: &CopyTableRequest,
+    ) -> Result<(ObjectStore, Vec<Entry>)> {
         let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;

         let object_store =
@@ -66,6 +75,137 @@ impl StatementExecutor {

         let entries = lister.list().await.context(error::ListObjectsSnafu)?;

         Ok((object_store, entries))
     }

+    async fn infer_schema(
+        &self,
+        format: &Format,
+        object_store: ObjectStore,
+        path: &str,
+    ) -> Result<SchemaRef> {
+        match format {
+            Format::Csv(format) => Ok(Arc::new(
+                format
+                    .infer_schema(&object_store, path)
+                    .await
+                    .context(error::InferSchemaSnafu { path })?,
+            )),
+            Format::Json(format) => Ok(Arc::new(
+                format
+                    .infer_schema(&object_store, path)
+                    .await
+                    .context(error::InferSchemaSnafu { path })?,
+            )),
+            Format::Parquet(_) => {
+                let reader = object_store
+                    .reader(path)
+                    .await
+                    .context(error::ReadObjectSnafu { path })?;
+
+                let buf_reader = BufReader::new(reader);
+
+                let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
+                    .await
+                    .context(error::ReadParquetSnafu)?;
+                Ok(builder.schema().clone())
+            }
+        }
+    }
+
+    async fn build_file_stream<F: FileOpener + Send + 'static>(
+        &self,
+        opener: F,
+        filename: &str,
+        file_schema: SchemaRef,
+    ) -> Result<DfSendableRecordBatchStream> {
+        let stream = FileStream::new(
+            &FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
+                file_schema,
+                file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
+                statistics: Default::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: None,
+                infinite_source: false,
+            },
+            0,
+            opener,
+            &ExecutionPlanMetricsSet::new(),
+        )
+        .context(error::BuildFileStreamSnafu)?;
+
+        Ok(Box::pin(stream))
+    }
+
+    async fn build_read_stream(
+        &self,
+        format: &Format,
+        object_store: ObjectStore,
+        path: &str,
+        schema: SchemaRef,
+    ) -> Result<DfSendableRecordBatchStream> {
+        match format {
+            Format::Csv(format) => {
+                let csv_conf = CsvConfigBuilder::default()
+                    .batch_size(DEFAULT_BATCH_SIZE)
+                    .file_schema(schema.clone())
+                    .build()
+                    .context(error::BuildCsvConfigSnafu)?;
+
+                self.build_file_stream(
+                    CsvOpener::new(csv_conf, object_store, format.compression_type),
+                    path,
+                    schema,
+                )
+                .await
+            }
+            Format::Json(format) => {
+                self.build_file_stream(
+                    JsonOpener::new(
+                        DEFAULT_BATCH_SIZE,
+                        schema.clone(),
+                        object_store,
+                        format.compression_type,
+                    ),
+                    path,
+                    schema,
+                )
+                .await
+            }
+            Format::Parquet(_) => {
+                let reader = object_store
+                    .reader(path)
+                    .await
+                    .context(error::ReadObjectSnafu { path })?;
+
+                let buf_reader = BufReader::new(reader.compat());
+
+                let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
+                    .await
+                    .context(error::ReadParquetSnafu)?;
+
+                let upstream = builder
+                    .build()
+                    .context(error::BuildParquetRecordBatchStreamSnafu)?;
+
+                Ok(Box::pin(ParquetRecordBatchStreamAdapter::new(upstream)))
+            }
+        }
+    }
+
+    pub(crate) async fn copy_table_from(&self, req: CopyTableRequest) -> Result<Output> {
+        let table_ref = TableReference {
+            catalog: &req.catalog_name,
+            schema: &req.schema_name,
+            table: &req.table_name,
+        };
+        let table = self.get_table(&table_ref).await?;
+
+        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
+
+        let fields = table
+            .schema()
+            .arrow_schema()
@@ -74,29 +214,33 @@ impl StatementExecutor {
             .map(|f| f.name().to_string())
             .collect::<Vec<_>>();

-        let mut rows_inserted = 0;
         let (object_store, entries) = self.list_copy_from_entries(&req).await?;

+        let mut files = Vec::with_capacity(entries.len());
+
         for entry in entries.iter() {
-            let path = entry.path();
-            // skips directories.
-            if entry.path().ends_with('/') {
+            let metadata = object_store
+                .metadata(entry, Metakey::Mode)
+                .await
+                .context(error::ReadObjectSnafu { path: entry.path() })?;
+            if metadata.mode() != EntryMode::FILE {
                 continue;
             }
-            let reader = object_store
-                .reader(path)
-                .await
-                .context(error::ReadObjectSnafu { path })?;
-
-            let buf_reader = BufReader::new(reader.compat());
-
-            let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
-                .await
-                .context(error::ReadParquetSnafu)?;
-
-            ensure_schema_matches_ignore_timezone(builder.schema(), table.schema().arrow_schema())?;
-
-            let mut stream = builder
-                .build()
-                .context(error::BuildParquetRecordBatchStreamSnafu)?;
+            let path = entry.path();
+            let file_schema = self
+                .infer_schema(&format, object_store.clone(), path)
+                .await?;
+
+            ensure_schema_matches_ignore_timezone(&file_schema, table.schema().arrow_schema())?;
+
+            files.push((file_schema, path))
+        }
+
+        let mut rows_inserted = 0;
+        for (schema, path) in files {
+            let mut stream = self
+                .build_read_stream(&format, object_store.clone(), path, schema)
+                .await?;

             // TODO(hl): make this configurable through options.
             let pending_mem_threshold = ReadableSize::mb(32).as_bytes();
@@ -104,7 +248,7 @@ impl StatementExecutor {
             let mut pending = vec![];

             while let Some(r) = stream.next().await {
-                let record_batch = r.context(error::ReadParquetSnafu)?;
+                let record_batch = r.context(error::ReadRecordBatchSnafu)?;
                 let vectors =
                     Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

@@ -12,9 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use common_base::readable_size::ReadableSize;
+use common_datasource::file_format::csv::stream_to_csv;
+use common_datasource::file_format::json::stream_to_json;
+use common_datasource::file_format::Format;
 use common_datasource::object_store::{build_backend, parse_url};
 use common_query::physical_plan::SessionContext;
 use common_query::Output;
+use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
+use common_recordbatch::SendableRecordBatchStream;
+use object_store::ObjectStore;
 use snafu::ResultExt;
 use storage::sst::SstInfo;
 use storage::{ParquetWriter, Source};
@@ -25,6 +32,46 @@ use crate::error::{self, Result, WriteParquetSnafu};
 use crate::statement::StatementExecutor;

 impl StatementExecutor {
+    async fn stream_to_file(
+        &self,
+        stream: SendableRecordBatchStream,
+        format: &Format,
+        object_store: ObjectStore,
+        path: &str,
+    ) -> Result<usize> {
+        let threshold = ReadableSize::mb(4).as_bytes() as usize;
+
+        match format {
+            Format::Csv(_) => stream_to_csv(
+                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
+                object_store,
+                path,
+                threshold,
+            )
+            .await
+            .context(error::WriteStreamToFileSnafu { path }),
+            Format::Json(_) => stream_to_json(
+                Box::pin(DfRecordBatchStreamAdapter::new(stream)),
+                object_store,
+                path,
+                threshold,
+            )
+            .await
+            .context(error::WriteStreamToFileSnafu { path }),
+            Format::Parquet(_) => {
+                let writer = ParquetWriter::new(path, Source::Stream(stream), object_store);
+                let rows_copied = writer
+                    .write_sst(&storage::sst::WriteOptions::default())
+                    .await
+                    .context(WriteParquetSnafu)?
+                    .map(|SstInfo { num_rows, .. }| num_rows)
+                    .unwrap_or(0);
+
+                Ok(rows_copied)
+            }
+        }
+    }
+
     pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<Output> {
         let table_ref = TableReference {
             catalog: &req.catalog_name,
@@ -33,6 +80,8 @@ impl StatementExecutor {
         };
         let table = self.get_table(&table_ref).await?;

+        let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
+
         let stream = table
             .scan(None, &[], None)
             .await
@@ -48,14 +97,9 @@ impl StatementExecutor {
         let object_store =
             build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;

-        let writer = ParquetWriter::new(&path, Source::Stream(stream), object_store);
-
-        let rows_copied = writer
-            .write_sst(&storage::sst::WriteOptions::default())
-            .await
-            .context(WriteParquetSnafu)?
-            .map(|SstInfo { num_rows, .. }| num_rows)
-            .unwrap_or(0);
+        let rows_copied = self
+            .stream_to_file(stream, &format, object_store, &path)
+            .await?;

         Ok(Output::AffectedRows(rows_copied))
     }
@@ -6,7 +6,7 @@ insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 16552765570

 Affected Rows: 2

-Copy demo TO '/tmp/demo/export/demo.parquet';
+Copy demo TO '/tmp/demo/export/parquet/demo.parquet';

 Affected Rows: 2

@@ -14,7 +14,7 @@ CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp

 Affected Rows: 0

-Copy with_filename FROM '/tmp/demo/export/demo.parquet';
+Copy with_filename FROM '/tmp/demo/export/parquet/demo.parquet';

 Affected Rows: 2

@@ -31,7 +31,7 @@ CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time

 Affected Rows: 0

-Copy with_path FROM '/tmp/demo/export/';
+Copy with_path FROM '/tmp/demo/export/parquet/';

 Affected Rows: 2

@@ -48,7 +48,7 @@ CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp t

 Affected Rows: 0

-Copy with_pattern FROM '/tmp/demo/export/' WITH (PATTERN = 'demo.*');
+Copy with_pattern FROM '/tmp/demo/export/parquet/' WITH (PATTERN = 'demo.*');

 Affected Rows: 2

@@ -2,23 +2,23 @@ CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time inde

 insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);

-Copy demo TO '/tmp/demo/export/demo.parquet';
+Copy demo TO '/tmp/demo/export/parquet/demo.parquet';

 CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);

-Copy with_filename FROM '/tmp/demo/export/demo.parquet';
+Copy with_filename FROM '/tmp/demo/export/parquet/demo.parquet';

 select * from with_filename order by ts;

 CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);

-Copy with_path FROM '/tmp/demo/export/';
+Copy with_path FROM '/tmp/demo/export/parquet/';

 select * from with_path order by ts;

 CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);

-Copy with_pattern FROM '/tmp/demo/export/' WITH (PATTERN = 'demo.*');
+Copy with_pattern FROM '/tmp/demo/export/parquet/' WITH (PATTERN = 'demo.*');

 select * from with_pattern order by ts;
tests/cases/standalone/common/copy/copy_from_fs_csv.result (new file, 79 lines)
@@ -0,0 +1,79 @@
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);

Affected Rows: 0

insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);

Affected Rows: 2

Copy demo TO '/tmp/demo/export/csv/demo.csv' with (format='csv');

Affected Rows: 2

CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_filename FROM '/tmp/demo/export/csv/demo.csv' with (format='csv');

Affected Rows: 2

select * from with_filename order by ts;

+-------+------+--------+---------------------+
| host  | cpu  | memory | ts                  |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+

CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_path FROM '/tmp/demo/export/csv/' with (format='csv');

Affected Rows: 2

select * from with_path order by ts;

+-------+------+--------+---------------------+
| host  | cpu  | memory | ts                  |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+

CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_pattern FROM '/tmp/demo/export/csv/' WITH (pattern = 'demo.*',format='csv');

Affected Rows: 2

select * from with_pattern order by ts;

+-------+------+--------+---------------------+
| host  | cpu  | memory | ts                  |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+

drop table demo;

Affected Rows: 1

drop table with_filename;

Affected Rows: 1

drop table with_path;

Affected Rows: 1

drop table with_pattern;

Affected Rows: 1
tests/cases/standalone/common/copy/copy_from_fs_csv.sql (new file, 31 lines)
@@ -0,0 +1,31 @@
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);

insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);

Copy demo TO '/tmp/demo/export/csv/demo.csv' with (format='csv');

CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);

Copy with_filename FROM '/tmp/demo/export/csv/demo.csv' with (format='csv');

select * from with_filename order by ts;

CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);

Copy with_path FROM '/tmp/demo/export/csv/' with (format='csv');

select * from with_path order by ts;

CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);

Copy with_pattern FROM '/tmp/demo/export/csv/' WITH (pattern = 'demo.*',format='csv');

select * from with_pattern order by ts;

drop table demo;

drop table with_filename;

drop table with_path;

drop table with_pattern;
tests/cases/standalone/common/copy/copy_from_fs_json.result (new file, 79 lines)
@@ -0,0 +1,79 @@
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);

Affected Rows: 0

insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);

Affected Rows: 2

Copy demo TO '/tmp/demo/export/json/demo.json' with (format='json');

Affected Rows: 2

CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_filename FROM '/tmp/demo/export/json/demo.json' with (format='json');

Affected Rows: 2

select * from with_filename order by ts;

+-------+------+--------+---------------------+
| host  | cpu  | memory | ts                  |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+

CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_path FROM '/tmp/demo/export/json/' with (format='json');

Affected Rows: 2

select * from with_path order by ts;

+-------+------+--------+---------------------+
| host  | cpu  | memory | ts                  |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+

CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);

Affected Rows: 0

Copy with_pattern FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*',format='json');

Affected Rows: 2

select * from with_pattern order by ts;

+-------+------+--------+---------------------+
| host  | cpu  | memory | ts                  |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+

drop table demo;

Affected Rows: 1

drop table with_filename;

Affected Rows: 1

drop table with_path;

Affected Rows: 1

drop table with_pattern;

Affected Rows: 1
tests/cases/standalone/common/copy/copy_from_fs_json.sql (new file, 31 lines)
@@ -0,0 +1,31 @@
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);

insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);

Copy demo TO '/tmp/demo/export/json/demo.json' with (format='json');

CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);

Copy with_filename FROM '/tmp/demo/export/json/demo.json' with (format='json');

select * from with_filename order by ts;

CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);

Copy with_path FROM '/tmp/demo/export/json/' with (format='json');

select * from with_path order by ts;

CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);

Copy with_pattern FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*',format='json');

select * from with_pattern order by ts;

drop table demo;

drop table with_filename;

drop table with_path;

drop table with_pattern;
@@ -10,6 +10,14 @@ Copy demo TO '/tmp/export/demo.parquet';

 Affected Rows: 2

+Copy demo TO '/tmp/export/demo.csv' with (format='csv');
+
+Affected Rows: 2
+
+Copy demo TO '/tmp/export/demo.json' with (format='json');
+
+Affected Rows: 2
+
 drop table demo;

 Affected Rows: 1

@@ -4,4 +4,8 @@ insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 16552765570

 Copy demo TO '/tmp/export/demo.parquet';

+Copy demo TO '/tmp/export/demo.csv' with (format='csv');
+
+Copy demo TO '/tmp/export/demo.json' with (format='json');
+
 drop table demo;
