pageserver: always load remote metadata (no more spawn_load) (#5580)

## Problem

The pageserver had two ways of loading a tenant:
- `spawn_load` would trust on-disk content to reflect all existing
timelines
- `spawn_attach` would list timelines in remote storage.

It was incorrect for `spawn_load` to trust local disk content, because it
cannot know whether the tenant was attached and written to somewhere else in
the meantime. Making this correct would require generation number checks, and
the only payoff would be avoiding one S3 op per tenant at startup, so it is
not worth the complexity -- it is much simpler to have one way to load a
tenant.

## Summary of changes

- `Tenant` objects are always created with `Tenant::spawn`: there is no
more distinction between "load" and "attach".
- The ability to run without remote storage (for `neon_local`) is
preserved by adding a branch inside `attach` that uses a fallback
`load_local` if no remote_storage is present.
- Fix attaching a tenant when it has a timeline with no IndexPart: this
can occur if a newly created timeline manages to upload a layer before
it has uploaded an index.
- The attach marker file that used to indicate whether a tenant should
be "loaded" or "attached" is no longer needed, and is removed.
- The GenericRemoteStorage interface gets a `list()` method that maps
more directly to what ListObjects does, returning both keys and common
prefixes. The existing `list_files` and `list_prefixes` methods are just
calls into `list()` now -- these can be removed later if we would like
to shrink the interface a bit.
- The remote deletion marker is moved into `timelines/` and detected as
part of listing timelines rather than as a separate GET request. If any
existing tenants have a marker in the old location (unlikely, only
happens if something crashes mid-delete), then they will rely on the
control plane retrying to complete their deletion.
- Revise S3 calls for timeline listing and tenant load to take a
cancellation token, and retry forever: it never makes sense to mark a
Tenant broken because of a transient S3 issue (see the sketch after this
list).
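
To make the last two bullets concrete, here is a minimal sketch (a hypothetical
function, not the actual pageserver code) of listing a tenant's timeline
prefixes through the new interface while retrying transient failures forever.
It assumes the `Listing`/`ListingMode` types, the `GenericRemoteStorage::list()`
signature, and the `backoff::retry` call shape shown in the diffs below; the
warn threshold and description string are placeholders.

```rust
use remote_storage::{DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath};
use tokio_util::sync::CancellationToken;
use utils::backoff;

/// Sketch: list everything directly under a tenant's `timelines/` prefix,
/// retrying transient failures indefinitely and stopping only on cancellation.
async fn list_timelines_forever(
    storage: &GenericRemoteStorage,
    timelines_prefix: &RemotePath, // e.g. "tenants/<tenant_id>/timelines/"
    cancel: CancellationToken,
) -> Result<Listing, DownloadError> {
    backoff::retry(
        || storage.list(Some(timelines_prefix), ListingMode::WithDelimiter),
        // Only these errors are treated as permanent; anything else (e.g. a
        // transient S3 hiccup) is retried rather than breaking the tenant.
        |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
        3,        // warn threshold (placeholder; the real code uses a named constant)
        u32::MAX, // effectively "retry forever"
        "list timelines",
        backoff::Cancel::new(cancel, || DownloadError::Cancelled),
    )
    .await
}
```

With `ListingMode::WithDelimiter`, the returned `Listing` separates common
prefixes (the timeline directories) from plain keys (such as the relocated
`timelines/deleted` marker), which is what the reworked `list_remote_timelines`
relies on.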

## Breaking changes

- The remote deletion marker is moved from `deleted` to `timelines/deleted`
within the tenant prefix (see the sketch after this list). Markers in the old
location will be ignored: it is the control plane's responsibility to retry
deletions until they succeed. Markers in the new location are tolerated by the
previous pageserver release via
https://github.com/neondatabase/neon/pull/5632
- The local `attaching` marker file is no longer written. Therefore, if the
pageserver is downgraded after running this code, the old pageserver will not
be able to distinguish between partially attached and fully attached tenants.
This only affects tenants that were partway through attaching at the moment of
downgrade. In the unlikely event that an incident prompts us to roll back, we
can check for attach operations in flight and manually insert `attaching`
marker files as needed.
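
As a concrete illustration of the first breaking change, and assuming the usual
remote layout where a tenant's objects live under a `tenants/<tenant_id>/`
prefix, the marker moves as sketched below (illustrative paths only; the real
helper builds a `RemotePath` as shown in the `delete.rs` diff further down):

```rust
fn main() {
    // Hypothetical tenant id, for illustration only.
    let tenant_id = "3aa8fcc61f6d357410b7de754b1d9001";

    // Old location: a standalone object directly under the tenant prefix,
    // which required its own GET to detect. Ignored after this change.
    let old_marker = format!("tenants/{tenant_id}/deleted");

    // New location: inside timelines/, so it is discovered by the same
    // ListObjects call that enumerates the tenant's timelines.
    let new_marker = format!("tenants/{tenant_id}/timelines/deleted");

    println!("old: {old_marker}\nnew: {new_marker}");
}
```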

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
Author: John Spray
Date: 2023-10-26 14:48:44 +01:00 (committed by GitHub)
Parent: 8360307ea0
Commit: de90bf4663
22 changed files with 808 additions and 1011 deletions

View File

@@ -110,7 +110,6 @@ impl TenantState {
// So, return `Maybe` while Attaching, making Console wait for the attach task to finish. // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe, Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
// tenant mgr startup distinguishes attaching from loading via marker file. // tenant mgr startup distinguishes attaching from loading via marker file.
// If it's loading, there is no attach marker file, i.e., attach had finished in the past.
Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached, Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
// We only reach Active after successful load / attach. // We only reach Active after successful load / attach.
// So, call atttachment status Attached. // So, call atttachment status Attached.

View File

@@ -23,8 +23,8 @@ use tracing::debug;
use crate::s3_bucket::RequestKind; use crate::s3_bucket::RequestKind;
use crate::{ use crate::{
AzureConfig, ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath,
StorageMetadata, RemoteStorage, StorageMetadata,
}; };
pub struct AzureBlobStorage { pub struct AzureBlobStorage {
@@ -184,10 +184,11 @@ fn to_download_error(error: azure_core::Error) -> DownloadError {
#[async_trait::async_trait] #[async_trait::async_trait]
impl RemoteStorage for AzureBlobStorage { impl RemoteStorage for AzureBlobStorage {
async fn list_prefixes( async fn list(
&self, &self,
prefix: Option<&RemotePath>, prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> { mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError> {
// get the passed prefix or if it is not set use prefix_in_bucket value // get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix let list_prefix = prefix
.map(|p| self.relative_path_to_name(p)) .map(|p| self.relative_path_to_name(p))
@@ -195,16 +196,19 @@ impl RemoteStorage for AzureBlobStorage {
.map(|mut p| { .map(|mut p| {
// required to end with a separator // required to end with a separator
// otherwise request will return only the entry of a prefix // otherwise request will return only the entry of a prefix
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
} }
p p
}); });
let mut builder = self let mut builder = self.client.list_blobs();
.client
.list_blobs() if let ListingMode::WithDelimiter = mode {
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()); builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
if let Some(prefix) = list_prefix { if let Some(prefix) = list_prefix {
builder = builder.prefix(Cow::from(prefix.to_owned())); builder = builder.prefix(Cow::from(prefix.to_owned()));
@@ -215,46 +219,23 @@ impl RemoteStorage for AzureBlobStorage {
} }
let mut response = builder.into_stream(); let mut response = builder.into_stream();
let mut res = Vec::new(); let mut res = Listing::default();
while let Some(entry) = response.next().await { while let Some(l) = response.next().await {
let entry = entry.map_err(to_download_error)?; let entry = l.map_err(to_download_error)?;
let name_iter = entry let prefix_iter = entry
.blobs .blobs
.prefixes() .prefixes()
.map(|prefix| self.name_to_relative_path(&prefix.name)); .map(|prefix| self.name_to_relative_path(&prefix.name));
res.extend(name_iter); res.prefixes.extend(prefix_iter);
}
Ok(res)
}
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> { let blob_iter = entry
let folder_name = folder
.map(|p| self.relative_path_to_name(p))
.or_else(|| self.prefix_in_container.clone());
let mut builder = self.client.list_blobs();
if let Some(folder_name) = folder_name {
builder = builder.prefix(Cow::from(folder_name.to_owned()));
}
if let Some(limit) = self.max_keys_per_list_response {
builder = builder.max_results(MaxResults::new(limit));
}
let mut response = builder.into_stream();
let mut res = Vec::new();
while let Some(l) = response.next().await {
let entry = l.map_err(anyhow::Error::new)?;
let name_iter = entry
.blobs .blobs
.blobs() .blobs()
.map(|bl| self.name_to_relative_path(&bl.name)); .map(|k| self.name_to_relative_path(&k.name));
res.extend(name_iter); res.keys.extend(blob_iter);
} }
Ok(res) Ok(res)
} }
async fn upload( async fn upload(
&self, &self,
mut from: impl AsyncRead + Unpin + Send + Sync + 'static, mut from: impl AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -129,6 +129,22 @@ impl RemotePath {
} }
} }
/// We don't need callers to be able to pass arbitrary delimiters: just control
/// whether listings will use a '/' separator or not.
///
/// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
/// NoDelimiter mode will only populate `keys`.
pub enum ListingMode {
WithDelimiter,
NoDelimiter,
}
#[derive(Default)]
pub struct Listing {
pub prefixes: Vec<RemotePath>,
pub keys: Vec<RemotePath>,
}
/// Storage (potentially remote) API to manage its state. /// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context, /// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files. /// providing basic CRUD operations for storage files.
@@ -141,8 +157,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
async fn list_prefixes( async fn list_prefixes(
&self, &self,
prefix: Option<&RemotePath>, prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError>; ) -> Result<Vec<RemotePath>, DownloadError> {
let result = self
.list(prefix, ListingMode::WithDelimiter)
.await?
.prefixes;
Ok(result)
}
/// Lists all files in directory "recursively" /// Lists all files in directory "recursively"
/// (not really recursively, because AWS has a flat namespace) /// (not really recursively, because AWS has a flat namespace)
/// Note: This is subtely different than list_prefixes, /// Note: This is subtely different than list_prefixes,
@@ -154,7 +175,16 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// whereas, /// whereas,
/// list_prefixes("foo/bar/") = ["cat", "dog"] /// list_prefixes("foo/bar/") = ["cat", "dog"]
/// See `test_real_s3.rs` for more details. /// See `test_real_s3.rs` for more details.
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>; async fn list_files(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
Ok(result)
}
async fn list(
&self,
prefix: Option<&RemotePath>,
_mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError>;
/// Streams the local file contents into remote into the remote storage entry. /// Streams the local file contents into remote into the remote storage entry.
async fn upload( async fn upload(
@@ -205,6 +235,9 @@ pub enum DownloadError {
BadInput(anyhow::Error), BadInput(anyhow::Error),
/// The file was not found in the remote storage. /// The file was not found in the remote storage.
NotFound, NotFound,
/// A cancellation token aborted the download, typically during
/// tenant detach or process shutdown.
Cancelled,
/// The file was found in the remote storage, but the download failed. /// The file was found in the remote storage, but the download failed.
Other(anyhow::Error), Other(anyhow::Error),
} }
@@ -215,6 +248,7 @@ impl std::fmt::Display for DownloadError {
DownloadError::BadInput(e) => { DownloadError::BadInput(e) => {
write!(f, "Failed to download a remote file due to user input: {e}") write!(f, "Failed to download a remote file due to user input: {e}")
} }
DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
DownloadError::NotFound => write!(f, "No file found for the remote object id given"), DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"), DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
} }
@@ -234,6 +268,19 @@ pub enum GenericRemoteStorage {
} }
impl GenericRemoteStorage { impl GenericRemoteStorage {
pub async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> anyhow::Result<Listing, DownloadError> {
match self {
Self::LocalFs(s) => s.list(prefix, mode).await,
Self::AwsS3(s) => s.list(prefix, mode).await,
Self::AzureBlob(s) => s.list(prefix, mode).await,
Self::Unreliable(s) => s.list(prefix, mode).await,
}
}
// A function for listing all the files in a "directory" // A function for listing all the files in a "directory"
// Example: // Example:
// list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"] // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]

View File

@@ -15,7 +15,7 @@ use tokio::{
use tracing::*; use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty}; use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
use crate::{Download, DownloadError, RemotePath}; use crate::{Download, DownloadError, Listing, ListingMode, RemotePath};
use super::{RemoteStorage, StorageMetadata}; use super::{RemoteStorage, StorageMetadata};
@@ -75,7 +75,7 @@ impl LocalFs {
} }
#[cfg(test)] #[cfg(test)]
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> { async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
Ok(get_all_files(&self.storage_root, true) Ok(get_all_files(&self.storage_root, true)
.await? .await?
.into_iter() .into_iter()
@@ -89,52 +89,10 @@ impl LocalFs {
}) })
.collect()) .collect())
} }
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?;
let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
prefixes.push(
prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
),
)
}
Ok(prefixes)
}
// recursively lists all files in a directory, // recursively lists all files in a directory,
// mirroring the `list_files` for `s3_bucket` // mirroring the `list_files` for `s3_bucket`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> { async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let full_path = match folder { let full_path = match folder {
Some(folder) => folder.with_base(&self.storage_root), Some(folder) => folder.with_base(&self.storage_root),
None => self.storage_root.clone(), None => self.storage_root.clone(),
@@ -186,6 +144,70 @@ impl RemoteStorage for LocalFs {
Ok(files) Ok(files)
} }
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> Result<Listing, DownloadError> {
let mut result = Listing::default();
if let ListingMode::NoDelimiter = mode {
let keys = self
.list_recursive(prefix)
.await
.map_err(DownloadError::Other)?;
result.keys = keys
.into_iter()
.filter(|k| {
let path = k.with_base(&self.storage_root);
!path.is_dir()
})
.collect();
return Ok(result);
}
let path = match prefix {
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
let prefixes_to_filter = get_all_files(path.as_ref(), false)
.await
.map_err(DownloadError::Other)?;
// filter out empty directories to mirror s3 behavior.
for prefix in prefixes_to_filter {
if prefix.is_dir()
&& is_directory_empty(&prefix)
.await
.map_err(DownloadError::Other)?
{
continue;
}
let stripped = prefix
.strip_prefix(&self.storage_root)
.context("Failed to strip prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
);
if prefix.is_dir() {
result.prefixes.push(stripped);
} else {
result.keys.push(stripped);
}
}
Ok(result)
}
async fn upload( async fn upload(
&self, &self,
@@ -479,7 +501,7 @@ mod fs_tests {
let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?; let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
assert_eq!( assert_eq!(
storage.list().await?, storage.list_all().await?,
vec![target_path_1.clone()], vec![target_path_1.clone()],
"Should list a single file after first upload" "Should list a single file after first upload"
); );
@@ -667,7 +689,7 @@ mod fs_tests {
let upload_target = upload_dummy_file(&storage, upload_name, None).await?; let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
storage.delete(&upload_target).await?; storage.delete(&upload_target).await?;
assert!(storage.list().await?.is_empty()); assert!(storage.list_all().await?.is_empty());
storage storage
.delete(&upload_target) .delete(&upload_target)
@@ -725,6 +747,43 @@ mod fs_tests {
Ok(()) Ok(())
} }
#[tokio::test]
async fn list() -> anyhow::Result<()> {
// No delimiter: should recursively list everything
let storage = create_storage()?;
let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;
let listing = storage.list(None, ListingMode::NoDelimiter).await?;
assert!(listing.prefixes.is_empty());
assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
// Delimiter: should only go one deep
let listing = storage.list(None, ListingMode::WithDelimiter).await?;
assert_eq!(
listing.prefixes,
[RemotePath::from_string("timelines").unwrap()].to_vec()
);
assert!(listing.keys.is_empty());
// Delimiter & prefix
let listing = storage
.list(
Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
ListingMode::WithDelimiter,
)
.await?;
assert_eq!(
listing.prefixes,
[RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
.to_vec()
);
assert_eq!(listing.keys, [uncle.clone()].to_vec());
Ok(())
}
async fn upload_dummy_file( async fn upload_dummy_file(
storage: &LocalFs, storage: &LocalFs,
name: &str, name: &str,
@@ -777,7 +836,7 @@ mod fs_tests {
} }
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> { async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
let mut files = storage.list().await?; let mut files = storage.list_all().await?;
files.sort_by(|a, b| a.0.cmp(&b.0)); files.sort_by(|a, b| a.0.cmp(&b.0));
Ok(files) Ok(files)
} }

View File

@@ -30,8 +30,8 @@ use tracing::debug;
use super::StorageMetadata; use super::StorageMetadata;
use crate::{ use crate::{
ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, S3Config, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR, S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
}; };
pub(super) mod metrics; pub(super) mod metrics;
@@ -299,13 +299,13 @@ impl<S: AsyncRead> AsyncRead for TimedDownload<S> {
#[async_trait::async_trait] #[async_trait::async_trait]
impl RemoteStorage for S3Bucket { impl RemoteStorage for S3Bucket {
/// See the doc for `RemoteStorage::list_prefixes` async fn list(
/// Note: it wont include empty "directories"
async fn list_prefixes(
&self, &self,
prefix: Option<&RemotePath>, prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> { mode: ListingMode,
) -> Result<Listing, DownloadError> {
let kind = RequestKind::List; let kind = RequestKind::List;
let mut result = Listing::default();
// get the passed prefix or if it is not set use prefix_in_bucket value // get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix let list_prefix = prefix
@@ -314,28 +314,33 @@ impl RemoteStorage for S3Bucket {
.map(|mut p| { .map(|mut p| {
// required to end with a separator // required to end with a separator
// otherwise request will return only the entry of a prefix // otherwise request will return only the entry of a prefix
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
} }
p p
}); });
let mut document_keys = Vec::new();
let mut continuation_token = None; let mut continuation_token = None;
loop { loop {
let _guard = self.permit(kind).await; let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind); let started_at = start_measuring_requests(kind);
let fetch_response = self let mut request = self
.client .client
.list_objects_v2() .list_objects_v2()
.bucket(self.bucket_name.clone()) .bucket(self.bucket_name.clone())
.set_prefix(list_prefix.clone()) .set_prefix(list_prefix.clone())
.set_continuation_token(continuation_token) .set_continuation_token(continuation_token)
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()) .set_max_keys(self.max_keys_per_list_response);
.set_max_keys(self.max_keys_per_list_response)
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let response = request
.send() .send()
.await .await
.context("Failed to list S3 prefixes") .context("Failed to list S3 prefixes")
@@ -345,71 +350,35 @@ impl RemoteStorage for S3Bucket {
metrics::BUCKET_METRICS metrics::BUCKET_METRICS
.req_seconds .req_seconds
.observe_elapsed(kind, &fetch_response, started_at); .observe_elapsed(kind, &response, started_at);
let fetch_response = fetch_response?; let response = response?;
document_keys.extend( let keys = response.contents().unwrap_or_default();
fetch_response let empty = Vec::new();
.common_prefixes let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
.unwrap_or_default()
.into_iter() tracing::info!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
for object in keys {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
result.keys.push(remote_path);
}
result.prefixes.extend(
prefixes
.iter()
.filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))), .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
); );
continuation_token = match fetch_response.next_continuation_token { continuation_token = match response.next_continuation_token {
Some(new_token) => Some(new_token), Some(new_token) => Some(new_token),
None => break, None => break,
}; };
} }
Ok(document_keys) Ok(result)
}
/// See the doc for `RemoteStorage::list_files`
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let kind = RequestKind::List;
let folder_name = folder
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
// AWS may need to break the response into several parts
let mut continuation_token = None;
let mut all_files = vec![];
loop {
let _guard = self.permit(kind).await;
let started_at = start_measuring_requests(kind);
let response = self
.client
.list_objects_v2()
.bucket(self.bucket_name.clone())
.set_prefix(folder_name.clone())
.set_continuation_token(continuation_token)
.set_max_keys(self.max_keys_per_list_response)
.send()
.await
.context("Failed to list files in S3 bucket");
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &response, started_at);
let response = response?;
for object in response.contents().unwrap_or_default() {
let object_path = object.key().expect("response does not contain a key");
let remote_path = self.s3_object_to_relative_path(object_path);
all_files.push(remote_path);
}
match response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok(all_files)
} }
async fn upload( async fn upload(

View File

@@ -5,7 +5,9 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Mutex; use std::sync::Mutex;
use crate::{Download, DownloadError, RemotePath, RemoteStorage, StorageMetadata}; use crate::{
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
};
pub struct UnreliableWrapper { pub struct UnreliableWrapper {
inner: crate::GenericRemoteStorage, inner: crate::GenericRemoteStorage,
@@ -95,6 +97,15 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list_files(folder).await self.inner.list_files(folder).await
} }
async fn list(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
) -> Result<Listing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
self.inner.list(prefix, mode).await
}
async fn upload( async fn upload(
&self, &self,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,

View File

@@ -33,8 +33,7 @@ use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf; use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt; use crate::tenant::config::TenantConfOpt;
use crate::tenant::{ use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME, TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
TIMELINES_SEGMENT_NAME,
}; };
use crate::{ use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
@@ -633,11 +632,6 @@ impl PageServerConf {
self.tenants_path().join(tenant_id.to_string()) self.tenants_path().join(tenant_id.to_string())
} }
pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_ATTACHING_MARKER_FILENAME)
}
pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf { pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME) self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
} }

File diff suppressed because it is too large

View File

@@ -3,10 +3,10 @@ use std::sync::Arc;
use anyhow::Context; use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf}; use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::TenantState; use pageserver_api::models::TenantState;
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard; use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, warn, Instrument, Span}; use tracing::{error, instrument, warn, Instrument, Span};
use utils::{ use utils::{
backoff, completion, crashsafe, fs_ext, backoff, completion, crashsafe, fs_ext,
@@ -25,11 +25,9 @@ use super::{
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD}, remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
span, span,
timeline::delete::DeleteTimelineFlow, timeline::delete::DeleteTimelineFlow,
tree_sort_timelines, DeleteTimelineError, Tenant, tree_sort_timelines, DeleteTimelineError, Tenant, TenantPreload,
}; };
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError { pub(crate) enum DeleteTenantError {
#[error("GetTenant {0}")] #[error("GetTenant {0}")]
@@ -60,7 +58,7 @@ fn remote_tenant_delete_mark_path(
.context("Failed to strip workdir prefix") .context("Failed to strip workdir prefix")
.and_then(RemotePath::new) .and_then(RemotePath::new)
.context("tenant path")?; .context("tenant path")?;
Ok(tenant_remote_path.join(Utf8Path::new("deleted"))) Ok(tenant_remote_path.join(Utf8Path::new("timelines/deleted")))
} }
async fn create_remote_delete_mark( async fn create_remote_delete_mark(
@@ -239,32 +237,6 @@ async fn cleanup_remaining_fs_traces(
Ok(()) Ok(())
} }
pub(crate) async fn remote_delete_mark_exists(
conf: &PageServerConf,
tenant_id: &TenantId,
remote_storage: &GenericRemoteStorage,
) -> anyhow::Result<bool> {
// If remote storage is there we rely on it
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id).context("path")?;
let result = backoff::retry(
|| async { remote_storage.download(&remote_mark_path).await },
|e| matches!(e, DownloadError::NotFound),
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
"fetch_tenant_deletion_mark",
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await;
match result {
Ok(_) => Ok(true),
Err(DownloadError::NotFound) => Ok(false),
Err(e) => Err(anyhow::anyhow!(e)).context("remote_delete_mark_exists")?,
}
}
/// Orchestrates tenant shut down of all tasks, removes its in-memory structures, /// Orchestrates tenant shut down of all tasks, removes its in-memory structures,
/// and deletes its data from both disk and s3. /// and deletes its data from both disk and s3.
/// The sequence of steps: /// The sequence of steps:
@@ -276,10 +248,9 @@ pub(crate) async fn remote_delete_mark_exists(
/// 6. Remove remote mark /// 6. Remove remote mark
/// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark /// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark
/// It is resumable from any step in case a crash/restart occurs. /// It is resumable from any step in case a crash/restart occurs.
/// There are three entrypoints to the process: /// There are two entrypoints to the process:
/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler. /// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
/// 2. [`DeleteTenantFlow::resume_from_load`] is called during restarts when local or remote deletion marks are still there. /// 2. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed tenant is found to be deleted during attach process.
/// 3. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed tenant is found to be deleted during attach process.
/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function. /// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
#[derive(Default)] #[derive(Default)]
pub enum DeleteTenantFlow { pub enum DeleteTenantFlow {
@@ -378,7 +349,7 @@ impl DeleteTenantFlow {
pub(crate) async fn should_resume_deletion( pub(crate) async fn should_resume_deletion(
conf: &'static PageServerConf, conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>, remote_mark_exists: bool,
tenant: &Tenant, tenant: &Tenant,
) -> Result<Option<DeletionGuard>, DeleteTenantError> { ) -> Result<Option<DeletionGuard>, DeleteTenantError> {
let acquire = |t: &Tenant| { let acquire = |t: &Tenant| {
@@ -389,66 +360,25 @@ impl DeleteTenantFlow {
) )
}; };
let tenant_id = tenant.tenant_id; if remote_mark_exists {
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
return Ok(acquire(tenant)); return Ok(acquire(tenant));
} }
let remote_storage = match remote_storage { let tenant_id = tenant.tenant_id;
Some(remote_storage) => remote_storage, // Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
None => return Ok(None), if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
};
if remote_delete_mark_exists(conf, &tenant_id, remote_storage).await? {
Ok(acquire(tenant)) Ok(acquire(tenant))
} else { } else {
Ok(None) Ok(None)
} }
} }
pub(crate) async fn resume_from_load(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
tenant
.set_stopping(progress, true, false)
.await
.expect("cant be stopping or broken");
// Do not consume valuable resources during the load phase, continue deletion once init phase is complete.
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
info!("waiting for backgound jobs barrier");
background.clone().wait().await;
info!("ready for backgound jobs barrier");
}
// Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir)
let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
if timelines_path.exists() {
tenant.load(init_order, None, ctx).await.context("load")?;
}
Self::background(
guard,
tenant.conf,
tenant.remote_storage.clone(),
tenants,
tenant,
)
.await
}
pub(crate) async fn resume_from_attach( pub(crate) async fn resume_from_attach(
guard: DeletionGuard, guard: DeletionGuard,
tenant: &Arc<Tenant>, tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static tokio::sync::RwLock<TenantsMap>, tenants: &'static tokio::sync::RwLock<TenantsMap>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<(), DeleteTenantError> { ) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel(); let (_, progress) = completion::channel();
@@ -459,7 +389,7 @@ impl DeleteTenantFlow {
.expect("cant be stopping or broken"); .expect("cant be stopping or broken");
tenant tenant
.attach(ctx, super::AttachMarkerMode::Expect) .attach(init_order, preload, ctx)
.await .await
.context("attach")?; .context("attach")?;

View File

@@ -26,10 +26,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind}; use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow; use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{ use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
create_tenant_files, AttachMarkerMode, AttachedTenantConf, CreateTenantFilesMode, Tenant,
TenantState,
};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX}; use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension; use utils::crashsafe::path_with_suffix_extension;
@@ -437,14 +434,15 @@ pub async fn init_tenant_mgr(
location_conf.attach_in_generation(generation); location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?; Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
match schedule_local_tenant_processing( match tenant_spawn(
conf, conf,
tenant_id, tenant_id,
&tenant_dir_path, &tenant_dir_path,
AttachedTenantConf::try_from(location_conf)?,
resources.clone(), resources.clone(),
AttachedTenantConf::try_from(location_conf)?,
Some(init_order.clone()), Some(init_order.clone()),
&TENANTS, &TENANTS,
SpawnMode::Normal,
&ctx, &ctx,
) { ) {
Ok(tenant) => { Ok(tenant) => {
@@ -464,15 +462,18 @@ pub async fn init_tenant_mgr(
Ok(()) Ok(())
} }
/// Wrapper for Tenant::spawn that checks invariants before running, and inserts
/// a broken tenant in the map if Tenant::spawn fails.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub(crate) fn schedule_local_tenant_processing( pub(crate) fn tenant_spawn(
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_id: TenantId, tenant_id: TenantId,
tenant_path: &Utf8Path, tenant_path: &Utf8Path,
location_conf: AttachedTenantConf,
resources: TenantSharedResources, resources: TenantSharedResources,
location_conf: AttachedTenantConf,
init_order: Option<InitializationOrder>, init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>, tenants: &'static tokio::sync::RwLock<TenantsMap>,
mode: SpawnMode,
ctx: &RequestContext, ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> { ) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!( anyhow::ensure!(
@@ -496,45 +497,24 @@ pub(crate) fn schedule_local_tenant_processing(
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}" "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
); );
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() { info!("Attaching tenant {tenant_id}");
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation"); let tenant = match Tenant::spawn(
if resources.remote_storage.is_none() { conf,
warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured"); tenant_id,
Tenant::create_broken_tenant( resources,
conf, location_conf,
tenant_id, init_order,
"attaching mark file present but no remote storage configured".to_string(), tenants,
) mode,
} else { ctx,
match Tenant::spawn_attach( ) {
conf, Ok(tenant) => tenant,
tenant_id, Err(e) => {
resources, error!("Failed to spawn tenant {tenant_id}, reason: {e:#}");
location_conf, Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
tenants,
AttachMarkerMode::Expect,
ctx,
) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
}
}
} }
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf,
tenant_id,
location_conf,
resources,
init_order,
tenants,
ctx,
)
}; };
Ok(tenant) Ok(tenant)
} }
@@ -670,29 +650,41 @@ pub(crate) async fn create_tenant(
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> { ) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, || async { tenant_map_insert(tenant_id, || async {
let location_conf = LocationConf::attached_single(tenant_conf, generation); let location_conf = LocationConf::attached_single(tenant_conf, generation);
// We're holding the tenants lock in write mode while doing local IO. // We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating` // If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state. // and do the work in that state.
let tenant_directory = super::create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Create).await?; super::create_tenant_files(conf, &location_conf, &tenant_id).await?;
// TODO: tenant directory remains on disk if we bail out from here on. // TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233 // See https://github.com/neondatabase/neon/issues/4233
let created_tenant = let tenant_path = conf.tenant_path(&tenant_id);
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?; let created_tenant = tenant_spawn(
conf,
tenant_id,
&tenant_path,
resources,
AttachedTenantConf::try_from(location_conf)?,
None,
&TENANTS,
SpawnMode::Create,
ctx,
)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233 // See https://github.com/neondatabase/neon/issues/4233
let crated_tenant_id = created_tenant.tenant_id(); let crated_tenant_id = created_tenant.tenant_id();
anyhow::ensure!( anyhow::ensure!(
tenant_id == crated_tenant_id, tenant_id == crated_tenant_id,
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})", "loaded created tenant has unexpected tenant id \
); (expect {tenant_id} != actual {crated_tenant_id})",
);
Ok(created_tenant) Ok(created_tenant)
}).await })
.await
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@@ -801,9 +793,10 @@ pub(crate) async fn upsert_location(
} }
} }
let tenant_path = conf.tenant_path(&tenant_id);
let new_slot = match &new_location_config.mode { let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => { LocationMode::Secondary(_) => {
let tenant_path = conf.tenant_path(&tenant_id);
// Directory doesn't need to be fsync'd because if we crash it can // Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured. // safely be recreated next time this tenant location is configured.
unsafe_create_dir_all(&tenant_path) unsafe_create_dir_all(&tenant_path)
@@ -833,28 +826,21 @@ pub(crate) async fn upsert_location(
.await .await
.map_err(SetNewTenantConfigError::Persist)?; .map_err(SetNewTenantConfigError::Persist)?;
let tenant = match Tenant::spawn_attach( let tenant = tenant_spawn(
conf, conf,
tenant_id, tenant_id,
&tenant_path,
TenantSharedResources { TenantSharedResources {
broker_client, broker_client,
remote_storage, remote_storage,
deletion_queue_client, deletion_queue_client,
}, },
AttachedTenantConf::try_from(new_location_config)?, AttachedTenantConf::try_from(new_location_config)?,
None,
&TENANTS, &TENANTS,
// The LocationConf API does not use marker files, because we have Secondary SpawnMode::Normal,
// locations where the directory's existence is not a signal that it contains
// all timelines. See https://github.com/neondatabase/neon/issues/5550
AttachMarkerMode::Ignore,
ctx, ctx,
) { )?;
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
}
};
TenantSlot::Attached(tenant) TenantSlot::Attached(tenant)
} }
@@ -1043,7 +1029,7 @@ pub(crate) async fn load_tenant(
location_conf.attach_in_generation(generation); location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?; Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx) let new_tenant = tenant_spawn(conf, tenant_id, &tenant_path, resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx)
.with_context(|| { .with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}") format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?; })?;
@@ -1117,18 +1103,12 @@ pub(crate) async fn attach_tenant(
) -> Result<(), TenantMapInsertError> { ) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || async { tenant_map_insert(tenant_id, || async {
let location_conf = LocationConf::attached_single(tenant_conf, generation); let location_conf = LocationConf::attached_single(tenant_conf, generation);
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Attach).await?; let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?;
// TODO: tenant directory remains on disk if we bail out from here on. // TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233 // See https://github.com/neondatabase/neon/issues/4233
// Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached let attached_tenant = tenant_spawn(conf, tenant_id, &tenant_dir,
let marker_file_exists = conf resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx)?;
.tenant_attaching_mark_file_path(&tenant_id)
.try_exists()
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233 // See https://github.com/neondatabase/neon/issues/4233

View File

@@ -168,36 +168,14 @@
//! - create `Timeline` struct and a `RemoteTimelineClient` //! - create `Timeline` struct and a `RemoteTimelineClient`
//! - initialize the client's upload queue with its `IndexPart` //! - initialize the client's upload queue with its `IndexPart`
//! - schedule uploads for layers that are only present locally. //! - schedule uploads for layers that are only present locally.
//! - if the remote `IndexPart`'s metadata was newer than the metadata in
//! the local filesystem, write the remote metadata to the local filesystem
//! - After the above is done for each timeline, open the tenant for business by //! - After the above is done for each timeline, open the tenant for business by
//! transitioning it from `TenantState::Attaching` to `TenantState::Active` state. //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
//! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops. //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
//! //!
//! We keep track of the fact that a client is in `Attaching` state in a marker
//! file on the local disk. This is critical because, when we restart the pageserver,
//! we do not want to do the `List timelines` step for each tenant that has already
//! been successfully attached (for performance & cost reasons).
//! Instead, for a tenant without the attach marker file, we assume that the
//! local state is in sync or ahead of the remote state. This includes the list
//! of all of the tenant's timelines, which is particularly critical to be up-to-date:
//! if there's a timeline on the remote that the pageserver doesn't know about,
//! the GC will not consider its branch point, leading to data loss.
//! So, for a tenant with the attach marker file, we know that we do not yet have
//! persisted all the remote timeline's metadata files locally. To exclude the
//! risk above, we re-run the procedure for such tenants
//!
//! # Operating Without Remote Storage //! # Operating Without Remote Storage
//! //!
//! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
//! not created and the uploads are skipped. //! not created and the uploads are skipped.
//! Theoretically, it should be ok to remove and re-add remote storage configuration to
//! the pageserver config at any time, since it doesn't make a difference to
//! [`Timeline::load_layer_map`].
//! Of course, the remote timeline dir must not change while we have de-configured
//! remote storage, i.e., the pageserver must remain the owner of the given prefix
//! in remote storage.
//! But note that we don't test any of this right now.
//! //!
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
@@ -468,7 +446,10 @@ impl RemoteTimelineClient {
// //
/// Download index file /// Download index file
pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> { pub async fn download_index_file(
&self,
cancel: CancellationToken,
) -> Result<MaybeDeletedIndexPart, DownloadError> {
let _unfinished_gauge_guard = self.metrics.call_begin( let _unfinished_gauge_guard = self.metrics.call_begin(
&RemoteOpFileKind::Index, &RemoteOpFileKind::Index,
&RemoteOpKind::Download, &RemoteOpKind::Download,
@@ -482,6 +463,7 @@ impl RemoteTimelineClient {
&self.tenant_id, &self.tenant_id,
&self.timeline_id, &self.timeline_id,
self.generation, self.generation,
cancel,
) )
.measure_remote_op( .measure_remote_op(
self.tenant_id, self.tenant_id,
@@ -1725,7 +1707,11 @@ mod tests {
let client = timeline.remote_client.as_ref().unwrap(); let client = timeline.remote_client.as_ref().unwrap();
// Download back the index.json, and check that the list of files is correct // Download back the index.json, and check that the list of files is correct
let initial_index_part = match client.download_index_file().await.unwrap() { let initial_index_part = match client
.download_index_file(CancellationToken::new())
.await
.unwrap()
{
MaybeDeletedIndexPart::IndexPart(index_part) => index_part, MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"), MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
}; };
@@ -1814,7 +1800,11 @@ mod tests {
} }
// Download back the index.json, and check that the list of files is correct // Download back the index.json, and check that the list of files is correct
let index_part = match client.download_index_file().await.unwrap() { let index_part = match client
.download_index_file(CancellationToken::new())
.await
.unwrap()
{
MaybeDeletedIndexPart::IndexPart(index_part) => index_part, MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"), MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
}; };
@@ -2013,7 +2003,7 @@ mod tests {
let client = test_state.build_client(get_generation); let client = test_state.build_client(get_generation);
let download_r = client let download_r = client
.download_index_file() .download_index_file(CancellationToken::new())
.await .await
.expect("download should always succeed"); .expect("download should always succeed");
assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_))); assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));

View File

@@ -18,8 +18,8 @@ use crate::config::PageServerConf;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName; use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::{Generation, TENANT_DELETED_MARKER_FILE_NAME}; use crate::tenant::Generation;
use remote_storage::{DownloadError, GenericRemoteStorage}; use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use utils::crashsafe::path_with_suffix_extension; use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId}; use utils::id::{TenantId, TimelineId};
@@ -170,53 +170,43 @@ pub fn is_temp_download_file(path: &Utf8Path) -> bool {
pub async fn list_remote_timelines( pub async fn list_remote_timelines(
storage: &GenericRemoteStorage, storage: &GenericRemoteStorage,
tenant_id: TenantId, tenant_id: TenantId,
) -> anyhow::Result<HashSet<TimelineId>> { cancel: CancellationToken,
) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
let remote_path = remote_timelines_path(&tenant_id); let remote_path = remote_timelines_path(&tenant_id);
fail::fail_point!("storage-sync-list-remote-timelines", |_| { fail::fail_point!("storage-sync-list-remote-timelines", |_| {
anyhow::bail!("storage-sync-list-remote-timelines"); anyhow::bail!("storage-sync-list-remote-timelines");
}); });
let timelines = download_retry( let listing = download_retry_forever(
|| storage.list_prefixes(Some(&remote_path)), || storage.list(Some(&remote_path), ListingMode::WithDelimiter),
&format!("list prefixes for {tenant_id}"), &format!("list timelines for {tenant_id}"),
cancel,
) )
.await?; .await?;
if timelines.is_empty() {
anyhow::bail!("no timelines found on the remote storage")
}
let mut timeline_ids = HashSet::new(); let mut timeline_ids = HashSet::new();
let mut other_prefixes = HashSet::new();
for timeline_remote_storage_key in timelines { for timeline_remote_storage_key in listing.prefixes {
if timeline_remote_storage_key.object_name() == Some(TENANT_DELETED_MARKER_FILE_NAME) {
// A `deleted` key within `timelines/` is a marker file, not a timeline. Ignore it.
// This code will be removed in https://github.com/neondatabase/neon/pull/5580
continue;
}
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| { let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}") anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
})?; })?;
let timeline_id: TimelineId = object_name match object_name.parse::<TimelineId>() {
.parse() Ok(t) => timeline_ids.insert(t),
.with_context(|| format!("parse object name into timeline id '{object_name}'"))?; Err(_) => other_prefixes.insert(object_name.to_string()),
};
// list_prefixes is assumed to return unique names. Ensure this here.
// NB: it's safer to bail out than warn-log this because the pageserver
// needs to absolutely know about _all_ timelines that exist, so that
// GC knows all the branchpoints. If we skipped over a timeline instead,
// GC could delete a layer that's still needed by that timeline.
anyhow::ensure!(
!timeline_ids.contains(&timeline_id),
"list_prefixes contains duplicate timeline id {timeline_id}"
);
timeline_ids.insert(timeline_id);
} }
Ok(timeline_ids) for key in listing.keys {
let object_name = key
.object_name()
.ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
other_prefixes.insert(object_name.to_string());
}
Ok((timeline_ids, other_prefixes))
} }
async fn do_download_index_part( async fn do_download_index_part(
@@ -224,10 +214,11 @@ async fn do_download_index_part(
tenant_id: &TenantId, tenant_id: &TenantId,
timeline_id: &TimelineId, timeline_id: &TimelineId,
index_generation: Generation, index_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> { ) -> Result<IndexPart, DownloadError> {
let remote_path = remote_index_path(tenant_id, timeline_id, index_generation); let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);
let index_part_bytes = download_retry( let index_part_bytes = download_retry_forever(
|| async { || async {
let mut index_part_download = storage.download(&remote_path).await?; let mut index_part_download = storage.download(&remote_path).await?;
@@ -242,6 +233,7 @@ async fn do_download_index_part(
Ok(index_part_bytes) Ok(index_part_bytes)
}, },
&format!("download {remote_path:?}"), &format!("download {remote_path:?}"),
cancel,
) )
.await?; .await?;
@@ -263,19 +255,28 @@ pub(super) async fn download_index_part(
tenant_id: &TenantId, tenant_id: &TenantId,
timeline_id: &TimelineId, timeline_id: &TimelineId,
my_generation: Generation, my_generation: Generation,
cancel: CancellationToken,
) -> Result<IndexPart, DownloadError> { ) -> Result<IndexPart, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
if my_generation.is_none() { if my_generation.is_none() {
// Operating without generations: just fetch the generation-less path // Operating without generations: just fetch the generation-less path
return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await; return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
.await;
} }
// Stale case: If we were intentionally attached in a stale generation, there may already be a remote // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
// index in our generation. // index in our generation.
// //
// This is an optimization to avoid doing the listing for the general case below. // This is an optimization to avoid doing the listing for the general case below.
-    let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
+    let res = do_download_index_part(
+        storage,
+        tenant_id,
+        timeline_id,
+        my_generation,
+        cancel.clone(),
+    )
+    .await;
match res { match res {
Ok(index_part) => { Ok(index_part) => {
tracing::debug!( tracing::debug!(
@@ -295,8 +296,14 @@ pub(super) async fn download_index_part(
// we want to find the most recent index from a previous generation. // we want to find the most recent index from a previous generation.
// //
// This is an optimization to avoid doing the listing for the general case below. // This is an optimization to avoid doing the listing for the general case below.
-    let res =
-        do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await;
+    let res = do_download_index_part(
+        storage,
+        tenant_id,
+        timeline_id,
+        my_generation.previous(),
+        cancel.clone(),
+    )
+    .await;
match res { match res {
Ok(index_part) => { Ok(index_part) => {
tracing::debug!("Found index_part from previous generation"); tracing::debug!("Found index_part from previous generation");
@@ -340,13 +347,14 @@ pub(super) async fn download_index_part(
match max_previous_generation { match max_previous_generation {
Some(g) => { Some(g) => {
tracing::debug!("Found index_part in generation {g:?}"); tracing::debug!("Found index_part in generation {g:?}");
-            do_download_index_part(storage, tenant_id, timeline_id, g).await
+            do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
} }
None => { None => {
// Migration from legacy pre-generation state: we have a generation but no prior // Migration from legacy pre-generation state: we have a generation but no prior
// attached pageservers did. Try to load from a no-generation path. // attached pageservers did. Try to load from a no-generation path.
tracing::info!("No index_part.json* found"); tracing::info!("No index_part.json* found");
-            do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await
+            do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
+                .await
} }
} }
} }
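Pieced together, `download_index_part` walks a fallback ladder: the caller's own generation first, then the immediately previous one, then whatever a listing of older generations turns up, and finally the legacy generation-less path. A control-flow sketch under simplified types (the `FetchError` enum and `first_hit` helper are illustrative, not pageserver code, and the real function interleaves the listing step between attempts):

```rust
#[derive(Debug)]
enum FetchError {
    NotFound,
    Other(String),
}

/// Try a sequence of fetch attempts in order: the first success wins,
/// NotFound falls through to the next attempt, any other error is returned.
fn first_hit<T>(
    attempts: Vec<Box<dyn Fn() -> Result<T, FetchError>>>,
) -> Result<T, FetchError> {
    for attempt in attempts {
        match attempt() {
            Ok(v) => return Ok(v),
            Err(FetchError::NotFound) => continue, // fall through to the next candidate
            Err(e) => return Err(e),
        }
    }
    Err(FetchError::NotFound)
}
```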
@@ -376,3 +384,23 @@ where
) )
.await .await
} }
async fn download_retry_forever<T, O, F>(
op: O,
description: &str,
cancel: CancellationToken,
) -> Result<T, DownloadError>
where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
backoff::retry(
op,
|e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
FAILED_DOWNLOAD_WARN_THRESHOLD,
u32::MAX,
description,
backoff::Cancel::new(cancel, || DownloadError::Cancelled),
)
.await
}
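A minimal usage sketch of `download_retry_forever`, assuming the caller owns a `CancellationToken` and has some operation returning `Result<_, DownloadError>` (`fetch_blob` below is a hypothetical stand-in, not pageserver code):

```rust
use tokio_util::sync::CancellationToken;

// Hypothetical operation: any async fn returning Result<T, DownloadError> fits the closure.
async fn fetch_blob() -> Result<Vec<u8>, DownloadError> {
    Ok(b"payload".to_vec())
}

async fn fetch_blob_with_retries(cancel: CancellationToken) -> Result<Vec<u8>, DownloadError> {
    // Transient errors are retried indefinitely; BadInput/NotFound are treated
    // as permanent, and cancelling the token resolves to DownloadError::Cancelled.
    download_retry_forever(|| fetch_blob(), "fetch example blob", cancel).await
}
```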

View File

@@ -294,6 +294,7 @@ async fn cleanup_remaining_timeline_fs_traces(
// Remove delete mark // Remove delete mark
tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_id, timeline_id)) tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_id, timeline_id))
.await .await
.or_else(fs_ext::ignore_not_found)
.context("remove delete mark") .context("remove delete mark")
} }
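The added `.or_else(fs_ext::ignore_not_found)` makes removing the delete mark idempotent: a mark that is already gone counts as success, which matters when a deletion is retried after a crash. A small sketch of the pattern with a locally defined helper standing in for `fs_ext::ignore_not_found`:

```rust
use std::io;

// Map ErrorKind::NotFound to Ok(()), pass every other error through.
fn ignore_not_found(e: io::Error) -> io::Result<()> {
    if e.kind() == io::ErrorKind::NotFound {
        Ok(())
    } else {
        Err(e)
    }
}

async fn remove_mark_idempotently(path: &std::path::Path) -> io::Result<()> {
    // Safe to call repeatedly, e.g. when a deletion is retried.
    tokio::fs::remove_file(path).await.or_else(ignore_not_found)
}
```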

View File

@@ -60,6 +60,7 @@ from fixtures.utils import (
allure_attach_from_dir, allure_attach_from_dir,
get_self_dir, get_self_dir,
subprocess_capture, subprocess_capture,
wait_until,
) )
""" """
@@ -1679,6 +1680,40 @@ class NeonPageserver(PgProtocol):
self.running = False self.running = False
return self return self
def restart(self, immediate: bool = False):
    """
    High level wrapper for restart: restarts the process, and waits for
    tenant state to stabilize.
    """
    self.stop(immediate=immediate)
    self.start()
    self.quiesce_tenants()

def quiesce_tenants(self):
    """
    Wait for all tenants to enter a stable state (Active or Broken)
    Call this after restarting the pageserver, or after attaching a tenant,
    to ensure that it is ready for use.
    """
    stable_states = {"Active", "Broken"}

    client = self.http_client()

    def complete():
        log.info("Checking tenants...")
        tenants = client.tenant_list()
        log.info(f"Tenant list: {tenants}...")
        any_unstable = any((t["state"]["slug"] not in stable_states) for t in tenants)
        if any_unstable:
            for t in tenants:
                log.info(f"Waiting for tenant {t['id']} in state {t['state']['slug']}")
        log.info(f"any_unstable={any_unstable}")
        assert not any_unstable

    wait_until(20, 0.5, complete)
def __enter__(self) -> "NeonPageserver": def __enter__(self) -> "NeonPageserver":
return self return self

View File

@@ -333,16 +333,30 @@ def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: N
env = neon_env_builder.init_configs() env = neon_env_builder.init_configs()
env.start() env.start()
-    env.pageserver.allowed_errors.append(
-        ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
+    env.pageserver.allowed_errors.extend(
+        [
+            ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
+            ".*Failed to load index_part from remote storage.*",
+            # On a fast restart, there may be an initdb still running in a basebackup...__temp directory
+            ".*Failed to purge.*Directory not empty.*",
+        ]
     )
ps_http = env.pageserver.http_client() ps_http = env.pageserver.http_client()
# pause all uploads # pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant) ps_http.tenant_create(env.initial_tenant)
# Create a timeline whose creation will succeed. The tenant will need at least one
# timeline to be loadable.
success_timeline = TimelineId.generate()
log.info(f"Creating timeline {success_timeline}")
ps_http.timeline_create(env.pg_version, env.initial_tenant, success_timeline, timeout=60)
# Create a timeline whose upload to remote storage will be blocked
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
def start_creating_timeline(): def start_creating_timeline():
log.info(f"Creating (expect failure) timeline {env.initial_timeline}")
with pytest.raises(RequestException): with pytest.raises(RequestException):
ps_http.timeline_create( ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60 env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
@@ -366,6 +380,9 @@ def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: N
with pytest.raises(PageserverApiException, match="not found"): with pytest.raises(PageserverApiException, match="not found"):
ps_http.timeline_detail(env.initial_tenant, env.initial_timeline) ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
# The one successfully created timeline should still be there.
assert len(ps_http.timeline_list(tenant_id=env.initial_tenant)) == 1
def test_non_uploaded_branch_is_deleted_after_restart(neon_env_builder: NeonEnvBuilder): def test_non_uploaded_branch_is_deleted_after_restart(neon_env_builder: NeonEnvBuilder):
""" """

View File

@@ -15,7 +15,7 @@ from fixtures.types import TenantId, TimelineId
# Test restarting page server, while safekeeper and compute node keep # Test restarting page server, while safekeeper and compute node keep
# running. # running.
-def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
+def test_local_corruption(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start() env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend( env.pageserver.allowed_errors.extend(
@@ -69,24 +69,19 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
env.pageserver.start() env.pageserver.start()
-    # Tenant 0 should still work
+    # Un-damaged tenant works
pg0.start() pg0.start()
assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100 assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
-    # But all others are broken
-    # First timeline would not get loaded into pageserver due to corrupt metadata file
-    with pytest.raises(
-        Exception, match=f"Tenant {tenant1} will not become active. Current state: Broken"
-    ) as err:
-        pg1.start()
-    log.info(
-        f"As expected, compute startup failed eagerly for timeline with corrupt metadata: {err}"
-    )
+    # Tenant with corrupt local metadata works: remote storage is authoritative for metadata
+    pg1.start()
+    assert pg1.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
# Second timeline will fail during basebackup, because the local layer file is corrupt. # Second timeline will fail during basebackup, because the local layer file is corrupt.
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message. # It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
# (We don't check layer file contents on startup, when loading the timeline) # (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
with pytest.raises(Exception, match="layer loading failed:") as err: with pytest.raises(Exception, match="layer loading failed:") as err:
pg2.start() pg2.start()
log.info( log.info(
@@ -133,8 +128,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
_ = env.neon_cli.create_timeline("test_timeline_init_break_before_checkpoint", tenant_id) _ = env.neon_cli.create_timeline("test_timeline_init_break_before_checkpoint", tenant_id)
# Restart the page server # Restart the page server
-    env.pageserver.stop(immediate=True)
-    env.pageserver.start()
+    env.pageserver.restart(immediate=True)
# Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally. # Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
new_tenant_timelines = env.neon_cli.list_timelines(tenant_id) new_tenant_timelines = env.neon_cli.list_timelines(tenant_id)

View File

@@ -62,14 +62,14 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
tenant_load_delay_ms = 5000 tenant_load_delay_ms = 5000
env.pageserver.stop() env.pageserver.stop()
env.pageserver.start( env.pageserver.start(
extra_env_vars={"FAILPOINTS": f"before-loading-tenant=return({tenant_load_delay_ms})"} extra_env_vars={"FAILPOINTS": f"before-attaching-tenant=return({tenant_load_delay_ms})"}
) )
-    # Check that it's in Loading state
+    # Check that it's in Attaching state
client = env.pageserver.http_client() client = env.pageserver.http_client()
tenant_status = client.tenant_status(env.initial_tenant) tenant_status = client.tenant_status(env.initial_tenant)
log.info("Tenant status : %s", tenant_status) log.info("Tenant status : %s", tenant_status)
assert tenant_status["state"]["slug"] == "Loading" assert tenant_status["state"]["slug"] == "Attaching"
# Try to read. This waits until the loading finishes, and then return normally. # Try to read. This waits until the loading finishes, and then return normally.
cur.execute("SELECT count(*) FROM foo") cur.execute("SELECT count(*) FROM foo")

View File

@@ -241,8 +241,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
assert reason.endswith(f"failpoint: {failpoint}"), reason assert reason.endswith(f"failpoint: {failpoint}"), reason
if check is Check.RETRY_WITH_RESTART: if check is Check.RETRY_WITH_RESTART:
-        env.pageserver.stop()
-        env.pageserver.start()
+        env.pageserver.restart()
if failpoint in ( if failpoint in (
"tenant-delete-before-shutdown", "tenant-delete-before-shutdown",

View File

@@ -66,10 +66,6 @@ def test_tenant_reattach(
env.pageserver.allowed_errors.append( env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*" f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
) )
# Thats because of UnreliableWrapper's injected failures
env.pageserver.allowed_errors.append(
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur: with endpoint.cursor() as cur:
@@ -116,7 +112,7 @@ def test_tenant_reattach(
assert query_scalar(cur, "SELECT count(*) FROM t") == 100000 assert query_scalar(cur, "SELECT count(*) FROM t") == 100000
# Check that we had to retry the downloads # Check that we had to retry the downloads
assert env.pageserver.log_contains(".*list prefixes.*failed, will retry.*") assert env.pageserver.log_contains(".*list timelines.*failed, will retry.*")
assert env.pageserver.log_contains(".*download.*failed, will retry.*") assert env.pageserver.log_contains(".*download.*failed, will retry.*")
@@ -643,47 +639,6 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder
ensure_test_data(data_id, data_secret, endpoint) ensure_test_data(data_id, data_secret, endpoint)
# Tests that it's possible to `load` broken tenants:
# * `ignore` a tenant
# * removes its `metadata` file locally
# * `load` the same tenant
# * ensure that it's status is `Broken`
def test_ignored_tenant_stays_broken_without_metadata(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
env.endpoints.create_start("main")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Attempts to connect from compute to pageserver while the tenant is
# temporarily detached produces these errors in the pageserver log.
env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: (Broken|Stopping).*"
)
# ignore the tenant and remove its metadata
pageserver_http.tenant_ignore(tenant_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
metadata_removed = False
for dir_entry in timeline_dir.iterdir():
if dir_entry.name == "metadata":
# Looks like a layer file. Remove it
dir_entry.unlink()
metadata_removed = True
assert metadata_removed, f"Failed to find metadata file in {timeline_dir}"
env.pageserver.allowed_errors.append(
f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
)
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
pageserver_http.tenant_load(tenant_id=tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 5)
# Tests that attach is never working on a tenant, ignored or not, as long as it's not absent locally # Tests that attach is never working on a tenant, ignored or not, as long as it's not absent locally
# Similarly, tests that it's not possible to schedule a `load` for tenant that's not ignored. # Similarly, tests that it's not possible to schedule a `load` for tenant that's not ignored.
def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder): def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder):
@@ -778,7 +733,8 @@ def test_ignore_while_attaching(
tenants_before_ignore tenants_before_ignore
), "Only ignored tenant should be missing" ), "Only ignored tenant should be missing"
-    # But can load it from local files, that will restore attach.
+    # Calling load will bring the tenant back online
pageserver_http.configure_failpoints([("attach-before-activate", "off")])
pageserver_http.tenant_load(tenant_id) pageserver_http.tenant_load(tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5) wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)

View File

@@ -1,5 +1,4 @@
import os import os
import shutil
import time import time
from contextlib import closing from contextlib import closing
from datetime import datetime from datetime import datetime
@@ -20,7 +19,7 @@ from fixtures.neon_fixtures import (
) )
from fixtures.pageserver.utils import timeline_delete_wait_completed from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
-from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.types import Lsn, TenantId
from fixtures.utils import wait_until from fixtures.utils import wait_until
from prometheus_client.samples import Sample from prometheus_client.samples import Sample
@@ -298,13 +297,8 @@ def test_pageserver_with_empty_tenants(
client = env.pageserver.http_client() client = env.pageserver.http_client()
-    tenant_with_empty_timelines = TenantId.generate()
-    client.tenant_create(tenant_with_empty_timelines)
-    temp_timelines = client.timeline_list(tenant_with_empty_timelines)
-    for temp_timeline in temp_timelines:
-        timeline_delete_wait_completed(
-            client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
-        )
+    tenant_with_empty_timelines = env.initial_tenant
+    timeline_delete_wait_completed(client, tenant_with_empty_timelines, env.initial_timeline)
files_in_timelines_dir = sum( files_in_timelines_dir = sum(
1 for _p in Path.iterdir(env.pageserver.timeline_dir(tenant_with_empty_timelines)) 1 for _p in Path.iterdir(env.pageserver.timeline_dir(tenant_with_empty_timelines))
@@ -317,34 +311,19 @@ def test_pageserver_with_empty_tenants(
env.endpoints.stop_all() env.endpoints.stop_all()
env.pageserver.stop() env.pageserver.stop()
tenant_without_timelines_dir = env.initial_tenant
shutil.rmtree(env.pageserver.timeline_dir(tenant_without_timelines_dir))
env.pageserver.start() env.pageserver.start()
client = env.pageserver.http_client() client = env.pageserver.http_client()
-    def not_loading():
+    def not_attaching():
         tenants = client.tenant_list()
-        assert len(tenants) == 2
-        assert all(t["state"]["slug"] != "Loading" for t in tenants)
+        assert len(tenants) == 1
+        assert all(t["state"]["slug"] != "Attaching" for t in tenants)

-    wait_until(10, 0.2, not_loading)
+    wait_until(10, 0.2, not_attaching)
tenants = client.tenant_list() tenants = client.tenant_list()
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
assert (
broken_tenant["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
broken_tenant_status = client.tenant_status(tenant_without_timelines_dir)
assert (
broken_tenant_status["state"]["slug"] == "Broken"
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)] [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
assert ( assert (
loaded_tenant["state"]["slug"] == "Active" loaded_tenant["state"]["slug"] == "Active"
@@ -358,9 +337,6 @@ def test_pageserver_with_empty_tenants(
time.sleep(1) # to allow metrics propagation time.sleep(1) # to allow metrics propagation
ps_metrics = client.get_metrics() ps_metrics = client.get_metrics()
broken_tenants_metric_filter = {
"tenant_id": str(tenant_without_timelines_dir),
}
active_tenants_metric_filter = { active_tenants_metric_filter = {
"state": "Active", "state": "Active",
} }
@@ -374,13 +350,3 @@ def test_pageserver_with_empty_tenants(
assert ( assert (
tenant_active_count == 1 tenant_active_count == 1
), f"Tenant {tenant_with_empty_timelines} should have metric as active" ), f"Tenant {tenant_with_empty_timelines} should have metric as active"
tenant_broken_count = int(
ps_metrics.query_one(
"pageserver_broken_tenants_count", filter=broken_tenants_metric_filter
).value
)
assert (
tenant_broken_count == 1
), f"Tenant {tenant_without_timelines_dir} should have metric as broken"

View File

@@ -70,8 +70,7 @@ def test_threshold_based_eviction(
} }
# restart because changing tenant config is not instant # restart because changing tenant config is not instant
-    env.pageserver.stop()
-    env.pageserver.start()
+    env.pageserver.restart()
assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == { assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
"kind": "LayerAccessThreshold", "kind": "LayerAccessThreshold",

View File

@@ -277,13 +277,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
if failpoint == "timeline-delete-after-index-delete": if failpoint == "timeline-delete-after-index-delete":
m = ps_http.get_metrics() m = ps_http.get_metrics()
assert (
m.query_one(
"remote_storage_s3_request_seconds_count",
filter={"request_type": "get_object", "result": "err"},
).value
== 2 # One is missing tenant deletion mark, second is missing index part
)
assert ( assert (
m.query_one( m.query_one(
"remote_storage_s3_request_seconds_count", "remote_storage_s3_request_seconds_count",