diff --git a/src/common/datasource/src/buffered_writer.rs b/src/common/datasource/src/buffered_writer.rs index e1571b0187..953715b223 100644 --- a/src/common/datasource/src/buffered_writer.rs +++ b/src/common/datasource/src/buffered_writer.rs @@ -12,28 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; - use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::parquet::format::FileMetaData; -use snafu::{OptionExt, ResultExt}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; -use crate::error::{self, Result}; -use crate::share_buffer::SharedBuffer; - -pub struct LazyBufferedWriter { - path: String, - writer_factory: F, - writer: Option, - /// None stands for [`LazyBufferedWriter`] closed. - encoder: Option, - buffer: SharedBuffer, - rows_written: usize, - bytes_written: u64, - threshold: usize, -} +use crate::error::Result; pub trait DfRecordBatchEncoder { fn write(&mut self, batch: &RecordBatch) -> Result<()>; @@ -43,126 +26,3 @@ pub trait DfRecordBatchEncoder { pub trait ArrowWriterCloser { async fn close(mut self) -> Result; } - -impl< - T: AsyncWrite + Send + Unpin, - U: DfRecordBatchEncoder + ArrowWriterCloser, - F: Fn(String) -> Fut, - Fut: Future>, -> LazyBufferedWriter -{ - /// Closes `LazyBufferedWriter` and optionally flushes all data to underlying storage - /// if any row's been written. - pub async fn close_with_arrow_writer(mut self) -> Result<(FileMetaData, u64)> { - let encoder = self - .encoder - .take() - .context(error::BufferedWriterClosedSnafu)?; - let metadata = encoder.close().await?; - - // It's important to shut down! flushes all pending writes - self.close_inner_writer().await?; - Ok((metadata, self.bytes_written)) - } -} - -impl< - T: AsyncWrite + Send + Unpin, - U: DfRecordBatchEncoder, - F: Fn(String) -> Fut, - Fut: Future>, -> LazyBufferedWriter -{ - /// Closes the writer and flushes the buffer data. - pub async fn close_inner_writer(&mut self) -> Result<()> { - // Use `rows_written` to keep a track of if any rows have been written. - // If no row's been written, then we can simply close the underlying - // writer without flush so that no file will be actually created. - if self.rows_written != 0 { - self.bytes_written += self.try_flush(true).await?; - } - - if let Some(writer) = &mut self.writer { - writer.shutdown().await.context(error::AsyncWriteSnafu)?; - } - Ok(()) - } - - pub fn new( - threshold: usize, - buffer: SharedBuffer, - encoder: U, - path: impl AsRef, - writer_factory: F, - ) -> Self { - Self { - path: path.as_ref().to_string(), - threshold, - encoder: Some(encoder), - buffer, - rows_written: 0, - bytes_written: 0, - writer_factory, - writer: None, - } - } - - pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> { - let encoder = self - .encoder - .as_mut() - .context(error::BufferedWriterClosedSnafu)?; - encoder.write(batch)?; - self.rows_written += batch.num_rows(); - self.bytes_written += self.try_flush(false).await?; - Ok(()) - } - - async fn try_flush(&mut self, all: bool) -> Result { - let mut bytes_written: u64 = 0; - - // Once buffered data size reaches threshold, split the data in chunks (typically 4MB) - // and write to underlying storage. - while self.buffer.buffer.lock().unwrap().len() >= self.threshold { - let chunk = { - let mut buffer = self.buffer.buffer.lock().unwrap(); - buffer.split_to(self.threshold) - }; - let size = chunk.len(); - - self.maybe_init_writer() - .await? 
- .write_all(&chunk) - .await - .context(error::AsyncWriteSnafu)?; - - bytes_written += size as u64; - } - - if all { - bytes_written += self.try_flush_all().await?; - } - Ok(bytes_written) - } - - /// Only initiates underlying file writer when rows have been written. - async fn maybe_init_writer(&mut self) -> Result<&mut T> { - if let Some(ref mut writer) = self.writer { - Ok(writer) - } else { - let writer = (self.writer_factory)(self.path.clone()).await?; - Ok(self.writer.insert(writer)) - } - } - - async fn try_flush_all(&mut self) -> Result { - let remain = self.buffer.buffer.lock().unwrap().split(); - let size = remain.len(); - self.maybe_init_writer() - .await? - .write_all(&remain) - .await - .context(error::AsyncWriteSnafu)?; - Ok(size as u64) - } -} diff --git a/src/common/datasource/src/compressed_writer.rs b/src/common/datasource/src/compressed_writer.rs new file mode 100644 index 0000000000..afd2544f4c --- /dev/null +++ b/src/common/datasource/src/compressed_writer.rs @@ -0,0 +1,202 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use async_compression::tokio::write::{BzEncoder, GzipEncoder, XzEncoder, ZstdEncoder}; +use snafu::ResultExt; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use crate::compression::CompressionType; +use crate::error::{self, Result}; + +/// A compressed writer that wraps an underlying async writer with compression. +/// +/// This writer supports multiple compression formats including GZIP, BZIP2, XZ, and ZSTD. +/// It provides transparent compression for any async writer implementation. +pub struct CompressedWriter { + inner: Box, + compression_type: CompressionType, +} + +impl CompressedWriter { + /// Creates a new compressed writer with the specified compression type. + /// + /// # Arguments + /// + /// * `writer` - The underlying writer to wrap with compression + /// * `compression_type` - The type of compression to apply + pub fn new( + writer: impl AsyncWrite + Unpin + Send + 'static, + compression_type: CompressionType, + ) -> Self { + let inner: Box = match compression_type { + CompressionType::Gzip => Box::new(GzipEncoder::new(writer)), + CompressionType::Bzip2 => Box::new(BzEncoder::new(writer)), + CompressionType::Xz => Box::new(XzEncoder::new(writer)), + CompressionType::Zstd => Box::new(ZstdEncoder::new(writer)), + CompressionType::Uncompressed => Box::new(writer), + }; + + Self { + inner, + compression_type, + } + } + + /// Returns the compression type used by this writer. 
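+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (not compiled as a doctest), assuming an in-memory
+    /// [`tokio::io::duplex`] pipe as the underlying writer:
+    ///
+    /// ```ignore
+    /// use tokio::io::duplex;
+    ///
+    /// let (writer, _reader) = duplex(1024);
+    /// let writer = writer.into_compressed_writer(CompressionType::Gzip);
+    /// assert_eq!(writer.compression_type(), CompressionType::Gzip);
+    /// ```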
+ pub fn compression_type(&self) -> CompressionType { + self.compression_type + } + + /// Flush the writer and shutdown compression + pub async fn shutdown(mut self) -> Result<()> { + self.inner + .shutdown() + .await + .context(error::AsyncWriteSnafu)?; + Ok(()) + } +} + +impl AsyncWrite for CompressedWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } +} + +/// A trait for converting async writers into compressed writers. +/// +/// This trait is automatically implemented for all types that implement [`AsyncWrite`]. +pub trait IntoCompressedWriter { + /// Converts this writer into a [`CompressedWriter`] with the specified compression type. + /// + /// # Arguments + /// + /// * `self` - The underlying writer to wrap with compression + /// * `compression_type` - The type of compression to apply + fn into_compressed_writer(self, compression_type: CompressionType) -> CompressedWriter + where + Self: AsyncWrite + Unpin + Send + 'static + Sized, + { + CompressedWriter::new(self, compression_type) + } +} + +impl IntoCompressedWriter for W {} + +#[cfg(test)] +mod tests { + use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; + + use super::*; + + #[tokio::test] + async fn test_compressed_writer_gzip() { + let (duplex_writer, mut duplex_reader) = duplex(1024); + let mut writer = duplex_writer.into_compressed_writer(CompressionType::Gzip); + let original = b"test data for gzip compression"; + + writer.write_all(original).await.unwrap(); + writer.shutdown().await.unwrap(); + + let mut buffer = Vec::new(); + duplex_reader.read_to_end(&mut buffer).await.unwrap(); + + // The compressed data should be different from the original + assert_ne!(buffer, original); + assert!(!buffer.is_empty()); + } + + #[tokio::test] + async fn test_compressed_writer_bzip2() { + let (duplex_writer, mut duplex_reader) = duplex(1024); + let mut writer = duplex_writer.into_compressed_writer(CompressionType::Bzip2); + let original = b"test data for bzip2 compression"; + + writer.write_all(original).await.unwrap(); + writer.shutdown().await.unwrap(); + + let mut buffer = Vec::new(); + duplex_reader.read_to_end(&mut buffer).await.unwrap(); + + // The compressed data should be different from the original + assert_ne!(buffer, original); + assert!(!buffer.is_empty()); + } + + #[tokio::test] + async fn test_compressed_writer_xz() { + let (duplex_writer, mut duplex_reader) = duplex(1024); + let mut writer = duplex_writer.into_compressed_writer(CompressionType::Xz); + let original = b"test data for xz compression"; + + writer.write_all(original).await.unwrap(); + writer.shutdown().await.unwrap(); + + let mut buffer = Vec::new(); + duplex_reader.read_to_end(&mut buffer).await.unwrap(); + + // The compressed data should be different from the original + assert_ne!(buffer, original); + assert!(!buffer.is_empty()); + } + + #[tokio::test] + async fn test_compressed_writer_zstd() { + let (duplex_writer, mut duplex_reader) = duplex(1024); + let mut writer = duplex_writer.into_compressed_writer(CompressionType::Zstd); + let original = b"test data for zstd compression"; + + writer.write_all(original).await.unwrap(); + writer.shutdown().await.unwrap(); + + let mut buffer = Vec::new(); + 
duplex_reader.read_to_end(&mut buffer).await.unwrap(); + + // The compressed data should be different from the original + assert_ne!(buffer, original); + assert!(!buffer.is_empty()); + } + + #[tokio::test] + async fn test_compressed_writer_uncompressed() { + let (duplex_writer, mut duplex_reader) = duplex(1024); + let mut writer = duplex_writer.into_compressed_writer(CompressionType::Uncompressed); + let original = b"test data for uncompressed"; + + writer.write_all(original).await.unwrap(); + writer.shutdown().await.unwrap(); + + let mut buffer = Vec::new(); + duplex_reader.read_to_end(&mut buffer).await.unwrap(); + + // Uncompressed data should be the same as the original + assert_eq!(buffer, original); + } +} diff --git a/src/common/datasource/src/error.rs b/src/common/datasource/src/error.rs index cfaa5a19c0..a8aa08e55c 100644 --- a/src/common/datasource/src/error.rs +++ b/src/common/datasource/src/error.rs @@ -194,12 +194,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Buffered writer closed"))] - BufferedWriterClosed { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to write parquet file, path: {}", path))] WriteParquet { path: String, @@ -208,6 +202,14 @@ pub enum Error { #[snafu(source)] error: parquet::errors::ParquetError, }, + + #[snafu(display("Failed to build file stream"))] + BuildFileStream { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: datafusion::error::DataFusionError, + }, } pub type Result = std::result::Result; @@ -239,7 +241,7 @@ impl ErrorExt for Error { | ReadRecordBatch { .. } | WriteRecordBatch { .. } | EncodeRecordBatch { .. } - | BufferedWriterClosed { .. } + | BuildFileStream { .. } | OrcReader { .. } => StatusCode::Unexpected, } } diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 7c4e8d6c88..614be170e8 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -30,12 +30,22 @@ use arrow::record_batch::RecordBatch; use arrow_schema::{ArrowError, Schema as ArrowSchema}; use async_trait::async_trait; use bytes::{Buf, Bytes}; -use datafusion::datasource::physical_plan::FileOpenFuture; +use common_recordbatch::DfSendableRecordBatchStream; +use datafusion::datasource::file_format::file_compression_type::FileCompressionType as DfCompressionType; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::{ + FileGroup, FileOpenFuture, FileScanConfigBuilder, FileSource, FileStream, +}; use datafusion::error::{DataFusionError, Result as DataFusionResult}; use datafusion::physical_plan::SendableRecordBatchStream; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datatypes::arrow::datatypes::SchemaRef; use futures::{StreamExt, TryStreamExt}; use object_store::ObjectStore; +use object_store_opendal::OpendalStore; use snafu::ResultExt; +use tokio::io::AsyncWriteExt; use tokio_util::compat::FuturesAsyncWriteCompatExt; use self::csv::CsvFormat; @@ -43,7 +53,8 @@ use self::json::JsonFormat; use self::orc::OrcFormat; use self::parquet::ParquetFormat; use crate::DEFAULT_WRITE_BUFFER_SIZE; -use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter}; +use crate::buffered_writer::DfRecordBatchEncoder; +use crate::compressed_writer::{CompressedWriter, IntoCompressedWriter}; use crate::compression::CompressionType; use crate::error::{self, Result}; use crate::share_buffer::SharedBuffer; @@ 
-195,33 +206,128 @@ pub async fn infer_schemas( ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu) } -pub async fn stream_to_file T>( +/// Writes data to a compressed writer if the data is not empty. +/// +/// Does nothing if `data` is empty; otherwise writes all data and returns any error. +async fn write_to_compressed_writer( + compressed_writer: &mut CompressedWriter, + data: &[u8], +) -> Result<()> { + if !data.is_empty() { + compressed_writer + .write_all(data) + .await + .context(error::AsyncWriteSnafu)?; + } + Ok(()) +} + +/// Streams [SendableRecordBatchStream] to a file with optional compression support. +/// Data is buffered and flushed according to the given `threshold`. +/// Ensures that writer resources are cleanly released and that an empty file is not +/// created if no rows are written. +/// +/// Returns the total number of rows successfully written. +pub async fn stream_to_file( mut stream: SendableRecordBatchStream, store: ObjectStore, path: &str, threshold: usize, concurrency: usize, - encoder_factory: U, -) -> Result { + compression_type: CompressionType, + encoder_factory: impl Fn(SharedBuffer) -> E, +) -> Result +where + E: DfRecordBatchEncoder, +{ + // Create the file writer with OpenDAL's built-in buffering + let writer = store + .writer_with(path) + .concurrent(concurrency) + .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) + .await + .with_context(|_| error::WriteObjectSnafu { path })? + .into_futures_async_write() + .compat_write(); + + // Apply compression if needed + let mut compressed_writer = writer.into_compressed_writer(compression_type); + + // Create a buffer for the encoder let buffer = SharedBuffer::with_capacity(threshold); - let encoder = encoder_factory(buffer.clone()); - let mut writer = LazyBufferedWriter::new(threshold, buffer, encoder, path, |path| async { - store - .writer_with(&path) - .concurrent(concurrency) - .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize) - .await - .map(|v| v.into_futures_async_write().compat_write()) - .context(error::WriteObjectSnafu { path }) - }); + let mut encoder = encoder_factory(buffer.clone()); let mut rows = 0; + // Process each record batch while let Some(batch) = stream.next().await { let batch = batch.context(error::ReadRecordBatchSnafu)?; - writer.write(&batch).await?; + + // Write batch using the encoder + encoder.write(&batch)?; rows += batch.num_rows(); + + loop { + let chunk = { + let mut buffer_guard = buffer.buffer.lock().unwrap(); + if buffer_guard.len() < threshold { + break; + } + buffer_guard.split_to(threshold) + }; + write_to_compressed_writer(&mut compressed_writer, &chunk).await?; + } } - writer.close_inner_writer().await?; + + // If no row's been written, just simply close the underlying writer + // without flush so that no file will be actually created. + if rows != 0 { + // Final flush of any remaining data + let final_data = { + let mut buffer_guard = buffer.buffer.lock().unwrap(); + buffer_guard.split() + }; + write_to_compressed_writer(&mut compressed_writer, &final_data).await?; + } + + // Shutdown compression and close writer + compressed_writer.shutdown().await?; + Ok(rows) } + +/// Creates a [FileStream] for reading data from a file with optional column projection +/// and compression support. +/// +/// Returns [SendableRecordBatchStream]. 
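+///
+/// # Example
+///
+/// A minimal sketch (not compiled as a doctest) mirroring the tests in this crate;
+/// `store` (an [`ObjectStore`]), `schema` (the file's [`SchemaRef`]) and the file
+/// path below are assumptions for illustration:
+///
+/// ```ignore
+/// let csv_source = CsvSource::new(true, b',', b'"')
+///     .with_schema(schema.clone())
+///     .with_batch_size(8192);
+/// let stream = file_to_stream(
+///     &store,
+///     "/path/to/data.csv.gz", // hypothetical path
+///     schema.clone(),
+///     csv_source,
+///     None, // no projection
+///     CompressionType::Gzip,
+/// )
+/// .await?;
+/// ```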
+pub async fn file_to_stream( + store: &ObjectStore, + filename: &str, + file_schema: SchemaRef, + file_source: Arc, + projection: Option>, + compression_type: CompressionType, +) -> Result { + let df_compression: DfCompressionType = compression_type.into(); + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + file_schema, + file_source.clone(), + ) + .with_file_group(FileGroup::new(vec![PartitionedFile::new( + filename.to_string(), + 0, + )])) + .with_projection(projection) + .with_file_compression_type(df_compression) + .build(); + + let store = Arc::new(OpendalStore::new(store.clone())); + let file_opener = file_source + .with_projection(&config) + .create_file_opener(store, &config, 0); + let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new()) + .context(error::BuildFileStreamSnafu)?; + + Ok(Box::pin(stream)) +} diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index efffce8d12..392f3ff49d 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -157,19 +157,27 @@ pub async fn stream_to_csv( concurrency: usize, format: &CsvFormat, ) -> Result { - stream_to_file(stream, store, path, threshold, concurrency, |buffer| { - let mut builder = WriterBuilder::new(); - if let Some(timestamp_format) = &format.timestamp_format { - builder = builder.with_timestamp_format(timestamp_format.to_owned()) - } - if let Some(date_format) = &format.date_format { - builder = builder.with_date_format(date_format.to_owned()) - } - if let Some(time_format) = &format.time_format { - builder = builder.with_time_format(time_format.to_owned()) - } - builder.build(buffer) - }) + stream_to_file( + stream, + store, + path, + threshold, + concurrency, + format.compression_type, + |buffer| { + let mut builder = WriterBuilder::new(); + if let Some(timestamp_format) = &format.timestamp_format { + builder = builder.with_timestamp_format(timestamp_format.to_owned()) + } + if let Some(date_format) = &format.date_format { + builder = builder.with_date_format(date_format.to_owned()) + } + if let Some(time_format) = &format.time_format { + builder = builder.with_time_format(time_format.to_owned()) + } + builder.build(buffer) + }, + ) .await } @@ -181,13 +189,21 @@ impl DfRecordBatchEncoder for csv::Writer { #[cfg(test)] mod tests { + use std::sync::Arc; + use common_recordbatch::adapter::DfRecordBatchStreamAdapter; + use common_recordbatch::{RecordBatch, RecordBatches}; use common_test_util::find_workspace_path; + use datafusion::datasource::physical_plan::{CsvSource, FileSource}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::{Float64Vector, StringVector, UInt32Vector, VectorRef}; + use futures::TryStreamExt; use super::*; use crate::file_format::{ FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER, - FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, + FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream, }; use crate::test_util::{format_schema, test_store}; @@ -297,4 +313,166 @@ mod tests { } ); } + + #[tokio::test] + async fn test_compressed_csv() { + // Create test data + let column_schemas = vec![ + ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), false), + ColumnSchema::new("name", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false), + ]; + let schema = Arc::new(Schema::new(column_schemas)); + + 
// Create multiple record batches with different data + let batch1_columns: Vec = vec![ + Arc::new(UInt32Vector::from_slice(vec![1, 2, 3])), + Arc::new(StringVector::from(vec!["Alice", "Bob", "Charlie"])), + Arc::new(Float64Vector::from_slice(vec![10.5, 20.3, 30.7])), + ]; + let batch1 = RecordBatch::new(schema.clone(), batch1_columns).unwrap(); + + let batch2_columns: Vec = vec![ + Arc::new(UInt32Vector::from_slice(vec![4, 5, 6])), + Arc::new(StringVector::from(vec!["David", "Eva", "Frank"])), + Arc::new(Float64Vector::from_slice(vec![40.1, 50.2, 60.3])), + ]; + let batch2 = RecordBatch::new(schema.clone(), batch2_columns).unwrap(); + + let batch3_columns: Vec = vec![ + Arc::new(UInt32Vector::from_slice(vec![7, 8, 9])), + Arc::new(StringVector::from(vec!["Grace", "Henry", "Ivy"])), + Arc::new(Float64Vector::from_slice(vec![70.4, 80.5, 90.6])), + ]; + let batch3 = RecordBatch::new(schema.clone(), batch3_columns).unwrap(); + + // Combine all batches into a RecordBatches collection + let recordbatches = RecordBatches::try_new(schema, vec![batch1, batch2, batch3]).unwrap(); + + // Test with different compression types + let compression_types = vec![ + CompressionType::Gzip, + CompressionType::Bzip2, + CompressionType::Xz, + CompressionType::Zstd, + ]; + + // Create a temporary file path + let temp_dir = common_test_util::temp_dir::create_temp_dir("test_compressed_csv"); + for compression_type in compression_types { + let format = CsvFormat { + compression_type, + ..CsvFormat::default() + }; + + // Use correct format without Debug formatter + let compressed_file_name = + format!("test_compressed_csv.{}", compression_type.file_extension()); + let compressed_file_path = temp_dir.path().join(&compressed_file_name); + let compressed_file_path_str = compressed_file_path.to_str().unwrap(); + + // Create a simple file store for testing + let store = test_store("/"); + + // Export CSV with compression + let rows = stream_to_csv( + Box::pin(DfRecordBatchStreamAdapter::new(recordbatches.as_stream())), + store, + compressed_file_path_str, + 1024, + 1, + &format, + ) + .await + .unwrap(); + + assert_eq!(rows, 9); + + // Verify compressed file was created and has content + assert!(compressed_file_path.exists()); + let file_size = std::fs::metadata(&compressed_file_path).unwrap().len(); + assert!(file_size > 0); + + // Verify the file is actually compressed + let file_content = std::fs::read(&compressed_file_path).unwrap(); + // Compressed files should not start with CSV header + // They should have compression magic bytes + match compression_type { + CompressionType::Gzip => { + // Gzip magic bytes: 0x1f 0x8b + assert_eq!(file_content[0], 0x1f, "Gzip file should start with 0x1f"); + assert_eq!( + file_content[1], 0x8b, + "Gzip file should have 0x8b as second byte" + ); + } + CompressionType::Bzip2 => { + // Bzip2 magic bytes: 'BZ' + assert_eq!(file_content[0], b'B', "Bzip2 file should start with 'B'"); + assert_eq!( + file_content[1], b'Z', + "Bzip2 file should have 'Z' as second byte" + ); + } + CompressionType::Xz => { + // XZ magic bytes: 0xFD '7zXZ' + assert_eq!(file_content[0], 0xFD, "XZ file should start with 0xFD"); + } + CompressionType::Zstd => { + // Zstd magic bytes: 0x28 0xB5 0x2F 0xFD + assert_eq!(file_content[0], 0x28, "Zstd file should start with 0x28"); + assert_eq!( + file_content[1], 0xB5, + "Zstd file should have 0xB5 as second byte" + ); + } + _ => {} + } + + // Verify the compressed file can be decompressed and content matches original data + let store = test_store("/"); + let schema = 
Arc::new( + CsvFormat { + compression_type, + ..Default::default() + } + .infer_schema(&store, compressed_file_path_str) + .await + .unwrap(), + ); + let csv_source = CsvSource::new(true, b',', b'"') + .with_schema(schema.clone()) + .with_batch_size(8192); + + let stream = file_to_stream( + &store, + compressed_file_path_str, + schema.clone(), + csv_source.clone(), + None, + compression_type, + ) + .await + .unwrap(); + + let batches = stream.try_collect::>().await.unwrap(); + let pretty_print = arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string(); + let expected = r#"+----+---------+-------+ +| id | name | value | ++----+---------+-------+ +| 1 | Alice | 10.5 | +| 2 | Bob | 20.3 | +| 3 | Charlie | 30.7 | +| 4 | David | 40.1 | +| 5 | Eva | 50.2 | +| 6 | Frank | 60.3 | +| 7 | Grace | 70.4 | +| 8 | Henry | 80.5 | +| 9 | Ivy | 90.6 | ++----+---------+-------+"#; + assert_eq!(expected, pretty_print); + } + } } diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index c234eec846..cafcd71372 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -115,10 +115,17 @@ pub async fn stream_to_json( path: &str, threshold: usize, concurrency: usize, + format: &JsonFormat, ) -> Result { - stream_to_file(stream, store, path, threshold, concurrency, |buffer| { - json::LineDelimitedWriter::new(buffer) - }) + stream_to_file( + stream, + store, + path, + threshold, + concurrency, + format.compression_type, + json::LineDelimitedWriter::new, + ) .await } @@ -130,10 +137,21 @@ impl DfRecordBatchEncoder for json::Writer { #[cfg(test)] mod tests { + use std::sync::Arc; + + use common_recordbatch::adapter::DfRecordBatchStreamAdapter; + use common_recordbatch::{RecordBatch, RecordBatches}; use common_test_util::find_workspace_path; + use datafusion::datasource::physical_plan::{FileSource, JsonSource}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::{Float64Vector, StringVector, UInt32Vector, VectorRef}; + use futures::TryStreamExt; use super::*; - use crate::file_format::{FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat}; + use crate::file_format::{ + FORMAT_COMPRESSION_TYPE, FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream, + }; use crate::test_util::{format_schema, test_store}; fn test_data_root() -> String { @@ -203,4 +221,165 @@ mod tests { } ); } + + #[tokio::test] + async fn test_compressed_json() { + // Create test data + let column_schemas = vec![ + ColumnSchema::new("id", ConcreteDataType::uint32_datatype(), false), + ColumnSchema::new("name", ConcreteDataType::string_datatype(), false), + ColumnSchema::new("value", ConcreteDataType::float64_datatype(), false), + ]; + let schema = Arc::new(Schema::new(column_schemas)); + + // Create multiple record batches with different data + let batch1_columns: Vec = vec![ + Arc::new(UInt32Vector::from_slice(vec![1, 2, 3])), + Arc::new(StringVector::from(vec!["Alice", "Bob", "Charlie"])), + Arc::new(Float64Vector::from_slice(vec![10.5, 20.3, 30.7])), + ]; + let batch1 = RecordBatch::new(schema.clone(), batch1_columns).unwrap(); + + let batch2_columns: Vec = vec![ + Arc::new(UInt32Vector::from_slice(vec![4, 5, 6])), + Arc::new(StringVector::from(vec!["David", "Eva", "Frank"])), + Arc::new(Float64Vector::from_slice(vec![40.1, 50.2, 60.3])), + ]; + let batch2 = RecordBatch::new(schema.clone(), batch2_columns).unwrap(); + + let 
batch3_columns: Vec = vec![ + Arc::new(UInt32Vector::from_slice(vec![7, 8, 9])), + Arc::new(StringVector::from(vec!["Grace", "Henry", "Ivy"])), + Arc::new(Float64Vector::from_slice(vec![70.4, 80.5, 90.6])), + ]; + let batch3 = RecordBatch::new(schema.clone(), batch3_columns).unwrap(); + + // Combine all batches into a RecordBatches collection + let recordbatches = RecordBatches::try_new(schema, vec![batch1, batch2, batch3]).unwrap(); + + // Test with different compression types + let compression_types = vec![ + CompressionType::Gzip, + CompressionType::Bzip2, + CompressionType::Xz, + CompressionType::Zstd, + ]; + + // Create a temporary file path + let temp_dir = common_test_util::temp_dir::create_temp_dir("test_compressed_json"); + for compression_type in compression_types { + let format = JsonFormat { + compression_type, + ..JsonFormat::default() + }; + + let compressed_file_name = + format!("test_compressed_json.{}", compression_type.file_extension()); + let compressed_file_path = temp_dir.path().join(&compressed_file_name); + let compressed_file_path_str = compressed_file_path.to_str().unwrap(); + + // Create a simple file store for testing + let store = test_store("/"); + + // Export JSON with compression + let rows = stream_to_json( + Box::pin(DfRecordBatchStreamAdapter::new(recordbatches.as_stream())), + store, + compressed_file_path_str, + 1024, + 1, + &format, + ) + .await + .unwrap(); + + assert_eq!(rows, 9); + + // Verify compressed file was created and has content + assert!(compressed_file_path.exists()); + let file_size = std::fs::metadata(&compressed_file_path).unwrap().len(); + assert!(file_size > 0); + + // Verify the file is actually compressed + let file_content = std::fs::read(&compressed_file_path).unwrap(); + // Compressed files should not start with '{' (JSON character) + // They should have compression magic bytes + match compression_type { + CompressionType::Gzip => { + // Gzip magic bytes: 0x1f 0x8b + assert_eq!(file_content[0], 0x1f, "Gzip file should start with 0x1f"); + assert_eq!( + file_content[1], 0x8b, + "Gzip file should have 0x8b as second byte" + ); + } + CompressionType::Bzip2 => { + // Bzip2 magic bytes: 'BZ' + assert_eq!(file_content[0], b'B', "Bzip2 file should start with 'B'"); + assert_eq!( + file_content[1], b'Z', + "Bzip2 file should have 'Z' as second byte" + ); + } + CompressionType::Xz => { + // XZ magic bytes: 0xFD '7zXZ' + assert_eq!(file_content[0], 0xFD, "XZ file should start with 0xFD"); + } + CompressionType::Zstd => { + // Zstd magic bytes: 0x28 0xB5 0x2F 0xFD + assert_eq!(file_content[0], 0x28, "Zstd file should start with 0x28"); + assert_eq!( + file_content[1], 0xB5, + "Zstd file should have 0xB5 as second byte" + ); + } + _ => {} + } + + // Verify the compressed file can be decompressed and content matches original data + let store = test_store("/"); + let schema = Arc::new( + JsonFormat { + compression_type, + ..Default::default() + } + .infer_schema(&store, compressed_file_path_str) + .await + .unwrap(), + ); + let json_source = JsonSource::new() + .with_schema(schema.clone()) + .with_batch_size(8192); + + let stream = file_to_stream( + &store, + compressed_file_path_str, + schema.clone(), + json_source.clone(), + None, + compression_type, + ) + .await + .unwrap(); + + let batches = stream.try_collect::>().await.unwrap(); + let pretty_print = arrow::util::pretty::pretty_format_batches(&batches) + .unwrap() + .to_string(); + let expected = r#"+----+---------+-------+ +| id | name | value | ++----+---------+-------+ +| 1 | Alice | 
10.5 | +| 2 | Bob | 20.3 | +| 3 | Charlie | 30.7 | +| 4 | David | 40.1 | +| 5 | Eva | 50.2 | +| 6 | Frank | 60.3 | +| 7 | Grace | 70.4 | +| 8 | Henry | 80.5 | +| 9 | Ivy | 90.6 | ++----+---------+-------+"#; + assert_eq!(expected, pretty_print); + } + } } diff --git a/src/common/datasource/src/lib.rs b/src/common/datasource/src/lib.rs index 72e94c7f36..91663ce22c 100644 --- a/src/common/datasource/src/lib.rs +++ b/src/common/datasource/src/lib.rs @@ -16,6 +16,7 @@ #![feature(type_alias_impl_trait)] pub mod buffered_writer; +pub mod compressed_writer; pub mod compression; pub mod error; pub mod file_format; diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index f3f813be34..244df3b7a5 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -28,7 +28,7 @@ use object_store::ObjectStore; use object_store::services::Fs; use crate::file_format::csv::{CsvFormat, stream_to_csv}; -use crate::file_format::json::stream_to_json; +use crate::file_format::json::{JsonFormat, stream_to_json}; use crate::test_util; pub const TEST_BATCH_SIZE: usize = 100; @@ -122,13 +122,16 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi let output_path = format!("{}/{}", dir.path().display(), "output"); + let json_format = JsonFormat::default(); + assert!( stream_to_json( Box::pin(stream), tmp_store.clone(), &output_path, threshold(size), - 8 + 8, + &json_format, ) .await .is_ok() diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 2ba71444e7..68576db582 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -578,7 +578,7 @@ pub enum Error { #[snafu(implicit)] location: Location, #[snafu(source)] - error: datafusion::error::DataFusionError, + error: common_datasource::error::Error, }, #[snafu(display( diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index da120ff1bf..35cfdc7830 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -20,8 +20,9 @@ use std::sync::Arc; use client::{Output, OutputData, OutputMeta}; use common_base::readable_size::ReadableSize; use common_datasource::file_format::csv::CsvFormat; +use common_datasource::file_format::json::JsonFormat; use common_datasource::file_format::orc::{ReaderAdapter, infer_orc_schema, new_orc_stream_reader}; -use common_datasource::file_format::{FileFormat, Format}; +use common_datasource::file_format::{FileFormat, Format, file_to_stream}; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::{FS_SCHEMA, build_backend, parse_url}; use common_datasource::util::find_dir_and_filename; @@ -29,14 +30,9 @@ use common_query::{OutputCost, OutputRows}; use common_recordbatch::DfSendableRecordBatchStream; use common_recordbatch::adapter::RecordBatchStreamTypeAdapter; use common_telemetry::{debug, tracing}; -use datafusion::datasource::listing::PartitionedFile; -use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::{ - CsvSource, FileGroup, FileScanConfigBuilder, FileSource, FileStream, JsonSource, -}; +use datafusion::datasource::physical_plan::{CsvSource, FileSource, JsonSource}; use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder; use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata; -use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_expr::Expr; use 
datatypes::arrow::compute::can_cast_types; use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef}; @@ -55,6 +51,7 @@ use crate::statement::StatementExecutor; const DEFAULT_BATCH_SIZE: usize = 8192; const DEFAULT_READ_BUFFER: usize = 256 * 1024; + enum FileMetadata { Parquet { schema: SchemaRef, @@ -67,6 +64,7 @@ enum FileMetadata { }, Json { schema: SchemaRef, + format: JsonFormat, path: String, }, Csv { @@ -147,6 +145,7 @@ impl StatementExecutor { .await .context(error::InferSchemaSnafu { path: &path })?, ), + format, path, }), Format::Parquet(_) => { @@ -195,33 +194,6 @@ impl StatementExecutor { } } - async fn build_file_stream( - &self, - store: &ObjectStore, - filename: &str, - file_schema: SchemaRef, - file_source: Arc, - projection: Option>, - ) -> Result { - let config = FileScanConfigBuilder::new( - ObjectStoreUrl::local_filesystem(), - file_schema, - file_source.clone(), - ) - .with_file_group(FileGroup::new(vec![PartitionedFile::new(filename, 0)])) - .with_projection(projection) - .build(); - - let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone())); - let file_opener = file_source - .with_projection(&config) - .create_file_opener(store, &config, 0); - let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new()) - .context(error::BuildFileStreamSnafu)?; - - Ok(Box::pin(stream)) - } - async fn build_read_stream( &self, compat_schema: SchemaRef, @@ -245,16 +217,16 @@ impl StatementExecutor { let csv_source = CsvSource::new(format.has_header, format.delimiter, b'"') .with_schema(schema.clone()) .with_batch_size(DEFAULT_BATCH_SIZE); - - let stream = self - .build_file_stream( - object_store, - path, - schema.clone(), - csv_source, - Some(projection), - ) - .await?; + let stream = file_to_stream( + object_store, + path, + schema.clone(), + csv_source, + Some(projection), + format.compression_type, + ) + .await + .context(error::BuildFileStreamSnafu)?; Ok(Box::pin( // The projection is already applied in the CSV reader when we created the stream, @@ -264,7 +236,11 @@ impl StatementExecutor { .context(error::PhysicalExprSnafu)?, )) } - FileMetadata::Json { path, schema } => { + FileMetadata::Json { + path, + format, + schema, + } => { let output_schema = Arc::new( compat_schema .project(&projection) @@ -274,16 +250,16 @@ impl StatementExecutor { let json_source = JsonSource::new() .with_schema(schema.clone()) .with_batch_size(DEFAULT_BATCH_SIZE); - - let stream = self - .build_file_stream( - object_store, - path, - schema.clone(), - json_source, - Some(projection), - ) - .await?; + let stream = file_to_stream( + object_store, + path, + schema.clone(), + json_source, + Some(projection), + format.compression_type, + ) + .await + .context(error::BuildFileStreamSnafu)?; Ok(Box::pin( // The projection is already applied in the JSON reader when we created the stream, diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index d542f8acbb..3e982373c4 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -76,12 +76,13 @@ impl StatementExecutor { ) .await .context(error::WriteStreamToFileSnafu { path }), - Format::Json(_) => stream_to_json( + Format::Json(format) => stream_to_json( Box::pin(DfRecordBatchStreamAdapter::new(stream)), object_store, path, threshold, WRITE_CONCURRENCY, + format, ) .await .context(error::WriteStreamToFileSnafu { path }), diff --git 
a/tests/cases/standalone/common/copy/copy_from_csv_compressed.result b/tests/cases/standalone/common/copy/copy_from_csv_compressed.result new file mode 100644 index 0000000000..223bbfc26f --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_from_csv_compressed.result @@ -0,0 +1,233 @@ +-- Test compressed CSV import functionality +-- First, create and export data with different compression types +CREATE TABLE test_csv_export( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO test_csv_export(`id`, `name`, `value`, ts) VALUES + (1, 'Alice', 10.5, 1640995200000), + (2, 'Bob', 20.3, 1640995260000), + (3, 'Charlie', 30.7, 1640995320000), + (4, 'David', 40.1, 1640995380000), + (5, 'Eve', 50.9, 1640995440000); + +Affected Rows: 5 + +-- Export with different compression types +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_uncompressed.csv' WITH (format='csv'); + +Affected Rows: 5 + +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip'); + +Affected Rows: 5 + +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd'); + +Affected Rows: 5 + +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2'); + +Affected Rows: 5 + +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz'); + +Affected Rows: 5 + +-- Test importing uncompressed CSV +CREATE TABLE test_csv_import_uncompressed( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_csv_import_uncompressed FROM '${SQLNESS_HOME}/import/test_csv_uncompressed.csv' WITH (format='csv'); + +Affected Rows: 5 + +SELECT COUNT(*) as uncompressed_count FROM test_csv_import_uncompressed; + ++--------------------+ +| uncompressed_count | ++--------------------+ +| 5 | ++--------------------+ + +-- Test importing GZIP compressed CSV +CREATE TABLE test_csv_import_gzip( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_csv_import_gzip FROM '${SQLNESS_HOME}/import/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip'); + +Affected Rows: 5 + +SELECT COUNT(*) as gzip_count FROM test_csv_import_gzip; + ++------------+ +| gzip_count | ++------------+ +| 5 | ++------------+ + +SELECT `id`, `name`, `value` FROM test_csv_import_gzip WHERE `id` = 1; + ++----+-------+-------+ +| id | name | value | ++----+-------+-------+ +| 1 | Alice | 10.5 | ++----+-------+-------+ + +-- Test importing ZSTD compressed CSV +CREATE TABLE test_csv_import_zstd( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_csv_import_zstd FROM '${SQLNESS_HOME}/import/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd'); + +Affected Rows: 5 + +SELECT COUNT(*) as zstd_count FROM test_csv_import_zstd; + ++------------+ +| zstd_count | ++------------+ +| 5 | ++------------+ + +SELECT `id`, `name`, `value` FROM test_csv_import_zstd WHERE `id` = 2; + ++----+------+-------+ +| id | name | value | ++----+------+-------+ +| 2 | Bob | 20.3 | ++----+------+-------+ + +-- Test importing BZIP2 compressed CSV +CREATE TABLE test_csv_import_bzip2( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_csv_import_bzip2 FROM 
'${SQLNESS_HOME}/import/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2'); + +Affected Rows: 5 + +SELECT COUNT(*) as bzip2_count FROM test_csv_import_bzip2; + ++-------------+ +| bzip2_count | ++-------------+ +| 5 | ++-------------+ + +SELECT `id`, `name`, `value` FROM test_csv_import_bzip2 WHERE `id` = 3; + ++----+---------+-------+ +| id | name | value | ++----+---------+-------+ +| 3 | Charlie | 30.7 | ++----+---------+-------+ + +-- Test importing XZ compressed CSV +CREATE TABLE test_csv_import_xz( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_csv_import_xz FROM '${SQLNESS_HOME}/import/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz'); + +Affected Rows: 5 + +SELECT COUNT(*) as xz_count FROM test_csv_import_xz; + ++----------+ +| xz_count | ++----------+ +| 5 | ++----------+ + +SELECT `id`, `name`, `value` FROM test_csv_import_xz WHERE `id` = 4; + ++----+-------+-------+ +| id | name | value | ++----+-------+-------+ +| 4 | David | 40.1 | ++----+-------+-------+ + +-- Verify data integrity by comparing all imported tables +SELECT source, count FROM ( + SELECT 'uncompressed' as source, COUNT(*) as count, 1 as order_key FROM test_csv_import_uncompressed + UNION ALL + SELECT 'gzip', COUNT(*) as count, 2 as order_key FROM test_csv_import_gzip + UNION ALL + SELECT 'zstd', COUNT(*) as count, 3 as order_key FROM test_csv_import_zstd + UNION ALL + SELECT 'bzip2', COUNT(*) as count, 4 as order_key FROM test_csv_import_bzip2 + UNION ALL + SELECT 'xz', COUNT(*) as count, 5 as order_key FROM test_csv_import_xz +) AS subquery +ORDER BY order_key; + ++--------------+-------+ +| source | count | ++--------------+-------+ +| uncompressed | 5 | +| gzip | 5 | +| zstd | 5 | +| bzip2 | 5 | +| xz | 5 | ++--------------+-------+ + +-- Clean up +DROP TABLE test_csv_export; + +Affected Rows: 0 + +DROP TABLE test_csv_import_uncompressed; + +Affected Rows: 0 + +DROP TABLE test_csv_import_gzip; + +Affected Rows: 0 + +DROP TABLE test_csv_import_zstd; + +Affected Rows: 0 + +DROP TABLE test_csv_import_bzip2; + +Affected Rows: 0 + +DROP TABLE test_csv_import_xz; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_from_csv_compressed.sql b/tests/cases/standalone/common/copy/copy_from_csv_compressed.sql new file mode 100644 index 0000000000..8be260b69a --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_from_csv_compressed.sql @@ -0,0 +1,109 @@ +-- Test compressed CSV import functionality +-- First, create and export data with different compression types +CREATE TABLE test_csv_export( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +-- Insert test data +INSERT INTO test_csv_export(`id`, `name`, `value`, ts) VALUES + (1, 'Alice', 10.5, 1640995200000), + (2, 'Bob', 20.3, 1640995260000), + (3, 'Charlie', 30.7, 1640995320000), + (4, 'David', 40.1, 1640995380000), + (5, 'Eve', 50.9, 1640995440000); + +-- Export with different compression types +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_uncompressed.csv' WITH (format='csv'); +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip'); +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd'); +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2'); +COPY test_csv_export TO '${SQLNESS_HOME}/import/test_csv_xz.csv.xz' WITH 
(format='csv', compression_type='xz'); + +-- Test importing uncompressed CSV +CREATE TABLE test_csv_import_uncompressed( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_csv_import_uncompressed FROM '${SQLNESS_HOME}/import/test_csv_uncompressed.csv' WITH (format='csv'); + +SELECT COUNT(*) as uncompressed_count FROM test_csv_import_uncompressed; + +-- Test importing GZIP compressed CSV +CREATE TABLE test_csv_import_gzip( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_csv_import_gzip FROM '${SQLNESS_HOME}/import/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip'); + +SELECT COUNT(*) as gzip_count FROM test_csv_import_gzip; +SELECT `id`, `name`, `value` FROM test_csv_import_gzip WHERE `id` = 1; + +-- Test importing ZSTD compressed CSV +CREATE TABLE test_csv_import_zstd( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_csv_import_zstd FROM '${SQLNESS_HOME}/import/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd'); + +SELECT COUNT(*) as zstd_count FROM test_csv_import_zstd; +SELECT `id`, `name`, `value` FROM test_csv_import_zstd WHERE `id` = 2; + +-- Test importing BZIP2 compressed CSV +CREATE TABLE test_csv_import_bzip2( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_csv_import_bzip2 FROM '${SQLNESS_HOME}/import/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2'); + +SELECT COUNT(*) as bzip2_count FROM test_csv_import_bzip2; +SELECT `id`, `name`, `value` FROM test_csv_import_bzip2 WHERE `id` = 3; + +-- Test importing XZ compressed CSV +CREATE TABLE test_csv_import_xz( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_csv_import_xz FROM '${SQLNESS_HOME}/import/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz'); + +SELECT COUNT(*) as xz_count FROM test_csv_import_xz; +SELECT `id`, `name`, `value` FROM test_csv_import_xz WHERE `id` = 4; + +-- Verify data integrity by comparing all imported tables +SELECT source, count FROM ( + SELECT 'uncompressed' as source, COUNT(*) as count, 1 as order_key FROM test_csv_import_uncompressed + UNION ALL + SELECT 'gzip', COUNT(*) as count, 2 as order_key FROM test_csv_import_gzip + UNION ALL + SELECT 'zstd', COUNT(*) as count, 3 as order_key FROM test_csv_import_zstd + UNION ALL + SELECT 'bzip2', COUNT(*) as count, 4 as order_key FROM test_csv_import_bzip2 + UNION ALL + SELECT 'xz', COUNT(*) as count, 5 as order_key FROM test_csv_import_xz +) AS subquery +ORDER BY order_key; + +-- Clean up +DROP TABLE test_csv_export; +DROP TABLE test_csv_import_uncompressed; +DROP TABLE test_csv_import_gzip; +DROP TABLE test_csv_import_zstd; +DROP TABLE test_csv_import_bzip2; +DROP TABLE test_csv_import_xz; diff --git a/tests/cases/standalone/common/copy/copy_from_json_compressed.result b/tests/cases/standalone/common/copy/copy_from_json_compressed.result new file mode 100644 index 0000000000..28b64f328c --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_from_json_compressed.result @@ -0,0 +1,233 @@ +-- Test compressed JSON import functionality +-- First, create and export data with different compression types +CREATE TABLE test_json_export( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO test_json_export(`id`, `name`, `value`, ts) VALUES + (1, 'Alice', 10.5, 1640995200000), + (2, 'Bob', 20.3, 
1640995260000), + (3, 'Charlie', 30.7, 1640995320000), + (4, 'David', 40.1, 1640995380000), + (5, 'Eve', 50.9, 1640995440000); + +Affected Rows: 5 + +-- Export with different compression types +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_uncompressed.json' WITH (format='json'); + +Affected Rows: 5 + +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip'); + +Affected Rows: 5 + +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd'); + +Affected Rows: 5 + +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2'); + +Affected Rows: 5 + +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_xz.json.xz' WITH (format='json', compression_type='xz'); + +Affected Rows: 5 + +-- Test importing uncompressed JSON +CREATE TABLE test_json_import_uncompressed( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_json_import_uncompressed FROM '${SQLNESS_HOME}/import/test_json_uncompressed.json' WITH (format='json'); + +Affected Rows: 5 + +SELECT COUNT(*) as uncompressed_count FROM test_json_import_uncompressed; + ++--------------------+ +| uncompressed_count | ++--------------------+ +| 5 | ++--------------------+ + +-- Test importing GZIP compressed JSON +CREATE TABLE test_json_import_gzip( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_json_import_gzip FROM '${SQLNESS_HOME}/import/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip'); + +Affected Rows: 5 + +SELECT COUNT(*) as gzip_count FROM test_json_import_gzip; + ++------------+ +| gzip_count | ++------------+ +| 5 | ++------------+ + +SELECT `id`, `name`, `value` FROM test_json_import_gzip WHERE `id` = 1; + ++----+-------+-------+ +| id | name | value | ++----+-------+-------+ +| 1 | Alice | 10.5 | ++----+-------+-------+ + +-- Test importing ZSTD compressed JSON +CREATE TABLE test_json_import_zstd( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_json_import_zstd FROM '${SQLNESS_HOME}/import/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd'); + +Affected Rows: 5 + +SELECT COUNT(*) as zstd_count FROM test_json_import_zstd; + ++------------+ +| zstd_count | ++------------+ +| 5 | ++------------+ + +SELECT `id`, `name`, `value` FROM test_json_import_zstd WHERE `id` = 2; + ++----+------+-------+ +| id | name | value | ++----+------+-------+ +| 2 | Bob | 20.3 | ++----+------+-------+ + +-- Test importing BZIP2 compressed JSON +CREATE TABLE test_json_import_bzip2( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +COPY test_json_import_bzip2 FROM '${SQLNESS_HOME}/import/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2'); + +Affected Rows: 5 + +SELECT COUNT(*) as bzip2_count FROM test_json_import_bzip2; + ++-------------+ +| bzip2_count | ++-------------+ +| 5 | ++-------------+ + +SELECT `id`, `name`, `value` FROM test_json_import_bzip2 WHERE `id` = 3; + ++----+---------+-------+ +| id | name | value | ++----+---------+-------+ +| 3 | Charlie | 30.7 | ++----+---------+-------+ + +-- Test importing XZ compressed JSON +CREATE TABLE test_json_import_xz( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + 
+COPY test_json_import_xz FROM '${SQLNESS_HOME}/import/test_json_xz.json.xz' WITH (format='json', compression_type='xz'); + +Affected Rows: 5 + +SELECT COUNT(*) as xz_count FROM test_json_import_xz; + ++----------+ +| xz_count | ++----------+ +| 5 | ++----------+ + +SELECT `id`, `name`, `value` FROM test_json_import_xz WHERE `id` = 4; + ++----+-------+-------+ +| id | name | value | ++----+-------+-------+ +| 4 | David | 40.1 | ++----+-------+-------+ + +-- Verify data integrity by comparing all imported tables +SELECT source, count FROM ( + SELECT 'uncompressed' as source, COUNT(*) as count, 1 as order_key FROM test_json_import_uncompressed + UNION ALL + SELECT 'gzip', COUNT(*) as count, 2 as order_key FROM test_json_import_gzip + UNION ALL + SELECT 'zstd', COUNT(*) as count, 3 as order_key FROM test_json_import_zstd + UNION ALL + SELECT 'bzip2', COUNT(*) as count, 4 as order_key FROM test_json_import_bzip2 + UNION ALL + SELECT 'xz', COUNT(*) as count, 5 as order_key FROM test_json_import_xz +) AS subquery +ORDER BY order_key; + ++--------------+-------+ +| source | count | ++--------------+-------+ +| uncompressed | 5 | +| gzip | 5 | +| zstd | 5 | +| bzip2 | 5 | +| xz | 5 | ++--------------+-------+ + +-- Clean up +DROP TABLE test_json_export; + +Affected Rows: 0 + +DROP TABLE test_json_import_uncompressed; + +Affected Rows: 0 + +DROP TABLE test_json_import_gzip; + +Affected Rows: 0 + +DROP TABLE test_json_import_zstd; + +Affected Rows: 0 + +DROP TABLE test_json_import_bzip2; + +Affected Rows: 0 + +DROP TABLE test_json_import_xz; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_from_json_compressed.sql b/tests/cases/standalone/common/copy/copy_from_json_compressed.sql new file mode 100644 index 0000000000..8b3b400812 --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_from_json_compressed.sql @@ -0,0 +1,109 @@ +-- Test compressed JSON import functionality +-- First, create and export data with different compression types +CREATE TABLE test_json_export( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +-- Insert test data +INSERT INTO test_json_export(`id`, `name`, `value`, ts) VALUES + (1, 'Alice', 10.5, 1640995200000), + (2, 'Bob', 20.3, 1640995260000), + (3, 'Charlie', 30.7, 1640995320000), + (4, 'David', 40.1, 1640995380000), + (5, 'Eve', 50.9, 1640995440000); + +-- Export with different compression types +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_uncompressed.json' WITH (format='json'); +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip'); +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd'); +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2'); +COPY test_json_export TO '${SQLNESS_HOME}/import/test_json_xz.json.xz' WITH (format='json', compression_type='xz'); + +-- Test importing uncompressed JSON +CREATE TABLE test_json_import_uncompressed( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_json_import_uncompressed FROM '${SQLNESS_HOME}/import/test_json_uncompressed.json' WITH (format='json'); + +SELECT COUNT(*) as uncompressed_count FROM test_json_import_uncompressed; + +-- Test importing GZIP compressed JSON +CREATE TABLE test_json_import_gzip( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_json_import_gzip 
FROM '${SQLNESS_HOME}/import/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip'); + +SELECT COUNT(*) as gzip_count FROM test_json_import_gzip; +SELECT `id`, `name`, `value` FROM test_json_import_gzip WHERE `id` = 1; + +-- Test importing ZSTD compressed JSON +CREATE TABLE test_json_import_zstd( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_json_import_zstd FROM '${SQLNESS_HOME}/import/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd'); + +SELECT COUNT(*) as zstd_count FROM test_json_import_zstd; +SELECT `id`, `name`, `value` FROM test_json_import_zstd WHERE `id` = 2; + +-- Test importing BZIP2 compressed JSON +CREATE TABLE test_json_import_bzip2( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_json_import_bzip2 FROM '${SQLNESS_HOME}/import/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2'); + +SELECT COUNT(*) as bzip2_count FROM test_json_import_bzip2; +SELECT `id`, `name`, `value` FROM test_json_import_bzip2 WHERE `id` = 3; + +-- Test importing XZ compressed JSON +CREATE TABLE test_json_import_xz( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +COPY test_json_import_xz FROM '${SQLNESS_HOME}/import/test_json_xz.json.xz' WITH (format='json', compression_type='xz'); + +SELECT COUNT(*) as xz_count FROM test_json_import_xz; +SELECT `id`, `name`, `value` FROM test_json_import_xz WHERE `id` = 4; + +-- Verify data integrity by comparing all imported tables +SELECT source, count FROM ( + SELECT 'uncompressed' as source, COUNT(*) as count, 1 as order_key FROM test_json_import_uncompressed + UNION ALL + SELECT 'gzip', COUNT(*) as count, 2 as order_key FROM test_json_import_gzip + UNION ALL + SELECT 'zstd', COUNT(*) as count, 3 as order_key FROM test_json_import_zstd + UNION ALL + SELECT 'bzip2', COUNT(*) as count, 4 as order_key FROM test_json_import_bzip2 + UNION ALL + SELECT 'xz', COUNT(*) as count, 5 as order_key FROM test_json_import_xz +) AS subquery +ORDER BY order_key; + +-- Clean up +DROP TABLE test_json_export; +DROP TABLE test_json_import_uncompressed; +DROP TABLE test_json_import_gzip; +DROP TABLE test_json_import_zstd; +DROP TABLE test_json_import_bzip2; +DROP TABLE test_json_import_xz; diff --git a/tests/cases/standalone/common/copy/copy_to_csv_compressed.result b/tests/cases/standalone/common/copy/copy_to_csv_compressed.result new file mode 100644 index 0000000000..8b3f8585c2 --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_to_csv_compressed.result @@ -0,0 +1,65 @@ +-- Test compressed CSV export functionality +CREATE TABLE test_csv_compression( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO test_csv_compression(`id`, `name`, `value`, ts) VALUES + (1, 'Alice', 10.5, 1640995200000), + (2, 'Bob', 20.3, 1640995260000), + (3, 'Charlie', 30.7, 1640995320000), + (4, 'David', 40.1, 1640995380000), + (5, 'Eve', 50.9, 1640995440000); + +Affected Rows: 5 + +-- Test uncompressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_uncompressed.csv' WITH (format='csv'); + +Affected Rows: 5 + +-- Test GZIP compressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip'); + +Affected Rows: 5 + +-- Test ZSTD compressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_zstd.csv.zst' WITH 
(format='csv', compression_type='zstd'); + +Affected Rows: 5 + +-- Test BZIP2 compressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2'); + +Affected Rows: 5 + +-- Test XZ compressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz'); + +Affected Rows: 5 + +-- Test compressed CSV with custom delimiter b'\t' (ASCII 9) +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_tab_separated.csv.gz' WITH (format='csv', compression_type='gzip', delimiter='9'); + +Affected Rows: 5 + +-- Test compressed CSV with timestamp format +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_timestamp_format.csv.zst' WITH (format='csv', compression_type='zstd', timestamp_format='%Y-%m-%d %H:%M:%S'); + +Affected Rows: 5 + +-- Test compressed CSV with date format +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_date_format.csv.gz' WITH (format='csv', compression_type='gzip', date_format='%Y/%m/%d'); + +Affected Rows: 5 + +-- Clean up +DROP TABLE test_csv_compression; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_to_csv_compressed.sql b/tests/cases/standalone/common/copy/copy_to_csv_compressed.sql new file mode 100644 index 0000000000..1eb899c563 --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_to_csv_compressed.sql @@ -0,0 +1,42 @@ +-- Test compressed CSV export functionality +CREATE TABLE test_csv_compression( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +-- Insert test data +INSERT INTO test_csv_compression(`id`, `name`, `value`, ts) VALUES + (1, 'Alice', 10.5, 1640995200000), + (2, 'Bob', 20.3, 1640995260000), + (3, 'Charlie', 30.7, 1640995320000), + (4, 'David', 40.1, 1640995380000), + (5, 'Eve', 50.9, 1640995440000); + +-- Test uncompressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_uncompressed.csv' WITH (format='csv'); + +-- Test GZIP compressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_gzip.csv.gz' WITH (format='csv', compression_type='gzip'); + +-- Test ZSTD compressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_zstd.csv.zst' WITH (format='csv', compression_type='zstd'); + +-- Test BZIP2 compressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_bzip2.csv.bz2' WITH (format='csv', compression_type='bzip2'); + +-- Test XZ compressed CSV export +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_xz.csv.xz' WITH (format='csv', compression_type='xz'); + +-- Test compressed CSV with custom delimiter b'\t' (ASCII 9) +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_tab_separated.csv.gz' WITH (format='csv', compression_type='gzip', delimiter='9'); + +-- Test compressed CSV with timestamp format +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_timestamp_format.csv.zst' WITH (format='csv', compression_type='zstd', timestamp_format='%Y-%m-%d %H:%M:%S'); + +-- Test compressed CSV with date format +COPY test_csv_compression TO '${SQLNESS_HOME}/export/test_csv_date_format.csv.gz' WITH (format='csv', compression_type='gzip', date_format='%Y/%m/%d'); + +-- Clean up +DROP TABLE test_csv_compression; diff --git a/tests/cases/standalone/common/copy/copy_to_json_compressed.result b/tests/cases/standalone/common/copy/copy_to_json_compressed.result new file mode 100644 index 0000000000..567b8b1853 --- 
/dev/null +++ b/tests/cases/standalone/common/copy/copy_to_json_compressed.result @@ -0,0 +1,55 @@ +-- Test compressed JSON export functionality +CREATE TABLE test_json_compression( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +-- Insert test data +INSERT INTO test_json_compression(`id`, `name`, `value`, ts) VALUES + (1, 'Alice', 10.5, 1640995200000), + (2, 'Bob', 20.3, 1640995260000), + (3, 'Charlie', 30.7, 1640995320000), + (4, 'David', 40.1, 1640995380000), + (5, 'Eve', 50.9, 1640995440000); + +Affected Rows: 5 + +-- Test uncompressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_uncompressed.json' WITH (format='json'); + +Affected Rows: 5 + +-- Test GZIP compressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip'); + +Affected Rows: 5 + +-- Test ZSTD compressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd'); + +Affected Rows: 5 + +-- Test BZIP2 compressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2'); + +Affected Rows: 5 + +-- Test XZ compressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_xz.json.xz' WITH (format='json', compression_type='xz'); + +Affected Rows: 5 + +-- Test compressed JSON with schema inference limit +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_schema_limit.json.gz' WITH (format='json', compression_type='gzip', schema_infer_max_record=100); + +Affected Rows: 5 + +-- Clean up +DROP TABLE test_json_compression; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_to_json_compressed.sql b/tests/cases/standalone/common/copy/copy_to_json_compressed.sql new file mode 100644 index 0000000000..ba31c290dd --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_to_json_compressed.sql @@ -0,0 +1,36 @@ +-- Test compressed JSON export functionality +CREATE TABLE test_json_compression( + `id` UINT32, + `name` STRING, + `value` DOUBLE, + ts TIMESTAMP TIME INDEX +); + +-- Insert test data +INSERT INTO test_json_compression(`id`, `name`, `value`, ts) VALUES + (1, 'Alice', 10.5, 1640995200000), + (2, 'Bob', 20.3, 1640995260000), + (3, 'Charlie', 30.7, 1640995320000), + (4, 'David', 40.1, 1640995380000), + (5, 'Eve', 50.9, 1640995440000); + +-- Test uncompressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_uncompressed.json' WITH (format='json'); + +-- Test GZIP compressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_gzip.json.gz' WITH (format='json', compression_type='gzip'); + +-- Test ZSTD compressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_zstd.json.zst' WITH (format='json', compression_type='zstd'); + +-- Test BZIP2 compressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_bzip2.json.bz2' WITH (format='json', compression_type='bzip2'); + +-- Test XZ compressed JSON export +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_xz.json.xz' WITH (format='json', compression_type='xz'); + +-- Test compressed JSON with schema inference limit +COPY test_json_compression TO '${SQLNESS_HOME}/export/test_json_schema_limit.json.gz' WITH (format='json', compression_type='gzip', schema_infer_max_record=100); + +-- Clean up 
+DROP TABLE test_json_compression;