From 66f8f5f1c88073ddd00a5172a07f26a649fed122 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 19 Oct 2023 14:17:15 +0100 Subject: [PATCH] Call walproposer from Rust (#5403) Create Rust bindings for C functions from walproposer. This allows to write better tests with real walproposer code without spawning multiple processes and starting up the whole environment. `make walproposer-lib` stage was added to build static libraries `libwalproposer.a`, `libpgport.a`, `libpgcommon.a`. These libraries can be statically linked to any executable to call walproposer functions. `libs/walproposer/src/walproposer.rs` contains `test_simple_sync_safekeepers` to test that walproposer can be called from Rust to emulate sync_safekeepers logic. It can also be used as a usage example. --- .github/workflows/build_and_test.yml | 3 + .github/workflows/neon_extra_builds.yml | 3 + Cargo.lock | 11 + Cargo.toml | 2 + Makefile | 36 +- libs/walproposer/Cargo.toml | 16 + libs/walproposer/bindgen_deps.h | 1 + libs/walproposer/build.rs | 113 ++++++ libs/walproposer/src/api_bindings.rs | 455 ++++++++++++++++++++++ libs/walproposer/src/lib.rs | 14 + libs/walproposer/src/walproposer.rs | 485 ++++++++++++++++++++++++ pgxn/neon/Makefile | 17 + pgxn/neon/walproposer.c | 283 +++++++------- pgxn/neon/walproposer.h | 138 ++++--- pgxn/neon/walproposer_compat.c | 192 ++++++++++ pgxn/neon/walproposer_pg.c | 204 ++++++---- 16 files changed, 1711 insertions(+), 262 deletions(-) create mode 100644 libs/walproposer/Cargo.toml create mode 100644 libs/walproposer/bindgen_deps.h create mode 100644 libs/walproposer/build.rs create mode 100644 libs/walproposer/src/api_bindings.rs create mode 100644 libs/walproposer/src/lib.rs create mode 100644 libs/walproposer/src/walproposer.rs create mode 100644 pgxn/neon/walproposer_compat.c diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 1fed98f202..3f1c728c70 100644 --- a/.github/workflows/build_and_test.yml +++ 
b/.github/workflows/build_and_test.yml @@ -320,6 +320,9 @@ jobs: - name: Build neon extensions run: mold -run make neon-pg-ext -j$(nproc) + - name: Build walproposer-lib + run: mold -run make walproposer-lib -j$(nproc) + - name: Run cargo build run: | ${cov_prefix} mold -run cargo build $CARGO_FLAGS $CARGO_FEATURES --bins --tests diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index 8a1e4571fd..891cc8472a 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -103,6 +103,9 @@ jobs: - name: Build neon extensions run: make neon-pg-ext -j$(nproc) + - name: Build walproposer-lib + run: make walproposer-lib -j$(nproc) + - name: Run cargo build run: cargo build --all --release diff --git a/Cargo.lock b/Cargo.lock index 3a916b464d..f7598a79cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6092,6 +6092,17 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "walproposer" +version = "0.1.0" +dependencies = [ + "anyhow", + "bindgen", + "postgres_ffi", + "utils", + "workspace_hack", +] + [[package]] name = "want" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 26f523ce19..a0be7bb9ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "libs/tracing-utils", "libs/postgres_ffi/wal_craft", "libs/vm_monitor", + "libs/walproposer", ] [workspace.package] @@ -185,6 +186,7 @@ tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" } tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" } utils = { version = "0.1", path = "./libs/utils/" } vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" } +walproposer = { version = "0.1", path = "./libs/walproposer/" } ## Common library dependency workspace_hack = { version = "0.1", path = "./workspace_hack/" } diff --git a/Makefile b/Makefile index 33b5dcad99..3b3f0e3dac 100644 --- a/Makefile +++ b/Makefile @@ -62,7 +62,7 @@ all: neon postgres neon-pg-ext # # The 'postgres_ffi' 
depends on the Postgres headers. .PHONY: neon -neon: postgres-headers +neon: postgres-headers walproposer-lib +@echo "Compiling Neon" $(CARGO_CMD_PREFIX) cargo build $(CARGO_BUILD_FLAGS) @@ -168,6 +168,40 @@ neon-pg-ext-clean-%: -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean +# Build walproposer as a static library. walproposer source code is located +# in the pgxn/neon directory. +# +# We also need to include libpgport.a and libpgcommon.a, because walproposer +# uses some functions from those libraries. +# +# Some object files are removed from libpgport.a and libpgcommon.a because +# they depend on openssl and other libraries that are not included in our +# Rust build. +.PHONY: walproposer-lib +walproposer-lib: neon-pg-ext-v16 + +@echo "Compiling walproposer-lib" + mkdir -p $(POSTGRES_INSTALL_DIR)/build/walproposer-lib + $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v16/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \ + -C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \ + -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile walproposer-lib + cp $(POSTGRES_INSTALL_DIR)/v16/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib + cp $(POSTGRES_INSTALL_DIR)/v16/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib + $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgport.a \ + pg_strong_random.o + $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \ + pg_crc32c.o \ + hmac_openssl.o \ + cryptohash_openssl.o \ + scram-common.o \ + md5_common.o \ + checksum_helper.o + +.PHONY: walproposer-lib-clean +walproposer-lib-clean: + $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v16/bin/pg_config \ + -C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \ + -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile clean + .PHONY: neon-pg-ext neon-pg-ext: \ neon-pg-ext-v14 \ diff --git a/libs/walproposer/Cargo.toml b/libs/walproposer/Cargo.toml new file mode 100644 index 0000000000..73aa073c44 --- /dev/null +++ 
b/libs/walproposer/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "walproposer" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +utils.workspace = true +postgres_ffi.workspace = true + +workspace_hack.workspace = true + +[build-dependencies] +anyhow.workspace = true +bindgen.workspace = true diff --git a/libs/walproposer/bindgen_deps.h b/libs/walproposer/bindgen_deps.h new file mode 100644 index 0000000000..b95788347c --- /dev/null +++ b/libs/walproposer/bindgen_deps.h @@ -0,0 +1 @@ +#include "walproposer.h" diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs new file mode 100644 index 0000000000..d32c8ab299 --- /dev/null +++ b/libs/walproposer/build.rs @@ -0,0 +1,113 @@ +use std::{env, path::PathBuf, process::Command}; + +use anyhow::{anyhow, Context}; +use bindgen::CargoCallbacks; + +fn main() -> anyhow::Result<()> { + // Tell cargo to invalidate the built crate whenever the wrapper changes + println!("cargo:rerun-if-changed=bindgen_deps.h"); + + // Finding the location of built libraries and Postgres C headers: + // - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `/pg_install` + // - if there's a `bin/pg_config` file use it for getting include server, otherwise use `/pg_install/{PG_MAJORVERSION}/include/postgresql/server` + let pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") { + postgres_install_dir.into() + } else { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../pg_install") + }; + + let pg_install_abs = std::fs::canonicalize(pg_install_dir)?; + let walproposer_lib_dir = pg_install_abs.join("build/walproposer-lib"); + let walproposer_lib_search_str = walproposer_lib_dir + .to_str() + .ok_or(anyhow!("Bad non-UTF path"))?; + + let pgxn_neon = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../pgxn/neon"); + let pgxn_neon = std::fs::canonicalize(pgxn_neon)?; + let pgxn_neon = pgxn_neon.to_str().ok_or(anyhow!("Bad 
non-UTF path"))?; + + println!("cargo:rustc-link-lib=static=pgport"); + println!("cargo:rustc-link-lib=static=pgcommon"); + println!("cargo:rustc-link-lib=static=walproposer"); + println!("cargo:rustc-link-search={walproposer_lib_search_str}"); + + let pg_config_bin = pg_install_abs.join("v16").join("bin").join("pg_config"); + let inc_server_path: String = if pg_config_bin.exists() { + let output = Command::new(pg_config_bin) + .arg("--includedir-server") + .output() + .context("failed to execute `pg_config --includedir-server`")?; + + if !output.status.success() { + panic!("`pg_config --includedir-server` failed") + } + + String::from_utf8(output.stdout) + .context("pg_config output is not UTF-8")? + .trim_end() + .into() + } else { + let server_path = pg_install_abs + .join("v16") + .join("include") + .join("postgresql") + .join("server") + .into_os_string(); + server_path + .into_string() + .map_err(|s| anyhow!("Bad postgres server path {s:?}"))? + }; + + // The bindgen::Builder is the main entry point + // to bindgen, and lets you build up options for + // the resulting bindings. + let bindings = bindgen::Builder::default() + // The input header we would like to generate + // bindings for. + .header("bindgen_deps.h") + // Tell cargo to invalidate the built crate whenever any of the + // included header files changed. 
+ .parse_callbacks(Box::new(CargoCallbacks)) + .allowlist_type("WalProposer") + .allowlist_type("WalProposerConfig") + .allowlist_type("walproposer_api") + .allowlist_function("WalProposerCreate") + .allowlist_function("WalProposerStart") + .allowlist_function("WalProposerBroadcast") + .allowlist_function("WalProposerPoll") + .allowlist_function("WalProposerFree") + .allowlist_var("DEBUG5") + .allowlist_var("DEBUG4") + .allowlist_var("DEBUG3") + .allowlist_var("DEBUG2") + .allowlist_var("DEBUG1") + .allowlist_var("LOG") + .allowlist_var("INFO") + .allowlist_var("NOTICE") + .allowlist_var("WARNING") + .allowlist_var("ERROR") + .allowlist_var("FATAL") + .allowlist_var("PANIC") + .allowlist_var("WPEVENT") + .allowlist_var("WL_LATCH_SET") + .allowlist_var("WL_SOCKET_READABLE") + .allowlist_var("WL_SOCKET_WRITEABLE") + .allowlist_var("WL_TIMEOUT") + .allowlist_var("WL_SOCKET_CLOSED") + .allowlist_var("WL_SOCKET_MASK") + .clang_arg("-DWALPROPOSER_LIB") + .clang_arg(format!("-I{pgxn_neon}")) + .clang_arg(format!("-I{inc_server_path}")) + // Finish the builder and generate the bindings. + .generate() + // Unwrap the Result and panic on failure. + .expect("Unable to generate bindings"); + + // Write the bindings to the $OUT_DIR/bindings.rs file. 
+ let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("bindings.rs"); + bindings + .write_to_file(out_path) + .expect("Couldn't write bindings!"); + + Ok(()) +} diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs new file mode 100644 index 0000000000..b0ac2a879e --- /dev/null +++ b/libs/walproposer/src/api_bindings.rs @@ -0,0 +1,455 @@ +#![allow(dead_code)] + +use std::ffi::CStr; +use std::ffi::CString; + +use crate::bindings::uint32; +use crate::bindings::walproposer_api; +use crate::bindings::PGAsyncReadResult; +use crate::bindings::PGAsyncWriteResult; +use crate::bindings::Safekeeper; +use crate::bindings::Size; +use crate::bindings::StringInfoData; +use crate::bindings::TimeLineID; +use crate::bindings::TimestampTz; +use crate::bindings::WalProposer; +use crate::bindings::WalProposerConnStatusType; +use crate::bindings::WalProposerConnectPollStatusType; +use crate::bindings::WalProposerExecStatusType; +use crate::bindings::WalproposerShmemState; +use crate::bindings::XLogRecPtr; +use crate::walproposer::ApiImpl; +use crate::walproposer::WaitResult; + +extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).get_shmem_state() + } +} + +extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).start_streaming(startpos) + } +} + +extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).get_flush_rec_ptr() + } +} + +extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).get_current_timestamp() + } +} 
+ +extern "C" fn conn_error_message(sk: *mut Safekeeper) -> *mut ::std::os::raw::c_char { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + let msg = (*api).conn_error_message(&mut (*sk)); + let msg = CString::new(msg).unwrap(); + // TODO: fix leaking error message + msg.into_raw() + } +} + +extern "C" fn conn_status(sk: *mut Safekeeper) -> WalProposerConnStatusType { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).conn_status(&mut (*sk)) + } +} + +extern "C" fn conn_connect_start(sk: *mut Safekeeper) { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).conn_connect_start(&mut (*sk)) + } +} + +extern "C" fn conn_connect_poll(sk: *mut Safekeeper) -> WalProposerConnectPollStatusType { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).conn_connect_poll(&mut (*sk)) + } +} + +extern "C" fn conn_send_query(sk: *mut Safekeeper, query: *mut ::std::os::raw::c_char) -> bool { + let query = unsafe { CStr::from_ptr(query) }; + let query = query.to_str().unwrap(); + + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).conn_send_query(&mut (*sk), query) + } +} + +extern "C" fn conn_get_query_result(sk: *mut Safekeeper) -> WalProposerExecStatusType { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).conn_get_query_result(&mut (*sk)) + } +} + +extern "C" fn conn_flush(sk: *mut Safekeeper) -> ::std::os::raw::c_int { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).conn_flush(&mut (*sk)) + } +} + +extern "C" fn conn_finish(sk: *mut Safekeeper) { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api 
= callback_data as *mut Box; + (*api).conn_finish(&mut (*sk)) + } +} + +extern "C" fn conn_async_read( + sk: *mut Safekeeper, + buf: *mut *mut ::std::os::raw::c_char, + amount: *mut ::std::os::raw::c_int, +) -> PGAsyncReadResult { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + let (res, result) = (*api).conn_async_read(&mut (*sk)); + + // This function has guarantee that returned buf will be valid until + // the next call. So we can store a Vec in each Safekeeper and reuse + // it on the next call. + let mut inbuf = take_vec_u8(&mut (*sk).inbuf).unwrap_or_default(); + + inbuf.clear(); + inbuf.extend_from_slice(res); + + // Put a Vec back to sk->inbuf and return data ptr. + *buf = store_vec_u8(&mut (*sk).inbuf, inbuf); + *amount = res.len() as i32; + + result + } +} + +extern "C" fn conn_async_write( + sk: *mut Safekeeper, + buf: *const ::std::os::raw::c_void, + size: usize, +) -> PGAsyncWriteResult { + unsafe { + let buf = std::slice::from_raw_parts(buf as *const u8, size); + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).conn_async_write(&mut (*sk), buf) + } +} + +extern "C" fn conn_blocking_write( + sk: *mut Safekeeper, + buf: *const ::std::os::raw::c_void, + size: usize, +) -> bool { + unsafe { + let buf = std::slice::from_raw_parts(buf as *const u8, size); + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).conn_blocking_write(&mut (*sk), buf) + } +} + +extern "C" fn recovery_download( + sk: *mut Safekeeper, + _timeline: TimeLineID, + startpos: XLogRecPtr, + endpos: XLogRecPtr, +) -> bool { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).recovery_download(&mut (*sk), startpos, endpos) + } +} + +extern "C" fn wal_read( + sk: *mut Safekeeper, + buf: *mut ::std::os::raw::c_char, + startptr: XLogRecPtr, + count: Size, +) { 
+ unsafe { + let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count); + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).wal_read(&mut (*sk), buf, startptr) + } +} + +extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).wal_reader_allocate(&mut (*sk)); + } +} + +extern "C" fn free_event_set(wp: *mut WalProposer) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).free_event_set(&mut (*wp)); + } +} + +extern "C" fn init_event_set(wp: *mut WalProposer) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).init_event_set(&mut (*wp)); + } +} + +extern "C" fn update_event_set(sk: *mut Safekeeper, events: uint32) { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).update_event_set(&mut (*sk), events); + } +} + +extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) { + unsafe { + let callback_data = (*(*(*sk).wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).add_safekeeper_event_set(&mut (*sk), events); + } +} + +extern "C" fn wait_event_set( + wp: *mut WalProposer, + timeout: ::std::os::raw::c_long, + event_sk: *mut *mut Safekeeper, + events: *mut uint32, +) -> ::std::os::raw::c_int { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + let result = (*api).wait_event_set(&mut (*wp), timeout); + match result { + WaitResult::Latch => { + *event_sk = std::ptr::null_mut(); + *events = crate::bindings::WL_LATCH_SET; + 1 + } + WaitResult::Timeout => { + *event_sk = std::ptr::null_mut(); + *events = crate::bindings::WL_TIMEOUT; + 0 + } + WaitResult::Network(sk, event_mask) => { + *event_sk = sk; + *events = 
event_mask; + 1 + } + } + } +} + +extern "C" fn strong_random( + wp: *mut WalProposer, + buf: *mut ::std::os::raw::c_void, + len: usize, +) -> bool { + unsafe { + let buf = std::slice::from_raw_parts_mut(buf as *mut u8, len); + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).strong_random(buf) + } +} + +extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).get_redo_start_lsn() + } +} + +extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).finish_sync_safekeepers(lsn) + } +} + +extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).process_safekeeper_feedback(&mut (*wp), commit_lsn) + } +} + +extern "C" fn confirm_wal_streamed(wp: *mut WalProposer, lsn: XLogRecPtr) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).confirm_wal_streamed(&mut (*wp), lsn) + } +} + +extern "C" fn log_internal( + wp: *mut WalProposer, + level: ::std::os::raw::c_int, + line: *const ::std::os::raw::c_char, +) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + let line = CStr::from_ptr(line); + let line = line.to_str().unwrap(); + (*api).log_internal(&mut (*wp), Level::from(level as u32), line) + } +} + +extern "C" fn after_election(wp: *mut WalProposer) { + unsafe { + let callback_data = (*(*wp).config).callback_data; + let api = callback_data as *mut Box; + (*api).after_election(&mut (*wp)) + } +} + +#[derive(Debug)] +pub enum Level { + Debug5, + Debug4, + Debug3, + Debug2, + Debug1, + Log, + Info, + Notice, + 
Warning, + Error, + Fatal, + Panic, + WPEvent, +} + +impl Level { + pub fn from(elevel: u32) -> Level { + use crate::bindings::*; + + match elevel { + DEBUG5 => Level::Debug5, + DEBUG4 => Level::Debug4, + DEBUG3 => Level::Debug3, + DEBUG2 => Level::Debug2, + DEBUG1 => Level::Debug1, + LOG => Level::Log, + INFO => Level::Info, + NOTICE => Level::Notice, + WARNING => Level::Warning, + ERROR => Level::Error, + FATAL => Level::Fatal, + PANIC => Level::Panic, + WPEVENT => Level::WPEvent, + _ => panic!("unknown log level {}", elevel), + } + } +} + +pub(crate) fn create_api() -> walproposer_api { + walproposer_api { + get_shmem_state: Some(get_shmem_state), + start_streaming: Some(start_streaming), + get_flush_rec_ptr: Some(get_flush_rec_ptr), + get_current_timestamp: Some(get_current_timestamp), + conn_error_message: Some(conn_error_message), + conn_status: Some(conn_status), + conn_connect_start: Some(conn_connect_start), + conn_connect_poll: Some(conn_connect_poll), + conn_send_query: Some(conn_send_query), + conn_get_query_result: Some(conn_get_query_result), + conn_flush: Some(conn_flush), + conn_finish: Some(conn_finish), + conn_async_read: Some(conn_async_read), + conn_async_write: Some(conn_async_write), + conn_blocking_write: Some(conn_blocking_write), + recovery_download: Some(recovery_download), + wal_read: Some(wal_read), + wal_reader_allocate: Some(wal_reader_allocate), + free_event_set: Some(free_event_set), + init_event_set: Some(init_event_set), + update_event_set: Some(update_event_set), + add_safekeeper_event_set: Some(add_safekeeper_event_set), + wait_event_set: Some(wait_event_set), + strong_random: Some(strong_random), + get_redo_start_lsn: Some(get_redo_start_lsn), + finish_sync_safekeepers: Some(finish_sync_safekeepers), + process_safekeeper_feedback: Some(process_safekeeper_feedback), + confirm_wal_streamed: Some(confirm_wal_streamed), + log_internal: Some(log_internal), + after_election: Some(after_election), + } +} + +impl std::fmt::Display for 
Level { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +/// Take ownership of `Vec` from StringInfoData. +pub(crate) fn take_vec_u8(pg: &mut StringInfoData) -> Option> { + if pg.data.is_null() { + return None; + } + + let ptr = pg.data as *mut u8; + let length = pg.len as usize; + let capacity = pg.maxlen as usize; + + pg.data = std::ptr::null_mut(); + pg.len = 0; + pg.maxlen = 0; + + unsafe { Some(Vec::from_raw_parts(ptr, length, capacity)) } +} + +/// Store `Vec` in StringInfoData. +fn store_vec_u8(pg: &mut StringInfoData, vec: Vec) -> *mut ::std::os::raw::c_char { + let ptr = vec.as_ptr() as *mut ::std::os::raw::c_char; + let length = vec.len(); + let capacity = vec.capacity(); + + assert!(pg.data.is_null()); + + pg.data = ptr; + pg.len = length as i32; + pg.maxlen = capacity as i32; + + std::mem::forget(vec); + + ptr +} diff --git a/libs/walproposer/src/lib.rs b/libs/walproposer/src/lib.rs new file mode 100644 index 0000000000..f26a13458b --- /dev/null +++ b/libs/walproposer/src/lib.rs @@ -0,0 +1,14 @@ +pub mod bindings { + #![allow(non_upper_case_globals)] + #![allow(non_camel_case_types)] + #![allow(non_snake_case)] + // bindgen creates some unsafe code with no doc comments. + #![allow(clippy::missing_safety_doc)] + // noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code. 
+ #![allow(clippy::useless_transmute)] + + include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +} + +pub mod api_bindings; +pub mod walproposer; diff --git a/libs/walproposer/src/walproposer.rs b/libs/walproposer/src/walproposer.rs new file mode 100644 index 0000000000..4be15344c5 --- /dev/null +++ b/libs/walproposer/src/walproposer.rs @@ -0,0 +1,485 @@ +use std::ffi::CString; + +use postgres_ffi::WAL_SEGMENT_SIZE; +use utils::id::TenantTimelineId; + +use crate::{ + api_bindings::{create_api, take_vec_u8, Level}, + bindings::{ + Safekeeper, WalProposer, WalProposerConfig, WalProposerCreate, WalProposerFree, + WalProposerStart, + }, +}; + +/// Rust high-level wrapper for C walproposer API. Many methods are not required +/// for simple cases, hence todo!() in default implementations. +/// +/// Refer to `pgxn/neon/walproposer.h` for documentation. +pub trait ApiImpl { + fn get_shmem_state(&self) -> &mut crate::bindings::WalproposerShmemState { + todo!() + } + + fn start_streaming(&self, _startpos: u64) { + todo!() + } + + fn get_flush_rec_ptr(&self) -> u64 { + todo!() + } + + fn get_current_timestamp(&self) -> i64 { + todo!() + } + + fn conn_error_message(&self, _sk: &mut Safekeeper) -> String { + todo!() + } + + fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType { + todo!() + } + + fn conn_connect_start(&self, _sk: &mut Safekeeper) { + todo!() + } + + fn conn_connect_poll( + &self, + _sk: &mut Safekeeper, + ) -> crate::bindings::WalProposerConnectPollStatusType { + todo!() + } + + fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool { + todo!() + } + + fn conn_get_query_result( + &self, + _sk: &mut Safekeeper, + ) -> crate::bindings::WalProposerExecStatusType { + todo!() + } + + fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 { + todo!() + } + + fn conn_finish(&self, _sk: &mut Safekeeper) { + todo!() + } + + fn conn_async_read(&self, _sk: &mut Safekeeper) -> (&[u8], crate::bindings::PGAsyncReadResult) { + 
todo!() + } + + fn conn_async_write( + &self, + _sk: &mut Safekeeper, + _buf: &[u8], + ) -> crate::bindings::PGAsyncWriteResult { + todo!() + } + + fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool { + todo!() + } + + fn recovery_download(&self, _sk: &mut Safekeeper, _startpos: u64, _endpos: u64) -> bool { + todo!() + } + + fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) { + todo!() + } + + fn wal_reader_allocate(&self, _sk: &mut Safekeeper) { + todo!() + } + + fn free_event_set(&self, _wp: &mut WalProposer) { + todo!() + } + + fn init_event_set(&self, _wp: &mut WalProposer) { + todo!() + } + + fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) { + todo!() + } + + fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) { + todo!() + } + + fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult { + todo!() + } + + fn strong_random(&self, _buf: &mut [u8]) -> bool { + todo!() + } + + fn get_redo_start_lsn(&self) -> u64 { + todo!() + } + + fn finish_sync_safekeepers(&self, _lsn: u64) { + todo!() + } + + fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) { + todo!() + } + + fn confirm_wal_streamed(&self, _wp: &mut WalProposer, _lsn: u64) { + todo!() + } + + fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) { + todo!() + } + + fn after_election(&self, _wp: &mut WalProposer) { + todo!() + } +} + +pub enum WaitResult { + Latch, + Timeout, + Network(*mut Safekeeper, u32), +} + +pub struct Config { + /// Tenant and timeline id + pub ttid: TenantTimelineId, + /// List of safekeepers in format `host:port` + pub safekeepers_list: Vec, + /// Safekeeper reconnect timeout in milliseconds + pub safekeeper_reconnect_timeout: i32, + /// Safekeeper connection timeout in milliseconds + pub safekeeper_connection_timeout: i32, + /// walproposer mode, finish when all safekeepers are synced or subscribe + /// to WAL streaming + 
pub sync_safekeepers: bool, +} + +/// WalProposer main struct. C methods are reexported as Rust functions. +pub struct Wrapper { + wp: *mut WalProposer, + _safekeepers_list_vec: Vec, +} + +impl Wrapper { + pub fn new(api: Box, config: Config) -> Wrapper { + let neon_tenant = CString::new(config.ttid.tenant_id.to_string()) + .unwrap() + .into_raw(); + let neon_timeline = CString::new(config.ttid.timeline_id.to_string()) + .unwrap() + .into_raw(); + + let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(",")) + .unwrap() + .into_bytes_with_nul(); + assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity()); + let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut i8; + + let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void; + + let c_config = WalProposerConfig { + neon_tenant, + neon_timeline, + safekeepers_list, + safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout, + safekeeper_connection_timeout: config.safekeeper_connection_timeout, + wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB + syncSafekeepers: config.sync_safekeepers, + systemId: 0, + pgTimeline: 1, + callback_data, + }; + let c_config = Box::into_raw(Box::new(c_config)); + + let api = create_api(); + let wp = unsafe { WalProposerCreate(c_config, api) }; + Wrapper { + wp, + _safekeepers_list_vec: safekeepers_list_vec, + } + } + + pub fn start(&self) { + unsafe { WalProposerStart(self.wp) } + } +} + +impl Drop for Wrapper { + fn drop(&mut self) { + unsafe { + let config = (*self.wp).config; + drop(Box::from_raw( + (*config).callback_data as *mut Box, + )); + drop(CString::from_raw((*config).neon_tenant)); + drop(CString::from_raw((*config).neon_timeline)); + drop(Box::from_raw(config)); + + for i in 0..(*self.wp).n_safekeepers { + let sk = &mut (*self.wp).safekeeper[i as usize]; + take_vec_u8(&mut sk.inbuf); + } + + WalProposerFree(self.wp); + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + cell::Cell, + 
sync::{atomic::AtomicUsize, mpsc::sync_channel}, + }; + + use utils::id::TenantTimelineId; + + use crate::{api_bindings::Level, walproposer::Wrapper}; + + use super::ApiImpl; + + #[derive(Clone, Copy, Debug)] + struct WaitEventsData { + sk: *mut crate::bindings::Safekeeper, + event_mask: u32, + } + + struct MockImpl { + // data to return from wait_event_set + wait_events: Cell, + // walproposer->safekeeper messages + expected_messages: Vec>, + expected_ptr: AtomicUsize, + // safekeeper->walproposer messages + safekeeper_replies: Vec>, + replies_ptr: AtomicUsize, + // channel to send LSN to the main thread + sync_channel: std::sync::mpsc::SyncSender, + } + + impl MockImpl { + fn check_walproposer_msg(&self, msg: &[u8]) { + let ptr = self + .expected_ptr + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + + if ptr >= self.expected_messages.len() { + panic!("unexpected message from walproposer"); + } + + let expected_msg = &self.expected_messages[ptr]; + assert_eq!(msg, expected_msg.as_slice()); + } + + fn next_safekeeper_reply(&self) -> &[u8] { + let ptr = self + .replies_ptr + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + + if ptr >= self.safekeeper_replies.len() { + panic!("no more safekeeper replies"); + } + + &self.safekeeper_replies[ptr] + } + } + + impl ApiImpl for MockImpl { + fn get_current_timestamp(&self) -> i64 { + println!("get_current_timestamp"); + 0 + } + + fn conn_status( + &self, + _: &mut crate::bindings::Safekeeper, + ) -> crate::bindings::WalProposerConnStatusType { + println!("conn_status"); + crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK + } + + fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) { + println!("conn_connect_start"); + } + + fn conn_connect_poll( + &self, + _: &mut crate::bindings::Safekeeper, + ) -> crate::bindings::WalProposerConnectPollStatusType { + println!("conn_connect_poll"); + crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK + } + + fn conn_send_query(&self, _: 
&mut crate::bindings::Safekeeper, query: &str) -> bool { + println!("conn_send_query: {}", query); + true + } + + fn conn_get_query_result( + &self, + _: &mut crate::bindings::Safekeeper, + ) -> crate::bindings::WalProposerExecStatusType { + println!("conn_get_query_result"); + crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH + } + + fn conn_async_read( + &self, + _: &mut crate::bindings::Safekeeper, + ) -> (&[u8], crate::bindings::PGAsyncReadResult) { + println!("conn_async_read"); + let reply = self.next_safekeeper_reply(); + println!("conn_async_read result: {:?}", reply); + ( + reply, + crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS, + ) + } + + fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool { + println!("conn_blocking_write: {:?}", buf); + self.check_walproposer_msg(buf); + true + } + + fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) { + println!("wal_reader_allocate") + } + + fn free_event_set(&self, _: &mut crate::bindings::WalProposer) { + println!("free_event_set") + } + + fn init_event_set(&self, _: &mut crate::bindings::WalProposer) { + println!("init_event_set") + } + + fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) { + println!( + "update_event_set, sk={:?}, events_mask={:#b}", + sk as *mut crate::bindings::Safekeeper, event_mask + ); + self.wait_events.set(WaitEventsData { sk, event_mask }); + } + + fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) { + println!( + "add_safekeeper_event_set, sk={:?}, events_mask={:#b}", + sk as *mut crate::bindings::Safekeeper, event_mask + ); + self.wait_events.set(WaitEventsData { sk, event_mask }); + } + + fn wait_event_set( + &self, + _: &mut crate::bindings::WalProposer, + timeout_millis: i64, + ) -> super::WaitResult { + let data = self.wait_events.get(); + println!( + "wait_event_set, timeout_millis={}, res={:?}", + timeout_millis, data + ); + 
super::WaitResult::Network(data.sk, data.event_mask) + } + + fn strong_random(&self, buf: &mut [u8]) -> bool { + println!("strong_random"); + buf.fill(0); + true + } + + fn finish_sync_safekeepers(&self, lsn: u64) { + self.sync_channel.send(lsn).unwrap(); + panic!("sync safekeepers finished at lsn={}", lsn); + } + + fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) { + println!("walprop_log[{}] {}", level, msg); + } + + fn after_election(&self, _wp: &mut crate::bindings::WalProposer) { + println!("after_election"); + } + } + + /// Test that walproposer can successfully connect to safekeeper and finish + /// sync_safekeepers. API is mocked in MockImpl. + /// + /// Run this test with valgrind to detect leaks: + /// `valgrind --leak-check=full target/debug/deps/walproposer-` + #[test] + fn test_simple_sync_safekeepers() -> anyhow::Result<()> { + let ttid = TenantTimelineId::new( + "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?, + "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?, + ); + + let (sender, receiver) = sync_channel(1); + + let my_impl: Box = Box::new(MockImpl { + wait_events: Cell::new(WaitEventsData { + sk: std::ptr::null_mut(), + event_mask: 0, + }), + expected_messages: vec![ + // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160000, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 }) + vec![ + 103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110, + 147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147, + 188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1, + ], + // VoteRequest(VoteRequest { term: 3 }) + vec![ + 118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, + ], + ], + 
expected_ptr: AtomicUsize::new(0), + safekeeper_replies: vec![ + // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) }) + vec![ + 103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, + ], + // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 }) + vec![ + 118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57, + 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, + 0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, + ], + ], + replies_ptr: AtomicUsize::new(0), + sync_channel: sender, + }); + let config = crate::walproposer::Config { + ttid, + safekeepers_list: vec!["localhost:5000".to_string()], + safekeeper_reconnect_timeout: 1000, + safekeeper_connection_timeout: 10000, + sync_safekeepers: true, + }; + + let wp = Wrapper::new(my_impl, config); + + // walproposer will panic when it finishes sync_safekeepers + std::panic::catch_unwind(|| wp.start()).unwrap_err(); + // validate the resulting LSN + assert_eq!(receiver.recv()?, 1337); + Ok(()) + // drop() will free up resources here + } +} diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index e88901ed78..84835843bc 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -23,6 +23,23 @@ EXTENSION = neon DATA = neon--1.0.sql PGFILEDESC = "neon - cloud storage for PostgreSQL" +EXTRA_CLEAN = \ + libwalproposer.a + +WALPROP_OBJS = \ + $(WIN32RES) \ + walproposer.o \ + neon_utils.o \ + walproposer_compat.o + +.PHONY: walproposer-lib +walproposer-lib: CPPFLAGS += -DWALPROPOSER_LIB +walproposer-lib: libwalproposer.a; + +.PHONY: libwalproposer.a +libwalproposer.a: $(WALPROP_OBJS) + rm -f $@ + $(AR) $(AROPT) $@ $^ PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 10612e7e35..10544ba7a8 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -79,7 +79,7 @@ 
static int CompareLsn(const void *a, const void *b); static char *FormatSafekeeperState(SafekeeperState state); static void AssertEventsOkForState(uint32 events, Safekeeper *sk); static uint32 SafekeeperStateDesiredEvents(SafekeeperState state); -static char *FormatEvents(uint32 events); +static char *FormatEvents(WalProposer *wp, uint32 events); WalProposer * WalProposerCreate(WalProposerConfig *config, walproposer_api api) @@ -98,7 +98,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) port = strchr(host, ':'); if (port == NULL) { - elog(FATAL, "port is not specified"); + walprop_log(FATAL, "port is not specified"); } *port++ = '\0'; sep = strchr(port, ','); @@ -106,12 +106,11 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) *sep++ = '\0'; if (wp->n_safekeepers + 1 >= MAX_SAFEKEEPERS) { - elog(FATAL, "Too many safekeepers"); + walprop_log(FATAL, "Too many safekeepers"); } wp->safekeeper[wp->n_safekeepers].host = host; wp->safekeeper[wp->n_safekeepers].port = port; wp->safekeeper[wp->n_safekeepers].state = SS_OFFLINE; - wp->safekeeper[wp->n_safekeepers].conn = NULL; wp->safekeeper[wp->n_safekeepers].wp = wp; { @@ -122,13 +121,11 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) "host=%s port=%s dbname=replication options='-c timeline_id=%s tenant_id=%s'", sk->host, sk->port, wp->config->neon_timeline, wp->config->neon_tenant); if (written > MAXCONNINFO || written < 0) - elog(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); + walprop_log(FATAL, "could not create connection string for safekeeper %s:%s", sk->host, sk->port); } initStringInfo(&wp->safekeeper[wp->n_safekeepers].outbuf); - wp->safekeeper[wp->n_safekeepers].xlogreader = wp->api.wal_reader_allocate(); - if (wp->safekeeper[wp->n_safekeepers].xlogreader == NULL) - elog(FATAL, "Failed to allocate xlog reader"); + wp->api.wal_reader_allocate(&wp->safekeeper[wp->n_safekeepers]); 
wp->safekeeper[wp->n_safekeepers].flushWrite = false; wp->safekeeper[wp->n_safekeepers].startStreamingAt = InvalidXLogRecPtr; wp->safekeeper[wp->n_safekeepers].streamingAt = InvalidXLogRecPtr; @@ -136,7 +133,7 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) } if (wp->n_safekeepers < 1) { - elog(FATAL, "Safekeepers addresses are not specified"); + walprop_log(FATAL, "Safekeepers addresses are not specified"); } wp->quorum = wp->n_safekeepers / 2 + 1; @@ -144,27 +141,47 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api) wp->greetRequest.tag = 'g'; wp->greetRequest.protocolVersion = SK_PROTOCOL_VERSION; wp->greetRequest.pgVersion = PG_VERSION_NUM; - wp->api.strong_random(&wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId)); + wp->api.strong_random(wp, &wp->greetRequest.proposerId, sizeof(wp->greetRequest.proposerId)); wp->greetRequest.systemId = wp->config->systemId; if (!wp->config->neon_timeline) - elog(FATAL, "neon.timeline_id is not provided"); + walprop_log(FATAL, "neon.timeline_id is not provided"); if (*wp->config->neon_timeline != '\0' && !HexDecodeString(wp->greetRequest.timeline_id, wp->config->neon_timeline, 16)) - elog(FATAL, "Could not parse neon.timeline_id, %s", wp->config->neon_timeline); + walprop_log(FATAL, "Could not parse neon.timeline_id, %s", wp->config->neon_timeline); if (!wp->config->neon_tenant) - elog(FATAL, "neon.tenant_id is not provided"); + walprop_log(FATAL, "neon.tenant_id is not provided"); if (*wp->config->neon_tenant != '\0' && !HexDecodeString(wp->greetRequest.tenant_id, wp->config->neon_tenant, 16)) - elog(FATAL, "Could not parse neon.tenant_id, %s", wp->config->neon_tenant); + walprop_log(FATAL, "Could not parse neon.tenant_id, %s", wp->config->neon_tenant); - wp->greetRequest.timeline = wp->api.get_timeline_id(); + wp->greetRequest.timeline = wp->config->pgTimeline; wp->greetRequest.walSegSize = wp->config->wal_segment_size; - wp->api.init_event_set(wp->n_safekeepers); + 
wp->api.init_event_set(wp); return wp; } +void +WalProposerFree(WalProposer *wp) +{ + for (int i = 0; i < wp->n_safekeepers; i++) + { + Safekeeper *sk = &wp->safekeeper[i]; + + Assert(sk->outbuf.data != NULL); + pfree(sk->outbuf.data); + if (sk->voteResponse.termHistory.entries) + pfree(sk->voteResponse.termHistory.entries); + sk->voteResponse.termHistory.entries = NULL; + } + if (wp->propTermHistory.entries != NULL) + pfree(wp->propTermHistory.entries); + wp->propTermHistory.entries = NULL; + + pfree(wp); +} + /* * Create new AppendRequest message and start sending it. This function is * called from walsender every time the new WAL is available. @@ -190,10 +207,10 @@ WalProposerPoll(WalProposer *wp) Safekeeper *sk = NULL; int rc = 0; uint32 events = 0; - TimestampTz now = wp->api.get_current_timestamp(); + TimestampTz now = wp->api.get_current_timestamp(wp); long timeout = TimeToReconnect(wp, now); - rc = wp->api.wait_event_set(timeout, &sk, &events); + rc = wp->api.wait_event_set(wp, timeout, &sk, &events); /* Exit loop if latch is set (we got new WAL) */ if ((rc == 1 && events & WL_LATCH_SET)) @@ -224,14 +241,14 @@ WalProposerPoll(WalProposer *wp) */ if (!wp->config->syncSafekeepers) { - XLogRecPtr flushed = wp->api.get_flush_rec_ptr(); + XLogRecPtr flushed = wp->api.get_flush_rec_ptr(wp); if (flushed > wp->availableLsn) break; } } - now = wp->api.get_current_timestamp(); + now = wp->api.get_current_timestamp(wp); /* timeout expired: poll state */ if (rc == 0 || TimeToReconnect(wp, now) <= 0) { @@ -249,7 +266,7 @@ WalProposerPoll(WalProposer *wp) /* * Abandon connection attempts which take too long. 
*/ - now = wp->api.get_current_timestamp(); + now = wp->api.get_current_timestamp(wp); for (int i = 0; i < wp->n_safekeepers; i++) { Safekeeper *sk = &wp->safekeeper[i]; @@ -257,7 +274,7 @@ WalProposerPoll(WalProposer *wp) if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now, wp->config->safekeeper_connection_timeout)) { - elog(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that", + walprop_log(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that", sk->host, sk->port, FormatSafekeeperState(sk->state), wp->config->safekeeper_connection_timeout); ShutdownConnection(sk); } @@ -296,10 +313,10 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove) { WalProposer *wp = to_remove->wp; - /* Remove the existing event set */ - wp->api.free_event_set(); + /* Remove the existing event set, assign sk->eventPos = -1 */ + wp->api.free_event_set(wp); /* Re-initialize it without adding any safekeeper events */ - wp->api.init_event_set(wp->n_safekeepers); + wp->api.init_event_set(wp); /* * loop through the existing safekeepers. If they aren't the one we're @@ -311,13 +328,11 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove) uint32 desired_events = WL_NO_EVENTS; Safekeeper *sk = &wp->safekeeper[i]; - sk->eventPos = -1; - if (sk == to_remove) continue; /* If this safekeeper isn't offline, add an event for it! 
*/ - if (sk->conn != NULL) + if (sk->state != SS_OFFLINE) { desired_events = SafekeeperStateDesiredEvents(sk->state); /* will set sk->eventPos */ @@ -330,9 +345,7 @@ HackyRemoveWalProposerEvent(Safekeeper *to_remove) static void ShutdownConnection(Safekeeper *sk) { - if (sk->conn) - sk->wp->api.conn_finish(sk->conn); - sk->conn = NULL; + sk->wp->api.conn_finish(sk); sk->state = SS_OFFLINE; sk->flushWrite = false; sk->streamingAt = InvalidXLogRecPtr; @@ -361,23 +374,16 @@ ResetConnection(Safekeeper *sk) } /* - * Try to establish new connection + * Try to establish new connection, it will update sk->conn. */ - sk->conn = wp->api.conn_connect_start((char *) &sk->conninfo); - - /* - * "If the result is null, then libpq has been unable to allocate a new - * PGconn structure" - */ - if (!sk->conn) - elog(FATAL, "failed to allocate new PGconn object"); + wp->api.conn_connect_start(sk); /* * PQconnectStart won't actually start connecting until we run * PQconnectPoll. Before we do that though, we need to check that it * didn't immediately fail. 
*/ - if (wp->api.conn_status(sk->conn) == WP_CONNECTION_BAD) + if (wp->api.conn_status(sk) == WP_CONNECTION_BAD) { /*--- * According to libpq docs: @@ -388,15 +394,14 @@ ResetConnection(Safekeeper *sk) * * https://www.postgresql.org/docs/devel/libpq-connect.html#LIBPQ-PQCONNECTSTARTPARAMS */ - elog(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s", - sk->host, sk->port, wp->api.conn_error_message(sk->conn)); + walprop_log(WARNING, "Immediate failure to connect with node '%s:%s':\n\terror: %s", + sk->host, sk->port, wp->api.conn_error_message(sk)); /* * Even though the connection failed, we still need to clean up the * object */ - wp->api.conn_finish(sk->conn); - sk->conn = NULL; + wp->api.conn_finish(sk); return; } @@ -413,10 +418,10 @@ ResetConnection(Safekeeper *sk) * (see libpqrcv_connect, defined in * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c) */ - elog(LOG, "connecting with node %s:%s", sk->host, sk->port); + walprop_log(LOG, "connecting with node %s:%s", sk->host, sk->port); sk->state = SS_CONNECTING_WRITE; - sk->latestMsgReceivedAt = wp->api.get_current_timestamp(); + sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp); wp->api.add_safekeeper_event_set(sk, WL_SOCKET_WRITEABLE); return; @@ -447,7 +452,7 @@ TimeToReconnect(WalProposer *wp, TimestampTz now) static void ReconnectSafekeepers(WalProposer *wp) { - TimestampTz now = wp->api.get_current_timestamp(); + TimestampTz now = wp->api.get_current_timestamp(wp); if (TimeToReconnect(wp, now) == 0) { @@ -467,6 +472,8 @@ ReconnectSafekeepers(WalProposer *wp) static void AdvancePollState(Safekeeper *sk, uint32 events) { + WalProposer *wp = sk->wp; + /* * Sanity check. We assume further down that the operations don't block * because the socket is ready. 
@@ -481,7 +488,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) * ResetConnection */ case SS_OFFLINE: - elog(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline", + walprop_log(FATAL, "Unexpected safekeeper %s:%s state advancement: is offline", sk->host, sk->port); break; /* actually unreachable, but prevents * -Wimplicit-fallthrough */ @@ -517,7 +524,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) * requests. */ case SS_VOTING: - elog(WARNING, "EOF from node %s:%s in %s state", sk->host, + walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port, FormatSafekeeperState(sk->state)); ResetConnection(sk); return; @@ -546,7 +553,7 @@ AdvancePollState(Safekeeper *sk, uint32 events) * Idle state for waiting votes from quorum. */ case SS_IDLE: - elog(WARNING, "EOF from node %s:%s in %s state", sk->host, + walprop_log(WARNING, "EOF from node %s:%s in %s state", sk->host, sk->port, FormatSafekeeperState(sk->state)); ResetConnection(sk); return; @@ -564,7 +571,7 @@ static void HandleConnectionEvent(Safekeeper *sk) { WalProposer *wp = sk->wp; - WalProposerConnectPollStatusType result = wp->api.conn_connect_poll(sk->conn); + WalProposerConnectPollStatusType result = wp->api.conn_connect_poll(sk); /* The new set of events we'll wait on, after updating */ uint32 new_events = WL_NO_EVENTS; @@ -572,9 +579,9 @@ HandleConnectionEvent(Safekeeper *sk) switch (result) { case WP_CONN_POLLING_OK: - elog(LOG, "connected with node %s:%s", sk->host, + walprop_log(LOG, "connected with node %s:%s", sk->host, sk->port); - sk->latestMsgReceivedAt = wp->api.get_current_timestamp(); + sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp); /* * We have to pick some event to update event set. 
We'll @@ -596,8 +603,8 @@ HandleConnectionEvent(Safekeeper *sk) break; case WP_CONN_POLLING_FAILED: - elog(WARNING, "failed to connect to node '%s:%s': %s", - sk->host, sk->port, wp->api.conn_error_message(sk->conn)); + walprop_log(WARNING, "failed to connect to node '%s:%s': %s", + sk->host, sk->port, wp->api.conn_error_message(sk)); /* * If connecting failed, we don't want to restart the connection @@ -631,10 +638,10 @@ SendStartWALPush(Safekeeper *sk) { WalProposer *wp = sk->wp; - if (!wp->api.conn_send_query(sk->conn, "START_WAL_PUSH")) + if (!wp->api.conn_send_query(sk, "START_WAL_PUSH")) { - elog(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", - sk->host, sk->port, wp->api.conn_error_message(sk->conn)); + walprop_log(WARNING, "Failed to send 'START_WAL_PUSH' query to safekeeper %s:%s: %s", + sk->host, sk->port, wp->api.conn_error_message(sk)); ShutdownConnection(sk); return; } @@ -647,7 +654,7 @@ RecvStartWALPushResult(Safekeeper *sk) { WalProposer *wp = sk->wp; - switch (wp->api.conn_get_query_result(sk->conn)) + switch (wp->api.conn_get_query_result(sk)) { /* * Successful result, move on to starting the handshake @@ -670,8 +677,8 @@ RecvStartWALPushResult(Safekeeper *sk) break; case WP_EXEC_FAILED: - elog(WARNING, "Failed to send query to safekeeper %s:%s: %s", - sk->host, sk->port, wp->api.conn_error_message(sk->conn)); + walprop_log(WARNING, "Failed to send query to safekeeper %s:%s: %s", + sk->host, sk->port, wp->api.conn_error_message(sk)); ShutdownConnection(sk); return; @@ -681,7 +688,7 @@ RecvStartWALPushResult(Safekeeper *sk) * wrong" */ case WP_EXEC_UNEXPECTED_SUCCESS: - elog(WARNING, "Received bad response from safekeeper %s:%s query execution", + walprop_log(WARNING, "Received bad response from safekeeper %s:%s query execution", sk->host, sk->port); ShutdownConnection(sk); return; @@ -717,7 +724,7 @@ RecvAcceptorGreeting(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->greetResponse)) return; - 
elog(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port); + walprop_log(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port); /* Protocol is all good, move to voting. */ sk->state = SS_VOTING; @@ -737,7 +744,7 @@ RecvAcceptorGreeting(Safekeeper *sk) if (wp->n_connected == wp->quorum) { wp->propTerm++; - elog(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); + walprop_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm); wp->voteRequest = (VoteRequest) { @@ -750,7 +757,7 @@ RecvAcceptorGreeting(Safekeeper *sk) else if (sk->greetResponse.term > wp->propTerm) { /* Another compute with higher term is running. */ - elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", + walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", sk->host, sk->port, sk->greetResponse.term, wp->propTerm); } @@ -792,7 +799,7 @@ SendVoteRequest(Safekeeper *sk) WalProposer *wp = sk->wp; /* We have quorum for voting, send our vote request */ - elog(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term); + walprop_log(LOG, "requesting vote from %s:%s for term " UINT64_FORMAT, sk->host, sk->port, wp->voteRequest.term); /* On failure, logging & resetting is handled */ if (!BlockingWrite(sk, &wp->voteRequest, sizeof(wp->voteRequest), SS_WAIT_VERDICT)) return; @@ -809,7 +816,7 @@ RecvVoteResponse(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->voteResponse)) return; - elog(LOG, + walprop_log(LOG, "got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X, timelineStartLsn=%X/%X", sk->host, sk->port, sk->voteResponse.voteGiven, GetHighestTerm(&sk->voteResponse.termHistory), 
LSN_FORMAT_ARGS(sk->voteResponse.flushLsn), @@ -824,7 +831,7 @@ RecvVoteResponse(Safekeeper *sk) if ((!sk->voteResponse.voteGiven) && (sk->voteResponse.term > wp->propTerm || wp->n_votes < wp->quorum)) { - elog(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", + walprop_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejects our connection request with term " INT64_FORMAT "", sk->host, sk->port, sk->voteResponse.term, wp->propTerm); } @@ -861,49 +868,27 @@ RecvVoteResponse(Safekeeper *sk) static void HandleElectedProposer(WalProposer *wp) { - FILE* f; - XLogRecPtr lrRestartLsn; - DetermineEpochStartLsn(wp); - /* - * If there are active logical replication subscription we need - * to provide enough WAL for their WAL senders based on th position - * of their replication slots. - */ - f = fopen("restart.lsn", "rb"); - if (f != NULL && !wp->config->syncSafekeepers) - { - fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f); - fclose(f); - if (lrRestartLsn != InvalidXLogRecPtr) - { - elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn)); - /* start from the beginning of the segment to fetch page headers verifed by XLogReader */ - lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size); - wp->truncateLsn = Min(wp->truncateLsn, lrRestartLsn); - } - } - /* * Check if not all safekeepers are up-to-date, we need to download WAL * needed to synchronize them */ if (wp->truncateLsn < wp->propEpochStartLsn) { - elog(LOG, + walprop_log(LOG, "start recovery because truncateLsn=%X/%X is not " "equal to epochStartLsn=%X/%X", LSN_FORMAT_ARGS(wp->truncateLsn), LSN_FORMAT_ARGS(wp->propEpochStartLsn)); /* Perform recovery */ if (!wp->api.recovery_download(&wp->safekeeper[wp->donor], wp->greetRequest.timeline, wp->truncateLsn, wp->propEpochStartLsn)) - elog(FATAL, "Failed to recover state"); + walprop_log(FATAL, "Failed to recover state"); } else if 
(wp->config->syncSafekeepers) { /* Sync is not needed: just exit */ - wp->api.finish_sync_safekeepers(wp->propEpochStartLsn); + wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); /* unreachable */ } @@ -1004,7 +989,7 @@ DetermineEpochStartLsn(WalProposer *wp) if (wp->timelineStartLsn != InvalidXLogRecPtr && wp->timelineStartLsn != wp->safekeeper[i].voteResponse.timelineStartLsn) { - elog(WARNING, + walprop_log(WARNING, "inconsistent timelineStartLsn: current %X/%X, received %X/%X", LSN_FORMAT_ARGS(wp->timelineStartLsn), LSN_FORMAT_ARGS(wp->safekeeper[i].voteResponse.timelineStartLsn)); @@ -1020,12 +1005,12 @@ DetermineEpochStartLsn(WalProposer *wp) */ if (wp->propEpochStartLsn == InvalidXLogRecPtr && !wp->config->syncSafekeepers) { - wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(); + wp->propEpochStartLsn = wp->truncateLsn = wp->api.get_redo_start_lsn(wp); if (wp->timelineStartLsn == InvalidXLogRecPtr) { - wp->timelineStartLsn = wp->api.get_redo_start_lsn(); + wp->timelineStartLsn = wp->api.get_redo_start_lsn(wp); } - elog(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); + walprop_log(LOG, "bumped epochStartLsn to the first record %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn)); } /* @@ -1052,7 +1037,7 @@ DetermineEpochStartLsn(WalProposer *wp) wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm; wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propEpochStartLsn; - elog(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", + walprop_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X", wp->quorum, wp->propTerm, LSN_FORMAT_ARGS(wp->propEpochStartLsn), @@ -1066,7 +1051,7 @@ DetermineEpochStartLsn(WalProposer *wp) */ if (!wp->config->syncSafekeepers) { - WalproposerShmemState *walprop_shared = 
wp->api.get_shmem_state(); + WalproposerShmemState *walprop_shared = wp->api.get_shmem_state(wp); /* * Basebackup LSN always points to the beginning of the record (not @@ -1074,7 +1059,7 @@ DetermineEpochStartLsn(WalProposer *wp) * Safekeepers don't skip header as they need continious stream of * data, so correct LSN for comparison. */ - if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn()) + if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp)) { /* * However, allow to proceed if previously elected leader was me; @@ -1084,14 +1069,21 @@ DetermineEpochStartLsn(WalProposer *wp) if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term == walprop_shared->mineLastElectedTerm))) { - elog(PANIC, + walprop_log(PANIC, "collected propEpochStartLsn %X/%X, but basebackup LSN %X/%X", LSN_FORMAT_ARGS(wp->propEpochStartLsn), - LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn())); + LSN_FORMAT_ARGS(wp->api.get_redo_start_lsn(wp))); } } walprop_shared->mineLastElectedTerm = wp->propTerm; } + + /* + * WalProposer has just elected itself and initialized history, so + * we can call election callback. Usually it updates truncateLsn to + * fetch WAL for logical replication. + */ + wp->api.after_election(wp); } /* @@ -1162,7 +1154,7 @@ SendProposerElected(Safekeeper *sk) */ sk->startStreamingAt = wp->truncateLsn; - elog(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X", + walprop_log(WARNING, "empty safekeeper joined cluster as %s:%s, historyStart=%X/%X, sk->startStreamingAt=%X/%X", sk->host, sk->port, LSN_FORMAT_ARGS(wp->propTermHistory.entries[0].lsn), LSN_FORMAT_ARGS(sk->startStreamingAt)); } @@ -1197,7 +1189,7 @@ SendProposerElected(Safekeeper *sk) msg.timelineStartLsn = wp->timelineStartLsn; lastCommonTerm = i >= 0 ? 
wp->propTermHistory.entries[i].term : 0; - elog(LOG, + walprop_log(LOG, "sending elected msg to node " UINT64_FORMAT " term=" UINT64_FORMAT ", startStreamingAt=%X/%X (lastCommonTerm=" UINT64_FORMAT "), termHistory.n_entries=%u to %s:%s, timelineStartLsn=%X/%X", sk->greetResponse.nodeId, msg.term, LSN_FORMAT_ARGS(msg.startStreamingAt), lastCommonTerm, msg.termHistory->n_entries, sk->host, sk->port, LSN_FORMAT_ARGS(msg.timelineStartLsn)); @@ -1362,13 +1354,12 @@ SendAppendRequests(Safekeeper *sk) req = &sk->appendRequest; PrepareAppendRequest(sk->wp, &sk->appendRequest, sk->streamingAt, endLsn); - ereport(DEBUG2, - (errmsg("sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", + walprop_log(DEBUG2, "sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", req->endLsn - req->beginLsn, LSN_FORMAT_ARGS(req->beginLsn), LSN_FORMAT_ARGS(req->endLsn), LSN_FORMAT_ARGS(req->commitLsn), - LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port))); + LSN_FORMAT_ARGS(wp->truncateLsn), sk->host, sk->port); resetStringInfo(&sk->outbuf); @@ -1378,13 +1369,13 @@ SendAppendRequests(Safekeeper *sk) /* write the WAL itself */ enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); /* wal_read will raise error on failure */ - wp->api.wal_read(sk->xlogreader, + wp->api.wal_read(sk, &sk->outbuf.data[sk->outbuf.len], req->beginLsn, req->endLsn - req->beginLsn); sk->outbuf.len += req->endLsn - req->beginLsn; - writeResult = wp->api.conn_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len); + writeResult = wp->api.conn_async_write(sk, sk->outbuf.data, sk->outbuf.len); /* Mark current message as sent, whatever the result is */ sk->streamingAt = endLsn; @@ -1406,9 +1397,9 @@ SendAppendRequests(Safekeeper *sk) return true; case PG_ASYNC_WRITE_FAIL: - elog(WARNING, "Failed to send to node %s:%s in %s state: %s", + walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, 
FormatSafekeeperState(sk->state), - wp->api.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; default: @@ -1446,17 +1437,16 @@ RecvAppendResponses(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->appendResponse)) break; - ereport(DEBUG2, - (errmsg("received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s", + walprop_log(DEBUG2, "received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s", sk->appendResponse.term, LSN_FORMAT_ARGS(sk->appendResponse.flushLsn), LSN_FORMAT_ARGS(sk->appendResponse.commitLsn), - sk->host, sk->port))); + sk->host, sk->port); if (sk->appendResponse.term > wp->propTerm) { /* Another compute with higher term is running. */ - elog(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "", + walprop_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT "", sk->host, sk->port, sk->appendResponse.term, wp->propTerm); } @@ -1484,7 +1474,7 @@ RecvAppendResponses(Safekeeper *sk) /* Parse a PageserverFeedback message, or the PageserverFeedback part of an AppendResponse */ void -ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback *rf) +ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, PageserverFeedback *rf) { uint8 nkeys; int i; @@ -1502,7 +1492,7 @@ ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback *rf) pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->currentClusterSize = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu", + walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu", rf->currentClusterSize); } else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0)) @@ -1510,7 +1500,7 @@ ParsePageserverFeedbackMessage(StringInfo 
reply_message, PageserverFeedback *rf) pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->last_received_lsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X", + walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X", LSN_FORMAT_ARGS(rf->last_received_lsn)); } else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0)) @@ -1518,7 +1508,7 @@ ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback *rf) pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->disk_consistent_lsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X", + walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X", LSN_FORMAT_ARGS(rf->disk_consistent_lsn)); } else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0)) @@ -1526,7 +1516,7 @@ ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback *rf) pq_getmsgint(reply_message, sizeof(int32)); /* read value length */ rf->remote_consistent_lsn = pq_getmsgint64(reply_message); - elog(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X", + walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X", LSN_FORMAT_ARGS(rf->remote_consistent_lsn)); } else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0)) @@ -1539,7 +1529,7 @@ ParsePageserverFeedbackMessage(StringInfo reply_message, PageserverFeedback *rf) /* Copy because timestamptz_to_str returns a static buffer */ replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime)); - elog(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s", + walprop_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s", rf->replytime, replyTimeStr); pfree(replyTimeStr); @@ -1554,7 +1544,7 @@ ParsePageserverFeedbackMessage(StringInfo 
reply_message, PageserverFeedback *rf) * Skip unknown keys to support backward compatibile protocol * changes */ - elog(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len); + walprop_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len); pq_getmsgbytes(reply_message, len); }; } @@ -1637,7 +1627,7 @@ HandleSafekeeperResponse(WalProposer *wp) * Advance the replication slot to free up old WAL files. Note that * slot doesn't exist if we are in syncSafekeepers mode. */ - wp->api.confirm_wal_streamed(wp->truncateLsn); + wp->api.confirm_wal_streamed(wp, wp->truncateLsn); } /* @@ -1684,7 +1674,7 @@ HandleSafekeeperResponse(WalProposer *wp) */ BroadcastAppendRequest(wp); - wp->api.finish_sync_safekeepers(wp->propEpochStartLsn); + wp->api.finish_sync_safekeepers(wp, wp->propEpochStartLsn); /* unreachable */ } } @@ -1699,7 +1689,7 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) { WalProposer *wp = sk->wp; - switch (wp->api.conn_async_read(sk->conn, buf, buf_size)) + switch (wp->api.conn_async_read(sk, buf, buf_size)) { case PG_ASYNC_READ_SUCCESS: return true; @@ -1709,9 +1699,9 @@ AsyncRead(Safekeeper *sk, char **buf, int *buf_size) return false; case PG_ASYNC_READ_FAIL: - elog(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host, + walprop_log(WARNING, "Failed to read from node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - wp->api.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; } @@ -1749,12 +1739,12 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) tag = pq_getmsgint64_le(&s); if (tag != anymsg->tag) { - elog(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, + walprop_log(WARNING, "unexpected message tag %c from node %s:%s in state %s", (char) tag, sk->host, sk->port, FormatSafekeeperState(sk->state)); ResetConnection(sk); return false; } - sk->latestMsgReceivedAt 
= wp->api.get_current_timestamp(); + sk->latestMsgReceivedAt = wp->api.get_current_timestamp(wp); switch (tag) { case 'g': @@ -1798,7 +1788,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg) msg->hs.xmin.value = pq_getmsgint64_le(&s); msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s); if (buf_size > APPENDRESPONSE_FIXEDPART_SIZE) - ParsePageserverFeedbackMessage(&s, &msg->rf); + ParsePageserverFeedbackMessage(wp, &s, &msg->rf); pq_getmsgend(&s); return true; } @@ -1823,11 +1813,11 @@ BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState succes WalProposer *wp = sk->wp; uint32 events; - if (!wp->api.conn_blocking_write(sk->conn, msg, msg_size)) + if (!wp->api.conn_blocking_write(sk, msg, msg_size)) { - elog(WARNING, "Failed to send to node %s:%s in %s state: %s", + walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - wp->api.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; } @@ -1857,7 +1847,7 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta { WalProposer *wp = sk->wp; - switch (wp->api.conn_async_write(sk->conn, msg, msg_size)) + switch (wp->api.conn_async_write(sk, msg, msg_size)) { case PG_ASYNC_WRITE_SUCCESS: return true; @@ -1872,9 +1862,9 @@ AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState flush_sta wp->api.update_event_set(sk, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); return false; case PG_ASYNC_WRITE_FAIL: - elog(WARNING, "Failed to send to node %s:%s in %s state: %s", + walprop_log(WARNING, "Failed to send to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - wp->api.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk)); ShutdownConnection(sk); return false; default: @@ -1902,7 +1892,7 @@ AsyncFlush(Safekeeper *sk) * 1 if unable to send everything yet [call PQflush again] * -1 if it failed 
[emit an error] */ - switch (wp->api.conn_flush(sk->conn)) + switch (wp->api.conn_flush(sk)) { case 0: /* flush is done */ @@ -1911,9 +1901,9 @@ AsyncFlush(Safekeeper *sk) /* Nothing to do; try again when the socket's ready */ return false; case -1: - elog(WARNING, "Failed to flush write to node %s:%s in %s state: %s", + walprop_log(WARNING, "Failed to flush write to node %s:%s in %s state: %s", sk->host, sk->port, FormatSafekeeperState(sk->state), - wp->api.conn_error_message(sk->conn)); + wp->api.conn_error_message(sk)); ResetConnection(sk); return false; default: @@ -1942,11 +1932,11 @@ CompareLsn(const void *a, const void *b) * * The strings are intended to be used as a prefix to "state", e.g.: * - * elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state)); + * walprop_log(LOG, "currently in %s state", FormatSafekeeperState(sk->state)); * * If this sort of phrasing doesn't fit the message, instead use something like: * - * elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state)); + * walprop_log(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state)); */ static char * FormatSafekeeperState(SafekeeperState state) @@ -1994,6 +1984,7 @@ FormatSafekeeperState(SafekeeperState state) static void AssertEventsOkForState(uint32 events, Safekeeper *sk) { + WalProposer *wp = sk->wp; uint32 expected = SafekeeperStateDesiredEvents(sk->state); /* @@ -2016,8 +2007,8 @@ AssertEventsOkForState(uint32 events, Safekeeper *sk) * To give a descriptive message in the case of failure, we use elog * and then an assertion that's guaranteed to fail. 
*/ - elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", - FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state)); + walprop_log(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", + FormatEvents(wp, events), sk->host, sk->port, FormatSafekeeperState(sk->state)); Assert(events_ok_for_state); } } @@ -2090,7 +2081,7 @@ SafekeeperStateDesiredEvents(SafekeeperState state) * The string should not be freed. It should also not be expected to remain the same between * function calls. */ static char * -FormatEvents(uint32 events) +FormatEvents(WalProposer *wp, uint32 events) { static char return_str[8]; @@ -2119,7 +2110,7 @@ FormatEvents(uint32 events) if (events & (~all_flags)) { - elog(WARNING, "Event formatting found unexpected component %d", + walprop_log(WARNING, "Event formatting found unexpected component %d", events & (~all_flags)); return_str[6] = '*'; return_str[7] = '\0'; diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index a1a9ccdfdd..664aeedfa7 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -333,24 +333,11 @@ typedef struct Safekeeper */ char conninfo[MAXCONNINFO]; - /* - * postgres protocol connection to the WAL acceptor - * - * Equals NULL only when state = SS_OFFLINE. Nonblocking is set once we - * reach SS_ACTIVE; not before. - */ - WalProposerConn *conn; - /* * Temporary buffer for the message being sent to the safekeeper. */ StringInfoData outbuf; - /* - * WAL reader, allocated for each safekeeper. - */ - XLogReaderState *xlogreader; - /* * Streaming will start here; must be record boundary. */ @@ -361,13 +348,43 @@ typedef struct Safekeeper XLogRecPtr streamingAt; /* current streaming position */ AppendRequestHeader appendRequest; /* request for sending to safekeeper */ - int eventPos; /* position in wait event set. 
Equal to -1 if* - * no event */ SafekeeperState state; /* safekeeper state machine state */ TimestampTz latestMsgReceivedAt; /* when latest msg is received */ AcceptorGreeting greetResponse; /* acceptor greeting */ VoteResponse voteResponse; /* the vote */ AppendResponse appendResponse; /* feedback for master */ + + + /* postgres-specific fields */ + #ifndef WALPROPOSER_LIB + /* + * postgres protocol connection to the WAL acceptor + * + * Equals NULL only when state = SS_OFFLINE. Nonblocking is set once we + * reach SS_ACTIVE; not before. + */ + WalProposerConn *conn; + + /* + * WAL reader, allocated for each safekeeper. + */ + XLogReaderState *xlogreader; + + /* + * Position in wait event set. Equal to -1 if no event + */ + int eventPos; + #endif + + + /* WalProposer library specifics */ + #ifdef WALPROPOSER_LIB + /* + * Buffer for incoming messages. Usually Rust vector is stored here. + * Caller is responsible for freeing the buffer. + */ + StringInfoData inbuf; + #endif } Safekeeper; /* Re-exported PostgresPollingStatusType */ @@ -433,7 +450,7 @@ typedef struct walproposer_api * Get WalproposerShmemState. This is used to store information about last * elected term. */ - WalproposerShmemState *(*get_shmem_state) (void); + WalproposerShmemState *(*get_shmem_state) (WalProposer *wp); /* * Start receiving notifications about new WAL. This is an infinite loop @@ -443,61 +460,63 @@ typedef struct walproposer_api void (*start_streaming) (WalProposer *wp, XLogRecPtr startpos); /* Get pointer to the latest available WAL. */ - XLogRecPtr (*get_flush_rec_ptr) (void); + XLogRecPtr (*get_flush_rec_ptr) (WalProposer *wp); /* Get current time. */ - TimestampTz (*get_current_timestamp) (void); - - /* Get postgres timeline. */ - TimeLineID (*get_timeline_id) (void); + TimestampTz (*get_current_timestamp) (WalProposer *wp); /* Current error message, aka PQerrorMessage. 
*/ - char *(*conn_error_message) (WalProposerConn *conn); + char *(*conn_error_message) (Safekeeper *sk); /* Connection status, aka PQstatus. */ - WalProposerConnStatusType (*conn_status) (WalProposerConn *conn); + WalProposerConnStatusType (*conn_status) (Safekeeper *sk); /* Start the connection, aka PQconnectStart. */ - WalProposerConn *(*conn_connect_start) (char *conninfo); + void (*conn_connect_start) (Safekeeper *sk); /* Poll an asynchronous connection, aka PQconnectPoll. */ - WalProposerConnectPollStatusType (*conn_connect_poll) (WalProposerConn *conn); + WalProposerConnectPollStatusType (*conn_connect_poll) (Safekeeper *sk); /* Send a blocking SQL query, aka PQsendQuery. */ - bool (*conn_send_query) (WalProposerConn *conn, char *query); + bool (*conn_send_query) (Safekeeper *sk, char *query); /* Read the query result, aka PQgetResult. */ - WalProposerExecStatusType (*conn_get_query_result) (WalProposerConn *conn); + WalProposerExecStatusType (*conn_get_query_result) (Safekeeper *sk); /* Flush buffer to the network, aka PQflush. */ - int (*conn_flush) (WalProposerConn *conn); + int (*conn_flush) (Safekeeper *sk); /* Close the connection, aka PQfinish. */ - void (*conn_finish) (WalProposerConn *conn); + void (*conn_finish) (Safekeeper *sk); - /* Try to read CopyData message, aka PQgetCopyData. */ - PGAsyncReadResult (*conn_async_read) (WalProposerConn *conn, char **buf, int *amount); + /* + * Try to read CopyData message from the safekeeper, aka PQgetCopyData. + * + * On success, the data is placed in *buf. It is valid until the next call + * to this function. + */ + PGAsyncReadResult (*conn_async_read) (Safekeeper *sk, char **buf, int *amount); /* Try to write CopyData message, aka PQputCopyData. */ - PGAsyncWriteResult (*conn_async_write) (WalProposerConn *conn, void const *buf, size_t size); + PGAsyncWriteResult (*conn_async_write) (Safekeeper *sk, void const *buf, size_t size); /* Blocking CopyData write, aka PQputCopyData + PQflush. 
*/ - bool (*conn_blocking_write) (WalProposerConn *conn, void const *buf, size_t size); + bool (*conn_blocking_write) (Safekeeper *sk, void const *buf, size_t size); /* Download WAL from startpos to endpos and make it available locally. */ bool (*recovery_download) (Safekeeper *sk, TimeLineID timeline, XLogRecPtr startpos, XLogRecPtr endpos); /* Read WAL from disk to buf. */ - void (*wal_read) (XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count); + void (*wal_read) (Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count); /* Allocate WAL reader. */ - XLogReaderState *(*wal_reader_allocate) (void); + void (*wal_reader_allocate) (Safekeeper *sk); /* Deallocate event set. */ - void (*free_event_set) (void); + void (*free_event_set) (WalProposer *wp); /* Initialize event set. */ - void (*init_event_set) (int n_safekeepers); + void (*init_event_set) (WalProposer *wp); /* Update events for an existing safekeeper connection. */ void (*update_event_set) (Safekeeper *sk, uint32 events); @@ -513,22 +532,22 @@ typedef struct walproposer_api * events mask to indicate events and sets sk to the safekeeper which has * an event. */ - int (*wait_event_set) (long timeout, Safekeeper **sk, uint32 *events); + int (*wait_event_set) (WalProposer *wp, long timeout, Safekeeper **sk, uint32 *events); /* Read random bytes. */ - bool (*strong_random) (void *buf, size_t len); + bool (*strong_random) (WalProposer *wp, void *buf, size_t len); /* * Get a basebackup LSN. Used to cross-validate with the latest available * LSN on the safekeepers. */ - XLogRecPtr (*get_redo_start_lsn) (void); + XLogRecPtr (*get_redo_start_lsn) (WalProposer *wp); /* * Finish sync safekeepers with the given LSN. This function should not * return and should exit the program. */ - void (*finish_sync_safekeepers) (XLogRecPtr lsn); + void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn); /* * Called after every new message from the safekeeper. 
Used to propagate @@ -541,7 +560,22 @@ typedef struct walproposer_api * Called on peer_horizon_lsn updates. Used to advance replication slot * and to free up disk space by deleting unnecessary WAL. */ - void (*confirm_wal_streamed) (XLogRecPtr lsn); + void (*confirm_wal_streamed) (WalProposer *wp, XLogRecPtr lsn); + + /* + * Write a log message to the internal log processor. This is used only + * when walproposer is compiled as a library. Otherwise, all logging is + * handled by elog(). + */ + void (*log_internal) (WalProposer *wp, int level, const char *line); + + /* + * Called right after the proposer was elected, but before it started + * recovery and sent ProposerElected message to the safekeepers. + * + * Used by logical replication to update truncateLsn. + */ + void (*after_election) (WalProposer *wp); } walproposer_api; /* @@ -590,6 +624,13 @@ typedef struct WalProposerConfig /* Will be passed to safekeepers in greet request. */ uint64 systemId; + + /* Will be passed to safekeepers in greet request. */ + TimeLineID pgTimeline; + +#ifdef WALPROPOSER_LIB + void *callback_data; +#endif } WalProposerConfig; @@ -666,7 +707,16 @@ extern WalProposer *WalProposerCreate(WalProposerConfig *config, walproposer_api extern void WalProposerStart(WalProposer *wp); extern void WalProposerBroadcast(WalProposer *wp, XLogRecPtr startpos, XLogRecPtr endpos); extern void WalProposerPoll(WalProposer *wp); -extern void ParsePageserverFeedbackMessage(StringInfo reply_message, - PageserverFeedback *rf); +extern void WalProposerFree(WalProposer *wp); + + +#define WPEVENT 1337 /* special log level for walproposer internal events */ + +#ifdef WALPROPOSER_LIB +void WalProposerLibLog(WalProposer *wp, int elevel, char *fmt, ...); +#define walprop_log(elevel, ...) WalProposerLibLog(wp, elevel, __VA_ARGS__) +#else +#define walprop_log(elevel, ...) 
elog(elevel, __VA_ARGS__) +#endif #endif /* __NEON_WALPROPOSER_H__ */ diff --git a/pgxn/neon/walproposer_compat.c b/pgxn/neon/walproposer_compat.c new file mode 100644 index 0000000000..7617f21a26 --- /dev/null +++ b/pgxn/neon/walproposer_compat.c @@ -0,0 +1,192 @@ +/* + * Contains copied/adapted functions from libpq and some internal postgres functions. + * This is needed to avoid linking to full postgres server installation. This file + * is compiled as a part of libwalproposer static library. + */ + +#include <stdio.h> +#include "walproposer.h" +#include "utils/datetime.h" +#include "miscadmin.h" + +void ExceptionalCondition(const char *conditionName, + const char *fileName, int lineNumber) +{ + fprintf(stderr, "ExceptionalCondition: %s:%d: %s\n", + fileName, lineNumber, conditionName); + fprintf(stderr, "aborting...\n"); + exit(1); +} + +void +pq_copymsgbytes(StringInfo msg, char *buf, int datalen) +{ + if (datalen < 0 || datalen > (msg->len - msg->cursor)) + ExceptionalCondition("insufficient data left in message", __FILE__, __LINE__); + memcpy(buf, &msg->data[msg->cursor], datalen); + msg->cursor += datalen; +} + +/* -------------------------------- + * pq_getmsgint - get a binary integer from a message buffer + * + * Values are treated as unsigned. 
+ * -------------------------------- + */ +unsigned int +pq_getmsgint(StringInfo msg, int b) +{ + unsigned int result; + unsigned char n8; + uint16 n16; + uint32 n32; + + switch (b) + { + case 1: + pq_copymsgbytes(msg, (char *) &n8, 1); + result = n8; + break; + case 2: + pq_copymsgbytes(msg, (char *) &n16, 2); + result = pg_ntoh16(n16); + break; + case 4: + pq_copymsgbytes(msg, (char *) &n32, 4); + result = pg_ntoh32(n32); + break; + default: + fprintf(stderr, "unsupported integer size %d\n", b); + ExceptionalCondition("unsupported integer size", __FILE__, __LINE__); + result = 0; /* keep compiler quiet */ + break; + } + return result; +} + +/* -------------------------------- + * pq_getmsgint64 - get a binary 8-byte int from a message buffer + * + * It is tempting to merge this with pq_getmsgint, but we'd have to make the + * result int64 for all data widths --- that could be a big performance + * hit on machines where int64 isn't efficient. + * -------------------------------- + */ +int64 +pq_getmsgint64(StringInfo msg) +{ + uint64 n64; + + pq_copymsgbytes(msg, (char *) &n64, sizeof(n64)); + + return pg_ntoh64(n64); +} + +/* -------------------------------- + * pq_getmsgbyte - get a raw byte from a message buffer + * -------------------------------- + */ +int +pq_getmsgbyte(StringInfo msg) +{ + if (msg->cursor >= msg->len) + ExceptionalCondition("no data left in message", __FILE__, __LINE__); + return (unsigned char) msg->data[msg->cursor++]; +} + +/* -------------------------------- + * pq_getmsgbytes - get raw data from a message buffer + * + * Returns a pointer directly into the message buffer; note this + * may not have any particular alignment. 
+ * -------------------------------- + */ +const char * +pq_getmsgbytes(StringInfo msg, int datalen) +{ + const char *result; + + if (datalen < 0 || datalen > (msg->len - msg->cursor)) + ExceptionalCondition("insufficient data left in message", __FILE__, __LINE__); + result = &msg->data[msg->cursor]; + msg->cursor += datalen; + return result; +} + +/* -------------------------------- + * pq_getmsgstring - get a null-terminated text string (with conversion) + * + * May return a pointer directly into the message buffer, or a pointer + * to a palloc'd conversion result. + * -------------------------------- + */ +const char * +pq_getmsgstring(StringInfo msg) +{ + char *str; + int slen; + + str = &msg->data[msg->cursor]; + + /* + * It's safe to use strlen() here because a StringInfo is guaranteed to + * have a trailing null byte. But check we found a null inside the + * message. + */ + slen = strlen(str); + if (msg->cursor + slen >= msg->len) + ExceptionalCondition("invalid string in message", __FILE__, __LINE__); + msg->cursor += slen + 1; + + return str; +} + +/* -------------------------------- + * pq_getmsgend - verify message fully consumed + * -------------------------------- + */ +void +pq_getmsgend(StringInfo msg) +{ + if (msg->cursor != msg->len) + ExceptionalCondition("invalid msg format", __FILE__, __LINE__); +} + + +/* + * Produce a C-string representation of a TimestampTz. + * + * This is mostly for use in emitting messages. + */ +const char * +timestamptz_to_str(TimestampTz t) +{ + static char buf[MAXDATELEN + 1]; + + snprintf(buf, sizeof(buf), "TimestampTz(%ld)", t); + return buf; +} + +bool +TimestampDifferenceExceeds(TimestampTz start_time, + TimestampTz stop_time, + int msec) +{ + TimestampTz diff = stop_time - start_time; + return (diff >= msec * INT64CONST(1000)); +} + +void +WalProposerLibLog(WalProposer *wp, int elevel, char *fmt, ...) 
+{ + char buf[1024]; + va_list args; + + fmt = _(fmt); + + va_start(args, fmt); + vsnprintf(buf, sizeof(buf), fmt, args); + va_end(args); + + wp->api.log_internal(wp, elevel, buf); +} diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c index 654b411e94..865f91165b 100644 --- a/pgxn/neon/walproposer_pg.c +++ b/pgxn/neon/walproposer_pg.c @@ -73,7 +73,8 @@ static void walprop_register_bgworker(void); static void walprop_pg_init_standalone_sync_safekeepers(void); static void walprop_pg_init_walsender(void); static void walprop_pg_init_bgworker(void); -static TimestampTz walprop_pg_get_current_timestamp(void); +static TimestampTz walprop_pg_get_current_timestamp(WalProposer *wp); +static TimeLineID walprop_pg_get_timeline_id(void); static void walprop_pg_load_libpqwalreceiver(void); static process_interrupts_callback_t PrevProcessInterruptsCallback; @@ -104,6 +105,7 @@ init_walprop_config(bool syncSafekeepers) walprop_config.systemId = GetSystemIdentifier(); else walprop_config.systemId = 0; + walprop_config.pgTimeline = walprop_pg_get_timeline_id(); } /* @@ -136,7 +138,7 @@ WalProposerMain(Datum main_arg) walprop_pg_load_libpqwalreceiver(); wp = WalProposerCreate(&walprop_config, walprop_pg); - wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(); + wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(wp); walprop_pg_init_walsender(); WalProposerStart(wp); @@ -379,7 +381,7 @@ nwp_shmem_startup_hook(void) } static WalproposerShmemState * -walprop_pg_get_shmem_state(void) +walprop_pg_get_shmem_state(WalProposer *wp) { Assert(walprop_shared != NULL); return walprop_shared; @@ -505,7 +507,7 @@ walprop_pg_init_bgworker(void) } static XLogRecPtr -walprop_pg_get_flush_rec_ptr(void) +walprop_pg_get_flush_rec_ptr(WalProposer *wp) { #if PG_MAJORVERSION_NUM < 15 return GetFlushRecPtr(); @@ -515,7 +517,7 @@ walprop_pg_get_flush_rec_ptr(void) } static TimestampTz -walprop_pg_get_current_timestamp(void) 
+walprop_pg_get_current_timestamp(WalProposer *wp) { return GetCurrentTimestamp(); } @@ -565,15 +567,15 @@ ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking) /* Exported function definitions */ static char * -walprop_error_message(WalProposerConn *conn) +walprop_error_message(Safekeeper *sk) { - return PQerrorMessage(conn->pg_conn); + return PQerrorMessage(sk->conn->pg_conn); } static WalProposerConnStatusType -walprop_status(WalProposerConn *conn) +walprop_status(Safekeeper *sk) { - switch (PQstatus(conn->pg_conn)) + switch (PQstatus(sk->conn->pg_conn)) { case CONNECTION_OK: return WP_CONNECTION_OK; @@ -584,16 +586,17 @@ walprop_status(WalProposerConn *conn) } } -static WalProposerConn * -walprop_connect_start(char *conninfo) +static void +walprop_connect_start(Safekeeper *sk) { - WalProposerConn *conn; PGconn *pg_conn; const char *keywords[3]; const char *values[3]; int n; char *password = neon_auth_token; + Assert(sk->conn == NULL); + /* * Connect using the given connection string. If the NEON_AUTH_TOKEN * environment variable was set, use that as the password. @@ -611,7 +614,7 @@ walprop_connect_start(char *conninfo) n++; } keywords[n] = "dbname"; - values[n] = conninfo; + values[n] = sk->conninfo; n++; keywords[n] = NULL; values[n] = NULL; @@ -619,11 +622,11 @@ walprop_connect_start(char *conninfo) pg_conn = PQconnectStartParams(keywords, values, 1); /* - * Allocation of a PQconn can fail, and will return NULL. We want to fully - * replicate the behavior of PQconnectStart here. + * "If the result is null, then libpq has been unable to allocate a new + * PGconn structure" */ if (!pg_conn) - return NULL; + elog(FATAL, "failed to allocate new PGconn object"); /* * And in theory this allocation can fail as well, but it's incredibly @@ -632,20 +635,19 @@ walprop_connect_start(char *conninfo) * palloc will exit on failure though, so there's not much we could do if * it *did* fail. 
*/ - conn = palloc(sizeof(WalProposerConn)); - conn->pg_conn = pg_conn; - conn->is_nonblocking = false; /* connections always start in blocking + sk->conn = palloc(sizeof(WalProposerConn)); + sk->conn->pg_conn = pg_conn; + sk->conn->is_nonblocking = false; /* connections always start in blocking * mode */ - conn->recvbuf = NULL; - return conn; + sk->conn->recvbuf = NULL; } static WalProposerConnectPollStatusType -walprop_connect_poll(WalProposerConn *conn) +walprop_connect_poll(Safekeeper *sk) { WalProposerConnectPollStatusType return_val; - switch (PQconnectPoll(conn->pg_conn)) + switch (PQconnectPoll(sk->conn->pg_conn)) { case PGRES_POLLING_FAILED: return_val = WP_CONN_POLLING_FAILED; @@ -682,24 +684,24 @@ walprop_connect_poll(WalProposerConn *conn) } static bool -walprop_send_query(WalProposerConn *conn, char *query) +walprop_send_query(Safekeeper *sk, char *query) { /* * We need to be in blocking mode for sending the query to run without * requiring a call to PQflush */ - if (!ensure_nonblocking_status(conn, false)) + if (!ensure_nonblocking_status(sk->conn, false)) return false; /* PQsendQuery returns 1 on success, 0 on failure */ - if (!PQsendQuery(conn->pg_conn, query)) + if (!PQsendQuery(sk->conn->pg_conn, query)) return false; return true; } static WalProposerExecStatusType -walprop_get_query_result(WalProposerConn *conn) +walprop_get_query_result(Safekeeper *sk) { PGresult *result; WalProposerExecStatusType return_val; @@ -708,14 +710,14 @@ walprop_get_query_result(WalProposerConn *conn) char *unexpected_success = NULL; /* Consume any input that we might be missing */ - if (!PQconsumeInput(conn->pg_conn)) + if (!PQconsumeInput(sk->conn->pg_conn)) return WP_EXEC_FAILED; - if (PQisBusy(conn->pg_conn)) + if (PQisBusy(sk->conn->pg_conn)) return WP_EXEC_NEEDS_INPUT; - result = PQgetResult(conn->pg_conn); + result = PQgetResult(sk->conn->pg_conn); /* * PQgetResult returns NULL only if getting the result was successful & @@ -777,24 +779,28 @@ 
walprop_get_query_result(WalProposerConn *conn) } static pgsocket -walprop_socket(WalProposerConn *conn) +walprop_socket(Safekeeper *sk) { - return PQsocket(conn->pg_conn); + return PQsocket(sk->conn->pg_conn); } static int -walprop_flush(WalProposerConn *conn) +walprop_flush(Safekeeper *sk) { - return (PQflush(conn->pg_conn)); + return (PQflush(sk->conn->pg_conn)); } static void -walprop_finish(WalProposerConn *conn) +walprop_finish(Safekeeper *sk) { - if (conn->recvbuf != NULL) - PQfreemem(conn->recvbuf); - PQfinish(conn->pg_conn); - pfree(conn); + if (!sk->conn) + return; + + if (sk->conn->recvbuf != NULL) + PQfreemem(sk->conn->recvbuf); + PQfinish(sk->conn->pg_conn); + pfree(sk->conn); + sk->conn = NULL; } /* @@ -804,18 +810,18 @@ walprop_finish(WalProposerConn *conn) * to this function. */ static PGAsyncReadResult -walprop_async_read(WalProposerConn *conn, char **buf, int *amount) +walprop_async_read(Safekeeper *sk, char **buf, int *amount) { int result; - if (conn->recvbuf != NULL) + if (sk->conn->recvbuf != NULL) { - PQfreemem(conn->recvbuf); - conn->recvbuf = NULL; + PQfreemem(sk->conn->recvbuf); + sk->conn->recvbuf = NULL; } /* Call PQconsumeInput so that we have the data we need */ - if (!PQconsumeInput(conn->pg_conn)) + if (!PQconsumeInput(sk->conn->pg_conn)) { *amount = 0; *buf = NULL; @@ -833,7 +839,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) * sometimes be triggered by the server returning an ErrorResponse (which * also happens to have the effect that the copy is done). 
*/ - switch (result = PQgetCopyData(conn->pg_conn, &conn->recvbuf, true)) + switch (result = PQgetCopyData(sk->conn->pg_conn, &sk->conn->recvbuf, true)) { case 0: *amount = 0; @@ -848,7 +854,7 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) * We can check PQgetResult to make sure that the server * failed; it'll always result in PGRES_FATAL_ERROR */ - ExecStatusType status = PQresultStatus(PQgetResult(conn->pg_conn)); + ExecStatusType status = PQresultStatus(PQgetResult(sk->conn->pg_conn)); if (status != PGRES_FATAL_ERROR) elog(FATAL, "unexpected result status %d after failed PQgetCopyData", status); @@ -869,18 +875,18 @@ walprop_async_read(WalProposerConn *conn, char **buf, int *amount) default: /* Positive values indicate the size of the returned result */ *amount = result; - *buf = conn->recvbuf; + *buf = sk->conn->recvbuf; return PG_ASYNC_READ_SUCCESS; } } static PGAsyncWriteResult -walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) +walprop_async_write(Safekeeper *sk, void const *buf, size_t size) { int result; /* If we aren't in non-blocking mode, switch to it. 
*/ - if (!ensure_nonblocking_status(conn, true)) + if (!ensure_nonblocking_status(sk->conn, true)) return PG_ASYNC_WRITE_FAIL; /* @@ -888,7 +894,7 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) * queued, 0 if it was not queued because of full buffers, or -1 if an * error occurred */ - result = PQputCopyData(conn->pg_conn, buf, size); + result = PQputCopyData(sk->conn->pg_conn, buf, size); /* * We won't get a result of zero because walproposer always empties the @@ -916,7 +922,7 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) * sucessful, 1 if it was unable to send all the data in the send queue * yet -1 if it failed for some reason */ - switch (result = PQflush(conn->pg_conn)) + switch (result = PQflush(sk->conn->pg_conn)) { case 0: return PG_ASYNC_WRITE_SUCCESS; @@ -934,22 +940,22 @@ walprop_async_write(WalProposerConn *conn, void const *buf, size_t size) * information, refer to the comments there. */ static bool -walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size) +walprop_blocking_write(Safekeeper *sk, void const *buf, size_t size) { int result; /* If we are in non-blocking mode, switch out of it. 
*/ - if (!ensure_nonblocking_status(conn, false)) + if (!ensure_nonblocking_status(sk->conn, false)) return false; - if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1) + if ((result = PQputCopyData(sk->conn->pg_conn, buf, size)) == -1) return false; Assert(result == 1); /* Because the connection is non-blocking, flushing returns 0 or -1 */ - if ((result = PQflush(conn->pg_conn)) == -1) + if ((result = PQflush(sk->conn->pg_conn)) == -1) return false; Assert(result == 0); @@ -1381,11 +1387,11 @@ XLogWalPropClose(XLogRecPtr recptr) } static void -walprop_pg_wal_read(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count) +walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count) { WALReadError errinfo; - if (!WALRead(state, + if (!WALRead(sk->xlogreader, buf, startptr, count, @@ -1396,31 +1402,38 @@ walprop_pg_wal_read(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size } } -static XLogReaderState * -walprop_pg_wal_reader_allocate(void) +static void +walprop_pg_wal_reader_allocate(Safekeeper *sk) { - return XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL); + sk->xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open,.segment_close = wal_segment_close), NULL); + if (sk->xlogreader == NULL) + elog(FATAL, "Failed to allocate xlog reader"); } static WaitEventSet *waitEvents; static void -walprop_pg_free_event_set(void) +walprop_pg_free_event_set(WalProposer *wp) { if (waitEvents) { FreeWaitEventSet(waitEvents); waitEvents = NULL; } + + for (int i = 0; i < wp->n_safekeepers; i++) + { + wp->safekeeper[i].eventPos = -1; + } } static void -walprop_pg_init_event_set(int n_safekeepers) +walprop_pg_init_event_set(WalProposer *wp) { if (waitEvents) elog(FATAL, "double-initialization of event set"); - waitEvents = CreateWaitEventSet(TopMemoryContext, 2 + n_safekeepers); + waitEvents = 
CreateWaitEventSet(TopMemoryContext, 2 + wp->n_safekeepers); AddWaitEventToSet(waitEvents, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL); AddWaitEventToSet(waitEvents, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, @@ -1439,11 +1452,11 @@ walprop_pg_update_event_set(Safekeeper *sk, uint32 events) static void walprop_pg_add_safekeeper_event_set(Safekeeper *sk, uint32 events) { - sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk->conn), NULL, sk); + sk->eventPos = AddWaitEventToSet(waitEvents, events, walprop_socket(sk), NULL, sk); } static int -walprop_pg_wait_event_set(long timeout, Safekeeper **sk, uint32 *events) +walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32 *events) { WaitEvent event = {0}; int rc = 0; @@ -1499,7 +1512,7 @@ walprop_pg_wait_event_set(long timeout, Safekeeper **sk, uint32 *events) } static void -walprop_pg_finish_sync_safekeepers(XLogRecPtr lsn) +walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn) { fprintf(stdout, "%X/%X\n", LSN_FORMAT_ARGS(lsn)); exit(0); @@ -1611,7 +1624,7 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn) * pageserver. 
*/ quorumFeedback.rf.disk_consistent_lsn, - walprop_pg_get_current_timestamp(), false); + walprop_pg_get_current_timestamp(wp), false); } CombineHotStanbyFeedbacks(&hsFeedback, wp); @@ -1628,18 +1641,65 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn) } static void -walprop_pg_confirm_wal_streamed(XLogRecPtr lsn) +walprop_pg_confirm_wal_streamed(WalProposer *wp, XLogRecPtr lsn) { if (MyReplicationSlot) PhysicalConfirmReceivedLocation(lsn); } +static XLogRecPtr +walprop_pg_get_redo_start_lsn(WalProposer *wp) +{ + return GetRedoStartLsn(); +} + +static bool +walprop_pg_strong_random(WalProposer *wp, void *buf, size_t len) +{ + return pg_strong_random(buf, len); +} + +static void +walprop_pg_log_internal(WalProposer *wp, int level, const char *line) +{ + elog(FATAL, "unexpected log_internal message at level %d: %s", level, line); +} + +static void +walprop_pg_after_election(WalProposer *wp) +{ + FILE* f; + XLogRecPtr lrRestartLsn; + + /* We don't need to do anything in syncSafekeepers mode.*/ + if (wp->config->syncSafekeepers) + return; + + /* + * If there are active logical replication subscriptions, we need + * to provide enough WAL for their WAL senders based on the position + * of their replication slots. 
+ */ + f = fopen("restart.lsn", "rb"); + if (f != NULL && !wp->config->syncSafekeepers) + { + fread(&lrRestartLsn, sizeof(lrRestartLsn), 1, f); + fclose(f); + if (lrRestartLsn != InvalidXLogRecPtr) + { + elog(LOG, "Logical replication restart LSN %X/%X", LSN_FORMAT_ARGS(lrRestartLsn)); + /* start from the beginning of the segment to fetch page headers verified by XLogReader */ + lrRestartLsn = lrRestartLsn - XLogSegmentOffset(lrRestartLsn, wal_segment_size); + wp->truncateLsn = Min(wp->truncateLsn, lrRestartLsn); + } + } +} + static const walproposer_api walprop_pg = { .get_shmem_state = walprop_pg_get_shmem_state, .start_streaming = walprop_pg_start_streaming, .get_flush_rec_ptr = walprop_pg_get_flush_rec_ptr, .get_current_timestamp = walprop_pg_get_current_timestamp, - .get_timeline_id = walprop_pg_get_timeline_id, .conn_error_message = walprop_error_message, .conn_status = walprop_status, .conn_connect_start = walprop_connect_start, @@ -1659,9 +1719,11 @@ static const walproposer_api walprop_pg = { .update_event_set = walprop_pg_update_event_set, .add_safekeeper_event_set = walprop_pg_add_safekeeper_event_set, .wait_event_set = walprop_pg_wait_event_set, - .strong_random = pg_strong_random, - .get_redo_start_lsn = GetRedoStartLsn, + .strong_random = walprop_pg_strong_random, + .get_redo_start_lsn = walprop_pg_get_redo_start_lsn, .finish_sync_safekeepers = walprop_pg_finish_sync_safekeepers, .process_safekeeper_feedback = walprop_pg_process_safekeeper_feedback, .confirm_wal_streamed = walprop_pg_confirm_wal_streamed, + .log_internal = walprop_pg_log_internal, + .after_election = walprop_pg_after_election, };