mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
Compare commits
16 Commits
problame/r
...
jcsp/test-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f943d9c1f | ||
|
|
9b0d02e61d | ||
|
|
6f0046b688 | ||
|
|
2b0248cd76 | ||
|
|
7b03216dca | ||
|
|
992aa91075 | ||
|
|
afe9b27983 | ||
|
|
5d91d4e843 | ||
|
|
2465e9141f | ||
|
|
2526f6aea1 | ||
|
|
5ba7315c84 | ||
|
|
6f7e3c18e4 | ||
|
|
8afb783708 | ||
|
|
1531712555 | ||
|
|
5e989a3148 | ||
|
|
985056be37 |
@@ -275,7 +275,7 @@ jobs:
|
||||
for io_mode in buffered direct direct-rw ; do
|
||||
NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO=$get_vectored_concurrent_io \
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine \
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOMODE=$io_mode \
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE=$io_mode \
|
||||
${cov_prefix} \
|
||||
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
|
||||
done
|
||||
@@ -381,7 +381,7 @@ jobs:
|
||||
run_with_real_s3: true
|
||||
real_s3_bucket: neon-github-ci-tests
|
||||
real_s3_region: eu-central-1
|
||||
rerun_failed: true
|
||||
rerun_failed: false
|
||||
pg_version: ${{ matrix.pg_version }}
|
||||
sanitizers: ${{ inputs.sanitizers }}
|
||||
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
|
||||
@@ -395,7 +395,7 @@ jobs:
|
||||
BUILD_TAG: ${{ inputs.build-tag }}
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
|
||||
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct
|
||||
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct-rw
|
||||
USE_LFC: ${{ matrix.lfc_state == 'with-lfc' && 'true' || 'false' }}
|
||||
|
||||
# Temporary disable this step until we figure out why it's so flaky
|
||||
|
||||
63
.github/workflows/build-macos.yml
vendored
63
.github/workflows/build-macos.yml
vendored
@@ -63,13 +63,8 @@ jobs:
|
||||
|
||||
- name: Cache postgres ${{ matrix.postgres-version }} build
|
||||
id: cache_pg
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: pg_install/${{ matrix.postgres-version }}
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
@@ -134,25 +129,15 @@ jobs:
|
||||
|
||||
- name: Cache postgres v17 build
|
||||
id: cache_pg
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: pg_install/v17
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache walproposer-lib
|
||||
id: cache_walproposer_lib
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: pg_install/build/walproposer-lib
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
@@ -218,57 +203,32 @@ jobs:
|
||||
|
||||
- name: Cache postgres v14 build
|
||||
id: cache_pg
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: pg_install/v14
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
- name: Cache postgres v15 build
|
||||
id: cache_pg_v15
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: pg_install/v15
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
- name: Cache postgres v16 build
|
||||
id: cache_pg_v16
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: pg_install/v16
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
- name: Cache postgres v17 build
|
||||
id: cache_pg_v17
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: pg_install/v17
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
- name: Cache cargo deps (only for v17)
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: |
|
||||
~/.cargo/registry
|
||||
!~/.cargo/registry/src
|
||||
@@ -278,13 +238,8 @@ jobs:
|
||||
|
||||
- name: Cache walproposer-lib
|
||||
id: cache_walproposer_lib
|
||||
uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
|
||||
uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
|
||||
with:
|
||||
endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
|
||||
bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
|
||||
accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
|
||||
secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
|
||||
use-fallback: false
|
||||
path: pg_install/build/walproposer-lib
|
||||
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
|
||||
|
||||
|
||||
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -324,7 +324,7 @@ jobs:
|
||||
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
PAGESERVER_GET_VECTORED_CONCURRENT_IO: sidecar-task
|
||||
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct
|
||||
PAGESERVER_VIRTUAL_FILE_IO_MODE: direct-rw
|
||||
SYNC_BETWEEN_TESTS: true
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1323,7 +1323,6 @@ dependencies = [
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"spki 0.7.3",
|
||||
"tar",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
@@ -4302,6 +4301,7 @@ dependencies = [
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"rpds",
|
||||
"rstest",
|
||||
"rustls 0.23.18",
|
||||
"scopeguard",
|
||||
"send-future",
|
||||
|
||||
@@ -173,7 +173,7 @@ RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v$
|
||||
&& rm -rf protoc.zip protoc
|
||||
|
||||
# s5cmd
|
||||
ENV S5CMD_VERSION=2.2.2
|
||||
ENV S5CMD_VERSION=2.3.0
|
||||
RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/s5cmd_${S5CMD_VERSION}_Linux-$(uname -m | sed 's/x86_64/64bit/g' | sed 's/aarch64/arm64/g').tar.gz" | tar zxvf - s5cmd \
|
||||
&& chmod +x s5cmd \
|
||||
&& mv s5cmd /usr/local/bin/s5cmd
|
||||
@@ -206,7 +206,7 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
|
||||
&& rm awscliv2.zip
|
||||
|
||||
# Mold: A Modern Linker
|
||||
ENV MOLD_VERSION=v2.34.1
|
||||
ENV MOLD_VERSION=v2.37.1
|
||||
RUN set -e \
|
||||
&& git clone https://github.com/rui314/mold.git \
|
||||
&& mkdir mold/build \
|
||||
@@ -268,7 +268,7 @@ WORKDIR /home/nonroot
|
||||
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
|
||||
|
||||
# Python
|
||||
ENV PYTHON_VERSION=3.11.10 \
|
||||
ENV PYTHON_VERSION=3.11.12 \
|
||||
PYENV_ROOT=/home/nonroot/.pyenv \
|
||||
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
|
||||
RUN set -e \
|
||||
@@ -296,12 +296,12 @@ ENV RUSTC_VERSION=1.86.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
ARG CARGO_HAKARI_VERSION=0.9.33
|
||||
ARG CARGO_DENY_VERSION=0.16.2
|
||||
ARG CARGO_HACK_VERSION=0.6.33
|
||||
ARG CARGO_NEXTEST_VERSION=0.9.85
|
||||
ARG CARGO_HAKARI_VERSION=0.9.36
|
||||
ARG CARGO_DENY_VERSION=0.18.2
|
||||
ARG CARGO_HACK_VERSION=0.6.36
|
||||
ARG CARGO_NEXTEST_VERSION=0.9.94
|
||||
ARG CARGO_CHEF_VERSION=0.1.71
|
||||
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
|
||||
ARG CARGO_DIESEL_CLI_VERSION=2.2.9
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
|
||||
chmod +x rustup-init && \
|
||||
./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \
|
||||
|
||||
@@ -44,7 +44,6 @@ serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
spki = { version = "0.7.3", features = ["std"] }
|
||||
tar.workspace = true
|
||||
tower.workspace = true
|
||||
tower-http.workspace = true
|
||||
|
||||
@@ -57,13 +57,24 @@ use tracing::{error, info};
|
||||
use url::Url;
|
||||
use utils::failpoint_support;
|
||||
|
||||
// Compatibility hack: if the control plane specified any remote-ext-config
|
||||
// use the default value for extension storage proxy gateway.
|
||||
// Remove this once the control plane is updated to pass the gateway URL
|
||||
fn parse_remote_ext_config(arg: &str) -> Result<String> {
|
||||
if arg.starts_with("http") {
|
||||
Ok(arg.trim_end_matches('/').to_string())
|
||||
} else {
|
||||
Ok("http://pg-ext-s3-gateway".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(rename_all = "kebab-case")]
|
||||
struct Cli {
|
||||
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
|
||||
pub pgbin: String,
|
||||
|
||||
#[arg(short = 'r', long)]
|
||||
#[arg(short = 'r', long, value_parser = parse_remote_ext_config)]
|
||||
pub remote_ext_config: Option<String>,
|
||||
|
||||
/// The port to bind the external listening HTTP server to. Clients running
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use metrics::core::{AtomicF64, Collector, GenericGauge};
|
||||
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
|
||||
use metrics::proto::MetricFamily;
|
||||
use metrics::{
|
||||
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
|
||||
register_int_gauge_vec, register_uint_gauge_vec,
|
||||
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -81,6 +81,22 @@ pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PG_CURR_DOWNTIME_MS: Lazy<GenericGauge<AtomicF64>> = Lazy::new(|| {
|
||||
register_gauge!(
|
||||
"compute_pg_current_downtime_ms",
|
||||
"Non-cumulative duration of Postgres downtime in ms; resets after successful check",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"compute_pg_downtime_ms_total",
|
||||
"Cumulative duration of Postgres downtime in ms",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
let mut metrics = COMPUTE_CTL_UP.collect();
|
||||
metrics.extend(INSTALLED_EXTENSIONS.collect());
|
||||
@@ -88,5 +104,7 @@ pub fn collect() -> Vec<MetricFamily> {
|
||||
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
|
||||
metrics.extend(DB_MIGRATION_FAILED.collect());
|
||||
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
|
||||
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
|
||||
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
|
||||
metrics
|
||||
}
|
||||
|
||||
@@ -6,197 +6,294 @@ use chrono::{DateTime, Utc};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::ComputeFeature;
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{Level, error, info, instrument, span};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
|
||||
|
||||
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
|
||||
|
||||
// Spin in a loop and figure out the last activity time in the Postgres.
|
||||
// Then update it in the shared state. This function never errors out.
|
||||
// NB: the only expected panic is at `Mutex` unwrap(), all other errors
|
||||
// should be handled gracefully.
|
||||
fn watch_compute_activity(compute: &ComputeNode) {
|
||||
// Suppose that `connstr` doesn't change
|
||||
let connstr = compute.params.connstr.clone();
|
||||
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
|
||||
struct ComputeMonitor {
|
||||
compute: Arc<ComputeNode>,
|
||||
|
||||
// During startup and configuration we connect to every Postgres database,
|
||||
// but we don't want to count this as some user activity. So wait until
|
||||
// the compute fully started before monitoring activity.
|
||||
wait_for_postgres_start(compute);
|
||||
/// The moment when Postgres had some activity,
|
||||
/// that should prevent compute from being suspended.
|
||||
last_active: Option<DateTime<Utc>>,
|
||||
|
||||
// Define `client` outside of the loop to reuse existing connection if it's active.
|
||||
let mut client = conf.connect(NoTls);
|
||||
/// The moment when we last tried to check Postgres.
|
||||
last_checked: DateTime<Utc>,
|
||||
/// The last moment we did a successful Postgres check.
|
||||
last_up: DateTime<Utc>,
|
||||
|
||||
let mut sleep = false;
|
||||
let mut prev_active_time: Option<f64> = None;
|
||||
let mut prev_sessions: Option<i64> = None;
|
||||
/// Only used for internal statistics change tracking
|
||||
/// between monitor runs and can be outdated.
|
||||
active_time: Option<f64>,
|
||||
/// Only used for internal statistics change tracking
|
||||
/// between monitor runs and can be outdated.
|
||||
sessions: Option<i64>,
|
||||
|
||||
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
|
||||
info!("starting experimental activity monitor for {}", connstr);
|
||||
} else {
|
||||
info!("starting activity monitor for {}", connstr);
|
||||
/// Use experimental statistics-based activity monitor. It's no longer
|
||||
/// 'experimental' per se, as it's enabled for everyone, but we still
|
||||
/// keep the flag as an option to turn it off in some cases if it will
|
||||
/// misbehave.
|
||||
experimental: bool,
|
||||
}
|
||||
|
||||
impl ComputeMonitor {
|
||||
fn report_down(&self) {
|
||||
let now = Utc::now();
|
||||
|
||||
// Calculate and report current downtime
|
||||
// (since the last time Postgres was up)
|
||||
let downtime = now.signed_duration_since(self.last_up);
|
||||
PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
|
||||
|
||||
// Calculate and update total downtime
|
||||
// (cumulative duration of Postgres downtime in ms)
|
||||
let inc = now
|
||||
.signed_duration_since(self.last_checked)
|
||||
.num_milliseconds();
|
||||
PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
|
||||
}
|
||||
|
||||
loop {
|
||||
// We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
|
||||
// But skip the first sleep, so we can connect to Postgres immediately.
|
||||
if sleep {
|
||||
// Should be outside of the mutex lock to allow others to read while we sleep.
|
||||
thread::sleep(MONITOR_CHECK_INTERVAL);
|
||||
} else {
|
||||
sleep = true;
|
||||
}
|
||||
fn report_up(&mut self) {
|
||||
self.last_up = Utc::now();
|
||||
PG_CURR_DOWNTIME_MS.set(0.0);
|
||||
}
|
||||
|
||||
match &mut client {
|
||||
Ok(cli) => {
|
||||
if cli.is_closed() {
|
||||
info!("connection to Postgres is closed, trying to reconnect");
|
||||
fn downtime_info(&self) -> String {
|
||||
format!(
|
||||
"total_ms: {}, current_ms: {}, last_up: {}",
|
||||
PG_TOTAL_DOWNTIME_MS.get(),
|
||||
PG_CURR_DOWNTIME_MS.get(),
|
||||
self.last_up
|
||||
)
|
||||
}
|
||||
|
||||
// Connection is closed, reconnect and try again.
|
||||
client = conf.connect(NoTls);
|
||||
continue;
|
||||
}
|
||||
/// Spin in a loop and figure out the last activity time in the Postgres.
|
||||
/// Then update it in the shared state. This function never errors out.
|
||||
/// NB: the only expected panic is at `Mutex` unwrap(), all other errors
|
||||
/// should be handled gracefully.
|
||||
#[instrument(skip_all)]
|
||||
pub fn run(&mut self) {
|
||||
// Suppose that `connstr` doesn't change
|
||||
let connstr = self.compute.params.connstr.clone();
|
||||
let conf = self
|
||||
.compute
|
||||
.get_conn_conf(Some("compute_ctl:compute_monitor"));
|
||||
|
||||
// This is a new logic, only enable if the feature flag is set.
|
||||
// TODO: remove this once we are sure that it works OR drop it altogether.
|
||||
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
|
||||
// First, check if the total active time or sessions across all databases has changed.
|
||||
// If it did, it means that user executed some queries. In theory, it can even go down if
|
||||
// some databases were dropped, but it's still a user activity.
|
||||
match get_database_stats(cli) {
|
||||
Ok((active_time, sessions)) => {
|
||||
let mut detected_activity = false;
|
||||
// During startup and configuration we connect to every Postgres database,
|
||||
// but we don't want to count this as some user activity. So wait until
|
||||
// the compute fully started before monitoring activity.
|
||||
wait_for_postgres_start(&self.compute);
|
||||
|
||||
prev_active_time = match prev_active_time {
|
||||
Some(prev_active_time) => {
|
||||
if active_time != prev_active_time {
|
||||
detected_activity = true;
|
||||
}
|
||||
Some(active_time)
|
||||
}
|
||||
None => Some(active_time),
|
||||
};
|
||||
prev_sessions = match prev_sessions {
|
||||
Some(prev_sessions) => {
|
||||
if sessions != prev_sessions {
|
||||
detected_activity = true;
|
||||
}
|
||||
Some(sessions)
|
||||
}
|
||||
None => Some(sessions),
|
||||
};
|
||||
// Define `client` outside of the loop to reuse existing connection if it's active.
|
||||
let mut client = conf.connect(NoTls);
|
||||
|
||||
if detected_activity {
|
||||
// Update the last active time and continue, we don't need to
|
||||
// check backends state change.
|
||||
compute.update_last_active(Some(Utc::now()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("could not get database statistics: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("starting compute monitor for {}", connstr);
|
||||
|
||||
// Second, if database statistics is the same, check all backends state change,
|
||||
// maybe there is some with more recent activity. `get_backends_state_change()`
|
||||
// can return None or stale timestamp, so it's `compute.update_last_active()`
|
||||
// responsibility to check if the new timestamp is more recent than the current one.
|
||||
// This helps us to discover new sessions, that did nothing yet.
|
||||
match get_backends_state_change(cli) {
|
||||
Ok(last_active) => {
|
||||
compute.update_last_active(last_active);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("could not get backends state change: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, if there are existing (logical) walsenders, do not suspend.
|
||||
//
|
||||
// walproposer doesn't currently show up in pg_stat_replication,
|
||||
// but protect if it will be
|
||||
let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
|
||||
match cli.query_one(ws_count_query, &[]) {
|
||||
Ok(r) => match r.try_get::<&str, i64>("count") {
|
||||
Ok(num_ws) => {
|
||||
if num_ws > 0 {
|
||||
compute.update_last_active(Some(Utc::now()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse walsenders count: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("failed to get list of walsenders: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
//
|
||||
// Don't suspend compute if there is an active logical replication subscription
|
||||
//
|
||||
// `where pid is not null` – to filter out read only computes and subscription on branches
|
||||
//
|
||||
let logical_subscriptions_query =
|
||||
"select count(*) from pg_stat_subscription where pid is not null;";
|
||||
match cli.query_one(logical_subscriptions_query, &[]) {
|
||||
Ok(row) => match row.try_get::<&str, i64>("count") {
|
||||
Ok(num_subscribers) => {
|
||||
if num_subscribers > 0 {
|
||||
compute.update_last_active(Some(Utc::now()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"failed to get list of active logical replication subscriptions: {:?}",
|
||||
e
|
||||
loop {
|
||||
match &mut client {
|
||||
Ok(cli) => {
|
||||
if cli.is_closed() {
|
||||
info!(
|
||||
downtime_info = self.downtime_info(),
|
||||
"connection to Postgres is closed, trying to reconnect"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
//
|
||||
// Do not suspend compute if autovacuum is running
|
||||
//
|
||||
let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
|
||||
match cli.query_one(autovacuum_count_query, &[]) {
|
||||
Ok(r) => match r.try_get::<&str, i64>("count") {
|
||||
Ok(num_workers) => {
|
||||
if num_workers > 0 {
|
||||
compute.update_last_active(Some(Utc::now()));
|
||||
continue;
|
||||
self.report_down();
|
||||
|
||||
// Connection is closed, reconnect and try again.
|
||||
client = conf.connect(NoTls);
|
||||
} else {
|
||||
match self.check(cli) {
|
||||
Ok(_) => {
|
||||
self.report_up();
|
||||
self.compute.update_last_active(self.last_active);
|
||||
}
|
||||
Err(e) => {
|
||||
// Although we have many places where we can return errors in `check()`,
|
||||
// normally it shouldn't happen. I.e., we will likely return error if
|
||||
// connection got broken, query timed out, Postgres returned invalid data, etc.
|
||||
// In all such cases it's suspicious, so let's report this as downtime.
|
||||
self.report_down();
|
||||
error!(
|
||||
downtime_info = self.downtime_info(),
|
||||
"could not check Postgres: {}", e
|
||||
);
|
||||
|
||||
// Reconnect to Postgres just in case. During tests, I noticed
|
||||
// that queries in `check()` can fail with `connection closed`,
|
||||
// but `cli.is_closed()` above doesn't detect it. Even if old
|
||||
// connection is still alive, it will be dropped when we reassign
|
||||
// `client` to a new connection.
|
||||
client = conf.connect(NoTls);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse autovacuum workers count: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("failed to get list of autovacuum workers: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("could not connect to Postgres: {}, retrying", e);
|
||||
Err(e) => {
|
||||
info!(
|
||||
downtime_info = self.downtime_info(),
|
||||
"could not connect to Postgres: {}, retrying", e
|
||||
);
|
||||
self.report_down();
|
||||
|
||||
// Establish a new connection and try again.
|
||||
client = conf.connect(NoTls);
|
||||
// Establish a new connection and try again.
|
||||
client = conf.connect(NoTls);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the `last_checked` timestamp and sleep before the next iteration.
|
||||
self.last_checked = Utc::now();
|
||||
thread::sleep(MONITOR_CHECK_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
|
||||
// This is new logic, only enable if the feature flag is set.
|
||||
// TODO: remove this once we are sure that it works OR drop it altogether.
|
||||
if self.experimental {
|
||||
// Check if the total active time or sessions across all databases has changed.
|
||||
// If it did, it means that user executed some queries. In theory, it can even go down if
|
||||
// some databases were dropped, but it's still user activity.
|
||||
match get_database_stats(cli) {
|
||||
Ok((active_time, sessions)) => {
|
||||
let mut detected_activity = false;
|
||||
|
||||
if let Some(prev_active_time) = self.active_time {
|
||||
if active_time != prev_active_time {
|
||||
detected_activity = true;
|
||||
}
|
||||
}
|
||||
self.active_time = Some(active_time);
|
||||
|
||||
if let Some(prev_sessions) = self.sessions {
|
||||
if sessions != prev_sessions {
|
||||
detected_activity = true;
|
||||
}
|
||||
}
|
||||
self.sessions = Some(sessions);
|
||||
|
||||
if detected_activity {
|
||||
// Update the last active time and continue, we don't need to
|
||||
// check backends state change.
|
||||
self.last_active = Some(Utc::now());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!("could not get database statistics: {}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If database statistics are the same, check all backends for state changes.
|
||||
// Maybe there are some with more recent activity. `get_backends_state_change()`
|
||||
// can return None or stale timestamp, so it's `compute.update_last_active()`
|
||||
// responsibility to check if the new timestamp is more recent than the current one.
|
||||
// This helps us to discover new sessions that have not done anything yet.
|
||||
match get_backends_state_change(cli) {
|
||||
Ok(last_active) => match (last_active, self.last_active) {
|
||||
(Some(last_active), Some(prev_last_active)) => {
|
||||
if last_active > prev_last_active {
|
||||
self.last_active = Some(last_active);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
(Some(last_active), None) => {
|
||||
self.last_active = Some(last_active);
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"could not get backends state change: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// If there are existing (logical) walsenders, do not suspend.
|
||||
//
|
||||
// N.B. walproposer doesn't currently show up in pg_stat_replication,
|
||||
// but protect if it will.
|
||||
const WS_COUNT_QUERY: &str =
|
||||
"select count(*) from pg_stat_replication where application_name != 'walproposer';";
|
||||
match cli.query_one(WS_COUNT_QUERY, &[]) {
|
||||
Ok(r) => match r.try_get::<&str, i64>("count") {
|
||||
Ok(num_ws) => {
|
||||
if num_ws > 0 {
|
||||
self.last_active = Some(Utc::now());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let err: anyhow::Error = e.into();
|
||||
return Err(err.context("failed to parse walsenders count"));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
// Don't suspend compute if there is an active logical replication subscription
|
||||
//
|
||||
// `where pid is not null` – to filter out read only computes and subscription on branches
|
||||
const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
|
||||
"select count(*) from pg_stat_subscription where pid is not null;";
|
||||
match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
|
||||
Ok(row) => match row.try_get::<&str, i64>("count") {
|
||||
Ok(num_subscribers) => {
|
||||
if num_subscribers > 0 {
|
||||
self.last_active = Some(Utc::now());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to parse 'pg_stat_subscription' count: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to get list of active logical replication subscriptions: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Do not suspend compute if autovacuum is running
|
||||
const AUTOVACUUM_COUNT_QUERY: &str =
|
||||
"select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
|
||||
match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
|
||||
Ok(r) => match r.try_get::<&str, i64>("count") {
|
||||
Ok(num_workers) => {
|
||||
if num_workers > 0 {
|
||||
self.last_active = Some(Utc::now());
|
||||
return Ok(());
|
||||
};
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to parse autovacuum workers count: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to get list of autovacuum workers: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,9 +412,24 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
|
||||
/// Launch a separate compute monitor thread and return its `JoinHandle`.
|
||||
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
|
||||
let compute = Arc::clone(compute);
|
||||
let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
|
||||
let now = Utc::now();
|
||||
let mut monitor = ComputeMonitor {
|
||||
compute,
|
||||
last_active: None,
|
||||
last_checked: now,
|
||||
last_up: now,
|
||||
active_time: None,
|
||||
sessions: None,
|
||||
experimental,
|
||||
};
|
||||
|
||||
let span = span!(Level::INFO, "compute_monitor");
|
||||
thread::Builder::new()
|
||||
.name("compute-monitor".into())
|
||||
.spawn(move || watch_compute_activity(&compute))
|
||||
.spawn(move || {
|
||||
let _enter = span.enter();
|
||||
monitor.run();
|
||||
})
|
||||
.expect("cannot launch compute monitor thread")
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::{io::Write, os::unix::fs::OpenOptionsExt, path::Path, time::Duration};
|
||||
use anyhow::{Context, Result, bail};
|
||||
use compute_api::responses::TlsConfig;
|
||||
use ring::digest;
|
||||
use spki::der::{Decode, PemReader};
|
||||
use x509_cert::Certificate;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -52,7 +51,7 @@ pub fn update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) {
|
||||
match try_update_key_path_blocking(pg_data, tls_config) {
|
||||
Ok(()) => break,
|
||||
Err(e) => {
|
||||
tracing::error!("could not create key file {e:?}");
|
||||
tracing::error!(error = ?e, "could not create key file");
|
||||
std::thread::sleep(Duration::from_secs(1))
|
||||
}
|
||||
}
|
||||
@@ -92,8 +91,14 @@ fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Resul
|
||||
fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
|
||||
use x509_cert::der::oid::db::rfc5912::ECDSA_WITH_SHA_256;
|
||||
|
||||
let cert = Certificate::decode(&mut PemReader::new(cert.as_bytes()).context("pem reader")?)
|
||||
.context("decode cert")?;
|
||||
let certs = Certificate::load_pem_chain(cert.as_bytes())
|
||||
.context("decoding PEM encoded certificates")?;
|
||||
|
||||
// First certificate is our server-cert,
|
||||
// all the rest of the certs are the CA cert chain.
|
||||
let Some(cert) = certs.first() else {
|
||||
bail!("no certificates found");
|
||||
};
|
||||
|
||||
match cert.signature_algorithm.oid {
|
||||
ECDSA_WITH_SHA_256 => {
|
||||
@@ -115,3 +120,82 @@ fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::verify_key_cert;
|
||||
|
||||
/// Real certificate chain file, generated by cert-manager in dev.
|
||||
/// The server auth certificate has expired since 2025-04-24T15:41:35Z.
|
||||
const CERT: &str = "
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICCDCCAa+gAwIBAgIQKhLomFcNULbZA/bPdGzaSzAKBggqhkjOPQQDAjBEMQsw
|
||||
CQYDVQQGEwJVUzESMBAGA1UEChMJTmVvbiBJbmMuMSEwHwYDVQQDExhOZW9uIEs4
|
||||
cyBJbnRlcm1lZGlhdGUgQ0EwHhcNMjUwNDIzMTU0MTM1WhcNMjUwNDI0MTU0MTM1
|
||||
WjBBMT8wPQYDVQQDEzZjb21wdXRlLXdpc3B5LWdyYXNzLXcwY21laWp3LmRlZmF1
|
||||
bHQuc3ZjLmNsdXN0ZXIubG9jYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAATF
|
||||
QCcG2m/EVHAiZtSsYgVnHgoTjUL/Jtwfdrpvz2t0bVRZmBmSKhlo53uPV9Y5eKFG
|
||||
AmR54p9/gT2eO3xU7vAgo4GFMIGCMA4GA1UdDwEB/wQEAwIFoDAMBgNVHRMBAf8E
|
||||
AjAAMB8GA1UdIwQYMBaAFFR2JAhXkeiNQNEixTvAYIwxUu3QMEEGA1UdEQQ6MDiC
|
||||
NmNvbXB1dGUtd2lzcHktZ3Jhc3MtdzBjbWVpancuZGVmYXVsdC5zdmMuY2x1c3Rl
|
||||
ci5sb2NhbDAKBggqhkjOPQQDAgNHADBEAiBLG22wKG8XS9e9RxBT+kmUx/kIThcP
|
||||
DIpp7jx0PrFcdQIgEMTdnXpx5Cv/Z0NIEDxtMHUD7G0vuRPfztki36JuakM=
|
||||
-----END CERTIFICATE-----
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIICFzCCAb6gAwIBAgIUbbX98N2Ip6lWAONRk8dU9hSz+YIwCgYIKoZIzj0EAwIw
|
||||
RDELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UEAxMYTmVv
|
||||
biBBV1MgSW50ZXJtZWRpYXRlIENBMB4XDTI1MDQyMjE1MTAxMFoXDTI1MDcyMTE1
|
||||
MTAxMFowRDELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UE
|
||||
AxMYTmVvbiBLOHMgSW50ZXJtZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0D
|
||||
AQcDQgAE5++m5owqNI4BPMTVNIUQH0qvU7pYhdpHGVGhdj/Lgars6ROvE6uSNQV4
|
||||
SAmJN5HBzj5/6kLQaTPWpXW7EHXjK6OBjTCBijAOBgNVHQ8BAf8EBAMCAQYwEgYD
|
||||
VR0TAQH/BAgwBgEB/wIBADAdBgNVHQ4EFgQUVHYkCFeR6I1A0SLFO8BgjDFS7dAw
|
||||
HwYDVR0jBBgwFoAUgHfNXfyKtHO0V9qoLOWCjkNiaI8wJAYDVR0eAQH/BBowGKAW
|
||||
MBSCEi5zdmMuY2x1c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNHADBEAiBObVFFdXaL
|
||||
QpOXmN60dYUNnQRwjKreFduEkQgOdOlssgIgVAdJJQFgvlrvEOBhY8j5WyeKRwUN
|
||||
k/ALs6KpgaFBCGY=
|
||||
-----END CERTIFICATE-----
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIB4jCCAYegAwIBAgIUFlxWFn/11yoGdmD+6gf+yQMToS0wCgYIKoZIzj0EAwIw
|
||||
ODELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEVMBMGA1UEAxMMTmVv
|
||||
biBSb290IENBMB4XDTI1MDQwMzA3MTUyMloXDTI2MDQwMzA3MTUyMlowRDELMAkG
|
||||
A1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UEAxMYTmVvbiBBV1Mg
|
||||
SW50ZXJtZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEqonG/IQ6
|
||||
ZxtEtOUTkkoNopPieXDO5CBKUkNFTGeJEB7OxRlSpYJgsBpaYIaD6Vc4sVk3thIF
|
||||
p+pLw52idQOIN6NjMGEwDgYDVR0PAQH/BAQDAgEGMA8GA1UdEwEB/wQFMAMBAf8w
|
||||
HQYDVR0OBBYEFIB3zV38irRztFfaqCzlgo5DYmiPMB8GA1UdIwQYMBaAFKh7M4/G
|
||||
FHvr/ORDQZt4bMLlJvHCMAoGCCqGSM49BAMCA0kAMEYCIQCbS4x7QPslONzBYbjC
|
||||
UQaQ0QLDW4CJHvQ4u4gbWFG87wIhAJMsHQHjP9qTT27Q65zQCR7O8QeLAfha1jrH
|
||||
Ag/LsxSr
|
||||
-----END CERTIFICATE-----
|
||||
";
|
||||
|
||||
/// The key corresponding to [`CERT`]
|
||||
const KEY: &str = "
|
||||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIDnAnrqmIJjndCLWP1iIO5X3X63Aia48TGpGuMXwvm6IoAoGCCqGSM49
|
||||
AwEHoUQDQgAExUAnBtpvxFRwImbUrGIFZx4KE41C/ybcH3a6b89rdG1UWZgZkioZ
|
||||
aOd7j1fWOXihRgJkeeKff4E9njt8VO7wIA==
|
||||
-----END EC PRIVATE KEY-----
|
||||
";
|
||||
|
||||
/// An incorrect key.
|
||||
const INCORRECT_KEY: &str = "
|
||||
-----BEGIN EC PRIVATE KEY-----
|
||||
MHcCAQEEIL6WqqBDyvM0HWz7Ir5M5+jhFWB7IzOClGn26OPrzHCXoAoGCCqGSM49
|
||||
AwEHoUQDQgAE7XVvdOy5lfwtNKb+gJEUtnG+DrnnXLY5LsHDeGQKV9PTRcEMeCrG
|
||||
YZzHyML4P6Sr4yi2ts+4B9i47uvAG8+XwQ==
|
||||
-----END EC PRIVATE KEY-----
|
||||
";
|
||||
|
||||
#[test]
|
||||
fn certificate_verification() {
|
||||
verify_key_cert(KEY, CERT).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "private key file does not match certificate")]
|
||||
fn certificate_verification_fail() {
|
||||
verify_key_cert(INCORRECT_KEY, CERT).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,21 +9,20 @@
|
||||
# to verify custom image builds (e.g pre-published ones).
|
||||
#
|
||||
# A test script for postgres extensions
|
||||
# Currently supports only v16
|
||||
# Currently supports only v16+
|
||||
#
|
||||
set -eux -o pipefail
|
||||
|
||||
COMPOSE_FILE='docker-compose.yml'
|
||||
cd $(dirname $0)
|
||||
COMPUTE_CONTAINER_NAME=docker-compose-compute-1
|
||||
TEST_CONTAINER_NAME=docker-compose-neon-test-extensions-1
|
||||
export COMPOSE_FILE='docker-compose.yml'
|
||||
export COMPOSE_PROFILES=test-extensions
|
||||
cd "$(dirname "${0}")"
|
||||
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"
|
||||
|
||||
cleanup() {
|
||||
function cleanup() {
|
||||
echo "show container information"
|
||||
docker ps
|
||||
echo "stop containers..."
|
||||
docker compose --profile test-extensions -f $COMPOSE_FILE down
|
||||
docker compose down
|
||||
}
|
||||
|
||||
for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
@@ -31,55 +30,55 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
|
||||
echo "clean up containers if exists"
|
||||
cleanup
|
||||
PG_TEST_VERSION=$((pg_version < 16 ? 16 : pg_version))
|
||||
PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose --profile test-extensions -f $COMPOSE_FILE up --quiet-pull --build -d
|
||||
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose up --quiet-pull --build -d
|
||||
|
||||
echo "wait until the compute is ready. timeout after 60s. "
|
||||
cnt=0
|
||||
while sleep 3; do
|
||||
# check timeout
|
||||
cnt=`expr $cnt + 3`
|
||||
if [ $cnt -gt 60 ]; then
|
||||
(( cnt += 3 ))
|
||||
if [[ ${cnt} -gt 60 ]]; then
|
||||
echo "timeout before the compute is ready."
|
||||
exit 1
|
||||
fi
|
||||
if docker compose --profile test-extensions -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
|
||||
if docker compose logs "compute_is_ready" | grep -q "accepting connections"; then
|
||||
echo "OK. The compute is ready to connect."
|
||||
echo "execute simple queries."
|
||||
docker exec $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"
|
||||
docker compose exec compute /bin/bash -c "psql ${PSQL_OPTION} -c 'SELECT 1'"
|
||||
break
|
||||
fi
|
||||
done
|
||||
|
||||
if [ $pg_version -ge 16 ]; then
|
||||
if [[ ${pg_version} -ge 16 ]]; then
|
||||
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
|
||||
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
|
||||
echo Adding dummy config
|
||||
docker exec $COMPUTE_CONTAINER_NAME touch /var/db/postgres/compute/compute_ctl_temp_override.conf
|
||||
docker compose exec compute touch /var/db/postgres/compute/compute_ctl_temp_override.conf
|
||||
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
|
||||
TMPDIR=$(mktemp -d)
|
||||
docker cp $TEST_CONTAINER_NAME:/ext-src/pg_hint_plan-src/data $TMPDIR/data
|
||||
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/ext-src/pg_hint_plan-src/
|
||||
rm -rf $TMPDIR
|
||||
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${TMPDIR}/data"
|
||||
docker compose cp "${TMPDIR}/data" compute:/ext-src/pg_hint_plan-src/
|
||||
rm -rf "${TMPDIR}"
|
||||
# The following block does the same for the contrib/file_fdw test
|
||||
TMPDIR=$(mktemp -d)
|
||||
docker cp $TEST_CONTAINER_NAME:/postgres/contrib/file_fdw/data $TMPDIR/data
|
||||
docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/postgres/contrib/file_fdw/data
|
||||
rm -rf $TMPDIR
|
||||
docker compose cp neon-test-extensions:/postgres/contrib/file_fdw/data "${TMPDIR}/data"
|
||||
docker compose cp "${TMPDIR}/data" compute:/postgres/contrib/file_fdw/data
|
||||
rm -rf "${TMPDIR}"
|
||||
# Apply patches
|
||||
cat ../compute/patches/contrib_pg${pg_version}.patch | docker exec -i $TEST_CONTAINER_NAME bash -c "(cd /postgres && patch -p1)"
|
||||
docker compose exec -i neon-test-extensions bash -c "(cd /postgres && patch -p1)" <"../compute/patches/contrib_pg${pg_version}.patch"
|
||||
# We are running tests now
|
||||
rm -f testout.txt testout_contrib.txt
|
||||
docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
|
||||
$TEST_CONTAINER_NAME /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
|
||||
docker exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
|
||||
$TEST_CONTAINER_NAME /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
|
||||
if [ $EXT_SUCCESS -eq 0 ] || [ $CONTRIB_SUCCESS -eq 0 ]; then
|
||||
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
|
||||
neon-test-extensions /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
|
||||
docker compose exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
|
||||
neon-test-extensions /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
|
||||
if [[ ${EXT_SUCCESS} -eq 0 || ${CONTRIB_SUCCESS} -eq 0 ]]; then
|
||||
CONTRIB_FAILED=
|
||||
FAILED=
|
||||
[ $EXT_SUCCESS -eq 0 ] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
|
||||
[ $CONTRIB_SUCCESS -eq 0 ] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
|
||||
for d in $FAILED $CONTRIB_FAILED; do
|
||||
docker exec $TEST_CONTAINER_NAME bash -c 'for file in $(find '"$d"' -name regression.diffs -o -name regression.out); do cat $file; done' || [ $? -eq 1 ]
|
||||
[[ ${EXT_SUCCESS} -eq 0 ]] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
|
||||
[[ ${CONTRIB_SUCCESS} -eq 0 ]] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
|
||||
for d in ${FAILED} ${CONTRIB_FAILED}; do
|
||||
docker compose exec neon-test-extensions bash -c 'for file in $(find '"${d}"' -name regression.diffs -o -name regression.out); do cat ${file}; done' || [[ ${?} -eq 1 ]]
|
||||
done
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -169,6 +169,8 @@ pub struct TenantDescribeResponseShard {
|
||||
pub is_pending_compute_notification: bool,
|
||||
/// A shard split is currently underway
|
||||
pub is_splitting: bool,
|
||||
/// A timeline is being imported into this tenant
|
||||
pub is_importing: bool,
|
||||
|
||||
pub scheduling_policy: ShardSchedulingPolicy,
|
||||
|
||||
|
||||
@@ -1803,6 +1803,8 @@ pub struct TopTenantShardsResponse {
|
||||
}
|
||||
|
||||
pub mod virtual_file {
|
||||
use std::sync::LazyLock;
|
||||
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
@@ -1840,35 +1842,33 @@ pub mod virtual_file {
|
||||
pub enum IoMode {
|
||||
/// Uses buffered IO.
|
||||
Buffered,
|
||||
/// Uses direct IO, error out if the operation fails.
|
||||
/// Uses direct IO for reads only.
|
||||
#[cfg(target_os = "linux")]
|
||||
Direct,
|
||||
/// Use direct IO for reads and writes.
|
||||
#[cfg(target_os = "linux")]
|
||||
DirectRw,
|
||||
}
|
||||
|
||||
impl IoMode {
|
||||
pub fn preferred() -> Self {
|
||||
// The default behavior when running Rust unit tests without any further
|
||||
// flags is to use the newest behavior if available on the platform (Direct).
|
||||
// flags is to use the newest behavior (DirectRw).
|
||||
// The CI uses the following environment variable to unit tests for all
|
||||
// different modes.
|
||||
// NB: the Python regression & perf tests have their own defaults management
|
||||
// that writes pageserver.toml; they do not use this variable.
|
||||
if cfg!(test) {
|
||||
use once_cell::sync::Lazy;
|
||||
static CACHED: Lazy<IoMode> = Lazy::new(|| {
|
||||
static CACHED: LazyLock<IoMode> = LazyLock::new(|| {
|
||||
utils::env::var_serde_json_string(
|
||||
"NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IO_MODE",
|
||||
)
|
||||
.unwrap_or({
|
||||
.unwrap_or(
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
IoMode::Direct
|
||||
}
|
||||
IoMode::DirectRw,
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
IoMode::Buffered
|
||||
}
|
||||
})
|
||||
IoMode::Buffered,
|
||||
)
|
||||
});
|
||||
*CACHED
|
||||
} else {
|
||||
@@ -1885,6 +1885,8 @@ pub mod virtual_file {
|
||||
v if v == (IoMode::Buffered as u8) => IoMode::Buffered,
|
||||
#[cfg(target_os = "linux")]
|
||||
v if v == (IoMode::Direct as u8) => IoMode::Direct,
|
||||
#[cfg(target_os = "linux")]
|
||||
v if v == (IoMode::DirectRw as u8) => IoMode::DirectRw,
|
||||
x => return Err(x),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -106,6 +106,7 @@ hex-literal.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }
|
||||
indoc.workspace = true
|
||||
uuid.workspace = true
|
||||
rstest.workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "bench_layer_map"
|
||||
|
||||
@@ -61,7 +61,7 @@ async fn ingest(
|
||||
|
||||
tokio::fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id)).await?;
|
||||
|
||||
let ctx2 =
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error).with_scope_debug_tools();
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
@@ -248,6 +248,8 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
IoMode::Buffered,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::Direct,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::DirectRw,
|
||||
] {
|
||||
for param in expect.clone() {
|
||||
let HandPickedParameters {
|
||||
@@ -309,78 +311,114 @@ cargo bench --bench bench_ingest
|
||||
im4gn.2xlarge:
|
||||
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.8491 s 1.8540 s 1.8592 s]
|
||||
thrpt: [68.847 MiB/s 69.039 MiB/s 69.222 MiB/s]
|
||||
time: [1.2901 s 1.2943 s 1.2991 s]
|
||||
thrpt: [98.533 MiB/s 98.892 MiB/s 99.220 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [2.6976 s 2.7123 s 2.7286 s]
|
||||
thrpt: [46.911 MiB/s 47.193 MiB/s 47.450 MiB/s]
|
||||
time: [2.1387 s 2.1623 s 2.1845 s]
|
||||
thrpt: [58.595 MiB/s 59.197 MiB/s 59.851 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
|
||||
time: [1.7433 s 1.7510 s 1.7600 s]
|
||||
thrpt: [72.729 MiB/s 73.099 MiB/s 73.423 MiB/s]
|
||||
time: [1.2036 s 1.2074 s 1.2122 s]
|
||||
thrpt: [105.60 MiB/s 106.01 MiB/s 106.35 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [499.63 ms 500.07 ms 500.46 ms]
|
||||
thrpt: [255.77 MiB/s 255.96 MiB/s 256.19 MiB/s]
|
||||
time: [520.55 ms 521.46 ms 522.57 ms]
|
||||
thrpt: [244.94 MiB/s 245.47 MiB/s 245.89 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [456.97 ms 459.61 ms 461.92 ms]
|
||||
thrpt: [277.11 MiB/s 278.50 MiB/s 280.11 MiB/s]
|
||||
time: [440.33 ms 442.24 ms 444.10 ms]
|
||||
thrpt: [288.22 MiB/s 289.43 MiB/s 290.69 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [158.82 ms 159.16 ms 159.56 ms]
|
||||
thrpt: [802.22 MiB/s 804.24 MiB/s 805.93 MiB/s]
|
||||
time: [168.78 ms 169.42 ms 170.18 ms]
|
||||
thrpt: [752.16 MiB/s 755.52 MiB/s 758.40 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.8856 s 1.8997 s 1.9179 s]
|
||||
thrpt: [66.740 MiB/s 67.380 MiB/s 67.882 MiB/s]
|
||||
time: [1.2978 s 1.3094 s 1.3227 s]
|
||||
thrpt: [96.775 MiB/s 97.758 MiB/s 98.632 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [2.7468 s 2.7625 s 2.7785 s]
|
||||
thrpt: [46.068 MiB/s 46.335 MiB/s 46.600 MiB/s]
|
||||
time: [2.1976 s 2.2067 s 2.2154 s]
|
||||
thrpt: [57.777 MiB/s 58.006 MiB/s 58.245 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
|
||||
time: [1.7689 s 1.7726 s 1.7767 s]
|
||||
thrpt: [72.045 MiB/s 72.208 MiB/s 72.363 MiB/s]
|
||||
time: [1.2103 s 1.2160 s 1.2233 s]
|
||||
thrpt: [104.64 MiB/s 105.26 MiB/s 105.76 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [497.64 ms 498.60 ms 499.67 ms]
|
||||
thrpt: [256.17 MiB/s 256.72 MiB/s 257.21 MiB/s]
|
||||
time: [525.05 ms 526.37 ms 527.79 ms]
|
||||
thrpt: [242.52 MiB/s 243.17 MiB/s 243.79 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [493.72 ms 505.07 ms 518.03 ms]
|
||||
thrpt: [247.09 MiB/s 253.43 MiB/s 259.26 MiB/s]
|
||||
time: [443.06 ms 444.88 ms 447.15 ms]
|
||||
thrpt: [286.26 MiB/s 287.72 MiB/s 288.90 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [267.76 ms 267.85 ms 267.96 ms]
|
||||
thrpt: [477.69 MiB/s 477.88 MiB/s 478.03 MiB/s]
|
||||
time: [169.40 ms 169.80 ms 170.17 ms]
|
||||
thrpt: [752.21 MiB/s 753.81 MiB/s 755.60 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.2844 s 1.2915 s 1.2990 s]
|
||||
thrpt: [98.536 MiB/s 99.112 MiB/s 99.657 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [2.1431 s 2.1663 s 2.1900 s]
|
||||
thrpt: [58.446 MiB/s 59.087 MiB/s 59.726 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
|
||||
time: [1.1906 s 1.1926 s 1.1947 s]
|
||||
thrpt: [107.14 MiB/s 107.33 MiB/s 107.51 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [516.86 ms 518.25 ms 519.47 ms]
|
||||
thrpt: [246.40 MiB/s 246.98 MiB/s 247.65 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [536.50 ms 536.53 ms 536.60 ms]
|
||||
thrpt: [238.54 MiB/s 238.57 MiB/s 238.59 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [267.77 ms 267.90 ms 268.04 ms]
|
||||
thrpt: [477.53 MiB/s 477.79 MiB/s 478.02 MiB/s]
|
||||
|
||||
Hetzner AX102:
|
||||
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.0683 s 1.1006 s 1.1386 s]
|
||||
thrpt: [112.42 MiB/s 116.30 MiB/s 119.82 MiB/s]
|
||||
time: [836.58 ms 861.93 ms 886.57 ms]
|
||||
thrpt: [144.38 MiB/s 148.50 MiB/s 153.00 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [1.5719 s 1.6012 s 1.6228 s]
|
||||
thrpt: [78.877 MiB/s 79.938 MiB/s 81.430 MiB/s]
|
||||
time: [1.2782 s 1.3191 s 1.3665 s]
|
||||
thrpt: [93.668 MiB/s 97.037 MiB/s 100.14 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
|
||||
time: [1.1095 s 1.1331 s 1.1580 s]
|
||||
thrpt: [110.53 MiB/s 112.97 MiB/s 115.37 MiB/s]
|
||||
time: [791.27 ms 807.08 ms 822.95 ms]
|
||||
thrpt: [155.54 MiB/s 158.60 MiB/s 161.77 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [303.20 ms 307.83 ms 311.90 ms]
|
||||
thrpt: [410.39 MiB/s 415.81 MiB/s 422.16 MiB/s]
|
||||
time: [310.78 ms 314.66 ms 318.47 ms]
|
||||
thrpt: [401.92 MiB/s 406.79 MiB/s 411.87 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [406.34 ms 429.37 ms 451.63 ms]
|
||||
thrpt: [283.42 MiB/s 298.11 MiB/s 315.00 MiB/s]
|
||||
time: [377.11 ms 387.77 ms 399.21 ms]
|
||||
thrpt: [320.63 MiB/s 330.10 MiB/s 339.42 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [134.01 ms 135.78 ms 137.48 ms]
|
||||
thrpt: [931.03 MiB/s 942.68 MiB/s 955.12 MiB/s]
|
||||
time: [128.37 ms 132.96 ms 138.55 ms]
|
||||
thrpt: [923.83 MiB/s 962.69 MiB/s 997.11 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.0406 s 1.0580 s 1.0772 s]
|
||||
thrpt: [118.83 MiB/s 120.98 MiB/s 123.00 MiB/s]
|
||||
time: [900.38 ms 914.88 ms 928.86 ms]
|
||||
thrpt: [137.80 MiB/s 139.91 MiB/s 142.16 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [1.5059 s 1.5339 s 1.5625 s]
|
||||
thrpt: [81.920 MiB/s 83.448 MiB/s 84.999 MiB/s]
|
||||
time: [1.2538 s 1.2936 s 1.3313 s]
|
||||
thrpt: [96.149 MiB/s 98.946 MiB/s 102.09 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
|
||||
time: [1.0714 s 1.0934 s 1.1161 s]
|
||||
thrpt: [114.69 MiB/s 117.06 MiB/s 119.47 MiB/s]
|
||||
time: [787.17 ms 803.89 ms 820.63 ms]
|
||||
thrpt: [155.98 MiB/s 159.23 MiB/s 162.61 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [262.68 ms 265.14 ms 267.71 ms]
|
||||
thrpt: [478.13 MiB/s 482.76 MiB/s 487.29 MiB/s]
|
||||
time: [318.78 ms 321.89 ms 324.74 ms]
|
||||
thrpt: [394.16 MiB/s 397.65 MiB/s 401.53 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [375.19 ms 393.80 ms 411.40 ms]
|
||||
thrpt: [311.14 MiB/s 325.04 MiB/s 341.16 MiB/s]
|
||||
time: [374.01 ms 383.45 ms 393.20 ms]
|
||||
thrpt: [325.53 MiB/s 333.81 MiB/s 342.24 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [123.02 ms 123.85 ms 124.66 ms]
|
||||
thrpt: [1.0027 GiB/s 1.0093 GiB/s 1.0161 GiB/s]
|
||||
time: [137.98 ms 141.31 ms 143.57 ms]
|
||||
thrpt: [891.58 MiB/s 905.79 MiB/s 927.66 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [613.69 ms 622.48 ms 630.97 ms]
|
||||
thrpt: [202.86 MiB/s 205.63 MiB/s 208.57 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [1.0299 s 1.0766 s 1.1273 s]
|
||||
thrpt: [113.55 MiB/s 118.90 MiB/s 124.29 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
|
||||
time: [637.80 ms 647.78 ms 658.01 ms]
|
||||
thrpt: [194.53 MiB/s 197.60 MiB/s 200.69 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [266.09 ms 267.20 ms 268.31 ms]
|
||||
thrpt: [477.06 MiB/s 479.04 MiB/s 481.04 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [269.34 ms 273.27 ms 277.69 ms]
|
||||
thrpt: [460.95 MiB/s 468.40 MiB/s 475.24 MiB/s]
|
||||
ingest/io_mode=DirectRw volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [123.18 ms 124.24 ms 125.15 ms]
|
||||
thrpt: [1022.8 MiB/s 1.0061 GiB/s 1.0148 GiB/s]
|
||||
*/
|
||||
|
||||
@@ -1289,6 +1289,7 @@ pub(crate) enum StorageIoOperation {
|
||||
Seek,
|
||||
Fsync,
|
||||
Metadata,
|
||||
SetLen,
|
||||
}
|
||||
|
||||
impl StorageIoOperation {
|
||||
@@ -1303,6 +1304,7 @@ impl StorageIoOperation {
|
||||
StorageIoOperation::Seek => "seek",
|
||||
StorageIoOperation::Fsync => "fsync",
|
||||
StorageIoOperation::Metadata => "metadata",
|
||||
StorageIoOperation::SetLen => "set_len",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3816,6 +3816,24 @@ impl TenantShard {
|
||||
MaybeDeletedIndexPart::IndexPart(p) => p,
|
||||
};
|
||||
|
||||
// A shard split may not take place while a timeline import is on-going
|
||||
// for the tenant. Timeline imports run as part of each tenant shard
|
||||
// and rely on the sharding scheme to split the work among pageservers.
|
||||
// If we were to split in the middle of this process, we would have to
|
||||
// either ensure that it's driven to completion on the old shard set
|
||||
// or transfer it to the new shard set. It's technically possible, but complex.
|
||||
match index_part.import_pgdata {
|
||||
Some(ref import) if !import.is_done() => {
|
||||
anyhow::bail!(
|
||||
"Cannot split due to import with idempotency key: {:?}",
|
||||
import.idempotency_key()
|
||||
);
|
||||
}
|
||||
Some(_) | None => {
|
||||
// fallthrough
|
||||
}
|
||||
}
|
||||
|
||||
for child_shard in child_shards {
|
||||
tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index());
|
||||
upload_index_part(
|
||||
|
||||
@@ -15,21 +15,23 @@
|
||||
//! len >= 128: 1CCCXXXX XXXXXXXX XXXXXXXX XXXXXXXX
|
||||
//!
|
||||
use std::cmp::min;
|
||||
use std::io::Error;
|
||||
|
||||
use anyhow::Context;
|
||||
use async_compression::Level;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::IoBufferMut;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::owned_buffers_io::write::{BufferedWriter, FlushTaskError};
|
||||
use crate::virtual_file::owned_buffers_io::write::{BufferedWriterShutdownMode, OwnedAsyncWriter};
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct CompressionInfo {
|
||||
@@ -50,12 +52,9 @@ pub struct Header {
|
||||
|
||||
impl Header {
|
||||
/// Decodes a header from a byte slice.
|
||||
pub fn decode(bytes: &[u8]) -> Result<Self, std::io::Error> {
|
||||
pub fn decode(bytes: &[u8]) -> anyhow::Result<Self> {
|
||||
let Some(&first_header_byte) = bytes.first() else {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"zero-length blob header",
|
||||
));
|
||||
anyhow::bail!("zero-length blob header");
|
||||
};
|
||||
|
||||
// If the first bit is 0, this is just a 1-byte length prefix up to 128 bytes.
|
||||
@@ -69,12 +68,9 @@ impl Header {
|
||||
|
||||
// Otherwise, this is a 4-byte header containing compression information and length.
|
||||
const HEADER_LEN: usize = 4;
|
||||
let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN].try_into().map_err(|_| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("blob header too short: {bytes:?}"),
|
||||
)
|
||||
})?;
|
||||
let mut header_buf: [u8; HEADER_LEN] = bytes[0..HEADER_LEN]
|
||||
.try_into()
|
||||
.map_err(|_| anyhow::anyhow!("blob header too short: {bytes:?}"))?;
|
||||
|
||||
// TODO: verify the compression bits and convert to an enum.
|
||||
let compression_bits = header_buf[0] & LEN_COMPRESSION_BIT_MASK;
|
||||
@@ -94,6 +90,16 @@ impl Header {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WriteBlobError {
|
||||
#[error(transparent)]
|
||||
Flush(FlushTaskError),
|
||||
#[error("blob too large ({len} bytes)")]
|
||||
BlobTooLarge { len: usize },
|
||||
#[error(transparent)]
|
||||
WriteBlobRaw(anyhow::Error),
|
||||
}
|
||||
|
||||
impl BlockCursor<'_> {
|
||||
/// Read a blob into a new buffer.
|
||||
pub async fn read_blob(
|
||||
@@ -213,143 +219,64 @@ pub(super) const BYTE_UNCOMPRESSED: u8 = 0x80;
|
||||
pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
|
||||
|
||||
/// A wrapper of `VirtualFile` that allows users to write blobs.
|
||||
///
|
||||
/// If a `BlobWriter` is dropped, the internal buffer will be
|
||||
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
|
||||
/// manually before dropping.
|
||||
pub struct BlobWriter<const BUFFERED: bool> {
|
||||
inner: TempVirtualFile,
|
||||
offset: u64,
|
||||
/// A buffer to save on write calls, only used if BUFFERED=true
|
||||
buf: Vec<u8>,
|
||||
pub struct BlobWriter<W> {
|
||||
/// We do tiny writes for the length headers; they need to be in an owned buffer;
|
||||
io_buf: Option<BytesMut>,
|
||||
writer: BufferedWriter<IoBufferMut, W>,
|
||||
offset: u64,
|
||||
}
|
||||
|
||||
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
impl<W> BlobWriter<W>
|
||||
where
|
||||
W: OwnedAsyncWriter + std::fmt::Debug + Send + Sync + 'static,
|
||||
{
|
||||
/// See [`BufferedWriter`] struct-level doc comment for semantics of `start_offset`.
|
||||
pub fn new(
|
||||
inner: TempVirtualFile,
|
||||
file: W,
|
||||
start_offset: u64,
|
||||
_gate: &utils::sync::gate::Gate,
|
||||
_cancel: CancellationToken,
|
||||
_ctx: &RequestContext,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
offset: start_offset,
|
||||
buf: Vec::with_capacity(Self::CAPACITY),
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
flush_task_span: tracing::Span,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
io_buf: Some(BytesMut::new()),
|
||||
}
|
||||
writer: BufferedWriter::new(
|
||||
file,
|
||||
start_offset,
|
||||
|| IoBufferMut::with_capacity(Self::CAPACITY),
|
||||
gate.enter()?,
|
||||
cancel,
|
||||
ctx,
|
||||
flush_task_span,
|
||||
),
|
||||
offset: start_offset,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.offset
|
||||
}
|
||||
|
||||
const CAPACITY: usize = if BUFFERED { 64 * 1024 } else { 0 };
|
||||
const CAPACITY: usize = 64 * 1024;
|
||||
|
||||
/// Writes the given buffer directly to the underlying `VirtualFile`.
|
||||
/// You need to make sure that the internal buffer is empty, otherwise
|
||||
/// data will be written in wrong order.
|
||||
#[inline(always)]
|
||||
async fn write_all_unbuffered<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
src_buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), Error>) {
|
||||
let (src_buf, res) = self.inner.write_all(src_buf, ctx).await;
|
||||
let nbytes = match res {
|
||||
Ok(nbytes) => nbytes,
|
||||
Err(e) => return (src_buf, Err(e)),
|
||||
};
|
||||
self.offset += nbytes as u64;
|
||||
(src_buf, Ok(()))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
/// Flushes the internal buffer to the underlying `VirtualFile`.
|
||||
pub async fn flush_buffer(&mut self, ctx: &RequestContext) -> Result<(), Error> {
|
||||
let buf = std::mem::take(&mut self.buf);
|
||||
let (slice, res) = self.inner.write_all(buf.slice_len(), ctx).await;
|
||||
res?;
|
||||
let mut buf = slice.into_raw_slice().into_inner();
|
||||
buf.clear();
|
||||
self.buf = buf;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
/// Writes as much of `src_buf` into the internal buffer as it fits
|
||||
fn write_into_buffer(&mut self, src_buf: &[u8]) -> usize {
|
||||
let remaining = Self::CAPACITY - self.buf.len();
|
||||
let to_copy = src_buf.len().min(remaining);
|
||||
self.buf.extend_from_slice(&src_buf[..to_copy]);
|
||||
self.offset += to_copy as u64;
|
||||
to_copy
|
||||
}
|
||||
|
||||
/// Internal, possibly buffered, write function
|
||||
/// Writes `src_buf` to the file at the current offset.
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
src_buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<(), Error>) {
|
||||
let src_buf = src_buf.into_raw_slice();
|
||||
let src_buf_bounds = src_buf.bounds();
|
||||
let restore = move |src_buf_slice: Slice<_>| {
|
||||
FullSlice::must_new(Slice::from_buf_bounds(
|
||||
src_buf_slice.into_inner(),
|
||||
src_buf_bounds,
|
||||
))
|
||||
};
|
||||
) -> (FullSlice<Buf>, Result<(), FlushTaskError>) {
|
||||
let res = self
|
||||
.writer
|
||||
// TODO: why are we taking a FullSlice if we're going to pass a borrow downstack?
|
||||
// Can remove all the complexity around owned buffers upstack
|
||||
.write_buffered_borrowed(&src_buf, ctx)
|
||||
.await
|
||||
.map(|len| {
|
||||
self.offset += len as u64;
|
||||
});
|
||||
|
||||
if !BUFFERED {
|
||||
assert!(self.buf.is_empty());
|
||||
return self
|
||||
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
|
||||
.await;
|
||||
}
|
||||
let remaining = Self::CAPACITY - self.buf.len();
|
||||
let src_buf_len = src_buf.bytes_init();
|
||||
if src_buf_len == 0 {
|
||||
return (restore(src_buf), Ok(()));
|
||||
}
|
||||
let mut src_buf = src_buf.slice(0..src_buf_len);
|
||||
// First try to copy as much as we can into the buffer
|
||||
if remaining > 0 {
|
||||
let copied = self.write_into_buffer(&src_buf);
|
||||
src_buf = src_buf.slice(copied..);
|
||||
}
|
||||
// Then, if the buffer is full, flush it out
|
||||
if self.buf.len() == Self::CAPACITY {
|
||||
if let Err(e) = self.flush_buffer(ctx).await {
|
||||
return (restore(src_buf), Err(e));
|
||||
}
|
||||
}
|
||||
// Finally, write the tail of src_buf:
|
||||
// If it wholly fits into the buffer without
|
||||
// completely filling it, then put it there.
|
||||
// If not, write it out directly.
|
||||
let src_buf = if !src_buf.is_empty() {
|
||||
assert_eq!(self.buf.len(), 0);
|
||||
if src_buf.len() < Self::CAPACITY {
|
||||
let copied = self.write_into_buffer(&src_buf);
|
||||
// We just verified above that src_buf fits into our internal buffer.
|
||||
assert_eq!(copied, src_buf.len());
|
||||
restore(src_buf)
|
||||
} else {
|
||||
let (src_buf, res) = self
|
||||
.write_all_unbuffered(FullSlice::must_new(src_buf), ctx)
|
||||
.await;
|
||||
if let Err(e) = res {
|
||||
return (src_buf, Err(e));
|
||||
}
|
||||
src_buf
|
||||
}
|
||||
} else {
|
||||
restore(src_buf)
|
||||
};
|
||||
(src_buf, Ok(()))
|
||||
(src_buf, res)
|
||||
}
|
||||
|
||||
/// Write a blob of data. Returns the offset that it was written to,
|
||||
@@ -358,7 +285,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
&mut self,
|
||||
srcbuf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<u64, Error>) {
|
||||
) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
|
||||
let (buf, res) = self
|
||||
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
|
||||
.await;
|
||||
@@ -372,7 +299,10 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
srcbuf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
algorithm: ImageCompressionAlgorithm,
|
||||
) -> (FullSlice<Buf>, Result<(u64, CompressionInfo), Error>) {
|
||||
) -> (
|
||||
FullSlice<Buf>,
|
||||
Result<(u64, CompressionInfo), WriteBlobError>,
|
||||
) {
|
||||
let offset = self.offset;
|
||||
let mut compression_info = CompressionInfo {
|
||||
written_compressed: false,
|
||||
@@ -388,14 +318,16 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
if len < 128 {
|
||||
// Short blob. Write a 1-byte length header
|
||||
io_buf.put_u8(len as u8);
|
||||
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
|
||||
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
|
||||
let res = res.map_err(WriteBlobError::Flush);
|
||||
((slice, res), srcbuf)
|
||||
} else {
|
||||
// Write a 4-byte length header
|
||||
if len > MAX_SUPPORTED_BLOB_LEN {
|
||||
return (
|
||||
(
|
||||
io_buf.slice_len(),
|
||||
Err(Error::other(format!("blob too large ({len} bytes)"))),
|
||||
Err(WriteBlobError::BlobTooLarge { len }),
|
||||
),
|
||||
srcbuf,
|
||||
);
|
||||
@@ -429,7 +361,9 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
assert_eq!(len_buf[0] & 0xf0, 0);
|
||||
len_buf[0] |= high_bit_mask;
|
||||
io_buf.extend_from_slice(&len_buf[..]);
|
||||
(self.write_all(io_buf.slice_len(), ctx).await, srcbuf)
|
||||
let (slice, res) = self.write_all(io_buf.slice_len(), ctx).await;
|
||||
let res = res.map_err(WriteBlobError::Flush);
|
||||
((slice, res), srcbuf)
|
||||
}
|
||||
}
|
||||
.await;
|
||||
@@ -444,6 +378,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
} else {
|
||||
self.write_all(srcbuf, ctx).await
|
||||
};
|
||||
let res = res.map_err(WriteBlobError::Flush);
|
||||
(srcbuf, res.map(|_| (offset, compression_info)))
|
||||
}
|
||||
|
||||
@@ -452,9 +387,12 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
&mut self,
|
||||
raw_with_header: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<u64, Error>) {
|
||||
) -> (FullSlice<Buf>, Result<u64, WriteBlobError>) {
|
||||
// Verify the header, to ensure we don't write invalid/corrupt data.
|
||||
let header = match Header::decode(&raw_with_header) {
|
||||
let header = match Header::decode(&raw_with_header)
|
||||
.context("decoding blob header")
|
||||
.map_err(WriteBlobError::WriteBlobRaw)
|
||||
{
|
||||
Ok(header) => header,
|
||||
Err(err) => return (raw_with_header, Err(err)),
|
||||
};
|
||||
@@ -463,29 +401,26 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
let raw_len = raw_with_header.len();
|
||||
return (
|
||||
raw_with_header,
|
||||
Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("header length mismatch: {header_total_len} != {raw_len}"),
|
||||
)),
|
||||
Err(WriteBlobError::WriteBlobRaw(anyhow::anyhow!(
|
||||
"header length mismatch: {header_total_len} != {raw_len}"
|
||||
))),
|
||||
);
|
||||
}
|
||||
|
||||
let offset = self.offset;
|
||||
let (raw_with_header, result) = self.write_all(raw_with_header, ctx).await;
|
||||
let result = result.map_err(WriteBlobError::Flush);
|
||||
(raw_with_header, result.map(|_| offset))
|
||||
}
|
||||
}
|
||||
|
||||
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
/// Finish this blob writer and return the underlying [`TempVirtualFile`].
|
||||
///
|
||||
/// If there is an internal buffer (depends on `BUFFERED`), it will
|
||||
/// be flushed before this method returns.
|
||||
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> {
|
||||
if BUFFERED {
|
||||
self.flush_buffer(ctx).await?;
|
||||
}
|
||||
Ok(self.inner)
|
||||
/// Finish this blob writer and return the underlying `W`.
|
||||
pub async fn shutdown(
|
||||
self,
|
||||
mode: BufferedWriterShutdownMode,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<W, FlushTaskError> {
|
||||
let (_, file) = self.writer.shutdown(mode, ctx).await?;
|
||||
Ok(file)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -494,22 +429,25 @@ pub(crate) mod tests {
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use tracing::info_span;
|
||||
|
||||
use super::*;
|
||||
use crate::context::DownloadBehavior;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::block_io::BlockReaderRef;
|
||||
use crate::virtual_file;
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
|
||||
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
|
||||
round_trip_test_compressed::<BUFFERED>(blobs, false).await
|
||||
async fn round_trip_test(blobs: &[Vec<u8>]) -> anyhow::Result<()> {
|
||||
round_trip_test_compressed(blobs, false).await
|
||||
}
|
||||
|
||||
pub(crate) async fn write_maybe_compressed<const BUFFERED: bool>(
|
||||
pub(crate) async fn write_maybe_compressed(
|
||||
blobs: &[Vec<u8>],
|
||||
compression: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
|
||||
) -> anyhow::Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>)> {
|
||||
let temp_dir = camino_tempfile::tempdir()?;
|
||||
let pathbuf = temp_dir.path().join("file");
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
@@ -519,10 +457,18 @@ pub(crate) mod tests {
|
||||
let mut offsets = Vec::new();
|
||||
{
|
||||
let file = TempVirtualFile::new(
|
||||
VirtualFile::create(pathbuf.as_path(), ctx).await?,
|
||||
gate.enter().unwrap(),
|
||||
VirtualFile::open_with_options_v2(
|
||||
pathbuf.as_path(),
|
||||
virtual_file::OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
gate.enter()?,
|
||||
);
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
|
||||
let mut wtr =
|
||||
BlobWriter::new(file, 0, &gate, cancel.clone(), ctx, info_span!("test")).unwrap();
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
let res = wtr
|
||||
@@ -539,28 +485,28 @@ pub(crate) mod tests {
|
||||
let offs = res?;
|
||||
offsets.push(offs);
|
||||
}
|
||||
// Write out one page worth of zeros so that we can
|
||||
// read again with read_blk
|
||||
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
|
||||
let offs = res?;
|
||||
println!("Writing final blob at offs={offs}");
|
||||
|
||||
let file = wtr.into_inner(ctx).await?;
|
||||
file.disarm_into_inner();
|
||||
}
|
||||
let file = wtr
|
||||
.shutdown(
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
file.disarm_into_inner()
|
||||
};
|
||||
Ok((temp_dir, pathbuf, offsets))
|
||||
}
|
||||
|
||||
async fn round_trip_test_compressed<const BUFFERED: bool>(
|
||||
async fn round_trip_test_compressed(
|
||||
blobs: &[Vec<u8>],
|
||||
compression: bool,
|
||||
) -> Result<(), Error> {
|
||||
) -> anyhow::Result<()> {
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
|
||||
let (_temp_dir, pathbuf, offsets) =
|
||||
write_maybe_compressed::<BUFFERED>(blobs, compression, &ctx).await?;
|
||||
write_maybe_compressed(blobs, compression, &ctx).await?;
|
||||
|
||||
let file = VirtualFile::open(pathbuf, &ctx).await?;
|
||||
println!("Done writing!");
|
||||
let file = VirtualFile::open_v2(pathbuf, &ctx).await?;
|
||||
let rdr = BlockReaderRef::VirtualFile(&file);
|
||||
let rdr = BlockCursor::new_with_compression(rdr, compression);
|
||||
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
|
||||
@@ -579,30 +525,27 @@ pub(crate) mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_one() -> Result<(), Error> {
|
||||
async fn test_one() -> anyhow::Result<()> {
|
||||
let blobs = &[vec![12, 21, 22]];
|
||||
round_trip_test::<false>(blobs).await?;
|
||||
round_trip_test::<true>(blobs).await?;
|
||||
round_trip_test(blobs).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_hello_simple() -> Result<(), Error> {
|
||||
async fn test_hello_simple() -> anyhow::Result<()> {
|
||||
let blobs = &[
|
||||
vec![0, 1, 2, 3],
|
||||
b"Hello, World!".to_vec(),
|
||||
Vec::new(),
|
||||
b"foobar".to_vec(),
|
||||
];
|
||||
round_trip_test::<false>(blobs).await?;
|
||||
round_trip_test::<true>(blobs).await?;
|
||||
round_trip_test_compressed::<false>(blobs, true).await?;
|
||||
round_trip_test_compressed::<true>(blobs, true).await?;
|
||||
round_trip_test(blobs).await?;
|
||||
round_trip_test_compressed(blobs, true).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_really_big_array() -> Result<(), Error> {
|
||||
async fn test_really_big_array() -> anyhow::Result<()> {
|
||||
let blobs = &[
|
||||
b"test".to_vec(),
|
||||
random_array(10 * PAGE_SZ),
|
||||
@@ -611,25 +554,22 @@ pub(crate) mod tests {
|
||||
vec![0xf3; 24 * PAGE_SZ],
|
||||
b"foobar".to_vec(),
|
||||
];
|
||||
round_trip_test::<false>(blobs).await?;
|
||||
round_trip_test::<true>(blobs).await?;
|
||||
round_trip_test_compressed::<false>(blobs, true).await?;
|
||||
round_trip_test_compressed::<true>(blobs, true).await?;
|
||||
round_trip_test(blobs).await?;
|
||||
round_trip_test_compressed(blobs, true).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_arrays_inc() -> Result<(), Error> {
|
||||
async fn test_arrays_inc() -> anyhow::Result<()> {
|
||||
let blobs = (0..PAGE_SZ / 8)
|
||||
.map(|v| random_array(v * 16))
|
||||
.collect::<Vec<_>>();
|
||||
round_trip_test::<false>(&blobs).await?;
|
||||
round_trip_test::<true>(&blobs).await?;
|
||||
round_trip_test(&blobs).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_arrays_random_size() -> Result<(), Error> {
|
||||
async fn test_arrays_random_size() -> anyhow::Result<()> {
|
||||
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
||||
let blobs = (0..1024)
|
||||
.map(|_| {
|
||||
@@ -641,20 +581,18 @@ pub(crate) mod tests {
|
||||
random_array(sz.into())
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
round_trip_test::<false>(&blobs).await?;
|
||||
round_trip_test::<true>(&blobs).await?;
|
||||
round_trip_test(&blobs).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_arrays_page_boundary() -> Result<(), Error> {
|
||||
async fn test_arrays_page_boundary() -> anyhow::Result<()> {
|
||||
let blobs = &[
|
||||
random_array(PAGE_SZ - 4),
|
||||
random_array(PAGE_SZ - 4),
|
||||
random_array(PAGE_SZ - 4),
|
||||
];
|
||||
round_trip_test::<false>(blobs).await?;
|
||||
round_trip_test::<true>(blobs).await?;
|
||||
round_trip_test(blobs).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,14 +4,12 @@
|
||||
|
||||
use std::ops::Deref;
|
||||
|
||||
use bytes::Bytes;
|
||||
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, FileId, PAGE_SZ, PageReadGuard, PageWriteGuard, ReadBufResult};
|
||||
#[cfg(test)]
|
||||
use crate::virtual_file::IoBufferMut;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::{IoBuffer, VirtualFile};
|
||||
|
||||
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
|
||||
/// blocks, using the page cache
|
||||
@@ -247,17 +245,17 @@ pub trait BlockWriter {
|
||||
/// 'buf' must be of size PAGE_SZ. Returns the block number the page was
|
||||
/// written to.
|
||||
///
|
||||
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
|
||||
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error>;
|
||||
}
|
||||
|
||||
///
|
||||
/// A simple in-memory buffer of blocks.
|
||||
///
|
||||
pub struct BlockBuf {
|
||||
pub blocks: Vec<Bytes>,
|
||||
pub blocks: Vec<IoBuffer>,
|
||||
}
|
||||
impl BlockWriter for BlockBuf {
|
||||
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
|
||||
fn write_blk(&mut self, buf: IoBuffer) -> Result<u32, std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
let blknum = self.blocks.len();
|
||||
self.blocks.push(buf);
|
||||
|
||||
@@ -25,7 +25,7 @@ use std::{io, result};
|
||||
|
||||
use async_stream::try_stream;
|
||||
use byteorder::{BE, ReadBytesExt};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use bytes::BufMut;
|
||||
use either::Either;
|
||||
use futures::{Stream, StreamExt};
|
||||
use hex;
|
||||
@@ -34,6 +34,7 @@ use tracing::error;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::tenant::block_io::{BlockReader, BlockWriter};
|
||||
use crate::virtual_file::{IoBuffer, IoBufferMut, owned_buffers_io::write::Buffer};
|
||||
|
||||
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
|
||||
pub const VALUE_SZ: usize = 5;
|
||||
@@ -787,12 +788,12 @@ impl<const L: usize> BuildNode<L> {
|
||||
///
|
||||
/// Serialize the node to on-disk format.
|
||||
///
|
||||
fn pack(&self) -> Bytes {
|
||||
fn pack(&self) -> IoBuffer {
|
||||
assert!(self.keys.len() == self.num_children as usize * self.suffix_len);
|
||||
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
|
||||
assert!(self.num_children > 0);
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
|
||||
|
||||
buf.put_u16(self.num_children);
|
||||
buf.put_u8(self.level);
|
||||
@@ -805,7 +806,7 @@ impl<const L: usize> BuildNode<L> {
|
||||
assert!(buf.len() == self.size);
|
||||
|
||||
assert!(buf.len() <= PAGE_SZ);
|
||||
buf.resize(PAGE_SZ, 0);
|
||||
buf.extend_with(0, PAGE_SZ - buf.len());
|
||||
buf.freeze()
|
||||
}
|
||||
|
||||
@@ -839,7 +840,7 @@ pub(crate) mod tests {
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub(crate) struct TestDisk {
|
||||
blocks: Vec<Bytes>,
|
||||
blocks: Vec<IoBuffer>,
|
||||
}
|
||||
impl TestDisk {
|
||||
fn new() -> Self {
|
||||
@@ -857,7 +858,7 @@ pub(crate) mod tests {
|
||||
}
|
||||
}
|
||||
impl BlockWriter for &mut TestDisk {
|
||||
fn write_blk(&mut self, buf: Bytes) -> io::Result<u32> {
|
||||
fn write_blk(&mut self, buf: IoBuffer) -> io::Result<u32> {
|
||||
let blknum = self.blocks.len();
|
||||
self.blocks.push(buf);
|
||||
Ok(blknum as u32)
|
||||
|
||||
@@ -79,9 +79,9 @@ impl EphemeralFile {
|
||||
VirtualFile::open_with_options_v2(
|
||||
&filename,
|
||||
virtual_file::OpenOptions::new()
|
||||
.create_new(true)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
.write(true),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
@@ -98,6 +98,7 @@ impl EphemeralFile {
|
||||
file: file.clone(),
|
||||
buffered_writer: BufferedWriter::new(
|
||||
file,
|
||||
0,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
cancel.child_token(),
|
||||
@@ -130,6 +131,14 @@ impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter
|
||||
> + Send {
|
||||
self.inner.write_all_at(buf, offset, ctx)
|
||||
}
|
||||
|
||||
fn set_len(
|
||||
&self,
|
||||
len: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> impl Future<Output = std::io::Result<()>> + Send {
|
||||
self.inner.set_len(len, ctx)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
|
||||
|
||||
@@ -91,9 +91,7 @@ pub async fn download_layer_file<'a>(
|
||||
);
|
||||
|
||||
let temp_file = TempVirtualFile::new(
|
||||
// Not _v2 yet which is sensitive to virtual_file_io_mode.
|
||||
// That'll happen in PR https://github.com/neondatabase/neon/pull/11558
|
||||
VirtualFile::open_with_options(
|
||||
VirtualFile::open_with_options_v2(
|
||||
&temp_file_path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.create_new(true)
|
||||
@@ -197,6 +195,7 @@ async fn download_object(
|
||||
let dst_path = destination_file.path().to_owned();
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
|
||||
destination_file,
|
||||
0,
|
||||
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
|
||||
gate.enter().map_err(|_| DownloadError::Cancelled)?,
|
||||
cancel.child_token(),
|
||||
@@ -219,10 +218,15 @@ async fn download_object(
|
||||
FlushTaskError::Cancelled => DownloadError::Cancelled,
|
||||
})?;
|
||||
}
|
||||
let inner = buffered.shutdown(ctx).await.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => DownloadError::Cancelled,
|
||||
})?;
|
||||
Ok(inner)
|
||||
buffered
|
||||
.shutdown(
|
||||
owned_buffers_io::write::BufferedWriterShutdownMode::PadThenTruncate,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => DownloadError::Cancelled,
|
||||
})
|
||||
}
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -1521,12 +1521,11 @@ async fn load_heatmap(
|
||||
path: &Utf8PathBuf,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<HeatMapTenant>, anyhow::Error> {
|
||||
let mut file = match VirtualFile::open(path, ctx).await {
|
||||
Ok(file) => file,
|
||||
let st = match VirtualFile::read_to_string(path, ctx).await {
|
||||
Ok(st) => st,
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
|
||||
Err(e) => Err(e)?,
|
||||
};
|
||||
let st = file.read_to_string(ctx).await?;
|
||||
let htm = serde_json::from_str(&st)?;
|
||||
Ok(Some(htm))
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
//!
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::str::FromStr;
|
||||
@@ -52,6 +51,7 @@ use tokio_epoll_uring::IoBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -75,7 +75,8 @@ use crate::tenant::vectored_blob_io::{
|
||||
};
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
|
||||
///
|
||||
@@ -113,6 +114,15 @@ impl From<&DeltaLayer> for Summary {
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
|
||||
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
|
||||
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
|
||||
Self::ser_into(self, &mut buf)?;
|
||||
// Pad zeroes to the buffer so the length is a multiple of the alignment.
|
||||
buf.extend_with(0, buf.capacity() - buf.len());
|
||||
Ok(buf.freeze())
|
||||
}
|
||||
|
||||
pub(super) fn expected(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -392,7 +402,7 @@ struct DeltaLayerWriterInner {
|
||||
|
||||
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
|
||||
|
||||
blob_writer: BlobWriter<true>,
|
||||
blob_writer: BlobWriter<TempVirtualFile>,
|
||||
|
||||
// Number of key-lsns in the layer.
|
||||
num_keys: usize,
|
||||
@@ -416,16 +426,29 @@ impl DeltaLayerWriterInner {
|
||||
// Create the file initially with a temporary filename. We don't know
|
||||
// the end key yet, so we cannot form the final filename yet. We will
|
||||
// rename it when we're done.
|
||||
//
|
||||
// Note: This overwrites any existing file. There shouldn't be any.
|
||||
// FIXME: throw an error instead?
|
||||
let path =
|
||||
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
|
||||
let file = TempVirtualFile::new(
|
||||
VirtualFile::open_with_options_v2(
|
||||
&path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
gate.enter()?,
|
||||
);
|
||||
|
||||
let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?);
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
// Start at PAGE_SZ, make room for the header block
|
||||
let blob_writer = BlobWriter::new(
|
||||
file,
|
||||
PAGE_SZ as u64,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
info_span!(parent: None, "delta_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
|
||||
)?;
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -519,15 +542,24 @@ impl DeltaLayerWriterInner {
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
let mut file = self.blob_writer.into_inner(ctx).await?;
|
||||
let file = self
|
||||
.blob_writer
|
||||
.shutdown(
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Write out the index
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
|
||||
.await?;
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
|
||||
// Should we just replace BlockBuf::blocks with one big buffer
|
||||
for buf in block_buf.blocks {
|
||||
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
|
||||
res?;
|
||||
offset += PAGE_SZ as u64;
|
||||
}
|
||||
assert!(self.lsn_range.start < self.lsn_range.end);
|
||||
// Fill in the summary on blk 0
|
||||
@@ -542,11 +574,9 @@ impl DeltaLayerWriterInner {
|
||||
index_root_blk,
|
||||
};
|
||||
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
// TODO: could use smallvec here but it's a pain with Slice<T>
|
||||
Summary::ser_into(&summary, &mut buf)?;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
|
||||
// Writes summary at the first block (offset 0).
|
||||
let buf = summary.ser_into_page()?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res?;
|
||||
|
||||
let metadata = file
|
||||
@@ -738,7 +768,7 @@ impl DeltaLayer {
|
||||
where
|
||||
F: Fn(Summary) -> Summary,
|
||||
{
|
||||
let mut file = VirtualFile::open_with_options(
|
||||
let file = VirtualFile::open_with_options_v2(
|
||||
path,
|
||||
virtual_file::OpenOptions::new().read(true).write(true),
|
||||
ctx,
|
||||
@@ -755,11 +785,8 @@ impl DeltaLayer {
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
// TODO: could use smallvec here, but it's a pain with Slice<T>
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
|
||||
let buf = new_summary.ser_into_page().context("serialize")?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1415,6 +1442,19 @@ impl DeltaLayerInner {
|
||||
}
|
||||
|
||||
pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
|
||||
self.iter_with_options(
|
||||
ctx,
|
||||
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
|
||||
1024, // The default value. Unit tests might use a different value
|
||||
)
|
||||
}
|
||||
|
||||
pub fn iter_with_options<'a>(
|
||||
&'a self,
|
||||
ctx: &'a RequestContext,
|
||||
max_read_size: u64,
|
||||
max_batch_size: usize,
|
||||
) -> DeltaLayerIterator<'a> {
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
@@ -1424,10 +1464,7 @@ impl DeltaLayerInner {
|
||||
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
|
||||
key_values_batch: std::collections::VecDeque::new(),
|
||||
is_end: false,
|
||||
planner: StreamingVectoredReadPlanner::new(
|
||||
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
|
||||
1024, // The default value. Unit tests might use a different value
|
||||
),
|
||||
planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
//! actual page images are stored in the "values" part.
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::fs::File;
|
||||
use std::io::SeekFrom;
|
||||
use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
@@ -50,6 +49,7 @@ use tokio_stream::StreamExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -73,7 +73,8 @@ use crate::tenant::vectored_blob_io::{
|
||||
};
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, BufferedWriterShutdownMode};
|
||||
use crate::virtual_file::{self, IoBuffer, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
|
||||
///
|
||||
@@ -112,6 +113,15 @@ impl From<&ImageLayer> for Summary {
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
/// Serializes the summary header into an aligned buffer of lenth `PAGE_SZ`.
|
||||
pub fn ser_into_page(&self) -> Result<IoBuffer, SerializeError> {
|
||||
let mut buf = IoBufferMut::with_capacity(PAGE_SZ);
|
||||
Self::ser_into(self, &mut buf)?;
|
||||
// Pad zeroes to the buffer so the length is a multiple of the alignment.
|
||||
buf.extend_with(0, buf.capacity() - buf.len());
|
||||
Ok(buf.freeze())
|
||||
}
|
||||
|
||||
pub(super) fn expected(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -353,7 +363,7 @@ impl ImageLayer {
|
||||
where
|
||||
F: Fn(Summary) -> Summary,
|
||||
{
|
||||
let mut file = VirtualFile::open_with_options(
|
||||
let file = VirtualFile::open_with_options_v2(
|
||||
path,
|
||||
virtual_file::OpenOptions::new().read(true).write(true),
|
||||
ctx,
|
||||
@@ -370,11 +380,8 @@ impl ImageLayer {
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
// TODO: could use smallvec here but it's a pain with Slice<T>
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
|
||||
let buf = new_summary.ser_into_page().context("serialize")?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -678,6 +685,19 @@ impl ImageLayerInner {
|
||||
}
|
||||
|
||||
pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
|
||||
self.iter_with_options(
|
||||
ctx,
|
||||
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
|
||||
1024, // The default value. Unit tests might use a different value
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn iter_with_options<'a>(
|
||||
&'a self,
|
||||
ctx: &'a RequestContext,
|
||||
max_read_size: u64,
|
||||
max_batch_size: usize,
|
||||
) -> ImageLayerIterator<'a> {
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
@@ -687,10 +707,7 @@ impl ImageLayerInner {
|
||||
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
|
||||
key_values_batch: VecDeque::new(),
|
||||
is_end: false,
|
||||
planner: StreamingVectoredReadPlanner::new(
|
||||
1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
|
||||
1024, // The default value. Unit tests might use a different value
|
||||
),
|
||||
planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -743,7 +760,7 @@ struct ImageLayerWriterInner {
|
||||
// Number of keys in the layer.
|
||||
num_keys: usize,
|
||||
|
||||
blob_writer: BlobWriter<false>,
|
||||
blob_writer: BlobWriter<TempVirtualFile>,
|
||||
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -777,20 +794,27 @@ impl ImageLayerWriterInner {
|
||||
},
|
||||
);
|
||||
trace!("creating image layer {}", path);
|
||||
let mut file = TempVirtualFile::new(
|
||||
VirtualFile::open_with_options(
|
||||
let file = TempVirtualFile::new(
|
||||
VirtualFile::open_with_options_v2(
|
||||
&path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true),
|
||||
.create_new(true)
|
||||
.write(true),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
gate.enter()?,
|
||||
);
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
|
||||
// Start at `PAGE_SZ` to make room for the header block.
|
||||
let blob_writer = BlobWriter::new(
|
||||
file,
|
||||
PAGE_SZ as u64,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
info_span!(parent: None, "image_layer_writer_flush_task", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %path),
|
||||
)?;
|
||||
|
||||
// Initialize the b-tree index builder
|
||||
let block_buf = BlockBuf::new();
|
||||
@@ -918,15 +942,24 @@ impl ImageLayerWriterInner {
|
||||
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
|
||||
};
|
||||
|
||||
let mut file = self.blob_writer.into_inner(ctx).await?;
|
||||
let file = self
|
||||
.blob_writer
|
||||
.shutdown(
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(PAGE_SZ),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Write out the index
|
||||
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
|
||||
.await?;
|
||||
let mut offset = index_start_blk as u64 * PAGE_SZ as u64;
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
|
||||
// TODO(yuchen): https://github.com/neondatabase/neon/issues/10092
|
||||
// Should we just replace BlockBuf::blocks with one big buffer?
|
||||
for buf in block_buf.blocks {
|
||||
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), offset, ctx).await;
|
||||
res?;
|
||||
offset += PAGE_SZ as u64;
|
||||
}
|
||||
|
||||
let final_key_range = if let Some(end_key) = end_key {
|
||||
@@ -947,11 +980,9 @@ impl ImageLayerWriterInner {
|
||||
index_root_blk,
|
||||
};
|
||||
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
// TODO: could use smallvec here but it's a pain with Slice<T>
|
||||
Summary::ser_into(&summary, &mut buf)?;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
let (_buf, res) = file.write_all(buf.slice_len(), ctx).await;
|
||||
// Writes summary at the first block (offset 0).
|
||||
let buf = summary.ser_into_page()?;
|
||||
let (_buf, res) = file.write_all_at(buf.slice_len(), 0, ctx).await;
|
||||
res?;
|
||||
|
||||
let metadata = file
|
||||
|
||||
@@ -19,6 +19,7 @@ pub(crate) enum LayerRef<'a> {
|
||||
}
|
||||
|
||||
impl<'a> LayerRef<'a> {
|
||||
#[allow(dead_code)]
|
||||
fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
|
||||
match self {
|
||||
Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
|
||||
@@ -26,6 +27,22 @@ impl<'a> LayerRef<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn iter_with_options(
|
||||
self,
|
||||
ctx: &'a RequestContext,
|
||||
max_read_size: u64,
|
||||
max_batch_size: usize,
|
||||
) -> LayerIterRef<'a> {
|
||||
match self {
|
||||
Self::Image(x) => {
|
||||
LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
|
||||
}
|
||||
Self::Delta(x) => {
|
||||
LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn layer_dbg_info(&self) -> String {
|
||||
match self {
|
||||
Self::Image(x) => x.layer_dbg_info(),
|
||||
@@ -66,6 +83,8 @@ pub(crate) enum IteratorWrapper<'a> {
|
||||
first_key_lower_bound: (Key, Lsn),
|
||||
layer: LayerRef<'a>,
|
||||
source_desc: Arc<PersistentLayerKey>,
|
||||
max_read_size: u64,
|
||||
max_batch_size: usize,
|
||||
},
|
||||
Loaded {
|
||||
iter: PeekableLayerIterRef<'a>,
|
||||
@@ -146,6 +165,8 @@ impl<'a> IteratorWrapper<'a> {
|
||||
pub fn create_from_image_layer(
|
||||
image_layer: &'a ImageLayerInner,
|
||||
ctx: &'a RequestContext,
|
||||
max_read_size: u64,
|
||||
max_batch_size: usize,
|
||||
) -> Self {
|
||||
Self::NotLoaded {
|
||||
layer: LayerRef::Image(image_layer),
|
||||
@@ -157,12 +178,16 @@ impl<'a> IteratorWrapper<'a> {
|
||||
is_delta: false,
|
||||
}
|
||||
.into(),
|
||||
max_read_size,
|
||||
max_batch_size,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_from_delta_layer(
|
||||
delta_layer: &'a DeltaLayerInner,
|
||||
ctx: &'a RequestContext,
|
||||
max_read_size: u64,
|
||||
max_batch_size: usize,
|
||||
) -> Self {
|
||||
Self::NotLoaded {
|
||||
layer: LayerRef::Delta(delta_layer),
|
||||
@@ -174,6 +199,8 @@ impl<'a> IteratorWrapper<'a> {
|
||||
is_delta: true,
|
||||
}
|
||||
.into(),
|
||||
max_read_size,
|
||||
max_batch_size,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,11 +231,13 @@ impl<'a> IteratorWrapper<'a> {
|
||||
first_key_lower_bound,
|
||||
layer,
|
||||
source_desc,
|
||||
max_read_size,
|
||||
max_batch_size,
|
||||
} = self
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
let iter = layer.iter(ctx);
|
||||
let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
|
||||
let iter = PeekableLayerIterRef::create(iter).await?;
|
||||
if let Some((k1, l1, _)) = iter.peek() {
|
||||
let (k2, l2) = first_key_lower_bound;
|
||||
@@ -293,21 +322,41 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
|
||||
}
|
||||
|
||||
impl<'a> MergeIterator<'a> {
|
||||
pub fn create_with_options(
|
||||
deltas: &[&'a DeltaLayerInner],
|
||||
images: &[&'a ImageLayerInner],
|
||||
ctx: &'a RequestContext,
|
||||
max_read_size: u64,
|
||||
max_batch_size: usize,
|
||||
) -> Self {
|
||||
let mut heap = Vec::with_capacity(images.len() + deltas.len());
|
||||
for image in images {
|
||||
heap.push(IteratorWrapper::create_from_image_layer(
|
||||
image,
|
||||
ctx,
|
||||
max_read_size,
|
||||
max_batch_size,
|
||||
));
|
||||
}
|
||||
for delta in deltas {
|
||||
heap.push(IteratorWrapper::create_from_delta_layer(
|
||||
delta,
|
||||
ctx,
|
||||
max_read_size,
|
||||
max_batch_size,
|
||||
));
|
||||
}
|
||||
Self {
|
||||
heap: BinaryHeap::from(heap),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create(
|
||||
deltas: &[&'a DeltaLayerInner],
|
||||
images: &[&'a ImageLayerInner],
|
||||
ctx: &'a RequestContext,
|
||||
) -> Self {
|
||||
let mut heap = Vec::with_capacity(images.len() + deltas.len());
|
||||
for image in images {
|
||||
heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
|
||||
}
|
||||
for delta in deltas {
|
||||
heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
|
||||
}
|
||||
Self {
|
||||
heap: BinaryHeap::from(heap),
|
||||
}
|
||||
Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
|
||||
}
|
||||
|
||||
pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
|
||||
|
||||
@@ -2828,6 +2828,41 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the memory usage is within the limit.
|
||||
async fn check_memory_usage(
|
||||
self: &Arc<Self>,
|
||||
layer_selection: &[Layer],
|
||||
) -> Result<(), CompactionError> {
|
||||
let mut estimated_memory_usage_mb = 0.0;
|
||||
let mut num_image_layers = 0;
|
||||
let mut num_delta_layers = 0;
|
||||
let target_layer_size_bytes = 256 * 1024 * 1024;
|
||||
for layer in layer_selection {
|
||||
let layer_desc = layer.layer_desc();
|
||||
if layer_desc.is_delta() {
|
||||
// Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
|
||||
// Multiply the layer size so that tests can pass.
|
||||
estimated_memory_usage_mb +=
|
||||
3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
|
||||
num_delta_layers += 1;
|
||||
} else {
|
||||
// Image layers at most have 1MB buffer but it might be compressed; assume 5x compression ratio.
|
||||
estimated_memory_usage_mb +=
|
||||
5.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
|
||||
num_image_layers += 1;
|
||||
}
|
||||
}
|
||||
if estimated_memory_usage_mb > 1024.0 {
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"estimated memory usage is too high: {}MB, giving up compaction; num_image_layers={}, num_delta_layers={}",
|
||||
estimated_memory_usage_mb,
|
||||
num_image_layers,
|
||||
num_delta_layers
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
|
||||
/// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
|
||||
/// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
|
||||
@@ -3264,6 +3299,17 @@ impl Timeline {
|
||||
self.check_compaction_space(&job_desc.selected_layers)
|
||||
.await?;
|
||||
|
||||
self.check_memory_usage(&job_desc.selected_layers).await?;
|
||||
if job_desc.selected_layers.len() > 100
|
||||
&& job_desc.rewrite_layers.len() as f64 >= job_desc.selected_layers.len() as f64 * 0.7
|
||||
{
|
||||
return Err(CompactionError::Other(anyhow!(
|
||||
"too many layers to rewrite: {} / {}, giving up compaction",
|
||||
job_desc.rewrite_layers.len(),
|
||||
job_desc.selected_layers.len()
|
||||
)));
|
||||
}
|
||||
|
||||
// Generate statistics for the compaction
|
||||
for layer in &job_desc.selected_layers {
|
||||
let desc = layer.layer_desc();
|
||||
@@ -3359,7 +3405,13 @@ impl Timeline {
|
||||
.context("failed to collect gc compaction keyspace")
|
||||
.map_err(CompactionError::Other)?;
|
||||
let mut merge_iter = FilterIterator::create(
|
||||
MergeIterator::create(&delta_layers, &image_layers, ctx),
|
||||
MergeIterator::create_with_options(
|
||||
&delta_layers,
|
||||
&image_layers,
|
||||
ctx,
|
||||
128 * 8192, /* 1MB buffer for each of the inner iterators */
|
||||
128,
|
||||
),
|
||||
dense_ks,
|
||||
sparse_ks,
|
||||
)
|
||||
|
||||
@@ -507,7 +507,9 @@ impl<'a> VectoredBlobReader<'a> {
|
||||
|
||||
for (blob_start, meta) in blobs_at.iter().copied() {
|
||||
let header_start = (blob_start - read.start) as usize;
|
||||
let header = Header::decode(&buf[header_start..])?;
|
||||
let header = Header::decode(&buf[header_start..]).map_err(|anyhow_err| {
|
||||
std::io::Error::new(std::io::ErrorKind::InvalidData, anyhow_err)
|
||||
})?;
|
||||
let data_start = header_start + header.header_len;
|
||||
let end = data_start + header.data_len;
|
||||
let compression_bits = header.compression_bits;
|
||||
@@ -662,7 +664,6 @@ impl StreamingVectoredReadPlanner {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use anyhow::Error;
|
||||
|
||||
use super::super::blob_io::tests::{random_array, write_maybe_compressed};
|
||||
use super::*;
|
||||
@@ -945,13 +946,16 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
async fn round_trip_test_compressed(blobs: &[Vec<u8>], compression: bool) -> Result<(), Error> {
|
||||
async fn round_trip_test_compressed(
|
||||
blobs: &[Vec<u8>],
|
||||
compression: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let ctx =
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
|
||||
let (_temp_dir, pathbuf, offsets) =
|
||||
write_maybe_compressed::<true>(blobs, compression, &ctx).await?;
|
||||
write_maybe_compressed(blobs, compression, &ctx).await?;
|
||||
|
||||
let file = VirtualFile::open(&pathbuf, &ctx).await?;
|
||||
let file = VirtualFile::open_v2(&pathbuf, &ctx).await?;
|
||||
let file_len = std::fs::metadata(&pathbuf)?.len();
|
||||
|
||||
// Multiply by two (compressed data might need more space), and add a few bytes for the header
|
||||
@@ -997,7 +1001,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_really_big_array() -> Result<(), Error> {
|
||||
async fn test_really_big_array() -> anyhow::Result<()> {
|
||||
let blobs = &[
|
||||
b"test".to_vec(),
|
||||
random_array(10 * PAGE_SZ),
|
||||
@@ -1012,7 +1016,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_arrays_inc() -> Result<(), Error> {
|
||||
async fn test_arrays_inc() -> anyhow::Result<()> {
|
||||
let blobs = (0..PAGE_SZ / 8)
|
||||
.map(|v| random_array(v * 16))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
@@ -12,10 +12,11 @@
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use std::fs::File;
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::sync::LazyLock;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -96,69 +97,38 @@ impl VirtualFile {
|
||||
Self::open_with_options_v2(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
pub async fn create<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let inner = VirtualFileInner::create(path, ctx).await?;
|
||||
Ok(VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Buffered,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn create_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
VirtualFile::open_with_options_v2(
|
||||
path.as_ref(),
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn open_with_options<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Buffered,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn open_with_options_v2<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Self, std::io::Error> {
|
||||
let file = match get_io_mode() {
|
||||
IoMode::Buffered => {
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Buffered,
|
||||
}
|
||||
}
|
||||
let mode = get_io_mode();
|
||||
let set_o_direct = match (mode, open_options.is_write()) {
|
||||
(IoMode::Buffered, _) => false,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::Direct => {
|
||||
let inner = VirtualFileInner::open_with_options(
|
||||
path,
|
||||
open_options.clone().custom_flags(nix::libc::O_DIRECT),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
VirtualFile {
|
||||
inner,
|
||||
_mode: IoMode::Direct,
|
||||
}
|
||||
}
|
||||
(IoMode::Direct, false) => true,
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::Direct, true) => false,
|
||||
#[cfg(target_os = "linux")]
|
||||
(IoMode::DirectRw, _) => true,
|
||||
};
|
||||
Ok(file)
|
||||
let open_options = open_options.clone();
|
||||
let open_options = if set_o_direct {
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
let mut open_options = open_options;
|
||||
open_options.custom_flags(nix::libc::O_DIRECT);
|
||||
open_options
|
||||
}
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
unreachable!(
|
||||
"O_DIRECT is not supported on this platform, IoMode's that result in set_o_direct=true shouldn't even be defined"
|
||||
);
|
||||
} else {
|
||||
open_options
|
||||
};
|
||||
let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?;
|
||||
Ok(VirtualFile { inner, _mode: mode })
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Utf8Path {
|
||||
@@ -187,18 +157,14 @@ impl VirtualFile {
|
||||
self.inner.sync_data().await
|
||||
}
|
||||
|
||||
pub async fn set_len(&self, len: u64, ctx: &RequestContext) -> Result<(), Error> {
|
||||
self.inner.set_len(len, ctx).await
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<Metadata, Error> {
|
||||
self.inner.metadata().await
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
self.inner.remove();
|
||||
}
|
||||
|
||||
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
self.inner.seek(pos).await
|
||||
}
|
||||
|
||||
pub async fn read_exact_at<Buf>(
|
||||
&self,
|
||||
slice: Slice<Buf>,
|
||||
@@ -229,25 +195,31 @@ impl VirtualFile {
|
||||
self.inner.write_all_at(buf, offset, ctx).await
|
||||
}
|
||||
|
||||
pub async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
pub(crate) async fn read_to_string<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<usize, Error>) {
|
||||
self.inner.write_all(buf, ctx).await
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
|
||||
self.inner.read_to_end(buf, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn read_to_string(
|
||||
&mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<String, anyhow::Error> {
|
||||
) -> std::io::Result<String> {
|
||||
let file = VirtualFile::open(path, ctx).await?; // TODO: open_v2
|
||||
let mut buf = Vec::new();
|
||||
self.read_to_end(&mut buf, ctx).await?;
|
||||
Ok(String::from_utf8(buf)?)
|
||||
let mut tmp = vec![0; 128];
|
||||
let mut pos: u64 = 0;
|
||||
loop {
|
||||
let slice = tmp.slice(..128);
|
||||
let (slice, res) = file.inner.read_at(slice, pos, ctx).await;
|
||||
match res {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
pos += n as u64;
|
||||
buf.extend_from_slice(&slice[..n]);
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
tmp = slice.into_inner();
|
||||
}
|
||||
String::from_utf8(buf).map_err(|_| {
|
||||
std::io::Error::new(ErrorKind::InvalidData, "file contents are not valid UTF-8")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,9 +266,6 @@ pub struct VirtualFileInner {
|
||||
/// belongs to a different VirtualFile.
|
||||
handle: RwLock<SlotHandle>,
|
||||
|
||||
/// Current file position
|
||||
pos: u64,
|
||||
|
||||
/// File path and options to use to open it.
|
||||
///
|
||||
/// Note: this only contains the options needed to re-open it. For example,
|
||||
@@ -561,21 +530,7 @@ impl VirtualFileInner {
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
|
||||
}
|
||||
|
||||
/// Create a new file for writing. If the file exists, it will be truncated.
|
||||
/// Like File::create.
|
||||
pub async fn create<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
Self::open_with_options(
|
||||
path.as_ref(),
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true).clone(), ctx).await
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
@@ -585,7 +540,7 @@ impl VirtualFileInner {
|
||||
/// on the first time. Make sure that's sane!
|
||||
pub async fn open_with_options<P: AsRef<Utf8Path>>(
|
||||
path: P,
|
||||
open_options: &OpenOptions,
|
||||
open_options: OpenOptions,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<VirtualFileInner, std::io::Error> {
|
||||
let path = path.as_ref();
|
||||
@@ -610,7 +565,6 @@ impl VirtualFileInner {
|
||||
|
||||
let vfile = VirtualFileInner {
|
||||
handle: RwLock::new(handle),
|
||||
pos: 0,
|
||||
path: path.to_owned(),
|
||||
open_options: reopen_options,
|
||||
};
|
||||
@@ -677,6 +631,13 @@ impl VirtualFileInner {
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn set_len(&self, len: u64, _ctx: &RequestContext) -> Result<(), Error> {
|
||||
with_file!(self, StorageIoOperation::SetLen, |file_guard| {
|
||||
let (_file_guard, res) = io_engine::get().set_len(file_guard, len).await;
|
||||
res.maybe_fatal_err("set_len")
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper function internal to `VirtualFile` that looks up the underlying File,
|
||||
/// opens it and evicts some other File if necessary. The passed parameter is
|
||||
/// assumed to be a function available for the physical `File`.
|
||||
@@ -744,38 +705,6 @@ impl VirtualFileInner {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn remove(self) {
|
||||
let path = self.path.clone();
|
||||
drop(self);
|
||||
std::fs::remove_file(path).expect("failed to remove the virtual file");
|
||||
}
|
||||
|
||||
pub async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
match pos {
|
||||
SeekFrom::Start(offset) => {
|
||||
self.pos = offset;
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
|
||||
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
|
||||
}
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
if pos < 0 {
|
||||
return Err(Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
"offset would be negative",
|
||||
));
|
||||
}
|
||||
if pos > u64::MAX as i128 {
|
||||
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
|
||||
}
|
||||
self.pos = pos as u64;
|
||||
}
|
||||
}
|
||||
Ok(self.pos)
|
||||
}
|
||||
|
||||
/// Read the file contents in range `offset..(offset + slice.bytes_total())` into `slice[0..slice.bytes_total()]`.
|
||||
///
|
||||
/// The returned `Slice<Buf>` is equivalent to the input `slice`, i.e., it's the same view into the same buffer.
|
||||
@@ -859,59 +788,7 @@ impl VirtualFileInner {
|
||||
(restore(buf), Ok(()))
|
||||
}
|
||||
|
||||
/// Writes `buf` to the file at the current offset.
|
||||
///
|
||||
/// Panics if there is an uninitialized range in `buf`, as that is most likely a bug in the caller.
|
||||
pub async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<Buf>, Result<usize, Error>) {
|
||||
let buf = buf.into_raw_slice();
|
||||
let bounds = buf.bounds();
|
||||
let restore =
|
||||
|buf: Slice<_>| FullSlice::must_new(Slice::from_buf_bounds(buf.into_inner(), bounds));
|
||||
let nbytes = buf.len();
|
||||
let mut buf = buf;
|
||||
while !buf.is_empty() {
|
||||
let (tmp, res) = self.write(FullSlice::must_new(buf), ctx).await;
|
||||
buf = tmp.into_raw_slice();
|
||||
match res {
|
||||
Ok(0) => {
|
||||
return (
|
||||
restore(buf),
|
||||
Err(Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
)),
|
||||
);
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = buf.slice(n..);
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return (restore(buf), Err(e)),
|
||||
}
|
||||
}
|
||||
(restore(buf), Ok(nbytes))
|
||||
}
|
||||
|
||||
async fn write<B: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<B>,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<B>, Result<usize, std::io::Error>) {
|
||||
let pos = self.pos;
|
||||
let (buf, res) = self.write_at(buf, pos, ctx).await;
|
||||
let n = match res {
|
||||
Ok(n) => n,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
};
|
||||
self.pos += n as u64;
|
||||
(buf, Ok(n))
|
||||
}
|
||||
|
||||
pub(crate) async fn read_at<Buf>(
|
||||
pub(super) async fn read_at<Buf>(
|
||||
&self,
|
||||
buf: tokio_epoll_uring::Slice<Buf>,
|
||||
offset: u64,
|
||||
@@ -939,23 +816,11 @@ impl VirtualFileInner {
|
||||
})
|
||||
}
|
||||
|
||||
/// The function aborts the process if the error is fatal.
|
||||
async fn write_at<B: IoBuf + Send>(
|
||||
&self,
|
||||
buf: FullSlice<B>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
let (slice, result) = self.write_at_inner(buf, offset, ctx).await;
|
||||
let result = result.maybe_fatal_err("write_at");
|
||||
(slice, result)
|
||||
}
|
||||
|
||||
async fn write_at_inner<B: IoBuf + Send>(
|
||||
&self,
|
||||
buf: FullSlice<B>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> (FullSlice<B>, Result<usize, Error>) {
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
@@ -964,30 +829,13 @@ impl VirtualFileInner {
|
||||
observe_duration!(StorageIoOperation::Write, {
|
||||
let ((_file_guard, buf), result) =
|
||||
io_engine::get().write_at(file_guard, offset, buf).await;
|
||||
let result = result.maybe_fatal_err("write_at");
|
||||
if let Ok(size) = result {
|
||||
ctx.io_size_metrics().write.add(size.into_u64());
|
||||
}
|
||||
(buf, result)
|
||||
})
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
|
||||
let mut tmp = vec![0; 128];
|
||||
loop {
|
||||
let slice = tmp.slice(..128);
|
||||
let (slice, res) = self.read_at(slice, self.pos, ctx).await;
|
||||
match res {
|
||||
Ok(0) => return Ok(()),
|
||||
Ok(n) => {
|
||||
self.pos += n as u64;
|
||||
buf.extend_from_slice(&slice[..n]);
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
tmp = slice.into_inner();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
@@ -1202,19 +1050,6 @@ impl FileGuard {
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
|
||||
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&mut file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_epoll_uring::IoFd for FileGuard {
|
||||
@@ -1304,6 +1139,9 @@ impl OwnedAsyncWriter for VirtualFile {
|
||||
) -> (FullSlice<Buf>, std::io::Result<()>) {
|
||||
VirtualFile::write_all_at(self, buf, offset, ctx).await
|
||||
}
|
||||
async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
|
||||
VirtualFile::set_len(self, len, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
@@ -1368,8 +1206,7 @@ pub(crate) type IoBuffer = AlignedBuffer<ConstAlign<{ get_io_buffer_alignment()
|
||||
pub(crate) type IoPageSlice<'a> =
|
||||
AlignedSlice<'a, PAGE_SZ, ConstAlign<{ get_io_buffer_alignment() }>>;
|
||||
|
||||
static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
|
||||
once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
|
||||
static IO_MODE: LazyLock<AtomicU8> = LazyLock::new(|| AtomicU8::new(IoMode::preferred() as u8));
|
||||
|
||||
pub fn set_io_mode(mode: IoMode) {
|
||||
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
|
||||
@@ -1383,7 +1220,6 @@ static SYNC_MODE: AtomicU8 = AtomicU8::new(SyncMode::Sync as u8);
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io::Write;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -1436,43 +1272,6 @@ mod tests {
|
||||
MaybeVirtualFile::File(file) => file.write_all_at(&buf[..], offset),
|
||||
}
|
||||
}
|
||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
|
||||
MaybeVirtualFile::File(file) => file.seek(pos),
|
||||
}
|
||||
}
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => {
|
||||
let (_buf, res) = file.write_all(buf, ctx).await;
|
||||
res.map(|_| ())
|
||||
}
|
||||
MaybeVirtualFile::File(file) => file.write_all(&buf[..]),
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to slurp contents of a file, starting at the current position,
|
||||
// into a string
|
||||
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
|
||||
use std::io::Read;
|
||||
let mut buf = String::new();
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => {
|
||||
let mut buf = Vec::new();
|
||||
file.read_to_end(&mut buf, ctx).await?;
|
||||
return Ok(String::from_utf8(buf).unwrap());
|
||||
}
|
||||
MaybeVirtualFile::File(file) => {
|
||||
file.read_to_string(&mut buf)?;
|
||||
}
|
||||
}
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
// Helper function to slurp a portion of a file into a string
|
||||
async fn read_string_at(
|
||||
@@ -1508,7 +1307,7 @@ mod tests {
|
||||
opts: OpenOptions,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<MaybeVirtualFile, anyhow::Error> {
|
||||
let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
|
||||
let vf = VirtualFile::open_with_options_v2(&path, &opts, ctx).await?;
|
||||
Ok(MaybeVirtualFile::VirtualFile(vf))
|
||||
}
|
||||
}
|
||||
@@ -1568,48 +1367,23 @@ mod tests {
|
||||
.await?;
|
||||
|
||||
file_a
|
||||
.write_all(b"foobar".to_vec().slice_len(), &ctx)
|
||||
.write_all_at(IoBuffer::from(b"foobar").slice_len(), 0, &ctx)
|
||||
.await?;
|
||||
|
||||
// cannot read from a file opened in write-only mode
|
||||
let _ = file_a.read_string(&ctx).await.unwrap_err();
|
||||
let _ = file_a.read_string_at(0, 1, &ctx).await.unwrap_err();
|
||||
|
||||
// Close the file and re-open for reading
|
||||
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
let _ = file_a
|
||||
.write_all(b"bar".to_vec().slice_len(), &ctx)
|
||||
.write_all_at(IoBuffer::from(b"bar").slice_len(), 0, &ctx)
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
// Try simple read
|
||||
assert_eq!("foobar", file_a.read_string(&ctx).await?);
|
||||
|
||||
// It's positioned at the EOF now.
|
||||
assert_eq!("", file_a.read_string(&ctx).await?);
|
||||
|
||||
// Test seeks.
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
|
||||
assert_eq!("oobar", file_a.read_string(&ctx).await?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
|
||||
assert_eq!("ar", file_a.read_string(&ctx).await?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
|
||||
assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
|
||||
assert_eq!("bar", file_a.read_string(&ctx).await?);
|
||||
|
||||
assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
|
||||
assert_eq!("oobar", file_a.read_string(&ctx).await?);
|
||||
|
||||
// Test erroneous seeks to before byte 0
|
||||
file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
|
||||
file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
|
||||
|
||||
// the erroneous seek should have left the position unchanged
|
||||
assert_eq!("oobar", file_a.read_string(&ctx).await?);
|
||||
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
|
||||
|
||||
// Create another test file, and try FileExt functions on it.
|
||||
let path_b = testdir.join("file_b");
|
||||
@@ -1635,9 +1409,6 @@ mod tests {
|
||||
|
||||
// Open a lot of files, enough to cause some evictions. (Or to be precise,
|
||||
// open the same file many times. The effect is the same.)
|
||||
//
|
||||
// leave file_a positioned at offset 1 before we start
|
||||
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
|
||||
|
||||
let mut vfiles = Vec::new();
|
||||
for _ in 0..100 {
|
||||
@@ -1647,7 +1418,7 @@ mod tests {
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
|
||||
assert_eq!("FOOBAR", vfile.read_string_at(0, 6, &ctx).await?);
|
||||
vfiles.push(vfile);
|
||||
}
|
||||
|
||||
@@ -1655,8 +1426,8 @@ mod tests {
|
||||
assert!(vfiles.len() > TEST_MAX_FILE_DESCRIPTORS * 2);
|
||||
|
||||
// The underlying file descriptor for 'file_a' should be closed now. Try to read
|
||||
// from it again. We left the file positioned at offset 1 above.
|
||||
assert_eq!("oobar", file_a.read_string(&ctx).await?);
|
||||
// from it again.
|
||||
assert_eq!("foobar", file_a.read_string_at(0, 6, &ctx).await?);
|
||||
|
||||
// Check that all the other FDs still work too. Use them in random order for
|
||||
// good measure.
|
||||
@@ -1695,7 +1466,7 @@ mod tests {
|
||||
for _ in 0..VIRTUAL_FILES {
|
||||
let f = VirtualFileInner::open_with_options(
|
||||
&test_file_path,
|
||||
OpenOptions::new().read(true),
|
||||
OpenOptions::new().read(true).clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1750,7 +1521,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string(&ctx).await.unwrap();
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
assert_eq!(post, "foo");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
@@ -1759,7 +1530,7 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string(&ctx).await.unwrap();
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
assert_eq!(post, "bar");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
@@ -1784,7 +1555,7 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
|
||||
let post = file.read_string(&ctx).await.unwrap();
|
||||
let post = file.read_string_at(0, 3, &ctx).await.unwrap();
|
||||
assert_eq!(post, "foo");
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
|
||||
@@ -209,6 +209,27 @@ impl IoEngine {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn set_len(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
len: u64,
|
||||
) -> (FileGuard, std::io::Result<()>) {
|
||||
match self {
|
||||
IoEngine::NotSet => panic!("not initialized"),
|
||||
IoEngine::StdFs => {
|
||||
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
|
||||
(file_guard, res)
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
// TODO: ftruncate op for tokio-epoll-uring
|
||||
let res = file_guard.with_std_file(|std_file| std_file.set_len(len));
|
||||
(file_guard, res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn write_at<B: IoBuf + Send>(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
|
||||
@@ -6,7 +6,12 @@ use std::path::Path;
|
||||
use super::io_engine::IoEngine;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum OpenOptions {
|
||||
pub struct OpenOptions {
|
||||
write: bool,
|
||||
inner: Inner,
|
||||
}
|
||||
#[derive(Debug, Clone)]
|
||||
enum Inner {
|
||||
StdFs(std::fs::OpenOptions),
|
||||
#[cfg(target_os = "linux")]
|
||||
TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions),
|
||||
@@ -14,13 +19,17 @@ pub enum OpenOptions {
|
||||
|
||||
impl Default for OpenOptions {
|
||||
fn default() -> Self {
|
||||
match super::io_engine::get() {
|
||||
let inner = match super::io_engine::get() {
|
||||
IoEngine::NotSet => panic!("io engine not set"),
|
||||
IoEngine::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
|
||||
IoEngine::StdFs => Inner::StdFs(std::fs::OpenOptions::new()),
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngine::TokioEpollUring => {
|
||||
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
|
||||
Inner::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
|
||||
}
|
||||
};
|
||||
Self {
|
||||
write: false,
|
||||
inner,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -30,13 +39,17 @@ impl OpenOptions {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub(super) fn is_write(&self) -> bool {
|
||||
self.write
|
||||
}
|
||||
|
||||
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.read(read);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.read(read);
|
||||
}
|
||||
}
|
||||
@@ -44,12 +57,13 @@ impl OpenOptions {
|
||||
}
|
||||
|
||||
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
self.write = write;
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.write(write);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.write(write);
|
||||
}
|
||||
}
|
||||
@@ -57,12 +71,12 @@ impl OpenOptions {
|
||||
}
|
||||
|
||||
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create(create);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.create(create);
|
||||
}
|
||||
}
|
||||
@@ -70,12 +84,12 @@ impl OpenOptions {
|
||||
}
|
||||
|
||||
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
}
|
||||
}
|
||||
@@ -83,12 +97,12 @@ impl OpenOptions {
|
||||
}
|
||||
|
||||
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
}
|
||||
}
|
||||
@@ -96,10 +110,10 @@ impl OpenOptions {
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
match &self.inner {
|
||||
Inner::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let system = super::io_engine::tokio_epoll_uring_ext::thread_local_system().await;
|
||||
system.open(path, x).await.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
@@ -114,12 +128,12 @@ impl OpenOptions {
|
||||
|
||||
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.mode(mode);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.mode(mode);
|
||||
}
|
||||
}
|
||||
@@ -127,12 +141,12 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
}
|
||||
|
||||
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
match &mut self.inner {
|
||||
Inner::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
Inner::TokioEpollUring(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -282,6 +282,17 @@ unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.extend_from_slice(buf);
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
mod flush;
|
||||
|
||||
use bytes::BufMut;
|
||||
pub(crate) use flush::FlushControl;
|
||||
use flush::FlushHandle;
|
||||
pub(crate) use flush::FlushTaskError;
|
||||
use flush::ShutdownRequest;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::trace;
|
||||
|
||||
use super::io_buf_aligned::IoBufAligned;
|
||||
use super::io_buf_aligned::IoBufAlignedMut;
|
||||
use super::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::context::RequestContext;
|
||||
use crate::virtual_file::UsizeIsU64;
|
||||
use crate::virtual_file::{IoBuffer, IoBufferMut};
|
||||
|
||||
pub(crate) trait CheapCloneForRead {
|
||||
@@ -33,12 +38,49 @@ pub trait OwnedAsyncWriter {
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
|
||||
fn set_len(
|
||||
&self,
|
||||
len: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> impl Future<Output = std::io::Result<()>> + Send;
|
||||
}
|
||||
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
|
||||
/// small writes into larger writes of size [`Buffer::cap`].
|
||||
///
|
||||
/// The buffer is flushed if and only if it is full ([`Buffer::pending`] == [`Buffer::cap`]).
|
||||
/// This guarantees that writes to the filesystem happen
|
||||
/// - at offsets that are multiples of [`Buffer::cap`]
|
||||
/// - in lengths that are multiples of [`Buffer::cap`]
|
||||
///
|
||||
/// Above property is useful for Direct IO, where whatever the
|
||||
/// effectively dominating disk-sector/filesystem-block/memory-page size
|
||||
/// determines the requirements on
|
||||
/// - the alignment of the pointer passed to the read/write operation
|
||||
/// - the value of `count` (i.e., the length of the read/write operation)
|
||||
/// which must be a multiple of the dominating sector/block/page size.
|
||||
///
|
||||
/// See [`BufferedWriter::shutdown`] / [`BufferedWriterShutdownMode`] for different
|
||||
/// ways of dealing with the special case that the buffer is not full by the time
|
||||
/// we are done writing.
|
||||
///
|
||||
/// The first flush to the underlying `W` happens at offset `start_offset` (arg of [`BufferedWriter::new`]).
|
||||
/// The next flush is to offset `start_offset + Buffer::cap`. The one after at `start_offset + 2 * Buffer::cap` and so on.
|
||||
///
|
||||
/// TODO: decouple buffer capacity from alignment requirement.
|
||||
/// Right now we assume [`Buffer::cap`] is the alignment requirement,
|
||||
/// but actually [`Buffer::cap`] should only determine how often we flush
|
||||
/// while writing, while a separate alignment requirement argument should
|
||||
/// be passed to determine alignment requirement. This could be used by
|
||||
/// [`BufferedWriterShutdownMode::PadThenTruncate`] to avoid excessive
|
||||
/// padding of zeroes. For example, today, with a capacity of 64KiB, we
|
||||
/// would pad up to 64KiB-1 bytes of zeroes, then truncate off 64KiB-1.
|
||||
/// This is wasteful, e.g., if the alignment requirement is 4KiB, we only
|
||||
/// need to pad & truncate up to 4KiB-1 bytes of zeroes
|
||||
///
|
||||
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
|
||||
// since we would avoid copying majority of the data into the internal buffer.
|
||||
// https://github.com/neondatabase/neon/issues/10101
|
||||
pub struct BufferedWriter<B: Buffer, W> {
|
||||
/// Clone of the buffer that was last submitted to the flush loop.
|
||||
/// `None` if no flush request has been submitted, Some forever after.
|
||||
@@ -60,9 +102,24 @@ pub struct BufferedWriter<B: Buffer, W> {
|
||||
bytes_submitted: u64,
|
||||
}
|
||||
|
||||
/// How [`BufferedWriter::shutdown`] should deal with pending (=not-yet-flushed) data.
|
||||
///
|
||||
/// Cf the [`BufferedWriter`] comment's paragraph for context on why we need to think about this.
|
||||
pub enum BufferedWriterShutdownMode {
|
||||
/// Drop pending data, don't write back to file.
|
||||
DropTail,
|
||||
/// Pad the pending data with zeroes (cf [`usize::next_multiple_of`]).
|
||||
ZeroPadToNextMultiple(usize),
|
||||
/// Fill the IO buffer with zeroes, flush to disk, the `ftruncate` the
|
||||
/// file to the exact number of bytes written to [`Self`].
|
||||
///
|
||||
/// TODO: see in [`BufferedWriter`] comment about decoupling buffer capacity from alignment requirement.
|
||||
PadThenTruncate,
|
||||
}
|
||||
|
||||
impl<B, Buf, W> BufferedWriter<B, W>
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send + 'static,
|
||||
B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
|
||||
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
|
||||
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
|
||||
{
|
||||
@@ -71,6 +128,7 @@ where
|
||||
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
|
||||
pub fn new(
|
||||
writer: W,
|
||||
start_offset: u64,
|
||||
buf_new: impl Fn() -> B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
cancel: CancellationToken,
|
||||
@@ -88,7 +146,7 @@ where
|
||||
ctx.attached_child(),
|
||||
flush_task_span,
|
||||
),
|
||||
bytes_submitted: 0,
|
||||
bytes_submitted: start_offset,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,18 +167,80 @@ where
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
|
||||
self.flush(ctx).await?;
|
||||
pub async fn shutdown(
|
||||
mut self,
|
||||
mode: BufferedWriterShutdownMode,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u64, W), FlushTaskError> {
|
||||
let mut mutable = self.mutable.take().expect("must not use after an error");
|
||||
let unpadded_pending = mutable.pending();
|
||||
let final_len: u64;
|
||||
let shutdown_req;
|
||||
match mode {
|
||||
BufferedWriterShutdownMode::DropTail => {
|
||||
trace!(pending=%mutable.pending(), "dropping pending data");
|
||||
drop(mutable);
|
||||
|
||||
final_len = self.bytes_submitted;
|
||||
shutdown_req = ShutdownRequest { set_len: None };
|
||||
}
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(next_multiple) => {
|
||||
let len = mutable.pending();
|
||||
let cap = mutable.cap();
|
||||
assert!(
|
||||
len <= cap,
|
||||
"buffer impl ensures this, but let's check because the extend_with below would panic if we go beyond"
|
||||
);
|
||||
let padded_len = len.next_multiple_of(next_multiple);
|
||||
assert!(
|
||||
padded_len <= cap,
|
||||
"caller specified a multiple that is larger than the buffer capacity"
|
||||
);
|
||||
let count = padded_len - len;
|
||||
mutable.extend_with(0, count);
|
||||
trace!(count, "padding with zeros");
|
||||
self.mutable = Some(mutable);
|
||||
|
||||
final_len = self.bytes_submitted + padded_len.into_u64();
|
||||
shutdown_req = ShutdownRequest { set_len: None };
|
||||
}
|
||||
BufferedWriterShutdownMode::PadThenTruncate => {
|
||||
let len = mutable.pending();
|
||||
let cap = mutable.cap();
|
||||
// TODO: see struct comment TODO on decoupling buffer capacity from alignment requirement.
|
||||
let alignment_requirement = cap;
|
||||
assert!(len <= cap, "buffer impl should ensure this");
|
||||
let padding_end_offset = len.next_multiple_of(alignment_requirement);
|
||||
assert!(
|
||||
padding_end_offset <= cap,
|
||||
"{padding_end_offset} <= {cap} ({alignment_requirement})"
|
||||
);
|
||||
let count = padding_end_offset - len;
|
||||
mutable.extend_with(0, count);
|
||||
trace!(count, "padding with zeros");
|
||||
self.mutable = Some(mutable);
|
||||
|
||||
final_len = self.bytes_submitted + len.into_u64();
|
||||
shutdown_req = ShutdownRequest {
|
||||
// Avoid set_len call if we didn't need to pad anything.
|
||||
set_len: if count > 0 { Some(final_len) } else { None },
|
||||
};
|
||||
}
|
||||
};
|
||||
let padded_pending = self.mutable.as_ref().map(|b| b.pending());
|
||||
trace!(unpadded_pending, padded_pending, "padding done");
|
||||
if self.mutable.is_some() {
|
||||
self.flush(ctx).await?;
|
||||
}
|
||||
let Self {
|
||||
mutable: buf,
|
||||
mutable: _,
|
||||
maybe_flushed: _,
|
||||
mut flush_handle,
|
||||
bytes_submitted: bytes_amount,
|
||||
bytes_submitted: _,
|
||||
} = self;
|
||||
let writer = flush_handle.shutdown().await?;
|
||||
assert!(buf.is_some());
|
||||
Ok((bytes_amount, writer))
|
||||
let writer = flush_handle.shutdown(shutdown_req).await?;
|
||||
|
||||
Ok((final_len, writer))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -224,6 +344,10 @@ pub trait Buffer {
|
||||
/// panics if `other.len() > self.cap() - self.pending()`.
|
||||
fn extend_from_slice(&mut self, other: &[u8]);
|
||||
|
||||
/// Add `count` bytes `val` into `self`.
|
||||
/// Panics if `count > self.cap() - self.pending()`.
|
||||
fn extend_with(&mut self, val: u8, count: usize);
|
||||
|
||||
/// Number of bytes in the buffer.
|
||||
fn pending(&self) -> usize;
|
||||
|
||||
@@ -251,6 +375,14 @@ impl Buffer for IoBufferMut {
|
||||
IoBufferMut::extend_from_slice(self, other);
|
||||
}
|
||||
|
||||
fn extend_with(&mut self, val: u8, count: usize) {
|
||||
if self.len() + count > self.cap() {
|
||||
panic!("Buffer capacity exceeded");
|
||||
}
|
||||
|
||||
IoBufferMut::put_bytes(self, val, count);
|
||||
}
|
||||
|
||||
fn pending(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
@@ -273,26 +405,22 @@ impl Buffer for IoBufferMut {
|
||||
mod tests {
|
||||
use std::sync::Mutex;
|
||||
|
||||
use rstest::rstest;
|
||||
|
||||
use super::*;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum Op {
|
||||
Write { buf: Vec<u8>, offset: u64 },
|
||||
SetLen { len: u64 },
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct RecorderWriter {
|
||||
/// record bytes and write offsets.
|
||||
writes: Mutex<Vec<(Vec<u8>, u64)>>,
|
||||
}
|
||||
|
||||
impl RecorderWriter {
|
||||
/// Gets recorded bytes and write offsets.
|
||||
fn get_writes(&self) -> Vec<Vec<u8>> {
|
||||
self.writes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|(buf, _)| buf.clone())
|
||||
.collect()
|
||||
}
|
||||
recording: Mutex<Vec<Op>>,
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for RecorderWriter {
|
||||
@@ -302,28 +430,42 @@ mod tests {
|
||||
offset: u64,
|
||||
_: &RequestContext,
|
||||
) -> (FullSlice<Buf>, std::io::Result<()>) {
|
||||
self.writes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push((Vec::from(&buf[..]), offset));
|
||||
self.recording.lock().unwrap().push(Op::Write {
|
||||
buf: Vec::from(&buf[..]),
|
||||
offset,
|
||||
});
|
||||
(buf, Ok(()))
|
||||
}
|
||||
async fn set_len(&self, len: u64, _ctx: &RequestContext) -> std::io::Result<()> {
|
||||
self.recording.lock().unwrap().push(Op::SetLen { len });
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn test_ctx() -> RequestContext {
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
|
||||
}
|
||||
|
||||
#[rstest]
|
||||
#[tokio::test]
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer(
|
||||
#[values(
|
||||
BufferedWriterShutdownMode::DropTail,
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(2),
|
||||
BufferedWriterShutdownMode::PadThenTruncate
|
||||
)]
|
||||
mode: BufferedWriterShutdownMode,
|
||||
) -> anyhow::Result<()> {
|
||||
let ctx = test_ctx();
|
||||
let ctx = &ctx;
|
||||
let recorder = RecorderWriter::default();
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
let cap = 4;
|
||||
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
|
||||
recorder,
|
||||
|| IoBufferMut::with_capacity(2),
|
||||
0,
|
||||
|| IoBufferMut::with_capacity(cap),
|
||||
gate.enter()?,
|
||||
cancel,
|
||||
ctx,
|
||||
@@ -333,23 +475,89 @@ mod tests {
|
||||
writer.write_buffered_borrowed(b"abc", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"d", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"e", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"fg", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"hi", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"j", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"klmno", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"efg", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"hijklm", ctx).await?;
|
||||
|
||||
let (_, recorder) = writer.shutdown(ctx).await?;
|
||||
assert_eq!(
|
||||
recorder.get_writes(),
|
||||
{
|
||||
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
|
||||
expect
|
||||
let mut expect = {
|
||||
[(0, b"abcd"), (4, b"efgh"), (8, b"ijkl")]
|
||||
.into_iter()
|
||||
.map(|(offset, v)| Op::Write {
|
||||
offset,
|
||||
buf: v[..].to_vec(),
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
let expect_next_offset = 12;
|
||||
|
||||
match &mode {
|
||||
BufferedWriterShutdownMode::DropTail => (),
|
||||
// We test the case with padding to next multiple of 2 so that it's different
|
||||
// from the alignment requirement of 4 inferred from buffer capacity.
|
||||
// See TODOs in the `BufferedWriter` struct comment on decoupling buffer capacity from alignment requirement.
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(2) => {
|
||||
expect.push(Op::Write {
|
||||
offset: expect_next_offset,
|
||||
// it's legitimate for pad-to-next multiple 2 to be < alignment requirement 4 inferred from buffer capacity
|
||||
buf: b"m\0".to_vec(),
|
||||
});
|
||||
}
|
||||
.iter()
|
||||
.map(|v| v[..].to_vec())
|
||||
.collect::<Vec<_>>()
|
||||
BufferedWriterShutdownMode::ZeroPadToNextMultiple(_) => unimplemented!(),
|
||||
BufferedWriterShutdownMode::PadThenTruncate => {
|
||||
expect.push(Op::Write {
|
||||
offset: expect_next_offset,
|
||||
buf: b"m\0\0\0".to_vec(),
|
||||
});
|
||||
expect.push(Op::SetLen { len: 13 });
|
||||
}
|
||||
}
|
||||
|
||||
let (_, recorder) = writer.shutdown(mode, ctx).await?;
|
||||
assert_eq!(&*recorder.recording.lock().unwrap(), &expect);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_len_is_skipped_if_not_needed() -> anyhow::Result<()> {
|
||||
let ctx = test_ctx();
|
||||
let ctx = &ctx;
|
||||
let recorder = RecorderWriter::default();
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
let cap = 4;
|
||||
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
|
||||
recorder,
|
||||
0,
|
||||
|| IoBufferMut::with_capacity(cap),
|
||||
gate.enter()?,
|
||||
cancel,
|
||||
ctx,
|
||||
tracing::Span::none(),
|
||||
);
|
||||
|
||||
// write a multiple of `cap`
|
||||
writer.write_buffered_borrowed(b"abc", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"defgh", ctx).await?;
|
||||
|
||||
let (_, recorder) = writer
|
||||
.shutdown(BufferedWriterShutdownMode::PadThenTruncate, ctx)
|
||||
.await?;
|
||||
|
||||
let expect = {
|
||||
[(0, b"abcd"), (4, b"efgh")]
|
||||
.into_iter()
|
||||
.map(|(offset, v)| Op::Write {
|
||||
offset,
|
||||
buf: v[..].to_vec(),
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
&*recorder.recording.lock().unwrap(),
|
||||
&expect,
|
||||
"set_len should not be called if the buffer is already aligned"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::ops::ControlFlow;
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, info, info_span, warn};
|
||||
use tracing::{Instrument, info_span, warn};
|
||||
use utils::sync::duplex;
|
||||
|
||||
use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
|
||||
@@ -18,7 +18,7 @@ pub struct FlushHandle<Buf, W> {
|
||||
pub struct FlushHandleInner<Buf, W> {
|
||||
/// A bi-directional channel that sends (buffer, offset) for writes,
|
||||
/// and receives recyled buffer.
|
||||
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
|
||||
channel: duplex::mpsc::Duplex<Request<Buf>, FullSlice<Buf>>,
|
||||
/// Join handle for the background flush task.
|
||||
join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
|
||||
}
|
||||
@@ -27,9 +27,27 @@ struct FlushRequest<Buf> {
|
||||
slice: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
#[cfg(test)]
|
||||
ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
ready_to_flush_rx: Option<tokio::sync::oneshot::Receiver<()>>,
|
||||
#[cfg(test)]
|
||||
done_flush_tx: tokio::sync::oneshot::Sender<()>,
|
||||
done_flush_tx: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
pub struct ShutdownRequest {
|
||||
pub set_len: Option<u64>,
|
||||
}
|
||||
|
||||
enum Request<Buf> {
|
||||
Flush(FlushRequest<Buf>),
|
||||
Shutdown(ShutdownRequest),
|
||||
}
|
||||
|
||||
impl<Buf> Request<Buf> {
|
||||
fn op_str(&self) -> &'static str {
|
||||
match self {
|
||||
Request::Flush(_) => "flush",
|
||||
Request::Shutdown(_) => "shutdown",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Constructs a request and a control object for a new flush operation.
|
||||
@@ -51,8 +69,8 @@ fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>,
|
||||
let request = FlushRequest {
|
||||
slice,
|
||||
offset,
|
||||
ready_to_flush_rx,
|
||||
done_flush_tx,
|
||||
ready_to_flush_rx: Some(ready_to_flush_rx),
|
||||
done_flush_tx: Some(done_flush_tx),
|
||||
};
|
||||
(request, control)
|
||||
}
|
||||
@@ -159,10 +177,7 @@ where
|
||||
let (request, flush_control) = new_flush_op(slice, offset);
|
||||
|
||||
// Submits the buffer to the background task.
|
||||
let submit = self.inner_mut().channel.send(request).await;
|
||||
if submit.is_err() {
|
||||
return self.handle_error().await;
|
||||
}
|
||||
self.send(Request::Flush(request)).await?;
|
||||
|
||||
// Wait for an available buffer from the background flush task.
|
||||
// This is the BACKPRESSURE mechanism: if the flush task can't keep up,
|
||||
@@ -174,15 +189,28 @@ where
|
||||
Ok((recycled, flush_control))
|
||||
}
|
||||
|
||||
/// Sends poison pill to flush task and waits for it to exit.
|
||||
pub async fn shutdown(&mut self, req: ShutdownRequest) -> Result<W, FlushTaskError> {
|
||||
self.send(Request::Shutdown(req)).await?;
|
||||
self.wait().await
|
||||
}
|
||||
|
||||
async fn send(&mut self, request: Request<Buf>) -> Result<(), FlushTaskError> {
|
||||
let submit = self.inner_mut().channel.send(request).await;
|
||||
if submit.is_err() {
|
||||
return self.handle_error().await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
|
||||
Err(self
|
||||
.shutdown()
|
||||
.wait()
|
||||
.await
|
||||
.expect_err("flush task only disconnects duplex if it exits with an error"))
|
||||
}
|
||||
|
||||
/// Cleans up the channel, join the flush task.
|
||||
pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
|
||||
async fn wait(&mut self) -> Result<W, FlushTaskError> {
|
||||
let handle = self
|
||||
.inner
|
||||
.take()
|
||||
@@ -204,7 +232,7 @@ where
|
||||
pub struct FlushBackgroundTask<Buf, W> {
|
||||
/// A bi-directional channel that receives (buffer, offset) for writes,
|
||||
/// and send back recycled buffer.
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
|
||||
/// A writter for persisting data to disk.
|
||||
writer: W,
|
||||
ctx: RequestContext,
|
||||
@@ -226,7 +254,7 @@ where
|
||||
{
|
||||
/// Creates a new background flush task.
|
||||
fn new(
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
|
||||
file: W,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
cancel: CancellationToken,
|
||||
@@ -245,15 +273,9 @@ where
|
||||
async fn run(mut self) -> Result<W, FlushTaskError> {
|
||||
// Exit condition: channel is closed and there is no remaining buffer to be flushed
|
||||
while let Some(request) = self.channel.recv().await {
|
||||
#[cfg(test)]
|
||||
{
|
||||
// In test, wait for control to signal that we are ready to flush.
|
||||
if request.ready_to_flush_rx.await.is_err() {
|
||||
tracing::debug!("control dropped");
|
||||
}
|
||||
}
|
||||
let op_kind = request.op_str();
|
||||
|
||||
// Write slice to disk at `offset`.
|
||||
// Perform the requested operation.
|
||||
//
|
||||
// Error handling happens according to the current policy of crashing
|
||||
// on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
|
||||
@@ -262,52 +284,112 @@ where
|
||||
//
|
||||
// TODO: use utils::backoff::retry once async closures are actually usable
|
||||
//
|
||||
let mut slice_storage = Some(request.slice);
|
||||
let mut request_storage = Some(request);
|
||||
for attempt in 1.. {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(FlushTaskError::Cancelled);
|
||||
}
|
||||
let result = async {
|
||||
if attempt > 1 {
|
||||
info!("retrying flush");
|
||||
}
|
||||
let slice = slice_storage.take().expect(
|
||||
let request: Request<Buf> = request_storage .take().expect(
|
||||
"likely previous invocation of this future didn't get polled to completion",
|
||||
);
|
||||
// Don't cancel this write by doing tokio::select with self.cancel.cancelled().
|
||||
match &request {
|
||||
Request::Shutdown(ShutdownRequest { set_len: None }) => {
|
||||
request_storage = Some(request);
|
||||
return ControlFlow::Break(());
|
||||
},
|
||||
Request::Flush(_) | Request::Shutdown(ShutdownRequest { set_len: Some(_) }) => {
|
||||
},
|
||||
}
|
||||
if attempt > 1 {
|
||||
warn!(op=%request.op_str(), "retrying");
|
||||
}
|
||||
// borrows so we can async move the requests into async block while not moving these borrows here
|
||||
let writer = &self.writer;
|
||||
let request_storage = &mut request_storage;
|
||||
let ctx = &self.ctx;
|
||||
let io_fut = match request {
|
||||
Request::Flush(FlushRequest { slice, offset, #[cfg(test)] ready_to_flush_rx, #[cfg(test)] done_flush_tx }) => futures::future::Either::Left(async move {
|
||||
#[cfg(test)]
|
||||
if let Some(ready_to_flush_rx) = ready_to_flush_rx {
|
||||
{
|
||||
// In test, wait for control to signal that we are ready to flush.
|
||||
if ready_to_flush_rx.await.is_err() {
|
||||
tracing::debug!("control dropped");
|
||||
}
|
||||
}
|
||||
}
|
||||
let (slice, res) = writer.write_all_at(slice, offset, ctx).await;
|
||||
*request_storage = Some(Request::Flush(FlushRequest {
|
||||
slice,
|
||||
offset,
|
||||
#[cfg(test)]
|
||||
ready_to_flush_rx: None, // the contract is that we notify before first attempt
|
||||
#[cfg(test)]
|
||||
done_flush_tx
|
||||
}));
|
||||
res
|
||||
}),
|
||||
Request::Shutdown(ShutdownRequest { set_len }) => futures::future::Either::Right(async move {
|
||||
let set_len = set_len.expect("we filter out the None case above");
|
||||
let res = writer.set_len(set_len, ctx).await;
|
||||
*request_storage = Some(Request::Shutdown(ShutdownRequest {
|
||||
set_len: Some(set_len),
|
||||
}));
|
||||
res
|
||||
}),
|
||||
};
|
||||
// Don't cancel the io_fut by doing tokio::select with self.cancel.cancelled().
|
||||
// The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
|
||||
// If we retry indefinitely, we'll deplete those resources.
|
||||
// Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
|
||||
// wait for cancelled ops to complete and discard their error.
|
||||
let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
|
||||
slice_storage = Some(slice);
|
||||
let res = io_fut.await;
|
||||
let res = res.maybe_fatal_err("owned_buffers_io flush");
|
||||
let Err(err) = res else {
|
||||
if attempt > 1 {
|
||||
warn!(op=%op_kind, "retry succeeded");
|
||||
}
|
||||
return ControlFlow::Break(());
|
||||
};
|
||||
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
|
||||
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
.instrument(info_span!("flush_attempt", %attempt))
|
||||
.instrument(info_span!("attempt", %attempt, %op_kind))
|
||||
.await;
|
||||
match result {
|
||||
ControlFlow::Break(()) => break,
|
||||
ControlFlow::Continue(()) => continue,
|
||||
}
|
||||
}
|
||||
let slice = slice_storage.expect("loop must have run at least once");
|
||||
let request = request_storage.expect("loop must have run at least once");
|
||||
|
||||
#[cfg(test)]
|
||||
{
|
||||
// In test, tell control we are done flushing buffer.
|
||||
if request.done_flush_tx.send(()).is_err() {
|
||||
tracing::debug!("control dropped");
|
||||
let slice = match request {
|
||||
Request::Flush(FlushRequest {
|
||||
slice,
|
||||
#[cfg(test)]
|
||||
mut done_flush_tx,
|
||||
..
|
||||
}) => {
|
||||
#[cfg(test)]
|
||||
{
|
||||
// In test, tell control we are done flushing buffer.
|
||||
if done_flush_tx.take().expect("always Some").send(()).is_err() {
|
||||
tracing::debug!("control dropped");
|
||||
}
|
||||
}
|
||||
slice
|
||||
}
|
||||
}
|
||||
Request::Shutdown(_) => {
|
||||
// next iteration will observe recv() returning None
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Sends the buffer back to the handle for reuse. The handle is in charged of cleaning the buffer.
|
||||
if self.channel.send(slice).await.is_err() {
|
||||
let send_res = self.channel.send(slice).await;
|
||||
if send_res.is_err() {
|
||||
// Although channel is closed. Still need to finish flushing the remaining buffers.
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -33,6 +33,10 @@ impl OwnedAsyncWriter for TempVirtualFile {
|
||||
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
|
||||
VirtualFile::write_all_at(self, buf, offset, ctx)
|
||||
}
|
||||
|
||||
async fn set_len(&self, len: u64, ctx: &RequestContext) -> std::io::Result<()> {
|
||||
VirtualFile::set_len(self, len, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TempVirtualFile {
|
||||
|
||||
18
poetry.lock
generated
18
poetry.lock
generated
@@ -1274,14 +1274,14 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "h11"
|
||||
version = "0.14.0"
|
||||
version = "0.16.0"
|
||||
description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
|
||||
{file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
|
||||
{file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"},
|
||||
{file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1314,25 +1314,25 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "httpcore"
|
||||
version = "1.0.3"
|
||||
version = "1.0.9"
|
||||
description = "A minimal low-level HTTP client."
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "httpcore-1.0.3-py3-none-any.whl", hash = "sha256:9a6a501c3099307d9fd76ac244e08503427679b1e81ceb1d922485e2f2462ad2"},
|
||||
{file = "httpcore-1.0.3.tar.gz", hash = "sha256:5c0f9546ad17dac4d0772b0808856eb616eb8b48ce94f49ed819fd6982a8a544"},
|
||||
{file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"},
|
||||
{file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
certifi = "*"
|
||||
h11 = ">=0.13,<0.15"
|
||||
h11 = ">=0.16"
|
||||
|
||||
[package.extras]
|
||||
asyncio = ["anyio (>=4.0,<5.0)"]
|
||||
http2 = ["h2 (>=3,<5)"]
|
||||
socks = ["socksio (==1.*)"]
|
||||
trio = ["trio (>=0.22.0,<0.24.0)"]
|
||||
trio = ["trio (>=0.22.0,<1.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "httpx"
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::proxy::retry::CouldRetry;
|
||||
|
||||
/// A go-to error message which doesn't leak any detail.
|
||||
pub(crate) const REQUEST_FAILED: &str = "Console request failed";
|
||||
pub(crate) const REQUEST_FAILED: &str = "Control plane request failed";
|
||||
|
||||
/// Common console API error.
|
||||
#[derive(Debug, Error)]
|
||||
|
||||
@@ -72,6 +72,7 @@ impl HttpState {
|
||||
neon_metrics: NeonMetrics::new(build_info),
|
||||
allowlist_routes: &[
|
||||
"/status",
|
||||
"/live",
|
||||
"/ready",
|
||||
"/metrics",
|
||||
"/profile/cpu",
|
||||
@@ -1260,16 +1261,8 @@ async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
// Spawn a background task: once we start stepping down, we must finish: if the client drops
|
||||
// their request we should avoid stopping in some part-stepped-down state.
|
||||
let handle = tokio::spawn(async move {
|
||||
let state = get_state(&req);
|
||||
state.service.step_down().await
|
||||
});
|
||||
|
||||
let result = handle
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
let state = get_state(&req);
|
||||
let result = state.service.step_down().await;
|
||||
|
||||
json_response(StatusCode::OK, result)
|
||||
}
|
||||
@@ -1401,6 +1394,8 @@ async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiE
|
||||
}
|
||||
|
||||
/// Status endpoint is just used for checking that our HTTP listener is up
|
||||
///
|
||||
/// This serves as our k8s startup probe.
|
||||
async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
@@ -1412,6 +1407,30 @@ async fn handle_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
/// Liveness endpoint indicates that this storage controller is in a state
|
||||
/// where it can fulfill it's responsibilties. Namely, startup has finished
|
||||
/// and it is the current leader.
|
||||
///
|
||||
/// This serves as our k8s liveness probe.
|
||||
async fn handle_live(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
}
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
let state = get_state(&req);
|
||||
let live = state.service.startup_complete.is_ready()
|
||||
&& state.service.get_leadership_status() == LeadershipStatus::Leader;
|
||||
|
||||
if live {
|
||||
json_response(StatusCode::OK, ())
|
||||
} else {
|
||||
json_response(StatusCode::SERVICE_UNAVAILABLE, ())
|
||||
}
|
||||
}
|
||||
|
||||
/// Readiness endpoint indicates when we're done doing startup I/O (e.g. reconciling
|
||||
/// with remote pageserver nodes). This is intended for use as a kubernetes readiness probe.
|
||||
async fn handle_ready(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
@@ -1745,6 +1764,7 @@ async fn maybe_forward(req: Request<Body>) -> ForwardOutcome {
|
||||
const NOT_FOR_FORWARD: &[&str] = &[
|
||||
"/control/v1/step_down",
|
||||
"/status",
|
||||
"/live",
|
||||
"/ready",
|
||||
"/metrics",
|
||||
"/profile/cpu",
|
||||
@@ -1969,6 +1989,9 @@ pub fn make_router(
|
||||
.get("/status", |r| {
|
||||
named_request_span(r, handle_status, RequestName("status"))
|
||||
})
|
||||
.get("/live", |r| {
|
||||
named_request_span(r, handle_live, RequestName("live"))
|
||||
})
|
||||
.get("/ready", |r| {
|
||||
named_request_span(r, handle_ready, RequestName("ready"))
|
||||
})
|
||||
|
||||
@@ -43,6 +43,19 @@ impl Leadership {
|
||||
&self,
|
||||
) -> Result<(Option<ControllerPersistence>, Option<GlobalObservedState>)> {
|
||||
let leader = self.current_leader().await?;
|
||||
|
||||
if leader.as_ref().map(|l| &l.address)
|
||||
== self
|
||||
.config
|
||||
.address_for_peers
|
||||
.as_ref()
|
||||
.map(Uri::to_string)
|
||||
.as_ref()
|
||||
{
|
||||
// We already are the current leader. This is a restart.
|
||||
return Ok((leader, None));
|
||||
}
|
||||
|
||||
let leader_step_down_state = if let Some(ref leader) = leader {
|
||||
if self.config.start_as_candidate {
|
||||
self.request_step_down(leader).await
|
||||
|
||||
@@ -55,9 +55,12 @@ impl ResponseErrorMessageExt for reqwest::Response {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
|
||||
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
|
||||
|
||||
const STEP_DOWN_RETRIES: u32 = 8;
|
||||
const STEP_DOWN_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
impl PeerClient {
|
||||
pub(crate) fn new(http_client: reqwest::Client, uri: Uri, jwt: Option<String>) -> Self {
|
||||
Self {
|
||||
@@ -76,7 +79,7 @@ impl PeerClient {
|
||||
req
|
||||
};
|
||||
|
||||
let req = req.timeout(Duration::from_secs(2));
|
||||
let req = req.timeout(STEP_DOWN_TIMEOUT);
|
||||
|
||||
let res = req
|
||||
.send()
|
||||
@@ -94,8 +97,7 @@ impl PeerClient {
|
||||
}
|
||||
|
||||
/// Request the peer to step down and return its current observed state
|
||||
/// All errors are retried with exponential backoff for a maximum of 4 attempts.
|
||||
/// Assuming all retries are performed, the function times out after roughly 4 seconds.
|
||||
/// All errors are re-tried
|
||||
pub(crate) async fn step_down(
|
||||
&self,
|
||||
cancel: &CancellationToken,
|
||||
@@ -104,7 +106,7 @@ impl PeerClient {
|
||||
|| self.request_step_down(),
|
||||
|_e| false,
|
||||
2,
|
||||
4,
|
||||
STEP_DOWN_RETRIES,
|
||||
"Send step down request",
|
||||
cancel,
|
||||
)
|
||||
|
||||
@@ -133,6 +133,8 @@ pub(crate) enum DatabaseOperation {
|
||||
InsertTimelineImport,
|
||||
UpdateTimelineImport,
|
||||
DeleteTimelineImport,
|
||||
ListTimelineImports,
|
||||
IsTenantImportingTimeline,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -1640,6 +1642,30 @@ impl Persistence {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn list_timeline_imports(&self) -> DatabaseResult<Vec<TimelineImport>> {
|
||||
use crate::schema::timeline_imports::dsl;
|
||||
let persistent = self
|
||||
.with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| {
|
||||
Box::pin(async move {
|
||||
let from_db: Vec<TimelineImportPersistence> =
|
||||
dsl::timeline_imports.load(conn).await?;
|
||||
Ok(from_db)
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
let imports: Result<Vec<TimelineImport>, _> = persistent
|
||||
.into_iter()
|
||||
.map(TimelineImport::from_persistent)
|
||||
.collect();
|
||||
match imports {
|
||||
Ok(ok) => Ok(ok.into_iter().collect()),
|
||||
Err(err) => Err(DatabaseError::Logical(format!(
|
||||
"failed to deserialize import: {err}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline_import(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -1743,6 +1769,25 @@ impl Persistence {
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn is_tenant_importing_timeline(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> DatabaseResult<bool> {
|
||||
use crate::schema::timeline_imports::dsl;
|
||||
self.with_measured_conn(DatabaseOperation::IsTenantImportingTimeline, move |conn| {
|
||||
Box::pin(async move {
|
||||
let imports: i64 = dsl::timeline_imports
|
||||
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
|
||||
.count()
|
||||
.get_result(conn)
|
||||
.await?;
|
||||
|
||||
Ok(imports > 0)
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
|
||||
|
||||
@@ -11,7 +11,7 @@ use std::num::NonZeroU32;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -97,7 +97,9 @@ use crate::tenant_shard::{
|
||||
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
|
||||
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
|
||||
};
|
||||
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
|
||||
use crate::timeline_import::{
|
||||
ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient,
|
||||
};
|
||||
|
||||
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
|
||||
@@ -524,6 +526,9 @@ pub struct Service {
|
||||
|
||||
/// HTTP client with proper CA certs.
|
||||
http_client: reqwest::Client,
|
||||
|
||||
/// Handle for the step down background task if one was ever requested
|
||||
step_down_barrier: OnceLock<tokio::sync::watch::Receiver<Option<GlobalObservedState>>>,
|
||||
}
|
||||
|
||||
impl From<ReconcileWaitError> for ApiError {
|
||||
@@ -875,6 +880,40 @@ impl Service {
|
||||
});
|
||||
}
|
||||
|
||||
// Reconcile the timeline imports:
|
||||
// 1. Mark each tenant shard of tenants with an importing timeline as importing.
|
||||
// 2. Finalize the completed imports in the background. This handles the case where
|
||||
// the previous storage controller instance shut down whilst finalizing imports.
|
||||
let imports = self.persistence.list_timeline_imports().await;
|
||||
match imports {
|
||||
Ok(mut imports) => {
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
for import in &imports {
|
||||
locked
|
||||
.tenants
|
||||
.range_mut(TenantShardId::tenant_range(import.tenant_id))
|
||||
.for_each(|(_id, shard)| {
|
||||
shard.importing = TimelineImportState::Importing
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
imports.retain(|import| import.is_complete());
|
||||
tokio::task::spawn({
|
||||
let finalize_imports_self = self.clone();
|
||||
async move {
|
||||
finalize_imports_self
|
||||
.finalize_timeline_imports(imports)
|
||||
.await
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("Could not retrieve completed imports from database: {err}");
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)"
|
||||
);
|
||||
@@ -1745,6 +1784,7 @@ impl Service {
|
||||
tenant_op_locks: Default::default(),
|
||||
node_op_locks: Default::default(),
|
||||
http_client,
|
||||
step_down_barrier: Default::default(),
|
||||
});
|
||||
|
||||
let result_task_this = this.clone();
|
||||
@@ -3752,6 +3792,22 @@ impl Service {
|
||||
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
|
||||
let is_import = create_req.is_import();
|
||||
|
||||
if is_import {
|
||||
// Ensure that there is no split on-going.
|
||||
// [`Self::tenant_shard_split`] holds the exclusive tenant lock
|
||||
// for the duration of the split, but here we handle the case
|
||||
// where we restarted and the split is being aborted.
|
||||
let locked = self.inner.read().unwrap();
|
||||
let splitting = locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.any(|(_id, shard)| shard.splitting != SplitState::Idle);
|
||||
|
||||
if splitting {
|
||||
return Err(ApiError::Conflict("Tenant is splitting shard".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let timeline_info = self
|
||||
.tenant_timeline_create_pageservers(tenant_id, create_req)
|
||||
.await?;
|
||||
@@ -3789,6 +3845,14 @@ impl Service {
|
||||
.context("timeline import insert")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
// Set the importing flag on the tenant shards
|
||||
self.inner
|
||||
.write()
|
||||
.unwrap()
|
||||
.tenants
|
||||
.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Importing);
|
||||
|
||||
match inserted {
|
||||
true => {
|
||||
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
|
||||
@@ -3865,13 +3929,10 @@ impl Service {
|
||||
self: &Arc<Self>,
|
||||
import: TimelineImport,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO(vlad): On start-up, load up the imports and notify cplane of the
|
||||
// ones that have been completed. This assumes the new cplane API will
|
||||
// be idempotent. If that's not possible, bang a flag in the database.
|
||||
// https://github.com/neondatabase/neon/issues/11570
|
||||
|
||||
tracing::info!("Finalizing timeline import");
|
||||
|
||||
pausable_failpoint!("timeline-import-pre-cplane-notification");
|
||||
|
||||
let import_failed = import.completion_error().is_some();
|
||||
|
||||
if !import_failed {
|
||||
@@ -3914,6 +3975,13 @@ impl Service {
|
||||
tracing::warn!("Failed to delete timeline import entry from database: {err}");
|
||||
}
|
||||
|
||||
self.inner
|
||||
.write()
|
||||
.unwrap()
|
||||
.tenants
|
||||
.range_mut(TenantShardId::tenant_range(import.tenant_id))
|
||||
.for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle);
|
||||
|
||||
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
|
||||
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
@@ -3922,6 +3990,15 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finalize_timeline_imports(self: &Arc<Self>, imports: Vec<TimelineImport>) {
|
||||
futures::future::join_all(
|
||||
imports
|
||||
.into_iter()
|
||||
.map(|import| self.finalize_timeline_import(import)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn timeline_active_on_all_shards(
|
||||
self: &Arc<Self>,
|
||||
import: &TimelineImport,
|
||||
@@ -4888,6 +4965,7 @@ impl Service {
|
||||
is_reconciling: shard.reconciler.is_some(),
|
||||
is_pending_compute_notification: shard.pending_compute_notification,
|
||||
is_splitting: matches!(shard.splitting, SplitState::Splitting),
|
||||
is_importing: shard.importing == TimelineImportState::Importing,
|
||||
scheduling_policy: shard.get_scheduling_policy(),
|
||||
preferred_az_id: shard.preferred_az().map(ToString::to_string),
|
||||
})
|
||||
@@ -5378,6 +5456,27 @@ impl Service {
|
||||
.enter()
|
||||
.map_err(|_| ApiError::ShuttingDown)?;
|
||||
|
||||
// Timeline imports on the pageserver side can't handle shard-splits.
|
||||
// If the tenant is importing a timeline, dont't shard split it.
|
||||
match self
|
||||
.persistence
|
||||
.is_tenant_importing_timeline(tenant_id)
|
||||
.await
|
||||
{
|
||||
Ok(importing) => {
|
||||
if importing {
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot shard split during timeline import".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Failed to check for running imports: {err}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let new_shard_count = ShardCount::new(split_req.new_shard_count);
|
||||
let new_stripe_size = split_req.new_stripe_size;
|
||||
|
||||
@@ -8050,12 +8149,25 @@ impl Service {
|
||||
candidates.extend(size_candidates);
|
||||
}
|
||||
|
||||
// Filter out tenants in a prohibiting scheduling mode.
|
||||
// Filter out tenants in a prohibiting scheduling modes
|
||||
// and tenants with an ongoing import.
|
||||
//
|
||||
// Note that the import check here is oportunistic. An import might start
|
||||
// after the check before we actually update [`TenantShard::splitting`].
|
||||
// [`Self::tenant_shard_split`] checks the database whilst holding the exclusive
|
||||
// tenant lock. Imports might take a long time, so the check here allows us
|
||||
// to split something else instead of trying the same shard over and over.
|
||||
{
|
||||
let state = self.inner.read().unwrap();
|
||||
candidates.retain(|i| {
|
||||
let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy());
|
||||
policy == Some(ShardSchedulingPolicy::Active)
|
||||
let shard = state.tenants.get(&i.id);
|
||||
match shard {
|
||||
Some(t) => {
|
||||
t.get_scheduling_policy() == ShardSchedulingPolicy::Active
|
||||
&& t.importing == TimelineImportState::Idle
|
||||
}
|
||||
None => false,
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -8886,27 +8998,59 @@ impl Service {
|
||||
self.inner.read().unwrap().get_leadership_status()
|
||||
}
|
||||
|
||||
pub(crate) async fn step_down(&self) -> GlobalObservedState {
|
||||
/// Handler for step down requests
|
||||
///
|
||||
/// Step down runs in separate task since once it's called it should
|
||||
/// be driven to completion. Subsequent requests will wait on the same
|
||||
/// step down task.
|
||||
pub(crate) async fn step_down(self: &Arc<Self>) -> GlobalObservedState {
|
||||
let handle = self.step_down_barrier.get_or_init(|| {
|
||||
let step_down_self = self.clone();
|
||||
let (tx, rx) = tokio::sync::watch::channel::<Option<GlobalObservedState>>(None);
|
||||
tokio::spawn(async move {
|
||||
let state = step_down_self.step_down_task().await;
|
||||
tx.send(Some(state))
|
||||
.expect("Task Arc<Service> keeps receiver alive");
|
||||
});
|
||||
|
||||
rx
|
||||
});
|
||||
|
||||
handle
|
||||
.clone()
|
||||
.wait_for(|observed_state| observed_state.is_some())
|
||||
.await
|
||||
.expect("Task Arc<Service> keeps sender alive")
|
||||
.deref()
|
||||
.clone()
|
||||
.expect("Checked above")
|
||||
}
|
||||
|
||||
async fn step_down_task(&self) -> GlobalObservedState {
|
||||
tracing::info!("Received step down request from peer");
|
||||
failpoint_support::sleep_millis_async!("sleep-on-step-down-handling");
|
||||
|
||||
self.inner.write().unwrap().step_down();
|
||||
|
||||
// Wait for reconciliations to stop, or terminate this process if they
|
||||
// fail to stop in time (this indicates a bug in shutdown)
|
||||
tokio::select! {
|
||||
_ = self.stop_reconciliations(StopReconciliationsReason::SteppingDown) => {
|
||||
tracing::info!("Reconciliations stopped, proceeding with step down");
|
||||
}
|
||||
_ = async {
|
||||
failpoint_support::sleep_millis_async!("step-down-delay-timeout");
|
||||
tokio::time::sleep(Duration::from_secs(10)).await
|
||||
} => {
|
||||
tracing::warn!("Step down timed out while waiting for reconciliation gate, terminating process");
|
||||
let stop_reconciliations =
|
||||
self.stop_reconciliations(StopReconciliationsReason::SteppingDown);
|
||||
let mut stop_reconciliations = std::pin::pin!(stop_reconciliations);
|
||||
|
||||
// The caller may proceed to act as leader when it sees this request fail: reduce the chance
|
||||
// of a split-brain situation by terminating this controller instead of leaving it up in a partially-shut-down state.
|
||||
std::process::exit(1);
|
||||
let started_at = Instant::now();
|
||||
|
||||
// Wait for reconciliations to stop and warn if that's taking a long time
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut stop_reconciliations => {
|
||||
tracing::info!("Reconciliations stopped, proceeding with step down");
|
||||
break;
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_secs(10)) => {
|
||||
tracing::warn!(
|
||||
elapsed_sec=%started_at.elapsed().as_secs(),
|
||||
"Stopping reconciliations during step down is taking too long"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::scheduler::{
|
||||
RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag,
|
||||
};
|
||||
use crate::service::ReconcileResultRequest;
|
||||
use crate::timeline_import::TimelineImportState;
|
||||
use crate::{Sequence, service};
|
||||
|
||||
/// Serialization helper
|
||||
@@ -100,6 +101,10 @@ pub(crate) struct TenantShard {
|
||||
/// reconciliation, and timeline creation.
|
||||
pub(crate) splitting: SplitState,
|
||||
|
||||
/// Flag indicating whether the tenant has an in-progress timeline import.
|
||||
/// Used to disallow shard splits while an import is in progress.
|
||||
pub(crate) importing: TimelineImportState,
|
||||
|
||||
/// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag
|
||||
/// is set. This flag is cleared when the tenant is popped off the delay queue.
|
||||
pub(crate) delayed_reconcile: bool,
|
||||
@@ -583,6 +588,7 @@ impl TenantShard {
|
||||
config: TenantConfig::default(),
|
||||
reconciler: None,
|
||||
splitting: SplitState::Idle,
|
||||
importing: TimelineImportState::Idle,
|
||||
sequence: Sequence(1),
|
||||
delayed_reconcile: false,
|
||||
waiter: Arc::new(SeqWait::new(Sequence(0))),
|
||||
@@ -1844,6 +1850,8 @@ impl TenantShard {
|
||||
config: serde_json::from_str(&tsp.config).unwrap(),
|
||||
reconciler: None,
|
||||
splitting: tsp.splitting,
|
||||
// Filled in during [`Service::startup_reconcile`]
|
||||
importing: TimelineImportState::Idle,
|
||||
waiter: Arc::new(SeqWait::new(Sequence::initial())),
|
||||
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
|
||||
last_error: Arc::default(),
|
||||
|
||||
@@ -14,6 +14,12 @@ use utils::{
|
||||
|
||||
use crate::{persistence::TimelineImportPersistence, service::Config};
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub(crate) enum TimelineImportState {
|
||||
Importing,
|
||||
Idle,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
|
||||
|
||||
@@ -103,7 +109,7 @@ impl TimelineImport {
|
||||
let crnt = occ.get_mut();
|
||||
if *crnt == status {
|
||||
Ok(TimelineImportUpdateFollowUp::None)
|
||||
} else if crnt.is_terminal() && !status.is_terminal() {
|
||||
} else if crnt.is_terminal() && *crnt != status {
|
||||
Err(TimelineImportUpdateError::UnexpectedUpdate)
|
||||
} else {
|
||||
*crnt = status;
|
||||
|
||||
@@ -16,4 +16,5 @@ pytest_plugins = (
|
||||
"fixtures.slow",
|
||||
"fixtures.reruns",
|
||||
"fixtures.fast_import",
|
||||
"fixtures.pg_config",
|
||||
)
|
||||
|
||||
@@ -100,6 +100,29 @@ class MetricsGetter:
|
||||
|
||||
return result
|
||||
|
||||
def get_metric_sum(
|
||||
self, names: list[str], filter: dict[str, str] | None = None, absence_ok: bool = False
|
||||
) -> float:
|
||||
"""
|
||||
Fetch all metrics matching `names` and `filter`, and sum their values
|
||||
"""
|
||||
metrics = self.get_metrics()
|
||||
samples = []
|
||||
for name in names:
|
||||
samples.extend(metrics.query_all(name, filter=filter))
|
||||
|
||||
found = False
|
||||
result = 0.0
|
||||
for sample in samples:
|
||||
result += sample.value
|
||||
found = True
|
||||
|
||||
if not found and not absence_ok:
|
||||
log.info(f"Metrics found: {metrics.metrics}")
|
||||
raise RuntimeError(f"could not find any metrics matching {names}, {filter}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def parse_metrics(text: str, name: str = "") -> Metrics:
|
||||
metrics = Metrics(name)
|
||||
|
||||
@@ -975,7 +975,56 @@ class NeonEnvBuilder:
|
||||
traceback: TracebackType | None,
|
||||
):
|
||||
# Stop all the nodes.
|
||||
bytes_written: int = 0
|
||||
getpage_requests: int = 0
|
||||
|
||||
if self.env:
|
||||
log.info("Checking for lots of I/O in tests that shouldn't")
|
||||
|
||||
sk_bytes_written: float = 0
|
||||
if self.env.safekeepers[0].running:
|
||||
try:
|
||||
_sk_bytes_written = (
|
||||
self.env.safekeepers[0]
|
||||
.http_client()
|
||||
.get_metric_value("safekeeper_write_wal_bytes_sum")
|
||||
)
|
||||
except requests.exceptions.ConnectionError:
|
||||
_sk_bytes_written = 0
|
||||
|
||||
if _sk_bytes_written is not None:
|
||||
sk_bytes_written = int(_sk_bytes_written)
|
||||
|
||||
ps_bytes_written: float = 0
|
||||
for pageserver in self.env.pageservers:
|
||||
if pageserver.running:
|
||||
try:
|
||||
_tmp_bytes_written = pageserver.http_client().get_metric_sum(
|
||||
["pageserver_io_operations_bytes_total"],
|
||||
{"operation": "write"},
|
||||
absence_ok=True,
|
||||
)
|
||||
except requests.exceptions.ConnectionError:
|
||||
_tmp_bytes_written = 0
|
||||
if _tmp_bytes_written is not None:
|
||||
ps_bytes_written += int(_tmp_bytes_written)
|
||||
|
||||
try:
|
||||
_tmp_getpage = pageserver.http_client().get_metric_value(
|
||||
"pageserver_smgr_query_started_global_count_total",
|
||||
{"smgr_query_type": "get_page_at_lsn"},
|
||||
)
|
||||
except requests.exceptions.ConnectionError:
|
||||
_tmp_getpage = 0
|
||||
|
||||
if _tmp_getpage is not None:
|
||||
getpage_requests += int(_tmp_getpage)
|
||||
assert ps_bytes_written is not None
|
||||
|
||||
log.info(f"Bytes written: SK {sk_bytes_written}, PS {ps_bytes_written}")
|
||||
log.info(f"GetPage@LSN requests: {getpage_requests}")
|
||||
bytes_written = int(max(ps_bytes_written, sk_bytes_written))
|
||||
|
||||
log.info("Cleaning up all storage and compute nodes")
|
||||
self.env.stop(
|
||||
immediate=False,
|
||||
@@ -1038,6 +1087,31 @@ class NeonEnvBuilder:
|
||||
if cleanup_error is not None:
|
||||
cleanup_error = e
|
||||
|
||||
if (
|
||||
os.environ.get("BUILD_TYPE") == "debug"
|
||||
and bytes_written
|
||||
and bytes_written > 512 * 1024 * 1024
|
||||
):
|
||||
raise RuntimeError(
|
||||
f"This test wrote too much data in debug mode: {bytes_written} bytes"
|
||||
)
|
||||
elif bytes_written > 1024 * 1024 * 1024:
|
||||
raise RuntimeError(
|
||||
f"This test wrote too much data in release mode: {bytes_written} bytes"
|
||||
)
|
||||
else:
|
||||
log.info(f"This test wrote {bytes_written} bytes")
|
||||
|
||||
# Fail tests that do more than 100MB of GetPage@LSN requests in debug mode
|
||||
if os.environ.get("BUILD_TYPE") == "debug" and getpage_requests > 12800:
|
||||
raise RuntimeError(
|
||||
f"This test read too much data from pageservers in debug mode: {getpage_requests * 8192} bytes"
|
||||
)
|
||||
elif getpage_requests > 128000:
|
||||
raise RuntimeError(
|
||||
f"This test read too much data from pageservers in release mode: {getpage_requests * 8192} bytes"
|
||||
)
|
||||
|
||||
|
||||
class NeonEnv:
|
||||
"""
|
||||
@@ -1291,7 +1365,11 @@ class NeonEnv:
|
||||
ps_cfg[key] = value
|
||||
|
||||
if self.pageserver_virtual_file_io_mode is not None:
|
||||
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
|
||||
# TODO(christian): https://github.com/neondatabase/neon/issues/11598
|
||||
if not config.test_may_use_compatibility_snapshot_binaries:
|
||||
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
|
||||
else:
|
||||
log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
|
||||
|
||||
if self.pageserver_wal_receiver_protocol is not None:
|
||||
key, value = PageserverWalReceiverProtocol.to_config_key_value(
|
||||
@@ -1450,6 +1528,7 @@ class NeonEnv:
|
||||
|
||||
for sk in self.safekeepers:
|
||||
sk.stop(immediate=immediate)
|
||||
|
||||
for pageserver in self.pageservers:
|
||||
if ps_assert_metric_no_errors:
|
||||
try:
|
||||
@@ -3380,6 +3459,9 @@ class VanillaPostgres(PgProtocol):
|
||||
"""Return size of pgdatadir subdirectory in bytes."""
|
||||
return get_dir_size(self.pgdatadir / subdir)
|
||||
|
||||
def is_running(self) -> bool:
|
||||
return self.running
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
return self
|
||||
|
||||
|
||||
249
test_runner/fixtures/pg_config.py
Normal file
249
test_runner/fixtures/pg_config.py
Normal file
@@ -0,0 +1,249 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import shlex
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast, final
|
||||
|
||||
import pytest
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterator
|
||||
from typing import IO
|
||||
|
||||
from fixtures.neon_fixtures import PgBin
|
||||
|
||||
|
||||
@final
|
||||
class PgConfigKey(StrEnum):
|
||||
BINDIR = "BINDIR"
|
||||
DOCDIR = "DOCDIR"
|
||||
HTMLDIR = "HTMLDIR"
|
||||
INCLUDEDIR = "INCLUDEDIR"
|
||||
PKGINCLUDEDIR = "PKGINCLUDEDIR"
|
||||
INCLUDEDIR_SERVER = "INCLUDEDIR-SERVER"
|
||||
LIBDIR = "LIBDIR"
|
||||
PKGLIBDIR = "PKGLIBDIR"
|
||||
LOCALEDIR = "LOCALEDIR"
|
||||
MANDIR = "MANDIR"
|
||||
SHAREDIR = "SHAREDIR"
|
||||
SYSCONFDIR = "SYSCONFDIR"
|
||||
PGXS = "PGXS"
|
||||
CONFIGURE = "CONFIGURE"
|
||||
CC = "CC"
|
||||
CPPFLAGS = "CPPFLAGS"
|
||||
CFLAGS = "CFLAGS"
|
||||
CFLAGS_SL = "CFLAGS_SL"
|
||||
LDFLAGS = "LDFLAGS"
|
||||
LDFLAGS_EX = "LDFLAGS_EX"
|
||||
LDFLAGS_SL = "LDFLAGS_SL"
|
||||
LIBS = "LIBS"
|
||||
VERSION = "VERSION"
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
# TODO: This could become a TypedDict if Python ever allows StrEnums to be
|
||||
# keys.
|
||||
PgConfig = dict[PgConfigKey, str | Path | list[str]]
|
||||
|
||||
|
||||
def __get_pg_config(pg_bin: PgBin) -> PgConfig:
|
||||
"""Get pg_config values by invoking the command"""
|
||||
|
||||
cmd = pg_bin.run_nonblocking(["pg_config"])
|
||||
cmd.wait()
|
||||
if cmd.returncode != 0:
|
||||
pytest.exit("")
|
||||
assert cmd.stdout
|
||||
|
||||
stdout = cast("IO[str]", cmd.stdout)
|
||||
|
||||
# Parse the output into a dictionary
|
||||
values: PgConfig = {}
|
||||
for line in stdout.readlines():
|
||||
if "=" in line:
|
||||
key, value = line.split("=", 1)
|
||||
value = value.strip()
|
||||
match PgConfigKey(key.strip()):
|
||||
case (
|
||||
(
|
||||
PgConfigKey.CC
|
||||
| PgConfigKey.CPPFLAGS
|
||||
| PgConfigKey.CFLAGS
|
||||
| PgConfigKey.CFLAGS_SL
|
||||
| PgConfigKey.LDFLAGS
|
||||
| PgConfigKey.LDFLAGS_EX
|
||||
| PgConfigKey.LDFLAGS_SL
|
||||
| PgConfigKey.LIBS
|
||||
) as k
|
||||
):
|
||||
values[k] = shlex.split(value)
|
||||
case (
|
||||
(
|
||||
PgConfigKey.BINDIR
|
||||
| PgConfigKey.DOCDIR
|
||||
| PgConfigKey.HTMLDIR
|
||||
| PgConfigKey.INCLUDEDIR
|
||||
| PgConfigKey.PKGINCLUDEDIR
|
||||
| PgConfigKey.INCLUDEDIR_SERVER
|
||||
| PgConfigKey.LIBDIR
|
||||
| PgConfigKey.PKGLIBDIR
|
||||
| PgConfigKey.LOCALEDIR
|
||||
| PgConfigKey.MANDIR
|
||||
| PgConfigKey.SHAREDIR
|
||||
| PgConfigKey.SYSCONFDIR
|
||||
| PgConfigKey.PGXS
|
||||
) as k
|
||||
):
|
||||
values[k] = Path(value)
|
||||
case _ as k:
|
||||
values[k] = value
|
||||
|
||||
return values
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config(pg_bin: PgBin) -> Iterator[PgConfig]:
|
||||
"""Dictionary of all pg_config values from the system"""
|
||||
|
||||
yield __get_pg_config(pg_bin)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_bindir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""BINDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.BINDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_docdir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""DOCDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.DOCDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_htmldir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""HTMLDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.HTMLDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_includedir(
|
||||
pg_config: dict[PgConfigKey, str | Path | list[str]],
|
||||
) -> Iterator[Path]:
|
||||
"""INCLUDEDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.INCLUDEDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_pkgincludedir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""PKGINCLUDEDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.PKGINCLUDEDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_includedir_server(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""INCLUDEDIR-SERVER value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.INCLUDEDIR_SERVER])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_libdir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""LIBDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.LIBDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_pkglibdir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""PKGLIBDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.PKGLIBDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_localedir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""LOCALEDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.LOCALEDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_mandir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""MANDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.MANDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_sharedir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""SHAREDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.SHAREDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_sysconfdir(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""SYSCONFDIR value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.SYSCONFDIR])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_pgxs(pg_config: PgConfig) -> Iterator[Path]:
|
||||
"""PGXS value from pg_config"""
|
||||
yield cast("Path", pg_config[PgConfigKey.PGXS])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_configure(pg_config: PgConfig) -> Iterator[str]:
|
||||
"""CONFIGURE value from pg_config"""
|
||||
yield cast("str", pg_config[PgConfigKey.CONFIGURE])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_cc(pg_config: PgConfig) -> Iterator[list[str]]:
|
||||
"""CC value from pg_config"""
|
||||
yield cast("list[str]", pg_config[PgConfigKey.CC])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_cppflags(pg_config: PgConfig) -> Iterator[list[str]]:
|
||||
"""CPPFLAGS value from pg_config"""
|
||||
yield cast("list[str]", pg_config[PgConfigKey.CPPFLAGS])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_cflags(pg_config: PgConfig) -> Iterator[list[str]]:
|
||||
"""CFLAGS value from pg_config"""
|
||||
yield cast("list[str]", pg_config[PgConfigKey.CFLAGS])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_cflags_sl(pg_config: PgConfig) -> Iterator[list[str]]:
|
||||
"""CFLAGS_SL value from pg_config"""
|
||||
yield cast("list[str]", pg_config[PgConfigKey.CFLAGS_SL])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_ldflags(pg_config: PgConfig) -> Iterator[list[str]]:
|
||||
"""LDFLAGS value from pg_config"""
|
||||
yield cast("list[str]", pg_config[PgConfigKey.LDFLAGS])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_ldflags_ex(pg_config: PgConfig) -> Iterator[list[str]]:
|
||||
"""LDFLAGS_EX value from pg_config"""
|
||||
yield cast("list[str]", pg_config[PgConfigKey.LDFLAGS_EX])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_ldflags_sl(pg_config: PgConfig) -> Iterator[list[str]]:
|
||||
"""LDFLAGS_SL value from pg_config"""
|
||||
yield cast("list[str]", pg_config[PgConfigKey.LDFLAGS_SL])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_libs(pg_config: PgConfig) -> Iterator[list[str]]:
|
||||
"""LIBS value from pg_config"""
|
||||
yield cast("list[str]", pg_config[PgConfigKey.LIBS])
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_config_version(pg_config: PgConfig) -> Iterator[str]:
|
||||
"""VERSION value from pg_config"""
|
||||
yield cast("str", pg_config[PgConfigKey.VERSION])
|
||||
@@ -323,6 +323,7 @@ class NeonProject:
|
||||
if self.restart_pgbench_on_console_errors and (
|
||||
"ERROR: Couldn't connect to compute node" in err
|
||||
or "ERROR: Console request failed" in err
|
||||
or "ERROR: Control plane request failed" in err
|
||||
):
|
||||
log.info("Restarting benchmark for %s", target)
|
||||
self.benchmarks.pop(target)
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
\echo Use "CREATE EXTENSION test_extension" to load this file. \quit
|
||||
|
||||
CREATE SCHEMA test_extension;
|
||||
|
||||
CREATE FUNCTION test_extension.motd()
|
||||
RETURNS void
|
||||
IMMUTABLE LEAKPROOF PARALLEL SAFE
|
||||
AS $$
|
||||
BEGIN
|
||||
RAISE NOTICE 'Have a great day';
|
||||
END;
|
||||
$$ LANGUAGE 'plpgsql';
|
||||
@@ -1 +0,0 @@
|
||||
comment = 'Test extension'
|
||||
@@ -1,6 +1,6 @@
|
||||
\echo Use "ALTER EXTENSION test_extension UPDATE TO '1.1'" to load this file. \quit
|
||||
\echo Use "ALTER EXTENSION test_extension_sql_only UPDATE TO '1.1'" to load this file. \quit
|
||||
|
||||
CREATE FUNCTION test_extension.fun_fact()
|
||||
CREATE FUNCTION test_extension_sql_only.fun_fact()
|
||||
RETURNS void
|
||||
IMMUTABLE LEAKPROOF PARALLEL SAFE
|
||||
AS $$
|
||||
@@ -0,0 +1,12 @@
|
||||
\echo Use "CREATE EXTENSION test_extension_sql_only" to load this file. \quit
|
||||
|
||||
CREATE SCHEMA test_extension_sql_only;
|
||||
|
||||
CREATE FUNCTION test_extension_sql_only.motd()
|
||||
RETURNS void
|
||||
IMMUTABLE LEAKPROOF PARALLEL SAFE
|
||||
AS $$
|
||||
BEGIN
|
||||
RAISE NOTICE 'Have a great day';
|
||||
END;
|
||||
$$ LANGUAGE 'plpgsql';
|
||||
@@ -0,0 +1 @@
|
||||
comment = 'Test extension SQL only'
|
||||
@@ -0,0 +1,6 @@
|
||||
\echo Use "ALTER EXTENSION test_extension_with_lib UPDATE TO '1.1'" to load this file. \quit
|
||||
|
||||
CREATE FUNCTION test_extension_with_lib.fun_fact()
|
||||
RETURNS void
|
||||
IMMUTABLE LEAKPROOF PARALLEL SAFE
|
||||
AS 'MODULE_PATHNAME', 'fun_fact' LANGUAGE C;
|
||||
@@ -0,0 +1,8 @@
|
||||
\echo Use "CREATE EXTENSION test_extension_with_lib" to load this file. \quit
|
||||
|
||||
CREATE SCHEMA test_extension_with_lib;
|
||||
|
||||
CREATE FUNCTION test_extension_with_lib.motd()
|
||||
RETURNS void
|
||||
IMMUTABLE LEAKPROOF PARALLEL SAFE
|
||||
AS 'MODULE_PATHNAME', 'motd' LANGUAGE C;
|
||||
@@ -0,0 +1,34 @@
|
||||
#include <postgres.h>
|
||||
|
||||
#include <fmgr.h>
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
|
||||
PG_FUNCTION_INFO_V1(motd);
|
||||
PG_FUNCTION_INFO_V1(fun_fact);
|
||||
|
||||
/* Old versions of Postgres didn't pre-declare this in fmgr.h */
|
||||
#if PG_MAJORVERSION_NUM <= 15
|
||||
void _PG_init(void);
|
||||
#endif
|
||||
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
}
|
||||
|
||||
Datum
|
||||
motd(PG_FUNCTION_ARGS)
|
||||
{
|
||||
elog(NOTICE, "Have a great day");
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
Datum
|
||||
fun_fact(PG_FUNCTION_ARGS)
|
||||
{
|
||||
elog(NOTICE, "Neon has a melting point of -246.08 C");
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
comment = 'Test extension with lib'
|
||||
module_pathname = '$libdir/test_extension_with_lib'
|
||||
70
test_runner/regress/test_compute_monitor.py
Normal file
70
test_runner/regress/test_compute_monitor.py
Normal file
@@ -0,0 +1,70 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_compute_monitor(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can detect Postgres going down (unresponsive) and
|
||||
reconnect when it comes back online. Also check that the downtime metrics
|
||||
are properly emitted.
|
||||
"""
|
||||
TEST_DB = "test_compute_monitor"
|
||||
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
# Check that default postgres database is present
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute("SELECT datname FROM pg_database WHERE datname = 'postgres'")
|
||||
catalog_db = cursor.fetchone()
|
||||
assert catalog_db is not None
|
||||
assert len(catalog_db) == 1
|
||||
|
||||
# Create a new database
|
||||
cursor.execute(f"CREATE DATABASE {TEST_DB}")
|
||||
|
||||
# Drop database 'postgres'
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
# Use FORCE to terminate all connections to the database
|
||||
cursor.execute("DROP DATABASE postgres WITH (FORCE)")
|
||||
|
||||
client = endpoint.http_client()
|
||||
|
||||
def check_metrics_down():
|
||||
raw_metrics = client.metrics()
|
||||
metrics = parse_metrics(raw_metrics)
|
||||
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
|
||||
assert len(compute_pg_current_downtime_ms) == 1
|
||||
assert compute_pg_current_downtime_ms[0].value > 0
|
||||
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
|
||||
assert len(compute_pg_downtime_ms_total) == 1
|
||||
assert compute_pg_downtime_ms_total[0].value > 0
|
||||
|
||||
wait_until(check_metrics_down)
|
||||
|
||||
# Recreate postgres database
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
cursor.execute("CREATE DATABASE postgres")
|
||||
|
||||
# Current downtime should reset to 0, but not total downtime
|
||||
def check_metrics_up():
|
||||
raw_metrics = client.metrics()
|
||||
metrics = parse_metrics(raw_metrics)
|
||||
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
|
||||
assert len(compute_pg_current_downtime_ms) == 1
|
||||
assert compute_pg_current_downtime_ms[0].value == 0
|
||||
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
|
||||
assert len(compute_pg_downtime_ms_total) == 1
|
||||
assert compute_pg_downtime_ms_total[0].value > 0
|
||||
|
||||
wait_until(check_metrics_up)
|
||||
|
||||
# Just a sanity check that we log the downtime info
|
||||
endpoint.log_contains("downtime_info")
|
||||
@@ -4,12 +4,17 @@ import os
|
||||
import platform
|
||||
import shutil
|
||||
import tarfile
|
||||
from typing import TYPE_CHECKING
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast, final
|
||||
|
||||
import pytest
|
||||
import zstandard
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.paths import BASE_DIR
|
||||
from fixtures.pg_config import PgConfigKey
|
||||
from fixtures.utils import subprocess_capture
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -20,6 +25,7 @@ if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pg_config import PgConfig
|
||||
from fixtures.pg_version import PgVersion
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
@@ -46,46 +52,108 @@ def neon_env_builder_local(
|
||||
return neon_env_builder
|
||||
|
||||
|
||||
@final
|
||||
class RemoteExtension(StrEnum):
|
||||
SQL_ONLY = "test_extension_sql_only"
|
||||
WITH_LIB = "test_extension_with_lib"
|
||||
|
||||
@property
|
||||
def compressed_tarball_name(self) -> str:
|
||||
return f"{self.tarball_name}.zst"
|
||||
|
||||
@property
|
||||
def control_file_name(self) -> str:
|
||||
return f"{self}.control"
|
||||
|
||||
@property
|
||||
def directory(self) -> Path:
|
||||
return BASE_DIR / "test_runner" / "regress" / "data" / "test_remote_extensions" / self
|
||||
|
||||
@property
|
||||
def shared_library_name(self) -> str:
|
||||
return f"{self}.so"
|
||||
|
||||
@property
|
||||
def tarball_name(self) -> str:
|
||||
return f"{self}.tar"
|
||||
|
||||
def archive_route(self, build_tag: str, arch: str, pg_version: PgVersion) -> str:
|
||||
return f"{build_tag}/{arch}/v{pg_version}/extensions/{self.compressed_tarball_name}"
|
||||
|
||||
def build(self, pg_config: PgConfig, output_dir: Path) -> None:
|
||||
if self is not RemoteExtension.WITH_LIB:
|
||||
return
|
||||
|
||||
cmd: list[str] = [
|
||||
*cast("list[str]", pg_config[PgConfigKey.CC]),
|
||||
*cast("list[str]", pg_config[PgConfigKey.CPPFLAGS]),
|
||||
*["-I", str(cast("Path", pg_config[PgConfigKey.INCLUDEDIR_SERVER]))],
|
||||
*cast("list[str]", pg_config[PgConfigKey.CFLAGS]),
|
||||
*cast("list[str]", pg_config[PgConfigKey.CFLAGS_SL]),
|
||||
*cast("list[str]", pg_config[PgConfigKey.LDFLAGS_EX]),
|
||||
*cast("list[str]", pg_config[PgConfigKey.LDFLAGS_SL]),
|
||||
"-shared",
|
||||
*["-o", str(output_dir / self.shared_library_name)],
|
||||
str(self.directory / "src" / f"{self}.c"),
|
||||
]
|
||||
|
||||
subprocess_capture(output_dir, cmd, check=True)
|
||||
|
||||
def control_file_contents(self) -> str:
|
||||
with open(self.directory / self.control_file_name, encoding="utf-8") as f:
|
||||
return f.read()
|
||||
|
||||
def files(self, output_dir: Path) -> dict[Path, str]:
|
||||
files = {
|
||||
# self.directory / self.control_file_name: f"share/extension/{self.control_file_name}",
|
||||
self.directory / "sql" / f"{self}--1.0.sql": f"share/extension/{self}--1.0.sql",
|
||||
self.directory
|
||||
/ "sql"
|
||||
/ f"{self}--1.0--1.1.sql": f"share/extension/{self}--1.0--1.1.sql",
|
||||
}
|
||||
|
||||
if self is RemoteExtension.WITH_LIB:
|
||||
files[output_dir / self.shared_library_name] = f"lib/{self.shared_library_name}"
|
||||
|
||||
return files
|
||||
|
||||
def package(self, output_dir: Path) -> Path:
|
||||
tarball = output_dir / self.tarball_name
|
||||
with tarfile.open(tarball, "x") as tarf:
|
||||
for file, arcname in self.files(output_dir).items():
|
||||
tarf.add(file, arcname=arcname)
|
||||
|
||||
return tarball
|
||||
|
||||
def remove(self, output_dir: Path, pg_version: PgVersion) -> None:
|
||||
for file in self.files(output_dir).values():
|
||||
if file.startswith("share/extension"):
|
||||
file = f"share/postgresql/extension/{os.path.basename(file)}"
|
||||
if file.startswith("lib"):
|
||||
file = f"lib/postgresql/{os.path.basename(file)}"
|
||||
(output_dir / "pg_install" / f"v{pg_version}" / file).unlink()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"extension",
|
||||
(RemoteExtension.SQL_ONLY, RemoteExtension.WITH_LIB),
|
||||
ids=["sql_only", "with_lib"],
|
||||
)
|
||||
def test_remote_extensions(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder_local: NeonEnvBuilder,
|
||||
httpserver_listen_address: ListenAddress,
|
||||
test_output_dir: Path,
|
||||
base_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
pg_config: PgConfig,
|
||||
extension: RemoteExtension,
|
||||
):
|
||||
# Setup a mock nginx S3 gateway which will return our test extension.
|
||||
(host, port) = httpserver_listen_address
|
||||
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
|
||||
|
||||
build_tag = os.environ.get("BUILD_TAG", "latest")
|
||||
|
||||
# We have decided to use the Go naming convention due to Kubernetes.
|
||||
arch = platform.machine()
|
||||
match arch:
|
||||
case "aarch64":
|
||||
arch = "arm64"
|
||||
case "x86_64":
|
||||
arch = "amd64"
|
||||
case _:
|
||||
pass
|
||||
|
||||
archive_route = f"{build_tag}/{arch}/v{pg_version}/extensions/test_extension.tar.zst"
|
||||
tarball = test_output_dir / "test_extension.tar"
|
||||
extension_dir = (
|
||||
base_dir / "test_runner" / "regress" / "data" / "test_remote_extensions" / "test_extension"
|
||||
)
|
||||
|
||||
# Create tarball
|
||||
with tarfile.open(tarball, "x") as tarf:
|
||||
tarf.add(
|
||||
extension_dir / "sql" / "test_extension--1.0.sql",
|
||||
arcname="share/extension/test_extension--1.0.sql",
|
||||
)
|
||||
tarf.add(
|
||||
extension_dir / "sql" / "test_extension--1.0--1.1.sql",
|
||||
arcname="share/extension/test_extension--1.0--1.1.sql",
|
||||
)
|
||||
extension.build(pg_config, test_output_dir)
|
||||
tarball = extension.package(test_output_dir)
|
||||
|
||||
def handler(request: Request) -> Response:
|
||||
log.info(f"request: {request}")
|
||||
@@ -104,8 +172,19 @@ def test_remote_extensions(
|
||||
direct_passthrough=True,
|
||||
)
|
||||
|
||||
# We have decided to use the Go naming convention due to Kubernetes.
|
||||
arch = platform.machine()
|
||||
match arch:
|
||||
case "aarch64":
|
||||
arch = "arm64"
|
||||
case "x86_64":
|
||||
arch = "amd64"
|
||||
case _:
|
||||
pass
|
||||
|
||||
httpserver.expect_request(
|
||||
f"/pg-ext-s3-gateway/{archive_route}", method="GET"
|
||||
f"/pg-ext-s3-gateway/{extension.archive_route(build_tag=os.environ.get('BUILD_TAG', 'latest'), arch=arch, pg_version=pg_version)}",
|
||||
method="GET",
|
||||
).respond_with_handler(handler)
|
||||
|
||||
# Start a compute node with remote_extension spec
|
||||
@@ -114,21 +193,18 @@ def test_remote_extensions(
|
||||
env.create_branch("test_remote_extensions")
|
||||
endpoint = env.endpoints.create("test_remote_extensions")
|
||||
|
||||
with open(extension_dir / "test_extension.control", encoding="utf-8") as f:
|
||||
control_data = f.read()
|
||||
|
||||
# mock remote_extensions spec
|
||||
spec: dict[str, Any] = {
|
||||
"public_extensions": ["test_extension"],
|
||||
"public_extensions": [extension],
|
||||
"custom_extensions": None,
|
||||
"library_index": {
|
||||
"test_extension": "test_extension",
|
||||
extension: extension,
|
||||
},
|
||||
"extension_data": {
|
||||
"test_extension": {
|
||||
extension: {
|
||||
"archive_path": "",
|
||||
"control_data": {
|
||||
"test_extension.control": control_data,
|
||||
extension.control_file_name: extension.control_file_contents(),
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -141,8 +217,8 @@ def test_remote_extensions(
|
||||
with endpoint.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Check that appropriate files were downloaded
|
||||
cur.execute("CREATE EXTENSION test_extension VERSION '1.0'")
|
||||
cur.execute("SELECT test_extension.motd()")
|
||||
cur.execute(f"CREATE EXTENSION {extension} VERSION '1.0'")
|
||||
cur.execute(f"SELECT {extension}.motd()")
|
||||
|
||||
httpserver.check()
|
||||
|
||||
@@ -153,7 +229,7 @@ def test_remote_extensions(
|
||||
remote_ext_requests = metrics.query_all(
|
||||
"compute_ctl_remote_ext_requests_total",
|
||||
# Check that we properly report the filename in the metrics
|
||||
{"filename": "test_extension.tar.zst"},
|
||||
{"filename": extension.compressed_tarball_name},
|
||||
)
|
||||
assert len(remote_ext_requests) == 1
|
||||
for sample in remote_ext_requests:
|
||||
@@ -162,20 +238,7 @@ def test_remote_extensions(
|
||||
endpoint.stop()
|
||||
|
||||
# Remove the extension files to force a redownload of the extension.
|
||||
for file in (
|
||||
"test_extension.control",
|
||||
"test_extension--1.0.sql",
|
||||
"test_extension--1.0--1.1.sql",
|
||||
):
|
||||
(
|
||||
test_output_dir
|
||||
/ "pg_install"
|
||||
/ f"v{pg_version}"
|
||||
/ "share"
|
||||
/ "postgresql"
|
||||
/ "extension"
|
||||
/ file
|
||||
).unlink()
|
||||
extension.remove(test_output_dir, pg_version)
|
||||
|
||||
endpoint.start(remote_ext_config=extensions_endpoint)
|
||||
|
||||
@@ -183,8 +246,8 @@ def test_remote_extensions(
|
||||
with endpoint.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
# Check that appropriate files were downloaded
|
||||
cur.execute("ALTER EXTENSION test_extension UPDATE TO '1.1'")
|
||||
cur.execute("SELECT test_extension.fun_fact()")
|
||||
cur.execute(f"ALTER EXTENSION {extension} UPDATE TO '1.1'")
|
||||
cur.execute(f"SELECT {extension}.fun_fact()")
|
||||
|
||||
# Check that we properly recorded downloads in the metrics
|
||||
client = endpoint.http_client()
|
||||
@@ -193,7 +256,7 @@ def test_remote_extensions(
|
||||
remote_ext_requests = metrics.query_all(
|
||||
"compute_ctl_remote_ext_requests_total",
|
||||
# Check that we properly report the filename in the metrics
|
||||
{"filename": "test_extension.tar.zst"},
|
||||
{"filename": extension.compressed_tarball_name},
|
||||
)
|
||||
assert len(remote_ext_requests) == 1
|
||||
for sample in remote_ext_requests:
|
||||
|
||||
@@ -18,7 +18,12 @@ from fixtures.pageserver.http import (
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import MockS3Server, RemoteStorageKind
|
||||
from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
|
||||
from fixtures.utils import (
|
||||
run_only_on_default_postgres,
|
||||
shared_buffers_for_max_cu,
|
||||
skip_in_debug_build,
|
||||
wait_until,
|
||||
)
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
|
||||
from mypy_boto3_s3 import S3Client
|
||||
@@ -43,6 +48,24 @@ smoke_params = [
|
||||
]
|
||||
|
||||
|
||||
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
|
||||
"""
|
||||
Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
|
||||
"""
|
||||
assert not vanilla_pg.is_running()
|
||||
|
||||
path.mkdir()
|
||||
# what cplane writes before scheduling fast_import
|
||||
specpath = path / "spec.json"
|
||||
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
|
||||
# what fast_import writes
|
||||
vanilla_pg.pgdatadir.rename(path / "pgdata")
|
||||
statusdir = path / "status"
|
||||
statusdir.mkdir()
|
||||
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
|
||||
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
|
||||
|
||||
|
||||
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
|
||||
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
|
||||
def test_pgdata_import_smoke(
|
||||
@@ -155,17 +178,8 @@ def test_pgdata_import_smoke(
|
||||
# TODO: actually exercise fast_import here
|
||||
# TODO: test s3 remote storage
|
||||
#
|
||||
importbucket = neon_env_builder.repo_dir / "importbucket"
|
||||
importbucket.mkdir()
|
||||
# what cplane writes before scheduling fast_import
|
||||
specpath = importbucket / "spec.json"
|
||||
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
|
||||
# what fast_import writes
|
||||
vanilla_pg.pgdatadir.rename(importbucket / "pgdata")
|
||||
statusdir = importbucket / "status"
|
||||
statusdir.mkdir()
|
||||
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
|
||||
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
|
||||
importbucket_path = neon_env_builder.repo_dir / "importbucket"
|
||||
mock_import_bucket(vanilla_pg, importbucket_path)
|
||||
|
||||
#
|
||||
# Do the import
|
||||
@@ -192,7 +206,7 @@ def test_pgdata_import_smoke(
|
||||
"new_timeline_id": str(timeline_id),
|
||||
"import_pgdata": {
|
||||
"idempotency_key": str(idempotency),
|
||||
"location": {"LocalFs": {"path": str(importbucket.absolute())}},
|
||||
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
|
||||
},
|
||||
},
|
||||
)
|
||||
@@ -319,6 +333,87 @@ def test_pgdata_import_smoke(
|
||||
br_initdb_endpoint.safe_psql("select * from othertable")
|
||||
|
||||
|
||||
@run_only_on_default_postgres(reason="PG version is irrelevant here")
|
||||
def test_import_completion_on_restart(
|
||||
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
|
||||
):
|
||||
"""
|
||||
Validate that the storage controller delivers the import completion notification
|
||||
eventually even if it was restarted when the import initially completed.
|
||||
"""
|
||||
# Set up mock control plane HTTP server to listen for import completions
|
||||
import_completion_signaled = Event()
|
||||
|
||||
def handler(request: Request) -> Response:
|
||||
log.info(f"control plane /import_complete request: {request.json}")
|
||||
import_completion_signaled.set()
|
||||
return Response(json.dumps({}), status=200)
|
||||
|
||||
cplane_mgmt_api_server = make_httpserver
|
||||
cplane_mgmt_api_server.expect_request(
|
||||
"/storage/api/v1/import_complete", method="PUT"
|
||||
).respond_with_handler(handler)
|
||||
|
||||
# Plug the cplane mock in
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
|
||||
)
|
||||
|
||||
# The import will specifiy a local filesystem path mocking remote storage
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.stop()
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
|
||||
mock_import_bucket(vanilla_pg, importbucket_path)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
idempotency = ImportPgdataIdemptencyKey.random()
|
||||
|
||||
# Pause before sending the notification
|
||||
failpoint_name = "timeline-import-pre-cplane-notification"
|
||||
env.storage_controller.configure_failpoints((failpoint_name, "pause"))
|
||||
|
||||
env.storage_controller.tenant_create(tenant_id)
|
||||
env.storage_controller.timeline_create(
|
||||
tenant_id,
|
||||
{
|
||||
"new_timeline_id": str(timeline_id),
|
||||
"import_pgdata": {
|
||||
"idempotency_key": str(idempotency),
|
||||
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
def hit_failpoint():
|
||||
log.info("Checking log for pattern...")
|
||||
try:
|
||||
assert env.storage_controller.log_contains(f".*at failpoint {failpoint_name}.*")
|
||||
except Exception:
|
||||
log.exception("Failed to find pattern in log")
|
||||
raise
|
||||
|
||||
wait_until(hit_failpoint)
|
||||
assert not import_completion_signaled.is_set()
|
||||
|
||||
# Restart the storage controller before signalling control plane.
|
||||
# This clears the failpoint and we expect that the import start-up reconciliation
|
||||
# kicks in and notifies cplane.
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
def cplane_notified():
|
||||
assert import_completion_signaled.is_set()
|
||||
|
||||
wait_until(cplane_notified)
|
||||
|
||||
|
||||
def test_fast_import_with_pageserver_ingest(
|
||||
test_output_dir,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
|
||||
@@ -2894,12 +2894,10 @@ def test_storage_controller_leadership_transfer(
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("step_down_times_out", [False, True])
|
||||
def test_storage_controller_leadership_transfer_during_split(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
storage_controller_proxy: StorageControllerProxy,
|
||||
port_distributor: PortDistributor,
|
||||
step_down_times_out: bool,
|
||||
):
|
||||
"""
|
||||
Exercise a race between shard splitting and graceful leadership transfer. This is
|
||||
@@ -2940,8 +2938,8 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
)
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# We are testing scenarios where the step down API does not complete: either because it is stuck
|
||||
# doing a shard split, or because it totally times out on some other failpoint.
|
||||
# We are testing scenarios where the step down API does not complete: it is stuck
|
||||
# doing a shard split
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*step_down.*request was dropped before completing.*",
|
||||
@@ -2949,6 +2947,7 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
".*Send step down request failed, will retry.*",
|
||||
".*Send step down request still failed after.*retries.*",
|
||||
".*Leader .+ did not respond to step-down request.*",
|
||||
".*Stopping reconciliations during step down is taking too long.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -2960,13 +2959,6 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
pause_failpoint = "shard-split-pre-complete"
|
||||
env.storage_controller.configure_failpoints((pause_failpoint, "pause"))
|
||||
|
||||
if not step_down_times_out:
|
||||
# Prevent the timeout self-terminate code from executing: we will block step down on the
|
||||
# shard split itself
|
||||
env.storage_controller.configure_failpoints(
|
||||
("step-down-delay-timeout", "return(3600000)")
|
||||
)
|
||||
|
||||
split_fut = executor.submit(
|
||||
env.storage_controller.tenant_shard_split, list(tenants)[0], shard_count * 2
|
||||
)
|
||||
@@ -2985,13 +2977,9 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port
|
||||
)
|
||||
|
||||
if step_down_times_out:
|
||||
# Step down will time out, original controller will terminate itself
|
||||
env.storage_controller.allowed_errors.extend([".*terminating process.*"])
|
||||
else:
|
||||
# Step down does not time out: original controller hits its shard split completion
|
||||
# code path and realises that it must not purge the parent shards from the database.
|
||||
env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
|
||||
# Step down does not time out: original controller hits its shard split completion
|
||||
# code path and realises that it must not purge the parent shards from the database.
|
||||
env.storage_controller.allowed_errors.extend([".*Enqueuing background abort.*"])
|
||||
|
||||
def passed_split_abort():
|
||||
try:
|
||||
@@ -3007,42 +2995,34 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
wait_until(passed_split_abort, interval=0.1, status_interval=1.0)
|
||||
assert env.storage_controller.log_contains(".*Aborting shard split.*")
|
||||
|
||||
if step_down_times_out:
|
||||
# We will let the old controller hit a timeout path where it terminates itself, rather than
|
||||
# completing step_down and trying to complete a shard split
|
||||
def old_controller_terminated():
|
||||
assert env.storage_controller.log_contains(".*terminating process.*")
|
||||
# Proxy is still talking to original controller here: disable its pause failpoint so
|
||||
# that its shard split can run to completion.
|
||||
log.info("Disabling failpoint")
|
||||
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
|
||||
# on handling the shard split request.
|
||||
env.storage_controller.request(
|
||||
"PUT",
|
||||
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
|
||||
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
wait_until(old_controller_terminated)
|
||||
else:
|
||||
# Proxy is still talking to original controller here: disable its pause failpoint so
|
||||
# that its shard split can run to completion.
|
||||
log.info("Disabling failpoint")
|
||||
# Bypass the proxy: the python test HTTPServer is single threaded and still blocked
|
||||
# on handling the shard split request.
|
||||
env.storage_controller.request(
|
||||
"PUT",
|
||||
f"http://127.0.0.1:{storage_controller_1_port}/debug/v1/failpoints",
|
||||
json=[{"name": "shard-split-pre-complete", "actions": "off"}],
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
def previous_stepped_down():
|
||||
assert (
|
||||
env.storage_controller.get_leadership_status()
|
||||
== StorageControllerLeadershipStatus.STEPPED_DOWN
|
||||
)
|
||||
|
||||
def previous_stepped_down():
|
||||
assert (
|
||||
env.storage_controller.get_leadership_status()
|
||||
== StorageControllerLeadershipStatus.STEPPED_DOWN
|
||||
)
|
||||
log.info("Awaiting step down")
|
||||
wait_until(previous_stepped_down)
|
||||
|
||||
log.info("Awaiting step down")
|
||||
wait_until(previous_stepped_down)
|
||||
|
||||
# Let the shard split complete: this may happen _after_ the replacement has come up
|
||||
# and tried to clean up the databases
|
||||
log.info("Unblocking & awaiting shard split")
|
||||
with pytest.raises(Exception, match="Unexpected child shard count"):
|
||||
# This split fails when it tries to persist results, because it encounters
|
||||
# changes already made by the new controller's abort-on-startup
|
||||
split_fut.result()
|
||||
# Let the shard split complete: this may happen _after_ the replacement has come up
|
||||
# and tried to clean up the databases
|
||||
log.info("Unblocking & awaiting shard split")
|
||||
with pytest.raises(Exception, match="Unexpected child shard count"):
|
||||
# This split fails when it tries to persist results, because it encounters
|
||||
# changes already made by the new controller's abort-on-startup
|
||||
split_fut.result()
|
||||
|
||||
log.info("Routing to new leader")
|
||||
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}")
|
||||
@@ -3060,14 +3040,13 @@ def test_storage_controller_leadership_transfer_during_split(
|
||||
env.storage_controller.wait_until_ready()
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
if not step_down_times_out:
|
||||
# Check that the stepped down instance forwards requests
|
||||
# to the new leader while it's still running.
|
||||
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
|
||||
env.storage_controller.tenant_shard_dump()
|
||||
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
|
||||
status = env.storage_controller.node_status(env.pageservers[0].id)
|
||||
assert status["scheduling"] == "Pause"
|
||||
# Check that the stepped down instance forwards requests
|
||||
# to the new leader while it's still running.
|
||||
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
|
||||
env.storage_controller.tenant_shard_dump()
|
||||
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
|
||||
status = env.storage_controller.node_status(env.pageservers[0].id)
|
||||
assert status["scheduling"] == "Pause"
|
||||
|
||||
|
||||
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
Reference in New Issue
Block a user