feat: implement compressed CSV/JSON export functionality (#7162)

* feat: implement compressed CSV/JSON export functionality

- Add CompressedWriter for real-time compression during CSV/JSON export
- Support GZIP, BZIP2, XZ, ZSTD compression formats
- Remove the LazyBufferedWriter dependency to simplify the architecture
- Implement an Encoder -> Compressor -> FileWriter data flow (see the sketch after this list)
- Add tests for compressed CSV/JSON export
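
A minimal sketch of that Encoder -> Compressor -> FileWriter flow, using only arrow, async-compression (with its tokio and gzip features), and tokio. The output path, data, and function name are illustrative; the real implementation streams record batches through `stream_to_file` in the diff below.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use async_compression::tokio::write::GzipEncoder;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Encoder: arrow's CSV writer encodes a RecordBatch into an in-memory buffer.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![column])?;
    let mut encoded = Vec::new();
    WriterBuilder::new().build(&mut encoded).write(&batch)?;

    // Compressor -> FileWriter: the encoded bytes are streamed through a gzip
    // encoder that writes the compressed output to the destination file.
    let file = tokio::fs::File::create("/tmp/demo.csv.gz").await?;
    let mut compressor = GzipEncoder::new(file);
    compressor.write_all(&encoded).await?;
    compressor.shutdown().await?; // finalizes the gzip trailer and flushes the file
    Ok(())
}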

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* feat: implement compressed CSV/JSON export functionality

- refactor and extend compressed_writer tests
- add coverage for Bzip2 and Xz compression

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* feat: implement compressed CSV/JSON export functionality

- Switch to threshold-based chunked flushing (sketched after this list)
- Avoid unnecessary writes on empty buffers
- Replace direct write_all() calls with the new helper for consistency
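
A simplified sketch of that flushing strategy; the names here are illustrative, and the actual logic lives in `stream_to_file` and the `write_to_compressed_writer` helper further down in this diff. The shared buffer is drained in `threshold`-sized chunks, the lock is released before any async write, and empty buffers are never written.

use std::sync::{Arc, Mutex};

use bytes::BytesMut;
use tokio::io::{AsyncWrite, AsyncWriteExt};

async fn flush_in_chunks<W: AsyncWrite + Unpin>(
    buffer: &Arc<Mutex<BytesMut>>,
    writer: &mut W,
    threshold: usize,
) -> std::io::Result<()> {
    loop {
        // Split off one threshold-sized chunk while holding the lock,
        // then drop the guard before doing async I/O.
        let chunk = {
            let mut guard = buffer.lock().unwrap();
            if guard.len() < threshold {
                break;
            }
            guard.split_to(threshold)
        };
        // Skip writes for empty chunks; otherwise write the whole chunk.
        if !chunk.is_empty() {
            writer.write_all(&chunk).await?;
        }
    }
    Ok(())
}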

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* feat: implement compressed CSV/JSON import (COPY FROM) functionality

- Add support for reading compressed CSV and JSON in COPY FROM (see the mapping sketch after this list)
- Support GZIP, BZIP2, XZ, ZSTD compression formats
- Add tests for compressed CSV/JSON import
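
On the read path, DataFusion performs the actual decompression once its `FileCompressionType` is set on the scan config (see `file_to_stream` below, which calls `compression_type.into()`). A hypothetical sketch of that mapping, using the `CompressionType` variants shown in this diff; the real `Into` conversion is not part of this change set and may differ.

use datafusion::datasource::file_format::file_compression_type::FileCompressionType;

// Hypothetical mapping sketch; the crate's actual conversion may differ.
fn to_df_compression(compression_type: CompressionType) -> FileCompressionType {
    match compression_type {
        CompressionType::Gzip => FileCompressionType::GZIP,
        CompressionType::Bzip2 => FileCompressionType::BZIP2,
        CompressionType::Xz => FileCompressionType::XZ,
        CompressionType::Zstd => FileCompressionType::ZSTD,
        CompressionType::Uncompressed => FileCompressionType::UNCOMPRESSED,
    }
}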

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* feat: implement compressed CSV/JSON export/import functionality

- Address review comments

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* feat: implement compressed CSV/JSON export/import functionality

- Move temp_dir out of the loop

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* feat: implement compressed CSV/JSON export/import functionality

- Fix flawed locking logic

Co-authored-by: jeremyhi <jiachun_feng@proton.me>
Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

---------

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
McKnight22 authored on 2025-11-18 10:55:58 +08:00, committed by GitHub
parent 4e9f419de7
commit 605f3270e5
19 changed files with 1632 additions and 242 deletions

View File

@@ -12,28 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::future::Future;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::parquet::format::FileMetaData;
use snafu::{OptionExt, ResultExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;
pub struct LazyBufferedWriter<T, U, F> {
path: String,
writer_factory: F,
writer: Option<T>,
/// None stands for [`LazyBufferedWriter`] closed.
encoder: Option<U>,
buffer: SharedBuffer,
rows_written: usize,
bytes_written: u64,
threshold: usize,
}
use crate::error::Result;
pub trait DfRecordBatchEncoder {
fn write(&mut self, batch: &RecordBatch) -> Result<()>;
@@ -43,126 +26,3 @@ pub trait DfRecordBatchEncoder {
pub trait ArrowWriterCloser {
async fn close(mut self) -> Result<FileMetaData>;
}
impl<
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder + ArrowWriterCloser,
F: Fn(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
/// Closes `LazyBufferedWriter` and optionally flushes all data to underlying storage
/// if any row's been written.
pub async fn close_with_arrow_writer(mut self) -> Result<(FileMetaData, u64)> {
let encoder = self
.encoder
.take()
.context(error::BufferedWriterClosedSnafu)?;
let metadata = encoder.close().await?;
// It's important to shut down! flushes all pending writes
self.close_inner_writer().await?;
Ok((metadata, self.bytes_written))
}
}
impl<
T: AsyncWrite + Send + Unpin,
U: DfRecordBatchEncoder,
F: Fn(String) -> Fut,
Fut: Future<Output = Result<T>>,
> LazyBufferedWriter<T, U, F>
{
/// Closes the writer and flushes the buffer data.
pub async fn close_inner_writer(&mut self) -> Result<()> {
// Use `rows_written` to keep a track of if any rows have been written.
// If no row's been written, then we can simply close the underlying
// writer without flush so that no file will be actually created.
if self.rows_written != 0 {
self.bytes_written += self.try_flush(true).await?;
}
if let Some(writer) = &mut self.writer {
writer.shutdown().await.context(error::AsyncWriteSnafu)?;
}
Ok(())
}
pub fn new(
threshold: usize,
buffer: SharedBuffer,
encoder: U,
path: impl AsRef<str>,
writer_factory: F,
) -> Self {
Self {
path: path.as_ref().to_string(),
threshold,
encoder: Some(encoder),
buffer,
rows_written: 0,
bytes_written: 0,
writer_factory,
writer: None,
}
}
pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
let encoder = self
.encoder
.as_mut()
.context(error::BufferedWriterClosedSnafu)?;
encoder.write(batch)?;
self.rows_written += batch.num_rows();
self.bytes_written += self.try_flush(false).await?;
Ok(())
}
async fn try_flush(&mut self, all: bool) -> Result<u64> {
let mut bytes_written: u64 = 0;
// Once buffered data size reaches threshold, split the data in chunks (typically 4MB)
// and write to underlying storage.
while self.buffer.buffer.lock().unwrap().len() >= self.threshold {
let chunk = {
let mut buffer = self.buffer.buffer.lock().unwrap();
buffer.split_to(self.threshold)
};
let size = chunk.len();
self.maybe_init_writer()
.await?
.write_all(&chunk)
.await
.context(error::AsyncWriteSnafu)?;
bytes_written += size as u64;
}
if all {
bytes_written += self.try_flush_all().await?;
}
Ok(bytes_written)
}
/// Only initiates underlying file writer when rows have been written.
async fn maybe_init_writer(&mut self) -> Result<&mut T> {
if let Some(ref mut writer) = self.writer {
Ok(writer)
} else {
let writer = (self.writer_factory)(self.path.clone()).await?;
Ok(self.writer.insert(writer))
}
}
async fn try_flush_all(&mut self) -> Result<u64> {
let remain = self.buffer.buffer.lock().unwrap().split();
let size = remain.len();
self.maybe_init_writer()
.await?
.write_all(&remain)
.await
.context(error::AsyncWriteSnafu)?;
Ok(size as u64)
}
}

View File

@@ -0,0 +1,202 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder};
use snafu::ResultExt;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use crate::compression::CompressionType;
use crate::error::{self, Result};
/// A compressed writer that wraps an underlying async writer with compression.
///
/// This writer supports multiple compression formats including GZIP, BZIP2, XZ, and ZSTD.
/// It provides transparent compression for any async writer implementation.
pub struct CompressedWriter {
inner: Box<dyn AsyncWrite + Unpin + Send>,
compression_type: CompressionType,
}
impl CompressedWriter {
/// Creates a new compressed writer with the specified compression type.
///
/// # Arguments
///
/// * `writer` - The underlying writer to wrap with compression
/// * `compression_type` - The type of compression to apply
pub fn new(
writer: impl AsyncWrite + Unpin + Send + 'static,
compression_type: CompressionType,
) -> Self {
let inner: Box<dyn AsyncWrite + Unpin + Send> = match compression_type {
CompressionType::Gzip => Box::new(GzipEncoder::new(writer)),
CompressionType::Bzip2 => Box::new(BzEncoder::new(writer)),
CompressionType::Xz => Box::new(XzEncoder::new(writer)),
CompressionType::Zstd => Box::new(ZstdEncoder::new(writer)),
CompressionType::Uncompressed => Box::new(writer),
};
Self {
inner,
compression_type,
}
}
/// Returns the compression type used by this writer.
pub fn compression_type(&self) -> CompressionType {
self.compression_type
}
/// Flushes the writer and shuts down the compression stream.
pub async fn shutdown(mut self) -> Result<()> {
self.inner
.shutdown()
.await
.context(error::AsyncWriteSnafu)?;
Ok(())
}
}
impl AsyncWrite for CompressedWriter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
/// A trait for converting async writers into compressed writers.
///
/// This trait is automatically implemented for all types that implement [`AsyncWrite`].
pub trait IntoCompressedWriter {
/// Converts this writer into a [`CompressedWriter`] with the specified compression type.
///
/// # Arguments
///
/// * `self` - The underlying writer to wrap with compression
/// * `compression_type` - The type of compression to apply
fn into_compressed_writer(self, compression_type: CompressionType) -> CompressedWriter
where
Self: AsyncWrite + Unpin + Send + 'static + Sized,
{
CompressedWriter::new(self, compression_type)
}
}
impl<W: AsyncWrite + Unpin + Send + 'static> IntoCompressedWriter for W {}
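A minimal usage sketch (not part of the diff): any tokio `AsyncWrite`, such as a `tokio::fs::File`, can be wrapped through the blanket impl above. The output path and payload here are illustrative; the imports already present in this module (`AsyncWriteExt`, `ResultExt`, `CompressionType`, `error`) are assumed.
async fn write_gzip_example() -> Result<()> {
    // Wrap a plain file writer with gzip compression via the blanket impl.
    let file = tokio::fs::File::create("/tmp/example.csv.gz")
        .await
        .context(error::AsyncWriteSnafu)?;
    let mut writer = file.into_compressed_writer(CompressionType::Gzip);
    writer
        .write_all(b"id,name\n1,Alice\n")
        .await
        .context(error::AsyncWriteSnafu)?;
    // shutdown() flushes the encoder state and finalizes the compressed stream.
    writer.shutdown().await
}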
#[cfg(test)]
mod tests {
use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};
use super::*;
#[tokio::test]
async fn test_compressed_writer_gzip() {
let (duplex_writer, mut duplex_reader) = duplex(1024);
let mut writer = duplex_writer.into_compressed_writer(CompressionType::Gzip);
let original = b"test data for gzip compression";
writer.write_all(original).await.unwrap();
writer.shutdown().await.unwrap();
let mut buffer = Vec::new();
duplex_reader.read_to_end(&mut buffer).await.unwrap();
// The compressed data should be different from the original
assert_ne!(buffer, original);
assert!(!buffer.is_empty());
}
#[tokio::test]
async fn test_compressed_writer_bzip2() {
let (duplex_writer, mut duplex_reader) = duplex(1024);
let mut writer = duplex_writer.into_compressed_writer(CompressionType::Bzip2);
let original = b"test data for bzip2 compression";
writer.write_all(original).await.unwrap();
writer.shutdown().await.unwrap();
let mut buffer = Vec::new();
duplex_reader.read_to_end(&mut buffer).await.unwrap();
// The compressed data should be different from the original
assert_ne!(buffer, original);
assert!(!buffer.is_empty());
}
#[tokio::test]
async fn test_compressed_writer_xz() {
let (duplex_writer, mut duplex_reader) = duplex(1024);
let mut writer = duplex_writer.into_compressed_writer(CompressionType::Xz);
let original = b"test data for xz compression";
writer.write_all(original).await.unwrap();
writer.shutdown().await.unwrap();
let mut buffer = Vec::new();
duplex_reader.read_to_end(&mut buffer).await.unwrap();
// The compressed data should be different from the original
assert_ne!(buffer, original);
assert!(!buffer.is_empty());
}
#[tokio::test]
async fn test_compressed_writer_zstd() {
let (duplex_writer, mut duplex_reader) = duplex(1024);
let mut writer = duplex_writer.into_compressed_writer(CompressionType::Zstd);
let original = b"test data for zstd compression";
writer.write_all(original).await.unwrap();
writer.shutdown().await.unwrap();
let mut buffer = Vec::new();
duplex_reader.read_to_end(&mut buffer).await.unwrap();
// The compressed data should be different from the original
assert_ne!(buffer, original);
assert!(!buffer.is_empty());
}
#[tokio::test]
async fn test_compressed_writer_uncompressed() {
let (duplex_writer, mut duplex_reader) = duplex(1024);
let mut writer = duplex_writer.into_compressed_writer(CompressionType::Uncompressed);
let original = b"test data for uncompressed";
writer.write_all(original).await.unwrap();
writer.shutdown().await.unwrap();
let mut buffer = Vec::new();
duplex_reader.read_to_end(&mut buffer).await.unwrap();
// Uncompressed data should be the same as the original
assert_eq!(buffer, original);
}
}

View File

@@ -194,12 +194,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Buffered writer closed"))]
BufferedWriterClosed {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to write parquet file, path: {}", path))]
WriteParquet {
path: String,
@@ -208,6 +202,14 @@ pub enum Error {
#[snafu(source)]
error: parquet::errors::ParquetError,
},
#[snafu(display("Failed to build file stream"))]
BuildFileStream {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: datafusion::error::DataFusionError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -239,7 +241,7 @@ impl ErrorExt for Error {
| ReadRecordBatch { .. }
| WriteRecordBatch { .. }
| EncodeRecordBatch { .. }
| BufferedWriterClosed { .. }
| BuildFileStream { .. }
| OrcReader { .. } => StatusCode::Unexpected,
}
}

View File

@@ -30,12 +30,22 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::datasource::physical_plan::FileOpenFuture;
use common_recordbatch::DfSendableRecordBatchStream;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType as DfCompressionType;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
FileGroup, FileOpenFuture, FileScanConfigBuilder, FileSource, FileStream,
};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::datatypes::SchemaRef;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
use snafu::ResultExt;
use tokio::io::AsyncWriteExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use self::csv::CsvFormat;
@@ -43,7 +53,8 @@ use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::DEFAULT_WRITE_BUFFER_SIZE;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::buffered_writer::DfRecordBatchEncoder;
use crate::compressed_writer::{CompressedWriter, IntoCompressedWriter};
use crate::compression::CompressionType;
use crate::error::{self, Result};
use crate::share_buffer::SharedBuffer;
@@ -195,33 +206,128 @@ pub async fn infer_schemas(
ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
}
pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
/// Writes data to a compressed writer if the data is not empty.
///
/// Does nothing if `data` is empty; otherwise writes all data and returns any error.
async fn write_to_compressed_writer(
compressed_writer: &mut CompressedWriter,
data: &[u8],
) -> Result<()> {
if !data.is_empty() {
compressed_writer
.write_all(data)
.await
.context(error::AsyncWriteSnafu)?;
}
Ok(())
}
/// Streams [SendableRecordBatchStream] to a file with optional compression support.
/// Data is buffered and flushed according to the given `threshold`.
/// Ensures that writer resources are cleanly released and that an empty file is not
/// created if no rows are written.
///
/// Returns the total number of rows successfully written.
pub async fn stream_to_file<E>(
mut stream: SendableRecordBatchStream,
store: ObjectStore,
path: &str,
threshold: usize,
concurrency: usize,
encoder_factory: U,
) -> Result<usize> {
compression_type: CompressionType,
encoder_factory: impl Fn(SharedBuffer) -> E,
) -> Result<usize>
where
E: DfRecordBatchEncoder,
{
// Create the file writer with OpenDAL's built-in buffering
let writer = store
.writer_with(path)
.concurrent(concurrency)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.await
.with_context(|_| error::WriteObjectSnafu { path })?
.into_futures_async_write()
.compat_write();
// Apply compression if needed
let mut compressed_writer = writer.into_compressed_writer(compression_type);
// Create a buffer for the encoder
let buffer = SharedBuffer::with_capacity(threshold);
let encoder = encoder_factory(buffer.clone());
let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async {
store
.writer_with(&path)
.concurrent(concurrency)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.await
.map(|v| v.into_futures_async_write().compat_write())
.context(error::WriteObjectSnafu { path })
});
let mut encoder = encoder_factory(buffer.clone());
let mut rows = 0;
// Process each record batch
while let Some(batch) = stream.next().await {
let batch = batch.context(error::ReadRecordBatchSnafu)?;
writer.write(&batch).await?;
// Write batch using the encoder
encoder.write(&batch)?;
rows += batch.num_rows();
loop {
let chunk = {
let mut buffer_guard = buffer.buffer.lock().unwrap();
if buffer_guard.len() < threshold {
break;
}
buffer_guard.split_to(threshold)
};
write_to_compressed_writer(&mut compressed_writer, &chunk).await?;
}
}
writer.close_inner_writer().await?;
// If no rows have been written, simply close the underlying writer
// without flushing so that no file is actually created.
if rows != 0 {
// Final flush of any remaining data
let final_data = {
let mut buffer_guard = buffer.buffer.lock().unwrap();
buffer_guard.split()
};
write_to_compressed_writer(&mut compressed_writer, &final_data).await?;
}
// Shut down the compressor and close the writer
compressed_writer.shutdown().await?;
Ok(rows)
}
/// Creates a [FileStream] for reading data from a file with optional column projection
/// and compression support.
///
/// Returns [SendableRecordBatchStream].
pub async fn file_to_stream(
store: &ObjectStore,
filename: &str,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
projection: Option<Vec<usize>>,
compression_type: CompressionType,
) -> Result<DfSendableRecordBatchStream> {
let df_compression: DfCompressionType = compression_type.into();
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_file_group(FileGroup::new(vec![PartitionedFile::new(
filename.to_string(),
0,
)]))
.with_projection(projection)
.with_file_compression_type(df_compression)
.build();
let store = Arc::new(OpendalStore::new(store.clone()));
let file_opener = file_source
.with_projection(&config)
.create_file_opener(store, &config, 0);
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
.context(error::BuildFileStreamSnafu)?;
Ok(Box::pin(stream))
}

View File

@@ -157,19 +157,27 @@ pub async fn stream_to_csv(
concurrency: usize,
format: &CsvFormat,
) -> Result<usize> {
stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
let mut builder = WriterBuilder::new();
if let Some(timestamp_format) = &format.timestamp_format {
builder = builder.with_timestamp_format(timestamp_format.to_owned())
}
if let Some(date_format) = &format.date_format {
builder = builder.with_date_format(date_format.to_owned())
}
if let Some(time_format) = &format.time_format {
builder = builder.with_time_format(time_format.to_owned())
}
builder.build(buffer)
})
stream_to_file(
stream,
store,
path,
threshold,
concurrency,
format.compression_type,
|buffer| {
let mut builder = WriterBuilder::new();
if let Some(timestamp_format) = &format.timestamp_format {
builder = builder.with_timestamp_format(timestamp_format.to_owned())
}
if let Some(date_format) = &format.date_format {
builder = builder.with_date_format(date_format.to_owned())
}
if let Some(time_format) = &format.time_format {
builder = builder.with_time_format(time_format.to_owned())
}
builder.build(buffer)
},
)
.await
}
@@ -181,13 +189,21 @@ impl DfRecordBatchEncoder for csv::Writer<SharedBuffer> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_test_util::find_workspace_path;
use datafusion::datasource::physical_plan::{CsvSource, FileSource};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Float64Vector, StringVector, UInt32Vector, VectorRef};
use futures::TryStreamExt;
use super::*;
use crate::file_format::{
FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER,
FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat,
FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream,
};
use crate::test_util::{format_schema, test_store};
@@ -297,4 +313,166 @@ mod tests {
}
);
}
#[tokio::test]
async fn test_compressed_csv() {
// Create test data
let column_schemas = vec![
ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
];
let schema = Arc::new(Schema::new(column_schemas));
// Create multiple record batches with different data
let batch1_columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![1, 2, 3])),
Arc::new(StringVector::from(vec!["Alice", "Bob", "Charlie"])),
Arc::new(Float64Vector::from_slice(vec![10.5, 20.3, 30.7])),
];
let batch1 = RecordBatch::new(schema.clone(), batch1_columns).unwrap();
let batch2_columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![4, 5, 6])),
Arc::new(StringVector::from(vec!["David", "Eva", "Frank"])),
Arc::new(Float64Vector::from_slice(vec![40.1, 50.2, 60.3])),
];
let batch2 = RecordBatch::new(schema.clone(), batch2_columns).unwrap();
let batch3_columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![7, 8, 9])),
Arc::new(StringVector::from(vec!["Grace", "Henry", "Ivy"])),
Arc::new(Float64Vector::from_slice(vec![70.4, 80.5, 90.6])),
];
let batch3 = RecordBatch::new(schema.clone(), batch3_columns).unwrap();
// Combine all batches into a RecordBatches collection
let recordbatches = RecordBatches::try_new(schema, vec![batch1, batch2, batch3]).unwrap();
// Test with different compression types
let compression_types = vec![
CompressionType::Gzip,
CompressionType::Bzip2,
CompressionType::Xz,
CompressionType::Zstd,
];
// Create a temporary file path
let temp_dir = common_test_util::temp_dir::create_temp_dir("test_compressed_csv");
for compression_type in compression_types {
let format = CsvFormat {
compression_type,
..CsvFormat::default()
};
// Build the file name from the compression type's file extension
let compressed_file_name =
format!("test_compressed_csv.{}", compression_type.file_extension());
let compressed_file_path = temp_dir.path().join(&compressed_file_name);
let compressed_file_path_str = compressed_file_path.to_str().unwrap();
// Create a simple file store for testing
let store = test_store("/");
// Export CSV with compression
let rows = stream_to_csv(
Box::pin(DfRecordBatchStreamAdapter::new(recordbatches.as_stream())),
store,
compressed_file_path_str,
1024,
1,
&format,
)
.await
.unwrap();
assert_eq!(rows, 9);
// Verify compressed file was created and has content
assert!(compressed_file_path.exists());
let file_size = std::fs::metadata(&compressed_file_path).unwrap().len();
assert!(file_size > 0);
// Verify the file is actually compressed
let file_content = std::fs::read(&compressed_file_path).unwrap();
// Compressed files should not start with the CSV header;
// they should begin with the compression format's magic bytes
match compression_type {
CompressionType::Gzip => {
// Gzip magic bytes: 0x1f 0x8b
assert_eq!(file_content[0], 0x1f, "Gzip file should start with 0x1f");
assert_eq!(
file_content[1], 0x8b,
"Gzip file should have 0x8b as second byte"
);
}
CompressionType::Bzip2 => {
// Bzip2 magic bytes: 'BZ'
assert_eq!(file_content[0], b'B', "Bzip2 file should start with 'B'");
assert_eq!(
file_content[1], b'Z',
"Bzip2 file should have 'Z' as second byte"
);
}
CompressionType::Xz => {
// XZ magic bytes: 0xFD '7zXZ'
assert_eq!(file_content[0], 0xFD, "XZ file should start with 0xFD");
}
CompressionType::Zstd => {
// Zstd magic bytes: 0x28 0xB5 0x2F 0xFD
assert_eq!(file_content[0], 0x28, "Zstd file should start with 0x28");
assert_eq!(
file_content[1], 0xB5,
"Zstd file should have 0xB5 as second byte"
);
}
_ => {}
}
// Verify the compressed file can be decompressed and content matches original data
let store = test_store("/");
let schema = Arc::new(
CsvFormat {
compression_type,
..Default::default()
}
.infer_schema(&store, compressed_file_path_str)
.await
.unwrap(),
);
let csv_source = CsvSource::new(true, b',', b'"')
.with_schema(schema.clone())
.with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
csv_source.clone(),
None,
compression_type,
)
.await
.unwrap();
let batches = stream.try_collect::<Vec<_>>().await.unwrap();
let pretty_print = arrow::util::pretty::pretty_format_batches(&batches)
.unwrap()
.to_string();
let expected = r#"+----+---------+-------+
| id | name | value |
+----+---------+-------+
| 1 | Alice | 10.5 |
| 2 | Bob | 20.3 |
| 3 | Charlie | 30.7 |
| 4 | David | 40.1 |
| 5 | Eva | 50.2 |
| 6 | Frank | 60.3 |
| 7 | Grace | 70.4 |
| 8 | Henry | 80.5 |
| 9 | Ivy | 90.6 |
+----+---------+-------+"#;
assert_eq!(expected, pretty_print);
}
}
}

View File

@@ -115,10 +115,17 @@ pub async fn stream_to_json(
path: &str,
threshold: usize,
concurrency: usize,
format: &JsonFormat,
) -> Result<usize> {
stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
json::LineDelimitedWriter::new(buffer)
})
stream_to_file(
stream,
store,
path,
threshold,
concurrency,
format.compression_type,
json::LineDelimitedWriter::new,
)
.await
}
@@ -130,10 +137,21 @@ impl DfRecordBatchEncoder for json::Writer<SharedBuffer, LineDelimited> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_test_util::find_workspace_path;
use datafusion::datasource::physical_plan::{FileSource, JsonSource};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{Float64Vector, StringVector, UInt32Vector, VectorRef};
use futures::TryStreamExt;
use super::*;
use crate::file_format::{FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat};
use crate::file_format::{
FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream,
};
use crate::test_util::{format_schema, test_store};
fn test_data_root() -> String {
@@ -203,4 +221,165 @@ mod tests {
}
);
}
#[tokio::test]
async fn test_compressed_json() {
// Create test data
let column_schemas = vec![
ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), false),
ColumnSchema::new("name", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false),
];
let schema = Arc::new(Schema::new(column_schemas));
// Create multiple record batches with different data
let batch1_columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![1, 2, 3])),
Arc::new(StringVector::from(vec!["Alice", "Bob", "Charlie"])),
Arc::new(Float64Vector::from_slice(vec![10.5, 20.3, 30.7])),
];
let batch1 = RecordBatch::new(schema.clone(), batch1_columns).unwrap();
let batch2_columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![4, 5, 6])),
Arc::new(StringVector::from(vec!["David", "Eva", "Frank"])),
Arc::new(Float64Vector::from_slice(vec![40.1, 50.2, 60.3])),
];
let batch2 = RecordBatch::new(schema.clone(), batch2_columns).unwrap();
let batch3_columns: Vec<VectorRef> = vec![
Arc::new(UInt32Vector::from_slice(vec![7, 8, 9])),
Arc::new(StringVector::from(vec!["Grace", "Henry", "Ivy"])),
Arc::new(Float64Vector::from_slice(vec![70.4, 80.5, 90.6])),
];
let batch3 = RecordBatch::new(schema.clone(), batch3_columns).unwrap();
// Combine all batches into a RecordBatches collection
let recordbatches = RecordBatches::try_new(schema, vec![batch1, batch2, batch3]).unwrap();
// Test with different compression types
let compression_types = vec![
CompressionType::Gzip,
CompressionType::Bzip2,
CompressionType::Xz,
CompressionType::Zstd,
];
// Create a temporary file path
let temp_dir = common_test_util::temp_dir::create_temp_dir("test_compressed_json");
for compression_type in compression_types {
let format = JsonFormat {
compression_type,
..JsonFormat::default()
};
let compressed_file_name =
format!("test_compressed_json.{}", compression_type.file_extension());
let compressed_file_path = temp_dir.path().join(&compressed_file_name);
let compressed_file_path_str = compressed_file_path.to_str().unwrap();
// Create a simple file store for testing
let store = test_store("/");
// Export JSON with compression
let rows = stream_to_json(
Box::pin(DfRecordBatchStreamAdapter::new(recordbatches.as_stream())),
store,
compressed_file_path_str,
1024,
1,
&format,
)
.await
.unwrap();
assert_eq!(rows, 9);
// Verify compressed file was created and has content
assert!(compressed_file_path.exists());
let file_size = std::fs::metadata(&compressed_file_path).unwrap().len();
assert!(file_size > 0);
// Verify the file is actually compressed
let file_content = std::fs::read(&compressed_file_path).unwrap();
// Compressed files should not start with '{' (the JSON opening character);
// they should begin with the compression format's magic bytes
match compression_type {
CompressionType::Gzip => {
// Gzip magic bytes: 0x1f 0x8b
assert_eq!(file_content[0], 0x1f, "Gzip file should start with 0x1f");
assert_eq!(
file_content[1], 0x8b,
"Gzip file should have 0x8b as second byte"
);
}
CompressionType::Bzip2 => {
// Bzip2 magic bytes: 'BZ'
assert_eq!(file_content[0], b'B', "Bzip2 file should start with 'B'");
assert_eq!(
file_content[1], b'Z',
"Bzip2 file should have 'Z' as second byte"
);
}
CompressionType::Xz => {
// XZ magic bytes: 0xFD '7zXZ'
assert_eq!(file_content[0], 0xFD, "XZ file should start with 0xFD");
}
CompressionType::Zstd => {
// Zstd magic bytes: 0x28 0xB5 0x2F 0xFD
assert_eq!(file_content[0], 0x28, "Zstd file should start with 0x28");
assert_eq!(
file_content[1], 0xB5,
"Zstd file should have 0xB5 as second byte"
);
}
_ => {}
}
// Verify the compressed file can be decompressed and content matches original data
let store = test_store("/");
let schema = Arc::new(
JsonFormat {
compression_type,
..Default::default()
}
.infer_schema(&store, compressed_file_path_str)
.await
.unwrap(),
);
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(8192);
let stream = file_to_stream(
&store,
compressed_file_path_str,
schema.clone(),
json_source.clone(),
None,
compression_type,
)
.await
.unwrap();
let batches = stream.try_collect::<Vec<_>>().await.unwrap();
let pretty_print = arrow::util::pretty::pretty_format_batches(&batches)
.unwrap()
.to_string();
let expected = r#"+----+---------+-------+
| id | name | value |
+----+---------+-------+
| 1 | Alice | 10.5 |
| 2 | Bob | 20.3 |
| 3 | Charlie | 30.7 |
| 4 | David | 40.1 |
| 5 | Eva | 50.2 |
| 6 | Frank | 60.3 |
| 7 | Grace | 70.4 |
| 8 | Henry | 80.5 |
| 9 | Ivy | 90.6 |
+----+---------+-------+"#;
assert_eq!(expected, pretty_print);
}
}
}

View File

@@ -16,6 +16,7 @@
#![feature(type_alias_impl_trait)]
pub mod buffered_writer;
pub mod compressed_writer;
pub mod compression;
pub mod error;
pub mod file_format;

View File

@@ -28,7 +28,7 @@ use object_store::ObjectStore;
use object_store::services::Fs;
use crate::file_format::csv::{CsvFormat, stream_to_csv};
use crate::file_format::json::stream_to_json;
use crate::file_format::json::{JsonFormat, stream_to_json};
use crate::test_util;
pub const TEST_BATCH_SIZE: usize = 100;
@@ -122,13 +122,16 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
let output_path = format!("{}/{}", dir.path().display(), "output");
let json_format = JsonFormat::default();
assert!(
stream_to_json(
Box::pin(stream),
tmp_store.clone(),
&output_path,
threshold(size),
8
8,
&json_format,
)
.await
.is_ok()

View File

@@ -578,7 +578,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: datafusion::error::DataFusionError,
error: common_datasource::error::Error,
},
#[snafu(display(

View File

@@ -20,8 +20,9 @@ use std::sync::Arc;
use client::{Output, OutputData, OutputMeta};
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::orc::{ReaderAdapter, infer_orc_schema, new_orc_stream_reader};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::file_format::{FileFormat, Format, file_to_stream};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{FS_SCHEMA, build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
@@ -29,14 +30,9 @@ use common_query::{OutputCost, OutputRows};
use common_recordbatch::DfSendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
use common_telemetry::{debug, tracing};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource,
};
use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_expr::Expr;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
@@ -55,6 +51,7 @@ use crate::statement::StatementExecutor;
const DEFAULT_BATCH_SIZE: usize = 8192;
const DEFAULT_READ_BUFFER: usize = 256 * 1024;
enum FileMetadata {
Parquet {
schema: SchemaRef,
@@ -67,6 +64,7 @@ enum FileMetadata {
},
Json {
schema: SchemaRef,
format: JsonFormat,
path: String,
},
Csv {
@@ -147,6 +145,7 @@ impl StatementExecutor {
.await
.context(error::InferSchemaSnafu { path: &path })?,
),
format,
path,
}),
Format::Parquet(_) => {
@@ -195,33 +194,6 @@ impl StatementExecutor {
}
}
async fn build_file_stream(
&self,
store: &ObjectStore,
filename: &str,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
projection: Option<Vec<usize>>,
) -> Result<DfSendableRecordBatchStream> {
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
file_schema,
file_source.clone(),
)
.with_file_group(FileGroup::new(vec![PartitionedFile::new(filename, 0)]))
.with_projection(projection)
.build();
let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
let file_opener = file_source
.with_projection(&config)
.create_file_opener(store, &config, 0);
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())
.context(error::BuildFileStreamSnafu)?;
Ok(Box::pin(stream))
}
async fn build_read_stream(
&self,
compat_schema: SchemaRef,
@@ -245,16 +217,16 @@ impl StatementExecutor {
let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"')
.with_schema(schema.clone())
.with_batch_size(DEFAULT_BATCH_SIZE);
let stream = self
.build_file_stream(
object_store,
path,
schema.clone(),
csv_source,
Some(projection),
)
.await?;
let stream = file_to_stream(
object_store,
path,
schema.clone(),
csv_source,
Some(projection),
format.compression_type,
)
.await
.context(error::BuildFileStreamSnafu)?;
Ok(Box::pin(
// The projection is already applied in the CSV reader when we created the stream,
@@ -264,7 +236,11 @@ impl StatementExecutor {
.context(error::PhysicalExprSnafu)?,
))
}
FileMetadata::Json { path, schema } => {
FileMetadata::Json {
path,
format,
schema,
} => {
let output_schema = Arc::new(
compat_schema
.project(&projection)
@@ -274,16 +250,16 @@ impl StatementExecutor {
let json_source = JsonSource::new()
.with_schema(schema.clone())
.with_batch_size(DEFAULT_BATCH_SIZE);
let stream = self
.build_file_stream(
object_store,
path,
schema.clone(),
json_source,
Some(projection),
)
.await?;
let stream = file_to_stream(
object_store,
path,
schema.clone(),
json_source,
Some(projection),
format.compression_type,
)
.await
.context(error::BuildFileStreamSnafu)?;
Ok(Box::pin(
// The projection is already applied in the JSON reader when we created the stream,

View File

@@ -76,12 +76,13 @@ impl StatementExecutor {
)
.await
.context(error::WriteStreamToFileSnafu { path }),
Format::Json(_) => stream_to_json(
Format::Json(format) => stream_to_json(
Box::pin(DfRecordBatchStreamAdapter::new(stream)),
object_store,
path,
threshold,
WRITE_CONCURRENCY,
format,
)
.await
.context(error::WriteStreamToFileSnafu { path }),

View File

@@ -0,0 +1,233 @@
-- Test compressed CSV import functionality
-- First, create and export data with different compression types
CREATE TABLE test_csv_export(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- Insert test data
INSERT INTO test_csv_export(`id`, `name`, `value`, ts) VALUES
(1, 'Alice', 10.5, 1640995200000),
(2, 'Bob', 20.3, 1640995260000),
(3, 'Charlie', 30.7, 1640995320000),
(4, 'David', 40.1, 1640995380000),
(5, 'Eve', 50.9, 1640995440000);
Affected Rows: 5
-- Export with different compression types
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_uncompressed.csv' WITH (format='csv');
Affected Rows: 5
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip');
Affected Rows: 5
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd');
Affected Rows: 5
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2');
Affected Rows: 5
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz');
Affected Rows: 5
-- Test importing uncompressed CSV
CREATE TABLE test_csv_import_uncompressed(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_csv_import_uncompressed FROM '${SQLNESS_HOME}/import/test_csv_uncompressed.csv' WITH (format='csv');
Affected Rows: 5
SELECT COUNT(*) as uncompressed_count FROM test_csv_import_uncompressed;
+--------------------+
| uncompressed_count |
+--------------------+
| 5 |
+--------------------+
-- Test importing GZIP compressed CSV
CREATE TABLE test_csv_import_gzip(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_csv_import_gzip FROM '${SQLNESS_HOME}/import/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip');
Affected Rows: 5
SELECT COUNT(*) as gzip_count FROM test_csv_import_gzip;
+------------+
| gzip_count |
+------------+
| 5 |
+------------+
SELECT `id`, `name`, `value` FROM test_csv_import_gzip WHERE `id` = 1;
+----+-------+-------+
| id | name | value |
+----+-------+-------+
| 1 | Alice | 10.5 |
+----+-------+-------+
-- Test importing ZSTD compressed CSV
CREATE TABLE test_csv_import_zstd(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_csv_import_zstd FROM '${SQLNESS_HOME}/import/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd');
Affected Rows: 5
SELECT COUNT(*) as zstd_count FROM test_csv_import_zstd;
+------------+
| zstd_count |
+------------+
| 5 |
+------------+
SELECT `id`, `name`, `value` FROM test_csv_import_zstd WHERE `id` = 2;
+----+------+-------+
| id | name | value |
+----+------+-------+
| 2 | Bob | 20.3 |
+----+------+-------+
-- Test importing BZIP2 compressed CSV
CREATE TABLE test_csv_import_bzip2(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_csv_import_bzip2 FROM '${SQLNESS_HOME}/import/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2');
Affected Rows: 5
SELECT COUNT(*) as bzip2_count FROM test_csv_import_bzip2;
+-------------+
| bzip2_count |
+-------------+
| 5 |
+-------------+
SELECT `id`, `name`, `value` FROM test_csv_import_bzip2 WHERE `id` = 3;
+----+---------+-------+
| id | name | value |
+----+---------+-------+
| 3 | Charlie | 30.7 |
+----+---------+-------+
-- Test importing XZ compressed CSV
CREATE TABLE test_csv_import_xz(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_csv_import_xz FROM '${SQLNESS_HOME}/import/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz');
Affected Rows: 5
SELECT COUNT(*) as xz_count FROM test_csv_import_xz;
+----------+
| xz_count |
+----------+
| 5 |
+----------+
SELECT `id`, `name`, `value` FROM test_csv_import_xz WHERE `id` = 4;
+----+-------+-------+
| id | name | value |
+----+-------+-------+
| 4 | David | 40.1 |
+----+-------+-------+
-- Verify data integrity by comparing all imported tables
SELECT source, count FROM (
SELECT 'uncompressed' as source, COUNT(*) as count, 1 as order_key FROM test_csv_import_uncompressed
UNION ALL
SELECT 'gzip', COUNT(*) as count, 2 as order_key FROM test_csv_import_gzip
UNION ALL
SELECT 'zstd', COUNT(*) as count, 3 as order_key FROM test_csv_import_zstd
UNION ALL
SELECT 'bzip2', COUNT(*) as count, 4 as order_key FROM test_csv_import_bzip2
UNION ALL
SELECT 'xz', COUNT(*) as count, 5 as order_key FROM test_csv_import_xz
) AS subquery
ORDER BY order_key;
+--------------+-------+
| source | count |
+--------------+-------+
| uncompressed | 5 |
| gzip | 5 |
| zstd | 5 |
| bzip2 | 5 |
| xz | 5 |
+--------------+-------+
-- Clean up
DROP TABLE test_csv_export;
Affected Rows: 0
DROP TABLE test_csv_import_uncompressed;
Affected Rows: 0
DROP TABLE test_csv_import_gzip;
Affected Rows: 0
DROP TABLE test_csv_import_zstd;
Affected Rows: 0
DROP TABLE test_csv_import_bzip2;
Affected Rows: 0
DROP TABLE test_csv_import_xz;
Affected Rows: 0

View File

@@ -0,0 +1,109 @@
-- Test compressed CSV import functionality
-- First, create and export data with different compression types
CREATE TABLE test_csv_export(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
-- Insert test data
INSERT INTO test_csv_export(`id`, `name`, `value`, ts) VALUES
(1, 'Alice', 10.5, 1640995200000),
(2, 'Bob', 20.3, 1640995260000),
(3, 'Charlie', 30.7, 1640995320000),
(4, 'David', 40.1, 1640995380000),
(5, 'Eve', 50.9, 1640995440000);
-- Export with different compression types
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_uncompressed.csv' WITH (format='csv');
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip');
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd');
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2');
COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz');
-- Test importing uncompressed CSV
CREATE TABLE test_csv_import_uncompressed(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_csv_import_uncompressed FROM '${SQLNESS_HOME}/import/test_csv_uncompressed.csv' WITH (format='csv');
SELECT COUNT(*) as uncompressed_count FROM test_csv_import_uncompressed;
-- Test importing GZIP compressed CSV
CREATE TABLE test_csv_import_gzip(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_csv_import_gzip FROM '${SQLNESS_HOME}/import/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip');
SELECT COUNT(*) as gzip_count FROM test_csv_import_gzip;
SELECT `id`, `name`, `value` FROM test_csv_import_gzip WHERE `id` = 1;
-- Test importing ZSTD compressed CSV
CREATE TABLE test_csv_import_zstd(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_csv_import_zstd FROM '${SQLNESS_HOME}/import/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd');
SELECT COUNT(*) as zstd_count FROM test_csv_import_zstd;
SELECT `id`, `name`, `value` FROM test_csv_import_zstd WHERE `id` = 2;
-- Test importing BZIP2 compressed CSV
CREATE TABLE test_csv_import_bzip2(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_csv_import_bzip2 FROM '${SQLNESS_HOME}/import/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2');
SELECT COUNT(*) as bzip2_count FROM test_csv_import_bzip2;
SELECT `id`, `name`, `value` FROM test_csv_import_bzip2 WHERE `id` = 3;
-- Test importing XZ compressed CSV
CREATE TABLE test_csv_import_xz(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_csv_import_xz FROM '${SQLNESS_HOME}/import/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz');
SELECT COUNT(*) as xz_count FROM test_csv_import_xz;
SELECT `id`, `name`, `value` FROM test_csv_import_xz WHERE `id` = 4;
-- Verify data integrity by comparing all imported tables
SELECT source, count FROM (
SELECT 'uncompressed' as source, COUNT(*) as count, 1 as order_key FROM test_csv_import_uncompressed
UNION ALL
SELECT 'gzip', COUNT(*) as count, 2 as order_key FROM test_csv_import_gzip
UNION ALL
SELECT 'zstd', COUNT(*) as count, 3 as order_key FROM test_csv_import_zstd
UNION ALL
SELECT 'bzip2', COUNT(*) as count, 4 as order_key FROM test_csv_import_bzip2
UNION ALL
SELECT 'xz', COUNT(*) as count, 5 as order_key FROM test_csv_import_xz
) AS subquery
ORDER BY order_key;
-- Clean up
DROP TABLE test_csv_export;
DROP TABLE test_csv_import_uncompressed;
DROP TABLE test_csv_import_gzip;
DROP TABLE test_csv_import_zstd;
DROP TABLE test_csv_import_bzip2;
DROP TABLE test_csv_import_xz;

View File

@@ -0,0 +1,233 @@
-- Test compressed JSON import functionality
-- First, create and export data with different compression types
CREATE TABLE test_json_export(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- Insert test data
INSERT INTO test_json_export(`id`, `name`, `value`, ts) VALUES
(1, 'Alice', 10.5, 1640995200000),
(2, 'Bob', 20.3, 1640995260000),
(3, 'Charlie', 30.7, 1640995320000),
(4, 'David', 40.1, 1640995380000),
(5, 'Eve', 50.9, 1640995440000);
Affected Rows: 5
-- Export with different compression types
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_uncompressed.json' WITH (format='json');
Affected Rows: 5
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip');
Affected Rows: 5
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd');
Affected Rows: 5
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2');
Affected Rows: 5
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_xz.json.xz' WITH (format='json', compression_type='xz');
Affected Rows: 5
-- Test importing uncompressed JSON
CREATE TABLE test_json_import_uncompressed(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_json_import_uncompressed FROM '${SQLNESS_HOME}/import/test_json_uncompressed.json' WITH (format='json');
Affected Rows: 5
SELECT COUNT(*) as uncompressed_count FROM test_json_import_uncompressed;
+--------------------+
| uncompressed_count |
+--------------------+
| 5 |
+--------------------+
-- Test importing GZIP compressed JSON
CREATE TABLE test_json_import_gzip(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_json_import_gzip FROM '${SQLNESS_HOME}/import/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip');
Affected Rows: 5
SELECT COUNT(*) as gzip_count FROM test_json_import_gzip;
+------------+
| gzip_count |
+------------+
| 5 |
+------------+
SELECT `id`, `name`, `value` FROM test_json_import_gzip WHERE `id` = 1;
+----+-------+-------+
| id | name | value |
+----+-------+-------+
| 1 | Alice | 10.5 |
+----+-------+-------+
-- Test importing ZSTD compressed JSON
CREATE TABLE test_json_import_zstd(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_json_import_zstd FROM '${SQLNESS_HOME}/import/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd');
Affected Rows: 5
SELECT COUNT(*) as zstd_count FROM test_json_import_zstd;
+------------+
| zstd_count |
+------------+
| 5 |
+------------+
SELECT `id`, `name`, `value` FROM test_json_import_zstd WHERE `id` = 2;
+----+------+-------+
| id | name | value |
+----+------+-------+
| 2 | Bob | 20.3 |
+----+------+-------+
-- Test importing BZIP2 compressed JSON
CREATE TABLE test_json_import_bzip2(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_json_import_bzip2 FROM '${SQLNESS_HOME}/import/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2');
Affected Rows: 5
SELECT COUNT(*) as bzip2_count FROM test_json_import_bzip2;
+-------------+
| bzip2_count |
+-------------+
| 5 |
+-------------+
SELECT `id`, `name`, `value` FROM test_json_import_bzip2 WHERE `id` = 3;
+----+---------+-------+
| id | name | value |
+----+---------+-------+
| 3 | Charlie | 30.7 |
+----+---------+-------+
-- Test importing XZ compressed JSON
CREATE TABLE test_json_import_xz(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
COPY test_json_import_xz FROM '${SQLNESS_HOME}/import/test_json_xz.json.xz' WITH (format='json', compression_type='xz');
Affected Rows: 5
SELECT COUNT(*) as xz_count FROM test_json_import_xz;
+----------+
| xz_count |
+----------+
| 5 |
+----------+
SELECT `id`, `name`, `value` FROM test_json_import_xz WHERE `id` = 4;
+----+-------+-------+
| id | name | value |
+----+-------+-------+
| 4 | David | 40.1 |
+----+-------+-------+
-- Verify data integrity by comparing all imported tables
SELECT source, count FROM (
SELECT 'uncompressed' as source, COUNT(*) as count, 1 as order_key FROM test_json_import_uncompressed
UNION ALL
SELECT 'gzip', COUNT(*) as count, 2 as order_key FROM test_json_import_gzip
UNION ALL
SELECT 'zstd', COUNT(*) as count, 3 as order_key FROM test_json_import_zstd
UNION ALL
SELECT 'bzip2', COUNT(*) as count, 4 as order_key FROM test_json_import_bzip2
UNION ALL
SELECT 'xz', COUNT(*) as count, 5 as order_key FROM test_json_import_xz
) AS subquery
ORDER BY order_key;
+--------------+-------+
| source | count |
+--------------+-------+
| uncompressed | 5 |
| gzip | 5 |
| zstd | 5 |
| bzip2 | 5 |
| xz | 5 |
+--------------+-------+
-- Clean up
DROP TABLE test_json_export;
Affected Rows: 0
DROP TABLE test_json_import_uncompressed;
Affected Rows: 0
DROP TABLE test_json_import_gzip;
Affected Rows: 0
DROP TABLE test_json_import_zstd;
Affected Rows: 0
DROP TABLE test_json_import_bzip2;
Affected Rows: 0
DROP TABLE test_json_import_xz;
Affected Rows: 0

View File

@@ -0,0 +1,109 @@
-- Test compressed JSON import functionality
-- First, create and export data with different compression types
CREATE TABLE test_json_export(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
-- Insert test data
INSERT INTO test_json_export(`id`, `name`, `value`, ts) VALUES
(1, 'Alice', 10.5, 1640995200000),
(2, 'Bob', 20.3, 1640995260000),
(3, 'Charlie', 30.7, 1640995320000),
(4, 'David', 40.1, 1640995380000),
(5, 'Eve', 50.9, 1640995440000);
-- Export with different compression types
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_uncompressed.json' WITH (format='json');
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip');
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd');
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2');
COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_xz.json.xz' WITH (format='json', compression_type='xz');
-- Test importing uncompressed JSON
CREATE TABLE test_json_import_uncompressed(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_json_import_uncompressed FROM '${SQLNESS_HOME}/import/test_json_uncompressed.json' WITH (format='json');
SELECT COUNT(*) as uncompressed_count FROM test_json_import_uncompressed;
-- Test importing GZIP compressed JSON
CREATE TABLE test_json_import_gzip(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_json_import_gzip FROM '${SQLNESS_HOME}/import/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip');
SELECT COUNT(*) as gzip_count FROM test_json_import_gzip;
SELECT `id`, `name`, `value` FROM test_json_import_gzip WHERE `id` = 1;
-- Test importing ZSTD compressed JSON
CREATE TABLE test_json_import_zstd(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_json_import_zstd FROM '${SQLNESS_HOME}/import/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd');
SELECT COUNT(*) as zstd_count FROM test_json_import_zstd;
SELECT `id`, `name`, `value` FROM test_json_import_zstd WHERE `id` = 2;
-- Test importing BZIP2 compressed JSON
CREATE TABLE test_json_import_bzip2(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_json_import_bzip2 FROM '${SQLNESS_HOME}/import/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2');
SELECT COUNT(*) as bzip2_count FROM test_json_import_bzip2;
SELECT `id`, `name`, `value` FROM test_json_import_bzip2 WHERE `id` = 3;
-- Test importing XZ compressed JSON
CREATE TABLE test_json_import_xz(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
COPY test_json_import_xz FROM '${SQLNESS_HOME}/import/test_json_xz.json.xz' WITH (format='json', compression_type='xz');
SELECT COUNT(*) as xz_count FROM test_json_import_xz;
SELECT `id`, `name`, `value` FROM test_json_import_xz WHERE `id` = 4;
-- Verify data integrity by comparing all imported tables
SELECT source, count FROM (
SELECT 'uncompressed' as source, COUNT(*) as count, 1 as order_key FROM test_json_import_uncompressed
UNION ALL
SELECT 'gzip', COUNT(*) as count, 2 as order_key FROM test_json_import_gzip
UNION ALL
SELECT 'zstd', COUNT(*) as count, 3 as order_key FROM test_json_import_zstd
UNION ALL
SELECT 'bzip2', COUNT(*) as count, 4 as order_key FROM test_json_import_bzip2
UNION ALL
SELECT 'xz', COUNT(*) as count, 5 as order_key FROM test_json_import_xz
) AS subquery
ORDER BY order_key;
-- Clean up
DROP TABLE test_json_export;
DROP TABLE test_json_import_uncompressed;
DROP TABLE test_json_import_gzip;
DROP TABLE test_json_import_zstd;
DROP TABLE test_json_import_bzip2;
DROP TABLE test_json_import_xz;

View File

@@ -0,0 +1,65 @@
-- Test compressed CSV export functionality
CREATE TABLE test_csv_compression(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- Insert test data
INSERT INTO test_csv_compression(`id`, `name`, `value`, ts) VALUES
(1, 'Alice', 10.5, 1640995200000),
(2, 'Bob', 20.3, 1640995260000),
(3, 'Charlie', 30.7, 1640995320000),
(4, 'David', 40.1, 1640995380000),
(5, 'Eve', 50.9, 1640995440000);
Affected Rows: 5
-- Test uncompressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_uncompressed.csv' WITH (format='csv');
Affected Rows: 5
-- Test GZIP compressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip');
Affected Rows: 5
-- Test ZSTD compressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd');
Affected Rows: 5
-- Test BZIP2 compressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2');
Affected Rows: 5
-- Test XZ compressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz');
Affected Rows: 5
-- Test compressed CSV with custom delimiter b'\t' (ASCII 9)
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_tab_separated.csv.gz' WITH (format='csv', compression_type='gzip', delimiter='9');
Affected Rows: 5
-- Test compressed CSV with timestamp format
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_timestamp_format.csv.zst' WITH (format='csv', compression_type='zstd', timestamp_format='%Y-%m-%d %H:%M:%S');
Affected Rows: 5
-- Test compressed CSV with date format
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_date_format.csv.gz' WITH (format='csv', compression_type='gzip', date_format='%Y/%m/%d');
Affected Rows: 5
-- Clean up
DROP TABLE test_csv_compression;
Affected Rows: 0

View File

@@ -0,0 +1,42 @@
-- Test compressed CSV export functionality
CREATE TABLE test_csv_compression(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
-- Insert test data
INSERT INTO test_csv_compression(`id`, `name`, `value`, ts) VALUES
(1, 'Alice', 10.5, 1640995200000),
(2, 'Bob', 20.3, 1640995260000),
(3, 'Charlie', 30.7, 1640995320000),
(4, 'David', 40.1, 1640995380000),
(5, 'Eve', 50.9, 1640995440000);
-- Test uncompressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_uncompressed.csv' WITH (format='csv');
-- Test GZIP compressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip');
-- Test ZSTD compressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd');
-- Test BZIP2 compressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2');
-- Test XZ compressed CSV export
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz');
-- Test compressed CSV with custom delimiter b'\t' (ASCII 9)
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_tab_separated.csv.gz' WITH (format='csv', compression_type='gzip', delimiter='9');
-- Test compressed CSV with timestamp format
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_timestamp_format.csv.zst' WITH (format='csv', compression_type='zstd', timestamp_format='%Y-%m-%d %H:%M:%S');
-- Test compressed CSV with date format
COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_date_format.csv.gz' WITH (format='csv', compression_type='gzip', date_format='%Y/%m/%d');
-- Clean up
DROP TABLE test_csv_compression;

View File

@@ -0,0 +1,55 @@
-- Test compressed JSON export functionality
CREATE TABLE test_json_compression(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- Insert test data
INSERT INTO test_json_compression(`id`, `name`, `value`, ts) VALUES
(1, 'Alice', 10.5, 1640995200000),
(2, 'Bob', 20.3, 1640995260000),
(3, 'Charlie', 30.7, 1640995320000),
(4, 'David', 40.1, 1640995380000),
(5, 'Eve', 50.9, 1640995440000);
Affected Rows: 5
-- Test uncompressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_uncompressed.json' WITH (format='json');
Affected Rows: 5
-- Test GZIP compressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip');
Affected Rows: 5
-- Test ZSTD compressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd');
Affected Rows: 5
-- Test BZIP2 compressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2');
Affected Rows: 5
-- Test XZ compressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_xz.json.xz' WITH (format='json', compression_type='xz');
Affected Rows: 5
-- Test compressed JSON with schema inference limit
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_schema_limit.json.gz' WITH (format='json', compression_type='gzip', schema_infer_max_record=100);
Affected Rows: 5
-- Clean up
DROP TABLE test_json_compression;
Affected Rows: 0

View File

@@ -0,0 +1,36 @@
-- Test compressed JSON export functionality
CREATE TABLE test_json_compression(
`id` UINT32,
`name` STRING,
`value` DOUBLE,
ts TIMESTAMP TIME INDEX
);
-- Insert test data
INSERT INTO test_json_compression(`id`, `name`, `value`, ts) VALUES
(1, 'Alice', 10.5, 1640995200000),
(2, 'Bob', 20.3, 1640995260000),
(3, 'Charlie', 30.7, 1640995320000),
(4, 'David', 40.1, 1640995380000),
(5, 'Eve', 50.9, 1640995440000);
-- Test uncompressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_uncompressed.json' WITH (format='json');
-- Test GZIP compressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip');
-- Test ZSTD compressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd');
-- Test BZIP2 compressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2');
-- Test XZ compressed JSON export
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_xz.json.xz' WITH (format='json', compression_type='xz');
-- Test compressed JSON with schema inference limit
COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_schema_limit.json.gz' WITH (format='json', compression_type='gzip', schema_infer_max_record=100);
-- Clean up
DROP TABLE test_json_compression;