mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 11:00:38 +00:00
Compare commits
2 Commits
jcsp/remov
...
problame/u
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
01898af391 | ||
|
|
85f0867db8 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1126,7 +1126,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-compression",
|
||||
"bytes",
|
||||
"cfg-if",
|
||||
"chrono",
|
||||
"clap",
|
||||
|
||||
@@ -38,4 +38,3 @@ toml_edit.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||
vm_monitor = { version = "0.1", path = "../libs/vm_monitor/" }
|
||||
zstd = "0.12.4"
|
||||
bytes = "1.0"
|
||||
|
||||
@@ -31,7 +31,7 @@
|
||||
//! -C 'postgresql://cloud_admin@localhost/postgres' \
|
||||
//! -S /var/db/postgres/specs/current.json \
|
||||
//! -b /usr/local/bin/postgres \
|
||||
//! -r http://pg-ext-s3-gateway
|
||||
//! -r {"bucket": "neon-dev-extensions-eu-central-1", "region": "eu-central-1"}
|
||||
//! ```
|
||||
//!
|
||||
use std::collections::HashMap;
|
||||
@@ -51,7 +51,7 @@ use compute_api::responses::ComputeStatus;
|
||||
|
||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::get_pg_version;
|
||||
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
@@ -60,7 +60,7 @@ use compute_tools::spec::*;
|
||||
|
||||
// this is an arbitrary build tag. Fine as a default / for testing purposes
|
||||
// in-case of not-set environment var
|
||||
const BUILD_TAG_DEFAULT: &str = "latest";
|
||||
const BUILD_TAG_DEFAULT: &str = "5670669815";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
@@ -74,18 +74,10 @@ fn main() -> Result<()> {
|
||||
let pgbin_default = String::from("postgres");
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap_or(&pgbin_default);
|
||||
|
||||
let ext_remote_storage = matches
|
||||
.get_one::<String>("remote-ext-config")
|
||||
// Compatibility hack: if the control plane specified any remote-ext-config
|
||||
// use the default value for extension storage proxy gateway.
|
||||
// Remove this once the control plane is updated to pass the gateway URL
|
||||
.map(|conf| {
|
||||
if conf.starts_with("http") {
|
||||
conf.trim_end_matches('/')
|
||||
} else {
|
||||
"http://pg-ext-s3-gateway"
|
||||
}
|
||||
});
|
||||
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
|
||||
let ext_remote_storage = remote_ext_config.map(|x| {
|
||||
init_remote_storage(x).expect("cannot initialize remote extension storage from config")
|
||||
});
|
||||
|
||||
let http_port = *matches
|
||||
.get_one::<u16>("http-port")
|
||||
@@ -206,7 +198,7 @@ fn main() -> Result<()> {
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
|
||||
ext_remote_storage,
|
||||
ext_download_progress: RwLock::new(HashMap::new()),
|
||||
build_tag,
|
||||
};
|
||||
|
||||
@@ -25,7 +25,7 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
|
||||
use remote_storage::{DownloadError, RemotePath};
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
||||
|
||||
use crate::checker::create_availability_check_data;
|
||||
use crate::pg_helpers::*;
|
||||
@@ -59,8 +59,8 @@ pub struct ComputeNode {
|
||||
pub state: Mutex<ComputeState>,
|
||||
/// `Condvar` to allow notifying waiters about state changes.
|
||||
pub state_changed: Condvar,
|
||||
/// the address of extension storage proxy gateway
|
||||
pub ext_remote_storage: Option<String>,
|
||||
/// the S3 bucket that we search for extensions in
|
||||
pub ext_remote_storage: Option<GenericRemoteStorage>,
|
||||
// key: ext_archive_name, value: started download time, download_completed?
|
||||
pub ext_download_progress: RwLock<HashMap<String, (DateTime<Utc>, bool)>>,
|
||||
pub build_tag: String,
|
||||
@@ -693,12 +693,13 @@ impl ComputeNode {
|
||||
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
|
||||
create_neon_superuser(spec, &mut client)?;
|
||||
cleanup_instance(&mut client)?;
|
||||
handle_extension_neon(self.connstr.as_str())?;
|
||||
handle_roles(spec, &mut client)?;
|
||||
handle_databases(spec, &mut client)?;
|
||||
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(spec, &mut client, self.connstr.as_str())?;
|
||||
handle_extensions(spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
handle_alter_extension_neon(spec, &mut client, self.connstr.as_str())?;
|
||||
create_availability_check_data(&mut client)?;
|
||||
|
||||
// 'Close' connection
|
||||
@@ -738,12 +739,13 @@ impl ComputeNode {
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
client.simple_query("SET neon.forward_ddl = false")?;
|
||||
cleanup_instance(&mut client)?;
|
||||
handle_extension_neon(self.connstr.as_str())?;
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(&spec, &mut client, self.connstr.as_str())?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
handle_extension_neon(&mut client)?;
|
||||
handle_alter_extension_neon(&spec, &mut client, self.connstr.as_str())?;
|
||||
}
|
||||
|
||||
// 'Close' connection
|
||||
@@ -957,12 +959,12 @@ LIMIT 100",
|
||||
real_ext_name: String,
|
||||
ext_path: RemotePath,
|
||||
) -> Result<u64, DownloadError> {
|
||||
let ext_remote_storage =
|
||||
self.ext_remote_storage
|
||||
.as_ref()
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"Remote extensions storage is not configured",
|
||||
)))?;
|
||||
let remote_storage = self
|
||||
.ext_remote_storage
|
||||
.as_ref()
|
||||
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
|
||||
"Remote extensions storage is not configured",
|
||||
)))?;
|
||||
|
||||
let ext_archive_name = ext_path.object_name().expect("bad path");
|
||||
|
||||
@@ -1018,7 +1020,7 @@ LIMIT 100",
|
||||
let download_size = extension_server::download_extension(
|
||||
&real_ext_name,
|
||||
&ext_path,
|
||||
ext_remote_storage,
|
||||
remote_storage,
|
||||
&self.pgbin,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -71,16 +71,18 @@ More specifically, here is an example ext_index.json
|
||||
}
|
||||
}
|
||||
*/
|
||||
use anyhow::Context;
|
||||
use anyhow::{self, Result};
|
||||
use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use compute_api::spec::RemoteExtSpec;
|
||||
use regex::Regex;
|
||||
use remote_storage::*;
|
||||
use reqwest::StatusCode;
|
||||
use serde_json;
|
||||
use std::io::Read;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
use std::str;
|
||||
use tar::Archive;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::info;
|
||||
use tracing::log::warn;
|
||||
use zstd::stream::read::Decoder;
|
||||
@@ -136,31 +138,23 @@ fn parse_pg_version(human_version: &str) -> &str {
|
||||
pub async fn download_extension(
|
||||
ext_name: &str,
|
||||
ext_path: &RemotePath,
|
||||
ext_remote_storage: &str,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
) -> Result<u64> {
|
||||
info!("Download extension {:?} from {:?}", ext_name, ext_path);
|
||||
|
||||
// TODO add retry logic
|
||||
let download_buffer =
|
||||
match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
|
||||
Ok(buffer) => buffer,
|
||||
Err(error_message) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"error downloading extension {:?}: {:?}",
|
||||
ext_name,
|
||||
error_message
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let mut download = remote_storage.download(ext_path).await?;
|
||||
let mut download_buffer = Vec::new();
|
||||
download
|
||||
.download_stream
|
||||
.read_to_end(&mut download_buffer)
|
||||
.await?;
|
||||
let download_size = download_buffer.len() as u64;
|
||||
info!("Download size {:?}", download_size);
|
||||
// it's unclear whether it is more performant to decompress into memory or not
|
||||
// TODO: decompressing into memory can be avoided
|
||||
let decoder = Decoder::new(download_buffer.as_ref())?;
|
||||
let mut archive = Archive::new(decoder);
|
||||
|
||||
let mut decoder = Decoder::new(download_buffer.as_slice())?;
|
||||
let mut decompress_buffer = Vec::new();
|
||||
decoder.read_to_end(&mut decompress_buffer)?;
|
||||
let mut archive = Archive::new(decompress_buffer.as_slice());
|
||||
let unzip_dest = pgbin
|
||||
.strip_suffix("/bin/postgres")
|
||||
.expect("bad pgbin")
|
||||
@@ -228,32 +222,29 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
// Do request to extension storage proxy, i.e.
|
||||
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
|
||||
// using HHTP GET
|
||||
// and return the response body as bytes
|
||||
//
|
||||
async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
|
||||
let uri = format!("{}/{}", ext_remote_storage, ext_path);
|
||||
|
||||
info!("Download extension {:?} from uri {:?}", ext_path, uri);
|
||||
|
||||
let resp = reqwest::get(uri).await?;
|
||||
|
||||
match resp.status() {
|
||||
StatusCode::OK => match resp.bytes().await {
|
||||
Ok(resp) => {
|
||||
info!("Download extension {:?} completed successfully", ext_path);
|
||||
Ok(resp)
|
||||
}
|
||||
Err(e) => bail!("could not deserialize remote extension response: {}", e),
|
||||
},
|
||||
StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
|
||||
_ => bail!(
|
||||
"unexpected remote extension response status code: {}",
|
||||
resp.status()
|
||||
),
|
||||
// This function initializes the necessary structs to use remote storage
|
||||
pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct RemoteExtJson {
|
||||
bucket: String,
|
||||
region: String,
|
||||
endpoint: Option<String>,
|
||||
prefix: Option<String>,
|
||||
}
|
||||
let remote_ext_json = serde_json::from_str::<RemoteExtJson>(remote_ext_config)?;
|
||||
|
||||
let config = S3Config {
|
||||
bucket_name: remote_ext_json.bucket,
|
||||
bucket_region: remote_ext_json.region,
|
||||
prefix_in_bucket: remote_ext_json.prefix,
|
||||
endpoint: remote_ext_json.endpoint,
|
||||
concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
|
||||
max_keys_per_list_response: None,
|
||||
};
|
||||
let config = RemoteStorageConfig {
|
||||
storage: RemoteStorageKind::AwsS3(config),
|
||||
};
|
||||
GenericRemoteStorage::from_config(&config)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -123,7 +123,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from remote extension storage on demand
|
||||
// download extension files from S3 on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
info!("req.uri {:?}", req.uri());
|
||||
|
||||
@@ -675,29 +675,78 @@ pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
|
||||
info!("handle extension neon");
|
||||
/// connect to template1 and postgres to create neon extension
|
||||
/// that will be available in all databases
|
||||
pub fn handle_extension_neon(connstr: &str) -> Result<()> {
|
||||
for dbname in ["template1", "postgres"].iter() {
|
||||
let mut conf = Config::from_str(connstr)?;
|
||||
conf.dbname(dbname);
|
||||
let mut template1_client = conf.connect(NoTls)?;
|
||||
|
||||
let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
|
||||
client.simple_query(query)?;
|
||||
|
||||
query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
|
||||
info!("create neon extension with query: {}", query);
|
||||
client.simple_query(query)?;
|
||||
|
||||
query = "ALTER EXTENSION neon SET SCHEMA neon";
|
||||
info!("alter neon extension schema with query: {}", query);
|
||||
client.simple_query(query)?;
|
||||
|
||||
// this will be a no-op if extension is already up to date,
|
||||
// which may happen in two cases:
|
||||
// - extension was just installed
|
||||
// - extension was already installed and is up to date
|
||||
let query = "ALTER EXTENSION neon UPDATE";
|
||||
info!("update neon extension schema with query: {}", query);
|
||||
client.simple_query(query)?;
|
||||
let create_extension_neon_query = "CREATE EXTENSION IF NOT EXISTS neon";
|
||||
info!(
|
||||
"creating neon extension with query: {} in db {}",
|
||||
create_extension_neon_query, dbname
|
||||
);
|
||||
template1_client.simple_query(create_extension_neon_query)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run ALTER EXTENSION neon UPDATE for each valid database
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_alter_extension_neon(
|
||||
spec: &ComputeSpec,
|
||||
client: &mut Client,
|
||||
connstr: &str,
|
||||
) -> Result<()> {
|
||||
info!("modifying database permissions");
|
||||
let existing_dbs = get_existing_dbs(client)?;
|
||||
|
||||
// We'd better do this at db creation time, but we don't always know when it happens.
|
||||
for db in &spec.cluster.databases {
|
||||
match existing_dbs.get(&db.name) {
|
||||
Some(pg_db) => {
|
||||
if pg_db.restrict_conn || pg_db.invalid {
|
||||
info!(
|
||||
"skipping grants for db {} (invalid: {}, connections not allowed: {})",
|
||||
db.name, pg_db.invalid, pg_db.restrict_conn
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
bail!(
|
||||
"database {} doesn't exist in Postgres after handle_databases()",
|
||||
db.name
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let mut conf = Config::from_str(connstr)?;
|
||||
conf.dbname(&db.name);
|
||||
|
||||
let mut db_client = conf.connect(NoTls)?;
|
||||
|
||||
// this will be a no-op if extension is already up to date,
|
||||
// which may happen in two cases:
|
||||
// - extension was just installed
|
||||
// - extension was already installed and is up to date
|
||||
let create_extension_neon_query = "CREATE EXTENSION IF NOT EXISTS neon";
|
||||
info!(
|
||||
"create extension neon query for db {} : {}",
|
||||
&db.name, &create_extension_neon_query
|
||||
);
|
||||
db_client.simple_query(create_extension_neon_query)?;
|
||||
|
||||
let alter_extension_neon_query = "ALTER EXTENSION neon UPDATE";
|
||||
info!(
|
||||
"alter extension neon query for db {} : {}",
|
||||
&db.name, &alter_extension_neon_query
|
||||
);
|
||||
db_client.simple_query(alter_extension_neon_query)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1252,7 +1252,7 @@ fn cli() -> Command {
|
||||
let remote_ext_config_args = Arg::new("remote-ext-config")
|
||||
.long("remote-ext-config")
|
||||
.num_args(1)
|
||||
.help("Configure the remote extensions storage proxy gateway to request for extensions.")
|
||||
.help("Configure the S3 bucket that we search for extensions in.")
|
||||
.required(false);
|
||||
|
||||
let lsn_arg = Arg::new("lsn")
|
||||
|
||||
@@ -45,7 +45,6 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use compute_api::spec::RemoteExtSpec;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
@@ -477,18 +476,6 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
// check for file remote_extensions_spec.json
|
||||
// if it is present, read it and pass to compute_ctl
|
||||
let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json");
|
||||
let remote_extensions_spec = std::fs::File::open(remote_extensions_spec_path);
|
||||
let remote_extensions: Option<RemoteExtSpec>;
|
||||
|
||||
if let Ok(spec_file) = remote_extensions_spec {
|
||||
remote_extensions = serde_json::from_reader(spec_file).ok();
|
||||
} else {
|
||||
remote_extensions = None;
|
||||
};
|
||||
|
||||
// Create spec file
|
||||
let spec = ComputeSpec {
|
||||
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
|
||||
@@ -510,7 +497,7 @@ impl Endpoint {
|
||||
pageserver_connstring: Some(pageserver_connstring),
|
||||
safekeeper_connstrings,
|
||||
storage_auth_token: auth_token.clone(),
|
||||
remote_extensions,
|
||||
remote_extensions: None,
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
@@ -280,6 +280,7 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
|
||||
use crate::tenant::delete::DeleteTenantError::*;
|
||||
match value {
|
||||
Get(g) => ApiError::from(g),
|
||||
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
|
||||
Timeline(t) => ApiError::from(t),
|
||||
NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
|
||||
SlotError(e) => e.into(),
|
||||
@@ -1688,24 +1689,8 @@ where
|
||||
let token_cloned = token.clone();
|
||||
let result = handler(r, token).await;
|
||||
if token_cloned.is_cancelled() {
|
||||
// dropguard has executed: we will never turn this result into response.
|
||||
//
|
||||
// at least temporarily do {:?} logging; these failures are rare enough but
|
||||
// could hide difficult errors.
|
||||
match &result {
|
||||
Ok(response) => {
|
||||
let status = response.status();
|
||||
info!(%status, "Cancelled request finished successfully")
|
||||
}
|
||||
Err(e) => error!("Cancelled request finished with an error: {e:?}"),
|
||||
}
|
||||
info!("Cancelled request finished");
|
||||
}
|
||||
// only logging for cancelled panicked request handlers is the tracing_panic_hook,
|
||||
// which should suffice.
|
||||
//
|
||||
// there is still a chance to lose the result due to race between
|
||||
// returning from here and the actual connection closing happening
|
||||
// before outer task gets to execute. leaving that up for #5815.
|
||||
result
|
||||
}
|
||||
.in_current_span(),
|
||||
|
||||
@@ -86,7 +86,6 @@ use crate::tenant::storage_layer::ImageLayer;
|
||||
use crate::InitializationOrder;
|
||||
|
||||
use crate::tenant::timeline::delete::DeleteTimelineFlow;
|
||||
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
@@ -257,6 +256,8 @@ pub struct Tenant {
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
|
||||
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
|
||||
|
||||
// Cancellation token fires when we have entered shutdown(). This is a parent of
|
||||
// Timelines' cancellation token.
|
||||
pub(crate) cancel: CancellationToken,
|
||||
@@ -632,9 +633,9 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
info!("pending_deletion {}", pending_deletion);
|
||||
info!("pending_deletion {}", pending_deletion.is_some());
|
||||
|
||||
if pending_deletion {
|
||||
if let Some(deletion) = pending_deletion {
|
||||
// as we are no longer loading, signal completion by dropping
|
||||
// the completion while we resume deletion
|
||||
drop(_completion);
|
||||
@@ -651,6 +652,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
match DeleteTenantFlow::resume_from_attach(
|
||||
deletion,
|
||||
&tenant_clone,
|
||||
preload,
|
||||
tenants,
|
||||
@@ -730,7 +732,7 @@ impl Tenant {
|
||||
///
|
||||
async fn attach(
|
||||
self: &Arc<Tenant>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
mut init_order: Option<InitializationOrder>,
|
||||
preload: Option<TenantPreload>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -747,6 +749,11 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
// Signal that we have completed remote phase
|
||||
init_order
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_tenant_load_remote.take());
|
||||
|
||||
let mut timelines_to_resume_deletions = vec![];
|
||||
|
||||
let mut remote_index_and_client = HashMap::new();
|
||||
@@ -1850,7 +1857,6 @@ impl Tenant {
|
||||
});
|
||||
})
|
||||
};
|
||||
// test_long_timeline_create_then_tenant_delete is leaning on this message
|
||||
tracing::info!("Waiting for timelines...");
|
||||
while let Some(res) = js.join_next().await {
|
||||
match res {
|
||||
@@ -2369,6 +2375,7 @@ impl Tenant {
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::new(format!("Tenant<{tenant_id}>")),
|
||||
}
|
||||
@@ -3083,7 +3090,7 @@ impl Tenant {
|
||||
.await
|
||||
{
|
||||
error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
|
||||
cleanup_timeline_directory(uninit_mark);
|
||||
drop(uninit_mark); // does the cleanup
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
@@ -3135,20 +3142,8 @@ impl Tenant {
|
||||
"Timeline {timeline_path} already exists, cannot create its uninit mark file",
|
||||
);
|
||||
|
||||
let uninit_mark_path = self
|
||||
.conf
|
||||
.timeline_uninit_mark_file_path(tenant_id, timeline_id);
|
||||
fs::File::create(&uninit_mark_path)
|
||||
.context("Failed to create uninit mark file")
|
||||
.and_then(|_| {
|
||||
crashsafe::fsync_file_and_parent(&uninit_mark_path)
|
||||
.context("Failed to fsync uninit mark file")
|
||||
})
|
||||
.with_context(|| {
|
||||
format!("Failed to crate uninit mark for timeline {tenant_id}/{timeline_id}")
|
||||
})?;
|
||||
|
||||
let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
|
||||
let uninit_mark = TimelineUninitMark::new(self.conf, tenant_id, timeline_id)
|
||||
.context("create uninit mark")?;
|
||||
|
||||
Ok(uninit_mark)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{GenericRemoteStorage, RemotePath};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, instrument, warn, Instrument, Span};
|
||||
|
||||
@@ -38,6 +39,9 @@ pub(crate) enum DeleteTenantError {
|
||||
#[error("Invalid state {0}. Expected Active or Broken")]
|
||||
InvalidState(TenantState),
|
||||
|
||||
#[error("Tenant deletion is already in progress")]
|
||||
AlreadyInProgress,
|
||||
|
||||
#[error("Tenant map slot error {0}")]
|
||||
SlotError(#[from] TenantSlotError),
|
||||
|
||||
@@ -51,6 +55,8 @@ pub(crate) enum DeleteTenantError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;
|
||||
|
||||
fn remote_tenant_delete_mark_path(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
@@ -281,14 +287,14 @@ impl DeleteTenantFlow {
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
Self::prepare(&tenant).await?;
|
||||
let mut guard = Self::prepare(&tenant).await?;
|
||||
|
||||
if let Err(e) = Self::run_inner(conf, remote_storage.as_ref(), &tenant).await {
|
||||
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
|
||||
tenant.set_broken(format!("{e:#}")).await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Self::schedule_background(conf, remote_storage, tenants, tenant);
|
||||
Self::schedule_background(guard, conf, remote_storage, tenants, tenant);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -298,10 +304,13 @@ impl DeleteTenantFlow {
|
||||
// will result in an error, but here we need to be able to retry shutdown when tenant deletion is retried.
|
||||
// So the solution is to set tenant state to broken.
|
||||
async fn run_inner(
|
||||
guard: &mut OwnedMutexGuard<Self>,
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-create-remote-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-create-remote-mark"
|
||||
@@ -336,25 +345,46 @@ impl DeleteTenantFlow {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mark_in_progress(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::Finished => anyhow::bail!("Bug. Is in finished state"),
|
||||
Self::InProgress { .. } => { /* We're in a retry */ }
|
||||
Self::NotStarted => { /* Fresh start */ }
|
||||
}
|
||||
|
||||
*self = Self::InProgress;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn should_resume_deletion(
|
||||
conf: &'static PageServerConf,
|
||||
remote_mark_exists: bool,
|
||||
tenant: &Tenant,
|
||||
) -> Result<bool, DeleteTenantError> {
|
||||
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
|
||||
let acquire = |t: &Tenant| {
|
||||
Some(
|
||||
Arc::clone(&t.delete_progress)
|
||||
.try_lock_owned()
|
||||
.expect("we're the only owner during init"),
|
||||
)
|
||||
};
|
||||
|
||||
if remote_mark_exists {
|
||||
return Ok(true);
|
||||
return Ok(acquire(tenant));
|
||||
}
|
||||
|
||||
let tenant_id = tenant.tenant_id;
|
||||
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
|
||||
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
|
||||
Ok(true)
|
||||
Ok(acquire(tenant))
|
||||
} else {
|
||||
Ok(false)
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn resume_from_attach(
|
||||
guard: DeletionGuard,
|
||||
tenant: &Arc<Tenant>,
|
||||
preload: Option<TenantPreload>,
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
@@ -373,10 +403,19 @@ impl DeleteTenantFlow {
|
||||
.await
|
||||
.context("attach")?;
|
||||
|
||||
Self::background(tenant.conf, tenant.remote_storage.clone(), tenants, tenant).await
|
||||
Self::background(
|
||||
guard,
|
||||
tenant.conf,
|
||||
tenant.remote_storage.clone(),
|
||||
tenants,
|
||||
tenant,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn prepare(tenant: &Arc<Tenant>) -> Result<(), DeleteTenantError> {
|
||||
async fn prepare(
|
||||
tenant: &Arc<Tenant>,
|
||||
) -> Result<tokio::sync::OwnedMutexGuard<Self>, DeleteTenantError> {
|
||||
// FIXME: unsure about active only. Our init jobs may not be cancellable properly,
|
||||
// so at least for now allow deletions only for active tenants. TODO recheck
|
||||
// Broken and Stopping is needed for retries.
|
||||
@@ -387,6 +426,10 @@ impl DeleteTenantFlow {
|
||||
return Err(DeleteTenantError::InvalidState(tenant.current_state()));
|
||||
}
|
||||
|
||||
let guard = Arc::clone(&tenant.delete_progress)
|
||||
.try_lock_owned()
|
||||
.map_err(|_| DeleteTenantError::AlreadyInProgress)?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-shutdown", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))?
|
||||
});
|
||||
@@ -406,10 +449,11 @@ impl DeleteTenantFlow {
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(guard)
|
||||
}
|
||||
|
||||
fn schedule_background(
|
||||
guard: OwnedMutexGuard<Self>,
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
@@ -425,7 +469,9 @@ impl DeleteTenantFlow {
|
||||
"tenant_delete",
|
||||
false,
|
||||
async move {
|
||||
if let Err(err) = Self::background(conf, remote_storage, tenants, &tenant).await {
|
||||
if let Err(err) =
|
||||
Self::background(guard, conf, remote_storage, tenants, &tenant).await
|
||||
{
|
||||
error!("Error: {err:#}");
|
||||
tenant.set_broken(format!("{err:#}")).await;
|
||||
};
|
||||
@@ -440,6 +486,7 @@ impl DeleteTenantFlow {
|
||||
}
|
||||
|
||||
async fn background(
|
||||
mut guard: OwnedMutexGuard<Self>,
|
||||
conf: &PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
@@ -503,6 +550,8 @@ impl DeleteTenantFlow {
|
||||
.set(locked.len() as u64);
|
||||
}
|
||||
|
||||
*guard = Self::Finished;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2600,8 +2600,6 @@ impl Timeline {
|
||||
)
|
||||
};
|
||||
|
||||
pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
@@ -110,6 +110,35 @@ async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTi
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// We delete local files first, so if pageserver restarts after local files deletion then remote deletion is not continued.
|
||||
// This can be solved with inversion of these steps. But even if these steps are inverted then, when index_part.json
|
||||
// gets deleted there is no way to distinguish between "this timeline is good, we just didnt upload it to remote"
|
||||
// and "this timeline is deleted we should continue with removal of local state". So to avoid the ambiguity we use a mark file.
|
||||
// After index part is deleted presence of this mark file indentifies that it was a deletion intention.
|
||||
// So we can just remove the mark file.
|
||||
async fn create_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
fail::fail_point!("timeline-delete-before-delete-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-before-delete-mark"
|
||||
))?
|
||||
});
|
||||
let marker_path = conf.timeline_delete_mark_file_path(tenant_id, timeline_id);
|
||||
|
||||
// Note: we're ok to replace existing file.
|
||||
let _ = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(&marker_path)
|
||||
.with_context(|| format!("could not create delete marker file {marker_path:?}"))?;
|
||||
|
||||
crashsafe::fsync_file_and_parent(&marker_path).context("sync_mark")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Grab the layer_removal_cs lock, and actually perform the deletion.
|
||||
///
|
||||
/// This lock prevents prevents GC or compaction from running at the same time.
|
||||
@@ -282,8 +311,6 @@ async fn cleanup_remaining_timeline_fs_traces(
|
||||
.context("fsync_pre_mark_remove")?;
|
||||
|
||||
// Remove delete mark
|
||||
// TODO: once we are confident that no more exist in the field, remove this
|
||||
// line. It cleans up a legacy marker file that might in rare cases be present.
|
||||
tokio::fs::remove_file(conf.timeline_delete_mark_file_path(tenant_id, timeline_id))
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
@@ -364,6 +391,8 @@ impl DeleteTimelineFlow {
|
||||
|
||||
set_deleted_in_remote_index(&timeline).await?;
|
||||
|
||||
create_delete_mark(tenant.conf, timeline.tenant_id, timeline.timeline_id).await?;
|
||||
|
||||
fail::fail_point!("timeline-delete-before-schedule", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: timeline-delete-before-schedule"
|
||||
@@ -435,6 +464,10 @@ impl DeleteTimelineFlow {
|
||||
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
// Note that delete mark can be missing on resume
|
||||
// because we create delete mark after we set deleted_at in the index part.
|
||||
create_delete_mark(tenant.conf, tenant.tenant_id, timeline_id).await?;
|
||||
|
||||
Self::schedule_background(guard, tenant.conf, tenant, timeline);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -2,10 +2,20 @@ use std::{collections::hash_map::Entry, fs, sync::Arc};
|
||||
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use tracing::{error, info, info_span, warn};
|
||||
use utils::{crashsafe, fs_ext, id::TimelineId, lsn::Lsn};
|
||||
use tracing::{info, info_span, warn};
|
||||
use utils::{
|
||||
crashsafe,
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::{context::RequestContext, import_datadir, tenant::Tenant};
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
context::RequestContext,
|
||||
import_datadir,
|
||||
tenant::Tenant,
|
||||
virtual_file::{on_fatal_io_error, MaybeFatalIo},
|
||||
};
|
||||
|
||||
use super::Timeline;
|
||||
|
||||
@@ -45,20 +55,12 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
let timeline_id = self.timeline_id;
|
||||
let tenant_id = self.owning_tenant.tenant_id;
|
||||
|
||||
if self.raw_timeline.is_none() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"No timeline for initialization found for {tenant_id}/{timeline_id}"
|
||||
));
|
||||
}
|
||||
let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
|
||||
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
|
||||
})?;
|
||||
|
||||
// Check that the caller initialized disk_consistent_lsn
|
||||
let new_disk_consistent_lsn = self
|
||||
.raw_timeline
|
||||
.as_ref()
|
||||
.expect("checked above")
|
||||
.0
|
||||
.get_disk_consistent_lsn();
|
||||
|
||||
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
|
||||
anyhow::ensure!(
|
||||
new_disk_consistent_lsn.is_valid(),
|
||||
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
|
||||
@@ -70,25 +72,13 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
|
||||
),
|
||||
Entry::Vacant(v) => {
|
||||
// after taking, there should be no fallible operations, because the drop guard will not
|
||||
// cleanup after and would block for example the tenant deletion
|
||||
let (new_timeline, uninit_mark) =
|
||||
self.raw_timeline.take().expect("already checked");
|
||||
|
||||
// this is the mutual exclusion between different retries to create the timeline;
|
||||
// this should be an assertion.
|
||||
uninit_mark.remove_uninit_mark().with_context(|| {
|
||||
format!(
|
||||
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
|
||||
)
|
||||
})?;
|
||||
uninit_mark.remove_uninit_mark();
|
||||
v.insert(Arc::clone(&new_timeline));
|
||||
|
||||
new_timeline.maybe_spawn_flush_loop();
|
||||
|
||||
Ok(new_timeline)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(new_timeline)
|
||||
}
|
||||
|
||||
/// Prepares timeline data by loading it from the basebackup archive.
|
||||
@@ -141,29 +131,6 @@ impl<'t> UninitializedTimeline<'t> {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for UninitializedTimeline<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some((_, uninit_mark)) = self.raw_timeline.take() {
|
||||
let _entered = info_span!("drop_uninitialized_timeline", tenant_id = %self.owning_tenant.tenant_id, timeline_id = %self.timeline_id).entered();
|
||||
error!("Timeline got dropped without initializing, cleaning its files");
|
||||
cleanup_timeline_directory(uninit_mark);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
|
||||
let timeline_path = &uninit_mark.timeline_path;
|
||||
match fs_ext::ignore_absent_files(|| fs::remove_dir_all(timeline_path)) {
|
||||
Ok(()) => {
|
||||
info!("Timeline dir {timeline_path:?} removed successfully, removing the uninit mark")
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
|
||||
}
|
||||
}
|
||||
drop(uninit_mark); // mark handles its deletion on drop, gets retained if timeline dir exists
|
||||
}
|
||||
|
||||
/// An uninit mark file, created along the timeline dir to ensure the timeline either gets fully initialized and loaded into pageserver's memory,
|
||||
/// or gets removed eventually.
|
||||
///
|
||||
@@ -173,58 +140,133 @@ pub(crate) struct TimelineUninitMark {
|
||||
uninit_mark_deleted: bool,
|
||||
uninit_mark_path: Utf8PathBuf,
|
||||
pub(crate) timeline_path: Utf8PathBuf,
|
||||
common_parent: Utf8PathBuf,
|
||||
}
|
||||
|
||||
impl TimelineUninitMark {
|
||||
pub(crate) fn new(uninit_mark_path: Utf8PathBuf, timeline_path: Utf8PathBuf) -> Self {
|
||||
Self {
|
||||
pub(crate) fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<Self> {
|
||||
let timeline_path = conf.timeline_path(&tenant_id, &timeline_id);
|
||||
let uninit_mark_path = conf.timeline_uninit_mark_file_path(tenant_id, timeline_id);
|
||||
|
||||
// assert they share the same parent
|
||||
let timeline_parent_path = timeline_path
|
||||
.parent()
|
||||
.expect("timeline_path must have a parent");
|
||||
let uninit_mark_parent_path = uninit_mark_path
|
||||
.parent()
|
||||
.expect("uninit mark path must have a parent");
|
||||
assert_eq!(timeline_parent_path, uninit_mark_parent_path);
|
||||
let common_parent = uninit_mark_parent_path;
|
||||
|
||||
// create the uninit mark file
|
||||
let _ = fs::OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true)
|
||||
.open(&uninit_mark_path)
|
||||
.context("create uninit mark file")?;
|
||||
crashsafe::fsync_file_and_parent(common_parent).context("fsync uninit mark file")?;
|
||||
|
||||
Ok(Self {
|
||||
uninit_mark_deleted: false,
|
||||
common_parent: common_parent.to_owned(),
|
||||
uninit_mark_path,
|
||||
timeline_path,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn remove_uninit_mark(mut self) -> anyhow::Result<()> {
|
||||
if !self.uninit_mark_deleted {
|
||||
self.delete_mark_file_if_present()?;
|
||||
}
|
||||
fn remove_uninit_mark(mut self) {
|
||||
// remove the uninit mark
|
||||
fs::remove_file(&self.uninit_mark_path).fatal_err(&format!(
|
||||
"TimelineUninitMark::drop: remove_file uninit mark: {}",
|
||||
self.uninit_mark_path
|
||||
));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// fsync to persist the removal
|
||||
crashsafe::fsync(&self.common_parent).fatal_err(&format!(
|
||||
"TimelineUninitMark::drop: fsync common parent dir: {}",
|
||||
self.common_parent
|
||||
));
|
||||
|
||||
fn delete_mark_file_if_present(&mut self) -> anyhow::Result<()> {
|
||||
let uninit_mark_file = &self.uninit_mark_path;
|
||||
let uninit_mark_parent = uninit_mark_file
|
||||
.parent()
|
||||
.with_context(|| format!("Uninit mark file {uninit_mark_file:?} has no parent"))?;
|
||||
fs_ext::ignore_absent_files(|| fs::remove_file(uninit_mark_file)).with_context(|| {
|
||||
format!("Failed to remove uninit mark file at path {uninit_mark_file:?}")
|
||||
})?;
|
||||
crashsafe::fsync(uninit_mark_parent).context("Failed to fsync uninit mark parent")?;
|
||||
self.uninit_mark_deleted = true;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineUninitMark {
|
||||
fn drop(&mut self) {
|
||||
if !self.uninit_mark_deleted {
|
||||
if self.timeline_path.exists() {
|
||||
error!(
|
||||
"Uninit mark {} is not removed, timeline {} stays uninitialized",
|
||||
self.uninit_mark_path, self.timeline_path
|
||||
)
|
||||
} else {
|
||||
// unblock later timeline creation attempts
|
||||
warn!(
|
||||
"Removing intermediate uninit mark file {}",
|
||||
// unblock later timeline creation attempts
|
||||
let _entered =
|
||||
info_span!("TimelineUninitMark_drop", timeline_path=%self.timeline_path).entered();
|
||||
warn!("removing timeline dir and uninit mark file");
|
||||
|
||||
// sanity-check: ensure the uninit mark file still exists on disk
|
||||
let uninit_mark_file_exists = self.uninit_mark_path.try_exists().fatal_err(&format!(
|
||||
"TimelineUninitMark::drop: stat() uninit mark file: {}",
|
||||
self.uninit_mark_path
|
||||
));
|
||||
if !uninit_mark_file_exists {
|
||||
panic!(
|
||||
"uninit mark file assumed to exists but doesn't: {}",
|
||||
self.uninit_mark_path
|
||||
);
|
||||
if let Err(e) = self.delete_mark_file_if_present() {
|
||||
error!("Failed to remove the uninit mark file: {e}")
|
||||
}
|
||||
|
||||
// recursively delete `timeline_path`, ignoring NotFound errors and aborting the process on all others.
|
||||
match fs::remove_dir_all(&self.timeline_path) {
|
||||
Ok(()) => {
|
||||
info!("timeline dir removed successfully");
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
// this can happen both if the timeline_path does not exist
|
||||
// and if the timeline_path exists and there's another thread
|
||||
// still operating on that directory and our remove_dir_all call
|
||||
// effectively got hit by time-of-check vs time-of-use.
|
||||
// Disambiguate by calling remove_dir against the timeline_path
|
||||
match std::fs::remove_dir(&self.timeline_path) {
|
||||
Ok(()) => {
|
||||
warn!("retrying timeline dir removal succeeded after NotFound, this is indicative of a race condition");
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
// this is the good case: the first NotFound was because the dir didn't exist
|
||||
info!("timeline dir does not exist");
|
||||
}
|
||||
Err(e) => {
|
||||
on_fatal_io_error(&e, &format!("TimelineUninitMark::drop: remove_dir_all failed with NotFound, then remove_dir failed: {}", self.timeline_path));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
on_fatal_io_error(
|
||||
&e,
|
||||
&format!(
|
||||
"TimelineUninitMark::drop: delete timeline directory: {:?}",
|
||||
self.timeline_path
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// fsync to order timeline dir removal before uninit mark removal
|
||||
crashsafe::fsync(&self.common_parent).fatal_err(&format!(
|
||||
"TimelineUninitMark::drop: fsync after timeline dir removal: {}",
|
||||
self.common_parent,
|
||||
));
|
||||
|
||||
// remove the uninit mark
|
||||
fs::remove_file(&self.common_parent).fatal_err(&format!(
|
||||
"TimelineUninitMark::drop: remove_file uninit mark: {}",
|
||||
self.common_parent,
|
||||
));
|
||||
|
||||
// fsync to persist the removal
|
||||
crashsafe::fsync(&self.common_parent).fatal_err(&format!(
|
||||
"TimelineUninitMark::drop: fsync common parent dir: {}",
|
||||
self.common_parent,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,10 +41,6 @@ from urllib3.util.retry import Retry
|
||||
|
||||
from fixtures.broker import NeonBroker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.pageserver.allowed_errors import (
|
||||
DEFAULT_PAGESERVER_ALLOWED_ERRORS,
|
||||
scan_pageserver_log_for_errors,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.types import IndexPartDump
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
@@ -434,6 +430,8 @@ class NeonEnvBuilder:
|
||||
|
||||
# Pageserver remote storage
|
||||
self.pageserver_remote_storage = pageserver_remote_storage
|
||||
# Extensions remote storage
|
||||
self.ext_remote_storage: Optional[S3Storage] = None
|
||||
# Safekeepers remote storage
|
||||
self.sk_remote_storage: Optional[RemoteStorage] = None
|
||||
|
||||
@@ -532,6 +530,24 @@ class NeonEnvBuilder:
|
||||
)
|
||||
self.pageserver_remote_storage = ret
|
||||
|
||||
def enable_extensions_remote_storage(self, kind: RemoteStorageKind):
|
||||
assert self.ext_remote_storage is None, "already configured extensions remote storage"
|
||||
|
||||
# there is an assumption that REAL_S3 for extensions is never
|
||||
# cleaned up; these are also special in that they have a hardcoded
|
||||
# bucket and region, which is most likely the same as our normal
|
||||
ext = self._configure_and_create_remote_storage(
|
||||
kind,
|
||||
RemoteStorageUser.EXTENSIONS,
|
||||
bucket_name="neon-dev-extensions-eu-central-1",
|
||||
bucket_region="eu-central-1",
|
||||
)
|
||||
assert isinstance(
|
||||
ext, S3Storage
|
||||
), "unsure why, but only MOCK_S3 and REAL_S3 are currently supported for extensions"
|
||||
ext.cleanup = False
|
||||
self.ext_remote_storage = ext
|
||||
|
||||
def enable_safekeeper_remote_storage(self, kind: RemoteStorageKind):
|
||||
assert self.sk_remote_storage is None, "sk_remote_storage already configured"
|
||||
|
||||
@@ -588,7 +604,8 @@ class NeonEnvBuilder:
|
||||
directory_to_clean.rmdir()
|
||||
|
||||
def cleanup_remote_storage(self):
|
||||
for x in [self.pageserver_remote_storage, self.sk_remote_storage]:
|
||||
# extensions are currently not cleaned up, disabled when creating
|
||||
for x in [self.pageserver_remote_storage, self.ext_remote_storage, self.sk_remote_storage]:
|
||||
if isinstance(x, S3Storage):
|
||||
x.do_cleanup()
|
||||
|
||||
@@ -692,6 +709,7 @@ class NeonEnv:
|
||||
self.pageservers: List[NeonPageserver] = []
|
||||
self.broker = config.broker
|
||||
self.pageserver_remote_storage = config.pageserver_remote_storage
|
||||
self.ext_remote_storage = config.ext_remote_storage
|
||||
self.safekeepers_remote_storage = config.sk_remote_storage
|
||||
self.pg_version = config.pg_version
|
||||
# Binary path for pageserver, safekeeper, etc
|
||||
@@ -1447,7 +1465,12 @@ class NeonCli(AbstractNeonCli):
|
||||
if pageserver_id is not None:
|
||||
args.extend(["--pageserver-id", str(pageserver_id)])
|
||||
|
||||
res = self.raw_cli(args)
|
||||
storage = self.env.ext_remote_storage
|
||||
s3_env_vars = None
|
||||
if isinstance(storage, S3Storage):
|
||||
s3_env_vars = storage.access_env_vars()
|
||||
|
||||
res = self.raw_cli(args, extra_env_vars=s3_env_vars)
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
@@ -1619,7 +1642,57 @@ class NeonPageserver(PgProtocol):
|
||||
# env.pageserver.allowed_errors.append(".*could not open garage door.*")
|
||||
#
|
||||
# The entries in the list are regular expressions.
|
||||
self.allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)
|
||||
self.allowed_errors = [
|
||||
# All tests print these, when starting up or shutting down
|
||||
".*wal receiver task finished with an error: walreceiver connection handling failure.*",
|
||||
".*Shutdown task error: walreceiver connection handling failure.*",
|
||||
".*wal_connection_manager.*tcp connect error: Connection refused.*",
|
||||
".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
|
||||
".*serving compute connection task.*exited with error: Postgres connection error.*",
|
||||
".*serving compute connection task.*exited with error: Connection reset by peer.*",
|
||||
".*serving compute connection task.*exited with error: Postgres query error.*",
|
||||
".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
|
||||
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
|
||||
".*Connection aborted: unexpected message from server*",
|
||||
".*kill_and_wait_impl.*: wait successful.*",
|
||||
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
|
||||
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
|
||||
# safekeeper connection can fail with this, in the window between timeline creation
|
||||
# and streaming start
|
||||
".*Failed to process query for timeline .*: state uninitialized, no data to read.*",
|
||||
# Tests related to authentication and authorization print these
|
||||
".*Error processing HTTP request: Forbidden",
|
||||
# intentional failpoints
|
||||
".*failpoint ",
|
||||
# FIXME: These need investigation
|
||||
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
|
||||
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
|
||||
".*Removing intermediate uninit mark file.*",
|
||||
# Tenant::delete_timeline() can cause any of the four following errors.
|
||||
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
|
||||
".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed
|
||||
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
|
||||
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
|
||||
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
|
||||
".*compaction_loop.*Compaction failed.*, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs
|
||||
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
|
||||
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
|
||||
".*task iteration took longer than the configured period.*",
|
||||
# this is until #3501
|
||||
".*Compaction failed.*, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
|
||||
# these can happen anytime we do compactions from background task and shutdown pageserver
|
||||
r".*ERROR.*ancestor timeline \S+ is being stopped",
|
||||
# this is expected given our collaborative shutdown approach for the UploadQueue
|
||||
".*Compaction failed.*, retrying in .*: Other\\(queue is in state Stopped.*",
|
||||
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
|
||||
".*Error processing HTTP request: NotFound: Timeline .* was not found",
|
||||
".*took more than expected to complete.*",
|
||||
# these can happen during shutdown, but it should not be a reason to fail a test
|
||||
".*completed, took longer than expected.*",
|
||||
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
|
||||
# and it is not a failure of our code when it happens.
|
||||
".*DeleteObjects.*We encountered an internal error. Please try again.*",
|
||||
]
|
||||
|
||||
def timeline_dir(self, tenant_id: TenantId, timeline_id: Optional[TimelineId] = None) -> Path:
|
||||
"""Get a timeline directory's path based on the repo directory of the test environment"""
|
||||
@@ -1729,9 +1802,27 @@ class NeonPageserver(PgProtocol):
|
||||
|
||||
def assert_no_errors(self):
|
||||
logfile = open(os.path.join(self.workdir, "pageserver.log"), "r")
|
||||
errors = scan_pageserver_log_for_errors(logfile, self.allowed_errors)
|
||||
error_or_warn = re.compile(r"\s(ERROR|WARN)")
|
||||
errors = []
|
||||
while True:
|
||||
line = logfile.readline()
|
||||
if not line:
|
||||
break
|
||||
|
||||
for _lineno, error in errors:
|
||||
if error_or_warn.search(line):
|
||||
# Is this a torn log line? This happens when force-killing a process and restarting
|
||||
# Example: "2023-10-25T09:38:31.752314Z WARN deletion executo2023-10-25T09:38:31.875947Z INFO version: git-env:0f9452f76e8ccdfc88291bccb3f53e3016f40192"
|
||||
if re.match("\\d{4}-\\d{2}-\\d{2}T.+\\d{4}-\\d{2}-\\d{2}T.+INFO version.+", line):
|
||||
continue
|
||||
|
||||
# It's an ERROR or WARN. Is it in the allow-list?
|
||||
for a in self.allowed_errors:
|
||||
if re.match(a, line):
|
||||
break
|
||||
else:
|
||||
errors.append(line)
|
||||
|
||||
for error in errors:
|
||||
log.info(f"not allowed error: {error.strip()}")
|
||||
|
||||
assert not errors
|
||||
@@ -2555,17 +2646,6 @@ class Endpoint(PgProtocol):
|
||||
with open(config_path, "w") as file:
|
||||
json.dump(dict(data_dict, **kwargs), file, indent=4)
|
||||
|
||||
# Mock the extension part of spec passed from control plane for local testing
|
||||
# endpoint.rs adds the content of this file as a part of the spec.json
|
||||
def create_remote_extension_spec(self, spec: dict[str, Any]):
|
||||
"""Create a remote extension spec file for the endpoint."""
|
||||
remote_extensions_spec_path = os.path.join(
|
||||
self.endpoint_path(), "remote_extensions_spec.json"
|
||||
)
|
||||
|
||||
with open(remote_extensions_spec_path, "w") as file:
|
||||
json.dump(spec, file, indent=4)
|
||||
|
||||
def stop(self) -> "Endpoint":
|
||||
"""
|
||||
Stop the Postgres instance if it's running.
|
||||
|
||||
@@ -1,116 +0,0 @@
|
||||
#! /usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
from typing import Iterable, List, Tuple
|
||||
|
||||
|
||||
def scan_pageserver_log_for_errors(
|
||||
input: Iterable[str], allowed_errors: List[str]
|
||||
) -> List[Tuple[int, str]]:
|
||||
error_or_warn = re.compile(r"\s(ERROR|WARN)")
|
||||
errors = []
|
||||
for lineno, line in enumerate(input, start=1):
|
||||
if len(line) == 0:
|
||||
continue
|
||||
|
||||
if error_or_warn.search(line):
|
||||
# Is this a torn log line? This happens when force-killing a process and restarting
|
||||
# Example: "2023-10-25T09:38:31.752314Z WARN deletion executo2023-10-25T09:38:31.875947Z INFO version: git-env:0f9452f76e8ccdfc88291bccb3f53e3016f40192"
|
||||
if re.match("\\d{4}-\\d{2}-\\d{2}T.+\\d{4}-\\d{2}-\\d{2}T.+INFO version.+", line):
|
||||
continue
|
||||
|
||||
# It's an ERROR or WARN. Is it in the allow-list?
|
||||
for a in allowed_errors:
|
||||
if re.match(a, line):
|
||||
break
|
||||
else:
|
||||
errors.append((lineno, line))
|
||||
return errors
|
||||
|
||||
|
||||
DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
# All tests print these, when starting up or shutting down
|
||||
".*wal receiver task finished with an error: walreceiver connection handling failure.*",
|
||||
".*Shutdown task error: walreceiver connection handling failure.*",
|
||||
".*wal_connection_manager.*tcp connect error: Connection refused.*",
|
||||
".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
|
||||
".*serving compute connection task.*exited with error: Postgres connection error.*",
|
||||
".*serving compute connection task.*exited with error: Connection reset by peer.*",
|
||||
".*serving compute connection task.*exited with error: Postgres query error.*",
|
||||
".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
|
||||
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
|
||||
".*Connection aborted: unexpected message from server*",
|
||||
".*kill_and_wait_impl.*: wait successful.*",
|
||||
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
|
||||
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
|
||||
# safekeeper connection can fail with this, in the window between timeline creation
|
||||
# and streaming start
|
||||
".*Failed to process query for timeline .*: state uninitialized, no data to read.*",
|
||||
# Tests related to authentication and authorization print these
|
||||
".*Error processing HTTP request: Forbidden",
|
||||
# intentional failpoints
|
||||
".*failpoint ",
|
||||
# FIXME: These need investigation
|
||||
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
|
||||
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
|
||||
".*Removing intermediate uninit mark file.*",
|
||||
# Tenant::delete_timeline() can cause any of the four following errors.
|
||||
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
|
||||
".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed
|
||||
".*wait for layer upload ops to complete.*", # .*Caused by:.*wait_completion aborted because upload queue was stopped
|
||||
".*gc_loop.*Gc failed, retrying in.*timeline is Stopping", # When gc checks timeline state after acquiring layer_removal_cs
|
||||
".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant", # Tenant::gc precondition
|
||||
".*compaction_loop.*Compaction failed.*, retrying in.*timeline or pageserver is shutting down", # When compaction checks timeline state after acquiring layer_removal_cs
|
||||
".*query handler for 'pagestream.*failed: Timeline .* was not found", # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
|
||||
".*query handler for 'pagestream.*failed: Timeline .* is not active", # timeline delete in progress
|
||||
".*task iteration took longer than the configured period.*",
|
||||
# these can happen anytime we do compactions from background task and shutdown pageserver
|
||||
r".*ERROR.*ancestor timeline \S+ is being stopped",
|
||||
# this is expected given our collaborative shutdown approach for the UploadQueue
|
||||
".*Compaction failed.*, retrying in .*: Other\\(queue is in state Stopped.*",
|
||||
".*Compaction failed.*, retrying in .*: ShuttingDown",
|
||||
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
|
||||
".*Error processing HTTP request: NotFound: Timeline .* was not found",
|
||||
".*took more than expected to complete.*",
|
||||
# these can happen during shutdown, but it should not be a reason to fail a test
|
||||
".*completed, took longer than expected.*",
|
||||
# AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
|
||||
# and it is not a failure of our code when it happens.
|
||||
".*DeleteObjects.*We encountered an internal error. Please try again.*",
|
||||
)
|
||||
|
||||
|
||||
def _check_allowed_errors(input):
|
||||
allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)
|
||||
|
||||
# add any test specifics here; cli parsing is not provided for the
|
||||
# difficulty of copypasting regexes as arguments without any quoting
|
||||
# errors.
|
||||
|
||||
errors = scan_pageserver_log_for_errors(input, allowed_errors)
|
||||
|
||||
for lineno, error in errors:
|
||||
print(f"-:{lineno}: {error.strip()}", file=sys.stderr)
|
||||
|
||||
print(f"\n{len(errors)} not allowed errors", file=sys.stderr)
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="check input against pageserver global allowed_errors"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i",
|
||||
"--input",
|
||||
type=argparse.FileType("r"),
|
||||
default=sys.stdin,
|
||||
help="Pageserver logs file. Reads from stdin if no file is provided.",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
errors = _check_allowed_errors(args.input)
|
||||
|
||||
sys.exit(len(errors) > 0)
|
||||
@@ -411,6 +411,7 @@ def check_neon_works(
|
||||
config.initial_tenant = snapshot_config["default_tenant_id"]
|
||||
config.pg_distrib_dir = pg_distrib_dir
|
||||
config.remote_storage = None
|
||||
config.ext_remote_storage = None
|
||||
config.sk_remote_storage = None
|
||||
|
||||
# Use the "target" binaries to launch the storage nodes
|
||||
|
||||
@@ -1,165 +1,316 @@
|
||||
import os
|
||||
import shutil
|
||||
import threading
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
from fixtures.pg_version import PgVersion, skip_on_postgres
|
||||
from fixtures.remote_storage import (
|
||||
RemoteStorageKind,
|
||||
S3Storage,
|
||||
available_s3_storages,
|
||||
)
|
||||
|
||||
|
||||
# Check that the extension is not already in the share_dir_path_ext
|
||||
# if it is, skip the test
|
||||
#
|
||||
# After the test is done, cleanup the control file and the extension directory
|
||||
@pytest.fixture(scope="function")
|
||||
def ext_file_cleanup(pg_bin):
|
||||
out = pg_bin.run_capture("pg_config --sharedir".split())
|
||||
share_dir_path = Path(f"{out}.stdout").read_text().strip()
|
||||
log.info(f"share_dir_path: {share_dir_path}")
|
||||
share_dir_path_ext = os.path.join(share_dir_path, "extension")
|
||||
# Cleaning up downloaded files is important for local tests
|
||||
# or else one test could reuse the files from another test or another test run
|
||||
def cleanup(pg_version):
|
||||
PGDIR = Path(f"pg_install/v{pg_version}")
|
||||
|
||||
log.info(f"share_dir_path_ext: {share_dir_path_ext}")
|
||||
LIB_DIR = PGDIR / Path("lib/postgresql")
|
||||
cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"]
|
||||
cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs]
|
||||
|
||||
# if file is already in the share_dir_path_ext, skip the test
|
||||
if os.path.isfile(os.path.join(share_dir_path_ext, "anon.control")):
|
||||
log.info("anon.control is already in the share_dir_path_ext, skipping the test")
|
||||
yield False
|
||||
return
|
||||
else:
|
||||
yield True
|
||||
SHARE_DIR = PGDIR / Path("share/postgresql/extension")
|
||||
cleanup_ext_globs = [
|
||||
"anon*",
|
||||
"address_standardizer*",
|
||||
"postgis*",
|
||||
"pageinspect*",
|
||||
"pg_buffercache*",
|
||||
"pgrouting*",
|
||||
]
|
||||
cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs]
|
||||
|
||||
# cleanup the control file
|
||||
if os.path.isfile(os.path.join(share_dir_path_ext, "anon.control")):
|
||||
os.unlink(os.path.join(share_dir_path_ext, "anon.control"))
|
||||
log.info("anon.control was removed from the share_dir_path_ext")
|
||||
all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths
|
||||
all_cleanup_files = []
|
||||
for file_glob in all_glob_paths:
|
||||
for file in file_glob:
|
||||
all_cleanup_files.append(file)
|
||||
|
||||
# remove the extension directory recursively
|
||||
if os.path.isdir(os.path.join(share_dir_path_ext, "anon")):
|
||||
directories_to_clean: List[Path] = []
|
||||
for f in Path(os.path.join(share_dir_path_ext, "anon")).iterdir():
|
||||
if f.is_file():
|
||||
log.info(f"Removing file {f}")
|
||||
f.unlink()
|
||||
elif f.is_dir():
|
||||
directories_to_clean.append(f)
|
||||
for file in all_cleanup_files:
|
||||
try:
|
||||
os.remove(file)
|
||||
log.info(f"removed file {file}")
|
||||
except Exception as err:
|
||||
log.info(
|
||||
f"skipping remove of file {file} because it doesn't exist.\
|
||||
this may be expected or unexpected depending on the test {err}"
|
||||
)
|
||||
|
||||
for directory_to_clean in reversed(directories_to_clean):
|
||||
if not os.listdir(directory_to_clean):
|
||||
log.info(f"Removing empty directory {directory_to_clean}")
|
||||
directory_to_clean.rmdir()
|
||||
|
||||
os.rmdir(os.path.join(share_dir_path_ext, "anon"))
|
||||
log.info("anon directory was removed from the share_dir_path_ext")
|
||||
cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")]
|
||||
for folder in cleanup_folders:
|
||||
try:
|
||||
shutil.rmtree(folder)
|
||||
log.info(f"removed folder {folder}")
|
||||
except Exception as err:
|
||||
log.info(
|
||||
f"skipping remove of folder {folder} because it doesn't exist.\
|
||||
this may be expected or unexpected depending on the test {err}"
|
||||
)
|
||||
|
||||
|
||||
def upload_files(env):
|
||||
log.info("Uploading test files to mock bucket")
|
||||
os.chdir("test_runner/regress/data/extension_test")
|
||||
for path in os.walk("."):
|
||||
prefix, _, files = path
|
||||
for file in files:
|
||||
# the [2:] is to remove the leading "./"
|
||||
full_path = os.path.join(prefix, file)[2:]
|
||||
|
||||
with open(full_path, "rb") as f:
|
||||
log.info(f"UPLOAD {full_path} to ext/{full_path}")
|
||||
assert isinstance(env.pageserver_remote_storage, S3Storage)
|
||||
env.pageserver_remote_storage.client.upload_fileobj(
|
||||
f,
|
||||
env.ext_remote_storage.bucket_name,
|
||||
f"ext/{full_path}",
|
||||
)
|
||||
os.chdir("../../../..")
|
||||
|
||||
|
||||
# Test downloading remote extension.
|
||||
@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building")
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
|
||||
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
|
||||
def test_remote_extensions(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address,
|
||||
pg_version,
|
||||
ext_file_cleanup,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
if ext_file_cleanup is False:
|
||||
log.info("test_remote_extensions skipped")
|
||||
return
|
||||
|
||||
if pg_version == PgVersion.V16:
|
||||
pytest.skip("TODO: PG16 extension building")
|
||||
|
||||
# setup mock http server
|
||||
# that expects request for anon.tar.zst
|
||||
# and returns the requested file
|
||||
(host, port) = httpserver_listen_address
|
||||
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
|
||||
|
||||
archive_path = f"latest/v{pg_version}/extensions/anon.tar.zst"
|
||||
|
||||
def endpoint_handler_build_tag(request: Request) -> Response:
|
||||
log.info(f"request: {request}")
|
||||
|
||||
file_name = "anon.tar.zst"
|
||||
file_path = f"test_runner/regress/data/extension_test/5670669815/v{pg_version}/extensions/anon.tar.zst"
|
||||
file_size = os.path.getsize(file_path)
|
||||
fh = open(file_path, "rb")
|
||||
return Response(
|
||||
fh,
|
||||
mimetype="application/octet-stream",
|
||||
headers=[
|
||||
("Content-Length", str(file_size)),
|
||||
("Content-Disposition", 'attachment; filename="%s"' % file_name),
|
||||
],
|
||||
direct_passthrough=True,
|
||||
)
|
||||
|
||||
httpserver.expect_request(
|
||||
f"/pg-ext-s3-gateway/{archive_path}", method="GET"
|
||||
).respond_with_handler(endpoint_handler_build_tag)
|
||||
|
||||
# Start a compute node with remote_extension spec
|
||||
# and check that it can download the extensions and use them to CREATE EXTENSION.
|
||||
neon_env_builder.enable_extensions_remote_storage(remote_storage_kind)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
|
||||
endpoint = env.endpoints.create(
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
# For REAL_S3 we use the files already in the bucket
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
upload_files(env)
|
||||
|
||||
# Start a compute node and check that it can download the extensions
|
||||
# and use them to CREATE EXTENSION and LOAD
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_extensions",
|
||||
tenant_id=tenant_id,
|
||||
config_lines=["log_min_messages=debug3"],
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
|
||||
# mock remote_extensions spec
|
||||
spec: Dict[str, Any] = {
|
||||
"library_index": {
|
||||
"anon": "anon",
|
||||
},
|
||||
"extension_data": {
|
||||
"anon": {
|
||||
"archive_path": "",
|
||||
"control_data": {
|
||||
"anon.control": "# PostgreSQL Anonymizer (anon) extension\ncomment = 'Data anonymization tools'\ndefault_version = '1.1.0'\ndirectory='extension/anon'\nrelocatable = false\nrequires = 'pgcrypto'\nsuperuser = false\nmodule_pathname = '$libdir/anon'\ntrusted = true\n"
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
spec["extension_data"]["anon"]["archive_path"] = archive_path
|
||||
|
||||
endpoint.create_remote_extension_spec(spec)
|
||||
|
||||
endpoint.start(
|
||||
remote_ext_config=extensions_endpoint,
|
||||
)
|
||||
|
||||
# this is expected to fail if there's no pgcrypto extension, that's ok
|
||||
# we just want to check that the extension was downloaded
|
||||
try:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Check that appropriate files were downloaded
|
||||
cur.execute("CREATE EXTENSION anon")
|
||||
res = [x[0] for x in cur.fetchall()]
|
||||
log.info(res)
|
||||
except Exception as err:
|
||||
assert "pgcrypto" in str(err), f"unexpected error creating anon extension {err}"
|
||||
# Check that appropriate control files were downloaded
|
||||
cur.execute("SELECT * FROM pg_available_extensions")
|
||||
all_extensions = [x[0] for x in cur.fetchall()]
|
||||
log.info(all_extensions)
|
||||
assert "anon" in all_extensions
|
||||
|
||||
httpserver.check()
|
||||
# postgis is on real s3 but not mock s3.
|
||||
# it's kind of a big file, would rather not upload to github
|
||||
if remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
assert "postgis" in all_extensions
|
||||
# this may fail locally if dependency is missing
|
||||
# we don't really care about the error,
|
||||
# we just want to make sure it downloaded
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION postgis")
|
||||
except Exception as err:
|
||||
log.info(f"(expected) error creating postgis extension: {err}")
|
||||
# we do not check the error, so this is basically a NO-OP
|
||||
# however checking the log you can make sure that it worked
|
||||
# and also get valuable information about how long loading the extension took
|
||||
|
||||
# this is expected to fail on my computer because I don't have the pgcrypto extension
|
||||
try:
|
||||
cur.execute("CREATE EXTENSION anon")
|
||||
except Exception as err:
|
||||
log.info("error creating anon extension")
|
||||
assert "pgcrypto" in str(err), "unexpected error creating anon extension"
|
||||
finally:
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
# TODO
|
||||
# 1. Test downloading remote library.
|
||||
#
|
||||
# 2. Test a complex extension, which has multiple extensions in one archive
|
||||
# Test downloading remote library.
|
||||
@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building")
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
|
||||
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
|
||||
def test_remote_library(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
neon_env_builder.enable_extensions_remote_storage(remote_storage_kind)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
# For REAL_S3 we use the files already in the bucket
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
upload_files(env)
|
||||
|
||||
# and use them to run LOAD library
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_remote_library",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
# config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
try:
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# try to load library
|
||||
try:
|
||||
cur.execute("LOAD 'anon'")
|
||||
except Exception as err:
|
||||
log.info(f"error loading anon library: {err}")
|
||||
raise AssertionError("unexpected error loading anon library") from err
|
||||
|
||||
# test library which name is different from extension name
|
||||
# this may fail locally if dependency is missing
|
||||
# however, it does successfully download the postgis archive
|
||||
if remote_storage_kind == RemoteStorageKind.REAL_S3:
|
||||
try:
|
||||
cur.execute("LOAD 'postgis_topology-3'")
|
||||
except Exception as err:
|
||||
log.info("error loading postgis_topology-3")
|
||||
assert "No such file or directory" in str(
|
||||
err
|
||||
), "unexpected error loading postgis_topology-3"
|
||||
finally:
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
# Here we test a complex extension
|
||||
# which has multiple extensions in one archive
|
||||
# using postgis as an example
|
||||
#
|
||||
# 3.Test that extension is downloaded after endpoint restart,
|
||||
# @pytest.mark.skipif(
|
||||
# RemoteStorageKind.REAL_S3 not in available_s3_storages(),
|
||||
# reason="skipping test because real s3 not enabled",
|
||||
# )
|
||||
@skip_on_postgres(PgVersion.V16, reason="TODO: PG16 extension building")
|
||||
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
|
||||
def test_multiple_extensions_one_archive(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
neon_env_builder.enable_extensions_remote_storage(RemoteStorageKind.REAL_S3)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_multiple_extensions_one_archive",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE EXTENSION address_standardizer;")
|
||||
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
|
||||
# execute query to ensure that it works
|
||||
cur.execute(
|
||||
"SELECT house_num, name, suftype, city, country, state, unit \
|
||||
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
|
||||
'One Rust Place, Boston, MA 02109');"
|
||||
)
|
||||
res = cur.fetchall()
|
||||
log.info(res)
|
||||
assert len(res) > 0
|
||||
|
||||
cleanup(pg_version)
|
||||
|
||||
|
||||
# Test that extension is downloaded after endpoint restart,
|
||||
# when the library is used in the query.
|
||||
#
|
||||
# Run the test with mutliple simultaneous connections to an endpoint.
|
||||
# to ensure that the extension is downloaded only once.
|
||||
#
|
||||
# 4. Test that private extensions are only downloaded when they are present in the spec.
|
||||
#
|
||||
@pytest.mark.skip(reason="https://github.com/neondatabase/neon/issues/4949")
|
||||
def test_extension_download_after_restart(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
# TODO: PG15 + PG16 extension building
|
||||
if "v14" not in pg_version: # test set only has extension built for v14
|
||||
return None
|
||||
|
||||
neon_env_builder.enable_extensions_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id)
|
||||
|
||||
assert env.ext_remote_storage is not None # satisfy mypy
|
||||
|
||||
# For MOCK_S3 we upload test files.
|
||||
upload_files(env)
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_extension_download_after_restart",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE extension pg_buffercache;")
|
||||
cur.execute("SELECT * from pg_buffercache;")
|
||||
res = cur.fetchall()
|
||||
assert len(res) > 0
|
||||
log.info(res)
|
||||
|
||||
# shutdown compute node
|
||||
endpoint.stop()
|
||||
# remove extension files locally
|
||||
cleanup(pg_version)
|
||||
|
||||
# spin up compute node again (there are no extension files available, because compute is stateless)
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_extension_download_after_restart",
|
||||
tenant_id=tenant_id,
|
||||
remote_ext_config=env.ext_remote_storage.to_string(),
|
||||
config_lines=["log_min_messages=debug3"],
|
||||
)
|
||||
|
||||
# connect to compute node and run the query
|
||||
# that will trigger the download of the extension
|
||||
def run_query(endpoint, thread_id: int):
|
||||
log.info("thread_id {%d} starting", thread_id)
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT * from pg_buffercache;")
|
||||
res = cur.fetchall()
|
||||
assert len(res) > 0
|
||||
log.info("thread_id {%d}, res = %s", thread_id, res)
|
||||
|
||||
threads = [threading.Thread(target=run_query, args=(endpoint, i)) for i in range(2)]
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
cleanup(pg_version)
|
||||
|
||||
@@ -26,3 +26,13 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
|
||||
# If the version has changed, the test should be updated.
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.1",)
|
||||
|
||||
# create test database
|
||||
cur.execute("CREATE DATABASE foodb")
|
||||
|
||||
# connect to test database
|
||||
# test that neon extension is installed and has the correct version
|
||||
with closing(endpoint_main.connect(dbname="foodb")) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT extversion from pg_extension where extname='neon'")
|
||||
assert cur.fetchone() == ("1.1",)
|
||||
|
||||
@@ -336,15 +336,10 @@ def test_live_reconfig_get_evictions_low_residence_duration_metric_threshold(
|
||||
):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
# disable compaction so that it will not download the layer for repartitioning
|
||||
"compaction_period": "0s"
|
||||
}
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
|
||||
(tenant_id, timeline_id) = env.initial_tenant, env.initial_timeline
|
||||
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
def get_metric():
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import enum
|
||||
import os
|
||||
import shutil
|
||||
from threading import Thread
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -28,7 +27,7 @@ from fixtures.remote_storage import (
|
||||
available_s3_storages,
|
||||
)
|
||||
from fixtures.types import TenantId
|
||||
from fixtures.utils import run_pg_bench_small, wait_until
|
||||
from fixtures.utils import run_pg_bench_small
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@@ -400,78 +399,4 @@ def test_tenant_delete_is_resumed_on_attach(
|
||||
)
|
||||
|
||||
|
||||
def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder):
|
||||
"""Reproduction of 2023-11-23 stuck tenants investigation"""
|
||||
|
||||
# do not use default tenant/timeline creation because it would output the failpoint log message too early
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# happens with the cancellation bailing flushing loop earlier, leaving disk_consistent_lsn at zero
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*Timeline got dropped without initializing, cleaning its files"
|
||||
)
|
||||
# the response hit_pausable_failpoint_and_later_fail
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*Error processing HTTP request: InternalServerError\\(new timeline {env.initial_tenant}/{env.initial_timeline} has invalid disk_consistent_lsn"
|
||||
)
|
||||
|
||||
pageserver_http.tenant_create(env.initial_tenant)
|
||||
|
||||
failpoint = "flush-layer-cancel-after-writing-layer-out-pausable"
|
||||
pageserver_http.configure_failpoints((failpoint, "pause"))
|
||||
|
||||
def hit_pausable_failpoint_and_later_fail():
|
||||
with pytest.raises(
|
||||
PageserverApiException, match="new timeline \\S+ has invalid disk_consistent_lsn"
|
||||
):
|
||||
pageserver_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
|
||||
def start_deletion():
|
||||
pageserver_http.tenant_delete(env.initial_tenant)
|
||||
|
||||
def has_hit_failpoint():
|
||||
assert env.pageserver.log_contains(f"at failpoint {failpoint}") is not None
|
||||
|
||||
def deletion_has_started_waiting_for_timelines():
|
||||
assert env.pageserver.log_contains("Waiting for timelines...") is not None
|
||||
|
||||
def tenant_is_deleted():
|
||||
try:
|
||||
pageserver_http.tenant_status(env.initial_tenant)
|
||||
except PageserverApiException as e:
|
||||
assert e.status_code == 404
|
||||
else:
|
||||
raise RuntimeError("tenant was still accessible")
|
||||
|
||||
creation = Thread(target=hit_pausable_failpoint_and_later_fail)
|
||||
creation.start()
|
||||
|
||||
deletion = None
|
||||
|
||||
try:
|
||||
wait_until(10, 1, has_hit_failpoint)
|
||||
|
||||
# it should start ok, sync up with the stuck creation, then fail because disk_consistent_lsn was not updated
|
||||
# then deletion should fail and set the tenant broken
|
||||
deletion = Thread(target=start_deletion)
|
||||
deletion.start()
|
||||
|
||||
wait_until(10, 1, deletion_has_started_waiting_for_timelines)
|
||||
|
||||
pageserver_http.configure_failpoints((failpoint, "off"))
|
||||
|
||||
creation.join()
|
||||
deletion.join()
|
||||
|
||||
wait_until(10, 1, tenant_is_deleted)
|
||||
finally:
|
||||
creation.join()
|
||||
if deletion is not None:
|
||||
deletion.join()
|
||||
|
||||
|
||||
# TODO test concurrent deletions with "hang" failpoint
|
||||
|
||||
@@ -134,11 +134,10 @@ def wait_for_pageserver_catchup(endpoint_main: Endpoint, polling_interval=1, tim
|
||||
res = endpoint_main.safe_psql(
|
||||
"""
|
||||
SELECT
|
||||
pg_size_pretty(neon.pg_cluster_size()),
|
||||
pg_size_pretty(pg_cluster_size()),
|
||||
pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
|
||||
FROM neon.backpressure_lsns();
|
||||
""",
|
||||
dbname="postgres",
|
||||
FROM backpressure_lsns();
|
||||
"""
|
||||
)[0]
|
||||
log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
|
||||
received_lsn_lag = res[1]
|
||||
@@ -215,7 +214,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
wait_for_pageserver_catchup(endpoint_main)
|
||||
|
||||
cur.execute("SELECT * from pg_size_pretty(neon.pg_cluster_size())")
|
||||
cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
|
||||
pg_cluster_size = cur.fetchone()
|
||||
log.info(f"pg_cluster_size = {pg_cluster_size}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user