mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
Compare commits
1 Commits
log_cancel
...
increase-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f011af063d |
@@ -206,4 +206,4 @@ runs:
|
||||
uses: ./.github/actions/allure-report-store
|
||||
with:
|
||||
report-dir: /tmp/test_output/allure/results
|
||||
unique-key: ${{ inputs.build_type }}
|
||||
unique-key: ${{ inputs.test_selection }}-${{ inputs.build_type }}
|
||||
|
||||
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -2859,22 +2859,22 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.0"
|
||||
version = "1.0.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead"
|
||||
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
|
||||
dependencies = [
|
||||
"pin-project-internal",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-internal"
|
||||
version = "1.1.0"
|
||||
version = "1.0.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
|
||||
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.15",
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4979,7 +4979,6 @@ dependencies = [
|
||||
"metrics",
|
||||
"nix",
|
||||
"once_cell",
|
||||
"pin-project",
|
||||
"pin-project-lite",
|
||||
"pq_proto",
|
||||
"rand",
|
||||
|
||||
@@ -30,12 +30,12 @@ use utils::pid_file::{self, PidFileRead};
|
||||
|
||||
// These constants control the loop used to poll for process start / stop.
|
||||
//
|
||||
// The loop waits for at most 10 seconds, polling every 100 ms.
|
||||
// The loop waits for at most 20 seconds, polling every 100 ms.
|
||||
// Once a second, it prints a dot ("."), to give the user an indication that
|
||||
// it's waiting. If the process hasn't started/stopped after 5 seconds,
|
||||
// it prints a notice that it's taking long, but keeps waiting.
|
||||
//
|
||||
const RETRY_UNTIL_SECS: u64 = 10;
|
||||
const RETRY_UNTIL_SECS: u64 = 20;
|
||||
const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
|
||||
const RETRY_INTERVAL_MILLIS: u64 = 100;
|
||||
const DOT_EVERY_RETRIES: u64 = 10;
|
||||
|
||||
@@ -41,9 +41,6 @@ pq_proto.workspace = true
|
||||
metrics.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
# needed to have pin-projected with PinnedDrop
|
||||
pin-project = "1.1"
|
||||
|
||||
[dev-dependencies]
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
|
||||
@@ -147,56 +147,3 @@ macro_rules! const_assert {
|
||||
const _: () = assert!($($args)*);
|
||||
};
|
||||
}
|
||||
|
||||
pub mod cancel_log {
|
||||
pub trait CancelationLoggingExt {
|
||||
fn log_being_canceled(self, step: &'static str) -> LogCancelation<Self>
|
||||
where
|
||||
Self: Sized;
|
||||
}
|
||||
|
||||
impl<Fut: std::future::Future> CancelationLoggingExt for Fut {
|
||||
fn log_being_canceled(self, step: &'static str) -> LogCancelation<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
LogCancelation {
|
||||
inner: self,
|
||||
name: Some(step),
|
||||
span: tracing::Span::current(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project(PinnedDrop)]
|
||||
pub struct LogCancelation<Fut> {
|
||||
#[pin]
|
||||
inner: Fut,
|
||||
name: Option<&'static str>,
|
||||
span: tracing::Span,
|
||||
}
|
||||
|
||||
impl<Fut: std::future::Future> std::future::Future for LogCancelation<Fut> {
|
||||
type Output = Fut::Output;
|
||||
|
||||
fn poll(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let ready = std::task::ready!(this.inner.poll(cx));
|
||||
this.name.take();
|
||||
std::task::Poll::Ready(ready)
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pinned_drop]
|
||||
impl<Fut> PinnedDrop for LogCancelation<Fut> {
|
||||
fn drop(self: std::pin::Pin<&mut Self>) {
|
||||
if let Some(name) = self.name {
|
||||
let _g = self.span.enter();
|
||||
tracing::info!("canceled while waiting for {name}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline}
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use utils::{
|
||||
auth::JwtAuth,
|
||||
cancel_log::CancelationLoggingExt,
|
||||
http::{
|
||||
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
|
||||
error::{ApiError, HttpErrorBody},
|
||||
@@ -187,7 +186,6 @@ async fn build_timeline_info(
|
||||
CancellationToken::new(),
|
||||
ctx,
|
||||
)
|
||||
.log_being_canceled("get_current_logical_size_non_incremental")
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
@@ -340,9 +338,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
|
||||
let timeline_info = async {
|
||||
let tenant = mgr::get_tenant(tenant_id, true)
|
||||
.log_being_canceled("get_tenant")
|
||||
.await?;
|
||||
let tenant = mgr::get_tenant(tenant_id, true).await?;
|
||||
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, false)
|
||||
@@ -353,7 +349,6 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
include_non_incremental_logical_size.unwrap_or(false),
|
||||
&ctx,
|
||||
)
|
||||
.log_being_canceled("build_timeline_info")
|
||||
.await
|
||||
.context("get local timeline info")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
@@ -361,7 +356,6 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
Ok::<_, ApiError>(timeline_info)
|
||||
}
|
||||
.instrument(info_span!("timeline_detail", tenant = %tenant_id, timeline = %timeline_id))
|
||||
.log_being_canceled("whole response")
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, timeline_info)
|
||||
|
||||
@@ -1264,24 +1264,8 @@ impl Tenant {
|
||||
"Cannot create timelines on inactive tenant"
|
||||
);
|
||||
|
||||
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
|
||||
if self.get_timeline(new_timeline_id, false).is_ok() {
|
||||
debug!("timeline {new_timeline_id} already exists");
|
||||
|
||||
if let Some(remote_client) = existing.remote_client.as_ref() {
|
||||
// Wait for uploads to complete, so that when we return Ok, the timeline
|
||||
// is known to be durable on remote storage. Just like we do at the end of
|
||||
// this function, after we have created the timeline ourselves.
|
||||
//
|
||||
// We only really care that the initial version of `index_part.json` has
|
||||
// been uploaded. That's enough to remember that the timeline
|
||||
// exists. However, there is no function to wait specifically for that so
|
||||
// we just wait for all in-progress uploads to finish.
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for timeline uploads to complete")?;
|
||||
}
|
||||
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
@@ -1323,17 +1307,6 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
|
||||
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
|
||||
// Ok, the timeline is durable in remote storage.
|
||||
let kind = ancestor_timeline_id
|
||||
.map(|_| "branched")
|
||||
.unwrap_or("bootstrapped");
|
||||
remote_client.wait_completion().await.with_context(|| {
|
||||
format!("wait for {} timeline initial uploads to complete", kind)
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(Some(loaded_timeline))
|
||||
}
|
||||
|
||||
@@ -2403,18 +2376,17 @@ impl Tenant {
|
||||
src_timeline.initdb_lsn,
|
||||
src_timeline.pg_version,
|
||||
);
|
||||
|
||||
let new_timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
self.prepare_timeline(
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let new_timeline = self
|
||||
.prepare_timeline(
|
||||
dst_id,
|
||||
&metadata,
|
||||
timeline_uninit_mark,
|
||||
false,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
)?
|
||||
.initialize_with_lock(ctx, &mut timelines, true, true)?
|
||||
};
|
||||
.initialize_with_lock(ctx, &mut timelines, true, true)?;
|
||||
drop(timelines);
|
||||
|
||||
// Root timeline gets its layers during creation and uploads them along with the metadata.
|
||||
// A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
|
||||
|
||||
@@ -292,93 +292,77 @@ fn bad_duration<'a>(field_name: &'static str, value: &'a str) -> impl 'a + Fn()
|
||||
move || format!("Cannot parse `{field_name}` duration {value:?}")
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn from_request(
|
||||
checkpoint_distance: Option<u64>,
|
||||
checkpoint_timeout: &Option<String>,
|
||||
compaction_target_size: Option<u64>,
|
||||
compaction_period: &Option<String>,
|
||||
compaction_threshold: Option<usize>,
|
||||
gc_horizon: Option<u64>,
|
||||
gc_period: &Option<String>,
|
||||
image_creation_threshold: Option<usize>,
|
||||
pitr_interval: &Option<String>,
|
||||
walreceiver_connect_timeout: &Option<String>,
|
||||
lagging_wal_timeout: &Option<String>,
|
||||
max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
trace_read_requests: Option<bool>,
|
||||
eviction_policy: &Option<serde_json::Value>,
|
||||
min_resident_size_override: Option<u64>,
|
||||
evictions_low_residence_duration_metric_threshold: &Option<String>,
|
||||
) -> Result<Self, anyhow::Error> {
|
||||
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
|
||||
let mut tenant_conf = TenantConfOpt::default();
|
||||
|
||||
if let Some(gc_period) = &gc_period {
|
||||
if let Some(gc_period) = &request_data.gc_period {
|
||||
tenant_conf.gc_period = Some(
|
||||
humantime::parse_duration(gc_period)
|
||||
.with_context(bad_duration("gc_period", gc_period))?,
|
||||
);
|
||||
}
|
||||
tenant_conf.gc_horizon = gc_horizon;
|
||||
tenant_conf.image_creation_threshold = image_creation_threshold;
|
||||
tenant_conf.gc_horizon = request_data.gc_horizon;
|
||||
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
|
||||
|
||||
if let Some(pitr_interval) = &pitr_interval {
|
||||
if let Some(pitr_interval) = &request_data.pitr_interval {
|
||||
tenant_conf.pitr_interval = Some(
|
||||
humantime::parse_duration(pitr_interval)
|
||||
.with_context(bad_duration("pitr_interval", pitr_interval))?,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(walreceiver_connect_timeout) = &walreceiver_connect_timeout {
|
||||
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
|
||||
tenant_conf.walreceiver_connect_timeout = Some(
|
||||
humantime::parse_duration(walreceiver_connect_timeout).with_context(
|
||||
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
|
||||
)?,
|
||||
);
|
||||
}
|
||||
if let Some(lagging_wal_timeout) = &lagging_wal_timeout {
|
||||
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
|
||||
tenant_conf.lagging_wal_timeout = Some(
|
||||
humantime::parse_duration(lagging_wal_timeout)
|
||||
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
|
||||
);
|
||||
}
|
||||
if let Some(max_lsn_wal_lag) = max_lsn_wal_lag {
|
||||
if let Some(max_lsn_wal_lag) = request_data.max_lsn_wal_lag {
|
||||
tenant_conf.max_lsn_wal_lag = Some(max_lsn_wal_lag);
|
||||
}
|
||||
if let Some(trace_read_requests) = trace_read_requests {
|
||||
if let Some(trace_read_requests) = request_data.trace_read_requests {
|
||||
tenant_conf.trace_read_requests = Some(trace_read_requests);
|
||||
}
|
||||
|
||||
tenant_conf.checkpoint_distance = checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = &checkpoint_timeout {
|
||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
|
||||
tenant_conf.checkpoint_timeout = Some(
|
||||
humantime::parse_duration(checkpoint_timeout)
|
||||
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
|
||||
);
|
||||
}
|
||||
|
||||
tenant_conf.compaction_target_size = compaction_target_size;
|
||||
tenant_conf.compaction_threshold = compaction_threshold;
|
||||
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
||||
tenant_conf.compaction_threshold = request_data.compaction_threshold;
|
||||
|
||||
if let Some(compaction_period) = &compaction_period {
|
||||
if let Some(compaction_period) = &request_data.compaction_period {
|
||||
tenant_conf.compaction_period = Some(
|
||||
humantime::parse_duration(compaction_period)
|
||||
.with_context(bad_duration("compaction_period", compaction_period))?,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(eviction_policy) = &eviction_policy {
|
||||
if let Some(eviction_policy) = &request_data.eviction_policy {
|
||||
tenant_conf.eviction_policy = Some(
|
||||
serde::Deserialize::deserialize(eviction_policy)
|
||||
.context("parse field `eviction_policy`")?,
|
||||
);
|
||||
}
|
||||
|
||||
tenant_conf.min_resident_size_override = min_resident_size_override;
|
||||
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
|
||||
|
||||
if let Some(evictions_low_residence_duration_metric_threshold) =
|
||||
&evictions_low_residence_duration_metric_threshold
|
||||
&request_data.evictions_low_residence_duration_metric_threshold
|
||||
{
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
|
||||
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
|
||||
@@ -393,53 +377,81 @@ impl TenantConfOpt {
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&'_ TenantCreateRequest> for TenantConfOpt {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(request_data: &TenantCreateRequest) -> Result<Self, Self::Error> {
|
||||
Self::from_request(
|
||||
request_data.checkpoint_distance,
|
||||
&request_data.checkpoint_timeout,
|
||||
request_data.compaction_target_size,
|
||||
&request_data.compaction_period,
|
||||
request_data.compaction_threshold,
|
||||
request_data.gc_horizon,
|
||||
&request_data.gc_period,
|
||||
request_data.image_creation_threshold,
|
||||
&request_data.pitr_interval,
|
||||
&request_data.walreceiver_connect_timeout,
|
||||
&request_data.lagging_wal_timeout,
|
||||
request_data.max_lsn_wal_lag,
|
||||
request_data.trace_read_requests,
|
||||
&request_data.eviction_policy,
|
||||
request_data.min_resident_size_override,
|
||||
&request_data.evictions_low_residence_duration_metric_threshold,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&'_ TenantConfigRequest> for TenantConfOpt {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(request_data: &TenantConfigRequest) -> Result<Self, Self::Error> {
|
||||
Self::from_request(
|
||||
request_data.checkpoint_distance,
|
||||
&request_data.checkpoint_timeout,
|
||||
request_data.compaction_target_size,
|
||||
&request_data.compaction_period,
|
||||
request_data.compaction_threshold,
|
||||
request_data.gc_horizon,
|
||||
&request_data.gc_period,
|
||||
request_data.image_creation_threshold,
|
||||
&request_data.pitr_interval,
|
||||
&request_data.walreceiver_connect_timeout,
|
||||
&request_data.lagging_wal_timeout,
|
||||
request_data.max_lsn_wal_lag,
|
||||
request_data.trace_read_requests,
|
||||
&request_data.eviction_policy,
|
||||
request_data.min_resident_size_override,
|
||||
&request_data.evictions_low_residence_duration_metric_threshold,
|
||||
)
|
||||
let mut tenant_conf = TenantConfOpt::default();
|
||||
if let Some(gc_period) = &request_data.gc_period {
|
||||
tenant_conf.gc_period = Some(
|
||||
humantime::parse_duration(gc_period)
|
||||
.with_context(bad_duration("gc_period", gc_period))?,
|
||||
);
|
||||
}
|
||||
tenant_conf.gc_horizon = request_data.gc_horizon;
|
||||
tenant_conf.image_creation_threshold = request_data.image_creation_threshold;
|
||||
|
||||
if let Some(pitr_interval) = &request_data.pitr_interval {
|
||||
tenant_conf.pitr_interval = Some(
|
||||
humantime::parse_duration(pitr_interval)
|
||||
.with_context(bad_duration("pitr_interval", pitr_interval))?,
|
||||
);
|
||||
}
|
||||
if let Some(walreceiver_connect_timeout) = &request_data.walreceiver_connect_timeout {
|
||||
tenant_conf.walreceiver_connect_timeout = Some(
|
||||
humantime::parse_duration(walreceiver_connect_timeout).with_context(
|
||||
bad_duration("walreceiver_connect_timeout", walreceiver_connect_timeout),
|
||||
)?,
|
||||
);
|
||||
}
|
||||
if let Some(lagging_wal_timeout) = &request_data.lagging_wal_timeout {
|
||||
tenant_conf.lagging_wal_timeout = Some(
|
||||
humantime::parse_duration(lagging_wal_timeout)
|
||||
.with_context(bad_duration("lagging_wal_timeout", lagging_wal_timeout))?,
|
||||
);
|
||||
}
|
||||
tenant_conf.max_lsn_wal_lag = request_data.max_lsn_wal_lag;
|
||||
tenant_conf.trace_read_requests = request_data.trace_read_requests;
|
||||
|
||||
tenant_conf.checkpoint_distance = request_data.checkpoint_distance;
|
||||
if let Some(checkpoint_timeout) = &request_data.checkpoint_timeout {
|
||||
tenant_conf.checkpoint_timeout = Some(
|
||||
humantime::parse_duration(checkpoint_timeout)
|
||||
.with_context(bad_duration("checkpoint_timeout", checkpoint_timeout))?,
|
||||
);
|
||||
}
|
||||
tenant_conf.compaction_target_size = request_data.compaction_target_size;
|
||||
tenant_conf.compaction_threshold = request_data.compaction_threshold;
|
||||
|
||||
if let Some(compaction_period) = &request_data.compaction_period {
|
||||
tenant_conf.compaction_period = Some(
|
||||
humantime::parse_duration(compaction_period)
|
||||
.with_context(bad_duration("compaction_period", compaction_period))?,
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(eviction_policy) = &request_data.eviction_policy {
|
||||
tenant_conf.eviction_policy = Some(
|
||||
serde::Deserialize::deserialize(eviction_policy)
|
||||
.context("parse field `eviction_policy`")?,
|
||||
);
|
||||
}
|
||||
|
||||
tenant_conf.min_resident_size_override = request_data.min_resident_size_override;
|
||||
|
||||
if let Some(evictions_low_residence_duration_metric_threshold) =
|
||||
&request_data.evictions_low_residence_duration_metric_threshold
|
||||
{
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold = Some(
|
||||
humantime::parse_duration(evictions_low_residence_duration_metric_threshold)
|
||||
.with_context(bad_duration(
|
||||
"evictions_low_residence_duration_metric_threshold",
|
||||
evictions_low_residence_duration_metric_threshold,
|
||||
))?,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(tenant_conf)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -192,9 +192,8 @@ retry:
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
neon_log(LOG, "could not get response from pageserver: %s", msg);
|
||||
pfree(msg);
|
||||
neon_log(LOG, "could not get response from pageserver: %s",
|
||||
PQerrorMessage(pageserver_conn));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
@@ -344,7 +343,7 @@ pageserver_receive(void)
|
||||
resp = NULL;
|
||||
}
|
||||
else if (rc == -2)
|
||||
neon_log(ERROR, "could not read COPY data: %s", pchomp(PQerrorMessage(pageserver_conn)));
|
||||
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
|
||||
else
|
||||
neon_log(ERROR, "unexpected PQgetCopyData return value: %d", rc);
|
||||
}
|
||||
@@ -368,7 +367,7 @@ pageserver_flush(void)
|
||||
}
|
||||
else if (PQflush(pageserver_conn))
|
||||
{
|
||||
char *msg = pchomp(PQerrorMessage(pageserver_conn));
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
pageserver_disconnect();
|
||||
neon_log(ERROR, "failed to flush page requests: %s", msg);
|
||||
|
||||
@@ -7,7 +7,6 @@ mod credentials;
|
||||
pub use credentials::ClientCredentials;
|
||||
|
||||
mod password_hack;
|
||||
pub use password_hack::parse_endpoint_param;
|
||||
use password_hack::PasswordHackPayload;
|
||||
|
||||
mod flow;
|
||||
@@ -45,10 +44,10 @@ pub enum AuthErrorImpl {
|
||||
#[error(
|
||||
"Endpoint ID is not specified. \
|
||||
Either please upgrade the postgres client library (libpq) for SNI support \
|
||||
or pass the endpoint ID (first part of the domain name) as a parameter: '?options=endpoint%3D<endpoint-id>'. \
|
||||
or pass the endpoint ID (first part of the domain name) as a parameter: '?options=project%3D<endpoint-id>'. \
|
||||
See more at https://neon.tech/sni"
|
||||
)]
|
||||
MissingEndpointName,
|
||||
MissingProjectName,
|
||||
|
||||
#[error("password authentication failed for user '{0}'")]
|
||||
AuthFailed(Box<str>),
|
||||
@@ -89,7 +88,7 @@ impl UserFacingError for AuthError {
|
||||
AuthFailed(_) => self.to_string(),
|
||||
BadAuthMethod(_) => self.to_string(),
|
||||
MalformedPassword(_) => self.to_string(),
|
||||
MissingEndpointName => self.to_string(),
|
||||
MissingProjectName => self.to_string(),
|
||||
Io(_) => "Internal error".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,8 +52,8 @@ pub async fn password_hack(
|
||||
.authenticate()
|
||||
.await?;
|
||||
|
||||
info!(project = &payload.endpoint, "received missing parameter");
|
||||
creds.project = Some(payload.endpoint);
|
||||
info!(project = &payload.project, "received missing parameter");
|
||||
creds.project = Some(payload.project);
|
||||
|
||||
let mut node = api.wake_compute(extra, creds).await?;
|
||||
node.config.password(payload.password);
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
//! User credentials used in authentication.
|
||||
|
||||
use crate::{auth::password_hack::parse_endpoint_param, error::UserFacingError};
|
||||
use itertools::Itertools;
|
||||
use crate::error::UserFacingError;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use std::collections::HashSet;
|
||||
use thiserror::Error;
|
||||
@@ -62,15 +61,7 @@ impl<'a> ClientCredentials<'a> {
|
||||
// Project name might be passed via PG's command-line options.
|
||||
let project_option = params
|
||||
.options_raw()
|
||||
.and_then(|options| {
|
||||
// We support both `project` (deprecated) and `endpoint` options for backward compatibility.
|
||||
// However, if both are present, we don't exactly know which one to use.
|
||||
// Therefore we require that only one of them is present.
|
||||
options
|
||||
.filter_map(parse_endpoint_param)
|
||||
.at_most_one()
|
||||
.ok()?
|
||||
})
|
||||
.and_then(|mut options| options.find_map(|opt| opt.strip_prefix("project=")))
|
||||
.map(|name| name.to_string());
|
||||
|
||||
let project_from_domain = if let Some(sni_str) = sni {
|
||||
@@ -186,51 +177,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_endpoint_from_options() -> anyhow::Result<()> {
|
||||
let options = StartupMessageParams::new([
|
||||
("user", "john_doe"),
|
||||
("options", "-ckey=1 endpoint=bar -c geqo=off"),
|
||||
]);
|
||||
|
||||
let creds = ClientCredentials::parse(&options, None, None)?;
|
||||
assert_eq!(creds.user, "john_doe");
|
||||
assert_eq!(creds.project.as_deref(), Some("bar"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_three_endpoints_from_options() -> anyhow::Result<()> {
|
||||
let options = StartupMessageParams::new([
|
||||
("user", "john_doe"),
|
||||
(
|
||||
"options",
|
||||
"-ckey=1 endpoint=one endpoint=two endpoint=three -c geqo=off",
|
||||
),
|
||||
]);
|
||||
|
||||
let creds = ClientCredentials::parse(&options, None, None)?;
|
||||
assert_eq!(creds.user, "john_doe");
|
||||
assert!(creds.project.is_none());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_when_endpoint_and_project_are_in_options() -> anyhow::Result<()> {
|
||||
let options = StartupMessageParams::new([
|
||||
("user", "john_doe"),
|
||||
("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"),
|
||||
]);
|
||||
|
||||
let creds = ClientCredentials::parse(&options, None, None)?;
|
||||
assert_eq!(creds.user, "john_doe");
|
||||
assert!(creds.project.is_none());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_projects_identical() -> anyhow::Result<()> {
|
||||
let options = StartupMessageParams::new([("user", "john_doe"), ("options", "project=baz")]);
|
||||
|
||||
@@ -91,7 +91,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, PasswordHack> {
|
||||
// the user neither enabled SNI nor resorted to any other method
|
||||
// for passing the project name we rely on. We should show them
|
||||
// the most helpful error message and point to the documentation.
|
||||
.ok_or(AuthErrorImpl::MissingEndpointName)?;
|
||||
.ok_or(AuthErrorImpl::MissingProjectName)?;
|
||||
|
||||
Ok(payload)
|
||||
}
|
||||
|
||||
@@ -6,55 +6,27 @@
|
||||
use bstr::ByteSlice;
|
||||
|
||||
pub struct PasswordHackPayload {
|
||||
pub endpoint: String,
|
||||
pub project: String,
|
||||
pub password: Vec<u8>,
|
||||
}
|
||||
|
||||
impl PasswordHackPayload {
|
||||
pub fn parse(bytes: &[u8]) -> Option<Self> {
|
||||
// The format is `project=<utf-8>;<password-bytes>`.
|
||||
let mut iter = bytes.splitn_str(2, ";");
|
||||
let endpoint = iter.next()?.to_str().ok()?;
|
||||
let endpoint = parse_endpoint_param(endpoint)?.to_owned();
|
||||
let mut iter = bytes.strip_prefix(b"project=")?.splitn_str(2, ";");
|
||||
let project = iter.next()?.to_str().ok()?.to_owned();
|
||||
let password = iter.next()?.to_owned();
|
||||
|
||||
Some(Self { endpoint, password })
|
||||
Some(Self { project, password })
|
||||
}
|
||||
}
|
||||
|
||||
pub fn parse_endpoint_param(bytes: &str) -> Option<&str> {
|
||||
bytes
|
||||
.strip_prefix("project=")
|
||||
.or_else(|| bytes.strip_prefix("endpoint="))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_endpoint_param_fn() {
|
||||
let input = "";
|
||||
assert!(parse_endpoint_param(input).is_none());
|
||||
|
||||
let input = "project=";
|
||||
assert_eq!(parse_endpoint_param(input), Some(""));
|
||||
|
||||
let input = "project=foobar";
|
||||
assert_eq!(parse_endpoint_param(input), Some("foobar"));
|
||||
|
||||
let input = "endpoint=";
|
||||
assert_eq!(parse_endpoint_param(input), Some(""));
|
||||
|
||||
let input = "endpoint=foobar";
|
||||
assert_eq!(parse_endpoint_param(input), Some("foobar"));
|
||||
|
||||
let input = "other_option=foobar";
|
||||
assert!(parse_endpoint_param(input).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_password_hack_payload_project() {
|
||||
fn parse_password_hack_payload() {
|
||||
let bytes = b"";
|
||||
assert!(PasswordHackPayload::parse(bytes).is_none());
|
||||
|
||||
@@ -62,33 +34,13 @@ mod tests {
|
||||
assert!(PasswordHackPayload::parse(bytes).is_none());
|
||||
|
||||
let bytes = b"project=;";
|
||||
let payload: PasswordHackPayload =
|
||||
PasswordHackPayload::parse(bytes).expect("parsing failed");
|
||||
assert_eq!(payload.endpoint, "");
|
||||
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
|
||||
assert_eq!(payload.project, "");
|
||||
assert_eq!(payload.password, b"");
|
||||
|
||||
let bytes = b"project=foobar;pass;word";
|
||||
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
|
||||
assert_eq!(payload.endpoint, "foobar");
|
||||
assert_eq!(payload.password, b"pass;word");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_password_hack_payload_endpoint() {
|
||||
let bytes = b"";
|
||||
assert!(PasswordHackPayload::parse(bytes).is_none());
|
||||
|
||||
let bytes = b"endpoint=";
|
||||
assert!(PasswordHackPayload::parse(bytes).is_none());
|
||||
|
||||
let bytes = b"endpoint=;";
|
||||
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
|
||||
assert_eq!(payload.endpoint, "");
|
||||
assert_eq!(payload.password, b"");
|
||||
|
||||
let bytes = b"endpoint=foobar;pass;word";
|
||||
let payload = PasswordHackPayload::parse(bytes).expect("parsing failed");
|
||||
assert_eq!(payload.endpoint, "foobar");
|
||||
assert_eq!(payload.project, "foobar");
|
||||
assert_eq!(payload.password, b"pass;word");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{auth::parse_endpoint_param, cancellation::CancelClosure, error::UserFacingError};
|
||||
use crate::{cancellation::CancelClosure, error::UserFacingError};
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use itertools::Itertools;
|
||||
use pq_proto::StartupMessageParams;
|
||||
@@ -279,7 +279,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
|
||||
#[allow(unstable_name_collisions)]
|
||||
let options: String = params
|
||||
.options_raw()?
|
||||
.filter(|opt| parse_endpoint_param(opt).is_none())
|
||||
.filter(|opt| !opt.starts_with("project="))
|
||||
.intersperse(" ") // TODO: use impl from std once it's stabilized
|
||||
.collect();
|
||||
|
||||
|
||||
@@ -79,17 +79,7 @@ module.exports = async ({ github, context, fetch, report }) => {
|
||||
for (const parentSuite of suites.children) {
|
||||
for (const suite of parentSuite.children) {
|
||||
for (const test of suite.children) {
|
||||
let buildType, pgVersion
|
||||
const match = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)?.groups
|
||||
if (match) {
|
||||
({buildType, pgVersion} = match)
|
||||
} else {
|
||||
// It's ok, we embed BUILD_TYPE and Postgres Version into the test name only for regress suite and do not for other suites (like performance).
|
||||
console.info(`Cannot get BUILD_TYPE and Postgres Version from test name: "${test.name}", defaulting to "release" and "14"`)
|
||||
|
||||
buildType = "release"
|
||||
pgVersion = "14"
|
||||
}
|
||||
const {groups: {buildType, pgVersion}} = test.name.match(/[\[-](?<buildType>debug|release)-pg(?<pgVersion>\d+)[-\]]/)
|
||||
|
||||
pgVersions.add(pgVersion)
|
||||
buildTypes.add(buildType)
|
||||
|
||||
@@ -272,7 +272,6 @@ class PageserverHttpClient(requests.Session):
|
||||
new_timeline_id: Optional[TimelineId] = None,
|
||||
ancestor_timeline_id: Optional[TimelineId] = None,
|
||||
ancestor_start_lsn: Optional[Lsn] = None,
|
||||
**kwargs,
|
||||
) -> Dict[Any, Any]:
|
||||
body: Dict[str, Any] = {
|
||||
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
|
||||
@@ -282,9 +281,7 @@ class PageserverHttpClient(requests.Session):
|
||||
if pg_version != PgVersion.NOT_SET:
|
||||
body["pg_version"] = int(pg_version)
|
||||
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body, **kwargs
|
||||
)
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", json=body)
|
||||
self.verbose_error(res)
|
||||
if res.status_code == 409:
|
||||
raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")
|
||||
|
||||
@@ -136,7 +136,9 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
|
||||
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
|
||||
|
||||
# remove the initial tenant
|
||||
## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865
|
||||
assert env.initial_timeline
|
||||
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, env.initial_timeline)
|
||||
pageserver_http.tenant_detach(env.initial_tenant)
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant)
|
||||
|
||||
@@ -5,18 +5,16 @@ import pytest
|
||||
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_proxy_select_1(static_proxy: NeonProxy, option_name: str):
|
||||
def test_proxy_select_1(static_proxy: NeonProxy):
|
||||
"""
|
||||
A simplest smoke test: check proxy against a local postgres instance.
|
||||
"""
|
||||
|
||||
out = static_proxy.safe_psql("select 1", options=f"{option_name}=generic-project-name")
|
||||
out = static_proxy.safe_psql("select 1", options="project=generic-project-name")
|
||||
assert out[0][0] == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_password_hack(static_proxy: NeonProxy, option_name: str):
|
||||
def test_password_hack(static_proxy: NeonProxy):
|
||||
"""
|
||||
Check the PasswordHack auth flow: an alternative to SCRAM auth for
|
||||
clients which can't provide the project/endpoint name via SNI or `options`.
|
||||
@@ -25,12 +23,11 @@ def test_password_hack(static_proxy: NeonProxy, option_name: str):
|
||||
user = "borat"
|
||||
password = "password"
|
||||
static_proxy.safe_psql(
|
||||
f"create role {user} with login password '{password}'",
|
||||
options=f"{option_name}=irrelevant",
|
||||
f"create role {user} with login password '{password}'", options="project=irrelevant"
|
||||
)
|
||||
|
||||
# Note the format of `magic`!
|
||||
magic = f"{option_name}=irrelevant;{password}"
|
||||
magic = f"project=irrelevant;{password}"
|
||||
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
|
||||
|
||||
# Must also check that invalid magic won't be accepted.
|
||||
@@ -59,62 +56,55 @@ async def test_link_auth(vanilla_pg: VanillaPostgres, link_proxy: NeonProxy):
|
||||
assert out == "42"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_proxy_options(static_proxy: NeonProxy, option_name: str):
|
||||
def test_proxy_options(static_proxy: NeonProxy):
|
||||
"""
|
||||
Check that we pass extra `options` to the PostgreSQL server:
|
||||
* `project=...` and `endpoint=...` shouldn't be passed at all
|
||||
* (otherwise postgres will raise an error).
|
||||
* `project=...` shouldn't be passed at all (otherwise postgres will raise an error).
|
||||
* everything else should be passed as-is.
|
||||
"""
|
||||
|
||||
options = f"{option_name}=irrelevant -cproxytest.option=value"
|
||||
options = "project=irrelevant -cproxytest.option=value"
|
||||
out = static_proxy.safe_psql("show proxytest.option", options=options)
|
||||
assert out[0][0] == "value"
|
||||
|
||||
options = f"-c proxytest.foo=\\ str {option_name}=irrelevant"
|
||||
options = "-c proxytest.foo=\\ str project=irrelevant"
|
||||
out = static_proxy.safe_psql("show proxytest.foo", options=options)
|
||||
assert out[0][0] == " str"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_auth_errors(static_proxy: NeonProxy, option_name: str):
|
||||
def test_auth_errors(static_proxy: NeonProxy):
|
||||
"""
|
||||
Check that we throw very specific errors in some unsuccessful auth scenarios.
|
||||
"""
|
||||
|
||||
# User does not exist
|
||||
with pytest.raises(psycopg2.Error) as exprinfo:
|
||||
static_proxy.connect(user="pinocchio", options=f"{option_name}=irrelevant")
|
||||
static_proxy.connect(user="pinocchio", options="project=irrelevant")
|
||||
text = str(exprinfo.value).strip()
|
||||
assert text.endswith("password authentication failed for user 'pinocchio'")
|
||||
|
||||
static_proxy.safe_psql(
|
||||
"create role pinocchio with login password 'magic'",
|
||||
options=f"{option_name}=irrelevant",
|
||||
"create role pinocchio with login password 'magic'", options="project=irrelevant"
|
||||
)
|
||||
|
||||
# User exists, but password is missing
|
||||
with pytest.raises(psycopg2.Error) as exprinfo:
|
||||
static_proxy.connect(user="pinocchio", password=None, options=f"{option_name}=irrelevant")
|
||||
static_proxy.connect(user="pinocchio", password=None, options="project=irrelevant")
|
||||
text = str(exprinfo.value).strip()
|
||||
assert text.endswith("password authentication failed for user 'pinocchio'")
|
||||
|
||||
# User exists, but password is wrong
|
||||
with pytest.raises(psycopg2.Error) as exprinfo:
|
||||
static_proxy.connect(user="pinocchio", password="bad", options=f"{option_name}=irrelevant")
|
||||
static_proxy.connect(user="pinocchio", password="bad", options="project=irrelevant")
|
||||
text = str(exprinfo.value).strip()
|
||||
assert text.endswith("password authentication failed for user 'pinocchio'")
|
||||
|
||||
# Finally, check that the user can connect
|
||||
with static_proxy.connect(
|
||||
user="pinocchio", password="magic", options=f"{option_name}=irrelevant"
|
||||
):
|
||||
with static_proxy.connect(user="pinocchio", password="magic", options="project=irrelevant"):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
|
||||
def test_forward_params_to_client(static_proxy: NeonProxy):
|
||||
"""
|
||||
Check that we forward all necessary PostgreSQL server params to client.
|
||||
"""
|
||||
@@ -140,7 +130,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
|
||||
where name = any(%s)
|
||||
"""
|
||||
|
||||
with static_proxy.connect(options=f"{option_name}=irrelevant") as conn:
|
||||
with static_proxy.connect(options="project=irrelevant") as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(query, (reported_params_subset,))
|
||||
for name, value in cur.fetchall():
|
||||
@@ -148,18 +138,17 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
|
||||
assert conn.get_parameter_status(name) == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
@pytest.mark.timeout(5)
|
||||
def test_close_on_connections_exit(static_proxy: NeonProxy, option_name: str):
|
||||
def test_close_on_connections_exit(static_proxy: NeonProxy):
|
||||
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
|
||||
# until after connections close.
|
||||
with static_proxy.connect(options=f"{option_name}=irrelevant"), static_proxy.connect(
|
||||
options=f"{option_name}=irrelevant"
|
||||
with static_proxy.connect(options="project=irrelevant"), static_proxy.connect(
|
||||
options="project=irrelevant"
|
||||
):
|
||||
static_proxy.terminate()
|
||||
with pytest.raises(subprocess.TimeoutExpired):
|
||||
static_proxy.wait_for_exit(timeout=2)
|
||||
# Ensure we don't accept any more connections
|
||||
with pytest.raises(psycopg2.OperationalError):
|
||||
static_proxy.connect(options=f"{option_name}=irrelevant")
|
||||
static_proxy.connect(options="project=irrelevant")
|
||||
static_proxy.wait_for_exit()
|
||||
|
||||
@@ -2,12 +2,11 @@
|
||||
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
|
||||
|
||||
import os
|
||||
import queue
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -27,7 +26,6 @@ from fixtures.pageserver.utils import (
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import print_gc_result, query_scalar, wait_until
|
||||
from requests import ReadTimeout
|
||||
|
||||
|
||||
#
|
||||
@@ -628,7 +626,10 @@ def test_empty_branch_remote_storage_upload(
|
||||
|
||||
new_branch_name = "new_branch"
|
||||
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
|
||||
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
|
||||
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
timelines_before_detach = set(
|
||||
map(
|
||||
@@ -657,19 +658,13 @@ def test_empty_branch_remote_storage_upload(
|
||||
), f"Expected to have same timelines after reattach, but got {timelines_after_detach}"
|
||||
|
||||
|
||||
# Branches off a root branch, but does not write anything to the new branch, so it has a metadata file only.
|
||||
# Ensures the branch is not on the remote storage and restarts the pageserver — the branch should be uploaded after the restart.
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_empty_branch_remote_storage_upload_on_restart(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
"""
|
||||
Branches off a root branch, but does not write anything to the new branch, so
|
||||
it has a metadata file only.
|
||||
|
||||
Ensures the branch is not on the remote storage and restarts the pageserver
|
||||
— the upload should be scheduled by load, and create_timeline should await
|
||||
for it even though it gets 409 Conflict.
|
||||
"""
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_empty_branch_remote_storage_upload_on_restart",
|
||||
@@ -678,87 +673,35 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
env = neon_env_builder.init_start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
client.configure_failpoints(("before-upload-index", "return"))
|
||||
new_branch_name = "new_branch"
|
||||
new_branch_timeline_id = env.neon_cli.create_branch(new_branch_name, "main", env.initial_tenant)
|
||||
|
||||
new_branch_timeline_id = TimelineId.generate()
|
||||
with env.endpoints.create_start(new_branch_name, tenant_id=env.initial_tenant) as endpoint:
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_branch_timeline_id)
|
||||
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
with pytest.raises(ReadTimeout):
|
||||
client.timeline_create(
|
||||
tenant_id=env.initial_tenant,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
new_timeline_id=new_branch_timeline_id,
|
||||
pg_version=env.pg_version,
|
||||
timeout=4,
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
f".*POST.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
|
||||
)
|
||||
|
||||
# index upload is now hitting the failpoint, should not block the shutdown
|
||||
env.pageserver.stop()
|
||||
|
||||
timeline_path = (
|
||||
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
|
||||
)
|
||||
|
||||
local_metadata = env.repo_dir / timeline_path / "metadata"
|
||||
assert local_metadata.is_file(), "timeout cancelled timeline branching, not the upload"
|
||||
|
||||
# Remove new branch from the remote storage
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
new_branch_on_remote_storage = env.remote_storage.root / timeline_path
|
||||
new_branch_on_remote_storage = (
|
||||
env.remote_storage.root
|
||||
/ "tenants"
|
||||
/ str(env.initial_tenant)
|
||||
/ "timelines"
|
||||
/ str(new_branch_timeline_id)
|
||||
)
|
||||
assert (
|
||||
not new_branch_on_remote_storage.exists()
|
||||
), "failpoint should had prohibited index_part.json upload"
|
||||
new_branch_on_remote_storage.is_dir()
|
||||
), f"'{new_branch_on_remote_storage}' path does not exist on the remote storage"
|
||||
shutil.rmtree(new_branch_on_remote_storage)
|
||||
|
||||
# during reconciliation we should had scheduled the uploads and on the
|
||||
# retried create_timeline, we will await for those to complete on next
|
||||
# client.timeline_create
|
||||
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-upload-index=return"})
|
||||
env.pageserver.start()
|
||||
|
||||
# sleep a bit to force the upload task go into exponential backoff
|
||||
time.sleep(1)
|
||||
|
||||
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
barrier = threading.Barrier(2)
|
||||
|
||||
def create_in_background():
|
||||
barrier.wait()
|
||||
try:
|
||||
client.timeline_create(
|
||||
tenant_id=env.initial_tenant,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
new_timeline_id=new_branch_timeline_id,
|
||||
pg_version=env.pg_version,
|
||||
)
|
||||
q.put(None)
|
||||
except PageserverApiException as e:
|
||||
q.put(e)
|
||||
|
||||
create_thread = threading.Thread(target=create_in_background)
|
||||
create_thread.start()
|
||||
|
||||
try:
|
||||
# maximize chances of actually waiting for the uploads by create_timeline
|
||||
barrier.wait()
|
||||
|
||||
assert not new_branch_on_remote_storage.exists(), "failpoint should had stopped uploading"
|
||||
|
||||
client.configure_failpoints(("before-upload-index", "off"))
|
||||
conflict = q.get()
|
||||
|
||||
assert conflict, "create_timeline should not have succeeded"
|
||||
assert (
|
||||
conflict.status_code == 409
|
||||
), "timeline was created before restart, and uploads scheduled during initial load, so we expect 409 conflict"
|
||||
|
||||
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
assert (
|
||||
new_branch_on_remote_storage / "index_part.json"
|
||||
).is_file(), "uploads scheduled during initial load should had been awaited for"
|
||||
finally:
|
||||
create_thread.join()
|
||||
wait_upload_queue_empty(client, env.initial_tenant, new_branch_timeline_id)
|
||||
assert (
|
||||
new_branch_on_remote_storage.is_dir()
|
||||
), f"New branch should have been reuploaded on pageserver restart to the remote storage path '{new_branch_on_remote_storage}'"
|
||||
|
||||
|
||||
def wait_upload_queue_empty(
|
||||
@@ -809,17 +752,4 @@ def get_queued_count(
|
||||
return int(val)
|
||||
|
||||
|
||||
def assert_nothing_to_upload(
|
||||
client: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
):
|
||||
"""
|
||||
Check last_record_lsn == remote_consistent_lsn. Assert works only for empty timelines, which
|
||||
do not have anything to compact or gc.
|
||||
"""
|
||||
detail = client.timeline_detail(tenant_id, timeline_id)
|
||||
assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])
|
||||
|
||||
|
||||
# TODO Test that we correctly handle GC of files that are stuck in upload queue.
|
||||
|
||||
@@ -23,6 +23,7 @@ from fixtures.pageserver.utils import (
|
||||
tenant_exists,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import (
|
||||
@@ -545,7 +546,7 @@ def test_emergency_relocate_with_branches_slow_replay(
|
||||
# - A logical replication message between the inserts, so that we can conveniently
|
||||
# pause the WAL ingestion between the two inserts.
|
||||
# - Child branch, created after the inserts
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
with main_endpoint.cursor() as cur:
|
||||
@@ -558,7 +559,14 @@ def test_emergency_relocate_with_branches_slow_replay(
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
main_endpoint.stop()
|
||||
env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)
|
||||
child_timeline_id = env.neon_cli.create_branch(
|
||||
"child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
|
||||
)
|
||||
|
||||
# Wait for the index_part.json file of both branches to be uploaded to remote storage.
|
||||
# This is a work around for issue https://github.com/neondatabase/neon/issues/3865.
|
||||
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
|
||||
wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
|
||||
|
||||
# Now kill the pageserver, remove the tenant directory, and restart. This simulates
|
||||
# the scenario that a pageserver dies unexpectedly and cannot be recovered, so we relocate
|
||||
@@ -696,7 +704,7 @@ def test_emergency_relocate_with_branches_createdb(
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# create new nenant
|
||||
tenant_id, _ = env.neon_cli.create_tenant()
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
main_endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
with main_endpoint.cursor() as cur:
|
||||
@@ -704,7 +712,9 @@ def test_emergency_relocate_with_branches_createdb(
|
||||
|
||||
cur.execute("CREATE DATABASE neondb")
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
env.neon_cli.create_branch("child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn)
|
||||
child_timeline_id = env.neon_cli.create_branch(
|
||||
"child", tenant_id=tenant_id, ancestor_start_lsn=current_lsn
|
||||
)
|
||||
|
||||
with main_endpoint.cursor(dbname="neondb") as cur:
|
||||
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,100)")
|
||||
@@ -715,6 +725,11 @@ def test_emergency_relocate_with_branches_createdb(
|
||||
cur.execute("CREATE TABLE test_migrate_one AS SELECT generate_series(1,200)")
|
||||
child_endpoint.stop()
|
||||
|
||||
# Wait for the index_part.json file of both branches to be uploaded to remote storage.
|
||||
# This is a work around for issue https://github.com/neondatabase/neon/issues/3865.
|
||||
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
|
||||
wait_for_upload_queue_empty(pageserver_http, tenant_id, child_timeline_id)
|
||||
|
||||
# Kill the pageserver, remove the tenant directory, and restart
|
||||
env.pageserver.stop(immediate=True)
|
||||
shutil.rmtree(Path(env.repo_dir) / "tenants" / str(tenant_id))
|
||||
|
||||
Reference in New Issue
Block a user