Share the remote storage as a crate

This commit is contained in:
Kirill Bulatov
2022-05-04 17:06:44 +03:00
committed by Kirill Bulatov
parent d4e155aaa3
commit de37f982db
25 changed files with 961 additions and 957 deletions

View File

@@ -0,0 +1,470 @@
//! Timeline synchrnonization logic to fetch the layer files from remote storage into pageserver's local directory.
use std::{collections::HashSet, fmt::Debug, path::Path};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use remote_storage::{path_with_suffix_extension, RemoteStorage};
use tokio::{
fs,
io::{self, AsyncWriteExt},
};
use tracing::{debug, error, info, warn};
use crate::{
config::PageServerConf,
layered_repository::metadata::metadata_path,
storage_sync::{sync_queue, SyncTask},
};
use utils::zid::ZTenantTimelineId;
use super::{
index::{IndexPart, RemoteTimeline},
SyncData, TimelineDownload,
};
pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
/// Retrieves index data from the remote storage for a given timeline.
pub async fn download_index_part<P, S>(
conf: &'static PageServerConf,
storage: &S,
sync_id: ZTenantTimelineId,
) -> anyhow::Result<IndexPart>
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
.with_file_name(IndexPart::FILE_NAME)
.with_extension(IndexPart::FILE_EXTENSION);
let part_storage_path = storage
.remote_object_id(&index_part_path)
.with_context(|| {
format!(
"Failed to get the index part storage path for local path '{}'",
index_part_path.display()
)
})?;
let mut index_part_bytes = Vec::new();
storage
.download(&part_storage_path, &mut index_part_bytes)
.await
.with_context(|| {
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
})?;
let missing_files = index_part.missing_files();
if !missing_files.is_empty() {
warn!("Found missing layers in index part for timeline {sync_id}: {missing_files:?}");
}
Ok(index_part)
}
/// Timeline download result, with extra data, needed for downloading.
#[derive(Debug)]
pub(super) enum DownloadedTimeline {
/// Remote timeline data is either absent or corrupt, no download possible.
Abort,
/// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known.
/// Initial download failed due to some error, the download task is rescheduled for another retry.
FailedAndRescheduled,
/// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known.
/// Initial download successful.
Successful(SyncData<TimelineDownload>),
}
/// Attempts to download all given timeline's layers.
/// Timeline files that already exist locally are skipped during the download, but the local metadata file is
/// updated in the end, if the remote one contains a newer disk_consistent_lsn.
///
/// On an error, bumps the retries count and updates the files to skip with successful downloads, rescheduling the task.
pub(super) async fn download_timeline_layers<'a, P, S>(
conf: &'static PageServerConf,
storage: &'a S,
remote_timeline: Option<&'a RemoteTimeline>,
sync_id: ZTenantTimelineId,
mut download_data: SyncData<TimelineDownload>,
) -> DownloadedTimeline
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
let remote_timeline = match remote_timeline {
Some(remote_timeline) => {
if !remote_timeline.awaits_download {
error!("Timeline with sync id {sync_id} is not awaiting download");
return DownloadedTimeline::Abort;
}
remote_timeline
}
None => {
error!("Timeline with sync id {sync_id} is not present in the remote index");
return DownloadedTimeline::Abort;
}
};
let download = &mut download_data.data;
let layers_to_download = remote_timeline
.stored_files()
.difference(&download.layers_to_skip)
.cloned()
.collect::<Vec<_>>();
debug!("Layers to download: {layers_to_download:?}");
info!("Downloading {} timeline layers", layers_to_download.len());
let mut download_tasks = layers_to_download
.into_iter()
.map(|layer_desination_path| async move {
if layer_desination_path.exists() {
debug!(
"Layer already exists locally, skipping download: {}",
layer_desination_path.display()
);
} else {
let layer_storage_path = storage
.remote_object_id(&layer_desination_path)
.with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
layer_desination_path.display()
)
})?;
// Perform a rename inspired by durable_rename from file_utils.c.
// The sequence:
// write(tmp)
// fsync(tmp)
// rename(tmp, new)
// fsync(new)
// fsync(parent)
// For more context about durable_rename check this email from postgres mailing list:
// https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path =
path_with_suffix_extension(&layer_desination_path, TEMP_DOWNLOAD_EXTENSION);
let mut destination_file =
fs::File::create(&temp_file_path).await.with_context(|| {
format!(
"Failed to create a destination file for layer '{}'",
temp_file_path.display()
)
})?;
storage
.download(&layer_storage_path, &mut destination_file)
.await
.with_context(|| {
format!(
"Failed to download a layer from storage path '{layer_storage_path:?}'"
)
})?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
// we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
// But for additional safety lets check/wait for any pending operations.
destination_file.flush().await.with_context(|| {
format!(
"failed to flush source file at {}",
temp_file_path.display()
)
})?;
// not using sync_data because it can lose file size update
destination_file.sync_all().await.with_context(|| {
format!(
"failed to fsync source file at {}",
temp_file_path.display()
)
})?;
drop(destination_file);
fail::fail_point!("remote-storage-download-pre-rename", |_| {
anyhow::bail!("remote-storage-download-pre-rename failpoint triggered")
});
fs::rename(&temp_file_path, &layer_desination_path).await?;
fsync_path(&layer_desination_path).await.with_context(|| {
format!(
"Cannot fsync layer destination path {}",
layer_desination_path.display(),
)
})?;
}
Ok::<_, anyhow::Error>(layer_desination_path)
})
.collect::<FuturesUnordered<_>>();
let mut errors_happened = false;
// keep files we've downloaded to remove them from layers_to_skip if directory fsync fails
let mut undo = HashSet::new();
while let Some(download_result) = download_tasks.next().await {
match download_result {
Ok(downloaded_path) => {
undo.insert(downloaded_path.clone());
download.layers_to_skip.insert(downloaded_path);
}
Err(e) => {
errors_happened = true;
error!("Failed to download a layer for timeline {sync_id}: {e:?}");
}
}
}
// fsync timeline directory which is a parent directory for downloaded files
let ZTenantTimelineId {
tenant_id,
timeline_id,
} = &sync_id;
let timeline_dir = conf.timeline_path(timeline_id, tenant_id);
if let Err(e) = fsync_path(&timeline_dir).await {
error!(
"Cannot fsync parent directory {} error {}",
timeline_dir.display(),
e
);
for item in undo {
download.layers_to_skip.remove(&item);
}
errors_happened = true;
}
if errors_happened {
debug!("Reenqueuing failed download task for timeline {sync_id}");
download_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Download(download_data));
DownloadedTimeline::FailedAndRescheduled
} else {
info!("Successfully downloaded all layers");
DownloadedTimeline::Successful(download_data)
}
}
async fn fsync_path(path: impl AsRef<Path>) -> Result<(), io::Error> {
fs::File::open(path).await?.sync_all().await
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashSet};
use remote_storage::{LocalFs, RemoteStorage};
use tempfile::tempdir;
use utils::lsn::Lsn;
use crate::{
repository::repo_harness::{RepoHarness, TIMELINE_ID},
storage_sync::{
index::RelativePath,
test_utils::{create_local_timeline, dummy_metadata},
},
};
use super::*;
#[tokio::test]
async fn download_timeline() -> anyhow::Result<()> {
let harness = RepoHarness::create("download_timeline")?;
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"];
let storage = LocalFs::new(
tempdir()?.path().to_path_buf(),
harness.conf.workdir.clone(),
)?;
let current_retries = 3;
let metadata = dummy_metadata(Lsn(0x30));
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
let timeline_upload =
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
for local_path in timeline_upload.layers_to_upload {
let remote_path = storage.remote_object_id(&local_path)?;
let remote_parent_dir = remote_path.parent().unwrap();
if !remote_parent_dir.exists() {
fs::create_dir_all(&remote_parent_dir).await?;
}
fs::copy(&local_path, &remote_path).await?;
}
let mut read_dir = fs::read_dir(&local_timeline_path).await?;
while let Some(dir_entry) = read_dir.next_entry().await? {
if dir_entry.file_name().to_str() == Some("layer_to_keep_locally") {
continue;
} else {
fs::remove_file(dir_entry.path()).await?;
}
}
let mut remote_timeline = RemoteTimeline::new(metadata.clone());
remote_timeline.awaits_download = true;
remote_timeline.add_timeline_layers(
layer_files
.iter()
.map(|layer| local_timeline_path.join(layer)),
);
let download_data = match download_timeline_layers(
harness.conf,
&storage,
Some(&remote_timeline),
sync_id,
SyncData::new(
current_retries,
TimelineDownload {
layers_to_skip: HashSet::from([local_timeline_path.join("layer_to_skip")]),
},
),
)
.await
{
DownloadedTimeline::Successful(data) => data,
wrong_result => {
panic!("Expected a successful download for timeline, but got: {wrong_result:?}")
}
};
assert_eq!(
current_retries, download_data.retries,
"On successful download, retries are not expected to change"
);
assert_eq!(
download_data
.data
.layers_to_skip
.into_iter()
.collect::<BTreeSet<_>>(),
layer_files
.iter()
.map(|layer| local_timeline_path.join(layer))
.collect(),
"On successful download, layers to skip should contain all downloaded files and present layers that were skipped"
);
let mut downloaded_files = BTreeSet::new();
let mut read_dir = fs::read_dir(&local_timeline_path).await?;
while let Some(dir_entry) = read_dir.next_entry().await? {
downloaded_files.insert(dir_entry.path());
}
assert_eq!(
downloaded_files,
layer_files
.iter()
.filter(|layer| layer != &&"layer_to_skip")
.map(|layer| local_timeline_path.join(layer))
.collect(),
"On successful download, all layers that were not skipped, should be downloaded"
);
Ok(())
}
#[tokio::test]
async fn download_timeline_negatives() -> anyhow::Result<()> {
let harness = RepoHarness::create("download_timeline_negatives")?;
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;
let empty_remote_timeline_download = download_timeline_layers(
harness.conf,
&storage,
None,
sync_id,
SyncData::new(
0,
TimelineDownload {
layers_to_skip: HashSet::new(),
},
),
)
.await;
assert!(
matches!(empty_remote_timeline_download, DownloadedTimeline::Abort),
"Should not allow downloading for empty remote timeline"
);
let not_expecting_download_remote_timeline = RemoteTimeline::new(dummy_metadata(Lsn(5)));
assert!(
!not_expecting_download_remote_timeline.awaits_download,
"Should not expect download for the timeline"
);
let already_downloading_remote_timeline_download = download_timeline_layers(
harness.conf,
&storage,
Some(&not_expecting_download_remote_timeline),
sync_id,
SyncData::new(
0,
TimelineDownload {
layers_to_skip: HashSet::new(),
},
),
)
.await;
assert!(
matches!(
already_downloading_remote_timeline_download,
DownloadedTimeline::Abort,
),
"Should not allow downloading for remote timeline that does not expect it"
);
Ok(())
}
#[tokio::test]
async fn test_download_index_part() -> anyhow::Result<()> {
let harness = RepoHarness::create("test_download_index_part")?;
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(
tempdir()?.path().to_path_buf(),
harness.conf.workdir.clone(),
)?;
let metadata = dummy_metadata(Lsn(0x30));
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
let index_part = IndexPart::new(
HashSet::from([
RelativePath::new(&local_timeline_path, local_timeline_path.join("one"))?,
RelativePath::new(&local_timeline_path, local_timeline_path.join("two"))?,
]),
HashSet::from([RelativePath::new(
&local_timeline_path,
local_timeline_path.join("three"),
)?]),
metadata.disk_consistent_lsn(),
metadata.to_bytes()?,
);
let local_index_part_path =
metadata_path(harness.conf, sync_id.timeline_id, sync_id.tenant_id)
.with_file_name(IndexPart::FILE_NAME)
.with_extension(IndexPart::FILE_EXTENSION);
let storage_path = storage.remote_object_id(&local_index_part_path)?;
fs::create_dir_all(storage_path.parent().unwrap()).await?;
fs::write(&storage_path, serde_json::to_vec(&index_part)?).await?;
let downloaded_index_part = download_index_part(harness.conf, &storage, sync_id).await?;
assert_eq!(
downloaded_index_part, index_part,
"Downloaded index part should be the same as the one in storage"
);
Ok(())
}
}

View File

@@ -0,0 +1,408 @@
//! In-memory index to track the tenant files on the remote storage.
//! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about
//! remote timeline layers and its metadata.
use std::{
collections::{HashMap, HashSet},
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::{Context, Ok};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tokio::sync::RwLock;
use crate::{config::PageServerConf, layered_repository::metadata::TimelineMetadata};
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
/// A part of the filesystem path, that needs a root to become a path again.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RelativePath(String);
impl RelativePath {
/// Attempts to strip off the base from path, producing a relative path or an error.
pub fn new<P: AsRef<Path>>(base: &Path, path: P) -> anyhow::Result<Self> {
let path = path.as_ref();
let relative = path.strip_prefix(base).with_context(|| {
format!(
"path '{}' is not relative to base '{}'",
path.display(),
base.display()
)
})?;
Ok(RelativePath(relative.to_string_lossy().to_string()))
}
/// Joins the relative path with the base path.
fn as_path(&self, base: &Path) -> PathBuf {
base.join(&self.0)
}
}
/// An index to track tenant files that exist on the remote storage.
#[derive(Debug, Clone)]
pub struct RemoteTimelineIndex {
timeline_entries: HashMap<ZTenantTimelineId, RemoteTimeline>,
}
/// A wrapper to synchronize the access to the index, should be created and used before dealing with any [`RemoteTimelineIndex`].
pub struct RemoteIndex(Arc<RwLock<RemoteTimelineIndex>>);
impl RemoteIndex {
pub fn empty() -> Self {
Self(Arc::new(RwLock::new(RemoteTimelineIndex {
timeline_entries: HashMap::new(),
})))
}
pub fn from_parts(
conf: &'static PageServerConf,
index_parts: HashMap<ZTenantTimelineId, IndexPart>,
) -> anyhow::Result<Self> {
let mut timeline_entries = HashMap::new();
for (sync_id, index_part) in index_parts {
let timeline_path = conf.timeline_path(&sync_id.timeline_id, &sync_id.tenant_id);
let remote_timeline = RemoteTimeline::from_index_part(&timeline_path, index_part)
.context("Failed to restore remote timeline data from index part")?;
timeline_entries.insert(sync_id, remote_timeline);
}
Ok(Self(Arc::new(RwLock::new(RemoteTimelineIndex {
timeline_entries,
}))))
}
pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, RemoteTimelineIndex> {
self.0.read().await
}
pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, RemoteTimelineIndex> {
self.0.write().await
}
}
impl Clone for RemoteIndex {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}
impl RemoteTimelineIndex {
pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&RemoteTimeline> {
self.timeline_entries.get(id)
}
pub fn timeline_entry_mut(&mut self, id: &ZTenantTimelineId) -> Option<&mut RemoteTimeline> {
self.timeline_entries.get_mut(id)
}
pub fn add_timeline_entry(&mut self, id: ZTenantTimelineId, entry: RemoteTimeline) {
self.timeline_entries.insert(id, entry);
}
pub fn all_sync_ids(&self) -> impl Iterator<Item = ZTenantTimelineId> + '_ {
self.timeline_entries.keys().copied()
}
pub fn set_awaits_download(
&mut self,
id: &ZTenantTimelineId,
awaits_download: bool,
) -> anyhow::Result<()> {
self.timeline_entry_mut(id)
.ok_or_else(|| anyhow::anyhow!("unknown timeline sync {}", id))?
.awaits_download = awaits_download;
Ok(())
}
}
/// Restored index part data about the timeline, stored in the remote index.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RemoteTimeline {
timeline_layers: HashSet<PathBuf>,
missing_layers: HashSet<PathBuf>,
pub metadata: TimelineMetadata,
pub awaits_download: bool,
}
impl RemoteTimeline {
pub fn new(metadata: TimelineMetadata) -> Self {
Self {
timeline_layers: HashSet::new(),
missing_layers: HashSet::new(),
metadata,
awaits_download: false,
}
}
pub fn add_timeline_layers(&mut self, new_layers: impl IntoIterator<Item = PathBuf>) {
self.timeline_layers.extend(new_layers.into_iter());
}
pub fn add_upload_failures(&mut self, upload_failures: impl IntoIterator<Item = PathBuf>) {
self.missing_layers.extend(upload_failures.into_iter());
}
/// Lists all layer files in the given remote timeline. Omits the metadata file.
pub fn stored_files(&self) -> &HashSet<PathBuf> {
&self.timeline_layers
}
pub fn from_index_part(timeline_path: &Path, index_part: IndexPart) -> anyhow::Result<Self> {
let metadata = TimelineMetadata::from_bytes(&index_part.metadata_bytes)?;
Ok(Self {
timeline_layers: to_local_paths(timeline_path, index_part.timeline_layers),
missing_layers: to_local_paths(timeline_path, index_part.missing_layers),
metadata,
awaits_download: false,
})
}
}
/// Part of the remote index, corresponding to a certain timeline.
/// Contains the data about all files in the timeline, present remotely and its metadata.
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexPart {
timeline_layers: HashSet<RelativePath>,
/// Currently is not really used in pageserver,
/// present to manually keep track of the layer files that pageserver might never retrieve.
///
/// Such "holes" might appear if any upload task was evicted on an error threshold:
/// the this layer will only be rescheduled for upload on pageserver restart.
missing_layers: HashSet<RelativePath>,
#[serde_as(as = "DisplayFromStr")]
disk_consistent_lsn: Lsn,
metadata_bytes: Vec<u8>,
}
impl IndexPart {
pub const FILE_NAME: &'static str = "index_part";
pub const FILE_EXTENSION: &'static str = "json";
#[cfg(test)]
pub fn new(
timeline_layers: HashSet<RelativePath>,
missing_layers: HashSet<RelativePath>,
disk_consistent_lsn: Lsn,
metadata_bytes: Vec<u8>,
) -> Self {
Self {
timeline_layers,
missing_layers,
disk_consistent_lsn,
metadata_bytes,
}
}
pub fn missing_files(&self) -> &HashSet<RelativePath> {
&self.missing_layers
}
pub fn from_remote_timeline(
timeline_path: &Path,
remote_timeline: RemoteTimeline,
) -> anyhow::Result<Self> {
let metadata_bytes = remote_timeline.metadata.to_bytes()?;
Ok(Self {
timeline_layers: to_relative_paths(timeline_path, remote_timeline.timeline_layers)
.context("Failed to convert timeline layers' paths to relative ones")?,
missing_layers: to_relative_paths(timeline_path, remote_timeline.missing_layers)
.context("Failed to convert missing layers' paths to relative ones")?,
disk_consistent_lsn: remote_timeline.metadata.disk_consistent_lsn(),
metadata_bytes,
})
}
}
fn to_local_paths(
timeline_path: &Path,
paths: impl IntoIterator<Item = RelativePath>,
) -> HashSet<PathBuf> {
paths
.into_iter()
.map(|path| path.as_path(timeline_path))
.collect()
}
fn to_relative_paths(
timeline_path: &Path,
paths: impl IntoIterator<Item = PathBuf>,
) -> anyhow::Result<HashSet<RelativePath>> {
paths
.into_iter()
.map(|path| RelativePath::new(timeline_path, path))
.collect()
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use super::*;
use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID};
#[test]
fn index_part_conversion() {
let harness = RepoHarness::create("index_part_conversion").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata =
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1));
let remote_timeline = RemoteTimeline {
timeline_layers: HashSet::from([
timeline_path.join("layer_1"),
timeline_path.join("layer_2"),
]),
missing_layers: HashSet::from([
timeline_path.join("missing_1"),
timeline_path.join("missing_2"),
]),
metadata: metadata.clone(),
awaits_download: false,
};
let index_part = IndexPart::from_remote_timeline(&timeline_path, remote_timeline.clone())
.expect("Correct remote timeline should be convertable to index part");
assert_eq!(
index_part.timeline_layers.iter().collect::<BTreeSet<_>>(),
BTreeSet::from([
&RelativePath("layer_1".to_string()),
&RelativePath("layer_2".to_string())
]),
"Index part should have all remote timeline layers after the conversion"
);
assert_eq!(
index_part.missing_layers.iter().collect::<BTreeSet<_>>(),
BTreeSet::from([
&RelativePath("missing_1".to_string()),
&RelativePath("missing_2".to_string())
]),
"Index part should have all missing remote timeline layers after the conversion"
);
assert_eq!(
index_part.disk_consistent_lsn,
metadata.disk_consistent_lsn(),
"Index part should have disk consistent lsn from the timeline"
);
assert_eq!(
index_part.metadata_bytes,
metadata
.to_bytes()
.expect("Failed to serialize correct metadata into bytes"),
"Index part should have all missing remote timeline layers after the conversion"
);
let restored_timeline = RemoteTimeline::from_index_part(&timeline_path, index_part)
.expect("Correct index part should be convertable to remote timeline");
let original_metadata = &remote_timeline.metadata;
let restored_metadata = &restored_timeline.metadata;
// we have to compare the metadata this way, since its header is different after creation and restoration,
// but that is now consireded ok.
assert_eq!(
original_metadata.disk_consistent_lsn(),
restored_metadata.disk_consistent_lsn(),
"remote timeline -> index part -> remote timeline conversion should not alter metadata"
);
assert_eq!(
original_metadata.prev_record_lsn(),
restored_metadata.prev_record_lsn(),
"remote timeline -> index part -> remote timeline conversion should not alter metadata"
);
assert_eq!(
original_metadata.ancestor_timeline(),
restored_metadata.ancestor_timeline(),
"remote timeline -> index part -> remote timeline conversion should not alter metadata"
);
assert_eq!(
original_metadata.ancestor_lsn(),
restored_metadata.ancestor_lsn(),
"remote timeline -> index part -> remote timeline conversion should not alter metadata"
);
assert_eq!(
original_metadata.latest_gc_cutoff_lsn(),
restored_metadata.latest_gc_cutoff_lsn(),
"remote timeline -> index part -> remote timeline conversion should not alter metadata"
);
assert_eq!(
original_metadata.initdb_lsn(),
restored_metadata.initdb_lsn(),
"remote timeline -> index part -> remote timeline conversion should not alter metadata"
);
assert_eq!(
remote_timeline.awaits_download, restored_timeline.awaits_download,
"remote timeline -> index part -> remote timeline conversion should not loose download flag"
);
assert_eq!(
remote_timeline
.timeline_layers
.into_iter()
.collect::<BTreeSet<_>>(),
restored_timeline
.timeline_layers
.into_iter()
.collect::<BTreeSet<_>>(),
"remote timeline -> index part -> remote timeline conversion should not loose layer data"
);
assert_eq!(
remote_timeline
.missing_layers
.into_iter()
.collect::<BTreeSet<_>>(),
restored_timeline
.missing_layers
.into_iter()
.collect::<BTreeSet<_>>(),
"remote timeline -> index part -> remote timeline conversion should not loose missing file data"
);
}
#[test]
fn index_part_conversion_negatives() {
let harness = RepoHarness::create("index_part_conversion_negatives").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata =
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1));
let conversion_result = IndexPart::from_remote_timeline(
&timeline_path,
RemoteTimeline {
timeline_layers: HashSet::from([
PathBuf::from("bad_path"),
timeline_path.join("layer_2"),
]),
missing_layers: HashSet::from([
timeline_path.join("missing_1"),
timeline_path.join("missing_2"),
]),
metadata: metadata.clone(),
awaits_download: false,
},
);
assert!(conversion_result.is_err(), "Should not be able to convert metadata with layer paths that are not in the timeline directory");
let conversion_result = IndexPart::from_remote_timeline(
&timeline_path,
RemoteTimeline {
timeline_layers: HashSet::from([
timeline_path.join("layer_1"),
timeline_path.join("layer_2"),
]),
missing_layers: HashSet::from([
PathBuf::from("bad_path"),
timeline_path.join("missing_2"),
]),
metadata,
awaits_download: false,
},
);
assert!(conversion_result.is_err(), "Should not be able to convert metadata with missing layer paths that are not in the timeline directory");
}
}

View File

@@ -0,0 +1,464 @@
//! Timeline synchronization logic to compress and upload to the remote storage all new timeline files from the checkpoints.
use std::{fmt::Debug, path::PathBuf};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use remote_storage::RemoteStorage;
use tokio::fs;
use tracing::{debug, error, info, warn};
use crate::{
config::PageServerConf,
layered_repository::metadata::metadata_path,
storage_sync::{sync_queue, SyncTask},
};
use utils::zid::ZTenantTimelineId;
use super::{
index::{IndexPart, RemoteTimeline},
SyncData, TimelineUpload,
};
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<P, S>(
conf: &'static PageServerConf,
storage: &S,
sync_id: ZTenantTimelineId,
index_part: IndexPart,
) -> anyhow::Result<()>
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
let index_part_bytes = serde_json::to_vec(&index_part)
.context("Failed to serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
.with_file_name(IndexPart::FILE_NAME)
.with_extension(IndexPart::FILE_EXTENSION);
let index_part_storage_path =
storage
.remote_object_id(&index_part_path)
.with_context(|| {
format!(
"Failed to get the index part storage path for local path '{}'",
index_part_path.display()
)
})?;
storage
.upload(
index_part_bytes,
index_part_size,
&index_part_storage_path,
None,
)
.await
.with_context(|| {
format!("Failed to upload index part to the storage path '{index_part_storage_path:?}'")
})
}
/// Timeline upload result, with extra data, needed for uploading.
#[derive(Debug)]
pub(super) enum UploadedTimeline {
/// Upload failed due to some error, the upload task is rescheduled for another retry.
FailedAndRescheduled,
/// No issues happened during the upload, all task files were put into the remote storage.
Successful(SyncData<TimelineUpload>),
/// No failures happened during the upload, but some files were removed locally before the upload task completed
/// (could happen due to retries, for instance, if GC happens in the interim).
/// Such files are considered "not needed" and ignored, but the task's metadata should be discarded and the new one loaded from the local file.
SuccessfulAfterLocalFsUpdate(SyncData<TimelineUpload>),
}
/// Attempts to upload given layer files.
/// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
///
/// On an error, bumps the retries count and reschedules the entire task.
pub(super) async fn upload_timeline_layers<'a, P, S>(
storage: &'a S,
remote_timeline: Option<&'a RemoteTimeline>,
sync_id: ZTenantTimelineId,
mut upload_data: SyncData<TimelineUpload>,
) -> UploadedTimeline
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
{
let upload = &mut upload_data.data;
let new_upload_lsn = upload
.metadata
.as_ref()
.map(|meta| meta.disk_consistent_lsn());
let already_uploaded_layers = remote_timeline
.map(|timeline| timeline.stored_files())
.cloned()
.unwrap_or_default();
let layers_to_upload = upload
.layers_to_upload
.difference(&already_uploaded_layers)
.cloned()
.collect::<Vec<_>>();
debug!("Layers to upload: {layers_to_upload:?}");
info!(
"Uploading {} timeline layers, new lsn: {new_upload_lsn:?}",
layers_to_upload.len(),
);
let mut upload_tasks = layers_to_upload
.into_iter()
.map(|source_path| async move {
let storage_path = storage
.remote_object_id(&source_path)
.with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
source_path.display()
)
})
.map_err(UploadError::Other)?;
let source_file = match fs::File::open(&source_path).await.with_context(|| {
format!(
"Failed to upen a source file for layer '{}'",
source_path.display()
)
}) {
Ok(file) => file,
Err(e) => return Err(UploadError::MissingLocalFile(source_path, e)),
};
let source_size = source_file
.metadata()
.await
.with_context(|| {
format!(
"Failed to get the source file metadata for layer '{}'",
source_path.display()
)
})
.map_err(UploadError::Other)?
.len() as usize;
match storage
.upload(source_file, source_size, &storage_path, None)
.await
.with_context(|| {
format!(
"Failed to upload a layer from local path '{}'",
source_path.display()
)
}) {
Ok(()) => Ok(source_path),
Err(e) => Err(UploadError::MissingLocalFile(source_path, e)),
}
})
.collect::<FuturesUnordered<_>>();
let mut errors_happened = false;
let mut local_fs_updated = false;
while let Some(upload_result) = upload_tasks.next().await {
match upload_result {
Ok(uploaded_path) => {
upload.layers_to_upload.remove(&uploaded_path);
upload.uploaded_layers.insert(uploaded_path);
}
Err(e) => match e {
UploadError::Other(e) => {
errors_happened = true;
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
}
UploadError::MissingLocalFile(source_path, e) => {
if source_path.exists() {
errors_happened = true;
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
} else {
local_fs_updated = true;
upload.layers_to_upload.remove(&source_path);
warn!(
"Missing locally a layer file {} scheduled for upload, skipping",
source_path.display()
);
}
}
},
}
}
if errors_happened {
debug!("Reenqueuing failed upload task for timeline {sync_id}");
upload_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Upload(upload_data));
UploadedTimeline::FailedAndRescheduled
} else if local_fs_updated {
info!("Successfully uploaded all layers, some local layers were removed during the upload");
UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data)
} else {
info!("Successfully uploaded all layers");
UploadedTimeline::Successful(upload_data)
}
}
enum UploadError {
MissingLocalFile(PathBuf, anyhow::Error),
Other(anyhow::Error),
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashSet};
use remote_storage::LocalFs;
use tempfile::tempdir;
use utils::lsn::Lsn;
use crate::{
repository::repo_harness::{RepoHarness, TIMELINE_ID},
storage_sync::{
index::RelativePath,
test_utils::{create_local_timeline, dummy_metadata},
},
};
use super::{upload_index_part, *};
#[tokio::test]
async fn regular_layer_upload() -> anyhow::Result<()> {
let harness = RepoHarness::create("regular_layer_upload")?;
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let layer_files = ["a", "b"];
let storage = LocalFs::new(
tempdir()?.path().to_path_buf(),
harness.conf.workdir.clone(),
)?;
let current_retries = 3;
let metadata = dummy_metadata(Lsn(0x30));
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
let mut timeline_upload =
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
timeline_upload.metadata = None;
assert!(
storage.list().await?.is_empty(),
"Storage should be empty before any uploads are made"
);
let upload_result = upload_timeline_layers(
&storage,
None,
sync_id,
SyncData::new(current_retries, timeline_upload.clone()),
)
.await;
let upload_data = match upload_result {
UploadedTimeline::Successful(upload_data) => upload_data,
wrong_result => {
panic!("Expected a successful upload for timeline, but got: {wrong_result:?}")
}
};
assert_eq!(
current_retries, upload_data.retries,
"On successful upload, retries are not expected to change"
);
let upload = &upload_data.data;
assert!(
upload.layers_to_upload.is_empty(),
"Successful upload should have no layers left to upload"
);
assert_eq!(
upload
.uploaded_layers
.iter()
.cloned()
.collect::<BTreeSet<_>>(),
layer_files
.iter()
.map(|layer_file| local_timeline_path.join(layer_file))
.collect(),
"Successful upload should have all layers uploaded"
);
assert_eq!(
upload.metadata, None,
"Successful upload without metadata should not have it returned either"
);
let storage_files = storage.list().await?;
assert_eq!(
storage_files.len(),
layer_files.len(),
"All layers should be uploaded"
);
assert_eq!(
storage_files
.into_iter()
.map(|storage_path| storage.local_path(&storage_path))
.collect::<anyhow::Result<BTreeSet<_>>>()?,
layer_files
.into_iter()
.map(|file| local_timeline_path.join(file))
.collect(),
"Uploaded files should match with the local ones"
);
Ok(())
}
// Currently, GC can run between upload retries, removing local layers scheduled for upload. Test this scenario.
#[tokio::test]
async fn layer_upload_after_local_fs_update() -> anyhow::Result<()> {
let harness = RepoHarness::create("layer_upload_after_local_fs_update")?;
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let layer_files = ["a1", "b1"];
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;
let current_retries = 5;
let metadata = dummy_metadata(Lsn(0x40));
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
let layers_to_upload = {
let mut layers = layer_files.to_vec();
layers.push("layer_to_remove");
layers
};
let timeline_upload =
create_local_timeline(&harness, TIMELINE_ID, &layers_to_upload, metadata.clone())
.await?;
assert!(
storage.list().await?.is_empty(),
"Storage should be empty before any uploads are made"
);
fs::remove_file(local_timeline_path.join("layer_to_remove")).await?;
let upload_result = upload_timeline_layers(
&storage,
None,
sync_id,
SyncData::new(current_retries, timeline_upload.clone()),
)
.await;
let upload_data = match upload_result {
UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data) => upload_data,
wrong_result => panic!(
"Expected a successful after local fs upload for timeline, but got: {wrong_result:?}"
),
};
assert_eq!(
current_retries, upload_data.retries,
"On successful upload, retries are not expected to change"
);
let upload = &upload_data.data;
assert!(
upload.layers_to_upload.is_empty(),
"Successful upload should have no layers left to upload, even those that were removed from the local fs"
);
assert_eq!(
upload
.uploaded_layers
.iter()
.cloned()
.collect::<BTreeSet<_>>(),
layer_files
.iter()
.map(|layer_file| local_timeline_path.join(layer_file))
.collect(),
"Successful upload should have all layers uploaded"
);
assert_eq!(
upload.metadata,
Some(metadata),
"Successful upload should not chage its metadata"
);
let storage_files = storage.list().await?;
assert_eq!(
storage_files.len(),
layer_files.len(),
"All layers should be uploaded"
);
assert_eq!(
storage_files
.into_iter()
.map(|storage_path| storage.local_path(&storage_path))
.collect::<anyhow::Result<BTreeSet<_>>>()?,
layer_files
.into_iter()
.map(|file| local_timeline_path.join(file))
.collect(),
"Uploaded files should match with the local ones"
);
Ok(())
}
#[tokio::test]
async fn test_upload_index_part() -> anyhow::Result<()> {
let harness = RepoHarness::create("test_upload_index_part")?;
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;
let metadata = dummy_metadata(Lsn(0x40));
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
let index_part = IndexPart::new(
HashSet::from([
RelativePath::new(&local_timeline_path, local_timeline_path.join("one"))?,
RelativePath::new(&local_timeline_path, local_timeline_path.join("two"))?,
]),
HashSet::from([RelativePath::new(
&local_timeline_path,
local_timeline_path.join("three"),
)?]),
metadata.disk_consistent_lsn(),
metadata.to_bytes()?,
);
assert!(
storage.list().await?.is_empty(),
"Storage should be empty before any uploads are made"
);
upload_index_part(harness.conf, &storage, sync_id, index_part.clone()).await?;
let storage_files = storage.list().await?;
assert_eq!(
storage_files.len(),
1,
"Should have only the index part file uploaded"
);
let index_part_path = storage_files.first().unwrap();
assert_eq!(
index_part_path.file_stem().and_then(|name| name.to_str()),
Some(IndexPart::FILE_NAME),
"Remote index part should have the correct name"
);
assert_eq!(
index_part_path
.extension()
.and_then(|extension| extension.to_str()),
Some(IndexPart::FILE_EXTENSION),
"Remote index part should have the correct extension"
);
let remote_index_part: IndexPart =
serde_json::from_slice(&fs::read(&index_part_path).await?)?;
assert_eq!(
index_part, remote_index_part,
"Remote index part should match the local one"
);
Ok(())
}
}