diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index 3f66f41ef2..f9b96271a4 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ b/.github/workflows/_build-and-test-locally.yml @@ -313,10 +313,10 @@ jobs: # Use tar to copy files matching the pattern, preserving the paths in the destionation tar c \ pg_install/v* \ - pg_install/build/*/src/test/regress/*.so \ - pg_install/build/*/src/test/regress/pg_regress \ - pg_install/build/*/src/test/isolation/isolationtester \ - pg_install/build/*/src/test/isolation/pg_isolation_regress \ + build/*/src/test/regress/*.so \ + build/*/src/test/regress/pg_regress \ + build/*/src/test/isolation/isolationtester \ + build/*/src/test/isolation/pg_isolation_regress \ | tar x -C /tmp/neon - name: Upload Neon artifact diff --git a/.github/workflows/build-macos.yml b/.github/workflows/build-macos.yml index 0f7fa3e813..226369de52 100644 --- a/.github/workflows/build-macos.yml +++ b/.github/workflows/build-macos.yml @@ -144,7 +144,7 @@ jobs: id: cache_walproposer_lib uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 with: - path: pg_install/build/walproposer-lib + path: build/walproposer-lib key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }} - name: Checkout submodule vendor/postgres-v17 @@ -169,11 +169,11 @@ jobs: run: make walproposer-lib -j$(sysctl -n hw.ncpu) - - name: Upload "pg_install/build/walproposer-lib" artifact + - name: Upload "build/walproposer-lib" artifact uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2 with: - name: pg_install--build--walproposer-lib - path: pg_install/build/walproposer-lib + name: build--walproposer-lib + path: build/walproposer-lib # The artifact is supposed to be used by the next job in the same workflow, # so there’s no need to store it for too long. retention-days: 1 @@ -226,11 +226,11 @@ jobs: name: pg_install--v17 path: pg_install/v17 - - name: Download "pg_install/build/walproposer-lib" artifact + - name: Download "build/walproposer-lib" artifact uses: actions/download-artifact@d3f86a106a0bac45b974a628896c90dbdf5c8093 # v4.3.0 with: - name: pg_install--build--walproposer-lib - path: pg_install/build/walproposer-lib + name: build--walproposer-lib + path: build/walproposer-lib # `actions/download-artifact` doesn't preserve permissions: # https://github.com/actions/download-artifact?tab=readme-ov-file#permission-loss diff --git a/.gitignore b/.gitignore index 45eb4dbf0e..70c7e96303 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /artifact_cache +/build /pg_install /target /tmp_check diff --git a/Cargo.lock b/Cargo.lock index d3dfbc87e3..8d3e427e48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4272,6 +4272,7 @@ dependencies = [ "tokio-util", "tonic 0.13.1", "tracing", + "url", "utils", "workspace_hack", ] @@ -4489,6 +4490,8 @@ dependencies = [ "pageserver_api", "postgres_ffi", "prost 0.13.5", + "strum", + "strum_macros", "thiserror 1.0.69", "tokio", "tonic 0.13.1", diff --git a/Dockerfile b/Dockerfile index f72d7d9bbc..69657067de 100644 --- a/Dockerfile +++ b/Dockerfile @@ -45,7 +45,6 @@ COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh ENV BUILD_TYPE=release RUN set -e \ && mold -run make -j $(nproc) -s neon-pg-ext \ - && rm -rf pg_install/build \ && tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz . 
# Prepare cargo-chef recipe diff --git a/Makefile b/Makefile index 5130e17e59..dee50a51c1 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,12 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) -# Where to install Postgres, default is ./pg_install, maybe useful for package managers +# Where to install Postgres, default is ./pg_install, maybe useful for package +# managers. POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/ +# All intermediate build artifacts are stored here. +BUILD_DIR := build + ICU_PREFIX_DIR := /usr/local/icu # @@ -104,21 +108,20 @@ cargo-target-dir: # Some rules are duplicated for Postgres v14 and 15. We may want to refactor # to avoid the duplication in the future, but it's tolerable for now. # -$(POSTGRES_INSTALL_DIR)/build/%/config.status: - - mkdir -p $(POSTGRES_INSTALL_DIR) - test -e $(POSTGRES_INSTALL_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(POSTGRES_INSTALL_DIR)/CACHEDIR.TAG +$(BUILD_DIR)/%/config.status: + mkdir -p $(BUILD_DIR) + test -e $(BUILD_DIR)/CACHEDIR.TAG || echo "$(CACHEDIR_TAG_CONTENTS)" > $(BUILD_DIR)/CACHEDIR.TAG +@echo "Configuring Postgres $* build" @test -s $(ROOT_PROJECT_DIR)/vendor/postgres-$*/configure || { \ echo "\nPostgres submodule not found in $(ROOT_PROJECT_DIR)/vendor/postgres-$*/, execute "; \ echo "'git submodule update --init --recursive --depth 2 --progress .' in project root.\n"; \ exit 1; } - mkdir -p $(POSTGRES_INSTALL_DIR)/build/$* + mkdir -p $(BUILD_DIR)/$* VERSION=$*; \ EXTRA_VERSION=$$(cd $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION && git rev-parse HEAD); \ - (cd $(POSTGRES_INSTALL_DIR)/build/$$VERSION && \ + (cd $(BUILD_DIR)/$$VERSION && \ env PATH="$(EXTRA_PATH_OVERRIDES):$$PATH" $(ROOT_PROJECT_DIR)/vendor/postgres-$$VERSION/configure \ CFLAGS='$(PG_CFLAGS)' LDFLAGS='$(PG_LDFLAGS)' \ $(PG_CONFIGURE_OPTS) --with-extra-version=" ($$EXTRA_VERSION)" \ @@ -130,73 +133,73 @@ $(POSTGRES_INSTALL_DIR)/build/%/config.status: # the "build-all-versions" entry points) where direct mention of PostgreSQL # versions is used. 
.PHONY: postgres-configure-v17 -postgres-configure-v17: $(POSTGRES_INSTALL_DIR)/build/v17/config.status +postgres-configure-v17: $(BUILD_DIR)/v17/config.status .PHONY: postgres-configure-v16 -postgres-configure-v16: $(POSTGRES_INSTALL_DIR)/build/v16/config.status +postgres-configure-v16: $(BUILD_DIR)/v16/config.status .PHONY: postgres-configure-v15 -postgres-configure-v15: $(POSTGRES_INSTALL_DIR)/build/v15/config.status +postgres-configure-v15: $(BUILD_DIR)/v15/config.status .PHONY: postgres-configure-v14 -postgres-configure-v14: $(POSTGRES_INSTALL_DIR)/build/v14/config.status +postgres-configure-v14: $(BUILD_DIR)/v14/config.status # Install the PostgreSQL header files into $(POSTGRES_INSTALL_DIR)//include .PHONY: postgres-headers-% postgres-headers-%: postgres-configure-% +@echo "Installing PostgreSQL $* headers" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/include MAKELEVEL=0 install + $(MAKE) -C $(BUILD_DIR)/$*/src/include MAKELEVEL=0 install # Compile and install PostgreSQL .PHONY: postgres-% postgres-%: postgres-configure-% \ postgres-headers-% # to prevent `make install` conflicts with neon's `postgres-headers` +@echo "Compiling PostgreSQL $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 install + $(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 install +@echo "Compiling libpq $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/interfaces/libpq install + $(MAKE) -C $(BUILD_DIR)/$*/src/interfaces/libpq install +@echo "Compiling pg_prewarm $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_prewarm install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_prewarm install +@echo "Compiling pg_buffercache $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_buffercache install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_buffercache install +@echo "Compiling pg_visibility $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_visibility install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_visibility install +@echo "Compiling pageinspect $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pageinspect install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pageinspect install +@echo "Compiling pg_trgm $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/pg_trgm install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/pg_trgm install +@echo "Compiling amcheck $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/amcheck install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/amcheck install +@echo "Compiling test_decoding $*" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/contrib/test_decoding install + $(MAKE) -C $(BUILD_DIR)/$*/contrib/test_decoding install .PHONY: postgres-check-% postgres-check-%: postgres-% - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$* MAKELEVEL=0 check + $(MAKE) -C $(BUILD_DIR)/$* MAKELEVEL=0 check .PHONY: neon-pg-ext-% neon-pg-ext-%: postgres-% +@echo "Compiling neon $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-$* + mkdir -p $(BUILD_DIR)/neon-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-$* \ + -C $(BUILD_DIR)/neon-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile install +@echo "Compiling neon_walredo $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* + mkdir -p $(BUILD_DIR)/neon-walredo-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-walredo-$* \ + -C $(BUILD_DIR)/neon-walredo-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_walredo/Makefile install +@echo "Compiling neon_rmgr $*" - mkdir -p 
$(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* + mkdir -p $(BUILD_DIR)/neon-rmgr-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-rmgr-$* \ + -C $(BUILD_DIR)/neon-rmgr-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_rmgr/Makefile install +@echo "Compiling neon_test_utils $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* + mkdir -p $(BUILD_DIR)/neon-test-utils-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-test-utils-$* \ + -C $(BUILD_DIR)/neon-test-utils-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_test_utils/Makefile install +@echo "Compiling neon_utils $*" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* + mkdir -p $(BUILD_DIR)/neon-utils-$* $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \ + -C $(BUILD_DIR)/neon-utils-$* \ -f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install # Build walproposer as a static library. walproposer source code is located @@ -211,15 +214,15 @@ neon-pg-ext-%: postgres-% .PHONY: walproposer-lib walproposer-lib: neon-pg-ext-v17 +@echo "Compiling walproposer-lib" - mkdir -p $(POSTGRES_INSTALL_DIR)/build/walproposer-lib + mkdir -p $(BUILD_DIR)/walproposer-lib $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config COPT='$(COPT)' \ - -C $(POSTGRES_INSTALL_DIR)/build/walproposer-lib \ + -C $(BUILD_DIR)/walproposer-lib \ -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile walproposer-lib - cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib - cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(POSTGRES_INSTALL_DIR)/build/walproposer-lib - $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgport.a \ + cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgport.a $(BUILD_DIR)/walproposer-lib + cp $(POSTGRES_INSTALL_DIR)/v17/lib/libpgcommon.a $(BUILD_DIR)/walproposer-lib + $(AR) d $(BUILD_DIR)/walproposer-lib/libpgport.a \ pg_strong_random.o - $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \ + $(AR) d $(BUILD_DIR)/walproposer-lib/libpgcommon.a \ checksum_helper.o \ cryptohash_openssl.o \ hmac_openssl.o \ @@ -227,7 +230,7 @@ walproposer-lib: neon-pg-ext-v17 parse_manifest.o \ scram-common.o ifeq ($(UNAME_S),Linux) - $(AR) d $(POSTGRES_INSTALL_DIR)/build/walproposer-lib/libpgcommon.a \ + $(AR) d $(BUILD_DIR)/walproposer-lib/libpgcommon.a \ pg_crc32c.o endif @@ -272,7 +275,7 @@ fmt: postgres-%-pg-bsd-indent: postgres-% +@echo "Compiling pg_bsd_indent" - $(MAKE) -C $(POSTGRES_INSTALL_DIR)/build/$*/src/tools/pg_bsd_indent/ + $(MAKE) -C $(BUILD_DIR)/$*/src/tools/pg_bsd_indent/ # Create typedef list for the core. Note that generally it should be combined with # buildfarm one to cover platform specific stuff. @@ -291,7 +294,7 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list cat $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/typedefs.list |\ cat - postgres-$*-typedefs.list | sort | uniq > postgres-$*-typedefs-full.list +@echo note: you might want to run it on selected files/dirs instead. 
- INDENT=$(POSTGRES_INSTALL_DIR)/build/$*/src/tools/pg_bsd_indent/pg_bsd_indent \ + INDENT=$(BUILD_DIR)/$*/src/tools/pg_bsd_indent/pg_bsd_indent \ $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/pgindent --typedefs postgres-$*-typedefs-full.list \ $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/ \ --excludes $(ROOT_PROJECT_DIR)/vendor/postgres-$*/src/tools/pgindent/exclude_file_patterns @@ -302,9 +305,9 @@ postgres-%-pgindent: postgres-%-pg-bsd-indent postgres-%-typedefs.list neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17 $(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/v17/bin/pg_config COPT='$(COPT)' \ FIND_TYPEDEF=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/find_typedef \ - INDENT=$(POSTGRES_INSTALL_DIR)/build/v17/src/tools/pg_bsd_indent/pg_bsd_indent \ + INDENT=$(BUILD_DIR)/v17/src/tools/pg_bsd_indent/pg_bsd_indent \ PGINDENT_SCRIPT=$(ROOT_PROJECT_DIR)/vendor/postgres-v17/src/tools/pgindent/pgindent \ - -C $(POSTGRES_INSTALL_DIR)/build/neon-v17 \ + -C $(BUILD_DIR)/neon-v17 \ -f $(ROOT_PROJECT_DIR)/pgxn/neon/Makefile pgindent diff --git a/libs/proxy/tokio-postgres2/src/cancel_query.rs b/libs/proxy/tokio-postgres2/src/cancel_query.rs index 4c2a5ef50f..94fbf333ed 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_query.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_query.rs @@ -1,5 +1,3 @@ -use std::io; - use tokio::net::TcpStream; use crate::client::SocketConfig; @@ -8,7 +6,7 @@ use crate::tls::MakeTlsConnect; use crate::{Error, cancel_query_raw, connect_socket}; pub(crate) async fn cancel_query( - config: Option, + config: SocketConfig, ssl_mode: SslMode, tls: T, process_id: i32, @@ -17,16 +15,6 @@ pub(crate) async fn cancel_query( where T: MakeTlsConnect, { - let config = match config { - Some(config) => config, - None => { - return Err(Error::connect(io::Error::new( - io::ErrorKind::InvalidInput, - "unknown host", - ))); - } - }; - let hostname = match &config.host { Host::Tcp(host) => &**host, }; diff --git a/libs/proxy/tokio-postgres2/src/cancel_token.rs b/libs/proxy/tokio-postgres2/src/cancel_token.rs index f6526395ee..c5566b4ad9 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_token.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_token.rs @@ -7,11 +7,16 @@ use crate::config::SslMode; use crate::tls::{MakeTlsConnect, TlsConnect}; use crate::{Error, cancel_query, cancel_query_raw}; -/// The capability to request cancellation of in-progress queries on a -/// connection. -#[derive(Clone, Serialize, Deserialize)] +/// A cancellation token that allows easy cancellation of a query. +#[derive(Clone)] pub struct CancelToken { - pub socket_config: Option, + pub socket_config: SocketConfig, + pub raw: RawCancelToken, +} + +/// A raw cancellation token that allows cancellation of a query, given a fresh connection to postgres. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RawCancelToken { pub ssl_mode: SslMode, pub process_id: i32, pub secret_key: i32, @@ -36,14 +41,16 @@ impl CancelToken { { cancel_query::cancel_query( self.socket_config.clone(), - self.ssl_mode, + self.raw.ssl_mode, tls, - self.process_id, - self.secret_key, + self.raw.process_id, + self.raw.secret_key, ) .await } +} +impl RawCancelToken { /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new /// connection itself. 
pub async fn cancel_query_raw(&self, stream: S, tls: T) -> Result<(), Error> diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index a7edfc076a..41b22e35b6 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -12,6 +12,7 @@ use postgres_protocol2::message::frontend; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; +use crate::cancel_token::RawCancelToken; use crate::codec::{BackendMessages, FrontendMessage}; use crate::config::{Host, SslMode}; use crate::query::RowStream; @@ -331,10 +332,12 @@ impl Client { /// connection associated with this client. pub fn cancel_token(&self) -> CancelToken { CancelToken { - socket_config: Some(self.socket_config.clone()), - ssl_mode: self.ssl_mode, - process_id: self.process_id, - secret_key: self.secret_key, + socket_config: self.socket_config.clone(), + raw: RawCancelToken { + ssl_mode: self.ssl_mode, + process_id: self.process_id, + secret_key: self.secret_key, + }, } } diff --git a/libs/proxy/tokio-postgres2/src/lib.rs b/libs/proxy/tokio-postgres2/src/lib.rs index 9556070ed5..791c93b972 100644 --- a/libs/proxy/tokio-postgres2/src/lib.rs +++ b/libs/proxy/tokio-postgres2/src/lib.rs @@ -3,7 +3,7 @@ use postgres_protocol2::message::backend::ReadyForQueryBody; -pub use crate::cancel_token::CancelToken; +pub use crate::cancel_token::{CancelToken, RawCancelToken}; pub use crate::client::{Client, SocketConfig}; pub use crate::config::Config; pub use crate::connect_raw::RawConnection; diff --git a/libs/walproposer/build.rs b/libs/walproposer/build.rs index 530ceb1327..b13c8b32b4 100644 --- a/libs/walproposer/build.rs +++ b/libs/walproposer/build.rs @@ -13,22 +13,24 @@ fn main() -> anyhow::Result<()> { // Tell cargo to invalidate the built crate whenever the wrapper changes println!("cargo:rerun-if-changed=bindgen_deps.h"); + let root_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../.."); + // Finding the location of built libraries and Postgres C headers: // - if POSTGRES_INSTALL_DIR is set look into it, otherwise look into `/pg_install` // - if there's a `bin/pg_config` file use it for getting include server, otherwise use `/pg_install/{PG_MAJORVERSION}/include/postgresql/server` let pg_install_dir = if let Some(postgres_install_dir) = env::var_os("POSTGRES_INSTALL_DIR") { postgres_install_dir.into() } else { - PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../pg_install") + root_path.join("pg_install") }; let pg_install_abs = std::fs::canonicalize(pg_install_dir)?; - let walproposer_lib_dir = pg_install_abs.join("build/walproposer-lib"); + let walproposer_lib_dir = root_path.join("build/walproposer-lib"); let walproposer_lib_search_str = walproposer_lib_dir .to_str() .ok_or(anyhow!("Bad non-UTF path"))?; - let pgxn_neon = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../pgxn/neon"); + let pgxn_neon = root_path.join("pgxn/neon"); let pgxn_neon = std::fs::canonicalize(pgxn_neon)?; let pgxn_neon = pgxn_neon.to_str().ok_or(anyhow!("Bad non-UTF path"))?; diff --git a/pageserver/page_api/Cargo.toml b/pageserver/page_api/Cargo.toml index 8b13b9e1db..c5283c2b09 100644 --- a/pageserver/page_api/Cargo.toml +++ b/pageserver/page_api/Cargo.toml @@ -11,6 +11,8 @@ futures.workspace = true pageserver_api.workspace = true postgres_ffi.workspace = true prost.workspace = true +strum.workspace = true +strum_macros.workspace = true thiserror.workspace = true tokio.workspace = true tonic.workspace = true diff --git a/pageserver/page_api/src/model.rs 
b/pageserver/page_api/src/model.rs index ef7f89473f..6efa742799 100644 --- a/pageserver/page_api/src/model.rs +++ b/pageserver/page_api/src/model.rs @@ -459,7 +459,7 @@ impl GetPageResponse { /// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream /// (potentially shared by many backends), and a gRPC status response would terminate the stream so /// we send GetPageResponse messages with these codes instead. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, PartialEq, strum_macros::Display)] pub enum GetPageStatusCode { /// Unknown status. For forwards compatibility: used when an older client version receives a new /// status code from a newer server version. diff --git a/pageserver/pagebench/Cargo.toml b/pageserver/pagebench/Cargo.toml index 5e4af88e69..f5dfc0db25 100644 --- a/pageserver/pagebench/Cargo.toml +++ b/pageserver/pagebench/Cargo.toml @@ -25,6 +25,7 @@ tokio.workspace = true tokio-stream.workspace = true tokio-util.workspace = true tonic.workspace = true +url.workspace = true pageserver_client.workspace = true pageserver_api.workspace = true diff --git a/pageserver/pagebench/src/cmd/basebackup.rs b/pageserver/pagebench/src/cmd/basebackup.rs index 8015db528d..e028174c1d 100644 --- a/pageserver/pagebench/src/cmd/basebackup.rs +++ b/pageserver/pagebench/src/cmd/basebackup.rs @@ -13,7 +13,6 @@ use pageserver_client::mgmt_api::ForceAwaitLogicalSize; use pageserver_client::page_service::BasebackupRequest; use pageserver_page_api as page_api; use rand::prelude::*; -use reqwest::Url; use tokio::io::AsyncRead; use tokio::sync::Barrier; use tokio::task::JoinSet; @@ -21,6 +20,7 @@ use tokio_util::compat::{TokioAsyncReadCompatExt as _, TokioAsyncWriteCompatExt use tokio_util::io::StreamReader; use tonic::async_trait; use tracing::{info, instrument}; +use url::Url; use utils::id::TenantTimelineId; use utils::lsn::Lsn; use utils::shard::ShardIndex; @@ -156,12 +156,16 @@ async fn main_impl( let mut work_senders = HashMap::new(); let mut tasks = Vec::new(); - let connurl = Url::parse(&args.page_service_connstring)?; + let scheme = match Url::parse(&args.page_service_connstring) { + Ok(url) => url.scheme().to_lowercase().to_string(), + Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(), + Err(err) => return Err(anyhow!("invalid connstring: {err}")), + }; for &tl in &timelines { let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are work_senders.insert(tl, sender); - let client: Box = match connurl.scheme() { + let client: Box = match scheme.as_str() { "postgresql" | "postgres" => Box::new( LibpqClient::new(&args.page_service_connstring, tl, !args.no_compression).await?, ), diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 3a68a77279..a297819e9b 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -10,33 +10,31 @@ use anyhow::Context; use async_trait::async_trait; use bytes::Bytes; use camino::Utf8PathBuf; +use futures::{Stream, StreamExt as _}; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpaceAccum; use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest}; use pageserver_api::reltag::RelTag; use pageserver_api::shard::TenantShardId; -use pageserver_page_api::proto; +use pageserver_page_api as page_api; use rand::prelude::*; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; 
use tracing::info; +use url::Url; use utils::id::TenantTimelineId; use utils::lsn::Lsn; +use utils::shard::ShardIndex; use crate::util::tokio_thread_local_stats::AllThreadLocalStats; use crate::util::{request_stats, tokio_thread_local_stats}; -#[derive(clap::ValueEnum, Clone, Debug)] -enum Protocol { - Libpq, - Grpc, -} - /// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace. #[derive(clap::Parser)] pub(crate) struct Args { #[clap(long, default_value = "http://localhost:9898")] mgmt_api_endpoint: String, + /// Pageserver connection string. Supports postgresql:// and grpc:// protocols. #[clap(long, default_value = "postgres://postgres@localhost:64000")] page_service_connstring: String, #[clap(long)] @@ -45,8 +43,9 @@ pub(crate) struct Args { num_clients: NonZeroUsize, #[clap(long)] runtime: Option, - #[clap(long, value_enum, default_value = "libpq")] - protocol: Protocol, + /// If true, enable compression (only for gRPC). + #[clap(long)] + compression: bool, /// Each client sends requests at the given rate. /// /// If a request takes too long and we should be issuing a new request already, @@ -325,18 +324,32 @@ async fn main_impl( .unwrap(); Box::pin(async move { - let client: Box = match args.protocol { - Protocol::Libpq => Box::new( - LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline) - .await - .unwrap(), + let scheme = match Url::parse(&args.page_service_connstring) { + Ok(url) => url.scheme().to_lowercase().to_string(), + Err(url::ParseError::RelativeUrlWithoutBase) => "postgresql".to_string(), + Err(err) => panic!("invalid connstring: {err}"), + }; + let client: Box = match scheme.as_str() { + "postgresql" | "postgres" => { + assert!(!args.compression, "libpq does not support compression"); + Box::new( + LibpqClient::new(&args.page_service_connstring, worker_id.timeline) + .await + .unwrap(), + ) + } + + "grpc" => Box::new( + GrpcClient::new( + &args.page_service_connstring, + worker_id.timeline, + args.compression, + ) + .await + .unwrap(), ), - Protocol::Grpc => Box::new( - GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline) - .await - .unwrap(), - ), + scheme => panic!("unsupported scheme {scheme}"), }; run_worker(args, client, ss, cancel, rps_period, ranges, weights).await }) @@ -543,8 +556,8 @@ struct LibpqClient { } impl LibpqClient { - async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { - let inner = pageserver_client::page_service::Client::new(connstring) + async fn new(connstring: &str, ttid: TenantTimelineId) -> anyhow::Result { + let inner = pageserver_client::page_service::Client::new(connstring.to_string()) .await? .pagestream(ttid.tenant_id, ttid.timeline_id) .await?; @@ -600,34 +613,36 @@ impl Client for LibpqClient { } } -/// A gRPC client using the raw, no-frills gRPC client. +/// A gRPC Pageserver client. 
struct GrpcClient { - req_tx: tokio::sync::mpsc::Sender, - resp_rx: tonic::Streaming, + req_tx: tokio::sync::mpsc::Sender, + resp_rx: Pin> + Send>>, } impl GrpcClient { - async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result { - let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?; + async fn new( + connstring: &str, + ttid: TenantTimelineId, + compression: bool, + ) -> anyhow::Result { + let mut client = page_api::Client::new( + connstring.to_string(), + ttid.tenant_id, + ttid.timeline_id, + ShardIndex::unsharded(), + None, + compression.then_some(tonic::codec::CompressionEncoding::Zstd), + ) + .await?; // The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the // benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are // buffered by Tonic and the OS too. let (req_tx, req_rx) = tokio::sync::mpsc::channel(1); let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx); - let mut req = tonic::Request::new(req_stream); - let metadata = req.metadata_mut(); - metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?); - metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?); - metadata.insert("neon-shard-id", "0000".try_into()?); + let resp_rx = Box::pin(client.get_pages(req_stream).await?); - let resp = client.get_pages(req).await?; - let resp_stream = resp.into_inner(); - - Ok(Self { - req_tx, - resp_rx: resp_stream, - }) + Ok(Self { req_tx, resp_rx }) } } @@ -641,27 +656,27 @@ impl Client for GrpcClient { rel: RelTag, blks: Vec, ) -> anyhow::Result<()> { - let req = proto::GetPageRequest { + let req = page_api::GetPageRequest { request_id: req_id, - request_class: proto::GetPageClass::Normal as i32, - read_lsn: Some(proto::ReadLsn { - request_lsn: req_lsn.0, - not_modified_since_lsn: mod_lsn.0, - }), - rel: Some(rel.into()), - block_number: blks, + request_class: page_api::GetPageClass::Normal, + read_lsn: page_api::ReadLsn { + request_lsn: req_lsn, + not_modified_since_lsn: Some(mod_lsn), + }, + rel, + block_numbers: blks, }; self.req_tx.send(req).await?; Ok(()) } async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec)> { - let resp = self.resp_rx.message().await?.unwrap(); + let resp = self.resp_rx.next().await.unwrap().unwrap(); anyhow::ensure!( - resp.status_code == proto::GetPageStatusCode::Ok as i32, + resp.status_code == page_api::GetPageStatusCode::Ok, "unexpected status code: {}", - resp.status_code + resp.status_code, ); - Ok((resp.request_id, resp.page_image)) + Ok((resp.request_id, resp.page_images)) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a0e9d8f06a..c8a41f7875 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -6543,7 +6543,7 @@ impl Timeline { debug!("retain_lsns: {:?}", retain_lsns); - let mut layers_to_remove = Vec::new(); + let max_retain_lsn = retain_lsns.iter().max(); // Scan all layers in the timeline (remote or on-disk). // @@ -6553,108 +6553,110 @@ impl Timeline { // 3. it doesn't need to be retained for 'retain_lsns'; // 4. it does not need to be kept for LSNs holding valid leases. // 5. 
newer on-disk image layers cover the layer's whole key range - // - // TODO holding a write lock is too agressive and avoidable - let mut guard = self - .layers - .write(LayerManagerLockHolder::GarbageCollection) - .await; - let layers = guard.layer_map()?; - 'outer: for l in layers.iter_historic_layers() { - result.layers_total += 1; + let layers_to_remove = { + let mut layers_to_remove = Vec::new(); - // 1. Is it newer than GC horizon cutoff point? - if l.get_lsn_range().end > space_cutoff { - info!( - "keeping {} because it's newer than space_cutoff {}", - l.layer_name(), - space_cutoff, - ); - result.layers_needed_by_cutoff += 1; - continue 'outer; - } + let guard = self + .layers + .read(LayerManagerLockHolder::GarbageCollection) + .await; + let layers = guard.layer_map()?; + 'outer: for l in layers.iter_historic_layers() { + result.layers_total += 1; - // 2. It is newer than PiTR cutoff point? - if l.get_lsn_range().end > time_cutoff { - info!( - "keeping {} because it's newer than time_cutoff {}", - l.layer_name(), - time_cutoff, - ); - result.layers_needed_by_pitr += 1; - continue 'outer; - } - - // 3. Is it needed by a child branch? - // NOTE With that we would keep data that - // might be referenced by child branches forever. - // We can track this in child timeline GC and delete parent layers when - // they are no longer needed. This might be complicated with long inheritance chains. - // - // TODO Vec is not a great choice for `retain_lsns` - for retain_lsn in &retain_lsns { - // start_lsn is inclusive - if &l.get_lsn_range().start <= retain_lsn { - info!( - "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", + // 1. Is it newer than GC horizon cutoff point? + if l.get_lsn_range().end > space_cutoff { + debug!( + "keeping {} because it's newer than space_cutoff {}", l.layer_name(), - retain_lsn, - l.is_incremental(), + space_cutoff, ); - result.layers_needed_by_branches += 1; + result.layers_needed_by_cutoff += 1; continue 'outer; } - } - // 4. Is there a valid lease that requires us to keep this layer? - if let Some(lsn) = &max_lsn_with_valid_lease { - // keep if layer start <= any of the lease - if &l.get_lsn_range().start <= lsn { - info!( - "keeping {} because there is a valid lease preventing GC at {}", + // 2. It is newer than PiTR cutoff point? + if l.get_lsn_range().end > time_cutoff { + debug!( + "keeping {} because it's newer than time_cutoff {}", l.layer_name(), - lsn, + time_cutoff, ); - result.layers_needed_by_leases += 1; + result.layers_needed_by_pitr += 1; continue 'outer; } + + // 3. Is it needed by a child branch? + // NOTE With that we would keep data that + // might be referenced by child branches forever. + // We can track this in child timeline GC and delete parent layers when + // they are no longer needed. This might be complicated with long inheritance chains. + if let Some(retain_lsn) = max_retain_lsn { + // start_lsn is inclusive + if &l.get_lsn_range().start <= retain_lsn { + debug!( + "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", + l.layer_name(), + retain_lsn, + l.is_incremental(), + ); + result.layers_needed_by_branches += 1; + continue 'outer; + } + } + + // 4. Is there a valid lease that requires us to keep this layer? 
+ if let Some(lsn) = &max_lsn_with_valid_lease { + // keep if layer start <= any of the lease + if &l.get_lsn_range().start <= lsn { + debug!( + "keeping {} because there is a valid lease preventing GC at {}", + l.layer_name(), + lsn, + ); + result.layers_needed_by_leases += 1; + continue 'outer; + } + } + + // 5. Is there a later on-disk layer for this relation? + // + // The end-LSN is exclusive, while disk_consistent_lsn is + // inclusive. For example, if disk_consistent_lsn is 100, it is + // OK for a delta layer to have end LSN 101, but if the end LSN + // is 102, then it might not have been fully flushed to disk + // before crash. + // + // For example, imagine that the following layers exist: + // + // 1000 - image (A) + // 1000-2000 - delta (B) + // 2000 - image (C) + // 2000-3000 - delta (D) + // 3000 - image (E) + // + // If GC horizon is at 2500, we can remove layers A and B, but + // we cannot remove C, even though it's older than 2500, because + // the delta layer 2000-3000 depends on it. + if !layers + .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff)) + { + debug!("keeping {} because it is the latest layer", l.layer_name()); + result.layers_not_updated += 1; + continue 'outer; + } + + // We didn't find any reason to keep this file, so remove it. + info!( + "garbage collecting {} is_dropped: xx is_incremental: {}", + l.layer_name(), + l.is_incremental(), + ); + layers_to_remove.push(l); } - // 5. Is there a later on-disk layer for this relation? - // - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - // - // For example, imagine that the following layers exist: - // - // 1000 - image (A) - // 1000-2000 - delta (B) - // 2000 - image (C) - // 2000-3000 - delta (D) - // 3000 - image (E) - // - // If GC horizon is at 2500, we can remove layers A and B, but - // we cannot remove C, even though it's older than 2500, because - // the delta layer 2000-3000 depends on it. - if !layers - .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff)) - { - info!("keeping {} because it is the latest layer", l.layer_name()); - result.layers_not_updated += 1; - continue 'outer; - } - - // We didn't find any reason to keep this file, so remove it. - info!( - "garbage collecting {} is_dropped: xx is_incremental: {}", - l.layer_name(), - l.is_incremental(), - ); - layers_to_remove.push(l); - } + layers_to_remove + }; if !layers_to_remove.is_empty() { // Persist the new GC cutoff value before we actually remove anything. @@ -6670,15 +6672,19 @@ impl Timeline { } })?; + let mut guard = self + .layers + .write(LayerManagerLockHolder::GarbageCollection) + .await; + let gc_layers = layers_to_remove .iter() - .map(|x| guard.get_from_desc(x)) + .flat_map(|desc| guard.try_get_from_key(&desc.key()).cloned()) .collect::>(); result.layers_removed = gc_layers.len() as u64; self.remote_client.schedule_gc_update(&gc_layers)?; - guard.open_mut()?.finish_gc_timeline(&gc_layers); #[cfg(feature = "testing")] diff --git a/proxy/src/batch.rs b/proxy/src/batch.rs new file mode 100644 index 0000000000..61bdf2b747 --- /dev/null +++ b/proxy/src/batch.rs @@ -0,0 +1,146 @@ +//! Batch processing system based on intrusive linked lists. +//! +//! Enqueuing a batch job requires no allocations, with +//! direct support for cancelling jobs early. 
+use std::collections::BTreeMap;
+use std::pin::pin;
+use std::sync::Mutex;
+
+use futures::future::Either;
+use scopeguard::ScopeGuard;
+use tokio::sync::oneshot::error::TryRecvError;
+
+use crate::ext::LockExt;
+
+pub trait QueueProcessing: Send + 'static {
+    type Req: Send + 'static;
+    type Res: Send;
+
+    /// Get the desired batch size.
+    fn batch_size(&self, queue_size: usize) -> usize;
+
+    /// This applies a full batch of events.
+    /// Must respond with a full batch of replies.
+    ///
+    /// If this apply can error, it's expected that errors be forwarded to each Self::Res.
+    ///
+    /// Batching does not need to happen atomically.
+    fn apply(&mut self, req: Vec<Self::Req>) -> impl Future<Output = Vec<Self::Res>> + Send;
+}
+
+pub struct BatchQueue<P: QueueProcessing> {
+    processor: tokio::sync::Mutex<P>,
+    inner: Mutex<BatchQueueInner<P>>,
+}
+
+struct BatchJob<P: QueueProcessing> {
+    req: P::Req,
+    res: tokio::sync::oneshot::Sender<P::Res>,
+}
+
+impl<P: QueueProcessing> BatchQueue<P> {
+    pub fn new(p: P) -> Self {
+        Self {
+            processor: tokio::sync::Mutex::new(p),
+            inner: Mutex::new(BatchQueueInner {
+                version: 0,
+                queue: BTreeMap::new(),
+            }),
+        }
+    }
+
+    pub async fn call(&self, req: P::Req) -> P::Res {
+        let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req);
+        let guard = scopeguard::guard(id, move |id| {
+            let mut inner = self.inner.lock_propagate_poison();
+            if inner.queue.remove(&id).is_some() {
+                tracing::debug!("batched task cancelled before completion");
+            }
+        });
+
+        let resp = loop {
+            // try become the leader, or try wait for success.
+            let mut processor = match futures::future::select(rx, pin!(self.processor.lock())).await
+            {
+                // we got the resp.
+                Either::Left((resp, _)) => break resp.ok(),
+                // we are the leader.
+                Either::Right((p, rx_)) => {
+                    rx = rx_;
+                    p
+                }
+            };
+
+            let (reqs, resps) = self.inner.lock_propagate_poison().get_batch(&processor);
+
+            // apply a batch.
+            let values = processor.apply(reqs).await;
+
+            // send response values.
+            for (tx, value) in std::iter::zip(resps, values) {
+                // sender hung up but that's fine.
+                drop(tx.send(value));
+            }
+
+            match rx.try_recv() {
+                Ok(resp) => break Some(resp),
+                Err(TryRecvError::Closed) => break None,
+                // edge case - there was a race condition where
+                // we became the leader but were not in the batch.
+                //
+                // Example:
+                // thread 1: register job id=1
+                // thread 2: register job id=2
+                // thread 2: processor.lock().await
+                // thread 1: processor.lock().await
+                // thread 2: becomes leader, batch_size=1, jobs=[1].
+                Err(TryRecvError::Empty) => {}
+            }
+        };
+
+        // already removed.
+        ScopeGuard::into_inner(guard);
+
+        resp.expect("no response found. batch processer should not panic")
+    }
+}
+
+struct BatchQueueInner<P: QueueProcessing> {
+    version: u64,
+    queue: BTreeMap<u64, BatchJob<P>>,
+}
+
+impl<P: QueueProcessing> BatchQueueInner<P>
{ + fn register_job(&mut self, req: P::Req) -> (u64, tokio::sync::oneshot::Receiver) { + let (tx, rx) = tokio::sync::oneshot::channel(); + + let id = self.version; + + // Overflow concern: + // This is a u64, and we might enqueue 2^16 tasks per second. + // This gives us 2^48 seconds (9 million years). + // Even if this does overflow, it will not break, but some + // jobs with the higher version might never get prioritised. + self.version += 1; + + self.queue.insert(id, BatchJob { req, res: tx }); + + (id, rx) + } + + fn get_batch(&mut self, p: &P) -> (Vec, Vec>) { + let batch_size = p.batch_size(self.queue.len()); + let mut reqs = Vec::with_capacity(batch_size); + let mut resps = Vec::with_capacity(batch_size); + + while reqs.len() < batch_size { + let Some((_, job)) = self.queue.pop_first() else { + break; + }; + reqs.push(job.req); + resps.push(job.res); + } + + (reqs, resps) + } +} diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs index ba10fce7b4..e3be454713 100644 --- a/proxy/src/binary/local_proxy.rs +++ b/proxy/src/binary/local_proxy.rs @@ -201,7 +201,7 @@ pub async fn run() -> anyhow::Result<()> { auth_backend, http_listener, shutdown.clone(), - Arc::new(CancellationHandler::new(&config.connect_to_compute, None)), + Arc::new(CancellationHandler::new(&config.connect_to_compute)), endpoint_rate_limiter, ); diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 6ab6df5610..9215dbf73f 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -23,7 +23,8 @@ use utils::{project_build_tag, project_git_version}; use crate::auth::backend::jwt::JwkCache; use crate::auth::backend::{ConsoleRedirectBackend, MaybeOwned}; -use crate::cancellation::{CancellationHandler, handle_cancel_messages}; +use crate::batch::BatchQueue; +use crate::cancellation::{CancellationHandler, CancellationProcessor}; use crate::config::{ self, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2, remote_storage_from_toml, @@ -392,13 +393,7 @@ pub async fn run() -> anyhow::Result<()> { .as_ref() .map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit)); - // channel size should be higher than redis client limit to avoid blocking - let cancel_ch_size = args.cancellation_ch_size; - let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size); - let cancellation_handler = Arc::new(CancellationHandler::new( - &config.connect_to_compute, - Some(tx_cancel), - )); + let cancellation_handler = Arc::new(CancellationHandler::new(&config.connect_to_compute)); let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards( RateBucketInfo::to_leaky_bucket(&args.endpoint_rps_limit) @@ -530,21 +525,11 @@ pub async fn run() -> anyhow::Result<()> { match redis_kv_client.try_connect().await { Ok(()) => { info!("Connected to Redis KV client"); - maintenance_tasks.spawn(async move { - handle_cancel_messages( - &mut redis_kv_client, - rx_cancel, - args.cancellation_batch_size, - ) - .await?; + cancellation_handler.init_tx(BatchQueue::new(CancellationProcessor { + client: redis_kv_client, + batch_size: args.cancellation_batch_size, + })); - drop(redis_kv_client); - - // `handle_cancel_messages` was terminated due to the tx_cancel - // being dropped. this is not worthy of an error, and this task can only return `Err`, - // so let's wait forever instead. 
- std::future::pending().await - }); break; } Err(e) => { diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index cce4c1d3a0..036f36c7f6 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -1,19 +1,23 @@ +use std::convert::Infallible; use std::net::{IpAddr, SocketAddr}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; +use std::time::Duration; -use anyhow::{Context, anyhow}; +use anyhow::anyhow; +use futures::FutureExt; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use postgres_client::CancelToken; +use postgres_client::RawCancelToken; use postgres_client::tls::MakeTlsConnect; use redis::{Cmd, FromRedisValue, Value}; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::net::TcpStream; -use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error, info, warn}; +use tokio::time::timeout; +use tracing::{debug, error, info}; use crate::auth::AuthError; use crate::auth::backend::ComputeUserInfo; +use crate::batch::{BatchQueue, QueueProcessing}; use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::control_plane::ControlPlaneApi; @@ -27,46 +31,36 @@ use crate::redis::kv_ops::RedisKVClient; type IpSubnetKey = IpNet; -const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time +const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600); +const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570); // Message types for sending through mpsc channel pub enum CancelKeyOp { StoreCancelKey { - key: String, - field: String, - value: String, - resp_tx: Option>>, - _guard: CancelChannelSizeGuard<'static>, - expire: i64, // TTL for key + key: CancelKeyData, + value: Box, + expire: std::time::Duration, }, GetCancelData { - key: String, - resp_tx: oneshot::Sender>>, - _guard: CancelChannelSizeGuard<'static>, - }, - RemoveCancelKey { - key: String, - field: String, - resp_tx: Option>>, - _guard: CancelChannelSizeGuard<'static>, + key: CancelKeyData, }, } pub struct Pipeline { inner: redis::Pipeline, - replies: Vec, + replies: usize, } impl Pipeline { fn with_capacity(n: usize) -> Self { Self { inner: redis::Pipeline::with_capacity(n), - replies: Vec::with_capacity(n), + replies: 0, } } - async fn execute(&mut self, client: &mut RedisKVClient) { - let responses = self.replies.len(); + async fn execute(self, client: &mut RedisKVClient) -> Vec> { + let responses = self.replies; let batch_size = self.inner.len(); match client.query(&self.inner).await { @@ -76,176 +70,73 @@ impl Pipeline { batch_size, responses, "successfully completed cancellation jobs", ); - for (value, reply) in std::iter::zip(values, self.replies.drain(..)) { - reply.send_value(value); - } + values.into_iter().map(Ok).collect() } Ok(value) => { error!(batch_size, ?value, "unexpected redis return value"); - for reply in self.replies.drain(..) { - reply.send_err(anyhow!("incorrect response type from redis")); - } + std::iter::repeat_with(|| Err(anyhow!("incorrect response type from redis"))) + .take(responses) + .collect() } Err(err) => { - for reply in self.replies.drain(..) 
{ - reply.send_err(anyhow!("could not send cmd to redis: {err}")); - } + std::iter::repeat_with(|| Err(anyhow!("could not send cmd to redis: {err}"))) + .take(responses) + .collect() } } - - self.inner.clear(); - self.replies.clear(); } - fn add_command_with_reply(&mut self, cmd: Cmd, reply: CancelReplyOp) { + fn add_command_with_reply(&mut self, cmd: Cmd) { self.inner.add_command(cmd); - self.replies.push(reply); + self.replies += 1; } fn add_command_no_reply(&mut self, cmd: Cmd) { self.inner.add_command(cmd).ignore(); } - - fn add_command(&mut self, cmd: Cmd, reply: Option) { - match reply { - Some(reply) => self.add_command_with_reply(cmd, reply), - None => self.add_command_no_reply(cmd), - } - } } impl CancelKeyOp { - fn register(self, pipe: &mut Pipeline) { + fn register(&self, pipe: &mut Pipeline) { #[allow(clippy::used_underscore_binding)] match self { - CancelKeyOp::StoreCancelKey { - key, - field, - value, - resp_tx, - _guard, - expire, - } => { - let reply = - resp_tx.map(|resp_tx| CancelReplyOp::StoreCancelKey { resp_tx, _guard }); - pipe.add_command(Cmd::hset(&key, field, value), reply); - pipe.add_command_no_reply(Cmd::expire(key, expire)); + CancelKeyOp::StoreCancelKey { key, value, expire } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value)); + pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64)); } - CancelKeyOp::GetCancelData { - key, - resp_tx, - _guard, - } => { - let reply = CancelReplyOp::GetCancelData { resp_tx, _guard }; - pipe.add_command_with_reply(Cmd::hgetall(key), reply); - } - CancelKeyOp::RemoveCancelKey { - key, - field, - resp_tx, - _guard, - } => { - let reply = - resp_tx.map(|resp_tx| CancelReplyOp::RemoveCancelKey { resp_tx, _guard }); - pipe.add_command(Cmd::hdel(key, field), reply); + CancelKeyOp::GetCancelData { key } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command_with_reply(Cmd::hget(key, "data")); } } } } -// Message types for sending through mpsc channel -pub enum CancelReplyOp { - StoreCancelKey { - resp_tx: oneshot::Sender>, - _guard: CancelChannelSizeGuard<'static>, - }, - GetCancelData { - resp_tx: oneshot::Sender>>, - _guard: CancelChannelSizeGuard<'static>, - }, - RemoveCancelKey { - resp_tx: oneshot::Sender>, - _guard: CancelChannelSizeGuard<'static>, - }, +pub struct CancellationProcessor { + pub client: RedisKVClient, + pub batch_size: usize, } -impl CancelReplyOp { - fn send_err(self, e: anyhow::Error) { - match self { - CancelReplyOp::StoreCancelKey { resp_tx, _guard } => { - resp_tx - .send(Err(e)) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - CancelReplyOp::GetCancelData { resp_tx, _guard } => { - resp_tx - .send(Err(e)) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => { - resp_tx - .send(Err(e)) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - } +impl QueueProcessing for CancellationProcessor { + type Req = (CancelChannelSizeGuard<'static>, CancelKeyOp); + type Res = anyhow::Result; + + fn batch_size(&self, _queue_size: usize) -> usize { + self.batch_size } - fn send_value(self, v: redis::Value) { - match self { - CancelReplyOp::StoreCancelKey { resp_tx, _guard } => { - let send = - FromRedisValue::from_owned_redis_value(v).context("could not parse value"); - resp_tx - .send(send) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - CancelReplyOp::GetCancelData 
{ resp_tx, _guard } => { - let send = - FromRedisValue::from_owned_redis_value(v).context("could not parse value"); - resp_tx - .send(send) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => { - let send = - FromRedisValue::from_owned_redis_value(v).context("could not parse value"); - resp_tx - .send(send) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } - } - } -} - -// Running as a separate task to accept messages through the rx channel -pub async fn handle_cancel_messages( - client: &mut RedisKVClient, - mut rx: mpsc::Receiver, - batch_size: usize, -) -> anyhow::Result<()> { - let mut batch = Vec::with_capacity(batch_size); - let mut pipeline = Pipeline::with_capacity(batch_size); - - loop { - if rx.recv_many(&mut batch, batch_size).await == 0 { - warn!("shutting down cancellation queue"); - break Ok(()); - } + async fn apply(&mut self, batch: Vec) -> Vec { + let mut pipeline = Pipeline::with_capacity(batch.len()); let batch_size = batch.len(); debug!(batch_size, "running cancellation jobs"); - for msg in batch.drain(..) { - msg.register(&mut pipeline); + for (_, op) in &batch { + op.register(&mut pipeline); } - pipeline.execute(client).await; + pipeline.execute(&mut self.client).await } } @@ -256,7 +147,7 @@ pub struct CancellationHandler { compute_config: &'static ComputeConfig, // rate limiter of cancellation requests limiter: Arc>>, - tx: Option>, // send messages to the redis KV client task + tx: OnceLock>, // send messages to the redis KV client task } #[derive(Debug, Error)] @@ -296,13 +187,10 @@ impl ReportableError for CancelError { } impl CancellationHandler { - pub fn new( - compute_config: &'static ComputeConfig, - tx: Option>, - ) -> Self { + pub fn new(compute_config: &'static ComputeConfig) -> Self { Self { compute_config, - tx, + tx: OnceLock::new(), limiter: Arc::new(std::sync::Mutex::new( LeakyBucketRateLimiter::::new_with_shards( LeakyBucketRateLimiter::::DEFAULT, @@ -312,7 +200,14 @@ impl CancellationHandler { } } - pub(crate) fn get_key(self: &Arc) -> Session { + pub fn init_tx(&self, queue: BatchQueue) { + self.tx + .set(queue) + .map_err(|_| {}) + .expect("cancellation queue should be registered once"); + } + + pub(crate) fn get_key(self: Arc) -> Session { // we intentionally generate a random "backend pid" and "secret key" here. // we use the corresponding u64 as an identifier for the // actual endpoint+pid+secret for postgres/pgbouncer. 
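
A minimal sketch of how the batching pieces fit together, assuming the `BatchQueue` and `QueueProcessing` items added in `proxy/src/batch.rs` above are in scope. `EchoProcessor`, its request/response types, and `example` are hypothetical, purely for illustration; the real implementor in this diff is `CancellationProcessor`, which drains a batch of Redis commands into a single pipeline.

```rust
use crate::batch::{BatchQueue, QueueProcessing};

/// Hypothetical processor: replies to each request with `req + 1`.
struct EchoProcessor;

impl QueueProcessing for EchoProcessor {
    type Req = u64;
    type Res = u64;

    fn batch_size(&self, queue_size: usize) -> usize {
        // Drain whatever is queued, at least one job and at most 32 per batch.
        queue_size.clamp(1, 32)
    }

    async fn apply(&mut self, reqs: Vec<u64>) -> Vec<u64> {
        // Must return exactly one reply per request, in request order.
        reqs.into_iter().map(|req| req + 1).collect()
    }
}

async fn example() {
    let queue = BatchQueue::new(EchoProcessor);
    // Many tasks may call this concurrently: one caller becomes the leader,
    // applies the whole batch, and the replies are fanned back out.
    let resp = queue.call(41).await;
    assert_eq!(resp, 42);
}
```
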
@@ -322,14 +217,10 @@ impl CancellationHandler { let key: CancelKeyData = rand::random(); - let prefix_key: KeyPrefix = KeyPrefix::Cancel(key); - let redis_key = prefix_key.build_redis_key(); - debug!("registered new query cancellation key {key}"); Session { key, - redis_key, - cancellation_handler: Arc::clone(self), + cancellation_handler: self, } } @@ -337,62 +228,43 @@ impl CancellationHandler { &self, key: CancelKeyData, ) -> Result, CancelError> { - let prefix_key: KeyPrefix = KeyPrefix::Cancel(key); - let redis_key = prefix_key.build_redis_key(); + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::HGet); + let op = CancelKeyOp::GetCancelData { key }; - let (resp_tx, resp_rx) = tokio::sync::oneshot::channel(); - let op = CancelKeyOp::GetCancelData { - key: redis_key, - resp_tx, - _guard: Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::HGetAll), - }; - - let Some(tx) = &self.tx else { + let Some(tx) = self.tx.get() else { tracing::warn!("cancellation handler is not available"); return Err(CancelError::InternalError); }; - tx.try_send(op) + const TIMEOUT: Duration = Duration::from_secs(5); + let result = timeout(TIMEOUT, tx.call((guard, op))) + .await + .map_err(|_| { + tracing::warn!("timed out waiting to receive GetCancelData response"); + CancelError::RateLimit + })? .map_err(|e| { - tracing::warn!("failed to send GetCancelData for {key}: {e}"); - }) - .map_err(|()| CancelError::InternalError)?; + tracing::warn!("failed to receive GetCancelData response: {e}"); + CancelError::InternalError + })?; - let result = resp_rx.await.map_err(|e| { + let cancel_state_str = String::from_owned_redis_value(result).map_err(|e| { tracing::warn!("failed to receive GetCancelData response: {e}"); CancelError::InternalError })?; - let cancel_state_str: Option = match result { - Ok(mut state) => { - if state.len() == 1 { - Some(state.remove(0).1) - } else { - tracing::warn!("unexpected number of entries in cancel state: {state:?}"); - return Err(CancelError::InternalError); - } - } - Err(e) => { - tracing::warn!("failed to receive cancel state from redis: {e}"); - return Err(CancelError::InternalError); - } - }; + let cancel_closure: CancelClosure = + serde_json::from_str(&cancel_state_str).map_err(|e| { + tracing::warn!("failed to deserialize cancel state: {e}"); + CancelError::InternalError + })?; - let cancel_state: Option = match cancel_state_str { - Some(state) => { - let cancel_closure: CancelClosure = serde_json::from_str(&state).map_err(|e| { - tracing::warn!("failed to deserialize cancel state: {e}"); - CancelError::InternalError - })?; - Some(cancel_closure) - } - None => None, - }; - Ok(cancel_state) + Ok(Some(cancel_closure)) } + /// Try to cancel a running query for the corresponding connection. /// If the cancellation key is not found, it will be published to Redis. /// check_allowed - if true, check if the IP is allowed to cancel the query. @@ -467,10 +339,10 @@ impl CancellationHandler { /// This should've been a [`std::future::Future`], but /// it's impossible to name a type of an unboxed future /// (we'd need something like `#![feature(type_alias_impl_trait)]`). 
-#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CancelClosure { socket_addr: SocketAddr, - cancel_token: CancelToken, + cancel_token: RawCancelToken, hostname: String, // for pg_sni router user_info: ComputeUserInfo, } @@ -478,7 +350,7 @@ pub struct CancelClosure { impl CancelClosure { pub(crate) fn new( socket_addr: SocketAddr, - cancel_token: CancelToken, + cancel_token: RawCancelToken, hostname: String, user_info: ComputeUserInfo, ) -> Self { @@ -491,7 +363,7 @@ impl CancelClosure { } /// Cancels the query running on user's compute node. pub(crate) async fn try_cancel_query( - self, + &self, compute_config: &ComputeConfig, ) -> Result<(), CancelError> { let socket = TcpStream::connect(self.socket_addr).await?; @@ -512,7 +384,6 @@ impl CancelClosure { pub(crate) struct Session { /// The user-facing key identifying this session. key: CancelKeyData, - redis_key: String, cancellation_handler: Arc, } @@ -521,60 +392,66 @@ impl Session { &self.key } - // Send the store key op to the cancellation handler and set TTL for the key - pub(crate) fn write_cancel_key( + /// Ensure the cancel key is continously refreshed, + /// but stop when the channel is dropped. + pub(crate) async fn maintain_cancel_key( &self, - cancel_closure: CancelClosure, - ) -> Result<(), CancelError> { - let Some(tx) = &self.cancellation_handler.tx else { - tracing::warn!("cancellation handler is not available"); - return Err(CancelError::InternalError); - }; + session_id: uuid::Uuid, + cancel: tokio::sync::oneshot::Receiver, + cancel_closure: &CancelClosure, + compute_config: &ComputeConfig, + ) { + futures::future::select( + std::pin::pin!(self.maintain_redis_cancel_key(cancel_closure)), + cancel, + ) + .await; - let closure_json = serde_json::to_string(&cancel_closure).map_err(|e| { - tracing::warn!("failed to serialize cancel closure: {e}"); - CancelError::InternalError - })?; - - let op = CancelKeyOp::StoreCancelKey { - key: self.redis_key.clone(), - field: "data".to_string(), - value: closure_json, - resp_tx: None, - _guard: Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::HSet), - expire: CANCEL_KEY_TTL, - }; - - let _ = tx.try_send(op).map_err(|e| { - let key = self.key; - tracing::warn!("failed to send StoreCancelKey for {key}: {e}"); - }); - Ok(()) + if let Err(err) = cancel_closure + .try_cancel_query(compute_config) + .boxed() + .await + { + tracing::warn!( + ?session_id, + ?err, + "could not cancel the query in the database" + ); + } } - pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> { - let Some(tx) = &self.cancellation_handler.tx else { + // Ensure the cancel key is continously refreshed. + async fn maintain_redis_cancel_key(&self, cancel_closure: &CancelClosure) -> ! { + let Some(tx) = self.cancellation_handler.tx.get() else { tracing::warn!("cancellation handler is not available"); - return Err(CancelError::InternalError); + // don't exit, as we only want to exit if cancelled externally. 
+            std::future::pending().await
         };
 
-        let op = CancelKeyOp::RemoveCancelKey {
-            key: self.redis_key.clone(),
-            field: "data".to_string(),
-            resp_tx: None,
-            _guard: Metrics::get()
+        let closure_json = serde_json::to_string(&cancel_closure)
+            .expect("serialising to json string should not fail")
+            .into_boxed_str();
+
+        loop {
+            let guard = Metrics::get()
                 .proxy
                 .cancel_channel_size
-                .guard(RedisMsgKind::HDel),
-        };
+                .guard(RedisMsgKind::HSet);
+            let op = CancelKeyOp::StoreCancelKey {
+                key: self.key,
+                value: closure_json.clone(),
+                expire: CANCEL_KEY_TTL,
+            };
 
-        let _ = tx.try_send(op).map_err(|e| {
-            let key = self.key;
-            tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
-        });
-        Ok(())
+            tracing::debug!(
+                src=%self.key,
+                dest=?cancel_closure.cancel_token,
+                "registering cancellation key"
+            );
+
+            if tx.call((guard, op)).await.is_ok() {
+                tokio::time::sleep(CANCEL_KEY_REFRESH).await;
+            }
+        }
     }
 }
diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs
index aae1fea07d..5dd264b35e 100644
--- a/proxy/src/compute/mod.rs
+++ b/proxy/src/compute/mod.rs
@@ -9,7 +9,7 @@ use itertools::Itertools;
 use postgres_client::config::{AuthKeys, SslMode};
 use postgres_client::maybe_tls_stream::MaybeTlsStream;
 use postgres_client::tls::MakeTlsConnect;
-use postgres_client::{CancelToken, NoTls, RawConnection};
+use postgres_client::{NoTls, RawCancelToken, RawConnection};
 use postgres_protocol::message::backend::NoticeResponseBody;
 use thiserror::Error;
 use tokio::net::{TcpStream, lookup_host};
@@ -265,7 +265,8 @@ impl ConnectInfo {
     }
 }
 
-type RustlsStream = >::Stream;
+pub type RustlsStream = >::Stream;
+pub type MaybeRustlsStream = MaybeTlsStream;
 
 pub(crate) struct PostgresConnection {
     /// Socket connected to a compute node.
@@ -279,7 +280,7 @@ pub(crate) struct PostgresConnection {
     /// Notices received from compute after authenticating
     pub(crate) delayed_notice: Vec<NoticeResponseBody>,
 
-    _guage: NumDbConnectionsGuard<'static>,
+    pub(crate) guage: NumDbConnectionsGuard<'static>,
 }
 
 impl ConnectInfo {
@@ -327,8 +328,7 @@ impl ConnectInfo {
         // Yet another reason to rework the connection establishing code.
         let cancel_closure = CancelClosure::new(
             socket_addr,
-            CancelToken {
-                socket_config: None,
+            RawCancelToken {
                 ssl_mode: self.ssl_mode,
                 process_id,
                 secret_key,
@@ -343,7 +343,7 @@ impl ConnectInfo {
             delayed_notice,
             cancel_closure,
             aux,
-            _guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()),
+            guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()),
         };
 
         Ok(connection)
diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs
index 5331ea41fd..89adfc9049 100644
--- a/proxy/src/console_redirect_proxy.rs
+++ b/proxy/src/console_redirect_proxy.rs
@@ -120,7 +120,7 @@ pub async fn task_main(
             Ok(Some(p)) => {
                 ctx.set_success();
                 let _disconnect = ctx.log_connect();
-                match p.proxy_pass(&config.connect_to_compute).await {
+                match p.proxy_pass().await {
                     Ok(()) => {}
                     Err(ErrorSource::Client(e)) => {
                         error!(
@@ -232,22 +232,35 @@ pub(crate) async fn handle_client(
         .or_else(|e| async { Err(stream.throw_error(e, Some(ctx)).await) })
         .await?;
 
-    let cancellation_handler_clone = Arc::clone(&cancellation_handler);
-    let session = cancellation_handler_clone.get_key();
-
-    session.write_cancel_key(node.cancel_closure.clone())?;
+    let session = cancellation_handler.get_key();
 
     prepare_client_connection(&node, *session.key(), &mut stream);
     let stream = stream.flush_and_into_inner().await?;
 
+    let session_id = ctx.session_id();
+    let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
+    tokio::spawn(async move {
+        session
+            .maintain_cancel_key(
+                session_id,
+                cancel,
+                &node.cancel_closure,
+                &config.connect_to_compute,
+            )
+            .await;
+    });
+
     Ok(Some(ProxyPassthrough {
         client: stream,
-        aux: node.aux.clone(),
+        compute: node.stream,
+
+        aux: node.aux,
         private_link_id: None,
-        compute: node,
-        session_id: ctx.session_id(),
-        cancel: session,
+
+        _cancel_on_shutdown: cancel_on_shutdown,
+
         _req: request_gauge,
         _conn: conn_gauge,
+        _db_conn: node.guage,
     }))
 }
diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs
index 026c6aeba9..d96f582fad 100644
--- a/proxy/src/lib.rs
+++ b/proxy/src/lib.rs
@@ -75,6 +75,7 @@ pub mod binary;
 
 mod auth;
+mod batch;
 mod cache;
 mod cancellation;
 mod compute;
diff --git a/proxy/src/pglb/passthrough.rs b/proxy/src/pglb/passthrough.rs
index 6f651d383d..d4c029f6d9 100644
--- a/proxy/src/pglb/passthrough.rs
+++ b/proxy/src/pglb/passthrough.rs
@@ -1,15 +1,17 @@
-use futures::FutureExt;
+use std::convert::Infallible;
+
 use smol_str::SmolStr;
 use tokio::io::{AsyncRead, AsyncWrite};
 use tracing::debug;
 use utils::measured_stream::MeasuredStream;
 
 use super::copy_bidirectional::ErrorSource;
-use crate::cancellation;
-use crate::compute::PostgresConnection;
-use crate::config::ComputeConfig;
+use crate::compute::MaybeRustlsStream;
 use crate::control_plane::messages::MetricsAuxInfo;
-use crate::metrics::{Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard};
+use crate::metrics::{
+    Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard,
+    NumDbConnectionsGuard,
+};
 use crate::stream::Stream;
 use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
@@ -64,40 +66,20 @@ pub(crate) async fn proxy_pass(
 
 pub(crate) struct ProxyPassthrough {
     pub(crate) client: Stream,
-    pub(crate) compute: PostgresConnection,
+    pub(crate) compute: MaybeRustlsStream,
+
     pub(crate) aux: MetricsAuxInfo,
-    pub(crate) session_id: uuid::Uuid,
     pub(crate) private_link_id: Option,
-    pub(crate) cancel: cancellation::Session,
+
+    pub(crate) _cancel_on_shutdown: tokio::sync::oneshot::Sender<Infallible>,
     pub(crate) _req: NumConnectionRequestsGuard<'static>,
     pub(crate) _conn: NumClientConnectionsGuard<'static>,
+    pub(crate) _db_conn: NumDbConnectionsGuard<'static>,
 }
 
 impl ProxyPassthrough {
-    pub(crate) async fn proxy_pass(
-        self,
-        compute_config: &ComputeConfig,
-    ) -> Result<(), ErrorSource> {
-        let res = proxy_pass(
-            self.client,
-            self.compute.stream,
-            self.aux,
-            self.private_link_id,
-        )
-        .await;
-        if let Err(err) = self
-            .compute
-            .cancel_closure
-            .try_cancel_query(compute_config)
-            .boxed()
-            .await
-        {
-            tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
-        }
-
-        drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
-
-        res
+    pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
+        proxy_pass(self.client, self.compute, self.aux, self.private_link_id).await
     }
 }
diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs
index 4211406f6c..7da1b8d8fa 100644
--- a/proxy/src/proxy/mod.rs
+++ b/proxy/src/proxy/mod.rs
@@ -155,7 +155,7 @@ pub async fn task_main(
             Ok(Some(p)) => {
                 ctx.set_success();
                 let _disconnect = ctx.log_connect();
-                match p.proxy_pass(&config.connect_to_compute).await {
+                match p.proxy_pass().await {
                     Ok(()) => {}
                     Err(ErrorSource::Client(e)) => {
                         warn!(
@@ -372,13 +372,24 @@ pub(crate) async fn handle_client(
         Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
     };
 
-    let cancellation_handler_clone = Arc::clone(&cancellation_handler);
-    let session = cancellation_handler_clone.get_key();
+    let session = cancellation_handler.get_key();
 
-    session.write_cancel_key(node.cancel_closure.clone())?;
     prepare_client_connection(&node, *session.key(), &mut stream);
     let stream = stream.flush_and_into_inner().await?;
 
+    let session_id = ctx.session_id();
+    let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
+    tokio::spawn(async move {
+        session
+            .maintain_cancel_key(
+                session_id,
+                cancel,
+                &node.cancel_closure,
+                &config.connect_to_compute,
+            )
+            .await;
+    });
+
     let private_link_id = match ctx.extra() {
         Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),
         Some(ConnectionInfoExtra::Azure { link_id }) => Some(link_id.to_smolstr()),
@@ -387,13 +398,16 @@ pub(crate) async fn handle_client(
     Ok(Some(ProxyPassthrough {
         client: stream,
-        aux: node.aux.clone(),
+        compute: node.stream,
+
+        aux: node.aux,
         private_link_id,
-        compute: node,
-        session_id: ctx.session_id(),
-        cancel: session,
+
+        _cancel_on_shutdown: cancel_on_shutdown,
+
         _req: request_gauge,
         _conn: conn_gauge,
+        _db_conn: node.guage,
     }))
 }
diff --git a/proxy/src/redis/keys.rs b/proxy/src/redis/keys.rs
index 3113bad949..b453e6851c 100644
--- a/proxy/src/redis/keys.rs
+++ b/proxy/src/redis/keys.rs
@@ -1,8 +1,4 @@
-use std::io::ErrorKind;
-
-use anyhow::Ok;
-
-use crate::pqproto::{CancelKeyData, id_to_cancel_key};
+use crate::pqproto::CancelKeyData;
 
 pub mod keyspace {
     pub const CANCEL_PREFIX: &str = "cancel";
@@ -23,39 +19,12 @@ impl KeyPrefix {
             }
         }
     }
-
-    #[allow(dead_code)]
-    pub(crate) fn as_str(&self) -> &'static str {
-        match self {
-            KeyPrefix::Cancel(_) => keyspace::CANCEL_PREFIX,
-        }
-    }
-}
-
-#[allow(dead_code)]
-pub(crate) fn parse_redis_key(key: &str) -> anyhow::Result<KeyPrefix> {
-    let (prefix, key_str) = key.split_once(':').ok_or_else(|| {
-        anyhow::anyhow!(std::io::Error::new(
-            ErrorKind::InvalidData,
-            "missing prefix"
-        ))
-    })?;
-
-    match prefix {
-        keyspace::CANCEL_PREFIX => {
-            let id = u64::from_str_radix(key_str, 16)?;
-
-            Ok(KeyPrefix::Cancel(id_to_cancel_key(id)))
-        }
-        _ => Err(anyhow::anyhow!(std::io::Error::new(
-            ErrorKind::InvalidData,
-            "unknown prefix"
-        ))),
-    }
 }
 
 #[cfg(test)]
 mod tests {
+    use crate::pqproto::id_to_cancel_key;
+
     use super::*;
 
     #[test]
@@ -65,16 +34,4 @@ mod tests {
         let redis_key = cancel_key.build_redis_key();
         assert_eq!(redis_key, "cancel:30390000d431");
     }
-
-    #[test]
-    fn test_parse_redis_key() {
-        let redis_key = "cancel:30390000d431";
-        let key: KeyPrefix = parse_redis_key(redis_key).expect("Failed to parse key");
-
-        let ref_key = id_to_cancel_key(12345 << 32 | 54321);
-
-        assert_eq!(key.as_str(), KeyPrefix::Cancel(ref_key).as_str());
-        let KeyPrefix::Cancel(cancel_key) = key;
-        assert_eq!(ref_key, cancel_key);
-    }
 }
diff --git a/proxy/src/redis/kv_ops.rs b/proxy/src/redis/kv_ops.rs
index f71730c533..f8d3b5cc66 100644
--- a/proxy/src/redis/kv_ops.rs
+++ b/proxy/src/redis/kv_ops.rs
@@ -1,3 +1,6 @@
+use std::time::Duration;
+
+use futures::FutureExt;
 use redis::aio::ConnectionLike;
 use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
@@ -35,14 +38,11 @@ impl RedisKVClient {
     }
 
     pub async fn try_connect(&mut self) -> anyhow::Result<()> {
-        match self.client.connect().await {
-            Ok(()) => {}
-            Err(e) => {
-                tracing::error!("failed to connect to redis: {e}");
-                return Err(e);
-            }
-        }
-        Ok(())
+        self.client
+            .connect()
+            .boxed()
+            .await
+            .inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
     }
 
     pub(crate) async fn query(
@@ -54,15 +54,25 @@ impl RedisKVClient {
             return Err(anyhow::anyhow!("Rate limit exceeded"));
         }
 
-        match q.query(&mut self.client).await {
+        let e = match q.query(&mut self.client).await {
             Ok(t) => return Ok(t),
-            Err(e) => {
-                tracing::error!("failed to run query: {e}");
+            Err(e) => e,
+        };
+
+        tracing::error!("failed to run query: {e}");
+        match e.retry_method() {
+            redis::RetryMethod::Reconnect => {
+                tracing::info!("Redis client is disconnected. Reconnecting...");
+                self.try_connect().await?;
             }
+            redis::RetryMethod::RetryImmediately => {}
+            redis::RetryMethod::WaitAndRetry => {
+                // somewhat arbitrary.
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            }
+            _ => Err(e)?,
         }
 
-        tracing::info!("Redis client is disconnected. Reconnecting...");
-        self.try_connect().await?;
         Ok(q.query(&mut self.client).await?)
     }
 }
diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs
index 8648a94869..0d374e6df2 100644
--- a/proxy/src/serverless/websocket.rs
+++ b/proxy/src/serverless/websocket.rs
@@ -167,7 +167,7 @@ pub(crate) async fn serve_websocket(
         Ok(Some(p)) => {
             ctx.set_success();
             ctx.log_connect();
-            match p.proxy_pass(&config.connect_to_compute).await {
+            match p.proxy_pass().await {
                 Ok(()) => Ok(()),
                 Err(ErrorSource::Client(err)) => Err(err).context("client"),
                 Err(ErrorSource::Compute(err)) => Err(err).context("compute"),
diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py
index 3695ece66b..728241b465 100644
--- a/test_runner/regress/test_pg_regress.py
+++ b/test_runner/regress/test_pg_regress.py
@@ -173,7 +173,11 @@ def test_pg_regress(
     (runpath / "testtablespace").mkdir(parents=True)
 
     # Compute all the file locations that pg_regress will need.
-    build_path = pg_distrib_dir / f"build/{env.pg_version.v_prefixed}/src/test/regress"
+    #
+    # XXX: We assume that the `build` directory is a sibling of the
+    # pg_distrib_dir. That is the default when you check out the
+    # repository; `build` and `pg_install` are created side by side.
+    build_path = pg_distrib_dir / f"../build/{env.pg_version.v_prefixed}/src/test/regress"
     src_path = base_dir / f"vendor/postgres-{env.pg_version.v_prefixed}/src/test/regress"
     bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
     schedule = src_path / "parallel_schedule"
@@ -250,7 +254,11 @@ def test_isolation(
     (runpath / "testtablespace").mkdir(parents=True)
 
     # Compute all the file locations that pg_isolation_regress will need.
-    build_path = pg_distrib_dir / f"build/{env.pg_version.v_prefixed}/src/test/isolation"
+    #
+    # XXX: We assume that the `build` directory is a sibling of the
+    # pg_distrib_dir. That is the default when you check out the
+    # repository; `build` and `pg_install` are created side by side.
+    build_path = pg_distrib_dir / f"../build/{env.pg_version.v_prefixed}/src/test/isolation"
     src_path = base_dir / f"vendor/postgres-{env.pg_version.v_prefixed}/src/test/isolation"
     bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
     schedule = src_path / "isolation_schedule"
@@ -314,8 +322,11 @@ def test_sql_regress(
     (runpath / "testtablespace").mkdir(parents=True)
 
     # Compute all the file locations that pg_regress will need.
-    # This test runs neon specific tests
-    build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
+    #
+    # XXX: We assume that the `build` directory is a sibling of the
+    # pg_distrib_dir. That is the default when you check out the
+    # repository; `build` and `pg_install` are created side by side.
+    build_path = pg_distrib_dir / f"../build/{env.pg_version.v_prefixed}/src/test/regress"
     src_path = base_dir / "test_runner/sql_regress"
     bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
     schedule = src_path / "parallel_schedule"
diff --git a/test_runner/regress/test_s3_restore.py b/test_runner/regress/test_s3_restore.py
index 082808f9ff..2d7be1f9d1 100644
--- a/test_runner/regress/test_s3_restore.py
+++ b/test_runner/regress/test_s3_restore.py
@@ -74,7 +74,7 @@ def test_tenant_s3_restore(
         last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
         last_flush_lsns.append(last_flush_lsn)
         ps_http.timeline_checkpoint(tenant_id, timeline_id)
-        wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
+        wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn, timeout=60)
         log.info(f"{timeline} timeline {timeline_id} {last_flush_lsn=}")
         parent = timeline