Compare commits

...

29 Commits

Author SHA1 Message Date
John Spray
f06435509d Merge branch 'jcsp/config-spawn-blocking' into jcsp/no-more-load-ptX 2023-10-19 19:12:01 +01:00
John Spray
e0d3b8ebdc noisy completion 2023-10-19 19:11:24 +01:00
John Spray
d8bbe302af Separate out remote load phase 2023-10-19 17:53:41 +01:00
John Spray
64247ef3a5 pageserver: parallel load of configs 2023-10-19 17:47:31 +01:00
John Spray
74472a5bfa pageserver: do config writes in a spawn_blocking 2023-10-19 14:45:00 +01:00
John Spray
32dcf6dafa pageserver: unified spawn() method 2023-10-19 10:01:55 +01:00
John Spray
ed798f5440 pageserver: retry forever & cancellation token on index download 2023-10-19 09:20:36 +01:00
John Spray
c9cbdf0bf7 pageserver: retry forever for remote timeline listing
There is no point giving up: if S3 is unavailable, we should
keep trying until it becomes available.
2023-10-19 09:20:36 +01:00
John Spray
25f2565bbd pageserver: re-work remote deletion markers
- Store them under timelines/
- Read them as part of timeline listing, rather than
  with a separate GET.
2023-10-19 09:20:36 +01:00
John Spray
b9b8f35456 remote_storage: add more flexible list()
This provides roughly the same API as ListObjects,
in that it can be called with or without a delimiter,
and separately outputs prefixes and keys.
2023-10-19 09:20:36 +01:00
John Spray
fd271fec28 pageserver: fixes for attach (tolerate Stopping on delete, tolerate
missing timelines dir)
2023-10-19 09:20:36 +01:00
John Spray
435b592fc4 pageserver: tolerate missing delete marker 2023-10-19 09:20:36 +01:00
John Spray
2a77f6d61d tests: use restart helper in test_delete_tenant_exercise_crash_safety_failpoints 2023-10-19 09:20:36 +01:00
John Spray
89756bca22 tests: allow log in test_non_uploaded_root_timeline_is_deleted_after_restart 2023-10-19 09:20:36 +01:00
John Spray
4b274802b1 tests: fix test_timeline_init_break_before_checkpoint 2023-10-19 09:20:36 +01:00
John Spray
ff0b8c6e04 tests: update test_broken_timeline 2023-10-19 09:20:36 +01:00
John Spray
62dffe5a05 pageserver: clean up stale timeline dirs on attach 2023-10-19 09:20:36 +01:00
John Spray
e4cd6cde4a tests: remove outdated metric check
Checking S3 metrics is too invasive for an integration test
in general anyway: it isn't a functional requirement that the
pageserver does a specific number of S3 ops while attaching
a tenant.
2023-10-19 09:20:36 +01:00
John Spray
9ba9965110 tests: remove test_ignored_tenant_stays_broken_without_metadata
This test exercised a local-storage-only path.
2023-10-19 09:20:36 +01:00
John Spray
8b0ee2abf9 tests: add a quiesce helper for waiting for attachment 2023-10-19 09:20:36 +01:00
John Spray
c8c375b565 tests: update failpoint for restart test 2023-10-19 09:20:36 +01:00
John Spray
f0577ccf9a tests: s/Loading/Attaching 2023-10-19 09:20:36 +01:00
John Spray
63ccd0aa47 tests: remove local-specific test_pageserver_with_empty_tenant case 2023-10-19 09:20:36 +01:00
John Spray
d8e874dbdd pageserver: tolerate loading empty tenants from remote storage 2023-10-19 09:20:36 +01:00
John Spray
12b79f710e pageserver: tolerate missing index_parts in remote storage 2023-10-19 09:20:36 +01:00
John Spray
dd2136bd09 tests: fix test_ignore_while_attaching 2023-10-19 09:20:36 +01:00
John Spray
93ff9de366 Update unit tests 2023-10-19 09:20:36 +01:00
John Spray
eaf970180b Remove defunct "attach marker" code. 2023-10-19 09:20:36 +01:00
John Spray
4817731840 pageserver: always load remote metadata 2023-10-19 09:20:36 +01:00
25 changed files with 906 additions and 1067 deletions

View File

@@ -110,7 +110,6 @@ impl TenantState {
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
// We only reach Active after successful load / attach.
// So, call attachment status Attached.

View File

@@ -23,8 +23,8 @@ use tracing::debug;
use crate::s3_bucket::RequestKind;
use crate::{
AzureConfig, ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage,
StorageMetadata,
AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath,
RemoteStorage, StorageMetadata,
};
pub struct AzureBlobStorage {
@@ -201,10 +201,11 @@ fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
#[async_trait::async_trait]
impl RemoteStorage for AzureBlobStorage {
async fn list_prefixes(
async fn list(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| self.relative_path_to_name(p))
@@ -212,16 +213,19 @@ impl RemoteStorage for AzureBlobStorage {
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
});
let mut builder = self
.client
.list_blobs()
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
let mut builder = self.client.list_blobs();
if let ListingMode::WithDelimiter = mode {
builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
if let Some(prefix) = list_prefix {
builder = builder.prefix(Cow::from(prefix.to_owned()));
@@ -232,7 +236,7 @@ impl RemoteStorage for AzureBlobStorage {
}
let mut response = builder.into_stream();
let mut res = Vec::new();
let mut res = Listing::default();
while let Some(l) = response.next().await {
let entry = match l {
Ok(l) => l,
@@ -250,43 +254,21 @@ impl RemoteStorage for AzureBlobStorage {
});
}
};
let name_iter = entry
let prefix_iter = entry
.blobs
.prefixes()
.map(|prefix| self.name_to_relative_path(&prefix.name));
res.extend(name_iter);
}
Ok(res)
}
res.prefixes.extend(prefix_iter);
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let folder_name = folder
.map(|p| self.relative_path_to_name(p))
.or_else(|| self.prefix_in_container.clone());
let mut builder = self.client.list_blobs();
if let Some(folder_name) = folder_name {
builder = builder.prefix(Cow::from(folder_name.to_owned()));
}
if let Some(limit) = self.max_keys_per_list_response {
builder = builder.max_results(MaxResults::new(limit));
}
let mut response = builder.into_stream();
let mut res = Vec::new();
while let Some(l) = response.next().await {
let entry = l.map_err(anyhow::Error::new)?;
let name_iter = entry
let blob_iter = entry
.blobs
.blobs()
.map(|bl| self.name_to_relative_path(&bl.name));
res.extend(name_iter);
.map(|k| self.name_to_relative_path(&k.name));
res.keys.extend(blob_iter);
}
Ok(res)
}
async fn upload(
&self,
mut from: impl AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -129,6 +129,22 @@ impl RemotePath {
}
}
/// We don't need callers to be able to pass arbitrary delimiters: just control
/// whether listings will use a '/' separator or not.
///
/// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
/// NoDelimiter mode will only populate `keys`.
pub enum ListingMode {
WithDelimiter,
NoDelimiter,
}
#[derive(Default)]
pub struct Listing {
pub prefixes: Vec<RemotePath>,
pub keys: Vec<RemotePath>,
}
/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files.
@@ -141,8 +157,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError>;
) -> Result<Vec<RemotePath>, DownloadError> {
let result = self
.list(prefix, ListingMode::WithDelimiter)
.await?
.prefixes;
Ok(result)
}
/// Lists all files in directory "recursively"
/// (not really recursively, because AWS has a flat namespace)
/// Note: This is subtly different from list_prefixes,
@@ -154,7 +175,21 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// whereas,
/// list_prefixes("foo/bar/") = ["cat", "dog"]
/// See `test_real_s3.rs` for more details.
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
async fn list_files(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
Ok(result)
}
async fn list(
&self,
prefix: Option<&RemotePath>,
_mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError>; /* {
// XXX Placeholder impl.
let mut result = Listing::default();
result.prefixes = self.list_prefixes(prefix).await?;
Ok(result)
}*/
/// Streams the local file contents into remote into the remote storage entry.
async fn upload(
@@ -205,6 +240,9 @@ pub enum DownloadError {
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// A cancellation token aborted the download, typically during
/// tenant detach or process shutdown.
Cancelled,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
@@ -215,6 +253,7 @@ impl std::fmt::Display for DownloadError {
DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
}
@@ -234,6 +273,19 @@ pub enum GenericRemoteStorage {
}
impl GenericRemoteStorage {
pub async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError> {
match self {
Self::LocalFs(s) => s.list(prefix, mode).await,
Self::AwsS3(s) => s.list(prefix, mode).await,
Self::AzureBlob(s) => s.list(prefix, mode).await,
Self::Unreliable(s) => s.list(prefix, mode).await,
}
}
// A function for listing all the files in a "directory"
// Example:
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]

View File

@@ -15,7 +15,7 @@ use tokio::{
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
use crate::{Download, DownloadError, RemotePath};
use crate::{Download, DownloadError, Listing, ListingMode, RemotePath};
use super::{RemoteStorage, StorageMetadata};
@@ -75,7 +75,7 @@ impl LocalFs {
}
#[cfg(test)]
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
Ok(get_all_files(&self.storage_root, true)
.await?
.into_iter()
@@ -89,52 +89,10 @@ impl LocalFs {
})
.collect())
}
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?;
let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
prefixes.push(
prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
),
)
}
Ok(prefixes)
}
// recursively lists all files in a directory,
// mirroring the `list_files` for `s3_bucket`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let full_path = match folder {
Some(folder) => folder.with_base(&self.storage_root),
None => self.storage_root.clone(),
@@ -186,6 +144,61 @@ impl RemoteStorage for LocalFs {
Ok(files)
}
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> Result<Listing, DownloadError> {
let mut result = Listing::default();
if let ListingMode::NoDelimiter = mode {
result.keys = self
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
return Ok(result);
}
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?;
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
let stripped = prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
);
if prefix.is_dir() {
result.prefixes.push(stripped);
} else {
result.keys.push(stripped)
}
}
Ok(result)
}
async fn upload(
&self,
@@ -479,7 +492,7 @@ mod fs_tests {
let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
assert_eq!(
storage.list().await?,
storage.list_all().await?,
vec![target_path_1.clone()],
"Should list a single file after first upload"
);
@@ -667,7 +680,7 @@ mod fs_tests {
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
storage.delete(&upload_target).await?;
assert!(storage.list().await?.is_empty());
assert!(storage.list_all().await?.is_empty());
storage
.delete(&upload_target)
@@ -777,7 +790,7 @@ mod fs_tests {
}
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
let mut files = storage.list().await?;
let mut files = storage.list_all().await?;
files.sort_by(|a, b| a.0.cmp(&b.0));
Ok(files)
}

View File

@@ -30,8 +30,8 @@ use tracing::debug;
use super::StorageMetadata;
use crate::{
ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, S3Config,
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
pub(super) mod metrics;
@@ -299,13 +299,13 @@ impl<S: AsyncRead> AsyncRead for TimedDownload<S> {
#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it won't include empty "directories"
async fn list_prefixes(
async fn list(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
mode: ListingMode,
) -> Result<Listing, DownloadError> {
let kind = RequestKind::List;
let mut result = Listing::default();
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
@@ -314,28 +314,33 @@ impl RemoteStorage for S3Bucket {
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
});
let mut document_keys = Vec::new();
let mut continuation_token = None;
loop {
let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind);
let fetch_response = self
let mut request = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(list_prefix.clone())
.set_continuation_token(continuation_token)
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
.set_max_keys(self.max_keys_per_list_response)
.set_max_keys(self.max_keys_per_list_response);
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let response = request
.send()
.await
.context("Failed to list S3 prefixes")
@@ -345,71 +350,35 @@ impl RemoteStorage for S3Bucket {
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &fetch_response, started_at);
.observe_elapsed(kind, &response, started_at);
let fetch_response = fetch_response?;
let response = response?;
document_keys.extend(
fetch_response
.common_prefixes
.unwrap_or_default()
.into_iter()
let keys = response.contents().unwrap_or_default();
let empty = Vec::new();
let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
tracing::info!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
for object in keys {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
result.keys.push(remote_path);
}
result.prefixes.extend(
prefixes
.iter()
.filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
);
continuation_token = match fetch_response.next_continuation_token {
continuation_token = match response.next_continuation_token {
Some(new_token) => Some(new_token),
None => break,
};
}
Ok(document_keys)
}
/// See the doc for `RemoteStorage::list_files`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let kind = RequestKind::List;
let folder_name = folder
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
// AWS may need to break the response into several parts
let mut continuation_token = None;
let mut all_files = vec![];
loop {
let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind);
let response = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(folder_name.clone())
.set_continuation_token(continuation_token)
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.context("Failed to list files in S3 bucket");
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &response, started_at);
let response = response?;
for object in response.contents().unwrap_or_default() {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
all_files.push(remote_path);
}
match response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(all_files)
Ok(result)
}
async fn upload(

View File

@@ -5,7 +5,9 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;
use crate::{Download, DownloadError, RemotePath, RemoteStorage, StorageMetadata};
use crate::{
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
};
pub struct UnreliableWrapper {
inner: crate::GenericRemoteStorage,
@@ -95,6 +97,15 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list_files(folder).await
}
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> Result<Listing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
self.inner.list(prefix, mode).await
}
async fn upload(
&self,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -1,12 +1,36 @@
use std::sync::Arc;
use std::sync::{atomic::AtomicI32, Arc};
use tokio::sync::{mpsc, Mutex};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);
pub struct Completion {
sender: mpsc::Sender<()>,
refcount: Arc<AtomicI32>,
}
impl Clone for Completion {
fn clone(&self) -> Self {
let i = self
.refcount
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
tracing::info!("Completion::clone[{:p}]: {i}", &(*self.refcount));
Self {
sender: self.sender.clone(),
refcount: self.refcount.clone(),
}
}
}
impl Drop for Completion {
fn drop(&mut self) {
let i = self
.refcount
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
tracing::info!("Completion::drop[{:p}]: {i}", &(*self.refcount));
}
}
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
@@ -45,5 +69,11 @@ pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
(
Completion {
sender: tx,
refcount: Arc::new(AtomicI32::new(1)),
},
Barrier(rx),
)
}
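
For context on how the Completion/Barrier pair is meant to be used, here is a self-contained sketch of the underlying pattern using plain tokio primitives (not the crate's exact API with the new refcount logging): Barrier::wait resolves only once every clone of the Completion guard has been dropped.

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[derive(Clone)]
struct Completion(mpsc::Sender<()>);

#[derive(Clone)]
struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);

impl Barrier {
    async fn wait(self) {
        // recv() yields None once every Sender (i.e. every Completion clone) is dropped.
        self.0.lock().await.recv().await;
    }
}

fn channel() -> (Completion, Barrier) {
    let (tx, rx) = mpsc::channel::<()>(1);
    (Completion(tx), Barrier(Arc::new(Mutex::new(rx))))
}

#[tokio::main]
async fn main() {
    let (done, barrier) = channel();
    for i in 0..3 {
        let done = done.clone();
        tokio::spawn(async move {
            println!("task {i} finished");
            drop(done); // signal completion by dropping the guard
        });
    }
    drop(done); // drop the original so only the spawned clones hold the barrier up
    barrier.wait().await;
    println!("all tasks completed");
}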

View File

@@ -361,12 +361,18 @@ fn start_pageserver(
let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
let (tenants_can_start, tenants_can_start_barrier) = utils::completion::channel();
tracing::info!("init_remote_done_tx:");
let c = init_remote_done_tx.clone();
drop(c);
let order = pageserver::InitializationOrder {
initial_tenant_load_remote: Some(init_done_tx),
initial_tenant_load: Some(init_remote_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: Some(init_logical_size_done_tx),
tenants_can_start: tenants_can_start_barrier.clone(),
background_jobs_can_start: background_jobs_barrier.clone(),
};
@@ -393,6 +399,8 @@ fn start_pageserver(
init_remote_done_rx.wait().await;
startup_checkpoint("initial_tenant_load_remote", "Remote part of initial load completed");
drop(tenants_can_start);
init_done_rx.wait().await;
startup_checkpoint("initial_tenant_load", "Initial load completed");
STARTUP_IS_LOADING.set(0);

View File

@@ -33,8 +33,7 @@ use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME,
TIMELINES_SEGMENT_NAME,
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
@@ -633,11 +632,6 @@ impl PageServerConf {
self.tenants_path().join(tenant_id.to_string())
}
pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_ATTACHING_MARKER_FILENAME)
}
pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
}

View File

@@ -186,6 +186,8 @@ pub struct InitializationOrder {
/// attempt. It is important to drop this once the attempt has completed.
pub initial_logical_size_attempt: Option<utils::completion::Completion>,
pub tenants_can_start: utils::completion::Barrier,
/// Barrier for when we can start any background jobs.
///
/// This can be broken up later on, but right now there is just one class of a background job.

File diff suppressed because it is too large.

View File

@@ -3,10 +3,10 @@ use std::sync::Arc;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::TenantState;
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, warn, Instrument, Span};
use tracing::{error, instrument, warn, Instrument, Span};
use utils::{
backoff, completion, crashsafe, fs_ext,
@@ -25,11 +25,9 @@ use super::{
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
span,
timeline::delete::DeleteTimelineFlow,
tree_sort_timelines, DeleteTimelineError, Tenant,
tree_sort_timelines, DeleteTimelineError, Tenant, TenantPreload,
};
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
#[error("GetTenant {0}")]
@@ -60,7 +58,7 @@ fn remote_tenant_delete_mark_path(
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.context("tenant path")?;
Ok(tenant_remote_path.join(Utf8Path::new("deleted")))
Ok(tenant_remote_path.join(Utf8Path::new("timelines/deleted")))
}
async fn create_remote_delete_mark(
@@ -239,32 +237,6 @@ async fn cleanup_remaining_fs_traces(
Ok(())
}
pub(crate) async fn remote_delete_mark_exists(
conf: &PageServerConf,
tenant_id: &TenantId,
remote_storage: &GenericRemoteStorage,
) -> anyhow::Result<bool> {
// If remote storage is there we rely on it
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id).context("path")?;
let result = backoff::retry(
|| async { remote_storage.download(&remote_mark_path).await },
|e| matches!(e, DownloadError::NotFound),
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
"fetch_tenant_deletion_mark",
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await;
match result {
Ok(_) => Ok(true),
Err(DownloadError::NotFound) => Ok(false),
Err(e) => Err(anyhow::anyhow!(e)).context("remote_delete_mark_exists")?,
}
}
/// Orchestrates tenant shut down of all tasks, removes its in-memory structures,
/// and deletes its data from both disk and s3.
/// The sequence of steps:
@@ -276,10 +248,9 @@ pub(crate) async fn remote_delete_mark_exists(
/// 6. Remove remote mark
/// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark
/// It is resumable from any step in case a crash/restart occurs.
/// There are three entrypoints to the process:
/// There are two entrypoints to the process:
/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
/// 2. [`DeleteTenantFlow::resume_from_load`] is called during restarts when local or remote deletion marks are still there.
/// 3. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed because the tenant is found to be deleted during the attach process.
/// 2. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed because the tenant is found to be deleted during the attach process.
/// Note that the only other place that messes with the timeline delete mark is the `Tenant::spawn_load` function.
#[derive(Default)]
pub enum DeleteTenantFlow {
@@ -378,7 +349,7 @@ impl DeleteTenantFlow {
pub(crate) async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
remote_mark_exists: bool,
tenant: &Tenant,
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
let acquire = |t: &Tenant| {
@@ -389,66 +360,24 @@ impl DeleteTenantFlow {
)
};
let tenant_id = tenant.tenant_id;
// Check the local mark first; if it's there, there is no need to go to S3 to check whether the remote one exists.
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
if remote_mark_exists {
return Ok(acquire(tenant));
}
let remote_storage = match remote_storage {
Some(remote_storage) => remote_storage,
None => return Ok(None),
};
if remote_delete_mark_exists(conf, &tenant_id, remote_storage).await? {
Ok(acquire(tenant))
} else {
Ok(None)
let tenant_id = tenant.tenant_id;
// Check the local mark first; if it's there, there is no need to go to S3 to check whether the remote one exists.
match tokio::fs::metadata(conf.tenant_deleted_mark_file_path(&tenant_id)).await {
Ok(_) => Ok(acquire(tenant)),
Err(_) => Ok(None),
}
}
pub(crate) async fn resume_from_load(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
tenant
.set_stopping(progress, true, false)
.await
.expect("cant be stopping or broken");
// Do not consume valuable resources during the load phase, continue deletion once init phase is complete.
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
info!("waiting for backgound jobs barrier");
background.clone().wait().await;
info!("ready for backgound jobs barrier");
}
// Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir)
let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
if timelines_path.exists() {
tenant.load(init_order, None, ctx).await.context("load")?;
}
Self::background(
guard,
tenant.conf,
tenant.remote_storage.clone(),
tenants,
tenant,
)
.await
}
pub(crate) async fn resume_from_attach(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
@@ -459,7 +388,7 @@ impl DeleteTenantFlow {
.expect("cant be stopping or broken");
tenant
.attach(ctx, super::AttachMarkerMode::Expect)
.attach(init_order, preload, ctx)
.await
.context("attach")?;

View File

@@ -1,7 +1,7 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
use camino::{Utf8Path, Utf8PathBuf};
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use rand::{distributions::Alphanumeric, Rng};
use std::collections::{hash_map, HashMap};
use std::sync::Arc;
@@ -26,10 +26,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{
create_tenant_files, AttachMarkerMode, AttachedTenantConf, CreateTenantFilesMode, Tenant,
TenantState,
};
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
@@ -256,83 +253,105 @@ async fn init_load_generations(
Ok(Some(generations))
}
/// Given a directory discovered in the pageserver's tenants/ directory, attempt
/// to load a tenant config from it.
///
/// If file is missing, return Ok(None)
fn load_tenant_config(
conf: &'static PageServerConf,
dentry: Utf8DirEntry,
) -> anyhow::Result<Option<(TenantId, anyhow::Result<LocationConf>)>> {
let tenant_dir_path = dentry.path().to_path_buf();
if crate::is_temporary(&tenant_dir_path) {
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
// No need to use safe_remove_tenant_dir_all because this is already
// a temporary path
if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
error!(
"Failed to remove temporary directory '{}': {:?}",
tenant_dir_path, e
);
}
return Ok(None);
}
// This case happens if we crash during attachment before writing a config into the dir
let is_empty = tenant_dir_path
.is_empty_dir()
.with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
return Ok(None);
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
return Ok(None);
}
let tenant_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
return Ok(None);
}
};
Ok(Some((
tenant_id,
Tenant::load_tenant_config(conf, &tenant_id),
)))
}
/// Initial stage of load: walk the local tenants directory, clean up any temp files,
/// and load configurations for the tenants we found.
///
/// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
/// seconds even on reasonably fast drives.
async fn init_load_tenant_configs(
conf: &'static PageServerConf,
) -> anyhow::Result<HashMap<TenantId, anyhow::Result<LocationConf>>> {
let tenants_dir = conf.tenants_path();
let mut dir_entries = tenants_dir
.read_dir_utf8()
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
let dir_entries = tenants_dir
.read_dir_utf8()
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
let mut result = Vec::new();
for dentry in dir_entries {
result.push(dentry?);
}
Ok(result)
})
.await??;
let mut configs = HashMap::new();
loop {
match dir_entries.next() {
None => break,
Some(Ok(dentry)) => {
let tenant_dir_path = dentry.path().to_path_buf();
if crate::is_temporary(&tenant_dir_path) {
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
// No need to use safe_remove_tenant_dir_all because this is already
// a temporary path
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove temporary directory '{}': {:?}",
tenant_dir_path, e
);
}
continue;
}
// This case happens if we:
// * crash during attach before creating the attach marker file
// * crash during tenant delete before removing tenant directory
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
})?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
continue;
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
continue;
}
let tenant_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",
);
continue;
}
};
configs.insert(tenant_id, Tenant::load_tenant_config(conf, &tenant_id));
}
Some(Err(e)) => {
// An error listing the top level directory indicates serious problem
// with local filesystem: we will fail to load, and fail to start.
anyhow::bail!(e);
}
}
let mut join_set = JoinSet::new();
for dentry in dentries {
join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
}
while let Some(r) = join_set.join_next().await {
match r?? {
Some((tenant_id, tenant_config)) => configs.insert(tenant_id, tenant_config),
None => None,
};
}
Ok(configs)
}
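
The doc comment above motivates the parallelism: with 10k+ tenants, reading each config serially can take seconds even on fast drives. A hedged, standalone sketch of the same fan-out pattern (placeholder paths and parsing, not the pageserver's real load_tenant_config): blocking filesystem work goes through JoinSet::spawn_blocking, and results are collected as they complete.

use std::collections::HashMap;
use std::path::PathBuf;
use tokio::task::JoinSet;

fn load_one(dir: PathBuf) -> anyhow::Result<Option<(String, String)>> {
    // Blocking std::fs I/O, hence spawn_blocking below. "config" is a placeholder file name.
    let config_path = dir.join("config");
    if !config_path.exists() {
        return Ok(None); // e.g. an empty or temporary directory: skip it
    }
    let config = std::fs::read_to_string(&config_path)?;
    let tenant_id = dir
        .file_name()
        .map(|n| n.to_string_lossy().into_owned())
        .unwrap_or_default();
    Ok(Some((tenant_id, config)))
}

async fn load_all(tenant_dirs: Vec<PathBuf>) -> anyhow::Result<HashMap<String, String>> {
    let mut join_set = JoinSet::new();
    for dir in tenant_dirs {
        // Each directory is handled on the blocking thread pool, in parallel.
        join_set.spawn_blocking(move || load_one(dir));
    }
    let mut configs = HashMap::new();
    while let Some(res) = join_set.join_next().await {
        // First `?` surfaces panics/cancellation (JoinError), second surfaces load errors.
        if let Some((tenant_id, config)) = res?? {
            configs.insert(tenant_id, config);
        }
    }
    Ok(configs)
}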
@@ -480,45 +499,24 @@ pub(crate) fn schedule_local_tenant_processing(
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
);
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
if resources.remote_storage.is_none() {
warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
Tenant::create_broken_tenant(
conf,
tenant_id,
"attaching mark file present but no remote storage configured".to_string(),
)
} else {
match Tenant::spawn_attach(
conf,
tenant_id,
resources,
location_conf,
tenants,
AttachMarkerMode::Expect,
ctx,
) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
}
}
info!("Attaching tenant {tenant_id}");
let tenant = match Tenant::spawn(
conf,
tenant_id,
resources,
location_conf,
init_order,
tenants,
SpawnMode::Normal,
ctx,
) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn tenant {tenant_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
}
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,
tenant_id,
location_conf,
resources,
init_order,
tenants,
ctx,
)
};
Ok(tenant)
}
@@ -660,13 +658,13 @@ pub(crate) async fn create_tenant(
// We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state.
let tenant_directory = super::create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
super::create_tenant_files(conf, &location_conf, &tenant_id).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
let created_tenant = Tenant::spawn(conf, tenant_id, resources,
AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Create, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -817,7 +815,7 @@ pub(crate) async fn upsert_location(
.await
.map_err(SetNewTenantConfigError::Persist)?;
let tenant = match Tenant::spawn_attach(
let tenant = match Tenant::spawn(
conf,
tenant_id,
TenantSharedResources {
@@ -826,16 +824,14 @@ pub(crate) async fn upsert_location(
deletion_queue_client,
},
AttachedTenantConf::try_from(new_location_config)?,
None,
&TENANTS,
// The LocationConf API does not use marker files, because we have Secondary
// locations where the directory's existence is not a signal that it contains
// all timelines. See https://github.com/neondatabase/neon/issues/5550
AttachMarkerMode::Ignore,
SpawnMode::Normal,
ctx,
) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
error!("Failed to spawn tenant {tenant_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
}
};
@@ -1101,17 +1097,10 @@ pub(crate) async fn attach_tenant(
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
// Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached
let marker_file_exists = conf
.tenant_attaching_mark_file_path(&tenant_id)
.try_exists()
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233

View File

@@ -170,36 +170,14 @@
//! - create [`RemoteLayer`](super::storage_layer::RemoteLayer) instances
//! for layers that are referenced by `IndexPart` but not present locally
//! - schedule uploads for layers that are only present locally.
//! - if the remote `IndexPart`'s metadata was newer than the metadata in
//! the local filesystem, write the remote metadata to the local filesystem
//! - After the above is done for each timeline, open the tenant for business by
//! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
//! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
//!
//! We keep track of the fact that a client is in `Attaching` state in a marker
//! file on the local disk. This is critical because, when we restart the pageserver,
//! we do not want to do the `List timelines` step for each tenant that has already
//! been successfully attached (for performance & cost reasons).
//! Instead, for a tenant without the attach marker file, we assume that the
//! local state is in sync or ahead of the remote state. This includes the list
//! of all of the tenant's timelines, which is particularly critical to be up-to-date:
//! if there's a timeline on the remote that the pageserver doesn't know about,
//! the GC will not consider its branch point, leading to data loss.
//! So, for a tenant with the attach marker file, we know that we have not yet
//! persisted all the remote timelines' metadata files locally. To exclude the
//! risk above, we re-run the procedure for such tenants
//!
//! # Operating Without Remote Storage
//!
//! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
//! not created and the uploads are skipped.
//! Theoretically, it should be ok to remove and re-add remote storage configuration to
//! the pageserver config at any time, since it doesn't make a difference to
//! [`Timeline::load_layer_map`].
//! Of course, the remote timeline dir must not change while we have de-configured
//! remote storage, i.e., the pageserver must remain the owner of the given prefix
//! in remote storage.
//! But note that we don't test any of this right now.
//!
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
@@ -468,7 +446,10 @@ impl RemoteTimelineClient {
//
/// Download index file
pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
pub async fn download_index_file(
&self,
cancel: CancellationToken,
) -> Result<MaybeDeletedIndexPart, DownloadError> {
let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Index,
&RemoteOpKind::Download,
@@ -482,6 +463,7 @@ impl RemoteTimelineClient {
&self.tenant_id,
&self.timeline_id,
self.generation,
cancel,
)
.measure_remote_op(
self.tenant_id,
@@ -1655,7 +1637,11 @@ mod tests {
let client = timeline.remote_client.as_ref().unwrap();
// Download back the index.json, and check that the list of files is correct
let initial_index_part = match client.download_index_file().await.unwrap() {
let initial_index_part = match client
.download_index_file(CancellationToken::new())
.await
.unwrap()
{
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
};
@@ -1747,7 +1733,11 @@ mod tests {
}
// Download back the index.json, and check that the list of files is correct
let index_part = match client.download_index_file().await.unwrap() {
let index_part = match client
.download_index_file(CancellationToken::new())
.await
.unwrap()
{
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
};
@@ -1938,7 +1928,7 @@ mod tests {
let client = test_state.build_client(get_generation);
let download_r = client
.download_index_file()
.download_index_file(CancellationToken::new())
.await
.expect("download should always succeed");
assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));

View File

@@ -19,7 +19,7 @@ use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::Generation;
use remote_storage::{DownloadError, GenericRemoteStorage};
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
@@ -170,47 +170,52 @@ pub fn is_temp_download_file(path: &Utf8Path) -> bool {
pub async fn list_remote_timelines(
storage: &GenericRemoteStorage,
tenant_id: TenantId,
) -> anyhow::Result<HashSet<TimelineId>> {
cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
let remote_path = remote_timelines_path(&tenant_id);
fail::fail_point!("storage-sync-list-remote-timelines", |_| {
anyhow::bail!("storage-sync-list-remote-timelines");
});
let timelines = download_retry(
|| storage.list_prefixes(Some(&remote_path)),
&format!("list prefixes for {tenant_id}"),
let listing = download_retry_forever(
|| storage.list(Some(&remote_path), ListingMode::WithDelimiter),
&format!("list timelines for {tenant_id}"),
cancel,
)
.await?;
if timelines.is_empty() {
anyhow::bail!("no timelines found on the remote storage")
let mut timeline_ids = HashSet::new();
let mut other_prefixes = HashSet::new();
tracing::info!("list_remote_timelines prefixes:");
for p in &listing.prefixes {
tracing::info!(" '{p}'");
}
tracing::info!("list_remote_timelines keys:");
for p in &listing.keys {
tracing::info!(" '{p}'");
}
let mut timeline_ids = HashSet::new();
for timeline_remote_storage_key in timelines {
for timeline_remote_storage_key in listing.prefixes {
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
})?;
let timeline_id: TimelineId = object_name
.parse()
.with_context(|| format!("parse object name into timeline id '{object_name}'"))?;
// list_prefixes is assumed to return unique names. Ensure this here.
// NB: it's safer to bail out than warn-log this because the pageserver
// needs to absolutely know about _all_ timelines that exist, so that
// GC knows all the branchpoints. If we skipped over a timeline instead,
// GC could delete a layer that's still needed by that timeline.
anyhow::ensure!(
!timeline_ids.contains(&timeline_id),
"list_prefixes contains duplicate timeline id {timeline_id}"
);
timeline_ids.insert(timeline_id);
match object_name.parse::<TimelineId>() {
Ok(t) => timeline_ids.insert(t),
Err(_) => other_prefixes.insert(object_name.to_string()),
};
}
Ok(timeline_ids)
for key in listing.keys {
let object_name = key
.object_name()
.ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
other_prefixes.insert(object_name.to_string());
}
Ok((timeline_ids, other_prefixes))
}
async fn do_download_index_part(
@@ -218,10 +223,11 @@ async fn do_download_index_part(
tenant_id: &TenantId,
timeline_id: &TimelineId,
index_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
let index_part_bytes = download_retry(
let index_part_bytes = download_retry_forever(
|| async {
let mut index_part_download = storage.download(&remote_path).await?;
@@ -236,6 +242,7 @@ async fn do_download_index_part(
Ok(index_part_bytes)
},
&format!("download {remote_path:?}"),
cancel,
)
.await?;
@@ -257,19 +264,28 @@ pub(super) async fn download_index_part(
tenant_id: &TenantId,
timeline_id: &TimelineId,
my_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
if my_generation.is_none() {
// Operating without generations: just fetch the generation-less path
return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
.await;
}
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote
// index in our generation.
//
// This is an optimization to avoid doing the listing for the general case below.
let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
let res = do_download_index_part(
storage,
tenant_id,
timeline_id,
my_generation,
cancel.clone(),
)
.await;
match res {
Ok(index_part) => {
tracing::debug!(
@@ -289,8 +305,14 @@ pub(super) async fn download_index_part(
// we want to find the most recent index from a previous generation.
//
// This is an optimization to avoid doing the listing for the general case below.
let res =
do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await;
let res = do_download_index_part(
storage,
tenant_id,
timeline_id,
my_generation.previous(),
cancel.clone(),
)
.await;
match res {
Ok(index_part) => {
tracing::debug!("Found index_part from previous generation");
@@ -334,13 +356,14 @@ pub(super) async fn download_index_part(
match max_previous_generation {
Some(g) => {
tracing::debug!("Found index_part in generation {g:?}");
do_download_index_part(storage, tenant_id, timeline_id, g).await
do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
}
None => {
// Migration from legacy pre-generation state: we have a generation but no prior
// attached pageservers did. Try to load from a no-generation path.
tracing::info!("No index_part.json* found");
do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await
do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
.await
}
}
}
@@ -370,3 +393,23 @@ where
)
.await
}
async fn download_retry_forever<T, O, F>(
op: O,
description: &str,
cancel: CancellationToken,
) -> Result<T, DownloadError>
where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
backoff::retry(
op,
|e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
FAILED_DOWNLOAD_WARN_THRESHOLD,
u32::MAX,
description,
backoff::Cancel::new(cancel, || DownloadError::Cancelled),
)
.await
}
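
download_retry_forever leans on the pageserver's backoff::retry helper; as the commit message puts it, there is no point giving up if S3 is temporarily unavailable. A self-contained sketch of the same idea under assumed names (FetchError and retry_forever are illustrative, not the real helper): retry transient failures forever with capped exponential backoff, but return a Cancelled error as soon as the cancellation token fires.

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
enum FetchError {
    Transient(String),
    Cancelled,
}

async fn retry_forever<T, F, Fut>(mut op: F, cancel: &CancellationToken) -> Result<T, FetchError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, FetchError>>,
{
    let mut attempt = 0u32;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            Err(FetchError::Cancelled) => return Err(FetchError::Cancelled),
            Err(FetchError::Transient(msg)) => {
                attempt += 1;
                // Capped exponential backoff: 0.2s, 0.4s, ... up to ~12.8s.
                let delay = Duration::from_millis(100 * (1u64 << attempt.min(7)));
                eprintln!("attempt {attempt} failed ({msg}), retrying in {delay:?}");
                tokio::select! {
                    _ = cancel.cancelled() => return Err(FetchError::Cancelled),
                    _ = tokio::time::sleep(delay) => {}
                }
            }
        }
    }
}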

View File

@@ -294,6 +294,7 @@ async fn cleanup_remaining_timeline_fs_traces(
// Remove delete mark
tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_id, timeline_id))
.await
.or_else(fs_ext::ignore_not_found)
.context("remove delete mark")
}

View File

@@ -60,6 +60,7 @@ from fixtures.utils import (
allure_attach_from_dir,
get_self_dir,
subprocess_capture,
wait_until,
)
"""
@@ -1680,6 +1681,41 @@ class NeonPageserver(PgProtocol):
self.running = False
return self
def restart(self, immediate: bool = False):
"""
High level wrapper for restart: restarts the process, and waits for
tenant state to stabilize.
"""
self.stop(immediate=immediate)
self.start()
self.quiesce_tenants()
def quiesce_tenants(self):
"""
Wait for all tenants to enter a stable state (Active or Broken).
Call this after restarting the pageserver, or after attaching a tenant,
to ensure that it is ready for use.
"""
stable_states = {"Active", "Broken"}
client = self.http_client()
def complete():
log.info("Checking tenants...")
tenants = client.tenant_list()
log.info(f"Tenant list: {tenants}...")
any_unstable = any((t["state"]["slug"] not in stable_states) for t in tenants)
if any_unstable:
for t in tenants:
log.info(f"Waiting for tenant {t['id']} in state {t['state']['slug']}")
log.info(f"any_unstable={any_unstable}")
assert not any_unstable
wait_until(20, 0.5, complete)
def __enter__(self) -> "NeonPageserver":
return self

View File

@@ -333,16 +333,30 @@ def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: N
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
env.pageserver.allowed_errors.extend(
[
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
".*Failed to load index_part from remote storage.*",
# On a fast restart, there may be an initdb still running in a basebackup...__temp directory
".*Failed to purge.*Directory not empty.*",
]
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
# Create a timeline whose creation will succeed. The tenant will need at least one
# timeline to be loadable.
success_timeline = TimelineId.generate()
log.info(f"Creating timeline {success_timeline}")
ps_http.timeline_create(env.pg_version, env.initial_tenant, success_timeline, timeout=60)
# Create a timeline whose upload to remote storage will be blocked
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
def start_creating_timeline():
log.info(f"Creating (expect failure) timeline {env.initial_timeline}")
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
@@ -366,6 +380,9 @@ def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: N
with pytest.raises(PageserverApiException, match="not found"):
ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
# The one successfully created timeline should still be there.
assert len(ps_http.timeline_list(tenant_id=env.initial_tenant)) == 1
def test_non_uploaded_branch_is_deleted_after_restart(neon_env_builder: NeonEnvBuilder):
"""

View File

@@ -15,7 +15,7 @@ from fixtures.types import TenantId, TimelineId
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
def test_local_corruption(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
@@ -69,24 +69,19 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
env.pageserver.start()
# Tenant 0 should still work
# Un-damaged tenant works
pg0.start()
assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
# But all others are broken
# First timeline would not get loaded into pageserver due to corrupt metadata file
with pytest.raises(
Exception, match=f"Tenant {tenant1} will not become active. Current state: Broken"
) as err:
pg1.start()
log.info(
f"As expected, compute startup failed eagerly for timeline with corrupt metadata: {err}"
)
# Tenant with corrupt local metadata works: remote storage is authoritative for metadata
pg1.start()
assert pg1.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
# Second timeline will fail during basebackup, because the local layer file is corrupt.
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
# (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
with pytest.raises(Exception, match="Failed to load delta layer") as err:
pg2.start()
log.info(
@@ -133,8 +128,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
_ = env.neon_cli.create_timeline("test_timeline_init_break_before_checkpoint", tenant_id)
# Restart the page server
env.pageserver.stop(immediate=True)
env.pageserver.start()
env.pageserver.restart(immediate=True)
# Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
new_tenant_timelines = env.neon_cli.list_timelines(tenant_id)

View File

@@ -62,14 +62,14 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
tenant_load_delay_ms = 5000
env.pageserver.stop()
env.pageserver.start(
extra_env_vars={"FAILPOINTS": f"before-loading-tenant=return({tenant_load_delay_ms})"}
extra_env_vars={"FAILPOINTS": f"before-attaching-tenant=return({tenant_load_delay_ms})"}
)
# Check that it's in Loading state
# Check that it's in Attaching state
client = env.pageserver.http_client()
tenant_status = client.tenant_status(env.initial_tenant)
log.info("Tenant status : %s", tenant_status)
assert tenant_status["state"]["slug"] == "Loading"
assert tenant_status["state"]["slug"] == "Attaching"
# Try to read. This waits until the loading finishes, and then return normally.
cur.execute("SELECT count(*) FROM foo")

View File

@@ -241,8 +241,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
assert reason.endswith(f"failpoint: {failpoint}"), reason
if check is Check.RETRY_WITH_RESTART:
env.pageserver.stop()
env.pageserver.start()
env.pageserver.restart()
if failpoint in (
"tenant-delete-before-shutdown",

View File

@@ -66,10 +66,6 @@ def test_tenant_reattach(
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
# That's because of UnreliableWrapper's injected failures
env.pageserver.allowed_errors.append(
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:
@@ -116,7 +112,7 @@ def test_tenant_reattach(
assert query_scalar(cur, "SELECT count(*) FROM t") == 100000
# Check that we had to retry the downloads
assert env.pageserver.log_contains(".*list prefixes.*failed, will retry.*")
assert env.pageserver.log_contains(".*list timelines.*failed, will retry.*")
assert env.pageserver.log_contains(".*download.*failed, will retry.*")
@@ -643,47 +639,6 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder
ensure_test_data(data_id, data_secret, endpoint)
# Tests that it's possible to `load` broken tenants:
# * `ignore` a tenant
# * removes its `metadata` file locally
# * `load` the same tenant
# * ensure that its status is `Broken`
def test_ignored_tenant_stays_broken_without_metadata(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
env.endpoints.create_start("main")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: (Broken|Stopping).*"
)
# ignore the tenant and remove its metadata
pageserver_http.tenant_ignore(tenant_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
metadata_removed = False
for dir_entry in timeline_dir.iterdir():
if dir_entry.name == "metadata":
# Looks like a layer file. Remove it
dir_entry.unlink()
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {timeline_dir}"
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
)
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
pageserver_http.tenant_load(tenant_id=tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 5)
# Tests that attach never works on a tenant, ignored or not, as long as it is present locally
# Similarly, tests that it's not possible to schedule a `load` for a tenant that's not ignored.
def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder):
@@ -778,7 +733,8 @@ def test_ignore_while_attaching(
tenants_before_ignore
), "Only ignored tenant should be missing"
# But can load it from local files, that will restore attach.
# Calling load will bring the tenant back online
pageserver_http.configure_failpoints([("attach-before-activate", "off")])
pageserver_http.tenant_load(tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)

View File

@@ -1,5 +1,4 @@
import os
import shutil
import time
from contextlib import closing
from datetime import datetime
@@ -20,7 +19,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.types import Lsn, TenantId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -298,13 +297,8 @@ def test_pageserver_with_empty_tenants(
client = env.pageserver.http_client()
tenant_with_empty_timelines = TenantId.generate()
client.tenant_create(tenant_with_empty_timelines)
temp_timelines = client.timeline_list(tenant_with_empty_timelines)
for temp_timeline in temp_timelines:
timeline_delete_wait_completed(
client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
)
tenant_with_empty_timelines = env.initial_tenant
timeline_delete_wait_completed(client, tenant_with_empty_timelines, env.initial_timeline)
files_in_timelines_dir = sum(
1 for _p in Path.iterdir(env.pageserver.timeline_dir(tenant_with_empty_timelines))
@@ -317,34 +311,19 @@ def test_pageserver_with_empty_tenants(
env.endpoints.stop_all()
env.pageserver.stop()
tenant_without_timelines_dir = env.initial_tenant
shutil.rmtree(env.pageserver.timeline_dir(tenant_without_timelines_dir))
env.pageserver.start()
client = env.pageserver.http_client()
def not_loading():
def not_attaching():
tenants = client.tenant_list()
assert len(tenants) == 2
assert all(t["state"]["slug"] != "Loading" for t in tenants)
assert len(tenants) == 1
assert all(t["state"]["slug"] != "Attaching" for t in tenants)
wait_until(10, 0.2, not_loading)
wait_until(10, 0.2, not_attaching)
tenants = client.tenant_list()
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
assert (
broken_tenant["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
broken_tenant_status = client.tenant_status(tenant_without_timelines_dir)
assert (
broken_tenant_status["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
assert (
loaded_tenant["state"]["slug"] == "Active"
@@ -358,9 +337,6 @@ def test_pageserver_with_empty_tenants(
time.sleep(1) # to allow metrics propagation
ps_metrics = client.get_metrics()
broken_tenants_metric_filter = {
"tenant_id": str(tenant_without_timelines_dir),
}
active_tenants_metric_filter = {
"state": "Active",
}
@@ -374,13 +350,3 @@ def test_pageserver_with_empty_tenants(
assert (
tenant_active_count == 1
), f"Tenant {tenant_with_empty_timelines} should have metric as active"
tenant_broken_count = int(
ps_metrics.query_one(
"pageserver_broken_tenants_count", filter=broken_tenants_metric_filter
).value
)
assert (
tenant_broken_count == 1
), f"Tenant {tenant_without_timelines_dir} should have metric as broken"

View File

@@ -70,8 +70,7 @@ def test_threshold_based_eviction(
}
# restart because changing tenant config is not instant
env.pageserver.stop()
env.pageserver.start()
env.pageserver.restart()
assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold",

View File

@@ -277,13 +277,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
if failpoint == "timeline-delete-after-index-delete":
m = ps_http.get_metrics()
assert (
m.query_one(
"remote_storage_s3_request_seconds_count",
filter={"request_type": "get_object", "result": "err"},
).value
== 2 # One is missing tenant deletion mark, second is missing index part
)
assert (
m.query_one(
"remote_storage_s3_request_seconds_count",