mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 10:00:38 +00:00
Compare commits
5 Commits
problame/t
...
prof-disk-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c1cd13d948 | ||
|
|
a64a5cb56c | ||
|
|
7c2d09f4f8 | ||
|
|
8e86f693f2 | ||
|
|
5a46473f7c |
18
.github/actions/run-python-test-set/action.yml
vendored
18
.github/actions/run-python-test-set/action.yml
vendored
@@ -71,12 +71,12 @@ runs:
|
||||
path: /tmp/neon-previous
|
||||
prefix: latest
|
||||
|
||||
- name: Download compatibility snapshot
|
||||
if: inputs.build_type != 'remote'
|
||||
- name: Download compatibility snapshot for Postgres 14
|
||||
if: inputs.build_type != 'remote' && inputs.pg_version == 'v14'
|
||||
uses: ./.github/actions/download
|
||||
with:
|
||||
name: compatibility-snapshot-${{ inputs.build_type }}-pg${{ inputs.pg_version }}
|
||||
path: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
|
||||
name: compatibility-snapshot-${{ inputs.build_type }}-pg14
|
||||
path: /tmp/compatibility_snapshot_pg14
|
||||
prefix: latest
|
||||
|
||||
- name: Checkout
|
||||
@@ -106,7 +106,7 @@ runs:
|
||||
BUILD_TYPE: ${{ inputs.build_type }}
|
||||
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
|
||||
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg${{ inputs.pg_version }}
|
||||
COMPATIBILITY_SNAPSHOT_DIR: /tmp/compatibility_snapshot_pg14
|
||||
ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'backward compatibility breakage')
|
||||
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
|
||||
RERUN_FLAKY: ${{ inputs.rerun_flaky }}
|
||||
@@ -197,13 +197,13 @@ runs:
|
||||
scripts/generate_and_push_perf_report.sh
|
||||
fi
|
||||
|
||||
- name: Upload compatibility snapshot
|
||||
if: github.ref_name == 'release'
|
||||
- name: Upload compatibility snapshot for Postgres 14
|
||||
if: github.ref_name == 'release' && inputs.pg_version == 'v14'
|
||||
uses: ./.github/actions/upload
|
||||
with:
|
||||
name: compatibility-snapshot-${{ inputs.build_type }}-pg${{ inputs.pg_version }}-${{ github.run_id }}
|
||||
name: compatibility-snapshot-${{ inputs.build_type }}-pg14-${{ github.run_id }}
|
||||
# Directory is created by test_compatibility.py::test_create_snapshot, keep the path in sync with the test
|
||||
path: /tmp/test_output/compatibility_snapshot_pg${{ inputs.pg_version }}/
|
||||
path: /tmp/test_output/compatibility_snapshot_pg14/
|
||||
prefix: latest
|
||||
|
||||
- name: Upload test results
|
||||
|
||||
18
.github/workflows/build_and_test.yml
vendored
18
.github/workflows/build_and_test.yml
vendored
@@ -711,11 +711,7 @@ jobs:
|
||||
|
||||
compute-node-image:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container:
|
||||
image: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
# Workaround for "Resolving download.osgeo.org (download.osgeo.org)... failed: Temporary failure in name resolution.""
|
||||
# Should be prevented by https://github.com/neondatabase/neon/issues/4281
|
||||
options: --add-host=download.osgeo.org:140.211.15.30
|
||||
container: gcr.io/kaniko-project/executor:v1.9.2-debug
|
||||
needs: [ tag ]
|
||||
strategy:
|
||||
fail-fast: false
|
||||
@@ -961,7 +957,7 @@ jobs:
|
||||
promote-compatibility-data:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:pinned
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
needs: [ promote-images, tag, regress-tests ]
|
||||
if: github.ref_name == 'release' && github.event_name != 'workflow_dispatch'
|
||||
@@ -972,13 +968,11 @@ jobs:
|
||||
PREFIX: artifacts/latest
|
||||
run: |
|
||||
# Update compatibility snapshot for the release
|
||||
for pg_version in v14 v15; do
|
||||
for build_type in debug release; do
|
||||
OLD_FILENAME=compatibility-snapshot-${build_type}-pg${pg_version}-${GITHUB_RUN_ID}.tar.zst
|
||||
NEW_FILENAME=compatibility-snapshot-${build_type}-pg${pg_version}.tar.zst
|
||||
for build_type in debug release; do
|
||||
OLD_FILENAME=compatibility-snapshot-${build_type}-pg14-${GITHUB_RUN_ID}.tar.zst
|
||||
NEW_FILENAME=compatibility-snapshot-${build_type}-pg14.tar.zst
|
||||
|
||||
time aws s3 mv --only-show-errors s3://${BUCKET}/${PREFIX}/${OLD_FILENAME} s3://${BUCKET}/${PREFIX}/${NEW_FILENAME}
|
||||
done
|
||||
time aws s3 mv --only-show-errors s3://${BUCKET}/${PREFIX}/${OLD_FILENAME} s3://${BUCKET}/${PREFIX}/${NEW_FILENAME}
|
||||
done
|
||||
|
||||
# Update Neon artifact for the release (reuse already uploaded artifact)
|
||||
|
||||
@@ -393,79 +393,69 @@ impl PageServerNode {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn tenant_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
mut settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<()> {
|
||||
pub fn tenant_config(&self, tenant_id: TenantId, settings: HashMap<&str, &str>) -> Result<()> {
|
||||
let config = {
|
||||
// Braces to make the diff easier to read
|
||||
models::TenantConfig {
|
||||
checkpoint_distance: settings
|
||||
.remove("checkpoint_distance")
|
||||
.get("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'checkpoint_distance' as an integer")?,
|
||||
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
|
||||
checkpoint_timeout: settings.get("checkpoint_timeout").map(|x| x.to_string()),
|
||||
compaction_target_size: settings
|
||||
.remove("compaction_target_size")
|
||||
.get("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_target_size' as an integer")?,
|
||||
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
|
||||
compaction_period: settings.get("compaction_period").map(|x| x.to_string()),
|
||||
compaction_threshold: settings
|
||||
.remove("compaction_threshold")
|
||||
.get("compaction_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_threshold' as an integer")?,
|
||||
gc_horizon: settings
|
||||
.remove("gc_horizon")
|
||||
.get("gc_horizon")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_horizon' as an integer")?,
|
||||
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
|
||||
gc_period: settings.get("gc_period").map(|x| x.to_string()),
|
||||
image_creation_threshold: settings
|
||||
.remove("image_creation_threshold")
|
||||
.get("image_creation_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
pitr_interval: settings.get("pitr_interval").map(|x| x.to_string()),
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
lagging_wal_timeout: settings
|
||||
.remove("lagging_wal_timeout")
|
||||
.get("walreceiver_connect_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
lagging_wal_timeout: settings.get("lagging_wal_timeout").map(|x| x.to_string()),
|
||||
max_lsn_wal_lag: settings
|
||||
.remove("max_lsn_wal_lag")
|
||||
.get("max_lsn_wal_lag")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.get("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
.get("eviction_policy")
|
||||
.map(|x| serde_json::from_str(x))
|
||||
.transpose()
|
||||
.context("Failed to parse 'eviction_policy' json")?,
|
||||
min_resident_size_override: settings
|
||||
.remove("min_resident_size_override")
|
||||
.get("min_resident_size_override")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as an integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.get("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
}
|
||||
};
|
||||
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
}
|
||||
|
||||
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
|
||||
.json(&models::TenantConfigRequest { tenant_id, config })
|
||||
.send()?
|
||||
|
||||
@@ -1,232 +0,0 @@
|
||||
# The state of pageserver tenant relocation
|
||||
|
||||
Created on 17.03.23
|
||||
|
||||
## Motivation
|
||||
|
||||
There were previous write ups on the subject. The design of tenant relocation was planned at the time when we had quite different landscape. I e there was no on-demand download/eviction. They were on the horizon but we still planned for cases when they were not available. Some other things have changed. Now safekeepers offload wal to s3 so we're not risking overflowing their disks. Having all of the above, it makes sense to recap and take a look at the options we have now, which adjustments we'd like to make to original process, etc.
|
||||
|
||||
Related (in chronological order):
|
||||
|
||||
- Tracking issue with initial discussion: [#886](https://github.com/neondatabase/neon/issues/886)
|
||||
- [015. Storage Messaging](015-storage-messaging.md)
|
||||
- [020. Pageserver S3 Coordination](020-pageserver-s3-coordination.md)
|
||||
|
||||
## Summary
|
||||
|
||||
The RFC consists of a walkthrough of prior art on tenant relocation and corresponding problems. It describes 3 approaches.
|
||||
|
||||
1. Simplistic approach that uses ignore and is the fastest to implement. The main downside is a requirement of short downtime.
|
||||
2. More complicated approach that avoids even short downtime.
|
||||
3. Even more complicated approach that will allow multiple pageservers to operate concurrently on the same tenant possibly allowing for HA cluster topologies and horizontal scaling of reads (i e compute talks to multiple pageservers).
|
||||
|
||||
The order in which solutions are described is a bit different. We start from 2, then move to possible compromises (aka simplistic approach) and then move to discussing directions for solving HA/Pageserver replica case with 3.
|
||||
|
||||
## Components
|
||||
|
||||
pageserver, control-plane, safekeepers (a bit)
|
||||
|
||||
## Requirements
|
||||
|
||||
Relocation procedure should move tenant from one pageserver to another without downtime introduced by storage side. For now restarting compute for applying new configuration is fine.
|
||||
|
||||
- component restarts
|
||||
- component outage
|
||||
- pageserver loss
|
||||
|
||||
## The original proposed implementation
|
||||
|
||||
The starting point is this sequence:
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant CP as Control Plane
|
||||
participant PS1 as Pageserver 1
|
||||
participant PS2 as Pageserver 2
|
||||
participant S3
|
||||
|
||||
CP->>PS2: Attach tenant X
|
||||
PS2->>S3: Fetch timelines, indexes for them
|
||||
PS2->>CP: Accepted
|
||||
CP->>CP: Change pageserver id in project
|
||||
CP->>PS1: Detach
|
||||
```
|
||||
|
||||
Which problems do we have with naive approach?
|
||||
|
||||
### Concurrent GC and Compaction
|
||||
|
||||
The problem is that they can run on both, PS1 and PS2. Consider this example from [Pageserver S3 Coordination RFC](020-pageserver-s3-coordination.md)
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant PS1
|
||||
participant S3
|
||||
participant PS2
|
||||
|
||||
PS1->>S3: Uploads L1, L2 <br/> Index contains L1 L2
|
||||
PS2->>S3: Attach called, sees L1, L2
|
||||
PS1->>S3: Compaction comes <br/> Removes L1, adds L3
|
||||
note over S3: Index now L2, L3
|
||||
PS2->>S3: Uploads new layer L4 <br/> (added to previous view of the index)
|
||||
note over S3: Index now L1, L2, L4
|
||||
```
|
||||
|
||||
At this point it is not possible to restore the state from index, it contains L2 which
|
||||
is no longer available in s3 and doesnt contain L3 added by compaction by the
|
||||
first pageserver. So if any of the pageservers restart, initial sync will fail
|
||||
(or in on-demand world it will fail a bit later during page request from
|
||||
missing layer)
|
||||
|
||||
The problem lies in shared index_part.json. Having intersecting layers from append only edits is expected to work, though this is an uncharted territory without tests.
|
||||
|
||||
#### Options
|
||||
|
||||
There are several options on how to restrict concurrent access to index file.
|
||||
|
||||
First and the simplest one is external orchestration. Control plane which runs migration can use special api call on pageserver to stop background processes (gc, compaction), and even possibly all uploads.
|
||||
|
||||
So the sequence becomes:
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant CP as Control Plane
|
||||
participant PS1 as Pageserver 1
|
||||
participant PS2 as Pageserver 2
|
||||
participant S3
|
||||
|
||||
CP->>PS1: Pause background jobs, pause uploading new layers.
|
||||
CP->>PS2: Attach tenant X.
|
||||
PS2->>S3: Fetch timelines, index, start background operations
|
||||
PS2->>CP: Accepted
|
||||
CP->>CP: Monitor PS2 last record lsn, ensure OK lag
|
||||
CP->>CP: Change pageserver id in project
|
||||
CP->>PS1: Detach
|
||||
```
|
||||
|
||||
The downside of this sequence is the potential rollback process. What if something goes wrong on new pageserver? Can we safely roll back to source pageserver?
|
||||
|
||||
There are two questions:
|
||||
|
||||
#### How can we detect that something went wrong?
|
||||
|
||||
We can run usual availability check (consists of compute startup and an update of one row).
|
||||
Note that we cant run separate compute for that before touching compute that client runs actual workload on, because we cant have two simultaneous computes running in read-write mode on the same timeline (enforced by safekeepers consensus algorithm). So we can either run some readonly check first (basebackup) and then change pageserver id and run availability check. If it failed we can roll it back to the old one.
|
||||
|
||||
#### What can go wrong? And how we can safely roll-back?
|
||||
|
||||
In the sequence above during attach we start background processes/uploads. They change state in remote storage so it is possible that after rollback remote state will be different from one that was observed by source pageserver. So if target pageserver goes wild then source pageserver may fail to start with changed remote state.
|
||||
|
||||
Proposed option would be to implement a barrier (read-only) mode when pageserver does not update remote state.
|
||||
|
||||
So the sequence for happy path becomes this one:
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant CP as Control Plane
|
||||
participant PS1 as Pageserver 1
|
||||
participant PS2 as Pageserver 2
|
||||
participant S3
|
||||
|
||||
CP->>PS1: Pause background jobs, pause uploading new layers.
|
||||
CP->>PS2: Attach tenant X in remote readonly mode.
|
||||
PS2->>S3: Fetch timelines, index
|
||||
PS2->>CP: Accepted
|
||||
CP->>CP: Monitor PS2 last record lsn, ensure OK lag
|
||||
CP->>CP: Change pageserver id in project
|
||||
CP->>CP: Run successful availability check
|
||||
CP->>PS2: Start uploads, background tasks
|
||||
CP->>PS1: Detach
|
||||
```
|
||||
|
||||
With this sequence we restrict any changes to remote storage to one pageserver. So there is no concurrent access at all, not only for index_part.json, but for everything else too. This approach makes it possible to roll back after failure on new pageserver.
|
||||
|
||||
The sequence with roll back process:
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant CP as Control Plane
|
||||
participant PS1 as Pageserver 1
|
||||
participant PS2 as Pageserver 2
|
||||
participant S3
|
||||
|
||||
CP->>PS1: Pause background jobs, pause uploading new layers.
|
||||
CP->>PS2: Attach tenant X in remote readonly mode.
|
||||
PS2->>S3: Fetch timelines, index
|
||||
PS2->>CP: Accepted
|
||||
CP->>CP: Monitor PS2 last record lsn, ensure OK lag
|
||||
CP->>CP: Change pageserver id in project
|
||||
CP->>CP: Availability check Failed
|
||||
CP->>CP: Change pageserver id back
|
||||
CP->>PS1: Resume remote operations
|
||||
CP->>PS2: Ignore (instead of detach for investigation purposes)
|
||||
```
|
||||
|
||||
## Concurrent branch creation
|
||||
|
||||
Another problem is a possibility of concurrent branch creation calls.
|
||||
|
||||
I e during migration create_branch can be called on old pageserver and newly created branch wont be seen on new pageserver. Prior art includes prototyping an approach of trying to mirror such branches, but currently it lost its importance, because now attach is fast because we dont need to download all data, and additionally to the best of my knowledge of control plane internals (cc @ololobus to confirm) operations on one project are executed sequentially, so it is not possible to have such case. So branch create operation will be executed only when relocation is completed. As a safety measure we can forbid branch creation for tenants that are in readonly remote state.
|
||||
|
||||
## Simplistic approach
|
||||
|
||||
The difference of simplistic approach from one described above is that it calls ignore on source tenant first and then calls attach on target pageserver. Approach above does it in opposite order thus opening a possibility for race conditions we strive to avoid.
|
||||
|
||||
The approach largely follows this guide: <https://github.com/neondatabase/cloud/wiki/Cloud:-Ad-hoc-tenant-relocation>
|
||||
|
||||
The happy path sequence:
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant CP as Control Plane
|
||||
participant PS1 as Pageserver 1
|
||||
participant PS2 as Pageserver 2
|
||||
participant SK as Safekeeper
|
||||
participant S3
|
||||
|
||||
CP->>CP: Enable maintenance mode
|
||||
CP->>PS1: Ignore
|
||||
CP->>PS2: Attach
|
||||
PS2->>CP: Accepted
|
||||
loop Delete layers for each timeline
|
||||
CP->>PS2: Get last record lsn
|
||||
CP->>SK: Get commit lsn
|
||||
CP->>CP: OK? Timed out?
|
||||
end
|
||||
CP->>CP: Change pageserver id in project
|
||||
CP->>CP: Run successful availability check
|
||||
CP->>CP: Disable maintenance mode
|
||||
CP->>PS1: Detach ignored
|
||||
```
|
||||
|
||||
The sequence contains exactly the same rollback problems as in previous approach described above. They can be resolved the same way.
|
||||
|
||||
Most probably we'd like to move forward without this safety measure and implement it on top of this approach to make progress towards the downtime-less one.
|
||||
|
||||
## Lease based approach
|
||||
|
||||
In order to allow for concurrent operation on the same data on remote storage for multiple pageservers we need to go further than external orchestration.
|
||||
|
||||
NOTE: [020. Pageserver S3 Coordination](020-pageserver-s3-coordination.md) discusses one more approach that relies on duplication of index_part.json for each pageserver operating on the timeline. This approach still requires external coordination which makes certain things easier but requires additional bookkeeping to account for multiple index_part.json files. Discussion/comparison with proposed lease based approach
|
||||
|
||||
The problems are outlined in [020. Pageserver S3 Coordination](020-pageserver-s3-coordination.md) and suggested solution includes [Coordination based approach](020-pageserver-s3-coordination.md#coordination-based-approach). This way it will allow to do basic leader election for pageservers so they can decide which node will be responsible for running GC and compaction. The process is based on extensive communication via storage broker and consists of a lease that is taken by one of the pageservers that extends it to continue serving a leader role.
|
||||
|
||||
There are two options for ingesting new data into pageserver in follower role. One option is to avoid WAL ingestion at all and rely on notifications from leader to discover new layers on s3. Main downside of this approach is that follower will always lag behind the primary node because it wont have the last layer until it is uploaded to remote storage. In case of a primary failure follower will be required to reingest last segment (up to 256Mb of WAL currently) which slows down recovery. Additionally if compute is connected to follower pageserver it will observe latest data with a delay. Queries from compute will likely experience bigger delays when recent lsn is required.
|
||||
|
||||
The second option is to consume WAL stream on both pageservers. In this case the only problem is non deterministic layer generation. Additional bookkeeping will be required to deduplicate layers from primary with local ones. Some process needs to somehow merge them to remove duplicated data. Additionally we need to have good testing coverage to ensure that our implementation of `get_page@lsn` properly handles intersecting layers.
|
||||
|
||||
There is another tradeoff. Approaches may be different in amount of traffic between system components. With first approach there can be increased traffic between follower and remote storage. But only in case follower has some activity that actually requests pages (!). With other approach traffic increase will be permanent and will be caused by two WAL streams instead of one.
|
||||
|
||||
## Summary
|
||||
|
||||
Proposed implementation strategy:
|
||||
|
||||
Go with the simplest approach for now. Then work on tech debt, increase test coverage. Then gradually move forward to second approach by implementing safety measures first, finishing with switch of order between ignore and attach operation.
|
||||
|
||||
And only then go to lease based approach to solve HA/Pageserver replica use cases.
|
||||
@@ -131,14 +131,13 @@ pub struct TimelineCreateRequest {
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
#[derive(Serialize, Deserialize, Default)]
|
||||
pub struct TenantCreateRequest {
|
||||
#[serde(default)]
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub new_tenant_id: Option<TenantId>,
|
||||
#[serde(flatten)]
|
||||
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
pub config: TenantConfig,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for TenantCreateRequest {
|
||||
@@ -149,7 +148,7 @@ impl std::ops::Deref for TenantCreateRequest {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
#[derive(Serialize, Deserialize, Default)]
|
||||
pub struct TenantConfig {
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
pub checkpoint_timeout: Option<String>,
|
||||
@@ -193,13 +192,12 @@ impl TenantCreateRequest {
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantConfigRequest {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub tenant_id: TenantId,
|
||||
#[serde(flatten)]
|
||||
pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
pub config: TenantConfig,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for TenantConfigRequest {
|
||||
@@ -770,31 +768,4 @@ mod tests {
|
||||
assert!(format!("{:?}", &original_broken.state).contains("reason"));
|
||||
assert!(format!("{:?}", &original_broken.state).contains("backtrace info"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_reject_unknown_field() {
|
||||
let id = TenantId::generate();
|
||||
let create_request = json!({
|
||||
"new_tenant_id": id.to_string(),
|
||||
"unknown_field": "unknown_value".to_string(),
|
||||
});
|
||||
let err = serde_json::from_value::<TenantCreateRequest>(create_request).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("unknown field `unknown_field`"),
|
||||
"expect unknown field `unknown_field` error, got: {}",
|
||||
err
|
||||
);
|
||||
|
||||
let id = TenantId::generate();
|
||||
let config_request = json!({
|
||||
"tenant_id": id.to_string(),
|
||||
"unknown_field": "unknown_value".to_string(),
|
||||
});
|
||||
let err = serde_json::from_value::<TenantConfigRequest>(config_request).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("unknown field `unknown_field`"),
|
||||
"expect unknown field `unknown_field` error, got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,3 +88,7 @@ harness = false
|
||||
[[bench]]
|
||||
name = "bench_walredo"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_disk_lookup"
|
||||
harness = false
|
||||
|
||||
133
pageserver/benches/bench_disk_lookup.rs
Normal file
133
pageserver/benches/bench_disk_lookup.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use criterion::{black_box, criterion_group, criterion_main, Criterion};
|
||||
use pageserver::{tenant::{disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}, block_io::{BlockBuf, FileBlockReader}, storage_layer::DeltaLayerWriter}, repository::Key, virtual_file::{VirtualFile, self}, page_cache};
|
||||
use std::{time::Instant, collections::BTreeMap};
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use utils::{id::{TimelineId, TenantId}, lsn::Lsn};
|
||||
use std::{io::{Read, Write}, path::PathBuf};
|
||||
use pageserver::config::PageServerConf;
|
||||
|
||||
struct MockLayer {
|
||||
pub path: PathBuf,
|
||||
pub index_start_blk: u32,
|
||||
pub index_root_blk: u32,
|
||||
}
|
||||
|
||||
impl MockLayer {
|
||||
fn read(&self, key: i128) -> Option<u64> {
|
||||
// Read from disk btree
|
||||
let file = FileBlockReader::new(VirtualFile::open(&self.path).unwrap());
|
||||
let tree_reader = DiskBtreeReader::<_, 24>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
file,
|
||||
);
|
||||
|
||||
let key: Key = Key::from_i128(key);
|
||||
let mut key_bytes: [u8; 24] = [8u8; 24];
|
||||
key.write_to_byte_slice(&mut key_bytes);
|
||||
|
||||
let mut result = None;
|
||||
tree_reader.visit(&key_bytes, VisitDirection::Backwards, |key, value| {
|
||||
if key == key_bytes {
|
||||
result = Some(value);
|
||||
}
|
||||
return false
|
||||
}).unwrap();
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
fn make_simple(n_keys: i128, name: &str) -> MockLayer {
|
||||
let block_buf = BlockBuf::new();
|
||||
let mut writer = DiskBtreeBuilder::<_, 24>::new(block_buf);
|
||||
for i in 0..n_keys {
|
||||
let key: Key = Key::from_i128(i);
|
||||
let value: u64 = i as u64;
|
||||
|
||||
let mut key_bytes: [u8; 24] = [8u8; 24];
|
||||
key.write_to_byte_slice(&mut key_bytes);
|
||||
writer.append(&key_bytes, value).unwrap();
|
||||
}
|
||||
let (index_root_blk, block_buf) = writer.finish().unwrap();
|
||||
let index_start_blk = 0; // ???
|
||||
let path = std::env::current_dir().unwrap()
|
||||
.parent().unwrap()
|
||||
.join("test_output")
|
||||
.join("bench_disk_lookup")
|
||||
.join("disk_btree")
|
||||
.join(name);
|
||||
std::fs::create_dir_all(path.clone().parent().unwrap()).unwrap();
|
||||
let layer = MockLayer {
|
||||
path: path.clone(),
|
||||
index_start_blk,
|
||||
index_root_blk,
|
||||
};
|
||||
|
||||
let mut file = VirtualFile::create(&path).unwrap();
|
||||
for buf in block_buf.blocks {
|
||||
file.write_all(buf.as_ref()).unwrap();
|
||||
}
|
||||
|
||||
layer
|
||||
}
|
||||
|
||||
fn make_many(n_keys: i128, n_layers: i128) -> Vec<MockLayer> {
|
||||
(0..n_layers)
|
||||
.map(|i| make_simple(n_keys, &format!("layer_{}.tmp", i)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
||||
// cargo bench --bench bench_disk_lookup
|
||||
fn bench_disk_lookup(c: &mut Criterion) {
|
||||
virtual_file::init(10);
|
||||
page_cache::init(10000);
|
||||
|
||||
// Results in a 40MB index
|
||||
let n_keys = 4_000_000;
|
||||
|
||||
// One layer for each query
|
||||
let n_layers = 100;
|
||||
let n_queries = n_layers;
|
||||
|
||||
// Write to disk btree
|
||||
let layers = make_many(n_keys, n_layers);
|
||||
|
||||
// Write to mem btrees
|
||||
let mem_btrees: Vec<BTreeMap<i128, u64>> = (0..n_layers)
|
||||
.map(|_| (0..n_keys)
|
||||
.map(|i| (i as i128, i as u64))
|
||||
.collect())
|
||||
.collect();
|
||||
|
||||
// Pick queries
|
||||
let rng = &mut StdRng::seed_from_u64(1);
|
||||
let queries: Vec<_> = (0..n_keys).collect();
|
||||
let queries: Vec<_> = queries.choose_multiple(rng, n_queries as usize).copied().collect();
|
||||
|
||||
// Define and name the benchmark function
|
||||
let mut group = c.benchmark_group("g1");
|
||||
group.bench_function("disk_btree", |b| {
|
||||
b.iter(|| {
|
||||
for (i, q) in queries.clone().into_iter().enumerate() {
|
||||
black_box({
|
||||
assert_eq!(layers[i].read(q), Some(q as u64));
|
||||
})
|
||||
}
|
||||
});
|
||||
});
|
||||
group.bench_function("mem_btree", |b| {
|
||||
b.iter(|| {
|
||||
for (i, q) in queries.clone().into_iter().enumerate() {
|
||||
black_box({
|
||||
assert_eq!(mem_btrees[i].get(&q), Some(&(q as u64)));
|
||||
})
|
||||
}
|
||||
});
|
||||
});
|
||||
group.finish();
|
||||
}
|
||||
|
||||
criterion_group!(group_1, bench_disk_lookup);
|
||||
criterion_main!(group_1);
|
||||
@@ -741,11 +741,8 @@ paths:
|
||||
$ref: "#/components/schemas/Error"
|
||||
post:
|
||||
description: |
|
||||
Create a tenant. Returns new tenant id on success.
|
||||
|
||||
Create a tenant. Returns new tenant id on success.\
|
||||
If no new tenant id is specified in parameters, it would be generated. It's an error to recreate the same tenant.
|
||||
|
||||
Invalid fields in the tenant config will cause the request to be rejected with status 400.
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
@@ -793,8 +790,6 @@ paths:
|
||||
put:
|
||||
description: |
|
||||
Update tenant's config.
|
||||
|
||||
Invalid fields in the tenant config will cause the request to be rejected with status 400.
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
|
||||
@@ -18,7 +18,6 @@ use remote_storage::DownloadError;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
|
||||
@@ -81,6 +80,7 @@ use utils::{
|
||||
mod blob_io;
|
||||
pub mod block_io;
|
||||
pub mod disk_btree;
|
||||
pub mod disk_persistent_bst;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
|
||||
@@ -155,8 +155,6 @@ pub struct Tenant {
|
||||
cached_synthetic_tenant_size: Arc<AtomicU64>,
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
|
||||
background_loops_cancel: Mutex<Option<CancellationToken>>,
|
||||
}
|
||||
|
||||
/// A timeline with some of its files on disk, being initialized.
|
||||
@@ -1591,7 +1589,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// Changes tenant status to active, unless shutdown was already requested.
|
||||
fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let mut result = Ok(());
|
||||
@@ -1623,15 +1621,8 @@ impl Tenant {
|
||||
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
|
||||
|
||||
// Spawn gc and compaction loops. The loops will shut themselves
|
||||
// down in response to the cancellation token getting dropped.
|
||||
let background_loops_cancel = CancellationToken::new();
|
||||
let existing = self
|
||||
.background_loops_cancel
|
||||
.lock()
|
||||
.unwrap()
|
||||
.replace(background_loops_cancel.clone());
|
||||
assert!(existing.is_none(), "we don't support re-activation");
|
||||
tasks::start_background_loops(self, background_loops_cancel.clone());
|
||||
// down when they notice that the tenant is inactive.
|
||||
tasks::start_background_loops(self.tenant_id);
|
||||
|
||||
let mut activated_timelines = 0;
|
||||
let mut timelines_broken_during_activation = 0;
|
||||
@@ -1687,10 +1678,6 @@ impl Tenant {
|
||||
TenantState::Active | TenantState::Loading | TenantState::Attaching => {
|
||||
*current_state = TenantState::Stopping;
|
||||
|
||||
if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
|
||||
cancel.cancel();
|
||||
}
|
||||
|
||||
// FIXME: If the tenant is still Loading or Attaching, new timelines
|
||||
// might be created after this. That's harmless, as the Timelines
|
||||
// won't be accessible to anyone, when the Tenant is in Stopping
|
||||
@@ -1725,10 +1712,6 @@ impl Tenant {
|
||||
// we can, but it shouldn't happen.
|
||||
warn!("Changing Active tenant to Broken state, reason: {}", reason);
|
||||
*current_state = TenantState::broken_from_reason(reason);
|
||||
|
||||
if let Some(cancel) = self.background_loops_cancel.lock().unwrap().take() {
|
||||
cancel.cancel();
|
||||
}
|
||||
}
|
||||
TenantState::Broken { .. } => {
|
||||
// This shouldn't happen either
|
||||
@@ -1750,6 +1733,10 @@ impl Tenant {
|
||||
});
|
||||
}
|
||||
|
||||
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
|
||||
self.state.subscribe()
|
||||
}
|
||||
|
||||
pub async fn wait_to_become_active(&self) -> anyhow::Result<()> {
|
||||
let mut receiver = self.state.subscribe();
|
||||
loop {
|
||||
@@ -1996,7 +1983,6 @@ impl Tenant {
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
background_loops_cancel: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
0
pageserver/src/tenant/disk_persistent_bst.rs
Normal file
0
pageserver/src/tenant/disk_persistent_bst.rs
Normal file
@@ -1,6 +1,7 @@
|
||||
//! This module contains functions to serve per-tenant background processes,
|
||||
//! such as compaction and GC
|
||||
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -8,12 +9,13 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::Tenant;
|
||||
use crate::tenant::mgr;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::id::TenantId;
|
||||
|
||||
pub fn start_background_loops(tenant: &Arc<Tenant>, cancel: CancellationToken) {
|
||||
let tenant_id = tenant.tenant_id;
|
||||
pub fn start_background_loops(tenant_id: TenantId) {
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Compaction,
|
||||
@@ -21,15 +23,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, cancel: CancellationToken) {
|
||||
None,
|
||||
&format!("compactor for tenant {tenant_id}"),
|
||||
false,
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
compaction_loop(tenant, cancel)
|
||||
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
async move {
|
||||
compaction_loop(tenant_id)
|
||||
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
task_mgr::spawn(
|
||||
@@ -39,15 +37,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, cancel: CancellationToken) {
|
||||
None,
|
||||
&format!("garbage collector for tenant {tenant_id}"),
|
||||
false,
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let cancel = cancel.clone();
|
||||
async move {
|
||||
gc_loop(tenant, cancel)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
async move {
|
||||
gc_loop(tenant_id)
|
||||
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -55,16 +49,28 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, cancel: CancellationToken) {
|
||||
///
|
||||
/// Compaction task's main loop
|
||||
///
|
||||
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
async fn compaction_loop(tenant_id: TenantId) {
|
||||
let wait_duration = Duration::from_secs(2);
|
||||
info!("starting");
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
|
||||
let mut first = true;
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
let tenant = tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request");
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(tenant) => tenant,
|
||||
},
|
||||
};
|
||||
|
||||
let period = tenant.get_compaction_period();
|
||||
|
||||
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
|
||||
@@ -113,19 +119,30 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
///
|
||||
/// GC task's main loop
|
||||
///
|
||||
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
async fn gc_loop(tenant_id: TenantId) {
|
||||
let wait_duration = Duration::from_secs(2);
|
||||
info!("starting");
|
||||
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
|
||||
async {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
// GC might require downloading, to find the cutoff LSN that corresponds to the
|
||||
// cutoff specified as time.
|
||||
let ctx =
|
||||
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
let mut first = true;
|
||||
loop {
|
||||
trace!("waking up");
|
||||
|
||||
let tenant = tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
info!("received cancellation request");
|
||||
return;
|
||||
},
|
||||
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
|
||||
ControlFlow::Break(()) => return,
|
||||
ControlFlow::Continue(tenant) => tenant,
|
||||
},
|
||||
};
|
||||
|
||||
let period = tenant.get_gc_period();
|
||||
|
||||
if first {
|
||||
@@ -144,9 +161,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
Duration::from_secs(10)
|
||||
} else {
|
||||
// Run gc
|
||||
let res = tenant
|
||||
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
|
||||
.await;
|
||||
let res = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx).await;
|
||||
if let Err(e) = res {
|
||||
error!("Gc failed, retrying in {:?}: {e:?}", wait_duration);
|
||||
wait_duration
|
||||
@@ -172,6 +187,49 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
trace!("GC loop stopped.");
|
||||
}
|
||||
|
||||
async fn wait_for_active_tenant(
|
||||
tenant_id: TenantId,
|
||||
wait: Duration,
|
||||
) -> ControlFlow<(), Arc<Tenant>> {
|
||||
let tenant = loop {
|
||||
match mgr::get_tenant(tenant_id, false).await {
|
||||
Ok(tenant) => break tenant,
|
||||
Err(e) => {
|
||||
error!("Failed to get a tenant {tenant_id}: {e:#}");
|
||||
tokio::time::sleep(wait).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// if the tenant has a proper status already, no need to wait for anything
|
||||
if tenant.current_state() == TenantState::Active {
|
||||
ControlFlow::Continue(tenant)
|
||||
} else {
|
||||
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
|
||||
loop {
|
||||
match tenant_state_updates.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = &*tenant_state_updates.borrow();
|
||||
match new_state {
|
||||
TenantState::Active => {
|
||||
debug!("Tenant state changed to active, continuing the task loop");
|
||||
return ControlFlow::Continue(tenant);
|
||||
}
|
||||
state => {
|
||||
debug!("Not running the task loop, tenant is not active: {state:?}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[error("cancelled")]
|
||||
pub(crate) struct Cancelled;
|
||||
|
||||
@@ -149,7 +149,7 @@ def top_output_dir(base_dir: Path) -> Iterator[Path]:
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
|
||||
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
|
||||
versioned_dir = pg_distrib_dir / f"v{pg_version}"
|
||||
|
||||
psql_bin_path = versioned_dir / "bin/psql"
|
||||
postgres_bin_path = versioned_dir / "bin/postgres"
|
||||
@@ -1745,8 +1745,8 @@ class PgBin:
|
||||
def __init__(self, log_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion):
|
||||
self.log_dir = log_dir
|
||||
self.pg_version = pg_version
|
||||
self.pg_bin_path = pg_distrib_dir / pg_version.v_prefixed / "bin"
|
||||
self.pg_lib_dir = pg_distrib_dir / pg_version.v_prefixed / "lib"
|
||||
self.pg_bin_path = pg_distrib_dir / f"v{pg_version}" / "bin"
|
||||
self.pg_lib_dir = pg_distrib_dir / f"v{pg_version}" / "lib"
|
||||
self.env = os.environ.copy()
|
||||
self.env["LD_LIBRARY_PATH"] = str(self.pg_lib_dir)
|
||||
|
||||
|
||||
@@ -149,16 +149,11 @@ class PageserverHttpClient(requests.Session):
|
||||
assert isinstance(res_json, list)
|
||||
return res_json
|
||||
|
||||
def tenant_create(
|
||||
self, new_tenant_id: Optional[TenantId] = None, conf: Optional[Dict[str, Any]] = None
|
||||
) -> TenantId:
|
||||
if conf is not None:
|
||||
assert "new_tenant_id" not in conf.keys()
|
||||
def tenant_create(self, new_tenant_id: Optional[TenantId] = None) -> TenantId:
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant",
|
||||
json={
|
||||
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
|
||||
**(conf or {}),
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -27,12 +27,6 @@ class PgVersion(str, enum.Enum):
|
||||
def __repr__(self) -> str:
|
||||
return f"'{self.value}'"
|
||||
|
||||
# In GitHub workflows we use Postgres version with v-prefix (e.g. v14 instead of just 14),
|
||||
# sometime we need to do so in tests.
|
||||
@property
|
||||
def v_prefixed(self) -> str:
|
||||
return f"v{self.value}"
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value) -> Optional["PgVersion"]:
|
||||
known_values = {v.value for _, v in cls.__members__.items()}
|
||||
|
||||
@@ -16,7 +16,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.pg_version import PgVersion, skip_on_postgres
|
||||
from fixtures.types import Lsn
|
||||
from pytest import FixtureRequest
|
||||
|
||||
@@ -41,6 +41,7 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
|
||||
)
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.order(before="test_forward_compatibility")
|
||||
def test_create_snapshot(
|
||||
@@ -48,13 +49,12 @@ def test_create_snapshot(
|
||||
pg_bin: PgBin,
|
||||
top_output_dir: Path,
|
||||
test_output_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
):
|
||||
# The test doesn't really test anything
|
||||
# it creates a new snapshot for releases after we tested the current version against the previous snapshot in `test_backward_compatibility`.
|
||||
#
|
||||
# There's no cleanup here, it allows to adjust the data in `test_backward_compatibility` itself without re-collecting it.
|
||||
neon_env_builder.pg_version = pg_version
|
||||
neon_env_builder.pg_version = PgVersion.V14
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_local_fs_remote_storage()
|
||||
neon_env_builder.preserve_database_files = True
|
||||
@@ -90,14 +90,13 @@ def test_create_snapshot(
|
||||
env.pageserver.stop()
|
||||
|
||||
# Directory `compatibility_snapshot_dir` is uploaded to S3 in a workflow, keep the name in sync with it
|
||||
compatibility_snapshot_dir = (
|
||||
top_output_dir / f"compatibility_snapshot_pg{pg_version.v_prefixed}"
|
||||
)
|
||||
compatibility_snapshot_dir = top_output_dir / "compatibility_snapshot_pg14"
|
||||
if compatibility_snapshot_dir.exists():
|
||||
shutil.rmtree(compatibility_snapshot_dir)
|
||||
shutil.copytree(test_output_dir, compatibility_snapshot_dir)
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.order(after="test_create_snapshot")
|
||||
@@ -116,7 +115,7 @@ def test_backward_compatibility(
|
||||
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
|
||||
assert (
|
||||
compatibility_snapshot_dir_env is not None
|
||||
), f"COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg{pg_version.v_prefixed}` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
|
||||
), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg14` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
|
||||
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
|
||||
|
||||
breaking_changes_allowed = (
|
||||
@@ -156,6 +155,7 @@ def test_backward_compatibility(
|
||||
), "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V15, "Compatibility tests doesn't support Postgres 15 yet")
|
||||
@check_ondisk_data_compatibility_if_enabled
|
||||
@pytest.mark.xdist_group("compatibility")
|
||||
@pytest.mark.order(after="test_create_snapshot")
|
||||
@@ -183,9 +183,7 @@ def test_forward_compatibility(
|
||||
), "COMPATIBILITY_POSTGRES_DISTRIB_DIR is not set. It should be set to a pg_install directrory (ideally generated by the previous version of Neon)"
|
||||
compatibility_postgres_distrib_dir = Path(compatibility_postgres_distrib_dir_env).resolve()
|
||||
|
||||
compatibility_snapshot_dir = (
|
||||
top_output_dir / f"compatibility_snapshot_pg{pg_version.v_prefixed}"
|
||||
)
|
||||
compatibility_snapshot_dir = top_output_dir / "compatibility_snapshot_pg14"
|
||||
|
||||
breaking_changes_allowed = (
|
||||
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
|
||||
|
||||
@@ -118,11 +118,6 @@ class EvictionEnv:
|
||||
|
||||
wait_until(10, 1, statvfs_called)
|
||||
|
||||
# these can sometimes happen during startup before any tenants have been
|
||||
# loaded, so nothing can be evicted, we just wait for next iteration which
|
||||
# is able to evict.
|
||||
self.neon_env.pageserver.allowed_errors.append(".*WARN.* disk usage still high.*")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> EvictionEnv:
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.pg_version import PgVersion, xfail_on_postgres
|
||||
|
||||
|
||||
@xfail_on_postgres(PgVersion.V15, reason="https://github.com/neondatabase/neon/pull/4182")
|
||||
@pytest.mark.timeout(1800)
|
||||
def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
@@ -5,6 +5,7 @@ from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
from fixtures.pg_version import PgVersion, xfail_on_postgres
|
||||
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
@@ -32,8 +33,8 @@ def test_pg_regress(
|
||||
(runpath / "testtablespace").mkdir(parents=True)
|
||||
|
||||
# Compute all the file locations that pg_regress will need.
|
||||
build_path = pg_distrib_dir / f"build/{env.pg_version.v_prefixed}/src/test/regress"
|
||||
src_path = base_dir / f"vendor/postgres-{env.pg_version.v_prefixed}/src/test/regress"
|
||||
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
|
||||
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/regress"
|
||||
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
|
||||
schedule = src_path / "parallel_schedule"
|
||||
pg_regress = build_path / "pg_regress"
|
||||
@@ -71,6 +72,7 @@ def test_pg_regress(
|
||||
#
|
||||
# This runs for a long time, especially in debug mode, so use a larger-than-default
|
||||
# timeout.
|
||||
@xfail_on_postgres(PgVersion.V15, reason="https://github.com/neondatabase/neon/pull/4213")
|
||||
@pytest.mark.timeout(1800)
|
||||
def test_isolation(
|
||||
neon_simple_env: NeonEnv,
|
||||
@@ -95,8 +97,8 @@ def test_isolation(
|
||||
(runpath / "testtablespace").mkdir(parents=True)
|
||||
|
||||
# Compute all the file locations that pg_isolation_regress will need.
|
||||
build_path = pg_distrib_dir / f"build/{env.pg_version.v_prefixed}/src/test/isolation"
|
||||
src_path = base_dir / f"vendor/postgres-{env.pg_version.v_prefixed}/src/test/isolation"
|
||||
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/isolation"
|
||||
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/isolation"
|
||||
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
|
||||
schedule = src_path / "isolation_schedule"
|
||||
pg_isolation_regress = build_path / "pg_isolation_regress"
|
||||
|
||||
Reference in New Issue
Block a user