Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-16 18:02:56 +00:00
Merge branch 'main' into yuchen/double-buffered-writer
73
.github/workflows/build-build-tools-image.yml
vendored
@@ -2,6 +2,17 @@ name: Build build-tools image
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
archs:
|
||||
description: "Json array of architectures to build"
|
||||
# Default values are set in `check-image` job, `set-variables` step
|
||||
type: string
|
||||
required: false
|
||||
debians:
|
||||
description: "Json array of Debian versions to build"
|
||||
# Default values are set in `check-image` job, `set-variables` step
|
||||
type: string
|
||||
required: false
|
||||
outputs:
|
||||
image-tag:
|
||||
description: "build-tools tag"
|
||||
@@ -32,25 +43,37 @@ jobs:
|
||||
check-image:
|
||||
runs-on: ubuntu-22.04
|
||||
outputs:
|
||||
tag: ${{ steps.get-build-tools-tag.outputs.image-tag }}
|
||||
found: ${{ steps.check-image.outputs.found }}
|
||||
archs: ${{ steps.set-variables.outputs.archs }}
|
||||
debians: ${{ steps.set-variables.outputs.debians }}
|
||||
tag: ${{ steps.set-variables.outputs.image-tag }}
|
||||
everything: ${{ steps.set-more-variables.outputs.everything }}
|
||||
found: ${{ steps.set-more-variables.outputs.found }}
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- name: Get build-tools image tag for the current commit
|
||||
id: get-build-tools-tag
|
||||
- name: Set variables
|
||||
id: set-variables
|
||||
env:
|
||||
ARCHS: ${{ inputs.archs || '["x64","arm64"]' }}
|
||||
DEBIANS: ${{ inputs.debians || '["bullseye","bookworm"]' }}
|
||||
IMAGE_TAG: |
|
||||
${{ hashFiles('build-tools.Dockerfile',
|
||||
'.github/workflows/build-build-tools-image.yml') }}
|
||||
run: |
|
||||
echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT
|
||||
echo "archs=${ARCHS}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "debians=${DEBIANS}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "image-tag=${IMAGE_TAG}" | tee -a ${GITHUB_OUTPUT}
|
||||
|
||||
- name: Check if such tag found in the registry
|
||||
id: check-image
|
||||
- name: Set more variables
|
||||
id: set-more-variables
|
||||
env:
|
||||
IMAGE_TAG: ${{ steps.get-build-tools-tag.outputs.image-tag }}
|
||||
IMAGE_TAG: ${{ steps.set-variables.outputs.image-tag }}
|
||||
EVERYTHING: |
|
||||
${{ contains(fromJson(steps.set-variables.outputs.archs), 'x64') &&
|
||||
contains(fromJson(steps.set-variables.outputs.archs), 'arm64') &&
|
||||
contains(fromJson(steps.set-variables.outputs.debians), 'bullseye') &&
|
||||
contains(fromJson(steps.set-variables.outputs.debians), 'bookworm') }}
|
||||
run: |
|
||||
if docker manifest inspect neondatabase/build-tools:${IMAGE_TAG}; then
|
||||
found=true
|
||||
@@ -58,8 +81,8 @@ jobs:
|
||||
found=false
|
||||
fi
|
||||
|
||||
echo "found=${found}" | tee -a $GITHUB_OUTPUT
|
||||
|
||||
echo "everything=${EVERYTHING}" | tee -a ${GITHUB_OUTPUT}
|
||||
echo "found=${found}" | tee -a ${GITHUB_OUTPUT}
|
||||
|
||||
build-image:
|
||||
needs: [ check-image ]
|
||||
@@ -67,8 +90,8 @@ jobs:
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
debian-version: [ bullseye, bookworm ]
|
||||
arch: [ x64, arm64 ]
|
||||
arch: ${{ fromJson(needs.check-image.outputs.archs) }}
|
||||
debian: ${{ fromJson(needs.check-image.outputs.debians) }}
|
||||
|
||||
runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }}
|
||||
|
||||
@@ -99,11 +122,11 @@ jobs:
|
||||
push: true
|
||||
pull: true
|
||||
build-args: |
|
||||
DEBIAN_VERSION=${{ matrix.debian-version }}
|
||||
cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.debian-version }}-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0}-{1},mode=max', matrix.debian-version, matrix.arch) || '' }}
|
||||
DEBIAN_VERSION=${{ matrix.debian }}
|
||||
cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.debian }}-${{ matrix.arch }}
|
||||
cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0}-{1},mode=max', matrix.debian, matrix.arch) || '' }}
|
||||
tags: |
|
||||
neondatabase/build-tools:${{ needs.check-image.outputs.tag }}-${{ matrix.debian-version }}-${{ matrix.arch }}
|
||||
neondatabase/build-tools:${{ needs.check-image.outputs.tag }}-${{ matrix.debian }}-${{ matrix.arch }}
|
||||
|
||||
merge-images:
|
||||
needs: [ check-image, build-image ]
|
||||
@@ -118,15 +141,21 @@ jobs:
|
||||
- name: Create multi-arch image
|
||||
env:
|
||||
DEFAULT_DEBIAN_VERSION: bookworm
|
||||
ARCHS: ${{ join(fromJson(needs.check-image.outputs.archs), ' ') }}
|
||||
DEBIANS: ${{ join(fromJson(needs.check-image.outputs.debians), ' ') }}
|
||||
EVERYTHING: ${{ needs.check-image.outputs.everything }}
|
||||
IMAGE_TAG: ${{ needs.check-image.outputs.tag }}
|
||||
run: |
|
||||
for debian_version in bullseye bookworm; do
|
||||
tags=("-t" "neondatabase/build-tools:${IMAGE_TAG}-${debian_version}")
|
||||
if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
|
||||
for debian in ${DEBIANS}; do
|
||||
tags=("-t" "neondatabase/build-tools:${IMAGE_TAG}-${debian}")
|
||||
|
||||
if [ "${EVERYTHING}" == "true" ] && [ "${debian}" == "${DEFAULT_DEBIAN_VERSION}" ]; then
|
||||
tags+=("-t" "neondatabase/build-tools:${IMAGE_TAG}")
|
||||
fi
|
||||
|
||||
docker buildx imagetools create "${tags[@]}" \
|
||||
neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-x64 \
|
||||
neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-arm64
|
||||
for arch in ${ARCHS}; do
|
||||
tags+=("neondatabase/build-tools:${IMAGE_TAG}-${debian}-${arch}")
|
||||
done
|
||||
|
||||
docker buildx imagetools create "${tags[@]}"
|
||||
done
|
||||
|
||||
9
.github/workflows/pre-merge-checks.yml
vendored
@@ -23,6 +23,8 @@ jobs:
|
||||
id: python-src
|
||||
with:
|
||||
files: |
|
||||
.github/workflows/_check-codestyle-python.yml
|
||||
.github/workflows/build-build-tools-image.yml
|
||||
.github/workflows/pre-merge-checks.yml
|
||||
**/**.py
|
||||
poetry.lock
|
||||
@@ -38,6 +40,10 @@ jobs:
|
||||
if: needs.get-changed-files.outputs.python-changed == 'true'
|
||||
needs: [ get-changed-files ]
|
||||
uses: ./.github/workflows/build-build-tools-image.yml
|
||||
with:
|
||||
# Build only one combination to save time
|
||||
archs: '["x64"]'
|
||||
debians: '["bookworm"]'
|
||||
secrets: inherit
|
||||
|
||||
check-codestyle-python:
|
||||
@@ -45,7 +51,8 @@ jobs:
|
||||
needs: [ get-changed-files, build-build-tools-image ]
|
||||
uses: ./.github/workflows/_check-codestyle-python.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
# `-bookworm-x64` suffix should match the combination in `build-build-tools-image`
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm-x64
|
||||
secrets: inherit
|
||||
|
||||
# To get items from the merge queue merged into main we need to satisfy "Status checks that are required".
|
||||
|
||||
14
Cargo.lock
generated
@@ -4133,7 +4133,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4146,7 +4146,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -4165,7 +4165,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -6468,7 +6468,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -7120,10 +7120,16 @@ name = "wal_decoder"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-compression",
|
||||
"bytes",
|
||||
"pageserver_api",
|
||||
"postgres_ffi",
|
||||
"prost",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
|
||||
@@ -58,7 +58,7 @@ use compute_tools::compute::{
|
||||
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
|
||||
};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::get_pg_version;
|
||||
use compute_tools::extension_server::get_pg_version_string;
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
@@ -326,7 +326,7 @@ fn wait_spec(
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
pgversion: get_pg_version(pgbin),
|
||||
pgversion: get_pg_version_string(pgbin),
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
|
||||
@@ -29,6 +29,7 @@ use anyhow::Context;
|
||||
use aws_config::BehaviorVersion;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::Parser;
|
||||
use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
|
||||
use nix::unistd::Pid;
|
||||
use tracing::{info, info_span, warn, Instrument};
|
||||
use utils::fs_ext::is_directory_empty;
|
||||
@@ -131,11 +132,17 @@ pub(crate) async fn main() -> anyhow::Result<()> {
|
||||
//
|
||||
// Initialize pgdata
|
||||
//
|
||||
let pg_version = match get_pg_version(pg_bin_dir.as_str()) {
|
||||
PostgresMajorVersion::V14 => 14,
|
||||
PostgresMajorVersion::V15 => 15,
|
||||
PostgresMajorVersion::V16 => 16,
|
||||
PostgresMajorVersion::V17 => 17,
|
||||
};
|
||||
let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
|
||||
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
|
||||
superuser,
|
||||
locale: "en_US.UTF-8", // XXX: this shouldn't be hard-coded,
|
||||
pg_version: 140000, // XXX: this shouldn't be hard-coded but derived from which compute image we're running in
|
||||
pg_version,
|
||||
initdb_bin: pg_bin_dir.join("initdb").as_ref(),
|
||||
library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
|
||||
pgdata: &pgdata_dir,
|
||||
|
||||
@@ -103,14 +103,33 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String {
        .to_string()
}

pub fn get_pg_version(pgbin: &str) -> String {
pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion {
    // pg_config --version returns a (platform specific) human readable string
    // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
    let human_version = get_pg_config("--version", pgbin);
    parse_pg_version(&human_version).to_string()
    parse_pg_version(&human_version)
}

fn parse_pg_version(human_version: &str) -> &str {
pub fn get_pg_version_string(pgbin: &str) -> String {
    match get_pg_version(pgbin) {
        PostgresMajorVersion::V14 => "v14",
        PostgresMajorVersion::V15 => "v15",
        PostgresMajorVersion::V16 => "v16",
        PostgresMajorVersion::V17 => "v17",
    }
    .to_owned()
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PostgresMajorVersion {
    V14,
    V15,
    V16,
    V17,
}

fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
    use PostgresMajorVersion::*;
    // Normal releases have version strings like "PostgreSQL 15.4". But there
    // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
    // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
@@ -121,10 +140,10 @@ fn parse_pg_version(human_version: &str) -> &str {
        .captures(human_version)
    {
        Some(captures) if captures.len() == 2 => match &captures["major"] {
            "14" => return "v14",
            "15" => return "v15",
            "16" => return "v16",
            "17" => return "v17",
            "14" => return V14,
            "15" => return V15,
            "16" => return V16,
            "17" => return V17,
            _ => {}
        },
        _ => {}
@@ -263,24 +282,25 @@ mod tests {

    #[test]
    fn test_parse_pg_version() {
        assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
        assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
        use super::PostgresMajorVersion::*;
        assert_eq!(parse_pg_version("PostgreSQL 15.4"), V15);
        assert_eq!(parse_pg_version("PostgreSQL 15.14"), V15);
        assert_eq!(
            parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
            "v15"
            V15
        );

        assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
        assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
        assert_eq!(parse_pg_version("PostgreSQL 14.15"), V14);
        assert_eq!(parse_pg_version("PostgreSQL 14.0"), V14);
        assert_eq!(
            parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
            "v14"
            V14
        );

        assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
        assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
        assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
        assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
        assert_eq!(parse_pg_version("PostgreSQL 16devel"), V16);
        assert_eq!(parse_pg_version("PostgreSQL 16beta1"), V16);
        assert_eq!(parse_pg_version("PostgreSQL 16rc2"), V16);
        assert_eq!(parse_pg_version("PostgreSQL 16extra"), V16);
    }

    #[test]
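With the string return type replaced by the PostgresMajorVersion enum, callers that need a number (such as the match in compute_tools shown earlier, which maps the enum to 14..17 before calling initdb) could centralize that mapping. A hypothetical helper for illustration only, not part of this commit:

impl PostgresMajorVersion {
    // Numeric major version, mirroring the match used before do_run_initdb.
    pub fn major(self) -> u32 {
        match self {
            Self::V14 => 14,
            Self::V15 => 15,
            Self::V16 => 16,
            Self::V17 => 17,
        }
    }
}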
@@ -415,6 +415,11 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'timeline_offloading' as bool")?,
|
||||
wal_receiver_protocol_override: settings
|
||||
.remove("wal_receiver_protocol_override")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("parse `wal_receiver_protocol_override` from json")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -278,6 +278,8 @@ pub struct TenantConfigToml {
|
||||
/// Enable auto-offloading of timelines.
|
||||
/// (either this flag or the pageserver-global one need to be set)
|
||||
pub timeline_offloading: bool,
|
||||
|
||||
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
@@ -510,6 +512,7 @@ impl Default for TenantConfigToml {
|
||||
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
|
||||
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
|
||||
timeline_offloading: false,
|
||||
wal_receiver_protocol_override: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,6 +229,18 @@ impl Key {
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactKey {
|
||||
pub fn raw(&self) -> i128 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<i128> for CompactKey {
|
||||
fn from(value: i128) -> Self {
|
||||
Self(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Key {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
|
||||
@@ -23,6 +23,7 @@ use utils::{
|
||||
completion,
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
postgres_client::PostgresClientProtocol,
|
||||
serde_system_time,
|
||||
};
|
||||
|
||||
@@ -352,6 +353,7 @@ pub struct TenantConfig {
|
||||
pub lsn_lease_length: Option<String>,
|
||||
pub lsn_lease_length_for_ts: Option<String>,
|
||||
pub timeline_offloading: Option<bool>,
|
||||
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
|
||||
}
|
||||
|
||||
/// The policy for the aux file storage.
|
||||
|
||||
@@ -688,9 +688,6 @@ pub struct InterpretedWalRecordsBody<'a> {
|
||||
pub streaming_lsn: u64,
|
||||
/// Current end of WAL on the server
|
||||
pub commit_lsn: u64,
|
||||
/// Start LSN of the next record in PG WAL.
|
||||
/// Is 0 if the portion of PG WAL did not contain any records.
|
||||
pub next_record_lsn: u64,
|
||||
pub data: &'a [u8],
|
||||
}
|
||||
|
||||
@@ -1028,7 +1025,6 @@ impl BeMessage<'_> {
|
||||
// dependency
|
||||
buf.put_u64(rec.streaming_lsn);
|
||||
buf.put_u64(rec.commit_lsn);
|
||||
buf.put_u64(rec.next_record_lsn);
|
||||
buf.put_slice(rec.data);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -176,7 +176,9 @@ pub(crate) struct BucketMetrics {
|
||||
|
||||
impl Default for BucketMetrics {
|
||||
fn default() -> Self {
|
||||
let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
|
||||
// first bucket 100 microseconds to count requests that do not need to wait at all
|
||||
// and get a permit immediately
|
||||
let buckets = [0.0001, 0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
|
||||
|
||||
let req_seconds = register_histogram_vec!(
|
||||
"remote_storage_s3_request_seconds",
|
||||
|
||||
@@ -7,40 +7,31 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};

use crate::id::TenantTimelineId;

/// Postgres client protocol types
#[derive(
    Copy,
    Clone,
    PartialEq,
    Eq,
    strum_macros::EnumString,
    strum_macros::Display,
    serde_with::DeserializeFromStr,
    serde_with::SerializeDisplay,
    Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
#[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum InterpretedFormat {
    Bincode,
    Protobuf,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum Compression {
    Zstd { level: i8 },
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "args")]
#[serde(rename_all = "kebab-case")]
pub enum PostgresClientProtocol {
    /// Usual Postgres replication protocol
    Vanilla,
    /// Custom shard-aware protocol that replicates interpreted records.
    /// Used to send wal from safekeeper to pageserver.
    Interpreted,
}

impl TryFrom<u8> for PostgresClientProtocol {
    type Error = u8;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        Ok(match value {
            v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
            v if v == (PostgresClientProtocol::Interpreted as u8) => {
                PostgresClientProtocol::Interpreted
            }
            x => return Err(x),
        })
    }
    Interpreted {
        format: InterpretedFormat,
        compression: Option<Compression>,
    },
}

pub struct ConnectionConfigArgs<'a> {
@@ -63,7 +54,10 @@ impl<'a> ConnectionConfigArgs<'a> {
        "-c".to_owned(),
        format!("timeline_id={}", self.ttid.timeline_id),
        format!("tenant_id={}", self.ttid.tenant_id),
        format!("protocol={}", self.protocol as u8),
        format!(
            "protocol={}",
            serde_json::to_string(&self.protocol).unwrap()
        ),
    ];

    if self.shard_number.is_some() {
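Since the protocol connection parameter now carries the JSON form of PostgresClientProtocol instead of a bare integer, it may help to see what the adjacently tagged, kebab-case representation looks like. A hedged sketch, assuming serde_json is available (as the connection-string code above already uses it); the exact strings are my reading of the serde attributes, not something the commit asserts:

use utils::postgres_client::{Compression, InterpretedFormat, PostgresClientProtocol};

fn main() {
    let proto = PostgresClientProtocol::Interpreted {
        format: InterpretedFormat::Protobuf,
        compression: Some(Compression::Zstd { level: 1 }),
    };
    // Expected to print roughly:
    // {"type":"interpreted","args":{"format":"protobuf","compression":{"zstd":{"level":1}}}}
    println!("{}", serde_json::to_string(&proto).unwrap());
    // The plain replication protocol stays small: {"type":"vanilla"}
    println!(
        "{}",
        serde_json::to_string(&PostgresClientProtocol::Vanilla).unwrap()
    );
}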
@@ -8,11 +8,19 @@ license.workspace = true
|
||||
testing = ["pageserver_api/testing"]
|
||||
|
||||
[dependencies]
|
||||
async-compression.workspace = true
|
||||
anyhow.workspace = true
|
||||
bytes.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
prost.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["io-util"] }
|
||||
tonic.workspace = true
|
||||
tracing.workspace = true
|
||||
utils.workspace = true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build.workspace = true
|
||||
|
||||
11
libs/wal_decoder/build.rs
Normal file
@@ -0,0 +1,11 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Generate rust code from .proto protobuf.
    //
    // Note: we previously tried to use deterministic location at proto/ for
    // easy location, but apparently interference with cachepot sometimes fails
    // the build then. Anyway, per cargo docs build script shouldn't output to
    // anywhere but $OUT_DIR.
    tonic_build::compile_protos("proto/interpreted_wal.proto")
        .unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
    Ok(())
}
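For reference, the generated bindings are consumed from $OUT_DIR at compile time; tonic::include_proto! expands to an include! of the file prost wrote there. A minimal sketch of the consuming side, mirroring the proto module added to libs/wal_decoder/src/models.rs later in this diff:

// Sketch: pulling in the code generated by the build script above.
// "interpreted_wal" must match the protobuf package name.
pub mod proto {
    tonic::include_proto!("interpreted_wal");
}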
43
libs/wal_decoder/proto/interpreted_wal.proto
Normal file
@@ -0,0 +1,43 @@
syntax = "proto3";

package interpreted_wal;

message InterpretedWalRecords {
    repeated InterpretedWalRecord records = 1;
    optional uint64 next_record_lsn = 2;
}

message InterpretedWalRecord {
    optional bytes metadata_record = 1;
    SerializedValueBatch batch = 2;
    uint64 next_record_lsn = 3;
    bool flush_uncommitted = 4;
    uint32 xid = 5;
}

message SerializedValueBatch {
    bytes raw = 1;
    repeated ValueMeta metadata = 2;
    uint64 max_lsn = 3;
    uint64 len = 4;
}

enum ValueMetaType {
    Serialized = 0;
    Observed = 1;
}

message ValueMeta {
    ValueMetaType type = 1;
    CompactKey key = 2;
    uint64 lsn = 3;
    optional uint64 batch_offset = 4;
    optional uint64 len = 5;
    optional bool will_init = 6;
}

message CompactKey {
    int64 high = 1;
    int64 low = 2;
}
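The CompactKey message above stores the pageserver's 128-bit compact key as two signed 64-bit halves. A minimal sketch of the round trip, my own illustration rather than part of the commit (the real conversions are the From impls in libs/wal_decoder/src/wire_format.rs); masking the low half avoids sign-extension surprises when rejoining:

// Sketch: splitting an i128 key into the (high, low) halves above and joining them back.
fn split(raw: i128) -> (i64, i64) {
    ((raw >> 64) as i64, raw as i64)
}

fn join(high: i64, low: i64) -> i128 {
    // Go through u64 so the low half's sign bit does not bleed into the high half.
    ((high as i128) << 64) | (low as u64 as i128)
}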
@@ -1,3 +1,4 @@
pub mod decoder;
pub mod models;
pub mod serialized_batch;
pub mod wire_format;

@@ -37,12 +37,32 @@ use utils::lsn::Lsn;

use crate::serialized_batch::SerializedValueBatch;

// Code generated by protobuf.
pub mod proto {
    // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]`
    // we don't use these types for anything but broker data transmission,
    // so it's ok to ignore this one.
    #![allow(clippy::derive_partial_eq_without_eq)]
    // The generated ValueMeta has a `len` method generated for its `len` field.
    #![allow(clippy::len_without_is_empty)]
    tonic::include_proto!("interpreted_wal");
}

#[derive(Serialize, Deserialize)]
pub enum FlushUncommittedRecords {
    Yes,
    No,
}

/// A batch of interpreted WAL records
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecords {
    pub records: Vec<InterpretedWalRecord>,
    // Start LSN of the next record after the batch.
    // Note that said record may not belong to the current shard.
    pub next_record_lsn: Option<Lsn>,
}

/// An interpreted Postgres WAL record, ready to be handled by the pageserver
#[derive(Serialize, Deserialize)]
pub struct InterpretedWalRecord {
356
libs/wal_decoder/src/wire_format.rs
Normal file
@@ -0,0 +1,356 @@
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use pageserver_api::key::CompactKey;
|
||||
use prost::{DecodeError, EncodeError, Message};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use utils::bin_ser::{BeSer, DeserializeError, SerializeError};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::postgres_client::{Compression, InterpretedFormat};
|
||||
|
||||
use crate::models::{
|
||||
FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
|
||||
};
|
||||
|
||||
use crate::serialized_batch::{
|
||||
ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
|
||||
};
|
||||
|
||||
use crate::models::proto;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ToWireFormatError {
|
||||
#[error("{0}")]
|
||||
Bincode(#[from] SerializeError),
|
||||
#[error("{0}")]
|
||||
Protobuf(#[from] ProtobufSerializeError),
|
||||
#[error("{0}")]
|
||||
Compression(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProtobufSerializeError {
|
||||
#[error("{0}")]
|
||||
MetadataRecord(#[from] SerializeError),
|
||||
#[error("{0}")]
|
||||
Encode(#[from] EncodeError),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FromWireFormatError {
|
||||
#[error("{0}")]
|
||||
Bincode(#[from] DeserializeError),
|
||||
#[error("{0}")]
|
||||
Protobuf(#[from] ProtobufDeserializeError),
|
||||
#[error("{0}")]
|
||||
Decompress(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProtobufDeserializeError {
|
||||
#[error("{0}")]
|
||||
Transcode(#[from] TranscodeError),
|
||||
#[error("{0}")]
|
||||
Decode(#[from] DecodeError),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TranscodeError {
|
||||
#[error("{0}")]
|
||||
BadInput(String),
|
||||
#[error("{0}")]
|
||||
MetadataRecord(#[from] DeserializeError),
|
||||
}
|
||||
|
||||
pub trait ToWireFormat {
|
||||
fn to_wire(
|
||||
self,
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
) -> impl std::future::Future<Output = Result<Bytes, ToWireFormatError>> + Send;
|
||||
}
|
||||
|
||||
pub trait FromWireFormat {
|
||||
type T;
|
||||
fn from_wire(
|
||||
buf: &Bytes,
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
) -> impl std::future::Future<Output = Result<Self::T, FromWireFormatError>> + Send;
|
||||
}
|
||||
|
||||
impl ToWireFormat for InterpretedWalRecords {
|
||||
async fn to_wire(
|
||||
self,
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
) -> Result<Bytes, ToWireFormatError> {
|
||||
use async_compression::tokio::write::ZstdEncoder;
|
||||
use async_compression::Level;
|
||||
|
||||
let encode_res: Result<Bytes, ToWireFormatError> = match format {
|
||||
InterpretedFormat::Bincode => {
|
||||
let buf = BytesMut::new();
|
||||
let mut buf = buf.writer();
|
||||
self.ser_into(&mut buf)?;
|
||||
Ok(buf.into_inner().freeze())
|
||||
}
|
||||
InterpretedFormat::Protobuf => {
|
||||
let proto: proto::InterpretedWalRecords = self.try_into()?;
|
||||
let mut buf = BytesMut::new();
|
||||
proto
|
||||
.encode(&mut buf)
|
||||
.map_err(|e| ToWireFormatError::Protobuf(e.into()))?;
|
||||
|
||||
Ok(buf.freeze())
|
||||
}
|
||||
};
|
||||
|
||||
let buf = encode_res?;
|
||||
let compressed_buf = match compression {
|
||||
Some(Compression::Zstd { level }) => {
|
||||
let mut encoder = ZstdEncoder::with_quality(
|
||||
Vec::with_capacity(buf.len() / 4),
|
||||
Level::Precise(level as i32),
|
||||
);
|
||||
encoder.write_all(&buf).await?;
|
||||
encoder.shutdown().await?;
|
||||
Bytes::from(encoder.into_inner())
|
||||
}
|
||||
None => buf,
|
||||
};
|
||||
|
||||
Ok(compressed_buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromWireFormat for InterpretedWalRecords {
|
||||
type T = Self;
|
||||
|
||||
async fn from_wire(
|
||||
buf: &Bytes,
|
||||
format: InterpretedFormat,
|
||||
compression: Option<Compression>,
|
||||
) -> Result<Self, FromWireFormatError> {
|
||||
let decompressed_buf = match compression {
|
||||
Some(Compression::Zstd { .. }) => {
|
||||
use async_compression::tokio::write::ZstdDecoder;
|
||||
let mut decoded_buf = Vec::with_capacity(buf.len());
|
||||
let mut decoder = ZstdDecoder::new(&mut decoded_buf);
|
||||
decoder.write_all(buf).await?;
|
||||
decoder.flush().await?;
|
||||
Bytes::from(decoded_buf)
|
||||
}
|
||||
None => buf.clone(),
|
||||
};
|
||||
|
||||
match format {
|
||||
InterpretedFormat::Bincode => {
|
||||
InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode)
|
||||
}
|
||||
InterpretedFormat::Protobuf => {
|
||||
let proto = proto::InterpretedWalRecords::decode(decompressed_buf)
|
||||
.map_err(|e| FromWireFormatError::Protobuf(e.into()))?;
|
||||
InterpretedWalRecords::try_from(proto)
|
||||
.map_err(|e| FromWireFormatError::Protobuf(e.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
|
||||
type Error = SerializeError;
|
||||
|
||||
fn try_from(value: InterpretedWalRecords) -> Result<Self, Self::Error> {
|
||||
let records = value
|
||||
.records
|
||||
.into_iter()
|
||||
.map(proto::InterpretedWalRecord::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
Ok(proto::InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: value.next_record_lsn.map(|l| l.0),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<InterpretedWalRecord> for proto::InterpretedWalRecord {
|
||||
type Error = SerializeError;
|
||||
|
||||
fn try_from(value: InterpretedWalRecord) -> Result<Self, Self::Error> {
|
||||
let metadata_record = value
|
||||
.metadata_record
|
||||
.map(|meta_rec| -> Result<Vec<u8>, Self::Error> {
|
||||
let mut buf = Vec::new();
|
||||
meta_rec.ser_into(&mut buf)?;
|
||||
Ok(buf)
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
Ok(proto::InterpretedWalRecord {
|
||||
metadata_record,
|
||||
batch: Some(proto::SerializedValueBatch::from(value.batch)),
|
||||
next_record_lsn: value.next_record_lsn.0,
|
||||
flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes),
|
||||
xid: value.xid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SerializedValueBatch> for proto::SerializedValueBatch {
|
||||
fn from(value: SerializedValueBatch) -> Self {
|
||||
proto::SerializedValueBatch {
|
||||
raw: value.raw,
|
||||
metadata: value
|
||||
.metadata
|
||||
.into_iter()
|
||||
.map(proto::ValueMeta::from)
|
||||
.collect(),
|
||||
max_lsn: value.max_lsn.0,
|
||||
len: value.len as u64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ValueMeta> for proto::ValueMeta {
|
||||
fn from(value: ValueMeta) -> Self {
|
||||
match value {
|
||||
ValueMeta::Observed(obs) => proto::ValueMeta {
|
||||
r#type: proto::ValueMetaType::Observed.into(),
|
||||
key: Some(proto::CompactKey::from(obs.key)),
|
||||
lsn: obs.lsn.0,
|
||||
batch_offset: None,
|
||||
len: None,
|
||||
will_init: None,
|
||||
},
|
||||
ValueMeta::Serialized(ser) => proto::ValueMeta {
|
||||
r#type: proto::ValueMetaType::Serialized.into(),
|
||||
key: Some(proto::CompactKey::from(ser.key)),
|
||||
lsn: ser.lsn.0,
|
||||
batch_offset: Some(ser.batch_offset),
|
||||
len: Some(ser.len as u64),
|
||||
will_init: Some(ser.will_init),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CompactKey> for proto::CompactKey {
|
||||
fn from(value: CompactKey) -> Self {
|
||||
proto::CompactKey {
|
||||
high: (value.raw() >> 64) as i64,
|
||||
low: value.raw() as i64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
|
||||
type Error = TranscodeError;
|
||||
|
||||
fn try_from(value: proto::InterpretedWalRecords) -> Result<Self, Self::Error> {
|
||||
let records = value
|
||||
.records
|
||||
.into_iter()
|
||||
.map(InterpretedWalRecord::try_from)
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
||||
Ok(InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn: value.next_record_lsn.map(Lsn::from),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::InterpretedWalRecord> for InterpretedWalRecord {
|
||||
type Error = TranscodeError;
|
||||
|
||||
fn try_from(value: proto::InterpretedWalRecord) -> Result<Self, Self::Error> {
|
||||
let metadata_record = value
|
||||
.metadata_record
|
||||
.map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) })
|
||||
.transpose()?;
|
||||
|
||||
let batch = {
|
||||
let batch = value.batch.ok_or_else(|| {
|
||||
TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string())
|
||||
})?;
|
||||
|
||||
SerializedValueBatch::try_from(batch)?
|
||||
};
|
||||
|
||||
Ok(InterpretedWalRecord {
|
||||
metadata_record,
|
||||
batch,
|
||||
next_record_lsn: Lsn(value.next_record_lsn),
|
||||
flush_uncommitted: if value.flush_uncommitted {
|
||||
FlushUncommittedRecords::Yes
|
||||
} else {
|
||||
FlushUncommittedRecords::No
|
||||
},
|
||||
xid: value.xid,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::SerializedValueBatch> for SerializedValueBatch {
|
||||
type Error = TranscodeError;
|
||||
|
||||
fn try_from(value: proto::SerializedValueBatch) -> Result<Self, Self::Error> {
|
||||
let metadata = value
|
||||
.metadata
|
||||
.into_iter()
|
||||
.map(ValueMeta::try_from)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
Ok(SerializedValueBatch {
|
||||
raw: value.raw,
|
||||
metadata,
|
||||
max_lsn: Lsn(value.max_lsn),
|
||||
len: value.len as usize,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<proto::ValueMeta> for ValueMeta {
|
||||
type Error = TranscodeError;
|
||||
|
||||
fn try_from(value: proto::ValueMeta) -> Result<Self, Self::Error> {
|
||||
match proto::ValueMetaType::try_from(value.r#type) {
|
||||
Ok(proto::ValueMetaType::Serialized) => {
|
||||
Ok(ValueMeta::Serialized(SerializedValueMeta {
|
||||
key: value
|
||||
.key
|
||||
.ok_or_else(|| {
|
||||
TranscodeError::BadInput("ValueMeta::key missing".to_string())
|
||||
})?
|
||||
.into(),
|
||||
lsn: Lsn(value.lsn),
|
||||
batch_offset: value.batch_offset.ok_or_else(|| {
|
||||
TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string())
|
||||
})?,
|
||||
len: value.len.ok_or_else(|| {
|
||||
TranscodeError::BadInput("ValueMeta::len missing".to_string())
|
||||
})? as usize,
|
||||
will_init: value.will_init.ok_or_else(|| {
|
||||
TranscodeError::BadInput("ValueMeta::will_init missing".to_string())
|
||||
})?,
|
||||
}))
|
||||
}
|
||||
Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta {
|
||||
key: value
|
||||
.key
|
||||
.ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))?
|
||||
.into(),
|
||||
lsn: Lsn(value.lsn),
|
||||
})),
|
||||
Err(_) => Err(TranscodeError::BadInput(format!(
|
||||
"Unexpected ValueMeta::type {}",
|
||||
value.r#type
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<proto::CompactKey> for CompactKey {
|
||||
fn from(value: proto::CompactKey) -> Self {
|
||||
(((value.high as i128) << 64) | (value.low as i128)).into()
|
||||
}
|
||||
}
|
||||
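Taken together, the ToWireFormat and FromWireFormat traits above give the safekeeper and pageserver a symmetric encode/decode path. A rough usage sketch, assuming an async context and an InterpretedWalRecords value named records; sender and receiver must agree on format and compression, which is what the serialized protocol connection parameter carries:

use utils::postgres_client::{Compression, InterpretedFormat};
use wal_decoder::models::InterpretedWalRecords;
use wal_decoder::wire_format::{FromWireFormat, ToWireFormat};

async fn round_trip(records: InterpretedWalRecords) -> anyhow::Result<InterpretedWalRecords> {
    let format = InterpretedFormat::Protobuf;
    let compression = Some(Compression::Zstd { level: 1 });

    // Sender side (safekeeper): protobuf-encode, then zstd-compress.
    let wire = records.to_wire(format, compression).await?;

    // Receiver side (pageserver): decompress, then decode back into the model type.
    let decoded = InterpretedWalRecords::from_wire(&wire, format, compression).await?;
    Ok(decoded)
}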
@@ -3,7 +3,7 @@ use metrics::{
|
||||
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
|
||||
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
|
||||
register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
|
||||
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
|
||||
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
|
||||
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -457,6 +457,15 @@ pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
|
||||
register_gauge_vec!(
|
||||
"pageserver_flush_wait_upload_seconds",
|
||||
"Time spent waiting for preceding uploads during layer flush",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_last_record_lsn",
|
||||
@@ -653,6 +662,35 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_relsize_cache_entries",
|
||||
"Number of entries in the relation size cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses",
|
||||
"Relation size cache misses",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES_OLD: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses_old",
|
||||
"Relation size cache misses where the lookup LSN is older than the last relation update"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) mod initial_logical_size {
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -2336,6 +2374,7 @@ pub(crate) struct TimelineMetrics {
|
||||
shard_id: String,
|
||||
timeline_id: String,
|
||||
pub flush_time_histo: StorageTimeMetrics,
|
||||
pub flush_wait_upload_time_gauge: Gauge,
|
||||
pub compact_time_histo: StorageTimeMetrics,
|
||||
pub create_images_time_histo: StorageTimeMetrics,
|
||||
pub logical_size_histo: StorageTimeMetrics,
|
||||
@@ -2379,6 +2418,9 @@ impl TimelineMetrics {
|
||||
&shard_id,
|
||||
&timeline_id,
|
||||
);
|
||||
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
let compact_time_histo = StorageTimeMetrics::new(
|
||||
StorageTimeOperation::Compact,
|
||||
&tenant_id,
|
||||
@@ -2516,6 +2558,7 @@ impl TimelineMetrics {
|
||||
shard_id,
|
||||
timeline_id,
|
||||
flush_time_histo,
|
||||
flush_wait_upload_time_gauge,
|
||||
compact_time_histo,
|
||||
create_images_time_histo,
|
||||
logical_size_histo,
|
||||
@@ -2563,6 +2606,14 @@ impl TimelineMetrics {
|
||||
self.resident_physical_size_gauge.get()
|
||||
}
|
||||
|
||||
pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
|
||||
self.flush_wait_upload_time_gauge.add(duration);
|
||||
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
|
||||
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
|
||||
.unwrap()
|
||||
.add(duration);
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown(&self) {
|
||||
let was_shutdown = self
|
||||
.shutdown
|
||||
@@ -2579,6 +2630,7 @@ impl TimelineMetrics {
|
||||
let timeline_id = &self.timeline_id;
|
||||
let shard_id = &self.shard_id;
|
||||
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
|
||||
{
|
||||
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
|
||||
|
||||
@@ -10,6 +10,9 @@ use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
};
|
||||
use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
@@ -1129,9 +1132,12 @@ impl Timeline {
|
||||
let rel_size_cache = self.rel_size_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
RELSIZE_CACHE_HITS.inc();
|
||||
return Some(*nblocks);
|
||||
}
|
||||
RELSIZE_CACHE_MISSES_OLD.inc();
|
||||
}
|
||||
RELSIZE_CACHE_MISSES.inc();
|
||||
None
|
||||
}
|
||||
|
||||
@@ -1156,6 +1162,7 @@ impl Timeline {
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1163,13 +1170,17 @@ impl Timeline {
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
rel_size_cache.map.insert(tag, (lsn, nblocks));
|
||||
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove cached relation size
|
||||
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
rel_size_cache.map.remove(tag);
|
||||
if rel_size_cache.map.remove(tag).is_some() {
|
||||
RELSIZE_CACHE_ENTRIES.dec();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5344,6 +5344,7 @@ pub(crate) mod harness {
|
||||
lsn_lease_length: Some(tenant_conf.lsn_lease_length),
|
||||
lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
|
||||
timeline_offloading: Some(tenant_conf.timeline_offloading),
|
||||
wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use serde_json::Value;
|
||||
use std::num::NonZeroU64;
|
||||
use std::time::Duration;
|
||||
use utils::generation::Generation;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(crate) enum AttachmentMode {
|
||||
@@ -353,6 +354,9 @@ pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub timeline_offloading: Option<bool>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -418,6 +422,9 @@ impl TenantConfOpt {
|
||||
timeline_offloading: self
|
||||
.lazy_slru_download
|
||||
.unwrap_or(global_conf.timeline_offloading),
|
||||
wal_receiver_protocol_override: self
|
||||
.wal_receiver_protocol_override
|
||||
.or(global_conf.wal_receiver_protocol_override),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -472,6 +479,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
lsn_lease_length: value.lsn_lease_length.map(humantime),
|
||||
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
|
||||
timeline_offloading: value.timeline_offloading,
|
||||
wal_receiver_protocol_override: value.wal_receiver_protocol_override,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{
|
||||
fs_ext, pausable_failpoint,
|
||||
postgres_client::PostgresClientProtocol,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
};
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
@@ -2178,6 +2179,21 @@ impl Timeline {
|
||||
)
|
||||
}
|
||||
|
||||
/// Resolve the effective WAL receiver protocol to use for this tenant.
|
||||
///
|
||||
/// Priority order is:
|
||||
/// 1. Tenant config override
|
||||
/// 2. Default value for tenant config override
|
||||
/// 3. Pageserver config override
|
||||
/// 4. Pageserver config default
|
||||
pub fn resolve_wal_receiver_protocol(&self) -> PostgresClientProtocol {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
.wal_receiver_protocol_override
|
||||
.or(self.conf.default_tenant_conf.wal_receiver_protocol_override)
|
||||
.unwrap_or(self.conf.wal_receiver_protocol)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
@@ -2470,7 +2486,7 @@ impl Timeline {
|
||||
*guard = Some(WalReceiver::start(
|
||||
Arc::clone(self),
|
||||
WalReceiverConf {
|
||||
protocol: self.conf.wal_receiver_protocol,
|
||||
protocol: self.resolve_wal_receiver_protocol(),
|
||||
wal_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
@@ -3829,7 +3845,8 @@ impl Timeline {
|
||||
};
|
||||
|
||||
// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
|
||||
// This makes us refuse ingest until the new layers have been persisted to the remote.
|
||||
// This makes us refuse ingest until the new layers have been persisted to the remote
|
||||
let start = Instant::now();
|
||||
self.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
@@ -3842,6 +3859,8 @@ impl Timeline {
|
||||
FlushLayerError::Other(anyhow!(e).into())
|
||||
}
|
||||
})?;
|
||||
let duration = start.elapsed().as_secs_f64();
|
||||
self.metrics.flush_wait_upload_time_gauge_add(duration);
|
||||
|
||||
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
||||
// a compaction can delete the file and then it won't be available for uploads any more.
|
||||
|
||||
@@ -535,6 +535,7 @@ impl ConnectionManagerState {
|
||||
let node_id = new_sk.safekeeper_id;
|
||||
let connect_timeout = self.conf.wal_connect_timeout;
|
||||
let ingest_batch_size = self.conf.ingest_batch_size;
|
||||
let protocol = self.conf.protocol;
|
||||
let timeline = Arc::clone(&self.timeline);
|
||||
let ctx = ctx.detached_child(
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
@@ -548,6 +549,7 @@ impl ConnectionManagerState {
|
||||
|
||||
let res = super::walreceiver_connection::handle_walreceiver_connection(
|
||||
timeline,
|
||||
protocol,
|
||||
new_sk.wal_source_connconf,
|
||||
events_sender,
|
||||
cancellation.clone(),
|
||||
@@ -991,7 +993,7 @@ impl ConnectionManagerState {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
(None, None, None)
|
||||
},
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
PostgresClientProtocol::Interpreted { .. } => {
|
||||
let shard_identity = self.timeline.get_shard_identity();
|
||||
(
|
||||
Some(shard_identity.number.0),
|
||||
|
||||
@@ -22,7 +22,10 @@ use tokio::{select, sync::watch, time};
|
||||
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord};
|
||||
use wal_decoder::{
|
||||
models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords},
|
||||
wire_format::FromWireFormat,
|
||||
};
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
use crate::{
|
||||
@@ -36,7 +39,7 @@ use crate::{
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
|
||||
use utils::{id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol};
|
||||
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
|
||||
|
||||
/// Status of the connection.
|
||||
@@ -109,6 +112,7 @@ impl From<WalDecodeError> for WalReceiverError {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) async fn handle_walreceiver_connection(
|
||||
timeline: Arc<Timeline>,
|
||||
protocol: PostgresClientProtocol,
|
||||
wal_source_connconf: PgConnectionConfig,
|
||||
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
|
||||
cancellation: CancellationToken,
|
||||
@@ -260,6 +264,14 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
|
||||
|
||||
let interpreted_proto_config = match protocol {
|
||||
PostgresClientProtocol::Vanilla => None,
|
||||
PostgresClientProtocol::Interpreted {
|
||||
format,
|
||||
compression,
|
||||
} => Some((format, compression)),
|
||||
};
|
||||
|
||||
while let Some(replication_message) = {
|
||||
select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
@@ -332,16 +344,26 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// This is the end LSN of the raw WAL from which the records
|
||||
// were interpreted.
|
||||
let streaming_lsn = Lsn::from(raw.streaming_lsn());
|
||||
tracing::debug!(
|
||||
"Received WAL up to {streaming_lsn} with next_record_lsn={}",
|
||||
Lsn(raw.next_record_lsn().unwrap_or(0))
|
||||
);
|
||||
|
||||
let records = Vec::<InterpretedWalRecord>::des(raw.data()).with_context(|| {
|
||||
anyhow::anyhow!(
|
||||
let (format, compression) = interpreted_proto_config.unwrap();
|
||||
let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
|
||||
.await
|
||||
.with_context(|| {
|
||||
anyhow::anyhow!(
|
||||
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
|
||||
)
|
||||
})?;
|
||||
})?;
|
||||
|
||||
let InterpretedWalRecords {
|
||||
records,
|
||||
next_record_lsn,
|
||||
} = batch;
|
||||
|
||||
tracing::debug!(
|
||||
"Received WAL up to {} with next_record_lsn={:?}",
|
||||
streaming_lsn,
|
||||
next_record_lsn
|
||||
);
|
||||
|
||||
// We start the modification at 0 because each interpreted record
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
@@ -360,14 +382,18 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
.await?;
|
||||
}
|
||||
|
||||
let next_record_lsn = interpreted.next_record_lsn;
|
||||
let local_next_record_lsn = interpreted.next_record_lsn;
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {next_record_lsn}"))?;
|
||||
.with_context(|| {
|
||||
format!("could not ingest record at {local_next_record_lsn}")
|
||||
})?;
|
||||
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
tracing::debug!(
|
||||
"ingest: filtered out record @ LSN {local_next_record_lsn}"
|
||||
);
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
@@ -399,7 +425,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// need to advance last record LSN on all shards. If we've not ingested the latest
|
||||
// record, then set the LSN of the modification past it. This way all shards
|
||||
// advance their last record LSN at the same time.
|
||||
let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) {
|
||||
let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) {
|
||||
Some(lsn) if lsn > modification.get_lsn() => {
|
||||
modification.set_lsn(lsn).unwrap();
|
||||
true
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
|
||||
#define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */
|
||||
|
||||
static int logical_replication_max_snap_files = 300;
|
||||
static int logical_replication_max_snap_files = 10000;
|
||||
|
||||
/*
|
||||
* According to Chi (shyzh), the pageserver _should_ be good with 10 MB worth of
|
||||
@@ -184,7 +184,7 @@ InitLogicalReplicationMonitor(void)
|
||||
"Maximum allowed logical replication .snap files. When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
|
||||
NULL,
|
||||
&logical_replication_max_snap_files,
|
||||
300, -1, INT_MAX,
|
||||
10000, -1, INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
use pq_proto::CancelKeyData;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
@@ -17,9 +18,6 @@ use crate::rate_limiter::LeakyBucketRateLimiter;
|
||||
use crate::redis::cancellation_publisher::{
|
||||
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
|
||||
};
|
||||
use std::net::IpAddr;
|
||||
|
||||
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
|
||||
|
||||
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
|
||||
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::TryFutureExt;
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, Instrument};
|
||||
@@ -88,40 +88,37 @@ pub async fn task_main(
|
||||
crate::metrics::Protocol::Tcp,
|
||||
&config.region,
|
||||
);
|
||||
let span = ctx.span();
|
||||
|
||||
let startup = Box::pin(
|
||||
handle_client(
|
||||
config,
|
||||
backend,
|
||||
&ctx,
|
||||
cancellation_handler,
|
||||
socket,
|
||||
conn_gauge,
|
||||
)
|
||||
.instrument(span.clone()),
|
||||
);
|
||||
let res = startup.await;
|
||||
let res = handle_client(
|
||||
config,
|
||||
backend,
|
||||
&ctx,
|
||||
cancellation_handler,
|
||||
socket,
|
||||
conn_gauge,
|
||||
)
|
||||
.instrument(ctx.span())
|
||||
.boxed()
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Err(e) => {
|
||||
// todo: log and push to ctx the error kind
|
||||
ctx.set_error_kind(e.get_error_kind());
|
||||
error!(parent: &span, "per-client task finished with an error: {e:#}");
|
||||
error!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
|
||||
}
|
||||
Ok(None) => {
|
||||
ctx.set_success();
|
||||
}
|
||||
Ok(Some(p)) => {
|
||||
ctx.set_success();
|
||||
ctx.log_connect();
|
||||
match p.proxy_pass().instrument(span.clone()).await {
|
||||
let _disconnect = ctx.log_connect();
|
||||
match p.proxy_pass().await {
|
||||
Ok(()) => {}
|
||||
Err(ErrorSource::Client(e)) => {
|
||||
error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
|
||||
error!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
|
||||
}
|
||||
Err(ErrorSource::Compute(e)) => {
|
||||
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
|
||||
error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -219,6 +216,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
client: stream,
aux: node.aux.clone(),
compute: node,
session_id: ctx.session_id(),
_req: request_gauge,
_conn: conn_gauge,
_cancel: session,

@@ -272,11 +272,14 @@ impl RequestContext {
this.success = true;
}

pub fn log_connect(&self) {
self.0
.try_lock()
.expect("should not deadlock")
.log_connect();
pub fn log_connect(self) -> DisconnectLogger {
let mut this = self.0.into_inner();
this.log_connect();

// close current span.
this.span = Span::none();

DisconnectLogger(this)
}

pub(crate) fn protocol(&self) -> Protocol {
@@ -434,8 +437,14 @@ impl Drop for RequestContextInner {
fn drop(&mut self) {
if self.sender.is_some() {
self.log_connect();
} else {
self.log_disconnect();
}
}
}

pub struct DisconnectLogger(RequestContextInner);

impl Drop for DisconnectLogger {
fn drop(&mut self) {
self.0.log_disconnect();
}
}

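Note: the hunks above turn disconnect logging into an RAII guard: `RequestContext::log_connect` now consumes the context and hands back a `DisconnectLogger` whose `Drop` impl records the disconnect. A simplified sketch of the pattern, with stand-in types rather than the proxy's real fields:

```rust
// Simplified stand-ins for the proxy types; only the guard pattern matches the diff.
struct RequestContextInner {
    session_id: u32,
}

impl RequestContextInner {
    fn log_connect(&mut self) {
        println!("session {}: connected", self.session_id);
    }
    fn log_disconnect(&mut self) {
        println!("session {}: disconnected", self.session_id);
    }
}

struct RequestContext(RequestContextInner);

impl RequestContext {
    // Consuming `self` makes it impossible to keep using the context once the
    // connection has been handed over to the passthrough phase.
    fn log_connect(self) -> DisconnectLogger {
        let mut this = self.0;
        this.log_connect();
        DisconnectLogger(this)
    }
}

struct DisconnectLogger(RequestContextInner);

impl Drop for DisconnectLogger {
    fn drop(&mut self) {
        // Runs exactly once, when the guard goes out of scope.
        self.0.log_disconnect();
    }
}

fn main() {
    let ctx = RequestContext(RequestContextInner { session_id: 7 });
    let _disconnect = ctx.log_connect();
    // ... proxy traffic here ...
} // `_disconnect` dropped here: "session 7: disconnected" is logged.
```

This is why task_main now binds the result as `let _disconnect = ctx.log_connect();`: dropping the guard after `proxy_pass` returns is what emits the disconnect event.
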
@@ -10,7 +10,7 @@ pub(crate) mod wake_compute;
use std::sync::Arc;

pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource};
use futures::TryFutureExt;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pq_proto::{BeMessage as Be, StartupMessageParams};
@@ -123,42 +123,39 @@ pub async fn task_main(
crate::metrics::Protocol::Tcp,
&config.region,
);
let span = ctx.span();

let startup = Box::pin(
handle_client(
config,
auth_backend,
&ctx,
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter2,
conn_gauge,
)
.instrument(span.clone()),
);
let res = startup.await;
let res = handle_client(
config,
auth_backend,
&ctx,
cancellation_handler,
socket,
ClientMode::Tcp,
endpoint_rate_limiter2,
conn_gauge,
)
.instrument(ctx.span())
.boxed()
.await;

match res {
Err(e) => {
// todo: log and push to ctx the error kind
ctx.set_error_kind(e.get_error_kind());
warn!(parent: &span, "per-client task finished with an error: {e:#}");
warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
}
Ok(None) => {
ctx.set_success();
}
Ok(Some(p)) => {
ctx.set_success();
ctx.log_connect();
match p.proxy_pass().instrument(span.clone()).await {
let _disconnect = ctx.log_connect();
match p.proxy_pass().await {
Ok(()) => {}
Err(ErrorSource::Client(e)) => {
warn!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
}
Err(ErrorSource::Compute(e)) => {
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}");
}
}
}
@@ -352,6 +349,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
client: stream,
aux: node.aux.clone(),
compute: node,
session_id: ctx.session_id(),
_req: request_gauge,
_conn: conn_gauge,
_cancel: session,

@@ -59,6 +59,7 @@ pub(crate) struct ProxyPassthrough<P, S> {
pub(crate) client: Stream<S>,
pub(crate) compute: PostgresConnection,
pub(crate) aux: MetricsAuxInfo,
pub(crate) session_id: uuid::Uuid,

pub(crate) _req: NumConnectionRequestsGuard<'static>,
pub(crate) _conn: NumClientConnectionsGuard<'static>,
@@ -69,7 +70,7 @@ impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
tracing::warn!(?err, "could not cancel the query in the database");
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
}
res
}

@@ -1,6 +1,6 @@
use core::net::IpAddr;
use std::sync::Arc;

use core::net::IpAddr;
use pq_proto::CancelKeyData;
use redis::AsyncCommands;
use tokio::sync::Mutex;

@@ -123,17 +123,10 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
// https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
match opt.split_once('=') {
Some(("protocol", value)) => {
let raw_value = value
.parse::<u8>()
.with_context(|| format!("Failed to parse {value} as protocol"))?;

self.protocol = Some(
PostgresClientProtocol::try_from(raw_value).map_err(|_| {
QueryError::Other(anyhow::anyhow!(
"Unexpected client protocol type: {raw_value}"
))
})?,
);
self.protocol =
Some(serde_json::from_str(value).with_context(|| {
format!("Failed to parse {value} as protocol")
})?);
}
Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
self.tenant_id = Some(value.parse().with_context(|| {
@@ -180,7 +173,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
)));
}
}
PostgresClientProtocol::Interpreted => {
PostgresClientProtocol::Interpreted { .. } => {
match (shard_count, shard_number, shard_stripe_size) {
(Some(count), Some(number), Some(stripe_size)) => {
let params = ShardParameters {

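Note: the handler hunk above replaces the numeric `protocol` startup option with a serde-deserialized value, and the later arm destructures `PostgresClientProtocol::Interpreted { .. }`. The exact JSON shape accepted by `PostgresClientProtocol` is not shown in this diff; the sketch below only illustrates the technique (an internally tagged serde enum parsed with `serde_json::from_str`) using assumed field names and types.

```rust
use serde::Deserialize;

// Hypothetical mirror of the protocol type; the real one lives in
// `utils::postgres_client` and may serialize differently.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
enum ClientProtocol {
    Vanilla,
    Interpreted { format: String, compression: Option<String> },
}

fn main() -> Result<(), serde_json::Error> {
    // e.g. the value of a `protocol=...` startup parameter (assumed shape).
    let value = r#"{"type":"interpreted","format":"protobuf","compression":"zstd"}"#;
    let protocol: ClientProtocol = serde_json::from_str(value)?;

    // Struct-like variants are matched with `{ .. }` or by destructuring,
    // mirroring the `PostgresClientProtocol::Interpreted { .. }` arm above.
    match protocol {
        ClientProtocol::Vanilla => println!("plain WAL streaming"),
        ClientProtocol::Interpreted { format, compression } => {
            println!("interpreted WAL: format={format}, compression={compression:?}")
        }
    }
    Ok(())
}
```
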
@@ -9,9 +9,11 @@ use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::MissedTickBehavior;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use wal_decoder::models::InterpretedWalRecord;
use utils::postgres_client::Compression;
use utils::postgres_client::InterpretedFormat;
use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
use wal_decoder::wire_format::ToWireFormat;

use crate::send_wal::EndWatchView;
use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
@@ -20,6 +22,8 @@ use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
/// This is used for sending WAL to the pageserver. Said WAL
/// is pre-interpreted and filtered for the shard.
pub(crate) struct InterpretedWalSender<'a, IO> {
pub(crate) format: InterpretedFormat,
pub(crate) compression: Option<Compression>,
pub(crate) pgb: &'a mut PostgresBackend<IO>,
pub(crate) wal_stream_builder: WalReaderStreamBuilder,
pub(crate) end_watch_view: EndWatchView,
@@ -28,6 +32,12 @@ pub(crate) struct InterpretedWalSender<'a, IO> {
pub(crate) appname: Option<String>,
}

struct Batch {
wal_end_lsn: Lsn,
available_wal_end_lsn: Lsn,
records: InterpretedWalRecords,
}

impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
/// Send interpreted WAL to a receiver.
/// Stops when an error occurs or the receiver is caught up and there's no active compute.
@@ -46,10 +56,13 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
keepalive_ticker.reset();

let (tx, mut rx) = tokio::sync::mpsc::channel::<Batch>(2);

loop {
tokio::select! {
// Get some WAL from the stream and then: decode, interpret and send it
wal = stream.next() => {
// Get some WAL from the stream and then: decode, interpret and push it down the
// pipeline.
wal = stream.next(), if tx.capacity() > 0 => {
let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
Some(some) => some?,
None => { break; }
@@ -81,10 +94,26 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
}
}

let mut buf = Vec::new();
records
.ser_into(&mut buf)
.with_context(|| "Failed to serialize interpreted WAL")?;
let batch = InterpretedWalRecords {
records,
next_record_lsn: max_next_record_lsn
};

tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();
},
// For a previously interpreted batch, serialize it and push it down the wire.
batch = rx.recv() => {
let batch = match batch {
Some(b) => b,
None => { break; }
};

let buf = batch
.records
.to_wire(self.format, self.compression)
.await
.with_context(|| "Failed to serialize interpreted WAL")
.map_err(CopyStreamHandlerEnd::from)?;

// Reset the keep alive ticker since we are sending something
// over the wire now.
@@ -92,13 +121,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {

self.pgb
.write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
streaming_lsn: wal_end_lsn.0,
commit_lsn: available_wal_end_lsn.0,
next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
data: buf.as_slice(),
streaming_lsn: batch.wal_end_lsn.0,
commit_lsn: batch.available_wal_end_lsn.0,
data: &buf,
})).await?;
}

// Send a periodic keep alive when the connection has been idle for a while.
_ = keepalive_ticker.tick() => {
self.pgb

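Note: the InterpretedWalSender hunks above split the send loop into a two-stage pipeline: one `select!` arm interprets WAL into a `Batch` and pushes it into a bounded `mpsc` channel (guarded by `tx.capacity() > 0`), while the other arm serializes a previously built batch with `to_wire(format, compression)` and writes it out. A self-contained sketch of that pipelining shape, with toy types standing in for the safekeeper's:

```rust
use tokio::sync::mpsc;

// Toy batch; the real one carries LSNs plus interpreted WAL records.
struct Batch {
    wal_end_lsn: u64,
    records: Vec<u8>,
}

#[tokio::main]
async fn main() {
    // Capacity 2 mirrors the bounded channel in the diff: at most two batches
    // sit between the interpreting stage and the sending stage.
    let (tx, mut rx) = mpsc::channel::<Batch>(2);

    let total = 5u64;
    let mut produced = 0u64;
    let mut sent = 0u64;

    while sent < total {
        tokio::select! {
            // Stage 1: "interpret" some WAL into a batch, but only while the
            // pipeline has spare room (the `if tx.capacity() > 0` guard).
            res = tx.reserve(), if produced < total && tx.capacity() > 0 => {
                let permit = res.expect("receiver is still alive");
                produced += 1;
                permit.send(Batch { wal_end_lsn: produced, records: vec![0u8; 8] });
            }
            // Stage 2: pop a previously built batch, serialize it and push it
            // down the wire (here: just print it).
            batch = rx.recv() => {
                let batch = batch.expect("sender is still alive");
                sent += 1;
                println!("send streaming_lsn={} ({} bytes)", batch.wal_end_lsn, batch.records.len());
            }
        }
    }
}
```

The small bounded channel caps how far interpretation can run ahead of the socket, so a slow receiver applies backpressure to the WAL reader instead of buffering unboundedly.
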
@@ -454,7 +454,7 @@ impl SafekeeperPostgresHandler {
}

info!(
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}",
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_)),
@@ -489,7 +489,10 @@ impl SafekeeperPostgresHandler {

Either::Left(sender.run())
}
PostgresClientProtocol::Interpreted => {
PostgresClientProtocol::Interpreted {
format,
compression,
} => {
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
let end_watch_view = end_watch.view();
let wal_stream_builder = WalReaderStreamBuilder {
@@ -502,6 +505,8 @@ impl SafekeeperPostgresHandler {
};

let sender = InterpretedWalSender {
format,
compression,
pgb,
wal_stream_builder,
end_watch_view,

@@ -168,6 +168,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_flush_wait_upload_seconds",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),

@@ -310,6 +310,31 @@ class PgProtocol:
return self.safe_psql(query, log_query=log_query)[0][0]


class PageserverWalReceiverProtocol(StrEnum):
VANILLA = "vanilla"
INTERPRETED = "interpreted"

@staticmethod
def to_config_key_value(proto) -> tuple[str, dict[str, Any]]:
if proto == PageserverWalReceiverProtocol.VANILLA:
return (
"wal_receiver_protocol",
{
"type": "vanilla",
},
)
elif proto == PageserverWalReceiverProtocol.INTERPRETED:
return (
"wal_receiver_protocol",
{
"type": "interpreted",
"args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
},
)
else:
raise ValueError(f"Unknown protocol type: {proto}")


class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
@@ -356,6 +381,7 @@ class NeonEnvBuilder:
safekeeper_extra_opts: list[str] | None = None,
storage_controller_port_override: int | None = None,
pageserver_virtual_file_io_mode: str | None = None,
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -409,6 +435,8 @@ class NeonEnvBuilder:

self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode

self.pageserver_wal_receiver_protocol = pageserver_wal_receiver_protocol

assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -1023,6 +1051,7 @@ class NeonEnv:

self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol

# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1092,6 +1121,13 @@ class NeonEnv:
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode

if self.pageserver_wal_receiver_protocol is not None:
key, value = PageserverWalReceiverProtocol.to_config_key_value(
self.pageserver_wal_receiver_protocol
)
if key not in ps_cfg:
ps_cfg[key] = value

# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])

@@ -60,13 +60,13 @@ def build_pgcopydb_command(pgcopydb_filter_file: Path, test_output_dir: Path):
"--no-acl",
"--skip-db-properties",
"--table-jobs",
"4",
"8",
"--index-jobs",
"4",
"8",
"--restore-jobs",
"4",
"8",
"--split-tables-larger-than",
"10GB",
"5GB",
"--skip-extensions",
"--use-copy-binary",
"--filters",
@@ -136,7 +136,7 @@ def run_command_and_log_output(command, log_file_path: Path):
"LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
"PGCOPYDB_SOURCE_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")),
"PGCOPYDB_TARGET_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")),
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=16",
}
# Combine the current environment with custom variables
env = os.environ.copy()

@@ -15,7 +15,14 @@ from fixtures.neon_fixtures import (

@pytest.mark.timeout(600)
@pytest.mark.parametrize("shard_count", [1, 8, 32])
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
@pytest.mark.parametrize(
"wal_receiver_protocol",
[
"vanilla",
"interpreted-bincode-compressed",
"interpreted-protobuf-compressed",
],
)
def test_sharded_ingest(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
@@ -27,14 +34,42 @@ def test_sharded_ingest(
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
(shard_count=1) to the sharded case indicates the overhead of sharding.
"""
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)

ROW_COUNT = 100_000_000  # about 7 GB of WAL

neon_env_builder.num_pageservers = shard_count
env = neon_env_builder.init_start()
env = neon_env_builder.init_configs()

for ps in env.pageservers:
if wal_receiver_protocol == "vanilla":
ps.patch_config_toml_nonrecursive(
{
"wal_receiver_protocol": {
"type": "vanilla",
}
}
)
elif wal_receiver_protocol == "interpreted-bincode-compressed":
ps.patch_config_toml_nonrecursive(
{
"wal_receiver_protocol": {
"type": "interpreted",
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
}
}
)
elif wal_receiver_protocol == "interpreted-protobuf-compressed":
ps.patch_config_toml_nonrecursive(
{
"wal_receiver_protocol": {
"type": "interpreted",
"args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
}
}
)
else:
raise AssertionError("Test must use explicit wal receiver protocol config")

env.start()

# Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure
# the storage controller doesn't mess with shard placements.

@@ -174,6 +174,10 @@ def test_fully_custom_config(positive_env: NeonEnv):
"lsn_lease_length": "1m",
"lsn_lease_length_for_ts": "5s",
"timeline_offloading": True,
"wal_receiver_protocol_override": {
"type": "interpreted",
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
},
}

vps_http = env.storage_controller.pageserver_api()

@@ -8,6 +8,7 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverWalReceiverProtocol,
generate_uploads_and_deletions,
)
from fixtures.pageserver.http import PageserverApiException
@@ -27,8 +28,13 @@ AGGRESIVE_COMPACTION_TENANT_CONF = {


@skip_in_debug_build("only run with release build")
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str):
@pytest.mark.parametrize(
"wal_receiver_protocol",
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_pageserver_compaction_smoke(
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
):
"""
This is a smoke test that compaction kicks in. The workload repeatedly churns
a small number of rows and manually instructs the pageserver to run compaction
@@ -37,10 +43,12 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_recei
observed bounds.
"""

neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol

# Effectively disable the page cache to rely only on image layers
# to shorten reads.
neon_env_builder.pageserver_config_override = f"""
page_cache_size=10; wal_receiver_protocol='{wal_receiver_protocol}'
neon_env_builder.pageserver_config_override = """
page_cache_size=10
"""

env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)

@@ -3,7 +3,7 @@ from __future__ import annotations
import pytest
from fixtures.log_helper import log
from fixtures.neon_cli import WalCraft
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverWalReceiverProtocol

# Restart nodes with WAL end having specially crafted shape, like last record
# crossing segment boundary, to test decoding issues.
@@ -19,13 +19,16 @@ from fixtures.neon_fixtures import NeonEnvBuilder
"wal_record_crossing_segment_followed_by_small_one",
],
)
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
@pytest.mark.parametrize(
"wal_receiver_protocol",
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_crafted_wal_end(
neon_env_builder: NeonEnvBuilder, wal_type: str, wal_receiver_protocol: str
neon_env_builder: NeonEnvBuilder,
wal_type: str,
wal_receiver_protocol: PageserverWalReceiverProtocol,
):
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol

env = neon_env_builder.init_start()
env.create_branch("test_crafted_wal_end")

@@ -1,7 +1,11 @@
from __future__ import annotations

import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_content
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverWalReceiverProtocol,
check_restored_datadir_content,
)


# Test subtransactions
@@ -10,11 +14,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_conten
# maintained in the pageserver, so subtransactions are not very exciting for
# Neon. They are included in the commit record though and updated in the
# CLOG.
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
@pytest.mark.parametrize(
"wal_receiver_protocol",
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir, wal_receiver_protocol):
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol

env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")

@@ -11,7 +11,13 @@ import pytest
import toml
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import getLogger
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PageserverWalReceiverProtocol,
Safekeeper,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import skip_in_debug_build

@@ -622,12 +628,15 @@ async def run_segment_init_failure(env: NeonEnv):
# Test (injected) failure during WAL segment init.
# https://github.com/neondatabase/neon/issues/6401
# https://github.com/neondatabase/neon/issues/6402
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
def test_segment_init_failure(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str):
@pytest.mark.parametrize(
"wal_receiver_protocol",
[PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
)
def test_segment_init_failure(
neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
):
neon_env_builder.num_safekeepers = 1
neon_env_builder.pageserver_config_override = (
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
)
neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
env = neon_env_builder.init_start()

asyncio.run(run_segment_init_failure(env))
