Compare commits

...

32 Commits

Author SHA1 Message Date
Joonas Koivunen
ad41a2b4cf try: simple test for proposergreeting 2023-10-12 18:57:31 +00:00
Joonas Koivunen
14a8cce474 try: complicate IdVisitor 2023-10-12 18:57:11 +00:00
Joonas Koivunen
068ae5d94b try: restore derive(Serialize) like newtype for Id
it should not make any difference, but this is how I got the
proposergreeting to deserialize.
2023-10-12 18:56:20 +00:00
Joonas Koivunen
6a2028b10d safekeeper: retain u64 in json serialization 2023-10-09 16:41:45 +00:00
Joonas Koivunen
0d2b9aabc0 lsn: add serde_as_u64 mod 2023-10-09 16:41:27 +00:00
Joonas Koivunen
6717959e52 partial fix: safekeeper sync safekeepers uses integers for communicating
this means that somewhere in C there's an expectation of using LSN as
u64 in json.
2023-10-09 10:33:31 +00:00
Joonas Koivunen
9a035b8e4f fix: accept either u64 or Lsn::from_str
there are some existing tests using plain base10 integers in json with
safekeeper. Those could also be used by C code, so let's just leave it.
2023-10-09 10:06:59 +00:00
Joonas Koivunen
c4bb1398dc chore: needless mut 2023-10-09 08:30:06 +00:00
duguorong
b106118d3b refactor: remove "#[serde_as]" attr 2023-10-09 06:47:30 +00:00
duguorong
80d1155e79 test: add serde tests for "TimelineMetadata" 2023-10-09 06:47:30 +00:00
Joonas Koivunen
6d94603a68 test: deduplicate lsn tests 2023-10-09 06:47:02 +00:00
duguorong
b74abe6b7d feat: human_readable aware serde for "Lsn" 2023-10-09 06:46:44 +00:00
duguorong
059d5e8211 feat: human_readable aware serde impl for Id 2023-10-09 06:38:08 +00:00
Joonas Koivunen
b80c0dfa03 test: split, wrangle test cases
in at least one instance, the human readability was wrong way around.
2023-10-09 06:37:43 +00:00
duguorong
0ceae0ab4d test: additional Lsn serialization tests 2023-10-09 06:21:35 +00:00
Joonas Koivunen
8d38bf7e60 refactor(test): dedup roundtrip test cases 2023-10-09 06:16:59 +00:00
duguorong
2b57f673a5 test: (ser)de of Id 2023-10-09 06:10:36 +00:00
duguorong
449319503d chore: add the "serde_assert" crate 2023-10-08 00:31:08 +08:00
John Spray
ea5a97e7b4 pageserver: implement emergency mode for operating without control plane (#5469)
## Problem

Pageservers with `control_plane_api` configured require a control plane
to start up: in an incident this might be a problem.

## Summary of changes

Note to reviewers: most of the code churn in mgr.rs is the refactor
commit that enables the later emergency mode commit: you may want to
review commits separately.

- Add `control_plane_emergency_mode` configuration property
- Refactor init_tenant_mgr to separate loading configurations from the
main loop where we construct Tenant, so that the generations fetch can
peek at the configs in emergency mode.
- During startup, in emergency mode, attach any tenants that were
attached on their last run, using the same generation number.

Closes: #5381 
Closes: https://github.com/neondatabase/neon/issues/5492
2023-10-06 17:25:21 +01:00
John Spray
547914fe19 pageserver: adjust timeline deletion for generations (#5453)
## Problem

Spun off from https://github.com/neondatabase/neon/pull/5449

Timeline deletion does the following:
1. Delete layers referenced in the index
2. Delete everything else in the timeline prefix, except the index
3. Delete the index.

When generations were added, the filter in step 2 got outdated, such
that the index objects were deleted along with everything else at step
2. That didn't really break anything, but it makes an automated test
unhappy and is a violation of the original intent of the code, which
presumably intends to upload an invariant that as long as any objects
for a timeline exist, the index exists.

(Eventually, this index-object-last complexity can go away: when we do
https://github.com/neondatabase/neon/issues/5080, there is no need to
keep the index_part around, as deletions can always be retried any time
any where.)

## Summary of changes

After object listing, split the listed objects into layers and index
objects. Delete the layers first, then the index objects.
2023-10-06 16:15:18 +00:00
Arpad Müller
607b185a49 Fix 1.73.0 clippy lints (#5494)
Doesn't do an upgrade of rustc to 1.73.0 as we want to wait for the
cargo response of the curl CVE before updating. In preparation for an
update, we address the clippy lints that are newly firing in 1.73.0.
2023-10-06 14:17:19 +01:00
Christian Schwarz
bfba5e3aca page_cache: ensure forward progress on miss (#5482)
Problem
=======

Prior to this PR, when we had a cache miss, we'd get back a write guard,
fill it, the drop it and retry the read from cache.

If there's severe contention for the cache, it could happen that the
just-filled data gets evicted before our retry, resulting in lost work
and no forward progress.

Solution
========

This PR leverages the now-available `tokio::sync::RwLockWriteGuard`'s
`downgrade()` functionality to turn the filled slot write guard into a
read guard.
We don't drop the guard at any point, so, forward progress is ensured.


Refs
====

Stacked atop https://github.com/neondatabase/neon/pull/5480 

part of https://github.com/neondatabase/neon/issues/4743
specifically part of https://github.com/neondatabase/neon/issues/5479
2023-10-06 13:41:13 +01:00
Christian Schwarz
ecc7a9567b page_cache: inline {,try_}lock_for_write into memorize_materialized_page (#5480)
Motivation
==========

It's the only user, and the name of `_for_write` is wrong as of

    commit 7a63685cde
    Author: Christian Schwarz <christian@neon.tech>
    Date:   Fri Aug 18 19:31:03 2023 +0200

        simplify page-caching of EphemeralFile (#4994)

Notes
=====

This also allows us to get rid of the WriteBufResult type.

Also rename `search_mapping_for_write` to `search_mapping_exact`. It
makes more sense that way because there is `_for_write`-locking anymore.

Refs
====

part of https://github.com/neondatabase/neon/issues/4743
specifically https://github.com/neondatabase/neon/issues/5479

this is prep work for https://github.com/neondatabase/neon/pull/5482
2023-10-06 13:38:02 +02:00
Joonas Koivunen
45f98dd018 debug_tool: get page at lsn and keyspace via http api (#5057)
If there are any layermap or layer file related problems, having a
reproducable `get_page@lsn` easily usable for fast debugging iteration
is helpful.

Split off from #4938.

Later evolved to add http apis for:
- `get_page@lsn` at
`/v1/tenant/:tenant_id/timeline/:timeline_id/get?key=<hex>&lsn=<lsn
string>`
- collecting the keyspace at
`/v1/tenant/:tenant_id/timeline/:timeline_id/keyspace?[at_lsn=<lsn
string>]`
    - defaults to `last_record_lsn`

collecting the keyspace seems to yield some ranges for which there is no
key.
2023-10-06 12:17:38 +01:00
John Spray
bdfe27f3ac swagger: add a 503 definition to each endpoint (#5476)
## Problem

The control plane doesn't have generic handling for this.

## Summary of changes

Add a 503 response to every endpoint.
2023-10-06 11:31:49 +01:00
Joonas Koivunen
a15f9b3baa pageserver: Tune 503 Resource unavailable (#5489)
503 Resource Unavailable appears as error in logs, but is not really an
error which should ever fail a test on, or even log an error in prod,
[evidence].

Changes:
- log 503 as `info!` level
- use `Cow<'static, str>` instead of `String`
- add an additional `wait_until_tenant_active` in
`test_actually_duplicate_l1`
 
We ought to have in tests "wait for tenants to complete loading" but
this is easier to implement for now.

[evidence]:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5485/6423110295/index.html#/testresult/182de66203864fc0
2023-10-06 09:59:14 +01:00
Alexander Bayandin
ce92638185 test_runner: allow race in test_tenant_delete_is_resumed_on_attach (#5478)
## Problem

`test_tenant_delete_is_resumed_on_attach` is flaky

## Summary of changes
- Allow race in `test_tenant_delete_is_resumed_on_attach`
- Cleanup `allowed_errors` in the file a bit
2023-10-06 09:49:31 +01:00
Joonas Koivunen
a3c82f19b8 tests: prettier subprocess output in test log (#5485)
Clean subprocess output so that:
- one line of output is just one line without a linebreak
    - like shells handle `echo subshell says: $(echo foo)`
- multiple lines are indented like other pytest output
- error output is dedented and then indented to be like other pytest
output

Minor readability changes remove friction.
2023-10-05 20:15:55 +00:00
Arthur Petukhovsky
8b15252f98 Move walproposer state into struct (#5364)
This patch extracts all postgres-dependent functions in a separate
`walproposer_api` functions struct. It helps to compile walproposer as
static library without compiling all other postgres server code. This is
useful to allow calling walproposer C code from Rust, or linking this
library with anything else.

All global variables containing walproposer state were extracted to a
separate `WalProposer` struct. This makes it possible to run several
walproposers in the same process, in separate threads.

There were no logic changes and PR mostly consists of shuffling
functions between several files. We have a good test coverage for
walproposer code and I've seen no issues with tests while I was
refactoring it, so I don't expect any issues after merge.

ref https://github.com/neondatabase/neon/issues/547

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
2023-10-05 18:48:01 +01:00
Alexander Bayandin
522aaca718 Temporary deploy staging preprod region from main (#5477)
## Problem

Stating preprod region can't use `release-XXX` right now, the config is
unified across all regions, it supports only `XXX`.

Ref
https://neondb.slack.com/archives/C03H1K0PGKH/p1696506459720909?thread_ts=1696437812.365249&cid=C03H1K0PGKH

## Summary of changes
- Deploy staging-preprod from main
2023-10-05 14:02:20 +00:00
John Spray
7cbb39063a tests: stabilize + extend deletion queue recovery test (#5457)
## Problem

This test was unstable when run in parallel with lots of others: if the
pageserver stayed up long enough for some of the deletions to get
validated, they won't be discarded on restart the way the test expects
when keep_attachment=True. This was a test bug, not a pageserver bug.

## Summary of changes

- Add failpoints to control plane api client
- Use failpoint to pause validation in the test to cover the case where
it had been flaky
- Add a metric for the number of deleted keys validated
- Add a permutation to the test to additionally exercise the case where
we _do_ validate lists before restart: this is a coverage enhancement
that seemed sensible when realizing that the test was relying on nothing
being validated before restart.
- the test will now always enter the restart with nothing or everything
validated.
2023-10-05 11:22:05 +01:00
John Spray
baa5fa1e77 pageserver: location configuration API, attachment modes, secondary locations (#5299)
## Problem

These changes are part of building seamless tenant migration, as
described in the RFC:
- https://github.com/neondatabase/neon/pull/5029

## Summary of changes

- A new configuration type `LocationConf` supersedes `TenantConfOpt` for
storing a tenant's configuration in the pageserver repo dir. It contains
`TenantConfOpt`, as well as a new `mode` attribute that describes what
kind of location this is (secondary, attached, attachment mode etc). It
is written to a file called `config-v1` instead of `config` -- this
prepares us for neatly making any other profound changes to the format
of the file in future. Forward compat for existing pageserver code is
achieved by writing out both old and new style files. Backward compat is
achieved by checking for the old-style file if the new one isn't found.
- The `TenantMap` type changes, to hold `TenantSlot` instead of just
`Tenant`. The `Tenant` type continues to be used for attached tenants
only. Tenants in other states (such as secondaries) are represented by a
different variant of `TenantSlot`.
- Where `Tenant` & `Timeline` used to hold an Arc<Mutex<TenantConfOpt>>,
they now hold a reference to a AttachedTenantConf, which includes the
extra information from LocationConf. This enables them to know the
current attachment mode.
- The attachment mode is used as an advisory input to decide whether to
do compaction and GC (AttachedStale is meant to avoid doing uploads,
AttachedMulti is meant to avoid doing deletions).
- A new HTTP API is added at `PUT /tenants/<tenant_id>/location_config`
to drive new location configuration. This provides a superset of the
functionality of attach/detach/load/ignore:
  - Attaching a tenant is just configuring it in an attached state
  - Detaching a tenant is configuring it to a detached state
  - Loading a tenant is just the same as attaching it
- Ignoring a tenant is the same as configuring it into Secondary with
warm=false (i.e. retain the files on disk but do nothing else).

Caveats:
- AttachedMulti tenants don't do compaction in this PR, but they do in
the follow on #5397
- Concurrent updates to the `location_config` API are not handled
elegantly in this PR, a better mechanism is added in the follow on
https://github.com/neondatabase/neon/pull/5367
- Secondary mode is just a placeholder in this PR: the code to upload
heatmaps and do downloads on secondary locations will be added in a
later PR (but that shouldn't change any external interfaces)

Closes: https://github.com/neondatabase/neon/issues/5379

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-10-05 09:55:10 +01:00
65 changed files with 4939 additions and 3030 deletions

View File

@@ -1092,8 +1092,10 @@ jobs:
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
# TODO: move deployPreprodRegion to release (`"$GITHUB_REF_NAME" == "release"` block), once Staging support different compute tag prefixes for different regions
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"

11
Cargo.lock generated
View File

@@ -4006,6 +4006,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_assert"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eda563240c1288b044209be1f0d38bb4d15044fb3e00dc354fbc922ab4733e80"
dependencies = [
"hashbrown 0.13.2",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.183"
@@ -5135,6 +5145,7 @@ dependencies = [
"routerify",
"sentry",
"serde",
"serde_assert",
"serde_json",
"serde_with",
"signal-hook",

View File

@@ -118,6 +118,7 @@ sentry = { version = "0.31", default-features = false, features = ["backtrace",
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
serde_assert = "0.5.0"
sha2 = "0.10.2"
signal-hook = "0.3"
smallvec = "1.11"

View File

@@ -2,7 +2,6 @@ use crate::{background_process, local_env::LocalEnv};
use anyhow::anyhow;
use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::{path::PathBuf, process::Child};
use utils::id::{NodeId, TenantId};
@@ -14,10 +13,8 @@ pub struct AttachmentService {
const COMMAND: &str = "attachment_service";
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct AttachHookRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
pub pageserver_id: Option<NodeId>,
}

View File

@@ -46,7 +46,6 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId, TimelineId};
use crate::local_env::LocalEnv;
@@ -57,13 +56,10 @@ use compute_api::responses::{ComputeState, ComputeStatus};
use compute_api::spec::{Cluster, ComputeMode, ComputeSpec};
// contents of a endpoint.json file
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
endpoint_id: String,
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
timeline_id: TimelineId,
mode: ComputeMode,
pg_port: u16,

View File

@@ -8,7 +8,6 @@ use anyhow::{bail, ensure, Context};
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::env;
use std::fs;
@@ -33,7 +32,6 @@ pub const DEFAULT_PG_VERSION: u32 = 15;
// to 'neon_local init --config=<path>' option. See control_plane/simple.conf for
// an example.
//
#[serde_as]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct LocalEnv {
// Base directory for all the nodes (the pageserver, safekeepers and
@@ -59,7 +57,6 @@ pub struct LocalEnv {
// Default tenant ID to use with the 'neon_local' command line utility, when
// --tenant_id is not explicitly specified.
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub default_tenant_id: Option<TenantId>,
// used to issue tokens during e.g pg start
@@ -84,7 +81,6 @@ pub struct LocalEnv {
// A `HashMap<String, HashMap<TenantId, TimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
#[serde_as(as = "HashMap<_, Vec<(DisplayFromStr, DisplayFromStr)>>")]
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}

View File

@@ -6,7 +6,6 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
@@ -19,7 +18,6 @@ pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[serde_as]
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct ComputeSpec {
pub format_version: f32,
@@ -50,12 +48,12 @@ pub struct ComputeSpec {
// these, and instead set the "neon.tenant_id", "neon.timeline_id",
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
// updated to fill these fields, we can make these non optional.
#[serde_as(as = "Option<DisplayFromStr>")]
pub tenant_id: Option<TenantId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub pageserver_connstring: Option<String>,
#[serde(default)]
pub safekeeper_connstrings: Vec<String>,
@@ -140,14 +138,13 @@ impl RemoteExtSpec {
}
}
#[serde_as]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
pub enum ComputeMode {
/// A read-write node
#[default]
Primary,
/// A read-only node, pinned at a particular LSN
Static(#[serde_as(as = "DisplayFromStr")] Lsn),
Static(Lsn),
/// A read-only node that follows the tip of the branch in hot standby mode
///
/// Future versions may want to distinguish between replicas with hot standby

View File

@@ -4,7 +4,6 @@
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::{NodeId, TenantId};
#[derive(Serialize, Deserialize)]
@@ -12,10 +11,8 @@ pub struct ReAttachRequest {
pub node_id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponseTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub generation: u32,
}
@@ -25,10 +22,8 @@ pub struct ReAttachResponse {
pub tenants: Vec<ReAttachResponseTenant>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ValidateRequestTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub gen: u32,
}
@@ -43,10 +38,8 @@ pub struct ValidateResponse {
pub tenants: Vec<ValidateResponseTenant>,
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ValidateResponseTenant {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
pub valid: bool,
}

View File

@@ -6,10 +6,11 @@ use std::{
use byteorder::{BigEndian, ReadBytesExt};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use serde_with::serde_as;
use strum_macros;
use utils::{
completion,
generation::Generation,
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
@@ -174,25 +175,19 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_timeline_id: TimelineId,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub new_tenant_id: TenantId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
@@ -201,7 +196,6 @@ pub struct TenantCreateRequest {
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[serde_as]
#[derive(Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLoadRequest {
@@ -218,6 +212,8 @@ impl std::ops::Deref for TenantCreateRequest {
}
}
/// An alternative representation of `pageserver::tenant::TenantConf` with
/// simpler types.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
@@ -243,21 +239,59 @@ pub struct TenantConfig {
pub gc_feedback: Option<bool>,
}
#[serde_as]
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
/// lists out all possible states (and the virtual "Detached" state)
/// in a flat form rather than using rust-style enums.
#[derive(Serialize, Deserialize, Debug)]
pub enum LocationConfigMode {
AttachedSingle,
AttachedMulti,
AttachedStale,
Secondary,
Detached,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfigSecondary {
pub warm: bool,
}
/// An alternative representation of `pageserver::tenant::LocationConf`,
/// for use in external-facing APIs.
#[derive(Serialize, Deserialize, Debug)]
pub struct LocationConfig {
pub mode: LocationConfigMode,
/// If attaching, in what generation?
#[serde(default)]
pub generation: Option<Generation>,
#[serde(default)]
pub secondary_conf: Option<LocationConfigSecondary>,
// If requesting mode `Secondary`, configuration for that.
// Custom storage configuration for the tenant, if any
pub tenant_conf: TenantConfig,
}
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub TenantId);
pub struct TenantCreateResponse(pub TenantId);
#[derive(Serialize)]
pub struct StatusResponse {
pub id: NodeId,
}
#[serde_as]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantLocationConfigRequest {
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
@@ -329,10 +363,8 @@ pub enum TenantAttachmentStatus {
Failed { reason: String },
}
#[serde_as]
#[derive(Serialize, Deserialize, Clone)]
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: TenantId,
// NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's
pub state: TenantState,
@@ -343,33 +375,22 @@ pub struct TenantInfo {
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub last_record_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
pub prev_record_lsn: Option<Lsn>,
#[serde_as(as = "DisplayFromStr")]
pub latest_gc_cutoff_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
/// The LSN that we have succesfully uploaded to remote storage
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
/// The LSN that we are advertizing to safekeepers
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn_visible: Lsn,
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
@@ -381,7 +402,6 @@ pub struct TimelineInfo {
pub timeline_dir_layer_file_size_sum: Option<u64>,
pub wal_source_connstr: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
@@ -478,23 +498,13 @@ pub struct LayerAccessStats {
pub residence_events_history: HistoryBufferWithDropCounter<LayerResidenceEvent, 16>,
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
Open {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
},
Frozen {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
},
Open { lsn_start: Lsn },
Frozen { lsn_start: Lsn, lsn_end: Lsn },
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
@@ -502,9 +512,7 @@ pub enum HistoricLayerInfo {
layer_file_name: String,
layer_file_size: u64,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
@@ -513,7 +521,6 @@ pub enum HistoricLayerInfo {
layer_file_name: String,
layer_file_size: u64,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,

View File

@@ -1,23 +1,18 @@
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::{
id::{NodeId, TenantId, TimelineId},
lsn::Lsn,
};
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub peer_ids: Option<Vec<NodeId>>,
pub pg_version: u32,
pub system_id: Option<u64>,
pub wal_seg_size: Option<u32>,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
// If not passed, it is assigned to the beginning of commit_lsn segment.
pub local_start_lsn: Option<Lsn>,
@@ -28,7 +23,6 @@ fn lsn_invalid() -> Lsn {
}
/// Data about safekeeper's timeline, mirrors broker.proto.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term.
@@ -36,25 +30,19 @@ pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub flush_lsn: Lsn,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub commit_lsn: Lsn,
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub backup_lsn: Lsn,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub remote_consistent_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub peer_horizon_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub local_start_lsn: Lsn,
/// A connection string to use for WAL receiving.

View File

@@ -55,6 +55,7 @@ bytes.workspace = true
criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
serde_assert.workspace = true
[[bench]]
name = "benchmarks"

View File

@@ -9,7 +9,6 @@ use jsonwebtoken::{
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use crate::id::TenantId;
@@ -32,11 +31,9 @@ pub enum Scope {
}
/// JWT payload. See docs/authentication.md for the format
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct Claims {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub tenant_id: Option<TenantId>,
pub scope: Scope,
}

View File

@@ -1,8 +1,9 @@
use hyper::{header, Body, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error as StdError;
use thiserror::Error;
use tracing::error;
use tracing::{error, info};
#[derive(Debug, Error)]
pub enum ApiError {
@@ -25,7 +26,7 @@ pub enum ApiError {
PreconditionFailed(Box<str>),
#[error("Resource temporarily unavailable: {0}")]
ResourceUnavailable(String),
ResourceUnavailable(Cow<'static, str>),
#[error("Shutting down")]
ShuttingDown,
@@ -115,10 +116,11 @@ pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
// Print a stack trace for Internal Server errors
if let ApiError::InternalServerError(_) = api_error {
error!("Error processing HTTP request: {api_error:?}");
} else {
error!("Error processing HTTP request: {api_error:#}");
match api_error {
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),
_ => error!("Error processing HTTP request: {api_error:#}"),
}
api_error.into_response()

View File

@@ -3,6 +3,7 @@ use std::{fmt, str::FromStr};
use anyhow::Context;
use hex::FromHex;
use rand::Rng;
use serde::de::Visitor;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -20,9 +21,92 @@ pub enum IdError {
///
/// Use `#[serde_as(as = "DisplayFromStr")]` to (de)serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
/// Check the `serde_with::serde_as` documentation for options for more complex types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
struct Id([u8; 16]);
impl Serialize for Id {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
serializer.serialize_newtype_struct("Id", &self.0)
}
}
}
impl<'de> Deserialize<'de> for Id {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct NonHumanReadable;
/// This implementation is from an `#[derive(serde::Serialize)]` expansion
/// which used to be used.
impl<'de> Visitor<'de> for NonHumanReadable {
type Value = Id;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("tuple struct Id")
}
fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
Deserialize::deserialize(deserializer).map(Id)
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
match seq.next_element::<[u8; 16]>()? {
Some(only) => Ok(Id(only)),
None => Err(serde::de::Error::invalid_length(
1,
&"tuple struct Id with 1 element",
)),
}
}
}
struct HumanReadable;
impl<'de> Visitor<'de> for HumanReadable {
type Value = Id;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("hex string of 32 characters")
}
fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = Deserialize::deserialize(deserializer)?;
self.visit_str(s)
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Id::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_newtype_struct("Id", HumanReadable)
} else {
deserializer.deserialize_newtype_struct("Id", NonHumanReadable)
}
}
}
impl Id {
pub fn get_from_buf(buf: &mut impl bytes::Buf) -> Id {
let mut arr = [0u8; 16];
@@ -308,3 +392,115 @@ impl fmt::Display for NodeId {
write!(f, "{}", self.0)
}
}
#[cfg(test)]
mod tests {
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use crate::bin_ser::BeSer;
use super::*;
#[test]
fn test_id_serde_non_human_readable() {
let original_id = Id([
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
]);
let expected_tokens = Tokens(vec![
Token::Tuple { len: 16 },
Token::U8(173),
Token::U8(80),
Token::U8(132),
Token::U8(115),
Token::U8(129),
Token::U8(226),
Token::U8(72),
Token::U8(254),
Token::U8(170),
Token::U8(201),
Token::U8(135),
Token::U8(108),
Token::U8(199),
Token::U8(26),
Token::U8(228),
Token::U8(24),
Token::TupleEnd,
]);
let serializer = Serializer::builder().is_human_readable(false).build();
let serialized_tokens = original_id.serialize(&serializer).unwrap();
assert_eq!(serialized_tokens, expected_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(serialized_tokens)
.build();
let deserialized_id = Id::deserialize(&mut deserializer).unwrap();
assert_eq!(deserialized_id, original_id);
}
#[test]
fn test_id_serde_human_readable() {
let original_id = Id([
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
]);
let expected_tokens = Tokens(vec![Token::Str(String::from(
"ad50847381e248feaac9876cc71ae418",
))]);
let serializer = Serializer::builder().is_human_readable(true).build();
let serialized_tokens = original_id.serialize(&serializer).unwrap();
assert_eq!(serialized_tokens, expected_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
// .self_describing(true)
.tokens(Tokens(vec![Token::Str(String::from(
"ad50847381e248feaac9876cc71ae418",
))]))
.build();
assert_eq!(Id::deserialize(&mut deserializer).unwrap(), original_id);
}
/*
macro_rules! roundtrip_type {
($type:ty, $expected_bytes:expr) => {{
let expected_bytes: [u8; 16] = $expected_bytes;
let original_id = <$type>::from(expected_bytes);
let ser_bytes = original_id.ser().unwrap();
assert_eq!(ser_bytes, expected_bytes);
let des_id = <$type>::des(&ser_bytes).unwrap();
assert_eq!(des_id, original_id);
}};
}
#[test]
fn test_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(Id, expected_bytes);
}
#[test]
fn test_tenant_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(TenantId, expected_bytes);
}
#[test]
fn test_timeline_id_bincode_serde() {
let expected_bytes = [
173, 80, 132, 115, 129, 226, 72, 254, 170, 201, 135, 108, 199, 26, 228, 24,
];
roundtrip_type!(TimelineId, expected_bytes);
}
*/
}

View File

@@ -1,7 +1,7 @@
#![warn(missing_docs)]
use camino::Utf8Path;
use serde::{Deserialize, Serialize};
use serde::{de::Visitor, Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::str::FromStr;
@@ -13,10 +13,98 @@ use crate::seqwait::MonotonicCounter;
pub const XLOG_BLCKSZ: u32 = 8192;
/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(transparent)]
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash)]
pub struct Lsn(pub u64);
impl Serialize for Lsn {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
self.0.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for Lsn {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct LsnVisitor;
impl<'de> Visitor<'de> for LsnVisitor {
type Value = Lsn;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("LSN as either split hex string or plain unsigned integer")
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Lsn(v))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Lsn::from_str(v).map_err(|e| E::custom(e))
}
}
deserializer.deserialize_any(LsnVisitor)
}
}
/// Allows (de)serialization of an `Lsn` always as `u64`.
///
/// ### Example
///
/// ```rust
/// # use serde::{Serialize, Deserialize};
/// use utils::lsn::Lsn;
///
/// #[derive(Partialeq, Serialize, Deserialize)]
/// struct Foo {
/// #[serde(with = "utils::lsn::as_u64")]
/// always_u64: Lsn,
/// }
///
/// let orig = Foo { always_u64: Lsn(1234) };
///
/// let res = serde_json::to_string(&).unwrap();
/// assert_eq!(res, r#"{"always_u64": 1234}"#);
///
/// let foo = serde_json::from_str::<Foo>(&res).unwrap();
/// assert_eq!(res, orig);
/// ```
///
pub mod serde_as_u64 {
use super::Lsn;
/// Serializes the Lsn as u64 disregarding the human readability of the format.
///
/// Meant to be used via `#[serde(with = "...")]` or `#[serde(serialize_with = "...")]`.
pub fn serialize<S: serde::Serializer>(lsn: &Lsn, serializer: S) -> Result<S::Ok, S::Error> {
use serde::Serialize;
lsn.0.serialize(serializer)
}
/// Deserializes the Lsn as u64 disregarding the human readability of the format.
///
/// Meant to be used via `#[serde(with = "...")]` or `#[serde(deserialize_with = "...")]`.
pub fn deserialize<'de, D: serde::Deserializer<'de>>(deserializer: D) -> Result<Lsn, D::Error> {
use serde::Deserialize;
u64::deserialize(deserializer).map(Lsn)
}
}
/// We tried to parse an LSN from a string, but failed
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
#[error("LsnParseError")]
@@ -264,8 +352,13 @@ impl MonotonicCounter<Lsn> for RecordLsn {
#[cfg(test)]
mod tests {
use crate::bin_ser::BeSer;
use super::*;
use serde::ser::Serialize;
use serde_assert::{Deserializer, Serializer, Token, Tokens};
#[test]
fn test_lsn_strings() {
assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555)));
@@ -341,4 +434,78 @@ mod tests {
assert_eq!(lsn.fetch_max(Lsn(6000)), Lsn(5678));
assert_eq!(lsn.fetch_max(Lsn(5000)), Lsn(6000));
}
#[test]
fn lsn_serde_tokens_humanreadable() {
// Serializer::is_human_readable is for example json
let original_lsn = Lsn(0x0123456789abcdef);
let expected_readable_tokens = Tokens(vec![Token::Str(String::from("1234567/89ABCDEF"))]);
let serializer = Serializer::builder().is_human_readable(true).build();
let readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
assert_eq!(readable_ser_tokens, expected_readable_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
.tokens(readable_ser_tokens)
.build();
let des_lsn = Lsn::deserialize(&mut deserializer).unwrap();
assert_eq!(des_lsn, original_lsn);
}
#[test]
fn lsn_serde_tokens_nonhumanreadable() {
// !Serializer::is_human_readable is for example bincode
let original_lsn = Lsn(0x0123456789abcdef);
let expected_non_readable_tokens = Tokens(vec![Token::U64(0x0123456789abcdef)]);
let serializer = Serializer::builder().is_human_readable(false).build();
let non_readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
assert_eq!(non_readable_ser_tokens, expected_non_readable_tokens);
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(non_readable_ser_tokens)
.build();
let des_lsn = Lsn::deserialize(&mut deserializer).unwrap();
assert_eq!(des_lsn, original_lsn);
}
#[test]
fn non_human_readable_does_not_accept_string() {
let original_lsn = Lsn(0x0123456789abcdef);
let serializer = Serializer::builder().is_human_readable(false).build();
let non_readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
let mut deserializer = Deserializer::builder()
.is_human_readable(true)
.tokens(non_readable_ser_tokens)
.build();
Lsn::deserialize(&mut deserializer).unwrap_err();
}
#[test]
fn human_readable_does_not_accept_u64() {
let original_lsn = Lsn(0x0123456789abcdef);
let serializer = Serializer::builder().is_human_readable(true).build();
let readable_ser_tokens = original_lsn.serialize(&serializer).unwrap();
let mut deserializer = Deserializer::builder()
.is_human_readable(false)
.tokens(readable_ser_tokens)
.build();
Lsn::deserialize(&mut deserializer).unwrap_err();
}
#[test]
fn test_lsn_bincode_roundtrips() {
let lsn = Lsn(0x0123456789abcdef);
let expected_bytes = [0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef];
let ser_bytes = lsn.ser().unwrap();
assert_eq!(ser_bytes, expected_bytes);
let des_lsn = Lsn::des(&ser_bytes).unwrap();
assert_eq!(des_lsn, lsn);
}
}

View File

@@ -3,7 +3,6 @@ use std::time::{Duration, SystemTime};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tracing::{trace, warn};
use crate::lsn::Lsn;
@@ -15,21 +14,17 @@ use crate::lsn::Lsn;
///
/// serde Serialize is used only for human readable dump to json (e.g. in
/// safekeepers debug_dump).
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
/// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
/// LSN last received and ingested by the pageserver. Controls backpressure.
#[serde_as(as = "DisplayFromStr")]
pub last_received_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver to its local disc.
/// Controls backpressure.
#[serde_as(as = "DisplayFromStr")]
pub disk_consistent_lsn: Lsn,
/// LSN up to which data is persisted by the pageserver on s3; safekeepers
/// consider WAL before it can be removed.
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
// Serialize with RFC3339 format.
#[serde(with = "serde_systemtime")]

View File

@@ -58,7 +58,7 @@ where
// to get that.
impl<T: Ord> PartialOrd for Waiter<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.wake_num.partial_cmp(&self.wake_num)
Some(self.cmp(other))
}
}

View File

@@ -37,8 +37,8 @@ use crate::tenant::{
TIMELINES_SEGMENT_NAME,
};
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
TIMELINE_UNINIT_MARK_SUFFIX,
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
};
pub mod defaults {
@@ -211,6 +211,10 @@ pub struct PageServerConf {
/// JWT token for use with the control plane API.
pub control_plane_api_token: Option<SecretString>,
/// If true, pageserver will make best-effort to operate without a control plane: only
/// for use in major incidents.
pub control_plane_emergency_mode: bool,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -288,6 +292,7 @@ struct PageServerConfigBuilder {
control_plane_api: BuilderValue<Option<Url>>,
control_plane_api_token: BuilderValue<Option<SecretString>>,
control_plane_emergency_mode: BuilderValue<bool>,
}
impl Default for PageServerConfigBuilder {
@@ -355,6 +360,7 @@ impl Default for PageServerConfigBuilder {
control_plane_api: Set(None),
control_plane_api_token: Set(None),
control_plane_emergency_mode: Set(false),
}
}
}
@@ -491,6 +497,10 @@ impl PageServerConfigBuilder {
self.control_plane_api_token = BuilderValue::Set(token)
}
pub fn control_plane_emergency_mode(&mut self, enabled: bool) {
self.control_plane_emergency_mode = BuilderValue::Set(enabled)
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
@@ -582,6 +592,9 @@ impl PageServerConfigBuilder {
control_plane_api_token: self
.control_plane_api_token
.ok_or(anyhow!("missing control_plane_api_token"))?,
control_plane_emergency_mode: self
.control_plane_emergency_mode
.ok_or(anyhow!("missing control_plane_emergency_mode"))?,
})
}
}
@@ -631,10 +644,18 @@ impl PageServerConf {
/// Points to a place in pageserver's local directory,
/// where certain tenant's tenantconf file should be located.
///
/// Legacy: superseded by tenant_location_config_path. Eventually
/// remove this function.
pub fn tenant_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(TENANT_CONFIG_NAME)
}
pub fn tenant_location_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_LOCATION_CONFIG_NAME)
}
pub fn timelines_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.tenant_path(tenant_id).join(TIMELINES_SEGMENT_NAME)
}
@@ -799,6 +820,10 @@ impl PageServerConf {
builder.control_plane_api_token(Some(parsed.into()))
}
},
"control_plane_emergency_mode" => {
builder.control_plane_emergency_mode(parse_toml_bool(key, item)?)
},
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -968,6 +993,7 @@ impl PageServerConf {
background_task_maximum_delay: Duration::ZERO,
control_plane_api: None,
control_plane_api_token: None,
control_plane_emergency_mode: false,
}
}
}
@@ -1191,7 +1217,8 @@ background_task_maximum_delay = '334 s'
defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
)?,
control_plane_api: None,
control_plane_api_token: None
control_plane_api_token: None,
control_plane_emergency_mode: false
},
"Correct defaults should be used when no config values are provided"
);
@@ -1247,7 +1274,8 @@ background_task_maximum_delay = '334 s'
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::from_secs(334),
control_plane_api: None,
control_plane_api_token: None
control_plane_api_token: None,
control_plane_emergency_mode: false
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -3,7 +3,6 @@ use anyhow::Context;
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use serde_with::serde_as;
use std::{sync::Arc, time::SystemTime};
use utils::{
id::{TenantId, TimelineId},
@@ -42,13 +41,10 @@ pub(super) enum Name {
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere.
#[serde_with::serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,

View File

@@ -1,5 +1,4 @@
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
use serde_with::serde_as;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
@@ -7,12 +6,9 @@ use super::{metrics::Name, Cache, MetricsKey, RawMetric};
use utils::id::{TenantId, TimelineId};
/// How the metrics from pageserver are identified.
#[serde_with::serde_as]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
}

View File

@@ -133,6 +133,8 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
node_id: self.node_id,
};
fail::fail_point!("control-plane-client-re-attach");
let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
tracing::info!(
"Received re-attach response with {} tenants",
@@ -168,6 +170,8 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
.collect(),
};
fail::fail_point!("control-plane-client-validate");
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
Ok(response

View File

@@ -17,7 +17,6 @@ use hex::FromHex;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
use serde_with::serde_as;
use thiserror::Error;
use tokio;
use tokio_util::sync::CancellationToken;
@@ -40,7 +39,6 @@ use validator::ValidatorQueueMessage;
use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
// TODO: adminstrative "panic button" config property to disable all deletions
// TODO: configurable for how long to wait before executing deletions
/// We aggregate object deletions from many tenants in one place, for several reasons:
@@ -186,7 +184,7 @@ where
V: Serialize,
I: AsRef<[u8]>,
{
let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone()));
let transformed = input.iter().map(|(k, v)| (hex::encode(k), v));
transformed
.collect::<HashMap<String, &V>>()
@@ -215,7 +213,6 @@ where
/// during recovery as startup.
const TEMP_SUFFIX: &str = ".tmp";
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionList {
/// Serialization version, for future use
@@ -244,7 +241,6 @@ struct DeletionList {
validated: bool,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionHeader {
/// Serialization version, for future use
@@ -325,10 +321,7 @@ impl DeletionList {
return false;
}
let timeline_entry = tenant_entry
.timelines
.entry(*timeline)
.or_insert_with(Vec::new);
let timeline_entry = tenant_entry.timelines.entry(*timeline).or_default();
let timeline_remote_path = remote_timeline_path(tenant, timeline);

View File

@@ -220,6 +220,8 @@ where
warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
metrics::DELETION_QUEUE.keys_dropped.inc_by(tenant.len() as u64);
mutated = true;
} else {
metrics::DELETION_QUEUE.keys_validated.inc_by(tenant.len() as u64);
}
this_list_valid
});

View File

@@ -93,9 +93,16 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
delete:
description: |
Attempts to delete specified tenant. 500 and 409 errors should be retried until 404 is retrieved.
Attempts to delete specified tenant. 500, 503 and 409 errors should be retried until 404 is retrieved.
404 means that deletion successfully finished"
responses:
"400":
@@ -134,6 +141,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/timeline:
parameters:
@@ -178,6 +192,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/timeline/{timeline_id}:
parameters:
@@ -226,6 +247,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
delete:
description: "Attempts to delete specified timeline. 500 and 409 errors should be retried"
responses:
@@ -265,13 +293,19 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/PreconditionFailedError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:
@@ -328,6 +362,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc:
parameters:
- name: tenant_id
@@ -375,6 +416,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/attach:
parameters:
- name: tenant_id
@@ -465,6 +513,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/detach:
parameters:
@@ -518,6 +573,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/ignore:
parameters:
@@ -560,6 +622,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/load:
parameters:
@@ -604,6 +673,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/synthetic_size:
parameters:
@@ -641,6 +717,12 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/size:
parameters:
@@ -704,6 +786,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/timeline/:
parameters:
@@ -780,6 +869,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/:
get:
description: Get tenants list
@@ -810,6 +906,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
post:
description: |
Create a tenant. Returns new tenant id on success.
@@ -860,6 +963,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/config:
put:
@@ -905,6 +1015,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant/{tenant_id}/config/:
parameters:
- name: tenant_id
@@ -954,6 +1071,13 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
"503":
description: Temporarily unavailable, please retry.
content:
application/json:
schema:
$ref: "#/components/schemas/ServiceUnavailableError"
components:
securitySchemes:
JWT:
@@ -1220,6 +1344,13 @@ components:
properties:
msg:
type: string
ServiceUnavailableError:
type: object
required:
- msg
properties:
msg:
type: string
NotFoundError:
type: object
required:

View File

@@ -6,11 +6,13 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
use hyper::header::CONTENT_TYPE;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest, TenantLoadRequest,
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
@@ -29,7 +31,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
};
@@ -132,7 +134,7 @@ impl From<PageReconstructError> for ApiError {
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
}
PageReconstructError::AncestorStopping(_) => {
ApiError::ResourceUnavailable(format!("{pre}"))
ApiError::ResourceUnavailable(format!("{pre}").into())
}
PageReconstructError::WalRedo(pre) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
@@ -145,12 +147,15 @@ impl From<TenantMapInsertError> for ApiError {
fn from(tmie: TenantMapInsertError) -> ApiError {
match tmie {
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::ResourceUnavailable(format!("{tmie}"))
ApiError::ResourceUnavailable(format!("{tmie}").into())
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
}
TenantMapInsertError::Closure(e) => ApiError::InternalServerError(e),
TenantMapInsertError::TenantExistsSecondary(id) => {
ApiError::Conflict(format!("tenant {id} already exists as secondary"))
}
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}
@@ -632,7 +637,7 @@ async fn tenant_list_handler(
.instrument(info_span!("tenant_list"))
.await
.map_err(|_| {
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".to_string())
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
})?
.iter()
.map(|(id, state)| TenantInfo {
@@ -748,10 +753,8 @@ async fn tenant_size_handler(
}
/// The type resides in the pageserver not to expose `ModelInputs`.
#[serde_with::serde_as]
#[derive(serde::Serialize)]
struct TenantHistorySize {
#[serde_as(as = "serde_with::DisplayFromStr")]
id: TenantId,
/// Size is a mixture of WAL and logical size, so the unit is bytes.
///
@@ -1011,6 +1014,48 @@ async fn update_tenant_config_handler(
json_response(StatusCode::OK, ())
}
async fn put_tenant_location_config_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
let conf = state.conf;
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
mgr::detach_tenant(conf, tenant_id, true)
.instrument(info_span!("tenant_detach", %tenant_id))
.await?;
return json_response(StatusCode::OK, ());
}
let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
mgr::upsert_location(
state.conf,
tenant_id,
location_conf,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
json_response(StatusCode::OK, ())
}
/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
async fn handle_tenant_break(
r: Request<Body>,
@@ -1190,6 +1235,136 @@ async fn deletion_queue_flush(
}
}
/// Try if `GetPage@Lsn` is successful, useful for manual debugging.
async fn getpage_at_lsn_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
struct Key(crate::repository::Key);
impl std::str::FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
crate::repository::Key::from_hex(s).map(Key)
}
}
let key: Key = parse_query_param(&request, "key")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'key' query parameter")))?;
let lsn: Lsn = parse_query_param(&request, "lsn")?
.ok_or_else(|| ApiError::BadRequest(anyhow!("missing 'lsn' query parameter")))?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let page = timeline.get(key.0, lsn, &ctx).await?;
Result::<_, ApiError>::Ok(
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/octet-stream")
.body(hyper::Body::from(page))
.unwrap(),
)
}
.instrument(info_span!("timeline_get", %tenant_id, %timeline_id))
.await
}
async fn timeline_collect_keyspace(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
struct Partitioning {
keys: crate::keyspace::KeySpace,
at_lsn: Lsn,
}
impl serde::Serialize for Partitioning {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
}
}
struct WithDisplay<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&self.0)
}
}
struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
impl<'a> serde::Serialize for KeySpace<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
for kr in &self.0.ranges {
seq.serialize_element(&KeyRange(kr))?;
}
seq.end()
}
}
struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
impl<'a> serde::Serialize for KeyRange<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut t = serializer.serialize_tuple(2)?;
t.serialize_element(&WithDisplay(&self.0.start))?;
t.serialize_element(&WithDisplay(&self.0.end))?;
t.end()
}
}
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn());
let keys = timeline
.collect_keyspace(at_lsn, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
}
.instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
.await
}
async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -1464,6 +1639,9 @@ pub fn make_router(
.get("/v1/tenant/:tenant_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
.put("/v1/tenant/:tenant_id/location_config", |r| {
api_handler(r, put_tenant_location_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})
@@ -1534,5 +1712,12 @@ pub fn make_router(
.post("/v1/tracing/event", |r| {
testing_api_handler("emit a tracing event", r, post_tracing_event_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/getpage", |r| {
testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/keyspace",
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
)
.any(handler_404))
}

View File

@@ -112,6 +112,10 @@ pub const METADATA_FILE_NAME: &str = "metadata";
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_CONFIG_NAME: &str = "config";
/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub const TEMP_FILE_SUFFIX: &str = "___temp";

View File

@@ -691,10 +691,9 @@ impl StorageIoTime {
.expect("failed to define a metric");
let metrics = std::array::from_fn(|i| {
let op = StorageIoOperation::from_repr(i).unwrap();
let metric = storage_io_histogram_vec
storage_io_histogram_vec
.get_metric_with_label_values(&[op.as_str()])
.unwrap();
metric
.unwrap()
});
Self { metrics }
}
@@ -967,6 +966,7 @@ pub(crate) struct DeletionQueueMetrics {
pub(crate) keys_submitted: IntCounter,
pub(crate) keys_dropped: IntCounter,
pub(crate) keys_executed: IntCounter,
pub(crate) keys_validated: IntCounter,
pub(crate) dropped_lsn_updates: IntCounter,
pub(crate) unexpected_errors: IntCounter,
pub(crate) remote_errors: IntCounterVec,
@@ -988,7 +988,13 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
keys_executed: register_int_counter!(
"pageserver_deletion_queue_executed_total",
"Number of objects deleted. Only includes objects that we actually deleted, sum with pageserver_deletion_queue_dropped_total for the total number of keys processed."
"Number of objects deleted. Only includes objects that we actually deleted, sum with pageserver_deletion_queue_dropped_total for the total number of keys processed to completion"
)
.expect("failed to define a metric"),
keys_validated: register_int_counter!(
"pageserver_deletion_queue_validated_total",
"Number of keys validated for deletion. Sum with pageserver_deletion_queue_dropped_total for the total number of keys that have passed through the validation stage."
)
.expect("failed to define a metric"),

View File

@@ -66,8 +66,7 @@
//! inserted to the mapping, but you must hold the write-lock on the slot until
//! the contents are valid. If you need to release the lock without initializing
//! the contents, you must remove the mapping first. We make that easy for the
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
//! page, the caller must explicitly call guard.mark_valid() after it has
//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
//! initialized it. If the guard is dropped without calling mark_valid(), the
//! mapping is automatically removed and the slot is marked free.
//!
@@ -286,23 +285,25 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid(). Similarly
/// lock_for_write() can return an invalid buffer that the caller is expected to
/// to initialize.
///
/// is expected to fill in the page contents and call mark_valid().
pub struct PageWriteGuard<'i> {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
state: PageWriteGuardState<'i>,
}
_permit: PinnedSlotsPermit,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
enum PageWriteGuardState<'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
},
Downgraded,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
@@ -310,25 +311,37 @@ impl std::ops::Deref for PageWriteGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl PageWriteGuard<'_> {
impl<'a> PageWriteGuard<'a> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
#[must_use]
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit: Arc::new(_permit),
slot_guard: inner.downgrade(),
}
}
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
@@ -339,11 +352,14 @@ impl Drop for PageWriteGuard<'_> {
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
inner.key = None;
}
PageWriteGuardState::Downgraded => {}
}
}
}
@@ -354,12 +370,6 @@ pub enum ReadBufResult<'a> {
NotFound(PageWriteGuard<'a>),
}
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
impl PageCache {
//
// Section 1.1: Public interface functions for looking up and memorizing materialized page
@@ -446,20 +456,77 @@ impl PageCache {
lsn,
};
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(write_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*write_guard == img);
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(&cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
debug_assert_eq!(inner.buf.len(), img.len());
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(inner.buf == img);
return Ok(());
}
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
write_guard.mark_valid();
}
}
debug_assert!(permit.is_some());
Ok(())
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
// Create a write guard for the slot so we go through the expected motions.
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
let mut write_guard = PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
};
write_guard.copy_from_slice(img);
let _ = write_guard.mark_valid();
return Ok(());
}
}
// Section 1.2: Public interface functions for working with immutable file pages.
@@ -638,99 +705,10 @@ impl PageCache {
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
}));
}
}
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Some(PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
valid: true,
});
}
}
None
}
/// Return a write-locked buffer for given block.
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
return Ok(WriteBufResult::Found(write_guard));
}
debug_assert!(permit.is_some());
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
}));
}
}
@@ -775,7 +753,7 @@ impl PageCache {
///
/// Like 'search_mapping, but performs an "exact" search. Used for
/// allocating a new buffer.
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
match key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();

View File

@@ -44,6 +44,8 @@ use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::AttachedLocationConfig;
use self::config::LocationConf;
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::LoadMetadataError;
@@ -64,6 +66,7 @@ use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETI
use crate::repository::GcResult;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationMode;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::metadata::load_metadata;
pub use crate::tenant::remote_timeline_client::index::IndexPart;
@@ -160,6 +163,28 @@ pub struct TenantSharedResources {
pub deletion_queue_client: DeletionQueueClient,
}
/// A [`Tenant`] is really an _attached_ tenant. The configuration
/// for an attached tenant is a subset of the [`LocationConf`], represented
/// in this struct.
pub(super) struct AttachedTenantConf {
tenant_conf: TenantConfOpt,
location: AttachedLocationConfig,
}
impl AttachedTenantConf {
fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
match &location_conf.mode {
LocationMode::Attached(attach_conf) => Ok(Self {
tenant_conf: location_conf.tenant_conf,
location: attach_conf.clone(),
}),
LocationMode::Secondary(_) => {
anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
}
}
}
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -177,12 +202,15 @@ pub struct Tenant {
// We keep TenantConfOpt sturct here to preserve the information
// about parameters that are not set.
// This is necessary to allow global config updates.
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
tenant_id: TenantId,
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
@@ -526,14 +554,13 @@ impl Tenant {
pub(crate) fn spawn_attach(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
resources: TenantSharedResources,
attached_conf: AttachedTenantConf,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO dedup with spawn_load
let tenant_conf =
Self::load_tenant_config(conf, &tenant_id).context("load tenant config")?;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let TenantSharedResources {
broker_client,
@@ -541,14 +568,12 @@ impl Tenant {
deletion_queue_client,
} = resources;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Arc::new(Tenant::new(
TenantState::Attaching,
conf,
tenant_conf,
attached_conf,
wal_redo_manager,
tenant_id,
generation,
remote_storage.clone(),
deletion_queue_client,
));
@@ -859,10 +884,9 @@ impl Tenant {
backtrace: String::new(),
},
conf,
TenantConfOpt::default(),
AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
wal_redo_manager,
tenant_id,
Generation::broken(),
None,
DeletionQueueClient::broken(),
))
@@ -881,7 +905,7 @@ impl Tenant {
pub(crate) fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
generation: Generation,
attached_conf: AttachedTenantConf,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -889,14 +913,6 @@ impl Tenant {
) -> Arc<Tenant> {
span::debug_assert_current_span_has_tenant_id();
let tenant_conf = match Self::load_tenant_config(conf, &tenant_id) {
Ok(conf) => conf,
Err(e) => {
error!("load tenant config failed: {:?}", e);
return Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"));
}
};
let broker_client = resources.broker_client;
let remote_storage = resources.remote_storage;
@@ -904,10 +920,9 @@ impl Tenant {
let tenant = Tenant::new(
TenantState::Loading,
conf,
tenant_conf,
attached_conf,
wal_redo_manager,
tenant_id,
generation,
remote_storage.clone(),
resources.deletion_queue_client.clone(),
);
@@ -1646,6 +1661,15 @@ impl Tenant {
"Cannot run GC iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() {
info!("Skipping GC in location state {:?}", conf.location);
return Ok(GcResult::default());
}
}
self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
.await
}
@@ -1664,6 +1688,14 @@ impl Tenant {
"Cannot run compaction iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
info!("Skipping compaction in location state {:?}", conf.location);
return Ok(());
}
}
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -2089,7 +2121,7 @@ where
impl Tenant {
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
*self.tenant_conf.read().unwrap()
self.tenant_conf.read().unwrap().tenant_conf
}
pub fn effective_config(&self) -> TenantConf {
@@ -2098,84 +2130,95 @@ impl Tenant {
}
pub fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
pub fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
pub fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
pub fn get_compaction_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_period
.unwrap_or(self.conf.default_tenant_conf.compaction_period)
}
pub fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub fn get_gc_horizon(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.gc_horizon
.unwrap_or(self.conf.default_tenant_conf.gc_horizon)
}
pub fn get_gc_period(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.gc_period
.unwrap_or(self.conf.default_tenant_conf.gc_period)
}
pub fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.image_creation_threshold
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
pub fn get_pitr_interval(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.pitr_interval
.unwrap_or(self.conf.default_tenant_conf.pitr_interval)
}
pub fn get_trace_read_requests(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.trace_read_requests
.unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
}
pub fn get_min_resident_size_override(&self) -> Option<u64> {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.min_resident_size_override
.or(self.conf.default_tenant_conf.min_resident_size_override)
}
pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
*self.tenant_conf.write().unwrap() = new_tenant_conf;
self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated();
}
}
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
*self.tenant_conf.write().unwrap() = new_conf;
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
@@ -2245,10 +2288,9 @@ impl Tenant {
fn new(
state: TenantState,
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
attached_conf: AttachedTenantConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId,
generation: Generation,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
) -> Tenant {
@@ -2308,12 +2350,12 @@ impl Tenant {
Tenant {
tenant_id,
generation,
generation: attached_conf.location.generation,
conf,
// using now here is good enough approximation to catch tenants with really long
// activation times.
loading_started_at: Instant::now(),
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
tenant_conf: Arc::new(RwLock::new(attached_conf)),
timelines: Mutex::new(HashMap::new()),
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
@@ -2331,52 +2373,123 @@ impl Tenant {
pub(super) fn load_tenant_config(
conf: &'static PageServerConf,
tenant_id: &TenantId,
) -> anyhow::Result<TenantConfOpt> {
let target_config_path = conf.tenant_config_path(tenant_id);
) -> anyhow::Result<LocationConf> {
let legacy_config_path = conf.tenant_config_path(tenant_id);
let config_path = conf.tenant_location_config_path(tenant_id);
info!("loading tenantconf from {target_config_path}");
if config_path.exists() {
// New-style config takes precedence
let deserialized = Self::read_config(&config_path)?;
Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
} else if legacy_config_path.exists() {
// Upgrade path: found an old-style configuration only
let deserialized = Self::read_config(&legacy_config_path)?;
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
// OR: we're loading after incomplete deletion that managed to remove config.
if !target_config_path.exists() {
info!("tenant config not found in {target_config_path}");
return Ok(TenantConfOpt::default());
let mut tenant_conf = TenantConfOpt::default();
for (key, item) in deserialized.iter() {
match key {
"tenant_config" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
format!("Failed to parse config from file '{legacy_config_path}' as pageserver config")
})?;
}
_ => bail!(
"config file {legacy_config_path} has unrecognized pageserver option '{key}'"
),
}
}
// Legacy configs are implicitly in attached state
Ok(LocationConf::attached_single(
tenant_conf,
Generation::none(),
))
} else {
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
// OR: we're loading after incomplete deletion that managed to remove config.
info!(
"tenant config not found in {} or {}",
config_path, legacy_config_path
);
Ok(LocationConf::default())
}
}
fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
info!("loading tenant configuration from {path}");
// load and parse file
let config = fs::read_to_string(&target_config_path)
.with_context(|| format!("Failed to load config from path '{target_config_path}'"))?;
let config = fs::read_to_string(path)
.with_context(|| format!("Failed to load config from path '{path}'"))?;
let toml = config.parse::<toml_edit::Document>().with_context(|| {
format!("Failed to parse config from file '{target_config_path}' as toml file")
})?;
let mut tenant_conf = TenantConfOpt::default();
for (key, item) in toml.iter() {
match key {
"tenant_config" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
format!("Failed to parse config from file '{target_config_path}' as pageserver config")
})?;
}
_ => bail!(
"config file {target_config_path} has unrecognized pageserver option '{key}'"
),
}
}
Ok(tenant_conf)
config
.parse::<toml_edit::Document>()
.with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
pub(super) async fn persist_tenant_config(
conf: &'static PageServerConf,
tenant_id: &TenantId,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
let legacy_config_path = conf.tenant_config_path(tenant_id);
let config_path = conf.tenant_location_config_path(tenant_id);
Self::persist_tenant_config_at(tenant_id, &config_path, &legacy_config_path, location_conf)
.await
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
pub(super) async fn persist_tenant_config_at(
tenant_id: &TenantId,
config_path: &Utf8Path,
legacy_config_path: &Utf8Path,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
// Forward compat: write out an old-style configuration that old versions can read, in case we roll back
Self::persist_tenant_config_legacy(
tenant_id,
legacy_config_path,
&location_conf.tenant_conf,
)
.await?;
if let LocationMode::Attached(attach_conf) = &location_conf.mode {
// Once we use LocationMode, generations are mandatory. If we aren't using generations,
// then drop out after writing legacy-style config.
if attach_conf.generation.is_none() {
tracing::debug!("Running without generations, not writing new-style LocationConf");
return Ok(());
}
}
info!("persisting tenantconf to {config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart.
"#
.to_string();
// Convert the config to a toml file.
conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
let conf_content = conf_content.as_bytes();
let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
VirtualFile::crashsafe_overwrite(config_path, &temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_id} config to {config_path}"))?;
Ok(())
}
#[tracing::instrument(skip_all, fields(%tenant_id))]
async fn persist_tenant_config_legacy(
tenant_id: &TenantId,
target_config_path: &Utf8Path,
tenant_conf: TenantConfOpt,
tenant_conf: &TenantConfOpt,
) -> anyhow::Result<()> {
// imitate a try-block with a closure
info!("persisting tenantconf to {target_config_path}");
let mut conf_content = r#"# This file contains a specific per-tenant's config.
@@ -3076,7 +3189,7 @@ pub(crate) enum CreateTenantFilesMode {
pub(crate) async fn create_tenant_files(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
location_conf: &LocationConf,
tenant_id: &TenantId,
mode: CreateTenantFilesMode,
) -> anyhow::Result<Utf8PathBuf> {
@@ -3099,7 +3212,7 @@ pub(crate) async fn create_tenant_files(
let creation_result = try_create_target_tenant_dir(
conf,
tenant_conf,
location_conf,
tenant_id,
mode,
&temporary_tenant_dir,
@@ -3125,7 +3238,7 @@ pub(crate) async fn create_tenant_files(
async fn try_create_target_tenant_dir(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
location_conf: &LocationConf,
tenant_id: &TenantId,
mode: CreateTenantFilesMode,
temporary_tenant_dir: &Utf8Path,
@@ -3155,14 +3268,26 @@ async fn try_create_target_tenant_dir(
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary timelines dir"))?;
let temporary_tenant_config_path = rebase_directory(
let temporary_legacy_tenant_config_path = rebase_directory(
&conf.tenant_config_path(tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
let temporary_tenant_config_path = rebase_directory(
&conf.tenant_location_config_path(tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf).await?;
Tenant::persist_tenant_config_at(
tenant_id,
&temporary_tenant_config_path,
&temporary_legacy_tenant_config_path,
location_conf,
)
.await?;
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
@@ -3443,10 +3568,13 @@ pub mod harness {
let tenant = Arc::new(Tenant::new(
TenantState::Loading,
self.conf,
TenantConfOpt::from(self.tenant_conf),
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt::from(self.tenant_conf),
self.generation,
))
.unwrap(),
walredo_mgr,
self.tenant_id,
self.generation,
Some(self.remote_storage.clone()),
self.deletion_queue.new_client(),
));

View File

@@ -186,26 +186,21 @@ impl FileBlockReader {
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
Ok(write_guard.mark_valid().into())
}
}
}
}

View File

@@ -13,6 +13,7 @@ use pageserver_api::models;
use serde::{Deserialize, Serialize};
use std::num::NonZeroU64;
use std::time::Duration;
use utils::generation::Generation;
pub mod defaults {
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
@@ -44,7 +45,211 @@ pub mod defaults {
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
}
/// Per-tenant configuration options
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum AttachmentMode {
/// Our generation is current as far as we know, and as far as we know we are the only attached
/// pageserver. This is the "normal" attachment mode.
Single,
/// Our generation number is current as far as we know, but we are advised that another
/// pageserver is still attached, and therefore to avoid executing deletions. This is
/// the attachment mode of a pagesever that is the destination of a migration.
Multi,
/// Our generation number is superseded, or about to be superseded. We are advised
/// to avoid remote storage writes if possible, and to avoid sending billing data. This
/// is the attachment mode of a pageserver that is the origin of a migration.
Stale,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct AttachedLocationConfig {
pub(crate) generation: Generation,
pub(crate) attach_mode: AttachmentMode,
// TODO: add a flag to override AttachmentMode's policies under
// disk pressure (i.e. unblock uploads under disk pressure in Stale
// state, unblock deletions after timeout in Multi state)
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct SecondaryLocationConfig {
/// If true, keep the local cache warm by polling remote storage
pub(crate) warm: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) enum LocationMode {
Attached(AttachedLocationConfig),
Secondary(SecondaryLocationConfig),
}
/// Per-tenant, per-pageserver configuration. All pageservers use the same TenantConf,
/// but have distinct LocationConf.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct LocationConf {
/// The location-specific part of the configuration, describes the operating
/// mode of this pageserver for this tenant.
pub(crate) mode: LocationMode,
/// The pan-cluster tenant configuration, the same on all locations
pub(crate) tenant_conf: TenantConfOpt,
}
impl std::fmt::Debug for LocationConf {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.mode {
LocationMode::Attached(conf) => {
write!(
f,
"Attached {:?}, gen={:?}",
conf.attach_mode, conf.generation
)
}
LocationMode::Secondary(conf) => {
write!(f, "Secondary, warm={}", conf.warm)
}
}
}
}
impl AttachedLocationConfig {
/// Consult attachment mode to determine whether we are currently permitted
/// to delete layers. This is only advisory, not required for data safety.
/// See [`AttachmentMode`] for more context.
pub(crate) fn may_delete_layers_hint(&self) -> bool {
// TODO: add an override for disk pressure in AttachedLocationConfig,
// and respect it here.
match &self.attach_mode {
AttachmentMode::Single => true,
AttachmentMode::Multi | AttachmentMode::Stale => {
// In Multi mode we avoid doing deletions because some other
// attached pageserver might get 404 while trying to read
// a layer we delete which is still referenced in their metadata.
//
// In Stale mode, we avoid doing deletions because we expect
// that they would ultimately fail validation in the deletion
// queue due to our stale generation.
false
}
}
}
/// Whether we are currently hinted that it is worthwhile to upload layers.
/// This is only advisory, not required for data safety.
/// See [`AttachmentMode`] for more context.
pub(crate) fn may_upload_layers_hint(&self) -> bool {
// TODO: add an override for disk pressure in AttachedLocationConfig,
// and respect it here.
match &self.attach_mode {
AttachmentMode::Single | AttachmentMode::Multi => true,
AttachmentMode::Stale => {
// In Stale mode, we avoid doing uploads because we expect that
// our replacement pageserver will already have started its own
// IndexPart that will never reference layers we upload: it is
// wasteful.
false
}
}
}
}
impl LocationConf {
/// For use when loading from a legacy configuration: presence of a tenant
/// implies it is in AttachmentMode::Single, which used to be the only
/// possible state. This function should eventually be removed.
pub(crate) fn attached_single(tenant_conf: TenantConfOpt, generation: Generation) -> Self {
Self {
mode: LocationMode::Attached(AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
}),
tenant_conf,
}
}
/// For use when attaching/re-attaching: update the generation stored in this
/// structure. If we were in a secondary state, promote to attached (posession
/// of a fresh generation implies this).
pub(crate) fn attach_in_generation(&mut self, generation: Generation) {
match &mut self.mode {
LocationMode::Attached(attach_conf) => {
attach_conf.generation = generation;
}
LocationMode::Secondary(_) => {
// We are promoted to attached by the control plane's re-attach response
self.mode = LocationMode::Attached(AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
})
}
}
}
pub(crate) fn try_from(conf: &'_ models::LocationConfig) -> anyhow::Result<Self> {
let tenant_conf = TenantConfOpt::try_from(&conf.tenant_conf)?;
fn get_generation(conf: &'_ models::LocationConfig) -> Result<Generation, anyhow::Error> {
conf.generation
.ok_or_else(|| anyhow::anyhow!("Generation must be set when attaching"))
}
let mode = match &conf.mode {
models::LocationConfigMode::AttachedMulti => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Multi,
})
}
models::LocationConfigMode::AttachedSingle => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Single,
})
}
models::LocationConfigMode::AttachedStale => {
LocationMode::Attached(AttachedLocationConfig {
generation: get_generation(conf)?,
attach_mode: AttachmentMode::Stale,
})
}
models::LocationConfigMode::Secondary => {
anyhow::ensure!(conf.generation.is_none());
let warm = conf
.secondary_conf
.as_ref()
.map(|c| c.warm)
.unwrap_or(false);
LocationMode::Secondary(SecondaryLocationConfig { warm })
}
models::LocationConfigMode::Detached => {
// Should not have been called: API code should translate this mode
// into a detach rather than trying to decode it as a LocationConf
return Err(anyhow::anyhow!("Cannot decode a Detached configuration"));
}
};
Ok(Self { mode, tenant_conf })
}
}
impl Default for LocationConf {
// TODO: this should be removed once tenant loading can guarantee that we are never
// loading from a directory without a configuration.
// => tech debt since https://github.com/neondatabase/neon/issues/1555
fn default() -> Self {
Self {
mode: LocationMode::Attached(AttachedLocationConfig {
generation: Generation::none(),
attach_mode: AttachmentMode::Single,
}),
tenant_conf: TenantConfOpt::default(),
}
}
}
/// A tenant's calcuated configuration, which is the result of merging a
/// tenant's TenantConfOpt with the global TenantConf from PageServerConf.
///
/// For storing and transmitting individual tenant's configuration, see
/// TenantConfOpt.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TenantConf {
// Flush out an inmemory layer, if it's holding WAL older than this

View File

@@ -197,6 +197,7 @@ async fn cleanup_remaining_fs_traces(
};
rm(conf.tenant_config_path(tenant_id), false).await?;
rm(conf.tenant_location_config_path(tenant_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
Err(anyhow::anyhow!(

View File

@@ -72,36 +72,32 @@ impl EphemeralFile {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -171,7 +167,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {

View File

@@ -406,4 +406,123 @@ mod tests {
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
);
}
#[test]
fn test_metadata_bincode_serde() {
let original_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let metadata_bytes = original_metadata
.to_bytes()
.expect("Cannot create bytes array from metadata");
let metadata_bincode_be_bytes = original_metadata
.ser()
.expect("Cannot serialize the metadata");
// 8 bytes for the length of the vector
assert_eq!(metadata_bincode_be_bytes.len(), 8 + metadata_bytes.len());
let expected_bincode_bytes = {
let mut temp = vec![];
let len_bytes = metadata_bytes.len().to_be_bytes();
temp.extend_from_slice(&len_bytes);
temp.extend_from_slice(&metadata_bytes);
temp
};
assert_eq!(metadata_bincode_be_bytes, expected_bincode_bytes);
let deserialized_metadata = TimelineMetadata::des(&metadata_bincode_be_bytes).unwrap();
// Deserialized metadata has the metadata header, which is different from the serialized one.
// Reference: TimelineMetaData::to_bytes()
let expected_metadata = {
let mut temp_metadata = original_metadata;
let body_bytes = temp_metadata
.body
.ser()
.expect("Cannot serialize the metadata body");
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
temp_metadata.hdr = hdr;
temp_metadata
};
assert_eq!(deserialized_metadata, expected_metadata);
}
#[test]
fn test_metadata_bincode_serde_ensure_roundtrip() {
let original_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let expected_bytes = vec![
/* bincode length encoding bytes */
0, 0, 0, 0, 0, 0, 2, 0, // 8 bytes for the length of the serialized vector
/* TimelineMetadataHeader */
4, 37, 101, 34, 0, 70, 0, 4, // checksum, size, format_version (4 + 2 + 2)
/* TimelineMetadataBodyV2 */
0, 0, 0, 0, 0, 0, 2, 0, // disk_consistent_lsn (8 bytes)
1, 0, 0, 0, 0, 0, 0, 1, 0, // prev_record_lsn (9 bytes)
1, 17, 34, 51, 68, 85, 102, 119, 136, 17, 34, 51, 68, 85, 102, 119,
136, // ancestor_timeline (17 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // ancestor_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // latest_gc_cutoff_lsn (8 bytes)
0, 0, 0, 0, 0, 0, 0, 0, // initdb_lsn (8 bytes)
0, 0, 0, 15, // pg_version (4 bytes)
/* padding bytes */
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0,
];
let metadata_ser_bytes = original_metadata.ser().unwrap();
assert_eq!(metadata_ser_bytes, expected_bytes);
let expected_metadata = {
let mut temp_metadata = original_metadata;
let body_bytes = temp_metadata
.body
.ser()
.expect("Cannot serialize the metadata body");
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
temp_metadata.hdr = hdr;
temp_metadata
};
let des_metadata = TimelineMetadata::des(&metadata_ser_bytes).unwrap();
assert_eq!(des_metadata, expected_metadata);
}
}

View File

@@ -24,9 +24,11 @@ use crate::control_plane_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::config::{LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::tenant::{
create_tenant_files, AttachedTenantConf, CreateTenantFilesMode, Tenant, TenantState,
};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
@@ -38,6 +40,39 @@ use super::delete::DeleteTenantError;
use super::timeline::delete::DeleteTimelineFlow;
use super::TenantSharedResources;
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is elegible to service
/// reads and ingest WAL.
/// - `Secondary`: is only keeping a local cache warm.
///
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
/// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub enum TenantSlot {
Attached(Arc<Tenant>),
Secondary,
}
impl TenantSlot {
/// Return the `Tenant` in this slot if attached, else None
fn get_attached(&self) -> Option<&Arc<Tenant>> {
match self {
Self::Attached(t) => Some(t),
Self::Secondary => None,
}
}
/// Consume self and return the `Tenant` that was in this slot if attached, else None
fn into_attached(self) -> Option<Arc<Tenant>> {
match self {
Self::Attached(t) => Some(t),
Self::Secondary => None,
}
}
}
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
pub(crate) enum TenantsMap {
@@ -45,14 +80,27 @@ pub(crate) enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_insert`].
Open(HashMap<TenantId, Arc<Tenant>>),
Open(HashMap<TenantId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
ShuttingDown(HashMap<TenantId, TenantSlot>),
}
impl TenantsMap {
/// Convenience function for typical usage, where we want to get a `Tenant` object, for
/// working with attached tenants. If the TenantId is in the map but in Secondary state,
/// None is returned.
pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
m.get(tenant_id).and_then(TenantSlot::get_attached)
}
}
}
/// Get the contents of the map at this tenant ID, even if it is in secondary state.
pub(crate) fn get_slot(&self, tenant_id: &TenantId) -> Option<&TenantSlot> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
@@ -61,7 +109,9 @@ impl TenantsMap {
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
m.remove(tenant_id).and_then(TenantSlot::into_attached)
}
}
}
}
@@ -101,61 +151,86 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::Initializing));
/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
fn emergency_generations(
tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
) -> HashMap<TenantId, Generation> {
tenant_confs
.iter()
.filter_map(|(tid, lc)| {
let lc = match lc {
Ok(lc) => lc,
Err(_) => return None,
};
let gen = match &lc.mode {
LocationMode::Attached(alc) => Some(alc.generation),
LocationMode::Secondary(_) => None,
};
gen.map(|g| (*tid, g))
})
.collect()
}
async fn init_load_generations(
conf: &'static PageServerConf,
resources: TenantSharedResources,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
let mut tenants = HashMap::new();
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) {
let result = match client.re_attach().await {
tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
resources: &TenantSharedResources,
cancel: &CancellationToken,
) -> anyhow::Result<Option<HashMap<TenantId, Generation>>> {
let generations = if conf.control_plane_emergency_mode {
error!(
"Emergency mode! Tenants will be attached unsafely using their last known generation"
);
emergency_generations(tenant_confs)
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
info!("Calling control plane API to re-attach tenants");
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
match client.re_attach().await {
Ok(tenants) => tenants,
Err(RetryForeverError::ShuttingDown) => {
anyhow::bail!("Shut down while waiting for control plane re-attach response")
}
};
// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
// the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
// are processed, even though we don't block on recovery completing here.
//
// Must only do this if remote storage is enabled, otherwise deletion queue
// is not running and channel push will fail.
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(result.clone())
.await?;
}
Some(result)
} else {
info!("Control plane API not configured, tenant generations are disabled");
None
return Ok(None);
};
// The deletion queue needs to know about the startup attachment state to decide which (if any) stored
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
// the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
// are processed, even though we don't block on recovery completing here.
//
// Must only do this if remote storage is enabled, otherwise deletion queue
// is not running and channel push will fail.
if resources.remote_storage.is_some() {
resources
.deletion_queue_client
.recover(generations.clone())
.await?;
}
Ok(Some(generations))
}
/// Initial stage of load: walk the local tenants directory, clean up any temp files,
/// and load configurations for the tenants we found.
async fn init_load_tenant_configs(
conf: &'static PageServerConf,
) -> anyhow::Result<HashMap<TenantId, anyhow::Result<LocationConf>>> {
let tenants_dir = conf.tenants_path();
let mut dir_entries = tenants_dir
.read_dir_utf8()
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
let mut configs = HashMap::new();
loop {
match dir_entries.next() {
None => break,
Some(Ok(dir_entry)) => {
let tenant_dir_path = dir_entry.path().to_path_buf();
Some(Ok(dentry)) => {
let tenant_dir_path = dentry.path().to_path_buf();
if crate::is_temporary(&tenant_dir_path) {
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
// No need to use safe_remove_tenant_dir_all because this is already
@@ -166,96 +241,158 @@ pub async fn init_tenant_mgr(
tenant_dir_path, e
);
}
} else {
// This case happens if we:
// * crash during attach before creating the attach marker file
// * crash during tenant delete before removing tenant directory
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
})?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
continue;
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
continue;
}
let tenant_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {}",
tenant_dir_path
);
continue;
}
};
let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
*gen
} else {
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path, e
);
}
continue;
}
} else {
// Legacy mode: no generation information, any tenant present
// on local disk may activate
info!(
"Starting tenant {} in legacy mode, no generation",
tenant_dir_path
);
Generation::none()
};
match schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_dir_path,
generation,
resources.clone(),
Some(init_order.clone()),
&TENANTS,
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), tenant);
}
Err(e) => {
error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
}
}
continue;
}
// This case happens if we:
// * crash during attach before creating the attach marker file
// * crash during tenant delete before removing tenant directory
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
})?;
if is_empty {
info!("removing empty tenant directory {tenant_dir_path:?}");
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
)
}
continue;
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
continue;
}
let tenant_id = match tenant_dir_path
.file_name()
.unwrap_or_default()
.parse::<TenantId>()
{
Ok(id) => id,
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",
);
continue;
}
};
configs.insert(tenant_id, Tenant::load_tenant_config(conf, &tenant_id));
}
Some(Err(e)) => {
// On error, print it, but continue with the other tenants. If we error out
// here, the pageserver startup fails altogether, causing outage for *all*
// tenants. That seems worse.
error!(
"Failed to list tenants dir entry in directory {tenants_dir:?}, reason: {e:?}"
// An error listing the top level directory indicates serious problem
// with local filesystem: we will fail to load, and fail to start.
anyhow::bail!(e);
}
}
}
Ok(configs)
}
/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
resources: TenantSharedResources,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let mut tenants = HashMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
// Scan local filesystem for attached tenants
let tenant_configs = init_load_tenant_configs(conf).await?;
// Determine which tenants are to be attached
let tenant_generations =
init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
// Construct `Tenant` objects and start them running
for (tenant_id, location_conf) in tenant_configs {
let tenant_dir_path = conf.tenant_path(&tenant_id);
let mut location_conf = match location_conf {
Ok(l) => l,
Err(e) => {
warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");
tenants.insert(
tenant_id,
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_id,
format!("{}", e),
)),
);
continue;
}
};
let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
*gen
} else {
match &location_conf.mode {
LocationMode::Secondary(_) => {
// We do not require the control plane's permission for secondary mode
// tenants, because they do no remote writes and hence require no
// generation number
info!(%tenant_id, "Loaded tenant in secondary mode");
tenants.insert(tenant_id, TenantSlot::Secondary);
}
LocationMode::Attached(_) => {
// TODO: augment re-attach API to enable the control plane to
// instruct us about secondary attachments. That way, instead of throwing
// away local state, we can gracefully fall back to secondary here, if the control
// plane tells us so.
// (https://github.com/neondatabase/neon/issues/5377)
info!(%tenant_id, "Detaching tenant, control plane omitted it in re-attach response");
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
error!(%tenant_id,
"Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
);
}
}
};
continue;
}
} else {
// Legacy mode: no generation information, any tenant present
// on local disk may activate
info!(%tenant_id, "Starting tenant in legacy mode, no generation",);
Generation::none()
};
// Presence of a generation number implies attachment: attach the tenant
// if it wasn't already, and apply the generation number.
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
match schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_dir_path,
AttachedTenantConf::try_from(location_conf)?,
resources.clone(),
Some(init_order.clone()),
&TENANTS,
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
}
Err(e) => {
error!(%tenant_id, "Failed to start tenant: {e:#}");
}
}
}
@@ -273,7 +410,7 @@ pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_path: &Utf8Path,
generation: Generation,
location_conf: AttachedTenantConf,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
@@ -310,7 +447,7 @@ pub(crate) fn schedule_local_tenant_processing(
"attaching mark file present but no remote storage configured".to_string(),
)
} else {
match Tenant::spawn_attach(conf, tenant_id, generation, resources, tenants, ctx) {
match Tenant::spawn_attach(conf, tenant_id, resources, location_conf, tenants, ctx) {
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
@@ -322,7 +459,13 @@ pub(crate) fn schedule_local_tenant_processing(
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
// Start loading the tenant into memory. It will initially be in Loading state.
Tenant::spawn_load(
conf, tenant_id, generation, resources, init_order, tenants, ctx,
conf,
tenant_id,
location_conf,
resources,
init_order,
tenants,
ctx,
)
};
Ok(tenant)
@@ -378,7 +521,16 @@ async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
let res = {
let (_guard, shutdown_progress) = completion::channel();
tenant.shutdown(shutdown_progress, freeze_and_flush).await
match tenant {
TenantSlot::Attached(t) => {
t.shutdown(shutdown_progress, freeze_and_flush).await
}
TenantSlot::Secondary => {
// TODO: once secondary mode downloads are implemented,
// ensure they have all stopped before we reach this point.
Ok(())
}
}
};
if let Err(other_progress) = res {
@@ -451,16 +603,19 @@ pub async fn create_tenant(
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
// We're holding the tenants lock in write mode while doing local IO.
// If this section ever becomes contentious, introduce a new `TenantState::Creating`
// and do the work in that state.
let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
let tenant_directory = super::create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
generation, resources, None, &TENANTS, ctx)?;
AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -489,14 +644,126 @@ pub async fn set_new_tenant_config(
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_id, true).await?;
let tenant_config_path = conf.tenant_config_path(&tenant_id);
Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf)
// This is a legacy API that only operates on attached tenants: the preferred
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
}
#[instrument(skip_all, fields(tenant_id, new_location_config))]
pub(crate) async fn upsert_location(
conf: &'static PageServerConf,
tenant_id: TenantId,
new_location_config: LocationConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
let mut existing_tenant = match get_tenant(tenant_id, false).await {
Ok(t) => Some(t),
Err(GetTenantError::NotFound(_)) => None,
Err(e) => anyhow::bail!(e),
};
// If we need to shut down a Tenant, do that first
let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) {
(LocationMode::Secondary(_), Some(t)) => Some(t),
(LocationMode::Attached(attach_conf), Some(t)) => {
if attach_conf.generation != t.generation {
Some(t)
} else {
None
}
}
_ => None,
};
// TODO: currently we risk concurrent operations interfering with the tenant
// while we await shutdown, but we also should not hold the TenantsMap lock
// across the whole operation. Before we start using this function in production,
// a follow-on change will revise how concurrency is handled in TenantsMap.
// (https://github.com/neondatabase/neon/issues/5378)
if let Some(tenant) = shutdown_tenant {
let (_guard, progress) = utils::completion::channel();
info!("Shutting down attached tenant");
match tenant.shutdown(progress, false).await {
Ok(()) => {}
Err(barrier) => {
info!("Shutdown already in progress, waiting for it to complete");
barrier.wait().await;
}
}
existing_tenant = None;
}
if let Some(tenant) = existing_tenant {
// Update the existing tenant
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_location_config(AttachedTenantConf::try_from(new_location_config)?);
} else {
// Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock,
// and re-check that the state of anything we are replacing is as expected.
tenant_map_upsert_slot(tenant_id, |old_value| async move {
if let Some(TenantSlot::Attached(t)) = old_value {
if !matches!(t.current_state(), TenantState::Stopping { .. }) {
anyhow::bail!("Tenant state changed during location configuration update");
}
}
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => TenantSlot::Secondary,
LocationMode::Attached(_attach_config) => {
// Do a schedule_local_tenant_processing
// FIXME: should avoid doing this disk I/O inside the TenantsMap lock,
// we have the same problem in load_tenant/attach_tenant. Probably
// need a lock in TenantSlot to fix this.
Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
let tenant_path = conf.tenant_path(&tenant_id);
let resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client,
};
let new_tenant = schedule_local_tenant_processing(
conf,
tenant_id,
&tenant_path,
AttachedTenantConf::try_from(new_location_config)?,
resources,
None,
&TENANTS,
ctx,
)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
TenantSlot::Attached(new_tenant)
}
};
Ok(new_slot)
})
.await?;
}
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum GetTenantError {
#[error("Tenant {0} not found")]
@@ -657,7 +924,12 @@ pub async fn load_tenant(
remote_storage,
deletion_queue_client
};
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, generation, resources, None, &TENANTS, ctx)
let mut location_conf = Tenant::load_tenant_config(conf, &tenant_id).map_err( TenantMapInsertError::Other)?;
location_conf.attach_in_generation(generation);
Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -710,7 +982,10 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
};
Ok(m.iter()
.map(|(id, tenant)| (*id, tenant.current_state()))
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
TenantSlot::Secondary => None,
})
.collect())
}
@@ -727,7 +1002,8 @@ pub async fn attach_tenant(
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || async {
let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
// See https://github.com/neondatabase/neon/issues/4233
@@ -738,8 +1014,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -762,8 +1037,10 @@ pub enum TenantMapInsertError {
ShuttingDown,
#[error("tenant {0} already exists, state: {1:?}")]
TenantAlreadyExists(TenantId, TenantState),
#[error("tenant {0} already exists in secondary state")]
TenantExistsSecondary(TenantId),
#[error(transparent)]
Closure(#[from] anyhow::Error),
Other(#[from] anyhow::Error),
}
/// Give the given closure access to the tenants map entry for the given `tenant_id`, iff that
@@ -787,20 +1064,47 @@ where
TenantsMap::Open(m) => m,
};
match m.entry(tenant_id) {
hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
e.get().current_state(),
)),
hash_map::Entry::Occupied(e) => match e.get() {
TenantSlot::Attached(t) => Err(TenantMapInsertError::TenantAlreadyExists(
tenant_id,
t.current_state(),
)),
TenantSlot::Secondary => Err(TenantMapInsertError::TenantExistsSecondary(tenant_id)),
},
hash_map::Entry::Vacant(v) => match insert_fn().await {
Ok(tenant) => {
v.insert(tenant.clone());
v.insert(TenantSlot::Attached(tenant.clone()));
Ok(tenant)
}
Err(e) => Err(TenantMapInsertError::Closure(e)),
Err(e) => Err(TenantMapInsertError::Other(e)),
},
}
}
async fn tenant_map_upsert_slot<'a, F, R>(
tenant_id: TenantId,
upsert_fn: F,
) -> Result<(), TenantMapInsertError>
where
F: FnOnce(Option<TenantSlot>) -> R,
R: std::future::Future<Output = anyhow::Result<TenantSlot>>,
{
let mut guard = TENANTS.write().await;
let m = match &mut *guard {
TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
TenantsMap::Open(m) => m,
};
match upsert_fn(m.remove(&tenant_id)).await {
Ok(upsert_val) => {
m.insert(tenant_id, upsert_val);
Ok(())
}
Err(e) => Err(TenantMapInsertError::Other(e)),
}
}
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
@@ -820,28 +1124,40 @@ where
// tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to
// avoid holding the lock for the entire process.
let tenant = {
tenants
match tenants
.write()
.await
.get(&tenant_id)
.cloned()
.get_slot(&tenant_id)
.ok_or(TenantStateError::NotFound(tenant_id))?
{
TenantSlot::Attached(t) => Some(t.clone()),
TenantSlot::Secondary => None,
}
};
// allow pageserver shutdown to await for our completion
let (_guard, progress) = completion::channel();
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let freeze_and_flush = false;
// If the tenant was attached, shut it down gracefully. For secondary
// locations this part is not necessary
match tenant {
Some(attached_tenant) => {
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let freeze_and_flush = false;
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, freeze_and_flush).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
return Err(TenantStateError::IsStopping(tenant_id));
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match attached_tenant.shutdown(progress, freeze_and_flush).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
return Err(TenantStateError::IsStopping(tenant_id));
}
}
}
None => {
// Nothing to wait on when not attached, proceed.
}
}
@@ -932,6 +1248,8 @@ mod tests {
use std::sync::Arc;
use tracing::{info_span, Instrument};
use crate::tenant::mgr::TenantSlot;
use super::{super::harness::TenantHarness, TenantsMap};
#[tokio::test(start_paused = true)]
@@ -953,7 +1271,7 @@ mod tests {
// tenant harness configures the logging and we cannot escape it
let _e = info_span!("testing", tenant_id = %id).entered();
let tenants = HashMap::from([(id, t.clone())]);
let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants)));
let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();

View File

@@ -901,9 +901,27 @@ impl RemoteTimelineClient {
.await
.context("list prefixes")?;
let remaining: Vec<RemotePath> = remaining
// We will delete the current index_part object last, since it acts as a deletion
// marker via its deleted_at attribute
let latest_index = remaining
.iter()
.filter(|p| {
p.object_name()
.map(|n| n.starts_with(IndexPart::FILE_NAME))
.unwrap_or(false)
})
.filter_map(|path| parse_remote_index_path(path.clone()).map(|gen| (path, gen)))
.max_by_key(|i| i.1)
.map(|i| i.0.clone())
.unwrap_or(
// No generation-suffixed indices, assume we are dealing with
// a legacy index.
remote_index_path(&self.tenant_id, &self.timeline_id, Generation::none()),
);
let remaining_layers: Vec<RemotePath> = remaining
.into_iter()
.filter(|p| p.object_name() != Some(IndexPart::FILE_NAME))
.filter(|p| p!= &latest_index)
.inspect(|path| {
if let Some(name) = path.object_name() {
info!(%name, "deleting a file not referenced from index_part.json");
@@ -913,9 +931,11 @@ impl RemoteTimelineClient {
})
.collect();
let not_referenced_count = remaining.len();
if !remaining.is_empty() {
self.deletion_queue_client.push_immediate(remaining).await?;
let not_referenced_count = remaining_layers.len();
if !remaining_layers.is_empty() {
self.deletion_queue_client
.push_immediate(remaining_layers)
.await?;
}
fail::fail_point!("timeline-delete-before-index-delete", |_| {
@@ -924,11 +944,9 @@ impl RemoteTimelineClient {
))?
});
let index_file_path = timeline_storage_path.join(Utf8Path::new(IndexPart::FILE_NAME));
debug!("enqueuing index part deletion");
self.deletion_queue_client
.push_immediate([index_file_path].to_vec())
.push_immediate([latest_index].to_vec())
.await?;
// Timeline deletion is rare and we have probably emitted a reasonably number of objects: wait

View File

@@ -6,7 +6,6 @@ use std::collections::HashMap;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::bin_ser::SerializeError;
use crate::tenant::metadata::TimelineMetadata;
@@ -58,7 +57,6 @@ impl LayerFileMetadata {
///
/// This type needs to be backwards and forwards compatible. When changing the fields,
/// remember to add a test case for the changed version.
#[serde_as]
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct IndexPart {
/// Debugging aid describing the version of this type.
@@ -78,7 +76,6 @@ pub struct IndexPart {
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
// It's duplicated for convenience when reading the serialized structure, but is
// private because internally we would read from metadata instead.
#[serde_as(as = "DisplayFromStr")]
disk_consistent_lsn: Lsn,
#[serde(rename = "metadata_bytes")]

View File

@@ -29,7 +29,6 @@ use tenant_size_model::{Segment, StorageModel};
/// needs. We will convert this into a StorageModel when it's time to perform
/// the calculation.
///
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
pub segments: Vec<SegmentMeta>,
@@ -37,11 +36,9 @@ pub struct ModelInputs {
}
/// A [`Segment`], with some extra information for display purposes
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
pub segment: Segment,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
pub kind: LsnKind,
}
@@ -77,32 +74,22 @@ pub enum LsnKind {
/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
/// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TimelineInputs {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub ancestor_id: Option<TimelineId>,
#[serde_as(as = "serde_with::DisplayFromStr")]
ancestor_lsn: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
last_record: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
latest_gc_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
horizon_cutoff: Lsn,
#[serde_as(as = "serde_with::DisplayFromStr")]
pitr_cutoff: Lsn,
/// Cutoff point based on GC settings
#[serde_as(as = "serde_with::DisplayFromStr")]
next_gc_cutoff: Lsn,
/// Cutoff point calculated from the user-supplied 'max_retention_period'
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
retention_param_cutoff: Option<Lsn>,
}

View File

@@ -511,8 +511,7 @@ impl DeltaLayer {
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
let mut summary_buf = vec![0; PAGE_SZ];
file.read_exact_at(&mut summary_buf, 0)?;
let summary = Summary::des_prefix(&summary_buf)?;

View File

@@ -400,8 +400,7 @@ impl ImageLayer {
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
let mut summary_buf = vec![0; PAGE_SZ];
file.read_exact_at(&mut summary_buf, 0)?;
let summary = Summary::des_prefix(&summary_buf)?;
let metadata = file

View File

@@ -91,12 +91,12 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::debug_assert_current_span_has_tenant_and_timeline_id;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{
AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -149,7 +149,7 @@ pub struct TimelineResources {
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
myself: Weak<Self>,
@@ -158,6 +158,9 @@ pub struct Timeline {
/// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
/// Never changes for the lifetime of this [`Timeline`] object.
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
generation: Generation,
pub pg_version: u32,
@@ -1378,42 +1381,42 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.checkpoint_distance
.unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
}
fn get_checkpoint_timeout(&self) -> Duration {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.checkpoint_timeout
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_target_size
.unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
}
fn get_compaction_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.compaction_threshold
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
fn get_image_creation_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.image_creation_threshold
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
fn get_eviction_policy(&self) -> EvictionPolicy {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.eviction_policy
.unwrap_or(self.conf.default_tenant_conf.eviction_policy)
@@ -1429,7 +1432,7 @@ impl Timeline {
}
fn get_gc_feedback(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
tenant_conf
.gc_feedback
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
@@ -1442,7 +1445,7 @@ impl Timeline {
// The threshold is embedded in the metric. So, we need to update it.
{
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
&self.tenant_conf.read().unwrap(),
&self.tenant_conf.read().unwrap().tenant_conf,
&self.conf.default_tenant_conf,
);
let tenant_id_str = self.tenant_id.to_string();
@@ -1461,7 +1464,7 @@ impl Timeline {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
tenant_conf: Arc<RwLock<AttachedTenantConf>>,
metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
timeline_id: TimelineId,
@@ -1484,7 +1487,7 @@ impl Timeline {
let evictions_low_residence_duration_metric_threshold =
Self::get_evictions_low_residence_duration_metric_threshold(
&tenant_conf_guard,
&tenant_conf_guard.tenant_conf,
&conf.default_tenant_conf,
);
drop(tenant_conf_guard);
@@ -1649,12 +1652,15 @@ impl Timeline {
let tenant_conf_guard = self.tenant_conf.read().unwrap();
let wal_connect_timeout = tenant_conf_guard
.tenant_conf
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
let lagging_wal_timeout = tenant_conf_guard
.tenant_conf
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.tenant_conf
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
@@ -2357,7 +2363,7 @@ impl Timeline {
// during branch creation.
match ancestor.wait_to_become_active(ctx).await {
Ok(()) => {}
Err(state) if state == TimelineState::Stopping => {
Err(TimelineState::Stopping) => {
return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
}
Err(state) => {
@@ -3273,13 +3279,10 @@ struct CompactLevel0Phase1StatsBuilder {
new_deltas_size: Option<u64>,
}
#[serde_as]
#[derive(serde::Serialize)]
struct CompactLevel0Phase1Stats {
version: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "serde_with::DisplayFromStr")]
timeline_id: TimelineId,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,

View File

@@ -825,7 +825,7 @@ impl PostgresRedoManager {
while nwrite < writebuf.len() {
let n = loop {
match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
@@ -917,7 +917,7 @@ impl PostgresRedoManager {
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
Err(e) if e == nix::errno::Errno::EINTR => continue,
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;

View File

@@ -7,12 +7,12 @@ OBJS = \
extension_server.o \
file_cache.o \
libpagestore.o \
libpqwalproposer.o \
neon.o \
neon_utils.o \
pagestore_smgr.o \
relsize_cache.o \
walproposer.o \
walproposer_utils.o \
walproposer_pg.o \
control_plane_connector.o
PG_CPPFLAGS = -I$(libpq_srcdir)

View File

@@ -30,7 +30,7 @@
#include "neon.h"
#include "walproposer.h"
#include "walproposer_utils.h"
#include "neon_utils.h"
#define PageStoreTrace DEBUG5

View File

@@ -1,424 +0,0 @@
#include "postgres.h"
#include "libpq-fe.h"
#include "neon.h"
#include "walproposer.h"
/* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */
struct WalProposerConn
{
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from
* walprop_async_read */
};
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
/* If we're already correctly blocking or nonblocking, all good */
if (is_nonblocking == conn->is_nonblocking)
return true;
/* Otherwise, set it appropriately */
if (PQsetnonblocking(conn->pg_conn, is_nonblocking) == -1)
return false;
conn->is_nonblocking = is_nonblocking;
return true;
}
/* Exported function definitions */
char *
walprop_error_message(WalProposerConn *conn)
{
return PQerrorMessage(conn->pg_conn);
}
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
switch (PQstatus(conn->pg_conn))
{
case CONNECTION_OK:
return WP_CONNECTION_OK;
case CONNECTION_BAD:
return WP_CONNECTION_BAD;
default:
return WP_CONNECTION_IN_PROGRESS;
}
}
WalProposerConn *
walprop_connect_start(char *conninfo, char *password)
{
WalProposerConn *conn;
PGconn *pg_conn;
const char *keywords[3];
const char *values[3];
int n;
/*
* Connect using the given connection string. If the
* NEON_AUTH_TOKEN environment variable was set, use that as
* the password.
*
* The connection options are parsed in the order they're given, so
* when we set the password before the connection string, the
* connection string can override the password from the env variable.
* Seems useful, although we don't currently use that capability
* anywhere.
*/
n = 0;
if (password)
{
keywords[n] = "password";
values[n] = password;
n++;
}
keywords[n] = "dbname";
values[n] = conninfo;
n++;
keywords[n] = NULL;
values[n] = NULL;
n++;
pg_conn = PQconnectStartParams(keywords, values, 1);
/*
* Allocation of a PQconn can fail, and will return NULL. We want to fully
* replicate the behavior of PQconnectStart here.
*/
if (!pg_conn)
return NULL;
/*
* And in theory this allocation can fail as well, but it's incredibly
* unlikely if we just successfully allocated a PGconn.
*
* palloc will exit on failure though, so there's not much we could do if
* it *did* fail.
*/
conn = palloc(sizeof(WalProposerConn));
conn->pg_conn = pg_conn;
conn->is_nonblocking = false; /* connections always start in blocking
* mode */
conn->recvbuf = NULL;
return conn;
}
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
WalProposerConnectPollStatusType return_val;
switch (PQconnectPoll(conn->pg_conn))
{
case PGRES_POLLING_FAILED:
return_val = WP_CONN_POLLING_FAILED;
break;
case PGRES_POLLING_READING:
return_val = WP_CONN_POLLING_READING;
break;
case PGRES_POLLING_WRITING:
return_val = WP_CONN_POLLING_WRITING;
break;
case PGRES_POLLING_OK:
return_val = WP_CONN_POLLING_OK;
break;
/*
* There's a comment at its source about this constant being
* unused. We'll expect it's never returned.
*/
case PGRES_POLLING_ACTIVE:
elog(FATAL, "Unexpected PGRES_POLLING_ACTIVE returned from PQconnectPoll");
/*
* This return is never actually reached, but it's here to make
* the compiler happy
*/
return WP_CONN_POLLING_FAILED;
default:
Assert(false);
return_val = WP_CONN_POLLING_FAILED; /* keep the compiler quiet */
}
return return_val;
}
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
/*
* We need to be in blocking mode for sending the query to run without
* requiring a call to PQflush
*/
if (!ensure_nonblocking_status(conn, false))
return false;
/* PQsendQuery returns 1 on success, 0 on failure */
if (!PQsendQuery(conn->pg_conn, query))
return false;
return true;
}
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
PGresult *result;
WalProposerExecStatusType return_val;
/* Marker variable if we need to log an unexpected success result */
char *unexpected_success = NULL;
/* Consume any input that we might be missing */
if (!PQconsumeInput(conn->pg_conn))
return WP_EXEC_FAILED;
if (PQisBusy(conn->pg_conn))
return WP_EXEC_NEEDS_INPUT;
result = PQgetResult(conn->pg_conn);
/*
* PQgetResult returns NULL only if getting the result was successful &
* there's no more of the result to get.
*/
if (!result)
{
elog(WARNING, "[libpqwalproposer] Unexpected successful end of command results");
return WP_EXEC_UNEXPECTED_SUCCESS;
}
/* Helper macro to reduce boilerplate */
#define UNEXPECTED_SUCCESS(msg) \
return_val = WP_EXEC_UNEXPECTED_SUCCESS; \
unexpected_success = msg; \
break;
switch (PQresultStatus(result))
{
/* "true" success case */
case PGRES_COPY_BOTH:
return_val = WP_EXEC_SUCCESS_COPYBOTH;
break;
/* Unexpected success case */
case PGRES_EMPTY_QUERY:
UNEXPECTED_SUCCESS("empty query return");
case PGRES_COMMAND_OK:
UNEXPECTED_SUCCESS("data-less command end");
case PGRES_TUPLES_OK:
UNEXPECTED_SUCCESS("tuples return");
case PGRES_COPY_OUT:
UNEXPECTED_SUCCESS("'Copy Out' response");
case PGRES_COPY_IN:
UNEXPECTED_SUCCESS("'Copy In' response");
case PGRES_SINGLE_TUPLE:
UNEXPECTED_SUCCESS("single tuple return");
case PGRES_PIPELINE_SYNC:
UNEXPECTED_SUCCESS("pipeline sync point");
/* Failure cases */
case PGRES_BAD_RESPONSE:
case PGRES_NONFATAL_ERROR:
case PGRES_FATAL_ERROR:
case PGRES_PIPELINE_ABORTED:
return_val = WP_EXEC_FAILED;
break;
default:
Assert(false);
return_val = WP_EXEC_FAILED; /* keep the compiler quiet */
}
if (unexpected_success)
elog(WARNING, "[libpqwalproposer] Unexpected successful %s", unexpected_success);
return return_val;
}
pgsocket
walprop_socket(WalProposerConn *conn)
{
return PQsocket(conn->pg_conn);
}
int
walprop_flush(WalProposerConn *conn)
{
return (PQflush(conn->pg_conn));
}
void
walprop_finish(WalProposerConn *conn)
{
if (conn->recvbuf != NULL)
PQfreemem(conn->recvbuf);
PQfinish(conn->pg_conn);
pfree(conn);
}
/*
* Receive a message from the safekeeper.
*
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*/
PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
int result;
if (conn->recvbuf != NULL)
{
PQfreemem(conn->recvbuf);
conn->recvbuf = NULL;
}
/* Call PQconsumeInput so that we have the data we need */
if (!PQconsumeInput(conn->pg_conn))
{
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
/*
* The docs for PQgetCopyData list the return values as: 0 if the copy is
* still in progress, but no "complete row" is available -1 if the copy is
* done -2 if an error occurred (> 0) if it was successful; that value is
* the amount transferred.
*
* The protocol we use between walproposer and safekeeper means that we
* *usually* wouldn't expect to see that the copy is done, but this can
* sometimes be triggered by the server returning an ErrorResponse (which
* also happens to have the effect that the copy is done).
*/
switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true))
{
case 0:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_TRY_AGAIN;
case -1:
{
/*
* If we get -1, it's probably because of a server error; the
* safekeeper won't normally send a CopyDone message.
*
* We can check PQgetResult to make sure that the server
* failed; it'll always result in PGRES_FATAL_ERROR
*/
ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn));
if (status != PGRES_FATAL_ERROR)
elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status);
/*
* If there was actually an error, it'll be properly reported
* by calls to PQerrorMessage -- we don't have to do anything
* else
*/
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
}
case -2:
*amount = 0;
*buf = NULL;
return PG_ASYNC_READ_FAIL;
default:
/* Positive values indicate the size of the returned result */
*amount = result;
*buf = conn->recvbuf;
return PG_ASYNC_READ_SUCCESS;
}
}
PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
/* If we aren't in non-blocking mode, switch to it. */
if (!ensure_nonblocking_status(conn, true))
return PG_ASYNC_WRITE_FAIL;
/*
* The docs for PQputcopyData list the return values as: 1 if the data was
* queued, 0 if it was not queued because of full buffers, or -1 if an
* error occurred
*/
result = PQputCopyData(conn->pg_conn, buf, size);
/*
* We won't get a result of zero because walproposer always empties the
* connection's buffers before sending more
*/
Assert(result != 0);
switch (result)
{
case 1:
/* good -- continue */
break;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQputCopyData", result);
}
/*
* After queueing the data, we still need to flush to get it to send. This
* might take multiple tries, but we don't want to wait around until it's
* done.
*
* PQflush has the following returns (directly quoting the docs): 0 if
* sucessful, 1 if it was unable to send all the data in the send queue
* yet -1 if it failed for some reason
*/
switch (result = PQflush(conn->pg_conn))
{
case 0:
return PG_ASYNC_WRITE_SUCCESS;
case 1:
return PG_ASYNC_WRITE_TRY_FLUSH;
case -1:
return PG_ASYNC_WRITE_FAIL;
default:
elog(FATAL, "invalid return %d from PQflush", result);
}
}
/*
* This function is very similar to walprop_async_write. For more
* information, refer to the comments there.
*/
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
/* If we are in non-blocking mode, switch out of it. */
if (!ensure_nonblocking_status(conn, false))
return false;
if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
return false;
Assert(result == 1);
/* Because the connection is non-blocking, flushing returns 0 or -1 */
if ((result = PQflush(conn->pg_conn)) == -1)
return false;
Assert(result == 0);
return true;
}

View File

@@ -18,6 +18,10 @@ extern char *neon_auth_token;
extern char *neon_timeline;
extern char *neon_tenant;
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
@@ -30,4 +34,10 @@ extern void pg_init_extension_server(void);
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
extern uint64 BackpressureThrottlingTime(void);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
#endif /* NEON_H */

116
pgxn/neon/neon_utils.c Normal file
View File

@@ -0,0 +1,116 @@
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}

12
pgxn/neon/neon_utils.h Normal file
View File

@@ -0,0 +1,12 @@
#ifndef __NEON_UTILS_H__
#define __NEON_UTILS_H__
#include "postgres.h"
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
#endif /* __NEON_UTILS_H__ */

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,8 @@
#ifndef __NEON_WALPROPOSER_H__
#define __NEON_WALPROPOSER_H__
#include "access/xlogdefs.h"
#include "postgres.h"
#include "access/xlogdefs.h"
#include "port.h"
#include "access/xlog_internal.h"
#include "access/transam.h"
@@ -16,29 +16,15 @@
#define MAX_SAFEKEEPERS 32
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single* WAL
* message */
#define XLOG_HDR_SIZE (1 + 8 * 3) /* 'w' + startPos + walEnd + timestamp */
#define XLOG_HDR_START_POS 1 /* offset of start position in wal sender*
* message header */
#define XLOG_HDR_END_POS (1 + 8) /* offset of end position in wal sender*
* message header */
/*
* In the spirit of WL_SOCKET_READABLE and others, this corresponds to no events having occurred,
* because all WL_* events are given flags equal to some (1 << i), starting from i = 0
*/
#define WL_NO_EVENTS 0
extern char *wal_acceptors_list;
extern int wal_acceptor_reconnect_timeout;
extern int wal_acceptor_connection_timeout;
extern bool am_wal_proposer;
struct WalProposerConn; /* Defined in libpqwalproposer */
struct WalProposerConn; /* Defined in implementation (walprop_pg.c) */
typedef struct WalProposerConn WalProposerConn;
struct WalMessage;
typedef struct WalMessage WalMessage;
/* Possible return values from ReadPGAsync */
typedef enum
{
@@ -52,7 +38,7 @@ typedef enum
PG_ASYNC_READ_TRY_AGAIN,
/* Reading failed. Check PQerrorMessage(conn) */
PG_ASYNC_READ_FAIL,
} PGAsyncReadResult;
} PGAsyncReadResult;
/* Possible return values from WritePGAsync */
typedef enum
@@ -71,7 +57,7 @@ typedef enum
PG_ASYNC_WRITE_TRY_FLUSH,
/* Writing failed. Check PQerrorMessage(conn) */
PG_ASYNC_WRITE_FAIL,
} PGAsyncWriteResult;
} PGAsyncWriteResult;
/*
* WAL safekeeper state, which is used to wait for some event.
@@ -147,7 +133,7 @@ typedef enum
* to read.
*/
SS_ACTIVE,
} SafekeeperState;
} SafekeeperState;
/* Consensus logical timestamp. */
typedef uint64 term_t;
@@ -171,12 +157,12 @@ typedef struct ProposerGreeting
uint8 tenant_id[16];
TimeLineID timeline;
uint32 walSegSize;
} ProposerGreeting;
} ProposerGreeting;
typedef struct AcceptorProposerMessage
{
uint64 tag;
} AcceptorProposerMessage;
} AcceptorProposerMessage;
/*
* Acceptor -> Proposer initial response: the highest term acceptor voted for.
@@ -186,7 +172,7 @@ typedef struct AcceptorGreeting
AcceptorProposerMessage apm;
term_t term;
NNodeId nodeId;
} AcceptorGreeting;
} AcceptorGreeting;
/*
* Proposer -> Acceptor vote request.
@@ -196,20 +182,20 @@ typedef struct VoteRequest
uint64 tag;
term_t term;
pg_uuid_t proposerId; /* for monitoring/debugging */
} VoteRequest;
} VoteRequest;
/* Element of term switching chain. */
typedef struct TermSwitchEntry
{
term_t term;
XLogRecPtr lsn;
} TermSwitchEntry;
} TermSwitchEntry;
typedef struct TermHistory
{
uint32 n_entries;
TermSwitchEntry *entries;
} TermHistory;
} TermHistory;
/* Vote itself, sent from safekeeper to proposer */
typedef struct VoteResponse
@@ -227,7 +213,7 @@ typedef struct VoteResponse
* recovery of some safekeeper */
TermHistory termHistory;
XLogRecPtr timelineStartLsn; /* timeline globally starts at this LSN */
} VoteResponse;
} VoteResponse;
/*
* Proposer -> Acceptor message announcing proposer is elected and communicating
@@ -243,7 +229,7 @@ typedef struct ProposerElected
TermHistory *termHistory;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
} ProposerElected;
} ProposerElected;
/*
* Header of request with WAL message sent from proposer to safekeeper.
@@ -268,7 +254,7 @@ typedef struct AppendRequestHeader
*/
XLogRecPtr truncateLsn;
pg_uuid_t proposerId; /* for monitoring/debugging */
} AppendRequestHeader;
} AppendRequestHeader;
/*
* Hot standby feedback received from replica
@@ -278,7 +264,7 @@ typedef struct HotStandbyFeedback
TimestampTz ts;
FullTransactionId xmin;
FullTransactionId catalog_xmin;
} HotStandbyFeedback;
} HotStandbyFeedback;
typedef struct PageserverFeedback
{
@@ -289,7 +275,7 @@ typedef struct PageserverFeedback
XLogRecPtr disk_consistent_lsn;
XLogRecPtr remote_consistent_lsn;
TimestampTz replytime;
} PageserverFeedback;
} PageserverFeedback;
typedef struct WalproposerShmemState
{
@@ -297,7 +283,7 @@ typedef struct WalproposerShmemState
PageserverFeedback feedback;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
} WalproposerShmemState;
} WalproposerShmemState;
/*
* Report safekeeper state to proposer
@@ -321,17 +307,22 @@ typedef struct AppendResponse
/* and custom neon feedback. */
/* This part of the message is extensible. */
PageserverFeedback rf;
} AppendResponse;
} AppendResponse;
/* PageserverFeedback is extensible part of the message that is parsed separately */
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
struct WalProposer;
typedef struct WalProposer WalProposer;
/*
* Descriptor of safekeeper
*/
typedef struct Safekeeper
{
WalProposer *wp;
char const *host;
char const *port;
@@ -340,7 +331,7 @@ typedef struct Safekeeper
*
* May contain private information like password and should not be logged.
*/
char conninfo[MAXCONNINFO];
char conninfo[MAXCONNINFO];
/*
* postgres protocol connection to the WAL acceptor
@@ -373,27 +364,12 @@ typedef struct Safekeeper
int eventPos; /* position in wait event set. Equal to -1 if*
* no event */
SafekeeperState state; /* safekeeper state machine state */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
TimestampTz latestMsgReceivedAt; /* when latest msg is received */
AcceptorGreeting greetResponse; /* acceptor greeting */
VoteResponse voteResponse; /* the vote */
AppendResponse appendResponse; /* feedback for master */
} Safekeeper;
extern void PGDLLEXPORT WalProposerSync(int argc, char *argv[]);
extern void PGDLLEXPORT WalProposerMain(Datum main_arg);
extern void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(void);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
extern void StartProposerReplication(StartReplicationCmd *cmd);
extern Size WalproposerShmemSize(void);
extern bool WalproposerShmemInit(void);
extern void replication_feedback_set(PageserverFeedback *rf);
extern void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
/* libpqwalproposer hooks & helper type */
/* Re-exported PostgresPollingStatusType */
typedef enum
{
@@ -406,7 +382,7 @@ typedef enum
* 'libpq-fe.h' still has PGRES_POLLING_ACTIVE, but says it's unused.
* We've removed it here to avoid clutter.
*/
} WalProposerConnectPollStatusType;
} WalProposerConnectPollStatusType;
/* Re-exported and modified ExecStatusType */
typedef enum
@@ -431,7 +407,7 @@ typedef enum
WP_EXEC_NEEDS_INPUT,
/* Catch-all failure. Check PQerrorMessage. */
WP_EXEC_FAILED,
} WalProposerExecStatusType;
} WalProposerExecStatusType;
/* Re-exported ConnStatusType */
typedef enum
@@ -445,67 +421,252 @@ typedef enum
* that extra functionality, so we collect them into a single tag here.
*/
WP_CONNECTION_IN_PROGRESS,
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
extern char *walprop_error_message(WalProposerConn *conn);
/* Re-exported PQstatus */
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);
/* Re-exported PQconnectStart */
extern WalProposerConn * walprop_connect_start(char *conninfo, char *password);
/* Re-exported PQconectPoll */
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);
/* Blocking wrapper around PQsendQuery */
extern bool walprop_send_query(WalProposerConn *conn, char *query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);
/* Re-exported PQsocket */
extern pgsocket walprop_socket(WalProposerConn *conn);
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
extern int walprop_flush(WalProposerConn *conn);
/* Re-exported PQfinish */
extern void walprop_finish(WalProposerConn *conn);
} WalProposerConnStatusType;
/*
* Ergonomic wrapper around PGgetCopyData
*
* Reads a CopyData block from a safekeeper, setting *amount to the number
* of bytes returned.
*
* This function is allowed to assume certain properties specific to the
* protocol with the safekeepers, so it should not be used as-is for any
* other purpose.
*
* Note: If possible, using <AsyncRead> is generally preferred, because it
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
* Collection of hooks for walproposer, to call postgres functions,
* read WAL and send it over the network.
*/
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);
typedef struct walproposer_api
{
/*
* Get WalproposerShmemState. This is used to store information about last
* elected term.
*/
WalproposerShmemState *(*get_shmem_state) (void);
/*
* Start receiving notifications about new WAL. This is an infinite loop
* which calls WalProposerBroadcast() and WalProposerPoll() to send the
* WAL.
*/
void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos);
/* Get pointer to the latest available WAL. */
XLogRecPtr (*get_flush_rec_ptr) (void);
/* Get current time. */
TimestampTz (*get_current_timestamp) (void);
/* Get postgres timeline. */
TimeLineID (*get_timeline_id) (void);
/* Current error message, aka PQerrorMessage. */
char *(*conn_error_message) (WalProposerConn *conn);
/* Connection status, aka PQstatus. */
WalProposerConnStatusType (*conn_status) (WalProposerConn *conn);
/* Start the connection, aka PQconnectStart. */
WalProposerConn *(*conn_connect_start) (char *conninfo);
/* Poll an asynchronous connection, aka PQconnectPoll. */
WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn *conn);
/* Send a blocking SQL query, aka PQsendQuery. */
bool (*conn_send_query) (WalProposerConn *conn, char *query);
/* Read the query result, aka PQgetResult. */
WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn *conn);
/* Flush buffer to the network, aka PQflush. */
int (*conn_flush) (WalProposerConn *conn);
/* Close the connection, aka PQfinish. */
void (*conn_finish) (WalProposerConn *conn);
/* Try to read CopyData message, aka PQgetCopyData. */
PGAsyncReadResult (*conn_async_read) (WalProposerConn *conn, char **buf, int *amount);
/* Try to write CopyData message, aka PQputCopyData. */
PGAsyncWriteResult (*conn_async_write) (WalProposerConn *conn, void const *buf, size_t size);
/* Blocking CopyData write, aka PQputCopyData + PQflush. */
bool (*conn_blocking_write) (WalProposerConn *conn, void const *buf, size_t size);
/* Download WAL from startpos to endpos and make it available locally. */
bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos);
/* Read WAL from disk to buf. */
void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count);
/* Allocate WAL reader. */
XLogReaderState *(*wal_reader_allocate) (void);
/* Deallocate event set. */
void (*free_event_set) (void);
/* Initialize event set. */
void (*init_event_set) (int n_safekeepers);
/* Update events for an existing safekeeper connection. */
void (*update_event_set) (Safekeeper *sk, uint32 events);
/* Add a new safekeeper connection to the event set. */
void (*add_safekeeper_event_set) (Safekeeper *sk, uint32 events);
/*
* Wait until some event happens: - timeout is reached - socket event for
* safekeeper connection - new WAL is available
*
* Returns 0 if timeout is reached, 1 if some event happened. Updates
* events mask to indicate events and sets sk to the safekeeper which has
* an event.
*/
int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events);
/* Read random bytes. */
bool (*strong_random) (void *buf, size_t len);
/*
* Get a basebackup LSN. Used to cross-validate with the latest available
* LSN on the safekeepers.
*/
XLogRecPtr (*get_redo_start_lsn) (void);
/*
* Finish sync safekeepers with the given LSN. This function should not
* return and should exit the program.
*/
void (*finish_sync_safekeepers) (XLogRecPtr lsn);
/*
* Called after every new message from the safekeeper. Used to propagate
* backpressure feedback and to confirm WAL persistence (has been commited
* on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
/*
* Called on peer_horizon_lsn updates. Used to advance replication slot
* and to free up disk space by deleting unnecessary WAL.
*/
void (*confirm_wal_streamed) (XLogRecPtr lsn);
} walproposer_api;
/*
* Ergonomic wrapper around PQputCopyData + PQflush
*
* Starts to write a CopyData block to a safekeeper.
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
* Configuration of the WAL proposer.
*/
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);
typedef struct WalProposerConfig
{
/* hex-encoded TenantId cstr */
char *neon_tenant;
/* hex-encoded TimelineId cstr */
char *neon_timeline;
/*
* Comma-separated list of safekeepers, in the following format:
* host1:port1,host2:port2,host3:port3
*
* This cstr should be editable.
*/
char *safekeepers_list;
/*
* WalProposer reconnects to offline safekeepers once in this interval.
* Time is in milliseconds.
*/
int safekeeper_reconnect_timeout;
/*
* WalProposer terminates the connection if it doesn't receive any message
* from the safekeeper in this interval. Time is in milliseconds.
*/
int safekeeper_connection_timeout;
/*
* WAL segment size. Will be passed to safekeepers in greet request. Also
* used to detect page headers.
*/
int wal_segment_size;
/*
* If safekeeper was started in sync mode, walproposer will not subscribe
* for new WAL and will exit when quorum of safekeepers will be synced to
* the latest available LSN.
*/
bool syncSafekeepers;
/* Will be passed to safekeepers in greet request. */
uint64 systemId;
} WalProposerConfig;
/*
* Blocking equivalent to walprop_async_write_fn
*
* Returns 'true' if successful, 'false' on failure.
* WAL proposer state.
*/
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);
typedef struct WalProposer
{
WalProposerConfig *config;
int n_safekeepers;
extern uint64 BackpressureThrottlingTime(void);
/* (n_safekeepers / 2) + 1 */
int quorum;
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;
/* last commitLsn broadcasted to safekeepers */
XLogRecPtr lastSentCommitLsn;
ProposerGreeting greetRequest;
/* Vote request for safekeeper */
VoteRequest voteRequest;
/*
* Minimal LSN which may be needed for recovery of some safekeeper,
* record-aligned (first record which might not yet received by someone).
*/
XLogRecPtr truncateLsn;
/*
* Term of the proposer. We want our term to be highest and unique, so we
* collect terms from safekeepers quorum, choose max and +1. After that
* our term is fixed and must not change. If we observe that some
* safekeeper has higher term, it means that we have another running
* compute, so we must stop immediately.
*/
term_t propTerm;
/* term history of the proposer */
TermHistory propTermHistory;
/* epoch start lsn of the proposer */
XLogRecPtr propEpochStartLsn;
/* Most advanced acceptor epoch */
term_t donorEpoch;
/* Most advanced acceptor */
int donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;
/* number of votes collected from safekeepers */
int n_votes;
/* number of successful connections over the lifetime of walproposer */
int n_connected;
/*
* Timestamp of the last reconnection attempt. Related to
* config->safekeeper_reconnect_timeout
*/
TimestampTz last_reconnect_attempt;
walproposer_api api;
} WalProposer;
extern WalProposer *WalProposerCreate(WalProposerConfig *config, walproposer_api api);
extern void WalProposerStart(WalProposer *wp);
extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos);
extern void WalProposerPoll(WalProposer *wp);
extern void ParsePageserverFeedbackMessage(StringInfo reply_message,
PageserverFeedback *rf);
#endif /* __NEON_WALPROPOSER_H__ */

1667
pgxn/neon/walproposer_pg.c Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,659 +0,0 @@
#include "postgres.h"
#include "access/timeline.h"
#include "access/xlogutils.h"
#include "common/logging.h"
#include "common/ip.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "postmaster/interrupt.h"
#include "replication/slot.h"
#include "walproposer_utils.h"
#include "replication/walsender_private.h"
#include "storage/ipc.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"
#include "libpq-fe.h"
#include <netinet/tcp.h>
#include <unistd.h>
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
#include "access/xlogrecovery.h"
#endif
#if PG_MAJORVERSION_NUM >= 16
#include "utils/guc.h"
#endif
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID
* corresponding the filename of walpropFile.
*/
static int walpropFile = -1;
static TimeLineID walpropFileTLI = 0;
static XLogSegNo walpropSegNo = 0;
/* START cloned file-local variables and functions from walsender.c */
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static void WalSndLoop(void);
static void XLogBroadcastWalProposer(void);
/* END cloned file-level variables and functions from walsender.c */
int
CompareLsn(const void *a, const void *b)
{
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
if (lsn1 < lsn2)
return -1;
else if (lsn1 == lsn2)
return 0;
else
return 1;
}
/* Returns a human-readable string corresonding to the SafekeeperState
*
* The string should not be freed.
*
* The strings are intended to be used as a prefix to "state", e.g.:
*
* elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state));
*
* If this sort of phrasing doesn't fit the message, instead use something like:
*
* elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state));
*/
char *
FormatSafekeeperState(SafekeeperState state)
{
char *return_val = NULL;
switch (state)
{
case SS_OFFLINE:
return_val = "offline";
break;
case SS_CONNECTING_READ:
case SS_CONNECTING_WRITE:
return_val = "connecting";
break;
case SS_WAIT_EXEC_RESULT:
return_val = "receiving query result";
break;
case SS_HANDSHAKE_RECV:
return_val = "handshake (receiving)";
break;
case SS_VOTING:
return_val = "voting";
break;
case SS_WAIT_VERDICT:
return_val = "wait-for-verdict";
break;
case SS_SEND_ELECTED_FLUSH:
return_val = "send-announcement-flush";
break;
case SS_IDLE:
return_val = "idle";
break;
case SS_ACTIVE:
return_val = "active";
break;
}
Assert(return_val != NULL);
return return_val;
}
/* Asserts that the provided events are expected for given safekeeper's state */
void
AssertEventsOkForState(uint32 events, Safekeeper *sk)
{
uint32 expected = SafekeeperStateDesiredEvents(sk->state);
/*
* The events are in-line with what we're expecting, under two conditions:
* (a) if we aren't expecting anything, `events` has no read- or
* write-ready component. (b) if we are expecting something, there's
* overlap (i.e. `events & expected != 0`)
*/
bool events_ok_for_state; /* long name so the `Assert` is more
* clear later */
if (expected == WL_NO_EVENTS)
events_ok_for_state = ((events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) == 0);
else
events_ok_for_state = ((events & expected) != 0);
if (!events_ok_for_state)
{
/*
* To give a descriptive message in the case of failure, we use elog
* and then an assertion that's guaranteed to fail.
*/
elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]",
FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state));
Assert(events_ok_for_state);
}
}
/* Returns the set of events a safekeeper in this state should be waiting on
*
* This will return WL_NO_EVENTS (= 0) for some events. */
uint32
SafekeeperStateDesiredEvents(SafekeeperState state)
{
uint32 result = WL_NO_EVENTS;
/* If the state doesn't have a modifier, we can check the base state */
switch (state)
{
/* Connecting states say what they want in the name */
case SS_CONNECTING_READ:
result = WL_SOCKET_READABLE;
break;
case SS_CONNECTING_WRITE:
result = WL_SOCKET_WRITEABLE;
break;
/* Reading states need the socket to be read-ready to continue */
case SS_WAIT_EXEC_RESULT:
case SS_HANDSHAKE_RECV:
case SS_WAIT_VERDICT:
result = WL_SOCKET_READABLE;
break;
/*
* Idle states use read-readiness as a sign that the connection
* has been disconnected.
*/
case SS_VOTING:
case SS_IDLE:
result = WL_SOCKET_READABLE;
break;
/*
* Flush states require write-ready for flushing. Active state
* does both reading and writing.
*
* TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We
* should check sk->flushWrite here to set WL_SOCKET_WRITEABLE.
*/
case SS_SEND_ELECTED_FLUSH:
case SS_ACTIVE:
result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE;
break;
/* The offline state expects no events. */
case SS_OFFLINE:
result = WL_NO_EVENTS;
break;
default:
Assert(false);
break;
}
return result;
}
/* Returns a human-readable string corresponding to the event set
*
* If the events do not correspond to something set as the `events` field of a `WaitEvent`, the
* returned string may be meaingless.
*
* The string should not be freed. It should also not be expected to remain the same between
* function calls. */
char *
FormatEvents(uint32 events)
{
static char return_str[8];
/* Helper variable to check if there's extra bits */
uint32 all_flags = WL_LATCH_SET
| WL_SOCKET_READABLE
| WL_SOCKET_WRITEABLE
| WL_TIMEOUT
| WL_POSTMASTER_DEATH
| WL_EXIT_ON_PM_DEATH
| WL_SOCKET_CONNECTED;
/*
* The formatting here isn't supposed to be *particularly* useful -- it's
* just to give an sense of what events have been triggered without
* needing to remember your powers of two.
*/
return_str[0] = (events & WL_LATCH_SET) ? 'L' : '_';
return_str[1] = (events & WL_SOCKET_READABLE) ? 'R' : '_';
return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_';
return_str[3] = (events & WL_TIMEOUT) ? 'T' : '_';
return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_';
return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_';
return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_';
if (events & (~all_flags))
{
elog(WARNING, "Event formatting found unexpected component %d",
events & (~all_flags));
return_str[6] = '*';
return_str[7] = '\0';
}
else
return_str[6] = '\0';
return (char *) &return_str;
}
/*
* Convert a character which represents a hexadecimal digit to an integer.
*
* Returns -1 if the character is not a hexadecimal digit.
*/
static int
HexDecodeChar(char c)
{
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
return -1;
}
/*
* Decode a hex string into a byte string, 2 hex chars per byte.
*
* Returns false if invalid characters are encountered; otherwise true.
*/
bool
HexDecodeString(uint8 *result, char *input, int nbytes)
{
int i;
for (i = 0; i < nbytes; ++i)
{
int n1 = HexDecodeChar(input[i * 2]);
int n2 = HexDecodeChar(input[i * 2 + 1]);
if (n1 < 0 || n2 < 0)
return false;
result[i] = n1 * 16 + n2;
}
return true;
}
/* --------------------------------
* pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint32
pq_getmsgint32_le(StringInfo msg)
{
uint32 n32;
pq_copymsgbytes(msg, (char *) &n32, sizeof(n32));
return n32;
}
/* --------------------------------
* pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order
* --------------------------------
*/
uint64
pq_getmsgint64_le(StringInfo msg)
{
uint64 n64;
pq_copymsgbytes(msg, (char *) &n64, sizeof(n64));
return n64;
}
/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */
void
pq_sendint32_le(StringInfo buf, uint32 i)
{
enlargeStringInfo(buf, sizeof(uint32));
memcpy(buf->data + buf->len, &i, sizeof(uint32));
buf->len += sizeof(uint32);
}
/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */
void
pq_sendint64_le(StringInfo buf, uint64 i)
{
enlargeStringInfo(buf, sizeof(uint64));
memcpy(buf->data + buf->len, &i, sizeof(uint64));
buf->len += sizeof(uint64);
}
/*
* Write XLOG data to disk.
*/
void
XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int startoff;
int byteswritten;
while (nbytes > 0)
{
int segbytes;
/* Close the current segment if it's completed */
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
XLogWalPropClose(recptr);
if (walpropFile < 0)
{
#if PG_VERSION_NUM >= 150000
/* FIXME Is it ok to use hardcoded value here? */
TimeLineID tli = 1;
#else
bool use_existent = true;
#endif
/* Create/use new log file */
XLByteToSeg(recptr, walpropSegNo, wal_segment_size);
#if PG_VERSION_NUM >= 150000
walpropFile = XLogFileInit(walpropSegNo, tli);
walpropFileTLI = tli;
#else
walpropFile = XLogFileInit(walpropSegNo, &use_existent, false);
walpropFileTLI = ThisTimeLineID;
#endif
}
/* Calculate the start offset of the received logs */
startoff = XLogSegmentOffset(recptr, wal_segment_size);
if (startoff + nbytes > wal_segment_size)
segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
/* OK to write the logs */
errno = 0;
byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff);
if (byteswritten <= 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno;
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
save_errno = errno;
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
xlogfname, startoff, (unsigned long) segbytes)));
}
/* Update state for write */
recptr += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
}
/*
* Close the current segment if it's fully written up in the last cycle of
* the loop.
*/
if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size))
{
XLogWalPropClose(recptr);
}
}
/*
* Close the current segment.
*/
void
XLogWalPropClose(XLogRecPtr recptr)
{
Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size));
if (close(walpropFile) != 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size);
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
xlogfname)));
}
walpropFile = -1;
}
/* START of cloned functions from walsender.c */
/*
* Subscribe for new WAL and stream it in the loop to safekeepers.
*
* At the moment, this never returns, but an ereport(ERROR) will take us back
* to the main loop.
*/
void
StartProposerReplication(StartReplicationCmd *cmd)
{
XLogRecPtr FlushPtr;
TimeLineID currTLI;
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
/*
* We assume here that we're logging enough information in the WAL for
* log-shipping, since this is checked in PostmasterMain().
*
* NOTE: wal_level can only change at shutdown, so in most cases it is
* difficult for there to be WAL data that we can still see that was
* written at wal_level='minimal'.
*/
if (cmd->slotname)
{
ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use a logical replication slot for physical replication")));
/*
* We don't need to verify the slot's restart_lsn here; instead we
* rely on the caller requesting the starting point to use. If the
* WAL segment doesn't exist, we'll fail later.
*/
}
/*
* Select the timeline. If it was given explicitly by the client, use
* that. Otherwise use the timeline of the last replayed record, which is
* kept in ThisTimeLineID.
*
* Neon doesn't currently use PG Timelines, but it may in the future, so
* we keep this code around to lighten the load for when we need it.
*/
#if PG_VERSION_NUM >= 150000
FlushPtr = GetFlushRecPtr(&currTLI);
#else
FlushPtr = GetFlushRecPtr();
currTLI = ThisTimeLineID;
#endif
/*
* When we first start replication the standby will be behind the
* primary. For some applications, for example synchronous
* replication, it is important to have a clear state for this initial
* catchup mode, so we can trigger actions when we change streaming
* state later. We may stay in this state for a long time, which is
* exactly why we want to be able to monitor whether or not we are
* still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
/*
* Don't allow a request to stream from a future point in WAL that
* hasn't been flushed to disk in this server yet.
*/
if (FlushPtr < cmd->startpoint)
{
ereport(ERROR,
(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
LSN_FORMAT_ARGS(cmd->startpoint),
LSN_FORMAT_ARGS(FlushPtr))));
}
/* Start streaming from the requested point */
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sentPtr = sentPtr;
SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
/* Infinite send loop, never returns */
WalSndLoop();
WalSndSetState(WALSNDSTATE_STARTUP);
if (cmd->slotname)
ReplicationSlotRelease();
}
/*
* Main loop that waits for LSN updates and calls the walproposer.
* Synchronous replication sets latch in WalSndWakeup at walsender.c
*/
static void
WalSndLoop(void)
{
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
for (;;)
{
CHECK_FOR_INTERRUPTS();
XLogBroadcastWalProposer();
if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll();
}
}
/*
* Notify walproposer about the new WAL position.
*/
static void
XLogBroadcastWalProposer(void)
{
XLogRecPtr startptr;
XLogRecPtr endptr;
/* Start from the last sent position */
startptr = sentPtr;
/*
* Streaming the current timeline on a primary.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of WALRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* primary: if the primary subsequently crashes and restarts, standbys
* must not have applied any WAL that got lost on the primary.
*/
#if PG_VERSION_NUM >= 150000
endptr = GetFlushRecPtr(NULL);
#else
endptr = GetFlushRecPtr();
#endif
/*
* Record the current system time as an approximation of the time at which
* this WAL location was written for the purposes of lag tracking.
*
* In theory we could make XLogFlush() record a time in shmem whenever WAL
* is flushed and we could get that time as well as the LSN when we call
* GetFlushRecPtr() above (and likewise for the cascading standby
* equivalent), but rather than putting any new code into the hot WAL path
* it seems good enough to capture the time here. We should reach this
* after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
* may take some time, we read the WAL flush pointer and take the time
* very close to together here so that we'll get a later position if it is
* still moving.
*
* Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
* this gives us a cheap approximation for the WAL flush time for this
* LSN.
*
* Note that the LSN is not necessarily the LSN for the data contained in
* the present message; it's the end of the WAL, which might be further
* ahead. All the lag tracking machinery cares about is finding out when
* that arbitrary LSN is eventually reported as written, flushed and
* applied, so that it can measure the elapsed time.
*/
LagTrackerWrite(endptr, GetCurrentTimestamp());
/* Do we have any work to do? */
Assert(startptr <= endptr);
if (endptr <= startptr)
return;
WalProposerBroadcast(startptr, endptr);
sentPtr = endptr;
/* Update shared memory status */
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->sentPtr = sentPtr;
SpinLockRelease(&walsnd->mutex);
}
/* Report progress of XLOG streaming in PS display */
if (update_process_title)
{
char activitymsg[50];
snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
LSN_FORMAT_ARGS(sentPtr));
set_ps_display(activitymsg);
}
}

View File

@@ -1,19 +0,0 @@
#ifndef __NEON_WALPROPOSER_UTILS_H__
#define __NEON_WALPROPOSER_UTILS_H__
#include "walproposer.h"
int CompareLsn(const void *a, const void *b);
char *FormatSafekeeperState(SafekeeperState state);
void AssertEventsOkForState(uint32 events, Safekeeper *sk);
uint32 SafekeeperStateDesiredEvents(SafekeeperState state);
char *FormatEvents(uint32 events);
bool HexDecodeString(uint8 *result, char *input, int nbytes);
uint32 pq_getmsgint32_le(StringInfo msg);
uint64 pq_getmsgint64_le(StringInfo msg);
void pq_sendint32_le(StringInfo buf, uint32 i);
void pq_sendint64_le(StringInfo buf, uint64 i);
void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr);
void XLogWalPropClose(XLogRecPtr recptr);
#endif /* __NEON_WALPROPOSER_UTILS_H__ */

View File

@@ -100,7 +100,6 @@ pub struct SafekeeperData {
pub availability_zone_id: String,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ProjectData {
pub id: ProjectId,
@@ -109,7 +108,6 @@ pub struct ProjectData {
pub platform_id: String,
pub user_id: String,
pub pageserver_id: u64,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub tenant: TenantId,
pub safekeepers: Vec<SafekeeperData>,
pub deleted: bool,
@@ -126,7 +124,6 @@ pub struct ProjectData {
pub maintenance_set: Option<String>,
}
#[serde_with::serde_as]
#[derive(Debug, serde::Deserialize)]
pub struct BranchData {
pub id: BranchId,
@@ -134,12 +131,10 @@ pub struct BranchData {
pub updated_at: DateTime<Utc>,
pub name: String,
pub project_id: ProjectId,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde(default)]
pub parent_id: Option<BranchId>,
#[serde(default)]
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
pub parent_lsn: Option<Lsn>,
pub default: bool,
pub deleted: bool,

View File

@@ -13,7 +13,6 @@ use postgres_ffi::XLogSegNo;
use serde::Deserialize;
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
use utils::id::{TenantId, TimelineId};
@@ -74,12 +73,9 @@ pub struct Config {
pub wal_backup_enabled: bool,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct Timeline {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub control_file: Option<SafeKeeperState>,
pub memory: Option<Memory>,

View File

@@ -4,7 +4,6 @@ use once_cell::sync::Lazy;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::SkTimelineInfo;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::str::FromStr;
@@ -62,11 +61,9 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
/// Same as TermSwitchEntry, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
pub term: Term,
#[serde_as(as = "DisplayFromStr")]
pub lsn: Lsn,
}
@@ -79,28 +76,18 @@ pub struct AcceptorStateStatus {
}
/// Info about timeline on safekeeper ready for reporting.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineStatus {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub acceptor_state: AcceptorStateStatus,
pub pg_info: ServerInfo,
#[serde_as(as = "DisplayFromStr")]
pub flush_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub timeline_start_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub local_start_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub backup_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub peer_horizon_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
pub walsenders: Vec<WalSenderState>,

View File

@@ -44,8 +44,11 @@ pub struct AppendLogicalMessage {
// fields from AppendRequestHeader
pub term: Term,
#[serde(with = "utils::lsn::serde_as_u64")]
pub epoch_start_lsn: Lsn,
#[serde(with = "utils::lsn::serde_as_u64")]
pub begin_lsn: Lsn,
#[serde(with = "utils::lsn::serde_as_u64")]
pub truncate_lsn: Lsn,
pub pg_version: u32,
}

View File

@@ -5,8 +5,6 @@ use tokio::io::AsyncWriteExt;
use tracing::info;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use serde_with::{serde_as, DisplayFromStr};
use crate::{
control_file, debug_dump,
http::routes::TimelineStatus,
@@ -15,12 +13,9 @@ use crate::{
};
/// Info about timeline on safekeeper ready for reporting.
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct Request {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub http_hosts: Vec<String>,
}

View File

@@ -4,6 +4,7 @@ use anyhow::{bail, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use hex::FromHex;
use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
use std::cmp::max;
@@ -281,6 +282,7 @@ impl SafeKeeperState {
/// Initial Proposer -> Acceptor message
#[derive(Debug, Deserialize)]
#[cfg_attr(test, derive(PartialEq, Serialize))]
pub struct ProposerGreeting {
/// proposer-acceptor protocol version
pub protocol_version: u32,
@@ -294,6 +296,46 @@ pub struct ProposerGreeting {
pub wal_seg_size: u32,
}
static EXAMPLE_PROPOSER_GREETING: &[u8] =
b"\x02\0\0\0\0q\x02\0\x80\xca+\x0e\xe8\x9e{\x94:b\xab\xe4\0\x1exo\0\0\0\0\0\0\0\0\xfa!\xa3\xc3\xa5s\x8d\xcc^\xd4\x1f\x8cA\x81\xb2\x13\x99\xcf:z& t\x82y\xbf\xee\x8aX\xad\r\xe1\x01\0\0\0\0\0\0\x01";
#[test]
fn serde_proposergreeting() {
let pg = ProposerGreeting::des(EXAMPLE_PROPOSER_GREETING).unwrap();
assert_eq!(
pg,
ProposerGreeting {
protocol_version: 2,
pg_version: 160000,
proposer_id: [128, 202, 43, 14, 232, 158, 123, 148, 58, 98, 171, 228, 0, 30, 120, 111],
system_id: 0,
timeline_id: TimelineId::from_hex("fa21a3c3a5738dcc5ed41f8c4181b213").unwrap(),
tenant_id: TenantId::from_hex("99cf3a7a2620748279bfee8a58ad0de1").unwrap(),
tli: 1,
wal_seg_size: 16777216
}
);
}
#[test]
fn ser_proposergreeting() {
let pg = ProposerGreeting {
protocol_version: 2,
pg_version: 160000,
proposer_id: [
128, 202, 43, 14, 232, 158, 123, 148, 58, 98, 171, 228, 0, 30, 120, 111,
],
system_id: 0,
timeline_id: TimelineId::from_hex("fa21a3c3a5738dcc5ed41f8c4181b213").unwrap(),
tenant_id: TenantId::from_hex("99cf3a7a2620748279bfee8a58ad0de1").unwrap(),
tli: 1,
wal_seg_size: 16777216,
};
assert_eq!(&pg.ser().unwrap(), EXAMPLE_PROPOSER_GREETING);
}
/// Acceptor -> Proposer initial response: the highest term known to me
/// (acceptor voted for).
#[derive(Debug, Serialize)]
@@ -402,12 +444,14 @@ impl ProposerAcceptorMessage {
/// Parse proposer message.
pub fn parse(msg_bytes: Bytes) -> Result<ProposerAcceptorMessage> {
// xxx using Reader is inefficient but easy to work with bincode
let raw = msg_bytes.clone();
let mut stream = msg_bytes.reader();
// u64 is here to avoid padding; it will be removed once we stop packing C structs into the wire as is
let tag = stream.read_u64::<LittleEndian>()? as u8 as char;
match tag {
'g' => {
let msg = ProposerGreeting::des_from(&mut stream)?;
tracing::info!("greeting in {raw:?}");
let msg = dbg!(ProposerGreeting::des_from(&mut stream))?;
Ok(ProposerAcceptorMessage::Greeting(msg))
}
'v' => {

View File

@@ -16,7 +16,6 @@ use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::id::TenantTimelineId;
use utils::lsn::AtomicLsn;
@@ -313,10 +312,8 @@ impl WalSendersShared {
}
// Serialized is used only for pretty printing in json.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalSenderState {
#[serde_as(as = "DisplayFromStr")]
ttid: TenantTimelineId,
addr: SocketAddr,
conn_id: ConnectionId,

View File

@@ -5,10 +5,8 @@ use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use tokio::fs;
use serde_with::DisplayFromStr;
use std::cmp::max;
use std::sync::Arc;
use tokio::sync::{Mutex, MutexGuard};
@@ -41,20 +39,16 @@ use crate::SafeKeeperConf;
use crate::{debug_dump, wal_storage};
/// Things safekeeper should know about timeline state on peers.
#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub sk_id: NodeId,
/// Term of the last entry.
_last_log_term: Term,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
_flush_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL. TODO: remove this once we fill new
/// sk since backup_lsn.
#[serde_as(as = "DisplayFromStr")]
pub local_start_lsn: Lsn,
/// When info was received. Serde annotations are not very useful but make
/// the code compile -- we don't rely on this field externally.
@@ -723,9 +717,9 @@ impl Timeline {
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(()); // nothing to do
}
let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1);
// release the lock before removing
remover
shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
};
// delete old WAL files

View File

@@ -1085,15 +1085,32 @@ class AbstractNeonCli(abc.ABC):
stderr=subprocess.PIPE,
timeout=timeout,
)
indent = " "
if not res.returncode:
log.info(f"Run {res.args} success: {res.stdout}")
stripped = res.stdout.strip()
lines = stripped.splitlines()
if len(lines) < 2:
log.debug(f"Run {res.args} success: {stripped}")
else:
log.debug("Run %s success:\n%s" % (res.args, textwrap.indent(stripped, indent)))
elif check_return_code:
# this way command output will be in recorded and shown in CI in failure message
msg = f"""\
Run {res.args} failed:
stdout: {res.stdout}
stderr: {res.stderr}
indent = indent * 2
msg = textwrap.dedent(
"""\
Run %s failed:
stdout:
%s
stderr:
%s
"""
)
msg = msg % (
res.args,
textwrap.indent(res.stdout.strip(), indent),
textwrap.indent(res.stderr.strip(), indent),
)
log.info(msg)
raise RuntimeError(msg) from subprocess.CalledProcessError(
res.returncode, res.args, res.stdout, res.stderr

View File

@@ -3,7 +3,10 @@ import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.pageserver.utils import (
wait_for_upload_queue_empty,
wait_until_tenant_active,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from requests.exceptions import ConnectionError
@@ -113,6 +116,8 @@ def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin)
time.sleep(1)
env.pageserver.start()
wait_until_tenant_active(pageserver_http, tenant_id)
message = f".*duplicated L1 layer layer={l1_found.name}"
env.pageserver.allowed_errors.append(message)

View File

@@ -81,7 +81,7 @@ def generate_uploads_and_deletions(
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 20000) g
FROM generate_series(1, 200) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
@@ -116,6 +116,10 @@ def get_deletion_queue_submitted(ps_http) -> int:
return get_metric_or_0(ps_http, "pageserver_deletion_queue_submitted_total")
def get_deletion_queue_validated(ps_http) -> int:
return get_metric_or_0(ps_http, "pageserver_deletion_queue_validated_total")
def get_deletion_queue_dropped(ps_http) -> int:
return get_metric_or_0(ps_http, "pageserver_deletion_queue_dropped_total")
@@ -273,12 +277,15 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("keep_attachment", [True, False])
@pytest.mark.parametrize("validate_before", [True, False])
def test_deletion_queue_recovery(
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, keep_attachment: bool, validate_before: bool
):
"""
:param keep_attachment: If true, we re-attach after restart. Else, we act as if some other
node took the attachment while we were restarting.
:param validate_before: If true, we wait for deletions to be validated before restart. This
makes them elegible to be executed after restart, if the same node keeps the attachment.
"""
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(
@@ -288,12 +295,20 @@ def test_deletion_queue_recovery(
ps_http = env.pageserver.http_client()
# Prevent deletion lists from being executed, to build up some backlog of deletions
ps_http.configure_failpoints(
[
("deletion-queue-before-execute", "return"),
]
)
failpoints = [
# Prevent deletion lists from being executed, to build up some backlog of deletions
("deletion-queue-before-execute", "return"),
]
if not validate_before:
failpoints.append(
# Prevent deletion lists from being validated, we will test that they are
# dropped properly during recovery. 'pause' is okay here because we kill
# the pageserver with immediate=true
("control-plane-client-validate", "pause")
)
ps_http.configure_failpoints(failpoints)
generate_uploads_and_deletions(env)
@@ -305,6 +320,16 @@ def test_deletion_queue_recovery(
assert get_deletion_queue_unexpected_errors(ps_http) == 0
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
if validate_before:
def assert_validation_complete():
assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)
wait_until(20, 1, assert_validation_complete)
# A short wait to let the DeletionHeader get written out, as this happens after
# the validated count gets incremented.
time.sleep(1)
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
env.pageserver.stop(immediate=True)
@@ -327,14 +352,17 @@ def test_deletion_queue_recovery(
ps_http.deletion_queue_flush(execute=True)
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
if keep_attachment:
# If we kept the attachment, then our pre-restart deletions should have executed
# successfully
if keep_attachment or validate_before:
# - If we kept the attachment, then our pre-restart deletions should execute
# because on re-attach they were from the immediately preceding generation
# - If we validated before restart, then the deletions should execute because the
# deletion queue header records a validated deletion list sequence number.
assert get_deletion_queue_executed(ps_http) == before_restart_depth
else:
env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
# If we lost the attachment, we should have dropped our pre-restart deletions.
assert get_deletion_queue_dropped(ps_http) == before_restart_depth
env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
assert get_deletion_queue_unexpected_errors(ps_http) == 0
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
@@ -350,3 +378,73 @@ def test_deletion_queue_recovery(
assert get_deletion_queue_unexpected_errors(ps_http) == 0
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.enable_generations = True
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
ps_http = env.pageserver.http_client()
generate_uploads_and_deletions(env)
env.pageserver.allowed_errors.extend(
[
# When the pageserver can't reach the control plane, it will complain
".*calling control plane generation validation API failed.*",
# Emergency mode is a big deal, we log errors whenever it is used.
".*Emergency mode!.*",
]
)
# Simulate a major incident: the control plane goes offline
assert env.attachment_service is not None
env.attachment_service.stop()
# Remember how many validations had happened before the control plane went offline
validated = get_deletion_queue_validated(ps_http)
generate_uploads_and_deletions(env, init=False)
# The running pageserver should stop progressing deletions
time.sleep(10)
assert get_deletion_queue_validated(ps_http) == validated
# Restart the pageserver: ordinarily we would _avoid_ doing this during such an
# incident, but it might be unavoidable: if so, we want to be able to start up
# and serve clients.
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
env.pageserver.start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",)
)
# The pageserver should provide service to clients
generate_uploads_and_deletions(env, init=False)
# The pageserver should neither validate nor execute any deletions, it should have
# loaded the DeletionLists from before though
time.sleep(10)
assert get_deletion_queue_depth(ps_http) > 0
assert get_deletion_queue_validated(ps_http) == 0
assert get_deletion_queue_executed(ps_http) == 0
# When the control plane comes back up, normal service should resume
env.attachment_service.start()
ps_http.deletion_queue_flush(execute=True)
assert get_deletion_queue_depth(ps_http) == 0
assert get_deletion_queue_validated(ps_http) > 0
assert get_deletion_queue_executed(ps_http) > 0
# The pageserver should work fine when subsequently restarted in non-emergency mode
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
env.pageserver.start()
generate_uploads_and_deletions(env, init=False)
ps_http.deletion_queue_flush(execute=True)
assert get_deletion_queue_depth(ps_http) == 0
assert get_deletion_queue_validated(ps_http) > 0
assert get_deletion_queue_executed(ps_http) > 0

View File

@@ -45,14 +45,11 @@ def test_tenant_delete_smoke(
[
# The deletion queue will complain when it encounters simulated S3 errors
".*deletion executor: DeleteObjects request failed.*",
# lucky race with stopping from flushing a layer we fail to schedule any uploads
".*layer flush task.+: could not flush frozen layer: update_metadata_file",
]
)
# lucky race with stopping from flushing a layer we fail to schedule any uploads
env.pageserver.allowed_errors.append(
".*layer flush task.+: could not flush frozen layer: update_metadata_file"
)
ps_http = env.pageserver.http_client()
# first try to delete non existing tenant
@@ -194,11 +191,9 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
)
if simulate_failures:
env.pageserver.allowed_errors.extend(
[
# The deletion queue will complain when it encounters simulated S3 errors
".*deletion executor: DeleteObjects request failed.*",
]
env.pageserver.allowed_errors.append(
# The deletion queue will complain when it encounters simulated S3 errors
".*deletion executor: DeleteObjects request failed.*",
)
ps_http = env.pageserver.http_client()
@@ -293,6 +288,10 @@ def test_tenant_delete_is_resumed_on_attach(
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
env.pageserver.allowed_errors.append(
# lucky race with stopping from flushing a layer we fail to schedule any uploads
".*layer flush task.+: could not flush frozen layer: update_metadata_file"
)
tenant_id = env.initial_tenant