mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
Compare commits
29 Commits
problame/2
...
proxy-forw
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db7b244fdb | ||
|
|
e00127e84b | ||
|
|
6506fd14c4 | ||
|
|
46fb1a90ce | ||
|
|
56171cbe8c | ||
|
|
48b05b7c50 | ||
|
|
0856fe6676 | ||
|
|
4133d14a77 | ||
|
|
30c9e145d7 | ||
|
|
24e916d37f | ||
|
|
23f58145ed | ||
|
|
350865392c | ||
|
|
1be5e564ce | ||
|
|
7a70ef991f | ||
|
|
be30388901 | ||
|
|
3525080031 | ||
|
|
527cdbc010 | ||
|
|
39be2b0108 | ||
|
|
fa52cd575e | ||
|
|
d2c410c748 | ||
|
|
221531c9db | ||
|
|
4c173456dc | ||
|
|
e82625b77d | ||
|
|
0ac1e71524 | ||
|
|
271133d960 | ||
|
|
3d5fab127a | ||
|
|
66719d7eaf | ||
|
|
9a9d9beaee | ||
|
|
2bfc831c60 |
2
.github/actionlint.yml
vendored
2
.github/actionlint.yml
vendored
@@ -4,6 +4,8 @@ self-hosted-runner:
|
||||
- dev
|
||||
- gen3
|
||||
- large
|
||||
# Remove `macos-14` from the list after https://github.com/rhysd/actionlint/pull/392 is merged.
|
||||
- macos-14
|
||||
- small
|
||||
- us-east-2
|
||||
config-variables:
|
||||
|
||||
@@ -179,23 +179,6 @@ runs:
|
||||
aws s3 rm "s3://${BUCKET}/${LOCK_FILE}"
|
||||
fi
|
||||
|
||||
- name: Store Allure test stat in the DB
|
||||
if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}
|
||||
shell: bash -euxo pipefail {0}
|
||||
env:
|
||||
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
REPORT_JSON_URL: ${{ steps.generate-report.outputs.report-json-url }}
|
||||
run: |
|
||||
export DATABASE_URL=${REGRESS_TEST_RESULT_CONNSTR}
|
||||
|
||||
./scripts/pysync
|
||||
|
||||
poetry run python3 scripts/ingest_regress_test_result.py \
|
||||
--revision ${COMMIT_SHA} \
|
||||
--reference ${GITHUB_REF} \
|
||||
--build-type unified \
|
||||
--ingest ${WORKDIR}/report/data/suites.json
|
||||
|
||||
- name: Store Allure test stat in the DB (new)
|
||||
if: ${{ !cancelled() && inputs.store-test-results-into-db == 'true' }}
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
24
.github/workflows/build_and_test.yml
vendored
24
.github/workflows/build_and_test.yml
vendored
@@ -531,7 +531,6 @@ jobs:
|
||||
with:
|
||||
store-test-results-into-db: true
|
||||
env:
|
||||
REGRESS_TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR }}
|
||||
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
|
||||
- uses: actions/github-script@v6
|
||||
@@ -609,17 +608,6 @@ jobs:
|
||||
--input-objects=/tmp/coverage/binaries.list \
|
||||
--format=lcov
|
||||
|
||||
- name: Upload coverage report
|
||||
id: upload-coverage-report
|
||||
env:
|
||||
BUCKET: neon-github-public-dev
|
||||
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
run: |
|
||||
aws s3 cp --only-show-errors --recursive /tmp/coverage/report s3://${BUCKET}/code-coverage/${COMMIT_SHA}
|
||||
|
||||
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/code-coverage/${COMMIT_SHA}/index.html
|
||||
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Build coverage report NEW
|
||||
id: upload-coverage-report-new
|
||||
env:
|
||||
@@ -656,21 +644,11 @@ jobs:
|
||||
|
||||
- uses: actions/github-script@v6
|
||||
env:
|
||||
REPORT_URL: ${{ steps.upload-coverage-report.outputs.report-url }}
|
||||
REPORT_URL_NEW: ${{ steps.upload-coverage-report-new.outputs.report-url }}
|
||||
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
with:
|
||||
script: |
|
||||
const { REPORT_URL, REPORT_URL_NEW, COMMIT_SHA } = process.env
|
||||
|
||||
await github.rest.repos.createCommitStatus({
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
sha: `${COMMIT_SHA}`,
|
||||
state: 'success',
|
||||
target_url: `${REPORT_URL}`,
|
||||
context: 'Code coverage report',
|
||||
})
|
||||
const { REPORT_URL_NEW, COMMIT_SHA } = process.env
|
||||
|
||||
await github.rest.repos.createCommitStatus({
|
||||
owner: context.repo.owner,
|
||||
|
||||
12
.github/workflows/neon_extra_builds.yml
vendored
12
.github/workflows/neon_extra_builds.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
timeout-minutes: 90
|
||||
runs-on: macos-latest
|
||||
runs-on: macos-14
|
||||
|
||||
env:
|
||||
# Use release build only, to have less debug info around
|
||||
@@ -60,21 +60,21 @@ jobs:
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_15
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_16
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Set extra env for macOS
|
||||
run: |
|
||||
@@ -89,7 +89,7 @@ jobs:
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git
|
||||
target
|
||||
key: v1-${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
|
||||
|
||||
- name: Build postgres v14
|
||||
if: steps.cache_pg_14.outputs.cache-hit != 'true'
|
||||
@@ -110,7 +110,7 @@ jobs:
|
||||
run: make walproposer-lib -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Run cargo build
|
||||
run: cargo build --all --release
|
||||
run: PQ_LIB_DIR=$(pwd)/pg_install/v16/lib cargo build --all --release
|
||||
|
||||
- name: Check that no warnings are produced
|
||||
run: ./run_clippy.sh
|
||||
|
||||
@@ -20,7 +20,7 @@ ln -s ../../pre-commit.py .git/hooks/pre-commit
|
||||
|
||||
This will run following checks on staged files before each commit:
|
||||
- `rustfmt`
|
||||
- checks for python files, see [obligatory checks](/docs/sourcetree.md#obligatory-checks).
|
||||
- checks for Python files, see [obligatory checks](/docs/sourcetree.md#obligatory-checks).
|
||||
|
||||
There is also a separate script `./run_clippy.sh` that runs `cargo clippy` on the whole project
|
||||
and `./scripts/reformat` that runs all formatting tools to ensure the project is up to date.
|
||||
|
||||
40
Cargo.lock
generated
40
Cargo.lock
generated
@@ -1144,16 +1144,6 @@ version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
|
||||
|
||||
[[package]]
|
||||
name = "close_fds"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bc416f33de9d59e79e57560f450d21ff8393adcf1cdfc3e6d8fb93d5f88a2ed"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.0"
|
||||
@@ -2811,6 +2801,15 @@ version = "2.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.8.0"
|
||||
@@ -2943,6 +2942,19 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.26.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"memoffset 0.7.1",
|
||||
"pin-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nix"
|
||||
version = "0.27.1"
|
||||
@@ -3396,7 +3408,6 @@ dependencies = [
|
||||
"camino-tempfile",
|
||||
"chrono",
|
||||
"clap",
|
||||
"close_fds",
|
||||
"const_format",
|
||||
"consumption_metrics",
|
||||
"crc32c",
|
||||
@@ -3472,6 +3483,7 @@ dependencies = [
|
||||
"bincode",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"const_format",
|
||||
"enum-map",
|
||||
"hex",
|
||||
@@ -3891,6 +3903,7 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
"rand 0.8.5",
|
||||
"smallvec",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
@@ -5662,9 +5675,10 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0dd3a2f8bf3239d34a19719ef1a74146c093126f"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0e1af4ccddf2f01805cfc9eaefa97ee13c04b52d"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"nix 0.26.4",
|
||||
"once_cell",
|
||||
"scopeguard",
|
||||
"thiserror",
|
||||
@@ -6186,7 +6200,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "uring-common"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0dd3a2f8bf3239d34a19719ef1a74146c093126f"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0e1af4ccddf2f01805cfc9eaefa97ee13c04b52d"
|
||||
dependencies = [
|
||||
"io-uring",
|
||||
"libc",
|
||||
|
||||
@@ -64,7 +64,6 @@ camino = "1.1.6"
|
||||
cfg-if = "1.0.0"
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
close_fds = "0.3.2"
|
||||
comfy-table = "6.1"
|
||||
const_format = "0.2"
|
||||
crc32c = "0.6"
|
||||
|
||||
6
Makefile
6
Makefile
@@ -51,6 +51,8 @@ CARGO_BUILD_FLAGS += $(filter -j1,$(MAKEFLAGS))
|
||||
CARGO_CMD_PREFIX += $(if $(filter n,$(MAKEFLAGS)),,+)
|
||||
# Force cargo not to print progress bar
|
||||
CARGO_CMD_PREFIX += CARGO_TERM_PROGRESS_WHEN=never CI=1
|
||||
# Set PQ_LIB_DIR to make sure `attachment_service` get linked with bundled libpq (through diesel)
|
||||
CARGO_CMD_PREFIX += PQ_LIB_DIR=$(POSTGRES_INSTALL_DIR)/v16/lib
|
||||
|
||||
#
|
||||
# Top level Makefile to build Neon and PostgreSQL
|
||||
@@ -174,10 +176,10 @@ neon-pg-ext-clean-%:
|
||||
|
||||
# Build walproposer as a static library. walproposer source code is located
|
||||
# in the pgxn/neon directory.
|
||||
#
|
||||
#
|
||||
# We also need to include libpgport.a and libpgcommon.a, because walproposer
|
||||
# uses some functions from those libraries.
|
||||
#
|
||||
#
|
||||
# Some object files are removed from libpgport.a and libpgcommon.a because
|
||||
# they depend on openssl and other libraries that are not included in our
|
||||
# Rust build.
|
||||
|
||||
20
README.md
20
README.md
@@ -14,8 +14,8 @@ Alternatively, compile and run the project [locally](#running-local-installation
|
||||
A Neon installation consists of compute nodes and the Neon storage engine. Compute nodes are stateless PostgreSQL nodes backed by the Neon storage engine.
|
||||
|
||||
The Neon storage engine consists of two major components:
|
||||
- Pageserver. Scalable storage backend for the compute nodes.
|
||||
- Safekeepers. The safekeepers form a redundant WAL service that received WAL from the compute node, and stores it durably until it has been processed by the pageserver and uploaded to cloud storage.
|
||||
- Pageserver: Scalable storage backend for the compute nodes.
|
||||
- Safekeepers: The safekeepers form a redundant WAL service that received WAL from the compute node, and stores it durably until it has been processed by the pageserver and uploaded to cloud storage.
|
||||
|
||||
See developer documentation in [SUMMARY.md](/docs/SUMMARY.md) for more information.
|
||||
|
||||
@@ -81,9 +81,9 @@ The project uses [rust toolchain file](./rust-toolchain.toml) to define the vers
|
||||
|
||||
This file is automatically picked up by [`rustup`](https://rust-lang.github.io/rustup/overrides.html#the-toolchain-file) that installs (if absent) and uses the toolchain version pinned in the file.
|
||||
|
||||
rustup users who want to build with another toolchain can use [`rustup override`](https://rust-lang.github.io/rustup/overrides.html#directory-overrides) command to set a specific toolchain for the project's directory.
|
||||
rustup users who want to build with another toolchain can use the [`rustup override`](https://rust-lang.github.io/rustup/overrides.html#directory-overrides) command to set a specific toolchain for the project's directory.
|
||||
|
||||
non-rustup users most probably are not getting the same toolchain automatically from the file, so are responsible to manually verify their toolchain matches the version in the file.
|
||||
non-rustup users most probably are not getting the same toolchain automatically from the file, so are responsible to manually verify that their toolchain matches the version in the file.
|
||||
Newer rustc versions most probably will work fine, yet older ones might not be supported due to some new features used by the project or the crates.
|
||||
|
||||
#### Building on Linux
|
||||
@@ -124,7 +124,7 @@ make -j`sysctl -n hw.logicalcpu` -s
|
||||
To run the `psql` client, install the `postgresql-client` package or modify `PATH` and `LD_LIBRARY_PATH` to include `pg_install/bin` and `pg_install/lib`, respectively.
|
||||
|
||||
To run the integration tests or Python scripts (not required to use the code), install
|
||||
Python (3.9 or higher), and install python3 packages using `./scripts/pysync` (requires [poetry>=1.3](https://python-poetry.org/)) in the project directory.
|
||||
Python (3.9 or higher), and install the python3 packages using `./scripts/pysync` (requires [poetry>=1.3](https://python-poetry.org/)) in the project directory.
|
||||
|
||||
|
||||
#### Running neon database
|
||||
@@ -166,7 +166,7 @@ Starting postgres at 'postgresql://cloud_admin@127.0.0.1:55432/postgres'
|
||||
|
||||
2. Now, it is possible to connect to postgres and run some queries:
|
||||
```text
|
||||
> psql -p55432 -h 127.0.0.1 -U cloud_admin postgres
|
||||
> psql -p 55432 -h 127.0.0.1 -U cloud_admin postgres
|
||||
postgres=# CREATE TABLE t(key int primary key, value text);
|
||||
CREATE TABLE
|
||||
postgres=# insert into t values(1,1);
|
||||
@@ -205,7 +205,7 @@ Starting postgres at 'postgresql://cloud_admin@127.0.0.1:55434/postgres'
|
||||
|
||||
# this new postgres instance will have all the data from 'main' postgres,
|
||||
# but all modifications would not affect data in original postgres
|
||||
> psql -p55434 -h 127.0.0.1 -U cloud_admin postgres
|
||||
> psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres
|
||||
postgres=# select * from t;
|
||||
key | value
|
||||
-----+-------
|
||||
@@ -216,7 +216,7 @@ postgres=# insert into t values(2,2);
|
||||
INSERT 0 1
|
||||
|
||||
# check that the new change doesn't affect the 'main' postgres
|
||||
> psql -p55432 -h 127.0.0.1 -U cloud_admin postgres
|
||||
> psql -p 55432 -h 127.0.0.1 -U cloud_admin postgres
|
||||
postgres=# select * from t;
|
||||
key | value
|
||||
-----+-------
|
||||
@@ -224,7 +224,7 @@ postgres=# select * from t;
|
||||
(1 row)
|
||||
```
|
||||
|
||||
4. If you want to run tests afterward (see below), you must stop all the running of the pageserver, safekeeper, and postgres instances
|
||||
4. If you want to run tests afterwards (see below), you must stop all the running pageserver, safekeeper, and postgres instances
|
||||
you have just started. You can terminate them all with one command:
|
||||
```sh
|
||||
> cargo neon stop
|
||||
@@ -243,7 +243,7 @@ CARGO_BUILD_FLAGS="--features=testing" make
|
||||
```
|
||||
|
||||
By default, this runs both debug and release modes, and all supported postgres versions. When
|
||||
testing locally, it is convenient to run just run one set of permutations, like this:
|
||||
testing locally, it is convenient to run just one set of permutations, like this:
|
||||
|
||||
```sh
|
||||
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest
|
||||
|
||||
@@ -319,7 +319,7 @@ impl ComputeNode {
|
||||
// Get basebackup from the libpq connection to pageserver using `connstr` and
|
||||
// unarchive it to `pgdata` directory overriding all its previous content.
|
||||
#[instrument(skip_all, fields(%lsn))]
|
||||
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let start_time = Instant::now();
|
||||
|
||||
@@ -390,6 +390,34 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Gets the basebackup in a retry loop
|
||||
#[instrument(skip_all, fields(%lsn))]
|
||||
pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let mut retry_period_ms = 500;
|
||||
let mut attempts = 0;
|
||||
let max_attempts = 5;
|
||||
loop {
|
||||
let result = self.try_get_basebackup(compute_state, lsn);
|
||||
match result {
|
||||
Ok(_) => {
|
||||
return result;
|
||||
}
|
||||
Err(ref e) if attempts < max_attempts => {
|
||||
warn!(
|
||||
"Failed to get basebackup: {} (attempt {}/{})",
|
||||
e, attempts, max_attempts
|
||||
);
|
||||
std::thread::sleep(std::time::Duration::from_millis(retry_period_ms));
|
||||
retry_period_ms *= 2;
|
||||
}
|
||||
Err(_) => {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
attempts += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn check_safekeepers_synced_async(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
|
||||
@@ -39,7 +39,7 @@ struct Cli {
|
||||
|
||||
/// Path to the .json file to store state (will be created if it doesn't exist)
|
||||
#[arg(short, long)]
|
||||
path: Utf8PathBuf,
|
||||
path: Option<Utf8PathBuf>,
|
||||
|
||||
/// URL to connect to postgres, like postgresql://localhost:1234/attachment_service
|
||||
#[arg(long)]
|
||||
@@ -62,7 +62,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
GIT_VERSION,
|
||||
launch_ts.to_string(),
|
||||
BUILD_TAG,
|
||||
args.path,
|
||||
args.path.as_ref().unwrap_or(&Utf8PathBuf::from("<none>")),
|
||||
args.listen
|
||||
);
|
||||
|
||||
@@ -70,11 +70,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
jwt_token: args.jwt_token,
|
||||
};
|
||||
|
||||
let json_path = if args.path.as_os_str().is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(args.path)
|
||||
};
|
||||
let json_path = args.path;
|
||||
let persistence = Arc::new(Persistence::new(args.database_url, json_path.clone()));
|
||||
|
||||
let service = Service::spawn(config, persistence.clone()).await?;
|
||||
|
||||
@@ -256,7 +256,9 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
|
||||
for env_key in [
|
||||
"AWS_ACCESS_KEY_ID",
|
||||
"AWS_SECRET_ACCESS_KEY",
|
||||
"AWS_SESSION_TOKEN",
|
||||
"AWS_PROFILE",
|
||||
// HOME is needed in combination with `AWS_PROFILE` to pick up the SSO sessions.
|
||||
"HOME",
|
||||
"AZURE_STORAGE_ACCOUNT",
|
||||
"AZURE_STORAGE_ACCESS_KEY",
|
||||
] {
|
||||
|
||||
@@ -395,6 +395,11 @@ impl PageServerNode {
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
lazy_slru_download: settings
|
||||
.remove("lazy_slru_download")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'lazy_slru_download' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -495,6 +500,11 @@ impl PageServerNode {
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
lazy_slru_download: settings
|
||||
.remove("lazy_slru_download")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'lazy_slru_download' as bool")?,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ strum_macros.workspace = true
|
||||
hex.workspace = true
|
||||
thiserror.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
chrono.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -63,16 +63,84 @@ impl KeySpace {
|
||||
KeyPartitioning { parts }
|
||||
}
|
||||
|
||||
/// Update the keyspace such that it doesn't contain any range
|
||||
/// that is overlapping with `other`. This can involve splitting or
|
||||
/// removing of existing ranges.
|
||||
pub fn remove_overlapping_with(&mut self, other: &KeySpace) {
|
||||
let (self_start, self_end) = match (self.start(), self.end()) {
|
||||
(Some(start), Some(end)) => (start, end),
|
||||
_ => {
|
||||
// self is empty
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Key spaces are sorted by definition, so skip ahead to the first
|
||||
// potentially intersecting range. Similarly, ignore ranges that start
|
||||
// after the current keyspace ends.
|
||||
let other_ranges = other
|
||||
.ranges
|
||||
.iter()
|
||||
.skip_while(|range| self_start >= range.end)
|
||||
.take_while(|range| self_end > range.start);
|
||||
|
||||
for range in other_ranges {
|
||||
while let Some(overlap_at) = self.overlaps_at(range) {
|
||||
let overlapped = self.ranges[overlap_at].clone();
|
||||
|
||||
if overlapped.start < range.start && overlapped.end <= range.end {
|
||||
// Higher part of the range is completely overlapped.
|
||||
self.ranges[overlap_at].end = range.start;
|
||||
}
|
||||
if overlapped.start >= range.start && overlapped.end > range.end {
|
||||
// Lower part of the range is completely overlapped.
|
||||
self.ranges[overlap_at].start = range.end;
|
||||
}
|
||||
if overlapped.start < range.start && overlapped.end > range.end {
|
||||
// Middle part of the range is overlapped.
|
||||
self.ranges[overlap_at].end = range.start;
|
||||
self.ranges
|
||||
.insert(overlap_at + 1, range.end..overlapped.end);
|
||||
}
|
||||
if overlapped.start >= range.start && overlapped.end <= range.end {
|
||||
// Whole range is overlapped
|
||||
self.ranges.remove(overlap_at);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(&self) -> Option<Key> {
|
||||
self.ranges.first().map(|range| range.start)
|
||||
}
|
||||
|
||||
pub fn end(&self) -> Option<Key> {
|
||||
self.ranges.last().map(|range| range.end)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub fn total_size(&self) -> usize {
|
||||
self.ranges
|
||||
.iter()
|
||||
.map(|range| key_range_size(range) as usize)
|
||||
.sum()
|
||||
}
|
||||
|
||||
fn overlaps_at(&self, range: &Range<Key>) -> Option<usize> {
|
||||
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
|
||||
Ok(0) => None,
|
||||
Err(0) => None,
|
||||
Ok(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
|
||||
Err(index) if self.ranges[index - 1].end > range.start => Some(index - 1),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Check if key space contains overlapping range
|
||||
///
|
||||
pub fn overlaps(&self, range: &Range<Key>) -> bool {
|
||||
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
|
||||
Ok(0) => false,
|
||||
Err(0) => false,
|
||||
Ok(index) => self.ranges[index - 1].end > range.start,
|
||||
Err(index) => self.ranges[index - 1].end > range.start,
|
||||
}
|
||||
self.overlaps_at(range).is_some()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -441,4 +509,118 @@ mod tests {
|
||||
// xxxxxxxxxxx
|
||||
assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_full_overlapps() {
|
||||
let mut key_space1 = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(1)..Key::from_i128(4),
|
||||
Key::from_i128(5)..Key::from_i128(8),
|
||||
Key::from_i128(10)..Key::from_i128(12),
|
||||
],
|
||||
};
|
||||
let key_space2 = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(2)..Key::from_i128(3),
|
||||
Key::from_i128(6)..Key::from_i128(7),
|
||||
Key::from_i128(11)..Key::from_i128(13),
|
||||
],
|
||||
};
|
||||
key_space1.remove_overlapping_with(&key_space2);
|
||||
assert_eq!(
|
||||
key_space1.ranges,
|
||||
vec![
|
||||
Key::from_i128(1)..Key::from_i128(2),
|
||||
Key::from_i128(3)..Key::from_i128(4),
|
||||
Key::from_i128(5)..Key::from_i128(6),
|
||||
Key::from_i128(7)..Key::from_i128(8),
|
||||
Key::from_i128(10)..Key::from_i128(11)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_partial_overlaps() {
|
||||
// Test partial ovelaps
|
||||
let mut key_space1 = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(1)..Key::from_i128(5),
|
||||
Key::from_i128(7)..Key::from_i128(10),
|
||||
Key::from_i128(12)..Key::from_i128(15),
|
||||
],
|
||||
};
|
||||
let key_space2 = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(3)..Key::from_i128(6),
|
||||
Key::from_i128(8)..Key::from_i128(11),
|
||||
Key::from_i128(14)..Key::from_i128(17),
|
||||
],
|
||||
};
|
||||
key_space1.remove_overlapping_with(&key_space2);
|
||||
assert_eq!(
|
||||
key_space1.ranges,
|
||||
vec![
|
||||
Key::from_i128(1)..Key::from_i128(3),
|
||||
Key::from_i128(7)..Key::from_i128(8),
|
||||
Key::from_i128(12)..Key::from_i128(14),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_no_overlaps() {
|
||||
let mut key_space1 = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(1)..Key::from_i128(5),
|
||||
Key::from_i128(7)..Key::from_i128(10),
|
||||
Key::from_i128(12)..Key::from_i128(15),
|
||||
],
|
||||
};
|
||||
let key_space2 = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(6)..Key::from_i128(7),
|
||||
Key::from_i128(11)..Key::from_i128(12),
|
||||
Key::from_i128(15)..Key::from_i128(17),
|
||||
],
|
||||
};
|
||||
key_space1.remove_overlapping_with(&key_space2);
|
||||
assert_eq!(
|
||||
key_space1.ranges,
|
||||
vec![
|
||||
Key::from_i128(1)..Key::from_i128(5),
|
||||
Key::from_i128(7)..Key::from_i128(10),
|
||||
Key::from_i128(12)..Key::from_i128(15),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_one_range_overlaps_multiple() {
|
||||
let mut key_space1 = KeySpace {
|
||||
ranges: vec![
|
||||
Key::from_i128(1)..Key::from_i128(3),
|
||||
Key::from_i128(3)..Key::from_i128(6),
|
||||
Key::from_i128(6)..Key::from_i128(10),
|
||||
Key::from_i128(12)..Key::from_i128(15),
|
||||
Key::from_i128(17)..Key::from_i128(20),
|
||||
Key::from_i128(20)..Key::from_i128(30),
|
||||
Key::from_i128(30)..Key::from_i128(40),
|
||||
],
|
||||
};
|
||||
let key_space2 = KeySpace {
|
||||
ranges: vec![Key::from_i128(9)..Key::from_i128(19)],
|
||||
};
|
||||
key_space1.remove_overlapping_with(&key_space2);
|
||||
assert_eq!(
|
||||
key_space1.ranges,
|
||||
vec![
|
||||
Key::from_i128(1)..Key::from_i128(3),
|
||||
Key::from_i128(3)..Key::from_i128(6),
|
||||
Key::from_i128(6)..Key::from_i128(9),
|
||||
Key::from_i128(19)..Key::from_i128(20),
|
||||
Key::from_i128(20)..Key::from_i128(30),
|
||||
Key::from_i128(30)..Key::from_i128(40),
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::{
|
||||
};
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use strum_macros;
|
||||
@@ -271,6 +272,7 @@ pub struct TenantConfig {
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
pub gc_feedback: Option<bool>,
|
||||
pub heatmap_period: Option<String>,
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -452,6 +454,8 @@ pub struct TenantDetails {
|
||||
#[serde(flatten)]
|
||||
pub tenant_info: TenantInfo,
|
||||
|
||||
pub walredo: Option<WalRedoManagerStatus>,
|
||||
|
||||
pub timelines: Vec<TimelineId>,
|
||||
}
|
||||
|
||||
@@ -639,6 +643,12 @@ pub struct TimelineGcRequest {
|
||||
pub gc_horizon: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WalRedoManagerStatus {
|
||||
pub last_redo_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
pub pid: Option<u32>,
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(PartialEq, Eq, Debug)]
|
||||
pub enum PagestreamFeMessage {
|
||||
@@ -646,6 +656,7 @@ pub enum PagestreamFeMessage {
|
||||
Nblocks(PagestreamNblocksRequest),
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
DbSize(PagestreamDbSizeRequest),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
@@ -656,6 +667,7 @@ pub enum PagestreamBeMessage {
|
||||
GetPage(PagestreamGetPageResponse),
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
@@ -666,6 +678,7 @@ enum PagestreamBeMessageTag {
|
||||
GetPage = 102,
|
||||
Error = 103,
|
||||
DbSize = 104,
|
||||
GetSlruSegment = 105,
|
||||
}
|
||||
impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
type Error = u8;
|
||||
@@ -676,6 +689,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
102 => Ok(PagestreamBeMessageTag::GetPage),
|
||||
103 => Ok(PagestreamBeMessageTag::Error),
|
||||
104 => Ok(PagestreamBeMessageTag::DbSize),
|
||||
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
@@ -710,6 +724,14 @@ pub struct PagestreamDbSizeRequest {
|
||||
pub dbnode: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetSlruSegmentRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub kind: u8,
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsResponse {
|
||||
pub exists: bool,
|
||||
@@ -725,6 +747,11 @@ pub struct PagestreamGetPageResponse {
|
||||
pub page: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetSlruSegmentResponse {
|
||||
pub segment: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamErrorResponse {
|
||||
pub message: String,
|
||||
@@ -788,6 +815,14 @@ impl PagestreamFeMessage {
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.dbnode);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(req) => {
|
||||
bytes.put_u8(4);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -838,6 +873,14 @@ impl PagestreamFeMessage {
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
4 => Ok(PagestreamFeMessage::GetSlruSegment(
|
||||
PagestreamGetSlruSegmentRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
kind: body.read_u8()?,
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
)),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
@@ -873,6 +916,12 @@ impl PagestreamBeMessage {
|
||||
bytes.put_u8(Tag::DbSize as u8);
|
||||
bytes.put_i64(resp.db_size);
|
||||
}
|
||||
|
||||
Self::GetSlruSegment(resp) => {
|
||||
bytes.put_u8(Tag::GetSlruSegment as u8);
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -913,6 +962,14 @@ impl PagestreamBeMessage {
|
||||
let db_size = buf.read_i64::<BigEndian>()?;
|
||||
Self::DbSize(PagestreamDbSizeResponse { db_size })
|
||||
}
|
||||
Tag::GetSlruSegment => {
|
||||
let n_blocks = buf.read_u32::<BigEndian>()?;
|
||||
let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize];
|
||||
buf.read_exact(&mut segment)?;
|
||||
Self::GetSlruSegment(PagestreamGetSlruSegmentResponse {
|
||||
segment: segment.into(),
|
||||
})
|
||||
}
|
||||
};
|
||||
let remaining = buf.into_inner();
|
||||
if !remaining.is_empty() {
|
||||
@@ -931,6 +988,7 @@ impl PagestreamBeMessage {
|
||||
Self::GetPage(_) => "GetPage",
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,9 +123,11 @@ impl RelTag {
|
||||
PartialOrd,
|
||||
Ord,
|
||||
strum_macros::EnumIter,
|
||||
strum_macros::FromRepr,
|
||||
)]
|
||||
#[repr(u8)]
|
||||
pub enum SlruKind {
|
||||
Clog,
|
||||
Clog = 0,
|
||||
MultiXactMembers,
|
||||
MultiXactOffsets,
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ byteorder.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
rand.workspace = true
|
||||
smallvec.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -7,7 +7,8 @@ pub mod framed;
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use std::{borrow::Cow, collections::HashMap, fmt, io, str};
|
||||
use smallvec::SmallVec;
|
||||
use std::{borrow::Cow, fmt, io, ops::Range, str};
|
||||
|
||||
// re-export for use in utils pageserver_feedback.rs
|
||||
pub use postgres_protocol::PG_EPOCH;
|
||||
@@ -49,29 +50,67 @@ pub enum FeStartupPacket {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StartupMessageParams {
|
||||
params: HashMap<String, String>,
|
||||
data: String,
|
||||
pairs: SmallVec<[Range<u32>; 4]>,
|
||||
// for easy access
|
||||
user: Option<Range<u32>>,
|
||||
database: Option<Range<u32>>,
|
||||
options: Option<Range<u32>>,
|
||||
replication: Option<Range<u32>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for StartupMessageParams {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_map().entries(self.iter()).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl StartupMessageParams {
|
||||
/// Get parameter's value by its name.
|
||||
pub fn get(&self, name: &str) -> Option<&str> {
|
||||
self.params.get(name).map(|s| s.as_str())
|
||||
self.pairs
|
||||
.iter()
|
||||
.map(|r| &self.data[r.start as usize..r.end as usize])
|
||||
.find_map(|pair| pair.strip_prefix(name).and_then(|x| x.strip_prefix('\0')))
|
||||
}
|
||||
|
||||
pub fn user(&self) -> Option<&str> {
|
||||
self.user
|
||||
.clone()
|
||||
.and_then(|r| self.data.get(r.start as usize..r.end as usize))
|
||||
}
|
||||
|
||||
pub fn database(&self) -> Option<&str> {
|
||||
self.database
|
||||
.clone()
|
||||
.and_then(|r| self.data.get(r.start as usize..r.end as usize))
|
||||
}
|
||||
|
||||
pub(crate) fn options_str(&self) -> Option<&str> {
|
||||
self.options
|
||||
.clone()
|
||||
.and_then(|r| self.data.get(r.start as usize..r.end as usize))
|
||||
}
|
||||
|
||||
pub fn replication(&self) -> Option<&str> {
|
||||
self.replication
|
||||
.clone()
|
||||
.and_then(|r| self.data.get(r.start as usize..r.end as usize))
|
||||
}
|
||||
|
||||
/// Split command-line options according to PostgreSQL's logic,
|
||||
/// taking into account all escape sequences but leaving them as-is.
|
||||
/// [`None`] means that there's no `options` in [`Self`].
|
||||
pub fn options_raw(&self) -> Option<impl Iterator<Item = &str>> {
|
||||
self.get("options").map(Self::parse_options_raw)
|
||||
self.options_str().map(Self::parse_options_raw)
|
||||
}
|
||||
|
||||
/// Split command-line options according to PostgreSQL's logic,
|
||||
/// applying all escape sequences (using owned strings as needed).
|
||||
/// [`None`] means that there's no `options` in [`Self`].
|
||||
pub fn options_escaped(&self) -> Option<impl Iterator<Item = Cow<'_, str>>> {
|
||||
self.get("options").map(Self::parse_options_escaped)
|
||||
self.options_str().map(Self::parse_options_escaped)
|
||||
}
|
||||
|
||||
/// Split command-line options according to PostgreSQL's logic,
|
||||
@@ -111,15 +150,44 @@ impl StartupMessageParams {
|
||||
|
||||
/// Iterate through key-value pairs in an arbitrary order.
|
||||
pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
|
||||
self.params.iter().map(|(k, v)| (k.as_str(), v.as_str()))
|
||||
self.pairs
|
||||
.iter()
|
||||
.map(|r| &self.data[r.start as usize..r.end as usize])
|
||||
.flat_map(|pair| pair.split_once('\0'))
|
||||
}
|
||||
|
||||
// This function is mostly useful in tests.
|
||||
#[doc(hidden)]
|
||||
pub fn new<'a, const N: usize>(pairs: [(&'a str, &'a str); N]) -> Self {
|
||||
Self {
|
||||
params: pairs.map(|(k, v)| (k.to_owned(), v.to_owned())).into(),
|
||||
let mut this = Self {
|
||||
data: Default::default(),
|
||||
pairs: Default::default(),
|
||||
user: Default::default(),
|
||||
database: Default::default(),
|
||||
options: Default::default(),
|
||||
replication: Default::default(),
|
||||
};
|
||||
for (k, v) in pairs {
|
||||
let start = this.data.len();
|
||||
this.data.push_str(k);
|
||||
this.data.push('\0');
|
||||
let value_offset = this.data.len();
|
||||
this.data.push_str(v);
|
||||
let end = this.data.len();
|
||||
this.data.push('\0');
|
||||
let range = start as u32..end as u32;
|
||||
this.pairs.push(range);
|
||||
let value_range = value_offset as u32..end as u32;
|
||||
match k {
|
||||
"user" => this.user = Some(value_range),
|
||||
"database" => this.database = Some(value_range),
|
||||
"options" => this.options = Some(value_range),
|
||||
"replication" => this.replication = Some(value_range),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
this.data.push('\0');
|
||||
this
|
||||
}
|
||||
}
|
||||
|
||||
@@ -346,33 +414,62 @@ impl FeStartupPacket {
|
||||
|
||||
// Parse pairs of null-terminated strings (key, value).
|
||||
// See `postgres: ProcessStartupPacket, build_startup_packet`.
|
||||
let mut tokens = str::from_utf8(&msg)
|
||||
let data = str::from_utf8(&msg)
|
||||
.map_err(|_e| {
|
||||
ProtocolError::BadMessage("StartupMessage params: invalid utf-8".to_owned())
|
||||
})?
|
||||
.strip_suffix('\0') // drop packet's own null
|
||||
.ok_or_else(|| {
|
||||
ProtocolError::Protocol(
|
||||
.to_owned();
|
||||
|
||||
let mut params = StartupMessageParams {
|
||||
data,
|
||||
pairs: Default::default(),
|
||||
user: Default::default(),
|
||||
database: Default::default(),
|
||||
options: Default::default(),
|
||||
replication: Default::default(),
|
||||
};
|
||||
|
||||
let mut offset = 0;
|
||||
let mut rest = params.data.as_str();
|
||||
loop {
|
||||
let Some((key, rest1)) = rest.split_once('\0') else {
|
||||
return Err(ProtocolError::Protocol(
|
||||
"StartupMessage params: missing null terminator".to_string(),
|
||||
)
|
||||
})?
|
||||
.split_terminator('\0');
|
||||
));
|
||||
};
|
||||
// pairs terminated
|
||||
if key.is_empty() {
|
||||
params.data.truncate(offset + 1);
|
||||
params.data.shrink_to_fit();
|
||||
break;
|
||||
}
|
||||
let Some((value, rest2)) = rest1.split_once('\0') else {
|
||||
return Err(ProtocolError::Protocol(
|
||||
"StartupMessage params: missing null terminator".to_string(),
|
||||
));
|
||||
};
|
||||
rest = rest2;
|
||||
|
||||
let mut params = HashMap::new();
|
||||
while let Some(name) = tokens.next() {
|
||||
let value = tokens.next().ok_or_else(|| {
|
||||
ProtocolError::Protocol(
|
||||
"StartupMessage params: key without value".to_string(),
|
||||
)
|
||||
})?;
|
||||
let start = offset;
|
||||
let value_offset = offset + key.len() + 1;
|
||||
let end = value_offset + value.len();
|
||||
offset = end + 1;
|
||||
|
||||
params.insert(name.to_owned(), value.to_owned());
|
||||
params.pairs.push(start as u32..end as u32);
|
||||
let value_range = value_offset as u32..end as u32;
|
||||
match key {
|
||||
"user" => params.user = Some(value_range),
|
||||
"database" => params.database = Some(value_range),
|
||||
"options" => params.options = Some(value_range),
|
||||
"replication" => params.replication = Some(value_range),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
FeStartupPacket::StartupMessage {
|
||||
major_version,
|
||||
minor_version,
|
||||
params: StartupMessageParams { params },
|
||||
params,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -28,6 +28,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::s3_bucket::RequestKind;
|
||||
use crate::TimeTravelError;
|
||||
use crate::{
|
||||
AzureConfig, ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath,
|
||||
RemoteStorage, StorageMetadata,
|
||||
@@ -379,12 +380,10 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
_timestamp: SystemTime,
|
||||
_done_if_after: SystemTime,
|
||||
_cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), TimeTravelError> {
|
||||
// TODO use Azure point in time recovery feature for this
|
||||
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
|
||||
Err(anyhow::anyhow!(
|
||||
"time travel recovery for azure blob storage is not implemented"
|
||||
))
|
||||
Err(TimeTravelError::Unimplemented)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -219,7 +219,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
) -> Result<(), TimeTravelError>;
|
||||
}
|
||||
|
||||
pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
|
||||
@@ -269,6 +269,45 @@ impl std::fmt::Display for DownloadError {
|
||||
|
||||
impl std::error::Error for DownloadError {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TimeTravelError {
|
||||
/// Validation or other error happened due to user input.
|
||||
BadInput(anyhow::Error),
|
||||
/// The used remote storage does not have time travel recovery implemented
|
||||
Unimplemented,
|
||||
/// The number of versions/deletion markers is above our limit.
|
||||
TooManyVersions,
|
||||
/// A cancellation token aborted the process, typically during
|
||||
/// request closure or process shutdown.
|
||||
Cancelled,
|
||||
/// Other errors
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TimeTravelError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TimeTravelError::BadInput(e) => {
|
||||
write!(
|
||||
f,
|
||||
"Failed to time travel recover a prefix due to user input: {e}"
|
||||
)
|
||||
}
|
||||
TimeTravelError::Unimplemented => write!(
|
||||
f,
|
||||
"time travel recovery is not implemented for the current storage backend"
|
||||
),
|
||||
TimeTravelError::Cancelled => write!(f, "Cancelled, shutting down"),
|
||||
TimeTravelError::TooManyVersions => {
|
||||
write!(f, "Number of versions/delete markers above limit")
|
||||
}
|
||||
TimeTravelError::Other(e) => write!(f, "Failed to time travel recover a prefix: {e:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for TimeTravelError {}
|
||||
|
||||
/// Every storage, currently supported.
|
||||
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
|
||||
#[derive(Clone)]
|
||||
@@ -404,7 +443,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), TimeTravelError> {
|
||||
match self {
|
||||
Self::LocalFs(s) => {
|
||||
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
|
||||
@@ -434,7 +473,12 @@ impl GenericRemoteStorage {
|
||||
Self::LocalFs(LocalFs::new(root.clone())?)
|
||||
}
|
||||
RemoteStorageKind::AwsS3(s3_config) => {
|
||||
info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'",
|
||||
// The profile and access key id are only printed here for debugging purposes,
|
||||
// their values don't indicate the eventually taken choice for auth.
|
||||
let profile = std::env::var("AWS_PROFILE").unwrap_or_else(|_| "<none>".into());
|
||||
let access_key_id =
|
||||
std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_else(|_| "<none>".into());
|
||||
info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}', profile: {profile}, access_key_id: {access_key_id}",
|
||||
s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
|
||||
Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
|
||||
}
|
||||
|
||||
@@ -18,7 +18,9 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
|
||||
use tracing::*;
|
||||
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
|
||||
|
||||
use crate::{Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath};
|
||||
use crate::{
|
||||
Download, DownloadError, DownloadStream, Listing, ListingMode, RemotePath, TimeTravelError,
|
||||
};
|
||||
|
||||
use super::{RemoteStorage, StorageMetadata};
|
||||
|
||||
@@ -430,8 +432,8 @@ impl RemoteStorage for LocalFs {
|
||||
_timestamp: SystemTime,
|
||||
_done_if_after: SystemTime,
|
||||
_cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
) -> Result<(), TimeTravelError> {
|
||||
Err(TimeTravelError::Unimplemented)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ use utils::backoff;
|
||||
use super::StorageMetadata;
|
||||
use crate::{
|
||||
ConcurrencyLimiter, Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorage,
|
||||
S3Config, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
S3Config, TimeTravelError, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
pub(super) mod metrics;
|
||||
@@ -639,14 +639,14 @@ impl RemoteStorage for S3Bucket {
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), TimeTravelError> {
|
||||
let kind = RequestKind::TimeTravel;
|
||||
let _guard = self.permit(kind).await;
|
||||
|
||||
let timestamp = DateTime::from(timestamp);
|
||||
let done_if_after = DateTime::from(done_if_after);
|
||||
|
||||
tracing::info!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
|
||||
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
|
||||
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let prefix = prefix
|
||||
@@ -664,21 +664,21 @@ impl RemoteStorage for S3Bucket {
|
||||
loop {
|
||||
let response = backoff::retry(
|
||||
|| async {
|
||||
Ok(self
|
||||
.client
|
||||
self.client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(prefix.clone())
|
||||
.set_key_marker(key_marker.clone())
|
||||
.set_version_id_marker(version_id_marker.clone())
|
||||
.send()
|
||||
.await?)
|
||||
.await
|
||||
.map_err(|e| TimeTravelError::Other(e.into()))
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions for time_travel_recover",
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
|
||||
backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -699,7 +699,8 @@ impl RemoteStorage for S3Bucket {
|
||||
.map(VerOrDelete::from_delete_marker);
|
||||
itertools::process_results(versions.chain(deletes), |n_vds| {
|
||||
versions_and_deletes.extend(n_vds)
|
||||
})?;
|
||||
})
|
||||
.map_err(TimeTravelError::Other)?;
|
||||
fn none_if_empty(v: Option<String>) -> Option<String> {
|
||||
v.filter(|v| !v.is_empty())
|
||||
}
|
||||
@@ -708,9 +709,9 @@ impl RemoteStorage for S3Bucket {
|
||||
if version_id_marker.is_none() {
|
||||
// The final response is not supposed to be truncated
|
||||
if response.is_truncated.unwrap_or_default() {
|
||||
anyhow::bail!(
|
||||
return Err(TimeTravelError::Other(anyhow::anyhow!(
|
||||
"Received truncated ListObjectVersions response for prefix={prefix:?}"
|
||||
);
|
||||
)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -721,12 +722,15 @@ impl RemoteStorage for S3Bucket {
|
||||
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
|
||||
const COMPLEXITY_LIMIT: usize = 100_000;
|
||||
if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
|
||||
anyhow::bail!(
|
||||
"Limit for number of versions/deletions exceeded for prefix={prefix:?}"
|
||||
);
|
||||
return Err(TimeTravelError::TooManyVersions);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Built list for time travel with {} versions and deletions",
|
||||
versions_and_deletes.len()
|
||||
);
|
||||
|
||||
// Work on the list of references instead of the objects directly,
|
||||
// otherwise we get lifetime errors in the sort_by_key call below.
|
||||
let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
|
||||
@@ -740,8 +744,8 @@ impl RemoteStorage for S3Bucket {
|
||||
version_id, key, ..
|
||||
} = &vd;
|
||||
if version_id == "null" {
|
||||
anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \
|
||||
indicating either disabled versioning, or legacy objects with null version id values");
|
||||
return Err(TimeTravelError::Other(anyhow!("Received ListVersions response for key={key} with version_id='null', \
|
||||
indicating either disabled versioning, or legacy objects with null version id values")));
|
||||
}
|
||||
tracing::trace!(
|
||||
"Parsing version key={key} version_id={version_id} kind={:?}",
|
||||
@@ -788,22 +792,23 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
backoff::retry(
|
||||
|| async {
|
||||
Ok(self
|
||||
.client
|
||||
self.client
|
||||
.copy_object()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.key(key)
|
||||
.copy_source(&source_id)
|
||||
.send()
|
||||
.await?)
|
||||
.await
|
||||
.map_err(|e| TimeTravelError::Other(e.into()))
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions for time_travel_recover",
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
|
||||
"copying object version for time_travel_recover",
|
||||
backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled),
|
||||
)
|
||||
.await?;
|
||||
tracing::info!(%version_id, %key, "Copied old version in S3");
|
||||
}
|
||||
VerOrDelete {
|
||||
kind: VerOrDeleteKind::DeleteMarker,
|
||||
@@ -820,8 +825,13 @@ impl RemoteStorage for S3Bucket {
|
||||
} else {
|
||||
tracing::trace!("Deleting {key}...");
|
||||
|
||||
let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?;
|
||||
self.delete_oids(kind, &[oid]).await?;
|
||||
let oid = ObjectIdentifier::builder()
|
||||
.key(key.to_owned())
|
||||
.build()
|
||||
.map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
|
||||
self.delete_oids(kind, &[oid])
|
||||
.await
|
||||
.map_err(TimeTravelError::Other)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{
|
||||
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
|
||||
StorageMetadata,
|
||||
StorageMetadata, TimeTravelError,
|
||||
};
|
||||
|
||||
pub struct UnreliableWrapper {
|
||||
@@ -191,8 +191,9 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?;
|
||||
) -> Result<(), TimeTravelError> {
|
||||
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
|
||||
.map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;
|
||||
self.inner
|
||||
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
|
||||
.await
|
||||
|
||||
@@ -1,4 +1,10 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// Gates are a concurrency helper, primarily used for implementing safe shutdown.
|
||||
///
|
||||
@@ -6,62 +12,70 @@ use std::{sync::Arc, time::Duration};
|
||||
/// the resource calls `close()` when they want to ensure that all holders of guards
|
||||
/// have released them, and that no future guards will be issued.
|
||||
pub struct Gate {
|
||||
/// Each caller of enter() takes one unit from the semaphore. In close(), we
|
||||
/// take all the units to ensure all GateGuards are destroyed.
|
||||
sem: Arc<tokio::sync::Semaphore>,
|
||||
|
||||
/// For observability only: a name that will be used to log warnings if a particular
|
||||
/// gate is holding up shutdown
|
||||
name: String,
|
||||
inner: Arc<GateInner>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Gate {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Gate<{}>", self.name)
|
||||
f.debug_struct("Gate")
|
||||
// use this for identification
|
||||
.field("ptr", &Arc::as_ptr(&self.inner))
|
||||
.field("inner", &self.inner)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
struct GateInner {
|
||||
sem: tokio::sync::Semaphore,
|
||||
closing: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for GateInner {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let avail = self.sem.available_permits();
|
||||
|
||||
let guards = u32::try_from(avail)
|
||||
.ok()
|
||||
// the sem only supports 32-bit ish amount, but lets play it safe
|
||||
.and_then(|x| Gate::MAX_UNITS.checked_sub(x));
|
||||
|
||||
let closing = self.closing.load(Ordering::Relaxed);
|
||||
|
||||
if let Some(guards) = guards {
|
||||
f.debug_struct("Gate")
|
||||
.field("remaining_guards", &guards)
|
||||
.field("closing", &closing)
|
||||
.finish()
|
||||
} else {
|
||||
f.debug_struct("Gate")
|
||||
.field("avail_permits", &avail)
|
||||
.field("closing", &closing)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
|
||||
/// not complete.
|
||||
#[derive(Debug)]
|
||||
pub struct GateGuard(tokio::sync::OwnedSemaphorePermit);
|
||||
pub struct GateGuard {
|
||||
// Record the span where the gate was entered, so that we can identify who was blocking Gate::close
|
||||
span_at_enter: tracing::Span,
|
||||
gate: Arc<GateInner>,
|
||||
}
|
||||
|
||||
/// Observability helper: every `warn_period`, emit a log warning that we're still waiting on this gate
|
||||
async fn warn_if_stuck<Fut: std::future::Future>(
|
||||
fut: Fut,
|
||||
name: &str,
|
||||
warn_period: std::time::Duration,
|
||||
) -> <Fut as std::future::Future>::Output {
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
let mut fut = std::pin::pin!(fut);
|
||||
|
||||
let mut warned = false;
|
||||
let ret = loop {
|
||||
match tokio::time::timeout(warn_period, &mut fut).await {
|
||||
Ok(ret) => break ret,
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
gate = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"still waiting, taking longer than expected..."
|
||||
);
|
||||
warned = true;
|
||||
}
|
||||
impl Drop for GateGuard {
|
||||
fn drop(&mut self) {
|
||||
if self.gate.closing.load(Ordering::Relaxed) {
|
||||
self.span_at_enter.in_scope(
|
||||
|| tracing::info!(gate = ?Arc::as_ptr(&self.gate), "kept the gate from closing"),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
// If we emitted a warning for slowness, also emit a message when we complete, so that
|
||||
// someone debugging a shutdown can know for sure whether we have moved past this operation.
|
||||
if warned {
|
||||
tracing::info!(
|
||||
gate = name,
|
||||
elapsed_ms = started.elapsed().as_millis(),
|
||||
"completed, after taking longer than expected"
|
||||
)
|
||||
// when the permit was acquired, it was forgotten to allow us to manage it's lifecycle
|
||||
// manually, so "return" the permit now.
|
||||
self.gate.sem.add_permits(1);
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -69,15 +83,19 @@ pub enum GateError {
|
||||
GateClosed,
|
||||
}
|
||||
|
||||
impl Gate {
|
||||
const MAX_UNITS: u32 = u32::MAX;
|
||||
|
||||
pub fn new(name: String) -> Self {
|
||||
impl Default for Gate {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
sem: Arc::new(tokio::sync::Semaphore::new(Self::MAX_UNITS as usize)),
|
||||
name,
|
||||
inner: Arc::new(GateInner {
|
||||
sem: tokio::sync::Semaphore::new(Self::MAX_UNITS as usize),
|
||||
closing: AtomicBool::new(false),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Gate {
|
||||
const MAX_UNITS: u32 = u32::MAX;
|
||||
|
||||
/// Acquire a guard that will prevent close() calls from completing. If close()
|
||||
/// was already called, this will return an error which should be interpreted
|
||||
@@ -88,11 +106,23 @@ impl Gate {
|
||||
/// to avoid blocking close() indefinitely: typically types that contain a Gate will
|
||||
/// also contain a CancellationToken.
|
||||
pub fn enter(&self) -> Result<GateGuard, GateError> {
|
||||
self.sem
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
.map(GateGuard)
|
||||
.map_err(|_| GateError::GateClosed)
|
||||
let permit = self
|
||||
.inner
|
||||
.sem
|
||||
.try_acquire()
|
||||
.map_err(|_| GateError::GateClosed)?;
|
||||
|
||||
// we now have the permit, let's disable the normal raii functionality and leave
|
||||
// "returning" the permit to our GateGuard::drop.
|
||||
//
|
||||
// this is done to avoid the need for multiple Arcs (one for semaphore, next for other
|
||||
// fields).
|
||||
permit.forget();
|
||||
|
||||
Ok(GateGuard {
|
||||
span_at_enter: tracing::Span::current(),
|
||||
gate: self.inner.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Types with a shutdown() method and a gate should call this method at the
|
||||
@@ -102,48 +132,88 @@ impl Gate {
|
||||
/// important that the holders of such guards are respecting a CancellationToken which has
|
||||
/// been cancelled before entering this function.
|
||||
pub async fn close(&self) {
|
||||
warn_if_stuck(self.do_close(), &self.name, Duration::from_millis(1000)).await
|
||||
let started_at = std::time::Instant::now();
|
||||
let mut do_close = std::pin::pin!(self.do_close());
|
||||
|
||||
let nag_after = Duration::from_secs(1);
|
||||
|
||||
let Err(_timeout) = tokio::time::timeout(nag_after, &mut do_close).await else {
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
gate = ?self.as_ptr(),
|
||||
elapsed_ms = started_at.elapsed().as_millis(),
|
||||
"closing is taking longer than expected"
|
||||
);
|
||||
|
||||
// close operation is not trying to be cancellation safe as pageserver does not need it.
|
||||
//
|
||||
// note: "closing" is not checked in Gate::enter -- it exists just for observability,
|
||||
// dropping of GateGuard after this will log who they were.
|
||||
self.inner.closing.store(true, Ordering::Relaxed);
|
||||
|
||||
do_close.await;
|
||||
|
||||
tracing::info!(
|
||||
gate = ?self.as_ptr(),
|
||||
elapsed_ms = started_at.elapsed().as_millis(),
|
||||
"close completed"
|
||||
);
|
||||
}
|
||||
|
||||
/// Used as an identity of a gate. This identity will be resolved to something useful when
|
||||
/// it's actually closed in a hopefully sensible `tracing::Span` which will describe it even
|
||||
/// more.
|
||||
///
|
||||
/// `GateGuard::drop` also logs this pointer when it has realized it has been keeping the gate
|
||||
/// open for too long.
|
||||
fn as_ptr(&self) -> *const GateInner {
|
||||
Arc::as_ptr(&self.inner)
|
||||
}
|
||||
|
||||
/// Check if [`Self::close()`] has finished waiting for all [`Self::enter()`] users to finish. This
|
||||
/// is usually analoguous for "Did shutdown finish?" for types that include a Gate, whereas checking
|
||||
/// the CancellationToken on such types is analogous to "Did shutdown start?"
|
||||
pub fn close_complete(&self) -> bool {
|
||||
self.sem.is_closed()
|
||||
self.inner.sem.is_closed()
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(gate = ?self.as_ptr()))]
|
||||
async fn do_close(&self) {
|
||||
tracing::debug!(gate = self.name, "Closing Gate...");
|
||||
match self.sem.acquire_many(Self::MAX_UNITS).await {
|
||||
Ok(_units) => {
|
||||
tracing::debug!("Closing Gate...");
|
||||
|
||||
match self.inner.sem.acquire_many(Self::MAX_UNITS).await {
|
||||
Ok(_permit) => {
|
||||
// While holding all units, close the semaphore. All subsequent calls to enter() will fail.
|
||||
self.sem.close();
|
||||
self.inner.sem.close();
|
||||
}
|
||||
Err(_) => {
|
||||
Err(_closed) => {
|
||||
// Semaphore closed: we are the only function that can do this, so it indicates a double-call.
|
||||
// This is legal. Timeline::shutdown for example is not protected from being called more than
|
||||
// once.
|
||||
tracing::debug!(gate = self.name, "Double close")
|
||||
tracing::debug!("Double close")
|
||||
}
|
||||
}
|
||||
tracing::debug!(gate = self.name, "Closed Gate.")
|
||||
tracing::debug!("Closed Gate.")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::FutureExt;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_idle_gate() {
|
||||
// Having taken no gates, we should not be blocked in close
|
||||
let gate = Gate::new("test".to_string());
|
||||
async fn close_unused() {
|
||||
// Having taken no guards, we should not be blocked in close
|
||||
let gate = Gate::default();
|
||||
gate.close().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn close_idle() {
|
||||
// If a guard is dropped before entering, close should not be blocked
|
||||
let gate = Gate::new("test".to_string());
|
||||
let gate = Gate::default();
|
||||
let guard = gate.enter().unwrap();
|
||||
drop(guard);
|
||||
gate.close().await;
|
||||
@@ -152,25 +222,30 @@ mod tests {
|
||||
gate.enter().expect_err("enter should fail after close");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_busy_gate() {
|
||||
let gate = Gate::new("test".to_string());
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn close_busy_gate() {
|
||||
let gate = Gate::default();
|
||||
let forever = Duration::from_secs(24 * 7 * 365);
|
||||
|
||||
let guard = gate.enter().unwrap();
|
||||
let guard =
|
||||
tracing::info_span!("i am holding back the gate").in_scope(|| gate.enter().unwrap());
|
||||
|
||||
let mut close_fut = std::pin::pin!(gate.close());
|
||||
|
||||
// Close should be blocked
|
||||
assert!(close_fut.as_mut().now_or_never().is_none());
|
||||
// Close should be waiting for guards to drop
|
||||
tokio::time::timeout(forever, &mut close_fut)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
// Attempting to enter() should fail, even though close isn't done yet.
|
||||
gate.enter()
|
||||
.expect_err("enter should fail after entering close");
|
||||
|
||||
// this will now log, which we cannot verify except manually
|
||||
drop(guard);
|
||||
|
||||
// Guard is gone, close should finish
|
||||
assert!(close_fut.as_mut().now_or_never().is_some());
|
||||
close_fut.await;
|
||||
|
||||
// Attempting to enter() is still forbidden
|
||||
gate.enter().expect_err("enter should fail finishing close");
|
||||
|
||||
@@ -21,7 +21,6 @@ camino.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true, features = ["string"] }
|
||||
close_fds.workspace = true
|
||||
const_format.workspace = true
|
||||
consumption_metrics.workspace = true
|
||||
crc32c.workspace = true
|
||||
|
||||
@@ -156,7 +156,8 @@ impl PagestreamClient {
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_) => {
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
|
||||
@@ -66,13 +66,10 @@ impl serde::Serialize for LatencyPercentiles {
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
|
||||
for p in LATENCY_PERCENTILES {
|
||||
for (p, v) in LATENCY_PERCENTILES.iter().zip(&self.latency_percentiles) {
|
||||
ser.serialize_entry(
|
||||
&format!("p{p}"),
|
||||
&format!(
|
||||
"{}",
|
||||
&humantime::format_duration(self.latency_percentiles[0])
|
||||
),
|
||||
&format!("{}", humantime::format_duration(*v)),
|
||||
)?;
|
||||
}
|
||||
ser.end()
|
||||
|
||||
@@ -222,6 +222,8 @@ where
|
||||
async fn send_tarball(mut self) -> anyhow::Result<()> {
|
||||
// TODO include checksum
|
||||
|
||||
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
|
||||
|
||||
// Create pgdata subdirs structure
|
||||
for dir in PGDATA_SUBDIRS.iter() {
|
||||
let header = new_tar_header_dir(dir)?;
|
||||
@@ -248,29 +250,29 @@ where
|
||||
.context("could not add config file to basebackup tarball")?;
|
||||
}
|
||||
}
|
||||
|
||||
// Gather non-relational files from object storage pages.
|
||||
let slru_partitions = self
|
||||
.timeline
|
||||
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
|
||||
.await?
|
||||
.partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
|
||||
|
||||
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
|
||||
|
||||
for part in slru_partitions.parts {
|
||||
let blocks = self
|
||||
if !lazy_slru_download {
|
||||
// Gather non-relational files from object storage pages.
|
||||
let slru_partitions = self
|
||||
.timeline
|
||||
.get_vectored(&part.ranges, self.lsn, self.ctx)
|
||||
.await?;
|
||||
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
|
||||
.await?
|
||||
.partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
|
||||
|
||||
for (key, block) in blocks {
|
||||
slru_builder.add_block(&key, block?).await?;
|
||||
let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
|
||||
|
||||
for part in slru_partitions.parts {
|
||||
let blocks = self
|
||||
.timeline
|
||||
.get_vectored(&part.ranges, self.lsn, self.ctx)
|
||||
.await?;
|
||||
|
||||
for (key, block) in blocks {
|
||||
slru_builder.add_block(&key, block?).await?;
|
||||
}
|
||||
}
|
||||
slru_builder.finish().await?;
|
||||
}
|
||||
|
||||
slru_builder.finish().await?;
|
||||
|
||||
let mut min_restart_lsn: Lsn = Lsn::MAX;
|
||||
// Create tablespace directories
|
||||
for ((spcnode, dbnode), has_relmap_file) in
|
||||
|
||||
@@ -33,12 +33,10 @@ use pageserver::{
|
||||
use postgres_backend::AuthType;
|
||||
use utils::failpoint_support;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
use utils::signals::ShutdownSignals;
|
||||
use utils::{
|
||||
auth::{JwtAuth, SwappableJwtAuth},
|
||||
logging, project_build_tag, project_git_version,
|
||||
sentry_init::init_sentry,
|
||||
signals::Signal,
|
||||
tcp_listener,
|
||||
};
|
||||
|
||||
@@ -656,34 +654,42 @@ fn start_pageserver(
|
||||
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
ShutdownSignals::handle(|signal| match signal {
|
||||
Signal::Quit => {
|
||||
info!(
|
||||
"Got {}. Terminating in immediate shutdown mode",
|
||||
signal.name()
|
||||
);
|
||||
std::process::exit(111);
|
||||
}
|
||||
{
|
||||
use signal_hook::consts::*;
|
||||
let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let mut signals =
|
||||
signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap();
|
||||
return signals
|
||||
.forever()
|
||||
.next()
|
||||
.expect("forever() never returns None unless explicitly closed");
|
||||
});
|
||||
let signal = BACKGROUND_RUNTIME
|
||||
.block_on(signal_handler)
|
||||
.expect("join error");
|
||||
match signal {
|
||||
SIGQUIT => {
|
||||
info!("Got signal {signal}. Terminating in immediate shutdown mode",);
|
||||
std::process::exit(111);
|
||||
}
|
||||
SIGINT | SIGTERM => {
|
||||
info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",);
|
||||
|
||||
Signal::Interrupt | Signal::Terminate => {
|
||||
info!(
|
||||
"Got {}. Terminating gracefully in fast shutdown mode",
|
||||
signal.name()
|
||||
);
|
||||
|
||||
// This cancels the `shutdown_pageserver` cancellation tree.
|
||||
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
|
||||
// The plan is to change that over time.
|
||||
shutdown_pageserver.take();
|
||||
let bg_remote_storage = remote_storage.clone();
|
||||
let bg_deletion_queue = deletion_queue.clone();
|
||||
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
|
||||
bg_remote_storage.map(|_| bg_deletion_queue),
|
||||
0,
|
||||
));
|
||||
unreachable!()
|
||||
// This cancels the `shutdown_pageserver` cancellation tree.
|
||||
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
|
||||
// The plan is to change that over time.
|
||||
shutdown_pageserver.take();
|
||||
let bg_remote_storage = remote_storage.clone();
|
||||
let bg_deletion_queue = deletion_queue.clone();
|
||||
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(
|
||||
bg_remote_storage.map(|_| bg_deletion_queue),
|
||||
0,
|
||||
));
|
||||
unreachable!()
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn create_remote_storage_client(
|
||||
|
||||
@@ -178,6 +178,64 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/time_travel_remote_storage:
|
||||
parameters:
|
||||
- name: tenant_id
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
- name: travel_to
|
||||
in: query
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: date-time
|
||||
- name: done_if_after
|
||||
in: query
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: date-time
|
||||
put:
|
||||
description: Time travel the tenant's remote storage
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: string
|
||||
"400":
|
||||
description: Error when no tenant id found in path or invalid timestamp
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"401":
|
||||
description: Unauthorized Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/UnauthorizedError"
|
||||
"403":
|
||||
description: Forbidden Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ForbiddenError"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline:
|
||||
parameters:
|
||||
@@ -1443,7 +1501,8 @@ components:
|
||||
node_id:
|
||||
description: Pageserver node ID where this shard is attached
|
||||
type: integer
|
||||
shard_id: Tenant shard ID of the shard
|
||||
shard_id:
|
||||
description: Tenant shard ID of the shard
|
||||
type: string
|
||||
SecondaryConfig:
|
||||
type: object
|
||||
|
||||
@@ -26,6 +26,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::TimeTravelError;
|
||||
use tenant_size_model::{SizeResult, StorageModel};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -47,6 +48,7 @@ use crate::tenant::mgr::{
|
||||
TenantSlotError, TenantSlotUpsertError, TenantStateError,
|
||||
};
|
||||
use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
|
||||
use crate::tenant::remote_timeline_client;
|
||||
use crate::tenant::secondary::SecondaryController;
|
||||
use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
@@ -77,8 +79,14 @@ use utils::{
|
||||
// For APIs that require an Active tenant, how long should we block waiting for that state?
|
||||
// This is not functionally necessary (clients will retry), but avoids generating a lot of
|
||||
// failed API calls while tenants are activating.
|
||||
#[cfg(not(feature = "testing"))]
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
|
||||
// Tests run on slow/oversubscribed nodes, and may need to wait much longer for tenants to
|
||||
// finish attaching, if calls to remote storage are slow.
|
||||
#[cfg(feature = "testing")]
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
|
||||
pub struct State {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
@@ -959,6 +967,7 @@ async fn tenant_status(
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: tenant.generation().into(),
|
||||
},
|
||||
walredo: tenant.wal_redo_manager_status(),
|
||||
timelines: tenant.list_timeline_ids(),
|
||||
})
|
||||
}
|
||||
@@ -1423,6 +1432,79 @@ async fn list_location_config_handler(
|
||||
json_response(StatusCode::OK, result)
|
||||
}
|
||||
|
||||
// Do a time travel recovery on the given tenant/tenant shard. Tenant needs to be detached
|
||||
// (from all pageservers) as it invalidates consistency assumptions.
|
||||
async fn tenant_time_travel_remote_storage_handler(
|
||||
request: Request<Body>,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let timestamp_raw = must_get_query_param(&request, "travel_to")?;
|
||||
let timestamp = humantime::parse_rfc3339(×tamp_raw)
|
||||
.with_context(|| format!("Invalid time for travel_to: {timestamp_raw:?}"))
|
||||
.map_err(ApiError::BadRequest)?;
|
||||
|
||||
let done_if_after_raw = must_get_query_param(&request, "done_if_after")?;
|
||||
let done_if_after = humantime::parse_rfc3339(&done_if_after_raw)
|
||||
.with_context(|| format!("Invalid time for done_if_after: {done_if_after_raw:?}"))
|
||||
.map_err(ApiError::BadRequest)?;
|
||||
|
||||
// This is just a sanity check to fend off naive wrong usages of the API:
|
||||
// the tenant needs to be detached *everywhere*
|
||||
let state = get_state(&request);
|
||||
let we_manage_tenant = state.tenant_manager.manages_tenant_shard(tenant_shard_id);
|
||||
if we_manage_tenant {
|
||||
return Err(ApiError::BadRequest(anyhow!(
|
||||
"Tenant {tenant_shard_id} is already attached at this pageserver"
|
||||
)));
|
||||
}
|
||||
|
||||
let Some(storage) = state.remote_storage.as_ref() else {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"remote storage not configured, cannot run time travel"
|
||||
)));
|
||||
};
|
||||
|
||||
if timestamp > done_if_after {
|
||||
return Err(ApiError::BadRequest(anyhow!(
|
||||
"The done_if_after timestamp comes before the timestamp to recover to"
|
||||
)));
|
||||
}
|
||||
|
||||
tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
|
||||
|
||||
remote_timeline_client::upload::time_travel_recover_tenant(
|
||||
storage,
|
||||
&tenant_shard_id,
|
||||
timestamp,
|
||||
done_if_after,
|
||||
&cancel,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
TimeTravelError::BadInput(e) => {
|
||||
warn!("bad input error: {e}");
|
||||
ApiError::BadRequest(anyhow!("bad input error"))
|
||||
}
|
||||
TimeTravelError::Unimplemented => {
|
||||
ApiError::BadRequest(anyhow!("unimplemented for the configured remote storage"))
|
||||
}
|
||||
TimeTravelError::Cancelled => ApiError::InternalServerError(anyhow!("cancelled")),
|
||||
TimeTravelError::TooManyVersions => {
|
||||
ApiError::InternalServerError(anyhow!("too many versions in remote storage"))
|
||||
}
|
||||
TimeTravelError::Other(e) => {
|
||||
warn!("internal error: {e}");
|
||||
ApiError::InternalServerError(anyhow!("internal error"))
|
||||
}
|
||||
})?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`].
|
||||
async fn handle_tenant_break(
|
||||
r: Request<Body>,
|
||||
@@ -1968,6 +2050,10 @@ pub fn make_router(
|
||||
.get("/v1/location_config", |r| {
|
||||
api_handler(r, list_location_config_handler)
|
||||
})
|
||||
.put(
|
||||
"/v1/tenant/:tenant_shard_id/time_travel_remote_storage",
|
||||
|r| api_handler(r, tenant_time_travel_remote_storage_handler),
|
||||
)
|
||||
.get("/v1/tenant/:tenant_shard_id/timeline", |r| {
|
||||
api_handler(r, timeline_list_handler)
|
||||
})
|
||||
|
||||
@@ -1043,6 +1043,7 @@ pub enum SmgrQueryType {
|
||||
GetRelSize,
|
||||
GetPageAtLsn,
|
||||
GetDbSize,
|
||||
GetSlruSegment,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -1159,11 +1160,12 @@ mod smgr_query_time_tests {
|
||||
#[test]
|
||||
fn op_label_name() {
|
||||
use super::SmgrQueryType::*;
|
||||
let expect: [(super::SmgrQueryType, &'static str); 4] = [
|
||||
let expect: [(super::SmgrQueryType, &'static str); 5] = [
|
||||
(GetRelExists, "get_rel_exists"),
|
||||
(GetRelSize, "get_rel_size"),
|
||||
(GetPageAtLsn, "get_page_at_lsn"),
|
||||
(GetDbSize, "get_db_size"),
|
||||
(GetSlruSegment, "get_slru_segment"),
|
||||
];
|
||||
for (op, expect) in expect {
|
||||
let actual: &'static str = op.into();
|
||||
@@ -1649,11 +1651,18 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
#[rustfmt::skip]
|
||||
pub(crate) static WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wal_redo_process_launch_duration",
|
||||
"Histogram of the duration of successful WalRedoProcess::launch calls",
|
||||
redo_histogram_time_buckets!(),
|
||||
vec![
|
||||
0.0002, 0.0004, 0.0006, 0.0008, 0.0010,
|
||||
0.0020, 0.0040, 0.0060, 0.0080, 0.0100,
|
||||
0.0200, 0.0400, 0.0600, 0.0800, 0.1000,
|
||||
0.2000, 0.4000, 0.6000, 0.8000, 1.0000,
|
||||
1.5000, 2.0000, 2.5000, 3.0000, 4.0000, 10.0000
|
||||
],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
@@ -22,7 +22,8 @@ use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
|
||||
PagestreamNblocksRequest, PagestreamNblocksResponse,
|
||||
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
|
||||
PagestreamNblocksResponse,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber};
|
||||
@@ -74,8 +75,8 @@ use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::trace::Tracer;
|
||||
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
@@ -647,6 +648,15 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetSlruSegment(req) => {
|
||||
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.lsn);
|
||||
(
|
||||
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
match response {
|
||||
@@ -1137,6 +1147,33 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
async fn handle_get_slru_segment_request(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetSlruSegmentRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
|
||||
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetSlruSegment);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let kind = SlruKind::from_repr(req.kind)
|
||||
.ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
|
||||
let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetSlruSegment(
|
||||
PagestreamGetSlruSegmentResponse { segment },
|
||||
))
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
#[instrument(skip_all, fields(?lsn, ?prev_lsn, %full_backup))]
|
||||
async fn handle_basebackup_request<IO>(
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::repository::*;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
|
||||
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||
@@ -321,6 +321,27 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the whole SLRU segment
|
||||
pub(crate) async fn get_slru_segment(
|
||||
&self,
|
||||
kind: SlruKind,
|
||||
segno: u32,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
let n_blocks = self
|
||||
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
|
||||
.await?;
|
||||
let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
|
||||
for blkno in 0..n_blocks {
|
||||
let block = self
|
||||
.get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx)
|
||||
.await?;
|
||||
segment.extend_from_slice(&block[..BLCKSZ as usize]);
|
||||
}
|
||||
Ok(segment.freeze())
|
||||
}
|
||||
|
||||
/// Look up given SLRU page version.
|
||||
pub(crate) async fn get_slru_page_at_lsn(
|
||||
&self,
|
||||
|
||||
@@ -20,6 +20,7 @@ use futures::FutureExt;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::models;
|
||||
use pageserver_api::models::TimelineState;
|
||||
use pageserver_api::models::WalRedoManagerStatus;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::DownloadError;
|
||||
@@ -364,6 +365,14 @@ impl WalRedoManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
|
||||
match self {
|
||||
WalRedoManager::Prod(m) => m.status(),
|
||||
#[cfg(test)]
|
||||
WalRedoManager::Test(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
|
||||
@@ -1020,6 +1029,7 @@ impl Tenant {
|
||||
Some(remote_timeline_client),
|
||||
self.deletion_queue_client.clone(),
|
||||
)
|
||||
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
|
||||
.await
|
||||
.context("resume_deletion")
|
||||
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
|
||||
@@ -1956,6 +1966,10 @@ impl Tenant {
|
||||
self.generation
|
||||
}
|
||||
|
||||
pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
|
||||
self.walredo_mgr.status()
|
||||
}
|
||||
|
||||
/// Changes tenant status to active, unless shutdown was already requested.
|
||||
///
|
||||
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
|
||||
@@ -2093,7 +2107,10 @@ impl Tenant {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
timelines.values().for_each(|timeline| {
|
||||
let timeline = Arc::clone(timeline);
|
||||
let span = Span::current();
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
let span =
|
||||
tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
|
||||
js.spawn(async move {
|
||||
if freeze_and_flush {
|
||||
timeline.flush_and_shutdown().instrument(span).await
|
||||
@@ -2693,7 +2710,7 @@ impl Tenant {
|
||||
activate_now_sem: tokio::sync::Semaphore::new(0),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
|
||||
gate: Gate::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3903,6 +3920,7 @@ pub(crate) mod harness {
|
||||
),
|
||||
gc_feedback: Some(tenant_conf.gc_feedback),
|
||||
heatmap_period: Some(tenant_conf.heatmap_period),
|
||||
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5225,7 +5243,7 @@ mod tests {
|
||||
let raw_tline = tline.raw_timeline().unwrap();
|
||||
raw_tline
|
||||
.shutdown()
|
||||
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id))
|
||||
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, timeline_id=%TIMELINE_ID))
|
||||
.await;
|
||||
std::mem::forget(tline);
|
||||
}
|
||||
|
||||
@@ -345,6 +345,9 @@ pub struct TenantConf {
|
||||
/// may be disabled if a Tenant will not have secondary locations: only secondary
|
||||
/// locations will use the heatmap uploaded by attached locations.
|
||||
pub heatmap_period: Duration,
|
||||
|
||||
/// If true then SLRU segments are dowloaded on demand, if false SLRU segments are included in basebackup
|
||||
pub lazy_slru_download: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -430,6 +433,10 @@ pub struct TenantConfOpt {
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub heatmap_period: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub lazy_slru_download: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -475,6 +482,9 @@ impl TenantConfOpt {
|
||||
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
|
||||
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
|
||||
heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
|
||||
lazy_slru_download: self
|
||||
.lazy_slru_download
|
||||
.unwrap_or(global_conf.lazy_slru_download),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -513,6 +523,7 @@ impl Default for TenantConf {
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
gc_feedback: false,
|
||||
heatmap_period: Duration::ZERO,
|
||||
lazy_slru_download: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -584,6 +595,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
.map(humantime),
|
||||
gc_feedback: value.gc_feedback,
|
||||
heatmap_period: value.heatmap_period.map(humantime),
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,7 +136,11 @@ async fn schedule_ordered_timeline_deletions(
|
||||
let mut already_running_deletions = vec![];
|
||||
|
||||
for (timeline_id, _) in sorted.into_iter().rev() {
|
||||
if let Err(e) = DeleteTimelineFlow::run(tenant, timeline_id, true).await {
|
||||
let span = tracing::info_span!("timeline_delete", %timeline_id);
|
||||
let res = DeleteTimelineFlow::run(tenant, timeline_id, true)
|
||||
.instrument(span)
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
match e {
|
||||
DeleteTimelineError::NotFound => {
|
||||
// Timeline deletion finished after call to clone above but before call
|
||||
|
||||
@@ -898,6 +898,17 @@ impl TenantManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether the `TenantManager` is responsible for the tenant shard
|
||||
pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
|
||||
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
peek_slot.is_some()
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
|
||||
pub(crate) async fn upsert_location(
|
||||
&self,
|
||||
@@ -1311,6 +1322,7 @@ impl TenantManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
activation_timeout: Duration,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
super::span::debug_assert_current_span_has_tenant_id();
|
||||
// We acquire a SlotGuard during this function to protect against concurrent
|
||||
// changes while the ::prepare phase of DeleteTenantFlow executes, but then
|
||||
// have to return the Tenant to the map while the background deletion runs.
|
||||
|
||||
@@ -1719,6 +1719,11 @@ pub fn remote_timelines_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
fn remote_timelines_path_unsharded(tenant_id: &TenantId) -> RemotePath {
|
||||
let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}");
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_timeline_path(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
|
||||
@@ -5,9 +5,11 @@ use camino::Utf8Path;
|
||||
use fail::fail_point;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::io::{ErrorKind, SeekFrom};
|
||||
use std::time::SystemTime;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff;
|
||||
|
||||
use super::Generation;
|
||||
use crate::{
|
||||
@@ -17,7 +19,7 @@ use crate::{
|
||||
remote_initdb_preserved_archive_path, remote_path, upload_cancellable,
|
||||
},
|
||||
};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::{GenericRemoteStorage, TimeTravelError};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::index::LayerFileMetadata;
|
||||
@@ -157,3 +159,45 @@ pub(crate) async fn preserve_initdb_archive(
|
||||
.await
|
||||
.with_context(|| format!("backing up initdb archive for '{tenant_id} / {timeline_id}'"))
|
||||
}
|
||||
|
||||
pub(crate) async fn time_travel_recover_tenant(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), TimeTravelError> {
|
||||
let warn_after = 3;
|
||||
let max_attempts = 10;
|
||||
let mut prefixes = Vec::with_capacity(2);
|
||||
if tenant_shard_id.is_zero() {
|
||||
// Also recover the unsharded prefix for a shard of zero:
|
||||
// - if the tenant is totally unsharded, the unsharded prefix contains all the data
|
||||
// - if the tenant is sharded, we still want to recover the initdb data, but we only
|
||||
// want to do it once, so let's do it on the 0 shard
|
||||
let timelines_path_unsharded =
|
||||
super::remote_timelines_path_unsharded(&tenant_shard_id.tenant_id);
|
||||
prefixes.push(timelines_path_unsharded);
|
||||
}
|
||||
if !tenant_shard_id.is_unsharded() {
|
||||
// If the tenant is sharded, we need to recover the sharded prefix
|
||||
let timelines_path = super::remote_timelines_path(tenant_shard_id);
|
||||
prefixes.push(timelines_path);
|
||||
}
|
||||
for prefix in &prefixes {
|
||||
backoff::retry(
|
||||
|| async {
|
||||
storage
|
||||
.time_travel_recover(Some(prefix), timestamp, done_if_after, cancel.clone())
|
||||
.await
|
||||
},
|
||||
|e| !matches!(e, TimeTravelError::Other(_)),
|
||||
warn_after,
|
||||
max_attempts,
|
||||
"time travel recovery of tenant prefix",
|
||||
backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -112,7 +112,7 @@ impl SecondaryTenant {
|
||||
// on shutdown we walk the tenants and fire their
|
||||
// individual cancellations?
|
||||
cancel: CancellationToken::new(),
|
||||
gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")),
|
||||
gate: Gate::default(),
|
||||
|
||||
shard_identity,
|
||||
tenant_conf: std::sync::Mutex::new(tenant_conf),
|
||||
|
||||
@@ -884,7 +884,7 @@ impl DeltaLayerInner {
|
||||
|
||||
let keys = self.load_keys(ctx).await?;
|
||||
|
||||
async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
|
||||
async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
|
||||
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
|
||||
let val = Value::des(&buf)?;
|
||||
let desc = match val {
|
||||
@@ -906,13 +906,32 @@ impl DeltaLayerInner {
|
||||
|
||||
for entry in keys {
|
||||
let DeltaEntry { key, lsn, val, .. } = entry;
|
||||
let desc = match dump_blob(val, ctx).await {
|
||||
let desc = match dump_blob(&val, ctx).await {
|
||||
Ok(desc) => desc,
|
||||
Err(err) => {
|
||||
format!("ERROR: {err}")
|
||||
}
|
||||
};
|
||||
println!(" key {key} at {lsn}: {desc}");
|
||||
|
||||
// Print more details about CHECKPOINT records. Would be nice to print details
|
||||
// of many other record types too, but these are particularly interesting, as
|
||||
// have a lot of special processing for them in walingest.rs.
|
||||
use pageserver_api::key::CHECKPOINT_KEY;
|
||||
use postgres_ffi::CheckPoint;
|
||||
if key == CHECKPOINT_KEY {
|
||||
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
|
||||
let val = Value::des(&buf)?;
|
||||
match val {
|
||||
Value::Image(img) => {
|
||||
let checkpoint = CheckPoint::decode(&img)?;
|
||||
println!(" CHECKPOINT: {:?}", checkpoint);
|
||||
}
|
||||
Value::WalRecord(_rec) => {
|
||||
println!(" unexpected walrecord value for checkpoint key");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -124,7 +124,7 @@ pub(super) enum FlushLoopState {
|
||||
|
||||
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Hole {
|
||||
pub(crate) struct Hole {
|
||||
key_range: Range<Key>,
|
||||
coverage_size: usize,
|
||||
}
|
||||
@@ -457,6 +457,21 @@ pub(crate) enum GetVectoredError {
|
||||
InvalidLsn(Lsn),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum GetReadyAncestorError {
|
||||
#[error("ancestor timeline {0} is being stopped")]
|
||||
AncestorStopping(TimelineId),
|
||||
|
||||
#[error("Ancestor LSN wait error: {0}")]
|
||||
AncestorLsnTimeout(#[from] WaitLsnError),
|
||||
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum LogicalSizeCalculationCause {
|
||||
Initial,
|
||||
@@ -535,22 +550,34 @@ impl From<GetVectoredError> for CreateImageLayersError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetReadyAncestorError> for PageReconstructError {
|
||||
fn from(e: GetReadyAncestorError) -> Self {
|
||||
use GetReadyAncestorError::*;
|
||||
match e {
|
||||
AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
|
||||
AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
|
||||
Cancelled => PageReconstructError::Cancelled,
|
||||
Other(other) => PageReconstructError::Other(other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
pub fn get_ancestor_lsn(&self) -> Lsn {
|
||||
pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
|
||||
self.ancestor_lsn
|
||||
}
|
||||
|
||||
/// Get the ancestor's timeline id
|
||||
pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
|
||||
pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
|
||||
self.ancestor_timeline
|
||||
.as_ref()
|
||||
.map(|ancestor| ancestor.timeline_id)
|
||||
}
|
||||
|
||||
/// Lock and get timeline's GC cutoff
|
||||
pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
|
||||
self.latest_gc_cutoff_lsn.read()
|
||||
}
|
||||
|
||||
@@ -706,27 +733,27 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
|
||||
pub fn get_last_record_lsn(&self) -> Lsn {
|
||||
pub(crate) fn get_last_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().last
|
||||
}
|
||||
|
||||
pub fn get_prev_record_lsn(&self) -> Lsn {
|
||||
pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
|
||||
self.last_record_lsn.load().prev
|
||||
}
|
||||
|
||||
/// Atomically get both last and prev.
|
||||
pub fn get_last_record_rlsn(&self) -> RecordLsn {
|
||||
pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
|
||||
self.last_record_lsn.load()
|
||||
}
|
||||
|
||||
pub fn get_disk_consistent_lsn(&self) -> Lsn {
|
||||
pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
|
||||
self.disk_consistent_lsn.load()
|
||||
}
|
||||
|
||||
/// remote_consistent_lsn from the perspective of the tenant's current generation,
|
||||
/// not validated with control plane yet.
|
||||
/// See [`Self::get_remote_consistent_lsn_visible`].
|
||||
pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.remote_consistent_lsn_projected()
|
||||
} else {
|
||||
@@ -737,7 +764,7 @@ impl Timeline {
|
||||
/// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
|
||||
/// i.e. a value of remote_consistent_lsn_projected which has undergone
|
||||
/// generation validation in the deletion queue.
|
||||
pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
|
||||
pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.remote_consistent_lsn_visible()
|
||||
} else {
|
||||
@@ -748,7 +775,7 @@ impl Timeline {
|
||||
/// The sum of the file size of all historic layers in the layer map.
|
||||
/// This method makes no distinction between local and remote layers.
|
||||
/// Hence, the result **does not represent local filesystem usage**.
|
||||
pub async fn layer_size_sum(&self) -> u64 {
|
||||
pub(crate) async fn layer_size_sum(&self) -> u64 {
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map();
|
||||
let mut size = 0;
|
||||
@@ -758,7 +785,7 @@ impl Timeline {
|
||||
size
|
||||
}
|
||||
|
||||
pub fn resident_physical_size(&self) -> u64 {
|
||||
pub(crate) fn resident_physical_size(&self) -> u64 {
|
||||
self.metrics.resident_physical_size_get()
|
||||
}
|
||||
|
||||
@@ -834,7 +861,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Check that it is valid to request operations with that lsn.
|
||||
pub fn check_lsn_is_in_scope(
|
||||
pub(crate) fn check_lsn_is_in_scope(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
@@ -850,7 +877,7 @@ impl Timeline {
|
||||
|
||||
/// Flush to disk all data that was written with the put_* functions
|
||||
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
|
||||
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
|
||||
self.freeze_inmem_layer(false).await;
|
||||
self.flush_frozen_layers_and_wait().await
|
||||
}
|
||||
@@ -994,7 +1021,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Mutate the timeline with a [`TimelineWriter`].
|
||||
pub async fn writer(&self) -> TimelineWriter<'_> {
|
||||
pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
|
||||
TimelineWriter {
|
||||
tl: self,
|
||||
_write_guard: self.write_lock.lock().await,
|
||||
@@ -1006,7 +1033,7 @@ impl Timeline {
|
||||
///
|
||||
/// Also flush after a period of time without new data -- it helps
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let open_layer_size = {
|
||||
let guard = self.layers.read().await;
|
||||
@@ -1044,13 +1071,16 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn activate(
|
||||
pub(crate) fn activate(
|
||||
self: &Arc<Self>,
|
||||
broker_client: BrokerClientChannel,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
self.spawn_initial_logical_size_computation_task(ctx);
|
||||
if self.tenant_shard_id.is_zero() {
|
||||
// Logical size is only maintained accurately on shard zero.
|
||||
self.spawn_initial_logical_size_computation_task(ctx);
|
||||
}
|
||||
self.launch_wal_receiver(ctx, broker_client);
|
||||
self.set_state(TimelineState::Active);
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
@@ -1060,7 +1090,6 @@ impl Timeline {
|
||||
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
|
||||
///
|
||||
/// While we are flushing, we continue to accept read I/O.
|
||||
#[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
|
||||
pub(crate) async fn flush_and_shutdown(&self) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -1109,6 +1138,8 @@ impl Timeline {
|
||||
/// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
|
||||
/// the graceful [`Timeline::flush_and_shutdown`] function.
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
@@ -1144,7 +1175,7 @@ impl Timeline {
|
||||
self.gate.close().await;
|
||||
}
|
||||
|
||||
pub fn set_state(&self, new_state: TimelineState) {
|
||||
pub(crate) fn set_state(&self, new_state: TimelineState) {
|
||||
match (self.current_state(), new_state) {
|
||||
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
|
||||
info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
|
||||
@@ -1164,7 +1195,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_broken(&self, reason: String) {
|
||||
pub(crate) fn set_broken(&self, reason: String) {
|
||||
let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
|
||||
let broken_state = TimelineState::Broken {
|
||||
reason,
|
||||
@@ -1178,27 +1209,27 @@ impl Timeline {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
|
||||
pub fn current_state(&self) -> TimelineState {
|
||||
pub(crate) fn current_state(&self) -> TimelineState {
|
||||
self.state.borrow().clone()
|
||||
}
|
||||
|
||||
pub fn is_broken(&self) -> bool {
|
||||
pub(crate) fn is_broken(&self) -> bool {
|
||||
matches!(&*self.state.borrow(), TimelineState::Broken { .. })
|
||||
}
|
||||
|
||||
pub fn is_active(&self) -> bool {
|
||||
pub(crate) fn is_active(&self) -> bool {
|
||||
self.current_state() == TimelineState::Active
|
||||
}
|
||||
|
||||
pub fn is_stopping(&self) -> bool {
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
self.current_state() == TimelineState::Stopping
|
||||
}
|
||||
|
||||
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
|
||||
pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
|
||||
self.state.subscribe()
|
||||
}
|
||||
|
||||
pub async fn wait_to_become_active(
|
||||
pub(crate) async fn wait_to_become_active(
|
||||
&self,
|
||||
_ctx: &RequestContext, // Prepare for use by cancellation
|
||||
) -> Result<(), TimelineState> {
|
||||
@@ -1223,7 +1254,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let guard = self.layers.read().await;
|
||||
let layer_map = guard.layer_map();
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
@@ -1247,7 +1278,10 @@ impl Timeline {
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
pub(crate) async fn download_layer(
|
||||
&self,
|
||||
layer_file_name: &str,
|
||||
) -> anyhow::Result<Option<bool>> {
|
||||
let Some(layer) = self.find_layer(layer_file_name).await else {
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -1264,7 +1298,7 @@ impl Timeline {
|
||||
/// Evict just one layer.
|
||||
///
|
||||
/// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let _gate = self
|
||||
.gate
|
||||
.enter()
|
||||
@@ -1287,6 +1321,13 @@ const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
|
||||
|
||||
// Private functions
|
||||
impl Timeline {
|
||||
pub(crate) fn get_lazy_slru_download(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
tenant_conf
|
||||
.lazy_slru_download
|
||||
.unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
|
||||
}
|
||||
|
||||
fn get_checkpoint_distance(&self) -> u64 {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
|
||||
tenant_conf
|
||||
@@ -1495,7 +1536,7 @@ impl Timeline {
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
|
||||
|
||||
cancel,
|
||||
gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
|
||||
gate: Gate::default(),
|
||||
|
||||
compaction_lock: tokio::sync::Mutex::default(),
|
||||
gc_lock: tokio::sync::Mutex::default(),
|
||||
@@ -1817,6 +1858,12 @@ impl Timeline {
|
||||
priority: GetLogicalSizePriority,
|
||||
ctx: &RequestContext,
|
||||
) -> logical_size::CurrentLogicalSize {
|
||||
if !self.tenant_shard_id.is_zero() {
|
||||
// Logical size is only accurately maintained on shard zero: when called elsewhere, for example
|
||||
// when HTTP API is serving a GET for timeline zero, return zero
|
||||
return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
|
||||
}
|
||||
|
||||
let current_size = self.current_logical_size.current_size();
|
||||
debug!("Current size: {current_size:?}");
|
||||
|
||||
@@ -2059,7 +2106,7 @@ impl Timeline {
|
||||
.expect("only this task sets it");
|
||||
}
|
||||
|
||||
pub fn spawn_ondemand_logical_size_calculation(
|
||||
pub(crate) fn spawn_ondemand_logical_size_calculation(
|
||||
self: &Arc<Self>,
|
||||
lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
@@ -2105,6 +2152,9 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
// We should never be calculating logical sizes on shard !=0, because these shards do not have
|
||||
// accurate relation sizes, and they do not emit consumption metrics.
|
||||
debug_assert!(self.tenant_shard_id.is_zero());
|
||||
|
||||
let _guard = self.gate.enter();
|
||||
|
||||
@@ -2138,7 +2188,7 @@ impl Timeline {
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
pub async fn calculate_logical_size(
|
||||
async fn calculate_logical_size(
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
@@ -2392,60 +2442,8 @@ impl Timeline {
|
||||
timeline.ancestor_lsn,
|
||||
cont_lsn
|
||||
);
|
||||
let ancestor = match timeline.get_ancestor_timeline() {
|
||||
Ok(timeline) => timeline,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
|
||||
// It's possible that the ancestor timeline isn't active yet, or
|
||||
// is active but hasn't yet caught up to the branch point. Wait
|
||||
// for it.
|
||||
//
|
||||
// This cannot happen while the pageserver is running normally,
|
||||
// because you cannot create a branch from a point that isn't
|
||||
// present in the pageserver yet. However, we don't wait for the
|
||||
// branch point to be uploaded to cloud storage before creating
|
||||
// a branch. I.e., the branch LSN need not be remote consistent
|
||||
// for the branching operation to succeed.
|
||||
//
|
||||
// Hence, if we try to load a tenant in such a state where
|
||||
// 1. the existence of the branch was persisted (in IndexPart and/or locally)
|
||||
// 2. but the ancestor state is behind branch_lsn because it was not yet persisted
|
||||
// then we will need to wait for the ancestor timeline to
|
||||
// re-stream WAL up to branch_lsn before we access it.
|
||||
//
|
||||
// How can a tenant get in such a state?
|
||||
// - ungraceful pageserver process exit
|
||||
// - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
|
||||
//
|
||||
// NB: this could be avoided by requiring
|
||||
// branch_lsn >= remote_consistent_lsn
|
||||
// during branch creation.
|
||||
match ancestor.wait_to_become_active(ctx).await {
|
||||
Ok(()) => {}
|
||||
Err(TimelineState::Stopping) => {
|
||||
return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
|
||||
}
|
||||
Err(state) => {
|
||||
return Err(PageReconstructError::Other(anyhow::anyhow!(
|
||||
"Timeline {} will not become active. Current state: {:?}",
|
||||
ancestor.timeline_id,
|
||||
&state,
|
||||
)));
|
||||
}
|
||||
}
|
||||
ancestor
|
||||
.wait_lsn(timeline.ancestor_lsn, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
e @ WaitLsnError::Timeout(_) => PageReconstructError::AncestorLsnTimeout(e),
|
||||
WaitLsnError::Shutdown => PageReconstructError::Cancelled,
|
||||
e @ WaitLsnError::BadState => {
|
||||
PageReconstructError::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
})?;
|
||||
|
||||
timeline_owned = ancestor;
|
||||
timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
|
||||
timeline = &*timeline_owned;
|
||||
prev_lsn = Lsn(u64::MAX);
|
||||
continue 'outer;
|
||||
@@ -2575,6 +2573,66 @@ impl Timeline {
|
||||
Some((lsn, img))
|
||||
}
|
||||
|
||||
async fn get_ready_ancestor_timeline(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, GetReadyAncestorError> {
|
||||
let ancestor = match self.get_ancestor_timeline() {
|
||||
Ok(timeline) => timeline,
|
||||
Err(e) => return Err(GetReadyAncestorError::from(e)),
|
||||
};
|
||||
|
||||
// It's possible that the ancestor timeline isn't active yet, or
|
||||
// is active but hasn't yet caught up to the branch point. Wait
|
||||
// for it.
|
||||
//
|
||||
// This cannot happen while the pageserver is running normally,
|
||||
// because you cannot create a branch from a point that isn't
|
||||
// present in the pageserver yet. However, we don't wait for the
|
||||
// branch point to be uploaded to cloud storage before creating
|
||||
// a branch. I.e., the branch LSN need not be remote consistent
|
||||
// for the branching operation to succeed.
|
||||
//
|
||||
// Hence, if we try to load a tenant in such a state where
|
||||
// 1. the existence of the branch was persisted (in IndexPart and/or locally)
|
||||
// 2. but the ancestor state is behind branch_lsn because it was not yet persisted
|
||||
// then we will need to wait for the ancestor timeline to
|
||||
// re-stream WAL up to branch_lsn before we access it.
|
||||
//
|
||||
// How can a tenant get in such a state?
|
||||
// - ungraceful pageserver process exit
|
||||
// - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
|
||||
//
|
||||
// NB: this could be avoided by requiring
|
||||
// branch_lsn >= remote_consistent_lsn
|
||||
// during branch creation.
|
||||
match ancestor.wait_to_become_active(ctx).await {
|
||||
Ok(()) => {}
|
||||
Err(TimelineState::Stopping) => {
|
||||
return Err(GetReadyAncestorError::AncestorStopping(
|
||||
ancestor.timeline_id,
|
||||
));
|
||||
}
|
||||
Err(state) => {
|
||||
return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
|
||||
"Timeline {} will not become active. Current state: {:?}",
|
||||
ancestor.timeline_id,
|
||||
&state,
|
||||
)));
|
||||
}
|
||||
}
|
||||
ancestor
|
||||
.wait_lsn(self.ancestor_lsn, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
|
||||
WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
|
||||
e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
|
||||
})?;
|
||||
|
||||
Ok(ancestor)
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
|
||||
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
|
||||
format!(
|
||||
@@ -2785,12 +2843,12 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Flush one frozen in-memory layer to disk, as a new delta layer.
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id, layer=%frozen_layer))]
|
||||
async fn flush_frozen_layer(
|
||||
self: &Arc<Self>,
|
||||
frozen_layer: Arc<InMemoryLayer>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), FlushLayerError> {
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
@@ -3379,7 +3437,7 @@ enum DurationRecorder {
|
||||
}
|
||||
|
||||
impl DurationRecorder {
|
||||
pub fn till_now(&self) -> DurationRecorder {
|
||||
fn till_now(&self) -> DurationRecorder {
|
||||
match self {
|
||||
DurationRecorder::NotStarted => {
|
||||
panic!("must only call on recorded measurements")
|
||||
@@ -3390,7 +3448,7 @@ impl DurationRecorder {
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn into_recorded(self) -> Option<RecordedDuration> {
|
||||
fn into_recorded(self) -> Option<RecordedDuration> {
|
||||
match self {
|
||||
DurationRecorder::NotStarted => None,
|
||||
DurationRecorder::Recorded(recorded, _) => Some(recorded),
|
||||
@@ -4590,7 +4648,9 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
|
||||
pub(crate) fn get_download_all_remote_layers_task_info(
|
||||
&self,
|
||||
) -> Option<DownloadRemoteLayersTaskInfo> {
|
||||
self.download_all_remote_layers_task_info
|
||||
.read()
|
||||
.unwrap()
|
||||
@@ -4686,7 +4746,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
|
||||
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
|
||||
// This is probably considered a bad practice in Rust and should be fixed eventually,
|
||||
// but will cause large code changes.
|
||||
pub struct TimelineWriter<'a> {
|
||||
pub(crate) struct TimelineWriter<'a> {
|
||||
tl: &'a Timeline,
|
||||
_write_guard: tokio::sync::MutexGuard<'a, ()>,
|
||||
}
|
||||
@@ -4704,7 +4764,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub async fn put(
|
||||
pub(crate) async fn put(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
|
||||
@@ -356,12 +356,14 @@ impl DeleteTimelineFlow {
|
||||
// NB: If this fails half-way through, and is retried, the retry will go through
|
||||
// all the same steps again. Make sure the code here is idempotent, and don't
|
||||
// error out if some of the shutdown tasks have already been completed!
|
||||
#[instrument(skip(tenant), fields(tenant_id=%tenant.tenant_shard_id.tenant_id, shard_id=%tenant.tenant_shard_id.shard_slug()))]
|
||||
#[instrument(skip_all, fields(%inplace))]
|
||||
pub async fn run(
|
||||
tenant: &Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
inplace: bool,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
super::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let (timeline, mut guard) = Self::prepare(tenant, timeline_id)?;
|
||||
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
@@ -319,6 +319,13 @@ impl Timeline {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
if !self.tenant_shard_id.is_zero() {
|
||||
// Shards !=0 do not maintain accurate relation sizes, and do not need to calculate logical size
|
||||
// for consumption metrics (consumption metrics are only sent from shard 0). We may therefore
|
||||
// skip imitating logical size accesses for eviction purposes.
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
|
||||
let mut state = self.eviction_task_timeline_state.lock().await;
|
||||
|
||||
// Only do the imitate_layer accesses approximately as often as the threshold. A little
|
||||
|
||||
@@ -101,6 +101,14 @@ impl From<&Exact> for u64 {
|
||||
}
|
||||
}
|
||||
|
||||
impl Approximate {
|
||||
/// For use in situations where we don't have a sane logical size value but need
|
||||
/// to return something, e.g. in HTTP API on shard >0 of a sharded tenant.
|
||||
pub(crate) fn zero() -> Self {
|
||||
Self(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl CurrentLogicalSize {
|
||||
pub(crate) fn size_dont_care_about_accuracy(&self) -> u64 {
|
||||
match self {
|
||||
|
||||
@@ -426,13 +426,21 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
// Send the replication feedback message.
|
||||
// Regular standby_status_update fields are put into this message.
|
||||
let current_timeline_size = timeline
|
||||
.get_current_logical_size(
|
||||
crate::tenant::timeline::GetLogicalSizePriority::User,
|
||||
&ctx,
|
||||
)
|
||||
// FIXME: https://github.com/neondatabase/neon/issues/5963
|
||||
.size_dont_care_about_accuracy();
|
||||
let current_timeline_size = if timeline.tenant_shard_id.is_zero() {
|
||||
timeline
|
||||
.get_current_logical_size(
|
||||
crate::tenant::timeline::GetLogicalSizePriority::User,
|
||||
&ctx,
|
||||
)
|
||||
// FIXME: https://github.com/neondatabase/neon/issues/5963
|
||||
.size_dont_care_about_accuracy()
|
||||
} else {
|
||||
// Non-zero shards send zero for logical size. The safekeeper will ignore
|
||||
// this number. This is because in a sharded tenant, only shard zero maintains
|
||||
// accurate logical size.
|
||||
0
|
||||
};
|
||||
|
||||
let status_update = PageserverFeedback {
|
||||
current_timeline_size,
|
||||
last_received_lsn,
|
||||
|
||||
@@ -22,6 +22,7 @@ use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
use pageserver_api::models::WalRedoManagerStatus;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::Serialize;
|
||||
use std::collections::VecDeque;
|
||||
@@ -29,7 +30,6 @@ use std::io;
|
||||
use std::io::prelude::*;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::os::unix::prelude::CommandExt;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
|
||||
@@ -93,7 +93,7 @@ struct ProcessOutput {
|
||||
pub struct PostgresRedoManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
conf: &'static PageServerConf,
|
||||
last_successful_redo_at: std::sync::Mutex<Option<Instant>>,
|
||||
last_redo_at: std::sync::Mutex<Option<Instant>>,
|
||||
redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
|
||||
}
|
||||
|
||||
@@ -179,6 +179,20 @@ impl PostgresRedoManager {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
|
||||
Some(WalRedoManagerStatus {
|
||||
last_redo_at: {
|
||||
let at = *self.last_redo_at.lock().unwrap();
|
||||
at.and_then(|at| {
|
||||
let age = at.elapsed();
|
||||
// map any chrono errors silently to None here
|
||||
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
|
||||
})
|
||||
},
|
||||
pid: self.redo_process.read().unwrap().as_ref().map(|p| p.id()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl PostgresRedoManager {
|
||||
@@ -193,7 +207,7 @@ impl PostgresRedoManager {
|
||||
PostgresRedoManager {
|
||||
tenant_shard_id,
|
||||
conf,
|
||||
last_successful_redo_at: std::sync::Mutex::default(),
|
||||
last_redo_at: std::sync::Mutex::default(),
|
||||
redo_process: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
@@ -202,21 +216,9 @@ impl PostgresRedoManager {
|
||||
/// rely on our owner calling this function periodically in its own housekeeping
|
||||
/// loops.
|
||||
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
|
||||
if let Ok(g) = self.last_successful_redo_at.try_lock() {
|
||||
if let Some(last_successful_redo_at) = *g {
|
||||
// Kill the walredo process if
|
||||
// - it has been unused for `idle_timeout`
|
||||
// - it has been used, but, without success.
|
||||
// The former is just good housekeeping.
|
||||
// The latter adds robustness for the case where something is wrong
|
||||
// with the walredo process.
|
||||
//
|
||||
// Note that we don't want to kill the process immediately on each redo failure.
|
||||
// The reason is that the redo failure could be caused by corrupted or malicious data.
|
||||
// We don't want to get into a kill-respawn loop in that case.
|
||||
// So, we piggy-back on the quiescing mechanism,
|
||||
// resulting in a max kill-respawn frequency of `1/idle_timeout`.
|
||||
if last_successful_redo_at.elapsed() >= idle_timeout {
|
||||
if let Ok(g) = self.last_redo_at.try_lock() {
|
||||
if let Some(last_redo_at) = *g {
|
||||
if last_redo_at.elapsed() >= idle_timeout {
|
||||
drop(g);
|
||||
let mut guard = self.redo_process.write().unwrap();
|
||||
*guard = None;
|
||||
@@ -239,32 +241,8 @@ impl PostgresRedoManager {
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let res = self.apply_batch_postgres0(
|
||||
key,
|
||||
lsn,
|
||||
base_img,
|
||||
base_img_lsn,
|
||||
records,
|
||||
wal_redo_timeout,
|
||||
pg_version,
|
||||
);
|
||||
if res.is_ok() {
|
||||
*self.last_successful_redo_at.lock().unwrap() = Some(Instant::now());
|
||||
}
|
||||
res
|
||||
}
|
||||
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn apply_batch_postgres0(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
base_img_lsn: Lsn,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let mut n_attempts = 0u32;
|
||||
@@ -279,8 +257,7 @@ impl PostgresRedoManager {
|
||||
let mut proc_guard = self.redo_process.write().unwrap();
|
||||
match &*proc_guard {
|
||||
None => {
|
||||
let timer =
|
||||
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.start_timer();
|
||||
let start = Instant::now();
|
||||
let proc = Arc::new(
|
||||
WalRedoProcess::launch(
|
||||
self.conf,
|
||||
@@ -289,7 +266,14 @@ impl PostgresRedoManager {
|
||||
)
|
||||
.context("launch walredo process")?,
|
||||
);
|
||||
timer.observe_duration();
|
||||
let duration = start.elapsed();
|
||||
WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM
|
||||
.observe(duration.as_secs_f64());
|
||||
info!(
|
||||
duration_ms = duration.as_millis(),
|
||||
pid = proc.id(),
|
||||
"launched walredo process"
|
||||
);
|
||||
*proc_guard = Some(Arc::clone(&proc));
|
||||
proc
|
||||
}
|
||||
@@ -643,40 +627,6 @@ impl PostgresRedoManager {
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Command with ability not to give all file descriptors to child process
|
||||
///
|
||||
trait CloseFileDescriptors: CommandExt {
|
||||
///
|
||||
/// Close file descriptors (other than stdin, stdout, stderr) in child process
|
||||
///
|
||||
fn close_fds(&mut self) -> &mut Command;
|
||||
}
|
||||
|
||||
impl<C: CommandExt> CloseFileDescriptors for C {
|
||||
fn close_fds(&mut self) -> &mut Command {
|
||||
// SAFETY: Code executed inside pre_exec should have async-signal-safety,
|
||||
// which means it should be safe to execute inside a signal handler.
|
||||
// The precise meaning depends on platform. See `man signal-safety`
|
||||
// for the linux definition.
|
||||
//
|
||||
// The set_fds_cloexec_threadsafe function is documented to be
|
||||
// async-signal-safe.
|
||||
//
|
||||
// Aside from this function, the rest of the code is re-entrant and
|
||||
// doesn't make any syscalls. We're just passing constants.
|
||||
//
|
||||
// NOTE: It's easy to indirectly cause a malloc or lock a mutex,
|
||||
// which is not async-signal-safe. Be careful.
|
||||
unsafe {
|
||||
self.pre_exec(move || {
|
||||
close_fds::set_fds_cloexec_threadsafe(3, &[]);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct WalRedoProcess {
|
||||
#[allow(dead_code)]
|
||||
conf: &'static PageServerConf,
|
||||
@@ -705,23 +655,25 @@ impl WalRedoProcess {
|
||||
|
||||
// Start postgres itself
|
||||
let child = Command::new(pg_bin_dir_path.join("postgres"))
|
||||
// the first arg must be --wal-redo so the child process enters into walredo mode
|
||||
.arg("--wal-redo")
|
||||
// the child doesn't process this arg, but, having it in the argv helps indentify the
|
||||
// walredo process for a particular tenant when debugging a pagserver
|
||||
.args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
|
||||
.stdin(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
|
||||
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
|
||||
// The redo process is not trusted, and runs in seccomp mode that
|
||||
// doesn't allow it to open any files. We have to also make sure it
|
||||
// doesn't inherit any file descriptors from the pageserver, that
|
||||
// would allow an attacker to read any files that happen to be open
|
||||
// in the pageserver.
|
||||
//
|
||||
// The Rust standard library makes sure to mark any file descriptors with
|
||||
// as close-on-exec by default, but that's not enough, since we use
|
||||
// libraries that directly call libc open without setting that flag.
|
||||
.close_fds()
|
||||
// NB: The redo process is not trusted after we sent it the first
|
||||
// walredo work. Before that, it is trusted. Specifically, we trust
|
||||
// it to
|
||||
// 1. close all file descriptors except stdin, stdout, stderr because
|
||||
// pageserver might not be 100% diligent in setting FD_CLOEXEC on all
|
||||
// the files it opens, and
|
||||
// 2. to use seccomp to sandbox itself before processing the first
|
||||
// walredo request.
|
||||
.spawn_no_leak_child(tenant_shard_id)
|
||||
.context("spawn process")?;
|
||||
WAL_REDO_PROCESS_COUNTERS.started.inc();
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
From 5518a806a70e7f40d5054a762ccda7d5e6b0d31c Mon Sep 17 00:00:00 2001
|
||||
From de3dd0cd034d2bcc12b456171ce163bdc1f4cb65 Mon Sep 17 00:00:00 2001
|
||||
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
|
||||
Date: Tue, 30 Jan 2024 14:33:00 +0200
|
||||
Subject: [PATCH] Make v0.6.0 work with Neon
|
||||
Date: Thu, 1 Feb 2024 17:42:31 +0200
|
||||
Subject: [PATCH 1/1] Make v0.6.0 work with Neon
|
||||
|
||||
Now that the WAL-logging happens as a separate step at the end of the
|
||||
build, we need a few neon-specific hints to make it work.
|
||||
@@ -10,35 +10,35 @@ build, we need a few neon-specific hints to make it work.
|
||||
1 file changed, 28 insertions(+)
|
||||
|
||||
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
|
||||
index 680789ba9044900eac9321844ee2a808a4a2ed12..41c5b709bcb2367ac8b8c498788ecac4c1148b74 100644
|
||||
index 680789b..bfa657a 100644
|
||||
--- a/src/hnswbuild.c
|
||||
+++ b/src/hnswbuild.c
|
||||
@@ -1089,13 +1089,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
||||
SeedRandom(42);
|
||||
#endif
|
||||
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_start_unlogged_build(index->rd_smgr);
|
||||
+ smgr_start_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
InitBuildState(buildstate, heap, index, indexInfo, forkNum);
|
||||
|
||||
|
||||
BuildGraph(buildstate, forkNum);
|
||||
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_finish_unlogged_build_phase_1(index->rd_smgr);
|
||||
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
if (RelationNeedsWAL(index))
|
||||
+ {
|
||||
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
|
||||
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ {
|
||||
+#if PG_VERSION_NUM >= 160000
|
||||
+ RelFileLocator rlocator = index->rd_smgr->smgr_rlocator.locator;
|
||||
+ RelFileLocator rlocator = RelationGetSmgr(index)->smgr_rlocator.locator;
|
||||
+#else
|
||||
+ RelFileNode rlocator = index->rd_smgr->smgr_rnode.node;
|
||||
+ RelFileNode rlocator = RelationGetSmgr(index)->smgr_rnode.node;
|
||||
+#endif
|
||||
+
|
||||
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
|
||||
@@ -49,8 +49,12 @@ index 680789ba9044900eac9321844ee2a808a4a2ed12..41c5b709bcb2367ac8b8c498788ecac4
|
||||
+ }
|
||||
+
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_end_unlogged_build(index->rd_smgr);
|
||||
+ smgr_end_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
FreeBuildState(buildstate);
|
||||
}
|
||||
|
||||
--
|
||||
2.39.2
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
#include "neon_pgversioncompat.h"
|
||||
|
||||
#include "access/slru.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include RELFILEINFO_HDR
|
||||
#include "lib/stringinfo.h"
|
||||
@@ -34,6 +35,7 @@ typedef enum
|
||||
T_NeonNblocksRequest,
|
||||
T_NeonGetPageRequest,
|
||||
T_NeonDbSizeRequest,
|
||||
T_NeonGetSlruSegmentRequest,
|
||||
|
||||
/* pagestore -> pagestore_client */
|
||||
T_NeonExistsResponse = 100,
|
||||
@@ -41,6 +43,7 @@ typedef enum
|
||||
T_NeonGetPageResponse,
|
||||
T_NeonErrorResponse,
|
||||
T_NeonDbSizeResponse,
|
||||
T_NeonGetSlruSegmentResponse,
|
||||
} NeonMessageTag;
|
||||
|
||||
/* base struct for c-style inheritance */
|
||||
@@ -59,6 +62,13 @@ typedef struct
|
||||
(errmsg(NEON_TAG "[shard %d] " fmt, shard_no, ##__VA_ARGS__), \
|
||||
errhidestmt(true), errhidecontext(true), errposition(0), internalerrposition(0)))
|
||||
|
||||
/* SLRUs downloadable from page server */
|
||||
typedef enum {
|
||||
SLRU_CLOG,
|
||||
SLRU_MULTIXACT_MEMBERS,
|
||||
SLRU_MULTIXACT_OFFSETS
|
||||
} SlruKind;
|
||||
|
||||
/*
|
||||
* supertype of all the Neon*Request structs below
|
||||
*
|
||||
@@ -101,6 +111,13 @@ typedef struct
|
||||
BlockNumber blkno;
|
||||
} NeonGetPageRequest;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonRequest req;
|
||||
SlruKind kind;
|
||||
int segno;
|
||||
} NeonGetSlruSegmentRequest;
|
||||
|
||||
/* supertype of all the Neon*Response structs below */
|
||||
typedef struct
|
||||
{
|
||||
@@ -140,6 +157,14 @@ typedef struct
|
||||
* message */
|
||||
} NeonErrorResponse;
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
int n_blocks;
|
||||
char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT];
|
||||
} NeonGetSlruSegmentResponse;
|
||||
|
||||
|
||||
extern StringInfoData nm_pack_request(NeonRequest *msg);
|
||||
extern NeonResponse *nm_unpack_response(StringInfo s);
|
||||
extern char *nm_to_string(NeonMessage *msg);
|
||||
|
||||
@@ -1043,12 +1043,25 @@ nm_pack_request(NeonRequest *msg)
|
||||
break;
|
||||
}
|
||||
|
||||
case T_NeonGetSlruSegmentRequest:
|
||||
{
|
||||
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendbyte(&s, msg_req->kind);
|
||||
pq_sendint32(&s, msg_req->segno);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
/* pagestore -> pagestore_client. We never need to create these. */
|
||||
case T_NeonExistsResponse:
|
||||
case T_NeonNblocksResponse:
|
||||
case T_NeonGetPageResponse:
|
||||
case T_NeonErrorResponse:
|
||||
case T_NeonDbSizeResponse:
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
default:
|
||||
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
|
||||
break;
|
||||
@@ -1135,6 +1148,20 @@ nm_unpack_response(StringInfo s)
|
||||
break;
|
||||
}
|
||||
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
{
|
||||
NeonGetSlruSegmentResponse *msg_resp;
|
||||
int n_blocks = pq_getmsgint(s, 4);
|
||||
msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse));
|
||||
msg_resp->tag = tag;
|
||||
msg_resp->n_blocks = n_blocks;
|
||||
memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ);
|
||||
pq_getmsgend(s);
|
||||
|
||||
resp = (NeonResponse *) msg_resp;
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* pagestore_client -> pagestore
|
||||
*
|
||||
@@ -1144,6 +1171,7 @@ nm_unpack_response(StringInfo s)
|
||||
case T_NeonNblocksRequest:
|
||||
case T_NeonGetPageRequest:
|
||||
case T_NeonDbSizeRequest:
|
||||
case T_NeonGetSlruSegmentRequest:
|
||||
default:
|
||||
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
|
||||
break;
|
||||
@@ -1213,7 +1241,18 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
case T_NeonGetSlruSegmentRequest:
|
||||
{
|
||||
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
||||
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentRequest\"");
|
||||
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
|
||||
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
/* pagestore -> pagestore_client */
|
||||
case T_NeonExistsResponse:
|
||||
{
|
||||
@@ -1267,6 +1306,17 @@ nm_to_string(NeonMessage *msg)
|
||||
msg_resp->db_size);
|
||||
appendStringInfoChar(&s, '}');
|
||||
|
||||
break;
|
||||
}
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
{
|
||||
NeonGetSlruSegmentResponse *msg_resp = (NeonGetSlruSegmentResponse *) msg;
|
||||
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonGetSlruSegmentResponse\"");
|
||||
appendStringInfo(&s, ", \"n_blocks\": %u}",
|
||||
msg_resp->n_blocks);
|
||||
appendStringInfoChar(&s, '}');
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -2739,6 +2789,74 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
}
|
||||
|
||||
#define STRPREFIX(str, prefix) (strncmp(str, prefix, strlen(prefix)) == 0)
|
||||
|
||||
static int
|
||||
neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buffer)
|
||||
{
|
||||
XLogRecPtr request_lsn;
|
||||
/*
|
||||
* GetRedoStartLsn() returns LSN of basebackup.
|
||||
* We need to download SLRU segments only once after node startup,
|
||||
* then SLRUs are maintained locally.
|
||||
*/
|
||||
request_lsn = GetRedoStartLsn();
|
||||
request_lsn = nm_adjust_lsn(request_lsn);
|
||||
SlruKind kind;
|
||||
|
||||
if (STRPREFIX(path, "pg_xact"))
|
||||
kind = SLRU_CLOG;
|
||||
else if (STRPREFIX(path, "pg_multixact/members"))
|
||||
kind = SLRU_MULTIXACT_MEMBERS;
|
||||
else if (STRPREFIX(path, "pg_multixact/offsets"))
|
||||
kind = SLRU_MULTIXACT_OFFSETS;
|
||||
else
|
||||
return -1;
|
||||
|
||||
NeonResponse *resp;
|
||||
NeonGetSlruSegmentRequest request = {
|
||||
.req.tag = T_NeonGetSlruSegmentRequest,
|
||||
.req.latest = false,
|
||||
.req.lsn = request_lsn,
|
||||
|
||||
.kind = kind,
|
||||
.segno = segno
|
||||
};
|
||||
int n_blocks;
|
||||
shardno_t shard_no = 0; /* All SLRUs are at shard 0 */
|
||||
do
|
||||
{
|
||||
while (!page_server->send(shard_no, &request.req) || !page_server->flush(shard_no));
|
||||
consume_prefetch_responses();
|
||||
resp = page_server->receive(shard_no);
|
||||
} while (resp == NULL);
|
||||
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetSlruSegmentResponse:
|
||||
n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks;
|
||||
memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks*BLCKSZ);
|
||||
break;
|
||||
|
||||
case T_NeonErrorResponse:
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_IO_ERROR),
|
||||
errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X",
|
||||
kind,
|
||||
segno,
|
||||
LSN_FORMAT_ARGS(request_lsn)),
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
|
||||
default:
|
||||
neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
pfree(resp);
|
||||
|
||||
return n_blocks;
|
||||
}
|
||||
|
||||
static void
|
||||
AtEOXact_neon(XactEvent event, void *arg)
|
||||
{
|
||||
@@ -2797,6 +2915,8 @@ static const struct f_smgr neon_smgr =
|
||||
.smgr_start_unlogged_build = neon_start_unlogged_build,
|
||||
.smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1,
|
||||
.smgr_end_unlogged_build = neon_end_unlogged_build,
|
||||
|
||||
.smgr_read_slru_segment = neon_read_slru_segment,
|
||||
};
|
||||
|
||||
const f_smgr *
|
||||
|
||||
@@ -140,9 +140,42 @@ static XLogReaderState *reader_state;
|
||||
#define TRACE DEBUG5
|
||||
|
||||
#ifdef HAVE_LIBSECCOMP
|
||||
|
||||
|
||||
/*
|
||||
* https://man7.org/linux/man-pages/man2/close_range.2.html
|
||||
*
|
||||
* The `close_range` syscall is available as of Linux 5.9.
|
||||
*
|
||||
* The `close_range` libc wrapper is only available in glibc >= 2.34.
|
||||
* Debian Bullseye ships a libc package based on glibc 2.31.
|
||||
* => write the wrapper ourselves, using the syscall number from the kernel headers.
|
||||
*
|
||||
* If the Linux uAPI headers don't define the system call number,
|
||||
* fail the build deliberately rather than ifdef'ing it to ENOSYS.
|
||||
* We prefer a compile time over a runtime error for walredo.
|
||||
*/
|
||||
#include <unistd.h>
|
||||
#include <sys/syscall.h>
|
||||
#include <errno.h>
|
||||
int close_range(unsigned int start_fd, unsigned int count, unsigned int flags) {
|
||||
return syscall(__NR_close_range, start_fd, count, flags);
|
||||
}
|
||||
|
||||
static void
|
||||
enter_seccomp_mode(void)
|
||||
{
|
||||
|
||||
/*
|
||||
* The pageserver process relies on us to close all the file descriptors
|
||||
* it potentially leaked to us, _before_ we start processing potentially dangerous
|
||||
* wal records. See the comment in the Rust code that launches this process.
|
||||
*/
|
||||
int err;
|
||||
if (err = close_range(3, ~0U, 0)) {
|
||||
ereport(FATAL, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("seccomp: could not close files >= fd 3")));
|
||||
}
|
||||
|
||||
PgSeccompRule syscalls[] =
|
||||
{
|
||||
/* Hard requirements */
|
||||
|
||||
@@ -9,11 +9,10 @@ use crate::auth::credentials::check_peer_addr_is_in_list;
|
||||
use crate::auth::validate_password_and_exchange;
|
||||
use crate::cache::Cached;
|
||||
use crate::console::errors::GetAuthInfoError;
|
||||
use crate::console::provider::ConsoleBackend;
|
||||
use crate::console::provider::{CachedRoleSecret, ConsoleBackend};
|
||||
use crate::console::AuthSecret;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::proxy::connect_compute::handle_try_wake;
|
||||
use crate::proxy::retry::retry_after;
|
||||
use crate::proxy::wake_compute::wake_compute;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::stream::Stream;
|
||||
use crate::{
|
||||
@@ -28,13 +27,26 @@ use crate::{
|
||||
};
|
||||
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
|
||||
use futures::TryFutureExt;
|
||||
use std::borrow::Cow;
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::info;
|
||||
|
||||
use super::IpPattern;
|
||||
/// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality
|
||||
pub enum MaybeOwned<'a, T> {
|
||||
Owned(T),
|
||||
Borrowed(&'a T),
|
||||
}
|
||||
|
||||
impl<T> std::ops::Deref for MaybeOwned<'_, T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match self {
|
||||
MaybeOwned::Owned(t) => t,
|
||||
MaybeOwned::Borrowed(t) => t,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This type serves two purposes:
|
||||
///
|
||||
@@ -46,17 +58,16 @@ use super::IpPattern;
|
||||
/// backends which require them for the authentication process.
|
||||
pub enum BackendType<'a, T> {
|
||||
/// Cloud API (V2).
|
||||
Console(Cow<'a, ConsoleBackend>, T),
|
||||
Console(MaybeOwned<'a, ConsoleBackend>, T),
|
||||
/// Authentication via a web browser.
|
||||
Link(Cow<'a, url::ApiUrl>),
|
||||
#[cfg(test)]
|
||||
/// Test backend.
|
||||
Test(&'a dyn TestBackend),
|
||||
Link(MaybeOwned<'a, url::ApiUrl>),
|
||||
}
|
||||
|
||||
pub trait TestBackend: Send + Sync + 'static {
|
||||
fn wake_compute(&self) -> Result<CachedNodeInfo, console::errors::WakeComputeError>;
|
||||
fn get_allowed_ips(&self) -> Result<Vec<IpPattern>, console::errors::GetAuthInfoError>;
|
||||
fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), console::errors::GetAuthInfoError>;
|
||||
}
|
||||
|
||||
impl std::fmt::Display for BackendType<'_, ()> {
|
||||
@@ -67,14 +78,14 @@ impl std::fmt::Display for BackendType<'_, ()> {
|
||||
ConsoleBackend::Console(endpoint) => {
|
||||
fmt.debug_tuple("Console").field(&endpoint.url()).finish()
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ConsoleBackend::Postgres(endpoint) => {
|
||||
fmt.debug_tuple("Postgres").field(&endpoint.url()).finish()
|
||||
}
|
||||
#[cfg(test)]
|
||||
ConsoleBackend::Test(_) => fmt.debug_tuple("Test").finish(),
|
||||
},
|
||||
Link(url) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
|
||||
#[cfg(test)]
|
||||
Test(_) => fmt.debug_tuple("Test").finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -85,10 +96,8 @@ impl<T> BackendType<'_, T> {
|
||||
pub fn as_ref(&self) -> BackendType<'_, &T> {
|
||||
use BackendType::*;
|
||||
match self {
|
||||
Console(c, x) => Console(Cow::Borrowed(c), x),
|
||||
Link(c) => Link(Cow::Borrowed(c)),
|
||||
#[cfg(test)]
|
||||
Test(x) => Test(*x),
|
||||
Console(c, x) => Console(MaybeOwned::Borrowed(c), x),
|
||||
Link(c) => Link(MaybeOwned::Borrowed(c)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -102,8 +111,6 @@ impl<'a, T> BackendType<'a, T> {
|
||||
match self {
|
||||
Console(c, x) => Console(c, f(x)),
|
||||
Link(c) => Link(c),
|
||||
#[cfg(test)]
|
||||
Test(x) => Test(x),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -116,8 +123,6 @@ impl<'a, T, E> BackendType<'a, Result<T, E>> {
|
||||
match self {
|
||||
Console(c, x) => x.map(|x| Console(c, x)),
|
||||
Link(c) => Ok(Link(c)),
|
||||
#[cfg(test)]
|
||||
Test(x) => Ok(Test(x)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -147,7 +152,7 @@ impl ComputeUserInfo {
|
||||
}
|
||||
|
||||
pub enum ComputeCredentialKeys {
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Password(Vec<u8>),
|
||||
AuthKeys(AuthKeys),
|
||||
}
|
||||
@@ -200,13 +205,16 @@ async fn auth_quirks(
|
||||
};
|
||||
|
||||
info!("fetching user's authentication info");
|
||||
let allowed_ips = api.get_allowed_ips(ctx, &info).await?;
|
||||
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
|
||||
|
||||
// check allowed list
|
||||
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
|
||||
return Err(auth::AuthError::ip_address_not_allowed());
|
||||
}
|
||||
let cached_secret = api.get_role_secret(ctx, &info).await?;
|
||||
let cached_secret = match maybe_secret {
|
||||
Some(secret) => secret,
|
||||
None => api.get_role_secret(ctx, &info).await?,
|
||||
};
|
||||
|
||||
let secret = cached_secret.value.clone().unwrap_or_else(|| {
|
||||
// If we don't have an authentication secret, we mock one to
|
||||
@@ -274,42 +282,6 @@ async fn authenticate_with_secret(
|
||||
classic::authenticate(info, client, config, &mut ctx.latency_timer, secret).await
|
||||
}
|
||||
|
||||
/// wake a compute (or retrieve an existing compute session from cache)
|
||||
async fn wake_compute(
|
||||
ctx: &mut RequestMonitoring,
|
||||
api: &impl console::Api,
|
||||
compute_credentials: ComputeCredentials<ComputeCredentialKeys>,
|
||||
) -> auth::Result<(CachedNodeInfo, ComputeUserInfo)> {
|
||||
let mut num_retries = 0;
|
||||
let mut node = loop {
|
||||
let wake_res = api.wake_compute(ctx, &compute_credentials.info).await;
|
||||
match handle_try_wake(wake_res, num_retries) {
|
||||
Err(e) => {
|
||||
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
|
||||
return Err(e.into());
|
||||
}
|
||||
Ok(ControlFlow::Continue(e)) => {
|
||||
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
|
||||
}
|
||||
Ok(ControlFlow::Break(n)) => break n,
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
tokio::time::sleep(wait_duration).await;
|
||||
};
|
||||
|
||||
ctx.set_project(node.aux.clone());
|
||||
|
||||
match compute_credentials.keys {
|
||||
#[cfg(feature = "testing")]
|
||||
ComputeCredentialKeys::Password(password) => node.config.password(password),
|
||||
ComputeCredentialKeys::AuthKeys(auth_keys) => node.config.auth_keys(auth_keys),
|
||||
};
|
||||
|
||||
Ok((node, compute_credentials.info))
|
||||
}
|
||||
|
||||
impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
/// Get compute endpoint name from the credentials.
|
||||
pub fn get_endpoint(&self) -> Option<EndpointId> {
|
||||
@@ -318,8 +290,6 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
match self {
|
||||
Console(_, user_info) => user_info.endpoint_id.clone(),
|
||||
Link(_) => Some("link".into()),
|
||||
#[cfg(test)]
|
||||
Test(_) => Some("test".into()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,8 +300,6 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
match self {
|
||||
Console(_, user_info) => &user_info.user,
|
||||
Link(_) => "link",
|
||||
#[cfg(test)]
|
||||
Test(_) => "test",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -356,8 +324,20 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
|
||||
let compute_credentials =
|
||||
auth_quirks(ctx, &*api, user_info, client, allow_cleartext, config).await?;
|
||||
let (cache_info, user_info) = wake_compute(ctx, &*api, compute_credentials).await?;
|
||||
(cache_info, BackendType::Console(api, user_info))
|
||||
|
||||
let mut num_retries = 0;
|
||||
let mut node =
|
||||
wake_compute(&mut num_retries, ctx, &api, &compute_credentials.info).await?;
|
||||
|
||||
ctx.set_project(node.aux.clone());
|
||||
|
||||
match compute_credentials.keys {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ComputeCredentialKeys::Password(password) => node.config.password(password),
|
||||
ComputeCredentialKeys::AuthKeys(auth_keys) => node.config.auth_keys(auth_keys),
|
||||
};
|
||||
|
||||
(node, BackendType::Console(api, compute_credentials.info))
|
||||
}
|
||||
// NOTE: this auth backend doesn't use client credentials.
|
||||
Link(url) => {
|
||||
@@ -370,10 +350,6 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
BackendType::Link(url),
|
||||
)
|
||||
}
|
||||
#[cfg(test)]
|
||||
Test(_) => {
|
||||
unreachable!("this function should never be called in the test backend")
|
||||
}
|
||||
};
|
||||
|
||||
info!("user successfully authenticated");
|
||||
@@ -382,16 +358,14 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
}
|
||||
|
||||
impl BackendType<'_, ComputeUserInfo> {
|
||||
pub async fn get_allowed_ips(
|
||||
pub async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
) -> Result<CachedAllowedIps, GetAuthInfoError> {
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
|
||||
use BackendType::*;
|
||||
match self {
|
||||
Console(api, user_info) => api.get_allowed_ips(ctx, user_info).await,
|
||||
Link(_) => Ok(Cached::new_uncached(Arc::new(vec![]))),
|
||||
#[cfg(test)]
|
||||
Test(x) => Ok(Cached::new_uncached(Arc::new(x.get_allowed_ips()?))),
|
||||
Console(api, user_info) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
Link(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -406,8 +380,6 @@ impl BackendType<'_, ComputeUserInfo> {
|
||||
match self {
|
||||
Console(api, user_info) => api.wake_compute(ctx, user_info).map_ok(Some).await,
|
||||
Link(_) => Ok(None),
|
||||
#[cfg(test)]
|
||||
Test(x) => x.wake_compute().map(Some),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ pub(super) async fn authenticate(
|
||||
) -> auth::Result<ComputeCredentials<ComputeCredentialKeys>> {
|
||||
let flow = AuthFlow::new(client);
|
||||
let scram_keys = match secret {
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
AuthSecret::Md5(_) => {
|
||||
info!("auth endpoint chooses MD5");
|
||||
return Err(auth::AuthError::bad_auth_method("MD5"));
|
||||
|
||||
@@ -83,8 +83,7 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
use ComputeUserInfoParseError::*;
|
||||
|
||||
// Some parameters are stored in the startup message.
|
||||
let get_param = |key| params.get(key).ok_or(MissingKey(key));
|
||||
let user: RoleName = get_param("user")?.into();
|
||||
let user: RoleName = params.user().ok_or(MissingKey("user"))?.into();
|
||||
|
||||
// record the values if we have them
|
||||
ctx.set_application(params.get("application_name").map(SmolStr::from));
|
||||
|
||||
@@ -172,7 +172,7 @@ pub(super) fn validate_password_and_exchange(
|
||||
secret: AuthSecret,
|
||||
) -> super::Result<sasl::Outcome<ComputeCredentialKeys>> {
|
||||
match secret {
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
AuthSecret::Md5(_) => {
|
||||
// test only
|
||||
Ok(sasl::Outcome::Success(ComputeCredentialKeys::Password(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use futures::future::Either;
|
||||
use proxy::auth;
|
||||
use proxy::auth::backend::MaybeOwned;
|
||||
use proxy::config::AuthenticationConfig;
|
||||
use proxy::config::CacheOptions;
|
||||
use proxy::config::HttpConfig;
|
||||
@@ -17,9 +18,9 @@ use proxy::usage_metrics;
|
||||
use anyhow::bail;
|
||||
use proxy::config::{self, ProxyConfig};
|
||||
use proxy::serverless;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use std::{borrow::Cow, net::SocketAddr};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -259,18 +260,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
if let auth::BackendType::Console(api, _) = &config.auth_backend {
|
||||
match &**api {
|
||||
proxy::console::provider::ConsoleBackend::Console(api) => {
|
||||
let cache = api.caches.project_info.clone();
|
||||
if let Some(url) = args.redis_notifications {
|
||||
info!("Starting redis notifications listener ({url})");
|
||||
maintenance_tasks
|
||||
.spawn(notifications::task_main(url.to_owned(), cache.clone()));
|
||||
}
|
||||
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
|
||||
if let proxy::console::provider::ConsoleBackend::Console(api) = &**api {
|
||||
let cache = api.caches.project_info.clone();
|
||||
if let Some(url) = args.redis_notifications {
|
||||
info!("Starting redis notifications listener ({url})");
|
||||
maintenance_tasks.spawn(notifications::task_main(url.to_owned(), cache.clone()));
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
proxy::console::provider::ConsoleBackend::Postgres(_) => {}
|
||||
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -369,18 +365,18 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
|
||||
let api = console::provider::neon::Api::new(endpoint, caches, locks);
|
||||
let api = console::provider::ConsoleBackend::Console(api);
|
||||
auth::BackendType::Console(Cow::Owned(api), ())
|
||||
auth::BackendType::Console(MaybeOwned::Owned(api), ())
|
||||
}
|
||||
#[cfg(feature = "testing")]
|
||||
AuthBackend::Postgres => {
|
||||
let url = args.auth_endpoint.parse()?;
|
||||
let api = console::provider::mock::Api::new(url);
|
||||
let api = console::provider::ConsoleBackend::Postgres(api);
|
||||
auth::BackendType::Console(Cow::Owned(api), ())
|
||||
auth::BackendType::Console(MaybeOwned::Owned(api), ())
|
||||
}
|
||||
AuthBackend::Link => {
|
||||
let url = args.uri.parse()?;
|
||||
auth::BackendType::Link(Cow::Owned(url))
|
||||
auth::BackendType::Link(MaybeOwned::Owned(url))
|
||||
}
|
||||
};
|
||||
let http_config = HttpConfig {
|
||||
|
||||
@@ -89,13 +89,13 @@ impl ConnCfg {
|
||||
pub fn set_startup_params(&mut self, params: &StartupMessageParams) {
|
||||
// Only set `user` if it's not present in the config.
|
||||
// Link auth flow takes username from the console's response.
|
||||
if let (None, Some(user)) = (self.get_user(), params.get("user")) {
|
||||
if let (None, Some(user)) = (self.get_user(), params.user()) {
|
||||
self.user(user);
|
||||
}
|
||||
|
||||
// Only set `dbname` if it's not present in the config.
|
||||
// Link auth flow takes dbname from the console's response.
|
||||
if let (None, Some(dbname)) = (self.get_dbname(), params.get("database")) {
|
||||
if let (None, Some(dbname)) = (self.get_dbname(), params.database()) {
|
||||
self.dbname(dbname);
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ impl ConnCfg {
|
||||
}
|
||||
|
||||
// TODO: This is especially ugly...
|
||||
if let Some(replication) = params.get("replication") {
|
||||
if let Some(replication) = params.replication() {
|
||||
use tokio_postgres::config::ReplicationMode;
|
||||
match replication {
|
||||
"true" | "on" | "yes" | "1" => {
|
||||
|
||||
@@ -100,31 +100,6 @@ pub struct MetricsAuxInfo {
|
||||
pub branch_id: BranchId,
|
||||
}
|
||||
|
||||
impl MetricsAuxInfo {
|
||||
/// Definitions of labels for traffic metric.
|
||||
pub const TRAFFIC_LABELS: &'static [&'static str] = &[
|
||||
// Received (rx) / sent (tx).
|
||||
"direction",
|
||||
// ID of a project.
|
||||
"project_id",
|
||||
// ID of an endpoint within a project.
|
||||
"endpoint_id",
|
||||
// ID of a branch within a project (snapshot).
|
||||
"branch_id",
|
||||
];
|
||||
|
||||
/// Values of labels for traffic metric.
|
||||
// TODO: add more type safety (validate arity & positions).
|
||||
pub fn traffic_labels(&self, direction: &'static str) -> [&str; 4] {
|
||||
[
|
||||
direction,
|
||||
&self.project_id,
|
||||
&self.endpoint_id,
|
||||
&self.branch_id,
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub mod mock;
|
||||
pub mod neon;
|
||||
|
||||
@@ -199,7 +199,7 @@ pub mod errors {
|
||||
/// Auth secret which is managed by the cloud.
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
pub enum AuthSecret {
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
/// Md5 hash of user's password.
|
||||
Md5([u8; 16]),
|
||||
|
||||
@@ -250,11 +250,11 @@ pub trait Api {
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedRoleSecret, errors::GetAuthInfoError>;
|
||||
|
||||
async fn get_allowed_ips(
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedAllowedIps, errors::GetAuthInfoError>;
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError>;
|
||||
|
||||
/// Wake up the compute node and return the corresponding connection info.
|
||||
async fn wake_compute(
|
||||
@@ -264,13 +264,16 @@ pub trait Api {
|
||||
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[non_exhaustive]
|
||||
pub enum ConsoleBackend {
|
||||
/// Current Cloud API (V2).
|
||||
Console(neon::Api),
|
||||
/// Local mock of Cloud API (V2).
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Postgres(mock::Api),
|
||||
/// Internal testing
|
||||
#[cfg(test)]
|
||||
Test(Box<dyn crate::auth::backend::TestBackend>),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -283,21 +286,25 @@ impl Api for ConsoleBackend {
|
||||
use ConsoleBackend::*;
|
||||
match self {
|
||||
Console(api) => api.get_role_secret(ctx, user_info).await,
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Postgres(api) => api.get_role_secret(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Test(_) => unreachable!("this function should never be called in the test backend"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_allowed_ips(
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedAllowedIps, errors::GetAuthInfoError> {
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
|
||||
use ConsoleBackend::*;
|
||||
match self {
|
||||
Console(api) => api.get_allowed_ips(ctx, user_info).await,
|
||||
#[cfg(feature = "testing")]
|
||||
Postgres(api) => api.get_allowed_ips(ctx, user_info).await,
|
||||
Console(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Postgres(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Test(api) => api.get_allowed_ips_and_secret(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -310,8 +317,10 @@ impl Api for ConsoleBackend {
|
||||
|
||||
match self {
|
||||
Console(api) => api.wake_compute(ctx, user_info).await,
|
||||
#[cfg(feature = "testing")]
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Postgres(api) => api.wake_compute(ctx, user_info).await,
|
||||
#[cfg(test)]
|
||||
Test(api) => api.wake_compute(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,14 +157,17 @@ impl super::Api for Api {
|
||||
))
|
||||
}
|
||||
|
||||
async fn get_allowed_ips(
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
_ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedAllowedIps, GetAuthInfoError> {
|
||||
Ok(Cached::new_uncached(Arc::new(
|
||||
self.do_get_auth_info(user_info).await?.allowed_ips,
|
||||
)))
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
|
||||
Ok((
|
||||
Cached::new_uncached(Arc::new(
|
||||
self.do_get_auth_info(user_info).await?.allowed_ips,
|
||||
)),
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
|
||||
@@ -19,7 +19,6 @@ use tokio::time::Instant;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Api {
|
||||
endpoint: http::Endpoint,
|
||||
pub caches: &'static ApiCaches,
|
||||
@@ -194,17 +193,17 @@ impl super::Api for Api {
|
||||
Ok(Cached::new_uncached(auth_info.secret))
|
||||
}
|
||||
|
||||
async fn get_allowed_ips(
|
||||
async fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
ctx: &mut RequestMonitoring,
|
||||
user_info: &ComputeUserInfo,
|
||||
) -> Result<CachedAllowedIps, GetAuthInfoError> {
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
|
||||
let ep = &user_info.endpoint;
|
||||
if let Some(allowed_ips) = self.caches.project_info.get_allowed_ips(ep) {
|
||||
ALLOWED_IPS_BY_CACHE_OUTCOME
|
||||
.with_label_values(&["hit"])
|
||||
.inc();
|
||||
return Ok(allowed_ips);
|
||||
return Ok((allowed_ips, None));
|
||||
}
|
||||
ALLOWED_IPS_BY_CACHE_OUTCOME
|
||||
.with_label_values(&["miss"])
|
||||
@@ -223,7 +222,10 @@ impl super::Api for Api {
|
||||
.project_info
|
||||
.insert_allowed_ips(&project_id, ep, allowed_ips.clone());
|
||||
}
|
||||
Ok(Cached::new_uncached(allowed_ips))
|
||||
Ok((
|
||||
Cached::new_uncached(allowed_ips),
|
||||
Some(Cached::new_uncached(auth_info.secret)),
|
||||
))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
|
||||
@@ -208,15 +208,6 @@ pub static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_io_bytes_per_client",
|
||||
"Number of bytes sent/received between client and backend.",
|
||||
crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS,
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_io_bytes",
|
||||
|
||||
@@ -5,6 +5,7 @@ pub mod connect_compute;
|
||||
pub mod handshake;
|
||||
pub mod passthrough;
|
||||
pub mod retry;
|
||||
pub mod wake_compute;
|
||||
|
||||
use crate::{
|
||||
auth,
|
||||
@@ -236,7 +237,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
{
|
||||
Ok(auth_result) => auth_result,
|
||||
Err(e) => {
|
||||
let db = params.get("database");
|
||||
let db = params.database();
|
||||
let app = params.get("application_name");
|
||||
let params_span = tracing::info_span!("", ?user, ?db, ?app);
|
||||
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
use crate::{
|
||||
auth,
|
||||
compute::{self, PostgresConnection},
|
||||
console::{self, errors::WakeComputeError, Api},
|
||||
console::{self, errors::WakeComputeError},
|
||||
context::RequestMonitoring,
|
||||
metrics::{bool_to_str, NUM_CONNECTION_FAILURES, NUM_WAKEUP_FAILURES},
|
||||
proxy::retry::{retry_after, ShouldRetry},
|
||||
metrics::NUM_CONNECTION_FAILURES,
|
||||
proxy::{
|
||||
retry::{retry_after, ShouldRetry},
|
||||
wake_compute::wake_compute,
|
||||
},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use hyper::StatusCode;
|
||||
use pq_proto::StartupMessageParams;
|
||||
use std::ops::ControlFlow;
|
||||
use tokio::time;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
@@ -88,39 +89,6 @@ impl ConnectMechanism for TcpMechanism<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
use crate::console::errors::ApiError;
|
||||
let retry = bool_to_str(retry);
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
ref text,
|
||||
}) if text.contains("written data quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
"quota_exceeded"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
}) => "api_console_locked",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
}) => "api_console_bad_request",
|
||||
WakeComputeError::ApiError(ApiError::Console { status, .. })
|
||||
if status.is_server_error() =>
|
||||
{
|
||||
"api_console_other_server_error"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
|
||||
WakeComputeError::TimeoutError => "timeout_error",
|
||||
};
|
||||
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
|
||||
}
|
||||
|
||||
/// Try to connect to the compute node, retrying if necessary.
|
||||
/// This function might update `node_info`, so we take it by `&mut`.
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -137,7 +105,7 @@ where
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
|
||||
// try once
|
||||
let (config, err) = match mechanism
|
||||
let err = match mechanism
|
||||
.connect_once(ctx, &node_info, CONNECT_TIMEOUT)
|
||||
.await
|
||||
{
|
||||
@@ -145,51 +113,27 @@ where
|
||||
ctx.latency_timer.success();
|
||||
return Ok(res);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
(invalidate_cache(node_info), e)
|
||||
}
|
||||
Err(e) => e,
|
||||
};
|
||||
|
||||
ctx.latency_timer.cache_miss();
|
||||
error!(error = ?err, "could not connect to compute node");
|
||||
|
||||
let mut num_retries = 1;
|
||||
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
let node_info = loop {
|
||||
let wake_res = match user_info {
|
||||
auth::BackendType::Console(api, user_info) => api.wake_compute(ctx, user_info).await,
|
||||
// nothing to do?
|
||||
auth::BackendType::Link(_) => return Err(err.into()),
|
||||
// test backend
|
||||
#[cfg(test)]
|
||||
auth::BackendType::Test(x) => x.wake_compute(),
|
||||
};
|
||||
match user_info {
|
||||
auth::BackendType::Console(api, info) => {
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
|
||||
match handle_try_wake(wake_res, num_retries) {
|
||||
Err(e) => {
|
||||
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
|
||||
report_error(&e, false);
|
||||
return Err(e.into());
|
||||
}
|
||||
// failed to wake up but we can continue to retry
|
||||
Ok(ControlFlow::Continue(e)) => {
|
||||
report_error(&e, true);
|
||||
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
|
||||
}
|
||||
// successfully woke up a compute node and can break the wakeup loop
|
||||
Ok(ControlFlow::Break(mut node_info)) => {
|
||||
node_info.config.reuse_password(&config);
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
break node_info;
|
||||
}
|
||||
ctx.latency_timer.cache_miss();
|
||||
let config = invalidate_cache(node_info);
|
||||
node_info = wake_compute(&mut num_retries, ctx, api, info).await?;
|
||||
|
||||
node_info.config.reuse_password(&config);
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(num_retries);
|
||||
num_retries += 1;
|
||||
|
||||
time::sleep(wait_duration).await;
|
||||
// nothing to do?
|
||||
auth::BackendType::Link(_) => {}
|
||||
};
|
||||
|
||||
// now that we have a new node, try connect to it repeatedly.
|
||||
@@ -221,23 +165,3 @@ where
|
||||
time::sleep(wait_duration).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to wake up the compute node.
|
||||
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
|
||||
/// * Returns Ok(Break(node)) if the wakeup succeeded
|
||||
/// * Returns Err(e) if there was an error
|
||||
pub fn handle_try_wake(
|
||||
result: Result<console::CachedNodeInfo, WakeComputeError>,
|
||||
num_retries: u32,
|
||||
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
|
||||
match result {
|
||||
Err(err) => match &err {
|
||||
WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
|
||||
Ok(ControlFlow::Continue(err))
|
||||
}
|
||||
_ => Err(err),
|
||||
},
|
||||
// Ready to try again.
|
||||
Ok(new) => Ok(ControlFlow::Break(new)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
console::messages::MetricsAuxInfo,
|
||||
context::RequestMonitoring,
|
||||
metrics::{NUM_BYTES_PROXIED_COUNTER, NUM_BYTES_PROXIED_PER_CLIENT_COUNTER},
|
||||
metrics::NUM_BYTES_PROXIED_COUNTER,
|
||||
usage_metrics::{Ids, USAGE_METRICS},
|
||||
};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@@ -25,27 +25,23 @@ pub async fn proxy_pass(
|
||||
});
|
||||
|
||||
let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["tx"]);
|
||||
let m_sent2 = NUM_BYTES_PROXIED_PER_CLIENT_COUNTER.with_label_values(&aux.traffic_labels("tx"));
|
||||
let mut client = MeasuredStream::new(
|
||||
client,
|
||||
|_| {},
|
||||
|cnt| {
|
||||
// Number of bytes we sent to the client (outbound).
|
||||
m_sent.inc_by(cnt as u64);
|
||||
m_sent2.inc_by(cnt as u64);
|
||||
usage.record_egress(cnt as u64);
|
||||
},
|
||||
);
|
||||
|
||||
let m_recv = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["rx"]);
|
||||
let m_recv2 = NUM_BYTES_PROXIED_PER_CLIENT_COUNTER.with_label_values(&aux.traffic_labels("rx"));
|
||||
let mut compute = MeasuredStream::new(
|
||||
compute,
|
||||
|_| {},
|
||||
|cnt| {
|
||||
// Number of bytes the client sent to the compute node (inbound).
|
||||
m_recv.inc_by(cnt as u64);
|
||||
m_recv2.inc_by(cnt as u64);
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -5,9 +5,9 @@ mod mitm;
|
||||
use super::connect_compute::ConnectMechanism;
|
||||
use super::retry::ShouldRetry;
|
||||
use super::*;
|
||||
use crate::auth::backend::{ComputeUserInfo, TestBackend};
|
||||
use crate::auth::IpPattern;
|
||||
use crate::auth::backend::{ComputeUserInfo, MaybeOwned, TestBackend};
|
||||
use crate::config::CertResolver;
|
||||
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
|
||||
use crate::console::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
|
||||
use crate::{auth, http, sasl, scram};
|
||||
@@ -371,6 +371,7 @@ enum ConnectAction {
|
||||
Fail,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestConnectMechanism {
|
||||
counter: Arc<std::sync::Mutex<usize>>,
|
||||
sequence: Vec<ConnectAction>,
|
||||
@@ -471,7 +472,10 @@ impl TestBackend for TestConnectMechanism {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_allowed_ips(&self) -> Result<Vec<IpPattern>, console::errors::GetAuthInfoError> {
|
||||
fn get_allowed_ips_and_secret(
|
||||
&self,
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), console::errors::GetAuthInfoError>
|
||||
{
|
||||
unimplemented!("not used in tests")
|
||||
}
|
||||
}
|
||||
@@ -487,9 +491,16 @@ fn helper_create_cached_node_info() -> CachedNodeInfo {
|
||||
|
||||
fn helper_create_connect_info(
|
||||
mechanism: &TestConnectMechanism,
|
||||
) -> (CachedNodeInfo, auth::BackendType<'_, ComputeUserInfo>) {
|
||||
) -> (CachedNodeInfo, auth::BackendType<'static, ComputeUserInfo>) {
|
||||
let cache = helper_create_cached_node_info();
|
||||
let user_info = auth::BackendType::Test(mechanism);
|
||||
let user_info = auth::BackendType::Console(
|
||||
MaybeOwned::Owned(ConsoleBackend::Test(Box::new(mechanism.clone()))),
|
||||
ComputeUserInfo {
|
||||
endpoint: "endpoint".into(),
|
||||
user: "user".into(),
|
||||
options: NeonOptions::parse_options_raw(""),
|
||||
},
|
||||
);
|
||||
(cache, user_info)
|
||||
}
|
||||
|
||||
|
||||
95
proxy/src/proxy/wake_compute.rs
Normal file
95
proxy/src/proxy/wake_compute.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::console::{
|
||||
errors::WakeComputeError,
|
||||
provider::{CachedNodeInfo, ConsoleBackend},
|
||||
Api,
|
||||
};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::{bool_to_str, NUM_WAKEUP_FAILURES};
|
||||
use crate::proxy::retry::retry_after;
|
||||
use hyper::StatusCode;
|
||||
use std::ops::ControlFlow;
|
||||
use tracing::{error, warn};
|
||||
|
||||
use super::retry::ShouldRetry;
|
||||
|
||||
/// wake a compute (or retrieve an existing compute session from cache)
|
||||
pub async fn wake_compute(
|
||||
num_retries: &mut u32,
|
||||
ctx: &mut RequestMonitoring,
|
||||
api: &ConsoleBackend,
|
||||
info: &ComputeUserInfo,
|
||||
) -> Result<CachedNodeInfo, WakeComputeError> {
|
||||
loop {
|
||||
let wake_res = api.wake_compute(ctx, info).await;
|
||||
match handle_try_wake(wake_res, *num_retries) {
|
||||
Err(e) => {
|
||||
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
|
||||
report_error(&e, false);
|
||||
return Err(e);
|
||||
}
|
||||
Ok(ControlFlow::Continue(e)) => {
|
||||
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
|
||||
report_error(&e, true);
|
||||
}
|
||||
Ok(ControlFlow::Break(n)) => return Ok(n),
|
||||
}
|
||||
|
||||
let wait_duration = retry_after(*num_retries);
|
||||
*num_retries += 1;
|
||||
tokio::time::sleep(wait_duration).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to wake up the compute node.
|
||||
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
|
||||
/// * Returns Ok(Break(node)) if the wakeup succeeded
|
||||
/// * Returns Err(e) if there was an error
|
||||
pub fn handle_try_wake(
|
||||
result: Result<CachedNodeInfo, WakeComputeError>,
|
||||
num_retries: u32,
|
||||
) -> Result<ControlFlow<CachedNodeInfo, WakeComputeError>, WakeComputeError> {
|
||||
match result {
|
||||
Err(err) => match &err {
|
||||
WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
|
||||
Ok(ControlFlow::Continue(err))
|
||||
}
|
||||
_ => Err(err),
|
||||
},
|
||||
// Ready to try again.
|
||||
Ok(new) => Ok(ControlFlow::Break(new)),
|
||||
}
|
||||
}
|
||||
|
||||
fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
use crate::console::errors::ApiError;
|
||||
let retry = bool_to_str(retry);
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
ref text,
|
||||
}) if text.contains("written data quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
"quota_exceeded"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
}) => "api_console_locked",
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
}) => "api_console_bad_request",
|
||||
WakeComputeError::ApiError(ApiError::Console { status, .. })
|
||||
if status.is_server_error() =>
|
||||
{
|
||||
"api_console_other_server_error"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
|
||||
WakeComputeError::TimeoutError => "timeout_error",
|
||||
};
|
||||
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
|
||||
}
|
||||
@@ -3,6 +3,7 @@
|
||||
//! Handles both SQL over HTTP and SQL over Websockets.
|
||||
|
||||
mod conn_pool;
|
||||
mod json;
|
||||
mod sql_over_http;
|
||||
mod websocket;
|
||||
|
||||
|
||||
@@ -540,7 +540,7 @@ async fn connect_to_compute(
|
||||
.map(|_| conn_info.user_info.clone());
|
||||
|
||||
if !config.disable_ip_check_for_http {
|
||||
let allowed_ips = backend.get_allowed_ips(ctx).await?;
|
||||
let (allowed_ips, _) = backend.get_allowed_ips_and_secret(ctx).await?;
|
||||
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
|
||||
return Err(auth::AuthError::ip_address_not_allowed().into());
|
||||
}
|
||||
|
||||
448
proxy/src/serverless/json.rs
Normal file
448
proxy/src/serverless/json.rs
Normal file
@@ -0,0 +1,448 @@
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use tokio_postgres::types::Kind;
|
||||
use tokio_postgres::types::Type;
|
||||
use tokio_postgres::Row;
|
||||
|
||||
//
|
||||
// Convert json non-string types to strings, so that they can be passed to Postgres
|
||||
// as parameters.
|
||||
//
|
||||
pub fn json_to_pg_text(json: Vec<Value>) -> Vec<Option<String>> {
|
||||
json.iter()
|
||||
.map(|value| {
|
||||
match value {
|
||||
// special care for nulls
|
||||
Value::Null => None,
|
||||
|
||||
// convert to text with escaping
|
||||
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
|
||||
|
||||
// avoid escaping here, as we pass this as a parameter
|
||||
Value::String(s) => Some(s.to_string()),
|
||||
|
||||
// special care for arrays
|
||||
Value::Array(_) => json_array_to_pg_array(value),
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
//
|
||||
// Serialize a JSON array to a Postgres array. Contrary to the strings in the params
|
||||
// in the array we need to escape the strings. Postgres is okay with arrays of form
|
||||
// '{1,"2",3}'::int[], so we don't check that array holds values of the same type, leaving
|
||||
// it for Postgres to check.
|
||||
//
|
||||
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
|
||||
//
|
||||
fn json_array_to_pg_array(value: &Value) -> Option<String> {
|
||||
match value {
|
||||
// special care for nulls
|
||||
Value::Null => None,
|
||||
|
||||
// convert to text with escaping
|
||||
// here string needs to be escaped, as it is part of the array
|
||||
v @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => Some(v.to_string()),
|
||||
v @ Value::Object(_) => json_array_to_pg_array(&Value::String(v.to_string())),
|
||||
|
||||
// recurse into array
|
||||
Value::Array(arr) => {
|
||||
let vals = arr
|
||||
.iter()
|
||||
.map(json_array_to_pg_array)
|
||||
.map(|v| v.unwrap_or_else(|| "NULL".to_string()))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
Some(format!("{{{}}}", vals))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Convert postgres row with text-encoded values to JSON object
|
||||
//
|
||||
pub fn pg_text_row_to_json(
|
||||
row: &Row,
|
||||
columns: &[Type],
|
||||
raw_output: bool,
|
||||
array_mode: bool,
|
||||
) -> Result<Value, anyhow::Error> {
|
||||
let iter = row
|
||||
.columns()
|
||||
.iter()
|
||||
.zip(columns)
|
||||
.enumerate()
|
||||
.map(|(i, (column, typ))| {
|
||||
let name = column.name();
|
||||
let pg_value = row.as_text(i)?;
|
||||
let json_value = if raw_output {
|
||||
match pg_value {
|
||||
Some(v) => Value::String(v.to_string()),
|
||||
None => Value::Null,
|
||||
}
|
||||
} else {
|
||||
pg_text_to_json(pg_value, typ)?
|
||||
};
|
||||
Ok((name.to_string(), json_value))
|
||||
});
|
||||
|
||||
if array_mode {
|
||||
// drop keys and aggregate into array
|
||||
let arr = iter
|
||||
.map(|r| r.map(|(_key, val)| val))
|
||||
.collect::<Result<Vec<Value>, anyhow::Error>>()?;
|
||||
Ok(Value::Array(arr))
|
||||
} else {
|
||||
let obj = iter.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
|
||||
Ok(Value::Object(obj))
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Convert postgres text-encoded value to JSON value
|
||||
//
|
||||
fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
|
||||
if let Some(val) = pg_value {
|
||||
if let Kind::Array(elem_type) = pg_type.kind() {
|
||||
return pg_array_parse(val, elem_type);
|
||||
}
|
||||
|
||||
match *pg_type {
|
||||
Type::BOOL => Ok(Value::Bool(val == "t")),
|
||||
Type::INT2 | Type::INT4 => {
|
||||
let val = val.parse::<i32>()?;
|
||||
Ok(Value::Number(serde_json::Number::from(val)))
|
||||
}
|
||||
Type::FLOAT4 | Type::FLOAT8 => {
|
||||
let fval = val.parse::<f64>()?;
|
||||
let num = serde_json::Number::from_f64(fval);
|
||||
if let Some(num) = num {
|
||||
Ok(Value::Number(num))
|
||||
} else {
|
||||
// Pass Nan, Inf, -Inf as strings
|
||||
// JS JSON.stringify() does converts them to null, but we
|
||||
// want to preserve them, so we pass them as strings
|
||||
Ok(Value::String(val.to_string()))
|
||||
}
|
||||
}
|
||||
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
|
||||
_ => Ok(Value::String(val.to_string())),
|
||||
}
|
||||
} else {
|
||||
Ok(Value::Null)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Parse postgres array into JSON array.
|
||||
//
|
||||
// This is a bit involved because we need to handle nested arrays and quoted
|
||||
// values. Unlike postgres we don't check that all nested arrays have the same
|
||||
// dimensions, we just return them as is.
|
||||
//
|
||||
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
|
||||
_pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
|
||||
}
|
||||
|
||||
fn _pg_array_parse(
|
||||
pg_array: &str,
|
||||
elem_type: &Type,
|
||||
nested: bool,
|
||||
) -> Result<(Value, usize), anyhow::Error> {
|
||||
let mut pg_array_chr = pg_array.char_indices();
|
||||
let mut level = 0;
|
||||
let mut quote = false;
|
||||
let mut entries: Vec<Value> = Vec::new();
|
||||
let mut entry = String::new();
|
||||
|
||||
// skip bounds decoration
|
||||
if let Some('[') = pg_array.chars().next() {
|
||||
for (_, c) in pg_array_chr.by_ref() {
|
||||
if c == '=' {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn push_checked(
|
||||
entry: &mut String,
|
||||
entries: &mut Vec<Value>,
|
||||
elem_type: &Type,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
if !entry.is_empty() {
|
||||
// While in usual postgres response we get nulls as None and everything else
|
||||
// as Some(&str), in arrays we get NULL as unquoted 'NULL' string (while
|
||||
// string with value 'NULL' will be represented by '"NULL"'). So catch NULLs
|
||||
// here while we have quotation info and convert them to None.
|
||||
if entry == "NULL" {
|
||||
entries.push(pg_text_to_json(None, elem_type)?);
|
||||
} else {
|
||||
entries.push(pg_text_to_json(Some(entry), elem_type)?);
|
||||
}
|
||||
entry.clear();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((mut i, mut c)) = pg_array_chr.next() {
|
||||
let mut escaped = false;
|
||||
|
||||
if c == '\\' {
|
||||
escaped = true;
|
||||
(i, c) = pg_array_chr.next().unwrap();
|
||||
}
|
||||
|
||||
match c {
|
||||
'{' if !quote => {
|
||||
level += 1;
|
||||
if level > 1 {
|
||||
let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?;
|
||||
entries.push(res);
|
||||
for _ in 0..off - 1 {
|
||||
pg_array_chr.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
'}' if !quote => {
|
||||
level -= 1;
|
||||
if level == 0 {
|
||||
push_checked(&mut entry, &mut entries, elem_type)?;
|
||||
if nested {
|
||||
return Ok((Value::Array(entries), i));
|
||||
}
|
||||
}
|
||||
}
|
||||
'"' if !escaped => {
|
||||
if quote {
|
||||
// end of quoted string, so push it manually without any checks
|
||||
// for emptiness or nulls
|
||||
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
|
||||
entry.clear();
|
||||
}
|
||||
quote = !quote;
|
||||
}
|
||||
',' if !quote => {
|
||||
push_checked(&mut entry, &mut entries, elem_type)?;
|
||||
}
|
||||
_ => {
|
||||
entry.push(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if level != 0 {
|
||||
return Err(anyhow::anyhow!("unbalanced array"));
|
||||
}
|
||||
|
||||
Ok((Value::Array(entries), 0))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn test_atomic_types_to_pg_params() {
|
||||
let json = vec![Value::Bool(true), Value::Bool(false)];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some("true".to_owned()), Some("false".to_owned())]
|
||||
);
|
||||
|
||||
let json = vec![Value::Number(serde_json::Number::from(42))];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![Some("42".to_owned())]);
|
||||
|
||||
let json = vec![Value::String("foo\"".to_string())];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![Some("foo\"".to_owned())]);
|
||||
|
||||
let json = vec![Value::Null];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![None]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_array_to_pg_array() {
|
||||
// atoms and escaping
|
||||
let json = "[true, false, null, \"NULL\", 42, \"foo\", \"bar\\\"-\\\\\"]";
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(
|
||||
"{true,false,NULL,\"NULL\",42,\"foo\",\"bar\\\"-\\\\\"}".to_owned()
|
||||
)]
|
||||
);
|
||||
|
||||
// nested arrays
|
||||
let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]";
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(
|
||||
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
|
||||
)]
|
||||
);
|
||||
// array of objects
|
||||
let json = r#"[{"foo": 1},{"bar": 2}]"#;
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_atomic_types_parse() {
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
|
||||
json!("foo")
|
||||
);
|
||||
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
|
||||
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
|
||||
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
|
||||
json!("42")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
|
||||
json!(42.42)
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
|
||||
json!(42.42)
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
|
||||
json!("NaN")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
|
||||
json!("Infinity")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
|
||||
json!("-Infinity")
|
||||
);
|
||||
|
||||
let json: Value =
|
||||
serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}")
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
pg_text_to_json(
|
||||
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
|
||||
&Type::JSONB
|
||||
)
|
||||
.unwrap(),
|
||||
json
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_text() {
|
||||
fn pt(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
|
||||
}
|
||||
assert_eq!(
|
||||
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
|
||||
json!(["aa\"\\,a", "cha", "bbbb"])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"foo","bar"},{"bee","bop"}}"#),
|
||||
json!([["foo", "bar"], ["bee", "bop"]])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{{{"foo",NULL,"bop",bup}}}}"#),
|
||||
json!([[[["foo", null, "bop", "bup"]]]])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#),
|
||||
json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_bool() {
|
||||
fn pb(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
|
||||
}
|
||||
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
|
||||
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
|
||||
assert_eq!(
|
||||
pb(r#"{{t,f},{f,t}}"#),
|
||||
json!([[true, false], [false, true]])
|
||||
);
|
||||
assert_eq!(
|
||||
pb(r#"{{t,NULL},{NULL,f}}"#),
|
||||
json!([[true, null], [null, false]])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_numbers() {
|
||||
fn pn(pg_arr: &str, ty: &Type) -> Value {
|
||||
pg_array_parse(pg_arr, ty).unwrap()
|
||||
}
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT8), json!(["1", "2", "3"]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT4), json!([1.0, 2.0, 3.0]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT8), json!([1.0, 2.0, 3.0]));
|
||||
assert_eq!(
|
||||
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT4),
|
||||
json!([1.1, 2.2, 3.3])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT8),
|
||||
json!([1.1, 2.2, 3.3])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT4),
|
||||
json!(["NaN", "Infinity", "-Infinity"])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT8),
|
||||
json!(["NaN", "Infinity", "-Infinity"])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_with_decoration() {
|
||||
fn p(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::INT2).unwrap()
|
||||
}
|
||||
assert_eq!(
|
||||
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
|
||||
json!([[[1, 2, 3], [4, 5, 6]]])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_json() {
|
||||
fn pt(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
|
||||
}
|
||||
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
|
||||
json!([{"foo": 1, "bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
|
||||
json!([{"foo": 1}, {"bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
|
||||
json!([[{"foo": 1}, {"bar": 2}]])
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -12,16 +12,12 @@ use hyper::Response;
|
||||
use hyper::StatusCode;
|
||||
use hyper::{Body, HeaderMap, Request};
|
||||
use serde_json::json;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use tokio_postgres::error::DbError;
|
||||
use tokio_postgres::error::ErrorPosition;
|
||||
use tokio_postgres::types::Kind;
|
||||
use tokio_postgres::types::Type;
|
||||
use tokio_postgres::GenericClient;
|
||||
use tokio_postgres::IsolationLevel;
|
||||
use tokio_postgres::ReadyForQueryStatus;
|
||||
use tokio_postgres::Row;
|
||||
use tokio_postgres::Transaction;
|
||||
use tracing::error;
|
||||
use tracing::instrument;
|
||||
@@ -40,6 +36,7 @@ use crate::RoleName;
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
use super::json::{json_to_pg_text, pg_text_row_to_json};
|
||||
use super::SERVERLESS_DRIVER_SNI;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
@@ -72,62 +69,6 @@ static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrab
|
||||
|
||||
static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
|
||||
|
||||
//
|
||||
// Convert json non-string types to strings, so that they can be passed to Postgres
|
||||
// as parameters.
|
||||
//
|
||||
fn json_to_pg_text(json: Vec<Value>) -> Vec<Option<String>> {
|
||||
json.iter()
|
||||
.map(|value| {
|
||||
match value {
|
||||
// special care for nulls
|
||||
Value::Null => None,
|
||||
|
||||
// convert to text with escaping
|
||||
v @ (Value::Bool(_) | Value::Number(_) | Value::Object(_)) => Some(v.to_string()),
|
||||
|
||||
// avoid escaping here, as we pass this as a parameter
|
||||
Value::String(s) => Some(s.to_string()),
|
||||
|
||||
// special care for arrays
|
||||
Value::Array(_) => json_array_to_pg_array(value),
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
//
|
||||
// Serialize a JSON array to a Postgres array. Contrary to the strings in the params
|
||||
// in the array we need to escape the strings. Postgres is okay with arrays of form
|
||||
// '{1,"2",3}'::int[], so we don't check that array holds values of the same type, leaving
|
||||
// it for Postgres to check.
|
||||
//
|
||||
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
|
||||
//
|
||||
fn json_array_to_pg_array(value: &Value) -> Option<String> {
|
||||
match value {
|
||||
// special care for nulls
|
||||
Value::Null => None,
|
||||
|
||||
// convert to text with escaping
|
||||
// here string needs to be escaped, as it is part of the array
|
||||
v @ (Value::Bool(_) | Value::Number(_) | Value::String(_)) => Some(v.to_string()),
|
||||
v @ Value::Object(_) => json_array_to_pg_array(&Value::String(v.to_string())),
|
||||
|
||||
// recurse into array
|
||||
Value::Array(arr) => {
|
||||
let vals = arr
|
||||
.iter()
|
||||
.map(json_array_to_pg_array)
|
||||
.map(|v| v.unwrap_or_else(|| "NULL".to_string()))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
Some(format!("{{{}}}", vals))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_conn_info(
|
||||
ctx: &mut RequestMonitoring,
|
||||
headers: &HeaderMap,
|
||||
@@ -611,389 +552,3 @@ async fn query_to_json<T: GenericClient>(
|
||||
}),
|
||||
))
|
||||
}
|
||||
|
||||
//
|
||||
// Convert postgres row with text-encoded values to JSON object
|
||||
//
|
||||
pub fn pg_text_row_to_json(
|
||||
row: &Row,
|
||||
columns: &[Type],
|
||||
raw_output: bool,
|
||||
array_mode: bool,
|
||||
) -> Result<Value, anyhow::Error> {
|
||||
let iter = row
|
||||
.columns()
|
||||
.iter()
|
||||
.zip(columns)
|
||||
.enumerate()
|
||||
.map(|(i, (column, typ))| {
|
||||
let name = column.name();
|
||||
let pg_value = row.as_text(i)?;
|
||||
let json_value = if raw_output {
|
||||
match pg_value {
|
||||
Some(v) => Value::String(v.to_string()),
|
||||
None => Value::Null,
|
||||
}
|
||||
} else {
|
||||
pg_text_to_json(pg_value, typ)?
|
||||
};
|
||||
Ok((name.to_string(), json_value))
|
||||
});
|
||||
|
||||
if array_mode {
|
||||
// drop keys and aggregate into array
|
||||
let arr = iter
|
||||
.map(|r| r.map(|(_key, val)| val))
|
||||
.collect::<Result<Vec<Value>, anyhow::Error>>()?;
|
||||
Ok(Value::Array(arr))
|
||||
} else {
|
||||
let obj = iter.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
|
||||
Ok(Value::Object(obj))
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Convert postgres text-encoded value to JSON value
|
||||
//
|
||||
pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
|
||||
if let Some(val) = pg_value {
|
||||
if let Kind::Array(elem_type) = pg_type.kind() {
|
||||
return pg_array_parse(val, elem_type);
|
||||
}
|
||||
|
||||
match *pg_type {
|
||||
Type::BOOL => Ok(Value::Bool(val == "t")),
|
||||
Type::INT2 | Type::INT4 => {
|
||||
let val = val.parse::<i32>()?;
|
||||
Ok(Value::Number(serde_json::Number::from(val)))
|
||||
}
|
||||
Type::FLOAT4 | Type::FLOAT8 => {
|
||||
let fval = val.parse::<f64>()?;
|
||||
let num = serde_json::Number::from_f64(fval);
|
||||
if let Some(num) = num {
|
||||
Ok(Value::Number(num))
|
||||
} else {
|
||||
// Pass Nan, Inf, -Inf as strings
|
||||
// JS JSON.stringify() does converts them to null, but we
|
||||
// want to preserve them, so we pass them as strings
|
||||
Ok(Value::String(val.to_string()))
|
||||
}
|
||||
}
|
||||
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
|
||||
_ => Ok(Value::String(val.to_string())),
|
||||
}
|
||||
} else {
|
||||
Ok(Value::Null)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Parse postgres array into JSON array.
|
||||
//
|
||||
// This is a bit involved because we need to handle nested arrays and quoted
|
||||
// values. Unlike postgres we don't check that all nested arrays have the same
|
||||
// dimensions, we just return them as is.
|
||||
//
|
||||
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
|
||||
_pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
|
||||
}
|
||||
|
||||
fn _pg_array_parse(
|
||||
pg_array: &str,
|
||||
elem_type: &Type,
|
||||
nested: bool,
|
||||
) -> Result<(Value, usize), anyhow::Error> {
|
||||
let mut pg_array_chr = pg_array.char_indices();
|
||||
let mut level = 0;
|
||||
let mut quote = false;
|
||||
let mut entries: Vec<Value> = Vec::new();
|
||||
let mut entry = String::new();
|
||||
|
||||
// skip bounds decoration
|
||||
if let Some('[') = pg_array.chars().next() {
|
||||
for (_, c) in pg_array_chr.by_ref() {
|
||||
if c == '=' {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn push_checked(
|
||||
entry: &mut String,
|
||||
entries: &mut Vec<Value>,
|
||||
elem_type: &Type,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
if !entry.is_empty() {
|
||||
// While in usual postgres response we get nulls as None and everything else
|
||||
// as Some(&str), in arrays we get NULL as unquoted 'NULL' string (while
|
||||
// string with value 'NULL' will be represented by '"NULL"'). So catch NULLs
|
||||
// here while we have quotation info and convert them to None.
|
||||
if entry == "NULL" {
|
||||
entries.push(pg_text_to_json(None, elem_type)?);
|
||||
} else {
|
||||
entries.push(pg_text_to_json(Some(entry), elem_type)?);
|
||||
}
|
||||
entry.clear();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((mut i, mut c)) = pg_array_chr.next() {
|
||||
let mut escaped = false;
|
||||
|
||||
if c == '\\' {
|
||||
escaped = true;
|
||||
(i, c) = pg_array_chr.next().unwrap();
|
||||
}
|
||||
|
||||
match c {
|
||||
'{' if !quote => {
|
||||
level += 1;
|
||||
if level > 1 {
|
||||
let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?;
|
||||
entries.push(res);
|
||||
for _ in 0..off - 1 {
|
||||
pg_array_chr.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
'}' if !quote => {
|
||||
level -= 1;
|
||||
if level == 0 {
|
||||
push_checked(&mut entry, &mut entries, elem_type)?;
|
||||
if nested {
|
||||
return Ok((Value::Array(entries), i));
|
||||
}
|
||||
}
|
||||
}
|
||||
'"' if !escaped => {
|
||||
if quote {
|
||||
// end of quoted string, so push it manually without any checks
|
||||
// for emptiness or nulls
|
||||
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
|
||||
entry.clear();
|
||||
}
|
||||
quote = !quote;
|
||||
}
|
||||
',' if !quote => {
|
||||
push_checked(&mut entry, &mut entries, elem_type)?;
|
||||
}
|
||||
_ => {
|
||||
entry.push(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if level != 0 {
|
||||
return Err(anyhow::anyhow!("unbalanced array"));
|
||||
}
|
||||
|
||||
Ok((Value::Array(entries), 0))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn test_atomic_types_to_pg_params() {
|
||||
let json = vec![Value::Bool(true), Value::Bool(false)];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some("true".to_owned()), Some("false".to_owned())]
|
||||
);
|
||||
|
||||
let json = vec![Value::Number(serde_json::Number::from(42))];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![Some("42".to_owned())]);
|
||||
|
||||
let json = vec![Value::String("foo\"".to_string())];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![Some("foo\"".to_owned())]);
|
||||
|
||||
let json = vec![Value::Null];
|
||||
let pg_params = json_to_pg_text(json);
|
||||
assert_eq!(pg_params, vec![None]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_array_to_pg_array() {
|
||||
// atoms and escaping
|
||||
let json = "[true, false, null, \"NULL\", 42, \"foo\", \"bar\\\"-\\\\\"]";
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(
|
||||
"{true,false,NULL,\"NULL\",42,\"foo\",\"bar\\\"-\\\\\"}".to_owned()
|
||||
)]
|
||||
);
|
||||
|
||||
// nested arrays
|
||||
let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]";
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(
|
||||
"{{true,false},{NULL,42},{\"foo\",\"bar\\\"-\\\\\"}}".to_owned()
|
||||
)]
|
||||
);
|
||||
// array of objects
|
||||
let json = r#"[{"foo": 1},{"bar": 2}]"#;
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]);
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec![Some(r#"{"{\"foo\":1}","{\"bar\":2}"}"#.to_owned())]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_atomic_types_parse() {
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
|
||||
json!("foo")
|
||||
);
|
||||
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
|
||||
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
|
||||
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
|
||||
json!("42")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
|
||||
json!(42.42)
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
|
||||
json!(42.42)
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
|
||||
json!("NaN")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
|
||||
json!("Infinity")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
|
||||
json!("-Infinity")
|
||||
);
|
||||
|
||||
let json: Value =
|
||||
serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}")
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
pg_text_to_json(
|
||||
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
|
||||
&Type::JSONB
|
||||
)
|
||||
.unwrap(),
|
||||
json
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_text() {
|
||||
fn pt(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
|
||||
}
|
||||
assert_eq!(
|
||||
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
|
||||
json!(["aa\"\\,a", "cha", "bbbb"])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"foo","bar"},{"bee","bop"}}"#),
|
||||
json!([["foo", "bar"], ["bee", "bop"]])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{{{"foo",NULL,"bop",bup}}}}"#),
|
||||
json!([[[["foo", null, "bop", "bup"]]]])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#),
|
||||
json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_bool() {
|
||||
fn pb(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
|
||||
}
|
||||
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
|
||||
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
|
||||
assert_eq!(
|
||||
pb(r#"{{t,f},{f,t}}"#),
|
||||
json!([[true, false], [false, true]])
|
||||
);
|
||||
assert_eq!(
|
||||
pb(r#"{{t,NULL},{NULL,f}}"#),
|
||||
json!([[true, null], [null, false]])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_numbers() {
|
||||
fn pn(pg_arr: &str, ty: &Type) -> Value {
|
||||
pg_array_parse(pg_arr, ty).unwrap()
|
||||
}
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT8), json!(["1", "2", "3"]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT4), json!([1.0, 2.0, 3.0]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT8), json!([1.0, 2.0, 3.0]));
|
||||
assert_eq!(
|
||||
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT4),
|
||||
json!([1.1, 2.2, 3.3])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT8),
|
||||
json!([1.1, 2.2, 3.3])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT4),
|
||||
json!(["NaN", "Infinity", "-Infinity"])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT8),
|
||||
json!(["NaN", "Infinity", "-Infinity"])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_with_decoration() {
|
||||
fn p(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::INT2).unwrap()
|
||||
}
|
||||
assert_eq!(
|
||||
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
|
||||
json!([[[1, 2, 3], [4, 5, 6]]])
|
||||
);
|
||||
}
|
||||
#[test]
|
||||
fn test_pg_array_parse_json() {
|
||||
fn pt(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::JSONB).unwrap()
|
||||
}
|
||||
assert_eq!(pt(r#"{"{}"}"#), json!([{}]));
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1, \"bar\": 2}"}"#),
|
||||
json!([{"foo": 1, "bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{"{\"foo\": 1}", "{\"bar\": 2}"}"#),
|
||||
json!([{"foo": 1}, {"bar": 2}])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"{\"foo\": 1}", "{\"bar\": 2}"}}"#),
|
||||
json!([[{"foo": 1}, {"bar": 2}]])
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,118 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
|
||||
import backoff
|
||||
import psycopg2
|
||||
|
||||
CREATE_TABLE = """
|
||||
CREATE TABLE IF NOT EXISTS regress_test_results (
|
||||
id SERIAL PRIMARY KEY,
|
||||
reference CHAR(255),
|
||||
revision CHAR(40),
|
||||
build_type CHAR(16),
|
||||
data JSONB
|
||||
)
|
||||
"""
|
||||
|
||||
|
||||
def err(msg):
|
||||
print(f"error: {msg}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_connection_cursor():
|
||||
connstr = os.getenv("DATABASE_URL")
|
||||
if not connstr:
|
||||
err("DATABASE_URL environment variable is not set")
|
||||
|
||||
@backoff.on_exception(backoff.expo, psycopg2.OperationalError, max_time=150)
|
||||
def connect(connstr):
|
||||
conn = psycopg2.connect(connstr, connect_timeout=30)
|
||||
conn.autocommit = True
|
||||
return conn
|
||||
|
||||
conn = connect(connstr)
|
||||
try:
|
||||
with conn.cursor() as cur:
|
||||
yield cur
|
||||
finally:
|
||||
if conn is not None:
|
||||
conn.close()
|
||||
|
||||
|
||||
def create_table(cur):
|
||||
cur.execute(CREATE_TABLE)
|
||||
|
||||
|
||||
def ingest_regress_test_result(
|
||||
cursor, reference: str, revision: str, build_type: str, data_file: Path
|
||||
):
|
||||
data = data_file.read_text()
|
||||
# In the JSON report we can have lines related to LazyFixture with escaped double-quote
|
||||
# It's hard to insert them into jsonb field as is, so replace \" with ' to make it easier for us
|
||||
#
|
||||
# "<LazyFixture \"vanilla_compare\">" -> "<LazyFixture 'vanilla_compare'>"
|
||||
data = re.sub(r'("<LazyFixture )\\"([^\\]+)\\"(>")', r"\g<1>'\g<2>'\g<3>", data)
|
||||
values = (
|
||||
reference,
|
||||
revision,
|
||||
build_type,
|
||||
data,
|
||||
)
|
||||
cursor.execute(
|
||||
"""
|
||||
INSERT INTO regress_test_results (
|
||||
reference,
|
||||
revision,
|
||||
build_type,
|
||||
data
|
||||
) VALUES (%s, %s, %s, %s)
|
||||
""",
|
||||
values,
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Regress test result uploader. \
|
||||
Database connection string should be provided via DATABASE_URL environment variable",
|
||||
)
|
||||
parser.add_argument("--initdb", action="store_true", help="Initialuze database")
|
||||
parser.add_argument(
|
||||
"--reference", type=str, required=True, help="git reference, for example refs/heads/main"
|
||||
)
|
||||
parser.add_argument("--revision", type=str, required=True, help="git revision")
|
||||
parser.add_argument(
|
||||
"--build-type", type=str, required=True, help="build type: release, debug or remote"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ingest", type=Path, required=True, help="Path to regress test result file"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
with get_connection_cursor() as cur:
|
||||
if args.initdb:
|
||||
create_table(cur)
|
||||
|
||||
if not args.ingest.exists():
|
||||
err(f"ingest path {args.ingest} does not exist")
|
||||
|
||||
ingest_regress_test_result(
|
||||
cur,
|
||||
reference=args.reference,
|
||||
revision=args.revision,
|
||||
build_type=args.build_type,
|
||||
data_file=args.ingest,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.getLogger("backoff").addHandler(logging.StreamHandler())
|
||||
main()
|
||||
@@ -3980,8 +3980,17 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
|
||||
# list files we're going to compare
|
||||
assert endpoint.pgdata_dir
|
||||
pgdata_files = list_files_to_compare(Path(endpoint.pgdata_dir))
|
||||
|
||||
restored_files = list_files_to_compare(restored_dir_path)
|
||||
|
||||
if pgdata_files != restored_files:
|
||||
# filter pg_xact and multixact files which are downloaded on demand
|
||||
pgdata_files = [
|
||||
f
|
||||
for f in pgdata_files
|
||||
if not f.startswith("pg_xact") and not f.startswith("pg_multixact")
|
||||
]
|
||||
|
||||
# check that file sets are equal
|
||||
assert pgdata_files == restored_files
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import json
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple, Union
|
||||
|
||||
import requests
|
||||
@@ -389,6 +390,20 @@ class PageserverHttpClient(requests.Session):
|
||||
)
|
||||
return res.text
|
||||
|
||||
def tenant_time_travel_remote_storage(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
timestamp: datetime,
|
||||
done_if_after: datetime,
|
||||
):
|
||||
"""
|
||||
Issues a request to perform time travel operations on the remote storage
|
||||
"""
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/time_travel_remote_storage?travel_to={timestamp.isoformat()}Z&done_if_after={done_if_after.isoformat()}Z"
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def timeline_list(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from mypy_boto3_s3.type_defs import ListObjectsV2OutputTypeDef, ObjectTypeDef
|
||||
from mypy_boto3_s3.type_defs import (
|
||||
EmptyResponseMetadataTypeDef,
|
||||
ListObjectsV2OutputTypeDef,
|
||||
ObjectTypeDef,
|
||||
)
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
@@ -346,6 +350,27 @@ def list_prefix(
|
||||
return response
|
||||
|
||||
|
||||
def enable_remote_storage_versioning(
|
||||
remote: RemoteStorage,
|
||||
) -> EmptyResponseMetadataTypeDef:
|
||||
"""
|
||||
Enable S3 versioning for the remote storage
|
||||
"""
|
||||
# local_fs has no
|
||||
assert isinstance(remote, S3Storage), "localfs is currently not supported"
|
||||
assert remote.client is not None
|
||||
|
||||
# Note that this doesnt use pagination, so list is not guaranteed to be exhaustive.
|
||||
response = remote.client.put_bucket_versioning(
|
||||
Bucket=remote.bucket_name,
|
||||
VersioningConfiguration={
|
||||
"MFADelete": "Disabled",
|
||||
"Status": "Enabled",
|
||||
},
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
def wait_tenant_status_404(
|
||||
pageserver_http: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -160,8 +160,9 @@ class LocalFsStorage:
|
||||
class S3Storage:
|
||||
bucket_name: str
|
||||
bucket_region: str
|
||||
access_key: str
|
||||
secret_key: str
|
||||
access_key: Optional[str]
|
||||
secret_key: Optional[str]
|
||||
aws_profile: Optional[str]
|
||||
prefix_in_bucket: str
|
||||
client: S3Client
|
||||
cleanup: bool
|
||||
@@ -170,10 +171,18 @@ class S3Storage:
|
||||
endpoint: Optional[str] = None
|
||||
|
||||
def access_env_vars(self) -> Dict[str, str]:
|
||||
return {
|
||||
"AWS_ACCESS_KEY_ID": self.access_key,
|
||||
"AWS_SECRET_ACCESS_KEY": self.secret_key,
|
||||
}
|
||||
if self.aws_profile is not None:
|
||||
return {
|
||||
"AWS_PROFILE": self.aws_profile,
|
||||
}
|
||||
if self.access_key is not None and self.secret_key is not None:
|
||||
return {
|
||||
"AWS_ACCESS_KEY_ID": self.access_key,
|
||||
"AWS_SECRET_ACCESS_KEY": self.secret_key,
|
||||
}
|
||||
raise RuntimeError(
|
||||
"Either AWS_PROFILE or (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) have to be set for S3Storage"
|
||||
)
|
||||
|
||||
def to_string(self) -> str:
|
||||
return json.dumps(
|
||||
@@ -308,6 +317,7 @@ class RemoteStorageKind(str, enum.Enum):
|
||||
bucket_region=mock_region,
|
||||
access_key=access_key,
|
||||
secret_key=secret_key,
|
||||
aws_profile=None,
|
||||
prefix_in_bucket="",
|
||||
client=client,
|
||||
cleanup=False,
|
||||
@@ -317,12 +327,11 @@ class RemoteStorageKind(str, enum.Enum):
|
||||
assert self == RemoteStorageKind.REAL_S3
|
||||
|
||||
env_access_key = os.getenv("AWS_ACCESS_KEY_ID")
|
||||
assert env_access_key, "no aws access key provided"
|
||||
env_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
|
||||
assert env_secret_key, "no aws access key provided"
|
||||
|
||||
# session token is needed for local runs with sso auth
|
||||
session_token = os.getenv("AWS_SESSION_TOKEN")
|
||||
env_profile = os.getenv("AWS_PROFILE")
|
||||
assert (
|
||||
env_access_key and env_secret_key
|
||||
) or env_profile, "need to specify either access key and secret access key or profile"
|
||||
|
||||
bucket_name = bucket_name or os.getenv("REMOTE_STORAGE_S3_BUCKET")
|
||||
assert bucket_name is not None, "no remote storage bucket name provided"
|
||||
@@ -334,9 +343,6 @@ class RemoteStorageKind(str, enum.Enum):
|
||||
client = boto3.client(
|
||||
"s3",
|
||||
region_name=bucket_region,
|
||||
aws_access_key_id=env_access_key,
|
||||
aws_secret_access_key=env_secret_key,
|
||||
aws_session_token=session_token,
|
||||
)
|
||||
|
||||
return S3Storage(
|
||||
@@ -344,6 +350,7 @@ class RemoteStorageKind(str, enum.Enum):
|
||||
bucket_region=bucket_region,
|
||||
access_key=env_access_key,
|
||||
secret_key=env_secret_key,
|
||||
aws_profile=env_profile,
|
||||
prefix_in_bucket=prefix_in_bucket,
|
||||
client=client,
|
||||
cleanup=True,
|
||||
|
||||
111
test_runner/performance/test_lazy_startup.py
Normal file
111
test_runner/performance/test_lazy_startup.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
# Start and measure duration with huge SLRU segments.
|
||||
# This test is similar to test_startup_simple, but it creates huge number of transactions
|
||||
# and records containing this XIDs. Autovacuum is disable for the table to prevent CLOG truncation.
|
||||
#
|
||||
# This test runs pretty quickly and can be informative when used in combination
|
||||
# with emulated network delay. Some useful delay commands:
|
||||
#
|
||||
# 1. Add 2msec delay to all localhost traffic
|
||||
# `sudo tc qdisc add dev lo root handle 1:0 netem delay 2msec`
|
||||
#
|
||||
# 2. Test that it works (you should see 4ms ping)
|
||||
# `ping localhost`
|
||||
#
|
||||
# 3. Revert back to normal
|
||||
# `sudo tc qdisc del dev lo root netem`
|
||||
#
|
||||
# NOTE this test might not represent the real startup time because the basebackup
|
||||
# for a large database might be larger if there's a lof of transaction metadata,
|
||||
# or safekeepers might need more syncing, or there might be more operations to
|
||||
# apply during config step, like more users, databases, or extensions. By default
|
||||
# we load extensions 'neon,pg_stat_statements,timescaledb,pg_cron', but in this
|
||||
# test we only load neon.
|
||||
@pytest.mark.timeout(1000)
|
||||
def test_lazy_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
lazy_tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"lazy_slru_download": "true",
|
||||
}
|
||||
)
|
||||
eager_tenant, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
"lazy_slru_download": "false",
|
||||
}
|
||||
)
|
||||
tenants = [lazy_tenant, eager_tenant]
|
||||
slru = "lazy"
|
||||
for tenant in tenants:
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
endpoint.safe_psql("CREATE TABLE t (pk integer PRIMARY KEY, x integer)")
|
||||
endpoint.safe_psql("ALTER TABLE t SET (autovacuum_enabled = false)")
|
||||
endpoint.safe_psql("INSERT INTO t VALUES (1, 0)")
|
||||
endpoint.safe_psql(
|
||||
"""
|
||||
CREATE PROCEDURE updating() as
|
||||
$$
|
||||
DECLARE
|
||||
i integer;
|
||||
BEGIN
|
||||
FOR i IN 1..10000000 LOOP
|
||||
UPDATE t SET x = x + 1 WHERE pk=1;
|
||||
COMMIT;
|
||||
END LOOP;
|
||||
END
|
||||
$$ LANGUAGE plpgsql
|
||||
"""
|
||||
)
|
||||
endpoint.safe_psql("SET statement_timeout=0")
|
||||
endpoint.safe_psql("call updating()")
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
# We do two iterations so we can see if the second startup is faster. It should
|
||||
# be because the compute node should already be configured with roles, databases,
|
||||
# extensions, etc from the first run.
|
||||
for i in range(2):
|
||||
# Start
|
||||
with zenbenchmark.record_duration(f"{slru}_{i}_start"):
|
||||
endpoint.start()
|
||||
|
||||
with zenbenchmark.record_duration(f"{slru}_{i}_select"):
|
||||
sum = endpoint.safe_psql("select sum(x) from t")[0][0]
|
||||
assert sum == 10000000
|
||||
|
||||
# Get metrics
|
||||
metrics = requests.get(f"http://localhost:{endpoint.http_port}/metrics.json").json()
|
||||
durations = {
|
||||
"wait_for_spec_ms": f"{slru}_{i}_wait_for_spec",
|
||||
"sync_safekeepers_ms": f"{slru}_{i}_sync_safekeepers",
|
||||
"sync_sk_check_ms": f"{slru}_{i}_sync_sk_check",
|
||||
"basebackup_ms": f"{slru}_{i}_basebackup",
|
||||
"start_postgres_ms": f"{slru}_{i}_start_postgres",
|
||||
"config_ms": f"{slru}_{i}_config",
|
||||
"total_startup_ms": f"{slru}_{i}_total_startup",
|
||||
}
|
||||
for key, name in durations.items():
|
||||
value = metrics[key]
|
||||
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
basebackup_bytes = metrics["basebackup_bytes"]
|
||||
zenbenchmark.record(
|
||||
f"{slru}_{i}_basebackup_bytes",
|
||||
basebackup_bytes,
|
||||
"bytes",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
# Stop so we can restart
|
||||
endpoint.stop()
|
||||
|
||||
# Imitate optimizations that console would do for the second start
|
||||
endpoint.respec(skip_pg_catalog_updates=True)
|
||||
slru = "eager"
|
||||
@@ -173,6 +173,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"image_creation_threshold": 7,
|
||||
"pitr_interval": "1m",
|
||||
"lagging_wal_timeout": "23m",
|
||||
"lazy_slru_download": True,
|
||||
"max_lsn_wal_lag": 230000,
|
||||
"min_resident_size_override": 23,
|
||||
"trace_read_requests": True,
|
||||
|
||||
@@ -9,14 +9,14 @@ def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.append(".*simulated connection error.*")
|
||||
|
||||
# Enable failpoint before starting everything else up so that we exercise the retry
|
||||
# on fetching basebackup
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
pageserver_http.configure_failpoints(("simulated-bad-compute-connection", "50%return(15)"))
|
||||
|
||||
env.neon_cli.create_branch("test_compute_pageserver_connection_stress")
|
||||
endpoint = env.endpoints.create_start("test_compute_pageserver_connection_stress")
|
||||
|
||||
# Enable failpoint after starting everything else up so that loading initial
|
||||
# basebackup doesn't fail
|
||||
pageserver_http.configure_failpoints(("simulated-bad-compute-connection", "50%return(15)"))
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
|
||||
121
test_runner/regress/test_s3_restore.py
Normal file
121
test_runner/regress/test_s3_restore.py
Normal file
@@ -0,0 +1,121 @@
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.pageserver.utils import (
|
||||
MANY_SMALL_LAYERS_TENANT_CONFIG,
|
||||
assert_prefix_empty,
|
||||
enable_remote_storage_versioning,
|
||||
poll_for_remote_storage_iterations,
|
||||
tenant_delete_wait_completed,
|
||||
wait_for_upload,
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind, s3_storage
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.utils import run_pg_bench_small
|
||||
|
||||
|
||||
def test_tenant_s3_restore(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
remote_storage_kind = s3_storage()
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
# Mock S3 doesn't have versioning enabled by default, enable it
|
||||
# (also do it before there is any writes to the bucket)
|
||||
if remote_storage_kind == RemoteStorageKind.MOCK_S3:
|
||||
remote_storage = neon_env_builder.pageserver_remote_storage
|
||||
assert remote_storage, "remote storage not configured"
|
||||
enable_remote_storage_versioning(remote_storage)
|
||||
pytest.skip("moto doesn't support self-copy: https://github.com/getmoto/moto/issues/7300")
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# The deletion queue will complain when it encounters simulated S3 errors
|
||||
".*deletion executor: DeleteObjects request failed.*",
|
||||
# lucky race with stopping from flushing a layer we fail to schedule any uploads
|
||||
".*layer flush task.+: could not flush frozen layer: update_metadata_file",
|
||||
]
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
# Default tenant and the one we created
|
||||
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
|
||||
|
||||
# create two timelines one being the parent of another, both with non-trivial data
|
||||
parent = None
|
||||
last_flush_lsns = []
|
||||
|
||||
for timeline in ["first", "second"]:
|
||||
timeline_id = env.neon_cli.create_branch(
|
||||
timeline, tenant_id=tenant_id, ancestor_branch_name=parent
|
||||
)
|
||||
with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
|
||||
run_pg_bench_small(pg_bin, endpoint.connstr())
|
||||
endpoint.safe_psql(f"CREATE TABLE created_{timeline}(id integer);")
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
last_flush_lsns.append(last_flush_lsn)
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
parent = timeline
|
||||
|
||||
# These sleeps are important because they fend off differences in clocks between us and S3
|
||||
time.sleep(4)
|
||||
ts_before_deletion = datetime.now(tz=timezone.utc).replace(tzinfo=None)
|
||||
time.sleep(4)
|
||||
|
||||
assert (
|
||||
ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
|
||||
), "tenant removed before we deletion was issued"
|
||||
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
assert (
|
||||
ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
|
||||
), "tenant removed before we deletion was issued"
|
||||
env.attachment_service.attach_hook_drop(tenant_id)
|
||||
|
||||
tenant_path = env.pageserver.tenant_dir(tenant_id)
|
||||
assert not tenant_path.exists()
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder.pageserver_remote_storage,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
time.sleep(4)
|
||||
ts_after_deletion = datetime.now(tz=timezone.utc).replace(tzinfo=None)
|
||||
time.sleep(4)
|
||||
|
||||
ps_http.tenant_time_travel_remote_storage(
|
||||
tenant_id, timestamp=ts_before_deletion, done_if_after=ts_after_deletion
|
||||
)
|
||||
|
||||
generation = env.attachment_service.attach_hook_issue(tenant_id, env.pageserver.id)
|
||||
|
||||
ps_http.tenant_attach(tenant_id, generation=generation)
|
||||
env.pageserver.quiesce_tenants()
|
||||
|
||||
for i, timeline in enumerate(["first", "second"]):
|
||||
with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
|
||||
endpoint.safe_psql(f"SELECT * FROM created_{timeline};")
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
expected_last_flush_lsn = last_flush_lsns[i]
|
||||
# There might be some activity that advances the lsn so we can't use a strict equality check
|
||||
assert last_flush_lsn >= expected_last_flush_lsn, "last_flush_lsn too old"
|
||||
|
||||
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
|
||||
@@ -376,11 +376,6 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
|
||||
# so we allow it to log at WARN, even if it is occasionally a false positive.
|
||||
env.pageserver.allowed_errors.append(".*failed to freeze and flush.*")
|
||||
|
||||
# When we shut down a tenant during a timeline creation, initdb is not cancelled, we wait
|
||||
# for it to complete (since https://github.com/neondatabase/neon/pull/6451). This means
|
||||
# that shutdown can be delayed by >=1s on debug builds where initdb takes a long time to run.
|
||||
env.pageserver.allowed_errors.append(".*still waiting, taking longer than expected... gate.*")
|
||||
|
||||
def create_bg(delay_ms):
|
||||
time.sleep(delay_ms / 1000.0)
|
||||
try:
|
||||
|
||||
@@ -60,6 +60,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
|
||||
match msg {
|
||||
PagestreamFeMessage::Exists(_) => {}
|
||||
PagestreamFeMessage::Nblocks(_) => {}
|
||||
PagestreamFeMessage::GetSlruSegment(_) => {}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
total += 1;
|
||||
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 3de48ce3d9...be7a65fe67
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: b089a8a02c...81e16cd537
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: cf302768b2...f7ea954989
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"postgres-v16": "cf302768b2890569956641e0e5ba112ae1445351",
|
||||
"postgres-v15": "b089a8a02c9f6f4379883fddb33cf10a3aa0b14f",
|
||||
"postgres-v14": "3de48ce3d9c1f4fac1cdc7029487f8db9e537eac"
|
||||
"postgres-v16": "f7ea954989a2e7901f858779cff55259f203479a",
|
||||
"postgres-v15": "81e16cd537053f49e175d4a08ab7c8aec3d9b535",
|
||||
"postgres-v14": "be7a65fe67dc81d85bbcbebb13e00d94715f4b88"
|
||||
}
|
||||
|
||||
@@ -174,11 +174,10 @@ build: |
|
||||
libtool \
|
||||
pkg-config
|
||||
|
||||
# Note, we use pgbouncer from neondatabase/pgbouncer fork, which could contain extra commits.
|
||||
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
|
||||
ENV PGBOUNCER_TAG pgbouncer_1_21_0-neon-1
|
||||
ENV PGBOUNCER_TAG pgbouncer_1_22_0
|
||||
RUN set -e \
|
||||
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/neondatabase/pgbouncer.git pgbouncer \
|
||||
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
|
||||
&& cd pgbouncer \
|
||||
&& ./autogen.sh \
|
||||
&& LDFLAGS=-static ./configure --prefix=/usr/local/pgbouncer --without-openssl \
|
||||
|
||||
Reference in New Issue
Block a user