Skip to main content

cli/data/export_v2/
command.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Export V2 CLI commands.
16
17use std::collections::HashSet;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::{Parser, Subcommand};
22use common_error::ext::BoxedError;
23use common_telemetry::info;
24use serde_json::Value;
25use snafu::{OptionExt, ResultExt};
26
27use crate::Tool;
28use crate::common::ObjectStoreConfig;
29use crate::data::export_v2::coordinator::export_data;
30use crate::data::export_v2::error::{
31    ChunkTimeWindowRequiresBoundsSnafu, DatabaseSnafu, EmptyResultSnafu,
32    ManifestVersionMismatchSnafu, Result, ResumeConfigMismatchSnafu, SchemaOnlyArgsNotAllowedSnafu,
33    SchemaOnlyModeMismatchSnafu, SnapshotVerifyFailedSnafu, UnexpectedValueTypeSnafu,
34};
35use crate::data::export_v2::extractor::SchemaExtractor;
36use crate::data::export_v2::manifest::{
37    ChunkMeta, ChunkStatus, DataFormat, MANIFEST_FILE, MANIFEST_VERSION, Manifest, TimeRange,
38};
39use crate::data::export_v2::schema::{DDL_DIR, SCHEMA_DIR, SCHEMAS_FILE};
40use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema};
41use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
42use crate::data::sql::{escape_sql_identifier, escape_sql_literal};
43use crate::database::{DatabaseClient, parse_proxy_opts};
44
45/// Export V2 commands.
46#[derive(Debug, Subcommand)]
47pub enum ExportV2Command {
48    /// Create a new snapshot.
49    Create(ExportCreateCommand),
50    /// List snapshots under a parent location.
51    List(ExportListCommand),
52    /// Verify snapshot integrity.
53    Verify(ExportVerifyCommand),
54}
55
56impl ExportV2Command {
57    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
58        match self {
59            ExportV2Command::Create(cmd) => cmd.build().await,
60            ExportV2Command::List(cmd) => cmd.build().await,
61            ExportV2Command::Verify(cmd) => cmd.build().await,
62        }
63    }
64}
65
66/// List snapshots under a parent location.
67#[derive(Debug, Parser)]
68pub struct ExportListCommand {
69    /// Parent storage location whose direct subdirectories are snapshots.
70    #[clap(long)]
71    location: String,
72
73    /// Object store configuration for remote storage backends.
74    #[clap(flatten)]
75    storage: ObjectStoreConfig,
76}
77
78impl ExportListCommand {
79    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
80        validate_uri(&self.location).map_err(BoxedError::new)?;
81        let storage = OpenDalStorage::from_parent_uri(&self.location, &self.storage)
82            .map_err(BoxedError::new)?;
83
84        Ok(Box::new(ExportList {
85            location: self.location.clone(),
86            storage,
87        }))
88    }
89}
90
91/// Export list tool implementation.
92pub struct ExportList {
93    location: String,
94    storage: OpenDalStorage,
95}
96
97#[async_trait]
98impl Tool for ExportList {
99    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
100        self.run().await.map_err(BoxedError::new)
101    }
102}
103
104impl ExportList {
105    async fn run(&self) -> Result<()> {
106        let result = scan_snapshots(&self.storage).await?;
107
108        println!("Scanning: {}", self.location);
109        if result.snapshots.is_empty() {
110            println!("No snapshots found.");
111        } else {
112            print_snapshot_list(&result.snapshots, result.unreadable.len());
113        }
114        print_unreadable_warnings(&result.unreadable);
115
116        Ok(())
117    }
118}
119
120/// Verify snapshot integrity.
121#[derive(Debug, Parser)]
122pub struct ExportVerifyCommand {
123    /// Snapshot storage location (e.g., s3://bucket/path, file:///tmp/backup).
124    #[clap(long)]
125    snapshot: String,
126
127    /// Object store configuration for remote storage backends.
128    #[clap(flatten)]
129    storage: ObjectStoreConfig,
130}
131
132impl ExportVerifyCommand {
133    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
134        validate_uri(&self.snapshot).map_err(BoxedError::new)?;
135        let storage =
136            OpenDalStorage::from_uri(&self.snapshot, &self.storage).map_err(BoxedError::new)?;
137
138        Ok(Box::new(ExportVerify {
139            snapshot: self.snapshot.clone(),
140            storage,
141        }))
142    }
143}
144
145/// Export verify tool implementation.
146pub struct ExportVerify {
147    snapshot: String,
148    storage: OpenDalStorage,
149}
150
151#[async_trait]
152impl Tool for ExportVerify {
153    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
154        self.run().await.map_err(BoxedError::new)
155    }
156}
157
158impl ExportVerify {
159    async fn run(&self) -> Result<()> {
160        let report = verify_snapshot(&self.storage).await?;
161        print_verify_report(&self.snapshot, &report);
162
163        if report.has_problems() {
164            return SnapshotVerifyFailedSnafu {
165                errors: report.error_count(),
166                warnings: report.warning_count(),
167            }
168            .fail();
169        }
170
171        Ok(())
172    }
173}
174
175/// Create a new snapshot.
176#[derive(Debug, Parser)]
177pub struct ExportCreateCommand {
178    /// Server address to connect (e.g., 127.0.0.1:4000).
179    #[clap(long)]
180    addr: String,
181
182    /// Target storage location (e.g., s3://bucket/path, file:///tmp/backup).
183    #[clap(long)]
184    to: String,
185
186    /// Catalog name.
187    #[clap(long, default_value = "greptime")]
188    catalog: String,
189
190    /// Schema list to export (default: all non-system schemas).
191    /// Can be specified multiple times or comma-separated.
192    #[clap(long, value_delimiter = ',')]
193    schemas: Vec<String>,
194
195    /// Export schema only, no data.
196    #[clap(long)]
197    schema_only: bool,
198
199    /// Time range start (ISO 8601 format, e.g., 2024-01-01T00:00:00Z).
200    #[clap(long)]
201    start_time: Option<String>,
202
203    /// Time range end (ISO 8601 format, e.g., 2024-12-31T23:59:59Z).
204    #[clap(long)]
205    end_time: Option<String>,
206
207    /// Chunk time window (e.g., 1h, 6h, 1d, 7d).
208    /// Requires both --start-time and --end-time when specified.
209    #[clap(long, value_parser = humantime::parse_duration)]
210    chunk_time_window: Option<Duration>,
211
212    /// Data format: parquet, csv, json.
213    #[clap(long, value_enum, default_value = "parquet")]
214    format: DataFormat,
215
216    /// Delete existing snapshot and recreate.
217    #[clap(long)]
218    force: bool,
219
220    /// Parallelism for COPY DATABASE execution (server-side, per schema per chunk).
221    #[clap(long, default_value = "1")]
222    parallelism: usize,
223
224    /// Basic authentication (user:password).
225    #[clap(long)]
226    auth_basic: Option<String>,
227
228    /// Request timeout.
229    #[clap(long, value_parser = humantime::parse_duration)]
230    timeout: Option<Duration>,
231
232    /// Proxy server address.
233    ///
234    /// If set, it overrides the system proxy unless `--no-proxy` is specified.
235    /// If neither `--proxy` nor `--no-proxy` is set, system proxy (env) may be used.
236    #[clap(long)]
237    proxy: Option<String>,
238
239    /// Disable all proxy usage (ignores `--proxy` and system proxy).
240    ///
241    /// When set and `--proxy` is not provided, this explicitly disables system proxy.
242    #[clap(long)]
243    no_proxy: bool,
244
245    /// Object store configuration for remote storage backends.
246    #[clap(flatten)]
247    storage: ObjectStoreConfig,
248}
249
250impl ExportCreateCommand {
251    pub async fn build(&self) -> std::result::Result<Box<dyn Tool>, BoxedError> {
252        // Validate URI format
253        validate_uri(&self.to).map_err(BoxedError::new)?;
254
255        let time_range = TimeRange::parse(self.start_time.as_deref(), self.end_time.as_deref())
256            .map_err(BoxedError::new)?;
257        if self.chunk_time_window.is_some() && !time_range.is_bounded() {
258            return ChunkTimeWindowRequiresBoundsSnafu
259                .fail()
260                .map_err(BoxedError::new);
261        }
262        if self.schema_only {
263            let mut invalid_args = Vec::new();
264            if self.start_time.is_some() {
265                invalid_args.push("--start-time");
266            }
267            if self.end_time.is_some() {
268                invalid_args.push("--end-time");
269            }
270            if self.chunk_time_window.is_some() {
271                invalid_args.push("--chunk-time-window");
272            }
273            if self.format != DataFormat::Parquet {
274                invalid_args.push("--format");
275            }
276            if self.parallelism != 1 {
277                invalid_args.push("--parallelism");
278            }
279            if !invalid_args.is_empty() {
280                return SchemaOnlyArgsNotAllowedSnafu {
281                    args: invalid_args.join(", "),
282                }
283                .fail()
284                .map_err(BoxedError::new);
285            }
286        }
287
288        // Parse schemas (empty vec means all schemas)
289        let schemas = if self.schemas.is_empty() {
290            None
291        } else {
292            Some(self.schemas.clone())
293        };
294
295        // Build storage
296        let storage = OpenDalStorage::from_uri(&self.to, &self.storage).map_err(BoxedError::new)?;
297
298        // Build database client
299        let proxy = parse_proxy_opts(self.proxy.clone(), self.no_proxy)?;
300        let database_client = DatabaseClient::new(
301            self.addr.clone(),
302            self.catalog.clone(),
303            self.auth_basic.clone(),
304            self.timeout.unwrap_or(Duration::from_secs(60)),
305            proxy,
306            self.no_proxy,
307        );
308
309        Ok(Box::new(ExportCreate {
310            config: ExportConfig {
311                catalog: self.catalog.clone(),
312                schemas,
313                schema_only: self.schema_only,
314                format: self.format,
315                force: self.force,
316                time_range,
317                chunk_time_window: self.chunk_time_window,
318                parallelism: self.parallelism,
319                snapshot_uri: self.to.clone(),
320                storage_config: self.storage.clone(),
321            },
322            storage: Box::new(storage),
323            database_client,
324        }))
325    }
326}
327
328/// Export tool implementation.
329pub struct ExportCreate {
330    config: ExportConfig,
331    storage: Box<dyn SnapshotStorage>,
332    database_client: DatabaseClient,
333}
334
335struct ExportConfig {
336    catalog: String,
337    schemas: Option<Vec<String>>,
338    schema_only: bool,
339    format: DataFormat,
340    force: bool,
341    time_range: TimeRange,
342    chunk_time_window: Option<Duration>,
343    parallelism: usize,
344    snapshot_uri: String,
345    storage_config: ObjectStoreConfig,
346}
347
348#[async_trait]
349impl Tool for ExportCreate {
350    async fn do_work(&self) -> std::result::Result<(), BoxedError> {
351        self.run().await.map_err(BoxedError::new)
352    }
353}
354
355impl ExportCreate {
356    async fn run(&self) -> Result<()> {
357        // 1. Check if snapshot exists
358        let exists = self.storage.exists().await?;
359
360        if exists {
361            if self.config.force {
362                info!("Deleting existing snapshot (--force)");
363                self.storage.delete_snapshot().await?;
364            } else {
365                // Resume mode - read existing manifest
366                let mut manifest = self.storage.read_manifest().await?;
367
368                // Check version compatibility
369                if manifest.version != MANIFEST_VERSION {
370                    return ManifestVersionMismatchSnafu {
371                        expected: MANIFEST_VERSION,
372                        found: manifest.version,
373                    }
374                    .fail();
375                }
376
377                validate_resume_config(&manifest, &self.config)?;
378
379                info!(
380                    "Resuming existing snapshot: {} (completed: {}/{} chunks)",
381                    manifest.snapshot_id,
382                    manifest.completed_count(),
383                    manifest.chunks.len()
384                );
385
386                if manifest.is_complete() {
387                    info!("Snapshot is already complete");
388                    return Ok(());
389                }
390
391                if manifest.schema_only {
392                    return Ok(());
393                }
394
395                export_data(
396                    self.storage.as_ref(),
397                    &self.database_client,
398                    &self.config.snapshot_uri,
399                    &self.config.storage_config,
400                    &mut manifest,
401                    self.config.parallelism,
402                )
403                .await?;
404                return Ok(());
405            }
406        }
407
408        // 2. Get schema list
409        let extractor = SchemaExtractor::new(&self.database_client, &self.config.catalog);
410        let schema_snapshot = extractor.extract(self.config.schemas.as_deref()).await?;
411
412        let schema_names: Vec<String> = schema_snapshot
413            .schemas
414            .iter()
415            .map(|s| s.name.clone())
416            .collect();
417        info!("Exporting schemas: {:?}", schema_names);
418
419        // 3. Create manifest
420        let mut manifest = Manifest::new_for_export(
421            self.config.catalog.clone(),
422            schema_names.clone(),
423            self.config.schema_only,
424            self.config.time_range.clone(),
425            self.config.format,
426            self.config.chunk_time_window,
427        )?;
428
429        // 4. Write schema files
430        self.storage.write_schema(&schema_snapshot).await?;
431        info!("Exported {} schemas", schema_snapshot.schemas.len());
432
433        // 5. Export DDL files for import recovery.
434        let ddl_by_schema = self.build_ddl_by_schema(&schema_names).await?;
435        for (schema, ddl) in ddl_by_schema {
436            let ddl_path = ddl_path_for_schema(&schema);
437            self.storage.write_text(&ddl_path, &ddl).await?;
438            info!("Exported DDL for schema {} to {}", schema, ddl_path);
439        }
440
441        // 6. Write manifest after schema artifacts and before any data export.
442        //
443        // The manifest is the snapshot commit point: only write it after the schema
444        // index and all DDL files are durable, so a crash cannot leave a "valid"
445        // snapshot that is missing required schema artifacts. For full exports we
446        // still need the manifest before data copy starts, because chunk resume is
447        // tracked by updating this manifest in place.
448        self.storage.write_manifest(&manifest).await?;
449        info!("Snapshot created: {}", manifest.snapshot_id);
450
451        if !self.config.schema_only {
452            export_data(
453                self.storage.as_ref(),
454                &self.database_client,
455                &self.config.snapshot_uri,
456                &self.config.storage_config,
457                &mut manifest,
458                self.config.parallelism,
459            )
460            .await?;
461        }
462
463        Ok(())
464    }
465
466    async fn build_ddl_by_schema(&self, schema_names: &[String]) -> Result<Vec<(String, String)>> {
467        let mut schemas = schema_names.to_vec();
468        schemas.sort();
469
470        let mut ddl_by_schema = Vec::with_capacity(schemas.len());
471        for schema in schemas {
472            let create_database = self.show_create("DATABASE", &schema, None).await?;
473
474            let (mut physical_tables, mut tables, mut views) =
475                self.get_schema_objects(&schema).await?;
476            physical_tables.sort();
477            let mut physical_ddls = Vec::with_capacity(physical_tables.len());
478            for table in physical_tables {
479                physical_ddls.push(self.show_create("TABLE", &schema, Some(&table)).await?);
480            }
481
482            tables.sort();
483            let mut table_ddls = Vec::with_capacity(tables.len());
484            for table in tables {
485                table_ddls.push(self.show_create("TABLE", &schema, Some(&table)).await?);
486            }
487
488            views.sort();
489            let mut view_ddls = Vec::with_capacity(views.len());
490            for view in views {
491                view_ddls.push(self.show_create("VIEW", &schema, Some(&view)).await?);
492            }
493
494            let ddl = build_schema_ddl(
495                &schema,
496                create_database,
497                physical_ddls,
498                table_ddls,
499                view_ddls,
500            );
501            ddl_by_schema.push((schema, ddl));
502        }
503
504        Ok(ddl_by_schema)
505    }
506
507    async fn get_schema_objects(
508        &self,
509        schema: &str,
510    ) -> Result<(Vec<String>, Vec<String>, Vec<String>)> {
511        let physical_tables = self.get_metric_physical_tables(schema).await?;
512        let physical_set: HashSet<&str> = physical_tables.iter().map(String::as_str).collect();
513        let sql = format!(
514            "SELECT table_name, table_type FROM information_schema.tables \
515             WHERE table_catalog = '{}' AND table_schema = '{}' \
516             AND (table_type = 'BASE TABLE' OR table_type = 'VIEW')",
517            escape_sql_literal(&self.config.catalog),
518            escape_sql_literal(schema)
519        );
520        let records: Option<Vec<Vec<Value>>> = self
521            .database_client
522            .sql_in_public(&sql)
523            .await
524            .context(DatabaseSnafu)?;
525
526        let mut tables = Vec::new();
527        let mut views = Vec::new();
528        if let Some(rows) = records {
529            for row in rows {
530                let name = match row.first() {
531                    Some(Value::String(name)) => name.clone(),
532                    _ => return UnexpectedValueTypeSnafu.fail(),
533                };
534                let table_type = match row.get(1) {
535                    Some(Value::String(table_type)) => table_type.as_str(),
536                    _ => return UnexpectedValueTypeSnafu.fail(),
537                };
538                if !physical_set.contains(name.as_str()) {
539                    if table_type == "VIEW" {
540                        views.push(name);
541                    } else {
542                        tables.push(name);
543                    }
544                }
545            }
546        }
547
548        Ok((physical_tables, tables, views))
549    }
550
551    async fn get_metric_physical_tables(&self, schema: &str) -> Result<Vec<String>> {
552        let sql = format!(
553            "SELECT DISTINCT table_name FROM information_schema.columns \
554             WHERE table_catalog = '{}' AND table_schema = '{}' AND column_name = '__tsid'",
555            escape_sql_literal(&self.config.catalog),
556            escape_sql_literal(schema)
557        );
558        let records: Option<Vec<Vec<Value>>> = self
559            .database_client
560            .sql_in_public(&sql)
561            .await
562            .context(DatabaseSnafu)?;
563
564        let mut tables = HashSet::new();
565        if let Some(rows) = records {
566            for row in rows {
567                let name = match row.first() {
568                    Some(Value::String(name)) => name.clone(),
569                    _ => return UnexpectedValueTypeSnafu.fail(),
570                };
571                tables.insert(name);
572            }
573        }
574
575        Ok(tables.into_iter().collect())
576    }
577
578    async fn show_create(
579        &self,
580        show_type: &str,
581        schema: &str,
582        table: Option<&str>,
583    ) -> Result<String> {
584        let sql = match table {
585            Some(table) => format!(
586                r#"SHOW CREATE {} "{}"."{}"."{}""#,
587                show_type,
588                escape_sql_identifier(&self.config.catalog),
589                escape_sql_identifier(schema),
590                escape_sql_identifier(table)
591            ),
592            None => format!(
593                r#"SHOW CREATE {} "{}"."{}""#,
594                show_type,
595                escape_sql_identifier(&self.config.catalog),
596                escape_sql_identifier(schema)
597            ),
598        };
599
600        let records: Option<Vec<Vec<Value>>> = self
601            .database_client
602            .sql_in_public(&sql)
603            .await
604            .context(DatabaseSnafu)?;
605        let rows = records.context(EmptyResultSnafu)?;
606        let row = rows.first().context(EmptyResultSnafu)?;
607        let Some(Value::String(create)) = row.get(1) else {
608            return UnexpectedValueTypeSnafu.fail();
609        };
610
611        Ok(format!("{};\n", create))
612    }
613}
614
615fn build_schema_ddl(
616    schema: &str,
617    create_database: String,
618    physical_tables: Vec<String>,
619    tables: Vec<String>,
620    views: Vec<String>,
621) -> String {
622    let mut ddl = String::new();
623    ddl.push_str(&format!("-- Schema: {}\n", schema));
624    ddl.push_str(&create_database);
625    for stmt in physical_tables {
626        ddl.push_str(&stmt);
627    }
628    for stmt in tables {
629        ddl.push_str(&stmt);
630    }
631    for stmt in views {
632        ddl.push_str(&stmt);
633    }
634    ddl.push('\n');
635    ddl
636}
637
638fn validate_resume_config(manifest: &Manifest, config: &ExportConfig) -> Result<()> {
639    if manifest.schema_only != config.schema_only {
640        return SchemaOnlyModeMismatchSnafu {
641            existing_schema_only: manifest.schema_only,
642            requested_schema_only: config.schema_only,
643        }
644        .fail();
645    }
646
647    if manifest.catalog != config.catalog {
648        return ResumeConfigMismatchSnafu {
649            field: "catalog",
650            existing: manifest.catalog.clone(),
651            requested: config.catalog.clone(),
652        }
653        .fail();
654    }
655
656    // If no schema filter is provided on resume, inherit the existing snapshot
657    // selection instead of reinterpreting the request as "all schemas".
658    if let Some(requested_schemas) = &config.schemas
659        && !schema_selection_matches(&manifest.schemas, requested_schemas)
660    {
661        return ResumeConfigMismatchSnafu {
662            field: "schemas",
663            existing: format_schema_selection(&manifest.schemas),
664            requested: format_schema_selection(requested_schemas),
665        }
666        .fail();
667    }
668
669    if manifest.time_range != config.time_range {
670        return ResumeConfigMismatchSnafu {
671            field: "time_range",
672            existing: format!("{:?}", manifest.time_range),
673            requested: format!("{:?}", config.time_range),
674        }
675        .fail();
676    }
677
678    if manifest.format != config.format {
679        return ResumeConfigMismatchSnafu {
680            field: "format",
681            existing: manifest.format.to_string(),
682            requested: config.format.to_string(),
683        }
684        .fail();
685    }
686
687    let expected_plan = Manifest::new_for_export(
688        manifest.catalog.clone(),
689        manifest.schemas.clone(),
690        config.schema_only,
691        config.time_range.clone(),
692        config.format,
693        config.chunk_time_window,
694    )?;
695    if !chunk_plan_matches(manifest, &expected_plan) {
696        return ResumeConfigMismatchSnafu {
697            field: "chunk plan",
698            existing: format_chunk_plan(&manifest.chunks),
699            requested: format_chunk_plan(&expected_plan.chunks),
700        }
701        .fail();
702    }
703
704    Ok(())
705}
706
707fn schema_selection_matches(existing: &[String], requested: &[String]) -> bool {
708    canonical_schema_selection(existing) == canonical_schema_selection(requested)
709}
710
711fn canonical_schema_selection(schemas: &[String]) -> Vec<String> {
712    let mut canonicalized = Vec::new();
713    let mut seen = HashSet::new();
714
715    for schema in schemas {
716        let normalized = schema.to_ascii_lowercase();
717        if seen.insert(normalized.clone()) {
718            canonicalized.push(normalized);
719        }
720    }
721
722    canonicalized.sort();
723    canonicalized
724}
725
726fn format_schema_selection(schemas: &[String]) -> String {
727    format!("[{}]", schemas.join(", "))
728}
729
730fn chunk_plan_matches(existing: &Manifest, expected: &Manifest) -> bool {
731    existing.chunks.len() == expected.chunks.len()
732        && existing
733            .chunks
734            .iter()
735            .zip(&expected.chunks)
736            .all(|(left, right)| left.id == right.id && left.time_range == right.time_range)
737}
738
739fn format_chunk_plan(chunks: &[ChunkMeta]) -> String {
740    let items = chunks
741        .iter()
742        .map(|chunk| format!("#{}:{:?}", chunk.id, chunk.time_range))
743        .collect::<Vec<_>>();
744    format!("[{}]", items.join(", "))
745}
746
747#[derive(Debug)]
748struct SnapshotListEntry {
749    path: String,
750    manifest: Manifest,
751}
752
753#[derive(Debug, Default)]
754struct SnapshotScanResult {
755    snapshots: Vec<SnapshotListEntry>,
756    unreadable: Vec<String>,
757}
758
759async fn scan_snapshots(storage: &OpenDalStorage) -> Result<SnapshotScanResult> {
760    let mut result = SnapshotScanResult::default();
761    for dir in storage.list_direct_child_dirs().await? {
762        let manifest_path = format!("{}/{}", dir.trim_matches('/'), MANIFEST_FILE);
763        let Some(data) = storage.read_file_if_exists(&manifest_path).await? else {
764            continue;
765        };
766
767        match serde_json::from_slice::<Manifest>(&data) {
768            Ok(manifest) => result.snapshots.push(SnapshotListEntry {
769                path: format!("{}/", dir.trim_matches('/')),
770                manifest,
771            }),
772            Err(_) => result
773                .unreadable
774                .push(format!("{}/", dir.trim_matches('/'))),
775        }
776    }
777
778    result
779        .snapshots
780        .sort_by_key(|entry| std::cmp::Reverse(entry.manifest.created_at));
781    result.unreadable.sort();
782    Ok(result)
783}
784
785fn print_snapshot_list(snapshots: &[SnapshotListEntry], unreadable_count: usize) {
786    if unreadable_count == 0 {
787        println!("Found {} snapshots:", snapshots.len());
788    } else {
789        println!(
790            "Found {} snapshots ({} {} skipped: unreadable manifest):",
791            snapshots.len(),
792            unreadable_count,
793            directory_word(unreadable_count)
794        );
795    }
796    println!();
797    println!(
798        "  {:<24}  {:<36}  {:<19}  {:<9}  {:<7}  {:<6}  Status",
799        "Path", "ID", "Created", "Catalog", "Schemas", "Chunks"
800    );
801    println!(
802        "  {:<24}  {:<36}  {:<19}  {:<9}  {:<7}  {:<6}  {:<10}",
803        "-".repeat(24),
804        "-".repeat(36),
805        "-".repeat(19),
806        "-".repeat(9),
807        "-".repeat(7),
808        "-".repeat(6),
809        "-".repeat(10)
810    );
811    for entry in snapshots {
812        let manifest = &entry.manifest;
813        println!(
814            "  {:<24}  {:<36}  {:<19}  {:<9}  {:<7}  {:<6}  {}",
815            entry.path,
816            manifest.snapshot_id,
817            manifest.created_at.format("%Y-%m-%d %H:%M:%S"),
818            manifest.catalog,
819            manifest.schemas.len(),
820            format_list_chunks(manifest),
821            snapshot_status(manifest)
822        );
823    }
824}
825
826fn print_unreadable_warnings(unreadable: &[String]) {
827    if unreadable.is_empty() {
828        return;
829    }
830
831    println!();
832    println!(
833        "Warning: {} {} had corrupt/unreadable manifest.json:",
834        unreadable.len(),
835        directory_word(unreadable.len())
836    );
837    for path in unreadable {
838        println!("  - {}", path);
839    }
840}
841
842fn directory_word(count: usize) -> &'static str {
843    if count == 1 {
844        "directory"
845    } else {
846        "directories"
847    }
848}
849
850fn snapshot_status(manifest: &Manifest) -> &'static str {
851    if manifest.schema_only {
852        "schema-only"
853    } else if manifest.is_complete() {
854        "complete"
855    } else {
856        "incomplete"
857    }
858}
859
860fn format_list_chunks(manifest: &Manifest) -> String {
861    let total = manifest.chunks.len();
862    if total == 0 {
863        return "0".to_string();
864    }
865
866    format!(
867        "{}/{}",
868        manifest.completed_count() + manifest.skipped_count(),
869        total
870    )
871}
872
873#[derive(Debug, Clone, Copy, PartialEq, Eq)]
874enum VerifySeverity {
875    Error,
876    Warn,
877}
878
879impl VerifySeverity {
880    fn as_str(self) -> &'static str {
881        match self {
882            VerifySeverity::Error => "ERROR",
883            VerifySeverity::Warn => "WARN",
884        }
885    }
886}
887
888#[derive(Debug)]
889struct VerifyProblem {
890    severity: VerifySeverity,
891    message: String,
892}
893
894#[derive(Debug, Default)]
895struct VerifyChunkSummary {
896    total: usize,
897    completed: usize,
898    skipped: usize,
899    pending: usize,
900    in_progress: usize,
901    failed: usize,
902}
903
904#[derive(Debug)]
905struct VerifyReport {
906    manifest: Manifest,
907    schema_index_exists: bool,
908    ddl_file_count: usize,
909    chunk_summary: VerifyChunkSummary,
910    data_files_total: usize,
911    data_files_verified: usize,
912    problems: Vec<VerifyProblem>,
913}
914
915impl VerifyReport {
916    fn error_count(&self) -> usize {
917        self.problems
918            .iter()
919            .filter(|problem| problem.severity == VerifySeverity::Error)
920            .count()
921    }
922
923    fn warning_count(&self) -> usize {
924        self.problems
925            .iter()
926            .filter(|problem| problem.severity == VerifySeverity::Warn)
927            .count()
928    }
929
930    fn has_problems(&self) -> bool {
931        !self.problems.is_empty()
932    }
933
934    fn push_error(&mut self, message: impl Into<String>) {
935        self.problems.push(VerifyProblem {
936            severity: VerifySeverity::Error,
937            message: message.into(),
938        });
939    }
940
941    fn push_warn(&mut self, message: impl Into<String>) {
942        self.problems.push(VerifyProblem {
943            severity: VerifySeverity::Warn,
944            message: message.into(),
945        });
946    }
947}
948
949async fn verify_snapshot(storage: &OpenDalStorage) -> Result<VerifyReport> {
950    let manifest = storage.read_manifest().await?;
951    let schema_index_path = format!("{}/{}", SCHEMA_DIR, SCHEMAS_FILE);
952    let ddl_prefix = format!("{}/{}/", SCHEMA_DIR, DDL_DIR);
953    let schema_index_exists = storage.file_exists(&schema_index_path).await?;
954    let ddl_files: HashSet<_> = storage
955        .list_files_recursive(&ddl_prefix)
956        .await?
957        .into_iter()
958        .collect();
959    let ddl_file_count = ddl_files
960        .iter()
961        .filter(|path| path.ends_with(".sql"))
962        .count();
963
964    let mut report = VerifyReport {
965        manifest,
966        schema_index_exists,
967        ddl_file_count,
968        chunk_summary: VerifyChunkSummary::default(),
969        data_files_total: 0,
970        data_files_verified: 0,
971        problems: Vec::new(),
972    };
973
974    if report.manifest.version != MANIFEST_VERSION {
975        report.push_error(format!(
976            "Manifest version mismatch: expected {}, found {}",
977            MANIFEST_VERSION, report.manifest.version
978        ));
979    }
980
981    if !report.schema_index_exists {
982        report.push_warn(format!("Missing schema index '{}'", schema_index_path));
983    }
984
985    for schema in &report.manifest.schemas {
986        let ddl_path = ddl_path_for_schema(schema);
987        if !ddl_files.contains(ddl_path.as_str()) {
988            report.problems.push(VerifyProblem {
989                severity: VerifySeverity::Error,
990                message: format!("Schema '{}': missing DDL file '{}'", schema, ddl_path),
991            });
992        }
993    }
994
995    report.chunk_summary = summarize_chunks(&report.manifest);
996    if report.manifest.schema_only {
997        let chunk_count = report.manifest.chunks.len();
998        if chunk_count > 0 {
999            report.push_error(format!(
1000                "Schema-only snapshot should not contain data chunks (found {})",
1001                chunk_count
1002            ));
1003        }
1004        let data_files = storage.list_files_recursive("data/").await?;
1005        if let Some(path) = data_files.first() {
1006            report.push_error(format!(
1007                "Schema-only snapshot should not contain data files (found '{}')",
1008                path
1009            ));
1010        }
1011    } else if report.manifest.chunks.is_empty() {
1012        report.push_error("Full snapshot should contain at least one data chunk");
1013    } else {
1014        verify_chunks_and_data_files(storage, &mut report).await?;
1015    }
1016
1017    Ok(report)
1018}
1019
1020fn summarize_chunks(manifest: &Manifest) -> VerifyChunkSummary {
1021    VerifyChunkSummary {
1022        total: manifest.chunks.len(),
1023        completed: manifest.completed_count(),
1024        skipped: manifest.skipped_count(),
1025        pending: manifest.pending_count(),
1026        in_progress: manifest.in_progress_count(),
1027        failed: manifest.failed_count(),
1028    }
1029}
1030
1031async fn verify_chunks_and_data_files(
1032    storage: &OpenDalStorage,
1033    report: &mut VerifyReport,
1034) -> Result<()> {
1035    let existing_files: HashSet<_> = storage
1036        .list_files_recursive("data/")
1037        .await?
1038        .into_iter()
1039        .collect();
1040    let mut data_files_total = 0;
1041    let mut data_files_verified = 0;
1042    let mut problems = Vec::new();
1043    let mut seen_chunk_ids = HashSet::new();
1044    let mut claimed_data_files = HashSet::new();
1045
1046    for chunk in &report.manifest.chunks {
1047        if !seen_chunk_ids.insert(chunk.id) {
1048            problems.push(VerifyProblem {
1049                severity: VerifySeverity::Error,
1050                message: format!("Chunk {}: duplicate chunk id", chunk.id),
1051            });
1052        }
1053        for file in &chunk.files {
1054            if let Some(path) = safe_manifest_data_file_path(file) {
1055                claimed_data_files.insert(path.to_string());
1056            }
1057        }
1058
1059        match chunk.status {
1060            ChunkStatus::Completed => {
1061                if chunk.files.is_empty() {
1062                    problems.push(VerifyProblem {
1063                        severity: VerifySeverity::Error,
1064                        message: format!("Chunk {}: completed chunk has no data files", chunk.id),
1065                    });
1066                    continue;
1067                }
1068                let allowed_prefixes = report
1069                    .manifest
1070                    .schemas
1071                    .iter()
1072                    .map(|schema| data_dir_for_schema_chunk(schema, chunk.id))
1073                    .collect::<Vec<_>>();
1074                for file in &chunk.files {
1075                    data_files_total += 1;
1076                    let Some(path) = valid_manifest_data_file_path(file, &allowed_prefixes) else {
1077                        problems.push(VerifyProblem {
1078                            severity: VerifySeverity::Error,
1079                            message: format!(
1080                                "Chunk {}: invalid data file path '{}'",
1081                                chunk.id, file
1082                            ),
1083                        });
1084                        continue;
1085                    };
1086
1087                    if existing_files.contains(path) {
1088                        data_files_verified += 1;
1089                    } else {
1090                        problems.push(VerifyProblem {
1091                            severity: VerifySeverity::Error,
1092                            message: format!("Chunk {}: missing file '{}'", chunk.id, path),
1093                        });
1094                    }
1095                }
1096            }
1097            ChunkStatus::Skipped => {
1098                if !chunk.files.is_empty() {
1099                    problems.push(VerifyProblem {
1100                        severity: VerifySeverity::Error,
1101                        message: format!(
1102                            "Chunk {}: skipped chunk should not list data files",
1103                            chunk.id
1104                        ),
1105                    });
1106                }
1107            }
1108            ChunkStatus::Pending => {
1109                problems.push(VerifyProblem {
1110                    severity: VerifySeverity::Error,
1111                    message: format!("Chunk {}: status is 'pending'", chunk.id),
1112                });
1113            }
1114            ChunkStatus::InProgress => {
1115                problems.push(VerifyProblem {
1116                    severity: VerifySeverity::Error,
1117                    message: format!("Chunk {}: status is 'in_progress'", chunk.id),
1118                });
1119            }
1120            ChunkStatus::Failed => {
1121                let reason = chunk.error.as_deref().unwrap_or("unknown error");
1122                problems.push(VerifyProblem {
1123                    severity: VerifySeverity::Error,
1124                    message: format!("Chunk {}: status is 'failed' (error: {})", chunk.id, reason),
1125                });
1126            }
1127        }
1128    }
1129
1130    for path in &existing_files {
1131        if !claimed_data_files.contains(path) {
1132            problems.push(VerifyProblem {
1133                severity: VerifySeverity::Error,
1134                message: format!("Unexpected data file '{}' is not listed in manifest", path),
1135            });
1136        }
1137    }
1138
1139    report.data_files_total = data_files_total;
1140    report.data_files_verified = data_files_verified;
1141    report.problems.extend(problems);
1142
1143    Ok(())
1144}
1145
1146fn valid_manifest_data_file_path<'a>(
1147    path: &'a str,
1148    allowed_prefixes: &[String],
1149) -> Option<&'a str> {
1150    let normalized = safe_manifest_data_file_path(path)?;
1151
1152    if !allowed_prefixes
1153        .iter()
1154        .any(|prefix| normalized.starts_with(prefix))
1155    {
1156        return None;
1157    }
1158
1159    Some(normalized)
1160}
1161
1162fn safe_manifest_data_file_path(path: &str) -> Option<&str> {
1163    let normalized = path.trim_start_matches('/');
1164    if normalized.is_empty() || !normalized.starts_with("data/") {
1165        return None;
1166    }
1167
1168    if normalized
1169        .split('/')
1170        .any(|segment| segment.is_empty() || segment == "." || segment == "..")
1171    {
1172        return None;
1173    }
1174
1175    Some(normalized)
1176}
1177
1178fn print_verify_report(snapshot: &str, report: &VerifyReport) {
1179    println!("Verifying snapshot: {}", report.manifest.snapshot_id);
1180    println!("  Location:     {}", snapshot);
1181    if report.manifest.version == MANIFEST_VERSION {
1182        println!("  Manifest:     OK (version {})", report.manifest.version);
1183    } else {
1184        println!(
1185            "  Manifest:     ERROR (version {}, expected {})",
1186            report.manifest.version, MANIFEST_VERSION
1187        );
1188    }
1189    println!(
1190        "  Schema files: {}",
1191        if report.schema_index_exists {
1192            format!("OK ({})", SCHEMAS_FILE)
1193        } else {
1194            format!("WARN (missing {})", SCHEMAS_FILE)
1195        }
1196    );
1197    if report.ddl_file_count > 0 {
1198        println!("  DDL files:    {} file(s) found", report.ddl_file_count);
1199    } else {
1200        println!("  DDL files:    not present");
1201    }
1202
1203    let chunks = &report.chunk_summary;
1204    println!(
1205        "  Chunks:       {} total ({} completed, {} skipped, {} pending, {} in_progress, {} failed)",
1206        chunks.total,
1207        chunks.completed,
1208        chunks.skipped,
1209        chunks.pending,
1210        chunks.in_progress,
1211        chunks.failed
1212    );
1213
1214    if report.manifest.schema_only {
1215        println!("  Data files:   skipped (schema-only)");
1216    } else {
1217        println!(
1218            "  Data files:   {}/{} files verified",
1219            report.data_files_verified, report.data_files_total
1220        );
1221    }
1222
1223    if report.problems.is_empty() {
1224        println!();
1225        println!("Snapshot is valid.");
1226        return;
1227    }
1228
1229    println!();
1230    println!("Problems found:");
1231    for problem in &report.problems {
1232        println!("  [{}] {}", problem.severity.as_str(), problem.message);
1233    }
1234    println!();
1235    println!(
1236        "Snapshot has {} error(s), {} warning(s).",
1237        report.error_count(),
1238        report.warning_count()
1239    );
1240}
1241
1242#[cfg(test)]
1243mod tests {
1244    use chrono::TimeZone;
1245    use clap::Parser;
1246    use tempfile::tempdir;
1247    use url::Url;
1248
1249    use super::*;
1250    use crate::data::path::ddl_path_for_schema;
1251
1252    #[test]
1253    fn test_ddl_path_for_schema() {
1254        assert_eq!(ddl_path_for_schema("public"), "schema/ddl/public.sql");
1255        assert_eq!(
1256            ddl_path_for_schema("../evil"),
1257            "schema/ddl/%2E%2E%2Fevil.sql"
1258        );
1259    }
1260
1261    #[test]
1262    fn test_build_schema_ddl_order() {
1263        let ddl = build_schema_ddl(
1264            "public",
1265            "CREATE DATABASE public;\n".to_string(),
1266            vec!["PHYSICAL;\n".to_string()],
1267            vec!["TABLE;\n".to_string()],
1268            vec!["VIEW;\n".to_string()],
1269        );
1270
1271        let db_pos = ddl.find("CREATE DATABASE").unwrap();
1272        let physical_pos = ddl.find("PHYSICAL;").unwrap();
1273        let table_pos = ddl.find("TABLE;").unwrap();
1274        let view_pos = ddl.find("VIEW;").unwrap();
1275        assert!(db_pos < physical_pos);
1276        assert!(physical_pos < table_pos);
1277        assert!(table_pos < view_pos);
1278    }
1279
1280    #[tokio::test]
1281    async fn test_build_rejects_chunk_window_without_bounds() {
1282        let cmd = ExportCreateCommand::parse_from([
1283            "export-v2-create",
1284            "--addr",
1285            "127.0.0.1:4000",
1286            "--to",
1287            "file:///tmp/export-v2-test",
1288            "--chunk-time-window",
1289            "1h",
1290        ]);
1291
1292        let result = cmd.build().await;
1293        assert!(result.is_err());
1294        let error = result.err().unwrap().to_string();
1295
1296        assert!(error.contains("chunk_time_window requires both --start-time and --end-time"));
1297    }
1298
1299    #[tokio::test]
1300    async fn test_build_rejects_data_export_args_in_schema_only_mode() {
1301        let cmd = ExportCreateCommand::parse_from([
1302            "export-v2-create",
1303            "--addr",
1304            "127.0.0.1:4000",
1305            "--to",
1306            "file:///tmp/export-v2-test",
1307            "--schema-only",
1308            "--start-time",
1309            "2024-01-01T00:00:00Z",
1310            "--end-time",
1311            "2024-01-02T00:00:00Z",
1312            "--chunk-time-window",
1313            "1h",
1314            "--format",
1315            "csv",
1316            "--parallelism",
1317            "2",
1318        ]);
1319
1320        let error = cmd.build().await.err().unwrap().to_string();
1321
1322        assert!(error.contains("--schema-only cannot be used with data export arguments"));
1323        assert!(error.contains("--start-time"));
1324        assert!(error.contains("--end-time"));
1325        assert!(error.contains("--chunk-time-window"));
1326        assert!(error.contains("--format"));
1327        assert!(error.contains("--parallelism"));
1328    }
1329
1330    #[test]
1331    fn test_schema_only_mode_mismatch_error_message() {
1332        let error = crate::data::export_v2::error::SchemaOnlyModeMismatchSnafu {
1333            existing_schema_only: false,
1334            requested_schema_only: true,
1335        }
1336        .build()
1337        .to_string();
1338
1339        assert!(error.contains("existing: false"));
1340        assert!(error.contains("requested: true"));
1341    }
1342
1343    #[test]
1344    fn test_validate_resume_config_rejects_catalog_mismatch() {
1345        let manifest = Manifest::new_for_export(
1346            "greptime".to_string(),
1347            vec!["public".to_string()],
1348            false,
1349            TimeRange::unbounded(),
1350            DataFormat::Parquet,
1351            None,
1352        )
1353        .unwrap();
1354        let config = ExportConfig {
1355            catalog: "other".to_string(),
1356            schemas: None,
1357            schema_only: false,
1358            format: DataFormat::Parquet,
1359            force: false,
1360            time_range: TimeRange::unbounded(),
1361            chunk_time_window: None,
1362            parallelism: 1,
1363            snapshot_uri: "file:///tmp/snapshot".to_string(),
1364            storage_config: ObjectStoreConfig::default(),
1365        };
1366
1367        let error = validate_resume_config(&manifest, &config)
1368            .err()
1369            .unwrap()
1370            .to_string();
1371        assert!(error.contains("catalog"));
1372    }
1373
1374    #[test]
1375    fn test_validate_resume_config_accepts_schema_selection_with_different_case_and_order() {
1376        let manifest = Manifest::new_for_export(
1377            "greptime".to_string(),
1378            vec!["public".to_string(), "analytics".to_string()],
1379            false,
1380            TimeRange::unbounded(),
1381            DataFormat::Parquet,
1382            None,
1383        )
1384        .unwrap();
1385        let config = ExportConfig {
1386            catalog: "greptime".to_string(),
1387            schemas: Some(vec![
1388                "ANALYTICS".to_string(),
1389                "PUBLIC".to_string(),
1390                "public".to_string(),
1391            ]),
1392            schema_only: false,
1393            format: DataFormat::Parquet,
1394            force: false,
1395            time_range: TimeRange::unbounded(),
1396            chunk_time_window: None,
1397            parallelism: 1,
1398            snapshot_uri: "file:///tmp/snapshot".to_string(),
1399            storage_config: ObjectStoreConfig::default(),
1400        };
1401
1402        assert!(validate_resume_config(&manifest, &config).is_ok());
1403    }
1404
1405    #[test]
1406    fn test_validate_resume_config_rejects_chunk_plan_mismatch() {
1407        let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
1408        let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 2, 0, 0).unwrap();
1409        let time_range = TimeRange::new(Some(start), Some(end));
1410        let manifest = Manifest::new_for_export(
1411            "greptime".to_string(),
1412            vec!["public".to_string()],
1413            false,
1414            time_range.clone(),
1415            DataFormat::Parquet,
1416            None,
1417        )
1418        .unwrap();
1419        let config = ExportConfig {
1420            catalog: "greptime".to_string(),
1421            schemas: None,
1422            schema_only: false,
1423            format: DataFormat::Parquet,
1424            force: false,
1425            time_range,
1426            chunk_time_window: Some(Duration::from_secs(3600)),
1427            parallelism: 1,
1428            snapshot_uri: "file:///tmp/snapshot".to_string(),
1429            storage_config: ObjectStoreConfig::default(),
1430        };
1431
1432        let error = validate_resume_config(&manifest, &config)
1433            .err()
1434            .unwrap()
1435            .to_string();
1436        assert!(error.contains("chunk plan"));
1437    }
1438
1439    #[test]
1440    fn test_validate_resume_config_rejects_format_mismatch() {
1441        let manifest = Manifest::new_for_export(
1442            "greptime".to_string(),
1443            vec!["public".to_string()],
1444            false,
1445            TimeRange::unbounded(),
1446            DataFormat::Parquet,
1447            None,
1448        )
1449        .unwrap();
1450        let config = ExportConfig {
1451            catalog: "greptime".to_string(),
1452            schemas: None,
1453            schema_only: false,
1454            format: DataFormat::Csv,
1455            force: false,
1456            time_range: TimeRange::unbounded(),
1457            chunk_time_window: None,
1458            parallelism: 1,
1459            snapshot_uri: "file:///tmp/snapshot".to_string(),
1460            storage_config: ObjectStoreConfig::default(),
1461        };
1462
1463        let error = validate_resume_config(&manifest, &config)
1464            .err()
1465            .unwrap()
1466            .to_string();
1467        assert!(error.contains("format"));
1468    }
1469
1470    #[test]
1471    fn test_validate_resume_config_rejects_time_range_mismatch() {
1472        let start = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
1473        let end = chrono::Utc.with_ymd_and_hms(2025, 1, 1, 1, 0, 0).unwrap();
1474        let manifest = Manifest::new_for_export(
1475            "greptime".to_string(),
1476            vec!["public".to_string()],
1477            false,
1478            TimeRange::new(Some(start), Some(end)),
1479            DataFormat::Parquet,
1480            None,
1481        )
1482        .unwrap();
1483        let config = ExportConfig {
1484            catalog: "greptime".to_string(),
1485            schemas: None,
1486            schema_only: false,
1487            format: DataFormat::Parquet,
1488            force: false,
1489            time_range: TimeRange::new(Some(start), Some(start)),
1490            chunk_time_window: None,
1491            parallelism: 1,
1492            snapshot_uri: "file:///tmp/snapshot".to_string(),
1493            storage_config: ObjectStoreConfig::default(),
1494        };
1495
1496        let error = validate_resume_config(&manifest, &config)
1497            .err()
1498            .unwrap()
1499            .to_string();
1500        assert!(error.contains("time_range"));
1501    }
1502
1503    #[tokio::test]
1504    async fn test_scan_snapshots_sorts_and_tracks_unreadable_manifests() {
1505        let dir = tempdir().unwrap();
1506        write_test_manifest(
1507            dir.path(),
1508            "older",
1509            test_manifest(
1510                chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1511                false,
1512                true,
1513            ),
1514        );
1515        write_test_manifest(
1516            dir.path(),
1517            "newer",
1518            test_manifest(
1519                chrono::Utc.with_ymd_and_hms(2026, 2, 1, 0, 0, 0).unwrap(),
1520                false,
1521                true,
1522            ),
1523        );
1524
1525        std::fs::create_dir_all(dir.path().join("empty-dir")).unwrap();
1526        std::fs::create_dir_all(dir.path().join("not-snapshot")).unwrap();
1527        std::fs::write(dir.path().join("not-snapshot").join("data.txt"), "x").unwrap();
1528        std::fs::create_dir_all(dir.path().join("broken")).unwrap();
1529        std::fs::write(dir.path().join("broken").join(MANIFEST_FILE), "{not-json").unwrap();
1530
1531        let uri = Url::from_directory_path(dir.path()).unwrap().to_string();
1532        let storage = OpenDalStorage::from_file_uri(&uri).unwrap();
1533        let result = scan_snapshots(&storage).await.unwrap();
1534
1535        assert_eq!(result.snapshots.len(), 2);
1536        assert_eq!(
1537            result.snapshots[0].manifest.created_at,
1538            chrono::Utc.with_ymd_and_hms(2026, 2, 1, 0, 0, 0).unwrap()
1539        );
1540        assert_eq!(
1541            result.snapshots[1].manifest.created_at,
1542            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap()
1543        );
1544        assert_eq!(result.unreadable, vec!["broken/".to_string()]);
1545        assert_eq!(result.snapshots[0].path, "newer/");
1546        assert_eq!(result.snapshots[1].path, "older/");
1547    }
1548
1549    #[test]
1550    fn test_snapshot_list_status_and_chunk_summary() {
1551        let schema_only = test_manifest(
1552            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1553            true,
1554            true,
1555        );
1556        assert_eq!(snapshot_status(&schema_only), "schema-only");
1557        assert_eq!(format_list_chunks(&schema_only), "0");
1558
1559        let complete = test_manifest(
1560            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1561            false,
1562            true,
1563        );
1564        assert_eq!(snapshot_status(&complete), "complete");
1565        assert_eq!(format_list_chunks(&complete), "2/2");
1566
1567        let incomplete = test_manifest(
1568            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1569            false,
1570            false,
1571        );
1572        assert_eq!(snapshot_status(&incomplete), "incomplete");
1573        assert_eq!(format_list_chunks(&incomplete), "1/2");
1574    }
1575
1576    #[tokio::test]
1577    async fn test_verify_snapshot_accepts_valid_full_snapshot() {
1578        let dir = tempdir().unwrap();
1579        let manifest = test_manifest(
1580            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1581            false,
1582            true,
1583        );
1584        write_root_manifest(dir.path(), manifest);
1585        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1586        write_default_ddl_files(dir.path());
1587        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1588
1589        let storage = file_storage_for_dir(dir.path());
1590        let report = verify_snapshot(&storage).await.unwrap();
1591
1592        assert_eq!(report.error_count(), 0);
1593        assert_eq!(report.warning_count(), 0);
1594        assert_eq!(report.data_files_total, 1);
1595        assert_eq!(report.data_files_verified, 1);
1596    }
1597
1598    #[tokio::test]
1599    async fn test_verify_snapshot_reports_missing_data_file_and_failed_chunk() {
1600        let dir = tempdir().unwrap();
1601        let mut manifest = test_manifest(
1602            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1603            false,
1604            true,
1605        );
1606        manifest.chunks[1].mark_failed("copy failed".to_string());
1607        write_root_manifest(dir.path(), manifest);
1608        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1609        write_default_ddl_files(dir.path());
1610
1611        let storage = file_storage_for_dir(dir.path());
1612        let report = verify_snapshot(&storage).await.unwrap();
1613
1614        assert_eq!(report.error_count(), 2);
1615        assert!(
1616            report
1617                .problems
1618                .iter()
1619                .any(|problem| problem.message.contains("missing file"))
1620        );
1621        assert!(
1622            report
1623                .problems
1624                .iter()
1625                .any(|problem| problem.message.contains("status is 'failed'"))
1626        );
1627    }
1628
1629    #[tokio::test]
1630    async fn test_verify_snapshot_reports_missing_schema_index_as_warning() {
1631        let dir = tempdir().unwrap();
1632        let manifest = test_manifest(
1633            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1634            false,
1635            true,
1636        );
1637        write_root_manifest(dir.path(), manifest);
1638        write_default_ddl_files(dir.path());
1639        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1640
1641        let storage = file_storage_for_dir(dir.path());
1642        let report = verify_snapshot(&storage).await.unwrap();
1643
1644        assert_eq!(report.error_count(), 0);
1645        assert_eq!(report.warning_count(), 1);
1646        assert!(
1647            report
1648                .problems
1649                .iter()
1650                .any(|problem| problem.message.contains("Missing schema index"))
1651        );
1652    }
1653
1654    #[tokio::test]
1655    async fn test_verify_snapshot_rejects_schema_only_snapshot_with_chunks() {
1656        let dir = tempdir().unwrap();
1657        let mut manifest = test_manifest(
1658            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1659            true,
1660            true,
1661        );
1662        let mut chunk = ChunkMeta::new(1, TimeRange::unbounded());
1663        chunk.mark_completed(vec!["data/public/1/file.parquet".to_string()], None);
1664        manifest.chunks.push(chunk);
1665        write_root_manifest(dir.path(), manifest);
1666        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1667        write_default_ddl_files(dir.path());
1668
1669        let storage = file_storage_for_dir(dir.path());
1670        let report = verify_snapshot(&storage).await.unwrap();
1671
1672        assert_eq!(report.error_count(), 1);
1673        assert_eq!(report.data_files_total, 0);
1674        assert!(
1675            report
1676                .problems
1677                .iter()
1678                .any(|problem| problem.message.contains("should not contain data chunks"))
1679        );
1680    }
1681
1682    #[tokio::test]
1683    async fn test_verify_snapshot_rejects_schema_only_snapshot_with_data_files() {
1684        let dir = tempdir().unwrap();
1685        let manifest = test_manifest(
1686            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1687            true,
1688            true,
1689        );
1690        write_root_manifest(dir.path(), manifest);
1691        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1692        write_default_ddl_files(dir.path());
1693        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1694
1695        let storage = file_storage_for_dir(dir.path());
1696        let report = verify_snapshot(&storage).await.unwrap();
1697
1698        assert_eq!(report.error_count(), 1);
1699        assert_eq!(report.data_files_total, 0);
1700        assert!(
1701            report
1702                .problems
1703                .iter()
1704                .any(|problem| problem.message.contains("should not contain data files"))
1705        );
1706    }
1707
1708    #[tokio::test]
1709    async fn test_verify_snapshot_rejects_full_snapshot_without_chunks() {
1710        let dir = tempdir().unwrap();
1711        let mut manifest = test_manifest(
1712            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1713            false,
1714            true,
1715        );
1716        manifest.chunks.clear();
1717        write_root_manifest(dir.path(), manifest);
1718        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1719        write_default_ddl_files(dir.path());
1720
1721        let storage = file_storage_for_dir(dir.path());
1722        let report = verify_snapshot(&storage).await.unwrap();
1723
1724        assert_eq!(report.error_count(), 1);
1725        assert_eq!(report.data_files_total, 0);
1726        assert!(
1727            report
1728                .problems
1729                .iter()
1730                .any(|problem| problem.message.contains("at least one data chunk"))
1731        );
1732    }
1733
1734    #[tokio::test]
1735    async fn test_verify_snapshot_rejects_skipped_chunk_data_files() {
1736        let dir = tempdir().unwrap();
1737        let manifest = test_manifest(
1738            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1739            false,
1740            true,
1741        );
1742        write_root_manifest(dir.path(), manifest);
1743        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1744        write_default_ddl_files(dir.path());
1745        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1746        write_snapshot_file(dir.path(), "data/public/2/file.parquet", b"data");
1747
1748        let storage = file_storage_for_dir(dir.path());
1749        let report = verify_snapshot(&storage).await.unwrap();
1750
1751        assert_eq!(report.error_count(), 1);
1752        assert!(
1753            report
1754                .problems
1755                .iter()
1756                .any(|problem| { problem.message.contains("Unexpected data file") })
1757        );
1758    }
1759
1760    #[tokio::test]
1761    async fn test_verify_snapshot_rejects_duplicate_chunk_ids() {
1762        let dir = tempdir().unwrap();
1763        let mut manifest = test_manifest(
1764            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1765            false,
1766            true,
1767        );
1768        let mut duplicate = ChunkMeta::new(1, TimeRange::unbounded());
1769        duplicate.mark_completed(vec!["data/public/1/file.parquet".to_string()], None);
1770        manifest.chunks.push(duplicate);
1771        write_root_manifest(dir.path(), manifest);
1772        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1773        write_default_ddl_files(dir.path());
1774        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1775
1776        let storage = file_storage_for_dir(dir.path());
1777        let report = verify_snapshot(&storage).await.unwrap();
1778
1779        assert_eq!(report.error_count(), 1);
1780        assert!(
1781            report
1782                .problems
1783                .iter()
1784                .any(|problem| problem.message.contains("duplicate chunk id"))
1785        );
1786    }
1787
1788    #[tokio::test]
1789    async fn test_verify_snapshot_requires_all_schema_ddl() {
1790        let dir = tempdir().unwrap();
1791        let manifest = test_manifest(
1792            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1793            true,
1794            true,
1795        );
1796        write_root_manifest(dir.path(), manifest);
1797        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1798        write_snapshot_file(
1799            dir.path(),
1800            "schema/ddl/public.sql",
1801            b"CREATE DATABASE public;",
1802        );
1803
1804        let storage = file_storage_for_dir(dir.path());
1805        let report = verify_snapshot(&storage).await.unwrap();
1806
1807        assert_eq!(report.error_count(), 1);
1808        assert!(
1809            report
1810                .problems
1811                .iter()
1812                .any(|problem| problem.message.contains("analytics"))
1813        );
1814    }
1815
1816    #[tokio::test]
1817    async fn test_verify_snapshot_reports_missing_ddl_dir() {
1818        let dir = tempdir().unwrap();
1819        let manifest = test_manifest(
1820            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1821            false,
1822            true,
1823        );
1824        write_root_manifest(dir.path(), manifest);
1825        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1826        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1827
1828        let storage = file_storage_for_dir(dir.path());
1829        let report = verify_snapshot(&storage).await.unwrap();
1830
1831        assert_eq!(report.error_count(), 2);
1832        assert!(
1833            report
1834                .problems
1835                .iter()
1836                .any(|problem| problem.message.contains("schema/ddl/public.sql"))
1837        );
1838        assert!(
1839            report
1840                .problems
1841                .iter()
1842                .any(|problem| problem.message.contains("schema/ddl/analytics.sql"))
1843        );
1844    }
1845
1846    #[tokio::test]
1847    async fn test_verify_snapshot_reports_manifest_version_mismatch() {
1848        let dir = tempdir().unwrap();
1849        let mut manifest = test_manifest(
1850            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1851            false,
1852            true,
1853        );
1854        manifest.version = MANIFEST_VERSION + 1;
1855        write_root_manifest(dir.path(), manifest);
1856        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1857        write_default_ddl_files(dir.path());
1858        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1859
1860        let storage = file_storage_for_dir(dir.path());
1861        let report = verify_snapshot(&storage).await.unwrap();
1862
1863        assert_eq!(report.error_count(), 1);
1864        assert!(
1865            report
1866                .problems
1867                .iter()
1868                .any(|problem| problem.message.contains("Manifest version mismatch"))
1869        );
1870    }
1871
1872    #[tokio::test]
1873    async fn test_verify_snapshot_rejects_invalid_data_file_paths() {
1874        let dir = tempdir().unwrap();
1875        let mut manifest = test_manifest(
1876            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1877            false,
1878            true,
1879        );
1880        manifest.chunks[0].files = vec!["data/public/1/../file.parquet".to_string()];
1881        write_root_manifest(dir.path(), manifest);
1882        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1883        write_default_ddl_files(dir.path());
1884
1885        let storage = file_storage_for_dir(dir.path());
1886        let report = verify_snapshot(&storage).await.unwrap();
1887
1888        assert_eq!(report.error_count(), 1);
1889        assert!(
1890            report
1891                .problems
1892                .iter()
1893                .any(|problem| problem.message.contains("invalid data file path"))
1894        );
1895        assert_eq!(report.data_files_verified, 0);
1896    }
1897
1898    #[tokio::test]
1899    async fn test_verify_snapshot_accepts_leading_slash_manifest_data_paths() {
1900        let dir = tempdir().unwrap();
1901        let mut manifest = test_manifest(
1902            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1903            false,
1904            true,
1905        );
1906        manifest.chunks[0].files = vec!["/data/public/1/file.parquet".to_string()];
1907        write_root_manifest(dir.path(), manifest);
1908        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1909        write_default_ddl_files(dir.path());
1910        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1911
1912        let storage = file_storage_for_dir(dir.path());
1913        let report = verify_snapshot(&storage).await.unwrap();
1914
1915        assert_eq!(report.error_count(), 0);
1916        assert_eq!(report.data_files_verified, 1);
1917    }
1918
1919    #[tokio::test]
1920    async fn test_verify_snapshot_rejects_unlisted_files_under_completed_chunk_prefix() {
1921        let dir = tempdir().unwrap();
1922        let manifest = test_manifest(
1923            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1924            false,
1925            true,
1926        );
1927        write_root_manifest(dir.path(), manifest);
1928        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1929        write_default_ddl_files(dir.path());
1930        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1931        write_snapshot_file(dir.path(), "data/public/1/extra.parquet", b"data");
1932
1933        let storage = file_storage_for_dir(dir.path());
1934        let report = verify_snapshot(&storage).await.unwrap();
1935
1936        assert_eq!(report.error_count(), 1);
1937        assert!(
1938            report
1939                .problems
1940                .iter()
1941                .any(|problem| problem.message.contains("Unexpected data file"))
1942        );
1943        assert_eq!(report.data_files_verified, 1);
1944    }
1945
1946    #[tokio::test]
1947    async fn test_verify_snapshot_rejects_orphan_data_files_outside_known_chunk_prefixes() {
1948        let dir = tempdir().unwrap();
1949        let manifest = test_manifest(
1950            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1951            false,
1952            true,
1953        );
1954        write_root_manifest(dir.path(), manifest);
1955        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1956        write_default_ddl_files(dir.path());
1957        write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
1958        write_snapshot_file(dir.path(), "data/public/99/file.parquet", b"data");
1959
1960        let storage = file_storage_for_dir(dir.path());
1961        let report = verify_snapshot(&storage).await.unwrap();
1962
1963        assert_eq!(report.error_count(), 1);
1964        assert!(
1965            report
1966                .problems
1967                .iter()
1968                .any(|problem| problem.message.contains("Unexpected data file"))
1969        );
1970        assert_eq!(report.data_files_verified, 1);
1971    }
1972
1973    #[tokio::test]
1974    async fn test_verify_snapshot_rejects_data_files_under_wrong_chunk_or_schema() {
1975        let dir = tempdir().unwrap();
1976        let mut manifest = test_manifest(
1977            chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
1978            false,
1979            true,
1980        );
1981        manifest.chunks[0].files = vec![
1982            "data/public/99/file.parquet".to_string(),
1983            "data/metrics/1/file.parquet".to_string(),
1984        ];
1985        write_root_manifest(dir.path(), manifest);
1986        write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
1987        write_default_ddl_files(dir.path());
1988        write_snapshot_file(dir.path(), "data/public/99/file.parquet", b"data");
1989        write_snapshot_file(dir.path(), "data/metrics/1/file.parquet", b"data");
1990
1991        let storage = file_storage_for_dir(dir.path());
1992        let report = verify_snapshot(&storage).await.unwrap();
1993
1994        assert_eq!(report.error_count(), 2);
1995        assert_eq!(report.data_files_verified, 0);
1996        assert!(
1997            report
1998                .problems
1999                .iter()
2000                .all(|problem| problem.message.contains("invalid data file path"))
2001        );
2002    }
2003
2004    fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) {
2005        let snapshot_dir = root.join(dir);
2006        std::fs::create_dir_all(&snapshot_dir).unwrap();
2007        std::fs::write(
2008            snapshot_dir.join(MANIFEST_FILE),
2009            serde_json::to_vec_pretty(&manifest).unwrap(),
2010        )
2011        .unwrap();
2012    }
2013
2014    fn write_root_manifest(root: &std::path::Path, manifest: Manifest) {
2015        std::fs::write(
2016            root.join(MANIFEST_FILE),
2017            serde_json::to_vec_pretty(&manifest).unwrap(),
2018        )
2019        .unwrap();
2020    }
2021
2022    fn write_snapshot_file(root: &std::path::Path, relative_path: &str, content: &[u8]) {
2023        let mut path = root.to_path_buf();
2024        for segment in relative_path.split('/') {
2025            path.push(segment);
2026        }
2027        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
2028        std::fs::write(path, content).unwrap();
2029    }
2030
2031    fn write_default_ddl_files(root: &std::path::Path) {
2032        write_snapshot_file(root, "schema/ddl/public.sql", b"CREATE DATABASE public;");
2033        write_snapshot_file(
2034            root,
2035            "schema/ddl/analytics.sql",
2036            b"CREATE DATABASE analytics;",
2037        );
2038    }
2039
2040    fn file_storage_for_dir(root: &std::path::Path) -> OpenDalStorage {
2041        let uri = Url::from_directory_path(root).unwrap().to_string();
2042        OpenDalStorage::from_file_uri(&uri).unwrap()
2043    }
2044
2045    fn test_manifest(
2046        created_at: chrono::DateTime<chrono::Utc>,
2047        schema_only: bool,
2048        complete: bool,
2049    ) -> Manifest {
2050        let mut manifest = Manifest::new_for_export(
2051            "greptime".to_string(),
2052            vec!["public".to_string(), "analytics".to_string()],
2053            schema_only,
2054            TimeRange::unbounded(),
2055            DataFormat::Parquet,
2056            None,
2057        )
2058        .unwrap();
2059        manifest.created_at = created_at;
2060        manifest.updated_at = created_at;
2061
2062        if !schema_only {
2063            manifest.chunks.clear();
2064            let mut first = ChunkMeta::new(1, TimeRange::unbounded());
2065            first.mark_completed(vec!["data/public/1/file.parquet".to_string()], None);
2066            manifest.chunks.push(first);
2067
2068            if complete {
2069                manifest
2070                    .chunks
2071                    .push(ChunkMeta::skipped(2, TimeRange::unbounded()));
2072            } else {
2073                manifest
2074                    .chunks
2075                    .push(ChunkMeta::new(2, TimeRange::unbounded()));
2076            }
2077        }
2078
2079        manifest
2080    }
2081}