Compare commits

..

9 Commits

Author SHA1 Message Date
Christian Schwarz
4d7b504c70 WIP: move PageServerConf methods that return workdir subdirs into own type
I thought this might be handy in https://github.com/neondatabase/neon/pull/5536
because initially I planned to download directly into local filesystem.
I switched to using `remote_storage` instead in that PR.

But these changes here might still be useful.
2023-10-11 18:14:12 +02:00
Conrad Ludgate
bf065aabdf proxy: update locked error retry filter (#5376)
## Problem

We don't want to retry customer quota exhaustion errors.

## Summary of changes

Make sure both types of quota exhaustion errors are not retried
2023-10-10 08:59:16 +01:00
Konstantin Knizhnik
fe74fac276 Fix handling flush error in prefetch (#5473)
## Problem

See https://neondb.slack.com/archives/C05U648A9NJ

In case of failure of flush in prefetch, 
prefetch state is reseted. We need to retry register buffer attempt,
otherwise we will get assertion failure.

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-10-10 07:43:37 +03:00
Alexander Bayandin
b91ac670e1 Update plpgsql_check extension to 2.5.3 (#5437) 2023-10-09 17:07:43 +01:00
John Spray
b3195afd20 tests: fix a race in test_deletion_queue_recovery on loaded nodes (#5495)
## Problem

Seen in CI for https://github.com/neondatabase/neon/pull/5453 -- the
time gap between validation completing and the header getting written is
long enough to fail the test, where it was doing a cheeky 1 second
sleep.

## Summary of changes

- Replace 1 second sleep with a wait_until to see the header file get
written
- Use enums as test params to make the results more readable (instead of
True-False parameters)
- Fix the temp suffix used for deletion queue headers: this worked fine,
but resulted in `..tmp` extension.
2023-10-09 16:28:28 +01:00
John Spray
7eaa7a496b pageserver: cancellation handling in writes to postgres client socket (#5503)
## Problem

Writes to the postgres client socket from the page server were not
wrapped in cancellation handling, so a stuck client connection could
prevent tenant shutdowwn.

## Summary of changes

All the places we call flush() to write to the socket, we should be
respecting the cancellation token for the task.

In this PR, I explicitly pass around a CancellationToken rather than
doing inline `task_mgr::shutdown_token` calls, to avoid coupling it to
the global task_mgr state and make it easier to refactor later.

I have some follow-on commits that add a Shutdown variant to QueryError
and use it more extensively, but that's pure refactor so will keep
separate from this bug fix PR.

Closes: https://github.com/neondatabase/neon/issues/5341
2023-10-09 15:54:17 +01:00
Joonas Koivunen
4772cd6c93 fix: deny branching, starting compute from not yet uploaded timelines (#5484)
Part of #5172. First commits show that we used to allow starting up a
compute or creating a branch off a not yet uploaded timeline. This PR
moves activation of a timeline to happen **after** initial layer file(s)
(if any) and `index_part.json` have been uploaded. Simply moving
activation to be *after* downloads have finished works because we now
spawn a task per http request handler.

Current behaviour of uploading on the timelines on next startup is kept,
to be removed later as part of #5172.

Adds:
- `NeonCli.map_branch` and corresponding `neon_local` implementation:
allow creating computes for timelines managed via pageserver http
client/api
- possibly duplicate tests (I did not want to search for, will cleanup
in a follow-up if these duplicated)

Changes:
- make `wait_until_tenant_state` return immediatedly on `Broken` and not
wait more
2023-10-09 17:03:38 +03:00
Shany Pozin
010b4d0d5c Move ApiError 404 to info level (#5501)
## Problem
Moving ApiError 404 to info level logging (see
https://github.com/neondatabase/neon/pull/5489#issuecomment-1750211212)
2023-10-09 13:54:46 +03:00
Rahul Modpur
477cb3717b Fix neon_local pageserver status command (#5475)
## Problem
Fix neon_local pageserver status command
#5430

## Summary of changes
Fix clap config for pageserver status subcommand

## Checklist before requesting a review

- [x] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.


Signed-off-by: Rahul Modpur <rmodpur2@gmail.com>
2023-10-09 09:13:57 +01:00
47 changed files with 750 additions and 679 deletions

11
Cargo.lock generated
View File

@@ -4006,16 +4006,6 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_assert"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eda563240c1288b044209be1f0d38bb4d15044fb3e00dc354fbc922ab4733e80"
dependencies = [
"hashbrown 0.13.2",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.183"
@@ -5145,7 +5135,6 @@ dependencies = [
"routerify",
"sentry",
"serde",
"serde_assert",
"serde_json",
"serde_with",
"signal-hook",

View File

@@ -118,7 +118,6 @@ sentry = { version = "0.31", default-features = false, features = ["backtrace",
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
serde_assert = "0.5.0"
sha2 = "0.10.2"
signal-hook = "0.3"
smallvec = "1.11"

View File

@@ -368,8 +368,8 @@ RUN wget https://github.com/citusdata/postgresql-hll/archive/refs/tags/v2.18.tar
FROM build-deps AS plpgsql-check-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.4.0.tar.gz -O plpgsql_check.tar.gz && \
echo "9ba58387a279b35a3bfa39ee611e5684e6cddb2ba046ddb2c5190b3bd2ca254a plpgsql_check.tar.gz" | sha256sum --check && \
RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.5.3.tar.gz -O plpgsql_check.tar.gz && \
echo "6631ec3e7fb3769eaaf56e3dfedb829aa761abf163d13dba354b4c218508e1c0 plpgsql_check.tar.gz" | sha256sum --check && \
mkdir plpgsql_check-src && cd plpgsql_check-src && tar xvzf ../plpgsql_check.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \

View File

@@ -2,6 +2,7 @@ use crate::{background_process, local_env::LocalEnv};
use anyhow::anyhow;
use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::{path::PathBuf, process::Child};
use utils::id::{NodeId, TenantId};
@@ -13,8 +14,10 @@ pub struct AttachmentService {
const COMMAND: &str = "attachment_service";
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
pub pageserver_id: Option<NodeId>,
}

View File

@@ -116,6 +116,7 @@ fn main() -> Result<()> {
"attachment_service" => handle_attachment_service(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env),
"mappings" => handle_mappings(sub_args, &mut env),
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
_ => bail!("unexpected subcommand {sub_name}"),
};
@@ -816,6 +817,38 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
Ok(())
}
fn handle_mappings(sub_match: &ArgMatches, env: &mut local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match sub_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no mappings subcommand provided"),
};
match sub_name {
"map" => {
let branch_name = sub_args
.get_one::<String>("branch-name")
.expect("branch-name argument missing");
let tenant_id = sub_args
.get_one::<String>("tenant-id")
.map(|x| TenantId::from_str(x))
.expect("tenant-id argument missing")
.expect("malformed tenant-id arg");
let timeline_id = sub_args
.get_one::<String>("timeline-id")
.map(|x| TimelineId::from_str(x))
.expect("timeline-id argument missing")
.expect("malformed timeline-id arg");
env.register_branch_mapping(branch_name.to_owned(), tenant_id, timeline_id)?;
Ok(())
}
other => unimplemented!("mappings subcommand {other}"),
}
}
fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageServerNode> {
let node_id = if let Some(id_str) = args.get_one::<String>("pageserver-id") {
@@ -1084,6 +1117,7 @@ fn cli() -> Command {
// --id, when using a pageserver command
let pageserver_id_arg = Arg::new("pageserver-id")
.long("id")
.global(true)
.help("pageserver id")
.required(false);
// --pageserver-id when using a non-pageserver command
@@ -1254,17 +1288,20 @@ fn cli() -> Command {
Command::new("pageserver")
.arg_required_else_help(true)
.about("Manage pageserver")
.arg(pageserver_id_arg)
.subcommand(Command::new("status"))
.arg(pageserver_id_arg.clone())
.subcommand(Command::new("start").about("Start local pageserver")
.arg(pageserver_id_arg.clone())
.arg(pageserver_config_args.clone()))
.subcommand(Command::new("stop").about("Stop local pageserver")
.arg(pageserver_id_arg.clone())
.arg(stop_mode_arg.clone()))
.subcommand(Command::new("restart").about("Restart local pageserver")
.arg(pageserver_id_arg.clone())
.arg(pageserver_config_args.clone()))
.subcommand(Command::new("start")
.about("Start local pageserver")
.arg(pageserver_config_args.clone())
)
.subcommand(Command::new("stop")
.about("Stop local pageserver")
.arg(stop_mode_arg.clone())
)
.subcommand(Command::new("restart")
.about("Restart local pageserver")
.arg(pageserver_config_args.clone())
)
)
.subcommand(
Command::new("attachment_service")
@@ -1321,8 +1358,8 @@ fn cli() -> Command {
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
.arg(branch_name_arg)
.arg(timeline_id_arg)
.arg(branch_name_arg.clone())
.arg(timeline_id_arg.clone())
.arg(lsn_arg)
.arg(pg_port_arg)
.arg(http_port_arg)
@@ -1335,7 +1372,7 @@ fn cli() -> Command {
.subcommand(
Command::new("stop")
.arg(endpoint_id_arg)
.arg(tenant_id_arg)
.arg(tenant_id_arg.clone())
.arg(
Arg::new("destroy")
.help("Also delete data directory (now optional, should be default in future)")
@@ -1346,6 +1383,18 @@ fn cli() -> Command {
)
)
.subcommand(
Command::new("mappings")
.arg_required_else_help(true)
.about("Manage neon_local branch name mappings")
.subcommand(
Command::new("map")
.about("Create new mapping which cannot exist already")
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
)
)
// Obsolete old name for 'endpoint'. We now just print an error if it's used.
.subcommand(
Command::new("pg")

View File

@@ -46,6 +46,7 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId, TimelineId};
use crate::local_env::LocalEnv;
@@ -56,10 +57,13 @@ use compute_api::responses::{ComputeState, ComputeStatus};
use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
// contents of a endpoint.json file
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
endpoint_id: String,
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
timeline_id: TimelineId,
mode: ComputeMode,
pg_port: u16,

View File

@@ -8,6 +8,7 @@ use anyhow::{bail, ensure, Context};
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::env;
use std::fs;
@@ -32,6 +33,7 @@ pub const DEFAULT_PG_VERSION: u32 = 15;
// to 'neon_local init --config=<path>' option. See control_plane/simple.conf for
// an example.
//
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct LocalEnv {
// Base directory for all the nodes (the pageserver, safekeepers and
@@ -57,6 +59,7 @@ pub struct LocalEnv {
// Default tenant ID to use with the 'neon_local' command line utility, when
// --tenant_id is not explicitly specified.
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub default_tenant_id: Option<TenantId>,
// used to issue tokens during e.g pg start
@@ -81,6 +84,7 @@ pub struct LocalEnv {
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
#[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}

View File

@@ -6,6 +6,7 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -18,6 +19,7 @@ pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[serde_as]
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ComputeSpec {
pub format_version: f32,
@@ -48,12 +50,12 @@ pub struct ComputeSpec {
// these, and instead set the "neon.tenant_id", "neon.timeline_id",
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
// updated to fill these fields, we can make these non optional.
#[serde_as(as = "Option<DisplayFromStr>")]
pub tenant_id: Option<TenantId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub pageserver_connstring: Option<String>,
#[serde(default)]
pub safekeeper_connstrings: Vec<String>,
@@ -138,13 +140,14 @@ impl RemoteExtSpec {
}
}
#[serde_as]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeMode {
/// A read-write node
#[default]
Primary,
/// A read-only node, pinned at a particular LSN
Static(Lsn),
Static(#[serde_as(as = "DisplayFromStr")] Lsn),
/// A read-only node that follows the tip of the branch in hot standby mode
///
/// Future versions may want to distinguish between replicas with hot standby

View File

@@ -4,6 +4,7 @@
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId};
#[derive(Serialize, Deserialize)]
@@ -11,8 +12,10 @@ pub struct ReAttachRequest {
pub node_id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponseTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub generation: u32,
}
@@ -22,8 +25,10 @@ pub struct ReAttachResponse {
pub tenants: Vec<ReAttachResponseTenant>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ValidateRequestTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub gen: u32,
}
@@ -38,8 +43,10 @@ pub struct ValidateResponse {
pub tenants: Vec<ValidateResponseTenant>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ValidateResponseTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub valid: bool,
}

View File

@@ -6,7 +6,7 @@ use std::{
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
use utils::{
completion,
@@ -175,19 +175,25 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_timeline_id: TimelineId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_tenant_id: TenantId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
@@ -196,6 +202,7 @@ pub struct TenantCreateRequest {
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[serde_as]
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLoadRequest {
@@ -272,26 +279,31 @@ pub struct LocationConfig {
pub tenant_conf: TenantConfig,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct TenantCreateResponse(pub TenantId);
pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
#[derive(Serialize)]
pub struct StatusResponse {
pub id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
@@ -363,8 +375,10 @@ pub enum TenantAttachmentStatus {
Failed { reason: String },
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
// NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
pub state: TenantState,
@@ -375,22 +389,33 @@ pub struct TenantInfo {
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub last_record_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
/// The LSN that we have succesfully uploaded to remote storage
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
/// The LSN that we are advertizing to safekeepers
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn_visible: Lsn,
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
@@ -402,6 +427,7 @@ pub struct TimelineInfo {
pub timeline_dir_layer_file_size_sum: Option<u64>,
pub wal_source_connstr: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
@@ -498,13 +524,23 @@ pub struct LayerAccessStats {
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
Open { lsn_start: Lsn },
Frozen { lsn_start: Lsn, lsn_end: Lsn },
Open {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
},
Frozen {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
},
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
@@ -512,7 +548,9 @@ pub enum HistoricLayerInfo {
layer_file_name: String,
layer_file_size: u64,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
@@ -521,6 +559,7 @@ pub enum HistoricLayerInfo {
layer_file_name: String,
layer_file_size: u64,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,

View File

@@ -442,10 +442,20 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
trace!("got message {:?}", msg);
let result = self.process_message(handler, msg, &mut query_string).await;
self.flush().await?;
tokio::select!(
biased;
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received during response flush");
return Ok(())
},
flush_r = self.flush() => {
flush_r?;
}
);
match result? {
ProcessMsgResult::Continue => {
self.flush().await?;
continue;
}
ProcessMsgResult::Break => break,

View File

@@ -1,18 +1,23 @@
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
};
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub peer_ids: Option<Vec<NodeId>>,
pub pg_version: u32,
pub system_id: Option<u64>,
pub wal_seg_size: Option<u32>,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
// If not passed, it is assigned to the beginning of commit_lsn segment.
pub local_start_lsn: Option<Lsn>,
@@ -23,6 +28,7 @@ fn lsn_invalid() -> Lsn {
}
/// Data about safekeeper's timeline, mirrors broker.proto.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term.
@@ -30,19 +36,25 @@ pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub flush_lsn: Lsn,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub commit_lsn: Lsn,
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub backup_lsn: Lsn,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub remote_consistent_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub peer_horizon_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub local_start_lsn: Lsn,
/// A connection string to use for WAL receiving.

View File

@@ -55,7 +55,6 @@ bytes.workspace = true
criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
serde_assert.workspace = true
[[bench]]
name = "benchmarks"

View File

@@ -9,6 +9,7 @@ use jsonwebtoken::{
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use crate::id::TenantId;
@@ -31,9 +32,11 @@ pub enum Scope {
}
/// JWT payload. See docs/authentication.md for the format
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct Claims {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub tenant_id: Option<TenantId>,
pub scope: Scope,
}

View File

@@ -119,6 +119,7 @@ pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
match api_error {
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),
_ => error!("Error processing HTTP request: {api_error:#}"),
}

View File

@@ -3,7 +3,6 @@ use std::{fmt, str::FromStr};
use anyhow::Context;
use hex::FromHex;
use rand::Rng;
use serde::de::Visitor;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -21,92 +20,9 @@ pub enum IdError {
///
/// Use `#[serde_as(as = "DisplayFromStr")]` to (de)serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
/// Check the `serde_with::serde_as` documentation for options for more complex types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
struct Id([u8; 16]);
impl Serialize for Id {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
serializer.serialize_newtype_struct("Id", &self.0)
}
}
}
impl<'de> Deserialize<'de> for Id {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct NonHumanReadable;
/// This implementation is from an `#[derive(serde::Serialize)]` expansion
/// which used to be used.
impl<'de> Visitor<'de> for NonHumanReadable {
type Value = Id;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("tuple struct Id")
}
fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
Deserialize::deserialize(deserializer).map(Id)
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
match seq.next_element::<[u8; 16]>()? {
Some(only) => Ok(Id(only)),
None => Err(serde::de::Error::invalid_length(
1,
&"tuple struct Id with 1 element",
)),
}
}
}
struct HumanReadable;
impl<'de> Visitor<'de> for HumanReadable {
type Value = Id;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("hex string of 32 characters")
}
fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = Deserialize::deserialize(deserializer)?;
self.visit_str(s)
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Id::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_newtype_struct("Id", HumanReadable)
} else {
deserializer.deserialize_newtype_struct("Id", NonHumanReadable)
}
}
}
impl Id {
pub fn get_from_buf(buf: &mut impl bytes::Buf) -> Id {
let mut arr = [0u8; 16];
@@ -392,115 +308,3 @@ impl fmt::Display for NodeId {
write!(f, "{}", self.0)
}
}
#[cfg(test)]
mod tests {
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use crate::bin_ser::BeSer;
use super::*;
#[test]
fn test_id_serde_non_human_readable() {
let original_id = Id([
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
]);
let expected_tokens = Tokens(vec![
Token::Tuple { len: 16 },
Token::U8(173),
Token::U8(80),
Token::U8(132),
Token::U8(115),
Token::U8(129),
Token::U8(226),
Token::U8(72),
Token::U8(254),
Token::U8(170),
Token::U8(201),
Token::U8(135),
Token::U8(108),
Token::U8(199),
Token::U8(26),
Token::U8(228),
Token::U8(24),
Token::TupleEnd,
]);
let serializer = Serializer::builder().is_human_readable(false).build();
let serialized_tokens = original_id.serialize(&serializer).unwrap();
assert_eq!(serialized_tokens, expected_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(serialized_tokens)
.build();
let deserialized_id = Id::deserialize(&mut deserializer).unwrap();
assert_eq!(deserialized_id, original_id);
}
#[test]
fn test_id_serde_human_readable() {
let original_id = Id([
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
]);
let expected_tokens = Tokens(vec![Token::Str(String::from(
"ad50847381e248feaac9876cc71ae418",
))]);
let serializer = Serializer::builder().is_human_readable(true).build();
let serialized_tokens = original_id.serialize(&serializer).unwrap();
assert_eq!(serialized_tokens, expected_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
// .self_describing(true)
.tokens(Tokens(vec![Token::Str(String::from(
"ad50847381e248feaac9876cc71ae418",
))]))
.build();
assert_eq!(Id::deserialize(&mut deserializer).unwrap(), original_id);
}
/*
macro_rules! roundtrip_type {
($type:ty, $expected_bytes:expr) => {{
let expected_bytes: [u8; 16] = $expected_bytes;
let original_id = <$type>::from(expected_bytes);
let ser_bytes = original_id.ser().unwrap();
assert_eq!(ser_bytes, expected_bytes);
let des_id = <$type>::des(&ser_bytes).unwrap();
assert_eq!(des_id, original_id);
}};
}
#[test]
fn test_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(Id, expected_bytes);
}
#[test]
fn test_tenant_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(TenantId, expected_bytes);
}
#[test]
fn test_timeline_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(TimelineId, expected_bytes);
}
*/
}

View File

@@ -1,7 +1,7 @@
#![warn(missing_docs)]
use camino::Utf8Path;
use serde::{de::Visitor, Deserialize, Serialize};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::str::FromStr;
@@ -13,98 +13,10 @@ use crate::seqwait::MonotonicCounter;
pub const XLOG_BLCKSZ: u32 = 8192;
/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash)]
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Lsn(pub u64);
impl Serialize for Lsn {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
self.0.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for Lsn {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct LsnVisitor;
impl<'de> Visitor<'de> for LsnVisitor {
type Value = Lsn;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("LSN as either split hex string or plain unsigned integer")
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Lsn(v))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Lsn::from_str(v).map_err(|e| E::custom(e))
}
}
deserializer.deserialize_any(LsnVisitor)
}
}
/// Allows (de)serialization of an `Lsn` always as `u64`.
///
/// ### Example
///
/// ```rust
/// # use serde::{Serialize, Deserialize};
/// use utils::lsn::Lsn;
///
/// #[derive(Partialeq, Serialize, Deserialize)]
/// struct Foo {
/// #[serde(with = "utils::lsn::as_u64")]
/// always_u64: Lsn,
/// }
///
/// let orig = Foo { always_u64: Lsn(1234) };
///
/// let res = serde_json::to_string(&).unwrap();
/// assert_eq!(res, r#"{"always_u64": 1234}"#);
///
/// let foo = serde_json::from_str::<Foo>(&res).unwrap();
/// assert_eq!(res, orig);
/// ```
///
pub mod serde_as_u64 {
use super::Lsn;
/// Serializes the Lsn as u64 disregarding the human readability of the format.
///
/// Meant to be used via `#[serde(with = "...")]` or `#[serde(serialize_with = "...")]`.
pub fn serialize<S: serde::Serializer>(lsn: &Lsn, serializer: S) -> Result<S::Ok, S::Error> {
use serde::Serialize;
lsn.0.serialize(serializer)
}
/// Deserializes the Lsn as u64 disregarding the human readability of the format.
///
/// Meant to be used via `#[serde(with = "...")]` or `#[serde(deserialize_with = "...")]`.
pub fn deserialize<'de, D: serde::Deserializer<'de>>(deserializer: D) -> Result<Lsn, D::Error> {
use serde::Deserialize;
u64::deserialize(deserializer).map(Lsn)
}
}
/// We tried to parse an LSN from a string, but failed
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[error("LsnParseError")]
@@ -352,13 +264,8 @@ impl MonotonicCounter<Lsn> for RecordLsn {
#[cfg(test)]
mod tests {
use crate::bin_ser::BeSer;
use super::*;
use serde::ser::Serialize;
use serde_assert::{Deserializer, Serializer, Token, Tokens};
#[test]
fn test_lsn_strings() {
assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555)));
@@ -434,78 +341,4 @@ mod tests {
assert_eq!(lsn.fetch_max(Lsn(6000)), Lsn(5678));
assert_eq!(lsn.fetch_max(Lsn(5000)), Lsn(6000));
}
#[test]
fn lsn_serde_tokens_humanreadable() {
// Serializer::is_human_readable is for example json
let original_lsn = Lsn(0x0123456789abcdef);
let expected_readable_tokens = Tokens(vec![Token::Str(String::from("1234567/89ABCDEF"))]);
let serializer = Serializer::builder().is_human_readable(true).build();
let readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
assert_eq!(readable_ser_tokens, expected_readable_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
.tokens(readable_ser_tokens)
.build();
let des_lsn = Lsn::deserialize(&mut deserializer).unwrap();
assert_eq!(des_lsn, original_lsn);
}
#[test]
fn lsn_serde_tokens_nonhumanreadable() {
// !Serializer::is_human_readable is for example bincode
let original_lsn = Lsn(0x0123456789abcdef);
let expected_non_readable_tokens = Tokens(vec![Token::U64(0x0123456789abcdef)]);
let serializer = Serializer::builder().is_human_readable(false).build();
let non_readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
assert_eq!(non_readable_ser_tokens, expected_non_readable_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(non_readable_ser_tokens)
.build();
let des_lsn = Lsn::deserialize(&mut deserializer).unwrap();
assert_eq!(des_lsn, original_lsn);
}
#[test]
fn non_human_readable_does_not_accept_string() {
let original_lsn = Lsn(0x0123456789abcdef);
let serializer = Serializer::builder().is_human_readable(false).build();
let non_readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
.tokens(non_readable_ser_tokens)
.build();
Lsn::deserialize(&mut deserializer).unwrap_err();
}
#[test]
fn human_readable_does_not_accept_u64() {
let original_lsn = Lsn(0x0123456789abcdef);
let serializer = Serializer::builder().is_human_readable(true).build();
let readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(readable_ser_tokens)
.build();
Lsn::deserialize(&mut deserializer).unwrap_err();
}
#[test]
fn test_lsn_bincode_roundtrips() {
let lsn = Lsn(0x0123456789abcdef);
let expected_bytes = [0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef];
let ser_bytes = lsn.ser().unwrap();
assert_eq!(ser_bytes, expected_bytes);
let des_lsn = Lsn::des(&ser_bytes).unwrap();
assert_eq!(des_lsn, lsn);
}
}

View File

@@ -3,6 +3,7 @@ use std::time::{Duration, SystemTime};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tracing::{trace, warn};
use crate::lsn::Lsn;
@@ -14,17 +15,21 @@ use crate::lsn::Lsn;
///
/// serde Serialize is used only for human readable dump to json (e.g. in
/// safekeepers debug_dump).
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
/// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
/// LSN last received and ingested by the pageserver. Controls backpressure.
#[serde_as(as = "DisplayFromStr")]
pub last_received_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver to its local disc.
/// Controls backpressure.
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver on s3; safekeepers
/// consider WAL before it can be removed.
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
// Serialize with RFC3339 format.
#[serde(with = "serde_systemtime")]

View File

@@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
use std::ops::Deref;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
@@ -153,7 +154,7 @@ pub struct PageServerConf {
// that during unit testing, because the current directory is global
// to the process but different unit tests work on different
// repositories.
pub workdir: Utf8PathBuf,
pub workdir: PageserverConfWorkdir,
pub pg_distrib_dir: Utf8PathBuf,
@@ -241,6 +242,45 @@ impl<T> BuilderValue<T> {
}
}
#[derive(Clone, PartialEq, Eq)]
pub struct PageserverConfWorkdir {
d: Utf8PathBuf,
}
impl Deref for PageServerConf {
type Target = PageserverConfWorkdir;
fn deref(&self) -> &Self::Target {
&self.workdir
}
}
impl Deref for PageserverConfWorkdir {
type Target = Utf8Path;
fn deref(&self) -> &Self::Target {
&self.d
}
}
impl AsRef<std::path::Path> for PageserverConfWorkdir {
fn as_ref(&self) -> &std::path::Path {
self.d.as_ref()
}
}
impl std::fmt::Debug for PageserverConfWorkdir {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.d.fmt(f)
}
}
impl std::fmt::Display for PageserverConfWorkdir {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.d.fmt(f)
}
}
// needed to simplify config construction
struct PageServerConfigBuilder {
listen_pg_addr: BuilderValue<String>,
@@ -257,7 +297,7 @@ struct PageServerConfigBuilder {
page_cache_size: BuilderValue<usize>,
max_file_descriptors: BuilderValue<usize>,
workdir: BuilderValue<Utf8PathBuf>,
workdir: BuilderValue<PageserverConfWorkdir>,
pg_distrib_dir: BuilderValue<Utf8PathBuf>,
@@ -310,7 +350,9 @@ impl Default for PageServerConfigBuilder {
superuser: Set(DEFAULT_SUPERUSER.to_string()),
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
workdir: Set(Utf8PathBuf::new()),
workdir: Set(PageserverConfWorkdir {
d: Utf8PathBuf::new(),
}),
pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
env::current_dir().expect("cannot access current directory"),
)
@@ -399,7 +441,7 @@ impl PageServerConfigBuilder {
}
pub fn workdir(&mut self, workdir: Utf8PathBuf) {
self.workdir = BuilderValue::Set(workdir)
self.workdir = BuilderValue::Set(PageserverConfWorkdir { d: workdir })
}
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
@@ -599,17 +641,17 @@ impl PageServerConfigBuilder {
}
}
impl PageServerConf {
impl PageserverConfWorkdir {
//
// Repository paths, relative to workdir.
//
pub fn tenants_path(&self) -> Utf8PathBuf {
self.workdir.join(TENANTS_SEGMENT_NAME)
self.d.join(TENANTS_SEGMENT_NAME)
}
pub fn deletion_prefix(&self) -> Utf8PathBuf {
self.workdir.join("deletion")
self.d.join("deletion")
}
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
@@ -692,7 +734,7 @@ impl PageServerConf {
}
pub fn traces_path(&self) -> Utf8PathBuf {
self.workdir.join("traces")
self.d.join("traces")
}
pub fn trace_path(
@@ -716,9 +758,11 @@ impl PageServerConf {
/// Turns storage remote path of a file into its local path.
pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
remote_path.with_base(&self.workdir)
remote_path.with_base(&self.d)
}
}
impl PageServerConf {
//
// Postgres distribution paths
//
@@ -958,6 +1002,9 @@ impl PageServerConf {
}
pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
let repo_dir = PageserverConfWorkdir { d: repo_dir };
let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
PageServerConf {
@@ -1495,10 +1542,12 @@ threshold = "20m"
Ok(())
}
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(PageserverConfWorkdir, Utf8PathBuf)> {
let tempdir_path = tempdir.path();
let workdir = tempdir_path.join("workdir");
let workdir = PageserverConfWorkdir {
d: tempdir_path.join("workdir"),
};
fs::create_dir_all(&workdir)?;
let pg_distrib_dir = tempdir_path.join("pg_distrib");

View File

@@ -3,6 +3,7 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use serde_with::serde_as;
use std::{sync::Arc, time::SystemTime};
use utils::{
id::{TenantId, TimelineId},
@@ -41,10 +42,13 @@ pub(super) enum Name {
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere.
#[serde_with::serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,

View File

@@ -1,4 +1,5 @@
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
use serde_with::serde_as;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
@@ -6,9 +7,12 @@ use super::{metrics::Name, Cache, MetricsKey, RawMetric};
use utils::id::{TenantId, TimelineId};
/// How the metrics from pageserver are identified.
#[serde_with::serde_as]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
}

View File

@@ -17,6 +17,7 @@ use hex::FromHex;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
use serde_with::serde_as;
use thiserror::Error;
use tokio;
use tokio_util::sync::CancellationToken;
@@ -211,8 +212,9 @@ where
/// Files ending with this suffix will be ignored and erased
/// during recovery as startup.
const TEMP_SUFFIX: &str = ".tmp";
const TEMP_SUFFIX: &str = "tmp";
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionList {
/// Serialization version, for future use
@@ -241,6 +243,7 @@ struct DeletionList {
validated: bool,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionHeader {
/// Serialization version, for future use

View File

@@ -230,6 +230,7 @@ impl ListWriter {
let list_name_pattern =
Regex::new("(?<sequence>[a-zA-Z0-9]{16})-(?<version>[a-zA-Z0-9]{2}).list").unwrap();
let temp_extension = format!(".{TEMP_SUFFIX}");
let header_path = self.conf.deletion_header_path();
let mut seqs: Vec<u64> = Vec::new();
while let Some(dentry) = dir.next_entry().await? {
@@ -241,7 +242,7 @@ impl ListWriter {
continue;
}
if dentry_str.ends_with(TEMP_SUFFIX) {
if dentry_str.ends_with(&temp_extension) {
info!("Cleaning up temporary file {dentry_str}");
let absolute_path =
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));

View File

@@ -396,6 +396,9 @@ async fn timeline_create_handler(
format!("{err:#}")
))
}
Err(e @ tenant::CreateTimelineError::AncestorNotActive) => {
json_response(StatusCode::SERVICE_UNAVAILABLE, HttpErrorBody::from_msg(e.to_string()))
}
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}
@@ -753,8 +756,10 @@ async fn tenant_size_handler(
}
/// The type resides in the pageserver not to expose `ModelInputs`.
#[serde_with::serde_as]
#[derive(serde::Serialize)]
struct TenantHistorySize {
#[serde_as(as = "serde_with::DisplayFromStr")]
id: TenantId,
/// Size is a mixture of WAL and logical size, so the unit is bytes.
///

View File

@@ -35,6 +35,7 @@ use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::field;
use tracing::*;
use utils::id::ConnectionId;
@@ -64,69 +65,6 @@ use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
where
IO: AsyncRead + AsyncWrite + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
};
match msg {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => { break },
FeMessage::Sync => continue,
FeMessage::Terminate => {
let msg = "client terminated connection with Terminate message during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
break;
}
m => {
let msg = format!("unexpected message {m:?}");
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::Other, msg))?;
break;
}
};
yield copy_data_bytes;
}
Ok(None) => {
let msg = "client closed connection during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
pgb.flush().await?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
Err(io_error)?;
}
Err(other) => {
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
}
};
}
}
}
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -284,7 +222,13 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
let mut conn_handler = PageServerHandler::new(
conf,
broker_client,
auth,
connection_ctx,
task_mgr::shutdown_token(),
);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
@@ -318,6 +262,10 @@ struct PageServerHandler {
/// For each query received over the connection,
/// `process_query` creates a child context from this one.
connection_ctx: RequestContext,
/// A token that should fire when the tenant transitions from
/// attached state, or when the pageserver is shutting down.
cancel: CancellationToken,
}
impl PageServerHandler {
@@ -326,6 +274,7 @@ impl PageServerHandler {
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<JwtAuth>>,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> Self {
PageServerHandler {
_conf: conf,
@@ -333,6 +282,91 @@ impl PageServerHandler {
auth,
claims: None,
connection_ctx,
cancel,
}
}
/// Wrap PostgresBackend::flush to respect our CancellationToken: it is important to use
/// this rather than naked flush() in order to shut down promptly. Without this, we would
/// block shutdown of a tenant if a postgres client was failing to consume bytes we send
/// in the flush.
async fn flush_cancellable<IO>(&self, pgb: &mut PostgresBackend<IO>) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
tokio::select!(
flush_r = pgb.flush() => {
Ok(flush_r?)
},
_ = self.cancel.cancelled() => {
Err(QueryError::Other(anyhow::anyhow!("Shutting down")))
}
)
}
fn copyin_stream<'a, IO>(
&'a self,
pgb: &'a mut PostgresBackend<IO>,
) -> impl Stream<Item = io::Result<Bytes>> + 'a
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
};
match msg {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => { break },
FeMessage::Sync => continue,
FeMessage::Terminate => {
let msg = "client terminated connection with Terminate message during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
break;
}
m => {
let msg = format!("unexpected message {m:?}");
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::Other, msg))?;
break;
}
};
yield copy_data_bytes;
}
Ok(None) => {
let msg = "client closed connection during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
self.flush_cancellable(pgb).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
Err(io_error)?;
}
Err(other) => {
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
}
};
}
}
}
@@ -372,7 +406,7 @@ impl PageServerHandler {
// switch client to COPYBOTH
pgb.write_message_noflush(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;
let metrics = metrics::SmgrQueryTimePerTimeline::new(&tenant_id, &timeline_id);
@@ -465,7 +499,7 @@ impl PageServerHandler {
});
pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;
}
Ok(())
}
@@ -508,9 +542,9 @@ impl PageServerHandler {
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
timeline
.import_basebackup_from_tar(
&mut copyin_reader,
@@ -563,8 +597,8 @@ impl PageServerHandler {
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
pgb.flush().await?;
let mut copyin_reader = pin!(StreamReader::new(copyin_stream(pgb)));
self.flush_cancellable(pgb).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
info!("wal import complete");
@@ -772,7 +806,7 @@ impl PageServerHandler {
// switch client to COPYOUT
pgb.write_message_noflush(&BeMessage::CopyOutResponse)?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;
// Send a tarball of the latest layer on the timeline. Compress if not
// fullbackup. TODO Compress in that case too (tests need to be updated)
@@ -824,7 +858,7 @@ impl PageServerHandler {
}
pgb.write_message_noflush(&BeMessage::CopyDone)?;
pgb.flush().await?;
self.flush_cancellable(pgb).await?;
let basebackup_after = started
.elapsed()

View File

@@ -406,6 +406,8 @@ pub enum CreateTimelineError {
AlreadyExists,
#[error(transparent)]
AncestorLsn(anyhow::Error),
#[error("ancestor timeline is not active")]
AncestorNotActive,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@@ -1587,6 +1589,12 @@ impl Tenant {
.get_timeline(ancestor_timeline_id, false)
.context("Cannot branch off the timeline that's not present in pageserver")?;
// instead of waiting around, just deny the request because ancestor is not yet
// ready for other purposes either.
if !ancestor_timeline.is_active() {
return Err(CreateTimelineError::AncestorNotActive);
}
if let Some(lsn) = ancestor_start_lsn.as_mut() {
*lsn = lsn.align();
@@ -1619,8 +1627,6 @@ impl Tenant {
}
};
loaded_timeline.activate(broker_client, None, ctx);
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
// Ok, the timeline is durable in remote storage.
@@ -1632,6 +1638,8 @@ impl Tenant {
})?;
}
loaded_timeline.activate(broker_client, None, ctx);
Ok(loaded_timeline)
}

View File

@@ -406,123 +406,4 @@ mod tests {
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
);
}
#[test]
fn test_metadata_bincode_serde() {
let original_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let metadata_bytes = original_metadata
.to_bytes()
.expect("Cannot create bytes array from metadata");
let metadata_bincode_be_bytes = original_metadata
.ser()
.expect("Cannot serialize the metadata");
// 8 bytes for the length of the vector
assert_eq!(metadata_bincode_be_bytes.len(), 8 + metadata_bytes.len());
let expected_bincode_bytes = {
let mut temp = vec![];
let len_bytes = metadata_bytes.len().to_be_bytes();
temp.extend_from_slice(&len_bytes);
temp.extend_from_slice(&metadata_bytes);
temp
};
assert_eq!(metadata_bincode_be_bytes, expected_bincode_bytes);
let deserialized_metadata = TimelineMetadata::des(&metadata_bincode_be_bytes).unwrap();
// Deserialized metadata has the metadata header, which is different from the serialized one.
// Reference: TimelineMetaData::to_bytes()
let expected_metadata = {
let mut temp_metadata = original_metadata;
let body_bytes = temp_metadata
.body
.ser()
.expect("Cannot serialize the metadata body");
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
temp_metadata.hdr = hdr;
temp_metadata
};
assert_eq!(deserialized_metadata, expected_metadata);
}
#[test]
fn test_metadata_bincode_serde_ensure_roundtrip() {
let original_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let expected_bytes = vec![
/* bincode length encoding bytes */
0, 0, 0, 0, 0, 0, 2, 0, // 8 bytes for the length of the serialized vector
/* TimelineMetadataHeader */
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
/* TimelineMetadataBodyV2 */
0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes)
1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes)
1, 17, 34, 51, 68, 85, 102, 119, 136, 17, 34, 51, 68, 85, 102, 119,
136, // ancestor_timeline (17 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // ancestor_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes)
0, 0, 0, 15, // pg_version (4 bytes)
/* padding bytes */
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0,
];
let metadata_ser_bytes = original_metadata.ser().unwrap();
assert_eq!(metadata_ser_bytes, expected_bytes);
let expected_metadata = {
let mut temp_metadata = original_metadata;
let body_bytes = temp_metadata
.body
.ser()
.expect("Cannot serialize the metadata body");
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
temp_metadata.hdr = hdr;
temp_metadata
};
let des_metadata = TimelineMetadata::des(&metadata_ser_bytes).unwrap();
assert_eq!(des_metadata, expected_metadata);
}
}

View File

@@ -6,6 +6,7 @@ use std::collections::HashMap;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::bin_ser::SerializeError;
use crate::tenant::metadata::TimelineMetadata;
@@ -57,6 +58,7 @@ impl LayerFileMetadata {
///
/// This type needs to be backwards and forwards compatible. When changing the fields,
/// remember to add a test case for the changed version.
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexPart {
/// Debugging aid describing the version of this type.
@@ -76,6 +78,7 @@ pub struct IndexPart {
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
// It's duplicated for convenience when reading the serialized structure, but is
// private because internally we would read from metadata instead.
#[serde_as(as = "DisplayFromStr")]
disk_consistent_lsn: Lsn,
#[serde(rename = "metadata_bytes")]

View File

@@ -31,6 +31,7 @@ pub(super) async fn upload_index_part<'a>(
fail_point!("before-upload-index", |_| {
bail!("failpoint before-upload-index")
});
pausable_failpoint!("before-upload-index-pausable");
let index_part_bytes =
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;

View File

@@ -29,6 +29,7 @@ use tenant_size_model::{Segment, StorageModel};
/// needs. We will convert this into a StorageModel when it's time to perform
/// the calculation.
///
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
pub segments: Vec<SegmentMeta>,
@@ -36,9 +37,11 @@ pub struct ModelInputs {
}
/// A [`Segment`], with some extra information for display purposes
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
pub segment: Segment,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
pub kind: LsnKind,
}
@@ -74,22 +77,32 @@ pub enum LsnKind {
/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
/// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TimelineInputs {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub ancestor_id: Option<TimelineId>,
#[serde_as(as = "serde_with::DisplayFromStr")]
ancestor_lsn: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
last_record: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
latest_gc_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
horizon_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
pitr_cutoff: Lsn,
/// Cutoff point based on GC settings
#[serde_as(as = "serde_with::DisplayFromStr")]
next_gc_cutoff: Lsn,
/// Cutoff point calculated from the user-supplied 'max_retention_period'
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
retention_param_cutoff: Option<Lsn>,
}

View File

@@ -3279,10 +3279,13 @@ struct CompactLevel0Phase1StatsBuilder {
new_deltas_size: Option<u64>,
}
#[serde_as]
#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
version: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "serde_with::DisplayFromStr")]
timeline_id: TimelineId,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,

View File

@@ -721,7 +721,7 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
req.buftag = tag;
Retry:
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
if (entry != NULL)
@@ -858,7 +858,11 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
page_server->flush();
if (!page_server->flush())
{
/* Prefetch set is reset in case of error, so we should try to register our request once again */
goto Retry;
}
MyPState->ring_flush = MyPState->ring_unused;
}

View File

@@ -89,7 +89,10 @@ pub mod errors {
Self::Console {
status: http::StatusCode::LOCKED,
ref text,
} => !text.contains("quota"),
} => {
!text.contains("written data quota exceeded")
&& !text.contains("the limit for current plan reached")
}
// retry server errors
Self::Console { status, .. } if status.is_server_error() => true,
_ => false,

View File

@@ -100,6 +100,7 @@ pub struct SafekeeperData {
pub availability_zone_id: String,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ProjectData {
pub id: ProjectId,
@@ -108,6 +109,7 @@ pub struct ProjectData {
pub platform_id: String,
pub user_id: String,
pub pageserver_id: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub tenant: TenantId,
pub safekeepers: Vec<SafekeeperData>,
pub deleted: bool,
@@ -124,6 +126,7 @@ pub struct ProjectData {
pub maintenance_set: Option<String>,
}
#[serde_with::serde_as]
#[derive(Debug, serde::Deserialize)]
pub struct BranchData {
pub id: BranchId,
@@ -131,10 +134,12 @@ pub struct BranchData {
pub updated_at: DateTime<Utc>,
pub name: String,
pub project_id: ProjectId,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde(default)]
pub parent_id: Option<BranchId>,
#[serde(default)]
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub parent_lsn: Option<Lsn>,
pub default: bool,
pub deleted: bool,

View File

@@ -13,6 +13,7 @@ use postgres_ffi::XLogSegNo;
use serde::Deserialize;
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
use utils::id::{TenantId, TimelineId};
@@ -73,9 +74,12 @@ pub struct Config {
pub wal_backup_enabled: bool,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct Timeline {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub control_file: Option<SafeKeeperState>,
pub memory: Option<Memory>,

View File

@@ -4,6 +4,7 @@ use once_cell::sync::Lazy;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::SkTimelineInfo;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::str::FromStr;
@@ -61,9 +62,11 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
/// Same as TermSwitchEntry, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
pub term: Term,
#[serde_as(as = "DisplayFromStr")]
pub lsn: Lsn,
}
@@ -76,18 +79,28 @@ pub struct AcceptorStateStatus {
}
/// Info about timeline on safekeeper ready for reporting.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineStatus {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub acceptor_state: AcceptorStateStatus,
pub pg_info: ServerInfo,
#[serde_as(as = "DisplayFromStr")]
pub flush_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub timeline_start_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub local_start_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub backup_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub peer_horizon_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
pub walsenders: Vec<WalSenderState>,

View File

@@ -44,11 +44,8 @@ pub struct AppendLogicalMessage {
// fields from AppendRequestHeader
pub term: Term,
#[serde(with = "utils::lsn::serde_as_u64")]
pub epoch_start_lsn: Lsn,
#[serde(with = "utils::lsn::serde_as_u64")]
pub begin_lsn: Lsn,
#[serde(with = "utils::lsn::serde_as_u64")]
pub truncate_lsn: Lsn,
pub pg_version: u32,
}

View File

@@ -5,6 +5,8 @@ use tokio::io::AsyncWriteExt;
use tracing::info;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use serde_with::{serde_as, DisplayFromStr};
use crate::{
control_file, debug_dump,
http::routes::TimelineStatus,
@@ -13,9 +15,12 @@ use crate::{
};
/// Info about timeline on safekeeper ready for reporting.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub http_hosts: Vec<String>,
}

View File

@@ -4,7 +4,6 @@ use anyhow::{bail, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use hex::FromHex;
use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
use std::cmp::max;
@@ -282,7 +281,6 @@ impl SafeKeeperState {
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Serialize))]
pub struct ProposerGreeting {
/// proposer-acceptor protocol version
pub protocol_version: u32,
@@ -296,46 +294,6 @@ pub struct ProposerGreeting {
pub wal_seg_size: u32,
}
static EXAMPLE_PROPOSER_GREETING: &[u8] =
b"\x02\0\0\0\0q\x02\0\x80\xca+\x0e\xe8\x9e{\x94:b\xab\xe4\0\x1exo\0\0\0\0\0\0\0\0\xfa!\xa3\xc3\xa5s\x8d\xcc^\xd4\x1f\x8cA\x81\xb2\x13\x99\xcf:z& t\x82y\xbf\xee\x8aX\xad\r\xe1\x01\0\0\0\0\0\0\x01";
#[test]
fn serde_proposergreeting() {
let pg = ProposerGreeting::des(EXAMPLE_PROPOSER_GREETING).unwrap();
assert_eq!(
pg,
ProposerGreeting {
protocol_version: 2,
pg_version: 160000,
proposer_id: [128, 202, 43, 14, 232, 158, 123, 148, 58, 98, 171, 228, 0, 30, 120, 111],
system_id: 0,
timeline_id: TimelineId::from_hex("fa21a3c3a5738dcc5ed41f8c4181b213").unwrap(),
tenant_id: TenantId::from_hex("99cf3a7a2620748279bfee8a58ad0de1").unwrap(),
tli: 1,
wal_seg_size: 16777216
}
);
}
#[test]
fn ser_proposergreeting() {
let pg = ProposerGreeting {
protocol_version: 2,
pg_version: 160000,
proposer_id: [
128, 202, 43, 14, 232, 158, 123, 148, 58, 98, 171, 228, 0, 30, 120, 111,
],
system_id: 0,
timeline_id: TimelineId::from_hex("fa21a3c3a5738dcc5ed41f8c4181b213").unwrap(),
tenant_id: TenantId::from_hex("99cf3a7a2620748279bfee8a58ad0de1").unwrap(),
tli: 1,
wal_seg_size: 16777216,
};
assert_eq!(&pg.ser().unwrap(), EXAMPLE_PROPOSER_GREETING);
}
/// Acceptor -> Proposer initial response: the highest term known to me
/// (acceptor voted for).
#[derive(Debug, Serialize)]
@@ -444,14 +402,12 @@ impl ProposerAcceptorMessage {
/// Parse proposer message.
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
// xxx using Reader is inefficient but easy to work with bincode
let raw = msg_bytes.clone();
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
tracing::info!("greeting in {raw:?}");
let msg = dbg!(ProposerGreeting::des_from(&mut stream))?;
let msg = ProposerGreeting::des_from(&mut stream)?;
Ok(ProposerAcceptorMessage::Greeting(msg))
}
'v' => {

View File

@@ -16,6 +16,7 @@ use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::id::TenantTimelineId;
use utils::lsn::AtomicLsn;
@@ -312,8 +313,10 @@ impl WalSendersShared {
}
// Serialized is used only for pretty printing in json.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalSenderState {
#[serde_as(as = "DisplayFromStr")]
ttid: TenantTimelineId,
addr: SocketAddr,
conn_id: ConnectionId,

View File

@@ -5,8 +5,10 @@ use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::fs;
use serde_with::DisplayFromStr;
use std::cmp::max;
use std::sync::Arc;
use tokio::sync::{Mutex, MutexGuard};
@@ -39,16 +41,20 @@ use crate::SafeKeeperConf;
use crate::{debug_dump, wal_storage};
/// Things safekeeper should know about timeline state on peers.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub sk_id: NodeId,
/// Term of the last entry.
_last_log_term: Term,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
_flush_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
#[serde_as(as = "DisplayFromStr")]
pub local_start_lsn: Lsn,
/// When info was received. Serde annotations are not very useful but make
/// the code compile -- we don't rely on this field externally.

View File

@@ -1464,6 +1464,29 @@ class NeonCli(AbstractNeonCli):
return self.raw_cli(args, check_return_code=check_return_code)
def map_branch(
self, name: str, tenant_id: TenantId, timeline_id: TimelineId
) -> "subprocess.CompletedProcess[str]":
"""
Map tenant id and timeline id to a neon_local branch name. They do not have to exist.
Usually needed when creating branches via PageserverHttpClient and not neon_local.
After creating a name mapping, you can use EndpointFactory.create_start
with this registered branch name.
"""
args = [
"mappings",
"map",
"--branch-name",
name,
"--tenant-id",
str(tenant_id),
"--timeline-id",
str(timeline_id),
]
return self.raw_cli(args, check_return_code=True)
def start(self, check_return_code=True) -> "subprocess.CompletedProcess[str]":
return self.raw_cli(["start"], check_return_code=check_return_code)

View File

@@ -74,11 +74,14 @@ def wait_until_tenant_state(
for _ in range(iterations):
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
else:
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"]["slug"] == expected_state:
return tenant
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
if tenant["state"]["slug"] == "Broken":
raise RuntimeError(f"tenant became Broken, not {expected_state}")
time.sleep(period)

View File

@@ -1,14 +1,24 @@
import random
import threading
import time
from typing import List
from queue import SimpleQueue
from typing import Any, Dict, List, Union
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
from fixtures.types import Lsn
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PgBin,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
from requests.exceptions import RetryError
# Test branch creation
@@ -128,3 +138,245 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
endpoint1 = env.endpoints.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
def test_cannot_create_endpoint_on_non_uploaded_timeline(neon_env_builder: NeonEnvBuilder):
"""
Endpoint should not be possible to create because branch has not been uploaded.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
env.pageserver.allowed_errors.append(
".*page_service_conn_main.*: query handler for 'basebackup .* is not active, state: Loading"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
initial_branch = "initial_branch"
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
env.neon_cli.map_branch(initial_branch, env.initial_tenant, env.initial_timeline)
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder):
"""
Branch should not be possible to create because ancestor has not been uploaded.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
branch_id = TimelineId.generate()
with pytest.raises(RetryError, match="too many 503 error responses"):
ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
branch_id,
ancestor_timeline_id=env.initial_timeline,
)
with pytest.raises(
PageserverApiException,
match=f"NotFound: Timeline {env.initial_tenant}/{branch_id} was not found",
):
ps_http.timeline_detail(env.initial_tenant, branch_id)
# important to note that a task might still be in progress to complete
# the work, but will never get to that because we have the pause
# failpoint
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
"""
If the activate only after upload is used, then retries could become competing.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
env.pageserver.allowed_errors.append(
".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
create_root = threading.Thread(target=start_creating_timeline)
branch_id = TimelineId.generate()
queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
barrier = threading.Barrier(3)
def try_branch():
barrier.wait()
barrier.wait()
try:
ret = ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
branch_id,
ancestor_timeline_id=env.initial_timeline,
timeout=5,
)
queue.put(ret)
except Exception as e:
queue.put(e)
threads = [threading.Thread(target=try_branch) for _ in range(2)]
try:
create_root.start()
for t in threads:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
barrier.wait()
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
barrier.wait()
# now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files
first = queue.get()
second = queue.get()
log.info(first)
log.info(second)
(succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
assert isinstance(failed, Exception)
assert isinstance(succeeded, Dict)
# FIXME: there's probably multiple valid status codes:
# - Timeline 62505b9a9f6b1d29117b1b74eaf07b12/56cd19d3b2dbcc65e9d53ec6ca304f24 already exists
# - whatever 409 response says, but that is a subclass of PageserverApiException
assert isinstance(failed, PageserverApiException)
assert succeeded["state"] == "Active"
finally:
# we might still have the failpoint active
env.pageserver.stop(immediate=True)
# pytest should nag if we leave threads unjoined
for t in threads:
t.join()
create_root.join()
def test_non_uploaded_branch_availability_after_restart(neon_env_builder: NeonEnvBuilder):
"""
Currently before RFC#27 we keep and continue uploading branches which were not successfully uploaded before shutdown.
This test likely duplicates some other test, but it's easier to write one than to make sure there will be a failing test when the rfc is implemented.
"""
env = neon_env_builder.init_configs()
env.start()
env.pageserver.allowed_errors.append(
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*"
)
ps_http = env.pageserver.http_client()
# pause all uploads
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
ps_http.tenant_create(env.initial_tenant)
def start_creating_timeline():
with pytest.raises(RequestException):
ps_http.timeline_create(
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
)
t = threading.Thread(target=start_creating_timeline)
try:
t.start()
wait_until_paused(env, "before-upload-index-pausable")
finally:
# FIXME: paused uploads bother shutdown
env.pageserver.stop(immediate=True)
t.join()
# now without a failpoint
env.pageserver.start()
wait_until_tenant_active(ps_http, env.initial_tenant)
# currently it lives on and will get eventually uploaded, but this will change
detail = ps_http.timeline_detail(env.initial_tenant, env.initial_timeline)
assert detail["state"] == "Active"
def wait_until_paused(env: NeonEnv, failpoint: str):
found = False
msg = f"at failpoint {failpoint}"
for _ in range(20):
time.sleep(1)
found = env.pageserver.log_contains(msg) is not None
if found:
break
assert found

View File

@@ -10,6 +10,7 @@ of the pageserver are:
"""
import enum
import re
import time
from typing import Optional
@@ -276,15 +277,28 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
assert get_deletion_queue_unexpected_errors(ps_http) == 0
@pytest.mark.parametrize("keep_attachment", [True, False])
@pytest.mark.parametrize("validate_before", [True, False])
class KeepAttachment(str, enum.Enum):
KEEP = "keep"
LOSE = "lose"
class ValidateBefore(str, enum.Enum):
VALIDATE = "validate"
NO_VALIDATE = "no-validate"
@pytest.mark.parametrize("keep_attachment", [KeepAttachment.KEEP, KeepAttachment.LOSE])
@pytest.mark.parametrize("validate_before", [ValidateBefore.VALIDATE, ValidateBefore.NO_VALIDATE])
def test_deletion_queue_recovery(
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool, validate_before: bool
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
keep_attachment: KeepAttachment,
validate_before: ValidateBefore,
):
"""
:param keep_attachment: If true, we re-attach after restart. Else, we act as if some other
:param keep_attachment: whether to re-attach after restart. Else, we act as if some other
node took the attachment while we were restarting.
:param validate_before: If true, we wait for deletions to be validated before restart. This
:param validate_before: whether to wait for deletions to be validated before restart. This
makes them elegible to be executed after restart, if the same node keeps the attachment.
"""
neon_env_builder.enable_generations = True
@@ -300,7 +314,7 @@ def test_deletion_queue_recovery(
("deletion-queue-before-execute", "return"),
]
if not validate_before:
if validate_before == ValidateBefore.NO_VALIDATE:
failpoints.append(
# Prevent deletion lists from being validated, we will test that they are
# dropped properly during recovery. 'pause' is okay here because we kill
@@ -320,20 +334,25 @@ def test_deletion_queue_recovery(
assert get_deletion_queue_unexpected_errors(ps_http) == 0
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
if validate_before:
if validate_before == ValidateBefore.VALIDATE:
def assert_validation_complete():
assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)
wait_until(20, 1, assert_validation_complete)
# A short wait to let the DeletionHeader get written out, as this happens after
# the validated count gets incremented.
time.sleep(1)
# The validatated keys statistic advances before the header is written, so we
# also wait to see the header hit the disk: this seems paranoid but the race
# can really happen on a heavily overloaded test machine.
def assert_header_written():
assert (env.pageserver.workdir / "deletion" / "header-01").exists()
wait_until(20, 1, assert_header_written)
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
env.pageserver.stop(immediate=True)
if not keep_attachment:
if keep_attachment == KeepAttachment.LOSE:
some_other_pageserver = 101010
assert env.attachment_service is not None
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
@@ -352,7 +371,7 @@ def test_deletion_queue_recovery(
ps_http.deletion_queue_flush(execute=True)
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
if keep_attachment or validate_before:
if keep_attachment == KeepAttachment.KEEP or validate_before == ValidateBefore.VALIDATE:
# - If we kept the attachment, then our pre-restart deletions should execute
# because on re-attach they were from the immediately preceding generation
# - If we validated before restart, then the deletions should execute because the

View File

@@ -17,6 +17,8 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
n_restarts = 10
scale = 10
env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])

View File

@@ -752,6 +752,9 @@ def test_ignore_while_attaching(
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
# An endpoint is starting up concurrently with our detach, it can
# experience RPC failure due to shutdown.
env.pageserver.allowed_errors.append(".*query handler.*failed.*Shutting down")
data_id = 1
data_secret = "very secret secret"