Compare commits

..

7 Commits

Author SHA1 Message Date
Konstantin Knizhnik
0aecfbb09c Fix loading logical database size 2023-05-22 16:39:16 +03:00
Konstantin Knizhnik
9604aec0c9 Store timeline logical size in key-value storage to make it persistent 2023-05-20 21:28:56 +03:00
Alexander Bayandin
3837fca7a2 compute-node-image: fix postgis download (#4280)
## Problem

`osgeo.org` is experiencing some problems with DNS resolving which
breaks `compute-node-image` (because it can't download postgis)

## Summary of changes
- Add `140.211.15.30 download.osgeo.org` to /etc/hosts by passing it via
the container option
2023-05-19 15:34:22 +01:00
Dmitry Rodionov
7529ee2ec7 rfc: the state of pageserver tenant relocation (#3868)
Summarize current state of tenant relocation related activities and implementation ideas
2023-05-19 14:35:33 +03:00
Christian Schwarz
b391c94440 tenant create / update-config: reject unknown fields (#4267)
This PR enforces that the tenant create / update-config APIs reject
requests with unknown fields.

This is a desirable property because some tenant config settings control
the lifetime of user data (e.g., GC horizon or PITR interval).

Suppose we inadvertently rename the `pitr_interval` field in the Rust
code.
Then, right now, a client that still uses the old name will send a
tenant config request to configure a new PITR interval.
Before this PR, we would accept such a request, ignore the old name
field, and use the pageserver.toml default value for what the new PITR
interval is.
With this PR, we will instead reject such a request.

One might argue that the client could simply check whether the config it
sent has been applied, using the `/v1/tenant/.../config` endpoint.
That is correct for tenant create and update-config.

But, attach will soon [^1] grow the ability to have attach-time config
as well.
If we ignore unknown fields and fall back to global defaults in that
case, we risk data loss.
Example:
1. Default PITR in pageservers is 7 days.
2. Create a tenant and set its PITR to 30 days.
3. For 30 days, fill the tenant continuously with data.
4. Detach the tenant.
5. Attach tenant.

Attach must use the 30-day PITR setting in this scenario.
If it were to fall back to the 7-day default value, we would lose 23
days of PITR capability for the tenant.

So, the PR that adds attach-time tenant config will build on the
(clunky) infrastructure added in this PR

[^1]: https://github.com/neondatabase/neon/pull/4255

Implementation Notes
====================

This could have been a simple `#[serde(deny_unknown_fields)]` but sadly,
that is documented- but silent-at-compile-time-incompatible with
`#[serde(flatten)]`. But we are still using this by adding on outer struct and use unit tests to ensure it is correct.

`neon_local tenant config` now uses the `.remove()` pattern + bail if
there are leftover config args. That's in line with what
`neon_local tenant create` does. We should dedupe that logic in a future
PR.

---------

Signed-off-by: Alex Chi <iskyzh@gmail.com>
Co-authored-by: Alex Chi <iskyzh@gmail.com>
2023-05-18 21:16:09 -04:00
Alexander Bayandin
5abc4514b7 Un-xfail fixed tests on Postgres 15 (#4275)
- https://github.com/neondatabase/neon/pull/4182  
- https://github.com/neondatabase/neon/pull/4213
2023-05-18 22:38:33 +01:00
Alexander Bayandin
1b2ece3715 Re-enable compatibility tests on Postgres 15 (#4274)
- Enable compatibility tests for Postgres 15
- Also add `PgVersion::v_prefixed` property to return the version number
with, _guess what,_ v-prefix!
2023-05-18 19:56:09 +01:00
24 changed files with 467 additions and 1000 deletions

View File

@@ -71,12 +71,12 @@ runs:
path: /tmp/neon-previous
prefix: latest
- name: Download compatibility snapshot for Postgres 14
if: inputs.build_type != 'remote' && inputs.pg_version == 'v14'
- name: Download compatibility snapshot
if: inputs.build_type != 'remote'
uses: ./.github/actions/download
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14
path: /tmp/compatibility_snapshot_pg14
name: compatibility-snapshot-${{ inputs.build_type }}-pg${{ inputs.pg_version }}
path: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
prefix: latest
- name: Checkout
@@ -106,7 +106,7 @@ runs:
BUILD_TYPE: ${{ inputs.build_type }}
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg14
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
RERUN_FLAKY: ${{ inputs.rerun_flaky }}
@@ -197,13 +197,13 @@ runs:
scripts/generate_and_push_perf_report.sh
fi
- name: Upload compatibility snapshot for Postgres 14
if: github.ref_name == 'release' && inputs.pg_version == 'v14'
- name: Upload compatibility snapshot
if: github.ref_name == 'release'
uses: ./.github/actions/upload
with:
name: compatibility-snapshot-${{ inputs.build_type }}-pg14-${{ github.run_id }}
name: compatibility-snapshot-${{ inputs.build_type }}-pg${{ inputs.pg_version }}-${{ github.run_id }}
# Directory is created by test_compatibility.py::test_create_snapshot, keep the path in sync with the test
path: /tmp/test_output/compatibility_snapshot_pg14/
path: /tmp/test_output/compatibility_snapshot_pg${{ inputs.pg_version }}/
prefix: latest
- name: Upload test results

View File

@@ -711,7 +711,11 @@ jobs:
compute-node-image:
runs-on: [ self-hosted, gen3, large ]
container: gcr.io/kaniko-project/executor:v1.9.2-debug
container:
image: gcr.io/kaniko-project/executor:v1.9.2-debug
# Workaround for "Resolving download.osgeo.org (download.osgeo.org)... failed: Temporary failure in name resolution.""
# Should be prevented by https://github.com/neondatabase/neon/issues/4281
options: --add-host=download.osgeo.org:140.211.15.30
needs: [ tag ]
strategy:
fail-fast: false
@@ -957,7 +961,7 @@ jobs:
promote-compatibility-data:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
options: --init
needs: [ promote-images, tag, regress-tests ]
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
@@ -968,11 +972,13 @@ jobs:
PREFIX: artifacts/latest
run: |
# Update compatibility snapshot for the release
for build_type in debug release; do
OLD_FILENAME=compatibility-snapshot-${build_type}-pg14-${GITHUB_RUN_ID}.tar.zst
NEW_FILENAME=compatibility-snapshot-${build_type}-pg14.tar.zst
for pg_version in v14 v15; do
for build_type in debug release; do
OLD_FILENAME=compatibility-snapshot-${build_type}-pg${pg_version}-${GITHUB_RUN_ID}.tar.zst
NEW_FILENAME=compatibility-snapshot-${build_type}-pg${pg_version}.tar.zst
time aws s3 mv --only-show-errors s3://${BUCKET}/${PREFIX}/${OLD_FILENAME} s3://${BUCKET}/${PREFIX}/${NEW_FILENAME}
time aws s3 mv --only-show-errors s3://${BUCKET}/${PREFIX}/${OLD_FILENAME} s3://${BUCKET}/${PREFIX}/${NEW_FILENAME}
done
done
# Update Neon artifact for the release (reuse already uploaded artifact)

View File

@@ -393,69 +393,79 @@ impl PageServerNode {
})
}
pub fn tenant_config(&self, tenant_id: TenantId, settings: HashMap<&str, &str>) -> Result<()> {
pub fn tenant_config(
&self,
tenant_id: TenantId,
mut settings: HashMap<&str, &str>,
) -> anyhow::Result<()> {
let config = {
// Braces to make the diff easier to read
models::TenantConfig {
checkpoint_distance: settings
.get("checkpoint_distance")
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
compaction_target_size: settings
.get("compaction_target_size")
.remove("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.get("compaction_threshold")
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'compaction_threshold' as an integer")?,
gc_horizon: settings
.get("gc_horizon")
.remove("gc_horizon")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.get("gc_period").map(|x| x.to_string()),
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
image_creation_threshold: settings
.get("image_creation_threshold")
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.get("walreceiver_connect_timeout")
.remove("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings
.remove("lagging_wal_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()),
max_lsn_wal_lag: settings
.get("max_lsn_wal_lag")
.remove("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.get("trace_read_requests")
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.get("eviction_policy")
.map(|x| serde_json::from_str(x))
.remove("eviction_policy")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
min_resident_size_override: settings
.get("min_resident_size_override")
.remove("min_resident_size_override")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as an integer")?,
evictions_low_residence_duration_metric_threshold: settings
.get("evictions_low_residence_duration_metric_threshold")
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
}
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
}
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
.json(&models::TenantConfigRequest { tenant_id, config })
.send()?

View File

@@ -0,0 +1,232 @@
# The state of pageserver tenant relocation
Created on 17.03.23
## Motivation
There were previous write ups on the subject. The design of tenant relocation was planned at the time when we had quite different landscape. I e there was no on-demand download/eviction. They were on the horizon but we still planned for cases when they were not available. Some other things have changed. Now safekeepers offload wal to s3 so we're not risking overflowing their disks. Having all of the above, it makes sense to recap and take a look at the options we have now, which adjustments we'd like to make to original process, etc.
Related (in chronological order):
- Tracking issue with initial discussion: [#886](https://github.com/neondatabase/neon/issues/886)
- [015. Storage Messaging](015-storage-messaging.md)
- [020. Pageserver S3 Coordination](020-pageserver-s3-coordination.md)
## Summary
The RFC consists of a walkthrough of prior art on tenant relocation and corresponding problems. It describes 3 approaches.
1. Simplistic approach that uses ignore and is the fastest to implement. The main downside is a requirement of short downtime.
2. More complicated approach that avoids even short downtime.
3. Even more complicated approach that will allow multiple pageservers to operate concurrently on the same tenant possibly allowing for HA cluster topologies and horizontal scaling of reads (i e compute talks to multiple pageservers).
The order in which solutions are described is a bit different. We start from 2, then move to possible compromises (aka simplistic approach) and then move to discussing directions for solving HA/Pageserver replica case with 3.
## Components
pageserver, control-plane, safekeepers (a bit)
## Requirements
Relocation procedure should move tenant from one pageserver to another without downtime introduced by storage side. For now restarting compute for applying new configuration is fine.
- component restarts
- component outage
- pageserver loss
## The original proposed implementation
The starting point is this sequence:
```mermaid
sequenceDiagram
autonumber
participant CP as Control Plane
participant PS1 as Pageserver 1
participant PS2 as Pageserver 2
participant S3
CP->>PS2: Attach tenant X
PS2->>S3: Fetch timelines, indexes for them
PS2->>CP: Accepted
CP->>CP: Change pageserver id in project
CP->>PS1: Detach
```
Which problems do we have with naive approach?
### Concurrent GC and Compaction
The problem is that they can run on both, PS1 and PS2. Consider this example from [Pageserver S3 Coordination RFC](020-pageserver-s3-coordination.md)
```mermaid
sequenceDiagram
autonumber
participant PS1
participant S3
participant PS2
PS1->>S3: Uploads L1, L2 <br/> Index contains L1 L2
PS2->>S3: Attach called, sees L1, L2
PS1->>S3: Compaction comes <br/> Removes L1, adds L3
note over S3: Index now L2, L3
PS2->>S3: Uploads new layer L4 <br/> (added to previous view of the index)
note over S3: Index now L1, L2, L4
```
At this point it is not possible to restore the state from index, it contains L2 which
is no longer available in s3 and doesnt contain L3 added by compaction by the
first pageserver. So if any of the pageservers restart, initial sync will fail
(or in on-demand world it will fail a bit later during page request from
missing layer)
The problem lies in shared index_part.json. Having intersecting layers from append only edits is expected to work, though this is an uncharted territory without tests.
#### Options
There are several options on how to restrict concurrent access to index file.
First and the simplest one is external orchestration. Control plane which runs migration can use special api call on pageserver to stop background processes (gc, compaction), and even possibly all uploads.
So the sequence becomes:
```mermaid
sequenceDiagram
autonumber
participant CP as Control Plane
participant PS1 as Pageserver 1
participant PS2 as Pageserver 2
participant S3
CP->>PS1: Pause background jobs, pause uploading new layers.
CP->>PS2: Attach tenant X.
PS2->>S3: Fetch timelines, index, start background operations
PS2->>CP: Accepted
CP->>CP: Monitor PS2 last record lsn, ensure OK lag
CP->>CP: Change pageserver id in project
CP->>PS1: Detach
```
The downside of this sequence is the potential rollback process. What if something goes wrong on new pageserver? Can we safely roll back to source pageserver?
There are two questions:
#### How can we detect that something went wrong?
We can run usual availability check (consists of compute startup and an update of one row).
Note that we cant run separate compute for that before touching compute that client runs actual workload on, because we cant have two simultaneous computes running in read-write mode on the same timeline (enforced by safekeepers consensus algorithm). So we can either run some readonly check first (basebackup) and then change pageserver id and run availability check. If it failed we can roll it back to the old one.
#### What can go wrong? And how we can safely roll-back?
In the sequence above during attach we start background processes/uploads. They change state in remote storage so it is possible that after rollback remote state will be different from one that was observed by source pageserver. So if target pageserver goes wild then source pageserver may fail to start with changed remote state.
Proposed option would be to implement a barrier (read-only) mode when pageserver does not update remote state.
So the sequence for happy path becomes this one:
```mermaid
sequenceDiagram
autonumber
participant CP as Control Plane
participant PS1 as Pageserver 1
participant PS2 as Pageserver 2
participant S3
CP->>PS1: Pause background jobs, pause uploading new layers.
CP->>PS2: Attach tenant X in remote readonly mode.
PS2->>S3: Fetch timelines, index
PS2->>CP: Accepted
CP->>CP: Monitor PS2 last record lsn, ensure OK lag
CP->>CP: Change pageserver id in project
CP->>CP: Run successful availability check
CP->>PS2: Start uploads, background tasks
CP->>PS1: Detach
```
With this sequence we restrict any changes to remote storage to one pageserver. So there is no concurrent access at all, not only for index_part.json, but for everything else too. This approach makes it possible to roll back after failure on new pageserver.
The sequence with roll back process:
```mermaid
sequenceDiagram
autonumber
participant CP as Control Plane
participant PS1 as Pageserver 1
participant PS2 as Pageserver 2
participant S3
CP->>PS1: Pause background jobs, pause uploading new layers.
CP->>PS2: Attach tenant X in remote readonly mode.
PS2->>S3: Fetch timelines, index
PS2->>CP: Accepted
CP->>CP: Monitor PS2 last record lsn, ensure OK lag
CP->>CP: Change pageserver id in project
CP->>CP: Availability check Failed
CP->>CP: Change pageserver id back
CP->>PS1: Resume remote operations
CP->>PS2: Ignore (instead of detach for investigation purposes)
```
## Concurrent branch creation
Another problem is a possibility of concurrent branch creation calls.
I e during migration create_branch can be called on old pageserver and newly created branch wont be seen on new pageserver. Prior art includes prototyping an approach of trying to mirror such branches, but currently it lost its importance, because now attach is fast because we dont need to download all data, and additionally to the best of my knowledge of control plane internals (cc @ololobus to confirm) operations on one project are executed sequentially, so it is not possible to have such case. So branch create operation will be executed only when relocation is completed. As a safety measure we can forbid branch creation for tenants that are in readonly remote state.
## Simplistic approach
The difference of simplistic approach from one described above is that it calls ignore on source tenant first and then calls attach on target pageserver. Approach above does it in opposite order thus opening a possibility for race conditions we strive to avoid.
The approach largely follows this guide: <https://github.com/neondatabase/cloud/wiki/Cloud:-Ad-hoc-tenant-relocation>
The happy path sequence:
```mermaid
sequenceDiagram
autonumber
participant CP as Control Plane
participant PS1 as Pageserver 1
participant PS2 as Pageserver 2
participant SK as Safekeeper
participant S3
CP->>CP: Enable maintenance mode
CP->>PS1: Ignore
CP->>PS2: Attach
PS2->>CP: Accepted
loop Delete layers for each timeline
CP->>PS2: Get last record lsn
CP->>SK: Get commit lsn
CP->>CP: OK? Timed out?
end
CP->>CP: Change pageserver id in project
CP->>CP: Run successful availability check
CP->>CP: Disable maintenance mode
CP->>PS1: Detach ignored
```
The sequence contains exactly the same rollback problems as in previous approach described above. They can be resolved the same way.
Most probably we'd like to move forward without this safety measure and implement it on top of this approach to make progress towards the downtime-less one.
## Lease based approach
In order to allow for concurrent operation on the same data on remote storage for multiple pageservers we need to go further than external orchestration.
NOTE: [020. Pageserver S3 Coordination](020-pageserver-s3-coordination.md) discusses one more approach that relies on duplication of index_part.json for each pageserver operating on the timeline. This approach still requires external coordination which makes certain things easier but requires additional bookkeeping to account for multiple index_part.json files. Discussion/comparison with proposed lease based approach
The problems are outlined in [020. Pageserver S3 Coordination](020-pageserver-s3-coordination.md) and suggested solution includes [Coordination based approach](020-pageserver-s3-coordination.md#coordination-based-approach). This way it will allow to do basic leader election for pageservers so they can decide which node will be responsible for running GC and compaction. The process is based on extensive communication via storage broker and consists of a lease that is taken by one of the pageservers that extends it to continue serving a leader role.
There are two options for ingesting new data into pageserver in follower role. One option is to avoid WAL ingestion at all and rely on notifications from leader to discover new layers on s3. Main downside of this approach is that follower will always lag behind the primary node because it wont have the last layer until it is uploaded to remote storage. In case of a primary failure follower will be required to reingest last segment (up to 256Mb of WAL currently) which slows down recovery. Additionally if compute is connected to follower pageserver it will observe latest data with a delay. Queries from compute will likely experience bigger delays when recent lsn is required.
The second option is to consume WAL stream on both pageservers. In this case the only problem is non deterministic layer generation. Additional bookkeeping will be required to deduplicate layers from primary with local ones. Some process needs to somehow merge them to remove duplicated data. Additionally we need to have good testing coverage to ensure that our implementation of `get_page@lsn` properly handles intersecting layers.
There is another tradeoff. Approaches may be different in amount of traffic between system components. With first approach there can be increased traffic between follower and remote storage. But only in case follower has some activity that actually requests pages (!). With other approach traffic increase will be permanent and will be caused by two WAL streams instead of one.
## Summary
Proposed implementation strategy:
Go with the simplest approach for now. Then work on tech debt, increase test coverage. Then gradually move forward to second approach by implementing safety measures first, finishing with switch of order between ignore and attach operation.
And only then go to lease based approach to solve HA/Pageserver replica use cases.

View File

@@ -131,13 +131,14 @@ pub struct TimelineCreateRequest {
}
#[serde_as]
#[derive(Serialize, Deserialize, Default)]
#[derive(Serialize, Deserialize, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub new_tenant_id: Option<TenantId>,
#[serde(flatten)]
pub config: TenantConfig,
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
impl std::ops::Deref for TenantCreateRequest {
@@ -148,7 +149,7 @@ impl std::ops::Deref for TenantCreateRequest {
}
}
#[derive(Serialize, Deserialize, Default)]
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
@@ -192,12 +193,13 @@ impl TenantCreateRequest {
}
#[serde_as]
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantConfigRequest {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde(flatten)]
pub config: TenantConfig,
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
}
impl std::ops::Deref for TenantConfigRequest {
@@ -280,7 +282,6 @@ pub struct TimelineInfo {
/// Sum of the size of all layer files.
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
pub current_logical_size_non_incremental: Option<u64>,
pub timeline_dir_layer_file_size_sum: Option<u64>,
@@ -768,4 +769,31 @@ mod tests {
assert!(format!("{:?}", &original_broken.state).contains("reason"));
assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
}
#[test]
fn test_reject_unknown_field() {
let id = TenantId::generate();
let create_request = json!({
"new_tenant_id": id.to_string(),
"unknown_field": "unknown_value".to_string(),
});
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
);
let id = TenantId::generate();
let config_request = json!({
"tenant_id": id.to_string(),
"unknown_field": "unknown_value".to_string(),
});
let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
"expect unknown field `unknown_field` error, got: {}",
err
);
}
}

View File

@@ -88,7 +88,3 @@ harness = false
[[bench]]
name = "bench_walredo"
harness = false
[[bench]]
name = "bench_disk_lookup"
harness = false

View File

@@ -1,164 +0,0 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pageserver::{tenant::{disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}, block_io::{BlockBuf, FileBlockReader}, storage_layer::DeltaLayerWriter}, repository::Key, virtual_file::{VirtualFile, self}, page_cache};
use std::{time::Instant, collections::BTreeMap};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use utils::{id::{TimelineId, TenantId}, lsn::Lsn};
use std::{io::{Read, Write}, path::PathBuf};
use pageserver::config::PageServerConf;
struct MockLayer {
pub path: PathBuf,
pub index_start_blk: u32,
pub index_root_blk: u32,
}
impl MockLayer {
fn read(&self, key: i128) -> Option<u64> {
// Read from disk btree
let file = FileBlockReader::new(VirtualFile::open(&self.path).unwrap());
let tree_reader = DiskBtreeReader::<_, 24>::new(
self.index_start_blk,
self.index_root_blk,
file,
);
let key: Key = Key::from_i128(key);
let mut key_bytes: [u8; 24] = [8u8; 24];
key.write_to_byte_slice(&mut key_bytes);
let mut result = None;
tree_reader.visit(&key_bytes, VisitDirection::Backwards, |key, value| {
if key == key_bytes {
result = Some(value);
}
return false
}).unwrap();
result
}
}
fn make_simple(n_keys: i128, name: &str) -> MockLayer {
let now = Instant::now();
let block_buf = BlockBuf::new();
let mut writer = DiskBtreeBuilder::<_, 24>::new(block_buf);
for i in 0..n_keys {
let key: Key = Key::from_i128(i);
let value: u64 = i as u64;
let mut key_bytes: [u8; 24] = [8u8; 24];
key.write_to_byte_slice(&mut key_bytes);
writer.append(&key_bytes, value).unwrap();
}
let (index_root_blk, block_buf) = writer.finish().unwrap();
println!("wrote {} keys to BlockBuf in {:?}", n_keys, now.elapsed());
// wrote 4_000_000 keys to BlockBuf in 129.980503ms
// wrote 40_000_000 keys to BlockBuf in 1.336874876s
// (/ 52.0 (+ 0.129 0.062))
let index_start_blk = 0; // ???
let path = std::env::current_dir().unwrap()
.parent().unwrap()
.join(".neon") // NOTE this is important because .neon is where I mount the ssd
.join("test_output")
.join("bench_disk_lookup")
.join("disk_btree")
.join(name);
std::fs::create_dir_all(path.clone().parent().unwrap()).unwrap();
let layer = MockLayer {
path: path.clone(),
index_start_blk,
index_root_blk,
};
let now = Instant::now();
let mut file = VirtualFile::create(&path).unwrap();
let mut total_len = 0;
for buf in block_buf.blocks {
file.write_all(buf.as_ref()).unwrap();
total_len += buf.len();
}
println!("flushed {} bytes to disk in {:?}", total_len, now.elapsed());
// flushed 52_355_072 bytes to disk in 62.540002ms => 800 MB/s
// flushed 523_411_456 bytes to disk in 551.762844ms => 800 MB/s
// flushed 523_411_456 bytes to disk in 4.989601463s => 100 MB/s !!!!
let now = Instant::now();
file.sync_all().unwrap();
println!("fsynced in {:?}", now.elapsed());
// flushed 523411456 bytes to disk in 574.897513ms | fsynced in 45.079831ms
// flushed 523411456 bytes to disk in 557.103133ms | fsynced in 56.976345ms
// flushed 523411456 bytes to disk in 559.939736ms | fsynced in 58.743932ms
// flushed 523411456 bytes to disk in 2.128451459s | fsynced in 1.662821424s
// flushed 523411456 bytes to disk in 2.937101445s | fsynced in 1.452016294s
// flushed 523411456 bytes to disk in 560.161377ms | fsynced in 63.579154ms
// flushed 523411456 bytes to disk in 562.492048ms | fsynced in 46.795958ms
// flushed 523411456 bytes to disk in 554.746062ms | fsynced in 69.815532ms
// flushed 523411456 bytes to disk in 566.547446ms | fsynced in 52.785175ms
layer
}
fn make_many(n_keys: i128, n_layers: i128) -> Vec<MockLayer> {
(0..n_layers)
.map(|i| make_simple(n_keys, &format!("layer_{}.tmp", i)))
.collect()
}
// cargo bench --bench bench_disk_lookup
fn bench_disk_lookup(c: &mut Criterion) {
virtual_file::init(10);
page_cache::init(10000);
// Results in a 40MB index
let n_keys = 4_000_000;
// One layer for each query
let n_layers = 100;
let n_queries = n_layers;
// let n_keys = 40_000_000;
// let n_layers = 10;
// Write to disk btree
let layers = make_many(n_keys, n_layers);
return;
// Write to mem btrees
let mem_btrees: Vec<BTreeMap<i128, u64>> = (0..n_layers)
.map(|_| (0..n_keys)
.map(|i| (i as i128, i as u64))
.collect())
.collect();
// Pick queries
let rng = &mut StdRng::seed_from_u64(1);
let queries: Vec<_> = (0..n_keys).collect();
let queries: Vec<_> = queries.choose_multiple(rng, n_queries as usize).copied().collect();
// Define and name the benchmark function
let mut group = c.benchmark_group("g1");
group.bench_function("disk_btree", |b| {
b.iter(|| {
for (i, q) in queries.clone().into_iter().enumerate() {
black_box({
assert_eq!(layers[i].read(q), Some(q as u64));
})
}
});
});
group.bench_function("mem_btree", |b| {
b.iter(|| {
for (i, q) in queries.clone().into_iter().enumerate() {
black_box({
assert_eq!(mem_btrees[i].get(&q), Some(&(q as u64)));
})
}
});
});
group.finish();
}
criterion_group!(group_1, bench_disk_lookup);
criterion_main!(group_1);

View File

@@ -5,7 +5,7 @@
//!
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use crate::tenant::mgr;
use anyhow;
use chrono::Utc;
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
@@ -113,7 +113,7 @@ pub async fn collect_metrics_iteration(
cached_metrics: &mut HashMap<PageserverConsumptionMetricsKey, u64>,
metric_collection_endpoint: &reqwest::Url,
node_id: NodeId,
ctx: &RequestContext,
_ctx: &RequestContext,
send_cached: bool,
) {
let mut current_metrics: Vec<(PageserverConsumptionMetricsKey, u64)> = Vec::new();
@@ -164,30 +164,15 @@ pub async fn collect_metrics_iteration(
timeline_written_size,
));
let span = info_span!("collect_metrics_iteration", tenant_id = %timeline.tenant_id, timeline_id = %timeline.timeline_id);
match span.in_scope(|| timeline.get_current_logical_size(ctx)) {
// Only send timeline logical size when it is fully calculated.
Ok((size, is_exact)) if is_exact => {
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: TIMELINE_LOGICAL_SIZE,
},
size,
));
}
Ok((_, _)) => {}
Err(err) => {
error!(
"failed to get current logical size for timeline {}: {err:?}",
timeline.timeline_id
);
continue;
}
};
current_metrics.push((
PageserverConsumptionMetricsKey {
tenant_id,
timeline_id: Some(timeline.timeline_id),
metric: TIMELINE_LOGICAL_SIZE,
},
timeline.get_current_logical_size(),
));
}
let timeline_resident_size = timeline.get_resident_physical_size();
tenant_resident_size += timeline_resident_size;
}
@@ -336,7 +321,6 @@ pub async fn calculate_synthetic_size_worker(
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await
{
if let Err(e) = tenant.calculate_synthetic_size(
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize,
ctx).await {
error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
}

View File

@@ -741,8 +741,11 @@ paths:
$ref: "#/components/schemas/Error"
post:
description: |
Create a tenant. Returns new tenant id on success.\
Create a tenant. Returns new tenant id on success.
If no new tenant id is specified in parameters, it would be generated. It's an error to recreate the same tenant.
Invalid fields in the tenant config will cause the request to be rejected with status 400.
requestBody:
content:
application/json:
@@ -790,6 +793,8 @@ paths:
put:
description: |
Update tenant's config.
Invalid fields in the tenant config will cause the request to be rejected with status 400.
requestBody:
content:
application/json:

View File

@@ -26,7 +26,7 @@ use crate::tenant::config::TenantConfOpt;
use crate::tenant::mgr::{TenantMapInsertError, TenantStateError};
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
use crate::tenant::{PageReconstructError, Timeline};
use crate::{config::PageServerConf, tenant::mgr};
use utils::{
auth::JwtAuth,
@@ -168,36 +168,12 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
}
// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
fn build_timeline_info(
timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool,
ctx: &RequestContext,
_ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx)?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
// we're executing this function, we will outlive the timeline on-disk state.
info.current_logical_size_non_incremental = Some(
timeline
.get_current_logical_size_non_incremental(
info.last_record_lsn,
CancellationToken::new(),
ctx,
)
.await?,
);
}
Ok(info)
}
fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let last_record_lsn = timeline.get_last_record_lsn();
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
let guard = timeline.last_received_wal.lock().unwrap();
@@ -217,13 +193,7 @@ fn build_timeline_info_common(
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
};
let current_logical_size = match timeline.get_current_logical_size(ctx) {
Ok((size, _)) => Some(size),
Err(err) => {
error!("Timeline info creation failed to get current logical size: {err:?}");
None
}
};
let current_logical_size = Some(timeline.get_current_logical_size());
let current_physical_size = Some(timeline.layer_size_sum());
let state = timeline.current_state();
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
@@ -240,7 +210,6 @@ fn build_timeline_info_common(
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
current_logical_size,
current_physical_size,
current_logical_size_non_incremental: None,
timeline_dir_layer_file_size_sum: None,
wal_source_connstr,
last_received_msg_lsn,
@@ -282,7 +251,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
.await {
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
let timeline_info = build_timeline_info(&new_timeline, &ctx)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -296,8 +265,6 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -308,15 +275,11 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let mut response_data = Vec::with_capacity(timelines.len());
for timeline in timelines {
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
.await
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.map_err(ApiError::InternalServerError)?;
let timeline_info = build_timeline_info(&timeline, &ctx)
.context(
"Failed to convert tenant timeline {timeline_id} into the local one: {e:?}",
)
.map_err(ApiError::InternalServerError)?;
response_data.push(timeline_info);
}
@@ -331,8 +294,6 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let include_non_incremental_logical_size: Option<bool> =
parse_query_param(&request, "include-non-incremental-logical-size")?;
check_permission(&request, Some(tenant_id))?;
// Logical size calculation needs downloading.
@@ -345,14 +306,9 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
.get_timeline(timeline_id, false)
.map_err(ApiError::NotFound)?;
let timeline_info = build_timeline_info(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
&ctx,
)
.await
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
let timeline_info = build_timeline_info(&timeline, &ctx)
.context("get local timeline info")
.map_err(ApiError::InternalServerError)?;
Ok::<_, ApiError>(timeline_info)
}
@@ -546,11 +502,7 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
// this can be long operation
let inputs = tenant
.gather_size_inputs(
retention_period,
LogicalSizeCalculationCause::TenantSizeHandler,
&ctx,
)
.gather_size_inputs(retention_period, &ctx)
.await
.map_err(ApiError::InternalServerError)?;

View File

@@ -20,7 +20,6 @@ use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::Range;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -139,6 +138,17 @@ impl Timeline {
Ok(total_blocks)
}
/// Get timeline logical size
pub async fn get_logical_size(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<u64, PageReconstructError> {
let mut buf = self.get(LOGICAL_SIZE_KEY, lsn, ctx).await?;
let size = buf.get_u64_le();
Ok(size)
}
/// Get size of a relation file
pub async fn get_rel_size(
&self,
@@ -489,46 +499,6 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
/// Does the same as get_current_logical_size but counted on demand.
/// Used to initialize the logical size tracking on startup.
///
/// Only relation blocks are counted currently. That excludes metadata,
/// SLRUs, twophase files etc.
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await.context("read dbdir")?;
let dbdir = DbDirectory::des(&buf).context("deserialize db directory")?;
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, lsn, ctx)
.await
.context("list rels")?
{
if cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}
let relsize_key = rel_size_to_key(rel);
let mut buf = self
.get(relsize_key, lsn, ctx)
.await
.with_context(|| format!("read relation size of {rel:?}"))?;
let relsize = buf.get_u32_le();
total_size += relsize as u64;
}
}
Ok(total_size * BLCKSZ as u64)
}
///
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed maybe removed from the underlying storage (from
@@ -819,6 +789,12 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn put_logical_size(&mut self, size: u64) -> anyhow::Result<()> {
let buf = size.to_le_bytes();
self.put(LOGICAL_SIZE_KEY, Value::Image(Bytes::from(buf.to_vec())));
Ok(())
}
pub async fn drop_dbdir(
&mut self,
spcnode: Oid,
@@ -1131,7 +1107,8 @@ impl<'a> DatadirModification<'a> {
result?;
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
let size = writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
self.put_logical_size(size)?;
self.pending_nblocks = 0;
}
@@ -1159,7 +1136,8 @@ impl<'a> DatadirModification<'a> {
writer.finish_write(lsn);
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
let size = writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
self.put_logical_size(size)?;
}
Ok(())
@@ -1274,7 +1252,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// 03 misc
// controlfile
// checkpoint
// pg_version
// logical_size
//
// Below is a full list of the keyspace allocation:
//
@@ -1314,6 +1292,10 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// Checkpoint:
// 03 00000000 00000000 00000000 00 00000001
//-- Section 01: relation data and metadata
//
// LogicalSize:
// 03 00000000 00000000 00000000 00 00000002
//
const DBDIR_KEY: Key = Key {
field1: 0x00,
@@ -1536,6 +1518,15 @@ const CHECKPOINT_KEY: Key = Key {
field6: 1,
};
const LOGICAL_SIZE_KEY: Key = Key {
field1: 0x03,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 3,
};
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.

View File

@@ -237,11 +237,6 @@ pub enum TaskKind {
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
// Initial logical size calculation
InitialLogicalSizeCalculation,
OndemandLogicalSizeCalculation,
// Task that flushes frozen in-memory layers to disk
LayerFlushTask,

View File

@@ -80,7 +80,6 @@ use utils::{
mod blob_io;
pub mod block_io;
pub mod disk_btree;
pub mod disk_persistent_bst;
pub(crate) mod ephemeral_file;
pub mod layer_map;
@@ -99,9 +98,7 @@ mod timeline;
pub mod size;
pub(crate) use timeline::debug_assert_current_span_has_tenant_and_timeline_id;
pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};
pub use timeline::{LocalLayerInfoForDiskUsageEviction, PageReconstructError, Timeline};
// re-export this function so that page_cache.rs can use it.
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
@@ -1044,11 +1041,29 @@ impl Tenant {
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(ctx)?;
self.load_logical_sizes().await?;
info!("Done");
Ok(())
}
async fn load_logical_sizes(&self) -> anyhow::Result<()> {
let not_broken_timelines: Vec<Arc<Timeline>>;
{
let timelines_accessor = self.timelines.lock().unwrap();
not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken)
.cloned()
.collect();
}
for timeline in not_broken_timelines {
timeline.load_inmem_logical_size().await?;
}
Ok(())
}
/// Subroutine of `load_tenant`, to load an individual timeline
///
/// NB: The parent is assumed to be already loaded!
@@ -2517,7 +2532,6 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<UninitializedTimeline> {
let tenant_id = self.tenant_id;
let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
let remote_client = RemoteTimelineClient::new(
remote_storage.clone(),
@@ -2638,14 +2652,8 @@ impl Tenant {
// `max_retention_period` overrides the cutoff that is used to calculate the size
// (only if it is shorter than the real cutoff).
max_retention_period: Option<u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<size::ModelInputs> {
let logical_sizes_at_once = self
.conf
.concurrent_tenant_size_logical_size_queries
.inner();
// TODO: Having a single mutex block concurrent reads is not great for performance.
//
// But the only case where we need to run multiple of these at once is when we
@@ -2655,27 +2663,15 @@ impl Tenant {
// See more for on the issue #2748 condenced out of the initial PR review.
let mut shared_cache = self.cached_logical_sizes.lock().await;
size::gather_inputs(
self,
logical_sizes_at_once,
max_retention_period,
&mut shared_cache,
cause,
ctx,
)
.await
size::gather_inputs(self, max_retention_period, &mut shared_cache, ctx).await
}
/// Calculate synthetic tenant size and cache the result.
/// This is periodically called by background worker.
/// result is cached in tenant struct
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
pub async fn calculate_synthetic_size(
&self,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs(None, cause, ctx).await?;
pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs(None, ctx).await?;
let size = inputs.calculate()?;

View File

@@ -4,20 +4,14 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{LogicalSizeCalculationCause, Tenant};
use super::Tenant;
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use tracing::*;
use tenant_size_model::{Segment, StorageModel};
/// Inputs to the actual tenant sizing model
@@ -123,10 +117,8 @@ pub struct TimelineInputs {
/// tenant size will be zero.
pub(super) async fn gather_inputs(
tenant: &Tenant,
limit: &Arc<Semaphore>,
max_retention_period: Option<u64>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
@@ -319,15 +311,7 @@ pub(super) async fn gather_inputs(
// We left the 'size' field empty in all of the Segments so far.
// Now find logical sizes for all of the points that might need or benefit from them.
fill_logical_sizes(
&timelines,
&mut segments,
limit,
logical_size_cache,
cause,
ctx,
)
.await?;
fill_logical_sizes(&timelines, &mut segments, logical_size_cache, ctx).await?;
Ok(ModelInputs {
segments,
@@ -343,9 +327,7 @@ pub(super) async fn gather_inputs(
async fn fill_logical_sizes(
timelines: &[Arc<Timeline>],
segments: &mut [SegmentMeta],
limit: &Arc<Semaphore>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
@@ -361,11 +343,6 @@ async fn fill_logical_sizes(
// with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
let cancel = tokio_util::sync::CancellationToken::new();
// be sure to cancel all spawned tasks if we are dropped
let _dg = cancel.clone().drop_guard();
// For each point that would benefit from having a logical size available,
// spawn a Task to fetch it, unless we have it cached already.
@@ -378,71 +355,18 @@ async fn fill_logical_sizes(
let lsn = Lsn(seg.segment.lsn);
if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
let mut cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
if cached_size.is_none() {
let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
let parallel_size_calcs = Arc::clone(limit);
let ctx = ctx.attached_child();
joinset.spawn(
calculate_logical_size(
parallel_size_calcs,
timeline,
lsn,
cause,
ctx,
cancel.child_token(),
)
.in_current_span(),
);
cached_size = Some(timeline.get_logical_size(lsn, ctx).await?);
}
e.insert(cached_size);
}
}
// Perform the size lookups
let mut have_any_error = false;
while let Some(res) = joinset.join_next().await {
// each of these come with Result<anyhow::Result<_>, JoinError>
// because of spawn + spawn_blocking
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures, nor should be");
}
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
have_any_error = true;
}
Ok(Err(recv_result_error)) => {
// cannot really do anything, as this panic is likely a bug
error!("failed to receive logical size query result: {recv_result_error:#}");
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
logical_size_cache.insert((timeline.timeline_id, lsn), size);
sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
}
}
}
// prune any keys not needed anymore; we record every used key and added key.
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
if have_any_error {
// we cannot complete this round, because we are missing data.
// we have however cached all we were able to request calculation on.
anyhow::bail!("failed to calculate some logical_sizes");
}
// Insert the looked up sizes to the Segments
for seg in segments.iter_mut() {
if !seg.size_needed() {
@@ -484,33 +408,6 @@ impl ModelInputs {
}
}
/// Newtype around the tuple that carries the timeline at lsn logical size calculation.
struct TimelineAtLsnSizeResult(
Arc<crate::tenant::Timeline>,
utils::lsn::Lsn,
Result<u64, CalculateLogicalSizeError>,
);
#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
async fn calculate_logical_size(
limit: Arc<tokio::sync::Semaphore>,
timeline: Arc<crate::tenant::Timeline>,
lsn: utils::lsn::Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
.await
.expect("global semaphore should not had been closed");
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
.await?;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}
#[test]
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way

View File

@@ -8,7 +8,6 @@ use bytes::Bytes;
use fail::fail_point;
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
@@ -16,7 +15,7 @@ use pageserver_api::models::{
};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantTimelineId;
@@ -50,9 +49,9 @@ use crate::tenant::{
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
use crate::pgdatadir_mapping::BlockNumber;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
use pageserver_api::reltag::RelTag;
@@ -211,7 +210,7 @@ pub struct Timeline {
repartition_threshold: u64,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
current_logical_size: AtomicI64,
/// Information about the last processed message by the WAL receiver,
/// or None if WAL receiver has not received anything for this timeline
@@ -229,126 +228,6 @@ pub struct Timeline {
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
}
/// Internal structure to hold all data needed for logical size calculation.
///
/// Calculation consists of two stages:
///
/// 1. Initial size calculation. That might take a long time, because it requires
/// reading all layers containing relation sizes at `initial_part_end`.
///
/// 2. Collecting an incremental part and adding that to the initial size.
/// Increments are appended on walreceiver writing new timeline data,
/// which result in increase or decrease of the logical size.
struct LogicalSize {
/// Size, potentially slow to compute. Calculating this might require reading multiple
/// layers, and even ancestor's layers.
///
/// NOTE: size at a given LSN is constant, but after a restart we will calculate
/// the initial size at a different LSN.
initial_logical_size: OnceCell<u64>,
/// Semaphore to track ongoing calculation of `initial_logical_size`.
initial_size_computation: Arc<tokio::sync::Semaphore>,
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
initial_part_end: Option<Lsn>,
/// All other size changes after startup, combined together.
///
/// Size shouldn't ever be negative, but this is signed for two reasons:
///
/// 1. If we initialized the "baseline" size lazily, while we already
/// process incoming WAL, the incoming WAL records could decrement the
/// variable and temporarily make it negative. (This is just future-proofing;
/// the initialization is currently not done lazily.)
///
/// 2. If there is a bug and we e.g. forget to increment it in some cases
/// when size grows, but remember to decrement it when it shrinks again, the
/// variable could go negative. In that case, it seems better to at least
/// try to keep tracking it, rather than clamp or overflow it. Note that
/// get_current_logical_size() will clamp the returned value to zero if it's
/// negative, and log an error. Could set it permanently to zero or some
/// special value to indicate "broken" instead, but this will do for now.
///
/// Note that we also expose a copy of this value as a prometheus metric,
/// see `current_logical_size_gauge`. Use the `update_current_logical_size`
/// to modify this, it will also keep the prometheus metric in sync.
size_added_after_initial: AtomicI64,
}
/// Normalized current size, that the data in pageserver occupies.
#[derive(Debug, Clone, Copy)]
enum CurrentLogicalSize {
/// The size is not yet calculated to the end, this is an intermediate result,
/// constructed from walreceiver increments and normalized: logical data could delete some objects, hence be negative,
/// yet total logical size cannot be below 0.
Approximate(u64),
// Fully calculated logical size, only other future walreceiver increments are changing it, and those changes are
// available for observation without any calculations.
Exact(u64),
}
impl CurrentLogicalSize {
fn size(&self) -> u64 {
*match self {
Self::Approximate(size) => size,
Self::Exact(size) => size,
}
}
}
impl LogicalSize {
fn empty_initial() -> Self {
Self {
initial_logical_size: OnceCell::with_value(0),
// initial_logical_size already computed, so, don't admit any calculations
initial_size_computation: Arc::new(Semaphore::new(0)),
initial_part_end: None,
size_added_after_initial: AtomicI64::new(0),
}
}
fn deferred_initial(compute_to: Lsn) -> Self {
Self {
initial_logical_size: OnceCell::new(),
initial_size_computation: Arc::new(Semaphore::new(1)),
initial_part_end: Some(compute_to),
size_added_after_initial: AtomicI64::new(0),
}
}
fn current_size(&self) -> anyhow::Result<CurrentLogicalSize> {
let size_increment: i64 = self.size_added_after_initial.load(AtomicOrdering::Acquire);
// ^^^ keep this type explicit so that the casts in this function break if
// we change the type.
match self.initial_logical_size.get() {
Some(initial_size) => {
initial_size.checked_add_signed(size_increment)
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
.map(CurrentLogicalSize::Exact)
}
None => {
let non_negative_size_increment = u64::try_from(size_increment).unwrap_or(0);
Ok(CurrentLogicalSize::Approximate(non_negative_size_increment))
}
}
}
fn increment_size(&self, delta: i64) {
self.size_added_after_initial
.fetch_add(delta, AtomicOrdering::SeqCst);
}
/// Make the value computed by initial logical size computation
/// available for re-use. This doesn't contain the incremental part.
fn initialized_size(&self, lsn: Lsn) -> Option<u64> {
match self.initial_part_end {
Some(v) if v == lsn => self.initial_logical_size.get().copied(),
_ => None,
}
}
}
pub struct WalReceiverInfo {
pub wal_source_connconf: PgConnectionConfig,
pub last_received_msg_lsn: Lsn,
@@ -446,14 +325,6 @@ impl std::fmt::Display for PageReconstructError {
}
}
#[derive(Clone, Copy)]
pub enum LogicalSizeCalculationCause {
Initial,
ConsumptionMetricsSyntheticSize,
EvictionTaskImitation,
TenantSizeHandler,
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -838,23 +709,23 @@ impl Timeline {
/// the initial size calculation has not been run (gets triggered on the first size access).
///
/// return size and boolean flag that shows if the size is exact
pub fn get_current_logical_size(
self: &Arc<Self>,
ctx: &RequestContext,
) -> anyhow::Result<(u64, bool)> {
let current_size = self.current_logical_size.current_size()?;
debug!("Current size: {current_size:?}");
pub fn get_current_logical_size(self: &Arc<Self>) -> u64 {
self.current_logical_size.load(AtomicOrdering::Relaxed) as u64
}
let mut is_exact = true;
let size = current_size.size();
if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
(current_size, self.current_logical_size.initial_part_end)
{
is_exact = false;
self.try_spawn_size_init_task(initial_part_end, ctx);
/// Load from KV storage value of logical timeline size and store it in inmemory atomic variable
pub async fn load_inmem_logical_size(&self) -> anyhow::Result<()> {
let lsn = self.get_disk_consistent_lsn();
if lsn != Lsn::INVALID {
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Error);
match self.get_logical_size(lsn, &ctx).await {
Ok(size) => self
.current_logical_size
.store(size as i64, AtomicOrdering::Relaxed),
Err(e) => info!("Failed to load logical size: {:?}", e),
}
}
Ok((size, is_exact))
Ok(())
}
/// Check if more than 'checkpoint_distance' of WAL has been accumulated in
@@ -1399,15 +1270,7 @@ impl Timeline {
latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
initdb_lsn: metadata.initdb_lsn(),
current_logical_size: if disk_consistent_lsn.is_valid() {
// we're creating timeline data with some layer files existing locally,
// need to recalculate timeline's logical size based on data in the layers.
LogicalSize::deferred_initial(disk_consistent_lsn)
} else {
// we're creating timeline data without any layers existing locally,
// initial logical size is 0.
LogicalSize::empty_initial()
},
current_logical_size: AtomicI64::new(0),
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
@@ -1840,292 +1703,12 @@ impl Timeline {
Ok(())
}
fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
.try_acquire_owned()
{
Ok(permit) => permit,
Err(TryAcquireError::NoPermits) => {
// computation already ongoing or finished with success
return;
}
Err(TryAcquireError::Closed) => unreachable!("we never call close"),
};
debug_assert!(self
.current_logical_size
.initial_logical_size
.get()
.is_none());
info!(
"spawning logical size computation from context of task kind {:?}",
ctx.task_kind()
);
// We need to start the computation task.
// It gets a separate context since it will outlive the request that called this function.
let self_clone = Arc::clone(self);
let background_ctx = ctx.detached_child(
TaskKind::InitialLogicalSizeCalculation,
DownloadBehavior::Download,
);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::InitialLogicalSizeCalculation,
Some(self.tenant_id),
Some(self.timeline_id),
"initial size calculation",
false,
// NB: don't log errors here, task_mgr will do that.
async move {
// no cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
.await
{
Ok(s) => s,
Err(CalculateLogicalSizeError::Cancelled) => {
// Don't make noise, this is a common task.
// In the unlikely case that there is another call to this function, we'll retry
// because initial_logical_size is still None.
info!("initial size calculation cancelled, likely timeline delete / tenant detach");
return Ok(());
}
Err(CalculateLogicalSizeError::Other(err)) => {
if let Some(e @ PageReconstructError::AncestorStopping(_)) =
err.root_cause().downcast_ref()
{
// This can happen if the timeline parent timeline switches to
// Stopping state while we're still calculating the initial
// timeline size for the child, for example if the tenant is
// being detached or the pageserver is shut down. Like with
// CalculateLogicalSizeError::Cancelled, don't make noise.
info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
return Ok(());
}
return Err(err.context("Failed to calculate logical size"));
}
};
// we cannot query current_logical_size.current_size() to know the current
// *negative* value, only truncated to u64.
let added = self_clone
.current_logical_size
.size_added_after_initial
.load(AtomicOrdering::Relaxed);
let sum = calculated_size.saturating_add_signed(added);
// set the gauge value before it can be set in `update_current_logical_size`.
self_clone.metrics.current_logical_size_gauge.set(sum);
match self_clone
.current_logical_size
.initial_logical_size
.set(calculated_size)
{
Ok(()) => (),
Err(_what_we_just_attempted_to_set) => {
let existing_size = self_clone
.current_logical_size
.initial_logical_size
.get()
.expect("once_cell set was lost, then get failed, impossible.");
// This shouldn't happen because the semaphore is initialized with 1.
// But if it happens, just complain & report success so there are no further retries.
error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
}
}
// now that `initial_logical_size.is_some()`, reduce permit count to 0
// so that we prevent future callers from spawning this task
permit.forget();
Ok(())
}.in_current_span(),
);
}
pub fn spawn_ondemand_logical_size_calculation(
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
let (sender, receiver) = oneshot::channel();
let self_clone = Arc::clone(self);
// XXX if our caller loses interest, i.e., ctx is cancelled,
// we should stop the size calculation work and return an error.
// That would require restructuring this function's API to
// return the result directly, instead of a Receiver for the result.
let ctx = ctx.detached_child(
TaskKind::OndemandLogicalSizeCalculation,
DownloadBehavior::Download,
);
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::OndemandLogicalSizeCalculation,
Some(self.tenant_id),
Some(self.timeline_id),
"ondemand logical size calculation",
false,
async move {
let res = self_clone
.logical_size_calculation_task(lsn, cause, &ctx, cancel)
.await;
let _ = sender.send(res).ok();
Ok(()) // Receiver is responsible for handling errors
}
.in_current_span(),
);
receiver
}
#[instrument(skip_all)]
async fn logical_size_calculation_task(
self: &Arc<Self>,
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
cancel: CancellationToken,
) -> Result<u64, CalculateLogicalSizeError> {
debug_assert_current_span_has_tenant_and_timeline_id();
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
let mut calculation = pin!(async {
let cancel = cancel.child_token();
let ctx = ctx.attached_child();
self_calculation
.calculate_logical_size(lsn, cause, cancel, &ctx)
.await
});
let timeline_state_cancellation = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = *timeline_state_updates.borrow();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken
| TimelineState::Stopping
| TimelineState::Loading => {
break format!("aborted because timeline became inactive (new state: {new_state:?})")
}
}
}
Err(_sender_dropped_error) => {
// can't happen, the sender is not dropped as long as the Timeline exists
break "aborted because state watch was dropped".to_string();
}
}
}
};
let taskmgr_shutdown_cancellation = async {
task_mgr::shutdown_watcher().await;
"aborted because task_mgr shutdown requested".to_string()
};
loop {
tokio::select! {
res = &mut calculation => { return res }
reason = timeline_state_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
return calculation.await;
}
reason = taskmgr_shutdown_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
return calculation.await;
}
}
}
}
/// Calculate the logical size of the database at the latest LSN.
///
/// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
/// especially if we need to download remote layers.
pub async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
cause: LogicalSizeCalculationCause,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
info!(
"Calculating logical size for timeline {} at {}",
self.timeline_id, up_to_lsn
);
// These failpoints are used by python tests to ensure that we don't delete
// the timeline while the logical size computation is ongoing.
// The first failpoint is used to make this function pause.
// Then the python test initiates timeline delete operation in a thread.
// It waits for a few seconds, then arms the second failpoint and disables
// the first failpoint. The second failpoint prints an error if the timeline
// delete code has deleted the on-disk state while we're still running here.
// It shouldn't do that. If it does it anyway, the error will be caught
// by the test suite, highlighting the problem.
fail::fail_point!("timeline-calculate-logical-size-pause");
fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
if !self
.conf
.metadata_path(self.timeline_id, self.tenant_id)
.exists()
{
error!("timeline-calculate-logical-size-pre metadata file does not exist")
}
// need to return something
Ok(0)
});
// See if we've already done the work for initial size calculation.
// This is a short-cut for timelines that are mostly unused.
if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
return Ok(size);
}
let storage_time_metrics = match cause {
LogicalSizeCalculationCause::Initial
| LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
| LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
LogicalSizeCalculationCause::EvictionTaskImitation => {
&self.metrics.imitate_logical_size_histo
}
};
let timer = storage_time_metrics.start_timer();
let logical_size = self
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
.await?;
debug!("calculated logical size: {logical_size}");
timer.stop_and_record();
Ok(logical_size)
}
/// Update current logical size, adding `delta' to the old value.
fn update_current_logical_size(&self, delta: i64) {
let logical_size = &self.current_logical_size;
logical_size.increment_size(delta);
// Also set the value in the prometheus gauge. Note that
// there is a race condition here: if this is is called by two
// threads concurrently, the prometheus gauge might be set to
// one value while current_logical_size is set to the
// other.
match logical_size.current_size() {
Ok(CurrentLogicalSize::Exact(new_current_size)) => self
.metrics
.current_logical_size_gauge
.set(new_current_size),
Ok(CurrentLogicalSize::Approximate(_)) => {
// don't update the gauge yet, this allows us not to update the gauge back and
// forth between the initial size calculation task.
}
// this is overflow
Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
}
fn update_current_logical_size(&self, delta: i64) -> u64 {
let prev_size = self
.current_logical_size
.fetch_add(delta, AtomicOrdering::SeqCst);
(prev_size + delta) as u64
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
@@ -4382,7 +3965,7 @@ impl<'a> TimelineWriter<'a> {
self.tl.finish_write(new_lsn);
}
pub fn update_current_logical_size(&self, delta: i64) {
pub fn update_current_logical_size(&self, delta: i64) -> u64 {
self.tl.update_current_logical_size(delta)
}
}

View File

@@ -30,7 +30,7 @@ use crate::{
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
LogicalSizeCalculationCause, Tenant,
Tenant,
},
};
@@ -294,17 +294,12 @@ impl Timeline {
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < p.threshold => { /* no need to run */ }
_ => {
self.imitate_timeline_cached_layer_accesses(cancel, ctx)
.await;
self.imitate_timeline_cached_layer_accesses(ctx).await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
}
}
drop(state);
if cancel.is_cancelled() {
return ControlFlow::Break(());
}
// This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
@@ -333,36 +328,8 @@ impl Timeline {
/// Recompute the values which would cause on-demand downloads during restart.
#[instrument(skip_all)]
async fn imitate_timeline_cached_layer_accesses(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) {
async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
let lsn = self.get_last_record_lsn();
// imitiate on-restart initial logical size
let size = self
.calculate_logical_size(
lsn,
LogicalSizeCalculationCause::EvictionTaskImitation,
cancel.clone(),
ctx,
)
.instrument(info_span!("calculate_logical_size"))
.await;
match &size {
Ok(_size) => {
// good, don't log it to avoid confusion
}
Err(_) => {
// we have known issues for which we already log this on consumption metrics,
// gc, and compaction. leave logging out for now.
//
// https://github.com/neondatabase/neon/issues/2539
}
}
// imitiate repartiting on first compactation
if let Err(e) = self
.collect_keyspace(lsn, ctx)
@@ -370,13 +337,7 @@ impl Timeline {
.await
{
// if this failed, we probably failed logical size because these use the same keys
if size.is_err() {
// ignore, see above comment
} else {
warn!(
"failed to collect keyspace but succeeded in calculating logical size: {e:#}"
);
}
warn!("failed to collect keyspace but succeeded in calculating logical size: {e:#}");
}
}
@@ -413,21 +374,9 @@ impl Timeline {
// So, the chance of the worst case is quite low in practice.
// It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
// So, we must coordinate with other with other eviction tasks of this tenant.
let limit = self
.conf
.eviction_task_immitated_concurrent_logical_size_queries
.inner();
let mut throwaway_cache = HashMap::new();
let gather = crate::tenant::size::gather_inputs(
tenant,
limit,
None,
&mut throwaway_cache,
LogicalSizeCalculationCause::EvictionTaskImitation,
ctx,
)
.instrument(info_span!("gather_inputs"));
let gather = crate::tenant::size::gather_inputs(tenant, None, &mut throwaway_cache, ctx)
.instrument(info_span!("gather_inputs"));
tokio::select! {
_ = cancel.cancelled() => {}

View File

@@ -346,9 +346,7 @@ pub(super) async fn handle_walreceiver_connection(
// Send the replication feedback message.
// Regular standby_status_update fields are put into this message.
let (timeline_logical_size, _) = timeline
.get_current_logical_size(&ctx)
.context("Status update creation failed to get current logical size")?;
let timeline_logical_size = timeline.get_current_logical_size();
let status_update = PageserverFeedback {
current_timeline_size: timeline_logical_size,
last_received_lsn,

View File

@@ -149,7 +149,7 @@ def top_output_dir(base_dir: Path) -> Iterator[Path]:
@pytest.fixture(scope="session")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / f"v{pg_version}"
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
@@ -1745,8 +1745,8 @@ class PgBin:
def __init__(self, log_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion):
self.log_dir = log_dir
self.pg_version = pg_version
self.pg_bin_path = pg_distrib_dir / f"v{pg_version}" / "bin"
self.pg_lib_dir = pg_distrib_dir / f"v{pg_version}" / "lib"
self.pg_bin_path = pg_distrib_dir / pg_version.v_prefixed / "bin"
self.pg_lib_dir = pg_distrib_dir / pg_version.v_prefixed / "lib"
self.env = os.environ.copy()
self.env["LD_LIBRARY_PATH"] = str(self.pg_lib_dir)

View File

@@ -149,11 +149,16 @@ class PageserverHttpClient(requests.Session):
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: Optional[TenantId] = None) -> TenantId:
def tenant_create(
self, new_tenant_id: Optional[TenantId] = None, conf: Optional[Dict[str, Any]] = None
) -> TenantId:
if conf is not None:
assert "new_tenant_id" not in conf.keys()
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
**(conf or {}),
},
)
self.verbose_error(res)

View File

@@ -27,6 +27,12 @@ class PgVersion(str, enum.Enum):
def __repr__(self) -> str:
return f"'{self.value}'"
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
# sometime we need to do so in tests.
@property
def v_prefixed(self) -> str:
return f"v{self.value}"
@classmethod
def _missing_(cls, value) -> Optional["PgVersion"]:
known_values = {v.value for _, v in cls.__members__.items()}

View File

@@ -16,7 +16,7 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.pg_version import PgVersion, skip_on_postgres
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn
from pytest import FixtureRequest
@@ -41,7 +41,6 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
)
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(before="test_forward_compatibility")
def test_create_snapshot(
@@ -49,12 +48,13 @@ def test_create_snapshot(
pg_bin: PgBin,
top_output_dir: Path,
test_output_dir: Path,
pg_version: PgVersion,
):
# The test doesn't really test anything
# it creates a new snapshot for releases after we tested the current version against the previous snapshot in `test_backward_compatibility`.
#
# There's no cleanup here, it allows to adjust the data in `test_backward_compatibility` itself without re-collecting it.
neon_env_builder.pg_version = PgVersion.V14
neon_env_builder.pg_version = pg_version
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_local_fs_remote_storage()
neon_env_builder.preserve_database_files = True
@@ -90,13 +90,14 @@ def test_create_snapshot(
env.pageserver.stop()
# Directory `compatibility_snapshot_dir` is uploaded to S3 in a workflow, keep the name in sync with it
compatibility_snapshot_dir = top_output_dir / "compatibility_snapshot_pg14"
compatibility_snapshot_dir = (
top_output_dir / f"compatibility_snapshot_pg{pg_version.v_prefixed}"
)
if compatibility_snapshot_dir.exists():
shutil.rmtree(compatibility_snapshot_dir)
shutil.copytree(test_output_dir, compatibility_snapshot_dir)
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
@@ -115,7 +116,7 @@ def test_backward_compatibility(
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg14` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
), f"COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg{pg_version.v_prefixed}` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
breaking_changes_allowed = (
@@ -155,7 +156,6 @@ def test_backward_compatibility(
), "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
@@ -183,7 +183,9 @@ def test_forward_compatibility(
), "COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set. It should be set to a pg_install directrory (ideally generated by the previous version of Neon)"
compatibility_postgres_distrib_dir = Path(compatibility_postgres_distrib_dir_env).resolve()
compatibility_snapshot_dir = top_output_dir / "compatibility_snapshot_pg14"
compatibility_snapshot_dir = (
top_output_dir / f"compatibility_snapshot_pg{pg_version.v_prefixed}"
)
breaking_changes_allowed = (
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"

View File

@@ -1,9 +1,7 @@
import pytest
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion, xfail_on_postgres
@xfail_on_postgres(PgVersion.V15, reason="https://github.com/neondatabase/neon/pull/4182")
@pytest.mark.timeout(1800)
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env

View File

@@ -5,7 +5,6 @@ from pathlib import Path
import pytest
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.pg_version import PgVersion, xfail_on_postgres
# Run the main PostgreSQL regression tests, in src/test/regress.
@@ -33,8 +32,8 @@ def test_pg_regress(
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_regress will need.
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/regress"
build_path = pg_distrib_dir / f"build/{env.pg_version.v_prefixed}/src/test/regress"
src_path = base_dir / f"vendor/postgres-{env.pg_version.v_prefixed}/src/test/regress"
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
schedule = src_path / "parallel_schedule"
pg_regress = build_path / "pg_regress"
@@ -72,7 +71,6 @@ def test_pg_regress(
#
# This runs for a long time, especially in debug mode, so use a larger-than-default
# timeout.
@xfail_on_postgres(PgVersion.V15, reason="https://github.com/neondatabase/neon/pull/4213")
@pytest.mark.timeout(1800)
def test_isolation(
neon_simple_env: NeonEnv,
@@ -97,8 +95,8 @@ def test_isolation(
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_isolation_regress will need.
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/isolation"
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/isolation"
build_path = pg_distrib_dir / f"build/{env.pg_version.v_prefixed}/src/test/isolation"
src_path = base_dir / f"vendor/postgres-{env.pg_version.v_prefixed}/src/test/isolation"
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
schedule = src_path / "isolation_schedule"
pg_isolation_regress = build_path / "pg_isolation_regress"