Compare commits

..

1 Commits

Author SHA1 Message Date
Heikki Linnakangas
f011af063d Increase timeout when stopping a service in tests.
I've seen a few test failures with error:

    pageserver stop failed: pageserver with pid 115851 did not stop in 10 seconds

These have all been with tests that use real S3. Pageserver shutdown
waits for all in-memory layers to be flush to disk and uploaded to
remote storage, so I think it's reasonable that that might take longer
than 10 s if there's some kind of a network hiccup.
2023-05-14 17:51:14 +03:00
21 changed files with 200 additions and 460 deletions

View File

@@ -206,4 +206,4 @@ runs:
uses: ./.github/actions/allure-report-store
with:
report-dir: /tmp/test_output/allure/results
unique-key: ${{ inputs.build_type }}
unique-key: ${{ inputs.test_selection }}-${{ inputs.build_type }}

11
Cargo.lock generated
View File

@@ -2859,22 +2859,22 @@ dependencies = [
[[package]]
name = "pin-project"
version = "1.1.0"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.1.0"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.15",
"syn 1.0.109",
]
[[package]]
@@ -4979,7 +4979,6 @@ dependencies = [
"metrics",
"nix",
"once_cell",
"pin-project",
"pin-project-lite",
"pq_proto",
"rand",

View File

@@ -30,12 +30,12 @@ use utils::pid_file::{self, PidFileRead};
// These constants control the loop used to poll for process start / stop.
//
// The loop waits for at most 10 seconds, polling every 100 ms.
// The loop waits for at most 20 seconds, polling every 100 ms.
// Once a second, it prints a dot ("."), to give the user an indication that
// it's waiting. If the process hasn't started/stopped after 5 seconds,
// it prints a notice that it's taking long, but keeps waiting.
//
const RETRY_UNTIL_SECS: u64 = 10;
const RETRY_UNTIL_SECS: u64 = 20;
const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
const RETRY_INTERVAL_MILLIS: u64 = 100;
const DOT_EVERY_RETRIES: u64 = 10;

View File

@@ -41,9 +41,6 @@ pq_proto.workspace = true
metrics.workspace = true
workspace_hack.workspace = true
# needed to have pin-projected with PinnedDrop
pin-project = "1.1"
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true

View File

@@ -147,56 +147,3 @@ macro_rules! const_assert {
const _: () = assert!($($args)*);
};
}
pub mod cancel_log {
pub trait CancelationLoggingExt {
fn log_being_canceled(self, step: &'static str) -> LogCancelation<Self>
where
Self: Sized;
}
impl<Fut: std::future::Future> CancelationLoggingExt for Fut {
fn log_being_canceled(self, step: &'static str) -> LogCancelation<Self>
where
Self: Sized,
{
LogCancelation {
inner: self,
name: Some(step),
span: tracing::Span::current(),
}
}
}
#[pin_project::pin_project(PinnedDrop)]
pub struct LogCancelation<Fut> {
#[pin]
inner: Fut,
name: Option<&'static str>,
span: tracing::Span,
}
impl<Fut: std::future::Future> std::future::Future for LogCancelation<Fut> {
type Output = Fut::Output;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
let ready = std::task::ready!(this.inner.poll(cx));
this.name.take();
std::task::Poll::Ready(ready)
}
}
#[pin_project::pinned_drop]
impl<Fut> PinnedDrop for LogCancelation<Fut> {
fn drop(self: std::pin::Pin<&mut Self>) {
if let Some(name) = self.name {
let _g = self.span.enter();
tracing::info!("canceled while waiting for {name}");
}
}
}
}

View File

@@ -29,7 +29,6 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline}
use crate::{config::PageServerConf, tenant::mgr};
use utils::{
auth::JwtAuth,
cancel_log::CancelationLoggingExt,
http::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
error::{ApiError, HttpErrorBody},
@@ -187,7 +186,6 @@ async fn build_timeline_info(
CancellationToken::new(),
ctx,
)
.log_being_canceled("get_current_logical_size_non_incremental")
.await?,
);
}
@@ -340,9 +338,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline_info = async {
let tenant = mgr::get_tenant(tenant_id, true)
.log_being_canceled("get_tenant")
.await?;
let tenant = mgr::get_tenant(tenant_id, true).await?;
let timeline = tenant
.get_timeline(timeline_id, false)
@@ -353,7 +349,6 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.log_being_canceled("build_timeline_info")
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
@@ -361,7 +356,6 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
Ok::<_, ApiError>(timeline_info)
}
.instrument(info_span!("timeline_detail", tenant = %tenant_id, timeline = %timeline_id))
.log_being_canceled("whole response")
.await?;
json_response(StatusCode::OK, timeline_info)

View File

@@ -1264,24 +1264,8 @@ impl Tenant {
"Cannot create timelines on inactive tenant"
);
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
if self.get_timeline(new_timeline_id, false).is_ok() {
debug!("timeline {new_timeline_id} already exists");
if let Some(remote_client) = existing.remote_client.as_ref() {
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
}
return Ok(None);
}
@@ -1323,17 +1307,6 @@ impl Tenant {
}
};
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
remote_client.wait_completion().await.with_context(|| {
format!("wait for {} timeline initial uploads to complete", kind)
})?;
}
Ok(Some(loaded_timeline))
}
@@ -2403,18 +2376,17 @@ impl Tenant {
src_timeline.initdb_lsn,
src_timeline.pg_version,
);
let new_timeline = {
let mut timelines = self.timelines.lock().unwrap();
self.prepare_timeline(
let mut timelines = self.timelines.lock().unwrap();
let new_timeline = self
.prepare_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(ctx, &mut timelines, true, true)?
};
.initialize_with_lock(ctx, &mut timelines, true, true)?;
drop(timelines);
// Root timeline gets its layers during creation and uploads them along with the metadata.
// A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.

View File

@@ -292,93 +292,77 @@ fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn()
move || format!("Cannot parse `{field_name}` duration {value:?}")
}
impl TenantConfOpt {
#[allow(clippy::too_many_arguments)]
fn from_request(
checkpoint_distance: Option<u64>,
checkpoint_timeout: &Option<String>,
compaction_target_size: Option<u64>,
compaction_period: &Option<String>,
compaction_threshold: Option<usize>,
gc_horizon: Option<u64>,
gc_period: &Option<String>,
image_creation_threshold: Option<usize>,
pitr_interval: &Option<String>,
walreceiver_connect_timeout: &Option<String>,
lagging_wal_timeout: &Option<String>,
max_lsn_wal_lag: Option<NonZeroU64>,
trace_read_requests: Option<bool>,
eviction_policy: &Option<serde_json::Value>,
min_resident_size_override: Option<u64>,
evictions_low_residence_duration_metric_threshold: &Option<String>,
) -> Result<Self, anyhow::Error> {
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &gc_period {
if let Some(gc_period) = &request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = gc_horizon;
tenant_conf.image_creation_threshold = image_creation_threshold;
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = &pitr_interval {
if let Some(pitr_interval) = &request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &walreceiver_connect_timeout {
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &lagging_wal_timeout {
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
if let Some(max_lsn_wal_lag) = max_lsn_wal_lag {
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
}
if let Some(trace_read_requests) = trace_read_requests {
if let Some(trace_read_requests) = request_data.trace_read_requests {
tenant_conf.trace_read_requests = Some(trace_read_requests);
}
tenant_conf.checkpoint_distance = checkpoint_distance;
if let Some(checkpoint_timeout) = &checkpoint_timeout {
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = compaction_target_size;
tenant_conf.compaction_threshold = compaction_threshold;
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = &compaction_period {
if let Some(compaction_period) = &request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &eviction_policy {
if let Some(eviction_policy) = &request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = min_resident_size_override;
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&evictions_low_residence_duration_metric_threshold
&request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
@@ -393,53 +377,81 @@ impl TenantConfOpt {
}
}
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
Self::from_request(
request_data.checkpoint_distance,
&request_data.checkpoint_timeout,
request_data.compaction_target_size,
&request_data.compaction_period,
request_data.compaction_threshold,
request_data.gc_horizon,
&request_data.gc_period,
request_data.image_creation_threshold,
&request_data.pitr_interval,
&request_data.walreceiver_connect_timeout,
&request_data.lagging_wal_timeout,
request_data.max_lsn_wal_lag,
request_data.trace_read_requests,
&request_data.eviction_policy,
request_data.min_resident_size_override,
&request_data.evictions_low_residence_duration_metric_threshold,
)
}
}
impl TryFrom<&'_ TenantConfigRequest> for TenantConfOpt {
type Error = anyhow::Error;
fn try_from(request_data: &TenantConfigRequest) -> Result<Self, Self::Error> {
Self::from_request(
request_data.checkpoint_distance,
&request_data.checkpoint_timeout,
request_data.compaction_target_size,
&request_data.compaction_period,
request_data.compaction_threshold,
request_data.gc_horizon,
&request_data.gc_period,
request_data.image_creation_threshold,
&request_data.pitr_interval,
&request_data.walreceiver_connect_timeout,
&request_data.lagging_wal_timeout,
request_data.max_lsn_wal_lag,
request_data.trace_read_requests,
&request_data.eviction_policy,
request_data.min_resident_size_override,
&request_data.evictions_low_residence_duration_metric_threshold,
)
let mut tenant_conf = TenantConfOpt::default();
if let Some(gc_period) = &request_data.gc_period {
tenant_conf.gc_period = Some(
humantime::parse_duration(gc_period)
.with_context(bad_duration("gc_period", gc_period))?,
);
}
tenant_conf.gc_horizon = request_data.gc_horizon;
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
if let Some(pitr_interval) = &request_data.pitr_interval {
tenant_conf.pitr_interval = Some(
humantime::parse_duration(pitr_interval)
.with_context(bad_duration("pitr_interval", pitr_interval))?,
);
}
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
tenant_conf.walreceiver_connect_timeout = Some(
humantime::parse_duration(walreceiver_connect_timeout).with_context(
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
)?,
);
}
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
tenant_conf.lagging_wal_timeout = Some(
humantime::parse_duration(lagging_wal_timeout)
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
);
}
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
tenant_conf.trace_read_requests = request_data.trace_read_requests;
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
tenant_conf.checkpoint_timeout = Some(
humantime::parse_duration(checkpoint_timeout)
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
);
}
tenant_conf.compaction_target_size = request_data.compaction_target_size;
tenant_conf.compaction_threshold = request_data.compaction_threshold;
if let Some(compaction_period) = &request_data.compaction_period {
tenant_conf.compaction_period = Some(
humantime::parse_duration(compaction_period)
.with_context(bad_duration("compaction_period", compaction_period))?,
);
}
if let Some(eviction_policy) = &request_data.eviction_policy {
tenant_conf.eviction_policy = Some(
serde::Deserialize::deserialize(eviction_policy)
.context("parse field `eviction_policy`")?,
);
}
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
if let Some(evictions_low_residence_duration_metric_threshold) =
&request_data.evictions_low_residence_duration_metric_threshold
{
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
.with_context(bad_duration(
"evictions_low_residence_duration_metric_threshold",
evictions_low_residence_duration_metric_threshold,
))?,
);
}
Ok(tenant_conf)
}
}

View File

@@ -192,9 +192,8 @@ retry:
{
if (!PQconsumeInput(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
neon_log(LOG, "could not get response from pageserver: %s", msg);
pfree(msg);
neon_log(LOG, "could not get response from pageserver: %s",
PQerrorMessage(pageserver_conn));
return -1;
}
}
@@ -344,7 +343,7 @@ pageserver_receive(void)
resp = NULL;
}
else if (rc == -2)
neon_log(ERROR, "could not read COPY data: %s", pchomp(PQerrorMessage(pageserver_conn)));
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
else
neon_log(ERROR, "unexpected PQgetCopyData return value: %d", rc);
}
@@ -368,7 +367,7 @@ pageserver_flush(void)
}
else if (PQflush(pageserver_conn))
{
char *msg = pchomp(PQerrorMessage(pageserver_conn));
char *msg = PQerrorMessage(pageserver_conn);
pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg);

View File

@@ -7,7 +7,6 @@ mod credentials;
pub use credentials::ClientCredentials;
mod password_hack;
pub use password_hack::parse_endpoint_param;
use password_hack::PasswordHackPayload;
mod flow;
@@ -45,10 +44,10 @@ pub enum AuthErrorImpl {
#[error(
"Endpoint ID is not specified. \
Either please upgrade the postgres client library (libpq) for SNI support \
or pass the endpoint ID (first part of the domain name) as a parameter: '?options=endpoint%3D<endpoint-id>'. \
or pass the endpoint ID (first part of the domain name) as a parameter: '?options=project%3D<endpoint-id>'. \
See more at https://neon.tech/sni"
)]
MissingEndpointName,
MissingProjectName,
#[error("password authentication failed for user '{0}'")]
AuthFailed(Box<str>),
@@ -89,7 +88,7 @@ impl UserFacingError for AuthError {
AuthFailed(_) => self.to_string(),
BadAuthMethod(_) => self.to_string(),
MalformedPassword(_) => self.to_string(),
MissingEndpointName => self.to_string(),
MissingProjectName => self.to_string(),
Io(_) => "Internal error".to_string(),
}
}

View File

@@ -52,8 +52,8 @@ pub async fn password_hack(
.authenticate()
.await?;
info!(project = &payload.endpoint, "received missing parameter");
creds.project = Some(payload.endpoint);
info!(project = &payload.project, "received missing parameter");
creds.project = Some(payload.project);
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(payload.password);

View File

@@ -1,7 +1,6 @@
//! User credentials used in authentication.
use crate::{auth::password_hack::parse_endpoint_param, error::UserFacingError};
use itertools::Itertools;
use crate::error::UserFacingError;
use pq_proto::StartupMessageParams;
use std::collections::HashSet;
use thiserror::Error;
@@ -62,15 +61,7 @@ impl<'a> ClientCredentials<'a> {
// Project name might be passed via PG's command-line options.
let project_option = params
.options_raw()
.and_then(|options| {
// We support both `project` (deprecated) and `endpoint` options for backward compatibility.
// However, if both are present, we don't exactly know which one to use.
// Therefore we require that only one of them is present.
options
.filter_map(parse_endpoint_param)
.at_most_one()
.ok()?
})
.and_then(|mut options| options.find_map(|opt| opt.strip_prefix("project=")))
.map(|name| name.to_string());
let project_from_domain = if let Some(sni_str) = sni {
@@ -186,51 +177,6 @@ mod tests {
Ok(())
}
#[test]
fn parse_endpoint_from_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "-ckey=1 endpoint=bar -c geqo=off"),
]);
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("bar"));
Ok(())
}
#[test]
fn parse_three_endpoints_from_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
(
"options",
"-ckey=1 endpoint=one endpoint=two endpoint=three -c geqo=off",
),
]);
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert!(creds.project.is_none());
Ok(())
}
#[test]
fn parse_when_endpoint_and_project_are_in_options() -> anyhow::Result<()> {
let options = StartupMessageParams::new([
("user", "john_doe"),
("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"),
]);
let creds = ClientCredentials::parse(&options, None, None)?;
assert_eq!(creds.user, "john_doe");
assert!(creds.project.is_none());
Ok(())
}
#[test]
fn parse_projects_identical() -> anyhow::Result<()> {
let options = StartupMessageParams::new([("user", "john_doe"), ("options", "project=baz")]);

View File

@@ -91,7 +91,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, PasswordHack> {
// the user neither enabled SNI nor resorted to any other method
// for passing the project name we rely on. We should show them
// the most helpful error message and point to the documentation.
.ok_or(AuthErrorImpl::MissingEndpointName)?;
.ok_or(AuthErrorImpl::MissingProjectName)?;
Ok(payload)
}

View File

@@ -6,55 +6,27 @@
use bstr::ByteSlice;
pub struct PasswordHackPayload {
pub endpoint: String,
pub project: String,
pub password: Vec<u8>,
}
impl PasswordHackPayload {
pub fn parse(bytes: &[u8]) -> Option<Self> {
// The format is `project=<utf-8>;<password-bytes>`.
let mut iter = bytes.splitn_str(2, ";");
let endpoint = iter.next()?.to_str().ok()?;
let endpoint = parse_endpoint_param(endpoint)?.to_owned();
let mut iter = bytes.strip_prefix(b"project=")?.splitn_str(2, ";");
let project = iter.next()?.to_str().ok()?.to_owned();
let password = iter.next()?.to_owned();
Some(Self { endpoint, password })
Some(Self { project, password })
}
}
pub fn parse_endpoint_param(bytes: &str) -> Option<&str> {
bytes
.strip_prefix("project=")
.or_else(|| bytes.strip_prefix("endpoint="))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_endpoint_param_fn() {
let input = "";
assert!(parse_endpoint_param(input).is_none());
let input = "project=";
assert_eq!(parse_endpoint_param(input), Some(""));
let input = "project=foobar";
assert_eq!(parse_endpoint_param(input), Some("foobar"));
let input = "endpoint=";
assert_eq!(parse_endpoint_param(input), Some(""));
let input = "endpoint=foobar";
assert_eq!(parse_endpoint_param(input), Some("foobar"));
let input = "other_option=foobar";
assert!(parse_endpoint_param(input).is_none());
}
#[test]
fn parse_password_hack_payload_project() {
fn parse_password_hack_payload() {
let bytes = b"";
assert!(PasswordHackPayload::parse(bytes).is_none());
@@ -62,33 +34,13 @@ mod tests {
assert!(PasswordHackPayload::parse(bytes).is_none());
let bytes = b"project=;";
let payload: PasswordHackPayload =
PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "");
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.project, "");
assert_eq!(payload.password, b"");
let bytes = b"project=foobar;pass;word";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "foobar");
assert_eq!(payload.password, b"pass;word");
}
#[test]
fn parse_password_hack_payload_endpoint() {
let bytes = b"";
assert!(PasswordHackPayload::parse(bytes).is_none());
let bytes = b"endpoint=";
assert!(PasswordHackPayload::parse(bytes).is_none());
let bytes = b"endpoint=;";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "");
assert_eq!(payload.password, b"");
let bytes = b"endpoint=foobar;pass;word";
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
assert_eq!(payload.endpoint, "foobar");
assert_eq!(payload.project, "foobar");
assert_eq!(payload.password, b"pass;word");
}
}

View File

@@ -1,4 +1,4 @@
use crate::{auth::parse_endpoint_param, cancellation::CancelClosure, error::UserFacingError};
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -279,7 +279,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| parse_endpoint_param(opt).is_none())
.filter(|opt| !opt.starts_with("project="))
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();

View File

@@ -79,17 +79,7 @@ module.exports = async ({ github, context, fetch, report }) => {
for (const parentSuite of suites.children) {
for (const suite of parentSuite.children) {
for (const test of suite.children) {
let buildType, pgVersion
const match = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)?.groups
if (match) {
({buildType, pgVersion} = match)
} else {
// It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance).
console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`)
buildType = "release"
pgVersion = "14"
}
const {groups: {buildType, pgVersion}} = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)
pgVersions.add(pgVersion)
buildTypes.add(buildType)

View File

@@ -272,7 +272,6 @@ class PageserverHttpClient(requests.Session):
new_timeline_id: Optional[TimelineId] = None,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
**kwargs,
) -> Dict[Any, Any]:
body: Dict[str, Any] = {
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
@@ -282,9 +281,7 @@ class PageserverHttpClient(requests.Session):
if pg_version != PgVersion.NOT_SET:
body["pg_version"] = int(pg_version)
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body, **kwargs
)
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")

View File

@@ -136,7 +136,9 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
# remove the initial tenant
## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865
assert env.initial_timeline
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, env.initial_timeline)
pageserver_http.tenant_detach(env.initial_tenant)
assert isinstance(env.remote_storage, LocalFsStorage)
tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant)

View File

@@ -5,18 +5,16 @@ import pytest
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_proxy_select_1(static_proxy: NeonProxy, option_name: str):
def test_proxy_select_1(static_proxy: NeonProxy):
"""
A simplest smoke test: check proxy against a local postgres instance.
"""
out = static_proxy.safe_psql("select 1", options=f"{option_name}=generic-project-name")
out = static_proxy.safe_psql("select 1", options="project=generic-project-name")
assert out[0][0] == 1
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_password_hack(static_proxy: NeonProxy, option_name: str):
def test_password_hack(static_proxy: NeonProxy):
"""
Check the PasswordHack auth flow: an alternative to SCRAM auth for
clients which can't provide the project/endpoint name via SNI or `options`.
@@ -25,12 +23,11 @@ def test_password_hack(static_proxy: NeonProxy, option_name: str):
user = "borat"
password = "password"
static_proxy.safe_psql(
f"create role {user} with login password '{password}'",
options=f"{option_name}=irrelevant",
f"create role {user} with login password '{password}'", options="project=irrelevant"
)
# Note the format of `magic`!
magic = f"{option_name}=irrelevant;{password}"
magic = f"project=irrelevant;{password}"
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
# Must also check that invalid magic won't be accepted.
@@ -59,62 +56,55 @@ async def test_link_auth(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy):
assert out == "42"
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_proxy_options(static_proxy: NeonProxy, option_name: str):
def test_proxy_options(static_proxy: NeonProxy):
"""
Check that we pass extra `options` to the PostgreSQL server:
* `project=...` and `endpoint=...` shouldn't be passed at all
* (otherwise postgres will raise an error).
* `project=...` shouldn't be passed at all (otherwise postgres will raise an error).
* everything else should be passed as-is.
"""
options = f"{option_name}=irrelevant -cproxytest.option=value"
options = "project=irrelevant -cproxytest.option=value"
out = static_proxy.safe_psql("show proxytest.option", options=options)
assert out[0][0] == "value"
options = f"-c proxytest.foo=\\ str {option_name}=irrelevant"
options = "-c proxytest.foo=\\ str project=irrelevant"
out = static_proxy.safe_psql("show proxytest.foo", options=options)
assert out[0][0] == " str"
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_auth_errors(static_proxy: NeonProxy, option_name: str):
def test_auth_errors(static_proxy: NeonProxy):
"""
Check that we throw very specific errors in some unsuccessful auth scenarios.
"""
# User does not exist
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio", options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
static_proxy.safe_psql(
"create role pinocchio with login password 'magic'",
options=f"{option_name}=irrelevant",
"create role pinocchio with login password 'magic'", options="project=irrelevant"
)
# User exists, but password is missing
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password=None, options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio", password=None, options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
# User exists, but password is wrong
with pytest.raises(psycopg2.Error) as exprinfo:
static_proxy.connect(user="pinocchio", password="bad", options=f"{option_name}=irrelevant")
static_proxy.connect(user="pinocchio", password="bad", options="project=irrelevant")
text = str(exprinfo.value).strip()
assert text.endswith("password authentication failed for user 'pinocchio'")
# Finally, check that the user can connect
with static_proxy.connect(
user="pinocchio", password="magic", options=f"{option_name}=irrelevant"
):
with static_proxy.connect(user="pinocchio", password="magic", options="project=irrelevant"):
pass
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
def test_forward_params_to_client(static_proxy: NeonProxy):
"""
Check that we forward all necessary PostgreSQL server params to client.
"""
@@ -140,7 +130,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
where name = any(%s)
"""
with static_proxy.connect(options=f"{option_name}=irrelevant") as conn:
with static_proxy.connect(options="project=irrelevant") as conn:
with conn.cursor() as cur:
cur.execute(query, (reported_params_subset,))
for name, value in cur.fetchall():
@@ -148,18 +138,17 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
assert conn.get_parameter_status(name) == value
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
@pytest.mark.timeout(5)
def test_close_on_connections_exit(static_proxy: NeonProxy, option_name: str):
def test_close_on_connections_exit(static_proxy: NeonProxy):
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
# until after connections close.
with static_proxy.connect(options=f"{option_name}=irrelevant"), static_proxy.connect(
options=f"{option_name}=irrelevant"
with static_proxy.connect(options="project=irrelevant"), static_proxy.connect(
options="project=irrelevant"
):
static_proxy.terminate()
with pytest.raises(subprocess.TimeoutExpired):
static_proxy.wait_for_exit(timeout=2)
# Ensure we don't accept any more connections
with pytest.raises(psycopg2.OperationalError):
static_proxy.connect(options=f"{option_name}=irrelevant")
static_proxy.connect(options="project=irrelevant")
static_proxy.wait_for_exit()

View File

@@ -2,12 +2,11 @@
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import os
import queue
import shutil
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Tuple
import pytest
from fixtures.log_helper import log
@@ -27,7 +26,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import print_gc_result, query_scalar, wait_until
from requests import ReadTimeout
#
@@ -628,7 +626,10 @@ def test_empty_branch_remote_storage_upload(
new_branch_name = "new_branch"
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
timelines_before_detach = set(
map(
@@ -657,19 +658,13 @@ def test_empty_branch_remote_storage_upload(
), f"Expected to have same timelines after reattach, but got {timelines_after_detach}"
# Branches off a root branch, but does not write anything to the new branch, so it has a metadata file only.
# Ensures the branch is not on the remote storage and restarts the pageserver — the branch should be uploaded after the restart.
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_empty_branch_remote_storage_upload_on_restart(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
"""
Branches off a root branch, but does not write anything to the new branch, so
it has a metadata file only.
Ensures the branch is not on the remote storage and restarts the pageserver
— the upload should be scheduled by load, and create_timeline should await
for it even though it gets 409 Conflict.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_empty_branch_remote_storage_upload_on_restart",
@@ -678,87 +673,35 @@ def test_empty_branch_remote_storage_upload_on_restart(
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
client.configure_failpoints(("before-upload-index", "return"))
new_branch_name = "new_branch"
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
new_branch_timeline_id = TimelineId.generate()
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
with pytest.raises(ReadTimeout):
client.timeline_create(
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline,
new_timeline_id=new_branch_timeline_id,
pg_version=env.pg_version,
timeout=4,
)
env.pageserver.allowed_errors.append(
f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)
# index upload is now hitting the failpoint, should not block the shutdown
env.pageserver.stop()
timeline_path = (
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
)
local_metadata = env.repo_dir / timeline_path / "metadata"
assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
# Remove new branch from the remote storage
assert isinstance(env.remote_storage, LocalFsStorage)
new_branch_on_remote_storage = env.remote_storage.root / timeline_path
new_branch_on_remote_storage = (
env.remote_storage.root
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(new_branch_timeline_id)
)
assert (
not new_branch_on_remote_storage.exists()
), "failpoint should had prohibited index_part.json upload"
new_branch_on_remote_storage.is_dir()
), f"'{new_branch_on_remote_storage}' path does not exist on the remote storage"
shutil.rmtree(new_branch_on_remote_storage)
# during reconciliation we should had scheduled the uploads and on the
# retried create_timeline, we will await for those to complete on next
# client.timeline_create
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
env.pageserver.start()
# sleep a bit to force the upload task go into exponential backoff
time.sleep(1)
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
barrier = threading.Barrier(2)
def create_in_background():
barrier.wait()
try:
client.timeline_create(
tenant_id=env.initial_tenant,
ancestor_timeline_id=env.initial_timeline,
new_timeline_id=new_branch_timeline_id,
pg_version=env.pg_version,
)
q.put(None)
except PageserverApiException as e:
q.put(e)
create_thread = threading.Thread(target=create_in_background)
create_thread.start()
try:
# maximize chances of actually waiting for the uploads by create_timeline
barrier.wait()
assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading"
client.configure_failpoints(("before-upload-index", "off"))
conflict = q.get()
assert conflict, "create_timeline should not have succeeded"
assert (
conflict.status_code == 409
), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
assert (
new_branch_on_remote_storage / "index_part.json"
).is_file(), "uploads scheduled during initial load should had been awaited for"
finally:
create_thread.join()
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
assert (
new_branch_on_remote_storage.is_dir()
), f"New branch should have been reuploaded on pageserver restart to the remote storage path '{new_branch_on_remote_storage}'"
def wait_upload_queue_empty(
@@ -809,17 +752,4 @@ def get_queued_count(
return int(val)
def assert_nothing_to_upload(
client: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
):
"""
Check last_record_lsn == remote_consistent_lsn. Assert works only for empty timelines, which
do not have anything to compact or gc.
"""
detail = client.timeline_detail(tenant_id, timeline_id)
assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])
# TODO Test that we correctly handle GC of files that are stuck in upload queue.

View File

@@ -23,6 +23,7 @@ from fixtures.pageserver.utils import (
tenant_exists,
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
@@ -545,7 +546,7 @@ def test_emergency_relocate_with_branches_slow_replay(
# - A logical replication message between the inserts, so that we can conveniently
# pause the WAL ingestion between the two inserts.
# - Child branch, created after the inserts
tenant_id, _ = env.neon_cli.create_tenant()
tenant_id, timeline_id = env.neon_cli.create_tenant()
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with main_endpoint.cursor() as cur:
@@ -558,7 +559,14 @@ def test_emergency_relocate_with_branches_slow_replay(
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
main_endpoint.stop()
env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)
child_timeline_id = env.neon_cli.create_branch(
"child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
)
# Wait for the index_part.json file of both branches to be uploaded to remote storage.
# This is a work around for issue https://github.com/neondatabase/neon/issues/3865.
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
# Now kill the pageserver, remove the tenant directory, and restart. This simulates
# the scenario that a pageserver dies unexpectedly and cannot be recovered, so we relocate
@@ -696,7 +704,7 @@ def test_emergency_relocate_with_branches_createdb(
pageserver_http = env.pageserver.http_client()
# create new nenant
tenant_id, _ = env.neon_cli.create_tenant()
tenant_id, timeline_id = env.neon_cli.create_tenant()
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with main_endpoint.cursor() as cur:
@@ -704,7 +712,9 @@ def test_emergency_relocate_with_branches_createdb(
cur.execute("CREATE DATABASE neondb")
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)
child_timeline_id = env.neon_cli.create_branch(
"child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
)
with main_endpoint.cursor(dbname="neondb") as cur:
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,100)")
@@ -715,6 +725,11 @@ def test_emergency_relocate_with_branches_createdb(
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,200)")
child_endpoint.stop()
# Wait for the index_part.json file of both branches to be uploaded to remote storage.
# This is a work around for issue https://github.com/neondatabase/neon/issues/3865.
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
# Kill the pageserver, remove the tenant directory, and restart
env.pageserver.stop(immediate=True)
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))