1#![allow(dead_code)]
16
17use std::path::{Path, PathBuf};
18use std::sync::atomic::{AtomicU64, Ordering};
19
20use chrono::{DateTime, Utc};
21use fs2::FileExt;
22use serde::{Deserialize, Serialize};
23use snafu::{IntoError, OptionExt, ResultExt};
24use tokio::io::AsyncWriteExt;
25
26use crate::data::import_v2::error::{
27 ImportStateIoSnafu, ImportStateLockedSnafu, ImportStateParseSnafu,
28 ImportStateUnknownChunkSnafu, Result,
29};
30use crate::data::path::encode_path_segment;
31
/// Directory joined onto the home directory when building the default state path.
const IMPORT_STATE_ROOT: &str = ".greptime";
/// Subdirectory of [`IMPORT_STATE_ROOT`] that holds the import state files.
const IMPORT_STATE_DIR: &str = "import_state";
/// Process-wide counter that gives every temporary state file a distinct sequence number.
static IMPORT_STATE_TMP_ID: AtomicU64 = AtomicU64::new(0);
35
/// Status recorded for a single chunk of an import.
///
/// Variants are (de)serialized with snake_case names, e.g. `in_progress`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ImportChunkStatus {
    /// Status every chunk starts with in [`ImportState::new`].
    Pending,
    /// Reset to [`ImportChunkStatus::Pending`] whenever a state file is loaded.
    InProgress,
    Completed,
    Failed,
}
44
/// Persisted record for one chunk: its id, its status and an optional error message.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct ImportChunkState {
    /// Identifier used to look the chunk up in [`ImportState`].
    pub(crate) id: u32,
    pub(crate) status: ImportChunkStatus,
    /// Error text attached to the chunk; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) error: Option<String>,
}
52
/// Serializable progress record for one import, keyed by snapshot and target address.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct ImportState {
    pub(crate) snapshot_id: String,
    pub(crate) target_addr: String,
    /// Set when the state is created and refreshed on every successful
    /// [`ImportState::set_chunk_status`] call.
    pub(crate) updated_at: DateTime<Utc>,
    /// One entry per chunk id supplied at construction, in iteration order.
    pub(crate) chunks: Vec<ImportChunkState>,
}
61
62impl ImportState {
63 pub(crate) fn new<I>(
64 snapshot_id: impl Into<String>,
65 target_addr: impl Into<String>,
66 chunk_ids: I,
67 ) -> Self
68 where
69 I: IntoIterator<Item = u32>,
70 {
71 Self {
72 snapshot_id: snapshot_id.into(),
73 target_addr: target_addr.into(),
74 updated_at: Utc::now(),
75 chunks: chunk_ids
76 .into_iter()
77 .map(|id| ImportChunkState {
78 id,
79 status: ImportChunkStatus::Pending,
80 error: None,
81 })
82 .collect(),
83 }
84 }
85
86 pub(crate) fn chunk_status(&self, chunk_id: u32) -> Option<ImportChunkStatus> {
87 self.chunks
88 .iter()
89 .find(|chunk| chunk.id == chunk_id)
90 .map(|chunk| chunk.status.clone())
91 }
92
93 pub(crate) fn set_chunk_status(
94 &mut self,
95 chunk_id: u32,
96 status: ImportChunkStatus,
97 error: Option<String>,
98 ) -> Result<()> {
99 let chunk = self
100 .chunks
101 .iter_mut()
102 .find(|chunk| chunk.id == chunk_id)
103 .context(ImportStateUnknownChunkSnafu { chunk_id })?;
104 chunk.status = status;
105 chunk.error = error;
106 self.updated_at = Utc::now();
107 Ok(())
108 }
109}
110
/// Guard returned by [`try_acquire_import_state_lock`].
///
/// It owns the open lock file; dropping the guard unlocks it.
#[derive(Debug)]
pub(crate) struct ImportStateLockGuard {
    /// Handle to the `.lock` file on which the exclusive lock was taken.
    file: std::fs::File,
}
115
impl Drop for ImportStateLockGuard {
    /// Unlocks the lock file when the guard goes out of scope.
    fn drop(&mut self) {
        // Best effort: `drop` cannot report a failure, so the result is discarded.
        let _ = self.file.unlock();
    }
}
121
122pub(crate) fn default_state_path(snapshot_id: &str, target_addr: &str) -> Option<PathBuf> {
123 let home = default_home_dir_with(|key| std::env::var_os(key));
124 let cwd = std::env::current_dir().ok();
125 default_state_path_with(home.as_deref(), cwd.as_deref(), snapshot_id, target_addr)
126}
127
/// Resolves the user's home directory from environment-style lookups.
///
/// `get` is called with a variable name and returns its value, if any. The lookup
/// order is `HOME`, then `USERPROFILE`, then `HOMEDRIVE` joined with `HOMEPATH`.
/// A variable that is set to the empty string is treated as unset, so it never
/// produces an empty home directory (which would make every derived path relative
/// to the current directory).
///
/// Returns `None` when no candidate yields a non-empty value.
fn default_home_dir_with<F>(get: F) -> Option<PathBuf>
where
    F: for<'a> Fn(&'a str) -> Option<std::ffi::OsString>,
{
    // Empty values carry no location information; skip them like missing ones.
    let lookup = |key: &str| get(key).filter(|value| !value.is_empty());

    lookup("HOME")
        .or_else(|| lookup("USERPROFILE"))
        .map(PathBuf::from)
        .or_else(|| {
            let drive = lookup("HOMEDRIVE")?;
            let path = lookup("HOMEPATH")?;
            Some(PathBuf::from(drive).join(path))
        })
}
141
142fn default_state_path_with(
143 home: Option<&Path>,
144 cwd: Option<&Path>,
145 snapshot_id: &str,
146 target_addr: &str,
147) -> Option<PathBuf> {
148 let file_name = import_state_file_name(snapshot_id, target_addr);
149 match (home, cwd) {
150 (Some(home), _) => Some(
151 home.join(IMPORT_STATE_ROOT)
152 .join(IMPORT_STATE_DIR)
153 .join(file_name),
154 ),
155 (None, Some(cwd)) => Some(cwd.join(file_name)),
156 (None, None) => None,
157 }
158}
159
160fn import_state_file_name(snapshot_id: &str, target_addr: &str) -> String {
161 format!(
162 ".import_state_{}_{}.json",
163 encode_path_segment(snapshot_id),
164 encode_path_segment(target_addr)
165 )
166}
167
168pub(crate) async fn load_import_state(path: &Path) -> Result<Option<ImportState>> {
169 match tokio::fs::read(path).await {
170 Ok(bytes) => {
171 let mut state: ImportState =
172 serde_json::from_slice(&bytes).context(ImportStateParseSnafu)?;
173 normalize_import_state_for_resume(&mut state);
174 Ok(Some(state))
175 }
176 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
177 Err(source) => Err(source).context(ImportStateIoSnafu {
178 path: path.display().to_string(),
179 }),
180 }
181}
182
183pub(crate) async fn save_import_state(path: &Path, state: &ImportState) -> Result<()> {
185 if let Some(parent) = path.parent() {
186 tokio::fs::create_dir_all(parent)
187 .await
188 .context(ImportStateIoSnafu {
189 path: parent.display().to_string(),
190 })?;
191 }
192
193 let bytes =
194 serde_json::to_vec_pretty(state).expect("ImportState should always be serializable");
195 let tmp_path = unique_tmp_path(path);
196 let mut file = tokio::fs::File::create(&tmp_path)
197 .await
198 .context(ImportStateIoSnafu {
199 path: tmp_path.display().to_string(),
200 })?;
201 file.write_all(&bytes).await.context(ImportStateIoSnafu {
202 path: tmp_path.display().to_string(),
203 })?;
204 file.sync_all().await.context(ImportStateIoSnafu {
205 path: tmp_path.display().to_string(),
206 })?;
207 drop(file);
209
210 tokio::fs::rename(&tmp_path, path)
211 .await
212 .context(ImportStateIoSnafu {
213 path: path.display().to_string(),
214 })?;
215 sync_parent_dir(path).await?;
216 Ok(())
217}
218
219pub(crate) fn try_acquire_import_state_lock(path: &Path) -> Result<ImportStateLockGuard> {
220 if let Some(parent) = path.parent() {
221 std::fs::create_dir_all(parent).context(ImportStateIoSnafu {
222 path: parent.display().to_string(),
223 })?;
224 }
225
226 let lock_path = import_state_lock_path(path);
227 let file = std::fs::OpenOptions::new()
228 .create(true)
229 .read(true)
230 .write(true)
231 .truncate(false)
232 .open(&lock_path)
233 .context(ImportStateIoSnafu {
234 path: lock_path.display().to_string(),
235 })?;
236 file.try_lock_exclusive().map_err(|error| {
237 if error.kind() == std::io::ErrorKind::WouldBlock {
238 ImportStateLockedSnafu {
239 path: lock_path.display().to_string(),
240 }
241 .build()
242 } else {
243 ImportStateIoSnafu {
244 path: lock_path.display().to_string(),
245 }
246 .into_error(error)
247 }
248 })?;
249
250 Ok(ImportStateLockGuard { file })
251}
252
253fn unique_tmp_path(path: &Path) -> PathBuf {
254 let pid = std::process::id();
255 let seq = IMPORT_STATE_TMP_ID.fetch_add(1, Ordering::Relaxed);
256 let file_name = path.file_name().unwrap_or_default().to_string_lossy();
257 path.with_file_name(format!("{file_name}.{pid}.{seq}.tmp"))
258}
259
/// Returns the lock file path for the state file at `path`: the same location with
/// `.lock` appended to the file name.
///
/// The file name is extended as an `OsString`, so non-UTF-8 names are kept intact and
/// two different state files never map to the same lock file.
fn import_state_lock_path(path: &Path) -> PathBuf {
    let mut file_name = path.file_name().unwrap_or_default().to_os_string();
    file_name.push(".lock");
    path.with_file_name(file_name)
}
264
265fn normalize_import_state_for_resume(state: &mut ImportState) {
266 for chunk in &mut state.chunks {
267 if chunk.status == ImportChunkStatus::InProgress {
268 chunk.status = ImportChunkStatus::Pending;
269 chunk.error = None;
270 }
271 }
272}
273
274pub(crate) async fn delete_import_state(path: &Path) -> Result<()> {
275 match tokio::fs::remove_file(path).await {
276 Ok(()) => {
277 sync_parent_dir(path).await?;
278 Ok(())
279 }
280 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
281 Err(source) => Err(source).context(ImportStateIoSnafu {
282 path: path.display().to_string(),
283 }),
284 }
285}
286
287#[cfg(unix)]
288async fn sync_parent_dir(path: &Path) -> Result<()> {
289 let Some(parent) = path.parent() else {
290 return Ok(());
291 };
292
293 let dir = tokio::fs::File::open(parent)
294 .await
295 .context(ImportStateIoSnafu {
296 path: parent.display().to_string(),
297 })?;
298 dir.sync_all().await.context(ImportStateIoSnafu {
299 path: parent.display().to_string(),
300 })?;
301 Ok(())
302}
303
304#[cfg(not(unix))]
305async fn sync_parent_dir(_path: &Path) -> Result<()> {
306 Ok(())
307}
308
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use tempfile::tempdir;

    use super::*;

    #[test]
    fn test_import_state_new_initializes_pending_chunks() {
        let state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);

        assert_eq!(state.snapshot_id, "snapshot-1");
        assert_eq!(state.target_addr, "127.0.0.1:4000");
        assert_eq!(state.chunks.len(), 2);
        assert_eq!(state.chunks[0].status, ImportChunkStatus::Pending);
        assert_eq!(state.chunks[1].status, ImportChunkStatus::Pending);
    }

    #[test]
    fn test_set_chunk_status_updates_timestamp_and_error() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
        // Back-date the timestamp first and capture it afterwards, so the strict
        // comparison below cannot tie even on a coarse clock.
        state.updated_at = Utc::now() - chrono::Duration::seconds(10);
        let before = state.updated_at;

        state
            .set_chunk_status(1, ImportChunkStatus::Failed, Some("timeout".to_string()))
            .unwrap();
        assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Failed));
        assert_eq!(state.chunks[0].error.as_deref(), Some("timeout"));
        assert!(state.updated_at > before);
    }

    #[test]
    fn test_set_chunk_status_rejects_unknown_chunk_id() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);

        let error = state
            .set_chunk_status(99, ImportChunkStatus::Completed, None)
            .unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateUnknownChunk { chunk_id, .. } if chunk_id == 99
        ));
    }

    #[tokio::test]
    async fn test_save_and_load_import_state_round_trip() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
        state
            .set_chunk_status(2, ImportChunkStatus::Completed, None)
            .unwrap();

        save_import_state(&path, &state).await.unwrap();
        let loaded = load_import_state(&path).await.unwrap().unwrap();

        assert_eq!(loaded.snapshot_id, state.snapshot_id);
        assert_eq!(loaded.target_addr, state.target_addr);
        assert_eq!(loaded.chunks, state.chunks);
    }

    #[tokio::test]
    async fn test_save_import_state_overwrites_existing_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1]);
        save_import_state(&path, &state).await.unwrap();

        state
            .set_chunk_status(1, ImportChunkStatus::Completed, None)
            .unwrap();
        save_import_state(&path, &state).await.unwrap();

        let loaded = load_import_state(&path).await.unwrap().unwrap();
        assert_eq!(loaded.chunk_status(1), Some(ImportChunkStatus::Completed));
    }

    #[test]
    fn test_load_import_state_resets_in_progress_to_pending() {
        let mut state = ImportState::new("snapshot-1", "127.0.0.1:4000", [1, 2]);
        state
            .set_chunk_status(
                2,
                ImportChunkStatus::InProgress,
                Some("running".to_string()),
            )
            .unwrap();

        normalize_import_state_for_resume(&mut state);

        assert_eq!(state.chunk_status(1), Some(ImportChunkStatus::Pending));
        assert_eq!(state.chunk_status(2), Some(ImportChunkStatus::Pending));
        assert_eq!(state.chunks[1].error, None);
    }

    #[test]
    fn test_unique_tmp_path_generates_distinct_paths() {
        let path = Path::new("/tmp/import_state.json");

        let first = unique_tmp_path(path);
        let second = unique_tmp_path(path);

        assert_ne!(first, second);
        assert!(first.starts_with("/tmp"));
        assert!(second.starts_with("/tmp"));
        assert!(
            first
                .file_name()
                .unwrap()
                .to_string_lossy()
                .ends_with(".tmp")
        );
        assert!(
            second
                .file_name()
                .unwrap()
                .to_string_lossy()
                .ends_with(".tmp")
        );
    }

    #[test]
    fn test_try_acquire_import_state_lock_rejects_second_holder() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("import_state.json");

        let _first = try_acquire_import_state_lock(&path).unwrap();
        let error = try_acquire_import_state_lock(&path).unwrap_err();

        assert!(matches!(
            error,
            crate::data::import_v2::error::Error::ImportStateLocked { .. }
        ));
    }

    #[tokio::test]
    async fn test_delete_import_state_ignores_missing_file() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("missing.json");

        delete_import_state(&path).await.unwrap();
    }

    #[test]
    fn test_default_state_path_prefers_home_and_encodes_snapshot_id() {
        let home = tempdir().unwrap();
        let cwd = tempdir().unwrap();

        let path = default_state_path_with(
            Some(home.path()),
            Some(cwd.path()),
            "../snapshot",
            "127.0.0.1:4000",
        )
        .unwrap();

        assert_eq!(
            path,
            home.path()
                .join(IMPORT_STATE_ROOT)
                .join(IMPORT_STATE_DIR)
                .join(".import_state_%2E%2E%2Fsnapshot_127%2E0%2E0%2E1%3A4000.json")
        );
    }

    #[test]
    fn test_default_state_path_falls_back_to_cwd_when_home_missing() {
        let cwd = tempdir().unwrap();

        let path =
            default_state_path_with(None, Some(cwd.path()), "snapshot-1", "target-a").unwrap();

        assert_eq!(
            path,
            cwd.path().join(".import_state_snapshot-1_target-a.json")
        );
    }

    #[test]
    fn test_default_state_path_isolated_by_target_addr() {
        let cwd = tempdir().unwrap();

        let first = default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4000")
            .unwrap();
        let second =
            default_state_path_with(None, Some(cwd.path()), "snapshot-1", "127.0.0.1:4001")
                .unwrap();

        assert_ne!(first, second);
    }

    #[test]
    fn test_default_home_dir_prefers_home() {
        let detected = default_home_dir_with(|key| match key {
            "HOME" => Some(std::ffi::OsString::from("/tmp/home")),
            "USERPROFILE" => Some(std::ffi::OsString::from("/tmp/userprofile")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp/home")));
    }

    #[test]
    fn test_default_home_dir_falls_back_to_userprofile() {
        let detected = default_home_dir_with(|key| match key {
            "USERPROFILE" => Some(std::ffi::OsString::from("/tmp/userprofile")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp/userprofile")));
    }

    #[test]
    fn test_default_home_dir_falls_back_to_home_drive_and_path() {
        let detected = default_home_dir_with(|key| match key {
            "HOMEDRIVE" => Some(std::ffi::OsString::from("/tmp")),
            "HOMEPATH" => Some(std::ffi::OsString::from("windows-home")),
            _ => None,
        });

        assert_eq!(detected, Some(PathBuf::from("/tmp").join("windows-home")));
    }
}