mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 01:50:38 +00:00
Rework remote storage sync queue, general refactoring
This commit is contained in:
committed by
Kirill Bulatov
parent
64a602b8f3
commit
0a7735a656
223
pageserver/src/remote_storage/storage_sync/delete.rs
Normal file
223
pageserver/src/remote_storage/storage_sync/delete.rs
Normal file
@@ -0,0 +1,223 @@
|
||||
//! Timeline synchrnonization logic to delete a bulk of timeline's remote files from the remote storage.
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tracing::{debug, error, info};
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use crate::remote_storage::{
|
||||
storage_sync::{SyncQueue, SyncTask},
|
||||
RemoteStorage,
|
||||
};
|
||||
|
||||
use super::{LayersDeletion, SyncData};
|
||||
|
||||
/// Attempts to remove the timleline layers from the remote storage.
|
||||
/// If the task had not adjusted the metadata before, the deletion will fail.
|
||||
pub(super) async fn delete_timeline_layers<'a, P, S>(
|
||||
storage: &'a S,
|
||||
sync_queue: &SyncQueue,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut delete_data: SyncData<LayersDeletion>,
|
||||
) -> bool
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
if !delete_data.data.deletion_registered {
|
||||
error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
return false;
|
||||
}
|
||||
|
||||
if delete_data.data.layers_to_delete.is_empty() {
|
||||
info!("No layers to delete, skipping");
|
||||
return true;
|
||||
}
|
||||
|
||||
let layers_to_delete = delete_data
|
||||
.data
|
||||
.layers_to_delete
|
||||
.drain()
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Layers to delete: {layers_to_delete:?}");
|
||||
info!("Deleting {} timeline layers", layers_to_delete.len());
|
||||
|
||||
let mut delete_tasks = layers_to_delete
|
||||
.into_iter()
|
||||
.map(|local_layer_path| async {
|
||||
let storage_path = match storage.storage_path(&local_layer_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to get the layer storage path for local path '{}'",
|
||||
local_layer_path.display()
|
||||
)
|
||||
}) {
|
||||
Ok(path) => path,
|
||||
Err(e) => return Err((e, local_layer_path)),
|
||||
};
|
||||
|
||||
match storage.delete(&storage_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to delete remote layer from storage at '{:?}'",
|
||||
storage_path
|
||||
)
|
||||
}) {
|
||||
Ok(()) => Ok(local_layer_path),
|
||||
Err(e) => Err((e, local_layer_path)),
|
||||
}
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
let mut errored = false;
|
||||
while let Some(deletion_result) = delete_tasks.next().await {
|
||||
match deletion_result {
|
||||
Ok(local_layer_path) => {
|
||||
debug!(
|
||||
"Successfully deleted layer {} for timeline {sync_id}",
|
||||
local_layer_path.display()
|
||||
);
|
||||
delete_data.data.deleted_layers.insert(local_layer_path);
|
||||
}
|
||||
Err((e, local_layer_path)) => {
|
||||
errored = true;
|
||||
error!(
|
||||
"Failed to delete layer {} for timeline {sync_id}: {e:?}",
|
||||
local_layer_path.display()
|
||||
);
|
||||
delete_data.data.layers_to_delete.insert(local_layer_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errored {
|
||||
debug!("Reenqueuing failed delete task for timeline {sync_id}");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
}
|
||||
errored
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashSet, num::NonZeroUsize};
|
||||
|
||||
use itertools::Itertools;
|
||||
use tempfile::tempdir;
|
||||
use tokio::fs;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
remote_storage::{
|
||||
storage_sync::test_utils::{create_local_timeline, dummy_metadata},
|
||||
LocalFs,
|
||||
},
|
||||
repository::repo_harness::{RepoHarness, TIMELINE_ID},
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_timeline_negative() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline_negative")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?;
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
sync_id,
|
||||
SyncData {
|
||||
retries: 1,
|
||||
data: LayersDeletion {
|
||||
deleted_layers: HashSet::new(),
|
||||
layers_to_delete: HashSet::new(),
|
||||
deletion_registered: false,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
!deleted,
|
||||
"Should not start the deletion for task with delete metadata unregistered"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_timeline() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "c", "d"];
|
||||
let storage = LocalFs::new(tempdir()?.path().to_path_buf(), &harness.conf.workdir)?;
|
||||
let current_retries = 3;
|
||||
let metadata = dummy_metadata(Lsn(0x30));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
let timeline_upload =
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
|
||||
for local_path in timeline_upload.layers_to_upload {
|
||||
let remote_path = storage.storage_path(&local_path)?;
|
||||
let remote_parent_dir = remote_path.parent().unwrap();
|
||||
if !remote_parent_dir.exists() {
|
||||
fs::create_dir_all(&remote_parent_dir).await?;
|
||||
}
|
||||
fs::copy(&local_path, &remote_path).await?;
|
||||
}
|
||||
assert_eq!(
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
layer_files
|
||||
.iter()
|
||||
.map(|layer_str| layer_str.to_string())
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
"Expect to have all layer files remotely before deletion"
|
||||
);
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
sync_id,
|
||||
SyncData {
|
||||
retries: current_retries,
|
||||
data: LayersDeletion {
|
||||
deleted_layers: HashSet::new(),
|
||||
layers_to_delete: HashSet::from([
|
||||
local_timeline_path.join("a"),
|
||||
local_timeline_path.join("c"),
|
||||
local_timeline_path.join("something_different"),
|
||||
]),
|
||||
deletion_registered: true,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(deleted, "Should be able to delete timeline files");
|
||||
|
||||
assert_eq!(
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["b".to_string(), "d".to_string()],
|
||||
"Expect to have only non-deleted files remotely"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1 +1,228 @@
|
||||
//! Timeline synchrnonization logic to delete a bulk of timeline's remote files from the remote storage.
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::storage_sync::{SyncQueue, SyncTask};
|
||||
use remote_storage::RemoteStorage;
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use super::{LayersDeletion, SyncData};
|
||||
|
||||
/// Attempts to remove the timleline layers from the remote storage.
|
||||
/// If the task had not adjusted the metadata before, the deletion will fail.
|
||||
pub(super) async fn delete_timeline_layers<'a, P, S>(
|
||||
storage: &'a S,
|
||||
sync_queue: &SyncQueue,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut delete_data: SyncData<LayersDeletion>,
|
||||
) -> bool
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
if !delete_data.data.deletion_registered {
|
||||
error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
return false;
|
||||
}
|
||||
|
||||
if delete_data.data.layers_to_delete.is_empty() {
|
||||
info!("No layers to delete, skipping");
|
||||
return true;
|
||||
}
|
||||
|
||||
let layers_to_delete = delete_data
|
||||
.data
|
||||
.layers_to_delete
|
||||
.drain()
|
||||
.collect::<Vec<_>>();
|
||||
debug!("Layers to delete: {layers_to_delete:?}");
|
||||
info!("Deleting {} timeline layers", layers_to_delete.len());
|
||||
|
||||
let mut delete_tasks = layers_to_delete
|
||||
.into_iter()
|
||||
.map(|local_layer_path| async {
|
||||
let storage_path =
|
||||
match storage
|
||||
.remote_object_id(&local_layer_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get the layer storage path for local path '{}'",
|
||||
local_layer_path.display()
|
||||
)
|
||||
}) {
|
||||
Ok(path) => path,
|
||||
Err(e) => return Err((e, local_layer_path)),
|
||||
};
|
||||
|
||||
match storage.delete(&storage_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to delete remote layer from storage at '{:?}'",
|
||||
storage_path
|
||||
)
|
||||
}) {
|
||||
Ok(()) => Ok(local_layer_path),
|
||||
Err(e) => Err((e, local_layer_path)),
|
||||
}
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
let mut errored = false;
|
||||
while let Some(deletion_result) = delete_tasks.next().await {
|
||||
match deletion_result {
|
||||
Ok(local_layer_path) => {
|
||||
debug!(
|
||||
"Successfully deleted layer {} for timeline {sync_id}",
|
||||
local_layer_path.display()
|
||||
);
|
||||
delete_data.data.deleted_layers.insert(local_layer_path);
|
||||
}
|
||||
Err((e, local_layer_path)) => {
|
||||
errored = true;
|
||||
error!(
|
||||
"Failed to delete layer {} for timeline {sync_id}: {e:?}",
|
||||
local_layer_path.display()
|
||||
);
|
||||
delete_data.data.layers_to_delete.insert(local_layer_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if errored {
|
||||
debug!("Reenqueuing failed delete task for timeline {sync_id}");
|
||||
delete_data.retries += 1;
|
||||
sync_queue.push(sync_id, SyncTask::Delete(delete_data));
|
||||
}
|
||||
errored
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashSet, num::NonZeroUsize};
|
||||
|
||||
use itertools::Itertools;
|
||||
use tempfile::tempdir;
|
||||
use tokio::fs;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
repository::repo_harness::{RepoHarness, TIMELINE_ID},
|
||||
storage_sync::test_utils::{create_local_timeline, dummy_metadata},
|
||||
};
|
||||
use remote_storage::LocalFs;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_timeline_negative() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline_negative")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?;
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
sync_id,
|
||||
SyncData {
|
||||
retries: 1,
|
||||
data: LayersDeletion {
|
||||
deleted_layers: HashSet::new(),
|
||||
layers_to_delete: HashSet::new(),
|
||||
deletion_registered: false,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
!deleted,
|
||||
"Should not start the deletion for task with delete metadata unregistered"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_timeline() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("delete_timeline")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "c", "d"];
|
||||
let storage = LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?;
|
||||
let current_retries = 3;
|
||||
let metadata = dummy_metadata(Lsn(0x30));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
let timeline_upload =
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
|
||||
for local_path in timeline_upload.layers_to_upload {
|
||||
let remote_path = storage.remote_object_id(&local_path)?;
|
||||
let remote_parent_dir = remote_path.parent().unwrap();
|
||||
if !remote_parent_dir.exists() {
|
||||
fs::create_dir_all(&remote_parent_dir).await?;
|
||||
}
|
||||
fs::copy(&local_path, &remote_path).await?;
|
||||
}
|
||||
assert_eq!(
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
layer_files
|
||||
.iter()
|
||||
.map(|layer_str| layer_str.to_string())
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
"Expect to have all layer files remotely before deletion"
|
||||
);
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
sync_id,
|
||||
SyncData {
|
||||
retries: current_retries,
|
||||
data: LayersDeletion {
|
||||
deleted_layers: HashSet::new(),
|
||||
layers_to_delete: HashSet::from([
|
||||
local_timeline_path.join("a"),
|
||||
local_timeline_path.join("c"),
|
||||
local_timeline_path.join("something_different"),
|
||||
]),
|
||||
deletion_registered: true,
|
||||
},
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(deleted, "Should be able to delete timeline files");
|
||||
|
||||
assert_eq!(
|
||||
storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["b".to_string(), "d".to_string()],
|
||||
"Expect to have only non-deleted files remotely"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,15 +12,13 @@ use tokio::{
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
layered_repository::metadata::metadata_path,
|
||||
storage_sync::{sync_queue, SyncTask},
|
||||
config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask,
|
||||
};
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use super::{
|
||||
index::{IndexPart, RemoteTimeline},
|
||||
SyncData, TimelineDownload,
|
||||
LayersDownload, SyncData, SyncQueue,
|
||||
};
|
||||
|
||||
pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
|
||||
@@ -76,7 +74,7 @@ pub(super) enum DownloadedTimeline {
|
||||
FailedAndRescheduled,
|
||||
/// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known.
|
||||
/// Initial download successful.
|
||||
Successful(SyncData<TimelineDownload>),
|
||||
Successful(SyncData<LayersDownload>),
|
||||
}
|
||||
|
||||
/// Attempts to download all given timeline's layers.
|
||||
@@ -87,9 +85,10 @@ pub(super) enum DownloadedTimeline {
|
||||
pub(super) async fn download_timeline_layers<'a, P, S>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &'a S,
|
||||
sync_queue: &'a SyncQueue,
|
||||
remote_timeline: Option<&'a RemoteTimeline>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut download_data: SyncData<TimelineDownload>,
|
||||
mut download_data: SyncData<LayersDownload>,
|
||||
) -> DownloadedTimeline
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
@@ -251,7 +250,7 @@ where
|
||||
if errors_happened {
|
||||
debug!("Reenqueuing failed download task for timeline {sync_id}");
|
||||
download_data.retries += 1;
|
||||
sync_queue::push(sync_id, SyncTask::Download(download_data));
|
||||
sync_queue.push(sync_id, SyncTask::Download(download_data));
|
||||
DownloadedTimeline::FailedAndRescheduled
|
||||
} else {
|
||||
info!("Successfully downloaded all layers");
|
||||
@@ -265,7 +264,10 @@ async fn fsync_path(path: impl AsRef<Path>) -> Result<(), io::Error> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::{
|
||||
collections::{BTreeSet, HashSet},
|
||||
num::NonZeroUsize,
|
||||
};
|
||||
|
||||
use remote_storage::{LocalFs, RemoteStorage};
|
||||
use tempfile::tempdir;
|
||||
@@ -284,6 +286,8 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn download_timeline() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("download_timeline")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"];
|
||||
let storage = LocalFs::new(
|
||||
@@ -324,11 +328,12 @@ mod tests {
|
||||
let download_data = match download_timeline_layers(
|
||||
harness.conf,
|
||||
&storage,
|
||||
&sync_queue,
|
||||
Some(&remote_timeline),
|
||||
sync_id,
|
||||
SyncData::new(
|
||||
current_retries,
|
||||
TimelineDownload {
|
||||
LayersDownload {
|
||||
layers_to_skip: HashSet::from([local_timeline_path.join("layer_to_skip")]),
|
||||
},
|
||||
),
|
||||
@@ -380,17 +385,19 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn download_timeline_negatives() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("download_timeline_negatives")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;
|
||||
|
||||
let empty_remote_timeline_download = download_timeline_layers(
|
||||
harness.conf,
|
||||
&storage,
|
||||
&sync_queue,
|
||||
None,
|
||||
sync_id,
|
||||
SyncData::new(
|
||||
0,
|
||||
TimelineDownload {
|
||||
LayersDownload {
|
||||
layers_to_skip: HashSet::new(),
|
||||
},
|
||||
),
|
||||
@@ -409,11 +416,12 @@ mod tests {
|
||||
let already_downloading_remote_timeline_download = download_timeline_layers(
|
||||
harness.conf,
|
||||
&storage,
|
||||
&sync_queue,
|
||||
Some(¬_expecting_download_remote_timeline),
|
||||
sync_id,
|
||||
SyncData::new(
|
||||
0,
|
||||
TimelineDownload {
|
||||
LayersDownload {
|
||||
layers_to_skip: HashSet::new(),
|
||||
},
|
||||
),
|
||||
|
||||
@@ -8,16 +8,14 @@ use remote_storage::RemoteStorage;
|
||||
use tokio::fs;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
layered_repository::metadata::metadata_path,
|
||||
storage_sync::{sync_queue, SyncTask},
|
||||
};
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use super::{
|
||||
index::{IndexPart, RemoteTimeline},
|
||||
SyncData, TimelineUpload,
|
||||
LayersUpload, SyncData, SyncQueue,
|
||||
};
|
||||
use crate::{
|
||||
config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask,
|
||||
};
|
||||
|
||||
/// Serializes and uploads the given index part data to the remote storage.
|
||||
@@ -68,11 +66,7 @@ pub(super) enum UploadedTimeline {
|
||||
/// Upload failed due to some error, the upload task is rescheduled for another retry.
|
||||
FailedAndRescheduled,
|
||||
/// No issues happened during the upload, all task files were put into the remote storage.
|
||||
Successful(SyncData<TimelineUpload>),
|
||||
/// No failures happened during the upload, but some files were removed locally before the upload task completed
|
||||
/// (could happen due to retries, for instance, if GC happens in the interim).
|
||||
/// Such files are considered "not needed" and ignored, but the task's metadata should be discarded and the new one loaded from the local file.
|
||||
SuccessfulAfterLocalFsUpdate(SyncData<TimelineUpload>),
|
||||
Successful(SyncData<LayersUpload>),
|
||||
}
|
||||
|
||||
/// Attempts to upload given layer files.
|
||||
@@ -81,9 +75,10 @@ pub(super) enum UploadedTimeline {
|
||||
/// On an error, bumps the retries count and reschedules the entire task.
|
||||
pub(super) async fn upload_timeline_layers<'a, P, S>(
|
||||
storage: &'a S,
|
||||
sync_queue: &SyncQueue,
|
||||
remote_timeline: Option<&'a RemoteTimeline>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut upload_data: SyncData<TimelineUpload>,
|
||||
mut upload_data: SyncData<LayersUpload>,
|
||||
) -> UploadedTimeline
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
@@ -168,7 +163,6 @@ where
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
let mut errors_happened = false;
|
||||
let mut local_fs_updated = false;
|
||||
while let Some(upload_result) = upload_tasks.next().await {
|
||||
match upload_result {
|
||||
Ok(uploaded_path) => {
|
||||
@@ -185,7 +179,16 @@ where
|
||||
errors_happened = true;
|
||||
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
|
||||
} else {
|
||||
local_fs_updated = true;
|
||||
// We have run the upload sync task, but the file we wanted to upload is gone.
|
||||
// This is "fine" due the asynchronous nature of the sync loop: it only reacts to events and might need to
|
||||
// retry the upload tasks, if S3 or network is down: but during this time, pageserver might still operate and
|
||||
// run compaction/gc threads, removing redundant files from disk.
|
||||
// It's not good to pause GC/compaction because of those and we would rather skip such uploads.
|
||||
//
|
||||
// Yet absence of such files might also mean that the timeline metadata file was updated (GC moves the Lsn forward, for instance).
|
||||
// We don't try to read a more recent version, since it could contain `disk_consistent_lsn` that does not have its upload finished yet.
|
||||
// This will create "missing" layers and make data inconsistent.
|
||||
// Instead, we only update the metadata when it was submitted in an upload task as a checkpoint result.
|
||||
upload.layers_to_upload.remove(&source_path);
|
||||
warn!(
|
||||
"Missing locally a layer file {} scheduled for upload, skipping",
|
||||
@@ -200,11 +203,8 @@ where
|
||||
if errors_happened {
|
||||
debug!("Reenqueuing failed upload task for timeline {sync_id}");
|
||||
upload_data.retries += 1;
|
||||
sync_queue::push(sync_id, SyncTask::Upload(upload_data));
|
||||
sync_queue.push(sync_id, SyncTask::Upload(upload_data));
|
||||
UploadedTimeline::FailedAndRescheduled
|
||||
} else if local_fs_updated {
|
||||
info!("Successfully uploaded all layers, some local layers were removed during the upload");
|
||||
UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data)
|
||||
} else {
|
||||
info!("Successfully uploaded all layers");
|
||||
UploadedTimeline::Successful(upload_data)
|
||||
@@ -218,7 +218,10 @@ enum UploadError {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::{
|
||||
collections::{BTreeSet, HashSet},
|
||||
num::NonZeroUsize,
|
||||
};
|
||||
|
||||
use remote_storage::LocalFs;
|
||||
use tempfile::tempdir;
|
||||
@@ -237,6 +240,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn regular_layer_upload() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("regular_layer_upload")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let layer_files = ["a", "b"];
|
||||
@@ -258,6 +262,7 @@ mod tests {
|
||||
|
||||
let upload_result = upload_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
None,
|
||||
sync_id,
|
||||
SyncData::new(current_retries, timeline_upload.clone()),
|
||||
@@ -322,6 +327,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn layer_upload_after_local_fs_update() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("layer_upload_after_local_fs_update")?;
|
||||
let (sync_queue, _) = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let layer_files = ["a1", "b1"];
|
||||
@@ -347,6 +353,7 @@ mod tests {
|
||||
|
||||
let upload_result = upload_timeline_layers(
|
||||
&storage,
|
||||
&sync_queue,
|
||||
None,
|
||||
sync_id,
|
||||
SyncData::new(current_retries, timeline_upload.clone()),
|
||||
@@ -354,7 +361,7 @@ mod tests {
|
||||
.await;
|
||||
|
||||
let upload_data = match upload_result {
|
||||
UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data) => upload_data,
|
||||
UploadedTimeline::Successful(upload_data) => upload_data,
|
||||
wrong_result => panic!(
|
||||
"Expected a successful after local fs upload for timeline, but got: {wrong_result:?}"
|
||||
),
|
||||
|
||||
Reference in New Issue
Block a user