Compare commits

...

27 Commits

Author SHA1 Message Date
Alek Westover
79756c90e4 simplify test, fix a bit, it still breaks locally, probably due to create extension error 2023-08-14 09:59:34 -04:00
Alek Westover
4c36e4969e refactor tests to not mutate pgbin and to be deterministic by using a simpler extension to test 2023-08-11 14:13:30 -04:00
Arpad Müller
9ffccb55f1 InMemoryLayer: move end_lsn out of the lock (#4963)
## Problem

In some places, the lock on `InMemoryLayerInner` is taken only to
obtain `end_lsn`. This is not needed, however, if we move `end_lsn` to
`InMemoryLayer` instead.

## Summary of changes

Make `end_lsn` a member of `InMemoryLayer`, and do less locking of
`InMemoryLayerInner`. `end_lsn` is changed from an `Option<Lsn>` into an
`OnceLock<Lsn>`. Thanks to this change, we no longer need to lock at all
in three functions.

Part of #4743 . Suggested in
https://github.com/neondatabase/neon/pull/4905#issuecomment-1666458428 .
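
A minimal sketch of the pattern, with simplified stand-in types (not the actual pageserver code): a value that is written exactly once can live in an `OnceLock` on the outer struct, so readers skip the mutex entirely.

```rust
use std::sync::{Mutex, OnceLock};

#[derive(Debug)]
struct Lsn(u64);

struct InMemoryLayerInner; // stand-in; end_lsn no longer lives in here

struct InMemoryLayer {
    inner: Mutex<InMemoryLayerInner>,
    // Written once when the layer is frozen; read lock-free afterwards.
    end_lsn: OnceLock<Lsn>,
}

impl InMemoryLayer {
    fn freeze(&self, lsn: Lsn) {
        self.end_lsn.set(lsn).expect("end_lsn must be set only once");
    }

    fn end_lsn(&self) -> Option<&Lsn> {
        // No lock on `inner` is needed just to read end_lsn.
        self.end_lsn.get()
    }
}
```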
2023-08-11 18:01:02 +02:00
Arthur Petukhovsky
3a6b99f03c proxy: improve http logs (#4976)
Fix multiline logs on websocket errors and always print sql-over-http
errors sent to the user.
2023-08-11 18:18:07 +03:00
Dmitry Rodionov
d39fd66773 tests: remove redundant wait_while (#4952)
Remove the redundant `wait_while` from tests; it had only one usage. Use
`wait_tenant_status404` instead.

Related:
https://github.com/neondatabase/neon/pull/4855#discussion_r1289610641
2023-08-11 10:18:13 +03:00
Arthur Petukhovsky
73d7a9bc6e proxy: propagate ws span (#4966)
Found this log on staging:
```
2023-08-10T17:42:58.573790Z  INFO handling interactive connection from client protocol="ws"
```

We seem to be losing the websocket span in `spawn`; this patch fixes it.
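
A minimal sketch of the fix pattern (illustrative names, not the proxy's actual code): a task created with `tokio::spawn` does not inherit the current tracing span, so it must be attached explicitly via `Instrument`.

```rust
use tracing::{info_span, Instrument};

async fn handle_websocket() {
    // Log lines emitted here carry the ws_connection span's fields.
    tracing::info!("handling interactive connection from client");
}

fn spawn_handler() {
    let span = info_span!("ws_connection", protocol = "ws");
    // Without .instrument(span), the spawned task logs outside the span,
    // producing context-free lines like the one seen on staging.
    tokio::spawn(handle_websocket().instrument(span));
}
```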
2023-08-10 23:38:22 +03:00
Sasha Krassovsky
3a71cf38c1 Grant BypassRLS to new neon_superuser roles (#4935) 2023-08-10 21:04:45 +02:00
Conrad Ludgate
25c66dc635 proxy: http logging to 11 (#4950)
## Problem

Mysterious network issues

## Summary of changes

Log a lot more about HTTP/DNS in hopes of detecting more of the network
errors
2023-08-10 17:49:24 +01:00
George MacKerron
538373019a Increase max sql-over-http response size from 1MB to 10MB (#4961)
## Problem

The 1MB response limit is very small.

## Summary of changes

This data is not yet tracked, so we shouldn't raise the limit too high yet.
But as discussed with @kelvich and @conradludgate, this PR lifts it to
10MB, and also adds details of the limit to the error response.
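
A hypothetical sketch of the described behavior (constant name and wording are illustrative, not the proxy's actual code): enforce the cap and surface the limit itself in the error.

```rust
// 10MB cap, up from the previous 1MB (values per this PR).
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024;

fn check_response_size(body_len: usize) -> Result<(), String> {
    if body_len > MAX_RESPONSE_SIZE {
        // Include the limit in the error response so callers know what they hit.
        return Err(format!(
            "response is {body_len} bytes, which exceeds the {MAX_RESPONSE_SIZE} byte limit"
        ));
    }
    Ok(())
}
```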
2023-08-10 17:21:52 +01:00
Dmitry Rodionov
c58b22bacb Delete tenant's data from s3 (#4855)
## Summary of changes

For context see
https://github.com/neondatabase/neon/blob/main/docs/rfcs/022-pageserver-delete-from-s3.md

Create a flow to delete a tenant's data from the pageserver. The approach
heavily mimics the previously implemented timeline deletion, done
mostly in https://github.com/neondatabase/neon/pull/4384 and followed up
in https://github.com/neondatabase/neon/pull/4552

For the remaining deletion-related issues, consult the deletion project
here: https://github.com/orgs/neondatabase/projects/33

resolves #4250
resolves https://github.com/neondatabase/neon/issues/3889
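
A hypothetical client-side sketch of driving this flow through the new management endpoint (endpoint shape per the OpenAPI spec in the diff below; `reqwest` is used purely for illustration): retry DELETE on 500/409 until a 404 signals that the deletion finished.

```rust
use std::time::Duration;

async fn delete_tenant(base_url: &str, tenant_id: &str) -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    let url = format!("{base_url}/v1/tenant/{tenant_id}");
    loop {
        let status = client.delete(&url).send().await?.status().as_u16();
        match status {
            404 => return Ok(()), // deletion finished successfully
            // 202: accepted, 409: already in progress, 500: retryable error
            202 | 409 | 500 => tokio::time::sleep(Duration::from_millis(500)).await,
            other => anyhow::bail!("unexpected status {other}"),
        }
    }
}
```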

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-08-10 18:53:16 +03:00
Alek Westover
17aea78aa7 delete already present files from library index (#4955) 2023-08-10 16:51:16 +03:00
Joonas Koivunen
71f9d9e5a3 test: allow slow shutdown warning (#4953)
Introduced in #4886; it did not consider that tests with real_s3 could
sometimes go over the limit. Do not fail tests because of that.
2023-08-10 15:55:41 +03:00
Alek Westover
119b86480f test: make pg_regress less flaky, hopefully (#4903)
`pg_regress` is flaky: https://github.com/neondatabase/neon/issues/559

Consolidated `CHECKPOINT` into `check_restored_datadir_content` and added
a wait for `wait_for_last_flush_lsn`.

Some recently introduced flakiness was fixed with #4948.

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-08-10 15:24:43 +03:00
Arpad Müller
fa1f87b268 Make the implementation of DiskBtreeReader::visit non-recursive (#4884)
## Problem

The `DiskBtreeReader::visit` function calls `read_blk` internally, and
while #4863 converted the API of `visit` to async, the internal function
is still recursive. So, analogously to #4838, we turn the recursive
function into an iterative one.

## Summary of changes

First, we prepare the change by moving the for loop outside of the case
switch, so that we only have one loop that recurses. Then, we
switch from recursion to an approach where we store the search
path through the tree on a stack on the heap.

The caller of the `visit` function can control when the search over the
B-tree ends by returning `false` from the closure. This is used to find
one specific entry (by always returning `false`), to iterate over all
entries of the B-tree (by always returning `true`), or to look for
ranges (mostly in tests, but `get_value_reconstruct_data` also has such
a use).

Each stack entry contains two things: the block number (aka the block's
offset) and a children iterator. The children iterator is constructed
depending on the search direction and from the results of a binary
search over the node's children list. It is the only thing that survives a
spill/push to the stack; everything else is reconstructed. In other
words, each stack spill will, if the search is still ongoing, cause an
entire re-parse of the node. Theoretically, this would be a linear
overhead in the number of leaves the search visits. However, one needs
to note:

* workloads that look for a specific entry only ever visit one leaf,
so this is mostly about workloads that visit larger ranges,
including ones that visit the entire B-tree.
* the requests first hit the page cache, so often the cost is just in
terms of node deserialization
* for nodes that only have leaf nodes as children, no spilling to the
stack-on-heap happens (outside of the initial request where the iterator
is `None`). In other words, for balanced trees, the spilling overhead is
$\Theta\left(\frac{n}{b^2}\right)$, where `b` is the branching factor
and `n` is the number of nodes in the tree. The B-Trees in the current
implementation have a branching factor of roughly `PAGE_SZ/L` where
`PAGE_SZ` is 8192, and `L` is `DELTA_KEY_SIZE = 26` or `KEY_SIZE = 18`
in production code, so this gives us an estimate that we'd be re-loading
an inner node for every 99000 leaves in the B-tree in the worst case.

Due to these points above, I'd say that not fully caching the inner
nodes with inner children is reasonable, especially as we also want to
be fast for the "find one specific entry" workloads, where the stack
content is never accessed: any action to make the spilling
computationally more complex would contribute to wasted cycles here,
even if these workloads "only" spill one node for each depth level of
the B-tree (which is practically always a low single-digit number;
Kleppmann points out on page 81 that for branching factor 500, a four-level
B-tree with 4 KB pages can store 250 TB of data).

But, disclaimer: this is all stuff I thought through in my head; I have not
confirmed it with any benchmarks or data.
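
A minimal sketch of the recursion-to-stack transformation on a toy in-memory tree (the real code walks on-disk nodes and re-parses them on spill; all names here are illustrative):

```rust
struct Node {
    children: Vec<Node>,
}

/// Depth-first visit without recursion: the search path is an explicit
/// stack of child iterators on the heap. Returning `false` from the
/// visitor ends the search early, like the closure passed to `visit`.
fn visit<'a>(root: &'a Node, visitor: &mut dyn FnMut(&'a Node) -> bool) {
    if !visitor(root) {
        return;
    }
    let mut stack: Vec<std::slice::Iter<'a, Node>> = vec![root.children.iter()];
    while let Some(top) = stack.last_mut() {
        match top.next() {
            Some(child) => {
                if !visitor(child) {
                    return; // the caller ended the search
                }
                // Only the child's iterator survives on the stack; in the
                // on-disk version everything else is re-parsed on spill.
                stack.push(child.children.iter());
            }
            // Done with this node: pop it off the search path.
            None => {
                stack.pop();
            }
        }
    }
}
```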

Builds on top of #4863, part of #4743
2023-08-10 13:43:13 +02:00
Joonas Koivunen
db48f7e40d test: mark test_download_extensions.py skipped for now (#4948)
The test mutates a shared directory, which does not work with multiple
concurrent tests. It is being fixed, so this should be a very temporary
band-aid.

Cc: #4949.
2023-08-10 11:05:27 +00:00
Alek Westover
e157b16c24 if control file already exists ignore the remote version of the extension (#4945) 2023-08-09 18:56:09 +00:00
bojanserafimov
94ad9204bb Measure compute-pageserver latency (#4901)
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-08-09 13:20:30 -04:00
Joonas Koivunen
c8aed107c5 refactor: make {Delta,Image}LayerInners usable without {Delta,Image}Layer (#4937)
On the quest toward #4745: these changes are more related to the task at
hand, but still small. In addition to $subject, allow
`ValueRef<ResidentDeltaLayer>`.
2023-08-09 19:18:44 +03:00
Anastasia Lubennikova
da128a509a fix pkglibdir path for remote extensions 2023-08-09 19:13:11 +03:00
Alexander Bayandin
5993b2bedc test_runner: remove excessive timeouts (#4659)
## Problem

For some tests, we override the default timeout (300s / 5m) with larger
values like 600s / 10m or even 1800s / 30m, even when it's not required.
I've collected some statistics (for the last 60 days) on test
durations:

| test | max (s) | p99 (s) | p50 (s) | count |
|-----------------------------------|---------|---------|---------|-------|
| test_hot_standby | 9 | 2 | 2 | 5319 |
| test_import_from_vanilla | 16 | 9 | 6 | 5692 |
| test_import_from_pageserver_small | 37 | 7 | 5 | 5719 |
| test_pg_regress | 101 | 73 | 44 | 5642 |
| test_isolation | 65 | 56 | 39 | 5692 |

A couple of tests were left with a custom 600s / 10m timeout:

| test | max (s) | p99 (s) | p50 (s) | count |
|-----------------------------------|---------|---------|---------|-------|
| test_gc_cutoff | 456 | 224 | 109 | 5694 |
| test_pageserver_chaos | 528 | 267 | 121 | 5712 |

## Summary of changes
- Remove `@pytest.mark.timeout` annotation from several tests
2023-08-09 16:27:53 +01:00
Anastasia Lubennikova
4ce7aa9ffe Fix extensions download error handling (#4941)
Don't panic if a library or extension is not found in remote extension storage
or the download has failed. Instead, log the error and proceed: if the file is not
present locally either, postgres will fail with a postgres error. If it is a
shared_preload_library, postgres won't start because of the bad config. Otherwise,
it will just fail to run the SQL function/command that needs the library.

Also, don't try to download extensions if remote storage is not configured.
2023-08-09 15:37:51 +03:00
Joonas Koivunen
cbd04f5140 remove_remote_layer: uninteresting refactorings (#4936)
In the quest to solve #4745 by moving the download/evictedness to be an
internally mutable aspect of a Layer and getting rid of `trait
PersistentLayer` (at least for prod usage) and `layer_removal_cs`, we present
some misc cleanups.

---------

Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
2023-08-09 14:35:56 +03:00
Arpad Müller
1037a8ddd9 Explain why VirtualFile stores tenant_id and timeline_id as strings (#4930)
## Problem

One might wonder why the code here doesn't use `TimelineId` or
`TenantId`. I originally had a refactor to use them, but then discarded
it, because converting to strings each time there is a read or write
is wasteful.

## Summary of changes

We add some docs explaining why no `TimelineId` or `TenantId` is
used here.
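
A hypothetical illustration of the tradeoff (not the actual `VirtualFile` fields): render the IDs to strings once, at construction time, so hot read/write paths never re-format them.

```rust
// Assumed stand-ins for the real ID newtypes.
struct TenantId(u128);
struct TimelineId(u128);

struct VirtualFileIds {
    // Stored pre-rendered: formatting the IDs as hex on every read or
    // write (e.g. for error messages) would be wasted work.
    tenant_id: String,
    timeline_id: String,
}

impl VirtualFileIds {
    fn new(tenant: TenantId, timeline: TimelineId) -> Self {
        Self {
            tenant_id: format!("{:032x}", tenant.0),
            timeline_id: format!("{:032x}", timeline.0),
        }
    }
}
```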
2023-08-08 23:41:09 +02:00
Felix Prasanna
6661f4fd44 bump vm-builder version to v0.15.0-alpha1 (#4934) 2023-08-08 15:22:10 -05:00
Alexander Bayandin
b9f84b9609 Improve test results format (#4549)
## Problem

The current test history format is a bit inconvenient:
- It stores all test results in one row, so all queries should include
subqueries which expand the test 
- It includes duplicated test results if the rerun is triggered manually
for one of the test jobs (for example, if we rerun `debug-pg14`, then
the report will include duplicates for other build types/postgres
versions)
- It doesn't have a reference to run_id, which we use to create a link
to allure report

Here's the proposed new format:
```
    id           BIGSERIAL PRIMARY KEY,
    parent_suite TEXT NOT NULL,
    suite        TEXT NOT NULL,
    name         TEXT NOT NULL,
    status       TEXT NOT NULL,
    started_at   TIMESTAMPTZ NOT NULL,
    stopped_at   TIMESTAMPTZ NOT NULL,
    duration     INT NOT NULL,
    flaky        BOOLEAN NOT NULL,
    build_type   TEXT NOT NULL,
    pg_version   INT NOT NULL,
    run_id       BIGINT NOT NULL,
    run_attempt  INT NOT NULL,
    reference    TEXT NOT NULL,
    revision     CHAR(40) NOT NULL,
    raw          JSONB COMPRESSION lz4 NOT NULL,
```

## Summary of changes
- Misc allure changes:
  - Update allure to 2.23.1
  - Delete files from previous runs in the HTML report (by using `sync
--delete` instead of `mv`)
- Use `test-cases/*.json` instead of `suites.json`; using this directory
allows us to catch all reruns.
- Until we migrate `scripts/flaky_tests.py` and
`scripts/benchmark_durations.py`, store test results in 2 formats (in 2
different databases).
2023-08-08 20:09:38 +01:00
Felix Prasanna
459253879e Revert "bump vm-builder to v0.15.0-alpha1 (#4895)" (#4931)
This reverts commit 682dfb3a31.
2023-08-08 20:21:39 +03:00
Conrad Ludgate
0fa85aa08e proxy: delay auth on retry (#4929)
## Problem

When an endpoint is shutting down, it can take a few seconds. Currently,
when starting a new compute, this causes an "endpoint is in transition"
error. We need to add delays before retrying to ensure that we allow
time for the endpoint to shut down properly.

## Summary of changes

Adds a delay before retrying in auth; `connect_to_compute` already has
this delay.
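
A hypothetical sketch of the retry-with-delay idea (the backoff policy and names are assumptions, not the proxy's actual code):

```rust
use std::time::Duration;

async fn retry_with_delay<T, E, F, Fut>(mut attempt: F, retries: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(100);
    for _ in 0..retries {
        match attempt().await {
            Ok(v) => return Ok(v),
            Err(_) => {
                // Give the endpoint time to finish shutting down before
                // hitting it with another auth attempt.
                tokio::time::sleep(delay).await;
                delay *= 2;
            }
        }
    }
    attempt().await // final attempt surfaces the error
}
```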
2023-08-08 17:19:24 +03:00
60 changed files with 2461 additions and 1012 deletions

View File

@@ -2,6 +2,12 @@ name: 'Create Allure report'
description: 'Generate Allure report from uploaded by actions/allure-report-store tests results'
outputs:
base-url:
description: 'Base URL for Allure report'
value: ${{ steps.generate-report.outputs.base-url }}
base-s3-url:
description: 'Base S3 URL for Allure report'
value: ${{ steps.generate-report.outputs.base-s3-url }}
report-url:
description: 'Allure report URL'
value: ${{ steps.generate-report.outputs.report-url }}
@@ -63,8 +69,8 @@ runs:
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.22.1
ALLURE_ZIP_SHA256: fdc7a62d94b14c5e0bf25198ae1feded6b005fdbed864b4d3cb4e5e901720b0b
ALLURE_VERSION: 2.23.1
ALLURE_ZIP_SHA256: 11141bfe727504b3fd80c0f9801eb317407fd0ac983ebb57e671f14bac4bcd86
# Potentially we could have several running builds for the same key (for example, for the main branch), so we use an improvised lock for this
- name: Acquire lock
@@ -102,6 +108,11 @@ runs:
REPORT_PREFIX=reports/${BRANCH_OR_PR}
RAW_PREFIX=reports-raw/${BRANCH_OR_PR}/${GITHUB_RUN_ID}
BASE_URL=https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}
BASE_S3_URL=s3://${BUCKET}/${REPORT_PREFIX}/${GITHUB_RUN_ID}
REPORT_URL=${BASE_URL}/index.html
REPORT_JSON_URL=${BASE_URL}/data/suites.json
# Get previously uploaded data for this run
ZSTD_NBTHREADS=0
@@ -110,10 +121,9 @@ runs:
# There's no previously uploaded data for this $GITHUB_RUN_ID
exit 0
fi
for S3_FILEPATH in ${S3_FILEPATHS}; do
time aws s3 cp --only-show-errors "s3://${BUCKET}/${S3_FILEPATH}" "${WORKDIR}"
archive=${WORKDIR}/$(basename $S3_FILEPATH)
time aws s3 cp --recursive --only-show-errors "s3://${BUCKET}/${RAW_PREFIX}/" "${WORKDIR}/"
for archive in $(find ${WORKDIR} -name "*.tar.zst"); do
mkdir -p ${archive%.tar.zst}
time tar -xf ${archive} -C ${archive%.tar.zst}
rm -f ${archive}
@@ -129,10 +139,9 @@ runs:
sed -i 's|<a href="." class=|<a href="https://'${BUCKET}'.s3.amazonaws.com/'${REPORT_PREFIX}'/latest/index.html?nocache='"'+Date.now()+'"'" class=|g' ${WORKDIR}/report/app.js
# Upload a history and the final report (in this particular order to not to have duplicated history in 2 places)
# Use sync for the final report to delete files from previous runs
time aws s3 mv --recursive --only-show-errors "${WORKDIR}/report/history" "s3://${BUCKET}/${REPORT_PREFIX}/latest/history"
time aws s3 mv --recursive --only-show-errors "${WORKDIR}/report" "s3://${BUCKET}/${REPORT_PREFIX}/${GITHUB_RUN_ID}"
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/${REPORT_PREFIX}/${GITHUB_RUN_ID}/index.html
time aws s3 sync --delete --only-show-errors "${WORKDIR}/report" "s3://${BUCKET}/${REPORT_PREFIX}/${GITHUB_RUN_ID}"
# Generate redirect
cat <<EOF > ${WORKDIR}/index.html
@@ -144,8 +153,10 @@ runs:
EOF
time aws s3 cp --only-show-errors ${WORKDIR}/index.html "s3://${BUCKET}/${REPORT_PREFIX}/latest/index.html"
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
echo "report-json-url=${REPORT_URL%/index.html}/data/suites.json" >> $GITHUB_OUTPUT
echo "base-url=${BASE_URL}" >> $GITHUB_OUTPUT
echo "base-s3-url=${BASE_S3_URL}" >> $GITHUB_OUTPUT
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
echo "report-json-url=${REPORT_JSON_URL}" >> $GITHUB_OUTPUT
echo "[Allure Report](${REPORT_URL})" >> ${GITHUB_STEP_SUMMARY}

View File

@@ -31,7 +31,7 @@ runs:
BUCKET=neon-github-public-dev
FILENAME=$(basename $ARCHIVE)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX%$GITHUB_RUN_ATTEMPT} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX%$GITHUB_RUN_ATTEMPT} | jq -r '.Contents[]?.Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
echo 'SKIPPED=true' >> $GITHUB_OUTPUT

View File

@@ -471,6 +471,26 @@ jobs:
--build-type ${BUILD_TYPE} \
--ingest suites.json
- name: Store Allure test stat in the DB (new)
if: ${{ !cancelled() && steps.create-allure-report.outputs.report-json-url }}
env:
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
REPORT_JSON_URL: ${{ steps.create-allure-report.outputs.report-json-url }}
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
BASE_S3_URL: ${{ steps.create-allure-report.outputs.base-s3-url }}
run: |
aws s3 cp --only-show-errors --recursive ${BASE_S3_URL}/data/test-cases ./test-cases
./scripts/pysync
export DATABASE_URL="$TEST_RESULT_CONNSTR"
poetry run python3 scripts/ingest_regress_test_result-new-format.py \
--reference ${GITHUB_REF} \
--revision ${COMMIT_SHA} \
--run-id ${GITHUB_RUN_ID} \
--run-attempt ${GITHUB_RUN_ATTEMPT} \
--test-cases-dir ./test-cases
coverage-report:
runs-on: [ self-hosted, gen3, small ]
container:
@@ -1067,7 +1087,7 @@ jobs:
OLD_PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=neon-${{ runner.os }}-${build_type}-artifact.tar.zst
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[]?.Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
echo >&2 "Neither s3://${BUCKET}/${OLD_PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1

View File

@@ -58,16 +58,12 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "5670669815";
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
// if there is no build tag environment variable, use a "default value"
// the use-case for this "default value" is regression tests that don't require BUILD_TAG
let build_tag = option_env!("BUILD_TAG").unwrap_or("5670669815").to_string();
info!("build_tag: {build_tag}");
let matches = cli().get_matches();

View File

@@ -6,6 +6,7 @@ use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::{Condvar, Mutex, OnceLock, RwLock};
use std::time::Instant;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
@@ -24,7 +25,7 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use remote_storage::{GenericRemoteStorage, RemotePath};
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use crate::pg_helpers::*;
use crate::spec::*;
@@ -285,7 +286,7 @@ impl ComputeNode {
#[instrument(skip_all, fields(%lsn))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
let start_time = Instant::now();
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
@@ -298,7 +299,10 @@ impl ComputeNode {
info!("Storage auth token not set");
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
let basebackup_cmd = match lsn {
// HACK We don't use compression on first start (Lsn(0)) because there's no API for it
Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id),
@@ -344,13 +348,10 @@ impl ComputeNode {
};
// Report metrics
self.state.lock().unwrap().metrics.basebackup_bytes =
measured_reader.get_byte_count() as u64;
self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
let mut state = self.state.lock().unwrap();
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
Ok(())
}
@@ -925,6 +926,7 @@ LIMIT 100",
let spec = &pspec.spec;
let custom_ext = spec.custom_extensions.clone().unwrap_or(Vec::new());
info!("custom extensions: {:?}", &custom_ext);
let (ext_remote_paths, library_index) = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
@@ -944,94 +946,113 @@ LIMIT 100",
}
// download an archive, unzip and place files in correct locations
pub async fn download_extension(&self, ext_name: &str, is_library: bool) -> Result<u64> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
let mut real_ext_name = ext_name.to_string();
if is_library {
// sometimes library names might have a suffix like
// library.so or library.so.3. We strip this off
// because library_index is based on the name without the file extension
let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
let lib_raw_name = strip_lib_suffix.replace(&real_ext_name, "").to_string();
real_ext_name = self
.library_index
.get()
.expect("must have already downloaded the library_index")[&lib_raw_name]
.clone();
}
pub async fn download_extension(
&self,
ext_name: &str,
is_library: bool,
) -> Result<u64, DownloadError> {
let remote_storage = self
.ext_remote_storage
.as_ref()
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
"Remote extensions storage is not configured",
)))?;
let ext_path = &self
.ext_remote_paths
.get()
.expect("error accessing ext_remote_paths")[&real_ext_name];
let ext_archive_name = ext_path.object_name().expect("bad path");
let mut real_ext_name = ext_name;
if is_library {
// sometimes library names might have a suffix like
// library.so or library.so.3. We strip this off
// because library_index is based on the name without the file extension
let strip_lib_suffix = Regex::new(r"\.so.*").unwrap();
let lib_raw_name = strip_lib_suffix.replace(real_ext_name, "").to_string();
let mut first_try = false;
if !self
.ext_download_progress
.read()
.expect("lock err")
.contains_key(ext_archive_name)
{
self.ext_download_progress
.write()
.expect("lock err")
.insert(ext_archive_name.to_string(), (Utc::now(), false));
first_try = true;
}
let (download_start, download_completed) =
self.ext_download_progress.read().expect("lock err")[ext_archive_name];
let start_time_delta = Utc::now()
.signed_duration_since(download_start)
.to_std()
.unwrap()
.as_millis() as u64;
// how long to wait for extension download if it was started by another process
const HANG_TIMEOUT: u64 = 3000; // milliseconds
if download_completed {
info!("extension already downloaded, skipping re-download");
return Ok(0);
} else if start_time_delta < HANG_TIMEOUT && !first_try {
info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
let mut interval =
tokio::time::interval(tokio::time::Duration::from_millis(500));
loop {
info!("waiting for download");
interval.tick().await;
let (_, download_completed_now) =
self.ext_download_progress.read().expect("lock")[ext_archive_name];
if download_completed_now {
info!("download finished by whoever else downloaded it");
return Ok(0);
}
}
// NOTE: the above loop will get terminated
// based on the timeout of the download function
}
// if extension hasn't been downloaded before or the previous
// attempt to download was at least HANG_TIMEOUT ms ago
// then we try to download it here
info!("downloading new extension {ext_archive_name}");
let download_size = extension_server::download_extension(
&real_ext_name,
ext_path,
remote_storage,
&self.pgbin,
)
.await;
self.ext_download_progress
.write()
.expect("bad lock")
.insert(ext_archive_name.to_string(), (download_start, true));
download_size
}
real_ext_name = self
.library_index
.get()
.expect("must have already downloaded the library_index")
.get(&lib_raw_name)
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
"library {} is not found",
lib_raw_name
)))?;
}
let ext_path = &self
.ext_remote_paths
.get()
.expect("error accessing ext_remote_paths")
.get(real_ext_name)
.ok_or(DownloadError::BadInput(anyhow::anyhow!(
"real_ext_name {} is not found",
real_ext_name
)))?;
let ext_archive_name = ext_path.object_name().expect("bad path");
let mut first_try = false;
if !self
.ext_download_progress
.read()
.expect("lock err")
.contains_key(ext_archive_name)
{
self.ext_download_progress
.write()
.expect("lock err")
.insert(ext_archive_name.to_string(), (Utc::now(), false));
first_try = true;
}
let (download_start, download_completed) =
self.ext_download_progress.read().expect("lock err")[ext_archive_name];
let start_time_delta = Utc::now()
.signed_duration_since(download_start)
.to_std()
.unwrap()
.as_millis() as u64;
// how long to wait for extension download if it was started by another process
const HANG_TIMEOUT: u64 = 3000; // milliseconds
if download_completed {
info!("extension already downloaded, skipping re-download");
return Ok(0);
} else if start_time_delta < HANG_TIMEOUT && !first_try {
info!("download {ext_archive_name} already started by another process, hanging untill completion or timeout");
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
loop {
info!("waiting for download");
interval.tick().await;
let (_, download_completed_now) =
self.ext_download_progress.read().expect("lock")[ext_archive_name];
if download_completed_now {
info!("download finished by whoever else downloaded it");
return Ok(0);
}
}
// NOTE: the above loop will get terminated
// based on the timeout of the download function
}
// if extension hasn't been downloaded before or the previous
// attempt to download was at least HANG_TIMEOUT ms ago
// then we try to download it here
info!("downloading new extension {ext_archive_name}");
let download_size = extension_server::download_extension(
real_ext_name,
ext_path,
remote_storage,
&self.pgbin,
)
.await
.map_err(DownloadError::Other);
self.ext_download_progress
.write()
.expect("bad lock")
.insert(ext_archive_name.to_string(), (download_start, true));
download_size
}
#[tokio::main]
@@ -1090,7 +1111,17 @@ LIMIT 100",
.as_millis() as u64;
info!("Prepare extensions took {prep_ext_time_delta}ms");
// Don't try to download libraries that are not in the index.
// Assume that they are already present locally.
libs_vec.retain(|lib| {
self.library_index
.get()
.expect("error accessing ext_remote_paths")
.contains_key(lib)
});
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
let mut download_tasks = Vec::new();
for library in &libs_vec {
download_tasks.push(self.download_extension(library, true));
@@ -1104,8 +1135,19 @@ LIMIT 100",
prep_extensions_ms: prep_ext_time_delta,
};
for result in results {
let download_size = result?;
remote_ext_metrics.num_ext_downloaded += 1;
let download_size = match result {
Ok(res) => {
remote_ext_metrics.num_ext_downloaded += 1;
res
}
Err(err) => {
// if we failed to download an extension, we don't want to fail the whole
// process, but we do want to log the error
error!("Failed to download extension: {}", err);
0
}
};
remote_ext_metrics.largest_ext_size =
std::cmp::max(remote_ext_metrics.largest_ext_size, download_size);
remote_ext_metrics.total_ext_download_size += download_size;

View File

@@ -156,7 +156,7 @@ pub async fn get_available_extensions(
let ext_index_full = serde_json::from_slice::<Index>(&ext_idx_buffer)?;
let mut enabled_extensions = ext_index_full.public_extensions;
enabled_extensions.extend_from_slice(custom_extensions);
let library_index = ext_index_full.library_index;
let mut library_index = ext_index_full.library_index;
let all_extension_data = ext_index_full.extension_data;
info!("library_index: {:?}", library_index);
@@ -169,13 +169,19 @@ pub async fn get_available_extensions(
let extension_name = control_file
.strip_suffix(".control")
.expect("control files must end in .control");
ext_remote_paths.insert(
extension_name.to_string(),
RemotePath::from_string(&ext_data.archive_path)?,
);
let control_path = local_sharedir.join(control_file);
info!("writing file {:?}{:?}", control_path, control_contents);
file_create_tasks.push(tokio::fs::write(control_path, control_contents));
if !control_path.exists() {
ext_remote_paths.insert(
extension_name.to_string(),
RemotePath::from_string(&ext_data.archive_path)?,
);
info!("writing file {:?}{:?}", control_path, control_contents);
file_create_tasks.push(tokio::fs::write(control_path, control_contents));
} else {
warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_file);
// also delete this from library index
library_index.retain(|_, value| value != extension_name);
}
}
}
let results = join_all(file_create_tasks).await;
@@ -222,7 +228,7 @@ pub async fn download_extension(
);
let libdir_paths = (
unzip_dest.to_string() + "/lib",
Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql"),
Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
);
// move contents of the libdir / sharedir in unzipped archive to the correct local paths
for paths in [sharedir_paths, libdir_paths] {

View File

@@ -141,6 +141,15 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
let filename = route.split('/').last().unwrap().to_string();
info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
// don't even try to download extensions
// if no remote storage is configured
if compute.ext_remote_storage.is_none() {
info!("no extensions remote storage configured");
let mut resp = Response::new(Body::from("no remote storage configured"));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return resp;
}
match compute.download_extension(&filename, is_library).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {

View File

@@ -270,7 +270,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
}
RoleAction::Create => {
let mut query: String = format!(
"CREATE ROLE {} CREATEROLE CREATEDB IN ROLE neon_superuser",
"CREATE ROLE {} CREATEROLE CREATEDB BYPASSRLS IN ROLE neon_superuser",
name.pg_quote()
);
info!("role create query: '{}'", &query);

View File

@@ -68,13 +68,40 @@ where
/// Response of the /metrics.json API
#[derive(Clone, Debug, Default, Serialize)]
pub struct ComputeMetrics {
/// Time spent waiting in pool
pub wait_for_spec_ms: u64,
pub sync_safekeepers_ms: u64,
/// Time spent checking if safekeepers are synced
pub sync_sk_check_ms: u64,
/// Time spent syncing safekeepers (walproposer.c).
/// In most cases this should be zero.
pub sync_safekeepers_ms: u64,
/// Time it took to establish a pg connection to the pageserver.
/// This is two roundtrips, so it's a good proxy for compute-pageserver
/// latency. The latency is usually 0.2ms, but it's not safe to assume
/// that.
pub pageserver_connect_micros: u64,
/// Time to get basebackup from pageserver and write it to disk.
pub basebackup_ms: u64,
/// Compressed size of basebackup received.
pub basebackup_bytes: u64,
/// Time spent starting postgres. This includes initialization of shared
/// buffers, preloading extensions, and other pg operations.
pub start_postgres_ms: u64,
/// Time spent applying pg catalog updates that were made in the console
/// UI. This should be 0 when startup time matters, since cplane tries
/// to do these updates eagerly, and passes the skip_pg_catalog_updates flag
/// when it's safe to skip this step.
pub config_ms: u64,
/// Total time, from when we receive the spec to when we're ready to take
/// pg connections.
pub total_startup_ms: u64,
pub load_ext_ms: u64,
pub num_ext_downloaded: u64,

View File

@@ -24,6 +24,20 @@ pub async fn is_directory_empty(path: impl AsRef<Path>) -> anyhow::Result<bool>
Ok(dir.next_entry().await?.is_none())
}
pub async fn list_dir(path: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
let mut dir = tokio::fs::read_dir(&path)
.await
.context(format!("read_dir({})", path.as_ref().display()))?;
let mut content = vec![];
while let Some(next) = dir.next_entry().await? {
let file_name = next.file_name();
content.push(file_name.to_string_lossy().to_string());
}
Ok(content)
}
pub fn ignore_not_found(e: io::Error) -> io::Result<()> {
if e.kind() == io::ErrorKind::NotFound {
Ok(())
@@ -43,7 +57,7 @@ where
mod test {
use std::path::PathBuf;
use crate::fs_ext::is_directory_empty;
use crate::fs_ext::{is_directory_empty, list_dir};
use super::ignore_absent_files;
@@ -109,4 +123,25 @@ mod test {
assert!(!file_path.exists());
}
#[tokio::test]
async fn list_dir_works() {
let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path();
assert!(list_dir(dir_path).await.unwrap().is_empty());
let file_path: PathBuf = dir_path.join("testfile");
let _ = std::fs::File::create(&file_path).unwrap();
assert_eq!(&list_dir(dir_path).await.unwrap(), &["testfile"]);
let another_dir_path: PathBuf = dir_path.join("testdir");
std::fs::create_dir(another_dir_path).unwrap();
let expected = &["testdir", "testfile"];
let mut actual = list_dir(dir_path).await.unwrap();
actual.sort();
assert_eq!(actual, expected);
}
}

View File

@@ -373,7 +373,7 @@ fn start_pageserver(
let order = pageserver::InitializationOrder {
initial_tenant_load: Some(init_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: init_logical_size_done_tx,
initial_logical_size_attempt: Some(init_logical_size_done_tx),
background_jobs_can_start: background_jobs_barrier.clone(),
};

View File

@@ -31,7 +31,9 @@ use utils::{
use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
use crate::tenant::config::TenantConf;
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME};
use crate::tenant::{
TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
TIMELINE_UNINIT_MARK_SUFFIX,
@@ -613,6 +615,11 @@ impl PageServerConf {
)
}
pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_DELETED_MARKER_FILE_NAME)
}
pub fn traces_path(&self) -> PathBuf {
self.workdir.join("traces")
}

View File

@@ -304,17 +304,18 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// Debug-log the list of candidates
let now = SystemTime::now();
for (i, (partition, candidate)) in candidates.iter().enumerate() {
let desc = candidate.layer.layer_desc();
debug!(
"cand {}/{}: size={}, no_access_for={}us, partition={:?}, {}/{}/{}",
i + 1,
candidates.len(),
candidate.layer.file_size(),
desc.file_size,
now.duration_since(candidate.last_activity_ts)
.unwrap()
.as_micros(),
partition,
candidate.layer.get_tenant_id(),
candidate.layer.get_timeline_id(),
desc.tenant_id,
desc.timeline_id,
candidate.layer,
);
}
@@ -346,7 +347,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
warned = Some(usage_planned);
}
usage_planned.add_available_bytes(candidate.layer.file_size());
usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);
batched
.entry(TimelineKey(candidate.timeline))
@@ -389,15 +390,16 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
Ok(results) => {
assert_eq!(results.len(), batch.len());
for (result, layer) in results.into_iter().zip(batch.iter()) {
let file_size = layer.layer_desc().file_size;
match result {
Some(Ok(())) => {
usage_assumed.add_available_bytes(layer.file_size());
usage_assumed.add_available_bytes(file_size);
}
Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers")
}
Some(Err(EvictionError::FileNotFound)) => {
evictions_failed.file_sizes += layer.file_size();
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
Some(Err(
@@ -406,7 +408,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
)) => {
let e = utils::error::report_compact_sources(&e);
warn!(%layer, "failed to evict layer: {e}");
evictions_failed.file_sizes += layer.file_size();
evictions_failed.file_sizes += file_size;
evictions_failed.count += 1;
}
None => {

View File

@@ -93,6 +93,47 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
delete:
description: |
Attempts to delete the specified tenant. 500 and 409 errors should be retried until 404 is retrieved.
404 means that the deletion finished successfully.
responses:
"400":
description: Error when no tenant id found in path
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"404":
description: Tenant not found
content:
application/json:
schema:
$ref: "#/components/schemas/NotFoundError"
"409":
description: Deletion is already in progress, continue polling
content:
application/json:
schema:
$ref: "#/components/schemas/ConflictError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline:
parameters:
@@ -820,6 +861,7 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/config:
put:
description: |

View File

@@ -187,7 +187,7 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
format!("Cannot delete timeline which has child timelines: {children:?}")
.into_boxed_str(),
),
a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
Other(e) => ApiError::InternalServerError(e),
}
}
@@ -208,6 +208,19 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
}
}
impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
use crate::tenant::delete::DeleteTenantError::*;
match value {
Get(g) => ApiError::from(g),
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
Timeline(t) => ApiError::from(t),
Other(o) => ApiError::InternalServerError(o),
e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
}
}
}
// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
timeline: &Arc<Timeline>,
@@ -617,6 +630,23 @@ async fn tenant_status(
json_response(StatusCode::OK, tenant_info)
}
async fn tenant_delete_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
// TODO openapi spec
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let state = get_state(&request);
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
.instrument(info_span!("tenant_delete_handler", %tenant_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
}
/// HTTP endpoint to query the current tenant_size of a tenant.
///
/// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
@@ -1345,6 +1375,9 @@ pub fn make_router(
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
.delete("/v1/tenant/:tenant_id", |r| {
api_handler(r, tenant_delete_handler)
})
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
api_handler(r, tenant_size_handler)
})

View File

@@ -190,7 +190,7 @@ pub struct InitializationOrder {
/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
/// attempt. It is important to drop this once the attempt has completed.
pub initial_logical_size_attempt: utils::completion::Completion,
pub initial_logical_size_attempt: Option<utils::completion::Completion>,
/// Barrier for when we can start any background jobs.
///
@@ -226,6 +226,7 @@ async fn timed<Fut: std::future::Future>(
let ret = fut.await;
// this has a global allowed_errors
tracing::warn!(
task = name,
elapsed_ms = started.elapsed().as_millis(),

View File

@@ -28,6 +28,7 @@ use std::cmp::min;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs;
use std::fs::File;
use std::fs::OpenOptions;
@@ -46,8 +47,10 @@ use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::config::TenantConf;
use self::delete::DeleteTenantFlow;
use self::metadata::LoadMetadataError;
use self::metadata::TimelineMetadata;
use self::mgr::TenantsMap;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
@@ -106,6 +109,7 @@ macro_rules! pausable_failpoint {
pub mod blob_io;
pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
@@ -118,6 +122,7 @@ mod remote_timeline_client;
pub mod storage_layer;
pub mod config;
pub mod delete;
pub mod mgr;
pub mod tasks;
pub mod upload_queue;
@@ -145,6 +150,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -183,6 +190,8 @@ pub struct Tenant {
cached_synthetic_tenant_size: Arc<AtomicU64>,
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
}
// We should not blindly overwrite local metadata with remote one.
@@ -274,7 +283,7 @@ pub enum LoadLocalTimelineError {
ResumeDeletion(#[source] anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
#[derive(thiserror::Error)]
pub enum DeleteTimelineError {
#[error("NotFound")]
NotFound,
@@ -283,17 +292,37 @@ pub enum DeleteTimelineError {
HasChildren(Vec<TimelineId>),
#[error("Timeline deletion is already in progress")]
AlreadyInProgress,
AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl Debug for DeleteTimelineError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::NotFound => write!(f, "NotFound"),
Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
}
}
}
pub enum SetStoppingError {
AlreadyStopping(completion::Barrier),
Broken,
}
impl Debug for SetStoppingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
Self::Broken => write!(f, "Broken"),
}
}
}
struct RemoteStartupData {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
@@ -616,7 +645,7 @@ impl Tenant {
// For every timeline, download the metadata file, scan the local directory,
// and build a layer map that contains an entry for each remote and local
// layer file.
let sorted_timelines = tree_sort_timelines(timeline_ancestors)?;
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
for (timeline_id, remote_metadata) in sorted_timelines {
let (index_part, remote_client) = remote_index_and_client
.remove(&timeline_id)
@@ -740,12 +769,13 @@ impl Tenant {
/// If the loading fails for some reason, the Tenant will go into Broken
/// state.
#[instrument(skip_all, fields(tenant_id=%tenant_id))]
pub fn spawn_load(
pub(crate) fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Arc<Tenant> {
span::debug_assert_current_span_has_tenant_id();
@@ -765,7 +795,7 @@ impl Tenant {
tenant_conf,
wal_redo_manager,
tenant_id,
remote_storage,
remote_storage.clone(),
);
let tenant = Arc::new(tenant);
@@ -781,27 +811,83 @@ impl Tenant {
"initial tenant load",
false,
async move {
let make_broken = |t: &Tenant, err: anyhow::Error| {
error!("load failed, setting tenant state to Broken: {err:?}");
t.state.send_modify(|state| {
assert!(
matches!(*state, TenantState::Loading | TenantState::Stopping { .. }),
"the loading task owns the tenant state until activation is complete"
);
*state = TenantState::broken_from_reason(err.to_string());
});
};
let mut init_order = init_order;
// take the completion because initial tenant loading will complete when all of
// these tasks complete.
let _completion = init_order.as_mut().and_then(|x| x.initial_tenant_load.take());
let _completion = init_order
.as_mut()
.and_then(|x| x.initial_tenant_load.take());
// Dont block pageserver startup on figuring out deletion status
let pending_deletion = {
match DeleteTenantFlow::should_resume_deletion(
conf,
remote_storage.as_ref(),
&tenant_clone,
)
.await
{
Ok(should_resume_deletion) => should_resume_deletion,
Err(err) => {
make_broken(&tenant_clone, anyhow::anyhow!(err));
return Ok(());
}
}
};
info!("pending deletion {}", pending_deletion.is_some());
if let Some(deletion) = pending_deletion {
// as we are no longer loading, signal completion by dropping
// the completion while we resume deletion
drop(_completion);
// do not hold on to initial_logical_size_attempt, as it would prevent loading from proceeding without a timeout
let _ = init_order
.as_mut()
.and_then(|x| x.initial_logical_size_attempt.take());
match DeleteTenantFlow::resume(
deletion,
&tenant_clone,
init_order.as_ref(),
tenants,
&ctx,
)
.await
{
Err(err) => {
make_broken(&tenant_clone, anyhow::anyhow!(err));
return Ok(());
}
Ok(()) => return Ok(()),
}
}
let background_jobs_can_start =
init_order.as_ref().map(|x| &x.background_jobs_can_start);
match tenant_clone.load(init_order.as_ref(), &ctx).await {
Ok(()) => {
debug!("load finished, activating");
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
debug!("load finished",);
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
}
Err(err) => {
error!("load failed, setting tenant state to Broken: {err:?}");
tenant_clone.state.send_modify(|state| {
assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete");
*state = TenantState::broken_from_reason(err.to_string());
});
}
Err(err) => make_broken(&tenant_clone, err),
}
Ok(())
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
@@ -877,6 +963,8 @@ impl Tenant {
)
})?;
info!("Found deletion mark for timeline {}", timeline_id);
match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
Ok(metadata) => {
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
@@ -966,9 +1054,11 @@ impl Tenant {
// Sort the array of timeline IDs into tree-order, so that parent comes before
// all its children.
tree_sort_timelines(timelines_to_load).map(|sorted_timelines| TenantDirectoryScan {
sorted_timelines_to_load: sorted_timelines,
timelines_to_resume_deletion,
tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
TenantDirectoryScan {
sorted_timelines_to_load: sorted_timelines,
timelines_to_resume_deletion,
}
})
}
@@ -1682,7 +1772,7 @@ impl Tenant {
// It's messed up.
// we just ignore the failure to stop
match self.set_stopping(shutdown_progress).await {
match self.set_stopping(shutdown_progress, false).await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
// assume that this is acceptable
@@ -1722,18 +1812,25 @@ impl Tenant {
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
///
/// This function is not cancel-safe!
async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> {
///
/// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
async fn set_stopping(
&self,
progress: completion::Barrier,
allow_transition_from_loading: bool,
) -> Result<(), SetStoppingError> {
let mut rx = self.state.subscribe();
// cannot stop before we're done activating, so wait out until we're done activating
rx.wait_for(|state| match state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::Activating(_) | TenantState::Attaching => {
info!(
"waiting for {} to turn Active|Broken|Stopping",
<&'static str>::from(state)
);
false
}
TenantState::Loading => allow_transition_from_loading,
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
})
.await
@@ -1742,9 +1839,16 @@ impl Tenant {
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
let mut err = None;
let stopping = self.state.send_if_modified(|current_state| match current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
TenantState::Activating(_) | TenantState::Attaching => {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
}
TenantState::Loading => {
if !allow_transition_from_loading {
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
};
*current_state = TenantState::Stopping { progress };
true
}
TenantState::Active => {
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
// are created after the transition to Stopping. That's harmless, as the Timelines
@@ -1813,6 +1917,10 @@ impl Tenant {
.expect("cannot drop self.state while on a &self method");
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
self.set_broken_no_wait(reason)
}
pub(crate) fn set_broken_no_wait(&self, reason: String) {
self.state.send_modify(|current_state| {
match *current_state {
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
@@ -1878,22 +1986,28 @@ impl Tenant {
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
/// perform a topological sort, so that the parent of each timeline comes
/// before the children.
fn tree_sort_timelines(
timelines: HashMap<TimelineId, TimelineMetadata>,
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
/// E extracts the ancestor from T
/// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
fn tree_sort_timelines<T, E>(
timelines: HashMap<TimelineId, T>,
extractor: E,
) -> anyhow::Result<Vec<(TimelineId, T)>>
where
E: Fn(&T) -> Option<TimelineId>,
{
let mut result = Vec::with_capacity(timelines.len());
let mut now = Vec::with_capacity(timelines.len());
// (ancestor, children)
let mut later: HashMap<TimelineId, Vec<(TimelineId, TimelineMetadata)>> =
let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
HashMap::with_capacity(timelines.len());
for (timeline_id, metadata) in timelines {
if let Some(ancestor_id) = metadata.ancestor_timeline() {
for (timeline_id, value) in timelines {
if let Some(ancestor_id) = extractor(&value) {
let children = later.entry(ancestor_id).or_default();
children.push((timeline_id, metadata));
children.push((timeline_id, value));
} else {
now.push((timeline_id, metadata));
now.push((timeline_id, value));
}
}
@@ -2062,7 +2176,7 @@ impl Tenant {
remote_client,
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned(),
initial_logical_size_attempt.cloned().flatten(),
state,
);
@@ -2146,6 +2260,7 @@ impl Tenant {
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
}
}
@@ -2162,6 +2277,7 @@ impl Tenant {
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
// OR: we're loading after incomplete deletion that managed to remove config.
if !target_config_path.exists() {
info!("tenant config not found in {target_config_display}");
return Ok(TenantConfOpt::default());

View File

@@ -0,0 +1,546 @@
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::Context;
use pageserver_api::models::TenantState;
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard;
use tracing::{error, info, instrument, warn, Instrument, Span};
use utils::{
completion, crashsafe, fs_ext,
id::{TenantId, TimelineId},
};
use crate::{
config::PageServerConf,
context::RequestContext,
task_mgr::{self, TaskKind},
InitializationOrder,
};
use super::{
mgr::{GetTenantError, TenantsMap},
span,
timeline::delete::DeleteTimelineFlow,
tree_sort_timelines, DeleteTimelineError, Tenant,
};
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u8 = 3;
#[derive(Debug, thiserror::Error)]
pub enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
#[error("Invalid state {0}. Expected Active or Broken")]
InvalidState(TenantState),
#[error("Tenant deletion is already in progress")]
AlreadyInProgress,
#[error("Timeline {0}")]
Timeline(#[from] DeleteTimelineError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;
fn remote_tenant_delete_mark_path(
conf: &PageServerConf,
tenant_id: &TenantId,
) -> anyhow::Result<RemotePath> {
let tenant_remote_path = conf
.tenant_path(tenant_id)
.strip_prefix(&conf.workdir)
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.context("tenant path")?;
Ok(tenant_remote_path.join(Path::new("deleted")))
}
async fn create_remote_delete_mark(
conf: &PageServerConf,
remote_storage: &GenericRemoteStorage,
tenant_id: &TenantId,
) -> Result<(), DeleteTenantError> {
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id)?;
let data: &[u8] = &[];
remote_storage
.upload(data, 0, &remote_mark_path, None)
.await
.context("mark upload")?;
Ok(())
}
async fn create_local_delete_mark(
conf: &PageServerConf,
tenant_id: &TenantId,
) -> Result<(), DeleteTenantError> {
let marker_path = conf.tenant_deleted_mark_file_path(tenant_id);
// Note: we're ok to replace existing file.
let _ = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(&marker_path)
.with_context(|| format!("could not create delete marker file {marker_path:?}"))?;
crashsafe::fsync_file_and_parent(&marker_path).context("sync_mark")?;
Ok(())
}
async fn schedule_ordered_timeline_deletions(
tenant: &Arc<Tenant>,
) -> Result<Vec<(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>, TimelineId)>, DeleteTenantError> {
// Tenant is stopping at this point. We know it will be deleted.
// No new timelines should be created.
// Tree sort timelines to delete from leaves to the root.
// NOTE: by calling clone we release the mutex, which creates a possibility for a race: a pending deletion
// can complete and remove the timeline from the map in between our call to clone
// and `DeleteTimelineFlow::run`, so `run` won't find the timeline in the `timelines` map.
// timelines.lock is currently synchronous, so we can't hold it across an await point.
// So just ignore the NotFound error if we get it from `run`.
// Beware: in case it becomes async and we try to hold it here, `run` also locks it, which can create a deadlock.
let timelines = tenant.timelines.lock().unwrap().clone();
let sorted =
tree_sort_timelines(timelines, |t| t.get_ancestor_timeline_id()).context("tree sort")?;
let mut already_running_deletions = vec![];
for (timeline_id, _) in sorted.into_iter().rev() {
if let Err(e) = DeleteTimelineFlow::run(tenant, timeline_id, true).await {
match e {
DeleteTimelineError::NotFound => {
// Timeline deletion finished after call to clone above but before call
// to `DeleteTimelineFlow::run` and removed timeline from the map.
continue;
}
DeleteTimelineError::AlreadyInProgress(guard) => {
already_running_deletions.push((guard, timeline_id));
continue;
}
e => return Err(DeleteTenantError::Timeline(e)),
}
}
}
Ok(already_running_deletions)
}
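Deleting in reverse of a parents-before-children order guarantees that no timeline is removed while a descendant still references it. A toy version of the sorting step (hypothetical types; the real `tree_sort_timelines` works on the timelines map):

```rust
use std::collections::HashMap;

/// Order ids so that every parent precedes its children; iterating the
/// result in reverse then deletes leaves first, as the code above does.
/// Assumes the parent links form a forest (no cycles).
fn tree_sort(parents: &HashMap<u32, Option<u32>>) -> Vec<u32> {
    let mut result = Vec::with_capacity(parents.len());
    let mut remaining: Vec<u32> = parents.keys().copied().collect();
    remaining.sort(); // just for determinism in this sketch
    while !remaining.is_empty() {
        remaining.retain(|id| {
            let ready = match parents[id] {
                None => true, // a root
                Some(p) => result.contains(&p) || !parents.contains_key(&p),
            };
            if ready {
                result.push(*id);
            }
            !ready
        });
    }
    result
}

fn main() {
    // 1 -> 2 -> 3, and 1 -> 4
    let parents: HashMap<u32, Option<u32>> =
        HashMap::from([(1, None), (2, Some(1)), (3, Some(2)), (4, Some(1))]);
    let order = tree_sort(&parents);
    assert_eq!(order.first(), Some(&1)); // root comes first, leaves last
}
```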
async fn ensure_timelines_dir_empty(timelines_path: &Path) -> Result<(), DeleteTenantError> {
// Assert timelines dir is empty.
if !fs_ext::is_directory_empty(timelines_path).await? {
// Display up to the first 10 items in the directory
let mut list = fs_ext::list_dir(timelines_path).await.context("list_dir")?;
list.truncate(10);
return Err(DeleteTenantError::Other(anyhow::anyhow!(
"Timelines directory is not empty after deletion of all timelines: {list:?}"
)));
}
Ok(())
}
async fn remove_tenant_remote_delete_mark(
conf: &PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant_id: &TenantId,
) -> Result<(), DeleteTenantError> {
if let Some(remote_storage) = remote_storage {
remote_storage
.delete(&remote_tenant_delete_mark_path(conf, tenant_id)?)
.await?;
}
Ok(())
}
// Clean up fs traces: tenant config, timelines dir, local delete mark, tenant dir
async fn cleanup_remaining_fs_traces(
conf: &PageServerConf,
tenant_id: &TenantId,
) -> Result<(), DeleteTenantError> {
let rm = |p: PathBuf, is_dir: bool| async move {
if is_dir {
tokio::fs::remove_dir(&p).await
} else {
tokio::fs::remove_file(&p).await
}
.or_else(fs_ext::ignore_not_found)
.with_context(|| {
let to_display = p.display();
format!("failed to delete {to_display}")
})
};
rm(conf.tenant_config_path(tenant_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-remove-timelines-dir"
))?
});
rm(conf.timelines_path(tenant_id), true).await?;
fail::fail_point!("tenant-delete-before-remove-deleted-mark", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-remove-deleted-mark"
))?
});
rm(conf.tenant_deleted_mark_file_path(tenant_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-tenant-dir", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-remove-tenant-dir"
))?
});
rm(conf.tenant_path(tenant_id), true).await?;
Ok(())
}
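All the `fail_point!` calls in this flow exist so tests can simulate a crash between any two steps and then assert that a restart resumes and completes the deletion. A reduced sketch of how one gets tripped, assuming the `fail` crate with its `failpoints` feature enabled (names hypothetical):

```rust
fn risky_step() -> anyhow::Result<()> {
    // Same shape as the failpoints above: the closure only runs when the
    // failpoint is activated with the "return" action.
    fail::fail_point!("before-risky-step", |_| {
        Err(anyhow::anyhow!("failpoint: before-risky-step"))
    });
    Ok(())
}

#[test]
fn resumes_after_injected_failure() {
    fail::cfg("before-risky-step", "return").unwrap();
    assert!(risky_step().is_err()); // the simulated crash
    fail::remove("before-risky-step");
    assert!(risky_step().is_ok()); // the retry/resume path succeeds
}
```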
/// Orchestrates tenant deletion: shuts down all of the tenant's tasks, removes its in-memory structures,
/// and deletes its data from both disk and s3.
/// The sequence of steps:
/// 1. Upload remote deletion mark.
/// 2. Create local mark file.
/// 3. Shutdown tasks.
/// 4. Run ordered timeline deletions.
/// 5. Wait for timeline deletion operations that were scheduled before tenant deletion was requested.
/// 6. Remove remote mark.
/// 7. Clean up remaining fs traces: tenant dir, config, timelines dir, local delete mark.
/// It is resumable from any step in case a crash/restart occurs.
/// There are two entrypoints to the process:
/// 1. [`DeleteTenantFlow::run`] is the main one, called by a management api handler.
/// 2. [`DeleteTenantFlow::resume`] is called during restarts when local or remote deletion marks are still present.
/// Note that the only other place that messes with the timeline delete mark is the `Tenant::spawn_load` function.
#[derive(Default)]
pub enum DeleteTenantFlow {
#[default]
NotStarted,
InProgress,
Finished,
}
impl DeleteTenantFlow {
// These steps are run in the context of the management api request handler.
// Long-running steps continue to run in the background.
// NB: If this fails half-way through, and is retried, the retry will go through
// all the same steps again. Make sure the code here is idempotent, and don't
// error out if some of the shutdown tasks have already been completed!
// NOTE: 'static is needed for the background part.
// We assume that the calling code sets up the span with tenant_id.
#[instrument(skip_all)]
pub(crate) async fn run(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(), DeleteTenantError> {
span::debug_assert_current_span_has_tenant_id();
let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?;
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
tenant.set_broken(format!("{e:#}")).await;
return Err(e);
}
Self::schedule_background(guard, conf, remote_storage, tenants, tenant);
Ok(())
}
// Helper function needed to be able to match once on the returned error and transition the tenant into broken state.
// This is needed because tenant.shutdown is not idempotent. If the tenant state is set to stopping, another call to tenant.shutdown
// will result in an error, but here we need to be able to retry shutdown when tenant deletion is retried.
// So the solution is to set the tenant state to broken.
async fn run_inner(
guard: &mut OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,
) -> Result<(), DeleteTenantError> {
guard.mark_in_progress()?;
fail::fail_point!("tenant-delete-before-create-remote-mark", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-create-remote-mark"
))?
});
// IDEA: implement detach as delete without remote storage. Then they would use the same lock (deletion_progress), so they won't contend.
// Though that sounds scary; maybe use a different mark name?
// Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
if let Some(remote_storage) = &remote_storage {
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_id)
.await
.context("remote_mark")?
}
fail::fail_point!("tenant-delete-before-create-local-mark", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-create-local-mark"
))?
});
create_local_delete_mark(conf, &tenant.tenant_id)
.await
.context("local delete mark")?;
fail::fail_point!("tenant-delete-before-background", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-background"
))?
});
Ok(())
}
fn mark_in_progress(&mut self) -> anyhow::Result<()> {
match self {
Self::Finished => anyhow::bail!("bug: deletion flow is already in Finished state"),
Self::InProgress { .. } => { /* We're in a retry */ }
Self::NotStarted => { /* Fresh start */ }
}
*self = Self::InProgress;
Ok(())
}
pub async fn should_resume_deletion(
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
let acquire = |t: &Tenant| {
Some(
Arc::clone(&t.delete_progress)
.try_lock_owned()
.expect("we're the only owner during init"),
)
};
let tenant_id = tenant.tenant_id;
// Check the local mark first; if it's there, there is no need to go to s3 to check whether the remote one exists.
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
return Ok(acquire(tenant));
}
// If remote storage is configured, we rely on it
if let Some(remote_storage) = remote_storage {
let remote_mark_path = remote_tenant_delete_mark_path(conf, &tenant_id)?;
let mut attempt = 1;
loop {
match remote_storage.download(&remote_mark_path).await {
Ok(_) => return Ok(acquire(tenant)),
Err(e) => {
if matches!(e, DownloadError::NotFound) {
return Ok(None);
}
if attempt > SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS {
return Err(anyhow::anyhow!(e))?;
}
warn!(
"failed to fetch tenant deletion mark at {} attempt {}",
&remote_mark_path, attempt
);
attempt += 1;
}
}
}
}
Ok(None)
}
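The download loop above is a bounded-retry pattern: `NotFound` short-circuits to "no resume needed", transient errors are retried up to `SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS` times, and anything persistent surfaces as an error. The bare shape, extracted (tokio assumed, helper hypothetical; a small sleep between attempts is added for illustration):

```rust
use std::future::Future;
use std::time::Duration;

/// Retry a fallible async operation a bounded number of times,
/// returning the last error once the attempts are exhausted.
async fn retry_bounded<T, E, F, Fut>(max_attempts: u8, mut op: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut attempt: u8 = 1;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                attempt += 1;
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
}
```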
pub(crate) async fn resume(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
tenant
.set_stopping(progress, true)
.await
.expect("cant be stopping or broken");
// Do not consume valuable resources during the load phase, continue deletion once init phase is complete.
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
if let Some(background) = background_jobs_can_start {
info!("waiting for backgound jobs barrier");
background.clone().wait().await;
info!("ready for backgound jobs barrier");
}
// Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e.g. when removing the timelines dir)
let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
if timelines_path.exists() {
tenant.load(init_order, ctx).await.context("load")?;
}
Self::background(
guard,
tenant.conf,
tenant.remote_storage.clone(),
tenants,
tenant,
)
.await
}
async fn prepare(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(Arc<Tenant>, tokio::sync::OwnedMutexGuard<Self>), DeleteTenantError> {
let m = tenants.read().await;
let tenant = m
.get(&tenant_id)
.ok_or(GetTenantError::NotFound(tenant_id))?;
// FIXME: unsure about allowing Active only. Our init jobs may not be cancellable properly,
// so at least for now allow deletions only for active tenants. TODO: recheck.
// Allowing Broken is needed for retries.
if !matches!(
tenant.current_state(),
TenantState::Active | TenantState::Broken { .. }
) {
return Err(DeleteTenantError::InvalidState(tenant.current_state()));
}
let guard = Arc::clone(&tenant.delete_progress)
.try_lock_owned()
.map_err(|_| DeleteTenantError::AlreadyInProgress)?;
fail::fail_point!("tenant-delete-before-shutdown", |_| {
Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))?
});
// make pageserver shutdown not wait for our completion
let (_, progress) = completion::channel();
// It would be good to only set stopping here and continue shutdown in the background, but shutdown is not idempotent.
// I.e. it is an error to do:
// tenant.set_stopping
// tenant.shutdown
// It's also bad that we're holding tenants.read here.
// TODO: relax set_stopping to be idempotent?
if tenant.shutdown(progress, false).await.is_err() {
return Err(DeleteTenantError::Other(anyhow::anyhow!(
"tenant shutdown is already in progress"
)));
}
Ok((Arc::clone(tenant), guard))
}
fn schedule_background(
guard: OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
Some(tenant_id),
None,
"tenant_delete",
false,
async move {
if let Err(err) =
Self::background(guard, conf, remote_storage, tenants, &tenant).await
{
error!("Error: {err:#}");
tenant.set_broken(format!("{err:#}")).await;
};
Ok(())
}
.instrument({
let span = tracing::info_span!(parent: None, "delete_tenant", tenant_id=%tenant_id);
span.follows_from(Span::current());
span
}),
);
}
async fn background(
mut guard: OwnedMutexGuard<Self>,
conf: &PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenant: &Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
// Tree-sort timelines and schedule deletion for them. Mention retries from the console side.
// Note that if a deletion fails we don't mark timelines as broken;
// the whole tenant becomes broken via the `Self::schedule_background` logic.
let already_running_timeline_deletions = schedule_ordered_timeline_deletions(tenant)
.await
.context("schedule_ordered_timeline_deletions")?;
fail::fail_point!("tenant-delete-before-polling-ongoing-deletions", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-polling-ongoing-deletions"
))?
});
// Wait for deletions that were already running at the moment tenant deletion was requested.
// Once we can lock a deletion guard, the corresponding timeline deletion has finished.
for (guard, timeline_id) in already_running_timeline_deletions {
let flow = guard.lock().await;
if !flow.is_finished() {
return Err(DeleteTenantError::Other(anyhow::anyhow!(
"already running timeline deletion failed: {timeline_id}"
)));
}
}
let timelines_path = conf.timelines_path(&tenant.tenant_id);
// May not exist if we fail in cleanup_remaining_fs_traces after removing it
if timelines_path.exists() {
// sanity check to guard against layout changes
ensure_timelines_dir_empty(&timelines_path)
.await
.context("timelines dir not empty")?;
}
remove_tenant_remote_delete_mark(conf, remote_storage.as_ref(), &tenant.tenant_id).await?;
fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| {
Err(anyhow::anyhow!(
"failpoint: tenant-delete-before-cleanup-remaining-fs-traces"
))?
});
cleanup_remaining_fs_traces(conf, &tenant.tenant_id)
.await
.context("cleanup_remaining_fs_traces")?;
let mut locked = tenants.write().await;
if locked.remove(&tenant.tenant_id).is_none() {
warn!("Tenant got removed from tenants map during deletion");
};
*guard = Self::Finished;
Ok(())
}
}


@@ -20,6 +20,7 @@
//!
use byteorder::{ReadBytesExt, BE};
use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use hex;
use std::{cmp::Ordering, io, result};
use thiserror::Error;
@@ -256,103 +257,77 @@ where
where
V: FnMut(&[u8], u64) -> bool,
{
self.search_recurse(self.root_blk, search_key, dir, &mut visitor)
}
let mut stack = Vec::new();
stack.push((self.root_blk, None));
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = self.reader.read_blk(self.start_blk + node_blknum)?;
fn search_recurse<V>(
&self,
node_blknum: u32,
search_key: &[u8; L],
dir: VisitDirection,
visitor: &mut V,
) -> Result<bool>
where
V: FnMut(&[u8], u64) -> bool,
{
// Locate the node.
let node_buf = self.reader.read_blk(self.start_blk + node_blknum)?;
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;
assert!(node.num_children > 0);
assert!(node.num_children > 0);
let mut keybuf = Vec::new();
keybuf.extend(node.prefix);
keybuf.resize(prefix_len + suffix_len, 0);
let mut keybuf = Vec::new();
keybuf.extend(node.prefix);
keybuf.resize(prefix_len + suffix_len, 0);
if dir == VisitDirection::Forwards {
// Locate the first match
let mut idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
Ok(idx) => idx,
Err(idx) => {
if node.level == 0 {
// Imagine that the node contains the following keys:
//
// 1
// 3 <-- idx
// 5
//
// If the search key is '2' and there is exact match,
// the binary search would return the index of key
// '3'. That's cool, '3' is the first key to return.
let mut iter = if let Some(iter) = opt_iter {
iter
} else if dir == VisitDirection::Forwards {
// Locate the first match
let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
Ok(idx) => idx,
Err(idx) => {
if node.level == 0 {
// Imagine that the node contains the following keys:
//
// 1
// 3 <-- idx
// 5
//
// If the search key is '2' and there is exact match,
// the binary search would return the index of key
// '3'. That's cool, '3' is the first key to return.
idx
} else {
// This is an internal page, so each key represents a lower
// bound for what's in the child page. If there is no exact
// match, we have to return the *previous* entry.
//
// 1 <-- return this
// 3 <-- idx
// 5
idx.saturating_sub(1)
}
}
};
Either::Left(idx..node.num_children.into())
} else {
let idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
Ok(idx) => {
// Exact match. That's the first entry to return, and walk
// backwards from there.
idx
} else {
// This is an internal page, so each key represents a lower
// bound for what's in the child page. If there is no exact
// match, we have to return the *previous* entry.
//
// 1 <-- return this
// 3 <-- idx
// 5
idx.saturating_sub(1)
}
}
};
// idx points to the first match now. Keep going from there
let mut key_off = idx * suffix_len;
while idx < node.num_children as usize {
let suffix = &node.keys[key_off..key_off + suffix_len];
keybuf[prefix_len..].copy_from_slice(suffix);
let value = node.value(idx);
#[allow(clippy::collapsible_if)]
if node.level == 0 {
// leaf
if !visitor(&keybuf, value.to_u64()) {
return Ok(false);
Err(idx) => {
// No exact match. The binary search returned the index of the
// first key that's > search_key. Back off by one, and walk
// backwards from there.
if let Some(idx) = idx.checked_sub(1) {
idx
} else {
return Ok(false);
}
}
} else {
#[allow(clippy::collapsible_if)]
if !self.search_recurse(value.to_blknum(), search_key, dir, visitor)? {
return Ok(false);
}
}
idx += 1;
key_off += suffix_len;
}
} else {
let mut idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
Ok(idx) => {
// Exact match. That's the first entry to return, and walk
// backwards from there. (The loop below starts from 'idx -
// 1', so add one here to compensate.)
idx + 1
}
Err(idx) => {
// No exact match. The binary search returned the index of the
// first key that's > search_key. Back off by one, and walk
// backwards from there. (The loop below starts from idx - 1,
// so we don't need to subtract one here)
idx
}
};
Either::Right((0..=idx).rev())
};
// idx points to the first match + 1 now. Keep going from there.
let mut key_off = idx * suffix_len;
while idx > 0 {
idx -= 1;
key_off -= suffix_len;
// idx points to the first match now. Keep going from there
while let Some(idx) = iter.next() {
let key_off = idx * suffix_len;
let suffix = &node.keys[key_off..key_off + suffix_len];
keybuf[prefix_len..].copy_from_slice(suffix);
let value = node.value(idx);
@@ -363,12 +338,8 @@ where
return Ok(false);
}
} else {
#[allow(clippy::collapsible_if)]
if !self.search_recurse(value.to_blknum(), search_key, dir, visitor)? {
return Ok(false);
}
}
if idx == 0 {
stack.push((node_blknum, Some(iter)));
stack.push((value.to_blknum(), None));
break;
}
}
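The hunk above turns the recursive `search_recurse` into a loop: the explicit stack holds `(block number, optionally a half-consumed index iterator)`, and `Either` unifies the forward range with the reversed one. The same shape on a toy arena-allocated tree (hypothetical; the real code resolves children via `value.to_blknum()` and reads blocks from disk):

```rust
use either::Either;

enum Node {
    Leaf(Vec<u64>),       // payload values
    Internal(Vec<usize>), // child indices into the arena
}

/// Depth-first visit without recursion, mirroring the
/// stack-of-(node, resumable iterator) shape in the diff above.
fn visit_all(arena: &[Node], root: usize, forwards: bool, visitor: &mut impl FnMut(u64) -> bool) -> bool {
    type Iter = Either<std::ops::Range<usize>, std::iter::Rev<std::ops::Range<usize>>>;
    let mut stack: Vec<(usize, Option<Iter>)> = vec![(root, None)];
    while let Some((node_idx, opt_iter)) = stack.pop() {
        let len = match &arena[node_idx] {
            Node::Leaf(values) => values.len(),
            Node::Internal(children) => children.len(),
        };
        // Resume a partially visited node, or start a fresh iterator over it.
        let mut iter: Iter = opt_iter.unwrap_or_else(|| {
            if forwards {
                Either::Left(0..len)
            } else {
                Either::Right((0..len).rev())
            }
        });
        while let Some(idx) = iter.next() {
            match &arena[node_idx] {
                Node::Leaf(values) => {
                    if !visitor(values[idx]) {
                        return false; // visitor asked to stop early
                    }
                }
                Node::Internal(children) => {
                    // Remember where to resume in this node, then descend.
                    stack.push((node_idx, Some(iter)));
                    stack.push((children[idx], None));
                    break;
                }
            }
        }
    }
    true
}
```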


@@ -121,7 +121,7 @@ impl BatchedUpdates<'_> {
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
self.layer_map.remove_historic_noflush(layer_desc)
}
@@ -253,11 +253,11 @@ impl LayerMap {
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(&layer_desc));
.remove(historic_layer_coverage::LayerKey::from(layer_desc));
let layer_key = layer_desc.key();
if Self::is_l0(&layer_desc) {
if Self::is_l0(layer_desc) {
let len_before = self.l0_delta_layers.len();
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| other.key() != layer_key);
@@ -766,8 +766,7 @@ mod tests {
expected_in_counts
);
map.batch_update()
.remove_historic(downloaded.layer_desc().clone());
map.batch_update().remove_historic(downloaded.layer_desc());
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
}


@@ -20,17 +20,19 @@ use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
use super::delete::DeleteTenantError;
use super::timeline::delete::DeleteTimelineFlow;
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
enum TenantsMap {
pub(crate) enum TenantsMap {
/// [`init_tenant_mgr`] is not done yet.
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
@@ -42,13 +44,13 @@ enum TenantsMap {
}
impl TenantsMap {
fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
}
}
fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
@@ -97,7 +99,9 @@ pub async fn init_tenant_mgr(
);
}
} else {
// This case happens if we crash during attach before creating the attach marker file
// This case happens if we:
// * crash during attach before creating the attach marker file
// * crash during tenant delete before removing tenant directory
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
})?;
@@ -124,6 +128,7 @@ pub async fn init_tenant_mgr(
broker_client.clone(),
remote_storage.clone(),
Some(init_order.clone()),
&TENANTS,
&ctx,
) {
Ok(tenant) => {
@@ -154,12 +159,13 @@ pub async fn init_tenant_mgr(
Ok(())
}
pub fn schedule_local_tenant_processing(
pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -219,6 +225,7 @@ pub fn schedule_local_tenant_processing(
broker_client,
remote_storage,
init_order,
tenants,
ctx,
)
};
@@ -356,7 +363,7 @@ pub async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233
@@ -417,6 +424,14 @@ pub async fn get_tenant(
}
}
pub async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenant_id: TenantId,
) -> Result<(), DeleteTenantError> {
DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await
}
#[derive(Debug, thiserror::Error)]
pub enum DeleteTimelineError {
#[error("Tenant {0}")]
@@ -432,7 +447,7 @@ pub async fn delete_timeline(
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
let tenant = get_tenant(tenant_id, true).await?;
DeleteTimelineFlow::run(&tenant, timeline_id).await?;
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
Ok(())
}
@@ -507,7 +522,7 @@ pub async fn load_tenant(
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
}
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, &TENANTS, ctx)
.with_context(|| {
format!("Failed to schedule tenant processing in path {tenant_path:?}")
})?;
@@ -588,7 +603,7 @@ pub async fn attach_tenant(
.context("check for attach marker file existence")?;
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
// See https://github.com/neondatabase/neon/issues/4233


@@ -401,16 +401,6 @@ pub trait AsLayerDesc {
/// An image layer is a snapshot of all the data in a key-range, at a single
/// LSN.
pub trait PersistentLayer: Layer + AsLayerDesc {
/// Identify the tenant this layer belongs to
fn get_tenant_id(&self) -> TenantId {
self.layer_desc().tenant_id
}
/// Identify the timeline this layer belongs to
fn get_timeline_id(&self) -> TimelineId {
self.layer_desc().timeline_id
}
/// File name used for this layer, both in the pageserver's local filesystem
/// state as well as in the remote storage.
fn filename(&self) -> LayerFileName {
@@ -436,14 +426,6 @@ pub trait PersistentLayer: Layer + AsLayerDesc {
false
}
/// Returns None if the layer file size is not known.
///
/// Should not change over the lifetime of the layer object because
/// current_physical_size is computed as the sum of this value.
fn file_size(&self) -> u64 {
self.layer_desc().file_size
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
fn access_stats(&self) -> &LayerAccessStats;


@@ -90,14 +90,30 @@ pub struct Summary {
impl From<&DeltaLayer> for Summary {
fn from(layer: &DeltaLayer) -> Self {
Self::expected(
layer.desc.tenant_id,
layer.desc.timeline_id,
layer.desc.key_range.clone(),
layer.desc.lsn_range.clone(),
)
}
}
impl Summary {
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
keys: Range<Key>,
lsns: Range<Lsn>,
) -> Self {
Self {
magic: DELTA_FILE_MAGIC,
format_version: STORAGE_FORMAT_VERSION,
tenant_id: layer.desc.tenant_id,
timeline_id: layer.desc.timeline_id,
key_range: layer.desc.key_range.clone(),
lsn_range: layer.desc.lsn_range.clone(),
tenant_id,
timeline_id,
key_range: keys,
lsn_range: lsns,
index_start_blk: 0,
index_root_blk: 0,
@@ -108,12 +124,10 @@ impl From<&DeltaLayer> for Summary {
// Flag indicating that this version initializes the page
const WILL_INIT: u64 = 1;
///
/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
/// helps to determine the range of records that needs to be applied, without
/// reading/deserializing records themselves.
///
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(pub u64);
@@ -138,10 +152,8 @@ impl BlobRef {
pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
struct DeltaKey([u8; DELTA_KEY_SIZE]);
///
/// This is the key of the B-tree index stored in the delta layer. It consists
/// of the serialized representation of a Key and LSN.
///
impl DeltaKey {
fn from_slice(buf: &[u8]) -> Self {
let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
@@ -214,6 +226,12 @@ pub struct DeltaLayerInner {
file: FileBlockReader<VirtualFile>,
}
impl AsRef<DeltaLayerInner> for DeltaLayerInner {
fn as_ref(&self) -> &DeltaLayerInner {
self
}
}
impl std::fmt::Debug for DeltaLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DeltaLayerInner")
@@ -311,86 +329,16 @@ impl Layer for DeltaLayer {
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.desc.lsn_range.start);
let mut need_image = true;
ensure!(self.desc.key_range.contains(&key));
{
// Open the file and lock the metadata in memory
let inner = self
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
let inner = self
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
// Scan the page versions backwards, starting from `lsn`.
let file = &inner.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader
.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})
.await?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
// release metadata lock and close the file
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
inner
.get_value_reconstruct_data(key, lsn_range, reconstruct_state)
.await
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
@@ -516,43 +464,27 @@ impl DeltaLayer {
async fn load_inner(&self) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let file = VirtualFile::open(&path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary = match &self.path_or_conf {
PathOrConf::Conf(_) => Some(Summary::from(self)),
PathOrConf::Path(_) => None,
};
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let loaded = DeltaLayerInner::load(&path, summary)?;
match &self.path_or_conf {
PathOrConf::Conf(_) => {
let mut expected_summary = Summary::from(self);
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
}
}
PathOrConf::Path(path) => {
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
if actual_filename != expected_filename {
println!(
"warning: filename does not match what is expected from in-file summary"
);
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
}
debug!("loaded from {}", &path.display());
Ok(Arc::new(DeltaLayerInner {
file,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
}))
Ok(Arc::new(loaded))
}
/// Create a DeltaLayer struct representing an existing file on disk.
@@ -621,9 +553,12 @@ impl DeltaLayer {
/// Obtains all keys and value references stored in the layer
///
/// The value can be obtained via the [`ValueRef::load`] function.
pub async fn load_val_refs(&self, ctx: &RequestContext) -> Result<Vec<(Key, Lsn, ValueRef)>> {
pub async fn load_val_refs(
&self,
ctx: &RequestContext,
) -> Result<Vec<(Key, Lsn, ValueRef<Arc<DeltaLayerInner>>)>> {
let inner = self
.load(LayerAccessKind::KeyIter, ctx)
.load(LayerAccessKind::Iter, ctx)
.await
.context("load delta layer")?;
DeltaLayerInner::load_val_refs(inner)
@@ -912,15 +847,120 @@ impl Drop for DeltaLayerWriter {
}
impl DeltaLayerInner {
async fn load_val_refs(this: &Arc<DeltaLayerInner>) -> Result<Vec<(Key, Lsn, ValueRef)>> {
let file = &this.file;
pub(super) fn load(path: &std::path::Path, summary: Option<Summary>) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {
// production code path
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
}
}
Ok(DeltaLayerInner {
file,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
})
}
pub(super) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
) -> anyhow::Result<ValueReconstructResult> {
let mut need_image = true;
// Scan the page versions backwards, starting from `lsn`.
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
this.index_start_blk,
this.index_root_blk,
self.index_start_blk,
self.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut all_offsets = Vec::<(Key, Lsn, ValueRef)>::new();
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader
.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})
.await?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path.display()
)
})?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Complete)
}
}
pub(super) async fn load_val_refs<T: AsRef<DeltaLayerInner> + Clone>(
this: &T,
) -> Result<Vec<(Key, Lsn, ValueRef<T>)>> {
let dl = this.as_ref();
let file = &dl.file;
let tree_reader =
DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(dl.index_start_blk, dl.index_root_blk, file);
let mut all_offsets = Vec::<(Key, Lsn, ValueRef<T>)>::new();
tree_reader
.visit(
&[0u8; DELTA_KEY_SIZE],
@@ -939,7 +979,8 @@ impl DeltaLayerInner {
Ok(all_offsets)
}
async fn load_keys(&self) -> Result<Vec<(Key, Lsn, u64)>> {
pub(super) async fn load_keys(&self) -> Result<Vec<(Key, Lsn, u64)>> {
let file = &self.file;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
@@ -979,26 +1020,27 @@ impl DeltaLayerInner {
}
/// Reference to an on-disk value
pub struct ValueRef {
pub struct ValueRef<T: AsRef<DeltaLayerInner>> {
blob_ref: BlobRef,
reader: BlockCursor<Adapter>,
reader: BlockCursor<Adapter<T>>,
}
impl ValueRef {
impl<T: AsRef<DeltaLayerInner>> ValueRef<T> {
/// Loads the value from disk
pub fn load(&self) -> Result<Value> {
// theoretically we *could* record an access time for each, but it does not really matter
let buf = self.reader.read_blob(self.blob_ref.pos())?;
let val = Value::des(&buf)?;
Ok(val)
}
}
struct Adapter(Arc<DeltaLayerInner>);
struct Adapter<T: AsRef<DeltaLayerInner>>(T);
impl BlockReader for Adapter {
impl<T: AsRef<DeltaLayerInner>> BlockReader for Adapter<T> {
type BlockLease = PageReadGuard<'static>;
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
self.0.file.read_blk(blknum)
self.0.as_ref().file.read_blk(blknum)
}
}
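Making the adapter generic over `AsRef<DeltaLayerInner>` (together with the reflexive `AsRef` impl added further up) lets one reader type serve `Arc<DeltaLayerInner>`, references, and owned values alike. The pattern in isolation (toy types):

```rust
use std::sync::Arc;

struct Inner {
    data: Vec<u8>,
}

// Reflexive impl, like the `impl AsRef<DeltaLayerInner> for DeltaLayerInner`
// added in this diff: it lets plain references satisfy the bound too.
impl AsRef<Inner> for Inner {
    fn as_ref(&self) -> &Inner {
        self
    }
}

struct Adapter<T: AsRef<Inner>>(T);

impl<T: AsRef<Inner>> Adapter<T> {
    fn read(&self, i: usize) -> u8 {
        self.0.as_ref().data[i]
    }
}

fn main() {
    let shared = Adapter(Arc::new(Inner { data: vec![1, 2, 3] })); // Arc<Inner>: AsRef<Inner>
    assert_eq!(shared.read(0), 1);
    let inner = Inner { data: vec![9] };
    let borrowed = Adapter(&inner); // &Inner: AsRef<Inner> via the impl above
    assert_eq!(borrowed.read(0), 9);
}
```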


@@ -66,7 +66,7 @@ use super::{AsLayerDesc, Layer, LayerAccessStatsReset, PathOrConf, PersistentLay
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct Summary {
pub(super) struct Summary {
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
magic: u16,
format_version: u16,
@@ -85,13 +85,29 @@ struct Summary {
impl From<&ImageLayer> for Summary {
fn from(layer: &ImageLayer) -> Self {
Self::expected(
layer.desc.tenant_id,
layer.desc.timeline_id,
layer.desc.key_range.clone(),
layer.lsn,
)
}
}
impl Summary {
pub(super) fn expected(
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn: Lsn,
) -> Self {
Self {
magic: IMAGE_FILE_MAGIC,
format_version: STORAGE_FORMAT_VERSION,
tenant_id: layer.desc.tenant_id,
timeline_id: layer.desc.timeline_id,
key_range: layer.desc.key_range.clone(),
lsn: layer.lsn,
tenant_id,
timeline_id,
key_range,
lsn,
index_start_blk: 0,
index_root_blk: 0,
@@ -136,6 +152,8 @@ pub struct ImageLayerInner {
index_start_blk: u32,
index_root_blk: u32,
lsn: Lsn,
/// Reader object for reading blocks from the file.
file: FileBlockReader<VirtualFile>,
}
@@ -200,27 +218,11 @@ impl Layer for ImageLayer {
let inner = self
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
let file = &inner.file;
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf).await? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.path().display(),
offset
)
})?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
}
inner
.get_value_reconstruct_data(key, reconstruct_state)
.await
// FIXME: makes no sense to dump paths
.with_context(|| format!("read {}", self.path().display()))
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
@@ -323,56 +325,35 @@ impl ImageLayer {
) -> Result<&ImageLayerInner> {
self.access_stats
.record_access(access_kind, ctx.task_kind());
loop {
if let Some(inner) = self.inner.get() {
return Ok(inner);
}
self.inner
.get_or_try_init(|| self.load_inner())
.await
.with_context(|| format!("Failed to load image layer {}", self.path().display()))?;
}
self.inner
.get_or_try_init(|| self.load_inner())
.await
.with_context(|| format!("Failed to load image layer {}", self.path().display()))
}
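`get_or_try_init` replaces the manual poll loop: the first caller runs the initializer, and an `Err` leaves the cell empty so a later call can retry. A minimal sketch (tokio's `OnceCell` assumed, toy types):

```rust
use tokio::sync::OnceCell;

struct Lazy {
    inner: OnceCell<String>,
}

impl Lazy {
    async fn load(&self, fail: bool) -> Result<&String, &'static str> {
        self.inner
            .get_or_try_init(|| async move {
                if fail {
                    Err("load failed") // not cached: the cell stays empty
                } else {
                    Ok("loaded".to_owned())
                }
            })
            .await
    }
}

#[tokio::main]
async fn main() {
    let lazy = Lazy { inner: OnceCell::new() };
    assert!(lazy.load(true).await.is_err()); // failed init is retryable
    assert_eq!(lazy.load(false).await.unwrap(), "loaded");
    assert_eq!(lazy.load(true).await.unwrap(), "loaded"); // initializer not run again
}
```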
async fn load_inner(&self) -> Result<ImageLayerInner> {
let path = self.path();
// Open the file if it's not open already.
let file = VirtualFile::open(&path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let expected_summary = match &self.path_or_conf {
PathOrConf::Conf(_) => Some(Summary::from(self)),
PathOrConf::Path(_) => None,
};
match &self.path_or_conf {
PathOrConf::Conf(_) => {
let mut expected_summary = Summary::from(self);
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?;
if actual_summary != expected_summary {
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
}
}
PathOrConf::Path(path) => {
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if actual_filename != expected_filename {
println!(
"warning: filename does not match what is expected from in-file summary"
);
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
if actual_filename != expected_filename {
println!("warning: filename does not match what is expected from in-file summary");
println!("actual: {:?}", actual_filename);
println!("expected: {:?}", expected_filename);
}
}
Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
file,
})
Ok(loaded)
}
/// Create an ImageLayer struct representing an existing file on disk
@@ -442,6 +423,65 @@ impl ImageLayer {
}
}
impl ImageLayerInner {
pub(super) fn load(
path: &std::path::Path,
lsn: Lsn,
summary: Option<Summary>,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {
// production code path
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
}
}
Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn,
file,
})
}
pub(super) async fn get_value_reconstruct_data(
&self,
key: Key,
reconstruct_state: &mut ValueReconstructState,
) -> anyhow::Result<ValueReconstructResult> {
let file = &self.file;
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf).await? {
let blob = file
.block_cursor()
.read_blob(offset)
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
}
}
}
/// A builder object for constructing a new image layer.
///
/// Usage:


@@ -16,6 +16,7 @@ use anyhow::{ensure, Result};
use pageserver_api::models::InMemoryLayerInfo;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::OnceLock;
use tracing::*;
use utils::{
bin_ser::BeSer,
@@ -42,14 +43,16 @@ pub struct InMemoryLayer {
tenant_id: TenantId,
timeline_id: TimelineId,
///
/// This layer contains all the changes from 'start_lsn'. The
/// start is inclusive.
///
start_lsn: Lsn,
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
/// Frozen layers have an exclusive end LSN.
/// Writes are only allowed when this is `None`.
end_lsn: OnceLock<Lsn>,
/// The above fields never change, except for `end_lsn`, which is only set once.
/// All other changing parts are in `inner`, and protected by a mutex.
inner: RwLock<InMemoryLayerInner>,
}
@@ -57,21 +60,16 @@ impl std::fmt::Debug for InMemoryLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayer")
.field("start_lsn", &self.start_lsn)
.field("end_lsn", &self.end_lsn)
.field("inner", &self.inner)
.finish()
}
}
pub struct InMemoryLayerInner {
/// Frozen layers have an exclusive end LSN.
/// Writes are only allowed when this is None
end_lsn: Option<Lsn>,
///
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The value is an offset into the
/// ephemeral file where the page version is stored.
///
index: HashMap<Key, VecMap<Lsn, u64>>,
/// The values are stored in a serialized format in this file.
@@ -82,15 +80,7 @@ pub struct InMemoryLayerInner {
impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerInner")
.field("end_lsn", &self.end_lsn)
.finish()
}
}
impl InMemoryLayerInner {
fn assert_writeable(&self) {
assert!(self.end_lsn.is_none());
f.debug_struct("InMemoryLayerInner").finish()
}
}
@@ -101,13 +91,21 @@ impl InMemoryLayer {
pub fn info(&self) -> InMemoryLayerInfo {
let lsn_start = self.start_lsn;
let lsn_end = self.inner.read().unwrap().end_lsn;
match lsn_end {
Some(lsn_end) => InMemoryLayerInfo::Frozen { lsn_start, lsn_end },
None => InMemoryLayerInfo::Open { lsn_start },
if let Some(&lsn_end) = self.end_lsn.get() {
InMemoryLayerInfo::Frozen { lsn_start, lsn_end }
} else {
InMemoryLayerInfo::Open { lsn_start }
}
}
fn assert_writable(&self) {
assert!(self.end_lsn.get().is_none());
}
fn end_lsn_or_max(&self) -> Lsn {
self.end_lsn.get().copied().unwrap_or(Lsn::MAX)
}
}
#[async_trait::async_trait]
@@ -117,14 +115,7 @@ impl Layer for InMemoryLayer {
}
fn get_lsn_range(&self) -> Range<Lsn> {
let inner = self.inner.read().unwrap();
let end_lsn = if let Some(end_lsn) = inner.end_lsn {
end_lsn
} else {
Lsn(u64::MAX)
};
self.start_lsn..end_lsn
self.start_lsn..self.end_lsn_or_max()
}
fn is_incremental(&self) -> bool {
@@ -136,11 +127,7 @@ impl Layer for InMemoryLayer {
async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().unwrap();
let end_str = inner
.end_lsn
.as_ref()
.map(Lsn::to_string)
.unwrap_or_default();
let end_str = self.end_lsn_or_max();
println!(
"----- in-memory layer for tli {} LSNs {}-{} ----",
@@ -236,9 +223,7 @@ impl Layer for InMemoryLayer {
impl std::fmt::Display for InMemoryLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = self.inner.read().unwrap();
let end_lsn = inner.end_lsn.unwrap_or(Lsn(u64::MAX));
let end_lsn = self.end_lsn_or_max();
write!(f, "inmem-{:016X}-{:016X}", self.start_lsn.0, end_lsn.0)
}
}
@@ -270,8 +255,8 @@ impl InMemoryLayer {
timeline_id,
tenant_id,
start_lsn,
end_lsn: OnceLock::new(),
inner: RwLock::new(InMemoryLayerInner {
end_lsn: None,
index: HashMap::new(),
file,
}),
@@ -285,7 +270,7 @@ impl InMemoryLayer {
pub fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
let mut inner = self.inner.write().unwrap();
inner.assert_writeable();
self.assert_writable();
let off = {
SER_BUFFER.with(|x| -> Result<_> {
@@ -317,10 +302,10 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
pub fn freeze(&self, end_lsn: Lsn) {
let mut inner = self.inner.write().unwrap();
let inner = self.inner.write().unwrap();
assert!(self.start_lsn < end_lsn);
inner.end_lsn = Some(end_lsn);
self.end_lsn.set(end_lsn).expect("end_lsn set only once");
for vec_map in inner.index.values() {
for (lsn, _pos) in vec_map.as_slice() {
@@ -344,12 +329,14 @@ impl InMemoryLayer {
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap();
let end_lsn = *self.end_lsn.get().unwrap();
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_id,
Key::MIN,
self.start_lsn..inner.end_lsn.unwrap(),
self.start_lsn..end_lsn,
)?;
let mut buf = Vec::new();

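Moving `end_lsn` into a `OnceLock` is the write-once-read-many pattern: after `freeze` the value is immutable, so readers no longer need the `inner` lock at all. Standalone sketch:

```rust
use std::sync::OnceLock;

struct Layer {
    start: u64,
    /// Set exactly once when the layer is frozen; immutable afterwards,
    /// so readers need no lock.
    end: OnceLock<u64>,
}

impl Layer {
    fn new(start: u64) -> Self {
        Self { start, end: OnceLock::new() }
    }

    fn freeze(&self, end: u64) {
        assert!(self.start < end);
        self.end.set(end).expect("end_lsn set only once");
    }

    fn end_or_max(&self) -> u64 {
        self.end.get().copied().unwrap_or(u64::MAX)
    }
}

fn main() {
    let layer = Layer::new(10);
    assert_eq!(layer.end_or_max(), u64::MAX); // still open
    layer.freeze(20);
    assert_eq!(layer.end_or_max(), 20); // frozen; no lock taken to read
}
```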

@@ -919,7 +919,7 @@ impl Timeline {
pub fn set_state(&self, new_state: TimelineState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
}
(st, TimelineState::Loading) => {
error!("ignoring transition from {st:?} into Loading state");
@@ -1160,7 +1160,7 @@ impl Timeline {
return Err(EvictionError::CannotEvictRemoteLayer);
}
let layer_file_size = local_layer.file_size();
let layer_file_size = local_layer.layer_desc().file_size;
let local_layer_mtime = local_layer
.local_path()
@@ -1590,7 +1590,6 @@ impl Timeline {
///
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut guard = self.layers.write().await;
let mut num_layers = 0;
let timer = self.metrics.load_layer_map_histo.start_timer();
@@ -1608,12 +1607,12 @@ impl Timeline {
let fname = direntry.file_name();
let fname = fname.to_string_lossy();
if let Some(imgfilename) = ImageFileName::parse_str(&fname) {
if let Some(filename) = ImageFileName::parse_str(&fname) {
// create an ImageLayer struct for each image file.
if imgfilename.lsn > disk_consistent_lsn {
if filename.lsn > disk_consistent_lsn {
info!(
"found future image layer {} on timeline {} disk_consistent_lsn is {}",
imgfilename, self.timeline_id, disk_consistent_lsn
filename, self.timeline_id, disk_consistent_lsn
);
rename_to_backup(&direntry_path)?;
@@ -1621,31 +1620,31 @@ impl Timeline {
}
let file_size = direntry_path.metadata()?.len();
let stats =
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident);
let layer = ImageLayer::new(
self.conf,
self.timeline_id,
self.tenant_id,
&imgfilename,
&filename,
file_size,
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident),
stats,
);
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
loaded_layers.push(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
} else if let Some(filename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
// OK for a delta layer to have end LSN 101, but if the end LSN
// is 102, then it might not have been fully flushed to disk
// before crash.
if deltafilename.lsn_range.end > disk_consistent_lsn + 1 {
if filename.lsn_range.end > disk_consistent_lsn + 1 {
info!(
"found future delta layer {} on timeline {} disk_consistent_lsn is {}",
deltafilename, self.timeline_id, disk_consistent_lsn
filename, self.timeline_id, disk_consistent_lsn
);
rename_to_backup(&direntry_path)?;
@@ -1653,20 +1652,20 @@ impl Timeline {
}
let file_size = direntry_path.metadata()?.len();
let stats =
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident);
let layer = DeltaLayer::new(
self.conf,
self.timeline_id,
self.tenant_id,
&deltafilename,
&filename,
file_size,
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident),
stats,
);
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
loaded_layers.push(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
} else if remote_timeline_client::is_temp_download_file(&direntry_path) {
@@ -1691,6 +1690,7 @@ impl Timeline {
}
}
let num_layers = loaded_layers.len();
guard.initialize_local_layers(loaded_layers, Lsn(disk_consistent_lsn.0) + 1);
info!(
@@ -1791,13 +1791,15 @@ impl Timeline {
);
continue;
}
let stats =
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted);
let remote_layer = RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
imgfilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted),
stats,
);
let remote_layer = Arc::new(remote_layer);
added_remote_layers.push(remote_layer);
@@ -1816,12 +1818,15 @@ impl Timeline {
);
continue;
}
let stats =
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted);
let remote_layer = RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
deltafilename,
&remote_layer_metadata,
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted),
stats,
);
let remote_layer = Arc::new(remote_layer);
added_remote_layers.push(remote_layer);
@@ -2269,15 +2274,16 @@ trait TraversalLayerExt {
impl TraversalLayerExt for Arc<dyn PersistentLayer> {
fn traversal_id(&self) -> TraversalId {
let timeline_id = self.layer_desc().timeline_id;
match self.local_path() {
Some(local_path) => {
debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", self.get_timeline_id())),
debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", timeline_id)),
"need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary",
);
format!("{}", local_path.display())
}
None => {
format!("remote {}/{self}", self.get_timeline_id())
format!("remote {}/{self}", timeline_id)
}
}
}
@@ -2813,7 +2819,10 @@ impl Timeline {
// We will remove frozen layer and add delta layer in one atomic operation later.
let layer = self.create_delta_layer(&frozen_layer).await?;
(
HashMap::from([(layer.filename(), LayerFileMetadata::new(layer.file_size()))]),
HashMap::from([(
layer.filename(),
LayerFileMetadata::new(layer.layer_desc().file_size),
)]),
Some(layer),
)
};
@@ -2833,7 +2842,7 @@ impl Timeline {
);
// update metrics
let sz = l.file_size();
let sz = l.layer_desc().file_size;
self.metrics.resident_physical_size_gauge.add(sz);
self.metrics.num_persistent_files_created.inc_by(1);
self.metrics.persistent_bytes_written.inc_by(sz);
@@ -3452,14 +3461,14 @@ impl Timeline {
// "gaps" in the sequence of level 0 files should only happen in case
// of a crash, partial download from cloud storage, or something like
// that, so it's not a big deal in practice.
level0_deltas.sort_by_key(|l| l.get_lsn_range().start);
level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
let mut level0_deltas_iter = level0_deltas.iter();
let first_level0_delta = level0_deltas_iter.next().unwrap();
let mut prev_lsn_end = first_level0_delta.get_lsn_range().end;
let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
for l in level0_deltas_iter {
let lsn_range = l.get_lsn_range();
let lsn_range = &l.layer_desc().lsn_range;
if lsn_range.start != prev_lsn_end {
break;
@@ -3468,8 +3477,13 @@ impl Timeline {
prev_lsn_end = lsn_range.end;
}
let lsn_range = Range {
start: deltas_to_compact.first().unwrap().get_lsn_range().start,
end: deltas_to_compact.last().unwrap().get_lsn_range().end,
start: deltas_to_compact
.first()
.unwrap()
.layer_desc()
.lsn_range
.start,
end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
};
let remotes = deltas_to_compact
@@ -3521,33 +3535,22 @@ impl Timeline {
let mut prev: Option<Key> = None;
let mut all_value_refs = Vec::new();
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
// TODO: replace this with an await once we fully go async
all_value_refs.extend(
Handle::current().block_on(
l.clone()
.downcast_delta_layer()
.expect("delta layer")
.load_val_refs(ctx),
)?,
);
let delta = l.clone().downcast_delta_layer().expect("delta layer");
Handle::current().block_on(async {
all_value_refs.extend(delta.load_val_refs(ctx).await?);
all_keys.extend(delta.load_keys(ctx).await?);
anyhow::Ok(())
})?;
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast when the slice is made up of sorted sub-ranges.
all_value_refs.sort_by_key(|(key, lsn, _value_ref)| (*key, *lsn));
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
// TODO: replace this with an await once we fully go async
all_keys.extend(
Handle::current().block_on(
l.clone()
.downcast_delta_layer()
.expect("delta layer")
.load_keys(ctx),
)?,
);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast when the slice is made up of sorted sub-ranges.
all_keys.sort_by_key(|(key, lsn, _size)| (*key, *lsn));
@@ -4656,7 +4659,7 @@ impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
impl LocalLayerInfoForDiskUsageEviction {
pub fn file_size(&self) -> u64 {
self.layer.file_size()
self.layer.layer_desc().file_size
}
}


@@ -219,27 +219,13 @@ async fn delete_local_layer_files(
}
};
let r = if metadata.is_dir() {
// There shouldn't be any directories inside the timeline dir in the current layout.
if metadata.is_dir() {
warn!(path=%entry.path().display(), "unexpected directory under timeline dir");
tokio::fs::remove_dir(entry.path()).await
} else {
tokio::fs::remove_file(entry.path()).await
};
if let Err(e) = r {
if e.kind() == std::io::ErrorKind::NotFound {
warn!(
timeline_dir=?local_timeline_directory,
path=?entry.path().display(),
"got not found err while removing timeline dir, proceeding anyway"
);
continue;
}
anyhow::bail!(anyhow::anyhow!(
"Failed to remove: {}. Error: {e}",
entry.path().display()
));
}
.with_context(|| format!("Failed to remove: {}", entry.path().display()))?;
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
@@ -359,10 +345,11 @@ impl DeleteTimelineFlow {
// NB: If this fails half-way through, and is retried, the retry will go through
// all the same steps again. Make sure the code here is idempotent, and don't
// error out if some of the shutdown tasks have already been completed!
#[instrument(skip_all, fields(tenant_id=%tenant.tenant_id, %timeline_id))]
#[instrument(skip(tenant), fields(tenant_id=%tenant.tenant_id))]
pub async fn run(
tenant: &Arc<Tenant>,
timeline_id: TimelineId,
inplace: bool,
) -> Result<(), DeleteTimelineError> {
let (timeline, mut guard) = Self::prepare(tenant, timeline_id)?;
@@ -380,7 +367,11 @@ impl DeleteTimelineFlow {
))?
});
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
if inplace {
Self::background(guard, tenant.conf, tenant, &timeline).await?
} else {
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
}
Ok(())
}
@@ -398,6 +389,8 @@ impl DeleteTimelineFlow {
}
/// Shortcut to create Timeline in stopping state and spawn deletion task.
/// See corresponding parts of [`crate::tenant::delete::DeleteTenantFlow`]
#[instrument(skip_all, fields(%timeline_id))]
pub async fn resume_deletion(
tenant: Arc<Tenant>,
timeline_id: TimelineId,
@@ -444,11 +437,15 @@ impl DeleteTimelineFlow {
Ok(())
}
#[instrument(skip_all, fields(%timeline_id))]
pub async fn cleanup_remaining_timeline_fs_traces(
tenant: &Tenant,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_id, timeline_id).await
let r =
cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_id, timeline_id).await;
info!("Done");
r
}
fn prepare(
@@ -494,11 +491,17 @@ impl DeleteTimelineFlow {
// At the end of the operation we're holding the guard and need to lock timelines map
// to remove the timeline from it.
// As always, if two locks are taken in different orders, this can result in a deadlock.
let delete_lock_guard = DeletionGuard(
Arc::clone(&timeline.delete_progress)
.try_lock_owned()
.map_err(|_| DeleteTimelineError::AlreadyInProgress)?,
);
let delete_progress = Arc::clone(&timeline.delete_progress);
let delete_lock_guard = match delete_progress.try_lock_owned() {
Ok(guard) => DeletionGuard(guard),
Err(_) => {
// Unfortunately, if the lock attempt fails, the Arc is consumed.
return Err(DeleteTimelineError::AlreadyInProgress(Arc::clone(
&timeline.delete_progress,
)));
}
};
timeline.set_state(TimelineState::Stopping);
@@ -553,10 +556,14 @@ impl DeleteTimelineFlow {
remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?;
*guard.0 = Self::Finished;
*guard = Self::Finished;
Ok(())
}
pub(crate) fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
}
struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);

View File

@@ -120,10 +120,9 @@ impl LayerManager {
ensure!(
lsn > last_record_lsn,
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}",
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
lsn,
last_record_lsn,
std::backtrace::Backtrace::force_capture(),
);
// Do we have a layer open for writing already?
@@ -278,7 +277,7 @@ impl LayerManager {
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager,
) {
updates.remove_historic(layer.layer_desc().clone());
updates.remove_historic(layer.layer_desc());
mapping.remove(layer);
}
@@ -292,10 +291,10 @@ impl LayerManager {
metrics: &TimelineMetrics,
mapping: &mut LayerFileManager,
) -> anyhow::Result<()> {
let desc = layer.layer_desc();
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size();
metrics.resident_physical_size_gauge.sub(layer_file_size);
metrics.resident_physical_size_gauge.sub(desc.file_size);
}
// TODO Removing from the bottom of the layer map is expensive.
@@ -303,7 +302,7 @@ impl LayerManager {
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(layer.layer_desc().clone());
updates.remove_historic(desc);
mapping.remove(layer);
Ok(())

View File

@@ -53,6 +53,9 @@ pub struct VirtualFile {
pub path: PathBuf,
open_options: OpenOptions,
// These are strings because we only use them for metrics, and those expect strings.
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
// strings.
tenant_id: String,
timeline_id: String,
}

View File

@@ -5,7 +5,7 @@ use crate::{
auth::{self, AuthFlow, ClientCredentials},
compute,
console::{self, AuthInfo, CachedNodeInfo, ConsoleReqExtra},
proxy::handle_try_wake,
proxy::{handle_try_wake, retry_after},
sasl, scram,
stream::PqStream,
};
@@ -62,10 +62,13 @@ pub(super) async fn authenticate(
}
Ok(ControlFlow::Continue(e)) => {
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
num_retries += 1;
}
Ok(ControlFlow::Break(n)) => break n,
}
let wait_duration = retry_after(num_retries);
num_retries += 1;
tokio::time::sleep(wait_duration).await;
};
if let Some(keys) = scram_keys {
use tokio_postgres::config::AuthKeys;
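The hunk above moves the backoff sleep out of the match so every retriable outcome waits before the next attempt. A hedged Python sketch of that loop shape (the actual code is Rust; `try_wake` and `retry_after` here are stand-ins):

```python
import time

def wake_with_retries(try_wake, retry_after, max_attempts=5):
    """try_wake() returns a node on success or None on a retriable error."""
    num_retries = 0
    while num_retries < max_attempts:
        node = try_wake()
        if node is not None:  # Break(n) in the Rust code
            return node
        wait = retry_after(num_retries)  # compute wait, then bump the counter
        num_retries += 1
        time.sleep(wait)
    raise TimeoutError("couldn't wake compute node")
```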

View File

@@ -8,6 +8,7 @@ use super::{
use crate::{auth::ClientCredentials, compute, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
@@ -47,7 +48,9 @@ impl Api {
.build()?;
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let response = self.endpoint.execute(request).await?;
info!(duration = ?start.elapsed(), "received http response");
let body = match parse_body::<GetRoleSecret>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
@@ -88,7 +91,9 @@ impl Api {
.build()?;
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let response = self.endpoint.execute(request).await?;
info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<WakeCompute>(response).await?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.

View File

@@ -7,11 +7,14 @@ pub mod server;
pub mod sql_over_http;
pub mod websocket;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use futures::FutureExt;
pub use reqwest::{Request, Response, StatusCode};
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::time::Instant;
use tracing::trace;
use crate::url::ApiUrl;
use reqwest_middleware::RequestBuilder;
@@ -20,13 +23,21 @@ use reqwest_middleware::RequestBuilder;
/// because it takes care of observability (OpenTelemetry).
/// We deliberately don't want to replace this with a public static.
pub fn new_client() -> ClientWithMiddleware {
reqwest_middleware::ClientBuilder::new(reqwest::Client::new())
let client = reqwest::ClientBuilder::new()
.dns_resolver(Arc::new(GaiResolver::default()))
.connection_verbose(true)
.build()
.expect("Failed to create http client");
reqwest_middleware::ClientBuilder::new(client)
.with(reqwest_tracing::TracingMiddleware::default())
.build()
}
pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware {
let timeout_client = reqwest::ClientBuilder::new()
.dns_resolver(Arc::new(GaiResolver::default()))
.connection_verbose(true)
.timeout(default_timout)
.build()
.expect("Failed to create http client with timeout");
@@ -39,6 +50,10 @@ pub fn new_client_with_timeout(default_timout: Duration) -> ClientWithMiddleware
// As per docs, "This middleware always errors when given requests with streaming bodies".
// That's all right because we only use this client to send `serde_json::RawValue`, which
// is not a stream.
//
// ex-maintainer note:
// this limitation can be fixed if streaming is necessary.
// retries will still not be performed, but it won't error immediately
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build()
}
@@ -81,6 +96,37 @@ impl Endpoint {
}
}
/// https://docs.rs/reqwest/0.11.18/src/reqwest/dns/gai.rs.html
use hyper::{
client::connect::dns::{GaiResolver as HyperGaiResolver, Name},
service::Service,
};
use reqwest::dns::{Addrs, Resolve, Resolving};
#[derive(Debug)]
pub struct GaiResolver(HyperGaiResolver);
impl Default for GaiResolver {
fn default() -> Self {
Self(HyperGaiResolver::new())
}
}
impl Resolve for GaiResolver {
fn resolve(&self, name: Name) -> Resolving {
let this = &mut self.0.clone();
let start = Instant::now();
Box::pin(
Service::<Name>::call(this, name.clone()).map(move |result| {
let resolve_duration = start.elapsed();
trace!(duration = ?resolve_duration, addr = %name, "resolve host complete");
result
.map(|addrs| -> Addrs { Box::new(addrs) })
.map_err(|err| -> Box<dyn std::error::Error + Send + Sync> { Box::new(err) })
}),
)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -34,7 +34,7 @@ enum Payload {
Batch(Vec<QueryData>),
}
pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB
pub const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MB
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
@@ -214,7 +214,7 @@ pub async fn handle(
if request_content_length > MAX_REQUEST_SIZE {
return Err(anyhow::anyhow!(
"request is too large (max {MAX_REQUEST_SIZE} bytes)"
"request is too large (max is {MAX_REQUEST_SIZE} bytes)"
));
}
@@ -292,13 +292,15 @@ async fn query_to_json<T: GenericClient>(
// big.
pin_mut!(row_stream);
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
let mut curret_size = 0;
let mut current_size = 0;
while let Some(row) = row_stream.next().await {
let row = row?;
curret_size += row.body_len();
current_size += row.body_len();
rows.push(row);
if curret_size > MAX_RESPONSE_SIZE {
return Err(anyhow::anyhow!("response too large"));
if current_size > MAX_RESPONSE_SIZE {
return Err(anyhow::anyhow!(
"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
));
}
}
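The same accumulate-and-cap pattern, sketched in Python for clarity (`body_len` is a stand-in for `tokio_postgres::Row::body_len`):

```python
MAX_RESPONSE_SIZE = 10 * 1024 * 1024  # 10 MB, matching the hunk above

def collect_rows(row_stream, body_len):
    """Accumulate rows while tracking total body size; bail once over the cap."""
    rows, current_size = [], 0
    for row in row_stream:
        current_size += body_len(row)
        rows.append(row)
        if current_size > MAX_RESPONSE_SIZE:
            raise RuntimeError(
                f"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
            )
    return rows
```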

View File

@@ -187,12 +187,16 @@ async fn ws_handler(
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
.map_err(|e| ApiError::BadRequest(e.into()))?;
tokio::spawn(async move {
if let Err(e) = serve_websocket(websocket, config, &cancel_map, session_id, host).await
{
error!(session_id = ?session_id, "error in websocket connection: {e:?}");
tokio::spawn(
async move {
if let Err(e) =
serve_websocket(websocket, config, &cancel_map, session_id, host).await
{
error!(session_id = ?session_id, "error in websocket connection: {e:#}");
}
}
});
.in_current_span(),
);
// Return the response so the spawned future can continue.
Ok(response)
@@ -217,6 +221,10 @@ async fn ws_handler(
},
None => Value::Null,
};
error!(
?code,
"sql-over-http per-client task finished with an error: {e:#}"
);
(
json!({ "message": message, "code": code }),
HashMap::default(),

View File

@@ -545,7 +545,7 @@ impl ShouldRetry for compute::ConnectionError {
}
}
fn retry_after(num_retries: u32) -> time::Duration {
pub fn retry_after(num_retries: u32) -> time::Duration {
// 1.5 seems to be an ok growth factor heuristic
BASE_RETRY_WAIT_DURATION.mul_f64(1.5_f64.powi(num_retries as i32))
}
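For intuition, the first few waits `retry_after` produces, assuming a hypothetical 100 ms base (the real `BASE_RETRY_WAIT_DURATION` is defined outside this hunk):

```python
BASE_RETRY_WAIT_MS = 100  # assumption; the real constant is not in this hunk

def retry_after_ms(num_retries: int) -> float:
    return BASE_RETRY_WAIT_MS * 1.5**num_retries

print([round(retry_after_ms(n)) for n in range(5)])
# [100, 150, 225, 338, 506] -- waits grow by the 1.5x heuristic
```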

View File

@@ -223,6 +223,7 @@ module.exports = async ({ github, context, fetch, report }) => {
} else {
commentBody += `#### No tests were run or test report is not available\n`
}
commentBody += autoupdateNotice
let createCommentFn, listCommentsFn, updateCommentFn, issueNumberOrSha
if (isPullRequest) {

View File

@@ -0,0 +1,198 @@
#! /usr/bin/env python3
import argparse
import dataclasses
import json
import logging
import os
import re
import sys
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Tuple
import backoff
import psycopg2
from psycopg2.extras import execute_values
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS results (
id BIGSERIAL PRIMARY KEY,
parent_suite TEXT NOT NULL,
suite TEXT NOT NULL,
name TEXT NOT NULL,
status TEXT NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
stopped_at TIMESTAMPTZ NOT NULL,
duration INT NOT NULL,
flaky BOOLEAN NOT NULL,
build_type TEXT NOT NULL,
pg_version INT NOT NULL,
run_id BIGINT NOT NULL,
run_attempt INT NOT NULL,
reference TEXT NOT NULL,
revision CHAR(40) NOT NULL,
raw JSONB COMPRESSION lz4 NOT NULL,
UNIQUE (parent_suite, suite, name, build_type, pg_version, started_at, stopped_at, run_id)
);
"""
@dataclass
class Row:
parent_suite: str
suite: str
name: str
status: str
started_at: datetime
stopped_at: datetime
duration: int
flaky: bool
build_type: str
pg_version: int
run_id: int
run_attempt: int
reference: str
revision: str
raw: str
TEST_NAME_RE = re.compile(r"[\[-](?P<build_type>debug|release)-pg(?P<pg_version>\d+)[-\]]")
def err(msg):
print(f"error: {msg}")
sys.exit(1)
@contextmanager
def get_connection_cursor(connstr: str):
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
def connect(connstr):
conn = psycopg2.connect(connstr, connect_timeout=30)
conn.autocommit = True
return conn
conn = connect(connstr)
try:
with conn.cursor() as cur:
yield cur
finally:
if conn is not None:
conn.close()
def create_table(cur):
cur.execute(CREATE_TABLE)
def parse_test_name(test_name: str) -> Tuple[str, int, str]:
build_type, pg_version = None, None
if match := TEST_NAME_RE.search(test_name):
found = match.groupdict()
build_type = found["build_type"]
pg_version = int(found["pg_version"])
else:
# It's ok: we embed BUILD_TYPE and Postgres version into the test name only for the regress suite, not for other suites (like performance)
build_type = "release"
pg_version = 14
unparametrized_name = re.sub(rf"{build_type}-pg{pg_version}-?", "", test_name).replace("[]", "")
return build_type, pg_version, unparametrized_name
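Two examples of what `parse_test_name` extracts, using hypothetical test names:

```python
# The regex pulls build type and Postgres version out of the parametrized
# name; the leftover "[]" is stripped to form the unparametrized name.
assert parse_test_name("test_foo[debug-pg15]") == ("debug", 15, "test_foo")
# Names without the marker fall back to release/pg14:
assert parse_test_name("test_perf_report") == ("release", 14, "test_perf_report")
```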
def ingest_test_result(
cur,
reference: str,
revision: str,
run_id: int,
run_attempt: int,
test_cases_dir: Path,
):
rows = []
for f in test_cases_dir.glob("*.json"):
test = json.loads(f.read_text())
# Drop unneeded fields from the raw data
raw = test.copy()
raw.pop("parameterValues")
raw.pop("labels")
raw.pop("extra")
build_type, pg_version, unparametrized_name = parse_test_name(test["name"])
labels = {label["name"]: label["value"] for label in test["labels"]}
row = Row(
parent_suite=labels["parentSuite"],
suite=labels["suite"],
name=unparametrized_name,
status=test["status"],
started_at=datetime.fromtimestamp(test["time"]["start"] / 1000, tz=timezone.utc),
stopped_at=datetime.fromtimestamp(test["time"]["stop"] / 1000, tz=timezone.utc),
duration=test["time"]["duration"],
flaky=test["flaky"] or test["retriesStatusChange"],
build_type=build_type,
pg_version=pg_version,
run_id=run_id,
run_attempt=run_attempt,
reference=reference,
revision=revision,
raw=json.dumps(raw),
)
rows.append(dataclasses.astuple(row))
columns = ",".join(f.name for f in dataclasses.fields(Row))
query = f"INSERT INTO results ({columns}) VALUES %s ON CONFLICT DO NOTHING"
execute_values(cur, query, rows)
def main():
parser = argparse.ArgumentParser(
description="Regress test result uploader. \
Database connection string should be provided via DATABASE_URL environment variable",
)
parser.add_argument("--initdb", action="store_true", help="Initialuze database")
parser.add_argument(
"--reference", type=str, required=True, help="git reference, for example refs/heads/main"
)
parser.add_argument("--revision", type=str, required=True, help="git revision")
parser.add_argument("--run-id", type=int, required=True, help="GitHub Workflow run id")
parser.add_argument(
"--run-attempt", type=int, required=True, help="GitHub Workflow run attempt"
)
parser.add_argument(
"--test-cases-dir",
type=Path,
required=True,
help="Path to a dir with extended test cases data",
)
connstr = os.getenv("DATABASE_URL", "")
if not connstr:
err("DATABASE_URL environment variable is not set")
args = parser.parse_args()
with get_connection_cursor(connstr) as cur:
if args.initdb:
create_table(cur)
if not args.test_cases_dir.exists():
err(f"test-cases dir {args.test_cases_dir} does not exist")
if not args.test_cases_dir.is_dir():
err(f"test-cases dir {args.test_cases_dir} it not a directory")
ingest_test_result(
cur,
reference=args.reference,
revision=args.revision,
run_id=args.run_id,
run_attempt=args.run_attempt,
test_cases_dir=args.test_cases_dir,
)
if __name__ == "__main__":
logging.getLogger("backoff").addHandler(logging.StreamHandler())
main()

View File

@@ -32,6 +32,7 @@ import requests
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from mypy_boto3_s3 import S3Client
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
@@ -432,7 +433,7 @@ class NeonEnvBuilder:
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.ext_remote_storage: Optional[S3Storage] = None
self.remote_storage_client: Optional[Any] = None
self.remote_storage_client: Optional[S3Client] = None
self.remote_storage_users = remote_storage_users
self.broker = broker
self.run_id = run_id
@@ -875,7 +876,14 @@ class NeonEnv:
def timeline_dir(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
"""Get a timeline directory's path based on the repo directory of the test environment"""
return self.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id)
def tenant_dir(
self,
tenant_id: TenantId,
) -> Path:
"""Get a tenant directory's path based on the repo directory of the test environment"""
return self.repo_dir / "tenants" / str(tenant_id)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
@@ -1518,6 +1526,8 @@ class NeonPageserver(PgProtocol):
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
".*took more than expected to complete.*",
# these can happen during shutdown, but they should not be a reason to fail a test
".*completed, took longer than expected.*",
]
def start(
@@ -2817,8 +2827,15 @@ def check_restored_datadir_content(
endpoint: Endpoint,
):
# Get the timeline ID. We need it for the 'basebackup' command
timeline = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
# many tests already checkpoint, but do it just in case
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CHECKPOINT")
# wait for pageserver to catch up
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
# stop postgres to ensure that files won't change
endpoint.stop()
@@ -2833,7 +2850,7 @@ def check_restored_datadir_content(
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.pageserver.service_port.pg} \
-c 'basebackup {endpoint.tenant_id} {timeline}' \
-c 'basebackup {endpoint.tenant_id} {timeline_id}' \
| tar -x -C {restored_dir_path}
"""

View File

@@ -210,6 +210,10 @@ class PageserverHttpClient(requests.Session):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
def tenant_delete(self, tenant_id: TenantId):
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)
def tenant_load(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load")
self.verbose_error(res)

View File

@@ -1,9 +1,11 @@
import time
from typing import Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.remote_storage import RemoteStorageKind, S3Storage
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
def assert_tenant_state(
@@ -17,15 +19,6 @@ def assert_tenant_state(
assert tenant_status["state"]["slug"] == expected_state, message or tenant_status
def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
tenants = pageserver_http.tenant_list()
matching = [t for t in tenants if TenantId(t["id"]) == tenant_id]
assert len(matching) < 2
if len(matching) == 0:
return None
return matching[0]
def remote_consistent_lsn(
pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
@@ -199,20 +192,19 @@ def wait_timeline_detail_404(
timeline_id: TimelineId,
iterations: int,
):
last_exc = None
for _ in range(iterations):
time.sleep(0.250)
def timeline_is_missing():
data = {}
try:
data = pageserver_http.timeline_detail(tenant_id, timeline_id)
log.info(f"detail {data}")
log.info(f"timeline detail {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code == 404:
return
last_exc = e
raise RuntimeError(f"Timeline exists state {data.get('state')}")
raise last_exc or RuntimeError(f"Timeline wasn't deleted in time, state: {data['state']}")
wait_until(iterations, interval=0.250, func=timeline_is_missing)
def timeline_delete_wait_completed(
@@ -224,3 +216,72 @@ def timeline_delete_wait_completed(
):
pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations)
if TYPE_CHECKING:
# TODO: avoid this by combining remote-storage-related stuff into a single type
# and passing that type in instead of the whole builder
from fixtures.neon_fixtures import NeonEnvBuilder
def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
# For local_fs we need to properly handle empty directories, which we currently don't, so for simplicity stick to the s3 api.
assert neon_env_builder.remote_storage_kind in (
RemoteStorageKind.MOCK_S3,
RemoteStorageKind.REAL_S3,
)
# For mypy
assert isinstance(neon_env_builder.remote_storage, S3Storage)
assert neon_env_builder.remote_storage_client is not None
# Note that this doesn't use pagination, so the list is not guaranteed to be exhaustive.
response = neon_env_builder.remote_storage_client.list_objects_v2(
Bucket=neon_env_builder.remote_storage.bucket_name,
Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
)
objects = response.get("Contents")
assert (
response["KeyCount"] == 0
), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
def wait_tenant_status_404(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
iterations: int,
interval: float = 0.250,
):
def tenant_is_missing():
data = {}
try:
data = pageserver_http.tenant_status(tenant_id)
log.info(f"tenant status {data}")
except PageserverApiException as e:
log.debug(e)
if e.status_code == 404:
return
raise RuntimeError(f"Timeline exists state {data.get('state')}")
wait_until(iterations, interval=interval, func=tenant_is_missing)
def tenant_delete_wait_completed(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
iterations: int,
):
pageserver_http.tenant_delete(tenant_id=tenant_id)
wait_tenant_status_404(pageserver_http, tenant_id=tenant_id, iterations=iterations)
MANY_SMALL_LAYERS_TENANT_CONFIG = {
"gc_period": "0s",
"compaction_period": "0s",
"checkpoint_distance": f"{1024**2}",
"image_creation_threshold": "100",
}
def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int:
return 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 6

View File

@@ -6,13 +6,16 @@ import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, TypeVar
from urllib.parse import urlencode
import allure
from psycopg2.extensions import cursor
from fixtures.log_helper import log
if TYPE_CHECKING:
from fixtures.neon_fixtures import PgBin
from fixtures.types import TimelineId
Fn = TypeVar("Fn", bound=Callable[..., Any])
@@ -300,17 +303,13 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn):
raise Exception("timed out while waiting for %s" % func) from last_exception
def wait_while(number_of_iterations: int, interval: float, func):
def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
"""
Wait until 'func' returns false, or throws an exception.
Fast way to populate data.
For more layers consider combining with these tenant settings:
{
"checkpoint_distance": 1024 ** 2,
"image_creation_threshold": 100,
}
"""
for i in range(number_of_iterations):
try:
if not func():
return
log.info("waiting for %s iteration %s failed", func, i + 1)
time.sleep(interval)
continue
except Exception:
return
raise Exception("timed out while waiting for %s" % func)
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr])
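A usage sketch pairing the helper with the tenant settings its docstring mentions, mirroring the tenant-delete tests below (`env` and `pg_bin` come from the usual fixtures):

```python
tenant_conf = {
    "checkpoint_distance": f"{1024**2}",
    "image_creation_threshold": "100",
}
tenant_id, _ = env.neon_cli.create_tenant(conf=tenant_conf)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
    run_pg_bench_small(pg_bin, endpoint.connstr())
```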

View File

@@ -1,24 +1,12 @@
{
"public_extensions": [
"anon",
"pg_buffercache"
],
"library_index": {
"anon": "anon",
"pg_buffercache": "pg_buffercache"
},
"public_extensions": [ "weighted_mean" ],
"library_index": { "weighted_mean": "weighted_mean" },
"extension_data": {
"pg_buffercache": {
"weighted_mean": {
"control_data": {
"pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
"weighted_mean.control": "# weighted_mean extension\ncomment = 'A weighted mean aggregate function extension'\ndefault_version = '1.0.1'\nmodule_pathname = '$libdir/weighted_mean'\nrelocatable = true\ntrusted = true\n"
},
"archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
},
"anon": {
"control_data": {
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
},
"archive_path": "5670669815/v14/extensions/anon.tar.zst"
"archive_path": "5670669815/v14/extensions/weighted_mean.tar.zst"
}
}
}
}

View File

@@ -1,17 +1,16 @@
{
"public_extensions": [
"anon"
"weighted_mean"
],
"library_index": {
"anon": "anon"
"weighted_mean": "weighted_mean"
},
"extension_data": {
"anon": {
"weighted_mean": {
"control_data": {
"anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
"weighted_mean.control": "# weighted_mean extension\ncomment = 'A weighted mean aggregate function extension'\ndefault_version = '1.0.1'\nmodule_pathname = '$libdir/weighted_mean'\nrelocatable = true\ntrusted = true\n"
},
"archive_path": "5670669815/v15/extensions/anon.tar.zst"
"archive_path": "5670669815/v15/extensions/weighted_mean.tar.zst"
}
}
}

View File

@@ -0,0 +1,51 @@
This is some data for testing remote extensions.
It was generated by the Docker code below,
which basically just runs `make install` for the extension.
```
#########################################################################################
#
# Layer "weighted-mean-pg-build"
# compile weighted-mean extension
# Note: Please do not include this in the compute image,
# it is used for testing remote extension functionality
# so the extension *must* not be part of the compute image already
#
#########################################################################################
FROM build-deps AS weighted-mean-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/Kozea/weighted_mean/archive/master.tar.gz -O weighted_mean.tar.gz &&\
mkdir weighted_mean-src && cd weighted_mean-src && tar xvzf ../weighted_mean.tar.gz --strip-components=1 -C . && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/weighted_mean.control && \
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\
mkdir -p /extensions/weighted_mean && cp /usr/local/pgsql/share/extension/weighted_mean.control /extensions/weighted_mean && \
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/weighted_mean.tar.zst -T -
```
weighted_mean
Copyright (c) Ronan Dunklau 2012-2015
Permission to use, copy, modify, and distribute this software and its
documentation for any purpose, without fee, and without a written agreement
is hereby granted, provided that the above copyright notice and this
paragraph and the following two paragraphs appear in all copies.
IN NO EVENT SHALL RONAN DUNKLAU BE LIABLE TO ANY PARTY FOR
DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
DOCUMENTATION, EVEN IF RONAN DUNKLAU HAS BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
RONAN DUNKLAU SPECIFICALLY DISCLAIMS ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
ON AN "AS IS" BASIS, AND RONAN DUNKLAU HAS NO OBLIGATIONS TO
PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.

View File

@@ -1,64 +1,23 @@
import os
import shutil
import threading
import uuid
from contextlib import closing
from pathlib import Path
from typing import Optional
import pytest
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import RemoteStorageKind, available_s3_storages
# Cleaning up downloaded files is important for local tests
# or else one test could reuse the files from another test or another test run
def cleanup(pg_version):
PGDIR = Path(f"pg_install/v{pg_version}")
LIB_DIR = PGDIR / Path("lib/postgresql")
cleanup_lib_globs = ["anon*", "postgis*", "pg_buffercache*"]
cleanup_lib_glob_paths = [LIB_DIR.glob(x) for x in cleanup_lib_globs]
SHARE_DIR = PGDIR / Path("share/postgresql/extension")
cleanup_ext_globs = [
"anon*",
"address_standardizer*",
"postgis*",
"pageinspect*",
"pg_buffercache*",
"pgrouting*",
]
cleanup_ext_glob_paths = [SHARE_DIR.glob(x) for x in cleanup_ext_globs]
all_glob_paths = cleanup_lib_glob_paths + cleanup_ext_glob_paths
all_cleanup_files = []
for file_glob in all_glob_paths:
for file in file_glob:
all_cleanup_files.append(file)
for file in all_cleanup_files:
try:
os.remove(file)
log.info(f"removed file {file}")
except Exception as err:
log.info(
f"skipping remove of file {file} because it doesn't exist.\
this may be expected or unexpected depending on the test {err}"
)
cleanup_folders = [SHARE_DIR / Path("anon"), PGDIR / Path("download_extensions")]
for folder in cleanup_folders:
try:
shutil.rmtree(folder)
log.info(f"removed folder {folder}")
except Exception as err:
log.info(
f"skipping remove of folder {folder} because it doesn't exist.\
this may be expected or unexpected depending on the test {err}"
)
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
MockS3Server,
RemoteStorageKind,
available_s3_storages,
)
def upload_files(env):
@@ -80,126 +39,134 @@ def upload_files(env):
os.chdir("../../../..")
# creates the weighted_mean extension and runs test queries to verify it works
def weighted_mean_test(endpoint, message: Optional[str] = None, create: bool = True):
if message is not None:
log.info(message)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Check that appropriate control files were downloaded
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "weighted_mean" in all_extensions
# test is from: https://github.com/Kozea/weighted_mean/tree/master/test
if create:
cur.execute("CREATE extension weighted_mean")
cur.execute(
"create temp table test as (\
select a::numeric, b::numeric\
from\
generate_series(1, 100) as a(a),\
generate_series(1, 100) as b(b));"
)
cur.execute("select weighted_mean(a,b) from test;")
x = cur.fetchone()
log.info(x)
assert str(x) == "(Decimal('50.5000000000000000'),)"
cur.execute("update test set b = 0;")
cur.execute("select weighted_mean(a,b) from test;")
x = cur.fetchone()
log.info(x)
assert str(x) == "(Decimal('0'),)"
# Test downloading remote extension.
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
def test_remote_extensions(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_version: PgVersion,
test_output_dir: Path,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
neon_binpath: Path,
pg_distrib_dir: Path,
default_broker: NeonBroker,
run_id: uuid.UUID,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_remote_extensions",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
alt_pgdir = test_output_dir / "pg_install"
log.info(f"Copying {pg_distrib_dir} (which is big) to {alt_pgdir}")
shutil.copytree(pg_distrib_dir / pg_version.v_prefixed, alt_pgdir / pg_version.v_prefixed)
assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy
with NeonEnvBuilder(
repo_dir=test_output_dir / "repo",
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=alt_pgdir,
pg_version=pg_version,
broker=default_broker,
run_id=run_id,
preserve_database_files=False,
) as neon_env_builder:
log.info(port_distributor)
log.info(mock_s3_server)
log.info(neon_binpath)
log.info(pg_distrib_dir)
log.info(pg_version)
log.info(default_broker)
log.info(run_id)
# For MOCK_S3 we upload test files.
# For REAL_S3 we use the files already in the bucket
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
test_name="test_remote_extensions",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_extensions", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
upload_files(env)
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD
endpoint = env.endpoints.create_start(
"test_remote_extensions",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
try:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# Check that appropriate control files were downloaded
cur.execute("SELECT * FROM pg_available_extensions")
all_extensions = [x[0] for x in cur.fetchall()]
log.info(all_extensions)
assert "anon" in all_extensions
# Start a compute node and check that it can download the extensions
# and use them to CREATE EXTENSION and LOAD
endpoint = env.endpoints.create_start(
"test_remote_extensions",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
weighted_mean_test(endpoint)
# postgis is on real s3 but not mock s3.
# it's kind of a big file, would rather not upload to github
if remote_storage_kind == RemoteStorageKind.REAL_S3:
assert "postgis" in all_extensions
# this may fail locally if dependency is missing
# we don't really care about the error,
# we just want to make sure it downloaded
try:
cur.execute("CREATE EXTENSION postgis")
except Exception as err:
log.info(f"(expected) error creating postgis extension: {err}")
# we do not check the error, so this is basically a NO-OP
# however checking the log you can make sure that it worked
# and also get valuable information about how long loading the extension took
# # Test that extension is downloaded after endpoint restart,
# # when the library is used in the query.
# #
# # Run the test with multiple simultaneous connections to an endpoint.
# # to ensure that the extension is downloaded only once.
# this is expected to fail on my computer because I don't have the pgcrypto extension
try:
cur.execute("CREATE EXTENSION anon")
except Exception as err:
log.info("error creating anon extension")
assert "pgcrypto" in str(err), "unexpected error creating anon extension"
finally:
cleanup(pg_version)
# import subprocess
# import threading
# # shutdown compute node
# endpoint.stop()
# # remove extension files locally
# SHAREDIR = subprocess.check_output(
# [alt_pgdir / f"{pg_version.v_prefixed}/bin/pg_config", "--sharedir"]
# )
# SHAREDIRPATH = Path(SHAREDIR.decode("utf-8").strip())
# log.info("SHAREDIRPATH: %s", SHAREDIRPATH)
# (SHAREDIRPATH / "extension/weighted_mean--1.0.1.sql").unlink()
# (SHAREDIRPATH / "extension/weighted_mean.control").unlink()
# # spin up compute node again (there are no extension files available, because compute is stateless)
# endpoint = env.endpoints.create_start(
# "test_remote_extensions",
# tenant_id=tenant_id,
# remote_ext_config=env.ext_remote_storage.to_string(),
# # config_lines=["log_min_messages=debug3"],
# )
# Test downloading remote library.
@pytest.mark.parametrize("remote_storage_kind", available_s3_storages())
def test_remote_library(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_version: PgVersion,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_remote_library",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_remote_library", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy
# For MOCK_S3 we upload test files.
# For REAL_S3 we use the files already in the bucket
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
upload_files(env)
# and use them to run LOAD library
endpoint = env.endpoints.create_start(
"test_remote_library",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
# config_lines=["log_min_messages=debug3"],
)
try:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# try to load library
try:
cur.execute("LOAD 'anon'")
except Exception as err:
log.info(f"error loading anon library: {err}")
raise AssertionError("unexpected error loading anon library") from err
# test library which name is different from extension name
# this may fail locally if dependency is missing
# however, it does successfully download the postgis archive
if remote_storage_kind == RemoteStorageKind.REAL_S3:
try:
cur.execute("LOAD 'postgis_topology-3'")
except Exception as err:
log.info("error loading postgis_topology-3")
assert "No such file or directory" in str(
err
), "unexpected error loading postgis_topology-3"
finally:
cleanup(pg_version)
# # connect to compute node and run the query
# # that will trigger the download of the extension
# threads = [
# threading.Thread(
# target=weighted_mean_test, args=(endpoint, f"this is thread {i}", False)
# )
# for i in range(2)
# ]
# for thread in threads:
# thread.start()
# for thread in threads:
# thread.join()
# Here we test a complex extension
@@ -210,114 +177,56 @@ def test_remote_library(
reason="skipping test because real s3 not enabled",
)
def test_multiple_extensions_one_archive(
neon_env_builder: NeonEnvBuilder,
pg_version: PgVersion,
test_output_dir: Path,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
neon_binpath: Path,
pg_distrib_dir: Path,
default_broker: NeonBroker,
run_id: uuid.UUID,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.REAL_S3,
test_name="test_multiple_extensions_one_archive",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
alt_pgdir = test_output_dir / "pg_install"
log.info(f"Copying {pg_distrib_dir} (which is big) to {alt_pgdir}.")
shutil.copytree(pg_distrib_dir / pg_version.v_prefixed, alt_pgdir / pg_version.v_prefixed)
assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy
with NeonEnvBuilder(
repo_dir=test_output_dir / "repo",
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=alt_pgdir,
pg_version=pg_version,
broker=default_broker,
run_id=run_id,
preserve_database_files=False,
) as neon_env_builder:
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.REAL_S3,
test_name="test_multiple_extensions_one_archive",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_multiple_extensions_one_archive", tenant_id=tenant_id)
endpoint = env.endpoints.create_start(
"test_multiple_extensions_one_archive",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION address_standardizer;")
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
# execute query to ensure that it works
cur.execute(
"SELECT house_num, name, suftype, city, country, state, unit \
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
'One Rust Place, Boston, MA 02109');"
)
res = cur.fetchall()
log.info(res)
assert len(res) > 0
assert env.ext_remote_storage is not None # satisfy mypy
cleanup(pg_version)
# Test that extension is downloaded after endpoint restart,
# when the library is used in the query.
#
# Run the test with multiple simultaneous connections to an endpoint.
# to ensure that the extension is downloaded only once.
#
def test_extension_download_after_restart(
neon_env_builder: NeonEnvBuilder,
pg_version: PgVersion,
):
if "15" in pg_version: # SKIP v15 for now because test set only has extension built for v14
return None
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
test_name="test_extension_download_after_restart",
enable_remote_extensions=True,
)
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline("test_extension_download_after_restart", tenant_id=tenant_id)
assert env.ext_remote_storage is not None # satisfy mypy
assert env.remote_storage_client is not None # satisfy mypy
# For MOCK_S3 we upload test files.
upload_files(env)
endpoint = env.endpoints.create_start(
"test_extension_download_after_restart",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
config_lines=["log_min_messages=debug3"],
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE extension pg_buffercache;")
cur.execute("SELECT * from pg_buffercache;")
res = cur.fetchall()
assert len(res) > 0
log.info(res)
# shutdown compute node
endpoint.stop()
# remove extension files locally
cleanup(pg_version)
# spin up compute node again (there are no extension files available, because compute is stateless)
endpoint = env.endpoints.create_start(
"test_extension_download_after_restart",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
config_lines=["log_min_messages=debug3"],
)
# connect to compute node and run the query
# that will trigger the download of the extension
def run_query(endpoint, thread_id: int):
log.info("thread_id {%d} starting", thread_id)
endpoint = env.endpoints.create_start(
"test_multiple_extensions_one_archive",
tenant_id=tenant_id,
remote_ext_config=env.ext_remote_storage.to_string(),
)
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT * from pg_buffercache;")
cur.execute("CREATE EXTENSION address_standardizer;")
cur.execute("CREATE EXTENSION address_standardizer_data_us;")
# execute query to ensure that it works
cur.execute(
"SELECT house_num, name, suftype, city, country, state, unit \
FROM standardize_address('us_lex', 'us_gaz', 'us_rules', \
'One Rust Place, Boston, MA 02109');"
)
res = cur.fetchall()
log.info(res)
assert len(res) > 0
log.info("thread_id {%d}, res = %s", thread_id, res)
threads = [threading.Thread(target=run_query, args=(endpoint, i)) for i in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
cleanup(pg_version)

View File

@@ -1,10 +1,8 @@
import time
import pytest
from fixtures.neon_fixtures import NeonEnv
@pytest.mark.timeout(1800)
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env

View File

@@ -23,7 +23,6 @@ from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import subprocess_capture
@pytest.mark.timeout(600)
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
# Put data in vanilla pg
vanilla_pg.start()
@@ -163,7 +162,6 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]
@pytest.mark.timeout(600)
def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_local_fs_remote_storage()
env = neon_env_builder.init_start()

View File

@@ -3,15 +3,11 @@
#
from pathlib import Path
import pytest
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
# Run the main PostgreSQL regression tests, in src/test/regress.
#
# This runs for a long time, especially in debug mode, so use a larger-than-default
# timeout.
@pytest.mark.timeout(1800)
def test_pg_regress(
neon_simple_env: NeonEnv,
test_output_dir: Path,
@@ -60,18 +56,11 @@ def test_pg_regress(
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
# checkpoint one more time to ensure that the lsn we get is the latest one
endpoint.safe_psql("CHECKPOINT")
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)
# Run the PostgreSQL "isolation" tests, in src/test/isolation.
#
# This runs for a long time, especially in debug mode, so use a larger-than-default
# timeout.
@pytest.mark.timeout(1800)
def test_isolation(
neon_simple_env: NeonEnv,
test_output_dir: Path,
@@ -173,9 +162,4 @@ def test_sql_regress(
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
# checkpoint one more time to ensure that the lsn we get is the latest one
endpoint.safe_psql("CHECKPOINT")
endpoint.safe_psql("select pg_current_wal_insert_lsn()")[0][0]
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -97,6 +97,11 @@ def test_remote_storage_backup_and_restore(
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
# That's because of UnreliableWrapper's injected failures
env.pageserver.allowed_errors.append(
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
)
checkpoint_numbers = range(1, 3)
for checkpoint_number in checkpoint_numbers:

View File

@@ -33,8 +33,4 @@ def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
cur.execute(f"insert into t1 values ({i}, {j})")
cur.execute("commit")
# force wal flush
cur.execute("checkpoint")
# Check that we can restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -0,0 +1,250 @@
import enum
import os
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
MANY_SMALL_LAYERS_TENANT_CONFIG,
assert_prefix_empty,
poll_for_remote_storage_iterations,
tenant_delete_wait_completed,
wait_tenant_status_404,
wait_until_tenant_active,
wait_until_tenant_state,
)
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.types import TenantId
from fixtures.utils import run_pg_bench_small
@pytest.mark.parametrize(
"remote_storage_kind", [RemoteStorageKind.NOOP, *available_remote_storages()]
)
def test_tenant_delete_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_tenant_delete_smoke",
)
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
# first try to delete a non-existent tenant
tenant_id = TenantId.generate()
env.pageserver.allowed_errors.append(f".*NotFound: tenant {tenant_id}.*")
with pytest.raises(PageserverApiException, match=f"NotFound: tenant {tenant_id}"):
ps_http.tenant_delete(tenant_id=tenant_id)
env.neon_cli.create_tenant(
tenant_id=tenant_id,
conf=MANY_SMALL_LAYERS_TENANT_CONFIG,
)
# create two timelines, one being the parent of the other
parent = None
for timeline in ["first", "second"]:
timeline_id = env.neon_cli.create_branch(
timeline, tenant_id=tenant_id, ancestor_branch_name=parent
)
with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
run_pg_bench_small(pg_bin, endpoint.connstr())
wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id)
parent = timeline
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
tenant_path = env.tenant_dir(tenant_id=tenant_id)
assert not tenant_path.exists()
if remote_storage_kind in [RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3]:
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
class Check(enum.Enum):
RETRY_WITHOUT_RESTART = enum.auto()
RETRY_WITH_RESTART = enum.auto()
FAILPOINTS = [
"tenant-delete-before-shutdown",
"tenant-delete-before-create-remote-mark",
"tenant-delete-before-create-local-mark",
"tenant-delete-before-background",
"tenant-delete-before-polling-ongoing-deletions",
"tenant-delete-before-cleanup-remaining-fs-traces",
"tenant-delete-before-remove-timelines-dir",
"tenant-delete-before-remove-deleted-mark",
"tenant-delete-before-remove-tenant-dir",
# Some failpoints from timeline deletion
"timeline-delete-before-index-deleted-at",
"timeline-delete-before-rm",
"timeline-delete-before-index-delete",
"timeline-delete-after-rm-dir",
]
FAILPOINTS_BEFORE_BACKGROUND = [
"timeline-delete-before-schedule",
"tenant-delete-before-shutdown",
"tenant-delete-before-create-remote-mark",
"tenant-delete-before-create-local-mark",
"tenant-delete-before-background",
]
def combinations():
result = []
remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
for remote_storage_kind in remotes:
for delete_failpoint in FAILPOINTS:
if remote_storage_kind == RemoteStorageKind.NOOP and delete_failpoint in (
"timeline-delete-before-index-delete",
):
# the above failpoint is not relevant for configs without remote storage
continue
result.append((remote_storage_kind, delete_failpoint))
return result
@pytest.mark.parametrize("remote_storage_kind, failpoint", combinations())
@pytest.mark.parametrize("check", list(Check))
def test_delete_tenant_exercise_crash_safety_failpoints(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
failpoint: str,
check: Check,
pg_bin: PgBin,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind, "test_delete_tenant_exercise_crash_safety_failpoints"
)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
tenant_id = env.initial_tenant
env.pageserver.allowed_errors.extend(
[
# From deletion polling
f".*NotFound: tenant {env.initial_tenant}.*",
# allow errors caused by failpoints
f".*failpoint: {failpoint}",
# It appears when we stop the flush loop during a deletion attempt and then stop the pageserver
".*freeze_and_flush_on_shutdown.*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
# We may leave some upload tasks in the queue. They're likely deletes.
# For uploads we explicitly wait with `last_flush_lsn_upload` below.
# So by ignoring these instead of waiting for empty upload queue
# we execute more distinct code paths.
'.*stopping left-over name="remote upload".*',
]
)
ps_http = env.pageserver.http_client()
timeline_id = env.neon_cli.create_timeline("delete", tenant_id=tenant_id)
with env.endpoints.create_start("delete", tenant_id=tenant_id) as endpoint:
# generate enough layers
run_pg_bench_small(pg_bin, endpoint.connstr())
if remote_storage_kind is RemoteStorageKind.NOOP:
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
else:
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
ps_http.configure_failpoints((failpoint, "return"))
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
# These failpoints fire before the background task is spawned,
# so they result in an API request failure.
if failpoint in FAILPOINTS_BEFORE_BACKGROUND:
with pytest.raises(PageserverApiException, match=failpoint):
ps_http.tenant_delete(tenant_id)
else:
ps_http.tenant_delete(tenant_id)
tenant_info = wait_until_tenant_state(
pageserver_http=ps_http,
tenant_id=tenant_id,
expected_state="Broken",
iterations=iterations,
)
reason = tenant_info["state"]["data"]["reason"]
log.info(f"tenant broken: {reason}")
# failpoint may not be the only error in the stack
assert reason.endswith(f"failpoint: {failpoint}"), reason
if check is Check.RETRY_WITH_RESTART:
env.pageserver.stop()
env.pageserver.start()
if (
remote_storage_kind is RemoteStorageKind.NOOP
and failpoint == "tenant-delete-before-create-local-mark"
):
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
elif failpoint in (
"tenant-delete-before-shutdown",
"tenant-delete-before-create-remote-mark",
):
wait_until_tenant_active(
ps_http, tenant_id=tenant_id, iterations=iterations, period=0.25
)
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
else:
# Pageserver should've resumed deletion after restart.
wait_tenant_status_404(ps_http, tenant_id, iterations=iterations + 10)
elif check is Check.RETRY_WITHOUT_RESTART:
# this should succeed
# this also checks that delete can be retried even when tenant is in Broken state
ps_http.configure_failpoints((failpoint, "off"))
tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
# Check that the remote is empty
if remote_storage_kind is RemoteStorageKind.MOCK_S3:
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
tenant_dir = env.tenant_dir(tenant_id)
# Check local is empty
assert not tenant_dir.exists()
# TODO test concurrent deletions with "hang" failpoint
# TODO test tenant delete continues after attach

View File

@@ -66,6 +66,10 @@ def test_tenant_reattach(
env.pageserver.allowed_errors.append(
f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
)
# That's because of UnreliableWrapper's injected failures
env.pageserver.allowed_errors.append(
f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
with endpoint.cursor() as cur:

View File

@@ -17,9 +17,9 @@ from fixtures.neon_fixtures import (
 from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pageserver.utils import (
     assert_tenant_state,
-    tenant_exists,
     wait_for_last_record_lsn,
     wait_for_upload,
+    wait_tenant_status_404,
 )
 from fixtures.port_distributor import PortDistributor
 from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
@@ -29,7 +29,6 @@ from fixtures.utils import (
     start_in_background,
     subprocess_capture,
     wait_until,
-    wait_while,
 )
@@ -269,11 +268,16 @@ def test_tenant_relocation(
     env = neon_env_builder.init_start()

+    tenant_id = TenantId("74ee8b079a0e437eb0afea7d26a07209")
+
     # FIXME: Is this expected?
     env.pageserver.allowed_errors.append(
         ".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
     )
+
+    # Needed for detach polling.
+    env.pageserver.allowed_errors.append(f".*NotFound: tenant {tenant_id}.*")

     # create folder for remote storage mock
     remote_storage_mock_path = env.repo_dir / "local_fs_remote_storage"
@@ -283,9 +287,7 @@ def test_tenant_relocation(
     pageserver_http = env.pageserver.http_client()

-    tenant_id, initial_timeline_id = env.neon_cli.create_tenant(
-        TenantId("74ee8b079a0e437eb0afea7d26a07209")
-    )
+    _, initial_timeline_id = env.neon_cli.create_tenant(tenant_id)

     log.info("tenant to relocate %s initial_timeline_id %s", tenant_id, initial_timeline_id)

     env.neon_cli.create_branch("test_tenant_relocation_main", tenant_id=tenant_id)
@@ -469,11 +471,8 @@ def test_tenant_relocation(
     pageserver_http.tenant_detach(tenant_id)

     # Wait a little, so that the detach operation has time to finish.
-    wait_while(
-        number_of_iterations=100,
-        interval=1,
-        func=lambda: tenant_exists(pageserver_http, tenant_id),
-    )
+    wait_tenant_status_404(pageserver_http, tenant_id, iterations=100, interval=1)

     post_migration_check(ep_main, 500500, old_local_path_main)
     post_migration_check(ep_second, 1001000, old_local_path_second)
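wait_tenant_status_404 replaces the ad-hoc wait_while loop with a poll that treats a 404 from the tenant status endpoint as success. A minimal sketch of that idea, assuming PageserverApiException exposes the response status code (the real helper lives in fixtures.pageserver.utils):

```python
# Sketch of the polling idea behind wait_tenant_status_404: keep asking for
# tenant status until the pageserver answers 404, i.e. the tenant is gone.
# Assumption: the exception carries the HTTP status as `status_code`.
import time

from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.types import TenantId


def wait_for_tenant_404(
    ps_http: PageserverHttpClient, tenant_id: TenantId, iterations: int, interval: float = 1.0
) -> None:
    for _ in range(iterations):
        try:
            ps_http.tenant_status(tenant_id)
        except PageserverApiException as e:
            if getattr(e, "status_code", None) == 404:
                return
            raise
        time.sleep(interval)
    raise RuntimeError(f"tenant {tenant_id} still present after {iterations} polls")
```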


@@ -146,6 +146,11 @@ def test_tenants_attached_after_download(
     tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
     timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])

+    # That's because of UnreliableWrapper's injected failures
+    env.pageserver.allowed_errors.append(
+        f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
+    )
+
     for checkpoint_number in range(1, 3):
         with endpoint.cursor() as cur:
             cur.execute(

@@ -4,7 +4,6 @@ import queue
import shutil
import threading
from pathlib import Path
from typing import Optional
import pytest
import requests
@@ -18,6 +17,8 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_prefix_empty,
poll_for_remote_storage_iterations,
timeline_delete_wait_completed,
wait_for_last_record_lsn,
wait_for_upload,
@@ -27,7 +28,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.remote_storage import (
RemoteStorageKind,
S3Storage,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
@@ -187,10 +187,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
     8. Retry or restart without the failpoint and check the result.
     """
-    if remote_storage_kind is not None:
-        neon_env_builder.enable_remote_storage(
-            remote_storage_kind, "test_delete_timeline_exercise_crash_safety_failpoints"
-        )
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind, "test_delete_timeline_exercise_crash_safety_failpoints"
+    )

     env = neon_env_builder.init_start(
         initial_tenant_conf={
@@ -231,7 +230,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
     ps_http.configure_failpoints((failpoint, "return"))

-    iterations = 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 4
+    iterations = poll_for_remote_storage_iterations(remote_storage_kind)

     # These failpoints fire before the background task is spawned,
     # so they result in an API request failure.
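poll_for_remote_storage_iterations centralizes the polling budget that used to be inlined here. Judging only from the removed line, it plausibly reduces to something like the following sketch; the function body is inferred, not the fixture's actual code:

```python
# Inferred sketch of poll_for_remote_storage_iterations, based on the inline
# expression it replaces above: real S3 round-trips are slow and jittery,
# so they get a larger polling budget than mock/local backends.
from typing import Optional

from fixtures.remote_storage import RemoteStorageKind


def poll_iterations(kind: Optional[RemoteStorageKind]) -> int:
    return 20 if kind is RemoteStorageKind.REAL_S3 else 4
```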
@@ -280,14 +279,14 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
                     "remote_storage_s3_request_seconds_count",
                     filter={"request_type": "get_object", "result": "err"},
                 ).value
-                == 1
+                == 2  # one is the missing tenant deletion mark, the second is the missing index part
             )
             assert (
                 m.query_one(
                     "remote_storage_s3_request_seconds_count",
                     filter={"request_type": "get_object", "result": "ok"},
                 ).value
-                == 1
+                == 1  # the index part for the initial timeline
             )
     elif check is Check.RETRY_WITHOUT_RESTART:
         # this should succeed
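The two assertions above count S3 get_object requests through the pageserver's Prometheus counters. For reference, the same count can be pulled out of raw /metrics text with the prometheus_client parser; this is a standalone sketch mirroring what m.query_one(...) does, not the fixtures' Metrics helper:

```python
# Sketch: counting get_object requests from raw Prometheus text. The metric
# and label names are taken verbatim from the assertions above; the
# prometheus_client parser is a real, widely available API.
from prometheus_client.parser import text_string_to_metric_families


def count_get_object(metrics_text: str, result: str) -> float:
    total = 0.0
    for family in text_string_to_metric_families(metrics_text):
        for sample in family.samples:
            if (
                sample.name == "remote_storage_s3_request_seconds_count"
                and sample.labels.get("request_type") == "get_object"
                and sample.labels.get("result") == result
            ):
                total += sample.value
    return total
```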
@@ -413,27 +412,6 @@ def test_timeline_resurrection_on_attach(
     assert all([tl["state"] == "Active" for tl in timelines])


-def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
-    # For local_fs we need to properly handle empty directories, which we currently don't, so for simplicity stick to the s3 api.
-    assert neon_env_builder.remote_storage_kind in (
-        RemoteStorageKind.MOCK_S3,
-        RemoteStorageKind.REAL_S3,
-    )
-    # For mypy
-    assert isinstance(neon_env_builder.remote_storage, S3Storage)
-
-    # Note that this doesn't use pagination, so the list is not guaranteed to be exhaustive.
-    assert neon_env_builder.remote_storage_client is not None
-    response = neon_env_builder.remote_storage_client.list_objects_v2(
-        Bucket=neon_env_builder.remote_storage.bucket_name,
-        Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
-    )
-    objects = response.get("Contents")
-    assert (
-        response["KeyCount"] == 0
-    ), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"
-
-
 def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
     """
     When deleting a timeline, if we succeed in setting the deleted flag remotely