feat(cli): add export-v2 chunk parallelism (#8292)

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-06-14 23:00:53 -07:00
committed by GitHub
parent c9062fe042
commit 336e4cdb84
2 changed files with 265 additions and 17 deletions

View File

@@ -296,6 +296,10 @@ pub struct ExportCreateCommand {
#[clap(long, default_value = "1")]
parallelism: usize,
/// Number of export chunks to run concurrently on the client (1..=64).
#[clap(long, default_value = "1", value_parser = parse_chunk_parallelism)]
chunk_parallelism: usize,
/// Basic authentication (user:password).
#[clap(long)]
auth_basic: Option<String>,
@@ -351,6 +355,9 @@ impl ExportCreateCommand {
if self.parallelism != 1 {
invalid_args.push("--parallelism");
}
if self.chunk_parallelism != 1 {
invalid_args.push("--chunk-parallelism");
}
if !invalid_args.is_empty() {
return SchemaOnlyArgsNotAllowedSnafu {
args: invalid_args.join(", "),
@@ -391,6 +398,7 @@ impl ExportCreateCommand {
time_range,
chunk_time_window: self.chunk_time_window,
parallelism: self.parallelism,
chunk_parallelism: self.chunk_parallelism,
snapshot_uri: self.to.clone(),
storage_config: self.storage.clone(),
},
@@ -416,10 +424,22 @@ struct ExportConfig {
time_range: TimeRange,
chunk_time_window: Option<Duration>,
parallelism: usize,
chunk_parallelism: usize,
snapshot_uri: String,
storage_config: ObjectStoreConfig,
}
fn parse_chunk_parallelism(value: &str) -> std::result::Result<usize, String> {
let parallelism = value
.parse::<usize>()
.map_err(|_| "chunk parallelism must be an integer between 1 and 64".to_string())?;
if (1..=64).contains(&parallelism) {
Ok(parallelism)
} else {
Err("chunk parallelism must be between 1 and 64".to_string())
}
}
#[async_trait]
impl Tool for ExportCreate {
async fn do_work(&self) -> std::result::Result<(), BoxedError> {
@@ -474,6 +494,7 @@ impl ExportCreate {
&self.config.storage_config,
&mut manifest,
self.config.parallelism,
self.config.chunk_parallelism,
)
.await?;
return Ok(());
@@ -531,6 +552,7 @@ impl ExportCreate {
&self.config.storage_config,
&mut manifest,
self.config.parallelism,
self.config.chunk_parallelism,
)
.await?;
}
@@ -1557,6 +1579,8 @@ mod tests {
"csv",
"--parallelism",
"2",
"--chunk-parallelism",
"2",
]);
let error = cmd.build().await.err().unwrap().to_string();
@@ -1567,6 +1591,63 @@ mod tests {
assert!(error.contains("--chunk-time-window"));
assert!(error.contains("--format"));
assert!(error.contains("--parallelism"));
assert!(error.contains("--chunk-parallelism"));
}
#[test]
fn test_chunk_parallelism_defaults_to_one() {
let cmd = ExportCreateCommand::parse_from([
"export-v2-create",
"--addr",
"127.0.0.1:4000",
"--to",
"file:///tmp/export-v2-test",
]);
assert_eq!(1, cmd.chunk_parallelism);
}
#[test]
fn test_chunk_parallelism_parses_valid_value() {
let cmd = ExportCreateCommand::parse_from([
"export-v2-create",
"--addr",
"127.0.0.1:4000",
"--to",
"file:///tmp/export-v2-test",
"--chunk-parallelism",
"64",
]);
assert_eq!(64, cmd.chunk_parallelism);
}
#[test]
fn test_chunk_parallelism_rejects_out_of_range_values() {
assert!(
ExportCreateCommand::try_parse_from([
"export-v2-create",
"--addr",
"127.0.0.1:4000",
"--to",
"file:///tmp/export-v2-test",
"--chunk-parallelism",
"0",
])
.is_err()
);
assert!(
ExportCreateCommand::try_parse_from([
"export-v2-create",
"--addr",
"127.0.0.1:4000",
"--to",
"file:///tmp/export-v2-test",
"--chunk-parallelism",
"65",
])
.is_err()
);
}
#[test]
@@ -1602,6 +1683,7 @@ mod tests {
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
chunk_parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
@@ -1637,6 +1719,7 @@ mod tests {
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
chunk_parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
@@ -1667,6 +1750,7 @@ mod tests {
time_range,
chunk_time_window: Some(Duration::from_secs(3600)),
parallelism: 1,
chunk_parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
@@ -1698,6 +1782,7 @@ mod tests {
time_range: TimeRange::unbounded(),
chunk_time_window: None,
parallelism: 1,
chunk_parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};
@@ -1731,6 +1816,7 @@ mod tests {
time_range: TimeRange::new(Some(start), Some(start)),
chunk_time_window: None,
parallelism: 1,
chunk_parallelism: 1,
snapshot_uri: "file:///tmp/snapshot".to_string(),
storage_config: ObjectStoreConfig::default(),
};

View File

@@ -13,22 +13,29 @@
// limitations under the License.
use common_telemetry::info;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::data::{CopyOptions, build_copy_target, execute_copy_database};
use crate::data::export_v2::error::Result;
use crate::data::export_v2::error::{Error, Result};
use crate::data::export_v2::manifest::{ChunkStatus, DataFormat, Manifest, TimeRange};
use crate::data::path::data_dir_for_schema_chunk;
use crate::data::snapshot_storage::{SnapshotStorage, StorageScheme};
use crate::database::DatabaseClient;
/// Owned, manifest-independent context shared by all chunk export futures.
///
/// `catalog`/`schemas`/`format` are cloned from the manifest up front so the
/// export futures never borrow the manifest, leaving the coordinator free to
/// mutate and persist it while chunks are in flight.
struct ExportContext<'a> {
storage: &'a dyn SnapshotStorage,
database_client: &'a DatabaseClient,
snapshot_uri: &'a str,
storage_config: &'a ObjectStoreConfig,
catalog: &'a str,
schemas: &'a [String],
catalog: String,
schemas: Vec<String>,
format: DataFormat,
parallelism: usize,
}
@@ -40,11 +47,36 @@ pub async fn export_data(
storage_config: &ObjectStoreConfig,
manifest: &mut Manifest,
parallelism: usize,
chunk_parallelism: usize,
) -> Result<()> {
if manifest.chunks.is_empty() {
return Ok(());
}
let context = ExportContext {
storage,
database_client,
snapshot_uri,
storage_config,
catalog: manifest.catalog.clone(),
schemas: manifest.schemas.clone(),
format: manifest.format,
parallelism,
};
if chunk_parallelism <= 1 {
export_data_serial(&context, storage, manifest).await
} else {
export_data_concurrent(&context, storage, manifest, chunk_parallelism).await
}
}
/// Exports chunks one at a time, preserving the original serial behavior.
async fn export_data_serial(
context: &ExportContext<'_>,
storage: &dyn SnapshotStorage,
manifest: &mut Manifest,
) -> Result<()> {
for idx in 0..manifest.chunks.len() {
if matches!(
manifest.chunks[idx].status,
@@ -57,17 +89,7 @@ pub async fn export_data(
manifest.touch();
storage.write_manifest(manifest).await?;
let context = ExportContext {
storage,
database_client,
snapshot_uri,
storage_config,
catalog: &manifest.catalog,
schemas: &manifest.schemas,
format: manifest.format,
parallelism,
};
let export_result = export_chunk(&context, chunk_id, time_range).await;
let export_result = export_chunk(context, chunk_id, time_range).await;
let result = match export_result {
Ok(files) => {
@@ -89,6 +111,91 @@ pub async fn export_data(
Ok(())
}
/// Exports up to `chunk_parallelism` chunks concurrently on the client.
///
/// The coordinator owns all manifest mutation/persistence: it marks chunks
/// `InProgress` and persists the manifest before polling their futures, then
/// applies each chunk result and persists again on completion. The export
/// futures only run COPY DATABASE and collect files; they never touch the
/// manifest, so manifest writes stay serialized in this task.
///
/// On the first chunk failure we stop scheduling new chunks but let already
/// in-flight chunks finish and persist their final status, then return the
/// first error.
async fn export_data_concurrent(
context: &ExportContext<'_>,
storage: &dyn SnapshotStorage,
manifest: &mut Manifest,
chunk_parallelism: usize,
) -> Result<()> {
let mut pending = FuturesUnordered::new();
let mut next_idx = 0;
let mut first_error: Option<Error> = None;
loop {
let mut scheduled = false;
// Schedule eligible chunks in order up to the parallelism limit. Once a
// failure is seen, stop scheduling but keep draining in-flight chunks.
while first_error.is_none() && pending.len() < chunk_parallelism {
let Some(idx) = next_eligible_chunk(manifest, &mut next_idx) else {
break;
};
let (chunk_id, time_range) = mark_chunk_in_progress(manifest, idx);
scheduled = true;
pending.push(async move {
let result = export_chunk(context, chunk_id, time_range).await;
(idx, result)
});
}
if scheduled {
manifest.touch();
storage.write_manifest(manifest).await?;
}
let Some((idx, export_result)) = pending.next().await else {
break;
};
match export_result {
Ok(files) => mark_chunk_completed(manifest, idx, files),
Err(err) => {
mark_chunk_failed(manifest, idx, err.to_string());
if first_error.is_none() {
first_error = Some(err);
}
}
}
manifest.touch();
storage.write_manifest(manifest).await?;
}
match first_error {
Some(err) => Err(err),
None => Ok(()),
}
}
/// Returns the index of the next chunk eligible for export, scanning forward
/// from `next_idx` and skipping already Completed/Skipped chunks. Advances
/// `next_idx` past the returned chunk so each chunk is scheduled at most once.
fn next_eligible_chunk(manifest: &Manifest, next_idx: &mut usize) -> Option<usize> {
while *next_idx < manifest.chunks.len() {
let idx = *next_idx;
*next_idx += 1;
if !matches!(
manifest.chunks[idx].status,
ChunkStatus::Completed | ChunkStatus::Skipped
) {
return Some(idx);
}
}
None
}
fn mark_chunk_in_progress(manifest: &mut Manifest, idx: usize) -> (u32, TimeRange) {
let chunk = &mut manifest.chunks[idx];
chunk.mark_in_progress();
@@ -122,7 +229,7 @@ async fn export_chunk(
parallelism: context.parallelism,
};
for schema in context.schemas {
for schema in &context.schemas {
let prefix = data_dir_for_schema_chunk(schema, chunk_id);
if needs_dir {
context.storage.create_dir_all(&prefix).await?;
@@ -136,7 +243,7 @@ async fn export_chunk(
)?;
execute_copy_database(
context.database_client,
context.catalog,
&context.catalog,
schema,
&target,
&copy_options,
@@ -144,7 +251,7 @@ async fn export_chunk(
.await?;
}
let files = list_chunk_files(context.storage, context.schemas, chunk_id).await?;
let files = list_chunk_files(context.storage, &context.schemas, chunk_id).await?;
info!("Collected {} files for chunk {}", files.len(), chunk_id);
Ok(files)
}
@@ -164,3 +271,58 @@ async fn list_chunk_files(
files.sort();
Ok(files)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::export_v2::manifest::ChunkMeta;
fn pending_manifest(n: u32) -> Manifest {
let mut manifest = Manifest::new_full(
"greptime".to_string(),
vec!["public".to_string()],
TimeRange::unbounded(),
DataFormat::Parquet,
);
manifest.chunks = (1..=n)
.map(|id| ChunkMeta::new(id, TimeRange::unbounded()))
.collect();
manifest
}
#[test]
fn test_next_eligible_chunk_scans_in_order() {
let manifest = pending_manifest(3);
let mut next_idx = 0;
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(0));
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(1));
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(2));
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), None);
}
#[test]
fn test_next_eligible_chunk_skips_completed_and_skipped() {
let mut manifest = pending_manifest(4);
manifest.chunks[0].mark_completed(vec!["data/public/1/f.parquet".to_string()], None);
manifest.chunks[2].mark_skipped();
let mut next_idx = 0;
// Chunk 0 (completed) and chunk 2 (skipped) are skipped; failed/pending eligible.
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(1));
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(3));
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), None);
}
#[test]
fn test_next_eligible_chunk_treats_failed_and_in_progress_as_eligible() {
let mut manifest = pending_manifest(2);
manifest.chunks[0].mark_failed("boom".to_string());
manifest.chunks[1].mark_in_progress();
let mut next_idx = 0;
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(0));
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), Some(1));
assert_eq!(next_eligible_chunk(&manifest, &mut next_idx), None);
}
}