mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
Add commands to unload and load the tenant in memory (#2977)
Closes https://github.com/neondatabase/neon/issues/2537 Follow-up of https://github.com/neondatabase/neon/pull/2950 With the new model that prevents attaching without the remote storage, it has started to be even more odd to add attach-with-files functionality (in addition to the issues raised previously). Adds two separate commands: * `POST {tenant_id}/ignore` that places a mark file to skip such tenant on every start and removes it from memory * `POST {tenant_id}/schedule_load` that tries to load a tenant from local FS similar to what pageserver does now on startup, but without directory removals
This commit is contained in:
@@ -27,7 +27,9 @@ use utils::{
|
||||
|
||||
use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME};
|
||||
use crate::tenant_config::{TenantConf, TenantConfOpt};
|
||||
use crate::{METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_UNINIT_MARK_SUFFIX};
|
||||
use crate::{
|
||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_UNINIT_MARK_SUFFIX,
|
||||
};
|
||||
|
||||
pub mod defaults {
|
||||
use crate::tenant_config::defaults::*;
|
||||
@@ -402,6 +404,10 @@ impl PageServerConf {
|
||||
.join(TENANT_ATTACHING_MARKER_FILENAME)
|
||||
}
|
||||
|
||||
pub fn tenant_ignore_mark_file_path(&self, tenant_id: TenantId) -> PathBuf {
|
||||
self.tenant_path(&tenant_id).join(IGNORED_TENANT_FILE_NAME)
|
||||
}
|
||||
|
||||
/// Points to a place in pageserver's local directory,
|
||||
/// where certain tenant's tenantconf file should be located.
|
||||
pub fn tenant_config_path(&self, tenant_id: TenantId) -> PathBuf {
|
||||
|
||||
@@ -274,6 +274,7 @@ paths:
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
|
||||
post:
|
||||
description: Schedules attach operation to happen in the background for given tenant
|
||||
responses:
|
||||
@@ -325,7 +326,9 @@ paths:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: Detach local tenant
|
||||
description: |
|
||||
Remove tenant data (including all corresponding timelines) from pageserver's memory and file system.
|
||||
Files on the remote storage are not affected.
|
||||
responses:
|
||||
"200":
|
||||
description: Tenant detached
|
||||
@@ -354,6 +357,92 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/ignore:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: |
|
||||
Remove tenant data (including all corresponding timelines) from pageserver's memory.
|
||||
Files on local disk and remote storage are not affected.
|
||||
|
||||
Future pageserver restarts won't load the data back until `load` is called on such tenant.
|
||||
responses:
|
||||
"200":
|
||||
description: Tenant ignored
|
||||
"400":
|
||||
description: Error when no tenant id found in path parameters
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/load:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
post:
|
||||
description: |
|
||||
Schedules an operation that attempts to load a tenant from the local disk and
|
||||
synchronise it with the remote storage (if enabled), repeating pageserver's restart logic for tenant load.
|
||||
If the tenant was ignored before, removes the ignore mark and continues with load scheduling.
|
||||
|
||||
Errors if the tenant is absent on disk, already present in memory or fails to schedule its load.
|
||||
Scheduling a load does not mean that the tenant would load successfully, check tenant status to ensure load correctness.
|
||||
responses:
|
||||
"202":
|
||||
description: Tenant scheduled to load successfully
|
||||
"400":
|
||||
description: Error when no tenant id found in path parameters
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/{tenant_id}/size:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
|
||||
@@ -349,13 +349,13 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
|
||||
if let Some(remote_storage) = &state.remote_storage {
|
||||
// FIXME: distinguish between "Tenant already exists" and other errors
|
||||
tenant_mgr::attach_tenant(state.conf, tenant_id, remote_storage)
|
||||
tenant_mgr::attach_tenant(state.conf, tenant_id, remote_storage.clone())
|
||||
.instrument(info_span!("tenant_attach", tenant = %tenant_id))
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
} else {
|
||||
return Err(ApiError::BadRequest(anyhow!(
|
||||
"attach_tenant is possible because pageserver was configured without remote storage"
|
||||
"attach_tenant is not possible because pageserver was configured without remote storage"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -394,6 +394,35 @@ async fn tenant_detach_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_load_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
tenant_mgr::load_tenant(state.conf, tenant_id, state.remote_storage.clone())
|
||||
.instrument(info_span!("load", tenant = %tenant_id))
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
async fn tenant_ignore_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
let conf = state.conf;
|
||||
tenant_mgr::ignore_tenant(conf, tenant_id)
|
||||
.instrument(info_span!("ignore_tenant", tenant = %tenant_id))
|
||||
.await
|
||||
// FIXME: Errors from `ignore_tenant` can be caused by both both user and internal errors.
|
||||
// Replace this with better handling once the error type permits it.
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
@@ -833,6 +862,8 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
|
||||
.post("/v1/tenant/:tenant_id/attach", tenant_attach_handler)
|
||||
.post("/v1/tenant/:tenant_id/detach", tenant_detach_handler)
|
||||
.post("/v1/tenant/:tenant_id/load", tenant_load_handler)
|
||||
.post("/v1/tenant/:tenant_id/ignore", tenant_ignore_handler)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_detail_handler,
|
||||
|
||||
@@ -125,6 +125,13 @@ pub const TEMP_FILE_SUFFIX: &str = "___temp";
|
||||
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
|
||||
pub const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";
|
||||
|
||||
/// A marker file to prevent pageserver from loading a certain tenant on restart.
|
||||
/// Different from [`TIMELINE_UNINIT_MARK_SUFFIX`] due to semantics of the corresponding
|
||||
/// `ignore` management API command, that expects the ignored tenant to be properly loaded
|
||||
/// into pageserver's memory before being ignored.
|
||||
/// Full path: `tenants/<tenant_id>/___ignored_tenant`.
|
||||
pub const IGNORED_TENANT_FILE_NAME: &str = "___ignored_tenant";
|
||||
|
||||
pub fn is_temporary(path: &Path) -> bool {
|
||||
match path.file_name() {
|
||||
Some(name) => name.to_string_lossy().ends_with(TEMP_FILE_SUFFIX),
|
||||
|
||||
@@ -571,7 +571,7 @@ impl Tenant {
|
||||
pub fn spawn_attach(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
) -> Arc<Tenant> {
|
||||
// XXX: Attach should provide the config, especially during tenant migration.
|
||||
// See https://github.com/neondatabase/neon/issues/1555
|
||||
@@ -584,7 +584,7 @@ impl Tenant {
|
||||
tenant_conf,
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
Some(remote_storage.clone()),
|
||||
Some(remote_storage),
|
||||
));
|
||||
|
||||
// Do all the hard work in the background
|
||||
|
||||
@@ -1075,7 +1075,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("downloading image file: {}", path.display());
|
||||
trace!("downloading image file: {path:?}");
|
||||
let sz = remote_client
|
||||
.download_layer_file(&RemotePath::new(path), &layer_metadata)
|
||||
.await
|
||||
@@ -1105,7 +1105,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!("downloading delta file: {}", path.display());
|
||||
trace!("downloading delta file: {path:?}");
|
||||
let sz = remote_client
|
||||
.download_layer_file(&RemotePath::new(path), &layer_metadata)
|
||||
.await
|
||||
|
||||
@@ -13,11 +13,13 @@ use tokio::sync::RwLock;
|
||||
use tracing::*;
|
||||
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use utils::crashsafe;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use crate::IGNORED_TENANT_FILE_NAME;
|
||||
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -47,24 +49,52 @@ pub async fn init_tenant_mgr(
|
||||
Ok(Some(dir_entry)) => {
|
||||
let tenant_dir_path = dir_entry.path();
|
||||
if crate::is_temporary(&tenant_dir_path) {
|
||||
info!("Found temporary tenant directory, removing: {tenant_dir_path:?}",);
|
||||
info!(
|
||||
"Found temporary tenant directory, removing: {}",
|
||||
tenant_dir_path.display()
|
||||
);
|
||||
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
|
||||
error!("Failed to remove temporary directory {tenant_dir_path:?}: {e:?}");
|
||||
error!(
|
||||
"Failed to remove temporary directory '{}': {:?}",
|
||||
tenant_dir_path.display(),
|
||||
e
|
||||
);
|
||||
}
|
||||
} else {
|
||||
match load_local_tenant(conf, &tenant_dir_path, remote_storage.clone()) {
|
||||
Ok(Some(tenant)) => {
|
||||
TENANTS.write().await.insert(tenant.tenant_id(), tenant);
|
||||
number_of_tenants += 1;
|
||||
}
|
||||
Ok(None) => {
|
||||
// This case happens if we crash during attach before creating the attach marker file
|
||||
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
|
||||
error!("Failed to remove empty tenant directory {tenant_dir_path:?}: {e:#}")
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}"),
|
||||
// This case happens if we crash during attach before creating the attach marker file
|
||||
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
|
||||
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
|
||||
})?;
|
||||
if is_empty {
|
||||
info!("removing empty tenant directory {tenant_dir_path:?}");
|
||||
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
|
||||
error!(
|
||||
"Failed to remove empty tenant directory '{}': {e:#}",
|
||||
tenant_dir_path.display()
|
||||
)
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
|
||||
if tenant_ignore_mark_file.exists() {
|
||||
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
|
||||
continue;
|
||||
}
|
||||
|
||||
match schedule_local_tenant_processing(
|
||||
conf,
|
||||
&tenant_dir_path,
|
||||
remote_storage.clone(),
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
TENANTS.write().await.insert(tenant.tenant_id(), tenant);
|
||||
number_of_tenants += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -82,34 +112,45 @@ pub async fn init_tenant_mgr(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_local_tenant(
|
||||
pub fn schedule_local_tenant_processing(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_path: &Path,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
) -> anyhow::Result<Option<Arc<Tenant>>> {
|
||||
if !tenant_path.is_dir() {
|
||||
anyhow::bail!("tenant_path is not a directory: {tenant_path:?}")
|
||||
}
|
||||
|
||||
let is_empty = tenant_path
|
||||
.is_empty_dir()
|
||||
.context("check whether tenant_path is an empty dir")?;
|
||||
if is_empty {
|
||||
info!("skipping empty tenant directory {tenant_path:?}");
|
||||
return Ok(None);
|
||||
}
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
anyhow::ensure!(
|
||||
tenant_path.is_dir(),
|
||||
"Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
|
||||
);
|
||||
anyhow::ensure!(
|
||||
!crate::is_temporary(tenant_path),
|
||||
"Cannot load tenant from temporary path {tenant_path:?}"
|
||||
);
|
||||
anyhow::ensure!(
|
||||
!tenant_path.is_empty_dir().with_context(|| {
|
||||
format!("Failed to check whether {tenant_path:?} is an empty dir")
|
||||
})?,
|
||||
"Cannot load tenant from empty directory {tenant_path:?}"
|
||||
);
|
||||
|
||||
let tenant_id = tenant_path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.parse::<TenantId>()
|
||||
.context("Could not parse tenant id out of the tenant dir name")?;
|
||||
.with_context(|| {
|
||||
format!("Could not parse tenant id out of the tenant dir name in path {tenant_path:?}")
|
||||
})?;
|
||||
|
||||
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(tenant_id);
|
||||
anyhow::ensure!(
|
||||
!conf.tenant_ignore_mark_file_path(tenant_id).exists(),
|
||||
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
|
||||
);
|
||||
|
||||
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
|
||||
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
|
||||
if let Some(remote_storage) = remote_storage {
|
||||
Tenant::spawn_attach(conf, tenant_id, &remote_storage)
|
||||
Tenant::spawn_attach(conf, tenant_id, remote_storage)
|
||||
} else {
|
||||
warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
|
||||
Tenant::create_broken_tenant(conf, tenant_id)
|
||||
@@ -119,7 +160,7 @@ fn load_local_tenant(
|
||||
// Start loading the tenant into memory. It will initially be in Loading state.
|
||||
Tenant::spawn_load(conf, tenant_id, remote_storage)
|
||||
};
|
||||
Ok(Some(tenant))
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
///
|
||||
@@ -177,25 +218,15 @@ pub async fn create_tenant(
|
||||
// If this section ever becomes contentious, introduce a new `TenantState::Creating`.
|
||||
let tenant_directory =
|
||||
super::tenant::create_tenant_files(conf, tenant_conf, tenant_id)?;
|
||||
let created_tenant = load_local_tenant(conf, &tenant_directory, remote_storage)?;
|
||||
match created_tenant {
|
||||
None => {
|
||||
// We get None in case the directory is empty.
|
||||
// This shouldn't happen here, because we just created the directory.
|
||||
// So, skip any cleanup work for now, we don't know how we reached this state.
|
||||
anyhow::bail!("we just created the tenant directory, it can't be empty");
|
||||
}
|
||||
Some(tenant) => {
|
||||
anyhow::ensure!(
|
||||
tenant_id == tenant.tenant_id(),
|
||||
"loaded created tenant has unexpected tenant id (expect {} != actual {})",
|
||||
tenant_id,
|
||||
tenant.tenant_id()
|
||||
);
|
||||
v.insert(Arc::clone(&tenant));
|
||||
Ok(Some(tenant))
|
||||
}
|
||||
}
|
||||
let created_tenant =
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, remote_storage)?;
|
||||
let crated_tenant_id = created_tenant.tenant_id();
|
||||
anyhow::ensure!(
|
||||
tenant_id == crated_tenant_id,
|
||||
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})",
|
||||
);
|
||||
v.insert(Arc::clone(&created_tenant));
|
||||
Ok(Some(created_tenant))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -266,35 +297,58 @@ pub async fn detach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
let tenant = match {
|
||||
let mut tenants_accessor = TENANTS.write().await;
|
||||
tenants_accessor.remove(&tenant_id)
|
||||
} {
|
||||
Some(tenant) => tenant,
|
||||
None => anyhow::bail!("Tenant not found for id {tenant_id}"),
|
||||
};
|
||||
remove_tenant_from_memory(tenant_id, async {
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id);
|
||||
fs::remove_dir_all(&local_tenant_directory)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to remove local tenant directory {local_tenant_directory:?}")
|
||||
})?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
tenant.set_stopping();
|
||||
// shutdown all tenant and timeline tasks: gc, compaction, page service)
|
||||
task_mgr::shutdown_tasks(None, Some(tenant_id), None).await;
|
||||
pub async fn load_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
) -> anyhow::Result<()> {
|
||||
run_if_no_tenant_in_memory(tenant_id, |vacant_entry| {
|
||||
let tenant_path = conf.tenant_path(&tenant_id);
|
||||
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(tenant_id);
|
||||
if tenant_ignore_mark.exists() {
|
||||
std::fs::remove_file(&tenant_ignore_mark)
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
|
||||
// If removal fails there will be no way to successfully retry detach,
|
||||
// because the tenant no longer exists in the in-memory map. And it needs to be removed from it
|
||||
// before we remove files, because it contains references to tenant
|
||||
// which references ephemeral files which are deleted on drop. So if we keep these references,
|
||||
// we will attempt to remove files which no longer exist. This can be fixed by having shutdown
|
||||
// mechanism for tenant that will clean temporary data to avoid any references to ephemeral files
|
||||
let local_tenant_directory = conf.tenant_path(&tenant_id);
|
||||
fs::remove_dir_all(&local_tenant_directory)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to remove local tenant directory '{}'",
|
||||
local_tenant_directory.display()
|
||||
)
|
||||
})?;
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, remote_storage)
|
||||
.with_context(|| {
|
||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
vacant_entry.insert(new_tenant);
|
||||
Ok(())
|
||||
}).await
|
||||
}
|
||||
|
||||
pub async fn ignore_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
remove_tenant_from_memory(tenant_id, async {
|
||||
let ignore_mark_file = conf.tenant_ignore_mark_file_path(tenant_id);
|
||||
fs::File::create(&ignore_mark_file)
|
||||
.await
|
||||
.context("Failed to create ignore mark file")
|
||||
.and_then(|_| {
|
||||
crashsafe::fsync_file_and_parent(&ignore_mark_file)
|
||||
.context("Failed to fsync ignore mark file")
|
||||
})
|
||||
.with_context(|| format!("Failed to crate ignore mark for tenant {tenant_id}"))?;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
///
|
||||
@@ -316,25 +370,92 @@ pub async fn list_tenants() -> Vec<(TenantId, TenantState)> {
|
||||
pub async fn attach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
) -> anyhow::Result<()> {
|
||||
run_if_no_tenant_in_memory(tenant_id, |vacant_entry| {
|
||||
let tenant_path = conf.tenant_path(&tenant_id);
|
||||
anyhow::ensure!(
|
||||
!tenant_path.exists(),
|
||||
"Cannot attach tenant {tenant_id}, local tenant directory already exists"
|
||||
);
|
||||
|
||||
let tenant = Tenant::spawn_attach(conf, tenant_id, remote_storage);
|
||||
vacant_entry.insert(tenant);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn run_if_no_tenant_in_memory<F, V>(tenant_id: TenantId, run: F) -> anyhow::Result<V>
|
||||
where
|
||||
F: FnOnce(hash_map::VacantEntry<TenantId, Arc<Tenant>>) -> anyhow::Result<V>,
|
||||
{
|
||||
match TENANTS.write().await.entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(e) => {
|
||||
// Cannot attach a tenant that already exists. The error message depends on
|
||||
// the state it's in.
|
||||
match e.get().current_state() {
|
||||
TenantState::Attaching => {
|
||||
anyhow::bail!("tenant {tenant_id} attach is already in progress")
|
||||
}
|
||||
current_state => {
|
||||
anyhow::bail!("tenant already exists, current state: {current_state:?}")
|
||||
}
|
||||
}
|
||||
anyhow::bail!(
|
||||
"tenant {tenant_id} already exists, state: {:?}",
|
||||
e.get().current_state()
|
||||
)
|
||||
}
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
let tenant = Tenant::spawn_attach(conf, tenant_id, remote_storage);
|
||||
v.insert(tenant);
|
||||
Ok(())
|
||||
hash_map::Entry::Vacant(v) => run(v),
|
||||
}
|
||||
}
|
||||
|
||||
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
|
||||
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
|
||||
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
|
||||
/// operation would be needed to remove it.
|
||||
async fn remove_tenant_from_memory<V, F>(
|
||||
tenant_id: TenantId,
|
||||
tenant_cleanup: F,
|
||||
) -> anyhow::Result<V>
|
||||
where
|
||||
F: std::future::Future<Output = anyhow::Result<V>>,
|
||||
{
|
||||
// It's important to keep the tenant in memory after the final cleanup, to avoid cleanup races.
|
||||
// The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
|
||||
// tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to
|
||||
// avoid holding the lock for the entire process.
|
||||
{
|
||||
let tenants_accessor = TENANTS.write().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => match tenant.current_state() {
|
||||
TenantState::Attaching
|
||||
| TenantState::Loading
|
||||
| TenantState::Broken
|
||||
| TenantState::Active => tenant.set_stopping(),
|
||||
TenantState::Stopping => {
|
||||
anyhow::bail!("Tenant {tenant_id} is stopping already")
|
||||
}
|
||||
},
|
||||
None => anyhow::bail!("Tenant not found for id {tenant_id}"),
|
||||
}
|
||||
}
|
||||
|
||||
// shutdown all tenant and timeline tasks: gc, compaction, page service)
|
||||
// No new tasks will be started for this tenant because it's in `Stopping` state.
|
||||
// Hence, once we're done here, the `tenant_cleanup` callback can mutate tenant on-disk state freely.
|
||||
task_mgr::shutdown_tasks(None, Some(tenant_id), None).await;
|
||||
|
||||
match tenant_cleanup
|
||||
.await
|
||||
.with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
|
||||
{
|
||||
Ok(hook_value) => {
|
||||
let mut tenants_accessor = TENANTS.write().await;
|
||||
if tenants_accessor.remove(&tenant_id).is_none() {
|
||||
warn!("Tenant {tenant_id} got removed from memory before operation finished");
|
||||
}
|
||||
Ok(hook_value)
|
||||
}
|
||||
Err(e) => {
|
||||
let tenants_accessor = TENANTS.read().await;
|
||||
match tenants_accessor.get(&tenant_id) {
|
||||
Some(tenant) => tenant.set_broken(),
|
||||
None => warn!("Tenant {tenant_id} got removed from memory"),
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1119,6 +1119,14 @@ class PageserverHttpClient(requests.Session):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach")
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_load(self, tenant_id: TenantId):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load")
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_ignore(self, tenant_id: TenantId):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/ignore")
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_status(self, tenant_id: TenantId) -> Dict[Any, Any]:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -71,8 +71,10 @@ def test_remote_storage_backup_and_restore(
|
||||
# FIXME retry downloads without throwing errors
|
||||
env.pageserver.allowed_errors.append(".*failed to load remote timeline.*")
|
||||
# we have a bunch of pytest.raises for these below
|
||||
env.pageserver.allowed_errors.append(".*tenant already exists.*")
|
||||
env.pageserver.allowed_errors.append(".*attach is already in progress.*")
|
||||
env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*")
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Cannot attach tenant .*?, local tenant directory already exists.*"
|
||||
)
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pg = env.postgres.create_start("main")
|
||||
@@ -136,7 +138,7 @@ def test_remote_storage_backup_and_restore(
|
||||
|
||||
# assert cannot attach timeline that is scheduled for download
|
||||
# FIXME implement layer download retries
|
||||
with pytest.raises(Exception, match="tenant already exists, current state: Broken"):
|
||||
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):
|
||||
client.tenant_attach(tenant_id)
|
||||
|
||||
tenant_status = client.tenant_status(tenant_id)
|
||||
@@ -149,9 +151,7 @@ def test_remote_storage_backup_and_restore(
|
||||
env.pageserver.start()
|
||||
|
||||
# ensure that an initiated attach operation survives pageserver restart
|
||||
with pytest.raises(
|
||||
Exception, match=r".*(tenant already exists|attach is already in progress).*"
|
||||
):
|
||||
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state:"):
|
||||
client.tenant_attach(tenant_id)
|
||||
log.info("waiting for timeline redownload")
|
||||
wait_until(
|
||||
@@ -191,7 +191,7 @@ def test_remote_storage_upload_queue_retries(
|
||||
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_storage_backup_and_restore",
|
||||
test_name="test_remote_storage_upload_queue_retries",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -353,7 +353,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_storage_backup_and_restore",
|
||||
test_name="test_timeline_deletion_with_files_stuck_in_upload_queue",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -7,6 +7,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PageserverApiException,
|
||||
PageserverHttpClient,
|
||||
Postgres,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
wait_for_last_record_lsn,
|
||||
@@ -167,3 +168,337 @@ def test_detach_while_attaching(
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("SELECT COUNT(*) FROM foo")
|
||||
|
||||
|
||||
# Tests that `ignore` and `get` operations' combination is able to remove and restore the tenant in pageserver's memory.
|
||||
# * writes some data into tenant's timeline
|
||||
# * ensures it's synced with the remote storage
|
||||
# * `ignore` the tenant
|
||||
# * verify that ignored tenant files are generally unchanged, only an ignored mark had appeared
|
||||
# * verify the ignored tenant is gone from pageserver's memory
|
||||
# * restart the pageserver and verify that ignored tenant is still not loaded
|
||||
# * `load` the same tenant
|
||||
# * ensure that it's status is `Active` and it's present in pageserver's memory with all timelines
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3])
|
||||
def test_ignored_tenant_reattach(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_remote_storage_backup_and_restore",
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
ignored_tenant_id, _ = env.neon_cli.create_tenant()
|
||||
tenant_dir = env.repo_dir / "tenants" / str(ignored_tenant_id)
|
||||
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
tenants_before_ignore.sort()
|
||||
timelines_before_ignore = [
|
||||
timeline["timeline_id"]
|
||||
for timeline in pageserver_http.timeline_list(tenant_id=ignored_tenant_id)
|
||||
]
|
||||
files_before_ignore = [tenant_path for tenant_path in tenant_dir.glob("**/*")]
|
||||
|
||||
# ignore the tenant and veirfy it's not present in pageserver replies, with its files still on disk
|
||||
pageserver_http.tenant_ignore(ignored_tenant_id)
|
||||
|
||||
files_after_ignore_with_retain = [tenant_path for tenant_path in tenant_dir.glob("**/*")]
|
||||
new_files = set(files_after_ignore_with_retain) - set(files_before_ignore)
|
||||
disappeared_files = set(files_before_ignore) - set(files_after_ignore_with_retain)
|
||||
assert (
|
||||
len(disappeared_files) == 0
|
||||
), f"Tenant ignore should not remove files from disk, missing: {disappeared_files}"
|
||||
assert (
|
||||
len(new_files) == 1
|
||||
), f"Only tenant ignore file should appear on disk but got: {new_files}"
|
||||
|
||||
tenants_after_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
assert ignored_tenant_id not in tenants_after_ignore, "Ignored tenant should be missing"
|
||||
assert len(tenants_after_ignore) + 1 == len(
|
||||
tenants_before_ignore
|
||||
), "Only ignored tenant should be missing"
|
||||
|
||||
# restart the pageserver to ensure we don't load the ignore timeline
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
tenants_after_restart = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
tenants_after_restart.sort()
|
||||
assert (
|
||||
tenants_after_restart == tenants_after_ignore
|
||||
), "Ignored tenant should not be reloaded after pageserver restart"
|
||||
|
||||
# now, load it from the local files and expect it works
|
||||
pageserver_http.tenant_load(tenant_id=ignored_tenant_id)
|
||||
wait_until_tenant_status(pageserver_http, ignored_tenant_id, "Active", 5)
|
||||
|
||||
tenants_after_attach = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
tenants_after_attach.sort()
|
||||
assert tenants_after_attach == tenants_before_ignore, "Should have all tenants back"
|
||||
|
||||
timelines_after_ignore = [
|
||||
timeline["timeline_id"]
|
||||
for timeline in pageserver_http.timeline_list(tenant_id=ignored_tenant_id)
|
||||
]
|
||||
assert timelines_before_ignore == timelines_after_ignore, "Should have all timelines back"
|
||||
|
||||
|
||||
# Tests that it's possible to `load` tenants with missing layers and get them restored:
|
||||
# * writes some data into tenant's timeline
|
||||
# * ensures it's synced with the remote storage
|
||||
# * `ignore` the tenant
|
||||
# * removes all timeline's local layers
|
||||
# * `load` the same tenant
|
||||
# * ensure that it's status is `Active`
|
||||
# * check that timeline data is restored
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_ignored_tenant_download_missing_layers(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_ignored_tenant_download_and_attach",
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pg = env.postgres.create_start("main")
|
||||
|
||||
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
data_id = 1
|
||||
data_secret = "very secret secret"
|
||||
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, pg)
|
||||
|
||||
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
tenants_before_ignore.sort()
|
||||
timelines_before_ignore = [
|
||||
timeline["timeline_id"] for timeline in pageserver_http.timeline_list(tenant_id=tenant_id)
|
||||
]
|
||||
|
||||
# ignore the tenant and remove its layers
|
||||
pageserver_http.tenant_ignore(tenant_id)
|
||||
tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
layers_removed = False
|
||||
for dir_entry in tenant_timeline_dir.iterdir():
|
||||
if dir_entry.name.startswith("00000"):
|
||||
# Looks like a layer file. Remove it
|
||||
dir_entry.unlink()
|
||||
layers_removed = True
|
||||
assert layers_removed, f"Found no layers for tenant {tenant_timeline_dir}"
|
||||
|
||||
# now, load it from the local files and expect it to work due to remote storage restoration
|
||||
pageserver_http.tenant_load(tenant_id=tenant_id)
|
||||
wait_until_tenant_status(pageserver_http, tenant_id, "Active", 5)
|
||||
|
||||
tenants_after_attach = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
tenants_after_attach.sort()
|
||||
assert tenants_after_attach == tenants_before_ignore, "Should have all tenants back"
|
||||
|
||||
timelines_after_ignore = [
|
||||
timeline["timeline_id"] for timeline in pageserver_http.timeline_list(tenant_id=tenant_id)
|
||||
]
|
||||
assert timelines_before_ignore == timelines_after_ignore, "Should have all timelines back"
|
||||
|
||||
pg.stop()
|
||||
pg.start()
|
||||
ensure_test_data(data_id, data_secret, pg)
|
||||
|
||||
|
||||
# Tests that it's possible to `load` broken tenants:
|
||||
# * `ignore` a tenant
|
||||
# * removes its `metadata` file locally
|
||||
# * `load` the same tenant
|
||||
# * ensure that it's status is `Broken`
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_ignored_tenant_stays_broken_without_metadata(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_ignored_tenant_stays_broken_without_metadata",
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pg = env.postgres.create_start("main")
|
||||
|
||||
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
# ignore the tenant and remove its metadata
|
||||
pageserver_http.tenant_ignore(tenant_id)
|
||||
tenant_timeline_dir = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
metadata_removed = False
|
||||
for dir_entry in tenant_timeline_dir.iterdir():
|
||||
if dir_entry.name == "metadata":
|
||||
# Looks like a layer file. Remove it
|
||||
dir_entry.unlink()
|
||||
metadata_removed = True
|
||||
assert metadata_removed, f"Failed to find metadata file in {tenant_timeline_dir}"
|
||||
|
||||
env.pageserver.allowed_errors.append(".*could not load tenant .*?: failed to load metadata.*")
|
||||
|
||||
# now, load it from the local files and expect it to be broken due to inability to load tenant files into memory
|
||||
pageserver_http.tenant_load(tenant_id=tenant_id)
|
||||
wait_until_tenant_status(pageserver_http, tenant_id, "Broken", 5)
|
||||
|
||||
|
||||
# Tests that attach is never working on a tenant, ignored or not, as long as it's not absent locally
|
||||
# Similarly, tests that it's not possible to schedule a `load` for tenat that's not ignored.
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_load_attach_negatives(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_load_attach_negatives",
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pg = env.postgres.create_start("main")
|
||||
|
||||
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
|
||||
|
||||
env.pageserver.allowed_errors.append(".*tenant .*? already exists, state:.*")
|
||||
with pytest.raises(
|
||||
expected_exception=PageserverApiException,
|
||||
match=f"tenant {tenant_id} already exists, state: Active",
|
||||
):
|
||||
pageserver_http.tenant_load(tenant_id)
|
||||
|
||||
with pytest.raises(
|
||||
expected_exception=PageserverApiException,
|
||||
match=f"tenant {tenant_id} already exists, state: Active",
|
||||
):
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
|
||||
pageserver_http.tenant_ignore(tenant_id)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Cannot attach tenant .*?, local tenant directory already exists.*"
|
||||
)
|
||||
with pytest.raises(
|
||||
expected_exception=PageserverApiException,
|
||||
match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists",
|
||||
):
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_ignore_while_attaching(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_ignore_while_attaching",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pg = env.postgres.create_start("main")
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
data_id = 1
|
||||
data_secret = "very secret secret"
|
||||
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, pg)
|
||||
|
||||
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
|
||||
# Detach it
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
# And re-attach, but stop attach task_mgr task from completing
|
||||
pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")])
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
# Run ignore on the task, thereby cancelling the attach.
|
||||
# XXX This should take priority over attach, i.e., it should cancel the attach task.
|
||||
# But neither the failpoint, nor the proper storage_sync2 download functions,
|
||||
# are sensitive to task_mgr::shutdown.
|
||||
# This problem is tracked in https://github.com/neondatabase/neon/issues/2996 .
|
||||
# So, for now, effectively, this ignore here will block until attach task completes.
|
||||
pageserver_http.tenant_ignore(tenant_id)
|
||||
|
||||
# Cannot attach it due to some local files existing
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Cannot attach tenant .*?, local tenant directory already exists.*"
|
||||
)
|
||||
with pytest.raises(
|
||||
expected_exception=PageserverApiException,
|
||||
match=f"Cannot attach tenant {tenant_id}, local tenant directory already exists",
|
||||
):
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
|
||||
tenants_after_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
assert tenant_id not in tenants_after_ignore, "Ignored tenant should be missing"
|
||||
assert len(tenants_after_ignore) + 1 == len(
|
||||
tenants_before_ignore
|
||||
), "Only ignored tenant should be missing"
|
||||
|
||||
# But can load it from local files, that will restore attach.
|
||||
pageserver_http.tenant_load(tenant_id)
|
||||
|
||||
wait_until_tenant_status(pageserver_http, tenant_id, "Active", 5)
|
||||
|
||||
pg.stop()
|
||||
pg.start()
|
||||
ensure_test_data(data_id, data_secret, pg)
|
||||
|
||||
|
||||
def insert_test_data(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
data_id: int,
|
||||
data: str,
|
||||
pg: Postgres,
|
||||
):
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
f"""
|
||||
CREATE TABLE test(id int primary key, secret text);
|
||||
INSERT INTO test VALUES ({data_id}, '{data}');
|
||||
"""
|
||||
)
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
# wait until pageserver receives that data
|
||||
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
# run checkpoint manually to be sure that data landed in remote storage
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# wait until pageserver successfully uploaded a checkpoint to remote storage
|
||||
log.info("waiting for to be ignored tenant data checkpoint upload")
|
||||
wait_for_upload(pageserver_http, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
|
||||
def ensure_test_data(data_id: int, data: str, pg: Postgres):
|
||||
with pg.cursor() as cur:
|
||||
assert (
|
||||
query_scalar(cur, f"SELECT secret FROM test WHERE id = {data_id};") == data
|
||||
), "Should have timeline data back"
|
||||
|
||||
|
||||
# Does not use `wait_until` for debugging purposes
|
||||
def wait_until_tenant_status(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
expected_status: str,
|
||||
iterations: int,
|
||||
) -> bool:
|
||||
for _ in range(iterations):
|
||||
try:
|
||||
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
|
||||
log.debug(f"Tenant {tenant_id} status: {tenant}")
|
||||
if tenant["state"] == expected_status:
|
||||
return True
|
||||
except Exception as e:
|
||||
log.debug(f"Tenant {tenant_id} status retrieval failure: {e}")
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
raise Exception(f"Tenant {tenant_id} did not become {expected_status} in {iterations} seconds")
|
||||
|
||||
Reference in New Issue
Block a user