Compare commits

..

4 Commits

Author SHA1 Message Date
John Spray
94e1d80a64 scrubber: make scan-metadata enumerate relics & unreadable timelines 2023-12-08 14:48:41 +00:00
John Spray
a28d91e8bc scrubber: report on generation-ful-ness of indices 2023-12-08 13:59:42 +00:00
John Spray
041d610fbe scrubber: handle initdb files 2023-12-08 13:49:40 +00:00
John Spray
ec03c29644 scrubber: only trim prefix if it ends with / 2023-12-08 13:49:40 +00:00
40 changed files with 513 additions and 523 deletions

View File

@@ -199,10 +199,6 @@ jobs:
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- name: Checkout
uses: actions/checkout@v3
@@ -1101,10 +1097,6 @@ jobs:
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- name: Checkout
uses: actions/checkout@v3

View File

@@ -142,10 +142,6 @@ jobs:
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- name: Checkout
uses: actions/checkout@v4
@@ -242,20 +238,6 @@ jobs:
options: --init
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
for r in 14 15 16; do
git config --global --add safe.directory "${{ github.workspace }}/vendor/postgres-v$r"
git config --global --add safe.directory "${GITHUB_WORKSPACE}/vendor/postgres-v$r"
done
- name: Checkout
uses: actions/checkout@v4
with:

45
Cargo.lock generated
View File

@@ -44,12 +44,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "allocator-api2"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -896,7 +890,7 @@ checksum = "a246e68bb43f6cd9db24bea052a53e40405417c5fb372e3d1a8a7f770a564ef5"
dependencies = [
"memchr",
"once_cell",
"regex-automata 0.1.10",
"regex-automata",
"serde",
]
@@ -2048,10 +2042,6 @@ name = "hashbrown"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
dependencies = [
"ahash",
"allocator-api2",
]
[[package]]
name = "hashlink"
@@ -2543,7 +2533,7 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
"regex-automata",
]
[[package]]
@@ -2569,9 +2559,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "memchr"
version = "2.6.4"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
@@ -3820,14 +3810,13 @@ dependencies = [
[[package]]
name = "regex"
version = "1.10.2"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
checksum = "d1a59b5d8e97dee33696bf13c5ba8ab85341c002922fba050069326b9c498974"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"regex-syntax 0.7.2",
]
[[package]]
@@ -3839,17 +3828,6 @@ dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
name = "regex-automata"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.2",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
@@ -3858,9 +3836,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.2"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
[[package]]
name = "relative-path"
@@ -5246,8 +5224,6 @@ dependencies = [
"futures-core",
"futures-io",
"futures-sink",
"futures-util",
"hashbrown 0.14.0",
"pin-project-lite",
"tokio",
"tracing",
@@ -6243,8 +6219,7 @@ dependencies = [
"prost",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"regex-syntax 0.7.2",
"reqwest",
"ring 0.16.20",
"rustls",

View File

@@ -109,7 +109,7 @@ pin-project-lite = "0.2"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"
regex = "1.10.2"
regex = "1.4"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_19"] }
reqwest-middleware = "0.2.0"
@@ -149,7 +149,7 @@ tokio-postgres-rustls = "0.10.0"
tokio-rustls = "0.24"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
tokio-util = { version = "0.7", features = ["io"] }
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}

View File

@@ -252,7 +252,7 @@ fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()>
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
THEN
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION IN ROLE pg_read_all_data, pg_write_all_data;
IF array_length(roles, 1) IS NOT NULL THEN
EXECUTE format('GRANT neon_superuser TO %s',
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));

View File

@@ -193,11 +193,16 @@ impl Escaping for PgIdent {
/// Build a list of existing Postgres roles
pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result<Vec<Role>> {
let postgres_roles = xact
.query("SELECT rolname, rolpassword FROM pg_catalog.pg_authid", &[])?
.query(
"SELECT rolname, rolpassword, rolreplication, rolbypassrls FROM pg_catalog.pg_authid",
&[],
)?
.iter()
.map(|row| Role {
name: row.get("rolname"),
encrypted_password: row.get("rolpassword"),
replication: Some(row.get("rolreplication")),
bypassrls: Some(row.get("rolbypassrls")),
options: None,
})
.collect();

View File

@@ -252,6 +252,8 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let action = if let Some(r) = pg_role {
if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
|| (r.encrypted_password.is_some() && role.encrypted_password.is_none())
|| !r.bypassrls.unwrap_or(false)
|| !r.replication.unwrap_or(false)
{
RoleAction::Update
} else if let Some(pg_pwd) = &r.encrypted_password {
@@ -283,22 +285,14 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
match action {
RoleAction::None => {}
RoleAction::Update => {
// This can be run on /every/ role! Not just ones created through the console.
// This means that if you add some funny ALTER here that adds a permission,
// this will get run even on user-created roles! This will result in different
// behavior before and after a spec gets reapplied. The below ALTER as it stands
// now only grants LOGIN and changes the password. Please do not allow this branch
// to do anything silly.
let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
let mut query: String =
format!("ALTER ROLE {} BYPASSRLS REPLICATION", name.pg_quote());
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
}
RoleAction::Create => {
// This branch only runs when roles are created through the console, so it is
// safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
// from neon_superuser.
let mut query: String = format!(
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB IN ROLE neon_superuser",
"CREATE ROLE {} CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
name.pg_quote()
);
info!("role create query: '{}'", &query);

View File

@@ -207,6 +207,8 @@ pub struct DeltaOp {
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub replication: Option<bool>,
pub bypassrls: Option<bool>,
pub options: GenericOptions,
}

View File

@@ -1,14 +1,16 @@
use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(TaskTrackerToken);
pub struct Completion(mpsc::Sender<()>);
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(TaskTracker);
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
impl Default for Barrier {
fn default() -> Self {
@@ -19,7 +21,7 @@ impl Default for Barrier {
impl Barrier {
pub async fn wait(self) {
self.0.wait().await;
self.0.lock().await.recv().await;
}
pub async fn maybe_wait(barrier: Option<Barrier>) {
@@ -31,7 +33,8 @@ impl Barrier {
impl PartialEq for Barrier {
fn eq(&self, other: &Self) -> bool {
TaskTracker::ptr_eq(&self.0, &other.0)
// we don't use dyn so this is good
Arc::ptr_eq(&self.0, &other.0)
}
}
@@ -39,10 +42,8 @@ impl Eq for Barrier {}
/// Create new Guard and Barrier pair.
pub fn channel() -> (Completion, Barrier) {
let tracker = TaskTracker::new();
// otherwise wait never exits
tracker.close();
let token = tracker.token();
(Completion(token), Barrier(tracker))
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
}

View File

@@ -425,6 +425,7 @@ fn start_pageserver(
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx;
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
@@ -559,6 +560,7 @@ fn start_pageserver(
}
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let background_jobs_barrier = background_jobs_barrier;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.

View File

@@ -281,18 +281,12 @@ async fn calculate_synthetic_size_worker(
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
// which turns out is really handy to understand the system.
if let Err(e) = tenant.calculate_synthetic_size(cause, cancel, ctx).await {
// this error can be returned if timeline is shutting down, but it does not
// mean the synthetic size worker should terminate. we do not need any checks
// in this function because `mgr::get_tenant` will error out after shutdown has
// progressed to shutting down tenants.
let is_cancelled = matches!(
e.downcast_ref::<PageReconstructError>(),
Some(PageReconstructError::Cancelled)
);
if !is_cancelled {
error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
if let Some(PageReconstructError::Cancelled) =
e.downcast_ref::<PageReconstructError>()
{
return Ok(());
}
error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
}
}
}
@@ -305,7 +299,7 @@ async fn calculate_synthetic_size_worker(
let res = tokio::time::timeout_at(
started_at + synthetic_size_calculation_interval,
cancel.cancelled(),
task_mgr::shutdown_token().cancelled(),
)
.await;
if res.is_ok() {

View File

@@ -285,63 +285,6 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
},
});
pub(crate) mod page_cache_eviction_metrics {
use std::num::NonZeroUsize;
use metrics::{register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
#[derive(Clone, Copy)]
pub(crate) enum Outcome {
FoundSlotUnused { iters: NonZeroUsize },
FoundSlotEvicted { iters: NonZeroUsize },
ItersExceeded { iters: NonZeroUsize },
}
static ITERS_TOTAL_VEC: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_find_victim_iters_total",
"Counter for the number of iterations in the find_victim loop",
&["outcome"],
)
.expect("failed to define a metric")
});
static CALLS_VEC: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_find_victim_calls",
"Incremented at the end of each find_victim() call.\
Filter by outcome to get e.g., eviction rate.",
&["outcome"]
)
.unwrap()
});
pub(crate) fn observe(outcome: Outcome) {
macro_rules! dry {
($label:literal, $iters:expr) => {{
static LABEL: &'static str = $label;
static ITERS_TOTAL: Lazy<IntCounter> =
Lazy::new(|| ITERS_TOTAL_VEC.with_label_values(&[LABEL]));
static CALLS: Lazy<IntCounter> =
Lazy::new(|| CALLS_VEC.with_label_values(&[LABEL]));
ITERS_TOTAL.inc_by(($iters.get()) as u64);
CALLS.inc();
}};
}
match outcome {
Outcome::FoundSlotUnused { iters } => dry!("found_empty", iters),
Outcome::FoundSlotEvicted { iters } => {
dry!("found_evicted", iters)
}
Outcome::ItersExceeded { iters } => {
dry!("err_iters_exceeded", iters);
super::page_cache_errors_inc(super::PageCacheErrorKind::EvictIterLimit);
}
}
}
}
pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_page_cache_acquire_pinned_slot_seconds",
@@ -351,6 +294,14 @@ pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) static PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_cache_find_victim_iters_total",
"Counter for the number of iterations in the find_victim loop",
)
.expect("failed to define a metric")
});
static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"page_cache_errors_total",
@@ -891,26 +842,6 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
)
.expect("failed to define a metric")
});
pub(crate) mod virtual_file_descriptor_cache {
use super::*;
pub(crate) static SIZE_MAX: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_virtual_file_descriptor_cache_size_max",
"Maximum number of open file descriptors in the cache."
)
.unwrap()
});
// SIZE_CURRENT: derive it like so:
// ```
// sum (pageserver_io_operations_seconds_count{operation=~"^(open|open-after-replace)$")
// -ignoring(operation)
// sum(pageserver_io_operations_seconds_count{operation=~"^(close|close-by-replace)$"}
// ```
}
#[derive(Debug)]
struct GlobalAndPerTimelineHistogram {
global: Histogram,

View File

@@ -88,11 +88,7 @@ use utils::{
lsn::Lsn,
};
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
repository::Key,
};
use crate::{context::RequestContext, metrics::PageCacheSizeMetrics, repository::Key};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
@@ -901,10 +897,8 @@ impl PageCache {
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::ItersExceeded {
iters: iters.try_into().unwrap(),
},
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
);
anyhow::bail!("exceeded evict iter limit");
}
@@ -915,18 +909,8 @@ impl PageCache {
// remove mapping for old buffer
self.remove_mapping(old_key);
inner.key = None;
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
iters: iters.try_into().unwrap(),
},
);
} else {
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::FoundSlotUnused {
iters: iters.try_into().unwrap(),
},
);
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
}
}

View File

@@ -654,7 +654,6 @@ pub fn init(num_slots: usize) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;

View File

@@ -61,7 +61,6 @@ thiserror.workspace = true
tls-listener.workspace = true
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal"] }
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
@@ -78,6 +77,7 @@ postgres-protocol.workspace = true
smol_str.workspace = true
workspace_hack.workspace = true
tokio-util.workspace = true
[dev-dependencies]
rcgen.workspace = true

View File

@@ -3,7 +3,7 @@
use crate::{
auth::password_hack::parse_endpoint_param,
error::UserFacingError,
proxy::{neon_options_str, NUM_CONNECTION_ACCEPTED_BY_SNI},
proxy::{neon_options, NUM_CONNECTION_ACCEPTED_BY_SNI},
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -140,7 +140,7 @@ impl ClientCredentials {
let cache_key = format!(
"{}{}",
project.as_deref().unwrap_or(""),
neon_options_str(params)
neon_options(params).unwrap_or("".to_string())
)
.into();
@@ -406,7 +406,10 @@ mod tests {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let creds = ClientCredentials::parse(&options, sni, common_names, peer_addr)?;
assert_eq!(creds.project.as_deref(), Some("project"));
assert_eq!(creds.cache_key, "projectendpoint_type:read_write lsn:0/2");
assert_eq!(
creds.cache_key,
"projectneon_endpoint_type:read_write neon_lsn:0/2"
);
Ok(())
}

View File

@@ -8,7 +8,6 @@ use std::{net::SocketAddr, sync::Arc};
use futures::future::Either;
use itertools::Itertools;
use proxy::config::TlsServerEndPoint;
use proxy::proxy::run_until_cancelled;
use tokio::net::TcpListener;
use anyhow::{anyhow, bail, ensure, Context};
@@ -21,7 +20,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use utils::{project_git_version, sentry_init::init_sentry};
use tracing::{error, info, Instrument};
use tracing::{error, info, warn, Instrument};
project_git_version!(GIT_VERSION);
@@ -152,39 +151,63 @@ async fn task_main(
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let mut connections = tokio::task::JoinSet::new();
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
loop {
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
let session_id = uuid::Uuid::new_v4();
let tls_config = Arc::clone(&tls_config);
let dest_suffix = Arc::clone(&dest_suffix);
let session_id = uuid::Uuid::new_v4();
let tls_config = Arc::clone(&tls_config);
let dest_suffix = Arc::clone(&dest_suffix);
connections.spawn(
async move {
socket
.set_nodelay(true)
.context("failed to set socket option")?;
connections.spawn(
async move {
socket
.set_nodelay(true)
.context("failed to set socket option")?;
info!(%peer_addr, "serving");
handle_client(dest_suffix, tls_config, tls_server_end_point, socket).await
info!(%peer_addr, "serving");
handle_client(dest_suffix, tls_config, tls_server_end_point, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
})
.instrument(tracing::info_span!("handle_client", ?session_id))
);
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
})
.instrument(tracing::info_span!("handle_client", ?session_id)),
);
// Don't modify this unless you read https://docs.rs/tokio/latest/tokio/macro.select.html carefully.
// If this future completes and the pattern doesn't match, this branch is disabled for this call to `select!`.
// This only counts for this loop and it will be enabled again on next `select!`.
//
// Prior code had this as `Some(Err(e))` which _looks_ equivalent to the current setup, but it's not.
// When `connections.join_next()` returned `Some(Ok(()))` (which we expect), it would disable the join_next and it would
// not get called again, even if there are more connections to remove.
Some(res) = connections.join_next() => {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
_ = cancellation_token.cancelled() => {
drop(listener);
break;
}
}
}
connections.close();
drop(listener);
connections.wait().await;
// Drain connections
info!("waiting for all client connections to finish");
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
info!("all client connections have finished");
Ok(())
}

View File

@@ -1,6 +1,6 @@
use crate::{
auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError,
error::UserFacingError, proxy::neon_option,
error::UserFacingError, proxy::is_neon_param,
};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
@@ -275,7 +275,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| parse_endpoint_param(opt).is_none() && neon_option(opt).is_none())
.filter(|opt| parse_endpoint_param(opt).is_none() && !is_neon_param(opt))
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();

View File

@@ -201,18 +201,7 @@ pub struct ConsoleReqExtra<'a> {
pub session_id: uuid::Uuid,
/// Name of client application, if set.
pub application_name: Option<&'a str>,
pub options: Vec<(String, String)>,
}
impl<'a> ConsoleReqExtra<'a> {
// https://swagger.io/docs/specification/serialization/ DeepObject format
// paramName[prop1]=value1&paramName[prop2]=value2&....
pub fn options_as_deep_object(&self) -> Vec<(String, String)> {
self.options
.iter()
.map(|(k, v)| (format!("options[{}]", k), v.to_string()))
.collect()
}
pub options: Option<&'a str>,
}
/// Auth secret which is managed by the cloud.

View File

@@ -106,7 +106,7 @@ impl Api {
) -> Result<NodeInfo, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
async {
let mut request_builder = self
let request = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", &request_id)
@@ -115,14 +115,9 @@ impl Api {
.query(&[
("application_name", extra.application_name),
("project", Some(&creds.endpoint)),
]);
request_builder = if extra.options.is_empty() {
request_builder
} else {
request_builder.query(&extra.options_as_deep_object())
};
let request = request_builder.build()?;
("options", extra.options),
])
.build()?;
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();

View File

@@ -277,21 +277,6 @@ static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
pub async fn run_until_cancelled<F: std::future::Future>(
f: F,
cancellation_token: &CancellationToken,
) -> Option<F::Output> {
match futures::future::select(
std::pin::pin!(f),
std::pin::pin!(cancellation_token.cancelled()),
)
.await
{
futures::future::Either::Left((f, _)) => Some(f),
futures::future::Either::Right(((), _)) => None,
}
}
pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
@@ -305,62 +290,71 @@ pub async fn task_main(
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let mut connections = tokio::task::JoinSet::new();
let cancel_map = Arc::new(CancelMap::default());
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
loop {
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
connections.spawn(
async move {
info!("accepted postgres client connection");
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
connections.spawn(
async move {
info!("accepted postgres client connection");
let mut socket = WithClientIp::new(socket);
let mut peer_addr = peer_addr;
if let Some(ip) = socket.wait_for_addr().await? {
peer_addr = ip;
tracing::Span::current().record("peer_addr", &tracing::field::display(ip));
} else if config.require_client_ip {
bail!("missing required client IP");
}
let mut socket = WithClientIp::new(socket);
let mut peer_addr = peer_addr;
if let Some(ip) = socket.wait_for_addr().await? {
peer_addr = ip;
tracing::Span::current().record("peer_addr", &tracing::field::display(ip));
} else if config.require_client_ip {
bail!("missing required client IP");
}
socket
.inner
.set_nodelay(true)
.context("failed to set socket option")?;
socket
.inner
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(
config,
&cancel_map,
session_id,
socket,
ClientMode::Tcp,
peer_addr.ip(),
)
.await
handle_client(config, &cancel_map, session_id, socket, ClientMode::Tcp, peer_addr.ip()).await
}
.instrument(info_span!("handle_client", ?session_id, peer_addr = tracing::field::Empty))
.unwrap_or_else(move |e| {
// Acknowledge that the task has finished with an error.
error!(?session_id, "per-client task finished with an error: {e:#}");
}),
);
}
.instrument(info_span!(
"handle_client",
?session_id,
peer_addr = tracing::field::Empty
))
.unwrap_or_else(move |e| {
// Acknowledge that the task has finished with an error.
error!(?session_id, "per-client task finished with an error: {e:#}");
}),
);
// Don't modify this unless you read https://docs.rs/tokio/latest/tokio/macro.select.html carefully.
// If this future completes and the pattern doesn't match, this branch is disabled for this call to `select!`.
// This only counts for this loop and it will be enabled again on next `select!`.
//
// Prior code had this as `Some(Err(e))` which _looks_ equivalent to the current setup, but it's not.
// When `connections.join_next()` returned `Some(Ok(()))` (which we expect), it would disable the join_next and it would
// not get called again, even if there are more connections to remove.
Some(res) = connections.join_next() => {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
_ = cancellation_token.cancelled() => {
drop(listener);
break;
}
}
}
connections.close();
drop(listener);
// Drain connections
connections.wait().await;
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
Ok(())
}
@@ -968,10 +962,12 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
allow_self_signed_compute,
} = self;
let console_options = neon_options(params);
let extra = console::ConsoleReqExtra {
session_id, // aka this connection's id
application_name: params.get("application_name"),
options: neon_options(params),
options: console_options.as_deref(),
};
let mut latency_timer = LatencyTimer::new(mode.protocol_label());
@@ -1031,29 +1027,26 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
}
}
pub fn neon_options(params: &StartupMessageParams) -> Vec<(String, String)> {
pub fn neon_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
match params.options_raw() {
Some(options) => options.filter_map(neon_option).collect(),
None => vec![],
}
}
pub fn neon_options_str(params: &StartupMessageParams) -> String {
#[allow(unstable_name_collisions)]
neon_options(params)
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
let options: String = params
.options_raw()?
.filter(|opt| is_neon_param(opt))
.sorted() // we sort it to use as cache key
.intersperse(" ".to_owned())
.collect()
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
// Don't even bother with empty options.
if options.is_empty() {
return None;
}
Some(options)
}
pub fn neon_option(bytes: &str) -> Option<(String, String)> {
pub fn is_neon_param(bytes: &str) -> bool {
static RE: OnceCell<Regex> = OnceCell::new();
let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").unwrap());
RE.get_or_init(|| Regex::new(r"^neon_\w+:").unwrap());
let cap = re.captures(bytes)?;
let (_, [k, v]) = cap.extract();
Some((k.to_owned(), v.to_owned()))
RE.get().unwrap().is_match(bytes)
}

View File

@@ -491,7 +491,7 @@ fn helper_create_connect_info(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("TEST"),
options: vec![],
options: None,
};
let creds = auth::BackendType::Test(mechanism);
(cache, extra, creds)

View File

@@ -10,7 +10,6 @@ use anyhow::bail;
use hyper::StatusCode;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio_util::task::TaskTracker;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
use crate::proxy::{NUM_CLIENT_CONNECTION_CLOSED_COUNTER, NUM_CLIENT_CONNECTION_OPENED_COUNTER};
@@ -71,9 +70,6 @@ pub async fn task_main(
incoming: addr_incoming,
};
let ws_connections = tokio_util::task::task_tracker::TaskTracker::new();
ws_connections.close(); // allows `ws_connections.wait to complete`
let tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
if let Err(err) = conn {
error!("failed to accept TLS connection for websockets: {err:?}");
@@ -90,7 +86,6 @@ pub async fn task_main(
let remote_addr = io.inner.remote_addr();
let sni_name = tls.server_name().map(|s| s.to_string());
let conn_pool = conn_pool.clone();
let ws_connections = ws_connections.clone();
async move {
let peer_addr = match client_addr {
@@ -102,7 +97,6 @@ pub async fn task_main(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
let ws_connections = ws_connections.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
@@ -112,7 +106,6 @@ pub async fn task_main(
req,
config,
conn_pool,
ws_connections,
cancel_map,
session_id,
sni_name,
@@ -136,9 +129,6 @@ pub async fn task_main(
.with_graceful_shutdown(cancellation_token.cancelled())
.await?;
// await websocket connections
ws_connections.wait().await;
Ok(())
}
@@ -180,12 +170,10 @@ where
}
}
#[allow(clippy::too_many_arguments)]
async fn request_handler(
mut request: Request<Body>,
config: &'static ProxyConfig,
conn_pool: Arc<conn_pool::GlobalConnPool>,
ws_connections: TaskTracker,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
sni_hostname: Option<String>,
@@ -205,7 +193,7 @@ async fn request_handler(
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
.map_err(|e| ApiError::BadRequest(e.into()))?;
ws_connections.spawn(
tokio::spawn(
async move {
if let Err(e) = websocket::serve_websocket(
websocket,

View File

@@ -433,7 +433,7 @@ async fn connect_to_compute(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
options: console_options,
options: console_options.as_deref(),
};
// TODO(anna): this is a bit hacky way, consider using console notification listener.
if !config.disable_ip_check_for_http {

View File

@@ -142,7 +142,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
.collect();
if !orphan_layers.is_empty() {
result.errors.push(format!(
result.warnings.push(format!(
"index_part.json does not contain layers from S3: {:?}",
orphan_layers
.iter()
@@ -170,6 +170,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
));
}
}
BlobDataParseResult::Relic => {}
BlobDataParseResult::Incorrect(parse_errors) => result.errors.extend(
parse_errors
.into_iter()
@@ -215,6 +216,8 @@ pub(crate) enum BlobDataParseResult {
index_part_generation: Generation,
s3_layers: HashSet<(LayerFileName, Generation)>,
},
/// The remains of a deleted Timeline (i.e. an initdb archive only)
Relic,
Incorrect(Vec<String>),
}
@@ -245,6 +248,7 @@ pub(crate) async fn list_timeline_blobs(
timeline_dir_target.delimiter = String::new();
let mut index_parts: Vec<ObjectIdentifier> = Vec::new();
let mut initdb_archive: bool = false;
let stream = stream_listing(s3_client, &timeline_dir_target);
pin_mut!(stream);
@@ -258,6 +262,10 @@ pub(crate) async fn list_timeline_blobs(
tracing::info!("Index key {key}");
index_parts.push(obj)
}
Some("initdb.tar.zst") => {
tracing::info!("initdb archive {key}");
initdb_archive = true;
}
Some(maybe_layer_name) => match parse_layer_object_name(maybe_layer_name) {
Ok((new_layer, gen)) => {
tracing::info!("Parsed layer key: {} {:?}", new_layer, gen);
@@ -279,6 +287,16 @@ pub(crate) async fn list_timeline_blobs(
}
}
if index_parts.is_empty() && s3_layers.is_empty() && initdb_archive {
tracing::info!(
"Timeline is empty apart from initdb archive: expected post-deletion state."
);
return Ok(S3TimelineBlobData {
blob_data: BlobDataParseResult::Relic,
keys_to_remove: Vec::new(),
});
}
// Choose the index_part with the highest generation
let (index_part_object, index_part_generation) = match index_parts
.iter()

View File

@@ -86,7 +86,9 @@ impl S3Target {
if new_self.prefix_in_bucket.is_empty() {
new_self.prefix_in_bucket = format!("/{}/", new_segment);
} else {
let _ = new_self.prefix_in_bucket.pop();
if new_self.prefix_in_bucket.ends_with('/') {
let _ = new_self.prefix_in_bucket.pop();
}
new_self.prefix_in_bucket =
[&new_self.prefix_in_bucket, new_segment, ""].join(&new_self.delimiter);
}

View File

@@ -20,6 +20,14 @@ pub struct MetadataSummary {
with_warnings: HashSet<TenantTimelineId>,
with_garbage: HashSet<TenantTimelineId>,
indices_by_version: HashMap<usize, usize>,
indices_with_generation: usize,
indices_without_generation: usize,
/// Timelines that couldn't even parse metadata and/or object keys: extremely damaged
invalid_count: usize,
/// Timelines with just an initdb archive, left behind after deletion.
relic_count: usize,
layer_count: MinMaxHisto,
timeline_size_bytes: MinMaxHisto,
@@ -39,6 +47,8 @@ impl MinMaxHisto {
fn new() -> Self {
Self {
histo: histogram::Histogram::builder()
// Accomodate tenant sizes up to 32TiB
.maximum_value(32 * 1024 * 1024 * 1024 * 1024)
.build()
.expect("Bad histogram params"),
min: u64::MAX,
@@ -90,6 +100,10 @@ impl MetadataSummary {
with_warnings: HashSet::new(),
with_garbage: HashSet::new(),
indices_by_version: HashMap::new(),
indices_with_generation: 0,
indices_without_generation: 0,
invalid_count: 0,
relic_count: 0,
layer_count: MinMaxHisto::new(),
timeline_size_bytes: MinMaxHisto::new(),
layer_size_bytes: MinMaxHisto::new(),
@@ -111,24 +125,35 @@ impl MetadataSummary {
fn update_data(&mut self, data: &S3TimelineBlobData) {
self.count += 1;
if let BlobDataParseResult::Parsed {
index_part,
index_part_generation: _,
s3_layers: _,
} = &data.blob_data
{
*self
.indices_by_version
.entry(index_part.get_version())
.or_insert(0) += 1;
match &data.blob_data {
BlobDataParseResult::Parsed {
index_part,
index_part_generation,
s3_layers: _,
} => {
*self
.indices_by_version
.entry(index_part.get_version())
.or_insert(0) += 1;
if let Err(e) = self.update_histograms(index_part) {
// Value out of range? Warn that the results are untrustworthy
tracing::warn!(
"Error updating histograms, summary stats may be wrong: {}",
e
);
// These statistics exist to track the transition to generations. By early 2024 there should be zero
// generation-less timelines in the field and this check can be removed.
if index_part_generation.is_none() {
self.indices_without_generation += 1;
} else {
self.indices_with_generation += 1;
}
if let Err(e) = self.update_histograms(index_part) {
// Value out of range? Warn that the results are untrustworthy
tracing::warn!(
"Error updating histograms, summary stats may be wrong: {}",
e
);
}
}
BlobDataParseResult::Incorrect(_) => self.invalid_count += 1,
BlobDataParseResult::Relic => self.relic_count += 1,
}
}
@@ -156,7 +181,10 @@ impl MetadataSummary {
With errors: {1}
With warnings: {2}
With garbage: {3}
Invalid: {9}
Relics: {10}
Index versions: {version_summary}
Indices with/without generations: {7}/{8}
Timeline size bytes: {4}
Layer size bytes: {5}
Timeline layer count: {6}
@@ -168,6 +196,10 @@ Timeline layer count: {6}
self.timeline_size_bytes.oneline(),
self.layer_size_bytes.oneline(),
self.layer_count.oneline(),
self.indices_with_generation,
self.indices_without_generation,
self.invalid_count,
self.relic_count
)
}

View File

@@ -56,7 +56,6 @@ from fixtures.remote_storage import (
RemoteStorageKind,
RemoteStorageUser,
S3Storage,
default_remote_storage,
remote_storage_to_toml_inline_table,
)
from fixtures.types import Lsn, TenantId, TimelineId
@@ -469,7 +468,7 @@ class NeonEnvBuilder:
# Cannot create more than one environment from one builder
assert self.env is None, "environment already initialized"
if default_remote_storage_if_missing and self.pageserver_remote_storage is None:
self.enable_pageserver_remote_storage(default_remote_storage())
self.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
self.env = NeonEnv(self)
return self.env

View File

@@ -372,13 +372,6 @@ def s3_storage() -> RemoteStorageKind:
return RemoteStorageKind.MOCK_S3
def default_remote_storage() -> RemoteStorageKind:
"""
The remote storage kind used in tests that do not specify a preference
"""
return RemoteStorageKind.LOCAL_FS
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):

View File

@@ -35,11 +35,6 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
# Because this test does a rapid series of restarts of the same node, it's possible that
# we are restarted again before we can clean up deletion lists form the previous generation,
# resulting in a subsequent startup logging a warning.
env.pageserver.allowed_errors.append(".*Dropping stale deletions for tenant.*")
for _ in range(5):
with pytest.raises(subprocess.SubprocessError):
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])

View File

@@ -5,6 +5,7 @@ import time
from collections import defaultdict
from typing import Any, DefaultDict, Dict, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -18,7 +19,7 @@ from fixtures.pageserver.utils import (
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until
@@ -44,7 +45,13 @@ def get_num_downloaded_layers(client: PageserverHttpClient):
# If you have a large relation, check that the pageserver downloads parts of it as
# require by queries.
#
def test_ondemand_download_large_rel(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_ondemand_download_large_rel(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# thinking about using a shared environment? the test assumes that global
# metrics are for single tenant.
env = neon_env_builder.init_start(
@@ -138,7 +145,13 @@ def test_ondemand_download_large_rel(neon_env_builder: NeonEnvBuilder):
# If you have a relation with a long history of updates, the pageserver downloads the layer
# files containing the history as needed by timetravel queries.
#
def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_ondemand_download_timetravel(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# thinking about using a shared environment? the test assumes that global
# metrics are for single tenant.
@@ -216,7 +229,8 @@ def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
assert filled_current_physical == filled_size, "we don't yet do layer eviction"
# Wait until generated image layers are uploaded to S3
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
env.pageserver.stop()

View File

@@ -314,10 +314,6 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
assert not config_path.exists(), "detach did not remove config file"
# The re-attach's increment of the generation number may invalidate deletion queue
# updates in flight from the previous attachment.
env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
env.pageserver.tenant_attach(tenant_id)
wait_until(
number_of_iterations=5,

View File

@@ -23,18 +23,23 @@ from fixtures.pageserver.utils import (
wait_until_tenant_active,
wait_until_tenant_state,
)
from fixtures.remote_storage import RemoteStorageKind, available_s3_storages, s3_storage
from fixtures.remote_storage import (
RemoteStorageKind,
available_remote_storages,
available_s3_storages,
)
from fixtures.types import TenantId
from fixtures.utils import run_pg_bench_small, wait_until
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenant_delete_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
neon_env_builder.pageserver_config_override = "test_remote_failures=1"
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -73,15 +78,16 @@ def test_tenant_delete_smoke(
run_pg_bench_small(pg_bin, endpoint.connstr())
wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
parent = timeline
@@ -94,15 +100,16 @@ def test_tenant_delete_smoke(
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
# Deletion updates the tenant count: the one default tenant remains
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
@@ -142,7 +149,9 @@ FAILPOINTS_BEFORE_BACKGROUND = [
def combinations():
result = []
remotes = available_s3_storages()
remotes = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
for remote_storage_kind in remotes:
for delete_failpoint in FAILPOINTS:
@@ -156,8 +165,8 @@ def combinations():
return result
@pytest.mark.parametrize("check", list(Check))
@pytest.mark.parametrize("remote_storage_kind, failpoint, simulate_failures", combinations())
@pytest.mark.parametrize("check", list(Check))
def test_delete_tenant_exercise_crash_safety_failpoints(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
@@ -205,15 +214,16 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
ps_http.configure_failpoints((failpoint, "return"))
@@ -266,23 +276,25 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
assert not tenant_dir.exists()
# Check remote is empty
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
allowed_postfix="initdb.tar.zst",
)
if remote_storage_kind in available_s3_storages():
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
allowed_postfix="initdb.tar.zst",
)
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenant_delete_is_resumed_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
@@ -302,15 +314,16 @@ def test_tenant_delete_is_resumed_on_attach(
wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id)
# sanity check, data should be there
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
# failpoint before we remove index_part from s3
failpoint = "timeline-delete-before-index-delete"
@@ -341,15 +354,16 @@ def test_tenant_delete_is_resumed_on_attach(
iterations=iterations,
)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
reason = tenant_info["state"]["data"]["reason"]
# failpoint may not be the only error in the stack
@@ -375,16 +389,17 @@ def test_tenant_delete_is_resumed_on_attach(
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
ps_http.deletion_queue_flush(execute=True)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
if remote_storage_kind in available_s3_storages():
ps_http.deletion_queue_flush(execute=True)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder):

View File

@@ -21,6 +21,7 @@ from fixtures.pageserver.utils import (
)
from fixtures.remote_storage import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
@@ -58,11 +59,16 @@ class ReattachMode(str, enum.Enum):
# Basic detach and re-attach test
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize(
"mode",
[ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
)
def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str):
def test_tenant_reattach(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# Exercise retry code path by making all uploads and downloads fail for the
# first time. The retries print INFO-messages to the log; we will check
# that they are present after the test.
@@ -181,13 +187,16 @@ num_rows = 100000
#
# I don't know what's causing that...
@pytest.mark.skip(reason="fixme")
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenant_reattach_while_busy(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
updates_started = 0
updates_finished = 0
updates_to_perform = 0
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
# Run random UPDATEs on test table. On failure, try again.
@@ -430,9 +439,13 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
should not be present in pageserver's memory"
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_detach_while_attaching(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -20,6 +20,7 @@ from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
@@ -448,9 +449,13 @@ def test_tenant_relocation(
# last-record LSN. We had a bug where GetPage incorrectly followed the
# timeline to the ancestor without waiting for the missing WAL to
# arrive.
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_emergency_relocate_with_branches_slow_replay(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
env.pageserver.is_testing_enabled_or_skip()
pageserver_http = env.pageserver.http_client()
@@ -598,9 +603,13 @@ def test_emergency_relocate_with_branches_slow_replay(
# exist. Update dbir" path (2), and inserts an entry in the
# DbDirectory with 'false' to indicate there is no PG_VERSION file.
#
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_emergency_relocate_with_branches_createdb(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -18,7 +18,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.remote_storage import RemoteStorageKind
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.types import Lsn, TenantId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -281,7 +281,13 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
assert post_detach_samples == set()
def test_pageserver_with_empty_tenants(neon_env_builder: NeonEnvBuilder):
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(

View File

@@ -11,6 +11,7 @@ import os
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -26,6 +27,7 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
@@ -58,7 +60,10 @@ async def all_tenants_workload(env: NeonEnv, tenants_endpoints):
await asyncio.gather(*workers)
def test_tenants_many(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
# FIXME: Is this expected?
@@ -213,7 +218,11 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder):
def test_tenant_redownloads_truncated_file_on_startup(
neon_env_builder: NeonEnvBuilder,
):
# we store the layer file length metadata, we notice on startup that a layer file is of wrong size, and proceed to redownload it.
remote_storage_kind = RemoteStorageKind.LOCAL_FS
# since we now store the layer file length metadata, we notice on startup that a layer file is of wrong size, and proceed to redownload it.
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

View File

@@ -29,7 +29,8 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
s3_storage,
available_remote_storages,
available_s3_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
@@ -141,11 +142,25 @@ DELETE_FAILPOINTS = [
]
def combinations():
result = []
remotes = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
for remote_storage_kind in remotes:
for delete_failpoint in DELETE_FAILPOINTS:
result.append((remote_storage_kind, delete_failpoint))
return result
# cover the two cases: remote storage configured vs not configured
@pytest.mark.parametrize("failpoint", DELETE_FAILPOINTS)
@pytest.mark.parametrize("remote_storage_kind, failpoint", combinations())
@pytest.mark.parametrize("check", list(Check))
def test_delete_timeline_exercise_crash_safety_failpoints(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
failpoint: str,
check: Check,
pg_bin: PgBin,
@@ -165,7 +180,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
7. Ensure failpoint is hit
8. Retry or restart without the failpoint and check the result.
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start(
@@ -186,17 +201,18 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
env.pageserver.allowed_errors.append(f".*{timeline_id}.*failpoint: {failpoint}")
# It appears when we stopped flush loop during deletion and then pageserver is stopped
@@ -300,9 +316,11 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
assert not (timeline_dir.parent / f"{timeline_id}.___deleted").exists()
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize("fill_branch", [True, False])
def test_timeline_resurrection_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
fill_branch: bool,
):
"""
@@ -311,6 +329,8 @@ def test_timeline_resurrection_on_attach(
Original issue: https://github.com/neondatabase/neon/issues/3560
"""
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
##### First start, insert data and upload it to the remote storage
env = neon_env_builder.init_start()
@@ -638,10 +658,20 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2)
@pytest.mark.parametrize(
"remote_storage_kind",
list(
filter(
lambda s: s in (RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3),
available_remote_storages(),
)
),
)
def test_timeline_delete_works_for_remote_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -774,11 +804,12 @@ def test_delete_orphaned_objects(
assert env.pageserver_remote_storage.index_path(env.initial_tenant, timeline_id).exists()
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_timeline_delete_resumed_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
@@ -793,17 +824,18 @@ def test_timeline_delete_resumed_on_attach(
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
# failpoint before we remove index_part from s3
failpoint = "timeline-delete-during-rm"
@@ -841,17 +873,18 @@ def test_timeline_delete_resumed_on_attach(
# failpoint may not be the only error in the stack
assert reason.endswith(f"failpoint: {failpoint}"), reason
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
"timelines",
str(timeline_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
"timelines",
str(timeline_id),
)
),
)
# now we stop pageserver and remove local tenant state
env.endpoints.stop_all()
@@ -872,14 +905,15 @@ def test_timeline_delete_resumed_on_attach(
tenant_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
assert not tenant_path.exists()
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(timeline_id),
"timelines",
str(timeline_id),
)
),
)
if remote_storage_kind in available_s3_storages():
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(timeline_id),
"timelines",
str(timeline_id),
)
),
)

View File

@@ -39,7 +39,10 @@ from fixtures.pageserver.utils import (
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind, default_remote_storage
from fixtures.remote_storage import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background
@@ -454,9 +457,10 @@ def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId,
return sk_wal_size_mb <= target_size_mb
def test_wal_backup(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -499,10 +503,11 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder):
)
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant

View File

@@ -53,8 +53,7 @@ num-traits = { version = "0.2", features = ["i128"] }
prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
regex-syntax = { version = "0.7" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "multipart", "rustls-tls", "stream"] }
ring = { version = "0.16", features = ["std"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
@@ -66,7 +65,7 @@ subtle = { version = "2" }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "timeout", "util"] }
@@ -91,8 +90,7 @@ memchr = { version = "2" }
nom = { version = "7" }
prost = { version = "0.11" }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
regex-syntax = { version = "0.7" }
serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] }