feat: support parallel table operations in COPY DATABASE (#7213)

* feat: support parallel table operations in COPY DATABASE

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat(cli): add a new `parallelism` parameter to control the parallelism during export

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add sqlness tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor(cli): improve parallelism configuration for data export and import

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Author: Weny Xu
Date: 2025-11-17 20:22:51 +08:00
Committed by: GitHub
Parent: cc61af7c65
Commit: 6adc348fcd
7 changed files with 360 additions and 59 deletions

Cargo.lock (generated)

@@ -8659,6 +8659,7 @@ dependencies = [
"common-recordbatch",
"common-runtime",
"common-sql",
"common-stat",
"common-telemetry",
"common-test-util",
"common-time",


@@ -67,9 +67,17 @@ pub struct ExportCommand {
#[clap(long, default_value_t = default_database())]
database: String,
/// Parallelism of the export.
#[clap(long, short = 'j', default_value = "1")]
export_jobs: usize,
/// The number of databases exported in parallel.
/// For example, if there are 20 databases and `db_parallelism` is 4,
/// 4 databases will be exported concurrently.
#[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
db_parallelism: usize,
/// The number of tables exported in parallel within a single database.
/// For example, if a database has 30 tables and `parallelism` is 8,
/// 8 tables will be exported concurrently.
#[clap(long, default_value = "4")]
table_parallelism: usize,
/// Max retry times for each job.
#[clap(long, default_value = "3")]
@@ -210,10 +218,11 @@ impl ExportCommand {
schema,
database_client,
output_dir: self.output_dir.clone(),
parallelism: self.export_jobs,
export_jobs: self.db_parallelism,
target: self.target.clone(),
start_time: self.start_time.clone(),
end_time: self.end_time.clone(),
parallelism: self.table_parallelism,
s3: self.s3,
ddl_local_dir: self.ddl_local_dir.clone(),
s3_bucket: self.s3_bucket.clone(),
@@ -251,10 +260,11 @@ pub struct Export {
schema: Option<String>,
database_client: DatabaseClient,
output_dir: Option<String>,
parallelism: usize,
export_jobs: usize,
target: ExportTarget,
start_time: Option<String>,
end_time: Option<String>,
parallelism: usize,
s3: bool,
ddl_local_dir: Option<String>,
s3_bucket: Option<String>,
@@ -464,7 +474,7 @@ impl Export {
async fn export_create_table(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let semaphore = Arc::new(Semaphore::new(self.export_jobs));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let operator = Arc::new(self.build_prefer_fs_operator().await?);
@@ -625,13 +635,13 @@ impl Export {
async fn export_database_data(&self) -> Result<()> {
let timer = Instant::now();
let semaphore = Arc::new(Semaphore::new(self.parallelism));
let semaphore = Arc::new(Semaphore::new(self.export_jobs));
let db_names = self.get_db_names().await?;
let db_count = db_names.len();
let mut tasks = Vec::with_capacity(db_count);
let operator = Arc::new(self.build_operator().await?);
let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
let with_options = build_with_options(&self.start_time, &self.end_time);
let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);
for schema in db_names {
let semaphore_moved = semaphore.clone();
@@ -888,7 +898,11 @@ impl Tool for Export {
}
/// Builds the WITH options string for SQL commands, assuming consistent syntax across S3 and local exports.
fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) -> String {
fn build_with_options(
start_time: &Option<String>,
end_time: &Option<String>,
parallelism: usize,
) -> String {
let mut options = vec!["format = 'parquet'".to_string()];
if let Some(start) = start_time {
options.push(format!("start_time = '{}'", start));
@@ -896,5 +910,6 @@ fn build_with_options(start_time: &Option<String>, end_time: &Option<String>) ->
if let Some(end) = end_time {
options.push(format!("end_time = '{}'", end));
}
options.push(format!("parallelism = {}", parallelism));
options.join(", ")
}
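
For reference, a minimal standalone sketch of what the updated `build_with_options` helper now emits: the table-level `parallelism` value from `--table-parallelism` is appended to the WITH clause of every generated `COPY DATABASE ... TO` statement. The function body mirrors the diff above; the `main` and the expected string are illustrative only.

```rust
// Standalone sketch mirroring the updated helper; not the exact crate code.
fn build_with_options(
    start_time: &Option<String>,
    end_time: &Option<String>,
    parallelism: usize,
) -> String {
    let mut options = vec!["format = 'parquet'".to_string()];
    if let Some(start) = start_time {
        options.push(format!("start_time = '{}'", start));
    }
    if let Some(end) = end_time {
        options.push(format!("end_time = '{}'", end));
    }
    options.push(format!("parallelism = {}", parallelism));
    options.join(", ")
}

fn main() {
    // With --table-parallelism 8 and no time range, each COPY DATABASE
    // statement gets this WITH clause:
    assert_eq!(
        build_with_options(&None, &None, 8),
        "format = 'parquet', parallelism = 8"
    );
}
```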


@@ -56,9 +56,11 @@ pub struct ImportCommand {
#[clap(long, default_value_t = default_database())]
database: String,
/// Parallelism of the import.
#[clap(long, short = 'j', default_value = "1")]
import_jobs: usize,
/// The number of databases imported in parallel.
/// For example, if there are 20 databases and `db_parallelism` is 4,
/// 4 databases will be imported concurrently.
#[clap(long, short = 'j', default_value = "1", alias = "import-jobs")]
db_parallelism: usize,
/// Max retry times for each job.
#[clap(long, default_value = "3")]
@@ -109,7 +111,7 @@ impl ImportCommand {
schema,
database_client,
input_dir: self.input_dir.clone(),
parallelism: self.import_jobs,
parallelism: self.db_parallelism,
target: self.target.clone(),
}))
}
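
Because the flags were renamed (`export_jobs` and `import_jobs` both become `db_parallelism`), the `alias` attribute keeps the old spellings usable. A minimal clap sketch, using a hypothetical standalone `ImportArgs` struct rather than the real CLI type, showing both spellings resolving to the same field:

```rust
use clap::Parser;

/// Hypothetical standalone parser, not the actual ImportCommand struct.
#[derive(Parser)]
struct ImportArgs {
    /// The number of databases imported in parallel.
    #[clap(long, short = 'j', default_value = "1", alias = "import-jobs")]
    db_parallelism: usize,
}

fn main() {
    // The renamed flag and the legacy alias parse to the same field.
    let renamed = ImportArgs::parse_from(["import", "--db-parallelism", "4"]);
    let legacy = ImportArgs::parse_from(["import", "--import-jobs", "4"]);
    assert_eq!(renamed.db_parallelism, 4);
    assert_eq!(legacy.db_parallelism, 4);
}
```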


@@ -36,6 +36,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-sql.workspace = true
common-stat.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true


@@ -12,14 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use client::{Output, OutputData, OutputMeta};
use common_catalog::format_full_table_name;
use common_datasource::file_format::Format;
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_stat::get_total_cpu_cores;
use common_telemetry::{debug, error, info, tracing};
use futures::future::try_join_all;
use object_store::Entry;
use regex::Regex;
use session::context::QueryContextRef;
@@ -27,6 +32,7 @@ use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME};
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
use table::table_reference::TableReference;
use tokio::sync::Semaphore;
use crate::error;
use crate::error::{CatalogSnafu, InvalidCopyDatabasePathSnafu};
@@ -35,6 +41,16 @@ use crate::statement::StatementExecutor;
pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
pub(crate) const CONTINUE_ON_ERROR_KEY: &str = "continue_on_error";
pub(crate) const PARALLELISM_KEY: &str = "parallelism";
/// Get parallelism from options, default to total CPU cores.
fn parse_parallelism_from_option_map(options: &HashMap<String, String>) -> usize {
options
.get(PARALLELISM_KEY)
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or_else(get_total_cpu_cores)
.max(1)
}
impl StatementExecutor {
#[tracing::instrument(skip_all)]
@@ -51,22 +67,26 @@ impl StatementExecutor {
}
);
let parallelism = parse_parallelism_from_option_map(&req.with);
info!(
"Copy database {}.{} to dir: {}, time: {:?}",
req.catalog_name, req.schema_name, req.location, req.time_range
"Copy database {}.{} to dir: {}, time: {:?}, parallelism: {}",
req.catalog_name, req.schema_name, req.location, req.time_range, parallelism
);
let table_names = self
.catalog_manager
.table_names(&req.catalog_name, &req.schema_name, Some(&ctx))
.await
.context(CatalogSnafu)?;
let num_tables = table_names.len();
let suffix = Format::try_from(&req.with)
.context(error::ParseFileFormatSnafu)?
.suffix();
let mut exported_rows = 0;
for table_name in table_names {
let mut tasks = Vec::with_capacity(num_tables);
let semaphore = Arc::new(Semaphore::new(parallelism));
for (i, table_name) in table_names.into_iter().enumerate() {
let table = self
.get_table(&TableReference {
catalog: &req.catalog_name,
@@ -89,33 +109,40 @@ impl StatementExecutor {
{
continue;
}
let semaphore_moved = semaphore.clone();
let mut table_file = req.location.clone();
table_file.push_str(&table_name);
table_file.push_str(suffix);
info!(
"Copy table: {}.{}.{} to {}",
req.catalog_name, req.schema_name, table_name, table_file
);
let table_no = i + 1;
let moved_ctx = ctx.clone();
let full_table_name =
format_full_table_name(&req.catalog_name, &req.schema_name, &table_name);
let copy_table_req = CopyTableRequest {
catalog_name: req.catalog_name.clone(),
schema_name: req.schema_name.clone(),
table_name,
location: table_file.clone(),
with: req.with.clone(),
connection: req.connection.clone(),
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
limit: None,
};
let exported = self
.copy_table_to(
CopyTableRequest {
catalog_name: req.catalog_name.clone(),
schema_name: req.schema_name.clone(),
table_name,
location: table_file,
with: req.with.clone(),
connection: req.connection.clone(),
pattern: None,
direction: CopyDirection::Export,
timestamp_range: req.time_range,
limit: None,
},
ctx.clone(),
)
.await?;
exported_rows += exported;
tasks.push(async move {
let _permit = semaphore_moved.acquire().await.unwrap();
info!(
"Copy table({}/{}): {} to {}",
table_no, num_tables, full_table_name, table_file
);
self.copy_table_to(copy_table_req, moved_ctx).await
});
}
let results = try_join_all(tasks).await?;
let exported_rows = results.into_iter().sum();
Ok(Output::new_with_affected_rows(exported_rows))
}
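
The export path now builds one future per table, bounds concurrency with a `Semaphore` sized by the `parallelism` option, and sums the per-table row counts through `try_join_all`. A self-contained sketch of that pattern follows; the table list and the `copy_one_table` stub are placeholders, not the real `copy_table_to` call:

```rust
use std::sync::Arc;

use futures::future::try_join_all;
use tokio::sync::Semaphore;

// Placeholder for the per-table export; the real code calls `copy_table_to`.
async fn copy_one_table(_table: &str) -> Result<usize, String> {
    // Pretend each table exports 3 rows.
    Ok(3)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let tables = vec!["cpu_metrics", "memory_stats", "event_logs", "sensors"];
    let parallelism = 2;
    let semaphore = Arc::new(Semaphore::new(parallelism));

    let mut tasks = Vec::with_capacity(tables.len());
    for table in tables {
        let semaphore = semaphore.clone();
        tasks.push(async move {
            // At most `parallelism` table copies run at once.
            let _permit = semaphore.acquire().await.unwrap();
            copy_one_table(table).await
        });
    }

    // Fail fast on the first error, otherwise sum the exported row counts.
    let exported_rows: usize = try_join_all(tasks).await?.into_iter().sum();
    assert_eq!(exported_rows, 12);
    Ok(())
}
```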
@@ -134,9 +161,10 @@ impl StatementExecutor {
}
);
let parallelism = parse_parallelism_from_option_map(&req.with);
info!(
"Copy database {}.{} from dir: {}, time: {:?}",
req.catalog_name, req.schema_name, req.location, req.time_range
"Copy database {}.{} from dir: {}, time: {:?}, parallelism: {}",
req.catalog_name, req.schema_name, req.location, req.time_range, parallelism
);
let suffix = Format::try_from(&req.with)
.context(error::ParseFileFormatSnafu)?
@@ -150,8 +178,8 @@ impl StatementExecutor {
.and_then(|v| bool::from_str(v).ok())
.unwrap_or(false);
let mut rows_inserted = 0;
let mut insert_cost = 0;
let mut tasks = Vec::with_capacity(entries.len());
let semaphore = Arc::new(Semaphore::new(parallelism));
for e in entries {
let table_name = match parse_file_name_to_copy(&e) {
@@ -165,6 +193,7 @@ impl StatementExecutor {
}
}
};
let req = CopyTableRequest {
catalog_name: req.catalog_name.clone(),
schema_name: req.schema_name.clone(),
@@ -177,23 +206,36 @@ impl StatementExecutor {
timestamp_range: None,
limit: None,
};
debug!("Copy table, arg: {:?}", req);
match self.copy_table_from(req, ctx.clone()).await {
Ok(o) => {
let (rows, cost) = o.extract_rows_and_cost();
rows_inserted += rows;
insert_cost += cost;
}
Err(err) => {
if continue_on_error {
error!(err; "Failed to import file to table: {}", table_name);
continue;
} else {
return Err(err);
let moved_ctx = ctx.clone();
let moved_table_name = table_name.clone();
let moved_semaphore = semaphore.clone();
tasks.push(async move {
let _permit = moved_semaphore.acquire().await.unwrap();
debug!("Copy table, arg: {:?}", req);
match self.copy_table_from(req, moved_ctx).await {
Ok(o) => {
let (rows, cost) = o.extract_rows_and_cost();
Ok((rows, cost))
}
Err(err) => {
if continue_on_error {
error!(err; "Failed to import file to table: {}", moved_table_name);
Ok((0, 0))
} else {
Err(err)
}
}
}
}
});
}
let results = try_join_all(tasks).await?;
let (rows_inserted, insert_cost) = results
.into_iter()
.fold((0, 0), |(acc_rows, acc_cost), (rows, cost)| {
(acc_rows + rows, acc_cost + cost)
});
Ok(Output::new(
OutputData::AffectedRows(rows_inserted),
OutputMeta::new_with_cost(insert_cost),
@@ -229,15 +271,18 @@ async fn list_files_to_copy(req: &CopyDatabaseRequest, suffix: &str) -> error::R
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use common_stat::get_total_cpu_cores;
use object_store::ObjectStore;
use object_store::services::Fs;
use object_store::util::normalize_dir;
use path_slash::PathExt;
use table::requests::CopyDatabaseRequest;
use crate::statement::copy_database::{list_files_to_copy, parse_file_name_to_copy};
use crate::statement::copy_database::{
list_files_to_copy, parse_file_name_to_copy, parse_parallelism_from_option_map,
};
#[tokio::test]
async fn test_list_files_and_parse_table_name() {
@@ -276,4 +321,16 @@ mod tests {
listed
);
}
#[test]
fn test_parse_parallelism_from_option_map() {
let options = HashMap::new();
assert_eq!(
parse_parallelism_from_option_map(&options),
get_total_cpu_cores()
);
let options = HashMap::from([("parallelism".to_string(), "0".to_string())]);
assert_eq!(parse_parallelism_from_option_map(&options), 1);
}
}
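
The import path (`copy_database_from` above) uses the same fan-out, but each task yields a `(rows, cost)` pair and, when `continue_on_error` is set, swallows a failed file as `(0, 0)` instead of aborting the whole `COPY DATABASE ... FROM`. A reduced sketch of that error-handling shape, with the file listing and `copy_table_from` stubbed out:

```rust
use std::sync::Arc;

use futures::future::try_join_all;
use tokio::sync::Semaphore;

// Stub for the per-file import; the real code calls `copy_table_from`.
async fn import_one_file(file: &str) -> Result<(usize, usize), String> {
    if file.ends_with("broken.parquet") {
        Err(format!("failed to import {file}"))
    } else {
        Ok((3, 1)) // (rows inserted, cost)
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let files = vec!["cpu_metrics.parquet", "broken.parquet", "sensors.parquet"];
    let continue_on_error = true;
    let semaphore = Arc::new(Semaphore::new(2));

    let mut tasks = Vec::with_capacity(files.len());
    for file in files {
        let semaphore = semaphore.clone();
        tasks.push(async move {
            let _permit = semaphore.acquire().await.unwrap();
            match import_one_file(file).await {
                Ok(rows_and_cost) => Ok(rows_and_cost),
                // With continue_on_error, a failed file contributes nothing
                // instead of failing the whole statement.
                Err(err) if continue_on_error => {
                    eprintln!("skipping: {err}");
                    Ok((0, 0))
                }
                Err(err) => Err(err),
            }
        });
    }

    let (rows_inserted, insert_cost) = try_join_all(tasks)
        .await?
        .into_iter()
        .fold((0, 0), |(acc_rows, acc_cost), (rows, cost)| {
            (acc_rows + rows, acc_cost + cost)
        });
    assert_eq!((rows_inserted, insert_cost), (6, 2));
    Ok(())
}
```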


@@ -64,3 +64,149 @@ DROP TABLE demo;
Affected Rows: 0
CREATE TABLE cpu_metrics (
host STRING,
`usage` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
INSERT INTO cpu_metrics
VALUES
('host1', 66.6, 1655276557000),
('host2', 77.7, 1655276558000),
('host3', 88.8, 1655276559000);
Affected Rows: 3
CREATE TABLE memory_stats (
host STRING,
used DOUBLE,
`free` DOUBLE,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
INSERT INTO memory_stats
VALUES
('host1', 1024, 512, 1655276557000),
('host2', 2048, 1024, 1655276558000),
('host3', 4096, 2048, 1655276559000);
Affected Rows: 3
CREATE TABLE event_logs (
`id` INT,
`message` STRING,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
INSERT INTO event_logs
VALUES
(1, 'start', 1655276557000),
(2, 'processing', 1655276558000),
(3, 'finish', 1655276559000);
Affected Rows: 3
CREATE TABLE sensors (
sensor_id STRING,
temperature DOUBLE,
pressure INT,
ts TIMESTAMP TIME INDEX
);
Affected Rows: 0
INSERT INTO sensors
VALUES
('s1', 36.5, 1001, 1655276557000),
('s2', 37.2, 1003, 1655276558000),
('s3', 35.9, 998, 1655276559000);
Affected Rows: 3
COPY DATABASE public TO '${SQLNESS_HOME}/export_parallel/' WITH (format='parquet', parallelism=2);
Affected Rows: 12
DELETE FROM cpu_metrics;
Affected Rows: 3
DELETE FROM memory_stats;
Affected Rows: 3
DELETE FROM event_logs;
Affected Rows: 3
DELETE FROM sensors;
Affected Rows: 3
COPY DATABASE public FROM '${SQLNESS_HOME}/export_parallel/' WITH (parallelism=2);
Affected Rows: 12
SELECT * FROM cpu_metrics;
+-------+-------+---------------------+
| host | usage | ts |
+-------+-------+---------------------+
| host1 | 66.6 | 2022-06-15T07:02:37 |
| host2 | 77.7 | 2022-06-15T07:02:38 |
| host3 | 88.8 | 2022-06-15T07:02:39 |
+-------+-------+---------------------+
SELECT * FROM memory_stats;
+-------+--------+--------+---------------------+
| host | used | free | ts |
+-------+--------+--------+---------------------+
| host1 | 1024.0 | 512.0 | 2022-06-15T07:02:37 |
| host2 | 2048.0 | 1024.0 | 2022-06-15T07:02:38 |
| host3 | 4096.0 | 2048.0 | 2022-06-15T07:02:39 |
+-------+--------+--------+---------------------+
SELECT * FROM event_logs;
+----+------------+---------------------+
| id | message | ts |
+----+------------+---------------------+
| 1 | start | 2022-06-15T07:02:37 |
| 2 | processing | 2022-06-15T07:02:38 |
| 3 | finish | 2022-06-15T07:02:39 |
+----+------------+---------------------+
SELECT * FROM sensors;
+-----------+-------------+----------+---------------------+
| sensor_id | temperature | pressure | ts |
+-----------+-------------+----------+---------------------+
| s1 | 36.5 | 1001 | 2022-06-15T07:02:37 |
| s2 | 37.2 | 1003 | 2022-06-15T07:02:38 |
| s3 | 35.9 | 998 | 2022-06-15T07:02:39 |
+-----------+-------------+----------+---------------------+
DROP TABLE cpu_metrics;
Affected Rows: 0
DROP TABLE memory_stats;
Affected Rows: 0
DROP TABLE event_logs;
Affected Rows: 0
DROP TABLE sensors;
Affected Rows: 0


@@ -25,3 +25,82 @@ DELETE FROM demo;
COPY DATABASE public FROM '${SQLNESS_HOME}/demo/export/parquet_range/' LIMIT 2;
DROP TABLE demo;
CREATE TABLE cpu_metrics (
host STRING,
`usage` DOUBLE,
ts TIMESTAMP TIME INDEX
);
INSERT INTO cpu_metrics
VALUES
('host1', 66.6, 1655276557000),
('host2', 77.7, 1655276558000),
('host3', 88.8, 1655276559000);
CREATE TABLE memory_stats (
host STRING,
used DOUBLE,
`free` DOUBLE,
ts TIMESTAMP TIME INDEX
);
INSERT INTO memory_stats
VALUES
('host1', 1024, 512, 1655276557000),
('host2', 2048, 1024, 1655276558000),
('host3', 4096, 2048, 1655276559000);
CREATE TABLE event_logs (
`id` INT,
`message` STRING,
ts TIMESTAMP TIME INDEX
);
INSERT INTO event_logs
VALUES
(1, 'start', 1655276557000),
(2, 'processing', 1655276558000),
(3, 'finish', 1655276559000);
CREATE TABLE sensors (
sensor_id STRING,
temperature DOUBLE,
pressure INT,
ts TIMESTAMP TIME INDEX
);
INSERT INTO sensors
VALUES
('s1', 36.5, 1001, 1655276557000),
('s2', 37.2, 1003, 1655276558000),
('s3', 35.9, 998, 1655276559000);
COPY DATABASE public TO '${SQLNESS_HOME}/export_parallel/' WITH (format='parquet', parallelism=2);
DELETE FROM cpu_metrics;
DELETE FROM memory_stats;
DELETE FROM event_logs;
DELETE FROM sensors;
COPY DATABASE public FROM '${SQLNESS_HOME}/export_parallel/' WITH (parallelism=2);
SELECT * FROM cpu_metrics;
SELECT * FROM memory_stats;
SELECT * FROM event_logs;
SELECT * FROM sensors;
DROP TABLE cpu_metrics;
DROP TABLE memory_stats;
DROP TABLE event_logs;
DROP TABLE sensors;