diff --git a/.github/workflows/build-build-tools-image.yml b/.github/workflows/build-build-tools-image.yml index 93da86a353..0a7f0cd7a0 100644 --- a/.github/workflows/build-build-tools-image.yml +++ b/.github/workflows/build-build-tools-image.yml @@ -2,6 +2,17 @@ name: Build build-tools image on: workflow_call: + inputs: + archs: + description: "Json array of architectures to build" + # Default values are set in `check-image` job, `set-variables` step + type: string + required: false + debians: + description: "Json array of Debian versions to build" + # Default values are set in `check-image` job, `set-variables` step + type: string + required: false outputs: image-tag: description: "build-tools tag" @@ -32,25 +43,37 @@ jobs: check-image: runs-on: ubuntu-22.04 outputs: - tag: ${{ steps.get-build-tools-tag.outputs.image-tag }} - found: ${{ steps.check-image.outputs.found }} + archs: ${{ steps.set-variables.outputs.archs }} + debians: ${{ steps.set-variables.outputs.debians }} + tag: ${{ steps.set-variables.outputs.image-tag }} + everything: ${{ steps.set-more-variables.outputs.everything }} + found: ${{ steps.set-more-variables.outputs.found }} steps: - uses: actions/checkout@v4 - - name: Get build-tools image tag for the current commit - id: get-build-tools-tag + - name: Set variables + id: set-variables env: + ARCHS: ${{ inputs.archs || '["x64","arm64"]' }} + DEBIANS: ${{ inputs.debians || '["bullseye","bookworm"]' }} IMAGE_TAG: | ${{ hashFiles('build-tools.Dockerfile', '.github/workflows/build-build-tools-image.yml') }} run: | - echo "image-tag=${IMAGE_TAG}" | tee -a $GITHUB_OUTPUT + echo "archs=${ARCHS}" | tee -a ${GITHUB_OUTPUT} + echo "debians=${DEBIANS}" | tee -a ${GITHUB_OUTPUT} + echo "image-tag=${IMAGE_TAG}" | tee -a ${GITHUB_OUTPUT} - - name: Check if such tag found in the registry - id: check-image + - name: Set more variables + id: set-more-variables env: - IMAGE_TAG: ${{ steps.get-build-tools-tag.outputs.image-tag }} + IMAGE_TAG: ${{ steps.set-variables.outputs.image-tag }} + EVERYTHING: | + ${{ contains(fromJson(steps.set-variables.outputs.archs), 'x64') && + contains(fromJson(steps.set-variables.outputs.archs), 'arm64') && + contains(fromJson(steps.set-variables.outputs.debians), 'bullseye') && + contains(fromJson(steps.set-variables.outputs.debians), 'bookworm') }} run: | if docker manifest inspect neondatabase/build-tools:${IMAGE_TAG}; then found=true @@ -58,8 +81,8 @@ jobs: found=false fi - echo "found=${found}" | tee -a $GITHUB_OUTPUT - + echo "everything=${EVERYTHING}" | tee -a ${GITHUB_OUTPUT} + echo "found=${found}" | tee -a ${GITHUB_OUTPUT} build-image: needs: [ check-image ] @@ -67,8 +90,8 @@ jobs: strategy: matrix: - debian-version: [ bullseye, bookworm ] - arch: [ x64, arm64 ] + arch: ${{ fromJson(needs.check-image.outputs.archs) }} + debian: ${{ fromJson(needs.check-image.outputs.debians) }} runs-on: ${{ fromJson(format('["self-hosted", "{0}"]', matrix.arch == 'arm64' && 'large-arm64' || 'large')) }} @@ -99,11 +122,11 @@ jobs: push: true pull: true build-args: | - DEBIAN_VERSION=${{ matrix.debian-version }} - cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.debian-version }}-${{ matrix.arch }} - cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0}-{1},mode=max', matrix.debian-version, matrix.arch) || '' }} + DEBIAN_VERSION=${{ matrix.debian }} + cache-from: type=registry,ref=cache.neon.build/build-tools:cache-${{ matrix.debian }}-${{ matrix.arch }} + cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/build-tools:cache-{0}-{1},mode=max', matrix.debian, matrix.arch) || '' }} tags: | - neondatabase/build-tools:${{ needs.check-image.outputs.tag }}-${{ matrix.debian-version }}-${{ matrix.arch }} + neondatabase/build-tools:${{ needs.check-image.outputs.tag }}-${{ matrix.debian }}-${{ matrix.arch }} merge-images: needs: [ check-image, build-image ] @@ -118,15 +141,21 @@ jobs: - name: Create multi-arch image env: DEFAULT_DEBIAN_VERSION: bookworm + ARCHS: ${{ join(fromJson(needs.check-image.outputs.archs), ' ') }} + DEBIANS: ${{ join(fromJson(needs.check-image.outputs.debians), ' ') }} + EVERYTHING: ${{ needs.check-image.outputs.everything }} IMAGE_TAG: ${{ needs.check-image.outputs.tag }} run: | - for debian_version in bullseye bookworm; do - tags=("-t" "neondatabase/build-tools:${IMAGE_TAG}-${debian_version}") - if [ "${debian_version}" == "${DEFAULT_DEBIAN_VERSION}" ]; then + for debian in ${DEBIANS}; do + tags=("-t" "neondatabase/build-tools:${IMAGE_TAG}-${debian}") + + if [ "${EVERYTHING}" == "true" ] && [ "${debian}" == "${DEFAULT_DEBIAN_VERSION}" ]; then tags+=("-t" "neondatabase/build-tools:${IMAGE_TAG}") fi - docker buildx imagetools create "${tags[@]}" \ - neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-x64 \ - neondatabase/build-tools:${IMAGE_TAG}-${debian_version}-arm64 + for arch in ${ARCHS}; do + tags+=("neondatabase/build-tools:${IMAGE_TAG}-${debian}-${arch}") + done + + docker buildx imagetools create "${tags[@]}" done diff --git a/.github/workflows/pre-merge-checks.yml b/.github/workflows/pre-merge-checks.yml index e1cec6d33d..d2f9d8a666 100644 --- a/.github/workflows/pre-merge-checks.yml +++ b/.github/workflows/pre-merge-checks.yml @@ -23,6 +23,8 @@ jobs: id: python-src with: files: | + .github/workflows/_check-codestyle-python.yml + .github/workflows/build-build-tools-image.yml .github/workflows/pre-merge-checks.yml **/**.py poetry.lock @@ -38,6 +40,10 @@ jobs: if: needs.get-changed-files.outputs.python-changed == 'true' needs: [ get-changed-files ] uses: ./.github/workflows/build-build-tools-image.yml + with: + # Build only one combination to save time + archs: '["x64"]' + debians: '["bookworm"]' secrets: inherit check-codestyle-python: @@ -45,7 +51,8 @@ jobs: needs: [ get-changed-files, build-build-tools-image ] uses: ./.github/workflows/_check-codestyle-python.yml with: - build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm + # `-bookworm-x64` suffix should match the combination in `build-build-tools-image` + build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm-x64 secrets: inherit # To get items from the merge queue merged into main we need to satisfy "Status checks that are required". diff --git a/Cargo.lock b/Cargo.lock index c1a14210de..43a46fb1eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4133,7 +4133,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" dependencies = [ "bytes", "fallible-iterator", @@ -4146,7 +4146,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" dependencies = [ "base64 0.20.0", "byteorder", @@ -4165,7 +4165,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" dependencies = [ "bytes", "fallible-iterator", @@ -6468,7 +6468,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#00940fcdb57a8e99e805297b75839e7c4c7b1796" dependencies = [ "async-trait", "byteorder", @@ -7120,10 +7120,16 @@ name = "wal_decoder" version = "0.1.0" dependencies = [ "anyhow", + "async-compression", "bytes", "pageserver_api", "postgres_ffi", + "prost", "serde", + "thiserror", + "tokio", + "tonic", + "tonic-build", "tracing", "utils", "workspace_hack", diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 4689cc2b83..6b670de2ea 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -58,7 +58,7 @@ use compute_tools::compute::{ forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID, }; use compute_tools::configurator::launch_configurator; -use compute_tools::extension_server::get_pg_version; +use compute_tools::extension_server::get_pg_version_string; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -326,7 +326,7 @@ fn wait_spec( connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, pgdata: pgdata.to_string(), pgbin: pgbin.to_string(), - pgversion: get_pg_version(pgbin), + pgversion: get_pg_version_string(pgbin), live_config_allowed, state: Mutex::new(new_state), state_changed: Condvar::new(), diff --git a/compute_tools/src/bin/fast_import.rs b/compute_tools/src/bin/fast_import.rs index 3b0b990df2..6716cc6234 100644 --- a/compute_tools/src/bin/fast_import.rs +++ b/compute_tools/src/bin/fast_import.rs @@ -29,6 +29,7 @@ use anyhow::Context; use aws_config::BehaviorVersion; use camino::{Utf8Path, Utf8PathBuf}; use clap::Parser; +use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion}; use nix::unistd::Pid; use tracing::{info, info_span, warn, Instrument}; use utils::fs_ext::is_directory_empty; @@ -131,11 +132,17 @@ pub(crate) async fn main() -> anyhow::Result<()> { // // Initialize pgdata // + let pg_version = match get_pg_version(pg_bin_dir.as_str()) { + PostgresMajorVersion::V14 => 14, + PostgresMajorVersion::V15 => 15, + PostgresMajorVersion::V16 => 16, + PostgresMajorVersion::V17 => 17, + }; let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs { superuser, locale: "en_US.UTF-8", // XXX: this shouldn't be hard-coded, - pg_version: 140000, // XXX: this shouldn't be hard-coded but derived from which compute image we're running in + pg_version, initdb_bin: pg_bin_dir.join("initdb").as_ref(), library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local. pgdata: &pgdata_dir, diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index da2d107b54..f13b2308e7 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -103,14 +103,33 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String { .to_string() } -pub fn get_pg_version(pgbin: &str) -> String { +pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion { // pg_config --version returns a (platform specific) human readable string // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc. let human_version = get_pg_config("--version", pgbin); - parse_pg_version(&human_version).to_string() + parse_pg_version(&human_version) } -fn parse_pg_version(human_version: &str) -> &str { +pub fn get_pg_version_string(pgbin: &str) -> String { + match get_pg_version(pgbin) { + PostgresMajorVersion::V14 => "v14", + PostgresMajorVersion::V15 => "v15", + PostgresMajorVersion::V16 => "v16", + PostgresMajorVersion::V17 => "v17", + } + .to_owned() +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PostgresMajorVersion { + V14, + V15, + V16, + V17, +} + +fn parse_pg_version(human_version: &str) -> PostgresMajorVersion { + use PostgresMajorVersion::*; // Normal releases have version strings like "PostgreSQL 15.4". But there // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version @@ -121,10 +140,10 @@ fn parse_pg_version(human_version: &str) -> &str { .captures(human_version) { Some(captures) if captures.len() == 2 => match &captures["major"] { - "14" => return "v14", - "15" => return "v15", - "16" => return "v16", - "17" => return "v17", + "14" => return V14, + "15" => return V15, + "16" => return V16, + "17" => return V17, _ => {} }, _ => {} @@ -263,24 +282,25 @@ mod tests { #[test] fn test_parse_pg_version() { - assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15"); - assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15"); + use super::PostgresMajorVersion::*; + assert_eq!(parse_pg_version("PostgreSQL 15.4"), V15); + assert_eq!(parse_pg_version("PostgreSQL 15.14"), V15); assert_eq!( parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"), - "v15" + V15 ); - assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14"); - assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14"); + assert_eq!(parse_pg_version("PostgreSQL 14.15"), V14); + assert_eq!(parse_pg_version("PostgreSQL 14.0"), V14); assert_eq!( parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"), - "v14" + V14 ); - assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16"); - assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16"); - assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16"); - assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16"); + assert_eq!(parse_pg_version("PostgreSQL 16devel"), V16); + assert_eq!(parse_pg_version("PostgreSQL 16beta1"), V16); + assert_eq!(parse_pg_version("PostgreSQL 16rc2"), V16); + assert_eq!(parse_pg_version("PostgreSQL 16extra"), V16); } #[test] diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index ae5e22ddc6..1d1455b95b 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -415,6 +415,11 @@ impl PageServerNode { .map(|x| x.parse::()) .transpose() .context("Failed to parse 'timeline_offloading' as bool")?, + wal_receiver_protocol_override: settings + .remove("wal_receiver_protocol_override") + .map(serde_json::from_str) + .transpose() + .context("parse `wal_receiver_protocol_override` from json")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 0abca5cdc2..721d97404b 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -278,6 +278,8 @@ pub struct TenantConfigToml { /// Enable auto-offloading of timelines. /// (either this flag or the pageserver-global one need to be set) pub timeline_offloading: bool, + + pub wal_receiver_protocol_override: Option, } pub mod defaults { @@ -510,6 +512,7 @@ impl Default for TenantConfigToml { lsn_lease_length: LsnLease::DEFAULT_LENGTH, lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS, timeline_offloading: false, + wal_receiver_protocol_override: None, } } } diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index 4505101ea6..523d143381 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -229,6 +229,18 @@ impl Key { } } +impl CompactKey { + pub fn raw(&self) -> i128 { + self.0 + } +} + +impl From for CompactKey { + fn from(value: i128) -> Self { + Self(value) + } +} + impl fmt::Display for Key { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 1b86bfd91a..42c5d10c05 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -23,6 +23,7 @@ use utils::{ completion, id::{NodeId, TenantId, TimelineId}, lsn::Lsn, + postgres_client::PostgresClientProtocol, serde_system_time, }; @@ -352,6 +353,7 @@ pub struct TenantConfig { pub lsn_lease_length: Option, pub lsn_lease_length_for_ts: Option, pub timeline_offloading: Option, + pub wal_receiver_protocol_override: Option, } /// The policy for the aux file storage. diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index b7871ab01f..4b0331999d 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -688,9 +688,6 @@ pub struct InterpretedWalRecordsBody<'a> { pub streaming_lsn: u64, /// Current end of WAL on the server pub commit_lsn: u64, - /// Start LSN of the next record in PG WAL. - /// Is 0 if the portion of PG WAL did not contain any records. - pub next_record_lsn: u64, pub data: &'a [u8], } @@ -1028,7 +1025,6 @@ impl BeMessage<'_> { // dependency buf.put_u64(rec.streaming_lsn); buf.put_u64(rec.commit_lsn); - buf.put_u64(rec.next_record_lsn); buf.put_slice(rec.data); }); } diff --git a/libs/remote_storage/src/metrics.rs b/libs/remote_storage/src/metrics.rs index f1aa4c433b..48c121fbc8 100644 --- a/libs/remote_storage/src/metrics.rs +++ b/libs/remote_storage/src/metrics.rs @@ -176,7 +176,9 @@ pub(crate) struct BucketMetrics { impl Default for BucketMetrics { fn default() -> Self { - let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0]; + // first bucket 100 microseconds to count requests that do not need to wait at all + // and get a permit immediately + let buckets = [0.0001, 0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0]; let req_seconds = register_histogram_vec!( "remote_storage_s3_request_seconds", diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs index 3073bbde4c..a62568202b 100644 --- a/libs/utils/src/postgres_client.rs +++ b/libs/utils/src/postgres_client.rs @@ -7,40 +7,31 @@ use postgres_connection::{parse_host_port, PgConnectionConfig}; use crate::id::TenantTimelineId; -/// Postgres client protocol types -#[derive( - Copy, - Clone, - PartialEq, - Eq, - strum_macros::EnumString, - strum_macros::Display, - serde_with::DeserializeFromStr, - serde_with::SerializeDisplay, - Debug, -)] -#[strum(serialize_all = "kebab-case")] -#[repr(u8)] +#[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum InterpretedFormat { + Bincode, + Protobuf, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum Compression { + Zstd { level: i8 }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(tag = "type", content = "args")] +#[serde(rename_all = "kebab-case")] pub enum PostgresClientProtocol { /// Usual Postgres replication protocol Vanilla, /// Custom shard-aware protocol that replicates interpreted records. /// Used to send wal from safekeeper to pageserver. - Interpreted, -} - -impl TryFrom for PostgresClientProtocol { - type Error = u8; - - fn try_from(value: u8) -> Result { - Ok(match value { - v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla, - v if v == (PostgresClientProtocol::Interpreted as u8) => { - PostgresClientProtocol::Interpreted - } - x => return Err(x), - }) - } + Interpreted { + format: InterpretedFormat, + compression: Option, + }, } pub struct ConnectionConfigArgs<'a> { @@ -63,7 +54,10 @@ impl<'a> ConnectionConfigArgs<'a> { "-c".to_owned(), format!("timeline_id={}", self.ttid.timeline_id), format!("tenant_id={}", self.ttid.tenant_id), - format!("protocol={}", self.protocol as u8), + format!( + "protocol={}", + serde_json::to_string(&self.protocol).unwrap() + ), ]; if self.shard_number.is_some() { diff --git a/libs/wal_decoder/Cargo.toml b/libs/wal_decoder/Cargo.toml index c8c0f4c990..8fac4e38ca 100644 --- a/libs/wal_decoder/Cargo.toml +++ b/libs/wal_decoder/Cargo.toml @@ -8,11 +8,19 @@ license.workspace = true testing = ["pageserver_api/testing"] [dependencies] +async-compression.workspace = true anyhow.workspace = true bytes.workspace = true pageserver_api.workspace = true +prost.workspace = true postgres_ffi.workspace = true serde.workspace = true +thiserror.workspace = true +tokio = { workspace = true, features = ["io-util"] } +tonic.workspace = true tracing.workspace = true utils.workspace = true workspace_hack = { version = "0.1", path = "../../workspace_hack" } + +[build-dependencies] +tonic-build.workspace = true diff --git a/libs/wal_decoder/build.rs b/libs/wal_decoder/build.rs new file mode 100644 index 0000000000..d5b7ad02ad --- /dev/null +++ b/libs/wal_decoder/build.rs @@ -0,0 +1,11 @@ +fn main() -> Result<(), Box> { + // Generate rust code from .proto protobuf. + // + // Note: we previously tried to use deterministic location at proto/ for + // easy location, but apparently interference with cachepot sometimes fails + // the build then. Anyway, per cargo docs build script shouldn't output to + // anywhere but $OUT_DIR. + tonic_build::compile_protos("proto/interpreted_wal.proto") + .unwrap_or_else(|e| panic!("failed to compile protos {:?}", e)); + Ok(()) +} diff --git a/libs/wal_decoder/proto/interpreted_wal.proto b/libs/wal_decoder/proto/interpreted_wal.proto new file mode 100644 index 0000000000..0393392c1a --- /dev/null +++ b/libs/wal_decoder/proto/interpreted_wal.proto @@ -0,0 +1,43 @@ +syntax = "proto3"; + +package interpreted_wal; + +message InterpretedWalRecords { + repeated InterpretedWalRecord records = 1; + optional uint64 next_record_lsn = 2; +} + +message InterpretedWalRecord { + optional bytes metadata_record = 1; + SerializedValueBatch batch = 2; + uint64 next_record_lsn = 3; + bool flush_uncommitted = 4; + uint32 xid = 5; +} + +message SerializedValueBatch { + bytes raw = 1; + repeated ValueMeta metadata = 2; + uint64 max_lsn = 3; + uint64 len = 4; +} + +enum ValueMetaType { + Serialized = 0; + Observed = 1; +} + +message ValueMeta { + ValueMetaType type = 1; + CompactKey key = 2; + uint64 lsn = 3; + optional uint64 batch_offset = 4; + optional uint64 len = 5; + optional bool will_init = 6; +} + +message CompactKey { + int64 high = 1; + int64 low = 2; +} + diff --git a/libs/wal_decoder/src/lib.rs b/libs/wal_decoder/src/lib.rs index a8a26956e6..96b717021f 100644 --- a/libs/wal_decoder/src/lib.rs +++ b/libs/wal_decoder/src/lib.rs @@ -1,3 +1,4 @@ pub mod decoder; pub mod models; pub mod serialized_batch; +pub mod wire_format; diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 7ac425cb5f..af22de5d95 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -37,12 +37,32 @@ use utils::lsn::Lsn; use crate::serialized_batch::SerializedValueBatch; +// Code generated by protobuf. +pub mod proto { + // Tonic does derives as `#[derive(Clone, PartialEq, ::prost::Message)]` + // we don't use these types for anything but broker data transmission, + // so it's ok to ignore this one. + #![allow(clippy::derive_partial_eq_without_eq)] + // The generated ValueMeta has a `len` method generate for its `len` field. + #![allow(clippy::len_without_is_empty)] + tonic::include_proto!("interpreted_wal"); +} + #[derive(Serialize, Deserialize)] pub enum FlushUncommittedRecords { Yes, No, } +/// A batch of interpreted WAL records +#[derive(Serialize, Deserialize)] +pub struct InterpretedWalRecords { + pub records: Vec, + // Start LSN of the next record after the batch. + // Note that said record may not belong to the current shard. + pub next_record_lsn: Option, +} + /// An interpreted Postgres WAL record, ready to be handled by the pageserver #[derive(Serialize, Deserialize)] pub struct InterpretedWalRecord { diff --git a/libs/wal_decoder/src/wire_format.rs b/libs/wal_decoder/src/wire_format.rs new file mode 100644 index 0000000000..5a343054c3 --- /dev/null +++ b/libs/wal_decoder/src/wire_format.rs @@ -0,0 +1,356 @@ +use bytes::{BufMut, Bytes, BytesMut}; +use pageserver_api::key::CompactKey; +use prost::{DecodeError, EncodeError, Message}; +use tokio::io::AsyncWriteExt; +use utils::bin_ser::{BeSer, DeserializeError, SerializeError}; +use utils::lsn::Lsn; +use utils::postgres_client::{Compression, InterpretedFormat}; + +use crate::models::{ + FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, +}; + +use crate::serialized_batch::{ + ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta, +}; + +use crate::models::proto; + +#[derive(Debug, thiserror::Error)] +pub enum ToWireFormatError { + #[error("{0}")] + Bincode(#[from] SerializeError), + #[error("{0}")] + Protobuf(#[from] ProtobufSerializeError), + #[error("{0}")] + Compression(#[from] std::io::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtobufSerializeError { + #[error("{0}")] + MetadataRecord(#[from] SerializeError), + #[error("{0}")] + Encode(#[from] EncodeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum FromWireFormatError { + #[error("{0}")] + Bincode(#[from] DeserializeError), + #[error("{0}")] + Protobuf(#[from] ProtobufDeserializeError), + #[error("{0}")] + Decompress(#[from] std::io::Error), +} + +#[derive(Debug, thiserror::Error)] +pub enum ProtobufDeserializeError { + #[error("{0}")] + Transcode(#[from] TranscodeError), + #[error("{0}")] + Decode(#[from] DecodeError), +} + +#[derive(Debug, thiserror::Error)] +pub enum TranscodeError { + #[error("{0}")] + BadInput(String), + #[error("{0}")] + MetadataRecord(#[from] DeserializeError), +} + +pub trait ToWireFormat { + fn to_wire( + self, + format: InterpretedFormat, + compression: Option, + ) -> impl std::future::Future> + Send; +} + +pub trait FromWireFormat { + type T; + fn from_wire( + buf: &Bytes, + format: InterpretedFormat, + compression: Option, + ) -> impl std::future::Future> + Send; +} + +impl ToWireFormat for InterpretedWalRecords { + async fn to_wire( + self, + format: InterpretedFormat, + compression: Option, + ) -> Result { + use async_compression::tokio::write::ZstdEncoder; + use async_compression::Level; + + let encode_res: Result = match format { + InterpretedFormat::Bincode => { + let buf = BytesMut::new(); + let mut buf = buf.writer(); + self.ser_into(&mut buf)?; + Ok(buf.into_inner().freeze()) + } + InterpretedFormat::Protobuf => { + let proto: proto::InterpretedWalRecords = self.try_into()?; + let mut buf = BytesMut::new(); + proto + .encode(&mut buf) + .map_err(|e| ToWireFormatError::Protobuf(e.into()))?; + + Ok(buf.freeze()) + } + }; + + let buf = encode_res?; + let compressed_buf = match compression { + Some(Compression::Zstd { level }) => { + let mut encoder = ZstdEncoder::with_quality( + Vec::with_capacity(buf.len() / 4), + Level::Precise(level as i32), + ); + encoder.write_all(&buf).await?; + encoder.shutdown().await?; + Bytes::from(encoder.into_inner()) + } + None => buf, + }; + + Ok(compressed_buf) + } +} + +impl FromWireFormat for InterpretedWalRecords { + type T = Self; + + async fn from_wire( + buf: &Bytes, + format: InterpretedFormat, + compression: Option, + ) -> Result { + let decompressed_buf = match compression { + Some(Compression::Zstd { .. }) => { + use async_compression::tokio::write::ZstdDecoder; + let mut decoded_buf = Vec::with_capacity(buf.len()); + let mut decoder = ZstdDecoder::new(&mut decoded_buf); + decoder.write_all(buf).await?; + decoder.flush().await?; + Bytes::from(decoded_buf) + } + None => buf.clone(), + }; + + match format { + InterpretedFormat::Bincode => { + InterpretedWalRecords::des(&decompressed_buf).map_err(FromWireFormatError::Bincode) + } + InterpretedFormat::Protobuf => { + let proto = proto::InterpretedWalRecords::decode(decompressed_buf) + .map_err(|e| FromWireFormatError::Protobuf(e.into()))?; + InterpretedWalRecords::try_from(proto) + .map_err(|e| FromWireFormatError::Protobuf(e.into())) + } + } + } +} + +impl TryFrom for proto::InterpretedWalRecords { + type Error = SerializeError; + + fn try_from(value: InterpretedWalRecords) -> Result { + let records = value + .records + .into_iter() + .map(proto::InterpretedWalRecord::try_from) + .collect::, _>>()?; + Ok(proto::InterpretedWalRecords { + records, + next_record_lsn: value.next_record_lsn.map(|l| l.0), + }) + } +} + +impl TryFrom for proto::InterpretedWalRecord { + type Error = SerializeError; + + fn try_from(value: InterpretedWalRecord) -> Result { + let metadata_record = value + .metadata_record + .map(|meta_rec| -> Result, Self::Error> { + let mut buf = Vec::new(); + meta_rec.ser_into(&mut buf)?; + Ok(buf) + }) + .transpose()?; + + Ok(proto::InterpretedWalRecord { + metadata_record, + batch: Some(proto::SerializedValueBatch::from(value.batch)), + next_record_lsn: value.next_record_lsn.0, + flush_uncommitted: matches!(value.flush_uncommitted, FlushUncommittedRecords::Yes), + xid: value.xid, + }) + } +} + +impl From for proto::SerializedValueBatch { + fn from(value: SerializedValueBatch) -> Self { + proto::SerializedValueBatch { + raw: value.raw, + metadata: value + .metadata + .into_iter() + .map(proto::ValueMeta::from) + .collect(), + max_lsn: value.max_lsn.0, + len: value.len as u64, + } + } +} + +impl From for proto::ValueMeta { + fn from(value: ValueMeta) -> Self { + match value { + ValueMeta::Observed(obs) => proto::ValueMeta { + r#type: proto::ValueMetaType::Observed.into(), + key: Some(proto::CompactKey::from(obs.key)), + lsn: obs.lsn.0, + batch_offset: None, + len: None, + will_init: None, + }, + ValueMeta::Serialized(ser) => proto::ValueMeta { + r#type: proto::ValueMetaType::Serialized.into(), + key: Some(proto::CompactKey::from(ser.key)), + lsn: ser.lsn.0, + batch_offset: Some(ser.batch_offset), + len: Some(ser.len as u64), + will_init: Some(ser.will_init), + }, + } + } +} + +impl From for proto::CompactKey { + fn from(value: CompactKey) -> Self { + proto::CompactKey { + high: (value.raw() >> 64) as i64, + low: value.raw() as i64, + } + } +} + +impl TryFrom for InterpretedWalRecords { + type Error = TranscodeError; + + fn try_from(value: proto::InterpretedWalRecords) -> Result { + let records = value + .records + .into_iter() + .map(InterpretedWalRecord::try_from) + .collect::>()?; + + Ok(InterpretedWalRecords { + records, + next_record_lsn: value.next_record_lsn.map(Lsn::from), + }) + } +} + +impl TryFrom for InterpretedWalRecord { + type Error = TranscodeError; + + fn try_from(value: proto::InterpretedWalRecord) -> Result { + let metadata_record = value + .metadata_record + .map(|mrec| -> Result<_, DeserializeError> { MetadataRecord::des(&mrec) }) + .transpose()?; + + let batch = { + let batch = value.batch.ok_or_else(|| { + TranscodeError::BadInput("InterpretedWalRecord::batch missing".to_string()) + })?; + + SerializedValueBatch::try_from(batch)? + }; + + Ok(InterpretedWalRecord { + metadata_record, + batch, + next_record_lsn: Lsn(value.next_record_lsn), + flush_uncommitted: if value.flush_uncommitted { + FlushUncommittedRecords::Yes + } else { + FlushUncommittedRecords::No + }, + xid: value.xid, + }) + } +} + +impl TryFrom for SerializedValueBatch { + type Error = TranscodeError; + + fn try_from(value: proto::SerializedValueBatch) -> Result { + let metadata = value + .metadata + .into_iter() + .map(ValueMeta::try_from) + .collect::, _>>()?; + + Ok(SerializedValueBatch { + raw: value.raw, + metadata, + max_lsn: Lsn(value.max_lsn), + len: value.len as usize, + }) + } +} + +impl TryFrom for ValueMeta { + type Error = TranscodeError; + + fn try_from(value: proto::ValueMeta) -> Result { + match proto::ValueMetaType::try_from(value.r#type) { + Ok(proto::ValueMetaType::Serialized) => { + Ok(ValueMeta::Serialized(SerializedValueMeta { + key: value + .key + .ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::key missing".to_string()) + })? + .into(), + lsn: Lsn(value.lsn), + batch_offset: value.batch_offset.ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::batch_offset missing".to_string()) + })?, + len: value.len.ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::len missing".to_string()) + })? as usize, + will_init: value.will_init.ok_or_else(|| { + TranscodeError::BadInput("ValueMeta::will_init missing".to_string()) + })?, + })) + } + Ok(proto::ValueMetaType::Observed) => Ok(ValueMeta::Observed(ObservedValueMeta { + key: value + .key + .ok_or_else(|| TranscodeError::BadInput("ValueMeta::key missing".to_string()))? + .into(), + lsn: Lsn(value.lsn), + })), + Err(_) => Err(TranscodeError::BadInput(format!( + "Unexpected ValueMeta::type {}", + value.r#type + ))), + } + } +} + +impl From for CompactKey { + fn from(value: proto::CompactKey) -> Self { + (((value.high as i128) << 64) | (value.low as i128)).into() + } +} diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 3cdc2a761e..78a157f51e 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -3,7 +3,7 @@ use metrics::{ register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec, - Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair, + Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair, IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec, }; use once_cell::sync::Lazy; @@ -457,6 +457,15 @@ pub(crate) static WAIT_LSN_TIME: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +static FLUSH_WAIT_UPLOAD_TIME: Lazy = Lazy::new(|| { + register_gauge_vec!( + "pageserver_flush_wait_upload_seconds", + "Time spent waiting for preceding uploads during layer flush", + &["tenant_id", "shard_id", "timeline_id"] + ) + .expect("failed to define a metric") +}); + static LAST_RECORD_LSN: Lazy = Lazy::new(|| { register_int_gauge_vec!( "pageserver_last_record_lsn", @@ -653,6 +662,35 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy = Lazy::new(| .expect("failed to define a metric") }); +pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy = Lazy::new(|| { + register_uint_gauge!( + "pageserver_relsize_cache_entries", + "Number of entries in the relation size cache", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_CACHE_HITS: Lazy = Lazy::new(|| { + register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_CACHE_MISSES: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_cache_misses", + "Relation size cache misses", + ) + .expect("failed to define a metric") +}); + +pub(crate) static RELSIZE_CACHE_MISSES_OLD: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_relsize_cache_misses_old", + "Relation size cache misses where the lookup LSN is older than the last relation update" + ) + .expect("failed to define a metric") +}); + pub(crate) mod initial_logical_size { use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec}; use once_cell::sync::Lazy; @@ -2336,6 +2374,7 @@ pub(crate) struct TimelineMetrics { shard_id: String, timeline_id: String, pub flush_time_histo: StorageTimeMetrics, + pub flush_wait_upload_time_gauge: Gauge, pub compact_time_histo: StorageTimeMetrics, pub create_images_time_histo: StorageTimeMetrics, pub logical_size_histo: StorageTimeMetrics, @@ -2379,6 +2418,9 @@ impl TimelineMetrics { &shard_id, &timeline_id, ); + let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME + .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) + .unwrap(); let compact_time_histo = StorageTimeMetrics::new( StorageTimeOperation::Compact, &tenant_id, @@ -2516,6 +2558,7 @@ impl TimelineMetrics { shard_id, timeline_id, flush_time_histo, + flush_wait_upload_time_gauge, compact_time_histo, create_images_time_histo, logical_size_histo, @@ -2563,6 +2606,14 @@ impl TimelineMetrics { self.resident_physical_size_gauge.get() } + pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) { + self.flush_wait_upload_time_gauge.add(duration); + crate::metrics::FLUSH_WAIT_UPLOAD_TIME + .get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id]) + .unwrap() + .add(duration); + } + pub(crate) fn shutdown(&self) { let was_shutdown = self .shutdown @@ -2579,6 +2630,7 @@ impl TimelineMetrics { let timeline_id = &self.timeline_id; let shard_id = &self.shard_id; let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]); + let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]); let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]); { RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get()); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index c491bfe650..4f42427276 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -10,6 +10,9 @@ use super::tenant::{PageReconstructError, Timeline}; use crate::aux_file; use crate::context::RequestContext; use crate::keyspace::{KeySpace, KeySpaceAccum}; +use crate::metrics::{ + RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD, +}; use crate::span::{ debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, @@ -1129,9 +1132,12 @@ impl Timeline { let rel_size_cache = self.rel_size_cache.read().unwrap(); if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) { if lsn >= *cached_lsn { + RELSIZE_CACHE_HITS.inc(); return Some(*nblocks); } + RELSIZE_CACHE_MISSES_OLD.inc(); } + RELSIZE_CACHE_MISSES.inc(); None } @@ -1156,6 +1162,7 @@ impl Timeline { } hash_map::Entry::Vacant(entry) => { entry.insert((lsn, nblocks)); + RELSIZE_CACHE_ENTRIES.inc(); } } } @@ -1163,13 +1170,17 @@ impl Timeline { /// Store cached relation size pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) { let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - rel_size_cache.map.insert(tag, (lsn, nblocks)); + if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() { + RELSIZE_CACHE_ENTRIES.inc(); + } } /// Remove cached relation size pub fn remove_cached_rel_size(&self, tag: &RelTag) { let mut rel_size_cache = self.rel_size_cache.write().unwrap(); - rel_size_cache.map.remove(tag); + if rel_size_cache.map.remove(tag).is_some() { + RELSIZE_CACHE_ENTRIES.dec(); + } } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0214ee68fa..bddcb534a1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5344,6 +5344,7 @@ pub(crate) mod harness { lsn_lease_length: Some(tenant_conf.lsn_lease_length), lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts), timeline_offloading: Some(tenant_conf.timeline_offloading), + wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override, } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 4d6176bfd9..5d3ac5a8e3 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -19,6 +19,7 @@ use serde_json::Value; use std::num::NonZeroU64; use std::time::Duration; use utils::generation::Generation; +use utils::postgres_client::PostgresClientProtocol; #[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq)] pub(crate) enum AttachmentMode { @@ -353,6 +354,9 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub timeline_offloading: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub wal_receiver_protocol_override: Option, } impl TenantConfOpt { @@ -418,6 +422,9 @@ impl TenantConfOpt { timeline_offloading: self .lazy_slru_download .unwrap_or(global_conf.timeline_offloading), + wal_receiver_protocol_override: self + .wal_receiver_protocol_override + .or(global_conf.wal_receiver_protocol_override), } } } @@ -472,6 +479,7 @@ impl From for models::TenantConfig { lsn_lease_length: value.lsn_lease_length.map(humantime), lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime), timeline_offloading: value.timeline_offloading, + wal_receiver_protocol_override: value.wal_receiver_protocol_override, } } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 347393ba56..4951110a64 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -50,6 +50,7 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::{ fs_ext, pausable_failpoint, + postgres_client::PostgresClientProtocol, sync::gate::{Gate, GateGuard}, }; use wal_decoder::serialized_batch::SerializedValueBatch; @@ -2178,6 +2179,21 @@ impl Timeline { ) } + /// Resolve the effective WAL receiver protocol to use for this tenant. + /// + /// Priority order is: + /// 1. Tenant config override + /// 2. Default value for tenant config override + /// 3. Pageserver config override + /// 4. Pageserver config default + pub fn resolve_wal_receiver_protocol(&self) -> PostgresClientProtocol { + let tenant_conf = self.tenant_conf.load().tenant_conf.clone(); + tenant_conf + .wal_receiver_protocol_override + .or(self.conf.default_tenant_conf.wal_receiver_protocol_override) + .unwrap_or(self.conf.wal_receiver_protocol) + } + pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) { // NB: Most tenant conf options are read by background loops, so, // changes will automatically be picked up. @@ -2470,7 +2486,7 @@ impl Timeline { *guard = Some(WalReceiver::start( Arc::clone(self), WalReceiverConf { - protocol: self.conf.wal_receiver_protocol, + protocol: self.resolve_wal_receiver_protocol(), wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag, @@ -3829,7 +3845,8 @@ impl Timeline { }; // Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files. - // This makes us refuse ingest until the new layers have been persisted to the remote. + // This makes us refuse ingest until the new layers have been persisted to the remote + let start = Instant::now(); self.remote_client .wait_completion() .await @@ -3842,6 +3859,8 @@ impl Timeline { FlushLayerError::Other(anyhow!(e).into()) } })?; + let duration = start.elapsed().as_secs_f64(); + self.metrics.flush_wait_upload_time_gauge_add(duration); // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`, // a compaction can delete the file and then it won't be available for uploads any more. diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 7a64703a30..583d6309ab 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -535,6 +535,7 @@ impl ConnectionManagerState { let node_id = new_sk.safekeeper_id; let connect_timeout = self.conf.wal_connect_timeout; let ingest_batch_size = self.conf.ingest_batch_size; + let protocol = self.conf.protocol; let timeline = Arc::clone(&self.timeline); let ctx = ctx.detached_child( TaskKind::WalReceiverConnectionHandler, @@ -548,6 +549,7 @@ impl ConnectionManagerState { let res = super::walreceiver_connection::handle_walreceiver_connection( timeline, + protocol, new_sk.wal_source_connconf, events_sender, cancellation.clone(), @@ -991,7 +993,7 @@ impl ConnectionManagerState { PostgresClientProtocol::Vanilla => { (None, None, None) }, - PostgresClientProtocol::Interpreted => { + PostgresClientProtocol::Interpreted { .. } => { let shard_identity = self.timeline.get_shard_identity(); ( Some(shard_identity.number.0), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 1a0e66ceb3..31cf1b6307 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -22,7 +22,10 @@ use tokio::{select, sync::watch, time}; use tokio_postgres::{replication::ReplicationStream, Client}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn, Instrument}; -use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord}; +use wal_decoder::{ + models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords}, + wire_format::FromWireFormat, +}; use super::TaskStateUpdate; use crate::{ @@ -36,7 +39,7 @@ use crate::{ use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; -use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn}; +use utils::{id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol}; use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError}; /// Status of the connection. @@ -109,6 +112,7 @@ impl From for WalReceiverError { #[allow(clippy::too_many_arguments)] pub(super) async fn handle_walreceiver_connection( timeline: Arc, + protocol: PostgresClientProtocol, wal_source_connconf: PgConnectionConfig, events_sender: watch::Sender>, cancellation: CancellationToken, @@ -260,6 +264,14 @@ pub(super) async fn handle_walreceiver_connection( let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?; + let interpreted_proto_config = match protocol { + PostgresClientProtocol::Vanilla => None, + PostgresClientProtocol::Interpreted { + format, + compression, + } => Some((format, compression)), + }; + while let Some(replication_message) = { select! { _ = cancellation.cancelled() => { @@ -332,16 +344,26 @@ pub(super) async fn handle_walreceiver_connection( // This is the end LSN of the raw WAL from which the records // were interpreted. let streaming_lsn = Lsn::from(raw.streaming_lsn()); - tracing::debug!( - "Received WAL up to {streaming_lsn} with next_record_lsn={}", - Lsn(raw.next_record_lsn().unwrap_or(0)) - ); - let records = Vec::::des(raw.data()).with_context(|| { - anyhow::anyhow!( + let (format, compression) = interpreted_proto_config.unwrap(); + let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression) + .await + .with_context(|| { + anyhow::anyhow!( "Failed to deserialize interpreted records ending at LSN {streaming_lsn}" ) - })?; + })?; + + let InterpretedWalRecords { + records, + next_record_lsn, + } = batch; + + tracing::debug!( + "Received WAL up to {} with next_record_lsn={:?}", + streaming_lsn, + next_record_lsn + ); // We start the modification at 0 because each interpreted record // advances it to its end LSN. 0 is just an initialization placeholder. @@ -360,14 +382,18 @@ pub(super) async fn handle_walreceiver_connection( .await?; } - let next_record_lsn = interpreted.next_record_lsn; + let local_next_record_lsn = interpreted.next_record_lsn; let ingested = walingest .ingest_record(interpreted, &mut modification, &ctx) .await - .with_context(|| format!("could not ingest record at {next_record_lsn}"))?; + .with_context(|| { + format!("could not ingest record at {local_next_record_lsn}") + })?; if !ingested { - tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}"); + tracing::debug!( + "ingest: filtered out record @ LSN {local_next_record_lsn}" + ); WAL_INGEST.records_filtered.inc(); filtered_records += 1; } @@ -399,7 +425,7 @@ pub(super) async fn handle_walreceiver_connection( // need to advance last record LSN on all shards. If we've not ingested the latest // record, then set the LSN of the modification past it. This way all shards // advance their last record LSN at the same time. - let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) { + let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) { Some(lsn) if lsn > modification.get_lsn() => { modification.set_lsn(lsn).unwrap(); true diff --git a/pgxn/neon/logical_replication_monitor.c b/pgxn/neon/logical_replication_monitor.c index 1badbbed21..5eee5a1679 100644 --- a/pgxn/neon/logical_replication_monitor.c +++ b/pgxn/neon/logical_replication_monitor.c @@ -20,7 +20,7 @@ #define LS_MONITOR_CHECK_INTERVAL 10000 /* ms */ -static int logical_replication_max_snap_files = 300; +static int logical_replication_max_snap_files = 10000; /* * According to Chi (shyzh), the pageserver _should_ be good with 10 MB worth of @@ -184,7 +184,7 @@ InitLogicalReplicationMonitor(void) "Maximum allowed logical replication .snap files. When exceeded, slots are dropped until the limit is met. -1 disables the limit.", NULL, &logical_replication_max_snap_files, - 300, -1, INT_MAX, + 10000, -1, INT_MAX, PGC_SIGHUP, 0, NULL, NULL, NULL); diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 4b72a66e63..74415f1ffe 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -1,7 +1,8 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use dashmap::DashMap; +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use pq_proto::CancelKeyData; use thiserror::Error; use tokio::net::TcpStream; @@ -17,9 +18,6 @@ use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::cancellation_publisher::{ CancellationPublisher, CancellationPublisherMut, RedisPublisherClient, }; -use std::net::IpAddr; - -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; pub type CancelMap = Arc>>; pub type CancellationHandlerMain = CancellationHandler>>>; diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index fbd0c8e5c5..b910b524b1 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::TryFutureExt; +use futures::{FutureExt, TryFutureExt}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, Instrument}; @@ -88,40 +88,37 @@ pub async fn task_main( crate::metrics::Protocol::Tcp, &config.region, ); - let span = ctx.span(); - let startup = Box::pin( - handle_client( - config, - backend, - &ctx, - cancellation_handler, - socket, - conn_gauge, - ) - .instrument(span.clone()), - ); - let res = startup.await; + let res = handle_client( + config, + backend, + &ctx, + cancellation_handler, + socket, + conn_gauge, + ) + .instrument(ctx.span()) + .boxed() + .await; match res { Err(e) => { - // todo: log and push to ctx the error kind ctx.set_error_kind(e.get_error_kind()); - error!(parent: &span, "per-client task finished with an error: {e:#}"); + error!(parent: &ctx.span(), "per-client task finished with an error: {e:#}"); } Ok(None) => { ctx.set_success(); } Ok(Some(p)) => { ctx.set_success(); - ctx.log_connect(); - match p.proxy_pass().instrument(span.clone()).await { + let _disconnect = ctx.log_connect(); + match p.proxy_pass().await { Ok(()) => {} Err(ErrorSource::Client(e)) => { - error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}"); + error!(?session_id, "per-client task finished with an IO error from the client: {e:#}"); } Err(ErrorSource::Compute(e)) => { - error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}"); + error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}"); } } } @@ -219,6 +216,7 @@ pub(crate) async fn handle_client( client: stream, aux: node.aux.clone(), compute: node, + session_id: ctx.session_id(), _req: request_gauge, _conn: conn_gauge, _cancel: session, diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 6d2d2d51ce..4ec04deb25 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -272,11 +272,14 @@ impl RequestContext { this.success = true; } - pub fn log_connect(&self) { - self.0 - .try_lock() - .expect("should not deadlock") - .log_connect(); + pub fn log_connect(self) -> DisconnectLogger { + let mut this = self.0.into_inner(); + this.log_connect(); + + // close current span. + this.span = Span::none(); + + DisconnectLogger(this) } pub(crate) fn protocol(&self) -> Protocol { @@ -434,8 +437,14 @@ impl Drop for RequestContextInner { fn drop(&mut self) { if self.sender.is_some() { self.log_connect(); - } else { - self.log_disconnect(); } } } + +pub struct DisconnectLogger(RequestContextInner); + +impl Drop for DisconnectLogger { + fn drop(&mut self) { + self.0.log_disconnect(); + } +} diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 5d9468d89a..7fe67e43de 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -10,7 +10,7 @@ pub(crate) mod wake_compute; use std::sync::Arc; pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource}; -use futures::TryFutureExt; +use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; use once_cell::sync::OnceCell; use pq_proto::{BeMessage as Be, StartupMessageParams}; @@ -123,42 +123,39 @@ pub async fn task_main( crate::metrics::Protocol::Tcp, &config.region, ); - let span = ctx.span(); - let startup = Box::pin( - handle_client( - config, - auth_backend, - &ctx, - cancellation_handler, - socket, - ClientMode::Tcp, - endpoint_rate_limiter2, - conn_gauge, - ) - .instrument(span.clone()), - ); - let res = startup.await; + let res = handle_client( + config, + auth_backend, + &ctx, + cancellation_handler, + socket, + ClientMode::Tcp, + endpoint_rate_limiter2, + conn_gauge, + ) + .instrument(ctx.span()) + .boxed() + .await; match res { Err(e) => { - // todo: log and push to ctx the error kind ctx.set_error_kind(e.get_error_kind()); - warn!(parent: &span, "per-client task finished with an error: {e:#}"); + warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}"); } Ok(None) => { ctx.set_success(); } Ok(Some(p)) => { ctx.set_success(); - ctx.log_connect(); - match p.proxy_pass().instrument(span.clone()).await { + let _disconnect = ctx.log_connect(); + match p.proxy_pass().await { Ok(()) => {} Err(ErrorSource::Client(e)) => { - warn!(parent: &span, "per-client task finished with an IO error from the client: {e:#}"); + warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}"); } Err(ErrorSource::Compute(e)) => { - error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}"); + error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}"); } } } @@ -352,6 +349,7 @@ pub(crate) async fn handle_client( client: stream, aux: node.aux.clone(), compute: node, + session_id: ctx.session_id(), _req: request_gauge, _conn: conn_gauge, _cancel: session, diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index 5e07c8eeae..dcaa81e5cd 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -59,6 +59,7 @@ pub(crate) struct ProxyPassthrough { pub(crate) client: Stream, pub(crate) compute: PostgresConnection, pub(crate) aux: MetricsAuxInfo, + pub(crate) session_id: uuid::Uuid, pub(crate) _req: NumConnectionRequestsGuard<'static>, pub(crate) _conn: NumClientConnectionsGuard<'static>, @@ -69,7 +70,7 @@ impl ProxyPassthrough { pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> { let res = proxy_pass(self.client, self.compute.stream, self.aux).await; if let Err(err) = self.compute.cancel_closure.try_cancel_query().await { - tracing::warn!(?err, "could not cancel the query in the database"); + tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database"); } res } diff --git a/proxy/src/redis/cancellation_publisher.rs b/proxy/src/redis/cancellation_publisher.rs index 633a2f1b81..228dbb7f64 100644 --- a/proxy/src/redis/cancellation_publisher.rs +++ b/proxy/src/redis/cancellation_publisher.rs @@ -1,6 +1,6 @@ +use core::net::IpAddr; use std::sync::Arc; -use core::net::IpAddr; use pq_proto::CancelKeyData; use redis::AsyncCommands; use tokio::sync::Mutex; diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index cec7c3c7ee..22f33b17e0 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -123,17 +123,10 @@ impl postgres_backend::Handler // https://github.com/neondatabase/neon/pull/2433#discussion_r970005064 match opt.split_once('=') { Some(("protocol", value)) => { - let raw_value = value - .parse::() - .with_context(|| format!("Failed to parse {value} as protocol"))?; - - self.protocol = Some( - PostgresClientProtocol::try_from(raw_value).map_err(|_| { - QueryError::Other(anyhow::anyhow!( - "Unexpected client protocol type: {raw_value}" - )) - })?, - ); + self.protocol = + Some(serde_json::from_str(value).with_context(|| { + format!("Failed to parse {value} as protocol") + })?); } Some(("ztenantid", value)) | Some(("tenant_id", value)) => { self.tenant_id = Some(value.parse().with_context(|| { @@ -180,7 +173,7 @@ impl postgres_backend::Handler ))); } } - PostgresClientProtocol::Interpreted => { + PostgresClientProtocol::Interpreted { .. } => { match (shard_count, shard_number, shard_stripe_size) { (Some(count), Some(number), Some(stripe_size)) => { let params = ShardParameters { diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index cf0ee276e9..2589030422 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -9,9 +9,11 @@ use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder}; use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::MissedTickBehavior; -use utils::bin_ser::BeSer; use utils::lsn::Lsn; -use wal_decoder::models::InterpretedWalRecord; +use utils::postgres_client::Compression; +use utils::postgres_client::InterpretedFormat; +use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords}; +use wal_decoder::wire_format::ToWireFormat; use crate::send_wal::EndWatchView; use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder}; @@ -20,6 +22,8 @@ use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder}; /// This is used for sending WAL to the pageserver. Said WAL /// is pre-interpreted and filtered for the shard. pub(crate) struct InterpretedWalSender<'a, IO> { + pub(crate) format: InterpretedFormat, + pub(crate) compression: Option, pub(crate) pgb: &'a mut PostgresBackend, pub(crate) wal_stream_builder: WalReaderStreamBuilder, pub(crate) end_watch_view: EndWatchView, @@ -28,6 +32,12 @@ pub(crate) struct InterpretedWalSender<'a, IO> { pub(crate) appname: Option, } +struct Batch { + wal_end_lsn: Lsn, + available_wal_end_lsn: Lsn, + records: InterpretedWalRecords, +} + impl InterpretedWalSender<'_, IO> { /// Send interpreted WAL to a receiver. /// Stops when an error occurs or the receiver is caught up and there's no active compute. @@ -46,10 +56,13 @@ impl InterpretedWalSender<'_, IO> { keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); keepalive_ticker.reset(); + let (tx, mut rx) = tokio::sync::mpsc::channel::(2); + loop { tokio::select! { - // Get some WAL from the stream and then: decode, interpret and send it - wal = stream.next() => { + // Get some WAL from the stream and then: decode, interpret and push it down the + // pipeline. + wal = stream.next(), if tx.capacity() > 0 => { let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal { Some(some) => some?, None => { break; } @@ -81,10 +94,26 @@ impl InterpretedWalSender<'_, IO> { } } - let mut buf = Vec::new(); - records - .ser_into(&mut buf) - .with_context(|| "Failed to serialize interpreted WAL")?; + let batch = InterpretedWalRecords { + records, + next_record_lsn: max_next_record_lsn + }; + + tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap(); + }, + // For a previously interpreted batch, serialize it and push it down the wire. + batch = rx.recv() => { + let batch = match batch { + Some(b) => b, + None => { break; } + }; + + let buf = batch + .records + .to_wire(self.format, self.compression) + .await + .with_context(|| "Failed to serialize interpreted WAL") + .map_err(CopyStreamHandlerEnd::from)?; // Reset the keep alive ticker since we are sending something // over the wire now. @@ -92,13 +121,11 @@ impl InterpretedWalSender<'_, IO> { self.pgb .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody { - streaming_lsn: wal_end_lsn.0, - commit_lsn: available_wal_end_lsn.0, - next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0, - data: buf.as_slice(), + streaming_lsn: batch.wal_end_lsn.0, + commit_lsn: batch.available_wal_end_lsn.0, + data: &buf, })).await?; } - // Send a periodic keep alive when the connection has been idle for a while. _ = keepalive_ticker.tick() => { self.pgb diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 1acfcad418..225b7f4c05 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -454,7 +454,7 @@ impl SafekeeperPostgresHandler { } info!( - "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}", + "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}", start_pos, end_pos, matches!(end_watch, EndWatch::Flush(_)), @@ -489,7 +489,10 @@ impl SafekeeperPostgresHandler { Either::Left(sender.run()) } - PostgresClientProtocol::Interpreted => { + PostgresClientProtocol::Interpreted { + format, + compression, + } => { let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000; let end_watch_view = end_watch.view(); let wal_stream_builder = WalReaderStreamBuilder { @@ -502,6 +505,8 @@ impl SafekeeperPostgresHandler { }; let sender = InterpretedWalSender { + format, + compression, pgb, wal_stream_builder, end_watch_view, diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index 330f007a77..3f90c233a6 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -168,6 +168,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = ( "pageserver_evictions_with_low_residence_duration_total", "pageserver_aux_file_estimated_size", "pageserver_valid_lsn_lease_count", + "pageserver_flush_wait_upload_seconds", counter("pageserver_tenant_throttling_count_accounted_start"), counter("pageserver_tenant_throttling_count_accounted_finish"), counter("pageserver_tenant_throttling_wait_usecs_sum"), diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 07d442b4a6..a45a311dc2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -310,6 +310,31 @@ class PgProtocol: return self.safe_psql(query, log_query=log_query)[0][0] +class PageserverWalReceiverProtocol(StrEnum): + VANILLA = "vanilla" + INTERPRETED = "interpreted" + + @staticmethod + def to_config_key_value(proto) -> tuple[str, dict[str, Any]]: + if proto == PageserverWalReceiverProtocol.VANILLA: + return ( + "wal_receiver_protocol", + { + "type": "vanilla", + }, + ) + elif proto == PageserverWalReceiverProtocol.INTERPRETED: + return ( + "wal_receiver_protocol", + { + "type": "interpreted", + "args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}}, + }, + ) + else: + raise ValueError(f"Unknown protocol type: {proto}") + + class NeonEnvBuilder: """ Builder object to create a Neon runtime environment @@ -356,6 +381,7 @@ class NeonEnvBuilder: safekeeper_extra_opts: list[str] | None = None, storage_controller_port_override: int | None = None, pageserver_virtual_file_io_mode: str | None = None, + pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -409,6 +435,8 @@ class NeonEnvBuilder: self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode + self.pageserver_wal_receiver_protocol = pageserver_wal_receiver_protocol + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -1023,6 +1051,7 @@ class NeonEnv: self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode + self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol # Create the neon_local's `NeonLocalInitConf` cfg: dict[str, Any] = { @@ -1092,6 +1121,13 @@ class NeonEnv: if self.pageserver_virtual_file_io_mode is not None: ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode + if self.pageserver_wal_receiver_protocol is not None: + key, value = PageserverWalReceiverProtocol.to_config_key_value( + self.pageserver_wal_receiver_protocol + ) + if key not in ps_cfg: + ps_cfg[key] = value + # Create a corresponding NeonPageserver object self.pageservers.append( NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]) diff --git a/test_runner/performance/test_perf_ingest_using_pgcopydb.py b/test_runner/performance/test_perf_ingest_using_pgcopydb.py index 2f4574ba88..37f2e9db50 100644 --- a/test_runner/performance/test_perf_ingest_using_pgcopydb.py +++ b/test_runner/performance/test_perf_ingest_using_pgcopydb.py @@ -60,13 +60,13 @@ def build_pgcopydb_command(pgcopydb_filter_file: Path, test_output_dir: Path): "--no-acl", "--skip-db-properties", "--table-jobs", - "4", + "8", "--index-jobs", - "4", + "8", "--restore-jobs", - "4", + "8", "--split-tables-larger-than", - "10GB", + "5GB", "--skip-extensions", "--use-copy-binary", "--filters", @@ -136,7 +136,7 @@ def run_command_and_log_output(command, log_file_path: Path): "LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}", "PGCOPYDB_SOURCE_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")), "PGCOPYDB_TARGET_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")), - "PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7", + "PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=16", } # Combine the current environment with custom variables env = os.environ.copy() diff --git a/test_runner/performance/test_sharded_ingest.py b/test_runner/performance/test_sharded_ingest.py index e965aae5a0..4c21e799c8 100644 --- a/test_runner/performance/test_sharded_ingest.py +++ b/test_runner/performance/test_sharded_ingest.py @@ -15,7 +15,14 @@ from fixtures.neon_fixtures import ( @pytest.mark.timeout(600) @pytest.mark.parametrize("shard_count", [1, 8, 32]) -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +@pytest.mark.parametrize( + "wal_receiver_protocol", + [ + "vanilla", + "interpreted-bincode-compressed", + "interpreted-protobuf-compressed", + ], +) def test_sharded_ingest( neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, @@ -27,14 +34,42 @@ def test_sharded_ingest( and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case (shard_count=1) to the sharded case indicates the overhead of sharding. """ - neon_env_builder.pageserver_config_override = ( - f"wal_receiver_protocol = '{wal_receiver_protocol}'" - ) - ROW_COUNT = 100_000_000 # about 7 GB of WAL neon_env_builder.num_pageservers = shard_count - env = neon_env_builder.init_start() + env = neon_env_builder.init_configs() + + for ps in env.pageservers: + if wal_receiver_protocol == "vanilla": + ps.patch_config_toml_nonrecursive( + { + "wal_receiver_protocol": { + "type": "vanilla", + } + } + ) + elif wal_receiver_protocol == "interpreted-bincode-compressed": + ps.patch_config_toml_nonrecursive( + { + "wal_receiver_protocol": { + "type": "interpreted", + "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}}, + } + } + ) + elif wal_receiver_protocol == "interpreted-protobuf-compressed": + ps.patch_config_toml_nonrecursive( + { + "wal_receiver_protocol": { + "type": "interpreted", + "args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}}, + } + } + ) + else: + raise AssertionError("Test must use explicit wal receiver protocol config") + + env.start() # Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure # the storage controller doesn't mess with shard placements. diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py index 5744c445f6..670c2698f5 100644 --- a/test_runner/regress/test_attach_tenant_config.py +++ b/test_runner/regress/test_attach_tenant_config.py @@ -174,6 +174,10 @@ def test_fully_custom_config(positive_env: NeonEnv): "lsn_lease_length": "1m", "lsn_lease_length_for_ts": "5s", "timeline_offloading": True, + "wal_receiver_protocol_override": { + "type": "interpreted", + "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}}, + }, } vps_http = env.storage_controller.pageserver_api() diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 79fd256304..302a8fd0d1 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -8,6 +8,7 @@ import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, + PageserverWalReceiverProtocol, generate_uploads_and_deletions, ) from fixtures.pageserver.http import PageserverApiException @@ -27,8 +28,13 @@ AGGRESIVE_COMPACTION_TENANT_CONF = { @skip_in_debug_build("only run with release build") -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) -def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str): +@pytest.mark.parametrize( + "wal_receiver_protocol", + [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], +) +def test_pageserver_compaction_smoke( + neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol +): """ This is a smoke test that compaction kicks in. The workload repeatedly churns a small number of rows and manually instructs the pageserver to run compaction @@ -37,10 +43,12 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_recei observed bounds. """ + neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol + # Effectively disable the page cache to rely only on image layers # to shorten reads. - neon_env_builder.pageserver_config_override = f""" -page_cache_size=10; wal_receiver_protocol='{wal_receiver_protocol}' + neon_env_builder.pageserver_config_override = """ +page_cache_size=10 """ env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF) diff --git a/test_runner/regress/test_crafted_wal_end.py b/test_runner/regress/test_crafted_wal_end.py index 70e71d99cd..6b9dcbba07 100644 --- a/test_runner/regress/test_crafted_wal_end.py +++ b/test_runner/regress/test_crafted_wal_end.py @@ -3,7 +3,7 @@ from __future__ import annotations import pytest from fixtures.log_helper import log from fixtures.neon_cli import WalCraft -from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.neon_fixtures import NeonEnvBuilder, PageserverWalReceiverProtocol # Restart nodes with WAL end having specially crafted shape, like last record # crossing segment boundary, to test decoding issues. @@ -19,13 +19,16 @@ from fixtures.neon_fixtures import NeonEnvBuilder "wal_record_crossing_segment_followed_by_small_one", ], ) -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +@pytest.mark.parametrize( + "wal_receiver_protocol", + [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], +) def test_crafted_wal_end( - neon_env_builder: NeonEnvBuilder, wal_type: str, wal_receiver_protocol: str + neon_env_builder: NeonEnvBuilder, + wal_type: str, + wal_receiver_protocol: PageserverWalReceiverProtocol, ): - neon_env_builder.pageserver_config_override = ( - f"wal_receiver_protocol = '{wal_receiver_protocol}'" - ) + neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol env = neon_env_builder.init_start() env.create_branch("test_crafted_wal_end") diff --git a/test_runner/regress/test_subxacts.py b/test_runner/regress/test_subxacts.py index 1d86c353be..b235da0bc7 100644 --- a/test_runner/regress/test_subxacts.py +++ b/test_runner/regress/test_subxacts.py @@ -1,7 +1,11 @@ from __future__ import annotations import pytest -from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_content +from fixtures.neon_fixtures import ( + NeonEnvBuilder, + PageserverWalReceiverProtocol, + check_restored_datadir_content, +) # Test subtransactions @@ -10,11 +14,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_conten # maintained in the pageserver, so subtransactions are not very exciting for # Neon. They are included in the commit record though and updated in the # CLOG. -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +@pytest.mark.parametrize( + "wal_receiver_protocol", + [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], +) def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir, wal_receiver_protocol): - neon_env_builder.pageserver_config_override = ( - f"wal_receiver_protocol = '{wal_receiver_protocol}'" - ) + neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol env = neon_env_builder.init_start() endpoint = env.endpoints.create_start("main") diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 094b10b576..b32b028fa1 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -11,7 +11,13 @@ import pytest import toml from fixtures.common_types import Lsn, TenantId, TimelineId from fixtures.log_helper import getLogger -from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper +from fixtures.neon_fixtures import ( + Endpoint, + NeonEnv, + NeonEnvBuilder, + PageserverWalReceiverProtocol, + Safekeeper, +) from fixtures.remote_storage import RemoteStorageKind from fixtures.utils import skip_in_debug_build @@ -622,12 +628,15 @@ async def run_segment_init_failure(env: NeonEnv): # Test (injected) failure during WAL segment init. # https://github.com/neondatabase/neon/issues/6401 # https://github.com/neondatabase/neon/issues/6402 -@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) -def test_segment_init_failure(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str): +@pytest.mark.parametrize( + "wal_receiver_protocol", + [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED], +) +def test_segment_init_failure( + neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol +): neon_env_builder.num_safekeepers = 1 - neon_env_builder.pageserver_config_override = ( - f"wal_receiver_protocol = '{wal_receiver_protocol}'" - ) + neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol env = neon_env_builder.init_start() asyncio.run(run_segment_init_failure(env))