mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 07:00:38 +00:00
Compare commits
51 Commits
arpad/endp
...
yuchen/ben
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3e3b721ec | ||
|
|
1ca71d607b | ||
|
|
b6a2516c1c | ||
|
|
b54764bccb | ||
|
|
e4f437a354 | ||
|
|
8fdf786217 | ||
|
|
9e0148de11 | ||
|
|
7b41ee872e | ||
|
|
277c33ba3f | ||
|
|
2b788cb53f | ||
|
|
13feda0669 | ||
|
|
96a1b71c84 | ||
|
|
a74ab9338d | ||
|
|
c3302ad7e1 | ||
|
|
7404887b81 | ||
|
|
87e4dd23a1 | ||
|
|
7a2f0ed8d4 | ||
|
|
5c2356988e | ||
|
|
4284fcd38c | ||
|
|
441612c1ce | ||
|
|
77630e5408 | ||
|
|
3d380acbd1 | ||
|
|
4630b70962 | ||
|
|
8a37f412c2 | ||
|
|
d4ebd5ccd3 | ||
|
|
76f0e4fd1d | ||
|
|
28718bfadc | ||
|
|
e5bf2bec49 | ||
|
|
54d253d51a | ||
|
|
0f63c957a6 | ||
|
|
77801fe3be | ||
|
|
78a17a7051 | ||
|
|
826e2395a8 | ||
|
|
9db6b1e3c8 | ||
|
|
5acc61bdbc | ||
|
|
990bc65a20 | ||
|
|
6844b5f460 | ||
|
|
ffd88ede38 | ||
|
|
d6d8a16dbc | ||
|
|
20e6a0c8a2 | ||
|
|
ce7cd36100 | ||
|
|
b0d7fc7564 | ||
|
|
7b34e73c15 | ||
|
|
e5bb85d407 | ||
|
|
e0848c28d9 | ||
|
|
bdffc352e7 | ||
|
|
45998046f3 | ||
|
|
26c8b50451 | ||
|
|
f0efc908d7 | ||
|
|
224cbb4025 | ||
|
|
dd1c45e896 |
14
.github/workflows/benchmarking.yml
vendored
14
.github/workflows/benchmarking.yml
vendored
@@ -541,7 +541,7 @@ jobs:
|
||||
|
||||
runs-on: ${{ matrix.RUNNER }}
|
||||
container:
|
||||
image: neondatabase/build-tools:pinned
|
||||
image: neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
@@ -558,12 +558,12 @@ jobs:
|
||||
arch=$(uname -m | sed 's/x86_64/amd64/g' | sed 's/aarch64/arm64/g')
|
||||
|
||||
cd /home/nonroot
|
||||
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-17/libpq5_17.2-1.pgdg110+1_${arch}.deb"
|
||||
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-client-16_16.6-1.pgdg110+1_${arch}.deb"
|
||||
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-16_16.6-1.pgdg110+1_${arch}.deb"
|
||||
dpkg -x libpq5_17.2-1.pgdg110+1_${arch}.deb pg
|
||||
dpkg -x postgresql-16_16.6-1.pgdg110+1_${arch}.deb pg
|
||||
dpkg -x postgresql-client-16_16.6-1.pgdg110+1_${arch}.deb pg
|
||||
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-17/libpq5_17.2-1.pgdg120+1_${arch}.deb"
|
||||
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-client-16_16.6-1.pgdg120+1_${arch}.deb"
|
||||
wget -q "https://apt.postgresql.org/pub/repos/apt/pool/main/p/postgresql-16/postgresql-16_16.6-1.pgdg120+1_${arch}.deb"
|
||||
dpkg -x libpq5_17.2-1.pgdg120+1_${arch}.deb pg
|
||||
dpkg -x postgresql-16_16.6-1.pgdg120+1_${arch}.deb pg
|
||||
dpkg -x postgresql-client-16_16.6-1.pgdg120+1_${arch}.deb pg
|
||||
|
||||
mkdir -p /tmp/neon/pg_install/v16/bin
|
||||
ln -s /home/nonroot/pg/usr/lib/postgresql/16/bin/pgbench /tmp/neon/pg_install/v16/bin/pgbench
|
||||
|
||||
75
.github/workflows/build-build-tools-image.yml
vendored
75
.github/workflows/build-build-tools-image.yml
vendored
@@ -2,6 +2,17 @@ name: Build build-tools image
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
archs:
|
||||
description: "Json array of architectures to build"
|
||||
# Default values are set in `check-image` job, `set-variables` step
|
||||
type: string
|
||||
required: false
|
||||
debians:
|
||||
description: "Json array of Debian versions to build"
|
||||
# Default values are set in `check-image` job, `set-variables` step
|
||||
type: string
|
||||
required: false
|
||||
outputs:
|
||||
image-tag:
|
||||
description: "build-tools tag"
|
||||
@@ -32,25 +43,37 @@ jobs:
|
||||
check-image:
|
||||
runs-on: ubuntu-22.04
|
||||
outputs:
|
||||
tag: ${{ steps.get-build-tools-tag.outputs.image-tag }}
|
||||
found: ${{ steps.check-image.outputs.found }}
|
||||
archs: ${{ steps.set-variables.outputs.archs }}
|
||||
debians: ${{ steps.set-variables.outputs.debians }}
|
||||
tag: ${{ steps.set-variables.outputs.image-tag }}
|
||||
everything: ${{ steps.set-more-variables.outputs.everything }}
|
||||
found: ${{ steps.set-more-variables.outputs.found }}
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Get build-tools image tag for the current commit
|
||||
id: get-build-tools-tag
|
||||
- name: Set variables
|
||||
id: set-variables
|
||||
env:
|
||||
ARCHS: ${{ inputs.archs || '["x64","arm64"]' }}
|
||||
DEBIANS: ${{ inputs.debians || '["bullseye","bookworm"]' }}
|
||||
IMAGE_TAG: |
|
||||
${{ hashFiles('build-tools.Dockerfile',
|
||||
'.github/workflows/build-build-tools-image.yml') }}
|
||||
run: |
|
||||
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT
|
||||
echo "archs=${ARCHS}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "debians=${DEBIANS}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "image-tag=${IMAGE_TAG}" | tee -a ${GITHUB_OUTPUT}
|
||||
|
||||
- name: Check if such tag found in the registry
|
||||
id: check-image
|
||||
- name: Set more variables
|
||||
id: set-more-variables
|
||||
env:
|
||||
IMAGE_TAG: ${{ steps.get-build-tools-tag.outputs.image-tag }}
|
||||
IMAGE_TAG: ${{ steps.set-variables.outputs.image-tag }}
|
||||
EVERYTHING: |
|
||||
${{ contains(fromJson(steps.set-variables.outputs.archs), 'x64') &&
|
||||
contains(fromJson(steps.set-variables.outputs.archs), 'arm64') &&
|
||||
contains(fromJson(steps.set-variables.outputs.debians), 'bullseye') &&
|
||||
contains(fromJson(steps.set-variables.outputs.debians), 'bookworm') }}
|
||||
run: |
|
||||
if docker manifest inspect neondatabase/build-tools:${IMAGE_TAG}; then
|
||||
found=true
|
||||
@@ -58,8 +81,8 @@ jobs:
|
||||
found=false
|
||||
fi
|
||||
|
||||
echo "found=${found}" | tee -a $GITHUB_OUTPUT
|
||||
|
||||
echo "everything=${EVERYTHING}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "found=${found}" | tee -a ${GITHUB_OUTPUT}
|
||||
|
||||
build-image:
|
||||
needs: [ check-image ]
|
||||
@@ -67,8 +90,8 @@ jobs:
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
debian-version: [ bullseye, bookworm ]
|
||||
arch: [ x64, arm64 ]
|
||||
arch: ${{ fromJson(needs.check-image.outputs.archs) }}
|
||||
debian: ${{ fromJson(needs.check-image.outputs.debians) }}
|
||||
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
|
||||
@@ -99,11 +122,11 @@ jobs:
|
||||
push: true
|
||||
pull: true
|
||||
build-args: |
|
||||
DEBIAN_VERSION=${{ matrix.debian-version }}
|
||||
cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.debian-version }}-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0}-{1},mode=max', matrix.debian-version, matrix.arch) || '' }}
|
||||
DEBIAN_VERSION=${{ matrix.debian }}
|
||||
cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.debian }}-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0}-{1},mode=max', matrix.debian, matrix.arch) || '' }}
|
||||
tags: |
|
||||
neondatabase/build-tools:${{ needs.check-image.outputs.tag }}-${{ matrix.debian-version }}-${{ matrix.arch }}
|
||||
neondatabase/build-tools:${{ needs.check-image.outputs.tag }}-${{ matrix.debian }}-${{ matrix.arch }}
|
||||
|
||||
merge-images:
|
||||
needs: [ check-image, build-image ]
|
||||
@@ -117,16 +140,22 @@ jobs:
|
||||
|
||||
- name: Create multi-arch image
|
||||
env:
|
||||
DEFAULT_DEBIAN_VERSION: bullseye
|
||||
DEFAULT_DEBIAN_VERSION: bookworm
|
||||
ARCHS: ${{ join(fromJson(needs.check-image.outputs.archs), ' ') }}
|
||||
DEBIANS: ${{ join(fromJson(needs.check-image.outputs.debians), ' ') }}
|
||||
EVERYTHING: ${{ needs.check-image.outputs.everything }}
|
||||
IMAGE_TAG: ${{ needs.check-image.outputs.tag }}
|
||||
run: |
|
||||
for debian_version in bullseye bookworm; do
|
||||
tags=("-t" "neondatabase/build-tools:${IMAGE_TAG}-${debian_version}")
|
||||
if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
|
||||
for debian in ${DEBIANS}; do
|
||||
tags=("-t" "neondatabase/build-tools:${IMAGE_TAG}-${debian}")
|
||||
|
||||
if [ "${EVERYTHING}" == "true" ] && [ "${debian}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
|
||||
tags+=("-t" "neondatabase/build-tools:${IMAGE_TAG}")
|
||||
fi
|
||||
|
||||
docker buildx imagetools create "${tags[@]}" \
|
||||
neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-x64 \
|
||||
neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-arm64
|
||||
for arch in ${ARCHS}; do
|
||||
tags+=("neondatabase/build-tools:${IMAGE_TAG}-${debian}-${arch}")
|
||||
done
|
||||
|
||||
docker buildx imagetools create "${tags[@]}"
|
||||
done
|
||||
|
||||
2
.github/workflows/periodic_pagebench.yml
vendored
2
.github/workflows/periodic_pagebench.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
trigger_bench_on_ec2_machine_in_eu_central_1:
|
||||
runs-on: [ self-hosted, small ]
|
||||
container:
|
||||
image: neondatabase/build-tools:pinned
|
||||
image: neondatabase/build-tools:pinned-bookworm
|
||||
credentials:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
|
||||
2
.github/workflows/pin-build-tools-image.yml
vendored
2
.github/workflows/pin-build-tools-image.yml
vendored
@@ -94,7 +94,7 @@ jobs:
|
||||
|
||||
- name: Tag build-tools with `${{ env.TO_TAG }}` in Docker Hub, ECR, and ACR
|
||||
env:
|
||||
DEFAULT_DEBIAN_VERSION: bullseye
|
||||
DEFAULT_DEBIAN_VERSION: bookworm
|
||||
run: |
|
||||
for debian_version in bullseye bookworm; do
|
||||
tags=()
|
||||
|
||||
9
.github/workflows/pre-merge-checks.yml
vendored
9
.github/workflows/pre-merge-checks.yml
vendored
@@ -23,6 +23,8 @@ jobs:
|
||||
id: python-src
|
||||
with:
|
||||
files: |
|
||||
.github/workflows/_check-codestyle-python.yml
|
||||
.github/workflows/build-build-tools-image.yml
|
||||
.github/workflows/pre-merge-checks.yml
|
||||
**/**.py
|
||||
poetry.lock
|
||||
@@ -38,6 +40,10 @@ jobs:
|
||||
if: needs.get-changed-files.outputs.python-changed == 'true'
|
||||
needs: [ get-changed-files ]
|
||||
uses: ./.github/workflows/build-build-tools-image.yml
|
||||
with:
|
||||
# Build only one combination to save time
|
||||
archs: '["x64"]'
|
||||
debians: '["bookworm"]'
|
||||
secrets: inherit
|
||||
|
||||
check-codestyle-python:
|
||||
@@ -45,7 +51,8 @@ jobs:
|
||||
needs: [ get-changed-files, build-build-tools-image ]
|
||||
uses: ./.github/workflows/_check-codestyle-python.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
# `-bookworm-x64` suffix should match the combination in `build-build-tools-image`
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm-x64
|
||||
secrets: inherit
|
||||
|
||||
# To get items from the merge queue merged into main we need to satisfy "Status checks that are required".
|
||||
|
||||
17
Cargo.lock
generated
17
Cargo.lock
generated
@@ -4133,7 +4133,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4146,7 +4146,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -4165,7 +4165,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -5364,6 +5364,7 @@ dependencies = [
|
||||
"itertools 0.10.5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
@@ -5395,6 +5396,7 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"utils",
|
||||
"wal_decoder",
|
||||
"walproposer",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -6466,7 +6468,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -7021,6 +7023,7 @@ dependencies = [
|
||||
"serde_assert",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
@@ -7117,10 +7120,16 @@ name = "wal_decoder"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-compression",
|
||||
"bytes",
|
||||
"pageserver_api",
|
||||
"postgres_ffi",
|
||||
"prost",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
|
||||
@@ -7,7 +7,7 @@ ARG IMAGE=build-tools
|
||||
ARG TAG=pinned
|
||||
ARG DEFAULT_PG_VERSION=17
|
||||
ARG STABLE_PG_VERSION=16
|
||||
ARG DEBIAN_VERSION=bullseye
|
||||
ARG DEBIAN_VERSION=bookworm
|
||||
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
|
||||
|
||||
# Build Postgres
|
||||
|
||||
1
Makefile
1
Makefile
@@ -38,6 +38,7 @@ ifeq ($(UNAME_S),Linux)
|
||||
# Seccomp BPF is only available for Linux
|
||||
PG_CONFIGURE_OPTS += --with-libseccomp
|
||||
else ifeq ($(UNAME_S),Darwin)
|
||||
PG_CFLAGS += -DUSE_PREFETCH
|
||||
ifndef DISABLE_HOMEBREW
|
||||
# macOS with brew-installed openssl requires explicit paths
|
||||
# It can be configured with OPENSSL_PREFIX variable
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
ARG DEBIAN_VERSION=bullseye
|
||||
ARG DEBIAN_VERSION=bookworm
|
||||
|
||||
FROM debian:bookworm-slim AS pgcopydb_builder
|
||||
ARG DEBIAN_VERSION
|
||||
|
||||
@@ -3,7 +3,7 @@ ARG REPOSITORY=neondatabase
|
||||
ARG IMAGE=build-tools
|
||||
ARG TAG=pinned
|
||||
ARG BUILD_TAG
|
||||
ARG DEBIAN_VERSION=bullseye
|
||||
ARG DEBIAN_VERSION=bookworm
|
||||
ARG DEBIAN_FLAVOR=${DEBIAN_VERSION}-slim
|
||||
|
||||
#########################################################################################
|
||||
|
||||
@@ -58,7 +58,7 @@ use compute_tools::compute::{
|
||||
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
|
||||
};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::get_pg_version;
|
||||
use compute_tools::extension_server::get_pg_version_string;
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
@@ -326,7 +326,7 @@ fn wait_spec(
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
pgversion: get_pg_version(pgbin),
|
||||
pgversion: get_pg_version_string(pgbin),
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
|
||||
@@ -29,6 +29,7 @@ use anyhow::Context;
|
||||
use aws_config::BehaviorVersion;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::Parser;
|
||||
use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
|
||||
use nix::unistd::Pid;
|
||||
use tracing::{info, info_span, warn, Instrument};
|
||||
use utils::fs_ext::is_directory_empty;
|
||||
@@ -131,11 +132,17 @@ pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
//
|
||||
// Initialize pgdata
|
||||
//
|
||||
let pg_version = match get_pg_version(pg_bin_dir.as_str()) {
|
||||
PostgresMajorVersion::V14 => 14,
|
||||
PostgresMajorVersion::V15 => 15,
|
||||
PostgresMajorVersion::V16 => 16,
|
||||
PostgresMajorVersion::V17 => 17,
|
||||
};
|
||||
let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
|
||||
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
|
||||
superuser,
|
||||
locale: "en_US.UTF-8", // XXX: this shouldn't be hard-coded,
|
||||
pg_version: 140000, // XXX: this shouldn't be hard-coded but derived from which compute image we're running in
|
||||
pg_version,
|
||||
initdb_bin: pg_bin_dir.join("initdb").as_ref(),
|
||||
library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
|
||||
pgdata: &pgdata_dir,
|
||||
|
||||
@@ -103,14 +103,33 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String {
|
||||
.to_string()
|
||||
}
|
||||
|
||||
pub fn get_pg_version(pgbin: &str) -> String {
|
||||
pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion {
|
||||
// pg_config --version returns a (platform specific) human readable string
|
||||
// such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
|
||||
let human_version = get_pg_config("--version", pgbin);
|
||||
parse_pg_version(&human_version).to_string()
|
||||
parse_pg_version(&human_version)
|
||||
}
|
||||
|
||||
fn parse_pg_version(human_version: &str) -> &str {
|
||||
pub fn get_pg_version_string(pgbin: &str) -> String {
|
||||
match get_pg_version(pgbin) {
|
||||
PostgresMajorVersion::V14 => "v14",
|
||||
PostgresMajorVersion::V15 => "v15",
|
||||
PostgresMajorVersion::V16 => "v16",
|
||||
PostgresMajorVersion::V17 => "v17",
|
||||
}
|
||||
.to_owned()
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
|
||||
pub enum PostgresMajorVersion {
|
||||
V14,
|
||||
V15,
|
||||
V16,
|
||||
V17,
|
||||
}
|
||||
|
||||
fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
|
||||
use PostgresMajorVersion::*;
|
||||
// Normal releases have version strings like "PostgreSQL 15.4". But there
|
||||
// are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
|
||||
// 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
|
||||
@@ -121,10 +140,10 @@ fn parse_pg_version(human_version: &str) -> &str {
|
||||
.captures(human_version)
|
||||
{
|
||||
Some(captures) if captures.len() == 2 => match &captures["major"] {
|
||||
"14" => return "v14",
|
||||
"15" => return "v15",
|
||||
"16" => return "v16",
|
||||
"17" => return "v17",
|
||||
"14" => return V14,
|
||||
"15" => return V15,
|
||||
"16" => return V16,
|
||||
"17" => return V17,
|
||||
_ => {}
|
||||
},
|
||||
_ => {}
|
||||
@@ -263,24 +282,25 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_parse_pg_version() {
|
||||
assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
|
||||
use super::PostgresMajorVersion::*;
|
||||
assert_eq!(parse_pg_version("PostgreSQL 15.4"), V15);
|
||||
assert_eq!(parse_pg_version("PostgreSQL 15.14"), V15);
|
||||
assert_eq!(
|
||||
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
|
||||
"v15"
|
||||
V15
|
||||
);
|
||||
|
||||
assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 14.15"), V14);
|
||||
assert_eq!(parse_pg_version("PostgreSQL 14.0"), V14);
|
||||
assert_eq!(
|
||||
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
|
||||
"v14"
|
||||
V14
|
||||
);
|
||||
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16devel"), V16);
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), V16);
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), V16);
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16extra"), V16);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -415,6 +415,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'timeline_offloading' as bool")?,
|
||||
wal_receiver_protocol_override: settings
|
||||
.remove("wal_receiver_protocol_override")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `wal_receiver_protocol_override` from json")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -2,14 +2,28 @@
|
||||
|
||||
// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use prometheus::Gauge;
|
||||
|
||||
use crate::UIntGauge;
|
||||
|
||||
pub struct Collector {
|
||||
descs: Vec<prometheus::core::Desc>,
|
||||
vmlck: crate::UIntGauge,
|
||||
cpu_seconds_highres: Gauge,
|
||||
}
|
||||
|
||||
const NMETRICS: usize = 1;
|
||||
const NMETRICS: usize = 2;
|
||||
|
||||
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
|
||||
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
|
||||
if long == -1 {
|
||||
panic!("sysconf(_SC_CLK_TCK) failed");
|
||||
}
|
||||
let convertible_to_f64: i32 =
|
||||
i32::try_from(long).expect("sysconf(_SC_CLK_TCK) is larger than i32");
|
||||
convertible_to_f64 as f64
|
||||
});
|
||||
|
||||
impl prometheus::core::Collector for Collector {
|
||||
fn desc(&self) -> Vec<&prometheus::core::Desc> {
|
||||
@@ -27,6 +41,12 @@ impl prometheus::core::Collector for Collector {
|
||||
mfs.extend(self.vmlck.collect())
|
||||
}
|
||||
}
|
||||
if let Ok(stat) = myself.stat() {
|
||||
let cpu_seconds = stat.utime + stat.stime;
|
||||
self.cpu_seconds_highres
|
||||
.set(cpu_seconds as f64 / *CLK_TCK_F64);
|
||||
mfs.extend(self.cpu_seconds_highres.collect());
|
||||
}
|
||||
mfs
|
||||
}
|
||||
}
|
||||
@@ -43,7 +63,23 @@ impl Collector {
|
||||
.cloned(),
|
||||
);
|
||||
|
||||
Self { descs, vmlck }
|
||||
let cpu_seconds_highres = Gauge::new(
|
||||
"libmetrics_process_cpu_seconds_highres",
|
||||
"Total user and system CPU time spent in seconds.\
|
||||
Sub-second resolution, hence better than `process_cpu_seconds_total`.",
|
||||
)
|
||||
.unwrap();
|
||||
descs.extend(
|
||||
prometheus::core::Collector::desc(&cpu_seconds_highres)
|
||||
.into_iter()
|
||||
.cloned(),
|
||||
);
|
||||
|
||||
Self {
|
||||
descs,
|
||||
vmlck,
|
||||
cpu_seconds_highres,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::{
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
use utils::logging::LogFormat;
|
||||
use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol};
|
||||
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
use crate::models::LsnLease;
|
||||
@@ -120,6 +120,7 @@ pub struct ConfigToml {
|
||||
pub no_sync: Option<bool>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub server_side_batch_timeout: Option<Duration>,
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -277,6 +278,8 @@ pub struct TenantConfigToml {
|
||||
/// Enable auto-offloading of timelines.
|
||||
/// (either this flag or the pageserver-global one need to be set)
|
||||
pub timeline_offloading: bool,
|
||||
|
||||
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
@@ -330,6 +333,9 @@ pub mod defaults {
|
||||
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
|
||||
|
||||
pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None;
|
||||
|
||||
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
|
||||
utils::postgres_client::PostgresClientProtocol::Vanilla;
|
||||
}
|
||||
|
||||
impl Default for ConfigToml {
|
||||
@@ -418,6 +424,7 @@ impl Default for ConfigToml {
|
||||
.map(|duration| humantime::parse_duration(duration).unwrap()),
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
no_sync: None,
|
||||
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -505,6 +512,7 @@ impl Default for TenantConfigToml {
|
||||
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
|
||||
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
|
||||
timeline_offloading: false,
|
||||
wal_receiver_protocol_override: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,6 +229,18 @@ impl Key {
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactKey {
|
||||
pub fn raw(&self) -> i128 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<i128> for CompactKey {
|
||||
fn from(value: i128) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Key {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
|
||||
@@ -23,6 +23,7 @@ use utils::{
|
||||
completion,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
postgres_client::PostgresClientProtocol,
|
||||
serde_system_time,
|
||||
};
|
||||
|
||||
@@ -352,6 +353,7 @@ pub struct TenantConfig {
|
||||
pub lsn_lease_length: Option<String>,
|
||||
pub lsn_lease_length_for_ts: Option<String>,
|
||||
pub timeline_offloading: Option<bool>,
|
||||
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
|
||||
}
|
||||
|
||||
/// The policy for the aux file storage.
|
||||
|
||||
@@ -562,6 +562,9 @@ pub enum BeMessage<'a> {
|
||||
options: &'a [&'a str],
|
||||
},
|
||||
KeepAlive(WalSndKeepAlive),
|
||||
/// Batch of interpreted, shard filtered WAL records,
|
||||
/// ready for the pageserver to ingest
|
||||
InterpretedWalRecords(InterpretedWalRecordsBody<'a>),
|
||||
}
|
||||
|
||||
/// Common shorthands.
|
||||
@@ -672,6 +675,22 @@ pub struct WalSndKeepAlive {
|
||||
pub request_reply: bool,
|
||||
}
|
||||
|
||||
/// Batch of interpreted WAL records used in the interpreted
|
||||
/// safekeeper to pageserver protocol.
|
||||
///
|
||||
/// Note that the pageserver uses the RawInterpretedWalRecordsBody
|
||||
/// counterpart of this from the neondatabase/rust-postgres repo.
|
||||
/// If you're changing this struct, you likely need to change its
|
||||
/// twin as well.
|
||||
#[derive(Debug)]
|
||||
pub struct InterpretedWalRecordsBody<'a> {
|
||||
/// End of raw WAL in [`Self::data`]
|
||||
pub streaming_lsn: u64,
|
||||
/// Current end of WAL on the server
|
||||
pub commit_lsn: u64,
|
||||
pub data: &'a [u8],
|
||||
}
|
||||
|
||||
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
|
||||
|
||||
// single text column
|
||||
@@ -996,6 +1015,19 @@ impl BeMessage<'_> {
|
||||
Ok(())
|
||||
})?
|
||||
}
|
||||
|
||||
BeMessage::InterpretedWalRecords(rec) => {
|
||||
// We use the COPY_DATA_TAG for our custom message
|
||||
// since this tag is interpreted as raw bytes.
|
||||
buf.put_u8(b'd');
|
||||
write_body(buf, |buf| {
|
||||
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
|
||||
// dependency
|
||||
buf.put_u64(rec.streaming_lsn);
|
||||
buf.put_u64(rec.commit_lsn);
|
||||
buf.put_slice(rec.data);
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@ use anyhow::Result;
|
||||
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
||||
use azure_core::{Continuable, RetryOptions};
|
||||
use azure_identity::DefaultAzureCredential;
|
||||
use azure_storage::CloudLocation;
|
||||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::blob::CopyStatus;
|
||||
use azure_storage_blobs::prelude::ClientBuilder;
|
||||
@@ -73,16 +72,8 @@ impl AzureBlobStorage {
|
||||
StorageCredentials::token_credential(Arc::new(token_credential))
|
||||
};
|
||||
|
||||
let location = match &azure_config.endpoint {
|
||||
None => CloudLocation::Public { account },
|
||||
Some(endpoint) => CloudLocation::Custom {
|
||||
account,
|
||||
uri: endpoint.clone(),
|
||||
},
|
||||
};
|
||||
let builder = ClientBuilder::with_location(location, credentials)
|
||||
// we have an outer retry
|
||||
.retry(RetryOptions::none());
|
||||
// we have an outer retry
|
||||
let builder = ClientBuilder::new(account, credentials).retry(RetryOptions::none());
|
||||
|
||||
let client = builder.container_client(azure_config.container_name.to_owned());
|
||||
|
||||
|
||||
@@ -125,8 +125,6 @@ pub struct AzureConfig {
|
||||
pub container_region: String,
|
||||
/// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once.
|
||||
pub prefix_in_container: Option<String>,
|
||||
/// The endpoint to use. Use the default if None.
|
||||
pub endpoint: Option<String>,
|
||||
/// Azure has various limits on its API calls, we need not to exceed those.
|
||||
/// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details.
|
||||
#[serde(default = "default_remote_storage_azure_concurrency_limit")]
|
||||
@@ -146,7 +144,6 @@ impl Debug for AzureConfig {
|
||||
.field("storage_account", &self.storage_account)
|
||||
.field("bucket_region", &self.container_region)
|
||||
.field("prefix_in_container", &self.prefix_in_container)
|
||||
.field("endpoint", &self.endpoint)
|
||||
.field("concurrency_limit", &self.concurrency_limit)
|
||||
.field(
|
||||
"max_keys_per_list_response",
|
||||
@@ -299,7 +296,6 @@ timeout = '5s'";
|
||||
storage_account: None,
|
||||
container_region: "westeurope".into(),
|
||||
prefix_in_container: None,
|
||||
endpoint: None,
|
||||
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
|
||||
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
|
||||
}),
|
||||
|
||||
@@ -176,7 +176,9 @@ pub(crate) struct BucketMetrics {
|
||||
|
||||
impl Default for BucketMetrics {
|
||||
fn default() -> Self {
|
||||
let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
|
||||
// first bucket 100 microseconds to count requests that do not need to wait at all
|
||||
// and get a permit immediately
|
||||
let buckets = [0.0001, 0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
|
||||
|
||||
let req_seconds = register_histogram_vec!(
|
||||
"remote_storage_s3_request_seconds",
|
||||
|
||||
@@ -33,6 +33,7 @@ pprof.workspace = true
|
||||
regex.workspace = true
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -7,29 +7,88 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
|
||||
use crate::id::TenantTimelineId;
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum InterpretedFormat {
|
||||
Bincode,
|
||||
Protobuf,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum Compression {
|
||||
Zstd { level: i8 },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "type", content = "args")]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum PostgresClientProtocol {
|
||||
/// Usual Postgres replication protocol
|
||||
Vanilla,
|
||||
/// Custom shard-aware protocol that replicates interpreted records.
|
||||
/// Used to send wal from safekeeper to pageserver.
|
||||
Interpreted {
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct ConnectionConfigArgs<'a> {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard_number: Option<u8>,
|
||||
pub shard_count: Option<u8>,
|
||||
pub shard_stripe_size: Option<u32>,
|
||||
|
||||
pub listen_pg_addr_str: &'a str,
|
||||
|
||||
pub auth_token: Option<&'a str>,
|
||||
pub availability_zone: Option<&'a str>,
|
||||
}
|
||||
|
||||
impl<'a> ConnectionConfigArgs<'a> {
|
||||
fn options(&'a self) -> Vec<String> {
|
||||
let mut options = vec![
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", self.ttid.timeline_id),
|
||||
format!("tenant_id={}", self.ttid.tenant_id),
|
||||
format!(
|
||||
"protocol={}",
|
||||
serde_json::to_string(&self.protocol).unwrap()
|
||||
),
|
||||
];
|
||||
|
||||
if self.shard_number.is_some() {
|
||||
assert!(self.shard_count.is_some());
|
||||
assert!(self.shard_stripe_size.is_some());
|
||||
|
||||
options.push(format!("shard_count={}", self.shard_count.unwrap()));
|
||||
options.push(format!("shard_number={}", self.shard_number.unwrap()));
|
||||
options.push(format!(
|
||||
"shard_stripe_size={}",
|
||||
self.shard_stripe_size.unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
}
|
||||
|
||||
/// Create client config for fetching WAL from safekeeper on particular timeline.
|
||||
/// listen_pg_addr_str is in form host:\[port\].
|
||||
pub fn wal_stream_connection_config(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
args: ConnectionConfigArgs,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
.extend_options(args.options())
|
||||
.set_password(args.auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
if let Some(availability_zone) = args.availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod heavier_once_cell;
|
||||
|
||||
pub mod duplex;
|
||||
pub mod gate;
|
||||
|
||||
1
libs/utils/src/sync/duplex.rs
Normal file
1
libs/utils/src/sync/duplex.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod mpsc;
|
||||
36
libs/utils/src/sync/duplex/mpsc.rs
Normal file
36
libs/utils/src/sync/duplex/mpsc.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// A bi-directional channel.
|
||||
pub struct Duplex<S, R> {
|
||||
pub tx: mpsc::Sender<S>,
|
||||
pub rx: mpsc::Receiver<R>,
|
||||
}
|
||||
|
||||
/// Creates a bi-directional channel.
|
||||
///
|
||||
/// The channel will buffer up to the provided number of messages. Once the buffer is full,
|
||||
/// attempts to send new messages will wait until a message is received from the channel.
|
||||
/// The provided buffer capacity must be at least 1.
|
||||
pub fn channel<A: Send, B: Send>(buffer: usize) -> (Duplex<A, B>, Duplex<B, A>) {
|
||||
let (tx_a, rx_a) = mpsc::channel::<A>(buffer);
|
||||
let (tx_b, rx_b) = mpsc::channel::<B>(buffer);
|
||||
|
||||
(Duplex { tx: tx_a, rx: rx_b }, Duplex { tx: tx_b, rx: rx_a })
|
||||
}
|
||||
|
||||
impl<S: Send, R: Send> Duplex<S, R> {
|
||||
/// Sends a value, waiting until there is capacity.
|
||||
///
|
||||
/// A successful send occurs when it is determined that the other end of the channel has not hung up already.
|
||||
pub async fn send(&self, x: S) -> Result<(), mpsc::error::SendError<S>> {
|
||||
self.tx.send(x).await
|
||||
}
|
||||
|
||||
/// Receives the next value for this receiver.
|
||||
///
|
||||
/// This method returns `None` if the channel has been closed and there are
|
||||
/// no remaining messages in the channel's buffer.
|
||||
pub async fn recv(&mut self) -> Option<R> {
|
||||
self.rx.recv().await
|
||||
}
|
||||
}
|
||||
@@ -218,7 +218,7 @@ impl MemoryStatus {
|
||||
fn debug_slice(slice: &[Self]) -> impl '_ + Debug {
|
||||
struct DS<'a>(&'a [MemoryStatus]);
|
||||
|
||||
impl<'a> Debug for DS<'a> {
|
||||
impl Debug for DS<'_> {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
f.debug_struct("[MemoryStatus]")
|
||||
.field(
|
||||
@@ -233,7 +233,7 @@ impl MemoryStatus {
|
||||
|
||||
struct Fields<'a, F>(&'a [MemoryStatus], F);
|
||||
|
||||
impl<'a, F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'a, F> {
|
||||
impl<F: Fn(&MemoryStatus) -> T, T: Debug> Debug for Fields<'_, F> {
|
||||
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
|
||||
f.debug_list().entries(self.0.iter().map(&self.1)).finish()
|
||||
}
|
||||
|
||||
@@ -8,11 +8,19 @@ license.workspace = true
|
||||
testing = ["pageserver_api/testing"]
|
||||
|
||||
[dependencies]
|
||||
async-compression.workspace = true
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
prost.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["io-util"] }
|
||||
tonic.workspace = true
|
||||
tracing.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build.workspace = true
|
||||
|
||||
11
libs/wal_decoder/build.rs
Normal file
11
libs/wal_decoder/build.rs
Normal file
@@ -0,0 +1,11 @@
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Generate rust code from .proto protobuf.
|
||||
//
|
||||
// Note: we previously tried to use deterministic location at proto/ for
|
||||
// easy location, but apparently interference with cachepot sometimes fails
|
||||
// the build then. Anyway, per cargo docs build script shouldn't output to
|
||||
// anywhere but $OUT_DIR.
|
||||
tonic_build::compile_protos("proto/interpreted_wal.proto")
|
||||
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
|
||||
Ok(())
|
||||
}
|
||||
43
libs/wal_decoder/proto/interpreted_wal.proto
Normal file
43
libs/wal_decoder/proto/interpreted_wal.proto
Normal file
@@ -0,0 +1,43 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package interpreted_wal;
|
||||
|
||||
message InterpretedWalRecords {
|
||||
repeated InterpretedWalRecord records = 1;
|
||||
optional uint64 next_record_lsn = 2;
|
||||
}
|
||||
|
||||
message InterpretedWalRecord {
|
||||
optional bytes metadata_record = 1;
|
||||
SerializedValueBatch batch = 2;
|
||||
uint64 next_record_lsn = 3;
|
||||
bool flush_uncommitted = 4;
|
||||
uint32 xid = 5;
|
||||
}
|
||||
|
||||
message SerializedValueBatch {
|
||||
bytes raw = 1;
|
||||
repeated ValueMeta metadata = 2;
|
||||
uint64 max_lsn = 3;
|
||||
uint64 len = 4;
|
||||
}
|
||||
|
||||
enum ValueMetaType {
|
||||
Serialized = 0;
|
||||
Observed = 1;
|
||||
}
|
||||
|
||||
message ValueMeta {
|
||||
ValueMetaType type = 1;
|
||||
CompactKey key = 2;
|
||||
uint64 lsn = 3;
|
||||
optional uint64 batch_offset = 4;
|
||||
optional uint64 len = 5;
|
||||
optional bool will_init = 6;
|
||||
}
|
||||
|
||||
message CompactKey {
|
||||
int64 high = 1;
|
||||
int64 low = 2;
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod decoder;
|
||||
pub mod models;
|
||||
pub mod serialized_batch;
|
||||
pub mod wire_format;
|
||||
|
||||
@@ -37,12 +37,32 @@ use utils::lsn::Lsn;
|
||||
|
||||
use crate::serialized_batch::SerializedValueBatch;
|
||||
|
||||
// Code generated by protobuf.
|
||||
pub mod proto {
|
||||
// Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
|
||||
// we don't use these types for anything but broker data transmission,
|
||||
// so it's ok to ignore this one.
|
||||
#![allow(clippy::derive_partial_eq_without_eq)]
|
||||
// The generated ValueMeta has a `len` method generate for its `len` field.
|
||||
#![allow(clippy::len_without_is_empty)]
|
||||
tonic::include_proto!("interpreted_wal");
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum FlushUncommittedRecords {
|
||||
Yes,
|
||||
No,
|
||||
}
|
||||
|
||||
/// A batch of interpreted WAL records
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct InterpretedWalRecords {
|
||||
pub records: Vec<InterpretedWalRecord>,
|
||||
// Start LSN of the next record after the batch.
|
||||
// Note that said record may not belong to the current shard.
|
||||
pub next_record_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
/// An interpreted Postgres WAL record, ready to be handled by the pageserver
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct InterpretedWalRecord {
|
||||
@@ -65,6 +85,18 @@ pub struct InterpretedWalRecord {
|
||||
pub xid: TransactionId,
|
||||
}
|
||||
|
||||
impl InterpretedWalRecord {
|
||||
/// Checks if the WAL record is empty
|
||||
///
|
||||
/// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
|
||||
/// pageserver.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.batch.is_empty()
|
||||
&& self.metadata_record.is_none()
|
||||
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
|
||||
}
|
||||
}
|
||||
|
||||
/// The interpreted part of the Postgres WAL record which requires metadata
|
||||
/// writes to the underlying storage engine.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -496,11 +496,16 @@ impl SerializedValueBatch {
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the batch is empty
|
||||
///
|
||||
/// A batch is empty when it contains no serialized values.
|
||||
/// Note that it may still contain observed values.
|
||||
/// Checks if the batch contains any serialized or observed values
|
||||
pub fn is_empty(&self) -> bool {
|
||||
!self.has_data() && self.metadata.is_empty()
|
||||
}
|
||||
|
||||
/// Checks if the batch contains data
|
||||
///
|
||||
/// Note that if this returns false, it may still contain observed values or
|
||||
/// a metadata record.
|
||||
pub fn has_data(&self) -> bool {
|
||||
let empty = self.raw.is_empty();
|
||||
|
||||
if cfg!(debug_assertions) && empty {
|
||||
@@ -510,7 +515,7 @@ impl SerializedValueBatch {
|
||||
.all(|meta| matches!(meta, ValueMeta::Observed(_))));
|
||||
}
|
||||
|
||||
empty
|
||||
!empty
|
||||
}
|
||||
|
||||
/// Returns the number of values serialized in the batch
|
||||
|
||||
356
libs/wal_decoder/src/wire_format.rs
Normal file
356
libs/wal_decoder/src/wire_format.rs
Normal file
@@ -0,0 +1,356 @@
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use pageserver_api::key::CompactKey;
|
||||
use prost::{DecodeError, EncodeError, Message};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::postgres_client::{Compression, InterpretedFormat};
|
||||
|
||||
use crate::models::{
|
||||
FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
|
||||
};
|
||||
|
||||
use crate::serialized_batch::{
|
||||
ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
|
||||
};
|
||||
|
||||
use crate::models::proto;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ToWireFormatError {
|
||||
#[error("{0}")]
|
||||
Bincode(#[from] SerializeError),
|
||||
#[error("{0}")]
|
||||
Protobuf(#[from] ProtobufSerializeError),
|
||||
#[error("{0}")]
|
||||
Compression(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProtobufSerializeError {
|
||||
#[error("{0}")]
|
||||
MetadataRecord(#[from] SerializeError),
|
||||
#[error("{0}")]
|
||||
Encode(#[from] EncodeError),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FromWireFormatError {
|
||||
#[error("{0}")]
|
||||
Bincode(#[from] DeserializeError),
|
||||
#[error("{0}")]
|
||||
Protobuf(#[from] ProtobufDeserializeError),
|
||||
#[error("{0}")]
|
||||
Decompress(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProtobufDeserializeError {
|
||||
#[error("{0}")]
|
||||
Transcode(#[from] TranscodeError),
|
||||
#[error("{0}")]
|
||||
Decode(#[from] DecodeError),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TranscodeError {
|
||||
#[error("{0}")]
|
||||
BadInput(String),
|
||||
#[error("{0}")]
|
||||
MetadataRecord(#[from] DeserializeError),
|
||||
}
|
||||
|
||||
pub trait ToWireFormat {
|
||||
fn to_wire(
|
||||
self,
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
|
||||
}
|
||||
|
||||
pub trait FromWireFormat {
|
||||
type T;
|
||||
fn from_wire(
|
||||
buf: &Bytes,
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
|
||||
}
|
||||
|
||||
impl ToWireFormat for InterpretedWalRecords {
|
||||
async fn to_wire(
|
||||
self,
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
) -> Result<Bytes, ToWireFormatError> {
|
||||
use async_compression::tokio::write::ZstdEncoder;
|
||||
use async_compression::Level;
|
||||
|
||||
let encode_res: Result<Bytes, ToWireFormatError> = match format {
|
||||
InterpretedFormat::Bincode => {
|
||||
let buf = BytesMut::new();
|
||||
let mut buf = buf.writer();
|
||||
self.ser_into(&mut buf)?;
|
||||
Ok(buf.into_inner().freeze())
|
||||
}
|
||||
InterpretedFormat::Protobuf => {
|
||||
let proto: proto::InterpretedWalRecords = self.try_into()?;
|
||||
let mut buf = BytesMut::new();
|
||||
proto
|
||||
.encode(&mut buf)
|
||||
.map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
|
||||
|
||||
Ok(buf.freeze())
|
||||
}
|
||||
};
|
||||
|
||||
let buf = encode_res?;
|
||||
let compressed_buf = match compression {
|
||||
Some(Compression::Zstd { level }) => {
|
||||
let mut encoder = ZstdEncoder::with_quality(
|
||||
Vec::with_capacity(buf.len() / 4),
|
||||
Level::Precise(level as i32),
|
||||
);
|
||||
encoder.write_all(&buf).await?;
|
||||
encoder.shutdown().await?;
|
||||
Bytes::from(encoder.into_inner())
|
||||
}
|
||||
None => buf,
|
||||
};
|
||||
|
||||
Ok(compressed_buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromWireFormat for InterpretedWalRecords {
|
||||
type T = Self;
|
||||
|
||||
async fn from_wire(
|
||||
buf: &Bytes,
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
) -> Result<Self, FromWireFormatError> {
|
||||
let decompressed_buf = match compression {
|
||||
Some(Compression::Zstd { .. }) => {
|
||||
use async_compression::tokio::write::ZstdDecoder;
|
||||
let mut decoded_buf = Vec::with_capacity(buf.len());
|
||||
let mut decoder = ZstdDecoder::new(&mut decoded_buf);
|
||||
decoder.write_all(buf).await?;
|
||||
decoder.flush().await?;
|
||||
Bytes::from(decoded_buf)
|
||||
}
|
||||
None => buf.clone(),
|
||||
};
|
||||
|
||||
match format {
|
||||
InterpretedFormat::Bincode => {
|
||||
InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
|
||||
}
|
||||
InterpretedFormat::Protobuf => {
|
||||
let proto = proto::InterpretedWalRecords::decode(decompressed_buf)
|
||||
.map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
|
||||
InterpretedWalRecords::try_from(proto)
|
||||
.map_err(|e| FromWireFormatError::Protobuf(e.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
|
||||
type Error = SerializeError;
|
||||
|
||||
fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
|
||||
let records = value
|
||||
.records
|
||||
.into_iter()
|
||||
.map(proto::InterpretedWalRecord::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
Ok(proto::InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: value.next_record_lsn.map(|l| l.0),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<InterpretedWalRecord> for proto::InterpretedWalRecord {
|
||||
type Error = SerializeError;
|
||||
|
||||
fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
|
||||
let metadata_record = value
|
||||
.metadata_record
|
||||
.map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
|
||||
let mut buf = Vec::new();
|
||||
meta_rec.ser_into(&mut buf)?;
|
||||
Ok(buf)
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
Ok(proto::InterpretedWalRecord {
|
||||
metadata_record,
|
||||
batch: Some(proto::SerializedValueBatch::from(value.batch)),
|
||||
next_record_lsn: value.next_record_lsn.0,
|
||||
flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
|
||||
xid: value.xid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SerializedValueBatch> for proto::SerializedValueBatch {
|
||||
fn from(value: SerializedValueBatch) -> Self {
|
||||
proto::SerializedValueBatch {
|
||||
raw: value.raw,
|
||||
metadata: value
|
||||
.metadata
|
||||
.into_iter()
|
||||
.map(proto::ValueMeta::from)
|
||||
.collect(),
|
||||
max_lsn: value.max_lsn.0,
|
||||
len: value.len as u64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ValueMeta> for proto::ValueMeta {
|
||||
fn from(value: ValueMeta) -> Self {
|
||||
match value {
|
||||
ValueMeta::Observed(obs) => proto::ValueMeta {
|
||||
r#type: proto::ValueMetaType::Observed.into(),
|
||||
key: Some(proto::CompactKey::from(obs.key)),
|
||||
lsn: obs.lsn.0,
|
||||
batch_offset: None,
|
||||
len: None,
|
||||
will_init: None,
|
||||
},
|
||||
ValueMeta::Serialized(ser) => proto::ValueMeta {
|
||||
r#type: proto::ValueMetaType::Serialized.into(),
|
||||
key: Some(proto::CompactKey::from(ser.key)),
|
||||
lsn: ser.lsn.0,
|
||||
batch_offset: Some(ser.batch_offset),
|
||||
len: Some(ser.len as u64),
|
||||
will_init: Some(ser.will_init),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CompactKey> for proto::CompactKey {
|
||||
fn from(value: CompactKey) -> Self {
|
||||
proto::CompactKey {
|
||||
high: (value.raw() >> 64) as i64,
|
||||
low: value.raw() as i64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
|
||||
type Error = TranscodeError;
|
||||
|
||||
fn try_from(value: proto::InterpretedWalRecords) -> Result<Self, Self::Error> {
|
||||
let records = value
|
||||
.records
|
||||
.into_iter()
|
||||
.map(InterpretedWalRecord::try_from)
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
Ok(InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: value.next_record_lsn.map(Lsn::from),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::InterpretedWalRecord> for InterpretedWalRecord {
|
||||
type Error = TranscodeError;
|
||||
|
||||
fn try_from(value: proto::InterpretedWalRecord) -> Result<Self, Self::Error> {
|
||||
let metadata_record = value
|
||||
.metadata_record
|
||||
.map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
|
||||
.transpose()?;
|
||||
|
||||
let batch = {
|
||||
let batch = value.batch.ok_or_else(|| {
|
||||
TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
|
||||
})?;
|
||||
|
||||
SerializedValueBatch::try_from(batch)?
|
||||
};
|
||||
|
||||
Ok(InterpretedWalRecord {
|
||||
metadata_record,
|
||||
batch,
|
||||
next_record_lsn: Lsn(value.next_record_lsn),
|
||||
flush_uncommitted: if value.flush_uncommitted {
|
||||
FlushUncommittedRecords::Yes
|
||||
} else {
|
||||
FlushUncommittedRecords::No
|
||||
},
|
||||
xid: value.xid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::SerializedValueBatch> for SerializedValueBatch {
|
||||
type Error = TranscodeError;
|
||||
|
||||
fn try_from(value: proto::SerializedValueBatch) -> Result<Self, Self::Error> {
|
||||
let metadata = value
|
||||
.metadata
|
||||
.into_iter()
|
||||
.map(ValueMeta::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
Ok(SerializedValueBatch {
|
||||
raw: value.raw,
|
||||
metadata,
|
||||
max_lsn: Lsn(value.max_lsn),
|
||||
len: value.len as usize,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::ValueMeta> for ValueMeta {
|
||||
type Error = TranscodeError;
|
||||
|
||||
fn try_from(value: proto::ValueMeta) -> Result<Self, Self::Error> {
|
||||
match proto::ValueMetaType::try_from(value.r#type) {
|
||||
Ok(proto::ValueMetaType::Serialized) => {
|
||||
Ok(ValueMeta::Serialized(SerializedValueMeta {
|
||||
key: value
|
||||
.key
|
||||
.ok_or_else(|| {
|
||||
TranscodeError::BadInput("ValueMeta::key missing".to_string())
|
||||
})?
|
||||
.into(),
|
||||
lsn: Lsn(value.lsn),
|
||||
batch_offset: value.batch_offset.ok_or_else(|| {
|
||||
TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
|
||||
})?,
|
||||
len: value.len.ok_or_else(|| {
|
||||
TranscodeError::BadInput("ValueMeta::len missing".to_string())
|
||||
})? as usize,
|
||||
will_init: value.will_init.ok_or_else(|| {
|
||||
TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
|
||||
})?,
|
||||
}))
|
||||
}
|
||||
Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
|
||||
key: value
|
||||
.key
|
||||
.ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
|
||||
.into(),
|
||||
lsn: Lsn(value.lsn),
|
||||
})),
|
||||
Err(_) => Err(TranscodeError::BadInput(format!(
|
||||
"Unexpected ValueMeta::type {}",
|
||||
value.r#type
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<proto::CompactKey> for CompactKey {
|
||||
fn from(value: proto::CompactKey) -> Self {
|
||||
(((value.high as i128) << 64) | (value.low as i128)).into()
|
||||
}
|
||||
}
|
||||
@@ -62,10 +62,8 @@ async fn ingest(
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let entered = gate.enter().unwrap();
|
||||
|
||||
let layer =
|
||||
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
|
||||
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &gate, &ctx).await?;
|
||||
|
||||
let data = Value::Image(Bytes::from(vec![0u8; put_size]));
|
||||
let data_ser_size = data.serialized_size().unwrap() as usize;
|
||||
|
||||
@@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> {
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
|
||||
|
||||
// The tenants directory contains all the pageserver local disk state.
|
||||
// Create if not exists and make sure all the contents are durable before proceeding.
|
||||
|
||||
@@ -14,6 +14,7 @@ use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::logging::SecretString;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use reqwest::Url;
|
||||
@@ -190,6 +191,8 @@ pub struct PageServerConf {
|
||||
/// Maximum amount of time for which a get page request request
|
||||
/// might be held up for request merging.
|
||||
pub server_side_batch_timeout: Option<Duration>,
|
||||
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -350,6 +353,7 @@ impl PageServerConf {
|
||||
server_side_batch_timeout,
|
||||
tenant_config,
|
||||
no_sync,
|
||||
wal_receiver_protocol,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -393,6 +397,7 @@ impl PageServerConf {
|
||||
import_pgdata_upcall_api,
|
||||
import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from),
|
||||
import_pgdata_aws_endpoint_url,
|
||||
wal_receiver_protocol,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -3,7 +3,7 @@ use metrics::{
|
||||
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
|
||||
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
|
||||
register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
|
||||
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
|
||||
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
|
||||
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -457,6 +457,15 @@ pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
|
||||
register_gauge_vec!(
|
||||
"pageserver_flush_wait_upload_seconds",
|
||||
"Time spent waiting for preceding uploads during layer flush",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_last_record_lsn",
|
||||
@@ -653,6 +662,35 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_relsize_cache_entries",
|
||||
"Number of entries in the relation size cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses",
|
||||
"Relation size cache misses",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES_OLD: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses_old",
|
||||
"Relation size cache misses where the lookup LSN is older than the last relation update"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) mod initial_logical_size {
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -2336,6 +2374,7 @@ pub(crate) struct TimelineMetrics {
|
||||
shard_id: String,
|
||||
timeline_id: String,
|
||||
pub flush_time_histo: StorageTimeMetrics,
|
||||
pub flush_wait_upload_time_gauge: Gauge,
|
||||
pub compact_time_histo: StorageTimeMetrics,
|
||||
pub create_images_time_histo: StorageTimeMetrics,
|
||||
pub logical_size_histo: StorageTimeMetrics,
|
||||
@@ -2379,6 +2418,9 @@ impl TimelineMetrics {
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
);
|
||||
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
let compact_time_histo = StorageTimeMetrics::new(
|
||||
StorageTimeOperation::Compact,
|
||||
&tenant_id,
|
||||
@@ -2516,6 +2558,7 @@ impl TimelineMetrics {
|
||||
shard_id,
|
||||
timeline_id,
|
||||
flush_time_histo,
|
||||
flush_wait_upload_time_gauge,
|
||||
compact_time_histo,
|
||||
create_images_time_histo,
|
||||
logical_size_histo,
|
||||
@@ -2563,6 +2606,14 @@ impl TimelineMetrics {
|
||||
self.resident_physical_size_gauge.get()
|
||||
}
|
||||
|
||||
pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
|
||||
self.flush_wait_upload_time_gauge.add(duration);
|
||||
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
|
||||
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
|
||||
.unwrap()
|
||||
.add(duration);
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown(&self) {
|
||||
let was_shutdown = self
|
||||
.shutdown
|
||||
@@ -2579,6 +2630,7 @@ impl TimelineMetrics {
|
||||
let timeline_id = &self.timeline_id;
|
||||
let shard_id = &self.shard_id;
|
||||
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
{
|
||||
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
|
||||
|
||||
@@ -10,6 +10,9 @@ use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
};
|
||||
use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
@@ -1129,9 +1132,12 @@ impl Timeline {
|
||||
let rel_size_cache = self.rel_size_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
RELSIZE_CACHE_HITS.inc();
|
||||
return Some(*nblocks);
|
||||
}
|
||||
RELSIZE_CACHE_MISSES_OLD.inc();
|
||||
}
|
||||
RELSIZE_CACHE_MISSES.inc();
|
||||
None
|
||||
}
|
||||
|
||||
@@ -1156,6 +1162,7 @@ impl Timeline {
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1163,13 +1170,17 @@ impl Timeline {
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
rel_size_cache.map.insert(tag, (lsn, nblocks));
|
||||
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove cached relation size
|
||||
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
rel_size_cache.map.remove(tag);
|
||||
if rel_size_cache.map.remove(tag).is_some() {
|
||||
RELSIZE_CACHE_ENTRIES.dec();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1229,10 +1240,9 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
pub(crate) fn has_dirty_data(&self) -> bool {
|
||||
!self
|
||||
.pending_data_batch
|
||||
self.pending_data_batch
|
||||
.as_ref()
|
||||
.map_or(true, |b| b.is_empty())
|
||||
.map_or(false, |b| b.has_data())
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
@@ -1408,7 +1418,7 @@ impl<'a> DatadirModification<'a> {
|
||||
Some(pending_batch) => {
|
||||
pending_batch.extend(batch);
|
||||
}
|
||||
None if !batch.is_empty() => {
|
||||
None if batch.has_data() => {
|
||||
self.pending_data_batch = Some(batch);
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -5344,6 +5344,7 @@ pub(crate) mod harness {
|
||||
lsn_lease_length: Some(tenant_conf.lsn_lease_length),
|
||||
lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
|
||||
timeline_offloading: Some(tenant_conf.timeline_offloading),
|
||||
wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use serde_json::Value;
|
||||
use std::num::NonZeroU64;
|
||||
use std::time::Duration;
|
||||
use utils::generation::Generation;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(crate) enum AttachmentMode {
|
||||
@@ -353,6 +354,9 @@ pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub timeline_offloading: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -418,6 +422,9 @@ impl TenantConfOpt {
|
||||
timeline_offloading: self
|
||||
.lazy_slru_download
|
||||
.unwrap_or(global_conf.timeline_offloading),
|
||||
wal_receiver_protocol_override: self
|
||||
.wal_receiver_protocol_override
|
||||
.or(global_conf.wal_receiver_protocol_override),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -472,6 +479,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
lsn_lease_length: value.lsn_lease_length.map(humantime),
|
||||
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
|
||||
timeline_offloading: value.timeline_offloading,
|
||||
wal_receiver_protocol_override: value.wal_receiver_protocol_override,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,10 +8,8 @@ use crate::page_cache;
|
||||
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
|
||||
use crate::virtual_file::owned_buffers_io::util::size_tracking_writer;
|
||||
use crate::virtual_file::owned_buffers_io::write::Buffer;
|
||||
use crate::virtual_file::{self, owned_buffers_io, IoBufferMut, VirtualFile};
|
||||
use bytes::BytesMut;
|
||||
use camino::Utf8PathBuf;
|
||||
use num_traits::Num;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -20,6 +18,7 @@ use tracing::error;
|
||||
|
||||
use std::io;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
pub struct EphemeralFile {
|
||||
@@ -27,10 +26,7 @@ pub struct EphemeralFile {
|
||||
_timeline_id: TimelineId,
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
bytes_written: u64,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<
|
||||
BytesMut,
|
||||
size_tracking_writer::Writer<VirtualFile>,
|
||||
>,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
|
||||
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
@@ -42,9 +38,9 @@ impl EphemeralFile {
|
||||
conf: &PageServerConf,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<EphemeralFile, io::Error> {
|
||||
) -> anyhow::Result<EphemeralFile> {
|
||||
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
@@ -55,15 +51,17 @@ impl EphemeralFile {
|
||||
"ephemeral-{filename_disambiguator}"
|
||||
)));
|
||||
|
||||
let file = VirtualFile::open_with_options(
|
||||
&filename,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let file = Arc::new(
|
||||
VirtualFile::open_with_options_v2(
|
||||
&filename,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
|
||||
|
||||
@@ -73,10 +71,12 @@ impl EphemeralFile {
|
||||
page_cache_file_id,
|
||||
bytes_written: 0,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
|
||||
size_tracking_writer::Writer::new(file),
|
||||
BytesMut::with_capacity(TAIL_SZ),
|
||||
file,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
ctx,
|
||||
),
|
||||
_gate_guard: gate_guard,
|
||||
_gate_guard: gate.enter()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -85,7 +85,7 @@ impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// unlink the file
|
||||
// we are clear to do this, because we have entered a gate
|
||||
let path = self.buffered_writer.as_inner().as_inner().path();
|
||||
let path = self.buffered_writer.as_inner().path();
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
@@ -132,6 +132,18 @@ impl EphemeralFile {
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<u64> {
|
||||
let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
|
||||
if let Some(control) = control {
|
||||
control.release().await;
|
||||
}
|
||||
Ok(pos)
|
||||
}
|
||||
|
||||
async fn write_raw_controlled(
|
||||
&mut self,
|
||||
srcbuf: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(u64, Option<owned_buffers_io::write::FlushControl>)> {
|
||||
let pos = self.bytes_written;
|
||||
|
||||
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
|
||||
@@ -145,9 +157,9 @@ impl EphemeralFile {
|
||||
})?;
|
||||
|
||||
// Write the payload
|
||||
let nwritten = self
|
||||
let (nwritten, control) = self
|
||||
.buffered_writer
|
||||
.write_buffered_borrowed(srcbuf, ctx)
|
||||
.write_buffered_borrowed_controlled(srcbuf, ctx)
|
||||
.await?;
|
||||
assert_eq!(
|
||||
nwritten,
|
||||
@@ -157,7 +169,7 @@ impl EphemeralFile {
|
||||
|
||||
self.bytes_written = new_bytes_written;
|
||||
|
||||
Ok(pos)
|
||||
Ok((pos, control))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,11 +180,12 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst: tokio_epoll_uring::Slice<B>,
|
||||
ctx: &'a RequestContext,
|
||||
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
|
||||
let file_size_tracking_writer = self.buffered_writer.as_inner();
|
||||
let flushed_offset = file_size_tracking_writer.bytes_written();
|
||||
let submitted_offset = self.buffered_writer.bytes_submitted();
|
||||
|
||||
let buffer = self.buffered_writer.inspect_buffer();
|
||||
let buffered = &buffer[0..buffer.pending()];
|
||||
let mutable = self.buffered_writer.inspect_mutable();
|
||||
let mutable = &mutable[0..mutable.pending()];
|
||||
|
||||
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
|
||||
|
||||
let dst_cap = dst.bytes_total().into_u64();
|
||||
let end = {
|
||||
@@ -197,11 +210,42 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
}
|
||||
}
|
||||
}
|
||||
let written_range = Range(start, std::cmp::min(end, flushed_offset));
|
||||
let buffered_range = Range(std::cmp::max(start, flushed_offset), end);
|
||||
|
||||
let (written_range, maybe_flushed_range) = {
|
||||
if maybe_flushed.is_some() {
|
||||
// [ written ][ maybe_flushed ][ mutable ]
|
||||
// <- TAIL_SZ -><- TAIL_SZ ->
|
||||
// ^
|
||||
// `submitted_offset`
|
||||
// <++++++ on disk +++++++????????????????>
|
||||
(
|
||||
Range(
|
||||
start,
|
||||
std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
|
||||
),
|
||||
Range(
|
||||
std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
|
||||
std::cmp::min(end, submitted_offset),
|
||||
),
|
||||
)
|
||||
} else {
|
||||
// [ written ][ mutable ]
|
||||
// <- TAIL_SZ ->
|
||||
// ^
|
||||
// `submitted_offset`
|
||||
// <++++++ on disk +++++++++++++++++++++++>
|
||||
(
|
||||
Range(start, std::cmp::min(end, submitted_offset)),
|
||||
// zero len
|
||||
Range(submitted_offset, u64::MIN),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
|
||||
|
||||
let dst = if written_range.len() > 0 {
|
||||
let file: &VirtualFile = file_size_tracking_writer.as_inner();
|
||||
let file: &VirtualFile = self.buffered_writer.as_inner();
|
||||
let bounds = dst.bounds();
|
||||
let slice = file
|
||||
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
|
||||
@@ -211,19 +255,21 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst
|
||||
};
|
||||
|
||||
let dst = if buffered_range.len() > 0 {
|
||||
let offset_in_buffer = buffered_range
|
||||
let dst = if maybe_flushed_range.len() > 0 {
|
||||
let offset_in_buffer = maybe_flushed_range
|
||||
.0
|
||||
.checked_sub(flushed_offset)
|
||||
.checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
let to_copy =
|
||||
&buffered[offset_in_buffer..(offset_in_buffer + buffered_range.len().into_usize())];
|
||||
// Checked previously the buffer is Some.
|
||||
let maybe_flushed = maybe_flushed.unwrap();
|
||||
let to_copy = &maybe_flushed
|
||||
[offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
|
||||
let bounds = dst.bounds();
|
||||
let mut view = dst.slice({
|
||||
let start = written_range.len().into_usize();
|
||||
let end = start
|
||||
.checked_add(buffered_range.len().into_usize())
|
||||
.checked_add(maybe_flushed_range.len().into_usize())
|
||||
.unwrap();
|
||||
start..end
|
||||
});
|
||||
@@ -234,6 +280,28 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
dst
|
||||
};
|
||||
|
||||
let dst = if mutable_range.len() > 0 {
|
||||
let offset_in_buffer = mutable_range
|
||||
.0
|
||||
.checked_sub(submitted_offset)
|
||||
.unwrap()
|
||||
.into_usize();
|
||||
let to_copy =
|
||||
&mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
|
||||
let bounds = dst.bounds();
|
||||
let mut view = dst.slice({
|
||||
let start =
|
||||
written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
|
||||
let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
|
||||
start..end
|
||||
});
|
||||
view.as_mut_rust_slice_full_zeroed()
|
||||
.copy_from_slice(to_copy);
|
||||
Slice::from_buf_bounds(Slice::into_inner(view), bounds)
|
||||
} else {
|
||||
dst
|
||||
};
|
||||
|
||||
// TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
|
||||
|
||||
Ok((dst, (end - start).into_usize()))
|
||||
@@ -295,7 +363,7 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
|
||||
let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
|
||||
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -326,14 +394,15 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
|
||||
let mut file =
|
||||
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let cap = file.buffered_writer.inspect_buffer().capacity();
|
||||
let mutable = file.buffered_writer.inspect_mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
|
||||
let write_nbytes = cap + cap / 2;
|
||||
let write_nbytes = cap * 2 + cap / 2;
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
@@ -341,30 +410,39 @@ mod tests {
|
||||
.collect();
|
||||
|
||||
let mut value_offsets = Vec::new();
|
||||
for i in 0..write_nbytes {
|
||||
let off = file.write_raw(&content[i..i + 1], &ctx).await.unwrap();
|
||||
for range in (0..write_nbytes)
|
||||
.step_by(align)
|
||||
.map(|start| start..(start + align).min(write_nbytes))
|
||||
{
|
||||
let off = file.write_raw(&content[range], &ctx).await.unwrap();
|
||||
value_offsets.push(off);
|
||||
}
|
||||
|
||||
assert!(file.len() as usize == write_nbytes);
|
||||
for i in 0..write_nbytes {
|
||||
assert_eq!(value_offsets[i], i.into_u64());
|
||||
let buf = IoBufferMut::with_capacity(1);
|
||||
assert_eq!(file.len() as usize, write_nbytes);
|
||||
for (i, range) in (0..write_nbytes)
|
||||
.step_by(align)
|
||||
.map(|start| start..(start + align).min(write_nbytes))
|
||||
.enumerate()
|
||||
{
|
||||
assert_eq!(value_offsets[i], range.start.into_u64());
|
||||
let buf = IoBufferMut::with_capacity(range.len());
|
||||
let (buf_slice, nread) = file
|
||||
.read_exact_at_eof_ok(i.into_u64(), buf.slice_full(), &ctx)
|
||||
.read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let buf = buf_slice.into_inner();
|
||||
assert_eq!(nread, 1);
|
||||
assert_eq!(&buf, &content[i..i + 1]);
|
||||
assert_eq!(nread, range.len());
|
||||
assert_eq!(&buf, &content[range]);
|
||||
}
|
||||
|
||||
let file_contents =
|
||||
std::fs::read(file.buffered_writer.as_inner().as_inner().path()).unwrap();
|
||||
assert_eq!(file_contents, &content[0..cap]);
|
||||
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
|
||||
assert!(file_contents == content[0..cap * 2]);
|
||||
|
||||
let buffer_contents = file.buffered_writer.inspect_buffer();
|
||||
assert_eq!(buffer_contents, &content[cap..write_nbytes]);
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
|
||||
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
|
||||
|
||||
let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
|
||||
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -373,16 +451,16 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
|
||||
let mut file =
|
||||
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let cap = file.buffered_writer.inspect_buffer().capacity();
|
||||
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
|
||||
let cap = file.buffered_writer.inspect_mutable().capacity();
|
||||
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
.take(cap + cap / 2)
|
||||
.take(cap * 2 + cap / 2)
|
||||
.collect();
|
||||
|
||||
file.write_raw(&content, &ctx).await.unwrap();
|
||||
@@ -390,23 +468,21 @@ mod tests {
|
||||
// assert the state is as this test expects it to be
|
||||
assert_eq!(
|
||||
&file.load_to_io_buf(&ctx).await.unwrap(),
|
||||
&content[0..cap + cap / 2]
|
||||
&content[0..cap * 2 + cap / 2]
|
||||
);
|
||||
let md = file
|
||||
.buffered_writer
|
||||
.as_inner()
|
||||
.as_inner()
|
||||
.path()
|
||||
.metadata()
|
||||
.unwrap();
|
||||
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
|
||||
assert_eq!(
|
||||
md.len(),
|
||||
cap.into_u64(),
|
||||
"buffered writer does one write if we write 1.5x buffer capacity"
|
||||
2 * cap.into_u64(),
|
||||
"buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_buffer()[0..cap / 2],
|
||||
&content[cap..cap + cap / 2]
|
||||
&file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
|
||||
&content[cap..cap * 2]
|
||||
);
|
||||
assert_eq!(
|
||||
&file.buffered_writer.inspect_mutable()[0..cap / 2],
|
||||
&content[cap * 2..cap * 2 + cap / 2]
|
||||
);
|
||||
}
|
||||
|
||||
@@ -422,19 +498,19 @@ mod tests {
|
||||
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
|
||||
let mut file =
|
||||
EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let cap = file.buffered_writer.inspect_buffer().capacity();
|
||||
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mutable = file.buffered_writer.inspect_mutable();
|
||||
let cap = mutable.capacity();
|
||||
let align = mutable.align();
|
||||
let content: Vec<u8> = rand::thread_rng()
|
||||
.sample_iter(rand::distributions::Standard)
|
||||
.take(cap + cap / 2)
|
||||
.take(cap * 2 + cap / 2)
|
||||
.collect();
|
||||
|
||||
file.write_raw(&content, &ctx).await.unwrap();
|
||||
let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();
|
||||
|
||||
let test_read = |start: usize, len: usize| {
|
||||
let file = &file;
|
||||
@@ -454,16 +530,38 @@ mod tests {
|
||||
}
|
||||
};
|
||||
|
||||
let test_read_all_offset_combinations = || {
|
||||
async move {
|
||||
test_read(align, align).await;
|
||||
// border onto edge of file
|
||||
test_read(cap - align, align).await;
|
||||
// read across file and buffer
|
||||
test_read(cap - align, 2 * align).await;
|
||||
// stay from start of maybe flushed buffer
|
||||
test_read(cap, align).await;
|
||||
// completely within maybe flushed buffer
|
||||
test_read(cap + align, align).await;
|
||||
// border onto edge of maybe flushed buffer.
|
||||
test_read(cap * 2 - align, align).await;
|
||||
// read across maybe flushed and mutable buffer
|
||||
test_read(cap * 2 - align, 2 * align).await;
|
||||
// read across three segments
|
||||
test_read(cap - align, cap + 2 * align).await;
|
||||
// completely within mutable buffer
|
||||
test_read(cap * 2 + align, align).await;
|
||||
}
|
||||
};
|
||||
|
||||
// completely within the file range
|
||||
assert!(20 < cap, "test assumption");
|
||||
test_read(10, 10).await;
|
||||
// border onto edge of file
|
||||
test_read(cap - 10, 10).await;
|
||||
// read across file and buffer
|
||||
test_read(cap - 10, 20).await;
|
||||
// stay from start of buffer
|
||||
test_read(cap, 10).await;
|
||||
// completely within buffer
|
||||
test_read(cap + 10, 10).await;
|
||||
assert!(align < cap, "test assumption");
|
||||
assert!(cap % align == 0);
|
||||
|
||||
// test reads at different flush stages.
|
||||
let not_started = control.unwrap().into_not_started();
|
||||
test_read_all_offset_combinations().await;
|
||||
let in_progress = not_started.ready_to_flush();
|
||||
test_read_all_offset_combinations().await;
|
||||
in_progress.wait_until_flush_is_done().await;
|
||||
test_read_all_offset_combinations().await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -681,6 +681,7 @@ impl RemoteTimelineClient {
|
||||
layer_file_name: &LayerName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
local_path: &Utf8Path,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, DownloadError> {
|
||||
@@ -700,6 +701,7 @@ impl RemoteTimelineClient {
|
||||
layer_file_name,
|
||||
layer_metadata,
|
||||
local_path,
|
||||
gate,
|
||||
cancel,
|
||||
ctx,
|
||||
)
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
@@ -26,9 +27,7 @@ use crate::span::{
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::tenant::Generation;
|
||||
#[cfg_attr(target_os = "macos", allow(unused_imports))]
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
|
||||
use crate::virtual_file::{on_fatal_io_error, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -58,6 +57,7 @@ pub async fn download_layer_file<'a>(
|
||||
layer_file_name: &'a LayerName,
|
||||
layer_metadata: &'a LayerFileMetadata,
|
||||
local_path: &Utf8Path,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, DownloadError> {
|
||||
@@ -86,7 +86,9 @@ pub async fn download_layer_file<'a>(
|
||||
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
|
||||
|
||||
let bytes_amount = download_retry(
|
||||
|| async { download_object(storage, &remote_path, &temp_file_path, cancel, ctx).await },
|
||||
|| async {
|
||||
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
|
||||
},
|
||||
&format!("download {remote_path:?}"),
|
||||
cancel,
|
||||
)
|
||||
@@ -146,6 +148,7 @@ async fn download_object<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
src_path: &RemotePath,
|
||||
dst_path: &Utf8PathBuf,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: &CancellationToken,
|
||||
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
|
||||
) -> Result<u64, DownloadError> {
|
||||
@@ -203,13 +206,16 @@ async fn download_object<'a>(
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
|
||||
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
|
||||
use bytes::BytesMut;
|
||||
use crate::virtual_file::owned_buffers_io;
|
||||
async {
|
||||
let destination_file = VirtualFile::create(dst_path, ctx)
|
||||
.await
|
||||
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
let destination_file = Arc::new(
|
||||
VirtualFile::create(dst_path, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("create a destination file for layer '{dst_path}'")
|
||||
})
|
||||
.map_err(DownloadError::Other)?,
|
||||
);
|
||||
|
||||
let mut download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
@@ -217,14 +223,16 @@ async fn download_object<'a>(
|
||||
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
|
||||
destination_file,
|
||||
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
|
||||
gate.enter().map_err(|_| DownloadError::Cancelled)?,
|
||||
ctx,
|
||||
);
|
||||
|
||||
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
|
||||
// There's chunks_vectored() on the stream.
|
||||
let (bytes_amount, destination_file) = async {
|
||||
let size_tracking = size_tracking_writer::Writer::new(destination_file);
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<BytesMut, _>::new(
|
||||
size_tracking,
|
||||
BytesMut::with_capacity(super::BUFFER_SIZE),
|
||||
);
|
||||
while let Some(res) =
|
||||
futures::StreamExt::next(&mut download.download_stream).await
|
||||
{
|
||||
@@ -232,10 +240,10 @@ async fn download_object<'a>(
|
||||
Ok(chunk) => chunk,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
buffered.write_buffered(chunk.slice_len(), ctx).await?;
|
||||
buffered.write_buffered_borrowed(&chunk, ctx).await?;
|
||||
}
|
||||
let size_tracking = buffered.flush_and_into_inner(ctx).await?;
|
||||
Ok(size_tracking.into_inner())
|
||||
let inner = buffered.flush_and_into_inner(ctx).await?;
|
||||
Ok(inner)
|
||||
}
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -1181,6 +1181,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
&layer.name,
|
||||
&layer.metadata,
|
||||
&local_path,
|
||||
&self.secondary_state.gate,
|
||||
&self.secondary_state.cancel,
|
||||
ctx,
|
||||
)
|
||||
|
||||
@@ -555,13 +555,12 @@ impl InMemoryLayer {
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
start_lsn: Lsn,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
|
||||
|
||||
let file =
|
||||
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
|
||||
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?;
|
||||
let key = InMemoryLayerFileId(file.page_cache_file_id());
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
|
||||
@@ -1149,6 +1149,7 @@ impl LayerInner {
|
||||
&self.desc.layer_name(),
|
||||
&self.metadata(),
|
||||
&self.path,
|
||||
&timeline.gate,
|
||||
&timeline.cancel,
|
||||
ctx,
|
||||
)
|
||||
|
||||
@@ -50,6 +50,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
fs_ext, pausable_failpoint,
|
||||
postgres_client::PostgresClientProtocol,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
};
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
@@ -2178,6 +2179,21 @@ impl Timeline {
|
||||
)
|
||||
}
|
||||
|
||||
/// Resolve the effective WAL receiver protocol to use for this tenant.
|
||||
///
|
||||
/// Priority order is:
|
||||
/// 1. Tenant config override
|
||||
/// 2. Default value for tenant config override
|
||||
/// 3. Pageserver config override
|
||||
/// 4. Pageserver config default
|
||||
pub fn resolve_wal_receiver_protocol(&self) -> PostgresClientProtocol {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.wal_receiver_protocol_override
|
||||
.or(self.conf.default_tenant_conf.wal_receiver_protocol_override)
|
||||
.unwrap_or(self.conf.wal_receiver_protocol)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
@@ -2470,6 +2486,7 @@ impl Timeline {
|
||||
*guard = Some(WalReceiver::start(
|
||||
Arc::clone(self),
|
||||
WalReceiverConf {
|
||||
protocol: self.resolve_wal_receiver_protocol(),
|
||||
wal_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
@@ -3470,7 +3487,6 @@ impl Timeline {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut guard = self.layers.write().await;
|
||||
let gate_guard = self.gate.enter().context("enter gate for inmem layer")?;
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
ensure!(
|
||||
@@ -3487,7 +3503,7 @@ impl Timeline {
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
gate_guard,
|
||||
&self.gate,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -3829,7 +3845,8 @@ impl Timeline {
|
||||
};
|
||||
|
||||
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
|
||||
// This makes us refuse ingest until the new layers have been persisted to the remote.
|
||||
// This makes us refuse ingest until the new layers have been persisted to the remote
|
||||
let start = Instant::now();
|
||||
self.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
@@ -3842,6 +3859,8 @@ impl Timeline {
|
||||
FlushLayerError::Other(anyhow!(e).into())
|
||||
}
|
||||
})?;
|
||||
let duration = start.elapsed().as_secs_f64();
|
||||
self.metrics.flush_wait_upload_time_gauge_add(duration);
|
||||
|
||||
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
||||
// a compaction can delete the file and then it won't be available for uploads any more.
|
||||
@@ -5896,7 +5915,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
if batch.is_empty() {
|
||||
if !batch.has_data() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
@@ -182,7 +182,7 @@ impl OpenLayerManager {
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
ensure!(lsn.is_aligned());
|
||||
@@ -212,15 +212,9 @@ impl OpenLayerManager {
|
||||
lsn
|
||||
);
|
||||
|
||||
let new_layer = InMemoryLayer::create(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
start_lsn,
|
||||
gate_guard,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let new_layer =
|
||||
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, &gate, ctx)
|
||||
.await?;
|
||||
let layer = Arc::new(new_layer);
|
||||
|
||||
self.layer_map.open_layer = Some(layer.clone());
|
||||
|
||||
@@ -38,6 +38,7 @@ use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use self::connection_manager::ConnectionManagerStatus;
|
||||
|
||||
@@ -45,6 +46,7 @@ use super::Timeline;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WalReceiverConf {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
/// The timeout on the connection to safekeeper for WAL streaming.
|
||||
pub wal_connect_timeout: Duration,
|
||||
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
|
||||
|
||||
@@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig;
|
||||
use utils::backoff::{
|
||||
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::postgres_client::wal_stream_connection_config;
|
||||
use utils::postgres_client::{
|
||||
wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol,
|
||||
};
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -533,6 +535,7 @@ impl ConnectionManagerState {
|
||||
let node_id = new_sk.safekeeper_id;
|
||||
let connect_timeout = self.conf.wal_connect_timeout;
|
||||
let ingest_batch_size = self.conf.ingest_batch_size;
|
||||
let protocol = self.conf.protocol;
|
||||
let timeline = Arc::clone(&self.timeline);
|
||||
let ctx = ctx.detached_child(
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
@@ -546,6 +549,7 @@ impl ConnectionManagerState {
|
||||
|
||||
let res = super::walreceiver_connection::handle_walreceiver_connection(
|
||||
timeline,
|
||||
protocol,
|
||||
new_sk.wal_source_connconf,
|
||||
events_sender,
|
||||
cancellation.clone(),
|
||||
@@ -984,15 +988,33 @@ impl ConnectionManagerState {
|
||||
if info.safekeeper_connstr.is_empty() {
|
||||
return None; // no connection string, ignore sk
|
||||
}
|
||||
match wal_stream_connection_config(
|
||||
self.id,
|
||||
info.safekeeper_connstr.as_ref(),
|
||||
match &self.conf.auth_token {
|
||||
None => None,
|
||||
Some(x) => Some(x),
|
||||
|
||||
let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
(None, None, None)
|
||||
},
|
||||
self.conf.availability_zone.as_deref(),
|
||||
) {
|
||||
PostgresClientProtocol::Interpreted { .. } => {
|
||||
let shard_identity = self.timeline.get_shard_identity();
|
||||
(
|
||||
Some(shard_identity.number.0),
|
||||
Some(shard_identity.count.0),
|
||||
Some(shard_identity.stripe_size.0),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: self.conf.protocol,
|
||||
ttid: self.id,
|
||||
shard_number,
|
||||
shard_count,
|
||||
shard_stripe_size,
|
||||
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
|
||||
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
|
||||
availability_zone: self.conf.availability_zone.as_deref()
|
||||
};
|
||||
|
||||
match wal_stream_connection_config(connection_conf_args) {
|
||||
Ok(connstr) => Some((*sk_id, info, connstr)),
|
||||
Err(e) => {
|
||||
error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
|
||||
@@ -1096,6 +1118,7 @@ impl ReconnectReason {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
|
||||
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
|
||||
use url::Host;
|
||||
|
||||
fn dummy_broker_sk_timeline(
|
||||
@@ -1532,6 +1555,7 @@ mod tests {
|
||||
timeline,
|
||||
cancel: CancellationToken::new(),
|
||||
conf: WalReceiverConf {
|
||||
protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
wal_connect_timeout: Duration::from_secs(1),
|
||||
lagging_wal_timeout: Duration::from_secs(1),
|
||||
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
|
||||
|
||||
@@ -22,7 +22,10 @@ use tokio::{select, sync::watch, time};
|
||||
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord};
|
||||
use wal_decoder::{
|
||||
models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords},
|
||||
wire_format::FromWireFormat,
|
||||
};
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
use crate::{
|
||||
@@ -36,7 +39,7 @@ use crate::{
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{id::NodeId, lsn::Lsn};
|
||||
use utils::{id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol};
|
||||
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
|
||||
|
||||
/// Status of the connection.
|
||||
@@ -109,6 +112,7 @@ impl From<WalDecodeError> for WalReceiverError {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn handle_walreceiver_connection(
|
||||
timeline: Arc<Timeline>,
|
||||
protocol: PostgresClientProtocol,
|
||||
wal_source_connconf: PgConnectionConfig,
|
||||
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
|
||||
cancellation: CancellationToken,
|
||||
@@ -260,6 +264,14 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
|
||||
|
||||
let interpreted_proto_config = match protocol {
|
||||
PostgresClientProtocol::Vanilla => None,
|
||||
PostgresClientProtocol::Interpreted {
|
||||
format,
|
||||
compression,
|
||||
} => Some((format, compression)),
|
||||
};
|
||||
|
||||
while let Some(replication_message) = {
|
||||
select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
@@ -291,6 +303,15 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
connection_status.latest_connection_update = now;
|
||||
connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
|
||||
}
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
connection_status.latest_connection_update = now;
|
||||
if !raw.data().is_empty() {
|
||||
connection_status.latest_wal_update = now;
|
||||
}
|
||||
|
||||
connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn()));
|
||||
connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
|
||||
}
|
||||
&_ => {}
|
||||
};
|
||||
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
|
||||
@@ -298,7 +319,144 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let status_update = match replication_message {
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
|
||||
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
// This is the end LSN of the raw WAL from which the records
|
||||
// were interpreted.
|
||||
let streaming_lsn = Lsn::from(raw.streaming_lsn());
|
||||
|
||||
let (format, compression) = interpreted_proto_config.unwrap();
|
||||
let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
|
||||
.await
|
||||
.with_context(|| {
|
||||
anyhow::anyhow!(
|
||||
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
|
||||
)
|
||||
})?;
|
||||
|
||||
let InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn,
|
||||
} = batch;
|
||||
|
||||
tracing::debug!(
|
||||
"Received WAL up to {} with next_record_lsn={:?}",
|
||||
streaming_lsn,
|
||||
next_record_lsn
|
||||
);
|
||||
|
||||
// We start the modification at 0 because each interpreted record
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
for interpreted in records {
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let local_next_record_lsn = interpreted.next_record_lsn;
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("could not ingest record at {local_next_record_lsn}")
|
||||
})?;
|
||||
|
||||
if !ingested {
|
||||
tracing::debug!(
|
||||
"ingest: filtered out record @ LSN {local_next_record_lsn}"
|
||||
);
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
uncommitted_records += 1;
|
||||
|
||||
// FIXME: this cannot be made pausable_failpoint without fixing the
|
||||
// failpoint library; in tests, the added amount of debugging will cause us
|
||||
// to timeout the tests.
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
// Commit every ingest_batch_size records. Even if we filtered out
|
||||
// all records, we still need to call commit to advance the LSN.
|
||||
if uncommitted_records >= ingest_batch_size
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Records might have been filtered out on the safekeeper side, but we still
|
||||
// need to advance last record LSN on all shards. If we've not ingested the latest
|
||||
// record, then set the LSN of the modification past it. This way all shards
|
||||
// advance their last record LSN at the same time.
|
||||
let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) {
|
||||
Some(lsn) if lsn > modification.get_lsn() => {
|
||||
modification.set_lsn(lsn).unwrap();
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
// Commit any uncommitted records
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !caught_up && streaming_lsn >= end_of_wal {
|
||||
info!("caught up at LSN {streaming_lsn}");
|
||||
caught_up = true;
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
|
||||
timeline.get_last_record_lsn()
|
||||
);
|
||||
|
||||
Some(streaming_lsn)
|
||||
}
|
||||
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
// Pass the WAL data to the decoder, and see if we can decode
|
||||
// more records as a result.
|
||||
@@ -316,21 +474,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
|
||||
@@ -20,7 +20,7 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use owned_buffers_io::aligned_buffer::buffer::AlignedBuffer;
|
||||
use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlign};
|
||||
use owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -63,9 +63,6 @@ pub(crate) mod owned_buffers_io {
|
||||
pub(crate) mod io_buf_ext;
|
||||
pub(crate) mod slice;
|
||||
pub(crate) mod write;
|
||||
pub(crate) mod util {
|
||||
pub(crate) mod size_tracking_writer;
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -221,7 +218,7 @@ impl VirtualFile {
|
||||
self.inner.read_exact_at_page(page, offset, ctx).await
|
||||
}
|
||||
|
||||
pub async fn write_all_at<Buf: IoBuf + Send>(
|
||||
pub async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
@@ -1325,14 +1322,14 @@ impl Drop for VirtualFileInner {
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for VirtualFile {
|
||||
#[inline(always)]
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
let (buf, res) = VirtualFile::write_all(self, buf, ctx).await;
|
||||
res.map(move |v| (v, buf))
|
||||
) -> std::io::Result<FullSlice<Buf>> {
|
||||
let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await;
|
||||
res.map(|_| buf)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1451,7 +1448,7 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn write_all_at<Buf: IoBuf + Send>(
|
||||
async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
@@ -1594,6 +1591,7 @@ mod tests {
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
file_a
|
||||
.write_all(b"foobar".to_vec().slice_len(), &ctx)
|
||||
.await?;
|
||||
@@ -1652,10 +1650,10 @@ mod tests {
|
||||
)
|
||||
.await?;
|
||||
file_b
|
||||
.write_all_at(b"BAR".to_vec().slice_len(), 3, &ctx)
|
||||
.write_all_at(IoBuffer::from(b"BAR").slice_len(), 3, &ctx)
|
||||
.await?;
|
||||
file_b
|
||||
.write_all_at(b"FOO".to_vec().slice_len(), 0, &ctx)
|
||||
.write_all_at(IoBuffer::from(b"FOO").slice_len(), 0, &ctx)
|
||||
.await?;
|
||||
|
||||
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
|
||||
|
||||
@@ -4,7 +4,7 @@ pub trait Alignment: std::marker::Unpin + 'static {
|
||||
}
|
||||
|
||||
/// Alignment at compile time.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct ConstAlign<const A: usize>;
|
||||
|
||||
impl<const A: usize> Alignment for ConstAlign<A> {
|
||||
@@ -14,7 +14,7 @@ impl<const A: usize> Alignment for ConstAlign<A> {
|
||||
}
|
||||
|
||||
/// Alignment at run time.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct RuntimeAlign {
|
||||
align: usize,
|
||||
}
|
||||
|
||||
@@ -3,9 +3,10 @@ use std::{
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use super::{alignment::Alignment, raw::RawAlignedBuffer};
|
||||
use super::{alignment::Alignment, raw::RawAlignedBuffer, AlignedBufferMut, ConstAlign};
|
||||
|
||||
/// An shared, immutable aligned buffer type.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AlignedBuffer<A: Alignment> {
|
||||
/// Shared raw buffer.
|
||||
raw: Arc<RawAlignedBuffer<A>>,
|
||||
@@ -86,6 +87,13 @@ impl<A: Alignment> AlignedBuffer<A> {
|
||||
range: begin..end,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the mutable aligned buffer, if the immutable aligned buffer
|
||||
/// has exactly one strong reference. Otherwise returns `None`.
|
||||
pub fn into_mut(self) -> Option<AlignedBufferMut<A>> {
|
||||
let raw = Arc::into_inner(self.raw)?;
|
||||
Some(AlignedBufferMut::from_raw(raw))
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> Deref for AlignedBuffer<A> {
|
||||
@@ -108,6 +116,14 @@ impl<A: Alignment> PartialEq<[u8]> for AlignedBuffer<A> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<const A: usize, const N: usize> From<&[u8; N]> for AlignedBuffer<ConstAlign<A>> {
|
||||
fn from(value: &[u8; N]) -> Self {
|
||||
let mut buf = AlignedBufferMut::with_capacity(N);
|
||||
buf.extend_from_slice(value);
|
||||
buf.freeze()
|
||||
}
|
||||
}
|
||||
|
||||
/// SAFETY: the underlying buffer references a stable memory region.
|
||||
unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBuffer<A> {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::{
|
||||
mem::MaybeUninit,
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
|
||||
use super::{
|
||||
alignment::{Alignment, ConstAlign},
|
||||
@@ -46,6 +49,11 @@ impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
|
||||
}
|
||||
|
||||
impl<A: Alignment> AlignedBufferMut<A> {
|
||||
/// Constructs a mutable aligned buffer from raw.
|
||||
pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
|
||||
AlignedBufferMut { raw }
|
||||
}
|
||||
|
||||
/// Returns the total number of bytes the buffer can hold.
|
||||
#[inline]
|
||||
pub fn capacity(&self) -> usize {
|
||||
@@ -128,6 +136,39 @@ impl<A: Alignment> AlignedBufferMut<A> {
|
||||
let len = self.len();
|
||||
AlignedBuffer::from_raw(self.raw, 0..len)
|
||||
}
|
||||
|
||||
/// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
|
||||
#[inline]
|
||||
pub fn extend_from_slice(&mut self, extend: &[u8]) {
|
||||
let cnt = extend.len();
|
||||
self.reserve(cnt);
|
||||
|
||||
// SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
|
||||
unsafe {
|
||||
let dst = self.spare_capacity_mut();
|
||||
// Reserved above
|
||||
debug_assert!(dst.len() >= cnt);
|
||||
|
||||
core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
|
||||
}
|
||||
// SAFETY: We do have at least `cnt` bytes remaining before advance.
|
||||
unsafe {
|
||||
bytes::BufMut::advance_mut(self, cnt);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
|
||||
#[inline]
|
||||
fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
|
||||
// SAFETY: we guarantees that the `Self::capacity()` bytes from
|
||||
// `Self::as_mut_ptr()` are allocated.
|
||||
unsafe {
|
||||
let ptr = self.as_mut_ptr().add(self.len());
|
||||
let len = self.capacity() - self.len();
|
||||
|
||||
core::slice::from_raw_parts_mut(ptr.cast(), len)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: Alignment> Deref for AlignedBufferMut<A> {
|
||||
|
||||
@@ -19,7 +19,7 @@ impl<'a, const N: usize, const A: usize> AlignedSlice<'a, N, ConstAlign<A>> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, const N: usize, A: Alignment> Deref for AlignedSlice<'a, N, A> {
|
||||
impl<const N: usize, A: Alignment> Deref for AlignedSlice<'_, N, A> {
|
||||
type Target = [u8; N];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -27,13 +27,13 @@ impl<'a, const N: usize, A: Alignment> Deref for AlignedSlice<'a, N, A> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, const N: usize, A: Alignment> DerefMut for AlignedSlice<'a, N, A> {
|
||||
impl<const N: usize, A: Alignment> DerefMut for AlignedSlice<'_, N, A> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, const N: usize, A: Alignment> AsRef<[u8; N]> for AlignedSlice<'a, N, A> {
|
||||
impl<const N: usize, A: Alignment> AsRef<[u8; N]> for AlignedSlice<'_, N, A> {
|
||||
fn as_ref(&self) -> &[u8; N] {
|
||||
self.buf
|
||||
}
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
use tokio_epoll_uring::{IoBuf, IoBufMut};
|
||||
|
||||
use crate::virtual_file::{IoBufferMut, PageWriteGuardBuf};
|
||||
use crate::virtual_file::{IoBuffer, IoBufferMut, PageWriteGuardBuf};
|
||||
|
||||
/// A marker trait for a mutable aligned buffer type.
|
||||
pub trait IoBufAlignedMut: IoBufMut {}
|
||||
|
||||
/// A marker trait for an aligned buffer type.
|
||||
pub trait IoBufAligned: IoBuf {}
|
||||
|
||||
impl IoBufAlignedMut for IoBufferMut {}
|
||||
|
||||
impl IoBufAligned for IoBuffer {}
|
||||
|
||||
impl IoBufAlignedMut for PageWriteGuardBuf {}
|
||||
|
||||
@@ -5,6 +5,8 @@ use bytes::{Bytes, BytesMut};
|
||||
use std::ops::{Deref, Range};
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
|
||||
|
||||
use super::write::CheapCloneForRead;
|
||||
|
||||
/// The true owned equivalent for Rust [`slice`]. Use this for the write path.
|
||||
///
|
||||
/// Unlike [`tokio_epoll_uring::Slice`], which we unfortunately inherited from `tokio-uring`,
|
||||
@@ -43,6 +45,17 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<B> CheapCloneForRead for FullSlice<B>
|
||||
where
|
||||
B: IoBuf + CheapCloneForRead,
|
||||
{
|
||||
fn cheap_clone(&self) -> Self {
|
||||
Self {
|
||||
slice: self.slice.get_ref().cheap_clone().slice_full(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait IoBufExt {
|
||||
/// Get a [`FullSlice`] for the entire buffer, i.e., `self[..]` or `self[0..self.len()]`.
|
||||
fn slice_len(self) -> FullSlice<Self>
|
||||
|
||||
@@ -1,50 +0,0 @@
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
virtual_file::owned_buffers_io::{io_buf_ext::FullSlice, write::OwnedAsyncWriter},
|
||||
};
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
|
||||
pub struct Writer<W> {
|
||||
dst: W,
|
||||
bytes_amount: u64,
|
||||
}
|
||||
|
||||
impl<W> Writer<W> {
|
||||
pub fn new(dst: W) -> Self {
|
||||
Self {
|
||||
dst,
|
||||
bytes_amount: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn bytes_written(&self) -> u64 {
|
||||
self.bytes_amount
|
||||
}
|
||||
|
||||
pub fn as_inner(&self) -> &W {
|
||||
&self.dst
|
||||
}
|
||||
|
||||
/// Returns the wrapped `VirtualFile` object as well as the number
|
||||
/// of bytes that were written to it through this object.
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub fn into_inner(self) -> (u64, W) {
|
||||
(self.bytes_amount, self.dst)
|
||||
}
|
||||
}
|
||||
|
||||
impl<W> OwnedAsyncWriter for Writer<W>
|
||||
where
|
||||
W: OwnedAsyncWriter,
|
||||
{
|
||||
#[inline(always)]
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
let (nwritten, buf) = self.dst.write_all(buf, ctx).await?;
|
||||
self.bytes_amount += u64::try_from(nwritten).unwrap();
|
||||
Ok((nwritten, buf))
|
||||
}
|
||||
}
|
||||
@@ -1,55 +1,89 @@
|
||||
mod flush;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use flush::FlushHandle;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
virtual_file::{IoBuffer, IoBufferMut},
|
||||
};
|
||||
|
||||
use super::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use super::{
|
||||
io_buf_aligned::IoBufAligned,
|
||||
io_buf_ext::{FullSlice, IoBufExt},
|
||||
};
|
||||
|
||||
pub(crate) use flush::FlushControl;
|
||||
|
||||
pub(crate) trait CheapCloneForRead {
|
||||
/// Returns a cheap clone of the buffer.
|
||||
fn cheap_clone(&self) -> Self;
|
||||
}
|
||||
|
||||
impl CheapCloneForRead for IoBuffer {
|
||||
fn cheap_clone(&self) -> Self {
|
||||
// Cheap clone over an `Arc`.
|
||||
self.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait for doing owned-buffer write IO.
|
||||
/// Think [`tokio::io::AsyncWrite`] but with owned buffers.
|
||||
/// The owned buffers need to be aligned due to Direct IO requirements.
|
||||
pub trait OwnedAsyncWriter {
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)>;
|
||||
) -> impl std::future::Future<Output = std::io::Result<FullSlice<Buf>>> + Send;
|
||||
}
|
||||
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
|
||||
/// small writes into larger writes of size [`Buffer::cap`].
|
||||
///
|
||||
/// # Passthrough Of Large Writers
|
||||
///
|
||||
/// Calls to [`BufferedWriter::write_buffered`] that are larger than [`Buffer::cap`]
|
||||
/// cause the internal buffer to be flushed prematurely so that the large
|
||||
/// buffered write is passed through to the underlying [`OwnedAsyncWriter`].
|
||||
///
|
||||
/// This pass-through is generally beneficial for throughput, but if
|
||||
/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource,
|
||||
/// unlimited large writes may cause latency or fairness issues.
|
||||
///
|
||||
/// In such cases, a different implementation that always buffers in memory
|
||||
/// may be preferable.
|
||||
pub struct BufferedWriter<B, W> {
|
||||
writer: W,
|
||||
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
|
||||
// since we would avoid copying majority of the data into the internal buffer.
|
||||
pub struct BufferedWriter<B: Buffer, W> {
|
||||
writer: Arc<W>,
|
||||
/// invariant: always remains Some(buf) except
|
||||
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
|
||||
/// - after an IO error => stays `None` forever
|
||||
///
|
||||
/// In these exceptional cases, it's `None`.
|
||||
buf: Option<B>,
|
||||
mutable: Option<B>,
|
||||
/// A handle to the background flush task for writting data to disk.
|
||||
flush_handle: FlushHandle<B::IoBuf, W>,
|
||||
/// The number of bytes submitted to the background task.
|
||||
bytes_submitted: u64,
|
||||
}
|
||||
|
||||
impl<B, Buf, W> BufferedWriter<B, W>
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send,
|
||||
Buf: IoBuf + Send,
|
||||
W: OwnedAsyncWriter,
|
||||
B: Buffer<IoBuf = Buf> + Send + 'static,
|
||||
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
|
||||
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
|
||||
{
|
||||
pub fn new(writer: W, buf: B) -> Self {
|
||||
/// Creates a new buffered writer.
|
||||
///
|
||||
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
|
||||
pub fn new(
|
||||
writer: Arc<W>,
|
||||
buf_new: impl Fn() -> B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> Self {
|
||||
Self {
|
||||
writer,
|
||||
buf: Some(buf),
|
||||
writer: writer.clone(),
|
||||
mutable: Some(buf_new()),
|
||||
flush_handle: FlushHandle::spawn_new(
|
||||
writer,
|
||||
buf_new(),
|
||||
gate_guard,
|
||||
ctx.attached_child(),
|
||||
),
|
||||
bytes_submitted: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,87 +91,70 @@ where
|
||||
&self.writer
|
||||
}
|
||||
|
||||
/// Returns the number of bytes submitted to the background flush task.
|
||||
pub fn bytes_submitted(&self) -> u64 {
|
||||
self.bytes_submitted
|
||||
}
|
||||
|
||||
/// Panics if used after any of the write paths returned an error
|
||||
pub fn inspect_buffer(&self) -> &B {
|
||||
self.buf()
|
||||
pub fn inspect_mutable(&self) -> &B {
|
||||
self.mutable()
|
||||
}
|
||||
|
||||
/// Gets a reference to the maybe flushed read-only buffer.
|
||||
/// Returns `None` if the writer has not submitted any flush request.
|
||||
pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
|
||||
self.flush_handle.maybe_flushed.as_ref()
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn flush_and_into_inner(mut self, ctx: &RequestContext) -> std::io::Result<W> {
|
||||
pub async fn flush_and_into_inner(
|
||||
mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(u64, Arc<W>)> {
|
||||
self.flush(ctx).await?;
|
||||
|
||||
let Self { buf, writer } = self;
|
||||
let Self {
|
||||
mutable: buf,
|
||||
writer,
|
||||
mut flush_handle,
|
||||
bytes_submitted: bytes_amount,
|
||||
} = self;
|
||||
flush_handle.shutdown().await?;
|
||||
assert!(buf.is_some());
|
||||
Ok(writer)
|
||||
Ok((bytes_amount, writer))
|
||||
}
|
||||
|
||||
/// Gets a reference to the mutable in-memory buffer.
|
||||
#[inline(always)]
|
||||
fn buf(&self) -> &B {
|
||||
self.buf
|
||||
fn mutable(&self) -> &B {
|
||||
self.mutable
|
||||
.as_ref()
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
|
||||
/// Guarantees that if Ok() is returned, all bytes in `chunk` have been accepted.
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn write_buffered<S: IoBuf + Send>(
|
||||
pub async fn write_buffered_borrowed(
|
||||
&mut self,
|
||||
chunk: FullSlice<S>,
|
||||
chunk: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<S>)> {
|
||||
let chunk = chunk.into_raw_slice();
|
||||
|
||||
let chunk_len = chunk.len();
|
||||
// avoid memcpy for the middle of the chunk
|
||||
if chunk.len() >= self.buf().cap() {
|
||||
self.flush(ctx).await?;
|
||||
// do a big write, bypassing `buf`
|
||||
assert_eq!(
|
||||
self.buf
|
||||
.as_ref()
|
||||
.expect("must not use after an error")
|
||||
.pending(),
|
||||
0
|
||||
);
|
||||
let (nwritten, chunk) = self
|
||||
.writer
|
||||
.write_all(FullSlice::must_new(chunk), ctx)
|
||||
.await?;
|
||||
assert_eq!(nwritten, chunk_len);
|
||||
return Ok((nwritten, chunk));
|
||||
) -> std::io::Result<usize> {
|
||||
let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
|
||||
if let Some(control) = control {
|
||||
control.release().await;
|
||||
}
|
||||
// in-memory copy the < BUFFER_SIZED tail of the chunk
|
||||
assert!(chunk.len() < self.buf().cap());
|
||||
let mut slice = &chunk[..];
|
||||
while !slice.is_empty() {
|
||||
let buf = self.buf.as_mut().expect("must not use after an error");
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = slice.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
buf.extend_from_slice(&slice[..n]);
|
||||
slice = &slice[n..];
|
||||
if buf.pending() >= buf.cap() {
|
||||
assert_eq!(buf.pending(), buf.cap());
|
||||
self.flush(ctx).await?;
|
||||
}
|
||||
}
|
||||
assert!(slice.is_empty(), "by now we should have drained the chunk");
|
||||
Ok((chunk_len, FullSlice::must_new(chunk)))
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
/// Strictly less performant variant of [`Self::write_buffered`] that allows writing borrowed data.
|
||||
///
|
||||
/// It is less performant because we always have to copy the borrowed data into the internal buffer
|
||||
/// before we can do the IO. The [`Self::write_buffered`] can avoid this, which is more performant
|
||||
/// for large writes.
|
||||
pub async fn write_buffered_borrowed(
|
||||
/// In addition to bytes submitted in this write, also returns a handle that can control the flush behavior.
|
||||
pub(crate) async fn write_buffered_borrowed_controlled(
|
||||
&mut self,
|
||||
mut chunk: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<usize> {
|
||||
) -> std::io::Result<(usize, Option<FlushControl>)> {
|
||||
let chunk_len = chunk.len();
|
||||
let mut control: Option<FlushControl> = None;
|
||||
while !chunk.is_empty() {
|
||||
let buf = self.buf.as_mut().expect("must not use after an error");
|
||||
let buf = self.mutable.as_mut().expect("must not use after an error");
|
||||
let need = buf.cap() - buf.pending();
|
||||
let have = chunk.len();
|
||||
let n = std::cmp::min(need, have);
|
||||
@@ -145,26 +162,27 @@ where
|
||||
chunk = &chunk[n..];
|
||||
if buf.pending() >= buf.cap() {
|
||||
assert_eq!(buf.pending(), buf.cap());
|
||||
self.flush(ctx).await?;
|
||||
if let Some(control) = control.take() {
|
||||
control.release().await;
|
||||
}
|
||||
control = self.flush(ctx).await?;
|
||||
}
|
||||
}
|
||||
Ok(chunk_len)
|
||||
Ok((chunk_len, control))
|
||||
}
|
||||
|
||||
async fn flush(&mut self, ctx: &RequestContext) -> std::io::Result<()> {
|
||||
let buf = self.buf.take().expect("must not use after an error");
|
||||
#[must_use = "caller must explcitly check the flush control"]
|
||||
async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<Option<FlushControl>> {
|
||||
let buf = self.mutable.take().expect("must not use after an error");
|
||||
let buf_len = buf.pending();
|
||||
if buf_len == 0 {
|
||||
self.buf = Some(buf);
|
||||
return Ok(());
|
||||
self.mutable = Some(buf);
|
||||
return Ok(None);
|
||||
}
|
||||
let slice = buf.flush();
|
||||
let (nwritten, slice) = self.writer.write_all(slice, ctx).await?;
|
||||
assert_eq!(nwritten, buf_len);
|
||||
self.buf = Some(Buffer::reuse_after_flush(
|
||||
slice.into_raw_slice().into_inner(),
|
||||
));
|
||||
Ok(())
|
||||
let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?;
|
||||
self.bytes_submitted += u64::try_from(buf_len).unwrap();
|
||||
self.mutable = Some(recycled);
|
||||
Ok(Some(flush_control))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,37 +237,73 @@ impl Buffer for BytesMut {
|
||||
}
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for Vec<u8> {
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
buf: FullSlice<Buf>,
|
||||
_: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
self.extend_from_slice(&buf[..]);
|
||||
Ok((buf.len(), buf))
|
||||
impl Buffer for IoBufferMut {
|
||||
type IoBuf = IoBuffer;
|
||||
|
||||
fn cap(&self) -> usize {
|
||||
self.capacity()
|
||||
}
|
||||
|
||||
fn extend_from_slice(&mut self, other: &[u8]) {
|
||||
IoBufferMut::extend_from_slice(self, other);
|
||||
}
|
||||
|
||||
fn pending(&self) -> usize {
|
||||
self.len()
|
||||
}
|
||||
|
||||
fn flush(self) -> FullSlice<Self::IoBuf> {
|
||||
self.freeze().slice_len()
|
||||
}
|
||||
|
||||
/// Caller should make sure that `iobuf` only have one strong reference before invoking this method.
|
||||
fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
|
||||
let mut recycled = iobuf
|
||||
.into_mut()
|
||||
.expect("buffer should only have one strong reference");
|
||||
recycled.clear();
|
||||
recycled
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bytes::BytesMut;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use super::*;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Debug)]
|
||||
struct RecorderWriter {
|
||||
writes: Vec<Vec<u8>>,
|
||||
/// record bytes and write offsets.
|
||||
writes: Mutex<Vec<(Vec<u8>, u64)>>,
|
||||
}
|
||||
|
||||
impl RecorderWriter {
|
||||
/// Gets recorded bytes and write offsets.
|
||||
fn get_writes(&self) -> Vec<Vec<u8>> {
|
||||
self.writes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|(buf, _)| buf.clone())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for RecorderWriter {
|
||||
async fn write_all<Buf: IoBuf + Send>(
|
||||
&mut self,
|
||||
async fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
_: &RequestContext,
|
||||
) -> std::io::Result<(usize, FullSlice<Buf>)> {
|
||||
self.writes.push(Vec::from(&buf[..]));
|
||||
Ok((buf.len(), buf))
|
||||
) -> std::io::Result<FullSlice<Buf>> {
|
||||
self.writes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push((Vec::from(&buf[..]), offset));
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -257,71 +311,21 @@ mod tests {
|
||||
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
|
||||
}
|
||||
|
||||
macro_rules! write {
|
||||
($writer:ident, $data:literal) => {{
|
||||
$writer
|
||||
.write_buffered(::bytes::Bytes::from_static($data).slice_len(), &test_ctx())
|
||||
.await?;
|
||||
}};
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_buffered_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"b");
|
||||
write!(writer, b"c");
|
||||
write!(writer, b"d");
|
||||
write!(writer, b"e");
|
||||
let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
|
||||
assert_eq!(
|
||||
recorder.writes,
|
||||
vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_writes_only() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"abc");
|
||||
write!(writer, b"de");
|
||||
write!(writer, b"");
|
||||
write!(writer, b"fghijk");
|
||||
let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
|
||||
assert_eq!(
|
||||
recorder.writes,
|
||||
vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> {
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
write!(writer, b"a");
|
||||
write!(writer, b"bc");
|
||||
write!(writer, b"d");
|
||||
write!(writer, b"e");
|
||||
let recorder = writer.flush_and_into_inner(&test_ctx()).await?;
|
||||
assert_eq!(
|
||||
recorder.writes,
|
||||
vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> std::io::Result<()> {
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
|
||||
let ctx = test_ctx();
|
||||
let ctx = &ctx;
|
||||
let recorder = RecorderWriter::default();
|
||||
let mut writer = BufferedWriter::new(recorder, BytesMut::with_capacity(2));
|
||||
let recorder = Arc::new(RecorderWriter::default());
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
|
||||
recorder,
|
||||
|| IoBufferMut::with_capacity(2),
|
||||
gate.enter()?,
|
||||
ctx,
|
||||
);
|
||||
|
||||
writer.write_buffered_borrowed(b"abc", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"d", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"e", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"fg", ctx).await?;
|
||||
@@ -329,9 +333,9 @@ mod tests {
|
||||
writer.write_buffered_borrowed(b"j", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"klmno", ctx).await?;
|
||||
|
||||
let recorder = writer.flush_and_into_inner(ctx).await?;
|
||||
let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
|
||||
assert_eq!(
|
||||
recorder.writes,
|
||||
recorder.get_writes(),
|
||||
{
|
||||
let expect: &[&[u8]] = &[b"ab", b"cd", b"ef", b"gh", b"ij", b"kl", b"mn", b"o"];
|
||||
expect
|
||||
|
||||
309
pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
Normal file
309
pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
Normal file
@@ -0,0 +1,309 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use utils::sync::duplex;
|
||||
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
virtual_file::owned_buffers_io::{io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice},
|
||||
};
|
||||
|
||||
use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
|
||||
|
||||
/// A handle to the flush task.
|
||||
pub struct FlushHandle<Buf, W> {
|
||||
inner: Option<FlushHandleInner<Buf, W>>,
|
||||
/// Immutable buffer for serving tail reads.
|
||||
/// `None` if no flush request has been submitted.
|
||||
pub(super) maybe_flushed: Option<FullSlice<Buf>>,
|
||||
}
|
||||
|
||||
pub struct FlushHandleInner<Buf, W> {
|
||||
/// A bi-directional channel that sends (buffer, offset) for writes,
|
||||
/// and receives recyled buffer.
|
||||
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
|
||||
/// Join handle for the background flush task.
|
||||
join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
|
||||
}
|
||||
|
||||
struct FlushRequest<Buf> {
|
||||
slice: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
#[cfg(test)]
|
||||
ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
#[cfg(test)]
|
||||
done_flush_tx: tokio::sync::oneshot::Sender<()>,
|
||||
}
|
||||
|
||||
/// Constructs a request and a control object for a new flush operation.
|
||||
#[cfg(not(test))]
|
||||
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
|
||||
let request = FlushRequest { slice, offset };
|
||||
let control = FlushControl::untracked();
|
||||
|
||||
(request, control)
|
||||
}
|
||||
|
||||
/// Constructs a request and a control object for a new flush operation.
|
||||
#[cfg(test)]
|
||||
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
|
||||
let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
|
||||
let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
|
||||
let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
|
||||
|
||||
let request = FlushRequest {
|
||||
slice,
|
||||
offset,
|
||||
ready_to_flush_rx,
|
||||
done_flush_tx,
|
||||
};
|
||||
(request, control)
|
||||
}
|
||||
|
||||
/// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
|
||||
#[cfg(test)]
|
||||
pub(crate) struct FlushControl {
|
||||
not_started: FlushNotStarted,
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) struct FlushControl;
|
||||
|
||||
impl FlushControl {
|
||||
#[cfg(test)]
|
||||
fn not_started(
|
||||
ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
|
||||
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
) -> Self {
|
||||
FlushControl {
|
||||
not_started: FlushNotStarted {
|
||||
ready_to_flush_tx,
|
||||
done_flush_rx,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
fn untracked() -> Self {
|
||||
FlushControl
|
||||
}
|
||||
|
||||
/// In tests, turn flush control into a not started state.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn into_not_started(self) -> FlushNotStarted {
|
||||
self.not_started
|
||||
}
|
||||
|
||||
/// Release control to the submitted buffer.
|
||||
///
|
||||
/// In `cfg(test)` environment, the buffer is guranteed to be flushed to disk after [`FlushControl::release`] is finishes execution.
|
||||
pub async fn release(self) {
|
||||
#[cfg(test)]
|
||||
{
|
||||
self.not_started
|
||||
.ready_to_flush()
|
||||
.wait_until_flush_is_done()
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Buf, W> FlushHandle<Buf, W>
|
||||
where
|
||||
Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
|
||||
W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
|
||||
{
|
||||
/// Spawns a new background flush task and obtains a handle.
|
||||
///
|
||||
/// Note: The background task so we do not need to explicitly maintain a queue of buffers.
|
||||
pub fn spawn_new<B>(
|
||||
file: Arc<W>,
|
||||
buf: B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: RequestContext,
|
||||
) -> Self
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send + 'static,
|
||||
{
|
||||
let (front, back) = duplex::mpsc::channel(2);
|
||||
|
||||
let join_handle = tokio::spawn(async move {
|
||||
FlushBackgroundTask::new(back, file, gate_guard, ctx)
|
||||
.run(buf.flush())
|
||||
.await
|
||||
});
|
||||
|
||||
FlushHandle {
|
||||
inner: Some(FlushHandleInner {
|
||||
channel: front,
|
||||
join_handle,
|
||||
}),
|
||||
maybe_flushed: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Submits a buffer to be flushed in the background task.
|
||||
/// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
|
||||
/// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
|
||||
/// clear `maybe_flushed`.
|
||||
pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send + 'static,
|
||||
{
|
||||
let slice = buf.flush();
|
||||
|
||||
// Saves a buffer for read while flushing. This also removes reference to the old buffer.
|
||||
self.maybe_flushed = Some(slice.cheap_clone());
|
||||
|
||||
let (request, flush_control) = new_flush_op(slice, offset);
|
||||
|
||||
// Submits the buffer to the background task.
|
||||
let submit = self.inner_mut().channel.send(request).await;
|
||||
if submit.is_err() {
|
||||
return self.handle_error().await;
|
||||
}
|
||||
|
||||
// Wait for an available buffer from the background flush task.
|
||||
let Some(recycled) = self.inner_mut().channel.recv().await else {
|
||||
return self.handle_error().await;
|
||||
};
|
||||
|
||||
// The only other place that could hold a reference to the recycled buffer
|
||||
// is in `Self::maybe_flushed`, but we have already replace it with the new buffer.
|
||||
|
||||
let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
|
||||
Ok((recycled, flush_control))
|
||||
}
|
||||
|
||||
/// Cleans up the channel, join the flush task.
|
||||
pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
|
||||
let handle = self
|
||||
.inner
|
||||
.take()
|
||||
.expect("must not use after we returned an error");
|
||||
drop(handle.channel.tx);
|
||||
handle.join_handle.await.unwrap()
|
||||
}
|
||||
|
||||
/// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
|
||||
/// This only happens if the handle is used after an error.
|
||||
fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
|
||||
self.inner
|
||||
.as_mut()
|
||||
.expect("must not use after we returned an error")
|
||||
}
|
||||
|
||||
async fn handle_error<T>(&mut self) -> std::io::Result<T> {
|
||||
Err(self.shutdown().await.unwrap_err())
|
||||
}
|
||||
}
|
||||
|
||||
/// A background task for flushing data to disk.
|
||||
pub struct FlushBackgroundTask<Buf, W> {
|
||||
/// A bi-directional channel that receives (buffer, offset) for writes,
|
||||
/// and send back recycled buffer.
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
|
||||
/// A writter for persisting data to disk.
|
||||
writer: Arc<W>,
|
||||
ctx: RequestContext,
|
||||
/// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk.
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
}
|
||||
|
||||
impl<Buf, W> FlushBackgroundTask<Buf, W>
|
||||
where
|
||||
Buf: IoBufAligned + Send + Sync,
|
||||
W: OwnedAsyncWriter + Sync + 'static,
|
||||
{
|
||||
/// Creates a new background flush task.
|
||||
fn new(
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
|
||||
file: Arc<W>,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: RequestContext,
|
||||
) -> Self {
|
||||
FlushBackgroundTask {
|
||||
channel,
|
||||
writer: file,
|
||||
_gate_guard: gate_guard,
|
||||
ctx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs the background flush task.
|
||||
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
|
||||
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
|
||||
// Sends the extra buffer back to the handle.
|
||||
self.channel.send(slice).await.map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
|
||||
})?;
|
||||
|
||||
// Exit condition: channel is closed and there is no remaining buffer to be flushed
|
||||
while let Some(request) = self.channel.recv().await {
|
||||
#[cfg(test)]
|
||||
{
|
||||
// In test, wait for control to signal that we are ready to flush.
|
||||
if request.ready_to_flush_rx.await.is_err() {
|
||||
tracing::debug!("control dropped");
|
||||
}
|
||||
}
|
||||
|
||||
// Write slice to disk at `offset`.
|
||||
let slice = self
|
||||
.writer
|
||||
.write_all_at(request.slice, request.offset, &self.ctx)
|
||||
.await?;
|
||||
|
||||
#[cfg(test)]
|
||||
{
|
||||
// In test, tell control we are done flushing buffer.
|
||||
if request.done_flush_tx.send(()).is_err() {
|
||||
tracing::debug!("control dropped");
|
||||
}
|
||||
}
|
||||
|
||||
// Sends the buffer back to the handle for reuse. The handle is in charged of cleaning the buffer.
|
||||
if self.channel.send(slice).await.is_err() {
|
||||
// Although channel is closed. Still need to finish flushing the remaining buffers.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(self.writer)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) struct FlushNotStarted {
|
||||
ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
|
||||
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) struct FlushInProgress {
|
||||
done_flush_rx: tokio::sync::oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) struct FlushDone;
|
||||
|
||||
#[cfg(test)]
|
||||
impl FlushNotStarted {
|
||||
/// Signals the background task the buffer is ready to flush to disk.
|
||||
pub fn ready_to_flush(self) -> FlushInProgress {
|
||||
self.ready_to_flush_tx
|
||||
.send(())
|
||||
.map(|_| FlushInProgress {
|
||||
done_flush_rx: self.done_flush_rx,
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl FlushInProgress {
|
||||
/// Waits until background flush is done.
|
||||
pub async fn wait_until_flush_is_done(self) -> FlushDone {
|
||||
self.done_flush_rx.await.unwrap();
|
||||
FlushDone
|
||||
}
|
||||
}
|
||||
@@ -20,7 +20,7 @@
|
||||
|
||||
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
|
||||
|
||||
static int logical_replication_max_snap_files = 300;
|
||||
static int logical_replication_max_snap_files = 10000;
|
||||
|
||||
/*
|
||||
* According to Chi (shyzh), the pageserver _should_ be good with 10 MB worth of
|
||||
@@ -184,7 +184,7 @@ InitLogicalReplicationMonitor(void)
|
||||
"Maximum allowed logical replication .snap files. When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
|
||||
NULL,
|
||||
&logical_replication_max_snap_files,
|
||||
300, -1, INT_MAX,
|
||||
10000, -1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
use pq_proto::CancelKeyData;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
@@ -17,9 +18,6 @@ use crate::rate_limiter::LeakyBucketRateLimiter;
|
||||
use crate::redis::cancellation_publisher::{
|
||||
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
|
||||
};
|
||||
use std::net::IpAddr;
|
||||
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
|
||||
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
|
||||
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::TryFutureExt;
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, Instrument};
|
||||
@@ -88,40 +88,37 @@ pub async fn task_main(
|
||||
crate::metrics::Protocol::Tcp,
|
||||
&config.region,
|
||||
);
|
||||
let span = ctx.span();
|
||||
|
||||
let startup = Box::pin(
|
||||
handle_client(
|
||||
config,
|
||||
backend,
|
||||
&ctx,
|
||||
cancellation_handler,
|
||||
socket,
|
||||
conn_gauge,
|
||||
)
|
||||
.instrument(span.clone()),
|
||||
);
|
||||
let res = startup.await;
|
||||
let res = handle_client(
|
||||
config,
|
||||
backend,
|
||||
&ctx,
|
||||
cancellation_handler,
|
||||
socket,
|
||||
conn_gauge,
|
||||
)
|
||||
.instrument(ctx.span())
|
||||
.boxed()
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Err(e) => {
|
||||
// todo: log and push to ctx the error kind
|
||||
ctx.set_error_kind(e.get_error_kind());
|
||||
error!(parent: &span, "per-client task finished with an error: {e:#}");
|
||||
error!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
|
||||
}
|
||||
Ok(None) => {
|
||||
ctx.set_success();
|
||||
}
|
||||
Ok(Some(p)) => {
|
||||
ctx.set_success();
|
||||
ctx.log_connect();
|
||||
match p.proxy_pass().instrument(span.clone()).await {
|
||||
let _disconnect = ctx.log_connect();
|
||||
match p.proxy_pass().await {
|
||||
Ok(()) => {}
|
||||
Err(ErrorSource::Client(e)) => {
|
||||
error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
|
||||
error!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
|
||||
}
|
||||
Err(ErrorSource::Compute(e)) => {
|
||||
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
|
||||
error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -219,6 +216,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
client: stream,
|
||||
aux: node.aux.clone(),
|
||||
compute: node,
|
||||
session_id: ctx.session_id(),
|
||||
_req: request_gauge,
|
||||
_conn: conn_gauge,
|
||||
_cancel: session,
|
||||
|
||||
@@ -272,11 +272,14 @@ impl RequestContext {
|
||||
this.success = true;
|
||||
}
|
||||
|
||||
pub fn log_connect(&self) {
|
||||
self.0
|
||||
.try_lock()
|
||||
.expect("should not deadlock")
|
||||
.log_connect();
|
||||
pub fn log_connect(self) -> DisconnectLogger {
|
||||
let mut this = self.0.into_inner();
|
||||
this.log_connect();
|
||||
|
||||
// close current span.
|
||||
this.span = Span::none();
|
||||
|
||||
DisconnectLogger(this)
|
||||
}
|
||||
|
||||
pub(crate) fn protocol(&self) -> Protocol {
|
||||
@@ -434,8 +437,14 @@ impl Drop for RequestContextInner {
|
||||
fn drop(&mut self) {
|
||||
if self.sender.is_some() {
|
||||
self.log_connect();
|
||||
} else {
|
||||
self.log_disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DisconnectLogger(RequestContextInner);
|
||||
|
||||
impl Drop for DisconnectLogger {
|
||||
fn drop(&mut self) {
|
||||
self.0.log_disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,7 +114,7 @@ impl MockControlPlane {
|
||||
|
||||
Ok((secret, allowed_ips))
|
||||
}
|
||||
.map_err(crate::error::log_error::<GetAuthInfoError>)
|
||||
.inspect_err(|e: &GetAuthInfoError| tracing::error!("{e}"))
|
||||
.instrument(info_span!("postgres", url = self.endpoint.as_str()))
|
||||
.await?;
|
||||
Ok(AuthInfo {
|
||||
|
||||
@@ -134,8 +134,8 @@ impl NeonControlPlaneClient {
|
||||
project_id: body.project_id,
|
||||
})
|
||||
}
|
||||
.map_err(crate::error::log_error)
|
||||
.instrument(info_span!("http", id = request_id))
|
||||
.inspect_err(|e| tracing::debug!(error = ?e))
|
||||
.instrument(info_span!("do_get_auth_info"))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -193,8 +193,8 @@ impl NeonControlPlaneClient {
|
||||
|
||||
Ok(rules)
|
||||
}
|
||||
.map_err(crate::error::log_error)
|
||||
.instrument(info_span!("http", id = request_id))
|
||||
.inspect_err(|e| tracing::debug!(error = ?e))
|
||||
.instrument(info_span!("do_get_endpoint_jwks"))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -252,9 +252,8 @@ impl NeonControlPlaneClient {
|
||||
|
||||
Ok(node)
|
||||
}
|
||||
.map_err(crate::error::log_error)
|
||||
// TODO: redo this span stuff
|
||||
.instrument(info_span!("http", id = request_id))
|
||||
.inspect_err(|e| tracing::debug!(error = ?e))
|
||||
.instrument(info_span!("do_wake_compute"))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,12 +10,6 @@ pub(crate) fn io_error(e: impl Into<Box<dyn StdError + Send + Sync>>) -> io::Err
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
}
|
||||
|
||||
/// A small combinator for pluggable error logging.
|
||||
pub(crate) fn log_error<E: fmt::Display>(e: E) -> E {
|
||||
tracing::error!("{e}");
|
||||
e
|
||||
}
|
||||
|
||||
/// Marks errors that may be safely shown to a client.
|
||||
/// This trait can be seen as a specialized version of [`ToString`].
|
||||
///
|
||||
|
||||
@@ -10,7 +10,7 @@ pub(crate) mod wake_compute;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource};
|
||||
use futures::TryFutureExt;
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pq_proto::{BeMessage as Be, StartupMessageParams};
|
||||
@@ -123,42 +123,39 @@ pub async fn task_main(
|
||||
crate::metrics::Protocol::Tcp,
|
||||
&config.region,
|
||||
);
|
||||
let span = ctx.span();
|
||||
|
||||
let startup = Box::pin(
|
||||
handle_client(
|
||||
config,
|
||||
auth_backend,
|
||||
&ctx,
|
||||
cancellation_handler,
|
||||
socket,
|
||||
ClientMode::Tcp,
|
||||
endpoint_rate_limiter2,
|
||||
conn_gauge,
|
||||
)
|
||||
.instrument(span.clone()),
|
||||
);
|
||||
let res = startup.await;
|
||||
let res = handle_client(
|
||||
config,
|
||||
auth_backend,
|
||||
&ctx,
|
||||
cancellation_handler,
|
||||
socket,
|
||||
ClientMode::Tcp,
|
||||
endpoint_rate_limiter2,
|
||||
conn_gauge,
|
||||
)
|
||||
.instrument(ctx.span())
|
||||
.boxed()
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Err(e) => {
|
||||
// todo: log and push to ctx the error kind
|
||||
ctx.set_error_kind(e.get_error_kind());
|
||||
warn!(parent: &span, "per-client task finished with an error: {e:#}");
|
||||
warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
|
||||
}
|
||||
Ok(None) => {
|
||||
ctx.set_success();
|
||||
}
|
||||
Ok(Some(p)) => {
|
||||
ctx.set_success();
|
||||
ctx.log_connect();
|
||||
match p.proxy_pass().instrument(span.clone()).await {
|
||||
let _disconnect = ctx.log_connect();
|
||||
match p.proxy_pass().await {
|
||||
Ok(()) => {}
|
||||
Err(ErrorSource::Client(e)) => {
|
||||
warn!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
|
||||
warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
|
||||
}
|
||||
Err(ErrorSource::Compute(e)) => {
|
||||
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
|
||||
error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -352,6 +349,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
client: stream,
|
||||
aux: node.aux.clone(),
|
||||
compute: node,
|
||||
session_id: ctx.session_id(),
|
||||
_req: request_gauge,
|
||||
_conn: conn_gauge,
|
||||
_cancel: session,
|
||||
|
||||
@@ -59,6 +59,7 @@ pub(crate) struct ProxyPassthrough<P, S> {
|
||||
pub(crate) client: Stream<S>,
|
||||
pub(crate) compute: PostgresConnection,
|
||||
pub(crate) aux: MetricsAuxInfo,
|
||||
pub(crate) session_id: uuid::Uuid,
|
||||
|
||||
pub(crate) _req: NumConnectionRequestsGuard<'static>,
|
||||
pub(crate) _conn: NumClientConnectionsGuard<'static>,
|
||||
@@ -69,7 +70,7 @@ impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
|
||||
pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
|
||||
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
|
||||
if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
|
||||
tracing::warn!(?err, "could not cancel the query in the database");
|
||||
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use tracing::{error, info, warn};
|
||||
use tracing::{error, info};
|
||||
|
||||
use super::connect_compute::ComputeConnectBackend;
|
||||
use crate::config::RetryConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::errors::WakeComputeError;
|
||||
use crate::control_plane::errors::{ControlPlaneError, WakeComputeError};
|
||||
use crate::control_plane::CachedNodeInfo;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{
|
||||
@@ -11,6 +11,18 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::proxy::retry::{retry_after, should_retry};
|
||||
|
||||
// Use macro to retain original callsite.
|
||||
macro_rules! log_wake_compute_error {
|
||||
(error = ?$error:expr, $num_retries:expr, retriable = $retriable:literal) => {
|
||||
match $error {
|
||||
WakeComputeError::ControlPlane(ControlPlaneError::Message(_)) => {
|
||||
info!(error = ?$error, num_retries = $num_retries, retriable = $retriable, "couldn't wake compute node")
|
||||
}
|
||||
_ => error!(error = ?$error, num_retries = $num_retries, retriable = $retriable, "couldn't wake compute node"),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
|
||||
num_retries: &mut u32,
|
||||
ctx: &RequestContext,
|
||||
@@ -20,7 +32,7 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
|
||||
loop {
|
||||
match api.wake_compute(ctx).await {
|
||||
Err(e) if !should_retry(&e, *num_retries, config) => {
|
||||
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
|
||||
log_wake_compute_error!(error = ?e, num_retries, retriable = false);
|
||||
report_error(&e, false);
|
||||
Metrics::get().proxy.retries_metric.observe(
|
||||
RetriesMetricGroup {
|
||||
@@ -32,7 +44,7 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
|
||||
return Err(e);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
|
||||
log_wake_compute_error!(error = ?e, num_retries, retriable = true);
|
||||
report_error(&e, true);
|
||||
}
|
||||
Ok(n) => {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use core::net::IpAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use core::net::IpAddr;
|
||||
use pq_proto::CancelKeyData;
|
||||
use redis::AsyncCommands;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
@@ -28,6 +28,7 @@ hyper0.workspace = true
|
||||
futures.workspace = true
|
||||
once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
pprof.workspace = true
|
||||
@@ -58,6 +59,7 @@ sd-notify.workspace = true
|
||||
storage_broker.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
utils.workspace = true
|
||||
wal_decoder.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -2,11 +2,15 @@
|
||||
//! protocol commands.
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
|
||||
use std::future::Future;
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info, info_span, Instrument};
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::shard::{ShardCount, ShardNumber};
|
||||
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
@@ -35,6 +39,8 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub tenant_id: Option<TenantId>,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard: Option<ShardIdentity>,
|
||||
pub protocol: Option<PostgresClientProtocol>,
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
|
||||
@@ -107,11 +113,21 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
) -> Result<(), QueryError> {
|
||||
if let FeStartupPacket::StartupMessage { params, .. } = sm {
|
||||
if let Some(options) = params.options_raw() {
|
||||
let mut shard_count: Option<u8> = None;
|
||||
let mut shard_number: Option<u8> = None;
|
||||
let mut shard_stripe_size: Option<u32> = None;
|
||||
|
||||
for opt in options {
|
||||
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
|
||||
// remove these after the PR gets deployed:
|
||||
// https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
|
||||
match opt.split_once('=') {
|
||||
Some(("protocol", value)) => {
|
||||
self.protocol =
|
||||
Some(serde_json::from_str(value).with_context(|| {
|
||||
format!("Failed to parse {value} as protocol")
|
||||
})?);
|
||||
}
|
||||
Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
|
||||
self.tenant_id = Some(value.parse().with_context(|| {
|
||||
format!("Failed to parse {value} as tenant id")
|
||||
@@ -127,9 +143,54 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
metrics.set_client_az(client_az)
|
||||
}
|
||||
}
|
||||
Some(("shard_count", value)) => {
|
||||
shard_count = Some(value.parse::<u8>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard count")
|
||||
})?);
|
||||
}
|
||||
Some(("shard_number", value)) => {
|
||||
shard_number = Some(value.parse::<u8>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard number")
|
||||
})?);
|
||||
}
|
||||
Some(("shard_stripe_size", value)) => {
|
||||
shard_stripe_size = Some(value.parse::<u32>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard stripe size")
|
||||
})?);
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
match self.protocol() {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
if shard_count.is_some()
|
||||
|| shard_number.is_some()
|
||||
|| shard_stripe_size.is_some()
|
||||
{
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Shard params specified for vanilla protocol"
|
||||
)));
|
||||
}
|
||||
}
|
||||
PostgresClientProtocol::Interpreted { .. } => {
|
||||
match (shard_count, shard_number, shard_stripe_size) {
|
||||
(Some(count), Some(number), Some(stripe_size)) => {
|
||||
let params = ShardParameters {
|
||||
count: ShardCount(count),
|
||||
stripe_size: ShardStripeSize(stripe_size),
|
||||
};
|
||||
self.shard =
|
||||
Some(ShardIdentity::from_params(ShardNumber(number), ¶ms));
|
||||
}
|
||||
_ => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Shard params were not specified"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
@@ -150,6 +211,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
tracing::field::debug(self.appname.clone()),
|
||||
);
|
||||
|
||||
if let Some(shard) = self.shard.as_ref() {
|
||||
tracing::Span::current()
|
||||
.record("shard", tracing::field::display(shard.shard_slug()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err(QueryError::Other(anyhow::anyhow!(
|
||||
@@ -258,6 +324,8 @@ impl SafekeeperPostgresHandler {
|
||||
tenant_id: None,
|
||||
timeline_id: None,
|
||||
ttid: TenantTimelineId::empty(),
|
||||
shard: None,
|
||||
protocol: None,
|
||||
conn_id,
|
||||
claims: None,
|
||||
auth,
|
||||
@@ -265,6 +333,10 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn protocol(&self) -> PostgresClientProtocol {
|
||||
self.protocol.unwrap_or(PostgresClientProtocol::Vanilla)
|
||||
}
|
||||
|
||||
// when accessing management api supply None as an argument
|
||||
// when using to authorize tenant pass corresponding tenant id
|
||||
fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
|
||||
|
||||
@@ -29,6 +29,7 @@ pub mod receive_wal;
|
||||
pub mod recovery;
|
||||
pub mod remove_wal;
|
||||
pub mod safekeeper;
|
||||
pub mod send_interpreted_wal;
|
||||
pub mod send_wal;
|
||||
pub mod state;
|
||||
pub mod timeline;
|
||||
@@ -38,6 +39,7 @@ pub mod timeline_manager;
|
||||
pub mod timelines_set;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_reader_stream;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use tokio::{
|
||||
use tokio_postgres::replication::ReplicationStream;
|
||||
use tokio_postgres::types::PgLsn;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::{ConnectionConfigArgs, PostgresClientProtocol};
|
||||
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
|
||||
|
||||
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
|
||||
@@ -325,7 +326,17 @@ async fn recovery_stream(
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// TODO: pass auth token
|
||||
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: PostgresClientProtocol::Vanilla,
|
||||
ttid: tli.ttid,
|
||||
shard_number: None,
|
||||
shard_count: None,
|
||||
shard_stripe_size: None,
|
||||
listen_pg_addr_str: &donor.pg_connstr,
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
};
|
||||
let cfg = wal_stream_connection_config(connection_conf_args)?;
|
||||
let mut cfg = cfg.to_tokio_postgres_config();
|
||||
// It will make safekeeper give out not committed WAL (up to flush_lsn).
|
||||
cfg.application_name(&format!("safekeeper_{}", conf.my_id));
|
||||
|
||||
148
safekeeper/src/send_interpreted_wal.rs
Normal file
148
safekeeper/src/send_interpreted_wal.rs
Normal file
@@ -0,0 +1,148 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
|
||||
use postgres_ffi::MAX_SEND_SIZE;
|
||||
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
|
||||
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::postgres_client::Compression;
|
||||
use utils::postgres_client::InterpretedFormat;
|
||||
use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
|
||||
use wal_decoder::wire_format::ToWireFormat;
|
||||
|
||||
use crate::send_wal::EndWatchView;
|
||||
use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
|
||||
|
||||
/// Shard-aware interpreted record sender.
|
||||
/// This is used for sending WAL to the pageserver. Said WAL
|
||||
/// is pre-interpreted and filtered for the shard.
|
||||
pub(crate) struct InterpretedWalSender<'a, IO> {
|
||||
pub(crate) format: InterpretedFormat,
|
||||
pub(crate) compression: Option<Compression>,
|
||||
pub(crate) pgb: &'a mut PostgresBackend<IO>,
|
||||
pub(crate) wal_stream_builder: WalReaderStreamBuilder,
|
||||
pub(crate) end_watch_view: EndWatchView,
|
||||
pub(crate) shard: ShardIdentity,
|
||||
pub(crate) pg_version: u32,
|
||||
pub(crate) appname: Option<String>,
|
||||
}
|
||||
|
||||
struct Batch {
|
||||
wal_end_lsn: Lsn,
|
||||
available_wal_end_lsn: Lsn,
|
||||
records: InterpretedWalRecords,
|
||||
}
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
|
||||
/// Send interpreted WAL to a receiver.
|
||||
/// Stops when an error occurs or the receiver is caught up and there's no active compute.
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let mut wal_position = self.wal_stream_builder.start_pos();
|
||||
let mut wal_decoder =
|
||||
WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
|
||||
|
||||
let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
|
||||
keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
keepalive_ticker.reset();
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<Batch>(2);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Get some WAL from the stream and then: decode, interpret and push it down the
|
||||
// pipeline.
|
||||
wal = stream.next(), if tx.capacity() > 0 => {
|
||||
let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
|
||||
Some(some) => some?,
|
||||
None => { break; }
|
||||
};
|
||||
|
||||
wal_position = wal_end_lsn;
|
||||
wal_decoder.feed_bytes(&wal);
|
||||
|
||||
let mut records = Vec::new();
|
||||
let mut max_next_record_lsn = None;
|
||||
while let Some((next_record_lsn, recdata)) = wal_decoder
|
||||
.poll_decode()
|
||||
.with_context(|| "Failed to decode WAL")?
|
||||
{
|
||||
assert!(next_record_lsn.is_aligned());
|
||||
max_next_record_lsn = Some(next_record_lsn);
|
||||
|
||||
// Deserialize and interpret WAL record
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
&self.shard,
|
||||
next_record_lsn,
|
||||
self.pg_version,
|
||||
)
|
||||
.with_context(|| "Failed to interpret WAL")?;
|
||||
|
||||
if !interpreted.is_empty() {
|
||||
records.push(interpreted);
|
||||
}
|
||||
}
|
||||
|
||||
let batch = InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: max_next_record_lsn
|
||||
};
|
||||
|
||||
tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();
|
||||
},
|
||||
// For a previously interpreted batch, serialize it and push it down the wire.
|
||||
batch = rx.recv() => {
|
||||
let batch = match batch {
|
||||
Some(b) => b,
|
||||
None => { break; }
|
||||
};
|
||||
|
||||
let buf = batch
|
||||
.records
|
||||
.to_wire(self.format, self.compression)
|
||||
.await
|
||||
.with_context(|| "Failed to serialize interpreted WAL")
|
||||
.map_err(CopyStreamHandlerEnd::from)?;
|
||||
|
||||
// Reset the keep alive ticker since we are sending something
|
||||
// over the wire now.
|
||||
keepalive_ticker.reset();
|
||||
|
||||
self.pgb
|
||||
.write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
|
||||
streaming_lsn: batch.wal_end_lsn.0,
|
||||
commit_lsn: batch.available_wal_end_lsn.0,
|
||||
data: &buf,
|
||||
})).await?;
|
||||
}
|
||||
// Send a periodic keep alive when the connection has been idle for a while.
|
||||
_ = keepalive_ticker.tick() => {
|
||||
self.pgb
|
||||
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
wal_end: self.end_watch_view.get().0,
|
||||
timestamp: get_current_timestamp(),
|
||||
request_reply: true,
|
||||
}))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The loop above ends when the receiver is caught up and there's no more WAL to send.
|
||||
Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, wal_position,
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -5,12 +5,15 @@ use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::RECEIVED_PS_FEEDBACKS;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{Term, TermLsn};
|
||||
use crate::send_interpreted_wal::InterpretedWalSender;
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_reader_stream::WalReaderStreamBuilder;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::{bail, Context as AnyhowContext};
|
||||
use bytes::Bytes;
|
||||
use futures::future::Either;
|
||||
use parking_lot::Mutex;
|
||||
use postgres_backend::PostgresBackend;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
|
||||
@@ -22,6 +25,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use utils::failpoint_support;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::net::SocketAddr;
|
||||
@@ -226,7 +230,7 @@ impl WalSenders {
|
||||
|
||||
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
|
||||
/// client is not pageserver.
|
||||
fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
let shared = self.mutex.lock();
|
||||
let slot = shared.get_slot(id);
|
||||
match slot.feedback {
|
||||
@@ -370,6 +374,16 @@ pub struct WalSenderGuard {
|
||||
walsenders: Arc<WalSenders>,
|
||||
}
|
||||
|
||||
impl WalSenderGuard {
|
||||
pub fn id(&self) -> WalSenderId {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub fn walsenders(&self) -> &Arc<WalSenders> {
|
||||
&self.walsenders
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalSenderGuard {
|
||||
fn drop(&mut self) {
|
||||
self.walsenders.unregister(self.id);
|
||||
@@ -440,11 +454,12 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}",
|
||||
start_pos,
|
||||
end_pos,
|
||||
matches!(end_watch, EndWatch::Flush(_)),
|
||||
appname
|
||||
appname,
|
||||
self.protocol(),
|
||||
);
|
||||
|
||||
// switch to copy
|
||||
@@ -456,21 +471,56 @@ impl SafekeeperPostgresHandler {
|
||||
// not synchronized with sends, so this avoids deadlocks.
|
||||
let reader = pgb.split().context("START_REPLICATION split")?;
|
||||
|
||||
let send_fut = match self.protocol() {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
let sender = WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: vec![0u8; MAX_SEND_SIZE],
|
||||
};
|
||||
|
||||
Either::Left(sender.run())
|
||||
}
|
||||
PostgresClientProtocol::Interpreted {
|
||||
format,
|
||||
compression,
|
||||
} => {
|
||||
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
|
||||
let end_watch_view = end_watch.view();
|
||||
let wal_stream_builder = WalReaderStreamBuilder {
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
wal_sender_guard: ws_guard.clone(),
|
||||
};
|
||||
|
||||
let sender = InterpretedWalSender {
|
||||
format,
|
||||
compression,
|
||||
pgb,
|
||||
wal_stream_builder,
|
||||
end_watch_view,
|
||||
shard: self.shard.unwrap(),
|
||||
pg_version,
|
||||
appname,
|
||||
};
|
||||
|
||||
Either::Right(sender.run())
|
||||
}
|
||||
};
|
||||
|
||||
let tli_cancel = tli.cancel.clone();
|
||||
|
||||
let mut sender = WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: vec![0u8; MAX_SEND_SIZE],
|
||||
};
|
||||
let mut reply_reader = ReplyReader {
|
||||
reader,
|
||||
ws_guard: ws_guard.clone(),
|
||||
@@ -479,7 +529,7 @@ impl SafekeeperPostgresHandler {
|
||||
|
||||
let res = tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = sender.run() => r,
|
||||
r = send_fut => r,
|
||||
r = reply_reader.run() => r,
|
||||
_ = tli_cancel.cancelled() => {
|
||||
return Err(CopyStreamHandlerEnd::Cancelled);
|
||||
@@ -504,16 +554,22 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO(vlad): maybe lift this instead
|
||||
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
|
||||
/// given term (recovery by walproposer or peer safekeeper).
|
||||
enum EndWatch {
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum EndWatch {
|
||||
Commit(Receiver<Lsn>),
|
||||
Flush(Receiver<TermLsn>),
|
||||
}
|
||||
|
||||
impl EndWatch {
|
||||
pub(crate) fn view(&self) -> EndWatchView {
|
||||
EndWatchView(self.clone())
|
||||
}
|
||||
|
||||
/// Get current end of WAL.
|
||||
fn get(&self) -> Lsn {
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
match self {
|
||||
EndWatch::Commit(r) => *r.borrow(),
|
||||
EndWatch::Flush(r) => r.borrow().lsn,
|
||||
@@ -521,15 +577,44 @@ impl EndWatch {
|
||||
}
|
||||
|
||||
/// Wait for the update.
|
||||
async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
EndWatch::Commit(r) => r.changed().await?,
|
||||
EndWatch::Flush(r) => r.changed().await?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_lsn(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
client_term: Option<Term>,
|
||||
) -> anyhow::Result<Lsn> {
|
||||
loop {
|
||||
let end_pos = self.get();
|
||||
if end_pos > lsn {
|
||||
return Ok(end_pos);
|
||||
}
|
||||
if let EndWatch::Flush(rx) = &self {
|
||||
let curr_term = rx.borrow().term;
|
||||
if let Some(client_term) = client_term {
|
||||
if curr_term != client_term {
|
||||
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.changed().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct EndWatchView(EndWatch);
|
||||
|
||||
impl EndWatchView {
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
self.0.get()
|
||||
}
|
||||
}
|
||||
/// A half driving sending WAL.
|
||||
struct WalSender<'a, IO> {
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
@@ -566,7 +651,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
// Wait for the next portion if it is not there yet, or just
|
||||
// update our end of WAL available for sending value, we
|
||||
|
||||
149
safekeeper/src/wal_reader_stream.rs
Normal file
149
safekeeper/src/wal_reader_stream.rs
Normal file
@@ -0,0 +1,149 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use postgres_backend::CopyStreamHandlerEnd;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
safekeeper::Term,
|
||||
send_wal::{EndWatch, WalSenderGuard},
|
||||
timeline::WalResidentTimeline,
|
||||
};
|
||||
|
||||
pub(crate) struct WalReaderStreamBuilder {
|
||||
pub(crate) tli: WalResidentTimeline,
|
||||
pub(crate) start_pos: Lsn,
|
||||
pub(crate) end_pos: Lsn,
|
||||
pub(crate) term: Option<Term>,
|
||||
pub(crate) end_watch: EndWatch,
|
||||
pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
pub(crate) fn start_pos(&self) -> Lsn {
|
||||
self.start_pos
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct WalBytes {
|
||||
/// Raw PG WAL
|
||||
pub(crate) wal: Bytes,
|
||||
/// Start LSN of [`Self::wal`]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) wal_start_lsn: Lsn,
|
||||
/// End LSN of [`Self::wal`]
|
||||
pub(crate) wal_end_lsn: Lsn,
|
||||
/// End LSN of WAL available on the safekeeper.
|
||||
///
|
||||
/// For pagservers this will be commit LSN,
|
||||
/// while for the compute it will be the flush LSN.
|
||||
pub(crate) available_wal_end_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
/// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
|
||||
/// The stream terminates when the receiver (pageserver) is fully caught up
|
||||
/// and there's no active computes.
|
||||
pub(crate) async fn build(
|
||||
self,
|
||||
buffer_size: usize,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
|
||||
// TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
|
||||
// We can make the raw WAL sender use this stream too and remove the duplication.
|
||||
let Self {
|
||||
tli,
|
||||
mut start_pos,
|
||||
mut end_pos,
|
||||
term,
|
||||
mut end_watch,
|
||||
wal_sender_guard,
|
||||
} = self;
|
||||
let mut wal_reader = tli.get_walreader(start_pos).await?;
|
||||
let mut buffer = vec![0; buffer_size];
|
||||
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
Ok(try_stream! {
|
||||
loop {
|
||||
let have_something_to_send = end_pos > start_pos;
|
||||
|
||||
if !have_something_to_send {
|
||||
// wait for lsn
|
||||
let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
|
||||
match res {
|
||||
Ok(ok) => {
|
||||
end_pos = ok?;
|
||||
},
|
||||
Err(_) => {
|
||||
if let EndWatch::Commit(_) = end_watch {
|
||||
if let Some(remote_consistent_lsn) = wal_sender_guard
|
||||
.walsenders()
|
||||
.get_ws_remote_consistent_lsn(wal_sender_guard.id())
|
||||
{
|
||||
if tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Stop streaming if the receivers are caught up and
|
||||
// there's no active compute. This causes the loop in
|
||||
// [`crate::send_interpreted_wal::InterpretedWalSender::run`]
|
||||
// to exit and terminate the WAL stream.
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
assert!(
|
||||
end_pos > start_pos,
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
// try to send as much as available, capped by the buffer size
|
||||
let mut chunk_end_pos = start_pos + buffer_size as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= end_pos {
|
||||
chunk_end_pos = end_pos;
|
||||
} else {
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
|
||||
let buffer = &mut buffer[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = term {
|
||||
Some(tli.acquire_term(t).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = wal_reader.read(buffer).await?
|
||||
};
|
||||
let wal = Bytes::copy_from_slice(&buffer[..send_size]);
|
||||
|
||||
yield WalBytes {
|
||||
wal,
|
||||
wal_start_lsn: start_pos,
|
||||
wal_end_lsn: start_pos + send_size as u64,
|
||||
available_wal_end_lsn: end_pos
|
||||
};
|
||||
|
||||
start_pos += send_size as u64;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -168,6 +168,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
"pageserver_aux_file_estimated_size",
|
||||
"pageserver_valid_lsn_lease_count",
|
||||
"pageserver_flush_wait_upload_seconds",
|
||||
counter("pageserver_tenant_throttling_count_accounted_start"),
|
||||
counter("pageserver_tenant_throttling_count_accounted_finish"),
|
||||
counter("pageserver_tenant_throttling_wait_usecs_sum"),
|
||||
|
||||
@@ -310,6 +310,31 @@ class PgProtocol:
|
||||
return self.safe_psql(query, log_query=log_query)[0][0]
|
||||
|
||||
|
||||
class PageserverWalReceiverProtocol(StrEnum):
|
||||
VANILLA = "vanilla"
|
||||
INTERPRETED = "interpreted"
|
||||
|
||||
@staticmethod
|
||||
def to_config_key_value(proto) -> tuple[str, dict[str, Any]]:
|
||||
if proto == PageserverWalReceiverProtocol.VANILLA:
|
||||
return (
|
||||
"wal_receiver_protocol",
|
||||
{
|
||||
"type": "vanilla",
|
||||
},
|
||||
)
|
||||
elif proto == PageserverWalReceiverProtocol.INTERPRETED:
|
||||
return (
|
||||
"wal_receiver_protocol",
|
||||
{
|
||||
"type": "interpreted",
|
||||
"args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
|
||||
},
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown protocol type: {proto}")
|
||||
|
||||
|
||||
class NeonEnvBuilder:
|
||||
"""
|
||||
Builder object to create a Neon runtime environment
|
||||
@@ -356,6 +381,7 @@ class NeonEnvBuilder:
|
||||
safekeeper_extra_opts: list[str] | None = None,
|
||||
storage_controller_port_override: int | None = None,
|
||||
pageserver_virtual_file_io_mode: str | None = None,
|
||||
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -409,6 +435,8 @@ class NeonEnvBuilder:
|
||||
|
||||
self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
|
||||
|
||||
self.pageserver_wal_receiver_protocol = pageserver_wal_receiver_protocol
|
||||
|
||||
assert test_name.startswith(
|
||||
"test_"
|
||||
), "Unexpectedly instantiated from outside a test function"
|
||||
@@ -1023,6 +1051,7 @@ class NeonEnv:
|
||||
|
||||
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
|
||||
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
|
||||
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
|
||||
|
||||
# Create the neon_local's `NeonLocalInitConf`
|
||||
cfg: dict[str, Any] = {
|
||||
@@ -1092,6 +1121,13 @@ class NeonEnv:
|
||||
if self.pageserver_virtual_file_io_mode is not None:
|
||||
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
|
||||
|
||||
if self.pageserver_wal_receiver_protocol is not None:
|
||||
key, value = PageserverWalReceiverProtocol.to_config_key_value(
|
||||
self.pageserver_wal_receiver_protocol
|
||||
)
|
||||
if key not in ps_cfg:
|
||||
ps_cfg[key] = value
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])
|
||||
|
||||
@@ -15,6 +15,7 @@ Some handy pytest flags for local development:
|
||||
- `-k` selects a test to run
|
||||
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
|
||||
- `--preserve-database-files` to skip cleanup
|
||||
- `--out-dir` to produce a JSON with the recorded test metrics
|
||||
|
||||
# What performance tests do we have and how we run them
|
||||
|
||||
@@ -36,6 +37,6 @@ All tests run only once. Usually to obtain more consistent performance numbers,
|
||||
|
||||
## Results collection
|
||||
|
||||
Local test results for main branch, and results of daily performance tests, are stored in a neon project deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
|
||||
Local test results for main branch, and results of daily performance tests, are stored in a [neon project](https://console.neon.tech/app/projects/withered-sky-69117821) deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
|
||||
|
||||
There is also an inconsistency in test naming. Test name should be the same across platforms, and results can be differentiated by the platform field. But currently, platform is sometimes included in test name because of the way how parametrization works in pytest. I.e. there is a platform switch in the dashboard with neon-local-ci and neon-staging variants. I.e. some tests under neon-local-ci value for a platform switch are displayed as `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]` and `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]` which is highly confusing.
|
||||
|
||||
@@ -0,0 +1,307 @@
|
||||
import dataclasses
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
|
||||
from fixtures.utils import humantime_to_ms
|
||||
|
||||
TARGET_RUNTIME = 60
|
||||
|
||||
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/pull/9820#issue-2675856095")
|
||||
@pytest.mark.parametrize(
|
||||
"tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
|
||||
[
|
||||
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
|
||||
(50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
|
||||
(50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
|
||||
(50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
|
||||
# the next 4 cases demonstrate how batchable workloads benefit from batching
|
||||
(50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
|
||||
(50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
|
||||
(50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
|
||||
(50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
|
||||
],
|
||||
)
|
||||
def test_getpage_merge_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
tablesize_mib: int,
|
||||
batch_timeout: str | None,
|
||||
target_runtime: int,
|
||||
effective_io_concurrency: int,
|
||||
readhead_buffer_size: int,
|
||||
name: str,
|
||||
):
|
||||
"""
|
||||
Do a bunch of sequential scans and ensure that the pageserver does some merging.
|
||||
"""
|
||||
|
||||
#
|
||||
# record perf-related parameters as metrics to simplify processing of results
|
||||
#
|
||||
params: dict[str, tuple[float | int, dict[str, Any]]] = {}
|
||||
|
||||
params.update(
|
||||
{
|
||||
"tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
|
||||
"batch_timeout": (
|
||||
-1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout),
|
||||
{"unit": "us"},
|
||||
),
|
||||
# target_runtime is just a polite ask to the workload to run for this long
|
||||
"effective_io_concurrency": (effective_io_concurrency, {}),
|
||||
"readhead_buffer_size": (readhead_buffer_size, {}),
|
||||
# name is not a metric
|
||||
}
|
||||
)
|
||||
|
||||
log.info("params: %s", params)
|
||||
|
||||
for param, (value, kwargs) in params.items():
|
||||
zenbenchmark.record(
|
||||
param,
|
||||
metric_value=value,
|
||||
unit=kwargs.pop("unit", ""),
|
||||
report=MetricReport.TEST_PARAM,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
#
|
||||
# Setup
|
||||
#
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
|
||||
cur.execute(f"SET effective_io_concurrency={effective_io_concurrency}")
|
||||
cur.execute(
|
||||
f"SET neon.readahead_buffer_size={readhead_buffer_size}"
|
||||
) # this is the current default value, but let's hard-code that
|
||||
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
|
||||
|
||||
log.info("Filling the table")
|
||||
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
|
||||
tablesize = tablesize_mib * 1024 * 1024
|
||||
npages = tablesize // (8 * 1024)
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
|
||||
# TODO: can we force postgres to do sequential scans?
|
||||
|
||||
#
|
||||
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
|
||||
#
|
||||
|
||||
@dataclass
|
||||
class Metrics:
|
||||
time: float
|
||||
pageserver_getpage_count: float
|
||||
pageserver_vectored_get_count: float
|
||||
compute_getpage_count: float
|
||||
pageserver_cpu_seconds_total: float
|
||||
|
||||
def __sub__(self, other: "Metrics") -> "Metrics":
|
||||
return Metrics(
|
||||
time=self.time - other.time,
|
||||
pageserver_getpage_count=self.pageserver_getpage_count
|
||||
- other.pageserver_getpage_count,
|
||||
pageserver_vectored_get_count=self.pageserver_vectored_get_count
|
||||
- other.pageserver_vectored_get_count,
|
||||
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
|
||||
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
|
||||
- other.pageserver_cpu_seconds_total,
|
||||
)
|
||||
|
||||
def normalize(self, by) -> "Metrics":
|
||||
return Metrics(
|
||||
time=self.time / by,
|
||||
pageserver_getpage_count=self.pageserver_getpage_count / by,
|
||||
pageserver_vectored_get_count=self.pageserver_vectored_get_count / by,
|
||||
compute_getpage_count=self.compute_getpage_count / by,
|
||||
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
|
||||
)
|
||||
|
||||
def get_metrics() -> Metrics:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"select value from neon_perf_counters where metric='getpage_wait_seconds_count';"
|
||||
)
|
||||
compute_getpage_count = cur.fetchall()[0][0]
|
||||
pageserver_metrics = ps_http.get_metrics()
|
||||
return Metrics(
|
||||
time=time.time(),
|
||||
pageserver_getpage_count=pageserver_metrics.query_one(
|
||||
"pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"}
|
||||
).value,
|
||||
pageserver_vectored_get_count=pageserver_metrics.query_one(
|
||||
"pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"}
|
||||
).value,
|
||||
compute_getpage_count=compute_getpage_count,
|
||||
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
|
||||
"libmetrics_process_cpu_seconds_highres"
|
||||
).value,
|
||||
)
|
||||
|
||||
def workload() -> Metrics:
|
||||
start = time.time()
|
||||
iters = 0
|
||||
while time.time() - start < target_runtime or iters < 2:
|
||||
log.info("Seqscan %d", iters)
|
||||
if iters == 1:
|
||||
# round zero for warming up
|
||||
before = get_metrics()
|
||||
cur.execute(
|
||||
"select clear_buffer_cache()"
|
||||
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
|
||||
cur.execute("select sum(data::bigint) from t")
|
||||
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
|
||||
iters += 1
|
||||
after = get_metrics()
|
||||
return (after - before).normalize(iters - 1)
|
||||
|
||||
env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout})
|
||||
env.pageserver.restart()
|
||||
metrics = workload()
|
||||
|
||||
log.info("Results: %s", metrics)
|
||||
|
||||
#
|
||||
# Sanity-checks on the collected data
|
||||
#
|
||||
# assert that getpage counts roughly match between compute and ps
|
||||
assert metrics.pageserver_getpage_count == pytest.approx(
|
||||
metrics.compute_getpage_count, rel=0.01
|
||||
)
|
||||
|
||||
#
|
||||
# Record the results
|
||||
#
|
||||
|
||||
for metric, value in dataclasses.asdict(metrics).items():
|
||||
zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
|
||||
|
||||
zenbenchmark.record(
|
||||
"perfmetric.batching_factor",
|
||||
metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count,
|
||||
unit="",
|
||||
report=MetricReport.HIGHER_IS_BETTER,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/pull/9820#issue-2675856095")
|
||||
@pytest.mark.parametrize(
|
||||
"batch_timeout", [None, "10us", "20us", "50us", "100us", "200us", "500us", "1ms"]
|
||||
)
|
||||
def test_timer_precision(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
pg_bin: PgBin,
|
||||
batch_timeout: str | None,
|
||||
):
|
||||
"""
|
||||
Determine the batching timeout precision (mean latency) and tail latency impact.
|
||||
|
||||
The baseline is `None`; an ideal batching timeout implementation would increase
|
||||
the mean latency by exactly `batch_timeout`.
|
||||
|
||||
That is not the case with the current implementation, will be addressed in future changes.
|
||||
"""
|
||||
|
||||
#
|
||||
# Setup
|
||||
#
|
||||
|
||||
def patch_ps_config(ps_config):
|
||||
ps_config["server_side_batch_timeout"] = batch_timeout
|
||||
|
||||
neon_env_builder.pageserver_config_override = patch_ps_config
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
|
||||
cur.execute("SET effective_io_concurrency=1")
|
||||
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
|
||||
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
|
||||
|
||||
log.info("Filling the table")
|
||||
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
|
||||
tablesize = 50 * 1024 * 1024
|
||||
npages = tablesize // (8 * 1024)
|
||||
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
|
||||
# TODO: can we force postgres to do sequential scans?
|
||||
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
#
|
||||
# Run single-threaded pagebench (TODO: dedup with other benchmark code)
|
||||
#
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
# https://github.com/neondatabase/neon/issues/6925
|
||||
r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*"
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
cmd = [
|
||||
str(env.neon_binpath / "pagebench"),
|
||||
"get-page-latest-lsn",
|
||||
"--mgmt-api-endpoint",
|
||||
ps_http.base_url,
|
||||
"--page-service-connstring",
|
||||
env.pageserver.connstr(password=None),
|
||||
"--num-clients",
|
||||
"1",
|
||||
"--runtime",
|
||||
"10s",
|
||||
]
|
||||
log.info(f"command: {' '.join(cmd)}")
|
||||
basepath = pg_bin.run_capture(cmd, with_command_header=False)
|
||||
results_path = Path(basepath + ".stdout")
|
||||
log.info(f"Benchmark results at: {results_path}")
|
||||
|
||||
with open(results_path) as f:
|
||||
results = json.load(f)
|
||||
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
|
||||
|
||||
total = results["total"]
|
||||
|
||||
metric = "latency_mean"
|
||||
zenbenchmark.record(
|
||||
metric,
|
||||
metric_value=humantime_to_ms(total[metric]),
|
||||
unit="ms",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
|
||||
metric = "latency_percentiles"
|
||||
for k, v in total[metric].items():
|
||||
zenbenchmark.record(
|
||||
f"{metric}.{k}",
|
||||
metric_value=humantime_to_ms(v),
|
||||
unit="ms",
|
||||
report=MetricReport.LOWER_IS_BETTER,
|
||||
)
|
||||
@@ -103,6 +103,9 @@ def test_compaction_l0_memory(neon_compare: NeonCompare):
|
||||
cur.execute(f"update tbl{i} set j = {j};")
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_checkpoint(
|
||||
tenant_id, timeline_id, compact=False
|
||||
) # ^1: flush all in-memory layers
|
||||
endpoint.stop()
|
||||
|
||||
# Check we have generated the L0 stack we expected
|
||||
@@ -118,7 +121,9 @@ def test_compaction_l0_memory(neon_compare: NeonCompare):
|
||||
return v * 1024
|
||||
|
||||
before = rss_hwm()
|
||||
pageserver_http.timeline_compact(tenant_id, timeline_id)
|
||||
pageserver_http.timeline_compact(
|
||||
tenant_id, timeline_id
|
||||
) # ^1: we must ensure during this process no new L0 layers are flushed
|
||||
after = rss_hwm()
|
||||
|
||||
log.info(f"RSS across compaction: {before} -> {after} (grew {after - before})")
|
||||
@@ -137,7 +142,7 @@ def test_compaction_l0_memory(neon_compare: NeonCompare):
|
||||
# To be fixed in https://github.com/neondatabase/neon/issues/8184, after which
|
||||
# this memory estimate can be revised far downwards to something that doesn't scale
|
||||
# linearly with the layer sizes.
|
||||
MEMORY_ESTIMATE = (initial_l0s_size - final_l0s_size) * 1.5
|
||||
MEMORY_ESTIMATE = (initial_l0s_size - final_l0s_size) * 1.25
|
||||
|
||||
# If we find that compaction is using more memory, this may indicate a regression
|
||||
assert compaction_mapped_rss < MEMORY_ESTIMATE
|
||||
|
||||
114
test_runner/performance/test_ingest_insert_bulk.py
Normal file
114
test_runner/performance/test_ingest_insert_bulk.py
Normal file
@@ -0,0 +1,114 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import random
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("size", [8, 64, 1024, 8192])
|
||||
@pytest.mark.parametrize("backpressure", [True, False])
|
||||
@pytest.mark.parametrize("fsync", [True, False])
|
||||
def test_ingest_insert_bulk(
|
||||
request: pytest.FixtureRequest,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
fsync: bool,
|
||||
backpressure: bool,
|
||||
size: int,
|
||||
):
|
||||
"""
|
||||
Benchmarks ingestion of 8 GB of sequential insert WAL with concurrent inserts.
|
||||
"""
|
||||
|
||||
CONCURRENCY = 1 # 1 is optimal without fsync or backpressure
|
||||
VOLUME = 8 * 1024**3
|
||||
rows = VOLUME // (size + 64) # +64 roughly accounts for per-row WAL overhead
|
||||
|
||||
# Change Direct IO modes
|
||||
neon_env_builder.pageserver_virtual_file_io_mode = "direct"
|
||||
neon_env_builder.safekeepers_enable_fsync = fsync
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# NB: neon_local defaults to max_replication_write_lag=15MB, which is too low.
|
||||
# Production uses 500MB.
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
f"fsync = {fsync}",
|
||||
"max_replication_apply_lag = 0",
|
||||
f"max_replication_flush_lag = {'10GB' if backpressure else '0'}",
|
||||
f"max_replication_write_lag = {'500MB' if backpressure else '0'}",
|
||||
],
|
||||
)
|
||||
endpoint.safe_psql("create extension neon")
|
||||
|
||||
# Wait for the timeline to be propagated to the pageserver.
|
||||
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
# Ingest rows.
|
||||
log.info("Ingesting data")
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
def insert_rows(endpoint, table, count, value):
|
||||
with endpoint.connect().cursor() as cur:
|
||||
cur.execute("set statement_timeout = 0")
|
||||
cur.execute(f"create table {table} (id int, data bytea)")
|
||||
cur.execute(f"insert into {table} values (generate_series(1, {count}), %s)", (value,))
|
||||
|
||||
with zenbenchmark.record_duration("ingest"):
|
||||
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
|
||||
for i in range(CONCURRENCY):
|
||||
# Write a random value for all rows. This is sufficient to prevent compression, e.g.
|
||||
# in TOAST. Randomly generating every row is too slow.
|
||||
value = random.randbytes(size)
|
||||
worker_rows = rows / CONCURRENCY
|
||||
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
|
||||
|
||||
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
client = env.pageserver.http_client()
|
||||
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||
|
||||
backpressure_time = endpoint.safe_psql("select backpressure_throttling_time()")[0][0]
|
||||
|
||||
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
|
||||
# reingest all the WAL directly from the safekeeper. This gives us a baseline of how fast the
|
||||
# pageserver can ingest this WAL in isolation.
|
||||
pg_version = PgVersion(
|
||||
str(client.timeline_detail(env.initial_tenant, env.initial_timeline)["pg_version"])
|
||||
)
|
||||
status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant)
|
||||
assert status is not None
|
||||
|
||||
endpoint.stop() # avoid spurious getpage errors
|
||||
client.tenant_delete(env.initial_tenant)
|
||||
env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0])
|
||||
|
||||
with zenbenchmark.record_duration("recover"):
|
||||
log.info("Recovering WAL into pageserver")
|
||||
client.timeline_create(pg_version, env.initial_tenant, env.initial_timeline)
|
||||
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
|
||||
|
||||
# Emit metrics.
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record("row_count", rows, "rows", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record("concurrency", CONCURRENCY, "clients", MetricReport.TEST_PARAM)
|
||||
zenbenchmark.record(
|
||||
"backpressure_time", backpressure_time // 1000, "ms", MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
|
||||
props = {p["name"]: p["value"] for _, p in request.node.user_properties}
|
||||
for name in ("ingest", "recover"):
|
||||
throughput = int(wal_written_mb / props[name])
|
||||
zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)
|
||||
@@ -60,13 +60,13 @@ def build_pgcopydb_command(pgcopydb_filter_file: Path, test_output_dir: Path):
|
||||
"--no-acl",
|
||||
"--skip-db-properties",
|
||||
"--table-jobs",
|
||||
"4",
|
||||
"8",
|
||||
"--index-jobs",
|
||||
"4",
|
||||
"8",
|
||||
"--restore-jobs",
|
||||
"4",
|
||||
"8",
|
||||
"--split-tables-larger-than",
|
||||
"10GB",
|
||||
"5GB",
|
||||
"--skip-extensions",
|
||||
"--use-copy-binary",
|
||||
"--filters",
|
||||
@@ -136,7 +136,7 @@ def run_command_and_log_output(command, log_file_path: Path):
|
||||
"LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
|
||||
"PGCOPYDB_SOURCE_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")),
|
||||
"PGCOPYDB_TARGET_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")),
|
||||
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
|
||||
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=16",
|
||||
}
|
||||
# Combine the current environment with custom variables
|
||||
env = os.environ.copy()
|
||||
|
||||
@@ -15,21 +15,61 @@ from fixtures.neon_fixtures import (
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
@pytest.mark.parametrize(
|
||||
"wal_receiver_protocol",
|
||||
[
|
||||
"vanilla",
|
||||
"interpreted-bincode-compressed",
|
||||
"interpreted-protobuf-compressed",
|
||||
],
|
||||
)
|
||||
def test_sharded_ingest(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
shard_count: int,
|
||||
wal_receiver_protocol: str,
|
||||
):
|
||||
"""
|
||||
Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
|
||||
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
|
||||
(shard_count=1) to the sharded case indicates the overhead of sharding.
|
||||
"""
|
||||
|
||||
ROW_COUNT = 100_000_000 # about 7 GB of WAL
|
||||
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_env_builder.init_configs()
|
||||
|
||||
for ps in env.pageservers:
|
||||
if wal_receiver_protocol == "vanilla":
|
||||
ps.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"wal_receiver_protocol": {
|
||||
"type": "vanilla",
|
||||
}
|
||||
}
|
||||
)
|
||||
elif wal_receiver_protocol == "interpreted-bincode-compressed":
|
||||
ps.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"wal_receiver_protocol": {
|
||||
"type": "interpreted",
|
||||
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
|
||||
}
|
||||
}
|
||||
)
|
||||
elif wal_receiver_protocol == "interpreted-protobuf-compressed":
|
||||
ps.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"wal_receiver_protocol": {
|
||||
"type": "interpreted",
|
||||
"args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
|
||||
}
|
||||
}
|
||||
)
|
||||
else:
|
||||
raise AssertionError("Test must use explicit wal receiver protocol config")
|
||||
|
||||
env.start()
|
||||
|
||||
# Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure
|
||||
# the storage controller doesn't mess with shard placements.
|
||||
@@ -50,7 +90,6 @@ def test_sharded_ingest(
|
||||
# Start the endpoint.
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
# Ingest data and measure WAL volume and duration.
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -68,4 +107,48 @@ def test_sharded_ingest(
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
|
||||
total_ingested = 0
|
||||
total_records_received = 0
|
||||
ingested_by_ps = []
|
||||
for pageserver in env.pageservers:
|
||||
ingested = pageserver.http_client().get_metric_value(
|
||||
"pageserver_wal_ingest_bytes_received_total"
|
||||
)
|
||||
records_received = pageserver.http_client().get_metric_value(
|
||||
"pageserver_wal_ingest_records_received_total"
|
||||
)
|
||||
|
||||
if ingested is None:
|
||||
ingested = 0
|
||||
|
||||
if records_received is None:
|
||||
records_received = 0
|
||||
|
||||
ingested_by_ps.append(
|
||||
(
|
||||
pageserver.id,
|
||||
{
|
||||
"ingested": ingested,
|
||||
"records_received": records_received,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
total_ingested += int(ingested)
|
||||
total_records_received += int(records_received)
|
||||
|
||||
total_ingested_mb = total_ingested / (1024 * 1024)
|
||||
zenbenchmark.record("wal_ingested", total_ingested_mb, "MB", MetricReport.LOWER_IS_BETTER)
|
||||
zenbenchmark.record(
|
||||
"records_received", total_records_received, "records", MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
|
||||
ingested_by_ps.sort(key=lambda x: x[0])
|
||||
for _, stats in ingested_by_ps:
|
||||
for k in stats:
|
||||
if k != "records_received":
|
||||
stats[k] /= 1024**2
|
||||
|
||||
log.info(f"WAL ingested by each pageserver {ingested_by_ps}")
|
||||
|
||||
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
|
||||
|
||||
@@ -174,6 +174,10 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"lsn_lease_length": "1m",
|
||||
"lsn_lease_length_for_ts": "5s",
|
||||
"timeline_offloading": True,
|
||||
"wal_receiver_protocol_override": {
|
||||
"type": "interpreted",
|
||||
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
|
||||
},
|
||||
}
|
||||
|
||||
vps_http = env.storage_controller.pageserver_api()
|
||||
|
||||
@@ -8,6 +8,7 @@ import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PageserverWalReceiverProtocol,
|
||||
generate_uploads_and_deletions,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
@@ -27,7 +28,13 @@ AGGRESIVE_COMPACTION_TENANT_CONF = {
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize(
|
||||
"wal_receiver_protocol",
|
||||
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
|
||||
)
|
||||
def test_pageserver_compaction_smoke(
|
||||
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
|
||||
):
|
||||
"""
|
||||
This is a smoke test that compaction kicks in. The workload repeatedly churns
|
||||
a small number of rows and manually instructs the pageserver to run compaction
|
||||
@@ -36,6 +43,8 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
observed bounds.
|
||||
"""
|
||||
|
||||
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
|
||||
|
||||
# Effectively disable the page cache to rely only on image layers
|
||||
# to shorten reads.
|
||||
neon_env_builder.pageserver_config_override = """
|
||||
|
||||
@@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_cli import WalCraft
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverWalReceiverProtocol
|
||||
|
||||
# Restart nodes with WAL end having specially crafted shape, like last record
|
||||
# crossing segment boundary, to test decoding issues.
|
||||
@@ -19,7 +19,17 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
"wal_record_crossing_segment_followed_by_small_one",
|
||||
],
|
||||
)
|
||||
def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
|
||||
@pytest.mark.parametrize(
|
||||
"wal_receiver_protocol",
|
||||
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
|
||||
)
|
||||
def test_crafted_wal_end(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
wal_type: str,
|
||||
wal_receiver_protocol: PageserverWalReceiverProtocol,
|
||||
):
|
||||
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
env.create_branch("test_crafted_wal_end")
|
||||
env.pageserver.allowed_errors.extend(
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PageserverWalReceiverProtocol,
|
||||
check_restored_datadir_content,
|
||||
)
|
||||
|
||||
|
||||
# Test subtransactions
|
||||
@@ -9,8 +14,14 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
# maintained in the pageserver, so subtransactions are not very exciting for
|
||||
# Neon. They are included in the commit record though and updated in the
|
||||
# CLOG.
|
||||
def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
@pytest.mark.parametrize(
|
||||
"wal_receiver_protocol",
|
||||
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
|
||||
)
|
||||
def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir, wal_receiver_protocol):
|
||||
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
|
||||
@@ -11,7 +11,13 @@ import pytest
|
||||
import toml
|
||||
from fixtures.common_types import Lsn, TenantId, TimelineId
|
||||
from fixtures.log_helper import getLogger
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
|
||||
from fixtures.neon_fixtures import (
|
||||
Endpoint,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PageserverWalReceiverProtocol,
|
||||
Safekeeper,
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.utils import skip_in_debug_build
|
||||
|
||||
@@ -622,8 +628,15 @@ async def run_segment_init_failure(env: NeonEnv):
|
||||
# Test (injected) failure during WAL segment init.
|
||||
# https://github.com/neondatabase/neon/issues/6401
|
||||
# https://github.com/neondatabase/neon/issues/6402
|
||||
def test_segment_init_failure(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize(
|
||||
"wal_receiver_protocol",
|
||||
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
|
||||
)
|
||||
def test_segment_init_failure(
|
||||
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
|
||||
):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
asyncio.run(run_segment_init_failure(env))
|
||||
|
||||
Reference in New Issue
Block a user