refactor: use new type LayerFileName when referring to layer file names in PathBuf/RemotePath (#3026)
Before this patch, we would sometimes carry around plain file names in
`Path` types and/or awkwardly "rebase" paths to get a unified
representation of the layer file name between local and remote storage.

This patch introduces a new type, `LayerFileName`, which replaces the use
of `Path` / `PathBuf` / `RemotePath` in the `storage_sync2` APIs. Instead
of holding a string, it contains the parsed representation of the image
or delta file name. When we need the actual file name, e.g., to construct
a local path or a remote object key, we render it from the parsed
representation ad hoc.
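
To illustrate, this is the shape of the new type, condensed from the
diff below (the full version also carries a test-only variant plus the
parsing and serde impls):

    #[derive(Debug, PartialEq, Eq, Hash, Clone)]
    pub enum LayerFileName {
        Image(ImageFileName),
        Delta(DeltaFileName),
    }

    impl LayerFileName {
        // Render the file name string from the parsed representation.
        pub fn file_name(&self) -> String {
            match self {
                LayerFileName::Image(fname) => format!("{fname}"),
                LayerFileName::Delta(fname) => format!("{fname}"),
            }
        }
    }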
`LayerFileName` is also serde-(de)serializable, and in an initial version
of this patch it was supposed to be used directly inside `IndexPart`,
replacing `RemotePath`. However, commit 3122f3282f ("Ignore backup files
(ones with .n.old suffix) in download_missing") fixed the handling of
`*.old` backup file names in `IndexPart`, and we need to carry that
behavior forward.

The solution is to drop `*.old` backup file names during deserialization.
When we re-serialize the `IndexPart`, the `*.old` entries will be gone.
This leaks the `.old` files in remote storage, but it remains safe to
clean them up later.
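
Concretely, the download path now deserializes into the lenient
"unclean" representation first and strips the backup entries before
handing out an `IndexPart`; condensed from `download_index_part` in the
diff below (error handling abbreviated):

    // Every entry is either a valid LayerFileName or a `*.old` name.
    let index_part: IndexPartUnclean = serde_json::from_slice(&index_part_bytes)?;

    // Drop the `*.old` entries (with a warning). Re-serializing the
    // resulting IndexPart is what makes them disappear from the index.
    let index_part: IndexPart = index_part.remove_unclean_layer_file_names();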
There is additional churn from a preliminary refactoring that was
squashed into this change:

    split off LayerMap's needs from trait Layer into super trait

That refactoring renames `Layer` to `PersistentLayer` and splits off a
subset of the functions into a super-trait called `Layer`. The
super-trait contains just the functions needed by `LayerMap`, whereas
`PersistentLayer` adds the context of the pageserver. The naming is
imperfect, as some functions that reside in `PersistentLayer` have
nothing persistence-specific about them, but it's a step in the right
direction.
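
A rough sketch of the split, with the signatures taken from the diff
below (value reconstruction and dump elided):

    // What LayerMap needs: pure key/LSN-space queries, no pageserver context.
    pub trait Layer {
        fn get_key_range(&self) -> Range<Key>;
        fn get_lsn_range(&self) -> Range<Lsn>;
        fn is_incremental(&self) -> bool;
        fn short_id(&self) -> String;
    }

    // What the pageserver additionally needs for on-disk layer files.
    pub trait PersistentLayer: Layer {
        fn get_tenant_id(&self) -> TenantId;
        fn get_timeline_id(&self) -> TimelineId;
        fn filename(&self) -> LayerFileName;
        fn local_path(&self) -> PathBuf;
        fn delete(&self) -> anyhow::Result<()>;
    }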
Committed by GitHub.
Parent: d1edc8aa00
Commit: 22ae67af8d
@@ -142,7 +142,7 @@ impl std::fmt::Display for DownloadError {
                 write!(f, "Failed to download a remote file due to user input: {e}")
             }
             DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
-            DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e}"),
+            DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
         }
     }
 }
@@ -1,10 +1,9 @@
 use anyhow::Result;
-use pageserver::repository::{Key, Value};
+use pageserver::repository::Key;
 use pageserver::tenant::filename::{DeltaFileName, ImageFileName};
 use pageserver::tenant::layer_map::LayerMap;
-use pageserver::tenant::storage_layer::Layer;
-use pageserver::tenant::storage_layer::ValueReconstructResult;
 use pageserver::tenant::storage_layer::ValueReconstructState;
+use pageserver::tenant::storage_layer::{Layer, ValueReconstructResult};
 use rand::prelude::{SeedableRng, SliceRandom, StdRng};
 use std::cmp::{max, min};
 use std::fs::File;
@@ -14,7 +13,7 @@ use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;
-use utils::id::{TenantId, TimelineId};
+
 use utils::lsn::Lsn;
 
 use criterion::{criterion_group, criterion_main, Criterion};
@@ -25,14 +24,6 @@ struct DummyDelta {
 }
 
 impl Layer for DummyDelta {
-    fn get_tenant_id(&self) -> TenantId {
-        TenantId::from_str("00000000000000000000000000000000").unwrap()
-    }
-
-    fn get_timeline_id(&self) -> TimelineId {
-        TimelineId::from_str("00000000000000000000000000000000").unwrap()
-    }
-
     fn get_key_range(&self) -> Range<Key> {
         self.key_range.clone()
     }
@@ -40,15 +31,6 @@ impl Layer for DummyDelta {
     fn get_lsn_range(&self) -> Range<Lsn> {
         self.lsn_range.clone()
     }
-
-    fn filename(&self) -> PathBuf {
-        todo!()
-    }
-
-    fn local_path(&self) -> Option<PathBuf> {
-        todo!()
-    }
-
     fn get_value_reconstruct_data(
         &self,
         _key: Key,
@@ -62,24 +44,12 @@ impl Layer for DummyDelta {
         true
     }
 
-    fn is_in_memory(&self) -> bool {
-        false
-    }
-
-    fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_> {
-        panic!()
-    }
-
-    fn key_iter(&self) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + '_> {
-        panic!("Not implemented")
-    }
-
-    fn delete(&self) -> Result<()> {
-        panic!()
-    }
-
     fn dump(&self, _verbose: bool) -> Result<()> {
-        todo!()
+        unimplemented!()
     }
+
+    fn short_id(&self) -> String {
+        unimplemented!()
+    }
 }
@@ -89,14 +59,6 @@ struct DummyImage {
 }
 
 impl Layer for DummyImage {
-    fn get_tenant_id(&self) -> TenantId {
-        TenantId::from_str("00000000000000000000000000000000").unwrap()
-    }
-
-    fn get_timeline_id(&self) -> TimelineId {
-        TimelineId::from_str("00000000000000000000000000000000").unwrap()
-    }
-
     fn get_key_range(&self) -> Range<Key> {
         self.key_range.clone()
     }
@@ -106,14 +68,6 @@ impl Layer for DummyImage {
         self.lsn..(self.lsn + 1)
     }
-
-    fn filename(&self) -> PathBuf {
-        todo!()
-    }
-
-    fn local_path(&self) -> Option<PathBuf> {
-        todo!()
-    }
 
     fn get_value_reconstruct_data(
         &self,
         _key: Key,
@@ -127,29 +81,17 @@ impl Layer for DummyImage {
         false
     }
 
-    fn is_in_memory(&self) -> bool {
-        false
-    }
-
-    fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_> {
-        panic!()
-    }
-
-    fn key_iter(&self) -> Box<dyn Iterator<Item = (Key, Lsn, u64)> + '_> {
-        panic!("Not implemented")
-    }
-
-    fn delete(&self) -> Result<()> {
-        panic!()
-    }
-
     fn dump(&self, _verbose: bool) -> Result<()> {
-        todo!()
+        unimplemented!()
     }
+
+    fn short_id(&self) -> String {
+        unimplemented!()
    }
 }
 
-fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
-    let mut layer_map = LayerMap::default();
+fn build_layer_map(filename_dump: PathBuf) -> LayerMap<dyn Layer> {
+    let mut layer_map = LayerMap::<dyn Layer>::default();
 
     let mut min_lsn = Lsn(u64::MAX);
     let mut max_lsn = Lsn(0);
@@ -185,7 +127,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
 }
 
 /// Construct a layer map query pattern for benchmarks
-fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
+fn uniform_query_pattern(layer_map: &LayerMap<dyn Layer>) -> Vec<(Key, Lsn)> {
     // For each image layer we query one of the pages contained, at LSN right
     // before the image layer was created. This gives us a somewhat uniform
     // coverage of both the lsn and key space because image layers have
@@ -258,7 +200,7 @@ fn bench_from_real_project(c: &mut Criterion) {
 
 // Benchmark using synthetic data. Arrange image layers on stacked diagonal lines.
 fn bench_sequential(c: &mut Criterion) {
-    let mut layer_map = LayerMap::default();
+    let mut layer_map: LayerMap<dyn Layer> = LayerMap::default();
 
     // Init layer map. Create 100_000 layers arranged in 1000 diagonal lines.
     //
@@ -197,12 +197,11 @@ pub use download::{is_temp_download_file, list_remote_timelines};
 use std::collections::{HashMap, VecDeque};
 use std::fmt::Debug;
 use std::ops::DerefMut;
-use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, Mutex};
 
 use anyhow::ensure;
-use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
+use remote_storage::{DownloadError, GenericRemoteStorage};
 use tokio::runtime::Runtime;
 use tracing::{info, warn};
 use tracing::{info_span, Instrument};
@@ -215,6 +214,7 @@ use crate::metrics::MeasureRemoteOp;
 use crate::metrics::RemoteOpFileKind;
 use crate::metrics::RemoteOpKind;
 use crate::metrics::REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS;
+use crate::tenant::filename::LayerFileName;
 use crate::{
     config::PageServerConf,
     storage_sync::index::LayerFileMetadata,
@@ -287,7 +287,7 @@ struct UploadQueueInitialized {
 
     /// All layer files stored in the remote storage, taking into account all
     /// in-progress and queued operations
-    latest_files: HashMap<RemotePath, LayerFileMetadata>,
+    latest_files: HashMap<LayerFileName, LayerFileMetadata>,
 
     /// Metadata stored in the remote storage, taking into account all
     /// in-progress and queued operations.
@@ -357,10 +357,6 @@ impl UploadQueue {
 
     fn initialize_with_current_remote_index_part(
         &mut self,
-        conf: &'static PageServerConf,
-        tenant_id: TenantId,
-        timeline_id: TimelineId,
         index_part: &IndexPart,
     ) -> anyhow::Result<&mut UploadQueueInitialized> {
         match self {
@@ -371,18 +367,13 @@ impl UploadQueue {
         }
 
         let mut files = HashMap::with_capacity(index_part.timeline_layers.len());
-        let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
-        for timeline_name in &index_part.timeline_layers {
-            let local_path = timeline_path.join(timeline_name);
-            let remote_timeline_path = conf.remote_path(&local_path).expect(
-                "Remote timeline path and local timeline path were constructed form the same conf",
-            );
+        for layer_name in &index_part.timeline_layers {
             let layer_metadata = index_part
                 .layer_metadata
-                .get(timeline_name)
+                .get(layer_name)
                 .map(LayerFileMetadata::from)
                 .unwrap_or(LayerFileMetadata::MISSING);
-            files.insert(remote_timeline_path, layer_metadata);
+            files.insert(layer_name.to_owned(), layer_metadata);
         }
 
         let index_part_metadata = index_part.parse_metadata()?;
@@ -431,13 +422,13 @@ struct UploadTask {
 #[derive(Debug)]
 enum UploadOp {
     /// Upload a layer file
-    UploadLayer(PathBuf, LayerFileMetadata),
+    UploadLayer(LayerFileName, LayerFileMetadata),
 
     /// Upload the metadata file
     UploadMetadata(IndexPart, Lsn),
 
     /// Delete a file.
-    Delete(RemoteOpFileKind, PathBuf),
+    Delete(RemoteOpFileKind, LayerFileName),
 
     /// Barrier. When the barrier operation is reached,
     Barrier(tokio::sync::watch::Sender<()>),
@@ -446,14 +437,16 @@
 impl std::fmt::Display for UploadOp {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
-            UploadOp::UploadLayer(path, metadata) => write!(
-                f,
-                "UploadLayer({}, size={:?})",
-                path.display(),
-                metadata.file_size()
-            ),
+            UploadOp::UploadLayer(path, metadata) => {
+                write!(
+                    f,
+                    "UploadLayer({}, size={:?})",
+                    path.file_name(),
+                    metadata.file_size()
+                )
+            }
             UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
-            UploadOp::Delete(_, path) => write!(f, "Delete({})", path.display()),
+            UploadOp::Delete(_, path) => write!(f, "Delete({})", path.file_name()),
             UploadOp::Barrier(_) => write!(f, "Barrier"),
         }
     }
@@ -465,12 +458,7 @@ impl RemoteTimelineClient {
     /// The given `index_part` must be the one on the remote.
     pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
         let mut upload_queue = self.upload_queue.lock().unwrap();
-        upload_queue.initialize_with_current_remote_index_part(
-            self.conf,
-            self.tenant_id,
-            self.timeline_id,
-            index_part,
-        )?;
+        upload_queue.initialize_with_current_remote_index_part(index_part)?;
         Ok(())
     }
 
@@ -524,13 +512,15 @@ impl RemoteTimelineClient {
     /// On success, returns the size of the downloaded file.
     pub async fn download_layer_file(
         &self,
-        remote_path: &RemotePath,
+        layer_file_name: &LayerFileName,
         layer_metadata: &LayerFileMetadata,
     ) -> anyhow::Result<u64> {
         let downloaded_size = download::download_layer_file(
             self.conf,
             &self.storage_impl,
-            remote_path,
+            self.tenant_id,
+            self.timeline_id,
+            layer_file_name,
             layer_metadata,
         )
         .measure_remote_op(
@@ -548,13 +538,13 @@ impl RemoteTimelineClient {
         let new_metadata = LayerFileMetadata::new(downloaded_size);
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;
-        if let Some(upgraded) = upload_queue.latest_files.get_mut(remote_path) {
+        if let Some(upgraded) = upload_queue.latest_files.get_mut(layer_file_name) {
             upgraded.merge(&new_metadata);
         } else {
             // The file should exist, since we just downloaded it.
             warn!(
                 "downloaded file {:?} not found in local copy of the index file",
-                remote_path
+                layer_file_name
             );
         }
     }
@@ -611,7 +601,7 @@ impl RemoteTimelineClient {
     ///
     pub fn schedule_layer_file_upload(
         self: &Arc<Self>,
-        path: &Path,
+        layer_file_name: &LayerFileName,
         layer_metadata: &LayerFileMetadata,
     ) -> anyhow::Result<()> {
         let mut guard = self.upload_queue.lock().unwrap();
@@ -626,13 +616,16 @@ impl RemoteTimelineClient {
 
         upload_queue
             .latest_files
-            .insert(self.conf.remote_path(path)?, layer_metadata.clone());
+            .insert(layer_file_name.clone(), layer_metadata.clone());
 
-        let op = UploadOp::UploadLayer(PathBuf::from(path), layer_metadata.clone());
+        let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
         self.update_upload_queue_unfinished_metric(1, &op);
         upload_queue.queued_operations.push_back(op);
 
-        info!("scheduled layer file upload {}", path.display());
+        info!(
+            "scheduled layer file upload {}",
+            layer_file_name.file_name()
+        );
 
         // Launch the task immediately, if possible
         self.launch_queued_tasks(upload_queue);
@@ -644,16 +637,13 @@ impl RemoteTimelineClient {
     ///
     /// The deletion won't actually be performed, until all preceding
     /// upload operations have completed succesfully.
-    pub fn schedule_layer_file_deletion(self: &Arc<Self>, paths: &[PathBuf]) -> anyhow::Result<()> {
+    pub fn schedule_layer_file_deletion(
+        self: &Arc<Self>,
+        names: &[LayerFileName],
+    ) -> anyhow::Result<()> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;
 
-        // Convert the paths into RemotePaths, and gather other information we need.
-        let mut remote_paths = Vec::with_capacity(paths.len());
-        for path in paths {
-            remote_paths.push(self.conf.remote_path(path)?);
-        }
-
         // Deleting layers doesn't affect the values stored in TimelineMetadata,
         // so we don't need update it. Just serialize it.
         let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
@@ -667,8 +657,8 @@ impl RemoteTimelineClient {
         // from latest_files, but not yet scheduled for deletion. Use a closure
         // to syntactically forbid ? or bail! calls here.
         let no_bail_here = || {
-            for remote_path in remote_paths {
-                upload_queue.latest_files.remove(&remote_path);
+            for name in names {
+                upload_queue.latest_files.remove(name);
             }
 
             let index_part = IndexPart::new(
@@ -681,11 +671,11 @@
             upload_queue.queued_operations.push_back(op);
 
             // schedule the actual deletions
-            for path in paths {
-                let op = UploadOp::Delete(RemoteOpFileKind::Layer, PathBuf::from(path));
+            for name in names {
+                let op = UploadOp::Delete(RemoteOpFileKind::Layer, name.clone());
                 self.update_upload_queue_unfinished_metric(1, &op);
                 upload_queue.queued_operations.push_back(op);
-                info!("scheduled layer file deletion {}", path.display());
+                info!("scheduled layer file deletion {}", name.file_name());
             }
 
             // Launch the tasks immediately, if possible
@@ -841,7 +831,11 @@ impl RemoteTimelineClient {
             }
 
             let upload_result: anyhow::Result<()> = match &task.op {
-                UploadOp::UploadLayer(ref path, ref layer_metadata) => {
+                UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
+                    let path = &self
+                        .conf
+                        .timeline_path(&self.timeline_id, &self.tenant_id)
+                        .join(layer_file_name.file_name());
                     upload::upload_timeline_layer(
                         self.conf,
                         &self.storage_impl,
@@ -872,7 +866,11 @@ impl RemoteTimelineClient {
                     )
                     .await
                 }
-                UploadOp::Delete(metric_file_kind, ref path) => {
+                UploadOp::Delete(metric_file_kind, ref layer_file_name) => {
+                    let path = &self
+                        .conf
+                        .timeline_path(&self.timeline_id, &self.tenant_id)
+                        .join(layer_file_name.file_name());
                     delete::delete_layer(self.conf, &self.storage_impl, path)
                         .measure_remote_op(
                             self.tenant_id,
@@ -1078,7 +1076,7 @@ mod tests {
     use super::*;
     use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
     use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
-    use std::collections::HashSet;
+    use std::{collections::HashSet, path::Path};
     use utils::lsn::Lsn;
 
     pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
@@ -1102,8 +1100,8 @@ mod tests {
        TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
     }
 
-    fn assert_file_list(a: &HashSet<String>, b: &[&str]) {
-        let mut avec: Vec<&str> = a.iter().map(|a| a.as_str()).collect();
+    fn assert_file_list(a: &HashSet<LayerFileName>, b: &[&str]) {
+        let mut avec: Vec<String> = a.iter().map(|x| x.file_name()).collect();
         avec.sort();
 
         let mut bvec = b.to_vec();
@@ -1198,11 +1196,11 @@ mod tests {
         std::fs::write(timeline_path.join("bar"), &content_bar)?;
 
         client.schedule_layer_file_upload(
-            &timeline_path.join("foo"),
+            &LayerFileName::Test("foo".to_owned()),
             &LayerFileMetadata::new(content_foo.len() as u64),
         )?;
         client.schedule_layer_file_upload(
-            &timeline_path.join("bar"),
+            &LayerFileName::Test("bar".to_owned()),
             &LayerFileMetadata::new(content_bar.len() as u64),
         )?;
 
@@ -1244,10 +1242,10 @@ mod tests {
         let content_baz = dummy_contents("baz");
         std::fs::write(timeline_path.join("baz"), &content_baz)?;
         client.schedule_layer_file_upload(
-            &timeline_path.join("baz"),
+            &LayerFileName::Test("baz".to_owned()),
             &LayerFileMetadata::new(content_baz.len() as u64),
         )?;
-        client.schedule_layer_file_deletion(&[timeline_path.join("foo")])?;
+        client.schedule_layer_file_deletion(&[LayerFileName::Test("foo".to_owned())])?;
         {
             let mut guard = client.upload_queue.lock().unwrap();
             let upload_queue = guard.initialized_mut().unwrap();
@@ -6,15 +6,16 @@ use anyhow::{bail, Context};
 use futures::stream::{FuturesUnordered, StreamExt};
 use tokio::fs;
 use tokio::io::AsyncWriteExt;
-use tracing::debug;
+use tracing::{debug, info_span, Instrument};
 
 use crate::config::PageServerConf;
 use crate::storage_sync::index::LayerFileMetadata;
-use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
+use crate::tenant::filename::LayerFileName;
+use remote_storage::{DownloadError, GenericRemoteStorage};
 use utils::crashsafe::path_with_suffix_extension;
 use utils::id::{TenantId, TimelineId};
 
-use super::index::IndexPart;
+use super::index::{IndexPart, IndexPartUnclean};
 
 async fn fsync_path(path: impl AsRef<std::path::Path>) -> Result<(), std::io::Error> {
     fs::File::open(path).await?.sync_all().await
@@ -28,10 +29,16 @@ async fn fsync_path(path: impl AsRef<std::path::Path>) -> Result<(), std::io::Er
 pub async fn download_layer_file<'a>(
     conf: &'static PageServerConf,
     storage: &'a GenericRemoteStorage,
-    remote_path: &'a RemotePath,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    layer_file_name: &'a LayerFileName,
     layer_metadata: &'a LayerFileMetadata,
 ) -> anyhow::Result<u64> {
-    let local_path = conf.local_path(remote_path);
+    let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
+
+    let local_path = timeline_path.join(layer_file_name.file_name());
+
+    let remote_path = conf.remote_path(&local_path)?;
 
     // Perform a rename inspired by durable_rename from file_utils.c.
     // The sequence:
@@ -52,7 +59,7 @@ pub async fn download_layer_file<'a>(
             temp_file_path.display()
         )
     })?;
-    let mut download = storage.download(remote_path).await.with_context(|| {
+    let mut download = storage.download(&remote_path).await.with_context(|| {
         format!(
             "Failed to open a download stream for layer with remote storage path '{remote_path:?}'"
         )
@@ -169,7 +176,9 @@ pub async fn list_remote_timelines<'a>(
         part_downloads.push(async move {
             (
                 timeline_id,
-                download_index_part(conf, &storage_clone, tenant_id, timeline_id).await,
+                download_index_part(conf, &storage_clone, tenant_id, timeline_id)
+                    .instrument(info_span!("download_index_part", timeline=%timeline_id))
+                    .await,
             )
         });
     }
@@ -211,11 +220,13 @@ pub async fn download_index_part(
         .with_context(|| format!("Failed to download an index part into file {index_part_path:?}"))
         .map_err(DownloadError::Other)?;
 
-    let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
+    let index_part: IndexPartUnclean = serde_json::from_slice(&index_part_bytes)
         .with_context(|| {
             format!("Failed to deserialize index part file into file {index_part_path:?}")
         })
        .map_err(DownloadError::Other)?;
 
+    let index_part = index_part.remove_unclean_layer_file_names();
+
     Ok(index_part)
 }
@@ -4,11 +4,11 @@
 
 use std::collections::{HashMap, HashSet};
 
-use remote_storage::RemotePath;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
+use tracing::warn;
 
-use crate::tenant::metadata::TimelineMetadata;
+use crate::tenant::{filename::LayerFileName, metadata::TimelineMetadata};
 
 use utils::lsn::Lsn;
 
@@ -62,7 +62,10 @@ impl LayerFileMetadata {
 /// remember to add a test case for the changed version.
 #[serde_as]
 #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
-pub struct IndexPart {
+pub struct IndexPartImpl<L>
+where
+    L: std::hash::Hash + PartialEq + Eq,
+{
     /// Debugging aid describing the version of this type.
     #[serde(default)]
     version: usize,
@@ -70,19 +73,19 @@ pub struct IndexPart {
     /// Layer names, which are stored on the remote storage.
     ///
     /// Additional metadata can might exist in `layer_metadata`.
-    pub timeline_layers: HashSet<String>,
+    pub timeline_layers: HashSet<L>,
 
     /// FIXME: unused field. This should be removed, but that changes the on-disk format,
     /// so we need to make sure we're backwards-` (and maybe forwards-) compatible
     /// First pass is to move it to Optional and the next would be its removal
-    missing_layers: Option<HashSet<String>>,
+    missing_layers: Option<HashSet<L>>,
 
     /// Per layer file name metadata, which can be present for a present or missing layer file.
     ///
     /// Older versions of `IndexPart` will not have this property or have only a part of metadata
     /// that latest version stores.
-    #[serde(default)]
-    pub layer_metadata: HashMap<String, IndexLayerMetadata>,
+    #[serde(default = "HashMap::default")]
+    pub layer_metadata: HashMap<L, IndexLayerMetadata>,
 
     // 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
     // It's duplicated here for convenience.
@@ -91,6 +94,104 @@ pub struct IndexPart {
     metadata_bytes: Vec<u8>,
 }
 
+// TODO seems like another part of the remote storage file format
+// compatibility issue, see https://github.com/neondatabase/neon/issues/3072
+pub type IndexPart = IndexPartImpl<LayerFileName>;
+
+pub type IndexPartUnclean = IndexPartImpl<UncleanLayerFileName>;
+
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
+pub enum UncleanLayerFileName {
+    Clean(LayerFileName),
+    BackupFile(String),
+}
+
+impl<'de> serde::Deserialize<'de> for UncleanLayerFileName {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        deserializer.deserialize_string(UncleanLayerFileNameVisitor)
+    }
+}
+
+struct UncleanLayerFileNameVisitor;
+
+impl<'de> serde::de::Visitor<'de> for UncleanLayerFileNameVisitor {
+    type Value = UncleanLayerFileName;
+
+    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            formatter,
+            "a string that is a valid LayerFileName or '.old' backup file name"
+        )
+    }
+
+    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        let maybe_clean: Result<LayerFileName, _> = v.parse();
+        match maybe_clean {
+            Ok(clean) => Ok(UncleanLayerFileName::Clean(clean)),
+            Err(e) => {
+                if v.ends_with(".old") {
+                    Ok(UncleanLayerFileName::BackupFile(v.to_owned()))
+                } else {
+                    Err(E::custom(e))
+                }
+            }
+        }
+    }
+}
+
+impl UncleanLayerFileName {
+    fn into_clean(self) -> Option<LayerFileName> {
+        match self {
+            UncleanLayerFileName::Clean(clean) => Some(clean),
+            UncleanLayerFileName::BackupFile(_) => None,
+        }
+    }
+}
+
+impl IndexPartUnclean {
+    pub fn remove_unclean_layer_file_names(self) -> IndexPart {
+        let IndexPartUnclean {
+            version,
+            timeline_layers,
+            // this is an unused field, ignore it on cleaning
+            missing_layers: _,
+            layer_metadata,
+            disk_consistent_lsn,
+            metadata_bytes,
+        } = self;
+
+        IndexPart {
+            version,
+            timeline_layers: timeline_layers
+                .into_iter()
+                .filter_map(|unclean_file_name| match unclean_file_name {
+                    UncleanLayerFileName::Clean(clean_name) => Some(clean_name),
+                    UncleanLayerFileName::BackupFile(backup_file_name) => {
+                        // For details see https://github.com/neondatabase/neon/issues/3024
+                        warn!(
+                            "got backup file on the remote storage, ignoring it {backup_file_name}"
+                        );
+                        None
+                    }
+                })
+                .collect(),
+            missing_layers: None,
+            layer_metadata: layer_metadata
+                .into_iter()
+                .filter_map(|(l, m)| l.into_clean().map(|l| (l, m)))
+                .collect(),
+            disk_consistent_lsn,
+            metadata_bytes,
+        }
+    }
+}
+
 impl IndexPart {
     /// When adding or modifying any parts of `IndexPart`, increment the version so that it can be
     /// used to understand later versions.
@@ -100,23 +201,17 @@ impl IndexPart {
     pub const FILE_NAME: &'static str = "index_part.json";
 
     pub fn new(
-        layers_and_metadata: HashMap<RemotePath, LayerFileMetadata>,
+        layers_and_metadata: HashMap<LayerFileName, LayerFileMetadata>,
         disk_consistent_lsn: Lsn,
         metadata_bytes: Vec<u8>,
     ) -> Self {
         let mut timeline_layers = HashSet::with_capacity(layers_and_metadata.len());
         let mut layer_metadata = HashMap::with_capacity(layers_and_metadata.len());
 
-        for (remote_path, metadata) in &layers_and_metadata {
+        for (remote_name, metadata) in &layers_and_metadata {
+            timeline_layers.insert(remote_name.to_owned());
             let metadata = IndexLayerMetadata::from(metadata);
-            match remote_path.object_name() {
-                Some(layer_name) => {
-                    timeline_layers.insert(layer_name.to_owned());
-                    layer_metadata.insert(layer_name.to_owned(), metadata);
-                }
-                // TODO move this on a type level: we know, that every layer entry does have a name
-                None => panic!("Layer {remote_path:?} has no file name, skipping"),
-            }
+            layer_metadata.insert(remote_name.to_owned(), metadata);
         }
 
         Self {
@@ -156,21 +251,22 @@ mod tests {
     fn v0_indexpart_is_parsed() {
         let example = r#"{
            "timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
-           "missing_layers":["not_a_real_layer_but_adding_coverage"],
+           "missing_layers":["LAYER_FILE_NAME::test/not_a_real_layer_but_adding_coverage"],
            "disk_consistent_lsn":"0/16960E8",
"metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
        }"#;
 
        let expected = IndexPart {
            version: 0,
-           timeline_layers: HashSet::from([String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")]),
-           missing_layers: Some(HashSet::from([String::from("not_a_real_layer_but_adding_coverage")])),
+           timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
+           missing_layers: None, // disabled fields should not carry unused values further
            layer_metadata: HashMap::default(),
            disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
        };
 
-        let part = serde_json::from_str::<IndexPart>(example).unwrap();
+        let part: IndexPartUnclean = serde_json::from_str(example).unwrap();
+        let part = part.remove_unclean_layer_file_names();
        assert_eq!(part, expected);
    }
 
@@ -179,10 +275,10 @@ mod tests {
        let example = r#"{
            "version":1,
            "timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
-           "missing_layers":["not_a_real_layer_but_adding_coverage"],
+           "missing_layers":["LAYER_FILE_NAME::test/not_a_real_layer_but_adding_coverage"],
            "layer_metadata":{
                "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
-               "not_a_real_layer_but_adding_coverage": { "file_size": 9007199254741001 }
+               "LAYER_FILE_NAME::test/not_a_real_layer_but_adding_coverage": { "file_size": 9007199254741001 }
            },
            "disk_consistent_lsn":"0/16960E8",
"metadata_bytes":[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
@@ -191,13 +287,13 @@ mod tests {
        let expected = IndexPart {
            // note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
            version: 1,
-           timeline_layers: HashSet::from([String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")]),
-           missing_layers: Some(HashSet::from([String::from("not_a_real_layer_but_adding_coverage")])),
+           timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
+           missing_layers: None,
            layer_metadata: HashMap::from([
-               (String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"), IndexLayerMetadata {
+               ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
                    file_size: Some(25600000),
                }),
-               (String::from("not_a_real_layer_but_adding_coverage"), IndexLayerMetadata {
+               (LayerFileName::new_test("not_a_real_layer_but_adding_coverage"), IndexLayerMetadata {
                    // serde_json should always parse this but this might be a double with jq for
                    // example.
                    file_size: Some(9007199254741001),
@@ -207,7 +303,9 @@ mod tests {
metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
        };
 
-        let part = serde_json::from_str::<IndexPart>(example).unwrap();
+        let part = serde_json::from_str::<IndexPartUnclean>(example)
+            .unwrap()
+            .remove_unclean_layer_file_names();
        assert_eq!(part, expected);
    }
 
@@ -218,7 +316,7 @@ mod tests {
            "timeline_layers":["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"],
            "layer_metadata":{
                "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
-               "not_a_real_layer_but_adding_coverage": { "file_size": 9007199254741001 }
+               "LAYER_FILE_NAME::test/not_a_real_layer_but_adding_coverage": { "file_size": 9007199254741001 }
            },
            "disk_consistent_lsn":"0/16960E8",
"metadata_bytes":[112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
@@ -227,29 +325,24 @@ mod tests {
        let expected = IndexPart {
            // note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
            version: 1,
-           timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_string()]),
+           timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap()]),
            layer_metadata: HashMap::from([
-               (
-                   "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_string(),
-                   IndexLayerMetadata {
-                       file_size: Some(25600000),
-                   }
-               ),
-               (
-                   "not_a_real_layer_but_adding_coverage".to_string(),
-                   IndexLayerMetadata {
-                       // serde_json should always parse this but this might be a double with jq for
-                       // example.
-                       file_size: Some(9007199254741001),
-                   }
-               )
+               ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), IndexLayerMetadata {
+                   file_size: Some(25600000),
+               }),
+               (LayerFileName::new_test("not_a_real_layer_but_adding_coverage"), IndexLayerMetadata {
+                   // serde_json should always parse this but this might be a double with jq for
+                   // example.
+                   file_size: Some(9007199254741001),
+               })
            ]),
            disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
+           missing_layers: None,
        };
 
-        let part = serde_json::from_str::<IndexPart>(example).unwrap();
+        let part = serde_json::from_str::<IndexPartUnclean>(example).unwrap();
+        let part = part.remove_unclean_layer_file_names();
        assert_eq!(part, expected);
    }
}
@@ -57,6 +57,7 @@ use crate::storage_sync::RemoteTimelineClient;
 use crate::task_mgr;
 use crate::task_mgr::TaskKind;
 use crate::tenant::metadata::load_metadata;
+use crate::tenant::storage_layer::Layer;
 use crate::tenant_config::TenantConfOpt;
 use crate::virtual_file::VirtualFile;
 use crate::walredo::PostgresRedoManager;
@@ -89,8 +90,6 @@ mod timeline;
 
 pub mod size;
 
-use storage_layer::Layer;
-
 pub use timeline::Timeline;
 
 // re-export this function so that page_cache.rs can use it.
@@ -60,7 +60,7 @@ where
 ///
 /// ```no_run
 /// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
-/// # let reader: FileBlockReader<std::fs::File> = todo!();
+/// # let reader: FileBlockReader<std::fs::File> = unimplemented!("stub");
 /// let cursor = reader.block_cursor();
 /// let buf = cursor.read_blk(1);
 /// // do stuff with 'buf'
@@ -30,7 +30,9 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
 use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
 use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
 use crate::tenant::filename::{DeltaFileName, PathOrConf};
-use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
+use crate::tenant::storage_layer::{
+    PersistentLayer, ValueReconstructResult, ValueReconstructState,
+};
 use crate::virtual_file::VirtualFile;
 use crate::{walrecord, TEMP_FILE_SUFFIX};
 use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
@@ -52,6 +54,9 @@ use utils::{
     lsn::Lsn,
 };
 
+use super::filename::LayerFileName;
+use super::storage_layer::Layer;
+
 ///
 /// Header stored in the beginning of the file
 ///
@@ -194,14 +199,6 @@ pub struct DeltaLayerInner {
 }
 
 impl Layer for DeltaLayer {
-    fn get_tenant_id(&self) -> TenantId {
-        self.tenant_id
-    }
-
-    fn get_timeline_id(&self) -> TimelineId {
-        self.timeline_id
-    }
-
     fn get_key_range(&self) -> Range<Key> {
         self.key_range.clone()
     }
@@ -209,13 +206,86 @@ impl Layer for DeltaLayer {
     fn get_lsn_range(&self) -> Range<Lsn> {
         self.lsn_range.clone()
     }
 
-    fn filename(&self) -> PathBuf {
-        PathBuf::from(self.layer_name().to_string())
+    fn is_incremental(&self) -> bool {
+        true
     }
 
-    fn local_path(&self) -> Option<PathBuf> {
-        Some(self.path())
+    fn short_id(&self) -> String {
+        self.filename().file_name()
     }
+
+    /// debugging function to print out the contents of the layer
+    fn dump(&self, verbose: bool) -> Result<()> {
+        println!(
+            "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
+            self.tenant_id,
+            self.timeline_id,
+            self.key_range.start,
+            self.key_range.end,
+            self.lsn_range.start,
+            self.lsn_range.end
+        );
+
+        if !verbose {
+            return Ok(());
+        }
+
+        let inner = self.load()?;
+
+        println!(
+            "index_start_blk: {}, root {}",
+            inner.index_start_blk, inner.index_root_blk
+        );
+
+        let file = inner.file.as_ref().unwrap();
+        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
+            inner.index_start_blk,
+            inner.index_root_blk,
+            file,
+        );
+
+        tree_reader.dump()?;
+
+        let mut cursor = file.block_cursor();
+
+        // A subroutine to dump a single blob
+        let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
+            let buf = cursor.read_blob(blob_ref.pos())?;
+            let val = Value::des(&buf)?;
+            let desc = match val {
+                Value::Image(img) => {
+                    format!(" img {} bytes", img.len())
+                }
+                Value::WalRecord(rec) => {
+                    let wal_desc = walrecord::describe_wal_record(&rec)?;
+                    format!(
+                        " rec {} bytes will_init: {} {}",
+                        buf.len(),
+                        rec.will_init(),
+                        wal_desc
+                    )
+                }
+            };
+            Ok(desc)
+        };
+
+        tree_reader.visit(
+            &[0u8; DELTA_KEY_SIZE],
+            VisitDirection::Forwards,
+            |delta_key, val| {
+                let blob_ref = BlobRef(val);
+                let key = DeltaKey::extract_key_from_buf(delta_key);
+                let lsn = DeltaKey::extract_lsn_from_buf(delta_key);
+
+                let desc = match dump_blob(blob_ref) {
+                    Ok(desc) => desc,
+                    Err(err) => format!("ERROR: {}", err),
+                };
+                println!("  key {} at {}: {}", key, lsn, desc);
+                true
+            },
+        )?;
+
+        Ok(())
+    }
 
     fn get_value_reconstruct_data(
@@ -302,6 +372,24 @@ impl Layer for DeltaLayer {
             Ok(ValueReconstructResult::Complete)
         }
     }
+}
+
+impl PersistentLayer for DeltaLayer {
+    fn get_tenant_id(&self) -> TenantId {
+        self.tenant_id
+    }
+
+    fn get_timeline_id(&self) -> TimelineId {
+        self.timeline_id
+    }
+
+    fn filename(&self) -> LayerFileName {
+        self.layer_name().into()
+    }
+
+    fn local_path(&self) -> PathBuf {
+        self.path()
+    }
 
     fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = anyhow::Result<(Key, Lsn, Value)>> + 'a> {
         let inner = match self.load() {
@@ -332,89 +420,6 @@ impl Layer for DeltaLayer {
         fs::remove_file(self.path())?;
         Ok(())
     }
-
-    fn is_incremental(&self) -> bool {
-        true
-    }
-
-    fn is_in_memory(&self) -> bool {
-        false
-    }
-
-    /// debugging function to print out the contents of the layer
-    fn dump(&self, verbose: bool) -> Result<()> {
-        println!(
-            "----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
-            self.tenant_id,
-            self.timeline_id,
-            self.key_range.start,
-            self.key_range.end,
-            self.lsn_range.start,
-            self.lsn_range.end
-        );
-
-        if !verbose {
-            return Ok(());
-        }
-
-        let inner = self.load()?;
-
-        println!(
-            "index_start_blk: {}, root {}",
-            inner.index_start_blk, inner.index_root_blk
-        );
-
-        let file = inner.file.as_ref().unwrap();
-        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
-            inner.index_start_blk,
-            inner.index_root_blk,
-            file,
-        );
-
-        tree_reader.dump()?;
-
-        let mut cursor = file.block_cursor();
-
-        // A subroutine to dump a single blob
-        let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result<String> {
-            let buf = cursor.read_blob(blob_ref.pos())?;
-            let val = Value::des(&buf)?;
-            let desc = match val {
-                Value::Image(img) => {
-                    format!(" img {} bytes", img.len())
-                }
-                Value::WalRecord(rec) => {
-                    let wal_desc = walrecord::describe_wal_record(&rec)?;
-                    format!(
-                        " rec {} bytes will_init: {} {}",
-                        buf.len(),
-                        rec.will_init(),
-                        wal_desc
-                    )
-                }
-            };
-            Ok(desc)
-        };
-
-        tree_reader.visit(
-            &[0u8; DELTA_KEY_SIZE],
-            VisitDirection::Forwards,
-            |delta_key, val| {
-                let blob_ref = BlobRef(val);
-                let key = DeltaKey::extract_key_from_buf(delta_key);
-                let lsn = DeltaKey::extract_lsn_from_buf(delta_key);
-
-                let desc = match dump_blob(blob_ref) {
-                    Ok(desc) => desc,
-                    Err(err) => format!("ERROR: {}", err),
-                };
-                println!("  key {} at {}: {}", key, lsn, desc);
-                true
-            },
-        )?;
-
-        Ok(())
-    }
 }
 
 impl DeltaLayer {
@@ -511,8 +516,8 @@ impl DeltaLayer {
                 }
             }
             PathOrConf::Path(path) => {
-                let actual_filename = Path::new(path.file_name().unwrap());
-                let expected_filename = self.filename();
+                let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
+                let expected_filename = self.filename().file_name();
 
                 if actual_filename != expected_filename {
                     println!(
@@ -7,11 +7,12 @@ use std::cmp::Ordering;
 use std::fmt;
 use std::ops::Range;
 use std::path::PathBuf;
+use std::str::FromStr;
 
 use utils::lsn::Lsn;
 
 // Note: Timeline::load_layer_map() relies on this sort order
-#[derive(Debug, PartialEq, Eq, Clone)]
+#[derive(Debug, PartialEq, Eq, Clone, Hash)]
 pub struct DeltaFileName {
     pub key_range: Range<Key>,
     pub lsn_range: Range<Lsn>,
@@ -101,7 +102,7 @@ impl fmt::Display for DeltaFileName {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Clone)]
+#[derive(Debug, PartialEq, Eq, Clone, Hash)]
 pub struct ImageFileName {
     pub key_range: Range<Key>,
     pub lsn: Lsn,
@@ -172,6 +173,103 @@ impl fmt::Display for ImageFileName {
         )
     }
 }
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
+pub enum LayerFileName {
+    Image(ImageFileName),
+    Delta(DeltaFileName),
+    #[cfg(test)]
+    Test(String),
+}
+
+impl LayerFileName {
+    pub fn file_name(&self) -> String {
+        match self {
+            LayerFileName::Image(fname) => format!("{fname}"),
+            LayerFileName::Delta(fname) => format!("{fname}"),
+            #[cfg(test)]
+            LayerFileName::Test(fname) => fname.to_string(),
+        }
+    }
+    #[cfg(test)]
+    pub(crate) fn new_test(name: &str) -> LayerFileName {
+        LayerFileName::Test(name.to_owned())
+    }
+}
+
+impl From<ImageFileName> for LayerFileName {
+    fn from(fname: ImageFileName) -> Self {
+        LayerFileName::Image(fname)
+    }
+}
+impl From<DeltaFileName> for LayerFileName {
+    fn from(fname: DeltaFileName) -> Self {
+        LayerFileName::Delta(fname)
+    }
+}
+
+// include a `/` in the name as an additional layer of robustness
+// because `/` chars are not allowed in UNIX paths
+#[cfg(test)]
+const LAYER_FILE_NAME_TEST_PREFIX: &str = "LAYER_FILE_NAME::test/";
+
+impl FromStr for LayerFileName {
+    type Err = String;
+
+    fn from_str(value: &str) -> Result<Self, Self::Err> {
+        #[cfg(test)]
+        if let Some(value) = value.strip_prefix(LAYER_FILE_NAME_TEST_PREFIX) {
+            return Ok(LayerFileName::Test(value.to_owned()));
+        }
+        let delta = DeltaFileName::parse_str(value);
+        let image = ImageFileName::parse_str(value);
+        let ok = match (delta, image) {
+            (None, None) => {
+                return Err(format!(
+                    "neither delta nor image layer file name: {value:?}"
+                ))
+            }
+            (Some(delta), None) => LayerFileName::Delta(delta),
+            (None, Some(image)) => LayerFileName::Image(image),
+            (Some(_), Some(_)) => unreachable!(),
+        };
+        Ok(ok)
+    }
+}
+
+impl serde::Serialize for LayerFileName {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        match self {
+            LayerFileName::Image(fname) => serializer.serialize_str(&format!("{}", fname)),
+            LayerFileName::Delta(fname) => serializer.serialize_str(&format!("{}", fname)),
+            #[cfg(test)]
+            LayerFileName::Test(t) => {
+                serializer.serialize_str(&format!("{LAYER_FILE_NAME_TEST_PREFIX}{t}"))
+            }
+        }
+    }
+}
+
+struct LayerFileNameVisitor;
+
+impl<'de> serde::de::Visitor<'de> for LayerFileNameVisitor {
+    type Value = LayerFileName;
+
+    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+        write!(
+            formatter,
+            "a string that is a valid image or delta layer file name"
+        )
+    }
+    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        v.parse().map_err(|e| E::custom(e))
+    }
+}
 
 /// Helper enum to hold a PageServerConf, or a path
 ///
@@ -26,7 +26,9 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
 use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
 use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
 use crate::tenant::filename::{ImageFileName, PathOrConf};
-use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
+use crate::tenant::storage_layer::{
+    PersistentLayer, ValueReconstructResult, ValueReconstructState,
+};
 use crate::virtual_file::VirtualFile;
 use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
 use anyhow::{bail, ensure, Context, Result};
@@ -48,6 +50,9 @@ use utils::{
     lsn::Lsn,
 };
 
+use super::filename::LayerFileName;
+use super::storage_layer::Layer;
+
 ///
 /// Header stored in the beginning of the file
 ///
@@ -120,22 +125,6 @@ pub struct ImageLayerInner {
 }
 
 impl Layer for ImageLayer {
-    fn filename(&self) -> PathBuf {
-        PathBuf::from(self.layer_name().to_string())
-    }
-
-    fn local_path(&self) -> Option<PathBuf> {
-        Some(self.path())
-    }
-
-    fn get_tenant_id(&self) -> TenantId {
-        self.tenant_id
-    }
-
-    fn get_timeline_id(&self) -> TimelineId {
-        self.timeline_id
-    }
-
     fn get_key_range(&self) -> Range<Key> {
         self.key_range.clone()
     }
@@ -144,58 +133,12 @@ impl Layer for ImageLayer {
         // End-bound is exclusive
         self.lsn..(self.lsn + 1)
     }
 
-    /// Look up given page in the file
-    fn get_value_reconstruct_data(
-        &self,
-        key: Key,
-        lsn_range: Range<Lsn>,
-        reconstruct_state: &mut ValueReconstructState,
-    ) -> anyhow::Result<ValueReconstructResult> {
-        assert!(self.key_range.contains(&key));
-        assert!(lsn_range.start >= self.lsn);
-        assert!(lsn_range.end >= self.lsn);
-
-        let inner = self.load()?;
-
-        let file = inner.file.as_ref().unwrap();
-        let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
-
-        let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
-        key.write_to_byte_slice(&mut keybuf);
-        if let Some(offset) = tree_reader.get(&keybuf)? {
-            let blob = file.block_cursor().read_blob(offset).with_context(|| {
-                format!(
-                    "failed to read value from data file {} at offset {}",
-                    self.filename().display(),
-                    offset
-                )
-            })?;
-            let value = Bytes::from(blob);
-
-            reconstruct_state.img = Some((self.lsn, value));
-            Ok(ValueReconstructResult::Complete)
-        } else {
-            Ok(ValueReconstructResult::Missing)
-        }
-    }
-
-    fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>>> {
-        todo!();
-    }
-
-    fn delete(&self) -> Result<()> {
-        // delete underlying file
-        fs::remove_file(self.path())?;
-        Ok(())
-    }
-
-    fn is_incremental(&self) -> bool {
-        false
-    }
-
-    fn is_in_memory(&self) -> bool {
-        false
+    fn short_id(&self) -> String {
+        self.filename().file_name()
     }
 
     /// debugging function to print out the contents of the layer
@@ -223,6 +166,68 @@ impl Layer for ImageLayer {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Look up given page in the file
|
||||
fn get_value_reconstruct_data(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
) -> anyhow::Result<ValueReconstructResult> {
|
||||
assert!(self.key_range.contains(&key));
|
||||
assert!(lsn_range.start >= self.lsn);
|
||||
assert!(lsn_range.end >= self.lsn);
|
||||
|
||||
let inner = self.load()?;
|
||||
|
||||
let file = inner.file.as_ref().unwrap();
|
||||
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
if let Some(offset) = tree_reader.get(&keybuf)? {
|
||||
let blob = file.block_cursor().read_blob(offset).with_context(|| {
|
||||
format!(
|
||||
"failed to read value from data file {} at offset {}",
|
||||
self.path().display(),
|
||||
offset
|
||||
)
|
||||
})?;
|
||||
let value = Bytes::from(blob);
|
||||
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for ImageLayer {
|
||||
fn filename(&self) -> LayerFileName {
|
||||
self.layer_name().into()
|
||||
}
|
||||
|
||||
fn local_path(&self) -> PathBuf {
|
||||
self.path()
|
||||
}
|
||||
|
||||
fn get_tenant_id(&self) -> TenantId {
|
||||
self.tenant_id
|
||||
}
|
||||
|
||||
fn get_timeline_id(&self) -> TimelineId {
|
||||
self.timeline_id
|
||||
}
|
||||
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>>> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
fn delete(&self) -> Result<()> {
|
||||
// delete underlying file
|
||||
fs::remove_file(self.path())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
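A rough sketch of what call sites gain from the split: code that only routes reads can stay on the `Layer` super-trait, while code that touches files takes `PersistentLayer`. Both helpers below are hypothetical, written only against the two traits as defined in this patch:

fn describe(layer: &dyn Layer) -> String {
    // short_id() replaces the old filename().display() for logging.
    layer.short_id()
}

fn local_size(layer: &dyn PersistentLayer) -> anyhow::Result<u64> {
    // Only persistent layers are guaranteed a local file, so
    // local_path() is infallible here instead of returning Option.
    Ok(layer.local_path().metadata()?.len())
}
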
impl ImageLayer {
@@ -314,8 +319,8 @@ impl ImageLayer {
                }
            }
            PathOrConf::Path(path) => {
                let actual_filename = Path::new(path.file_name().unwrap());
                let expected_filename = self.filename();
                let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
                let expected_filename = self.filename().file_name();

                if actual_filename != expected_filename {
                    println!(

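The filename check above now compares owned strings: `filename()` returns the parsed `LayerFileName`, and `file_name()` renders it back to text. The same comparison as a standalone hedged sketch (the helper function is invented for illustration):

fn names_match(path: &std::path::Path, expected: &LayerFileName) -> bool {
    // Render both sides to UTF-8 strings; non-unicode file names simply
    // fail the check.
    path.file_name()
        .and_then(|n| n.to_str())
        .map(|n| n == expected.file_name())
        .unwrap_or(false)
}
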
@@ -10,9 +10,9 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter};
use crate::tenant::block_io::BlockReader;
use crate::tenant::delta_layer::{DeltaLayer, DeltaLayerWriter};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::walrecord;
use anyhow::{bail, ensure, Result};
use anyhow::{ensure, Result};
use std::cell::RefCell;
use std::collections::HashMap;
use tracing::*;
@@ -26,9 +26,10 @@ use utils::{
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::RwLock;

use super::storage_layer::Layer;

thread_local! {
    /// A buffer for serializing object during [`InMemoryLayer::put_value`].
    /// This buffer is reused for each serialization to avoid additional malloc calls.
@@ -75,33 +76,13 @@ impl InMemoryLayerInner {
    }
}

impl Layer for InMemoryLayer {
    // An in-memory layer can be spilled to disk into ephemeral file,
    // This function is used only for debugging, so we don't need to be very precise.
    // Construct a filename as if it was a delta layer.
    fn filename(&self) -> PathBuf {
        let inner = self.inner.read().unwrap();

        let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));

        PathBuf::from(format!(
            "inmem-{:016X}-{:016X}",
            self.start_lsn.0, end_lsn.0
        ))
    }

    fn local_path(&self) -> Option<PathBuf> {
        None
    }

    fn get_tenant_id(&self) -> TenantId {
        self.tenant_id
    }

    fn get_timeline_id(&self) -> TimelineId {
impl InMemoryLayer {
    pub fn get_timeline_id(&self) -> TimelineId {
        self.timeline_id
    }
}

impl Layer for InMemoryLayer {
    fn get_key_range(&self) -> Range<Key> {
        Key::MIN..Key::MAX
    }
@@ -116,73 +97,16 @@ impl Layer for InMemoryLayer {
        };
        self.start_lsn..end_lsn
    }

    /// Look up given value in the layer.
    fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_state: &mut ValueReconstructState,
    ) -> anyhow::Result<ValueReconstructResult> {
        ensure!(lsn_range.start >= self.start_lsn);
        let mut need_image = true;

        let inner = self.inner.read().unwrap();

        let mut reader = inner.file.block_cursor();

        // Scan the page versions backwards, starting from `lsn`.
        if let Some(vec_map) = inner.index.get(&key) {
            let slice = vec_map.slice_range(lsn_range);
            for (entry_lsn, pos) in slice.iter().rev() {
                let buf = reader.read_blob(*pos)?;
                let value = Value::des(&buf)?;
                match value {
                    Value::Image(img) => {
                        reconstruct_state.img = Some((*entry_lsn, img));
                        return Ok(ValueReconstructResult::Complete);
                    }
                    Value::WalRecord(rec) => {
                        let will_init = rec.will_init();
                        reconstruct_state.records.push((*entry_lsn, rec));
                        if will_init {
                            // This WAL record initializes the page, so no need to go further back
                            need_image = false;
                            break;
                        }
                    }
                }
            }
        }

        // release lock on 'inner'

        // If an older page image is needed to reconstruct the page, let the
        // caller know.
        if need_image {
            Ok(ValueReconstructResult::Continue)
        } else {
            Ok(ValueReconstructResult::Complete)
        }
    }

    fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>>> {
        todo!();
    }

    /// Nothing to do here. When you drop the last reference to the layer, it will
    /// be deallocated.
    fn delete(&self) -> Result<()> {
        bail!("can't delete an InMemoryLayer")
    }

    fn is_incremental(&self) -> bool {
        // in-memory layer is always considered incremental.
        true
    }

    fn is_in_memory(&self) -> bool {
        true
    fn short_id(&self) -> String {
        let inner = self.inner.read().unwrap();

        let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
        format!("inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
    }

    /// debugging function to print out the contents of the layer
@@ -235,6 +159,55 @@ impl Layer for InMemoryLayer {

        Ok(())
    }

    /// Look up given value in the layer.
    fn get_value_reconstruct_data(
        &self,
        key: Key,
        lsn_range: Range<Lsn>,
        reconstruct_state: &mut ValueReconstructState,
    ) -> anyhow::Result<ValueReconstructResult> {
        ensure!(lsn_range.start >= self.start_lsn);
        let mut need_image = true;

        let inner = self.inner.read().unwrap();

        let mut reader = inner.file.block_cursor();

        // Scan the page versions backwards, starting from `lsn`.
        if let Some(vec_map) = inner.index.get(&key) {
            let slice = vec_map.slice_range(lsn_range);
            for (entry_lsn, pos) in slice.iter().rev() {
                let buf = reader.read_blob(*pos)?;
                let value = Value::des(&buf)?;
                match value {
                    Value::Image(img) => {
                        reconstruct_state.img = Some((*entry_lsn, img));
                        return Ok(ValueReconstructResult::Complete);
                    }
                    Value::WalRecord(rec) => {
                        let will_init = rec.will_init();
                        reconstruct_state.records.push((*entry_lsn, rec));
                        if will_init {
                            // This WAL record initializes the page, so no need to go further back
                            need_image = false;
                            break;
                        }
                    }
                }
            }
        }

        // release lock on 'inner'

        // If an older page image is needed to reconstruct the page, let the
        // caller know.
        if need_image {
            Ok(ValueReconstructResult::Continue)
        } else {
            Ok(ValueReconstructResult::Complete)
        }
    }
}

impl InMemoryLayer {

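After this change an `InMemoryLayer` no longer fabricates a `PathBuf`; `short_id()` carries the same information for logs. A sketch of the identifier format, lifted from the format string above (the free helper function itself is hypothetical):

fn inmem_short_id(start_lsn: u64, end_lsn: Option<u64>) -> String {
    // An open layer has no end LSN yet, so u64::MAX stands in, exactly
    // as in InMemoryLayer::short_id above.
    format!("inmem-{:016X}-{:016X}", start_lsn, end_lsn.unwrap_or(u64::MAX))
}
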
@@ -13,7 +13,6 @@
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::inmemory_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use crate::tenant::storage_layer::{range_eq, range_overlaps};
use amplify_num::i256;
use anyhow::Result;
@@ -28,11 +27,12 @@ use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;

use super::storage_layer::Layer;

///
/// LayerMap tracks what layers exist on a timeline.
///
#[derive(Default)]
pub struct LayerMap {
pub struct LayerMap<L: ?Sized> {
    //
    // 'open_layer' holds the current InMemoryLayer that is accepting new
    // records. If it is None, 'next_open_layer_at' will be set instead, indicating
@@ -53,15 +53,27 @@ pub struct LayerMap {
    pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,

    /// All the historic layers are kept here
    historic_layers: RTree<LayerRTreeObject>,
    historic_layers: RTree<LayerRTreeObject<L>>,

    /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
    /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
    l0_delta_layers: Vec<Arc<dyn Layer>>,
    l0_delta_layers: Vec<Arc<L>>,
}

struct LayerRTreeObject {
    layer: Arc<dyn Layer>,
impl<L: ?Sized> Default for LayerMap<L> {
    fn default() -> Self {
        Self {
            open_layer: None,
            next_open_layer_at: None,
            frozen_layers: VecDeque::default(),
            historic_layers: RTree::default(),
            l0_delta_layers: Vec::default(),
        }
    }
}

struct LayerRTreeObject<L: ?Sized> {
    layer: Arc<L>,

    envelope: AABB<[IntKey; 2]>,
}
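Note the manual `Default` impl replacing `#[derive(Default)]`: the derive would add an `L: Default` bound, which a trait object such as `dyn PersistentLayer` cannot satisfy. A hedged sketch of the two instantiations this enables (`DummyDelta` stands in for any sized type implementing `Layer`; the function is invented for illustration):

fn layer_map_instantiations() {
    // Production: the pageserver stores a trait object...
    let prod: LayerMap<dyn PersistentLayer> = LayerMap::default();
    // ...while tests and benchmarks can use a concrete layer type.
    let bench: LayerMap<DummyDelta> = LayerMap::default();
    let _ = (prod, bench);
}
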
@@ -185,7 +197,7 @@ impl Num for IntKey {
    }
}

impl PartialEq for LayerRTreeObject {
impl<T: ?Sized> PartialEq for LayerRTreeObject<T> {
    fn eq(&self, other: &Self) -> bool {
        // FIXME: ptr_eq might fail to return true for 'dyn'
        // references. Clippy complains about this. In practice it
@@ -196,15 +208,21 @@ impl PartialEq for LayerRTreeObject {
    }
}

impl RTreeObject for LayerRTreeObject {
impl<L> RTreeObject for LayerRTreeObject<L>
where
    L: ?Sized,
{
    type Envelope = AABB<[IntKey; 2]>;
    fn envelope(&self) -> Self::Envelope {
        self.envelope
    }
}

impl LayerRTreeObject {
    fn new(layer: Arc<dyn Layer>) -> Self {
impl<L> LayerRTreeObject<L>
where
    L: ?Sized + Layer,
{
    fn new(layer: Arc<L>) -> Self {
        let key_range = layer.get_key_range();
        let lsn_range = layer.get_lsn_range();

@@ -223,12 +241,15 @@ impl LayerRTreeObject {
}

/// Return value of LayerMap::search
pub struct SearchResult {
    pub layer: Arc<dyn Layer>,
pub struct SearchResult<L: ?Sized> {
    pub layer: Arc<L>,
    pub lsn_floor: Lsn,
}

impl LayerMap {
impl<L> LayerMap<L>
where
    L: ?Sized + Layer,
{
    ///
    /// Find the latest layer that covers the given 'key', with lsn <
    /// 'end_lsn'.
@@ -240,10 +261,10 @@ impl LayerMap {
    /// contain the version, even if it's missing from the returned
    /// layer.
    ///
    pub fn search(&self, key: Key, end_lsn: Lsn) -> Result<Option<SearchResult>> {
    pub fn search(&self, key: Key, end_lsn: Lsn) -> Result<Option<SearchResult<L>>> {
        // linear search
        // Find the latest image layer that covers the given key
        let mut latest_img: Option<Arc<dyn Layer>> = None;
        let mut latest_img: Option<Arc<L>> = None;
        let mut latest_img_lsn: Option<Lsn> = None;
        let envelope = AABB::from_corners(
            [IntKey::from(key.to_i128()), IntKey::from(0i128)],
@@ -277,7 +298,7 @@ impl LayerMap {
        }

        // Search the delta layers
        let mut latest_delta: Option<Arc<dyn Layer>> = None;
        let mut latest_delta: Option<Arc<L>> = None;
        for e in self
            .historic_layers
            .locate_in_envelope_intersecting(&envelope)
@@ -301,7 +322,7 @@ impl LayerMap {
                // No need to search any further
                trace!(
                    "found layer {} for request on {key} at {end_lsn}",
                    l.filename().display(),
                    l.short_id(),
                );
                latest_delta.replace(Arc::clone(l));
                break;
@@ -319,7 +340,7 @@ impl LayerMap {
        if let Some(l) = latest_delta {
            trace!(
                "found (old) layer {} for request on {key} at {end_lsn}",
                l.filename().display(),
                l.short_id(),
            );
            let lsn_floor = std::cmp::max(
                Lsn(latest_img_lsn.unwrap_or(Lsn(0)).0 + 1),
@@ -344,7 +365,7 @@ impl LayerMap {
    ///
    /// Insert an on-disk layer
    ///
    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
    pub fn insert_historic(&mut self, layer: Arc<L>) {
        if layer.get_key_range() == (Key::MIN..Key::MAX) {
            self.l0_delta_layers.push(layer.clone());
        }
@@ -357,7 +378,7 @@ impl LayerMap {
    ///
    /// This should be called when the corresponding file on disk has been deleted.
    ///
    pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
    pub fn remove_historic(&mut self, layer: Arc<L>) {
        if layer.get_key_range() == (Key::MIN..Key::MAX) {
            let len_before = self.l0_delta_layers.len();

@@ -426,13 +447,13 @@ impl LayerMap {
        }
    }

    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<dyn Layer>> {
    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
        self.historic_layers.iter().map(|e| e.layer.clone())
    }

    /// Find the last image layer that covers 'key', ignoring any image layers
    /// newer than 'lsn'.
    fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<dyn Layer>> {
    fn find_latest_image(&self, key: Key, lsn: Lsn) -> Option<Arc<L>> {
        let mut candidate_lsn = Lsn(0);
        let mut candidate = None;
        let envelope = AABB::from_corners(
@@ -474,7 +495,7 @@ impl LayerMap {
        &self,
        key_range: &Range<Key>,
        lsn: Lsn,
    ) -> Result<Vec<(Range<Key>, Option<Arc<dyn Layer>>)>> {
    ) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
        let mut points = vec![key_range.start];
        let envelope = AABB::from_corners(
            [IntKey::from(key_range.start.to_i128()), IntKey::from(0)],
@@ -559,7 +580,7 @@ impl LayerMap {
    }

    /// Return all L0 delta layers
    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<dyn Layer>>> {
    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
        Ok(self.l0_delta_layers.clone())
    }


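With the map generic over `L`, `search` hands back `Arc<L>` directly instead of an erased `Arc<dyn Layer>`. A small hedged sketch of a caller written against the generic API (the helper name is invented):

fn newest_covering<L: ?Sized + Layer>(
    map: &LayerMap<L>,
    key: Key,
    end_lsn: Lsn,
) -> anyhow::Result<Option<std::sync::Arc<L>>> {
    // SearchResult<L> carries the layer and the LSN floor; here we only
    // need the layer itself.
    Ok(map.search(key, end_lsn)?.map(|result| result.layer))
}
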
@@ -14,6 +14,7 @@ use utils::{
    lsn::Lsn,
};

use super::filename::LayerFileName;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
    T: PartialOrd<T>,
@@ -69,26 +70,9 @@ pub enum ValueReconstructResult {
    Missing,
}

/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
///
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access to the
/// recent page versions. On-disk layers are stored as files on disk, and are
/// immutable. This trait presents the common functionality of in-memory and
/// on-disk layers.
///
/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
/// A delta layer contains all modifications within a range of LSNs and keys.
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN
///
/// Supertrait of the [`Layer`] trait that captures the bare minimum interface
/// required by [`LayerMap`].
pub trait Layer: Send + Sync {
    fn get_tenant_id(&self) -> TenantId;

    /// Identify the timeline this layer belongs to
    fn get_timeline_id(&self) -> TimelineId;

    /// Range of keys that this layer covers
    fn get_key_range(&self) -> Range<Key>;

@@ -100,13 +84,11 @@ pub trait Layer: Send + Sync {
    /// - An image layer represents snapshot at one LSN, so end_lsn is always the snapshot LSN + 1
    fn get_lsn_range(&self) -> Range<Lsn>;

    /// Filename used to store this layer on disk. (Even in-memory layers
    /// implement this, to print a handy unique identifier for the layer for
    /// log messages, even though they're never not on disk.)
    fn filename(&self) -> PathBuf;

    /// If a layer has a corresponding file on a local filesystem, return its absolute path.
    fn local_path(&self) -> Option<PathBuf>;
    /// Does this layer only contain some data for the key-range (incremental),
    /// or does it contain a version of every page? This is important to know
    /// for garbage collecting old layers: an incremental layer depends on
    /// the previous non-incremental layer.
    fn is_incremental(&self) -> bool;

    ///
    /// Return data needed to reconstruct given page at LSN.
@@ -127,14 +109,39 @@ pub trait Layer: Send + Sync {
        reconstruct_data: &mut ValueReconstructState,
    ) -> Result<ValueReconstructResult>;

    /// Does this layer only contain some data for the key-range (incremental),
    /// or does it contain a version of every page? This is important to know
    /// for garbage collecting old layers: an incremental layer depends on
    /// the previous non-incremental layer.
    fn is_incremental(&self) -> bool;
    /// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
    fn short_id(&self) -> String;

    /// Returns true for layers that are represented in memory.
    fn is_in_memory(&self) -> bool;
    /// Dump summary of the contents of the layer to stdout
    fn dump(&self, verbose: bool) -> Result<()>;
}

/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
///
/// There are two kinds of layers, in-memory and on-disk layers. In-memory
/// layers are used to ingest incoming WAL, and provide fast access to the
/// recent page versions. On-disk layers are stored as files on disk, and are
/// immutable. This trait presents the common functionality of in-memory and
/// on-disk layers.
///
/// Furthermore, there are two kinds of on-disk layers: delta and image layers.
/// A delta layer contains all modifications within a range of LSNs and keys.
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN
///
pub trait PersistentLayer: Layer {
    fn get_tenant_id(&self) -> TenantId;

    /// Identify the timeline this layer belongs to
    fn get_timeline_id(&self) -> TimelineId;

    /// File name used for this layer, both in the pageserver's local filesystem
    /// state as well as in the remote storage.
    fn filename(&self) -> LayerFileName;

    // Path to the layer file in the local filesystem.
    fn local_path(&self) -> PathBuf;

    /// Iterate through all keys and values stored in the layer
    fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_>;
@@ -147,7 +154,4 @@ pub trait Layer: Send + Sync {

    /// Permanently remove this layer from disk.
    fn delete(&self) -> Result<()>;

    /// Dump summary of the contents of the layer to stdout
    fn dump(&self, verbose: bool) -> Result<()>;
}

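The division of labor in one sketch: `LayerMap` code needs only the `Layer` super-trait, and any `PersistentLayer` object can be used through it, since super-trait methods remain callable on the sub-trait object (the function is invented for illustration):

fn log_layer(l: &dyn PersistentLayer) {
    // short_id() and is_incremental() come from the Layer super-trait;
    // local_path() is PersistentLayer-specific.
    println!(
        "layer {} (incremental: {}) at {}",
        l.short_id(),
        l.is_incremental(),
        l.local_path().display()
    );
}
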
@@ -11,7 +11,7 @@ use tokio::task::spawn_blocking;
use tracing::*;

use std::cmp::{max, min, Ordering};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
@@ -30,7 +30,7 @@ use crate::tenant::{
    layer_map::{LayerMap, SearchResult},
    metadata::{save_metadata, TimelineMetadata},
    par_fsync,
    storage_layer::{Layer, ValueReconstructResult, ValueReconstructState},
    storage_layer::{PersistentLayer, ValueReconstructResult, ValueReconstructState},
};

use crate::config::PageServerConf;
@@ -62,6 +62,9 @@ use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use crate::{page_cache, storage_sync::index::LayerFileMetadata};

use super::filename::LayerFileName;
use super::storage_layer::Layer;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum FlushLoopState {
    NotStarted,
@@ -78,7 +81,7 @@ pub struct Timeline {

    pub pg_version: u32,

    pub layers: RwLock<LayerMap>,
    pub layers: RwLock<LayerMap<dyn PersistentLayer>>,

    last_freeze_at: AtomicLsn,
    // Atomic would be more appropriate here.
@@ -927,7 +930,7 @@ impl Timeline {
            let layer =
                ImageLayer::new(self.conf, self.timeline_id, self.tenant_id, &imgfilename);

            trace!("found layer {}", layer.filename().display());
            trace!("found layer {}", layer.path().display());
            total_physical_size += layer.path().metadata()?.len();
            layers.insert_historic(Arc::new(layer));
            num_layers += 1;
@@ -951,7 +954,7 @@ impl Timeline {
            let layer =
                DeltaLayer::new(self.conf, self.timeline_id, self.tenant_id, &deltafilename);

            trace!("found layer {}", layer.filename().display());
            trace!("found layer {}", layer.path().display());
            total_physical_size += layer.path().metadata()?.len();
            layers.insert_historic(Arc::new(layer));
            num_layers += 1;
@@ -998,9 +1001,9 @@ impl Timeline {
        &self,
        index_part: &IndexPart,
        remote_client: &RemoteTimelineClient,
        local_layers: HashSet<PathBuf>,
        local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
        up_to_date_disk_consistent_lsn: Lsn,
    ) -> anyhow::Result<HashSet<PathBuf>> {
    ) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
        // Are we missing some files that are present in remote storage?
        // Download them now.
        // TODO Downloading many files this way is not efficient.
@@ -1012,8 +1015,7 @@ impl Timeline {
        let mut local_only_layers = local_layers;
        let timeline_dir = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
        for remote_layer_name in &index_part.timeline_layers {
            let local_layer_path = timeline_dir.join(remote_layer_name);
            local_only_layers.remove(&local_layer_path);
            local_only_layers.remove(remote_layer_name);

            let remote_layer_metadata = index_part
                .layer_metadata
@@ -1021,10 +1023,7 @@ impl Timeline {
                .map(LayerFileMetadata::from)
                .unwrap_or(LayerFileMetadata::MISSING);

            let remote_layer_path = self
                .conf
                .remote_path(&local_layer_path)
                .expect("local_layer_path received from the same conf that provided a workdir");
            let local_layer_path = timeline_dir.join(remote_layer_name.file_name());

            if local_layer_path.exists() {
                let mut already_downloaded = true;
@@ -1056,83 +1055,74 @@ impl Timeline {
                    continue;
                }
            } else {
                info!("remote layer {remote_layer_path:?} does not exist locally");
                info!("remote layer {remote_layer_name:?} does not exist locally");
            }

            let layer_name = local_layer_path
                .file_name()
                .and_then(|os_str| os_str.to_str())
                .with_context(|| {
                    format!("Layer file {local_layer_path:?} has no name in unicode")
                })?;
            if let Some(imgfilename) = ImageFileName::parse_str(layer_name) {
                if imgfilename.lsn > up_to_date_disk_consistent_lsn {
                    warn!(
            match remote_layer_name {
                LayerFileName::Image(imgfilename) => {
                    if imgfilename.lsn > up_to_date_disk_consistent_lsn {
                        warn!(
                        "found future image layer {} on timeline {} remote_consistent_lsn is {}",
                        imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn
                    );
                    continue;
                        continue;
                    }

                    trace!("downloading image file: {remote_layer_name:?}");
                    let downloaded_size = remote_client
                        .download_layer_file(remote_layer_name, &remote_layer_metadata)
                        .await
                        .with_context(|| {
                            format!("failed to download image layer {remote_layer_name:?}")
                        })?;
                    trace!("done");

                    let image_layer =
                        ImageLayer::new(self.conf, self.timeline_id, self.tenant_id, imgfilename);

                    self.layers
                        .write()
                        .unwrap()
                        .insert_historic(Arc::new(image_layer));
                    self.metrics
                        .current_physical_size_gauge
                        .add(downloaded_size);
                }

                trace!("downloading image file: {remote_layer_path:?}");
                let downloaded_size = remote_client
                    .download_layer_file(&remote_layer_path, &remote_layer_metadata)
                    .await
                    .with_context(|| {
                        format!("failed to download image layer from path {remote_layer_path:?}")
                    })?;
                trace!("done");

                let image_layer =
                    ImageLayer::new(self.conf, self.timeline_id, self.tenant_id, &imgfilename);

                self.layers
                    .write()
                    .unwrap()
                    .insert_historic(Arc::new(image_layer));
                self.metrics
                    .current_physical_size_gauge
                    .add(downloaded_size);
            } else if let Some(deltafilename) = DeltaFileName::parse_str(layer_name) {
                // Create a DeltaLayer struct for each delta file.
                // The end-LSN is exclusive, while disk_consistent_lsn is
                // inclusive. For example, if disk_consistent_lsn is 100, it is
                // OK for a delta layer to have end LSN 101, but if the end LSN
                // is 102, then it might not have been fully flushed to disk
                // before crash.
                if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 {
                    warn!(
                LayerFileName::Delta(deltafilename) => {
                    // Create a DeltaLayer struct for each delta file.
                    // The end-LSN is exclusive, while disk_consistent_lsn is
                    // inclusive. For example, if disk_consistent_lsn is 100, it is
                    // OK for a delta layer to have end LSN 101, but if the end LSN
                    // is 102, then it might not have been fully flushed to disk
                    // before crash.
                    if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 {
                        warn!(
                        "found future delta layer {} on timeline {} remote_consistent_lsn is {}",
                        deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn
                    );
                    continue;
                        continue;
                    }

                    trace!("downloading delta file: {remote_layer_name:?}");
                    let sz = remote_client
                        .download_layer_file(remote_layer_name, &remote_layer_metadata)
                        .await
                        .with_context(|| {
                            format!("failed to download delta layer {remote_layer_name:?}")
                        })?;
                    trace!("done");

                    let delta_layer =
                        DeltaLayer::new(self.conf, self.timeline_id, self.tenant_id, deltafilename);

                    self.layers
                        .write()
                        .unwrap()
                        .insert_historic(Arc::new(delta_layer));
                    self.metrics.current_physical_size_gauge.add(sz);
                }

                trace!("downloading delta file: {remote_layer_path:?}");
                let sz = remote_client
                    .download_layer_file(&remote_layer_path, &remote_layer_metadata)
                    .await
                    .with_context(|| {
                        format!("failed to download delta layer from path {remote_layer_path:?}")
                    })?;
                trace!("done");

                let delta_layer =
                    DeltaLayer::new(self.conf, self.timeline_id, self.tenant_id, &deltafilename);

                self.layers
                    .write()
                    .unwrap()
                    .insert_historic(Arc::new(delta_layer));
                self.metrics.current_physical_size_gauge.add(sz);
            } else if layer_name.ends_with(".old") {
                // For details see https://github.com/neondatabase/neon/issues/3024
                warn!(
                    "got backup file on the remote storage, ignoring it {file}",
                    file = layer_name
                )
            } else {
                bail!("unexpected layer filename {layer_name} in remote storage path: {remote_layer_path:?}");
                #[cfg(test)]
                LayerFileName::Test(_) => unreachable!(),
            }
        }

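The string sniffing above (`ImageFileName::parse_str`, the `.old` special case, the `bail!` fallback) collapses into an exhaustive match on the already-parsed name; `.old` entries never reach this point because they do not survive index deserialization. In miniature (helper invented for illustration):

fn classify(name: &LayerFileName) -> &'static str {
    match name {
        LayerFileName::Image(_) => "image",
        LayerFileName::Delta(_) => "delta",
        #[cfg(test)]
        LayerFileName::Test(_) => unreachable!("test names never come from remote storage"),
    }
}
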
@@ -1169,18 +1159,13 @@ impl Timeline {

        let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn();

        // Build a map of local layers for quick lookups
        let local_layers = self
            .layers
            .read()
            .unwrap()
            .iter_historic_layers()
            .map(|historic_layer| {
                historic_layer
                    .local_path()
                    .expect("Historic layers should have a path")
            })
            .collect::<HashSet<_>>();
            .map(|l| (l.filename(), l))
            .collect::<HashMap<_, _>>();

        let local_only_layers = match index_part {
            Some(index_part) => {
@@ -1189,6 +1174,7 @@ impl Timeline {
                    index_part.timeline_layers.len()
                );
                remote_client.init_upload_queue(index_part)?;

                self.download_missing(index_part, remote_client, local_layers, disk_consistent_lsn)
                    .await?
            }
@@ -1200,14 +1186,15 @@ impl Timeline {
        };

        // Are there local files that don't exist remotely? Schedule uploads for them
        for layer_path in &local_only_layers {
        for (layer_name, layer) in &local_only_layers {
            let layer_path = layer.local_path();
            let layer_size = layer_path
                .metadata()
                .with_context(|| format!("failed to get file {layer_path:?} metadata"))?
                .len();
            info!("scheduling {layer_path:?} for upload");
            remote_client
                .schedule_layer_file_upload(layer_path, &LayerFileMetadata::new(layer_size))?;
                .schedule_layer_file_upload(layer_name, &LayerFileMetadata::new(layer_size))?;
        }
        if !local_only_layers.is_empty() {
            remote_client.schedule_index_upload(up_to_date_metadata)?;
@@ -1322,7 +1309,36 @@ impl Timeline {
            Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
        }
    }
}

type TraversalId = String;

trait TraversalLayerExt {
    fn traversal_id(&self) -> TraversalId;
}

impl TraversalLayerExt for Arc<dyn PersistentLayer> {
    fn traversal_id(&self) -> String {
        debug_assert!(
            self.local_path().to_str().unwrap()
                .contains(&format!("{}", self.get_timeline_id())),
            "need timeline ID to uniquely identify the layer when tranversal crosses ancestor boundary",
        );
        format!("{}", self.local_path().display())
    }
}

impl TraversalLayerExt for Arc<InMemoryLayer> {
    fn traversal_id(&self) -> String {
        format!(
            "timeline {} in-memory {}",
            self.get_timeline_id(),
            self.short_id()
        )
    }
}

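Storing a pre-rendered `String` instead of `Arc<dyn Layer>` means the debug trail no longer keeps layers alive, at the cost of formatting eagerly on every step. A hypothetical helper showing the new shape of a traversal step:

fn push_step(
    trail: &mut Vec<(ValueReconstructResult, Lsn, TraversalId)>,
    result: ValueReconstructResult,
    cont_lsn: Lsn,
    layer: &Arc<InMemoryLayer>,
) {
    // The TraversalId is computed up front; the error path later formats
    // it without touching the layer again.
    trail.push((result, cont_lsn, layer.traversal_id()));
}
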
impl Timeline {
    ///
    /// Get a handle to a Layer for reading.
    ///
@@ -1343,7 +1359,7 @@ impl Timeline {

        // For debugging purposes, collect the path of layers that we traversed
        // through. It's included in the error message if we fail to find the key.
        let mut traversal_path: Vec<(ValueReconstructResult, Lsn, Arc<dyn Layer>)> = Vec::new();
        let mut traversal_path = Vec::<(ValueReconstructResult, Lsn, TraversalId)>::new();

        let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
            *cached_lsn
@@ -1425,7 +1441,7 @@ impl Timeline {
                        reconstruct_state,
                    )?;
                    cont_lsn = lsn_floor;
                    traversal_path.push((result, cont_lsn, open_layer.clone()));
                    traversal_path.push((result, cont_lsn, open_layer.traversal_id()));
                    continue;
                }
            }
@@ -1440,7 +1456,7 @@ impl Timeline {
                        reconstruct_state,
                    )?;
                    cont_lsn = lsn_floor;
                    traversal_path.push((result, cont_lsn, frozen_layer.clone()));
                    traversal_path.push((result, cont_lsn, frozen_layer.traversal_id()));
                    continue 'outer;
                }
            }
@@ -1455,7 +1471,7 @@ impl Timeline {
                    reconstruct_state,
                )?;
                cont_lsn = lsn_floor;
                traversal_path.push((result, cont_lsn, layer));
                traversal_path.push((result, cont_lsn, layer.traversal_id()));
            } else if timeline.ancestor_timeline.is_some() {
                // Nothing on this timeline. Traverse to parent
                result = ValueReconstructResult::Continue;
@@ -1670,7 +1686,7 @@ impl Timeline {
    }

    /// Flush one frozen in-memory layer to disk, as a new delta layer.
    #[instrument(skip(self, frozen_layer), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.filename().display()))]
    #[instrument(skip(self, frozen_layer), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer.short_id()))]
    async fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> anyhow::Result<()> {
        // As a special case, when we have just imported an image into the repository,
        // instead of writing out a L0 delta layer, we directly write out image layer
@@ -1729,7 +1745,7 @@ impl Timeline {
    fn update_metadata_file(
        &self,
        disk_consistent_lsn: Lsn,
        layer_paths_to_upload: HashMap<PathBuf, LayerFileMetadata>,
        layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
    ) -> anyhow::Result<()> {
        // We can only save a valid 'prev_record_lsn' value on disk if we
        // flushed *all* in-memory changes to disk. We only track
@@ -1794,10 +1810,11 @@ impl Timeline {
    fn create_delta_layer(
        &self,
        frozen_layer: &InMemoryLayer,
    ) -> anyhow::Result<(PathBuf, LayerFileMetadata)> {
    ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
        // Write it out
        let new_delta = frozen_layer.write_to_disk()?;
        let new_delta_path = new_delta.path();
        let new_delta_filename = new_delta.filename();

        // Sync it to disk.
        //
@@ -1826,7 +1843,7 @@ impl Timeline {
        self.metrics.num_persistent_files_created.inc_by(1);
        self.metrics.persistent_bytes_written.inc_by(sz);

        Ok((new_delta_path, LayerFileMetadata::new(sz)))
        Ok((new_delta_filename, LayerFileMetadata::new(sz)))
    }

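Upload and deletion bookkeeping is now keyed by `LayerFileName`; an actual filesystem path is derived on demand, mirroring the `timeline_dir.join(remote_layer_name.file_name())` pattern seen earlier. As a standalone sketch (helper invented):

fn local_path_for(timeline_dir: &std::path::Path, name: &LayerFileName) -> std::path::PathBuf {
    // The name is the same string locally and in remote storage, which
    // is the whole point of the new type.
    timeline_dir.join(name.file_name())
}
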
    fn repartition(&self, lsn: Lsn, partition_size: u64) -> anyhow::Result<(KeyPartitioning, Lsn)> {
@@ -1888,7 +1905,7 @@ impl Timeline {
        partitioning: &KeyPartitioning,
        lsn: Lsn,
        force: bool,
    ) -> anyhow::Result<HashMap<PathBuf, LayerFileMetadata>> {
    ) -> anyhow::Result<HashMap<LayerFileName, LayerFileMetadata>> {
        let timer = self.metrics.create_images_time_histo.start_timer();
        let mut image_layers: Vec<ImageLayer> = Vec::new();
        for partition in partitioning.parts.iter() {
@@ -1966,9 +1983,10 @@ impl Timeline {
        let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());

        let mut layers = self.layers.write().unwrap();
        let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
        for l in image_layers {
            let path = l.path();
            let metadata = path.metadata()?;
            let path = l.filename();
            let metadata = timeline_path.join(path.file_name()).metadata()?;

            layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len()));

@@ -1984,7 +2002,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
    new_layers: Vec<DeltaLayer>,
    deltas_to_compact: Vec<Arc<dyn Layer>>,
    deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
}

impl Timeline {
@@ -2042,7 +2060,7 @@ impl Timeline {
            level0_deltas.len()
        );
        for l in deltas_to_compact.iter() {
            info!("compact includes {}", l.filename().display());
            info!("compact includes {}", l.filename().file_name());
        }
        // We don't need the original list of layers anymore. Drop it so that
        // we don't accidentally use it later in the function.
@@ -2271,7 +2289,7 @@ impl Timeline {

            if let Some(remote_client) = &self.remote_client {
                remote_client.schedule_layer_file_upload(
                    &new_delta_path,
                    &l.filename(),
                    &LayerFileMetadata::new(metadata.len()),
                )?;
            }
@@ -2280,19 +2298,19 @@ impl Timeline {
            self.metrics.current_physical_size_gauge.add(metadata.len());

            new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
            layers.insert_historic(Arc::new(l));
            let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
            layers.insert_historic(x);
        }

        // Now that we have reshuffled the data to set of new delta layers, we can
        // delete the old ones
        let mut layer_paths_to_delete = Vec::with_capacity(deltas_to_compact.len());
        let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
        for l in deltas_to_compact {
            if let Some(path) = l.local_path() {
                self.metrics
                    .current_physical_size_gauge
                    .sub(path.metadata()?.len());
                layer_paths_to_delete.push(path);
            }
            let path = l.local_path();
            self.metrics
                .current_physical_size_gauge
                .sub(path.metadata()?.len());
            layer_names_to_delete.push(l.filename());
            l.delete()?;
            layers.remove_historic(l);
        }
@@ -2300,7 +2318,7 @@ impl Timeline {

        // Also schedule the deletions in remote storage
        if let Some(remote_client) = &self.remote_client {
            remote_client.schedule_layer_file_deletion(&layer_paths_to_delete)?;
            remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
        }

        Ok(())
@@ -2485,23 +2503,13 @@ impl Timeline {
        //
        let mut layers = self.layers.write().unwrap();
        'outer: for l in layers.iter_historic_layers() {
            // This layer is in the process of being flushed to disk.
            // It will be swapped out of the layer map, replaced with
            // on-disk layers containing the same data.
            // We can't GC it, as it's not on disk. We can't remove it
            // from the layer map yet, as it would make its data
            // inaccessible.
            if l.is_in_memory() {
                continue;
            }

            result.layers_total += 1;

            // 1. Is it newer than GC horizon cutoff point?
            if l.get_lsn_range().end > horizon_cutoff {
                debug!(
                    "keeping {} because it's newer than horizon_cutoff {}",
                    l.filename().display(),
                    l.filename().file_name(),
                    horizon_cutoff
                );
                result.layers_needed_by_cutoff += 1;
@@ -2512,7 +2520,7 @@ impl Timeline {
            if l.get_lsn_range().end > pitr_cutoff {
                debug!(
                    "keeping {} because it's newer than pitr_cutoff {}",
                    l.filename().display(),
                    l.filename().file_name(),
                    pitr_cutoff
                );
                result.layers_needed_by_pitr += 1;
@@ -2529,7 +2537,7 @@ impl Timeline {
                if &l.get_lsn_range().start <= retain_lsn {
                    debug!(
                        "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
                        l.filename().display(),
                        l.filename().file_name(),
                        retain_lsn,
                        l.is_incremental(),
                    );
@@ -2562,7 +2570,7 @@ impl Timeline {
            {
                debug!(
                    "keeping {} because it is the latest layer",
                    l.filename().display()
                    l.filename().file_name()
                );
                result.layers_not_updated += 1;
                continue 'outer;
@@ -2571,7 +2579,7 @@ impl Timeline {
            // We didn't find any reason to keep this file, so remove it.
            debug!(
                "garbage collecting {} is_dropped: xx is_incremental: {}",
                l.filename().display(),
                l.filename().file_name(),
                l.is_incremental(),
            );
            layers_to_remove.push(Arc::clone(&l));
@@ -2580,14 +2588,13 @@ impl Timeline {
        // Actually delete the layers from disk and remove them from the map.
        // (couldn't do this in the loop above, because you cannot modify a collection
        // while iterating it. BTreeMap::retain() would be another option)
        let mut layer_paths_to_delete = Vec::with_capacity(layers_to_remove.len());
        let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
        for doomed_layer in layers_to_remove {
            if let Some(path) = doomed_layer.local_path() {
                self.metrics
                    .current_physical_size_gauge
                    .sub(path.metadata()?.len());
                layer_paths_to_delete.push(path);
            }
            let path = doomed_layer.local_path();
            self.metrics
                .current_physical_size_gauge
                .sub(path.metadata()?.len());
            layer_names_to_delete.push(doomed_layer.filename());
            doomed_layer.delete()?;
            layers.remove_historic(doomed_layer);
            result.layers_removed += 1;
@@ -2603,7 +2610,7 @@ impl Timeline {
        }

        if let Some(remote_client) = &self.remote_client {
            remote_client.schedule_layer_file_deletion(&layer_paths_to_delete)?;
            remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
        }

        result.elapsed = now.elapsed()?;
@@ -2688,7 +2695,7 @@ impl Timeline {
/// to an error, as anyhow context information.
fn layer_traversal_error(
    msg: String,
    path: Vec<(ValueReconstructResult, Lsn, Arc<dyn Layer>)>,
    path: Vec<(ValueReconstructResult, Lsn, TraversalId)>,
) -> anyhow::Result<()> {
    // We want the original 'msg' to be the outermost context. The outermost context
    // is the most high-level information, which also gets propagated to the client.
@@ -2697,9 +2704,7 @@ fn layer_traversal_error(
        .map(|(r, c, l)| {
            format!(
                "layer traversal: result {:?}, cont_lsn {}, layer: {}",
                r,
                c,
                l.filename().display()
                r, c, l,
            )
        })
        .chain(std::iter::once(msg));

@@ -411,12 +411,10 @@ def test_tenant_ignores_backup_file(
    env.postgres.stop_all()
    env.pageserver.stop()

    # file is still mentioned in the index. Removing it requires more hacking on remote queue initialization
    # Will be easier to do once there will be no .download_missing so it will be only one cycle through the layers
    # in load_layer_map
    # the .old file is gone from newly serialized index_part
    new_index_part = local_fs_index_part(env, tenant_id, timeline_id)
    backup_layers = filter(lambda x: x.endswith(".old"), new_index_part["timeline_layers"])
    assert len(list(backup_layers)) == 1
    assert len(list(backup_layers)) == 0
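The assertion flips from 1 to 0: as the updated comment says, the `.old` entry is gone from the newly serialized index_part, because backup names do not survive a deserialize/serialize round trip of the index. The filtering idea as a hedged Rust sketch (helper invented; the real code operates on layer names during index deserialization):

fn drop_backup_names(names: Vec<String>) -> Vec<String> {
    // Anything ending in ".old" is a backup entry and is silently dropped.
    names.into_iter().filter(|n| !n.ends_with(".old")).collect()
}
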

@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])