mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
Compare commits
8 Commits
arpad/lsn_
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f72d063f3 | ||
|
|
c6d8c7cb0a | ||
|
|
831588b5aa | ||
|
|
47a8e9711f | ||
|
|
de9436518e | ||
|
|
fb92f8fa91 | ||
|
|
08f682b217 | ||
|
|
18e8baab5b |
@@ -1,17 +1,3 @@
|
||||
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
|
||||
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
|
||||
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
|
||||
#
|
||||
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
|
||||
#
|
||||
[profile.dev.package."*"]
|
||||
# Set the default for dependencies in Development mode.
|
||||
opt-level = 3
|
||||
|
||||
[profile.dev]
|
||||
# Turn on a small amount of optimization in Development mode.
|
||||
opt-level = 1
|
||||
|
||||
[build]
|
||||
# This is only present for local builds, as it will be overridden
|
||||
# by the RUSTDOCFLAGS env var in CI.
|
||||
|
||||
5
.github/ISSUE_TEMPLATE/epic-template.md
vendored
5
.github/ISSUE_TEMPLATE/epic-template.md
vendored
@@ -17,9 +17,8 @@ assignees: ''
|
||||
## Implementation ideas
|
||||
|
||||
|
||||
```[tasklist]
|
||||
### Tasks
|
||||
```
|
||||
## Tasks
|
||||
- [ ]
|
||||
|
||||
|
||||
## Other related tasks and Epics
|
||||
|
||||
2
.github/actionlint.yml
vendored
2
.github/actionlint.yml
vendored
@@ -1,7 +1,5 @@
|
||||
self-hosted-runner:
|
||||
labels:
|
||||
- arm64
|
||||
- dev
|
||||
- gen3
|
||||
- large
|
||||
- small
|
||||
|
||||
5
.github/workflows/build_and_test.yml
vendored
5
.github/workflows/build_and_test.yml
vendored
@@ -172,10 +172,10 @@ jobs:
|
||||
# https://github.com/EmbarkStudios/cargo-deny
|
||||
- name: Check rust licenses/bans/advisories/sources
|
||||
if: ${{ !cancelled() }}
|
||||
run: cargo deny check --hide-inclusion-graph
|
||||
run: cargo deny check
|
||||
|
||||
build-neon:
|
||||
needs: [ check-permissions, tag ]
|
||||
needs: [ check-permissions ]
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
@@ -187,7 +187,6 @@ jobs:
|
||||
env:
|
||||
BUILD_TYPE: ${{ matrix.build_type }}
|
||||
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
|
||||
|
||||
steps:
|
||||
- name: Fix git ownership
|
||||
|
||||
181
.github/workflows/neon_extra_builds.yml
vendored
181
.github/workflows/neon_extra_builds.yml
vendored
@@ -21,10 +21,7 @@ env:
|
||||
|
||||
jobs:
|
||||
check-macos-build:
|
||||
if: |
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos')
|
||||
timeout-minutes: 90
|
||||
runs-on: macos-latest
|
||||
|
||||
@@ -115,182 +112,8 @@ jobs:
|
||||
- name: Check that no warnings are produced
|
||||
run: ./run_clippy.sh
|
||||
|
||||
check-linux-arm-build:
|
||||
timeout-minutes: 90
|
||||
runs-on: [ self-hosted, dev, arm64 ]
|
||||
|
||||
env:
|
||||
# Use release build only, to have less debug info around
|
||||
# Hence keeping target/ (and general cache size) smaller
|
||||
BUILD_TYPE: release
|
||||
CARGO_FEATURES: --features testing
|
||||
CARGO_FLAGS: --locked --release
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- name: Fix git ownership
|
||||
run: |
|
||||
# Workaround for `fatal: detected dubious ownership in repository at ...`
|
||||
#
|
||||
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
|
||||
# Ref https://github.com/actions/checkout/issues/785
|
||||
#
|
||||
git config --global --add safe.directory ${{ github.workspace }}
|
||||
git config --global --add safe.directory ${GITHUB_WORKSPACE}
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Set pg 14 revision for caching
|
||||
id: pg_v14_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set pg 15 revision for caching
|
||||
id: pg_v15_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set pg 16 revision for caching
|
||||
id: pg_v16_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set env variables
|
||||
run: |
|
||||
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo" >> $GITHUB_ENV
|
||||
|
||||
- name: Cache postgres v14 build
|
||||
id: cache_pg_14
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_15
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_16
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Build postgres v14
|
||||
if: steps.cache_pg_14.outputs.cache-hit != 'true'
|
||||
run: mold -run make postgres-v14 -j$(nproc)
|
||||
|
||||
- name: Build postgres v15
|
||||
if: steps.cache_pg_15.outputs.cache-hit != 'true'
|
||||
run: mold -run make postgres-v15 -j$(nproc)
|
||||
|
||||
- name: Build postgres v16
|
||||
if: steps.cache_pg_16.outputs.cache-hit != 'true'
|
||||
run: mold -run make postgres-v16 -j$(nproc)
|
||||
|
||||
- name: Build neon extensions
|
||||
run: mold -run make neon-pg-ext -j$(nproc)
|
||||
|
||||
- name: Build walproposer-lib
|
||||
run: mold -run make walproposer-lib -j$(nproc)
|
||||
|
||||
- name: Run cargo build
|
||||
run: |
|
||||
mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
|
||||
|
||||
- name: Run cargo test
|
||||
run: |
|
||||
cargo test $CARGO_FLAGS $CARGO_FEATURES
|
||||
|
||||
# Run separate tests for real S3
|
||||
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
|
||||
export REMOTE_STORAGE_S3_BUCKET=neon-github-public-dev
|
||||
export REMOTE_STORAGE_S3_REGION=eu-central-1
|
||||
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
|
||||
cargo test $CARGO_FLAGS --package remote_storage --test test_real_s3
|
||||
|
||||
# Run separate tests for real Azure Blob Storage
|
||||
# XXX: replace region with `eu-central-1`-like region
|
||||
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
|
||||
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
|
||||
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
|
||||
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
|
||||
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
|
||||
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
|
||||
cargo test $CARGO_FLAGS --package remote_storage --test test_real_azure
|
||||
|
||||
check-codestyle-rust-arm:
|
||||
timeout-minutes: 90
|
||||
runs-on: [ self-hosted, dev, arm64 ]
|
||||
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
fetch-depth: 1
|
||||
|
||||
# Some of our rust modules use FFI and need those to be checked
|
||||
- name: Get postgres headers
|
||||
run: make postgres-headers -j$(nproc)
|
||||
|
||||
# cargo hack runs the given cargo subcommand (clippy in this case) for all feature combinations.
|
||||
# This will catch compiler & clippy warnings in all feature combinations.
|
||||
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
|
||||
# NB: keep clippy args in sync with ./run_clippy.sh
|
||||
- run: |
|
||||
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
|
||||
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
|
||||
echo "No clippy args found in .neon_clippy_args"
|
||||
exit 1
|
||||
fi
|
||||
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
|
||||
- name: Run cargo clippy (debug)
|
||||
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
|
||||
- name: Run cargo clippy (release)
|
||||
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
|
||||
|
||||
- name: Check documentation generation
|
||||
run: cargo doc --workspace --no-deps --document-private-items
|
||||
env:
|
||||
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
|
||||
|
||||
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
|
||||
- name: Check formatting
|
||||
if: ${{ !cancelled() }}
|
||||
run: cargo fmt --all -- --check
|
||||
|
||||
# https://github.com/facebookincubator/cargo-guppy/tree/bec4e0eb29dcd1faac70b1b5360267fc02bf830e/tools/cargo-hakari#2-keep-the-workspace-hack-up-to-date-in-ci
|
||||
- name: Check rust dependencies
|
||||
if: ${{ !cancelled() }}
|
||||
run: |
|
||||
cargo hakari generate --diff # workspace-hack Cargo.toml is up-to-date
|
||||
cargo hakari manage-deps --dry-run # all workspace crates depend on workspace-hack
|
||||
|
||||
# https://github.com/EmbarkStudios/cargo-deny
|
||||
- name: Check rust licenses/bans/advisories/sources
|
||||
if: ${{ !cancelled() }}
|
||||
run: cargo deny check
|
||||
|
||||
gather-rust-build-stats:
|
||||
if: |
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats')
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
|
||||
|
||||
655
Cargo.lock
generated
655
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
17
Cargo.toml
17
Cargo.toml
@@ -48,7 +48,6 @@ async-trait = "0.1"
|
||||
aws-config = { version = "0.56", default-features = false, features=["rustls"] }
|
||||
aws-sdk-s3 = "0.29"
|
||||
aws-smithy-http = "0.56"
|
||||
aws-smithy-async = { version = "0.56", default-features = false, features=["rt-tokio"] }
|
||||
aws-credential-types = "0.56"
|
||||
aws-types = "0.56"
|
||||
axum = { version = "0.6.20", features = ["ws"] }
|
||||
@@ -67,7 +66,7 @@ comfy-table = "6.1"
|
||||
const_format = "0.2"
|
||||
crc32c = "0.6"
|
||||
crossbeam-utils = "0.8.5"
|
||||
dashmap = { version = "5.5.0", features = ["raw-api"] }
|
||||
dashmap = "5.5.0"
|
||||
either = "1.8"
|
||||
enum-map = "2.4.2"
|
||||
enumset = "1.0.12"
|
||||
@@ -83,7 +82,7 @@ hex = "0.4"
|
||||
hex-literal = "0.4"
|
||||
hmac = "0.12.1"
|
||||
hostname = "0.3.1"
|
||||
http-types = { version = "2", default-features = false }
|
||||
http-types = "2"
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1.1"
|
||||
hyper = "0.14"
|
||||
@@ -164,11 +163,11 @@ env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
## Other git libraries
|
||||
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
|
||||
@@ -205,7 +204,7 @@ tonic-build = "0.9"
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
|
||||
22
deny.toml
22
deny.toml
@@ -74,30 +74,10 @@ highlight = "all"
|
||||
workspace-default-features = "allow"
|
||||
external-default-features = "allow"
|
||||
allow = []
|
||||
|
||||
deny = []
|
||||
skip = []
|
||||
skip-tree = []
|
||||
|
||||
[[bans.deny]]
|
||||
# we use tokio, the same rationale applies for async-{io,waker,global-executor,executor,channel,lock}, smol
|
||||
# if you find yourself here while adding a dependency, try "default-features = false", ask around on #rust
|
||||
name = "async-std"
|
||||
|
||||
[[bans.deny]]
|
||||
name = "async-io"
|
||||
|
||||
[[bans.deny]]
|
||||
name = "async-waker"
|
||||
|
||||
[[bans.deny]]
|
||||
name = "async-global-executor"
|
||||
|
||||
[[bans.deny]]
|
||||
name = "async-executor"
|
||||
|
||||
[[bans.deny]]
|
||||
name = "smol"
|
||||
|
||||
# This section is considered when running `cargo deny check sources`.
|
||||
# More documentation about the 'sources' section can be found here:
|
||||
# https://embarkstudios.github.io/cargo-deny/checks/sources/cfg.html
|
||||
|
||||
@@ -18,7 +18,7 @@ use utils::{
|
||||
|
||||
use crate::reltag::RelTag;
|
||||
use anyhow::bail;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
|
||||
/// The state of a tenant in this pageserver.
|
||||
///
|
||||
@@ -371,6 +371,8 @@ pub struct TenantInfo {
|
||||
/// If a layer is present in both local FS and S3, it counts only once.
|
||||
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
|
||||
pub attachment_status: TenantAttachmentStatus,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generation: Option<u32>,
|
||||
}
|
||||
|
||||
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
|
||||
@@ -515,6 +517,8 @@ pub enum HistoricLayerInfo {
|
||||
lsn_end: Lsn,
|
||||
remote: bool,
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
remote_path: Option<String>,
|
||||
},
|
||||
Image {
|
||||
layer_file_name: String,
|
||||
@@ -523,6 +527,8 @@ pub enum HistoricLayerInfo {
|
||||
lsn_start: Lsn,
|
||||
remote: bool,
|
||||
access_stats: LayerAccessStats,
|
||||
|
||||
remote_path: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -767,6 +773,36 @@ impl PagestreamBeMessage {
|
||||
|
||||
bytes.into()
|
||||
}
|
||||
|
||||
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
|
||||
let mut buf = buf.reader();
|
||||
let msg_tag = buf.read_u8()?;
|
||||
match msg_tag {
|
||||
100 => todo!(),
|
||||
101 => todo!(),
|
||||
102 => {
|
||||
let buf = buf.get_ref();
|
||||
/* TODO use constant */
|
||||
if buf.len() == 8192 {
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page: buf.clone(),
|
||||
}))
|
||||
} else {
|
||||
anyhow::bail!("invalid page size: {}", buf.len());
|
||||
}
|
||||
}
|
||||
103 => {
|
||||
let buf = buf.get_ref();
|
||||
let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
|
||||
let rust_str = cstr.to_str()?;
|
||||
Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: rust_str.to_owned(),
|
||||
}))
|
||||
}
|
||||
104 => todo!(),
|
||||
_ => bail!("unknown tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -832,6 +868,7 @@ mod tests {
|
||||
state: TenantState::Active,
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: None,
|
||||
};
|
||||
let expected_active = json!({
|
||||
"id": original_active.id.to_string(),
|
||||
@@ -852,6 +889,7 @@ mod tests {
|
||||
},
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: None,
|
||||
};
|
||||
let expected_broken = json!({
|
||||
"id": original_broken.id.to_string(),
|
||||
|
||||
@@ -8,7 +8,6 @@ license.workspace = true
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
once_cell.workspace = true
|
||||
aws-smithy-async.workspace = true
|
||||
aws-smithy-http.workspace = true
|
||||
aws-types.workspace = true
|
||||
aws-config.workspace = true
|
||||
|
||||
@@ -81,6 +81,12 @@ impl std::fmt::Display for RemotePath {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<RemotePath> for String {
|
||||
fn from(val: RemotePath) -> Self {
|
||||
val.0.into()
|
||||
}
|
||||
}
|
||||
|
||||
impl RemotePath {
|
||||
pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
|
||||
anyhow::ensure!(
|
||||
@@ -102,7 +108,7 @@ impl RemotePath {
|
||||
self.0.file_name()
|
||||
}
|
||||
|
||||
pub fn join(&self, segment: &Utf8Path) -> Self {
|
||||
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
|
||||
Self(self.0.join(segment))
|
||||
}
|
||||
|
||||
|
||||
@@ -4,27 +4,23 @@
|
||||
//! allowing multiple api users to independently work with the same S3 bucket, if
|
||||
//! their bucket prefixes are both specified and different.
|
||||
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
use std::borrow::Cow;
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::{
|
||||
environment::credentials::EnvironmentVariableCredentialsProvider,
|
||||
imds::credentials::ImdsCredentialsProvider,
|
||||
meta::credentials::CredentialsProviderChain,
|
||||
provider_config::ProviderConfig,
|
||||
retry::{RetryConfigBuilder, RetryMode},
|
||||
web_identity_token::WebIdentityTokenCredentialsProvider,
|
||||
imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
|
||||
provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
|
||||
};
|
||||
use aws_credential_types::cache::CredentialsCache;
|
||||
use aws_sdk_s3::{
|
||||
config::{AsyncSleep, Config, Region, SharedAsyncSleep},
|
||||
config::{Config, Region},
|
||||
error::SdkError,
|
||||
operation::get_object::GetObjectError,
|
||||
primitives::ByteStream,
|
||||
types::{Delete, ObjectIdentifier},
|
||||
Client,
|
||||
};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
use aws_smithy_http::body::SdkBody;
|
||||
use hyper::Body;
|
||||
use scopeguard::ScopeGuard;
|
||||
@@ -87,23 +83,10 @@ impl S3Bucket {
|
||||
.or_else("imds", ImdsCredentialsProvider::builder().build())
|
||||
};
|
||||
|
||||
// AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
|
||||
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
|
||||
|
||||
// We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
|
||||
// responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
|
||||
// attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
|
||||
let mut retry_config = RetryConfigBuilder::new();
|
||||
retry_config
|
||||
.set_max_attempts(Some(1))
|
||||
.set_mode(Some(RetryMode::Adaptive));
|
||||
|
||||
let mut config_builder = Config::builder()
|
||||
.region(region)
|
||||
.credentials_cache(CredentialsCache::lazy())
|
||||
.credentials_provider(credentials_provider)
|
||||
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
|
||||
.retry_config(retry_config.build());
|
||||
.credentials_provider(credentials_provider);
|
||||
|
||||
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
|
||||
config_builder = config_builder
|
||||
|
||||
@@ -1,156 +0,0 @@
|
||||
/// An exhausting, injective counter with exponential search properties
|
||||
///
|
||||
/// The type implements an [`Iterator`] that yields numbers from the range
|
||||
/// from 0 to a pre-defined maximum.
|
||||
///
|
||||
/// * It is *exhausting* in that it visits all numbers inside `0..max`.
|
||||
/// * It is *injective* in that all numbers are visited only once.
|
||||
/// * It has *exponential search properties* in that it iterates over the
|
||||
/// number range in a fractal pattern.
|
||||
///
|
||||
/// This iterator is well suited for finding a pivot in algorithms that
|
||||
/// want centered pivots: its output is heavily biased towards starting
|
||||
/// with small numbers.
|
||||
///
|
||||
/// If the specified maximum is close to an exponent
|
||||
/// of two we will quickly reach that exponent and therefore we will
|
||||
/// quickly close in on the maximum, contrary to the claim of the counter
|
||||
/// providing centered pivots. For most algorithms, this is not a problem
|
||||
/// though as long as this is only a limited number of times, which it does,
|
||||
/// as the next maximum will be an exponent of two, for which the next
|
||||
/// smaller one is precisely its half of. So we will be able to perform a
|
||||
/// binary search in all instances.
|
||||
pub struct ExpCounter {
|
||||
/// The (exclusive) upper limit of our search
|
||||
max: u64,
|
||||
/// The base that increases after each round trip
|
||||
base: u64,
|
||||
/// An increasing offset, always a power of two
|
||||
offs: u64,
|
||||
/// Iteration counter
|
||||
i: u64,
|
||||
}
|
||||
|
||||
impl ExpCounter {
|
||||
/// Creates an iterator that uses `ExpCounter` for the first half of the
|
||||
/// range and a linear range counter for the second half.
|
||||
pub fn with_max_and_linear_search(max: u64) -> impl Iterator<Item = u64> {
|
||||
let linear_start = max / 2;
|
||||
ExpCounter::with_max(linear_start).chain(linear_start..max)
|
||||
}
|
||||
/// Creates a new `ExpCounter` instance that counts to the (exclusive) maximum
|
||||
pub fn with_max(max: u64) -> Self {
|
||||
Self {
|
||||
max,
|
||||
base: 0,
|
||||
offs: 1,
|
||||
i: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for ExpCounter {
|
||||
type Item = u64;
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.i >= self.max {
|
||||
return None;
|
||||
}
|
||||
if self.i == 0 {
|
||||
// Special casing this is easier than adding 0 as the first in some other fashion
|
||||
self.i += 1;
|
||||
return Some(0);
|
||||
}
|
||||
let to_yield = self.base + self.offs;
|
||||
self.offs *= 2;
|
||||
|
||||
if self.base + self.offs >= self.max {
|
||||
self.base += 1;
|
||||
self.offs = 1;
|
||||
while self.base > self.offs {
|
||||
self.offs *= 2;
|
||||
}
|
||||
if (self.base + self.offs).count_ones() == 1 {
|
||||
self.offs *= 2;
|
||||
}
|
||||
}
|
||||
self.i += 1;
|
||||
|
||||
Some(to_yield)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn dupes_and_missing(list: &mut [u64], max: u64) -> (usize, usize) {
|
||||
let mut contained_in_list = HashSet::<u64>::new();
|
||||
let mut dupes = 0;
|
||||
let mut missing = 0;
|
||||
|
||||
for v in list.iter() {
|
||||
println!("{v:4 } = {v:010b}");
|
||||
}
|
||||
println!("Yielded {} items", list.len());
|
||||
println!("The duplicates:");
|
||||
list.sort();
|
||||
let mut prev = None;
|
||||
for v in list.iter() {
|
||||
contained_in_list.insert(*v);
|
||||
if Some(*v) == prev {
|
||||
dupes += 1;
|
||||
println!("{v:3 } = {v:08b}");
|
||||
}
|
||||
prev = Some(*v);
|
||||
}
|
||||
println!("The missing numbers:");
|
||||
for v in 0..max {
|
||||
if !contained_in_list.contains(&v) {
|
||||
println!("{v:3 } = {v:010b}");
|
||||
missing += 1;
|
||||
}
|
||||
}
|
||||
(dupes, missing)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn to_40() {
|
||||
let max = 40;
|
||||
let list = ExpCounter::with_max(max).collect::<Vec<_>>();
|
||||
let expected = [
|
||||
0, 1, 2, 4, 8, 16, 32, 3, 5, 9, 17, 33, 6, 10, 18, 34, 7, 11, 19, 35, 12, 20, 36, 13,
|
||||
21, 37, 14, 22, 38, 15, 23, 39, 24, 25, 26, 27, 28, 29, 30, 31,
|
||||
];
|
||||
assert_eq!(list, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn to_64() {
|
||||
let max = 64;
|
||||
let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
|
||||
assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn to_100() {
|
||||
let max = 100;
|
||||
let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
|
||||
assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn to_127() {
|
||||
let max = 127;
|
||||
let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
|
||||
assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn to_12345() {
|
||||
let max = 12345;
|
||||
let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
|
||||
assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
|
||||
}
|
||||
}
|
||||
@@ -78,7 +78,6 @@ pub mod completion;
|
||||
/// Reporting utilities
|
||||
pub mod error;
|
||||
|
||||
pub mod exp_counter;
|
||||
/// async timeout helper
|
||||
pub mod timeout;
|
||||
|
||||
|
||||
@@ -125,9 +125,6 @@ where
|
||||
// Wake everyone with an error.
|
||||
let mut internal = self.internal.lock().unwrap();
|
||||
|
||||
// Block any future waiters from starting
|
||||
internal.shutdown = true;
|
||||
|
||||
// This will steal the entire waiters map.
|
||||
// When we drop it all waiters will be woken.
|
||||
mem::take(&mut internal.waiters)
|
||||
|
||||
@@ -82,6 +82,8 @@ enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::Result;
|
||||
use camino::Utf8Path;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::Subcommand;
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
|
||||
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::{
|
||||
@@ -20,6 +22,7 @@ use pageserver::{
|
||||
};
|
||||
use std::fs;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use crate::layer_map_analyzer::parse_filename;
|
||||
|
||||
@@ -45,6 +48,13 @@ pub(crate) enum LayerCmd {
|
||||
/// The id from list-layer command
|
||||
id: usize,
|
||||
},
|
||||
RewriteSummary {
|
||||
layer_file_path: Utf8PathBuf,
|
||||
#[clap(long)]
|
||||
new_tenant_id: Option<TenantId>,
|
||||
#[clap(long)]
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
},
|
||||
}
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
@@ -100,6 +110,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
println!("- timeline {}", timeline.file_name().to_string_lossy());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::ListLayer {
|
||||
path,
|
||||
@@ -128,6 +139,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::DumpLayer {
|
||||
path,
|
||||
@@ -168,7 +180,63 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::RewriteSummary {
|
||||
layer_file_path,
|
||||
new_tenant_id,
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
macro_rules! rewrite_closure {
|
||||
($($summary_ty:tt)*) => {{
|
||||
|summary| $($summary_ty)* {
|
||||
tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
|
||||
timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
|
||||
..summary
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
let res = ImageLayer::rewrite_summary(
|
||||
layer_file_path,
|
||||
rewrite_closure!(image_layer::Summary),
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
println!("Successfully rewrote summary of image layer {layer_file_path}");
|
||||
return Ok(());
|
||||
}
|
||||
Err(image_layer::RewriteTenantTimelineError::MagicMismatch) => (), // fallthrough
|
||||
Err(image_layer::RewriteTenantTimelineError::Other(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
let res = DeltaLayer::rewrite_summary(
|
||||
layer_file_path,
|
||||
rewrite_closure!(delta_layer::Summary),
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
println!("Successfully rewrote summary of delta layer {layer_file_path}");
|
||||
return Ok(());
|
||||
}
|
||||
Err(delta_layer::RewriteTenantTimelineError::MagicMismatch) => (), // fallthrough
|
||||
Err(delta_layer::RewriteTenantTimelineError::Other(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!("not an image or delta layer: {layer_file_path}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
368
pageserver/src/bin/getpage_bench_libpq.rs
Normal file
368
pageserver/src/bin/getpage_bench_libpq.rs
Normal file
@@ -0,0 +1,368 @@
|
||||
use anyhow::Context;
|
||||
use clap::Parser;
|
||||
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::{Client, Uri};
|
||||
|
||||
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
|
||||
use pageserver::repository;
|
||||
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use rand::prelude::*;
|
||||
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
struct Key(repository::Key);
|
||||
|
||||
impl std::str::FromStr for Key {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
repository::Key::from_hex(s).map(Key)
|
||||
}
|
||||
}
|
||||
|
||||
struct KeyRange {
|
||||
start: i128,
|
||||
end: i128,
|
||||
}
|
||||
|
||||
impl KeyRange {
|
||||
fn len(&self) -> i128 {
|
||||
self.end - self.start
|
||||
}
|
||||
}
|
||||
|
||||
struct RelTagBlockNo {
|
||||
rel_tag: RelTag,
|
||||
block_no: u32,
|
||||
}
|
||||
|
||||
#[derive(clap::Parser)]
|
||||
struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
// tenant_id: String,
|
||||
// timeline_id: String,
|
||||
num_tasks: usize,
|
||||
num_requests: usize,
|
||||
tenants: Option<Vec<String>>,
|
||||
#[clap(long)]
|
||||
pick_n_tenants: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Stats {
|
||||
completed_requests: AtomicU64,
|
||||
}
|
||||
|
||||
impl Stats {
|
||||
fn inc(&self) {
|
||||
self.completed_requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let args: &'static Args = Box::leak(Box::new(Args::parse()));
|
||||
|
||||
// std::env::set_var("RUST_LOG", "info,tokio_postgres=trace");
|
||||
// tracing_subscriber::fmt::init();
|
||||
|
||||
let client = Client::new();
|
||||
|
||||
let tenants = if let Some(tenants) = &args.tenants {
|
||||
tenants.clone()
|
||||
} else {
|
||||
// let tenant_id = "b97965931096047b2d54958756baee7b";
|
||||
// let timeline_id = "2868f84a8d166779e4c651b116c45059";
|
||||
|
||||
let resp = client
|
||||
.get(Uri::try_from(&format!("{}/v1/tenant", args.mgmt_api_endpoint)).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let body = hyper::body::to_bytes(resp).await.unwrap();
|
||||
let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
let mut out = Vec::new();
|
||||
for t in tenants.as_array().unwrap() {
|
||||
if let Some(limit) = args.pick_n_tenants {
|
||||
if out.len() >= limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
|
||||
}
|
||||
if let Some(limit) = args.pick_n_tenants {
|
||||
assert_eq!(out.len(), limit);
|
||||
}
|
||||
out
|
||||
};
|
||||
|
||||
let mut tenant_timelines = Vec::new();
|
||||
for tenant_id in tenants {
|
||||
let resp = client
|
||||
.get(
|
||||
Uri::try_from(&format!(
|
||||
"{}/v1/tenant/{}/timeline",
|
||||
args.mgmt_api_endpoint, tenant_id
|
||||
))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let body = hyper::body::to_bytes(resp).await.unwrap();
|
||||
let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
for t in timelines.as_array().unwrap() {
|
||||
let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
|
||||
tenant_timelines.push((tenant_id.clone(), timeline_id));
|
||||
}
|
||||
}
|
||||
println!("tenant_timelines:\n{:?}", tenant_timelines);
|
||||
|
||||
let stats = Arc::new(Stats::default());
|
||||
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&stats);
|
||||
async move {
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
println!(
|
||||
"RPS: {:.0}",
|
||||
completed_requests as f64 / elapsed.as_secs_f64()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut tasks = Vec::new();
|
||||
for (tenant_id, timeline_id) in tenant_timelines {
|
||||
let stats = Arc::clone(&stats);
|
||||
let t = tokio::spawn(timeline(
|
||||
args,
|
||||
client.clone(),
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
stats,
|
||||
));
|
||||
tasks.push(t);
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn timeline(
|
||||
args: &'static Args,
|
||||
http_client: Client<HttpConnector, hyper::Body>,
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
stats: Arc<Stats>,
|
||||
) -> impl Future<Output = ()> + Send + Sync {
|
||||
async move {
|
||||
let resp = http_client
|
||||
.get(
|
||||
Uri::try_from(&format!(
|
||||
"{}/v1/tenant/{}/timeline/{}/keyspace",
|
||||
args.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
if !resp.status().is_success() {
|
||||
panic!("Failed to get keyspace: {resp:?}");
|
||||
}
|
||||
let body = hyper::body::to_bytes(resp).await.unwrap();
|
||||
let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap();
|
||||
|
||||
let ranges = keyspace["keys"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter_map(|r| {
|
||||
let r = r.as_array().unwrap();
|
||||
assert_eq!(r.len(), 2);
|
||||
let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
|
||||
let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
|
||||
// filter out non-relblock keys
|
||||
match (is_rel_block_key(start.0), is_rel_block_key(end.0)) {
|
||||
(true, true) => Some(KeyRange {
|
||||
start: start.0.to_i128(),
|
||||
end: end.0.to_i128(),
|
||||
}),
|
||||
(true, false) | (false, true) => {
|
||||
unimplemented!("split up range")
|
||||
}
|
||||
(false, false) => None,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// weighted ranges
|
||||
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
|
||||
|
||||
let ranges = Arc::new(ranges);
|
||||
let weights = Arc::new(weights);
|
||||
|
||||
let mut tasks = Vec::<JoinHandle<()>>::new();
|
||||
|
||||
let _start = std::time::Instant::now();
|
||||
|
||||
for _i in 0..args.num_tasks {
|
||||
let ranges = ranges.clone();
|
||||
let _weights = weights.clone();
|
||||
let _client = http_client.clone();
|
||||
let tenant_id = tenant_id.clone();
|
||||
let timeline_id = timeline_id.clone();
|
||||
let task = tokio::spawn({
|
||||
let stats = Arc::clone(&stats);
|
||||
async move {
|
||||
let mut client = getpage_client::Client::new(
|
||||
args.page_service_connstring.clone(),
|
||||
tenant_id.clone(),
|
||||
timeline_id.clone(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
for _i in 0..args.num_requests {
|
||||
let key = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = repository::Key::from_i128(key);
|
||||
// XXX filter these out when we iterate the keyspace
|
||||
assert!(
|
||||
is_rel_block_key(key),
|
||||
"we filter non-relblock keys out above"
|
||||
);
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we just checked");
|
||||
RelTagBlockNo { rel_tag, block_no }
|
||||
};
|
||||
client
|
||||
.getpage(key, lsn)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
|
||||
})
|
||||
.unwrap();
|
||||
stats.inc();
|
||||
}
|
||||
client.shutdown().await;
|
||||
}
|
||||
});
|
||||
tasks.push(task);
|
||||
}
|
||||
|
||||
for task in tasks {
|
||||
task.await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod getpage_client {
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
|
||||
PagestreamGetPageResponse,
|
||||
};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::RelTagBlockNo;
|
||||
|
||||
pub(crate) struct Client {
|
||||
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
|
||||
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
|
||||
conn_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub async fn new(
|
||||
connstring: String,
|
||||
tenant_id: String,
|
||||
timeline_id: String,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (client, connection) =
|
||||
tokio_postgres::connect(&connstring, postgres::NoTls).await?;
|
||||
|
||||
let conn_task_cancel = CancellationToken::new();
|
||||
let conn_task = tokio::spawn({
|
||||
let conn_task_cancel = conn_task_cancel.clone();
|
||||
async move {
|
||||
tokio::select! {
|
||||
_ = conn_task_cancel.cancelled() => { }
|
||||
res = connection => {
|
||||
res.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = client
|
||||
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
|
||||
.await?;
|
||||
|
||||
Ok(Self {
|
||||
copy_both: Box::pin(copy_both),
|
||||
conn_task,
|
||||
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn shutdown(mut self) {
|
||||
let _ = self.cancel_on_client_drop.take();
|
||||
self.conn_task.await.unwrap();
|
||||
}
|
||||
|
||||
pub async fn getpage(
|
||||
&mut self,
|
||||
key: RelTagBlockNo,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let req = PagestreamGetPageRequest {
|
||||
latest: false,
|
||||
rel: key.rel_tag,
|
||||
blkno: key.block_no,
|
||||
lsn,
|
||||
};
|
||||
let req = PagestreamFeMessage::GetPage(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next = next.unwrap().unwrap();
|
||||
|
||||
match PagestreamBeMessage::deserialize(next)? {
|
||||
PagestreamBeMessage::Exists(_) => todo!(),
|
||||
PagestreamBeMessage::Nblocks(_) => todo!(),
|
||||
PagestreamBeMessage::GetPage(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::DbSize(_) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -261,7 +261,7 @@ async fn calculate_synthetic_size_worker(
|
||||
}
|
||||
};
|
||||
|
||||
for (tenant_id, tenant_state) in tenants {
|
||||
for (tenant_id, tenant_state, _gen) in tenants {
|
||||
if tenant_state != TenantState::Active {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
|
||||
}
|
||||
};
|
||||
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
|
||||
if state != TenantState::Active {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -345,7 +345,7 @@ impl DeletionList {
|
||||
result.extend(
|
||||
timeline_layers
|
||||
.into_iter()
|
||||
.map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
|
||||
.map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(
|
||||
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
for (tenant_id, _state) in &tenants {
|
||||
for (tenant_id, _state, _gen) in &tenants {
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
|
||||
@@ -352,8 +352,7 @@ paths:
|
||||
in: query
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
format: hex
|
||||
type: integer
|
||||
description: A LSN to get the timestamp
|
||||
responses:
|
||||
"200":
|
||||
|
||||
@@ -303,7 +303,11 @@ async fn build_timeline_info(
|
||||
// we're executing this function, we will outlive the timeline on-disk state.
|
||||
info.current_logical_size_non_incremental = Some(
|
||||
timeline
|
||||
.get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
|
||||
.get_current_logical_size_non_incremental(
|
||||
info.last_record_lsn,
|
||||
CancellationToken::new(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
@@ -604,7 +608,7 @@ async fn get_timestamp_of_lsn_handler(
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let result = timeline.get_max_timestamp_for_lsn(lsn, &ctx).await?;
|
||||
let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
|
||||
|
||||
match result {
|
||||
Some(time) => {
|
||||
@@ -756,11 +760,12 @@ async fn tenant_list_handler(
|
||||
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
|
||||
})?
|
||||
.iter()
|
||||
.map(|(id, state)| TenantInfo {
|
||||
.map(|(id, state, gen)| TenantInfo {
|
||||
id: *id,
|
||||
state: state.clone(),
|
||||
current_physical_size: None,
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: (*gen).into(),
|
||||
})
|
||||
.collect::<Vec<TenantInfo>>();
|
||||
|
||||
@@ -789,6 +794,7 @@ async fn tenant_status(
|
||||
state: state.clone(),
|
||||
current_physical_size: Some(current_physical_size),
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: tenant.generation().into(),
|
||||
})
|
||||
}
|
||||
.instrument(info_span!("tenant_status_handler", %tenant_id))
|
||||
@@ -1478,7 +1484,7 @@ async fn timeline_collect_keyspace(
|
||||
let keys = timeline
|
||||
.collect_keyspace(at_lsn, &ctx)
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
|
||||
}
|
||||
|
||||
@@ -51,6 +51,9 @@ pub enum StorageTimeOperation {
|
||||
|
||||
#[strum(serialize = "create tenant")]
|
||||
CreateTenant,
|
||||
|
||||
#[strum(serialize = "duplicate tenant")]
|
||||
DuplicateTenant,
|
||||
}
|
||||
|
||||
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
|
||||
@@ -1225,6 +1228,15 @@ pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static WAL_REDO_WAIT_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wal_redo_wait_seconds",
|
||||
"Time spent waiting for access to the Postgres WAL redo process",
|
||||
redo_histogram_time_buckets!(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static WAL_REDO_RECORDS_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wal_redo_records_histogram",
|
||||
@@ -1252,46 +1264,6 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub(crate) struct WalRedoProcessCounters {
|
||||
pub(crate) started: IntCounter,
|
||||
pub(crate) killed_by_cause: enum_map::EnumMap<WalRedoKillCause, IntCounter>,
|
||||
}
|
||||
|
||||
#[derive(Debug, enum_map::Enum, strum_macros::IntoStaticStr)]
|
||||
pub(crate) enum WalRedoKillCause {
|
||||
WalRedoProcessDrop,
|
||||
NoLeakChildDrop,
|
||||
Startup,
|
||||
}
|
||||
|
||||
impl Default for WalRedoProcessCounters {
|
||||
fn default() -> Self {
|
||||
let started = register_int_counter!(
|
||||
"pageserver_wal_redo_process_started_total",
|
||||
"Number of WAL redo processes started",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let killed = register_int_counter_vec!(
|
||||
"pageserver_wal_redo_process_stopped_total",
|
||||
"Number of WAL redo processes stopped",
|
||||
&["cause"],
|
||||
)
|
||||
.unwrap();
|
||||
Self {
|
||||
started,
|
||||
killed_by_cause: EnumMap::from_array(std::array::from_fn(|i| {
|
||||
let cause = <WalRedoKillCause as enum_map::Enum>::from_usize(i);
|
||||
let cause_str: &'static str = cause.into();
|
||||
killed.with_label_values(&[cause_str])
|
||||
})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
|
||||
Lazy::new(WalRedoProcessCounters::default);
|
||||
|
||||
/// Similar to `prometheus::HistogramTimer` but does not record on drop.
|
||||
pub struct StorageTimeMetricsTimer {
|
||||
metrics: StorageTimeMetrics,
|
||||
@@ -1959,6 +1931,7 @@ pub fn preinitialize_metrics() {
|
||||
&READ_NUM_FS_LAYERS,
|
||||
&WAIT_LSN_TIME,
|
||||
&WAL_REDO_TIME,
|
||||
&WAL_REDO_WAIT_TIME,
|
||||
&WAL_REDO_RECORDS_HISTOGRAM,
|
||||
&WAL_REDO_BYTES_HISTOGRAM,
|
||||
]
|
||||
|
||||
@@ -512,11 +512,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
if let Err(e) = &response {
|
||||
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
|
||||
// because wait_lsn etc will drop out
|
||||
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
|
||||
// is_canceled(): [`Timeline::shutdown`]` has entered
|
||||
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
|
||||
if timeline.cancel.is_cancelled() {
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
|
||||
@@ -21,9 +21,8 @@ use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::exp_counter::ExpCounter;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
|
||||
@@ -31,33 +30,9 @@ pub type BlockNumber = u32;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum LsnForTimestamp {
|
||||
/// Found commits both before and after the given timestamp
|
||||
Present(Lsn),
|
||||
|
||||
/// Found no commits after the given timestamp, this means
|
||||
/// that the newest data in the branch is older than the given
|
||||
/// timestamp.
|
||||
///
|
||||
/// All commits <= LSN happened before the given timestamp
|
||||
Future(Lsn),
|
||||
|
||||
/// The queried timestamp is past our horizon we look back at (PITR)
|
||||
///
|
||||
/// All commits > LSN happened after the given timestamp,
|
||||
/// but any commits < LSN might have happened before or after
|
||||
/// the given timestamp. We don't know because no data before
|
||||
/// the given lsn is available.
|
||||
Past(Lsn),
|
||||
|
||||
/// We have found no commit with a timestamp,
|
||||
/// so we can't return anything meaningful.
|
||||
///
|
||||
/// The associated LSN is the lower bound value we can safely
|
||||
/// create branches on, but no statement is made if it is
|
||||
/// older or newer than the timestamp.
|
||||
///
|
||||
/// This variant can e.g. be returned right after a
|
||||
/// cluster import.
|
||||
NoData(Lsn),
|
||||
}
|
||||
|
||||
@@ -69,25 +44,6 @@ pub enum CalculateLogicalSizeError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum CollectKeySpaceError {
|
||||
#[error(transparent)]
|
||||
Decode(#[from] DeserializeError),
|
||||
#[error(transparent)]
|
||||
PageRead(PageReconstructError),
|
||||
#[error("cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for CollectKeySpaceError {
|
||||
fn from(err: PageReconstructError) -> Self {
|
||||
match err {
|
||||
PageReconstructError::Cancelled => Self::Cancelled,
|
||||
err => Self::PageRead(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for CalculateLogicalSizeError {
|
||||
fn from(pre: PageReconstructError) -> Self {
|
||||
match pre {
|
||||
@@ -369,11 +325,7 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<LsnForTimestamp, PageReconstructError> {
|
||||
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
|
||||
// We use this method to figure out the branching LSN for the new branch, but the
|
||||
// GC cutoff could be before the branching point and we cannot create a new branch
|
||||
// with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
|
||||
// on the safe side.
|
||||
let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
|
||||
let min_lsn = *gc_cutoff_lsn_guard;
|
||||
let max_lsn = self.get_last_record_lsn();
|
||||
|
||||
// LSNs are always 8-byte aligned. low/mid/high represent the
|
||||
@@ -383,92 +335,83 @@ impl Timeline {
|
||||
|
||||
let mut found_smaller = false;
|
||||
let mut found_larger = false;
|
||||
|
||||
// This is a search budget to not make the search too expensive
|
||||
let mut budget = 1_000_000u32;
|
||||
while low < high {
|
||||
// cannot overflow, high and low are both smaller than u64::MAX / 2
|
||||
// this always holds: low <= mid_start < high
|
||||
let mid_start = (high + low) / 2;
|
||||
let mut mid = mid_start;
|
||||
let mid = (high + low) / 2;
|
||||
|
||||
let mut max = None;
|
||||
let cmp = self
|
||||
.is_latest_commit_timestamp_ge_than(
|
||||
search_timestamp,
|
||||
Lsn(mid * 8),
|
||||
&mut found_smaller,
|
||||
&mut found_larger,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Search for an lsn that has a commit timestamp.
|
||||
// The search with `ExpCounter` will eventually try all LSNs
|
||||
// in the range between mid_start and high, which is important
|
||||
// when we set high to mid if we have exhausted all possibilities.
|
||||
// We don't do an explosive search as we want to prove that
|
||||
// every single lsn up to high is giving inconclusive results.
|
||||
// This can only be done by trying all lsns.
|
||||
for offs in ExpCounter::with_max_and_linear_search(high - mid_start) {
|
||||
mid = offs + mid_start;
|
||||
|
||||
// Do the query for mid + 1 instead of mid so that we can make definite statements
|
||||
// about low (low's invariant: always points to commit before or at search_timestamp).
|
||||
max = self
|
||||
.get_max_timestamp_for_lsn(Lsn((mid + 1) * 8), ctx)
|
||||
.await?;
|
||||
if max.is_some() {
|
||||
break;
|
||||
}
|
||||
// Do some limiting to make sure the query does not become too expensive.
|
||||
if let Some(new_budget) = budget.checked_sub(1) {
|
||||
budget = new_budget;
|
||||
} else {
|
||||
return Ok(LsnForTimestamp::NoData(min_lsn));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(max) = max {
|
||||
if max <= search_timestamp {
|
||||
// We found a commit with timestamp before (or at) `search_timestamp`.
|
||||
found_smaller = true;
|
||||
low = mid + 1;
|
||||
} else {
|
||||
// We found a commit with timestamp after `search_timestamp`.
|
||||
found_larger = true;
|
||||
high = mid;
|
||||
}
|
||||
if cmp {
|
||||
high = mid;
|
||||
} else {
|
||||
// max is None so we have proof of a chain of None's from mid_start all up to high.
|
||||
// Thus we can safely set high to mid_start
|
||||
high = mid_start;
|
||||
low = mid + 1;
|
||||
}
|
||||
}
|
||||
// If `found_smaller == true`, `low` is the LSN of the last commit record
|
||||
// before or at `search_timestamp`
|
||||
//
|
||||
// FIXME: it would be better to get the LSN of the previous commit.
|
||||
// Otherwise, if you restore to the returned LSN, the database will
|
||||
// include physical changes from later commits that will be marked
|
||||
// as aborted, and will need to be vacuumed away.
|
||||
let commit_lsn = Lsn(low * 8);
|
||||
match (found_smaller, found_larger) {
|
||||
(false, false) => {
|
||||
// This can happen if no commit records have been processed yet, e.g.
|
||||
// just after importing a cluster.
|
||||
Ok(LsnForTimestamp::NoData(min_lsn))
|
||||
Ok(LsnForTimestamp::NoData(max_lsn))
|
||||
}
|
||||
(true, false) => {
|
||||
// Didn't find any commit timestamps larger than the request
|
||||
Ok(LsnForTimestamp::Future(max_lsn))
|
||||
}
|
||||
(false, true) => {
|
||||
// Didn't find any commit timestamps smaller than the request
|
||||
Ok(LsnForTimestamp::Past(min_lsn))
|
||||
Ok(LsnForTimestamp::Past(max_lsn))
|
||||
}
|
||||
(true, false) => {
|
||||
// Only found commits with timestamps smaller than the request.
|
||||
// It's still a valid case for branch creation, return it.
|
||||
// And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
|
||||
// case, anyway.
|
||||
Ok(LsnForTimestamp::Future(commit_lsn))
|
||||
(true, true) => {
|
||||
// low is the LSN of the first commit record *after* the search_timestamp,
|
||||
// Back off by one to get to the point just before the commit.
|
||||
//
|
||||
// FIXME: it would be better to get the LSN of the previous commit.
|
||||
// Otherwise, if you restore to the returned LSN, the database will
|
||||
// include physical changes from later commits that will be marked
|
||||
// as aborted, and will need to be vacuumed away.
|
||||
Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
|
||||
}
|
||||
(true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Obtain the timestamp for the given lsn.
|
||||
/// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
|
||||
/// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
|
||||
///
|
||||
/// If the lsn has no timestamps, returns None. Returns the maximum such timestamp otherwise.
|
||||
pub(crate) async fn get_max_timestamp_for_lsn(
|
||||
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
|
||||
/// with a smaller/larger timestamp.
|
||||
///
|
||||
pub async fn is_latest_commit_timestamp_ge_than(
|
||||
&self,
|
||||
search_timestamp: TimestampTz,
|
||||
probe_lsn: Lsn,
|
||||
found_smaller: &mut bool,
|
||||
found_larger: &mut bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
|
||||
if timestamp >= search_timestamp {
|
||||
*found_larger = true;
|
||||
return ControlFlow::Break(true);
|
||||
} else {
|
||||
*found_smaller = true;
|
||||
}
|
||||
ControlFlow::Continue(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Obtain the possible timestamp range for the given lsn.
|
||||
///
|
||||
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
|
||||
pub async fn get_timestamp_for_lsn(
|
||||
&self,
|
||||
probe_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -635,6 +578,7 @@ impl Timeline {
|
||||
pub async fn get_current_logical_size_non_incremental(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -646,7 +590,7 @@ impl Timeline {
|
||||
let mut total_size: u64 = 0;
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
|
||||
if self.cancel.is_cancelled() {
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CalculateLogicalSizeError::Cancelled);
|
||||
}
|
||||
let relsize_key = rel_size_to_key(rel);
|
||||
@@ -663,11 +607,11 @@ impl Timeline {
|
||||
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
|
||||
/// Anything that's not listed maybe removed from the underlying storage (from
|
||||
/// that LSN forwards).
|
||||
pub(crate) async fn collect_keyspace(
|
||||
pub async fn collect_keyspace(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<KeySpace, CollectKeySpaceError> {
|
||||
) -> anyhow::Result<KeySpace> {
|
||||
// Iterate through key ranges, greedily packing them into partitions
|
||||
let mut result = KeySpaceAccum::new();
|
||||
|
||||
@@ -676,7 +620,7 @@ impl Timeline {
|
||||
|
||||
// Fetch list of database dirs and iterate them
|
||||
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
|
||||
let dbdir = DbDirectory::des(&buf)?;
|
||||
let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
|
||||
|
||||
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
|
||||
dbs.sort_unstable();
|
||||
@@ -709,7 +653,7 @@ impl Timeline {
|
||||
let slrudir_key = slru_dir_to_key(kind);
|
||||
result.add_key(slrudir_key);
|
||||
let buf = self.get(slrudir_key, lsn, ctx).await?;
|
||||
let dir = SlruSegmentDirectory::des(&buf)?;
|
||||
let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
|
||||
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
|
||||
segments.sort_unstable();
|
||||
for segno in segments {
|
||||
@@ -727,7 +671,7 @@ impl Timeline {
|
||||
// Then pg_twophase
|
||||
result.add_key(TWOPHASEDIR_KEY);
|
||||
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
|
||||
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
|
||||
let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
|
||||
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
|
||||
xids.sort_unstable();
|
||||
for xid in xids {
|
||||
@@ -1756,6 +1700,7 @@ const AUX_FILES_KEY: Key = Key {
|
||||
// Reverse mappings for a few Keys.
|
||||
// These are needed by WAL redo manager.
|
||||
|
||||
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
|
||||
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
|
||||
Ok(match key.field1 {
|
||||
0x00 => (
|
||||
@@ -1771,7 +1716,8 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
|
||||
})
|
||||
}
|
||||
|
||||
fn is_rel_block_key(key: Key) -> bool {
|
||||
/// See [[key_to_rel_block]].
|
||||
pub fn is_rel_block_key(key: Key) -> bool {
|
||||
key.field1 == 0x00 && key.field4 != 0
|
||||
}
|
||||
|
||||
|
||||
@@ -1711,6 +1711,10 @@ impl Tenant {
|
||||
self.current_state() == TenantState::Active
|
||||
}
|
||||
|
||||
pub fn generation(&self) -> Generation {
|
||||
self.generation
|
||||
}
|
||||
|
||||
/// Changes tenant status to active, unless shutdown was already requested.
|
||||
///
|
||||
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
|
||||
@@ -1841,13 +1845,7 @@ impl Tenant {
|
||||
timelines.values().for_each(|timeline| {
|
||||
let timeline = Arc::clone(timeline);
|
||||
let span = Span::current();
|
||||
js.spawn(async move {
|
||||
if freeze_and_flush {
|
||||
timeline.flush_and_shutdown().instrument(span).await
|
||||
} else {
|
||||
timeline.shutdown().instrument(span).await
|
||||
}
|
||||
});
|
||||
js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
|
||||
})
|
||||
};
|
||||
tracing::info!("Waiting for timelines...");
|
||||
@@ -4733,7 +4731,7 @@ mod tests {
|
||||
// Keeps uninit mark in place
|
||||
let raw_tline = tline.raw_timeline().unwrap();
|
||||
raw_tline
|
||||
.shutdown()
|
||||
.shutdown(false)
|
||||
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
|
||||
.await;
|
||||
std::mem::forget(tline);
|
||||
|
||||
@@ -327,7 +327,7 @@ mod tests {
|
||||
let mut sz: u16 = rng.gen();
|
||||
// Make 50% of the arrays small
|
||||
if rng.gen() {
|
||||
sz &= 63;
|
||||
sz |= 63;
|
||||
}
|
||||
random_array(sz.into())
|
||||
})
|
||||
|
||||
@@ -30,7 +30,7 @@ use crate::metrics::TENANT_MANAGER as METRICS;
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
|
||||
use crate::tenant::{create_tenant_files, AttachedTenantConf, Tenant, TenantState};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
|
||||
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -40,7 +40,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
use super::timeline::delete::DeleteTimelineFlow;
|
||||
use super::TenantSharedResources;
|
||||
use super::{SpawnMode, TenantSharedResources};
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
/// - `Attached`: has a full Tenant object, is elegible to service
|
||||
@@ -566,10 +566,8 @@ pub(crate) async fn shutdown_all_tenants() {
|
||||
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
use utils::completion;
|
||||
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
|
||||
let (total_in_progress, total_attached) = {
|
||||
// Atomically, 1. extract the list of tenants to shut down and 2. prevent creation of new tenants.
|
||||
let (in_progress_ops, tenants_to_shut_down) = {
|
||||
let mut m = tenants.write().unwrap();
|
||||
match &mut *m {
|
||||
TenantsMap::Initializing => {
|
||||
@@ -579,67 +577,78 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
}
|
||||
TenantsMap::Open(tenants) => {
|
||||
let mut shutdown_state = HashMap::new();
|
||||
let mut total_in_progress = 0;
|
||||
let mut total_attached = 0;
|
||||
let mut in_progress_ops = Vec::new();
|
||||
let mut tenants_to_shut_down = Vec::new();
|
||||
|
||||
for (tenant_id, v) in tenants.drain() {
|
||||
for (k, v) in tenants.drain() {
|
||||
match v {
|
||||
TenantSlot::Attached(t) => {
|
||||
shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone()));
|
||||
join_set.spawn(
|
||||
async move {
|
||||
let freeze_and_flush = true;
|
||||
|
||||
let res = {
|
||||
let (_guard, shutdown_progress) = completion::channel();
|
||||
t.shutdown(shutdown_progress, freeze_and_flush).await
|
||||
};
|
||||
|
||||
if let Err(other_progress) = res {
|
||||
// join the another shutdown in progress
|
||||
other_progress.wait().await;
|
||||
}
|
||||
|
||||
// we cannot afford per tenant logging here, because if s3 is degraded, we are
|
||||
// going to log too many lines
|
||||
debug!("tenant successfully stopped");
|
||||
}
|
||||
.instrument(info_span!("shutdown", %tenant_id)),
|
||||
);
|
||||
|
||||
total_attached += 1;
|
||||
tenants_to_shut_down.push(t.clone());
|
||||
shutdown_state.insert(k, TenantSlot::Attached(t));
|
||||
}
|
||||
TenantSlot::Secondary => {
|
||||
shutdown_state.insert(tenant_id, TenantSlot::Secondary);
|
||||
shutdown_state.insert(k, TenantSlot::Secondary);
|
||||
}
|
||||
TenantSlot::InProgress(notify) => {
|
||||
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
||||
// wait for their notifications to fire in this function.
|
||||
join_set.spawn(async move {
|
||||
notify.wait().await;
|
||||
});
|
||||
|
||||
total_in_progress += 1;
|
||||
in_progress_ops.push(notify);
|
||||
}
|
||||
}
|
||||
}
|
||||
*m = TenantsMap::ShuttingDown(shutdown_state);
|
||||
(total_in_progress, total_attached)
|
||||
(in_progress_ops, tenants_to_shut_down)
|
||||
}
|
||||
TenantsMap::ShuttingDown(_) => {
|
||||
// TODO: it is possible that detach and shutdown happen at the same time. as a
|
||||
// result, during shutdown we do not wait for detach.
|
||||
error!("already shutting down, this function isn't supposed to be called more than once");
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
info!(
|
||||
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
|
||||
total_in_progress, total_attached
|
||||
in_progress_ops.len(),
|
||||
tenants_to_shut_down.len()
|
||||
);
|
||||
|
||||
for barrier in in_progress_ops {
|
||||
barrier.wait().await;
|
||||
}
|
||||
|
||||
info!(
|
||||
"InProgress tenants shut down, waiting for {} Attached tenants to shut down",
|
||||
tenants_to_shut_down.len()
|
||||
);
|
||||
let started_at = std::time::Instant::now();
|
||||
let mut join_set = JoinSet::new();
|
||||
for tenant in tenants_to_shut_down {
|
||||
let tenant_id = tenant.get_tenant_id();
|
||||
join_set.spawn(
|
||||
async move {
|
||||
let freeze_and_flush = true;
|
||||
|
||||
let res = {
|
||||
let (_guard, shutdown_progress) = completion::channel();
|
||||
tenant.shutdown(shutdown_progress, freeze_and_flush).await
|
||||
};
|
||||
|
||||
if let Err(other_progress) = res {
|
||||
// join the another shutdown in progress
|
||||
other_progress.wait().await;
|
||||
}
|
||||
|
||||
// we cannot afford per tenant logging here, because if s3 is degraded, we are
|
||||
// going to log too many lines
|
||||
|
||||
debug!("tenant successfully stopped");
|
||||
}
|
||||
.instrument(info_span!("shutdown", %tenant_id)),
|
||||
);
|
||||
}
|
||||
|
||||
let total = join_set.len();
|
||||
let mut panicked = 0;
|
||||
let mut buffering = true;
|
||||
@@ -652,7 +661,7 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
match joined {
|
||||
Ok(()) => {}
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("we are not cancelling any of the tasks");
|
||||
unreachable!("we are not cancelling any of the futures");
|
||||
}
|
||||
Err(join_error) if join_error.is_panic() => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
@@ -1285,7 +1294,8 @@ pub(crate) enum TenantMapListError {
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
pub(crate) async fn list_tenants(
|
||||
) -> Result<Vec<(TenantId, TenantState, Generation)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().unwrap();
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
@@ -1293,7 +1303,9 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
|
||||
};
|
||||
Ok(m.iter()
|
||||
.filter_map(|(id, tenant)| match tenant {
|
||||
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
|
||||
TenantSlot::Attached(tenant) => {
|
||||
Some((*id, tenant.current_state(), tenant.generation()))
|
||||
}
|
||||
TenantSlot::Secondary => None,
|
||||
TenantSlot::InProgress(_) => None,
|
||||
})
|
||||
|
||||
@@ -6,6 +6,7 @@ use std::sync::Arc;
|
||||
use anyhow::{bail, Context};
|
||||
use tokio::sync::oneshot::error::RecvError;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
@@ -349,6 +350,10 @@ async fn fill_logical_sizes(
|
||||
// our advantage with `?` error handling.
|
||||
let mut joinset = tokio::task::JoinSet::new();
|
||||
|
||||
let cancel = tokio_util::sync::CancellationToken::new();
|
||||
// be sure to cancel all spawned tasks if we are dropped
|
||||
let _dg = cancel.clone().drop_guard();
|
||||
|
||||
// For each point that would benefit from having a logical size available,
|
||||
// spawn a Task to fetch it, unless we have it cached already.
|
||||
for seg in segments.iter() {
|
||||
@@ -366,8 +371,15 @@ async fn fill_logical_sizes(
|
||||
let parallel_size_calcs = Arc::clone(limit);
|
||||
let ctx = ctx.attached_child();
|
||||
joinset.spawn(
|
||||
calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
|
||||
.in_current_span(),
|
||||
calculate_logical_size(
|
||||
parallel_size_calcs,
|
||||
timeline,
|
||||
lsn,
|
||||
cause,
|
||||
ctx,
|
||||
cancel.child_token(),
|
||||
)
|
||||
.in_current_span(),
|
||||
);
|
||||
}
|
||||
e.insert(cached_size);
|
||||
@@ -475,13 +487,14 @@ async fn calculate_logical_size(
|
||||
lsn: utils::lsn::Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<TimelineAtLsnSizeResult, RecvError> {
|
||||
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
|
||||
.await
|
||||
.expect("global semaphore should not had been closed");
|
||||
|
||||
let size_res = timeline
|
||||
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
|
||||
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
|
||||
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
|
||||
.await?;
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
pub mod delta_layer;
|
||||
mod filename;
|
||||
mod image_layer;
|
||||
pub mod image_layer;
|
||||
mod inmemory_layer;
|
||||
mod layer;
|
||||
mod layer_desc;
|
||||
|
||||
@@ -69,13 +69,13 @@ use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct Summary {
|
||||
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
|
||||
magic: u16,
|
||||
format_version: u16,
|
||||
pub magic: u16,
|
||||
pub format_version: u16,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub key_range: Range<Key>,
|
||||
pub lsn_range: Range<Lsn>,
|
||||
|
||||
/// Block number where the 'index' part of the file begins.
|
||||
pub index_start_blk: u32,
|
||||
@@ -609,6 +609,62 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteTenantTimelineError {
|
||||
#[error("magic mismatch")]
|
||||
MagicMismatch,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for RewriteTenantTimelineError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayer {
|
||||
pub async fn rewrite_summary<F>(
|
||||
path: &Utf8Path,
|
||||
rewrite: F,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), RewriteTenantTimelineError>
|
||||
where
|
||||
F: Fn(Summary) -> Summary,
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
let mut file = file.file;
|
||||
if actual_summary.magic != DELTA_FILE_MAGIC {
|
||||
return Err(RewriteTenantTimelineError::MagicMismatch);
|
||||
}
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
if buf.spilled() {
|
||||
// The code in DeltaLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
return Err(RewriteTenantTimelineError::Other(anyhow::anyhow!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
)));
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
|
||||
@@ -67,20 +67,20 @@ use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
|
||||
/// the 'index' starts at the block indicated by 'index_start_blk'
|
||||
///
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(super) struct Summary {
|
||||
pub struct Summary {
|
||||
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
|
||||
magic: u16,
|
||||
format_version: u16,
|
||||
pub magic: u16,
|
||||
pub format_version: u16,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub key_range: Range<Key>,
|
||||
pub lsn: Lsn,
|
||||
|
||||
/// Block number where the 'index' part of the file begins.
|
||||
index_start_blk: u32,
|
||||
pub index_start_blk: u32,
|
||||
/// Block within the 'index', where the B-tree root page is stored
|
||||
index_root_blk: u32,
|
||||
pub index_root_blk: u32,
|
||||
// the 'values' part starts after the summary header, on block 1.
|
||||
}
|
||||
|
||||
@@ -294,6 +294,61 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteTenantTimelineError {
|
||||
#[error("magic mismatch")]
|
||||
MagicMismatch,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for RewriteTenantTimelineError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayer {
|
||||
pub async fn rewrite_summary<F>(
|
||||
path: &Utf8Path,
|
||||
rewrite: F,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), RewriteTenantTimelineError>
|
||||
where
|
||||
F: Fn(Summary) -> Summary,
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
let mut file = file.file;
|
||||
if actual_summary.magic != IMAGE_FILE_MAGIC {
|
||||
return Err(RewriteTenantTimelineError::MagicMismatch);
|
||||
}
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
if buf.spilled() {
|
||||
// The code in ImageLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
return Err(RewriteTenantTimelineError::Other(anyhow::anyhow!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
)));
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
|
||||
@@ -3,6 +3,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use remote_storage::RemotePath;
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -251,7 +252,6 @@ impl Layer {
|
||||
|
||||
layer
|
||||
.get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
|
||||
.instrument(tracing::info_span!("get_value_reconstruct_data", layer=%self))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -305,6 +305,11 @@ impl Layer {
|
||||
&self.0.path
|
||||
}
|
||||
|
||||
/// This can return None even though it should return Some in some edge cases.
|
||||
pub(crate) fn remote_path(&self) -> Option<RemotePath> {
|
||||
self.0.remote_path()
|
||||
}
|
||||
|
||||
pub(crate) fn metadata(&self) -> LayerFileMetadata {
|
||||
self.0.metadata()
|
||||
}
|
||||
@@ -915,6 +920,17 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// This can return None even though it should return Some in some edge cases.
|
||||
fn remote_path(&self) -> Option<RemotePath> {
|
||||
let tl = self.timeline.upgrade()?; // TODO: should distinguish this case, but, accuracy doesn't matter for this field.
|
||||
Some(crate::tenant::remote_timeline_client::remote_layer_path(
|
||||
&tl.tenant_id,
|
||||
&tl.timeline_id,
|
||||
&self.desc.filename(),
|
||||
self.generation,
|
||||
))
|
||||
}
|
||||
|
||||
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
|
||||
let layer_file_name = self.desc.filename().file_name();
|
||||
|
||||
@@ -934,6 +950,7 @@ impl LayerInner {
|
||||
lsn_end: lsn_range.end,
|
||||
remote,
|
||||
access_stats,
|
||||
remote_path: self.remote_path().map(|p| p.into()),
|
||||
}
|
||||
} else {
|
||||
let lsn = self.desc.image_layer_lsn();
|
||||
@@ -944,6 +961,7 @@ impl LayerInner {
|
||||
lsn_start: lsn,
|
||||
remote,
|
||||
access_stats,
|
||||
remote_path: self.remote_path().map(|p| p.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1212,10 +1230,8 @@ impl DownloadedLayer {
|
||||
// this will be a permanent failure
|
||||
.context("load layer");
|
||||
|
||||
if let Err(e) = res.as_ref() {
|
||||
if res.is_err() {
|
||||
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
|
||||
// TODO(#5815): we are not logging all errors, so temporarily log them here as well
|
||||
tracing::error!("layer loading failed permanently: {e:#}");
|
||||
}
|
||||
res
|
||||
};
|
||||
@@ -1294,7 +1310,6 @@ impl ResidentLayer {
|
||||
}
|
||||
|
||||
/// Loads all keys stored in the layer. Returns key, lsn and value size.
|
||||
#[tracing::instrument(skip_all, fields(layer=%self))]
|
||||
pub(crate) async fn load_keys<'a>(
|
||||
&'a self,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -36,6 +36,7 @@ use std::time::{Duration, Instant, SystemTime};
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
|
||||
@@ -49,7 +50,6 @@ use crate::tenant::{
|
||||
metadata::{save_metadata, TimelineMetadata},
|
||||
par_fsync,
|
||||
};
|
||||
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
|
||||
@@ -247,7 +247,7 @@ pub struct Timeline {
|
||||
/// the flush finishes. You can use that to wait for the flush to finish.
|
||||
layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
|
||||
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
|
||||
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
|
||||
|
||||
/// Layer removal lock.
|
||||
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
|
||||
@@ -374,19 +374,6 @@ pub enum PageReconstructError {
|
||||
WalRedo(anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum FlushLayerError {
|
||||
/// Timeline cancellation token was cancelled
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
#[error(transparent)]
|
||||
PageReconstructError(#[from] PageReconstructError),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageReconstructError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
@@ -904,16 +891,15 @@ impl Timeline {
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
}
|
||||
|
||||
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
|
||||
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
|
||||
///
|
||||
/// While we are flushing, we continue to accept read I/O.
|
||||
#[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
|
||||
pub(crate) async fn flush_and_shutdown(&self) {
|
||||
pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
// Stop ingesting data, so that we are not still writing to an InMemoryLayer while
|
||||
// trying to flush
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
|
||||
// prevent writes to the InMemoryLayer
|
||||
tracing::debug!("Waiting for WalReceiverManager...");
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::WalReceiverManager),
|
||||
@@ -922,70 +908,40 @@ impl Timeline {
|
||||
)
|
||||
.await;
|
||||
|
||||
// Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
// now all writers to InMemory layer are gone, do the final flush if requested
|
||||
match self.freeze_and_flush().await {
|
||||
Ok(_) => {
|
||||
// drain the upload queue
|
||||
if let Some(client) = self.remote_client.as_ref() {
|
||||
// if we did not wait for completion here, it might be our shutdown process
|
||||
// didn't wait for remote uploads to complete at all, as new tasks can forever
|
||||
// be spawned.
|
||||
//
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
if let Err(e) = client.wait_completion().await {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to flush to remote storage: {e:#}");
|
||||
}
|
||||
if freeze_and_flush {
|
||||
match self.freeze_and_flush().await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
return; // TODO: should probably drain remote timeline client anyways?
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
|
||||
// we have some extra WAL replay to do next time the timeline starts.
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
|
||||
// drain the upload queue
|
||||
let res = if let Some(client) = self.remote_client.as_ref() {
|
||||
// if we did not wait for completion here, it might be our shutdown process
|
||||
// didn't wait for remote uploads to complete at all, as new tasks can forever
|
||||
// be spawned.
|
||||
//
|
||||
// what is problematic is the shutting down of RemoteTimelineClient, because
|
||||
// obviously it does not make sense to stop while we wait for it, but what
|
||||
// about corner cases like s3 suddenly hanging up?
|
||||
client.wait_completion().await
|
||||
} else {
|
||||
Ok(())
|
||||
};
|
||||
|
||||
if let Err(e) = res {
|
||||
warn!("failed to await for frozen and flushed uploads: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
self.shutdown().await;
|
||||
}
|
||||
|
||||
/// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
|
||||
/// the graceful [`Timeline::flush_and_shutdown`] function.
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
tracing::debug!("Cancelling CancellationToken");
|
||||
self.cancel.cancel();
|
||||
|
||||
// Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
|
||||
// while doing so.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
// Shut down the layer flush task before the remote client, as one depends on the other
|
||||
task_mgr::shutdown_tasks(
|
||||
Some(TaskKind::LayerFlushTask),
|
||||
Some(self.tenant_id),
|
||||
Some(self.timeline_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
|
||||
// case our caller wants to use that for a deletion
|
||||
if let Some(remote_client) = self.remote_client.as_ref() {
|
||||
match remote_client.stop() {
|
||||
Ok(()) => {}
|
||||
Err(StopError::QueueUninitialized) => {
|
||||
// Shutting down during initialization is legal
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("Waiting for tasks...");
|
||||
|
||||
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(self.timeline_id)).await;
|
||||
|
||||
// Finally wait until any gate-holders are complete
|
||||
@@ -1029,12 +985,7 @@ impl Timeline {
|
||||
reason,
|
||||
backtrace: backtrace_str,
|
||||
};
|
||||
self.set_state(broken_state);
|
||||
|
||||
// Although the Broken state is not equivalent to shutdown() (shutdown will be called
|
||||
// later when this tenant is detach or the process shuts down), firing the cancellation token
|
||||
// here avoids the need for other tasks to watch for the Broken state explicitly.
|
||||
self.cancel.cancel();
|
||||
self.set_state(broken_state)
|
||||
}
|
||||
|
||||
pub fn current_state(&self) -> TimelineState {
|
||||
@@ -1790,8 +1741,12 @@ impl Timeline {
|
||||
// delay will be terminated by a timeout regardless.
|
||||
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
|
||||
|
||||
// no extra cancellation here, because nothing really waits for this to complete compared
|
||||
// to spawn_ondemand_logical_size_calculation.
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let calculated_size = match self_clone
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
|
||||
.await
|
||||
{
|
||||
Ok(s) => s,
|
||||
@@ -1860,6 +1815,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
let self_clone = Arc::clone(self);
|
||||
@@ -1880,7 +1836,7 @@ impl Timeline {
|
||||
false,
|
||||
async move {
|
||||
let res = self_clone
|
||||
.logical_size_calculation_task(lsn, cause, &ctx)
|
||||
.logical_size_calculation_task(lsn, cause, &ctx, cancel)
|
||||
.await;
|
||||
let _ = sender.send(res).ok();
|
||||
Ok(()) // Receiver is responsible for handling errors
|
||||
@@ -1896,28 +1852,58 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: &RequestContext,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
span::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let _guard = self.gate.enter();
|
||||
|
||||
let mut timeline_state_updates = self.subscribe_for_state_updates();
|
||||
let self_calculation = Arc::clone(self);
|
||||
|
||||
let mut calculation = pin!(async {
|
||||
let cancel = cancel.child_token();
|
||||
let ctx = ctx.attached_child();
|
||||
self_calculation
|
||||
.calculate_logical_size(lsn, cause, &ctx)
|
||||
.calculate_logical_size(lsn, cause, cancel, &ctx)
|
||||
.await
|
||||
});
|
||||
let timeline_state_cancellation = async {
|
||||
loop {
|
||||
match timeline_state_updates.changed().await {
|
||||
Ok(()) => {
|
||||
let new_state = timeline_state_updates.borrow().clone();
|
||||
match new_state {
|
||||
// we're running this job for active timelines only
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken { .. }
|
||||
| TimelineState::Stopping
|
||||
| TimelineState::Loading => {
|
||||
break format!("aborted because timeline became inactive (new state: {new_state:?})")
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_sender_dropped_error) => {
|
||||
// can't happen, the sender is not dropped as long as the Timeline exists
|
||||
break "aborted because state watch was dropped".to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let taskmgr_shutdown_cancellation = async {
|
||||
task_mgr::shutdown_watcher().await;
|
||||
"aborted because task_mgr shutdown requested".to_string()
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
res = &mut calculation => { res }
|
||||
_ = self.cancel.cancelled() => {
|
||||
debug!("cancelling logical size calculation for timeline shutdown");
|
||||
reason = timeline_state_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
calculation.await
|
||||
}
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
debug!("cancelling logical size calculation for task shutdown");
|
||||
reason = taskmgr_shutdown_cancellation => {
|
||||
debug!(reason = reason, "cancelling calculation");
|
||||
cancel.cancel();
|
||||
calculation.await
|
||||
}
|
||||
}
|
||||
@@ -1931,6 +1917,7 @@ impl Timeline {
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
info!(
|
||||
@@ -1973,7 +1960,7 @@ impl Timeline {
|
||||
};
|
||||
let timer = storage_time_metrics.start_timer();
|
||||
let logical_size = self
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, ctx)
|
||||
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
|
||||
.await?;
|
||||
debug!("calculated logical size: {logical_size}");
|
||||
timer.stop_and_record();
|
||||
@@ -2386,10 +2373,6 @@ impl Timeline {
|
||||
info!("started flush loop");
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = self.cancel.cancelled() => {
|
||||
info!("shutting down layer flush task");
|
||||
break;
|
||||
},
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
info!("shutting down layer flush task");
|
||||
break;
|
||||
@@ -2401,14 +2384,6 @@ impl Timeline {
|
||||
let timer = self.metrics.flush_time_histo.start_timer();
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
info!("dropping out of flush loop for timeline shutdown");
|
||||
// Note: we do not bother transmitting into [`layer_flush_done_tx`], because
|
||||
// anyone waiting on that will respect self.cancel as well: they will stop
|
||||
// waiting at the same time we as drop out of this loop.
|
||||
return;
|
||||
}
|
||||
|
||||
let layer_to_flush = {
|
||||
let guard = self.layers.read().await;
|
||||
guard.layer_map().frozen_layers.front().cloned()
|
||||
@@ -2417,18 +2392,9 @@ impl Timeline {
|
||||
let Some(layer_to_flush) = layer_to_flush else {
|
||||
break Ok(());
|
||||
};
|
||||
match self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
Ok(()) => {}
|
||||
Err(FlushLayerError::Cancelled) => {
|
||||
info!("dropping out of flush loop for timeline shutdown");
|
||||
return;
|
||||
}
|
||||
err @ Err(
|
||||
FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
|
||||
) => {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break err;
|
||||
}
|
||||
if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
error!("could not flush frozen layer: {err:?}");
|
||||
break Err(err);
|
||||
}
|
||||
};
|
||||
// Notify any listeners that we're done
|
||||
@@ -2477,17 +2443,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
trace!("waiting for flush to complete");
|
||||
tokio::select! {
|
||||
rx_e = rx.changed() => {
|
||||
rx_e?;
|
||||
},
|
||||
// Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
|
||||
// the notification from [`flush_loop`] that it completed.
|
||||
_ = self.cancel.cancelled() => {
|
||||
tracing::info!("Cancelled layer flush due on timeline shutdown");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
rx.changed().await?;
|
||||
trace!("done")
|
||||
}
|
||||
}
|
||||
@@ -2502,7 +2458,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
frozen_layer: Arc<InMemoryLayer>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), FlushLayerError> {
|
||||
) -> anyhow::Result<()> {
|
||||
// As a special case, when we have just imported an image into the repository,
|
||||
// instead of writing out a L0 delta layer, we directly write out image layer
|
||||
// files instead. This is possible as long as *all* the data imported into the
|
||||
@@ -2527,11 +2483,6 @@ impl Timeline {
|
||||
let (partitioning, _lsn) = self
|
||||
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
|
||||
.await?;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
// For image layers, we add them immediately into the layer map.
|
||||
(
|
||||
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
|
||||
@@ -2563,10 +2514,6 @@ impl Timeline {
|
||||
)
|
||||
};
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
|
||||
@@ -2576,10 +2523,6 @@ impl Timeline {
|
||||
let metadata = {
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
|
||||
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
|
||||
@@ -26,7 +26,6 @@ use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
|
||||
|
||||
use crate::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
@@ -327,7 +326,8 @@ impl Timeline {
|
||||
match state.last_layer_access_imitation {
|
||||
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
|
||||
_ => {
|
||||
self.imitate_timeline_cached_layer_accesses(ctx).await;
|
||||
self.imitate_timeline_cached_layer_accesses(cancel, ctx)
|
||||
.await;
|
||||
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
|
||||
}
|
||||
}
|
||||
@@ -367,12 +367,21 @@ impl Timeline {
|
||||
|
||||
/// Recompute the values which would cause on-demand downloads during restart.
|
||||
#[instrument(skip_all)]
|
||||
async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
|
||||
async fn imitate_timeline_cached_layer_accesses(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
let lsn = self.get_last_record_lsn();
|
||||
|
||||
// imitiate on-restart initial logical size
|
||||
let size = self
|
||||
.calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
|
||||
.calculate_logical_size(
|
||||
lsn,
|
||||
LogicalSizeCalculationCause::EvictionTaskImitation,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.instrument(info_span!("calculate_logical_size"))
|
||||
.await;
|
||||
|
||||
@@ -398,16 +407,9 @@ impl Timeline {
|
||||
if size.is_err() {
|
||||
// ignore, see above comment
|
||||
} else {
|
||||
match e {
|
||||
CollectKeySpaceError::Cancelled => {
|
||||
// Shutting down, ignore
|
||||
}
|
||||
err => {
|
||||
warn!(
|
||||
"failed to collect keyspace but succeeded in calculating logical size: {err:#}"
|
||||
);
|
||||
}
|
||||
}
|
||||
warn!(
|
||||
"failed to collect keyspace but succeeded in calculating logical size: {e:#}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,8 +43,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::{
|
||||
WalRedoKillCause, WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_COUNTERS,
|
||||
WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
|
||||
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
|
||||
WAL_REDO_WAIT_TIME,
|
||||
};
|
||||
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
|
||||
use crate::repository::Key;
|
||||
@@ -207,8 +207,11 @@ impl PostgresRedoManager {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let start_time = Instant::now();
|
||||
let mut n_attempts = 0u32;
|
||||
loop {
|
||||
let lock_time = Instant::now();
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
let proc: Arc<WalRedoProcess> = {
|
||||
let proc_guard = self.redo_process.read().unwrap();
|
||||
@@ -233,7 +236,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
@@ -241,7 +244,8 @@ impl PostgresRedoManager {
|
||||
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
|
||||
.context("apply_wal_records");
|
||||
|
||||
let duration = started_at.elapsed();
|
||||
let end_time = Instant::now();
|
||||
let duration = end_time.duration_since(lock_time);
|
||||
|
||||
let len = records.len();
|
||||
let nbytes = records.iter().fold(0, |acumulator, record| {
|
||||
@@ -663,10 +667,10 @@ impl WalRedoProcess {
|
||||
.close_fds()
|
||||
.spawn_no_leak_child(tenant_id)
|
||||
.context("spawn process")?;
|
||||
WAL_REDO_PROCESS_COUNTERS.started.inc();
|
||||
|
||||
let mut child = scopeguard::guard(child, |child| {
|
||||
error!("killing wal-redo-postgres process due to a problem during launch");
|
||||
child.kill_and_wait(WalRedoKillCause::Startup);
|
||||
child.kill_and_wait();
|
||||
});
|
||||
|
||||
let stdin = child.stdin.take().unwrap();
|
||||
@@ -997,7 +1001,7 @@ impl Drop for WalRedoProcess {
|
||||
self.child
|
||||
.take()
|
||||
.expect("we only do this once")
|
||||
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
|
||||
.kill_and_wait();
|
||||
self.stderr_logger_cancel.cancel();
|
||||
// no way to wait for stderr_logger_task from Drop because that is async only
|
||||
}
|
||||
@@ -1033,19 +1037,16 @@ impl NoLeakChild {
|
||||
})
|
||||
}
|
||||
|
||||
fn kill_and_wait(mut self, cause: WalRedoKillCause) {
|
||||
fn kill_and_wait(mut self) {
|
||||
let child = match self.child.take() {
|
||||
Some(child) => child,
|
||||
None => return,
|
||||
};
|
||||
Self::kill_and_wait_impl(child, cause);
|
||||
Self::kill_and_wait_impl(child);
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(pid=child.id(), ?cause))]
|
||||
fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
|
||||
scopeguard::defer! {
|
||||
WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
|
||||
}
|
||||
#[instrument(skip_all, fields(pid=child.id()))]
|
||||
fn kill_and_wait_impl(mut child: Child) {
|
||||
let res = child.kill();
|
||||
if let Err(e) = res {
|
||||
// This branch is very unlikely because:
|
||||
@@ -1090,7 +1091,7 @@ impl Drop for NoLeakChild {
|
||||
// This thread here is going to outlive of our dropper.
|
||||
let span = tracing::info_span!("walredo", %tenant_id);
|
||||
let _entered = span.enter();
|
||||
Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
|
||||
Self::kill_and_wait_impl(child);
|
||||
})
|
||||
.await
|
||||
});
|
||||
|
||||
@@ -80,9 +80,6 @@ struct ProxyCliArgs {
|
||||
/// cache for `wake_compute` api method (use `size=0` to disable)
|
||||
#[clap(long, default_value = config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO)]
|
||||
wake_compute_cache: String,
|
||||
/// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
|
||||
#[clap(long, default_value = config::WakeComputeLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
|
||||
wake_compute_lock: String,
|
||||
/// Allow self-signed certificates for compute nodes (for testing)
|
||||
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
|
||||
allow_self_signed_compute: bool,
|
||||
@@ -223,23 +220,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
node_info: console::caches::NodeInfoCache::new("node_info_cache", size, ttl),
|
||||
}));
|
||||
|
||||
let config::WakeComputeLockOptions {
|
||||
shards,
|
||||
permits,
|
||||
epoch,
|
||||
timeout,
|
||||
} = args.wake_compute_lock.parse()?;
|
||||
info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
|
||||
let locks = Box::leak(Box::new(
|
||||
console::locks::ApiLocks::new("wake_compute_lock", permits, shards, timeout)
|
||||
.unwrap(),
|
||||
));
|
||||
tokio::spawn(locks.garbage_collect_worker(epoch));
|
||||
|
||||
let url = args.auth_endpoint.parse()?;
|
||||
let endpoint = http::Endpoint::new(url, http::new_client());
|
||||
|
||||
let api = console::provider::neon::Api::new(endpoint, caches, locks);
|
||||
let api = console::provider::neon::Api::new(endpoint, caches);
|
||||
auth::BackendType::Console(Cow::Owned(api), ())
|
||||
}
|
||||
AuthBackend::Postgres => {
|
||||
|
||||
@@ -264,79 +264,6 @@ impl FromStr for CacheOptions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper for cmdline cache options parsing.
|
||||
pub struct WakeComputeLockOptions {
|
||||
/// The number of shards the lock map should have
|
||||
pub shards: usize,
|
||||
/// The number of allowed concurrent requests for each endpoitn
|
||||
pub permits: usize,
|
||||
/// Garbage collection epoch
|
||||
pub epoch: Duration,
|
||||
/// Lock timeout
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
impl WakeComputeLockOptions {
|
||||
/// Default options for [`crate::console::provider::ApiLocks`].
|
||||
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
|
||||
|
||||
// pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
|
||||
|
||||
/// Parse lock options passed via cmdline.
|
||||
/// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
|
||||
fn parse(options: &str) -> anyhow::Result<Self> {
|
||||
let mut shards = None;
|
||||
let mut permits = None;
|
||||
let mut epoch = None;
|
||||
let mut timeout = None;
|
||||
|
||||
for option in options.split(',') {
|
||||
let (key, value) = option
|
||||
.split_once('=')
|
||||
.with_context(|| format!("bad key-value pair: {option}"))?;
|
||||
|
||||
match key {
|
||||
"shards" => shards = Some(value.parse()?),
|
||||
"permits" => permits = Some(value.parse()?),
|
||||
"epoch" => epoch = Some(humantime::parse_duration(value)?),
|
||||
"timeout" => timeout = Some(humantime::parse_duration(value)?),
|
||||
unknown => bail!("unknown key: {unknown}"),
|
||||
}
|
||||
}
|
||||
|
||||
// these dont matter if lock is disabled
|
||||
if let Some(0) = permits {
|
||||
timeout = Some(Duration::default());
|
||||
epoch = Some(Duration::default());
|
||||
shards = Some(2);
|
||||
}
|
||||
|
||||
let out = Self {
|
||||
shards: shards.context("missing `shards`")?,
|
||||
permits: permits.context("missing `permits`")?,
|
||||
epoch: epoch.context("missing `epoch`")?,
|
||||
timeout: timeout.context("missing `timeout`")?,
|
||||
};
|
||||
|
||||
ensure!(out.shards > 1, "shard count must be > 1");
|
||||
ensure!(
|
||||
out.shards.is_power_of_two(),
|
||||
"shard count must be a power of two"
|
||||
);
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for WakeComputeLockOptions {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(options: &str) -> Result<Self, Self::Err> {
|
||||
let error = || format!("failed to parse cache lock options '{options}'");
|
||||
Self::parse(options).with_context(error)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -361,42 +288,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_lock_options() -> anyhow::Result<()> {
|
||||
let WakeComputeLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(10 * 60));
|
||||
assert_eq!(timeout, Duration::from_secs(1));
|
||||
assert_eq!(shards, 32);
|
||||
assert_eq!(permits, 4);
|
||||
|
||||
let WakeComputeLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
|
||||
assert_eq!(epoch, Duration::from_secs(60));
|
||||
assert_eq!(timeout, Duration::from_millis(100));
|
||||
assert_eq!(shards, 16);
|
||||
assert_eq!(permits, 8);
|
||||
|
||||
let WakeComputeLockOptions {
|
||||
epoch,
|
||||
permits,
|
||||
shards,
|
||||
timeout,
|
||||
} = "permits=0".parse()?;
|
||||
assert_eq!(epoch, Duration::ZERO);
|
||||
assert_eq!(timeout, Duration::ZERO);
|
||||
assert_eq!(shards, 2);
|
||||
assert_eq!(permits, 0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,10 +13,5 @@ pub mod caches {
|
||||
pub use super::provider::{ApiCaches, NodeInfoCache};
|
||||
}
|
||||
|
||||
/// Various cache-related types.
|
||||
pub mod locks {
|
||||
pub use super::provider::ApiLocks;
|
||||
}
|
||||
|
||||
/// Console's management API.
|
||||
pub mod mgmt;
|
||||
|
||||
@@ -8,13 +8,7 @@ use crate::{
|
||||
compute, scram,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use dashmap::DashMap;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::{
|
||||
sync::{OwnedSemaphorePermit, Semaphore},
|
||||
time::Instant,
|
||||
};
|
||||
use tracing::info;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub mod errors {
|
||||
use crate::{
|
||||
@@ -155,9 +149,6 @@ pub mod errors {
|
||||
|
||||
#[error(transparent)]
|
||||
ApiError(ApiError),
|
||||
|
||||
#[error("Timeout waiting to acquire wake compute lock")]
|
||||
TimeoutError,
|
||||
}
|
||||
|
||||
// This allows more useful interactions than `#[from]`.
|
||||
@@ -167,17 +158,6 @@ pub mod errors {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::sync::AcquireError> for WakeComputeError {
|
||||
fn from(_: tokio::sync::AcquireError) -> Self {
|
||||
WakeComputeError::TimeoutError
|
||||
}
|
||||
}
|
||||
impl From<tokio::time::error::Elapsed> for WakeComputeError {
|
||||
fn from(_: tokio::time::error::Elapsed) -> Self {
|
||||
WakeComputeError::TimeoutError
|
||||
}
|
||||
}
|
||||
|
||||
impl UserFacingError for WakeComputeError {
|
||||
fn to_string_client(&self) -> String {
|
||||
use WakeComputeError::*;
|
||||
@@ -187,8 +167,6 @@ pub mod errors {
|
||||
BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
|
||||
// However, API might return a meaningful error.
|
||||
ApiError(e) => e.to_string_client(),
|
||||
|
||||
TimeoutError => "timeout while acquiring the compute resource lock".to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -255,145 +233,3 @@ pub struct ApiCaches {
|
||||
/// Cache for the `wake_compute` API method.
|
||||
pub node_info: NodeInfoCache,
|
||||
}
|
||||
|
||||
/// Various caches for [`console`](super).
|
||||
pub struct ApiLocks {
|
||||
name: &'static str,
|
||||
node_locks: DashMap<Arc<str>, Arc<Semaphore>>,
|
||||
permits: usize,
|
||||
timeout: Duration,
|
||||
registered: prometheus::IntCounter,
|
||||
unregistered: prometheus::IntCounter,
|
||||
reclamation_lag: prometheus::Histogram,
|
||||
lock_acquire_lag: prometheus::Histogram,
|
||||
}
|
||||
|
||||
impl ApiLocks {
|
||||
pub fn new(
|
||||
name: &'static str,
|
||||
permits: usize,
|
||||
shards: usize,
|
||||
timeout: Duration,
|
||||
) -> prometheus::Result<Self> {
|
||||
let registered = prometheus::IntCounter::with_opts(
|
||||
prometheus::Opts::new(
|
||||
"semaphores_registered",
|
||||
"Number of semaphores registered in this api lock",
|
||||
)
|
||||
.namespace(name),
|
||||
)?;
|
||||
prometheus::register(Box::new(registered.clone()))?;
|
||||
let unregistered = prometheus::IntCounter::with_opts(
|
||||
prometheus::Opts::new(
|
||||
"semaphores_unregistered",
|
||||
"Number of semaphores unregistered in this api lock",
|
||||
)
|
||||
.namespace(name),
|
||||
)?;
|
||||
prometheus::register(Box::new(unregistered.clone()))?;
|
||||
let reclamation_lag = prometheus::Histogram::with_opts(
|
||||
prometheus::HistogramOpts::new(
|
||||
"reclamation_lag_seconds",
|
||||
"Time it takes to reclaim unused semaphores in the api lock",
|
||||
)
|
||||
.namespace(name)
|
||||
// 1us -> 65ms
|
||||
// benchmarks on my mac indicate it's usually in the range of 256us and 512us
|
||||
.buckets(prometheus::exponential_buckets(1e-6, 2.0, 16)?),
|
||||
)?;
|
||||
prometheus::register(Box::new(reclamation_lag.clone()))?;
|
||||
let lock_acquire_lag = prometheus::Histogram::with_opts(
|
||||
prometheus::HistogramOpts::new(
|
||||
"semaphore_acquire_seconds",
|
||||
"Time it takes to reclaim unused semaphores in the api lock",
|
||||
)
|
||||
.namespace(name)
|
||||
// 0.1ms -> 6s
|
||||
.buckets(prometheus::exponential_buckets(1e-4, 2.0, 16)?),
|
||||
)?;
|
||||
prometheus::register(Box::new(lock_acquire_lag.clone()))?;
|
||||
|
||||
Ok(Self {
|
||||
name,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
permits,
|
||||
timeout,
|
||||
lock_acquire_lag,
|
||||
registered,
|
||||
unregistered,
|
||||
reclamation_lag,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_wake_compute_permit(
|
||||
&self,
|
||||
key: &Arc<str>,
|
||||
) -> Result<WakeComputePermit, errors::WakeComputeError> {
|
||||
if self.permits == 0 {
|
||||
return Ok(WakeComputePermit { permit: None });
|
||||
}
|
||||
let now = Instant::now();
|
||||
let semaphore = {
|
||||
// get fast path
|
||||
if let Some(semaphore) = self.node_locks.get(key) {
|
||||
semaphore.clone()
|
||||
} else {
|
||||
self.node_locks
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| {
|
||||
self.registered.inc();
|
||||
Arc::new(Semaphore::new(self.permits))
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
};
|
||||
let permit = tokio::time::timeout_at(now + self.timeout, semaphore.acquire_owned()).await;
|
||||
|
||||
self.lock_acquire_lag
|
||||
.observe((Instant::now() - now).as_secs_f64());
|
||||
|
||||
Ok(WakeComputePermit {
|
||||
permit: Some(permit??),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn garbage_collect_worker(&self, epoch: std::time::Duration) {
|
||||
if self.permits == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut interval = tokio::time::interval(epoch / (self.node_locks.shards().len()) as u32);
|
||||
loop {
|
||||
for (i, shard) in self.node_locks.shards().iter().enumerate() {
|
||||
interval.tick().await;
|
||||
// temporary lock a single shard and then clear any semaphores that aren't currently checked out
|
||||
// race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked
|
||||
// therefore releasing it is safe from race conditions
|
||||
info!(
|
||||
name = self.name,
|
||||
shard = i,
|
||||
"performing epoch reclamation on api lock"
|
||||
);
|
||||
let mut lock = shard.write();
|
||||
let timer = self.reclamation_lag.start_timer();
|
||||
let count = lock
|
||||
.extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1)
|
||||
.count();
|
||||
drop(lock);
|
||||
self.unregistered.inc_by(count as u64);
|
||||
timer.observe_duration()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WakeComputePermit {
|
||||
// None if the lock is disabled
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
impl WakeComputePermit {
|
||||
pub fn should_check_cache(&self) -> bool {
|
||||
self.permit.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,12 +3,12 @@
|
||||
use super::{
|
||||
super::messages::{ConsoleError, GetRoleSecret, WakeCompute},
|
||||
errors::{ApiError, GetAuthInfoError, WakeComputeError},
|
||||
ApiCaches, ApiLocks, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
|
||||
ApiCaches, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
|
||||
};
|
||||
use crate::{auth::ClientCredentials, compute, http, scram};
|
||||
use async_trait::async_trait;
|
||||
use futures::TryFutureExt;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use std::net::SocketAddr;
|
||||
use tokio::time::Instant;
|
||||
use tokio_postgres::config::SslMode;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
@@ -17,17 +17,12 @@ use tracing::{error, info, info_span, warn, Instrument};
|
||||
pub struct Api {
|
||||
endpoint: http::Endpoint,
|
||||
caches: &'static ApiCaches,
|
||||
locks: &'static ApiLocks,
|
||||
jwt: String,
|
||||
}
|
||||
|
||||
impl Api {
|
||||
/// Construct an API object containing the auth parameters.
|
||||
pub fn new(
|
||||
endpoint: http::Endpoint,
|
||||
caches: &'static ApiCaches,
|
||||
locks: &'static ApiLocks,
|
||||
) -> Self {
|
||||
pub fn new(endpoint: http::Endpoint, caches: &'static ApiCaches) -> Self {
|
||||
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
@@ -35,7 +30,6 @@ impl Api {
|
||||
Self {
|
||||
endpoint,
|
||||
caches,
|
||||
locks,
|
||||
jwt,
|
||||
}
|
||||
}
|
||||
@@ -169,22 +163,9 @@ impl super::Api for Api {
|
||||
return Ok(cached);
|
||||
}
|
||||
|
||||
let key: Arc<str> = key.into();
|
||||
|
||||
let permit = self.locks.get_wake_compute_permit(&key).await?;
|
||||
|
||||
// after getting back a permit - it's possible the cache was filled
|
||||
// double check
|
||||
if permit.should_check_cache() {
|
||||
if let Some(cached) = self.caches.node_info.get(&key) {
|
||||
info!(key = &*key, "found cached compute node info");
|
||||
return Ok(cached);
|
||||
}
|
||||
}
|
||||
|
||||
let node = self.do_wake_compute(extra, creds).await?;
|
||||
let (_, cached) = self.caches.node_info.insert(key.clone(), node);
|
||||
info!(key = &*key, "created a cache entry for compute node info");
|
||||
let (_, cached) = self.caches.node_info.insert(key.into(), node);
|
||||
info!(key = key, "created a cache entry for compute node info");
|
||||
|
||||
Ok(cached)
|
||||
}
|
||||
|
||||
@@ -570,7 +570,6 @@ fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
"api_console_other_server_error"
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
|
||||
WakeComputeError::TimeoutError => "timeout_error",
|
||||
};
|
||||
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
|
||||
}
|
||||
|
||||
69
test_runner/duplicate_tenant.py
Normal file
69
test_runner/duplicate_tenant.py
Normal file
@@ -0,0 +1,69 @@
|
||||
# Usage from top of repo:
|
||||
# poetry run python3 ./test_runner/duplicate_tenant.py c66e2e233057f7f05563caff664ecb14 .neon/remote_storage_local_fs
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.types import TenantId
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Duplicate tenant script.")
|
||||
parser.add_argument("initial_tenant", type=str, help="Initial tenant")
|
||||
parser.add_argument("remote_storage_local_fs_root", type=Path, help="Remote storage local fs root")
|
||||
parser.add_argument("--ncopies", type=int, help="Number of copies")
|
||||
parser.add_argument("--numthreads", type=int, default=1, help="Number of threads")
|
||||
parser.add_argument("--port", type=int, default=9898, help="Pageserver management api port")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
initial_tenant = args.initial_tenant
|
||||
remote_storage_local_fs_root: Path = args.remote_storage_local_fs_root
|
||||
ncopies = args.ncopies
|
||||
numthreads = args.numthreads
|
||||
|
||||
new_tenant = TenantId.generate()
|
||||
print(f"New tenant: {new_tenant}")
|
||||
|
||||
client = PageserverHttpClient(args.port, lambda: None)
|
||||
|
||||
src_tenant_gen = int(client.tenant_status(initial_tenant)["generation"])
|
||||
|
||||
assert remote_storage_local_fs_root.is_dir(), f"{remote_storage_local_fs_root} is not a directory"
|
||||
|
||||
src_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / initial_tenant / "timelines"
|
||||
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
|
||||
|
||||
dst_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / str(new_tenant) / "timelines"
|
||||
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
|
||||
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
|
||||
|
||||
for tl in src_timelines_dir.iterdir():
|
||||
src_tl_dir = src_timelines_dir / tl.name
|
||||
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
|
||||
dst_tl_dir = dst_timelines_dir / tl.name
|
||||
dst_tl_dir.mkdir(parents=False, exist_ok=False)
|
||||
for file in tl.iterdir():
|
||||
shutil.copy2(file, dst_tl_dir)
|
||||
if "__" in file.name:
|
||||
cmd = [
|
||||
"./target/debug/pagectl", # TODO: abstract this like the other binaries
|
||||
"layer",
|
||||
"rewrite-summary",
|
||||
str(dst_tl_dir / file.name),
|
||||
"--new-tenant-id",
|
||||
str(new_tenant),
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
|
||||
client.tenant_attach(new_tenant, generation=src_tenant_gen)
|
||||
|
||||
while True:
|
||||
status = client.tenant_status(new_tenant)
|
||||
if status["state"]["slug"] == "Active":
|
||||
break
|
||||
print("Waiting for tenant to be active..., is: " + status["state"]["slug"])
|
||||
time.sleep(1)
|
||||
|
||||
print("Tenant is active: " + str(new_tenant))
|
||||
@@ -58,6 +58,7 @@ class HistoricLayerInfo:
|
||||
lsn_start: str
|
||||
lsn_end: Optional[str]
|
||||
remote: bool
|
||||
remote_path: Optional[str] = None
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
|
||||
@@ -68,6 +69,7 @@ class HistoricLayerInfo:
|
||||
lsn_start=d["lsn_start"],
|
||||
lsn_end=d.get("lsn_end"),
|
||||
remote=d["remote"],
|
||||
remote_path=d.get("remote_path"),
|
||||
)
|
||||
|
||||
|
||||
@@ -219,6 +221,25 @@ class PageserverHttpClient(requests.Session):
|
||||
assert isinstance(new_tenant_id, str)
|
||||
return TenantId(new_tenant_id)
|
||||
|
||||
def tenant_duplicate(
|
||||
self, src_tenant_id: TenantId, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
|
||||
) -> TenantId:
|
||||
if conf is not None:
|
||||
assert "new_tenant_id" not in conf.keys()
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{src_tenant_id}/duplicate",
|
||||
json={
|
||||
"new_tenant_id": str(new_tenant_id),
|
||||
**(conf or {}),
|
||||
},
|
||||
)
|
||||
self.verbose_error(res)
|
||||
if res.status_code == 409:
|
||||
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
|
||||
new_tenant_id = res.json()
|
||||
assert isinstance(new_tenant_id, str)
|
||||
return TenantId(new_tenant_id)
|
||||
|
||||
def tenant_attach(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -26,7 +26,6 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
".*layer loading failed permanently: load layer: .*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -79,31 +79,13 @@ def test_lsn_mapping_old(neon_env_builder: NeonEnvBuilder):
|
||||
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# disable default GC and compaction
|
||||
"gc_period": "1000 m",
|
||||
"compaction_period": "0 s",
|
||||
"gc_horizon": f"{1024 ** 2}",
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
"compaction_target_size": f"{1024 ** 2}",
|
||||
}
|
||||
)
|
||||
|
||||
timeline_id = env.neon_cli.create_branch("test_lsn_mapping", tenant_id=tenant_id)
|
||||
endpoint_main = env.endpoints.create_start("test_lsn_mapping", tenant_id=tenant_id)
|
||||
timeline_id = endpoint_main.safe_psql("show neon.timeline_id")[0][0]
|
||||
log.info("postgres is running on 'main' branch")
|
||||
new_timeline_id = env.neon_cli.create_branch("test_lsn_mapping")
|
||||
endpoint_main = env.endpoints.create_start("test_lsn_mapping")
|
||||
log.info("postgres is running on 'test_lsn_mapping' branch")
|
||||
|
||||
cur = endpoint_main.connect().cursor()
|
||||
|
||||
# Obtain an lsn before all write operations on this branch
|
||||
start_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_lsn()"))
|
||||
|
||||
# Create table, and insert rows, each in a separate transaction
|
||||
# Disable `synchronous_commit` to make this initialization go faster.
|
||||
# XXX: on my laptop this test takes 7s, and setting `synchronous_commit=off`
|
||||
# doesn't change anything.
|
||||
# Disable synchronous_commit to make this initialization go faster.
|
||||
#
|
||||
# Each row contains current insert LSN and the current timestamp, when
|
||||
# the row was inserted.
|
||||
@@ -122,63 +104,40 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
cur.execute("INSERT INTO foo VALUES (-1)")
|
||||
|
||||
# Wait until WAL is received by pageserver
|
||||
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint_main, tenant_id, timeline_id)
|
||||
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
|
||||
|
||||
with env.pageserver.http_client() as client:
|
||||
# Check edge cases
|
||||
# Timestamp is in the future
|
||||
# Check edge cases: timestamp in the future
|
||||
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] in ["present", "future"]
|
||||
# make sure that we return a well advanced lsn here
|
||||
assert Lsn(result["lsn"]) > start_lsn
|
||||
assert result["kind"] == "future"
|
||||
|
||||
# Timestamp is in the unreachable past
|
||||
# timestamp too the far history
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] == "past"
|
||||
# make sure that we return the minimum lsn here at the start of the range
|
||||
assert Lsn(result["lsn"]) < start_lsn
|
||||
|
||||
# Probe a bunch of timestamps in the valid range
|
||||
for i in range(1, len(tbl), 100):
|
||||
probe_timestamp = tbl[i][1]
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] not in ["past", "nodata"]
|
||||
lsn = result["lsn"]
|
||||
# Call get_lsn_by_timestamp to get the LSN
|
||||
# Launch a new read-only node at that LSN, and check that only the rows
|
||||
# that were supposed to be committed at that point in time are visible.
|
||||
endpoint_here = env.endpoints.create_start(
|
||||
branch_name="test_lsn_mapping",
|
||||
endpoint_id="ep-lsn_mapping_read",
|
||||
lsn=lsn,
|
||||
tenant_id=tenant_id,
|
||||
branch_name="test_lsn_mapping", endpoint_id="ep-lsn_mapping_read", lsn=lsn
|
||||
)
|
||||
assert endpoint_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
|
||||
|
||||
endpoint_here.stop_and_destroy()
|
||||
|
||||
# Do the "past" check again at a new branch to ensure that we don't return something before the branch cutoff
|
||||
timeline_id_child = env.neon_cli.create_branch(
|
||||
"test_lsn_mapping_child", tenant_id=tenant_id, ancestor_branch_name="test_lsn_mapping"
|
||||
)
|
||||
|
||||
# Timestamp is in the unreachable past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] == "past"
|
||||
# make sure that we return the minimum lsn here at the start of the range
|
||||
assert Lsn(result["lsn"]) >= last_flush_lsn
|
||||
|
||||
|
||||
# Test pageserver get_timestamp_of_lsn API
|
||||
def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
54
test_runner/regress/test_tenant_duplicate.py
Normal file
54
test_runner/regress/test_tenant_duplicate.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import time
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.remote_storage import (
|
||||
RemoteStorageKind,
|
||||
)
|
||||
from fixtures.types import TenantId
|
||||
from fixtures.log_helper import log
|
||||
|
||||
|
||||
def test_tenant_duplicate(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep_main:
|
||||
ep_main.safe_psql("CREATE TABLE foo (i int);")
|
||||
ep_main.safe_psql("INSERT INTO foo VALUES (1), (2), (3);")
|
||||
last_flush_lsn = last_flush_lsn_upload(
|
||||
env, ep_main, env.initial_tenant, env.initial_timeline
|
||||
)
|
||||
|
||||
new_tenant_id = TenantId.generate()
|
||||
# timeline id remains unchanged with tenant_duplicate
|
||||
# TODO: implement a remapping scheme so timeline ids remain globally unique
|
||||
new_timeline_id = env.initial_timeline
|
||||
|
||||
log.info(f"Duplicate tenant/timeline will be: {new_tenant_id}/{new_timeline_id}")
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
ps_http.tenant_duplicate(env.initial_tenant, new_tenant_id)
|
||||
|
||||
ps_http.tenant_delete(env.initial_tenant)
|
||||
|
||||
env.neon_cli.map_branch("duplicate", new_tenant_id, new_timeline_id)
|
||||
|
||||
# start read-only replicate and validate
|
||||
with env.endpoints.create_start(
|
||||
"duplicate", tenant_id=new_tenant_id, lsn=last_flush_lsn
|
||||
) as ep_dup:
|
||||
with ep_dup.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT * FROM foo ORDER BY i;")
|
||||
cur.fetchall() == [(1,), (2,), (3,)]
|
||||
|
||||
# ensure restarting PS works
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
@@ -25,7 +25,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
|
||||
clap = { version = "4", features = ["derive", "string"] }
|
||||
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
|
||||
crossbeam-utils = { version = "0.8" }
|
||||
dashmap = { version = "5", default-features = false, features = ["raw-api"] }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures = { version = "0.3" }
|
||||
@@ -39,7 +38,7 @@ hex = { version = "0.4", features = ["serde"] }
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }
|
||||
memchr = { version = "2" }
|
||||
nom = { version = "7" }
|
||||
num-bigint = { version = "0.4" }
|
||||
@@ -56,6 +55,7 @@ scopeguard = { version = "1" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
serde_json = { version = "1", features = ["raw_value"] }
|
||||
smallvec = { version = "1", default-features = false, features = ["write"] }
|
||||
standback = { version = "0.2", default-features = false, features = ["std"] }
|
||||
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-rustls = { version = "0.24" }
|
||||
@@ -76,13 +76,14 @@ cc = { version = "1", default-features = false, features = ["parallel"] }
|
||||
either = { version = "1" }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }
|
||||
memchr = { version = "2" }
|
||||
nom = { version = "7" }
|
||||
prost = { version = "0.11" }
|
||||
regex = { version = "1" }
|
||||
regex-syntax = { version = "0.7" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
standback = { version = "0.2", default-features = false, features = ["std"] }
|
||||
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
|
||||
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] }
|
||||
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
|
||||
|
||||
Reference in New Issue
Block a user