Compare commits

..

8 Commits

Author SHA1 Message Date
Christian Schwarz
9f72d063f3 WIP: change duplicate_tenant.py script to use the pagectl command
The script works but at restart, we detach the created tenants because
they're not known to the attachment service:

  Detaching tenant, control plane omitted it in re-attach response tenant_id=1e399d390e3aee6b11c701cbc716bb6c

=> figure out how to further integrate this
2023-11-09 13:55:03 +00:00
Christian Schwarz
c6d8c7cb0a switch to generalized rewrite_summary & impl page_ctl subcommand to use it 2023-11-09 13:55:03 +00:00
Christian Schwarz
831588b5aa remove http handler 2023-11-09 13:55:03 +00:00
Christian Schwarz
47a8e9711f Revert "WIP duplicate based on LayerMap"
This reverts commit 0d7dfdbb3e.
2023-11-09 13:55:03 +00:00
Christian Schwarz
de9436518e WIP duplicate based on LayerMap 2023-11-09 13:55:03 +00:00
Christian Schwarz
fb92f8fa91 random getpage benchmark 2023-11-09 13:55:03 +00:00
Christian Schwarz
08f682b217 script to duplicate a tenant a bunch of times, using the API added in previous commit
Setup the pageserver using a config like below.

Then use the script like so, against the tenant to duplicate:

    poetry run python3 ./test_runner/duplicate_tenant.py 7ea51af32d42bfe7fb93bf5f28114d09  200 8

backup of pageserver.toml

    d =1
    pg_distrib_dir ='/home/admin/neon-main/pg_install'
    http_auth_type ='Trust'
    pg_auth_type ='Trust'
    listen_http_addr ='127.0.0.1:9898'
    listen_pg_addr ='127.0.0.1:64000'
    broker_endpoint ='http://127.0.0.1:50051/'
    #control_plane_api ='http://127.0.0.1:1234/'

    # Initial configuration file created by 'pageserver --init'
    #listen_pg_addr = '127.0.0.1:64000'
    #listen_http_addr = '127.0.0.1:9898'

    #wait_lsn_timeout = '60 s'
    #wal_redo_timeout = '60 s'

    #max_file_descriptors = 10000
    #page_cache_size = 160000

    # initial superuser role name to use when creating a new tenant
    #initial_superuser_name = 'cloud_admin'

    #broker_endpoint = 'http://127.0.0.1:50051'

    #log_format = 'plain'

    #concurrent_tenant_size_logical_size_queries = '1'

    #metric_collection_interval = '10 min'
    #cached_metric_collection_interval = '0s'
    #synthetic_size_calculation_interval = '10 min'

    #disk_usage_based_eviction = { max_usage_pct = .., min_avail_bytes = .., period = "10s"}

    #background_task_maximum_delay = '10s'

    [tenant_config]
    #checkpoint_distance = 268435456 # in bytes
    #checkpoint_timeout = 10 m
    #compaction_target_size = 134217728 # in bytes
    #compaction_period = '20 s'
    #compaction_threshold = 10

    #gc_period = '1 hr'
    #gc_horizon = 67108864
    #image_creation_threshold = 3
    #pitr_interval = '7 days'

    #min_resident_size_override = .. # in bytes
    #evictions_low_residence_duration_metric_threshold = '24 hour'
    #gc_feedback = false

    # make it determinsitic
    gc_period = '0s'
    checkpoint_timeout = '3650 day'
    compaction_period = '20 s'
    compaction_threshold = 10
    compaction_target_size = 134217728
    checkpoint_distance = 268435456
    image_creation_threshold = 3

    [remote_storage]
    local_path = '/home/admin/neon-main/bench_repo_dir/repo/remote_storage_local_fs'
2023-11-09 12:23:59 +00:00
Christian Schwarz
18e8baab5b API to duplicate a tenant 2023-11-09 12:23:55 +00:00
50 changed files with 1690 additions and 1301 deletions

View File

@@ -1,17 +1,3 @@
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
#
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
#
[profile.dev.package."*"]
# Set the default for dependencies in Development mode.
opt-level = 3
[profile.dev]
# Turn on a small amount of optimization in Development mode.
opt-level = 1
[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.

View File

@@ -17,9 +17,8 @@ assignees: ''
## Implementation ideas
```[tasklist]
### Tasks
```
## Tasks
- [ ]
## Other related tasks and Epics

View File

@@ -1,7 +1,5 @@
self-hosted-runner:
labels:
- arm64
- dev
- gen3
- large
- small

View File

@@ -172,10 +172,10 @@ jobs:
# https://github.com/EmbarkStudios/cargo-deny
- name: Check rust licenses/bans/advisories/sources
if: ${{ !cancelled() }}
run: cargo deny check --hide-inclusion-graph
run: cargo deny check
build-neon:
needs: [ check-permissions, tag ]
needs: [ check-permissions ]
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
@@ -187,7 +187,6 @@ jobs:
env:
BUILD_TYPE: ${{ matrix.build_type }}
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
steps:
- name: Fix git ownership

View File

@@ -21,10 +21,7 @@ env:
jobs:
check-macos-build:
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos')
timeout-minutes: 90
runs-on: macos-latest
@@ -115,182 +112,8 @@ jobs:
- name: Check that no warnings are produced
run: ./run_clippy.sh
check-linux-arm-build:
timeout-minutes: 90
runs-on: [ self-hosted, dev, arm64 ]
env:
# Use release build only, to have less debug info around
# Hence keeping target/ (and general cache size) smaller
BUILD_TYPE: release
CARGO_FEATURES: --features testing
CARGO_FLAGS: --locked --release
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- name: Fix git ownership
run: |
# Workaround for `fatal: detected dubious ownership in repository at ...`
#
# Use both ${{ github.workspace }} and ${GITHUB_WORKSPACE} because they're different on host and in containers
# Ref https://github.com/actions/checkout/issues/785
#
git config --global --add safe.directory ${{ github.workspace }}
git config --global --add safe.directory ${GITHUB_WORKSPACE}
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
- name: Set pg 14 revision for caching
id: pg_v14_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
- name: Set pg 15 revision for caching
id: pg_v15_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) >> $GITHUB_OUTPUT
- name: Set pg 16 revision for caching
id: pg_v16_rev
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) >> $GITHUB_OUTPUT
- name: Set env variables
run: |
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo" >> $GITHUB_ENV
- name: Cache postgres v14 build
id: cache_pg_14
uses: actions/cache@v3
with:
path: pg_install/v14
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v15 build
id: cache_pg_15
uses: actions/cache@v3
with:
path: pg_install/v15
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Cache postgres v16 build
id: cache_pg_16
uses: actions/cache@v3
with:
path: pg_install/v16
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make postgres-v14 -j$(nproc)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: mold -run make postgres-v15 -j$(nproc)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make postgres-v16 -j$(nproc)
- name: Build neon extensions
run: mold -run make neon-pg-ext -j$(nproc)
- name: Build walproposer-lib
run: mold -run make walproposer-lib -j$(nproc)
- name: Run cargo build
run: |
mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests
- name: Run cargo test
run: |
cargo test $CARGO_FLAGS $CARGO_FEATURES
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-public-dev
export REMOTE_STORAGE_S3_REGION=eu-central-1
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
cargo test $CARGO_FLAGS --package remote_storage --test test_real_s3
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
export ENABLE_REAL_AZURE_REMOTE_STORAGE=y
export AZURE_STORAGE_ACCOUNT="${{ secrets.AZURE_STORAGE_ACCOUNT_DEV }}"
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
# Avoid `$CARGO_FEATURES` since there's no `testing` feature in the e2e tests now
cargo test $CARGO_FLAGS --package remote_storage --test test_real_azure
check-codestyle-rust-arm:
timeout-minutes: 90
runs-on: [ self-hosted, dev, arm64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: true
fetch-depth: 1
# Some of our rust modules use FFI and need those to be checked
- name: Get postgres headers
run: make postgres-headers -j$(nproc)
# cargo hack runs the given cargo subcommand (clippy in this case) for all feature combinations.
# This will catch compiler & clippy warnings in all feature combinations.
# TODO: use cargo hack for build and test as well, but, that's quite expensive.
# NB: keep clippy args in sync with ./run_clippy.sh
- run: |
CLIPPY_COMMON_ARGS="$( source .neon_clippy_args; echo "$CLIPPY_COMMON_ARGS")"
if [ "$CLIPPY_COMMON_ARGS" = "" ]; then
echo "No clippy args found in .neon_clippy_args"
exit 1
fi
echo "CLIPPY_COMMON_ARGS=${CLIPPY_COMMON_ARGS}" >> $GITHUB_ENV
- name: Run cargo clippy (debug)
run: cargo hack --feature-powerset clippy $CLIPPY_COMMON_ARGS
- name: Run cargo clippy (release)
run: cargo hack --feature-powerset clippy --release $CLIPPY_COMMON_ARGS
- name: Check documentation generation
run: cargo doc --workspace --no-deps --document-private-items
env:
RUSTDOCFLAGS: "-Dwarnings -Arustdoc::private_intra_doc_links"
# Use `${{ !cancelled() }}` to run quck tests after the longer clippy run
- name: Check formatting
if: ${{ !cancelled() }}
run: cargo fmt --all -- --check
# https://github.com/facebookincubator/cargo-guppy/tree/bec4e0eb29dcd1faac70b1b5360267fc02bf830e/tools/cargo-hakari#2-keep-the-workspace-hack-up-to-date-in-ci
- name: Check rust dependencies
if: ${{ !cancelled() }}
run: |
cargo hakari generate --diff # workspace-hack Cargo.toml is up-to-date
cargo hakari manage-deps --dry-run # all workspace crates depend on workspace-hack
# https://github.com/EmbarkStudios/cargo-deny
- name: Check rust licenses/bans/advisories/sources
if: ${{ !cancelled() }}
run: cargo deny check
gather-rust-build-stats:
if: |
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
github.ref_name == 'main'
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats')
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned

655
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -48,7 +48,6 @@ async-trait = "0.1"
aws-config = { version = "0.56", default-features = false, features=["rustls"] }
aws-sdk-s3 = "0.29"
aws-smithy-http = "0.56"
aws-smithy-async = { version = "0.56", default-features = false, features=["rt-tokio"] }
aws-credential-types = "0.56"
aws-types = "0.56"
axum = { version = "0.6.20", features = ["ws"] }
@@ -67,7 +66,7 @@ comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-utils = "0.8.5"
dashmap = { version = "5.5.0", features = ["raw-api"] }
dashmap = "5.5.0"
either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
@@ -83,7 +82,7 @@ hex = "0.4"
hex-literal = "0.4"
hmac = "0.12.1"
hostname = "0.3.1"
http-types = { version = "2", default-features = false }
http-types = "2"
humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
@@ -164,11 +163,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -205,7 +204,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
################# Binary contents sections

View File

@@ -74,30 +74,10 @@ highlight = "all"
workspace-default-features = "allow"
external-default-features = "allow"
allow = []
deny = []
skip = []
skip-tree = []
[[bans.deny]]
# we use tokio, the same rationale applies for async-{io,waker,global-executor,executor,channel,lock}, smol
# if you find yourself here while adding a dependency, try "default-features = false", ask around on #rust
name = "async-std"
[[bans.deny]]
name = "async-io"
[[bans.deny]]
name = "async-waker"
[[bans.deny]]
name = "async-global-executor"
[[bans.deny]]
name = "async-executor"
[[bans.deny]]
name = "smol"
# This section is considered when running `cargo deny check sources`.
# More documentation about the 'sources' section can be found here:
# https://embarkstudios.github.io/cargo-deny/checks/sources/cfg.html

View File

@@ -18,7 +18,7 @@ use utils::{
use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
///
@@ -371,6 +371,8 @@ pub struct TenantInfo {
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub attachment_status: TenantAttachmentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
@@ -515,6 +517,8 @@ pub enum HistoricLayerInfo {
lsn_end: Lsn,
remote: bool,
access_stats: LayerAccessStats,
remote_path: Option<String>,
},
Image {
layer_file_name: String,
@@ -523,6 +527,8 @@ pub enum HistoricLayerInfo {
lsn_start: Lsn,
remote: bool,
access_stats: LayerAccessStats,
remote_path: Option<String>,
},
}
@@ -767,6 +773,36 @@ impl PagestreamBeMessage {
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
match msg_tag {
100 => todo!(),
101 => todo!(),
102 => {
let buf = buf.get_ref();
/* TODO use constant */
if buf.len() == 8192 {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page: buf.clone(),
}))
} else {
anyhow::bail!("invalid page size: {}", buf.len());
}
}
103 => {
let buf = buf.get_ref();
let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
let rust_str = cstr.to_str()?;
Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
message: rust_str.to_owned(),
}))
}
104 => todo!(),
_ => bail!("unknown tag: {:?}", msg_tag),
}
}
}
#[cfg(test)]
@@ -832,6 +868,7 @@ mod tests {
state: TenantState::Active,
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_active = json!({
"id": original_active.id.to_string(),
@@ -852,6 +889,7 @@ mod tests {
},
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_broken = json!({
"id": original_broken.id.to_string(),

View File

@@ -8,7 +8,6 @@ license.workspace = true
anyhow.workspace = true
async-trait.workspace = true
once_cell.workspace = true
aws-smithy-async.workspace = true
aws-smithy-http.workspace = true
aws-types.workspace = true
aws-config.workspace = true

View File

@@ -81,6 +81,12 @@ impl std::fmt::Display for RemotePath {
}
}
impl From<RemotePath> for String {
fn from(val: RemotePath) -> Self {
val.0.into()
}
}
impl RemotePath {
pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
anyhow::ensure!(
@@ -102,7 +108,7 @@ impl RemotePath {
self.0.file_name()
}
pub fn join(&self, segment: &Utf8Path) -> Self {
pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
Self(self.0.join(segment))
}

View File

@@ -4,27 +4,23 @@
//! allowing multiple api users to independently work with the same S3 bucket, if
//! their bucket prefixes are both specified and different.
use std::{borrow::Cow, sync::Arc};
use std::borrow::Cow;
use anyhow::Context;
use aws_config::{
environment::credentials::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider,
meta::credentials::CredentialsProviderChain,
provider_config::ProviderConfig,
retry::{RetryConfigBuilder, RetryMode},
web_identity_token::WebIdentityTokenCredentialsProvider,
imds::credentials::ImdsCredentialsProvider, meta::credentials::CredentialsProviderChain,
provider_config::ProviderConfig, web_identity_token::WebIdentityTokenCredentialsProvider,
};
use aws_credential_types::cache::CredentialsCache;
use aws_sdk_s3::{
config::{AsyncSleep, Config, Region, SharedAsyncSleep},
config::{Config, Region},
error::SdkError,
operation::get_object::GetObjectError,
primitives::ByteStream,
types::{Delete, ObjectIdentifier},
Client,
};
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_http::body::SdkBody;
use hyper::Body;
use scopeguard::ScopeGuard;
@@ -87,23 +83,10 @@ impl S3Bucket {
.or_else("imds", ImdsCredentialsProvider::builder().build())
};
// AWS SDK requires us to specify how the RetryConfig should sleep when it wants to back off
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
// We do our own retries (see [`backoff::retry`]). However, for the AWS SDK to enable rate limiting in response to throttling
// responses (e.g. 429 on too many ListObjectsv2 requests), we must provide a retry config. We set it to use at most one
// attempt, and enable 'Adaptive' mode, which causes rate limiting to be enabled.
let mut retry_config = RetryConfigBuilder::new();
retry_config
.set_max_attempts(Some(1))
.set_mode(Some(RetryMode::Adaptive));
let mut config_builder = Config::builder()
.region(region)
.credentials_cache(CredentialsCache::lazy())
.credentials_provider(credentials_provider)
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
.retry_config(retry_config.build());
.credentials_provider(credentials_provider);
if let Some(custom_endpoint) = aws_config.endpoint.clone() {
config_builder = config_builder

View File

@@ -1,156 +0,0 @@
/// An exhausting, injective counter with exponential search properties
///
/// The type implements an [`Iterator`] that yields numbers from the range
/// from 0 to a pre-defined maximum.
///
/// * It is *exhausting* in that it visits all numbers inside `0..max`.
/// * It is *injective* in that all numbers are visited only once.
/// * It has *exponential search properties* in that it iterates over the
/// number range in a fractal pattern.
///
/// This iterator is well suited for finding a pivot in algorithms that
/// want centered pivots: its output is heavily biased towards starting
/// with small numbers.
///
/// If the specified maximum is close to an exponent
/// of two we will quickly reach that exponent and therefore we will
/// quickly close in on the maximum, contrary to the claim of the counter
/// providing centered pivots. For most algorithms, this is not a problem
/// though as long as this is only a limited number of times, which it does,
/// as the next maximum will be an exponent of two, for which the next
/// smaller one is precisely its half of. So we will be able to perform a
/// binary search in all instances.
pub struct ExpCounter {
/// The (exclusive) upper limit of our search
max: u64,
/// The base that increases after each round trip
base: u64,
/// An increasing offset, always a power of two
offs: u64,
/// Iteration counter
i: u64,
}
impl ExpCounter {
/// Creates an iterator that uses `ExpCounter` for the first half of the
/// range and a linear range counter for the second half.
pub fn with_max_and_linear_search(max: u64) -> impl Iterator<Item = u64> {
let linear_start = max / 2;
ExpCounter::with_max(linear_start).chain(linear_start..max)
}
/// Creates a new `ExpCounter` instance that counts to the (exclusive) maximum
pub fn with_max(max: u64) -> Self {
Self {
max,
base: 0,
offs: 1,
i: 0,
}
}
}
impl Iterator for ExpCounter {
type Item = u64;
fn next(&mut self) -> Option<Self::Item> {
if self.i >= self.max {
return None;
}
if self.i == 0 {
// Special casing this is easier than adding 0 as the first in some other fashion
self.i += 1;
return Some(0);
}
let to_yield = self.base + self.offs;
self.offs *= 2;
if self.base + self.offs >= self.max {
self.base += 1;
self.offs = 1;
while self.base > self.offs {
self.offs *= 2;
}
if (self.base + self.offs).count_ones() == 1 {
self.offs *= 2;
}
}
self.i += 1;
Some(to_yield)
}
}
#[cfg(test)]
mod test {
use std::collections::HashSet;
use super::*;
fn dupes_and_missing(list: &mut [u64], max: u64) -> (usize, usize) {
let mut contained_in_list = HashSet::<u64>::new();
let mut dupes = 0;
let mut missing = 0;
for v in list.iter() {
println!("{v:4 } = {v:010b}");
}
println!("Yielded {} items", list.len());
println!("The duplicates:");
list.sort();
let mut prev = None;
for v in list.iter() {
contained_in_list.insert(*v);
if Some(*v) == prev {
dupes += 1;
println!("{v:3 } = {v:08b}");
}
prev = Some(*v);
}
println!("The missing numbers:");
for v in 0..max {
if !contained_in_list.contains(&v) {
println!("{v:3 } = {v:010b}");
missing += 1;
}
}
(dupes, missing)
}
#[test]
fn to_40() {
let max = 40;
let list = ExpCounter::with_max(max).collect::<Vec<_>>();
let expected = [
0, 1, 2, 4, 8, 16, 32, 3, 5, 9, 17, 33, 6, 10, 18, 34, 7, 11, 19, 35, 12, 20, 36, 13,
21, 37, 14, 22, 38, 15, 23, 39, 24, 25, 26, 27, 28, 29, 30, 31,
];
assert_eq!(list, expected);
}
#[test]
fn to_64() {
let max = 64;
let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
}
#[test]
fn to_100() {
let max = 100;
let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
}
#[test]
fn to_127() {
let max = 127;
let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
}
#[test]
fn to_12345() {
let max = 12345;
let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
}
}

View File

@@ -78,7 +78,6 @@ pub mod completion;
/// Reporting utilities
pub mod error;
pub mod exp_counter;
/// async timeout helper
pub mod timeout;

View File

@@ -125,9 +125,6 @@ where
// Wake everyone with an error.
let mut internal = self.internal.lock().unwrap();
// Block any future waiters from starting
internal.shutdown = true;
// This will steal the entire waiters map.
// When we drop it all waiters will be woken.
mem::take(&mut internal.waiters)

View File

@@ -82,6 +82,8 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tokio-stream.workspace = true
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
[dev-dependencies]
criterion.workspace = true

View File

@@ -1,13 +1,15 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use clap::Subcommand;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
use pageserver::{
@@ -20,6 +22,7 @@ use pageserver::{
};
use std::fs;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::parse_filename;
@@ -45,6 +48,13 @@ pub(crate) enum LayerCmd {
/// The id from list-layer command
id: usize,
},
RewriteSummary {
layer_file_path: Utf8PathBuf,
#[clap(long)]
new_tenant_id: Option<TenantId>,
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -100,6 +110,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
println!("- timeline {}", timeline.file_name().to_string_lossy());
}
}
Ok(())
}
LayerCmd::ListLayer {
path,
@@ -128,6 +139,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::DumpLayer {
path,
@@ -168,7 +180,63 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::RewriteSummary {
layer_file_path,
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
macro_rules! rewrite_closure {
($($summary_ty:tt)*) => {{
|summary| $($summary_ty)* {
tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
..summary
}
}};
}
let res = ImageLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(image_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of image layer {layer_file_path}");
return Ok(());
}
Err(image_layer::RewriteTenantTimelineError::MagicMismatch) => (), // fallthrough
Err(image_layer::RewriteTenantTimelineError::Other(e)) => {
return Err(e);
}
}
let res = DeltaLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(delta_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of delta layer {layer_file_path}");
return Ok(());
}
Err(delta_layer::RewriteTenantTimelineError::MagicMismatch) => (), // fallthrough
Err(delta_layer::RewriteTenantTimelineError::Other(e)) => {
return Err(e);
}
}
anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
}
Ok(())
}

View File

@@ -0,0 +1,368 @@
use anyhow::Context;
use clap::Parser;
use hyper::client::HttpConnector;
use hyper::{Client, Uri};
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
use pageserver::repository;
use pageserver_api::reltag::RelTag;
use rand::prelude::*;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::task::JoinHandle;
use utils::lsn::Lsn;
struct Key(repository::Key);
impl std::str::FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
repository::Key::from_hex(s).map(Key)
}
}
struct KeyRange {
start: i128,
end: i128,
}
impl KeyRange {
fn len(&self) -> i128 {
self.end - self.start
}
}
struct RelTagBlockNo {
rel_tag: RelTag,
block_no: u32,
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
page_service_connstring: String,
// tenant_id: String,
// timeline_id: String,
num_tasks: usize,
num_requests: usize,
tenants: Option<Vec<String>>,
#[clap(long)]
pick_n_tenants: Option<usize>,
}
#[derive(Debug, Default)]
struct Stats {
completed_requests: AtomicU64,
}
impl Stats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[tokio::main]
async fn main() {
let args: &'static Args = Box::leak(Box::new(Args::parse()));
// std::env::set_var("RUST_LOG", "info,tokio_postgres=trace");
// tracing_subscriber::fmt::init();
let client = Client::new();
let tenants = if let Some(tenants) = &args.tenants {
tenants.clone()
} else {
// let tenant_id = "b97965931096047b2d54958756baee7b";
// let timeline_id = "2868f84a8d166779e4c651b116c45059";
let resp = client
.get(Uri::try_from(&format!("{}/v1/tenant", args.mgmt_api_endpoint)).unwrap())
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let tenants: serde_json::Value = serde_json::from_slice(&body).unwrap();
let mut out = Vec::new();
for t in tenants.as_array().unwrap() {
if let Some(limit) = args.pick_n_tenants {
if out.len() >= limit {
break;
}
}
out.push(t.get("id").unwrap().as_str().unwrap().to_owned());
}
if let Some(limit) = args.pick_n_tenants {
assert_eq!(out.len(), limit);
}
out
};
let mut tenant_timelines = Vec::new();
for tenant_id in tenants {
let resp = client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline",
args.mgmt_api_endpoint, tenant_id
))
.unwrap(),
)
.await
.unwrap();
let body = hyper::body::to_bytes(resp).await.unwrap();
let timelines: serde_json::Value = serde_json::from_slice(&body).unwrap();
for t in timelines.as_array().unwrap() {
let timeline_id = t.get("timeline_id").unwrap().as_str().unwrap().to_owned();
tenant_timelines.push((tenant_id.clone(), timeline_id));
}
}
println!("tenant_timelines:\n{:?}", tenant_timelines);
let stats = Arc::new(Stats::default());
tokio::spawn({
let stats = Arc::clone(&stats);
async move {
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
println!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut tasks = Vec::new();
for (tenant_id, timeline_id) in tenant_timelines {
let stats = Arc::clone(&stats);
let t = tokio::spawn(timeline(
args,
client.clone(),
tenant_id,
timeline_id,
stats,
));
tasks.push(t);
}
for t in tasks {
t.await.unwrap();
}
}
fn timeline(
args: &'static Args,
http_client: Client<HttpConnector, hyper::Body>,
tenant_id: String,
timeline_id: String,
stats: Arc<Stats>,
) -> impl Future<Output = ()> + Send + Sync {
async move {
let resp = http_client
.get(
Uri::try_from(&format!(
"{}/v1/tenant/{}/timeline/{}/keyspace",
args.mgmt_api_endpoint, tenant_id, timeline_id
))
.unwrap(),
)
.await
.unwrap();
if !resp.status().is_success() {
panic!("Failed to get keyspace: {resp:?}");
}
let body = hyper::body::to_bytes(resp).await.unwrap();
let keyspace: serde_json::Value = serde_json::from_slice(&body).unwrap();
let lsn: Lsn = keyspace["at_lsn"].as_str().unwrap().parse().unwrap();
let ranges = keyspace["keys"]
.as_array()
.unwrap()
.iter()
.filter_map(|r| {
let r = r.as_array().unwrap();
assert_eq!(r.len(), 2);
let start = Key::from_str(r[0].as_str().unwrap()).unwrap();
let end = Key::from_str(r[1].as_str().unwrap()).unwrap();
// filter out non-relblock keys
match (is_rel_block_key(start.0), is_rel_block_key(end.0)) {
(true, true) => Some(KeyRange {
start: start.0.to_i128(),
end: end.0.to_i128(),
}),
(true, false) | (false, true) => {
unimplemented!("split up range")
}
(false, false) => None,
}
})
.collect::<Vec<_>>();
// weighted ranges
let weights = ranges.iter().map(|r| r.len()).collect::<Vec<_>>();
let ranges = Arc::new(ranges);
let weights = Arc::new(weights);
let mut tasks = Vec::<JoinHandle<()>>::new();
let _start = std::time::Instant::now();
for _i in 0..args.num_tasks {
let ranges = ranges.clone();
let _weights = weights.clone();
let _client = http_client.clone();
let tenant_id = tenant_id.clone();
let timeline_id = timeline_id.clone();
let task = tokio::spawn({
let stats = Arc::clone(&stats);
async move {
let mut client = getpage_client::Client::new(
args.page_service_connstring.clone(),
tenant_id.clone(),
timeline_id.clone(),
)
.await
.unwrap();
for _i in 0..args.num_requests {
let key = {
let mut rng = rand::thread_rng();
let r = ranges.choose_weighted(&mut rng, |r| r.len()).unwrap();
let key: i128 = rng.gen_range(r.start..r.end);
let key = repository::Key::from_i128(key);
// XXX filter these out when we iterate the keyspace
assert!(
is_rel_block_key(key),
"we filter non-relblock keys out above"
);
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we just checked");
RelTagBlockNo { rel_tag, block_no }
};
client
.getpage(key, lsn)
.await
.with_context(|| {
format!("getpage for tenant {} timeline {}", tenant_id, timeline_id)
})
.unwrap();
stats.inc();
}
client.shutdown().await;
}
});
tasks.push(task);
}
for task in tasks {
task.await.unwrap();
}
}
}
mod getpage_client {
use std::pin::Pin;
use futures::SinkExt;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse,
};
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;
use crate::RelTagBlockNo;
pub(crate) struct Client {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
impl Client {
pub async fn new(
connstring: String,
tenant_id: String,
timeline_id: String,
) -> anyhow::Result<Self> {
let (client, connection) =
tokio_postgres::connect(&connstring, postgres::NoTls).await?;
let conn_task_cancel = CancellationToken::new();
let conn_task = tokio::spawn({
let conn_task_cancel = conn_task_cancel.clone();
async move {
tokio::select! {
_ = conn_task_cancel.cancelled() => { }
res = connection => {
res.unwrap();
}
}
}
});
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.await?;
Ok(Self {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
})
}
pub async fn shutdown(mut self) {
let _ = self.cancel_on_client_drop.take();
self.conn_task.await.unwrap();
}
pub async fn getpage(
&mut self,
key: RelTagBlockNo,
lsn: Lsn,
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamGetPageRequest {
latest: false,
rel: key.rel_tag,
blkno: key.block_no,
lsn,
};
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next = next.unwrap().unwrap();
match PagestreamBeMessage::deserialize(next)? {
PagestreamBeMessage::Exists(_) => todo!(),
PagestreamBeMessage::Nblocks(_) => todo!(),
PagestreamBeMessage::GetPage(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::DbSize(_) => todo!(),
}
}
}
}

View File

@@ -261,7 +261,7 @@ async fn calculate_synthetic_size_worker(
}
};
for (tenant_id, tenant_state) in tenants {
for (tenant_id, tenant_state, _gen) in tenants {
if tenant_state != TenantState::Active {
continue;
}

View File

@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
}
};
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
if state != TenantState::Active {
None
} else {

View File

@@ -345,7 +345,7 @@ impl DeletionList {
result.extend(
timeline_layers
.into_iter()
.map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
.map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
);
}
}

View File

@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(
let mut candidates = Vec::new();
for (tenant_id, _state) in &tenants {
for (tenant_id, _state, _gen) in &tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}

View File

@@ -352,8 +352,7 @@ paths:
in: query
required: true
schema:
type: string
format: hex
type: integer
description: A LSN to get the timestamp
responses:
"200":

View File

@@ -303,7 +303,11 @@ async fn build_timeline_info(
// we're executing this function, we will outlive the timeline on-disk state.
info.current_logical_size_non_incremental = Some(
timeline
.get_current_logical_size_non_incremental(info.last_record_lsn, ctx)
.get_current_logical_size_non_incremental(
info.last_record_lsn,
CancellationToken::new(),
ctx,
)
.await?,
);
}
@@ -604,7 +608,7 @@ async fn get_timestamp_of_lsn_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let result = timeline.get_max_timestamp_for_lsn(lsn, &ctx).await?;
let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
match result {
Some(time) => {
@@ -756,11 +760,12 @@ async fn tenant_list_handler(
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
})?
.iter()
.map(|(id, state)| TenantInfo {
.map(|(id, state, gen)| TenantInfo {
id: *id,
state: state.clone(),
current_physical_size: None,
attachment_status: state.attachment_status(),
generation: (*gen).into(),
})
.collect::<Vec<TenantInfo>>();
@@ -789,6 +794,7 @@ async fn tenant_status(
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
})
}
.instrument(info_span!("tenant_status_handler", %tenant_id))
@@ -1478,7 +1484,7 @@ async fn timeline_collect_keyspace(
let keys = timeline
.collect_keyspace(at_lsn, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
}

View File

@@ -51,6 +51,9 @@ pub enum StorageTimeOperation {
#[strum(serialize = "create tenant")]
CreateTenant,
#[strum(serialize = "duplicate tenant")]
DuplicateTenant,
}
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
@@ -1225,6 +1228,15 @@ pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static WAL_REDO_WAIT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_wait_seconds",
"Time spent waiting for access to the Postgres WAL redo process",
redo_histogram_time_buckets!(),
)
.expect("failed to define a metric")
});
pub(crate) static WAL_REDO_RECORDS_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_records_histogram",
@@ -1252,46 +1264,6 @@ pub(crate) static WAL_REDO_RECORD_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});
pub(crate) struct WalRedoProcessCounters {
pub(crate) started: IntCounter,
pub(crate) killed_by_cause: enum_map::EnumMap<WalRedoKillCause, IntCounter>,
}
#[derive(Debug, enum_map::Enum, strum_macros::IntoStaticStr)]
pub(crate) enum WalRedoKillCause {
WalRedoProcessDrop,
NoLeakChildDrop,
Startup,
}
impl Default for WalRedoProcessCounters {
fn default() -> Self {
let started = register_int_counter!(
"pageserver_wal_redo_process_started_total",
"Number of WAL redo processes started",
)
.unwrap();
let killed = register_int_counter_vec!(
"pageserver_wal_redo_process_stopped_total",
"Number of WAL redo processes stopped",
&["cause"],
)
.unwrap();
Self {
started,
killed_by_cause: EnumMap::from_array(std::array::from_fn(|i| {
let cause = <WalRedoKillCause as enum_map::Enum>::from_usize(i);
let cause_str: &'static str = cause.into();
killed.with_label_values(&[cause_str])
})),
}
}
}
pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
Lazy::new(WalRedoProcessCounters::default);
/// Similar to `prometheus::HistogramTimer` but does not record on drop.
pub struct StorageTimeMetricsTimer {
metrics: StorageTimeMetrics,
@@ -1959,6 +1931,7 @@ pub fn preinitialize_metrics() {
&READ_NUM_FS_LAYERS,
&WAIT_LSN_TIME,
&WAL_REDO_TIME,
&WAL_REDO_WAIT_TIME,
&WAL_REDO_RECORDS_HISTOGRAM,
&WAL_REDO_BYTES_HISTOGRAM,
]

View File

@@ -512,11 +512,7 @@ impl PageServerHandler {
};
if let Err(e) = &response {
// Requests may fail as soon as we are Stopping, even if the Timeline's cancellation token wasn't fired yet,
// because wait_lsn etc will drop out
// is_stopping(): [`Timeline::flush_and_shutdown`] has entered
// is_canceled(): [`Timeline::shutdown`]` has entered
if timeline.cancel.is_cancelled() || timeline.is_stopping() {
if timeline.cancel.is_cancelled() {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.

View File

@@ -21,9 +21,8 @@ use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::exp_counter::ExpCounter;
use utils::{bin_ser::BeSer, lsn::Lsn};
/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
@@ -31,33 +30,9 @@ pub type BlockNumber = u32;
#[derive(Debug)]
pub enum LsnForTimestamp {
/// Found commits both before and after the given timestamp
Present(Lsn),
/// Found no commits after the given timestamp, this means
/// that the newest data in the branch is older than the given
/// timestamp.
///
/// All commits <= LSN happened before the given timestamp
Future(Lsn),
/// The queried timestamp is past our horizon we look back at (PITR)
///
/// All commits > LSN happened after the given timestamp,
/// but any commits < LSN might have happened before or after
/// the given timestamp. We don't know because no data before
/// the given lsn is available.
Past(Lsn),
/// We have found no commit with a timestamp,
/// so we can't return anything meaningful.
///
/// The associated LSN is the lower bound value we can safely
/// create branches on, but no statement is made if it is
/// older or newer than the timestamp.
///
/// This variant can e.g. be returned right after a
/// cluster import.
NoData(Lsn),
}
@@ -69,25 +44,6 @@ pub enum CalculateLogicalSizeError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum CollectKeySpaceError {
#[error(transparent)]
Decode(#[from] DeserializeError),
#[error(transparent)]
PageRead(PageReconstructError),
#[error("cancelled")]
Cancelled,
}
impl From<PageReconstructError> for CollectKeySpaceError {
fn from(err: PageReconstructError) -> Self {
match err {
PageReconstructError::Cancelled => Self::Cancelled,
err => Self::PageRead(err),
}
}
}
impl From<PageReconstructError> for CalculateLogicalSizeError {
fn from(pre: PageReconstructError) -> Self {
match pre {
@@ -369,11 +325,7 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<LsnForTimestamp, PageReconstructError> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
// We use this method to figure out the branching LSN for the new branch, but the
// GC cutoff could be before the branching point and we cannot create a new branch
// with LSN < `ancestor_lsn`. Thus, pick the maximum of these two to be
// on the safe side.
let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
let min_lsn = *gc_cutoff_lsn_guard;
let max_lsn = self.get_last_record_lsn();
// LSNs are always 8-byte aligned. low/mid/high represent the
@@ -383,92 +335,83 @@ impl Timeline {
let mut found_smaller = false;
let mut found_larger = false;
// This is a search budget to not make the search too expensive
let mut budget = 1_000_000u32;
while low < high {
// cannot overflow, high and low are both smaller than u64::MAX / 2
// this always holds: low <= mid_start < high
let mid_start = (high + low) / 2;
let mut mid = mid_start;
let mid = (high + low) / 2;
let mut max = None;
let cmp = self
.is_latest_commit_timestamp_ge_than(
search_timestamp,
Lsn(mid * 8),
&mut found_smaller,
&mut found_larger,
ctx,
)
.await?;
// Search for an lsn that has a commit timestamp.
// The search with `ExpCounter` will eventually try all LSNs
// in the range between mid_start and high, which is important
// when we set high to mid if we have exhausted all possibilities.
// We don't do an explosive search as we want to prove that
// every single lsn up to high is giving inconclusive results.
// This can only be done by trying all lsns.
for offs in ExpCounter::with_max_and_linear_search(high - mid_start) {
mid = offs + mid_start;
// Do the query for mid + 1 instead of mid so that we can make definite statements
// about low (low's invariant: always points to commit before or at search_timestamp).
max = self
.get_max_timestamp_for_lsn(Lsn((mid + 1) * 8), ctx)
.await?;
if max.is_some() {
break;
}
// Do some limiting to make sure the query does not become too expensive.
if let Some(new_budget) = budget.checked_sub(1) {
budget = new_budget;
} else {
return Ok(LsnForTimestamp::NoData(min_lsn));
}
}
if let Some(max) = max {
if max <= search_timestamp {
// We found a commit with timestamp before (or at) `search_timestamp`.
found_smaller = true;
low = mid + 1;
} else {
// We found a commit with timestamp after `search_timestamp`.
found_larger = true;
high = mid;
}
if cmp {
high = mid;
} else {
// max is None so we have proof of a chain of None's from mid_start all up to high.
// Thus we can safely set high to mid_start
high = mid_start;
low = mid + 1;
}
}
// If `found_smaller == true`, `low` is the LSN of the last commit record
// before or at `search_timestamp`
//
// FIXME: it would be better to get the LSN of the previous commit.
// Otherwise, if you restore to the returned LSN, the database will
// include physical changes from later commits that will be marked
// as aborted, and will need to be vacuumed away.
let commit_lsn = Lsn(low * 8);
match (found_smaller, found_larger) {
(false, false) => {
// This can happen if no commit records have been processed yet, e.g.
// just after importing a cluster.
Ok(LsnForTimestamp::NoData(min_lsn))
Ok(LsnForTimestamp::NoData(max_lsn))
}
(true, false) => {
// Didn't find any commit timestamps larger than the request
Ok(LsnForTimestamp::Future(max_lsn))
}
(false, true) => {
// Didn't find any commit timestamps smaller than the request
Ok(LsnForTimestamp::Past(min_lsn))
Ok(LsnForTimestamp::Past(max_lsn))
}
(true, false) => {
// Only found commits with timestamps smaller than the request.
// It's still a valid case for branch creation, return it.
// And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
// case, anyway.
Ok(LsnForTimestamp::Future(commit_lsn))
(true, true) => {
// low is the LSN of the first commit record *after* the search_timestamp,
// Back off by one to get to the point just before the commit.
//
// FIXME: it would be better to get the LSN of the previous commit.
// Otherwise, if you restore to the returned LSN, the database will
// include physical changes from later commits that will be marked
// as aborted, and will need to be vacuumed away.
Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
}
(true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
}
}
/// Obtain the timestamp for the given lsn.
/// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
/// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
///
/// If the lsn has no timestamps, returns None. Returns the maximum such timestamp otherwise.
pub(crate) async fn get_max_timestamp_for_lsn(
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
/// with a smaller/larger timestamp.
///
pub async fn is_latest_commit_timestamp_ge_than(
&self,
search_timestamp: TimestampTz,
probe_lsn: Lsn,
found_smaller: &mut bool,
found_larger: &mut bool,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
if timestamp >= search_timestamp {
*found_larger = true;
return ControlFlow::Break(true);
} else {
*found_smaller = true;
}
ControlFlow::Continue(())
})
.await
}
/// Obtain the possible timestamp range for the given lsn.
///
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
pub async fn get_timestamp_for_lsn(
&self,
probe_lsn: Lsn,
ctx: &RequestContext,
@@ -635,6 +578,7 @@ impl Timeline {
pub async fn get_current_logical_size_non_incremental(
&self,
lsn: Lsn,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -646,7 +590,7 @@ impl Timeline {
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self.list_rels(*spcnode, *dbnode, lsn, ctx).await? {
if self.cancel.is_cancelled() {
if cancel.is_cancelled() {
return Err(CalculateLogicalSizeError::Cancelled);
}
let relsize_key = rel_size_to_key(rel);
@@ -663,11 +607,11 @@ impl Timeline {
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
/// Anything that's not listed maybe removed from the underlying storage (from
/// that LSN forwards).
pub(crate) async fn collect_keyspace(
pub async fn collect_keyspace(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<KeySpace, CollectKeySpaceError> {
) -> anyhow::Result<KeySpace> {
// Iterate through key ranges, greedily packing them into partitions
let mut result = KeySpaceAccum::new();
@@ -676,7 +620,7 @@ impl Timeline {
// Fetch list of database dirs and iterate them
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;
let dbdir = DbDirectory::des(&buf).context("deserialization failure")?;
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
dbs.sort_unstable();
@@ -709,7 +653,7 @@ impl Timeline {
let slrudir_key = slru_dir_to_key(kind);
result.add_key(slrudir_key);
let buf = self.get(slrudir_key, lsn, ctx).await?;
let dir = SlruSegmentDirectory::des(&buf)?;
let dir = SlruSegmentDirectory::des(&buf).context("deserialization failure")?;
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
segments.sort_unstable();
for segno in segments {
@@ -727,7 +671,7 @@ impl Timeline {
// Then pg_twophase
result.add_key(TWOPHASEDIR_KEY);
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
let twophase_dir = TwoPhaseDirectory::des(&buf).context("deserialization failure")?;
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
xids.sort_unstable();
for xid in xids {
@@ -1756,6 +1700,7 @@ const AUX_FILES_KEY: Key = Key {
// Reverse mappings for a few Keys.
// These are needed by WAL redo manager.
/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
Ok(match key.field1 {
0x00 => (
@@ -1771,7 +1716,8 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
})
}
fn is_rel_block_key(key: Key) -> bool {
/// See [[key_to_rel_block]].
pub fn is_rel_block_key(key: Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}

View File

@@ -1711,6 +1711,10 @@ impl Tenant {
self.current_state() == TenantState::Active
}
pub fn generation(&self) -> Generation {
self.generation
}
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
@@ -1841,13 +1845,7 @@ impl Tenant {
timelines.values().for_each(|timeline| {
let timeline = Arc::clone(timeline);
let span = Span::current();
js.spawn(async move {
if freeze_and_flush {
timeline.flush_and_shutdown().instrument(span).await
} else {
timeline.shutdown().instrument(span).await
}
});
js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
})
};
tracing::info!("Waiting for timelines...");
@@ -4733,7 +4731,7 @@ mod tests {
// Keeps uninit mark in place
let raw_tline = tline.raw_timeline().unwrap();
raw_tline
.shutdown()
.shutdown(false)
.instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
.await;
std::mem::forget(tline);

View File

@@ -327,7 +327,7 @@ mod tests {
let mut sz: u16 = rng.gen();
// Make 50% of the arrays small
if rng.gen() {
sz &= 63;
sz |= 63;
}
random_array(sz.into())
})

View File

@@ -30,7 +30,7 @@ use crate::metrics::TENANT_MANAGER as METRICS;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::tenant::{create_tenant_files, AttachedTenantConf, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
@@ -40,7 +40,7 @@ use utils::id::{TenantId, TimelineId};
use super::delete::DeleteTenantError;
use super::timeline::delete::DeleteTimelineFlow;
use super::TenantSharedResources;
use super::{SpawnMode, TenantSharedResources};
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is elegible to service
@@ -566,10 +566,8 @@ pub(crate) async fn shutdown_all_tenants() {
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
use utils::completion;
let mut join_set = JoinSet::new();
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
let (total_in_progress, total_attached) = {
// Atomically, 1. extract the list of tenants to shut down and 2. prevent creation of new tenants.
let (in_progress_ops, tenants_to_shut_down) = {
let mut m = tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
@@ -579,67 +577,78 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
}
TenantsMap::Open(tenants) => {
let mut shutdown_state = HashMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
let mut in_progress_ops = Vec::new();
let mut tenants_to_shut_down = Vec::new();
for (tenant_id, v) in tenants.drain() {
for (k, v) in tenants.drain() {
match v {
TenantSlot::Attached(t) => {
shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let freeze_and_flush = true;
let res = {
let (_guard, shutdown_progress) = completion::channel();
t.shutdown(shutdown_progress, freeze_and_flush).await
};
if let Err(other_progress) = res {
// join the another shutdown in progress
other_progress.wait().await;
}
// we cannot afford per tenant logging here, because if s3 is degraded, we are
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", %tenant_id)),
);
total_attached += 1;
tenants_to_shut_down.push(t.clone());
shutdown_state.insert(k, TenantSlot::Attached(t));
}
TenantSlot::Secondary => {
shutdown_state.insert(tenant_id, TenantSlot::Secondary);
shutdown_state.insert(k, TenantSlot::Secondary);
}
TenantSlot::InProgress(notify) => {
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
// wait for their notifications to fire in this function.
join_set.spawn(async move {
notify.wait().await;
});
total_in_progress += 1;
in_progress_ops.push(notify);
}
}
}
*m = TenantsMap::ShuttingDown(shutdown_state);
(total_in_progress, total_attached)
(in_progress_ops, tenants_to_shut_down)
}
TenantsMap::ShuttingDown(_) => {
// TODO: it is possible that detach and shutdown happen at the same time. as a
// result, during shutdown we do not wait for detach.
error!("already shutting down, this function isn't supposed to be called more than once");
return;
}
}
};
let started_at = std::time::Instant::now();
info!(
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
total_in_progress, total_attached
in_progress_ops.len(),
tenants_to_shut_down.len()
);
for barrier in in_progress_ops {
barrier.wait().await;
}
info!(
"InProgress tenants shut down, waiting for {} Attached tenants to shut down",
tenants_to_shut_down.len()
);
let started_at = std::time::Instant::now();
let mut join_set = JoinSet::new();
for tenant in tenants_to_shut_down {
let tenant_id = tenant.get_tenant_id();
join_set.spawn(
async move {
let freeze_and_flush = true;
let res = {
let (_guard, shutdown_progress) = completion::channel();
tenant.shutdown(shutdown_progress, freeze_and_flush).await
};
if let Err(other_progress) = res {
// join the another shutdown in progress
other_progress.wait().await;
}
// we cannot afford per tenant logging here, because if s3 is degraded, we are
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", %tenant_id)),
);
}
let total = join_set.len();
let mut panicked = 0;
let mut buffering = true;
@@ -652,7 +661,7 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
match joined {
Ok(()) => {}
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the tasks");
unreachable!("we are not cancelling any of the futures");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
@@ -1285,7 +1294,8 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
pub(crate) async fn list_tenants(
) -> Result<Vec<(TenantId, TenantState, Generation)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1293,7 +1303,9 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
TenantSlot::Attached(tenant) => {
Some((*id, tenant.current_state(), tenant.generation()))
}
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})

View File

@@ -6,6 +6,7 @@ use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
@@ -349,6 +350,10 @@ async fn fill_logical_sizes(
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
let cancel = tokio_util::sync::CancellationToken::new();
// be sure to cancel all spawned tasks if we are dropped
let _dg = cancel.clone().drop_guard();
// For each point that would benefit from having a logical size available,
// spawn a Task to fetch it, unless we have it cached already.
for seg in segments.iter() {
@@ -366,8 +371,15 @@ async fn fill_logical_sizes(
let parallel_size_calcs = Arc::clone(limit);
let ctx = ctx.attached_child();
joinset.spawn(
calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
.in_current_span(),
calculate_logical_size(
parallel_size_calcs,
timeline,
lsn,
cause,
ctx,
cancel.child_token(),
)
.in_current_span(),
);
}
e.insert(cached_size);
@@ -475,13 +487,14 @@ async fn calculate_logical_size(
lsn: utils::lsn::Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
.await
.expect("global semaphore should not had been closed");
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
.await?;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))

View File

@@ -2,7 +2,7 @@
pub mod delta_layer;
mod filename;
mod image_layer;
pub mod image_layer;
mod inmemory_layer;
mod layer;
mod layer_desc;

View File

@@ -69,13 +69,13 @@ use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
magic: u16,
format_version: u16,
pub magic: u16,
pub format_version: u16,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
/// Block number where the 'index' part of the file begins.
pub index_start_blk: u32,
@@ -609,6 +609,62 @@ impl Drop for DeltaLayerWriter {
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteTenantTimelineError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<std::io::Error> for RewriteTenantTimelineError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}
impl DeltaLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteTenantTimelineError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != DELTA_FILE_MAGIC {
return Err(RewriteTenantTimelineError::MagicMismatch);
}
let new_summary = rewrite(actual_summary);
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in DeltaLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteTenantTimelineError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl DeltaLayerInner {
pub(super) async fn load(
path: &Utf8Path,

View File

@@ -67,20 +67,20 @@ use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct Summary {
pub struct Summary {
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
magic: u16,
format_version: u16,
pub magic: u16,
pub format_version: u16,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn: Lsn,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn: Lsn,
/// Block number where the 'index' part of the file begins.
index_start_blk: u32,
pub index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
index_root_blk: u32,
pub index_root_blk: u32,
// the 'values' part starts after the summary header, on block 1.
}
@@ -294,6 +294,61 @@ impl ImageLayer {
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteTenantTimelineError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<std::io::Error> for RewriteTenantTimelineError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}
impl ImageLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteTenantTimelineError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != IMAGE_FILE_MAGIC {
return Err(RewriteTenantTimelineError::MagicMismatch);
}
let new_summary = rewrite(actual_summary);
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteTenantTimelineError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl ImageLayerInner {
pub(super) async fn load(
path: &Utf8Path,

View File

@@ -3,6 +3,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
use remote_storage::RemotePath;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
@@ -251,7 +252,6 @@ impl Layer {
layer
.get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0, ctx)
.instrument(tracing::info_span!("get_value_reconstruct_data", layer=%self))
.await
}
@@ -305,6 +305,11 @@ impl Layer {
&self.0.path
}
/// This can return None even though it should return Some in some edge cases.
pub(crate) fn remote_path(&self) -> Option<RemotePath> {
self.0.remote_path()
}
pub(crate) fn metadata(&self) -> LayerFileMetadata {
self.0.metadata()
}
@@ -915,6 +920,17 @@ impl LayerInner {
}
}
/// This can return None even though it should return Some in some edge cases.
fn remote_path(&self) -> Option<RemotePath> {
let tl = self.timeline.upgrade()?; // TODO: should distinguish this case, but, accuracy doesn't matter for this field.
Some(crate::tenant::remote_timeline_client::remote_layer_path(
&tl.tenant_id,
&tl.timeline_id,
&self.desc.filename(),
self.generation,
))
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
@@ -934,6 +950,7 @@ impl LayerInner {
lsn_end: lsn_range.end,
remote,
access_stats,
remote_path: self.remote_path().map(|p| p.into()),
}
} else {
let lsn = self.desc.image_layer_lsn();
@@ -944,6 +961,7 @@ impl LayerInner {
lsn_start: lsn,
remote,
access_stats,
remote_path: self.remote_path().map(|p| p.into()),
}
}
}
@@ -1212,10 +1230,8 @@ impl DownloadedLayer {
// this will be a permanent failure
.context("load layer");
if let Err(e) = res.as_ref() {
if res.is_err() {
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
// TODO(#5815): we are not logging all errors, so temporarily log them here as well
tracing::error!("layer loading failed permanently: {e:#}");
}
res
};
@@ -1294,7 +1310,6 @@ impl ResidentLayer {
}
/// Loads all keys stored in the layer. Returns key, lsn and value size.
#[tracing::instrument(skip_all, fields(layer=%self))]
pub(crate) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,

View File

@@ -36,6 +36,7 @@ use std::time::{Duration, Instant, SystemTime};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
@@ -49,7 +50,6 @@ use crate::tenant::{
metadata::{save_metadata, TimelineMetadata},
par_fsync,
};
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
@@ -247,7 +247,7 @@ pub struct Timeline {
/// the flush finishes. You can use that to wait for the flush to finish.
layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
@@ -374,19 +374,6 @@ pub enum PageReconstructError {
WalRedo(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
enum FlushLayerError {
/// Timeline cancellation token was cancelled
#[error("timeline shutting down")]
Cancelled,
#[error(transparent)]
PageReconstructError(#[from] PageReconstructError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl std::fmt::Debug for PageReconstructError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
@@ -904,16 +891,15 @@ impl Timeline {
self.launch_eviction_task(background_jobs_can_start);
}
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
///
/// While we are flushing, we continue to accept read I/O.
#[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
pub(crate) async fn flush_and_shutdown(&self) {
pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
debug_assert_current_span_has_tenant_and_timeline_id();
// Stop ingesting data, so that we are not still writing to an InMemoryLayer while
// trying to flush
// Signal any subscribers to our cancellation token to drop out
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
// prevent writes to the InMemoryLayer
tracing::debug!("Waiting for WalReceiverManager...");
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
@@ -922,70 +908,40 @@ impl Timeline {
)
.await;
// Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
self.last_record_lsn.shutdown();
// now all writers to InMemory layer are gone, do the final flush if requested
match self.freeze_and_flush().await {
Ok(_) => {
// drain the upload queue
if let Some(client) = self.remote_client.as_ref() {
// if we did not wait for completion here, it might be our shutdown process
// didn't wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
if let Err(e) = client.wait_completion().await {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
warn!("failed to flush to remote storage: {e:#}");
}
if freeze_and_flush {
match self.freeze_and_flush().await {
Ok(()) => {}
Err(e) => {
warn!("failed to freeze and flush: {e:#}");
return; // TODO: should probably drain remote timeline client anyways?
}
}
Err(e) => {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
warn!("failed to freeze and flush: {e:#}");
// drain the upload queue
let res = if let Some(client) = self.remote_client.as_ref() {
// if we did not wait for completion here, it might be our shutdown process
// didn't wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
client.wait_completion().await
} else {
Ok(())
};
if let Err(e) = res {
warn!("failed to await for frozen and flushed uploads: {e:#}");
}
}
self.shutdown().await;
}
/// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
/// the graceful [`Timeline::flush_and_shutdown`] function.
pub(crate) async fn shutdown(&self) {
// Signal any subscribers to our cancellation token to drop out
tracing::debug!("Cancelling CancellationToken");
self.cancel.cancel();
// Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
// while doing so.
self.last_record_lsn.shutdown();
// Shut down the layer flush task before the remote client, as one depends on the other
task_mgr::shutdown_tasks(
Some(TaskKind::LayerFlushTask),
Some(self.tenant_id),
Some(self.timeline_id),
)
.await;
// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
// case our caller wants to use that for a deletion
if let Some(remote_client) = self.remote_client.as_ref() {
match remote_client.stop() {
Ok(()) => {}
Err(StopError::QueueUninitialized) => {
// Shutting down during initialization is legal
}
}
}
tracing::debug!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(self.timeline_id)).await;
// Finally wait until any gate-holders are complete
@@ -1029,12 +985,7 @@ impl Timeline {
reason,
backtrace: backtrace_str,
};
self.set_state(broken_state);
// Although the Broken state is not equivalent to shutdown() (shutdown will be called
// later when this tenant is detach or the process shuts down), firing the cancellation token
// here avoids the need for other tasks to watch for the Broken state explicitly.
self.cancel.cancel();
self.set_state(broken_state)
}
pub fn current_state(&self) -> TimelineState {
@@ -1790,8 +1741,12 @@ impl Timeline {
// delay will be terminated by a timeout regardless.
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
// no extra cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
.await
{
Ok(s) => s,
@@ -1860,6 +1815,7 @@ impl Timeline {
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
cancel: CancellationToken,
) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
let (sender, receiver) = oneshot::channel();
let self_clone = Arc::clone(self);
@@ -1880,7 +1836,7 @@ impl Timeline {
false,
async move {
let res = self_clone
.logical_size_calculation_task(lsn, cause, &ctx)
.logical_size_calculation_task(lsn, cause, &ctx, cancel)
.await;
let _ = sender.send(res).ok();
Ok(()) // Receiver is responsible for handling errors
@@ -1896,28 +1852,58 @@ impl Timeline {
lsn: Lsn,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
cancel: CancellationToken,
) -> Result<u64, CalculateLogicalSizeError> {
span::debug_assert_current_span_has_tenant_and_timeline_id();
let _guard = self.gate.enter();
let mut timeline_state_updates = self.subscribe_for_state_updates();
let self_calculation = Arc::clone(self);
let mut calculation = pin!(async {
let cancel = cancel.child_token();
let ctx = ctx.attached_child();
self_calculation
.calculate_logical_size(lsn, cause, &ctx)
.calculate_logical_size(lsn, cause, cancel, &ctx)
.await
});
let timeline_state_cancellation = async {
loop {
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = timeline_state_updates.borrow().clone();
match new_state {
// we're running this job for active timelines only
TimelineState::Active => continue,
TimelineState::Broken { .. }
| TimelineState::Stopping
| TimelineState::Loading => {
break format!("aborted because timeline became inactive (new state: {new_state:?})")
}
}
}
Err(_sender_dropped_error) => {
// can't happen, the sender is not dropped as long as the Timeline exists
break "aborted because state watch was dropped".to_string();
}
}
}
};
let taskmgr_shutdown_cancellation = async {
task_mgr::shutdown_watcher().await;
"aborted because task_mgr shutdown requested".to_string()
};
tokio::select! {
res = &mut calculation => { res }
_ = self.cancel.cancelled() => {
debug!("cancelling logical size calculation for timeline shutdown");
reason = timeline_state_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
calculation.await
}
_ = task_mgr::shutdown_watcher() => {
debug!("cancelling logical size calculation for task shutdown");
reason = taskmgr_shutdown_cancellation => {
debug!(reason = reason, "cancelling calculation");
cancel.cancel();
calculation.await
}
}
@@ -1931,6 +1917,7 @@ impl Timeline {
&self,
up_to_lsn: Lsn,
cause: LogicalSizeCalculationCause,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<u64, CalculateLogicalSizeError> {
info!(
@@ -1973,7 +1960,7 @@ impl Timeline {
};
let timer = storage_time_metrics.start_timer();
let logical_size = self
.get_current_logical_size_non_incremental(up_to_lsn, ctx)
.get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
.await?;
debug!("calculated logical size: {logical_size}");
timer.stop_and_record();
@@ -2386,10 +2373,6 @@ impl Timeline {
info!("started flush loop");
loop {
tokio::select! {
_ = self.cancel.cancelled() => {
info!("shutting down layer flush task");
break;
},
_ = task_mgr::shutdown_watcher() => {
info!("shutting down layer flush task");
break;
@@ -2401,14 +2384,6 @@ impl Timeline {
let timer = self.metrics.flush_time_histo.start_timer();
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
if self.cancel.is_cancelled() {
info!("dropping out of flush loop for timeline shutdown");
// Note: we do not bother transmitting into [`layer_flush_done_tx`], because
// anyone waiting on that will respect self.cancel as well: they will stop
// waiting at the same time we as drop out of this loop.
return;
}
let layer_to_flush = {
let guard = self.layers.read().await;
guard.layer_map().frozen_layers.front().cloned()
@@ -2417,18 +2392,9 @@ impl Timeline {
let Some(layer_to_flush) = layer_to_flush else {
break Ok(());
};
match self.flush_frozen_layer(layer_to_flush, ctx).await {
Ok(()) => {}
Err(FlushLayerError::Cancelled) => {
info!("dropping out of flush loop for timeline shutdown");
return;
}
err @ Err(
FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
) => {
error!("could not flush frozen layer: {err:?}");
break err;
}
if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
error!("could not flush frozen layer: {err:?}");
break Err(err);
}
};
// Notify any listeners that we're done
@@ -2477,17 +2443,7 @@ impl Timeline {
}
}
trace!("waiting for flush to complete");
tokio::select! {
rx_e = rx.changed() => {
rx_e?;
},
// Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
// the notification from [`flush_loop`] that it completed.
_ = self.cancel.cancelled() => {
tracing::info!("Cancelled layer flush due on timeline shutdown");
return Ok(())
}
};
rx.changed().await?;
trace!("done")
}
}
@@ -2502,7 +2458,7 @@ impl Timeline {
self: &Arc<Self>,
frozen_layer: Arc<InMemoryLayer>,
ctx: &RequestContext,
) -> Result<(), FlushLayerError> {
) -> anyhow::Result<()> {
// As a special case, when we have just imported an image into the repository,
// instead of writing out a L0 delta layer, we directly write out image layer
// files instead. This is possible as long as *all* the data imported into the
@@ -2527,11 +2483,6 @@ impl Timeline {
let (partitioning, _lsn) = self
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
.await?;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
// For image layers, we add them immediately into the layer map.
(
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
@@ -2563,10 +2514,6 @@ impl Timeline {
)
};
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
@@ -2576,10 +2523,6 @@ impl Timeline {
let metadata = {
let mut guard = self.layers.write().await;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
if disk_consistent_lsn != old_disk_consistent_lsn {

View File

@@ -26,7 +26,6 @@ use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use crate::{
context::{DownloadBehavior, RequestContext},
pgdatadir_mapping::CollectKeySpaceError,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
@@ -327,7 +326,8 @@ impl Timeline {
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
_ => {
self.imitate_timeline_cached_layer_accesses(ctx).await;
self.imitate_timeline_cached_layer_accesses(cancel, ctx)
.await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
}
}
@@ -367,12 +367,21 @@ impl Timeline {
/// Recompute the values which would cause on-demand downloads during restart.
#[instrument(skip_all)]
async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
async fn imitate_timeline_cached_layer_accesses(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) {
let lsn = self.get_last_record_lsn();
// imitiate on-restart initial logical size
let size = self
.calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
.calculate_logical_size(
lsn,
LogicalSizeCalculationCause::EvictionTaskImitation,
cancel.clone(),
ctx,
)
.instrument(info_span!("calculate_logical_size"))
.await;
@@ -398,16 +407,9 @@ impl Timeline {
if size.is_err() {
// ignore, see above comment
} else {
match e {
CollectKeySpaceError::Cancelled => {
// Shutting down, ignore
}
err => {
warn!(
"failed to collect keyspace but succeeded in calculating logical size: {err:#}"
);
}
}
warn!(
"failed to collect keyspace but succeeded in calculating logical size: {e:#}"
);
}
}
}

View File

@@ -43,8 +43,8 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use crate::config::PageServerConf;
use crate::metrics::{
WalRedoKillCause, WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_COUNTERS,
WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
WAL_REDO_WAIT_TIME,
};
use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::repository::Key;
@@ -207,8 +207,11 @@ impl PostgresRedoManager {
) -> anyhow::Result<Bytes> {
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let start_time = Instant::now();
let mut n_attempts = 0u32;
loop {
let lock_time = Instant::now();
// launch the WAL redo process on first use
let proc: Arc<WalRedoProcess> = {
let proc_guard = self.redo_process.read().unwrap();
@@ -233,7 +236,7 @@ impl PostgresRedoManager {
}
};
let started_at = std::time::Instant::now();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
@@ -241,7 +244,8 @@ impl PostgresRedoManager {
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
.context("apply_wal_records");
let duration = started_at.elapsed();
let end_time = Instant::now();
let duration = end_time.duration_since(lock_time);
let len = records.len();
let nbytes = records.iter().fold(0, |acumulator, record| {
@@ -663,10 +667,10 @@ impl WalRedoProcess {
.close_fds()
.spawn_no_leak_child(tenant_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
error!("killing wal-redo-postgres process due to a problem during launch");
child.kill_and_wait(WalRedoKillCause::Startup);
child.kill_and_wait();
});
let stdin = child.stdin.take().unwrap();
@@ -997,7 +1001,7 @@ impl Drop for WalRedoProcess {
self.child
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
.kill_and_wait();
self.stderr_logger_cancel.cancel();
// no way to wait for stderr_logger_task from Drop because that is async only
}
@@ -1033,19 +1037,16 @@ impl NoLeakChild {
})
}
fn kill_and_wait(mut self, cause: WalRedoKillCause) {
fn kill_and_wait(mut self) {
let child = match self.child.take() {
Some(child) => child,
None => return,
};
Self::kill_and_wait_impl(child, cause);
Self::kill_and_wait_impl(child);
}
#[instrument(skip_all, fields(pid=child.id(), ?cause))]
fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
scopeguard::defer! {
WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
}
#[instrument(skip_all, fields(pid=child.id()))]
fn kill_and_wait_impl(mut child: Child) {
let res = child.kill();
if let Err(e) = res {
// This branch is very unlikely because:
@@ -1090,7 +1091,7 @@ impl Drop for NoLeakChild {
// This thread here is going to outlive of our dropper.
let span = tracing::info_span!("walredo", %tenant_id);
let _entered = span.enter();
Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
Self::kill_and_wait_impl(child);
})
.await
});

View File

@@ -80,9 +80,6 @@ struct ProxyCliArgs {
/// cache for `wake_compute` api method (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO)]
wake_compute_cache: String,
/// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::WakeComputeLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
wake_compute_lock: String,
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
@@ -223,23 +220,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
node_info: console::caches::NodeInfoCache::new("node_info_cache", size, ttl),
}));
let config::WakeComputeLockOptions {
shards,
permits,
epoch,
timeout,
} = args.wake_compute_lock.parse()?;
info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
let locks = Box::leak(Box::new(
console::locks::ApiLocks::new("wake_compute_lock", permits, shards, timeout)
.unwrap(),
));
tokio::spawn(locks.garbage_collect_worker(epoch));
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let api = console::provider::neon::Api::new(endpoint, caches, locks);
let api = console::provider::neon::Api::new(endpoint, caches);
auth::BackendType::Console(Cow::Owned(api), ())
}
AuthBackend::Postgres => {

View File

@@ -264,79 +264,6 @@ impl FromStr for CacheOptions {
}
}
/// Helper for cmdline cache options parsing.
pub struct WakeComputeLockOptions {
/// The number of shards the lock map should have
pub shards: usize,
/// The number of allowed concurrent requests for each endpoitn
pub permits: usize,
/// Garbage collection epoch
pub epoch: Duration,
/// Lock timeout
pub timeout: Duration,
}
impl WakeComputeLockOptions {
/// Default options for [`crate::console::provider::ApiLocks`].
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
// pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
/// Parse lock options passed via cmdline.
/// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
fn parse(options: &str) -> anyhow::Result<Self> {
let mut shards = None;
let mut permits = None;
let mut epoch = None;
let mut timeout = None;
for option in options.split(',') {
let (key, value) = option
.split_once('=')
.with_context(|| format!("bad key-value pair: {option}"))?;
match key {
"shards" => shards = Some(value.parse()?),
"permits" => permits = Some(value.parse()?),
"epoch" => epoch = Some(humantime::parse_duration(value)?),
"timeout" => timeout = Some(humantime::parse_duration(value)?),
unknown => bail!("unknown key: {unknown}"),
}
}
// these dont matter if lock is disabled
if let Some(0) = permits {
timeout = Some(Duration::default());
epoch = Some(Duration::default());
shards = Some(2);
}
let out = Self {
shards: shards.context("missing `shards`")?,
permits: permits.context("missing `permits`")?,
epoch: epoch.context("missing `epoch`")?,
timeout: timeout.context("missing `timeout`")?,
};
ensure!(out.shards > 1, "shard count must be > 1");
ensure!(
out.shards.is_power_of_two(),
"shard count must be a power of two"
);
Ok(out)
}
}
impl FromStr for WakeComputeLockOptions {
type Err = anyhow::Error;
fn from_str(options: &str) -> Result<Self, Self::Err> {
let error = || format!("failed to parse cache lock options '{options}'");
Self::parse(options).with_context(error)
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -361,42 +288,4 @@ mod tests {
Ok(())
}
#[test]
fn test_parse_lock_options() -> anyhow::Result<()> {
let WakeComputeLockOptions {
epoch,
permits,
shards,
timeout,
} = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
assert_eq!(epoch, Duration::from_secs(10 * 60));
assert_eq!(timeout, Duration::from_secs(1));
assert_eq!(shards, 32);
assert_eq!(permits, 4);
let WakeComputeLockOptions {
epoch,
permits,
shards,
timeout,
} = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
assert_eq!(epoch, Duration::from_secs(60));
assert_eq!(timeout, Duration::from_millis(100));
assert_eq!(shards, 16);
assert_eq!(permits, 8);
let WakeComputeLockOptions {
epoch,
permits,
shards,
timeout,
} = "permits=0".parse()?;
assert_eq!(epoch, Duration::ZERO);
assert_eq!(timeout, Duration::ZERO);
assert_eq!(shards, 2);
assert_eq!(permits, 0);
Ok(())
}
}

View File

@@ -13,10 +13,5 @@ pub mod caches {
pub use super::provider::{ApiCaches, NodeInfoCache};
}
/// Various cache-related types.
pub mod locks {
pub use super::provider::ApiLocks;
}
/// Console's management API.
pub mod mgmt;

View File

@@ -8,13 +8,7 @@ use crate::{
compute, scram,
};
use async_trait::async_trait;
use dashmap::DashMap;
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{OwnedSemaphorePermit, Semaphore},
time::Instant,
};
use tracing::info;
use std::sync::Arc;
pub mod errors {
use crate::{
@@ -155,9 +149,6 @@ pub mod errors {
#[error(transparent)]
ApiError(ApiError),
#[error("Timeout waiting to acquire wake compute lock")]
TimeoutError,
}
// This allows more useful interactions than `#[from]`.
@@ -167,17 +158,6 @@ pub mod errors {
}
}
impl From<tokio::sync::AcquireError> for WakeComputeError {
fn from(_: tokio::sync::AcquireError) -> Self {
WakeComputeError::TimeoutError
}
}
impl From<tokio::time::error::Elapsed> for WakeComputeError {
fn from(_: tokio::time::error::Elapsed) -> Self {
WakeComputeError::TimeoutError
}
}
impl UserFacingError for WakeComputeError {
fn to_string_client(&self) -> String {
use WakeComputeError::*;
@@ -187,8 +167,6 @@ pub mod errors {
BadComputeAddress(_) => REQUEST_FAILED.to_owned(),
// However, API might return a meaningful error.
ApiError(e) => e.to_string_client(),
TimeoutError => "timeout while acquiring the compute resource lock".to_owned(),
}
}
}
@@ -255,145 +233,3 @@ pub struct ApiCaches {
/// Cache for the `wake_compute` API method.
pub node_info: NodeInfoCache,
}
/// Various caches for [`console`](super).
pub struct ApiLocks {
name: &'static str,
node_locks: DashMap<Arc<str>, Arc<Semaphore>>,
permits: usize,
timeout: Duration,
registered: prometheus::IntCounter,
unregistered: prometheus::IntCounter,
reclamation_lag: prometheus::Histogram,
lock_acquire_lag: prometheus::Histogram,
}
impl ApiLocks {
pub fn new(
name: &'static str,
permits: usize,
shards: usize,
timeout: Duration,
) -> prometheus::Result<Self> {
let registered = prometheus::IntCounter::with_opts(
prometheus::Opts::new(
"semaphores_registered",
"Number of semaphores registered in this api lock",
)
.namespace(name),
)?;
prometheus::register(Box::new(registered.clone()))?;
let unregistered = prometheus::IntCounter::with_opts(
prometheus::Opts::new(
"semaphores_unregistered",
"Number of semaphores unregistered in this api lock",
)
.namespace(name),
)?;
prometheus::register(Box::new(unregistered.clone()))?;
let reclamation_lag = prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"reclamation_lag_seconds",
"Time it takes to reclaim unused semaphores in the api lock",
)
.namespace(name)
// 1us -> 65ms
// benchmarks on my mac indicate it's usually in the range of 256us and 512us
.buckets(prometheus::exponential_buckets(1e-6, 2.0, 16)?),
)?;
prometheus::register(Box::new(reclamation_lag.clone()))?;
let lock_acquire_lag = prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"semaphore_acquire_seconds",
"Time it takes to reclaim unused semaphores in the api lock",
)
.namespace(name)
// 0.1ms -> 6s
.buckets(prometheus::exponential_buckets(1e-4, 2.0, 16)?),
)?;
prometheus::register(Box::new(lock_acquire_lag.clone()))?;
Ok(Self {
name,
node_locks: DashMap::with_shard_amount(shards),
permits,
timeout,
lock_acquire_lag,
registered,
unregistered,
reclamation_lag,
})
}
pub async fn get_wake_compute_permit(
&self,
key: &Arc<str>,
) -> Result<WakeComputePermit, errors::WakeComputeError> {
if self.permits == 0 {
return Ok(WakeComputePermit { permit: None });
}
let now = Instant::now();
let semaphore = {
// get fast path
if let Some(semaphore) = self.node_locks.get(key) {
semaphore.clone()
} else {
self.node_locks
.entry(key.clone())
.or_insert_with(|| {
self.registered.inc();
Arc::new(Semaphore::new(self.permits))
})
.clone()
}
};
let permit = tokio::time::timeout_at(now + self.timeout, semaphore.acquire_owned()).await;
self.lock_acquire_lag
.observe((Instant::now() - now).as_secs_f64());
Ok(WakeComputePermit {
permit: Some(permit??),
})
}
pub async fn garbage_collect_worker(&self, epoch: std::time::Duration) {
if self.permits == 0 {
return;
}
let mut interval = tokio::time::interval(epoch / (self.node_locks.shards().len()) as u32);
loop {
for (i, shard) in self.node_locks.shards().iter().enumerate() {
interval.tick().await;
// temporary lock a single shard and then clear any semaphores that aren't currently checked out
// race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked
// therefore releasing it is safe from race conditions
info!(
name = self.name,
shard = i,
"performing epoch reclamation on api lock"
);
let mut lock = shard.write();
let timer = self.reclamation_lag.start_timer();
let count = lock
.extract_if(|_, semaphore| Arc::strong_count(semaphore.get_mut()) == 1)
.count();
drop(lock);
self.unregistered.inc_by(count as u64);
timer.observe_duration()
}
}
}
}
pub struct WakeComputePermit {
// None if the lock is disabled
permit: Option<OwnedSemaphorePermit>,
}
impl WakeComputePermit {
pub fn should_check_cache(&self) -> bool {
self.permit.is_some()
}
}

View File

@@ -3,12 +3,12 @@
use super::{
super::messages::{ConsoleError, GetRoleSecret, WakeCompute},
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, ApiLocks, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
ApiCaches, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::{auth::ClientCredentials, compute, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
use tracing::{error, info, info_span, warn, Instrument};
@@ -17,17 +17,12 @@ use tracing::{error, info, info_span, warn, Instrument};
pub struct Api {
endpoint: http::Endpoint,
caches: &'static ApiCaches,
locks: &'static ApiLocks,
jwt: String,
}
impl Api {
/// Construct an API object containing the auth parameters.
pub fn new(
endpoint: http::Endpoint,
caches: &'static ApiCaches,
locks: &'static ApiLocks,
) -> Self {
pub fn new(endpoint: http::Endpoint, caches: &'static ApiCaches) -> Self {
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
Ok(v) => v,
Err(_) => "".to_string(),
@@ -35,7 +30,6 @@ impl Api {
Self {
endpoint,
caches,
locks,
jwt,
}
}
@@ -169,22 +163,9 @@ impl super::Api for Api {
return Ok(cached);
}
let key: Arc<str> = key.into();
let permit = self.locks.get_wake_compute_permit(&key).await?;
// after getting back a permit - it's possible the cache was filled
// double check
if permit.should_check_cache() {
if let Some(cached) = self.caches.node_info.get(&key) {
info!(key = &*key, "found cached compute node info");
return Ok(cached);
}
}
let node = self.do_wake_compute(extra, creds).await?;
let (_, cached) = self.caches.node_info.insert(key.clone(), node);
info!(key = &*key, "created a cache entry for compute node info");
let (_, cached) = self.caches.node_info.insert(key.into(), node);
info!(key = key, "created a cache entry for compute node info");
Ok(cached)
}

View File

@@ -570,7 +570,6 @@ fn report_error(e: &WakeComputeError, retry: bool) {
"api_console_other_server_error"
}
WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
WakeComputeError::TimeoutError => "timeout_error",
};
NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}

View File

@@ -0,0 +1,69 @@
# Usage from top of repo:
# poetry run python3 ./test_runner/duplicate_tenant.py c66e2e233057f7f05563caff664ecb14 .neon/remote_storage_local_fs
from pathlib import Path
import shutil
import subprocess
import time
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import TenantId
import argparse
parser = argparse.ArgumentParser(description="Duplicate tenant script.")
parser.add_argument("initial_tenant", type=str, help="Initial tenant")
parser.add_argument("remote_storage_local_fs_root", type=Path, help="Remote storage local fs root")
parser.add_argument("--ncopies", type=int, help="Number of copies")
parser.add_argument("--numthreads", type=int, default=1, help="Number of threads")
parser.add_argument("--port", type=int, default=9898, help="Pageserver management api port")
args = parser.parse_args()
initial_tenant = args.initial_tenant
remote_storage_local_fs_root: Path = args.remote_storage_local_fs_root
ncopies = args.ncopies
numthreads = args.numthreads
new_tenant = TenantId.generate()
print(f"New tenant: {new_tenant}")
client = PageserverHttpClient(args.port, lambda: None)
src_tenant_gen = int(client.tenant_status(initial_tenant)["generation"])
assert remote_storage_local_fs_root.is_dir(), f"{remote_storage_local_fs_root} is not a directory"
src_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / initial_tenant / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
dst_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / str(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
for tl in src_timelines_dir.iterdir():
src_tl_dir = src_timelines_dir / tl.name
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
dst_tl_dir = dst_timelines_dir / tl.name
dst_tl_dir.mkdir(parents=False, exist_ok=False)
for file in tl.iterdir():
shutil.copy2(file, dst_tl_dir)
if "__" in file.name:
cmd = [
"./target/debug/pagectl", # TODO: abstract this like the other binaries
"layer",
"rewrite-summary",
str(dst_tl_dir / file.name),
"--new-tenant-id",
str(new_tenant),
]
subprocess.run(cmd, check=True)
client.tenant_attach(new_tenant, generation=src_tenant_gen)
while True:
status = client.tenant_status(new_tenant)
if status["state"]["slug"] == "Active":
break
print("Waiting for tenant to be active..., is: " + status["state"]["slug"])
time.sleep(1)
print("Tenant is active: " + str(new_tenant))

View File

@@ -58,6 +58,7 @@ class HistoricLayerInfo:
lsn_start: str
lsn_end: Optional[str]
remote: bool
remote_path: Optional[str] = None
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
@@ -68,6 +69,7 @@ class HistoricLayerInfo:
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
remote=d["remote"],
remote_path=d.get("remote_path"),
)
@@ -219,6 +221,25 @@ class PageserverHttpClient(requests.Session):
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_duplicate(
self, src_tenant_id: TenantId, new_tenant_id: TenantId, conf: Optional[Dict[str, Any]] = None
) -> TenantId:
if conf is not None:
assert "new_tenant_id" not in conf.keys()
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{src_tenant_id}/duplicate",
json={
"new_tenant_id": str(new_tenant_id),
**(conf or {}),
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(
self,
tenant_id: TenantId,

View File

@@ -26,7 +26,6 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*layer loading failed permanently: load layer: .*",
]
)

View File

@@ -79,31 +79,13 @@ def test_lsn_mapping_old(neon_env_builder: NeonEnvBuilder):
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
tenant_id, _ = env.neon_cli.create_tenant(
conf={
# disable default GC and compaction
"gc_period": "1000 m",
"compaction_period": "0 s",
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
}
)
timeline_id = env.neon_cli.create_branch("test_lsn_mapping", tenant_id=tenant_id)
endpoint_main = env.endpoints.create_start("test_lsn_mapping", tenant_id=tenant_id)
timeline_id = endpoint_main.safe_psql("show neon.timeline_id")[0][0]
log.info("postgres is running on 'main' branch")
new_timeline_id = env.neon_cli.create_branch("test_lsn_mapping")
endpoint_main = env.endpoints.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch")
cur = endpoint_main.connect().cursor()
# Obtain an lsn before all write operations on this branch
start_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_lsn()"))
# Create table, and insert rows, each in a separate transaction
# Disable `synchronous_commit` to make this initialization go faster.
# XXX: on my laptop this test takes 7s, and setting `synchronous_commit=off`
# doesn't change anything.
# Disable synchronous_commit to make this initialization go faster.
#
# Each row contains current insert LSN and the current timestamp, when
# the row was inserted.
@@ -122,63 +104,40 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
cur.execute("INSERT INTO foo VALUES (-1)")
# Wait until WAL is received by pageserver
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint_main, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
with env.pageserver.http_client() as client:
# Check edge cases
# Timestamp is in the future
# Check edge cases: timestamp in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
)
assert result["kind"] in ["present", "future"]
# make sure that we return a well advanced lsn here
assert Lsn(result["lsn"]) > start_lsn
assert result["kind"] == "future"
# Timestamp is in the unreachable past
# timestamp too the far history
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
)
assert result["kind"] == "past"
# make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) < start_lsn
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
)
assert result["kind"] not in ["past", "nodata"]
lsn = result["lsn"]
# Call get_lsn_by_timestamp to get the LSN
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
endpoint_here = env.endpoints.create_start(
branch_name="test_lsn_mapping",
endpoint_id="ep-lsn_mapping_read",
lsn=lsn,
tenant_id=tenant_id,
branch_name="test_lsn_mapping", endpoint_id="ep-lsn_mapping_read", lsn=lsn
)
assert endpoint_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
endpoint_here.stop_and_destroy()
# Do the "past" check again at a new branch to ensure that we don't return something before the branch cutoff
timeline_id_child = env.neon_cli.create_branch(
"test_lsn_mapping_child", tenant_id=tenant_id, ancestor_branch_name="test_lsn_mapping"
)
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z", 2
)
assert result["kind"] == "past"
# make sure that we return the minimum lsn here at the start of the range
assert Lsn(result["lsn"]) >= last_flush_lsn
# Test pageserver get_timestamp_of_lsn API
def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):

View File

@@ -0,0 +1,54 @@
import time
from fixtures.neon_fixtures import (
NeonEnvBuilder,
last_flush_lsn_upload,
)
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.types import TenantId
from fixtures.log_helper import log
def test_tenant_duplicate(
neon_env_builder: NeonEnvBuilder,
):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start()
with env.endpoints.create_start("main", tenant_id=env.initial_tenant) as ep_main:
ep_main.safe_psql("CREATE TABLE foo (i int);")
ep_main.safe_psql("INSERT INTO foo VALUES (1), (2), (3);")
last_flush_lsn = last_flush_lsn_upload(
env, ep_main, env.initial_tenant, env.initial_timeline
)
new_tenant_id = TenantId.generate()
# timeline id remains unchanged with tenant_duplicate
# TODO: implement a remapping scheme so timeline ids remain globally unique
new_timeline_id = env.initial_timeline
log.info(f"Duplicate tenant/timeline will be: {new_tenant_id}/{new_timeline_id}")
ps_http = env.pageserver.http_client()
ps_http.tenant_duplicate(env.initial_tenant, new_tenant_id)
ps_http.tenant_delete(env.initial_tenant)
env.neon_cli.map_branch("duplicate", new_tenant_id, new_timeline_id)
# start read-only replicate and validate
with env.endpoints.create_start(
"duplicate", tenant_id=new_tenant_id, lsn=last_flush_lsn
) as ep_dup:
with ep_dup.connect() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM foo ORDER BY i;")
cur.fetchall() == [(1,), (2,), (3,)]
# ensure restarting PS works
env.pageserver.stop()
env.pageserver.start()

View File

@@ -25,7 +25,6 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
dashmap = { version = "5", default-features = false, features = ["raw-api"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures = { version = "0.3" }
@@ -39,7 +38,7 @@ hex = { version = "0.4", features = ["serde"] }
hyper = { version = "0.14", features = ["full"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }
memchr = { version = "2" }
nom = { version = "7" }
num-bigint = { version = "0.4" }
@@ -56,6 +55,7 @@ scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["raw_value"] }
smallvec = { version = "1", default-features = false, features = ["write"] }
standback = { version = "0.2", default-features = false, features = ["std"] }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
@@ -76,13 +76,14 @@ cc = { version = "1", default-features = false, features = ["parallel"] }
either = { version = "1" }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }
memchr = { version = "2" }
nom = { version = "7" }
prost = { version = "0.11" }
regex = { version = "1" }
regex-syntax = { version = "0.7" }
serde = { version = "1", features = ["alloc", "derive"] }
standback = { version = "0.2", default-features = false, features = ["std"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] }
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }