Compare commits

..

12 Commits

Author SHA1 Message Date
Elizabeth Murray
2ff2eb6a9e Merge branch 'main' into elizabeth/connection-pool-with-tests 2025-06-22 14:45:44 -07:00
Elizabeth Murray
bdfc6d3ef9 Merge branch 'main' into elizabeth/connection-pool-with-tests 2025-06-20 16:32:26 -07:00
Elizabeth Murray
f47e90fd42 Add version strings to new modules. 2025-06-20 16:15:13 -07:00
Elizabeth Murray
9cc79672f3 Fix cargo deny check error, to see if this fixes Cargo.lock CI issue. 2025-06-20 15:00:09 -07:00
Elizabeth Murray
4a9b1ad5cb Merge branch 'main' into elizabeth/connection-pool-with-tests 2025-06-20 11:59:58 -07:00
Elizabeth Murray
dc4238896a Regenerate Cargo.lock as it is producing Dockerfile errors. 2025-06-20 11:45:41 -07:00
Elizabeth Murray
e1fa844da4 Clippy updates, add Cargo.lock. 2025-06-20 10:18:17 -07:00
Elizabeth Murray
c8a2612207 Add workspace license and version. 2025-06-20 09:48:10 -07:00
Elizabeth Murray
b6e89a3af8 Run clippy. 2025-06-20 09:42:11 -07:00
Elizabeth Murray
261a9ae093 Run cargo fmt. 2025-06-20 08:28:40 -07:00
Elizabeth Murray
cac4ee8ea3 Merge branch 'main' into elizabeth/connection-pool-with-tests 2025-06-20 08:27:57 -07:00
Elizabeth Murray
7636c4085a Add initial skeleton for client cache code, and request tracker. 2025-06-19 14:50:23 -07:00
27 changed files with 472 additions and 601 deletions

View File

@@ -104,10 +104,11 @@ jobs:
# Set some environment variables used by all the steps.
#
# CARGO_FLAGS is extra options to pass to all "cargo" subcommands.
# CARGO_FLAGS is extra options to pass to "cargo build", "cargo test" etc.
# It also includes --features, if any
#
# CARGO_PROFILE is passed to "cargo build", "cargo test" etc, but not to
# "cargo metadata", because it doesn't accept --release or --debug options.
# CARGO_FEATURES is passed to "cargo metadata". It is separate from CARGO_FLAGS,
# because "cargo metadata" doesn't accept --release or --debug options
#
# We run tests with addtional features, that are turned off by default (e.g. in release builds), see
# corresponding Cargo.toml files for their descriptions.
@@ -116,16 +117,16 @@ jobs:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
CARGO_FLAGS="--locked --features testing"
CARGO_FEATURES="--features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_PROFILE=""
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=""
CARGO_PROFILE=""
CARGO_FLAGS="--locked"
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_PROFILE="--release"
CARGO_FLAGS="--locked --release"
fi
if [[ $SANITIZERS == 'enabled' ]]; then
make_vars="WITH_SANITIZERS=yes"
@@ -135,8 +136,8 @@ jobs:
{
echo "cov_prefix=${cov_prefix}"
echo "make_vars=${make_vars}"
echo "CARGO_FEATURES=${CARGO_FEATURES}"
echo "CARGO_FLAGS=${CARGO_FLAGS}"
echo "CARGO_PROFILE=${CARGO_PROFILE}"
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
} >> $GITHUB_ENV
@@ -188,18 +189,34 @@ jobs:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Build all
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables
run: mold -run make ${make_vars} all -j$(nproc) CARGO_BUILD_FLAGS="$CARGO_FLAGS"
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v14 -j$(nproc)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v15 -j$(nproc)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v16 -j$(nproc)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v17 -j$(nproc)
- name: Build neon extensions
run: mold -run make ${make_vars} neon-pg-ext -j$(nproc)
- name: Build walproposer-lib
run: mold -run make ${make_vars} walproposer-lib -j$(nproc)
- name: Build unit tests
if: inputs.sanitizers != 'enabled'
- name: Run cargo build
env:
WITH_TESTS: ${{ inputs.sanitizers != 'enabled' && '--tests' || '' }}
run: |
export ASAN_OPTIONS=detect_leaks=0
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_PROFILE --tests
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins ${WITH_TESTS}
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
@@ -211,7 +228,7 @@ jobs:
# Install target binaries
mkdir -p /tmp/neon/bin/
binaries=$(
${cov_prefix} cargo metadata $CARGO_FLAGS --format-version=1 --no-deps |
${cov_prefix} cargo metadata $CARGO_FEATURES --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
for bin in $binaries; do
@@ -228,7 +245,7 @@ jobs:
mkdir -p /tmp/neon/test_bin/
test_exe_paths=$(
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_PROFILE --message-format=json --no-run |
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
for bin in $test_exe_paths; do
@@ -262,10 +279,10 @@ jobs:
export LD_LIBRARY_PATH
#nextest does not yet support running doctests
${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_PROFILE
${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
# run all non-pageserver tests
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E '!package(pageserver)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E '!package(pageserver)'
# run pageserver tests
# (When developing new pageserver features gated by config fields, we commonly make the rust
@@ -274,13 +291,13 @@ jobs:
# pageserver tests from non-pageserver tests cuts down the time it takes for this CI step.)
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=tokio-epoll-uring \
${cov_prefix} \
cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(pageserver)'
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(remote_storage)' -E 'test(test_real_s3)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_s3)'
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
@@ -289,7 +306,7 @@ jobs:
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(remote_storage)' -E 'test(test_real_azure)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
- name: Install postgres binaries
run: |

View File

@@ -670,7 +670,7 @@ jobs:
ghcr.io/neondatabase/neon:${{ needs.meta.outputs.build-tag }}-bookworm-arm64
compute-node-image-arch:
needs: [ check-permissions, meta ]
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -743,6 +743,7 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
push: true
@@ -762,6 +763,7 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
push: true

41
Cargo.lock generated
View File

@@ -1235,6 +1235,25 @@ dependencies = [
"replace_with",
]
[[package]]
name = "client_cache"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"futures",
"http 1.1.0",
"hyper-util",
"priority-queue",
"rand 0.8.5",
"tokio",
"tokio-util",
"tonic 0.13.1",
"tower 0.5.2",
"uuid",
"workspace_hack",
]
[[package]]
name = "colorchoice"
version = "1.0.0"
@@ -5029,6 +5048,17 @@ dependencies = [
"elliptic-curve 0.13.8",
]
[[package]]
name = "priority-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5676d703dda103cbb035b653a9f11448c0a7216c7926bd35fcb5865475d0c970"
dependencies = [
"autocfg",
"equivalent",
"indexmap 2.9.0",
]
[[package]]
name = "proc-macro2"
version = "1.0.94"
@@ -5647,9 +5677,16 @@ dependencies = [
[[package]]
name = "replace_with"
version = "0.1.7"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3a8614ee435691de62bcffcf4a66d91b3594bf1428a5722e79103249a095690"
checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884"
[[package]]
name = "request_tracker"
version = "0.1.0"
dependencies = [
"workspace_hack",
]
[[package]]
name = "reqwest"

View File

@@ -8,8 +8,10 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/communicator_pools/client_cache",
"pageserver/pagebench",
"pageserver/page_api",
"pageserver/communicator_pools/request_tracker",
"proxy",
"safekeeper",
"safekeeper/client",
@@ -257,6 +259,8 @@ pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
client_cache = { path = "./pageserver/communicator_pools/client_cache" }
request_tracker = { path = "./pageserver/communicator_pools/request_tracker" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }

View File

@@ -4,12 +4,6 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# managers.
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
# CARGO_BUILD_FLAGS: Extra flags to pass to `cargo build`. `--locked`
# and `--features testing` are popular examples.
#
# CARGO_PROFILE: You can also set to override the cargo profile to
# use. By default, it is derived from BUILD_TYPE.
# All intermediate build artifacts are stored here.
BUILD_DIR := build
@@ -26,12 +20,12 @@ ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
PG_CFLAGS += -O2 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
CARGO_PROFILE ?= --profile=release
# Unfortunately, `--profile=...` is a nightly feature
CARGO_BUILD_FLAGS += --release
else ifeq ($(BUILD_TYPE),debug)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
PG_CFLAGS += -O0 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
CARGO_PROFILE ?= --profile=dev
else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
endif
@@ -103,7 +97,7 @@ all: neon postgres neon-pg-ext
.PHONY: neon
neon: postgres-headers walproposer-lib cargo-target-dir
+@echo "Compiling Neon"
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS) $(CARGO_PROFILE)
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS)
.PHONY: cargo-target-dir
cargo-target-dir:
# https://github.com/rust-lang/cargo/issues/14281

View File

@@ -77,6 +77,9 @@
# build_and_test.yml github workflow for how that's done.
ARG PG_VERSION
ARG REPOSITORY=ghcr.io/neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG BUILD_TAG
ARG DEBIAN_VERSION=bookworm
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
@@ -100,8 +103,8 @@ ARG BULLSEYE_SLIM_SHA=sha256:e831d9a884d63734fe3dd9c491ed9a5a3d4c6a6d32c5b14f206
# If var will match one the known images, we will replace it with the known sha.
# If no match, than value will be unaffected, and will process with no-pinned image.
ARG BASE_IMAGE_SHA=debian:${DEBIAN_FLAVOR}
ARG BASE_IMAGE_SHA=${BASE_IMAGE_SHA/debian:-bookworm-slim/debian@$BOOKWORM_SLIM_SHA}
ARG BASE_IMAGE_SHA=${BASE_IMAGE_SHA/debian:-bullseye-slim/debian@$BULLSEYE_SLIM_SHA}
ARG BASE_IMAGE_SHA=${BASE_IMAGE_SHA/debian:bookworm-slim/debian@$BOOKWORM_SLIM_SHA}
ARG BASE_IMAGE_SHA=${BASE_IMAGE_SHA/debian:bullseye-slim/debian@$BULLSEYE_SLIM_SHA}
# By default, build all PostgreSQL extensions. For quick local testing when you don't
# care about the extensions, pass EXTENSIONS=none or EXTENSIONS=minimal
@@ -147,7 +150,6 @@ RUN case $DEBIAN_VERSION in \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
libclang-dev \
jsonnet \
$VERSION_INSTALLS \
&& apt clean && rm -rf /var/lib/apt/lists/* && \
useradd -ms /bin/bash nonroot -b /home
@@ -1722,7 +1724,7 @@ FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM build-deps-with-cargo AS compute-tools
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
@@ -1732,7 +1734,7 @@ COPY --chown=nonroot . .
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
--mount=type=cache,uid=1000,target=/home/nonroot/target \
cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy && \
mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy && \
mkdir target-bin && \
cp target/release-line-debug-size-lto/compute_ctl \
target/release-line-debug-size-lto/fast_import \
@@ -1826,11 +1828,10 @@ RUN rm /usr/local/pgsql/lib/lib*.a
# Preprocess the sql_exporter configuration files
#
#########################################################################################
FROM build-deps AS sql_exporter_preprocessor
FROM $REPOSITORY/$IMAGE:$TAG AS sql_exporter_preprocessor
ARG PG_VERSION
USER nonroot
WORKDIR /home/nonroot
COPY --chown=nonroot compute compute

View File

@@ -76,10 +76,6 @@ pub struct PostHogConfig {
pub private_api_url: String,
/// Public API URL
pub public_api_url: String,
/// Refresh interval for the feature flag spec
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub refresh_interval: Option<Duration>,
}
/// `pageserver.toml`

View File

@@ -36,10 +36,7 @@ impl FeatureResolverBackgroundLoop {
// Main loop of updating the feature flags.
handle.spawn(
async move {
tracing::info!(
"Starting PostHog feature resolver with refresh period: {:?}",
refresh_period
);
tracing::info!("Starting PostHog feature resolver");
let mut ticker = tokio::time::interval(refresh_period);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {

View File

@@ -12,9 +12,6 @@ testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "
fuzz-read-path = ["testing"]
# Enables benchmarking only APIs
benchmarking = []
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
@@ -130,7 +127,6 @@ harness = false
[[bench]]
name = "bench_ingest"
harness = false
required-features = ["benchmarking"]
[[bench]]
name = "upload_queue"

View File

@@ -1,29 +1,22 @@
use std::env;
use std::num::NonZeroUsize;
use std::sync::Arc;
use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{Criterion, criterion_group, criterion_main};
use futures::stream::FuturesUnordered;
use pageserver::config::PageServerConf;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::keyspace::KeySpace;
use pageserver::l0_flush::{L0FlushConfig, L0FlushGlobalState};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::storage_layer::IoConcurrency;
use pageserver::tenant::storage_layer::{InMemoryLayer, ValuesReconstructState};
use pageserver::tenant::storage_layer::InMemoryLayer;
use pageserver::{page_cache, virtual_file};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use tokio_stream::StreamExt;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::Gate;
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::SerializedValueBatch;
@@ -37,7 +30,7 @@ fn murmurhash32(mut h: u32) -> u32 {
h
}
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
#[derive(serde::Serialize, Clone, Copy, Debug)]
enum KeyLayout {
/// Sequential unique keys
Sequential,
@@ -47,30 +40,19 @@ enum KeyLayout {
RandomReuse(u32),
}
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
#[derive(serde::Serialize, Clone, Copy, Debug)]
enum WriteDelta {
Yes,
No,
}
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
enum ConcurrentReads {
Yes,
No,
}
async fn ingest(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
) -> anyhow::Result<()> {
if concurrent_reads == ConcurrentReads::Yes {
assert_eq!(key_layout, KeyLayout::Sequential);
}
let mut lsn = utils::lsn::Lsn(1000);
let mut key = Key::from_i128(0x0);
@@ -86,18 +68,16 @@ async fn ingest(
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let layer = Arc::new(
InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
lsn,
&gate,
&cancel,
&ctx,
)
.await?,
);
let layer = InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
lsn,
&gate,
&cancel,
&ctx,
)
.await?;
let data = Value::Image(Bytes::from(vec![0u8; put_size]));
let data_ser_size = data.serialized_size().unwrap() as usize;
@@ -106,61 +86,6 @@ async fn ingest(
pageserver::context::DownloadBehavior::Download,
);
const READ_BATCH_SIZE: u32 = 32;
let (tx, mut rx) = tokio::sync::watch::channel::<Option<Key>>(None);
let reader_cancel = CancellationToken::new();
let reader_handle = if concurrent_reads == ConcurrentReads::Yes {
Some(tokio::task::spawn({
let cancel = reader_cancel.clone();
let layer = layer.clone();
let ctx = ctx.attached_child();
async move {
let gate = Gate::default();
let gate_guard = gate.enter().unwrap();
let io_concurrency = IoConcurrency::spawn_from_conf(
GetVectoredConcurrentIo::SidecarTask,
gate_guard,
);
rx.wait_for(|key| key.is_some()).await.unwrap();
while !cancel.is_cancelled() {
let key = match *rx.borrow() {
Some(some) => some,
None => unreachable!(),
};
let mut start_key = key;
start_key.field6 = key.field6.saturating_sub(READ_BATCH_SIZE);
let key_range = start_key..key.next();
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
layer
.get_values_reconstruct_data(
KeySpace::single(key_range),
Lsn(1)..Lsn(u64::MAX),
&mut reconstruct_state,
&ctx,
)
.await
.unwrap();
let mut collect_futs = std::mem::take(&mut reconstruct_state.keys)
.into_values()
.map(|state| state.sink_pending_ios())
.collect::<FuturesUnordered<_>>();
while collect_futs.next().await.is_some() {}
}
drop(io_concurrency);
gate.close().await;
}
}))
} else {
None
};
const BATCH_SIZE: usize = 16;
let mut batch = Vec::new();
@@ -188,27 +113,19 @@ async fn ingest(
batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
if batch.len() >= BATCH_SIZE {
let last_key = Key::from_compact(batch.last().unwrap().0);
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedValueBatch::from_values(this_batch);
layer.put_batch(serialized, &ctx).await?;
tx.send(Some(last_key)).unwrap();
}
}
if !batch.is_empty() {
let last_key = Key::from_compact(batch.last().unwrap().0);
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedValueBatch::from_values(this_batch);
layer.put_batch(serialized, &ctx).await?;
tx.send(Some(last_key)).unwrap();
}
layer.freeze(lsn + 1).await;
if write_delta == WriteDelta::Yes {
if matches!(write_delta, WriteDelta::Yes) {
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
@@ -219,11 +136,6 @@ async fn ingest(
tokio::fs::remove_file(path).await?;
}
reader_cancel.cancel();
if let Some(handle) = reader_handle {
handle.await.unwrap();
}
Ok(())
}
@@ -235,7 +147,6 @@ fn ingest_main(
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
) {
pageserver::virtual_file::set_io_mode(io_mode);
@@ -245,15 +156,7 @@ fn ingest_main(
.unwrap();
runtime.block_on(async move {
let r = ingest(
conf,
put_size,
put_count,
key_layout,
write_delta,
concurrent_reads,
)
.await;
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
if let Err(e) = r {
panic!("{e:?}");
}
@@ -292,7 +195,6 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
}
#[derive(Clone)]
struct HandPickedParameters {
@@ -343,7 +245,7 @@ fn criterion_benchmark(c: &mut Criterion) {
];
let exploded_parameters = {
let mut out = Vec::new();
for concurrent_reads in [ConcurrentReads::Yes, ConcurrentReads::No] {
for io_mode in IoMode::iter() {
for param in expect.clone() {
let HandPickedParameters {
volume_mib,
@@ -351,18 +253,12 @@ fn criterion_benchmark(c: &mut Criterion) {
key_layout,
write_delta,
} = param;
if key_layout != KeyLayout::Sequential && concurrent_reads == ConcurrentReads::Yes {
continue;
}
out.push(ExplodedParameters {
io_mode: IoMode::DirectRw,
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
concurrent_reads,
});
}
}
@@ -376,10 +272,9 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size,
key_layout,
write_delta,
concurrent_reads,
} = self;
format!(
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?} concurrent_reads={concurrent_reads:?}"
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
)
}
}
@@ -392,23 +287,12 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size,
key_layout,
write_delta,
concurrent_reads,
} = params;
let put_count = volume_mib * 1024 * 1024 / key_size;
group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
group.sample_size(10);
group.bench_function(id, |b| {
b.iter(|| {
ingest_main(
conf,
io_mode,
key_size,
put_count,
key_layout,
write_delta,
concurrent_reads,
)
})
b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
});
}
}

View File

@@ -0,0 +1,21 @@
[package]
name = "client_cache"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
bytes.workspace = true
futures.workspace = true
hyper-util.workspace = true
http.workspace = true
priority-queue = "2.3.1"
rand.workspace = true
tonic.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tower.workspace = true
uuid.workspace = true
workspace_hack.workspace = true

View File

@@ -0,0 +1,105 @@
use async_trait::async_trait;
use priority_queue::PriorityQueue;
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
#[async_trait]
pub trait PooledClientFactory<T>: Send + Sync + 'static {
/// Create a new pooled item.
async fn create(
&self,
connect_timeout: Duration,
) -> Result<Result<T, tonic::Status>, tokio::time::error::Elapsed>;
}
/// A pooled gRPC client with capacity tracking and error handling.
#[allow(dead_code)]
pub struct ClientCache<T> {
inner: Mutex<Inner<T>>,
fact: Arc<dyn PooledClientFactory<T> + Send + Sync>,
connect_timeout: Duration,
connect_backoff: Duration,
/// The maximum number of consumers that can use a single connection.
max_consumers: usize,
/// The number of consecutive errors before a connection is removed from the pool.
error_threshold: usize,
/// The maximum duration a connection can be idle before being removed.
max_idle_duration: Duration,
max_total_connections: usize,
client_semaphore: Arc<Semaphore>,
}
#[allow(dead_code)]
struct Inner<T> {
entries: HashMap<uuid::Uuid, CacheEntry<T>>,
pq: PriorityQueue<uuid::Uuid, usize>,
// This is updated when a connection is dropped, or we fail
// to create a new connection.
last_connect_failure: Option<Instant>,
waiters: usize,
in_progress: usize,
}
#[allow(dead_code)]
struct CacheEntry<T> {
client: T,
active_consumers: usize,
consecutive_errors: usize,
last_used: Instant,
}
/// A client borrowed from the pool.
#[allow(dead_code)]
pub struct PooledClient<T> {
pub client: T,
pool: Arc<ClientCache<T>>,
is_ok: bool,
id: uuid::Uuid,
permit: OwnedSemaphorePermit,
}
impl<T: Clone + Send + 'static> ClientCache<T> {
pub fn new(
fact: Arc<dyn PooledClientFactory<T> + Send + Sync>,
connect_timeout: Duration,
connect_backoff: Duration,
max_consumers: usize,
error_threshold: usize,
max_idle_duration: Duration,
max_total_connections: usize,
) -> Arc<Self> {
Arc::new(Self {
inner: Mutex::new(Inner::<T> {
entries: HashMap::new(),
pq: PriorityQueue::new(),
last_connect_failure: None,
waiters: 0,
in_progress: 0,
}),
fact: Arc::clone(&fact),
connect_timeout,
connect_backoff,
max_consumers,
error_threshold,
max_idle_duration,
max_total_connections,
client_semaphore: Arc::new(Semaphore::new(0)),
})
}
}
impl<T: Clone + Send + 'static> PooledClient<T> {
pub fn client(&self) -> T {
self.client.clone()
}
}

View File

@@ -0,0 +1,8 @@
[package]
name = "request_tracker"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
workspace_hack.workspace = true

View File

@@ -0,0 +1,15 @@
// Temporary placeholder until the request tracker is implemented
pub fn add(left: u64, right: u64) -> u64 {
left + right
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}

View File

@@ -583,7 +583,7 @@ fn start_pageserver(
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
feature_resolver: feature_resolver.clone(),
feature_resolver,
},
shutdown_pageserver.clone(),
);
@@ -715,7 +715,6 @@ fn start_pageserver(
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
feature_resolver,
)
.context("Failed to initialize router state")?,
);

View File

@@ -1,6 +1,5 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use pageserver_api::config::NodeMetadata;
use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
@@ -13,13 +12,10 @@ use utils::id::TenantId;
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
#[derive(Clone)]
pub struct FeatureResolver {
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
}
impl FeatureResolver {
@@ -27,7 +23,6 @@ impl FeatureResolver {
Self {
inner: None,
internal_properties: None,
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
}
}
@@ -144,23 +139,18 @@ impl FeatureResolver {
}
tenants
};
inner.clone().spawn(
handle,
posthog_config
.refresh_interval
.unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
fake_tenants,
);
// TODO: make refresh period configurable
inner
.clone()
.spawn(handle, Duration::from_secs(60), fake_tenants);
Ok(FeatureResolver {
inner: Some(inner),
internal_properties: Some(internal_properties),
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
})
} else {
Ok(FeatureResolver {
inner: None,
internal_properties: None,
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
})
}
}
@@ -200,11 +190,6 @@ impl FeatureResolver {
flag_key: &str,
tenant_id: TenantId,
) -> Result<String, PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
return Ok(value.clone());
}
if let Some(inner) = &self.inner {
let res = inner.feature_store().evaluate_multivariate(
flag_key,
@@ -243,15 +228,6 @@ impl FeatureResolver {
flag_key: &str,
tenant_id: TenantId,
) -> Result<(), PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
return if value == "true" {
Ok(())
} else {
Err(PostHogEvaluationError::NoConditionGroupMatched)
};
}
if let Some(inner) = &self.inner {
let res = inner.feature_store().evaluate_boolean(
flag_key,
@@ -283,22 +259,8 @@ impl FeatureResolver {
inner.feature_store().is_feature_flag_boolean(flag_key)
} else {
Err(PostHogEvaluationError::NotAvailable(
"PostHog integration is not enabled, cannot auto-determine the flag type"
.to_string(),
"PostHog integration is not enabled".to_string(),
))
}
}
/// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
/// from a single thread so it won't race.
pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
if let Some(value) = value {
force_overrides.insert(flag_key.to_string(), value.to_string());
} else {
force_overrides.remove(flag_key);
}
self.force_overrides_for_testing
.store(Arc::new(force_overrides));
}
}

View File

@@ -59,7 +59,6 @@ use crate::config::PageServerConf;
use crate::context;
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::deletion_queue::DeletionQueueClient;
use crate::feature_resolver::FeatureResolver;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationConf;
@@ -108,7 +107,6 @@ pub struct State {
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
latest_utilization: tokio::sync::Mutex<Option<(std::time::Instant, bytes::Bytes)>>,
feature_resolver: FeatureResolver,
}
impl State {
@@ -122,7 +120,6 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
feature_resolver: FeatureResolver,
) -> anyhow::Result<Self> {
let allowlist_routes = &[
"/v1/status",
@@ -143,7 +140,6 @@ impl State {
deletion_queue_client,
secondary_controller,
latest_utilization: Default::default(),
feature_resolver,
})
}
}
@@ -3679,8 +3675,8 @@ async fn tenant_evaluate_feature_flag(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let flag: String = parse_request_param(&request, "flag_key")?;
let as_type: Option<String> = parse_query_param(&request, "as")?;
let flag: String = must_parse_query_param(&request, "flag")?;
let as_type: String = must_parse_query_param(&request, "as")?;
let state = get_state(&request);
@@ -3689,11 +3685,11 @@ async fn tenant_evaluate_feature_flag(
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
if as_type.as_deref() == Some("boolean") {
if as_type == "boolean" {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else if as_type.as_deref() == Some("multivariate") {
} else if as_type == "multivariate" {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
@@ -3713,35 +3709,6 @@ async fn tenant_evaluate_feature_flag(
.await
}
async fn force_override_feature_flag_for_testing_put(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let flag: String = parse_request_param(&request, "flag_key")?;
let value: String = must_parse_query_param(&request, "value")?;
let state = get_state(&request);
state
.feature_resolver
.force_override_for_testing(&flag, Some(&value));
json_response(StatusCode::OK, ())
}
async fn force_override_feature_flag_for_testing_delete(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let flag: String = parse_request_param(&request, "flag_key")?;
let state = get_state(&request);
state
.feature_resolver
.force_override_for_testing(&flag, None);
json_response(StatusCode::OK, ())
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -4118,14 +4085,8 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
|r| api_handler(r, activate_post_import_handler),
)
.get("/v1/tenant/:tenant_shard_id/feature_flag/:flag_key", |r| {
.get("/v1/tenant/:tenant_shard_id/feature_flag", |r| {
api_handler(r, tenant_evaluate_feature_flag)
})
.put("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - put", r, force_override_feature_flag_for_testing_put)
})
.delete("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - delete", r, force_override_feature_flag_for_testing_delete)
})
.any(handler_404))
}

View File

@@ -3426,7 +3426,7 @@ impl TimelineMetrics {
pub fn dec_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
let labels = self.make_frozen_layer_labels(layer);
let size = layer.len();
let size = layer.try_len().expect("frozen layer should have no writer");
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()
@@ -3441,7 +3441,7 @@ impl TimelineMetrics {
pub fn inc_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
let labels = self.make_frozen_layer_labels(layer);
let size = layer.len();
let size = layer.try_len().expect("frozen layer should have no writer");
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()

View File

@@ -3544,9 +3544,8 @@ impl proto::PageService for GrpcPageServiceHandler {
&self,
req: tonic::Request<proto::GetBaseBackupRequest>,
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
// Send chunks of 256 KB to avoid large memory allocations. pagebench basebackup shows this
// to be the sweet spot where throughput is saturated.
const CHUNK_SIZE: usize = 256 * 1024;
// Send 64 KB chunks to avoid large memory allocations.
const CHUNK_SIZE: usize = 64 * 1024;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);

View File

@@ -3,7 +3,7 @@
use std::io;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::AtomicU64;
use camino::Utf8PathBuf;
use num_traits::Num;
@@ -18,7 +18,6 @@ use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::GlobalResourceUnits;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
@@ -31,13 +30,9 @@ pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
buffered_writer: tokio::sync::RwLock<BufferedWriter>,
bytes_written: AtomicU64,
resource_units: std::sync::Mutex<GlobalResourceUnits>,
buffered_writer: BufferedWriter,
}
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
@@ -99,8 +94,9 @@ impl EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
page_cache_file_id,
bytes_written: 0,
file: file.clone(),
buffered_writer: tokio::sync::RwLock::new(BufferedWriter::new(
buffered_writer: BufferedWriter::new(
file,
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
@@ -108,9 +104,7 @@ impl EphemeralFile {
cancel.child_token(),
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
)),
bytes_written: AtomicU64::new(0),
resource_units: std::sync::Mutex::new(GlobalResourceUnits::new()),
),
})
}
}
@@ -157,17 +151,15 @@ impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter
#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
#[error("{0}")]
TooLong(String),
#[error("cancelled")]
Cancelled,
}
impl EphemeralFile {
pub(crate) fn len(&self) -> u64 {
// TODO(vlad): The value returned here is not always correct if
// we have more than one concurrent writer. Writes are always
// sequenced, but we could grab the buffered writer lock if we wanted
// to.
self.bytes_written.load(Ordering::Acquire)
self.bytes_written
}
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
@@ -194,7 +186,7 @@ impl EphemeralFile {
/// Panics if the write is short because there's no way we can recover from that.
/// TODO: make upstack handle this as an error.
pub(crate) async fn write_raw(
&self,
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<u64, EphemeralFileWriteError> {
@@ -206,13 +198,22 @@ impl EphemeralFile {
}
async fn write_raw_controlled(
&self,
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
let mut writer = self.buffered_writer.write().await;
let pos = self.bytes_written;
let (nwritten, control) = writer
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
EphemeralFileWriteError::TooLong(format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
))
})?;
// Write the payload
let (nwritten, control) = self
.buffered_writer
.write_buffered_borrowed_controlled(srcbuf, ctx)
.await
.map_err(|e| match e {
@@ -224,61 +225,22 @@ impl EphemeralFile {
"buffered writer has no short writes"
);
// There's no realistic risk of overflow here. We won't have exabytes sized files on disk.
let pos = self
.bytes_written
.fetch_add(srcbuf.len().into_u64(), Ordering::AcqRel);
let mut resource_units = self.resource_units.lock().unwrap();
resource_units.maybe_publish_size(self.bytes_written.load(Ordering::Relaxed));
self.bytes_written = new_bytes_written;
Ok((pos, control))
}
pub(crate) fn tick(&self) -> Option<u64> {
let mut resource_units = self.resource_units.lock().unwrap();
let len = self.bytes_written.load(Ordering::Relaxed);
resource_units.publish_size(len)
}
}
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
&self,
start: u64,
mut dst: tokio_epoll_uring::Slice<B>,
dst: tokio_epoll_uring::Slice<B>,
ctx: &RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
// We will fill the slice in back to front. Hence, we need
// the slice to be fully initialized.
// TODO(vlad): Is there a nicer way of doing this?
dst.as_mut_rust_slice_full_zeroed();
let submitted_offset = self.buffered_writer.bytes_submitted();
let writer = self.buffered_writer.read().await;
// Read bytes written while under lock. This is a hack to deal with concurrent
// writes updating the number of bytes written. `bytes_written` is not DIO alligned
// but we may end the read there.
//
// TODO(vlad): Feels like there's a nicer path where we align the end if it
// shoots over the end of the file.
let bytes_written = self.bytes_written.load(Ordering::Acquire);
let dst_cap = dst.bytes_total().into_u64();
let end = {
// saturating_add is correct here because the max file size is u64::MAX, so,
// if start + dst.len() > u64::MAX, then we know it will be a short read
let mut end: u64 = start.saturating_add(dst_cap);
if end > bytes_written {
end = bytes_written;
}
end
};
let submitted_offset = writer.bytes_submitted();
let maybe_flushed = writer.inspect_maybe_flushed();
let mutable = match writer.inspect_mutable() {
let mutable = match self.buffered_writer.inspect_mutable() {
Some(mutable) => &mutable[0..mutable.pending()],
None => {
// Timeline::cancel and hence buffered writer flush was cancelled.
@@ -287,6 +249,19 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
}
};
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
let dst_cap = dst.bytes_total().into_u64();
let end = {
// saturating_add is correct here because the max file size is u64::MAX, so,
// if start + dst.len() > u64::MAX, then we know it will be a short read
let mut end: u64 = start.saturating_add(dst_cap);
if end > self.bytes_written {
end = self.bytes_written;
}
end
};
// inclusive, exclusive
#[derive(Debug)]
struct Range<N>(N, N);
@@ -331,33 +306,13 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
// There are three sources from which we might have to read data:
// 1. The file itself
// 2. The buffer which contains changes currently being flushed
// 3. The buffer which contains chnages yet to be flushed
//
// For better concurrency, we do them in reverse order: perform the in-memory
// reads while holding the writer lock, drop the writer lock and read from the
// file if required.
let dst = if mutable_range.len() > 0 {
let offset_in_buffer = mutable_range
.0
.checked_sub(submitted_offset)
.unwrap()
.into_usize();
let to_copy =
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
let dst = if written_range.len() > 0 {
let bounds = dst.bounds();
let mut view = dst.slice({
let start =
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
} else {
dst
};
@@ -387,15 +342,24 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
dst
};
drop(writer);
let dst = if written_range.len() > 0 {
let dst = if mutable_range.len() > 0 {
let offset_in_buffer = mutable_range
.0
.checked_sub(submitted_offset)
.unwrap()
.into_usize();
let to_copy =
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
let bounds = dst.bounds();
let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
let mut view = dst.slice({
let start =
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
} else {
dst
};
@@ -496,15 +460,13 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
let writer = file.buffered_writer.read().await;
let mutable = writer.mutable();
let mutable = file.buffered_writer.mutable();
let cap = mutable.capacity();
let align = mutable.align();
drop(writer);
let write_nbytes = cap * 2 + cap / 2;
@@ -542,11 +504,10 @@ mod tests {
let file_contents = std::fs::read(file.file.path()).unwrap();
assert!(file_contents == content[0..cap * 2]);
let writer = file.buffered_writer.read().await;
let maybe_flushed_buffer_contents = writer.inspect_maybe_flushed().unwrap();
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
let mutable_buffer_contents = writer.mutable();
let mutable_buffer_contents = file.buffered_writer.mutable();
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
}
@@ -556,14 +517,12 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
let writer = file.buffered_writer.read().await;
let cap = writer.mutable().capacity();
drop(writer);
let cap = file.buffered_writer.mutable().capacity();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
@@ -581,13 +540,12 @@ mod tests {
2 * cap.into_u64(),
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
);
let writer = file.buffered_writer.read().await;
assert_eq!(
&writer.inspect_maybe_flushed().unwrap()[0..cap],
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
&content[cap..cap * 2]
);
assert_eq!(
&writer.mutable()[0..cap / 2],
&file.buffered_writer.mutable()[0..cap / 2],
&content[cap * 2..cap * 2 + cap / 2]
);
}
@@ -605,15 +563,13 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
let writer = file.buffered_writer.read().await;
let mutable = writer.mutable();
let mutable = file.buffered_writer.mutable();
let cap = mutable.capacity();
let align = mutable.align();
drop(writer);
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(cap * 2 + cap / 2)

View File

@@ -109,7 +109,7 @@ pub(crate) enum OnDiskValue {
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default)]
pub struct VectoredValueReconstructState {
pub(crate) struct VectoredValueReconstructState {
pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
pub(crate) situation: ValueReconstructSituation,
@@ -244,60 +244,13 @@ impl VectoredValueReconstructState {
res
}
/// Benchmarking utility to await for the completion of all pending ios
///
/// # Cancel-Safety
///
/// Technically fine to stop polling this future, but, the IOs will still
/// be executed to completion by the sidecar task and hold on to / consume resources.
/// Better not do it to make reasonsing about the system easier.
#[cfg(feature = "benchmarking")]
pub async fn sink_pending_ios(self) -> Result<(), std::io::Error> {
let mut res = Ok(());
// We should try hard not to bail early, so that by the time we return from this
// function, all IO for this value is done. It's not required -- we could totally
// stop polling the IO futures in the sidecar task, they need to support that,
// but just stopping to poll doesn't reduce the IO load on the disk. It's easier
// to reason about the system if we just wait for all IO to complete, even if
// we're no longer interested in the result.
//
// Revisit this when IO futures are replaced with a more sophisticated IO system
// and an IO scheduler, where we know which IOs were submitted and which ones
// just queued. Cf the comment on IoConcurrency::spawn_io.
for (_lsn, waiter) in self.on_disk_values {
let value_recv_res = waiter
.wait_completion()
// we rely on the caller to poll us to completion, so this is not a bail point
.await;
match (&mut res, value_recv_res) {
(Err(_), _) => {
// We've already failed, no need to process more.
}
(Ok(_), Err(_wait_err)) => {
// This shouldn't happen - likely the sidecar task panicked.
unreachable!();
}
(Ok(_), Ok(Err(err))) => {
let err: std::io::Error = err;
res = Err(err);
}
(Ok(_ok), Ok(Ok(OnDiskValue::RawImage(_img)))) => {}
(Ok(_ok), Ok(Ok(OnDiskValue::WalRecordOrImage(_buf)))) => {}
}
}
res
}
}
/// Bag of data accumulated during a vectored get..
pub struct ValuesReconstructState {
pub(crate) struct ValuesReconstructState {
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
/// should not expect to get anything from this hashmap.
pub keys: HashMap<Key, VectoredValueReconstructState>,
pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
/// The keys which are already retrieved
keys_done: KeySpaceRandomAccum,
@@ -319,7 +272,7 @@ pub struct ValuesReconstructState {
/// The desired end state is that we always do parallel IO.
/// This struct and the dispatching in the impl will be removed once
/// we've built enough confidence.
pub enum IoConcurrency {
pub(crate) enum IoConcurrency {
Sequential,
SidecarTask {
task_id: usize,
@@ -364,7 +317,10 @@ impl IoConcurrency {
Self::spawn(SelectedIoConcurrency::Sequential)
}
pub fn spawn_from_conf(conf: GetVectoredConcurrentIo, gate_guard: GateGuard) -> IoConcurrency {
pub(crate) fn spawn_from_conf(
conf: GetVectoredConcurrentIo,
gate_guard: GateGuard,
) -> IoConcurrency {
let selected = match conf {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
@@ -469,6 +425,16 @@ impl IoConcurrency {
}
}
pub(crate) fn clone(&self) -> Self {
match self {
IoConcurrency::Sequential => IoConcurrency::Sequential,
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
task_id: *task_id,
ios_tx: ios_tx.clone(),
},
}
}
/// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
///
/// The IO is represented as an opaque future.
@@ -607,18 +573,6 @@ impl IoConcurrency {
}
}
impl Clone for IoConcurrency {
fn clone(&self) -> Self {
match self {
IoConcurrency::Sequential => IoConcurrency::Sequential,
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
task_id: *task_id,
ios_tx: ios_tx.clone(),
},
}
}
}
/// Make noise in case the [`ValuesReconstructState`] gets dropped while
/// there are still IOs in flight.
/// Refer to `collect_pending_ios` for why we prefer not to do that.
@@ -649,7 +603,7 @@ impl Drop for ValuesReconstructState {
}
impl ValuesReconstructState {
pub fn new(io_concurrency: IoConcurrency) -> Self {
pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),

View File

@@ -70,15 +70,23 @@ pub struct InMemoryLayer {
/// We use a separate lock for the index to reduce the critical section
/// during which reads cannot be planned.
///
/// Note that the file backing [`InMemoryLayer::file`] is append-only,
/// so it is not necessary to hold a lock on the index while reading or writing from the file.
/// If you need access to both the index and the underlying file at the same time,
/// respect the following locking order to avoid deadlocks:
/// 1. [`InMemoryLayer::inner`]
/// 2. [`InMemoryLayer::index`]
///
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
/// so it is not necessary to hold simultaneous locks on index.
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
/// In particular:
/// 1. It is safe to read and release [`InMemoryLayer::index`] before reading from [`InMemoryLayer::file`].
/// 2. It is safe to write to [`InMemoryLayer::file`] before locking and updating [`InMemoryLayer::index`].
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
/// Wrapper for the actual on-disk file. Uses interior mutability for concurrent reads/writes.
file: EphemeralFile,
/// The above fields never change, except for `end_lsn`, which is only set once,
/// and `index` (see rationale there).
/// All other changing parts are in `inner`, and protected by a mutex.
inner: RwLock<InMemoryLayerInner>,
estimated_in_mem_size: AtomicU64,
}
@@ -88,10 +96,20 @@ impl std::fmt::Debug for InMemoryLayer {
f.debug_struct("InMemoryLayer")
.field("start_lsn", &self.start_lsn)
.field("end_lsn", &self.end_lsn)
.field("inner", &self.inner)
.finish()
}
}
pub struct InMemoryLayerInner {
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile,
resource_units: GlobalResourceUnits,
}
/// Support the same max blob length as blob_io, because ultimately
/// all the InMemoryLayer contents end up being written into a delta layer,
/// using the [`crate::tenant::blob_io`].
@@ -240,6 +258,12 @@ struct IndexEntryUnpacked {
pos: u64,
}
impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerInner").finish()
}
}
/// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
/// to minimize contention.
///
@@ -256,7 +280,7 @@ pub(crate) struct GlobalResources {
}
// Per-timeline RAII struct for its contribution to [`GlobalResources`]
pub(crate) struct GlobalResourceUnits {
struct GlobalResourceUnits {
// How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
// for decrementing the global counter by this many bytes when dropped.
dirty_bytes: u64,
@@ -268,7 +292,7 @@ impl GlobalResourceUnits {
// updated when the Timeline "ticks" in the background.
const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
pub(crate) fn new() -> Self {
fn new() -> Self {
GLOBAL_RESOURCES
.dirty_layers
.fetch_add(1, AtomicOrdering::Relaxed);
@@ -280,7 +304,7 @@ impl GlobalResourceUnits {
///
/// Returns the effective layer size limit that should be applied, if any, to keep
/// the total number of dirty bytes below the configured maximum.
pub(crate) fn publish_size(&mut self, size: u64) -> Option<u64> {
fn publish_size(&mut self, size: u64) -> Option<u64> {
let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
Ordering::Greater => {
@@ -325,7 +349,7 @@ impl GlobalResourceUnits {
// Call publish_size if the input size differs from last published size by more than
// the drift limit
pub(crate) fn maybe_publish_size(&mut self, size: u64) {
fn maybe_publish_size(&mut self, size: u64) {
let publish = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => false,
Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
@@ -374,8 +398,8 @@ impl InMemoryLayer {
}
}
pub(crate) fn len(&self) -> u64 {
self.file.len()
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok()
}
pub(crate) fn assert_writable(&self) {
@@ -406,7 +430,7 @@ impl InMemoryLayer {
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
pub async fn get_values_reconstruct_data(
pub(crate) async fn get_values_reconstruct_data(
self: &Arc<InMemoryLayer>,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
@@ -455,13 +479,14 @@ impl InMemoryLayer {
}
}
}
drop(index); // release the lock before we spawn the IO
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
.spawn_io(async move {
let inner = read_from.inner.read().await;
let f = vectored_dio_read::execute(
&read_from.file,
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
@@ -493,6 +518,7 @@ impl InMemoryLayer {
// This is kinda forced for InMemoryLayer because we need to inner.read() anyway,
// but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit
// drop for consistency among all three layer types.
drop(inner);
drop(read_from);
})
.await;
@@ -523,6 +549,12 @@ impl std::fmt::Display for InMemoryLayer {
}
impl InMemoryLayer {
/// Get layer size.
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
}
pub fn estimated_in_mem_size(&self) -> u64 {
self.estimated_in_mem_size.load(AtomicOrdering::Relaxed)
}
@@ -555,7 +587,10 @@ impl InMemoryLayer {
end_lsn: OnceLock::new(),
opened_at: Instant::now(),
index: RwLock::new(BTreeMap::new()),
file,
inner: RwLock::new(InMemoryLayerInner {
file,
resource_units: GlobalResourceUnits::new(),
}),
estimated_in_mem_size: AtomicU64::new(0),
})
}
@@ -564,37 +599,41 @@ impl InMemoryLayer {
///
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
/// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
///
/// This method shall not be called concurrently. We enforce this property via [`crate::tenant::Timeline::write_lock`].
///
/// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
pub async fn put_batch(
&self,
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.assert_writable();
let (base_offset, metadata) = {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = self.file.len();
let base_offset = inner.file.len();
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
// Write the batch to the file
self.file.write_raw(&raw, ctx).await?;
let new_size = self.file.len();
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
inner.resource_units.maybe_publish_size(new_size);
(base_offset, metadata)
};
// Update the index with the new entries
let mut index = self.index.write().await;
@@ -647,8 +686,10 @@ impl InMemoryLayer {
self.opened_at
}
pub(crate) fn tick(&self) -> Option<u64> {
self.file.tick()
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.len();
inner.resource_units.publish_size(size)
}
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
@@ -712,6 +753,12 @@ impl InMemoryLayer {
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. See the comment on
// [`InMemoryLayer::freeze`] to understand how locking between the append path
// and layer flushing works.
let inner = self.inner.read().await;
let index = self.index.read().await;
use l0_flush::Inner;
@@ -746,7 +793,7 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents = self.file.load_to_io_buf(ctx).await?;
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in index.iter() {

View File

@@ -816,7 +816,7 @@ impl From<layer_manager::Shutdown> for FlushLayerError {
}
#[derive(thiserror::Error, Debug)]
pub enum GetVectoredError {
pub(crate) enum GetVectoredError {
#[error("timeline shutting down")]
Cancelled,
@@ -849,7 +849,7 @@ impl From<GetReadyAncestorError> for GetVectoredError {
}
#[derive(thiserror::Error, Debug)]
pub enum GetReadyAncestorError {
pub(crate) enum GetReadyAncestorError {
#[error("ancestor LSN wait error")]
AncestorLsnTimeout(#[from] WaitLsnError),
@@ -939,7 +939,7 @@ impl std::fmt::Debug for Timeline {
}
#[derive(thiserror::Error, Debug, Clone)]
pub enum WaitLsnError {
pub(crate) enum WaitLsnError {
// Called on a timeline which is shutting down
#[error("Shutdown")]
Shutdown,
@@ -1902,11 +1902,16 @@ impl Timeline {
return;
};
let current_size = open_layer.len();
let Some(current_size) = open_layer.try_len() else {
// Unexpected: since we hold the write guard, nobody else should be writing to this layer, so
// read lock to get size should always succeed.
tracing::warn!("Lock conflict while reading size of open layer");
return;
};
let current_lsn = self.get_last_record_lsn();
let checkpoint_distance_override = open_layer.tick();
let checkpoint_distance_override = open_layer.tick().await;
if let Some(size_override) = checkpoint_distance_override {
if current_size > size_override {
@@ -7367,7 +7372,7 @@ impl TimelineWriter<'_> {
.tl
.get_layer_for_write(at, &self.write_guard, ctx)
.await?;
let initial_size = layer.len();
let initial_size = layer.size().await?;
let last_freeze_at = self.last_freeze_at.load();
self.write_guard.replace(TimelineWriterState::new(

View File

@@ -825,7 +825,6 @@ impl Scheduler {
struct AzScore {
home_shard_count: usize,
scheduleable: bool,
node_count: usize,
}
let mut azs: HashMap<&AvailabilityZone, AzScore> = HashMap::new();
@@ -833,7 +832,6 @@ impl Scheduler {
let az = azs.entry(&node.az).or_default();
az.home_shard_count += node.home_shard_count;
az.scheduleable |= matches!(node.may_schedule, MaySchedule::Yes(_));
az.node_count += 1;
}
// If any AZs are schedulable, then filter out the non-schedulable ones (i.e. AZs where
@@ -842,20 +840,10 @@ impl Scheduler {
azs.retain(|_, i| i.scheduleable);
}
// We will multiply up shard counts by the max node count for scoring, before dividing
// by per-node max node count, to get a normalized score that doesn't collapse to zero
// when the absolute shard count is less than the node count.
let max_node_count = azs.values().map(|i| i.node_count).max().unwrap_or(0);
// Find the AZ with the lowest number of shards currently allocated
Some(
azs.into_iter()
.min_by_key(|i| {
(
(i.1.home_shard_count * max_node_count) / i.1.node_count,
i.0,
)
})
.min_by_key(|i| (i.1.home_shard_count, i.0))
.unwrap()
.0
.clone(),

View File

@@ -1219,31 +1219,3 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
return res.json()
def force_override_feature_flag(self, flag: str, value: str | None = None):
if value is None:
res = self.delete(
f"http://localhost:{self.port}/v1/feature_flag/{flag}",
)
else:
res = self.put(
f"http://localhost:{self.port}/v1/feature_flag/{flag}",
params={"value": value},
)
self.verbose_error(res)
def evaluate_feature_flag_boolean(self, tenant_id: TenantId, flag: str) -> Any:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/feature_flag/{flag}",
params={"as": "boolean"},
)
self.verbose_error(res)
return res.json()
def evaluate_feature_flag_multivariate(self, tenant_id: TenantId, flag: str) -> Any:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/feature_flag/{flag}",
params={"as": "multivariate"},
)
self.verbose_error(res)
return res.json()

View File

@@ -146,6 +146,8 @@ def run_benchmark(env: NeonEnv, pg_bin: PgBin, record, duration_secs: int):
ps_http.base_url,
"--page-service-connstring",
env.pageserver.connstr(password=None),
"--gzip-probability",
"1",
"--runtime",
f"{duration_secs}s",
# don't specify the targets explicitly, let pagebench auto-discover them

View File

@@ -1,51 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fixtures.utils import run_only_on_default_postgres
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
@run_only_on_default_postgres("Pageserver-only test only needs to run on one version")
def test_feature_flag(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.pageserver.http_client().force_override_feature_flag("test-feature-flag", "true")
assert env.pageserver.http_client().evaluate_feature_flag_boolean(
env.initial_tenant, "test-feature-flag"
)["result"]["Ok"]
assert (
env.pageserver.http_client().evaluate_feature_flag_multivariate(
env.initial_tenant, "test-feature-flag"
)["result"]["Ok"]
== "true"
)
env.pageserver.http_client().force_override_feature_flag("test-feature-flag", "false")
assert (
env.pageserver.http_client().evaluate_feature_flag_boolean(
env.initial_tenant, "test-feature-flag"
)["result"]["Err"]
== "No condition group is matched"
)
assert (
env.pageserver.http_client().evaluate_feature_flag_multivariate(
env.initial_tenant, "test-feature-flag"
)["result"]["Ok"]
== "false"
)
env.pageserver.http_client().force_override_feature_flag("test-feature-flag", None)
assert (
"Err"
in env.pageserver.http_client().evaluate_feature_flag_boolean(
env.initial_tenant, "test-feature-flag"
)["result"]
)
assert (
"Err"
in env.pageserver.http_client().evaluate_feature_flag_multivariate(
env.initial_tenant, "test-feature-flag"
)["result"]
)