Compare commits

..

16 Commits

Author SHA1 Message Date
Vlad Lazar
95bb6ce2e4 another fix 2025-06-26 11:11:05 +02:00
Vlad Lazar
858f5f2ddc wip: try out a fix 2025-06-25 14:26:34 +02:00
Vlad Lazar
a4fdef69ad wip: one more log 2025-06-25 12:24:40 +02:00
Vlad Lazar
dae1b58964 wip: more logging 2025-06-24 16:30:58 +02:00
Vlad Lazar
e91a410472 wip: more logs 2025-06-24 13:07:59 +02:00
Vlad Lazar
9553a2670e Merge branch 'main' into vlad/debug-test-sharding-auto-split 2025-06-23 17:47:59 +03:00
Alex Chi Z.
5e2c444525 fix(pageserver): reduce default feature flag refresh interval (#12246)
## Problem

Part of #11813 

## Summary of changes

The current interval is 30s and it costs a lot of $$$. This patch
reduces it to a 600s refresh interval, which means it takes up to 10 minutes
for feature flags to propagate from the UI to the pageserver. In the future
we can let storcon retrieve the feature flags and push them to pageservers.
We can consider creating a new release, or postpone this to the week after
next.
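
A rough sketch of the resolution logic, trimmed to the one relevant field (the `refresh_interval` option and the 600s fallback match the pageserver config and feature-resolver diffs further down in this compare; everything else here is illustrative):

```rust
use std::time::Duration;

// Simplified stand-in for the pageserver's PostHog config: only the
// refresh_interval field is shown, and the default mirrors the new 600s value.
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);

struct PostHogConfig {
    refresh_interval: Option<Duration>,
}

fn effective_refresh_interval(cfg: &PostHogConfig) -> Duration {
    // Use the configured interval if present, otherwise fall back to 600s.
    cfg.refresh_interval
        .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL)
}

fn main() {
    let cfg = PostHogConfig { refresh_interval: None };
    assert_eq!(effective_refresh_interval(&cfg), Duration::from_secs(600));
}
```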

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-23 13:51:21 +00:00
Heikki Linnakangas
8d711229c1 ci: Fix bogus skipping of 'make all' step in CI (#12318)
The 'make all' step must always run. PR #12311 accidentally left in the
condition that skips it when there are no changes in the postgres v14
sources. That condition belonged to a different step that was removed
altogether in PR #12311, and the condition should have been removed with it.

Per CI failure:
https://github.com/neondatabase/neon/actions/runs/15820148967/job/44587394469
2025-06-23 13:23:33 +00:00
Vlad Lazar
0e490f3be7 pageserver: allow concurrent rw IO on in-mem layer (#12151)
## Problem

Previously, we couldn't read from an in-memory layer while a batch was
being written to it and, vice versa, we couldn't write to it while there
was an ongoing read.

## Summary of Changes

The goal of this change is to improve concurrency. Previously, writes
happened through a `&mut self` method, so exclusive access was enforced at
the type-system level.

We attempt to improve this by:
1. Adding interior mutability to the ephemeral layer's file. This involves
   wrapping the buffered writer in a read-write lock.
2. Minimising the time the read lock is held for: only hold it while reading
   from the buffers (recently flushed or pending flush). If we need to read
   from the file, drop the lock and allow the IO to be concurrent (see the
   sketch below).
   
The new benchmark variants with concurrent reads improve by 70 to 200
percent compared to main.
Benchmark results are in this [commit](891f094ce6).
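
A minimal sketch of the locking scheme, assuming `tokio`'s `RwLock` (which is what the diff below uses); the types are illustrative stand-ins for `EphemeralFile`/`BufferedWriter`, not the actual implementation:

```rust
use tokio::sync::RwLock;

// Simplified types: the real change wraps the buffered writer inside the
// ephemeral file in a tokio RwLock. The point is the lock scope: buffers are
// copied under the read lock, and the lock is dropped before any file IO.
struct Buffers {
    pending: Vec<u8>, // tail that has not been flushed to the file yet
}

struct Layer {
    buffers: RwLock<Buffers>,
}

impl Layer {
    async fn write(&self, bytes: &[u8]) {
        // Writes now go through &self (interior mutability) instead of
        // requiring &mut self.
        self.buffers.write().await.pending.extend_from_slice(bytes);
    }

    async fn read_tail(&self) -> Vec<u8> {
        // Copy from the in-memory buffer while holding the read lock...
        let copy = self.buffers.read().await.pending.clone();
        // ...the guard is dropped at the end of that statement, so a read
        // that also needs data from the on-disk file would do that IO
        // without holding the lock.
        copy
    }
}

#[tokio::main]
async fn main() {
    let layer = Layer {
        buffers: RwLock::new(Buffers { pending: Vec::new() }),
    };
    layer.write(b"hello").await;
    assert_eq!(layer.read_tail().await, b"hello".to_vec());
}
```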

## Future Changes

We can push the interior mutability into the buffered writer: the mutable
tail goes under a read lock, the flushed part goes into an `ArcSwap`, and
then anything that is already flushed can be read _without_ any locking.
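
A tiny sketch of that direction using the `arc-swap` crate (already a workspace dependency per the Cargo.toml diff below); the names and types here are illustrative, not the planned implementation:

```rust
use std::sync::Arc;
use arc_swap::ArcSwap;

// Illustrative only: keep the flushed portion behind an ArcSwap so readers
// can take a snapshot without any lock, while the writer publishes a new
// snapshot after each flush.
struct FlushedState {
    bytes: Vec<u8>,
}

fn main() {
    let flushed = ArcSwap::from_pointee(FlushedState { bytes: vec![] });

    // Reader: lock-free snapshot of whatever has been flushed so far.
    let snapshot = flushed.load();
    assert!(snapshot.bytes.is_empty());

    // Writer: publish a new snapshot after flushing more data.
    flushed.store(Arc::new(FlushedState { bytes: vec![1, 2, 3] }));
    assert_eq!(flushed.load().bytes, vec![1, 2, 3]);
}
```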
2025-06-23 13:17:30 +00:00
Erik Grinaker
7e41ef1bec pageserver: set gRPC basebackup chunk size to 256 KB (#12314)
gRPC base backups are sent as a stream of fixed-size 64 KB chunks.

pagebench basebackup with compression enabled shows that this chunk size
limits throughput:

* 64 KB: 55 RPS
* 128 KB: 69 RPS
* 256 KB: 73 RPS
* 1024 KB: 73 RPS

This patch sets the base backup chunk size to 256 KB.
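
For intuition, a minimal sketch of the chunking being tuned here; only the constant reflects the patch, and the gRPC streaming itself is omitted:

```rust
// New chunk size from this patch; the payload below is just a stand-in.
const CHUNK_SIZE: usize = 256 * 1024;

fn main() {
    let backup = vec![0u8; 1024 * 1024]; // pretend 1 MiB basebackup payload
    let n_chunks = backup.chunks(CHUNK_SIZE).count();
    assert_eq!(n_chunks, 4); // 1 MiB split into four 256 KiB chunks
}
```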
2025-06-23 12:41:11 +00:00
Vlad Lazar
74d2d233e4 wip: more logs 2025-06-23 13:51:53 +02:00
Heikki Linnakangas
7916aa26e0 Stop using build-tools image in compute image build (#12306)
The build-tools image contains various build tools and dependencies,
mostly Rust-related. The compute image build used it to build compute_ctl
and a few other small Rust binaries that are included in the compute image.
However, for extensions built in Rust (pgrx), the build used a different
layer that installed the Rust toolchain using rustup.

Switch to using the same Rust toolchain for both the pgrx-based extensions
and compute_ctl et al. Since we don't need anything else from the
build-tools image, I switched to the toolchain installed with rustup and
eliminated the dependency on build-tools altogether. The compute image
build no longer depends on build-tools.

Note: We no longer use 'mold' for linking compute_ctl et al, since mold
is not included in the build-deps-with-cargo layer. We could add it
there, but it doesn't seem worth it. I proposed dropping mold
altogether in https://github.com/neondatabase/neon/pull/10735, but that
was rejected because 'mold' is faster for incremental builds. That
doesn't matter much for Docker builds, however, since they're not
incremental, and the compute binaries are not as large as the storage
server binaries anyway.
2025-06-23 09:11:05 +00:00
Heikki Linnakangas
52ab8f3e65 Use make all in the "Build and Test locally" CI workflow (#12311)
This avoids duplicating the build logic: `make all` covers the separate
`postgres-*` and `neon-pg-ext` steps, and also does `cargo build`.
That's how you would typically do a full local build anyway.
2025-06-23 09:10:32 +00:00
Heikki Linnakangas
3d822dbbde Refactor Makefile rules for building the extensions under pgxn/ (#12305) 2025-06-22 19:43:14 +00:00
Vlad Lazar
57edf217b7 trigger bench 2025-06-20 10:45:55 +02:00
Vlad Lazar
ab1335cba0 wip: add some info logs for timeline shutdown 2025-06-20 10:40:52 +02:00
24 changed files with 547 additions and 446 deletions

View File

@@ -104,11 +104,10 @@ jobs:
# Set some environment variables used by all the steps.
#
# CARGO_FLAGS is extra options to pass to "cargo build", "cargo test" etc.
# It also includes --features, if any
# CARGO_FLAGS is extra options to pass to all "cargo" subcommands.
#
# CARGO_FEATURES is passed to "cargo metadata". It is separate from CARGO_FLAGS,
# because "cargo metadata" doesn't accept --release or --debug options
# CARGO_PROFILE is passed to "cargo build", "cargo test" etc, but not to
# "cargo metadata", because it doesn't accept --release or --debug options.
#
# We run tests with additional features that are turned off by default (e.g. in release builds), see
# corresponding Cargo.toml files for their descriptions.
@@ -117,16 +116,16 @@ jobs:
ARCH: ${{ inputs.arch }}
SANITIZERS: ${{ inputs.sanitizers }}
run: |
CARGO_FEATURES="--features testing"
CARGO_FLAGS="--locked --features testing"
if [[ $BUILD_TYPE == "debug" && $ARCH == 'x64' ]]; then
cov_prefix="scripts/coverage --profraw-prefix=$GITHUB_JOB --dir=/tmp/coverage run"
CARGO_FLAGS="--locked"
CARGO_PROFILE=""
elif [[ $BUILD_TYPE == "debug" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked"
CARGO_PROFILE=""
elif [[ $BUILD_TYPE == "release" ]]; then
cov_prefix=""
CARGO_FLAGS="--locked --release"
CARGO_PROFILE="--release"
fi
if [[ $SANITIZERS == 'enabled' ]]; then
make_vars="WITH_SANITIZERS=yes"
@@ -136,8 +135,8 @@ jobs:
{
echo "cov_prefix=${cov_prefix}"
echo "make_vars=${make_vars}"
echo "CARGO_FEATURES=${CARGO_FEATURES}"
echo "CARGO_FLAGS=${CARGO_FLAGS}"
echo "CARGO_PROFILE=${CARGO_PROFILE}"
echo "CARGO_HOME=${GITHUB_WORKSPACE}/.cargo"
} >> $GITHUB_ENV
@@ -189,34 +188,18 @@ jobs:
path: pg_install/v17
key: v1-${{ runner.os }}-${{ runner.arch }}-${{ inputs.build-type }}-pg-${{ steps.pg_v17_rev.outputs.pg_rev }}-bookworm-${{ hashFiles('Makefile', 'build-tools.Dockerfile') }}
- name: Build postgres v14
if: steps.cache_pg_14.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v14 -j$(nproc)
- name: Build postgres v15
if: steps.cache_pg_15.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v15 -j$(nproc)
- name: Build postgres v16
if: steps.cache_pg_16.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v16 -j$(nproc)
- name: Build postgres v17
if: steps.cache_pg_17.outputs.cache-hit != 'true'
run: mold -run make ${make_vars} postgres-v17 -j$(nproc)
- name: Build neon extensions
run: mold -run make ${make_vars} neon-pg-ext -j$(nproc)
- name: Build all
# Note: the Makefile picks up BUILD_TYPE and CARGO_PROFILE from the env variables
run: mold -run make ${make_vars} all -j$(nproc) CARGO_BUILD_FLAGS="$CARGO_FLAGS"
- name: Build walproposer-lib
run: mold -run make ${make_vars} walproposer-lib -j$(nproc)
- name: Run cargo build
env:
WITH_TESTS: ${{ inputs.sanitizers != 'enabled' && '--tests' || '' }}
- name: Build unit tests
if: inputs.sanitizers != 'enabled'
run: |
export ASAN_OPTIONS=detect_leaks=0
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins ${WITH_TESTS}
${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_PROFILE --tests
# Do install *before* running rust tests because they might recompile the
# binaries with different features/flags.
@@ -228,7 +211,7 @@ jobs:
# Install target binaries
mkdir -p /tmp/neon/bin/
binaries=$(
${cov_prefix} cargo metadata $CARGO_FEATURES --format-version=1 --no-deps |
${cov_prefix} cargo metadata $CARGO_FLAGS --format-version=1 --no-deps |
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
)
for bin in $binaries; do
@@ -245,7 +228,7 @@ jobs:
mkdir -p /tmp/neon/test_bin/
test_exe_paths=$(
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_FEATURES --message-format=json --no-run |
${cov_prefix} cargo test $CARGO_FLAGS $CARGO_PROFILE --message-format=json --no-run |
jq -r '.executable | select(. != null)'
)
for bin in $test_exe_paths; do
@@ -279,10 +262,10 @@ jobs:
export LD_LIBRARY_PATH
#nextest does not yet support running doctests
${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_FEATURES
${cov_prefix} cargo test --doc $CARGO_FLAGS $CARGO_PROFILE
# run all non-pageserver tests
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E '!package(pageserver)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E '!package(pageserver)'
# run pageserver tests
# (When developing new pageserver features gated by config fields, we commonly make the rust
@@ -291,13 +274,13 @@ jobs:
# pageserver tests from non-pageserver tests cuts down the time it takes for this CI step.)
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=tokio-epoll-uring \
${cov_prefix} \
cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(pageserver)'
cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(pageserver)'
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=neon-github-ci-tests
export REMOTE_STORAGE_S3_REGION=eu-central-1
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_s3)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(remote_storage)' -E 'test(test_real_s3)'
# Run separate tests for real Azure Blob Storage
# XXX: replace region with `eu-central-1`-like region
@@ -306,7 +289,7 @@ jobs:
export AZURE_STORAGE_ACCESS_KEY="${{ secrets.AZURE_STORAGE_ACCESS_KEY_DEV }}"
export REMOTE_STORAGE_AZURE_CONTAINER="${{ vars.REMOTE_STORAGE_AZURE_CONTAINER }}"
export REMOTE_STORAGE_AZURE_REGION="${{ vars.REMOTE_STORAGE_AZURE_REGION }}"
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES -E 'package(remote_storage)' -E 'test(test_real_azure)'
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_PROFILE -E 'package(remote_storage)' -E 'test(test_real_azure)'
- name: Install postgres binaries
run: |

View File

@@ -670,7 +670,7 @@ jobs:
ghcr.io/neondatabase/neon:${{ needs.meta.outputs.build-tag }}-bookworm-arm64
compute-node-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
needs: [ check-permissions, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -743,7 +743,6 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
push: true
@@ -763,7 +762,6 @@ jobs:
GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
PG_VERSION=${{ matrix.version.pg }}
BUILD_TAG=${{ needs.meta.outputs.release-tag || needs.meta.outputs.build-tag }}
TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
DEBIAN_VERSION=${{ matrix.version.debian }}
provenance: false
push: true

View File

@@ -4,6 +4,12 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
# managers.
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
# CARGO_BUILD_FLAGS: Extra flags to pass to `cargo build`. `--locked`
# and `--features testing` are popular examples.
#
# CARGO_PROFILE: You can also set to override the cargo profile to
# use. By default, it is derived from BUILD_TYPE.
# All intermediate build artifacts are stored here.
BUILD_DIR := build
@@ -17,15 +23,15 @@ BUILD_TYPE ?= debug
WITH_SANITIZERS ?= no
PG_CFLAGS = -fsigned-char
ifeq ($(BUILD_TYPE),release)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
PG_CFLAGS += -O0 -g3 $(CFLAGS)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl
PG_CFLAGS += -O2 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
# Unfortunately, `--profile=...` is a nightly feature
CARGO_BUILD_FLAGS += --release
CARGO_PROFILE ?= --profile=release
else ifeq ($(BUILD_TYPE),debug)
PG_CONFIGURE_OPTS = --enable-debug --with-openssl --enable-cassert --enable-depend
PG_CFLAGS += -O0 -g3 $(CFLAGS)
PG_LDFLAGS = $(LDFLAGS)
CARGO_PROFILE ?= --profile=dev
else
$(error Bad build type '$(BUILD_TYPE)', see Makefile for options)
endif
@@ -97,7 +103,7 @@ all: neon postgres neon-pg-ext
.PHONY: neon
neon: postgres-headers walproposer-lib cargo-target-dir
+@echo "Compiling Neon"
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS)
$(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS) $(CARGO_PROFILE)
.PHONY: cargo-target-dir
cargo-target-dir:
# https://github.com/rust-lang/cargo/issues/14281
@@ -176,31 +182,11 @@ postgres-check-%: postgres-%
.PHONY: neon-pg-ext-%
neon-pg-ext-%: postgres-%
+@echo "Compiling neon $*"
mkdir -p $(BUILD_DIR)/neon-$*
+@echo "Compiling neon-specific Postgres extensions for $*"
mkdir -p $(BUILD_DIR)/pgxn-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(BUILD_DIR)/neon-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install
+@echo "Compiling neon_walredo $*"
mkdir -p $(BUILD_DIR)/neon-walredo-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(BUILD_DIR)/neon-walredo-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install
+@echo "Compiling neon_rmgr $*"
mkdir -p $(BUILD_DIR)/neon-rmgr-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(BUILD_DIR)/neon-rmgr-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install
+@echo "Compiling neon_test_utils $*"
mkdir -p $(BUILD_DIR)/neon-test-utils-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(BUILD_DIR)/neon-test-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install
+@echo "Compiling neon_utils $*"
mkdir -p $(BUILD_DIR)/neon-utils-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \
-C $(BUILD_DIR)/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
-C $(BUILD_DIR)/pgxn-$*\
-f $(ROOT_PROJECT_DIR)/pgxn/Makefile install
# Build walproposer as a static library. walproposer source code is located
# in the pgxn/neon directory.

View File

@@ -77,9 +77,6 @@
# build_and_test.yml github workflow for how that's done.
ARG PG_VERSION
ARG REPOSITORY=ghcr.io/neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG BUILD_TAG
ARG DEBIAN_VERSION=bookworm
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
@@ -150,6 +147,7 @@ RUN case $DEBIAN_VERSION in \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
libclang-dev \
jsonnet \
$VERSION_INSTALLS \
&& apt clean && rm -rf /var/lib/apt/lists/* && \
useradd -ms /bin/bash nonroot -b /home
@@ -164,7 +162,7 @@ FROM build-deps AS pg-build
ARG PG_VERSION
COPY vendor/postgres-${PG_VERSION:?} postgres
RUN cd postgres && \
export CONFIGURE_CMD="./configure CFLAGS='-O0 -g3 -fsigned-char' --enable-debug --enable-cassert --with-openssl --with-uuid=ossp \
export CONFIGURE_CMD="./configure CFLAGS='-O2 -g3 -fsigned-char' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${PG_VERSION:?}" != "v14" ]; then \
# zstd is available only from PG15
@@ -1634,18 +1632,7 @@ FROM pg-build AS neon-ext-build
ARG PG_VERSION
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon_test_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
-C pgxn/neon_rmgr \
-s install
RUN make -j $(getconf _NPROCESSORS_ONLN) -C pgxn -s install-compute
#########################################################################################
#
@@ -1735,7 +1722,7 @@ FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
FROM build-deps-with-cargo AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
@@ -1745,7 +1732,7 @@ COPY --chown=nonroot . .
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
--mount=type=cache,uid=1000,target=/home/nonroot/target \
mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy && \
cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy && \
mkdir target-bin && \
cp target/release-line-debug-size-lto/compute_ctl \
target/release-line-debug-size-lto/fast_import \
@@ -1839,10 +1826,11 @@ RUN rm /usr/local/pgsql/lib/lib*.a
# Preprocess the sql_exporter configuration files
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS sql_exporter_preprocessor
FROM build-deps AS sql_exporter_preprocessor
ARG PG_VERSION
USER nonroot
WORKDIR /home/nonroot
COPY --chown=nonroot compute compute

View File

@@ -76,6 +76,10 @@ pub struct PostHogConfig {
pub private_api_url: String,
/// Public API URL
pub public_api_url: String,
/// Refresh interval for the feature flag spec
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub refresh_interval: Option<Duration>,
}
/// `pageserver.toml`

View File

@@ -36,7 +36,10 @@ impl FeatureResolverBackgroundLoop {
// Main loop of updating the feature flags.
handle.spawn(
async move {
tracing::info!("Starting PostHog feature resolver");
tracing::info!(
"Starting PostHog feature resolver with refresh period: {:?}",
refresh_period
);
let mut ticker = tokio::time::interval(refresh_period);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {

View File

@@ -12,6 +12,9 @@ testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "
fuzz-read-path = ["testing"]
# Enables benchmarking only APIs
benchmarking = []
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
@@ -127,6 +130,7 @@ harness = false
[[bench]]
name = "bench_ingest"
harness = false
required-features = ["benchmarking"]
[[bench]]
name = "upload_queue"

View File

@@ -1,22 +1,29 @@
use std::env;
use std::num::NonZeroUsize;
use std::sync::Arc;
use bytes::Bytes;
use camino::Utf8PathBuf;
use criterion::{Criterion, criterion_group, criterion_main};
use futures::stream::FuturesUnordered;
use pageserver::config::PageServerConf;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::keyspace::KeySpace;
use pageserver::l0_flush::{L0FlushConfig, L0FlushGlobalState};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::storage_layer::InMemoryLayer;
use pageserver::tenant::storage_layer::IoConcurrency;
use pageserver::tenant::storage_layer::{InMemoryLayer, ValuesReconstructState};
use pageserver::{page_cache, virtual_file};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use strum::IntoEnumIterator;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::Gate;
use wal_decoder::models::value::Value;
use wal_decoder::serialized_batch::SerializedValueBatch;
@@ -30,7 +37,7 @@ fn murmurhash32(mut h: u32) -> u32 {
h
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
enum KeyLayout {
/// Sequential unique keys
Sequential,
@@ -40,19 +47,30 @@ enum KeyLayout {
RandomReuse(u32),
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
enum WriteDelta {
Yes,
No,
}
#[derive(serde::Serialize, Clone, Copy, Debug, PartialEq)]
enum ConcurrentReads {
Yes,
No,
}
async fn ingest(
conf: &'static PageServerConf,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
) -> anyhow::Result<()> {
if concurrent_reads == ConcurrentReads::Yes {
assert_eq!(key_layout, KeyLayout::Sequential);
}
let mut lsn = utils::lsn::Lsn(1000);
let mut key = Key::from_i128(0x0);
@@ -68,16 +86,18 @@ async fn ingest(
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let layer = InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
lsn,
&gate,
&cancel,
&ctx,
)
.await?;
let layer = Arc::new(
InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
lsn,
&gate,
&cancel,
&ctx,
)
.await?,
);
let data = Value::Image(Bytes::from(vec![0u8; put_size]));
let data_ser_size = data.serialized_size().unwrap() as usize;
@@ -86,6 +106,61 @@ async fn ingest(
pageserver::context::DownloadBehavior::Download,
);
const READ_BATCH_SIZE: u32 = 32;
let (tx, mut rx) = tokio::sync::watch::channel::<Option<Key>>(None);
let reader_cancel = CancellationToken::new();
let reader_handle = if concurrent_reads == ConcurrentReads::Yes {
Some(tokio::task::spawn({
let cancel = reader_cancel.clone();
let layer = layer.clone();
let ctx = ctx.attached_child();
async move {
let gate = Gate::default();
let gate_guard = gate.enter().unwrap();
let io_concurrency = IoConcurrency::spawn_from_conf(
GetVectoredConcurrentIo::SidecarTask,
gate_guard,
);
rx.wait_for(|key| key.is_some()).await.unwrap();
while !cancel.is_cancelled() {
let key = match *rx.borrow() {
Some(some) => some,
None => unreachable!(),
};
let mut start_key = key;
start_key.field6 = key.field6.saturating_sub(READ_BATCH_SIZE);
let key_range = start_key..key.next();
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
layer
.get_values_reconstruct_data(
KeySpace::single(key_range),
Lsn(1)..Lsn(u64::MAX),
&mut reconstruct_state,
&ctx,
)
.await
.unwrap();
let mut collect_futs = std::mem::take(&mut reconstruct_state.keys)
.into_values()
.map(|state| state.sink_pending_ios())
.collect::<FuturesUnordered<_>>();
while collect_futs.next().await.is_some() {}
}
drop(io_concurrency);
gate.close().await;
}
}))
} else {
None
};
const BATCH_SIZE: usize = 16;
let mut batch = Vec::new();
@@ -113,19 +188,27 @@ async fn ingest(
batch.push((key.to_compact(), lsn, data_ser_size, data.clone()));
if batch.len() >= BATCH_SIZE {
let last_key = Key::from_compact(batch.last().unwrap().0);
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedValueBatch::from_values(this_batch);
layer.put_batch(serialized, &ctx).await?;
tx.send(Some(last_key)).unwrap();
}
}
if !batch.is_empty() {
let last_key = Key::from_compact(batch.last().unwrap().0);
let this_batch = std::mem::take(&mut batch);
let serialized = SerializedValueBatch::from_values(this_batch);
layer.put_batch(serialized, &ctx).await?;
tx.send(Some(last_key)).unwrap();
}
layer.freeze(lsn + 1).await;
if matches!(write_delta, WriteDelta::Yes) {
if write_delta == WriteDelta::Yes {
let l0_flush_state = L0FlushGlobalState::new(L0FlushConfig::Direct {
max_concurrency: NonZeroUsize::new(1).unwrap(),
});
@@ -136,6 +219,11 @@ async fn ingest(
tokio::fs::remove_file(path).await?;
}
reader_cancel.cancel();
if let Some(handle) = reader_handle {
handle.await.unwrap();
}
Ok(())
}
@@ -147,6 +235,7 @@ fn ingest_main(
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
) {
pageserver::virtual_file::set_io_mode(io_mode);
@@ -156,7 +245,15 @@ fn ingest_main(
.unwrap();
runtime.block_on(async move {
let r = ingest(conf, put_size, put_count, key_layout, write_delta).await;
let r = ingest(
conf,
put_size,
put_count,
key_layout,
write_delta,
concurrent_reads,
)
.await;
if let Err(e) = r {
panic!("{e:?}");
}
@@ -195,6 +292,7 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
concurrent_reads: ConcurrentReads,
}
#[derive(Clone)]
struct HandPickedParameters {
@@ -245,7 +343,7 @@ fn criterion_benchmark(c: &mut Criterion) {
];
let exploded_parameters = {
let mut out = Vec::new();
for io_mode in IoMode::iter() {
for concurrent_reads in [ConcurrentReads::Yes, ConcurrentReads::No] {
for param in expect.clone() {
let HandPickedParameters {
volume_mib,
@@ -253,12 +351,18 @@ fn criterion_benchmark(c: &mut Criterion) {
key_layout,
write_delta,
} = param;
if key_layout != KeyLayout::Sequential && concurrent_reads == ConcurrentReads::Yes {
continue;
}
out.push(ExplodedParameters {
io_mode,
io_mode: IoMode::DirectRw,
volume_mib,
key_size,
key_layout,
write_delta,
concurrent_reads,
});
}
}
@@ -272,9 +376,10 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size,
key_layout,
write_delta,
concurrent_reads,
} = self;
format!(
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?} concurrent_reads={concurrent_reads:?}"
)
}
}
@@ -287,12 +392,23 @@ fn criterion_benchmark(c: &mut Criterion) {
key_size,
key_layout,
write_delta,
concurrent_reads,
} = params;
let put_count = volume_mib * 1024 * 1024 / key_size;
group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
group.sample_size(10);
group.bench_function(id, |b| {
b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
b.iter(|| {
ingest_main(
conf,
io_mode,
key_size,
put_count,
key_layout,
write_delta,
concurrent_reads,
)
})
});
}
}

View File

@@ -12,6 +12,8 @@ use utils::id::TenantId;
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
#[derive(Clone)]
pub struct FeatureResolver {
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
@@ -139,10 +141,13 @@ impl FeatureResolver {
}
tenants
};
// TODO: make refresh period configurable
inner
.clone()
.spawn(handle, Duration::from_secs(60), fake_tenants);
inner.clone().spawn(
handle,
posthog_config
.refresh_interval
.unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL),
fake_tenants,
);
Ok(FeatureResolver {
inner: Some(inner),
internal_properties: Some(internal_properties),

View File

@@ -3426,7 +3426,7 @@ impl TimelineMetrics {
pub fn dec_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
let labels = self.make_frozen_layer_labels(layer);
let size = layer.try_len().expect("frozen layer should have no writer");
let size = layer.len();
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()
@@ -3441,7 +3441,7 @@ impl TimelineMetrics {
pub fn inc_frozen_layer(&self, layer: &InMemoryLayer) {
assert!(matches!(layer.info(), InMemoryLayerInfo::Frozen { .. }));
let labels = self.make_frozen_layer_labels(layer);
let size = layer.try_len().expect("frozen layer should have no writer");
let size = layer.len();
TIMELINE_LAYER_COUNT
.get_metric_with_label_values(&labels)
.unwrap()

View File

@@ -3544,8 +3544,9 @@ impl proto::PageService for GrpcPageServiceHandler {
&self,
req: tonic::Request<proto::GetBaseBackupRequest>,
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
// Send 64 KB chunks to avoid large memory allocations.
const CHUNK_SIZE: usize = 64 * 1024;
// Send chunks of 256 KB to avoid large memory allocations. pagebench basebackup shows this
// to be the sweet spot where throughput is saturated.
const CHUNK_SIZE: usize = 256 * 1024;
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);

View File

@@ -3,7 +3,7 @@
use std::io;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{AtomicU64, Ordering};
use camino::Utf8PathBuf;
use num_traits::Num;
@@ -18,6 +18,7 @@ use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::GlobalResourceUnits;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
@@ -30,9 +31,13 @@ pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
buffered_writer: BufferedWriter,
buffered_writer: tokio::sync::RwLock<BufferedWriter>,
bytes_written: AtomicU64,
resource_units: std::sync::Mutex<GlobalResourceUnits>,
}
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
@@ -94,9 +99,8 @@ impl EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
page_cache_file_id,
bytes_written: 0,
file: file.clone(),
buffered_writer: BufferedWriter::new(
buffered_writer: tokio::sync::RwLock::new(BufferedWriter::new(
file,
0,
|| IoBufferMut::with_capacity(TAIL_SZ),
@@ -104,7 +108,9 @@ impl EphemeralFile {
cancel.child_token(),
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
)),
bytes_written: AtomicU64::new(0),
resource_units: std::sync::Mutex::new(GlobalResourceUnits::new()),
})
}
}
@@ -151,15 +157,17 @@ impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter
#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
#[error("{0}")]
TooLong(String),
#[error("cancelled")]
Cancelled,
}
impl EphemeralFile {
pub(crate) fn len(&self) -> u64 {
self.bytes_written
// TODO(vlad): The value returned here is not always correct if
// we have more than one concurrent writer. Writes are always
// sequenced, but we could grab the buffered writer lock if we wanted
// to.
self.bytes_written.load(Ordering::Acquire)
}
pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
@@ -186,7 +194,7 @@ impl EphemeralFile {
/// Panics if the write is short because there's no way we can recover from that.
/// TODO: make upstack handle this as an error.
pub(crate) async fn write_raw(
&mut self,
&self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<u64, EphemeralFileWriteError> {
@@ -198,22 +206,13 @@ impl EphemeralFile {
}
async fn write_raw_controlled(
&mut self,
&self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
let pos = self.bytes_written;
let mut writer = self.buffered_writer.write().await;
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
EphemeralFileWriteError::TooLong(format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
))
})?;
// Write the payload
let (nwritten, control) = self
.buffered_writer
let (nwritten, control) = writer
.write_buffered_borrowed_controlled(srcbuf, ctx)
.await
.map_err(|e| match e {
@@ -225,43 +224,69 @@ impl EphemeralFile {
"buffered writer has no short writes"
);
self.bytes_written = new_bytes_written;
// There's no realistic risk of overflow here. We won't have exabyte-sized files on disk.
let pos = self
.bytes_written
.fetch_add(srcbuf.len().into_u64(), Ordering::AcqRel);
let mut resource_units = self.resource_units.lock().unwrap();
resource_units.maybe_publish_size(self.bytes_written.load(Ordering::Relaxed));
Ok((pos, control))
}
pub(crate) fn tick(&self) -> Option<u64> {
let mut resource_units = self.resource_units.lock().unwrap();
let len = self.bytes_written.load(Ordering::Relaxed);
resource_units.publish_size(len)
}
}
impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
&self,
start: u64,
dst: tokio_epoll_uring::Slice<B>,
mut dst: tokio_epoll_uring::Slice<B>,
ctx: &RequestContext,
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let submitted_offset = self.buffered_writer.bytes_submitted();
// We will fill in the slice back to front. Hence, we need
// the slice to be fully initialized.
// TODO(vlad): Is there a nicer way of doing this?
dst.as_mut_rust_slice_full_zeroed();
let mutable = match self.buffered_writer.inspect_mutable() {
Some(mutable) => &mutable[0..mutable.pending()],
None => {
// Timeline::cancel and hence buffered writer flush was cancelled.
// Remain read-available while timeline is shutting down.
&[]
}
};
let writer = self.buffered_writer.read().await;
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
// Read bytes written while under lock. This is a hack to deal with concurrent
// writes updating the number of bytes written. `bytes_written` is not DIO aligned
// but we may end the read there.
//
// TODO(vlad): Feels like there's a nicer path where we align the end if it
// shoots over the end of the file.
let bytes_written = self.bytes_written.load(Ordering::Acquire);
let dst_cap = dst.bytes_total().into_u64();
let end = {
// saturating_add is correct here because the max file size is u64::MAX, so,
// if start + dst.len() > u64::MAX, then we know it will be a short read
let mut end: u64 = start.saturating_add(dst_cap);
if end > self.bytes_written {
end = self.bytes_written;
if end > bytes_written {
end = bytes_written;
}
end
};
let submitted_offset = writer.bytes_submitted();
let maybe_flushed = writer.inspect_maybe_flushed();
let mutable = match writer.inspect_mutable() {
Some(mutable) => &mutable[0..mutable.pending()],
None => {
// Timeline::cancel and hence buffered writer flush was cancelled.
// Remain read-available while timeline is shutting down.
&[]
}
};
// inclusive, exclusive
#[derive(Debug)]
struct Range<N>(N, N);
@@ -306,13 +331,33 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
let dst = if written_range.len() > 0 {
// There are three sources from which we might have to read data:
// 1. The file itself
// 2. The buffer which contains changes currently being flushed
// 3. The buffer which contains chnages yet to be flushed
//
// For better concurrency, we do them in reverse order: perform the in-memory
// reads while holding the writer lock, drop the writer lock and read from the
// file if required.
let dst = if mutable_range.len() > 0 {
let offset_in_buffer = mutable_range
.0
.checked_sub(submitted_offset)
.unwrap()
.into_usize();
let to_copy =
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
let bounds = dst.bounds();
let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
let mut view = dst.slice({
let start =
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
} else {
dst
};
@@ -342,24 +387,15 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
dst
};
let dst = if mutable_range.len() > 0 {
let offset_in_buffer = mutable_range
.0
.checked_sub(submitted_offset)
.unwrap()
.into_usize();
let to_copy =
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
drop(writer);
let dst = if written_range.len() > 0 {
let bounds = dst.bounds();
let mut view = dst.slice({
let start =
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
start..end
});
view.as_mut_rust_slice_full_zeroed()
.copy_from_slice(to_copy);
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
} else {
dst
};
@@ -460,13 +496,15 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
let mutable = file.buffered_writer.mutable();
let writer = file.buffered_writer.read().await;
let mutable = writer.mutable();
let cap = mutable.capacity();
let align = mutable.align();
drop(writer);
let write_nbytes = cap * 2 + cap / 2;
@@ -504,10 +542,11 @@ mod tests {
let file_contents = std::fs::read(file.file.path()).unwrap();
assert!(file_contents == content[0..cap * 2]);
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
let writer = file.buffered_writer.read().await;
let maybe_flushed_buffer_contents = writer.inspect_maybe_flushed().unwrap();
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
let mutable_buffer_contents = file.buffered_writer.mutable();
let mutable_buffer_contents = writer.mutable();
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
}
@@ -517,12 +556,14 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
let cap = file.buffered_writer.mutable().capacity();
let writer = file.buffered_writer.read().await;
let cap = writer.mutable().capacity();
drop(writer);
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
@@ -540,12 +581,13 @@ mod tests {
2 * cap.into_u64(),
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
);
let writer = file.buffered_writer.read().await;
assert_eq!(
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
&writer.inspect_maybe_flushed().unwrap()[0..cap],
&content[cap..cap * 2]
);
assert_eq!(
&file.buffered_writer.mutable()[0..cap / 2],
&writer.mutable()[0..cap / 2],
&content[cap * 2..cap * 2 + cap / 2]
);
}
@@ -563,13 +605,15 @@ mod tests {
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
let mutable = file.buffered_writer.mutable();
let writer = file.buffered_writer.read().await;
let mutable = writer.mutable();
let cap = mutable.capacity();
let align = mutable.align();
drop(writer);
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
.take(cap * 2 + cap / 2)

View File

@@ -109,7 +109,7 @@ pub(crate) enum OnDiskValue {
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default)]
pub(crate) struct VectoredValueReconstructState {
pub struct VectoredValueReconstructState {
pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
pub(crate) situation: ValueReconstructSituation,
@@ -244,13 +244,60 @@ impl VectoredValueReconstructState {
res
}
/// Benchmarking utility to await the completion of all pending IOs
///
/// # Cancel-Safety
///
/// Technically fine to stop polling this future, but, the IOs will still
/// be executed to completion by the sidecar task and hold on to / consume resources.
/// Better not to do it, to make reasoning about the system easier.
#[cfg(feature = "benchmarking")]
pub async fn sink_pending_ios(self) -> Result<(), std::io::Error> {
let mut res = Ok(());
// We should try hard not to bail early, so that by the time we return from this
// function, all IO for this value is done. It's not required -- we could totally
// stop polling the IO futures in the sidecar task, they need to support that,
// but just stopping to poll doesn't reduce the IO load on the disk. It's easier
// to reason about the system if we just wait for all IO to complete, even if
// we're no longer interested in the result.
//
// Revisit this when IO futures are replaced with a more sophisticated IO system
// and an IO scheduler, where we know which IOs were submitted and which ones
// just queued. Cf the comment on IoConcurrency::spawn_io.
for (_lsn, waiter) in self.on_disk_values {
let value_recv_res = waiter
.wait_completion()
// we rely on the caller to poll us to completion, so this is not a bail point
.await;
match (&mut res, value_recv_res) {
(Err(_), _) => {
// We've already failed, no need to process more.
}
(Ok(_), Err(_wait_err)) => {
// This shouldn't happen - likely the sidecar task panicked.
unreachable!();
}
(Ok(_), Ok(Err(err))) => {
let err: std::io::Error = err;
res = Err(err);
}
(Ok(_ok), Ok(Ok(OnDiskValue::RawImage(_img)))) => {}
(Ok(_ok), Ok(Ok(OnDiskValue::WalRecordOrImage(_buf)))) => {}
}
}
res
}
}
/// Bag of data accumulated during a vectored get..
pub(crate) struct ValuesReconstructState {
pub struct ValuesReconstructState {
/// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
/// should not expect to get anything from this hashmap.
pub(crate) keys: HashMap<Key, VectoredValueReconstructState>,
pub keys: HashMap<Key, VectoredValueReconstructState>,
/// The keys which are already retrieved
keys_done: KeySpaceRandomAccum,
@@ -272,7 +319,7 @@ pub(crate) struct ValuesReconstructState {
/// The desired end state is that we always do parallel IO.
/// This struct and the dispatching in the impl will be removed once
/// we've built enough confidence.
pub(crate) enum IoConcurrency {
pub enum IoConcurrency {
Sequential,
SidecarTask {
task_id: usize,
@@ -317,10 +364,7 @@ impl IoConcurrency {
Self::spawn(SelectedIoConcurrency::Sequential)
}
pub(crate) fn spawn_from_conf(
conf: GetVectoredConcurrentIo,
gate_guard: GateGuard,
) -> IoConcurrency {
pub fn spawn_from_conf(conf: GetVectoredConcurrentIo, gate_guard: GateGuard) -> IoConcurrency {
let selected = match conf {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
@@ -425,16 +469,6 @@ impl IoConcurrency {
}
}
pub(crate) fn clone(&self) -> Self {
match self {
IoConcurrency::Sequential => IoConcurrency::Sequential,
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
task_id: *task_id,
ios_tx: ios_tx.clone(),
},
}
}
/// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
///
/// The IO is represented as an opaque future.
@@ -573,6 +607,18 @@ impl IoConcurrency {
}
}
impl Clone for IoConcurrency {
fn clone(&self) -> Self {
match self {
IoConcurrency::Sequential => IoConcurrency::Sequential,
IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
task_id: *task_id,
ios_tx: ios_tx.clone(),
},
}
}
}
/// Make noise in case the [`ValuesReconstructState`] gets dropped while
/// there are still IOs in flight.
/// Refer to `collect_pending_ios` for why we prefer not to do that.
@@ -603,7 +649,7 @@ impl Drop for ValuesReconstructState {
}
impl ValuesReconstructState {
pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
pub fn new(io_concurrency: IoConcurrency) -> Self {
Self {
keys: HashMap::new(),
keys_done: KeySpaceRandomAccum::new(),

View File

@@ -70,23 +70,15 @@ pub struct InMemoryLayer {
/// We use a separate lock for the index to reduce the critical section
/// during which reads cannot be planned.
///
/// If you need access to both the index and the underlying file at the same time,
/// respect the following locking order to avoid deadlocks:
/// 1. [`InMemoryLayer::inner`]
/// 2. [`InMemoryLayer::index`]
///
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
/// so it is not necessary to hold simultaneous locks on index.
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
/// Note that the file backing [`InMemoryLayer::file`] is append-only,
/// so it is not necessary to hold a lock on the index while reading or writing from the file.
/// In particular:
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
/// 1. It is safe to read and release [`InMemoryLayer::index`] before reading from [`InMemoryLayer::file`].
/// 2. It is safe to write to [`InMemoryLayer::file`] before locking and updating [`InMemoryLayer::index`].
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
/// The above fields never change, except for `end_lsn`, which is only set once,
/// and `index` (see rationale there).
/// All other changing parts are in `inner`, and protected by a mutex.
inner: RwLock<InMemoryLayerInner>,
/// Wrapper for the actual on-disk file. Uses interior mutability for concurrent reads/writes.
file: EphemeralFile,
estimated_in_mem_size: AtomicU64,
}
@@ -96,20 +88,10 @@ impl std::fmt::Debug for InMemoryLayer {
f.debug_struct("InMemoryLayer")
.field("start_lsn", &self.start_lsn)
.field("end_lsn", &self.end_lsn)
.field("inner", &self.inner)
.finish()
}
}
pub struct InMemoryLayerInner {
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile,
resource_units: GlobalResourceUnits,
}
/// Support the same max blob length as blob_io, because ultimately
/// all the InMemoryLayer contents end up being written into a delta layer,
/// using the [`crate::tenant::blob_io`].
@@ -258,12 +240,6 @@ struct IndexEntryUnpacked {
pos: u64,
}
impl std::fmt::Debug for InMemoryLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InMemoryLayerInner").finish()
}
}
/// State shared by all in-memory (ephemeral) layers. Updated infrequently during background ticks in Timeline,
/// to minimize contention.
///
@@ -280,7 +256,7 @@ pub(crate) struct GlobalResources {
}
// Per-timeline RAII struct for its contribution to [`GlobalResources`]
struct GlobalResourceUnits {
pub(crate) struct GlobalResourceUnits {
// How many dirty bytes have I added to the global dirty_bytes: this guard object is responsible
// for decrementing the global counter by this many bytes when dropped.
dirty_bytes: u64,
@@ -292,7 +268,7 @@ impl GlobalResourceUnits {
// updated when the Timeline "ticks" in the background.
const MAX_SIZE_DRIFT: u64 = 10 * 1024 * 1024;
fn new() -> Self {
pub(crate) fn new() -> Self {
GLOBAL_RESOURCES
.dirty_layers
.fetch_add(1, AtomicOrdering::Relaxed);
@@ -304,7 +280,7 @@ impl GlobalResourceUnits {
///
/// Returns the effective layer size limit that should be applied, if any, to keep
/// the total number of dirty bytes below the configured maximum.
fn publish_size(&mut self, size: u64) -> Option<u64> {
pub(crate) fn publish_size(&mut self, size: u64) -> Option<u64> {
let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
Ordering::Greater => {
@@ -349,7 +325,7 @@ impl GlobalResourceUnits {
// Call publish_size if the input size differs from last published size by more than
// the drift limit
fn maybe_publish_size(&mut self, size: u64) {
pub(crate) fn maybe_publish_size(&mut self, size: u64) {
let publish = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => false,
Ordering::Greater => size - self.dirty_bytes > Self::MAX_SIZE_DRIFT,
@@ -398,8 +374,8 @@ impl InMemoryLayer {
}
}
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok()
pub(crate) fn len(&self) -> u64 {
self.file.len()
}
pub(crate) fn assert_writable(&self) {
@@ -430,7 +406,7 @@ impl InMemoryLayer {
// Look up the keys in the provided keyspace and update
// the reconstruct state with whatever is found.
pub(crate) async fn get_values_reconstruct_data(
pub async fn get_values_reconstruct_data(
self: &Arc<InMemoryLayer>,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
@@ -479,14 +455,13 @@ impl InMemoryLayer {
}
}
}
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
drop(index); // release the lock before we spawn the IO
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
.spawn_io(async move {
let inner = read_from.inner.read().await;
let f = vectored_dio_read::execute(
&inner.file,
&read_from.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
@@ -518,7 +493,6 @@ impl InMemoryLayer {
// This is kinda forced for InMemoryLayer because we need to inner.read() anyway,
// but it's less obvious for DeltaLayer and ImageLayer. So, keep this explicit
// drop for consistency among all three layer types.
drop(inner);
drop(read_from);
})
.await;
@@ -549,12 +523,6 @@ impl std::fmt::Display for InMemoryLayer {
}
impl InMemoryLayer {
/// Get layer size.
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
}
pub fn estimated_in_mem_size(&self) -> u64 {
self.estimated_in_mem_size.load(AtomicOrdering::Relaxed)
}
@@ -587,10 +555,7 @@ impl InMemoryLayer {
end_lsn: OnceLock::new(),
opened_at: Instant::now(),
index: RwLock::new(BTreeMap::new()),
inner: RwLock::new(InMemoryLayerInner {
file,
resource_units: GlobalResourceUnits::new(),
}),
file,
estimated_in_mem_size: AtomicU64::new(0),
})
}
@@ -599,41 +564,37 @@ impl InMemoryLayer {
///
/// Errors are not retryable, the [`InMemoryLayer`] must be discarded, and not be read from.
/// The reason why it's not retryable is that the [`EphemeralFile`] writes are not retryable.
///
/// This method shall not be called concurrently. We enforce this property via [`crate::tenant::Timeline::write_lock`].
///
/// TODO: it can be made retryable if we aborted the process on EphemeralFile write errors.
pub async fn put_batch(
&self,
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let (base_offset, metadata) = {
let mut inner = self.inner.write().await;
self.assert_writable();
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = self.file.len();
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// Write the batch to the file
self.file.write_raw(&raw, ctx).await?;
let new_size = self.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
inner.resource_units.maybe_publish_size(new_size);
(base_offset, metadata)
};
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
// Update the index with the new entries
let mut index = self.index.write().await;
@@ -686,10 +647,8 @@ impl InMemoryLayer {
self.opened_at
}
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.len();
inner.resource_units.publish_size(size)
pub(crate) fn tick(&self) -> Option<u64> {
self.file.tick()
}
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {
@@ -753,12 +712,6 @@ impl InMemoryLayer {
gate: &utils::sync::gate::Gate,
cancel: CancellationToken,
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. See the comment on
// [`InMemoryLayer::freeze`] to understand how locking between the append path
// and layer flushing works.
let inner = self.inner.read().await;
let index = self.index.read().await;
use l0_flush::Inner;
@@ -793,7 +746,7 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = self.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in index.iter() {

View File

@@ -816,7 +816,7 @@ impl From<layer_manager::Shutdown> for FlushLayerError {
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetVectoredError {
pub enum GetVectoredError {
#[error("timeline shutting down")]
Cancelled,
@@ -849,7 +849,7 @@ impl From<GetReadyAncestorError> for GetVectoredError {
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetReadyAncestorError {
pub enum GetReadyAncestorError {
#[error("ancestor LSN wait error")]
AncestorLsnTimeout(#[from] WaitLsnError),
@@ -939,7 +939,7 @@ impl std::fmt::Debug for Timeline {
}
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum WaitLsnError {
pub enum WaitLsnError {
// Called on a timeline which is shutting down
#[error("Shutdown")]
Shutdown,
@@ -1902,16 +1902,11 @@ impl Timeline {
return;
};
let Some(current_size) = open_layer.try_len() else {
// Unexpected: since we hold the write guard, nobody else should be writing to this layer, so
// read lock to get size should always succeed.
tracing::warn!("Lock conflict while reading size of open layer");
return;
};
let current_size = open_layer.len();
let current_lsn = self.get_last_record_lsn();
let checkpoint_distance_override = open_layer.tick().await;
let checkpoint_distance_override = open_layer.tick();
if let Some(size_override) = checkpoint_distance_override {
if current_size > size_override {
@@ -2151,12 +2146,13 @@ impl Timeline {
// Regardless of whether we're going to try_freeze_and_flush
// or not, stop ingesting any more data.
let walreceiver = self.walreceiver.lock().unwrap().take();
tracing::debug!(
tracing::info!(
is_some = walreceiver.is_some(),
"Waiting for WalReceiverManager..."
);
if let Some(walreceiver) = walreceiver {
walreceiver.shutdown().await;
tracing::info!("WalReceiverManager shut down");
}
// ... and inform any waiters for newer LSNs that there won't be any.
self.last_record_lsn.shutdown();
@@ -2253,6 +2249,7 @@ impl Timeline {
// As documented in remote_client.stop()'s doc comment, it's our responsibility
// to shut down the upload queue tasks.
// TODO: fix that, task management should be encapsulated inside remote_client.
tracing::info!("Waiting for remote uploads tasks...");
task_mgr::shutdown_tasks(
Some(TaskKind::RemoteUploadTask),
Some(self.tenant_shard_id),
@@ -2261,12 +2258,13 @@ impl Timeline {
.await;
// TODO: work toward making this a no-op. See this function's doc comment for more context.
tracing::debug!("Waiting for tasks...");
tracing::info!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
{
// Allow any remaining in-memory layers to do cleanup -- until that, they hold the gate
// open.
tracing::info!("Waiting for layer manager shutdown...");
let mut write_guard = self.write_lock.lock().await;
self.layers
.write(LayerManagerLockHolder::Shutdown)
@@ -2278,6 +2276,7 @@ impl Timeline {
//
// TODO: once above shutdown_tasks is a no-op, we can close the gate before calling shutdown_tasks
// and use a TBD variant of shutdown_tasks that asserts that there were no tasks left.
tracing::info!("Waiting for timeline gate close...");
self.gate.close().await;
self.metrics.shutdown();
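The comments above describe the gate discipline: in-memory layers and other tasks hold the gate open while they run, and `gate.close().await` resolves only once every holder has finished. A rough sketch of such a gate built on a Tokio semaphore; this is an assumption-laden stand-in, not the pageserver's actual `Gate` type:

```rust
use std::sync::Arc;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};

/// Hypothetical gate: holders keep it open via a permit; `close()` resolves
/// only once every permit has been returned, then refuses new entries.
struct Gate {
    sem: Arc<Semaphore>,
    capacity: u32,
}

impl Gate {
    fn new(capacity: u32) -> Self {
        Self { sem: Arc::new(Semaphore::new(capacity as usize)), capacity }
    }

    /// Enter the gate; dropping the permit lets the gate close.
    async fn enter(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
        self.sem.clone().acquire_owned().await
    }

    /// Wait for all holders to leave, then close the gate for good.
    async fn close(&self) {
        let all = self.sem.acquire_many(self.capacity).await.expect("closed twice");
        all.forget();
        self.sem.close();
    }
}

#[tokio::main]
async fn main() {
    let gate = Arc::new(Gate::new(8));
    let guard = gate.enter().await.unwrap();
    let closer = tokio::spawn({
        let gate = gate.clone();
        async move { gate.close().await }
    });
    drop(guard);           // last holder leaves ...
    closer.await.unwrap(); // ... and only then does close() return
}
```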
@@ -4675,6 +4674,7 @@ impl Timeline {
};
info!("started flush loop");
loop {
tokio::select! {
_ = self.cancel.cancelled() => {
@@ -4689,13 +4689,12 @@ impl Timeline {
// The highest LSN to which we flushed in the loop over frozen layers
let mut flushed_to_lsn = Lsn(0);
let result = loop {
// Force not bailing early by wrapping the code into a closure.
#[allow(clippy::redundant_closure_call)]
let result = (async || { loop {
if self.cancel.is_cancelled() {
info!("dropping out of flush loop for timeline shutdown");
// Note: we do not bother transmitting into [`layer_flush_done_tx`], because
// anyone waiting on that will respect self.cancel as well: they will stop
// waiting at the same time as we drop out of this loop.
return;
break Err(FlushLayerError::Cancelled);
}
// Break to notify potential waiters as soon as we've flushed the requested LSN. If
@@ -4708,8 +4707,8 @@ impl Timeline {
let (layer, l0_count, frozen_count, frozen_size) = {
let layers = self.layers.read(LayerManagerLockHolder::FlushLoop).await;
let Ok(lm) = layers.layer_map() else {
info!("dropping out of flush loop for timeline shutdown");
return;
info!("dropping out of flush loop for layer map shutdown");
break Err(FlushLayerError::Cancelled);
};
let l0_count = lm.level0_deltas().len();
let frozen_count = lm.frozen_layers.len();
@@ -4757,8 +4756,8 @@ impl Timeline {
match self.flush_frozen_layer(layer, ctx).await {
Ok(layer_lsn) => flushed_to_lsn = max(flushed_to_lsn, layer_lsn),
Err(FlushLayerError::Cancelled) => {
info!("dropping out of flush loop for timeline shutdown");
return;
info!("dropping out of flush loop for remote client shutdown");
break Err(FlushLayerError::Cancelled);
}
err @ Err(
FlushLayerError::NotRunning(_)
@@ -4799,7 +4798,7 @@ impl Timeline {
}
}
}
};
}})().await;
// Unsharded tenants should never advance their LSN beyond the end of the
// highest layer they write: such gaps between layer data and the frozen LSN
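The hunk above replaces the flush loop's early `return`s with `break Err(FlushLayerError::Cancelled)` out of an immediately awaited closure, so every exit path yields a result that can be reported to waiters. A minimal sketch of that shape, using a plain async block and a hypothetical error type:

```rust
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
enum FlushError {
    Cancelled,
}

async fn flush_loop(cancel: CancellationToken) {
    // Wrapping the loop lets `break` carry the outcome out instead of
    // returning early, so the notification below runs on every exit path.
    let result: Result<(), FlushError> = async {
        loop {
            if cancel.is_cancelled() {
                break Err(FlushError::Cancelled);
            }
            // ... flush one frozen layer, then decide whether to continue ...
            break Ok(());
        }
    }
    .await;

    // Hypothetical notification point (the real code notifies flush waiters here).
    println!("flush loop finished: {result:?}");
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    cancel.cancel();
    flush_loop(cancel).await;
}
```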
@@ -7372,7 +7371,7 @@ impl TimelineWriter<'_> {
.tl
.get_layer_for_write(at, &self.write_guard, ctx)
.await?;
let initial_size = layer.size().await?;
let initial_size = layer.len();
let last_freeze_at = self.last_freeze_at.load();
self.write_guard.replace(TimelineWriterState::new(
@@ -7413,7 +7412,7 @@ impl TimelineWriter<'_> {
if let Some(wait_threshold) = wait_threshold {
if l0_count >= wait_threshold {
debug!(
info!(
"layer roll waiting for flush due to compaction backpressure at {l0_count} L0 layers"
);
self.tl.wait_flush_completion(flush_id).await?;


@@ -106,11 +106,12 @@ impl WalReceiver {
match loop_step_result {
Ok(()) => continue,
Err(_cancelled) => {
trace!("Connection manager loop ended, shutting down");
info!("Connection manager loop ended, shutting down");
break;
}
}
}
info!("Awaiting connection manager state shutdown ...");
connection_manager_state.shutdown().await;
*loop_status.write().unwrap() = None;
info!("task exits");
@@ -128,7 +129,7 @@ impl WalReceiver {
#[instrument(skip_all, level = tracing::Level::DEBUG)]
pub async fn shutdown(self) {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("cancelling walreceiver tasks");
info!("cancelling walreceiver tasks");
self.cancel.cancel();
match self.task.await {
Ok(()) => debug!("Shutdown success"),
@@ -171,7 +172,7 @@ enum TaskStateUpdate<E> {
Progress(E),
}
impl<E: Clone> TaskHandle<E> {
impl<E: Clone + std::fmt::Debug> TaskHandle<E> {
/// Initializes the task, starting it immediately after the creation.
///
/// The second argument to `task` is a child token of `cancel_parent` ([`CancellationToken::child_token`]).
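The doc comment above mentions that the task receives a child token of `cancel_parent`. A brief, self-contained illustration of the parent/child cancellation relationship (the task body is hypothetical):

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let parent = CancellationToken::new();
    // Each task gets a child token: cancelling the parent cancels every child,
    // while a child can still be cancelled individually without affecting siblings.
    let child = parent.child_token();
    let task = tokio::spawn(async move {
        child.cancelled().await;
        println!("task observed cancellation");
    });
    parent.cancel();
    task.await.unwrap();
}
```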
@@ -243,10 +244,30 @@ impl<E: Clone> TaskHandle<E> {
}
/// Aborts current task, waiting for it to finish.
async fn shutdown(self) {
if let Some(jh) = self.join_handle {
async fn shutdown(mut self) {
if let Some(mut jh) = self.join_handle {
self.cancellation.cancel();
match jh.await {
let res = loop {
tokio::select! {
res = &mut jh => {
break res;
},
received = self.events_receiver.changed() => {
match received {
Ok(()) => {
let event = self.events_receiver.borrow();
tracing::info!("Received update after cancellation: {event:?}");
},
Err(err) => {
tracing::info!("Sender dropped after cancellation: {err}");
}
}
}
}
};
match res {
Ok(Ok(())) => debug!("Shutdown success"),
Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
Err(je) if je.is_cancelled() => unreachable!("not used"),
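The new `shutdown` above awaits the join handle inside a `tokio::select!` loop so that status updates arriving on the watch channel around cancellation are still observed. A sketch of the same pattern with a hypothetical `String` event type; unlike the diff, it falls back to a plain join once the sender is gone:

```rust
use tokio::sync::watch;
use tokio::task::{JoinError, JoinHandle};

/// Await a task while draining late updates from its watch channel.
async fn join_with_updates(
    mut jh: JoinHandle<()>,
    mut events: watch::Receiver<String>,
) -> Result<(), JoinError> {
    loop {
        tokio::select! {
            res = &mut jh => return res,
            changed = events.changed() => {
                if changed.is_ok() {
                    println!("update after cancellation: {}", *events.borrow());
                } else {
                    // Sender dropped: no further updates, fall back to a plain join.
                    break;
                }
            }
        }
    }
    jh.await
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(String::new());
    let jh = tokio::spawn(async move {
        tx.send("shutting down".to_string()).ok();
        // task exits, dropping `tx`
    });
    join_with_updates(jh, rx).await.unwrap();
}
```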


@@ -66,7 +66,7 @@ pub(super) async fn connection_manager_loop_step(
} {
Ok(()) => {}
Err(new_state) => {
debug!(
info!(
?new_state,
"state changed, stopping wal connection manager loop"
);
@@ -145,7 +145,7 @@ pub(super) async fn connection_manager_loop_step(
}
TaskEvent::End(walreceiver_task_result) => {
match walreceiver_task_result {
Ok(()) => debug!("WAL receiving task finished"),
Ok(()) => info!("WAL receiving task finished"),
Err(e) => error!("wal receiver task finished with an error: {e:?}"),
}
connection_manager_state.drop_old_connection(false).await;


@@ -193,7 +193,7 @@ pub(super) async fn handle_walreceiver_connection(
debug_assert_current_span_has_tenant_and_timeline_id();
select! {
connection_result = connection => match connection_result {
Ok(()) => debug!("Walreceiver db connection closed"),
Ok(()) => info!("Walreceiver db connection closed"),
Err(connection_error) => {
match WalReceiverError::from(connection_error) {
WalReceiverError::ExpectedSafekeeperError(_) => {
@@ -202,7 +202,7 @@ pub(super) async fn handle_walreceiver_connection(
},
WalReceiverError::SuccessfulCompletion(_) => {}
WalReceiverError::Cancelled => {
debug!("Connection cancelled")
info!("Connection cancelled")
}
WalReceiverError::ClosedGate => {
// doesn't happen at runtime
@@ -213,7 +213,7 @@ pub(super) async fn handle_walreceiver_connection(
}
}
},
_ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
_ = connection_cancellation.cancelled() => info!("Connection cancelled"),
}
drop(poller_guard);
}
@@ -299,7 +299,7 @@ pub(super) async fn handle_walreceiver_connection(
select! {
biased;
_ = cancellation.cancelled() => {
debug!("walreceiver interrupted");
info!("walreceiver interrupted");
None
}
replication_message = physical_stream.next() => replication_message,
@@ -307,6 +307,19 @@ pub(super) async fn handle_walreceiver_connection(
} {
let replication_message = replication_message?;
match &replication_message {
ReplicationMessage::XLogData(_) => {
tracing::info!("Received XLogData replication message")
}
ReplicationMessage::PrimaryKeepAlive(_) => {
tracing::info!("Received PrimaryKeepAlive replication message")
}
ReplicationMessage::RawInterpretedWalRecords(_) => {
tracing::info!("Received RawInterpretedWalRecords replication message")
}
unknown => tracing::info!("Received unknown replication message: {unknown:?}"),
}
let now = Utc::now().naive_utc();
let last_rec_lsn_before_msg = last_rec_lsn;
@@ -577,7 +590,7 @@ pub(super) async fn handle_walreceiver_connection(
shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
};
debug!("neon_status_update {status_update:?}");
info!("sending neon_status_update {status_update:?}");
let mut data = BytesMut::new();
status_update.serialize(&mut data);
@@ -585,6 +598,8 @@ pub(super) async fn handle_walreceiver_connection(
.as_mut()
.zenith_status_update(data.len() as u64, &data)
.await?;
info!("sent neon_status_update");
}
}

pgxn/Makefile (new file)

@@ -0,0 +1,28 @@
# This makefile assumes that 'pg_config' is in the path, or is passed in the
# PG_CONFIG variable.
#
# This is used in two different ways:
#
# 1. The main makefile calls this, when you invoke the `make neon-pg-ext-%`
# target. It passes PG_CONFIG pointing to pg_install/%/bin/pg_config.
# This is a VPATH build; the current directory is build/pgxn-%, and
# the path to the Makefile is passed with the -f argument.
#
# 2. compute-node.Dockerfile invokes this to build the compute extensions
# for the specific Postgres version. It relies on pg_config already
# being in $(PATH).
srcdir = $(dir $(firstword $(MAKEFILE_LIST)))
PG_CONFIG = pg_config
subdirs = neon neon_rmgr neon_walredo neon_utils neon_test_utils
.PHONY: install install-compute install-storage $(subdirs)
install: $(subdirs)
install-compute: neon neon_utils neon_test_utils neon_rmgr
install-storage: neon_rmgr neon_walredo
$(subdirs): %:
mkdir -p $*
$(MAKE) PG_CONFIG=$(PG_CONFIG) -C $* -f $(abspath $(srcdir)/$@/Makefile) install


@@ -412,18 +412,6 @@ compact_prefetch_buffers(void)
return false;
}
static void
dump_prefetch_state(void)
{
neon_log(LOG, "PREFETCH STATE: ring_last=%lx, ring_receive=%lx, ring_flush=%lx, ring_unused=%lx",
MyPState->ring_last, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
for (uint64 i = MyPState->ring_last; i < MyPState->ring_unused; i++)
{
PrefetchRequest *slot = GetPrfSlot(i);
neon_log(LOG, "PREFETCH STATE: slot %lx status=%d, reqid=%lx", i, slot->status, slot->reqid);
}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -458,24 +446,15 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
if (response->tag != T_NeonGetPageResponse && response->tag != T_NeonErrorResponse)
{
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
response->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC,
neon_shard_log(slot->shard_no, ERROR,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
@@ -483,13 +462,6 @@ communicator_prefetch_pump_state(void)
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
if (response->reqid != slot->reqid && response->tag != T_NeonErrorResponse)
{
dump_prefetch_state();
ereport(PANIC,
(errmsg(NEON_TAG "[shard %d, reqid %lx] pump state receive unexpected response %d with reqid %lx", slot->shard_no, slot->reqid, response->tag, response->reqid),
errbacktrace()));
}
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
@@ -750,13 +722,10 @@ prefetch_read(PrefetchRequest *slot)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
dump_prefetch_state();
neon_shard_log(slot->shard_no, PANIC,
neon_shard_log(slot->shard_no, ERROR,
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
}
/*
* Copy the request info so that if an error happens and the prefetch
@@ -776,28 +745,10 @@ prefetch_read(PrefetchRequest *slot)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
dump_prefetch_state();
neon_shard_log(shard_no, PANIC,
neon_shard_log(shard_no, ERROR,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
if (response->tag != T_NeonGetPageResponse && response->tag != T_NeonErrorResponse)
{
dump_prefetch_state();
neon_shard_log(shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
response->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
if (response->reqid != slot->reqid)
{
dump_prefetch_state();
ereport(PANIC,
(errmsg(NEON_TAG "[shard %d, reqid %lx] prefetch_read receive unexpected response %d with reqid %lx", slot->shard_no, slot->reqid, response->tag, response->reqid),
errbacktrace()));
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -1420,7 +1371,7 @@ page_server_request(void const *req)
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(PANIC, "Unexpected request tag: %d", messageTag(req));
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
}
shard_no = get_shard_number(&tag);
@@ -1437,13 +1388,13 @@ page_server_request(void const *req)
{
PG_TRY();
{
consume_prefetch_responses();
while (!page_server->send(shard_no, (NeonRequest *) req)
|| !page_server->flush(shard_no))
{
/* do nothing */
}
MyNeonCounters->pageserver_open_requests++;
consume_prefetch_responses();
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
}
@@ -1453,7 +1404,6 @@ page_server_request(void const *req)
* Cancellation in this code needs to be handled better at some
* point, but this currently seems fine for now.
*/
prefetch_on_ps_disconnect();
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
@@ -1552,7 +1502,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default:
neon_log(PANIC, "unexpected neon message tag 0x%02x", msg->tag);
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
break;
}
return s;
@@ -1704,7 +1654,7 @@ nm_unpack_response(StringInfo s)
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
neon_log(PANIC, "unexpected neon message tag 0x%02x", tag);
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
break;
}
@@ -2433,11 +2383,12 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
.segno = segno
};
consume_prefetch_responses();
do
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
consume_prefetch_responses();
resp = page_server->receive(shard_no);
} while (resp == NULL);


@@ -116,20 +116,13 @@ typedef enum FileCacheBlockState
REQUESTED /* some other backend is waiting for block to be loaded */
} FileCacheBlockState;
typedef enum RelationKind
{
RELKIND_UNKNOWN,
RELKIND_HEAP,
RELKIND_INDEX
} RelationKind;
typedef struct FileCacheEntry
{
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count:30;
uint32 relkind:2;
uint32 access_count;
dlist_node list_node; /* LRU/holes list node */
uint32 state[FLEXIBLE_ARRAY_MEMBER]; /* two bits per block */
} FileCacheEntry;
@@ -498,7 +491,6 @@ lfc_change_limit_hook(int newval, void *extra)
hole->hash = hash;
hole->offset = offset;
hole->access_count = 0;
hole->relkind = RELKIND_UNKNOWN;
CriticalAssert(!found);
dlist_push_tail(&lfc_ctl->holes, &hole->list_node);
@@ -1458,7 +1450,6 @@ lfc_init_new_entry(FileCacheEntry* entry, uint32 hash)
}
entry->access_count = 1;
entry->relkind = RELKIND_UNKNOWN;
entry->hash = hash;
lfc_ctl->pinned += 1;
@@ -1653,7 +1644,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint64 generation;
uint32 entry_offset;
int buf_offset = 0;
RelationKind relkind = RELKIND_UNKNOWN;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
@@ -1670,27 +1661,7 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return;
}
generation = lfc_ctl->generation;
for (int i = 0; i < nblocks; i++)
{
Page page = (Page)buffers[i];
if (!PageIsNew(page))
{
RelationKind pagekind = PageGetSpecialSize(page) != 0 ? RELKIND_INDEX : RELKIND_HEAP;
if (relkind == RELKIND_UNKNOWN)
{
relkind = pagekind;
}
else
{
if (relkind != pagekind)
{
ereport(PANIC,
(errmsg("Inconsistent writing %s page %u %u/%u/%u.%u to LFC", pagekind == RELKIND_INDEX ? "index" : "heap", blkno+i, RelFileInfoFmt(rinfo), forkNum),
errbacktrace()));
}
}
}
}
/*
* For every chunk that has blocks we're interested in, we
* 1. get the chunk header
@@ -1740,16 +1711,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_ctl->pinned += 1;
dlist_delete(&entry->list_node);
}
if (relkind != RELKIND_UNKNOWN)
{
if (entry->relkind != RELKIND_UNKNOWN && entry->relkind != relkind)
{
ereport(PANIC,
(errmsg("Writing unexpected %s page %u %u/%u/%u.%u to LFC", relkind == RELKIND_INDEX ? "index" : "heap", blkno, RelFileInfoFmt(rinfo), forkNum),
errbacktrace()));
}
entry->relkind = relkind;
}
}
else
{
@@ -1764,7 +1725,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
nblocks -= blocks_in_chunk;
continue;
}
entry->relkind = RELKIND_UNKNOWN;
}
entry_offset = entry->offset;


@@ -126,7 +126,7 @@ static PagestoreShmemState *pagestore_shared;
static uint64 pagestore_local_counter = 0;
typedef enum PSConnectionState {
PS_Disconnected = 1, /* no connection yet */
PS_Disconnected, /* no connection yet */
PS_Connecting_Startup, /* connection starting up */
PS_Connecting_PageStream, /* negotiating pagestream */
PS_Connected, /* connected, pagestream established */
@@ -373,9 +373,8 @@ get_shard_number(BufferTag *tag)
}
static inline void
CLEANUP_AND_DISCONNECT(PageServer *shard)
CLEANUP_AND_DISCONNECT(PageServer *shard)
{
neon_log(LOG, "Cleanup and disconnect shard %d with state %d", (int)(shard - page_servers), shard->state);
if (shard->wes_read)
{
FreeWaitEventSet(shard->wes_read);
@@ -396,7 +395,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
* complete the connection (e.g. due to receiving an earlier cancellation
* during connection start).
* Returns true if successfully connected; false if the connection failed.
*
*
* Throws errors in unrecoverable situations, or when this backend's query
* is canceled.
*/
@@ -405,7 +404,7 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
PageServer *shard = &page_servers[shard_no];
char connstr[MAX_PAGESERVER_CONNSTRING_SIZE];
neon_log(LOG, "Initiate connect to shard %d state %d", shard_no, shard->state);
/*
* Get the connection string for this shard. If the shard map has been
* updated since we last looked, this will also disconnect any existing
@@ -969,7 +968,6 @@ retry:
static void
pageserver_disconnect(shardno_t shard_no)
{
neon_log(LOG, "pageserver_disconnect shard %d", shard_no);
/*
* If the connection to any pageserver is lost, we throw away the
* whole prefetch queue, even for other pageservers. It should not
@@ -1000,7 +998,7 @@ pageserver_disconnect_shard(shardno_t shard_no)
* to attach wait events to the WaitEventSets.
*/
CLEANUP_AND_DISCONNECT(shard);
neon_log(LOG, "Disconnect shard %d", shard_no);
shard->state = PS_Disconnected;
}
@@ -1356,10 +1354,6 @@ pg_init_libpagestore(void)
{
pagestore_prepare_shmem();
for (int i = 0; i < MAX_SHARDS; i++)
page_servers[i].state = PS_Disconnected;
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
@@ -1533,4 +1527,6 @@ pg_init_libpagestore(void)
smgr_init_hook = smgr_init_neon;
dbsize_hook = neon_dbsize;
}
memset(page_servers, 0, sizeof(page_servers));
}


@@ -5,7 +5,7 @@
],
"v16": [
"16.9",
"85091d9c28958f19f24aee14526735392da11656"
"7a4c0eacaeb9b97416542fa19103061c166460b1"
],
"v15": [
"15.13",