mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 20:50:37 +00:00
Compare commits
28 Commits
feat/ci_wo
...
problame/m
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
058b35f884 | ||
|
|
ff0aa152f1 | ||
|
|
3375f28990 | ||
|
|
e82deb2ccc | ||
|
|
42bda5d632 | ||
|
|
59c2c3f8ad | ||
|
|
2d6bf176a0 | ||
|
|
313ebfdb88 | ||
|
|
811fab136f | ||
|
|
ee26f09e45 | ||
|
|
f36f0068b8 | ||
|
|
5ff2f1ee7d | ||
|
|
67f5f83edc | ||
|
|
593e35027a | ||
|
|
af95320a8c | ||
|
|
b299eb19e2 | ||
|
|
88d52b31b7 | ||
|
|
aa695b2ad7 | ||
|
|
b695907752 | ||
|
|
75041cb61b | ||
|
|
e80ce970f7 | ||
|
|
f2de5b504f | ||
|
|
b9746168ff | ||
|
|
5cc0059088 | ||
|
|
911946a3cd | ||
|
|
61ff84a3a2 | ||
|
|
15e21c714b | ||
|
|
0689965282 |
82
.github/workflows/build-macos.yml
vendored
82
.github/workflows/build-macos.yml
vendored
@@ -1,82 +0,0 @@
|
||||
name: Check neon with MacOS builds
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
COPT: '-Werror'
|
||||
|
||||
# TODO: move `check-*` and `files-changed` jobs to the "Caller" Workflow
|
||||
# We should care about that as Github has limitations:
|
||||
# - You can connect up to four levels of workflows
|
||||
# - You can call a maximum of 20 unique reusable workflows from a single workflow file.
|
||||
# https://docs.github.com/en/actions/sharing-automations/reusing-workflows#limitations
|
||||
jobs:
|
||||
files-changed:
|
||||
name: Detect what files changed
|
||||
runs-on: ubuntu-22.04
|
||||
timeout-minutes: 3
|
||||
outputs:
|
||||
postgres_changes: ${{ steps.postgres_changes.outputs.changes }}
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@6ccd57f4c5d15bdc2fef309bd9fb6cc9db2ef1c6 # v4.1.7
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- name: Check for Postgres changes
|
||||
uses: dorny/paths-filter@1441771bbfdd59dcd748680ee64ebd8faab1a242 #v3
|
||||
id: postgres_changes
|
||||
with:
|
||||
token: ${{ github.token }}
|
||||
filters: |
|
||||
v14: ['vendor/postgres-v14/**', 'Makefile', 'pgxn/**']
|
||||
v15: ['vendor/postgres-v15/**', 'Makefile', 'pgxn/**']
|
||||
v16: ['vendor/postgres-v16/**', 'Makefile', 'pgxn/**']
|
||||
v17: ['vendor/postgres-v17/**', 'Makefile', 'pgxn/**']
|
||||
base: ${{ github.event_name != 'pull_request' && (github.event.merge_group.base_ref || github.ref_name) || '' }}
|
||||
ref: ${{ github.event_name != 'pull_request' && (github.event.merge_group.head_ref || github.ref) || ''}}
|
||||
|
||||
check-macos-build:
|
||||
needs: [ files-changed ]
|
||||
if: |
|
||||
needs.files-changed.outputs.postgres_changes != '[]' && (
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
)
|
||||
timeout-minutes: 30
|
||||
runs-on: ubuntu-22.04
|
||||
strategy:
|
||||
matrix:
|
||||
postgres-version: ${{ fromJSON(needs.files-changed.outputs.postgres_changes) }}
|
||||
env:
|
||||
# Use release build only, to have less debug info around
|
||||
# Hence keeping target/ (and general cache size) smaller
|
||||
BUILD_TYPE: release
|
||||
steps:
|
||||
- name: Checkout main repo
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Checkout submodule vendor/postgres-${{ matrix.postgres-version }}
|
||||
run: |
|
||||
git submodule init vendor/postgres-${{ matrix.postgres-version }}
|
||||
git submodule update --depth 1
|
||||
|
||||
- name: Install build dependencies
|
||||
run: |
|
||||
echo brew install flex bison openssl protobuf icu4c
|
||||
|
||||
- name: Build Postgres ${{ matrix.postgres-version }}
|
||||
run: |
|
||||
echo make postgres-${{ matrix.postgres-version }}
|
||||
|
||||
- name: Build Neon Pg Ext ${{ matrix.postgres-version }}
|
||||
run: |
|
||||
echo make "neon-pg-ext-${{ matrix.postgres-version }}"
|
||||
|
||||
- name: Build walproposer-lib (only for v17)
|
||||
if: matrix.postgres-version == 'v17'
|
||||
run:
|
||||
echo make walproposer-lib
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -6,7 +6,7 @@ on:
|
||||
- main
|
||||
- release
|
||||
- release-proxy
|
||||
# pull_request:
|
||||
pull_request:
|
||||
|
||||
defaults:
|
||||
run:
|
||||
|
||||
174
.github/workflows/neon_extra_builds.yml
vendored
174
.github/workflows/neon_extra_builds.yml
vendored
@@ -31,10 +31,180 @@ jobs:
|
||||
uses: ./.github/workflows/build-build-tools-image.yml
|
||||
secrets: inherit
|
||||
|
||||
run-macos-build:
|
||||
check-macos-build:
|
||||
needs: [ check-permissions ]
|
||||
if: |
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-macos') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
uses: ./.github/workflows/build-macos.yml
|
||||
timeout-minutes: 90
|
||||
runs-on: macos-15
|
||||
|
||||
env:
|
||||
# Use release build only, to have less debug info around
|
||||
# Hence keeping target/ (and general cache size) smaller
|
||||
BUILD_TYPE: release
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- name: Install macOS postgres dependencies
|
||||
run: brew install flex bison openssl protobuf icu4c
|
||||
|
||||
- name: Set pg 14 revision for caching
|
||||
id: pg_v14_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v14) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set pg 15 revision for caching
|
||||
id: pg_v15_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v15) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set pg 16 revision for caching
|
||||
id: pg_v16_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v16) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Set pg 17 revision for caching
|
||||
id: pg_v17_rev
|
||||
run: echo pg_rev=$(git rev-parse HEAD:vendor/postgres-v17) >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Cache postgres v14 build
|
||||
id: cache_pg_14
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v14_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_15
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v15_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_16
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v16_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache postgres v17 build
|
||||
id: cache_pg_17
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: pg_install/v17
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Set extra env for macOS
|
||||
run: |
|
||||
echo 'LDFLAGS=-L/usr/local/opt/openssl@3/lib' >> $GITHUB_ENV
|
||||
echo 'CPPFLAGS=-I/usr/local/opt/openssl@3/include' >> $GITHUB_ENV
|
||||
|
||||
- name: Cache cargo deps
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git
|
||||
target
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-cargo-${{ hashFiles('./Cargo.lock') }}-${{ hashFiles('./rust-toolchain.toml') }}-rust
|
||||
|
||||
- name: Build postgres v14
|
||||
if: steps.cache_pg_14.outputs.cache-hit != 'true'
|
||||
run: make postgres-v14 -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build postgres v15
|
||||
if: steps.cache_pg_15.outputs.cache-hit != 'true'
|
||||
run: make postgres-v15 -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build postgres v16
|
||||
if: steps.cache_pg_16.outputs.cache-hit != 'true'
|
||||
run: make postgres-v16 -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build postgres v17
|
||||
if: steps.cache_pg_17.outputs.cache-hit != 'true'
|
||||
run: make postgres-v17 -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build neon extensions
|
||||
run: make neon-pg-ext -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Build walproposer-lib
|
||||
run: make walproposer-lib -j$(sysctl -n hw.ncpu)
|
||||
|
||||
- name: Run cargo build
|
||||
run: PQ_LIB_DIR=$(pwd)/pg_install/v16/lib cargo build --all --release
|
||||
|
||||
- name: Check that no warnings are produced
|
||||
run: ./run_clippy.sh
|
||||
|
||||
gather-rust-build-stats:
|
||||
needs: [ check-permissions, build-build-tools-image ]
|
||||
if: |
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-stats') ||
|
||||
contains(github.event.pull_request.labels.*.name, 'run-extra-build-*') ||
|
||||
github.ref_name == 'main'
|
||||
runs-on: [ self-hosted, large ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
credentials:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
options: --init
|
||||
|
||||
env:
|
||||
BUILD_TYPE: release
|
||||
# build with incremental compilation produce partial results
|
||||
# so do not attempt to cache this build, also disable the incremental compilation
|
||||
CARGO_INCREMENTAL: 0
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
# Some of our rust modules use FFI and need those to be checked
|
||||
- name: Get postgres headers
|
||||
run: make postgres-headers -j$(nproc)
|
||||
|
||||
- name: Build walproposer-lib
|
||||
run: make walproposer-lib -j$(nproc)
|
||||
|
||||
- name: Produce the build stats
|
||||
run: PQ_LIB_DIR=$(pwd)/pg_install/v17/lib cargo build --all --release --timings -j$(nproc)
|
||||
|
||||
- name: Upload the build stats
|
||||
id: upload-stats
|
||||
env:
|
||||
BUCKET: neon-github-public-dev
|
||||
SHA: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
run: |
|
||||
REPORT_URL=https://${BUCKET}.s3.amazonaws.com/build-stats/${SHA}/${GITHUB_RUN_ID}/cargo-timing.html
|
||||
aws s3 cp --only-show-errors ./target/cargo-timings/cargo-timing.html "s3://${BUCKET}/build-stats/${SHA}/${GITHUB_RUN_ID}/"
|
||||
echo "report-url=${REPORT_URL}" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Publish build stats report
|
||||
uses: actions/github-script@v7
|
||||
env:
|
||||
REPORT_URL: ${{ steps.upload-stats.outputs.report-url }}
|
||||
SHA: ${{ github.event.pull_request.head.sha || github.sha }}
|
||||
with:
|
||||
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
|
||||
retries: 5
|
||||
script: |
|
||||
const { REPORT_URL, SHA } = process.env
|
||||
|
||||
await github.rest.repos.createCommitStatus({
|
||||
owner: context.repo.owner,
|
||||
repo: context.repo.repo,
|
||||
sha: `${SHA}`,
|
||||
state: 'success',
|
||||
target_url: `${REPORT_URL}`,
|
||||
context: `Build stats (release)`,
|
||||
})
|
||||
|
||||
@@ -105,6 +105,11 @@ fn main() -> Result<()> {
|
||||
fn init() -> Result<(String, clap::ArgMatches)> {
|
||||
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
|
||||
|
||||
opentelemetry::global::set_error_handler(|err| {
|
||||
tracing::info!("OpenTelemetry error: {err}");
|
||||
})
|
||||
.expect("global error handler lock poisoned");
|
||||
|
||||
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
|
||||
thread::spawn(move || {
|
||||
for sig in signals.forever() {
|
||||
|
||||
@@ -2,14 +2,28 @@
|
||||
|
||||
// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use prometheus::Gauge;
|
||||
|
||||
use crate::UIntGauge;
|
||||
|
||||
pub struct Collector {
|
||||
descs: Vec<prometheus::core::Desc>,
|
||||
vmlck: crate::UIntGauge,
|
||||
cpu_seconds_highres: Gauge,
|
||||
}
|
||||
|
||||
const NMETRICS: usize = 1;
|
||||
const NMETRICS: usize = 2;
|
||||
|
||||
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
|
||||
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
|
||||
if long == -1 {
|
||||
panic!("sysconf(_SC_CLK_TCK) failed");
|
||||
}
|
||||
let convertible_to_f64: i32 =
|
||||
i32::try_from(long).expect("sysconf(_SC_CLK_TCK) is larger than i32");
|
||||
convertible_to_f64 as f64
|
||||
});
|
||||
|
||||
impl prometheus::core::Collector for Collector {
|
||||
fn desc(&self) -> Vec<&prometheus::core::Desc> {
|
||||
@@ -27,6 +41,12 @@ impl prometheus::core::Collector for Collector {
|
||||
mfs.extend(self.vmlck.collect())
|
||||
}
|
||||
}
|
||||
if let Ok(stat) = myself.stat() {
|
||||
let cpu_seconds = stat.utime + stat.stime;
|
||||
self.cpu_seconds_highres
|
||||
.set(cpu_seconds as f64 / *CLK_TCK_F64);
|
||||
mfs.extend(self.cpu_seconds_highres.collect());
|
||||
}
|
||||
mfs
|
||||
}
|
||||
}
|
||||
@@ -43,7 +63,23 @@ impl Collector {
|
||||
.cloned(),
|
||||
);
|
||||
|
||||
Self { descs, vmlck }
|
||||
let cpu_seconds_highres = Gauge::new(
|
||||
"libmetrics_process_cpu_seconds_highres",
|
||||
"Total user and system CPU time spent in seconds.\
|
||||
Sub-second resolution, hence better than `process_cpu_seconds_total`.",
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(
|
||||
prometheus::core::Collector::desc(&cpu_seconds_highres)
|
||||
.into_iter()
|
||||
.cloned(),
|
||||
);
|
||||
|
||||
Self {
|
||||
descs,
|
||||
vmlck,
|
||||
cpu_seconds_highres,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -184,6 +184,7 @@ pub struct CancelKeyData {
|
||||
|
||||
impl fmt::Display for CancelKeyData {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
// TODO: this is producing strange results, with 0xffffffff........ always in the logs.
|
||||
let hi = (self.backend_pid as u64) << 32;
|
||||
let lo = self.cancel_key as u64;
|
||||
let id = hi | lo;
|
||||
|
||||
@@ -97,10 +97,7 @@ impl AzureBlobStorage {
|
||||
|
||||
pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
|
||||
assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
let path_string = path
|
||||
.get_path()
|
||||
.as_str()
|
||||
.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
let path_string = path.get_path().as_str();
|
||||
match &self.prefix_in_container {
|
||||
Some(prefix) => {
|
||||
if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
@@ -277,19 +274,14 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
cancel: &CancellationToken,
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> {
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let list_prefix = prefix
|
||||
.map(|p| self.relative_path_to_name(p))
|
||||
.or_else(|| self.prefix_in_container.clone())
|
||||
.map(|mut p| {
|
||||
// required to end with a separator
|
||||
// otherwise request will return only the entry of a prefix
|
||||
if matches!(mode, ListingMode::WithDelimiter)
|
||||
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
|
||||
{
|
||||
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
|
||||
self.prefix_in_container.clone().map(|mut s| {
|
||||
if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
}
|
||||
p
|
||||
});
|
||||
s
|
||||
})
|
||||
});
|
||||
|
||||
async_stream::stream! {
|
||||
let _permit = self.permit(RequestKind::List, cancel).await?;
|
||||
|
||||
@@ -314,13 +314,15 @@ struct PageServerHandler {
|
||||
|
||||
timeline_handles: TimelineHandles,
|
||||
|
||||
/// Messages queued up for the next processing batch
|
||||
next_batch: Option<BatchedFeMessage>,
|
||||
|
||||
/// See [`PageServerConf::server_side_batch_timeout`]
|
||||
server_side_batch_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
struct Carry {
|
||||
msg: BatchedFeMessage,
|
||||
started_at: Instant,
|
||||
}
|
||||
|
||||
struct TimelineHandles {
|
||||
wrapper: TenantManagerWrapper,
|
||||
/// Note on size: the typical size of this map is 1. The largest size we expect
|
||||
@@ -582,7 +584,6 @@ impl PageServerHandler {
|
||||
connection_ctx,
|
||||
timeline_handles: TimelineHandles::new(tenant_manager),
|
||||
cancel,
|
||||
next_batch: None,
|
||||
server_side_batch_timeout,
|
||||
}
|
||||
}
|
||||
@@ -611,44 +612,87 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = tracing::Level::TRACE)]
|
||||
async fn read_batch_from_connection<IO>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
maybe_carry: &mut Option<Carry>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<BatchOrEof>, QueryError>
|
||||
) -> Result<BatchOrEof, QueryError>
|
||||
where
|
||||
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
let mut batch = self.next_batch.take();
|
||||
let mut batch_started_at: Option<std::time::Instant> = None;
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
|
||||
let next_batch: Option<BatchedFeMessage> = loop {
|
||||
let sleep_fut = match (self.server_side_batch_timeout, batch_started_at) {
|
||||
(Some(batch_timeout), Some(started_at)) => futures::future::Either::Left(
|
||||
tokio::time::sleep_until((started_at + batch_timeout).into()),
|
||||
),
|
||||
_ => futures::future::Either::Right(futures::future::pending()),
|
||||
let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
|
||||
|
||||
loop {
|
||||
// Create a future that will become ready when we need to stop batching.
|
||||
use futures::future::Either;
|
||||
let batching_deadline = match (
|
||||
&*maybe_carry as &Option<Carry>,
|
||||
&mut batching_deadline_storage,
|
||||
) {
|
||||
(None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
|
||||
(None, Some(_)) => unreachable!(),
|
||||
(Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
|
||||
(Some(carry), None) => {
|
||||
match self.server_side_batch_timeout {
|
||||
None => {
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![
|
||||
maybe_carry
|
||||
.take()
|
||||
.expect("we already checked it's Some")
|
||||
.msg
|
||||
]))
|
||||
}
|
||||
Some(batch_timeout) => {
|
||||
// Take into consideration the time the carry spent waiting.
|
||||
let batch_timeout =
|
||||
batch_timeout.saturating_sub(carry.started_at.elapsed());
|
||||
if batch_timeout.is_zero() {
|
||||
// the timer doesn't support restarting with zero duration
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![
|
||||
maybe_carry
|
||||
.take()
|
||||
.expect("we already checked it's Some")
|
||||
.msg
|
||||
]));
|
||||
} else {
|
||||
batching_deadline_storage = Some(Box::pin(async move {
|
||||
tokio::time::sleep(batch_timeout).await;
|
||||
}));
|
||||
Either::Right(
|
||||
batching_deadline_storage.as_mut().expect("we just set it"),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
_ = self.cancel.cancelled() => {
|
||||
return Err(QueryError::Shutdown)
|
||||
}
|
||||
msg = pgb.read_message() => {
|
||||
msg
|
||||
}
|
||||
_ = sleep_fut => {
|
||||
assert!(batch.is_some());
|
||||
break None;
|
||||
_ = batching_deadline => {
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![maybe_carry.take().expect("per construction of batching_deadline").msg]));
|
||||
}
|
||||
msg = pgb.read_message() => { msg }
|
||||
};
|
||||
|
||||
let msg_start = Instant::now();
|
||||
|
||||
// Rest of this loop body is trying to batch `msg` into `batch`.
|
||||
// If we can add msg to batch we continue into the next loop iteration.
|
||||
// If we can't add msg to batch batch, we carry `msg` over to the next call.
|
||||
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(FeMessage::Terminate) => {
|
||||
return Ok(Some(BatchOrEof::Eof));
|
||||
return Ok(BatchOrEof::Eof);
|
||||
}
|
||||
Some(m) => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
@@ -656,10 +700,11 @@ impl PageServerHandler {
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
return Ok(Some(BatchOrEof::Eof));
|
||||
return Ok(BatchOrEof::Eof);
|
||||
} // client disconnected
|
||||
};
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
|
||||
fail::fail_point!("ps::handle-pagerequest-message");
|
||||
|
||||
// parse request
|
||||
@@ -701,11 +746,11 @@ impl PageServerHandler {
|
||||
span,
|
||||
error: $error,
|
||||
};
|
||||
let batch_and_error = match batch {
|
||||
Some(b) => smallvec::smallvec![b, error],
|
||||
let batch_and_error = match maybe_carry.take() {
|
||||
Some(carry) => smallvec::smallvec![carry.msg, error],
|
||||
None => smallvec::smallvec![error],
|
||||
};
|
||||
Ok(Some(BatchOrEof::Batch(batch_and_error)))
|
||||
Ok(BatchOrEof::Batch(batch_and_error))
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -758,26 +803,20 @@ impl PageServerHandler {
|
||||
}
|
||||
};
|
||||
|
||||
let batch_timeout = match self.server_side_batch_timeout {
|
||||
Some(value) => value,
|
||||
None => {
|
||||
// Batching is not enabled - stop on the first message.
|
||||
return Ok(Some(BatchOrEof::Batch(smallvec::smallvec![this_msg])));
|
||||
}
|
||||
};
|
||||
|
||||
// check if we can batch
|
||||
match (&mut batch, this_msg) {
|
||||
//
|
||||
// batch
|
||||
//
|
||||
match (maybe_carry.as_mut(), this_msg) {
|
||||
(None, this_msg) => {
|
||||
batch = Some(this_msg);
|
||||
*maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start });
|
||||
}
|
||||
(
|
||||
Some(BatchedFeMessage::GetPage {
|
||||
Some(Carry { msg: BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: accum_shard,
|
||||
pages: accum_pages,
|
||||
pages: ref mut accum_pages,
|
||||
effective_request_lsn: accum_lsn,
|
||||
}),
|
||||
}, started_at: _}),
|
||||
BatchedFeMessage::GetPage {
|
||||
span: _,
|
||||
shard: this_shard,
|
||||
@@ -787,12 +826,14 @@ impl PageServerHandler {
|
||||
) if async {
|
||||
assert_eq!(this_pages.len(), 1);
|
||||
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
|
||||
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
|
||||
return false;
|
||||
}
|
||||
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
|
||||
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
|
||||
{
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
|
||||
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
|
||||
// But the current logic for keeping responses in order does not support that.
|
||||
return false;
|
||||
@@ -800,6 +841,7 @@ impl PageServerHandler {
|
||||
// the vectored get currently only supports a single LSN, so, bounce as soon
|
||||
// as the effective request_lsn changes
|
||||
if *accum_lsn != this_lsn {
|
||||
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
|
||||
return false;
|
||||
}
|
||||
true
|
||||
@@ -809,21 +851,17 @@ impl PageServerHandler {
|
||||
// ok to batch
|
||||
accum_pages.extend(this_pages);
|
||||
}
|
||||
(Some(_), this_msg) => {
|
||||
(Some(carry), this_msg) => {
|
||||
// by default, don't continue batching
|
||||
break Some(this_msg);
|
||||
let carry = std::mem::replace(carry,
|
||||
Carry {
|
||||
msg: this_msg,
|
||||
started_at: msg_start,
|
||||
});
|
||||
return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg]));
|
||||
}
|
||||
}
|
||||
|
||||
// batching impl piece
|
||||
let started_at = batch_started_at.get_or_insert_with(Instant::now);
|
||||
if started_at.elapsed() > batch_timeout {
|
||||
break None;
|
||||
}
|
||||
};
|
||||
|
||||
self.next_batch = next_batch;
|
||||
Ok(batch.map(|b| BatchOrEof::Batch(smallvec::smallvec![b])))
|
||||
}
|
||||
}
|
||||
|
||||
/// Pagestream sub-protocol handler.
|
||||
@@ -861,22 +899,17 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
// If [`PageServerHandler`] is reused for multiple pagestreams,
|
||||
// then make sure to not process requests from the previous ones.
|
||||
self.next_batch = None;
|
||||
let mut carry: Option<Carry> = None;
|
||||
|
||||
loop {
|
||||
let maybe_batched = self
|
||||
.read_batch_from_connection(pgb, &tenant_id, &timeline_id, &ctx)
|
||||
.read_batch_from_connection(pgb, &tenant_id, &timeline_id, &mut carry, &ctx)
|
||||
.await?;
|
||||
let batched = match maybe_batched {
|
||||
Some(BatchOrEof::Batch(b)) => b,
|
||||
Some(BatchOrEof::Eof) => {
|
||||
BatchOrEof::Batch(b) => b,
|
||||
BatchOrEof::Eof => {
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for batch in batched {
|
||||
@@ -922,6 +955,7 @@ impl PageServerHandler {
|
||||
(
|
||||
{
|
||||
let npages = pages.len();
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_get_page_at_lsn_request_batched(
|
||||
&shard,
|
||||
|
||||
@@ -189,6 +189,7 @@ pub struct TenantSharedResources {
|
||||
/// A [`Tenant`] is really an _attached_ tenant. The configuration
|
||||
/// for an attached tenant is a subset of the [`LocationConf`], represented
|
||||
/// in this struct.
|
||||
#[derive(Clone)]
|
||||
pub(super) struct AttachedTenantConf {
|
||||
tenant_conf: TenantConfOpt,
|
||||
location: AttachedLocationConfig,
|
||||
@@ -1807,6 +1808,7 @@ impl Tenant {
|
||||
self.tenant_shard_id,
|
||||
timeline_id,
|
||||
self.generation,
|
||||
&self.tenant_conf.load().location,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -2527,6 +2529,10 @@ impl Tenant {
|
||||
{
|
||||
let conf = self.tenant_conf.load();
|
||||
|
||||
// If we may not delete layers, then simply skip GC. Even though a tenant
|
||||
// in AttachedMulti state could do GC and just enqueue the blocked deletions,
|
||||
// the only advantage to doing it is to perhaps shrink the LayerMap metadata
|
||||
// a bit sooner than we would achieve by waiting for AttachedSingle status.
|
||||
if !conf.location.may_delete_layers_hint() {
|
||||
info!("Skipping GC in location state {:?}", conf.location);
|
||||
return Ok(GcResult::default());
|
||||
@@ -2568,7 +2574,14 @@ impl Tenant {
|
||||
|
||||
{
|
||||
let conf = self.tenant_conf.load();
|
||||
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
|
||||
|
||||
// Note that compaction usually requires deletions, but we don't respect
|
||||
// may_delete_layers_hint here: that is because tenants in AttachedMulti
|
||||
// should proceed with compaction even if they can't do deletion, to avoid
|
||||
// accumulating dangerously deep stacks of L0 layers. Deletions will be
|
||||
// enqueued inside RemoteTimelineClient, and executed layer if/when we transition
|
||||
// to AttachedSingle state.
|
||||
if !conf.location.may_upload_layers_hint() {
|
||||
info!("Skipping compaction in location state {:?}", conf.location);
|
||||
return Ok(false);
|
||||
}
|
||||
@@ -3446,6 +3459,7 @@ impl Tenant {
|
||||
// this race is not possible if both request types come from the storage
|
||||
// controller (as they should!) because an exclusive op lock is required
|
||||
// on the storage controller side.
|
||||
|
||||
self.tenant_conf.rcu(|inner| {
|
||||
Arc::new(AttachedTenantConf {
|
||||
tenant_conf: new_tenant_conf.clone(),
|
||||
@@ -3455,20 +3469,22 @@ impl Tenant {
|
||||
})
|
||||
});
|
||||
|
||||
let updated = self.tenant_conf.load().clone();
|
||||
|
||||
self.tenant_conf_updated(&new_tenant_conf);
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
// There's no risk of deadlock right now, but there could be if we consolidate
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated(&new_tenant_conf);
|
||||
timeline.tenant_conf_updated(&updated);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
|
||||
let new_tenant_conf = new_conf.tenant_conf.clone();
|
||||
|
||||
self.tenant_conf.store(Arc::new(new_conf));
|
||||
self.tenant_conf.store(Arc::new(new_conf.clone()));
|
||||
|
||||
self.tenant_conf_updated(&new_tenant_conf);
|
||||
// Don't hold self.timelines.lock() during the notifies.
|
||||
@@ -3476,7 +3492,7 @@ impl Tenant {
|
||||
// mutexes in struct Timeline in the future.
|
||||
let timelines = self.list_timelines();
|
||||
for timeline in timelines {
|
||||
timeline.tenant_conf_updated(&new_tenant_conf);
|
||||
timeline.tenant_conf_updated(&new_conf);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4544,6 +4560,7 @@ impl Tenant {
|
||||
self.tenant_shard_id,
|
||||
timeline_id,
|
||||
self.generation,
|
||||
&self.tenant_conf.load().location,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -1719,10 +1719,11 @@ impl TenantManager {
|
||||
parent_layers.push(relative_path.to_owned());
|
||||
}
|
||||
}
|
||||
debug_assert!(
|
||||
!parent_layers.is_empty(),
|
||||
"shutdown cannot empty the layermap"
|
||||
);
|
||||
|
||||
if parent_layers.is_empty() {
|
||||
tracing::info!("Ancestor shard has no resident layer to hard link");
|
||||
}
|
||||
|
||||
(parent_timelines, parent_layers)
|
||||
};
|
||||
|
||||
|
||||
@@ -197,6 +197,7 @@ use utils::backoff::{
|
||||
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::pausable_failpoint;
|
||||
use utils::shard::ShardNumber;
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
@@ -240,6 +241,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use self::index::IndexPart;
|
||||
|
||||
use super::config::AttachedLocationConfig;
|
||||
use super::metadata::MetadataUpdate;
|
||||
use super::storage_layer::{Layer, LayerName, ResidentLayer};
|
||||
use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
|
||||
@@ -301,6 +303,36 @@ pub enum WaitCompletionError {
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
|
||||
pub struct UploadQueueNotReadyError;
|
||||
/// Behavioral modes that enable seamless live migration.
|
||||
///
|
||||
/// See docs/rfcs/028-pageserver-migration.md to understand how these fit in.
|
||||
struct RemoteTimelineClientConfig {
|
||||
/// If this is false, then update to remote_consistent_lsn are dropped rather
|
||||
/// than being submitted to DeletionQueue for validation. This behavior is
|
||||
/// used when a tenant attachment is known to have a stale generation number,
|
||||
/// such that validation attempts will always fail. This is not necessary
|
||||
/// for correctness, but avoids spamming error statistics with failed validations
|
||||
/// when doing migrations of tenants.
|
||||
process_remote_consistent_lsn_updates: bool,
|
||||
|
||||
/// If this is true, then object deletions are held in a buffer in RemoteTimelineClient
|
||||
/// rather than being submitted to the DeletionQueue. This behavior is used when a tenant
|
||||
/// is known to be multi-attached, in order to avoid disrupting other attached tenants
|
||||
/// whose generations' metadata refers to the deleted objects.
|
||||
block_deletions: bool,
|
||||
}
|
||||
|
||||
/// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do
|
||||
/// not carry the entire LocationConf structure: it's much more than we need. The From
|
||||
/// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient.
|
||||
impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig {
|
||||
fn from(lc: &AttachedLocationConfig) -> Self {
|
||||
Self {
|
||||
block_deletions: !lc.may_delete_layers_hint(),
|
||||
process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A client for accessing a timeline's data in remote storage.
|
||||
///
|
||||
@@ -321,7 +353,7 @@ pub struct UploadQueueNotReadyError;
|
||||
/// in the index part file, whenever timeline metadata is uploaded.
|
||||
///
|
||||
/// Downloads are not queued, they are performed immediately.
|
||||
pub struct RemoteTimelineClient {
|
||||
pub(crate) struct RemoteTimelineClient {
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
runtime: tokio::runtime::Handle,
|
||||
@@ -338,6 +370,9 @@ pub struct RemoteTimelineClient {
|
||||
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
|
||||
/// Subset of tenant configuration used to control upload behaviors during migrations
|
||||
config: std::sync::RwLock<RemoteTimelineClientConfig>,
|
||||
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
@@ -348,13 +383,14 @@ impl RemoteTimelineClient {
|
||||
/// Note: the caller must initialize the upload queue before any uploads can be scheduled,
|
||||
/// by calling init_upload_queue.
|
||||
///
|
||||
pub fn new(
|
||||
pub(crate) fn new(
|
||||
remote_storage: GenericRemoteStorage,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
conf: &'static PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
generation: Generation,
|
||||
location_conf: &AttachedLocationConfig,
|
||||
) -> RemoteTimelineClient {
|
||||
RemoteTimelineClient {
|
||||
conf,
|
||||
@@ -374,6 +410,7 @@ impl RemoteTimelineClient {
|
||||
&tenant_shard_id,
|
||||
&timeline_id,
|
||||
)),
|
||||
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
|
||||
cancel: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
@@ -429,6 +466,43 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Notify this client of a change to its parent tenant's config, as this may cause us to
|
||||
/// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle)
|
||||
pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
|
||||
let new_conf = RemoteTimelineClientConfig::from(location_conf);
|
||||
let unblocked = !new_conf.block_deletions;
|
||||
|
||||
// Update config before draining deletions, so that we don't race with more being
|
||||
// inserted. This can result in deletions happening our of order, but that does not
|
||||
// violate any invariants: deletions only need to be ordered relative to upload of the index
|
||||
// that dereferences the deleted objects, and we are not changing that order.
|
||||
*self.config.write().unwrap() = new_conf;
|
||||
|
||||
if unblocked {
|
||||
// If we may now delete layers, drain any that were blocked in our old
|
||||
// configuration state
|
||||
let mut queue_locked = self.upload_queue.lock().unwrap();
|
||||
|
||||
if let Ok(queue) = queue_locked.initialized_mut() {
|
||||
let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
|
||||
for d in blocked_deletions {
|
||||
if let Err(e) = self.deletion_queue_client.push_layers_sync(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
d.layers,
|
||||
) {
|
||||
// This could happen if the pageserver is shut down while a tenant
|
||||
// is transitioning from a deletion-blocked state: we will leak some
|
||||
// S3 objects in this case.
|
||||
warn!("Failed to drain blocked deletions: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `None` if nothing is yet uplodaded, `Some(disk_consistent_lsn)` otherwise.
|
||||
pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
|
||||
match &mut *self.upload_queue.lock().unwrap() {
|
||||
@@ -1912,16 +1986,24 @@ impl RemoteTimelineClient {
|
||||
res
|
||||
}
|
||||
UploadOp::Delete(delete) => {
|
||||
pausable_failpoint!("before-delete-layer-pausable");
|
||||
self.deletion_queue_client
|
||||
.push_layers(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
delete.layers.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
if self.config.read().unwrap().block_deletions {
|
||||
let mut queue_locked = self.upload_queue.lock().unwrap();
|
||||
if let Ok(queue) = queue_locked.initialized_mut() {
|
||||
queue.blocked_deletions.push(delete.clone());
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
pausable_failpoint!("before-delete-layer-pausable");
|
||||
self.deletion_queue_client
|
||||
.push_layers(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
delete.layers.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
|
||||
// unreachable. Barrier operations are handled synchronously in
|
||||
@@ -2028,8 +2110,16 @@ impl RemoteTimelineClient {
|
||||
// Legacy mode: skip validating generation
|
||||
upload_queue.visible_remote_consistent_lsn.store(lsn);
|
||||
None
|
||||
} else {
|
||||
} else if self
|
||||
.config
|
||||
.read()
|
||||
.unwrap()
|
||||
.process_remote_consistent_lsn_updates
|
||||
{
|
||||
Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
|
||||
} else {
|
||||
// Our config disables remote_consistent_lsn updates: drop it.
|
||||
None
|
||||
}
|
||||
}
|
||||
UploadOp::Delete(_) => {
|
||||
@@ -2166,6 +2256,7 @@ impl RemoteTimelineClient {
|
||||
queued_operations: VecDeque::default(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::default(),
|
||||
blocked_deletions: Vec::new(),
|
||||
shutting_down: false,
|
||||
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
|
||||
};
|
||||
@@ -2231,6 +2322,28 @@ impl RemoteTimelineClient {
|
||||
UploadQueue::Initialized(x) => x.no_pending_work(),
|
||||
}
|
||||
}
|
||||
|
||||
/// 'foreign' in the sense that it does not belong to this tenant shard. This method
|
||||
/// is used during GC for other shards to get the index of shard zero.
|
||||
pub(crate) async fn download_foreign_index(
|
||||
&self,
|
||||
shard_number: ShardNumber,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> {
|
||||
let foreign_shard_id = TenantShardId {
|
||||
shard_number,
|
||||
shard_count: self.tenant_shard_id.shard_count,
|
||||
tenant_id: self.tenant_shard_id.tenant_id,
|
||||
};
|
||||
download_index_part(
|
||||
&self.storage_impl,
|
||||
&foreign_shard_id,
|
||||
&self.timeline_id,
|
||||
Generation::MAX,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct UploadQueueAccessor<'a> {
|
||||
@@ -2379,6 +2492,7 @@ mod tests {
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
tenant::{
|
||||
config::AttachmentMode,
|
||||
harness::{TenantHarness, TIMELINE_ID},
|
||||
storage_layer::layer::local_layer_path,
|
||||
Tenant, Timeline,
|
||||
@@ -2464,6 +2578,10 @@ mod tests {
|
||||
|
||||
/// Construct a RemoteTimelineClient in an arbitrary generation
|
||||
fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
|
||||
let location_conf = AttachedLocationConfig {
|
||||
generation,
|
||||
attach_mode: AttachmentMode::Single,
|
||||
};
|
||||
Arc::new(RemoteTimelineClient {
|
||||
conf: self.harness.conf,
|
||||
runtime: tokio::runtime::Handle::current(),
|
||||
@@ -2477,6 +2595,7 @@ mod tests {
|
||||
&self.harness.tenant_shard_id,
|
||||
&TIMELINE_ID,
|
||||
)),
|
||||
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
|
||||
cancel: CancellationToken::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -111,15 +111,6 @@ pub(crate) struct SecondaryTenant {
|
||||
pub(super) heatmap_total_size_metric: UIntGauge,
|
||||
}
|
||||
|
||||
impl Drop for SecondaryTenant {
|
||||
fn drop(&mut self) {
|
||||
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
}
|
||||
}
|
||||
|
||||
impl SecondaryTenant {
|
||||
pub(crate) fn new(
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -167,6 +158,13 @@ impl SecondaryTenant {
|
||||
|
||||
// Wait for any secondary downloader work to complete
|
||||
self.gate.close().await;
|
||||
|
||||
self.validate_metrics();
|
||||
|
||||
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
|
||||
}
|
||||
|
||||
pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
|
||||
@@ -254,6 +252,20 @@ impl SecondaryTenant {
|
||||
.await
|
||||
.expect("secondary eviction should not have panicked");
|
||||
}
|
||||
|
||||
/// Exhaustive check that incrementally updated metrics match the actual state.
|
||||
#[cfg(feature = "testing")]
|
||||
fn validate_metrics(&self) {
|
||||
let detail = self.detail.lock().unwrap();
|
||||
let resident_size = detail.total_resident_size();
|
||||
|
||||
assert_eq!(resident_size, self.resident_size_metric.get());
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "testing"))]
|
||||
fn validate_metrics(&self) {
|
||||
// No-op in non-testing builds
|
||||
}
|
||||
}
|
||||
|
||||
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
|
||||
|
||||
@@ -242,6 +242,19 @@ impl SecondaryDetail {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
pub(crate) fn total_resident_size(&self) -> u64 {
|
||||
self.timelines
|
||||
.values()
|
||||
.map(|tl| {
|
||||
tl.on_disk_layers
|
||||
.values()
|
||||
.map(|v| v.metadata.file_size)
|
||||
.sum::<u64>()
|
||||
})
|
||||
.sum::<u64>()
|
||||
}
|
||||
|
||||
pub(super) fn evict_layer(
|
||||
&mut self,
|
||||
name: LayerName,
|
||||
@@ -763,24 +776,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
|
||||
// Metrics consistency check in testing builds
|
||||
if cfg!(feature = "testing") {
|
||||
let detail = self.secondary_state.detail.lock().unwrap();
|
||||
let resident_size = detail
|
||||
.timelines
|
||||
.values()
|
||||
.map(|tl| {
|
||||
tl.on_disk_layers
|
||||
.values()
|
||||
.map(|v| v.metadata.file_size)
|
||||
.sum::<u64>()
|
||||
})
|
||||
.sum::<u64>();
|
||||
assert_eq!(
|
||||
resident_size,
|
||||
self.secondary_state.resident_size_metric.get()
|
||||
);
|
||||
}
|
||||
|
||||
self.secondary_state.validate_metrics();
|
||||
// Only update last_etag after a full successful download: this way will not skip
|
||||
// the next download, even if the heatmap's actual etag is unchanged.
|
||||
self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {
|
||||
|
||||
@@ -38,6 +38,7 @@ use pageserver_api::{
|
||||
shard::{ShardIdentity, ShardNumber, TenantShardId},
|
||||
};
|
||||
use rand::Rng;
|
||||
use remote_storage::DownloadError;
|
||||
use serde_with::serde_as;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::{
|
||||
@@ -272,7 +273,7 @@ pub struct Timeline {
|
||||
|
||||
/// Remote storage client.
|
||||
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
|
||||
pub remote_client: Arc<RemoteTimelineClient>,
|
||||
pub(crate) remote_client: Arc<RemoteTimelineClient>,
|
||||
|
||||
// What page versions do we hold in the repository? If we get a
|
||||
// request > last_record_lsn, we need to wait until we receive all
|
||||
@@ -2171,14 +2172,14 @@ impl Timeline {
|
||||
)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
|
||||
pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
|
||||
// The threshold is embedded in the metric. So, we need to update it.
|
||||
{
|
||||
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
|
||||
new_conf,
|
||||
&new_conf.tenant_conf,
|
||||
&self.conf.default_tenant_conf,
|
||||
);
|
||||
|
||||
@@ -2186,6 +2187,9 @@ impl Timeline {
|
||||
let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
|
||||
|
||||
let timeline_id_str = self.timeline_id.to_string();
|
||||
|
||||
self.remote_client.update_config(&new_conf.location);
|
||||
|
||||
self.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.write()
|
||||
@@ -4821,6 +4825,86 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn find_gc_time_cutoff(
|
||||
&self,
|
||||
pitr: Duration,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Lsn>, PageReconstructError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
if self.shard_identity.is_shard_zero() {
|
||||
// Shard Zero has SLRU data and can calculate the PITR time -> LSN mapping itself
|
||||
let now = SystemTime::now();
|
||||
let time_range = if pitr == Duration::ZERO {
|
||||
humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
|
||||
} else {
|
||||
pitr
|
||||
};
|
||||
|
||||
// If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
|
||||
let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
|
||||
let timestamp = to_pg_timestamp(time_cutoff);
|
||||
|
||||
let time_cutoff = match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
|
||||
LsnForTimestamp::Present(lsn) => Some(lsn),
|
||||
LsnForTimestamp::Future(lsn) => {
|
||||
// The timestamp is in the future. That sounds impossible,
|
||||
// but what it really means is that there hasn't been
|
||||
// any commits since the cutoff timestamp.
|
||||
//
|
||||
// In this case we should use the LSN of the most recent commit,
|
||||
// which is implicitly the last LSN in the log.
|
||||
debug!("future({})", lsn);
|
||||
Some(self.get_last_record_lsn())
|
||||
}
|
||||
LsnForTimestamp::Past(lsn) => {
|
||||
debug!("past({})", lsn);
|
||||
None
|
||||
}
|
||||
LsnForTimestamp::NoData(lsn) => {
|
||||
debug!("nodata({})", lsn);
|
||||
None
|
||||
}
|
||||
};
|
||||
Ok(time_cutoff)
|
||||
} else {
|
||||
// Shards other than shard zero cannot do timestamp->lsn lookups, and must instead learn their GC cutoff
|
||||
// from shard zero's index. The index doesn't explicitly tell us the time cutoff, but we may assume that
|
||||
// the point up to which shard zero's last_gc_cutoff has advanced will either be the time cutoff, or a
|
||||
// space cutoff that we would also have respected ourselves.
|
||||
match self
|
||||
.remote_client
|
||||
.download_foreign_index(ShardNumber(0), cancel)
|
||||
.await
|
||||
{
|
||||
Ok((index_part, index_generation, _index_mtime)) => {
|
||||
tracing::info!("GC loaded shard zero metadata (gen {index_generation:?}): latest_gc_cutoff_lsn: {}",
|
||||
index_part.metadata.latest_gc_cutoff_lsn());
|
||||
Ok(Some(index_part.metadata.latest_gc_cutoff_lsn()))
|
||||
}
|
||||
Err(DownloadError::NotFound) => {
|
||||
// This is unexpected, because during timeline creations shard zero persists to remote
|
||||
// storage before other shards are called, and during timeline deletion non-zeroth shards are
|
||||
// deleted before the zeroth one. However, it should be harmless: if we somehow end up in this
|
||||
// state, then shard zero should _eventually_ write an index when it GCs.
|
||||
tracing::warn!("GC couldn't find shard zero's index for timeline");
|
||||
Ok(None)
|
||||
}
|
||||
Err(e) => {
|
||||
// TODO: this function should return a different error type than page reconstruct error
|
||||
Err(PageReconstructError::Other(anyhow::anyhow!(e)))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: after reading shard zero's GC cutoff, we should validate its generation with the storage
|
||||
// controller. Otherwise, it is possible that we see the GC cutoff go backwards while shard zero
|
||||
// is going through a migration if we read the old location's index and it has GC'd ahead of the
|
||||
// new location. This is legal in principle, but problematic in practice because it might result
|
||||
// in a timeline creation succeeding on shard zero ('s new location) but then failing on other shards
|
||||
// because they have GC'd past the branch point.
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the Lsns above which layer files need to be retained on
|
||||
/// garbage collection.
|
||||
///
|
||||
@@ -4863,40 +4947,7 @@ impl Timeline {
|
||||
// - if PITR interval is set, then this is our cutoff.
|
||||
// - if PITR interval is not set, then we do a lookup
|
||||
// based on DEFAULT_PITR_INTERVAL, so that size-based retention does not result in keeping history around permanently on idle databases.
|
||||
let time_cutoff = {
|
||||
let now = SystemTime::now();
|
||||
let time_range = if pitr == Duration::ZERO {
|
||||
humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
|
||||
} else {
|
||||
pitr
|
||||
};
|
||||
|
||||
// If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
|
||||
let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
|
||||
let timestamp = to_pg_timestamp(time_cutoff);
|
||||
|
||||
match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
|
||||
LsnForTimestamp::Present(lsn) => Some(lsn),
|
||||
LsnForTimestamp::Future(lsn) => {
|
||||
// The timestamp is in the future. That sounds impossible,
|
||||
// but what it really means is that there hasn't been
|
||||
// any commits since the cutoff timestamp.
|
||||
//
|
||||
// In this case we should use the LSN of the most recent commit,
|
||||
// which is implicitly the last LSN in the log.
|
||||
debug!("future({})", lsn);
|
||||
Some(self.get_last_record_lsn())
|
||||
}
|
||||
LsnForTimestamp::Past(lsn) => {
|
||||
debug!("past({})", lsn);
|
||||
None
|
||||
}
|
||||
LsnForTimestamp::NoData(lsn) => {
|
||||
debug!("nodata({})", lsn);
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
let time_cutoff = self.find_gc_time_cutoff(pitr, cancel, ctx).await?;
|
||||
|
||||
Ok(match (pitr, time_cutoff) {
|
||||
(Duration::ZERO, Some(time_cutoff)) => {
|
||||
|
||||
@@ -283,7 +283,7 @@ impl DeleteTimelineFlow {
|
||||
|
||||
/// Shortcut to create Timeline in stopping state and spawn deletion task.
|
||||
#[instrument(skip_all, fields(%timeline_id))]
|
||||
pub async fn resume_deletion(
|
||||
pub(crate) async fn resume_deletion(
|
||||
tenant: Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: &TimelineMetadata,
|
||||
|
||||
@@ -88,6 +88,9 @@ pub(crate) struct UploadQueueInitialized {
|
||||
#[cfg(feature = "testing")]
|
||||
pub(crate) dangling_files: HashMap<LayerName, Generation>,
|
||||
|
||||
/// Deletions that are blocked by the tenant configuration
|
||||
pub(crate) blocked_deletions: Vec<Delete>,
|
||||
|
||||
/// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
|
||||
pub(crate) shutting_down: bool,
|
||||
|
||||
@@ -180,6 +183,7 @@ impl UploadQueue {
|
||||
queued_operations: VecDeque::new(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::new(),
|
||||
blocked_deletions: Vec::new(),
|
||||
shutting_down: false,
|
||||
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
|
||||
};
|
||||
@@ -220,6 +224,7 @@ impl UploadQueue {
|
||||
queued_operations: VecDeque::new(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::new(),
|
||||
blocked_deletions: Vec::new(),
|
||||
shutting_down: false,
|
||||
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
|
||||
};
|
||||
@@ -270,7 +275,7 @@ pub(crate) struct UploadTask {
|
||||
|
||||
/// A deletion of some layers within the lifetime of a timeline. This is not used
|
||||
/// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct Delete {
|
||||
pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||
}
|
||||
|
||||
@@ -111,7 +111,7 @@ struct SqlOverHttpArgs {
|
||||
sql_over_http_cancel_set_shards: usize,
|
||||
|
||||
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
|
||||
sql_over_http_max_request_size_bytes: u64,
|
||||
sql_over_http_max_request_size_bytes: usize,
|
||||
|
||||
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
|
||||
sql_over_http_max_response_size_bytes: usize,
|
||||
|
||||
@@ -276,7 +276,7 @@ struct SqlOverHttpArgs {
|
||||
sql_over_http_cancel_set_shards: usize,
|
||||
|
||||
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
|
||||
sql_over_http_max_request_size_bytes: u64,
|
||||
sql_over_http_max_request_size_bytes: usize,
|
||||
|
||||
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
|
||||
sql_over_http_max_response_size_bytes: usize,
|
||||
|
||||
@@ -64,7 +64,7 @@ pub struct HttpConfig {
|
||||
pub pool_options: GlobalConnPoolOptions,
|
||||
pub cancel_set: CancelSet,
|
||||
pub client_conn_threshold: u64,
|
||||
pub max_request_size_bytes: u64,
|
||||
pub max_request_size_bytes: usize,
|
||||
pub max_response_size_bytes: usize,
|
||||
}
|
||||
|
||||
|
||||
@@ -380,6 +380,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
// after getting back a permit - it's possible the cache was filled
|
||||
// double check
|
||||
if permit.should_check_cache() {
|
||||
// TODO: if there is something in the cache, mark the permit as success.
|
||||
check_cache!();
|
||||
}
|
||||
|
||||
|
||||
@@ -122,18 +122,18 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub(crate) enum ReadBodyError {
|
||||
pub(crate) enum ReadBodyError<E> {
|
||||
#[error("Content length exceeds limit of {limit} bytes")]
|
||||
BodyTooLarge { limit: usize },
|
||||
|
||||
#[error(transparent)]
|
||||
Read(#[from] reqwest::Error),
|
||||
Read(#[from] E),
|
||||
}
|
||||
|
||||
pub(crate) async fn read_body_with_limit(
|
||||
mut b: impl Body<Data = Bytes, Error = reqwest::Error> + Unpin,
|
||||
pub(crate) async fn read_body_with_limit<E>(
|
||||
mut b: impl Body<Data = Bytes, Error = E> + Unpin,
|
||||
limit: usize,
|
||||
) -> Result<Vec<u8>, ReadBodyError> {
|
||||
) -> Result<Vec<u8>, ReadBodyError<E>> {
|
||||
// We could use `b.limited().collect().await.to_bytes()` here
|
||||
// but this ends up being slightly more efficient as far as I can tell.
|
||||
|
||||
|
||||
@@ -117,7 +117,6 @@ where
|
||||
node_info.set_keys(user_info.get_keys());
|
||||
node_info.allow_self_signed_compute = allow_self_signed_compute;
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
let retry_type = RetryType::ConnectToCompute;
|
||||
|
||||
// try once
|
||||
let err = match mechanism
|
||||
@@ -129,7 +128,7 @@ where
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
outcome: ConnectOutcome::Success,
|
||||
retry_type,
|
||||
retry_type: RetryType::ConnectToCompute,
|
||||
},
|
||||
num_retries.into(),
|
||||
);
|
||||
@@ -147,7 +146,7 @@ where
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
outcome: ConnectOutcome::Failed,
|
||||
retry_type,
|
||||
retry_type: RetryType::ConnectToCompute,
|
||||
},
|
||||
num_retries.into(),
|
||||
);
|
||||
@@ -156,8 +155,9 @@ where
|
||||
node_info
|
||||
} else {
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
debug!("compute node's state has likely changed; requesting a wake-up");
|
||||
let old_node_info = invalidate_cache(node_info);
|
||||
// TODO: increment num_retries?
|
||||
let mut node_info =
|
||||
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
|
||||
node_info.reuse_settings(old_node_info);
|
||||
@@ -169,7 +169,7 @@ where
|
||||
// now that we have a new node, try connect to it repeatedly.
|
||||
// this can error for a few reasons, for instance:
|
||||
// * DNS connection settings haven't quite propagated yet
|
||||
info!("wake_compute success. attempting to connect");
|
||||
debug!("wake_compute success. attempting to connect");
|
||||
num_retries = 1;
|
||||
loop {
|
||||
match mechanism
|
||||
@@ -181,10 +181,11 @@ where
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
outcome: ConnectOutcome::Success,
|
||||
retry_type,
|
||||
retry_type: RetryType::ConnectToCompute,
|
||||
},
|
||||
num_retries.into(),
|
||||
);
|
||||
// TODO: is this necessary? We have a metric.
|
||||
info!(?num_retries, "connected to compute node after");
|
||||
return Ok(res);
|
||||
}
|
||||
@@ -194,7 +195,7 @@ where
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
outcome: ConnectOutcome::Failed,
|
||||
retry_type,
|
||||
retry_type: RetryType::ConnectToCompute,
|
||||
},
|
||||
num_retries.into(),
|
||||
);
|
||||
|
||||
@@ -87,6 +87,8 @@ where
|
||||
transfer_one_direction(cx, &mut compute_to_client, compute, client)
|
||||
.map_err(ErrorSource::from_compute)?;
|
||||
|
||||
// TODO: 1 info log, with a enum label for close direction.
|
||||
|
||||
// Early termination checks from compute to client.
|
||||
if let TransferState::Done(_) = compute_to_client {
|
||||
if let TransferState::Running(buf) = &client_to_compute {
|
||||
|
||||
@@ -5,7 +5,7 @@ use pq_proto::{
|
||||
};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, warn};
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::auth::endpoint_sni;
|
||||
use crate::config::{TlsConfig, PG_ALPN_PROTOCOL};
|
||||
@@ -199,6 +199,8 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
.await?;
|
||||
}
|
||||
|
||||
// This log highlights the start of the connection.
|
||||
// This contains useful information for debugging, not logged elsewhere, like role name and endpoint id.
|
||||
info!(
|
||||
?version,
|
||||
?params,
|
||||
@@ -211,7 +213,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
FeStartupPacket::StartupMessage { params, version }
|
||||
if version.major() == 3 && version > PG_PROTOCOL_LATEST =>
|
||||
{
|
||||
warn!(?version, "unsupported minor version");
|
||||
debug!(?version, "unsupported minor version");
|
||||
|
||||
// no protocol extensions are supported.
|
||||
// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/backend/tcop/backend_startup.c#L744-L753>
|
||||
@@ -233,14 +235,16 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
info!(
|
||||
?version,
|
||||
?params,
|
||||
session_type = "normal",
|
||||
"successful handshake; unsupported minor version requested"
|
||||
);
|
||||
break Ok(HandshakeData::Startup(stream, params));
|
||||
}
|
||||
FeStartupPacket::StartupMessage { version, .. } => {
|
||||
FeStartupPacket::StartupMessage { version, params } => {
|
||||
warn!(
|
||||
?version,
|
||||
?params,
|
||||
session_type = "normal",
|
||||
"unsuccessful handshake; unsupported version"
|
||||
);
|
||||
|
||||
@@ -254,7 +254,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
conn_gauge: NumClientConnectionsGuard<'static>,
|
||||
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
|
||||
info!(
|
||||
debug!(
|
||||
protocol = %ctx.protocol(),
|
||||
"handling interactive connection from client"
|
||||
);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::info;
|
||||
use tracing::debug;
|
||||
use utils::measured_stream::MeasuredStream;
|
||||
|
||||
use super::copy_bidirectional::ErrorSource;
|
||||
@@ -45,7 +45,7 @@ pub(crate) async fn proxy_pass(
|
||||
);
|
||||
|
||||
// Starting from here we only proxy the client's traffic.
|
||||
info!("performing the proxy pass...");
|
||||
debug!("performing the proxy pass...");
|
||||
let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute(
|
||||
&mut client,
|
||||
&mut compute,
|
||||
|
||||
@@ -17,7 +17,6 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
|
||||
api: &B,
|
||||
config: RetryConfig,
|
||||
) -> Result<CachedNodeInfo, WakeComputeError> {
|
||||
let retry_type = RetryType::WakeCompute;
|
||||
loop {
|
||||
match api.wake_compute(ctx).await {
|
||||
Err(e) if !should_retry(&e, *num_retries, config) => {
|
||||
@@ -26,7 +25,7 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
outcome: ConnectOutcome::Failed,
|
||||
retry_type,
|
||||
retry_type: RetryType::WakeCompute,
|
||||
},
|
||||
(*num_retries).into(),
|
||||
);
|
||||
@@ -40,10 +39,12 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
outcome: ConnectOutcome::Success,
|
||||
retry_type,
|
||||
retry_type: RetryType::WakeCompute,
|
||||
},
|
||||
(*num_retries).into(),
|
||||
);
|
||||
// TODO: is this necessary? We have a metric.
|
||||
// TODO: this log line is misleading as "wake_compute" might return cached (and stale) info.
|
||||
info!(?num_retries, "compute node woken up after");
|
||||
return Ok(n);
|
||||
}
|
||||
|
||||
@@ -195,7 +195,11 @@ impl DynamicLimiter {
|
||||
///
|
||||
/// Set the outcome to `None` to ignore the job.
|
||||
fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
|
||||
tracing::info!("outcome is {:?}", outcome);
|
||||
if outcome.is_none() {
|
||||
tracing::warn!("outcome is {:?}", outcome);
|
||||
} else {
|
||||
tracing::debug!("outcome is {:?}", outcome);
|
||||
}
|
||||
if self.config.initial_limit == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -31,26 +31,32 @@ impl LimitAlgorithm for Aimd {
|
||||
|
||||
if utilisation > self.utilisation {
|
||||
let limit = old_limit + self.inc;
|
||||
let increased_limit = limit.clamp(self.min, self.max);
|
||||
if increased_limit > old_limit {
|
||||
tracing::info!(increased_limit, "limit increased");
|
||||
let new_limit = limit.clamp(self.min, self.max);
|
||||
if new_limit > old_limit {
|
||||
tracing::info!(old_limit, new_limit, "limit increased");
|
||||
} else {
|
||||
tracing::debug!(old_limit, new_limit, "limit clamped at max");
|
||||
}
|
||||
|
||||
increased_limit
|
||||
new_limit
|
||||
} else {
|
||||
old_limit
|
||||
}
|
||||
}
|
||||
Outcome::Overload => {
|
||||
let limit = old_limit as f32 * self.dec;
|
||||
let new_limit = old_limit as f32 * self.dec;
|
||||
|
||||
// Floor instead of round, so the limit reduces even with small numbers.
|
||||
// E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
|
||||
let limit = limit.floor() as usize;
|
||||
let new_limit = new_limit.floor() as usize;
|
||||
|
||||
let limit = limit.clamp(self.min, self.max);
|
||||
tracing::info!(limit, "limit decreased");
|
||||
limit
|
||||
let new_limit = new_limit.clamp(self.min, self.max);
|
||||
if new_limit < old_limit {
|
||||
tracing::info!(old_limit, new_limit, "limit decreased");
|
||||
} else {
|
||||
tracing::debug!(old_limit, new_limit, "limit clamped at min");
|
||||
}
|
||||
new_limit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,6 +121,7 @@ impl RedisPublisherClient {
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: review redundant error duplication logs.
|
||||
if !self.limiter.check() {
|
||||
tracing::info!("Rate limit exceeded. Skipping cancellation message");
|
||||
return Err(anyhow::anyhow!("Rate limit exceeded"));
|
||||
@@ -146,7 +147,7 @@ impl CancellationPublisherMut for RedisPublisherClient {
|
||||
tracing::info!("publishing cancellation key to Redis");
|
||||
match self.try_publish_internal(cancel_key_data, session_id).await {
|
||||
Ok(()) => {
|
||||
tracing::info!("cancellation key successfuly published to Redis");
|
||||
tracing::debug!("cancellation key successfuly published to Redis");
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -12,8 +12,8 @@ use tracing::field::display;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use super::conn_pool::poll_client;
|
||||
use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool};
|
||||
use super::http_conn_pool::{self, poll_http2_client, Send};
|
||||
use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
|
||||
use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send};
|
||||
use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
|
||||
use crate::auth::backend::local::StaticAuthRules;
|
||||
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
|
||||
@@ -36,9 +36,10 @@ use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
|
||||
|
||||
pub(crate) struct PoolingBackend {
|
||||
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool<Send>>,
|
||||
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
|
||||
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
|
||||
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
pub(crate) pool:
|
||||
Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
|
||||
|
||||
pub(crate) config: &'static ProxyConfig,
|
||||
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
|
||||
@@ -167,10 +168,10 @@ impl PoolingBackend {
|
||||
force_new: bool,
|
||||
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
|
||||
let maybe_client = if force_new {
|
||||
info!("pool: pool is disabled");
|
||||
debug!("pool: pool is disabled");
|
||||
None
|
||||
} else {
|
||||
info!("pool: looking for an existing connection");
|
||||
debug!("pool: looking for an existing connection");
|
||||
self.pool.get(ctx, &conn_info)?
|
||||
};
|
||||
|
||||
@@ -204,14 +205,14 @@ impl PoolingBackend {
|
||||
ctx: &RequestContext,
|
||||
conn_info: ConnInfo,
|
||||
) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
|
||||
info!("pool: looking for an existing connection");
|
||||
debug!("pool: looking for an existing connection");
|
||||
if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
|
||||
return Ok(client);
|
||||
}
|
||||
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
tracing::Span::current().record("conn_id", display(conn_id));
|
||||
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
|
||||
debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
|
||||
let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
|
||||
info: ComputeUserInfo {
|
||||
user: conn_info.user_info.user.clone(),
|
||||
@@ -474,7 +475,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
|
||||
}
|
||||
|
||||
struct TokioMechanism {
|
||||
pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
pool: Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
|
||||
conn_info: ConnInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
|
||||
@@ -524,7 +525,7 @@ impl ConnectMechanism for TokioMechanism {
|
||||
}
|
||||
|
||||
struct HyperMechanism {
|
||||
pool: Arc<http_conn_pool::GlobalConnPool<Send>>,
|
||||
pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
|
||||
conn_info: ConnInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
|
||||
|
||||
@@ -19,7 +19,8 @@ use {
|
||||
};
|
||||
|
||||
use super::conn_pool_lib::{
|
||||
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, GlobalConnPool,
|
||||
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool,
|
||||
GlobalConnPool,
|
||||
};
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
@@ -52,7 +53,7 @@ impl fmt::Display for ConnInfo {
|
||||
}
|
||||
|
||||
pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
global_pool: Arc<GlobalConnPool<C>>,
|
||||
global_pool: Arc<GlobalConnPool<C, EndpointConnPool<C>>>,
|
||||
ctx: &RequestContext,
|
||||
conn_info: ConnInfo,
|
||||
client: C,
|
||||
@@ -167,6 +168,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
|
||||
Client::new(inner, conn_info, pool_clone)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ClientDataRemote {
|
||||
session: tokio::sync::watch::Sender<uuid::Uuid>,
|
||||
cancel: CancellationToken,
|
||||
@@ -243,7 +245,7 @@ mod tests {
|
||||
},
|
||||
cancel_set: CancelSet::new(0),
|
||||
client_conn_threshold: u64::MAX,
|
||||
max_request_size_bytes: u64::MAX,
|
||||
max_request_size_bytes: usize::MAX,
|
||||
max_response_size_bytes: usize::MAX,
|
||||
}));
|
||||
let pool = GlobalConnPool::new(config);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{self, AtomicUsize};
|
||||
use std::sync::{Arc, Weak};
|
||||
@@ -43,13 +44,14 @@ impl ConnInfo {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum ClientDataEnum {
|
||||
Remote(ClientDataRemote),
|
||||
Local(ClientDataLocal),
|
||||
#[allow(dead_code)]
|
||||
Http(ClientDataHttp),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ClientInnerCommon<C: ClientInnerExt> {
|
||||
pub(crate) inner: C,
|
||||
pub(crate) aux: MetricsAuxInfo,
|
||||
@@ -91,6 +93,7 @@ pub(crate) struct ConnPoolEntry<C: ClientInnerExt> {
|
||||
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
|
||||
pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
|
||||
total_conns: usize,
|
||||
/// max # connections per endpoint
|
||||
max_conns: usize,
|
||||
_guard: HttpEndpointPoolsGuard<'static>,
|
||||
global_connections_count: Arc<AtomicUsize>,
|
||||
@@ -232,7 +235,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
|
||||
|
||||
// do logging outside of the mutex
|
||||
if returned {
|
||||
info!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
|
||||
debug!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
|
||||
} else {
|
||||
info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
|
||||
}
|
||||
@@ -317,24 +320,49 @@ impl<C: ClientInnerExt> DbUserConn<C> for DbUserConnPool<C> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct GlobalConnPool<C: ClientInnerExt> {
|
||||
pub(crate) trait EndpointConnPoolExt<C: ClientInnerExt> {
|
||||
fn clear_closed(&mut self) -> usize;
|
||||
fn total_conns(&self) -> usize;
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
|
||||
fn clear_closed(&mut self) -> usize {
|
||||
let mut clients_removed: usize = 0;
|
||||
for db_pool in self.pools.values_mut() {
|
||||
clients_removed += db_pool.clear_closed_clients(&mut self.total_conns);
|
||||
}
|
||||
clients_removed
|
||||
}
|
||||
|
||||
fn total_conns(&self) -> usize {
|
||||
self.total_conns
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct GlobalConnPool<C, P>
|
||||
where
|
||||
C: ClientInnerExt,
|
||||
P: EndpointConnPoolExt<C>,
|
||||
{
|
||||
// endpoint -> per-endpoint connection pool
|
||||
//
|
||||
// That should be a fairly conteded map, so return reference to the per-endpoint
|
||||
// pool as early as possible and release the lock.
|
||||
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
|
||||
pub(crate) global_pool: DashMap<EndpointCacheKey, Arc<RwLock<P>>>,
|
||||
|
||||
/// Number of endpoint-connection pools
|
||||
///
|
||||
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
|
||||
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
|
||||
/// It's only used for diagnostics.
|
||||
global_pool_size: AtomicUsize,
|
||||
pub(crate) global_pool_size: AtomicUsize,
|
||||
|
||||
/// Total number of connections in the pool
|
||||
global_connections_count: Arc<AtomicUsize>,
|
||||
pub(crate) global_connections_count: Arc<AtomicUsize>,
|
||||
|
||||
config: &'static crate::config::HttpConfig,
|
||||
pub(crate) config: &'static crate::config::HttpConfig,
|
||||
|
||||
_marker: PhantomData<C>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@@ -357,7 +385,11 @@ pub struct GlobalConnPoolOptions {
|
||||
pub max_total_conns: usize,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
impl<C, P> GlobalConnPool<C, P>
|
||||
where
|
||||
C: ClientInnerExt,
|
||||
P: EndpointConnPoolExt<C>,
|
||||
{
|
||||
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
|
||||
let shards = config.pool_options.pool_shards;
|
||||
Arc::new(Self {
|
||||
@@ -365,6 +397,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
global_pool_size: AtomicUsize::new(0),
|
||||
config,
|
||||
global_connections_count: Arc::new(AtomicUsize::new(0)),
|
||||
_marker: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -378,60 +411,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
self.config.pool_options.idle_timeout
|
||||
}
|
||||
|
||||
pub(crate) fn get(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestContext,
|
||||
conn_info: &ConnInfo,
|
||||
) -> Result<Option<Client<C>>, HttpConnError> {
|
||||
let mut client: Option<ClientInnerCommon<C>> = None;
|
||||
let Some(endpoint) = conn_info.endpoint_cache_key() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
|
||||
if let Some(entry) = endpoint_pool
|
||||
.write()
|
||||
.get_conn_entry(conn_info.db_and_user())
|
||||
{
|
||||
client = Some(entry.conn);
|
||||
}
|
||||
let endpoint_pool = Arc::downgrade(&endpoint_pool);
|
||||
|
||||
// ok return cached connection if found and establish a new one otherwise
|
||||
if let Some(mut client) = client {
|
||||
if client.inner.is_closed() {
|
||||
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
|
||||
return Ok(None);
|
||||
}
|
||||
tracing::Span::current()
|
||||
.record("conn_id", tracing::field::display(client.get_conn_id()));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
info!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
|
||||
match client.get_data() {
|
||||
ClientDataEnum::Local(data) => {
|
||||
data.session().send(ctx.session_id())?;
|
||||
}
|
||||
|
||||
ClientDataEnum::Remote(data) => {
|
||||
data.session().send(ctx.session_id())?;
|
||||
}
|
||||
ClientDataEnum::Http(_) => (),
|
||||
}
|
||||
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown(&self) {
|
||||
// drops all strong references to endpoint-pools
|
||||
self.global_pool.clear();
|
||||
@@ -464,17 +443,10 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
// if the current endpoint pool is unique (no other strong or weak references)
|
||||
// then it is currently not in use by any connections.
|
||||
if let Some(pool) = Arc::get_mut(x.get_mut()) {
|
||||
let EndpointConnPool {
|
||||
pools, total_conns, ..
|
||||
} = pool.get_mut();
|
||||
let endpoints = pool.get_mut();
|
||||
clients_removed = endpoints.clear_closed();
|
||||
|
||||
// ensure that closed clients are removed
|
||||
for db_pool in pools.values_mut() {
|
||||
clients_removed += db_pool.clear_closed_clients(total_conns);
|
||||
}
|
||||
|
||||
// we only remove this pool if it has no active connections
|
||||
if *total_conns == 0 {
|
||||
if endpoints.total_conns() == 0 {
|
||||
info!("pool: discarding pool for endpoint {endpoint}");
|
||||
return false;
|
||||
}
|
||||
@@ -510,6 +482,62 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
info!("pool: performed global pool gc. size now {global_pool_size}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
|
||||
pub(crate) fn get(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestContext,
|
||||
conn_info: &ConnInfo,
|
||||
) -> Result<Option<Client<C>>, HttpConnError> {
|
||||
let mut client: Option<ClientInnerCommon<C>> = None;
|
||||
let Some(endpoint) = conn_info.endpoint_cache_key() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
|
||||
if let Some(entry) = endpoint_pool
|
||||
.write()
|
||||
.get_conn_entry(conn_info.db_and_user())
|
||||
{
|
||||
client = Some(entry.conn);
|
||||
}
|
||||
let endpoint_pool = Arc::downgrade(&endpoint_pool);
|
||||
|
||||
// ok return cached connection if found and establish a new one otherwise
|
||||
if let Some(mut client) = client {
|
||||
if client.inner.is_closed() {
|
||||
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
|
||||
return Ok(None);
|
||||
}
|
||||
tracing::Span::current()
|
||||
.record("conn_id", tracing::field::display(client.get_conn_id()));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
debug!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
|
||||
match client.get_data() {
|
||||
ClientDataEnum::Local(data) => {
|
||||
data.session().send(ctx.session_id())?;
|
||||
}
|
||||
|
||||
ClientDataEnum::Remote(data) => {
|
||||
data.session().send(ctx.session_id())?;
|
||||
}
|
||||
ClientDataEnum::Http(_) => (),
|
||||
}
|
||||
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub(crate) fn get_or_create_endpoint_pool(
|
||||
self: &Arc<Self>,
|
||||
@@ -556,7 +584,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
pool
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Client<C: ClientInnerExt> {
|
||||
span: Span,
|
||||
inner: Option<ClientInnerCommon<C>>,
|
||||
|
||||
@@ -2,16 +2,17 @@ use std::collections::VecDeque;
|
||||
use std::sync::atomic::{self, AtomicUsize};
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use hyper::client::conn::http2;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use parking_lot::RwLock;
|
||||
use rand::Rng;
|
||||
use tokio::net::TcpStream;
|
||||
use tracing::{debug, error, info, info_span, Instrument};
|
||||
|
||||
use super::backend::HttpConnError;
|
||||
use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
|
||||
use super::conn_pool_lib::{
|
||||
ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry,
|
||||
EndpointConnPoolExt, GlobalConnPool,
|
||||
};
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
|
||||
@@ -23,17 +24,11 @@ pub(crate) type Connect =
|
||||
http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ConnPoolEntry<C: ClientInnerExt + Clone> {
|
||||
conn: C,
|
||||
conn_id: uuid::Uuid,
|
||||
aux: MetricsAuxInfo,
|
||||
}
|
||||
|
||||
pub(crate) struct ClientDataHttp();
|
||||
|
||||
// Per-endpoint connection pool
|
||||
// Number of open connections is limited by the `max_conns_per_endpoint`.
|
||||
pub(crate) struct EndpointConnPool<C: ClientInnerExt + Clone> {
|
||||
pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
|
||||
// TODO(conrad):
|
||||
// either we should open more connections depending on stream count
|
||||
// (not exposed by hyper, need our own counter)
|
||||
@@ -48,14 +43,19 @@ pub(crate) struct EndpointConnPool<C: ClientInnerExt + Clone> {
|
||||
global_connections_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
|
||||
impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
|
||||
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
|
||||
let Self { conns, .. } = self;
|
||||
|
||||
loop {
|
||||
let conn = conns.pop_front()?;
|
||||
if !conn.conn.is_closed() {
|
||||
conns.push_back(conn.clone());
|
||||
if !conn.conn.inner.is_closed() {
|
||||
let new_conn = ConnPoolEntry {
|
||||
conn: conn.conn.clone(),
|
||||
_last_access: std::time::Instant::now(),
|
||||
};
|
||||
|
||||
conns.push_back(new_conn);
|
||||
return Some(conn);
|
||||
}
|
||||
}
|
||||
@@ -69,7 +69,7 @@ impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
|
||||
} = self;
|
||||
|
||||
let old_len = conns.len();
|
||||
conns.retain(|conn| conn.conn_id != conn_id);
|
||||
conns.retain(|entry| entry.conn.conn_id != conn_id);
|
||||
let new_len = conns.len();
|
||||
let removed = old_len - new_len;
|
||||
if removed > 0 {
|
||||
@@ -84,7 +84,22 @@ impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> Drop for EndpointConnPool<C> {
|
||||
impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
|
||||
fn clear_closed(&mut self) -> usize {
|
||||
let Self { conns, .. } = self;
|
||||
let old_len = conns.len();
|
||||
conns.retain(|entry| !entry.conn.inner.is_closed());
|
||||
|
||||
let new_len = conns.len();
|
||||
old_len - new_len
|
||||
}
|
||||
|
||||
fn total_conns(&self) -> usize {
|
||||
self.conns.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
|
||||
fn drop(&mut self) {
|
||||
if !self.conns.is_empty() {
|
||||
self.global_connections_count
|
||||
@@ -98,117 +113,7 @@ impl<C: ClientInnerExt + Clone> Drop for EndpointConnPool<C> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct GlobalConnPool<C: ClientInnerExt + Clone> {
|
||||
// endpoint -> per-endpoint connection pool
|
||||
//
|
||||
// That should be a fairly conteded map, so return reference to the per-endpoint
|
||||
// pool as early as possible and release the lock.
|
||||
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
|
||||
|
||||
/// Number of endpoint-connection pools
|
||||
///
|
||||
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
|
||||
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
|
||||
/// It's only used for diagnostics.
|
||||
global_pool_size: AtomicUsize,
|
||||
|
||||
/// Total number of connections in the pool
|
||||
global_connections_count: Arc<AtomicUsize>,
|
||||
|
||||
config: &'static crate::config::HttpConfig,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
|
||||
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
|
||||
let shards = config.pool_options.pool_shards;
|
||||
Arc::new(Self {
|
||||
global_pool: DashMap::with_shard_amount(shards),
|
||||
global_pool_size: AtomicUsize::new(0),
|
||||
config,
|
||||
global_connections_count: Arc::new(AtomicUsize::new(0)),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown(&self) {
|
||||
// drops all strong references to endpoint-pools
|
||||
self.global_pool.clear();
|
||||
}
|
||||
|
||||
pub(crate) async fn gc_worker(&self, mut rng: impl Rng) {
|
||||
let epoch = self.config.pool_options.gc_epoch;
|
||||
let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
let shard = rng.gen_range(0..self.global_pool.shards().len());
|
||||
self.gc(shard);
|
||||
}
|
||||
}
|
||||
|
||||
fn gc(&self, shard: usize) {
|
||||
debug!(shard, "pool: performing epoch reclamation");
|
||||
|
||||
// acquire a random shard lock
|
||||
let mut shard = self.global_pool.shards()[shard].write();
|
||||
|
||||
let timer = Metrics::get()
|
||||
.proxy
|
||||
.http_pool_reclaimation_lag_seconds
|
||||
.start_timer();
|
||||
let current_len = shard.len();
|
||||
let mut clients_removed = 0;
|
||||
shard.retain(|endpoint, x| {
|
||||
// if the current endpoint pool is unique (no other strong or weak references)
|
||||
// then it is currently not in use by any connections.
|
||||
if let Some(pool) = Arc::get_mut(x.get_mut()) {
|
||||
let EndpointConnPool { conns, .. } = pool.get_mut();
|
||||
|
||||
let old_len = conns.len();
|
||||
|
||||
conns.retain(|conn| !conn.conn.is_closed());
|
||||
|
||||
let new_len = conns.len();
|
||||
let removed = old_len - new_len;
|
||||
clients_removed += removed;
|
||||
|
||||
// we only remove this pool if it has no active connections
|
||||
if conns.is_empty() {
|
||||
info!("pool: discarding pool for endpoint {endpoint}");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
});
|
||||
|
||||
let new_len = shard.len();
|
||||
drop(shard);
|
||||
timer.observe();
|
||||
|
||||
// Do logging outside of the lock.
|
||||
if clients_removed > 0 {
|
||||
let size = self
|
||||
.global_connections_count
|
||||
.fetch_sub(clients_removed, atomic::Ordering::Relaxed)
|
||||
- clients_removed;
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_pool_opened_connections
|
||||
.get_metric()
|
||||
.dec_by(clients_removed as i64);
|
||||
info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}");
|
||||
}
|
||||
let removed = current_len - new_len;
|
||||
|
||||
if removed > 0 {
|
||||
let global_pool_size = self
|
||||
.global_pool_size
|
||||
.fetch_sub(removed, atomic::Ordering::Relaxed)
|
||||
- removed;
|
||||
info!("pool: performed global pool gc. size now {global_pool_size}");
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
|
||||
#[expect(unused_results)]
|
||||
pub(crate) fn get(
|
||||
self: &Arc<Self>,
|
||||
@@ -226,27 +131,28 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
|
||||
return result;
|
||||
};
|
||||
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
|
||||
info!(
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
|
||||
debug!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
Ok(Some(Client::new(client.conn, client.aux)))
|
||||
|
||||
Ok(Some(Client::new(client.conn.clone())))
|
||||
}
|
||||
|
||||
fn get_or_create_endpoint_pool(
|
||||
self: &Arc<Self>,
|
||||
endpoint: &EndpointCacheKey,
|
||||
) -> Arc<RwLock<EndpointConnPool<C>>> {
|
||||
) -> Arc<RwLock<HttpConnPool<C>>> {
|
||||
// fast path
|
||||
if let Some(pool) = self.global_pool.get(endpoint) {
|
||||
return pool.clone();
|
||||
}
|
||||
|
||||
// slow path
|
||||
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
|
||||
let new_pool = Arc::new(RwLock::new(HttpConnPool {
|
||||
conns: VecDeque::new(),
|
||||
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
|
||||
global_connections_count: self.global_connections_count.clone(),
|
||||
@@ -279,7 +185,7 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
|
||||
}
|
||||
|
||||
pub(crate) fn poll_http2_client(
|
||||
global_pool: Arc<GlobalConnPool<Send>>,
|
||||
global_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
|
||||
ctx: &RequestContext,
|
||||
conn_info: &ConnInfo,
|
||||
client: Send,
|
||||
@@ -299,11 +205,15 @@ pub(crate) fn poll_http2_client(
|
||||
let pool = match conn_info.endpoint_cache_key() {
|
||||
Some(endpoint) => {
|
||||
let pool = global_pool.get_or_create_endpoint_pool(&endpoint);
|
||||
|
||||
pool.write().conns.push_back(ConnPoolEntry {
|
||||
conn: client.clone(),
|
||||
conn_id,
|
||||
let client = ClientInnerCommon {
|
||||
inner: client.clone(),
|
||||
aux: aux.clone(),
|
||||
conn_id,
|
||||
data: ClientDataEnum::Http(ClientDataHttp()),
|
||||
};
|
||||
pool.write().conns.push_back(ConnPoolEntry {
|
||||
conn: client,
|
||||
_last_access: std::time::Instant::now(),
|
||||
});
|
||||
Metrics::get()
|
||||
.proxy
|
||||
@@ -335,23 +245,30 @@ pub(crate) fn poll_http2_client(
|
||||
.instrument(span),
|
||||
);
|
||||
|
||||
Client::new(client, aux)
|
||||
let client = ClientInnerCommon {
|
||||
inner: client,
|
||||
aux,
|
||||
conn_id,
|
||||
data: ClientDataEnum::Http(ClientDataHttp()),
|
||||
};
|
||||
|
||||
Client::new(client)
|
||||
}
|
||||
|
||||
pub(crate) struct Client<C: ClientInnerExt + Clone> {
|
||||
pub(crate) inner: C,
|
||||
aux: MetricsAuxInfo,
|
||||
pub(crate) inner: ClientInnerCommon<C>,
|
||||
}
|
||||
|
||||
impl<C: ClientInnerExt + Clone> Client<C> {
|
||||
pub(self) fn new(inner: C, aux: MetricsAuxInfo) -> Self {
|
||||
Self { inner, aux }
|
||||
pub(self) fn new(inner: ClientInnerCommon<C>) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
|
||||
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
|
||||
let aux = &self.inner.aux;
|
||||
USAGE_METRICS.register(Ids {
|
||||
endpoint_id: self.aux.endpoint_id,
|
||||
branch_id: self.aux.branch_id,
|
||||
endpoint_id: aux.endpoint_id,
|
||||
branch_id: aux.branch_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ use tokio_postgres::tls::NoTlsStream;
|
||||
use tokio_postgres::types::ToSql;
|
||||
use tokio_postgres::{AsyncMessage, Socket};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
use tracing::{debug, error, info, info_span, warn, Instrument};
|
||||
|
||||
use super::backend::HttpConnError;
|
||||
use super::conn_pool_lib::{
|
||||
@@ -44,6 +44,7 @@ pub(crate) const EXT_NAME: &str = "pg_session_jwt";
|
||||
pub(crate) const EXT_VERSION: &str = "0.1.2";
|
||||
pub(crate) const EXT_SCHEMA: &str = "auth";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ClientDataLocal {
|
||||
session: tokio::sync::watch::Sender<uuid::Uuid>,
|
||||
cancel: CancellationToken,
|
||||
@@ -110,7 +111,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
info!(
|
||||
debug!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"local_pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
|
||||
@@ -88,7 +88,7 @@ pub async fn task_main(
|
||||
}
|
||||
});
|
||||
|
||||
let http_conn_pool = http_conn_pool::GlobalConnPool::new(&config.http_config);
|
||||
let http_conn_pool = conn_pool_lib::GlobalConnPool::new(&config.http_config);
|
||||
{
|
||||
let http_conn_pool = Arc::clone(&http_conn_pool);
|
||||
tokio::spawn(async move {
|
||||
|
||||
@@ -8,7 +8,7 @@ use http::header::AUTHORIZATION;
|
||||
use http::Method;
|
||||
use http_body_util::combinators::BoxBody;
|
||||
use http_body_util::{BodyExt, Full};
|
||||
use hyper::body::{Body, Incoming};
|
||||
use hyper::body::Incoming;
|
||||
use hyper::http::{HeaderName, HeaderValue};
|
||||
use hyper::{header, HeaderMap, Request, Response, StatusCode};
|
||||
use pq_proto::StartupMessageParamsBuilder;
|
||||
@@ -18,7 +18,7 @@ use tokio::time;
|
||||
use tokio_postgres::error::{DbError, ErrorPosition, SqlState};
|
||||
use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info};
|
||||
use tracing::{debug, error, info};
|
||||
use typed_json::json;
|
||||
use url::Url;
|
||||
use urlencoding;
|
||||
@@ -36,6 +36,7 @@ use crate::auth::{endpoint_sni, ComputeUserInfoParseError};
|
||||
use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::http::{read_body_with_limit, ReadBodyError};
|
||||
use crate::metrics::{HttpDirection, Metrics};
|
||||
use crate::proxy::{run_until_cancelled, NeonOptions};
|
||||
use crate::serverless::backend::HttpConnError;
|
||||
@@ -47,6 +48,7 @@ use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
|
||||
struct QueryData {
|
||||
query: String,
|
||||
#[serde(deserialize_with = "bytes_to_pg_text")]
|
||||
#[serde(default)]
|
||||
params: Vec<Option<String>>,
|
||||
#[serde(default)]
|
||||
array_mode: Option<bool>,
|
||||
@@ -357,8 +359,6 @@ pub(crate) enum SqlOverHttpError {
|
||||
ConnectCompute(#[from] HttpConnError),
|
||||
#[error("{0}")]
|
||||
ConnInfo(#[from] ConnInfoError),
|
||||
#[error("request is too large (max is {0} bytes)")]
|
||||
RequestTooLarge(u64),
|
||||
#[error("response is too large (max is {0} bytes)")]
|
||||
ResponseTooLarge(usize),
|
||||
#[error("invalid isolation level")]
|
||||
@@ -377,7 +377,6 @@ impl ReportableError for SqlOverHttpError {
|
||||
SqlOverHttpError::ReadPayload(e) => e.get_error_kind(),
|
||||
SqlOverHttpError::ConnectCompute(e) => e.get_error_kind(),
|
||||
SqlOverHttpError::ConnInfo(e) => e.get_error_kind(),
|
||||
SqlOverHttpError::RequestTooLarge(_) => ErrorKind::User,
|
||||
SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User,
|
||||
SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User,
|
||||
SqlOverHttpError::Postgres(p) => p.get_error_kind(),
|
||||
@@ -393,7 +392,6 @@ impl UserFacingError for SqlOverHttpError {
|
||||
SqlOverHttpError::ReadPayload(p) => p.to_string(),
|
||||
SqlOverHttpError::ConnectCompute(c) => c.to_string_client(),
|
||||
SqlOverHttpError::ConnInfo(c) => c.to_string_client(),
|
||||
SqlOverHttpError::RequestTooLarge(_) => self.to_string(),
|
||||
SqlOverHttpError::ResponseTooLarge(_) => self.to_string(),
|
||||
SqlOverHttpError::InvalidIsolationLevel => self.to_string(),
|
||||
SqlOverHttpError::Postgres(p) => p.to_string(),
|
||||
@@ -406,13 +404,12 @@ impl UserFacingError for SqlOverHttpError {
|
||||
impl HttpCodeError for SqlOverHttpError {
|
||||
fn get_http_status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
SqlOverHttpError::ReadPayload(_) => StatusCode::BAD_REQUEST,
|
||||
SqlOverHttpError::ReadPayload(e) => e.get_http_status_code(),
|
||||
SqlOverHttpError::ConnectCompute(h) => match h.get_error_kind() {
|
||||
ErrorKind::User => StatusCode::BAD_REQUEST,
|
||||
_ => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
},
|
||||
SqlOverHttpError::ConnInfo(_) => StatusCode::BAD_REQUEST,
|
||||
SqlOverHttpError::RequestTooLarge(_) => StatusCode::PAYLOAD_TOO_LARGE,
|
||||
SqlOverHttpError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE,
|
||||
SqlOverHttpError::InvalidIsolationLevel => StatusCode::BAD_REQUEST,
|
||||
SqlOverHttpError::Postgres(_) => StatusCode::BAD_REQUEST,
|
||||
@@ -426,19 +423,41 @@ impl HttpCodeError for SqlOverHttpError {
|
||||
pub(crate) enum ReadPayloadError {
|
||||
#[error("could not read the HTTP request body: {0}")]
|
||||
Read(#[from] hyper::Error),
|
||||
#[error("request is too large (max is {limit} bytes)")]
|
||||
BodyTooLarge { limit: usize },
|
||||
#[error("could not parse the HTTP request body: {0}")]
|
||||
Parse(#[from] serde_json::Error),
|
||||
}
|
||||
|
||||
impl From<ReadBodyError<hyper::Error>> for ReadPayloadError {
|
||||
fn from(value: ReadBodyError<hyper::Error>) -> Self {
|
||||
match value {
|
||||
ReadBodyError::BodyTooLarge { limit } => Self::BodyTooLarge { limit },
|
||||
ReadBodyError::Read(e) => Self::Read(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReportableError for ReadPayloadError {
|
||||
fn get_error_kind(&self) -> ErrorKind {
|
||||
match self {
|
||||
ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect,
|
||||
ReadPayloadError::BodyTooLarge { .. } => ErrorKind::User,
|
||||
ReadPayloadError::Parse(_) => ErrorKind::User,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpCodeError for ReadPayloadError {
|
||||
fn get_http_status_code(&self) -> StatusCode {
|
||||
match self {
|
||||
ReadPayloadError::Read(_) => StatusCode::BAD_REQUEST,
|
||||
ReadPayloadError::BodyTooLarge { .. } => StatusCode::PAYLOAD_TOO_LARGE,
|
||||
ReadPayloadError::Parse(_) => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum SqlOverHttpCancel {
|
||||
#[error("query was cancelled")]
|
||||
@@ -580,28 +599,20 @@ async fn handle_db_inner(
|
||||
|
||||
let parsed_headers = HttpHeaders::try_parse(headers)?;
|
||||
|
||||
let request_content_length = match request.body().size_hint().upper() {
|
||||
Some(v) => v,
|
||||
None => config.http_config.max_request_size_bytes + 1,
|
||||
};
|
||||
info!(request_content_length, "request size in bytes");
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_conn_content_length_bytes
|
||||
.observe(HttpDirection::Request, request_content_length as f64);
|
||||
|
||||
// we don't have a streaming request support yet so this is to prevent OOM
|
||||
// from a malicious user sending an extremely large request body
|
||||
if request_content_length > config.http_config.max_request_size_bytes {
|
||||
return Err(SqlOverHttpError::RequestTooLarge(
|
||||
config.http_config.max_request_size_bytes,
|
||||
));
|
||||
}
|
||||
|
||||
let fetch_and_process_request = Box::pin(
|
||||
async {
|
||||
let body = request.into_body().collect().await?.to_bytes();
|
||||
info!(length = body.len(), "request payload read");
|
||||
let body = read_body_with_limit(
|
||||
request.into_body(),
|
||||
config.http_config.max_request_size_bytes,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.http_conn_content_length_bytes
|
||||
.observe(HttpDirection::Request, body.len() as f64);
|
||||
|
||||
debug!(length = body.len(), "request payload read");
|
||||
let payload: Payload = serde_json::from_slice(&body)?;
|
||||
Ok::<Payload, ReadPayloadError>(payload) // Adjust error type accordingly
|
||||
}
|
||||
@@ -768,6 +779,7 @@ async fn handle_auth_broker_inner(
|
||||
let _metrics = client.metrics();
|
||||
|
||||
Ok(client
|
||||
.inner
|
||||
.inner
|
||||
.send_request(req)
|
||||
.await
|
||||
@@ -1095,3 +1107,63 @@ impl Discard<'_> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_payload() {
|
||||
let payload = "{\"query\":\"SELECT * FROM users WHERE name = ?\",\"params\":[\"test\"],\"arrayMode\":true}";
|
||||
let deserialized_payload: Payload = serde_json::from_str(payload).unwrap();
|
||||
|
||||
match deserialized_payload {
|
||||
Payload::Single(QueryData {
|
||||
query,
|
||||
params,
|
||||
array_mode,
|
||||
}) => {
|
||||
assert_eq!(query, "SELECT * FROM users WHERE name = ?");
|
||||
assert_eq!(params, vec![Some(String::from("test"))]);
|
||||
assert!(array_mode.unwrap());
|
||||
}
|
||||
Payload::Batch(_) => {
|
||||
panic!("deserialization failed: case with single query, one param, and array mode")
|
||||
}
|
||||
}
|
||||
|
||||
let payload = "{\"queries\":[{\"query\":\"SELECT * FROM users0 WHERE name = ?\",\"params\":[\"test0\"], \"arrayMode\":false},{\"query\":\"SELECT * FROM users1 WHERE name = ?\",\"params\":[\"test1\"],\"arrayMode\":true}]}";
|
||||
let deserialized_payload: Payload = serde_json::from_str(payload).unwrap();
|
||||
|
||||
match deserialized_payload {
|
||||
Payload::Batch(BatchQueryData { queries }) => {
|
||||
assert_eq!(queries.len(), 2);
|
||||
for (i, query) in queries.into_iter().enumerate() {
|
||||
assert_eq!(
|
||||
query.query,
|
||||
format!("SELECT * FROM users{i} WHERE name = ?")
|
||||
);
|
||||
assert_eq!(query.params, vec![Some(format!("test{i}"))]);
|
||||
assert_eq!(query.array_mode.unwrap(), i > 0);
|
||||
}
|
||||
}
|
||||
Payload::Single(_) => panic!("deserialization failed: case with multiple queries"),
|
||||
}
|
||||
|
||||
let payload = "{\"query\":\"SELECT 1\"}";
|
||||
let deserialized_payload: Payload = serde_json::from_str(payload).unwrap();
|
||||
|
||||
match deserialized_payload {
|
||||
Payload::Single(QueryData {
|
||||
query,
|
||||
params,
|
||||
array_mode,
|
||||
}) => {
|
||||
assert_eq!(query, "SELECT 1");
|
||||
assert_eq!(params, vec![]);
|
||||
assert!(array_mode.is_none());
|
||||
}
|
||||
Payload::Batch(_) => panic!("deserialization failed: case with only one query"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use utils::{backoff, id::TenantId};
|
||||
use crate::{
|
||||
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
|
||||
init_remote, list_objects_with_retries,
|
||||
metadata_stream::{stream_tenant_timelines, stream_tenants},
|
||||
metadata_stream::{stream_tenant_timelines, stream_tenants_maybe_prefix},
|
||||
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth, MAX_RETRIES,
|
||||
};
|
||||
|
||||
@@ -118,9 +118,17 @@ pub async fn find_garbage(
|
||||
console_config: ConsoleConfig,
|
||||
depth: TraversingDepth,
|
||||
node_kind: NodeKind,
|
||||
tenant_id_prefix: Option<String>,
|
||||
output_path: String,
|
||||
) -> anyhow::Result<()> {
|
||||
let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?;
|
||||
let garbage = find_garbage_inner(
|
||||
bucket_config,
|
||||
console_config,
|
||||
depth,
|
||||
node_kind,
|
||||
tenant_id_prefix,
|
||||
)
|
||||
.await?;
|
||||
let serialized = serde_json::to_vec_pretty(&garbage)?;
|
||||
|
||||
tokio::fs::write(&output_path, &serialized).await?;
|
||||
@@ -152,6 +160,7 @@ async fn find_garbage_inner(
|
||||
console_config: ConsoleConfig,
|
||||
depth: TraversingDepth,
|
||||
node_kind: NodeKind,
|
||||
tenant_id_prefix: Option<String>,
|
||||
) -> anyhow::Result<GarbageList> {
|
||||
// Construct clients for S3 and for Console API
|
||||
let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
|
||||
@@ -178,7 +187,7 @@ async fn find_garbage_inner(
|
||||
|
||||
// Enumerate Tenants in S3, and check if each one exists in Console
|
||||
tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
|
||||
let tenants = stream_tenants(&remote_client, &target);
|
||||
let tenants = stream_tenants_maybe_prefix(&remote_client, &target, tenant_id_prefix);
|
||||
let tenants_checked = tenants.map_ok(|t| {
|
||||
let api_client = cloud_admin_api_client.clone();
|
||||
let console_cache = console_cache.clone();
|
||||
|
||||
@@ -54,6 +54,8 @@ enum Command {
|
||||
node_kind: NodeKind,
|
||||
#[arg(short, long, default_value_t=TraversingDepth::Tenant)]
|
||||
depth: TraversingDepth,
|
||||
#[arg(short, long, default_value=None)]
|
||||
tenant_id_prefix: Option<String>,
|
||||
#[arg(short, long, default_value_t = String::from("garbage.json"))]
|
||||
output_path: String,
|
||||
},
|
||||
@@ -209,10 +211,19 @@ async fn main() -> anyhow::Result<()> {
|
||||
Command::FindGarbage {
|
||||
node_kind,
|
||||
depth,
|
||||
tenant_id_prefix,
|
||||
output_path,
|
||||
} => {
|
||||
let console_config = ConsoleConfig::from_env()?;
|
||||
find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
|
||||
find_garbage(
|
||||
bucket_config,
|
||||
console_config,
|
||||
depth,
|
||||
node_kind,
|
||||
tenant_id_prefix,
|
||||
output_path,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Command::PurgeGarbage {
|
||||
input_path,
|
||||
|
||||
@@ -17,9 +17,20 @@ use utils::id::{TenantId, TimelineId};
|
||||
pub fn stream_tenants<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a RootTarget,
|
||||
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
|
||||
stream_tenants_maybe_prefix(remote_client, target, None)
|
||||
}
|
||||
/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
|
||||
pub fn stream_tenants_maybe_prefix<'a>(
|
||||
remote_client: &'a GenericRemoteStorage,
|
||||
target: &'a RootTarget,
|
||||
tenant_id_prefix: Option<String>,
|
||||
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
|
||||
try_stream! {
|
||||
let tenants_target = target.tenants_root();
|
||||
let mut tenants_target = target.tenants_root();
|
||||
if let Some(tenant_id_prefix) = tenant_id_prefix {
|
||||
tenants_target.prefix_in_bucket += &tenant_id_prefix;
|
||||
}
|
||||
let mut tenants_stream =
|
||||
std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
|
||||
while let Some(chunk) = tenants_stream.next().await {
|
||||
|
||||
@@ -77,14 +77,16 @@ class MockS3Server:
|
||||
class LocalFsStorage:
|
||||
root: Path
|
||||
|
||||
def tenant_path(self, tenant_id: TenantId) -> Path:
|
||||
def tenant_path(self, tenant_id: Union[TenantId, TenantShardId]) -> Path:
|
||||
return self.root / "tenants" / str(tenant_id)
|
||||
|
||||
def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
def timeline_path(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Path:
|
||||
return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)
|
||||
|
||||
def timeline_latest_generation(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Optional[int]:
|
||||
timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id))
|
||||
index_parts = [f for f in timeline_files if f.startswith("index_part")]
|
||||
@@ -102,7 +104,9 @@ class LocalFsStorage:
|
||||
raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}")
|
||||
return generations[-1]
|
||||
|
||||
def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
def index_path(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Path:
|
||||
latest_gen = self.timeline_latest_generation(tenant_id, timeline_id)
|
||||
if latest_gen is None:
|
||||
filename = TIMELINE_INDEX_PART_FILE_NAME
|
||||
@@ -126,7 +130,9 @@ class LocalFsStorage:
|
||||
filename = f"{local_name}-{generation:08x}"
|
||||
return self.timeline_path(tenant_id, timeline_id) / filename
|
||||
|
||||
def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any:
|
||||
def index_content(
|
||||
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
|
||||
) -> Any:
|
||||
with self.index_path(tenant_id, timeline_id).open("r") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ Some handy pytest flags for local development:
|
||||
- `-k` selects a test to run
|
||||
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
|
||||
- `--preserve-database-files` to skip cleanup
|
||||
- `--out-dir` to produce a JSON with the recorded test metrics
|
||||
|
||||
# What performance tests do we have and how we run them
|
||||
|
||||
@@ -36,6 +37,6 @@ All tests run only once. Usually to obtain more consistent performance numbers,
|
||||
|
||||
## Results collection
|
||||
|
||||
Local test results for main branch, and results of daily performance tests, are stored in a neon project deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
|
||||
Local test results for main branch, and results of daily performance tests, are stored in a [neon project](https://console.neon.tech/app/projects/withered-sky-69117821) deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
|
||||
|
||||
There is also an inconsistency in test naming. Test name should be the same across platforms, and results can be differentiated by the platform field. But currently, platform is sometimes included in test name because of the way how parametrization works in pytest. I.e. there is a platform switch in the dashboard with neon-local-ci and neon-staging variants. I.e. some tests under neon-local-ci value for a platform switch are displayed as `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]` and `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]` which is highly confusing.
|
||||
|
||||
@@ -0,0 +1,196 @@
|
||||
import dataclasses
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.utils import humantime_to_ms
|
||||
|
||||
TARGET_RUNTIME = 60
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
|
||||
[
|
||||
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
|
||||
(50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
|
||||
(50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
|
||||
(50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
|
||||
# the next 4 cases demonstrate how batchable workloads benefit from batching
|
||||
(50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
|
||||
(50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
|
||||
(50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
|
||||
(50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
|
||||
],
|
||||
)
|
||||
def test_getpage_merge_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
tablesize_mib: int,
|
||||
batch_timeout: Optional[str],
|
||||
target_runtime: int,
|
||||
effective_io_concurrency: int,
|
||||
readhead_buffer_size: int,
|
||||
name: str,
|
||||
):
|
||||
"""
|
||||
Do a bunch of sequential scans and ensure that the pageserver does some merging.
|
||||
"""
|
||||
|
||||
#
|
||||
# record perf-related parameters as metrics to simplify processing of results
|
||||
#
|
||||
params: dict[str, tuple[float | int, dict[str, Any]]] = {}
|
||||
|
||||
params.update(
|
||||
{
|
||||
"tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
|
||||
"batch_timeout": (
|
||||
-1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout),
|
||||
{"unit": "us"},
|
||||
),
|
||||
# target_runtime is just a polite ask to the workload to run for this long
|
||||
"effective_io_concurrency": (effective_io_concurrency, {}),
|
||||
"readhead_buffer_size": (readhead_buffer_size, {}),
|
||||
# name is not a metric
|
||||
}
|
||||
)
|
||||
|
||||
log.info("params: %s", params)
|
||||
|
||||
for param, (value, kwargs) in params.items():
|
||||
zenbenchmark.record(
|
||||
param,
|
||||
metric_value=value,
|
||||
unit=kwargs.pop("unit", ""),
|
||||
report=MetricReport.TEST_PARAM,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
#
|
||||
# Setup
|
||||
#
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
|
||||
cur.execute(f"SET effective_io_concurrency={effective_io_concurrency}")
|
||||
cur.execute(
|
||||
f"SET neon.readahead_buffer_size={readhead_buffer_size}"
|
||||
) # this is the current default value, but let's hard-code that
|
||||
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
|
||||
|
||||
log.info("Filling the table")
|
||||
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
|
||||
tablesize = tablesize_mib * 1024 * 1024
|
||||
npages = tablesize // (8 * 1024)
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
|
||||
# TODO: can we force postgres to do sequential scans?
|
||||
|
||||
#
|
||||
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
|
||||
#
|
||||
|
||||
@dataclass
|
||||
class Metrics:
|
||||
time: float
|
||||
pageserver_getpage_count: float
|
||||
pageserver_vectored_get_count: float
|
||||
compute_getpage_count: float
|
||||
pageserver_cpu_seconds_total: float
|
||||
|
||||
def __sub__(self, other: "Metrics") -> "Metrics":
|
||||
return Metrics(
|
||||
time=self.time - other.time,
|
||||
pageserver_getpage_count=self.pageserver_getpage_count
|
||||
- other.pageserver_getpage_count,
|
||||
pageserver_vectored_get_count=self.pageserver_vectored_get_count
|
||||
- other.pageserver_vectored_get_count,
|
||||
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
|
||||
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
|
||||
- other.pageserver_cpu_seconds_total,
|
||||
)
|
||||
|
||||
def normalize(self, by) -> "Metrics":
|
||||
return Metrics(
|
||||
time=self.time / by,
|
||||
pageserver_getpage_count=self.pageserver_getpage_count / by,
|
||||
pageserver_vectored_get_count=self.pageserver_vectored_get_count / by,
|
||||
compute_getpage_count=self.compute_getpage_count / by,
|
||||
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
|
||||
)
|
||||
|
||||
def get_metrics() -> Metrics:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"select value from neon_perf_counters where metric='getpage_wait_seconds_count';"
|
||||
)
|
||||
compute_getpage_count = cur.fetchall()[0][0]
|
||||
pageserver_metrics = ps_http.get_metrics()
|
||||
return Metrics(
|
||||
time=time.time(),
|
||||
pageserver_getpage_count=pageserver_metrics.query_one(
|
||||
"pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"}
|
||||
).value,
|
||||
pageserver_vectored_get_count=pageserver_metrics.query_one(
|
||||
"pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"}
|
||||
).value,
|
||||
compute_getpage_count=compute_getpage_count,
|
||||
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
|
||||
"libmetrics_process_cpu_seconds_highres"
|
||||
).value,
|
||||
)
|
||||
|
||||
def workload() -> Metrics:
|
||||
start = time.time()
|
||||
iters = 0
|
||||
while time.time() - start < target_runtime or iters < 2:
|
||||
log.info("Seqscan %d", iters)
|
||||
if iters == 1:
|
||||
# round zero for warming up
|
||||
before = get_metrics()
|
||||
cur.execute(
|
||||
"select clear_buffer_cache()"
|
||||
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
|
||||
cur.execute("select sum(data::bigint) from t")
|
||||
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
|
||||
iters += 1
|
||||
after = get_metrics()
|
||||
return (after - before).normalize(iters - 1)
|
||||
|
||||
env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout})
|
||||
env.pageserver.restart()
|
||||
metrics = workload()
|
||||
|
||||
log.info("Results: %s", metrics)
|
||||
|
||||
#
|
||||
# Sanity-checks on the collected data
|
||||
#
|
||||
# assert that getpage counts roughly match between compute and ps
|
||||
assert metrics.pageserver_getpage_count == pytest.approx(
|
||||
metrics.compute_getpage_count, rel=0.01
|
||||
)
|
||||
|
||||
#
|
||||
# Record the results
|
||||
#
|
||||
|
||||
for metric, value in dataclasses.asdict(metrics).items():
|
||||
zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
|
||||
|
||||
zenbenchmark.record(
|
||||
"perfmetric.batching_factor",
|
||||
metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count,
|
||||
unit="",
|
||||
report=MetricReport.HIGHER_IS_BETTER,
|
||||
)
|
||||
@@ -365,6 +365,19 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
|
||||
workload.validate(pageserver_a.id)
|
||||
workload.validate(pageserver_b.id)
|
||||
|
||||
# Force compaction on destination pageserver
|
||||
pageserver_b.http_client().timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
|
||||
|
||||
# Destination pageserver is in AttachedMulti, it should have generated deletions but
|
||||
# not enqueued them yet.
|
||||
# Check deletion metrics via prometheus - should be 0 since we're in AttachedMulti
|
||||
assert (
|
||||
pageserver_b.http_client().get_metric_value(
|
||||
"pageserver_deletion_queue_submitted_total",
|
||||
)
|
||||
== 0
|
||||
)
|
||||
|
||||
# Revert the origin to secondary
|
||||
log.info("Setting origin to Secondary")
|
||||
pageserver_a.tenant_location_configure(
|
||||
@@ -389,6 +402,17 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
|
||||
},
|
||||
)
|
||||
|
||||
# Transition to AttachedSingle should have drained deletions generated by doing a compaction
|
||||
# while in AttachedMulti.
|
||||
def blocked_deletions_drained():
|
||||
submitted = pageserver_b.http_client().get_metric_value(
|
||||
"pageserver_deletion_queue_submitted_total"
|
||||
)
|
||||
assert submitted is not None
|
||||
assert submitted > 0
|
||||
|
||||
wait_until(10, 0.1, blocked_deletions_drained)
|
||||
|
||||
workload.churn_rows(64, pageserver_b.id)
|
||||
workload.validate(pageserver_b.id)
|
||||
del workload
|
||||
|
||||
@@ -110,13 +110,15 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End
|
||||
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
|
||||
|
||||
# Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
|
||||
# Ensure that compaction/GC works, on a timeline containing all the diversity that postgres regression tests create.
|
||||
# There should have been compactions mid-test as well, this final check is in addition those.
|
||||
for shard, pageserver in tenant_get_shards(env, env.initial_tenant):
|
||||
pageserver.http_client().timeline_checkpoint(
|
||||
shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True
|
||||
)
|
||||
|
||||
pageserver.http_client().timeline_gc(shard, env.initial_timeline, None)
|
||||
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
#
|
||||
|
||||
@@ -19,7 +19,7 @@ from fixtures.neon_fixtures import (
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, s3_storage
|
||||
from fixtures.utils import skip_in_debug_build, wait_until
|
||||
from fixtures.workload import Workload
|
||||
from pytest_httpserver import HTTPServer
|
||||
@@ -515,11 +515,12 @@ def test_sharding_split_smoke(
|
||||
|
||||
"""
|
||||
|
||||
# We will start with 4 shards and split into 8, then migrate all those
|
||||
# 8 shards onto separate pageservers
|
||||
shard_count = 4
|
||||
split_shard_count = 8
|
||||
neon_env_builder.num_pageservers = split_shard_count * 2
|
||||
# Shard count we start with
|
||||
shard_count = 2
|
||||
# Shard count we split into
|
||||
split_shard_count = 4
|
||||
# We will have 2 shards per pageserver once done (including secondaries)
|
||||
neon_env_builder.num_pageservers = split_shard_count
|
||||
|
||||
# 1MiB stripes: enable getting some meaningful data distribution without
|
||||
# writing large quantities of data in this test. The stripe size is given
|
||||
@@ -591,7 +592,7 @@ def test_sharding_split_smoke(
|
||||
|
||||
workload.validate()
|
||||
|
||||
assert len(pre_split_pageserver_ids) == 4
|
||||
assert len(pre_split_pageserver_ids) == shard_count
|
||||
|
||||
def shards_on_disk(shard_ids):
|
||||
for pageserver in env.pageservers:
|
||||
@@ -654,9 +655,9 @@ def test_sharding_split_smoke(
|
||||
# - shard_count reconciles for the original setup of the tenant
|
||||
# - shard_count reconciles for detaching the original secondary locations during split
|
||||
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
|
||||
# - shard_count of the child shards will need to fail over to their secondaries
|
||||
# - shard_count of the child shard secondary locations will get moved to emptier nodes
|
||||
expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2
|
||||
# - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move)
|
||||
expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2
|
||||
|
||||
reconcile_ok = env.storage_controller.get_metric_value(
|
||||
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
|
||||
)
|
||||
@@ -720,22 +721,10 @@ def test_sharding_split_smoke(
|
||||
# dominated by shard count.
|
||||
log.info(f"total: {total}")
|
||||
assert total == {
|
||||
1: 1,
|
||||
2: 1,
|
||||
3: 1,
|
||||
4: 1,
|
||||
5: 1,
|
||||
6: 1,
|
||||
7: 1,
|
||||
8: 1,
|
||||
9: 1,
|
||||
10: 1,
|
||||
11: 1,
|
||||
12: 1,
|
||||
13: 1,
|
||||
14: 1,
|
||||
15: 1,
|
||||
16: 1,
|
||||
1: 2,
|
||||
2: 2,
|
||||
3: 2,
|
||||
4: 2,
|
||||
}
|
||||
|
||||
# The controller is not required to lay out the attached locations in any particular way, but
|
||||
@@ -1685,3 +1674,111 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
assert len(top["shards"]) == n_tenants - 4
|
||||
assert set(i["id"] for i in top["shards"]) == set(str(i[0]) for i in tenants[4:])
|
||||
|
||||
|
||||
def test_sharding_gc(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Exercise GC in a sharded tenant: because only shard 0 holds SLRU content, it acts as
|
||||
the "leader" for GC, and other shards read its index to learn what LSN they should
|
||||
GC up to.
|
||||
"""
|
||||
|
||||
shard_count = 4
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
TENANT_CONF = {
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": 128 * 1024,
|
||||
"compaction_threshold": 1,
|
||||
"compaction_target_size": 128 * 1024,
|
||||
# A short PITR horizon, so that we won't have to sleep too long in the test to wait for it to
|
||||
# happen.
|
||||
"pitr_interval": "1s",
|
||||
# disable background compaction and GC. We invoke it manually when we want it to happen.
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
# Disable automatic creation of image layers, as we will create them explicitly when we want them
|
||||
"image_creation_threshold": 9999,
|
||||
"image_layer_creation_check_threshold": 0,
|
||||
"lsn_lease_length": "0s",
|
||||
}
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=shard_count, initial_tenant_conf=TENANT_CONF
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# Create a branch and write some data
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
initial_lsn = Lsn(workload.endpoint().safe_psql("SELECT pg_current_wal_lsn()")[0][0])
|
||||
log.info(f"Started at LSN: {initial_lsn}")
|
||||
|
||||
workload.init()
|
||||
|
||||
# Write enough data to generate multiple layers
|
||||
for _i in range(10):
|
||||
last_lsn = workload.write_rows(32)
|
||||
|
||||
assert last_lsn > initial_lsn
|
||||
|
||||
log.info(f"Wrote up to last LSN: {last_lsn}")
|
||||
|
||||
# Do full image layer generation. When we subsequently wait for PITR, all historic deltas
|
||||
# should be GC-able
|
||||
for shard_number in range(shard_count):
|
||||
shard = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
env.get_tenant_pageserver(shard).http_client().timeline_compact(
|
||||
shard, timeline_id, force_image_layer_creation=True
|
||||
)
|
||||
|
||||
workload.churn_rows(32)
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
# Invoke GC on a non-zero shard and verify its GC cutoff LSN does not advance
|
||||
shard_one = TenantShardId(tenant_id, 1, shard_count)
|
||||
env.get_tenant_pageserver(shard_one).http_client().timeline_gc(
|
||||
shard_one, timeline_id, gc_horizon=None
|
||||
)
|
||||
|
||||
# Check shard 1's index - GC cutoff LSN should not have advanced
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
shard_1_index = env.pageserver_remote_storage.index_content(
|
||||
tenant_id=shard_one, timeline_id=timeline_id
|
||||
)
|
||||
shard_1_gc_cutoff_lsn = Lsn(shard_1_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
|
||||
log.info(f"Shard 1 cutoff LSN: {shard_1_gc_cutoff_lsn}")
|
||||
assert shard_1_gc_cutoff_lsn <= last_lsn
|
||||
|
||||
shard_zero = TenantShardId(tenant_id, 0, shard_count)
|
||||
env.get_tenant_pageserver(shard_zero).http_client().timeline_gc(
|
||||
shard_zero, timeline_id, gc_horizon=None
|
||||
)
|
||||
|
||||
# TODO: observe that GC LSN of shard 0 has moved forward in remote storage
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
shard_0_index = env.pageserver_remote_storage.index_content(
|
||||
tenant_id=shard_zero, timeline_id=timeline_id
|
||||
)
|
||||
shard_0_gc_cutoff_lsn = Lsn(shard_0_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
|
||||
log.info(f"Shard 0 cutoff LSN: {shard_0_gc_cutoff_lsn}")
|
||||
assert shard_0_gc_cutoff_lsn >= last_lsn
|
||||
|
||||
# Invoke GC on all other shards and verify their GC cutoff LSNs
|
||||
for shard_number in range(1, shard_count):
|
||||
shard = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
env.get_tenant_pageserver(shard).http_client().timeline_gc(
|
||||
shard, timeline_id, gc_horizon=None
|
||||
)
|
||||
|
||||
# Verify GC cutoff LSN advanced to match shard 0
|
||||
shard_index = env.pageserver_remote_storage.index_content(
|
||||
tenant_id=shard, timeline_id=timeline_id
|
||||
)
|
||||
shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
|
||||
log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}")
|
||||
assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn
|
||||
|
||||
Reference in New Issue
Block a user