Compare commits

...

15 Commits

Author SHA1 Message Date
Konstantin Knizhnik
70c741fc13 Look at the database last written LSN in neon_exits 2023-09-16 21:40:55 +03:00
Joonas Koivunen
e62ab176b8 refactor(consumption_metrics): split (#5326)
Split off from #5297. Builds upon #5325, should contain only the
splitting. Next up: #5327.
2023-09-16 18:45:08 +03:00
Joonas Koivunen
a221ecb0da test: test_download_remote_layers_api again (#5322)
The test is still flaky, perhaps more after #5233, see #3831.

Do one more `timeline_checkpoint` *after* shutting down safekeepers
*before* shutting down pageserver. Put more effort into not compacting
or creating image layers.
2023-09-16 18:27:19 +03:00
Joonas Koivunen
9cf4ae86ff refactor(consumption_metrics): pre-split cleanup (#5325)
Cleanups in preparation to splitting the consumption_metrics.rs in
#5326.

Split off from #5297.
2023-09-16 18:08:33 +03:00
Joonas Koivunen
74d99b5883 refactor(test_consumption_metrics): split for pageserver and proxy (#5324)
With the addition of the "stateful event verification" the
test_consumption_metrics.py is now too crowded. Split it up for
pageserver and proxy.

Split from #5297.
2023-09-16 18:05:35 +03:00
Joonas Koivunen
f902777202 fix: consumption metrics on restart (#5323)
Write collected metrics to disk to recover previously sent metrics on
restart.

Recover the previously collected metrics during startup, send them over
at right time
  - send cached synthetic size before actual is calculated
  - when `last_record_lsn` rolls back on startup
      - stay at last sent `written_size` metric
      - send `written_size_delta_bytes` metric as 0

Add test support: stateful verification of events in python tests.

Fixes: #5206
Cc: #5175 (loggings, will be enhanced in follow-up)
2023-09-16 11:24:42 +03:00
Joonas Koivunen
a7f4ee02a3 fix(consumption_metrics): exp backoff retry (#5317)
Split off from #5297. Depends on #5315.
Cc: #5175 for retry
2023-09-16 01:11:01 +03:00
Joonas Koivunen
00c4c8e2e8 feat(consumption_metrics): remove event deduplication support (#5316)
We no longer use pageserver deduplication anywhere. Give out a warning
instead.

Split off from #5297.

Cc: #5175 for dedup.
2023-09-16 00:06:19 +03:00
Joonas Koivunen
c5d226d9c7 refactor(consumption_metrics): prereq refactorings, tests (#5315)
Split off from #5297.

There should be no functional changes here:
- refactor tenant metric "production" like previously timeline, allows
unit testing, though not interesting enough yet to test
- introduce type aliases for tuples
- extra refactoring for `collect`, was initially thinking it was useful
but will do a inline later
- shorter binding names
- support for future allocation reuse quests with IdempotencyKey
- move code out of tokio::select to make it rustfmt-able
- generification, allow later replacement of `&'static str` with enum
- add tests that assert sent event contents exactly
2023-09-15 19:44:14 +03:00
Konstantin Knizhnik
66fa176cc8 Handle update of VM in XLOG_HEAP_LOCK/XLOG_HEAP2_LOCK_UPDATED WAL records (#4896)
## Problem

VM should be updated if XLH_LOCK_ALL_FROZEN_CLEARED flags is set in
XLOG_HEAP_LOCK,XLOG_HEAP_2_LOCK_UPDATED WAL records

## Summary of changes

Add handling of this records in walingest.rs

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2023-09-15 17:47:29 +03:00
Heikki Linnakangas
9e6b5b686c Add a test case for "CREATE DATABASE STRATEGY=file_copy". (#5301)
It was utterly broken on v15 before commit 83e7e5dbbd, which fixed the
incorrect definition of XLOG_DBASE_CREATE_WAL_LOG. We never noticed
because we had no tests for it.
2023-09-15 16:50:57 +03:00
Rahul Modpur
e6985bd098 Move tenant & timeline dir method to NeonPageserver and use them everywhere (#5262)
## Problem
In many places in test code, paths are built manually from what
NeonEnv.tenant_dir and NeonEnv.timeline_dir could do.

## Summary of changes
1. NeonEnv.tenant_dir and NeonEnv.timeline_dir moved under class
NeonPageserver as the path they use is per-pageserver instance.
2. Used these everywhere to replace manual path building

Closes #5258

---------

Signed-off-by: Rahul Modpur <rmodpur2@gmail.com>
2023-09-15 11:17:18 +01:00
Konstantin Knizhnik
e400a38fb9 References to old and new blocks were mixed in xlog_heap_update handler (#5312)
## Problem

See https://neondb.slack.com/archives/C05L7D1JAUS/p1694614585955029

https://www.notion.so/neondatabase/Duplicate-key-issue-651627ce843c45188fbdcb2d30fd2178

## Summary of changes

Swap old/new block references

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
2023-09-15 10:32:25 +03:00
Alexander Bayandin
bd36d1c44a approved-for-ci-run.yml: fix variable name and permissions (#5307)
## Problem
- `gh pr list` fails with `unknown argument "main"; please quote all
values that have spaces due to using a variable with the wrong name
- `permissions: write-all` are too wide for the job

## Summary of changes
- For variable name `HEAD` -> `BRANCH`
- Grant only required permissions for each job

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-09-14 20:18:49 +03:00
Alexander Bayandin
0501b74f55 Update checksum for pg_hint_plan (#5309)
## Problem

The checksum for `pg_hint_plan` doesn't match:
```
sha256sum: WARNING: 1 computed checksum did NOT match
```

Ref
https://github.com/neondatabase/neon/actions/runs/6185715461/job/16793609251?pr=5307

It seems that the release was retagged yesterday:
https://github.com/ossc-db/pg_hint_plan/releases/tag/REL16_1_6_0

I don't see any malicious changes from 15_1.5.1:
https://github.com/ossc-db/pg_hint_plan/compare/REL15_1_5_1...REL16_1_6_0,
so it should be ok to update.

## Summary of changes
- Update checksum for `pg_hint_plan` 16_1.6.0
2023-09-14 18:17:50 +03:00
40 changed files with 2193 additions and 1104 deletions

View File

@@ -16,21 +16,29 @@ on:
# Actual magic happens here:
- labeled
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PR_NUMBER: ${{ github.event.pull_request.number }}
BRANCH: "ci-run/pr-${{ github.event.pull_request.number }}"
permissions: write-all
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
permissions: {}
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
defaults:
run:
shell: bash -euo pipefail {0}
jobs:
remove-label:
# Remove `approved-for-ci-run` label if the workflow is triggered by changes in a PR.
# The PR should be reviewed and labelled manually again.
permissions:
pull-requests: write # For `gh pr edit`
if: |
contains(fromJSON('["opened", "synchronize", "reopened", "closed"]'), github.event.action) &&
contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
@@ -43,6 +51,10 @@ jobs:
create-or-update-pr-for-ci-run:
# Create local PR for an `approved-for-ci-run` labelled PR to run CI pipeline in it.
permissions:
pull-requests: write # for `gh pr edit`
# For `git push` and `gh pr create` we use CI_ACCESS_TOKEN
if: |
github.event.action == 'labeled' &&
contains(github.event.pull_request.labels.*.name, 'approved-for-ci-run')
@@ -75,7 +87,7 @@ jobs:
Feel free to review/comment/discuss the original PR #${PR_NUMBER}.
EOF
ALREADY_CREATED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${HEAD} --base main --json number --jq '.[].number')"
ALREADY_CREATED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${BRANCH} --base main --json number --jq '.[].number')"
if [ -z "${ALREADY_CREATED}" ]; then
gh pr --repo "${GITHUB_REPOSITORY}" create --title "CI run for PR #${PR_NUMBER}" \
--body-file "body.md" \
@@ -87,6 +99,10 @@ jobs:
cleanup:
# Close PRs and delete branchs if the original PR is closed.
permissions:
contents: write # for `--delete-branch` flag in `gh pr close`
pull-requests: write # for `gh pr close`
if: |
github.event.action == 'closed' &&
github.event.pull_request.head.repo.full_name != github.repository
@@ -94,8 +110,9 @@ jobs:
runs-on: ubuntu-latest
steps:
- run: |
CLOSED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${HEAD} --json 'closed' --jq '.[].closed')"
- name: Close PR and delete `ci-run/pr-${{ env.PR_NUMBER }}` branch
run: |
CLOSED="$(gh pr --repo ${GITHUB_REPOSITORY} list --head ${BRANCH} --json 'closed' --jq '.[].closed')"
if [ "${CLOSED}" == "false" ]; then
gh pr --repo "${GITHUB_REPOSITORY}" close "${BRANCH}" --delete-branch
fi

View File

@@ -416,7 +416,7 @@ RUN case "${PG_VERSION}" in \
;; \
"v16") \
export PG_HINT_PLAN_VERSION=16_1_6_0 \
export PG_HINT_PLAN_CHECKSUM=ce6a8040c78012000f5da7240caf6a971401412f41d33f930f09291e6c304b99 \
export PG_HINT_PLAN_CHECKSUM=fc85a9212e7d2819d4ae4ac75817481101833c3cfa9f0fe1f980984e12347d00 \
;; \
*) \
echo "Export the valid PG_HINT_PLAN_VERSION variable" && exit 1 \

View File

@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use rand::Rng;
use serde::Serialize;
#[derive(Serialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[serde(tag = "type")]
pub enum EventType {
#[serde(rename = "absolute")]
@@ -27,7 +27,8 @@ impl EventType {
}
pub fn incremental_timerange(&self) -> Option<std::ops::Range<&DateTime<Utc>>> {
// these can most likely be thought of as Range or RangeFull
// these can most likely be thought of as Range or RangeFull, at least pageserver creates
// incremental ranges where the stop and next start are equal.
use EventType::*;
match self {
Incremental {
@@ -41,15 +42,25 @@ impl EventType {
pub fn is_incremental(&self) -> bool {
matches!(self, EventType::Incremental { .. })
}
/// Returns the absolute time, or for incremental ranges, the stop time.
pub fn recorded_at(&self) -> &DateTime<Utc> {
use EventType::*;
match self {
Absolute { time } => time,
Incremental { stop_time, .. } => stop_time,
}
}
}
#[derive(Serialize, Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub struct Event<Extra> {
pub struct Event<Extra, Metric: Serialize> {
#[serde(flatten)]
#[serde(rename = "type")]
pub kind: EventType,
pub metric: &'static str,
pub metric: Metric,
pub idempotency_key: String,
pub value: u64,
@@ -58,12 +69,38 @@ pub struct Event<Extra> {
}
pub fn idempotency_key(node_id: &str) -> String {
format!(
"{}-{}-{:04}",
Utc::now(),
node_id,
rand::thread_rng().gen_range(0..=9999)
)
IdempotencyKey::generate(node_id).to_string()
}
/// Downstream users will use these to detect upload retries.
pub struct IdempotencyKey<'a> {
now: chrono::DateTime<Utc>,
node_id: &'a str,
nonce: u16,
}
impl std::fmt::Display for IdempotencyKey<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{}-{:04}", self.now, self.node_id, self.nonce)
}
}
impl<'a> IdempotencyKey<'a> {
pub fn generate(node_id: &'a str) -> Self {
IdempotencyKey {
now: Utc::now(),
node_id,
nonce: rand::thread_rng().gen_range(0..=9999),
}
}
pub fn for_tests(now: DateTime<Utc>, node_id: &'a str, nonce: u16) -> Self {
IdempotencyKey {
now,
node_id,
nonce,
}
}
}
pub const CHUNK_SIZE: usize = 1000;

View File

@@ -137,9 +137,12 @@ pub const XLOG_HEAP_INSERT: u8 = 0x00;
pub const XLOG_HEAP_DELETE: u8 = 0x10;
pub const XLOG_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_HEAP_HOT_UPDATE: u8 = 0x40;
pub const XLOG_HEAP_LOCK: u8 = 0x60;
pub const XLOG_HEAP_INIT_PAGE: u8 = 0x80;
pub const XLOG_HEAP2_VISIBLE: u8 = 0x40;
pub const XLOG_HEAP2_MULTI_INSERT: u8 = 0x50;
pub const XLOG_HEAP2_LOCK_UPDATED: u8 = 0x60;
pub const XLH_LOCK_ALL_FROZEN_CLEARED: u8 = 0x01;
pub const XLH_INSERT_ALL_FROZEN_SET: u8 = (1 << 5) as u8;
pub const XLH_INSERT_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;
pub const XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED: u8 = (1 << 0) as u8;

View File

@@ -80,11 +80,11 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tempfile.workspace = true
[dev-dependencies]
criterion.workspace = true
hex-literal.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
[[bench]]

View File

@@ -518,6 +518,9 @@ fn start_pageserver(
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
@@ -544,6 +547,7 @@ fn start_pageserver(
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
local_disk_storage,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))

View File

@@ -64,7 +64,7 @@ pub mod defaults {
super::ConfigurableSemaphore::DEFAULT_INITIAL.get();
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "0s";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";

View File

@@ -1,188 +1,54 @@
//!
//! Periodically collect consumption metrics for all active tenants
//! and push them to a HTTP endpoint.
//! Cache metrics to send only the updated ones.
//!
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use anyhow;
use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
use reqwest::Url;
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tracing::*;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::id::NodeId;
mod metrics;
use metrics::{Ids, MetricsKey};
mod disk_cache;
mod upload;
const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
#[serde_as]
#[derive(Serialize, Debug, Clone, Copy)]
struct Ids {
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
timeline_id: Option<TimelineId>,
}
/// Basically a key-value pair, but usually in a Vec except for [`Cache`].
///
/// This is as opposed to `consumption_metrics::Event` which is the externally communicated form.
/// Difference is basically the missing idempotency key, which lives only for the duration of
/// upload attempts.
type RawMetric = (MetricsKey, (EventType, u64));
/// Key that uniquely identifies the object, this metric describes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MetricsKey {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
metric: &'static str,
}
impl MetricsKey {
const fn absolute_values(self) -> AbsoluteValueFactory {
AbsoluteValueFactory(self)
}
const fn incremental_values(self) -> IncrementalValueFactory {
IncrementalValueFactory(self)
}
}
/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);
impl AbsoluteValueFactory {
fn at(self, time: DateTime<Utc>, val: u64) -> (MetricsKey, (EventType, u64)) {
let key = self.0;
(key, (EventType::Absolute { time }, val))
}
}
/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);
impl IncrementalValueFactory {
#[allow(clippy::wrong_self_convention)]
fn from_previous_up_to(
self,
prev_end: DateTime<Utc>,
up_to: DateTime<Utc>,
val: u64,
) -> (MetricsKey, (EventType, u64)) {
let key = self.0;
// cannot assert prev_end < up_to because these are realtime clock based
(
key,
(
EventType::Incremental {
start_time: prev_end,
stop_time: up_to,
},
val,
),
)
}
fn key(&self) -> &MetricsKey {
&self.0
}
}
// the static part of a MetricsKey
impl MetricsKey {
/// Absolute value of [`Timeline::get_last_record_lsn`].
///
/// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "written_size",
}
.absolute_values()
}
/// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
/// previously sent, starting from the previously sent incremental time range ending at the
/// latest absolute measurement.
const fn written_size_delta(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> IncrementalValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
// the name here is correctly about data not size, because that is what is wanted by
// downstream pipeline
metric: "written_data_bytes_delta",
}
.incremental_values()
}
/// Exact [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
const fn timeline_logical_size(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: "timeline_logical_size",
}
.absolute_values()
}
/// [`Tenant::remote_size`]
///
/// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "remote_storage_size",
}
.absolute_values()
}
/// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
///
/// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "resident_size",
}
.absolute_values()
}
/// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
///
/// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: "synthetic_storage_size",
}
.absolute_values()
}
}
/// Caches the [`RawMetric`]s
///
/// In practice, during startup, last sent values are stored here to be used in calculating new
/// ones. After successful uploading, the cached values are updated to cache. This used to be used
/// for deduplication, but that is no longer needed.
type Cache = HashMap<MetricsKey, (EventType, u64)>;
/// Main thread that serves metrics collection
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_interval: Duration,
cached_metric_collection_interval: Duration,
_cached_metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration,
node_id: NodeId,
local_disk_storage: PathBuf,
ctx: RequestContext,
) -> anyhow::Result<()> {
let mut ticker = tokio::time::interval(metric_collection_interval);
info!("starting collect_metrics");
if _cached_metric_collection_interval != Duration::ZERO {
tracing::warn!(
"cached_metric_collection_interval is no longer used, please set it to zero."
)
}
// spin up background worker that caclulates tenant sizes
let worker_ctx =
@@ -202,543 +68,218 @@ pub async fn collect_metrics(
},
);
let final_path: Arc<PathBuf> = Arc::new(local_disk_storage);
let cancel = task_mgr::shutdown_token();
let restore_and_reschedule = restore_and_reschedule(&final_path, metric_collection_interval);
let mut cached_metrics = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
ret = restore_and_reschedule => ret,
};
// define client here to reuse it for all requests
let client = reqwest::ClientBuilder::new()
.timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
.build()
.expect("Failed to create http client with timeout");
let mut cached_metrics = HashMap::new();
let mut prev_iteration_time: std::time::Instant = std::time::Instant::now();
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("collect_metrics received cancellation request");
return Ok(());
},
tick_at = ticker.tick() => {
// send cached metrics every cached_metric_collection_interval
let send_cached = prev_iteration_time.elapsed() >= cached_metric_collection_interval;
if send_cached {
prev_iteration_time = std::time::Instant::now();
}
collect_metrics_iteration(&client, &mut cached_metrics, metric_collection_endpoint, node_id, &ctx, send_cached).await;
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
);
}
}
}
}
/// One iteration of metrics collection
///
/// Gather per-tenant and per-timeline metrics and send them to the `metric_collection_endpoint`.
/// Cache metrics to avoid sending the same metrics multiple times.
///
/// This function handles all errors internally
/// and doesn't break iteration if just one tenant fails.
///
/// TODO
/// - refactor this function (chunking+sending part) to reuse it in proxy module;
async fn collect_metrics_iteration(
client: &reqwest::Client,
cached_metrics: &mut HashMap<MetricsKey, (EventType, u64)>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
send_cached: bool,
) {
let mut current_metrics: Vec<(MetricsKey, (EventType, u64))> = Vec::new();
trace!(
"starting collect_metrics_iteration. metric_collection_endpoint: {}",
metric_collection_endpoint
);
// get list of tenants
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(err) => {
error!("failed to list tenants: {:?}", err);
return;
}
};
// iterate through list of Active tenants and collect metrics
for (tenant_id, tenant_state) in tenants {
if tenant_state != TenantState::Active {
continue;
}
let tenant = match mgr::get_tenant(tenant_id, true).await {
Ok(tenant) => tenant,
Err(err) => {
// It is possible that tenant was deleted between
// `list_tenants` and `get_tenant`, so just warn about it.
warn!("failed to get tenant {tenant_id:?}: {err:?}");
continue;
}
};
let mut tenant_resident_size = 0;
// iterate through list of timelines in tenant
for timeline in tenant.list_timelines() {
// collect per-timeline metrics only for active timelines
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
Ok(Some(snap)) => {
snap.to_metrics(
tenant_id,
timeline_id,
Utc::now(),
&mut current_metrics,
cached_metrics,
);
}
Ok(None) => {}
Err(e) => {
error!(
"failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
timeline.timeline_id
);
continue;
}
}
tenant_resident_size += timeline.resident_physical_size();
}
current_metrics
.push(MetricsKey::remote_storage_size(tenant_id).at(Utc::now(), tenant.remote_size()));
current_metrics
.push(MetricsKey::resident_size(tenant_id).at(Utc::now(), tenant_resident_size));
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
let synthetic_size = tenant.cached_synthetic_size();
if synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
current_metrics
.push(MetricsKey::synthetic_size(tenant_id).at(Utc::now(), synthetic_size));
}
}
// Filter metrics, unless we want to send all metrics, including cached ones.
// See: https://github.com/neondatabase/neon/issues/3485
if !send_cached {
current_metrics.retain(|(curr_key, (kind, curr_val))| {
if kind.is_incremental() {
// incremental values (currently only written_size_delta) should not get any cache
// deduplication because they will be used by upstream for "is still alive."
true
} else {
match cached_metrics.get(curr_key) {
Some((_, val)) => val != curr_val,
None => true,
}
}
});
}
if current_metrics.is_empty() {
trace!("no new metrics to send");
return;
}
// Send metrics.
// Split into chunks of 1000 metrics to avoid exceeding the max request size
let chunks = current_metrics.chunks(CHUNK_SIZE);
let mut chunk_to_send: Vec<Event<Ids>> = Vec::with_capacity(CHUNK_SIZE);
let node_id = node_id.to_string();
for chunk in chunks {
chunk_to_send.clear();
// reminder: ticker is ready immediatedly
let mut ticker = tokio::time::interval(metric_collection_interval);
// enrich metrics with type,timestamp and idempotency key before sending
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
metric: curr_key.metric,
idempotency_key: idempotency_key(&node_id),
value: *curr_val,
extra: Ids {
tenant_id: curr_key.tenant_id,
timeline_id: curr_key.timeline_id,
},
}));
loop {
let tick_at = tokio::select! {
_ = cancel.cancelled() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
const MAX_RETRIES: u32 = 3;
// these are point in time, with variable "now"
let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
for attempt in 0..MAX_RETRIES {
let res = client
.post(metric_collection_endpoint.clone())
.json(&EventChunk {
events: (&chunk_to_send).into(),
})
.send()
.await;
if metrics.is_empty() {
continue;
}
match res {
Ok(res) => {
if res.status().is_success() {
// update cached metrics after they were sent successfully
for (curr_key, curr_val) in chunk.iter() {
cached_metrics.insert(curr_key.clone(), *curr_val);
}
} else {
error!("metrics endpoint refused the sent metrics: {:?}", res);
for metric in chunk_to_send
.iter()
.filter(|metric| metric.value > (1u64 << 40))
{
// Report if the metric value is suspiciously large
error!("potentially abnormal metric value: {:?}", metric);
}
}
break;
let metrics = Arc::new(metrics);
// why not race cancellation here? because we are one of the last tasks, and if we are
// already here, better to try to flush the new values.
let flush = async {
match disk_cache::flush_metrics_to_disk(&metrics, &final_path).await {
Ok(()) => {
tracing::debug!("flushed metrics to disk");
}
Err(err) if err.is_timeout() => {
error!(attempt, "timeout sending metrics, retrying immediately");
continue;
}
Err(err) => {
error!(attempt, ?err, "failed to send metrics");
break;
Err(e) => {
// idea here is that if someone creates a directory as our final_path, then they
// might notice it from the logs before shutdown and remove it
tracing::error!("failed to persist metrics to {final_path:?}: {e:#}");
}
}
};
let upload = async {
let res = upload::upload_metrics(
&client,
metric_collection_endpoint,
&cancel,
&node_id,
&metrics,
&mut cached_metrics,
)
.await;
if let Err(e) = res {
// serialization error which should never happen
tracing::error!("failed to upload due to {e:#}");
}
};
// let these run concurrently
let (_, _) = tokio::join!(flush, upload);
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
metric_collection_interval,
"consumption_metrics_collect_metrics",
);
}
}
/// Called on the first iteration in an attempt to join the metric uploading schedule from previous
/// pageserver session. Pageserver is supposed to upload at intervals regardless of restarts.
///
/// Cancellation safe.
async fn restore_and_reschedule(
final_path: &Arc<PathBuf>,
metric_collection_interval: Duration,
) -> Cache {
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(final_path.clone())
.await
{
Ok(found_some) => {
// there is no min needed because we write these sequentially in
// collect_all_metrics
let earlier_metric_at = found_some
.iter()
.map(|(_, (et, _))| et.recorded_at())
.copied()
.next();
let cached = found_some.into_iter().collect::<Cache>();
(cached, earlier_metric_at)
}
Err(e) => {
use std::io::{Error, ErrorKind};
let root = e.root_cause();
let maybe_ioerr = root.downcast_ref::<Error>();
let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
if !is_not_found {
tracing::info!("failed to read any previous metrics from {final_path:?}: {e:#}");
}
(HashMap::new(), None)
}
};
if let Some(earlier_metric_at) = earlier_metric_at {
let earlier_metric_at: SystemTime = earlier_metric_at.into();
let error = reschedule(earlier_metric_at, metric_collection_interval).await;
if let Some(error) = error {
if error.as_secs() >= 60 {
tracing::info!(
error_ms = error.as_millis(),
"startup scheduling error due to restart"
)
}
}
}
cached
}
/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
loaded_at: (Lsn, SystemTime),
last_record_lsn: Lsn,
current_exact_logical_size: Option<u64>,
}
async fn reschedule(
earlier_metric_at: SystemTime,
metric_collection_interval: Duration,
) -> Option<Duration> {
let now = SystemTime::now();
match now.duration_since(earlier_metric_at) {
Ok(from_last_send) if from_last_send < metric_collection_interval => {
let sleep_for = metric_collection_interval - from_last_send;
impl TimelineSnapshot {
/// Collect the metrics from an actual timeline.
///
/// Fails currently only when [`Timeline::get_current_logical_size`] fails.
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
fn collect(
t: &Arc<crate::tenant::Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Option<Self>> {
use anyhow::Context;
let deadline = std::time::Instant::now() + sleep_for;
if !t.is_active() {
// no collection for broken or stopping needed, we will still keep the cached values
// though at the caller.
Ok(None)
} else {
let loaded_at = t.loaded_at;
let last_record_lsn = t.get_last_record_lsn();
tokio::time::sleep_until(deadline.into()).await;
let current_exact_logical_size = {
let span = info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
let res = span
.in_scope(|| t.get_current_logical_size(ctx))
.context("get_current_logical_size");
match res? {
// Only send timeline logical size when it is fully calculated.
(size, is_exact) if is_exact => Some(size),
(_, _) => None,
}
};
let now = std::time::Instant::now();
Ok(Some(TimelineSnapshot {
loaded_at,
last_record_lsn,
current_exact_logical_size,
}))
}
}
/// Produce the timeline consumption metrics into the `metrics` argument.
fn to_metrics(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
metrics: &mut Vec<(MetricsKey, (EventType, u64))>,
cache: &HashMap<MetricsKey, (EventType, u64)>,
) {
let timeline_written_size = u64::from(self.last_record_lsn);
let (key, written_size_now) =
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
// last_record_lsn can only go up, right now at least, TODO: #2592 or related
// features might change this.
let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
// use this when available, because in a stream of incremental values, it will be
// accurate where as when last_record_lsn stops moving, we will only cache the last
// one of those.
let last_stop_time = cache
.get(written_size_delta_key.key())
.map(|(until, _val)| {
until
.incremental_timerange()
.expect("never create EventType::Absolute for written_size_delta")
.end
});
// by default, use the last sent written_size as the basis for
// calculating the delta. if we don't yet have one, use the load time value.
let prev = cache
.get(&key)
.map(|(prev_at, prev)| {
// use the prev time from our last incremental update, or default to latest
// absolute update on the first round.
let prev_at = prev_at
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let prev_at = last_stop_time.unwrap_or(prev_at);
(*prev_at, *prev)
})
.unwrap_or_else(|| {
// if we don't have a previous point of comparison, compare to the load time
// lsn.
let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
(DateTime::from(*loaded_at), disk_consistent_lsn.0)
});
// written_size_bytes_delta
metrics.extend(
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let up_to = written_size_now
.0
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let key_value = written_size_delta_key.from_previous_up_to(prev.0, *up_to, delta);
Some(key_value)
// executor threads might be busy, add extra measurements
Some(if now < deadline {
deadline - now
} else {
None
},
);
// written_size
metrics.push((key, written_size_now));
if let Some(size) = self.current_exact_logical_size {
metrics.push(MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, size));
now - deadline
})
}
Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
Err(_) => {
tracing::warn!(
?now,
?earlier_metric_at,
"oldest recorded metric is in future; first values will come out with inconsistent timestamps"
);
earlier_metric_at.duration_since(now).ok()
}
}
}
/// Caclculate synthetic size for each active tenant
pub async fn calculate_synthetic_size_worker(
async fn calculate_synthetic_size_worker(
synthetic_size_calculation_interval: Duration,
ctx: &RequestContext,
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");
// reminder: ticker is ready immediatedly
let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
return Ok(());
},
tick_at = ticker.tick() => {
let tick_at = tokio::select! {
_ = task_mgr::shutdown_watcher() => return Ok(()),
tick_at = ticker.tick() => tick_at,
};
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(e) => {
warn!("cannot get tenant list: {e:#}");
continue;
}
};
// iterate through list of Active tenants and collect metrics
for (tenant_id, tenant_state) in tenants {
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(e) => {
warn!("cannot get tenant list: {e:#}");
continue;
}
};
if tenant_state != TenantState::Active {
continue;
}
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
{
if let Err(e) = tenant.calculate_synthetic_size(
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
ctx).await {
error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
}
}
for (tenant_id, tenant_state) in tenants {
if tenant_state != TenantState::Active {
continue;
}
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
}
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
synthetic_size_calculation_interval,
"consumption_metrics_synthetic_size_worker",
);
}
}
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::time::SystemTime;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::consumption_metrics::MetricsKey;
use super::TimelineSnapshot;
use chrono::{DateTime, Utc};
#[test]
fn startup_collected_timeline_metrics_before_advancing() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let mut metrics = Vec::new();
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
let now = DateTime::<Utc>::from(SystemTime::now());
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn startup_collected_timeline_metrics_second_round() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, before, init] = time_backwards();
let now = DateTime::<Utc>::from(now);
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, just_before, before, init] = time_backwards();
let now = DateTime::<Utc>::from(now);
let just_before = DateTime::<Utc>::from(just_before);
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache = HashMap::from([
// at t=before was the last time the last_record_lsn changed
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
// end time of this event is used for the next ones
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
before,
just_before,
0,
),
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
just_before,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
let mut times = [std::time::SystemTime::UNIX_EPOCH; N];
times[0] = std::time::SystemTime::now();
for behind in 1..N {
times[behind] = times[0] - std::time::Duration::from_secs(behind as u64);
}
times
crate::tenant::tasks::warn_when_period_overrun(
tick_at.elapsed(),
synthetic_size_calculation_interval,
"consumption_metrics_synthetic_size_worker",
);
}
}

View File

@@ -0,0 +1,66 @@
use anyhow::Context;
use std::path::PathBuf;
use std::sync::Arc;
use super::RawMetric;
pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result<Vec<RawMetric>> {
// do not add context to each error, callsite will log with full path
let span = tracing::Span::current();
tokio::task::spawn_blocking(move || {
let _e = span.entered();
let mut file = std::fs::File::open(&*path)?;
let reader = std::io::BufReader::new(&mut file);
anyhow::Ok(serde_json::from_reader::<_, Vec<RawMetric>>(reader)?)
})
.await
.context("read metrics join error")
.and_then(|x| x)
}
pub(super) async fn flush_metrics_to_disk(
current_metrics: &Arc<Vec<RawMetric>>,
final_path: &Arc<PathBuf>,
) -> anyhow::Result<()> {
use std::io::Write;
anyhow::ensure!(
final_path.parent().is_some(),
"path must have parent: {final_path:?}"
);
let span = tracing::Span::current();
tokio::task::spawn_blocking({
let current_metrics = current_metrics.clone();
let final_path = final_path.clone();
move || {
let _e = span.entered();
let mut tempfile =
tempfile::NamedTempFile::new_in(final_path.parent().expect("existence checked"))?;
// write out all of the raw metrics, to be read out later on restart as cached values
{
let mut writer = std::io::BufWriter::new(&mut tempfile);
serde_json::to_writer(&mut writer, &*current_metrics)
.context("serialize metrics")?;
writer
.into_inner()
.map_err(|_| anyhow::anyhow!("flushing metrics failed"))?;
}
tempfile.flush()?;
tempfile.as_file().sync_all()?;
drop(tempfile.persist(&*final_path)?);
let f = std::fs::File::open(final_path.parent().unwrap())?;
f.sync_all()?;
anyhow::Ok(())
}
})
.await
.with_context(|| format!("write metrics to {final_path:?} join error"))
.and_then(|x| x.with_context(|| format!("write metrics to {final_path:?}")))
}

View File

@@ -0,0 +1,470 @@
use crate::context::RequestContext;
use crate::tenant::mgr;
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
use futures::stream::StreamExt;
use pageserver_api::models::TenantState;
use serde::Serialize;
use serde_with::{serde_as, DisplayFromStr};
use std::sync::Arc;
use std::time::SystemTime;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use anyhow::Context;
use super::{Cache, RawMetric};
// FIXME: all other consumption_metrics::Event stuff is over at uploading, maybe move?
#[serde_as]
#[derive(Serialize, serde::Deserialize, Debug, Clone, Copy)]
pub(super) struct Ids {
#[serde_as(as = "DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
}
/// Name of the metric, used by `MetricsKey` factory methods and `deserialize_cached_events`
/// instead of static str.
// Do not rename any of these without first consulting with data team and partner
// management.
// FIXME: write those tests before refactoring to this!
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) enum Name {
/// Timeline last_record_lsn, absolute
#[serde(rename = "written_size")]
WrittenSize,
/// Timeline last_record_lsn, incremental
#[serde(rename = "written_data_bytes_delta")]
WrittenSizeDelta,
/// Timeline logical size
#[serde(rename = "timeline_logical_size")]
LogicalSize,
/// Tenant remote size
#[serde(rename = "remote_storage_size")]
RemoteSize,
/// Tenant resident size
#[serde(rename = "resident_size")]
ResidentSize,
/// Tenant synthetic size
#[serde(rename = "synthetic_storage_size")]
SyntheticSize,
}
/// Key that uniquely identifies the object this metric describes.
///
/// This is a denormalization done at the MetricsKey const methods; these should not be constructed
/// elsewhere.
#[serde_with::serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub(super) struct MetricsKey {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub(super) tenant_id: TenantId,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
#[serde(skip_serializing_if = "Option::is_none")]
pub(super) timeline_id: Option<TimelineId>,
pub(super) metric: Name,
}
impl MetricsKey {
const fn absolute_values(self) -> AbsoluteValueFactory {
AbsoluteValueFactory(self)
}
const fn incremental_values(self) -> IncrementalValueFactory {
IncrementalValueFactory(self)
}
}
/// Helper type which each individual metric kind can return to produce only absolute values.
struct AbsoluteValueFactory(MetricsKey);
impl AbsoluteValueFactory {
fn at(self, time: DateTime<Utc>, val: u64) -> RawMetric {
let key = self.0;
(key, (EventType::Absolute { time }, val))
}
fn key(&self) -> &MetricsKey {
&self.0
}
}
/// Helper type which each individual metric kind can return to produce only incremental values.
struct IncrementalValueFactory(MetricsKey);
impl IncrementalValueFactory {
#[allow(clippy::wrong_self_convention)]
fn from_previous_up_to(
self,
prev_end: DateTime<Utc>,
up_to: DateTime<Utc>,
val: u64,
) -> RawMetric {
let key = self.0;
// cannot assert prev_end < up_to because these are realtime clock based
(
key,
(
EventType::Incremental {
start_time: prev_end,
stop_time: up_to,
},
val,
),
)
}
fn key(&self) -> &MetricsKey {
&self.0
}
}
// the static part of a MetricsKey
impl MetricsKey {
/// Absolute value of [`Timeline::get_last_record_lsn`].
///
/// [`Timeline::get_last_record_lsn`]: crate::tenant::Timeline::get_last_record_lsn
const fn written_size(tenant_id: TenantId, timeline_id: TimelineId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::WrittenSize,
}
.absolute_values()
}
/// Values will be the difference of the latest [`MetricsKey::written_size`] to what we
/// previously sent, starting from the previously sent incremental time range ending at the
/// latest absolute measurement.
const fn written_size_delta(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> IncrementalValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::WrittenSizeDelta,
}
.incremental_values()
}
/// Exact [`Timeline::get_current_logical_size`].
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
const fn timeline_logical_size(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: Some(timeline_id),
metric: Name::LogicalSize,
}
.absolute_values()
}
/// [`Tenant::remote_size`]
///
/// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: Name::RemoteSize,
}
.absolute_values()
}
/// Sum of [`Timeline::resident_physical_size`] for each `Tenant`.
///
/// [`Timeline::resident_physical_size`]: crate::tenant::Timeline::resident_physical_size
const fn resident_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: Name::ResidentSize,
}
.absolute_values()
}
/// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
///
/// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
/// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
timeline_id: None,
metric: Name::SyntheticSize,
}
.absolute_values()
}
}
pub(super) async fn collect_all_metrics(
cached_metrics: &Cache,
ctx: &RequestContext,
) -> Vec<RawMetric> {
let started_at = std::time::Instant::now();
let tenants = match mgr::list_tenants().await {
Ok(tenants) => tenants,
Err(err) => {
tracing::error!("failed to list tenants: {:?}", err);
return vec![];
}
};
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
if state != TenantState::Active {
None
} else {
mgr::get_tenant(id, true)
.await
.ok()
.map(|tenant| (id, tenant))
}
});
let res = collect(tenants, cached_metrics, ctx).await;
tracing::info!(
elapsed_ms = started_at.elapsed().as_millis(),
total = res.len(),
"collected metrics"
);
res
}
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<RawMetric>
where
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
{
let mut current_metrics: Vec<RawMetric> = Vec::new();
let mut tenants = std::pin::pin!(tenants);
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
for timeline in tenant.list_timelines() {
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
Ok(Some(snap)) => {
snap.to_metrics(
tenant_id,
timeline_id,
Utc::now(),
&mut current_metrics,
cache,
);
}
Ok(None) => {}
Err(e) => {
tracing::error!(
"failed to get metrics values for tenant {tenant_id} timeline {}: {e:#?}",
timeline.timeline_id
);
continue;
}
}
tenant_resident_size += timeline.resident_physical_size();
}
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
}
current_metrics
}
/// Testing helping in-between abstraction allowing testing metrics without actual Tenants.
struct TenantSnapshot {
resident_size: u64,
remote_size: u64,
synthetic_size: u64,
}
impl TenantSnapshot {
/// Collect tenant status to have metrics created out of it.
///
/// `resident_size` is calculated of the timelines we had access to for other metrics, so we
/// cannot just list timelines here.
fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
TenantSnapshot {
resident_size,
remote_size: t.remote_size(),
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
synthetic_size: t.cached_synthetic_size(),
}
}
fn to_metrics(
&self,
tenant_id: TenantId,
now: DateTime<Utc>,
cached: &Cache,
metrics: &mut Vec<RawMetric>,
) {
let remote_size = MetricsKey::remote_storage_size(tenant_id).at(now, self.remote_size);
let resident_size = MetricsKey::resident_size(tenant_id).at(now, self.resident_size);
let synthetic_size = {
let factory = MetricsKey::synthetic_size(tenant_id);
let mut synthetic_size = self.synthetic_size;
if synthetic_size == 0 {
if let Some((_, value)) = cached.get(factory.key()) {
// use the latest value from previous session
synthetic_size = *value;
}
}
if synthetic_size != 0 {
// only send non-zeroes because otherwise these show up as errors in logs
Some(factory.at(now, synthetic_size))
} else {
None
}
};
metrics.extend(
[Some(remote_size), Some(resident_size), synthetic_size]
.into_iter()
.flatten(),
);
}
}
/// Internal type to make timeline metric production testable.
///
/// As this value type contains all of the information needed from a timeline to produce the
/// metrics, it can easily be created with different values in test.
struct TimelineSnapshot {
loaded_at: (Lsn, SystemTime),
last_record_lsn: Lsn,
current_exact_logical_size: Option<u64>,
}
impl TimelineSnapshot {
/// Collect the metrics from an actual timeline.
///
/// Fails currently only when [`Timeline::get_current_logical_size`] fails.
///
/// [`Timeline::get_current_logical_size`]: crate::tenant::Timeline::get_current_logical_size
fn collect(
t: &Arc<crate::tenant::Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<Option<Self>> {
if !t.is_active() {
// no collection for broken or stopping needed, we will still keep the cached values
// though at the caller.
Ok(None)
} else {
let loaded_at = t.loaded_at;
let last_record_lsn = t.get_last_record_lsn();
let current_exact_logical_size = {
let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
let res = span
.in_scope(|| t.get_current_logical_size(ctx))
.context("get_current_logical_size");
match res? {
// Only send timeline logical size when it is fully calculated.
(size, is_exact) if is_exact => Some(size),
(_, _) => None,
}
};
Ok(Some(TimelineSnapshot {
loaded_at,
last_record_lsn,
current_exact_logical_size,
}))
}
}
/// Produce the timeline consumption metrics into the `metrics` argument.
fn to_metrics(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
now: DateTime<Utc>,
metrics: &mut Vec<RawMetric>,
cache: &Cache,
) {
let timeline_written_size = u64::from(self.last_record_lsn);
let written_size_delta_key = MetricsKey::written_size_delta(tenant_id, timeline_id);
let last_stop_time = cache
.get(written_size_delta_key.key())
.map(|(until, _val)| {
until
.incremental_timerange()
.expect("never create EventType::Absolute for written_size_delta")
.end
});
let (key, written_size_now) =
MetricsKey::written_size(tenant_id, timeline_id).at(now, timeline_written_size);
// by default, use the last sent written_size as the basis for
// calculating the delta. if we don't yet have one, use the load time value.
let prev = cache
.get(&key)
.map(|(prev_at, prev)| {
// use the prev time from our last incremental update, or default to latest
// absolute update on the first round.
let prev_at = prev_at
.absolute_time()
.expect("never create EventType::Incremental for written_size");
let prev_at = last_stop_time.unwrap_or(prev_at);
(*prev_at, *prev)
})
.unwrap_or_else(|| {
// if we don't have a previous point of comparison, compare to the load time
// lsn.
let (disk_consistent_lsn, loaded_at) = &self.loaded_at;
(DateTime::from(*loaded_at), disk_consistent_lsn.0)
});
let up_to = now;
if let Some(delta) = written_size_now.1.checked_sub(prev.1) {
let key_value = written_size_delta_key.from_previous_up_to(prev.0, up_to, delta);
// written_size_delta
metrics.push(key_value);
// written_size
metrics.push((key, written_size_now));
} else {
// the cached value was ahead of us, report zero until we've caught up
metrics.push(written_size_delta_key.from_previous_up_to(prev.0, up_to, 0));
// the cached value was ahead of us, report the same until we've caught up
metrics.push((key, (written_size_now.0, prev.1)));
}
{
let factory = MetricsKey::timeline_logical_size(tenant_id, timeline_id);
let current_or_previous = self
.current_exact_logical_size
.or_else(|| cache.get(factory.key()).map(|(_, val)| *val));
if let Some(size) = current_or_previous {
metrics.push(factory.at(now, size));
}
}
}
}
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,361 @@
use std::collections::HashMap;
use std::time::SystemTime;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use super::*;
use chrono::{DateTime, Utc};
#[test]
fn startup_collected_timeline_metrics_before_advancing() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let mut metrics = Vec::new();
let cache = HashMap::new();
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, SystemTime::now()),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
let now = DateTime::<Utc>::from(SystemTime::now());
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
snap.loaded_at.1.into(),
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn startup_collected_timeline_metrics_second_round() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, before, init] = time_backwards();
let now = DateTime::<Utc>::from(now);
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0)
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(before, now, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn startup_collected_timeline_metrics_nth_round_at_same_lsn() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, just_before, before, init] = time_backwards();
let now = DateTime::<Utc>::from(now);
let just_before = DateTime::<Utc>::from(just_before);
let before = DateTime::<Utc>::from(before);
let initdb_lsn = Lsn(0x10000);
let disk_consistent_lsn = Lsn(initdb_lsn.0 * 2);
let mut metrics = Vec::new();
let cache = HashMap::from([
// at t=before was the last time the last_record_lsn changed
MetricsKey::written_size(tenant_id, timeline_id).at(before, disk_consistent_lsn.0),
// end time of this event is used for the next ones
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
before,
just_before,
0,
),
]);
let snap = TimelineSnapshot {
loaded_at: (disk_consistent_lsn, init),
last_record_lsn: disk_consistent_lsn,
current_exact_logical_size: Some(0x42000),
};
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
just_before,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, disk_consistent_lsn.0),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0x42000)
]
);
}
#[test]
fn metric_image_stability() {
// it is important that these strings stay as they are
let tenant_id = TenantId::from_array([0; 16]);
let timeline_id = TimelineId::from_array([0xff; 16]);
let now = DateTime::parse_from_rfc3339("2023-09-15T00:00:00.123456789Z").unwrap();
let before = DateTime::parse_from_rfc3339("2023-09-14T00:00:00.123456789Z").unwrap();
let [now, before] = [DateTime::<Utc>::from(now), DateTime::from(before)];
let examples = [
(
line!(),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 0),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"written_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(before, now, 0),
r#"{"type":"incremental","start_time":"2023-09-14T00:00:00.123456789Z","stop_time":"2023-09-15T00:00:00.123456789Z","metric":"written_data_bytes_delta","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 0),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"timeline_logical_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000","timeline_id":"ffffffffffffffffffffffffffffffff"}"#,
),
(
line!(),
MetricsKey::remote_storage_size(tenant_id).at(now, 0),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"remote_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
MetricsKey::resident_size(tenant_id).at(now, 0),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"resident_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":0,"tenant_id":"00000000000000000000000000000000"}"#,
),
(
line!(),
MetricsKey::synthetic_size(tenant_id).at(now, 1),
r#"{"type":"absolute","time":"2023-09-15T00:00:00.123456789Z","metric":"synthetic_storage_size","idempotency_key":"2023-09-15 00:00:00.123456789 UTC-1-0000","value":1,"tenant_id":"00000000000000000000000000000000"}"#,
),
];
let idempotency_key = consumption_metrics::IdempotencyKey::for_tests(now, "1", 0);
for (line, (key, (kind, value)), expected) in examples {
let e = consumption_metrics::Event {
kind,
metric: key.metric,
idempotency_key: idempotency_key.to_string(),
value,
extra: Ids {
tenant_id: key.tenant_id,
timeline_id: key.timeline_id,
},
};
let actual = serde_json::to_string(&e).unwrap();
assert_eq!(expected, actual, "example from line {line}");
}
}
#[test]
fn post_restart_written_sizes_with_rolled_back_last_record_lsn() {
// it can happen that we lose the inmemorylayer but have previously sent metrics and we
// should never go backwards
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [later, now, at_restart] = time_backwards();
// FIXME: tests would be so much easier if we did not need to juggle back and forth
// SystemTime and DateTime::<Utc> ... Could do the conversion only at upload time?
let now = DateTime::<Utc>::from(now);
let later = DateTime::<Utc>::from(later);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let way_before = before_restart - std::time::Duration::from_secs(10 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let way_before = DateTime::<Utc>::from(way_before);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
};
let mut cache = HashMap::from([
MetricsKey::written_size(tenant_id, timeline_id).at(before_restart, 100),
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
way_before,
before_restart,
// not taken into account, but the timestamps are important
999_999_999,
),
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id).from_previous_up_to(
before_restart,
now,
0
),
MetricsKey::written_size(tenant_id, timeline_id).at(now, 100),
]
);
// now if we cache these metrics, and re-run while "still in recovery"
cache.extend(metrics.drain(..));
// "still in recovery", because our snapshot did not change
snap.to_metrics(tenant_id, timeline_id, later, &mut metrics, &cache);
assert_eq!(
metrics,
&[
MetricsKey::written_size_delta(tenant_id, timeline_id)
.from_previous_up_to(now, later, 0),
MetricsKey::written_size(tenant_id, timeline_id).at(later, 100),
]
);
}
#[test]
fn post_restart_current_exact_logical_size_uses_cached() {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let [now, at_restart] = time_backwards();
let now = DateTime::<Utc>::from(now);
let before_restart = at_restart - std::time::Duration::from_secs(5 * 60);
let before_restart = DateTime::<Utc>::from(before_restart);
let snap = TimelineSnapshot {
loaded_at: (Lsn(50), at_restart),
last_record_lsn: Lsn(50),
current_exact_logical_size: None,
};
let cache = HashMap::from([
MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(before_restart, 100)
]);
let mut metrics = Vec::new();
snap.to_metrics(tenant_id, timeline_id, now, &mut metrics, &cache);
metrics.retain(|(key, _)| key.metric == Name::LogicalSize);
assert_eq!(
metrics,
&[MetricsKey::timeline_logical_size(tenant_id, timeline_id).at(now, 100)]
);
}
#[test]
fn post_restart_synthetic_size_uses_cached_if_available() {
let tenant_id = TenantId::generate();
let ts = TenantSnapshot {
resident_size: 1000,
remote_size: 1000,
// not yet calculated
synthetic_size: 0,
};
let now = SystemTime::now();
let before_restart = DateTime::<Utc>::from(now - std::time::Duration::from_secs(5 * 60));
let now = DateTime::<Utc>::from(now);
let cached = HashMap::from([MetricsKey::synthetic_size(tenant_id).at(before_restart, 1000)]);
let mut metrics = Vec::new();
ts.to_metrics(tenant_id, now, &cached, &mut metrics);
assert_eq!(
metrics,
&[
MetricsKey::remote_storage_size(tenant_id).at(now, 1000),
MetricsKey::resident_size(tenant_id).at(now, 1000),
MetricsKey::synthetic_size(tenant_id).at(now, 1000),
]
);
}
#[test]
fn post_restart_synthetic_size_is_not_sent_when_not_cached() {
let tenant_id = TenantId::generate();
let ts = TenantSnapshot {
resident_size: 1000,
remote_size: 1000,
// not yet calculated
synthetic_size: 0,
};
let now = SystemTime::now();
let now = DateTime::<Utc>::from(now);
let cached = HashMap::new();
let mut metrics = Vec::new();
ts.to_metrics(tenant_id, now, &cached, &mut metrics);
assert_eq!(
metrics,
&[
MetricsKey::remote_storage_size(tenant_id).at(now, 1000),
MetricsKey::resident_size(tenant_id).at(now, 1000),
// no synthetic size here
]
);
}
fn time_backwards<const N: usize>() -> [std::time::SystemTime; N] {
let mut times = [std::time::SystemTime::UNIX_EPOCH; N];
times[0] = std::time::SystemTime::now();
for behind in 1..N {
times[behind] = times[0] - std::time::Duration::from_secs(behind as u64);
}
times
}

View File

@@ -0,0 +1,177 @@
use consumption_metrics::{idempotency_key, Event, EventChunk, CHUNK_SIZE};
use tokio_util::sync::CancellationToken;
use tracing::*;
use super::{Cache, Ids, RawMetric};
#[tracing::instrument(skip_all, fields(metrics_total = %metrics.len()))]
pub(super) async fn upload_metrics(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
cancel: &CancellationToken,
node_id: &str,
metrics: &[RawMetric],
cached_metrics: &mut Cache,
) -> anyhow::Result<()> {
use bytes::BufMut;
let mut uploaded = 0;
let mut failed = 0;
let started_at = std::time::Instant::now();
// write to a BytesMut so that we can cheaply clone the frozen Bytes for retries
let mut buffer = bytes::BytesMut::new();
let mut chunk_to_send = Vec::new();
for chunk in metrics.chunks(CHUNK_SIZE) {
chunk_to_send.clear();
// FIXME: this should always overwrite and truncate to chunk.len()
chunk_to_send.extend(chunk.iter().map(|(curr_key, (when, curr_val))| Event {
kind: *when,
metric: curr_key.metric,
// FIXME: finally write! this to the prev allocation
idempotency_key: idempotency_key(node_id),
value: *curr_val,
extra: Ids {
tenant_id: curr_key.tenant_id,
timeline_id: curr_key.timeline_id,
},
}));
serde_json::to_writer(
(&mut buffer).writer(),
&EventChunk {
events: (&chunk_to_send).into(),
},
)?;
let body = buffer.split().freeze();
let event_bytes = body.len();
let res = upload(client, metric_collection_endpoint, body, cancel)
.instrument(tracing::info_span!(
"upload",
%event_bytes,
uploaded,
total = metrics.len(),
))
.await;
match res {
Ok(()) => {
for (curr_key, curr_val) in chunk {
cached_metrics.insert(*curr_key, *curr_val);
}
uploaded += chunk.len();
}
Err(_) => {
// failure(s) have already been logged
//
// however this is an inconsistency: if we crash here, we will start with the
// values as uploaded. in practice, the rejections no longer happen.
failed += chunk.len();
}
}
}
let elapsed = started_at.elapsed();
tracing::info!(
uploaded,
failed,
elapsed_ms = elapsed.as_millis(),
"done sending metrics"
);
Ok(())
}
enum UploadError {
Rejected(reqwest::StatusCode),
Reqwest(reqwest::Error),
Cancelled,
}
impl std::fmt::Debug for UploadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// use same impl because backoff::retry will log this using both
std::fmt::Display::fmt(self, f)
}
}
impl std::fmt::Display for UploadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use UploadError::*;
match self {
Rejected(code) => write!(f, "server rejected the metrics with {code}"),
Reqwest(e) => write!(f, "request failed: {e}"),
Cancelled => write!(f, "cancelled"),
}
}
}
impl UploadError {
fn is_reject(&self) -> bool {
matches!(self, UploadError::Rejected(_))
}
}
async fn upload(
client: &reqwest::Client,
metric_collection_endpoint: &reqwest::Url,
body: bytes::Bytes,
cancel: &CancellationToken,
) -> Result<(), UploadError> {
let warn_after = 3;
let max_attempts = 10;
let res = utils::backoff::retry(
move || {
let body = body.clone();
async move {
let res = client
.post(metric_collection_endpoint.clone())
.header(reqwest::header::CONTENT_TYPE, "application/json")
.body(body)
.send()
.await;
let res = res.and_then(|res| res.error_for_status());
match res {
Ok(_response) => Ok(()),
Err(e) => {
let status = e.status().filter(|s| s.is_client_error());
if let Some(status) = status {
Err(UploadError::Rejected(status))
} else {
Err(UploadError::Reqwest(e))
}
}
}
}
},
UploadError::is_reject,
warn_after,
max_attempts,
"upload consumption_metrics",
utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled),
)
.await;
match &res {
Ok(_) => {}
Err(e) if e.is_reject() => {
// permanent errors currently do not get logged by backoff::retry
// display alternate has no effect, but keeping it here for easier pattern matching.
tracing::error!("failed to upload metrics: {e:#}");
}
Err(_) => {
// these have been logged already
}
}
res
}

View File

@@ -444,6 +444,7 @@ impl<'a> WalIngest<'a> {
// need to clear the corresponding bits in the visibility map.
let mut new_heap_blkno: Option<u32> = None;
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
match self.timeline.pg_version {
14 => {
@@ -470,14 +471,20 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[1].blkno);
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v14::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -497,6 +504,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v14::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -526,14 +539,20 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[1].blkno);
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v15::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -553,6 +572,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v15::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -582,14 +607,20 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[1].blkno);
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP_LOCK {
let xlrec = v16::XlHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID {
@@ -609,6 +640,12 @@ impl<'a> WalIngest<'a> {
if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 {
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
} else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED {
let xlrec = v16::XlHeapLockUpdated::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
} else {
bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid);
@@ -616,7 +653,6 @@ impl<'a> WalIngest<'a> {
}
_ => {}
}
// FIXME: What about XLOG_HEAP_LOCK and XLOG_HEAP2_LOCK_UPDATED?
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
@@ -660,7 +696,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -676,7 +712,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -690,7 +726,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -717,6 +753,8 @@ impl<'a> WalIngest<'a> {
// need to clear the corresponding bits in the visibility map.
let mut new_heap_blkno: Option<u32> = None;
let mut old_heap_blkno: Option<u32> = None;
let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS;
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match self.timeline.pg_version {
@@ -745,14 +783,14 @@ impl<'a> WalIngest<'a> {
// we can't validate the remaining number of bytes without parsing
// the tuple data.
if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno);
}
if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 {
// PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a
// non-HOT update where the new tuple goes to different page than
// the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is
// set.
new_heap_blkno = Some(decoded.blocks[1].blkno);
new_heap_blkno = Some(decoded.blocks[0].blkno);
}
}
pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => {
@@ -772,7 +810,11 @@ impl<'a> WalIngest<'a> {
}
}
pg_constants::XLOG_NEON_HEAP_LOCK => {
/* XLOG_NEON_HEAP_LOCK doesn't need special care */
let xlrec = v16::rm_neon::XlNeonHeapLock::decode(buf);
if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 {
old_heap_blkno = Some(decoded.blocks[0].blkno);
flags = pg_constants::VISIBILITYMAP_ALL_FROZEN;
}
}
info => bail!("Unknown WAL record type for Neon RMGR: {}", info),
}
@@ -783,8 +825,6 @@ impl<'a> WalIngest<'a> {
),
}
// FIXME: What about XLOG_NEON_HEAP_LOCK?
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
let vm_rel = RelTag {
@@ -827,7 +867,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -843,7 +883,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
old_heap_blkno: None,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)
@@ -857,7 +897,7 @@ impl<'a> WalIngest<'a> {
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno: None,
old_heap_blkno,
flags: pg_constants::VISIBILITYMAP_VALID_BITS,
flags,
},
ctx,
)

View File

@@ -219,20 +219,66 @@ pub mod v14 {
old_offnum: buf.get_u16_le(),
old_infobits_set: buf.get_u8(),
flags: buf.get_u8(),
t_cid: buf.get_u32(),
t_cid: buf.get_u32_le(),
new_xmax: buf.get_u32_le(),
new_offnum: buf.get_u16_le(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLock {
pub locking_xid: TransactionId,
pub offnum: OffsetNumber,
pub _padding: u16,
pub t_cid: u32,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLock {
pub fn decode(buf: &mut Bytes) -> XlHeapLock {
XlHeapLock {
locking_xid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
_padding: buf.get_u16_le(),
t_cid: buf.get_u32_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLockUpdated {
pub xmax: TransactionId,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLockUpdated {
pub fn decode(buf: &mut Bytes) -> XlHeapLockUpdated {
XlHeapLockUpdated {
xmax: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
}
pub mod v15 {
pub use super::v14::{XlHeapDelete, XlHeapInsert, XlHeapMultiInsert, XlHeapUpdate};
pub use super::v14::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapLockUpdated, XlHeapMultiInsert, XlHeapUpdate,
};
}
pub mod v16 {
pub use super::v14::{XlHeapInsert, XlHeapMultiInsert};
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert};
use bytes::{Buf, Bytes};
use postgres_ffi::{OffsetNumber, TransactionId};
@@ -278,6 +324,26 @@ pub mod v16 {
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapLock {
pub locking_xid: TransactionId,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlHeapLock {
pub fn decode(buf: &mut Bytes) -> XlHeapLock {
XlHeapLock {
locking_xid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
pub mod rm_neon {
use bytes::{Buf, Bytes};
@@ -366,6 +432,28 @@ pub mod v16 {
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonHeapLock {
pub locking_xid: TransactionId,
pub t_cid: u32,
pub offnum: OffsetNumber,
pub infobits_set: u8,
pub flags: u8,
}
impl XlNeonHeapLock {
pub fn decode(buf: &mut Bytes) -> XlNeonHeapLock {
XlNeonHeapLock {
locking_xid: buf.get_u32_le(),
t_cid: buf.get_u32_le(),
offnum: buf.get_u16_le(),
infobits_set: buf.get_u8(),
flags: buf.get_u8(),
}
}
}
}
}

View File

@@ -1450,6 +1450,9 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
BlockNumber n_blocks;
bool latest;
XLogRecPtr request_lsn;
XLogRecPtr rel_lsn;
XLogRecPtr db_lsn;
static const NRelFileInfo dummyNode = {0};
switch (reln->smgr_relpersistence)
{
@@ -1504,7 +1507,9 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
return false;
}
request_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
rel_lsn = neon_get_request_lsn(&latest, InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
db_lsn = neon_get_request_lsn(&latest, dummyNode, MAIN_FORKNUM, 0);
request_lsn = Max(rel_lsn, db_lsn);
{
NeonExistsRequest request = {
.req.tag = T_NeonExistsRequest,

View File

@@ -121,7 +121,7 @@ async fn collect_metrics_iteration(
let current_metrics = gather_proxy_io_bytes_per_client();
let metrics_to_send: Vec<Event<Ids>> = current_metrics
let metrics_to_send: Vec<Event<Ids, &'static str>> = current_metrics
.iter()
.filter_map(|(curr_key, (curr_val, curr_time))| {
let mut start_time = *curr_time;

View File

@@ -847,18 +847,6 @@ class NeonEnv:
"""Get list of safekeeper endpoints suitable for safekeepers GUC"""
return ",".join(f"localhost:{wa.port.pg}" for wa in self.safekeepers)
def timeline_dir(
self, tenant_id: TenantId, timeline_id: TimelineId, pageserver_id: Optional[int] = None
) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
return (
self.tenant_dir(tenant_id, pageserver_id=pageserver_id) / "timelines" / str(timeline_id)
)
def tenant_dir(self, tenant_id: TenantId, pageserver_id: Optional[int] = None) -> Path:
"""Get a tenant directory's path based on the repo directory of the test environment"""
return self.get_pageserver(pageserver_id).workdir / "tenants" / str(tenant_id)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
res = subprocess.run(
@@ -1580,6 +1568,21 @@ class NeonPageserver(PgProtocol):
'.*registered custom resource manager "neon".*',
]
def timeline_dir(self, tenant_id: TenantId, timeline_id: Optional[TimelineId] = None) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
if timeline_id is None:
return self.tenant_dir(tenant_id) / "timelines"
return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id)
def tenant_dir(
self,
tenant_id: Optional[TenantId] = None,
) -> Path:
"""Get a tenant directory's path based on the repo directory of the test environment"""
if tenant_id is None:
return self.workdir / "tenants"
return self.workdir / "tenants" / str(tenant_id)
def start(
self,
overrides: Tuple[str, ...] = (),

View File

@@ -44,7 +44,7 @@ def measure_recovery_time(env: NeonCompare):
# Stop pageserver and remove tenant data
env.env.pageserver.stop()
timeline_dir = env.env.timeline_dir(env.tenant, env.timeline)
timeline_dir = env.env.pageserver.timeline_dir(env.tenant, env.timeline)
shutil.rmtree(timeline_dir)
# Start pageserver

View File

@@ -135,7 +135,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
tenant_id = env.initial_tenant
timelines_dir = env.pageserver.workdir / "tenants" / str(tenant_id) / "timelines"
timelines_dir = env.pageserver.timeline_dir(tenant_id)
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
initial_timeline_dirs = [d for d in timelines_dir.iterdir()]
@@ -166,7 +166,7 @@ def test_timeline_create_break_after_uninit_mark(neon_env_builder: NeonEnvBuilde
tenant_id = env.initial_tenant
timelines_dir = env.pageserver.workdir / "tenants" / str(tenant_id) / "timelines"
timelines_dir = env.pageserver.timeline_dir(tenant_id)
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
initial_timeline_dirs = [d for d in timelines_dir.iterdir()]

View File

@@ -1,16 +1,22 @@
import os
import pathlib
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar
#
# Test CREATE DATABASE when there have been relmapper changes
#
def test_createdb(neon_simple_env: NeonEnv):
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
env = neon_simple_env
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
env.neon_cli.create_branch("test_createdb", "empty")
endpoint = env.endpoints.create_start("test_createdb")
@@ -20,7 +26,10 @@ def test_createdb(neon_simple_env: NeonEnv):
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
cur.execute("CREATE DATABASE foodb")
if env.pg_version == PgVersion.V14:
cur.execute("CREATE DATABASE foodb")
else:
cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")

View File

@@ -417,7 +417,7 @@ def poor_mans_du(
largest_layer = 0
smallest_layer = None
for tenant_id, timeline_id in timelines:
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
assert timeline_dir.exists(), f"timeline dir does not exist: {timeline_dir}"
total = 0
for file in timeline_dir.iterdir():

View File

@@ -271,7 +271,7 @@ def _import(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)

View File

@@ -55,7 +55,7 @@ def test_basic_eviction(
for sk in env.safekeepers:
sk.stop()
timeline_path = env.timeline_dir(tenant_id, timeline_id)
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
)
@@ -243,7 +243,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
assert by_kind["Image"] > 0
assert by_kind["Delta"] > 0
assert by_kind["InMemory"] == 0
resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
resident_layers = list(env.pageserver.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
log.info("resident layers count before eviction: %s", len(resident_layers))
log.info("evict all layers")
@@ -251,7 +251,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
def ensure_resident_and_remote_size_metrics():
log.info("ensure that all the layers are gone")
resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
resident_layers = list(env.pageserver.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
# we have disabled all background loops, so, this should hold
assert len(resident_layers) == 0

View File

@@ -38,7 +38,7 @@ def test_image_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
new_temp_layer_files = list(
filter(
lambda file: str(file).endswith(NeonPageserver.TEMP_FILE_SUFFIX),
[path for path in env.timeline_dir(tenant_id, timeline_id).iterdir()],
[path for path in env.pageserver.timeline_dir(tenant_id, timeline_id).iterdir()],
)
)
@@ -84,7 +84,7 @@ def test_delta_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
new_temp_layer_files = list(
filter(
lambda file: str(file).endswith(NeonPageserver.TEMP_FILE_SUFFIX),
[path for path in env.timeline_dir(tenant_id, timeline_id).iterdir()],
[path for path in env.pageserver.timeline_dir(tenant_id, timeline_id).iterdir()],
)
)

View File

@@ -1,273 +0,0 @@
#
# Test for collecting metrics from pageserver and proxy.
# Use mock HTTP server to receive metrics and verify that they look sane.
#
from pathlib import Path
from queue import SimpleQueue
from typing import Any, Iterator, Set
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
PSQL,
NeonEnvBuilder,
NeonProxy,
VanillaPostgres,
wait_for_last_flush_lsn,
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
)
def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
remote_storage_kind: RemoteStorageKind,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_kinds_checked: Set[str] = set([])
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
uploads.put(events)
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = (
f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
"""
+ "tenant_config={pitr_interval = '0 sec'}"
)
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start()
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_metric_collection")
endpoint = env.endpoints.create_start("test_metric_collection")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
):
total += sample[2]
return int(total)
remote_uploaded = 0
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
# later tests will be added to ensure that the timeseries are sane.
timeout = 5
uploads.put("ready")
while True:
# discard earlier than "ready"
log.info("waiting for upload")
events = uploads.get(timeout=timeout)
import json
if events == "ready":
events = uploads.get(timeout=timeout)
httpserver.check()
httpserver.stop()
# if anything comes after this, we'll just ignore it
stringified = json.dumps(events, indent=2)
log.info(f"inspecting: {stringified}")
break
else:
stringified = json.dumps(events, indent=2)
log.info(f"discarding: {stringified}")
# verify that metrics look minimally sane
checks = {
"written_size": lambda value: value > 0,
"resident_size": lambda value: value >= 0,
"remote_storage_size": lambda value: value > 0 if remote_uploaded > 0 else value == 0,
# logical size may lag behind the actual size, so allow 0 here
"timeline_logical_size": lambda value: value >= 0,
# this can also be zero, depending on when we get the value
"written_data_bytes_delta": lambda value: value >= 0,
}
metric_kinds_checked = set()
metric_kinds_seen = set()
for event in events:
assert event["tenant_id"] == str(tenant_id)
metric_name = event["metric"]
metric_kinds_seen.add(metric_name)
check = checks.get(metric_name)
# calm down mypy
if check is not None:
value = event["value"]
log.info(f"checking {metric_name} value {value}")
assert check(value), f"{metric_name} isn't valid"
metric_kinds_checked.add(metric_name)
expected_checks = set(checks.keys())
assert (
metric_kinds_checked == checks.keys()
), f"Expected to receive and check all kind of metrics, but {expected_checks - metric_kinds_checked} got uncovered"
assert metric_kinds_seen == metric_kinds_checked
def proxy_metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
# perform basic sanity checks
for event in events:
assert event["metric"] == "proxy_io_bytes_per_client"
assert event["endpoint_id"] == "test_endpoint_id"
assert event["value"] >= 0
assert event["stop_time"] >= event["start_time"]
return Response(status=200)
@pytest.fixture(scope="function")
def proxy_with_metric_collector(
port_distributor: PortDistributor,
neon_binpath: Path,
httpserver_listen_address,
test_output_dir: Path,
) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth and has metric collection enabled."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_collection_interval = "5s"
with NeonProxy(
neon_binpath=neon_binpath,
test_output_dir=test_output_dir,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
metric_collection_endpoint=metric_collection_endpoint,
metric_collection_interval=metric_collection_interval,
auth_backend=NeonProxy.Link(),
) as proxy:
proxy.start()
yield proxy
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
proxy_metrics_handler
)
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run(
"create table tbl as select * from generate_series(0,1000); select pg_sleep(5); select 42"
)
base_uri = proxy_with_metric_collector.link_auth_uri
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(vanilla_pg, proxy_with_metric_collector, psql_session_id)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run("insert into tbl select * from generate_series(0,1000); select pg_sleep(5); select 42")
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(
vanilla_pg, proxy_with_metric_collector, psql_session_id, create_user=False
)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
httpserver.check()

View File

@@ -3,7 +3,6 @@
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, DefaultDict, Dict, Tuple
import pytest
@@ -115,7 +114,7 @@ def test_ondemand_download_large_rel(
env.pageserver.stop()
# remove all the layer files
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in env.pageserver.tenant_dir().glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer}")
layer.unlink()
@@ -237,7 +236,7 @@ def test_ondemand_download_timetravel(
env.pageserver.stop()
# remove all the layer files
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in env.pageserver.tenant_dir().glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer}")
layer.unlink()
@@ -324,8 +323,8 @@ def test_download_remote_layers_api(
"compaction_period": "0s",
# small checkpoint distance to create more delta layer files
"checkpoint_distance": f"{1 * 1024 ** 2}", # 1 MB
"compaction_threshold": "1",
"image_creation_threshold": "1",
"compaction_threshold": "999999",
"image_creation_threshold": "999999",
"compaction_target_size": f"{1 * 1024 ** 2}", # 1 MB
}
)
@@ -358,8 +357,20 @@ def test_download_remote_layers_api(
tenant_id, timeline_id, "pageserver_resident_physical_size"
)
# Shut down safekeepers before starting the pageserver.
# If we don't, they might stream us more WAL.
for sk in env.safekeepers:
sk.stop()
# it is sad we cannot do a flush inmem layer without compaction, but
# working around with very high layer0 count and image layer creation
# threshold
client.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload_queue_empty(client, tenant_id, timeline_id)
filled_current_physical = get_api_current_physical_size()
log.info(filled_current_physical)
log.info(f"filled_current_physical: {filled_current_physical}")
filled_size = get_resident_physical_size()
log.info(f"filled_size: {filled_size}")
assert filled_current_physical == filled_size, "we don't yet do layer eviction"
@@ -367,18 +378,10 @@ def test_download_remote_layers_api(
env.pageserver.stop()
# remove all the layer files
# XXX only delete some of the layer files, to show that it really just downloads all the layers
for layer in (Path(env.pageserver.workdir) / "tenants").glob("*/timelines/*/*-*_*"):
for layer in env.pageserver.tenant_dir().glob("*/timelines/*/*-*_*"):
log.info(f"unlinking layer {layer.name}")
layer.unlink()
# Shut down safekeepers before starting the pageserver.
# If we don't, the tenant's walreceiver handler will trigger the
# the logical size computation task, and that downloads layes,
# which makes our assertions on size fail.
for sk in env.safekeepers:
sk.stop(immediate=True)
##### Second start, restore the data and ensure it's the same
env.pageserver.start(extra_env_vars={"FAILPOINTS": "remote-storage-download-pre-rename=return"})
env.pageserver.allowed_errors.extend(
@@ -392,32 +395,21 @@ def test_download_remote_layers_api(
###### Phase 1: exercise download error code path
# comparison here is requiring the size to be at least the previous size, because it's possible received WAL after last_flush_lsn_upload
# witnessed for example difference of 29827072 (filled_current_physical) to 29868032 (here) is no good reason to fail a test.
this_time = get_api_current_physical_size()
assert (
filled_current_physical <= this_time
filled_current_physical == this_time
), "current_physical_size is sum of loaded layer sizes, independent of whether local or remote"
if filled_current_physical != this_time:
log.info(
f"fixing up filled_current_physical from {filled_current_physical} to {this_time} ({this_time - filled_current_physical})"
)
filled_current_physical = this_time
post_unlink_size = get_resident_physical_size()
log.info(f"post_unlink_size: {post_unlink_size}")
assert (
post_unlink_size < filled_size
), "we just deleted layers and didn't cause anything to re-download them yet"
assert filled_size - post_unlink_size > 5 * (
1024**2
), "we may be downloading some layers as part of tenant activation"
# issue downloads that we know will fail
info = client.timeline_download_remote_layers(
tenant_id,
timeline_id,
# allow some concurrency to unveil potential concurrency bugs
max_concurrent_downloads=10,
errors_ok=True,
at_least_one_download=False,
@@ -426,9 +418,9 @@ def test_download_remote_layers_api(
assert info["state"] == "Completed"
assert info["total_layer_count"] > 0
assert info["successful_download_count"] == 0
assert (
info["failed_download_count"] > 0
) # can't assert == total_layer_count because attach + tenant status downloads some layers
# can't assert == total_layer_count because timeline_detail also tries to
# download layers for logical size, but this might not always hold.
assert info["failed_download_count"] > 0
assert (
info["total_layer_count"]
== info["successful_download_count"] + info["failed_download_count"]
@@ -437,7 +429,6 @@ def test_download_remote_layers_api(
assert (
get_resident_physical_size() == post_unlink_size
), "didn't download anything new due to failpoint"
# would be nice to assert that the layers in the layer map are still RemoteLayer
##### Retry, this time without failpoints
client.configure_failpoints(("remote-storage-download-pre-rename", "off"))

View File

@@ -0,0 +1,333 @@
import json
import time
from queue import SimpleQueue
from typing import Any, Dict, Set
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, RemoteStorageKind.LOCAL_FS]
)
def test_metric_collection(
httpserver: HTTPServer,
neon_env_builder: NeonEnvBuilder,
httpserver_listen_address,
remote_storage_kind: RemoteStorageKind,
):
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
uploads: SimpleQueue[Any] = SimpleQueue()
def metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
uploads.put(events)
return Response(status=200)
# Require collecting metrics frequently, since we change
# the timeline and want something to be logged about it.
#
# Disable time-based pitr, we will use the manual GC calls
# to trigger remote storage operations in a controlled way
neon_env_builder.pageserver_config_override = (
f"""
metric_collection_interval="1s"
metric_collection_endpoint="{metric_collection_endpoint}"
cached_metric_collection_interval="0s"
synthetic_size_calculation_interval="3s"
"""
+ "tenant_config={pitr_interval = '0 sec'}"
)
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
log.info(f"test_metric_collection endpoint is {metric_collection_endpoint}")
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
metrics_handler
)
# spin up neon, after http server is ready
env = neon_env_builder.init_start()
# httpserver is shut down before pageserver during passing run
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_operation_seconds_count",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
):
total += sample[2]
return int(total)
remote_uploaded = 0
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
remote_uploaded = get_num_remote_ops("index", "upload")
assert remote_uploaded > 0
# we expect uploads at 1Hz, on busy runners this could be too optimistic,
# so give 5s we only want to get the following upload after "ready" value.
timeout = 5
# these strings in the upload queue allow synchronizing with the uploads
# and the main test execution
uploads.put("ready")
# note that this verifier graph should live across restarts as long as the
# cache file lives
v = MetricsVerifier()
while True:
events = uploads.get(timeout=timeout)
if events == "ready":
events = uploads.get(timeout=timeout)
v.ingest(events)
break
else:
v.ingest(events)
if "synthetic_storage_size" not in v.accepted_event_names():
log.info("waiting for synthetic storage size to be calculated and uploaded...")
rounds = 0
while "synthetic_storage_size" not in v.accepted_event_names():
events = uploads.get(timeout=timeout)
v.ingest(events)
rounds += 1
assert rounds < 10, "did not get synthetic_storage_size in 10 uploads"
# once we have it in verifiers, it will assert that future batches will contain it
env.pageserver.stop()
time.sleep(1)
uploads.put("ready")
env.pageserver.start()
while True:
events = uploads.get(timeout=timeout)
if events == "ready":
events = uploads.get(timeout=timeout * 3)
v.ingest(events)
events = uploads.get(timeout=timeout)
v.ingest(events)
break
else:
v.ingest(events)
httpserver.check()
class MetricsVerifier:
"""
A graph of per tenant per timeline verifiers, allowing one for each
metric
"""
def __init__(self):
self.tenants: Dict[TenantId, TenantMetricsVerifier] = {}
pass
def ingest(self, events):
stringified = json.dumps(events, indent=2)
log.info(f"ingesting: {stringified}")
for event in events:
id = TenantId(event["tenant_id"])
if id not in self.tenants:
self.tenants[id] = TenantMetricsVerifier(id)
self.tenants[id].ingest(event)
for t in self.tenants.values():
t.post_batch()
def accepted_event_names(self) -> Set[str]:
names: Set[str] = set()
for t in self.tenants.values():
names = names.union(t.accepted_event_names())
return names
class TenantMetricsVerifier:
def __init__(self, id: TenantId):
self.id = id
self.timelines: Dict[TimelineId, TimelineMetricsVerifier] = {}
self.state: Dict[str, Any] = {}
def ingest(self, event):
assert TenantId(event["tenant_id"]) == self.id
if "timeline_id" in event:
id = TimelineId(event["timeline_id"])
if id not in self.timelines:
self.timelines[id] = TimelineMetricsVerifier(self.id, id)
self.timelines[id].ingest(event)
else:
name = event["metric"]
if name not in self.state:
self.state[name] = PER_METRIC_VERIFIERS[name]()
self.state[name].ingest(event, self)
def post_batch(self):
for v in self.state.values():
v.post_batch(self)
for tl in self.timelines.values():
tl.post_batch(self)
def accepted_event_names(self) -> Set[str]:
names = set(self.state.keys())
for t in self.timelines.values():
names = names.union(t.accepted_event_names())
return names
class TimelineMetricsVerifier:
def __init__(self, tenant_id: TenantId, timeline_id: TimelineId):
self.id = timeline_id
self.state: Dict[str, Any] = {}
def ingest(self, event):
name = event["metric"]
if name not in self.state:
self.state[name] = PER_METRIC_VERIFIERS[name]()
self.state[name].ingest(event, self)
def post_batch(self, parent):
for v in self.state.values():
v.post_batch(self)
def accepted_event_names(self) -> Set[str]:
return set(self.state.keys())
class CannotVerifyAnything:
"""We can only assert types, but rust already has types, so no need."""
def __init__(self):
pass
def ingest(self, event, parent):
pass
def post_batch(self, parent):
pass
class WrittenDataVerifier:
def __init__(self):
self.values = []
pass
def ingest(self, event, parent):
self.values.append(event["value"])
def post_batch(self, parent):
pass
class WrittenDataDeltaVerifier:
def __init__(self):
self.value = None
self.sum = 0
self.timerange = None
pass
def ingest(self, event, parent):
assert event["type"] == "incremental"
self.value = event["value"]
self.sum += event["value"]
start = event["start_time"]
stop = event["stop_time"]
timerange = (start, stop)
if self.timerange is not None:
# this holds across restarts
assert self.timerange[1] == timerange[0], "time ranges should be continious"
self.timerange = timerange
def post_batch(self, parent):
absolute = parent.state["written_size"]
if len(absolute.values) == 1:
# in tests this comes up as initdb execution, so we can have 0 or
# about 30MB on the first event. it is not consistent.
assert self.value is not None
else:
assert self.value == absolute.values[-1] - absolute.values[-2]
# sounds like this should hold, but it will not for branches -- probably related to timing
# assert self.sum == absolute.latest
class SyntheticSizeVerifier:
def __init__(self):
self.prev = None
self.value = None
pass
def ingest(self, event, parent):
assert isinstance(parent, TenantMetricsVerifier)
assert event["type"] == "absolute"
value = event["value"]
self.value = value
def post_batch(self, parent):
if self.prev is not None:
# this is assuming no one goes and deletes the cache file
assert (
self.value is not None
), "after calculating first synthetic size, cached or more recent should be sent"
self.prev = self.value
self.value = None
PER_METRIC_VERIFIERS = {
"remote_storage_size": CannotVerifyAnything,
"resident_size": CannotVerifyAnything,
"written_size": WrittenDataVerifier,
"written_data_bytes_delta": WrittenDataDeltaVerifier,
"timeline_logical_size": CannotVerifyAnything,
"synthetic_storage_size": SyntheticSizeVerifier,
}

View File

@@ -0,0 +1,113 @@
from pathlib import Path
from typing import Iterator
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
PSQL,
NeonProxy,
VanillaPostgres,
)
from fixtures.port_distributor import PortDistributor
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
def proxy_metrics_handler(request: Request) -> Response:
if request.json is None:
return Response(status=400)
events = request.json["events"]
log.info("received events:")
log.info(events)
# perform basic sanity checks
for event in events:
assert event["metric"] == "proxy_io_bytes_per_client"
assert event["endpoint_id"] == "test_endpoint_id"
assert event["value"] >= 0
assert event["stop_time"] >= event["start_time"]
return Response(status=200)
@pytest.fixture(scope="function")
def proxy_with_metric_collector(
port_distributor: PortDistributor,
neon_binpath: Path,
httpserver_listen_address,
test_output_dir: Path,
) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth and has metric collection enabled."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
metric_collection_interval = "5s"
with NeonProxy(
neon_binpath=neon_binpath,
test_output_dir=test_output_dir,
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
external_http_port=external_http_port,
metric_collection_endpoint=metric_collection_endpoint,
metric_collection_interval=metric_collection_interval,
auth_backend=NeonProxy.Link(),
) as proxy:
proxy.start()
yield proxy
@pytest.mark.asyncio
async def test_proxy_metric_collection(
httpserver: HTTPServer,
proxy_with_metric_collector: NeonProxy,
vanilla_pg: VanillaPostgres,
):
# mock http server that returns OK for the metrics
httpserver.expect_request("/billing/api/v1/usage_events", method="POST").respond_with_handler(
proxy_metrics_handler
)
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run(
"create table tbl as select * from generate_series(0,1000); select pg_sleep(5); select 42"
)
base_uri = proxy_with_metric_collector.link_auth_uri
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(vanilla_pg, proxy_with_metric_collector, psql_session_id)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
# do something to generate load to generate metrics
# sleep for 5 seconds to give metric collector time to collect metrics
psql = await PSQL(
host=proxy_with_metric_collector.host, port=proxy_with_metric_collector.proxy_port
).run("insert into tbl select * from generate_series(0,1000); select pg_sleep(5); select 42")
link = await NeonProxy.find_auth_link(base_uri, psql)
psql_session_id = NeonProxy.get_session_id(base_uri, link)
await NeonProxy.activate_link_auth(
vanilla_pg, proxy_with_metric_collector, psql_session_id, create_user=False
)
assert psql.stdout is not None
out = (await psql.stdout.read()).decode("utf-8").strip()
assert out == "42"
httpserver.check()

View File

@@ -6,7 +6,6 @@ import queue
import shutil
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pytest
@@ -137,7 +136,7 @@ def test_remote_storage_backup_and_restore(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -353,7 +352,7 @@ def test_remote_storage_upload_queue_retries(
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -488,7 +487,7 @@ def test_remote_timeline_client_calls_started_metric(
env.pageserver.stop(immediate=True)
env.endpoints.stop_all()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -533,7 +532,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
timeline_path = env.timeline_dir(tenant_id, timeline_id)
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
client = env.pageserver.http_client()
@@ -704,7 +703,9 @@ def test_empty_branch_remote_storage_upload_on_restart(
# index upload is now hitting the failpoint, it should block the shutdown
env.pageserver.stop(immediate=True)
local_metadata = env.timeline_dir(env.initial_tenant, new_branch_timeline_id) / "metadata"
local_metadata = (
env.pageserver.timeline_dir(env.initial_tenant, new_branch_timeline_id) / "metadata"
)
assert local_metadata.is_file()
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

View File

@@ -299,7 +299,7 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
# tenant is created with defaults, as in without config file
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
config_path = env.pageserver.workdir / "tenants" / str(tenant_id) / "config"
config_path = env.pageserver.tenant_dir(tenant_id) / "config"
assert config_path.exists(), "config file is always initially created"
http_client = env.pageserver.http_client()

View File

@@ -89,7 +89,7 @@ def test_tenant_delete_smoke(
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
tenant_path = env.tenant_dir(tenant_id=tenant_id)
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():
@@ -269,7 +269,7 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
tenant_dir = env.tenant_dir(tenant_id)
tenant_dir = env.pageserver.tenant_dir(tenant_id)
# Check local is empty
assert not tenant_dir.exists()
@@ -366,7 +366,7 @@ def test_tenant_delete_is_resumed_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = env.pageserver.workdir / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -379,7 +379,7 @@ def test_tenant_delete_is_resumed_on_attach(
wait_tenant_status_404(ps_http, tenant_id, iterations)
# we shouldn've created tenant dir on disk
tenant_path = env.tenant_dir(tenant_id=tenant_id)
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():

View File

@@ -286,7 +286,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
)
# assert tenant exists on disk
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert env.pageserver.tenant_dir(tenant_id).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -329,7 +329,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
log.info("gc thread returned")
# check that nothing is left on disk for deleted tenant
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert not env.pageserver.tenant_dir(tenant_id).exists()
with pytest.raises(
expected_exception=PageserverApiException, match=f"NotFound: tenant {tenant_id}"
@@ -354,7 +354,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
)
# assert tenant exists on disk
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert env.pageserver.tenant_dir(tenant_id).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -383,7 +383,7 @@ def test_tenant_detach_ignored_tenant(neon_simple_env: NeonEnv):
log.info("ignored tenant detached without error")
# check that nothing is left on disk for deleted tenant
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert not env.pageserver.tenant_dir(tenant_id).exists()
# assert the tenant does not exists in the Pageserver
tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()]
@@ -410,7 +410,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
)
# assert tenant exists on disk
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert env.pageserver.tenant_dir(tenant_id).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
@@ -427,7 +427,7 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
log.info("regular tenant detached without error")
# check that nothing is left on disk for deleted tenant
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert not env.pageserver.tenant_dir(tenant_id).exists()
# assert the tenant does not exists in the Pageserver
tenants_after_detach = [tenant["id"] for tenant in client.tenant_list()]
@@ -528,7 +528,7 @@ def test_ignored_tenant_reattach(
pageserver_http = env.pageserver.http_client()
ignored_tenant_id, _ = env.neon_cli.create_tenant()
tenant_dir = env.pageserver.workdir / "tenants" / str(ignored_tenant_id)
tenant_dir = env.pageserver.tenant_dir(ignored_tenant_id)
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
tenants_before_ignore.sort()
timelines_before_ignore = [
@@ -619,7 +619,7 @@ def test_ignored_tenant_download_missing_layers(
# ignore the tenant and remove its layers
pageserver_http.tenant_ignore(tenant_id)
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
layers_removed = False
for dir_entry in timeline_dir.iterdir():
if dir_entry.name.startswith("00000"):
@@ -672,7 +672,7 @@ def test_ignored_tenant_stays_broken_without_metadata(
# ignore the tenant and remove its metadata
pageserver_http.tenant_ignore(tenant_id)
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
metadata_removed = False
for dir_entry in timeline_dir.iterdir():
if dir_entry.name == "metadata":

View File

@@ -216,7 +216,7 @@ def switch_pg_to_new_pageserver(
endpoint.start()
timeline_to_detach_local_path = env.timeline_dir(tenant_id, timeline_id)
timeline_to_detach_local_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
files_before_detach = os.listdir(timeline_to_detach_local_path)
assert (
"metadata" in files_before_detach
@@ -561,7 +561,7 @@ def test_emergency_relocate_with_branches_slow_replay(
# simpler than initializing a new one from scratch, but the effect on the single tenant
# is the same.
env.pageserver.stop(immediate=True)
shutil.rmtree(env.pageserver.workdir / "tenants" / str(tenant_id))
shutil.rmtree(env.pageserver.tenant_dir(tenant_id))
env.pageserver.start()
# This fail point will pause the WAL ingestion on the main branch, after the
@@ -709,7 +709,7 @@ def test_emergency_relocate_with_branches_createdb(
# Kill the pageserver, remove the tenant directory, and restart
env.pageserver.stop(immediate=True)
shutil.rmtree(env.pageserver.workdir / "tenants" / str(tenant_id))
shutil.rmtree(env.pageserver.tenant_dir(tenant_id))
env.pageserver.start()
# Wait before ingesting the WAL for CREATE DATABASE on the main branch. The original

View File

@@ -27,7 +27,7 @@ from prometheus_client.samples import Sample
def test_tenant_creation_fails(neon_simple_env: NeonEnv):
tenants_dir = Path(neon_simple_env.pageserver.workdir) / "tenants"
tenants_dir = neon_simple_env.pageserver.tenant_dir()
initial_tenants = sorted(
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
)
@@ -320,13 +320,7 @@ def test_pageserver_with_empty_tenants(
)
files_in_timelines_dir = sum(
1
for _p in Path.iterdir(
Path(env.pageserver.workdir)
/ "tenants"
/ str(tenant_with_empty_timelines)
/ "timelines"
)
1 for _p in Path.iterdir(env.pageserver.timeline_dir(tenant_with_empty_timelines))
)
assert (
files_in_timelines_dir == 0
@@ -337,9 +331,7 @@ def test_pageserver_with_empty_tenants(
env.pageserver.stop()
tenant_without_timelines_dir = env.initial_tenant
shutil.rmtree(
Path(env.pageserver.workdir) / "tenants" / str(tenant_without_timelines_dir) / "timelines"
)
shutil.rmtree(env.pageserver.timeline_dir(tenant_without_timelines_dir))
env.pageserver.start()

View File

@@ -179,9 +179,7 @@ def test_tenants_attached_after_download(
env.pageserver.stop()
timeline_dir = (
Path(env.pageserver.workdir) / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
local_layer_deleted = False
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):
@@ -259,7 +257,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
env.endpoints.stop_all()
env.pageserver.stop()
timeline_dir = env.timeline_dir(tenant_id, timeline_id)
timeline_dir = env.pageserver.timeline_dir(tenant_id, timeline_id)
local_layer_truncated = None
for path in Path.iterdir(timeline_dir):
if path.name.startswith("00000"):

View File

@@ -32,7 +32,9 @@ def test_threshold_based_eviction(
synthetic_size_calculation_interval="2s"
metric_collection_endpoint="http://{host}:{port}/nonexistent"
"""
metrics_refused_log_line = ".*metrics endpoint refused the sent metrics.*/nonexistent.*"
metrics_refused_log_line = (
".*metrics_collection:.* upload consumption_metrics (still failed|failed, will retry).*"
)
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.append(metrics_refused_log_line)

View File

@@ -3,7 +3,6 @@ import os
import queue
import shutil
import threading
from pathlib import Path
import pytest
import requests
@@ -72,13 +71,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
"test_ancestor_branch_delete_branch1", "test_ancestor_branch_delete_parent"
)
timeline_path = (
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(parent_timeline_id)
)
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, parent_timeline_id)
with pytest.raises(
PageserverApiException, match="Cannot delete timeline which has child timelines"
@@ -89,13 +82,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
assert exc.value.status_code == 412
timeline_path = (
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(leaf_timeline_id)
)
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, leaf_timeline_id)
assert timeline_path.exists()
# retry deletes when compaction or gc is running in pageserver
@@ -336,7 +323,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
),
)
timeline_dir = env.timeline_dir(env.initial_tenant, timeline_id)
timeline_dir = env.pageserver.timeline_dir(env.initial_tenant, timeline_id)
# Check local is empty
assert not timeline_dir.exists()
# Check no delete mark present
@@ -416,7 +403,7 @@ def test_timeline_resurrection_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -467,13 +454,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
"test_timeline_delete_fail_before_local_delete",
)
leaf_timeline_path = (
env.pageserver.workdir
/ "tenants"
/ str(env.initial_tenant)
/ "timelines"
/ str(leaf_timeline_id)
)
leaf_timeline_path = env.pageserver.timeline_dir(env.initial_tenant, leaf_timeline_id)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
timeline_info = wait_until_timeline_state(
@@ -921,7 +902,7 @@ def test_timeline_delete_resumed_on_attach(
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.pageserver.workdir) / "tenants"
dir_to_clear = env.pageserver.tenant_dir()
shutil.rmtree(dir_to_clear)
os.mkdir(dir_to_clear)
@@ -933,7 +914,7 @@ def test_timeline_delete_resumed_on_attach(
# delete should be resumed
wait_timeline_detail_404(ps_http, env.initial_tenant, timeline_id, iterations=iterations)
tenant_path = env.timeline_dir(tenant_id=tenant_id, timeline_id=timeline_id)
tenant_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():

View File

@@ -518,7 +518,7 @@ def test_timeline_size_metrics(
).value
# assert that the physical size metric matches the actual physical size on disk
timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, new_timeline_id)
assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)
# Check that the logical size metric is sane, and matches
@@ -658,7 +658,7 @@ def get_physical_size_values(
)
res.api_current_physical = detail["current_physical_size"]
timeline_path = env.timeline_dir(tenant_id, timeline_id)
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
res.python_timelinedir_layerfiles_physical = get_timeline_dir_size(timeline_path)
return res

View File

@@ -19,18 +19,40 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
# Create a test table and freeze it to set the VM bit.
# Create a test table for a few different scenarios and freeze it to set the VM bits.
cur.execute("CREATE TABLE vmtest_delete (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_delete VALUES (1)")
cur.execute("VACUUM FREEZE vmtest_delete")
cur.execute("CREATE TABLE vmtest_update (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_update SELECT g FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_update")
cur.execute("CREATE TABLE vmtest_hot_update (id integer PRIMARY KEY, filler text)")
cur.execute("INSERT INTO vmtest_hot_update VALUES (1, 'x')")
cur.execute("VACUUM FREEZE vmtest_hot_update")
cur.execute("CREATE TABLE vmtest_cold_update (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_cold_update SELECT g FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_cold_update")
cur.execute(
"CREATE TABLE vmtest_cold_update2 (id integer PRIMARY KEY, filler text) WITH (fillfactor=100)"
)
cur.execute("INSERT INTO vmtest_cold_update2 SELECT g, '' FROM generate_series(1, 1000) g")
cur.execute("VACUUM FREEZE vmtest_cold_update2")
# DELETE and UPDATE the rows.
cur.execute("DELETE FROM vmtest_delete WHERE id = 1")
cur.execute("UPDATE vmtest_update SET id = 5000 WHERE id = 1")
cur.execute("UPDATE vmtest_hot_update SET filler='x' WHERE id = 1")
cur.execute("UPDATE vmtest_cold_update SET id = 5000 WHERE id = 1")
# Clear the VM bit on the last page with an INSERT. Then clear the VM bit on
# the page where row 1 is (block 0), by doing an UPDATE. The UPDATE is a
# cold update, and the new tuple goes to the last page, which already had
# its VM bit cleared. The point is that the UPDATE *only* clears the VM bit
# on the page containing the old tuple. We had a bug where we got the old
# and new pages mixed up, and that only shows up when one of the bits is
# cleared, but not the other one.
cur.execute("INSERT INTO vmtest_cold_update2 VALUES (9999, 'x')")
# Clears the VM bit on the old page
cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
# Branch at this point, to test that later
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "test_vm_bit_clear")
@@ -50,9 +72,13 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
"""
)
cur.execute("SELECT * FROM vmtest_delete WHERE id = 1")
cur.execute("SELECT id FROM vmtest_delete WHERE id = 1")
assert cur.fetchall() == []
cur.execute("SELECT * FROM vmtest_update WHERE id = 1")
cur.execute("SELECT id FROM vmtest_hot_update WHERE id = 1")
assert cur.fetchall() == [(1,)]
cur.execute("SELECT id FROM vmtest_cold_update WHERE id = 1")
assert cur.fetchall() == []
cur.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
assert cur.fetchall() == []
cur.close()
@@ -77,7 +103,111 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
"""
)
cur_new.execute("SELECT * FROM vmtest_delete WHERE id = 1")
cur_new.execute("SELECT id FROM vmtest_delete WHERE id = 1")
assert cur_new.fetchall() == []
cur_new.execute("SELECT * FROM vmtest_update WHERE id = 1")
cur_new.execute("SELECT id FROM vmtest_hot_update WHERE id = 1")
assert cur_new.fetchall() == [(1,)]
cur_new.execute("SELECT id FROM vmtest_cold_update WHERE id = 1")
assert cur_new.fetchall() == []
cur_new.execute("SELECT id FROM vmtest_cold_update2 WHERE id = 1")
assert cur_new.fetchall() == []
#
# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK
# record.
#
def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_vm_bit_clear_on_heap_lock", "empty")
endpoint = env.endpoints.create_start(
"test_vm_bit_clear_on_heap_lock",
config_lines=[
"log_autovacuum_min_duration = 0",
# Perform anti-wraparound vacuuming aggressively
"autovacuum_naptime='1 s'",
"autovacuum_freeze_max_age = 1000000",
],
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
# Install extension containing function needed for test
cur.execute("CREATE EXTENSION neon_test_utils")
cur.execute("SELECT pg_switch_wal()")
# Create a test table and freeze it to set the all-frozen VM bit on all pages.
cur.execute("CREATE TABLE vmtest_lock (id integer PRIMARY KEY)")
cur.execute("INSERT INTO vmtest_lock SELECT g FROM generate_series(1, 50000) g")
cur.execute("VACUUM FREEZE vmtest_lock")
# Lock a row. This clears the all-frozen VM bit for that page.
cur.execute("SELECT * FROM vmtest_lock WHERE id = 40000 FOR UPDATE")
# Remember the XID. We will use it later to verify that we have consumed a lot of
# XIDs after this.
cur.execute("select pg_current_xact_id()")
locking_xid = cur.fetchall()[0][0]
# Stop and restart postgres, to clear the buffer cache.
#
# NOTE: clear_buffer_cache() will not do, because it evicts the dirty pages
# in a "clean" way. Our neon extension will write a full-page image of the VM
# page, and we want to avoid that.
endpoint.stop()
endpoint.start()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
tup = cur.fetchall()
xmax_before = tup[0][1]
# Consume a lot of XIDs, so that anti-wraparound autovacuum kicks
# in and the clog gets truncated. We set autovacuum_freeze_max_age to a very
# low value, so it doesn't take all that many XIDs for autovacuum to kick in.
for i in range(1000):
cur.execute(
"""
CREATE TEMP TABLE othertable (i int) ON COMMIT DROP;
do $$
begin
for i in 1..100000 loop
-- Use a begin-exception block to generate a new subtransaction on each iteration
begin
insert into othertable values (i);
exception when others then
raise 'not expected %', sqlerrm;
end;
end loop;
end;
$$;
"""
)
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
tup = cur.fetchall()
log.info(f"tuple = {tup}")
xmax = tup[0][1]
assert xmax == xmax_before
if i % 50 == 0:
cur.execute("select datfrozenxid from pg_database where datname='postgres'")
datfrozenxid = cur.fetchall()[0][0]
if datfrozenxid > locking_xid:
break
cur.execute("select pg_current_xact_id()")
curr_xid = cur.fetchall()[0][0]
assert int(curr_xid) - int(locking_xid) >= 100000
# Now, if the VM all-frozen bit was not correctly cleared on
# replay, we will try to fetch the status of the XID that was
# already truncated away.
#
# ERROR: could not access status of transaction 1027
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
tup = cur.fetchall()
log.info(f"tuple = {tup}")

View File

@@ -43,7 +43,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
tenant_id, _ = env.neon_cli.create_tenant()
# assert tenant exists on disk
assert (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert (env.pageserver.tenant_dir(tenant_id)).exists()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
@@ -101,7 +101,7 @@ def test_walredo_not_left_behind_on_detach(neon_env_builder: NeonEnvBuilder):
pytest.fail(f"could not detach tenant: {last_error}")
# check that nothing is left on disk for deleted tenant
assert not (env.pageserver.workdir / "tenants" / str(tenant_id)).exists()
assert not env.pageserver.tenant_dir(tenant_id).exists()
# Pageserver schedules kill+wait of the WAL redo process to the background runtime,
# asynchronously to tenant detach. Cut it some slack to complete kill+wait before