Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-10 06:52:55 +00:00)
pageserver: always load remote metadata (no more spawn_load) (#5580)
## Problem

The pageserver had two ways of loading a tenant:
- `spawn_load` would trust on-disk content to reflect all existing timelines
- `spawn_attach` would list timelines in remote storage.

It was incorrect for `spawn_load` to trust local disk content, because it doesn't know whether the tenant might have been attached and written somewhere else. Making this correct would require some generation number checks, and the only payoff is avoiding one S3 op per tenant at startup, so it's not worth the complexity -- it is much simpler to have one way to load a tenant.

## Summary of changes

- `Tenant` objects are always created with `Tenant::spawn`: there is no more distinction between "load" and "attach".
- The ability to run without remote storage (for `neon_local`) is preserved by adding a branch inside `attach` that uses a fallback `load_local` if no remote_storage is present.
- Fix attaching a tenant when it has a timeline with no IndexPart: this can occur if a newly created timeline manages to upload a layer before it has uploaded an index.
- The attach marker file that used to indicate whether a tenant should be "loaded" or "attached" is no longer needed, and is removed.
- The `GenericRemoteStorage` interface gets a `list()` method that maps more directly to what ListObjects does, returning both keys and common prefixes. The existing `list_files` and `list_prefixes` methods are now just calls into `list()` -- these can be removed later if we would like to shrink the interface a bit. A condensed sketch of the resulting interface is shown below.
- The remote deletion marker is moved into `timelines/` and detected as part of listing timelines rather than via a separate GET request. If any existing tenants have a marker in the old location (unlikely, it only happens if something crashes mid-delete), they will rely on the control plane retrying to complete their deletion.
- Revise S3 calls for timeline listing and tenant load to take a cancellation token, and retry forever: it never makes sense to mark a Tenant broken because of a transient S3 issue.

## Breaking changes

- The remote deletion marker is moved from `deleted` to `timelines/deleted` within the tenant prefix. Markers in the old location will be ignored: it is the control plane's responsibility to retry deletions until they succeed. Markers in the new location will be tolerated by the previous release of pageserver via https://github.com/neondatabase/neon/pull/5632
- The local `attaching` marker file is no longer written. Therefore, if the pageserver is downgraded after running this code, the old pageserver will not be able to distinguish between partially attached tenants and fully attached tenants. This would only impact tenants that were partway through attaching at the moment of downgrade. In the unlikely event that we do experience an incident that prompts us to roll back, we may check for attach operations in flight and manually insert `attaching` marker files as needed.

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
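To make the listing change concrete, here is a condensed, self-contained sketch of the interface this PR introduces in `remote_storage`. The `RemotePath` and `DownloadError` definitions below are simplified stand-ins for the real types; see the diff further down for the actual code.

```rust
// Condensed sketch of the listing interface this PR adds to `remote_storage`.
// `RemotePath` and `DownloadError` are simplified stand-ins here; see the diff
// below for the real definitions.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RemotePath(pub String);

#[derive(Debug)]
pub enum DownloadError {
    NotFound,
    Cancelled,
    Other(String),
}

/// Controls whether a listing uses a '/' delimiter (one level, like ListObjects
/// with a delimiter) or no delimiter (a flat, recursive key listing).
pub enum ListingMode {
    WithDelimiter,
    NoDelimiter,
}

/// What a single `list()` call returns: common prefixes ("directories") and keys.
#[derive(Default)]
pub struct Listing {
    pub prefixes: Vec<RemotePath>,
    pub keys: Vec<RemotePath>,
}

#[async_trait::async_trait]
pub trait RemoteStorage: Send + Sync + 'static {
    /// The one primitive each backend (S3, Azure, local FS, ...) implements.
    async fn list(
        &self,
        prefix: Option<&RemotePath>,
        mode: ListingMode,
    ) -> Result<Listing, DownloadError>;

    /// The pre-existing entry points become thin default wrappers over `list()`,
    /// so they can be removed later without touching every backend.
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        Ok(self.list(prefix, ListingMode::WithDelimiter).await?.prefixes)
    }

    async fn list_files(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        Ok(self.list(prefix, ListingMode::NoDelimiter).await?.keys)
    }
}
```

Keeping `list()` as the only backend-specific primitive is what lets the legacy methods shrink to default wrappers and, eventually, disappear.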
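The remote reads done while loading a tenant follow the same principle: they now take a cancellation token and are retried indefinitely rather than marking the tenant Broken on a transient S3 error. The loop below is only an illustration of that shape, not the pageserver's actual `backoff` helper.

```rust
// Illustration only: "retry forever, but stop on cancellation". The real code uses
// the pageserver's own backoff helper and error types; this standalone loop just
// shows the shape of the behaviour described above.
use std::time::Duration;

use tokio_util::sync::CancellationToken;

/// Runs `op` until it succeeds, backing off between attempts.
/// Returns None only if the cancellation token fires (detach / shutdown).
async fn retry_forever<T, E, F, Fut>(mut op: F, cancel: &CancellationToken) -> Option<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    loop {
        tokio::select! {
            _ = cancel.cancelled() => return None,
            result = op() => match result {
                Ok(value) => return Some(value),
                Err(_) => {
                    // Transient remote-storage error: wait and try again instead
                    // of marking the tenant Broken.
                    tokio::time::sleep(delay).await;
                    delay = (delay * 2).min(Duration::from_secs(10));
                }
            },
        }
    }
}
```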
@@ -110,7 +110,6 @@ impl TenantState {
             // So, return `Maybe` while Attaching, making Console wait for the attach task to finish.
             Self::Attaching | Self::Activating(ActivatingFrom::Attaching) => Maybe,
             // tenant mgr startup distinguishes attaching from loading via marker file.
-            // If it's loading, there is no attach marker file, i.e., attach had finished in the past.
             Self::Loading | Self::Activating(ActivatingFrom::Loading) => Attached,
             // We only reach Active after successful load / attach.
             // So, call atttachment status Attached.
@@ -23,8 +23,8 @@ use tracing::debug;
|
|||||||
|
|
||||||
use crate::s3_bucket::RequestKind;
|
use crate::s3_bucket::RequestKind;
|
||||||
use crate::{
|
use crate::{
|
||||||
AzureConfig, ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage,
|
AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath,
|
||||||
StorageMetadata,
|
RemoteStorage, StorageMetadata,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct AzureBlobStorage {
|
pub struct AzureBlobStorage {
|
||||||
@@ -184,10 +184,11 @@ fn to_download_error(error: azure_core::Error) -> DownloadError {
|
|||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl RemoteStorage for AzureBlobStorage {
|
impl RemoteStorage for AzureBlobStorage {
|
||||||
async fn list_prefixes(
|
async fn list(
|
||||||
&self,
|
&self,
|
||||||
prefix: Option<&RemotePath>,
|
prefix: Option<&RemotePath>,
|
||||||
) -> Result<Vec<RemotePath>, DownloadError> {
|
mode: ListingMode,
|
||||||
|
) -> anyhow::Result<Listing, DownloadError> {
|
||||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||||
let list_prefix = prefix
|
let list_prefix = prefix
|
||||||
.map(|p| self.relative_path_to_name(p))
|
.map(|p| self.relative_path_to_name(p))
|
||||||
@@ -195,16 +196,19 @@ impl RemoteStorage for AzureBlobStorage {
|
|||||||
.map(|mut p| {
|
.map(|mut p| {
|
||||||
// required to end with a separator
|
// required to end with a separator
|
||||||
// otherwise request will return only the entry of a prefix
|
// otherwise request will return only the entry of a prefix
|
||||||
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
if matches!(mode, ListingMode::WithDelimiter)
|
||||||
|
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
|
||||||
|
{
|
||||||
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||||
}
|
}
|
||||||
p
|
p
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut builder = self
|
let mut builder = self.client.list_blobs();
|
||||||
.client
|
|
||||||
.list_blobs()
|
if let ListingMode::WithDelimiter = mode {
|
||||||
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
|
builder = builder.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(prefix) = list_prefix {
|
if let Some(prefix) = list_prefix {
|
||||||
builder = builder.prefix(Cow::from(prefix.to_owned()));
|
builder = builder.prefix(Cow::from(prefix.to_owned()));
|
||||||
@@ -215,46 +219,23 @@ impl RemoteStorage for AzureBlobStorage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut response = builder.into_stream();
|
let mut response = builder.into_stream();
|
||||||
let mut res = Vec::new();
|
let mut res = Listing::default();
|
||||||
while let Some(entry) = response.next().await {
|
while let Some(l) = response.next().await {
|
||||||
let entry = entry.map_err(to_download_error)?;
|
let entry = l.map_err(to_download_error)?;
|
||||||
let name_iter = entry
|
let prefix_iter = entry
|
||||||
.blobs
|
.blobs
|
||||||
.prefixes()
|
.prefixes()
|
||||||
.map(|prefix| self.name_to_relative_path(&prefix.name));
|
.map(|prefix| self.name_to_relative_path(&prefix.name));
|
||||||
res.extend(name_iter);
|
res.prefixes.extend(prefix_iter);
|
||||||
}
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
let blob_iter = entry
|
||||||
let folder_name = folder
|
|
||||||
.map(|p| self.relative_path_to_name(p))
|
|
||||||
.or_else(|| self.prefix_in_container.clone());
|
|
||||||
|
|
||||||
let mut builder = self.client.list_blobs();
|
|
||||||
|
|
||||||
if let Some(folder_name) = folder_name {
|
|
||||||
builder = builder.prefix(Cow::from(folder_name.to_owned()));
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(limit) = self.max_keys_per_list_response {
|
|
||||||
builder = builder.max_results(MaxResults::new(limit));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut response = builder.into_stream();
|
|
||||||
let mut res = Vec::new();
|
|
||||||
while let Some(l) = response.next().await {
|
|
||||||
let entry = l.map_err(anyhow::Error::new)?;
|
|
||||||
let name_iter = entry
|
|
||||||
.blobs
|
.blobs
|
||||||
.blobs()
|
.blobs()
|
||||||
.map(|bl| self.name_to_relative_path(&bl.name));
|
.map(|k| self.name_to_relative_path(&k.name));
|
||||||
res.extend(name_iter);
|
res.keys.extend(blob_iter);
|
||||||
}
|
}
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upload(
|
async fn upload(
|
||||||
&self,
|
&self,
|
||||||
mut from: impl AsyncRead + Unpin + Send + Sync + 'static,
|
mut from: impl AsyncRead + Unpin + Send + Sync + 'static,
|
||||||
|
|||||||
@@ -129,6 +129,22 @@ impl RemotePath {
     }
 }
 
+/// We don't need callers to be able to pass arbitrary delimiters: just control
+/// whether listings will use a '/' separator or not.
+///
+/// The WithDelimiter mode will populate `prefixes` and `keys` in the result. The
+/// NoDelimiter mode will only populate `keys`.
+pub enum ListingMode {
+    WithDelimiter,
+    NoDelimiter,
+}
+
+#[derive(Default)]
+pub struct Listing {
+    pub prefixes: Vec<RemotePath>,
+    pub keys: Vec<RemotePath>,
+}
+
 /// Storage (potentially remote) API to manage its state.
 /// This storage tries to be unaware of any layered repository context,
 /// providing basic CRUD operations for storage files.
@@ -141,8 +157,13 @@ pub trait RemoteStorage: Send + Sync + 'static {
     async fn list_prefixes(
         &self,
         prefix: Option<&RemotePath>,
-    ) -> Result<Vec<RemotePath>, DownloadError>;
+    ) -> Result<Vec<RemotePath>, DownloadError> {
+        let result = self
+            .list(prefix, ListingMode::WithDelimiter)
+            .await?
+            .prefixes;
+        Ok(result)
+    }
     /// Lists all files in directory "recursively"
     /// (not really recursively, because AWS has a flat namespace)
     /// Note: This is subtely different than list_prefixes,
@@ -154,7 +175,16 @@ pub trait RemoteStorage: Send + Sync + 'static {
     /// whereas,
     /// list_prefixes("foo/bar/") = ["cat", "dog"]
     /// See `test_real_s3.rs` for more details.
-    async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
+    async fn list_files(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
+        let result = self.list(prefix, ListingMode::NoDelimiter).await?.keys;
+        Ok(result)
+    }
 
+    async fn list(
+        &self,
+        prefix: Option<&RemotePath>,
+        _mode: ListingMode,
+    ) -> anyhow::Result<Listing, DownloadError>;
+
     /// Streams the local file contents into remote into the remote storage entry.
     async fn upload(
@@ -205,6 +235,9 @@ pub enum DownloadError {
     BadInput(anyhow::Error),
     /// The file was not found in the remote storage.
     NotFound,
+    /// A cancellation token aborted the download, typically during
+    /// tenant detach or process shutdown.
+    Cancelled,
     /// The file was found in the remote storage, but the download failed.
     Other(anyhow::Error),
 }
@@ -215,6 +248,7 @@ impl std::fmt::Display for DownloadError {
             DownloadError::BadInput(e) => {
                 write!(f, "Failed to download a remote file due to user input: {e}")
             }
+            DownloadError::Cancelled => write!(f, "Cancelled, shutting down"),
             DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
             DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
         }
@@ -234,6 +268,19 @@ pub enum GenericRemoteStorage {
 }
 
 impl GenericRemoteStorage {
+    pub async fn list(
+        &self,
+        prefix: Option<&RemotePath>,
+        mode: ListingMode,
+    ) -> anyhow::Result<Listing, DownloadError> {
+        match self {
+            Self::LocalFs(s) => s.list(prefix, mode).await,
+            Self::AwsS3(s) => s.list(prefix, mode).await,
+            Self::AzureBlob(s) => s.list(prefix, mode).await,
+            Self::Unreliable(s) => s.list(prefix, mode).await,
+        }
+    }
+
     // A function for listing all the files in a "directory"
     // Example:
     // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"]
@@ -15,7 +15,7 @@ use tokio::{
|
|||||||
use tracing::*;
|
use tracing::*;
|
||||||
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
|
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
|
||||||
|
|
||||||
use crate::{Download, DownloadError, RemotePath};
|
use crate::{Download, DownloadError, Listing, ListingMode, RemotePath};
|
||||||
|
|
||||||
use super::{RemoteStorage, StorageMetadata};
|
use super::{RemoteStorage, StorageMetadata};
|
||||||
|
|
||||||
@@ -75,7 +75,7 @@ impl LocalFs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
|
async fn list_all(&self) -> anyhow::Result<Vec<RemotePath>> {
|
||||||
Ok(get_all_files(&self.storage_root, true)
|
Ok(get_all_files(&self.storage_root, true)
|
||||||
.await?
|
.await?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@@ -89,52 +89,10 @@ impl LocalFs {
|
|||||||
})
|
})
|
||||||
.collect())
|
.collect())
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl RemoteStorage for LocalFs {
|
|
||||||
async fn list_prefixes(
|
|
||||||
&self,
|
|
||||||
prefix: Option<&RemotePath>,
|
|
||||||
) -> Result<Vec<RemotePath>, DownloadError> {
|
|
||||||
let path = match prefix {
|
|
||||||
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
|
|
||||||
None => Cow::Borrowed(&self.storage_root),
|
|
||||||
};
|
|
||||||
|
|
||||||
let prefixes_to_filter = get_all_files(path.as_ref(), false)
|
|
||||||
.await
|
|
||||||
.map_err(DownloadError::Other)?;
|
|
||||||
|
|
||||||
let mut prefixes = Vec::with_capacity(prefixes_to_filter.len());
|
|
||||||
|
|
||||||
// filter out empty directories to mirror s3 behavior.
|
|
||||||
for prefix in prefixes_to_filter {
|
|
||||||
if prefix.is_dir()
|
|
||||||
&& is_directory_empty(&prefix)
|
|
||||||
.await
|
|
||||||
.map_err(DownloadError::Other)?
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
prefixes.push(
|
|
||||||
prefix
|
|
||||||
.strip_prefix(&self.storage_root)
|
|
||||||
.context("Failed to strip prefix")
|
|
||||||
.and_then(RemotePath::new)
|
|
||||||
.expect(
|
|
||||||
"We list files for storage root, hence should be able to remote the prefix",
|
|
||||||
),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(prefixes)
|
|
||||||
}
|
|
||||||
|
|
||||||
// recursively lists all files in a directory,
|
// recursively lists all files in a directory,
|
||||||
// mirroring the `list_files` for `s3_bucket`
|
// mirroring the `list_files` for `s3_bucket`
|
||||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
async fn list_recursive(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
||||||
let full_path = match folder {
|
let full_path = match folder {
|
||||||
Some(folder) => folder.with_base(&self.storage_root),
|
Some(folder) => folder.with_base(&self.storage_root),
|
||||||
None => self.storage_root.clone(),
|
None => self.storage_root.clone(),
|
||||||
@@ -186,6 +144,70 @@ impl RemoteStorage for LocalFs {
|
|||||||
|
|
||||||
Ok(files)
|
Ok(files)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl RemoteStorage for LocalFs {
|
||||||
|
async fn list(
|
||||||
|
&self,
|
||||||
|
prefix: Option<&RemotePath>,
|
||||||
|
mode: ListingMode,
|
||||||
|
) -> Result<Listing, DownloadError> {
|
||||||
|
let mut result = Listing::default();
|
||||||
|
|
||||||
|
if let ListingMode::NoDelimiter = mode {
|
||||||
|
let keys = self
|
||||||
|
.list_recursive(prefix)
|
||||||
|
.await
|
||||||
|
.map_err(DownloadError::Other)?;
|
||||||
|
|
||||||
|
result.keys = keys
|
||||||
|
.into_iter()
|
||||||
|
.filter(|k| {
|
||||||
|
let path = k.with_base(&self.storage_root);
|
||||||
|
!path.is_dir()
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
return Ok(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
let path = match prefix {
|
||||||
|
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
|
||||||
|
None => Cow::Borrowed(&self.storage_root),
|
||||||
|
};
|
||||||
|
|
||||||
|
let prefixes_to_filter = get_all_files(path.as_ref(), false)
|
||||||
|
.await
|
||||||
|
.map_err(DownloadError::Other)?;
|
||||||
|
|
||||||
|
// filter out empty directories to mirror s3 behavior.
|
||||||
|
for prefix in prefixes_to_filter {
|
||||||
|
if prefix.is_dir()
|
||||||
|
&& is_directory_empty(&prefix)
|
||||||
|
.await
|
||||||
|
.map_err(DownloadError::Other)?
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let stripped = prefix
|
||||||
|
.strip_prefix(&self.storage_root)
|
||||||
|
.context("Failed to strip prefix")
|
||||||
|
.and_then(RemotePath::new)
|
||||||
|
.expect(
|
||||||
|
"We list files for storage root, hence should be able to remote the prefix",
|
||||||
|
);
|
||||||
|
|
||||||
|
if prefix.is_dir() {
|
||||||
|
result.prefixes.push(stripped);
|
||||||
|
} else {
|
||||||
|
result.keys.push(stripped);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
async fn upload(
|
async fn upload(
|
||||||
&self,
|
&self,
|
||||||
@@ -479,7 +501,7 @@ mod fs_tests {
|
|||||||
|
|
||||||
let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
|
let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
storage.list().await?,
|
storage.list_all().await?,
|
||||||
vec![target_path_1.clone()],
|
vec![target_path_1.clone()],
|
||||||
"Should list a single file after first upload"
|
"Should list a single file after first upload"
|
||||||
);
|
);
|
||||||
@@ -667,7 +689,7 @@ mod fs_tests {
|
|||||||
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
|
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
|
||||||
|
|
||||||
storage.delete(&upload_target).await?;
|
storage.delete(&upload_target).await?;
|
||||||
assert!(storage.list().await?.is_empty());
|
assert!(storage.list_all().await?.is_empty());
|
||||||
|
|
||||||
storage
|
storage
|
||||||
.delete(&upload_target)
|
.delete(&upload_target)
|
||||||
@@ -725,6 +747,43 @@ mod fs_tests {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn list() -> anyhow::Result<()> {
|
||||||
|
// No delimiter: should recursively list everything
|
||||||
|
let storage = create_storage()?;
|
||||||
|
let child = upload_dummy_file(&storage, "grandparent/parent/child", None).await?;
|
||||||
|
let uncle = upload_dummy_file(&storage, "grandparent/uncle", None).await?;
|
||||||
|
|
||||||
|
let listing = storage.list(None, ListingMode::NoDelimiter).await?;
|
||||||
|
assert!(listing.prefixes.is_empty());
|
||||||
|
assert_eq!(listing.keys, [uncle.clone(), child.clone()].to_vec());
|
||||||
|
|
||||||
|
// Delimiter: should only go one deep
|
||||||
|
let listing = storage.list(None, ListingMode::WithDelimiter).await?;
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
listing.prefixes,
|
||||||
|
[RemotePath::from_string("timelines").unwrap()].to_vec()
|
||||||
|
);
|
||||||
|
assert!(listing.keys.is_empty());
|
||||||
|
|
||||||
|
// Delimiter & prefix
|
||||||
|
let listing = storage
|
||||||
|
.list(
|
||||||
|
Some(&RemotePath::from_string("timelines/some_timeline/grandparent").unwrap()),
|
||||||
|
ListingMode::WithDelimiter,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
assert_eq!(
|
||||||
|
listing.prefixes,
|
||||||
|
[RemotePath::from_string("timelines/some_timeline/grandparent/parent").unwrap()]
|
||||||
|
.to_vec()
|
||||||
|
);
|
||||||
|
assert_eq!(listing.keys, [uncle.clone()].to_vec());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn upload_dummy_file(
|
async fn upload_dummy_file(
|
||||||
storage: &LocalFs,
|
storage: &LocalFs,
|
||||||
name: &str,
|
name: &str,
|
||||||
@@ -777,7 +836,7 @@ mod fs_tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
|
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
|
||||||
let mut files = storage.list().await?;
|
let mut files = storage.list_all().await?;
|
||||||
files.sort_by(|a, b| a.0.cmp(&b.0));
|
files.sort_by(|a, b| a.0.cmp(&b.0));
|
||||||
Ok(files)
|
Ok(files)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,8 +30,8 @@ use tracing::debug;
|
|||||||
|
|
||||||
use super::StorageMetadata;
|
use super::StorageMetadata;
|
||||||
use crate::{
|
use crate::{
|
||||||
ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, S3Config,
|
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
|
||||||
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub(super) mod metrics;
|
pub(super) mod metrics;
|
||||||
@@ -299,13 +299,13 @@ impl<S: AsyncRead> AsyncRead for TimedDownload<S> {
|
|||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl RemoteStorage for S3Bucket {
|
impl RemoteStorage for S3Bucket {
|
||||||
/// See the doc for `RemoteStorage::list_prefixes`
|
async fn list(
|
||||||
/// Note: it wont include empty "directories"
|
|
||||||
async fn list_prefixes(
|
|
||||||
&self,
|
&self,
|
||||||
prefix: Option<&RemotePath>,
|
prefix: Option<&RemotePath>,
|
||||||
) -> Result<Vec<RemotePath>, DownloadError> {
|
mode: ListingMode,
|
||||||
|
) -> Result<Listing, DownloadError> {
|
||||||
let kind = RequestKind::List;
|
let kind = RequestKind::List;
|
||||||
|
let mut result = Listing::default();
|
||||||
|
|
||||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||||
let list_prefix = prefix
|
let list_prefix = prefix
|
||||||
@@ -314,28 +314,33 @@ impl RemoteStorage for S3Bucket {
|
|||||||
.map(|mut p| {
|
.map(|mut p| {
|
||||||
// required to end with a separator
|
// required to end with a separator
|
||||||
// otherwise request will return only the entry of a prefix
|
// otherwise request will return only the entry of a prefix
|
||||||
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
if matches!(mode, ListingMode::WithDelimiter)
|
||||||
|
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
|
||||||
|
{
|
||||||
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||||
}
|
}
|
||||||
p
|
p
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut document_keys = Vec::new();
|
|
||||||
|
|
||||||
let mut continuation_token = None;
|
let mut continuation_token = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let _guard = self.permit(kind).await;
|
let _guard = self.permit(kind).await;
|
||||||
let started_at = start_measuring_requests(kind);
|
let started_at = start_measuring_requests(kind);
|
||||||
|
|
||||||
let fetch_response = self
|
let mut request = self
|
||||||
.client
|
.client
|
||||||
.list_objects_v2()
|
.list_objects_v2()
|
||||||
.bucket(self.bucket_name.clone())
|
.bucket(self.bucket_name.clone())
|
||||||
.set_prefix(list_prefix.clone())
|
.set_prefix(list_prefix.clone())
|
||||||
.set_continuation_token(continuation_token)
|
.set_continuation_token(continuation_token)
|
||||||
.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string())
|
.set_max_keys(self.max_keys_per_list_response);
|
||||||
.set_max_keys(self.max_keys_per_list_response)
|
|
||||||
|
if let ListingMode::WithDelimiter = mode {
|
||||||
|
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = request
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
.context("Failed to list S3 prefixes")
|
.context("Failed to list S3 prefixes")
|
||||||
@@ -345,71 +350,35 @@ impl RemoteStorage for S3Bucket {
|
|||||||
|
|
||||||
metrics::BUCKET_METRICS
|
metrics::BUCKET_METRICS
|
||||||
.req_seconds
|
.req_seconds
|
||||||
.observe_elapsed(kind, &fetch_response, started_at);
|
.observe_elapsed(kind, &response, started_at);
|
||||||
|
|
||||||
let fetch_response = fetch_response?;
|
let response = response?;
|
||||||
|
|
||||||
document_keys.extend(
|
let keys = response.contents().unwrap_or_default();
|
||||||
fetch_response
|
let empty = Vec::new();
|
||||||
.common_prefixes
|
let prefixes = response.common_prefixes.as_ref().unwrap_or(&empty);
|
||||||
.unwrap_or_default()
|
|
||||||
.into_iter()
|
tracing::info!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
|
||||||
|
|
||||||
|
for object in keys {
|
||||||
|
let object_path = object.key().expect("response does not contain a key");
|
||||||
|
let remote_path = self.s3_object_to_relative_path(object_path);
|
||||||
|
result.keys.push(remote_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
result.prefixes.extend(
|
||||||
|
prefixes
|
||||||
|
.iter()
|
||||||
.filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
|
.filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
|
||||||
);
|
);
|
||||||
|
|
||||||
continuation_token = match fetch_response.next_continuation_token {
|
continuation_token = match response.next_continuation_token {
|
||||||
Some(new_token) => Some(new_token),
|
Some(new_token) => Some(new_token),
|
||||||
None => break,
|
None => break,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(document_keys)
|
Ok(result)
|
||||||
}
|
|
||||||
|
|
||||||
/// See the doc for `RemoteStorage::list_files`
|
|
||||||
async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
|
|
||||||
let kind = RequestKind::List;
|
|
||||||
|
|
||||||
let folder_name = folder
|
|
||||||
.map(|p| self.relative_path_to_s3_object(p))
|
|
||||||
.or_else(|| self.prefix_in_bucket.clone());
|
|
||||||
|
|
||||||
// AWS may need to break the response into several parts
|
|
||||||
let mut continuation_token = None;
|
|
||||||
let mut all_files = vec![];
|
|
||||||
loop {
|
|
||||||
let _guard = self.permit(kind).await;
|
|
||||||
let started_at = start_measuring_requests(kind);
|
|
||||||
|
|
||||||
let response = self
|
|
||||||
.client
|
|
||||||
.list_objects_v2()
|
|
||||||
.bucket(self.bucket_name.clone())
|
|
||||||
.set_prefix(folder_name.clone())
|
|
||||||
.set_continuation_token(continuation_token)
|
|
||||||
.set_max_keys(self.max_keys_per_list_response)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.context("Failed to list files in S3 bucket");
|
|
||||||
|
|
||||||
let started_at = ScopeGuard::into_inner(started_at);
|
|
||||||
metrics::BUCKET_METRICS
|
|
||||||
.req_seconds
|
|
||||||
.observe_elapsed(kind, &response, started_at);
|
|
||||||
|
|
||||||
let response = response?;
|
|
||||||
|
|
||||||
for object in response.contents().unwrap_or_default() {
|
|
||||||
let object_path = object.key().expect("response does not contain a key");
|
|
||||||
let remote_path = self.s3_object_to_relative_path(object_path);
|
|
||||||
all_files.push(remote_path);
|
|
||||||
}
|
|
||||||
match response.next_continuation_token {
|
|
||||||
Some(new_token) => continuation_token = Some(new_token),
|
|
||||||
None => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(all_files)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upload(
|
async fn upload(
|
||||||
|
|||||||
@@ -5,7 +5,9 @@ use std::collections::hash_map::Entry;
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
|
||||||
use crate::{Download, DownloadError, RemotePath, RemoteStorage, StorageMetadata};
|
use crate::{
|
||||||
|
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
|
||||||
|
};
|
||||||
|
|
||||||
pub struct UnreliableWrapper {
|
pub struct UnreliableWrapper {
|
||||||
inner: crate::GenericRemoteStorage,
|
inner: crate::GenericRemoteStorage,
|
||||||
@@ -95,6 +97,15 @@ impl RemoteStorage for UnreliableWrapper {
|
|||||||
self.inner.list_files(folder).await
|
self.inner.list_files(folder).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn list(
|
||||||
|
&self,
|
||||||
|
prefix: Option<&RemotePath>,
|
||||||
|
mode: ListingMode,
|
||||||
|
) -> Result<Listing, DownloadError> {
|
||||||
|
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))?;
|
||||||
|
self.inner.list(prefix, mode).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn upload(
|
async fn upload(
|
||||||
&self,
|
&self,
|
||||||
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
|
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||||
|
|||||||
@@ -33,8 +33,7 @@ use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
|
|||||||
use crate::tenant::config::TenantConf;
|
use crate::tenant::config::TenantConf;
|
||||||
use crate::tenant::config::TenantConfOpt;
|
use crate::tenant::config::TenantConfOpt;
|
||||||
use crate::tenant::{
|
use crate::tenant::{
|
||||||
TENANTS_SEGMENT_NAME, TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME,
|
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
|
||||||
TIMELINES_SEGMENT_NAME,
|
|
||||||
};
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
|
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
|
||||||
@@ -633,11 +632,6 @@ impl PageServerConf {
|
|||||||
self.tenants_path().join(tenant_id.to_string())
|
self.tenants_path().join(tenant_id.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
|
|
||||||
self.tenant_path(tenant_id)
|
|
||||||
.join(TENANT_ATTACHING_MARKER_FILENAME)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
|
pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
|
||||||
self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
|
self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
|
||||||
}
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
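The suppressed file above carries the bulk of the change (the tenant load/attach path). Based on the PR description, a hedged sketch of the control flow it implements looks roughly like this; every type and method name below is a simplified stand-in, not the real pageserver code.

```rust
// Hedged sketch of the unified load path described in the PR summary; all names
// here are illustrative stand-ins.
struct RemoteStorageHandle; // stand-in for GenericRemoteStorage

struct TenantPreload {
    /// Timeline ids discovered by listing `<tenant>/timelines/` in remote storage.
    timelines: Vec<String>,
    /// Whether the `timelines/deleted` marker showed up in that same listing.
    deletion_mark_seen: bool,
}

struct Tenant {
    remote_storage: Option<RemoteStorageHandle>,
}

impl Tenant {
    /// One WithDelimiter listing replaces both the per-tenant "deleted" GET and
    /// the trust-local-disk "load" path; it is retried until it succeeds or the
    /// tenant is cancelled.
    async fn preload(&self) -> TenantPreload {
        TenantPreload {
            timelines: Vec::new(),
            deletion_mark_seen: false,
        }
    }

    async fn attach(&self, preload: Option<TenantPreload>) {
        match (&self.remote_storage, preload) {
            (Some(_), Some(preload)) => {
                // Normal path: trust the remote listing, sync each timeline's
                // IndexPart, tolerate timelines that uploaded a layer but no
                // index yet, and resume deletion if the mark was seen.
                let _ = preload;
            }
            (None, _) => {
                // neon_local path: no remote storage configured, fall back to a
                // local-disk load (the old `spawn_load` behaviour).
            }
            (Some(_), None) => {
                // With remote storage configured the preload is expected; the
                // real code treats its absence as an error.
            }
        }
    }
}
```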
@@ -3,10 +3,10 @@ use std::sync::Arc;
|
|||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use camino::{Utf8Path, Utf8PathBuf};
|
use camino::{Utf8Path, Utf8PathBuf};
|
||||||
use pageserver_api::models::TenantState;
|
use pageserver_api::models::TenantState;
|
||||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||||
use tokio::sync::OwnedMutexGuard;
|
use tokio::sync::OwnedMutexGuard;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{error, info, instrument, warn, Instrument, Span};
|
use tracing::{error, instrument, warn, Instrument, Span};
|
||||||
|
|
||||||
use utils::{
|
use utils::{
|
||||||
backoff, completion, crashsafe, fs_ext,
|
backoff, completion, crashsafe, fs_ext,
|
||||||
@@ -25,11 +25,9 @@ use super::{
|
|||||||
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
|
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
|
||||||
span,
|
span,
|
||||||
timeline::delete::DeleteTimelineFlow,
|
timeline::delete::DeleteTimelineFlow,
|
||||||
tree_sort_timelines, DeleteTimelineError, Tenant,
|
tree_sort_timelines, DeleteTimelineError, Tenant, TenantPreload,
|
||||||
};
|
};
|
||||||
|
|
||||||
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u32 = 3;
|
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub(crate) enum DeleteTenantError {
|
pub(crate) enum DeleteTenantError {
|
||||||
#[error("GetTenant {0}")]
|
#[error("GetTenant {0}")]
|
||||||
@@ -60,7 +58,7 @@ fn remote_tenant_delete_mark_path(
|
|||||||
.context("Failed to strip workdir prefix")
|
.context("Failed to strip workdir prefix")
|
||||||
.and_then(RemotePath::new)
|
.and_then(RemotePath::new)
|
||||||
.context("tenant path")?;
|
.context("tenant path")?;
|
||||||
Ok(tenant_remote_path.join(Utf8Path::new("deleted")))
|
Ok(tenant_remote_path.join(Utf8Path::new("timelines/deleted")))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_remote_delete_mark(
|
async fn create_remote_delete_mark(
|
||||||
@@ -239,32 +237,6 @@ async fn cleanup_remaining_fs_traces(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn remote_delete_mark_exists(
|
|
||||||
conf: &PageServerConf,
|
|
||||||
tenant_id: &TenantId,
|
|
||||||
remote_storage: &GenericRemoteStorage,
|
|
||||||
) -> anyhow::Result<bool> {
|
|
||||||
// If remote storage is there we rely on it
|
|
||||||
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id).context("path")?;
|
|
||||||
|
|
||||||
let result = backoff::retry(
|
|
||||||
|| async { remote_storage.download(&remote_mark_path).await },
|
|
||||||
|e| matches!(e, DownloadError::NotFound),
|
|
||||||
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
|
|
||||||
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
|
|
||||||
"fetch_tenant_deletion_mark",
|
|
||||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
|
||||||
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(_) => Ok(true),
|
|
||||||
Err(DownloadError::NotFound) => Ok(false),
|
|
||||||
Err(e) => Err(anyhow::anyhow!(e)).context("remote_delete_mark_exists")?,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Orchestrates tenant shut down of all tasks, removes its in-memory structures,
|
/// Orchestrates tenant shut down of all tasks, removes its in-memory structures,
|
||||||
/// and deletes its data from both disk and s3.
|
/// and deletes its data from both disk and s3.
|
||||||
/// The sequence of steps:
|
/// The sequence of steps:
|
||||||
@@ -276,10 +248,9 @@ pub(crate) async fn remote_delete_mark_exists(
|
|||||||
/// 6. Remove remote mark
|
/// 6. Remove remote mark
|
||||||
/// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark
|
/// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark
|
||||||
/// It is resumable from any step in case a crash/restart occurs.
|
/// It is resumable from any step in case a crash/restart occurs.
|
||||||
/// There are three entrypoints to the process:
|
/// There are two entrypoints to the process:
|
||||||
/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
|
/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
|
||||||
/// 2. [`DeleteTenantFlow::resume_from_load`] is called during restarts when local or remote deletion marks are still there.
|
/// 2. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed tenant is found to be deleted during attach process.
|
||||||
/// 3. [`DeleteTenantFlow::resume_from_attach`] is called when deletion is resumed tenant is found to be deleted during attach process.
|
|
||||||
/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
|
/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub enum DeleteTenantFlow {
|
pub enum DeleteTenantFlow {
|
||||||
@@ -378,7 +349,7 @@ impl DeleteTenantFlow {
|
|||||||
|
|
||||||
pub(crate) async fn should_resume_deletion(
|
pub(crate) async fn should_resume_deletion(
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
remote_storage: Option<&GenericRemoteStorage>,
|
remote_mark_exists: bool,
|
||||||
tenant: &Tenant,
|
tenant: &Tenant,
|
||||||
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
|
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
|
||||||
let acquire = |t: &Tenant| {
|
let acquire = |t: &Tenant| {
|
||||||
@@ -389,66 +360,25 @@ impl DeleteTenantFlow {
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
let tenant_id = tenant.tenant_id;
|
if remote_mark_exists {
|
||||||
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
|
|
||||||
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
|
|
||||||
return Ok(acquire(tenant));
|
return Ok(acquire(tenant));
|
||||||
}
|
}
|
||||||
|
|
||||||
let remote_storage = match remote_storage {
|
let tenant_id = tenant.tenant_id;
|
||||||
Some(remote_storage) => remote_storage,
|
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
|
||||||
None => return Ok(None),
|
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
|
||||||
};
|
|
||||||
|
|
||||||
if remote_delete_mark_exists(conf, &tenant_id, remote_storage).await? {
|
|
||||||
Ok(acquire(tenant))
|
Ok(acquire(tenant))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn resume_from_load(
|
|
||||||
guard: DeletionGuard,
|
|
||||||
tenant: &Arc<Tenant>,
|
|
||||||
init_order: Option<&InitializationOrder>,
|
|
||||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
|
||||||
ctx: &RequestContext,
|
|
||||||
) -> Result<(), DeleteTenantError> {
|
|
||||||
let (_, progress) = completion::channel();
|
|
||||||
|
|
||||||
tenant
|
|
||||||
.set_stopping(progress, true, false)
|
|
||||||
.await
|
|
||||||
.expect("cant be stopping or broken");
|
|
||||||
|
|
||||||
// Do not consume valuable resources during the load phase, continue deletion once init phase is complete.
|
|
||||||
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
|
||||||
if let Some(background) = background_jobs_can_start {
|
|
||||||
info!("waiting for backgound jobs barrier");
|
|
||||||
background.clone().wait().await;
|
|
||||||
info!("ready for backgound jobs barrier");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir)
|
|
||||||
let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
|
|
||||||
if timelines_path.exists() {
|
|
||||||
tenant.load(init_order, None, ctx).await.context("load")?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Self::background(
|
|
||||||
guard,
|
|
||||||
tenant.conf,
|
|
||||||
tenant.remote_storage.clone(),
|
|
||||||
tenants,
|
|
||||||
tenant,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn resume_from_attach(
|
pub(crate) async fn resume_from_attach(
|
||||||
guard: DeletionGuard,
|
guard: DeletionGuard,
|
||||||
tenant: &Arc<Tenant>,
|
tenant: &Arc<Tenant>,
|
||||||
|
preload: Option<TenantPreload>,
|
||||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||||
|
init_order: Option<InitializationOrder>,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<(), DeleteTenantError> {
|
) -> Result<(), DeleteTenantError> {
|
||||||
let (_, progress) = completion::channel();
|
let (_, progress) = completion::channel();
|
||||||
@@ -459,7 +389,7 @@ impl DeleteTenantFlow {
|
|||||||
.expect("cant be stopping or broken");
|
.expect("cant be stopping or broken");
|
||||||
|
|
||||||
tenant
|
tenant
|
||||||
.attach(ctx, super::AttachMarkerMode::Expect)
|
.attach(init_order, preload, ctx)
|
||||||
.await
|
.await
|
||||||
.context("attach")?;
|
.context("attach")?;
|
||||||
|
|
||||||
|
|||||||
@@ -26,10 +26,7 @@ use crate::deletion_queue::DeletionQueueClient;
|
|||||||
use crate::task_mgr::{self, TaskKind};
|
use crate::task_mgr::{self, TaskKind};
|
||||||
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
|
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
|
||||||
use crate::tenant::delete::DeleteTenantFlow;
|
use crate::tenant::delete::DeleteTenantFlow;
|
||||||
use crate::tenant::{
|
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
|
||||||
create_tenant_files, AttachMarkerMode, AttachedTenantConf, CreateTenantFilesMode, Tenant,
|
|
||||||
TenantState,
|
|
||||||
};
|
|
||||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
|
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
|
||||||
|
|
||||||
use utils::crashsafe::path_with_suffix_extension;
|
use utils::crashsafe::path_with_suffix_extension;
|
||||||
@@ -437,14 +434,15 @@ pub async fn init_tenant_mgr(
|
|||||||
location_conf.attach_in_generation(generation);
|
location_conf.attach_in_generation(generation);
|
||||||
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
|
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
|
||||||
|
|
||||||
match schedule_local_tenant_processing(
|
match tenant_spawn(
|
||||||
conf,
|
conf,
|
||||||
tenant_id,
|
tenant_id,
|
||||||
&tenant_dir_path,
|
&tenant_dir_path,
|
||||||
AttachedTenantConf::try_from(location_conf)?,
|
|
||||||
resources.clone(),
|
resources.clone(),
|
||||||
|
AttachedTenantConf::try_from(location_conf)?,
|
||||||
Some(init_order.clone()),
|
Some(init_order.clone()),
|
||||||
&TENANTS,
|
&TENANTS,
|
||||||
|
SpawnMode::Normal,
|
||||||
&ctx,
|
&ctx,
|
||||||
) {
|
) {
|
||||||
Ok(tenant) => {
|
Ok(tenant) => {
|
||||||
@@ -464,15 +462,18 @@ pub async fn init_tenant_mgr(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrapper for Tenant::spawn that checks invariants before running, and inserts
|
||||||
|
/// a broken tenant in the map if Tenant::spawn fails.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub(crate) fn schedule_local_tenant_processing(
|
pub(crate) fn tenant_spawn(
|
||||||
conf: &'static PageServerConf,
|
conf: &'static PageServerConf,
|
||||||
tenant_id: TenantId,
|
tenant_id: TenantId,
|
||||||
tenant_path: &Utf8Path,
|
tenant_path: &Utf8Path,
|
||||||
location_conf: AttachedTenantConf,
|
|
||||||
resources: TenantSharedResources,
|
resources: TenantSharedResources,
|
||||||
|
location_conf: AttachedTenantConf,
|
||||||
init_order: Option<InitializationOrder>,
|
init_order: Option<InitializationOrder>,
|
||||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||||
|
mode: SpawnMode,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> anyhow::Result<Arc<Tenant>> {
|
) -> anyhow::Result<Arc<Tenant>> {
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
@@ -496,45 +497,24 @@ pub(crate) fn schedule_local_tenant_processing(
|
|||||||
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
|
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
|
||||||
);
|
);
|
||||||
|
|
||||||
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
|
info!("Attaching tenant {tenant_id}");
|
||||||
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
|
let tenant = match Tenant::spawn(
|
||||||
if resources.remote_storage.is_none() {
|
conf,
|
||||||
warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
|
tenant_id,
|
||||||
Tenant::create_broken_tenant(
|
resources,
|
||||||
conf,
|
location_conf,
|
||||||
tenant_id,
|
init_order,
|
||||||
"attaching mark file present but no remote storage configured".to_string(),
|
tenants,
|
||||||
)
|
mode,
|
||||||
} else {
|
ctx,
|
||||||
match Tenant::spawn_attach(
|
) {
|
||||||
conf,
|
Ok(tenant) => tenant,
|
||||||
tenant_id,
|
Err(e) => {
|
||||||
resources,
|
error!("Failed to spawn tenant {tenant_id}, reason: {e:#}");
|
||||||
location_conf,
|
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
|
||||||
tenants,
|
|
||||||
AttachMarkerMode::Expect,
|
|
||||||
ctx,
|
|
||||||
) {
|
|
||||||
Ok(tenant) => tenant,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
|
|
||||||
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
|
|
||||||
// Start loading the tenant into memory. It will initially be in Loading state.
|
|
||||||
Tenant::spawn_load(
|
|
||||||
conf,
|
|
||||||
tenant_id,
|
|
||||||
location_conf,
|
|
||||||
resources,
|
|
||||||
init_order,
|
|
||||||
tenants,
|
|
||||||
ctx,
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(tenant)
|
Ok(tenant)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -670,29 +650,41 @@ pub(crate) async fn create_tenant(
|
|||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<Arc<Tenant>, TenantMapInsertError> {
|
) -> Result<Arc<Tenant>, TenantMapInsertError> {
|
||||||
tenant_map_insert(tenant_id, || async {
|
tenant_map_insert(tenant_id, || async {
|
||||||
|
|
||||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||||
|
|
||||||
// We're holding the tenants lock in write mode while doing local IO.
|
// We're holding the tenants lock in write mode while doing local IO.
|
||||||
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
|
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
|
||||||
// and do the work in that state.
|
// and do the work in that state.
|
||||||
let tenant_directory = super::create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
|
super::create_tenant_files(conf, &location_conf, &tenant_id).await?;
|
||||||
|
|
||||||
// TODO: tenant directory remains on disk if we bail out from here on.
|
// TODO: tenant directory remains on disk if we bail out from here on.
|
||||||
// See https://github.com/neondatabase/neon/issues/4233
|
// See https://github.com/neondatabase/neon/issues/4233
|
||||||
|
|
||||||
let created_tenant =
|
let tenant_path = conf.tenant_path(&tenant_id);
|
||||||
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
|
|
||||||
AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
|
let created_tenant = tenant_spawn(
|
||||||
|
conf,
|
||||||
|
tenant_id,
|
||||||
|
&tenant_path,
|
||||||
|
resources,
|
||||||
|
AttachedTenantConf::try_from(location_conf)?,
|
||||||
|
None,
|
||||||
|
&TENANTS,
|
||||||
|
SpawnMode::Create,
|
||||||
|
ctx,
|
||||||
|
)?;
|
||||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||||
// See https://github.com/neondatabase/neon/issues/4233
|
// See https://github.com/neondatabase/neon/issues/4233
|
||||||
|
|
||||||
let crated_tenant_id = created_tenant.tenant_id();
|
let crated_tenant_id = created_tenant.tenant_id();
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
tenant_id == crated_tenant_id,
|
tenant_id == crated_tenant_id,
|
||||||
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})",
|
"loaded created tenant has unexpected tenant id \
|
||||||
);
|
(expect {tenant_id} != actual {crated_tenant_id})",
|
||||||
|
);
|
||||||
Ok(created_tenant)
|
Ok(created_tenant)
|
||||||
}).await
|
})
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
@@ -801,9 +793,10 @@ pub(crate) async fn upsert_location(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let tenant_path = conf.tenant_path(&tenant_id);
|
||||||
|
|
||||||
let new_slot = match &new_location_config.mode {
|
let new_slot = match &new_location_config.mode {
|
||||||
LocationMode::Secondary(_) => {
|
LocationMode::Secondary(_) => {
|
||||||
let tenant_path = conf.tenant_path(&tenant_id);
|
|
||||||
// Directory doesn't need to be fsync'd because if we crash it can
|
// Directory doesn't need to be fsync'd because if we crash it can
|
||||||
// safely be recreated next time this tenant location is configured.
|
// safely be recreated next time this tenant location is configured.
|
||||||
unsafe_create_dir_all(&tenant_path)
|
unsafe_create_dir_all(&tenant_path)
|
||||||
@@ -833,28 +826,21 @@ pub(crate) async fn upsert_location(
|
|||||||
.await
|
.await
|
||||||
.map_err(SetNewTenantConfigError::Persist)?;
|
.map_err(SetNewTenantConfigError::Persist)?;
|
||||||
|
|
||||||
let tenant = match Tenant::spawn_attach(
|
let tenant = tenant_spawn(
|
||||||
conf,
|
conf,
|
||||||
tenant_id,
|
tenant_id,
|
||||||
|
&tenant_path,
|
||||||
TenantSharedResources {
|
TenantSharedResources {
|
||||||
broker_client,
|
broker_client,
|
||||||
remote_storage,
|
remote_storage,
|
||||||
deletion_queue_client,
|
deletion_queue_client,
|
||||||
},
|
},
|
||||||
AttachedTenantConf::try_from(new_location_config)?,
|
AttachedTenantConf::try_from(new_location_config)?,
|
||||||
|
None,
|
||||||
&TENANTS,
|
&TENANTS,
|
||||||
// The LocationConf API does not use marker files, because we have Secondary
|
SpawnMode::Normal,
|
||||||
// locations where the directory's existence is not a signal that it contains
|
|
||||||
// all timelines. See https://github.com/neondatabase/neon/issues/5550
|
|
||||||
AttachMarkerMode::Ignore,
|
|
||||||
ctx,
|
ctx,
|
||||||
) {
|
)?;
|
||||||
Ok(tenant) => tenant,
|
|
||||||
Err(e) => {
|
|
||||||
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
|
|
||||||
Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
TenantSlot::Attached(tenant)
|
TenantSlot::Attached(tenant)
|
||||||
}
|
}
|
||||||
@@ -1043,7 +1029,7 @@ pub(crate) async fn load_tenant(
|
|||||||
location_conf.attach_in_generation(generation);
|
location_conf.attach_in_generation(generation);
|
||||||
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
|
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
|
||||||
|
|
||||||
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)
|
let new_tenant = tenant_spawn(conf, tenant_id, &tenant_path, resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx)
|
||||||
.with_context(|| {
|
.with_context(|| {
|
||||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||||
})?;
|
})?;
|
||||||
@@ -1117,18 +1103,12 @@ pub(crate) async fn attach_tenant(
|
|||||||
) -> Result<(), TenantMapInsertError> {
|
) -> Result<(), TenantMapInsertError> {
|
||||||
tenant_map_insert(tenant_id, || async {
|
tenant_map_insert(tenant_id, || async {
|
||||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||||
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
|
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?;
|
||||||
// TODO: tenant directory remains on disk if we bail out from here on.
|
// TODO: tenant directory remains on disk if we bail out from here on.
|
||||||
// See https://github.com/neondatabase/neon/issues/4233
|
// See https://github.com/neondatabase/neon/issues/4233
|
||||||
|
|
||||||
// Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached
|
let attached_tenant = tenant_spawn(conf, tenant_id, &tenant_dir,
|
||||||
let marker_file_exists = conf
|
resources, AttachedTenantConf::try_from(location_conf)?, None, &TENANTS, SpawnMode::Normal, ctx)?;
|
||||||
.tenant_attaching_mark_file_path(&tenant_id)
|
|
||||||
.try_exists()
|
|
||||||
.context("check for attach marker file existence")?;
|
|
||||||
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
|
|
||||||
|
|
||||||
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
|
|
||||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||||
// See https://github.com/neondatabase/neon/issues/4233
|
// See https://github.com/neondatabase/neon/issues/4233
|
||||||
|
|
||||||
|
|||||||
@@ -168,36 +168,14 @@
 //! - create `Timeline` struct and a `RemoteTimelineClient`
 //! - initialize the client's upload queue with its `IndexPart`
 //! - schedule uploads for layers that are only present locally.
-//! - if the remote `IndexPart`'s metadata was newer than the metadata in
-//! the local filesystem, write the remote metadata to the local filesystem
 //! - After the above is done for each timeline, open the tenant for business by
 //! transitioning it from `TenantState::Attaching` to `TenantState::Active` state.
 //! This starts the timelines' WAL-receivers and the tenant's GC & Compaction loops.
 //!
-//! We keep track of the fact that a client is in `Attaching` state in a marker
-//! file on the local disk. This is critical because, when we restart the pageserver,
-//! we do not want to do the `List timelines` step for each tenant that has already
-//! been successfully attached (for performance & cost reasons).
-//! Instead, for a tenant without the attach marker file, we assume that the
-//! local state is in sync or ahead of the remote state. This includes the list
-//! of all of the tenant's timelines, which is particularly critical to be up-to-date:
-//! if there's a timeline on the remote that the pageserver doesn't know about,
-//! the GC will not consider its branch point, leading to data loss.
-//! So, for a tenant with the attach marker file, we know that we do not yet have
-//! persisted all the remote timeline's metadata files locally. To exclude the
-//! risk above, we re-run the procedure for such tenants
-//!
 //! # Operating Without Remote Storage
 //!
 //! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
 //! not created and the uploads are skipped.
-//! Theoretically, it should be ok to remove and re-add remote storage configuration to
-//! the pageserver config at any time, since it doesn't make a difference to
-//! [`Timeline::load_layer_map`].
-//! Of course, the remote timeline dir must not change while we have de-configured
-//! remote storage, i.e., the pageserver must remain the owner of the given prefix
-//! in remote storage.
-//! But note that we don't test any of this right now.
 //!
 //! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
 //! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
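With the marker-file logic gone, the remote listing is the only source of truth for which timelines exist. As a reading aid, here is a hedged sketch of that control flow; `attach_or_load_local` is a hypothetical helper invented for illustration, and only `list_remote_timelines` and the types it takes and returns come from this change:

```rust
// Hedged sketch only; not the pageserver's real entry point.
async fn attach_or_load_local(
    storage: Option<&GenericRemoteStorage>,
    tenant_id: TenantId,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    match storage {
        // Remote storage configured: the remote listing is authoritative for
        // which timelines exist, regardless of what is on local disk.
        Some(storage) => {
            let (timeline_ids, _other_keys) =
                list_remote_timelines(storage, tenant_id, cancel).await?;
            for _timeline_id in timeline_ids {
                // download index_part, create Timeline + RemoteTimelineClient,
                // schedule uploads for layers that exist only locally ...
            }
            Ok(())
        }
        // No remote storage (neon_local-style operation): trust the local
        // timeline directories instead.
        None => Ok(()),
    }
}
```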
@@ -468,7 +446,10 @@ impl RemoteTimelineClient {
     //

     /// Download index file
-    pub async fn download_index_file(&self) -> Result<MaybeDeletedIndexPart, DownloadError> {
+    pub async fn download_index_file(
+        &self,
+        cancel: CancellationToken,
+    ) -> Result<MaybeDeletedIndexPart, DownloadError> {
         let _unfinished_gauge_guard = self.metrics.call_begin(
             &RemoteOpFileKind::Index,
             &RemoteOpKind::Download,
@@ -482,6 +463,7 @@ impl RemoteTimelineClient {
             &self.tenant_id,
             &self.timeline_id,
             self.generation,
+            cancel,
         )
         .measure_remote_op(
             self.tenant_id,
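Callers of `download_index_file` now have to supply a cancellation token. A hedged usage sketch (the `client` binding and the freshly created token are assumptions for illustration; the enum variants are the ones used in this file):

```rust
// Hedged sketch: download the index and distinguish a live index from a
// remote deletion marker.
let cancel = CancellationToken::new();
match client.download_index_file(cancel).await? {
    MaybeDeletedIndexPart::IndexPart(_index_part) => {
        // normal path: proceed with the timeline's layer list
    }
    MaybeDeletedIndexPart::Deleted(_) => {
        // the timeline was deleted remotely; resume the deletion flow instead
    }
}
```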
@@ -1725,7 +1707,11 @@ mod tests {
         let client = timeline.remote_client.as_ref().unwrap();

         // Download back the index.json, and check that the list of files is correct
-        let initial_index_part = match client.download_index_file().await.unwrap() {
+        let initial_index_part = match client
+            .download_index_file(CancellationToken::new())
+            .await
+            .unwrap()
+        {
             MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
             MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
         };
@@ -1814,7 +1800,11 @@ mod tests {
         }

         // Download back the index.json, and check that the list of files is correct
-        let index_part = match client.download_index_file().await.unwrap() {
+        let index_part = match client
+            .download_index_file(CancellationToken::new())
+            .await
+            .unwrap()
+        {
             MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
             MaybeDeletedIndexPart::Deleted(_) => panic!("unexpectedly got deleted index part"),
         };
@@ -2013,7 +2003,7 @@ mod tests {
         let client = test_state.build_client(get_generation);

         let download_r = client
-            .download_index_file()
+            .download_index_file(CancellationToken::new())
             .await
             .expect("download should always succeed");
         assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_)));
@@ -18,8 +18,8 @@ use crate::config::PageServerConf;
 use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
 use crate::tenant::storage_layer::LayerFileName;
 use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
-use crate::tenant::{Generation, TENANT_DELETED_MARKER_FILE_NAME};
-use remote_storage::{DownloadError, GenericRemoteStorage};
+use crate::tenant::Generation;
+use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
 use utils::crashsafe::path_with_suffix_extension;
 use utils::id::{TenantId, TimelineId};

@@ -170,53 +170,43 @@ pub fn is_temp_download_file(path: &Utf8Path) -> bool {
 pub async fn list_remote_timelines(
     storage: &GenericRemoteStorage,
     tenant_id: TenantId,
-) -> anyhow::Result<HashSet<TimelineId>> {
+    cancel: CancellationToken,
+) -> anyhow::Result<(HashSet<TimelineId>, HashSet<String>)> {
     let remote_path = remote_timelines_path(&tenant_id);

     fail::fail_point!("storage-sync-list-remote-timelines", |_| {
         anyhow::bail!("storage-sync-list-remote-timelines");
     });

-    let timelines = download_retry(
-        || storage.list_prefixes(Some(&remote_path)),
-        &format!("list prefixes for {tenant_id}"),
+    let listing = download_retry_forever(
+        || storage.list(Some(&remote_path), ListingMode::WithDelimiter),
+        &format!("list timelines for {tenant_id}"),
+        cancel,
     )
     .await?;

-    if timelines.is_empty() {
-        anyhow::bail!("no timelines found on the remote storage")
-    }
-
     let mut timeline_ids = HashSet::new();
+    let mut other_prefixes = HashSet::new();

-    for timeline_remote_storage_key in timelines {
-        if timeline_remote_storage_key.object_name() == Some(TENANT_DELETED_MARKER_FILE_NAME) {
-            // A `deleted` key within `timelines/` is a marker file, not a timeline. Ignore it.
-            // This code will be removed in https://github.com/neondatabase/neon/pull/5580
-            continue;
-        }
-
+    for timeline_remote_storage_key in listing.prefixes {
         let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
             anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
         })?;

-        let timeline_id: TimelineId = object_name
-            .parse()
-            .with_context(|| format!("parse object name into timeline id '{object_name}'"))?;
-        // list_prefixes is assumed to return unique names. Ensure this here.
-        // NB: it's safer to bail out than warn-log this because the pageserver
-        // needs to absolutely know about _all_ timelines that exist, so that
-        // GC knows all the branchpoints. If we skipped over a timeline instead,
-        // GC could delete a layer that's still needed by that timeline.
-        anyhow::ensure!(
-            !timeline_ids.contains(&timeline_id),
-            "list_prefixes contains duplicate timeline id {timeline_id}"
-        );
-        timeline_ids.insert(timeline_id);
+        match object_name.parse::<TimelineId>() {
+            Ok(t) => timeline_ids.insert(t),
+            Err(_) => other_prefixes.insert(object_name.to_string()),
+        };
     }

-    Ok(timeline_ids)
+    for key in listing.keys {
+        let object_name = key
+            .object_name()
+            .ok_or_else(|| anyhow::anyhow!("object name for key {key}"))?;
+        other_prefixes.insert(object_name.to_string());
+    }
+
+    Ok((timeline_ids, other_prefixes))
 }

 async fn do_download_index_part(
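Because the deletion marker now lives under `timelines/`, a caller can detect it from the same listing rather than issuing a separate GET. A hedged caller-side sketch (the `storage`, `tenant_id` and `cancel` bindings are assumed, and the `"deleted"` literal stands in for the marker's file name):

```rust
// Hedged sketch of a caller, not the actual attach code: timeline IDs drive
// the attach, while any other keys under `timelines/` -- notably the tenant
// deletion marker -- are surfaced through the second half of the tuple.
let (timeline_ids, other_keys) =
    list_remote_timelines(storage, tenant_id, cancel.clone()).await?;

let tenant_marked_deleted = other_keys.iter().any(|name| name == "deleted");
if tenant_marked_deleted {
    // resume tenant deletion instead of attaching
}
for _timeline_id in timeline_ids {
    // attach each timeline ...
}
```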
@@ -224,10 +214,11 @@ async fn do_download_index_part(
     tenant_id: &TenantId,
     timeline_id: &TimelineId,
     index_generation: Generation,
+    cancel: CancellationToken,
 ) -> Result<IndexPart, DownloadError> {
     let remote_path = remote_index_path(tenant_id, timeline_id, index_generation);

-    let index_part_bytes = download_retry(
+    let index_part_bytes = download_retry_forever(
         || async {
             let mut index_part_download = storage.download(&remote_path).await?;

@@ -242,6 +233,7 @@ async fn do_download_index_part(
             Ok(index_part_bytes)
         },
         &format!("download {remote_path:?}"),
+        cancel,
     )
     .await?;

@@ -263,19 +255,28 @@ pub(super) async fn download_index_part(
     tenant_id: &TenantId,
     timeline_id: &TimelineId,
     my_generation: Generation,
+    cancel: CancellationToken,
 ) -> Result<IndexPart, DownloadError> {
     debug_assert_current_span_has_tenant_and_timeline_id();

     if my_generation.is_none() {
         // Operating without generations: just fetch the generation-less path
-        return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
+        return do_download_index_part(storage, tenant_id, timeline_id, my_generation, cancel)
+            .await;
     }

     // Stale case: If we were intentionally attached in a stale generation, there may already be a remote
     // index in our generation.
     //
     // This is an optimization to avoid doing the listing for the general case below.
-    let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await;
+    let res = do_download_index_part(
+        storage,
+        tenant_id,
+        timeline_id,
+        my_generation,
+        cancel.clone(),
+    )
+    .await;
     match res {
         Ok(index_part) => {
             tracing::debug!(
@@ -295,8 +296,14 @@ pub(super) async fn download_index_part(
             // we want to find the most recent index from a previous generation.
             //
             // This is an optimization to avoid doing the listing for the general case below.
-            let res =
-                do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await;
+            let res = do_download_index_part(
+                storage,
+                tenant_id,
+                timeline_id,
+                my_generation.previous(),
+                cancel.clone(),
+            )
+            .await;
             match res {
                 Ok(index_part) => {
                     tracing::debug!("Found index_part from previous generation");
@@ -340,13 +347,14 @@ pub(super) async fn download_index_part(
             match max_previous_generation {
                 Some(g) => {
                     tracing::debug!("Found index_part in generation {g:?}");
-                    do_download_index_part(storage, tenant_id, timeline_id, g).await
+                    do_download_index_part(storage, tenant_id, timeline_id, g, cancel).await
                 }
                 None => {
                     // Migration from legacy pre-generation state: we have a generation but no prior
                     // attached pageservers did. Try to load from a no-generation path.
                     tracing::info!("No index_part.json* found");
-                    do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await
+                    do_download_index_part(storage, tenant_id, timeline_id, Generation::none(), cancel)
+                        .await
                 }
             }
         }
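The fallback order of `download_index_part` is spread across the three hunks above, so it can be hard to see at a glance. A hedged, self-contained sketch that models the order of candidates, with `u32` standing in for the real `Generation` type and `newest_older` standing in for the result of the listing step:

```rust
// Hedged sketch, not the real code: candidates are tried in this order, ending
// with the legacy generation-less index_part.json, modelled here as None.
fn candidate_generations(my_generation: u32, newest_older: Option<u32>) -> Vec<Option<u32>> {
    // 1. Our own generation (covers stale re-attachment in the same generation).
    let mut candidates = vec![Some(my_generation)];
    // 2. The previous generation (the common case right after re-attachment).
    if let Some(prev) = my_generation.checked_sub(1) {
        candidates.push(Some(prev));
    }
    // 3. The newest index found by listing older generations, if any.
    if let Some(g) = newest_older {
        candidates.push(Some(g));
    }
    // 4. The legacy, pre-generation layout.
    candidates.push(None);
    candidates
}
```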
@@ -376,3 +384,23 @@
     )
     .await
 }
+
+async fn download_retry_forever<T, O, F>(
+    op: O,
+    description: &str,
+    cancel: CancellationToken,
+) -> Result<T, DownloadError>
+where
+    O: FnMut() -> F,
+    F: Future<Output = Result<T, DownloadError>>,
+{
+    backoff::retry(
+        op,
+        |e| matches!(e, DownloadError::BadInput(_) | DownloadError::NotFound),
+        FAILED_DOWNLOAD_WARN_THRESHOLD,
+        u32::MAX,
+        description,
+        backoff::Cancel::new(cancel, || DownloadError::Cancelled),
+    )
+    .await
+}
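For orientation, a hedged usage sketch of the new helper (the `storage`, `remote_path` and `cancel` bindings are assumed): any future returning `Result<_, DownloadError>` can be wrapped, and only `BadInput`, `NotFound`, or cancellation stop the retries.

```rust
// Hedged sketch: retried with backoff until success, a permanent error, or
// cancellation (which surfaces as DownloadError::Cancelled).
let _download = download_retry_forever(
    || storage.download(&remote_path),
    &format!("download {remote_path:?}"),
    cancel.clone(),
)
.await?;
```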
@@ -294,6 +294,7 @@ async fn cleanup_remaining_timeline_fs_traces(
     // Remove delete mark
     tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_id, timeline_id))
         .await
+        .or_else(fs_ext::ignore_not_found)
         .context("remove delete mark")
 }

@@ -60,6 +60,7 @@ from fixtures.utils import (
     allure_attach_from_dir,
     get_self_dir,
     subprocess_capture,
+    wait_until,
 )

 """
@@ -1679,6 +1680,40 @@ class NeonPageserver(PgProtocol):
         self.running = False
         return self

+    def restart(self, immediate: bool = False):
+        """
+        High level wrapper for restart: restarts the process, and waits for
+        tenant state to stabilize.
+        """
+        self.stop(immediate=immediate)
+        self.start()
+        self.quiesce_tenants()
+
+    def quiesce_tenants(self):
+        """
+        Wait for all tenants to enter a stable state (Active or Broken)
+
+        Call this after restarting the pageserver, or after attaching a tenant,
+        to ensure that it is ready for use.
+        """
+
+        stable_states = {"Active", "Broken"}
+
+        client = self.http_client()
+
+        def complete():
+            log.info("Checking tenants...")
+            tenants = client.tenant_list()
+            log.info(f"Tenant list: {tenants}...")
+            any_unstable = any((t["state"]["slug"] not in stable_states) for t in tenants)
+            if any_unstable:
+                for t in tenants:
+                    log.info(f"Waiting for tenant {t['id']} in state {t['state']['slug']}")
+            log.info(f"any_unstable={any_unstable}")
+            assert not any_unstable
+
+        wait_until(20, 0.5, complete)
+
     def __enter__(self) -> "NeonPageserver":
         return self

@@ -333,16 +333,30 @@ def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: N
     env = neon_env_builder.init_configs()
     env.start()

-    env.pageserver.allowed_errors.append(
-        ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
+    env.pageserver.allowed_errors.extend(
+        [
+            ".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
+            ".*Failed to load index_part from remote storage.*",
+            # On a fast restart, there may be an initdb still running in a basebackup...__temp directory
+            ".*Failed to purge.*Directory not empty.*",
+        ]
     )
     ps_http = env.pageserver.http_client()

     # pause all uploads
-    ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
     ps_http.tenant_create(env.initial_tenant)

+    # Create a timeline whose creation will succeed. The tenant will need at least one
+    # timeline to be loadable.
+    success_timeline = TimelineId.generate()
+    log.info(f"Creating timeline {success_timeline}")
+    ps_http.timeline_create(env.pg_version, env.initial_tenant, success_timeline, timeout=60)
+
+    # Create a timeline whose upload to remote storage will be blocked
+    ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
+
     def start_creating_timeline():
+        log.info(f"Creating (expect failure) timeline {env.initial_timeline}")
         with pytest.raises(RequestException):
             ps_http.timeline_create(
                 env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
@@ -366,6 +380,9 @@ def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: N
     with pytest.raises(PageserverApiException, match="not found"):
         ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)

+    # The one successfully created timeline should still be there.
+    assert len(ps_http.timeline_list(tenant_id=env.initial_tenant)) == 1
+

 def test_non_uploaded_branch_is_deleted_after_restart(neon_env_builder: NeonEnvBuilder):
     """
@@ -15,7 +15,7 @@ from fixtures.types import TenantId, TimelineId

 # Test restarting page server, while safekeeper and compute node keep
 # running.
-def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
+def test_local_corruption(neon_env_builder: NeonEnvBuilder):
     env = neon_env_builder.init_start()

     env.pageserver.allowed_errors.extend(
@@ -69,24 +69,19 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):

     env.pageserver.start()

-    # Tenant 0 should still work
+    # Un-damaged tenant works
     pg0.start()
     assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100

-    # But all others are broken
-    # First timeline would not get loaded into pageserver due to corrupt metadata file
-    with pytest.raises(
-        Exception, match=f"Tenant {tenant1} will not become active. Current state: Broken"
-    ) as err:
-        pg1.start()
-    log.info(
-        f"As expected, compute startup failed eagerly for timeline with corrupt metadata: {err}"
-    )
+    # Tenant with corrupt local metadata works: remote storage is authoritative for metadata
+    pg1.start()
+    assert pg1.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100

     # Second timeline will fail during basebackup, because the local layer file is corrupt.
     # It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
     # (We don't check layer file contents on startup, when loading the timeline)
+    #
+    # This will change when we implement checksums for layers
     with pytest.raises(Exception, match="layer loading failed:") as err:
         pg2.start()
     log.info(
@@ -133,8 +128,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
     _ = env.neon_cli.create_timeline("test_timeline_init_break_before_checkpoint", tenant_id)

     # Restart the page server
-    env.pageserver.stop(immediate=True)
-    env.pageserver.start()
+    env.pageserver.restart(immediate=True)

     # Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
     new_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
@@ -62,14 +62,14 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
     tenant_load_delay_ms = 5000
     env.pageserver.stop()
     env.pageserver.start(
-        extra_env_vars={"FAILPOINTS": f"before-loading-tenant=return({tenant_load_delay_ms})"}
+        extra_env_vars={"FAILPOINTS": f"before-attaching-tenant=return({tenant_load_delay_ms})"}
     )

-    # Check that it's in Loading state
+    # Check that it's in Attaching state
     client = env.pageserver.http_client()
     tenant_status = client.tenant_status(env.initial_tenant)
     log.info("Tenant status : %s", tenant_status)
-    assert tenant_status["state"]["slug"] == "Loading"
+    assert tenant_status["state"]["slug"] == "Attaching"

     # Try to read. This waits until the loading finishes, and then return normally.
     cur.execute("SELECT count(*) FROM foo")
@@ -241,8 +241,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
         assert reason.endswith(f"failpoint: {failpoint}"), reason

     if check is Check.RETRY_WITH_RESTART:
-        env.pageserver.stop()
-        env.pageserver.start()
+        env.pageserver.restart()

         if failpoint in (
             "tenant-delete-before-shutdown",
@@ -66,10 +66,6 @@ def test_tenant_reattach(
     env.pageserver.allowed_errors.append(
         f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
     )
-    # Thats because of UnreliableWrapper's injected failures
-    env.pageserver.allowed_errors.append(
-        f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
-    )

     with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
         with endpoint.cursor() as cur:
@@ -116,7 +112,7 @@ def test_tenant_reattach(
             assert query_scalar(cur, "SELECT count(*) FROM t") == 100000

     # Check that we had to retry the downloads
-    assert env.pageserver.log_contains(".*list prefixes.*failed, will retry.*")
+    assert env.pageserver.log_contains(".*list timelines.*failed, will retry.*")
     assert env.pageserver.log_contains(".*download.*failed, will retry.*")

@@ -643,47 +639,6 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder
     ensure_test_data(data_id, data_secret, endpoint)


-# Tests that it's possible to `load` broken tenants:
-# * `ignore` a tenant
-# * removes its `metadata` file locally
-# * `load` the same tenant
-# * ensure that it's status is `Broken`
-def test_ignored_tenant_stays_broken_without_metadata(neon_env_builder: NeonEnvBuilder):
-    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
-    env = neon_env_builder.init_start()
-    pageserver_http = env.pageserver.http_client()
-    env.endpoints.create_start("main")
-
-    tenant_id = env.initial_tenant
-    timeline_id = env.initial_timeline
-
-    # Attempts to connect from compute to pageserver while the tenant is
-    # temporarily detached produces these errors in the pageserver log.
-    env.pageserver.allowed_errors.append(f".*Tenant {tenant_id} not found.*")
-    env.pageserver.allowed_errors.append(
-        f".*Tenant {tenant_id} will not become active\\. Current state: (Broken|Stopping).*"
-    )
-
-    # ignore the tenant and remove its metadata
-    pageserver_http.tenant_ignore(tenant_id)
-    timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
-    metadata_removed = False
-    for dir_entry in timeline_dir.iterdir():
-        if dir_entry.name == "metadata":
-            # Looks like a layer file. Remove it
-            dir_entry.unlink()
-            metadata_removed = True
-    assert metadata_removed, f"Failed to find metadata file in {timeline_dir}"
-
-    env.pageserver.allowed_errors.append(
-        f".*{tenant_id}.*: load failed.*: failed to load metadata.*"
-    )
-
-    # now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
-    pageserver_http.tenant_load(tenant_id=tenant_id)
-    wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 5)
-
-
 # Tests that attach is never working on a tenant, ignored or not, as long as it's not absent locally
 # Similarly, tests that it's not possible to schedule a `load` for tenat that's not ignored.
 def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder):
@@ -778,7 +733,8 @@ def test_ignore_while_attaching(
         tenants_before_ignore
     ), "Only ignored tenant should be missing"

-    # But can load it from local files, that will restore attach.
+    # Calling load will bring the tenant back online
+    pageserver_http.configure_failpoints([("attach-before-activate", "off")])
     pageserver_http.tenant_load(tenant_id)

     wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)
@@ -1,5 +1,4 @@
 import os
-import shutil
 import time
 from contextlib import closing
 from datetime import datetime
@@ -20,7 +19,7 @@ from fixtures.neon_fixtures import (
 )
 from fixtures.pageserver.utils import timeline_delete_wait_completed
 from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
-from fixtures.types import Lsn, TenantId, TimelineId
+from fixtures.types import Lsn, TenantId
 from fixtures.utils import wait_until
 from prometheus_client.samples import Sample

@@ -298,13 +297,8 @@ def test_pageserver_with_empty_tenants(

     client = env.pageserver.http_client()

-    tenant_with_empty_timelines = TenantId.generate()
-    client.tenant_create(tenant_with_empty_timelines)
-    temp_timelines = client.timeline_list(tenant_with_empty_timelines)
-    for temp_timeline in temp_timelines:
-        timeline_delete_wait_completed(
-            client, tenant_with_empty_timelines, TimelineId(temp_timeline["timeline_id"])
-        )
+    tenant_with_empty_timelines = env.initial_tenant
+    timeline_delete_wait_completed(client, tenant_with_empty_timelines, env.initial_timeline)

     files_in_timelines_dir = sum(
         1 for _p in Path.iterdir(env.pageserver.timeline_dir(tenant_with_empty_timelines))
@@ -317,34 +311,19 @@ def test_pageserver_with_empty_tenants(
     env.endpoints.stop_all()
     env.pageserver.stop()

-    tenant_without_timelines_dir = env.initial_tenant
-    shutil.rmtree(env.pageserver.timeline_dir(tenant_without_timelines_dir))
-
     env.pageserver.start()

     client = env.pageserver.http_client()

-    def not_loading():
+    def not_attaching():
         tenants = client.tenant_list()
-        assert len(tenants) == 2
-        assert all(t["state"]["slug"] != "Loading" for t in tenants)
+        assert len(tenants) == 1
+        assert all(t["state"]["slug"] != "Attaching" for t in tenants)

-    wait_until(10, 0.2, not_loading)
+    wait_until(10, 0.2, not_attaching)

     tenants = client.tenant_list()

-    [broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
-    assert (
-        broken_tenant["state"]["slug"] == "Broken"
-    ), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
-
-    broken_tenant_status = client.tenant_status(tenant_without_timelines_dir)
-    assert (
-        broken_tenant_status["state"]["slug"] == "Broken"
-    ), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
-
-    assert env.pageserver.log_contains(".*load failed, setting tenant state to Broken:.*")
-
     [loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines)]
     assert (
         loaded_tenant["state"]["slug"] == "Active"
@@ -358,9 +337,6 @@ def test_pageserver_with_empty_tenants(
     time.sleep(1) # to allow metrics propagation

     ps_metrics = client.get_metrics()
-    broken_tenants_metric_filter = {
-        "tenant_id": str(tenant_without_timelines_dir),
-    }
     active_tenants_metric_filter = {
         "state": "Active",
     }
@@ -374,13 +350,3 @@ def test_pageserver_with_empty_tenants(
     assert (
         tenant_active_count == 1
     ), f"Tenant {tenant_with_empty_timelines} should have metric as active"
-
-    tenant_broken_count = int(
-        ps_metrics.query_one(
-            "pageserver_broken_tenants_count", filter=broken_tenants_metric_filter
-        ).value
-    )
-
-    assert (
-        tenant_broken_count == 1
-    ), f"Tenant {tenant_without_timelines_dir} should have metric as broken"
@@ -70,8 +70,7 @@ def test_threshold_based_eviction(
     }

     # restart because changing tenant config is not instant
-    env.pageserver.stop()
-    env.pageserver.start()
+    env.pageserver.restart()

     assert ps_http.tenant_config(tenant_id).effective_config["eviction_policy"] == {
         "kind": "LayerAccessThreshold",
@@ -277,13 +277,6 @@ def test_delete_timeline_exercise_crash_safety_failpoints(

         if failpoint == "timeline-delete-after-index-delete":
             m = ps_http.get_metrics()
-            assert (
-                m.query_one(
-                    "remote_storage_s3_request_seconds_count",
-                    filter={"request_type": "get_object", "result": "err"},
-                ).value
-                == 2 # One is missing tenant deletion mark, second is missing index part
-            )
             assert (
                 m.query_one(
                     "remote_storage_s3_request_seconds_count",