Compare commits

...

22 Commits

Author SHA1 Message Date
Em Sharnoff
8b11e3bc9c compute/sql_exporter: Bump max WSS window from 1h -> 3h
Concretely, this:

1. Changes the normal (public) exporter to add a new 3-hour working set
   size label onto the existing 5-minute, 15-minute, and 1-hour values.

2. Extends the range on the autoscaling exporter from 1..60 minutes to
   1..180 minutes -- keeping the same density, just 3x longer.
2024-12-24 17:27:48 -08:00
Alex Chi Z.
9c53b41245 fix(pageserver): update remote latest_gc_cutoff after gc-compaction (#10209)
## Problem

close https://github.com/neondatabase/neon/issues/10208
part of #9114 

## Summary of changes

* Ensure remote `latest_gc_cutoff` is up-to-date before removing any
files for gc-compaction.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-12-19 18:40:20 +00:00
Konstantin Knizhnik
197a89ab3d Increase default stotrage controller heartbeat interval from 100msec … (#10206)
## Problem

Currently default value of storage controller heartbeat interval is
100msec. It means that 10 times per second it establish connection to
PS. And it seems to be quite expensive.
At MacOS right now storage_controller consumes 70% CPU and trusts - 30%.
So together they completely utilize one core.
A lot of us has Macs. Let's save environment a little bit and do not
waste electricity and contribute to global warming.

By the way, on prod we have interval  10seconds 

## Summary of changes

Increase heartbeat interval from 100msec to 1 second.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-19 18:32:32 +00:00
Alex Chi Z.
b89e02f3e8 fix(pageserver): consider partial compaction layer map in layer check (#10044)
## Problem

In https://github.com/neondatabase/neon/pull/9897 we temporarily
disabled the layer valid check because the current one only considers
the end result of all compaction algorithms, but partial gc-compaction
would temporarily produce an "invalid" layer map.

part of https://github.com/neondatabase/neon/issues/9114

## Summary of changes

Allow LSN splits to overlap in the slow path check. Currently, the valid
check is only used in storage scrubber (background job) and during
gc-compaction (without taking layer lock). Therefore, it's fine for such
checks to be a little bit inefficient but more accurate.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-12-19 18:04:53 +00:00
Konstantin Knizhnik
04517c6ff3 Do not reload config file on PS reconnect (#10204)
## Problem

See https://github.com/neondatabase/neon/issues/10184
and
https://neondb.slack.com/archives/C04DGM6SMTM/p1733997259898819

Reloading config file inside parallel worker cause it's termination

## Summary of changes

Remove call of `HandleMainLoopInterrupts()` 
Update of page server URL is propagated by postmaster through shared
memory and we should not reload config for it.

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-19 15:22:39 +00:00
Vlad Lazar
628451d68e safekeeper: short-circuit interpreted wal sender (#10202)
## Problem

Safekeeper may currently send a batch to the pageserver even if it
hasn't decoded a new record.
I think this is quite unlikely in the field, but worth adressing.

## Summary of changes

Don't send anything if we haven't decoded a full record. Once this
merges and releases, the `InterpretedWalRecords` struct can be updated
to remove the Option wrapper for `next_record_lsn`.
2024-12-19 14:04:46 +00:00
Vlad Lazar
502d512fe2 safekeeper: lift benchmarking utils into safekeeper crate (#10200)
## Problem

The benchmarking utilities are also useful for testing. We want to write
tests in the safekeeper crate.

## Summary of changes

This commit lifts the utils to the safekeeper crate. They are compiled
if the benchmarking features is enabled or if in test mode.
2024-12-19 14:04:42 +00:00
John Spray
afda6d4700 storage_scrubber: don't report half-created timelines as corruption (#10198)
## Problem

test_timeline_archival_chaos does timeline creation with failure
injection, and thereby sometimes leaves timelines in a part created
state. This was being reported as corruption by the scrubber on test
teardown, because it considered a layer without an index to be an
invalid state. This was incorrect: the scrubber should accept this
state, it occurs legitimately during timeline creation.

Closes: https://github.com/neondatabase/neon/issues/9988

## Summary of changes

- Report a timeline with layers but no index as Relic rather than
MissingIndexPart.
- We retain the MissingIndexPart variant for the case where an index
_was_ found in the listing, but was not found by a subsequent GET, i.e.
racing with deletion.
2024-12-19 12:55:05 +00:00
John Spray
65042cbadd tests: use high IO concurrency in test_pgdata_import_smoke, use effective_io_concurrency=2 in tests by default (#10114)
## Problem

`test_pgdata_import_smoke` writes two gigabytes of pages and then reads
them back serially. This is CPU bottlenecked and results in a long
runtime, and sensitivity to CPU load from other tests on the same
machine.

Closes: https://github.com/neondatabase/neon/issues/10071

## Summary of changes

- Use effective_io_concurrency=32 when doing sequential scans through
2GiB of pages in test_pgdata_import_smoke. This is a ~10x runtime
decrease in the parts of the test that do sequential scans.
- Also set `effective_io_concurrency=2` for tests, as I noticed while
debugging that we were doing all getpage requests serially, which is bad
for checking the stability of the batching code.
2024-12-19 10:58:49 +00:00
Folke Behrens
b135194090 proxy: Delay SASL complete message until auth is done (#10189)
The final SASL complete message can be bundled with the remainder of the
auth flow messages until ReadyForQuery.

neondatabase/cloud#19184
2024-12-19 10:37:08 +00:00
Peter Bendel
43dc03459d Run pgbench on 10 GB scale factor on database with n relations (e.g. 10k) (#10172)
## Problem

We want to verify how much / if pgbench throughput and latency on Neon
suffers if the database contains many other relations, too.

## Summary of changes

Modify the benchmarking.yml pgbench-compare job to
- create an addiitional project at scale factor 10 GiB
- before running pgbench add n tables (initially 10k) to the database
- then compare the pgbench throughput and latency to the existing
pgbench-compare at 10 Gib scale factor

We use a realistic template for the n relations that is a partitioned
table with some realistic data types, indexes and constraints - similar
to a table that we use internally.

Example run:
https://github.com/neondatabase/neon/actions/runs/12377565956/job/34547386959
2024-12-19 10:25:44 +00:00
Christian Schwarz
a1b0558493 fast import: importer: use aws s3 cli (#10162)
## Problem

s5cmd doesn't pick up the pod service account

```
2024/12/16 16:26:01 Ignoring, HTTP credential provider invalid endpoint host, "169.254.170.23", only loopback hosts are allowed. <nil>
ERROR "ls s3://neon-dev-bulk-import-us-east-2/import-pgdata/fast-import/v1/br-wandering-hall-w2xobawv": NoCredentialProviders: no valid providers in chain. Deprecated. For verbose messaging see aws.Config.CredentialsChainVerboseErrors
```

## Summary of changes

Switch to offical CLI.


## Testing

Tested the pre-merge image in staging, using `job_image` override in
project settings.


https://neondb.slack.com/archives/C033RQ5SPDH/p1734554944391949?thread_ts=1734368383.258759&cid=C033RQ5SPDH

## Future Work

Switch back to s5cmd once https://github.com/peak/s5cmd/pull/769 gets
merged.

## Refs

- fixes https://github.com/neondatabase/cloud/issues/21876

---------

Co-authored-by: Gleb Novikov <NanoBjorn@users.noreply.github.com>
2024-12-19 10:04:17 +00:00
Alex Chi Z.
cc138b56f9 fix(pageserver): run psql in thread to avoid blocking (#10177)
## Problem

ref https://github.com/neondatabase/neon/issues/10170
ref https://github.com/neondatabase/neon/issues/9994

The psql command will block the main thread, causing other async tasks
to timeout (i.e., HTTP connect). Therefore, we need to move it to an I/O
executor thread.

## Summary of changes

* run psql connection in a thread

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: John Spray <john@neon.tech>
2024-12-19 09:45:06 +00:00
Konstantin Knizhnik
61fcf64c22 Fix flukyness of test_physical_and_logical_replicaiton.py (#10176)
## Problem

See https://github.com/neondatabase/neon/issues/10037
test_physical_and_logical_replication.py sometimes failed.

## Summary of changes

Add `wait_replica_caughtup` to wait for replica sync

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-18 19:15:38 +00:00
Alex Chi Z.
6d3e8096fc refactor(test): tighten up test_gc_feedback (#10126)
## Problem

In https://github.com/neondatabase/neon/pull/8103 we changed the test
case to have more test coverage of gc_compaction. Now that we have
`test_gc_compaction_smoke`, we can revert this test case to serve its
original purpose and revert the parameter changes.

part of https://github.com/neondatabase/neon/issues/9114

## Summary of changes

* Revert pitr_interval from 60s to 10s.
* Assert the physical/logical size ratio in the benchmark.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-12-18 18:10:05 +00:00
Alex Chi Z.
3d1c3a80ae feat(pageserver): add compact queue http endpoint (#10173)
## Problem

We cannot get the size of the compaction queue and access the info.

Part of #9114 

## Summary of changes

* Add an API endpoint to get the compaction queue.
* gc_compaction test case now waits until the compaction finishes.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-12-18 18:09:02 +00:00
John Spray
835287ba3a neon_local: add a flock to protect against concurrent execution (#10185)
## Problem

`neon_local` has always been unsafe to run concurrently with itself: it
uses simple text files for persistent state, and concurrent runs will
step on each other.

In some test environments we intentionally handle this with mutexes in
python land, but it's fragile to try and always remember to do that.

## Summary of changes

- Add a `flock` based mutex around the `main` function of neon_local,
using the repo directory as the file to lock
- Clean up an Option<> around control_plane_api, this is a drive-by
change because it was one of the fields that had a weird effect when
previous concurrent stuff stamped on it.
2024-12-18 16:29:47 +00:00
Conrad Ludgate
d63602cc78 chore(proxy): fully remove allow-self-signed-compute flag (#10168)
When https://github.com/neondatabase/cloud/pull/21856 is merged, this
flag is no longer necessary.
2024-12-18 16:03:14 +00:00
Erik Grinaker
1668d39b7c safekeeper: fix typo in allowlist for /profile/heap (#10186) 2024-12-18 15:51:53 +00:00
Alex Chi Z.
1d12efc428 fix(pageserver): allow repartition errors during gc-compaction smoke tests (#10164)
## Problem

part of https://github.com/neondatabase/neon/issues/9114

In https://github.com/neondatabase/neon/pull/10127 we fixed the race,
but we didn't add the errors to the allowlist.

## Summary of changes

* Allow repartition errors in the gc-compaction smoke test.

I think it might be worth to refactor the code to allow multiple threads
getting a copy of repartition status (i.e., using Rcu) in the future.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-12-18 15:37:26 +00:00
Arpad Müller
85696297c5 Add safekeepers command to storcon_cli for listing (#10151)
Add a `safekeepers` subcommand to `storcon_cli` that allows listing the
safekeepers.

```
$ curl -X POST --url http://localhost:1234/control/v1/safekeeper/42 --data \
  '{"active":true, "id":42, "created_at":"2023-10-25T09:11:25Z", "updated_at":"2024-08-28T11:32:43Z","region_id":"neon_local","host":"localhost","port":5454,"http_port":0,"version":123,"availability_zone_id":"us-east-2b"}'
$ cargo run --bin storcon_cli  -- --api http://localhost:1234 safekeepers
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.38s
     Running `target/debug/storcon_cli --api 'http://localhost:1234' safekeepers`
+----+---------+-----------+------+-----------+------------+
| Id | Version | Host      | Port | Http Port | AZ Id      |
+==========================================================+
| 42 | 123     | localhost | 5454 | 0         | us-east-2b |
+----+---------+-----------+------+-----------+------------+
```

Also:

* Don't return the raw `SafekeeperPersistence` struct that contains the
raw database presentation, but instead a new
`SafekeeperDescribeResponse` struct.
* The `SafekeeperPersistence` struct leaves out the `active` field on
purpose because we want to deprecate it and replace it with a
`scheduling_policy` one.

Part of https://github.com/neondatabase/neon/issues/9981
2024-12-18 12:47:56 +00:00
Konstantin Knizhnik
aaf980f70d Online checkpoint replication state (#9976)
## Problem

See https://neondb.slack.com/archives/C04DGM6SMTM/p1733180965970089

Replication state is checkpointed only by shutdown checkpoint.
It means that replication snapshots are not removed till compute
shutdown.

## Summary of changes

Checkpoint replication state during online checkpoint

Related Postgres PR:
https://github.com/neondatabase/postgres/pull/546

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2024-12-18 09:34:38 +00:00
54 changed files with 810 additions and 344 deletions

View File

@@ -308,6 +308,7 @@ jobs:
"image": [ "'"$image_default"'" ],
"include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new-many-tables","db_size": "10gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "50gb","runner": '"$runner_default"', "image": "'"$image_default"'" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" ,"runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb","runner": '"$runner_azure"', "image": "neondatabase/build-tools:pinned-bookworm" },
@@ -410,7 +411,7 @@ jobs:
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-new-many-tables", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
@@ -429,7 +430,7 @@ jobs:
neonvm-captest-sharding-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
;;
neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
neonvm-captest-new | neonvm-captest-new-many-tables | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -446,6 +447,26 @@ jobs:
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
# we want to compare Neon project OLTP throughput and latency at scale factor 10 GB
# without (neonvm-captest-new)
# and with (neonvm-captest-new-many-tables) many relations in the database
- name: Create many relations before the run
if: contains(fromJson('["neonvm-captest-new-many-tables"]'), matrix.platform)
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_perf_many_relations
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_NUM_RELATIONS: 10000
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:

View File

@@ -1556,28 +1556,30 @@ RUN apt update && \
locales \
procps \
ca-certificates \
curl \
unzip \
$VERSION_INSTALLS && \
apt clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
# s5cmd 2.2.2 from https://github.com/peak/s5cmd/releases/tag/v2.2.2
# used by fast_import
# aws cli is used by fast_import (curl and unzip above are at this time only used for this installation step)
ARG TARGETARCH
ADD https://github.com/peak/s5cmd/releases/download/v2.2.2/s5cmd_2.2.2_linux_$TARGETARCH.deb /tmp/s5cmd.deb
RUN set -ex; \
\
# Determine the expected checksum based on TARGETARCH
if [ "${TARGETARCH}" = "amd64" ]; then \
CHECKSUM="392c385320cd5ffa435759a95af77c215553d967e4b1c0fffe52e4f14c29cf85"; \
TARGETARCH_ALT="x86_64"; \
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
elif [ "${TARGETARCH}" = "arm64" ]; then \
CHECKSUM="939bee3cf4b5604ddb00e67f8c157b91d7c7a5b553d1fbb6890fad32894b7b46"; \
TARGETARCH_ALT="aarch64"; \
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
else \
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
fi; \
\
# Compute and validate the checksum
echo "${CHECKSUM} /tmp/s5cmd.deb" | sha256sum -c -
RUN dpkg -i /tmp/s5cmd.deb && rm /tmp/s5cmd.deb
curl -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
/tmp/awscliv2/aws/install; \
rm -rf /tmp/awscliv2.zip /tmp/awscliv2; \
true
ENV LANG=en_US.utf8
USER postgres

View File

@@ -1,8 +1,8 @@
-- NOTE: This is the "internal" / "machine-readable" version. This outputs the
-- working set size looking back 1..60 minutes, labeled with the number of
-- working set size looking back 1..180 minutes, labeled with the number of
-- minutes.
SELECT
x::text as duration_seconds,
neon.approximate_working_set_size_seconds(x) AS size
FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 60)) AS t (x);
FROM (SELECT generate_series * 60 AS x FROM generate_series(1, 180)) AS t (x);

View File

@@ -4,5 +4,5 @@
SELECT
x AS duration,
neon.approximate_working_set_size_seconds(extract('epoch' FROM x::interval)::int) AS size FROM (
VALUES ('5m'), ('15m'), ('1h')
VALUES ('5m'), ('15m'), ('1h'), ('3h')
) AS t (x);

View File

@@ -34,12 +34,12 @@ use nix::unistd::Pid;
use tracing::{info, info_span, warn, Instrument};
use utils::fs_ext::is_directory_empty;
#[path = "fast_import/aws_s3_sync.rs"]
mod aws_s3_sync;
#[path = "fast_import/child_stdio_to_log.rs"]
mod child_stdio_to_log;
#[path = "fast_import/s3_uri.rs"]
mod s3_uri;
#[path = "fast_import/s5cmd.rs"]
mod s5cmd;
#[derive(clap::Parser)]
struct Args {
@@ -326,7 +326,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
}
info!("upload pgdata");
s5cmd::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/"))
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
.await
.context("sync dump directory to destination")?;
@@ -334,10 +334,10 @@ pub(crate) async fn main() -> anyhow::Result<()> {
{
let status_dir = working_directory.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
let status_file = status_dir.join("status");
let status_file = status_dir.join("pgdata");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
s5cmd::sync(&status_file, &s3_prefix.append("/status/pgdata"))
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
.await
.context("sync status directory to destination")?;
}

View File

@@ -4,24 +4,21 @@ use camino::Utf8Path;
use super::s3_uri::S3Uri;
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
let mut builder = tokio::process::Command::new("s5cmd");
// s5cmd uses aws-sdk-go v1, hence doesn't support AWS_ENDPOINT_URL
if let Some(val) = std::env::var_os("AWS_ENDPOINT_URL") {
builder.arg("--endpoint-url").arg(val);
}
let mut builder = tokio::process::Command::new("aws");
builder
.arg("s3")
.arg("sync")
.arg(local.as_str())
.arg(remote.to_string());
let st = builder
.spawn()
.context("spawn s5cmd")?
.context("spawn aws s3 sync")?
.wait()
.await
.context("wait for s5cmd")?;
.context("wait for aws s3 sync")?;
if st.success() {
Ok(())
} else {
Err(anyhow::anyhow!("s5cmd failed"))
Err(anyhow::anyhow!("aws s3 sync failed"))
}
}

View File

@@ -19,6 +19,7 @@ use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{flock, FlockArg};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -36,6 +37,8 @@ use safekeeper_api::{
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
@@ -689,6 +692,21 @@ struct TimelineTreeEl {
pub children: BTreeSet<TimelineId>,
}
/// A flock-based guard over the neon_local repository directory
struct RepoLock {
_file: File,
}
impl RepoLock {
fn new() -> Result<Self> {
let repo_dir = File::open(local_env::base_path())?;
let repo_dir_fd = repo_dir.as_raw_fd();
flock(repo_dir_fd, FlockArg::LockExclusive)?;
Ok(Self { _file: repo_dir })
}
}
// Main entry point for the 'neon_local' CLI utility
//
// This utility helps to manage neon installation. That includes following:
@@ -700,9 +718,14 @@ fn main() -> Result<()> {
let cli = Cli::parse();
// Check for 'neon init' command first.
let subcommand_result = if let NeonLocalCmd::Init(args) = cli.command {
handle_init(&args).map(|env| Some(Cow::Owned(env)))
let (subcommand_result, _lock) = if let NeonLocalCmd::Init(args) = cli.command {
(handle_init(&args).map(|env| Some(Cow::Owned(env))), None)
} else {
// This tool uses a collection of simple files to store its state, and consequently
// it is not generally safe to run multiple commands concurrently. Rather than expect
// all callers to know this, use a lock file to protect against concurrent execution.
let _repo_lock = RepoLock::new().unwrap();
// all other commands need an existing config
let env = LocalEnv::load_config(&local_env::base_path()).context("Error loading config")?;
let original_env = env.clone();
@@ -728,11 +751,12 @@ fn main() -> Result<()> {
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
if &original_env != env {
let subcommand_result = if &original_env != env {
subcommand_result.map(|()| Some(Cow::Borrowed(env)))
} else {
subcommand_result.map(|()| None)
}
};
(subcommand_result, Some(_repo_lock))
};
match subcommand_result {
@@ -922,7 +946,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
} else {
// User (likely interactive) did not provide a description of the environment, give them the default
NeonLocalInitConf {
control_plane_api: Some(Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap())),
control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()),
broker: NeonBroker {
listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(),
},
@@ -1718,18 +1742,15 @@ async fn handle_start_all_impl(
broker::start_broker_process(env, &retry_timeout).await
});
// Only start the storage controller if the pageserver is configured to need it
if env.control_plane_api.is_some() {
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout,
))
.await
.map_err(|e| e.context("start storage_controller"))
});
}
js.spawn(async move {
let storage_controller = StorageController::from_env(env);
storage_controller
.start(NeonStorageControllerStartArgs::with_default_instance_id(
retry_timeout,
))
.await
.map_err(|e| e.context("start storage_controller"))
});
for ps_conf in &env.pageservers {
js.spawn(async move {
@@ -1774,10 +1795,6 @@ async fn neon_start_status_check(
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
const NOTICE_AFTER_RETRIES: Duration = Duration::from_secs(5);
if env.control_plane_api.is_none() {
return Ok(());
}
let storcon = StorageController::from_env(env);
let retries = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();

View File

@@ -316,6 +316,10 @@ impl Endpoint {
// and can cause errors like 'no unpinned buffers available', see
// <https://github.com/neondatabase/neon/issues/9956>
conf.append("shared_buffers", "1MB");
// Postgres defaults to effective_io_concurrency=1, which does not exercise the pageserver's
// batching logic. Set this to 2 so that we exercise the code a bit without letting
// individual tests do a lot of concurrent work on underpowered test machines
conf.append("effective_io_concurrency", "2");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_level", "logical");

View File

@@ -76,7 +76,7 @@ pub struct LocalEnv {
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
pub control_plane_api: Option<Url>,
pub control_plane_api: Url,
// Control plane upcall API for storage controller. If set, this will be propagated into the
// storage controller's configuration.
@@ -133,7 +133,7 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub control_plane_api: Option<Option<Url>>,
pub control_plane_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Option<Url>>,
}
@@ -180,7 +180,7 @@ impl NeonStorageControllerConf {
const DEFAULT_MAX_WARMING_UP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
// Very tight heartbeat interval to speed up tests
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
const DEFAULT_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);
}
impl Default for NeonStorageControllerConf {
@@ -535,7 +535,7 @@ impl LocalEnv {
storage_controller,
pageservers,
safekeepers,
control_plane_api,
control_plane_api: control_plane_api.unwrap(),
control_plane_compute_hook_api,
branch_name_mappings,
}
@@ -638,7 +638,7 @@ impl LocalEnv {
storage_controller: self.storage_controller.clone(),
pageservers: vec![], // it's skip_serializing anyway
safekeepers: self.safekeepers.clone(),
control_plane_api: self.control_plane_api.clone(),
control_plane_api: Some(self.control_plane_api.clone()),
control_plane_compute_hook_api: self.control_plane_compute_hook_api.clone(),
branch_name_mappings: self.branch_name_mappings.clone(),
},
@@ -768,7 +768,7 @@ impl LocalEnv {
storage_controller: storage_controller.unwrap_or_default(),
pageservers: pageservers.iter().map(Into::into).collect(),
safekeepers,
control_plane_api: control_plane_api.unwrap_or_default(),
control_plane_api: control_plane_api.unwrap(),
control_plane_compute_hook_api: control_plane_compute_hook_api.unwrap_or_default(),
branch_name_mappings: Default::default(),
};

View File

@@ -95,21 +95,19 @@ impl PageServerNode {
let mut overrides = vec![pg_distrib_dir_param, broker_endpoint_param];
if let Some(control_plane_api) = &self.env.control_plane_api {
overrides.push(format!(
"control_plane_api='{}'",
control_plane_api.as_str()
));
overrides.push(format!(
"control_plane_api='{}'",
self.env.control_plane_api.as_str()
));
// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
}
// Storage controller uses the same auth as pageserver: if JWT is enabled
// for us, we will also need it to talk to them.
if matches!(conf.http_auth_type, AuthType::NeonJWT) {
let jwt_token = self
.env
.generate_auth_token(&Claims::new(None, Scope::GenerationsApi))
.unwrap();
overrides.push(format!("control_plane_api_token='{}'", jwt_token));
}
if !conf.other.contains_key("remote_storage") {

View File

@@ -338,7 +338,7 @@ impl StorageController {
.port(),
)
} else {
let listen_url = self.env.control_plane_api.clone().unwrap();
let listen_url = self.env.control_plane_api.clone();
let listen = format!(
"{}:{}",
@@ -708,7 +708,7 @@ impl StorageController {
} else {
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
// for general purpose API access.
let listen_url = self.env.control_plane_api.clone().unwrap();
let listen_url = self.env.control_plane_api.clone();
Url::from_str(&format!(
"http://{}:{}/{path}",
listen_url.host_str().unwrap(),

View File

@@ -5,7 +5,8 @@ use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
ShardSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
SafekeeperDescribeResponse, ShardSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
@@ -211,6 +212,8 @@ enum Command {
#[arg(long)]
timeout: humantime::Duration,
},
/// List safekeepers known to the storage controller
Safekeepers {},
}
#[derive(Parser)]
@@ -1020,6 +1023,31 @@ async fn main() -> anyhow::Result<()> {
"Fill was cancelled for node {node_id}. Schedulling policy is now {final_policy:?}"
);
}
Command::Safekeepers {} => {
let mut resp = storcon_client
.dispatch::<(), Vec<SafekeeperDescribeResponse>>(
Method::GET,
"control/v1/safekeeper".to_string(),
None,
)
.await?;
resp.sort_by(|a, b| a.id.cmp(&b.id));
let mut table = comfy_table::Table::new();
table.set_header(["Id", "Version", "Host", "Port", "Http Port", "AZ Id"]);
for sk in resp {
table.add_row([
format!("{}", sk.id),
format!("{}", sk.version),
sk.host,
format!("{}", sk.port),
format!("{}", sk.http_port),
sk.availability_zone_id.to_string(),
]);
}
println!("{table}");
}
}
Ok(())

View File

@@ -372,6 +372,23 @@ pub struct MetadataHealthListOutdatedResponse {
pub health_records: Vec<MetadataHealthRecord>,
}
/// Publicly exposed safekeeper description
///
/// The `active` flag which we have in the DB is not included on purpose: it is deprecated.
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperDescribeResponse {
pub id: NodeId,
pub region_id: String,
/// 1 is special, it means just created (not currently posted to storcon).
/// Zero or negative is not really expected.
/// Otherwise the number from `release-$(number_of_commits_on_branch)` tag.
pub version: i64,
pub host: String,
pub port: i32,
pub http_port: i32,
pub availability_zone_id: String,
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -6,6 +6,7 @@ pub mod utilization;
use camino::Utf8PathBuf;
pub use utilization::PageserverUtilization;
use core::ops::Range;
use std::{
collections::HashMap,
fmt::Display,
@@ -28,6 +29,7 @@ use utils::{
};
use crate::{
key::Key,
reltag::RelTag,
shard::{ShardCount, ShardStripeSize, TenantShardId},
};
@@ -210,6 +212,68 @@ pub enum TimelineState {
Broken { reason: String, backtrace: String },
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}
impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}
impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}
impl CompactLsnRange {
pub fn above(lsn: Lsn) -> Self {
Self {
start: lsn,
end: Lsn::MAX,
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct CompactInfoResponse {
pub compact_key_range: Option<CompactKeyRange>,
pub compact_lsn_range: Option<CompactLsnRange>,
pub sub_compaction: bool,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,

View File

@@ -106,11 +106,11 @@ impl<R: RecordGenerator> WalGenerator<R> {
const TIMELINE_ID: u32 = 1;
/// Creates a new WAL generator with the given record generator.
pub fn new(record_generator: R) -> WalGenerator<R> {
pub fn new(record_generator: R, start_lsn: Lsn) -> WalGenerator<R> {
Self {
record_generator,
lsn: Lsn(0),
prev_lsn: Lsn(0),
lsn: start_lsn,
prev_lsn: start_lsn,
}
}

View File

@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
use crate::DEFAULT_PG_VERSION;
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
TimelineInfo,
CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
TimelineGcRequest, TimelineInfo,
};
use utils::{
auth::SwappableJwtAuth,
@@ -2039,6 +2039,34 @@ async fn timeline_cancel_compact_handler(
.await
}
// Get compact info of a timeline
async fn timeline_compact_info_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let res = tenant.get_scheduled_compaction_tasks(timeline_id);
let mut resp = Vec::new();
for item in res {
resp.push(CompactInfoResponse {
compact_key_range: item.compact_key_range,
compact_lsn_range: item.compact_lsn_range,
sub_compaction: item.sub_compaction,
});
}
json_response(StatusCode::OK, resp)
}
.instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}
// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
mut request: Request<Body>,
@@ -3400,6 +3428,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc",
|r| api_handler(r, timeline_gc_handler),
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_info_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),

View File

@@ -3122,6 +3122,23 @@ impl Tenant {
}
}
pub(crate) fn get_scheduled_compaction_tasks(
&self,
timeline_id: TimelineId,
) -> Vec<CompactOptions> {
use itertools::Itertools;
let guard = self.scheduled_compaction_tasks.lock().unwrap();
guard
.get(&timeline_id)
.map(|tline_pending_tasks| {
tline_pending_tasks
.iter()
.map(|x| x.options.clone())
.collect_vec()
})
.unwrap_or_default()
}
/// Schedule a compaction task for a timeline.
pub(crate) async fn schedule_compaction(
&self,
@@ -5759,13 +5776,13 @@ mod tests {
use timeline::{CompactOptions, DeltaLayerTestDesc};
use utils::id::TenantId;
#[cfg(feature = "testing")]
use models::CompactLsnRange;
#[cfg(feature = "testing")]
use pageserver_api::record::NeonWalRecord;
#[cfg(feature = "testing")]
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
#[cfg(feature = "testing")]
use timeline::CompactLsnRange;
#[cfg(feature = "testing")]
use timeline::GcInfo;
static TEST_KEY: Lazy<Key> =
@@ -9634,7 +9651,7 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
use timeline::CompactLsnRange;
use models::CompactLsnRange;
let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
let (tenant, ctx) = harness.load().await;

View File

@@ -1,12 +1,15 @@
use std::collections::BTreeSet;
use itertools::Itertools;
use pageserver_compaction::helpers::overlaps_with;
use super::storage_layer::LayerName;
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
///
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
/// The function implements a fast path check and a slow path check.
///
/// The fast path checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// | | | |
@@ -25,31 +28,47 @@ use super::storage_layer::LayerName;
/// | | | 4 | | |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
///
/// However, if a partial compaction is still going on, it is possible that we get a layer map not satisfying the above condition.
/// Therefore, we fallback to simply check if any of the two delta layers overlap. (See "A slow path...")
pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
let mut all_delta_layers = Vec::new();
for name in metadata {
if let LayerName::Delta(layer) = name {
if layer.key_range.start.next() != layer.key_range.end {
all_delta_layers.push(layer.clone());
}
all_delta_layers.push(layer.clone());
}
}
for layer in &all_delta_layers {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
if layer.key_range.start.next() != layer.key_range.end {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
}
for layer in &all_delta_layers {
for (idx, layer) in all_delta_layers.iter().enumerate() {
if layer.key_range.start.next() == layer.key_range.end {
continue;
}
let lsn_range = layer.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
layer,
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
return Some(err);
// A slow path to check if the layer intersects with any other delta layer.
for (other_idx, other_layer) in all_delta_layers.iter().enumerate() {
if other_idx == idx {
// do not check self intersects with self
continue;
}
if overlaps_with(&layer.lsn_range, &other_layer.lsn_range)
&& overlaps_with(&layer.key_range, &other_layer.key_range)
{
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with layer {}",
layer, other_layer
);
return Some(err);
}
}
}
}
None

View File

@@ -31,9 +31,9 @@ use pageserver_api::{
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
LsnLease, TimelineState,
CompactKeyRange, CompactLsnRange, CompactionAlgorithm, CompactionAlgorithmSettings,
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -788,63 +788,6 @@ pub(crate) struct CompactRequest {
pub sub_compaction_max_job_size_mb: Option<u64>,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactLsnRange {
pub start: Lsn,
pub end: Lsn,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactKeyRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}
impl From<Range<Lsn>> for CompactLsnRange {
fn from(range: Range<Lsn>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<Range<Key>> for CompactKeyRange {
fn from(range: Range<Key>) -> Self {
Self {
start: range.start,
end: range.end,
}
}
}
impl From<CompactLsnRange> for Range<Lsn> {
fn from(range: CompactLsnRange) -> Self {
range.start..range.end
}
}
impl From<CompactKeyRange> for Range<Key> {
fn from(range: CompactKeyRange) -> Self {
range.start..range.end
}
}
impl CompactLsnRange {
#[cfg(test)]
#[cfg(feature = "testing")]
pub fn above(lsn: Lsn) -> Self {
Self {
start: lsn,
end: Lsn::MAX,
}
}
}
#[derive(Debug, Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,

View File

@@ -29,6 +29,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -1823,7 +1824,7 @@ impl Timeline {
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let ((dense_ks, sparse_ks), _) = {
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock");
bail!("failed to acquire partition lock during gc-compaction");
};
partition.clone()
};
@@ -2156,15 +2157,14 @@ impl Timeline {
// Step 1: construct a k-merge iterator over all layers.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
// let layer_names = job_desc
// .selected_layers
// .iter()
// .map(|layer| layer.layer_desc().layer_name())
// .collect_vec();
// if let Some(err) = check_valid_layermap(&layer_names) {
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
// }
let layer_names = job_desc
.selected_layers
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("gc-compaction layer map check failed because {}, cannot proceed with compaction due to potential data loss", err);
}
// The maximum LSN we are processing in this compaction loop
let end_lsn = job_desc
.selected_layers
@@ -2546,13 +2546,48 @@ impl Timeline {
);
// Step 3: Place back to the layer map.
// First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
let all_layers = {
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
let mut final_layers = all_layers
.iter()
.map(|layer| layer.layer_name())
.collect::<HashSet<_>>();
for layer in &layer_selection {
final_layers.remove(&layer.layer_desc().layer_name());
}
for layer in &compact_to {
final_layers.insert(layer.layer_desc().layer_name());
}
let final_layers = final_layers.into_iter().collect_vec();
// TODO: move this check before we call `finish` on image layer writers. However, this will require us to get the layer name before we finish
// the writer, so potentially, we will need a function like `ImageLayerBatchWriter::get_all_pending_layer_keys` to get all the keys that are
// in the writer before finalizing the persistent layers. Now we would leave some dangling layers on the disk if the check fails.
if let Some(err) = check_valid_layermap(&final_layers) {
bail!("gc-compaction layer map check failed after compaction because {}, compaction result not applied to the layer map due to potential data loss", err);
}
// Between the sanity check and this compaction update, there could be new layers being flushed, but it should be fine because we only
// operate on L1 layers.
{
// TODO: sanity check if the layer map is valid (i.e., should not have overlaps)
let mut guard = self.layers.write().await;
guard
.open_mut()?
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};
// Schedule an index-only upload to update the `latest_gc_cutoff` in the index_part.json.
// Otherwise, after restart, the index_part only contains the old `latest_gc_cutoff` and
// find_gc_cutoffs will try accessing things below the cutoff. TODO: ideally, this should
// be batched into `schedule_compaction_update`.
let disk_consistent_lsn = self.disk_consistent_lsn.load();
self.schedule_uploads(disk_consistent_lsn, None)?;
self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;

View File

@@ -827,7 +827,6 @@ pageserver_send(shardno_t shard_no, NeonRequest *request)
{
while (!pageserver_connect(shard_no, shard->n_reconnect_attempts < max_reconnect_attempts ? LOG : ERROR))
{
HandleMainLoopInterrupts();
shard->n_reconnect_attempts += 1;
}
shard->n_reconnect_attempts = 0;

View File

@@ -678,6 +678,9 @@ mod tests {
.await
.unwrap();
// flush the final server message
stream.flush().await.unwrap();
handle.await.unwrap();
}

View File

@@ -271,7 +271,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
Ok(Box::leak(Box::new(ProxyConfig {
tls_config: None,
metric_collection: None,
allow_self_signed_compute: false,
http_config,
authentication_config: AuthenticationConfig {
jwks_cache: JwkCache::default(),

View File

@@ -129,9 +129,6 @@ struct ProxyCliArgs {
/// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
connect_compute_lock: String,
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
#[clap(flatten)]
sql_over_http: SqlOverHttpArgs,
/// timeout for scram authentication protocol
@@ -564,9 +561,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
if args.allow_self_signed_compute {
warn!("allowing self-signed compute certificates");
}
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
interval: args.metric_backup_collection_interval,
remote_storage_config: args.metric_backup_collection_remote_storage.clone(),
@@ -641,7 +635,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let config = ProxyConfig {
tls_config,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
proxy_protocol_v2: args.proxy_protocol_v2,

View File

@@ -4,7 +4,8 @@ use std::sync::Arc;
use dashmap::DashMap;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use once_cell::sync::OnceCell;
use postgres_client::{tls::MakeTlsConnect, CancelToken};
use postgres_client::tls::MakeTlsConnect;
use postgres_client::CancelToken;
use pq_proto::CancelKeyData;
use rustls::crypto::ring;
use thiserror::Error;
@@ -14,17 +15,16 @@ use tracing::{debug, info};
use uuid::Uuid;
use crate::auth::{check_peer_addr_is_in_list, IpPattern};
use crate::compute::load_certs;
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::{CancellationRequest, CancellationSource, Metrics};
use crate::postgres_rustls::MakeRustlsConnect;
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::cancellation_publisher::{
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
};
use crate::compute::{load_certs, AcceptEverythingVerifier};
use crate::postgres_rustls::MakeRustlsConnect;
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
pub(crate) type CancellationHandlerMainInternal = Option<Arc<Mutex<RedisPublisherClient>>>;
@@ -240,7 +240,6 @@ pub struct CancelClosure {
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
hostname: String, // for pg_sni router
allow_self_signed_compute: bool,
}
impl CancelClosure {
@@ -249,45 +248,34 @@ impl CancelClosure {
cancel_token: CancelToken,
ip_allowlist: Vec<IpPattern>,
hostname: String,
allow_self_signed_compute: bool,
) -> Self {
Self {
socket_addr,
cancel_token,
ip_allowlist,
hostname,
allow_self_signed_compute,
}
}
/// Cancels the query running on user's compute node.
pub(crate) async fn try_cancel_query(self) -> Result<(), CancelError> {
let socket = TcpStream::connect(self.socket_addr).await?;
let client_config = if self.allow_self_signed_compute {
// Allow all certificates for creating the connection. Used only for tests
let verifier = Arc::new(AcceptEverythingVerifier);
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.dangerous()
.with_custom_certificate_verifier(verifier)
} else {
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(|_e| {
CancelError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
"TLS root store initialization failed".to_string(),
))
})?
.clone();
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(|_e| {
CancelError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
"TLS root store initialization failed".to_string(),
))
})?
.clone();
let client_config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
};
let client_config = client_config.with_no_client_auth();
.with_no_client_auth();
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(

View File

@@ -10,7 +10,6 @@ use postgres_client::tls::MakeTlsConnect;
use postgres_client::{CancelToken, RawConnection};
use postgres_protocol::message::backend::NoticeResponseBody;
use pq_proto::StartupMessageParams;
use rustls::client::danger::ServerCertVerifier;
use rustls::crypto::ring;
use rustls::pki_types::InvalidDnsNameError;
use thiserror::Error;
@@ -251,7 +250,6 @@ impl ConnCfg {
pub(crate) async fn connect(
&self,
ctx: &RequestContext,
allow_self_signed_compute: bool,
aux: MetricsAuxInfo,
timeout: Duration,
) -> Result<PostgresConnection, ConnectionError> {
@@ -259,25 +257,17 @@ impl ConnCfg {
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
drop(pause);
let client_config = if allow_self_signed_compute {
// Allow all certificates for creating the connection
let verifier = Arc::new(AcceptEverythingVerifier);
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.dangerous()
.with_custom_certificate_verifier(verifier)
} else {
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(ConnectionError::TlsCertificateError)?
.clone();
let root_store = TLS_ROOTS
.get_or_try_init(load_certs)
.map_err(ConnectionError::TlsCertificateError)?
.clone();
let client_config =
rustls::ClientConfig::builder_with_provider(Arc::new(ring::default_provider()))
.with_safe_default_protocol_versions()
.expect("ring should support the default protocol versions")
.with_root_certificates(root_store)
};
let client_config = client_config.with_no_client_auth();
.with_no_client_auth();
let mut mk_tls = crate::postgres_rustls::MakeRustlsConnect::new(client_config);
let tls = <MakeRustlsConnect as MakeTlsConnect<tokio::net::TcpStream>>::make_tls_connect(
@@ -320,7 +310,6 @@ impl ConnCfg {
},
vec![],
host.to_string(),
allow_self_signed_compute,
);
let connection = PostgresConnection {
@@ -365,50 +354,6 @@ pub(crate) fn load_certs() -> Result<Arc<rustls::RootCertStore>, Vec<rustls_nati
}
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
#[derive(Debug)]
pub(crate) struct AcceptEverythingVerifier;
impl ServerCertVerifier for AcceptEverythingVerifier {
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
use rustls::SignatureScheme;
// The schemes for which `SignatureScheme::supported_in_tls13` returns true.
vec![
SignatureScheme::ECDSA_NISTP521_SHA512,
SignatureScheme::ECDSA_NISTP384_SHA384,
SignatureScheme::ECDSA_NISTP256_SHA256,
SignatureScheme::RSA_PSS_SHA512,
SignatureScheme::RSA_PSS_SHA384,
SignatureScheme::RSA_PSS_SHA256,
SignatureScheme::ED25519,
]
}
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -25,7 +25,6 @@ use crate::types::Host;
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub proxy_protocol_v2: ProxyProtocolV2,

View File

@@ -213,7 +213,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
params_compat: true,
params: &params,
locks: &config.connect_compute_locks,
allow_self_signed_compute: config.allow_self_signed_compute,
},
&user_info,
config.wake_compute_retry_config,

View File

@@ -73,12 +73,9 @@ impl NodeInfo {
pub(crate) async fn connect(
&self,
ctx: &RequestContext,
allow_self_signed_compute: bool,
timeout: Duration,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config
.connect(ctx, allow_self_signed_compute, self.aux.clone(), timeout)
.await
self.config.connect(ctx, self.aux.clone(), timeout).await
}
pub(crate) fn reuse_settings(&mut self, other: Self) {

View File

@@ -73,9 +73,6 @@ pub(crate) struct TcpMechanism<'a> {
/// connect_to_compute concurrency lock
pub(crate) locks: &'static ApiLocks<Host>,
/// Whether we should accept self-signed certificates (for testing)
pub(crate) allow_self_signed_compute: bool,
}
#[async_trait]
@@ -93,11 +90,7 @@ impl ConnectMechanism for TcpMechanism<'_> {
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host();
let permit = self.locks.get_permit(&host).await?;
permit.release_result(
node_info
.connect(ctx, self.allow_self_signed_compute, timeout)
.await,
)
permit.release_result(node_info.connect(ctx, timeout).await)
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {

View File

@@ -348,8 +348,6 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
params_compat,
params: &params,
locks: &config.connect_compute_locks,
// only used for console redirect testing.
allow_self_signed_compute: false,
},
&user_info,
config.wake_compute_retry_config,

View File

@@ -50,6 +50,12 @@ impl<S: AsyncWrite + Unpin> SaslStream<'_, S> {
self.stream.write_message(&msg.to_reply()).await?;
Ok(())
}
// Queue a SASL message for the client.
fn send_noflush(&mut self, msg: &ServerMessage<&str>) -> io::Result<()> {
self.stream.write_message_noflush(&msg.to_reply())?;
Ok(())
}
}
/// SASL authentication outcome.
@@ -85,7 +91,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> SaslStream<'_, S> {
continue;
}
Step::Success(result, reply) => {
self.send(&ServerMessage::Final(&reply)).await?;
self.send_noflush(&ServerMessage::Final(&reply))?;
Outcome::Success(result)
}
Step::Failure(reason) => Outcome::Failure(reason),

View File

@@ -9,6 +9,7 @@ default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
benchmarking = []
[dependencies]
async-stream.workspace = true
@@ -77,3 +78,4 @@ tracing-subscriber = { workspace = true, features = ["json"] }
[[bench]]
name = "receive_wal"
harness = false
required-features = ["benchmarking"]

View File

@@ -1,11 +1,7 @@
//! WAL ingestion benchmarks.
#[path = "benchutils.rs"]
mod benchutils;
use std::io::Write as _;
use benchutils::Env;
use bytes::BytesMut;
use camino_tempfile::tempfile;
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
@@ -16,6 +12,7 @@ use safekeeper::receive_wal::{self, WalAcceptor};
use safekeeper::safekeeper::{
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
};
use safekeeper::test_utils::Env;
use tokio::io::AsyncWriteExt as _;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
@@ -76,12 +73,15 @@ fn bench_process_msg(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
// Set up the Safekeeper.
let env = Env::new(fsync)?;
let mut safekeeper =
runtime.block_on(env.make_safekeeper(NodeId(1), TenantTimelineId::generate()))?;
let mut safekeeper = runtime.block_on(env.make_safekeeper(
NodeId(1),
TenantTimelineId::generate(),
Lsn(0),
))?;
b.iter_batched_ref(
// Pre-construct WAL records and requests. Criterion will batch them.
@@ -134,7 +134,8 @@ fn bench_wal_acceptor(c: &mut Criterion) {
let runtime = tokio::runtime::Runtime::new()?; // needs multithreaded
let env = Env::new(fsync)?;
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"));
let walgen =
&mut WalGenerator::new(LogicalMessageGenerator::new(c"prefix", b"message"), Lsn(0));
// Create buffered channels that can fit all requests, to avoid blocking on channels.
let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(n);
@@ -145,7 +146,7 @@ fn bench_wal_acceptor(c: &mut Criterion) {
// TODO: WalAcceptor doesn't actually need a full timeline, only
// Safekeeper::process_msg(). Consider decoupling them to simplify the setup.
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate())
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.await?
.wal_residence_guard()
.await?;
@@ -239,7 +240,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
assert!(size >= prefixlen);
let message = vec![0; size - prefixlen];
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message));
let walgen = &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), Lsn(0));
// Construct and spawn the WalAcceptor task.
let env = Env::new(fsync)?;
@@ -249,7 +250,7 @@ fn bench_wal_acceptor_throughput(c: &mut Criterion) {
runtime.block_on(async {
let tli = env
.make_timeline(NodeId(1), TenantTimelineId::generate())
.make_timeline(NodeId(1), TenantTimelineId::generate(), Lsn(0))
.await?
.wal_residence_guard()
.await?;

View File

@@ -564,7 +564,7 @@ pub fn make_router(
if conf.http_auth.is_some() {
router = router.middleware(auth_middleware(|request| {
const ALLOWLIST_ROUTES: &[&str] =
&["/v1/status", "/metrics", "/profile/cpu", "profile/heap"];
&["/v1/status", "/metrics", "/profile/cpu", "/profile/heap"];
if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
None
} else {

View File

@@ -43,6 +43,9 @@ pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;
#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;
mod timelines_global_map;
use std::sync::Arc;
pub use timelines_global_map::GlobalTimelines;

View File

@@ -94,9 +94,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
}
}
let max_next_record_lsn = match max_next_record_lsn {
Some(lsn) => lsn,
None => { continue; }
};
let batch = InterpretedWalRecords {
records,
next_record_lsn: max_next_record_lsn
next_record_lsn: Some(max_next_record_lsn),
};
tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();

View File

@@ -1,18 +1,18 @@
use std::sync::Arc;
use crate::rate_limit::RateLimiter;
use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use crate::state::{TimelinePersistentState, TimelineState};
use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::remote_timeline_path;
use crate::{control_file, wal_storage, SafeKeeperConf};
use camino_tempfile::Utf8TempDir;
use safekeeper::rate_limit::RateLimiter;
use safekeeper::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
use safekeeper::state::{TimelinePersistentState, TimelineState};
use safekeeper::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
use safekeeper::timelines_set::TimelinesSet;
use safekeeper::wal_backup::remote_timeline_path;
use safekeeper::{control_file, wal_storage, SafeKeeperConf};
use tokio::fs::create_dir_all;
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
/// A Safekeeper benchmarking environment. Uses a tempdir for storage, removed on drop.
/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
pub struct Env {
/// Whether to enable fsync.
pub fsync: bool,
@@ -21,7 +21,7 @@ pub struct Env {
}
impl Env {
/// Creates a new benchmarking environment in a temporary directory. fsync controls whether to
/// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
/// enable fsyncing.
pub fn new(fsync: bool) -> anyhow::Result<Self> {
let tempdir = camino_tempfile::tempdir()?;
@@ -47,6 +47,7 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
let conf = self.make_conf(node_id);
@@ -67,9 +68,9 @@ impl Env {
safekeeper
.process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
term: 1,
start_streaming_at: Lsn(0),
term_history: TermHistory(vec![(1, Lsn(0)).into()]),
timeline_start_lsn: Lsn(0),
start_streaming_at: start_lsn,
term_history: TermHistory(vec![(1, start_lsn).into()]),
timeline_start_lsn: start_lsn,
}))
.await?;
@@ -82,12 +83,13 @@ impl Env {
&self,
node_id: NodeId,
ttid: TenantTimelineId,
start_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
let conf = Arc::new(self.make_conf(node_id));
let timeline_dir = get_timeline_dir(&conf, &ttid);
let remote_path = remote_timeline_path(&ttid)?;
let safekeeper = self.make_safekeeper(node_id, ttid).await?;
let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
let timeline = Timeline::new(

View File

@@ -18,7 +18,7 @@ impl DiskWalProposer {
internal_available_lsn: Lsn(0),
prev_lsn: Lsn(0),
disk: BlockStorage::new(),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[])),
wal_generator: WalGenerator::new(LogicalMessageGenerator::new(c"", &[]), Lsn(0)),
}),
})
}

View File

@@ -11,6 +11,7 @@ use diesel::Connection;
use itertools::Itertools;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::SafekeeperDescribeResponse;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
@@ -1241,6 +1242,18 @@ impl SafekeeperPersistence {
availability_zone_id: &self.availability_zone_id,
}
}
pub(crate) fn as_describe_response(&self) -> SafekeeperDescribeResponse {
// omit the `active` flag on purpose: it is deprecated.
SafekeeperDescribeResponse {
id: NodeId(self.id as u64),
region_id: self.region_id.clone(),
version: self.version,
host: self.host.clone(),
port: self.port,
http_port: self.http_port,
availability_zone_id: self.availability_zone_id.clone(),
}
}
}
#[derive(Insertable, AsChangeset)]

View File

@@ -46,10 +46,11 @@ use pageserver_api::{
controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
ShardsPreferredAzsResponse, TenantCreateRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::{
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
@@ -7169,15 +7170,24 @@ impl Service {
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<crate::persistence::SafekeeperPersistence>, DatabaseError> {
self.persistence.list_safekeepers().await
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
Ok(self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|v| v.as_describe_response())
.collect::<Vec<_>>())
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<crate::persistence::SafekeeperPersistence, DatabaseError> {
self.persistence.safekeeper_get(id).await
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
self.persistence
.safekeeper_get(id)
.await
.map(|v| v.as_describe_response())
}
pub(crate) async fn upsert_safekeeper(

View File

@@ -310,7 +310,7 @@ pub(crate) enum BlobDataParseResult {
index_part_generation: Generation,
s3_layers: HashSet<(LayerName, Generation)>,
},
/// The remains of a deleted Timeline (i.e. an initdb archive only)
/// The remains of an uncleanly deleted Timeline or aborted timeline creation(e.g. an initdb archive only, or some layer without an index)
Relic,
Incorrect {
errors: Vec<String>,
@@ -346,7 +346,7 @@ pub(crate) async fn list_timeline_blobs(
match res {
ListTimelineBlobsResult::Ready(data) => Ok(data),
ListTimelineBlobsResult::MissingIndexPart(_) => {
// Retry if index is missing.
// Retry if listing raced with removal of an index
let data = list_timeline_blobs_impl(remote_client, id, root_target)
.await?
.into_data();
@@ -358,7 +358,7 @@ pub(crate) async fn list_timeline_blobs(
enum ListTimelineBlobsResult {
/// Blob data is ready to be intepreted.
Ready(RemoteTimelineBlobData),
/// List timeline blobs has layer files but is missing [`IndexPart`].
/// The listing contained an index but when we tried to fetch it, we couldn't
MissingIndexPart(RemoteTimelineBlobData),
}
@@ -467,19 +467,19 @@ async fn list_timeline_blobs_impl(
match index_part_object.as_ref() {
Some(selected) => index_part_keys.retain(|k| k != selected),
None => {
// It is possible that the branch gets deleted after we got some layer files listed
// and we no longer have the index file in the listing.
errors.push(
// This case does not indicate corruption, but it should be very unusual. It can
// happen if:
// - timeline creation is in progress (first layer is written before index is written)
// - timeline deletion happened while a stale pageserver was still attached, it might upload
// a layer after the deletion is done.
tracing::info!(
"S3 list response got no index_part.json file but still has layer files"
.to_string(),
);
return Ok(ListTimelineBlobsResult::MissingIndexPart(
RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Incorrect { errors, s3_layers },
unused_index_keys: index_part_keys,
unknown_keys,
},
));
return Ok(ListTimelineBlobsResult::Ready(RemoteTimelineBlobData {
blob_data: BlobDataParseResult::Relic,
unused_index_keys: index_part_keys,
unknown_keys,
}));
}
}

View File

@@ -3222,7 +3222,6 @@ class NeonProxy(PgProtocol):
# Link auth backend params
*["--auth-backend", "link"],
*["--uri", NeonProxy.link_auth_uri],
*["--allow-self-signed-compute", "true"],
]
class ProxyV1(AuthBackend):

View File

@@ -738,6 +738,18 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res_json = res.json()
assert res_json is None
def timeline_compact_info(
self,
tenant_id: TenantId | TenantShardId,
timeline_id: TimelineId,
) -> Any:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_compact(
self,
tenant_id: TenantId | TenantShardId,
@@ -749,7 +761,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
enhanced_gc_bottom_most_compaction=False,
body: dict[str, Any] | None = None,
):
self.is_testing_enabled_or_skip()
query = {}
if force_repartition:
query["force_repartition"] = "true"

View File

@@ -0,0 +1,199 @@
-- create a schema that simulates Neon control plane operations table
-- however use partitioned operations tables with many (e.g. 500) child partition tables per table
-- in summary we create multiple of these partitioned operations tables (with 500 childs each) - until we reach the requested number of tables
-- first we need some other tables that can be referenced by the operations table
-- Table for branches
CREATE TABLE public.branches (
id text PRIMARY KEY
);
-- Table for endpoints
CREATE TABLE public.endpoints (
id text PRIMARY KEY
);
-- Table for projects
CREATE TABLE public.projects (
id text PRIMARY KEY
);
INSERT INTO public.branches (id)
VALUES ('branch_1');
-- Insert one row into endpoints
INSERT INTO public.endpoints (id)
VALUES ('endpoint_1');
-- Insert one row into projects
INSERT INTO public.projects (id)
VALUES ('project_1');
-- now we create a procedure that can create n operations tables
-- we do that in a procedure to save roundtrip latency when scaling the test to many tables
-- prefix is the base table name, e.g. 'operations_scale_1000' if we create 1000 tables
CREATE OR REPLACE PROCEDURE create_partitioned_tables(prefix text, n INT)
LANGUAGE plpgsql AS $$
DECLARE
table_name TEXT; -- Variable to hold table names dynamically
i INT; -- Counter for the loop
BEGIN
-- Loop to create n partitioned tables
FOR i IN 1..n LOOP
table_name := format('%s_%s', prefix, i);
-- Create the partitioned table
EXECUTE format(
'CREATE TABLE public.%s (
project_id character varying NOT NULL,
id uuid NOT NULL,
status integer,
action character varying NOT NULL,
error character varying,
created_at timestamp with time zone NOT NULL DEFAULT now(),
updated_at timestamp with time zone NOT NULL DEFAULT now(),
spec jsonb,
retry_at timestamp with time zone,
failures_count integer DEFAULT 0,
metadata jsonb NOT NULL DEFAULT ''{}''::jsonb,
executor_id text NOT NULL,
attempt_duration_ms integer,
metrics jsonb DEFAULT ''{}''::jsonb,
branch_id text,
endpoint_id text,
next_operation_id uuid,
compute_id text,
connection_attempt_at timestamp with time zone,
concurrency_key text,
queue_id text,
CONSTRAINT %s_pkey PRIMARY KEY (id, created_at),
CONSTRAINT %s_branch_id_fk FOREIGN KEY (branch_id) REFERENCES branches(id) ON DELETE CASCADE,
CONSTRAINT %s_endpoint_id_fk FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE,
CONSTRAINT %s_next_operation_id_fk FOREIGN KEY (next_operation_id, created_at) REFERENCES %s(id, created_at),
CONSTRAINT %s_project_id_fk FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE
) PARTITION BY RANGE (created_at)',
table_name, table_name, table_name, table_name, table_name, table_name, table_name
);
-- Add indexes for the partitioned table
EXECUTE format('CREATE INDEX index_%s_on_next_operation_id ON public.%s (next_operation_id)', table_name, table_name);
EXECUTE format('CREATE INDEX index_%s_on_project_id ON public.%s (project_id)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_branch_id ON public.%s (branch_id)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_branch_id_created_idx ON public.%s (branch_id, created_at)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_created_at_idx ON public.%s (created_at)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_created_at_project_id_id_cond_idx ON public.%s (created_at, project_id, id)', table_name, table_name);
EXECUTE format('CREATE INDEX %s_endpoint_id ON public.%s (endpoint_id)', table_name, table_name);
EXECUTE format(
'CREATE INDEX %s_for_redo_worker_idx ON public.%s (executor_id) WHERE status <> 1',
table_name, table_name
);
EXECUTE format(
'CREATE INDEX %s_project_id_status_index ON public.%s ((project_id::text), status)',
table_name, table_name
);
EXECUTE format(
'CREATE INDEX %s_status_not_finished ON public.%s (status) WHERE status <> 1',
table_name, table_name
);
EXECUTE format('CREATE INDEX %s_updated_at_desc_idx ON public.%s (updated_at DESC)', table_name, table_name);
EXECUTE format(
'CREATE INDEX %s_with_failures ON public.%s (failures_count) WHERE failures_count > 0',
table_name, table_name
);
END LOOP;
END;
$$;
-- next we create a procedure that can add the child partitions (one per day) to each of the operations tables
CREATE OR REPLACE PROCEDURE create_operations_partitions(
table_name TEXT,
start_date DATE,
end_date DATE
)
LANGUAGE plpgsql AS $$
DECLARE
partition_date DATE;
partition_name TEXT;
counter INT := 0; -- Counter to track the number of tables created in the current transaction
BEGIN
partition_date := start_date;
-- Create partitions in batches
WHILE partition_date < end_date LOOP
partition_name := format('%s_%s', table_name, to_char(partition_date,'YYYY_MM_DD'));
EXECUTE format(
'CREATE TABLE IF NOT EXISTS public.%s PARTITION OF public.%s
FOR VALUES FROM (''%s'') TO (''%s'')',
partition_name,
table_name,
partition_date,
partition_date + INTERVAL '1 day'
);
counter := counter + 1;
-- Commit and reset counter after every 100 partitions
IF counter >= 100 THEN
COMMIT;
counter := 0; -- Reset the counter
END IF;
-- Advance to the next day
partition_date := partition_date + INTERVAL '1 day';
END LOOP;
-- Final commit for remaining partitions
IF counter > 0 THEN
COMMIT;
END IF;
-- Insert synthetic rows into each partition
EXECUTE format(
'INSERT INTO %I (
project_id,
branch_id,
endpoint_id,
id,
status,
action,
created_at,
updated_at,
spec,
metadata,
executor_id,
failures_count
)
SELECT
''project_1'', -- project_id
''branch_1'', -- branch_id
''endpoint_1'', -- endpoint_id
''e8bba687-0df9-4291-bfcd-7d5f6aa7c158'', -- unique id
1, -- status
''SYNTHETIC_ACTION'', -- action
gs::timestamp + interval ''0 ms'', -- created_at
gs::timestamp + interval ''1 minute'', -- updated_at
''{"key": "value"}'', -- spec (JSONB)
''{"metadata_key": "metadata_value"}'', -- metadata (JSONB)
''executor_1'', -- executor_id
0 -- failures_count
FROM generate_series(%L, %L::DATE - INTERVAL ''1 day'', INTERVAL ''1 day'') AS gs',
table_name, start_date, end_date
);
-- Commit the inserted rows
COMMIT;
END;
$$;
-- we can now create partitioned tables using something like
-- CALL create_partitioned_tables('operations_scale_1000' ,10);
-- and we can create the child partitions for a table using something like
-- CALL create_operations_partitions(
-- 'operations_scale_1000_1',
-- '2000-01-01', -- Start date
-- ('2000-01-01'::DATE + INTERVAL '1 day' * 500)::DATE -- End date (start date + number of days)
-- );

View File

@@ -22,7 +22,7 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
# set PITR interval to be small, so we can do GC
"pitr_interval": "60 s",
"pitr_interval": "10 s",
# "compaction_threshold": "3",
# "image_creation_threshold": "2",
}
@@ -32,6 +32,7 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
n_steps = 10
n_update_iters = 100
step_size = 10000
branch_created = 0
with endpoint.cursor() as cur:
cur.execute("SET statement_timeout='1000s'")
cur.execute(
@@ -66,6 +67,7 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
if mode == "with_snapshots":
if step == n_steps / 2:
env.create_branch("child")
branch_created += 1
max_num_of_deltas_above_image = 0
max_total_num_of_deltas = 0
@@ -142,6 +144,15 @@ def gc_feedback_impl(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
with layer_map_path.open("w") as f:
f.write(json.dumps(client.timeline_layer_map_info(tenant_id, timeline_id)))
# We should have collected all garbage
if mode == "normal":
# in theory we should get physical size ~= logical size, but given that gc interval is 10s,
# and the layer has indexes that might contribute to the fluctuation, we allow a small margin
# of 1 here, and the end ratio we are asserting is 1 (margin) + 1 (expected) = 2.
assert physical_size / logical_size < 2
elif mode == "with_snapshots":
assert physical_size / logical_size < (2 + branch_created)
@pytest.mark.timeout(10000)
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):

View File

@@ -0,0 +1,66 @@
import os
from pathlib import Path
import pytest
from fixtures.compare_fixtures import RemoteCompare
from fixtures.log_helper import log
def get_num_relations(default: int = 1000) -> list[int]:
# We parametrize each run with scale specifying the number of wanted child partitions.
# Databases are pre-created and passed through BENCHMARK_CONNSTR env variable.
scales = os.getenv("TEST_NUM_RELATIONS", default=str(default))
rv = []
for s in scales.split(","):
scale = int(s)
rv.append(scale)
return rv
@pytest.mark.parametrize("num_relations", get_num_relations())
@pytest.mark.remote_cluster
def test_perf_many_relations(remote_compare: RemoteCompare, num_relations: int):
"""
Test creating many relations in a single database.
We use partitioned tables with child tables, indexes and constraints to have a realistic schema.
Also we include some common data types like text, uuid, timestamp, JSONB, etc.
see many_relations/create_many_relations.sql
"""
env = remote_compare
# prepare some base tables and the plpgsql procedures that we use to create the tables
sql_file = Path(__file__).parent / "many_relations" / "create_many_relations.sql"
env.pg_bin.run_capture(["psql", env.pg.connstr(), "-f", str(sql_file)])
num_parent_tables = num_relations // 500 + 1
log.info(f"Creating {num_relations} relations in {num_parent_tables} parent tables")
log.info(f"Creating {num_parent_tables} parent tables")
sql = f"CALL create_partitioned_tables('operations_scale_{num_relations}', {num_parent_tables})"
log.info(sql)
env.pg_bin.run_capture(["psql", env.pg.connstr(), "-c", sql])
current_table = 0
num_relations_remaining = num_relations
# now run and measure the actual relation creation
while num_relations_remaining > 0:
current_table += 1
parent_table_name = f"operations_scale_{num_relations}_{current_table}"
if num_relations_remaining > 500:
num_relations_to_create = 500
else:
num_relations_to_create = num_relations_remaining
num_relations_remaining -= num_relations_to_create
log.info(
f"Creating {num_relations_to_create} child tables in partitioned parent table '{parent_table_name}'"
)
sql = f"CALL create_operations_partitions( '{parent_table_name}', '2000-01-01', ('2000-01-01'::DATE + INTERVAL '1 day' * {num_relations_to_create})::DATE)"
log.info(sql)
with env.zenbenchmark.record_duration(
f"CREATE_TABLE/{current_table}/{num_relations_to_create}"
):
env.pg_bin.run_capture(
["psql", env.pg.connstr(options="-cstatement_timeout=1000s "), "-c", sql]
)

View File

@@ -134,6 +134,10 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
env.pageserver.allowed_errors.append(
r".*failed to acquire partition lock during gc-compaction.*"
)
env.pageserver.allowed_errors.append(r".*repartition() called concurrently.*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -172,6 +176,12 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
workload.churn_rows(row_count, env.pageserver.id)
def compaction_finished():
queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
assert queue_depth == 0
wait_until(compaction_finished, timeout=60)
# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
env.pageserver.assert_log_contains(
"scheduled_compact_timeline.*picked .* layers for compaction"

View File

@@ -84,6 +84,8 @@ def test_pgdata_import_smoke(
elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2
elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS:
# Postgres uses a 1GiB segment size, fixed at compile time, so we must use >2GB of data
# to exercise multiple segments.
target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192)
else:
raise ValueError
@@ -111,9 +113,15 @@ def test_pgdata_import_smoke(
def validate_vanilla_equivalence(ep):
# TODO: would be nicer to just compare pgdump
assert ep.safe_psql("select count(*), sum(data::bigint)::bigint from t") == [
(expect_nrows, expect_sum)
]
# Enable IO concurrency for batching on large sequential scan, to avoid making
# this test unnecessarily onerous on CPU
assert ep.safe_psql_many(
[
"set effective_io_concurrency=32;",
"select count(*), sum(data::bigint)::bigint from t",
]
) == [[], [(expect_nrows, expect_sum)]]
validate_vanilla_equivalence(vanilla_pg)

View File

@@ -22,7 +22,10 @@ CHECKPOINT_TIMEOUT_SECONDS = 60
async def run_worker_for_tenant(
env: NeonEnv, entries: int, tenant: TenantId, offset: int | None = None
env: NeonEnv,
entries: int,
tenant: TenantId,
offset: int | None = None,
) -> Lsn:
if offset is None:
offset = 0
@@ -37,12 +40,20 @@ async def run_worker_for_tenant(
finally:
await conn.close(timeout=10)
last_flush_lsn = Lsn(ep.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
loop = asyncio.get_running_loop()
sql = await loop.run_in_executor(
None, lambda ep: ep.safe_psql("SELECT pg_current_wal_flush_lsn()"), ep
)
last_flush_lsn = Lsn(sql[0][0])
return last_flush_lsn
async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> tuple[TenantId, TimelineId, Lsn]:
tenant, timeline = env.create_tenant(conf=tenant_conf)
loop = asyncio.get_running_loop()
# capture tenant_conf by specifying `tenant_conf=tenant_conf`, otherwise it will be evaluated to some random value
tenant, timeline = await loop.run_in_executor(
None, lambda tenant_conf, env: env.create_tenant(conf=tenant_conf), tenant_conf, env
)
last_flush_lsn = await run_worker_for_tenant(env, entries, tenant)
return tenant, timeline, last_flush_lsn

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import time
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync, wait_replica_caughtup
def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonEnv, vanilla_pg):
@@ -38,6 +38,8 @@ def test_physical_and_logical_replication_slot_not_copied(neon_simple_env: NeonE
for pk in range(n_records):
p_cur.execute("insert into t (pk) values (%s)", (pk,))
wait_replica_caughtup(primary, secondary)
s_cur.execute("select count(*) from t")
assert s_cur.fetchall()[0][0] == n_records

View File

@@ -3009,7 +3009,7 @@ def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
compared = [dict(a), dict(b)]
masked_keys = ["created_at", "updated_at"]
masked_keys = ["created_at", "updated_at", "active"]
for d in compared:
# keep deleting these in case we are comparing the body as it will be uploaded by real scripts

View File

@@ -1,7 +1,7 @@
{
"v17": [
"17.2",
"65c4e46baf56ec05412c7dd63d62faff0b33dcfb"
"7e3f3974bc8895938308f94d0e96879ffae638cd"
],
"v16": [
"16.6",