Compare commits

..

2 Commits

Author SHA1 Message Date
Christian Schwarz
01898af391 clippy 2023-11-24 09:15:40 +00:00
Christian Schwarz
85f0867db8 failure during timeline creation: always clean up timeline dir
Context: https://app.incident.io/neondb/incidents/103

Before this change, there's the following race condition if the tenant
gets deleted during timeline creation:

TBD, copy from https://neondb.slack.com/archives/C066ZFAJU85/p1700751858049319

The root of the problem is that we bail out of timeline creation without
cleaning up the uninit marker or timeline directory.

This PR changes the code to do that, and with our new policy for
infallible local IO, it also takes the opportunity to clean stuff up
around `TimelineUninitMark` & `UninitializedTimeline`.

I also added a missing fsync of the common parent directory between
timelines_dir removal and uninit marker removal.

We could probably get rid of the entire uninit mark idea, as
we no longer treat local FS state as the source of truth, and we only
upload to remote storage after successful creation (right?).
2023-11-24 09:12:17 +00:00
23 changed files with 768 additions and 602 deletions

1
Cargo.lock generated
View File

@@ -1126,7 +1126,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-compression",
"bytes",
"cfg-if",
"chrono",
"clap",

View File

@@ -38,4 +38,3 @@ toml_edit.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
vm_monitor = { version = "0.1", path = "../libs/vm_monitor/" }
zstd = "0.12.4"
bytes = "1.0"

View File

@@ -31,7 +31,7 @@
//! -C 'postgresql://cloud_admin@localhost/postgres' \
//! -S /var/db/postgres/specs/current.json \
//! -b /usr/local/bin/postgres \
//! -r http://pg-ext-s3-gateway
//! -r {"bucket": "neon-dev-extensions-eu-central-1", "region": "eu-central-1"}
//! ```
//!
use std::collections::HashMap;
@@ -51,7 +51,7 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -60,7 +60,7 @@ use compute_tools::spec::*;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";
const BUILD_TAG_DEFAULT: &str = "5670669815";
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
@@ -74,18 +74,10 @@ fn main() -> Result<()> {
let pgbin_default = String::from("postgres");
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
let ext_remote_storage = matches
.get_one::<String>("remote-ext-config")
// Compatibility hack: if the control plane specified any remote-ext-config
// use the default value for extension storage proxy gateway.
// Remove this once the control plane is updated to pass the gateway URL
.map(|conf| {
if conf.starts_with("http") {
conf.trim_end_matches('/')
} else {
"http://pg-ext-s3-gateway"
}
});
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
let ext_remote_storage = remote_ext_config.map(|x| {
init_remote_storage(x).expect("cannot initialize remote extension storage from config")
});
let http_port = *matches
.get_one::<u16>("http-port")
@@ -206,7 +198,7 @@ fn main() -> Result<()> {
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
ext_remote_storage,
ext_download_progress: RwLock::new(HashMap::new()),
build_tag,
};

View File

@@ -25,7 +25,7 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use remote_storage::{DownloadError, RemotePath};
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use crate::checker::create_availability_check_data;
use crate::pg_helpers::*;
@@ -59,8 +59,8 @@ pub struct ComputeNode {
pub state: Mutex<ComputeState>,
/// `Condvar` to allow notifying waiters about state changes.
pub state_changed: Condvar,
/// the address of extension storage proxy gateway
pub ext_remote_storage: Option<String>,
/// the S3 bucket that we search for extensions in
pub ext_remote_storage: Option<GenericRemoteStorage>,
// key: ext_archive_name, value: started download time, download_completed?
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
pub build_tag: String,
@@ -693,12 +693,13 @@ impl ComputeNode {
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
create_neon_superuser(spec, &mut client)?;
cleanup_instance(&mut client)?;
handle_extension_neon(self.connstr.as_str())?;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, &mut client, self.connstr.as_str())?;
handle_extensions(spec, &mut client)?;
handle_extension_neon(&mut client)?;
handle_alter_extension_neon(spec, &mut client, self.connstr.as_str())?;
create_availability_check_data(&mut client)?;
// 'Close' connection
@@ -738,12 +739,13 @@ impl ComputeNode {
if spec.mode == ComputeMode::Primary {
client.simple_query("SET neon.forward_ddl = false")?;
cleanup_instance(&mut client)?;
handle_extension_neon(self.connstr.as_str())?;
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, &mut client, self.connstr.as_str())?;
handle_extensions(&spec, &mut client)?;
handle_extension_neon(&mut client)?;
handle_alter_extension_neon(&spec, &mut client, self.connstr.as_str())?;
}
// 'Close' connection
@@ -957,12 +959,12 @@ LIMIT 100",
real_ext_name: String,
ext_path: RemotePath,
) -> Result<u64, DownloadError> {
let ext_remote_storage =
self.ext_remote_storage
.as_ref()
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
"Remote extensions storage is not configured",
)))?;
let remote_storage = self
.ext_remote_storage
.as_ref()
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
"Remote extensions storage is not configured",
)))?;
let ext_archive_name = ext_path.object_name().expect("bad path");
@@ -1018,7 +1020,7 @@ LIMIT 100",
let download_size = extension_server::download_extension(
&real_ext_name,
&ext_path,
ext_remote_storage,
remote_storage,
&self.pgbin,
)
.await

View File

@@ -71,16 +71,18 @@ More specifically, here is an example ext_index.json
}
}
*/
use anyhow::Context;
use anyhow::{self, Result};
use anyhow::{bail, Context};
use bytes::Bytes;
use compute_api::spec::RemoteExtSpec;
use regex::Regex;
use remote_storage::*;
use reqwest::StatusCode;
use serde_json;
use std::io::Read;
use std::num::NonZeroUsize;
use std::path::Path;
use std::str;
use tar::Archive;
use tokio::io::AsyncReadExt;
use tracing::info;
use tracing::log::warn;
use zstd::stream::read::Decoder;
@@ -136,31 +138,23 @@ fn parse_pg_version(human_version: &str) -> &str {
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
ext_remote_storage: &str,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> Result<u64> {
info!("Download extension {:?} from {:?}", ext_name, ext_path);
// TODO add retry logic
let download_buffer =
match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
Ok(buffer) => buffer,
Err(error_message) => {
return Err(anyhow::anyhow!(
"error downloading extension {:?}: {:?}",
ext_name,
error_message
));
}
};
let mut download = remote_storage.download(ext_path).await?;
let mut download_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut download_buffer)
.await?;
let download_size = download_buffer.len() as u64;
info!("Download size {:?}", download_size);
// it's unclear whether it is more performant to decompress into memory or not
// TODO: decompressing into memory can be avoided
let decoder = Decoder::new(download_buffer.as_ref())?;
let mut archive = Archive::new(decoder);
let mut decoder = Decoder::new(download_buffer.as_slice())?;
let mut decompress_buffer = Vec::new();
decoder.read_to_end(&mut decompress_buffer)?;
let mut archive = Archive::new(decompress_buffer.as_slice());
let unzip_dest = pgbin
.strip_suffix("/bin/postgres")
.expect("bad pgbin")
@@ -228,32 +222,29 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
}
}
// Do request to extension storage proxy, i.e.
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
// using HHTP GET
// and return the response body as bytes
//
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
let uri = format!("{}/{}", ext_remote_storage, ext_path);
info!("Download extension {:?} from uri {:?}", ext_path, uri);
let resp = reqwest::get(uri).await?;
match resp.status() {
StatusCode::OK => match resp.bytes().await {
Ok(resp) => {
info!("Download extension {:?} completed successfully", ext_path);
Ok(resp)
}
Err(e) => bail!("could not deserialize remote extension response: {}", e),
},
StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
_ => bail!(
"unexpected remote extension response status code: {}",
resp.status()
),
// This function initializes the necessary structs to use remote storage
pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
#[derive(Debug, serde::Deserialize)]
struct RemoteExtJson {
bucket: String,
region: String,
endpoint: Option<String>,
prefix: Option<String>,
}
let remote_ext_json = serde_json::from_str::<RemoteExtJson>(remote_ext_config)?;
let config = S3Config {
bucket_name: remote_ext_json.bucket,
bucket_region: remote_ext_json.region,
prefix_in_bucket: remote_ext_json.prefix,
endpoint: remote_ext_json.endpoint,
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
max_keys_per_list_response: None,
};
let config = RemoteStorageConfig {
storage: RemoteStorageKind::AwsS3(config),
};
GenericRemoteStorage::from_config(&config)
}
#[cfg(test)]

View File

@@ -123,7 +123,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
// download extension files from remote extension storage on demand
// download extension files from S3 on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
info!("req.uri {:?}", req.uri());

View File

@@ -675,29 +675,78 @@ pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()>
Ok(())
}
/// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
#[instrument(skip_all)]
pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
info!("handle extension neon");
/// connect to template1 and postgres to create neon extension
/// that will be available in all databases
pub fn handle_extension_neon(connstr: &str) -> Result<()> {
for dbname in ["template1", "postgres"].iter() {
let mut conf = Config::from_str(connstr)?;
conf.dbname(dbname);
let mut template1_client = conf.connect(NoTls)?;
let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
client.simple_query(query)?;
query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
info!("create neon extension with query: {}", query);
client.simple_query(query)?;
query = "ALTER EXTENSION neon SET SCHEMA neon";
info!("alter neon extension schema with query: {}", query);
client.simple_query(query)?;
// this will be a no-op if extension is already up to date,
// which may happen in two cases:
// - extension was just installed
// - extension was already installed and is up to date
let query = "ALTER EXTENSION neon UPDATE";
info!("update neon extension schema with query: {}", query);
client.simple_query(query)?;
let create_extension_neon_query = "CREATE EXTENSION IF NOT EXISTS neon";
info!(
"creating neon extension with query: {} in db {}",
create_extension_neon_query, dbname
);
template1_client.simple_query(create_extension_neon_query)?;
}
Ok(())
}
/// Run ALTER EXTENSION neon UPDATE for each valid database
#[instrument(skip_all)]
pub fn handle_alter_extension_neon(
spec: &ComputeSpec,
client: &mut Client,
connstr: &str,
) -> Result<()> {
info!("modifying database permissions");
let existing_dbs = get_existing_dbs(client)?;
// We'd better do this at db creation time, but we don't always know when it happens.
for db in &spec.cluster.databases {
match existing_dbs.get(&db.name) {
Some(pg_db) => {
if pg_db.restrict_conn || pg_db.invalid {
info!(
"skipping grants for db {} (invalid: {}, connections not allowed: {})",
db.name, pg_db.invalid, pg_db.restrict_conn
);
continue;
}
}
None => {
bail!(
"database {} doesn't exist in Postgres after handle_databases()",
db.name
);
}
}
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
let mut db_client = conf.connect(NoTls)?;
// this will be a no-op if extension is already up to date,
// which may happen in two cases:
// - extension was just installed
// - extension was already installed and is up to date
let create_extension_neon_query = "CREATE EXTENSION IF NOT EXISTS neon";
info!(
"create extension neon query for db {} : {}",
&db.name, &create_extension_neon_query
);
db_client.simple_query(create_extension_neon_query)?;
let alter_extension_neon_query = "ALTER EXTENSION neon UPDATE";
info!(
"alter extension neon query for db {} : {}",
&db.name, &alter_extension_neon_query
);
db_client.simple_query(alter_extension_neon_query)?;
}
Ok(())
}

View File

@@ -1252,7 +1252,7 @@ fn cli() -> Command {
let remote_ext_config_args = Arg::new("remote-ext-config")
.long("remote-ext-config")
.num_args(1)
.help("Configure the remote extensions storage proxy gateway to request for extensions.")
.help("Configure the S3 bucket that we search for extensions in.")
.required(false);
let lsn_arg = Arg::new("lsn")

View File

@@ -45,7 +45,6 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::spec::RemoteExtSpec;
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
@@ -477,18 +476,6 @@ impl Endpoint {
}
}
// check for file remote_extensions_spec.json
// if it is present, read it and pass to compute_ctl
let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
let remote_extensions: Option<RemoteExtSpec>;
if let Ok(spec_file) = remote_extensions_spec {
remote_extensions = serde_json::from_reader(spec_file).ok();
} else {
remote_extensions = None;
};
// Create spec file
let spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
@@ -510,7 +497,7 @@ impl Endpoint {
pageserver_connstring: Some(pageserver_connstring),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions,
remote_extensions: None,
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

View File

@@ -280,6 +280,7 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
use crate::tenant::delete::DeleteTenantError::*;
match value {
Get(g) => ApiError::from(g),
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
Timeline(t) => ApiError::from(t),
NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
SlotError(e) => e.into(),
@@ -1688,24 +1689,8 @@ where
let token_cloned = token.clone();
let result = handler(r, token).await;
if token_cloned.is_cancelled() {
// dropguard has executed: we will never turn this result into response.
//
// at least temporarily do {:?} logging; these failures are rare enough but
// could hide difficult errors.
match &result {
Ok(response) => {
let status = response.status();
info!(%status, "Cancelled request finished successfully")
}
Err(e) => error!("Cancelled request finished with an error: {e:?}"),
}
info!("Cancelled request finished");
}
// only logging for cancelled panicked request handlers is the tracing_panic_hook,
// which should suffice.
//
// there is still a chance to lose the result due to race between
// returning from here and the actual connection closing happening
// before outer task gets to execute. leaving that up for #5815.
result
}
.in_current_span(),

View File

@@ -86,7 +86,6 @@ use crate::tenant::storage_layer::ImageLayer;
use crate::InitializationOrder;
use crate::tenant::timeline::delete::DeleteTimelineFlow;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
use crate::walredo::PostgresRedoManager;
use crate::TEMP_FILE_SUFFIX;
@@ -257,6 +256,8 @@ pub struct Tenant {
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
// Cancellation token fires when we have entered shutdown(). This is a parent of
// Timelines' cancellation token.
pub(crate) cancel: CancellationToken,
@@ -632,9 +633,9 @@ impl Tenant {
}
};
info!("pending_deletion {}", pending_deletion);
info!("pending_deletion {}", pending_deletion.is_some());
if pending_deletion {
if let Some(deletion) = pending_deletion {
// as we are no longer loading, signal completion by dropping
// the completion while we resume deletion
drop(_completion);
@@ -651,6 +652,7 @@ impl Tenant {
}
match DeleteTenantFlow::resume_from_attach(
deletion,
&tenant_clone,
preload,
tenants,
@@ -730,7 +732,7 @@ impl Tenant {
///
async fn attach(
self: &Arc<Tenant>,
init_order: Option<InitializationOrder>,
mut init_order: Option<InitializationOrder>,
preload: Option<TenantPreload>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -747,6 +749,11 @@ impl Tenant {
}
};
// Signal that we have completed remote phase
init_order
.as_mut()
.and_then(|x| x.initial_tenant_load_remote.take());
let mut timelines_to_resume_deletions = vec![];
let mut remote_index_and_client = HashMap::new();
@@ -1850,7 +1857,6 @@ impl Tenant {
});
})
};
// test_long_timeline_create_then_tenant_delete is leaning on this message
tracing::info!("Waiting for timelines...");
while let Some(res) = js.join_next().await {
match res {
@@ -2369,6 +2375,7 @@ impl Tenant {
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
gate: Gate::new(format!("Tenant<{tenant_id}>")),
}
@@ -3083,7 +3090,7 @@ impl Tenant {
.await
{
error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark);
drop(uninit_mark); // does the cleanup
return Err(e);
}
@@ -3135,20 +3142,8 @@ impl Tenant {
"Timeline {timeline_path} already exists, cannot create its uninit mark file",
);
let uninit_mark_path = self
.conf
.timeline_uninit_mark_file_path(tenant_id, timeline_id);
fs::File::create(&uninit_mark_path)
.context("Failed to create uninit mark file")
.and_then(|_| {
crashsafe::fsync_file_and_parent(&uninit_mark_path)
.context("Failed to fsync uninit mark file")
})
.with_context(|| {
format!("Failed to crate uninit mark for timeline {tenant_id}/{timeline_id}")
})?;
let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
let uninit_mark = TimelineUninitMark::new(self.conf, tenant_id, timeline_id)
.context("create uninit mark")?;
Ok(uninit_mark)
}

View File

@@ -4,6 +4,7 @@ use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::TenantState;
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, warn, Instrument, Span};
@@ -38,6 +39,9 @@ pub(crate) enum DeleteTenantError {
#[error("Invalid state {0}. Expected Active or Broken")]
InvalidState(TenantState),
#[error("Tenant deletion is already in progress")]
AlreadyInProgress,
#[error("Tenant map slot error {0}")]
SlotError(#[from] TenantSlotError),
@@ -51,6 +55,8 @@ pub(crate) enum DeleteTenantError {
Other(#[from] anyhow::Error),
}
type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;
fn remote_tenant_delete_mark_path(
conf: &PageServerConf,
tenant_id: &TenantId,
@@ -281,14 +287,14 @@ impl DeleteTenantFlow {
) -> Result<(), DeleteTenantError> {
span::debug_assert_current_span_has_tenant_id();
Self::prepare(&tenant).await?;
let mut guard = Self::prepare(&tenant).await?;
if let Err(e) = Self::run_inner(conf, remote_storage.as_ref(), &tenant).await {
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
tenant.set_broken(format!("{e:#}")).await;
return Err(e);
}
Self::schedule_background(conf, remote_storage, tenants, tenant);
Self::schedule_background(guard, conf, remote_storage, tenants, tenant);
Ok(())
}
@@ -298,10 +304,13 @@ impl DeleteTenantFlow {
// will result in an error, but here we need to be able to retry shutdown when tenant deletion is retried.
// So the solution is to set tenant state to broken.
async fn run_inner(
guard: &mut OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,
) -> Result<(), DeleteTenantError> {
guard.mark_in_progress()?;
fail::fail_point!("tenant-delete-before-create-remote-mark", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-create-remote-mark"
@@ -336,25 +345,46 @@ impl DeleteTenantFlow {
Ok(())
}
fn mark_in_progress(&mut self) -> anyhow::Result<()> {
match self {
Self::Finished => anyhow::bail!("Bug. Is in finished state"),
Self::InProgress { .. } => { /* We're in a retry */ }
Self::NotStarted => { /* Fresh start */ }
}
*self = Self::InProgress;
Ok(())
}
pub(crate) async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_mark_exists: bool,
tenant: &Tenant,
) -> Result<bool, DeleteTenantError> {
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
let acquire = |t: &Tenant| {
Some(
Arc::clone(&t.delete_progress)
.try_lock_owned()
.expect("we're the only owner during init"),
)
};
if remote_mark_exists {
return Ok(true);
return Ok(acquire(tenant));
}
let tenant_id = tenant.tenant_id;
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
Ok(true)
Ok(acquire(tenant))
} else {
Ok(false)
Ok(None)
}
}
pub(crate) async fn resume_from_attach(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
preload: Option<TenantPreload>,
tenants: &'static std::sync::RwLock<TenantsMap>,
@@ -373,10 +403,19 @@ impl DeleteTenantFlow {
.await
.context("attach")?;
Self::background(tenant.conf, tenant.remote_storage.clone(), tenants, tenant).await
Self::background(
guard,
tenant.conf,
tenant.remote_storage.clone(),
tenants,
tenant,
)
.await
}
async fn prepare(tenant: &Arc<Tenant>) -> Result<(), DeleteTenantError> {
async fn prepare(
tenant: &Arc<Tenant>,
) -> Result<tokio::sync::OwnedMutexGuard<Self>, DeleteTenantError> {
// FIXME: unsure about active only. Our init jobs may not be cancellable properly,
// so at least for now allow deletions only for active tenants. TODO recheck
// Broken and Stopping is needed for retries.
@@ -387,6 +426,10 @@ impl DeleteTenantFlow {
return Err(DeleteTenantError::InvalidState(tenant.current_state()));
}
let guard = Arc::clone(&tenant.delete_progress)
.try_lock_owned()
.map_err(|_| DeleteTenantError::AlreadyInProgress)?;
fail::fail_point!("tenant-delete-before-shutdown", |_| {
Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))?
});
@@ -406,10 +449,11 @@ impl DeleteTenantFlow {
)));
}
Ok(())
Ok(guard)
}
fn schedule_background(
guard: OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
@@ -425,7 +469,9 @@ impl DeleteTenantFlow {
"tenant_delete",
false,
async move {
if let Err(err) = Self::background(conf, remote_storage, tenants, &tenant).await {
if let Err(err) =
Self::background(guard, conf, remote_storage, tenants, &tenant).await
{
error!("Error: {err:#}");
tenant.set_broken(format!("{err:#}")).await;
};
@@ -440,6 +486,7 @@ impl DeleteTenantFlow {
}
async fn background(
mut guard: OwnedMutexGuard<Self>,
conf: &PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
@@ -503,6 +550,8 @@ impl DeleteTenantFlow {
.set(locked.len() as u64);
}
*guard = Self::Finished;
Ok(())
}
}

View File

@@ -2600,8 +2600,6 @@ impl Timeline {
)
};
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}

View File

@@ -110,6 +110,35 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
Ok(())
}
// We delete local files first, so if pageserver restarts after local files deletion then remote deletion is not continued.
// This can be solved with inversion of these steps. But even if these steps are inverted then, when index_part.json
// gets deleted there is no way to distinguish between "this timeline is good, we just didnt upload it to remote"
// and "this timeline is deleted we should continue with removal of local state". So to avoid the ambiguity we use a mark file.
// After index part is deleted presence of this mark file indentifies that it was a deletion intention.
// So we can just remove the mark file.
async fn create_delete_mark(
conf: &PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), DeleteTimelineError> {
fail::fail_point!("timeline-delete-before-delete-mark", |_| {
Err(anyhow::anyhow!(
"failpoint: timeline-delete-before-delete-mark"
))?
});
let marker_path = conf.timeline_delete_mark_file_path(tenant_id, timeline_id);
// Note: we're ok to replace existing file.
let _ = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(&marker_path)
.with_context(|| format!("could not create delete marker file {marker_path:?}"))?;
crashsafe::fsync_file_and_parent(&marker_path).context("sync_mark")?;
Ok(())
}
/// Grab the layer_removal_cs lock, and actually perform the deletion.
///
/// This lock prevents prevents GC or compaction from running at the same time.
@@ -282,8 +311,6 @@ async fn cleanup_remaining_timeline_fs_traces(
.context("fsync_pre_mark_remove")?;
// Remove delete mark
// TODO: once we are confident that no more exist in the field, remove this
// line. It cleans up a legacy marker file that might in rare cases be present.
tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_id, timeline_id))
.await
.or_else(fs_ext::ignore_not_found)
@@ -364,6 +391,8 @@ impl DeleteTimelineFlow {
set_deleted_in_remote_index(&timeline).await?;
create_delete_mark(tenant.conf, timeline.tenant_id, timeline.timeline_id).await?;
fail::fail_point!("timeline-delete-before-schedule", |_| {
Err(anyhow::anyhow!(
"failpoint: timeline-delete-before-schedule"
@@ -435,6 +464,10 @@ impl DeleteTimelineFlow {
guard.mark_in_progress()?;
// Note that delete mark can be missing on resume
// because we create delete mark after we set deleted_at in the index part.
create_delete_mark(tenant.conf, tenant.tenant_id, timeline_id).await?;
Self::schedule_background(guard, tenant.conf, tenant, timeline);
Ok(())

View File

@@ -2,10 +2,20 @@ use std::{collections::hash_map::Entry, fs, sync::Arc};
use anyhow::Context;
use camino::Utf8PathBuf;
use tracing::{error, info, info_span, warn};
use utils::{crashsafe, fs_ext, id::TimelineId, lsn::Lsn};
use tracing::{info, info_span, warn};
use utils::{
crashsafe,
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::{context::RequestContext, import_datadir, tenant::Tenant};
use crate::{
config::PageServerConf,
context::RequestContext,
import_datadir,
tenant::Tenant,
virtual_file::{on_fatal_io_error, MaybeFatalIo},
};
use super::Timeline;
@@ -45,20 +55,12 @@ impl<'t> UninitializedTimeline<'t> {
let timeline_id = self.timeline_id;
let tenant_id = self.owning_tenant.tenant_id;
if self.raw_timeline.is_none() {
return Err(anyhow::anyhow!(
"No timeline for initialization found for {tenant_id}/{timeline_id}"
));
}
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
// Check that the caller initialized disk_consistent_lsn
let new_disk_consistent_lsn = self
.raw_timeline
.as_ref()
.expect("checked above")
.0
.get_disk_consistent_lsn();
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
anyhow::ensure!(
new_disk_consistent_lsn.is_valid(),
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
@@ -70,25 +72,13 @@ impl<'t> UninitializedTimeline<'t> {
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
),
Entry::Vacant(v) => {
// after taking here should be no fallible operations, because the drop guard will not
// cleanup after and would block for example the tenant deletion
let (new_timeline, uninit_mark) =
self.raw_timeline.take().expect("already checked");
// this is the mutual exclusion between different retries to create the timeline;
// this should be an assertion.
uninit_mark.remove_uninit_mark().with_context(|| {
format!(
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
)
})?;
uninit_mark.remove_uninit_mark();
v.insert(Arc::clone(&new_timeline));
new_timeline.maybe_spawn_flush_loop();
Ok(new_timeline)
}
}
Ok(new_timeline)
}
/// Prepares timeline data by loading it from the basebackup archive.
@@ -141,29 +131,6 @@ impl<'t> UninitializedTimeline<'t> {
}
}
impl Drop for UninitializedTimeline<'_> {
fn drop(&mut self) {
if let Some((_, uninit_mark)) = self.raw_timeline.take() {
let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_id, timeline_id = %self.timeline_id).entered();
error!("Timeline got dropped without initializing, cleaning its files");
cleanup_timeline_directory(uninit_mark);
}
}
}
pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
let timeline_path = &uninit_mark.timeline_path;
match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
Ok(()) => {
info!("Timeline dir {timeline_path:?} removed successfully, removing the uninit mark")
}
Err(e) => {
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
}
}
drop(uninit_mark); // mark handles its deletion on drop, gets retained if timeline dir exists
}
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
/// or gets removed eventually.
///
@@ -173,58 +140,133 @@ pub(crate) struct TimelineUninitMark {
uninit_mark_deleted: bool,
uninit_mark_path: Utf8PathBuf,
pub(crate) timeline_path: Utf8PathBuf,
common_parent: Utf8PathBuf,
}
impl TimelineUninitMark {
pub(crate) fn new(uninit_mark_path: Utf8PathBuf, timeline_path: Utf8PathBuf) -> Self {
Self {
pub(crate) fn new(
conf: &'static PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<Self> {
let timeline_path = conf.timeline_path(&tenant_id, &timeline_id);
let uninit_mark_path = conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
// assert they share the same parent
let timeline_parent_path = timeline_path
.parent()
.expect("timeline_path must have a parent");
let uninit_mark_parent_path = uninit_mark_path
.parent()
.expect("uninit mark path must have a parent");
assert_eq!(timeline_parent_path, uninit_mark_parent_path);
let common_parent = uninit_mark_parent_path;
// crate the uninit file
let _ = fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&uninit_mark_path)
.context("create uninit mark file")?;
crashsafe::fsync_file_and_parent(common_parent).context("fsync uninit mark file")?;
Ok(Self {
uninit_mark_deleted: false,
common_parent: common_parent.to_owned(),
uninit_mark_path,
timeline_path,
}
})
}
fn remove_uninit_mark(mut self) -> anyhow::Result<()> {
if !self.uninit_mark_deleted {
self.delete_mark_file_if_present()?;
}
fn remove_uninit_mark(mut self) {
// remove the uninit mark
fs::remove_file(&self.uninit_mark_path).fatal_err(&format!(
"TimelineUninitMark::drop: remove_file uninit mark: {}",
self.uninit_mark_path
));
Ok(())
}
// fsync to persist the removal
crashsafe::fsync(&self.common_parent).fatal_err(&format!(
"TimelineUninitMark::drop: fsync common parent dir: {}",
self.common_parent
));
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
let uninit_mark_file = &self.uninit_mark_path;
let uninit_mark_parent = uninit_mark_file
.parent()
.with_context(|| format!("Uninit mark file {uninit_mark_file:?} has no parent"))?;
fs_ext::ignore_absent_files(|| fs::remove_file(uninit_mark_file)).with_context(|| {
format!("Failed to remove uninit mark file at path {uninit_mark_file:?}")
})?;
crashsafe::fsync(uninit_mark_parent).context("Failed to fsync uninit mark parent")?;
self.uninit_mark_deleted = true;
Ok(())
}
}
impl Drop for TimelineUninitMark {
fn drop(&mut self) {
if !self.uninit_mark_deleted {
if self.timeline_path.exists() {
error!(
"Uninit mark {} is not removed, timeline {} stays uninitialized",
self.uninit_mark_path, self.timeline_path
)
} else {
// unblock later timeline creation attempts
warn!(
"Removing intermediate uninit mark file {}",
// unblock later timeline creation attempts
let _entered =
info_span!("TimelineUninitMark_drop", timeline_path=%self.timeline_path).entered();
warn!("removing timeline dir and uninit mark file");
// sanity-check: ensure the uninit mark file still exists on disk
let uninit_mark_file_exists = self.uninit_mark_path.try_exists().fatal_err(&format!(
"TimelineUninitMark::drop: stat() uninit mark file: {}",
self.uninit_mark_path
));
if !uninit_mark_file_exists {
panic!(
"uninit mark file assumed to exists but doesn't: {}",
self.uninit_mark_path
);
if let Err(e) = self.delete_mark_file_if_present() {
error!("Failed to remove the uninit mark file: {e}")
}
// recursively delete `timeline_path`, ignoring NotFound errors and aborting the process on all others.
match fs::remove_dir_all(&self.timeline_path) {
Ok(()) => {
info!("timeline dir removed successfully");
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// this can happen both if the timeline_path does not exist
// and if the timeline_path exists and there's another thread
// still operating on that directory and our remove_dir_all call
// effectively got hit by time-of-check vs time-of-use.
// Disambiguate by calling remove_dir against the timeline_path
match std::fs::remove_dir(&self.timeline_path) {
Ok(()) => {
warn!("retrying timeline dir removal succeeded after NotFound, this is indicative of a race condition");
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// this is the good case: the first NotFound was because the dir didn't exist
info!("timeline dir does not exist");
}
Err(e) => {
on_fatal_io_error(&e, &format!("TimelineUninitMark::drop: remove_dir_all failed with NotFound, then remove_dir failed: {}", self.timeline_path));
}
}
}
Err(e) => {
on_fatal_io_error(
&e,
&format!(
"TimelineUninitMark::drop: delete timeline directory: {:?}",
self.timeline_path
),
);
}
}
// fsync to order timelines_dir removal before unint mark removal
crashsafe::fsync(&self.common_parent).fatal_err(&format!(
"TimelineUninitMark::drop: fsync after timeline dir removal: {}",
self.common_parent,
));
// remove the uninit mark
fs::remove_file(&self.common_parent).fatal_err(&format!(
"TimelineUninitMark::drop: remove_file uninit mark: {}",
self.common_parent,
));
// fsync to persist the removal
crashsafe::fsync(&self.common_parent).fatal_err(&format!(
"TimelineUninitMark::drop: fsync common parent dir: {}",
self.common_parent,
));
}
}
}

View File

@@ -41,10 +41,6 @@ from urllib3.util.retry import Retry
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.pageserver.allowed_errors import (
DEFAULT_PAGESERVER_ALLOWED_ERRORS,
scan_pageserver_log_for_errors,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.types import IndexPartDump
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
@@ -434,6 +430,8 @@ class NeonEnvBuilder:
# Pageserver remote storage
self.pageserver_remote_storage = pageserver_remote_storage
# Extensions remote storage
self.ext_remote_storage: Optional[S3Storage] = None
# Safekeepers remote storage
self.sk_remote_storage: Optional[RemoteStorage] = None
@@ -532,6 +530,24 @@ class NeonEnvBuilder:
)
self.pageserver_remote_storage = ret
def enable_extensions_remote_storage(self, kind: RemoteStorageKind):
assert self.ext_remote_storage is None, "already configured extensions remote storage"
# there is an assumption that REAL_S3 for extensions is never
# cleaned up these are also special in that they have a hardcoded
# bucket and region, which is most likely the same as our normal
ext = self._configure_and_create_remote_storage(
kind,
RemoteStorageUser.EXTENSIONS,
bucket_name="neon-dev-extensions-eu-central-1",
bucket_region="eu-central-1",
)
assert isinstance(
ext, S3Storage
), "unsure why, but only MOCK_S3 and REAL_S3 are currently supported for extensions"
ext.cleanup = False
self.ext_remote_storage = ext
def enable_safekeeper_remote_storage(self, kind: RemoteStorageKind):
assert self.sk_remote_storage is None, "sk_remote_storage already configured"
@@ -588,7 +604,8 @@ class NeonEnvBuilder:
directory_to_clean.rmdir()
def cleanup_remote_storage(self):
for x in [self.pageserver_remote_storage, self.sk_remote_storage]:
# extensions are currently not cleaned up, disabled when creating
for x in [self.pageserver_remote_storage, self.ext_remote_storage, self.sk_remote_storage]:
if isinstance(x, S3Storage):
x.do_cleanup()
@@ -692,6 +709,7 @@ class NeonEnv:
self.pageservers: List[NeonPageserver] = []
self.broker = config.broker
self.pageserver_remote_storage = config.pageserver_remote_storage
self.ext_remote_storage = config.ext_remote_storage
self.safekeepers_remote_storage = config.sk_remote_storage
self.pg_version = config.pg_version
# Binary path for pageserver, safekeeper, etc
@@ -1447,7 +1465,12 @@ class NeonCli(AbstractNeonCli):
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
res = self.raw_cli(args)
storage = self.env.ext_remote_storage
s3_env_vars = None
if isinstance(storage, S3Storage):
s3_env_vars = storage.access_env_vars()
res = self.raw_cli(args, extra_env_vars=s3_env_vars)
res.check_returncode()
return res
@@ -1619,7 +1642,57 @@ class NeonPageserver(PgProtocol):
# env.pageserver.allowed_errors.append(".*could not open garage door.*")
#
# The entries in the list are regular experessions.
self.allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)
self.allowed_errors = [
# All tests print these, when starting up or shutting down
".*wal receiver task finished with an error: walreceiver connection handling failure.*",
".*Shutdown task error: walreceiver connection handling failure.*",
".*wal_connection_manager.*tcp connect error: Connection refused.*",
".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres connection error.*",
".*serving compute connection task.*exited with error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres query error.*",
".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
".*Connection aborted: unexpected message from server*",
".*kill_and_wait_impl.*: wait successful.*",
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
# safekeeper connection can fail with this, in the window between timeline creation
# and streaming start
".*Failed to process query for timeline .*: state uninitialized, no data to read.*",
# Tests related to authentication and authorization print these
".*Error processing HTTP request: Forbidden",
# intentional failpoints
".*failpoint ",
# FIXME: These need investigation
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
# Tenant::delete_timeline() can cause any of the four following errors.
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
".*compaction_loop.*Compaction failed.*, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",
# this is until #3501
".*Compaction failed.*, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed.*, retrying in .*: Other\\(queue is in state Stopped.*",
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
".*took more than expected to complete.*",
# these can happen during shutdown, but it should not be a reason to fail a test
".*completed, took longer than expected.*",
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
# and it is not a failure of our code when it happens.
".*DeleteObjects.*We encountered an internal error. Please try again.*",
]
def timeline_dir(self, tenant_id: TenantId, timeline_id: Optional[TimelineId] = None) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
@@ -1729,9 +1802,27 @@ class NeonPageserver(PgProtocol):
def assert_no_errors(self):
logfile = open(os.path.join(self.workdir, "pageserver.log"), "r")
errors = scan_pageserver_log_for_errors(logfile, self.allowed_errors)
error_or_warn = re.compile(r"\s(ERROR|WARN)")
errors = []
while True:
line = logfile.readline()
if not line:
break
for _lineno, error in errors:
if error_or_warn.search(line):
# Is this a torn log line? This happens when force-killing a process and restarting
# Example: "2023-10-25T09:38:31.752314Z WARN deletion executo2023-10-25T09:38:31.875947Z INFO version: git-env:0f9452f76e8ccdfc88291bccb3f53e3016f40192"
if re.match("\\d{4}-\\d{2}-\\d{2}T.+\\d{4}-\\d{2}-\\d{2}T.+INFO version.+", line):
continue
# It's an ERROR or WARN. Is it in the allow-list?
for a in self.allowed_errors:
if re.match(a, line):
break
else:
errors.append(line)
for error in errors:
log.info(f"not allowed error: {error.strip()}")
assert not errors
@@ -2555,17 +2646,6 @@ class Endpoint(PgProtocol):
with open(config_path, "w") as file:
json.dump(dict(data_dict, **kwargs), file, indent=4)
# Mock the extension part of spec passed from control plane for local testing
# endpooint.rs adds content of this file as a part of the spec.json
def create_remote_extension_spec(self, spec: dict[str, Any]):
"""Create a remote extension spec file for the endpoint."""
remote_extensions_spec_path = os.path.join(
self.endpoint_path(), "remote_extensions_spec.json"
)
with open(remote_extensions_spec_path, "w") as file:
json.dump(spec, file, indent=4)
def stop(self) -> "Endpoint":
"""
Stop the Postgres instance if it's running.

View File

@@ -1,116 +0,0 @@
#! /usr/bin/env python3
import argparse
import re
import sys
from typing import Iterable, List, Tuple
def scan_pageserver_log_for_errors(
input: Iterable[str], allowed_errors: List[str]
) -> List[Tuple[int, str]]:
error_or_warn = re.compile(r"\s(ERROR|WARN)")
errors = []
for lineno, line in enumerate(input, start=1):
if len(line) == 0:
continue
if error_or_warn.search(line):
# Is this a torn log line? This happens when force-killing a process and restarting
# Example: "2023-10-25T09:38:31.752314Z WARN deletion executo2023-10-25T09:38:31.875947Z INFO version: git-env:0f9452f76e8ccdfc88291bccb3f53e3016f40192"
if re.match("\\d{4}-\\d{2}-\\d{2}T.+\\d{4}-\\d{2}-\\d{2}T.+INFO version.+", line):
continue
# It's an ERROR or WARN. Is it in the allow-list?
for a in allowed_errors:
if re.match(a, line):
break
else:
errors.append((lineno, line))
return errors
DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
# All tests print these, when starting up or shutting down
".*wal receiver task finished with an error: walreceiver connection handling failure.*",
".*Shutdown task error: walreceiver connection handling failure.*",
".*wal_connection_manager.*tcp connect error: Connection refused.*",
".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres connection error.*",
".*serving compute connection task.*exited with error: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Postgres query error.*",
".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
".*Connection aborted: unexpected message from server*",
".*kill_and_wait_impl.*: wait successful.*",
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
# safekeeper connection can fail with this, in the window between timeline creation
# and streaming start
".*Failed to process query for timeline .*: state uninitialized, no data to read.*",
# Tests related to authentication and authorization print these
".*Error processing HTTP request: Forbidden",
# intentional failpoints
".*failpoint ",
# FIXME: These need investigation
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
".*Removing intermediate uninit mark file.*",
# Tenant::delete_timeline() can cause any of the four following errors.
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
".*compaction_loop.*Compaction failed.*, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
".*task iteration took longer than the configured period.*",
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed.*, retrying in .*: Other\\(queue is in state Stopped.*",
".*Compaction failed.*, retrying in .*: ShuttingDown",
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
".*took more than expected to complete.*",
# these can happen during shutdown, but it should not be a reason to fail a test
".*completed, took longer than expected.*",
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
# and it is not a failure of our code when it happens.
".*DeleteObjects.*We encountered an internal error. Please try again.*",
)
def _check_allowed_errors(input):
allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)
# add any test specifics here; cli parsing is not provided for the
# difficulty of copypasting regexes as arguments without any quoting
# errors.
errors = scan_pageserver_log_for_errors(input, allowed_errors)
for lineno, error in errors:
print(f"-:{lineno}: {error.strip()}", file=sys.stderr)
print(f"\n{len(errors)} not allowed errors", file=sys.stderr)
return errors
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="check input against pageserver global allowed_errors"
)
parser.add_argument(
"-i",
"--input",
type=argparse.FileType("r"),
default=sys.stdin,
help="Pageserver logs file. Reads from stdin if no file is provided.",
)
args = parser.parse_args()
errors = _check_allowed_errors(args.input)
sys.exit(len(errors) > 0)

View File

@@ -411,6 +411,7 @@ def check_neon_works(
config.initial_tenant = snapshot_config["default_tenant_id"]
config.pg_distrib_dir = pg_distrib_dir
config.remote_storage = None
config.ext_remote_storage = None
config.sk_remote_storage = None
# Use the "target" binaries to launch the storage nodes

View File

@@ -1,165 +1,316 @@
import os
import shutil
import threading
from contextlib import closing
from pathlib import Path
from typing import Any, Dict, List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pg_version import PgVersion
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.pg_version import PgVersion, skip_on_postgres
from fixtures.remote_storage import (
RemoteStorageKind,
S3Storage,
available_s3_storages,
)
# Check that the extension is not already in the share_dir_path_ext
# if it is, skip the test
#
# After the test is done, cleanup the control file and the extension directory
@pytest.fixture(scope="function")
def ext_file_cleanup(pg_bin):
out = pg_bin.run_capture("pg_config --sharedir".split())
share_dir_path = Path(f"{out}.stdout").read_text().strip()
log.info(f"share_dir_path: {share_dir_path}")
share_dir_path_ext = os.path.join(share_dir_path, "extension")
# Cleaning up downloaded files is important for local tests
# or else one test could reuse the files from another test or another test run
def cleanup(pg_version):
PGDIR = Path(f"pg_install/v{pg_version}")
log.info(f"share_dir_path_ext: {share_dir_path_ext}")
LIB_DIR = PGDIR / Path("lib/postgresql")
cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"]
cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs]
# if file is already in the share_dir_path_ext, skip the test
if os.path.isfile(os.path.join(share_dir_path_ext, "anon.control")):
log.info("anon.control is already in the share_dir_path_ext, skipping the test")
yield False
return
else:
yield True
SHARE_DIR = PGDIR / Path("share/postgresql/extension")
cleanup_ext_globs = [
"anon*",
"address_standardizer*",
"postgis*",
"pageinspect*",
"pg_buffercache*",
"pgrouting*",
]
cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs]
# cleanup the control file
if os.path.isfile(os.path.join(share_dir_path_ext, "anon.control")):
os.unlink(os.path.join(share_dir_path_ext, "anon.control"))
log.info("anon.control was removed from the share_dir_path_ext")
all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths
all_cleanup_files = []
for file_glob in all_glob_paths:
for file in file_glob:
all_cleanup_files.append(file)
# remove the extension directory recursively
if os.path.isdir(os.path.join(share_dir_path_ext, "anon")):
directories_to_clean: List[Path] = []
for f in Path(os.path.join(share_dir_path_ext, "anon")).iterdir():
if f.is_file():
log.info(f"Removing file {f}")
f.unlink()
elif f.is_dir():
directories_to_clean.append(f)
for file in all_cleanup_files:
try:
os.remove(file)
log.info(f"removed file {file}")
except Exception as err:
log.info(
f"skipping remove of file {file} because it doesn't exist.\
this may be expected or unexpected depending on the test {err}"
)
for directory_to_clean in reversed(directories_to_clean):
if not os.listdir(directory_to_clean):
log.info(f"Removing empty directory {directory_to_clean}")
directory_to_clean.rmdir()
os.rmdir(os.path.join(share_dir_path_ext, "anon"))
log.info("anon directory was removed from the share_dir_path_ext")
cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")]
for folder in cleanup_folders:
try:
shutil.rmtree(folder)
log.info(f"removed folder {folder}")
except Exception as err:
log.info(
f"skipping remove of folder {folder} because it doesn't exist.\
this may be expected or unexpected depending on the test {err}"
)
def upload_files(env):
log.info("Uploading test files to mock bucket")
os.chdir("test_runner/regress/data/extension_test")
for path in os.walk("."):
prefix, _, files = path
for file in files:
# the [2:] is to remove the leading "./"
full_path = os.path.join(prefix, file)[2:]
with open(full_path, "rb") as f:
log.info(f"UPLOAD {full_path} to ext/{full_path}")
assert isinstance(env.pageserver_remote_storage, S3Storage)
env.pageserver_remote_storage.client.upload_fileobj(
f,
env.ext_remote_storage.bucket_name,
f"ext/{full_path}",
)
os.chdir("../../../..")
# Test downloading remote extension.
@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building")
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
def test_remote_extensions(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
pg_version,
ext_file_cleanup,
remote_storage_kind: RemoteStorageKind,
pg_version: PgVersion,
):
if ext_file_cleanup is False:
log.info("test_remote_extensions skipped")
return
if pg_version == PgVersion.V16:
pytest.skip("TODO: PG16 extension building")
# setup mock http server
# that expects request for anon.tar.zst
# and returns the requested file
(host, port) = httpserver_listen_address
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
archive_path = f"latest/v{pg_version}/extensions/anon.tar.zst"
def endpoint_handler_build_tag(request: Request) -> Response:
log.info(f"request: {request}")
file_name = "anon.tar.zst"
file_path = f"test_runner/regress/data/extension_test/5670669815/v{pg_version}/extensions/anon.tar.zst"
file_size = os.path.getsize(file_path)
fh = open(file_path, "rb")
return Response(
fh,
mimetype="application/octet-stream",
headers=[
("Content-Length", str(file_size)),
("Content-Disposition", 'attachment; filename="%s"' % file_name),
],
direct_passthrough=True,
)
httpserver.expect_request(
f"/pg-ext-s3-gateway/{archive_path}", method="GET"
).respond_with_handler(endpoint_handler_build_tag)
# Start a compute node with remote_extension spec
# and check that it can download the extensions and use them to CREATE EXTENSION.
neon_env_builder.enable_extensions_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
endpoint = env.endpoints.create(
assert env.ext_remote_storage is not None # satisfy mypy
# For MOCK_S3 we upload test files.
# For REAL_S3 we use the files already in the bucket
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
upload_files(env)
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD
endpoint = env.endpoints.create_start(
"test_remote_extensions",
tenant_id=tenant_id,
config_lines=["log_min_messages=debug3"],
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
# mock remote_extensions spec
spec: Dict[str, Any] = {
"library_index": {
"anon": "anon",
},
"extension_data": {
"anon": {
"archive_path": "",
"control_data": {
"anon.control": "# PostgreSQL Anonymizer (anon) extension\ncomment = 'Data anonymization tools'\ndefault_version = '1.1.0'\ndirectory='extension/anon'\nrelocatable = false\nrequires = 'pgcrypto'\nsuperuser = false\nmodule_pathname = '$libdir/anon'\ntrusted = true\n"
},
},
},
}
spec["extension_data"]["anon"]["archive_path"] = archive_path
endpoint.create_remote_extension_spec(spec)
endpoint.start(
remote_ext_config=extensions_endpoint,
)
# this is expected to fail if there's no pgcrypto extension, that's ok
# we just want to check that the extension was downloaded
try:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Check that appropriate files were downloaded
cur.execute("CREATE EXTENSION anon")
res = [x[0] for x in cur.fetchall()]
log.info(res)
except Exception as err:
assert "pgcrypto" in str(err), f"unexpected error creating anon extension {err}"
# Check that appropriate control files were downloaded
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "anon" in all_extensions
httpserver.check()
# postgis is on real s3 but not mock s3.
# it's kind of a big file, would rather not upload to github
if remote_storage_kind == RemoteStorageKind.REAL_S3:
assert "postgis" in all_extensions
# this may fail locally if dependency is missing
# we don't really care about the error,
# we just want to make sure it downloaded
try:
cur.execute("CREATE EXTENSION postgis")
except Exception as err:
log.info(f"(expected) error creating postgis extension: {err}")
# we do not check the error, so this is basically a NO-OP
# however checking the log you can make sure that it worked
# and also get valuable information about how long loading the extension took
# this is expected to fail on my computer because I don't have the pgcrypto extension
try:
cur.execute("CREATE EXTENSION anon")
except Exception as err:
log.info("error creating anon extension")
assert "pgcrypto" in str(err), "unexpected error creating anon extension"
finally:
cleanup(pg_version)
# TODO
# 1. Test downloading remote library.
#
# 2. Test a complex extension, which has multiple extensions in one archive
# Test downloading remote library.
@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building")
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
def test_remote_library(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_version: PgVersion,
):
neon_env_builder.enable_extensions_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
# For MOCK_S3 we upload test files.
# For REAL_S3 we use the files already in the bucket
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
upload_files(env)
# and use them to run LOAD library
endpoint = env.endpoints.create_start(
"test_remote_library",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
try:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# try to load library
try:
cur.execute("LOAD 'anon'")
except Exception as err:
log.info(f"error loading anon library: {err}")
raise AssertionError("unexpected error loading anon library") from err
# test library which name is different from extension name
# this may fail locally if dependency is missing
# however, it does successfully download the postgis archive
if remote_storage_kind == RemoteStorageKind.REAL_S3:
try:
cur.execute("LOAD 'postgis_topology-3'")
except Exception as err:
log.info("error loading postgis_topology-3")
assert "No such file or directory" in str(
err
), "unexpected error loading postgis_topology-3"
finally:
cleanup(pg_version)
# Here we test a complex extension
# which has multiple extensions in one archive
# using postgis as an example
#
# 3.Test that extension is downloaded after endpoint restart,
# @pytest.mark.skipif(
# RemoteStorageKind.REAL_S3 not in available_s3_storages(),
# reason="skipping test because real s3 not enabled",
# )
@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building")
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
def test_multiple_extensions_one_archive(
neon_env_builder: NeonEnvBuilder,
pg_version: PgVersion,
):
neon_env_builder.enable_extensions_remote_storage(RemoteStorageKind.REAL_S3)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
endpoint = env.endpoints.create_start(
"test_multiple_extensions_one_archive",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION address_standardizer;")
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
# execute query to ensure that it works
cur.execute(
"SELECT house_num, name, suftype, city, country, state, unit \
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
'One Rust Place, Boston, MA 02109');"
)
res = cur.fetchall()
log.info(res)
assert len(res) > 0
cleanup(pg_version)
# Test that extension is downloaded after endpoint restart,
# when the library is used in the query.
#
# Run the test with mutliple simultaneous connections to an endpoint.
# to ensure that the extension is downloaded only once.
#
# 4. Test that private extensions are only downloaded when they are present in the spec.
#
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
def test_extension_download_after_restart(
neon_env_builder: NeonEnvBuilder,
pg_version: PgVersion,
):
# TODO: PG15 + PG16 extension building
if "v14" not in pg_version: # test set only has extension built for v14
return None
neon_env_builder.enable_extensions_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
# For MOCK_S3 we upload test files.
upload_files(env)
endpoint = env.endpoints.create_start(
"test_extension_download_after_restart",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
config_lines=["log_min_messages=debug3"],
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE extension pg_buffercache;")
cur.execute("SELECT * from pg_buffercache;")
res = cur.fetchall()
assert len(res) > 0
log.info(res)
# shutdown compute node
endpoint.stop()
# remove extension files locally
cleanup(pg_version)
# spin up compute node again (there are no extension files available, because compute is stateless)
endpoint = env.endpoints.create_start(
"test_extension_download_after_restart",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
config_lines=["log_min_messages=debug3"],
)
# connect to compute node and run the query
# that will trigger the download of the extension
def run_query(endpoint, thread_id: int):
log.info("thread_id {%d} starting", thread_id)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT * from pg_buffercache;")
res = cur.fetchall()
assert len(res) > 0
log.info("thread_id {%d}, res = %s", thread_id, res)
threads = [threading.Thread(target=run_query, args=(endpoint, i)) for i in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
cleanup(pg_version)

View File

@@ -26,3 +26,13 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.1",)
# create test database
cur.execute("CREATE DATABASE foodb")
# connect to test database
# test that neon extension is installed and has the correct version
with closing(endpoint_main.connect(dbname="foodb")) as conn:
with conn.cursor() as cur:
cur.execute("SELECT extversion from pg_extension where extname='neon'")
assert cur.fetchone() == ("1.1",)

View File

@@ -336,15 +336,10 @@ def test_live_reconfig_get_evictions_low_residence_duration_metric_threshold(
):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(
initial_tenant_conf={
# disable compaction so that it will not download the layer for repartitioning
"compaction_period": "0s"
}
)
env = neon_env_builder.init_start()
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
(tenant_id, timeline_id) = env.initial_tenant, env.initial_timeline
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
ps_http = env.pageserver.http_client()
def get_metric():

View File

@@ -1,7 +1,6 @@
import enum
import os
import shutil
from threading import Thread
import pytest
from fixtures.log_helper import log
@@ -28,7 +27,7 @@ from fixtures.remote_storage import (
available_s3_storages,
)
from fixtures.types import TenantId
from fixtures.utils import run_pg_bench_small, wait_until
from fixtures.utils import run_pg_bench_small
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@@ -400,78 +399,4 @@ def test_tenant_delete_is_resumed_on_attach(
)
def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder):
"""Reproduction of 2023-11-23 stuck tenants investigation"""
# do not use default tenant/timeline creation because it would output the failpoint log message too early
env = neon_env_builder.init_configs()
env.start()
pageserver_http = env.pageserver.http_client()
# happens with the cancellation bailing flushing loop earlier, leaving disk_consistent_lsn at zero
env.pageserver.allowed_errors.append(
".*Timeline got dropped without initializing, cleaning its files"
)
# the response hit_pausable_failpoint_and_later_fail
env.pageserver.allowed_errors.append(
f".*Error processing HTTP request: InternalServerError\\(new timeline {env.initial_tenant}/{env.initial_timeline} has invalid disk_consistent_lsn"
)
pageserver_http.tenant_create(env.initial_tenant)
failpoint = "flush-layer-cancel-after-writing-layer-out-pausable"
pageserver_http.configure_failpoints((failpoint, "pause"))
def hit_pausable_failpoint_and_later_fail():
with pytest.raises(
PageserverApiException, match="new timeline \\S+ has invalid disk_consistent_lsn"
):
pageserver_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline
)
def start_deletion():
pageserver_http.tenant_delete(env.initial_tenant)
def has_hit_failpoint():
assert env.pageserver.log_contains(f"at failpoint {failpoint}") is not None
def deletion_has_started_waiting_for_timelines():
assert env.pageserver.log_contains("Waiting for timelines...") is not None
def tenant_is_deleted():
try:
pageserver_http.tenant_status(env.initial_tenant)
except PageserverApiException as e:
assert e.status_code == 404
else:
raise RuntimeError("tenant was still accessible")
creation = Thread(target=hit_pausable_failpoint_and_later_fail)
creation.start()
deletion = None
try:
wait_until(10, 1, has_hit_failpoint)
# it should start ok, sync up with the stuck creation, then fail because disk_consistent_lsn was not updated
# then deletion should fail and set the tenant broken
deletion = Thread(target=start_deletion)
deletion.start()
wait_until(10, 1, deletion_has_started_waiting_for_timelines)
pageserver_http.configure_failpoints((failpoint, "off"))
creation.join()
deletion.join()
wait_until(10, 1, tenant_is_deleted)
finally:
creation.join()
if deletion is not None:
deletion.join()
# TODO test concurrent deletions with "hang" failpoint

View File

@@ -134,11 +134,10 @@ def wait_for_pageserver_catchup(endpoint_main: Endpoint, polling_interval=1, tim
res = endpoint_main.safe_psql(
"""
SELECT
pg_size_pretty(neon.pg_cluster_size()),
pg_size_pretty(pg_cluster_size()),
pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
FROM neon.backpressure_lsns();
""",
dbname="postgres",
FROM backpressure_lsns();
"""
)[0]
log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
received_lsn_lag = res[1]
@@ -215,7 +214,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
wait_for_pageserver_catchup(endpoint_main)
cur.execute("SELECT * from pg_size_pretty(neon.pg_cluster_size())")
cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
pg_cluster_size = cur.fetchone()
log.info(f"pg_cluster_size = {pg_cluster_size}")