1use std::collections::HashSet;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::{Parser, ValueEnum};
21use common_error::ext::BoxedError;
22use common_telemetry::{debug, error, info};
23use object_store::ObjectStore;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26use tokio::sync::Semaphore;
27use tokio::time::Instant;
28
29use crate::common::{ObjectStoreConfig, new_fs_object_store};
30use crate::data::storage_export::{
31 AzblobBackend, FsBackend, GcsBackend, OssBackend, S3Backend, StorageExport, StorageType,
32};
33use crate::data::{COPY_PATH_PLACEHOLDER, default_database};
34use crate::database::{DatabaseClient, parse_proxy_opts};
35use crate::error::{
36 EmptyResultSnafu, Error, OpenDalSnafu, OutputDirNotSetSnafu, Result, SchemaNotFoundSnafu,
37};
38use crate::{Tool, database};
39
/// Fully-qualified table identifier: `(catalog, schema, table)`.
type TableReference = (String, String, String);
41
// What the export produces; selected via `--target` (clap value enum).
// Plain `//` comments are used on purpose: doc comments would become
// clap help text and change `--help` output.
#[derive(Debug, Default, Clone, ValueEnum)]
enum ExportTarget {
    // Only the CREATE DATABASE / CREATE TABLE / CREATE VIEW DDL files.
    Schema,
    // Only the table data (COPY DATABASE ... TO).
    Data,
    // Both schemas and data; the default.
    #[default]
    All,
}
52
// CLI arguments for the export tool. `//` comments only: clap turns doc
// comments into help text, which would change `--help` output.
#[derive(Debug, Default, Parser)]
pub struct ExportCommand {
    // Address of the server to export from.
    #[clap(long)]
    addr: String,

    // Local output directory. Required unless one of the remote storage
    // backends (--s3/--oss/--gcs/--azblob) is enabled; see `build()`.
    #[clap(long)]
    output_dir: Option<String>,

    // Database selector; split by `database::split_database` into a catalog
    // and an optional schema.
    #[clap(long, default_value_t = default_database())]
    database: String,

    // Number of databases exported concurrently (semaphore permits).
    #[clap(long, short = 'j', default_value = "1", alias = "export-jobs")]
    db_parallelism: usize,

    // Forwarded as the `parallelism` option of COPY DATABASE.
    #[clap(long, default_value = "4")]
    table_parallelism: usize,

    // NOTE(review): not referenced anywhere in this file — presumably
    // consumed elsewhere; confirm before documenting semantics.
    #[clap(long, default_value = "3")]
    max_retry: usize,

    // Which artifacts to export: schema, data, or all.
    #[clap(long, short = 't', value_enum, default_value = "all")]
    target: ExportTarget,

    // Optional `start_time` bound forwarded to COPY DATABASE.
    #[clap(long)]
    start_time: Option<String>,

    // Optional `end_time` bound forwarded to COPY DATABASE.
    #[clap(long)]
    end_time: Option<String>,

    // Credentials forwarded to the database client (basic auth).
    #[clap(long)]
    auth_basic: Option<String>,

    // Request timeout, parsed by humantime (e.g. "30s"). Defaults to
    // Duration::default() (zero) when absent; see `build()`.
    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    // Proxy URL passed to `parse_proxy_opts`.
    #[clap(long)]
    proxy: Option<String>,

    // Disable proxying; also passed to the database client.
    #[clap(long)]
    no_proxy: bool,

    // When set and the destination is remote storage, DDL/SQL files are
    // written to this local directory instead (see build_prefer_fs_operator).
    #[clap(long)]
    ddl_local_dir: Option<String>,

    // Shared storage backend flags (--s3, --oss, --gcs, --azblob, ...).
    #[clap(flatten)]
    storage: ObjectStoreConfig,
}
136
137impl ExportCommand {
138 pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
139 let (storage_type, operator) = if self.storage.enable_s3 {
141 (
142 StorageType::S3(S3Backend::new(self.storage.s3.clone())?),
143 self.storage.build_s3()?,
144 )
145 } else if self.storage.enable_oss {
146 (
147 StorageType::Oss(OssBackend::new(self.storage.oss.clone())?),
148 self.storage.build_oss()?,
149 )
150 } else if self.storage.enable_gcs {
151 (
152 StorageType::Gcs(GcsBackend::new(self.storage.gcs.clone())?),
153 self.storage.build_gcs()?,
154 )
155 } else if self.storage.enable_azblob {
156 (
157 StorageType::Azblob(AzblobBackend::new(self.storage.azblob.clone())?),
158 self.storage.build_azblob()?,
159 )
160 } else if let Some(output_dir) = &self.output_dir {
161 (
162 StorageType::Fs(FsBackend::new(output_dir.clone())),
163 new_fs_object_store(output_dir)?,
164 )
165 } else {
166 return Err(BoxedError::new(OutputDirNotSetSnafu {}.build()));
167 };
168
169 let (catalog, schema) =
170 database::split_database(&self.database).map_err(BoxedError::new)?;
171 let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
172 let database_client = DatabaseClient::new(
173 self.addr.clone(),
174 catalog.clone(),
175 self.auth_basic.clone(),
176 self.timeout.unwrap_or_default(),
178 proxy,
179 self.no_proxy,
180 );
181
182 Ok(Box::new(Export {
183 catalog,
184 schema,
185 database_client,
186 export_jobs: self.db_parallelism,
187 target: self.target.clone(),
188 start_time: self.start_time.clone(),
189 end_time: self.end_time.clone(),
190 parallelism: self.table_parallelism,
191 storage_type,
192 ddl_local_dir: self.ddl_local_dir.clone(),
193 operator,
194 }))
195 }
196}
197
/// Cloneable export worker; one clone is moved into each per-database task.
#[derive(Clone)]
pub struct Export {
    catalog: String,
    // Target schema; `None` exports every database in the catalog.
    schema: Option<String>,
    database_client: DatabaseClient,
    // Maximum number of databases exported concurrently (semaphore permits).
    export_jobs: usize,
    target: ExportTarget,
    // Optional time-range bounds forwarded to COPY DATABASE.
    start_time: Option<String>,
    end_time: Option<String>,
    // `parallelism` option forwarded to COPY DATABASE.
    parallelism: usize,
    storage_type: StorageType,
    // When set and the destination is remote, DDL files go to this local dir.
    ddl_local_dir: Option<String>,
    operator: ObjectStore,
}
212
213impl Export {
214 async fn get_db_names(&self) -> Result<Vec<String>> {
215 let db_names = self.all_db_names().await?;
216 let Some(schema) = &self.schema else {
217 return Ok(db_names);
218 };
219
220 db_names
222 .into_iter()
223 .find(|db_name| db_name.to_lowercase() == schema.to_lowercase())
224 .map(|name| vec![name])
225 .context(SchemaNotFoundSnafu {
226 catalog: &self.catalog,
227 schema,
228 })
229 }
230
231 async fn all_db_names(&self) -> Result<Vec<String>> {
233 let records = self
234 .database_client
235 .sql_in_public("SHOW DATABASES")
236 .await?
237 .context(EmptyResultSnafu)?;
238 let mut result = Vec::with_capacity(records.len());
239 for value in records {
240 let Value::String(schema) = &value[0] else {
241 unreachable!()
242 };
243 if schema == common_catalog::consts::INFORMATION_SCHEMA_NAME {
244 continue;
245 }
246 if schema == common_catalog::consts::PG_CATALOG_NAME {
247 continue;
248 }
249 result.push(schema.clone());
250 }
251 Ok(result)
252 }
253
    /// Partitions the tables of `catalog.schema` into three lists:
    /// metric-engine physical tables, remaining base tables, and views.
    ///
    /// Physical tables are detected by the presence of the internal `__tsid`
    /// column in `information_schema.columns`; any table carrying it is kept
    /// out of the other two lists.
    async fn get_table_list(
        &self,
        catalog: &str,
        schema: &str,
    ) -> Result<(
        Vec<TableReference>,
        Vec<TableReference>,
        Vec<TableReference>,
    )> {
        // Step 1: collect the set of physical tables (those with `__tsid`).
        let sql = format!(
            "SELECT table_catalog, table_schema, table_name \
            FROM information_schema.columns \
            WHERE column_name = '__tsid' \
            and table_catalog = \'{catalog}\' \
            and table_schema = \'{schema}\'"
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;
        let mut metric_physical_tables = HashSet::with_capacity(records.len());
        for value in records {
            let mut t = Vec::with_capacity(3);
            for v in &value {
                // All three selected columns are strings.
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            metric_physical_tables.insert((t[0].clone(), t[1].clone(), t[2].clone()));
        }

        // Step 2: list all base tables and views of the schema.
        let sql = format!(
            "SELECT table_catalog, table_schema, table_name, table_type \
            FROM information_schema.tables \
            WHERE (table_type = \'BASE TABLE\' OR table_type = \'VIEW\') \
            and table_catalog = \'{catalog}\' \
            and table_schema = \'{schema}\'",
        );
        let records = self
            .database_client
            .sql_in_public(&sql)
            .await?
            .context(EmptyResultSnafu)?;

        debug!("Fetched table/view list: {:?}", records);

        if records.is_empty() {
            return Ok((vec![], vec![], vec![]));
        }

        // Step 3: classify each entry that is not a physical table.
        let mut remaining_tables = Vec::with_capacity(records.len());
        let mut views = Vec::new();
        for value in records {
            let mut t = Vec::with_capacity(4);
            for v in &value {
                let Value::String(value) = v else {
                    unreachable!()
                };
                t.push(value);
            }
            let table = (t[0].clone(), t[1].clone(), t[2].clone());
            let table_type = t[3].as_str();
            // Physical tables already captured in step 1 are skipped here.
            if !metric_physical_tables.contains(&table) {
                if table_type == "VIEW" {
                    views.push(table);
                } else {
                    remaining_tables.push(table);
                }
            }
        }

        Ok((
            metric_physical_tables.into_iter().collect(),
            remaining_tables,
            views,
        ))
    }
337
338 async fn show_create(
339 &self,
340 show_type: &str,
341 catalog: &str,
342 schema: &str,
343 table: Option<&str>,
344 ) -> Result<String> {
345 let sql = match table {
346 Some(table) => format!(
347 r#"SHOW CREATE {} "{}"."{}"."{}""#,
348 show_type, catalog, schema, table
349 ),
350 None => format!(r#"SHOW CREATE {} "{}"."{}""#, show_type, catalog, schema),
351 };
352 let records = self
353 .database_client
354 .sql_in_public(&sql)
355 .await?
356 .context(EmptyResultSnafu)?;
357 let Value::String(create) = &records[0][1] else {
358 unreachable!()
359 };
360
361 Ok(format!("{};\n", create))
362 }
363
364 async fn export_create_database(&self) -> Result<()> {
365 let timer = Instant::now();
366 let db_names = self.get_db_names().await?;
367 let db_count = db_names.len();
368 let operator = self.build_prefer_fs_operator().await?;
369
370 for schema in db_names {
371 let create_database = self
372 .show_create("DATABASE", &self.catalog, &schema, None)
373 .await?;
374
375 let file_path = self.get_file_path(&schema, "create_database.sql");
376 self.write_to_storage(&operator, &file_path, create_database.into_bytes())
377 .await?;
378
379 info!(
380 "Exported {}.{} database creation SQL to {}",
381 self.catalog,
382 schema,
383 self.storage_type.format_output_path(&file_path)
384 );
385 }
386
387 let elapsed = timer.elapsed();
388 info!("Success {db_count} jobs, cost: {elapsed:?}");
389
390 Ok(())
391 }
392
    /// Writes one `create_tables.sql` per database, containing the DDL of all
    /// physical tables, remaining base tables, and views of that schema.
    /// Databases are processed concurrently, bounded by `export_jobs`.
    async fn export_create_table(&self) -> Result<()> {
        let timer = Instant::now();
        // Bounds how many databases are exported at once.
        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let operator = Arc::new(self.build_prefer_fs_operator().await?);
        let mut tasks = Vec::with_capacity(db_names.len());

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let operator = operator.clone();
            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();
                let (metric_physical_tables, remaining_tables, views) = export_self
                    .get_table_list(&export_self.catalog, &schema)
                    .await?;

                // Only local filesystem destinations need the directory
                // created up front.
                if !export_self.storage_type.is_remote_storage() {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let file_path = export_self.get_file_path(&schema, "create_tables.sql");
                let mut content = Vec::new();

                // Physical tables first, then the remaining base tables.
                for (c, s, t) in metric_physical_tables.iter().chain(&remaining_tables) {
                    let create_table = export_self.show_create("TABLE", c, s, Some(t)).await?;
                    content.extend_from_slice(create_table.as_bytes());
                }

                // Views are appended after all table DDL.
                for (c, s, v) in &views {
                    let create_view = export_self.show_create("VIEW", c, s, Some(v)).await?;
                    content.extend_from_slice(create_view.as_bytes());
                }

                export_self
                    .write_to_storage(&operator, &file_path, content)
                    .await?;

                info!(
                    "Finished exporting {}.{schema} with {} table schemas to path: {}",
                    export_self.catalog,
                    metric_physical_tables.len() + remaining_tables.len() + views.len(),
                    export_self.storage_type.format_output_path(&file_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, cost: {elapsed:?}");

        Ok(())
    }
454
455 async fn build_operator(&self) -> Result<ObjectStore> {
456 Ok(self.operator.clone())
457 }
458
459 async fn build_prefer_fs_operator(&self) -> Result<ObjectStore> {
461 if self.storage_type.is_remote_storage() && self.ddl_local_dir.is_some() {
462 let root = self.ddl_local_dir.as_ref().unwrap().clone();
463 let op = new_fs_object_store(&root).map_err(|e| Error::Other {
464 source: e,
465 location: snafu::location!(),
466 })?;
467 Ok(op)
468 } else {
469 Ok(self.operator.clone())
470 }
471 }
472
    /// Exports table data with `COPY DATABASE ... TO`, one statement per
    /// database, and writes a matching `copy_from.sql` re-import script.
    /// Databases are processed concurrently, bounded by `export_jobs`.
    async fn export_database_data(&self) -> Result<()> {
        let timer = Instant::now();
        let semaphore = Arc::new(Semaphore::new(self.export_jobs));
        let db_names = self.get_db_names().await?;
        let db_count = db_names.len();
        let mut tasks = Vec::with_capacity(db_count);
        // Data goes to the real destination; copy_from.sql prefers local fs.
        let operator = Arc::new(self.build_operator().await?);
        let fs_first_operator = Arc::new(self.build_prefer_fs_operator().await?);
        let with_options = build_with_options(&self.start_time, &self.end_time, self.parallelism);

        for schema in db_names {
            let semaphore_moved = semaphore.clone();
            let export_self = self.clone();
            let with_options_clone = with_options.clone();
            let operator = operator.clone();
            let fs_first_operator = fs_first_operator.clone();

            tasks.push(async move {
                let _permit = semaphore_moved.acquire().await.unwrap();

                // Only local filesystem destinations need the directory
                // created up front.
                if !export_self.storage_type.is_remote_storage() {
                    let db_dir = format!("{}/{}/", export_self.catalog, schema);
                    operator.create_dir(&db_dir).await.context(OpenDalSnafu)?;
                }

                let (path, connection_part) = export_self
                    .storage_type
                    .get_storage_path(&export_self.catalog, &schema);

                let sql = format!(
                    r#"COPY DATABASE "{}"."{}" TO '{}' WITH ({}){};"#,
                    export_self.catalog, schema, path, with_options_clone, connection_part
                );

                // Log a redacted form: connection options may hold secrets.
                let safe_sql = export_self.storage_type.mask_sensitive_info(&sql);
                info!("Executing sql: {}", safe_sql);

                export_self.database_client.sql_in_public(&sql).await?;
                info!(
                    "Finished exporting {}.{} data to {}",
                    export_self.catalog, schema, path
                );

                // Build the companion COPY ... FROM script. The runnable
                // statement uses a path placeholder; when connection options
                // exist, the concrete command is kept as a comment above it.
                let copy_database_from_sql = {
                    let command_without_connection = format!(
                        r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({});"#,
                        export_self.catalog, schema, COPY_PATH_PLACEHOLDER, with_options_clone
                    );

                    if connection_part.is_empty() {
                        command_without_connection
                    } else {
                        let command_with_connection = format!(
                            r#"COPY DATABASE "{}"."{}" FROM '{}' WITH ({}){};"#,
                            export_self.catalog, schema, path, with_options_clone, connection_part
                        );

                        format!(
                            "-- {}\n{}",
                            command_with_connection, command_without_connection
                        )
                    }
                };

                let copy_from_path = export_self.get_file_path(&schema, "copy_from.sql");
                export_self
                    .write_to_storage(
                        &fs_first_operator,
                        &copy_from_path,
                        copy_database_from_sql.into_bytes(),
                    )
                    .await?;

                info!(
                    "Finished exporting {}.{} copy_from.sql to {}",
                    export_self.catalog,
                    schema,
                    export_self.storage_type.format_output_path(&copy_from_path)
                );

                Ok::<(), Error>(())
            });
        }

        let success = self.execute_tasks(tasks).await;
        let elapsed = timer.elapsed();
        info!("Success {success}/{db_count} jobs, costs: {elapsed:?}");

        Ok(())
    }
567
568 fn get_file_path(&self, schema: &str, file_name: &str) -> String {
569 format!("{}/{}/{}", self.catalog, schema, file_name)
570 }
571
572 async fn write_to_storage(
573 &self,
574 op: &ObjectStore,
575 file_path: &str,
576 content: Vec<u8>,
577 ) -> Result<()> {
578 op.write(file_path, content)
579 .await
580 .context(OpenDalSnafu)
581 .map(|_| ())
582 }
583
584 async fn execute_tasks(
585 &self,
586 tasks: Vec<impl std::future::Future<Output = Result<()>>>,
587 ) -> usize {
588 futures::future::join_all(tasks)
589 .await
590 .into_iter()
591 .filter(|r| match r {
592 Ok(_) => true,
593 Err(e) => {
594 error!(e; "export job failed");
595 false
596 }
597 })
598 .count()
599 }
600}
601
#[async_trait]
impl Tool for Export {
    /// Runs the export for the selected target. `All` exports the database
    /// DDL first, then the table/view DDL, then the data, so each later step
    /// can rely on artifacts of the earlier ones.
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        match self.target {
            ExportTarget::Schema => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)
            }
            ExportTarget::Data => self.export_database_data().await.map_err(BoxedError::new),
            ExportTarget::All => {
                self.export_create_database()
                    .await
                    .map_err(BoxedError::new)?;
                self.export_create_table().await.map_err(BoxedError::new)?;
                self.export_database_data().await.map_err(BoxedError::new)
            }
        }
    }
}
623
/// Renders the `WITH (...)` option list for COPY DATABASE statements:
/// always `format = 'parquet'` and `parallelism = N`, with optional
/// `start_time`/`end_time` bounds in between.
fn build_with_options(
    start_time: &Option<String>,
    end_time: &Option<String>,
    parallelism: usize,
) -> String {
    let mut options = vec!["format = 'parquet'".to_string()];
    options.extend(start_time.iter().map(|s| format!("start_time = '{s}'")));
    options.extend(end_time.iter().map(|e| format!("end_time = '{e}'")));
    options.push(format!("parallelism = {parallelism}"));
    options.join(", ")
}
640
#[cfg(test)]
mod tests {
    //! Build-time validation tests for `ExportCommand`: each test parses a
    //! CLI argument list and asserts whether `build()` accepts or rejects
    //! the storage configuration. No server connection is made, because
    //! `build()` only constructs the client and object store.
    use clap::Parser;
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;

    // Base64 of "test-key"; AzBlob account keys must be valid base64.
    const MOCK_AZBLOB_ACCOUNT_KEY_B64: &str = "dGVzdC1rZXk=";

    // --- Happy paths: each backend builds with a full configuration. ---

    #[tokio::test]
    async fn test_export_command_build_with_local_fs() {
        let temp_dir = create_temp_dir("test_export_local_fs");
        let output_dir = temp_dir.path().to_str().unwrap();

        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--output-dir",
            output_dir,
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_s3_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-root",
            "test-root",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            "--s3-region",
            "us-west-2",
            "--s3-endpoint",
            "https://s3.amazonaws.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss",
            "--oss-bucket",
            "test-bucket",
            "--oss-root",
            "test-root",
            "--oss-access-key-id",
            "test-key-id",
            "--oss-access-key-secret",
            "test-secret",
            "--oss-endpoint",
            "https://oss.example.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
            "--gcs-credential",
            "test-credential-content",
            "--gcs-endpoint",
            "https://storage.googleapis.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    // GCS without an explicit credential still builds (application default
    // credentials path).
    #[tokio::test]
    async fn test_export_command_build_with_gcs_adc_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_success() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_with_sas_token() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
            "--azblob-sas-token",
            "test-sas-token",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok());
    }

    // --- Argument-level failures detected by clap itself. ---

    // Enabling two backends at once is a clap-level conflict.
    #[test]
    fn test_export_command_build_with_conflict() {
        let result =
            ExportCommand::try_parse_from(["export", "--addr", "127.0.0.1:4000", "--s3", "--oss"]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.kind() == clap::error::ErrorKind::ArgumentConflict);
    }

    // Backend-specific flags require the backend's enable flag.
    #[tokio::test]
    async fn test_export_command_build_with_s3_no_enable_flag() {
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--s3-bucket",
            "test-bucket",
            "--s3-access-key-id",
            "test-key",
            "--output-dir",
            "/tmp/test",
        ]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--s3"));
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_no_enable_flag() {
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss-bucket",
            "test-bucket",
            "--output-dir",
            "/tmp/test",
        ]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--oss"));
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_no_enable_flag() {
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs-bucket",
            "test-bucket",
            "--output-dir",
            "/tmp/test",
        ]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--gcs"));
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_no_enable_flag() {
        let result = ExportCommand::try_parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob-container",
            "test-container",
            "--output-dir",
            "/tmp/test",
        ]);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), clap::error::ErrorKind::MissingRequiredArgument);
        assert!(err.to_string().contains("--azblob"));
    }

    // --- Field-level validation performed inside build(). ---

    // Empty root is accepted for S3.
    #[tokio::test]
    async fn test_export_command_build_with_s3_empty_root() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-root",
            "",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            "--s3-region",
            "us-west-2",
        ]);

        let result = cmd.build().await;
        assert!(
            result.is_ok(),
            "Expected success but got: {:?}",
            result.err()
        );
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_empty_access_key_id() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss",
            "--oss-bucket",
            "test-bucket",
            "--oss-access-key-id",
            "",
            "--oss-access-key-secret",
            "test-secret",
            "--oss-endpoint",
            "https://oss.example.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_err());
        if let Err(err) = result {
            assert!(
                err.to_string().contains("OSS access key ID must be set"),
                "Actual error: {}",
                err
            );
        }
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_missing_endpoint() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss",
            "--oss-bucket",
            "test-bucket",
            "--oss-root",
            "test-root",
            "--oss-access-key-id",
            "test-key-id",
            "--oss-access-key-secret",
            "test-secret",
        ]);

        let result = cmd.build().await;
        assert!(result.is_err());
        if let Err(err) = result {
            assert!(
                err.to_string().contains("OSS endpoint must be set"),
                "Actual error: {}",
                err
            );
        }
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_multiple_missing_fields() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss",
            "--oss-bucket",
            "test-bucket",
        ]);

        let result = cmd.build().await;
        assert!(result.is_err());
        if let Err(err) = result {
            let err_str = err.to_string();
            assert!(
                err_str.contains("OSS"),
                "Error should mention OSS: {}",
                err_str
            );
            assert!(
                err_str.contains("must be set"),
                "Error should mention required fields: {}",
                err_str
            );
        }
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_empty_bucket() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
        ]);

        let result = cmd.build().await;
        assert!(result.is_err());
        if let Err(err) = result {
            assert!(
                err.to_string().contains("GCS bucket must be set"),
                "Actual error: {}",
                err
            );
        }
    }

    // Unlike S3, an empty root is rejected for GCS.
    #[tokio::test]
    async fn test_export_command_build_with_gcs_empty_root() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "",
            "--gcs-scope",
            "test-scope",
            "--gcs-credential",
            "test-credential",
            "--gcs-endpoint",
            "https://storage.googleapis.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_err());
        if let Err(err) = result {
            assert!(
                err.to_string().contains("GCS root must be set"),
                "Actual error: {}",
                err
            );
        }
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_empty_account_name() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
        ]);

        let result = cmd.build().await;
        assert!(result.is_err());
        if let Err(err) = result {
            assert!(
                err.to_string().contains("AzBlob account name must be set"),
                "Actual error: {}",
                err
            );
        }
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_missing_account_key() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
        ]);

        let result = cmd.build().await;
        assert!(result.is_err());
        if let Err(err) = result {
            assert!(
                err.to_string()
                    .contains("AzBlob account key (when sas_token is not provided) must be set"),
                "Actual error: {}",
                err
            );
        }
    }

    // No backend and no --output-dir: build() must refuse.
    #[tokio::test]
    async fn test_export_command_build_with_no_storage() {
        let cmd = ExportCommand::parse_from(["export", "--addr", "127.0.0.1:4000"]);

        let result = cmd.build().await;
        assert!(result.is_err());
        if let Err(err) = result {
            assert!(
                err.to_string().contains("Output directory not set"),
                "Actual error: {}",
                err
            );
        }
    }

    // --- Minimal configurations for each backend. ---

    #[tokio::test]
    async fn test_export_command_build_with_s3_minimal_config() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            "--s3-region",
            "us-west-2",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok(), "Minimal S3 config should succeed");
    }

    #[tokio::test]
    async fn test_export_command_build_with_oss_minimal_config() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--oss",
            "--oss-bucket",
            "test-bucket",
            "--oss-access-key-id",
            "test-key-id",
            "--oss-access-key-secret",
            "test-secret",
            "--oss-endpoint",
            "https://oss.example.com",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok(), "Minimal OSS config should succeed");
    }

    #[tokio::test]
    async fn test_export_command_build_with_gcs_minimal_config() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--gcs",
            "--gcs-bucket",
            "test-bucket",
            "--gcs-root",
            "test-root",
            "--gcs-scope",
            "test-scope",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok(), "Minimal GCS config should succeed");
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_minimal_config() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            MOCK_AZBLOB_ACCOUNT_KEY_B64,
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
        ]);

        let result = cmd.build().await;
        assert!(result.is_ok(), "Minimal AzBlob config should succeed");
    }

    // Remote backends win over --output-dir when both are present (see the
    // precedence order in ExportCommand::build).
    #[tokio::test]
    async fn test_export_command_build_with_local_and_s3() {
        let temp_dir = create_temp_dir("test_export_local_and_s3");
        let output_dir = temp_dir.path().to_str().unwrap();

        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--output-dir",
            output_dir,
            "--s3",
            "--s3-bucket",
            "test-bucket",
            "--s3-access-key-id",
            "test-key",
            "--s3-secret-access-key",
            "test-secret",
            "--s3-region",
            "us-west-2",
        ]);

        let result = cmd.build().await;
        assert!(
            result.is_ok(),
            "S3 should be selected when both are provided"
        );
    }

    // SAS token authentication is an alternative to the account key.
    #[tokio::test]
    async fn test_export_command_build_with_azblob_only_sas_token() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
            "--azblob-sas-token",
            "test-sas-token",
        ]);

        let result = cmd.build().await;
        assert!(
            result.is_ok(),
            "AzBlob with only sas_token should succeed: {:?}",
            result.err()
        );
    }

    #[tokio::test]
    async fn test_export_command_build_with_azblob_empty_account_key_with_sas() {
        let cmd = ExportCommand::parse_from([
            "export",
            "--addr",
            "127.0.0.1:4000",
            "--azblob",
            "--azblob-container",
            "test-container",
            "--azblob-root",
            "test-root",
            "--azblob-account-name",
            "test-account",
            "--azblob-account-key",
            "",
            "--azblob-endpoint",
            "https://account.blob.core.windows.net",
            "--azblob-sas-token",
            "test-sas-token",
        ]);

        let result = cmd.build().await;
        assert!(
            result.is_ok(),
            "AzBlob with empty account_key but sas_token should succeed: {:?}",
            result.err()
        );
    }
}