From a6269397c8924dbf9914ce13d81eded74a8c8f68 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Tue, 4 Jun 2024 16:43:33 +0800
Subject: [PATCH] fix: fix EntityTooSmall issue (#4100)

* fix: fix EntityTooSmall issue

* chore(ci): add minio to coverage

* tests: add test for parquet writer

* chore: move tests to `common-datasource` crate
---
 .github/workflows/develop.yml                 |  8 +++
 Cargo.lock                                    |  5 ++
 src/common/datasource/Cargo.toml              |  5 ++
 src/common/datasource/src/file_format.rs      |  2 +
 .../datasource/src/file_format/parquet.rs     | 72 +++++++++++++++++++
 src/common/datasource/src/lib.rs              |  5 ++
 src/mito2/Cargo.toml                          |  1 +
 .../minio/docker-compose-standalone.yml       | 18 +++++
 8 files changed, 116 insertions(+)
 create mode 100644 tests-integration/fixtures/minio/docker-compose-standalone.yml

diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml
index b397a0fd60..deecdc454a 100644
--- a/.github/workflows/develop.yml
+++ b/.github/workflows/develop.yml
@@ -518,6 +518,9 @@ jobs:
       - name: Setup kafka server
         working-directory: tests-integration/fixtures/kafka
         run: docker compose -f docker-compose-standalone.yml up -d --wait
+      - name: Setup minio
+        working-directory: tests-integration/fixtures/minio
+        run: docker compose -f docker-compose-standalone.yml up -d --wait
       - name: Run nextest cases
         run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
         env:
@@ -528,6 +531,11 @@ jobs:
           GT_S3_ACCESS_KEY_ID: ${{ secrets.AWS_CI_TEST_ACCESS_KEY_ID }}
           GT_S3_ACCESS_KEY: ${{ secrets.AWS_CI_TEST_SECRET_ACCESS_KEY }}
           GT_S3_REGION: ${{ vars.AWS_CI_TEST_BUCKET_REGION }}
+          GT_MINIO_BUCKET: greptime
+          GT_MINIO_ACCESS_KEY_ID: superpower_ci_user
+          GT_MINIO_ACCESS_KEY: superpower_password
+          GT_MINIO_REGION: us-west-2
+          GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
           GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
           GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
           UNITTEST_LOG_DIR: "__unittest_logs"
diff --git a/Cargo.lock b/Cargo.lock
index d722b1e9f2..44ad551722 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1752,20 +1752,24 @@ dependencies = [
  "async-compression 0.3.15",
  "async-trait",
  "bytes",
+ "common-base",
  "common-error",
  "common-macro",
  "common-recordbatch",
  "common-runtime",
+ "common-telemetry",
  "common-test-util",
  "datafusion 38.0.0",
  "datatypes",
  "derive_builder 0.12.0",
+ "dotenv",
  "futures",
  "lazy_static",
  "object-store",
  "orc-rust",
  "parquet",
  "paste",
+ "rand",
  "regex",
  "serde",
  "snafu 0.8.3",
@@ -1773,6 +1777,7 @@ dependencies = [
  "tokio",
  "tokio-util",
  "url",
+ "uuid",
 ]
 
 [[package]]
diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml
index ece0edd9fe..e4bdec2c70 100644
--- a/src/common/datasource/Cargo.toml
+++ b/src/common/datasource/Cargo.toml
@@ -20,6 +20,7 @@ async-compression = { version = "0.3", features = [
 ] }
 async-trait.workspace = true
 bytes.workspace = true
+common-base.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
 common-recordbatch.workspace = true
@@ -33,6 +34,7 @@ object-store.workspace = true
 orc-rust = { git = "https://github.com/datafusion-contrib/datafusion-orc.git", rev = "502217315726314c4008808fe169764529640599" }
 parquet.workspace = true
 paste = "1.0"
+rand.workspace = true
 regex = "1.7"
 serde.workspace = true
 snafu.workspace = true
@@ -42,4 +44,7 @@ tokio-util.workspace = true
 url = "2.3"
 
 [dev-dependencies]
+common-telemetry.workspace = true
 common-test-util.workspace = true
+dotenv.workspace = true
+uuid.workspace = true
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index 5bb9258ad3..c555f763b5 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -46,6 +46,7 @@ use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
 use crate::compression::CompressionType;
 use crate::error::{self, Result};
 use crate::share_buffer::SharedBuffer;
+use crate::DEFAULT_WRITE_BUFFER_SIZE;
 
 pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
 pub const FORMAT_DELIMITER: &str = "delimiter";
@@ -204,6 +205,7 @@ pub async fn stream_to_file<T: DfRecordBatchEncoder, U: Fn(SharedBuffer) -> T>(
         store
             .writer_with(&path)
             .concurrent(concurrency)
+            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
             .await
             .map(|v| v.into_futures_async_write().compat_write())
             .context(error::WriteObjectSnafu { path })
diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs
index 2e887ac2f7..f5125757b9 100644
--- a/src/common/datasource/src/file_format/parquet.rs
+++ b/src/common/datasource/src/file_format/parquet.rs
@@ -39,6 +39,7 @@ use crate::buffered_writer::{ArrowWriterCloser, DfRecordBatchEncoder, LazyBufferedWriter};
 use crate::error::{self, Result};
 use crate::file_format::FileFormat;
 use crate::share_buffer::SharedBuffer;
+use crate::DEFAULT_WRITE_BUFFER_SIZE;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
 pub struct ParquetFormat {}
@@ -197,6 +198,7 @@ impl BufferedWriter {
         store
             .writer_with(&path)
             .concurrent(concurrency)
+            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
             .await
             .map(|v| v.into_futures_async_write().compat_write())
             .context(error::WriteObjectSnafu { path })
@@ -276,9 +278,19 @@ pub async fn stream_to_parquet(
 
 #[cfg(test)]
 mod tests {
+    use std::env;
+    use std::sync::Arc;
+
+    use common_telemetry::warn;
     use common_test_util::find_workspace_path;
+    use datatypes::arrow::array::{ArrayRef, Int64Array, RecordBatch};
+    use datatypes::arrow::datatypes::{DataType, Field, Schema};
+    use object_store::services::S3;
+    use object_store::ObjectStore;
+    use rand::{thread_rng, Rng};
 
     use super::*;
+    use crate::file_format::parquet::BufferedWriter;
     use crate::test_util::{format_schema, test_store};
 
     fn test_data_root() -> String {
@@ -296,4 +308,64 @@ mod tests {
 
         assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted);
     }
+
+    #[tokio::test]
+    async fn test_parquet_writer() {
+        common_telemetry::init_default_ut_logging();
+        let _ = dotenv::dotenv();
+        let Ok(bucket) = env::var("GT_MINIO_BUCKET") else {
+            warn!("ignoring test parquet writer");
+            return;
+        };
+
+        let mut builder = S3::default();
+        let _ = builder
+            .root(&uuid::Uuid::new_v4().to_string())
+            .access_key_id(&env::var("GT_MINIO_ACCESS_KEY_ID").unwrap())
+            .secret_access_key(&env::var("GT_MINIO_ACCESS_KEY").unwrap())
+            .bucket(&bucket)
+            .region(&env::var("GT_MINIO_REGION").unwrap())
+            .endpoint(&env::var("GT_MINIO_ENDPOINT_URL").unwrap());
+
+        let object_store = ObjectStore::new(builder).unwrap().finish();
+        let file_path = uuid::Uuid::new_v4().to_string();
+        let fields = vec![
+            Field::new("field1", DataType::Int64, true),
+            Field::new("field0", DataType::Int64, true),
+        ];
+        let arrow_schema = Arc::new(Schema::new(fields));
+        let mut buffered_writer = BufferedWriter::try_new(
+            file_path.clone(),
+            object_store.clone(),
+            arrow_schema.clone(),
+            None,
+            // Sets a small value.
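+            // A 128-byte threshold forces a flush on almost every batch;
+            // before the `.chunk(...)` fix above, each tiny flush could be
+            // uploaded as a multipart part below S3's 5 MiB minimum and the
+            // upload would then fail with EntityTooSmall.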
+            128,
+            8,
+        )
+        .await
+        .unwrap();
+        let rows = 200000;
+        let generator = || {
+            let columns: Vec<ArrayRef> = vec![
+                Arc::new(Int64Array::from(
+                    (0..rows)
+                        .map(|_| thread_rng().gen::<i64>())
+                        .collect::<Vec<_>>(),
+                )),
+                Arc::new(Int64Array::from(
+                    (0..rows)
+                        .map(|_| thread_rng().gen::<i64>())
+                        .collect::<Vec<_>>(),
+                )),
+            ];
+            RecordBatch::try_new(arrow_schema.clone(), columns).unwrap()
+        };
+        let batch = generator();
+        // Writes about ~30MiB in total.
+        for _ in 0..10 {
+            buffered_writer.write(&batch).await.unwrap();
+        }
+        buffered_writer.close().await.unwrap();
+    }
 }
diff --git a/src/common/datasource/src/lib.rs b/src/common/datasource/src/lib.rs
index 8cb8756e06..5d24b1cdf4 100644
--- a/src/common/datasource/src/lib.rs
+++ b/src/common/datasource/src/lib.rs
@@ -27,3 +27,8 @@ pub mod test_util;
 #[cfg(test)]
 pub mod tests;
 pub mod util;
+
+use common_base::readable_size::ReadableSize;
+
+/// Default write buffer size; it must be greater than S3's default minimum upload part size (5 MiB).
+pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml
index f3a747dcd0..3994ebb439 100644
--- a/src/mito2/Cargo.toml
+++ b/src/mito2/Cargo.toml
@@ -75,6 +75,7 @@ uuid.workspace = true
 common-procedure-test.workspace = true
 common-test-util.workspace = true
 criterion = "0.4"
+dotenv.workspace = true
 log-store.workspace = true
 object-store = { workspace = true, features = ["services-memory"] }
 rskafka.workspace = true
diff --git a/tests-integration/fixtures/minio/docker-compose-standalone.yml b/tests-integration/fixtures/minio/docker-compose-standalone.yml
new file mode 100644
index 0000000000..139cb916a9
--- /dev/null
+++ b/tests-integration/fixtures/minio/docker-compose-standalone.yml
@@ -0,0 +1,18 @@
+version: '3.8'
+services:
+  minio:
+    image: bitnami/minio:2024
+    ports:
+      - '9000:9000'
+      - '9001:9001'
+    environment:
+      - MINIO_ROOT_USER=superpower_ci_user
+      - MINIO_ROOT_PASSWORD=superpower_password
+      - MINIO_DEFAULT_BUCKETS=greptime
+      - BITNAMI_DEBUG=true
+    volumes:
+      - 'minio_data:/bitnami/minio/data'
+
+volumes:
+  minio_data:
+    driver: local
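
Note: the core of the fix is the `.chunk(...)` call on the object store
writer. S3 rejects a completed multipart upload with EntityTooSmall when any
part except the last is under 5 MiB, so buffering writes into 8 MiB chunks
keeps every uploaded part above that floor no matter how small the individual
flushes are. Below is a minimal standalone sketch of the same idea, assuming
opendal's `Operator` writer API (`writer_with`, `concurrent`, `chunk`, as in
opendal 0.46+); it mirrors, rather than reproduces, the patched call sites:

    use opendal::{Operator, Result, Writer};

    /// 8 MiB: comfortably above S3's 5 MiB minimum multipart part size.
    const WRITE_CHUNK_SIZE: usize = 8 * 1024 * 1024;

    /// Opens a writer that buffers data into fixed-size chunks, so every
    /// multipart part it uploads (except the last) meets S3's minimum and
    /// the upload cannot fail with EntityTooSmall.
    async fn open_chunked_writer(op: &Operator, path: &str) -> Result<Writer> {
        op.writer_with(path)
            // Upload up to 8 buffered parts concurrently.
            .concurrent(8)
            // Accumulate writes until a full 8 MiB chunk is ready.
            .chunk(WRITE_CHUNK_SIZE)
            .await
    }

The trade-off is the same one the patch makes: a larger chunk means more
memory per open writer, so 8 MiB stays close to the 5 MiB minimum while
leaving headroom.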