diff --git a/Cargo.lock b/Cargo.lock index 717ed9c5e3..4d35a7e9bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1928,7 +1928,6 @@ dependencies = [ "async-stream", "async-trait", "auth", - "backon", "base64 0.22.1", "cache", "catalog", diff --git a/src/cli/Cargo.toml b/src/cli/Cargo.toml index 172a691f6e..8c4150cd46 100644 --- a/src/cli/Cargo.toml +++ b/src/cli/Cargo.toml @@ -19,7 +19,6 @@ workspace = true async-stream.workspace = true async-trait.workspace = true auth.workspace = true -backon.workspace = true base64.workspace = true cache.workspace = true catalog.workspace = true diff --git a/src/cli/src/data.rs b/src/cli/src/data.rs index ac54138490..114886542e 100644 --- a/src/cli/src/data.rs +++ b/src/cli/src/data.rs @@ -17,7 +17,6 @@ pub mod export_v2; mod import; pub mod import_v2; pub(crate) mod path; -pub(crate) mod retry; pub mod snapshot_storage; pub(crate) mod sql; mod storage_export; diff --git a/src/cli/src/data/import_v2.rs b/src/cli/src/data/import_v2.rs index 2b8a935607..b197f59b3b 100644 --- a/src/cli/src/data/import_v2.rs +++ b/src/cli/src/data/import_v2.rs @@ -35,6 +35,7 @@ //! ``` mod command; +pub(crate) mod coordinator; pub mod error; pub mod executor; pub(crate) mod state; diff --git a/src/cli/src/data/import_v2/command.rs b/src/cli/src/data/import_v2/command.rs index ea92e15ee1..9ee60252a0 100644 --- a/src/cli/src/data/import_v2/command.rs +++ b/src/cli/src/data/import_v2/command.rs @@ -15,24 +15,29 @@ //! Import V2 CLI command. use std::collections::HashSet; -use std::time::{Duration, Instant}; +use std::time::Duration; use async_trait::async_trait; use clap::Parser; use common_error::ext::BoxedError; use common_telemetry::info; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::Tool; use crate::common::ObjectStoreConfig; use crate::data::export_v2::data::{build_copy_source, execute_copy_database_from}; use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, DataFormat, MANIFEST_VERSION}; +use crate::data::import_v2::coordinator::{ + ImportResumeConfig, ImportTaskExecutor, build_import_tasks, chunk_has_schema_files, + import_with_resume_session, prepare_import_resume, +}; use crate::data::import_v2::error::{ - ChunkImportFailedSnafu, EmptyChunkManifestSnafu, IncompleteSnapshotSnafu, - ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result, SchemaNotInSnapshotSnafu, - SnapshotStorageSnafu, + ChunkImportFailedSnafu, EmptyChunkManifestSnafu, ImportStatePathUnavailableSnafu, + IncompleteSnapshotSnafu, ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result, + SchemaNotInSnapshotSnafu, SnapshotStorageSnafu, }; use crate::data::import_v2::executor::{DdlExecutor, DdlStatement}; +use crate::data::import_v2::state::{ImportTaskKey, default_state_path}; use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema}; use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri}; use crate::database::{DatabaseClient, parse_proxy_opts}; @@ -185,13 +190,12 @@ impl Import { info!("Generated {} DDL statements", ddl_statements.len()); - let data_prefixes = if !manifest.schema_only && !manifest.chunks.is_empty() { - Some( - validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import) - .await?, - ) + let data_tasks = if !manifest.schema_only && !manifest.chunks.is_empty() { + validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import) + .await?; + build_import_tasks(&manifest.chunks, &schemas_to_import) } else { - None + Vec::new() }; // 4. Dry-run mode: print DDL and exit @@ -212,24 +216,67 @@ impl Import { return Ok(()); } - // 5. Execute DDL - let executor = DdlExecutor::new(&self.database_client); - executor.execute_strict(&ddl_statements).await?; - - if !manifest.schema_only && !manifest.chunks.is_empty() { - self.import_data( - &manifest.chunks, + let mut resume_session = if !data_tasks.is_empty() { + let state_path = default_state_path( + &manifest.snapshot_id.to_string(), + self.database_client.addr(), + &self.catalog, &schemas_to_import, - manifest.format, - data_prefixes.expect("validated full snapshot must provide data prefixes"), ) - .await?; + .context(ImportStatePathUnavailableSnafu { + snapshot_id: manifest.snapshot_id.to_string(), + })?; + Some( + prepare_import_resume(ImportResumeConfig { + snapshot_id: manifest.snapshot_id.to_string(), + target_addr: self.database_client.addr().to_string(), + catalog: self.catalog.clone(), + schemas: schemas_to_import.clone(), + state_path, + tasks: data_tasks, + }) + .await?, + ) + } else { + None + }; + + let skip_ddl = resume_session + .as_ref() + .map(|session| session.should_skip_ddl()) + .unwrap_or(false); + + // 5. Execute DDL unless a previous run already completed it. + let ddl_executed = if skip_ddl { + info!( + "Existing import state has DDL marked completed; skipping DDL execution and resuming data import" + ); + false + } else { + let executor = DdlExecutor::new(&self.database_client); + executor.execute_strict(&ddl_statements).await?; + if let Some(session) = resume_session.as_mut() { + session.mark_ddl_completed().await?; + } + true + }; + + if let Some(resume_session) = resume_session { + let executor = CopyDatabaseImportTaskExecutor { + import: self, + format: manifest.format, + }; + import_with_resume_session(resume_session, &executor).await?; } - info!( - "Import completed: {} DDL statements executed", - ddl_statements.len() - ); + if ddl_executed { + info!( + "Import completed: {} DDL statements executed", + ddl_statements.len() + ); + } else { + info!("Import completed: DDL execution skipped"); + } Ok(()) } @@ -252,79 +299,39 @@ impl Import { Ok(statements) } +} - async fn import_data( - &self, - chunks: &[ChunkMeta], - schemas: &[String], - format: DataFormat, - actual_prefixes: HashSet, - ) -> Result<()> { - let import_start = Instant::now(); - let total_chunks = chunks - .iter() - .filter(|chunk| chunk.status == ChunkStatus::Completed) - .count(); - info!( - "Importing data: {} chunks, {} schemas", - total_chunks, - schemas.len() - ); +struct CopyDatabaseImportTaskExecutor<'a> { + import: &'a Import, + format: DataFormat, +} - for (idx, chunk) in chunks.iter().enumerate() { - if chunk.status == ChunkStatus::Skipped { - info!( - "[{}/{}] Chunk {}: skipped (no data)", - idx + 1, - chunks.len(), - chunk.id - ); - continue; - } +#[async_trait] +impl ImportTaskExecutor for CopyDatabaseImportTaskExecutor<'_> { + async fn import_task(&self, task: &ImportTaskKey) -> Result<()> { + let source = build_copy_source( + &self.import.snapshot_uri, + &self.import.storage_config, + &task.schema, + task.chunk_id, + ) + .context(ChunkImportFailedSnafu { + chunk_id: task.chunk_id, + schema: task.schema.clone(), + })?; - info!( - "[{}/{}] Chunk {} ({:?} ~ {:?})", - idx + 1, - chunks.len(), - chunk.id, - chunk.time_range.start, - chunk.time_range.end - ); - - for schema in schemas { - if !validate_chunk_schema_files(chunk, schema, &actual_prefixes)? { - info!(" {}: no data, skipped", schema); - continue; - } - - info!(" {}: importing...", schema); - let copy_start = Instant::now(); - let source = - build_copy_source(&self.snapshot_uri, &self.storage_config, schema, chunk.id) - .context(ChunkImportFailedSnafu { - chunk_id: chunk.id, - schema: schema.clone(), - })?; - - execute_copy_database_from( - &self.database_client, - &self.catalog, - schema, - &source, - format, - ) - .await - .context(ChunkImportFailedSnafu { - chunk_id: chunk.id, - schema: schema.clone(), - })?; - - info!(" {}: done in {:?}", schema, copy_start.elapsed()); - } - } - - info!("Data import finished in {:?}", import_start.elapsed()); - Ok(()) + execute_copy_database_from( + &self.import.database_client, + &self.import.catalog, + &task.schema, + &source, + self.format, + ) + .await + .context(ChunkImportFailedSnafu { + chunk_id: task.chunk_id, + schema: task.schema.clone(), + }) } } @@ -511,14 +518,6 @@ fn validate_chunk_statuses(chunks: &[ChunkMeta]) -> Result<()> { Ok(()) } -fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool { - let prefix = data_dir_for_schema_chunk(schema, chunk.id); - chunk.files.iter().any(|path| { - let normalized = path.trim_start_matches('/'); - normalized.starts_with(&prefix) - }) -} - fn format_data_import_plan(chunks: &[ChunkMeta], schemas: &[String]) -> Vec { let mut lines = vec!["-- Data import plan:".to_string()]; for chunk in chunks { @@ -536,7 +535,7 @@ async fn validate_data_snapshot( storage: &dyn SnapshotStorage, chunks: &[ChunkMeta], schemas: &[String], -) -> Result> { +) -> Result<()> { validate_chunk_statuses(chunks)?; let actual_prefixes = collect_chunk_data_prefixes(storage).await?; @@ -552,7 +551,7 @@ async fn validate_data_snapshot( } } - Ok(actual_prefixes) + Ok(()) } async fn collect_chunk_data_prefixes(storage: &dyn SnapshotStorage) -> Result> { diff --git a/src/cli/src/data/import_v2/coordinator.rs b/src/cli/src/data/import_v2/coordinator.rs new file mode 100644 index 0000000000..fe2d016464 --- /dev/null +++ b/src/cli/src/data/import_v2/coordinator.rs @@ -0,0 +1,695 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; +use std::path::{Path, PathBuf}; +use std::time::Instant; + +use async_trait::async_trait; +use common_telemetry::{info, warn}; + +use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus}; +use crate::data::import_v2::error::{ + ImportStateDdlIncompleteSnafu, ImportStateMismatchSnafu, Result, +}; +use crate::data::import_v2::state::{ + ImportState, ImportStateLockGuard, ImportTaskKey, ImportTaskStatus, canonical_schema_selection, + delete_import_state, load_import_state, save_import_state, try_acquire_import_state_lock, +}; +use crate::data::path::data_dir_for_schema_chunk; + +#[async_trait] +pub(crate) trait ImportTaskExecutor { + async fn import_task(&self, task: &ImportTaskKey) -> Result<()>; +} + +pub(crate) struct ImportResumeConfig { + pub(crate) snapshot_id: String, + pub(crate) target_addr: String, + pub(crate) catalog: String, + pub(crate) schemas: Vec, + pub(crate) state_path: PathBuf, + pub(crate) tasks: Vec, +} + +pub(crate) struct ImportResumeSession { + config: ImportResumeConfig, + state: ImportState, + lock: ImportStateLockGuard, +} + +impl ImportResumeSession { + pub(crate) fn should_skip_ddl(&self) -> bool { + self.state.ddl_completed + } + + /// Marks DDL as completed and persists the state. Must be called after a + /// successful DDL run on a fresh session, so that crashes after this point + /// resume into the data-import phase instead of replaying DDL. + pub(crate) async fn mark_ddl_completed(&mut self) -> Result<()> { + self.state.mark_ddl_completed(); + save_import_state(&self.config.state_path, &self.state).await + } +} + +pub(crate) fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool { + let prefix = data_dir_for_schema_chunk(schema, chunk.id); + chunk.files.iter().any(|path| { + let normalized = path.trim_start_matches('/'); + normalized.starts_with(&prefix) + }) +} + +pub(crate) fn build_import_tasks(chunks: &[ChunkMeta], schemas: &[String]) -> Vec { + let mut tasks = Vec::new(); + for chunk in chunks { + if chunk.status == ChunkStatus::Skipped { + continue; + } + // TODO: build a per-chunk schema index if chunk file manifests become large. + for schema in schemas { + if chunk_has_schema_files(chunk, schema) { + tasks.push(ImportTaskKey::new(chunk.id, schema.clone())); + } + } + } + tasks +} + +pub(crate) async fn prepare_import_resume( + config: ImportResumeConfig, +) -> Result { + // Validate the request before touching the state file or acquiring the + // lock. Duplicate task keys would corrupt the resume bookkeeping because + // status lookups use linear `find()` and only ever see the first match. + validate_config_tasks(&config)?; + + let lock = try_acquire_import_state_lock(&config.state_path)?; + let state = match load_import_state(&config.state_path).await? { + Some(loaded) => { + validate_state_matches(&loaded, &config)?; + loaded + } + None => { + // Persist a fresh state immediately so that any crash after this + // point is recoverable as a resume. `ddl_completed=false` on a + // loaded state therefore means a previous run reached this point + // but did not confirm DDL completion - DDL must be (re-)run before + // data import is allowed. + let fresh = ImportState::new( + &config.snapshot_id, + &config.target_addr, + &config.catalog, + &config.schemas, + config.tasks.clone(), + ); + save_import_state(&config.state_path, &fresh).await?; + fresh + } + }; + + Ok(ImportResumeSession { + config, + state, + lock, + }) +} + +pub(crate) async fn import_with_resume_session( + session: ImportResumeSession, + executor: &E, +) -> Result<()> +where + E: ImportTaskExecutor + Sync, +{ + let ImportResumeSession { + config, + mut state, + lock, + } = session; + + // The state machine requires DDL to be explicitly marked completed before + // data import; otherwise a caller could import data and leave a state that + // replays DDL on the next resume. Surface the misuse instead of silently + // importing. + if !state.ddl_completed { + return ImportStateDdlIncompleteSnafu { + path: config.state_path.display().to_string(), + } + .fail(); + } + + let completed = state + .tasks + .iter() + .filter(|task| task.status == ImportTaskStatus::Completed) + .count(); + info!( + "Import resume state: {} completed, {} pending, path: {}", + completed, + state.tasks.len().saturating_sub(completed), + config.state_path.display() + ); + + let import_start = Instant::now(); + for (idx, task) in config.tasks.iter().enumerate() { + if state.task_status(task.chunk_id, &task.schema) == Some(ImportTaskStatus::Completed) { + info!( + "[{}/{}] Chunk {} schema {}: already completed, skipped", + idx + 1, + config.tasks.len(), + task.chunk_id, + task.schema + ); + continue; + } + + info!( + "[{}/{}] Chunk {} schema {}: importing...", + idx + 1, + config.tasks.len(), + task.chunk_id, + task.schema + ); + state.set_task_status( + task.chunk_id, + &task.schema, + ImportTaskStatus::InProgress, + None, + )?; + save_import_state(&config.state_path, &state).await?; + + let task_start = Instant::now(); + let result = executor.import_task(task).await; + + match result { + Ok(()) => { + // The task itself succeeded. If we cannot persist the + // Completed marker, the next resume will replay it (potentially + // duplicating data depending on engine semantics), but we must + // not pretend the import as a whole failed - return the persist + // error so the operator notices, after logging the success. + update_status_and_save( + &config, + &mut state, + task, + ImportTaskStatus::Completed, + None, + ) + .await?; + info!( + "[{}/{}] Chunk {} schema {}: done in {:?}", + idx + 1, + config.tasks.len(), + task.chunk_id, + task.schema, + task_start.elapsed() + ); + } + Err(task_error) => { + // Persist Failed best-effort, but always surface the original + // task error to the caller. State persistence problems are + // logged so they are not silently lost. + if let Err(persist_error) = update_status_and_save( + &config, + &mut state, + task, + ImportTaskStatus::Failed, + Some(task_error.to_string()), + ) + .await + { + warn!( + "Failed to persist Failed status for chunk {} schema {} after task error ({}); state file may be out of date: {}", + task.chunk_id, task.schema, task_error, persist_error + ); + } + return Err(task_error); + } + } + } + + delete_import_state(&config.state_path).await?; + info!("Data import finished in {:?}", import_start.elapsed()); + drop(lock); + Ok(()) +} + +async fn update_status_and_save( + config: &ImportResumeConfig, + state: &mut ImportState, + task: &ImportTaskKey, + status: ImportTaskStatus, + error_message: Option, +) -> Result<()> { + // set_task_status only fails if the task isn't in the state; that would + // indicate a logic bug since `task` came from the same config. Surface it + // instead of swallowing. + state.set_task_status(task.chunk_id, &task.schema, status, error_message)?; + save_import_state(&config.state_path, state).await +} + +fn validate_state_matches(state: &ImportState, config: &ImportResumeConfig) -> Result<()> { + if state.snapshot_id != config.snapshot_id { + return state_mismatch( + config, + format!( + "snapshot_id differs (state: {}, requested: {})", + state.snapshot_id, config.snapshot_id + ), + ); + } + // Target addresses are compared literally; hostname normalization is left to the caller. + if state.target_addr != config.target_addr { + return state_mismatch( + config, + format!( + "target_addr differs (state: {}, requested: {})", + state.target_addr, config.target_addr + ), + ); + } + if state.catalog != config.catalog { + return state_mismatch( + config, + format!( + "catalog differs (state: {}, requested: {})", + state.catalog, config.catalog + ), + ); + } + + let requested_schemas = canonical_schema_selection(&config.schemas); + if state.schemas != requested_schemas { + return state_mismatch( + config, + format!( + "schemas differ (state: {:?}, requested: {:?})", + state.schemas, requested_schemas + ), + ); + } + + if task_set_from_state(state, &config.state_path)? != task_set_from_config(config)? { + return state_mismatch(config, "task set differs".to_string()); + } + + Ok(()) +} + +fn state_mismatch(config: &ImportResumeConfig, reason: String) -> Result<()> { + ImportStateMismatchSnafu { + path: config.state_path.display().to_string(), + reason, + } + .fail() +} + +fn task_set_from_state<'a>( + state: &'a ImportState, + state_path: &Path, +) -> Result> { + let mut tasks = BTreeSet::new(); + for task in &state.tasks { + if !tasks.insert((task.chunk_id, task.schema.as_str())) { + return ImportStateMismatchSnafu { + path: state_path.display().to_string(), + reason: format!( + "duplicate task key in state (chunk_id: {}, schema: {})", + task.chunk_id, task.schema + ), + } + .fail(); + } + } + Ok(tasks) +} + +fn task_set_from_config(config: &ImportResumeConfig) -> Result> { + let mut tasks = BTreeSet::new(); + for task in &config.tasks { + if !tasks.insert((task.chunk_id, task.schema.as_str())) { + return ImportStateMismatchSnafu { + path: config.state_path.display().to_string(), + reason: format!( + "duplicate task key in request (chunk_id: {}, schema: {})", + task.chunk_id, task.schema + ), + } + .fail(); + } + } + Ok(tasks) +} + +fn validate_config_tasks(config: &ImportResumeConfig) -> Result<()> { + task_set_from_config(config).map(|_| ()) +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex}; + + use super::*; + use crate::data::export_v2::manifest::{ChunkMeta, TimeRange}; + use crate::data::import_v2::error::TestTaskFailedSnafu; + + #[derive(Debug, Clone, Copy)] + enum FailureMode { + Fatal, + RetryableThenSuccess { failures: usize }, + } + + struct RecordingExecutor { + imported: Arc>>, + fail_task: Option, + failure_mode: Option, + attempts: Arc, + } + + #[async_trait] + impl ImportTaskExecutor for RecordingExecutor { + async fn import_task(&self, task: &ImportTaskKey) -> Result<()> { + let attempt = self.attempts.fetch_add(1, Ordering::SeqCst); + if self.fail_task.as_ref() == Some(task) { + match self.failure_mode { + Some(FailureMode::Fatal) => { + return TestTaskFailedSnafu { + message: "fatal failure".to_string(), + retryable: false, + } + .fail(); + } + Some(FailureMode::RetryableThenSuccess { failures }) if attempt < failures => { + return TestTaskFailedSnafu { + message: "retryable failure".to_string(), + retryable: true, + } + .fail(); + } + _ => {} + } + } + self.imported.lock().unwrap().push(task.clone()); + Ok(()) + } + } + + fn recording_executor(imported: Arc>>) -> RecordingExecutor { + RecordingExecutor { + imported, + fail_task: None, + failure_mode: None, + attempts: Arc::new(AtomicUsize::new(0)), + } + } + + fn config(path: PathBuf, tasks: Vec) -> ImportResumeConfig { + ImportResumeConfig { + snapshot_id: "snapshot-1".to_string(), + target_addr: "127.0.0.1:4000".to_string(), + catalog: "greptime".to_string(), + schemas: vec!["public".to_string(), "analytics".to_string()], + state_path: path, + tasks, + } + } + + async fn run_import_with_resume(config: ImportResumeConfig, executor: &E) -> Result<()> + where + E: ImportTaskExecutor + Sync, + { + // Mirror the production caller: mark DDL completed for fresh sessions + // so the data-import guard is satisfied. Tests that want to exercise + // the unsafe path drive prepare/import directly. + let mut session = prepare_import_resume(config).await?; + if !session.should_skip_ddl() { + session.mark_ddl_completed().await?; + } + import_with_resume_session(session, executor).await + } + + #[test] + fn test_build_import_tasks_skips_skipped_chunks_and_missing_schema_files() { + let mut completed = ChunkMeta::new(1, TimeRange::unbounded()); + completed.status = ChunkStatus::Completed; + completed.files = vec!["data/public/1/file.parquet".to_string()]; + let mut skipped = ChunkMeta::new(2, TimeRange::unbounded()); + skipped.status = ChunkStatus::Skipped; + skipped.files = vec!["data/public/2/file.parquet".to_string()]; + + let tasks = build_import_tasks( + &[completed, skipped], + &["public".to_string(), "analytics".to_string()], + ); + + assert_eq!(tasks, vec![ImportTaskKey::new(1, "public")]); + } + + #[tokio::test] + async fn test_import_with_resume_skips_completed_tasks() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let tasks = vec![ + ImportTaskKey::new(1, "public"), + ImportTaskKey::new(2, "analytics"), + ]; + let mut state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &["public".to_string(), "analytics".to_string()], + tasks.clone(), + ); + state.mark_ddl_completed(); + state + .set_task_status(1, "public", ImportTaskStatus::Completed, None) + .unwrap(); + save_import_state(&path, &state).await.unwrap(); + + let imported = Arc::new(Mutex::new(Vec::new())); + let executor = recording_executor(imported.clone()); + + run_import_with_resume(config(path.clone(), tasks), &executor) + .await + .unwrap(); + + assert_eq!( + imported.lock().unwrap().clone(), + vec![ImportTaskKey::new(2, "analytics")] + ); + assert!(load_import_state(&path).await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_import_with_resume_persists_failed_task() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let failed_task = ImportTaskKey::new(1, "public"); + let tasks = vec![failed_task.clone()]; + let imported = Arc::new(Mutex::new(Vec::new())); + let executor = RecordingExecutor { + imported, + fail_task: Some(failed_task.clone()), + failure_mode: Some(FailureMode::Fatal), + attempts: Arc::new(AtomicUsize::new(0)), + }; + + let error = run_import_with_resume(config(path.clone(), tasks), &executor) + .await + .unwrap_err(); + assert!(matches!( + error, + crate::data::import_v2::error::Error::TestTaskFailed { + retryable: false, + .. + } + )); + + let state = load_import_state(&path).await.unwrap().unwrap(); + assert_eq!( + state.task_status(failed_task.chunk_id, &failed_task.schema), + Some(ImportTaskStatus::Failed) + ); + } + + #[tokio::test] + async fn test_import_with_resume_rejects_mismatched_state_identity() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let tasks = vec![ImportTaskKey::new(1, "public")]; + let state = ImportState::new( + "snapshot-1", + "127.0.0.1:4001", + "greptime", + &["public".to_string(), "analytics".to_string()], + tasks.clone(), + ); + save_import_state(&path, &state).await.unwrap(); + + let imported = Arc::new(Mutex::new(Vec::new())); + let executor = recording_executor(imported); + + let error = run_import_with_resume(config(path, tasks), &executor) + .await + .unwrap_err(); + + assert!(matches!( + error, + crate::data::import_v2::error::Error::ImportStateMismatch { .. } + )); + } + + #[tokio::test] + async fn test_prepare_import_resume_reports_existing_state_before_ddl() { + let dir = tempfile::tempdir().unwrap(); + let tasks = vec![ImportTaskKey::new(1, "public")]; + + let fresh_session = + prepare_import_resume(config(dir.path().join("fresh_state.json"), tasks.clone())) + .await + .unwrap(); + assert!(!fresh_session.should_skip_ddl()); + drop(fresh_session); + + let existing_path = dir.path().join("existing_state.json"); + let mut state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &["public".to_string(), "analytics".to_string()], + tasks.clone(), + ); + state.mark_ddl_completed(); + save_import_state(&existing_path, &state).await.unwrap(); + + let resume_session = prepare_import_resume(config(existing_path, tasks)) + .await + .unwrap(); + assert!(resume_session.should_skip_ddl()); + } + + #[tokio::test] + async fn test_import_with_resume_rejects_duplicate_state_tasks() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let tasks = vec![ImportTaskKey::new(1, "public")]; + let mut state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &["public".to_string(), "analytics".to_string()], + tasks.clone(), + ); + state.tasks.push(state.tasks[0].clone()); + save_import_state(&path, &state).await.unwrap(); + + let imported = Arc::new(Mutex::new(Vec::new())); + let executor = recording_executor(imported); + + let error = run_import_with_resume(config(path, tasks), &executor) + .await + .unwrap_err(); + + assert!(matches!( + error, + crate::data::import_v2::error::Error::ImportStateMismatch { .. } + )); + } + + #[tokio::test] + async fn test_import_with_resume_rejects_data_import_when_ddl_incomplete() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let tasks = vec![ImportTaskKey::new(1, "public")]; + + // prepare creates fresh state with ddl_completed=false; calling + // import_with_resume_session directly (without mark_ddl_completed) + // must be rejected. + let session = prepare_import_resume(config(path, tasks)).await.unwrap(); + let imported = Arc::new(Mutex::new(Vec::new())); + let executor = recording_executor(imported.clone()); + + let error = import_with_resume_session(session, &executor) + .await + .unwrap_err(); + + assert!(matches!( + error, + crate::data::import_v2::error::Error::ImportStateDdlIncomplete { .. } + )); + assert!(imported.lock().unwrap().is_empty()); + } + + #[tokio::test] + async fn test_prepare_import_resume_rejects_duplicate_request_tasks_on_fresh_state() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let task = ImportTaskKey::new(1, "public"); + // No state file yet - duplicate detection must run before the fresh + // state is persisted, otherwise corrupted bookkeeping would be + // written to disk and observed only on a later resume. + let error = + match prepare_import_resume(config(path.clone(), vec![task.clone(), task])).await { + Ok(_) => panic!("duplicate request tasks should be rejected"), + Err(error) => error, + }; + + assert!(matches!( + error, + crate::data::import_v2::error::Error::ImportStateMismatch { .. } + )); + assert!(load_import_state(&path).await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_import_with_resume_does_not_retry_retryable_task_error() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("import_state.json"); + let failed_task = ImportTaskKey::new(1, "public"); + let tasks = vec![failed_task.clone()]; + let imported = Arc::new(Mutex::new(Vec::new())); + let attempts = Arc::new(AtomicUsize::new(0)); + let executor = RecordingExecutor { + imported: imported.clone(), + fail_task: Some(failed_task.clone()), + // If task import were retried, the second attempt would succeed. + // COPY DATABASE FROM failures are ambiguous, so retryable errors + // must still stop immediately to avoid duplicate rows. + failure_mode: Some(FailureMode::RetryableThenSuccess { failures: 1 }), + attempts: attempts.clone(), + }; + + let error = run_import_with_resume(config(path.clone(), tasks), &executor) + .await + .unwrap_err(); + + assert!(matches!( + error, + crate::data::import_v2::error::Error::TestTaskFailed { + retryable: true, + .. + } + )); + assert_eq!(attempts.load(Ordering::SeqCst), 1); + assert!(imported.lock().unwrap().is_empty()); + + let state = load_import_state(&path).await.unwrap().unwrap(); + assert_eq!( + state.task_status(failed_task.chunk_id, &failed_task.schema), + Some(ImportTaskStatus::Failed) + ); + } +} diff --git a/src/cli/src/data/import_v2/error.rs b/src/cli/src/data/import_v2/error.rs index 3bb712acd5..165f1c0118 100644 --- a/src/cli/src/data/import_v2/error.rs +++ b/src/cli/src/data/import_v2/error.rs @@ -129,9 +129,55 @@ pub enum Error { location: Location, }, - #[snafu(display("Import state references unknown chunk {}", chunk_id))] - ImportStateUnknownChunk { + #[snafu(display( + "Failed to determine import state path for snapshot '{}'. Set HOME, USERPROFILE, or run from a valid current directory.", + snapshot_id + ))] + ImportStatePathUnavailable { + snapshot_id: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Import state at '{}' does not match current import: {}. Either rerun with matching import arguments, or delete the state file to start over (DDL will be re-executed).", + path, + reason + ))] + ImportStateMismatch { + path: String, + reason: String, + #[snafu(implicit)] + location: Location, + }, + + #[cfg(test)] + #[snafu(display("Test task failed: {}", message))] + TestTaskFailed { + message: String, + retryable: bool, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Import state references unknown task: chunk {}, schema '{}'", + chunk_id, + schema + ))] + ImportStateUnknownTask { chunk_id: u32, + schema: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display( + "Import state at '{}' is not ready for data import: DDL has not been marked completed", + path + ))] + ImportStateDdlIncomplete { + path: String, #[snafu(implicit)] location: Location, }, @@ -148,7 +194,18 @@ impl ErrorExt for Error { | Error::IncompleteSnapshot { .. } | Error::EmptyChunkManifest { .. } | Error::MissingChunkData { .. } => StatusCode::InvalidArguments, - Error::ImportStateUnknownChunk { .. } => StatusCode::Unexpected, + Error::ImportStatePathUnavailable { .. } + | Error::ImportStateUnknownTask { .. } + | Error::ImportStateDdlIncomplete { .. } => StatusCode::Unexpected, + Error::ImportStateMismatch { .. } => StatusCode::InvalidArguments, + #[cfg(test)] + Error::TestTaskFailed { retryable, .. } => { + if *retryable { + StatusCode::StorageUnavailable + } else { + StatusCode::InvalidArguments + } + } Error::Database { error, .. } => error.status_code(), Error::SnapshotStorage { error, .. } | Error::ChunkImportFailed { error, .. } => { error.status_code() diff --git a/src/cli/src/data/import_v2/state.rs b/src/cli/src/data/import_v2/state.rs index cdad6e45db..ba431ac62f 100644 --- a/src/cli/src/data/import_v2/state.rs +++ b/src/cli/src/data/import_v2/state.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![allow(dead_code)] - use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; @@ -24,8 +22,8 @@ use snafu::{IntoError, OptionExt, ResultExt}; use tokio::io::AsyncWriteExt; use crate::data::import_v2::error::{ - ImportStateIoSnafu, ImportStateLockedSnafu, ImportStateParseSnafu, - ImportStateUnknownChunkSnafu, Result, + ImportStateIoSnafu, ImportStateLockedSnafu, ImportStateParseSnafu, ImportStateUnknownTaskSnafu, + Result, }; use crate::data::path::encode_path_segment; @@ -33,9 +31,9 @@ const IMPORT_STATE_ROOT: &str = ".greptime"; const IMPORT_STATE_DIR: &str = "import_state"; static IMPORT_STATE_TMP_ID: AtomicU64 = AtomicU64::new(0); -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] -pub(crate) enum ImportChunkStatus { +pub(crate) enum ImportTaskStatus { Pending, InProgress, Completed, @@ -43,9 +41,25 @@ pub(crate) enum ImportChunkStatus { } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub(crate) struct ImportChunkState { - pub(crate) id: u32, - pub(crate) status: ImportChunkStatus, +pub(crate) struct ImportTaskKey { + pub(crate) chunk_id: u32, + pub(crate) schema: String, +} + +impl ImportTaskKey { + pub(crate) fn new(chunk_id: u32, schema: impl Into) -> Self { + Self { + chunk_id, + schema: schema.into(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct ImportTaskState { + pub(crate) chunk_id: u32, + pub(crate) schema: String, + pub(crate) status: ImportTaskStatus, #[serde(skip_serializing_if = "Option::is_none")] pub(crate) error: Option, } @@ -54,55 +68,77 @@ pub(crate) struct ImportChunkState { pub(crate) struct ImportState { pub(crate) snapshot_id: String, pub(crate) target_addr: String, + pub(crate) catalog: String, + pub(crate) schemas: Vec, + #[serde(default)] + pub(crate) ddl_completed: bool, pub(crate) updated_at: DateTime, - // Chunk counts are expected to stay below ~1000, so linear scans are acceptable here. - pub(crate) chunks: Vec, + // Tasks are (chunk-schema) tuples and can reach the tens of thousands; + // linear scans here are accepted because per-task work is dominated by + // network I/O and an fsync, but if the bound grows further this should be + // backed by a HashMap<(chunk_id, schema), index> rebuilt after load. + pub(crate) tasks: Vec, } impl ImportState { pub(crate) fn new( snapshot_id: impl Into, target_addr: impl Into, - chunk_ids: I, + catalog: impl Into, + schemas: &[String], + tasks: I, ) -> Self where - I: IntoIterator, + I: IntoIterator, { Self { snapshot_id: snapshot_id.into(), target_addr: target_addr.into(), + catalog: catalog.into(), + schemas: canonical_schema_selection(schemas), + ddl_completed: false, updated_at: Utc::now(), - chunks: chunk_ids + tasks: tasks .into_iter() - .map(|id| ImportChunkState { - id, - status: ImportChunkStatus::Pending, + .map(|task| ImportTaskState { + chunk_id: task.chunk_id, + schema: task.schema, + status: ImportTaskStatus::Pending, error: None, }) .collect(), } } - pub(crate) fn chunk_status(&self, chunk_id: u32) -> Option { - self.chunks - .iter() - .find(|chunk| chunk.id == chunk_id) - .map(|chunk| chunk.status.clone()) + pub(crate) fn mark_ddl_completed(&mut self) { + self.ddl_completed = true; + self.updated_at = Utc::now(); } - pub(crate) fn set_chunk_status( + pub(crate) fn task_status(&self, chunk_id: u32, schema: &str) -> Option { + self.tasks + .iter() + .find(|task| task.chunk_id == chunk_id && task.schema == schema) + .map(|task| task.status) + } + + pub(crate) fn set_task_status( &mut self, chunk_id: u32, - status: ImportChunkStatus, + schema: &str, + status: ImportTaskStatus, error: Option, ) -> Result<()> { - let chunk = self - .chunks + let task = self + .tasks .iter_mut() - .find(|chunk| chunk.id == chunk_id) - .context(ImportStateUnknownChunkSnafu { chunk_id })?; - chunk.status = status; - chunk.error = error; + .find(|task| task.chunk_id == chunk_id && task.schema == schema) + .context(ImportStateUnknownTaskSnafu { + chunk_id, + schema: schema.to_string(), + })?; + task.status = status; + task.error = error; self.updated_at = Utc::now(); Ok(()) } @@ -119,15 +155,27 @@ impl Drop for ImportStateLockGuard { } } -pub(crate) fn default_state_path(snapshot_id: &str, target_addr: &str) -> Option { +pub(crate) fn default_state_path( + snapshot_id: &str, + target_addr: &str, + catalog: &str, + schemas: &[String], +) -> Option { let home = default_home_dir_with(|key| std::env::var_os(key)); let cwd = std::env::current_dir().ok(); - default_state_path_with(home.as_deref(), cwd.as_deref(), snapshot_id, target_addr) + default_state_path_with( + home.as_deref(), + cwd.as_deref(), + snapshot_id, + target_addr, + catalog, + schemas, + ) } fn default_home_dir_with(get: F) -> Option where - F: for<'a> Fn(&'a str) -> Option, + F: Fn(&str) -> Option, { get("HOME") .or_else(|| get("USERPROFILE")) @@ -144,8 +192,10 @@ fn default_state_path_with( cwd: Option<&Path>, snapshot_id: &str, target_addr: &str, + catalog: &str, + schemas: &[String], ) -> Option { - let file_name = import_state_file_name(snapshot_id, target_addr); + let file_name = import_state_file_name(snapshot_id, target_addr, catalog, schemas); match (home, cwd) { (Some(home), _) => Some( home.join(IMPORT_STATE_ROOT) @@ -157,14 +207,58 @@ fn default_state_path_with( } } -fn import_state_file_name(snapshot_id: &str, target_addr: &str) -> String { +fn import_state_file_name( + snapshot_id: &str, + target_addr: &str, + catalog: &str, + schemas: &[String], +) -> String { format!( - ".import_state_{}_{}.json", + ".import_state_{}_{}_{}.json", encode_path_segment(snapshot_id), - encode_path_segment(target_addr) + encode_path_segment(target_addr), + import_identity_hash(catalog, schemas) ) } +pub(crate) fn canonical_schema_selection(schemas: &[String]) -> Vec { + let mut canonicalized = schemas + .iter() + .map(|schema| schema.to_ascii_lowercase()) + .collect::>(); + canonicalized.sort(); + canonicalized.dedup(); + canonicalized +} + +/// FNV-1a over `(catalog, schemas)`. The output is part of the persisted state +/// filename, so we cannot use `std::collections::hash_map::DefaultHasher` - +/// Rust does not guarantee its algorithm across releases, which would make a +/// state file written by one toolchain undiscoverable by another. +fn import_identity_hash(catalog: &str, schemas: &[String]) -> String { + const FNV_OFFSET: u64 = 0xcbf29ce484222325; + const FNV_PRIME: u64 = 0x100000001b3; + + fn hash_bytes(mut hash: u64, bytes: &[u8]) -> u64 { + for byte in bytes { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(FNV_PRIME); + } + hash + } + + let mut hash = FNV_OFFSET; + hash = hash_bytes(hash, catalog.as_bytes()); + // 0xff cannot appear in valid UTF-8, so it works as an unambiguous + // field separator between adjacent identifiers. + hash = hash_bytes(hash, &[0xff]); + for schema in canonical_schema_selection(schemas) { + hash = hash_bytes(hash, schema.as_bytes()); + hash = hash_bytes(hash, &[0xff]); + } + format!("{hash:016x}") +} + pub(crate) async fn load_import_state(path: &Path) -> Result> { match tokio::fs::read(path).await { Ok(bytes) => { @@ -268,10 +362,10 @@ fn import_state_lock_path(path: &Path) -> PathBuf { } fn normalize_import_state_for_resume(state: &mut ImportState) { - for chunk in &mut state.chunks { - if chunk.status == ImportChunkStatus::InProgress { - chunk.status = ImportChunkStatus::Pending; - chunk.error = None; + for task in &mut state.tasks { + if task.status == ImportTaskStatus::InProgress { + task.status = ImportTaskStatus::Pending; + task.error = None; } } } @@ -324,42 +418,82 @@ mod tests { const CHILD_LOCK_TEST: &str = "data::import_v2::state::tests::test_try_acquire_import_state_lock_child_process"; - #[test] - fn test_import_state_new_initializes_pending_chunks() { - let state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]); + fn schemas() -> Vec { + vec!["public".to_string(), "analytics".to_string()] + } - assert_eq!(state.snapshot_id, "snapshot-1"); - assert_eq!(state.target_addr, "127.0.0.1:4000"); - assert_eq!(state.chunks.len(), 2); - assert_eq!(state.chunks[0].status, ImportChunkStatus::Pending); - assert_eq!(state.chunks[1].status, ImportChunkStatus::Pending); + fn tasks() -> Vec { + vec![ + ImportTaskKey::new(1, "public"), + ImportTaskKey::new(2, "analytics"), + ] } #[test] - fn test_set_chunk_status_updates_timestamp_and_error() { - let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]); + fn test_import_state_new_initializes_pending_tasks() { + let state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &schemas(), + tasks(), + ); + + assert_eq!(state.snapshot_id, "snapshot-1"); + assert_eq!(state.target_addr, "127.0.0.1:4000"); + assert_eq!(state.catalog, "greptime"); + assert_eq!(state.schemas, vec!["analytics", "public"]); + assert_eq!(state.tasks.len(), 2); + assert_eq!(state.tasks[0].status, ImportTaskStatus::Pending); + assert_eq!(state.tasks[1].status, ImportTaskStatus::Pending); + } + + #[test] + fn test_set_task_status_updates_timestamp_and_error() { + let mut state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &schemas(), + tasks(), + ); let before = state.updated_at; state.updated_at = Utc::now() - chrono::Duration::seconds(10); state - .set_chunk_status(1, ImportChunkStatus::Failed, Some("timeout".to_string())) + .set_task_status( + 1, + "public", + ImportTaskStatus::Failed, + Some("timeout".to_string()), + ) .unwrap(); - assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Failed)); - assert_eq!(state.chunks[0].error.as_deref(), Some("timeout")); + assert_eq!( + state.task_status(1, "public"), + Some(ImportTaskStatus::Failed) + ); + assert_eq!(state.tasks[0].error.as_deref(), Some("timeout")); assert!(state.updated_at > before); } #[test] - fn test_set_chunk_status_rejects_unknown_chunk_id() { - let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]); + fn test_set_task_status_rejects_unknown_task() { + let mut state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &schemas(), + tasks(), + ); let error = state - .set_chunk_status(99, ImportChunkStatus::Completed, None) + .set_task_status(99, "public", ImportTaskStatus::Completed, None) .unwrap_err(); assert!(matches!( error, - crate::data::import_v2::error::Error::ImportStateUnknownChunk { chunk_id, .. } if chunk_id == 99 + crate::data::import_v2::error::Error::ImportStateUnknownTask { chunk_id, schema, .. } + if chunk_id == 99 && schema == "public" )); } @@ -367,9 +501,15 @@ mod tests { async fn test_save_and_load_import_state_round_trip() { let dir = tempdir().unwrap(); let path = dir.path().join("import_state.json"); - let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]); + let mut state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &schemas(), + tasks(), + ); state - .set_chunk_status(2, ImportChunkStatus::Completed, None) + .set_task_status(2, "analytics", ImportTaskStatus::Completed, None) .unwrap(); save_import_state(&path, &state).await.unwrap(); @@ -377,53 +517,78 @@ mod tests { assert_eq!(loaded.snapshot_id, state.snapshot_id); assert_eq!(loaded.target_addr, state.target_addr); - assert_eq!(loaded.chunks, state.chunks); + assert_eq!(loaded.catalog, state.catalog); + assert_eq!(loaded.schemas, state.schemas); + assert_eq!(loaded.tasks, state.tasks); } #[tokio::test] async fn test_save_import_state_overwrites_existing_file() { let dir = tempdir().unwrap(); let path = dir.path().join("import_state.json"); - let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]); + let mut state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &schemas(), + tasks(), + ); save_import_state(&path, &state).await.unwrap(); state - .set_chunk_status(1, ImportChunkStatus::Completed, None) + .set_task_status(1, "public", ImportTaskStatus::Completed, None) .unwrap(); save_import_state(&path, &state).await.unwrap(); let loaded = load_import_state(&path).await.unwrap().unwrap(); - assert_eq!(loaded.chunk_status(1), Some(ImportChunkStatus::Completed)); + assert_eq!( + loaded.task_status(1, "public"), + Some(ImportTaskStatus::Completed) + ); } #[test] fn test_load_import_state_resets_in_progress_to_pending() { - let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]); + let mut state = ImportState::new( + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &schemas(), + tasks(), + ); state - .set_chunk_status( + .set_task_status( 2, - ImportChunkStatus::InProgress, + "analytics", + ImportTaskStatus::InProgress, Some("running".to_string()), ) .unwrap(); normalize_import_state_for_resume(&mut state); - assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Pending)); - assert_eq!(state.chunk_status(2), Some(ImportChunkStatus::Pending)); - assert_eq!(state.chunks[1].error, None); + assert_eq!( + state.task_status(1, "public"), + Some(ImportTaskStatus::Pending) + ); + assert_eq!( + state.task_status(2, "analytics"), + Some(ImportTaskStatus::Pending) + ); + assert_eq!(state.tasks[1].error, None); } #[test] fn test_unique_tmp_path_generates_distinct_paths() { - let path = Path::new("/tmp/import_state.json"); + let dir = tempdir().unwrap(); + let path = dir.path().join("import_state.json"); - let first = unique_tmp_path(path); - let second = unique_tmp_path(path); + let first = unique_tmp_path(&path); + let second = unique_tmp_path(&path); assert_ne!(first, second); - assert!(first.starts_with("/tmp")); - assert!(second.starts_with("/tmp")); + assert!(first.starts_with(dir.path())); + assert!(second.starts_with(dir.path())); assert!( first .file_name() @@ -507,44 +672,104 @@ mod tests { Some(cwd.path()), "../snapshot", "127.0.0.1:4000", + "greptime", + &schemas(), ) .unwrap(); assert_eq!( - path, - home.path() - .join(IMPORT_STATE_ROOT) - .join(IMPORT_STATE_DIR) - .join(".import_state_%2E%2E%2Fsnapshot_127%2E0%2E0%2E1%3A4000.json") + path.parent().unwrap(), + home.path().join(IMPORT_STATE_ROOT).join(IMPORT_STATE_DIR) ); + let file_name = path.file_name().unwrap().to_string_lossy(); + assert!(file_name.starts_with(".import_state_%2E%2E%2Fsnapshot_127%2E0%2E0%2E1%3A4000_")); + assert!(file_name.ends_with(".json")); } #[test] fn test_default_state_path_falls_back_to_cwd_when_home_missing() { let cwd = tempdir().unwrap(); - let path = - default_state_path_with(None, Some(cwd.path()), "snapshot-1", "target-a").unwrap(); + let path = default_state_path_with( + None, + Some(cwd.path()), + "snapshot-1", + "target-a", + "greptime", + &schemas(), + ) + .unwrap(); - assert_eq!( - path, - cwd.path().join(".import_state_snapshot-1_target-a.json") - ); + assert_eq!(path.parent().unwrap(), cwd.path()); + let file_name = path.file_name().unwrap().to_string_lossy(); + assert!(file_name.starts_with(".import_state_snapshot-1_target-a_")); + assert!(file_name.ends_with(".json")); } #[test] fn test_default_state_path_isolated_by_target_addr() { let cwd = tempdir().unwrap(); - let first = default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4000") - .unwrap(); - let second = - default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4001") - .unwrap(); + let first = default_state_path_with( + None, + Some(cwd.path()), + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &schemas(), + ) + .unwrap(); + let second = default_state_path_with( + None, + Some(cwd.path()), + "snapshot-1", + "127.0.0.1:4001", + "greptime", + &schemas(), + ) + .unwrap(); assert_ne!(first, second); } + #[test] + fn test_default_state_path_isolated_by_catalog_and_schemas() { + let cwd = tempdir().unwrap(); + let public_only = vec!["public".to_string()]; + let analytics_only = vec!["analytics".to_string()]; + + let first = default_state_path_with( + None, + Some(cwd.path()), + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &public_only, + ) + .unwrap(); + let second = default_state_path_with( + None, + Some(cwd.path()), + "snapshot-1", + "127.0.0.1:4000", + "other", + &public_only, + ) + .unwrap(); + let third = default_state_path_with( + None, + Some(cwd.path()), + "snapshot-1", + "127.0.0.1:4000", + "greptime", + &analytics_only, + ) + .unwrap(); + + assert_ne!(first, second); + assert_ne!(first, third); + } + #[test] fn test_default_home_dir_prefers_home() { let detected = default_home_dir_with(|key| match key { diff --git a/src/cli/src/data/retry.rs b/src/cli/src/data/retry.rs deleted file mode 100644 index f0ac6386d3..0000000000 --- a/src/cli/src/data/retry.rs +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![allow(dead_code)] - -use std::time::Duration; - -use backon::ExponentialBuilder; - -pub(crate) fn default_retry_policy() -> ExponentialBuilder { - ExponentialBuilder::default() - .with_min_delay(Duration::from_secs(1)) - .with_max_delay(Duration::from_secs(300)) - .with_factor(2.0) - // This is the number of retries after the initial attempt. - .with_max_times(3) - .with_jitter() -} - -#[cfg(test)] -mod tests { - use std::future::ready; - use std::sync::Arc; - use std::sync::atomic::{AtomicUsize, Ordering}; - - use backon::Retryable; - - use super::*; - - #[tokio::test] - async fn test_retry_policy_retries_retryable_error_until_success() { - let attempts = Arc::new(AtomicUsize::new(0)); - - let result = ({ - let attempts = attempts.clone(); - move || { - let attempts = attempts.clone(); - async move { - let current = attempts.fetch_add(1, Ordering::SeqCst); - if current < 2 { - Err("retryable") - } else { - Ok("done") - } - } - } - }) - .retry(default_retry_policy()) - .when(|error| *error == "retryable") - .sleep(|_| ready(())) - .await; - - assert_eq!(result, Ok("done")); - assert_eq!(attempts.load(Ordering::SeqCst), 3); - } - - #[tokio::test] - async fn test_retry_policy_stops_on_non_retryable_error() { - let attempts = Arc::new(AtomicUsize::new(0)); - - let result: std::result::Result<(), &str> = ({ - let attempts = attempts.clone(); - move || { - let attempts = attempts.clone(); - async move { - attempts.fetch_add(1, Ordering::SeqCst); - Err("fatal") - } - } - }) - .retry(default_retry_policy()) - .when(|error| *error == "retryable") - .sleep(|_| ready(())) - .await; - - assert_eq!(result, Err("fatal")); - assert_eq!(attempts.load(Ordering::SeqCst), 1); - } - - #[tokio::test] - async fn test_retry_policy_returns_last_error_after_reaching_limit() { - let attempts = Arc::new(AtomicUsize::new(0)); - - let result: std::result::Result<(), usize> = ({ - let attempts = attempts.clone(); - move || { - let attempts = attempts.clone(); - async move { - let current = attempts.fetch_add(1, Ordering::SeqCst); - Err(current) - } - } - }) - .retry(default_retry_policy().with_max_times(2)) - .when(|_| true) - .sleep(|_| ready(())) - .await; - - assert_eq!(result, Err(2)); - assert_eq!(attempts.load(Ordering::SeqCst), 3); - } -} diff --git a/src/cli/src/database.rs b/src/cli/src/database.rs index fa3f6faefb..cba27129dd 100644 --- a/src/cli/src/database.rs +++ b/src/cli/src/database.rs @@ -89,6 +89,10 @@ impl DatabaseClient { } } + pub fn addr(&self) -> &str { + &self.addr + } + pub async fn sql_in_public(&self, sql: &str) -> Result>>> { self.sql(sql, DEFAULT_SCHEMA_NAME).await }