Compare commits

...

27 Commits

Author SHA1 Message Date
Sasha Krassovsky
d699f5ece8 Fix codestyle 2023-09-22 14:50:18 -07:00
Sasha Krassovsky
43b301144a Let compute_ctl accept environment variables for compute 2023-09-22 14:17:06 -07:00
Konstantin Knizhnik
6723a79bec Do not handle lfc_change_limit in processes not haing PGPROC structure (#5332)
## Problem

See https://neondb.slack.com/archives/C05L7D1JAUS/p1693775881474019

## Summary of changes

Do not perform local file cache resizing in processes having no PGPROC

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-09-19 21:55:36 +03:00
Joonas Koivunen
5d8597c2f0 refactor(consumption_metrics): post-split cleanup (#5327)
Split off from #5297. Builds upon #5326. Handles original review
comments which I did not move to earlier split PRs. Completes test
support for verifying events by notifying of the last batch of events.
Adds cleaning up of tempfiles left because of an unlucky shutdown or
SIGKILL.

Finally closes #5175.

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2023-09-18 23:30:01 +03:00
Em Sharnoff
722e5260bf vm-monitor: Don't set cgroup memory.max (#5333)
All it does is make postgres OOM more often (which, tbf, means we're
less likely to have e.g. compute_ctl get OOM-killed, but that tradeoff
isn't worth it).

Internally, this means removing all references to `memory.max` and the
places where we calculate or store the intended value.

As discussed in the sync earlier.

ref:

- https://neondb.slack.com/archives/C03H1K0PGKH/p1694698949252439?thread_ts=1694505575.693449&cid=C03H1K0PGKH
- https://neondb.slack.com/archives/C03H1K0PGKH/p1695049198622759
2023-09-18 17:47:48 +00:00
Em Sharnoff
18f3a706da Bump vm-builder v0.17.5 -> v0.17.10 (#5334)
Only notable change is including neondatabase/autoscaling#523, which we
hope will help with making sure that TCP connections are properly
terminated before shutdown (which hopefully fixes a leak in the
pageserver).
2023-09-18 17:30:34 +00:00
Alexander Bayandin
70b17981a7 Enable compatibility tests on Postgres 16 (#5314)
## Problem

We didn't have a Postgres 16 snapshot of data to run compatibility tests
on, but now we have it (since the release).

## Summary of changes
- remove `@skip_on_postgres(PgVersion.V16, ...)` from compatibility
tests
2023-09-18 12:58:34 +01:00
Alexander Bayandin
0904d8cf4a Downgrade plv8 for Postgres 14/15 (#5320)
Backport https://github.com/neondatabase/neon/pull/5318 from release 
into main
2023-09-18 12:55:49 +01:00
Joonas Koivunen
55371af711 test: workaround known bad mock_s3 ListObjectsV2 response (#5330)
this should allow test
test_delete_tenant_exercise_crash_safety_failpoints with
debug-pg16-Check.RETRY_WITH_RESTART-mock_s3-tenant-delete-before-remove-timelines-dir-True
to pass more reliably.
2023-09-18 09:24:53 +02:00
Joonas Koivunen
e62ab176b8 refactor(consumption_metrics): split (#5326)
Split off from #5297. Builds upon #5325, should contain only the
splitting. Next up: #5327.
2023-09-16 18:45:08 +03:00
Joonas Koivunen
a221ecb0da test: test_download_remote_layers_api again (#5322)
The test is still flaky, perhaps more after #5233, see #3831.

Do one more `timeline_checkpoint` *after* shutting down safekeepers
*before* shutting down pageserver. Put more effort into not compacting
or creating image layers.
2023-09-16 18:27:19 +03:00
Joonas Koivunen
9cf4ae86ff refactor(consumption_metrics): pre-split cleanup (#5325)
Cleanups in preparation to splitting the consumption_metrics.rs in
#5326.

Split off from #5297.
2023-09-16 18:08:33 +03:00
Joonas Koivunen
74d99b5883 refactor(test_consumption_metrics): split for pageserver and proxy (#5324)
With the addition of the "stateful event verification" the
test_consumption_metrics.py is now too crowded. Split it up for
pageserver and proxy.

Split from #5297.
2023-09-16 18:05:35 +03:00
Joonas Koivunen
f902777202 fix: consumption metrics on restart (#5323)
Write collected metrics to disk to recover previously sent metrics on
restart.

Recover the previously collected metrics during startup, send them over
at right time
  - send cached synthetic size before actual is calculated
  - when `last_record_lsn` rolls back on startup
      - stay at last sent `written_size` metric
      - send `written_size_delta_bytes` metric as 0

Add test support: stateful verification of events in python tests.

Fixes: #5206
Cc: #5175 (loggings, will be enhanced in follow-up)
2023-09-16 11:24:42 +03:00
Joonas Koivunen
a7f4ee02a3 fix(consumption_metrics): exp backoff retry (#5317)
Split off from #5297. Depends on #5315.
Cc: #5175 for retry
2023-09-16 01:11:01 +03:00
Joonas Koivunen
00c4c8e2e8 feat(consumption_metrics): remove event deduplication support (#5316)
We no longer use pageserver deduplication anywhere. Give out a warning
instead.

Split off from #5297.

Cc: #5175 for dedup.
2023-09-16 00:06:19 +03:00
Joonas Koivunen
c5d226d9c7 refactor(consumption_metrics): prereq refactorings, tests (#5315)
Split off from #5297.

There should be no functional changes here:
- refactor tenant metric "production" like previously timeline, allows
unit testing, though not interesting enough yet to test
- introduce type aliases for tuples
- extra refactoring for `collect`, was initially thinking it was useful
but will do a inline later
- shorter binding names
- support for future allocation reuse quests with IdempotencyKey
- move code out of tokio::select to make it rustfmt-able
- generification, allow later replacement of `&'static str` with enum
- add tests that assert sent event contents exactly
2023-09-15 19:44:14 +03:00
Konstantin Knizhnik
66fa176cc8 Handle update of VM in XLOG_HEAP_LOCK/XLOG_HEAP2_LOCK_UPDATED WAL records (#4896)
## Problem

VM should be updated if XLH_LOCK_ALL_FROZEN_CLEARED flags is set in
XLOG_HEAP_LOCK,XLOG_HEAP_2_LOCK_UPDATED WAL records

## Summary of changes

Add handling of this records in walingest.rs

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-09-15 17:47:29 +03:00
Heikki Linnakangas
9e6b5b686c Add a test case for "CREATE DATABASE STRATEGY=file_copy". (#5301)
It was utterly broken on v15 before commit 83e7e5dbbd, which fixed the
incorrect definition of XLOG_DBASE_CREATE_WAL_LOG. We never noticed
because we had no tests for it.
2023-09-15 16:50:57 +03:00
Rahul Modpur
e6985bd098 Move tenant & timeline dir method to NeonPageserver and use them everywhere (#5262)
## Problem
In many places in test code, paths are built manually from what
NeonEnv.tenant_dir and NeonEnv.timeline_dir could do.

## Summary of changes
1. NeonEnv.tenant_dir and NeonEnv.timeline_dir moved under class
NeonPageserver as the path they use is per-pageserver instance.
2. Used these everywhere to replace manual path building

Closes #5258

---------

Signed-off-by: Rahul Modpur <rmodpur2@gmail.com>
2023-09-15 11:17:18 +01:00
Konstantin Knizhnik
e400a38fb9 References to old and new blocks were mixed in xlog_heap_update handler (#5312)
## Problem

See https://neondb.slack.com/archives/C05L7D1JAUS/p1694614585955029

https://www.notion.so/neondatabase/Duplicate-key-issue-651627ce843c45188fbdcb2d30fd2178

## Summary of changes

Swap old/new block references

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-09-15 10:32:25 +03:00
Alexander Bayandin
bd36d1c44a approved-for-ci-run.yml: fix variable name and permissions (#5307)
## Problem
- `gh pr list` fails with `unknown argument "main"; please quote all
values that have spaces due to using a variable with the wrong name
- `permissions: write-all` are too wide for the job

## Summary of changes
- For variable name `HEAD` -> `BRANCH`
- Grant only required permissions for each job

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-09-14 20:18:49 +03:00
Alexander Bayandin
0501b74f55 Update checksum for pg_hint_plan (#5309)
## Problem

The checksum for `pg_hint_plan` doesn't match:
```
sha256sum: WARNING: 1 computed checksum did NOT match
```

Ref
https://github.com/neondatabase/neon/actions/runs/6185715461/job/16793609251?pr=5307

It seems that the release was retagged yesterday:
https://github.com/ossc-db/pg_hint_plan/releases/tag/REL16_1_6_0

I don't see any malicious changes from 15_1.5.1:
https://github.com/ossc-db/pg_hint_plan/compare/REL15_1_5_1...REL16_1_6_0,
so it should be ok to update.

## Summary of changes
- Update checksum for `pg_hint_plan` 16_1.6.0
2023-09-14 18:17:50 +03:00
Em Sharnoff
3895829bda vm-monitor: Fix cgroup throttling (#5303)
I believe this (not actual IO problems) is the cause of the "disk speed
issue" that we've had for VMs recently. See e.g.:

1. https://neondb.slack.com/archives/C03H1K0PGKH/p1694287808046179?thread_ts=1694271790.580099&cid=C03H1K0PGKH
2. https://neondb.slack.com/archives/C03H1K0PGKH/p1694511932560659

The vm-informant (and now, the vm-monitor, its replacement) is supposed
to gradually increase the `neon-postgres` cgroup's memory.high value,
because otherwise the kernel will throttle all the processes in the
cgroup.

This PR fixes a bug with the vm-monitor's implementation of this
behavior.

---

Other references, for the vm-informant's implementation:

- Original issue: neondatabase/autoscaling#44
- Original PR: neondatabase/autoscaling#223
2023-09-14 13:21:50 +03:00
Joonas Koivunen
ffd146c3e5 refactor: globals in tests (#5298)
Refactor tests to have less globals.

This will allow to hopefully write more complex tests for our new metric
collection requirements in #5297. Includes reverted work from #4761
related to test globals.

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: MMeent <matthias@neon.tech>
2023-09-13 22:05:30 +03:00
Konstantin Knizhnik
1697e7b319 Fix lfc_ensure_function which now disables LFC (#5294)
## Problem

There was a bug in lfc_ensure_opened which actually disables LFC

## Summary of changes

Return true ifLFC file is normally opened

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-09-13 08:56:03 +03:00
bojanserafimov
8556d94740 proxy http: reproduce issue with transactions in pool (#5293)
xfail test reproducing issue https://github.com/neondatabase/neon/issues/4698
2023-09-12 17:13:25 -04:00
57 changed files with 2914 additions and 1290 deletions

View File

@@ -16,21 +16,29 @@ on:
# Actual magic happens here:
- labeled
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number }}
BRANCH: "ci-run/pr-${{ github.event.pull_request.number }}"
permissions: write-all
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
defaults:
run:
shell: bash -euo pipefail {0}
jobs:
remove-label:
# Remove `approved-for-ci-run` label if the workflow is triggered by changes in a PR.
# The PR should be reviewed and labelled manually again.
permissions:
pull-requests: write # For `gh pr edit`
if: |
contains(fromJSON('["opened", "synchronize", "reopened", "closed"]'), github.event.action) &&
contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
@@ -43,6 +51,10 @@ jobs:
create-or-update-pr-for-ci-run:
# Create local PR for an `approved-for-ci-run` labelled PR to run CI pipeline in it.
permissions:
pull-requests: write # for `gh pr edit`
# For `git push` and `gh pr create` we use CI_ACCESS_TOKEN
if: |
github.event.action == 'labeled' &&
contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
@@ -75,7 +87,7 @@ jobs:
Feel free to review/comment/discuss the original PR #${PR_NUMBER}.
EOF
ALREADY_CREATED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${HEAD} --base main --json number --jq '.[].number')"
ALREADY_CREATED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${BRANCH} --base main --json number --jq '.[].number')"
if [ -z "${ALREADY_CREATED}" ]; then
gh pr --repo "${GITHUB_REPOSITORY}" create --title "CI run for PR #${PR_NUMBER}" \
--body-file "body.md" \
@@ -87,6 +99,10 @@ jobs:
cleanup:
# Close PRs and delete branchs if the original PR is closed.
permissions:
contents: write # for `--delete-branch` flag in `gh pr close`
pull-requests: write # for `gh pr close`
if: |
github.event.action == 'closed' &&
github.event.pull_request.head.repo.full_name != github.repository
@@ -94,8 +110,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- run: |
CLOSED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${HEAD} --json 'closed' --jq '.[].closed')"
- name: Close PR and delete `ci-run/pr-${{ env.PR_NUMBER }}` branch
run: |
CLOSED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${BRANCH} --json 'closed' --jq '.[].closed')"
if [ "${CLOSED}" == "false" ]; then
gh pr --repo "${GITHUB_REPOSITORY}" close "${BRANCH}" --delete-branch
fi

View File

@@ -834,7 +834,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.17.5
VM_BUILDER_VERSION: v0.17.10
steps:
- name: Checkout

View File

@@ -124,8 +124,21 @@ COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y ninja-build python3-dev libncurses5 binutils clang
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.8.tar.gz -O plv8.tar.gz && \
echo "92b10c7db39afdae97ff748c9ec54713826af222c459084ad002571b79eb3f49 plv8.tar.gz" | sha256sum --check && \
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
export PLV8_VERSION=3.1.5 \
export PLV8_CHECKSUM=1e108d5df639e4c189e1c5bdfa2432a521c126ca89e7e5a969d46899ca7bf106 \
;; \
"v16") \
export PLV8_VERSION=3.1.8 \
export PLV8_CHECKSUM=92b10c7db39afdae97ff748c9ec54713826af222c459084ad002571b79eb3f49 \
;; \
*) \
echo "Export the valid PG_VERSION variable" && exit 1 \
;; \
esac && \
wget https://github.com/plv8/plv8/archive/refs/tags/v${PLV8_VERSION}.tar.gz -O plv8.tar.gz && \
echo "${PLV8_CHECKSUM} plv8.tar.gz" | sha256sum --check && \
mkdir plv8-src && cd plv8-src && tar xvzf ../plv8.tar.gz --strip-components=1 -C . && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -416,7 +429,7 @@ RUN case "${PG_VERSION}" in \
;; \
"v16") \
export PG_HINT_PLAN_VERSION=16_1_6_0 \
export PG_HINT_PLAN_CHECKSUM=ce6a8040c78012000f5da7240caf6a971401412f41d33f930f09291e6c304b99 \
export PG_HINT_PLAN_CHECKSUM=fc85a9212e7d2819d4ae4ac75817481101833c3cfa9f0fe1f980984e12347d00 \
;; \
*) \
echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \

View File

@@ -630,25 +630,27 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip_all)]
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
) -> Result<std::process::Child> {
pub fn start_postgres(&self, pspec: &ParsedSpec) -> Result<std::process::Child> {
let pgdata_path = Path::new(&self.pgdata);
let spec = &pspec.spec;
// Run postgres as a child process.
let mut pg = maybe_cgexec(&self.pgbin)
.args(["-D", &self.pgdata])
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
})
.spawn()
.expect("cannot start postgres process");
let mut pg_command = maybe_cgexec(&self.pgbin);
pg_command.args(["-D", &self.pgdata]);
if let Some(storage_auth_token) = &spec.storage_auth_token {
pg_command.env("NEON_AUTH_TOKEN", storage_auth_token);
}
if let Some(env_vars) = &spec.env_vars {
pg_command.envs(
env_vars
.iter()
.map(|(k, v)| (k, v.clone().unwrap_or(String::new()))),
);
}
let mut pg = pg_command.spawn().expect("cannot start postgres process");
wait_for_postgres(&mut pg, pgdata_path)?;
Ok(pg)
}
@@ -797,7 +799,7 @@ impl ComputeNode {
self.prepare_pgdata(&compute_state, extension_server_port)?;
let start_time = Utc::now();
let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
let pg = self.start_postgres(pspec)?;
let config_time = Utc::now();
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {

View File

@@ -500,6 +500,7 @@ impl Endpoint {
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
remote_extensions: None,
env_vars: None,
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;

View File

@@ -58,7 +58,6 @@ pub struct ComputeSpec {
pub pageserver_connstring: Option<String>,
#[serde(default)]
pub safekeeper_connstrings: Vec<String>,
#[serde(default)]
pub mode: ComputeMode,
@@ -68,6 +67,9 @@ pub struct ComputeSpec {
// information about available remote extensions
pub remote_extensions: Option<RemoteExtSpec>,
// Environment variables to set on the compute
pub env_vars: Option<HashMap<String, Option<String>>>,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]

View File

@@ -243,5 +243,9 @@
"public_extensions": [
"postgis"
]
}
},
"env_vars": {
"OPENAI_API_KEY" : "i hope heikki has a good day today",
"MY_NULL_ENV_VAR" : null
}
}

View File

@@ -3,9 +3,9 @@
//!
use chrono::{DateTime, Utc};
use rand::Rng;
use serde::Serialize;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")]
pub enum EventType {
#[serde(rename = "absolute")]
@@ -27,7 +27,8 @@ impl EventType {
}
pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
// these can most likely be thought of as Range or RangeFull
// these can most likely be thought of as Range or RangeFull, at least pageserver creates
// incremental ranges where the stop and next start are equal.
use EventType::*;
match self {
Incremental {
@@ -41,15 +42,25 @@ impl EventType {
pub fn is_incremental(&self) -> bool {
matches!(self, EventType::Incremental { .. })
}
/// Returns the absolute time, or for incremental ranges, the stop time.
pub fn recorded_at(&self) -> &DateTime<Utc> {
use EventType::*;
match self {
Absolute { time } => time,
Incremental { stop_time, .. } => stop_time,
}
}
}
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Event<Extra> {
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Event<Extra, Metric> {
#[serde(flatten)]
#[serde(rename = "type")]
pub kind: EventType,
pub metric: &'static str,
pub metric: Metric,
pub idempotency_key: String,
pub value: u64,
@@ -58,12 +69,38 @@ pub struct Event<Extra> {
}
pub fn idempotency_key(node_id: &str) -> String {
format!(
"{}-{}-{:04}",
Utc::now(),
node_id,
rand::thread_rng().gen_range(0..=9999)
)
IdempotencyKey::generate(node_id).to_string()
}
/// Downstream users will use these to detect upload retries.
pub struct IdempotencyKey<'a> {
now: chrono::DateTime<Utc>,
node_id: &'a str,
nonce: u16,
}
impl std::fmt::Display for IdempotencyKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{}-{:04}", self.now, self.node_id, self.nonce)
}
}
impl<'a> IdempotencyKey<'a> {
pub fn generate(node_id: &'a str) -> Self {
IdempotencyKey {
now: Utc::now(),
node_id,
nonce: rand::thread_rng().gen_range(0..=9999),
}
}
pub fn for_tests(now: DateTime<Utc>, node_id: &'a str, nonce: u16) -> Self {
IdempotencyKey {
now,
node_id,
nonce,
}
}
}
pub const CHUNK_SIZE: usize = 1000;

View File

@@ -137,9 +137,12 @@ pub const XLOG_HEAP_INSERT: u8 = 0x00;
pub const XLOG_HEAP_DELETE: u8 = 0x10;
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
pub const XLOG_HEAP_LOCK: u8 = 0x60;
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60;
pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01;
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;

View File

@@ -315,12 +315,8 @@ impl CgroupWatcher {
where
E: Stream<Item = Sequenced<u64>>,
{
// There are several actions might do when receiving a `memory.high`,
// such as freezing the cgroup, or increasing its `memory.high`. We don't
// want to do these things too often (because postgres needs to run, and
// we only have so much memory). These timers serve as rate limits for this.
let mut wait_to_freeze = pin!(tokio::time::sleep(Duration::ZERO));
let mut wait_to_increase_memory_high = pin!(tokio::time::sleep(Duration::ZERO));
let mut last_memory_high_increase_at: Option<Instant> = None;
let mut events = pin!(events);
// Are we waiting to be upscaled? Could be true if we request upscale due
@@ -332,6 +328,8 @@ impl CgroupWatcher {
upscale = upscales.recv() => {
let Sequenced { seqnum, data } = upscale
.context("failed to listen on upscale notification channel")?;
waiting_on_upscale = false;
last_memory_high_increase_at = None;
self.last_upscale_seqnum.store(seqnum, Ordering::Release);
info!(cpu = data.cpu, mem_bytes = data.mem, "received upscale");
}
@@ -396,12 +394,17 @@ impl CgroupWatcher {
.send(())
.await
.context("failed to request upscale")?;
waiting_on_upscale = true;
continue;
}
// Shoot, we can't freeze or and we're still waiting on upscale,
// increase memory.high to reduce throttling
if wait_to_increase_memory_high.is_elapsed() {
let can_increase_memory_high = match last_memory_high_increase_at {
None => true,
Some(t) => t.elapsed() > self.config.memory_high_increase_every,
};
if can_increase_memory_high {
info!(
"received memory.high event, \
but too soon to refreeze and already requested upscale \
@@ -437,12 +440,11 @@ impl CgroupWatcher {
);
self.set_high_bytes(new_high)
.context("failed to set memory.high")?;
wait_to_increase_memory_high
.as_mut()
.reset(Instant::now() + self.config.memory_high_increase_every)
last_memory_high_increase_at = Some(Instant::now());
continue;
}
// we can't do anything
info!("received memory.high event, but can't do anything");
}
};
}
@@ -559,14 +561,7 @@ impl CgroupWatcher {
/// Setting these values also affects the thresholds for receiving usage alerts.
#[derive(Debug)]
pub struct MemoryLimits {
high: u64,
max: u64,
}
impl MemoryLimits {
pub fn new(high: u64, max: u64) -> Self {
Self { max, high }
}
pub high: u64,
}
// Methods for manipulating the actual cgroup
@@ -643,12 +638,7 @@ impl CgroupWatcher {
/// Set cgroup memory.high and memory.max.
pub fn set_limits(&self, limits: &MemoryLimits) -> anyhow::Result<()> {
info!(
limits.high,
limits.max,
path = self.path(),
"writing new memory limits",
);
info!(limits.high, path = self.path(), "writing new memory limits",);
self.memory()
.context("failed to get memory subsystem while setting memory limits")?
.set_mem(cgroups_rs::memory::SetMemory {
@@ -657,7 +647,7 @@ impl CgroupWatcher {
high: Some(MaxValue::Value(
u64::min(limits.high, i64::MAX as u64) as i64
)),
max: Some(MaxValue::Value(u64::min(limits.max, i64::MAX as u64) as i64)),
max: None,
})
.context("failed to set memory limits")
}
@@ -665,7 +655,7 @@ impl CgroupWatcher {
/// Given some amount of available memory, set the desired cgroup memory limits
pub fn set_memory_limits(&mut self, available_memory: u64) -> anyhow::Result<()> {
let new_high = self.config.calculate_memory_high_value(available_memory);
let limits = MemoryLimits::new(new_high, available_memory);
let limits = MemoryLimits { high: new_high };
info!(
path = self.path(),
memory = ?limits,

View File

@@ -257,12 +257,11 @@ impl Runner {
new_cgroup_mem_high = cgroup.config.calculate_memory_high_value(available_memory);
}
let limits = MemoryLimits::new(
let limits = MemoryLimits {
// new_cgroup_mem_high is initialized to 0 but it is guarancontextd to not be here
// since it is properly initialized in the previous cgroup if let block
new_cgroup_mem_high,
available_memory,
);
high: new_cgroup_mem_high,
};
cgroup
.set_limits(&limits)
.context("failed to set cgroup memory limits")?;
@@ -328,7 +327,9 @@ impl Runner {
name = cgroup.path(),
"updating cgroup memory.high",
);
let limits = MemoryLimits::new(new_cgroup_mem_high, available_memory);
let limits = MemoryLimits {
high: new_cgroup_mem_high,
};
cgroup
.set_limits(&limits)
.context("failed to set file cache size")?;

View File

@@ -80,11 +80,11 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tempfile.workspace = true
[dev-dependencies]
criterion.workspace = true
hex-literal.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
[[bench]]

View File

@@ -518,6 +518,9 @@ fn start_pageserver(
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
@@ -544,6 +547,7 @@ fn start_pageserver(
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
local_disk_storage,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))

View File

@@ -64,7 +64,7 @@ pub mod defaults {
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "0s";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";

View File

@@ -1,188 +1,54 @@
//!
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
//! Cache metrics to send only the updated ones.
//!
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use anyhow;
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
use reqwest::Url;
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tracing::*;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::id::NodeId;
mod metrics;
use metrics::MetricsKey;
mod disk_cache;
mod upload;
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
#[serde_as]
#[derive(Serialize, Debug, Clone, Copy)]
struct Ids {
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
timeline_id: Option<TimelineId>,
}
/// Basically a key-value pair, but usually in a Vec except for [`Cache`].
///
/// This is as opposed to `consumption_metrics::Event` which is the externally communicated form.
/// Difference is basically the missing idempotency key, which lives only for the duration of
/// upload attempts.
type RawMetric = (MetricsKey, (EventType, u64));
/// Key that uniquely identifies the object, this metric describes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MetricsKey {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
metric: &'static str,
}
impl MetricsKey {
const fn absolute_values(self) -> AbsoluteValueFactory {
AbsoluteValueFactory(self)
}
const fn incremental_values(self) -> IncrementalValueFactory {
IncrementalValueFactory(self)
}
}
/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);
impl AbsoluteValueFactory {
fn at(self, time: DateTime<Utc>, val: u64) -> (MetricsKey, (EventType, u64)) {
let key = self.0;
(key, (EventType::Absolute { time }, val))
}
}
/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);
impl IncrementalValueFactory {
#[allow(clippy::wrong_self_convention)]
fn from_previous_up_to(
self,
prev_end: DateTime<Utc>,
up_to: DateTime<Utc>,
val: u64,
) -> (MetricsKey, (EventType, u64)) {
let key = self.0;
// cannot assert prev_end < up_to because these are realtime clock based
(
key,
(
EventType::Incremental {
start_time: prev_end,
stop_time: up_to,
},
val,
),
)
}
fn key(&self) -> &MetricsKey {
&self.0
}
}
// the static part of a MetricsKey
impl MetricsKey {
/// Absolute value of [`Timeline::get_last_record_lsn`].
///
/// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size",
}
.absolute_values()
}
/// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
/// previously sent, starting from the previously sent incremental time range ending at the
/// latest absolute measurement.
const fn written_size_delta(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> IncrementalValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
// the name here is correctly about data not size, because that is what is wanted by
// downstream pipeline
metric: "written_data_bytes_delta",
}
.incremental_values()
}
/// Exact [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
const fn timeline_logical_size(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "timeline_logical_size",
}
.absolute_values()
}
/// [`Tenant::remote_size`]
///
/// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "remote_storage_size",
}
.absolute_values()
}
/// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
///
/// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "resident_size",
}
.absolute_values()
}
/// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
///
/// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "synthetic_storage_size",
}
.absolute_values()
}
}
/// Caches the [`RawMetric`]s
///
/// In practice, during startup, last sent values are stored here to be used in calculating new
/// ones. After successful uploading, the cached values are updated to cache. This used to be used
/// for deduplication, but that is no longer needed.
type Cache = HashMap<MetricsKey, (EventType, u64)>;
/// Main thread that serves metrics collection
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_interval: Duration,
cached_metric_collection_interval: Duration,
_cached_metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration,
node_id: NodeId,
local_disk_storage: PathBuf,
ctx: RequestContext,
) -> anyhow::Result<()> {
let mut ticker = tokio::time::interval(metric_collection_interval);
info!("starting collect_metrics");
if _cached_metric_collection_interval != Duration::ZERO {
tracing::warn!(
"cached_metric_collection_interval is no longer used, please set it to zero."
)
}
// spin up background worker that caclulates tenant sizes
let worker_ctx =
@@ -202,543 +68,216 @@ pub async fn collect_metrics(
},
);
let path: Arc<PathBuf> = Arc::new(local_disk_storage);
let cancel = task_mgr::shutdown_token();
let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
let mut cached_metrics = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
ret = restore_and_reschedule => ret,
};
// define client here to reuse it for all requests
let client = reqwest::ClientBuilder::new()
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
.build()
.expect("Failed to create http client with timeout");
let mut cached_metrics = HashMap::new();
let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("collect_metrics received cancellation request");
return Ok(());
},
tick_at = ticker.tick() => {
// send cached metrics every cached_metric_collection_interval
let send_cached = prev_iteration_time.elapsed() >= cached_metric_collection_interval;
if send_cached {
prev_iteration_time = std::time::Instant::now();
}
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx, send_cached).await;
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
);
}
}
}
}
/// One iteration of metrics collection
///
/// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`.
/// Cache metrics to avoid sending the same metrics multiple times.
///
/// This function handles all errors internally
/// and doesn't break iteration if just one tenant fails.
///
/// TODO
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
async fn collect_metrics_iteration(
client: &reqwest::Client,
cached_metrics: &mut HashMap<MetricsKey, (EventType, u64)>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
send_cached: bool,
) {
let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
trace!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
);
// get list of tenants
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(err) => {
error!("failed to list tenants: {:?}", err);
return;
}
};
// iterate through list of Active tenants and collect metrics
for (tenant_id, tenant_state) in tenants {
if tenant_state != TenantState::Active {
continue;
}
let tenant = match mgr::get_tenant(tenant_id, true).await {
Ok(tenant) => tenant,
Err(err) => {
// It is possible that tenant was deleted between
// `list_tenants` and `get_tenant`, so just warn about it.
warn!("failed to get tenant {tenant_id:?}: {err:?}");
continue;
}
};
let mut tenant_resident_size = 0;
// iterate through list of timelines in tenant
for timeline in tenant.list_timelines() {
// collect per-timeline metrics only for active timelines
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
Ok(Some(snap)) => {
snap.to_metrics(
tenant_id,
timeline_id,
Utc::now(),
&mut current_metrics,
cached_metrics,
);
}
Ok(None) => {}
Err(e) => {
error!(
"failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
timeline.timeline_id
);
continue;
}
}
tenant_resident_size += timeline.resident_physical_size();
}
current_metrics
.push(MetricsKey::remote_storage_size(tenant_id).at(Utc::now(), tenant.remote_size()));
current_metrics
.push(MetricsKey::resident_size(tenant_id).at(Utc::now(), tenant_resident_size));
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
let synthetic_size = tenant.cached_synthetic_size();
if synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
current_metrics
.push(MetricsKey::synthetic_size(tenant_id).at(Utc::now(), synthetic_size));
}
}
// Filter metrics, unless we want to send all metrics, including cached ones.
// See: https://github.com/neondatabase/neon/issues/3485
if !send_cached {
current_metrics.retain(|(curr_key, (kind, curr_val))| {
if kind.is_incremental() {
// incremental values (currently only written_size_delta) should not get any cache
// deduplication because they will be used by upstream for "is still alive."
true
} else {
match cached_metrics.get(curr_key) {
Some((_, val)) => val != curr_val,
None => true,
}
}
});
}
if current_metrics.is_empty() {
trace!("no new metrics to send");
return;
}
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
let chunks = current_metrics.chunks(CHUNK_SIZE);
let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
let node_id = node_id.to_string();
for chunk in chunks {
chunk_to_send.clear();
// reminder: ticker is ready immediatedly
let mut ticker = tokio::time::interval(metric_collection_interval);
// enrich metrics with type,timestamp and idempotency key before sending
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
metric: curr_key.metric,
idempotency_key: idempotency_key(&node_id),
value: *curr_val,
extra: Ids {
tenant_id: curr_key.tenant_id,
timeline_id: curr_key.timeline_id,
},
}));
loop {
let tick_at = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
const MAX_RETRIES: u32 = 3;
// these are point in time, with variable "now"
let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
for attempt in 0..MAX_RETRIES {
let res = client
.post(metric_collection_endpoint.clone())
.json(&EventChunk {
events: (&chunk_to_send).into(),
})
.send()
.await;
if metrics.is_empty() {
continue;
}
match res {
Ok(res) => {
if res.status().is_success() {
// update cached metrics after they were sent successfully
for (curr_key, curr_val) in chunk.iter() {
cached_metrics.insert(curr_key.clone(), *curr_val);
}
} else {
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk_to_send
.iter()
.filter(|metric| metric.value > (1u64 << 40))
{
// Report if the metric value is suspiciously large
error!("potentially abnormal metric value: {:?}", metric);
}
}
break;
let metrics = Arc::new(metrics);
// why not race cancellation here? because we are one of the last tasks, and if we are
// already here, better to try to flush the new values.
let flush = async {
match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
Ok(()) => {
tracing::debug!("flushed metrics to disk");
}
Err(err) if err.is_timeout() => {
error!(attempt, "timeout sending metrics, retrying immediately");
continue;
}
Err(err) => {
error!(attempt, ?err, "failed to send metrics");
break;
Err(e) => {
// idea here is that if someone creates a directory as our path, then they
// might notice it from the logs before shutdown and remove it
tracing::error!("failed to persist metrics to {path:?}: {e:#}");
}
}
};
let upload = async {
let res = upload::upload_metrics(
&client,
metric_collection_endpoint,
&cancel,
&node_id,
&metrics,
&mut cached_metrics,
)
.await;
if let Err(e) = res {
// serialization error which should never happen
tracing::error!("failed to upload due to {e:#}");
}
};
// let these run concurrently
let (_, _) = tokio::join!(flush, upload);
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
);
}
}
/// Called on the first iteration in an attempt to join the metric uploading schedule from previous
/// pageserver session. Pageserver is supposed to upload at intervals regardless of restarts.
///
/// Cancellation safe.
async fn restore_and_reschedule(
path: &Arc<PathBuf>,
metric_collection_interval: Duration,
) -> Cache {
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
Ok(found_some) => {
// there is no min needed because we write these sequentially in
// collect_all_metrics
let earlier_metric_at = found_some
.iter()
.map(|(_, (et, _))| et.recorded_at())
.copied()
.next();
let cached = found_some.into_iter().collect::<Cache>();
(cached, earlier_metric_at)
}
Err(e) => {
use std::io::{Error, ErrorKind};
let root = e.root_cause();
let maybe_ioerr = root.downcast_ref::<Error>();
let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
if !is_not_found {
tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
}
(HashMap::new(), None)
}
};
if let Some(earlier_metric_at) = earlier_metric_at {
let earlier_metric_at: SystemTime = earlier_metric_at.into();
let error = reschedule(earlier_metric_at, metric_collection_interval).await;
if let Some(error) = error {
if error.as_secs() >= 60 {
tracing::info!(
error_ms = error.as_millis(),
"startup scheduling error due to restart"
)
}
}
}
cached
}
/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
loaded_at: (Lsn, SystemTime),
last_record_lsn: Lsn,
current_exact_logical_size: Option<u64>,
}
async fn reschedule(
earlier_metric_at: SystemTime,
metric_collection_interval: Duration,
) -> Option<Duration> {
let now = SystemTime::now();
match now.duration_since(earlier_metric_at) {
Ok(from_last_send) if from_last_send < metric_collection_interval => {
let sleep_for = metric_collection_interval - from_last_send;
impl TimelineSnapshot {
/// Collect the metrics from an actual timeline.
///
/// Fails currently only when [`Timeline::get_current_logical_size`] fails.
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
fn collect(
t: &Arc<crate::tenant::Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Option<Self>> {
use anyhow::Context;
let deadline = std::time::Instant::now() + sleep_for;
if !t.is_active() {
// no collection for broken or stopping needed, we will still keep the cached values
// though at the caller.
Ok(None)
} else {
let loaded_at = t.loaded_at;
let last_record_lsn = t.get_last_record_lsn();
tokio::time::sleep_until(deadline.into()).await;
let current_exact_logical_size = {
let span = info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
let res = span
.in_scope(|| t.get_current_logical_size(ctx))
.context("get_current_logical_size");
match res? {
// Only send timeline logical size when it is fully calculated.
(size, is_exact) if is_exact => Some(size),
(_, _) => None,
}
};
let now = std::time::Instant::now();
Ok(Some(TimelineSnapshot {
loaded_at,
last_record_lsn,
current_exact_logical_size,
}))
}
}
/// Produce the timeline consumption metrics into the `metrics` argument.
fn to_metrics(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
metrics: &mut Vec<(MetricsKey, (EventType, u64))>,
cache: &HashMap<MetricsKey, (EventType, u64)>,
) {
let timeline_written_size = u64::from(self.last_record_lsn);
let (key, written_size_now) =
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
// last_record_lsn can only go up, right now at least, TODO: #2592 or related
// features might change this.
let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
// use this when available, because in a stream of incremental values, it will be
// accurate where as when last_record_lsn stops moving, we will only cache the last
// one of those.
let last_stop_time = cache
.get(written_size_delta_key.key())
.map(|(until, _val)| {
until
.incremental_timerange()
.expect("never create EventType::Absolute for written_size_delta")
.end
});
// by default, use the last sent written_size as the basis for
// calculating the delta. if we don't yet have one, use the load time value.
let prev = cache
.get(&key)
.map(|(prev_at, prev)| {
// use the prev time from our last incremental update, or default to latest
// absolute update on the first round.
let prev_at = prev_at
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let prev_at = last_stop_time.unwrap_or(prev_at);
(*prev_at, *prev)
})
.unwrap_or_else(|| {
// if we don't have a previous point of comparison, compare to the load time
// lsn.
let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
(DateTime::from(*loaded_at), disk_consistent_lsn.0)
});
// written_size_bytes_delta
metrics.extend(
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let up_to = written_size_now
.0
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let key_value = written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta);
Some(key_value)
// executor threads might be busy, add extra measurements
Some(if now < deadline {
deadline - now
} else {
None
},
);
// written_size
metrics.push((key, written_size_now));
if let Some(size) = self.current_exact_logical_size {
metrics.push(MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, size));
now - deadline
})
}
Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
Err(_) => {
tracing::warn!(
?now,
?earlier_metric_at,
"oldest recorded metric is in future; first values will come out with inconsistent timestamps"
);
earlier_metric_at.duration_since(now).ok()
}
}
}
/// Caclculate synthetic size for each active tenant
pub async fn calculate_synthetic_size_worker(
async fn calculate_synthetic_size_worker(
synthetic_size_calculation_interval: Duration,
ctx: &RequestContext,
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");
// reminder: ticker is ready immediatedly
let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
return Ok(());
},
tick_at = ticker.tick() => {
let tick_at = tokio::select! {
_ = task_mgr::shutdown_watcher() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(e) => {
warn!("cannot get tenant list: {e:#}");
continue;
}
};
// iterate through list of Active tenants and collect metrics
for (tenant_id, tenant_state) in tenants {
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(e) => {
warn!("cannot get tenant list: {e:#}");
continue;
}
};
if tenant_state != TenantState::Active {
continue;
}
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
{
if let Err(e) = tenant.calculate_synthetic_size(
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
ctx).await {
error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
}
}
for (tenant_id, tenant_state) in tenants {
if tenant_state != TenantState::Active {
continue;
}
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
}
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
synthetic_size_calculation_interval,
"consumption_metrics_synthetic_size_worker",
);
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::time::SystemTime;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::consumption_metrics::MetricsKey;
use super::TimelineSnapshot;
use chrono::{DateTime, Utc};
#[test]
fn startup_collected_timeline_metrics_before_advancing() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let mut metrics = Vec::new();
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
let now = DateTime::<Utc>::from(SystemTime::now());
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn startup_collected_timeline_metrics_second_round() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, before, init] = time_backwards();
let now = DateTime::<Utc>::from(now);
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, just_before, before, init] = time_backwards();
let now = DateTime::<Utc>::from(now);
let just_before = DateTime::<Utc>::from(just_before);
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache = HashMap::from([
// at t=before was the last time the last_record_lsn changed
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
// end time of this event is used for the next ones
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
before,
just_before,
0,
),
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
just_before,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
let mut times = [std::time::SystemTime::UNIX_EPOCH; N];
times[0] = std::time::SystemTime::now();
for behind in 1..N {
times[behind] = times[0] - std::time::Duration::from_secs(behind as u64);
}
times
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
synthetic_size_calculation_interval,
"consumption_metrics_synthetic_size_worker",
);
}
}

View File

@@ -0,0 +1,117 @@
use anyhow::Context;
use std::path::PathBuf;
use std::sync::Arc;
use super::RawMetric;
pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result<Vec<RawMetric>> {
// do not add context to each error, callsite will log with full path
let span = tracing::Span::current();
tokio::task::spawn_blocking(move || {
let _e = span.entered();
if let Some(parent) = path.parent() {
if let Err(e) = scan_and_delete_with_same_prefix(&path) {
tracing::info!("failed to cleanup temporary files in {parent:?}: {e:#}");
}
}
let mut file = std::fs::File::open(&*path)?;
let reader = std::io::BufReader::new(&mut file);
anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
})
.await
.context("read metrics join error")
.and_then(|x| x)
}
fn scan_and_delete_with_same_prefix(path: &std::path::Path) -> std::io::Result<()> {
let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
let prefix = path.file_name().expect("caller checked").to_string_lossy();
for entry in it {
let entry = entry?;
if !entry.metadata()?.is_file() {
continue;
}
let file_name = entry.file_name();
if path.file_name().unwrap() == file_name {
// do not remove our actual file
continue;
}
let file_name = file_name.to_string_lossy();
if !file_name.starts_with(&*prefix) {
continue;
}
let path = entry.path();
if let Err(e) = std::fs::remove_file(&path) {
tracing::warn!("cleaning up old tempfile {file_name:?} failed: {e:#}");
} else {
tracing::info!("cleaned up old tempfile {file_name:?}");
}
}
Ok(())
}
pub(super) async fn flush_metrics_to_disk(
current_metrics: &Arc<Vec<RawMetric>>,
path: &Arc<PathBuf>,
) -> anyhow::Result<()> {
use std::io::Write;
anyhow::ensure!(path.parent().is_some(), "path must have parent: {path:?}");
anyhow::ensure!(
path.file_name().is_some(),
"path must have filename: {path:?}"
);
let span = tracing::Span::current();
tokio::task::spawn_blocking({
let current_metrics = current_metrics.clone();
let path = path.clone();
move || {
let _e = span.entered();
let parent = path.parent().expect("existence checked");
let file_name = path.file_name().expect("existence checked");
let mut tempfile = tempfile::Builder::new()
.prefix(file_name)
.suffix(".tmp")
.tempfile_in(parent)?;
tracing::debug!("using tempfile {:?}", tempfile.path());
// write out all of the raw metrics, to be read out later on restart as cached values
{
let mut writer = std::io::BufWriter::new(&mut tempfile);
serde_json::to_writer(&mut writer, &*current_metrics)
.context("serialize metrics")?;
writer
.into_inner()
.map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
}
tempfile.flush()?;
tempfile.as_file().sync_all()?;
fail::fail_point!("before-persist-last-metrics-collected");
drop(tempfile.persist(&*path).map_err(|e| e.error)?);
let f = std::fs::File::open(path.parent().unwrap())?;
f.sync_all()?;
anyhow::Ok(())
}
})
.await
.with_context(|| format!("write metrics to {path:?} join error"))
.and_then(|x| x.with_context(|| format!("write metrics to {path:?}")))
}

View File

@@ -0,0 +1,455 @@
use crate::context::RequestContext;
use anyhow::Context;
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use serde_with::serde_as;
use std::{sync::Arc, time::SystemTime};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use super::{Cache, RawMetric};
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of static str.
// Do not rename any of these without first consulting with data team and partner
// management.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
/// Timeline last_record_lsn, absolute
#[serde(rename = "written_size")]
WrittenSize,
/// Timeline last_record_lsn, incremental
#[serde(rename = "written_data_bytes_delta")]
WrittenSizeDelta,
/// Timeline logical size
#[serde(rename = "timeline_logical_size")]
LogicalSize,
/// Tenant remote size
#[serde(rename = "remote_storage_size")]
RemoteSize,
/// Tenant resident size
#[serde(rename = "resident_size")]
ResidentSize,
/// Tenant synthetic size
#[serde(rename = "synthetic_storage_size")]
SyntheticSize,
}
/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere.
#[serde_with::serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(crate) struct MetricsKey {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
pub(super) metric: Name,
}
impl MetricsKey {
const fn absolute_values(self) -> AbsoluteValueFactory {
AbsoluteValueFactory(self)
}
const fn incremental_values(self) -> IncrementalValueFactory {
IncrementalValueFactory(self)
}
}
/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);
impl AbsoluteValueFactory {
const fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
let key = self.0;
(key, (EventType::Absolute { time }, val))
}
fn key(&self) -> &MetricsKey {
&self.0
}
}
/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);
impl IncrementalValueFactory {
#[allow(clippy::wrong_self_convention)]
const fn from_until(
self,
prev_end: DateTime<Utc>,
up_to: DateTime<Utc>,
val: u64,
) -> RawMetric {
let key = self.0;
// cannot assert prev_end < up_to because these are realtime clock based
let when = EventType::Incremental {
start_time: prev_end,
stop_time: up_to,
};
(key, (when, val))
}
fn key(&self) -> &MetricsKey {
&self.0
}
}
// the static part of a MetricsKey
impl MetricsKey {
/// Absolute value of [`Timeline::get_last_record_lsn`].
///
/// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::WrittenSize,
}
.absolute_values()
}
/// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
/// previously sent, starting from the previously sent incremental time range ending at the
/// latest absolute measurement.
const fn written_size_delta(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> IncrementalValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::WrittenSizeDelta,
}
.incremental_values()
}
/// Exact [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
const fn timeline_logical_size(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::LogicalSize,
}
.absolute_values()
}
/// [`Tenant::remote_size`]
///
/// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: Name::RemoteSize,
}
.absolute_values()
}
/// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
///
/// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: Name::ResidentSize,
}
.absolute_values()
}
/// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
///
/// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
/// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: Name::SyntheticSize,
}
.absolute_values()
}
}
pub(super) async fn collect_all_metrics(
cached_metrics: &Cache,
ctx: &RequestContext,
) -> Vec<RawMetric> {
use pageserver_api::models::TenantState;
let started_at = std::time::Instant::now();
let tenants = match crate::tenant::mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(err) => {
tracing::error!("failed to list tenants: {:?}", err);
return vec![];
}
};
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
if state != TenantState::Active {
None
} else {
crate::tenant::mgr::get_tenant(id, true)
.await
.ok()
.map(|tenant| (id, tenant))
}
});
let res = collect(tenants, cached_metrics, ctx).await;
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
total = res.len(),
"collected metrics"
);
res
}
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
where
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
let mut current_metrics: Vec<RawMetric> = Vec::new();
let mut tenants = std::pin::pin!(tenants);
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
for timeline in tenant.list_timelines() {
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
Ok(Some(snap)) => {
snap.to_metrics(
tenant_id,
timeline_id,
Utc::now(),
&mut current_metrics,
cache,
);
}
Ok(None) => {}
Err(e) => {
tracing::error!(
"failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
timeline.timeline_id
);
continue;
}
}
tenant_resident_size += timeline.resident_physical_size();
}
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
}
current_metrics
}
/// In-between abstraction to allow testing metrics without actual Tenants.
struct TenantSnapshot {
resident_size: u64,
remote_size: u64,
synthetic_size: u64,
}
impl TenantSnapshot {
/// Collect tenant status to have metrics created out of it.
///
/// `resident_size` is calculated of the timelines we had access to for other metrics, so we
/// cannot just list timelines here.
fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
TenantSnapshot {
resident_size,
remote_size: t.remote_size(),
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
synthetic_size: t.cached_synthetic_size(),
}
}
fn to_metrics(
&self,
tenant_id: TenantId,
now: DateTime<Utc>,
cached: &Cache,
metrics: &mut Vec<RawMetric>,
) {
let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
let synthetic_size = {
let factory = MetricsKey::synthetic_size(tenant_id);
let mut synthetic_size = self.synthetic_size;
if synthetic_size == 0 {
if let Some((_, value)) = cached.get(factory.key()) {
// use the latest value from previous session
synthetic_size = *value;
}
}
if synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
Some(factory.at(now, synthetic_size))
} else {
None
}
};
metrics.extend(
[Some(remote_size), Some(resident_size), synthetic_size]
.into_iter()
.flatten(),
);
}
}
/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
loaded_at: (Lsn, SystemTime),
last_record_lsn: Lsn,
current_exact_logical_size: Option<u64>,
}
impl TimelineSnapshot {
/// Collect the metrics from an actual timeline.
///
/// Fails currently only when [`Timeline::get_current_logical_size`] fails.
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
fn collect(
t: &Arc<crate::tenant::Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Option<Self>> {
if !t.is_active() {
// no collection for broken or stopping needed, we will still keep the cached values
// though at the caller.
Ok(None)
} else {
let loaded_at = t.loaded_at;
let last_record_lsn = t.get_last_record_lsn();
let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
let res = span
.in_scope(|| t.get_current_logical_size(ctx))
.context("get_current_logical_size");
match res? {
// Only send timeline logical size when it is fully calculated.
(size, is_exact) if is_exact => Some(size),
(_, _) => None,
}
};
Ok(Some(TimelineSnapshot {
loaded_at,
last_record_lsn,
current_exact_logical_size,
}))
}
}
/// Produce the timeline consumption metrics into the `metrics` argument.
fn to_metrics(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
metrics: &mut Vec<RawMetric>,
cache: &Cache,
) {
let timeline_written_size = u64::from(self.last_record_lsn);
let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
let last_stop_time = cache
.get(written_size_delta_key.key())
.map(|(until, _val)| {
until
.incremental_timerange()
.expect("never create EventType::Absolute for written_size_delta")
.end
});
let (key, written_size_now) =
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
// by default, use the last sent written_size as the basis for
// calculating the delta. if we don't yet have one, use the load time value.
let prev = cache
.get(&key)
.map(|(prev_at, prev)| {
// use the prev time from our last incremental update, or default to latest
// absolute update on the first round.
let prev_at = prev_at
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let prev_at = last_stop_time.unwrap_or(prev_at);
(*prev_at, *prev)
})
.unwrap_or_else(|| {
// if we don't have a previous point of comparison, compare to the load time
// lsn.
let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
(DateTime::from(*loaded_at), disk_consistent_lsn.0)
});
let up_to = now;
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let key_value = written_size_delta_key.from_until(prev.0, up_to, delta);
// written_size_delta
metrics.push(key_value);
// written_size
metrics.push((key, written_size_now));
} else {
// the cached value was ahead of us, report zero until we've caught up
metrics.push(written_size_delta_key.from_until(prev.0, up_to, 0));
// the cached value was ahead of us, report the same until we've caught up
metrics.push((key, (written_size_now.0, prev.1)));
}
{
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
let current_or_previous = self
.current_exact_logical_size
.or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
if let Some(size) = current_or_previous {
metrics.push(factory.at(now, size));
}
}
}
}
#[cfg(test)]
mod tests;
#[cfg(test)]
pub(crate) use tests::metric_examples;

View File

@@ -0,0 +1,297 @@
use super::*;
use std::collections::HashMap;
use std::time::SystemTime;
use utils::lsn::Lsn;
#[test]
fn startup_collected_timeline_metrics_before_advancing() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let mut metrics = Vec::new();
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
let now = DateTime::<Utc>::from(SystemTime::now());
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn startup_collected_timeline_metrics_second_round() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, before, init] = time_backwards();
let now = DateTime::<Utc>::from(now);
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, just_before, before, init] = time_backwards();
let now = DateTime::<Utc>::from(now);
let just_before = DateTime::<Utc>::from(just_before);
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache = HashMap::from([
// at t=before was the last time the last_record_lsn changed
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
// end time of this event is used for the next ones
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, just_before, 0),
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(just_before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
// should never go backwards
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [later, now, at_restart] = time_backwards();
// FIXME: tests would be so much easier if we did not need to juggle back and forth
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
let now = DateTime::<Utc>::from(now);
let later = DateTime::<Utc>::from(later);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let way_before = DateTime::<Utc>::from(way_before);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
};
let mut cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
way_before,
before_restart,
// not taken into account, but the timestamps are important
999_999_999,
),
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(
before_restart,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
]
);
// now if we cache these metrics, and re-run while "still in recovery"
cache.extend(metrics.drain(..));
// "still in recovery", because our snapshot did not change
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
]
);
}
#[test]
fn post_restart_current_exact_logical_size_uses_cached() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, at_restart] = time_backwards();
let now = DateTime::<Utc>::from(now);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
};
let cache = HashMap::from([
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(before_restart, 100)
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
metrics.retain(|(key, _)| key.metric == Name::LogicalSize);
assert_eq!(
metrics,
&[MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 100)]
);
}
#[test]
fn post_restart_synthetic_size_uses_cached_if_available() {
let tenant_id = TenantId::generate();
let ts = TenantSnapshot {
resident_size: 1000,
remote_size: 1000,
// not yet calculated
synthetic_size: 0,
};
let now = SystemTime::now();
let before_restart = DateTime::<Utc>::from(now - std::time::Duration::from_secs(5 * 60));
let now = DateTime::<Utc>::from(now);
let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id).at(before_restart, 1000)]);
let mut metrics = Vec::new();
ts.to_metrics(tenant_id, now, &cached, &mut metrics);
assert_eq!(
metrics,
&[
MetricsKey::remote_storage_size(tenant_id).at(now, 1000),
MetricsKey::resident_size(tenant_id).at(now, 1000),
MetricsKey::synthetic_size(tenant_id).at(now, 1000),
]
);
}
#[test]
fn post_restart_synthetic_size_is_not_sent_when_not_cached() {
let tenant_id = TenantId::generate();
let ts = TenantSnapshot {
resident_size: 1000,
remote_size: 1000,
// not yet calculated
synthetic_size: 0,
};
let now = SystemTime::now();
let now = DateTime::<Utc>::from(now);
let cached = HashMap::new();
let mut metrics = Vec::new();
ts.to_metrics(tenant_id, now, &cached, &mut metrics);
assert_eq!(
metrics,
&[
MetricsKey::remote_storage_size(tenant_id).at(now, 1000),
MetricsKey::resident_size(tenant_id).at(now, 1000),
// no synthetic size here
]
);
}
fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
let mut times = [std::time::SystemTime::UNIX_EPOCH; N];
times[0] = std::time::SystemTime::now();
for behind in 1..N {
times[behind] = times[0] - std::time::Duration::from_secs(behind as u64);
}
times
}
pub(crate) const fn metric_examples(
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
before: DateTime<Utc>,
) -> [RawMetric; 6] {
[
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_until(before, now, 0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
MetricsKey::resident_size(tenant_id).at(now, 0),
MetricsKey::synthetic_size(tenant_id).at(now, 1),
]
}

View File

@@ -0,0 +1,443 @@
use consumption_metrics::{Event, EventChunk, IdempotencyKey, CHUNK_SIZE};
use serde_with::serde_as;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use super::{metrics::Name, Cache, MetricsKey, RawMetric};
use utils::id::{TenantId, TimelineId};
/// How the metrics from pageserver are identified.
#[serde_with::serde_as]
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, Copy, PartialEq)]
struct Ids {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
}
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
cancel: &CancellationToken,
node_id: &str,
metrics: &[RawMetric],
cached_metrics: &mut Cache,
) -> anyhow::Result<()> {
let mut uploaded = 0;
let mut failed = 0;
let started_at = std::time::Instant::now();
let mut iter = serialize_in_chunks(CHUNK_SIZE, metrics, node_id);
while let Some(res) = iter.next() {
let (chunk, body) = res?;
let event_bytes = body.len();
let is_last = iter.len() == 0;
let res = upload(client, metric_collection_endpoint, body, cancel, is_last)
.instrument(tracing::info_span!(
"upload",
%event_bytes,
uploaded,
total = metrics.len(),
))
.await;
match res {
Ok(()) => {
for (curr_key, curr_val) in chunk {
cached_metrics.insert(*curr_key, *curr_val);
}
uploaded += chunk.len();
}
Err(_) => {
// failure(s) have already been logged
//
// however this is an inconsistency: if we crash here, we will start with the
// values as uploaded. in practice, the rejections no longer happen.
failed += chunk.len();
}
}
}
let elapsed = started_at.elapsed();
tracing::info!(
uploaded,
failed,
elapsed_ms = elapsed.as_millis(),
"done sending metrics"
);
Ok(())
}
// The return type is quite ugly, but we gain testability in isolation
fn serialize_in_chunks<'a, F>(
chunk_size: usize,
input: &'a [RawMetric],
factory: F,
) -> impl ExactSizeIterator<Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>> + 'a
where
F: KeyGen<'a> + 'a,
{
use bytes::BufMut;
struct Iter<'a, F> {
inner: std::slice::Chunks<'a, RawMetric>,
chunk_size: usize,
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
buffer: bytes::BytesMut,
// chunk amount of events are reused to produce the serialized document
scratch: Vec<Event<Ids, Name>>,
factory: F,
}
impl<'a, F: KeyGen<'a>> Iterator for Iter<'a, F> {
type Item = Result<(&'a [RawMetric], bytes::Bytes), serde_json::Error>;
fn next(&mut self) -> Option<Self::Item> {
let chunk = self.inner.next()?;
if self.scratch.is_empty() {
// first round: create events with N strings
self.scratch.extend(
chunk
.iter()
.map(|raw_metric| raw_metric.as_event(&self.factory.generate())),
);
} else {
// next rounds: update_in_place to reuse allocations
assert_eq!(self.scratch.len(), self.chunk_size);
self.scratch
.iter_mut()
.zip(chunk.iter())
.for_each(|(slot, raw_metric)| {
raw_metric.update_in_place(slot, &self.factory.generate())
});
}
let res = serde_json::to_writer(
(&mut self.buffer).writer(),
&EventChunk {
events: (&self.scratch[..chunk.len()]).into(),
},
);
match res {
Ok(()) => Some(Ok((chunk, self.buffer.split().freeze()))),
Err(e) => Some(Err(e)),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl<'a, F: KeyGen<'a>> ExactSizeIterator for Iter<'a, F> {}
let buffer = bytes::BytesMut::new();
let inner = input.chunks(chunk_size);
let scratch = Vec::new();
Iter {
inner,
chunk_size,
buffer,
scratch,
factory,
}
}
trait RawMetricExt {
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name>;
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>);
}
impl RawMetricExt for RawMetric {
fn as_event(&self, key: &IdempotencyKey<'_>) -> Event<Ids, Name> {
let MetricsKey {
metric,
tenant_id,
timeline_id,
} = self.0;
let (kind, value) = self.1;
Event {
kind,
metric,
idempotency_key: key.to_string(),
value,
extra: Ids {
tenant_id,
timeline_id,
},
}
}
fn update_in_place(&self, event: &mut Event<Ids, Name>, key: &IdempotencyKey<'_>) {
use std::fmt::Write;
let MetricsKey {
metric,
tenant_id,
timeline_id,
} = self.0;
let (kind, value) = self.1;
*event = Event {
kind,
metric,
idempotency_key: {
event.idempotency_key.clear();
write!(event.idempotency_key, "{key}").unwrap();
std::mem::take(&mut event.idempotency_key)
},
value,
extra: Ids {
tenant_id,
timeline_id,
},
};
}
}
trait KeyGen<'a>: Copy {
fn generate(&self) -> IdempotencyKey<'a>;
}
impl<'a> KeyGen<'a> for &'a str {
fn generate(&self) -> IdempotencyKey<'a> {
IdempotencyKey::generate(self)
}
}
enum UploadError {
Rejected(reqwest::StatusCode),
Reqwest(reqwest::Error),
Cancelled,
}
impl std::fmt::Debug for UploadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// use same impl because backoff::retry will log this using both
std::fmt::Display::fmt(self, f)
}
}
impl std::fmt::Display for UploadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use UploadError::*;
match self {
Rejected(code) => write!(f, "server rejected the metrics with {code}"),
Reqwest(e) => write!(f, "request failed: {e}"),
Cancelled => write!(f, "cancelled"),
}
}
}
impl UploadError {
fn is_reject(&self) -> bool {
matches!(self, UploadError::Rejected(_))
}
}
// this is consumed by the test verifiers
static LAST_IN_BATCH: reqwest::header::HeaderName =
reqwest::header::HeaderName::from_static("pageserver-metrics-last-upload-in-batch");
async fn upload(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
body: bytes::Bytes,
cancel: &CancellationToken,
is_last: bool,
) -> Result<(), UploadError> {
let warn_after = 3;
let max_attempts = 10;
let res = utils::backoff::retry(
move || {
let body = body.clone();
async move {
let res = client
.post(metric_collection_endpoint.clone())
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(
LAST_IN_BATCH.clone(),
if is_last { "true" } else { "false" },
)
.body(body)
.send()
.await;
let res = res.and_then(|res| res.error_for_status());
// 10 redirects are normally allowed, so we don't need worry about 3xx
match res {
Ok(_response) => Ok(()),
Err(e) => {
let status = e.status().filter(|s| s.is_client_error());
if let Some(status) = status {
// rejection used to be a thing when the server could reject a
// whole batch of metrics if one metric was bad.
Err(UploadError::Rejected(status))
} else {
Err(UploadError::Reqwest(e))
}
}
}
}
},
UploadError::is_reject,
warn_after,
max_attempts,
"upload consumption_metrics",
utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled),
)
.await;
match &res {
Ok(_) => {}
Err(e) if e.is_reject() => {
// permanent errors currently do not get logged by backoff::retry
// display alternate has no effect, but keeping it here for easier pattern matching.
tracing::error!("failed to upload metrics: {e:#}");
}
Err(_) => {
// these have been logged already
}
}
res
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{DateTime, Utc};
use once_cell::sync::Lazy;
#[test]
fn chunked_serialization() {
let examples = metric_samples();
assert!(examples.len() > 1);
let factory = FixedGen::new(Utc::now(), "1", 42);
// need to use Event here because serde_json::Value uses default hashmap, not linked
// hashmap
#[derive(serde::Deserialize)]
struct EventChunk {
events: Vec<Event<Ids, Name>>,
}
let correct = serialize_in_chunks(examples.len(), &examples, factory)
.map(|res| res.unwrap().1)
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
.collect::<Vec<_>>();
for chunk_size in 1..examples.len() {
let actual = serialize_in_chunks(chunk_size, &examples, factory)
.map(|res| res.unwrap().1)
.flat_map(|body| serde_json::from_slice::<EventChunk>(&body).unwrap().events)
.collect::<Vec<_>>();
// if these are equal, it means that multi-chunking version works as well
assert_eq!(correct, actual);
}
}
#[derive(Clone, Copy)]
struct FixedGen<'a>(chrono::DateTime<chrono::Utc>, &'a str, u16);
impl<'a> FixedGen<'a> {
fn new(now: chrono::DateTime<chrono::Utc>, node_id: &'a str, nonce: u16) -> Self {
FixedGen(now, node_id, nonce)
}
}
impl<'a> KeyGen<'a> for FixedGen<'a> {
fn generate(&self) -> IdempotencyKey<'a> {
IdempotencyKey::for_tests(self.0, self.1, self.2)
}
}
static SAMPLES_NOW: Lazy<DateTime<Utc>> = Lazy::new(|| {
DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z")
.unwrap()
.into()
});
#[test]
fn metric_image_stability() {
// it is important that these strings stay as they are
let examples = [
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
),
];
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(*SAMPLES_NOW, "1", 0);
let examples = examples.into_iter().zip(metric_samples());
for ((line, expected), (key, (kind, value))) in examples {
let e = consumption_metrics::Event {
kind,
metric: key.metric,
idempotency_key: idempotency_key.to_string(),
value,
extra: Ids {
tenant_id: key.tenant_id,
timeline_id: key.timeline_id,
},
};
let actual = serde_json::to_string(&e).unwrap();
assert_eq!(expected, actual, "example for {kind:?} from line {line}");
}
}
fn metric_samples() -> [RawMetric; 6] {
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z")
.unwrap()
.into();
let [now, before] = [*SAMPLES_NOW, before];
super::super::metrics::metric_examples(tenant_id, timeline_id, now, before)
}
}

View File

@@ -444,6 +444,7 @@ impl<'a> WalIngest<'a> {
// need to clear the corresponding bits in the visibility map.
let mut new_heap_blkno: Option<u32> = None;
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
match self.timeline.pg_version {
14 => {
@@ -470,14 +471,20 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[1].blkno);
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v14::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -497,6 +504,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v14::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -526,14 +539,20 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[1].blkno);
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v15::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -553,6 +572,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v15::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -582,14 +607,20 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[1].blkno);
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v16::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -609,6 +640,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v16::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -616,7 +653,6 @@ impl<'a> WalIngest<'a> {
}
_ => {}
}
// FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED?
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
@@ -660,7 +696,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -676,7 +712,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -690,7 +726,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -717,6 +753,8 @@ impl<'a> WalIngest<'a> {
// need to clear the corresponding bits in the visibility map.
let mut new_heap_blkno: Option<u32> = None;
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match self.timeline.pg_version {
@@ -745,14 +783,14 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[1].blkno);
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
}
pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
@@ -772,7 +810,11 @@ impl<'a> WalIngest<'a> {
}
}
pg_constants::XLOG_NEON_HEAP_LOCK => {
/* XLOG_NEON_HEAP_LOCK doesn't need special care */
let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
}
@@ -783,8 +825,6 @@ impl<'a> WalIngest<'a> {
),
}
// FIXME: What about XLOG_NEON_HEAP_LOCK?
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
let vm_rel = RelTag {
@@ -827,7 +867,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -843,7 +883,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -857,7 +897,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)

View File

@@ -219,20 +219,66 @@ pub mod v14 {
old_offnum: buf.get_u16_le(),
old_infobits_set: buf.get_u8(),
flags: buf.get_u8(),
t_cid: buf.get_u32(),
t_cid: buf.get_u32_le(),
new_xmax: buf.get_u32_le(),
new_offnum: buf.get_u16_le(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLock {
pub locking_xid: TransactionId,
pub offnum: OffsetNumber,
pub _padding: u16,
pub t_cid: u32,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLock {
pub fn decode(buf: &mut Bytes) -> XlHeapLock {
XlHeapLock {
locking_xid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
_padding: buf.get_u16_le(),
t_cid: buf.get_u32_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLockUpdated {
pub xmax: TransactionId,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLockUpdated {
pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
XlHeapLockUpdated {
xmax: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
}
pub mod v15 {
pub use super::v14::{XlHeapDelete, XlHeapInsert, XlHeapMultiInsert, XlHeapUpdate};
pub use super::v14::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
};
}
pub mod v16 {
pub use super::v14::{XlHeapInsert, XlHeapMultiInsert};
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
use bytes::{Buf, Bytes};
use postgres_ffi::{OffsetNumber, TransactionId};
@@ -278,6 +324,26 @@ pub mod v16 {
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLock {
pub locking_xid: TransactionId,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLock {
pub fn decode(buf: &mut Bytes) -> XlHeapLock {
XlHeapLock {
locking_xid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
pub mod rm_neon {
use bytes::{Buf, Bytes};
@@ -366,6 +432,28 @@ pub mod v16 {
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonHeapLock {
pub locking_xid: TransactionId,
pub t_cid: u32,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlNeonHeapLock {
pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
XlNeonHeapLock {
locking_xid: buf.get_u32_le(),
t_cid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
}
}

View File

@@ -153,7 +153,7 @@ lfc_ensure_opened(void)
return false;
}
}
return false;
return true;
}
static void
@@ -222,8 +222,9 @@ lfc_change_limit_hook(int newval, void *extra)
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())
return;
/* Open cache file if not done yet */
@@ -640,6 +641,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
if (rc != BLCKSZ)
{
LWLockRelease(lfc_lock);
lfc_disable("write");
}
else
@@ -650,9 +652,8 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31));
LWLockRelease(lfc_lock);
}
LWLockRelease(lfc_lock);
}
/*

View File

@@ -121,7 +121,7 @@ async fn collect_metrics_iteration(
let current_metrics = gather_proxy_io_bytes_per_client();
let metrics_to_send: Vec<Event<Ids>> = current_metrics
let metrics_to_send: Vec<Event<Ids, &'static str>> = current_metrics
.iter()
.filter_map(|(curr_key, (curr_val, curr_time))| {
let mut start_time = *curr_time;

View File

@@ -1,6 +1,7 @@
pytest_plugins = (
"fixtures.pg_version",
"fixtures.parametrize",
"fixtures.httpserver",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",

View File

@@ -0,0 +1,45 @@
from typing import Tuple
import pytest
from pytest_httpserver import HTTPServer
# TODO: mypy fails with:
# Module "fixtures.neon_fixtures" does not explicitly export attribute "PortDistributor" [attr-defined]
# from fixtures.neon_fixtures import PortDistributor
# compared to the fixtures from pytest_httpserver with same names, these are
# always function scoped, so you can check and stop the server in tests.
@pytest.fixture(scope="function")
def httpserver_ssl_context():
return None
@pytest.fixture(scope="function")
def make_httpserver(httpserver_listen_address, httpserver_ssl_context):
host, port = httpserver_listen_address
if not host:
host = HTTPServer.DEFAULT_LISTEN_HOST
if not port:
port = HTTPServer.DEFAULT_LISTEN_PORT
server = HTTPServer(host=host, port=port, ssl_context=httpserver_ssl_context)
server.start()
yield server
server.clear()
if server.is_running():
server.stop()
@pytest.fixture(scope="function")
def httpserver(make_httpserver):
server = make_httpserver
yield server
server.clear()
@pytest.fixture(scope="function")
def httpserver_listen_address(port_distributor) -> Tuple[str, int]:
port = port_distributor.get_port()
return ("localhost", port)

View File

@@ -223,12 +223,6 @@ def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistrib
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
@pytest.fixture(scope="session")
def httpserver_listen_address(port_distributor: PortDistributor):
port = port_distributor.get_port()
return ("localhost", port)
@pytest.fixture(scope="function")
def default_broker(
port_distributor: PortDistributor,
@@ -853,18 +847,6 @@ class NeonEnv:
"""Get list of safekeeper endpoints suitable for safekeepers GUC"""
return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)
def timeline_dir(
self, tenant_id: TenantId, timeline_id: TimelineId, pageserver_id: Optional[int] = None
) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
return (
self.tenant_dir(tenant_id, pageserver_id=pageserver_id) / "timelines" / str(timeline_id)
)
def tenant_dir(self, tenant_id: TenantId, pageserver_id: Optional[int] = None) -> Path:
"""Get a tenant directory's path based on the repo directory of the test environment"""
return self.get_pageserver(pageserver_id).workdir / "tenants" / str(tenant_id)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
res = subprocess.run(
@@ -1586,6 +1568,21 @@ class NeonPageserver(PgProtocol):
'.*registered custom resource manager "neon".*',
]
def timeline_dir(self, tenant_id: TenantId, timeline_id: Optional[TimelineId] = None) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
if timeline_id is None:
return self.tenant_dir(tenant_id) / "timelines"
return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id)
def tenant_dir(
self,
tenant_id: Optional[TenantId] = None,
) -> Path:
"""Get a tenant directory's path based on the repo directory of the test environment"""
if tenant_id is None:
return self.workdir / "tenants"
return self.workdir / "tenants" / str(tenant_id)
def start(
self,
overrides: Tuple[str, ...] = (),
@@ -2136,6 +2133,28 @@ class NeonProxy(PgProtocol):
def _wait_until_ready(self):
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
def http_query(self, query, args, **kwargs):
# TODO maybe use default values if not provided
user = kwargs["user"]
password = kwargs["password"]
expected_code = kwargs.get("expected_code")
connstr = f"postgresql://{user}:{password}@{self.domain}:{self.proxy_port}/postgres"
response = requests.post(
f"https://{self.domain}:{self.external_http_port}/sql",
data=json.dumps({"query": query, "params": args}),
headers={
"Content-Type": "application/sql",
"Neon-Connection-String": connstr,
"Neon-Pool-Opt-In": "true",
},
verify=str(self.test_output_dir / "proxy.crt"),
)
if expected_code is not None:
assert response.status_code == kwargs["expected_code"], f"response: {response.json()}"
return response.json()
def get_metrics(self) -> str:
request_result = requests.get(f"http://{self.host}:{self.http_port}/metrics")
request_result.raise_for_status()

View File

@@ -236,15 +236,27 @@ def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str
response = list_prefix(neon_env_builder, prefix)
keys = response["KeyCount"]
objects = response.get("Contents", [])
common_prefixes = response.get("CommonPrefixes", [])
if keys != 0 and len(objects) == 0:
# this has been seen in one case with mock_s3:
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
common_prefixes = response.get("CommonPrefixes", [])
log.warn(
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
)
remote_storage = neon_env_builder.pageserver_remote_storage
is_mock_s3 = isinstance(remote_storage, S3Storage) and not remote_storage.cleanup
if is_mock_s3:
if keys == 1 and len(objects) == 0 and len(common_prefixes) == 1:
# this has been seen in the wild by tests with the below contradicting logging
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5322/6207777020/index.html#suites/3556ed71f2d69272a7014df6dcb02317/53b5c368b5a68865
# this seems like a mock_s3 issue
log.warn(
f"contrading ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
)
keys = 0
elif keys != 0 and len(objects) == 0:
# this has been seen in one case with mock_s3:
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
log.warn(
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
)
assert keys == 0, f"remote dir with prefix {prefix} is not empty after deletion: {objects}"

View File

@@ -115,6 +115,8 @@ class S3Storage:
prefix_in_bucket: str
client: S3Client
cleanup: bool
"""Is this MOCK_S3 (false) or REAL_S3 (true)"""
real: bool
endpoint: Optional[str] = None
def access_env_vars(self) -> Dict[str, str]:
@@ -265,6 +267,7 @@ class RemoteStorageKind(str, enum.Enum):
prefix_in_bucket="",
client=client,
cleanup=False,
real=False,
)
assert self == RemoteStorageKind.REAL_S3
@@ -300,6 +303,7 @@ class RemoteStorageKind(str, enum.Enum):
prefix_in_bucket=prefix_in_bucket,
client=client,
cleanup=True,
real=True,
)

View File

@@ -44,7 +44,7 @@ def measure_recovery_time(env: NeonCompare):
# Stop pageserver and remove tenant data
env.env.pageserver.stop()
timeline_dir = env.env.timeline_dir(env.tenant, env.timeline)
timeline_dir = env.env.pageserver.timeline_dir(env.tenant, env.timeline)
shutil.rmtree(timeline_dir)
# Start pageserver

View File

@@ -135,7 +135,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
tenant_id = env.initial_tenant
timelines_dir = env.pageserver.workdir / "tenants" / str(tenant_id) / "timelines"
timelines_dir = env.pageserver.timeline_dir(tenant_id)
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
initial_timeline_dirs = [d for d in timelines_dir.iterdir()]
@@ -166,7 +166,7 @@ def test_timeline_create_break_after_uninit_mark(neon_env_builder: NeonEnvBuilde
tenant_id = env.initial_tenant
timelines_dir = env.pageserver.workdir / "tenants" / str(tenant_id) / "timelines"
timelines_dir = env.pageserver.timeline_dir(tenant_id)
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
initial_timeline_dirs = [d for d in timelines_dir.iterdir()]

View File

@@ -20,7 +20,7 @@ from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pg_version import PgVersion, skip_on_postgres
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, RemoteStorageUser
from fixtures.types import Lsn
@@ -151,7 +151,6 @@ def test_create_snapshot(
shutil.copytree(test_output_dir, compatibility_snapshot_dir)
@skip_on_postgres(PgVersion.V16, reason="TODO: Enable after the first Postgres 16 release")
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
@@ -209,7 +208,6 @@ def test_backward_compatibility(
), "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
@skip_on_postgres(PgVersion.V16, reason="TODO: Enable after the first Postgres 16 release")
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")

View File

@@ -1,16 +1,22 @@
import os
import pathlib
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar
#
# Test CREATE DATABASE when there have been relmapper changes
#
def test_createdb(neon_simple_env: NeonEnv):
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
env = neon_simple_env
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
env.neon_cli.create_branch("test_createdb", "empty")
endpoint = env.endpoints.create_start("test_createdb")
@@ -20,7 +26,10 @@ def test_createdb(neon_simple_env: NeonEnv):
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
cur.execute("CREATE DATABASE foodb")
if env.pg_version == PgVersion.V14:
cur.execute("CREATE DATABASE foodb")
else:
cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")

View File

@@ -42,12 +42,11 @@ def handle_role(dbs, roles, operation):
raise ValueError("Invalid op")
fail = False
def ddl_forward_handler(request: Request, dbs: Dict[str, str], roles: Dict[str, str]) -> Response:
def ddl_forward_handler(
request: Request, dbs: Dict[str, str], roles: Dict[str, str], ddl: "DdlForwardingContext"
) -> Response:
log.info(f"Received request with data {request.get_data(as_text=True)}")
if fail:
if ddl.fail:
log.info("FAILING")
return Response(status=500, response="Failed just cuz")
if request.json is None:
@@ -72,6 +71,7 @@ class DdlForwardingContext:
self.port = port
self.dbs: Dict[str, str] = {}
self.roles: Dict[str, str] = {}
self.fail = False
endpoint = "/management/api/v2/roles_and_databases"
ddl_url = f"http://{host}:{port}{endpoint}"
self.pg.configure(
@@ -82,7 +82,7 @@ class DdlForwardingContext:
)
log.info(f"Listening on {ddl_url}")
self.server.expect_request(endpoint, method="PATCH").respond_with_handler(
lambda request: ddl_forward_handler(request, self.dbs, self.roles)
lambda request: ddl_forward_handler(request, self.dbs, self.roles, self)
)
def __enter__(self):
@@ -103,6 +103,9 @@ class DdlForwardingContext:
def wait(self, timeout=3):
self.server.wait(timeout=timeout)
def failures(self, bool):
self.fail = bool
def send_and_wait(self, query: str, timeout=3) -> List[Tuple[Any, ...]]:
res = self.send(query)
self.wait(timeout=timeout)
@@ -203,9 +206,9 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
assert ddl.dbs == {"stork": "cork"}
with pytest.raises(psycopg2.InternalError):
global fail
fail = True
ddl.failures(True)
cur.execute("CREATE DATABASE failure WITH OWNER=cork")
ddl.wait()
ddl.failures(False)
conn.close()

View File

@@ -417,7 +417,7 @@ def poor_mans_du(
largest_layer = 0
smallest_layer = None
for tenant_id, timeline_id in timelines:
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}"
total = 0
for file in timeline_dir.iterdir():

View File

@@ -15,45 +15,45 @@ from fixtures.types import TimelineId
# Test configuration
#
# Create a table with {num_rows} rows, and perform {updates_to_perform} random
# UPDATEs on it, using {num_connections} separate connections.
num_connections = 10
num_rows = 100000
updates_to_perform = 10000
updates_performed = 0
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
global updates_performed
pg_conn = await endpoint.connect_async()
while updates_performed < updates_to_perform:
updates_performed += 1
id = random.randrange(1, num_rows)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
loop = asyncio.get_running_loop()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < updates_to_perform:
await loop.run_in_executor(pool, do_gc)
# Create a table with {NUM_ROWS} rows, and perform {UPDATES_TO_PERFORM} random
# UPDATEs on it, using {NUM_CONNECTIONS} separate connections.
NUM_CONNECTIONS = 10
NUM_ROWS = 100000
UPDATES_TO_PERFORM = 10000
# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
workers = []
for _ in range(num_connections):
updates_performed = 0
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
nonlocal updates_performed
global UPDATES_TO_PERFORM
loop = asyncio.get_running_loop()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < UPDATES_TO_PERFORM:
await loop.run_in_executor(pool, do_gc)
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
pg_conn = await endpoint.connect_async()
nonlocal updates_performed
while updates_performed < UPDATES_TO_PERFORM:
updates_performed += 1
id = random.randrange(1, NUM_ROWS)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
for _ in range(NUM_CONNECTIONS):
workers.append(asyncio.create_task(update_table(endpoint)))
workers.append(asyncio.create_task(gc(env, timeline)))
@@ -81,7 +81,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
f"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {num_rows}) g
FROM generate_series(1, {NUM_ROWS}) g
"""
)
cur.execute("CREATE INDEX ON foo(id)")
@@ -91,7 +91,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
r = cur.fetchone()
assert r is not None
assert r == (num_rows, updates_to_perform)
assert r == (NUM_ROWS, UPDATES_TO_PERFORM)
#
@@ -99,6 +99,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
# Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
num_index_uploads = 0
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
@@ -160,5 +161,5 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
log.info(f"{num_index_uploads} index uploads after GC iteration {i}")
after = num_index_uploads
log.info(f"{after-before} new index uploads during test")
log.info(f"{after - before} new index uploads during test")
assert after - before < 5

View File

@@ -271,7 +271,7 @@ def _import(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)

View File

@@ -55,7 +55,7 @@ def test_basic_eviction(
for sk in env.safekeepers:
sk.stop()
timeline_path = env.timeline_dir(tenant_id, timeline_id)
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
)
@@ -243,7 +243,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
assert by_kind["Image"] > 0
assert by_kind["Delta"] > 0
assert by_kind["InMemory"] == 0
resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
resident_layers = list(env.pageserver.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
log.info("resident layers count before eviction: %s", len(resident_layers))
log.info("evict all layers")
@@ -251,7 +251,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
def ensure_resident_and_remote_size_metrics():
log.info("ensure that all the layers are gone")
resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
resident_layers = list(env.pageserver.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
# we have disabled all background loops, so, this should hold
assert len(resident_layers) == 0

View File

@@ -38,7 +38,7 @@ def test_image_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
new_temp_layer_files = list(
filter(
lambda file: str(file).endswith(NeonPageserver.TEMP_FILE_SUFFIX),
[path for path in env.timeline_dir(tenant_id, timeline_id).iterdir()],
[path for path in env.pageserver.timeline_dir(tenant_id, timeline_id).iterdir()],
)
)
@@ -84,7 +84,7 @@ def test_delta_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
new_temp_layer_files = list(
filter(
lambda file: str(file).endswith(NeonPageserver.TEMP_FILE_SUFFIX),
[path for path in env.timeline_dir(tenant_id, timeline_id).iterdir()],
[path for path in env.pageserver.timeline_dir(tenant_id, timeline_id).iterdir()],
)
)

View File

@@ -0,0 +1,44 @@
import threading
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
#
# Test branching, when a transaction is in prepared state
#
@pytest.mark.timeout(600)
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_lfc_resize", "empty")
endpoint = env.endpoints.create_start(
"test_lfc_resize",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
],
)
n_resize = 10
scale = 10
log.info("postgres is running on 'test_lfc_resize' branch")
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-c4", f"-T{n_resize}", "-Mprepared", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread.start()
conn = endpoint.connect()
cur = conn.cursor()
for i in range(n_resize):
cur.execute(f"alter system set neon.file_cache_size_limit='{i*10}MB'")
cur.execute("select pg_reload_conf()")
time.sleep(1)
thread.join()

View File

@@ -1,265 +0,0 @@
#
# Test for collecting metrics from pageserver and proxy.
# Use mock HTTP server to receive metrics and verify that they look sane.
#
import time
from pathlib import Path
from typing import Iterator
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
PSQL,
NeonEnvBuilder,
NeonProxy,
VanillaPostgres,
wait_for_last_flush_lsn,
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
# ==============================================================================
# Storage metrics tests
# ==============================================================================
initial_tenant = TenantId.generate()
remote_uploaded = 0
checks = {
"written_size": lambda value: value > 0,
"resident_size": lambda value: value >= 0,
# >= 0 check here is to avoid race condition when we receive metrics before
# remote_uploaded is updated
"remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value >= 0,
# logical size may lag behind the actual size, so allow 0 here
"timeline_logical_size": lambda value: value >= 0,
}
metric_kinds_checked = set([])
#
# verify that metrics look minilally sane
#
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
for event in events:
assert event["tenant_id"] == str(
initial_tenant
), "Expecting metrics only from the initial tenant"
metric_name = event["metric"]
check = checks.get(metric_name)
# calm down mypy
if check is not None:
assert check(event["value"]), f"{metric_name} isn't valid"
global metric_kinds_checked
metric_kinds_checked.add(metric_name)
return Response(status=200)
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
)
def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
remote_storage_kind: RemoteStorageKind,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = (
f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
"""
+ "tenant_config={pitr_interval = '0 sec'}"
)
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# Set initial tenant of the test, that we expect the logs from
global initial_tenant
initial_tenant = neon_env_builder.initial_tenant
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start()
# Order of fixtures shutdown is not specified, and if http server gets down
# before pageserver, pageserver log might contain such errors in the end.
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_metric_collection")
endpoint = env.endpoints.create_start("test_metric_collection")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
):
total += sample[2]
return int(total)
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
global remote_uploaded
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# wait longer than collecting interval and check that all requests are served
time.sleep(3)
httpserver.check()
global metric_kinds_checked, checks
expected_checks = set(checks.keys())
assert len(metric_kinds_checked) == len(
checks
), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered"
# ==============================================================================
# Proxy metrics tests
# ==============================================================================
def proxy_metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
# perform basic sanity checks
for event in events:
assert event["metric"] == "proxy_io_bytes_per_client"
assert event["endpoint_id"] == "test_endpoint_id"
assert event["value"] >= 0
assert event["stop_time"] >= event["start_time"]
return Response(status=200)
@pytest.fixture(scope="function")
def proxy_with_metric_collector(
port_distributor: PortDistributor,
neon_binpath: Path,
httpserver_listen_address,
test_output_dir: Path,
) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth and has metric collection enabled."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_collection_interval = "5s"
with NeonProxy(
neon_binpath=neon_binpath,
test_output_dir=test_output_dir,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
metric_collection_endpoint=metric_collection_endpoint,
metric_collection_interval=metric_collection_interval,
auth_backend=NeonProxy.Link(),
) as proxy:
proxy.start()
yield proxy
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
proxy_metrics_handler
)
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run(
"create table tbl as select * from generate_series(0,1000); select pg_sleep(5); select 42"
)
base_uri = proxy_with_metric_collector.link_auth_uri
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(vanilla_pg, proxy_with_metric_collector, psql_session_id)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run("insert into tbl select * from generate_series(0,1000); select pg_sleep(5); select 42")
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(
vanilla_pg, proxy_with_metric_collector, psql_session_id, create_user=False
)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
httpserver.check()

View File

@@ -3,7 +3,6 @@
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, DefaultDict, Dict, Tuple
import pytest
@@ -115,7 +114,7 @@ def test_ondemand_download_large_rel(
env.pageserver.stop()
# remove all the layer files
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in env.pageserver.tenant_dir().glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer}")
layer.unlink()
@@ -237,7 +236,7 @@ def test_ondemand_download_timetravel(
env.pageserver.stop()
# remove all the layer files
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in env.pageserver.tenant_dir().glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer}")
layer.unlink()
@@ -301,6 +300,7 @@ def test_ondemand_download_timetravel(
# they are present only in the remote storage, only locally, or both.
# It should not change.
assert filled_current_physical == get_api_current_physical_size()
endpoint_old.stop()
#
@@ -323,8 +323,8 @@ def test_download_remote_layers_api(
"compaction_period": "0s",
# small checkpoint distance to create more delta layer files
"checkpoint_distance": f"{1 * 1024 ** 2}", # 1 MB
"compaction_threshold": "1",
"image_creation_threshold": "1",
"compaction_threshold": "999999",
"image_creation_threshold": "999999",
"compaction_target_size": f"{1 * 1024 ** 2}", # 1 MB
}
)
@@ -357,8 +357,20 @@ def test_download_remote_layers_api(
tenant_id, timeline_id, "pageserver_resident_physical_size"
)
# Shut down safekeepers before starting the pageserver.
# If we don't, they might stream us more WAL.
for sk in env.safekeepers:
sk.stop()
# it is sad we cannot do a flush inmem layer without compaction, but
# working around with very high layer0 count and image layer creation
# threshold
client.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload_queue_empty(client, tenant_id, timeline_id)
filled_current_physical = get_api_current_physical_size()
log.info(filled_current_physical)
log.info(f"filled_current_physical: {filled_current_physical}")
filled_size = get_resident_physical_size()
log.info(f"filled_size: {filled_size}")
assert filled_current_physical == filled_size, "we don't yet do layer eviction"
@@ -366,18 +378,10 @@ def test_download_remote_layers_api(
env.pageserver.stop()
# remove all the layer files
# XXX only delete some of the layer files, to show that it really just downloads all the layers
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in env.pageserver.tenant_dir().glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer.name}")
layer.unlink()
# Shut down safekeepers before starting the pageserver.
# If we don't, the tenant's walreceiver handler will trigger the
# the logical size computation task, and that downloads layes,
# which makes our assertions on size fail.
for sk in env.safekeepers:
sk.stop(immediate=True)
##### Second start, restore the data and ensure it's the same
env.pageserver.start(extra_env_vars={"FAILPOINTS": "remote-storage-download-pre-rename=return"})
env.pageserver.allowed_errors.extend(
@@ -391,32 +395,21 @@ def test_download_remote_layers_api(
###### Phase 1: exercise download error code path
# comparison here is requiring the size to be at least the previous size, because it's possible received WAL after last_flush_lsn_upload
# witnessed for example difference of 29827072 (filled_current_physical) to 29868032 (here) is no good reason to fail a test.
this_time = get_api_current_physical_size()
assert (
filled_current_physical <= this_time
filled_current_physical == this_time
), "current_physical_size is sum of loaded layer sizes, independent of whether local or remote"
if filled_current_physical != this_time:
log.info(
f"fixing up filled_current_physical from {filled_current_physical} to {this_time} ({this_time - filled_current_physical})"
)
filled_current_physical = this_time
post_unlink_size = get_resident_physical_size()
log.info(f"post_unlink_size: {post_unlink_size}")
assert (
post_unlink_size < filled_size
), "we just deleted layers and didn't cause anything to re-download them yet"
assert filled_size - post_unlink_size > 5 * (
1024**2
), "we may be downloading some layers as part of tenant activation"
# issue downloads that we know will fail
info = client.timeline_download_remote_layers(
tenant_id,
timeline_id,
# allow some concurrency to unveil potential concurrency bugs
max_concurrent_downloads=10,
errors_ok=True,
at_least_one_download=False,
@@ -425,9 +418,9 @@ def test_download_remote_layers_api(
assert info["state"] == "Completed"
assert info["total_layer_count"] > 0
assert info["successful_download_count"] == 0
assert (
info["failed_download_count"] > 0
) # can't assert == total_layer_count because attach + tenant status downloads some layers
# can't assert == total_layer_count because timeline_detail also tries to
# download layers for logical size, but this might not always hold.
assert info["failed_download_count"] > 0
assert (
info["total_layer_count"]
== info["successful_download_count"] + info["failed_download_count"]
@@ -436,7 +429,6 @@ def test_download_remote_layers_api(
assert (
get_resident_physical_size() == post_unlink_size
), "didn't download anything new due to failpoint"
# would be nice to assert that the layers in the layer map are still RemoteLayer
##### Retry, this time without failpoints
client.configure_failpoints(("remote-storage-download-pre-rename", "off"))

View File

@@ -0,0 +1,481 @@
import json
import time
from dataclasses import dataclass
from pathlib import Path
from queue import SimpleQueue
from typing import Any, Dict, Set
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
)
def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
remote_storage_kind: RemoteStorageKind,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
):
total += sample[2]
return int(total)
remote_uploaded = 0
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
timeout = 5
# these strings in the upload queue allow synchronizing with the uploads
# and the main test execution
uploads.put("ready")
# note that this verifier graph should live across restarts as long as the
# cache file lives
v = MetricsVerifier()
while True:
events = uploads.get(timeout=timeout)
if events == "ready":
(events, is_last) = uploads.get(timeout=timeout)
v.ingest(events, is_last)
break
else:
(events, is_last) = events
v.ingest(events, is_last)
if "synthetic_storage_size" not in v.accepted_event_names():
log.info("waiting for synthetic storage size to be calculated and uploaded...")
rounds = 0
while "synthetic_storage_size" not in v.accepted_event_names():
(events, is_last) = uploads.get(timeout=timeout)
v.ingest(events, is_last)
rounds += 1
assert rounds < 10, "did not get synthetic_storage_size in 10 uploads"
# once we have it in verifiers, it will assert that future batches will contain it
env.pageserver.stop()
time.sleep(1)
uploads.put("ready")
env.pageserver.start()
while True:
events = uploads.get(timeout=timeout)
if events == "ready":
(events, is_last) = uploads.get(timeout=timeout * 3)
v.ingest(events, is_last)
(events, is_last) = uploads.get(timeout=timeout)
v.ingest(events, is_last)
break
else:
(events, is_last) = events
v.ingest(events, is_last)
httpserver.check()
def test_metric_collection_cleans_up_tempfile(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
# this should be Union[str, Tuple[List[Any], bool]], but it will make unpacking much more verbose
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
is_last = request.headers["pageserver-metrics-last-upload-in-batch"]
assert is_last in ["true", "false"]
uploads.put((events, is_last == "true"))
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
pageserver_http = env.pageserver.http_client()
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
timeout = 5
# these strings in the upload queue allow synchronizing with the uploads
# and the main test execution
uploads.put("ready")
while True:
events = uploads.get(timeout=timeout)
if events == "ready":
(events, _) = uploads.get(timeout=timeout)
break
# should really configure an env?
pageserver_http.configure_failpoints(("before-persist-last-metrics-collected", "exit"))
time.sleep(3)
env.pageserver.stop()
initially = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
assert (
len(initially.matching) == 2
), f"expecting actual file and tempfile, but not found: {initially.matching}"
uploads.put("ready")
env.pageserver.start()
while True:
events = uploads.get(timeout=timeout * 3)
if events == "ready":
(events, _) = uploads.get(timeout=timeout)
break
env.pageserver.stop()
later = iterate_pageserver_workdir(env.pageserver.workdir, "last_consumption_metrics.json")
# it is possible we shutdown the pageserver right at the correct time, so the old tempfile
# is gone, but we also have a new one.
only = set(["last_consumption_metrics.json"])
assert (
initially.matching.intersection(later.matching) == only
), "only initial tempfile should had been removed"
assert initially.other.issuperset(later.other), "no other files should had been removed"
@dataclass
class PrefixPartitionedFiles:
matching: Set[str]
other: Set[str]
def iterate_pageserver_workdir(path: Path, prefix: str) -> PrefixPartitionedFiles:
"""
Iterates the files in the workdir, returns two sets:
- files with the prefix
- files without the prefix
"""
matching = set()
other = set()
for entry in path.iterdir():
if not entry.is_file():
continue
if not entry.name.startswith(prefix):
other.add(entry.name)
else:
matching.add(entry.name)
return PrefixPartitionedFiles(matching, other)
class MetricsVerifier:
"""
A graph of per tenant per timeline verifiers, allowing one for each
metric
"""
def __init__(self):
self.tenants: Dict[TenantId, TenantMetricsVerifier] = {}
pass
def ingest(self, events, is_last):
stringified = json.dumps(events, indent=2)
log.info(f"ingesting: {stringified}")
for event in events:
id = TenantId(event["tenant_id"])
if id not in self.tenants:
self.tenants[id] = TenantMetricsVerifier(id)
self.tenants[id].ingest(event)
if is_last:
for t in self.tenants.values():
t.post_batch()
def accepted_event_names(self) -> Set[str]:
names: Set[str] = set()
for t in self.tenants.values():
names = names.union(t.accepted_event_names())
return names
class TenantMetricsVerifier:
def __init__(self, id: TenantId):
self.id = id
self.timelines: Dict[TimelineId, TimelineMetricsVerifier] = {}
self.state: Dict[str, Any] = {}
def ingest(self, event):
assert TenantId(event["tenant_id"]) == self.id
if "timeline_id" in event:
id = TimelineId(event["timeline_id"])
if id not in self.timelines:
self.timelines[id] = TimelineMetricsVerifier(self.id, id)
self.timelines[id].ingest(event)
else:
name = event["metric"]
if name not in self.state:
self.state[name] = PER_METRIC_VERIFIERS[name]()
self.state[name].ingest(event, self)
def post_batch(self):
for v in self.state.values():
v.post_batch(self)
for tl in self.timelines.values():
tl.post_batch(self)
def accepted_event_names(self) -> Set[str]:
names = set(self.state.keys())
for t in self.timelines.values():
names = names.union(t.accepted_event_names())
return names
class TimelineMetricsVerifier:
def __init__(self, tenant_id: TenantId, timeline_id: TimelineId):
self.id = timeline_id
self.state: Dict[str, Any] = {}
def ingest(self, event):
name = event["metric"]
if name not in self.state:
self.state[name] = PER_METRIC_VERIFIERS[name]()
self.state[name].ingest(event, self)
def post_batch(self, parent):
for v in self.state.values():
v.post_batch(self)
def accepted_event_names(self) -> Set[str]:
return set(self.state.keys())
class CannotVerifyAnything:
"""We can only assert types, but rust already has types, so no need."""
def __init__(self):
pass
def ingest(self, event, parent):
pass
def post_batch(self, parent):
pass
class WrittenDataVerifier:
def __init__(self):
self.values = []
pass
def ingest(self, event, parent):
self.values.append(event["value"])
def post_batch(self, parent):
pass
class WrittenDataDeltaVerifier:
def __init__(self):
self.value = None
self.sum = 0
self.timerange = None
pass
def ingest(self, event, parent):
assert event["type"] == "incremental"
self.value = event["value"]
self.sum += event["value"]
start = event["start_time"]
stop = event["stop_time"]
timerange = (start, stop)
if self.timerange is not None:
# this holds across restarts
assert self.timerange[1] == timerange[0], "time ranges should be continious"
self.timerange = timerange
def post_batch(self, parent):
absolute = parent.state["written_size"]
if len(absolute.values) == 1:
# in tests this comes up as initdb execution, so we can have 0 or
# about 30MB on the first event. it is not consistent.
assert self.value is not None
else:
assert self.value == absolute.values[-1] - absolute.values[-2]
# sounds like this should hold, but it will not for branches -- probably related to timing
# assert self.sum == absolute.latest
class SyntheticSizeVerifier:
def __init__(self):
self.prev = None
self.value = None
pass
def ingest(self, event, parent):
assert isinstance(parent, TenantMetricsVerifier)
assert event["type"] == "absolute"
value = event["value"]
self.value = value
def post_batch(self, parent):
if self.prev is not None:
# this is assuming no one goes and deletes the cache file
assert (
self.value is not None
), "after calculating first synthetic size, cached or more recent should be sent"
self.prev = self.value
self.value = None
PER_METRIC_VERIFIERS = {
"remote_storage_size": CannotVerifyAnything,
"resident_size": CannotVerifyAnything,
"written_size": WrittenDataVerifier,
"written_data_bytes_delta": WrittenDataDeltaVerifier,
"timeline_logical_size": CannotVerifyAnything,
"synthetic_storage_size": SyntheticSizeVerifier,
}

View File

@@ -346,23 +346,13 @@ def test_sql_over_http_pool(static_proxy: NeonProxy):
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
def get_pid(status: int, pw: str) -> Any:
connstr = (
f"postgresql://http_auth:{pw}@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
return static_proxy.http_query(
"SELECT pid FROM pg_stat_activity WHERE state = 'active'",
[],
user="http_auth",
password=pw,
expected_code=status,
)
response = requests.post(
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
data=json.dumps(
{"query": "SELECT pid FROM pg_stat_activity WHERE state = 'active'", "params": []}
),
headers={
"Content-Type": "application/sql",
"Neon-Connection-String": connstr,
"Neon-Pool-Opt-In": "true",
},
verify=str(static_proxy.test_output_dir / "proxy.crt"),
)
assert response.status_code == status
return response.json()
pid1 = get_pid(200, "http")["rows"][0]["pid"]
@@ -387,3 +377,23 @@ def test_sql_over_http_pool(static_proxy: NeonProxy):
# old password should not work
res = get_pid(400, "http")
assert "password authentication failed for user" in res["message"]
# Beginning a transaction should not impact the next query,
# which might come from a completely different client.
@pytest.mark.xfail(reason="not implemented")
def test_http_pool_begin(static_proxy: NeonProxy):
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
def query(status: int, query: str, *args) -> Any:
static_proxy.http_query(
query,
args,
user="http_auth",
password="http",
expected_code=status,
)
query(200, "BEGIN;")
query(400, "garbage-lol(&(&(&(&") # Intentional error to break the transaction
query(200, "SELECT 1;") # Query that should succeed regardless of the transaction

View File

@@ -0,0 +1,113 @@
from pathlib import Path
from typing import Iterator
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
PSQL,
NeonProxy,
VanillaPostgres,
)
from fixtures.port_distributor import PortDistributor
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def proxy_metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
# perform basic sanity checks
for event in events:
assert event["metric"] == "proxy_io_bytes_per_client"
assert event["endpoint_id"] == "test_endpoint_id"
assert event["value"] >= 0
assert event["stop_time"] >= event["start_time"]
return Response(status=200)
@pytest.fixture(scope="function")
def proxy_with_metric_collector(
port_distributor: PortDistributor,
neon_binpath: Path,
httpserver_listen_address,
test_output_dir: Path,
) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth and has metric collection enabled."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_collection_interval = "5s"
with NeonProxy(
neon_binpath=neon_binpath,
test_output_dir=test_output_dir,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
metric_collection_endpoint=metric_collection_endpoint,
metric_collection_interval=metric_collection_interval,
auth_backend=NeonProxy.Link(),
) as proxy:
proxy.start()
yield proxy
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
proxy_metrics_handler
)
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run(
"create table tbl as select * from generate_series(0,1000); select pg_sleep(5); select 42"
)
base_uri = proxy_with_metric_collector.link_auth_uri
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(vanilla_pg, proxy_with_metric_collector, psql_session_id)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run("insert into tbl select * from generate_series(0,1000); select pg_sleep(5); select 42")
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(
vanilla_pg, proxy_with_metric_collector, psql_session_id, create_user=False
)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
httpserver.check()

View File

@@ -6,7 +6,6 @@ import queue
import shutil
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pytest
@@ -137,7 +136,7 @@ def test_remote_storage_backup_and_restore(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -353,7 +352,7 @@ def test_remote_storage_upload_queue_retries(
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -488,7 +487,7 @@ def test_remote_timeline_client_calls_started_metric(
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -533,7 +532,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
timeline_path = env.timeline_dir(tenant_id, timeline_id)
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
client = env.pageserver.http_client()
@@ -704,7 +703,9 @@ def test_empty_branch_remote_storage_upload_on_restart(
# index upload is now hitting the failpoint, it should block the shutdown
env.pageserver.stop(immediate=True)
local_metadata = env.timeline_dir(env.initial_tenant, new_branch_timeline_id) / "metadata"
local_metadata = (
env.pageserver.timeline_dir(env.initial_tenant, new_branch_timeline_id) / "metadata"
)
assert local_metadata.is_file()
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

View File

@@ -299,7 +299,7 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
# tenant is created with defaults, as in without config file
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
config_path = env.pageserver.workdir / "tenants" / str(tenant_id) / "config"
config_path = env.pageserver.tenant_dir(tenant_id) / "config"
assert config_path.exists(), "config file is always initially created"
http_client = env.pageserver.http_client()

View File

@@ -89,7 +89,7 @@ def test_tenant_delete_smoke(
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
tenant_path = env.tenant_dir(tenant_id=tenant_id)
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():
@@ -269,7 +269,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
tenant_dir = env.tenant_dir(tenant_id)
tenant_dir = env.pageserver.tenant_dir(tenant_id)
# Check local is empty
assert not tenant_dir.exists()
@@ -366,7 +366,7 @@ def test_tenant_delete_is_resumed_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = env.pageserver.workdir / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -379,7 +379,7 @@ def test_tenant_delete_is_resumed_on_attach(
wait_tenant_status_404(ps_http, tenant_id, iterations)
# we shouldn've created tenant dir on disk
tenant_path = env.tenant_dir(tenant_id=tenant_id)
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():

View File

@@ -119,65 +119,6 @@ def test_tenant_reattach(
num_connections = 10
num_rows = 100000
updates_to_perform = 0
updates_started = 0
updates_finished = 0
# Run random UPDATEs on test table. On failure, try again.
async def update_table(pg_conn: asyncpg.Connection):
global updates_started, updates_finished, updates_to_perform
while updates_started < updates_to_perform or updates_to_perform == 0:
updates_started += 1
id = random.randrange(1, num_rows)
# Loop to retry until the UPDATE succeeds
while True:
try:
await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
updates_finished += 1
if updates_finished % 1000 == 0:
log.info(f"update {updates_finished} / {updates_to_perform}")
break
except asyncpg.PostgresError as e:
# Received error from Postgres. Log it, sleep a little, and continue
log.info(f"UPDATE error: {e}")
await asyncio.sleep(0.1)
async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
global updates_started, updates_finished, updates_to_perform
# Wait until we have performed some updates
wait_until(20, 0.5, lambda: updates_finished > 500)
log.info("Detaching tenant")
pageserver_http.tenant_detach(tenant_id)
await asyncio.sleep(1)
log.info("Re-attaching tenant")
pageserver_http.tenant_attach(tenant_id)
log.info("Re-attach finished")
# Continue with 5000 more updates
updates_to_perform = updates_started + 5000
# async guts of test_tenant_reattach_while_bysy test
async def reattach_while_busy(
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
workers = []
for _ in range(num_connections):
pg_conn = await endpoint.connect_async()
workers.append(asyncio.create_task(update_table(pg_conn)))
workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
await asyncio.gather(*workers)
assert updates_finished == updates_to_perform
# Detach and re-attach tenant, while compute is busy running queries.
#
@@ -226,6 +167,62 @@ def test_tenant_reattach_while_busy(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
updates_started = 0
updates_finished = 0
updates_to_perform = 0
# Run random UPDATEs on test table. On failure, try again.
async def update_table(pg_conn: asyncpg.Connection):
nonlocal updates_started, updates_finished, updates_to_perform
while updates_started < updates_to_perform or updates_to_perform == 0:
updates_started += 1
id = random.randrange(1, num_rows)
# Loop to retry until the UPDATE succeeds
while True:
try:
await pg_conn.fetchrow(f"UPDATE t SET counter = counter + 1 WHERE id = {id}")
updates_finished += 1
if updates_finished % 1000 == 0:
log.info(f"update {updates_finished} / {updates_to_perform}")
break
except asyncpg.PostgresError as e:
# Received error from Postgres. Log it, sleep a little, and continue
log.info(f"UPDATE error: {e}")
await asyncio.sleep(0.1)
async def sleep_and_reattach(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
nonlocal updates_started, updates_finished, updates_to_perform
# Wait until we have performed some updates
wait_until(20, 0.5, lambda: updates_finished > 500)
log.info("Detaching tenant")
pageserver_http.tenant_detach(tenant_id)
await asyncio.sleep(1)
log.info("Re-attaching tenant")
pageserver_http.tenant_attach(tenant_id)
log.info("Re-attach finished")
# Continue with 5000 more updates
updates_to_perform = updates_started + 5000
# async guts of test_tenant_reattach_while_bysy test
async def reattach_while_busy(
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
):
nonlocal updates_to_perform, updates_finished
workers = []
for _ in range(num_connections):
pg_conn = await endpoint.connect_async()
workers.append(asyncio.create_task(update_table(pg_conn)))
workers.append(asyncio.create_task(sleep_and_reattach(pageserver_http, tenant_id)))
await asyncio.gather(*workers)
assert updates_finished == updates_to_perform
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -289,7 +286,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
)
# assert tenant exists on disk
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert env.pageserver.tenant_dir(tenant_id).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -332,7 +329,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
log.info("gc thread returned")
# check that nothing is left on disk for deleted tenant
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert not env.pageserver.tenant_dir(tenant_id).exists()
with pytest.raises(
expected_exception=PageserverApiException, match=f"NotFound: tenant {tenant_id}"
@@ -357,7 +354,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
)
# assert tenant exists on disk
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert env.pageserver.tenant_dir(tenant_id).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -386,7 +383,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
log.info("ignored tenant detached without error")
# check that nothing is left on disk for deleted tenant
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert not env.pageserver.tenant_dir(tenant_id).exists()
# assert the tenant does not exists in the Pageserver
tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()]
@@ -413,7 +410,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
)
# assert tenant exists on disk
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert env.pageserver.tenant_dir(tenant_id).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -430,7 +427,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
log.info("regular tenant detached without error")
# check that nothing is left on disk for deleted tenant
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert not env.pageserver.tenant_dir(tenant_id).exists()
# assert the tenant does not exists in the Pageserver
tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()]
@@ -531,7 +528,7 @@ def test_ignored_tenant_reattach(
pageserver_http = env.pageserver.http_client()
ignored_tenant_id, _ = env.neon_cli.create_tenant()
tenant_dir = env.pageserver.workdir / "tenants" / str(ignored_tenant_id)
tenant_dir = env.pageserver.tenant_dir(ignored_tenant_id)
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
tenants_before_ignore.sort()
timelines_before_ignore = [
@@ -622,7 +619,7 @@ def test_ignored_tenant_download_missing_layers(
# ignore the tenant and remove its layers
pageserver_http.tenant_ignore(tenant_id)
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
layers_removed = False
for dir_entry in timeline_dir.iterdir():
if dir_entry.name.startswith("00000"):
@@ -675,7 +672,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
# ignore the tenant and remove its metadata
pageserver_http.tenant_ignore(tenant_id)
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
metadata_removed = False
for dir_entry in timeline_dir.iterdir():
if dir_entry.name == "metadata":

View File

@@ -216,7 +216,7 @@ def switch_pg_to_new_pageserver(
endpoint.start()
timeline_to_detach_local_path = env.timeline_dir(tenant_id, timeline_id)
timeline_to_detach_local_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
files_before_detach = os.listdir(timeline_to_detach_local_path)
assert (
"metadata" in files_before_detach
@@ -561,7 +561,7 @@ def test_emergency_relocate_with_branches_slow_replay(
# simpler than initializing a new one from scratch, but the effect on the single tenant
# is the same.
env.pageserver.stop(immediate=True)
shutil.rmtree(env.pageserver.workdir / "tenants" / str(tenant_id))
shutil.rmtree(env.pageserver.tenant_dir(tenant_id))
env.pageserver.start()
# This fail point will pause the WAL ingestion on the main branch, after the
@@ -709,7 +709,7 @@ def test_emergency_relocate_with_branches_createdb(
# Kill the pageserver, remove the tenant directory, and restart
env.pageserver.stop(immediate=True)
shutil.rmtree(env.pageserver.workdir / "tenants" / str(tenant_id))
shutil.rmtree(env.pageserver.tenant_dir(tenant_id))
env.pageserver.start()
# Wait before ingesting the WAL for CREATE DATABASE on the main branch. The original

View File

@@ -27,7 +27,7 @@ from prometheus_client.samples import Sample
def test_tenant_creation_fails(neon_simple_env: NeonEnv):
tenants_dir = Path(neon_simple_env.pageserver.workdir) / "tenants"
tenants_dir = neon_simple_env.pageserver.tenant_dir()
initial_tenants = sorted(
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
)
@@ -320,13 +320,7 @@ def test_pageserver_with_empty_tenants(
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.pageserver.workdir)
/ "tenants"
/ str(tenant_with_empty_timelines)
/ "timelines"
)
1 for _p in Path.iterdir(env.pageserver.timeline_dir(tenant_with_empty_timelines))
)
assert (
files_in_timelines_dir == 0
@@ -337,9 +331,7 @@ def test_pageserver_with_empty_tenants(
env.pageserver.stop()
tenant_without_timelines_dir = env.initial_tenant
shutil.rmtree(
Path(env.pageserver.workdir) / "tenants" / str(tenant_without_timelines_dir) / "timelines"
)
shutil.rmtree(env.pageserver.timeline_dir(tenant_without_timelines_dir))
env.pageserver.start()

View File

@@ -179,9 +179,7 @@ def test_tenants_attached_after_download(
env.pageserver.stop()
timeline_dir = (
Path(env.pageserver.workdir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
local_layer_deleted = False
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):
@@ -259,7 +257,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
env.endpoints.stop_all()
env.pageserver.stop()
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
local_layer_truncated = None
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):

View File

@@ -32,7 +32,9 @@ def test_threshold_based_eviction(
synthetic_size_calculation_interval="2s"
metric_collection_endpoint="http://{host}:{port}/nonexistent"
"""
metrics_refused_log_line = ".*metrics endpoint refused the sent metrics.*/nonexistent.*"
metrics_refused_log_line = (
".*metrics_collection:.* upload consumption_metrics (still failed|failed, will retry).*"
)
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(metrics_refused_log_line)

View File

@@ -3,7 +3,6 @@ import os
import queue
import shutil
import threading
from pathlib import Path
import pytest
import requests
@@ -72,13 +71,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
"test_ancestor_branch_delete_branch1", "test_ancestor_branch_delete_parent"
)
timeline_path = (
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(parent_timeline_id)
)
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, parent_timeline_id)
with pytest.raises(
PageserverApiException, match="Cannot delete timeline which has child timelines"
@@ -89,13 +82,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
assert exc.value.status_code == 412
timeline_path = (
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(leaf_timeline_id)
)
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, leaf_timeline_id)
assert timeline_path.exists()
# retry deletes when compaction or gc is running in pageserver
@@ -336,7 +323,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
),
)
timeline_dir = env.timeline_dir(env.initial_tenant, timeline_id)
timeline_dir = env.pageserver.timeline_dir(env.initial_tenant, timeline_id)
# Check local is empty
assert not timeline_dir.exists()
# Check no delete mark present
@@ -416,7 +403,7 @@ def test_timeline_resurrection_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -467,13 +454,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
"test_timeline_delete_fail_before_local_delete",
)
leaf_timeline_path = (
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(leaf_timeline_id)
)
leaf_timeline_path = env.pageserver.timeline_dir(env.initial_tenant, leaf_timeline_id)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
timeline_info = wait_until_timeline_state(
@@ -921,7 +902,7 @@ def test_timeline_delete_resumed_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -933,7 +914,7 @@ def test_timeline_delete_resumed_on_attach(
# delete should be resumed
wait_timeline_detail_404(ps_http, env.initial_tenant, timeline_id, iterations=iterations)
tenant_path = env.timeline_dir(tenant_id=tenant_id, timeline_id=timeline_id)
tenant_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():

View File

@@ -518,7 +518,7 @@ def test_timeline_size_metrics(
).value
# assert that the physical size metric matches the actual physical size on disk
timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, new_timeline_id)
assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)
# Check that the logical size metric is sane, and matches
@@ -658,7 +658,7 @@ def get_physical_size_values(
)
res.api_current_physical = detail["current_physical_size"]
timeline_path = env.timeline_dir(tenant_id, timeline_id)
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
res.python_timelinedir_layerfiles_physical = get_timeline_dir_size(timeline_path)
return res

View File

@@ -19,18 +19,40 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table and freeze it to set the VM bit.
# Create a test table for a few different scenarios and freeze it to set the VM bits.
cur.execute("CREATE TABLE vmtest_delete (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_delete VALUES (1)")
cur.execute("VACUUM FREEZE vmtest_delete")
cur.execute("CREATE TABLE vmtest_update (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_update SELECT g FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_update")
cur.execute("CREATE TABLE vmtest_hot_update (id integer PRIMARY KEY, filler text)")
cur.execute("INSERT INTO vmtest_hot_update VALUES (1, 'x')")
cur.execute("VACUUM FREEZE vmtest_hot_update")
cur.execute("CREATE TABLE vmtest_cold_update (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_cold_update SELECT g FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_cold_update")
cur.execute(
"CREATE TABLE vmtest_cold_update2 (id integer PRIMARY KEY, filler text) WITH (fillfactor=100)"
)
cur.execute("INSERT INTO vmtest_cold_update2 SELECT g, '' FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_cold_update2")
# DELETE and UPDATE the rows.
cur.execute("DELETE FROM vmtest_delete WHERE id = 1")
cur.execute("UPDATE vmtest_update SET id = 5000 WHERE id = 1")
cur.execute("UPDATE vmtest_hot_update SET filler='x' WHERE id = 1")
cur.execute("UPDATE vmtest_cold_update SET id = 5000 WHERE id = 1")
# Clear the VM bit on the last page with an INSERT. Then clear the VM bit on
# the page where row 1 is (block 0), by doing an UPDATE. The UPDATE is a
# cold update, and the new tuple goes to the last page, which already had
# its VM bit cleared. The point is that the UPDATE *only* clears the VM bit
# on the page containing the old tuple. We had a bug where we got the old
# and new pages mixed up, and that only shows up when one of the bits is
# cleared, but not the other one.
cur.execute("INSERT INTO vmtest_cold_update2 VALUES (9999, 'x')")
# Clears the VM bit on the old page
cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
# Branch at this point, to test that later
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "test_vm_bit_clear")
@@ -50,9 +72,13 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
"""
)
cur.execute("SELECT * FROM vmtest_delete WHERE id = 1")
cur.execute("SELECT id FROM vmtest_delete WHERE id = 1")
assert cur.fetchall() == []
cur.execute("SELECT * FROM vmtest_update WHERE id = 1")
cur.execute("SELECT id FROM vmtest_hot_update WHERE id = 1")
assert cur.fetchall() == [(1,)]
cur.execute("SELECT id FROM vmtest_cold_update WHERE id = 1")
assert cur.fetchall() == []
cur.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
assert cur.fetchall() == []
cur.close()
@@ -77,7 +103,111 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
"""
)
cur_new.execute("SELECT * FROM vmtest_delete WHERE id = 1")
cur_new.execute("SELECT id FROM vmtest_delete WHERE id = 1")
assert cur_new.fetchall() == []
cur_new.execute("SELECT * FROM vmtest_update WHERE id = 1")
cur_new.execute("SELECT id FROM vmtest_hot_update WHERE id = 1")
assert cur_new.fetchall() == [(1,)]
cur_new.execute("SELECT id FROM vmtest_cold_update WHERE id = 1")
assert cur_new.fetchall() == []
cur_new.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
assert cur_new.fetchall() == []
#
# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK
# record.
#
def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_vm_bit_clear_on_heap_lock", "empty")
endpoint = env.endpoints.create_start(
"test_vm_bit_clear_on_heap_lock",
config_lines=[
"log_autovacuum_min_duration = 0",
# Perform anti-wraparound vacuuming aggressively
"autovacuum_naptime='1 s'",
"autovacuum_freeze_max_age = 1000000",
],
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute("SELECT pg_switch_wal()")
# Create a test table and freeze it to set the all-frozen VM bit on all pages.
cur.execute("CREATE TABLE vmtest_lock (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_lock SELECT g FROM generate_series(1, 50000) g")
cur.execute("VACUUM FREEZE vmtest_lock")
# Lock a row. This clears the all-frozen VM bit for that page.
cur.execute("SELECT * FROM vmtest_lock WHERE id = 40000 FOR UPDATE")
# Remember the XID. We will use it later to verify that we have consumed a lot of
# XIDs after this.
cur.execute("select pg_current_xact_id()")
locking_xid = cur.fetchall()[0][0]
# Stop and restart postgres, to clear the buffer cache.
#
# NOTE: clear_buffer_cache() will not do, because it evicts the dirty pages
# in a "clean" way. Our neon extension will write a full-page image of the VM
# page, and we want to avoid that.
endpoint.stop()
endpoint.start()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
tup = cur.fetchall()
xmax_before = tup[0][1]
# Consume a lot of XIDs, so that anti-wraparound autovacuum kicks
# in and the clog gets truncated. We set autovacuum_freeze_max_age to a very
# low value, so it doesn't take all that many XIDs for autovacuum to kick in.
for i in range(1000):
cur.execute(
"""
CREATE TEMP TABLE othertable (i int) ON COMMIT DROP;
do $$
begin
for i in 1..100000 loop
-- Use a begin-exception block to generate a new subtransaction on each iteration
begin
insert into othertable values (i);
exception when others then
raise 'not expected %', sqlerrm;
end;
end loop;
end;
$$;
"""
)
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
tup = cur.fetchall()
log.info(f"tuple = {tup}")
xmax = tup[0][1]
assert xmax == xmax_before
if i % 50 == 0:
cur.execute("select datfrozenxid from pg_database where datname='postgres'")
datfrozenxid = cur.fetchall()[0][0]
if datfrozenxid > locking_xid:
break
cur.execute("select pg_current_xact_id()")
curr_xid = cur.fetchall()[0][0]
assert int(curr_xid) - int(locking_xid) >= 100000
# Now, if the VM all-frozen bit was not correctly cleared on
# replay, we will try to fetch the status of the XID that was
# already truncated away.
#
# ERROR: could not access status of transaction 1027
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
tup = cur.fetchall()
log.info(f"tuple = {tup}")

View File

@@ -14,6 +14,8 @@ from pathlib import Path
from typing import Any, List, Optional
import psycopg2
import psycopg2.errors
import psycopg2.extras
import pytest
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
@@ -260,7 +262,7 @@ def test_restarts(neon_env_builder: NeonEnvBuilder):
else:
failed_node.start()
failed_node = None
assert query_scalar(cur, "SELECT sum(key) FROM t") == 500500
assert query_scalar(cur, "SELECT sum(key) FROM t") == (n_inserts * (n_inserts + 1)) // 2
# Test that safekeepers push their info to the broker and learn peer status from it

View File

@@ -43,7 +43,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
tenant_id, _ = env.neon_cli.create_tenant()
# assert tenant exists on disk
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert (env.pageserver.tenant_dir(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -101,7 +101,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
pytest.fail(f"could not detach tenant: {last_error}")
# check that nothing is left on disk for deleted tenant
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert not env.pageserver.tenant_dir(tenant_id).exists()
# Pageserver schedules kill+wait of the WAL redo process to the background runtime,
# asynchronously to tenant detach. Cut it some slack to complete kill+wait before