diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index cf524fcb25..2e8c3946bd 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -673,12 +673,30 @@ impl<'a> TenantDownloader<'a> { HeatMapDownload::Modified(m) => m, }; - let heatmap = serde_json::from_slice::(&heatmap_bytes)?; - - // Save the heatmap: this will be useful on restart, allowing us to reconstruct - // layer metadata without having to re-download it. + // Heatmap storage location let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id); + let last_heatmap = if last_download.is_none() { + match load_heatmap(&heatmap_path, ctx).await { + Ok(htm) => htm, + Err(e) => { + tracing::warn!("Couldn't load heatmap from {heatmap_path}: {e:?}"); + None + } + } + } else { + None + }; + + let last_heatmap_timelines = last_heatmap.as_ref().map(|htm| { + htm.timelines + .iter() + .map(|tl| (tl.timeline_id, tl)) + .collect::>() + }); + + let heatmap = serde_json::from_slice::(&heatmap_bytes)?; + let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX); let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}"); let heatmap_path_bg = heatmap_path.clone(); @@ -707,10 +725,17 @@ impl<'a> TenantDownloader<'a> { let timeline_state = match timeline_state { Some(t) => t, None => { + let last_heatmap = + last_heatmap_timelines + .as_ref() + .and_then(|last_heatmap_timelines| { + last_heatmap_timelines.get(&timeline.timeline_id).copied() + }); // We have no existing state: need to scan local disk for layers first. let timeline_state = init_timeline_state( self.conf, tenant_shard_id, + last_heatmap, timeline, &self.secondary_state.resident_size_metric, ) @@ -1079,12 +1104,12 @@ impl<'a> TenantDownloader<'a> { } } - if on_disk.metadata.generation_file_size() != on_disk.metadata.generation_file_size() { + if on_disk.metadata.generation_file_size() != layer.metadata.generation_file_size() { tracing::info!( "Re-downloading layer {} with changed size or generation: {:?}->{:?}", layer.name, on_disk.metadata.generation_file_size(), - on_disk.metadata.generation_file_size() + layer.metadata.generation_file_size() ); return LayerAction::Download; } @@ -1277,6 +1302,7 @@ impl<'a> TenantDownloader<'a> { async fn init_timeline_state( conf: &'static PageServerConf, tenant_shard_id: &TenantShardId, + last_heatmap: Option<&HeatMapTimeline>, heatmap: &HeatMapTimeline, resident_metric: &UIntGauge, ) -> SecondaryDetailTimeline { @@ -1306,6 +1332,13 @@ async fn init_timeline_state( let heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> = heatmap.layers.iter().map(|l| (&l.name, l)).collect(); + let last_heatmap_metadata: HashMap<&LayerName, &HeatMapLayer> = + if let Some(last_heatmap) = last_heatmap { + last_heatmap.layers.iter().map(|l| (&l.name, l)).collect() + } else { + HashMap::new() + }; + while let Some(dentry) = dir .next_entry() .await @@ -1339,18 +1372,32 @@ async fn init_timeline_state( match LayerName::from_str(file_name) { Ok(name) => { let remote_meta = heatmap_metadata.get(&name); + let last_meta = last_heatmap_metadata.get(&name); + let mut remove = false; match remote_meta { Some(remote_meta) => { + let last_meta_generation_file_size = last_meta + .map(|m| m.metadata.generation_file_size()) + .unwrap_or(remote_meta.metadata.generation_file_size()); // TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784) - if local_meta.len() != remote_meta.metadata.file_size { - // This should not happen, because we do crashsafe write-then-rename when downloading - // layers, and layers in remote storage are immutable. Remove the local file because - // we cannot trust it. - tracing::warn!( + if remote_meta.metadata.generation_file_size() + != last_meta_generation_file_size + { + tracing::info!( + "Removing local layer {name} as on-disk json metadata has different generation or file size from remote: {:?} -> {:?}", + last_meta_generation_file_size, + remote_meta.metadata.generation_file_size() + ); + remove = true; + } else if local_meta.len() != remote_meta.metadata.file_size { + // This can happen in the presence of race conditions: the remote and on-disk metadata have changed, but we haven't had + // the chance yet to download the new layer to disk, before the process restarted. + tracing::info!( "Removing local layer {name} with unexpected local size {} != {}", local_meta.len(), remote_meta.metadata.file_size ); + remove = true; } else { // We expect the access time to be initialized immediately afterwards, when // the latest heatmap is applied to the state. @@ -1372,15 +1419,18 @@ async fn init_timeline_state( "Removing secondary local layer {} because it's absent in heatmap", name ); - tokio::fs::remove_file(&dentry.path()) - .await - .or_else(fs_ext::ignore_not_found) - .fatal_err(&format!( - "Removing layer {}", - dentry.path().to_string_lossy() - )); + remove = true; } } + if remove { + tokio::fs::remove_file(&dentry.path()) + .await + .or_else(fs_ext::ignore_not_found) + .fatal_err(&format!( + "Removing layer {}", + dentry.path().to_string_lossy() + )); + } } Err(_) => { // Ignore it. @@ -1391,3 +1441,18 @@ async fn init_timeline_state( detail } + +/// Loads a json-encoded heatmap file from the provided on-disk path +async fn load_heatmap( + path: &Utf8PathBuf, + ctx: &RequestContext, +) -> Result, anyhow::Error> { + let mut file = match VirtualFile::open(path, ctx).await { + Ok(file) => file, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(e) => Err(e)?, + }; + let st = file.read_to_string(ctx).await?; + let htm = serde_json::from_str(&st)?; + Ok(Some(htm)) +} diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 8a7f4a4bf5..9d539198c7 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -234,6 +234,19 @@ impl VirtualFile { ) -> (FullSlice, Result) { self.inner.write_all(buf, ctx).await } + + async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { + self.inner.read_to_end(buf, ctx).await + } + + pub(crate) async fn read_to_string( + &mut self, + ctx: &RequestContext, + ) -> Result { + let mut buf = Vec::new(); + self.read_to_end(&mut buf, ctx).await?; + Ok(String::from_utf8(buf)?) + } } /// Indicates whether to enable fsync, fdatasync, or O_SYNC/O_DSYNC when writing @@ -993,6 +1006,24 @@ impl VirtualFileInner { (buf, result) }) } + + async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { + let mut tmp = vec![0; 128]; + loop { + let slice = tmp.slice(..128); + let (slice, res) = self.read_at(slice, self.pos, ctx).await; + match res { + Ok(0) => return Ok(()), + Ok(n) => { + self.pos += n as u64; + buf.extend_from_slice(&slice[..n]); + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + tmp = slice.into_inner(); + } + } } // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 @@ -1237,10 +1268,6 @@ impl VirtualFile { ) -> Result, std::io::Error> { self.inner.read_blk(blknum, ctx).await } - - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { - self.inner.read_to_end(buf, ctx).await - } } #[cfg(test)] @@ -1260,24 +1287,6 @@ impl VirtualFileInner { slice.into_inner(), )) } - - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &RequestContext) -> Result<(), Error> { - let mut tmp = vec![0; 128]; - loop { - let slice = tmp.slice(..128); - let (slice, res) = self.read_at(slice, self.pos, ctx).await; - match res { - Ok(0) => return Ok(()), - Ok(n) => { - self.pos += n as u64; - buf.extend_from_slice(&slice[..n]); - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), - } - tmp = slice.into_inner(); - } - } } impl Drop for VirtualFileInner {