diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 4d7339ec13..91a385bf77 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -386,7 +386,7 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
}
return json_response(StatusCode::ACCEPTED, ());
}
- // no tenant in the index, release the lock to make the potentially lengthy download opetation
+ // no tenant in the index, release the lock to make the potentially lengthy download operation
drop(index_accessor);
// download index parts for every tenant timeline
diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs
index e8844baf5d..037fe76d7f 100644
--- a/pageserver/src/storage_sync.rs
+++ b/pageserver/src/storage_sync.rs
@@ -171,7 +171,7 @@ use self::{
use crate::{
config::PageServerConf,
exponential_backoff,
- storage_sync::index::RemoteIndex,
+ storage_sync::index::{LayerFileMetadata, RemoteIndex},
task_mgr,
task_mgr::TaskKind,
task_mgr::BACKGROUND_RUNTIME,
@@ -193,7 +193,7 @@ static SYNC_QUEUE: OnceCell<SyncQueue> = OnceCell::new();
/// A timeline status to share with pageserver's sync counterpart,
/// after comparing local and remote timeline state.
-#[derive(Clone)]
+#[derive(Clone, PartialEq, Eq)]
pub enum LocalTimelineInitStatus {
/// The timeline has every remote layer present locally.
/// There could be some layers requiring uploading,
@@ -316,7 +316,7 @@ impl SyncQueue {
/// A task to run in the async download/upload loop.
/// Limited by the number of retries; after a certain threshold the failing task gets evicted and the timeline disabled.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq, Eq)]
enum SyncTask {
/// A checkpoint outcome with possible local file updates that need actualization in the remote storage.
/// Not necessarily fresher than the one already uploaded.
@@ -427,7 +427,7 @@ impl SyncTaskBatch {
.extend(new_delete.data.deleted_layers.iter().cloned());
}
if let Some(batch_upload) = &mut self.upload {
- let not_deleted = |layer: &PathBuf| {
+ let not_deleted = |layer: &PathBuf, _: &mut LayerFileMetadata| {
!new_delete.data.layers_to_delete.contains(layer)
&& !new_delete.data.deleted_layers.contains(layer)
};
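As an aside, the extra `&mut LayerFileMetadata` argument is presumably there because the batched maps are now filtered with `HashMap::retain`, which hands its closure `(&K, &mut V)` where `HashSet::retain` only hands `&T`; a minimal std-only sketch, not part of the patch:

use std::collections::{HashMap, HashSet};

fn main() {
    let mut set: HashSet<&str> = HashSet::from(["keep", "drop"]);
    set.retain(|layer| *layer != "drop"); // one argument: &T

    let mut map: HashMap<&str, u64> = HashMap::from([("keep", 1), ("drop", 2)]);
    map.retain(|layer, _metadata| *layer != "drop"); // two arguments: &K and &mut V

    assert_eq!(set.len(), 1);
    assert_eq!(map.len(), 1);
}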
@@ -455,21 +455,35 @@ impl SyncTaskBatch {
#[derive(Debug, Clone, PartialEq, Eq)]
struct LayersUpload {
/// Layer file paths in the pageserver workdir that were added for the corresponding checkpoint.
- layers_to_upload: HashSet<PathBuf>,
+ layers_to_upload: HashMap<PathBuf, LayerFileMetadata>,
/// Already uploaded layers. Used to store the data about the uploads between task retries
/// and to record the data into the remote index after the task got completed or evicted.
- uploaded_layers: HashSet<PathBuf>,
+ uploaded_layers: HashMap<PathBuf, LayerFileMetadata>,
metadata: Option<TimelineMetadata>,
}
/// A timeline download task.
/// Does not contain the file list to download, to allow other
/// parts of the pageserver code to schedule the task
-/// without using the remote index or any other ways to list the remote timleine files.
+/// without using the remote index or any other ways to list the remote timeline files.
/// Skips the files that are already downloaded.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LayersDownload {
layers_to_skip: HashSet<PathBuf>,
+
+ /// Paths which have been downloaded, and had their metadata verified or generated.
+ ///
+ /// Metadata generation happens when upgrading from a past version of `IndexPart`.
+ gathered_metadata: HashMap<PathBuf, LayerFileMetadata>,
+}
+
+impl LayersDownload {
+ fn from_skipped_layers(layers_to_skip: HashSet<PathBuf>) -> Self {
+ LayersDownload {
+ layers_to_skip,
+ gathered_metadata: HashMap::default(),
+ }
+ }
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -491,7 +505,7 @@ struct LayersDeletion {
pub fn schedule_layer_upload(
tenant_id: TenantId,
timeline_id: TimelineId,
- layers_to_upload: HashSet<PathBuf>,
+ layers_to_upload: HashMap<PathBuf, LayerFileMetadata>,
metadata: Option<TimelineMetadata>,
) {
let sync_queue = match SYNC_QUEUE.get() {
@@ -508,7 +522,7 @@ pub fn schedule_layer_upload(
},
SyncTask::upload(LayersUpload {
layers_to_upload,
- uploaded_layers: HashSet::new(),
+ uploaded_layers: HashMap::new(),
metadata,
}),
);
@@ -566,21 +580,44 @@ pub fn schedule_layer_download(tenant_id: TenantId, timeline_id: TimelineId) {
tenant_id,
timeline_id,
},
- SyncTask::download(LayersDownload {
- layers_to_skip: HashSet::new(),
- }),
+ SyncTask::download(LayersDownload::from_skipped_layers(HashSet::new())),
);
debug!("Download task for tenant {tenant_id}, timeline {timeline_id} sent")
}
+/// Locally existing timeline files.
+///
+/// Values of this type carry different information in different contexts. On startup, collected
+/// timelines come with the full per-file information; when signalling readiness to attach after a
+/// completed download, the file information is no longer carried, because it has already been
+/// merged into [`RemoteTimeline`].
+#[derive(Debug)]
+pub struct TimelineLocalFiles(TimelineMetadata, HashMap<PathBuf, LayerFileMetadata>);
+
+impl TimelineLocalFiles {
+ pub fn metadata(&self) -> &TimelineMetadata {
+ &self.0
+ }
+
+ /// Called during startup, for all of the local files with full metadata.
+ pub(crate) fn collected(
+ metadata: TimelineMetadata,
+ timeline_files: HashMap<PathBuf, LayerFileMetadata>,
+ ) -> TimelineLocalFiles {
+ TimelineLocalFiles(metadata, timeline_files)
+ }
+
+ /// Called near the end of tenant initialization, to signal readiness to attach tenants.
+ pub(crate) fn ready(metadata: TimelineMetadata) -> Self {
+ TimelineLocalFiles(metadata, HashMap::new())
+ }
+}
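For illustration only (not part of the patch), the two roles described in the doc comment, sketched with a mirror type that uses plain `String`/`u64` stand-ins for the pageserver types:

use std::collections::HashMap;

// Mirror of TimelineLocalFiles: (timeline metadata, per-layer file info).
struct LocalFiles(String, HashMap<String, u64>);

impl LocalFiles {
    // startup: full per-file information collected from disk
    fn collected(metadata: String, files: HashMap<String, u64>) -> Self {
        LocalFiles(metadata, files)
    }
    // ready to attach after a completed download: the file info has already been
    // merged into the remote index, so only the metadata is carried
    fn ready(metadata: String) -> Self {
        LocalFiles(metadata, HashMap::new())
    }
}

fn main() {
    let at_startup = LocalFiles::collected("meta".into(), HashMap::from([("layer_1".into(), 123)]));
    let attach_ready = LocalFiles::ready("meta".into());
    assert_eq!(at_startup.1.len(), 1);
    assert!(attach_ready.1.is_empty());
}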
+
/// Launch a thread to perform remote storage sync tasks.
/// See module docs for loop step description.
pub fn spawn_storage_sync_task(
conf: &'static PageServerConf,
- local_timeline_files: HashMap<
- TenantId,
- HashMap<TimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
- >,
+ local_timeline_files: HashMap<TenantId, HashMap<TimelineId, TimelineLocalFiles>>,
storage: GenericRemoteStorage,
max_concurrent_timelines_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
@@ -738,7 +775,7 @@ async fn storage_sync_loop(
tenant_entry
.iter()
.map(|(&id, entry)| {
- (id, (entry.metadata.clone(), HashSet::new()))
+ (id, TimelineLocalFiles::ready(entry.metadata.clone()))
})
.collect(),
),
@@ -983,15 +1020,27 @@ async fn download_timeline_data(
}
DownloadedTimeline::Successful(mut download_data) => {
match update_local_metadata(conf, sync_id, current_remote_timeline).await {
- Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
- Ok(()) => {
- register_sync_status(sync_id, sync_start, TASK_NAME, Some(true));
- return DownloadStatus::Downloaded;
- }
- Err(e) => {
- error!("Timeline {sync_id} was expected to be in the remote index after a successful download, but it's absent: {e:?}");
- }
- },
+ Ok(()) => {
+ let mut g = index.write().await;
+
+ match g.set_awaits_download(&sync_id, false) {
+ Ok(()) => {
+ let timeline = g
+ .timeline_entry_mut(&sync_id)
+ .expect("set_awaits_download verified existence");
+
+ timeline.merge_metadata_from_downloaded(
+ &download_data.data.gathered_metadata,
+ );
+
+ register_sync_status(sync_id, sync_start, TASK_NAME, Some(true));
+ return DownloadStatus::Downloaded;
+ }
+ Err(e) => {
+ error!("Timeline {sync_id} was expected to be in the remote index after a successful download, but it's absent: {e:?}");
+ }
+ };
+ }
Err(e) => {
error!("Failed to update local timeline metadata: {e:?}");
download_data.retries += 1;
@@ -1194,11 +1243,18 @@ async fn update_remote_data(
}
if upload_failed {
existing_entry.add_upload_failures(
- uploaded_data.layers_to_upload.iter().cloned(),
+ uploaded_data
+ .layers_to_upload
+ .iter()
+ .map(|(k, v)| (k.to_owned(), v.to_owned())),
);
} else {
- existing_entry
- .add_timeline_layers(uploaded_data.uploaded_layers.iter().cloned());
+ existing_entry.add_timeline_layers(
+ uploaded_data
+ .uploaded_layers
+ .iter()
+ .map(|(k, v)| (k.to_owned(), v.to_owned())),
+ );
}
}
RemoteDataUpdate::Delete(layers_to_remove) => {
@@ -1218,11 +1274,19 @@ async fn update_remote_data(
};
let mut new_remote_timeline = RemoteTimeline::new(new_metadata.clone());
if upload_failed {
- new_remote_timeline
- .add_upload_failures(uploaded_data.layers_to_upload.iter().cloned());
+ new_remote_timeline.add_upload_failures(
+ uploaded_data
+ .layers_to_upload
+ .iter()
+ .map(|(k, v)| (k.to_owned(), v.to_owned())),
+ );
} else {
- new_remote_timeline
- .add_timeline_layers(uploaded_data.uploaded_layers.iter().cloned());
+ new_remote_timeline.add_timeline_layers(
+ uploaded_data
+ .uploaded_layers
+ .iter()
+ .map(|(k, v)| (k.to_owned(), v.to_owned())),
+ );
}
index_accessor.add_timeline_entry(sync_id, new_remote_timeline.clone());
@@ -1270,13 +1334,14 @@ async fn validate_task_retries(
fn schedule_first_sync_tasks(
index: &mut RemoteTimelineIndex,
sync_queue: &SyncQueue,
- local_timeline_files: HashMap<TenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
+ local_timeline_files: HashMap<TenantTimelineId, TimelineLocalFiles>,
) -> TenantTimelineValues {
let mut local_timeline_init_statuses = TenantTimelineValues::new();
let mut new_sync_tasks = VecDeque::with_capacity(local_timeline_files.len());
- for (sync_id, (local_metadata, local_files)) in local_timeline_files {
+ for (sync_id, local_timeline) in local_timeline_files {
+ let TimelineLocalFiles(local_metadata, local_files) = local_timeline;
match index.timeline_entry_mut(&sync_id) {
Some(remote_timeline) => {
let (timeline_status, awaits_download) = compare_local_and_remote_timeline(
@@ -1320,7 +1385,7 @@ fn schedule_first_sync_tasks(
sync_id,
SyncTask::upload(LayersUpload {
layers_to_upload: local_files,
- uploaded_layers: HashSet::new(),
+ uploaded_layers: HashMap::new(),
metadata: Some(local_metadata.clone()),
}),
));
@@ -1347,20 +1412,46 @@ fn compare_local_and_remote_timeline(
new_sync_tasks: &mut VecDeque<(TenantTimelineId, SyncTask)>,
sync_id: TenantTimelineId,
local_metadata: TimelineMetadata,
- local_files: HashSet<PathBuf>,
+ local_files: HashMap<PathBuf, LayerFileMetadata>,
remote_entry: &RemoteTimeline,
) -> (LocalTimelineInitStatus, bool) {
let _entered = info_span!("compare_local_and_remote_timeline", sync_id = %sync_id).entered();
- let remote_files = remote_entry.stored_files();
+ let needed_to_download_files = remote_entry
+ .stored_files()
+ .iter()
+ .filter_map(|(layer_file, remote_metadata)| {
+ if let Some(local_metadata) = local_files.get(layer_file) {
+ match (remote_metadata.file_size(), local_metadata.file_size()) {
+ (Some(x), Some(y)) if x == y => { None },
+ (None, Some(_)) => {
+ // upgrading from an earlier IndexPart without metadata
+ None
+ },
+ _ => {
+ // having to deal with other than (Some(x), Some(y)) where x != y here is a
+ // bummer, but see #2582 and #2610 for attempts and discussion.
+ warn!("Redownloading locally existing {layer_file:?} due to size mismatch, size on index: {:?}, on disk: {:?}", remote_metadata.file_size(), local_metadata.file_size());
+ Some(layer_file)
+ },
+ }
+ } else {
+ // doesn't exist locally
+ Some(layer_file)
+ }
+ })
+ .collect::<HashSet<_>>();
- let number_of_layers_to_download = remote_files.difference(&local_files).count();
- let (initial_timeline_status, awaits_download) = if number_of_layers_to_download > 0 {
+ let (initial_timeline_status, awaits_download) = if !needed_to_download_files.is_empty() {
new_sync_tasks.push_back((
sync_id,
- SyncTask::download(LayersDownload {
- layers_to_skip: local_files.clone(),
- }),
+ SyncTask::download(LayersDownload::from_skipped_layers(
+ local_files
+ .keys()
+ .filter(|path| !needed_to_download_files.contains(path))
+ .cloned()
+ .collect(),
+ )),
));
info!("NeedsSync");
(LocalTimelineInitStatus::NeedsSync, true)
@@ -1375,15 +1466,22 @@ fn compare_local_and_remote_timeline(
};
let layers_to_upload = local_files
- .difference(remote_files)
- .cloned()
- .collect::<HashSet<_>>();
+ .iter()
+ .filter_map(|(local_file, metadata)| {
+ if !remote_entry.stored_files().contains_key(local_file) {
+ Some((local_file.to_owned(), metadata.to_owned()))
+ } else {
+ None
+ }
+ })
+ .collect::<HashMap<_, _>>();
+
if !layers_to_upload.is_empty() {
new_sync_tasks.push_back((
sync_id,
SyncTask::upload(LayersUpload {
layers_to_upload,
- uploaded_layers: HashSet::new(),
+ uploaded_layers: HashMap::new(),
metadata: Some(local_metadata),
}),
));
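For illustration only (not part of the patch), the `(remote, local)` size match above condensed into a stand-alone `needs_redownload` helper with the same three outcomes:

/// Stand-in for the filter_map above: should a locally present layer be downloaded again?
fn needs_redownload(remote_size: Option<u64>, local_size: Option<u64>) -> bool {
    match (remote_size, local_size) {
        (Some(x), Some(y)) if x == y => false, // sizes agree: keep the local file
        (None, Some(_)) => false,              // remote index predates per-layer metadata: accept
        _ => true,                             // any other combination: re-download
    }
}

fn main() {
    assert!(!needs_redownload(Some(100), Some(100)));
    assert!(!needs_redownload(None, Some(100)));
    assert!(needs_redownload(Some(100), Some(99)));
    assert!(needs_redownload(Some(100), None));
}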
@@ -1439,11 +1537,12 @@ mod test_utils {
let timeline_path = harness.timeline_path(&timeline_id);
fs::create_dir_all(&timeline_path).await?;
- let mut layers_to_upload = HashSet::with_capacity(filenames.len());
+ let mut layers_to_upload = HashMap::with_capacity(filenames.len());
for &file in filenames {
let file_path = timeline_path.join(file);
fs::write(&file_path, dummy_contents(file).into_bytes()).await?;
- layers_to_upload.insert(file_path);
+ let metadata = LayerFileMetadata::new(file_path.metadata()?.len());
+ layers_to_upload.insert(file_path, metadata);
}
fs::write(
@@ -1454,7 +1553,7 @@ mod test_utils {
Ok(LayersUpload {
layers_to_upload,
- uploaded_layers: HashSet::new(),
+ uploaded_layers: HashMap::new(),
metadata: Some(metadata),
})
}
@@ -1509,12 +1608,13 @@ mod tests {
assert!(sync_id_2 != sync_id_3);
assert!(sync_id_3 != TEST_SYNC_ID);
- let download_task = SyncTask::download(LayersDownload {
- layers_to_skip: HashSet::from([PathBuf::from("sk")]),
- });
+ let download_task =
+ SyncTask::download(LayersDownload::from_skipped_layers(HashSet::from([
+ PathBuf::from("sk"),
+ ])));
let upload_task = SyncTask::upload(LayersUpload {
- layers_to_upload: HashSet::from([PathBuf::from("up")]),
- uploaded_layers: HashSet::from([PathBuf::from("upl")]),
+ layers_to_upload: HashMap::from([(PathBuf::from("up"), LayerFileMetadata::new(123))]),
+ uploaded_layers: HashMap::from([(PathBuf::from("upl"), LayerFileMetadata::new(123))]),
metadata: Some(dummy_metadata(Lsn(2))),
});
let delete_task = SyncTask::delete(LayersDeletion {
@@ -1558,12 +1658,10 @@ mod tests {
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
assert_eq!(sync_queue.len(), 0);
- let download = LayersDownload {
- layers_to_skip: HashSet::from([PathBuf::from("sk")]),
- };
+ let download = LayersDownload::from_skipped_layers(HashSet::from([PathBuf::from("sk")]));
let upload = LayersUpload {
- layers_to_upload: HashSet::from([PathBuf::from("up")]),
- uploaded_layers: HashSet::from([PathBuf::from("upl")]),
+ layers_to_upload: HashMap::from([(PathBuf::from("up"), LayerFileMetadata::new(123))]),
+ uploaded_layers: HashMap::from([(PathBuf::from("upl"), LayerFileMetadata::new(123))]),
metadata: Some(dummy_metadata(Lsn(2))),
};
let delete = LayersDeletion {
@@ -1611,18 +1709,10 @@ mod tests {
#[tokio::test]
async fn same_task_id_same_tasks_batch() {
let sync_queue = SyncQueue::new(NonZeroUsize::new(1).unwrap());
- let download_1 = LayersDownload {
- layers_to_skip: HashSet::from([PathBuf::from("sk1")]),
- };
- let download_2 = LayersDownload {
- layers_to_skip: HashSet::from([PathBuf::from("sk2")]),
- };
- let download_3 = LayersDownload {
- layers_to_skip: HashSet::from([PathBuf::from("sk3")]),
- };
- let download_4 = LayersDownload {
- layers_to_skip: HashSet::from([PathBuf::from("sk4")]),
- };
+ let download_1 = LayersDownload::from_skipped_layers(HashSet::from([PathBuf::from("sk1")]));
+ let download_2 = LayersDownload::from_skipped_layers(HashSet::from([PathBuf::from("sk2")]));
+ let download_3 = LayersDownload::from_skipped_layers(HashSet::from([PathBuf::from("sk3")]));
+ let download_4 = LayersDownload::from_skipped_layers(HashSet::from([PathBuf::from("sk4")]));
let sync_id_2 = TenantTimelineId {
tenant_id: TenantId::from_array(hex!("22223344556677881122334455667788")),
@@ -1646,15 +1736,15 @@ mod tests {
Some(SyncTaskBatch {
download: Some(SyncData {
retries: 0,
- data: LayersDownload {
- layers_to_skip: {
+ data: LayersDownload::from_skipped_layers(
+ {
let mut set = HashSet::new();
set.extend(download_1.layers_to_skip.into_iter());
set.extend(download_2.layers_to_skip.into_iter());
set.extend(download_4.layers_to_skip.into_iter());
set
},
- }
+ )
}),
upload: None,
delete: None,
@@ -1670,4 +1760,148 @@ mod tests {
"Should have one task left out of the batch"
);
}
+
+ mod local_and_remote_comparisons {
+ use super::*;
+
+ #[test]
+ fn ready() {
+ let mut new_sync_tasks = VecDeque::default();
+ let sync_id = TenantTimelineId::generate();
+ let local_metadata = dummy_metadata(0x02.into());
+ let local_files =
+ HashMap::from([(PathBuf::from("first_file"), LayerFileMetadata::new(123))]);
+ let mut remote_entry = RemoteTimeline::new(local_metadata.clone());
+ remote_entry
+ .add_timeline_layers([(PathBuf::from("first_file"), LayerFileMetadata::new(123))]);
+
+ let (status, sync_needed) = compare_local_and_remote_timeline(
+ &mut new_sync_tasks,
+ sync_id,
+ local_metadata.clone(),
+ local_files,
+ &remote_entry,
+ );
+
+ assert_eq!(
+ status,
+ LocalTimelineInitStatus::LocallyComplete(local_metadata)
+ );
+ assert!(!sync_needed);
+
+ assert!(new_sync_tasks.is_empty(), "{:?}", new_sync_tasks);
+ }
+
+ #[test]
+ fn needs_download() {
+ let mut new_sync_tasks = VecDeque::default();
+ let sync_id = TenantTimelineId::generate();
+ let local_metadata = dummy_metadata(0x02.into());
+ let local_files = HashMap::default();
+ let mut remote_entry = RemoteTimeline::new(local_metadata.clone());
+ remote_entry
+ .add_timeline_layers([(PathBuf::from("first_file"), LayerFileMetadata::new(123))]);
+
+ let (status, sync_needed) = compare_local_and_remote_timeline(
+ &mut new_sync_tasks,
+ sync_id,
+ local_metadata,
+ local_files.clone(),
+ &remote_entry,
+ );
+
+ assert_eq!(status, LocalTimelineInitStatus::NeedsSync);
+ assert!(sync_needed);
+
+ let new_sync_tasks = new_sync_tasks.into_iter().collect::<Vec<_>>();
+
+ assert_eq!(
+ &new_sync_tasks,
+ &[(
+ sync_id,
+ SyncTask::download(LayersDownload::from_skipped_layers(
+ local_files.keys().cloned().collect()
+ ))
+ )]
+ );
+ }
+
+ #[test]
+ fn redownload_is_not_needed_on_upgrade() {
+ // originally the implementation missed the `(None, Some(_))` case in the match, and
+ // proceeded to always redownload if the remote metadata was not available.
+
+ let mut new_sync_tasks = VecDeque::default();
+ let sync_id = TenantTimelineId::generate();
+
+ let local_metadata = dummy_metadata(0x02.into());
+
+ // The type system would in general allow a LayerFileMetadata to be created with
+ // file_size: None; however `LayerFileMetadata::default` is only allowed from tests,
+ // so everywhere else in the system a valid LayerFileMetadata is created through
+ // `::new`.
+ let local_files =
+ HashMap::from([(PathBuf::from("first_file"), LayerFileMetadata::new(123))]);
+
+ let mut remote_entry = RemoteTimeline::new(local_metadata.clone());
+
+ // RemoteTimeline is constructed out of an older version IndexPart, which didn't carry
+ // any metadata.
+ remote_entry
+ .add_timeline_layers([(PathBuf::from("first_file"), LayerFileMetadata::default())]);
+
+ let (status, sync_needed) = compare_local_and_remote_timeline(
+ &mut new_sync_tasks,
+ sync_id,
+ local_metadata.clone(),
+ local_files,
+ &remote_entry,
+ );
+
+ assert_eq!(
+ status,
+ LocalTimelineInitStatus::LocallyComplete(local_metadata)
+ );
+ assert!(!sync_needed);
+ }
+
+ #[test]
+ fn needs_upload() {
+ let mut new_sync_tasks = VecDeque::default();
+ let sync_id = TenantTimelineId::generate();
+ let local_metadata = dummy_metadata(0x02.into());
+ let local_files =
+ HashMap::from([(PathBuf::from("first_file"), LayerFileMetadata::new(123))]);
+ let mut remote_entry = RemoteTimeline::new(local_metadata.clone());
+ remote_entry.add_timeline_layers([]);
+
+ let (status, sync_needed) = compare_local_and_remote_timeline(
+ &mut new_sync_tasks,
+ sync_id,
+ local_metadata.clone(),
+ local_files.clone(),
+ &remote_entry,
+ );
+
+ assert_eq!(
+ status,
+ LocalTimelineInitStatus::LocallyComplete(local_metadata.clone())
+ );
+ assert!(!sync_needed);
+
+ let new_sync_tasks = new_sync_tasks.into_iter().collect::<Vec<_>>();
+
+ assert_eq!(
+ &new_sync_tasks,
+ &[(
+ sync_id,
+ SyncTask::upload(LayersUpload {
+ layers_to_upload: local_files,
+ uploaded_layers: HashMap::default(),
+ metadata: Some(local_metadata),
+ })
+ )]
+ );
+ }
+ }
}
diff --git a/pageserver/src/storage_sync/delete.rs b/pageserver/src/storage_sync/delete.rs
index 21a3372e70..39846f0da3 100644
--- a/pageserver/src/storage_sync/delete.rs
+++ b/pageserver/src/storage_sync/delete.rs
@@ -171,7 +171,7 @@ mod tests {
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
let timeline_upload =
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
- for local_path in timeline_upload.layers_to_upload {
+ for (local_path, _metadata) in timeline_upload.layers_to_upload {
let remote_path =
local_storage.resolve_in_storage(&local_storage.remote_object_id(&local_path)?)?;
let remote_parent_dir = remote_path.parent().unwrap();
diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs
index 3e850443d8..61ef164f14 100644
--- a/pageserver/src/storage_sync/download.rs
+++ b/pageserver/src/storage_sync/download.rs
@@ -16,7 +16,11 @@ use tokio::{
};
use tracing::{debug, error, info, warn};
-use crate::{config::PageServerConf, storage_sync::SyncTask, TEMP_FILE_SUFFIX};
+use crate::{
+ config::PageServerConf,
+ storage_sync::{index::LayerFileMetadata, SyncTask},
+ TEMP_FILE_SUFFIX,
+};
use utils::{
crashsafe_dir::path_with_suffix_extension,
id::{TenantId, TenantTimelineId, TimelineId},
@@ -219,8 +223,14 @@ pub(super) async fn download_timeline_layers<'a>(
let layers_to_download = remote_timeline
.stored_files()
- .difference(&download.layers_to_skip)
- .cloned()
+ .iter()
+ .filter_map(|(layer_path, metadata)| {
+ if !download.layers_to_skip.contains(layer_path) {
+ Some((layer_path.to_owned(), metadata.to_owned()))
+ } else {
+ None
+ }
+ })
.collect::>();
debug!("Layers to download: {layers_to_download:?}");
@@ -233,89 +243,129 @@ pub(super) async fn download_timeline_layers<'a>(
let mut download_tasks = layers_to_download
.into_iter()
- .map(|layer_destination_path| async move {
- if layer_destination_path.exists() {
- debug!(
- "Layer already exists locally, skipping download: {}",
- layer_destination_path.display()
- );
- } else {
- // Perform a rename inspired by durable_rename from file_utils.c.
- // The sequence:
- // write(tmp)
- // fsync(tmp)
- // rename(tmp, new)
- // fsync(new)
- // fsync(parent)
- // For more context about durable_rename check this email from postgres mailing list:
- // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
- // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
- let temp_file_path =
- path_with_suffix_extension(&layer_destination_path, TEMP_FILE_SUFFIX);
+ .map(|(layer_destination_path, metadata)| async move {
- let mut destination_file =
- fs::File::create(&temp_file_path).await.with_context(|| {
- format!(
- "Failed to create a destination file for layer '{}'",
- temp_file_path.display()
- )
- })?;
+ match layer_destination_path.metadata() {
+ Ok(m) if m.is_file() => {
+ // the file exists from an earlier round in which we failed after renaming it to
+ // layer_destination_path
+ let verified = if let Some(expected) = metadata.file_size() {
+ m.len() == expected
+ } else {
+ // behaviour before recording metadata was to accept any existing
+ true
+ };
- let mut layer_download = storage.download_storage_object(None, &layer_destination_path)
- .await
- .with_context(|| {
- format!(
- "Failed to initiate the download the layer for {sync_id} into file '{}'",
- temp_file_path.display()
- )
- })?;
- io::copy(&mut layer_download.download_stream, &mut destination_file)
- .await
- .with_context(|| {
- format!(
- "Failed to download the layer for {sync_id} into file '{}'",
- temp_file_path.display()
- )
- })?;
-
- // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
- // A file will not be closed immediately when it goes out of scope if there are any IO operations
- // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
- // you should call flush before dropping it.
- //
- // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
- // we assume that `destination_file` file is fully written. I.e there is no pending .write(...).await operations.
- // But for additional safety let's check/wait for any pending operations.
- destination_file.flush().await.with_context(|| {
- format!(
- "failed to flush source file at {}",
- temp_file_path.display()
- )
- })?;
-
- // not using sync_data because it can lose file size update
- destination_file.sync_all().await.with_context(|| {
- format!(
- "failed to fsync source file at {}",
- temp_file_path.display()
- )
- })?;
- drop(destination_file);
-
- fail::fail_point!("remote-storage-download-pre-rename", |_| {
- anyhow::bail!("remote-storage-download-pre-rename failpoint triggered")
- });
-
- fs::rename(&temp_file_path, &layer_destination_path).await?;
-
- fsync_path(&layer_destination_path).await.with_context(|| {
- format!(
- "Cannot fsync layer destination path {}",
- layer_destination_path.display(),
- )
- })?;
+ if verified {
+ debug!(
+ "Layer already exists locally, skipping download: {}",
+ layer_destination_path.display()
+ );
+ return Ok((layer_destination_path, LayerFileMetadata::new(m.len())))
+ } else {
+ // no need to remove it, it will be overwritten by fs::rename
+ // after successful download
+ warn!("Downloaded layer exists already but layer file metadata mismatches: {}, metadata {:?}", layer_destination_path.display(), metadata);
+ }
+ }
+ Ok(m) => {
+ return Err(anyhow::anyhow!("Downloaded layer destination exists but is not a file: {m:?}, target needs to be removed/archived manually: {layer_destination_path:?}"));
+ }
+ Err(_) => {
+ // behave as if the file didn't exist
+ }
}
- Ok::<_, anyhow::Error>(layer_destination_path)
+
+ // Perform a rename inspired by durable_rename from file_utils.c.
+ // The sequence:
+ // write(tmp)
+ // fsync(tmp)
+ // rename(tmp, new)
+ // fsync(new)
+ // fsync(parent)
+ // For more context about durable_rename check this email from postgres mailing list:
+ // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
+ // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
+ let temp_file_path =
+ path_with_suffix_extension(&layer_destination_path, TEMP_FILE_SUFFIX);
+
+ // TODO: this doesn't use the cached fd for some reason?
+ let mut destination_file =
+ fs::File::create(&temp_file_path).await.with_context(|| {
+ format!(
+ "Failed to create a destination file for layer '{}'",
+ temp_file_path.display()
+ )
+ })?;
+
+ let mut layer_download = storage.download_storage_object(None, &layer_destination_path)
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to initiate the download the layer for {sync_id} into file '{}'",
+ temp_file_path.display()
+ )
+ })?;
+
+ let bytes_amount = io::copy(&mut layer_download.download_stream, &mut destination_file)
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to download the layer for {sync_id} into file '{}'",
+ temp_file_path.display()
+ )
+ })?;
+
+ // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
+ // A file will not be closed immediately when it goes out of scope if there are any IO operations
+ // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
+ // you should call flush before dropping it.
+ //
+ // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
+ // we assume that `destination_file` file is fully written. I.e there is no pending .write(...).await operations.
+ // But for additional safety let's check/wait for any pending operations.
+ destination_file.flush().await.with_context(|| {
+ format!(
+ "failed to flush source file at {}",
+ temp_file_path.display()
+ )
+ })?;
+
+ match metadata.file_size() {
+ Some(expected) if expected != bytes_amount => {
+ anyhow::bail!(
+ "According to layer file metadata should had downloaded {expected} bytes but downloaded {bytes_amount} bytes into file '{}'",
+ temp_file_path.display()
+ );
+ },
+ Some(_) | None => {
+ // matches, or upgrading from an earlier IndexPart version
+ }
+ }
+
+ // not using sync_data because it can lose file size update
+ destination_file.sync_all().await.with_context(|| {
+ format!(
+ "failed to fsync source file at {}",
+ temp_file_path.display()
+ )
+ })?;
+ drop(destination_file);
+
+ fail::fail_point!("remote-storage-download-pre-rename", |_| {
+ anyhow::bail!("remote-storage-download-pre-rename failpoint triggered")
+ });
+
+ fs::rename(&temp_file_path, &layer_destination_path).await?;
+
+ fsync_path(&layer_destination_path).await.with_context(|| {
+ format!(
+ "Cannot fsync layer destination path {}",
+ layer_destination_path.display(),
+ )
+ })?;
+
+ Ok::<_, anyhow::Error>((layer_destination_path, LayerFileMetadata::new(bytes_amount)))
})
.collect::>();
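For illustration only (not part of the patch), the durable_rename-inspired sequence above condensed into a hypothetical `durable_write` helper; error contexts, the failpoint, and the size check are elided:

use std::path::Path;
use tokio::{fs, io::AsyncWriteExt};

/// write(tmp), fsync(tmp), rename(tmp, new), fsync(new), fsync(parent) -- the same
/// ordering described in the comment above.
async fn durable_write(dst: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let tmp = dst.with_extension("___temp");

    let mut file = fs::File::create(&tmp).await?;
    file.write_all(bytes).await?;
    file.flush().await?;
    // sync_all rather than sync_data so the file size update is not lost
    file.sync_all().await?;
    drop(file);

    fs::rename(&tmp, dst).await?;
    fs::File::open(dst).await?.sync_all().await?;

    // fsync the parent directory so the rename itself survives a crash
    if let Some(parent) = dst.parent() {
        fs::File::open(parent).await?.sync_all().await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    durable_write(Path::new("/tmp/example_layer"), b"data").await
}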
@@ -324,9 +374,12 @@ pub(super) async fn download_timeline_layers<'a>(
let mut undo = HashSet::new();
while let Some(download_result) = download_tasks.next().await {
match download_result {
- Ok(downloaded_path) => {
+ Ok((downloaded_path, metadata)) => {
undo.insert(downloaded_path.clone());
- download.layers_to_skip.insert(downloaded_path);
+ download.layers_to_skip.insert(downloaded_path.clone());
+ // if the key existed already, the earlier attempt produced a partial file and we
+ // are retrying now, so overwriting the entry is fine
+ download.gathered_metadata.insert(downloaded_path, metadata);
}
Err(e) => {
errors_happened = true;
@@ -349,6 +402,8 @@ pub(super) async fn download_timeline_layers<'a>(
);
for item in undo {
download.layers_to_skip.remove(&item);
+ // intentionally keep gathered_metadata: this undo only covers an fsync_path failure
+ // on the parent directory, and the gathered values remain valid for the retry
}
errors_happened = true;
}
@@ -453,9 +508,9 @@ mod tests {
let timeline_upload =
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
- for local_path in timeline_upload.layers_to_upload {
+ for local_path in timeline_upload.layers_to_upload.keys() {
let remote_path =
- local_storage.resolve_in_storage(&storage.remote_object_id(&local_path)?)?;
+ local_storage.resolve_in_storage(&storage.remote_object_id(local_path)?)?;
let remote_parent_dir = remote_path.parent().unwrap();
if !remote_parent_dir.exists() {
fs::create_dir_all(&remote_parent_dir).await?;
@@ -473,11 +528,19 @@ mod tests {
let mut remote_timeline = RemoteTimeline::new(metadata.clone());
remote_timeline.awaits_download = true;
- remote_timeline.add_timeline_layers(
- layer_files
- .iter()
- .map(|layer| local_timeline_path.join(layer)),
- );
+ remote_timeline.add_timeline_layers(layer_files.iter().map(|layer| {
+ let layer_path = local_timeline_path.join(layer);
+
+ // this could have also been LayerFileMetadata::default(), but since this test does
+ // not perform the merge operation done by storage_sync::download_timeline_data, it
+ // would not be merged back into the timeline.
+ let metadata_from_upload = timeline_upload
+ .layers_to_upload
+ .get(&layer_path)
+ .expect("layer must exist in previously uploaded paths")
+ .to_owned();
+ (layer_path, metadata_from_upload)
+ }));
let download_data = match download_timeline_layers(
harness.conf,
@@ -487,9 +550,9 @@ mod tests {
sync_id,
SyncData::new(
current_retries,
- LayersDownload {
- layers_to_skip: HashSet::from([local_timeline_path.join("layer_to_skip")]),
- },
+ LayersDownload::from_skipped_layers(HashSet::from([
+ local_timeline_path.join("layer_to_skip")
+ ])),
),
)
.await
@@ -552,12 +615,7 @@ mod tests {
&sync_queue,
None,
sync_id,
- SyncData::new(
- 0,
- LayersDownload {
- layers_to_skip: HashSet::new(),
- },
- ),
+ SyncData::new(0, LayersDownload::from_skipped_layers(HashSet::new())),
)
.await;
assert!(
@@ -576,12 +634,7 @@ mod tests {
&sync_queue,
Some(¬_expecting_download_remote_timeline),
sync_id,
- SyncData::new(
- 0,
- LayersDownload {
- layers_to_skip: HashSet::new(),
- },
- ),
+ SyncData::new(0, LayersDownload::from_skipped_layers(HashSet::new())),
)
.await;
assert!(
diff --git a/pageserver/src/storage_sync/index.rs b/pageserver/src/storage_sync/index.rs
index db37c7b411..0779d95e8e 100644
--- a/pageserver/src/storage_sync/index.rs
+++ b/pageserver/src/storage_sync/index.rs
@@ -212,8 +212,8 @@ impl RemoteTimelineIndex {
/// Restored index part data about the timeline, stored in the remote index.
#[derive(Debug, Clone)]
pub struct RemoteTimeline {
- timeline_layers: HashSet<PathBuf>,
- missing_layers: HashSet<PathBuf>,
+ timeline_layers: HashMap<PathBuf, LayerFileMetadata>,
+ missing_layers: HashMap<PathBuf, LayerFileMetadata>,
pub metadata: TimelineMetadata,
pub awaits_download: bool,
@@ -222,62 +222,161 @@ pub struct RemoteTimeline {
impl RemoteTimeline {
pub fn new(metadata: TimelineMetadata) -> Self {
Self {
- timeline_layers: HashSet::new(),
- missing_layers: HashSet::new(),
+ timeline_layers: HashMap::default(),
+ missing_layers: HashMap::default(),
metadata,
awaits_download: false,
}
}
- pub fn add_timeline_layers(&mut self, new_layers: impl IntoIterator<Item = PathBuf>) {
- self.timeline_layers.extend(new_layers.into_iter());
+ pub fn add_timeline_layers(
+ &mut self,
+ new_layers: impl IntoIterator<Item = (PathBuf, LayerFileMetadata)>,
+ ) {
+ self.timeline_layers.extend(new_layers);
}
- pub fn add_upload_failures(&mut self, upload_failures: impl IntoIterator<Item = PathBuf>) {
- self.missing_layers.extend(upload_failures.into_iter());
+ pub fn add_upload_failures(
+ &mut self,
+ upload_failures: impl IntoIterator<Item = (PathBuf, LayerFileMetadata)>,
+ ) {
+ self.missing_layers.extend(upload_failures);
}
pub fn remove_layers(&mut self, layers_to_remove: &HashSet<PathBuf>) {
self.timeline_layers
- .retain(|layer| !layers_to_remove.contains(layer));
+ .retain(|layer, _| !layers_to_remove.contains(layer));
self.missing_layers
- .retain(|layer| !layers_to_remove.contains(layer));
+ .retain(|layer, _| !layers_to_remove.contains(layer));
}
/// Lists all layer files in the given remote timeline. Omits the metadata file.
- pub fn stored_files(&self) -> &HashSet<PathBuf> {
+ pub fn stored_files(&self) -> &HashMap<PathBuf, LayerFileMetadata> {
&self.timeline_layers
}
+ /// Merges metadata gathered or verified while downloading the needed layer files into the
+ /// metadata on the [`RemoteIndex`], so it can be uploaded later.
+ pub fn merge_metadata_from_downloaded(
+ &mut self,
+ downloaded: &HashMap<PathBuf, LayerFileMetadata>,
+ ) {
+ downloaded.iter().for_each(|(path, metadata)| {
+ if let Some(upgraded) = self.timeline_layers.get_mut(path) {
+ upgraded.merge(metadata);
+ }
+ });
+ }
+
pub fn from_index_part(timeline_path: &Path, index_part: IndexPart) -> anyhow::Result<Self> {
let metadata = TimelineMetadata::from_bytes(&index_part.metadata_bytes)?;
+ let default_metadata = &IndexLayerMetadata::default();
+
+ let find_metadata = |key: &RelativePath| -> LayerFileMetadata {
+ index_part
+ .layer_metadata
+ .get(key)
+ .unwrap_or(default_metadata)
+ .into()
+ };
+
Ok(Self {
- timeline_layers: to_local_paths(timeline_path, index_part.timeline_layers),
- missing_layers: to_local_paths(timeline_path, index_part.missing_layers),
+ timeline_layers: index_part
+ .timeline_layers
+ .iter()
+ .map(|layer_path| (layer_path.as_path(timeline_path), find_metadata(layer_path)))
+ .collect(),
+ missing_layers: index_part
+ .missing_layers
+ .iter()
+ .map(|layer_path| (layer_path.as_path(timeline_path), find_metadata(layer_path)))
+ .collect(),
metadata,
awaits_download: false,
})
}
}
+/// Metadata gathered for each of the layer files.
+///
+/// Fields have to be `Option`s because a remote [`IndexPart`] can be from a different version,
+/// which might carry less or more metadata depending on whether we are upgrading or rolling back
+/// an upgrade.
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[cfg_attr(test, derive(Default))]
+pub struct LayerFileMetadata {
+ file_size: Option<u64>,
+}
+
+impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
+ fn from(other: &IndexLayerMetadata) -> Self {
+ LayerFileMetadata {
+ file_size: other.file_size,
+ }
+ }
+}
+
+impl LayerFileMetadata {
+ pub fn new(file_size: u64) -> Self {
+ LayerFileMetadata {
+ file_size: Some(file_size),
+ }
+ }
+
+ pub fn file_size(&self) -> Option<u64> {
+ self.file_size
+ }
+
+ /// Metadata can have holes due to version upgrades; this method upgrades `self` with the
+ /// values from `other`.
+ ///
+ /// It is called on the possibly outdated value.
+ pub fn merge(&mut self, other: &Self) {
+ self.file_size = other.file_size.or(self.file_size);
+ }
+}
+
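For illustration only (not part of the patch), the merge semantics sketched with a mirror struct in place of the real `LayerFileMetadata`:

#[derive(Debug, PartialEq)]
struct Meta {
    file_size: Option<u64>,
}

impl Meta {
    // mirrors LayerFileMetadata::merge: take the other (fresher) value, keep ours as fallback
    fn merge(&mut self, other: &Self) {
        self.file_size = other.file_size.or(self.file_size);
    }
}

fn main() {
    // a hole from an older IndexPart gets filled in by freshly gathered metadata
    let mut from_old_index = Meta { file_size: None };
    from_old_index.merge(&Meta { file_size: Some(123) });
    assert_eq!(from_old_index, Meta { file_size: Some(123) });

    // an already known size is not erased by a hole on the other side
    let mut known = Meta { file_size: Some(456) };
    known.merge(&Meta { file_size: None });
    assert_eq!(known, Meta { file_size: Some(456) });
}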
/// Part of the remote index, corresponding to a certain timeline.
/// Contains the data about all files in the timeline, present remotely and its metadata.
+///
+/// This type needs to be backwards and forwards compatible. When changing the fields,
+/// remember to add a test case for the changed version.
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexPart {
+ /// Debugging aid describing the version of this type.
+ #[serde(default)]
+ version: usize,
+
+ /// Each of the layers present on remote storage.
+ ///
+ /// Additional metadata may exist in `layer_metadata`.
timeline_layers: HashSet<RelativePath>,
+
/// Currently is not really used in pageserver,
/// present to manually keep track of the layer files that pageserver might never retrieve.
///
/// Such "holes" might appear if any upload task was evicted on an error threshold:
/// such a layer will only be rescheduled for upload on pageserver restart.
missing_layers: HashSet<RelativePath>,
+
+ /// Per-layer file metadata, which can exist for a present or a missing layer file.
+ ///
+ /// Older versions of `IndexPart` will not have this field, or will have only part of the
+ /// metadata that the latest version stores.
+ #[serde(default)]
+ layer_metadata: HashMap<RelativePath, IndexLayerMetadata>,
+
#[serde_as(as = "DisplayFromStr")]
disk_consistent_lsn: Lsn,
metadata_bytes: Vec<u8>,
}
impl IndexPart {
+ /// When adding to or modifying any parts of `IndexPart`, increment the version so that the
+ /// formats can be told apart later.
+ ///
+ /// The version is currently informative only.
+ const LATEST_VERSION: usize = 1;
pub const FILE_NAME: &'static str = "index_part.json";
#[cfg(test)]
@@ -288,8 +387,10 @@ impl IndexPart {
metadata_bytes: Vec<u8>,
) -> Self {
Self {
+ version: Self::LATEST_VERSION,
timeline_layers,
missing_layers,
+ layer_metadata: HashMap::default(),
disk_consistent_lsn,
metadata_bytes,
}
@@ -304,35 +405,68 @@ impl IndexPart {
remote_timeline: RemoteTimeline,
) -> anyhow::Result<Self> {
let metadata_bytes = remote_timeline.metadata.to_bytes()?;
+
+ let mut layer_metadata = HashMap::new();
+
+ let mut missing_layers = HashSet::new();
+
+ separate_paths_and_metadata(
+ timeline_path,
+ &remote_timeline.missing_layers,
+ &mut missing_layers,
+ &mut layer_metadata,
+ )
+ .context("Failed to convert missing layers' paths to relative ones")?;
+
+ let mut timeline_layers = HashSet::new();
+
+ separate_paths_and_metadata(
+ timeline_path,
+ &remote_timeline.timeline_layers,
+ &mut timeline_layers,
+ &mut layer_metadata,
+ )
+ .context("Failed to convert timeline layers' paths to relative ones")?;
+
Ok(Self {
- timeline_layers: to_relative_paths(timeline_path, remote_timeline.timeline_layers)
- .context("Failed to convert timeline layers' paths to relative ones")?,
- missing_layers: to_relative_paths(timeline_path, remote_timeline.missing_layers)
- .context("Failed to convert missing layers' paths to relative ones")?,
+ version: Self::LATEST_VERSION,
+ timeline_layers,
+ missing_layers,
+ layer_metadata,
disk_consistent_lsn: remote_timeline.metadata.disk_consistent_lsn(),
metadata_bytes,
})
}
}
-fn to_local_paths(
- timeline_path: &Path,
- paths: impl IntoIterator<Item = RelativePath>,
-) -> HashSet<PathBuf> {
- paths
- .into_iter()
- .map(|path| path.as_path(timeline_path))
- .collect()
+/// Serialized form of [`LayerFileMetadata`].
+#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
+pub struct IndexLayerMetadata {
+ file_size: Option<u64>,
}
-fn to_relative_paths(
+impl From<&'_ LayerFileMetadata> for IndexLayerMetadata {
+ fn from(other: &'_ LayerFileMetadata) -> Self {
+ IndexLayerMetadata {
+ file_size: other.file_size,
+ }
+ }
+}
+
+fn separate_paths_and_metadata(
timeline_path: &Path,
- paths: impl IntoIterator<Item = PathBuf>,
-) -> anyhow::Result<HashSet<RelativePath>> {
- paths
- .into_iter()
- .map(|path| RelativePath::new(timeline_path, path))
- .collect()
+ input: &HashMap<PathBuf, LayerFileMetadata>,
+ output: &mut HashSet<RelativePath>,
+ layer_metadata: &mut HashMap<RelativePath, IndexLayerMetadata>,
+) -> anyhow::Result<()> {
+ for (path, metadata) in input {
+ let rel_path = RelativePath::new(timeline_path, path)?;
+ let metadata = IndexLayerMetadata::from(metadata);
+
+ layer_metadata.insert(rel_path.clone(), metadata);
+ output.insert(rel_path);
+ }
+ Ok(())
}
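For illustration only (not part of the patch): the backward compatibility of the new `version` and `layer_metadata` fields rests on `#[serde(default)]`; a mirror struct (assuming serde and serde_json are available) shows that an old document without those fields still deserializes:

use std::collections::HashMap;
use serde::Deserialize;

// Mirror of the compatibility-relevant parts of IndexPart, for illustration only.
#[derive(Debug, Deserialize)]
struct MiniIndexPart {
    #[serde(default)]
    version: usize,
    #[serde(default)]
    layer_metadata: HashMap<String, u64>,
    timeline_layers: Vec<String>,
}

fn main() {
    // an "old" document: no version, no layer_metadata
    let old = r#"{ "timeline_layers": ["layer_1"] }"#;
    let parsed: MiniIndexPart = serde_json::from_str(old).unwrap();
    assert_eq!(parsed.version, 0);
    assert!(parsed.layer_metadata.is_empty());
    assert_eq!(parsed.timeline_layers, vec!["layer_1".to_string()]);
}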
#[cfg(test)]
@@ -357,13 +491,13 @@ mod tests {
DEFAULT_PG_VERSION,
);
let remote_timeline = RemoteTimeline {
- timeline_layers: HashSet::from([
- timeline_path.join("layer_1"),
- timeline_path.join("layer_2"),
+ timeline_layers: HashMap::from([
+ (timeline_path.join("layer_1"), LayerFileMetadata::new(1)),
+ (timeline_path.join("layer_2"), LayerFileMetadata::new(2)),
]),
- missing_layers: HashSet::from([
- timeline_path.join("missing_1"),
- timeline_path.join("missing_2"),
+ missing_layers: HashMap::from([
+ (timeline_path.join("missing_1"), LayerFileMetadata::new(3)),
+ (timeline_path.join("missing_2"), LayerFileMetadata::new(4)),
]),
metadata: metadata.clone(),
awaits_download: false,
@@ -485,13 +619,13 @@ mod tests {
let conversion_result = IndexPart::from_remote_timeline(
&timeline_path,
RemoteTimeline {
- timeline_layers: HashSet::from([
- PathBuf::from("bad_path"),
- timeline_path.join("layer_2"),
+ timeline_layers: HashMap::from([
+ (PathBuf::from("bad_path"), LayerFileMetadata::new(1)),
+ (timeline_path.join("layer_2"), LayerFileMetadata::new(2)),
]),
- missing_layers: HashSet::from([
- timeline_path.join("missing_1"),
- timeline_path.join("missing_2"),
+ missing_layers: HashMap::from([
+ (timeline_path.join("missing_1"), LayerFileMetadata::new(3)),
+ (timeline_path.join("missing_2"), LayerFileMetadata::new(4)),
]),
metadata: metadata.clone(),
awaits_download: false,
@@ -502,13 +636,13 @@ mod tests {
let conversion_result = IndexPart::from_remote_timeline(
&timeline_path,
RemoteTimeline {
- timeline_layers: HashSet::from([
- timeline_path.join("layer_1"),
- timeline_path.join("layer_2"),
+ timeline_layers: HashMap::from([
+ (timeline_path.join("layer_1"), LayerFileMetadata::new(1)),
+ (timeline_path.join("layer_2"), LayerFileMetadata::new(2)),
]),
- missing_layers: HashSet::from([
- PathBuf::from("bad_path"),
- timeline_path.join("missing_2"),
+ missing_layers: HashMap::from([
+ (PathBuf::from("bad_path"), LayerFileMetadata::new(3)),
+ (timeline_path.join("missing_2"), LayerFileMetadata::new(4)),
]),
metadata,
awaits_download: false,
@@ -516,4 +650,63 @@ mod tests {
);
assert!(conversion_result.is_err(), "Should not be able to convert metadata with missing layer paths that are not in the timeline directory");
}
+
+ #[test]
+ fn v0_indexpart_is_parsed() {
+ let example = r#"{
+ "timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
+ "missing_layers":["not_a_real_layer_but_adding_coverage"],
+ "disk_consistent_lsn":"0/16960E8",
+ "metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
+ }"#;
+
+ let expected = IndexPart {
+ version: 0,
+ timeline_layers: [RelativePath("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_owned())].into_iter().collect(),
+ missing_layers: [RelativePath("not_a_real_layer_but_adding_coverage".to_owned())].into_iter().collect(),
+ layer_metadata: HashMap::default(),
+ disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
+ metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
+ };
+
+ let part = serde_json::from_str::<IndexPart>(example).unwrap();
+ assert_eq!(part, expected);
+ }
+
+ #[test]
+ fn v1_indexpart_is_parsed() {
+ let example = r#"{
+ "version":1,
+ "timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
+ "missing_layers":["not_a_real_layer_but_adding_coverage"],
+ "layer_metadata":{
+ "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
+ "not_a_real_layer_but_adding_coverage": { "file_size": 9007199254741001 }
+ },
+ "disk_consistent_lsn":"0/16960E8",
+ "metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
+ }"#;
+
+ let expected = IndexPart {
+ // note: this is not verified, could be anything, and exists to aid human debugging; could be the git version instead?
+ version: 1,
+ timeline_layers: [RelativePath("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_owned())].into_iter().collect(),
+ missing_layers: [RelativePath("not_a_real_layer_but_adding_coverage".to_owned())].into_iter().collect(),
+ layer_metadata: HashMap::from([
+ (RelativePath("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_owned()), IndexLayerMetadata {
+ file_size: Some(25600000),
+ }),
+ (RelativePath("not_a_real_layer_but_adding_coverage".to_owned()), IndexLayerMetadata {
+ // serde_json should always parse this, but it might be treated as a double by
+ // e.g. jq.
+ file_size: Some(9007199254741001),
+ })
+ ]),
+ disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
+ metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
+ };
+
+ let part = serde_json::from_str::<IndexPart>(example).unwrap();
+ assert_eq!(part, expected);
+ }
}
diff --git a/pageserver/src/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs
index 75657915c0..f91105052b 100644
--- a/pageserver/src/storage_sync/upload.rs
+++ b/pageserver/src/storage_sync/upload.rs
@@ -69,14 +69,25 @@ pub(super) async fn upload_timeline_layers<'a>(
.map(|meta| meta.disk_consistent_lsn());
let already_uploaded_layers = remote_timeline
- .map(|timeline| timeline.stored_files())
- .cloned()
+ .map(|timeline| {
+ timeline
+ .stored_files()
+ .keys()
+ .cloned()
+ .collect::<HashSet<_>>()
+ })
.unwrap_or_default();
let layers_to_upload = upload
.layers_to_upload
- .difference(&already_uploaded_layers)
- .cloned()
+ .iter()
+ .filter_map(|(k, v)| {
+ if !already_uploaded_layers.contains(k) {
+ Some((k.to_owned(), v.to_owned()))
+ } else {
+ None
+ }
+ })
.collect::>();
if layers_to_upload.is_empty() {
@@ -98,7 +109,7 @@ pub(super) async fn upload_timeline_layers<'a>(
let mut upload_tasks = layers_to_upload
.into_iter()
- .map(|source_path| async move {
+ .map(|(source_path, known_metadata)| async move {
let source_file = match fs::File::open(&source_path).await.with_context(|| {
format!(
"Failed to upen a source file for layer '{}'",
@@ -109,7 +120,7 @@ pub(super) async fn upload_timeline_layers<'a>(
Err(e) => return Err(UploadError::MissingLocalFile(source_path, e)),
};
- let source_size = source_file
+ let fs_size = source_file
.metadata()
.await
.with_context(|| {
@@ -119,10 +130,24 @@ pub(super) async fn upload_timeline_layers<'a>(
)
})
.map_err(UploadError::Other)?
- .len() as usize;
+ .len();
+
+ // FIXME: this looks bad
+ if let Some(metadata_size) = known_metadata.file_size() {
+ if metadata_size != fs_size {
+ return Err(UploadError::Other(anyhow::anyhow!(
+ "File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}"
+ )));
+ }
+ } else {
+ // no expected file size is recorded: a state we would like to avoid
+ }
+
+ let fs_size = usize::try_from(fs_size).with_context(|| format!("File {source_path:?} size {fs_size} could not be converted to usize"))
+ .map_err(UploadError::Other)?;
match storage
- .upload_storage_object(Box::new(source_file), source_size, &source_path)
+ .upload_storage_object(Box::new(source_file), fs_size, &source_path)
.await
.with_context(|| format!("Failed to upload layer file for {sync_id}"))
{
@@ -136,8 +161,11 @@ pub(super) async fn upload_timeline_layers<'a>(
while let Some(upload_result) = upload_tasks.next().await {
match upload_result {
Ok(uploaded_path) => {
- upload.layers_to_upload.remove(&uploaded_path);
- upload.uploaded_layers.insert(uploaded_path);
+ let metadata = upload
+ .layers_to_upload
+ .remove(&uploaded_path)
+ .expect("metadata should always exist, assuming no double uploads");
+ upload.uploaded_layers.insert(uploaded_path, metadata);
}
Err(e) => match e {
UploadError::Other(e) => {
@@ -262,7 +290,7 @@ mod tests {
assert_eq!(
upload
.uploaded_layers
- .iter()
+ .keys()
.cloned()
.collect::>(),
layer_files
@@ -357,7 +385,7 @@ mod tests {
assert_eq!(
upload
.uploaded_layers
- .iter()
+ .keys()
.cloned()
.collect::>(),
layer_files
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 3639e30fee..0f8e60f8d3 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -52,7 +52,10 @@ use crate::task_mgr::TaskKind;
use crate::walreceiver::{is_etcd_client_initialized, spawn_connection_manager_task};
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
-use crate::{page_cache, storage_sync};
+use crate::{
+ page_cache,
+ storage_sync::{self, index::LayerFileMetadata},
+};
pub struct Timeline {
conf: &'static PageServerConf,
@@ -1190,8 +1193,8 @@ impl Timeline {
self.create_image_layers(&partitioning, self.initdb_lsn, true)?
} else {
// normal case, write out a L0 delta layer file.
- let delta_path = self.create_delta_layer(&frozen_layer)?;
- HashSet::from([delta_path])
+ let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
+ HashMap::from([(delta_path, metadata)])
};
fail_point!("flush-frozen-before-sync");
@@ -1226,7 +1229,7 @@ impl Timeline {
fn update_disk_consistent_lsn(
&self,
disk_consistent_lsn: Lsn,
- layer_paths_to_upload: HashSet<PathBuf>,
+ layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
) -> Result<()> {
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
// After crash, we will restart WAL streaming and processing from that point.
@@ -1295,7 +1298,10 @@ impl Timeline {
}
// Write out the given frozen in-memory layer as a new L0 delta file
- fn create_delta_layer(&self, frozen_layer: &InMemoryLayer) -> Result<PathBuf> {
+ fn create_delta_layer(
+ &self,
+ frozen_layer: &InMemoryLayer,
+ ) -> Result<(PathBuf, LayerFileMetadata)> {
// Write it out
let new_delta = frozen_layer.write_to_disk()?;
let new_delta_path = new_delta.path();
@@ -1321,12 +1327,13 @@ impl Timeline {
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
+
self.metrics.current_physical_size_gauge.add(sz);
// update metrics
self.metrics.num_persistent_files_created.inc_by(1);
self.metrics.persistent_bytes_written.inc_by(sz);
- Ok(new_delta_path)
+ Ok((new_delta_path, LayerFileMetadata::new(sz)))
}
pub fn compact(&self) -> anyhow::Result<()> {
@@ -1392,7 +1399,7 @@ impl Timeline {
storage_sync::schedule_layer_upload(
self.tenant_id,
self.timeline_id,
- HashSet::from_iter(layer_paths_to_upload),
+ layer_paths_to_upload,
None,
);
}
@@ -1473,10 +1480,9 @@ impl Timeline {
partitioning: &KeyPartitioning,
lsn: Lsn,
force: bool,
- ) -> Result<HashSet<PathBuf>> {
+ ) -> Result<HashMap<PathBuf, LayerFileMetadata>> {
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers: Vec<ImageLayer> = Vec::new();
- let mut layer_paths_to_upload = HashSet::new();
for partition in partitioning.parts.iter() {
if force || self.time_for_new_image_layer(partition, lsn)? {
let img_range =
@@ -1498,7 +1504,6 @@ impl Timeline {
}
}
let image_layer = image_layer_writer.finish()?;
- layer_paths_to_upload.insert(image_layer.path());
image_layers.push(image_layer);
}
}
@@ -1512,15 +1517,25 @@ impl Timeline {
//
// Compaction creates multiple image layers. It would be better to create them all
// and fsync them all in parallel.
- let mut all_paths = Vec::from_iter(layer_paths_to_upload.clone());
- all_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id));
+ let all_paths = image_layers
+ .iter()
+ .map(|layer| layer.path())
+ .chain(std::iter::once(
+ self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
+ ))
+ .collect::<Vec<_>>();
par_fsync::par_fsync(&all_paths)?;
+ let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
+
let mut layers = self.layers.write().unwrap();
for l in image_layers {
- self.metrics
- .current_physical_size_gauge
- .add(l.path().metadata()?.len());
+ let path = l.path();
+ let metadata = path.metadata()?;
+
+ layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));
+
+ self.metrics.current_physical_size_gauge.add(metadata.len());
layers.insert_historic(Arc::new(l));
}
drop(layers);
@@ -1771,16 +1786,16 @@ impl Timeline {
}
let mut layers = self.layers.write().unwrap();
- let mut new_layer_paths = HashSet::with_capacity(new_layers.len());
+ let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
let new_delta_path = l.path();
- // update the timeline's physical size
- self.metrics
- .current_physical_size_gauge
- .add(new_delta_path.metadata()?.len());
+ let metadata = new_delta_path.metadata()?;
- new_layer_paths.insert(new_delta_path);
+ // update the timeline's physical size
+ self.metrics.current_physical_size_gauge.add(metadata.len());
+
+ new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
layers.insert_historic(Arc::new(l));
}
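
The Timeline hunks above stop passing bare layer paths around and instead pair every path with a LayerFileMetadata. The type itself is defined in storage_sync::index and does not appear in this patch; as a hedged sketch only (the file_size field name is an assumption taken from the "file_size" key the regression tests below read out of index_part.json), it could be as small as:

    // Sketch, not the patch's actual definition: per-layer metadata tracked in
    // the remote index, currently just the size of the layer file on disk.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct LayerFileMetadata {
        file_size: u64,
    }

    impl LayerFileMetadata {
        pub fn new(file_size: u64) -> Self {
            LayerFileMetadata { file_size }
        }

        pub fn file_size(&self) -> u64 {
            self.file_size
        }
    }

Storing the size next to each path is what later lets the sync code compare the remote index against what is actually present on local disk.
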
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index c6698ee22f..b2c927d4fc 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -1,7 +1,7 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
-use std::collections::{hash_map, HashMap, HashSet};
+use std::collections::{hash_map, HashMap};
use std::ffi::OsStr;
use std::fs;
use std::path::{Path, PathBuf};
@@ -14,8 +14,8 @@ use remote_storage::GenericRemoteStorage;
use crate::config::{PageServerConf, METADATA_FILE_NAME};
use crate::http::models::TenantInfo;
-use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
-use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
+use crate::storage_sync::index::{LayerFileMetadata, RemoteIndex, RemoteTimelineIndex};
+use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData, TimelineLocalFiles};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::{
ephemeral_file::is_ephemeral_file, metadata::TimelineMetadata, Tenant, TenantState,
@@ -104,7 +104,7 @@ pub fn init_tenant_mgr(
if let TenantAttachData::Ready(t) = new_timeline_values {
for (timeline_id, old_value) in old_values {
if let LocalTimelineInitStatus::LocallyComplete(metadata) = old_value {
- t.insert(timeline_id, (metadata, HashSet::new()));
+ t.insert(timeline_id, TimelineLocalFiles::ready(metadata));
}
}
}
@@ -189,7 +189,7 @@ pub fn attach_local_tenants(
let has_timelines = !timelines.is_empty();
let timelines_to_attach = timelines
.iter()
- .map(|(&k, (v, _))| (k, v.clone()))
+ .map(|(&k, v)| (k, v.metadata().to_owned()))
.collect();
match tenant.init_attach_timelines(timelines_to_attach) {
Ok(()) => {
@@ -483,7 +483,7 @@ pub fn list_tenant_info(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
#[derive(Debug)]
pub enum TenantAttachData {
- Ready(HashMap<TimelineId, (TimelineMetadata, HashSet<PathBuf>)>),
+ Ready(HashMap<TimelineId, TimelineLocalFiles>),
Broken(anyhow::Error),
}
/// Attempts to collect information about all tenant and timelines, existing on the local FS.
@@ -602,7 +602,6 @@ fn is_temporary(path: &Path) -> bool {
}
}
-#[allow(clippy::type_complexity)]
fn collect_timelines_for_tenant(
config: &'static PageServerConf,
tenant_path: &Path,
@@ -648,7 +647,10 @@ fn collect_timelines_for_tenant(
} else {
match collect_timeline_files(&timeline_dir) {
Ok((timeline_id, metadata, timeline_files)) => {
- tenant_timelines.insert(timeline_id, (metadata, timeline_files));
+ tenant_timelines.insert(
+ timeline_id,
+ TimelineLocalFiles::collected(metadata, timeline_files),
+ );
}
Err(e) => {
error!(
@@ -690,8 +692,12 @@ fn collect_timelines_for_tenant(
// NOTE: ephemeral files are excluded from the list
fn collect_timeline_files(
timeline_dir: &Path,
-) -> anyhow::Result<(TimelineId, TimelineMetadata, HashSet<PathBuf>)> {
- let mut timeline_files = HashSet::new();
+) -> anyhow::Result<(
+ TimelineId,
+ TimelineMetadata,
+ HashMap<PathBuf, LayerFileMetadata>,
+)> {
+ let mut timeline_files = HashMap::new();
let mut timeline_metadata_path = None;
let timeline_id = timeline_dir
@@ -704,7 +710,9 @@ fn collect_timeline_files(
fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?;
for entry in timeline_dir_entries {
let entry_path = entry.context("Failed to list timeline dir entry")?.path();
- if entry_path.is_file() {
+ let metadata = entry_path.metadata()?;
+
+ if metadata.is_file() {
if entry_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME) {
timeline_metadata_path = Some(entry_path);
} else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) {
@@ -719,7 +727,8 @@ fn collect_timeline_files(
)
})?;
} else {
- timeline_files.insert(entry_path);
+ let layer_metadata = LayerFileMetadata::new(metadata.len());
+ timeline_files.insert(entry_path, layer_metadata);
}
}
}
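
tenant_mgr.rs now hands the sync startup code TimelineLocalFiles values instead of (TimelineMetadata, HashSet<PathBuf>) tuples. The real definition lives in storage_sync and is outside this diff; a minimal sketch consistent with the ready(..), collected(..) and metadata() calls above (the variant names are illustrative assumptions) might be:

    // Sketch only: what the tenant manager knows about a timeline's local files
    // at attach time. Mirrors just the constructors and accessor used above.
    use std::collections::HashMap;
    use std::path::PathBuf;

    use crate::storage_sync::index::LayerFileMetadata;
    use crate::tenant::metadata::TimelineMetadata;

    pub enum TimelineLocalFiles {
        // Everything referenced by the metadata is already present locally.
        Ready(TimelineMetadata),
        // Layer files were collected from the local FS together with their sizes.
        Collected(TimelineMetadata, HashMap<PathBuf, LayerFileMetadata>),
    }

    impl TimelineLocalFiles {
        pub fn ready(metadata: TimelineMetadata) -> Self {
            TimelineLocalFiles::Ready(metadata)
        }

        pub fn collected(
            metadata: TimelineMetadata,
            files: HashMap<PathBuf, LayerFileMetadata>,
        ) -> Self {
            TimelineLocalFiles::Collected(metadata, files)
        }

        pub fn metadata(&self) -> &TimelineMetadata {
            match self {
                TimelineLocalFiles::Ready(m) | TimelineLocalFiles::Collected(m, _) => m,
            }
        }
    }
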
diff --git a/test_runner/regress/test_tenants_with_remote_storage.py b/test_runner/regress/test_tenants_with_remote_storage.py
index d8424e22c8..a7c2e7ace0 100644
--- a/test_runner/regress/test_tenants_with_remote_storage.py
+++ b/test_runner/regress/test_tenants_with_remote_storage.py
@@ -7,13 +7,16 @@
#
import asyncio
+import json
import os
+import shutil
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
+ LocalFsStorage,
NeonEnv,
NeonEnvBuilder,
NeonPageserverHttpClient,
@@ -189,3 +192,246 @@ def expect_tenant_to_download_timeline(
), f"Tenant {tenant_id} should have no downloads in progress"
return
assert False, f"Tenant {tenant_id} is missing on pageserver"
+
+
+@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
+def test_tenant_upgrades_index_json_from_v0(
+ neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
+):
+ # the skeleton of a v0 index_part.json. the field values themselves are
+ # replaced with values read from the current version, because the initdb
+ # LSN is not reproducible (#2592).
+ v0_skeleton = json.loads(
+ """{
+ "timeline_layers":[
+ "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"
+ ],
+ "missing_layers":[],
+ "disk_consistent_lsn":"0/16960E8",
+ "metadata_bytes":[]
+ }"""
+ )
+
+ # an overly eager compaction during this test would not play well with the
+ # strict assertions.
+ neon_env_builder.pageserver_config_override = "tenant_config.compaction_period='1h'"
+
+ neon_env_builder.enable_remote_storage(
+ remote_storage_kind, "test_tenant_upgrades_index_json_from_v0"
+ )
+
+ # launch the pageserver, populate the default tenant's timeline, wait for it to be uploaded,
+ # then modify the "remote" version as if it had been downgraded and needs an upgrade
+ env = neon_env_builder.init_start()
+ pageserver_http = env.pageserver.http_client()
+ pg = env.postgres.create_start("main")
+
+ tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
+ timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
+
+ with pg.cursor() as cur:
+ cur.execute("CREATE TABLE t0 AS VALUES (123, 'second column as text');")
+ current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
+ # flush, wait until in remote storage
+ wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
+ pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+ wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
+
+ env.postgres.stop_all()
+ env.pageserver.stop()
+
+ # remove all local data for the tenant to force redownloading and subsequent upgrade
+ shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
+
+ # downgrade the remote file
+ timeline_path = local_fs_index_part_path(env, tenant_id, timeline_id)
+ with open(timeline_path, "r+") as timeline_file:
+ # keep the deserialized contents around for later inspection
+ orig_index_part = json.load(timeline_file)
+
+ v0_index_part = {key: orig_index_part[key] for key in v0_skeleton}
+
+ timeline_file.seek(0)
+ json.dump(v0_index_part, timeline_file)
+ timeline_file.truncate()
+
+ env.pageserver.start()
+ pageserver_http = env.pageserver.http_client()
+ pageserver_http.tenant_attach(tenant_id)
+
+ wait_until(
+ number_of_iterations=5,
+ interval=1,
+ func=lambda: expect_tenant_to_download_timeline(pageserver_http, tenant_id),
+ )
+
+ pg = env.postgres.create_start("main")
+
+ with pg.cursor() as cur:
+ cur.execute("INSERT INTO t0 VALUES (234, 'test data');")
+ current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
+ wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
+ pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+ wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
+
+ # not needed anymore
+ env.postgres.stop_all()
+ env.pageserver.stop()
+
+ # make sure the file has been upgraded back to how it started
+ index_part = local_fs_index_part(env, tenant_id, timeline_id)
+ assert index_part["version"] == orig_index_part["version"]
+ assert index_part["missing_layers"] == orig_index_part["missing_layers"]
+
+ # expect one more layer because of the forced checkpoint
+ assert len(index_part["timeline_layers"]) == len(orig_index_part["timeline_layers"]) + 1
+
+ # all of the same layer files are there, but they might be shuffled around
+ orig_layers = set(orig_index_part["timeline_layers"])
+ later_layers = set(index_part["timeline_layers"])
+ assert later_layers.issuperset(orig_layers)
+
+ added_layers = later_layers - orig_layers
+ assert len(added_layers) == 1
+
+ # all of the metadata has been regenerated (currently just the layer file size)
+ all_metadata_keys = set()
+ for layer in orig_layers:
+ orig_metadata = orig_index_part["layer_metadata"][layer]
+ new_metadata = index_part["layer_metadata"][layer]
+ assert (
+ orig_metadata == new_metadata
+ ), f"metadata for layer {layer} should not have changed {orig_metadata} vs. {new_metadata}"
+ all_metadata_keys |= set(orig_metadata.keys())
+
+ one_new_layer = next(iter(added_layers))
+ assert one_new_layer in index_part["layer_metadata"], "new layer should have metadata"
+
+ only_new_metadata = index_part["layer_metadata"][one_new_layer]
+
+ assert (
+ set(only_new_metadata.keys()).symmetric_difference(all_metadata_keys) == set()
+ ), "new layer metadata has same metadata as others"
+
+
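
The upgrade path this test exercises comes down to accepting an index_part.json that predates the version and layer_metadata fields and regenerating the missing metadata after the layer files have been downloaded. A hedged sketch of how such backward-compatible deserialization can look with serde defaults (field names are taken from the JSON keys the test inspects; the patch's actual IndexPart definition is not shown in this diff):

    use std::collections::{HashMap, HashSet};

    use serde::{Deserialize, Serialize};

    // Sketch only: a versioned index_part.json payload. A v0 file, which lacks
    // `version` and `layer_metadata`, still deserializes because those fields
    // fall back to their defaults and are filled in later from downloaded files.
    #[derive(Debug, Serialize, Deserialize)]
    struct IndexPartSketch {
        #[serde(default)]
        version: u32,
        timeline_layers: HashSet<String>,
        missing_layers: HashSet<String>,
        #[serde(default)]
        layer_metadata: HashMap<String, LayerMetadataSketch>,
        disk_consistent_lsn: String,
        metadata_bytes: Vec<u8>,
    }

    #[derive(Debug, Default, Serialize, Deserialize)]
    struct LayerMetadataSketch {
        file_size: Option<u64>,
    }
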
+# FIXME: test index_part.json getting downgraded from imaginary new version
+
+
+@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
+def test_tenant_redownloads_truncated_file_on_startup(
+ neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
+):
+ # since we now store the layer file length metadata, the pageserver notices on startup that a layer file has the wrong size and proceeds to redownload it.
+ neon_env_builder.enable_remote_storage(
+ remote_storage_kind=remote_storage_kind,
+ test_name="test_tenant_redownloads_truncated_file_on_startup",
+ )
+
+ env = neon_env_builder.init_start()
+ pageserver_http = env.pageserver.http_client()
+ pg = env.postgres.create_start("main")
+
+ tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
+ timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
+
+ with pg.cursor() as cur:
+ cur.execute("CREATE TABLE t1 AS VALUES (123, 'foobar');")
+ current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
+ wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
+ pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+ wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
+
+ env.postgres.stop_all()
+ env.pageserver.stop()
+
+ timeline_dir = Path(env.repo_dir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
+ local_layer_truncated = None
+ for path in timeline_dir.iterdir():
+ if path.name.startswith("00000"):
+ correct_size = os.stat(path).st_size
+ os.truncate(path, 0)
+ local_layer_truncated = (path, correct_size)
+ break
+ assert (
+ local_layer_truncated is not None
+ ), f"Found no local layer files to delete in directory {timeline_dir}"
+
+ (path, expected_size) = local_layer_truncated
+
+ # ensure the same size is found from the index_part.json
+ index_part = local_fs_index_part(env, tenant_id, timeline_id)
+ assert index_part["layer_metadata"][path.name]["file_size"] == expected_size
+
+ ##### Start the pageserver, forcing it to download the layer file and load the timeline into memory
+ env.pageserver.start()
+ client = env.pageserver.http_client()
+
+ wait_until(
+ number_of_iterations=5,
+ interval=1,
+ func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
+ )
+
+ restored_timelines = client.timeline_list(tenant_id)
+ assert (
+ len(restored_timelines) == 1
+ ), f"Tenant {tenant_id} should have its timeline reattached after its layer is downloaded from the remote storage"
+ restored_timeline = restored_timelines[0]
+ assert restored_timeline["timeline_id"] == str(
+ timeline_id
+ ), f"Tenant {tenant_id} should have its old timeline {timeline_id} restored from the remote storage"
+
+ assert os.stat(path).st_size == expected_size, "truncated layer should have been re-downloaded"
+
+ # the remote side of local_layer_truncated
+ remote_layer_path = local_fs_index_part_path(env, tenant_id, timeline_id).parent / path.name
+
+ # if an upload were still ongoing, this check would be racy, but at least one
+ # extra http request has been made in between, so assume that is enough delay
+ assert (
+ os.stat(remote_layer_path).st_size == expected_size
+ ), "truncated file should not had been uploaded around re-download"
+
+ pg = env.postgres.create_start("main")
+
+ with pg.cursor() as cur:
+ cur.execute("INSERT INTO t1 VALUES (234, 'test data');")
+ current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
+
+ wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
+ pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
+ wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
+
+ # now that the upload is complete, make sure the file has not been
+ # re-uploaded truncated. this is a rather weak check given the current
+ # implementation, but it is critical that this never happens, so it is
+ # worth spending a few lines of python on it.
+ assert (
+ os.stat(remote_layer_path).st_size == expected_size
+ ), "truncated file should not had been uploaded after next checkpoint"
+
+
+def local_fs_index_part(env, tenant_id, timeline_id):
+ """
+ Return the parsed index_part.json of the tenant and timeline from LOCAL_FS storage.
+ """
+ timeline_path = local_fs_index_part_path(env, tenant_id, timeline_id)
+ with open(timeline_path, "r") as timeline_file:
+ return json.load(timeline_file)
+
+
+def local_fs_index_part_path(env, tenant_id, timeline_id):
+ """
+ Return path to the LOCAL_FS index_part.json of the tenant and timeline.
+ """
+ assert isinstance(env.remote_storage, LocalFsStorage)
+ return (
+ env.remote_storage.root
+ / "tenants"
+ / str(tenant_id)
+ / "timelines"
+ / str(timeline_id)
+ / "index_part.json"
+ )