feat: import resume part2 (#8070)

* feat: import resume part2

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by comments

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: config docs

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2026-05-11 19:56:52 -07:00
committed by GitHub
parent a282b207a6
commit d709fd29ef
10 changed files with 1180 additions and 315 deletions

1
Cargo.lock generated
View File

@@ -1928,7 +1928,6 @@ dependencies = [
"async-stream",
"async-trait",
"auth",
"backon",
"base64 0.22.1",
"cache",
"catalog",

View File

@@ -19,7 +19,6 @@ workspace = true
async-stream.workspace = true
async-trait.workspace = true
auth.workspace = true
backon.workspace = true
base64.workspace = true
cache.workspace = true
catalog.workspace = true

View File

@@ -17,7 +17,6 @@ pub mod export_v2;
mod import;
pub mod import_v2;
pub(crate) mod path;
pub(crate) mod retry;
pub mod snapshot_storage;
pub(crate) mod sql;
mod storage_export;

View File

@@ -35,6 +35,7 @@
//! ```
mod command;
pub(crate) mod coordinator;
pub mod error;
pub mod executor;
pub(crate) mod state;

View File

@@ -15,24 +15,29 @@
//! Import V2 CLI command.
use std::collections::HashSet;
use std::time::{Duration, Instant};
use std::time::Duration;
use async_trait::async_trait;
use clap::Parser;
use common_error::ext::BoxedError;
use common_telemetry::info;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use crate::Tool;
use crate::common::ObjectStoreConfig;
use crate::data::export_v2::data::{build_copy_source, execute_copy_database_from};
use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus, DataFormat, MANIFEST_VERSION};
use crate::data::import_v2::coordinator::{
ImportResumeConfig, ImportTaskExecutor, build_import_tasks, chunk_has_schema_files,
import_with_resume_session, prepare_import_resume,
};
use crate::data::import_v2::error::{
ChunkImportFailedSnafu, EmptyChunkManifestSnafu, IncompleteSnapshotSnafu,
ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result, SchemaNotInSnapshotSnafu,
SnapshotStorageSnafu,
ChunkImportFailedSnafu, EmptyChunkManifestSnafu, ImportStatePathUnavailableSnafu,
IncompleteSnapshotSnafu, ManifestVersionMismatchSnafu, MissingChunkDataSnafu, Result,
SchemaNotInSnapshotSnafu, SnapshotStorageSnafu,
};
use crate::data::import_v2::executor::{DdlExecutor, DdlStatement};
use crate::data::import_v2::state::{ImportTaskKey, default_state_path};
use crate::data::path::{data_dir_for_schema_chunk, ddl_path_for_schema};
use crate::data::snapshot_storage::{OpenDalStorage, SnapshotStorage, validate_uri};
use crate::database::{DatabaseClient, parse_proxy_opts};
@@ -185,13 +190,12 @@ impl Import {
info!("Generated {} DDL statements", ddl_statements.len());
let data_prefixes = if !manifest.schema_only && !manifest.chunks.is_empty() {
Some(
validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import)
.await?,
)
let data_tasks = if !manifest.schema_only && !manifest.chunks.is_empty() {
validate_data_snapshot(self.storage.as_ref(), &manifest.chunks, &schemas_to_import)
.await?;
build_import_tasks(&manifest.chunks, &schemas_to_import)
} else {
None
Vec::new()
};
// 4. Dry-run mode: print DDL and exit
@@ -212,24 +216,67 @@ impl Import {
return Ok(());
}
// 5. Execute DDL
let executor = DdlExecutor::new(&self.database_client);
executor.execute_strict(&ddl_statements).await?;
if !manifest.schema_only && !manifest.chunks.is_empty() {
self.import_data(
&manifest.chunks,
let mut resume_session = if !data_tasks.is_empty() {
let state_path = default_state_path(
&manifest.snapshot_id.to_string(),
self.database_client.addr(),
&self.catalog,
&schemas_to_import,
manifest.format,
data_prefixes.expect("validated full snapshot must provide data prefixes"),
)
.await?;
.context(ImportStatePathUnavailableSnafu {
snapshot_id: manifest.snapshot_id.to_string(),
})?;
Some(
prepare_import_resume(ImportResumeConfig {
snapshot_id: manifest.snapshot_id.to_string(),
target_addr: self.database_client.addr().to_string(),
catalog: self.catalog.clone(),
schemas: schemas_to_import.clone(),
state_path,
tasks: data_tasks,
})
.await?,
)
} else {
None
};
let skip_ddl = resume_session
.as_ref()
.map(|session| session.should_skip_ddl())
.unwrap_or(false);
// 5. Execute DDL unless a previous run already completed it.
let ddl_executed = if skip_ddl {
info!(
"Existing import state has DDL marked completed; skipping DDL execution and resuming data import"
);
false
} else {
let executor = DdlExecutor::new(&self.database_client);
executor.execute_strict(&ddl_statements).await?;
if let Some(session) = resume_session.as_mut() {
session.mark_ddl_completed().await?;
}
true
};
if let Some(resume_session) = resume_session {
let executor = CopyDatabaseImportTaskExecutor {
import: self,
format: manifest.format,
};
import_with_resume_session(resume_session, &executor).await?;
}
info!(
"Import completed: {} DDL statements executed",
ddl_statements.len()
);
if ddl_executed {
info!(
"Import completed: {} DDL statements executed",
ddl_statements.len()
);
} else {
info!("Import completed: DDL execution skipped");
}
Ok(())
}
@@ -252,79 +299,39 @@ impl Import {
Ok(statements)
}
}
async fn import_data(
&self,
chunks: &[ChunkMeta],
schemas: &[String],
format: DataFormat,
actual_prefixes: HashSet<String>,
) -> Result<()> {
let import_start = Instant::now();
let total_chunks = chunks
.iter()
.filter(|chunk| chunk.status == ChunkStatus::Completed)
.count();
info!(
"Importing data: {} chunks, {} schemas",
total_chunks,
schemas.len()
);
struct CopyDatabaseImportTaskExecutor<'a> {
import: &'a Import,
format: DataFormat,
}
for (idx, chunk) in chunks.iter().enumerate() {
if chunk.status == ChunkStatus::Skipped {
info!(
"[{}/{}] Chunk {}: skipped (no data)",
idx + 1,
chunks.len(),
chunk.id
);
continue;
}
#[async_trait]
impl ImportTaskExecutor for CopyDatabaseImportTaskExecutor<'_> {
async fn import_task(&self, task: &ImportTaskKey) -> Result<()> {
let source = build_copy_source(
&self.import.snapshot_uri,
&self.import.storage_config,
&task.schema,
task.chunk_id,
)
.context(ChunkImportFailedSnafu {
chunk_id: task.chunk_id,
schema: task.schema.clone(),
})?;
info!(
"[{}/{}] Chunk {} ({:?} ~ {:?})",
idx + 1,
chunks.len(),
chunk.id,
chunk.time_range.start,
chunk.time_range.end
);
for schema in schemas {
if !validate_chunk_schema_files(chunk, schema, &actual_prefixes)? {
info!(" {}: no data, skipped", schema);
continue;
}
info!(" {}: importing...", schema);
let copy_start = Instant::now();
let source =
build_copy_source(&self.snapshot_uri, &self.storage_config, schema, chunk.id)
.context(ChunkImportFailedSnafu {
chunk_id: chunk.id,
schema: schema.clone(),
})?;
execute_copy_database_from(
&self.database_client,
&self.catalog,
schema,
&source,
format,
)
.await
.context(ChunkImportFailedSnafu {
chunk_id: chunk.id,
schema: schema.clone(),
})?;
info!(" {}: done in {:?}", schema, copy_start.elapsed());
}
}
info!("Data import finished in {:?}", import_start.elapsed());
Ok(())
execute_copy_database_from(
&self.import.database_client,
&self.import.catalog,
&task.schema,
&source,
self.format,
)
.await
.context(ChunkImportFailedSnafu {
chunk_id: task.chunk_id,
schema: task.schema.clone(),
})
}
}
@@ -511,14 +518,6 @@ fn validate_chunk_statuses(chunks: &[ChunkMeta]) -> Result<()> {
Ok(())
}
fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool {
let prefix = data_dir_for_schema_chunk(schema, chunk.id);
chunk.files.iter().any(|path| {
let normalized = path.trim_start_matches('/');
normalized.starts_with(&prefix)
})
}
fn format_data_import_plan(chunks: &[ChunkMeta], schemas: &[String]) -> Vec<String> {
let mut lines = vec!["-- Data import plan:".to_string()];
for chunk in chunks {
@@ -536,7 +535,7 @@ async fn validate_data_snapshot(
storage: &dyn SnapshotStorage,
chunks: &[ChunkMeta],
schemas: &[String],
) -> Result<HashSet<String>> {
) -> Result<()> {
validate_chunk_statuses(chunks)?;
let actual_prefixes = collect_chunk_data_prefixes(storage).await?;
@@ -552,7 +551,7 @@ async fn validate_data_snapshot(
}
}
Ok(actual_prefixes)
Ok(())
}
async fn collect_chunk_data_prefixes(storage: &dyn SnapshotStorage) -> Result<HashSet<String>> {

View File

@@ -0,0 +1,695 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::time::Instant;
use async_trait::async_trait;
use common_telemetry::{info, warn};
use crate::data::export_v2::manifest::{ChunkMeta, ChunkStatus};
use crate::data::import_v2::error::{
ImportStateDdlIncompleteSnafu, ImportStateMismatchSnafu, Result,
};
use crate::data::import_v2::state::{
ImportState, ImportStateLockGuard, ImportTaskKey, ImportTaskStatus, canonical_schema_selection,
delete_import_state, load_import_state, save_import_state, try_acquire_import_state_lock,
};
use crate::data::path::data_dir_for_schema_chunk;
/// Abstraction over the side effect of importing one (chunk, schema) task,
/// so the resume driver can be exercised in tests without a live database.
///
/// NOTE(review): a task may be re-run on resume if a crash happened after the
/// task succeeded but before its `Completed` status was persisted.
#[async_trait]
pub(crate) trait ImportTaskExecutor {
    /// Imports the data identified by `task`, returning `Err` on failure.
    async fn import_task(&self, task: &ImportTaskKey) -> Result<()>;
}
/// Identity and inputs of one import run. Used both to seed a fresh resume
/// state and to check that an existing state file belongs to the same run.
pub(crate) struct ImportResumeConfig {
    // Identifier of the snapshot being imported.
    pub(crate) snapshot_id: String,
    // Target database address; compared literally (no hostname normalization).
    pub(crate) target_addr: String,
    pub(crate) catalog: String,
    // Schemas selected for import.
    pub(crate) schemas: Vec<String>,
    // Location of the persisted resume-state file.
    pub(crate) state_path: PathBuf,
    // Full (chunk, schema) task list; tasks run in this order.
    pub(crate) tasks: Vec<ImportTaskKey>,
}
/// A prepared import run: the validated config, the loaded-or-freshly-created
/// persisted state, and the exclusive lock on the state file. The lock guard
/// lives as long as the session, preventing concurrent imports of the same
/// state file.
pub(crate) struct ImportResumeSession {
    config: ImportResumeConfig,
    state: ImportState,
    lock: ImportStateLockGuard,
}
impl ImportResumeSession {
    /// Returns true when a previously persisted state already recorded DDL
    /// completion, letting the caller skip re-executing DDL on resume.
    pub(crate) fn should_skip_ddl(&self) -> bool {
        self.state.ddl_completed
    }

    /// Marks DDL as completed and persists the state. Must be called after a
    /// successful DDL run on a fresh session, so that crashes after this point
    /// resume into the data-import phase instead of replaying DDL.
    pub(crate) async fn mark_ddl_completed(&mut self) -> Result<()> {
        self.state.mark_ddl_completed();
        save_import_state(&self.config.state_path, &self.state).await
    }
}
/// Returns true when `chunk` contains at least one file under the data
/// directory that belongs to `schema`. Leading slashes on manifest paths are
/// ignored so absolute-style entries still match.
pub(crate) fn chunk_has_schema_files(chunk: &ChunkMeta, schema: &str) -> bool {
    let dir_prefix = data_dir_for_schema_chunk(schema, chunk.id);
    chunk
        .files
        .iter()
        .map(|file| file.trim_start_matches('/'))
        .any(|file| file.starts_with(&dir_prefix))
}
/// Builds the (chunk, schema) task list for a data import: skipped chunks are
/// dropped, and a task is emitted only when the chunk actually carries files
/// for the schema. Order is chunk-major, preserving the input ordering.
pub(crate) fn build_import_tasks(chunks: &[ChunkMeta], schemas: &[String]) -> Vec<ImportTaskKey> {
    // TODO: build a per-chunk schema index if chunk file manifests become large.
    chunks
        .iter()
        .filter(|chunk| chunk.status != ChunkStatus::Skipped)
        .flat_map(|chunk| {
            schemas
                .iter()
                .filter(move |schema| chunk_has_schema_files(chunk, schema))
                .map(move |schema| ImportTaskKey::new(chunk.id, schema.clone()))
        })
        .collect()
}
/// Prepares an import session: validates the request, takes the exclusive
/// state-file lock, then either resumes a matching persisted state or creates
/// and persists a fresh one.
pub(crate) async fn prepare_import_resume(
    config: ImportResumeConfig,
) -> Result<ImportResumeSession> {
    // Reject duplicate task keys before the state file or lock is touched.
    // Duplicates would corrupt the resume bookkeeping because status lookups
    // use a linear `find()` and only ever see the first match.
    validate_config_tasks(&config)?;
    let lock = try_acquire_import_state_lock(&config.state_path)?;
    let state = if let Some(existing) = load_import_state(&config.state_path).await? {
        existing
    } else {
        // Persist a fresh state immediately so that any crash after this
        // point is recoverable as a resume. `ddl_completed=false` on a loaded
        // state therefore means a previous run reached this point but did not
        // confirm DDL completion - DDL must be (re-)run before data import is
        // allowed.
        let fresh = ImportState::new(
            &config.snapshot_id,
            &config.target_addr,
            &config.catalog,
            &config.schemas,
            config.tasks.clone(),
        );
        save_import_state(&config.state_path, &fresh).await?;
        fresh
    };
    Ok(ImportResumeSession {
        config,
        state,
        lock,
    })
}
/// Drives the data-import phase for a prepared session, resuming from the
/// persisted state: already-`Completed` tasks are skipped, each status
/// transition is persisted so a crash at any point resumes correctly, and on
/// full success the state file is deleted.
pub(crate) async fn import_with_resume_session<E>(
    session: ImportResumeSession,
    executor: &E,
) -> Result<()>
where
    E: ImportTaskExecutor + Sync,
{
    let ImportResumeSession {
        config,
        mut state,
        lock,
    } = session;
    // The state machine requires DDL to be explicitly marked completed before
    // data import; otherwise a caller could import data and leave a state that
    // replays DDL on the next resume. Surface the misuse instead of silently
    // importing.
    if !state.ddl_completed {
        return ImportStateDdlIncompleteSnafu {
            path: config.state_path.display().to_string(),
        }
        .fail();
    }
    let completed = state
        .tasks
        .iter()
        .filter(|task| task.status == ImportTaskStatus::Completed)
        .count();
    info!(
        "Import resume state: {} completed, {} pending, path: {}",
        completed,
        state.tasks.len().saturating_sub(completed),
        config.state_path.display()
    );
    let import_start = Instant::now();
    for (idx, task) in config.tasks.iter().enumerate() {
        if state.task_status(task.chunk_id, &task.schema) == Some(ImportTaskStatus::Completed) {
            info!(
                "[{}/{}] Chunk {} schema {}: already completed, skipped",
                idx + 1,
                config.tasks.len(),
                task.chunk_id,
                task.schema
            );
            continue;
        }
        info!(
            "[{}/{}] Chunk {} schema {}: importing...",
            idx + 1,
            config.tasks.len(),
            task.chunk_id,
            task.schema
        );
        // Persist InProgress before running the task so a crash mid-task is
        // visible in the state file.
        state.set_task_status(
            task.chunk_id,
            &task.schema,
            ImportTaskStatus::InProgress,
            None,
        )?;
        save_import_state(&config.state_path, &state).await?;
        let task_start = Instant::now();
        let result = executor.import_task(task).await;
        match result {
            Ok(()) => {
                // The task itself succeeded. If persisting the Completed
                // marker fails, the next resume will replay the task
                // (potentially duplicating data depending on engine
                // semantics), so the persist error is surfaced via `?` to
                // make the operator notice; the success log below is only
                // reached when persistence succeeded.
                update_status_and_save(
                    &config,
                    &mut state,
                    task,
                    ImportTaskStatus::Completed,
                    None,
                )
                .await?;
                info!(
                    "[{}/{}] Chunk {} schema {}: done in {:?}",
                    idx + 1,
                    config.tasks.len(),
                    task.chunk_id,
                    task.schema,
                    task_start.elapsed()
                );
            }
            Err(task_error) => {
                // Persist Failed best-effort, but always surface the original
                // task error to the caller. State persistence problems are
                // logged so they are not silently lost.
                if let Err(persist_error) = update_status_and_save(
                    &config,
                    &mut state,
                    task,
                    ImportTaskStatus::Failed,
                    Some(task_error.to_string()),
                )
                .await
                {
                    warn!(
                        "Failed to persist Failed status for chunk {} schema {} after task error ({}); state file may be out of date: {}",
                        task.chunk_id, task.schema, task_error, persist_error
                    );
                }
                return Err(task_error);
            }
        }
    }
    // All tasks completed: remove the state file while still holding the
    // lock, then release it.
    delete_import_state(&config.state_path).await?;
    info!("Data import finished in {:?}", import_start.elapsed());
    drop(lock);
    Ok(())
}
async fn update_status_and_save(
config: &ImportResumeConfig,
state: &mut ImportState,
task: &ImportTaskKey,
status: ImportTaskStatus,
error_message: Option<String>,
) -> Result<()> {
// set_task_status only fails if the task isn't in the state; that would
// indicate a logic bug since `task` came from the same config. Surface it
// instead of swallowing.
state.set_task_status(task.chunk_id, &task.schema, status, error_message)?;
save_import_state(&config.state_path, state).await
}
/// Verifies that a loaded state file belongs to the import described by
/// `config`; any divergence (identity fields, schema selection, or task set)
/// is reported as an `ImportStateMismatch` error.
fn validate_state_matches(state: &ImportState, config: &ImportResumeConfig) -> Result<()> {
    // Identity fields are checked first; the first difference wins. Target
    // addresses are compared literally; hostname normalization is left to the
    // caller.
    let identity_mismatch = if state.snapshot_id != config.snapshot_id {
        Some(format!(
            "snapshot_id differs (state: {}, requested: {})",
            state.snapshot_id, config.snapshot_id
        ))
    } else if state.target_addr != config.target_addr {
        Some(format!(
            "target_addr differs (state: {}, requested: {})",
            state.target_addr, config.target_addr
        ))
    } else if state.catalog != config.catalog {
        Some(format!(
            "catalog differs (state: {}, requested: {})",
            state.catalog, config.catalog
        ))
    } else {
        None
    };
    if let Some(reason) = identity_mismatch {
        return state_mismatch(config, reason);
    }
    let requested_schemas = canonical_schema_selection(&config.schemas);
    if state.schemas != requested_schemas {
        return state_mismatch(
            config,
            format!(
                "schemas differ (state: {:?}, requested: {:?})",
                state.schemas, requested_schemas
            ),
        );
    }
    // Task-set comparison comes last: building either set can itself fail on
    // duplicate keys, and those errors should only surface once the identity
    // checks have passed.
    if task_set_from_state(state, &config.state_path)? != task_set_from_config(config)? {
        return state_mismatch(config, "task set differs".to_string());
    }
    Ok(())
}
/// Builds the `ImportStateMismatch` error for the state file at
/// `config.state_path` and returns it as `Err`.
fn state_mismatch(config: &ImportResumeConfig, reason: String) -> Result<()> {
    let path = config.state_path.display().to_string();
    ImportStateMismatchSnafu { path, reason }.fail()
}
/// Collects (chunk_id, schema) keys into a set, rejecting duplicates with an
/// `ImportStateMismatch` error. Shared by the state- and request-side task-set
/// builders, which previously duplicated this loop and error construction.
/// `origin` names the side being checked ("state" or "request") in the error
/// message.
fn collect_task_keys<'a>(
    tasks: impl Iterator<Item = (u32, &'a str)>,
    state_path: &Path,
    origin: &str,
) -> Result<BTreeSet<(u32, &'a str)>> {
    let mut keys = BTreeSet::new();
    for (chunk_id, schema) in tasks {
        if !keys.insert((chunk_id, schema)) {
            return ImportStateMismatchSnafu {
                path: state_path.display().to_string(),
                reason: format!(
                    "duplicate task key in {} (chunk_id: {}, schema: {})",
                    origin, chunk_id, schema
                ),
            }
            .fail();
        }
    }
    Ok(keys)
}

/// Task-key set from a loaded state file; errors on duplicate keys.
fn task_set_from_state<'a>(
    state: &'a ImportState,
    state_path: &Path,
) -> Result<BTreeSet<(u32, &'a str)>> {
    collect_task_keys(
        state
            .tasks
            .iter()
            .map(|task| (task.chunk_id, task.schema.as_str())),
        state_path,
        "state",
    )
}

/// Task-key set from the requested config; errors on duplicate keys.
fn task_set_from_config(config: &ImportResumeConfig) -> Result<BTreeSet<(u32, &str)>> {
    collect_task_keys(
        config
            .tasks
            .iter()
            .map(|task| (task.chunk_id, task.schema.as_str())),
        &config.state_path,
        "request",
    )
}

/// Validates that the requested task list contains no duplicate keys.
fn validate_config_tasks(config: &ImportResumeConfig) -> Result<()> {
    task_set_from_config(config).map(|_| ())
}
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    use super::*;
    use crate::data::export_v2::manifest::{ChunkMeta, TimeRange};
    use crate::data::import_v2::error::TestTaskFailedSnafu;

    // How the fake executor should fail for its configured `fail_task`.
    #[derive(Debug, Clone, Copy)]
    enum FailureMode {
        // Always fails with a non-retryable error.
        Fatal,
        // Fails with a retryable error for the first `failures` attempts,
        // then succeeds.
        RetryableThenSuccess { failures: usize },
    }

    // Fake `ImportTaskExecutor` that records every successfully imported task
    // and can be configured to fail a specific task.
    struct RecordingExecutor {
        // Tasks whose import_task call succeeded, in call order.
        imported: Arc<Mutex<Vec<ImportTaskKey>>>,
        // If set, this task triggers `failure_mode`.
        fail_task: Option<ImportTaskKey>,
        failure_mode: Option<FailureMode>,
        // Total number of import_task invocations across all tasks.
        attempts: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl ImportTaskExecutor for RecordingExecutor {
        async fn import_task(&self, task: &ImportTaskKey) -> Result<()> {
            let attempt = self.attempts.fetch_add(1, Ordering::SeqCst);
            if self.fail_task.as_ref() == Some(task) {
                match self.failure_mode {
                    Some(FailureMode::Fatal) => {
                        return TestTaskFailedSnafu {
                            message: "fatal failure".to_string(),
                            retryable: false,
                        }
                        .fail();
                    }
                    Some(FailureMode::RetryableThenSuccess { failures }) if attempt < failures => {
                        return TestTaskFailedSnafu {
                            message: "retryable failure".to_string(),
                            retryable: true,
                        }
                        .fail();
                    }
                    _ => {}
                }
            }
            self.imported.lock().unwrap().push(task.clone());
            Ok(())
        }
    }

    // Executor that never fails; records into `imported`.
    fn recording_executor(imported: Arc<Mutex<Vec<ImportTaskKey>>>) -> RecordingExecutor {
        RecordingExecutor {
            imported,
            fail_task: None,
            failure_mode: None,
            attempts: Arc::new(AtomicUsize::new(0)),
        }
    }

    // Standard test config; identity fields must match the ImportState values
    // used by the tests below or validate_state_matches rejects the resume.
    fn config(path: PathBuf, tasks: Vec<ImportTaskKey>) -> ImportResumeConfig {
        ImportResumeConfig {
            snapshot_id: "snapshot-1".to_string(),
            target_addr: "127.0.0.1:4000".to_string(),
            catalog: "greptime".to_string(),
            schemas: vec!["public".to_string(), "analytics".to_string()],
            state_path: path,
            tasks,
        }
    }

    async fn run_import_with_resume<E>(config: ImportResumeConfig, executor: &E) -> Result<()>
    where
        E: ImportTaskExecutor + Sync,
    {
        // Mirror the production caller: mark DDL completed for fresh sessions
        // so the data-import guard is satisfied. Tests that want to exercise
        // the unsafe path drive prepare/import directly.
        let mut session = prepare_import_resume(config).await?;
        if !session.should_skip_ddl() {
            session.mark_ddl_completed().await?;
        }
        import_with_resume_session(session, executor).await
    }

    #[test]
    fn test_build_import_tasks_skips_skipped_chunks_and_missing_schema_files() {
        let mut completed = ChunkMeta::new(1, TimeRange::unbounded());
        completed.status = ChunkStatus::Completed;
        completed.files = vec!["data/public/1/file.parquet".to_string()];
        let mut skipped = ChunkMeta::new(2, TimeRange::unbounded());
        skipped.status = ChunkStatus::Skipped;
        skipped.files = vec!["data/public/2/file.parquet".to_string()];
        let tasks = build_import_tasks(
            &[completed, skipped],
            &["public".to_string(), "analytics".to_string()],
        );
        // Only chunk 1 x "public" survives: chunk 2 is skipped and neither
        // chunk carries "analytics" files.
        assert_eq!(tasks, vec![ImportTaskKey::new(1, "public")]);
    }

    #[tokio::test]
    async fn test_import_with_resume_skips_completed_tasks() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![
            ImportTaskKey::new(1, "public"),
            ImportTaskKey::new(2, "analytics"),
        ];
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        state.mark_ddl_completed();
        state
            .set_task_status(1, "public", ImportTaskStatus::Completed, None)
            .unwrap();
        save_import_state(&path, &state).await.unwrap();
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported.clone());
        run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap();
        // Only the not-yet-completed task is executed on resume.
        assert_eq!(
            imported.lock().unwrap().clone(),
            vec![ImportTaskKey::new(2, "analytics")]
        );
        // Successful completion removes the state file.
        assert!(load_import_state(&path).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_import_with_resume_persists_failed_task() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let failed_task = ImportTaskKey::new(1, "public");
        let tasks = vec![failed_task.clone()];
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = RecordingExecutor {
            imported,
            fail_task: Some(failed_task.clone()),
            failure_mode: Some(FailureMode::Fatal),
            attempts: Arc::new(AtomicUsize::new(0)),
        };
        let error = run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap_err();
        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::TestTaskFailed {
                retryable: false,
                ..
            }
        ));
        // The Failed status must survive in the state file for the next run.
        let state = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(
            state.task_status(failed_task.chunk_id, &failed_task.schema),
            Some(ImportTaskStatus::Failed)
        );
    }

    #[tokio::test]
    async fn test_import_with_resume_rejects_mismatched_state_identity() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];
        // State written for a different target address than config() uses.
        let state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4001",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        save_import_state(&path, &state).await.unwrap();
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported);
        let error = run_import_with_resume(config(path, tasks), &executor)
            .await
            .unwrap_err();
        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
    }

    #[tokio::test]
    async fn test_prepare_import_resume_reports_existing_state_before_ddl() {
        let dir = tempfile::tempdir().unwrap();
        let tasks = vec![ImportTaskKey::new(1, "public")];
        // A fresh session has no persisted DDL completion, so DDL must run.
        let fresh_session =
            prepare_import_resume(config(dir.path().join("fresh_state.json"), tasks.clone()))
                .await
                .unwrap();
        assert!(!fresh_session.should_skip_ddl());
        drop(fresh_session);
        // A pre-existing state with ddl_completed set lets the caller skip DDL.
        let existing_path = dir.path().join("existing_state.json");
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        state.mark_ddl_completed();
        save_import_state(&existing_path, &state).await.unwrap();
        let resume_session = prepare_import_resume(config(existing_path, tasks))
            .await
            .unwrap();
        assert!(resume_session.should_skip_ddl());
    }

    #[tokio::test]
    async fn test_import_with_resume_rejects_duplicate_state_tasks() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];
        let mut state = ImportState::new(
            "snapshot-1",
            "127.0.0.1:4000",
            "greptime",
            &["public".to_string(), "analytics".to_string()],
            tasks.clone(),
        );
        // Corrupt the persisted state with a duplicated task entry.
        state.tasks.push(state.tasks[0].clone());
        save_import_state(&path, &state).await.unwrap();
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported);
        let error = run_import_with_resume(config(path, tasks), &executor)
            .await
            .unwrap_err();
        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
    }

    #[tokio::test]
    async fn test_import_with_resume_rejects_data_import_when_ddl_incomplete() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let tasks = vec![ImportTaskKey::new(1, "public")];
        // prepare creates fresh state with ddl_completed=false; calling
        // import_with_resume_session directly (without mark_ddl_completed)
        // must be rejected.
        let session = prepare_import_resume(config(path, tasks)).await.unwrap();
        let imported = Arc::new(Mutex::new(Vec::new()));
        let executor = recording_executor(imported.clone());
        let error = import_with_resume_session(session, &executor)
            .await
            .unwrap_err();
        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateDdlIncomplete { .. }
        ));
        // No task may have been executed before the guard fired.
        assert!(imported.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_prepare_import_resume_rejects_duplicate_request_tasks_on_fresh_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let task = ImportTaskKey::new(1, "public");
        // No state file yet - duplicate detection must run before the fresh
        // state is persisted, otherwise corrupted bookkeeping would be
        // written to disk and observed only on a later resume.
        let error =
            match prepare_import_resume(config(path.clone(), vec![task.clone(), task])).await {
                Ok(_) => panic!("duplicate request tasks should be rejected"),
                Err(error) => error,
            };
        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateMismatch { .. }
        ));
        assert!(load_import_state(&path).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_import_with_resume_does_not_retry_retryable_task_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let failed_task = ImportTaskKey::new(1, "public");
        let tasks = vec![failed_task.clone()];
        let imported = Arc::new(Mutex::new(Vec::new()));
        let attempts = Arc::new(AtomicUsize::new(0));
        let executor = RecordingExecutor {
            imported: imported.clone(),
            fail_task: Some(failed_task.clone()),
            // If task import were retried, the second attempt would succeed.
            // COPY DATABASE FROM failures are ambiguous, so retryable errors
            // must still stop immediately to avoid duplicate rows.
            failure_mode: Some(FailureMode::RetryableThenSuccess { failures: 1 }),
            attempts: attempts.clone(),
        };
        let error = run_import_with_resume(config(path.clone(), tasks), &executor)
            .await
            .unwrap_err();
        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::TestTaskFailed {
                retryable: true,
                ..
            }
        ));
        // Exactly one attempt: no in-process retry of the retryable error.
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
        assert!(imported.lock().unwrap().is_empty());
        let state = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(
            state.task_status(failed_task.chunk_id, &failed_task.schema),
            Some(ImportTaskStatus::Failed)
        );
    }
}

View File

@@ -129,9 +129,55 @@ pub enum Error {
location: Location,
},
#[snafu(display("Import state references unknown chunk {}", chunk_id))]
ImportStateUnknownChunk {
#[snafu(display(
"Failed to determine import state path for snapshot '{}'. Set HOME, USERPROFILE, or run from a valid current directory.",
snapshot_id
))]
ImportStatePathUnavailable {
snapshot_id: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Import state at '{}' does not match current import: {}. Either rerun with matching import arguments, or delete the state file to start over (DDL will be re-executed).",
path,
reason
))]
ImportStateMismatch {
path: String,
reason: String,
#[snafu(implicit)]
location: Location,
},
#[cfg(test)]
#[snafu(display("Test task failed: {}", message))]
TestTaskFailed {
message: String,
retryable: bool,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Import state references unknown task: chunk {}, schema '{}'",
chunk_id,
schema
))]
ImportStateUnknownTask {
chunk_id: u32,
schema: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Import state at '{}' is not ready for data import: DDL has not been marked completed",
path
))]
ImportStateDdlIncomplete {
path: String,
#[snafu(implicit)]
location: Location,
},
@@ -148,7 +194,18 @@ impl ErrorExt for Error {
| Error::IncompleteSnapshot { .. }
| Error::EmptyChunkManifest { .. }
| Error::MissingChunkData { .. } => StatusCode::InvalidArguments,
Error::ImportStateUnknownChunk { .. } => StatusCode::Unexpected,
Error::ImportStatePathUnavailable { .. }
| Error::ImportStateUnknownTask { .. }
| Error::ImportStateDdlIncomplete { .. } => StatusCode::Unexpected,
Error::ImportStateMismatch { .. } => StatusCode::InvalidArguments,
#[cfg(test)]
Error::TestTaskFailed { retryable, .. } => {
if *retryable {
StatusCode::StorageUnavailable
} else {
StatusCode::InvalidArguments
}
}
Error::Database { error, .. } => error.status_code(),
Error::SnapshotStorage { error, .. } | Error::ChunkImportFailed { error, .. } => {
error.status_code()

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
@@ -24,8 +22,8 @@ use snafu::{IntoError, OptionExt, ResultExt};
use tokio::io::AsyncWriteExt;
use crate::data::import_v2::error::{
ImportStateIoSnafu, ImportStateLockedSnafu, ImportStateParseSnafu,
ImportStateUnknownChunkSnafu, Result,
ImportStateIoSnafu, ImportStateLockedSnafu, ImportStateParseSnafu, ImportStateUnknownTaskSnafu,
Result,
};
use crate::data::path::encode_path_segment;
@@ -33,9 +31,9 @@ const IMPORT_STATE_ROOT: &str = ".greptime";
const IMPORT_STATE_DIR: &str = "import_state";
static IMPORT_STATE_TMP_ID: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ImportChunkStatus {
pub(crate) enum ImportTaskStatus {
Pending,
InProgress,
Completed,
@@ -43,9 +41,25 @@ pub(crate) enum ImportChunkStatus {
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct ImportChunkState {
pub(crate) id: u32,
pub(crate) status: ImportChunkStatus,
pub(crate) struct ImportTaskKey {
pub(crate) chunk_id: u32,
pub(crate) schema: String,
}
impl ImportTaskKey {
pub(crate) fn new(chunk_id: u32, schema: impl Into<String>) -> Self {
Self {
chunk_id,
schema: schema.into(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct ImportTaskState {
pub(crate) chunk_id: u32,
pub(crate) schema: String,
pub(crate) status: ImportTaskStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) error: Option<String>,
}
@@ -54,55 +68,77 @@ pub(crate) struct ImportChunkState {
pub(crate) struct ImportState {
pub(crate) snapshot_id: String,
pub(crate) target_addr: String,
pub(crate) catalog: String,
pub(crate) schemas: Vec<String>,
#[serde(default)]
pub(crate) ddl_completed: bool,
pub(crate) updated_at: DateTime<Utc>,
// Chunk counts are expected to stay below ~1000, so linear scans are acceptable here.
pub(crate) chunks: Vec<ImportChunkState>,
// Tasks are (chunk-schema) tuples and can reach the tens of thousands;
// linear scans here are accepted because per-task work is dominated by
// network I/O and an fsync, but if the bound grows further this should be
// backed by a HashMap<(chunk_id, schema), index> rebuilt after load.
pub(crate) tasks: Vec<ImportTaskState>,
}
impl ImportState {
pub(crate) fn new<I>(
snapshot_id: impl Into<String>,
target_addr: impl Into<String>,
chunk_ids: I,
catalog: impl Into<String>,
schemas: &[String],
tasks: I,
) -> Self
where
I: IntoIterator<Item = u32>,
I: IntoIterator<Item = ImportTaskKey>,
{
Self {
snapshot_id: snapshot_id.into(),
target_addr: target_addr.into(),
catalog: catalog.into(),
schemas: canonical_schema_selection(schemas),
ddl_completed: false,
updated_at: Utc::now(),
chunks: chunk_ids
tasks: tasks
.into_iter()
.map(|id| ImportChunkState {
id,
status: ImportChunkStatus::Pending,
.map(|task| ImportTaskState {
chunk_id: task.chunk_id,
schema: task.schema,
status: ImportTaskStatus::Pending,
error: None,
})
.collect(),
}
}
pub(crate) fn chunk_status(&self, chunk_id: u32) -> Option<ImportChunkStatus> {
self.chunks
.iter()
.find(|chunk| chunk.id == chunk_id)
.map(|chunk| chunk.status.clone())
/// Records that the DDL phase has finished and refreshes the state timestamp.
pub(crate) fn mark_ddl_completed(&mut self) {
    self.updated_at = Utc::now();
    self.ddl_completed = true;
}
pub(crate) fn set_chunk_status(
/// Looks up the status of the task identified by `(chunk_id, schema)`,
/// or `None` if no such task is tracked in this state.
pub(crate) fn task_status(&self, chunk_id: u32, schema: &str) -> Option<ImportTaskStatus> {
    self.tasks.iter().find_map(|task| {
        (task.chunk_id == chunk_id && task.schema == schema).then_some(task.status)
    })
}
pub(crate) fn set_task_status(
&mut self,
chunk_id: u32,
status: ImportChunkStatus,
schema: &str,
status: ImportTaskStatus,
error: Option<String>,
) -> Result<()> {
let chunk = self
.chunks
let task = self
.tasks
.iter_mut()
.find(|chunk| chunk.id == chunk_id)
.context(ImportStateUnknownChunkSnafu { chunk_id })?;
chunk.status = status;
chunk.error = error;
.find(|task| task.chunk_id == chunk_id && task.schema == schema)
.context(ImportStateUnknownTaskSnafu {
chunk_id,
schema: schema.to_string(),
})?;
task.status = status;
task.error = error;
self.updated_at = Utc::now();
Ok(())
}
@@ -119,15 +155,27 @@ impl Drop for ImportStateLockGuard {
}
}
pub(crate) fn default_state_path(snapshot_id: &str, target_addr: &str) -> Option<PathBuf> {
pub(crate) fn default_state_path(
snapshot_id: &str,
target_addr: &str,
catalog: &str,
schemas: &[String],
) -> Option<PathBuf> {
let home = default_home_dir_with(|key| std::env::var_os(key));
let cwd = std::env::current_dir().ok();
default_state_path_with(home.as_deref(), cwd.as_deref(), snapshot_id, target_addr)
default_state_path_with(
home.as_deref(),
cwd.as_deref(),
snapshot_id,
target_addr,
catalog,
schemas,
)
}
fn default_home_dir_with<F>(get: F) -> Option<PathBuf>
where
F: for<'a> Fn(&'a str) -> Option<std::ffi::OsString>,
F: Fn(&str) -> Option<std::ffi::OsString>,
{
get("HOME")
.or_else(|| get("USERPROFILE"))
@@ -144,8 +192,10 @@ fn default_state_path_with(
cwd: Option<&Path>,
snapshot_id: &str,
target_addr: &str,
catalog: &str,
schemas: &[String],
) -> Option<PathBuf> {
let file_name = import_state_file_name(snapshot_id, target_addr);
let file_name = import_state_file_name(snapshot_id, target_addr, catalog, schemas);
match (home, cwd) {
(Some(home), _) => Some(
home.join(IMPORT_STATE_ROOT)
@@ -157,14 +207,58 @@ fn default_state_path_with(
}
}
fn import_state_file_name(snapshot_id: &str, target_addr: &str) -> String {
fn import_state_file_name(
snapshot_id: &str,
target_addr: &str,
catalog: &str,
schemas: &[String],
) -> String {
format!(
".import_state_{}_{}.json",
".import_state_{}_{}_{}.json",
encode_path_segment(snapshot_id),
encode_path_segment(target_addr)
encode_path_segment(target_addr),
import_identity_hash(catalog, schemas)
)
}
/// Canonicalizes a schema selection: ASCII-lower-cases every name, sorts the
/// list, and drops duplicates, so the same logical selection always yields
/// the same representation regardless of input order or casing.
pub(crate) fn canonical_schema_selection(schemas: &[String]) -> Vec<String> {
    let mut canonical = Vec::with_capacity(schemas.len());
    for schema in schemas {
        canonical.push(schema.to_ascii_lowercase());
    }
    canonical.sort();
    canonical.dedup();
    canonical
}
/// FNV-1a over `(catalog, schemas)`. The output is part of the persisted state
/// filename, so we cannot use `std::collections::hash_map::DefaultHasher` -
/// Rust does not guarantee its algorithm across releases, which would make a
/// state file written by one toolchain undiscoverable by another.
fn import_identity_hash(catalog: &str, schemas: &[String]) -> String {
    const FNV_OFFSET: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    // One FNV-1a step per byte: xor the byte in, then multiply by the prime.
    fn absorb(hash: u64, bytes: &[u8]) -> u64 {
        bytes
            .iter()
            .fold(hash, |acc, &byte| (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME))
    }

    let mut hash = absorb(FNV_OFFSET, catalog.as_bytes());
    // 0xff cannot appear in valid UTF-8, so it works as an unambiguous
    // field separator between adjacent identifiers.
    hash = absorb(hash, &[0xff]);
    for schema in canonical_schema_selection(schemas) {
        hash = absorb(hash, schema.as_bytes());
        hash = absorb(hash, &[0xff]);
    }
    format!("{hash:016x}")
}
pub(crate) async fn load_import_state(path: &Path) -> Result<Option<ImportState>> {
match tokio::fs::read(path).await {
Ok(bytes) => {
@@ -268,10 +362,10 @@ fn import_state_lock_path(path: &Path) -> PathBuf {
}
fn normalize_import_state_for_resume(state: &mut ImportState) {
for chunk in &mut state.chunks {
if chunk.status == ImportChunkStatus::InProgress {
chunk.status = ImportChunkStatus::Pending;
chunk.error = None;
for task in &mut state.tasks {
if task.status == ImportTaskStatus::InProgress {
task.status = ImportTaskStatus::Pending;
task.error = None;
}
}
}
@@ -324,42 +418,82 @@ mod tests {
const CHILD_LOCK_TEST: &str =
"data::import_v2::state::tests::test_try_acquire_import_state_lock_child_process";
#[test]
fn test_import_state_new_initializes_pending_chunks() {
let state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
/// Schema-selection fixture shared by the state tests.
fn schemas() -> Vec<String> {
    ["public", "analytics"].into_iter().map(String::from).collect()
}
assert_eq!(state.snapshot_id, "snapshot-1");
assert_eq!(state.target_addr, "127.0.0.1:4000");
assert_eq!(state.chunks.len(), 2);
assert_eq!(state.chunks[0].status, ImportChunkStatus::Pending);
assert_eq!(state.chunks[1].status, ImportChunkStatus::Pending);
/// Task fixture: one (chunk, schema) task per entry, matching `schemas()`.
fn tasks() -> Vec<ImportTaskKey> {
    let first = ImportTaskKey::new(1, "public");
    let second = ImportTaskKey::new(2, "analytics");
    vec![first, second]
}
#[test]
fn test_set_chunk_status_updates_timestamp_and_error() {
let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
fn test_import_state_new_initializes_pending_tasks() {
let state = ImportState::new(
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&schemas(),
tasks(),
);
assert_eq!(state.snapshot_id, "snapshot-1");
assert_eq!(state.target_addr, "127.0.0.1:4000");
assert_eq!(state.catalog, "greptime");
assert_eq!(state.schemas, vec!["analytics", "public"]);
assert_eq!(state.tasks.len(), 2);
assert_eq!(state.tasks[0].status, ImportTaskStatus::Pending);
assert_eq!(state.tasks[1].status, ImportTaskStatus::Pending);
}
#[test]
fn test_set_task_status_updates_timestamp_and_error() {
let mut state = ImportState::new(
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&schemas(),
tasks(),
);
let before = state.updated_at;
state.updated_at = Utc::now() - chrono::Duration::seconds(10);
state
.set_chunk_status(1, ImportChunkStatus::Failed, Some("timeout".to_string()))
.set_task_status(
1,
"public",
ImportTaskStatus::Failed,
Some("timeout".to_string()),
)
.unwrap();
assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Failed));
assert_eq!(state.chunks[0].error.as_deref(), Some("timeout"));
assert_eq!(
state.task_status(1, "public"),
Some(ImportTaskStatus::Failed)
);
assert_eq!(state.tasks[0].error.as_deref(), Some("timeout"));
assert!(state.updated_at > before);
}
#[test]
fn test_set_chunk_status_rejects_unknown_chunk_id() {
let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
fn test_set_task_status_rejects_unknown_task() {
let mut state = ImportState::new(
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&schemas(),
tasks(),
);
let error = state
.set_chunk_status(99, ImportChunkStatus::Completed, None)
.set_task_status(99, "public", ImportTaskStatus::Completed, None)
.unwrap_err();
assert!(matches!(
error,
crate::data::import_v2::error::Error::ImportStateUnknownChunk { chunk_id, .. } if chunk_id == 99
crate::data::import_v2::error::Error::ImportStateUnknownTask { chunk_id, schema, .. }
if chunk_id == 99 && schema == "public"
));
}
@@ -367,9 +501,15 @@ mod tests {
async fn test_save_and_load_import_state_round_trip() {
let dir = tempdir().unwrap();
let path = dir.path().join("import_state.json");
let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
let mut state = ImportState::new(
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&schemas(),
tasks(),
);
state
.set_chunk_status(2, ImportChunkStatus::Completed, None)
.set_task_status(2, "analytics", ImportTaskStatus::Completed, None)
.unwrap();
save_import_state(&path, &state).await.unwrap();
@@ -377,53 +517,78 @@ mod tests {
assert_eq!(loaded.snapshot_id, state.snapshot_id);
assert_eq!(loaded.target_addr, state.target_addr);
assert_eq!(loaded.chunks, state.chunks);
assert_eq!(loaded.catalog, state.catalog);
assert_eq!(loaded.schemas, state.schemas);
assert_eq!(loaded.tasks, state.tasks);
}
#[tokio::test]
async fn test_save_import_state_overwrites_existing_file() {
let dir = tempdir().unwrap();
let path = dir.path().join("import_state.json");
let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
let mut state = ImportState::new(
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&schemas(),
tasks(),
);
save_import_state(&path, &state).await.unwrap();
state
.set_chunk_status(1, ImportChunkStatus::Completed, None)
.set_task_status(1, "public", ImportTaskStatus::Completed, None)
.unwrap();
save_import_state(&path, &state).await.unwrap();
let loaded = load_import_state(&path).await.unwrap().unwrap();
assert_eq!(loaded.chunk_status(1), Some(ImportChunkStatus::Completed));
assert_eq!(
loaded.task_status(1, "public"),
Some(ImportTaskStatus::Completed)
);
}
#[test]
fn test_load_import_state_resets_in_progress_to_pending() {
let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
let mut state = ImportState::new(
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&schemas(),
tasks(),
);
state
.set_chunk_status(
.set_task_status(
2,
ImportChunkStatus::InProgress,
"analytics",
ImportTaskStatus::InProgress,
Some("running".to_string()),
)
.unwrap();
normalize_import_state_for_resume(&mut state);
assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Pending));
assert_eq!(state.chunk_status(2), Some(ImportChunkStatus::Pending));
assert_eq!(state.chunks[1].error, None);
assert_eq!(
state.task_status(1, "public"),
Some(ImportTaskStatus::Pending)
);
assert_eq!(
state.task_status(2, "analytics"),
Some(ImportTaskStatus::Pending)
);
assert_eq!(state.tasks[1].error, None);
}
#[test]
fn test_unique_tmp_path_generates_distinct_paths() {
let path = Path::new("/tmp/import_state.json");
let dir = tempdir().unwrap();
let path = dir.path().join("import_state.json");
let first = unique_tmp_path(path);
let second = unique_tmp_path(path);
let first = unique_tmp_path(&path);
let second = unique_tmp_path(&path);
assert_ne!(first, second);
assert!(first.starts_with("/tmp"));
assert!(second.starts_with("/tmp"));
assert!(first.starts_with(dir.path()));
assert!(second.starts_with(dir.path()));
assert!(
first
.file_name()
@@ -507,44 +672,104 @@ mod tests {
Some(cwd.path()),
"../snapshot",
"127.0.0.1:4000",
"greptime",
&schemas(),
)
.unwrap();
assert_eq!(
path,
home.path()
.join(IMPORT_STATE_ROOT)
.join(IMPORT_STATE_DIR)
.join(".import_state_%2E%2E%2Fsnapshot_127%2E0%2E0%2E1%3A4000.json")
path.parent().unwrap(),
home.path().join(IMPORT_STATE_ROOT).join(IMPORT_STATE_DIR)
);
let file_name = path.file_name().unwrap().to_string_lossy();
assert!(file_name.starts_with(".import_state_%2E%2E%2Fsnapshot_127%2E0%2E0%2E1%3A4000_"));
assert!(file_name.ends_with(".json"));
}
#[test]
fn test_default_state_path_falls_back_to_cwd_when_home_missing() {
let cwd = tempdir().unwrap();
let path =
default_state_path_with(None, Some(cwd.path()), "snapshot-1", "target-a").unwrap();
let path = default_state_path_with(
None,
Some(cwd.path()),
"snapshot-1",
"target-a",
"greptime",
&schemas(),
)
.unwrap();
assert_eq!(
path,
cwd.path().join(".import_state_snapshot-1_target-a.json")
);
assert_eq!(path.parent().unwrap(), cwd.path());
let file_name = path.file_name().unwrap().to_string_lossy();
assert!(file_name.starts_with(".import_state_snapshot-1_target-a_"));
assert!(file_name.ends_with(".json"));
}
#[test]
fn test_default_state_path_isolated_by_target_addr() {
let cwd = tempdir().unwrap();
let first = default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4000")
.unwrap();
let second =
default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4001")
.unwrap();
let first = default_state_path_with(
None,
Some(cwd.path()),
"snapshot-1",
"127.0.0.1:4000",
"greptime",
&schemas(),
)
.unwrap();
let second = default_state_path_with(
None,
Some(cwd.path()),
"snapshot-1",
"127.0.0.1:4001",
"greptime",
&schemas(),
)
.unwrap();
assert_ne!(first, second);
}
#[test]
fn test_default_state_path_isolated_by_catalog_and_schemas() {
    let cwd = tempdir().unwrap();
    // Resolve the state path with everything fixed except the
    // catalog/schema identity under test.
    let path_for = |catalog: &str, schemas: &[String]| {
        default_state_path_with(
            None,
            Some(cwd.path()),
            "snapshot-1",
            "127.0.0.1:4000",
            catalog,
            schemas,
        )
        .unwrap()
    };

    let public_only = vec!["public".to_string()];
    let analytics_only = vec!["analytics".to_string()];

    let first = path_for("greptime", &public_only);
    let second = path_for("other", &public_only);
    let third = path_for("greptime", &analytics_only);

    // A different catalog or a different schema selection must map to a
    // different state file.
    assert_ne!(first, second);
    assert_ne!(first, third);
}
#[test]
fn test_default_home_dir_prefers_home() {
let detected = default_home_dir_with(|key| match key {

View File

@@ -1,113 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![allow(dead_code)]
use std::time::Duration;
use backon::ExponentialBuilder;
/// Shared exponential backoff policy: 1s initial delay doubling up to 5
/// minutes, jittered, with at most 3 retries after the initial attempt.
pub(crate) fn default_retry_policy() -> ExponentialBuilder {
    let min_delay = Duration::from_secs(1);
    let max_delay = Duration::from_secs(300);
    ExponentialBuilder::default()
        .with_min_delay(min_delay)
        .with_max_delay(max_delay)
        .with_factor(2.0)
        // Retry budget counts retries *after* the initial attempt.
        .with_max_times(3)
        .with_jitter()
}
// Unit tests for the shared retry policy. They drive `backon`'s `Retryable`
// combinator with the policy above and substitute a no-op sleep so the tests
// never actually wait out the backoff delays.
#[cfg(test)]
mod tests {
    use std::future::ready;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use backon::Retryable;

    use super::*;

    #[tokio::test]
    async fn test_retry_policy_retries_retryable_error_until_success() {
        // Fail twice with a retryable error, then succeed: the default policy
        // (3 retries after the first attempt) must reach the success on the
        // third call.
        let attempts = Arc::new(AtomicUsize::new(0));

        let result = ({
            let attempts = attempts.clone();
            move || {
                let attempts = attempts.clone();
                async move {
                    let current = attempts.fetch_add(1, Ordering::SeqCst);
                    if current < 2 {
                        Err("retryable")
                    } else {
                        Ok("done")
                    }
                }
            }
        })
        .retry(default_retry_policy())
        .when(|error| *error == "retryable")
        // No-op sleep keeps the test instant.
        .sleep(|_| ready(()))
        .await;

        assert_eq!(result, Ok("done"));
        assert_eq!(attempts.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_retry_policy_stops_on_non_retryable_error() {
        // An error rejected by the `when` predicate must surface immediately:
        // exactly one attempt, no retries.
        let attempts = Arc::new(AtomicUsize::new(0));

        let result: std::result::Result<(), &str> = ({
            let attempts = attempts.clone();
            move || {
                let attempts = attempts.clone();
                async move {
                    attempts.fetch_add(1, Ordering::SeqCst);
                    Err("fatal")
                }
            }
        })
        .retry(default_retry_policy())
        .when(|error| *error == "retryable")
        .sleep(|_| ready(()))
        .await;

        assert_eq!(result, Err("fatal"));
        assert_eq!(attempts.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_retry_policy_returns_last_error_after_reaching_limit() {
        // Always fail: with max_times(2) the caller observes the error from
        // the final (third) call, i.e. the attempt counter value 2.
        let attempts = Arc::new(AtomicUsize::new(0));

        let result: std::result::Result<(), usize> = ({
            let attempts = attempts.clone();
            move || {
                let attempts = attempts.clone();
                async move {
                    let current = attempts.fetch_add(1, Ordering::SeqCst);
                    Err(current)
                }
            }
        })
        .retry(default_retry_policy().with_max_times(2))
        .when(|_| true)
        .sleep(|_| ready(()))
        .await;

        assert_eq!(result, Err(2));
        assert_eq!(attempts.load(Ordering::SeqCst), 3);
    }
}

View File

@@ -89,6 +89,10 @@ impl DatabaseClient {
}
}
/// Returns the server address this client was configured with.
pub fn addr(&self) -> &str {
    &self.addr
}
/// Runs `sql` against the default schema (`DEFAULT_SCHEMA_NAME`) and returns
/// the result rows, if any.
pub async fn sql_in_public(&self, sql: &str) -> Result<Option<Vec<Vec<Value>>>> {
    let schema = DEFAULT_SCHEMA_NAME;
    self.sql(sql, schema).await
}