From 336e4cdb84126c0851bcb64224ef757f0f09f7a7 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Sun, 14 Jun 2026 23:00:53 -0700 Subject: [PATCH] feat(cli): add export-v2 chunk parallelism (#8292) Signed-off-by: jeremyhi --- src/cli/src/data/export_v2/command.rs | 86 ++++++++++ src/cli/src/data/export_v2/coordinator.rs | 196 ++++++++++++++++++++-- 2 files changed, 265 insertions(+), 17 deletions(-) diff --git a/src/cli/src/data/export_v2/command.rs b/src/cli/src/data/export_v2/command.rs index 829be8ea1d..a264a0f293 100644 --- a/src/cli/src/data/export_v2/command.rs +++ b/src/cli/src/data/export_v2/command.rs @@ -296,6 +296,10 @@ pub struct ExportCreateCommand { #[clap(long, default_value = "1")] parallelism: usize, + /// Number of export chunks to run concurrently on the client (1..=64). + #[clap(long, default_value = "1", value_parser = parse_chunk_parallelism)] + chunk_parallelism: usize, + /// Basic authentication (user:password). #[clap(long)] auth_basic: Option<String>, @@ -351,6 +355,9 @@ impl ExportCreateCommand { if self.parallelism != 1 { invalid_args.push("--parallelism"); } + if self.chunk_parallelism != 1 { + invalid_args.push("--chunk-parallelism"); + } if !invalid_args.is_empty() { return SchemaOnlyArgsNotAllowedSnafu { args: invalid_args.join(", "), @@ -391,6 +398,7 @@ impl ExportCreateCommand { time_range, chunk_time_window: self.chunk_time_window, parallelism: self.parallelism, + chunk_parallelism: self.chunk_parallelism, snapshot_uri: self.to.clone(), storage_config: self.storage.clone(), }, @@ -416,10 +424,22 @@ struct ExportConfig { time_range: TimeRange, chunk_time_window: Option<Duration>, parallelism: usize, + chunk_parallelism: usize, snapshot_uri: String, storage_config: ObjectStoreConfig, } +fn parse_chunk_parallelism(value: &str) -> std::result::Result<usize, String> { + let parallelism = value + .parse::<usize>() + .map_err(|_| "chunk parallelism must be an integer between 1 and 64".to_string())?; + if (1..=64).contains(&parallelism) { + Ok(parallelism) + } else { + Err("chunk 
parallelism must be between 1 and 64".to_string()) + } +} + #[async_trait] impl Tool for ExportCreate { async fn do_work(&self) -> std::result::Result<(), BoxedError> { @@ -474,6 +494,7 @@ impl ExportCreate { &self.config.storage_config, &mut manifest, self.config.parallelism, + self.config.chunk_parallelism, ) .await?; return Ok(()); @@ -531,6 +552,7 @@ impl ExportCreate { &self.config.storage_config, &mut manifest, self.config.parallelism, + self.config.chunk_parallelism, ) .await?; } @@ -1557,6 +1579,8 @@ mod tests { "csv", "--parallelism", "2", + "--chunk-parallelism", + "2", ]); let error = cmd.build().await.err().unwrap().to_string(); @@ -1567,6 +1591,63 @@ mod tests { assert!(error.contains("--chunk-time-window")); assert!(error.contains("--format")); assert!(error.contains("--parallelism")); + assert!(error.contains("--chunk-parallelism")); + } + + #[test] + fn test_chunk_parallelism_defaults_to_one() { + let cmd = ExportCreateCommand::parse_from([ + "export-v2-create", + "--addr", + "127.0.0.1:4000", + "--to", + "file:///tmp/export-v2-test", + ]); + + assert_eq!(1, cmd.chunk_parallelism); + } + + #[test] + fn test_chunk_parallelism_parses_valid_value() { + let cmd = ExportCreateCommand::parse_from([ + "export-v2-create", + "--addr", + "127.0.0.1:4000", + "--to", + "file:///tmp/export-v2-test", + "--chunk-parallelism", + "64", + ]); + + assert_eq!(64, cmd.chunk_parallelism); + } + + #[test] + fn test_chunk_parallelism_rejects_out_of_range_values() { + assert!( + ExportCreateCommand::try_parse_from([ + "export-v2-create", + "--addr", + "127.0.0.1:4000", + "--to", + "file:///tmp/export-v2-test", + "--chunk-parallelism", + "0", + ]) + .is_err() + ); + assert!( + ExportCreateCommand::try_parse_from([ + "export-v2-create", + "--addr", + "127.0.0.1:4000", + "--to", + "file:///tmp/export-v2-test", + "--chunk-parallelism", + "65", + ]) + .is_err() + ); } #[test] @@ -1602,6 +1683,7 @@ mod tests { time_range: TimeRange::unbounded(), chunk_time_window: None, 
parallelism: 1, + chunk_parallelism: 1, snapshot_uri: "file:///tmp/snapshot".to_string(), storage_config: ObjectStoreConfig::default(), }; @@ -1637,6 +1719,7 @@ mod tests { time_range: TimeRange::unbounded(), chunk_time_window: None, parallelism: 1, + chunk_parallelism: 1, snapshot_uri: "file:///tmp/snapshot".to_string(), storage_config: ObjectStoreConfig::default(), }; @@ -1667,6 +1750,7 @@ mod tests { time_range, chunk_time_window: Some(Duration::from_secs(3600)), parallelism: 1, + chunk_parallelism: 1, snapshot_uri: "file:///tmp/snapshot".to_string(), storage_config: ObjectStoreConfig::default(), }; @@ -1698,6 +1782,7 @@ mod tests { time_range: TimeRange::unbounded(), chunk_time_window: None, parallelism: 1, + chunk_parallelism: 1, snapshot_uri: "file:///tmp/snapshot".to_string(), storage_config: ObjectStoreConfig::default(), }; @@ -1731,6 +1816,7 @@ mod tests { time_range: TimeRange::new(Some(start), Some(start)), chunk_time_window: None, parallelism: 1, + chunk_parallelism: 1, snapshot_uri: "file:///tmp/snapshot".to_string(), storage_config: ObjectStoreConfig::default(), }; diff --git a/src/cli/src/data/export_v2/coordinator.rs b/src/cli/src/data/export_v2/coordinator.rs index d96c01d693..1f63b6705e 100644 --- a/src/cli/src/data/export_v2/coordinator.rs +++ b/src/cli/src/data/export_v2/coordinator.rs @@ -13,22 +13,29 @@ // limitations under the License. 
use common_telemetry::info; +use futures::StreamExt; +use futures::stream::FuturesUnordered; use crate::common::ObjectStoreConfig; use crate::data::export_v2::data::{CopyOptions, build_copy_target, execute_copy_database}; -use crate::data::export_v2::error::Result; +use crate::data::export_v2::error::{Error, Result}; use crate::data::export_v2::manifest::{ChunkStatus, DataFormat, Manifest, TimeRange}; use crate::data::path::data_dir_for_schema_chunk; use crate::data::snapshot_storage::{SnapshotStorage, StorageScheme}; use crate::database::DatabaseClient; +/// Owned, manifest-independent context shared by all chunk export futures. +/// +/// `catalog`/`schemas`/`format` are cloned from the manifest up front so the +/// export futures never borrow the manifest, leaving the coordinator free to +/// mutate and persist it while chunks are in flight. struct ExportContext<'a> { storage: &'a dyn SnapshotStorage, database_client: &'a DatabaseClient, snapshot_uri: &'a str, storage_config: &'a ObjectStoreConfig, - catalog: &'a str, - schemas: &'a [String], + catalog: String, + schemas: Vec<String>, format: DataFormat, parallelism: usize, } @@ -40,11 +47,36 @@ pub async fn export_data( storage_config: &ObjectStoreConfig, manifest: &mut Manifest, parallelism: usize, + chunk_parallelism: usize, ) -> Result<()> { if manifest.chunks.is_empty() { return Ok(()); } + let context = ExportContext { + storage, + database_client, + snapshot_uri, + storage_config, + catalog: manifest.catalog.clone(), + schemas: manifest.schemas.clone(), + format: manifest.format, + parallelism, + }; + + if chunk_parallelism <= 1 { + export_data_serial(&context, storage, manifest).await + } else { + export_data_concurrent(&context, storage, manifest, chunk_parallelism).await + } +} + +/// Exports chunks one at a time, preserving the original serial behavior. 
+async fn export_data_serial( + context: &ExportContext<'_>, + storage: &dyn SnapshotStorage, + manifest: &mut Manifest, +) -> Result<()> { for idx in 0..manifest.chunks.len() { if matches!( manifest.chunks[idx].status, @@ -57,17 +89,7 @@ pub async fn export_data( manifest.touch(); storage.write_manifest(manifest).await?; - let context = ExportContext { - storage, - database_client, - snapshot_uri, - storage_config, - catalog: &manifest.catalog, - schemas: &manifest.schemas, - format: manifest.format, - parallelism, - }; - let export_result = export_chunk(&context, chunk_id, time_range).await; + let export_result = export_chunk(context, chunk_id, time_range).await; let result = match export_result { Ok(files) => { @@ -89,6 +111,91 @@ pub async fn export_data( Ok(()) } +/// Exports up to `chunk_parallelism` chunks concurrently on the client. +/// +/// The coordinator owns all manifest mutation/persistence: it marks chunks +/// `InProgress` and persists the manifest before polling their futures, then +/// applies each chunk result and persists again on completion. The export +/// futures only run COPY DATABASE and collect files; they never touch the +/// manifest, so manifest writes stay serialized in this task. +/// +/// On the first chunk failure we stop scheduling new chunks but let already +/// in-flight chunks finish and persist their final status, then return the +/// first error. +async fn export_data_concurrent( + context: &ExportContext<'_>, + storage: &dyn SnapshotStorage, + manifest: &mut Manifest, + chunk_parallelism: usize, +) -> Result<()> { + let mut pending = FuturesUnordered::new(); + let mut next_idx = 0; + let mut first_error: Option<Error> = None; + + loop { + let mut scheduled = false; + + // Schedule eligible chunks in order up to the parallelism limit. Once a + // failure is seen, stop scheduling but keep draining in-flight chunks. 
+ while first_error.is_none() && pending.len() < chunk_parallelism { + let Some(idx) = next_eligible_chunk(manifest, &mut next_idx) else { + break; + }; + + let (chunk_id, time_range) = mark_chunk_in_progress(manifest, idx); + scheduled = true; + + pending.push(async move { + let result = export_chunk(context, chunk_id, time_range).await; + (idx, result) + }); + } + + if scheduled { + manifest.touch(); + storage.write_manifest(manifest).await?; + } + + let Some((idx, export_result)) = pending.next().await else { + break; + }; + + match export_result { + Ok(files) => mark_chunk_completed(manifest, idx, files), + Err(err) => { + mark_chunk_failed(manifest, idx, err.to_string()); + if first_error.is_none() { + first_error = Some(err); + } + } + } + manifest.touch(); + storage.write_manifest(manifest).await?; + } + + match first_error { + Some(err) => Err(err), + None => Ok(()), + } +} + +/// Returns the index of the next chunk eligible for export, scanning forward +/// from `next_idx` and skipping already Completed/Skipped chunks. Advances +/// `next_idx` past the returned chunk so each chunk is scheduled at most once. 
+fn next_eligible_chunk(manifest: &Manifest, next_idx: &mut usize) -> Option<usize> { + while *next_idx < manifest.chunks.len() { + let idx = *next_idx; + *next_idx += 1; + if !matches!( + manifest.chunks[idx].status, + ChunkStatus::Completed | ChunkStatus::Skipped + ) { + return Some(idx); + } + } + None +} + fn mark_chunk_in_progress(manifest: &mut Manifest, idx: usize) -> (u32, TimeRange) { let chunk = &mut manifest.chunks[idx]; chunk.mark_in_progress(); @@ -122,7 +229,7 @@ async fn export_chunk( parallelism: context.parallelism, }; - for schema in context.schemas { + for schema in &context.schemas { let prefix = data_dir_for_schema_chunk(schema, chunk_id); if needs_dir { context.storage.create_dir_all(&prefix).await?; @@ -136,7 +243,7 @@ async fn export_chunk( )?; execute_copy_database( context.database_client, - context.catalog, + &context.catalog, schema, &target, &copy_options, @@ -144,7 +251,7 @@ async fn export_chunk( .await?; } - let files = list_chunk_files(context.storage, context.schemas, chunk_id).await?; + let files = list_chunk_files(context.storage, &context.schemas, chunk_id).await?; info!("Collected {} files for chunk {}", files.len(), chunk_id); Ok(files) } @@ -164,3 +271,58 @@ async fn list_chunk_files( files.sort(); Ok(files) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::data::export_v2::manifest::ChunkMeta; + + fn pending_manifest(n: u32) -> Manifest { + let mut manifest = Manifest::new_full( + "greptime".to_string(), + vec!["public".to_string()], + TimeRange::unbounded(), + DataFormat::Parquet, + ); + manifest.chunks = (1..=n) + .map(|id| ChunkMeta::new(id, TimeRange::unbounded())) + .collect(); + manifest + } + + #[test] + fn test_next_eligible_chunk_scans_in_order() { + let manifest = pending_manifest(3); + let mut next_idx = 0; + + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(0)); + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(1)); + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(2)); 
+ assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), None); + } + + #[test] + fn test_next_eligible_chunk_skips_completed_and_skipped() { + let mut manifest = pending_manifest(4); + manifest.chunks[0].mark_completed(vec!["data/public/1/f.parquet".to_string()], None); + manifest.chunks[2].mark_skipped(); + let mut next_idx = 0; + + // Chunk 0 (completed) and chunk 2 (skipped) are skipped; failed/pending eligible. + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(1)); + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(3)); + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), None); + } + + #[test] + fn test_next_eligible_chunk_treats_failed_and_in_progress_as_eligible() { + let mut manifest = pending_manifest(2); + manifest.chunks[0].mark_failed("boom".to_string()); + manifest.chunks[1].mark_in_progress(); + let mut next_idx = 0; + + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(0)); + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(1)); + assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), None); + } +}