pageserver: use generation in keys when writing

This commit is contained in:
John Spray
2023-08-29 14:59:40 +01:00
parent 930de712ee
commit 67b17034ab
8 changed files with 147 additions and 72 deletions

View File

@@ -643,23 +643,6 @@ impl PageServerConf {
.join(METADATA_FILE_NAME)
}
/// Files on the remote storage are stored with paths, relative to the workdir.
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
///
/// Errors if the path provided does not start from pageserver's workdir.
pub fn remote_path(&self, local_path: &Path) -> anyhow::Result<RemotePath> {
local_path
.strip_prefix(&self.workdir)
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.with_context(|| {
format!(
"Failed to resolve remote part of path {:?} for base {:?}",
local_path, self.workdir
)
})
}
/// Turns storage remote path of a file into its local path.
pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf {
remote_path.with_base(&self.workdir)

View File

@@ -655,12 +655,8 @@ impl Tenant {
.as_ref()
.ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
let remote_timeline_ids = remote_timeline_client::list_remote_timelines(
remote_storage,
self.conf,
self.tenant_id,
)
.await?;
let remote_timeline_ids =
remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
info!("found {} timelines", remote_timeline_ids.len());
@@ -672,6 +668,7 @@ impl Tenant {
self.conf,
self.tenant_id,
timeline_id,
self.generation,
);
part_downloads.spawn(
async move {
@@ -2944,6 +2941,7 @@ impl Tenant {
self.conf,
self.tenant_id,
timeline_id,
self.generation,
);
Some(remote_client)
} else {

View File

@@ -216,7 +216,7 @@ use utils::backoff::{
};
use std::collections::{HashMap, VecDeque};
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -235,6 +235,7 @@ use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::upload_queue::Delete;
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
config::PageServerConf,
task_mgr,
@@ -252,6 +253,7 @@ use self::index::IndexPart;
use super::storage_layer::LayerFileName;
use super::upload_queue::SetDeletedFlagProgress;
use super::Generation;
// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If a download fails, we log it at info-level, and retry.
@@ -315,6 +317,7 @@ pub struct RemoteTimelineClient {
tenant_id: TenantId,
timeline_id: TimelineId,
generation: Generation,
upload_queue: Mutex<UploadQueue>,
@@ -335,12 +338,14 @@ impl RemoteTimelineClient {
conf: &'static PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,
generation: Generation,
) -> RemoteTimelineClient {
RemoteTimelineClient {
conf,
runtime: BACKGROUND_RUNTIME.handle().to_owned(),
tenant_id,
timeline_id,
generation,
storage_impl: remote_storage,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)),
@@ -453,6 +458,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
&self.tenant_id,
&self.timeline_id,
self.generation,
)
.measure_remote_op(
self.tenant_id,
@@ -761,10 +767,10 @@ impl RemoteTimelineClient {
backoff::retry(
|| {
upload::upload_index_part(
self.conf,
&self.storage_impl,
&self.tenant_id,
&self.timeline_id,
self.generation,
&index_part_with_deleted_at,
)
},
@@ -850,8 +856,7 @@ impl RemoteTimelineClient {
// Do not delete index part yet, it is needed for possible retry. If we remove it first
// and retry will arrive to different pageserver there wont be any traces of it on remote storage
let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
let timeline_storage_path = self.conf.remote_path(&timeline_path)?;
let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id);
let remaining = backoff::retry(
|| async {
@@ -1055,15 +1060,17 @@ impl RemoteTimelineClient {
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
let path = &self
let mut path = self
.conf
.timeline_path(&self.tenant_id, &self.timeline_id)
.join(layer_file_name.file_name());
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
path,
&path,
layer_metadata,
self.generation,
)
.measure_remote_op(
self.tenant_id,
@@ -1085,10 +1092,10 @@ impl RemoteTimelineClient {
};
let res = upload::upload_index_part(
self.conf,
&self.storage_impl,
&self.tenant_id,
&self.timeline_id,
self.generation,
index_part,
)
.measure_remote_op(
@@ -1360,6 +1367,79 @@ impl RemoteTimelineClient {
}
}
pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
RemotePath::from_string(&path).expect("Failed to construct path")
}
pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
remote_timelines_path(tenant_id).join(&PathBuf::from(timeline_id.to_string()))
}
pub fn remote_layer_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,
layer_file_name: &LayerFileName,
layer_meta: &LayerFileMetadata,
) -> RemotePath {
let path = if let Some(generation) = layer_meta.generation {
// Generation-aware key format
format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
layer_file_name.file_name(),
generation.get_suffix()
)
} else {
// Pre-generation key format
format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}",
layer_file_name.file_name(),
)
};
RemotePath::from_string(&path).expect("Failed to construct path")
}
pub fn remote_index_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,
generation: Generation,
) -> RemotePath {
RemotePath::from_string(&format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
IndexPart::FILE_NAME,
generation.get_suffix()
))
.expect("Failed to construct path")
}
/// Files on the remote storage are stored with paths, relative to the workdir.
/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path.
///
/// Errors if the path provided does not start from pageserver's workdir.
pub fn remote_path(
conf: &PageServerConf,
local_path: &Path,
generation: Generation,
) -> anyhow::Result<RemotePath> {
let stripped = local_path
.strip_prefix(&conf.workdir)
.context("Failed to strip workdir prefix")?;
let suffixed = format!(
"{0}{1}",
stripped.to_string_lossy(),
generation.get_suffix()
);
RemotePath::new(&PathBuf::from(suffixed)).with_context(|| {
format!(
"Failed to resolve remote part of path {:?} for base {:?}",
local_path, conf.workdir
)
})
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1409,8 +1489,11 @@ mod tests {
assert_eq!(avec, bvec);
}
fn assert_remote_files(expected: &[&str], remote_path: &Path) {
let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
fn assert_remote_files(expected: &[&str], remote_path: &Path, generation: Generation) {
let mut expected: Vec<String> = expected
.iter()
.map(|x| format!("{}{}", x, generation.get_suffix()))
.collect();
expected.sort();
let mut found: Vec<String> = Vec::new();
@@ -1461,6 +1544,8 @@ mod tests {
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
let generation = Generation::new(0xdeadbeef);
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let client = Arc::new(RemoteTimelineClient {
@@ -1468,6 +1553,7 @@ mod tests {
runtime: tokio::runtime::Handle::current(),
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
generation: generation,
storage_impl: storage,
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
@@ -1526,6 +1612,8 @@ mod tests {
.init_upload_queue_for_empty_remote(&metadata)
.unwrap();
let generation = Generation::new(0xdeadbeef);
// Create a couple of dummy files, schedule upload for them
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
@@ -1542,8 +1630,6 @@ mod tests {
std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
}
let generation = Generation::new(0xdeadbeef);
client
.schedule_layer_file_upload(
&layer_file_name_1,
@@ -1641,6 +1727,7 @@ mod tests {
"index_part.json",
],
&remote_timeline_dir,
generation,
);
// Finish them
@@ -1653,6 +1740,7 @@ mod tests {
"index_part.json",
],
&remote_timeline_dir,
generation,
);
}

View File

@@ -5,7 +5,10 @@ use tracing::debug;
use remote_storage::GenericRemoteStorage;
use crate::config::PageServerConf;
use crate::{
config::PageServerConf,
tenant::{remote_timeline_client::remote_path, Generation},
};
pub(super) async fn delete_layer<'a>(
conf: &'static PageServerConf,
@@ -17,7 +20,9 @@ pub(super) async fn delete_layer<'a>(
});
debug!("Deleting layer from remote storage: {local_layer_path:?}",);
let path_to_delete = conf.remote_path(local_layer_path)?;
// FIXME: once we start writing out keys with generations, this will
// need updating (or, in the deletion queue branch, it is already gone)
let path_to_delete = remote_path(conf, local_layer_path, Generation::placeholder())?;
// We don't want to print an error if the delete failed if the file has
// already been deleted. Thankfully, in this situation S3 already

View File

@@ -15,14 +15,16 @@ use tokio_util::sync::CancellationToken;
use utils::{backoff, crashsafe};
use crate::config::PageServerConf;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use remote_storage::{DownloadError, GenericRemoteStorage};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use super::index::{IndexPart, LayerFileMetadata};
use super::{FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES};
use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES};
static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120);
@@ -41,13 +43,11 @@ pub async fn download_layer_file<'a>(
) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let timeline_path = conf.timeline_path(&tenant_id, &timeline_id);
let local_path = conf
.timeline_path(&tenant_id, &timeline_id)
.join(layer_file_name.file_name());
let local_path = timeline_path.join(layer_file_name.file_name());
let remote_path = conf
.remote_path(&local_path)
.map_err(DownloadError::Other)?;
let remote_path = remote_layer_path(&tenant_id, &timeline_id, layer_file_name, layer_metadata);
// Perform a rename inspired by durable_rename from file_utils.c.
// The sequence:
@@ -175,19 +175,17 @@ pub fn is_temp_download_file(path: &Path) -> bool {
/// List timelines of given tenant in remote storage
pub async fn list_remote_timelines<'a>(
storage: &'a GenericRemoteStorage,
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<HashSet<TimelineId>> {
let tenant_path = conf.timelines_path(&tenant_id);
let tenant_storage_path = conf.remote_path(&tenant_path)?;
let remote_path = remote_timelines_path(&tenant_id);
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
anyhow::bail!("storage-sync-list-remote-timelines");
});
let timelines = download_retry(
|| storage.list_prefixes(Some(&tenant_storage_path)),
&format!("list prefixes for {tenant_path:?}"),
|| storage.list_prefixes(Some(&remote_path)),
&format!("list prefixes for {tenant_id}"),
)
.await?;
@@ -226,17 +224,17 @@ pub(super) async fn download_index_part(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
generation: Generation,
) -> Result<IndexPart, DownloadError> {
let index_part_path = conf
let local_path = conf
.metadata_path(tenant_id, timeline_id)
.with_file_name(IndexPart::FILE_NAME);
let part_storage_path = conf
.remote_path(&index_part_path)
.map_err(DownloadError::BadInput)?;
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
let index_part_bytes = download_retry(
|| async {
let mut index_part_download = storage.download(&part_storage_path).await?;
let mut index_part_download = storage.download(&remote_path).await?;
let mut index_part_bytes = Vec::new();
tokio::io::copy(
@@ -244,20 +242,16 @@ pub(super) async fn download_index_part(
&mut index_part_bytes,
)
.await
.with_context(|| {
format!("Failed to download an index part into file {index_part_path:?}")
})
.with_context(|| format!("Failed to download an index part into file {local_path:?}"))
.map_err(DownloadError::Other)?;
Ok(index_part_bytes)
},
&format!("download {part_storage_path:?}"),
&format!("download {remote_path:?}"),
)
.await?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| {
format!("Failed to deserialize index part file into file {index_part_path:?}")
})
.with_context(|| format!("Failed to deserialize index part file into file {local_path:?}"))
.map_err(DownloadError::Other)?;
Ok(index_part)

View File

@@ -26,7 +26,7 @@ pub struct LayerFileMetadata {
file_size: u64,
// Optional for backward compatibility: older data will not have a generation set
generation: Option<Generation>,
pub(crate) generation: Option<Generation>,
}
impl From<&'_ IndexLayerMetadata> for LayerFileMetadata {
@@ -142,13 +142,17 @@ impl TryFrom<&UploadQueueInitialized> for IndexPart {
}
}
fn generation_is_none(g: &Option<Generation>) -> bool {
g.map(|g| g.is_none()).unwrap_or(true)
}
/// Serialized form of [`LayerFileMetadata`].
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)]
pub struct IndexLayerMetadata {
pub(super) file_size: u64,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(skip_serializing_if = "generation_is_none")]
pub(super) generation: Option<Generation>,
}

View File

@@ -5,7 +5,11 @@ use fail::fail_point;
use std::{io::ErrorKind, path::Path};
use tokio::fs;
use crate::{config::PageServerConf, tenant::remote_timeline_client::index::IndexPart};
use super::Generation;
use crate::{
config::PageServerConf,
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
};
use remote_storage::GenericRemoteStorage;
use utils::id::{TenantId, TimelineId};
@@ -15,10 +19,10 @@ use tracing::info;
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
generation: Generation,
index_part: &'a IndexPart,
) -> anyhow::Result<()> {
tracing::trace!("uploading new index part");
@@ -32,13 +36,9 @@ pub(super) async fn upload_index_part<'a>(
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));
let index_part_path = conf
.metadata_path(tenant_id, timeline_id)
.with_file_name(IndexPart::FILE_NAME);
let storage_path = conf.remote_path(&index_part_path)?;
let remote_path = remote_index_path(tenant_id, timeline_id, generation);
storage
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path)
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path)
.await
.with_context(|| format!("Failed to upload index part for '{tenant_id} / {timeline_id}'"))
}
@@ -52,12 +52,13 @@ pub(super) async fn upload_timeline_layer<'a>(
storage: &'a GenericRemoteStorage,
source_path: &'a Path,
known_metadata: &'a LayerFileMetadata,
generation: Generation,
) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| {
bail!("failpoint before-upload-layer")
});
let storage_path = conf.remote_path(source_path)?;
let storage_path = remote_path(conf, source_path, generation)?;
let source_file_res = fs::File::open(&source_path).await;
let source_file = match source_file_res {
Ok(source_file) => source_file,

View File

@@ -233,7 +233,9 @@ impl std::fmt::Display for UploadOp {
metadata.file_size()
)
}
UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
UploadOp::UploadMetadata(_, lsn) => {
write!(f, "UploadMetadata(lsn: {})", lsn)
}
UploadOp::Delete(delete) => write!(
f,
"Delete(path: {}, scheduled_from_timeline_delete: {})",