Compare commits

...

25 Commits

Author SHA1 Message Date
Alex Chi Z
e76b2061bd fix comments
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-08 10:50:30 -04:00
Alex Chi Z
4a27dd743b Update pageserver/src/aux_file.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2024-05-08 10:46:58 -04:00
Alex Chi Z
5629f65841 chore(pageserver): use sha256 for aux file encoding
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-07 15:42:16 -04:00
Conrad Ludgate
0c99e5ec6d proxy: cull http connections (#7632)
## Problem

Some HTTP client connections can stay open for quite a long time.

## Summary of changes

When there are too many HTTP client connections, pick a random
connection and gracefully cancel it.
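
An illustrative sketch of the culling policy (the map type and the
per-connection cancellation handles are assumptions, not the PR's actual
data structures):

use std::collections::HashMap;
use rand::seq::IteratorRandom;

/// When the connection count exceeds the limit, signal one random
/// connection to shut down gracefully.
fn cull_one(connections: &HashMap<u64, tokio::sync::watch::Sender<bool>>) {
    if let Some((_id, shutdown)) = connections.iter().choose(&mut rand::thread_rng()) {
        // The receiving task drains its in-flight request, then closes.
        let _ = shutdown.send(true);
    }
}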
2024-05-07 18:15:06 +01:00
John Spray
0af66a6003 pageserver: include generation number in local layer paths (#7609)
## Problem

In https://github.com/neondatabase/neon/pull/7531, we would like to be
able to rewrite layers safely. One option is to make `Layer` able to
rewrite files in place safely (e.g. by blocking evictions/deletions for
an old Layer while a new one is created), but that's relatively fragile.
It's more robust in general if we simply never overwrite the same local
file: we can do that by putting the generation number in the filename.

## Summary of changes

- Add `local_layer_path` (counterpart to `remote_layer_path`) and
convert all locations that manually constructed a local layer path by
joining LayerFileName to timeline path
- In the layer upload path, construct remote paths with
`remote_layer_path` rather than trying to build them out of a local
path.
- During startup, carry the full path to layer files through
`init::reconcile`, and pass it into `Layer::for_resident`
- Add a test to make sure we handle upgrades properly.
- Comment out the generation part of `local_layer_path`, since we need
to maintain forward compatibility for one release. A tiny followup PR
will enable it afterwards.

We could make this a bit simpler if we bulk renamed existing layers on
startup instead of carrying literal paths through init, but that is
operationally risky on existing servers with millions of layer files. We
can always do a renaming change in future if it becomes annoying, but
for the moment it's kind of nice to have a structure that enables us to
change local path names again in future quite easily.

We should rename `LayerFileName` to `LayerName` or somesuch, to make it
more obvious that it's not a literal filename: this was already a bit
confusing where that type is used in remote paths. That will be a
followup, to avoid polluting this PR's diff.
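
A hypothetical sketch of the idea (the helper name matches the PR; the
suffix format and types are assumptions):

use camino::Utf8PathBuf;

/// Compose a local layer path that embeds the generation, so rewriting a
/// layer never reuses an existing filename. `None` models the commented-out
/// generation suffix kept for one release of forward compatibility.
fn local_layer_path(
    timeline_dir: &Utf8PathBuf,
    layer_file_name: &str, // a LayerFileName rendered as a string
    generation: Option<u32>,
) -> Utf8PathBuf {
    match generation {
        Some(g) => timeline_dir.join(format!("{layer_file_name}-{g:08x}")),
        None => timeline_dir.join(layer_file_name),
    }
}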
2024-05-07 18:03:12 +01:00
Alex Chi Z
017c34b773 feat(pageserver): generate basebackup from aux file v2 storage (#7517)
This pull request adds the new basebackup read path + aux file write
path. In the regression test, all logical replication tests are run with
the matrix aux_file_v2=false/true.

Also fixed the vectored get code path to correctly return a missing key
error when called from the unified sequential get code path.
---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-07 16:30:18 +00:00
Christian Schwarz
308227fa51 remove neon_local --pageserver-config-override (#7614)
Preceding PR https://github.com/neondatabase/neon/pull/7613 reduced the
usage of `--pageserver-config-override`.

This PR builds on top of that work and fully removes the `neon_local
--pageserver-config-override`.

Tests that need a non-default `pageserver.toml` control it using two
options:

1. Specify `NeonEnvBuilder.pageserver_config_override` before
`NeonEnvBuilder.init_start()`. This uses a new `neon_local init
--pageserver-config` flag.
2. After `init_start()`: `env.pageserver.stop()` +
`NeonPageserver.edit_config_toml()` + `env.pageserver.start()`

A few test cases were using
`env.pageserver.start(overrides=("--pageserver-config-override...",))`.
I changed them to use one of the options above. 

Future Work
-----------

The `neon_local init --pageserver-config` flag still uses `pageserver
--config-override` under the hood. In the future, neon_local should just
write the `pageserver.toml` directly.

The `NeonEnvBuilder.pageserver_config_override` field should be renamed
to `pageserver_initial_config`. Let's save this churn for a separate
refactor commit.
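
For reference, a minimal sketch (assuming toml_edit) of the merge the init
path performs: user-provided keys replace the generated defaults, same as
repeated `--config-override` flags.

/// Merge the user-provided pageserver config into the generated one.
fn merge_pageserver_config(
    generated: &mut toml_edit::Document,
    user_provided: &toml_edit::Document,
) {
    for (key, item) in user_provided.iter() {
        // Last writer wins at the top level.
        generated.insert(key, item.clone());
    }
}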
2024-05-07 16:29:59 +00:00
Joonas Koivunen
d041f9a887 refactor(rtc): remove excess cloning (#7635)
RemoteTimelineClient has a lot of mandatory cloning. By using a single
way of creating IndexPart out of UploadQueueInitialized we can simplify
things and also avoid cloning the latest files for each
`index_part.json` upload (the contents will still be cloned).
2024-05-07 19:22:29 +03:00
Christian Schwarz
ea531d448e fix(test suite): forward compat test is not using latest neon_local (#7637)
The `test_forward_compatibility` test runs the old production binaries,
but is supposed to always run the latest neon_local binary.

I think commit 6acbee23 broke that by accident: in that commit,
`from_repo_dir` was introduced and runs an `init_start()` before
`test_forward_compatibility` gets a chance to patch up the
`neon_local_binpath`.
2024-05-07 15:43:04 +00:00
dependabot[bot]
2dbd1c1ed5 build(deps): bump flask-cors from 3.0.10 to 4.0.1 (#7633) 2024-05-07 16:29:40 +01:00
Alexander Bayandin
51376ef3c8 Add Postgres commit sha to Postgres version (#4603)
## Problem

Ref https://neondb.slack.com/archives/C036U0GRMRB/p1688122168477729

## Summary of changes
- Add sha from postgres repo into postgres version string (via
`--with-extra-version`)
- Add a test that Postgres version matches the expected one
- Remove build-time hard check and allow only related tests to fail
2024-05-07 15:18:17 +00:00
dependabot[bot]
5a3d8e75ed build(deps): bump jinja2 from 3.1.3 to 3.1.4 (#7626) 2024-05-07 12:53:52 +00:00
dependabot[bot]
6e4e578841 build(deps): bump werkzeug from 3.0.1 to 3.0.3 (#7625) 2024-05-07 13:12:53 +01:00
Joonas Koivunen
3c9b484c4d feat: Timeline detach ancestor (#7456)
## Problem

Timelines cannot be deleted if they have children. In many production
cases, a branch or timeline has been created off the main branch for
various reasons, to the effect of now having a "new main" branch. This
feature makes it possible to detach a timeline from its ancestor by
copying all of the data from before the branch point into the detached
timeline, and by reparenting all of the ancestor's earlier branches

## Summary of changes

- Earlier added copy_lsn_prefix functionality is used
- RemoteTimelineClient learns to adopt layers by copying them from
another timeline
- LayerManager adds support for adding adopted layers
- `timeline::Timeline::{prepare_to_detach,complete_detaching}_from_ancestor`
and `timeline::detach_ancestor` are added
- HTTP PUT handler
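
A hedged sketch of invoking the new endpoint (the path and the optional
rewrite_concurrency/copy_concurrency query parameters come from this PR's
router changes; the client code and base URL are illustrative):

/// PUT the detach-ancestor endpoint; on success the handler replies with
/// AncestorDetached { reparented_timelines }.
async fn detach_ancestor(
    base_url: &str,
    tenant_shard_id: &str,
    timeline_id: &str,
) -> anyhow::Result<String> {
    let url = format!(
        "{base_url}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor"
    );
    let resp = reqwest::Client::new().put(url).send().await?;
    Ok(resp.error_for_status()?.text().await?)
}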

Cc: #6994

Co-authored-by: Christian Schwarz <christian@neon.tech>
2024-05-07 13:47:57 +03:00
John Spray
af849a1f61 pageserver: post-shard-split layer trimming (1/2) (#7572)
## Problem

After a shard split of a large existing tenant, child tenants can end up
with oversized historic layers indefinitely, if those layers are
prevented from being GC'd by branchpoints.

This PR is followed by https://github.com/neondatabase/neon/pull/7531

Related issue: https://github.com/neondatabase/neon/issues/7504

## Summary of changes

- Add a new compaction phase `compact_shard_ancestors`, which identifies
layers that are no longer needed after a shard split.
- Add a Timeline->LayerMap code path called `rewrite_layers` , which is
currently only used to drop layers, but will later be used to rewrite
them as well in https://github.com/neondatabase/neon/pull/7531
- Add a new test that compacts after a split, and checks that something
is deleted.

Note that this doesn't have much impact on a tenant's resident size
(since unused layers would end up evicted anyway), but it:
- Makes index_part.json much smaller
- Makes the system easier to reason about: instead of tenants that
effectively say "my physical size is 4TiB but don't worry, I'll never
actually download it", tenants report the real physical size of what
they might download.

Why do we remove these layers in compaction rather than during the
split? Because we have existing split tenants that need cleaning up. We
can add it to the split operation in future as an optimization.
2024-05-07 11:15:58 +01:00
Christian Schwarz
ac7dc82103 use less neon_local --pageserver-config-override / pageserver -c (#7613) 2024-05-06 22:31:26 +02:00
Anna Khanova
f1b654b77d proxy: reduce number of concurrent connections (#7620)
## Problem

Usually, the connection itself is quite fast (below 10ms for p999:
https://neonprod.grafana.net/goto/aOyn8vYIg?orgId=1).

It doesn't make sense to wait a long time for the lock; if acquiring it
takes a long time, something has probably gone wrong.

We also spawn a lot of retries, but they are not very helpful (0 means
the connection succeeded; 1 most probably means a re-request of the
compute node address:
https://neonprod.grafana.net/goto/J_8VQvLIR?orgId=1). Let's try to keep
the number of retries small.
2024-05-06 19:03:25 +00:00
Sasha Krassovsky
7dd58e1449 On-demand WAL download for walsender (#6872)
## Problem
There's allegedly a bug where connecting a subscriber before WAL has been
downloaded from the safekeeper causes an error.

## Summary of changes
Adds support for pausing safekeepers from sending WAL to computes, then
creates a compute and attaches a subscriber while it's in this paused
state. This fails to reproduce the issue, but it's probably a good test
to have anyway.

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
2024-05-06 10:54:07 -07:00
Arpad Müller
f3af5f4660 Fix test_ts_of_lsn_api flakiness (#7599)
Changes parameters to fix the flakiness of `test_ts_of_lsn_api`. The
test's flakiness is already quite low; with this change, it's even
lower.

cc #5768
2024-05-06 16:41:51 +00:00
Joonas Koivunen
a96e15cb6b test: less flaky test_synthetic_size_while_deleting (#7622)
#7585 introduced a test case for deletions while synthetic size is being
calculated. The test races against deletion, but we only accepted one
outcome. Fix it to accept 404 as well, since we cannot control from the
outside which outcome happens.

Evidence:
https://neon-github-public-dev.s3.amazonaws.com/reports/pr-7456/8970595458/index.html#/testresult/32a5b2f8c4094bdb
2024-05-06 15:52:51 +00:00
Christian Schwarz
df1def7018 refactor(pageserver): remove --update-init flag (#7612)
We don't actually use it.

refs https://github.com/neondatabase/neon/issues/7555
2024-05-06 16:40:44 +02:00
Tristan Partin
69337be5c2 Fix grammar in provider.rs error message
s/temporary/temporarily

---------

Co-authored-by: Barry Grenon <barry_grenon@yahoo.ca>
2024-05-06 09:14:42 -05:00
John Spray
67a2215163 pageserver: label tenant_slots metric by slot type (#7603)
## Problem

The current `tenant_slots` metric becomes less useful once we have lots
of secondaries, because we can't tell how many tenants are really
attached (without doing a sum() on some other metric).

## Summary of changes

- Add a `mode` label to this metric
- Update the metric with `slot_added` and `slot_removed` helpers that
are called at all the places we mutate the tenants map.
- Add a debug assertion at shutdown that checks the metrics add up to
the right number, as a cheap way of validating that we're calling the
metric hooks in all the right places.
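
A minimal sketch of that shutdown check (TenantManagerMetrics and
slots_total come from this PR's diff; the call site is illustrative):

#[cfg(all(debug_assertions, not(test)))]
fn check_slot_metrics(metrics: &TenantManagerMetrics, tenants_in_map: u64) {
    // The attached/secondary/inprogress gauges must sum to the map size;
    // a mismatch means some mutation path missed slot_inserted/slot_removed.
    debug_assert_eq!(metrics.slots_total(), tenants_in_map);
}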
2024-05-06 14:07:15 +01:00
John Spray
3764dd2e84 pageserver: call maybe_freeze_ephemeral_layer from a dedicated task (#7594)
## Problem

In testing of the earlier fix for OOMs under heavy write load
(https://github.com/neondatabase/neon/pull/7218), we saw that the limit
on ephemeral layer size wasn't being reliably enforced. That was
diagnosed as being due to overwhelmed compaction loops: most tenants
were waiting on the semaphore for background tasks, and thereby not
running the function that proactively rolls layers frequently enough.

Related: https://github.com/neondatabase/neon/issues/6939 

## Summary of changes

- Create a new per-tenant background loop for "ingest housekeeping",
which invokes maybe_freeze_ephemeral_layer() without taking the
background task semaphore.
- Downgrade to DEBUG a log line in maybe_freeze_ephemeral_layer that had
been INFO, but turns out to be pretty common in the field.

There's some discussion on the issue
(https://github.com/neondatabase/neon/issues/6939#issuecomment-2083554275)
about alternatives for calling this maybe_freeze_ephemeral_layer
periodically without it getting stuck behind compaction. A whole task
just for this feels like kind of a big hammer, but we may in future find
that there are other pieces of lightweight housekeeping that we want to
do here too.

Why is it okay to call maybe_freeze_ephemeral_layer outside of the
background tasks semaphore?
- this is the same work we would do anyway if we receive writes from the
safekeeper, just done a bit sooner.
- The period of the new task is generously jittered (+/- 5%), so when
the ephemeral layer size tips over the threshold, we shouldn't see an
excessively aggressive thundering herd of layer freezes (and only layers
larger than the mean layer size will be frozen)
- All that said, this is an imperfect approach that relies on having a
generous amount of RAM to dip into when we need to freeze somewhat
urgently. It would be nice in future to also block compaction/GC when we
recognize resource stress and need to do other work (like layer
freezing) to reduce memory footprint.
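
An illustrative sketch of such a loop (not the PR's code; names and the
exact jitter mechanism are assumptions):

use rand::Rng;
use std::time::Duration;

async fn ingest_housekeeping_loop(period: Duration) {
    loop {
        // Jitter the period by +/- 5% so tenants don't all freeze at once.
        let factor = rand::thread_rng().gen_range(0.95..=1.05);
        tokio::time::sleep(period.mul_f64(factor)).await;
        // Deliberately not gated on the background-task semaphore: this is
        // the same work ingest would do when WAL arrives, just done sooner.
        maybe_freeze_ephemeral_layer().await;
    }
}

async fn maybe_freeze_ephemeral_layer() {
    // Freeze the in-memory layer if it exceeds the size threshold (elided).
}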
2024-05-06 14:07:07 +01:00
Heikki Linnakangas
0115fe6cb2 Make 'neon.protocol_version = 2' the default (#7616)
Once all the computes in production have restarted, we can remove
protocol version 1 altogether.

See issue #6211.
2024-05-06 14:37:55 +03:00
95 changed files with 3872 additions and 1061 deletions

View File

@@ -236,27 +236,6 @@ jobs:
submodules: true
fetch-depth: 1
- name: Check Postgres submodules revision
shell: bash -euo pipefail {0}
run: |
# This is a temporary solution to ensure that the Postgres submodules revision is correct (i.e. the updated intentionally).
# Eventually it will be replaced by a regression test https://github.com/neondatabase/neon/pull/4603
FAILED=false
for postgres in postgres-v14 postgres-v15 postgres-v16; do
expected=$(cat vendor/revisions.json | jq --raw-output '."'"${postgres}"'"')
actual=$(git rev-parse "HEAD:vendor/${postgres}")
if [ "${expected}" != "${actual}" ]; then
echo >&2 "Expected ${postgres} rev to be at '${expected}', but it is at '${actual}'"
FAILED=true
fi
done
if [ "${FAILED}" = "true" ]; then
echo >&2 "Please update vendor/revisions.json if these changes are intentional"
exit 1
fi
- name: Set pg 14 revision for caching
id: pg_v14_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT

Cargo.lock generated
View File

@@ -1348,6 +1348,7 @@ dependencies = [
"tokio-postgres",
"tokio-util",
"toml",
"toml_edit",
"tracing",
"url",
"utils",
@@ -3691,6 +3692,7 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_with",
"sha2",
"signal-hook",
"smallvec",
"storage_broker",
@@ -4371,6 +4373,7 @@ dependencies = [
"hyper 1.2.0",
"hyper-tungstenite",
"hyper-util",
"indexmap 2.0.1",
"ipnet",
"itertools",
"lasso",

View File

@@ -99,6 +99,7 @@ humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
hyper-tungstenite = "0.13.0"
indexmap = "2"
inotify = "0.10.2"
ipnet = "2.9.0"
itertools = "0.10"

View File

@@ -81,11 +81,14 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status:
echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \
exit 1; }
mkdir -p $(POSTGRES_INSTALL_DIR)/build/$*
(cd $(POSTGRES_INSTALL_DIR)/build/$* && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure \
VERSION=$*; \
EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \
(cd $(POSTGRES_INSTALL_DIR)/build/$$VERSION && \
env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \
CFLAGS='$(PG_CFLAGS)' \
$(PG_CONFIGURE_OPTS) \
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/$* > configure.log)
$(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \
--prefix=$(abspath $(POSTGRES_INSTALL_DIR))/$$VERSION > configure.log)
# nicer alias to run 'configure'
# Note: I've been unable to use templates for this part of our configuration.

View File

@@ -28,6 +28,7 @@ serde_with.workspace = true
tar.workspace = true
thiserror.workspace = true
toml.workspace = true
toml_edit.workspace = true
tokio.workspace = true
tokio-postgres.workspace = true
tokio-util.workspace = true

View File

@@ -133,7 +133,7 @@ fn main() -> Result<()> {
let subcommand_result = match sub_name {
"tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
"timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
"start" => rt.block_on(handle_start_all(sub_args, &env)),
"start" => rt.block_on(handle_start_all(&env)),
"stop" => rt.block_on(handle_stop_all(sub_args, &env)),
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
"storage_controller" => rt.block_on(handle_storage_controller(sub_args, &env)),
@@ -358,6 +358,13 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
default_conf(*num_pageservers)
};
let pageserver_config: toml_edit::Document =
if let Some(path) = init_match.get_one::<PathBuf>("pageserver-config") {
std::fs::read_to_string(path)?.parse()?
} else {
toml_edit::Document::new()
};
let pg_version = init_match
.get_one::<u32>("pg-version")
.copied()
@@ -375,7 +382,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
// Initialize pageserver, create initial tenant and timeline.
for ps_conf in &env.pageservers {
PageServerNode::from_env(&env, ps_conf)
.initialize(&pageserver_config_overrides(init_match))
.initialize(&pageserver_config)
.unwrap_or_else(|e| {
eprintln!("pageserver init failed: {e:?}");
exit(1);
@@ -397,15 +404,6 @@ fn get_default_pageserver(env: &local_env::LocalEnv) -> PageServerNode {
PageServerNode::from_env(env, ps_conf)
}
fn pageserver_config_overrides(init_match: &ArgMatches) -> Vec<&str> {
init_match
.get_many::<String>("pageserver-config-override")
.into_iter()
.flatten()
.map(String::as_str)
.collect()
}
async fn handle_tenant(
tenant_match: &ArgMatches,
env: &mut local_env::LocalEnv,
@@ -1076,10 +1074,7 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageSe
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
match sub_match.subcommand() {
Some(("start", subcommand_args)) => {
if let Err(e) = get_pageserver(env, subcommand_args)?
.start(&pageserver_config_overrides(subcommand_args))
.await
{
if let Err(e) = get_pageserver(env, subcommand_args)?.start().await {
eprintln!("pageserver start failed: {e}");
exit(1);
}
@@ -1105,10 +1100,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
exit(1);
}
if let Err(e) = pageserver
.start(&pageserver_config_overrides(subcommand_args))
.await
{
if let Err(e) = pageserver.start().await {
eprintln!("pageserver start failed: {e}");
exit(1);
}
@@ -1235,7 +1227,7 @@ async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
async fn handle_start_all(env: &local_env::LocalEnv) -> anyhow::Result<()> {
// Endpoints are not started automatically
broker::start_broker_process(env).await?;
@@ -1252,10 +1244,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
for ps_conf in &env.pageservers {
let pageserver = PageServerNode::from_env(env, ps_conf);
if let Err(e) = pageserver
.start(&pageserver_config_overrides(sub_match))
.await
{
if let Err(e) = pageserver.start().await {
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
try_stop_all(env, true).await;
exit(1);
@@ -1396,13 +1385,6 @@ fn cli() -> Command {
.required(false)
.value_name("stop-mode");
let pageserver_config_args = Arg::new("pageserver-config-override")
.long("pageserver-config-override")
.num_args(1)
.action(ArgAction::Append)
.help("Additional pageserver's configuration options or overrides, refer to pageserver's 'config-override' CLI parameter docs for more")
.required(false);
let remote_ext_config_args = Arg::new("remote-ext-config")
.long("remote-ext-config")
.num_args(1)
@@ -1464,14 +1446,21 @@ fn cli() -> Command {
.subcommand(
Command::new("init")
.about("Initialize a new Neon repository, preparing configs for services to start with")
.arg(pageserver_config_args.clone())
.arg(num_pageservers_arg.clone())
.arg(
Arg::new("config")
.long("config")
.required(false)
.value_parser(value_parser!(PathBuf))
.value_name("config"),
.value_name("config")
)
.arg(
Arg::new("pageserver-config")
.long("pageserver-config")
.required(false)
.value_parser(value_parser!(PathBuf))
.value_name("pageserver-config")
.help("Merge the provided pageserver config into the one generated by neon_local."),
)
.arg(pg_version_arg.clone())
.arg(force_arg)
@@ -1553,7 +1542,6 @@ fn cli() -> Command {
.subcommand(Command::new("status"))
.subcommand(Command::new("start")
.about("Start local pageserver")
.arg(pageserver_config_args.clone())
)
.subcommand(Command::new("stop")
.about("Stop local pageserver")
@@ -1561,7 +1549,6 @@ fn cli() -> Command {
)
.subcommand(Command::new("restart")
.about("Restart local pageserver")
.arg(pageserver_config_args.clone())
)
)
.subcommand(
@@ -1676,7 +1663,6 @@ fn cli() -> Command {
.subcommand(
Command::new("start")
.about("Start page server and safekeepers")
.arg(pageserver_config_args)
)
.subcommand(
Command::new("stop")

View File

@@ -4,7 +4,6 @@
//!
//! .neon/
//!
use std::borrow::Cow;
use std::collections::HashMap;
use std::io;
@@ -18,7 +17,8 @@ use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::models::{
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
self, AuxFilePolicy, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo,
TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -77,7 +77,7 @@ impl PageServerNode {
/// Merge overrides provided by the user on the command line with our default overrides derived from neon_local configuration.
///
/// These all end up on the command line of the `pageserver` binary.
fn neon_local_overrides(&self, cli_overrides: &[&str]) -> Vec<String> {
fn neon_local_overrides(&self, cli_overrides: &toml_edit::Document) -> Vec<String> {
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotes etc.
let pg_distrib_dir_param = format!(
"pg_distrib_dir='{}'",
@@ -157,10 +157,7 @@ impl PageServerNode {
}
}
if !cli_overrides
.iter()
.any(|c| c.starts_with("remote_storage"))
{
if !cli_overrides.contains_key("remote_storage") {
overrides.push(format!(
"remote_storage={{local_path='../{PAGESERVER_REMOTE_STORAGE_DIR}'}}"
));
@@ -173,13 +170,13 @@ impl PageServerNode {
}
// Apply the user-provided overrides
overrides.extend(cli_overrides.iter().map(|&c| c.to_owned()));
overrides.push(cli_overrides.to_string());
overrides
}
/// Initializes a pageserver node by creating its config with the overrides provided.
pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
pub fn initialize(&self, config_overrides: &toml_edit::Document) -> anyhow::Result<()> {
// First, run `pageserver --init` and wait for it to write a config into FS and exit.
self.pageserver_init(config_overrides)
.with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id))
@@ -197,11 +194,11 @@ impl PageServerNode {
.expect("non-Unicode path")
}
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
self.start_node(config_overrides, false).await
pub async fn start(&self) -> anyhow::Result<()> {
self.start_node().await
}
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
fn pageserver_init(&self, config_overrides: &toml_edit::Document) -> anyhow::Result<()> {
let datadir = self.repo_path();
let node_id = self.conf.id;
println!(
@@ -219,11 +216,18 @@ impl PageServerNode {
let datadir_path_str = datadir.to_str().with_context(|| {
format!("Cannot start pageserver node {node_id} in path that has no string representation: {datadir:?}")
})?;
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
args.push(Cow::Borrowed("--init"));
// `pageserver --init` merges the `--config-override`s into a built-in default config,
// then writes out the merged product to `pageserver.toml`.
// TODO: just write the full `pageserver.toml` and get rid of `--config-override`.
let mut args = vec!["--init", "--workdir", datadir_path_str];
let overrides = self.neon_local_overrides(config_overrides);
for piece in &overrides {
args.push("--config-override");
args.push(piece);
}
let init_output = Command::new(self.env.pageserver_bin())
.args(args.iter().map(Cow::as_ref))
.args(args)
.envs(self.pageserver_env_variables()?)
.output()
.with_context(|| format!("Failed to run pageserver init for node {node_id}"))?;
@@ -262,11 +266,7 @@ impl PageServerNode {
Ok(())
}
async fn start_node(
&self,
config_overrides: &[&str],
update_config: bool,
) -> anyhow::Result<()> {
async fn start_node(&self) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(
@@ -283,15 +283,12 @@ impl PageServerNode {
self.conf.id, datadir,
)
})?;
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
if update_config {
args.push(Cow::Borrowed("--update-config"));
}
let args = vec!["-D", datadir_path_str];
background_process::start_process(
"pageserver",
&datadir,
&self.env.pageserver_bin(),
args.iter().map(Cow::as_ref),
args,
self.pageserver_env_variables()?,
background_process::InitialPidFile::Expect(self.pid_file()),
|| async {
@@ -308,22 +305,6 @@ impl PageServerNode {
Ok(())
}
fn pageserver_basic_args<'a>(
&self,
config_overrides: &'a [&'a str],
datadir_path_str: &'a str,
) -> Vec<Cow<'a, str>> {
let mut args = vec![Cow::Borrowed("-D"), Cow::Borrowed(datadir_path_str)];
let overrides = self.neon_local_overrides(config_overrides);
for config_override in overrides {
args.push(Cow::Borrowed("-c"));
args.push(Cow::Owned(config_override));
}
args
}
fn pageserver_env_variables(&self) -> anyhow::Result<Vec<(String, String)>> {
// FIXME: why is this tied to pageserver's auth type? Whether or not the safekeeper
// needs a token, and how to generate that token, seems independent to whether
@@ -449,11 +430,11 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
switch_to_aux_file_v2: settings
.remove("switch_to_aux_file_v2")
.map(|x| x.parse::<bool>())
switch_aux_file_policy: settings
.remove("switch_aux_file_policy")
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_to_aux_file_v2' as bool")?,
.context("Failed to parse 'switch_aux_file_policy'")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
@@ -572,11 +553,11 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
switch_to_aux_file_v2: settings
.remove("switch_to_aux_file_v2")
.map(|x| x.parse::<bool>())
switch_aux_file_policy: settings
.remove("switch_aux_file_policy")
.map(|x| x.parse::<AuxFilePolicy>())
.transpose()
.context("Failed to parse 'switch_to_aux_file_v2' as bool")?,
.context("Failed to parse 'switch_aux_file_policy'")?,
}
};

View File

@@ -480,6 +480,15 @@ impl<A: CounterPairAssoc> CounterPairVec<A> {
let id = self.vec.with_labels(labels);
self.vec.remove_metric(id)
}
pub fn sample(&self, labels: <A::LabelGroupSet as LabelGroupSet>::Group<'_>) -> u64 {
let id = self.vec.with_labels(labels);
let metric = self.vec.get_metric(id);
let inc = metric.inc.count.load(std::sync::atomic::Ordering::Relaxed);
let dec = metric.dec.count.load(std::sync::atomic::Ordering::Relaxed);
inc.saturating_sub(dec)
}
}
impl<T, A> ::measured::metric::group::MetricGroup<T> for CounterPairVec<A>

View File

@@ -240,7 +240,7 @@ impl<'a> ShardedRange<'a> {
/// pages that would not actually be stored on this node.
///
/// Don't use this function in code that works with physical entities like layer files.
fn raw_size(range: &Range<Key>) -> u32 {
pub fn raw_size(range: &Range<Key>) -> u32 {
if is_contiguous_range(range) {
contiguous_range_len(range)
} else {

View File

@@ -1,3 +1,4 @@
pub mod detach_ancestor;
pub mod partitioning;
pub mod utilization;
@@ -8,6 +9,7 @@ use std::{
collections::HashMap,
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
time::{Duration, SystemTime},
};
@@ -303,7 +305,31 @@ pub struct TenantConfig {
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub switch_to_aux_file_v2: Option<bool>,
pub switch_aux_file_policy: Option<AuxFilePolicy>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AuxFilePolicy {
V1,
V2,
CrossValidation,
}
impl FromStr for AuxFilePolicy {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.to_lowercase();
if s == "v1" {
Ok(Self::V1)
} else if s == "v2" {
Ok(Self::V2)
} else if s == "crossvalidation" || s == "cross_validation" {
Ok(Self::CrossValidation)
} else {
anyhow::bail!("cannot parse {} to aux file policy", s)
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -0,0 +1,6 @@
use utils::id::TimelineId;
#[derive(Default, serde::Serialize)]
pub struct AncestorDetached {
pub reparented_timelines: Vec<TimelineId>,
}

View File

@@ -50,6 +50,14 @@ extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr {
}
}
extern "C" fn update_donor(wp: *mut WalProposer, donor: *mut Safekeeper, donor_lsn: XLogRecPtr) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).update_donor(&mut (*donor), donor_lsn)
}
}
extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
unsafe {
let callback_data = (*(*wp).config).callback_data;
@@ -391,6 +399,7 @@ pub(crate) fn create_api() -> walproposer_api {
get_shmem_state: Some(get_shmem_state),
start_streaming: Some(start_streaming),
get_flush_rec_ptr: Some(get_flush_rec_ptr),
update_donor: Some(update_donor),
get_current_timestamp: Some(get_current_timestamp),
conn_error_message: Some(conn_error_message),
conn_status: Some(conn_status),
@@ -421,6 +430,32 @@ pub(crate) fn create_api() -> walproposer_api {
}
}
pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
let empty_feedback = crate::bindings::PageserverFeedback {
present: false,
currentClusterSize: 0,
last_received_lsn: 0,
disk_consistent_lsn: 0,
remote_consistent_lsn: 0,
replytime: 0,
shard_number: 0,
};
crate::bindings::WalproposerShmemState {
propEpochStartLsn: crate::bindings::pg_atomic_uint64 { value: 0 },
donor_name: [0; 64],
donor_conninfo: [0; 1024],
donor_lsn: 0,
mutex: 0,
mineLastElectedTerm: crate::bindings::pg_atomic_uint64 { value: 0 },
backpressureThrottlingTime: crate::bindings::pg_atomic_uint64 { value: 0 },
currentClusterSize: crate::bindings::pg_atomic_uint64 { value: 0 },
shard_ps_feedback: [empty_feedback; 128],
num_shards: 0,
min_ps_feedback: empty_feedback,
}
}
impl std::fmt::Display for Level {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)

View File

@@ -1,8 +1,5 @@
use std::ffi::CString;
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::{
api_bindings::{create_api, take_vec_u8, Level},
bindings::{
@@ -10,6 +7,8 @@ use crate::{
WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
},
};
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{id::TenantTimelineId, lsn::Lsn};
/// Rust high-level wrapper for C walproposer API. Many methods are not required
/// for simple cases, hence todo!() in default implementations.
@@ -28,6 +27,10 @@ pub trait ApiImpl {
todo!()
}
fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
todo!()
}
fn get_current_timestamp(&self) -> i64 {
todo!()
}
@@ -274,6 +277,7 @@ mod tests {
sync::{atomic::AtomicUsize, mpsc::sync_channel},
};
use std::cell::UnsafeCell;
use utils::id::TenantTimelineId;
use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
@@ -297,6 +301,8 @@ mod tests {
replies_ptr: AtomicUsize,
// channel to send LSN to the main thread
sync_channel: std::sync::mpsc::SyncSender<u64>,
// Shmem state, used for storing donor info
shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
}
impl MockImpl {
@@ -327,11 +333,22 @@ mod tests {
}
impl ApiImpl for MockImpl {
fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
self.shmem.get()
}
fn get_current_timestamp(&self) -> i64 {
println!("get_current_timestamp");
0
}
fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
let mut shmem = unsafe { *self.get_shmem_state() };
shmem.propEpochStartLsn.value = donor_lsn;
shmem.donor_conninfo = donor.conninfo;
shmem.donor_lsn = donor_lsn;
}
fn conn_status(
&self,
_: &mut crate::bindings::Safekeeper,
@@ -507,6 +524,7 @@ mod tests {
],
replies_ptr: AtomicUsize::new(0),
sync_channel: sender,
shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
});
let config = crate::walproposer::Config {
ttid,

View File

@@ -88,6 +88,7 @@ reqwest.workspace = true
rpds.workspace = true
enum-map.workspace = true
enumset = { workspace = true, features = ["serde"]}
sha2.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -1,10 +1,16 @@
use bytes::{Buf, BufMut, Bytes};
use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE};
use sha2::Digest;
use tracing::warn;
/// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, first 13B of 128b xxhash].
fn hash256(data: &[u8]) -> [u8; 32] {
sha2::Sha256::digest(data).into()
}
/// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, first 13B of 256b sha256].
fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key {
let mut key = [0; METADATA_KEY_SIZE];
let hash = twox_hash::xxh3::hash128(data).to_be_bytes();
let hash = hash256(data);
key[0] = AUX_KEY_PREFIX;
key[1] = dir_level1;
key[2] = dir_level2;
@@ -61,6 +67,84 @@ pub fn encode_aux_file_key(path: &str) -> Key {
}
}
const AUX_FILE_ENCODING_VERSION: u8 = 0x01;
pub fn decode_file_value(val: &[u8]) -> anyhow::Result<Vec<(&str, &[u8])>> {
let mut ptr = val;
if ptr.is_empty() {
// empty value = no files
return Ok(Vec::new());
}
assert_eq!(
ptr.get_u8(),
AUX_FILE_ENCODING_VERSION,
"unsupported aux file value"
);
let mut files = vec![];
while ptr.has_remaining() {
let key_len = ptr.get_u32() as usize;
let key = &ptr[..key_len];
ptr.advance(key_len);
let val_len = ptr.get_u32() as usize;
let content = &ptr[..val_len];
ptr.advance(val_len);
let path = std::str::from_utf8(key)?;
files.push((path, content));
}
Ok(files)
}
/// Decode an aux file key-value pair into a list of files. The returned `Bytes` contain references
/// to the original value slice. Be cautious about memory consumption.
pub fn decode_file_value_bytes(val: &Bytes) -> anyhow::Result<Vec<(String, Bytes)>> {
let mut ptr = val.clone();
if ptr.is_empty() {
// empty value = no files
return Ok(Vec::new());
}
assert_eq!(
ptr.get_u8(),
AUX_FILE_ENCODING_VERSION,
"unsupported aux file value"
);
let mut files = vec![];
while ptr.has_remaining() {
let key_len = ptr.get_u32() as usize;
let key = ptr.slice(..key_len);
ptr.advance(key_len);
let val_len = ptr.get_u32() as usize;
let content = ptr.slice(..val_len);
ptr.advance(val_len);
let path = std::str::from_utf8(&key)?.to_string();
files.push((path, content));
}
Ok(files)
}
pub fn encode_file_value(files: &[(&str, &[u8])]) -> anyhow::Result<Vec<u8>> {
if files.is_empty() {
// no files = empty value
return Ok(Vec::new());
}
let mut encoded = vec![];
encoded.put_u8(AUX_FILE_ENCODING_VERSION);
for (path, content) in files {
if path.len() > u32::MAX as usize {
anyhow::bail!("{} exceeds path size limit", path);
}
encoded.put_u32(path.len() as u32);
encoded.put_slice(path.as_bytes());
if content.len() > u32::MAX as usize {
anyhow::bail!("{} exceeds content size limit", path);
}
encoded.put_u32(content.len() as u32);
encoded.put_slice(content);
}
Ok(encoded)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -70,14 +154,17 @@ mod tests {
// AUX file encoding requires the hash to be portable across all platforms. This test case checks
// if the algorithm produces the same hash across different environments.
assert_eq!(
305317690835051308206966631765527126151,
twox_hash::xxh3::hash128("test1".as_bytes())
"1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014",
hex::encode(hash256("test1".as_bytes()))
);
assert_eq!(
85104974691013376326742244813280798847,
twox_hash::xxh3::hash128("test/test2".as_bytes())
"3dfc364fd121aaa081cec54ef9ef6f69a4756df50cf3c52f1abd2b451829c4c0",
hex::encode(hash256("test/test2".as_bytes()))
);
assert_eq!(
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
hex::encode(hash256("".as_bytes()))
);
assert_eq!(0, twox_hash::xxh3::hash128("".as_bytes()));
}
#[test]
@@ -85,28 +172,45 @@ mod tests {
// To correctly retrieve AUX files, the generated keys for the same file must be the same for all versions
// of the page server.
assert_eq!(
"6200000101E5B20C5F8DD5AA3289D6D9EAFA",
"62000001011B4F0E9851971998E732078544",
encode_aux_file_key("pg_logical/mappings/test1").to_string()
);
assert_eq!(
"620000010239AAC544893139B26F501B97E6",
"620000010260303AE22B998861BCE3B28F33",
encode_aux_file_key("pg_logical/snapshots/test2").to_string()
);
assert_eq!(
"620000010300000000000000000000000000",
"6200000103E3B0C44298FC1C149AFBF4C899",
encode_aux_file_key("pg_logical/replorigin_checkpoint").to_string()
);
assert_eq!(
"62000001FF8635AF2134B7266EC5B4189FD6",
"62000001FF60945D49535300BE8E42108658",
encode_aux_file_key("pg_logical/unsupported").to_string()
);
assert_eq!(
"6200000201772D0E5D71DE14DA86142A1619",
"6200000201FD61A03AF4F77D870FC21E05E7",
encode_aux_file_key("pg_replslot/test3").to_string()
);
assert_eq!(
"620000FFFF1866EBEB53B807B26A2416F317",
"620000FFFF7F75AD39B73CD2A1FE41680550",
encode_aux_file_key("other_file_not_supported").to_string()
);
}
#[test]
fn test_value_encoding() {
let files = vec![
("pg_logical/1.file", "1111".as_bytes()),
("pg_logical/2.file", "2222".as_bytes()),
];
assert_eq!(
files,
decode_file_value(&encode_file_value(&files).unwrap()).unwrap()
);
let files = vec![];
assert_eq!(
files,
decode_file_value(&encode_file_value(&files).unwrap()).unwrap()
);
}
}

View File

@@ -3,6 +3,7 @@
//! Main entry point for the Page Server executable.
use std::env::{var, VarError};
use std::io::Read;
use std::sync::Arc;
use std::time::Duration;
use std::{env, ops::ControlFlow, str::FromStr};
@@ -151,37 +152,34 @@ fn initialize_config(
workdir: &Utf8Path,
) -> anyhow::Result<ControlFlow<(), &'static PageServerConf>> {
let init = arg_matches.get_flag("init");
let update_config = init || arg_matches.get_flag("update-config");
let (mut toml, config_file_exists) = if cfg_file_path.is_file() {
if init {
anyhow::bail!(
"Config file '{cfg_file_path}' already exists, cannot init it, use --update-config to update it",
);
let file_contents: Option<toml_edit::Document> = match std::fs::File::open(cfg_file_path) {
Ok(mut f) => {
if init {
anyhow::bail!("config file already exists: {cfg_file_path}");
}
let md = f.metadata().context("stat config file")?;
if md.is_file() {
let mut s = String::new();
f.read_to_string(&mut s).context("read config file")?;
Some(s.parse().context("parse config file toml")?)
} else {
anyhow::bail!("directory entry exists but is not a file: {cfg_file_path}");
}
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
Err(e) => {
anyhow::bail!("open pageserver config: {e}: {cfg_file_path}");
}
// Supplement the CLI arguments with the config file
let cfg_file_contents = std::fs::read_to_string(cfg_file_path)
.with_context(|| format!("Failed to read pageserver config at '{cfg_file_path}'"))?;
(
cfg_file_contents
.parse::<toml_edit::Document>()
.with_context(|| {
format!("Failed to parse '{cfg_file_path}' as pageserver config")
})?,
true,
)
} else if cfg_file_path.exists() {
anyhow::bail!("Config file '{cfg_file_path}' exists but is not a regular file");
} else {
// We're initializing the tenant, so there's no config file yet
(
DEFAULT_CONFIG_FILE
.parse::<toml_edit::Document>()
.context("could not parse built-in config file")?,
false,
)
};
let mut effective_config = file_contents.unwrap_or_else(|| {
DEFAULT_CONFIG_FILE
.parse()
.expect("unit tests ensure this works")
});
// Patch with overrides from the command line
if let Some(values) = arg_matches.get_many::<String>("config-override") {
for option_line in values {
let doc = toml_edit::Document::from_str(option_line).with_context(|| {
@@ -189,22 +187,21 @@ fn initialize_config(
})?;
for (key, item) in doc.iter() {
if config_file_exists && update_config && key == "id" && toml.contains_key(key) {
anyhow::bail!("Pageserver config file exists at '{cfg_file_path}' and has node id already, it cannot be overridden");
}
toml.insert(key, item.clone());
effective_config.insert(key, item.clone());
}
}
}
debug!("Resulting toml: {toml}");
let conf = PageServerConf::parse_and_validate(&toml, workdir)
debug!("Resulting toml: {effective_config}");
// Construct the runtime representation
let conf = PageServerConf::parse_and_validate(&effective_config, workdir)
.context("Failed to parse pageserver configuration")?;
if update_config {
if init {
info!("Writing pageserver config to '{cfg_file_path}'");
std::fs::write(cfg_file_path, toml.to_string())
std::fs::write(cfg_file_path, effective_config.to_string())
.with_context(|| format!("Failed to write pageserver config to '{cfg_file_path}'"))?;
info!("Config successfully written to '{cfg_file_path}'")
}
@@ -758,18 +755,13 @@ fn cli() -> Command {
// See `settings.md` for more details on the extra configuration parameters pageserver can process
.arg(
Arg::new("config-override")
.long("config-override")
.short('c')
.num_args(1)
.action(ArgAction::Append)
.help("Additional configuration overrides of the ones from the toml config file (or new ones to add there). \
Any option has to be a valid toml document, example: `-c=\"foo='hey'\"` `-c=\"foo={value=1}\"`"),
)
.arg(
Arg::new("update-config")
.long("update-config")
.action(ArgAction::SetTrue)
.help("Update the config file when started"),
)
.arg(
Arg::new("enabled-features")
.long("enabled-features")

View File

@@ -540,7 +540,12 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
js.spawn(async move {
layer
.secondary_tenant
.evict_layer(tenant_manager.get_conf(), layer.timeline_id, layer.name)
.evict_layer(
tenant_manager.get_conf(),
layer.timeline_id,
layer.name,
layer.metadata,
)
.await;
Ok(file_size)
});

View File

@@ -63,6 +63,7 @@ use crate::tenant::remote_timeline_client::list_remote_timelines;
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::Timeline;
use crate::tenant::SpawnMode;
@@ -1228,13 +1229,15 @@ async fn layer_download_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let layer_file_name = get_request_param(&request, "layer_file_name")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let layer_name = LayerFileName::from_str(layer_file_name)
.map_err(|s| ApiError::BadRequest(anyhow::anyhow!(s)))?;
let state = get_state(&request);
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let downloaded = timeline
.download_layer(layer_file_name)
.download_layer(&layer_name)
.await
.map_err(ApiError::InternalServerError)?;
@@ -1258,11 +1261,14 @@ async fn evict_timeline_layer_handler(
let layer_file_name = get_request_param(&request, "layer_file_name")?;
let state = get_state(&request);
let layer_name = LayerFileName::from_str(layer_file_name)
.map_err(|s| ApiError::BadRequest(anyhow::anyhow!(s)))?;
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let evicted = timeline
.evict_layer(layer_file_name)
.evict_layer(&layer_name)
.await
.map_err(ApiError::InternalServerError)?;
@@ -1827,6 +1833,75 @@ async fn timeline_download_remote_layers_handler_get(
json_response(StatusCode::OK, info)
}
async fn timeline_detach_ancestor_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
use crate::tenant::timeline::detach_ancestor::Options;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let span = tracing::info_span!("detach_ancestor", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
async move {
let mut options = Options::default();
let rewrite_concurrency =
parse_query_param::<_, std::num::NonZeroUsize>(&request, "rewrite_concurrency")?;
let copy_concurrency =
parse_query_param::<_, std::num::NonZeroUsize>(&request, "copy_concurrency")?;
[
(&mut options.rewrite_concurrency, rewrite_concurrency),
(&mut options.copy_concurrency, copy_concurrency),
]
.into_iter()
.filter_map(|(target, val)| val.map(|val| (target, val)))
.for_each(|(target, val)| *target = val);
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download);
let ctx = &ctx;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))?;
let (_guard, prepared) = timeline
.prepare_to_detach_from_ancestor(&tenant, options, ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let res = state
.tenant_manager
.complete_detaching_timeline_ancestor(tenant_shard_id, timeline_id, prepared, ctx)
.await;
match res {
Ok(reparented_timelines) => {
let resp = pageserver_api::models::detach_ancestor::AncestorDetached {
reparented_timelines,
};
json_response(StatusCode::OK, resp)
}
Err(e) => Err(ApiError::InternalServerError(
e.context("timeline detach completion"),
)),
}
}
.instrument(span)
.await
}
async fn deletion_queue_flush(
r: Request<Body>,
cancel: CancellationToken,
@@ -2515,6 +2590,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers",
|r| api_handler(r, timeline_download_remote_layers_handler_get),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/detach_ancestor",
|r| api_handler(r, timeline_detach_ancestor_handler),
)
.delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
api_handler(r, timeline_delete_handler)
})

View File

@@ -1512,29 +1512,80 @@ static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy<IntCounterVec> = Lazy
});
pub(crate) struct TenantManagerMetrics {
pub(crate) tenant_slots: UIntGauge,
tenant_slots_attached: UIntGauge,
tenant_slots_secondary: UIntGauge,
tenant_slots_inprogress: UIntGauge,
pub(crate) tenant_slot_writes: IntCounter,
pub(crate) unexpected_errors: IntCounter,
}
impl TenantManagerMetrics {
/// Helpers for tracking slots. Note that these do not track the lifetime of TenantSlot objects
/// exactly: they track the lifetime of the slots _in the tenant map_.
pub(crate) fn slot_inserted(&self, slot: &TenantSlot) {
match slot {
TenantSlot::Attached(_) => {
self.tenant_slots_attached.inc();
}
TenantSlot::Secondary(_) => {
self.tenant_slots_secondary.inc();
}
TenantSlot::InProgress(_) => {
self.tenant_slots_inprogress.inc();
}
}
}
pub(crate) fn slot_removed(&self, slot: &TenantSlot) {
match slot {
TenantSlot::Attached(_) => {
self.tenant_slots_attached.dec();
}
TenantSlot::Secondary(_) => {
self.tenant_slots_secondary.dec();
}
TenantSlot::InProgress(_) => {
self.tenant_slots_inprogress.dec();
}
}
}
#[cfg(all(debug_assertions, not(test)))]
pub(crate) fn slots_total(&self) -> u64 {
self.tenant_slots_attached.get()
+ self.tenant_slots_secondary.get()
+ self.tenant_slots_inprogress.get()
}
}
pub(crate) static TENANT_MANAGER: Lazy<TenantManagerMetrics> = Lazy::new(|| {
TenantManagerMetrics {
tenant_slots: register_uint_gauge!(
let tenant_slots = register_uint_gauge_vec!(
"pageserver_tenant_manager_slots",
"How many slots currently exist, including all attached, secondary and in-progress operations",
&["mode"]
)
.expect("failed to define a metric"),
tenant_slot_writes: register_int_counter!(
"pageserver_tenant_manager_slot_writes",
"Writes to a tenant slot, including all of create/attach/detach/delete"
)
.expect("failed to define a metric"),
unexpected_errors: register_int_counter!(
"pageserver_tenant_manager_unexpected_errors_total",
"Number of unexpected conditions encountered: nonzero value indicates a non-fatal bug."
)
.expect("failed to define a metric"),
}
.expect("failed to define a metric");
TenantManagerMetrics {
tenant_slots_attached: tenant_slots
.get_metric_with_label_values(&["attached"])
.unwrap(),
tenant_slots_secondary: tenant_slots
.get_metric_with_label_values(&["secondary"])
.unwrap(),
tenant_slots_inprogress: tenant_slots
.get_metric_with_label_values(&["inprogress"])
.unwrap(),
tenant_slot_writes: register_int_counter!(
"pageserver_tenant_manager_slot_writes",
"Writes to a tenant slot, including all of create/attach/detach/delete"
)
.expect("failed to define a metric"),
unexpected_errors: register_int_counter!(
"pageserver_tenant_manager_unexpected_errors_total",
"Number of unexpected conditions encountered: nonzero value indicates a non-fatal bug."
)
.expect("failed to define a metric"),
}
});
pub(crate) struct DeletionQueueMetrics {
@@ -2275,6 +2326,7 @@ use std::time::{Duration, Instant};
use crate::context::{PageContentKind, RequestContext};
use crate::task_mgr::TaskKind;
use crate::tenant::mgr::TenantSlot;
/// Maintain a per timeline gauge in addition to the global gauge.
struct PerTimelineRemotePhysicalSizeGauge {
@@ -2877,6 +2929,8 @@ pub fn preinitialize_metrics() {
&WALRECEIVER_CANDIDATES_REMOVED,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_FAILURES,
&tokio_epoll_uring::THREAD_LOCAL_LAUNCH_SUCCESSES,
&REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
&REMOTE_ONDEMAND_DOWNLOADED_BYTES,
]
.into_iter()
.for_each(|c| {

View File

@@ -10,9 +10,9 @@ use super::tenant::{PageReconstructError, Timeline};
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::WAL_INGEST;
use crate::repository::*;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
@@ -24,6 +24,7 @@ use pageserver_api::key::{
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
@@ -670,7 +671,7 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
pub(crate) async fn list_aux_files(
async fn list_aux_files_v1(
&self,
lsn: Lsn,
ctx: &RequestContext,
@@ -688,6 +689,63 @@ impl Timeline {
}
}
async fn list_aux_files_v2(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let kv = self
.scan(KeySpace::single(Key::metadata_aux_key_range()), lsn, ctx)
.await
.context("scan")?;
let mut result = HashMap::new();
for (_, v) in kv {
let v = v.context("get value")?;
let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
for (fname, content) in v {
result.insert(fname, content);
}
}
Ok(result)
}
pub(crate) async fn list_aux_files(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get_switch_aux_file_policy() {
AuxFilePolicy::V1 => self.list_aux_files_v1(lsn, ctx).await,
AuxFilePolicy::V2 => self.list_aux_files_v2(lsn, ctx).await,
AuxFilePolicy::CrossValidation => {
let v1_result = self.list_aux_files_v1(lsn, ctx).await;
let v2_result = self.list_aux_files_v2(lsn, ctx).await;
match (v1_result, v2_result) {
(Ok(v1), Ok(v2)) => {
if v1 != v2 {
tracing::error!(
"unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
);
return Err(PageReconstructError::Other(anyhow::anyhow!(
"unmatched aux file v1 v2 result"
)));
}
Ok(v1)
}
(Ok(_), Err(v2)) => {
tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
Err(v2)
}
(Err(v1), Ok(_)) => {
tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
Err(v1)
}
(Err(_), Err(v2)) => Err(v2),
}
}
}
}
/// Does the same as get_current_logical_size but counted on demand.
/// Used to initialize the logical size tracking on startup.
///
@@ -1389,6 +1447,9 @@ impl<'a> DatadirModification<'a> {
}
pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
return Ok(());
}
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: HashMap::new(),
})?;
@@ -1404,89 +1465,121 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let n_files;
let mut aux_files = self.tline.aux_files.lock().await;
if let Some(mut dir) = aux_files.dir.take() {
// We already updated aux files in `self`: emit a delta and update our latest value.
dir.upsert(file_path.clone(), content.clone());
n_files = dir.files.len();
if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
aux_files.n_deltas = 0;
let policy = self.tline.get_switch_aux_file_policy();
if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
Ok(val) => Some(val),
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
let files = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
} else {
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
);
aux_files.n_deltas += 1;
}
aux_files.dir = Some(dir);
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
n_files = dir.files.len();
aux_files.dir = Some(dir);
}
Err(
e @ (PageReconstructError::AncestorStopping(_)
| PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
// the original code assumes all other errors are missing keys. Therefore, we keep the code path
// the same for now, though in theory, we should only match the `MissingKey` variant.
Err(
PageReconstructError::Other(_)
| PageReconstructError::WalRedo(_)
| PageReconstructError::MissingKey { .. },
) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
Vec::new()
};
let new_files = if content.is_empty() {
files
.into_iter()
.filter(|(p, _)| &path != p)
.collect::<Vec<_>>()
} else {
files
.into_iter()
.filter(|(p, _)| &path != p)
.chain(std::iter::once((path, content)))
.collect::<Vec<_>>()
};
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
}
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let n_files;
let mut aux_files = self.tline.aux_files.lock().await;
if let Some(mut dir) = aux_files.dir.take() {
// We already updated aux files in `self`: emit a delta and update our latest value.
dir.upsert(file_path.clone(), content.clone());
n_files = dir.files.len();
if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
n_files = 1;
aux_files.dir = Some(dir);
aux_files.n_deltas = 0;
} else {
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
);
aux_files.n_deltas += 1;
}
aux_files.dir = Some(dir);
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
n_files = dir.files.len();
aux_files.dir = Some(dir);
}
Err(
e @ (PageReconstructError::AncestorStopping(_)
| PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
// the original code assumes all other errors are missing keys. Therefore, we keep the code path
// the same for now, though in theory, we should only match the `MissingKey` variant.
Err(
PageReconstructError::Other(_)
| PageReconstructError::WalRedo(_)
| PageReconstructError::MissingKey { .. },
) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
n_files = 1;
aux_files.dir = Some(dir);
}
}
}
}
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, n_files));
}
Ok(())
}
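The v2 write path above is a read-modify-write over a single encoded value per hashed key: decode the stored list of (path, content) pairs, drop any entry for the written path, append the new entry unless the content is empty (which means delete), and store the re-encoded image. A minimal standalone sketch of that upsert/delete step, using plain Vec stand-ins rather than the pageserver's aux_file encode/decode helpers:

// Sketch only: `files` stands in for the decoded Vec of (path, content) pairs;
// an empty `content` is treated as a deletion, mirroring the logic above.
fn upsert_aux_entry(
    files: Vec<(String, Vec<u8>)>,
    path: String,
    content: Vec<u8>,
) -> Vec<(String, Vec<u8>)> {
    let without_old = files.into_iter().filter(|(p, _)| p != &path);
    if content.is_empty() {
        // Deletion: keep everything except the entry for `path`.
        without_old.collect()
    } else {
        // Upsert: drop any stale entry, then append the new one.
        without_old
            .chain(std::iter::once((path, content)))
            .collect()
    }
}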

View File

@@ -33,7 +33,6 @@ impl Value {
}
}
#[cfg(test)]
#[derive(Debug, PartialEq)]
pub(crate) enum InvalidInput {
TooShortValue,
@@ -42,10 +41,8 @@ pub(crate) enum InvalidInput {
/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, let's
/// use this type for querying if a slice looks some particular way.
#[cfg(test)]
pub(crate) struct ValueBytes;
#[cfg(test)]
impl ValueBytes {
pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {

View File

@@ -319,6 +319,9 @@ pub enum TaskKind {
// Eviction. One per timeline.
Eviction,
// Ingest housekeeping (flushing ephemeral layers on time threshold or disk pressure)
IngestHousekeeping,
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
@@ -367,6 +370,8 @@ pub enum TaskKind {
#[cfg(test)]
UnitTest,
DetachAncestor,
}
#[derive(Default)]

View File

@@ -322,6 +322,9 @@ pub struct Tenant {
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
pub(crate) timeline_get_throttle:
Arc<throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>>,
/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,
}
impl std::fmt::Debug for Tenant {
@@ -1676,6 +1679,34 @@ impl Tenant {
Ok(())
}
// Call through to all timelines to freeze ephemeral layers if needed. Usually
// this happens during ingest: this background housekeeping is for freezing layers
// that are open but haven't been written to for some time.
async fn ingest_housekeeping(&self) {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// housekeeping. We don't want to block everything else while the
// housekeeping runs.
let timelines = {
self.timelines
.lock()
.unwrap()
.values()
.filter_map(|timeline| {
if timeline.is_active() {
Some(timeline.clone())
} else {
None
}
})
.collect::<Vec<_>>()
};
for timeline in &timelines {
timeline.maybe_freeze_ephemeral_layer().await;
}
}
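The pattern here is the usual one for async work over a shared map: clone the Arc handles while briefly holding the std mutex, release the guard, and only then await. A minimal sketch under that assumption, with a hypothetical Item type standing in for Timeline:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Item {
    active: bool,
}

impl Item {
    fn is_active(&self) -> bool {
        self.active
    }
    async fn housekeep(&self) {
        // slow, awaitable work, performed without holding the map lock
    }
}

async fn housekeeping(map: &Mutex<HashMap<u64, Arc<Item>>>) {
    // Collect Arc clones under the lock, then drop the guard before awaiting.
    let items: Vec<Arc<Item>> = map
        .lock()
        .unwrap()
        .values()
        .filter(|item| item.is_active())
        .cloned()
        .collect();
    for item in &items {
        item.housekeep().await;
    }
}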
pub fn current_state(&self) -> TenantState {
self.state.borrow().clone()
}
@@ -2529,6 +2560,7 @@ impl Tenant {
&crate::metrics::tenant_throttling::TIMELINE_GET,
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
}
}
@@ -3726,7 +3758,7 @@ pub(crate) mod harness {
image_layer_creation_check_threshold: Some(
tenant_conf.image_layer_creation_check_threshold,
),
switch_to_aux_file_v2: Some(tenant_conf.switch_to_aux_file_v2),
switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy),
}
}
}

View File

@@ -9,6 +9,7 @@
//! may lead to a data loss.
//!
use anyhow::bail;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::CompactionAlgorithm;
use pageserver_api::models::EvictionPolicy;
use pageserver_api::models::{self, ThrottleConfig};
@@ -370,9 +371,9 @@ pub struct TenantConf {
// Expressed in multiples of checkpoint distance.
pub image_layer_creation_check_threshold: u8,
/// Switch to aux file v2. Switching this flag requires that the user has not written any aux file into
/// Switch to a new aux file policy. Switching this flag requires that the user has not written any aux file into
/// the storage before, and this flag cannot be switched back. Otherwise there will be data corruption.
pub switch_to_aux_file_v2: bool,
pub switch_aux_file_policy: AuxFilePolicy,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -471,7 +472,7 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub switch_to_aux_file_v2: Option<bool>,
pub switch_aux_file_policy: Option<AuxFilePolicy>,
}
impl TenantConfOpt {
@@ -529,9 +530,9 @@ impl TenantConfOpt {
image_layer_creation_check_threshold: self
.image_layer_creation_check_threshold
.unwrap_or(global_conf.image_layer_creation_check_threshold),
switch_to_aux_file_v2: self
.switch_to_aux_file_v2
.unwrap_or(global_conf.switch_to_aux_file_v2),
switch_aux_file_policy: self
.switch_aux_file_policy
.unwrap_or(global_conf.switch_aux_file_policy),
}
}
}
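The overlay logic above follows one rule for every field: the per-tenant Option overrides the global default via unwrap_or. A minimal sketch of that shape, with stand-in types (only a V1 variant is assumed here for illustration, not the real config structs):

// Sketch only: stand-in types, not the pageserver's real config structs.
#[derive(Clone, Copy, Debug)]
enum AuxFilePolicy {
    V1,
}

struct GlobalConf {
    switch_aux_file_policy: AuxFilePolicy,
}

struct TenantConfOverride {
    switch_aux_file_policy: Option<AuxFilePolicy>,
}

impl TenantConfOverride {
    fn effective_policy(&self, global: &GlobalConf) -> AuxFilePolicy {
        // The per-tenant value wins; otherwise fall back to the global default.
        self.switch_aux_file_policy
            .unwrap_or(global.switch_aux_file_policy)
    }
}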
@@ -573,7 +574,7 @@ impl Default for TenantConf {
lazy_slru_download: false,
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
switch_to_aux_file_v2: false,
switch_aux_file_policy: AuxFilePolicy::V1,
}
}
}
@@ -648,7 +649,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
switch_to_aux_file_v2: value.switch_to_aux_file_v2,
switch_aux_file_policy: value.switch_aux_file_policy,
}
}
}

View File

@@ -585,9 +585,20 @@ impl DeleteTenantFlow {
// FIXME: we should not be modifying this from outside of mgr.rs.
// This will go away when we simplify deletion (https://github.com/neondatabase/neon/issues/5080)
crate::metrics::TENANT_MANAGER
.tenant_slots
.set(locked.len() as u64);
// Update stats
match &removed {
TenantsMapRemoveResult::Occupied(slot) => {
crate::metrics::TENANT_MANAGER.slot_removed(slot);
}
TenantsMapRemoveResult::InProgress(barrier) => {
crate::metrics::TENANT_MANAGER
.slot_removed(&TenantSlot::InProgress(barrier.clone()));
}
TenantsMapRemoveResult::Vacant => {
// Nothing changed in map, no metric update
}
}
match removed {
TenantsMapRemoveResult::Occupied(TenantSlot::Attached(tenant)) => {

View File

@@ -207,6 +207,24 @@ impl TimelineMetadata {
self.body.ancestor_lsn
}
/// When reparenting, the `ancestor_lsn` does not change.
pub fn reparent(&mut self, timeline: &TimelineId) {
assert!(self.body.ancestor_timeline.is_some());
// no assertion against redoing this: it's fine, we may have to repeat this multiple times
self.body.ancestor_timeline = Some(*timeline);
}
pub fn detach_from_ancestor(&mut self, timeline: &TimelineId, ancestor_lsn: &Lsn) {
if let Some(ancestor) = self.body.ancestor_timeline {
assert_eq!(ancestor, *timeline);
}
if self.body.ancestor_lsn != Lsn(0) {
assert_eq!(self.body.ancestor_lsn, *ancestor_lsn);
}
self.body.ancestor_timeline = None;
self.body.ancestor_lsn = Lsn(0);
}
pub fn latest_gc_cutoff_lsn(&self) -> Lsn {
self.body.latest_gc_cutoff_lsn
}

View File

@@ -56,6 +56,7 @@ use utils::id::{TenantId, TimelineId};
use super::delete::DeleteTenantError;
use super::secondary::SecondaryTenant;
use super::timeline::detach_ancestor::PreparedTimelineDetach;
use super::TenantSharedResources;
/// For a tenant that appears in TenantsMap, it may either be
@@ -246,6 +247,7 @@ impl TenantsMap {
}
}
#[cfg(all(debug_assertions, not(test)))]
pub(crate) fn len(&self) -> usize {
match self {
TenantsMap::Initializing => 0,
@@ -746,6 +748,7 @@ pub async fn init_tenant_mgr(
}
};
METRICS.slot_inserted(&slot);
tenants.insert(tenant_shard_id, slot);
}
@@ -753,7 +756,7 @@ pub async fn init_tenant_mgr(
let mut tenants_map = TENANTS.write().unwrap();
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
METRICS.tenant_slots.set(tenants.len() as u64);
*tenants_map = TenantsMap::Open(tenants);
Ok(TenantManager {
@@ -824,6 +827,14 @@ fn tenant_spawn(
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
let mut join_set = JoinSet::new();
#[cfg(all(debug_assertions, not(test)))]
{
// Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
// as it happens implicitly at the end of tests etc.
let m = tenants.read().unwrap();
debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
}
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
let (total_in_progress, total_attached) = {
let mut m = tenants.write().unwrap();
@@ -1997,6 +2008,101 @@ impl TenantManager {
})
.collect())
}
/// Completes an earlier prepared timeline detach ancestor.
pub(crate) async fn complete_detaching_timeline_ancestor(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
prepared: PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
struct RevertOnDropSlot(Option<SlotGuard>);
impl Drop for RevertOnDropSlot {
fn drop(&mut self) {
if let Some(taken) = self.0.take() {
taken.revert();
}
}
}
impl RevertOnDropSlot {
fn into_inner(mut self) -> SlotGuard {
self.0.take().unwrap()
}
}
impl std::ops::Deref for RevertOnDropSlot {
type Target = SlotGuard;
fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap()
}
}
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let slot_guard = RevertOnDropSlot(Some(slot_guard));
let tenant = {
let Some(old_slot) = slot_guard.get_old_value() else {
anyhow::bail!(
"Tenant not found when trying to complete detaching timeline ancestor"
);
};
let Some(tenant) = old_slot.get_attached() else {
anyhow::bail!("Tenant is not in attached state");
};
if !tenant.is_active() {
anyhow::bail!("Tenant is not active");
}
tenant.clone()
};
let timeline = tenant.get_timeline(timeline_id, true)?;
let reparented = timeline
.complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
.await?;
let mut slot_guard = slot_guard.into_inner();
let (_guard, progress) = utils::completion::channel();
match tenant.shutdown(progress, ShutdownMode::Hard).await {
Ok(()) => {
slot_guard.drop_old_value()?;
}
Err(_barrier) => {
slot_guard.revert();
// this really should not happen, at all, unless a shutdown was already in progress?
anyhow::bail!("Cannot restart Tenant, already shutting down");
}
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
let shard_identity = config.shard;
let tenant = tenant_spawn(
self.conf,
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
self.tenants,
SpawnMode::Eager,
ctx,
)?;
slot_guard.upsert(TenantSlot::Attached(tenant))?;
Ok(reparented)
}
}
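The RevertOnDropSlot wrapper above is a small RAII guard: if the function returns early on any error path, dropping the guard reverts the slot, and only an explicit into_inner() disarms it. A minimal generic sketch of that pattern, assuming a hypothetical Slot type with a revert method:

// Sketch only: `Slot` is a stand-in for the real SlotGuard.
struct Slot;

impl Slot {
    fn revert(self) {
        // put the original slot value back
    }
}

struct RevertOnDrop(Option<Slot>);

impl RevertOnDrop {
    fn new(slot: Slot) -> Self {
        RevertOnDrop(Some(slot))
    }

    /// Disarm the guard and hand the slot back to the caller.
    fn into_inner(mut self) -> Slot {
        self.0.take().unwrap()
    }
}

impl Drop for RevertOnDrop {
    fn drop(&mut self) {
        // Only runs if `into_inner` was never called, i.e. on an early return.
        if let Some(slot) = self.0.take() {
            slot.revert();
        }
    }
}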
#[derive(Debug, thiserror::Error)]
@@ -2428,10 +2534,13 @@ impl SlotGuard {
TenantsMap::Open(m) => m,
};
METRICS.slot_inserted(&new_value);
let replaced = m.insert(self.tenant_shard_id, new_value);
self.upserted = true;
METRICS.tenant_slots.set(m.len() as u64);
if let Some(replaced) = replaced.as_ref() {
METRICS.slot_removed(replaced);
}
replaced
};
@@ -2541,9 +2650,13 @@ impl Drop for SlotGuard {
}
if self.old_value_is_shutdown() {
METRICS.slot_removed(entry.get());
entry.remove();
} else {
entry.insert(self.old_value.take().unwrap());
let inserting = self.old_value.take().unwrap();
METRICS.slot_inserted(&inserting);
let replaced = entry.insert(inserting);
METRICS.slot_removed(&replaced);
}
}
Entry::Vacant(_) => {
@@ -2554,8 +2667,6 @@ impl Drop for SlotGuard {
);
}
}
METRICS.tenant_slots.set(m.len() as u64);
}
}
@@ -2635,7 +2746,9 @@ fn tenant_map_acquire_slot_impl(
}
_ => {
let (completion, barrier) = utils::completion::channel();
v.insert(TenantSlot::InProgress(barrier));
let inserting = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&inserting);
v.insert(inserting);
tracing::debug!("Vacant, inserted InProgress");
Ok(SlotGuard::new(*tenant_shard_id, None, completion))
}
@@ -2671,7 +2784,10 @@ fn tenant_map_acquire_slot_impl(
_ => {
// Happy case: the slot was not in any state that violated our mode
let (completion, barrier) = utils::completion::channel();
let old_value = o.insert(TenantSlot::InProgress(barrier));
let in_progress = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&in_progress);
let old_value = o.insert(in_progress);
METRICS.slot_removed(&old_value);
tracing::debug!("Occupied, replaced with InProgress");
Ok(SlotGuard::new(
*tenant_shard_id,

View File

@@ -570,7 +570,7 @@ impl RemoteTimelineClient {
// ahead of what's _actually_ on the remote during index upload.
upload_queue.latest_metadata = metadata.clone();
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
self.schedule_index_upload(upload_queue);
Ok(())
}
@@ -591,7 +591,7 @@ impl RemoteTimelineClient {
upload_queue.latest_metadata.apply(update);
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
self.schedule_index_upload(upload_queue);
Ok(())
}
@@ -611,18 +611,14 @@ impl RemoteTimelineClient {
let upload_queue = guard.initialized_mut()?;
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
self.schedule_index_upload(upload_queue, upload_queue.latest_metadata.clone());
self.schedule_index_upload(upload_queue);
}
Ok(())
}
/// Launch an index-file upload operation in the background (internal function)
fn schedule_index_upload(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
metadata: TimelineMetadata,
) {
fn schedule_index_upload(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
info!(
@@ -631,11 +627,7 @@ impl RemoteTimelineClient {
upload_queue.latest_files_changes_since_metadata_upload_scheduled,
);
let index_part = IndexPart::new(
upload_queue.latest_files.clone(),
disk_consistent_lsn,
metadata,
);
let index_part = IndexPart::from(&*upload_queue);
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
@@ -645,9 +637,61 @@ impl RemoteTimelineClient {
self.launch_queued_tasks(upload_queue);
}
pub(crate) async fn schedule_reparenting_and_wait(
self: &Arc<Self>,
new_parent: &TimelineId,
) -> anyhow::Result<()> {
// FIXME: because Timeline::schedule_uploads, when called from layer flushing, reads the
// in-memory part, we cannot do the detaching like this
let receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.latest_metadata.reparent(new_parent);
self.schedule_index_upload(upload_queue);
self.schedule_barrier0(upload_queue)
};
Self::wait_completion0(receiver).await
}
/// Schedules uploading a new version of `index_part.json` with the given layers added,
/// detaches from the ancestor, and waits for the upload to complete.
///
/// Launch an upload operation in the background.
///
/// This is used with `Timeline::detach_ancestor` functionality.
pub(crate) async fn schedule_adding_existing_layers_to_index_detach_and_wait(
self: &Arc<Self>,
layers: &[Layer],
adopted: (TimelineId, Lsn),
) -> anyhow::Result<()> {
let barrier = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue
.latest_metadata
.detach_from_ancestor(&adopted.0, &adopted.1);
for layer in layers {
upload_queue
.latest_files
.insert(layer.layer_desc().filename(), layer.metadata());
}
self.schedule_index_upload(upload_queue);
let barrier = self.schedule_barrier0(upload_queue);
self.launch_queued_tasks(upload_queue);
barrier
};
Self::wait_completion0(barrier).await
}
/// Launch an upload operation in the background; the file is added to be included in next
/// `index_part.json` upload.
pub(crate) fn schedule_layer_file_upload(
self: &Arc<Self>,
layer: ResidentLayer,
@@ -673,9 +717,11 @@ impl RemoteTimelineClient {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
info!(
"scheduled layer file upload {layer} gen={:?} shard={:?}",
metadata.generation, metadata.shard
gen=?metadata.generation,
shard=?metadata.shard,
"scheduled layer file upload {layer}",
);
let op = UploadOp::UploadLayer(layer, metadata);
self.metric_begin(&op);
upload_queue.queued_operations.push_back(op);
@@ -738,10 +784,6 @@ impl RemoteTimelineClient {
where
I: IntoIterator<Item = LayerFileName>,
{
// Deleting layers doesn't affect the values stored in TimelineMetadata,
// so we don't need to update it. Just serialize it.
let metadata = upload_queue.latest_metadata.clone();
// Decorate our list of names with each name's metadata, dropping
// names that are unexpectedly missing from our metadata. This metadata
// is later used when physically deleting layers, to construct key paths.
@@ -780,7 +822,7 @@ impl RemoteTimelineClient {
// index_part update, because that needs to be uploaded before we can actually delete the
// files.
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
self.schedule_index_upload(upload_queue, metadata);
self.schedule_index_upload(upload_queue);
}
with_metadata
@@ -882,12 +924,18 @@ impl RemoteTimelineClient {
/// Wait for all previously scheduled uploads/deletions to complete
pub(crate) async fn wait_completion(self: &Arc<Self>) -> anyhow::Result<()> {
let mut receiver = {
let receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue)
};
Self::wait_completion0(receiver).await
}
async fn wait_completion0(
mut receiver: tokio::sync::watch::Receiver<()>,
) -> anyhow::Result<()> {
if receiver.changed().await.is_err() {
anyhow::bail!("wait_completion aborted because upload queue was stopped");
}
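wait_completion0 relies on a barrier scheduled into the upload queue: the waiter holds a tokio watch receiver, and the barrier operation notifies it once everything queued before it has finished. A minimal sketch of that signalling, assuming a plain tokio watch channel (the real schedule_barrier0 stores the sender alongside the queued op):

use tokio::sync::watch;

fn schedule_barrier() -> (watch::Sender<()>, watch::Receiver<()>) {
    // The sender travels with the queued barrier operation; the receiver is
    // returned to whoever wants to wait for the queue to drain.
    watch::channel(())
}

async fn wait_barrier(mut receiver: watch::Receiver<()>) -> Result<(), &'static str> {
    // `changed()` resolves once the barrier op sends a value, and errors if the
    // sender is dropped first, i.e. the upload queue was stopped.
    receiver
        .changed()
        .await
        .map_err(|_| "upload queue stopped before the barrier completed")
}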
@@ -1003,8 +1051,7 @@ impl RemoteTimelineClient {
let deleted_at = Utc::now().naive_utc();
stopped.deleted_at = SetDeletedFlagProgress::InProgress(deleted_at);
let mut index_part = IndexPart::try_from(&stopped.upload_queue_for_deletion)
.context("IndexPart serialize")?;
let mut index_part = IndexPart::from(&stopped.upload_queue_for_deletion);
index_part.deleted_at = Some(deleted_at);
index_part
};
@@ -1085,6 +1132,93 @@ impl RemoteTimelineClient {
Ok(())
}
/// Uploads the given layer **without** adding it to be part of a future `index_part.json` upload.
///
/// This is not normally needed.
pub(crate) async fn upload_layer_file(
self: &Arc<Self>,
uploaded: &ResidentLayer,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let remote_path = remote_layer_path(
&self.tenant_shard_id.tenant_id,
&self.timeline_id,
self.tenant_shard_id.to_index(),
&uploaded.layer_desc().filename(),
uploaded.metadata().generation,
);
backoff::retry(
|| async {
upload::upload_timeline_layer(
&self.storage_impl,
uploaded.local_path(),
&remote_path,
uploaded.metadata().file_size(),
cancel,
)
.await
},
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"upload a layer without adding it to latest files",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("upload a layer without adding it to latest files")
}
/// Copies the existing remote layer `adopted` to the remote path of `adopted_as`. The layer is
/// not added to be part of a future `index_part.json` upload.
pub(crate) async fn copy_timeline_layer(
self: &Arc<Self>,
adopted: &Layer,
adopted_as: &Layer,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let source_remote_path = remote_layer_path(
&self.tenant_shard_id.tenant_id,
&adopted
.get_timeline_id()
.expect("Source timeline should be alive"),
self.tenant_shard_id.to_index(),
&adopted.layer_desc().filename(),
adopted.metadata().generation,
);
let target_remote_path = remote_layer_path(
&self.tenant_shard_id.tenant_id,
&self.timeline_id,
self.tenant_shard_id.to_index(),
&adopted_as.layer_desc().filename(),
adopted_as.metadata().generation,
);
backoff::retry(
|| async {
upload::copy_timeline_layer(
&self.storage_impl,
&source_remote_path,
&target_remote_path,
cancel,
)
.await
},
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"copy timeline layer",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("remote copy timeline layer")
}
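Both upload_layer_file and copy_timeline_layer wrap the remote operation in the same retry-with-backoff loop and bail out when the cancellation token fires. A minimal generic sketch of that shape, not using the pageserver's backoff helper and with a simple linear backoff for illustration:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// Sketch only: retry `op` up to `attempts` times, returning None if cancelled.
async fn retry_op<F, Fut, T, E>(
    mut op: F,
    attempts: u32,
    cancel: &CancellationToken,
) -> Option<Result<T, E>>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    for attempt in 0..attempts {
        tokio::select! {
            _ = cancel.cancelled() => return None,
            res = op() => match res {
                Ok(v) => return Some(Ok(v)),
                Err(e) if attempt + 1 == attempts => return Some(Err(e)),
                Err(_) => {
                    // brief pause before the next attempt
                    tokio::time::sleep(Duration::from_millis(100 * (attempt as u64 + 1))).await;
                }
            },
        }
    }
    None
}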
async fn flush_deletion_queue(&self) -> Result<(), DeletionQueueError> {
match tokio::time::timeout(
DELETION_QUEUE_FLUSH_TIMEOUT,
@@ -1256,7 +1390,7 @@ impl RemoteTimelineClient {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
UploadOp::UploadLayer(_, _) => {
UploadOp::UploadLayer(..) => {
// Can always be scheduled.
true
}
@@ -1383,13 +1517,25 @@ impl RemoteTimelineClient {
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
let path = layer.local_path();
let local_path = layer.local_path();
// We should only be uploading layers created by this `Tenant`'s lifetime, so
// the metadata in the upload should always match our current generation.
assert_eq!(layer_metadata.generation, self.generation);
let remote_path = remote_layer_path(
&self.tenant_shard_id.tenant_id,
&self.timeline_id,
layer_metadata.shard,
&layer.layer_desc().filename(),
layer_metadata.generation,
);
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
path,
layer_metadata,
self.generation,
local_path,
&remote_path,
layer_metadata.file_size(),
&self.cancel,
)
.measure_remote_op(
@@ -1818,29 +1964,6 @@ pub fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
}
}
/// Files on the remote storage are stored with paths relative to the workdir.
/// That path includes both the tenant and timeline ids, which makes the remote storage path unique.
///
/// Errors if the provided path does not start with pageserver's workdir.
pub fn remote_path(
conf: &PageServerConf,
local_path: &Utf8Path,
generation: Generation,
) -> anyhow::Result<RemotePath> {
let stripped = local_path
.strip_prefix(&conf.workdir)
.context("Failed to strip workdir prefix")?;
let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
format!(
"to resolve remote part of path {:?} for base {:?}",
local_path, conf.workdir
)
})
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1848,6 +1971,7 @@ mod tests {
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::layer::local_layer_path,
Tenant, Timeline,
},
DEFAULT_PG_VERSION,
@@ -2030,11 +2154,20 @@ mod tests {
]
.into_iter()
.map(|(name, contents): (LayerFileName, Vec<u8>)| {
std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
let local_path = local_layer_path(
harness.conf,
&timeline.tenant_shard_id,
&timeline.timeline_id,
&name,
&generation,
);
std::fs::write(&local_path, &contents).unwrap();
Layer::for_resident(
harness.conf,
&timeline,
local_path,
name,
LayerFileMetadata::new(contents.len() as u64, generation, shard),
)
@@ -2171,19 +2304,22 @@ mod tests {
..
} = TestSetup::new("metrics").await.unwrap();
let client = timeline.remote_client.as_ref().unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let local_path = local_layer_path(
harness.conf,
&timeline.tenant_shard_id,
&timeline.timeline_id,
&layer_file_name_1,
&harness.generation,
);
let content_1 = dummy_contents("foo");
std::fs::write(
timeline_path.join(layer_file_name_1.file_name()),
&content_1,
)
.unwrap();
std::fs::write(&local_path, &content_1).unwrap();
let layer_file_1 = Layer::for_resident(
harness.conf,
&timeline,
local_path,
layer_file_name_1.clone(),
LayerFileMetadata::new(content_1.len() as u64, harness.generation, harness.shard),
);
@@ -2252,12 +2388,7 @@ mod tests {
async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart {
// An empty IndexPart, just sufficient to ensure deserialization will succeed
let example_metadata = TimelineMetadata::example();
let example_index_part = IndexPart::new(
HashMap::new(),
example_metadata.disk_consistent_lsn(),
example_metadata,
);
let example_index_part = IndexPart::example();
let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap();

View File

@@ -21,6 +21,7 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::layer::local_layer_path;
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::Generation;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
@@ -55,7 +56,13 @@ pub async fn download_layer_file<'a>(
debug_assert_current_span_has_tenant_and_timeline_id();
let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
let local_path = timeline_path.join(layer_file_name.file_name());
let local_path = local_layer_path(
conf,
&tenant_shard_id,
&timeline_id,
layer_file_name,
&layer_metadata.generation,
);
let remote_path = remote_layer_path(
&tenant_shard_id.tenant_id,

View File

@@ -6,7 +6,6 @@ use std::collections::HashMap;
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use utils::bin_ser::SerializeError;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::storage_layer::LayerFileName;
@@ -104,15 +103,14 @@ impl IndexPart {
pub const FILE_NAME: &'static str = "index_part.json";
pub fn new(
layers_and_metadata: HashMap<LayerFileName, LayerFileMetadata>,
fn new(
layers_and_metadata: &HashMap<LayerFileName, LayerFileMetadata>,
disk_consistent_lsn: Lsn,
metadata: TimelineMetadata,
) -> Self {
// Transform LayerFileMetadata into IndexLayerMetadata
let layer_metadata = layers_and_metadata
.into_iter()
.map(|(k, v)| (k, IndexLayerMetadata::from(v)))
.iter()
.map(|(k, v)| (k.to_owned(), IndexLayerMetadata::from(v)))
.collect();
Self {
@@ -141,20 +139,24 @@ impl IndexPart {
pub fn to_s3_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
#[cfg(test)]
pub(crate) fn example() -> Self {
let example_metadata = TimelineMetadata::example();
Self::new(
&HashMap::new(),
example_metadata.disk_consistent_lsn(),
example_metadata,
)
}
}
impl TryFrom<&UploadQueueInitialized> for IndexPart {
type Error = SerializeError;
impl From<&UploadQueueInitialized> for IndexPart {
fn from(uq: &UploadQueueInitialized) -> Self {
let disk_consistent_lsn = uq.latest_metadata.disk_consistent_lsn();
let metadata = uq.latest_metadata.clone();
fn try_from(upload_queue: &UploadQueueInitialized) -> Result<Self, Self::Error> {
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
let metadata = upload_queue.latest_metadata.clone();
Ok(Self::new(
upload_queue.latest_files.clone(),
disk_consistent_lsn,
metadata,
))
Self::new(&uq.latest_files, disk_consistent_lsn, metadata)
}
}
@@ -172,8 +174,8 @@ pub struct IndexLayerMetadata {
pub shard: ShardIndex,
}
impl From<LayerFileMetadata> for IndexLayerMetadata {
fn from(other: LayerFileMetadata) -> Self {
impl From<&LayerFileMetadata> for IndexLayerMetadata {
fn from(other: &LayerFileMetadata) -> Self {
IndexLayerMetadata {
file_size: other.file_size,
generation: other.generation,

View File

@@ -12,18 +12,13 @@ use tokio_util::sync::CancellationToken;
use utils::backoff;
use super::Generation;
use crate::{
config::PageServerConf,
tenant::remote_timeline_client::{
index::IndexPart, remote_index_path, remote_initdb_archive_path,
remote_initdb_preserved_archive_path, remote_path,
},
use crate::tenant::remote_timeline_client::{
index::IndexPart, remote_index_path, remote_initdb_archive_path,
remote_initdb_preserved_archive_path,
};
use remote_storage::{GenericRemoteStorage, TimeTravelError};
use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
use utils::id::{TenantId, TimelineId};
use super::index::LayerFileMetadata;
use tracing::info;
/// Serializes and uploads the given index part data to the remote storage.
@@ -65,11 +60,10 @@ pub(crate) async fn upload_index_part<'a>(
///
/// On an error, bumps the retries count and reschedules the entire task.
pub(super) async fn upload_timeline_layer<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
source_path: &'a Utf8Path,
known_metadata: &'a LayerFileMetadata,
generation: Generation,
local_path: &'a Utf8Path,
remote_path: &'a RemotePath,
metadata_size: u64,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| {
@@ -78,8 +72,7 @@ pub(super) async fn upload_timeline_layer<'a>(
pausable_failpoint!("before-upload-layer-pausable");
let storage_path = remote_path(conf, source_path, generation)?;
let source_file_res = fs::File::open(&source_path).await;
let source_file_res = fs::File::open(&local_path).await;
let source_file = match source_file_res {
Ok(source_file) => source_file,
Err(e) if e.kind() == ErrorKind::NotFound => {
@@ -90,34 +83,49 @@ pub(super) async fn upload_timeline_layer<'a>(
// it has been written to disk yet.
//
// This is tested against `test_compaction_delete_before_upload`
info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
info!(path = %local_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(());
}
Err(e) => {
Err(e).with_context(|| format!("open a source file for layer {source_path:?}"))?
}
Err(e) => Err(e).with_context(|| format!("open a source file for layer {local_path:?}"))?,
};
let fs_size = source_file
.metadata()
.await
.with_context(|| format!("get the source file metadata for layer {source_path:?}"))?
.with_context(|| format!("get the source file metadata for layer {local_path:?}"))?
.len();
let metadata_size = known_metadata.file_size();
if metadata_size != fs_size {
bail!("File {source_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
bail!("File {local_path:?} has its current FS size {fs_size} diferent from initially determined {metadata_size}");
}
let fs_size = usize::try_from(fs_size)
.with_context(|| format!("convert {source_path:?} size {fs_size} usize"))?;
.with_context(|| format!("convert {local_path:?} size {fs_size} usize"))?;
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
storage
.upload(reader, fs_size, &storage_path, None, cancel)
.upload(reader, fs_size, remote_path, None, cancel)
.await
.with_context(|| format!("upload layer from local path '{source_path}'"))
.with_context(|| format!("upload layer from local path '{local_path}'"))
}
pub(super) async fn copy_timeline_layer(
storage: &GenericRemoteStorage,
source_path: &RemotePath,
target_path: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
fail_point!("before-copy-layer", |_| {
bail!("failpoint before-copy-layer")
});
pausable_failpoint!("before-copy-layer-pausable");
storage
.copy_object(source_path, target_path, cancel)
.await
.with_context(|| format!("copy layer {source_path} to {target_path}"))
}
/// Uploads the given `initdb` data to the remote storage.

View File

@@ -21,8 +21,9 @@ use self::{
use super::{
config::{SecondaryLocationConfig, TenantConfOpt},
mgr::TenantManager,
remote_timeline_client::LayerFileMetadata,
span::debug_assert_current_span_has_tenant_id,
storage_layer::LayerFileName,
storage_layer::{layer::local_layer_path, LayerFileName},
};
use pageserver_api::{
@@ -182,6 +183,7 @@ impl SecondaryTenant {
conf: &PageServerConf,
timeline_id: TimelineId,
name: LayerFileName,
metadata: LayerFileMetadata,
) {
debug_assert_current_span_has_tenant_id();
@@ -195,9 +197,13 @@ impl SecondaryTenant {
let now = SystemTime::now();
let path = conf
.timeline_path(&self.tenant_shard_id, &timeline_id)
.join(name.file_name());
let local_path = local_layer_path(
conf,
&self.tenant_shard_id,
&timeline_id,
&name,
&metadata.generation,
);
let this = self.clone();
@@ -208,7 +214,7 @@ impl SecondaryTenant {
// it, the secondary downloader could have seen an updated heatmap that
// resulted in a layer being deleted.
// Other local I/O errors are process-fatal: these should never happen.
let deleted = std::fs::remove_file(path);
let deleted = std::fs::remove_file(local_path);
let not_found = deleted
.as_ref()

View File

@@ -22,7 +22,7 @@ use crate::{
FAILED_REMOTE_OP_RETRIES,
},
span::debug_assert_current_span_has_tenant_id,
storage_layer::LayerFileName,
storage_layer::{layer::local_layer_path, LayerFileName},
tasks::{warn_when_period_overrun, BackgroundLoopKind},
},
virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
@@ -621,12 +621,12 @@ impl<'a> TenantDownloader<'a> {
let layers_in_heatmap = heatmap_timeline
.layers
.iter()
.map(|l| &l.name)
.map(|l| (&l.name, l.metadata.generation))
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.map(|l| (l.0, l.1.metadata.generation))
.collect::<HashSet<_>>();
let mut layer_count = layers_on_disk.len();
@@ -637,16 +637,24 @@ impl<'a> TenantDownloader<'a> {
.sum();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
for (layer_file_name, generation) in layers_on_disk.difference(&layers_in_heatmap) {
layer_count -= 1;
layer_byte_count -= timeline_state
.on_disk_layers
.get(layer)
.get(layer_file_name)
.unwrap()
.metadata
.file_size();
delete_layers.push((*timeline_id, (*layer).clone()));
let local_path = local_layer_path(
self.conf,
self.secondary_state.get_tenant_shard_id(),
timeline_id,
layer_file_name,
generation,
);
delete_layers.push((*timeline_id, (*layer_file_name).clone(), local_path));
}
progress.bytes_downloaded += layer_byte_count;
@@ -661,11 +669,7 @@ impl<'a> TenantDownloader<'a> {
}
// Execute accumulated deletions
for (timeline_id, layer_name) in delete_layers {
let timeline_path = self
.conf
.timeline_path(self.secondary_state.get_tenant_shard_id(), &timeline_id);
let local_path = timeline_path.join(layer_name.to_string());
for (timeline_id, layer_name, local_path) in delete_layers {
tracing::info!(timeline_id=%timeline_id, "Removing secondary local layer {layer_name} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
@@ -754,9 +758,6 @@ impl<'a> TenantDownloader<'a> {
) -> Result<(), UpdateError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
let timeline_path = self
.conf
.timeline_path(tenant_shard_id, &timeline.timeline_id);
// Accumulate updates to the state
let mut touched = Vec::new();
@@ -806,10 +807,14 @@ impl<'a> TenantDownloader<'a> {
if cfg!(debug_assertions) {
// Debug for https://github.com/neondatabase/neon/issues/6966: check that the files we think
// are already present on disk are really there.
let local_path = self
.conf
.timeline_path(tenant_shard_id, &timeline.timeline_id)
.join(layer.name.file_name());
let local_path = local_layer_path(
self.conf,
tenant_shard_id,
&timeline.timeline_id,
&layer.name,
&layer.metadata.generation,
);
match tokio::fs::metadata(&local_path).await {
Ok(meta) => {
tracing::debug!(
@@ -903,7 +908,13 @@ impl<'a> TenantDownloader<'a> {
};
if downloaded_bytes != layer.metadata.file_size {
let local_path = timeline_path.join(layer.name.to_string());
let local_path = local_layer_path(
self.conf,
tenant_shard_id,
&timeline.timeline_id,
&layer.name,
&layer.metadata.generation,
);
tracing::warn!(
"Downloaded layer {} with unexpected size {} != {}. Removing download.",

View File

@@ -1139,15 +1139,15 @@ impl DeltaLayerInner {
Ok(all_keys)
}
/// Using the given writer, write out a truncated version, where LSNs higher than the
/// truncate_at are missing.
#[cfg(test)]
/// Using the given writer, write out a version containing only the values with Lsns earlier than `until`.
///
/// Returns the number of key/value records pushed to the writer.
pub(super) async fn copy_prefix(
&self,
writer: &mut DeltaLayerWriter,
truncate_at: Lsn,
until: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<usize> {
use crate::tenant::vectored_blob_io::{
BlobMeta, VectoredReadBuilder, VectoredReadExtended,
};
@@ -1211,6 +1211,8 @@ impl DeltaLayerInner {
// FIXME: buffering of DeltaLayerWriter
let mut per_blob_copy = Vec::new();
let mut records = 0;
while let Some(item) = stream.try_next().await? {
tracing::debug!(?item, "popped");
let offset = item
@@ -1229,7 +1231,7 @@ impl DeltaLayerInner {
prev = Option::from(item);
let actionable = actionable.filter(|x| x.0.lsn < truncate_at);
let actionable = actionable.filter(|x| x.0.lsn < until);
let builder = if let Some((meta, offsets)) = actionable {
// extend or create a new builder
@@ -1297,7 +1299,7 @@ impl DeltaLayerInner {
let will_init = crate::repository::ValueBytes::will_init(data)
.inspect_err(|_e| {
#[cfg(feature = "testing")]
tracing::error!(data=?utils::Hex(data), err=?_e, "failed to parse will_init out of serialized value");
tracing::error!(data=?utils::Hex(data), err=?_e, %key, %lsn, "failed to parse will_init out of serialized value");
})
.unwrap_or(false);
@@ -1314,7 +1316,10 @@ impl DeltaLayerInner {
)
.await;
per_blob_copy = tmp;
res?;
records += 1;
}
buffer = Some(res.buf);
@@ -1326,7 +1331,7 @@ impl DeltaLayerInner {
"with the sentinel above loop should had handled all"
);
Ok(())
Ok(records)
}
pub(super) async fn dump(&self, ctx: &RequestContext) -> anyhow::Result<()> {
@@ -1399,7 +1404,6 @@ impl DeltaLayerInner {
Ok(())
}
#[cfg(test)]
fn stream_index_forwards<'a, R>(
&'a self,
reader: &'a DiskBtreeReader<R, DELTA_KEY_SIZE>,

View File

@@ -2,11 +2,13 @@
//! Helper functions for dealing with filenames of the image and delta layer files.
//!
use crate::repository::Key;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::fmt;
use std::ops::Range;
use std::str::FromStr;
use regex::Regex;
use utils::lsn::Lsn;
use super::PersistentLayerDesc;
@@ -74,10 +76,19 @@ impl DeltaFileName {
let key_end_str = key_parts.next()?;
let lsn_start_str = lsn_parts.next()?;
let lsn_end_str = lsn_parts.next()?;
if parts.next().is_some() || key_parts.next().is_some() || key_parts.next().is_some() {
return None;
}
if key_start_str.len() != 36
|| key_end_str.len() != 36
|| lsn_start_str.len() != 16
|| lsn_end_str.len() != 16
{
return None;
}
let key_start = Key::from_hex(key_start_str).ok()?;
let key_end = Key::from_hex(key_end_str).ok()?;
@@ -182,6 +193,10 @@ impl ImageFileName {
return None;
}
if key_start_str.len() != 36 || key_end_str.len() != 36 || lsn_str.len() != 16 {
return None;
}
let key_start = Key::from_hex(key_start_str).ok()?;
let key_end = Key::from_hex(key_end_str).ok()?;
@@ -259,9 +274,22 @@ impl From<DeltaFileName> for LayerFileName {
impl FromStr for LayerFileName {
type Err = String;
/// Conversion from either a physical layer filename, or the string representation of
/// Self. When loading a physical layer filename, we drop any extra information
/// not needed to build Self.
fn from_str(value: &str) -> Result<Self, Self::Err> {
let delta = DeltaFileName::parse_str(value);
let image = ImageFileName::parse_str(value);
let gen_suffix_regex = Regex::new("^(?<base>.+)-(?<gen>[0-9a-f]{8})$").unwrap();
let file_name: Cow<str> = match gen_suffix_regex.captures(value) {
Some(captures) => captures
.name("base")
.expect("Non-optional group")
.as_str()
.into(),
None => value.into(),
};
let delta = DeltaFileName::parse_str(&file_name);
let image = ImageFileName::parse_str(&file_name);
let ok = match (delta, image) {
(None, None) => {
return Err(format!(
@@ -315,3 +343,42 @@ impl<'de> serde::de::Visitor<'de> for LayerFileNameVisitor {
v.parse().map_err(|e| E::custom(e))
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn image_layer_parse() -> anyhow::Result<()> {
let expected = LayerFileName::Image(ImageFileName {
key_range: Key::from_i128(0)
..Key::from_hex("000000067F00000001000004DF0000000006").unwrap(),
lsn: Lsn::from_hex("00000000014FED58").unwrap(),
});
let parsed = LayerFileName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-00000001").map_err(|s| anyhow::anyhow!(s))?;
assert_eq!(parsed, expected,);
// Omitting generation suffix is valid
let parsed = LayerFileName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58").map_err(|s| anyhow::anyhow!(s))?;
assert_eq!(parsed, expected,);
Ok(())
}
#[test]
fn delta_layer_parse() -> anyhow::Result<()> {
let expected = LayerFileName::Delta(DeltaFileName {
key_range: Key::from_i128(0)
..Key::from_hex("000000067F00000001000004DF0000000006").unwrap(),
lsn_range: Lsn::from_hex("00000000014FED58").unwrap()
..Lsn::from_hex("000000000154C481").unwrap(),
});
let parsed = LayerFileName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481-00000001").map_err(|s| anyhow::anyhow!(s))?;
assert_eq!(parsed, expected);
// Omitting generation suffix is valid
let parsed = LayerFileName::from_str("000000000000000000000000000000000000-000000067F00000001000004DF0000000006__00000000014FED58-000000000154C481").map_err(|s| anyhow::anyhow!(s))?;
assert_eq!(parsed, expected);
Ok(())
}
}
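The FromStr above accepts both forms because physical layer files may now carry a trailing 8-hex-digit generation suffix, which is stripped before delegating to the delta/image parsers. A minimal standalone sketch of just the suffix handling, using the same regex:

use regex::Regex;

/// Sketch only: drop a trailing "-xxxxxxxx" generation suffix if present.
fn strip_generation_suffix(value: &str) -> &str {
    let re = Regex::new("^(?<base>.+)-(?<gen>[0-9a-f]{8})$").unwrap();
    match re.captures(value) {
        Some(caps) => caps.name("base").expect("non-optional group").as_str(),
        None => value,
    }
}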

View File

@@ -4,12 +4,13 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
use pageserver_api::shard::ShardIndex;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use tracing::Instrument;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::sync::heavier_once_cell;
@@ -123,6 +124,25 @@ impl PartialEq for Layer {
}
}
pub(crate) fn local_layer_path(
conf: &PageServerConf,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
layer_file_name: &LayerFileName,
_generation: &Generation,
) -> Utf8PathBuf {
let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
timeline_path.join(layer_file_name.file_name())
// TODO: include the generation in the name in now+1 releases.
// timeline_path.join(format!(
// "{}{}",
// layer_file_name.file_name(),
// generation.get_suffix()
// ))
}
impl Layer {
/// Creates a layer value for a file we know to not be resident.
pub(crate) fn for_evicted(
@@ -131,6 +151,14 @@ impl Layer {
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> Self {
let local_path = local_layer_path(
conf,
&timeline.tenant_shard_id,
&timeline.timeline_id,
&file_name,
&metadata.generation,
);
let desc = PersistentLayerDesc::from_filename(
timeline.tenant_shard_id,
timeline.timeline_id,
@@ -143,6 +171,7 @@ impl Layer {
let owner = Layer(Arc::new(LayerInner::new(
conf,
timeline,
local_path,
access_stats,
desc,
None,
@@ -159,6 +188,7 @@ impl Layer {
pub(crate) fn for_resident(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
local_path: Utf8PathBuf,
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> ResidentLayer {
@@ -184,6 +214,7 @@ impl Layer {
LayerInner::new(
conf,
timeline,
local_path,
access_stats,
desc,
Some(inner),
@@ -225,9 +256,19 @@ impl Layer {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
let local_path = local_layer_path(
conf,
&timeline.tenant_shard_id,
&timeline.timeline_id,
&desc.filename(),
&timeline.generation,
);
LayerInner::new(
conf,
timeline,
local_path,
access_stats,
desc,
Some(inner),
@@ -410,6 +451,13 @@ impl Layer {
self.0.metadata()
}
pub(crate) fn get_timeline_id(&self) -> Option<TimelineId> {
self.0
.timeline
.upgrade()
.map(|timeline| timeline.timeline_id)
}
/// Traditional debug dumping facility
#[allow(unused)]
pub(crate) async fn dump(&self, verbose: bool, ctx: &RequestContext) -> anyhow::Result<()> {
@@ -709,19 +757,17 @@ impl Drop for LayerInner {
}
impl LayerInner {
#[allow(clippy::too_many_arguments)]
fn new(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
local_path: Utf8PathBuf,
access_stats: LayerAccessStats,
desc: PersistentLayerDesc,
downloaded: Option<Arc<DownloadedLayer>>,
generation: Generation,
shard: ShardIndex,
) -> Self {
let path = conf
.timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id)
.join(desc.filename().to_string());
let (inner, version, init_status) = if let Some(inner) = downloaded {
let version = inner.version;
let resident = ResidentOrWantedEvicted::Resident(inner);
@@ -737,7 +783,7 @@ impl LayerInner {
LayerInner {
conf,
debug_str: { format!("timelines/{}/{}", timeline.timeline_id, desc.filename()).into() },
path,
path: local_path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
@@ -1797,25 +1843,23 @@ impl ResidentLayer {
}
}
/// FIXME: truncate is a bad name because we are not truncating anything, but copying the
/// filtered parts.
#[cfg(test)]
pub(super) async fn copy_delta_prefix(
/// Returns the number of keys and values written to the writer.
pub(crate) async fn copy_delta_prefix(
&self,
writer: &mut super::delta_layer::DeltaLayerWriter,
truncate_at: Lsn,
until: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<usize> {
use LayerKind::*;
let owner = &self.owner.0;
match self.downloaded.get(owner, ctx).await? {
Delta(ref d) => d
.copy_prefix(writer, truncate_at, ctx)
.copy_prefix(writer, until, ctx)
.await
.with_context(|| format!("truncate {self}")),
Image(_) => anyhow::bail!(format!("cannot truncate image layer {self}")),
.with_context(|| format!("copy_delta_prefix until {until} of {self}")),
Image(_) => anyhow::bail!(format!("cannot copy_lsn_prefix of image layer {self}")),
}
}

View File

@@ -2,6 +2,7 @@
//! such as compaction and GC
use std::ops::ControlFlow;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -9,9 +10,11 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::config::defaults::DEFAULT_COMPACTION_PERIOD;
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{backoff, completion};
@@ -44,6 +47,7 @@ pub(crate) enum BackgroundLoopKind {
Compaction,
Gc,
Eviction,
IngestHouseKeeping,
ConsumptionMetricsCollectMetrics,
ConsumptionMetricsSyntheticSizeWorker,
InitialLogicalSizeCalculation,
@@ -132,6 +136,30 @@ pub fn start_background_loops(
}
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::IngestHousekeeping,
Some(tenant_shard_id),
None,
&format!("ingest housekeeping for tenant {tenant_shard_id}"),
false,
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
ingest_housekeeping_loop(tenant, cancel)
.instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
.await;
Ok(())
}
},
);
}
///
@@ -379,6 +407,61 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
// We run ingest housekeeping with the same frequency as compaction: it is not worth
// having a distinct setting. But we don't run it in the same task, because compaction
// blocks on acquiring the background job semaphore.
let period = tenant.get_compaction_period();
// If compaction period is set to zero (to disable it), then we will use a reasonable default
let period = if period == Duration::ZERO {
humantime::Duration::from_str(DEFAULT_COMPACTION_PERIOD)
.unwrap()
.into()
} else {
period
};
// Jitter the period by +/- 5%
let period =
rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100);
// Always sleep first: we do not need to do ingest housekeeping early in the lifetime of
// a tenant, since it won't have started writing any ephemeral files yet.
if tokio::time::timeout(period, cancel.cancelled())
.await
.is_ok()
{
break;
}
let started_at = Instant::now();
tenant.ingest_housekeeping().await;
warn_when_period_overrun(
started_at.elapsed(),
period,
BackgroundLoopKind::IngestHouseKeeping,
);
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
@@ -420,8 +503,6 @@ pub(crate) async fn random_init_delay(
period: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
use rand::Rng;
if period == Duration::ZERO {
return Ok(());
}

View File

@@ -1,5 +1,6 @@
mod compaction;
pub mod delete;
pub(crate) mod detach_ancestor;
mod eviction_task;
mod init;
pub mod layer_manager;
@@ -22,8 +23,9 @@ use pageserver_api::{
},
keyspace::{KeySpaceAccum, SparseKeyPartitioning},
models::{
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, TimelineState,
AuxFilePolicy, CompactionAlgorithm, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -58,6 +60,7 @@ use std::{
ops::ControlFlow,
};
use crate::tenant::storage_layer::layer::local_layer_path;
use crate::tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
@@ -862,9 +865,13 @@ impl Timeline {
// Initialise the reconstruct state for the key with the cache
// entry returned above.
let mut reconstruct_state = ValuesReconstructState::new();
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
// Only add the cached image to the reconstruct state when it exists.
if cached_page_img.is_some() {
let mut key_state = VectoredValueReconstructState::default();
key_state.img = cached_page_img;
reconstruct_state.keys.insert(key, Ok(key_state));
}
let vectored_res = self
.get_vectored_impl(keyspace.clone(), lsn, reconstruct_state, ctx)
@@ -1076,7 +1083,7 @@ impl Timeline {
// We should generalize this into Keyspace::contains in the future.
for range in &keyspace.ranges {
if range.start.field1 < METADATA_KEY_BEGIN_PREFIX
|| range.end.field1 >= METADATA_KEY_END_PREFIX
|| range.end.field1 > METADATA_KEY_END_PREFIX
{
return Err(GetVectoredError::Other(anyhow::anyhow!(
"only metadata keyspace can be scanned"
@@ -1494,15 +1501,21 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_and_flush0().await
}
// This exists to provide a non-span-creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> anyhow::Result<()> {
let to_lsn = self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait(to_lsn).await
}
/// If there is no writer, and conditions for rolling the latest layer are met, then freeze it.
///
/// This is for use in background housekeeping, to provide guarantees of layers closing eventually
/// even if there are no ongoing writes to drive that.
async fn maybe_freeze_ephemeral_layer(&self) {
// Check if an open ephemeral layer should be closed: this provides
// background enforcement of checkpoint interval if there is no active WAL receiver, to avoid keeping
// an ephemeral layer open forever when idle. It also freezes layers if the global limit on
// ephemeral layer bytes has been breached.
pub(super) async fn maybe_freeze_ephemeral_layer(&self) {
let Ok(_write_guard) = self.write_lock.try_lock() else {
// If the write lock is held, there is an active wal receiver: rolling open layers
// is their responsibility while they hold this lock.
@@ -1529,13 +1542,11 @@ impl Timeline {
// we are a sharded tenant and have skipped some WAL
let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
if last_freeze_ts.elapsed() >= self.get_checkpoint_timeout() {
// This should be somewhat rare, so we log it at INFO level.
//
// We checked for checkpoint timeout so that a shard without any
// data ingested (yet) doesn't write a remote index as soon as it
// Only do this if we have been layer-less longer than get_checkpoint_timeout, so that a shard
// without any data ingested (yet) doesn't write a remote index as soon as it
// sees its LSN advance: we only do this if we've been layer-less
// for some time.
tracing::info!(
tracing::debug!(
"Advancing disk_consistent_lsn past WAL ingest gap {} -> {}",
disk_consistent_lsn,
last_record_lsn
@@ -1625,11 +1636,6 @@ impl Timeline {
(guard, permit)
};
// Prior to compaction, check if an open ephemeral layer should be closed: this provides
// background enforcement of checkpoint interval if there is no active WAL receiver, to avoid keeping
// an ephemeral layer open forever when idle.
self.maybe_freeze_ephemeral_layer().await;
// this wait probably never needs any "long time spent" logging, because we already nag if
// compaction task goes over its period (20s), which is quite often in production.
let (_guard, _permit) = tokio::select! {
@@ -1899,7 +1905,7 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
pub(crate) async fn download_layer(
&self,
layer_file_name: &str,
layer_file_name: &LayerFileName,
) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name).await else {
return Ok(None);
@@ -1917,7 +1923,10 @@ impl Timeline {
/// Evict just one layer.
///
/// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
pub(crate) async fn evict_layer(
&self,
layer_file_name: &LayerFileName,
) -> anyhow::Result<Option<bool>> {
let _gate = self
.gate
.enter()
@@ -1991,13 +2000,12 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
// Private functions
impl Timeline {
#[allow(dead_code)]
pub(crate) fn get_switch_to_aux_file_v2(&self) -> bool {
pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.switch_to_aux_file_v2
.unwrap_or(self.conf.default_tenant_conf.switch_to_aux_file_v2)
.switch_aux_file_policy
.unwrap_or(self.conf.default_tenant_conf.switch_aux_file_policy)
}
pub(crate) fn get_lazy_slru_download(&self) -> bool {
@@ -2409,8 +2417,8 @@ impl Timeline {
for discovered in discovered {
let (name, kind) = match discovered {
Discovered::Layer(file_name, file_size) => {
discovered_layers.push((file_name, file_size));
Discovered::Layer(layer_file_name, local_path, file_size) => {
discovered_layers.push((layer_file_name, local_path, file_size));
continue;
}
Discovered::Metadata => {
@@ -2455,7 +2463,7 @@ impl Timeline {
let mut needs_cleanup = Vec::new();
let mut total_physical_size = 0;
for (name, decision) in decided {
for (name, local_path, decision) in decided {
let decision = match decision {
Ok(UseRemote { local, remote }) => {
// Remote is authoritative, but we may still choose to retain
@@ -2465,26 +2473,23 @@ impl Timeline {
// the correct generation.
UseLocal(remote)
} else {
path.push(name.file_name());
init::cleanup_local_file_for_remote(&path, &local, &remote)?;
path.pop();
let local_path = local_path.as_ref().expect("Locally found layer must have path");
init::cleanup_local_file_for_remote(local_path, &local, &remote)?;
UseRemote { local, remote }
}
}
Ok(decision) => decision,
Err(DismissedLayer::Future { local }) => {
if local.is_some() {
path.push(name.file_name());
init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
path.pop();
let local_path = local_path.expect("Locally found layer must have path");
init::cleanup_future_layer(&local_path, &name, disk_consistent_lsn)?;
}
needs_cleanup.push(name);
continue;
}
Err(DismissedLayer::LocalOnly(local)) => {
path.push(name.file_name());
init::cleanup_local_only_file(&path, &name, &local)?;
path.pop();
let local_path = local_path.expect("Locally found layer must have path");
init::cleanup_local_only_file(&local_path, &name, &local)?;
// this file never existed remotely, we will have to do rework
continue;
}
@@ -2500,7 +2505,18 @@ impl Timeline {
let layer = match decision {
UseLocal(m) => {
total_physical_size += m.file_size();
Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
let local_path = local_path.unwrap_or_else(|| {
local_layer_path(
conf,
&this.tenant_shard_id,
&this.timeline_id,
&name,
&m.generation,
)
});
Layer::for_resident(conf, &this, local_path, name, m).drop_eviction_guard()
}
Evicted(remote) | UseRemote { remote, .. } => {
Layer::for_evicted(conf, &this, name, remote)
@@ -2981,11 +2997,11 @@ impl Timeline {
}
}
async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
async fn find_layer(&self, layer_name: &LayerFileName) -> Option<Layer> {
let guard = self.layers.read().await;
for historic_layer in guard.layer_map().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
let historic_layer_name = historic_layer.filename();
if layer_name == &historic_layer_name {
return Some(guard.get_from_desc(&historic_layer));
}
}
@@ -3015,7 +3031,7 @@ impl Timeline {
HeatMapLayer::new(
layer.layer_desc().filename(),
layer.metadata().into(),
(&layer.metadata()).into(),
last_activity_ts,
)
});
@@ -3517,7 +3533,7 @@ impl Timeline {
Ok(ancestor)
}
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
pub(crate) fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
format!(
"Ancestor is missing. Timeline id: {} Ancestor id {:?}",
@@ -4333,6 +4349,49 @@ impl Timeline {
_ = self.cancel.cancelled() => {}
)
}
/// Detach this timeline from its ancestor by copying all of the ancestor's layers as this
/// Timeline's layers, up to the ancestor_lsn.
///
/// Requires a timeline that:
/// - has an ancestor to detach from
/// - the ancestor does not have an ancestor -- follows from the original RFC limitations, not
/// a technical requirement
/// - has prev_lsn in remote storage (temporary restriction)
///
/// After the operation has been started, it cannot be canceled. Upon restart it needs to be
/// polled again until completion.
///
/// During the operation all timelines sharing the data with this timeline will be reparented
/// from our ancestor to be branches of this timeline.
pub(crate) async fn prepare_to_detach_from_ancestor(
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
options: detach_ancestor::Options,
ctx: &RequestContext,
) -> Result<
(
completion::Completion,
detach_ancestor::PreparedTimelineDetach,
),
detach_ancestor::Error,
> {
detach_ancestor::prepare(self, tenant, options, ctx).await
}
/// Completes the ancestor detach. This method is to be called while holding the
/// TenantManager's tenant slot, so during this method we cannot be deleted nor can any
/// timeline be deleted. After this method returns successfully, tenant must be reloaded.
///
/// Pageserver receiving a SIGKILL during this operation is not supported (yet).
pub(crate) async fn complete_detaching_timeline_ancestor(
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
detach_ancestor::complete(self, tenant, prepared, ctx).await
}
}
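Taken together, a caller drives the two entry points roughly as follows. This is a hedged sketch: only the two method names, `detach_ancestor::Options`, and the return types come from the code above; the surrounding function and its plumbing are hypothetical.
use std::sync::Arc;
// Assumes the module's own Timeline, Tenant, RequestContext and
// detach_ancestor types are in scope.
async fn detach_timeline(
    timeline: &Arc<Timeline>,
    tenant: &Tenant,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let (completion, prepared) = timeline
        .prepare_to_detach_from_ancestor(tenant, detach_ancestor::Options::default(), ctx)
        .await?;
    let reparented = timeline
        .complete_detaching_timeline_ancestor(tenant, prepared, ctx)
        .await?;
    tracing::info!(?reparented, "detached from ancestor");
    // Dropping the completion presumably releases the per-tenant
    // "detach ongoing" marker acquired during prepare.
    drop(completion);
    Ok(())
}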
/// Top-level failure to compact.
@@ -4441,6 +4500,24 @@ impl Timeline {
Ok(())
}
async fn rewrite_layers(
self: &Arc<Self>,
replace_layers: Vec<(Layer, ResidentLayer)>,
drop_layers: Vec<Layer>,
) -> anyhow::Result<()> {
let mut guard = self.layers.write().await;
guard.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);
let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&drop_layers, &upload_layers)?;
}
Ok(())
}
/// Schedules the uploads of the given image layers
fn upload_new_image_layers(
self: &Arc<Self>,
@@ -4599,6 +4676,8 @@ impl Timeline {
retain_lsns: Vec<Lsn>,
new_gc_cutoff: Lsn,
) -> anyhow::Result<GcResult> {
// FIXME: if there is an ongoing detach_from_ancestor, we should just skip gc
let now = SystemTime::now();
let mut result: GcResult = GcResult::default();

View File

@@ -15,7 +15,8 @@ use anyhow::{anyhow, Context};
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, trace, warn, Instrument};
use utils::id::TimelineId;
@@ -93,7 +94,7 @@ impl Timeline {
// Define partitioning schema if needed
// FIXME: the match should only cover repartitioning, not the next steps
match self
let partition_count = match self
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
@@ -146,6 +147,7 @@ impl Timeline {
assert!(sparse_layers.is_empty());
self.upload_new_image_layers(dense_layers)?;
dense_partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -157,9 +159,150 @@ impl Timeline {
if !self.cancel.is_cancelled() {
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
}
1
}
};
if self.shard_identity.count >= ShardCount::new(2) {
// Limit the number of layer rewrites to the number of partitions: this means its
// runtime should be comparable to a full round of image layer creations, rather than
// being potentially much longer.
let rewrite_max = partition_count;
self.compact_shard_ancestors(rewrite_max, ctx).await?;
}
Ok(())
}
/// Check for layers that are eligible to be rewritten:
/// - Shard splitting: after a shard split, rewrite ancestor layers beyond pitr_interval, so that
/// we don't indefinitely retain keys in this shard that aren't needed.
/// - For future use: layers beyond pitr_interval that are in formats we would
/// rather not maintain compatibility with indefinitely.
///
/// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
/// how much work it will try to do in each compaction pass.
async fn compact_shard_ancestors(
self: &Arc<Self>,
rewrite_max: usize,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut drop_layers = Vec::new();
let layers_to_rewrite: Vec<Layer> = Vec::new();
// We will use the PITR cutoff as a condition for rewriting layers.
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.pitr;
let layers = self.layers.read().await;
for layer_desc in layers.layer_map().iter_historic_layers() {
let layer = layers.get_from_desc(&layer_desc);
if layer.metadata().shard.shard_count == self.shard_identity.count {
// This layer does not belong to a historic ancestor, no need to re-image it.
continue;
}
// This layer was created on an ancestor shard: check if it contains any data for this shard.
let sharded_range = ShardedRange::new(layer_desc.get_key_range(), &self.shard_identity);
let layer_local_page_count = sharded_range.page_count();
let layer_raw_page_count = ShardedRange::raw_size(&layer_desc.get_key_range());
if layer_local_page_count == 0 {
// This ancestral layer only covers keys that belong to other shards.
// We include the full metadata in the log: if we had some critical bug that caused
// us to incorrectly drop layers, this would simplify manual debugging + reinstating those layers.
info!(%layer, old_metadata=?layer.metadata(),
"dropping layer after shard split, contains no keys for this shard.",
);
if cfg!(debug_assertions) {
// Expensive, exhaustive check of keys in this layer: this guards against ShardedRange's calculations being
// wrong. If ShardedRange claims the local page count is zero, then no keys in this layer
// should be !is_key_disposable()
let range = layer_desc.get_key_range();
let mut key = range.start;
while key < range.end {
debug_assert!(self.shard_identity.is_key_disposable(&key));
key = key.next();
}
}
drop_layers.push(layer);
continue;
} else if layer_local_page_count != u32::MAX
&& layer_local_page_count == layer_raw_page_count
{
debug!(%layer,
"layer is entirely shard local ({} keys), no need to filter it",
layer_local_page_count
);
continue;
}
// Don't bother re-writing a layer unless it will at least halve its size
if layer_local_page_count != u32::MAX
&& layer_local_page_count > layer_raw_page_count / 2
{
debug!(%layer,
"layer is already mostly local ({}/{}), not rewriting",
layer_local_page_count,
layer_raw_page_count
);
continue;
}
// Don't bother re-writing a layer if it is within the PITR window: it will age out eventually
// without incurring the I/O cost of a rewrite.
if layer_desc.get_lsn_range().end >= pitr_cutoff {
debug!(%layer, "Skipping rewrite of layer still in PITR window ({} >= {})",
layer_desc.get_lsn_range().end, pitr_cutoff);
continue;
}
if layer_desc.is_delta() {
// We do not yet implement rewrite of delta layers
debug!(%layer, "Skipping rewrite of delta layer");
continue;
}
// Only rewrite layers if they would have different remote paths: either they belong to this
// shard but an old generation, or they belonged to another shard. This also implicitly
// guarantees that the layer is persistent in remote storage (as only remote persistent
// layers are carried across shard splits, any local-only layer would be in the current generation)
if layer.metadata().generation == self.generation
&& layer.metadata().shard.shard_count == self.shard_identity.count
{
debug!(%layer, "Skipping rewrite, is not from old generation");
continue;
}
if layers_to_rewrite.len() >= rewrite_max {
tracing::info!(%layer, "Will rewrite layer on a future compaction, already rewrote {}",
layers_to_rewrite.len()
);
continue;
}
// Fall through: all our conditions for doing a rewrite passed.
// TODO: implement rewriting
tracing::debug!(%layer, "Would rewrite layer");
}
// Drop the layers read lock: we will acquire it for write in [`Self::rewrite_layers`]
drop(layers);
// TODO: collect layers to rewrite
let replace_layers = Vec::new();
// Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
self.rewrite_layers(replace_layers, drop_layers).await?;
if let Some(remote_client) = self.remote_client.as_ref() {
// We wait for all uploads to complete before finishing this compaction stage. This is not
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
remote_client.wait_completion().await?;
}
Ok(())
}
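The eligibility ladder above can be summarized as a decision function. A hedged, self-contained sketch (the enum and argument names are illustrative only; `UNKNOWN` mirrors the `u32::MAX` sentinel the code treats as "page count not exactly known"):
enum RewriteVerdict {
    Drop,    // layer covers no keys for this shard
    Keep,    // leave as-is for now
    Rewrite, // candidate for re-imaging into this shard
}
fn classify_ancestor_layer(
    local_pages: u32,
    raw_pages: u32,
    still_in_pitr: bool,
    is_delta: bool,
    same_shard_and_generation: bool,
) -> RewriteVerdict {
    const UNKNOWN: u32 = u32::MAX;
    if local_pages == 0 {
        RewriteVerdict::Drop
    } else if local_pages != UNKNOWN && local_pages == raw_pages {
        RewriteVerdict::Keep // already entirely shard-local
    } else if local_pages != UNKNOWN && local_pages > raw_pages / 2 {
        RewriteVerdict::Keep // a rewrite would not even halve the size
    } else if still_in_pitr || is_delta || same_shard_and_generation {
        RewriteVerdict::Keep // ages out / rewrite unimplemented / same remote path
    } else {
        RewriteVerdict::Rewrite
    }
}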

View File

@@ -422,6 +422,10 @@ impl DeleteTimelineFlow {
pub(crate) fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
pub(crate) fn is_not_started(&self) -> bool {
matches!(self, Self::NotStarted)
}
}
struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);

View File

@@ -0,0 +1,550 @@
use std::sync::Arc;
use super::{layer_manager::LayerManager, Timeline};
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
tenant::{
storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
Tenant,
},
virtual_file::{MaybeFatalIo, VirtualFile},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{completion, generation::Generation, id::TimelineId, lsn::Lsn};
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
#[error("no ancestors")]
NoAncestor,
#[error("too many ancestors")]
TooManyAncestors,
#[error("shutting down, please retry later")]
ShuttingDown,
#[error("detached timeline must receive writes before the operation")]
DetachedTimelineNeedsWrites,
#[error("flushing failed")]
FlushAncestor(#[source] anyhow::Error),
#[error("layer download failed")]
RewrittenDeltaDownloadFailed(#[source] anyhow::Error),
#[error("copying LSN prefix locally failed")]
CopyDeltaPrefix(#[source] anyhow::Error),
#[error("upload rewritten layer")]
UploadRewritten(#[source] anyhow::Error),
#[error("ancestor is already being detached by: {}", .0)]
OtherTimelineDetachOngoing(TimelineId),
#[error("remote copying layer failed")]
CopyFailed(#[source] anyhow::Error),
#[error("unexpected error")]
Unexpected(#[source] anyhow::Error),
}
pub(crate) struct PreparedTimelineDetach {
layers: Vec<Layer>,
}
/// TODO: this should be part of PageserverConf because we cannot easily modify cplane arguments.
#[derive(Debug)]
pub(crate) struct Options {
pub(crate) rewrite_concurrency: std::num::NonZeroUsize,
pub(crate) copy_concurrency: std::num::NonZeroUsize,
}
impl Default for Options {
fn default() -> Self {
Self {
rewrite_concurrency: std::num::NonZeroUsize::new(2).unwrap(),
copy_concurrency: std::num::NonZeroUsize::new(10).unwrap(),
}
}
}
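A hypothetical call site overriding one knob while keeping the other default (only the struct itself comes from the code above):
let options = Options {
    copy_concurrency: std::num::NonZeroUsize::new(32).unwrap(),
    ..Options::default()
};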
/// See [`Timeline::prepare_to_detach_from_ancestor`]
pub(super) async fn prepare(
detached: &Arc<Timeline>,
tenant: &Tenant,
options: Options,
ctx: &RequestContext,
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
use Error::*;
if detached.remote_client.as_ref().is_none() {
unimplemented!("no new code for running without remote storage");
}
let Some((ancestor, ancestor_lsn)) = detached
.ancestor_timeline
.as_ref()
.map(|tl| (tl.clone(), detached.ancestor_lsn))
else {
return Err(NoAncestor);
};
if !ancestor_lsn.is_valid() {
return Err(NoAncestor);
}
if ancestor.ancestor_timeline.is_some() {
// non-technical requirement; we could flatten N ancestors just as easily but we chose
// not to
return Err(TooManyAncestors);
}
if detached.get_prev_record_lsn() == Lsn::INVALID
|| detached.disk_consistent_lsn.load() == ancestor_lsn
{
// this is to avoid a problem that after detaching we would be unable to start up the
// compute because of "PREV_LSN: invalid".
return Err(DetachedTimelineNeedsWrites);
}
// before we acquire the gate, we must mark the ancestor as having a detach operation
// ongoing which will block other concurrent detach operations so we don't get into awkward
// situations where there would be two branches trying to reparent earlier branches.
let (guard, barrier) = completion::channel();
{
let mut guard = tenant.ongoing_timeline_detach.lock().unwrap();
if let Some((tl, other)) = guard.as_ref() {
if !other.is_ready() {
return Err(OtherTimelineDetachOngoing(*tl));
}
}
*guard = Some((detached.timeline_id, barrier));
}
let _gate_entered = detached.gate.enter().map_err(|_| ShuttingDown)?;
if ancestor_lsn >= ancestor.get_disk_consistent_lsn() {
let span =
tracing::info_span!("freeze_and_flush", ancestor_timeline_id=%ancestor.timeline_id);
async {
let started_at = std::time::Instant::now();
let freeze_and_flush = ancestor.freeze_and_flush0();
let mut freeze_and_flush = std::pin::pin!(freeze_and_flush);
let res =
tokio::time::timeout(std::time::Duration::from_secs(1), &mut freeze_and_flush)
.await;
let res = match res {
Ok(res) => res,
Err(_elapsed) => {
tracing::info!("freezing and flushing ancestor is still ongoing");
freeze_and_flush.await
}
};
res.map_err(FlushAncestor)?;
// we do not need to wait for uploads to complete, but we do need `struct Layer`:
// copying the delta prefix is currently unsupported for `InMemoryLayer`.
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
"froze and flushed the ancestor"
);
Ok(())
}
.instrument(span)
.await?;
}
let end_lsn = ancestor_lsn + 1;
let (filtered_layers, straddling_branchpoint, rest_of_historic) = {
// we do not need to start from our layers, because they can only be layers that come
// *after* ancestor_lsn
let layers = tokio::select! {
guard = ancestor.layers.read() => guard,
_ = detached.cancel.cancelled() => {
return Err(ShuttingDown);
}
_ = ancestor.cancel.cancelled() => {
return Err(ShuttingDown);
}
};
// between retries, these can change if compaction or gc ran in between. this will mean
// we have to redo work.
partition_work(ancestor_lsn, &layers)
};
// TODO: layers are already sorted by something: use that to determine how much of remote
// copies are already done.
tracing::info!(filtered=%filtered_layers, to_rewrite = straddling_branchpoint.len(), historic=%rest_of_historic.len(), "collected layers");
// TODO: copying and lsn prefix copying could be done at the same time with a single fsync after
let mut new_layers: Vec<Layer> =
Vec::with_capacity(straddling_branchpoint.len() + rest_of_historic.len());
{
tracing::debug!(to_rewrite = %straddling_branchpoint.len(), "copying prefix of delta layers");
let mut tasks = tokio::task::JoinSet::new();
let mut wrote_any = false;
let limiter = Arc::new(tokio::sync::Semaphore::new(
options.rewrite_concurrency.get(),
));
for layer in straddling_branchpoint {
let limiter = limiter.clone();
let timeline = detached.clone();
let ctx = ctx.detached_child(TaskKind::DetachAncestor, DownloadBehavior::Download);
tasks.spawn(async move {
let _permit = limiter.acquire().await;
let copied =
upload_rewritten_layer(end_lsn, &layer, &timeline, &timeline.cancel, &ctx)
.await?;
Ok(copied)
});
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(Ok(Some(copied))) => {
wrote_any = true;
tracing::info!(layer=%copied, "rewrote and uploaded");
new_layers.push(copied);
}
Ok(Ok(None)) => {}
Ok(Err(e)) => return Err(e),
Err(je) => return Err(Unexpected(je.into())),
}
}
// FIXME: the fsync should be mandatory, after both rewrites and copies
if wrote_any {
let timeline_dir = VirtualFile::open(
&detached
.conf
.timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
}
}
let mut tasks = tokio::task::JoinSet::new();
let limiter = Arc::new(tokio::sync::Semaphore::new(options.copy_concurrency.get()));
for adopted in rest_of_historic {
let limiter = limiter.clone();
let timeline = detached.clone();
tasks.spawn(
async move {
let _permit = limiter.acquire().await;
let owned =
remote_copy(&adopted, &timeline, timeline.generation, &timeline.cancel).await?;
tracing::info!(layer=%owned, "remote copied");
Ok(owned)
}
.in_current_span(),
);
}
while let Some(res) = tasks.join_next().await {
match res {
Ok(Ok(owned)) => {
new_layers.push(owned);
}
Ok(Err(failed)) => {
return Err(failed);
}
Err(je) => return Err(Unexpected(je.into())),
}
}
// TODO: fsync directory again if we hardlinked something
let prepared = PreparedTimelineDetach { layers: new_layers };
Ok((guard, prepared))
}
fn partition_work(
ancestor_lsn: Lsn,
source_layermap: &LayerManager,
) -> (usize, Vec<Layer>, Vec<Layer>) {
let mut straddling_branchpoint = vec![];
let mut rest_of_historic = vec![];
let mut later_by_lsn = 0;
for desc in source_layermap.layer_map().iter_historic_layers() {
// off-by-one chances here:
// - start is inclusive
// - end is exclusive
if desc.lsn_range.start > ancestor_lsn {
later_by_lsn += 1;
continue;
}
let target = if desc.lsn_range.start <= ancestor_lsn
&& desc.lsn_range.end > ancestor_lsn
&& desc.is_delta
{
// TODO: image layer at Lsn optimization
&mut straddling_branchpoint
} else {
&mut rest_of_historic
};
target.push(source_layermap.get_from_desc(&desc));
}
(later_by_lsn, straddling_branchpoint, rest_of_historic)
}
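A quick worked example of the classification, with hypothetical LSNs and ancestor_lsn = 0/100:
// delta 0/80..0/120  -> straddling_branchpoint: starts at or before the
//                       branch point and ends after it
// image at 0/90      -> rest_of_historic: not a delta, nothing to split
// delta 0/150..0/200 -> starts after ancestor_lsn, so it is only counted
//                       in later_by_lsn and skipped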
async fn upload_rewritten_layer(
end_lsn: Lsn,
layer: &Layer,
target: &Arc<Timeline>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<Option<Layer>, Error> {
use Error::UploadRewritten;
let copied = copy_lsn_prefix(end_lsn, layer, target, ctx).await?;
let Some(copied) = copied else {
return Ok(None);
};
// FIXME: better shuttingdown error
target
.remote_client
.as_ref()
.unwrap()
.upload_layer_file(&copied, cancel)
.await
.map_err(UploadRewritten)?;
Ok(Some(copied.into()))
}
async fn copy_lsn_prefix(
end_lsn: Lsn,
layer: &Layer,
target_timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> Result<Option<ResidentLayer>, Error> {
use Error::{CopyDeltaPrefix, RewrittenDeltaDownloadFailed};
tracing::debug!(%layer, %end_lsn, "copying lsn prefix");
let mut writer = DeltaLayerWriter::new(
target_timeline.conf,
target_timeline.timeline_id,
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
)
.await
.map_err(CopyDeltaPrefix)?;
let resident = layer
.download_and_keep_resident()
.await
// likely shutdown
.map_err(RewrittenDeltaDownloadFailed)?;
let records = resident
.copy_delta_prefix(&mut writer, end_lsn, ctx)
.await
.map_err(CopyDeltaPrefix)?;
drop(resident);
tracing::debug!(%layer, records, "copied records");
if records == 0 {
drop(writer);
// TODO: we might want to store an empty marker in remote storage for this
// layer so that we will not needlessly walk `layer` on repeated attempts.
Ok(None)
} else {
// reuse the real highest key in the layer as the end key, instead of
// adding more holes between layers.
let reused_highest_key = layer.layer_desc().key_range.end;
let copied = writer
.finish(reused_highest_key, target_timeline, ctx)
.await
.map_err(CopyDeltaPrefix)?;
tracing::debug!(%layer, %copied, "new layer produced");
Ok(Some(copied))
}
}
/// Creates a new Layer instance for the adopted layer, and ensures it exists in remote
/// storage on successful return, without the adopted layer being added to `index_part.json`.
async fn remote_copy(
adopted: &Layer,
adoptee: &Arc<Timeline>,
generation: Generation,
cancel: &CancellationToken,
) -> Result<Layer, Error> {
use Error::CopyFailed;
// depending if Layer::keep_resident we could hardlink
let mut metadata = adopted.metadata();
debug_assert!(metadata.generation <= generation);
metadata.generation = generation;
let owned = crate::tenant::storage_layer::Layer::for_evicted(
adoptee.conf,
adoptee,
adopted.layer_desc().filename(),
metadata,
);
// FIXME: better shuttingdown error
adoptee
.remote_client
.as_ref()
.unwrap()
.copy_timeline_layer(adopted, &owned, cancel)
.await
.map(move |()| owned)
.map_err(CopyFailed)
}
/// See [`Timeline::complete_detaching_timeline_ancestor`].
pub(super) async fn complete(
detached: &Arc<Timeline>,
tenant: &Tenant,
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
let rtc = detached
.remote_client
.as_ref()
.expect("has to have a remote timeline client for timeline ancestor detach");
let PreparedTimelineDetach { layers } = prepared;
let ancestor = detached
.get_ancestor_timeline()
.expect("must still have a ancestor");
let ancestor_lsn = detached.get_ancestor_lsn();
// publish the prepared layers before we reparent any of the timelines, so that on restart
// reparented timelines find layers. also do the actual detaching.
//
// if we crash after this operation, we will at least come up having detached a timeline, but
// we cannot go back and reparent the timelines which would have been reparented in normal
// execution.
//
// this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
// which could give us a completely wrong layer combination.
rtc.schedule_adding_existing_layers_to_index_detach_and_wait(
&layers,
(ancestor.timeline_id, ancestor_lsn),
)
.await?;
let mut tasks = tokio::task::JoinSet::new();
// because we are now keeping the slot in progress, it is unlikely that there will be any
// timeline deletions during this time. if we raced one, then we'll just ignore it.
tenant
.timelines
.lock()
.unwrap()
.values()
.filter_map(|tl| {
if Arc::ptr_eq(tl, detached) {
return None;
}
if !tl.is_active() {
return None;
}
let tl_ancestor = tl.ancestor_timeline.as_ref()?;
let is_same = Arc::ptr_eq(&ancestor, tl_ancestor);
let is_earlier = tl.get_ancestor_lsn() <= ancestor_lsn;
let is_deleting = tl
.delete_progress
.try_lock()
.map(|flow| !flow.is_not_started())
.unwrap_or(true);
if is_same && is_earlier && !is_deleting {
Some(tl.clone())
} else {
None
}
})
.for_each(|timeline| {
// important in this scope: we are holding the Tenant::timelines lock
let span = tracing::info_span!("reparent", reparented=%timeline.timeline_id);
let new_parent = detached.timeline_id;
tasks.spawn(
async move {
let res = timeline
.remote_client
.as_ref()
.expect("reparented has to have remote client because detached has one")
.schedule_reparenting_and_wait(&new_parent)
.await;
match res {
Ok(()) => Some(timeline),
Err(e) => {
// with the use of tenant slot, we no longer expect these.
tracing::warn!("reparenting failed: {e:#}");
None
}
}
}
.instrument(span),
);
});
let reparenting_candidates = tasks.len();
let mut reparented = Vec::with_capacity(tasks.len());
while let Some(res) = tasks.join_next().await {
match res {
Ok(Some(timeline)) => {
tracing::info!(reparented=%timeline.timeline_id, "reparenting done");
reparented.push(timeline.timeline_id);
}
Ok(None) => {
// let's just ignore this for now. one or all reparented timelines could have
// started deletion, and that is fine.
}
Err(je) if je.is_cancelled() => unreachable!("not used"),
Err(je) if je.is_panic() => {
// ignore; it's better to continue with a single reparenting failing (or even
// all of them) in order to get to the goal state.
//
// these timelines will never be reparentable, but they can be always detached as
// separate tree roots.
}
Err(je) => tracing::error!("unexpected join error: {je:?}"),
}
}
if reparenting_candidates != reparented.len() {
tracing::info!("failed to reparent some candidates");
}
Ok(reparented)
}

View File

@@ -12,7 +12,7 @@ use crate::{
METADATA_FILE_NAME,
};
use anyhow::Context;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::shard::ShardIndex;
use std::{collections::HashMap, str::FromStr};
use utils::lsn::Lsn;
@@ -20,7 +20,7 @@ use utils::lsn::Lsn;
/// Identified files in the timeline directory.
pub(super) enum Discovered {
/// The only one we care about
Layer(LayerFileName, u64),
Layer(LayerFileName, Utf8PathBuf, u64),
/// Old ephemeral files from previous launches, should be removed
Ephemeral(String),
/// Old temporary timeline files, unsure what these really are, should be removed
@@ -46,7 +46,7 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
let discovered = match LayerFileName::from_str(&file_name) {
Ok(file_name) => {
let file_size = direntry.metadata()?.len();
Discovered::Layer(file_name, file_size)
Discovered::Layer(file_name, direntry.path().to_owned(), file_size)
}
Err(_) => {
if file_name == METADATA_FILE_NAME {
@@ -104,26 +104,38 @@ pub(super) enum DismissedLayer {
/// Merges local discoveries and remote [`IndexPart`] to a collection of decisions.
pub(super) fn reconcile(
discovered: Vec<(LayerFileName, u64)>,
discovered: Vec<(LayerFileName, Utf8PathBuf, u64)>,
index_part: Option<&IndexPart>,
disk_consistent_lsn: Lsn,
generation: Generation,
shard: ShardIndex,
) -> Vec<(LayerFileName, Result<Decision, DismissedLayer>)> {
) -> Vec<(
LayerFileName,
Option<Utf8PathBuf>,
Result<Decision, DismissedLayer>,
)> {
use Decision::*;
// name => (local, remote)
type Collected = HashMap<LayerFileName, (Option<LayerFileMetadata>, Option<LayerFileMetadata>)>;
// name => (local_path, local_metadata, remote_metadata)
type Collected = HashMap<
LayerFileName,
(
Option<Utf8PathBuf>,
Option<LayerFileMetadata>,
Option<LayerFileMetadata>,
),
>;
let mut discovered = discovered
.into_iter()
.map(|(name, file_size)| {
.map(|(layer_name, local_path, file_size)| {
(
name,
layer_name,
// The generation and shard here will be corrected to match IndexPart in the merge below, unless
// it is not in IndexPart, in which case using our current generation makes sense
// because it will be uploaded in this generation.
(
Some(local_path),
Some(LayerFileMetadata::new(file_size, generation, shard)),
None,
),
@@ -140,15 +152,15 @@ pub(super) fn reconcile(
.map(|(name, metadata)| (name, LayerFileMetadata::from(metadata)))
.for_each(|(name, metadata)| {
if let Some(existing) = discovered.get_mut(name) {
existing.1 = Some(metadata);
existing.2 = Some(metadata);
} else {
discovered.insert(name.to_owned(), (None, Some(metadata)));
discovered.insert(name.to_owned(), (None, None, Some(metadata)));
}
});
discovered
.into_iter()
.map(|(name, (local, remote))| {
.map(|(name, (local_path, local, remote))| {
let decision = if name.is_in_future(disk_consistent_lsn) {
Err(DismissedLayer::Future { local })
} else {
@@ -165,7 +177,7 @@ pub(super) fn reconcile(
}
};
(name, decision)
(name, local_path, decision)
})
.collect::<Vec<_>>()
}
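For illustration, each merged entry ends up in one of three shapes (paths and metadata hypothetical), and the Decision is then computed per shape:
// local only:  (Some(local_path), Some(local_meta), None)
// both:        (Some(local_path), Some(local_meta), Some(remote_meta))
// remote only: (None,             None,             Some(remote_meta))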

View File

@@ -205,6 +205,24 @@ impl LayerManager {
updates.flush();
}
/// Called when compaction is completed.
pub(crate) fn rewrite_layers(
&mut self,
rewrite_layers: &[(Layer, ResidentLayer)],
drop_layers: &[Layer],
_metrics: &TimelineMetrics,
) {
let mut updates = self.layer_map.batch_update();
// TODO: implement rewrites (currently this code path only used for drops)
assert!(rewrite_layers.is_empty());
for l in drop_layers {
Self::delete_historic_layer(l, &mut updates, &mut self.layer_fmgr);
}
updates.flush();
}
/// Called when garbage collect has selected the layers to be removed.
pub(crate) fn finish_gc_timeline(&mut self, gc_layers: &[Layer]) {
let mut updates = self.layer_map.batch_update();

View File

@@ -14,7 +14,8 @@ OBJS = \
relsize_cache.o \
walproposer.o \
walproposer_pg.o \
control_plane_connector.o
control_plane_connector.o \
walsender_hooks.o
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)

View File

@@ -49,7 +49,7 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
int neon_protocol_version = 1;
int neon_protocol_version = 2;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;
@@ -860,7 +860,7 @@ pg_init_libpagestore(void)
"Version of compute<->page server protocol",
NULL,
&neon_protocol_version,
1, /* default to old protocol for now */
2, /* use protocol version 2 */
1, /* min */
2, /* max */
PGC_SU_BACKEND,

View File

@@ -34,6 +34,7 @@
#include "walproposer.h"
#include "pagestore_client.h"
#include "control_plane_connector.h"
#include "walsender_hooks.h"
PG_MODULE_MAGIC;
void _PG_init(void);
@@ -265,7 +266,6 @@ LogicalSlotsMonitorMain(Datum main_arg)
}
}
void
_PG_init(void)
{
@@ -279,6 +279,7 @@ _PG_init(void)
pg_init_libpagestore();
pg_init_walproposer();
WalSender_Custom_XLogReaderRoutines = NeonOnDemandXLogReaderRoutines;
InitLogicalReplicationMonitor();

View File

@@ -36,10 +36,7 @@
static NeonWALReadResult NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static NeonWALReadResult NeonWALReaderReadMsg(NeonWALReader *state);
static void NeonWALReaderResetRemote(NeonWALReader *state);
static bool NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
static bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
static void neon_wal_segment_close(NeonWALReader *state);
static bool is_wal_segment_exists(XLogSegNo segno, int segsize,
TimeLineID tli);
@@ -82,8 +79,9 @@ struct NeonWALReader
XLogRecPtr req_lsn;
Size req_len;
Size req_progress;
WalProposer *wp; /* we learn donor through walproposer */
char donor_conninfo[MAXCONNINFO];
char donor_name[64]; /* saved donor safekeeper name for logging */
XLogRecPtr donor_lsn;
/* state of connection to safekeeper */
NeonWALReaderRemoteState rem_state;
WalProposerConn *wp_conn;
@@ -107,7 +105,7 @@ struct NeonWALReader
/* palloc and initialize NeonWALReader */
NeonWALReader *
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix)
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
{
NeonWALReader *reader;
@@ -123,8 +121,6 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalPropose
reader->seg.ws_tli = 0;
reader->segcxt.ws_segsize = wal_segment_size;
reader->wp = wp;
reader->rem_state = RS_NONE;
if (log_prefix)
@@ -204,21 +200,16 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
{
if (state->rem_state == RS_NONE)
{
XLogRecPtr donor_lsn;
/* no connection yet; start one */
Safekeeper *donor = GetDonor(state->wp, &donor_lsn);
if (donor == NULL)
if (!NeonWALReaderUpdateDonor(state))
{
snprintf(state->err_msg, sizeof(state->err_msg),
"failed to establish remote connection to fetch WAL: no donor available");
return NEON_WALREAD_ERROR;
}
snprintf(state->donor_name, sizeof(state->donor_name), "%s:%s", donor->host, donor->port);
nwr_log(LOG, "establishing connection to %s, flush_lsn %X/%X to fetch WAL",
state->donor_name, LSN_FORMAT_ARGS(donor_lsn));
state->wp_conn = libpqwp_connect_start(donor->conninfo);
/* no connection yet; start one */
nwr_log(LOG, "establishing connection to %s, lsn=%X/%X to fetch WAL", state->donor_name, LSN_FORMAT_ARGS(state->donor_lsn));
state->wp_conn = libpqwp_connect_start(state->donor_conninfo);
if (PQstatus(state->wp_conn->pg_conn) == CONNECTION_BAD)
{
snprintf(state->err_msg, sizeof(state->err_msg),
@@ -251,10 +242,22 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
{
/* connection successfully established */
char start_repl_query[128];
term_t term = pg_atomic_read_u64(&GetWalpropShmemState()->mineLastElectedTerm);
/*
* Set elected walproposer's term to pull only data from
* its history. Note: for logical walsender it means we
* might stream WAL not yet committed by safekeepers. It
* would be cleaner to fix this.
*
* mineLastElectedTerm shouldn't be 0 at this point
* because we checked above that donor exists and it
* appears only after successful election.
*/
Assert(term > 0);
snprintf(start_repl_query, sizeof(start_repl_query),
"START_REPLICATION PHYSICAL %X/%X (term='" UINT64_FORMAT "')",
LSN_FORMAT_ARGS(startptr), state->wp->propTerm);
LSN_FORMAT_ARGS(startptr), term);
nwr_log(LOG, "connection to %s to fetch WAL succeeded, running %s",
state->donor_name, start_repl_query);
if (!libpqwp_send_query(state->wp_conn, start_repl_query))
@@ -404,6 +407,10 @@ NeonWALReadRemote(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size cou
state->req_lsn = InvalidXLogRecPtr;
state->req_len = 0;
state->req_progress = 0;
/* Update the current segment info. */
state->seg.ws_tli = tli;
return NEON_WALREAD_SUCCESS;
}
}
@@ -526,7 +533,7 @@ err:
}
/* reset remote connection and request in progress */
static void
void
NeonWALReaderResetRemote(NeonWALReader *state)
{
state->req_lsn = InvalidXLogRecPtr;
@@ -691,13 +698,25 @@ NeonWALReadLocal(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size coun
return true;
}
XLogRecPtr
NeonWALReaderGetRemLsn(NeonWALReader *state)
{
return state->rem_lsn;
}
const WALOpenSegment *
NeonWALReaderGetSegment(NeonWALReader *state)
{
return &state->seg;
}
/*
* Copy of vanilla wal_segment_open, but returns false in case of error instead
* of ERROR, with errno set.
*
* XLogReaderRoutine->segment_open callback for local pg_wal files
*/
static bool
bool
neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo,
TimeLineID *tli_p)
{
@@ -724,7 +743,7 @@ is_wal_segment_exists(XLogSegNo segno, int segsize, TimeLineID tli)
}
/* copy of vanilla wal_segment_close with NeonWALReader */
static void
void
neon_wal_segment_close(NeonWALReader *state)
{
if (state->seg.ws_file >= 0)
@@ -740,3 +759,19 @@ NeonWALReaderErrMsg(NeonWALReader *state)
{
return state->err_msg;
}
/*
* Returns true if there is a donor, and false otherwise
*/
bool
NeonWALReaderUpdateDonor(NeonWALReader *state)
{
WalproposerShmemState *wps = GetWalpropShmemState();
SpinLockAcquire(&wps->mutex);
memcpy(state->donor_name, wps->donor_name, sizeof(state->donor_name));
memcpy(state->donor_conninfo, wps->donor_conninfo, sizeof(state->donor_conninfo));
state->donor_lsn = wps->donor_lsn;
SpinLockRelease(&wps->mutex);
return state->donor_name[0] != '\0';
}

View File

@@ -19,12 +19,19 @@ typedef enum
NEON_WALREAD_ERROR,
} NeonWALReadResult;
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, WalProposer *wp, char *log_prefix);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
extern void NeonWALReaderFree(NeonWALReader *state);
extern void NeonWALReaderResetRemote(NeonWALReader *state);
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
extern bool NeonWALReaderIsRemConnEstablished(NeonWALReader *state);
extern char *NeonWALReaderErrMsg(NeonWALReader *state);
extern XLogRecPtr NeonWALReaderGetRemLsn(NeonWALReader *state);
extern const WALOpenSegment *NeonWALReaderGetSegment(NeonWALReader *state);
extern bool neon_wal_segment_open(NeonWALReader *state, XLogSegNo nextSegNo, TimeLineID *tli_p);
extern void neon_wal_segment_close(NeonWALReader *state);
extern bool NeonWALReaderUpdateDonor(NeonWALReader *state);
#endif /* __NEON_WALREADER_H__ */

View File

@@ -80,7 +80,7 @@ static int CompareLsn(const void *a, const void *b);
static char *FormatSafekeeperState(Safekeeper *sk);
static void AssertEventsOkForState(uint32 events, Safekeeper *sk);
static char *FormatEvents(WalProposer *wp, uint32 events);
static void UpdateDonorShmem(WalProposer *wp);
WalProposer *
WalProposerCreate(WalProposerConfig *config, walproposer_api api)
@@ -922,7 +922,8 @@ static void
DetermineEpochStartLsn(WalProposer *wp)
{
TermHistory *dth;
int n_ready = 0;
int n_ready = 0;
WalproposerShmemState *walprop_shared;
wp->propEpochStartLsn = InvalidXLogRecPtr;
wp->donorEpoch = 0;
@@ -964,16 +965,18 @@ DetermineEpochStartLsn(WalProposer *wp)
if (n_ready < wp->quorum)
{
/*
* This is a rare case that can be triggered if safekeeper has voted and disconnected.
* In this case, its state will not be SS_IDLE and its vote cannot be used, because
* we clean up `voteResponse` in `ShutdownConnection`.
* This is a rare case that can be triggered if safekeeper has voted
* and disconnected. In this case, its state will not be SS_IDLE and
* its vote cannot be used, because we clean up `voteResponse` in
* `ShutdownConnection`.
*/
wp_log(FATAL, "missing majority of votes, collected %d, expected %d, got %d", wp->n_votes, wp->quorum, n_ready);
}
/*
* If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are bootstrapping
* and nothing was committed yet. Start streaming then from the basebackup LSN.
* If propEpochStartLsn is 0, it means flushLsn is 0 everywhere, we are
* bootstrapping and nothing was committed yet. Start streaming then from
* the basebackup LSN.
*/
if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers)
{
@@ -984,11 +987,12 @@ DetermineEpochStartLsn(WalProposer *wp)
}
wp_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn));
}
pg_atomic_write_u64(&wp->api.get_shmem_state(wp)->propEpochStartLsn, wp->propEpochStartLsn);
/*
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so it
* should never be zero at this point, if we know timelineStartLsn.
*
* Safekeepers are setting truncateLsn after timelineStartLsn is known, so
* it should never be zero at this point, if we know timelineStartLsn.
*
* timelineStartLsn can be zero only on the first syncSafekeepers run.
*/
Assert((wp->truncateLsn != InvalidXLogRecPtr) ||
@@ -1022,10 +1026,9 @@ DetermineEpochStartLsn(WalProposer *wp)
* since which we are going to write according to the consensus. If not,
* we must bail out, as clog and other non rel data is inconsistent.
*/
walprop_shared = wp->api.get_shmem_state(wp);
if (!wp->config->syncSafekeepers)
{
WalproposerShmemState *walprop_shared = wp->api.get_shmem_state(wp);
/*
* Basebackup LSN always points to the beginning of the record (not
* the page), as StartupXLOG most probably wants it this way.
@@ -1040,7 +1043,7 @@ DetermineEpochStartLsn(WalProposer *wp)
* compute (who could generate WAL) is ok.
*/
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
walprop_shared->mineLastElectedTerm)))
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
{
/*
* Panic to restart PG as we need to retake basebackup.
@@ -1054,8 +1057,8 @@ DetermineEpochStartLsn(WalProposer *wp)
LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp)));
}
}
walprop_shared->mineLastElectedTerm = wp->propTerm;
}
pg_atomic_write_u64(&walprop_shared->mineLastElectedTerm, wp->propTerm);
}
/*
@@ -1105,9 +1108,13 @@ SendProposerElected(Safekeeper *sk)
{
/* safekeeper is empty or no common point, start from the beginning */
sk->startStreamingAt = wp->propTermHistory.entries[0].lsn;
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u" ,
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
/* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline is created manually (test_s3_wal_replay) */
wp_log(LOG, "no common point with sk %s:%s, streaming since first term at %X/%X, timelineStartLsn=%X/%X, termHistory.n_entries=%u",
sk->host, sk->port, LSN_FORMAT_ARGS(sk->startStreamingAt), LSN_FORMAT_ARGS(wp->timelineStartLsn), wp->propTermHistory.n_entries);
/*
* wp->timelineStartLsn == InvalidXLogRecPtr can be only when timeline
* is created manually (test_s3_wal_replay)
*/
Assert(sk->startStreamingAt == wp->timelineStartLsn || wp->timelineStartLsn == InvalidXLogRecPtr);
}
else
@@ -1177,6 +1184,12 @@ StartStreaming(Safekeeper *sk)
sk->active_state = SS_ACTIVE_SEND;
sk->streamingAt = sk->startStreamingAt;
/*
* Donors can only be in SS_ACTIVE state, so we potentially update the
* donor when we switch one to SS_ACTIVE.
*/
UpdateDonorShmem(sk->wp);
/* event set will be updated inside SendMessageToNode */
SendMessageToNode(sk);
}
@@ -1568,17 +1581,17 @@ GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
* none if it doesn't exist. donor_lsn is set to end position of the donor to
* the best of our knowledge.
*/
Safekeeper *
GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
static void
UpdateDonorShmem(WalProposer *wp)
{
Safekeeper *donor = NULL;
int i;
*donor_lsn = InvalidXLogRecPtr;
XLogRecPtr donor_lsn = InvalidXLogRecPtr;
if (wp->n_votes < wp->quorum)
{
wp_log(WARNING, "GetDonor called before elections are won");
return NULL;
wp_log(WARNING, "UpdateDonorShmem called before elections are won");
return;
}
/*
@@ -1589,7 +1602,7 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
if (wp->safekeeper[wp->donor].state >= SS_IDLE)
{
donor = &wp->safekeeper[wp->donor];
*donor_lsn = wp->propEpochStartLsn;
donor_lsn = wp->propEpochStartLsn;
}
/*
@@ -1601,13 +1614,19 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
{
Safekeeper *sk = &wp->safekeeper[i];
if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > *donor_lsn)
if (sk->state == SS_ACTIVE && sk->appendResponse.flushLsn > donor_lsn)
{
donor = sk;
*donor_lsn = sk->appendResponse.flushLsn;
donor_lsn = sk->appendResponse.flushLsn;
}
}
return donor;
if (donor == NULL)
{
wp_log(WARNING, "UpdateDonorShmem didn't find a suitable donor, skipping");
return;
}
wp->api.update_donor(wp, donor, donor_lsn);
}
/*
@@ -1617,7 +1636,7 @@ static void
HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk)
{
XLogRecPtr candidateTruncateLsn;
XLogRecPtr newCommitLsn;
XLogRecPtr newCommitLsn;
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
if (newCommitLsn > wp->commitLsn)
@@ -1627,7 +1646,7 @@ HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk)
BroadcastAppendRequest(wp);
}
/*
/*
* Unlock syncrep waiters, update ps_feedback, CheckGracefulShutdown().
* The last one will terminate the process if the shutdown is requested
* and WAL is committed by the quorum. BroadcastAppendRequest() should be

View File

@@ -284,14 +284,19 @@ typedef struct PageserverFeedback
typedef struct WalproposerShmemState
{
pg_atomic_uint64 propEpochStartLsn;
char donor_name[64];
char donor_conninfo[MAXCONNINFO];
XLogRecPtr donor_lsn;
slock_t mutex;
term_t mineLastElectedTerm;
pg_atomic_uint64 mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
pg_atomic_uint64 currentClusterSize;
/* last feedback from each shard */
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
int num_shards;
int num_shards;
/* aggregated feedback with min LSNs across shards */
PageserverFeedback min_ps_feedback;
@@ -465,6 +470,9 @@ typedef struct walproposer_api
/* Get pointer to the latest available WAL. */
XLogRecPtr (*get_flush_rec_ptr) (WalProposer *wp);
/* Update current donor info in WalProposer Shmem */
void (*update_donor) (WalProposer *wp, Safekeeper *donor, XLogRecPtr donor_lsn);
/* Get current time. */
TimestampTz (*get_current_timestamp) (WalProposer *wp);
@@ -497,7 +505,7 @@ typedef struct walproposer_api
*
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*
*
* Returns PG_ASYNC_READ_FAIL on closed connection.
*/
PGAsyncReadResult (*conn_async_read) (Safekeeper *sk, char **buf, int *amount);
@@ -545,13 +553,14 @@ typedef struct walproposer_api
* Returns 0 if timeout is reached, 1 if some event happened. Updates
* events mask to indicate events and sets sk to the safekeeper which has
* an event.
*
*
* On timeout, events is set to WL_NO_EVENTS. On socket event, events is
* set to WL_SOCKET_READABLE and/or WL_SOCKET_WRITEABLE. When socket is
* closed, events is set to WL_SOCKET_READABLE.
*
* WL_SOCKET_WRITEABLE is usually set only when we need to flush the buffer.
* It can be returned only if caller asked for this event in the last *_event_set call.
*
* WL_SOCKET_WRITEABLE is usually set only when we need to flush the
* buffer. It can be returned only if caller asked for this event in the
* last *_event_set call.
*/
int (*wait_event_set) (WalProposer *wp, long timeout, Safekeeper **sk, uint32 *events);
@@ -571,9 +580,9 @@ typedef struct walproposer_api
void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn);
/*
* Called after every AppendResponse from the safekeeper. Used to propagate
* backpressure feedback and to confirm WAL persistence (has been committed
* on the quorum of safekeepers).
* Called after every AppendResponse from the safekeeper. Used to
* propagate backpressure feedback and to confirm WAL persistence (has
* been committed on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, Safekeeper *sk);
@@ -716,12 +725,14 @@ extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPt
extern void WalProposerPoll(WalProposer *wp);
extern void WalProposerFree(WalProposer *wp);
extern WalproposerShmemState *GetWalpropShmemState();
/*
* WaitEventSet API doesn't allow removing a socket, so walproposer_pg recreates the
* set from scratch, hence the export.
*/
extern void SafekeeperStateDesiredEvents(Safekeeper *sk, uint32 *sk_events, uint32 *nwr_events);
extern Safekeeper *GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn);
extern TimeLineID walprop_pg_get_timeline_id(void);
#define WPEVENT 1337 /* special log level for walproposer internal

View File

@@ -85,7 +85,6 @@ static void walprop_pg_init_standalone_sync_safekeepers(void);
static void walprop_pg_init_walsender(void);
static void walprop_pg_init_bgworker(void);
static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp);
static TimeLineID walprop_pg_get_timeline_id(void);
static void walprop_pg_load_libpqwalreceiver(void);
static process_interrupts_callback_t PrevProcessInterruptsCallback;
@@ -94,6 +93,8 @@ static shmem_startup_hook_type prev_shmem_startup_hook_type;
static shmem_request_hook_type prev_shmem_request_hook = NULL;
static void walproposer_shmem_request(void);
#endif
static void WalproposerShmemInit_SyncSafekeeper(void);
static void StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd);
static void WalSndLoop(WalProposer *wp);
@@ -136,6 +137,7 @@ WalProposerSync(int argc, char *argv[])
WalProposer *wp;
init_walprop_config(true);
WalproposerShmemInit_SyncSafekeeper();
walprop_pg_init_standalone_sync_safekeepers();
walprop_pg_load_libpqwalreceiver();
@@ -281,6 +283,8 @@ WalproposerShmemInit(void)
{
memset(walprop_shared, 0, WalproposerShmemSize());
SpinLockInit(&walprop_shared->mutex);
pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0);
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
}
@@ -289,6 +293,17 @@ WalproposerShmemInit(void)
return found;
}
static void
WalproposerShmemInit_SyncSafekeeper(void)
{
walprop_shared = palloc(WalproposerShmemSize());
memset(walprop_shared, 0, WalproposerShmemSize());
SpinLockInit(&walprop_shared->mutex);
pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0);
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
}
#define BACK_PRESSURE_DELAY 10000L // 0.01 sec
static bool
@@ -399,6 +414,13 @@ nwp_shmem_startup_hook(void)
WalproposerShmemInit();
}
WalproposerShmemState *
GetWalpropShmemState()
{
Assert(walprop_shared != NULL);
return walprop_shared;
}
static WalproposerShmemState *
walprop_pg_get_shmem_state(WalProposer *wp)
{
@@ -431,14 +453,15 @@ record_pageserver_feedback(PageserverFeedback *ps_feedback)
for (int i = 0; i < walprop_shared->num_shards; i++)
{
PageserverFeedback *feedback = &walprop_shared->shard_ps_feedback[i];
if (feedback->present)
{
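/*
 * Take the per-field minimum across all shards that reported feedback;
 * an invalid (zero) LSN means "no value yet" and is replaced by the
 * first real value seen.
 */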
if (min_feedback.last_received_lsn == InvalidXLogRecPtr || feedback->last_received_lsn < min_feedback.last_received_lsn)
min_feedback.last_received_lsn = feedback->last_received_lsn;
if (min_feedback.disk_consistent_lsn == InvalidXLogRecPtr || feedback->disk_consistent_lsn < min_feedback.disk_consistent_lsn)
min_feedback.disk_consistent_lsn = feedback->disk_consistent_lsn;
if (min_feedback.remote_consistent_lsn == InvalidXLogRecPtr || feedback->remote_consistent_lsn < min_feedback.remote_consistent_lsn)
min_feedback.remote_consistent_lsn = feedback->remote_consistent_lsn;
}
@@ -551,6 +574,7 @@ static void
walprop_sigusr2(SIGNAL_ARGS)
{
int save_errno = errno;
got_SIGUSR2 = true;
SetLatch(MyLatch);
errno = save_errno;
@@ -598,7 +622,7 @@ walprop_pg_get_current_timestamp(WalProposer *wp)
return GetCurrentTimestamp();
}
static TimeLineID
TimeLineID
walprop_pg_get_timeline_id(void)
{
#if PG_VERSION_NUM >= 150000
@@ -617,6 +641,20 @@ walprop_pg_load_libpqwalreceiver(void)
wpg_log(ERROR, "libpqwalreceiver didn't initialize correctly");
}
static void
walprop_pg_update_donor(WalProposer *wp, Safekeeper *donor, XLogRecPtr donor_lsn)
{
WalproposerShmemState *wps = wp->api.get_shmem_state(wp);
char donor_name[64];
pg_snprintf(donor_name, sizeof(donor_name), "%s:%s", donor->host, donor->port);
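/*
 * Publish name, conninfo and lsn under the spinlock so that readers
 * (NeonWALReaderUpdateDonor) always observe a consistent triple.
 */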
SpinLockAcquire(&wps->mutex);
memcpy(wps->donor_name, donor_name, sizeof(donor_name));
memcpy(wps->donor_conninfo, donor->conninfo, sizeof(donor->conninfo));
wps->donor_lsn = donor_lsn;
SpinLockRelease(&wps->mutex);
}
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
@@ -717,7 +755,6 @@ walprop_connect_start(Safekeeper *sk)
{
Assert(sk->conn == NULL);
sk->conn = libpqwp_connect_start(sk->conninfo);
}
static WalProposerConnectPollStatusType
@@ -1091,7 +1128,7 @@ static void
StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
{
XLogRecPtr FlushPtr;
__attribute__((unused)) TimeLineID currTLI;
__attribute__((unused)) TimeLineID currTLI;
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
@@ -1295,116 +1332,13 @@ XLogBroadcastWalProposer(WalProposer *wp)
}
}
/* Download WAL before basebackup for logical walsenders from sk, if needed */
/*
Used to download WAL before basebackup for logical walsenders from sk, no longer
needed because walsender always uses neon_walreader.
*/
static bool
WalProposerRecovery(WalProposer *wp, Safekeeper *sk)
{
char *err;
WalReceiverConn *wrconn;
WalRcvStreamOptions options;
char conninfo[MAXCONNINFO];
TimeLineID timeline;
XLogRecPtr startpos;
XLogRecPtr endpos;
startpos = GetLogRepRestartLSN(wp);
if (startpos == InvalidXLogRecPtr)
return true; /* recovery not needed */
endpos = wp->propEpochStartLsn;
timeline = wp->greetRequest.timeline;
if (!neon_auth_token)
{
memcpy(conninfo, sk->conninfo, MAXCONNINFO);
}
else
{
int written = 0;
written = snprintf((char *) conninfo, MAXCONNINFO, "password=%s %s", neon_auth_token, sk->conninfo);
if (written > MAXCONNINFO || written < 0)
wpg_log(FATAL, "could not append password to the safekeeper connection string");
}
#if PG_MAJORVERSION_NUM < 16
wrconn = walrcv_connect(conninfo, false, "wal_proposer_recovery", &err);
#else
wrconn = walrcv_connect(conninfo, false, false, "wal_proposer_recovery", &err);
#endif
if (!wrconn)
{
ereport(WARNING,
(errmsg("could not connect to WAL acceptor %s:%s: %s",
sk->host, sk->port,
err)));
return false;
}
wpg_log(LOG,
"start recovery for logical replication from %s:%s starting from %X/%08X till %X/%08X timeline "
"%d",
sk->host, sk->port, (uint32) (startpos >> 32),
(uint32) startpos, (uint32) (endpos >> 32), (uint32) endpos, timeline);
options.logical = false;
options.startpoint = startpos;
options.slotname = NULL;
options.proto.physical.startpointTLI = timeline;
if (walrcv_startstreaming(wrconn, &options))
{
XLogRecPtr rec_start_lsn;
XLogRecPtr rec_end_lsn = 0;
int len;
char *buf;
pgsocket wait_fd = PGINVALID_SOCKET;
while ((len = walrcv_receive(wrconn, &buf, &wait_fd)) >= 0)
{
if (len == 0)
{
(void) WaitLatchOrSocket(
MyLatch, WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE, wait_fd,
-1, WAIT_EVENT_WAL_RECEIVER_MAIN);
}
else
{
Assert(buf[0] == 'w' || buf[0] == 'k');
if (buf[0] == 'k')
continue; /* keepalive */
memcpy(&rec_start_lsn, &buf[XLOG_HDR_START_POS],
sizeof rec_start_lsn);
rec_start_lsn = pg_ntoh64(rec_start_lsn);
rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE;
/* write WAL to disk */
XLogWalPropWrite(sk->wp, &buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
ereport(DEBUG1,
(errmsg("Recover message %X/%X length %d",
LSN_FORMAT_ARGS(rec_start_lsn), len)));
if (rec_end_lsn >= endpos)
break;
}
}
ereport(LOG,
(errmsg("end of replication stream at %X/%X: %m",
LSN_FORMAT_ARGS(rec_end_lsn))));
walrcv_disconnect(wrconn);
/* failed to receive all WAL till endpos */
if (rec_end_lsn < endpos)
return false;
}
else
{
ereport(LOG,
(errmsg("primary server contains no more WAL on requested timeline %u LSN %X/%08X",
timeline, (uint32) (startpos >> 32), (uint32) startpos)));
return false;
}
return true;
}
@@ -1545,7 +1479,7 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, sk->wp, log_prefix);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propEpochStartLsn, log_prefix);
if (sk->xlogreader == NULL)
wpg_log(FATAL, "failed to allocate xlog reader");
}
@@ -1960,8 +1894,8 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
static void
walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
{
HotStandbyFeedback hsFeedback;
bool needToAdvanceSlot = false;
HotStandbyFeedback hsFeedback;
bool needToAdvanceSlot = false;
if (wp->config->syncSafekeepers)
return;
@@ -2095,22 +2029,25 @@ GetLogRepRestartLSN(WalProposer *wp)
return lrRestartLsn;
}
void SetNeonCurrentClusterSize(uint64 size)
void
SetNeonCurrentClusterSize(uint64 size)
{
pg_atomic_write_u64(&walprop_shared->currentClusterSize, size);
}
uint64 GetNeonCurrentClusterSize(void)
uint64
GetNeonCurrentClusterSize(void)
{
return pg_atomic_read_u64(&walprop_shared->currentClusterSize);
}
uint64 GetNeonCurrentClusterSize(void);
uint64 GetNeonCurrentClusterSize(void);
static const walproposer_api walprop_pg = {
.get_shmem_state = walprop_pg_get_shmem_state,
.start_streaming = walprop_pg_start_streaming,
.get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr,
.update_donor = walprop_pg_update_donor,
.get_current_timestamp = walprop_pg_get_current_timestamp,
.conn_error_message = walprop_error_message,
.conn_status = walprop_status,

pgxn/neon/walsender_hooks.c

@@ -0,0 +1,172 @@
/*-------------------------------------------------------------------------
*
* walsender_hooks.c
*
* Implements XLogReaderRoutine in terms of NeonWALReader. Allows for
* fetching WAL from safekeepers, which normal xlogreader can't do.
*
*-------------------------------------------------------------------------
*/
#include "walsender_hooks.h"
#include "postgres.h"
#include "fmgr.h"
#include "access/xlogdefs.h"
#include "replication/walsender.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "miscadmin.h"
#include "utils/wait_event.h"
#include "utils/guc.h"
#include "postmaster/interrupt.h"
#include "neon_walreader.h"
#include "walproposer.h"
static NeonWALReader *wal_reader = NULL;
extern XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
extern bool GetDonorShmem(XLogRecPtr *donor_lsn);
static XLogRecPtr
NeonWALReadWaitForWAL(XLogRecPtr loc)
{
while (!NeonWALReaderUpdateDonor(wal_reader))
{
pg_usleep(1000);
CHECK_FOR_INTERRUPTS();
}
return WalSndWaitForWal(loc);
}
static int
NeonWALPageRead(
XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf)
{
XLogRecPtr rem_lsn;
/* Wait for flush pointer to advance past our request */
XLogRecPtr flushptr = NeonWALReadWaitForWAL(targetPagePtr + reqLen);
int count;
if (flushptr < targetPagePtr + reqLen)
return -1;
/* Read at most XLOG_BLCKSZ bytes */
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
count = XLOG_BLCKSZ;
else
count = flushptr - targetPagePtr;
/*
* Sometimes walsender requests non-monotonic sequences of WAL. If that's
* the case, we have to reset streaming from remote at the correct
* position. For example, walsender may try to verify the segment header
* when trying to read in the middle of it.
*/
rem_lsn = NeonWALReaderGetRemLsn(wal_reader);
if (rem_lsn != InvalidXLogRecPtr && targetPagePtr != rem_lsn)
{
NeonWALReaderResetRemote(wal_reader);
}
for (;;)
{
NeonWALReadResult res = NeonWALRead(
wal_reader,
readBuf,
targetPagePtr,
count,
walprop_pg_get_timeline_id());
if (res == NEON_WALREAD_SUCCESS)
{
/*
* Setting ws_tli is required by the XLogReaderRoutine; it is used
* for segment name generation in error reports.
*
* ReadPageInternal updates ws_segno after calling the callback on its own,
* and the XLogReaderRoutine description doesn't require the callback to set
* it, but WALRead does, so let's follow suit.
*/
xlogreader->seg.ws_tli = NeonWALReaderGetSegment(wal_reader)->ws_tli;
xlogreader->seg.ws_segno = NeonWALReaderGetSegment(wal_reader)->ws_segno;
/*
* ws_file doesn't exist in case of remote read, and isn't used by
* xlogreader except by WALRead on which we don't rely anyway.
*/
return count;
}
if (res == NEON_WALREAD_ERROR)
{
elog(ERROR, "[walsender] Failed to read WAL (req_lsn=%X/%X, len=%d): %s",
LSN_FORMAT_ARGS(targetPagePtr),
reqLen,
NeonWALReaderErrMsg(wal_reader));
return -1;
}
/*
* Res is WOULDBLOCK, so we wait on the socket, recreating event set
* if necessary
*/
{
pgsocket sock = NeonWALReaderSocket(wal_reader);
uint32_t reader_events = NeonWALReaderEvents(wal_reader);
long timeout_ms = 1000;
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
if (ConfigReloadPending)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
}
WaitLatchOrSocket(
MyLatch,
WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | reader_events,
sock,
timeout_ms,
WAIT_EVENT_WAL_SENDER_MAIN);
}
}
}
static void
NeonWALReadSegmentOpen(XLogReaderState *xlogreader, XLogSegNo nextSegNo, TimeLineID *tli_p)
{
neon_wal_segment_open(wal_reader, nextSegNo, tli_p);
xlogreader->seg.ws_file = NeonWALReaderGetSegment(wal_reader)->ws_file;
}
static void
NeonWALReadSegmentClose(XLogReaderState *xlogreader)
{
neon_wal_segment_close(wal_reader);
xlogreader->seg.ws_file = NeonWALReaderGetSegment(wal_reader)->ws_file;
}
void
NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
{
if (!wal_reader)
{
XLogRecPtr epochStartLsn = pg_atomic_read_u64(&GetWalpropShmemState()->propEpochStartLsn);
if (epochStartLsn == 0)
{
elog(ERROR, "Unable to start walsender when propEpochStartLsn is 0!");
}
wal_reader = NeonWALReaderAllocate(wal_segment_size, epochStartLsn, "[walsender] ");
}
xlr->page_read = NeonWALPageRead;
xlr->segment_open = NeonWALReadSegmentOpen;
xlr->segment_close = NeonWALReadSegmentClose;
}


@@ -0,0 +1,7 @@
#ifndef __WALSENDER_HOOKS_H__
#define __WALSENDER_HOOKS_H__
struct XLogReaderRoutine;
void NeonOnDemandXLogReaderRoutines(struct XLogReaderRoutine *xlr);
#endif

poetry.lock

@@ -1001,18 +1001,17 @@ dotenv = ["python-dotenv"]
[[package]]
name = "flask-cors"
version = "3.0.10"
version = "4.0.1"
description = "A Flask extension adding a decorator for CORS support"
optional = false
python-versions = "*"
files = [
{file = "Flask-Cors-3.0.10.tar.gz", hash = "sha256:b60839393f3b84a0f3746f6cdca56c1ad7426aa738b70d6c61375857823181de"},
{file = "Flask_Cors-3.0.10-py2.py3-none-any.whl", hash = "sha256:74efc975af1194fc7891ff5cd85b0f7478be4f7f59fe158102e91abb72bb4438"},
{file = "Flask_Cors-4.0.1-py2.py3-none-any.whl", hash = "sha256:f2a704e4458665580c074b714c4627dd5a306b333deb9074d0b1794dfa2fb677"},
{file = "flask_cors-4.0.1.tar.gz", hash = "sha256:eeb69b342142fdbf4766ad99357a7f3876a2ceb77689dc10ff912aac06c389e4"},
]
[package.dependencies]
Flask = ">=0.9"
Six = "*"
[[package]]
name = "frozenlist"
@@ -1243,13 +1242,13 @@ files = [
[[package]]
name = "jinja2"
version = "3.1.3"
version = "3.1.4"
description = "A very fast and expressive template engine."
optional = false
python-versions = ">=3.7"
files = [
{file = "Jinja2-3.1.3-py3-none-any.whl", hash = "sha256:7d6d50dd97d52cbc355597bd845fabfbac3f551e1f99619e39a35ce8c370b5fa"},
{file = "Jinja2-3.1.3.tar.gz", hash = "sha256:ac8bd6544d4bb2c9792bf3a159e80bba8fda7f07e81bc3aed565432d5925ba90"},
{file = "jinja2-3.1.4-py3-none-any.whl", hash = "sha256:bc5dd2abb727a5319567b7a813e6a2e7318c39f4f487cfe6c89c6f9c7d25197d"},
{file = "jinja2-3.1.4.tar.gz", hash = "sha256:4a3aee7acbbe7303aede8e9648d13b8bf88a429282aa6122a993f0ac800cb369"},
]
[package.dependencies]
@@ -2612,13 +2611,13 @@ files = [
[[package]]
name = "werkzeug"
version = "3.0.1"
version = "3.0.3"
description = "The comprehensive WSGI web application library."
optional = false
python-versions = ">=3.8"
files = [
{file = "werkzeug-3.0.1-py3-none-any.whl", hash = "sha256:90a285dc0e42ad56b34e696398b8122ee4c681833fb35b8334a095d82c56da10"},
{file = "werkzeug-3.0.1.tar.gz", hash = "sha256:507e811ecea72b18a404947aded4b3390e1db8f826b494d76550ef45bb3b1dcc"},
{file = "werkzeug-3.0.3-py3-none-any.whl", hash = "sha256:fc9645dc43e03e4d630d23143a04a7f947a9a3b5727cd535fdfe155a17cc48c8"},
{file = "werkzeug-3.0.3.tar.gz", hash = "sha256:097e5bfda9f0aba8da6b8545146def481d06aa7d3266e7448e2cccf67dd8bd18"},
]
[package.dependencies]
@@ -2900,4 +2899,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "b3452b50901123fd5f2c385ce8a0c1c492296393b8a7926a322b6df0ea3ac572"
content-hash = "496d6d9f722983bda4d1265370bc8ba75560da74ab5d6b68c94a03290815e1eb"


@@ -40,6 +40,7 @@ hyper.workspace = true
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
http-body-util = { version = "0.1" }
indexmap.workspace = true
ipnet.workspace = true
itertools.workspace = true
lasso = { workspace = true, features = ["multi-threaded"] }


@@ -27,6 +27,7 @@ use proxy::redis::cancellation_publisher::RedisPublisherClient;
use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use proxy::redis::elasticache;
use proxy::redis::notifications;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::GlobalConnPoolOptions;
use proxy::usage_metrics;
@@ -243,6 +244,12 @@ struct SqlOverHttpArgs {
/// increase memory used by the pool
#[clap(long, default_value_t = 128)]
sql_over_http_pool_shards: usize,
#[clap(long, default_value_t = 10000)]
sql_over_http_client_conn_threshold: u64,
#[clap(long, default_value_t = 64)]
sql_over_http_cancel_set_shards: usize,
}
#[tokio::main]
@@ -599,6 +606,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards),
client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold,
};
let authentication_config = AuthenticationConfig {
scram_protocol_timeout: args.scram_protocol_timeout,


@@ -2,7 +2,7 @@ use crate::{
auth::{self, backend::AuthRateLimiter},
console::locks::ApiLocks,
rate_limiter::RateBucketInfo,
serverless::GlobalConnPoolOptions,
serverless::{cancel_set::CancelSet, GlobalConnPoolOptions},
Host,
};
use anyhow::{bail, ensure, Context, Ok};
@@ -56,6 +56,8 @@ pub struct TlsConfig {
pub struct HttpConfig {
pub request_timeout: tokio::time::Duration,
pub pool_options: GlobalConnPoolOptions,
pub cancel_set: CancelSet,
pub client_conn_threshold: u64,
}
pub struct AuthenticationConfig {
@@ -536,9 +538,9 @@ pub struct RetryConfig {
impl RetryConfig {
/// Default options for RetryConfig.
/// Total delay for 8 retries with 100ms base delay and 1.6 backoff factor is about 7s.
/// Total delay for 5 retries with 200ms base delay and 2 backoff factor is about 6s.
pub const CONNECT_TO_COMPUTE_DEFAULT_VALUES: &'static str =
"num_retries=8,base_retry_wait_duration=100ms,retry_wait_exponent_base=1.6";
"num_retries=5,base_retry_wait_duration=200ms,retry_wait_exponent_base=2";
/// Total delay for 8 retries with 100ms base delay and 1.6 backoff factor is about 7s.
/// Cplane has timeout of 60s on each request. 8m7s in total.
pub const WAKE_COMPUTE_DEFAULT_VALUES: &'static str =
@@ -592,7 +594,7 @@ impl ConcurrencyLockOptions {
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
/// Default options for [`crate::console::provider::ApiLocks`].
pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
"shards=64,permits=50,epoch=10m,timeout=500ms";
"shards=64,permits=10,epoch=10m,timeout=10ms";
// pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";


@@ -76,7 +76,7 @@ pub mod errors {
}
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
format!("{REQUEST_FAILED}: endpoint is temporary unavailable. check your quotas and/or contact our support")
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
}
_ => REQUEST_FAILED.to_owned(),
},


@@ -3,6 +3,7 @@
//! Handles both SQL over HTTP and SQL over Websockets.
mod backend;
pub mod cancel_set;
mod conn_pool;
mod http_util;
mod json;
@@ -109,20 +110,37 @@ pub async fn task_main(
let conn_id = uuid::Uuid::new_v4();
let http_conn_span = tracing::info_span!("http_conn", ?conn_id);
connections.spawn(
connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
)
.instrument(http_conn_span),
);
let n_connections = Metrics::get()
.proxy
.client_connections
.sample(crate::metrics::Protocol::Http);
tracing::trace!(?n_connections, threshold = ?config.http_config.client_conn_threshold, "check");
if n_connections > config.http_config.client_conn_threshold {
tracing::trace!("attempting to cancel a random connection");
if let Some(token) = config.http_config.cancel_set.take() {
tracing::debug!("cancelling a random connection");
token.cancel()
}
}
let conn_token = cancellation_token.child_token();
let conn = connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
conn_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
)
.instrument(http_conn_span);
connections.spawn(async move {
let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token);
conn.await
});
}
connections.wait().await;
@@ -243,6 +261,7 @@ async fn connection_handler(
// On cancellation, trigger the HTTP connection handler to shut down.
let res = match select(pin!(cancellation_token.cancelled()), pin!(conn)).await {
Either::Left((_cancelled, mut conn)) => {
tracing::debug!(%peer_addr, "cancelling connection");
conn.as_mut().graceful_shutdown();
conn.await
}


@@ -0,0 +1,102 @@
//! A set for cancelling random http connections
use std::{
hash::{BuildHasher, BuildHasherDefault},
num::NonZeroUsize,
time::Duration,
};
use indexmap::IndexMap;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
use rustc_hash::FxHasher;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
type Hasher = BuildHasherDefault<FxHasher>;
pub struct CancelSet {
shards: Box<[Mutex<CancelShard>]>,
// keyed by random uuid, fxhasher is fine
hasher: Hasher,
}
pub struct CancelShard {
tokens: IndexMap<uuid::Uuid, (Instant, CancellationToken), Hasher>,
}
impl CancelSet {
pub fn new(shards: usize) -> Self {
CancelSet {
shards: (0..shards)
.map(|_| {
Mutex::new(CancelShard {
tokens: IndexMap::with_hasher(Hasher::default()),
})
})
.collect(),
hasher: Hasher::default(),
}
}
pub fn take(&self) -> Option<CancellationToken> {
for _ in 0..4 {
if let Some(token) = self.take_raw(thread_rng().gen()) {
return Some(token);
}
tracing::trace!("failed to get cancel token");
}
None
}
pub fn take_raw(&self, rng: usize) -> Option<CancellationToken> {
NonZeroUsize::new(self.shards.len())
.and_then(|len| self.shards[rng % len].lock().take(rng / len))
}
pub fn insert(&self, id: uuid::Uuid, token: CancellationToken) -> CancelGuard<'_> {
let shard = NonZeroUsize::new(self.shards.len()).map(|len| {
let hash = self.hasher.hash_one(id) as usize;
let shard = &self.shards[hash % len];
shard.lock().insert(id, token);
shard
});
CancelGuard { shard, id }
}
}
impl CancelShard {
fn take(&mut self, rng: usize) -> Option<CancellationToken> {
NonZeroUsize::new(self.tokens.len()).and_then(|len| {
// 10 second grace period so we don't cancel new connections
if self.tokens.get_index(rng % len)?.1 .0.elapsed() < Duration::from_secs(10) {
return None;
}
let (_key, (_insert, token)) = self.tokens.swap_remove_index(rng % len)?;
Some(token)
})
}
fn remove(&mut self, id: uuid::Uuid) {
self.tokens.swap_remove(&id);
}
fn insert(&mut self, id: uuid::Uuid, token: CancellationToken) {
self.tokens.insert(id, (Instant::now(), token));
}
}
pub struct CancelGuard<'a> {
shard: Option<&'a Mutex<CancelShard>>,
id: Uuid,
}
impl Drop for CancelGuard<'_> {
fn drop(&mut self) {
if let Some(shard) = self.shard {
shard.lock().remove(self.id);
}
}
}


@@ -716,7 +716,7 @@ impl<C: ClientInnerExt> Drop for Client<C> {
mod tests {
use std::{mem, sync::atomic::AtomicBool};
use crate::{BranchId, EndpointId, ProjectId};
use crate::{serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId};
use super::*;
@@ -767,6 +767,8 @@ mod tests {
max_total_conns: 3,
},
request_timeout: Duration::from_secs(1),
cancel_set: CancelSet::new(0),
client_conn_threshold: u64::MAX,
}));
let pool = GlobalConnPool::new(config);
let conn_info = ConnInfo {


@@ -424,8 +424,8 @@ pub enum SqlOverHttpCancel {
impl ReportableError for SqlOverHttpCancel {
fn get_error_kind(&self) -> ErrorKind {
match self {
SqlOverHttpCancel::Postgres => ErrorKind::RateLimit,
SqlOverHttpCancel::Connect => ErrorKind::ServiceRateLimit,
SqlOverHttpCancel::Postgres => ErrorKind::ClientDisconnect,
SqlOverHttpCancel::Connect => ErrorKind::ClientDisconnect,
}
}
}


@@ -14,7 +14,7 @@ requests = "^2.31.0"
pytest-xdist = "^3.3.1"
asyncpg = "^0.29.0"
aiopg = "^1.4.0"
Jinja2 = "^3.1.3"
Jinja2 = "^3.1.4"
types-requests = "^2.31.0.0"
types-psycopg2 = "^2.9.21.10"
boto3 = "^1.34.11"
@@ -24,7 +24,7 @@ backoff = "^2.2.1"
pytest-lazy-fixture = "^0.6.3"
prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^3.0.1"
Werkzeug = "^3.0.3"
pytest-order = "^1.1.0"
allure-pytest = "^2.13.2"
pytest-asyncio = "^0.21.0"


@@ -506,6 +506,8 @@ struct WalSender<'a, IO> {
send_buf: [u8; MAX_SEND_SIZE],
}
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// Send WAL until
/// - an error occurs
@@ -584,14 +586,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
self.end_pos = self.end_watch.get();
if self.end_pos > self.start_pos {
// We have something to send.
let have_something_to_send = (|| {
fail::fail_point!(
"sk-pause-send",
self.appname.as_deref() != Some("pageserver"),
|_| { false }
);
self.end_pos > self.start_pos
})();
if have_something_to_send {
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
}
// Wait for WAL to appear, now self.end_pos == self.start_pos.
if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
if let Some(lsn) = self.wait_for_lsn().await? {
self.end_pos = lsn;
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
@@ -628,6 +638,54 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
.await?;
}
}
/// Wait until we have available WAL > start_pos or timeout expires. Returns
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
/// - Ok(None) if timeout expired;
/// - Err in case of error -- only if 1) term changed while fetching in recovery
/// mode 2) watch channel closed, which must never happen.
async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
let fp = (|| {
fail::fail_point!(
"sk-pause-send",
self.appname.as_deref() != Some("pageserver"),
|_| { true }
);
false
})();
if fp {
tokio::time::sleep(POLL_STATE_TIMEOUT).await;
return Ok(None);
}
let res = timeout(POLL_STATE_TIMEOUT, async move {
loop {
let end_pos = self.end_watch.get();
if end_pos > self.start_pos {
return Ok(end_pos);
}
if let EndWatch::Flush(rx) = &self.end_watch {
let curr_term = rx.borrow().term;
if let Some(client_term) = self.term {
if curr_term != client_term {
bail!("term changed: requested {}, now {}", client_term, curr_term);
}
}
}
self.end_watch.changed().await?;
}
})
.await;
match res {
// success
Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
// error inside closure
Ok(Err(err)) => Err(err),
// timeout
Err(_) => Ok(None),
}
}
}
/// A half driving receiving replies.
@@ -685,47 +743,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
}
}
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
/// Wait until we have available WAL > start_pos or timeout expires. Returns
/// - Ok(Some(end_pos)) if needed lsn is successfully observed;
/// - Ok(None) if timeout expired;
/// - Err in case of error -- only if 1) term changed while fetching in recovery
/// mode 2) watch channel closed, which must never happen.
async fn wait_for_lsn(
rx: &mut EndWatch,
client_term: Option<Term>,
start_pos: Lsn,
) -> anyhow::Result<Option<Lsn>> {
let res = timeout(POLL_STATE_TIMEOUT, async move {
loop {
let end_pos = rx.get();
if end_pos > start_pos {
return Ok(end_pos);
}
if let EndWatch::Flush(rx) = rx {
let curr_term = rx.borrow().term;
if let Some(client_term) = client_term {
if curr_term != client_term {
bail!("term changed: requested {}, now {}", client_term, curr_term);
}
}
}
rx.changed().await?;
}
})
.await;
match res {
// success
Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
// error inside closure
Ok(Err(err)) => Err(err),
// timeout
Err(_) => Ok(None),
}
}
#[cfg(test)]
mod tests {
use utils::id::{TenantId, TimelineId};


@@ -17,8 +17,7 @@ use utils::lsn::Lsn;
use walproposer::{
api_bindings::Level,
bindings::{
pg_atomic_uint64, NeonWALReadResult, PageserverFeedback, SafekeeperStateDesiredEvents,
WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
NeonWALReadResult, SafekeeperStateDesiredEvents, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
},
walproposer::{ApiImpl, Config},
};
@@ -224,31 +223,13 @@ impl SimulationApi {
})
.collect::<Vec<_>>();
let empty_feedback = PageserverFeedback {
present: false,
currentClusterSize: 0,
last_received_lsn: 0,
disk_consistent_lsn: 0,
remote_consistent_lsn: 0,
replytime: 0,
shard_number: 0,
};
Self {
os: args.os,
safekeepers: RefCell::new(sk_conns),
disk: args.disk,
redo_start_lsn: args.redo_start_lsn,
last_logged_commit_lsn: 0,
shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
mutex: 0,
mineLastElectedTerm: 0,
backpressureThrottlingTime: pg_atomic_uint64 { value: 0 },
currentClusterSize: pg_atomic_uint64 { value: 0 },
shard_ps_feedback: [empty_feedback; 128],
num_shards: 0,
min_ps_feedback: empty_feedback,
}),
shmem: UnsafeCell::new(walproposer::api_bindings::empty_shmem()),
config: args.config,
event_set: RefCell::new(None),
}
@@ -274,6 +255,12 @@ impl ApiImpl for SimulationApi {
self.os.now() as i64 * 1000
}
fn update_donor(&self, donor: &mut walproposer::bindings::Safekeeper, donor_lsn: u64) {
let mut shmem = unsafe { *self.get_shmem_state() };
shmem.propEpochStartLsn.value = donor_lsn;
shmem.donor_conninfo = donor.conninfo;
}
fn conn_status(
&self,
_: &mut walproposer::bindings::Safekeeper,


@@ -76,13 +76,10 @@ you can use `--pg-version` argument.
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
`NEON_PAGESERVER_OVERRIDES`: add a `;`-separated set of configs that will be passed as
`RUST_LOG`: logging configuration to pass into Neon CLI
Useful parameters and commands:
`--pageserver-config-override=${value}` `-c` values to pass into pageserver through neon_local cli
`--preserve-database-files` to preserve pageserver (layer) and safekeeper (segment) timeline files on disk
after running a test suite. Such files might be large, so they are removed by default; they can be useful for debugging or for creating svg images of layer file contents.


@@ -14,7 +14,7 @@ import textwrap
import threading
import time
import uuid
from contextlib import closing, contextmanager
from contextlib import ExitStack, closing, contextmanager
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
@@ -54,7 +54,7 @@ from fixtures.pageserver.allowed_errors import (
DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.types import IndexPartDump
from fixtures.pageserver.types import IndexPartDump, LayerFileName, parse_layer_file_name
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
@@ -68,7 +68,7 @@ from fixtures.remote_storage import (
RemoteStorageUser,
S3Storage,
default_remote_storage,
remote_storage_to_toml_inline_table,
remote_storage_to_toml_dict,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
@@ -82,6 +82,7 @@ from fixtures.utils import (
subprocess_capture,
wait_until,
)
from fixtures.utils import AuxFileStore as AuxFileStore # reexport
"""
This file contains pytest fixtures. A fixture is a test resource that can be
@@ -450,6 +451,7 @@ class NeonEnvBuilder:
test_output_dir: Path,
test_overlay_dir: Optional[Path] = None,
pageserver_remote_storage: Optional[RemoteStorage] = None,
# toml that will be decomposed into `--config-override` flags during `pageserver --init`
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
num_pageservers: int = 1,
@@ -464,6 +466,7 @@ class NeonEnvBuilder:
initial_tenant: Optional[TenantId] = None,
initial_timeline: Optional[TimelineId] = None,
pageserver_virtual_file_io_engine: Optional[str] = None,
pageserver_aux_file_policy: Optional[AuxFileStore] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -487,6 +490,7 @@ class NeonEnvBuilder:
self.env: Optional[NeonEnv] = None
self.keep_remote_storage_contents: bool = True
self.neon_binpath = neon_binpath
self.neon_local_binpath = neon_binpath
self.pg_distrib_dir = pg_distrib_dir
self.pg_version = pg_version
self.preserve_database_files = preserve_database_files
@@ -518,6 +522,8 @@ class NeonEnvBuilder:
self.pageserver_validate_vectored_get = bool(validate)
log.debug(f'Overriding pageserver validate_vectored_get config to "{validate}"')
self.pageserver_aux_file_policy = pageserver_aux_file_policy
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -563,6 +569,7 @@ class NeonEnvBuilder:
timeline_id=env.initial_timeline,
shard_count=initial_tenant_shard_count,
shard_stripe_size=initial_tenant_shard_stripe_size,
aux_file_v2=self.pageserver_aux_file_policy,
)
assert env.initial_tenant == initial_tenant
assert env.initial_timeline == initial_timeline
@@ -631,17 +638,11 @@ class NeonEnvBuilder:
def from_repo_dir(
self,
repo_dir: Path,
neon_binpath: Optional[Path] = None,
pg_distrib_dir: Optional[Path] = None,
) -> NeonEnv:
"""
A simple method to import data into the current NeonEnvBuilder from a snapshot of a repo dir.
"""
# Setting custom `neon_binpath` and `pg_distrib_dir` is useful for compatibility tests
self.neon_binpath = neon_binpath or self.neon_binpath
self.pg_distrib_dir = pg_distrib_dir or self.pg_distrib_dir
# Get the initial tenant and timeline from the snapshot config
snapshot_config_toml = repo_dir / "config"
with snapshot_config_toml.open("r") as f:
@@ -981,7 +982,7 @@ class NeonEnv:
Some notable functions and fields in NeonEnv:
postgres - A factory object for creating postgres compute nodes.
endpoints - A factory object for creating postgres compute nodes.
pageservers - An array containing objects representing the pageservers
@@ -1016,12 +1017,12 @@ class NeonEnv:
self.pg_version = config.pg_version
# Binary path for pageserver, safekeeper, etc
self.neon_binpath = config.neon_binpath
# Binary path for neon_local test-specific binaries: may be overridden
# after construction for compat testing
self.neon_local_binpath = config.neon_binpath
# Binary path for neon_local test-specific binaries
self.neon_local_binpath = config.neon_local_binpath
if self.neon_local_binpath is None:
self.neon_local_binpath = self.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
self.pageserver_config_override = config.pageserver_config_override
self.storage_controller_config = config.storage_controller_config
# generate initial tenant ID here instead of letting 'neon init' generate it,
@@ -1051,6 +1052,7 @@ class NeonEnv:
)
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_aux_file_policy = config.pageserver_aux_file_policy
# Create a config file corresponding to the options
cfg: Dict[str, Any] = {
@@ -1131,7 +1133,11 @@ class NeonEnv:
cfg["safekeepers"].append(sk_cfg)
log.info(f"Config: {cfg}")
self.neon_cli.init(cfg, force=config.config_init_force)
self.neon_cli.init(
cfg,
force=config.config_init_force,
pageserver_config_override=config.pageserver_config_override,
)
def start(self):
# Storage controller starts first, so that pageserver /re-attach calls don't
@@ -1283,6 +1289,7 @@ def _shared_simple_env(
pg_distrib_dir: Path,
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
) -> Iterator[NeonEnv]:
"""
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
@@ -1313,6 +1320,7 @@ def _shared_simple_env(
test_name=request.node.name,
test_output_dir=test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=pageserver_aux_file_policy,
) as builder:
env = builder.init_start()
@@ -1352,6 +1360,7 @@ def neon_env_builder(
test_overlay_dir: Path,
top_output_dir: Path,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore] = None,
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1385,6 +1394,7 @@ def neon_env_builder(
test_name=request.node.name,
test_output_dir=test_output_dir,
test_overlay_dir=test_overlay_dir,
pageserver_aux_file_policy=pageserver_aux_file_policy,
) as builder:
yield builder
@@ -1544,6 +1554,7 @@ class NeonCli(AbstractNeonCli):
shard_stripe_size: Optional[int] = None,
placement_policy: Optional[str] = None,
set_default: bool = False,
aux_file_v2: Optional[AuxFileStore] = None,
) -> Tuple[TenantId, TimelineId]:
"""
Creates a new tenant, returns its id and its initial timeline's id.
@@ -1567,6 +1578,16 @@ class NeonCli(AbstractNeonCli):
product(["-c"], (f"{key}:{value}" for key, value in conf.items()))
)
)
if aux_file_v2 is AuxFileStore.V2:
args.extend(["-c", "switch_aux_file_policy:v2"])
if aux_file_v2 is AuxFileStore.V1:
args.extend(["-c", "switch_aux_file_policy:v1"])
if aux_file_v2 is AuxFileStore.CrossValidation:
args.extend(["-c", "switch_aux_file_policy:cross_validation"])
if set_default:
args.append("--set-default")
@@ -1703,30 +1724,46 @@ class NeonCli(AbstractNeonCli):
self,
config: Dict[str, Any],
force: Optional[str] = None,
pageserver_config_override: Optional[str] = None,
) -> "subprocess.CompletedProcess[str]":
with tempfile.NamedTemporaryFile(mode="w+") as tmp:
tmp.write(toml.dumps(config))
tmp.flush()
remote_storage = self.env.pageserver_remote_storage
cmd = ["init", f"--config={tmp.name}", "--pg-version", self.env.pg_version]
ps_config = {}
if remote_storage is not None:
ps_config["remote_storage"] = remote_storage_to_toml_dict(remote_storage)
if pageserver_config_override is not None:
for o in pageserver_config_override.split(";"):
override = toml.loads(o)
for key, value in override.items():
ps_config[key] = value
with ExitStack() as stack:
ps_config_file = stack.enter_context(tempfile.NamedTemporaryFile(mode="w+"))
ps_config_file.write(toml.dumps(ps_config))
ps_config_file.flush()
neon_local_config = stack.enter_context(tempfile.NamedTemporaryFile(mode="w+"))
neon_local_config.write(toml.dumps(config))
neon_local_config.flush()
cmd = [
"init",
f"--config={neon_local_config.name}",
"--pg-version",
self.env.pg_version,
f"--pageserver-config={ps_config_file.name}",
]
if force is not None:
cmd.extend(["--force", force])
storage = self.env.pageserver_remote_storage
append_pageserver_param_overrides(
params_to_update=cmd,
remote_storage=storage,
pageserver_config_override=self.env.pageserver_config_override,
)
s3_env_vars = None
if isinstance(storage, S3Storage):
s3_env_vars = storage.access_env_vars()
if isinstance(remote_storage, S3Storage):
s3_env_vars = remote_storage.access_env_vars()
res = self.raw_cli(cmd, extra_env_vars=s3_env_vars)
res.check_returncode()
return res
return res
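As a hedged sketch of how this plumbing is driven from a test (not something this diff adds): `NeonEnvBuilder` accepts `pageserver_config_override`, `NeonEnv` forwards it to this `init` call, and each `;`-separated fragment is parsed as TOML and merged into the generated pageserver config file.

    # Hypothetical usage inside a test body; neon_env_builder is the standard fixture.
    neon_env_builder.pageserver_config_override = "tenant_config={ gc_period = '0s' }"
    env = neon_env_builder.init_start()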
def storage_controller_start(self):
cmd = ["storage_controller", "start"]
@@ -1741,16 +1778,10 @@ class NeonCli(AbstractNeonCli):
def pageserver_start(
self,
id: int,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
) -> "subprocess.CompletedProcess[str]":
start_args = ["pageserver", "start", f"--id={id}", *overrides]
start_args = ["pageserver", "start", f"--id={id}"]
storage = self.env.pageserver_remote_storage
append_pageserver_param_overrides(
params_to_update=start_args,
remote_storage=storage,
pageserver_config_override=self.env.pageserver_config_override,
)
if isinstance(storage, S3Storage):
s3_env_vars = storage.access_env_vars()
@@ -2414,9 +2445,42 @@ class NeonPageserver(PgProtocol, LogUtils):
return self.workdir / "tenants"
return self.workdir / "tenants" / str(tenant_shard_id)
@property
def config_toml_path(self) -> Path:
return self.workdir / "pageserver.toml"
def edit_config_toml(self, edit_fn: Callable[[Dict[str, Any]], None]):
"""
Edit the pageserver's config toml file in place.
"""
path = self.config_toml_path
with open(path, "r") as f:
config = toml.load(f)
edit_fn(config)
with open(path, "w") as f:
toml.dump(config, f)
def patch_config_toml_nonrecursive(self, patch: Dict[str, Any]) -> Dict[str, Any]:
"""
Non-recursively merge the given `patch` dict into the existing config toml, using `dict.update()`.
Returns the replaced values.
If there was no previous value, the key is mapped to None.
This makes it possible to restore the original values by calling this method again with the returned dict.
"""
replacements = {}
def doit(config: Dict[str, Any]):
while len(patch) > 0:
key, new = patch.popitem()
old = config.get(key, None)
config[key] = new
replacements[key] = old
self.edit_config_toml(doit)
return replacements
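To make the restore round-trip concrete, here is a minimal, hypothetical helper a test might define on top of this method; the helper name is an assumption, not part of the fixture API. Note that the method consumes the `patch` dict via `popitem()`, so the sketch passes a copy.

    from typing import Any, Dict

    def restart_with_config_patch(pageserver, patch: Dict[str, Any]) -> Dict[str, Any]:
        # Apply a flat override to pageserver.toml, restart, and return the
        # previous values so the caller can restore them with a second call.
        pageserver.stop()
        replaced = pageserver.patch_config_toml_nonrecursive(dict(patch))
        pageserver.start()
        return replaced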
def start(
self,
overrides: Tuple[str, ...] = (),
extra_env_vars: Optional[Dict[str, str]] = None,
) -> "NeonPageserver":
"""
@@ -2426,9 +2490,7 @@ class NeonPageserver(PgProtocol, LogUtils):
"""
assert self.running is False
self.env.neon_cli.pageserver_start(
self.id, overrides=overrides, extra_env_vars=extra_env_vars
)
self.env.neon_cli.pageserver_start(self.id, extra_env_vars=extra_env_vars)
self.running = True
return self
@@ -2590,32 +2652,36 @@ class NeonPageserver(PgProtocol, LogUtils):
tenant_id, generation=self.env.storage_controller.attach_hook_issue(tenant_id, self.id)
)
def list_layers(self, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
"""
Inspect local storage on a pageserver to discover which layer files are present.
def append_pageserver_param_overrides(
params_to_update: List[str],
remote_storage: Optional[RemoteStorage],
pageserver_config_override: Optional[str] = None,
):
if remote_storage is not None:
remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage)
:return: list of relative paths to layers, from the timeline root.
"""
timeline_path = self.timeline_dir(tenant_id, timeline_id)
params_to_update.append(
f"--pageserver-config-override=remote_storage={remote_storage_toml_table}"
def relative(p: Path) -> Path:
return p.relative_to(timeline_path)
return sorted(
list(
map(
relative,
filter(
lambda path: path.name != "metadata"
and "ephemeral" not in path.name
and "temp" not in path.name,
timeline_path.glob("*"),
),
)
)
)
else:
params_to_update.append('--pageserver-config-override=remote_storage=""')
env_overrides = os.getenv("NEON_PAGESERVER_OVERRIDES")
if env_overrides is not None:
params_to_update += [
f"--pageserver-config-override={o.strip()}" for o in env_overrides.split(";")
]
if pageserver_config_override is not None:
params_to_update += [
f"--pageserver-config-override={o.strip()}"
for o in pageserver_config_override.split(";")
]
def layer_exists(
self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: LayerFileName
) -> bool:
layers = self.list_layers(tenant_id, timeline_id)
return layer_name in [parse_layer_file_name(p.name) for p in layers]
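A short, hedged example of how these two helpers might be combined in a test; `env`, `tenant_id`, and `timeline_id` are assumed to come from the usual fixtures.

    # Hypothetical: every layer reported on disk should round-trip through the
    # layer-name parser and be found again by layer_exists().
    for rel_path in env.pageserver.list_layers(tenant_id, timeline_id):
        layer_name = parse_layer_file_name(rel_path.name)
        assert env.pageserver.layer_exists(tenant_id, timeline_id, layer_name)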
class PgBin:


@@ -89,6 +89,8 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
# During teardown, we stop the storage controller before the pageservers, so pageservers
# can experience connection errors doing background deletion queue work.
".*WARN deletion backend: calling control plane generation validation API failed.*Connection refused.*",
# Can happen when the test shuts down the storage controller while it is calling the utilization API
".*WARN.*path=/v1/utilization .*request was dropped before completing",
)


@@ -819,6 +819,23 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
continue
self.download_layer(tenant_id, timeline_id, layer.layer_file_name)
def detach_ancestor(
self,
tenant_id: Union[TenantId, TenantShardId],
timeline_id: TimelineId,
batch_size: int | None = None,
) -> Set[TimelineId]:
params = {}
if batch_size is not None:
params["batch_size"] = batch_size
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/detach_ancestor",
params=params,
)
self.verbose_error(res)
json = res.json()
return set(map(TimelineId, json["reparented_timelines"]))
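A hedged sketch of calling this new client method from a test; the fixture objects and the batch size are assumptions for illustration.

    # Hypothetical: detach `timeline_id` from its ancestor, processing reparents
    # in batches of 16, and inspect which timelines were reparented.
    ps_http = env.pageserver.http_client()
    reparented = ps_http.detach_ancestor(tenant_id, timeline_id, batch_size=16)
    log.info(f"reparented timelines: {reparented}")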
def evict_layer(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
):


@@ -1,3 +1,4 @@
import re
from dataclasses import dataclass
from typing import Any, Dict, Tuple, Union
@@ -47,46 +48,36 @@ class InvalidFileName(Exception):
pass
IMAGE_LAYER_FILE_NAME = re.compile("^([A-F0-9]{36})-([A-F0-9]{36})__([A-F0-9]{16})(-[a-f0-9]{8})?$")
def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
"""Parse an image layer file name. Return key start, key end, and snapshot lsn"""
parts = f_name.split("__")
if len(parts) != 2:
raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}")
key_parts = parts[0].split("-")
if len(key_parts) != 2:
raise InvalidFileName(
f"expecting two key parts separated by '--' in parts[0], got: {key_parts}"
)
try:
return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
except ValueError as e:
raise InvalidFileName(f"conversion error: {f_name}") from e
match = IMAGE_LAYER_FILE_NAME.match(f_name)
if match is None:
raise InvalidFileName(f"'{f_name}' is not an image layer filename")
return int(match.group(1), 16), int(match.group(2), 16), int(match.group(3), 16)
DELTA_LAYER_FILE_NAME = re.compile(
"^([A-F0-9]{36})-([A-F0-9]{36})__([A-F0-9]{16})-([A-F0-9]{16})(-[a-f0-9]{8})?$"
)
def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
"""Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
parts = f_name.split("__")
if len(parts) != 2:
raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}")
key_parts = parts[0].split("-")
if len(key_parts) != 2:
raise InvalidFileName(
f"expecting two key parts separated by '--' in parts[0], got: {key_parts}"
)
lsn_parts = parts[1].split("-")
if len(lsn_parts) != 2:
raise InvalidFileName(
f"expecting two lsn parts separated by '--' in parts[1], got: {lsn_parts}"
)
try:
return (
int(key_parts[0], 16),
int(key_parts[1], 16),
int(lsn_parts[0], 16),
int(lsn_parts[1], 16),
)
except ValueError as e:
raise InvalidFileName(f"conversion error: {f_name}") from e
match = DELTA_LAYER_FILE_NAME.match(f_name)
if match is None:
raise InvalidFileName(f"'{f_name}' is not an delta layer filename")
return (
int(match.group(1), 16),
int(match.group(2), 16),
int(match.group(3), 16),
int(match.group(4), 16),
)
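To illustrate the accepted format (made-up names, not taken from the diff): key ranges are 36 uppercase hex characters, LSNs are 16, and an optional lowercase hex generation suffix is tolerated and ignored.

    # Illustrative only; these layer names are fabricated for the example.
    key_start_hex = "0" * 36
    key_end_hex = "F" * 36

    # Image layer with a "-xxxxxxxx" generation suffix.
    image_name = f"{key_start_hex}-{key_end_hex}__00000000016B59D8-00000001"
    assert parse_image_layer(image_name) == (0, int(key_end_hex, 16), 0x16B59D8)

    # Delta layer without a generation suffix.
    delta_name = f"{key_start_hex}-{key_end_hex}__00000000016B59D8-00000000016B9988"
    assert parse_delta_layer(delta_name) == (0, int(key_end_hex, 16), 0x16B59D8, 0x16B9988)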
def parse_layer_file_name(file_name: str) -> LayerFileName:


@@ -5,6 +5,7 @@ import pytest
from _pytest.python import Metafunc
from fixtures.pg_version import PgVersion
from fixtures.utils import AuxFileStore
"""
Dynamically parametrize tests by different parameters
@@ -31,6 +32,11 @@ def pageserver_virtual_file_io_engine() -> Optional[str]:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE")
@pytest.fixture(scope="function", autouse=True)
def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
return None
def pytest_generate_tests(metafunc: Metafunc):
if (bt := os.getenv("BUILD_TYPE")) is None:
build_types = ["debug", "release"]


@@ -141,11 +141,13 @@ class LocalFsStorage:
with self.heatmap_path(tenant_id).open("r") as f:
return json.load(f)
def to_toml_inline_table(self) -> str:
rv = {
def to_toml_dict(self) -> Dict[str, Any]:
return {
"local_path": str(self.root),
}
return toml.TomlEncoder().dump_inline_table(rv)
def to_toml_inline_table(self) -> str:
return toml.TomlEncoder().dump_inline_table(self.to_toml_dict())
def cleanup(self):
# no cleanup is done here, because there's NeonEnvBuilder.cleanup_local_storage which will remove everything, including localfs files
@@ -194,7 +196,7 @@ class S3Storage:
}
)
def to_toml_inline_table(self) -> str:
def to_toml_dict(self) -> Dict[str, Any]:
rv = {
"bucket_name": self.bucket_name,
"bucket_region": self.bucket_region,
@@ -206,7 +208,10 @@ class S3Storage:
if self.endpoint is not None:
rv["endpoint"] = self.endpoint
return toml.TomlEncoder().dump_inline_table(rv)
return rv
def to_toml_inline_table(self) -> str:
return toml.TomlEncoder().dump_inline_table(self.to_toml_dict())
def do_cleanup(self):
if not self.cleanup:
@@ -414,6 +419,13 @@ def default_remote_storage() -> RemoteStorageKind:
return RemoteStorageKind.LOCAL_FS
def remote_storage_to_toml_dict(remote_storage: RemoteStorage) -> Dict[str, Any]:
if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):
raise Exception("invalid remote storage type")
return remote_storage.to_toml_dict()
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):


@@ -1,4 +1,5 @@
import contextlib
import enum
import json
import os
import re
@@ -484,3 +485,16 @@ def assert_no_errors(log_file, service, allowed_errors):
log.info(f"not allowed {service} error: {error.strip()}")
assert not errors, f"Log errors on {service}: {errors[0]}"
@enum.unique
class AuxFileStore(str, enum.Enum):
V1 = "V1"
V2 = "V2"
CrossValidation = "CrossValidation"
def __repr__(self) -> str:
return f"'aux-{self.value}'"
def __str__(self) -> str:
return f"'aux-{self.value}'"


@@ -140,10 +140,14 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int, shape:
# start without gc so we can time compaction with less noise; use shorter
# period for compaction so it starts earlier
def patch_default_tenant_config(config):
tenant_config = config.get("tenant_config", {})
tenant_config["compaction_period"] = "3s"
tenant_config["gc_period"] = "0s"
config["tenant_config"] = tenant_config
env.pageserver.edit_config_toml(patch_default_tenant_config)
env.pageserver.start(
overrides=(
"--pageserver-config-override=tenant_config={ compaction_period = '3s', gc_period = '0s' }",
),
# this does print more than we want, but the number should be comparable between runs
extra_env_vars={
"RUST_LOG": f"[compaction_loop{{tenant_id={env.initial_tenant}}}]=debug,info"


@@ -190,7 +190,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
"image_layer_creation_check_threshold": 1,
"switch_to_aux_file_v2": True,
"switch_aux_file_policy": "CrossValidation",
}
ps_http = env.pageserver.http_client()


@@ -233,17 +233,18 @@ def test_forward_compatibility(
neon_env_builder.pageserver_validate_vectored_get = None
neon_env_builder.num_safekeepers = 3
neon_local_binpath = neon_env_builder.neon_binpath
# Use previous version's production binaries (pageserver, safekeeper, pg_distrib_dir, etc.).
# But always use the current version's neon_local binary.
# This is because we want to test the compatibility of the data format, not the compatibility of the neon_local CLI.
neon_env_builder.neon_binpath = compatibility_neon_bin
neon_env_builder.pg_distrib_dir = compatibility_postgres_distrib_dir
neon_env_builder.neon_local_binpath = neon_env_builder.neon_local_binpath
env = neon_env_builder.from_repo_dir(
compatibility_snapshot_dir / "repo",
neon_binpath=compatibility_neon_bin,
pg_distrib_dir=compatibility_postgres_distrib_dir,
)
# Use current neon_local even though we're using old binaries for
# everything else: our test code is written for latest CLI args.
env.neon_local_binpath = neon_local_binpath
neon_env_builder.start()
check_neon_works(


@@ -5,7 +5,6 @@ from dataclasses import dataclass
from typing import Any, Dict, Iterable, Tuple
import pytest
import toml
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -45,17 +44,16 @@ def test_min_resident_size_override_handling(
ps_http.set_tenant_config(tenant_id, {})
assert_config(tenant_id, None, default_tenant_conf_value)
env.pageserver.stop()
if config_level_override is not None:
env.pageserver.start(
overrides=(
"--pageserver-config-override=tenant_config={ min_resident_size_override = "
+ str(config_level_override)
+ " }",
)
)
else:
env.pageserver.start()
def set_min_resident_size(config):
tenant_config = config.get("tenant_config", {})
tenant_config["min_resident_size_override"] = config_level_override
config["tenant_config"] = tenant_config
env.pageserver.edit_config_toml(set_min_resident_size)
env.pageserver.stop()
env.pageserver.start()
tenant_id, _ = env.neon_cli.create_tenant()
assert_overrides(tenant_id, config_level_override)
@@ -164,34 +162,32 @@ class EvictionEnv:
usage eviction task is unknown; it might need to run one more iteration
before assertions can be made.
"""
disk_usage_config = {
"period": period,
"max_usage_pct": max_usage_pct,
"min_avail_bytes": min_avail_bytes,
"mock_statvfs": mock_behavior,
"eviction_order": eviction_order.config(),
}
enc = toml.TomlEncoder()
# these can sometimes happen during startup before any tenants have been
# loaded, so nothing can be evicted; we just wait for the next iteration,
# which is able to evict.
pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
pageserver.start(
overrides=(
"--pageserver-config-override=disk_usage_based_eviction="
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
pageserver.patch_config_toml_nonrecursive(
{
"disk_usage_based_eviction": {
"period": period,
"max_usage_pct": max_usage_pct,
"min_avail_bytes": min_avail_bytes,
"mock_statvfs": mock_behavior,
"eviction_order": eviction_order.config(),
},
# Disk usage based eviction runs as a background task.
# But pageserver startup delays launch of background tasks for some time, to prioritize initial logical size calculations during startup.
# But, initial logical size calculation may not be triggered if safekeepers don't publish new broker messages.
# But, we only have a 10-second-timeout in this test.
# So, disable the delay for this test.
"--pageserver-config-override=background_task_maximum_delay='0s'",
),
"background_task_maximum_delay": "0s",
}
)
pageserver.start()
# we now do initial logical size calculation on startup, which on debug builds can fight with disk usage based eviction
for tenant_id, timeline_id in self.timelines:
tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)


@@ -2,6 +2,7 @@ import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload_queue_empty,
@@ -86,14 +87,7 @@ def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin)
# path = env.remote_storage.timeline_path(tenant_id, timeline_id)
l1_found = None
for path in env.pageserver.timeline_dir(tenant_id, timeline_id).iterdir():
if path.name == "metadata" or path.name.startswith("ephemeral-"):
continue
if len(path.suffixes) > 0:
# temp files
continue
for path in env.pageserver.list_layers(tenant_id, timeline_id):
[key_range, lsn_range] = path.name.split("__", maxsplit=1)
if "-" not in lsn_range:
@@ -108,19 +102,21 @@ def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin)
if l1_found is not None:
raise RuntimeError(f"found multiple L1: {l1_found.name} and {path.name}")
l1_found = path
l1_found = parse_layer_file_name(path.name)
assert l1_found is not None, "failed to find L1 locally"
uploaded = env.pageserver_remote_storage.remote_layer_path(
tenant_id, timeline_id, l1_found.name
tenant_id, timeline_id, l1_found.to_str()
)
assert not uploaded.exists(), "to-be-overwritten should not yet be uploaded"
env.pageserver.start()
wait_until_tenant_active(pageserver_http, tenant_id)
assert not l1_found.exists(), "partial compaction result should have been removed during startup"
assert not env.pageserver.layer_exists(
tenant_id, timeline_id, l1_found
), "partial compaction result should have been removed during startup"
# wait for us to catch up again
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
@@ -130,18 +126,18 @@ def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin)
# give time for log flush
time.sleep(1)
message = f".*duplicated L1 layer layer={l1_found.name}"
message = f".*duplicated L1 layer layer={l1_found}"
found_msg = env.pageserver.log_contains(message)
# resident or evicted, it should not be overwritten; however, it should have been non-existent at startup
assert (
found_msg is None
), "layer should have been removed during startup, did it live on as evicted?"
assert l1_found.exists(), "the L1 reappears"
assert env.pageserver.layer_exists(tenant_id, timeline_id, l1_found), "the L1 reappears"
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
uploaded = env.pageserver_remote_storage.remote_layer_path(
tenant_id, timeline_id, l1_found.name
tenant_id, timeline_id, l1_found.to_str()
)
assert uploaded.exists(), "the L1 is uploaded"


@@ -7,6 +7,7 @@ from fixtures.neon_fixtures import (
flush_ep_to_pageserver,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.utils import wait_for_upload
from fixtures.remote_storage import RemoteStorageKind
@@ -57,9 +58,9 @@ def test_basic_eviction(
for sk in env.safekeepers:
sk.stop()
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
initial_local_layers = dict(
(parse_layer_file_name(path.name), path)
for path in env.pageserver.list_layers(tenant_id, timeline_id)
)
assert (
len(initial_local_layers) > 1
@@ -73,6 +74,7 @@ def test_basic_eviction(
assert len(initial_local_layers) == len(
initial_layer_map_info.historic_layers
), "Should have the same layers in memory and on disk"
for returned_layer in initial_layer_map_info.historic_layers:
assert (
returned_layer.kind == "Delta"
@@ -81,27 +83,29 @@ def test_basic_eviction(
not returned_layer.remote
), f"All created layers should be present locally, but got {returned_layer}"
local_layers = list(
filter(lambda layer: layer.name == returned_layer.layer_file_name, initial_local_layers)
returned_layer_name = parse_layer_file_name(returned_layer.layer_file_name)
assert (
returned_layer_name in initial_local_layers
), f"Did not find returned layer {returned_layer_name} in local layers {list(initial_local_layers.keys())}"
local_layer_path = (
env.pageserver.timeline_dir(tenant_id, timeline_id)
/ initial_local_layers[returned_layer_name]
)
assert (
len(local_layers) == 1
), f"Did not find returned layer {returned_layer} in local layers {initial_local_layers}"
local_layer = local_layers[0]
assert (
returned_layer.layer_file_size == local_layer.stat().st_size
), f"Returned layer {returned_layer} has a different file size than local layer {local_layer}"
returned_layer.layer_file_size == local_layer_path.stat().st_size
), f"Returned layer {returned_layer} has a different file size than local layer {local_layer_path}"
# Detach all layers, ensure they are not in the local FS, but are still dumped as part of the layer map
for local_layer in initial_local_layers:
for local_layer_name, local_layer_path in initial_local_layers.items():
client.evict_layer(
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=local_layer.name
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=local_layer_path.name
)
assert not any(
new_local_layer.name == local_layer.name for new_local_layer in timeline_path.glob("*")
), f"Did not expect to find {local_layer} layer after evicting"
assert not env.pageserver.layer_exists(
tenant_id, timeline_id, local_layer_name
), f"Did not expect to find {local_layer_name} layer after evicting"
empty_layers = list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
empty_layers = env.pageserver.list_layers(tenant_id, timeline_id)
assert not empty_layers, f"After evicting all layers, timeline {tenant_id}/{timeline_id} should have no layers locally, but got: {empty_layers}"
evicted_layer_map_info = client.layer_map_info(tenant_id=tenant_id, timeline_id=timeline_id)
@@ -118,15 +122,15 @@ def test_basic_eviction(
assert (
returned_layer.remote
), f"All layers should be evicted and not present locally, but got {returned_layer}"
assert any(
local_layer.name == returned_layer.layer_file_name
for local_layer in initial_local_layers
returned_layer_name = parse_layer_file_name(returned_layer.layer_file_name)
assert (
returned_layer_name in initial_local_layers
), f"Did not find returned layer {returned_layer} in local layers {initial_local_layers}"
# redownload all evicted layers and ensure the initial state is restored
for local_layer in initial_local_layers:
for local_layer_name, _local_layer_path in initial_local_layers.items():
client.download_layer(
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=local_layer.name
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=local_layer_name.to_str()
)
client.timeline_download_remote_layers(
tenant_id,
@@ -137,8 +141,9 @@ def test_basic_eviction(
at_least_one_download=False,
)
redownloaded_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
redownloaded_layers = dict(
(parse_layer_file_name(path.name), path)
for path in env.pageserver.list_layers(tenant_id, timeline_id)
)
assert (
redownloaded_layers == initial_local_layers


@@ -6,7 +6,9 @@ from string import ascii_lowercase
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
AuxFileStore,
NeonEnv,
NeonEnvBuilder,
logical_replication_sync,
wait_for_last_flush_lsn,
)
@@ -18,6 +20,19 @@ def random_string(n: int):
return "".join([choice(ascii_lowercase) for _ in range(n)])
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.V2, AuxFileStore.CrossValidation]
)
def test_aux_file_v2_flag(neon_simple_env: NeonEnv, pageserver_aux_file_policy: AuxFileStore):
env = neon_simple_env
with env.pageserver.http_client() as client:
tenant_config = client.tenant_config(env.initial_tenant).effective_config
assert pageserver_aux_file_policy == tenant_config["switch_aux_file_policy"]
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
@@ -159,6 +174,9 @@ COMMIT;
# Test that neon.logical_replication_max_snap_files works
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
def slot_removed(ep):
assert (
@@ -203,8 +221,86 @@ def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))
# Tests that walsender correctly blocks until WAL is downloaded from safekeepers
def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
env.neon_cli.create_branch("init")
endpoint = env.endpoints.create_start("init")
with endpoint.connect().cursor() as cur:
cur.execute("create table wal_generator (id serial primary key, data text)")
cur.execute(
"""
INSERT INTO wal_generator (data)
SELECT repeat('A', 1024) -- Generates a kilobyte of data per row
FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of data
"""
)
cur.execute("create table t(a int)")
cur.execute("create publication pub for table t")
cur.execute("insert into t values (1)")
vanilla_pg.start()
vanilla_pg.safe_psql("create table t(a int)")
connstr = endpoint.connstr().replace("'", "''")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub")
logical_replication_sync(vanilla_pg, endpoint)
vanilla_pg.stop()
# Pause the safekeepers so that they can't send WAL (except to pageserver)
for sk in env.safekeepers:
sk_http = sk.http_client()
sk_http.configure_failpoints([("sk-pause-send", "return")])
# Insert a 2
with endpoint.connect().cursor() as cur:
cur.execute("insert into t values (2)")
endpoint.stop_and_destroy()
# This new endpoint should contain [1, 2], but it can't access WAL from safekeeper
endpoint = env.endpoints.create_start("init")
with endpoint.connect().cursor() as cur:
cur.execute("select * from t")
res = [r[0] for r in cur.fetchall()]
assert res == [1, 2]
# Reconnect subscriber
vanilla_pg.start()
connstr = endpoint.connstr().replace("'", "''")
vanilla_pg.safe_psql(f"alter subscription sub1 connection '{connstr}'")
time.sleep(5)
# Make sure the 2 isn't replicated
assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1]
# Re-enable WAL download
for sk in env.safekeepers:
sk_http = sk.http_client()
sk_http.configure_failpoints([("sk-pause-send", "off")])
logical_replication_sync(vanilla_pg, endpoint)
assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1, 2]
# Check that local reads also work
with endpoint.connect().cursor() as cur:
cur.execute("insert into t values (3)")
logical_replication_sync(vanilla_pg, endpoint)
assert [r[0] for r in vanilla_pg.safe_psql("select * from t")] == [1, 2, 3]
log_path = vanilla_pg.pgdatadir / "pg.log"
with open(log_path, "r") as log_file:
logs = log_file.read()
assert "could not receive data from WAL stream" not in logs
# Test compute start at LSN page of which starts with contrecord
# https://github.com/neondatabase/neon/issues/5749
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
@@ -295,6 +391,9 @@ def test_wal_page_boundary_start(neon_simple_env: NeonEnv, vanilla_pg):
# logical replication bug as such, but without logical replication,
# records passed to the WAL redo process are never large enough to hit
# the bug.
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_large_records(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
@@ -366,6 +465,9 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
ws_cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
@pytest.mark.parametrize(
"pageserver_aux_file_policy", [AuxFileStore.V1, AuxFileStore.CrossValidation]
)
def test_replication_shutdown(neon_simple_env: NeonEnv):
# Ensure Postgres can exit without stuck when a replication job is active + neon extension installed
env = neon_simple_env


@@ -119,11 +119,11 @@ def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
cur = endpoint_main.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
# Enable synchronous commit as we are timing sensitive
#
# Each row contains current insert LSN and the current timestamp, when
# the row was inserted.
cur.execute("SET synchronous_commit=off")
cur.execute("SET synchronous_commit=on")
cur.execute("CREATE TABLE foo (x integer)")
tbl = []
for i in range(1000):
@@ -132,7 +132,7 @@ def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
after_timestamp = query_scalar(cur, "SELECT clock_timestamp()").replace(tzinfo=timezone.utc)
after_lsn = query_scalar(cur, "SELECT pg_current_wal_lsn()")
tbl.append([i, after_timestamp, after_lsn])
time.sleep(0.005)
time.sleep(0.02)
# Execute one more transaction with synchronous_commit enabled, to flush
# all the previous transactions


@@ -12,7 +12,6 @@ from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
# test that we cannot override node id after init
def test_pageserver_init_node_id(
neon_simple_env: NeonEnv, neon_binpath: Path, pg_distrib_dir: Path
):
@@ -49,11 +48,7 @@ def test_pageserver_init_node_id(
bad_reinit = run_pageserver(good_init_cmd)
assert bad_reinit.returncode == 1, "pageserver refuses to init if already exists"
assert "already exists, cannot init it" in bad_reinit.stderr
bad_update = run_pageserver(["--update-config", "-c", "id = 3"])
assert bad_update.returncode == 1, "pageserver should not allow updating node id"
assert "has node id already, it cannot be overridden" in bad_update.stderr
assert "config file already exists" in bad_reinit.stderr
def check_client(env: NeonEnv, client: PageserverHttpClient):


@@ -10,6 +10,7 @@ of the pageserver are:
"""
import enum
import os
import re
import time
from typing import Optional
@@ -220,7 +221,12 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
env.storage_controller.node_register(env.pageserver)
env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
replaced_config = env.pageserver.patch_config_toml_nonrecursive(
{
"control_plane_api": "",
}
)
env.pageserver.start()
env.storage_controller.node_configure(env.pageserver.id, {"availability": "Active"})
env.neon_cli.create_tenant(
@@ -251,8 +257,8 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
assert parse_generation_suffix(key) is None
env.pageserver.stop()
# Starting without the override that disabled control_plane_api
env.pageserver.patch_config_toml_nonrecursive(replaced_config)
env.pageserver.start()
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)
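Both hunks in this file swap the old `--pageserver-config-override` flag for a config patch that is later reverted. A standalone sketch of the patch-and-restore idea in plain Python (an illustration only: it assumes a "non-recursive" patch replaces top-level keys wholesale and returns the previous values so the caller can put them back; the real fixture helper may behave differently):

def patch_config_nonrecursive(config: dict, patch: dict) -> dict:
    # replace top-level keys only and return what was there before
    replaced = {key: config.get(key) for key in patch}
    config.update(patch)
    return replaced

# illustrative values only
cfg = {"control_plane_api": "http://127.0.0.1:1234/upcall/v1/", "id": 1}
previous = patch_config_nonrecursive(cfg, {"control_plane_api": ""})
# ... start the pageserver with the override in place ...
cfg.update(previous)  # restore the original value before the next start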
@@ -525,9 +531,12 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# incident, but it might be unavoidable: if so, we want to be able to start up
# and serve clients.
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
env.pageserver.start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
replaced = env.pageserver.patch_config_toml_nonrecursive(
{
"control_plane_emergency_mode": True,
}
)
env.pageserver.start()
# The pageserver should provide service to clients
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)
@@ -549,6 +558,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# The pageserver should work fine when subsequently restarted in non-emergency mode
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
env.pageserver.patch_config_toml_nonrecursive(replaced)
env.pageserver.start()
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)
@@ -691,3 +701,50 @@ def test_multi_attach(
# All data we wrote while multi-attached remains readable
workload.validate(pageservers[2].id)
@pytest.mark.skip(reason="To be enabled after release with new local path style")
def test_upgrade_generationless_local_file_paths(
neon_env_builder: NeonEnvBuilder,
):
"""
Test pageserver behavior when starting up with local layer paths without
generation numbers: it should accept these layer files, and avoid doing
a delete/download cycle on them.
"""
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(1000)
env.pageserver.stop()
# Rename the local paths to legacy format, to simulate what
# we would see when upgrading
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
files_renamed = 0
for filename in os.listdir(timeline_dir):
path = os.path.join(timeline_dir, filename)
log.info(f"Found file {path}")
if path.endswith("-00000001"):
new_path = path[:-9]
os.rename(path, new_path)
log.info(f"Renamed {path} -> {new_path}")
files_renamed += 1
assert files_renamed > 0
env.pageserver.start()
workload.validate()
# Assert that there were no on-demand downloads
assert (
env.pageserver.http_client().get_metric_value(
"pageserver_remote_ondemand_downloaded_layers_total"
)
== 0
)
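The rename loop above matches the one specific suffix ("-00000001") this test run produces. A slightly more general sketch, assuming a local generation suffix is always a dash followed by eight lowercase hex digits appended to the layer file name:

import re

GENERATION_SUFFIX = re.compile(r"-[0-9a-f]{8}$")

def strip_generation(filename: str) -> str:
    # "somelayer-00000001" -> "somelayer"; names without a suffix are returned unchanged
    return GENERATION_SUFFIX.sub("", filename)

assert strip_generation("somelayer-00000001") == "somelayer"
assert strip_generation("somelayer") == "somelayer"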


@@ -20,7 +20,10 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
endpoint = env.endpoints.create_start("main")
pageserver_http = env.pageserver.http_client()
assert pageserver_http.get_metric_value("pageserver_tenant_manager_slots") == 1
assert (
pageserver_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"})
== 1
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
@@ -55,7 +58,10 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
env.pageserver.start()
# We reloaded our tenant
assert pageserver_http.get_metric_value("pageserver_tenant_manager_slots") == 1
assert (
pageserver_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"})
== 1
)
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)


@@ -2,12 +2,12 @@ import json
import os
import random
import time
from pathlib import Path
from typing import Any, Dict, Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, S3Scrubber
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.utils import (
assert_prefix_empty,
poll_for_remote_storage_iterations,
@@ -51,9 +51,13 @@ def evict_random_layers(
if "ephemeral" in layer.name or "temp_download" in layer.name:
continue
layer_name = parse_layer_file_name(layer.name)
if rng.choice([True, False]):
log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer.name}")
client.evict_layer(tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer.name)
log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer_name.to_str()}")
client.evict_layer(
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer_name.to_str()
)
@pytest.mark.parametrize("seed", [1, 2, 3])
@@ -402,32 +406,6 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
validate_heatmap(heatmap_second)
def list_layers(pageserver, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
"""
Inspect local storage on a pageserver to discover which layer files are present.
:return: list of relative paths to layers, from the timeline root.
"""
timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
def relative(p: Path) -> Path:
return p.relative_to(timeline_path)
return sorted(
list(
map(
relative,
filter(
lambda path: path.name != "metadata"
and "ephemeral" not in path.name
and "temp" not in path.name,
timeline_path.glob("*"),
),
)
)
)
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
"""
Test the overall data flow in secondary mode:
@@ -482,8 +460,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
# Make changes on attached pageserver, check secondary downloads them
@@ -500,8 +478,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary.http_client().tenant_secondary_download(tenant_id)
try:
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
except:
# Do a full listing of the secondary location on errors, to help debug
@@ -523,8 +501,8 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
# ==================================================================
try:
log.info("Evicting a layer...")
layer_to_evict = list_layers(ps_attached, tenant_id, timeline_id)[0]
some_other_layer = list_layers(ps_attached, tenant_id, timeline_id)[1]
layer_to_evict = ps_attached.list_layers(tenant_id, timeline_id)[0]
some_other_layer = ps_attached.list_layers(tenant_id, timeline_id)[1]
log.info(f"Victim layer: {layer_to_evict.name}")
ps_attached.http_client().evict_layer(
tenant_id, timeline_id, layer_name=layer_to_evict.name
@@ -537,13 +515,13 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
layer["name"] for layer in heatmap_after_eviction["timelines"][0]["layers"]
)
assert layer_to_evict.name not in heatmap_layers
assert some_other_layer.name in heatmap_layers
assert parse_layer_file_name(some_other_layer.name).to_str() in heatmap_layers
ps_secondary.http_client().tenant_secondary_download(tenant_id)
assert layer_to_evict not in list_layers(ps_attached, tenant_id, timeline_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
assert layer_to_evict not in ps_attached.list_layers(tenant_id, timeline_id)
assert ps_attached.list_layers(tenant_id, timeline_id) == ps_secondary.list_layers(
tenant_id, timeline_id
)
except:
# On assertion failures, log some details to help with debugging
@@ -630,7 +608,7 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
for timeline_id in timelines:
log.info(f"Checking for secondary timeline {timeline_id} on node {ps_secondary.id}")
# One or more layers should be present for all timelines
assert list_layers(ps_secondary, tenant_id, timeline_id)
assert ps_secondary.list_layers(tenant_id, timeline_id)
# Delete the second timeline: this should be reflected later on the secondary
env.storage_controller.pageserver_api().timeline_delete(tenant_id, timelines[1])
@@ -645,10 +623,10 @@ def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
ps_secondary = next(p for p in env.pageservers if p != ps_attached)
# This one was not deleted
assert list_layers(ps_secondary, tenant_id, timelines[0])
assert ps_secondary.list_layers(tenant_id, timelines[0])
# This one was deleted
assert not list_layers(ps_secondary, tenant_id, timelines[1])
assert not ps_secondary.list_layers(tenant_id, timelines[1])
t_end = time.time()
@@ -708,7 +686,7 @@ def test_slow_secondary_downloads(neon_env_builder: NeonEnvBuilder, via_controll
ps_attached.http_client().timeline_checkpoint(tenant_id, timeline_id)
# Expect lots of layers
assert len(list_layers(ps_attached, tenant_id, timeline_id)) > 10
assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10
# Simulate large data by making layer downloads artificially slow
for ps in env.pageservers:


@@ -0,0 +1,35 @@
import json
import re
from pathlib import Path
from fixtures.neon_fixtures import PgBin
from fixtures.pg_version import PgVersion
def test_postgres_version(base_dir: Path, pg_bin: PgBin, pg_version: PgVersion):
"""Test that Postgres version matches the one we expect"""
with (base_dir / "vendor" / "revisions.json").open() as f:
expected_revisions = json.load(f)
output_prefix = pg_bin.run_capture(["postgres", "--version"], with_command_header=False)
stdout = Path(f"{output_prefix}.stdout")
assert stdout.exists(), "postgres --version didn't print anything to stdout"
with stdout.open() as f:
output = f.read().strip()
# `postgres --version` prints something like "postgres (PostgreSQL) 15.6 (85d809c124a898847a97d66a211f7d5ef4f8e0cb)".
pattern = r"postgres \(PostgreSQL\) (?P<version>\d+\.\d+) \((?P<commit>[0-9a-f]{40})\)"
match = re.search(pattern, output, re.IGNORECASE)
assert match is not None, f"Can't parse {output} with {pattern}"
version = match.group("version")
commit = match.group("commit")
assert (
pg_version.v_prefixed in expected_revisions
), f"Version `{pg_version.v_prefixed}` doesn't exist in `vendor/revisions.json`, please update it if these changes are intentional"
msg = f"Unexpected Postgres {pg_version} version: `{output}`, please update `vendor/revisions.json` if these changes are intentional"
assert [version, commit] == expected_revisions[pg_version.v_prefixed], msg
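For reference, a minimal sketch of the same parse applied to the sample output quoted in the comment above (the commit hash is the illustrative one from that comment, not a pinned revision):

import re

output = "postgres (PostgreSQL) 15.6 (85d809c124a898847a97d66a211f7d5ef4f8e0cb)"
pattern = r"postgres \(PostgreSQL\) (?P<version>\d+\.\d+) \((?P<commit>[0-9a-f]{40})\)"
match = re.search(pattern, output, re.IGNORECASE)
assert match is not None
assert match.group("version") == "15.6"
assert len(match.group("commit")) == 40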


@@ -1,6 +1,3 @@
# It's possible to run any regular test with the local fs remote storage via
# env NEON_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import os
import queue
import shutil
@@ -15,6 +12,7 @@ from fixtures.neon_fixtures import (
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.utils import (
timeline_delete_wait_completed,
wait_for_last_record_lsn,
@@ -832,8 +830,9 @@ def test_compaction_waits_for_upload(
assert len(upload_stuck_layers) > 0
for name in upload_stuck_layers:
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
assert path.exists(), "while uploads are stuck the layers should be present on disk"
assert env.pageserver.layer_exists(
tenant_id, timeline_id, parse_layer_file_name(name)
), "while uploads are stuck the layers should be present on disk"
# now this will do the L0 => L1 compaction and want to remove
# upload_stuck_layers and the original initdb L0
@@ -841,8 +840,9 @@ def test_compaction_waits_for_upload(
# as uploads are paused, the upload_stuck_layers should still be with us
for name in upload_stuck_layers:
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
assert path.exists(), "uploads are stuck still over compaction"
assert env.pageserver.layer_exists(
tenant_id, timeline_id, parse_layer_file_name(name)
), "uploads are stuck still over compaction"
compacted_layers = client.layer_map_info(tenant_id, timeline_id).historic_by_name()
overlap = compacted_layers.intersection(upload_stuck_layers)
@@ -876,9 +876,8 @@ def test_compaction_waits_for_upload(
wait_until(10, 1, until_layer_deletes_completed)
for name in upload_stuck_layers:
path = env.pageserver.timeline_dir(tenant_id, timeline_id) / name
assert (
not path.exists()
assert not env.pageserver.layer_exists(
tenant_id, timeline_id, parse_layer_file_name(name)
), "l0 should now be removed because of L0 => L1 compaction and completed uploads"
# We should not have hit the error handling path in uploads where an uploaded file is gone


@@ -47,7 +47,7 @@ def test_tenant_s3_restore(
tenant_id = env.initial_tenant
# Default tenant and the one we created
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
# create two timelines one being the parent of another, both with non-trivial data
parent = None
@@ -72,13 +72,13 @@ def test_tenant_s3_restore(
time.sleep(4)
assert (
ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
), "tenant removed before we deletion was issued"
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
ps_http.deletion_queue_flush(execute=True)
assert (
ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 0
), "tenant removed before we deletion was issued"
env.storage_controller.attach_hook_drop(tenant_id)
@@ -116,4 +116,4 @@ def test_tenant_s3_restore(
# There might be some activity that advances the lsn so we can't use a strict equality check
assert last_flush_lsn >= expected_last_flush_lsn, "last_flush_lsn too old"
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1


@@ -177,6 +177,67 @@ def test_sharding_split_unsharded(
env.storage_controller.consistency_check()
def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder):
"""
Test that after a split, we clean up parent layer data in the child shards via compaction.
"""
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "3600s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
}
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Check that we created with an unsharded TenantShardId: this is the default,
# but check it in case we change the default in future
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
workload.init()
workload.write_rows(256)
workload.validate()
workload.stop()
# Split one shard into two
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)
# Check we got the shard IDs we expected
assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None
workload.validate()
workload.stop()
env.storage_controller.consistency_check()
# Cleanup part 1: while layers are still in PITR window, we should only drop layers that are fully redundant
for shard in shards:
ps = env.get_tenant_pageserver(shard)
# Invoke compaction: this should drop any layers that don't overlap with the shard's key stripes
detail_before = ps.http_client().timeline_detail(shard, timeline_id)
ps.http_client().timeline_compact(shard, timeline_id)
detail_after = ps.http_client().timeline_detail(shard, timeline_id)
# Physical size should shrink because some layers have been dropped
assert detail_after["current_physical_size"] < detail_before["current_physical_size"]
# Compaction shouldn't make anything unreadable
workload.validate()
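The per-shard check above can be read as one reusable assertion. A hedged sketch using only calls already shown in this test (`timeline_detail`, `timeline_compact`) and assuming `current_physical_size` remains the field to compare:

def assert_compaction_shrinks_shard(ps, shard, timeline_id):
    http = ps.http_client()
    size_before = http.timeline_detail(shard, timeline_id)["current_physical_size"]
    http.timeline_compact(shard, timeline_id)
    size_after = http.timeline_detail(shard, timeline_id)["current_physical_size"]
    # child shards should drop parent layers that do not overlap their key stripes
    assert size_after < size_before, f"{shard}: {size_after} >= {size_before}"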
def test_sharding_split_smoke(
neon_env_builder: NeonEnvBuilder,
):


@@ -290,9 +290,12 @@ def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to storage controller, and do not register it.
env.pageservers[0].allowed_errors.append(".*Emergency mode!.*")
env.pageservers[0].start(
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
env.pageservers[0].patch_config_toml_nonrecursive(
{
"control_plane_emergency_mode": True,
}
)
env.pageservers[0].start()
origin_ps = env.pageservers[0]
# These are the pageservers managed by the sharding service, where the tenant


@@ -64,7 +64,7 @@ def test_tenant_delete_smoke(
)
# Default tenant and the one we created
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 2
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 2
# create two timelines one being the parent of another
parent = None
@@ -90,9 +90,9 @@ def test_tenant_delete_smoke(
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 2
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 2
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
@@ -108,7 +108,7 @@ def test_tenant_delete_smoke(
)
# Deletion updates the tenant count: the one default tenant remains
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
class Check(enum.Enum):
@@ -532,7 +532,9 @@ def test_tenant_delete_concurrent(
# The TenantSlot is still present while the original request is hung before
# final removal
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
assert (
ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
)
# Permit the original request to run to success
ps_http.configure_failpoints((BEFORE_REMOVE_FAILPOINT, "off"))
@@ -556,7 +558,8 @@ def test_tenant_delete_concurrent(
)
# Zero tenants remain (we deleted the default tenant)
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 0
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "inprogress"}) == 0
def test_tenant_delete_races_timeline_creation(
@@ -673,7 +676,7 @@ def test_tenant_delete_races_timeline_creation(
)
# Zero tenants remain (we deleted the default tenant)
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 0
def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):


@@ -668,9 +668,9 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder):
client.configure_failpoints((failpoint, "off"))
with pytest.raises(
PageserverApiException, match="Failed to refresh gc_info before gathering inputs"
):
# accept both, because the deletion might still complete before
matcher = "(Failed to refresh gc_info before gathering inputs|NotFound: tenant)"
with pytest.raises(PageserverApiException, match=matcher):
completion.result()
# this happens on both cases


@@ -18,6 +18,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
last_flush_lsn_upload,
)
from fixtures.pageserver.types import parse_layer_file_name
from fixtures.pageserver.utils import (
assert_tenant_state,
wait_for_last_record_lsn,
@@ -246,7 +247,10 @@ def test_tenant_redownloads_truncated_file_on_startup(
# ensure the same size is found from the index_part.json
index_part = env.pageserver_remote_storage.index_content(tenant_id, timeline_id)
assert index_part["layer_metadata"][path.name]["file_size"] == expected_size
assert (
index_part["layer_metadata"][parse_layer_file_name(path.name).to_str()]["file_size"]
== expected_size
)
## Start the pageserver. It will notice that the file size doesn't match, and
## rename away the local file. It will be re-downloaded when it's needed.
@@ -276,7 +280,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
# the remote side of local_layer_truncated
remote_layer_path = env.pageserver_remote_storage.remote_layer_path(
tenant_id, timeline_id, path.name
tenant_id, timeline_id, parse_layer_file_name(path.name).to_str()
)
# if the upload ever was ongoing, this check would be racy, but at least one


@@ -0,0 +1,410 @@
import enum
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
from threading import Barrier
from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import HistoricLayerInfo
from fixtures.pageserver.utils import wait_timeline_detail_404
from fixtures.types import Lsn, TimelineId
def by_end_lsn(info: HistoricLayerInfo) -> Lsn:
assert info.lsn_end is not None
return Lsn(info.lsn_end)
def layer_name(info: HistoricLayerInfo) -> str:
return info.layer_file_name
@enum.unique
class Branchpoint(str, enum.Enum):
"""
Have branches at these Lsns possibly relative to L0 layer boundary.
"""
EARLIER = "earlier"
AT_L0 = "at"
AFTER_L0 = "after"
LAST_RECORD_LSN = "head"
def __str__(self) -> str:
return self.value
@staticmethod
def all() -> List["Branchpoint"]:
return [
Branchpoint.EARLIER,
Branchpoint.AT_L0,
Branchpoint.AFTER_L0,
Branchpoint.LAST_RECORD_LSN,
]
@pytest.mark.parametrize("branchpoint", Branchpoint.all())
@pytest.mark.parametrize("restart_after", [True, False])
def test_ancestor_detach_branched_from(
neon_env_builder: NeonEnvBuilder, branchpoint: Branchpoint, restart_after: bool
):
"""
Creates a branch relative to L0 lsn boundary according to Branchpoint. Later the timeline is detached.
"""
# TODO: parametrize; currently unimplemented over at pageserver
write_to_branch_first = True
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*initial size calculation failed: downloading failed, possibly for shutdown",
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
]
)
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
after_first_tx = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
# create a single layer for us to remote copy
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
deltas = client.layer_map_info(env.initial_tenant, env.initial_timeline).delta_layers()
# there is also the in-mem layer, but ignore it for now
assert len(deltas) == 2, "expecting there to be two deltas: initdb and checkpointed"
later_delta = max(deltas, key=by_end_lsn)
assert later_delta.lsn_end is not None
# -1 as the lsn_end is exclusive.
last_lsn = Lsn(later_delta.lsn_end).lsn_int - 1
if branchpoint == Branchpoint.EARLIER:
branch_at = after_first_tx
rows = 0
truncated_layers = 1
elif branchpoint == Branchpoint.AT_L0:
branch_at = Lsn(last_lsn)
rows = 8192
truncated_layers = 0
elif branchpoint == Branchpoint.AFTER_L0:
branch_at = Lsn(last_lsn + 8)
rows = 8192
# as there is no 8 byte walrecord, nothing should get copied from the straddling layer
truncated_layers = 0
else:
# this case also covers the implicit flush of ancestor as the inmemory hasn't been flushed yet
assert branchpoint == Branchpoint.LAST_RECORD_LSN
branch_at = None
rows = 16384
truncated_layers = 0
name = "new main"
timeline_id = env.neon_cli.create_branch(
name, "main", env.initial_tenant, ancestor_start_lsn=branch_at
)
recorded = Lsn(client.timeline_detail(env.initial_tenant, timeline_id)["ancestor_lsn"])
if branch_at is None:
# fix it up if we need it later (currently unused)
branch_at = recorded
else:
assert branch_at == recorded, "the test should not use unaligned lsns"
if write_to_branch_first:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
# make sure the ep is writable
# with BEFORE_L0, AFTER_L0 there will be a gap in Lsns caused by accurate end_lsn on straddling layers
ep.safe_psql("CREATE TABLE audit AS SELECT 1 as starts;")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
# branch must have a flush for "PREV_LSN: none"
client.timeline_checkpoint(env.initial_tenant, timeline_id)
branch_layers = set(
map(layer_name, client.layer_map_info(env.initial_tenant, timeline_id).historic_layers)
)
else:
branch_layers = set()
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == set()
if restart_after:
env.pageserver.stop()
env.pageserver.start()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == 16384
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
old_main_info = client.layer_map_info(env.initial_tenant, env.initial_timeline)
old_main = set(map(layer_name, old_main_info.historic_layers))
new_main_info = client.layer_map_info(env.initial_tenant, timeline_id)
new_main = set(map(layer_name, new_main_info.historic_layers))
new_main_copied_or_truncated = new_main - branch_layers
new_main_truncated = new_main_copied_or_truncated - old_main
assert len(new_main_truncated) == truncated_layers
# could additionally check that the symmetric difference has layers starting at the same lsn
# but if nothing was copied, then there is no nice rule.
# there could be a hole in LSNs between the layers copied from the "old main" and the first branch layer.
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0)
@pytest.mark.parametrize("restart_after", [True, False])
def test_ancestor_detach_reparents_earlier(neon_env_builder: NeonEnvBuilder, restart_after: bool):
"""
The case from RFC:
+-> another branch with same ancestor_lsn as new main
|
old main -------|---------X--------->
| | |
| | +-> after
| |
| +-> new main
|
+-> reparented
Ends up as:
old main --------------------------->
|
+-> after
+-> another branch with same ancestor_lsn as new main
|
new main -------|---------|->
|
+-> reparented
We confirm the end result by being able to delete "old main" after deleting "after".
"""
# TODO: support not yet implemented for these
write_to_branch_first = True
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*initial size calculation failed: downloading failed, possibly for shutdown",
# after restart this is likely to happen if there is other load on the runner
".*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
]
)
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE TABLE foo (i BIGINT);")
ep.safe_psql("CREATE TABLE audit AS SELECT 1 as starts;")
branchpoint_pipe = wait_for_last_flush_lsn(
env, ep, env.initial_tenant, env.initial_timeline
)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(0, 8191) g(i);")
branchpoint_x = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
client.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
ep.safe_psql("INSERT INTO foo SELECT i::bigint FROM generate_series(8192, 16383) g(i);")
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
# as this only gets reparented, we don't need to write to it like new main
reparented = env.neon_cli.create_branch(
"reparented", "main", env.initial_tenant, ancestor_start_lsn=branchpoint_pipe
)
same_branchpoint = env.neon_cli.create_branch(
"same_branchpoint", "main", env.initial_tenant, ancestor_start_lsn=branchpoint_x
)
timeline_id = env.neon_cli.create_branch(
"new main", "main", env.initial_tenant, ancestor_start_lsn=branchpoint_x
)
after = env.neon_cli.create_branch("after", "main", env.initial_tenant, ancestor_start_lsn=None)
if write_to_branch_first:
with env.endpoints.create_start("new main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == 8192
with ep.cursor() as cur:
cur.execute("UPDATE audit SET starts = starts + 1")
assert cur.rowcount == 1
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
client.timeline_checkpoint(env.initial_tenant, timeline_id)
all_reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert all_reparented == {reparented, same_branchpoint}
if restart_after:
env.pageserver.stop()
env.pageserver.start()
env.pageserver.quiesce_tenants()
# checking the ancestor afterwards is much faster than waiting for the endpoint to not start
expected_result = [
("main", env.initial_timeline, None, 16384, 1),
("after", after, env.initial_timeline, 16384, 1),
("new main", timeline_id, None, 8192, 2),
("same_branchpoint", same_branchpoint, timeline_id, 8192, 1),
("reparented", reparented, timeline_id, 0, 1),
]
for _, timeline_id, expected_ancestor, _, _ in expected_result:
details = client.timeline_detail(env.initial_tenant, timeline_id)
ancestor_timeline_id = details["ancestor_timeline_id"]
if expected_ancestor is None:
assert ancestor_timeline_id is None
else:
assert TimelineId(ancestor_timeline_id) == expected_ancestor
for name, _, _, rows, starts in expected_result:
with env.endpoints.create_start(name, tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql(f"SELECT count(*) FROM audit WHERE starts = {starts}")[0][0] == 1
# delete the timelines to confirm detach actually worked
client.timeline_delete(env.initial_tenant, after)
wait_timeline_detail_404(client, env.initial_tenant, after, 10, 1.0)
client.timeline_delete(env.initial_tenant, env.initial_timeline)
wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline, 10, 1.0)
@pytest.mark.parametrize("restart_after", [True, False])
def test_detached_receives_flushes_while_being_detached(
neon_env_builder: NeonEnvBuilder, restart_after: bool
):
"""
Makes sure that the timeline is able to receive writes through-out the detach process.
"""
write_to_branch_first = True
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
# row counts have been manually verified to cause reconnections and getpage
# requests when restart_after=False with pg16
def insert_rows(n: int, ep) -> int:
ep.safe_psql(
f"INSERT INTO foo SELECT i::bigint, 'more info!! this is a long string' || i FROM generate_series(0, {n - 1}) g(i);"
)
return n
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep:
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
ep.safe_psql("CREATE TABLE foo (i BIGINT, aux TEXT NOT NULL);")
rows = insert_rows(256, ep)
branchpoint = wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
timeline_id = env.neon_cli.create_branch(
"new main", "main", tenant_id=env.initial_tenant, ancestor_start_lsn=branchpoint
)
log.info("starting the new main endpoint")
ep = env.endpoints.create_start("new main", tenant_id=env.initial_tenant)
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
if write_to_branch_first:
rows += insert_rows(256, ep)
wait_for_last_flush_lsn(env, ep, env.initial_tenant, timeline_id)
client.timeline_checkpoint(env.initial_tenant, timeline_id)
log.info("completed {write_to_branch_first=}")
def small_txs(ep, queue: Queue[str], barrier):
extra_rows = 0
with ep.connect() as conn:
while True:
try:
queue.get_nowait()
break
except Empty:
pass
if barrier is not None:
barrier.wait()
barrier = None
cursor = conn.cursor()
cursor.execute(
"INSERT INTO foo(i, aux) VALUES (1, 'more info!! this is a long string' || 1);"
)
extra_rows += 1
return extra_rows
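The queue-and-barrier handshake in `small_txs` is independent of the pageserver. A self-contained sketch of the same coordination pattern, with the database writes replaced by a counter (startup is signalled through a `Barrier`, shutdown through a `Queue` sentinel):

from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
from threading import Barrier

def worker(queue: "Queue[str]", barrier) -> int:
    count = 0
    while True:
        try:
            queue.get_nowait()
            break  # sentinel received: stop working
        except Empty:
            pass
        if barrier is not None:
            barrier.wait()  # tell the main thread we are running
            barrier = None
        count += 1  # stands in for one small transaction
    return count

with ThreadPoolExecutor(max_workers=1) as pool:
    queue: "Queue[str]" = Queue()
    barrier = Barrier(2)
    completion = pool.submit(worker, queue, barrier)
    barrier.wait()  # worker is now running
    queue.put("done")  # ask it to stop
    assert completion.result() > 0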
with ThreadPoolExecutor(max_workers=1) as exec:
queue: Queue[str] = Queue()
barrier = Barrier(2)
completion = exec.submit(small_txs, ep, queue, barrier)
barrier.wait()
reparented = client.detach_ancestor(env.initial_tenant, timeline_id)
assert len(reparented) == 0
if restart_after:
# ep and row production is kept alive on purpose
env.pageserver.stop()
env.pageserver.start()
env.pageserver.quiesce_tenants()
queue.put("done")
extra_rows = completion.result()
assert extra_rows > 0, "some rows should have been written"
rows += extra_rows
assert client.timeline_detail(env.initial_tenant, timeline_id)["ancestor_timeline_id"] is None
assert ep.safe_psql("SELECT clear_buffer_cache();")
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
assert ep.safe_psql("SELECT SUM(LENGTH(aux)) FROM foo")[0][0] != 0
ep.stop()
# finally restart the endpoint and make sure we still have the same answer
with env.endpoints.create_start("new main", tenant_id=env.initial_tenant) as ep:
assert ep.safe_psql("SELECT count(*) FROM foo;")[0][0] == rows
env.pageserver.allowed_errors.append(
"initial size calculation failed: downloading failed, possibly for shutdown"
)
# TODO:
# - after starting the operation, tenant is deleted
# - after starting the operation, pageserver is shutdown, restarted
# - after starting the operation, bottom-most timeline is deleted, pageserver is restarted, gc is inhibited
# - deletion of reparented while reparenting should fail once, then succeed (?)
# - branch near existing L1 boundary, image layers?
# - investigate: why are layers started at uneven lsn? not just after branching, but in general.


@@ -1,5 +1,5 @@
{
"postgres-v16": "8ef3c33aa01631e17cb24a122776349fcc777b46",
"postgres-v15": "f0d6b0ef7581bd78011832e23d8420a7d2c8a83a",
"postgres-v14": "d6f7e2c604bfc7cbc4c46bcea0a8e800f4bc778a"
"v16": ["16.2", "8ef3c33aa01631e17cb24a122776349fcc777b46"],
"v15": ["15.6", "f0d6b0ef7581bd78011832e23d8420a7d2c8a83a"],
"v14": ["14.11", "d6f7e2c604bfc7cbc4c46bcea0a8e800f4bc778a"]
}
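The new shape pairs each major version with its expected minor release and commit, which is exactly what `test_postgres_version` above consumes. A short sketch of reading it (path relative to the repository root, as in the test):

import json
from pathlib import Path

revisions = json.loads(Path("vendor/revisions.json").read_text())
expected_version, expected_commit = revisions["v16"]
assert expected_version == "16.2"
assert len(expected_commit) == 40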