use std::collections::HashSet;
use std::time::Duration;

use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_telemetry::info;
use snafu::{OptionExt, ResultExt};

use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::data::{build_copy_source, execute_copy_database_from};
use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, DataFormat, MANIFEST_VERSION};
use crate::data::import_v2::coordinator::{
    ImportResumeConfig, ImportTaskExecutor, build_import_tasks, chunk_has_schema_files,
    import_with_resume_session, prepare_import_resume,
};
use crate::data::import_v2::error::{
    ChunkImportFailedSnafu, EmptyChunkManifestSnafu, ImportStatePathUnavailableSnafu,
    IncompleteSnapshotSnafu, ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result,
    SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
};
use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
use crate::data::import_v2::state::{ImportTaskKey, default_state_path};
use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema};
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
use crate::database::{DatabaseClient, parse_proxy_opts};

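/// Command-line arguments for importing a v2 snapshot into a target database.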
#[derive(Debug, Parser)]
pub struct ImportV2Command {
    #[clap(long)]
    addr: String,

    #[clap(long)]
    from: String,

    #[clap(long, default_value = "greptime")]
    catalog: String,

    #[clap(long, value_delimiter = ',')]
    schemas: Vec<String>,

    #[clap(long)]
    dry_run: bool,

    #[clap(long)]
    auth_basic: Option<String>,

    #[clap(long, value_parser = humantime::parse_duration)]
    timeout: Option<Duration>,

    #[clap(long)]
    proxy: Option<String>,

    #[clap(long)]
    no_proxy: bool,

    #[clap(flatten)]
    storage: ObjectStoreConfig,
}

impl ImportV2Command {
    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
        validate_uri(&self.from)
            .context(SnapshotStorageSnafu)
            .map_err(BoxedError::new)?;

        let schemas = if self.schemas.is_empty() {
            None
        } else {
            Some(self.schemas.clone())
        };

        let storage = OpenDalStorage::from_uri(&self.from, &self.storage)
            .context(SnapshotStorageSnafu)
            .map_err(BoxedError::new)?;

        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
        let database_client = DatabaseClient::new(
            self.addr.clone(),
            self.catalog.clone(),
            self.auth_basic.clone(),
            self.timeout.unwrap_or(Duration::from_secs(60)),
            proxy,
            self.no_proxy,
        );

        Ok(Box::new(Import {
            catalog: self.catalog.clone(),
            schemas,
            dry_run: self.dry_run,
            snapshot_uri: self.from.clone(),
            storage_config: self.storage.clone(),
            storage: Box::new(storage),
            database_client,
        }))
    }
}

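/// Import tool that replays a snapshot's DDL statements and copies its data
/// chunks into the target database.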
pub struct Import {
    catalog: String,
    schemas: Option<Vec<String>>,
    dry_run: bool,
    snapshot_uri: String,
    storage_config: ObjectStoreConfig,
    storage: Box<dyn SnapshotStorage>,
    database_client: DatabaseClient,
}

#[async_trait]
impl Tool for Import {
    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
        self.run().await.map_err(BoxedError::new)
    }
}

impl Import {
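    /// Reads the snapshot manifest, validates it, executes the DDL statements,
    /// and imports the data chunks (with resume support) unless dry-run is set.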
    async fn run(&self) -> Result<()> {
        let manifest = self
            .storage
            .read_manifest()
            .await
            .context(SnapshotStorageSnafu)?;

        info!(
            "Loading snapshot: {} (version: {}, schema_only: {})",
            manifest.snapshot_id, manifest.version, manifest.schema_only
        );

        if manifest.version != MANIFEST_VERSION {
            return ManifestVersionMismatchSnafu {
                expected: MANIFEST_VERSION,
                found: manifest.version,
            }
            .fail();
        }

        info!("Snapshot contains {} schema(s)", manifest.schemas.len());

        let schemas_to_import = match &self.schemas {
            Some(filter) => canonicalize_schema_filter(filter, &manifest.schemas)?,
            None => manifest.schemas.clone(),
        };

        info!("Importing schemas: {:?}", schemas_to_import);

        let ddl_statements = self.read_ddl_statements(&schemas_to_import).await?;

        info!("Generated {} DDL statements", ddl_statements.len());

        let data_tasks = if !manifest.schema_only && !manifest.chunks.is_empty() {
            validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import)
                .await?;
            build_import_tasks(&manifest.chunks, &schemas_to_import)
        } else {
            Vec::new()
        };

        if self.dry_run {
            info!("Dry-run mode - DDL statements to execute:");
            println!();
            for (i, stmt) in ddl_statements.iter().enumerate() {
                println!("-- Statement {}", i + 1);
                println!("{};", stmt.sql);
                println!();
            }
            if !manifest.schema_only && !manifest.chunks.is_empty() {
                for line in format_data_import_plan(&manifest.chunks, &schemas_to_import) {
                    println!("{line}");
                }
                println!();
            }
            return Ok(());
        }

        let mut resume_session = if !data_tasks.is_empty() {
            let state_path = default_state_path(
                &manifest.snapshot_id.to_string(),
                self.database_client.addr(),
                &self.catalog,
                &schemas_to_import,
            )
            .context(ImportStatePathUnavailableSnafu {
                snapshot_id: manifest.snapshot_id.to_string(),
            })?;
            Some(
                prepare_import_resume(ImportResumeConfig {
                    snapshot_id: manifest.snapshot_id.to_string(),
                    target_addr: self.database_client.addr().to_string(),
                    catalog: self.catalog.clone(),
                    schemas: schemas_to_import.clone(),
                    state_path,
                    tasks: data_tasks,
                })
                .await?,
            )
        } else {
            None
        };

        let skip_ddl = resume_session
            .as_ref()
            .map(|session| session.should_skip_ddl())
            .unwrap_or(false);

        let ddl_executed = if skip_ddl {
            info!(
                "Existing import state has DDL marked completed; skipping DDL execution and resuming data import"
            );
            false
        } else {
            let executor = DdlExecutor::new(&self.database_client);
            executor.execute_strict(&ddl_statements).await?;
            if let Some(session) = resume_session.as_mut() {
                session.mark_ddl_completed().await?;
            }
            true
        };

        if let Some(resume_session) = resume_session {
            let executor = CopyDatabaseImportTaskExecutor {
                import: self,
                format: manifest.format,
            };
            import_with_resume_session(resume_session, &executor).await?;
        }

        if ddl_executed {
            info!(
                "Import completed: {} DDL statements executed",
                ddl_statements.len()
            );
        } else {
            info!("Import completed: DDL execution skipped");
        }

        Ok(())
    }

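    /// Reads the DDL file for each schema from the snapshot and parses it into
    /// individual statements scoped to that schema.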
    async fn read_ddl_statements(&self, schemas: &[String]) -> Result<Vec<DdlStatement>> {
        let mut statements = Vec::new();
        for schema in schemas {
            let path = ddl_path_for_schema(schema);
            let content = self
                .storage
                .read_text(&path)
                .await
                .context(SnapshotStorageSnafu)?;
            statements.extend(
                parse_ddl_statements(&content)
                    .into_iter()
                    .map(|sql| ddl_statement_for_schema(schema, sql)),
            );
        }

        Ok(statements)
    }
}

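/// [`ImportTaskExecutor`] that imports a single chunk by issuing
/// `COPY DATABASE FROM` against the snapshot storage.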
struct CopyDatabaseImportTaskExecutor<'a> {
    import: &'a Import,
    format: DataFormat,
}

#[async_trait]
impl ImportTaskExecutor for CopyDatabaseImportTaskExecutor<'_> {
    async fn import_task(&self, task: &ImportTaskKey) -> Result<()> {
        let source = build_copy_source(
            &self.import.snapshot_uri,
            &self.import.storage_config,
            &task.schema,
            task.chunk_id,
        )
        .context(ChunkImportFailedSnafu {
            chunk_id: task.chunk_id,
            schema: task.schema.clone(),
        })?;

        execute_copy_database_from(
            &self.import.database_client,
            &self.import.catalog,
            &task.schema,
            &source,
            self.format,
        )
        .await
        .context(ChunkImportFailedSnafu {
            chunk_id: task.chunk_id,
            schema: task.schema.clone(),
        })
    }
}

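/// Splits SQL text into statements on semicolons, ignoring semicolons inside
/// quoted strings and SQL comments.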
fn parse_ddl_statements(content: &str) -> Vec<String> {
    let mut statements = Vec::new();
    let mut current = String::new();
    let mut chars = content.chars().peekable();
    let mut in_single_quote = false;
    let mut in_double_quote = false;
    let mut in_line_comment = false;
    let mut in_block_comment = false;

    while let Some(ch) = chars.next() {
        if in_line_comment {
            if ch == '\n' {
                in_line_comment = false;
                current.push('\n');
            }
            continue;
        }

        if in_block_comment {
            if ch == '*' && chars.peek() == Some(&'/') {
                chars.next();
                in_block_comment = false;
            }
            continue;
        }

        if in_single_quote {
            current.push(ch);
            if ch == '\'' {
                if chars.peek() == Some(&'\'') {
                    current.push(chars.next().expect("peeked quote must exist"));
                } else {
                    in_single_quote = false;
                }
            }
            continue;
        }

        if in_double_quote {
            current.push(ch);
            if ch == '"' {
                if chars.peek() == Some(&'"') {
                    current.push(chars.next().expect("peeked quote must exist"));
                } else {
                    in_double_quote = false;
                }
            }
            continue;
        }

        match ch {
            '-' if chars.peek() == Some(&'-') => {
                chars.next();
                in_line_comment = true;
            }
            '/' if chars.peek() == Some(&'*') => {
                chars.next();
                in_block_comment = true;
            }
            '\'' => {
                in_single_quote = true;
                current.push(ch);
            }
            '"' => {
                in_double_quote = true;
                current.push(ch);
            }
            ';' => {
                let statement = current.trim();
                if !statement.is_empty() {
                    statements.push(statement.to_string());
                }
                current.clear();
            }
            _ => current.push(ch),
        }
    }

    let statement = current.trim();
    if !statement.is_empty() {
        statements.push(statement.to_string());
    }

    statements
}

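/// Wraps a DDL statement, attaching the schema as execution context when the
/// statement is schema-scoped (e.g. `CREATE TABLE` or `CREATE VIEW`).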
fn ddl_statement_for_schema(schema: &str, sql: String) -> DdlStatement {
    if is_schema_scoped_statement(&sql) {
        DdlStatement::with_execution_schema(sql, schema.to_string())
    } else {
        DdlStatement::new(sql)
    }
}

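/// Returns true for `CREATE [OR REPLACE] [EXTERNAL] TABLE`/`VIEW` statements,
/// which must be executed within the target schema.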
fn is_schema_scoped_statement(sql: &str) -> bool {
    let trimmed = sql.trim_start();
    if !starts_with_keyword(trimmed, "CREATE") {
        return false;
    }

    let Some(rest) = trimmed.get("CREATE".len()..) else {
        return false;
    };
    let mut rest = rest.trim_start();
    if starts_with_keyword(rest, "OR") {
        let Some(next) = rest.get("OR".len()..) else {
            return false;
        };
        rest = next.trim_start();
        if !starts_with_keyword(rest, "REPLACE") {
            return false;
        }
        let Some(next) = rest.get("REPLACE".len()..) else {
            return false;
        };
        rest = next.trim_start();
    }

    if starts_with_keyword(rest, "EXTERNAL") {
        let Some(next) = rest.get("EXTERNAL".len()..) else {
            return false;
        };
        rest = next.trim_start();
    }

    starts_with_keyword(rest, "TABLE") || starts_with_keyword(rest, "VIEW")
}

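/// Case-insensitively checks that `input` starts with `keyword` followed by a
/// word boundary (i.e. not an identifier character).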
fn starts_with_keyword(input: &str, keyword: &str) -> bool {
    input
        .get(0..keyword.len())
        .map(|s| s.eq_ignore_ascii_case(keyword))
        .unwrap_or(false)
        && input
            .as_bytes()
            .get(keyword.len())
            .map(|b| !b.is_ascii_alphanumeric() && *b != b'_')
            .unwrap_or(true)
}

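/// Resolves the user-provided schema filter against the manifest's schema names
/// (case-insensitively), deduplicating matches and rejecting schemas that are
/// not present in the snapshot.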
fn canonicalize_schema_filter(
    filter: &[String],
    manifest_schemas: &[String],
) -> Result<Vec<String>> {
    let mut canonicalized = Vec::new();
    let mut seen = HashSet::new();

    for schema in filter {
        let canonical = manifest_schemas
            .iter()
            .find(|candidate| candidate.eq_ignore_ascii_case(schema))
            .cloned()
            .ok_or_else(|| {
                SchemaNotInSnapshotSnafu {
                    schema: schema.clone(),
                }
                .build()
            })?;

        if seen.insert(canonical.to_ascii_lowercase()) {
            canonicalized.push(canonical);
        }
    }

    Ok(canonicalized)
}

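/// Ensures every chunk in the manifest is either `Completed` or `Skipped`.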
fn validate_chunk_statuses(chunks: &[ChunkMeta]) -> Result<()> {
    let invalid_chunk = chunks
        .iter()
        .find(|chunk| !matches!(chunk.status, ChunkStatus::Completed | ChunkStatus::Skipped));

    if let Some(chunk) = invalid_chunk {
        return IncompleteSnapshotSnafu {
            chunk_id: chunk.id,
            status: chunk.status,
        }
        .fail();
    }

    Ok(())
}

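/// Renders the data-import plan printed in dry-run mode.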
fn format_data_import_plan(chunks: &[ChunkMeta], schemas: &[String]) -> Vec<String> {
    let mut lines = vec!["-- Data import plan:".to_string()];
    for chunk in chunks {
        lines.push(format!("-- Chunk {}: {:?}", chunk.id, chunk.status));
        for schema in schemas {
            if chunk_has_schema_files(chunk, schema) {
                lines.push(format!("-- {} -> COPY DATABASE FROM", schema));
            }
        }
    }
    lines
}

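/// Validates chunk statuses and verifies that every completed chunk has data
/// files present in storage for each schema being imported.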
async fn validate_data_snapshot(
    storage: &dyn SnapshotStorage,
    chunks: &[ChunkMeta],
    schemas: &[String],
) -> Result<()> {
    validate_chunk_statuses(chunks)?;
    let actual_prefixes = collect_chunk_data_prefixes(storage).await?;

    for chunk in chunks {
        if chunk.status == ChunkStatus::Skipped {
            continue;
        }
        if chunk.files.is_empty() {
            return EmptyChunkManifestSnafu { chunk_id: chunk.id }.fail();
        }
        for schema in schemas {
            validate_chunk_schema_files(chunk, schema, &actual_prefixes)?;
        }
    }

    Ok(())
}

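/// Lists all files under `data/` and collects the `data/<schema>/<chunk_id>/`
/// prefixes that actually exist in the snapshot storage.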
async fn collect_chunk_data_prefixes(storage: &dyn SnapshotStorage) -> Result<HashSet<String>> {
    let files = storage
        .list_files_recursive("data/")
        .await
        .context(SnapshotStorageSnafu)?;
    let mut prefixes = HashSet::new();

    for path in files {
        let normalized = path.trim_start_matches('/');
        let mut parts = normalized.splitn(4, '/');
        let Some(root) = parts.next() else {
            continue;
        };
        let Some(schema) = parts.next() else {
            continue;
        };
        let Some(chunk_id) = parts.next() else {
            continue;
        };
        if root != "data" {
            continue;
        }
        prefixes.insert(format!("data/{schema}/{chunk_id}/"));
    }

    Ok(prefixes)
}

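/// Returns whether the chunk contains files for `schema`, failing if the
/// manifest claims files exist but the corresponding prefix is missing from
/// storage.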
fn validate_chunk_schema_files(
    chunk: &ChunkMeta,
    schema: &str,
    actual_prefixes: &HashSet<String>,
) -> Result<bool> {
    if !chunk_has_schema_files(chunk, schema) {
        return Ok(false);
    }

    let prefix = data_dir_for_schema_chunk(schema, chunk.id);
    if !actual_prefixes.contains(&prefix) {
        return MissingChunkDataSnafu {
            chunk_id: chunk.id,
            schema: schema.to_string(),
            path: prefix,
        }
        .fail();
    }

    Ok(true)
}

#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use async_trait::async_trait;

    use super::*;
    use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, Manifest, TimeRange};
    use crate::data::export_v2::schema::SchemaSnapshot;
    use crate::data::snapshot_storage::SnapshotStorage;

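    /// In-memory [`SnapshotStorage`] stub that serves a fixed manifest and file
    /// listing for the tests below.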
    struct StubStorage {
        manifest: Manifest,
        files_by_prefix: HashMap<String, Vec<String>>,
    }

    #[async_trait]
    impl SnapshotStorage for StubStorage {
        async fn exists(&self) -> crate::data::export_v2::error::Result<bool> {
            Ok(true)
        }

        async fn read_manifest(&self) -> crate::data::export_v2::error::Result<Manifest> {
            Ok(self.manifest.clone())
        }

        async fn write_manifest(
            &self,
            _manifest: &Manifest,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn read_text(&self, _path: &str) -> crate::data::export_v2::error::Result<String> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_text(
            &self,
            _path: &str,
            _content: &str,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn write_schema(
            &self,
            _snapshot: &SchemaSnapshot,
        ) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn create_dir_all(&self, _path: &str) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }

        async fn list_files_recursive(
            &self,
            prefix: &str,
        ) -> crate::data::export_v2::error::Result<Vec<String>> {
            Ok(self
                .files_by_prefix
                .iter()
                .filter(|(candidate, _)| candidate.starts_with(prefix))
                .flat_map(|(_, files)| files.clone())
                .collect())
        }

        async fn delete_snapshot(&self) -> crate::data::export_v2::error::Result<()> {
            unimplemented!("not needed in import_v2::command tests")
        }
    }

    #[test]
    fn test_parse_ddl_statements() {
        let content = r#"
-- Schema: public
CREATE DATABASE public;
CREATE TABLE t (ts TIMESTAMP TIME INDEX, host STRING, PRIMARY KEY (host)) ENGINE=mito;

-- comment
CREATE VIEW v AS SELECT * FROM t;
"#;
        let statements = parse_ddl_statements(content);
        assert_eq!(statements.len(), 3);
        assert!(statements[0].starts_with("CREATE DATABASE public"));
        assert!(statements[1].starts_with("CREATE TABLE t"));
        assert!(statements[2].starts_with("CREATE VIEW v"));
    }

    #[test]
    fn test_parse_ddl_statements_preserves_semicolons_in_string_literals() {
        let content = r#"
CREATE TABLE t (
    host STRING DEFAULT 'a;b'
);
CREATE VIEW v AS SELECT ';' AS marker;
"#;

        let statements = parse_ddl_statements(content);

        assert_eq!(statements.len(), 2);
        assert!(statements[0].contains("'a;b'"));
        assert!(statements[1].contains("';' AS marker"));
    }

    #[test]
    fn test_parse_ddl_statements_handles_comments_without_splitting() {
        let content = r#"
-- leading comment
CREATE TABLE t (ts TIMESTAMP TIME INDEX); /* block; comment */
CREATE VIEW v AS SELECT 1;
"#;

        let statements = parse_ddl_statements(content);

        assert_eq!(statements.len(), 2);
        assert!(statements[0].starts_with("CREATE TABLE t"));
        assert!(statements[1].starts_with("CREATE VIEW v"));
    }

    #[test]
    fn test_canonicalize_schema_filter_uses_manifest_casing() {
        let filter = vec!["TEST_DB".to_string(), "PUBLIC".to_string()];
        let manifest_schemas = vec!["test_db".to_string(), "public".to_string()];

        let canonicalized = canonicalize_schema_filter(&filter, &manifest_schemas).unwrap();

        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    #[test]
    fn test_canonicalize_schema_filter_dedupes_case_insensitive_matches() {
        let filter = vec![
            "TEST_DB".to_string(),
            "test_db".to_string(),
            "PUBLIC".to_string(),
            "public".to_string(),
        ];
        let manifest_schemas = vec!["test_db".to_string(), "public".to_string()];

        let canonicalized = canonicalize_schema_filter(&filter, &manifest_schemas).unwrap();

        assert_eq!(canonicalized, vec!["test_db", "public"]);
    }

    #[test]
    fn test_canonicalize_schema_filter_rejects_missing_schema() {
        let filter = vec!["missing".to_string()];
        let manifest_schemas = vec!["test_db".to_string()];

        let error = canonicalize_schema_filter(&filter, &manifest_schemas)
            .expect_err("missing schema should fail")
            .to_string();

        assert!(error.contains("missing"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_table_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE TABLE metrics (ts TIMESTAMP TIME INDEX) ENGINE=mito".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_view_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE VIEW metrics_view AS SELECT * FROM metrics".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_or_replace_view_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE OR REPLACE VIEW metrics_view AS SELECT * FROM metrics".to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_external_table_uses_execution_schema() {
        let stmt = ddl_statement_for_schema(
            "test_db",
            "CREATE EXTERNAL TABLE IF NOT EXISTS ext_metrics (ts TIMESTAMP TIME INDEX) ENGINE=file"
                .to_string(),
        );
        assert_eq!(stmt.execution_schema.as_deref(), Some("test_db"));
    }

    #[test]
    fn test_ddl_statement_for_schema_create_database_uses_public_context() {
        let stmt = ddl_statement_for_schema("test_db", "CREATE DATABASE test_db".to_string());
        assert_eq!(stmt.execution_schema, None);
    }

    #[test]
    fn test_starts_with_keyword_requires_word_boundary() {
        assert!(starts_with_keyword("CREATE TABLE t", "CREATE"));
        assert!(!starts_with_keyword("CREATED TABLE t", "CREATE"));
        assert!(!starts_with_keyword("TABLESPACE foo", "TABLE"));
    }

    #[test]
    fn test_validate_chunk_statuses_rejects_failed_chunk() {
        let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
        failed.status = ChunkStatus::Failed;

        let error = validate_chunk_statuses(&[failed]).expect_err("failed chunk should error");
        assert!(error.to_string().contains("Incomplete snapshot"));
    }

    #[test]
    fn test_validate_chunk_statuses_accepts_completed_and_skipped_chunks() {
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        let skipped = ChunkMeta::skipped(2, TimeRange::unbounded());

        assert!(validate_chunk_statuses(&[completed, skipped]).is_ok());
    }

    #[test]
    fn test_chunk_has_schema_files_matches_encoded_schema_prefix() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec![
            "data/public/7/a.parquet".to_string(),
            "data/%E6%B5%8B%E8%AF%95/7/b.parquet".to_string(),
        ];

        assert!(chunk_has_schema_files(&chunk, "public"));
        assert!(chunk_has_schema_files(&chunk, "测试"));
        assert!(!chunk_has_schema_files(&chunk, "metrics"));
    }

    #[test]
    fn test_format_data_import_plan_includes_matching_schemas_only() {
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec![
            "data/public/1/a.parquet".to_string(),
            "data/%E6%B5%8B%E8%AF%95/1/b.parquet".to_string(),
        ];
        let skipped = ChunkMeta::skipped(2, TimeRange::unbounded());

        let lines = format_data_import_plan(
            &[completed, skipped],
            &[
                "public".to_string(),
                "测试".to_string(),
                "metrics".to_string(),
            ],
        );

        assert_eq!(lines[0], "-- Data import plan:");
        assert!(lines.contains(&"-- Chunk 1: Completed".to_string()));
        assert!(lines.contains(&"-- public -> COPY DATABASE FROM".to_string()));
        assert!(lines.contains(&"-- 测试 -> COPY DATABASE FROM".to_string()));
        assert!(!lines.contains(&"-- metrics -> COPY DATABASE FROM".to_string()));
        assert!(lines.contains(&"-- Chunk 2: Skipped".to_string()));
    }

    #[tokio::test]
    async fn test_collect_chunk_data_prefixes_indexes_present_prefixes() {
        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::from([
                (
                    "data/public/7/".to_string(),
                    vec!["data/public/7/a.parquet".to_string()],
                ),
                (
                    "data/%E6%B5%8B%E8%AF%95/9/".to_string(),
                    vec!["data/%E6%B5%8B%E8%AF%95/9/b.parquet".to_string()],
                ),
            ]),
        };

        let prefixes = collect_chunk_data_prefixes(&storage).await.unwrap();

        assert!(prefixes.contains("data/public/7/"));
        assert!(prefixes.contains("data/%E6%B5%8B%E8%AF%95/9/"));
    }

    #[test]
    fn test_validate_chunk_schema_files_accepts_present_prefix() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];
        let actual_prefixes = HashSet::from(["data/public/7/".to_string()]);

        assert!(validate_chunk_schema_files(&chunk, "public", &actual_prefixes).unwrap());
    }

    #[test]
    fn test_validate_chunk_schema_files_rejects_missing_prefix() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];

        let error = validate_chunk_schema_files(&chunk, "public", &HashSet::new())
            .expect_err("missing chunk prefix should fail")
            .to_string();
        assert!(error.contains("marked completed but no files were found"));
    }

    #[test]
    fn test_validate_chunk_schema_files_skips_absent_schema() {
        let mut chunk = ChunkMeta::new(7, TimeRange::unbounded());
        chunk.files = vec!["data/public/7/a.parquet".to_string()];

        assert!(!validate_chunk_schema_files(&chunk, "metrics", &HashSet::new()).unwrap());
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_failed_chunk_before_dry_run() {
        let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
        failed.status = ChunkStatus::Failed;

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[failed], &["public".to_string()])
            .await
            .expect_err("failed chunk should reject dry-run validation")
            .to_string();
        assert!(error.contains("Incomplete snapshot"));
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_missing_chunk_prefix_before_dry_run() {
        let mut completed = ChunkMeta::new(7, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec!["data/public/7/a.parquet".to_string()];

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()])
            .await
            .expect_err("missing chunk prefix should reject dry-run validation")
            .to_string();
        assert!(error.contains("marked completed but no files were found"));
    }

    #[tokio::test]
    async fn test_validate_data_snapshot_rejects_completed_chunk_with_empty_manifest() {
        let mut completed = ChunkMeta::new(7, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;

        let storage = StubStorage {
            manifest: Manifest::new_schema_only("greptime".to_string(), vec!["public".to_string()]),
            files_by_prefix: HashMap::new(),
        };

        let error = validate_data_snapshot(&storage, &[completed], &["public".to_string()])
            .await
            .expect_err("empty completed chunk should reject validation")
            .to_string();
        assert!(error.contains("file manifest is empty"));
    }
}