From 154f6dc59cc91ebde58d1a0b4a8b43aa68d1c3a5 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 11 Jul 2025 14:25:25 +0100 Subject: [PATCH 01/14] pageserver: log only on final shard resolution failure (#12565) This log is too noisy. Instead of warning on every retry, let's log only on the final failure. --- pageserver/src/tenant/timeline/handle.rs | 10 +++++----- test_runner/fixtures/pageserver/allowed_errors.py | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline/handle.rs b/pageserver/src/tenant/timeline/handle.rs index 33c97287c0..7bca66190f 100644 --- a/pageserver/src/tenant/timeline/handle.rs +++ b/pageserver/src/tenant/timeline/handle.rs @@ -359,14 +359,14 @@ impl Cache { Err(e) => { // Retry on tenant manager error to handle tenant split more gracefully if attempt < GET_MAX_RETRIES { - tracing::warn!( - "Fail to resolve tenant shard in attempt {}: {:?}. Retrying...", - attempt, - e - ); tokio::time::sleep(RETRY_BACKOFF).await; continue; } else { + tracing::warn!( + "Failed to resolve tenant shard after {} attempts: {:?}", + GET_MAX_RETRIES, + e + ); return Err(e); } } diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py index 0e4dd571c0..59249f31ad 100755 --- a/test_runner/fixtures/pageserver/allowed_errors.py +++ b/test_runner/fixtures/pageserver/allowed_errors.py @@ -115,8 +115,7 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = ( ".*Local data loss suspected.*", # Too many frozen layers error is normal during intensive benchmarks ".*too many frozen layers.*", - # Transient errors when resolving tenant shards by page service - ".*Fail to resolve tenant shard in attempt.*", + ".*Failed to resolve tenant shard after.*", # Expected warnings when pageserver has not refreshed GC info yet ".*pitr LSN/interval not found, skipping force image creation LSN calculation.*", ".*No broker updates received for a while.*", From a8db7ebffb7e9b2a1bc9cc950a03a244d26d34d4 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 11 Jul 2025 17:17:44 +0300 Subject: [PATCH 02/14] Minor refactor of the SQL functions to get working set size estimate (#12550) Split the functions into two: one internal function to calculate the estimate, and another (two functions) to expose it as SQL functions. This is in preparation of adding new communicator implementation. With that, the SQL functions will dispatch the call to the old or new implementation depending on which is being used. --- pgxn/neon/file_cache.c | 47 +++++++++++++++--------------------------- pgxn/neon/file_cache.h | 3 ++- pgxn/neon/neon.c | 30 +++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 31 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 8cfa09bc87..0e316abd1d 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -205,6 +205,8 @@ bool AmPrewarmWorker; #define LFC_ENABLED() (lfc_ctl->limit != 0) +PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); + /* * Close LFC file if opened. * All backends should close their LFC files once LFC is disabled. @@ -2135,40 +2137,25 @@ local_cache_pages(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } -PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds); -Datum -approximate_working_set_size_seconds(PG_FUNCTION_ARGS) +/* + * Internal implementation of the approximate_working_set_size_seconds() + * function. 
+ */ +int32 +lfc_approximate_working_set_size_seconds(time_t duration, bool reset) { - if (lfc_size_limit != 0) - { - int32 dc; - time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0); - LWLockAcquire(lfc_lock, LW_SHARED); - dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); - LWLockRelease(lfc_lock); - PG_RETURN_INT32(dc); - } - PG_RETURN_NULL(); -} + int32 dc; -PG_FUNCTION_INFO_V1(approximate_working_set_size); + if (lfc_size_limit == 0) + return -1; -Datum -approximate_working_set_size(PG_FUNCTION_ARGS) -{ - if (lfc_size_limit != 0) - { - int32 dc; - bool reset = PG_GETARG_BOOL(0); - LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED); - dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1); - if (reset) - memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); - LWLockRelease(lfc_lock); - PG_RETURN_INT32(dc); - } - PG_RETURN_NULL(); + LWLockAcquire(lfc_lock, LW_SHARED); + dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); + if (reset) + memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); + LWLockRelease(lfc_lock); + return dc; } PG_FUNCTION_INFO_V1(get_local_cache_state); diff --git a/pgxn/neon/file_cache.h b/pgxn/neon/file_cache.h index d5ac55d5ba..14e5d4f753 100644 --- a/pgxn/neon/file_cache.h +++ b/pgxn/neon/file_cache.h @@ -47,7 +47,8 @@ extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blk extern FileCacheState* lfc_get_state(size_t max_entries); extern void lfc_prewarm(FileCacheState* fcs, uint32 n_workers); -PGDLLEXPORT void lfc_prewarm_main(Datum main_arg); +extern int32 lfc_approximate_working_set_size_seconds(time_t duration, bool reset); + static inline bool lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 9e0ca16fed..7b749f1080 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -561,6 +561,8 @@ _PG_init(void) PG_FUNCTION_INFO_V1(pg_cluster_size); PG_FUNCTION_INFO_V1(backpressure_lsns); PG_FUNCTION_INFO_V1(backpressure_throttling_time); +PG_FUNCTION_INFO_V1(approximate_working_set_size_seconds); +PG_FUNCTION_INFO_V1(approximate_working_set_size); Datum pg_cluster_size(PG_FUNCTION_ARGS) @@ -607,6 +609,34 @@ backpressure_throttling_time(PG_FUNCTION_ARGS) PG_RETURN_UINT64(BackpressureThrottlingTime()); } +Datum +approximate_working_set_size_seconds(PG_FUNCTION_ARGS) +{ + time_t duration; + int32 dc; + + duration = PG_ARGISNULL(0) ? (time_t) -1 : PG_GETARG_INT32(0); + + dc = lfc_approximate_working_set_size_seconds(duration, false); + if (dc < 0) + PG_RETURN_NULL(); + else + PG_RETURN_INT32(dc); +} + +Datum +approximate_working_set_size(PG_FUNCTION_ARGS) +{ + bool reset = PG_GETARG_BOOL(0); + int32 dc; + + dc = lfc_approximate_working_set_size_seconds(-1, reset); + if (dc < 0) + PG_RETURN_NULL(); + else + PG_RETURN_INT32(dc); +} + #if PG_MAJORVERSION_NUM >= 16 static void neon_shmem_startup_hook(void) From f4245403b36925c3ad0ef39c344ca30b1701b74f Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 11 Jul 2025 16:13:36 +0100 Subject: [PATCH 03/14] [proxy] allow testing query cancellation locally (#12568) ## Problem Canceelation requires redis, redis required control-plane. ## Summary of changes Make redis for cancellation not require control plane. Add instructions for setting up redis locally. 
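For a quick sanity check that the locally started Redis is reachable before
running proxy, something like this can be used (a rough sketch using the
`redis` crate that proxy already depends on; the address matches the docker
command added below):

```rust
fn main() -> redis::RedisResult<()> {
    // Assumes the `proxy-redis` container from the README is running locally.
    let client = redis::Client::open("redis://127.0.0.1:6379")?;
    let mut conn = client.get_connection()?;
    // PING answers PONG when the server is up.
    let pong: String = redis::cmd("PING").query(&mut conn)?;
    assert_eq!(pong, "PONG");
    Ok(())
}
```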
--- proxy/README.md | 10 +++++++++- proxy/src/binary/proxy.rs | 20 +++++++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/proxy/README.md b/proxy/README.md index e10ff3d710..ff48f9f323 100644 --- a/proxy/README.md +++ b/proxy/README.md @@ -123,6 +123,11 @@ docker exec -it proxy-postgres psql -U postgres -c "CREATE TABLE neon_control_pl docker exec -it proxy-postgres psql -U postgres -c "CREATE ROLE proxy WITH SUPERUSER LOGIN PASSWORD 'password';" ``` +If you want to test query cancellation, redis is also required: +```sh +docker run --detach --name proxy-redis --publish 6379:6379 redis:7.0 +``` + Let's create self-signed certificate by running: ```sh openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj "/CN=*.local.neon.build" @@ -130,7 +135,10 @@ openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key Then we need to build proxy with 'testing' feature and run, e.g.: ```sh -RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' -c server.crt -k server.key +RUST_LOG=proxy LOGFMT=text cargo run -p proxy --bin proxy --features testing -- \ + --auth-backend postgres --auth-endpoint 'postgresql://postgres:proxy-postgres@127.0.0.1:5432/postgres' \ + --redis-auth-type="plain" --redis-plain="redis://127.0.0.1:6379" \ + -c server.crt -k server.key ``` Now from client you can start a new session: diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 691709ce2a..16a7dc7b67 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -522,15 +522,7 @@ pub async fn run() -> anyhow::Result<()> { maintenance_tasks.spawn(usage_metrics::task_main(metrics_config)); } - if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend - && let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api - && let Some(client) = redis_client - { - // project info cache and invalidation of that cache. - let cache = api.caches.project_info.clone(); - maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone())); - maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); - + if let Some(client) = redis_client { // Try to connect to Redis 3 times with 1 + (0..0.1) second interval. // This prevents immediate exit and pod restart, // which can cause hammering of the redis in case of connection issues. @@ -560,6 +552,16 @@ pub async fn run() -> anyhow::Result<()> { } } } + + #[allow(irrefutable_let_patterns)] + if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend + && let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api + { + // project info cache and invalidation of that cache. + let cache = api.caches.project_info.clone(); + maintenance_tasks.spawn(notifications::task_main(client, cache.clone())); + maintenance_tasks.spawn(async move { cache.gc_worker().await }); + } } let maintenance = loop { From a0a7733b5aa657553a5b91bb0a3d4f6e3847e38b Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Fri, 11 Jul 2025 10:57:50 -0500 Subject: [PATCH 04/14] Use relative paths in submodule URL references (#12559) This is a nifty trick from the hadron repo that seems to help with SSH key dance. 
Signed-off-by: Tristan Partin --- .gitmodules | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.gitmodules b/.gitmodules index d1330bf28c..e381fb079e 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,16 +1,16 @@ [submodule "vendor/postgres-v14"] path = vendor/postgres-v14 - url = https://github.com/neondatabase/postgres.git + url = ../postgres.git branch = REL_14_STABLE_neon [submodule "vendor/postgres-v15"] path = vendor/postgres-v15 - url = https://github.com/neondatabase/postgres.git + url = ../postgres.git branch = REL_15_STABLE_neon [submodule "vendor/postgres-v16"] path = vendor/postgres-v16 - url = https://github.com/neondatabase/postgres.git + url = ../postgres.git branch = REL_16_STABLE_neon [submodule "vendor/postgres-v17"] path = vendor/postgres-v17 - url = https://github.com/neondatabase/postgres.git + url = ../postgres.git branch = REL_17_STABLE_neon From 3300207523008ab3dd922780c4d164bd4376a007 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 11 Jul 2025 19:05:22 +0300 Subject: [PATCH 05/14] Update working set size estimate without lock (#12570) Update the WSS estimate before acquring the lock, so that we don't need to hold the lock for so long. That seems safe to me, see added comment. I was planning to do this with the new rust-based communicator implementation anyway, but it might help a little with the current C implementation too. And more importantly, having this as a separate PR gives us a chance to review this aspect independently. --- pgxn/neon/file_cache.c | 77 +++++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 0e316abd1d..2c87f139af 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -162,8 +162,34 @@ typedef struct FileCacheControl dlist_head lru; /* double linked list for LRU replacement * algorithm */ dlist_head holes; /* double linked list of punched holes */ - HyperLogLogState wss_estimation; /* estimation of working set size */ + ConditionVariable cv[N_COND_VARS]; /* turnstile of condition variables */ + + /* + * Estimation of working set size. + * + * This is not guarded by the lock. No locking is needed because all the + * writes to the "registers" are simple 64-bit stores, to update a + * timestamp. We assume that: + * + * - 64-bit stores are atomic. We could enforce that by using + * pg_atomic_uint64 instead of TimestampTz as the datatype in hll.h, but + * for now we just rely on it implicitly. + * + * - Even if they're not, and there is a race between two stores, it + * doesn't matter much which one wins because they're both updating the + * register with the current timestamp. Or you have a race between + * resetting the register and updating it, in which case it also doesn't + * matter much which one wins. + * + * - If they're not atomic, you might get an occasional "torn write" if + * you're really unlucky, but we tolerate that too. It just means that + * the estimate will be a little off, until the register is updated + * again. 
+ */ + HyperLogLogState wss_estimation; + + /* Prewarmer state */ PrewarmWorkerState prewarm_workers[MAX_PREWARM_WORKERS]; size_t n_prewarm_workers; size_t n_prewarm_entries; @@ -1144,6 +1170,13 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + /* Update working set size estimate for the blocks */ + for (int i = 0; i < nblocks; i++) + { + tag.blockNum = blkno + i; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } + /* * For every chunk that has blocks we're interested in, we * 1. get the chunk header @@ -1222,14 +1255,6 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL); - - /* Approximate working set for the blocks assumed in this entry */ - for (int i = 0; i < blocks_in_chunk; i++) - { - tag.blockNum = blkno + i; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); - } - if (entry == NULL) { /* Pages are not cached */ @@ -1506,9 +1531,15 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, return false; CopyNRelFileInfoToBufTag(tag, rinfo); + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); tag.forkNum = forknum; - CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + /* Update working set size estimate for the blocks */ + if (lfc_prewarm_update_ws_estimation) + { + tag.blockNum = blkno; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } tag.blockNum = blkno - chunk_offs; hash = get_hash_value(lfc_hash, &tag); @@ -1526,19 +1557,13 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, if (lwlsn > lsn) { - elog(DEBUG1, "Skip LFC write for %d because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X", + elog(DEBUG1, "Skip LFC write for %u because LwLSN=%X/%X is greater than not_nodified_since LSN %X/%X", blkno, LSN_FORMAT_ARGS(lwlsn), LSN_FORMAT_ARGS(lsn)); LWLockRelease(lfc_lock); return false; } entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); - - if (lfc_prewarm_update_ws_estimation) - { - tag.blockNum = blkno; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); - } if (found) { state = GET_STATE(entry, chunk_offs); @@ -1651,9 +1676,15 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, return; CopyNRelFileInfoToBufTag(tag, rinfo); + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); tag.forkNum = forkNum; - CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); + /* Update working set size estimate for the blocks */ + for (int i = 0; i < nblocks; i++) + { + tag.blockNum = blkno + i; + addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); + } LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -1694,14 +1725,6 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, cv = &lfc_ctl->cv[hash % N_COND_VARS]; entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found); - - /* Approximate working set for the blocks assumed in this entry */ - for (int i = 0; i < blocks_in_chunk; i++) - { - tag.blockNum = blkno + i; - addSHLL(&lfc_ctl->wss_estimation, hash_bytes((uint8_t const*)&tag, sizeof(tag))); - } - if (found) { /* @@ -2150,11 +2173,9 @@ lfc_approximate_working_set_size_seconds(time_t duration, bool reset) if (lfc_size_limit == 0) return -1; - 
LWLockAcquire(lfc_lock, LW_SHARED); dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration); if (reset) memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs); - LWLockRelease(lfc_lock); return dc; } From 379259bdd75edae91fad0d180fa513bff3e1f92b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Fri, 11 Jul 2025 19:07:14 +0200 Subject: [PATCH 06/14] storcon: don't error log on timeline delete if tenant migration is in progress (#12523) Fixes [LKB-61](https://databricks.atlassian.net/browse/LKB-61): `test_timeline_archival_chaos` being flaky with storcon error `Requested tenant is missing`. When a tenant migration is ongoing, and the attach request has been sent to the new location, but the attach hasn't finished yet, it is possible for the pageserver to return a 412 precondition failed HTTP error on timeline deletion, because it is being sent to the new location already. That one we would previously log via sth like: ``` ERROR request{method=DELETE path=/v1/tenant/1f544a11c90d1afd7af9b26e48985a4e/timeline/32818fb3ebf07cb7f06805429d7dee38 request_id=c493c04b-7f33-46d2-8a65-aac8a5516055}: Error processing HTTP request: InternalServerError(Error deleting timeline 32 818fb3ebf07cb7f06805429d7dee38 on 1f544a11c90d1afd7af9b26e48985a4e on node 2 (localhost): pageserver API: Precondition failed: Requested tenant is missing ``` This patch changes that and makes us return a more reasonable resource unavailable error. Not sure how scalable this is with tenants with a large number of shards, but that's a different discussion (we'd probably need a limited amount of per-storcon retries). example [link](https://neon-github-public-dev.s3.amazonaws.com/reports/pr-12398/15981821532/index.html#/testresult/e7785dfb1238d92f). --- storage_controller/src/service.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 31d149c5ac..0907907edc 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -5206,6 +5206,9 @@ impl Service { match res { Ok(ok) => Ok(ok), Err(mgmt_api::Error::ApiError(StatusCode::CONFLICT, _)) => Ok(StatusCode::CONFLICT), + Err(mgmt_api::Error::ApiError(StatusCode::PRECONDITION_FAILED, msg)) if msg.contains("Requested tenant is missing") => { + Err(ApiError::ResourceUnavailable("Tenant migration in progress".into())) + }, Err(mgmt_api::Error::ApiError(StatusCode::SERVICE_UNAVAILABLE, msg)) => Err(ApiError::ResourceUnavailable(msg.into())), Err(e) => { Err( From 63ca084696f4dd226bfea1abae66dcb3234d1051 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Fri, 11 Jul 2025 14:37:55 -0400 Subject: [PATCH 07/14] fix(pageserver): downgrade wal apply error during gc-compaction (#12518) ## Problem close LKB-162 close https://github.com/neondatabase/cloud/issues/30665, related to https://github.com/neondatabase/cloud/issues/29434 We see a lot of errors like: ``` 2025-05-22T23:06:14.928959Z ERROR compaction_loop{tenant_id=? shard_id=0304}:run:gc_compact_timeline{timeline_id=?}: error applying 4 WAL records 35/DC0DF0B8..3B/E43188C0 (8119 bytes) to key 000000067F0000400500006027000000B9D0, from base image with LSN 0/0 to reconstruct page image at LSN 61/150B9B20 n_attempts=0: apply_wal_records Caused by: 0: read walredo stdout 1: early eof ``` which is an acceptable form of error and we should downgrade it to warning. 
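Since `tracing` bakes the level into the macro name (`error!` vs `warn!`), the
downgrade below keeps the long message in one place and only switches which
macro is invoked. A minimal standalone sketch of that pattern (the names here
are illustrative, not the pageserver's):

```rust
use tracing::{error, warn};

macro_rules! redo_log {
    ($level:tt, $attempt:expr, $err:expr) => {
        $level!("error applying WAL records during {}: {:?}", $attempt, $err)
    };
}

fn report(attempt: &str, err: &str, is_gc_compaction: bool) {
    if is_gc_compaction {
        // gc-compaction may legitimately see an incomplete key history below
        // the gc horizon, so this is only a warning.
        redo_log!(warn, attempt, err);
    } else {
        redo_log!(error, attempt, err);
    }
}
```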
## Summary of changes walredo error during gc-compaction is expected when the data below the gc horizon does not contain a full key history. This is possible in some rare cases of gc that is only able to remove data in the middle of the history but not all earlier history when a full keyspace gets deleted. Signed-off-by: Alex Chi Z --- pageserver/src/walredo.rs | 46 +++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index c6d3cafe9a..f053c9ed37 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -147,6 +147,16 @@ pub enum RedoAttemptType { GcCompaction, } +impl std::fmt::Display for RedoAttemptType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RedoAttemptType::ReadPage => write!(f, "read page"), + RedoAttemptType::LegacyCompaction => write!(f, "legacy compaction"), + RedoAttemptType::GcCompaction => write!(f, "gc compaction"), + } + } +} + /// /// Public interface of WAL redo manager /// @@ -199,6 +209,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, max_retry_attempts, + redo_attempt_type, ) .await }; @@ -221,6 +232,7 @@ impl PostgresRedoManager { self.conf.wal_redo_timeout, pg_version, max_retry_attempts, + redo_attempt_type, ) .await } @@ -445,6 +457,7 @@ impl PostgresRedoManager { wal_redo_timeout: Duration, pg_version: PgMajorVersion, max_retry_attempts: u32, + redo_attempt_type: RedoAttemptType, ) -> Result { *(self.last_redo_at.lock().unwrap()) = Some(Instant::now()); @@ -485,17 +498,28 @@ impl PostgresRedoManager { ); if let Err(e) = result.as_ref() { - error!( - "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}", - records.len(), - records.first().map(|p| p.0).unwrap_or(Lsn(0)), - records.last().map(|p| p.0).unwrap_or(Lsn(0)), - nbytes, - base_img_lsn, - lsn, - n_attempts, - e, - ); + macro_rules! message { + ($level:tt) => { + $level!( + "error applying {} WAL records {}..{} ({} bytes) to key {} during {}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}", + records.len(), + records.first().map(|p| p.0).unwrap_or(Lsn(0)), + records.last().map(|p| p.0).unwrap_or(Lsn(0)), + nbytes, + key, + redo_attempt_type, + base_img_lsn, + lsn, + n_attempts, + e, + ) + } + } + match redo_attempt_type { + RedoAttemptType::ReadPage => message!(error), + RedoAttemptType::LegacyCompaction => message!(error), + RedoAttemptType::GcCompaction => message!(warn), + } } result.map_err(Error::Other) From 4566b12a22876f1110b77da9e7b75615c9963b38 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Fri, 11 Jul 2025 20:56:39 +0200 Subject: [PATCH 08/14] NEON: Finish Zenith->Neon rename (#12566) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Even though we're now part of Databricks, let's at least make this part consistent. 
## Summary of changes - PG14: https://github.com/neondatabase/postgres/pull/669 - PG15: https://github.com/neondatabase/postgres/pull/670 - PG16: https://github.com/neondatabase/postgres/pull/671 - PG17: https://github.com/neondatabase/postgres/pull/672 --------- Co-authored-by: Arpad Müller --- compute_tools/src/compute.rs | 23 +++++++++++++ control_plane/src/endpoint.rs | 3 +- docs/core_changes.md | 7 ++-- pageserver/src/basebackup.rs | 33 +++++++++++-------- pageserver/src/import_datadir.rs | 14 ++++---- pgxn/neon_test_utils/neontest.c | 10 +++--- pgxn/typedefs.list | 22 ++++++------- test_runner/fixtures/neon_fixtures.py | 1 + .../regress/test_timeline_detach_ancestor.py | 8 ++--- vendor/postgres-v14 | 2 +- vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- vendor/postgres-v17 | 2 +- vendor/revisions.json | 8 ++--- 14 files changed, 84 insertions(+), 53 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c05cc229a2..2e0b7d7b2e 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1040,6 +1040,8 @@ impl ComputeNode { PageserverProtocol::Grpc => self.try_get_basebackup_grpc(spec, lsn)?, }; + self.fix_zenith_signal_neon_signal()?; + let mut state = self.state.lock().unwrap(); state.metrics.pageserver_connect_micros = connected.duration_since(started).as_micros() as u64; @@ -1049,6 +1051,27 @@ impl ComputeNode { Ok(()) } + /// Move the Zenith signal file to Neon signal file location. + /// This makes Compute compatible with older PageServers that don't yet + /// know about the Zenith->Neon rename. + fn fix_zenith_signal_neon_signal(&self) -> Result<()> { + let datadir = Path::new(&self.params.pgdata); + + let neonsig = datadir.join("neon.signal"); + + if neonsig.is_file() { + return Ok(()); + } + + let zenithsig = datadir.join("zenith.signal"); + + if zenithsig.is_file() { + fs::copy(zenithsig, neonsig)?; + } + + Ok(()) + } + /// Fetches a basebackup via gRPC. The connstring must use grpc://. Returns the timestamp when /// the connection was established, and the (compressed) size of the basebackup. fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<(Instant, usize)> { diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index ad2067e0f2..91a62b0ca4 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -32,7 +32,8 @@ //! config.json - passed to `compute_ctl` //! pgdata/ //! postgresql.conf - copy of postgresql.conf created by `compute_ctl` -//! zenith.signal +//! neon.signal +//! zenith.signal - copy of neon.signal, for backward compatibility //! //! ``` //! diff --git a/docs/core_changes.md b/docs/core_changes.md index 1388317728..abfd20af26 100644 --- a/docs/core_changes.md +++ b/docs/core_changes.md @@ -129,9 +129,10 @@ segment to bootstrap the WAL writing, but it doesn't contain the checkpoint reco changes in xlog.c, to allow starting the compute node without reading the last checkpoint record from WAL. -This includes code to read the `zenith.signal` file, which tells the startup code the LSN to start -at. When the `zenith.signal` file is present, the startup uses that LSN instead of the last -checkpoint's LSN. The system is known to be consistent at that LSN, without any WAL redo. +This includes code to read the `neon.signal` (also `zenith.signal`) file, which tells the startup +code the LSN to start at. When the `neon.signal` file is present, the startup uses that LSN +instead of the last checkpoint's LSN. 
The system is known to be consistent at that LSN, without +any WAL redo. ### How to get rid of the patch diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 36dada1e89..1a44c80e2d 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -114,7 +114,7 @@ where // Compute postgres doesn't have any previous WAL files, but the first // record that it's going to write needs to include the LSN of the // previous record (xl_prev). We include prev_record_lsn in the - // "zenith.signal" file, so that postgres can read it during startup. + // "neon.signal" file, so that postgres can read it during startup. // // We don't keep full history of record boundaries in the page server, // however, only the predecessor of the latest record on each @@ -751,34 +751,39 @@ where // // Add generated pg_control file and bootstrap WAL segment. - // Also send zenith.signal file with extra bootstrap data. + // Also send neon.signal and zenith.signal file with extra bootstrap data. // async fn add_pgcontrol_file( &mut self, pg_control_bytes: Bytes, system_identifier: u64, ) -> Result<(), BasebackupError> { - // add zenith.signal file - let mut zenith_signal = String::new(); + // add neon.signal file + let mut neon_signal = String::new(); if self.prev_record_lsn == Lsn(0) { if self.timeline.is_ancestor_lsn(self.lsn) { - write!(zenith_signal, "PREV LSN: none") + write!(neon_signal, "PREV LSN: none") .map_err(|e| BasebackupError::Server(e.into()))?; } else { - write!(zenith_signal, "PREV LSN: invalid") + write!(neon_signal, "PREV LSN: invalid") .map_err(|e| BasebackupError::Server(e.into()))?; } } else { - write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn) + write!(neon_signal, "PREV LSN: {}", self.prev_record_lsn) .map_err(|e| BasebackupError::Server(e.into()))?; } - self.ar - .append( - &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, - zenith_signal.as_bytes(), - ) - .await - .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?; + + // TODO: Remove zenith.signal once all historical computes have been replaced + // ... and thus support the neon.signal file. + for signalfilename in ["neon.signal", "zenith.signal"] { + self.ar + .append( + &new_tar_header(signalfilename, neon_signal.len() as u64)?, + neon_signal.as_bytes(), + ) + .await + .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,neon.signal"))?; + } //send pg_control let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 96fe0c1078..409cc2e3c5 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -610,13 +610,13 @@ async fn import_file( debug!("imported twophase file"); } else if file_path.starts_with("pg_wal") { debug!("found wal file in base section. ignore it"); - } else if file_path.starts_with("zenith.signal") { + } else if file_path.starts_with("zenith.signal") || file_path.starts_with("neon.signal") { // Parse zenith signal file to set correct previous LSN let bytes = read_all_bytes(reader).await?; - // zenith.signal format is "PREV LSN: prev_lsn" + // neon.signal format is "PREV LSN: prev_lsn" // TODO write serialization and deserialization in the same place. 
- let zenith_signal = std::str::from_utf8(&bytes)?.trim(); - let prev_lsn = match zenith_signal { + let neon_signal = std::str::from_utf8(&bytes)?.trim(); + let prev_lsn = match neon_signal { "PREV LSN: none" => Lsn(0), "PREV LSN: invalid" => Lsn(0), other => { @@ -624,17 +624,17 @@ async fn import_file( split[1] .trim() .parse::() - .context("can't parse zenith.signal")? + .context("can't parse neon.signal")? } }; - // zenith.signal is not necessarily the last file, that we handle + // neon.signal is not necessarily the last file, that we handle // but it is ok to call `finish_write()`, because final `modification.commit()` // will update lsn once more to the final one. let writer = modification.tline.writer().await; writer.finish_write(prev_lsn); - debug!("imported zenith signal {}", prev_lsn); + debug!("imported neon signal {}", prev_lsn); } else if file_path.starts_with("pg_tblspc") { // TODO Backups exported from neon won't have pg_tblspc, but we will need // this to import arbitrary postgres databases. diff --git a/pgxn/neon_test_utils/neontest.c b/pgxn/neon_test_utils/neontest.c index d37412f674..5f880dfd23 100644 --- a/pgxn/neon_test_utils/neontest.c +++ b/pgxn/neon_test_utils/neontest.c @@ -236,13 +236,13 @@ clear_buffer_cache(PG_FUNCTION_ARGS) bool save_neon_test_evict; /* - * Temporarily set the zenith_test_evict GUC, so that when we pin and + * Temporarily set the neon_test_evict GUC, so that when we pin and * unpin a buffer, the buffer is evicted. We use that hack to evict all * buffers, as there is no explicit "evict this buffer" function in the * buffer manager. */ - save_neon_test_evict = zenith_test_evict; - zenith_test_evict = true; + save_neon_test_evict = neon_test_evict; + neon_test_evict = true; PG_TRY(); { /* Scan through all the buffers */ @@ -273,7 +273,7 @@ clear_buffer_cache(PG_FUNCTION_ARGS) /* * Pin the buffer, and release it again. Because we have - * zenith_test_evict==true, this will evict the page from the + * neon_test_evict==true, this will evict the page from the * buffer cache if no one else is holding a pin on it. 
*/ if (isvalid) @@ -286,7 +286,7 @@ clear_buffer_cache(PG_FUNCTION_ARGS) PG_FINALLY(); { /* restore the GUC */ - zenith_test_evict = save_neon_test_evict; + neon_test_evict = save_neon_test_evict; } PG_END_TRY(); diff --git a/pgxn/typedefs.list b/pgxn/typedefs.list index 760f384212..3ea8b3b091 100644 --- a/pgxn/typedefs.list +++ b/pgxn/typedefs.list @@ -2953,17 +2953,17 @@ XmlTableBuilderData YYLTYPE YYSTYPE YY_BUFFER_STATE -ZenithErrorResponse -ZenithExistsRequest -ZenithExistsResponse -ZenithGetPageRequest -ZenithGetPageResponse -ZenithMessage -ZenithMessageTag -ZenithNblocksRequest -ZenithNblocksResponse -ZenithRequest -ZenithResponse +NeonErrorResponse +NeonExistsRequest +NeonExistsResponse +NeonGetPageRequest +NeonGetPageResponse +NeonMessage +NeonMessageTag +NeonNblocksRequest +NeonNblocksResponse +NeonRequest +NeonResponse _SPI_connection _SPI_plan __AssignProcessToJobObject diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a7b7f0e74d..b9fff05c6c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -5409,6 +5409,7 @@ SKIP_FILES = frozenset( ( "pg_internal.init", "pg.log", + "neon.signal", "zenith.signal", "pg_hba.conf", "postgresql.conf", diff --git a/test_runner/regress/test_timeline_detach_ancestor.py b/test_runner/regress/test_timeline_detach_ancestor.py index c0f163db32..45b7af719e 100644 --- a/test_runner/regress/test_timeline_detach_ancestor.py +++ b/test_runner/regress/test_timeline_detach_ancestor.py @@ -209,9 +209,9 @@ def test_ancestor_detach_branched_from( client.timeline_delete(env.initial_tenant, env.initial_timeline) wait_timeline_detail_404(client, env.initial_tenant, env.initial_timeline) - # because we do the fullbackup from ancestor at the branch_lsn, the zenith.signal is always different - # as there is always "PREV_LSN: invalid" for "before" - skip_files = {"zenith.signal"} + # because we do the fullbackup from ancestor at the branch_lsn, the neon.signal and/or zenith.signal is always + # different as there is always "PREV_LSN: invalid" for "before" + skip_files = {"zenith.signal", "neon.signal"} assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, skip_files) @@ -767,7 +767,7 @@ def test_compaction_induced_by_detaches_in_history( env.pageserver, env.initial_tenant, branch_timeline_id, branch_lsn, fullbackup_after ) - # we don't need to skip any files, because zenith.signal will be identical + # we don't need to skip any files, because neon.signal will be identical assert_pageserver_backups_equal(fullbackup_before, fullbackup_after, set()) diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 9085654ee8..8ce1f52303 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 9085654ee8022d5cc4ca719380a1dc53e5e3246f +Subproject commit 8ce1f52303aec29e098309347b57c01a1962e221 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 8c3249f36c..afd46987f3 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 8c3249f36c7df6ac0efb8ee9f1baf4aa1b83e5c9 +Subproject commit afd46987f3da50c9146a8aa59380052df0862c06 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 7a4c0eacae..e08c8d5f15 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 7a4c0eacaeb9b97416542fa19103061c166460b1 +Subproject commit e08c8d5f1576ca0487d14d154510499c5f12adfb diff --git a/vendor/postgres-v17 b/vendor/postgres-v17 index db424d42d7..353c725b0c 160000 --- 
a/vendor/postgres-v17 +++ b/vendor/postgres-v17 @@ -1 +1 @@ -Subproject commit db424d42d748f8ad91ac00e28db2c7f2efa42f7f +Subproject commit 353c725b0c76cc82b15af21d8360d03391dc6814 diff --git a/vendor/revisions.json b/vendor/revisions.json index b260698c86..992aa405b1 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,18 +1,18 @@ { "v17": [ "17.5", - "db424d42d748f8ad91ac00e28db2c7f2efa42f7f" + "353c725b0c76cc82b15af21d8360d03391dc6814" ], "v16": [ "16.9", - "7a4c0eacaeb9b97416542fa19103061c166460b1" + "e08c8d5f1576ca0487d14d154510499c5f12adfb" ], "v15": [ "15.13", - "8c3249f36c7df6ac0efb8ee9f1baf4aa1b83e5c9" + "afd46987f3da50c9146a8aa59380052df0862c06" ], "v14": [ "14.18", - "9085654ee8022d5cc4ca719380a1dc53e5e3246f" + "8ce1f52303aec29e098309347b57c01a1962e221" ] } From cb991fba421999e390c9debfc39fb39a636fe1e9 Mon Sep 17 00:00:00 2001 From: HaoyuHuang Date: Fri, 11 Jul 2025 12:27:55 -0700 Subject: [PATCH 09/14] A few more PS changes (#12552) # TLDR Problem-I is a bug fix. The rest are no-ops. ## Problem I Page server checks image layer creation based on the elapsed time but this check depends on the current logical size, which is only computed on shard 0. Thus, for non-0 shards, the check will be ineffective and image creation will never be done for idle tenants. ## Summary of changes I This PR fixes the problem by simply removing the dependency on current logical size. ## Summary of changes II This PR adds a timeout when calling page server to split shard to make sure SC does not wait for the API call forever. Currently the PR doesn't adds any retry logic because it's not clear whether page server shard split can be safely retried if the existing operation is still ongoing or left the storage in a bad state. Thus it's better to abort the whole operation and restart. ## Problem III `test_remote_failures` requires PS to be compiled in the testing mode. For PS in dev/staging, they are compiled without this mode. ## Summary of changes III Remove the restriction and also increase the number of total failures allowed. ## Summary of changes IV remove test on PS getpage http route. 
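For the shard-split timeout in particular (Summary of changes II), the idea is
to give that one call a dedicated HTTP client with a hard request timeout
instead of waiting indefinitely. Roughly (a sketch only; the real client also
installs the configured CA certificates and auth token):

```rust
use std::time::Duration;

fn shard_split_client(timeout: Duration) -> reqwest::Result<reqwest::Client> {
    reqwest::ClientBuilder::new()
        // don't keep idle connections around for this one-off call
        .pool_max_idle_per_host(0)
        // abort the split request instead of waiting forever
        .timeout(timeout)
        .build()
}
```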
--------- Co-authored-by: Chen Luo Co-authored-by: Yecheng Yang Co-authored-by: Vlad Lazar --- control_plane/src/local_env.rs | 4 + control_plane/src/storage_controller.rs | 7 ++ libs/remote_storage/src/simulate_failures.rs | 1 + libs/utils/src/env.rs | 3 +- pageserver/src/bin/pageserver.rs | 5 -- pageserver/src/http/routes.rs | 2 +- pageserver/src/tenant/mgr.rs | 2 + pageserver/src/tenant/timeline.rs | 48 ++++++----- storage_controller/src/main.rs | 7 ++ storage_controller/src/service.rs | 27 ++++++- test_runner/regress/test_compaction.py | 62 ++++++++++++++ test_runner/regress/test_sharding.py | 85 ++++++++++++++++++++ 12 files changed, 226 insertions(+), 27 deletions(-) diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index d0611113e8..d34dd39f61 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -217,6 +217,9 @@ pub struct NeonStorageControllerConf { pub posthog_config: Option, pub kick_secondary_downloads: Option, + + #[serde(with = "humantime_serde")] + pub shard_split_request_timeout: Option, } impl NeonStorageControllerConf { @@ -250,6 +253,7 @@ impl Default for NeonStorageControllerConf { timeline_safekeeper_count: None, posthog_config: None, kick_secondary_downloads: None, + shard_split_request_timeout: None, } } } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index dc6c82f504..f996f39967 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -648,6 +648,13 @@ impl StorageController { args.push(format!("--timeline-safekeeper-count={sk_cnt}")); } + if let Some(duration) = self.config.shard_split_request_timeout { + args.push(format!( + "--shard-split-request-timeout={}", + humantime::Duration::from(duration) + )); + } + let mut envs = vec![ ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 30d116f57c..e895380192 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -31,6 +31,7 @@ pub struct UnreliableWrapper { /* BEGIN_HADRON */ // This the probability of failure for each operation, ranged from [0, 100]. // The probability is default to 100, which means that all operations will fail. + // Storage will fail by probability up to attempts_to_fail times. attempt_failure_probability: u64, /* END_HADRON */ } diff --git a/libs/utils/src/env.rs b/libs/utils/src/env.rs index cc1cbf8009..0b3b5e6c4f 100644 --- a/libs/utils/src/env.rs +++ b/libs/utils/src/env.rs @@ -47,6 +47,7 @@ where /* BEGIN_HADRON */ pub enum DeploymentMode { + Local, Dev, Staging, Prod, @@ -64,7 +65,7 @@ pub fn get_deployment_mode() -> Option { } }, Err(_) => { - tracing::error!("DEPLOYMENT_MODE not set"); + // tracing::error!("DEPLOYMENT_MODE not set"); None } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 299fe7e159..dfb8b437c3 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -917,11 +917,6 @@ async fn create_remote_storage_client( // If `test_remote_failures` is non-zero, wrap the client with a // wrapper that simulates failures. 
if conf.test_remote_failures > 0 { - if !cfg!(feature = "testing") { - anyhow::bail!( - "test_remote_failures option is not available because pageserver was compiled without the 'testing' feature" - ); - } info!( "Simulating remote failures for first {} attempts of each op", conf.test_remote_failures diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index d839bac557..0d40c5ecf7 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -4183,7 +4183,7 @@ pub fn make_router( }) .get( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage", - |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler), + |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler), ) .get( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/touchpage", diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 15853d3614..52f67abde5 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1678,6 +1678,8 @@ impl TenantManager { // Phase 6: Release the InProgress on the parent shard drop(parent_slot_guard); + utils::pausable_failpoint!("shard-split-post-finish-pause"); + Ok(child_shards) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 718ea925b7..fe622713e9 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5604,10 +5604,11 @@ impl Timeline { /// Predicate function which indicates whether we should check if new image layers /// are required. Since checking if new image layers are required is expensive in /// terms of CPU, we only do it in the following cases: - /// 1. If the timeline has ingested sufficient WAL to justify the cost + /// 1. If the timeline has ingested sufficient WAL to justify the cost or ... /// 2. If enough time has passed since the last check: /// 1. For large tenants, we wish to perform the check more often since they - /// suffer from the lack of image layers + /// suffer from the lack of image layers. Note that we assume sharded tenants + /// to be large since non-zero shards do not track the logical size. /// 2. 
For small tenants (that can mostly fit in RAM), we use a much longer interval fn should_check_if_image_layers_required(self: &Arc, lsn: Lsn) -> bool { let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold; @@ -5621,30 +5622,39 @@ impl Timeline { let distance_based_decision = distance.0 >= min_distance; - let mut time_based_decision = false; let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap(); - if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() { - let check_required_after = - if Some(Into::::into(&logical_size)) >= large_timeline_threshold { - self.get_checkpoint_timeout() - } else { - Duration::from_secs(3600 * 48) - }; - - time_based_decision = match *last_check_instant { - Some(last_check) => { - let elapsed = last_check.elapsed(); - elapsed >= check_required_after + let check_required_after = (|| { + if self.shard_identity.is_unsharded() { + if let CurrentLogicalSize::Exact(logical_size) = + self.current_logical_size.current_size() + { + if Some(Into::::into(&logical_size)) < large_timeline_threshold { + return Duration::from_secs(3600 * 48); + } } - None => true, - }; - } + } + + self.get_checkpoint_timeout() + })(); + + let time_based_decision = match *last_check_instant { + Some(last_check) => { + let elapsed = last_check.elapsed(); + elapsed >= check_required_after + } + None => true, + }; // Do the expensive delta layer counting only if this timeline has ingested sufficient // WAL since the last check or a checkpoint timeout interval has elapsed since the last // check. let decision = distance_based_decision || time_based_decision; - + tracing::info!( + "Decided to check image layers: {}. Distance-based decision: {}, time-based decision: {}", + decision, + distance_based_decision, + time_based_decision + ); if decision { self.last_image_layer_creation_check_at.store(lsn); *last_check_instant = Some(Instant::now()); diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 2a851dc25b..5d21feeb10 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -222,6 +222,9 @@ struct Cli { /// Primarily useful for testing to reduce test execution time. #[arg(long, default_value = "false", action=ArgAction::Set)] kick_secondary_downloads: bool, + + #[arg(long)] + shard_split_request_timeout: Option, } enum StrictMode { @@ -470,6 +473,10 @@ async fn async_main() -> anyhow::Result<()> { timeline_safekeeper_count: args.timeline_safekeeper_count, posthog_config: posthog_config.clone(), kick_secondary_downloads: args.kick_secondary_downloads, + shard_split_request_timeout: args + .shard_split_request_timeout + .map(humantime::Duration::into) + .unwrap_or(Duration::MAX), }; // Validate that we can connect to the database diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 0907907edc..638cb410fa 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -60,6 +60,7 @@ use tokio::sync::mpsc::error::TrySendError; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use utils::completion::Barrier; +use utils::env; use utils::generation::Generation; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; @@ -483,6 +484,9 @@ pub struct Config { /// When set, actively checks and initiates heatmap downloads/uploads. 
pub kick_secondary_downloads: bool, + + /// Timeout used for HTTP client of split requests. [`Duration::MAX`] if None. + pub shard_split_request_timeout: Duration, } impl From for ApiError { @@ -6406,18 +6410,39 @@ impl Service { // TODO: issue split calls concurrently (this only matters once we're splitting // N>1 shards into M shards -- initially we're usually splitting 1 shard into N). + // HADRON: set a timeout for splitting individual shards on page servers. + // Currently we do not perform any retry because it's not clear if page server can handle + // partially split shards correctly. + let shard_split_timeout = + if let Some(env::DeploymentMode::Local) = env::get_deployment_mode() { + Duration::from_secs(30) + } else { + self.config.shard_split_request_timeout + }; + let mut http_client_builder = reqwest::ClientBuilder::new() + .pool_max_idle_per_host(0) + .timeout(shard_split_timeout); + + for ssl_ca_cert in &self.config.ssl_ca_certs { + http_client_builder = http_client_builder.add_root_certificate(ssl_ca_cert.clone()); + } + let http_client = http_client_builder + .build() + .expect("Failed to construct HTTP client"); for target in &targets { let ShardSplitTarget { parent_id, node, child_ids, } = target; + let client = PageserverClient::new( node.get_id(), - self.http_client.clone(), + http_client.clone(), node.base_url(), self.config.pageserver_jwt_token.as_deref(), ); + let response = client .tenant_shard_split( *parent_id, diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index ab02314288..963a19d640 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -7,6 +7,7 @@ import time from enum import StrEnum import pytest +from fixtures.common_types import TenantShardId from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnvBuilder, @@ -960,6 +961,67 @@ def get_layer_map(env, tenant_shard_id, timeline_id, ps_id): return image_layer_count, delta_layer_count +def test_image_layer_creation_time_threshold(neon_env_builder: NeonEnvBuilder): + """ + Tests that image layers can be created when the time threshold is reached on non-0 shards. + """ + tenant_conf = { + "compaction_threshold": "100", + "image_creation_threshold": "100", + "image_layer_creation_check_threshold": "1", + # disable distance based image layer creation check + "checkpoint_distance": 10 * 1024 * 1024 * 1024, + "checkpoint_timeout": "100ms", + "image_layer_force_creation_period": "1s", + "pitr_interval": "10s", + "gc_period": "1s", + "compaction_period": "1s", + "lsn_lease_length": "1s", + } + + # consider every tenant large to run the image layer generation check more eagerly + neon_env_builder.pageserver_config_override = ( + "image_layer_generation_large_timeline_threshold=0" + ) + + neon_env_builder.num_pageservers = 1 + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start( + initial_tenant_conf=tenant_conf, + initial_tenant_shard_count=2, + initial_tenant_shard_stripe_size=1, + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + endpoint = env.endpoints.create_start("main") + endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)") + + for v in range(10): + endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))") + + tenant_shard_id = TenantShardId(tenant_id, 1, 2) + + # Generate some rows. 
+ for v in range(20): + endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))") + + # restart page server so that logical size on non-0 shards is missing + env.pageserver.restart() + + (old_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0) + log.info(f"old images: {old_images}, old deltas: {old_deltas}") + + def check_image_creation(): + (new_images, old_deltas) = get_layer_map(env, tenant_shard_id, timeline_id, 0) + log.info(f"images: {new_images}, deltas: {old_deltas}") + assert new_images > old_images + + wait_until(check_image_creation) + + endpoint.stop_and_destroy() + + def test_image_layer_force_creation_period(neon_env_builder: NeonEnvBuilder): """ Tests that page server can force creating new images if image_layer_force_creation_period is enabled diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 8ff767eca4..5549105188 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -1673,6 +1673,91 @@ def test_shard_resolve_during_split_abort(neon_env_builder: NeonEnvBuilder): # END_HADRON +# HADRON +@pytest.mark.skip(reason="The backpressure change has not been merged yet.") +def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder): + """ + Tests back pressure knobs are enforced on the per shard basis instead of at the tenant level. + """ + init_shard_count = 4 + neon_env_builder.num_pageservers = init_shard_count + stripe_size = 1 + + env = neon_env_builder.init_start( + initial_tenant_shard_count=init_shard_count, + initial_tenant_shard_stripe_size=stripe_size, + initial_tenant_conf={ + # disable auto-flush of shards and set max_replication_flush_lag as 15MB. + # The backpressure parameters must be enforced at the shard level to avoid stalling PG. + "checkpoint_distance": 1 * 1024 * 1024 * 1024, + "checkpoint_timeout": "1h", + }, + ) + + endpoint = env.endpoints.create( + "main", + config_lines=[ + "max_replication_write_lag = 0", + "max_replication_apply_lag = 0", + "max_replication_flush_lag = 15MB", + "neon.max_cluster_size = 10GB", + ], + ) + endpoint.respec(skip_pg_catalog_updates=False) # Needed for databricks_system to get created. 
+ endpoint.start() + + # generate 20MB of data + endpoint.safe_psql( + "CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;" + ) + res = endpoint.safe_psql( + "SELECT neon.backpressure_throttling_time() as throttling_time", dbname="databricks_system" + )[0] + assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}" + + endpoint.stop() + + +# HADRON +def test_shard_split_page_server_timeout(neon_env_builder: NeonEnvBuilder): + """ + Tests that shard split can correctly handle page server timeouts and abort the split + """ + init_shard_count = 2 + neon_env_builder.num_pageservers = 1 + stripe_size = 1 + + if neon_env_builder.storage_controller_config is None: + neon_env_builder.storage_controller_config = {"shard_split_request_timeout": "5s"} + else: + neon_env_builder.storage_controller_config["shard_split_request_timeout"] = "5s" + + env = neon_env_builder.init_start( + initial_tenant_shard_count=init_shard_count, + initial_tenant_shard_stripe_size=stripe_size, + ) + + env.storage_controller.allowed_errors.extend( + [ + ".*Enqueuing background abort.*", + ".*failpoint.*", + ".*Failed to abort.*", + ".*Exclusive lock by ShardSplit was held.*", + ] + ) + env.pageserver.allowed_errors.extend([".*request was dropped before completing.*"]) + + endpoint1 = env.endpoints.create_start(branch_name="main") + + env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "pause")) + + with pytest.raises(StorageControllerApiException): + env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=4) + + env.pageserver.http_client().configure_failpoints(("shard-split-post-finish-pause", "off")) + endpoint1.stop_and_destroy() + + def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder): """ Check a scenario when one of the shards is much slower than others. From 380d167b7ca2c8312fafffef30ae8cbdea7fd8a0 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Fri, 11 Jul 2025 21:35:42 +0200 Subject: [PATCH 10/14] proxy: For cancellation data replace HSET+EXPIRE/HGET with SET..EX/GET (#12553) ## Problem To store cancellation data we send two commands to redis because the redis server version doesn't support HSET with EX. Also, HSET is not really needed. ## Summary of changes * Replace the HSET + EXPIRE command pair with one SET .. EX command. * Replace HGET with GET. * Leave a workaround for old keys set with HSET. * Replace some anyhow errors with specific errors to surface the WRONGTYPE error from redis. 
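In terms of raw commands this turns the two-step `HSET key data value` +
`EXPIRE key seconds` into a single `SET key value EX seconds`. A small sketch
of both shapes using the `redis` crate types already used by proxy (key/value
here are placeholders):

```rust
use std::time::Duration;
use redis::{Cmd, SetExpiry, SetOptions};

// old: two commands per stored cancel key
fn store_old(key: &str, value: &str, expire: Duration) -> (Cmd, Cmd) {
    (
        Cmd::hset(key, "data", value),
        Cmd::expire(key, expire.as_secs() as i64),
    )
}

// new: one command, value and TTL set atomically
fn store_new(key: &str, value: &str, expire: Duration) -> Cmd {
    Cmd::set_options(
        key,
        value,
        SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())),
    )
}
```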
--- Cargo.lock | 1 + proxy/Cargo.toml | 3 +- proxy/src/batch.rs | 68 +++++++---- proxy/src/cancellation.rs | 111 ++++++++++++------ proxy/src/metrics.rs | 6 +- .../connection_with_credentials_provider.rs | 24 +++- proxy/src/redis/elasticache.rs | 20 +++- proxy/src/redis/kv_ops.rs | 16 ++- 8 files changed, 175 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 025f4e4116..4323254f0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5289,6 +5289,7 @@ dependencies = [ "async-trait", "atomic-take", "aws-config", + "aws-credential-types", "aws-sdk-iam", "aws-sigv4", "base64 0.22.1", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index ce8610be24..0a406d1ca8 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -16,6 +16,7 @@ async-compression.workspace = true async-trait.workspace = true atomic-take.workspace = true aws-config.workspace = true +aws-credential-types.workspace = true aws-sdk-iam.workspace = true aws-sigv4.workspace = true base64.workspace = true @@ -127,4 +128,4 @@ rstest.workspace = true walkdir.workspace = true rand_distr = "0.4" tokio-postgres.workspace = true -tracing-test = "0.2" \ No newline at end of file +tracing-test = "0.2" diff --git a/proxy/src/batch.rs b/proxy/src/batch.rs index 33e08797f2..cf866ab9a3 100644 --- a/proxy/src/batch.rs +++ b/proxy/src/batch.rs @@ -7,13 +7,17 @@ use std::pin::pin; use std::sync::Mutex; use scopeguard::ScopeGuard; +use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; use crate::ext::LockExt; +type ProcResult

= Result<

::Res,

::Err>; + pub trait QueueProcessing: Send + 'static { type Req: Send + 'static; type Res: Send; + type Err: Send + Clone; /// Get the desired batch size. fn batch_size(&self, queue_size: usize) -> usize; @@ -24,7 +28,18 @@ pub trait QueueProcessing: Send + 'static { /// If this apply can error, it's expected that errors be forwarded to each Self::Res. /// /// Batching does not need to happen atomically. - fn apply(&mut self, req: Vec) -> impl Future> + Send; + fn apply( + &mut self, + req: Vec, + ) -> impl Future, Self::Err>> + Send; +} + +#[derive(thiserror::Error)] +pub enum BatchQueueError { + #[error(transparent)] + Result(E), + #[error(transparent)] + Cancelled(C), } pub struct BatchQueue { @@ -34,7 +49,7 @@ pub struct BatchQueue { struct BatchJob { req: P::Req, - res: tokio::sync::oneshot::Sender, + res: tokio::sync::oneshot::Sender>, } impl BatchQueue
<P>
{ @@ -55,11 +70,11 @@ impl BatchQueue
<P>
{ &self, req: P::Req, cancelled: impl Future, - ) -> Result { + ) -> Result> { let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req); let mut cancelled = pin!(cancelled); - let resp = loop { + let resp: Option> = loop { // try become the leader, or try wait for success. let mut processor = tokio::select! { // try become leader. @@ -72,7 +87,7 @@ impl BatchQueue
<P>
{ if inner.queue.remove(&id).is_some() { tracing::warn!("batched task cancelled before completion"); } - return Err(cancel); + return Err(BatchQueueError::Cancelled(cancel)); }, }; @@ -96,18 +111,30 @@ impl BatchQueue
<P>
{ // good: we didn't get cancelled. ScopeGuard::into_inner(cancel_safety); - if values.len() != resps.len() { - tracing::error!( - "batch: invalid response size, expected={}, got={}", - resps.len(), - values.len() - ); - } + match values { + Ok(values) => { + if values.len() != resps.len() { + tracing::error!( + "batch: invalid response size, expected={}, got={}", + resps.len(), + values.len() + ); + } - // send response values. - for (tx, value) in std::iter::zip(resps, values) { - if tx.send(value).is_err() { - // receiver hung up but that's fine. + // send response values. + for (tx, value) in std::iter::zip(resps, values) { + if tx.send(Ok(value)).is_err() { + // receiver hung up but that's fine. + } + } + } + + Err(err) => { + for tx in resps { + if tx.send(Err(err.clone())).is_err() { + // receiver hung up but that's fine. + } + } } } @@ -129,7 +156,8 @@ impl BatchQueue
<P>
{ tracing::debug!(id, "batch: job completed"); - Ok(resp.expect("no response found. batch processer should not panic")) + resp.expect("no response found. batch processer should not panic") + .map_err(BatchQueueError::Result) } } @@ -139,8 +167,8 @@ struct BatchQueueInner { } impl BatchQueueInner
<P>
{ - fn register_job(&mut self, req: P::Req) -> (u64, tokio::sync::oneshot::Receiver) { - let (tx, rx) = tokio::sync::oneshot::channel(); + fn register_job(&mut self, req: P::Req) -> (u64, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); let id = self.version; @@ -158,7 +186,7 @@ impl BatchQueueInner
<P>
{ (id, rx) } - fn get_batch(&mut self, p: &P) -> (Vec, Vec>) { + fn get_batch(&mut self, p: &P) -> (Vec, Vec>>) { let batch_size = p.batch_size(self.queue.len()); let mut reqs = Vec::with_capacity(batch_size); let mut resps = Vec::with_capacity(batch_size); diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 74413f1a7d..4ea4c4ea54 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -4,12 +4,11 @@ use std::pin::pin; use std::sync::{Arc, OnceLock}; use std::time::Duration; -use anyhow::anyhow; use futures::FutureExt; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use postgres_client::RawCancelToken; use postgres_client::tls::MakeTlsConnect; -use redis::{Cmd, FromRedisValue, Value}; +use redis::{Cmd, FromRedisValue, SetExpiry, SetOptions, Value}; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::net::TcpStream; @@ -18,7 +17,7 @@ use tracing::{debug, error, info}; use crate::auth::AuthError; use crate::auth::backend::ComputeUserInfo; -use crate::batch::{BatchQueue, QueueProcessing}; +use crate::batch::{BatchQueue, BatchQueueError, QueueProcessing}; use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::control_plane::ControlPlaneApi; @@ -28,7 +27,7 @@ use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, Redis use crate::pqproto::CancelKeyData; use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::keys::KeyPrefix; -use crate::redis::kv_ops::RedisKVClient; +use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError}; type IpSubnetKey = IpNet; @@ -45,6 +44,17 @@ pub enum CancelKeyOp { GetCancelData { key: CancelKeyData, }, + GetCancelDataOld { + key: CancelKeyData, + }, +} + +#[derive(thiserror::Error, Debug, Clone)] +pub enum PipelineError { + #[error("could not send cmd to redis: {0}")] + RedisKVClient(Arc), + #[error("incorrect number of responses from redis")] + IncorrectNumberOfResponses, } pub struct Pipeline { @@ -60,7 +70,7 @@ impl Pipeline { } } - async fn execute(self, client: &mut RedisKVClient) -> Vec> { + async fn execute(self, client: &mut RedisKVClient) -> Result, PipelineError> { let responses = self.replies; let batch_size = self.inner.len(); @@ -78,30 +88,20 @@ impl Pipeline { batch_size, responses, "successfully completed cancellation jobs", ); - values.into_iter().map(Ok).collect() + Ok(values.into_iter().collect()) } Ok(value) => { error!(batch_size, ?value, "unexpected redis return value"); - std::iter::repeat_with(|| Err(anyhow!("incorrect response type from redis"))) - .take(responses) - .collect() - } - Err(err) => { - std::iter::repeat_with(|| Err(anyhow!("could not send cmd to redis: {err}"))) - .take(responses) - .collect() + Err(PipelineError::IncorrectNumberOfResponses) } + Err(err) => Err(PipelineError::RedisKVClient(Arc::new(err))), } } - fn add_command_with_reply(&mut self, cmd: Cmd) { + fn add_command(&mut self, cmd: Cmd) { self.inner.add_command(cmd); self.replies += 1; } - - fn add_command_no_reply(&mut self, cmd: Cmd) { - self.inner.add_command(cmd).ignore(); - } } impl CancelKeyOp { @@ -109,12 +109,19 @@ impl CancelKeyOp { match self { CancelKeyOp::StoreCancelKey { key, value, expire } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); - pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value)); - pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64)); + pipe.add_command(Cmd::set_options( + &key, + &**value, + SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())), + )); + } + 
CancelKeyOp::GetCancelDataOld { key } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command(Cmd::hget(key, "data")); } CancelKeyOp::GetCancelData { key } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); - pipe.add_command_with_reply(Cmd::hget(key, "data")); + pipe.add_command(Cmd::get(key)); } } } @@ -127,13 +134,14 @@ pub struct CancellationProcessor { impl QueueProcessing for CancellationProcessor { type Req = (CancelChannelSizeGuard<'static>, CancelKeyOp); - type Res = anyhow::Result; + type Res = redis::Value; + type Err = PipelineError; fn batch_size(&self, _queue_size: usize) -> usize { self.batch_size } - async fn apply(&mut self, batch: Vec) -> Vec { + async fn apply(&mut self, batch: Vec) -> Result, Self::Err> { if !self.client.credentials_refreshed() { // this will cause a timeout for cancellation operations tracing::debug!( @@ -244,18 +252,18 @@ impl CancellationHandler { &self, key: CancelKeyData, ) -> Result, CancelError> { - let guard = Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::HGet); - let op = CancelKeyOp::GetCancelData { key }; + const TIMEOUT: Duration = Duration::from_secs(5); let Some(tx) = self.tx.get() else { tracing::warn!("cancellation handler is not available"); return Err(CancelError::InternalError); }; - const TIMEOUT: Duration = Duration::from_secs(5); + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::Get); + let op = CancelKeyOp::GetCancelData { key }; let result = timeout( TIMEOUT, tx.call((guard, op), std::future::pending::()), @@ -264,10 +272,37 @@ impl CancellationHandler { .map_err(|_| { tracing::warn!("timed out waiting to receive GetCancelData response"); CancelError::RateLimit - })? - // cannot be cancelled - .unwrap_or_else(|x| match x {}) - .map_err(|e| { + })?; + + // We may still have cancel keys set with HSET "data". + // Check error type and retry with HGET. + // TODO: remove code after HSET is not used anymore. + let result = if let Err(err) = result.as_ref() + && let BatchQueueError::Result(err) = err + && let PipelineError::RedisKVClient(err) = err + && let RedisKVClientError::Redis(err) = &**err + && let Some(errcode) = err.code() + && errcode == "WRONGTYPE" + { + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::HGet); + let op = CancelKeyOp::GetCancelDataOld { key }; + timeout( + TIMEOUT, + tx.call((guard, op), std::future::pending::()), + ) + .await + .map_err(|_| { + tracing::warn!("timed out waiting to receive GetCancelData response"); + CancelError::RateLimit + })? + } else { + result + }; + + let result = result.map_err(|e| { tracing::warn!("failed to receive GetCancelData response: {e}"); CancelError::InternalError })?; @@ -442,7 +477,7 @@ impl Session { let guard = Metrics::get() .proxy .cancel_channel_size - .guard(RedisMsgKind::HSet); + .guard(RedisMsgKind::Set); let op = CancelKeyOp::StoreCancelKey { key: self.key, value: closure_json.clone(), @@ -456,7 +491,7 @@ impl Session { ); match tx.call((guard, op), cancel.as_mut()).await { - Ok(Ok(_)) => { + Ok(_) => { tracing::debug!( src=%self.key, dest=?cancel_closure.cancel_token, @@ -467,10 +502,10 @@ impl Session { tokio::time::sleep(CANCEL_KEY_REFRESH).await; } // retry immediately. 
- Ok(Err(error)) => { + Err(BatchQueueError::Result(error)) => { tracing::warn!(?error, "error registering cancellation key"); } - Err(Err(_cancelled)) => break, + Err(BatchQueueError::Cancelled(Err(_cancelled))) => break, } } diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 9d1a3d4358..8439082498 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -374,11 +374,9 @@ pub enum Waiting { #[label(singleton = "kind")] #[allow(clippy::enum_variant_names)] pub enum RedisMsgKind { - HSet, - HSetMultiple, + Set, + Get, HGet, - HGetAll, - HDel, } #[derive(Default, Clone)] diff --git a/proxy/src/redis/connection_with_credentials_provider.rs b/proxy/src/redis/connection_with_credentials_provider.rs index 35a3fe4334..b0bf332e44 100644 --- a/proxy/src/redis/connection_with_credentials_provider.rs +++ b/proxy/src/redis/connection_with_credentials_provider.rs @@ -4,11 +4,12 @@ use std::time::Duration; use futures::FutureExt; use redis::aio::{ConnectionLike, MultiplexedConnection}; -use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult}; +use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisError, RedisResult}; use tokio::task::AbortHandle; use tracing::{error, info, warn}; use super::elasticache::CredentialsProvider; +use crate::redis::elasticache::CredentialsProviderError; enum Credentials { Static(ConnectionInfo), @@ -26,6 +27,14 @@ impl Clone for Credentials { } } +#[derive(thiserror::Error, Debug)] +pub enum ConnectionProviderError { + #[error(transparent)] + Redis(#[from] RedisError), + #[error(transparent)] + CredentialsProvider(#[from] CredentialsProviderError), +} + /// A wrapper around `redis::MultiplexedConnection` that automatically refreshes the token. /// Provides PubSub connection without credentials refresh. 
pub struct ConnectionWithCredentialsProvider { @@ -86,15 +95,18 @@ impl ConnectionWithCredentialsProvider { } } - async fn ping(con: &mut MultiplexedConnection) -> RedisResult<()> { - redis::cmd("PING").query_async(con).await + async fn ping(con: &mut MultiplexedConnection) -> Result<(), ConnectionProviderError> { + redis::cmd("PING") + .query_async(con) + .await + .map_err(Into::into) } pub(crate) fn credentials_refreshed(&self) -> bool { self.credentials_refreshed.load(Ordering::Relaxed) } - pub(crate) async fn connect(&mut self) -> anyhow::Result<()> { + pub(crate) async fn connect(&mut self) -> Result<(), ConnectionProviderError> { let _guard = self.mutex.lock().await; if let Some(con) = self.con.as_mut() { match Self::ping(con).await { @@ -141,7 +153,7 @@ impl ConnectionWithCredentialsProvider { Ok(()) } - async fn get_connection_info(&self) -> anyhow::Result { + async fn get_connection_info(&self) -> Result { match &self.credentials { Credentials::Static(info) => Ok(info.clone()), Credentials::Dynamic(provider, addr) => { @@ -160,7 +172,7 @@ impl ConnectionWithCredentialsProvider { } } - async fn get_client(&self) -> anyhow::Result { + async fn get_client(&self) -> Result { let client = redis::Client::open(self.get_connection_info().await?)?; self.credentials_refreshed.store(true, Ordering::Relaxed); Ok(client) diff --git a/proxy/src/redis/elasticache.rs b/proxy/src/redis/elasticache.rs index 58e3c889a7..6f3b34d381 100644 --- a/proxy/src/redis/elasticache.rs +++ b/proxy/src/redis/elasticache.rs @@ -9,10 +9,12 @@ use aws_config::meta::region::RegionProviderChain; use aws_config::profile::ProfileFileCredentialsProvider; use aws_config::provider_config::ProviderConfig; use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider; +use aws_credential_types::provider::error::CredentialsError; use aws_sdk_iam::config::ProvideCredentials; use aws_sigv4::http_request::{ - self, SignableBody, SignableRequest, SignatureLocation, SigningSettings, + self, SignableBody, SignableRequest, SignatureLocation, SigningError, SigningSettings, }; +use aws_sigv4::sign::v4::signing_params::BuildError; use tracing::info; #[derive(Debug)] @@ -40,6 +42,18 @@ impl AWSIRSAConfig { } } +#[derive(thiserror::Error, Debug)] +pub enum CredentialsProviderError { + #[error(transparent)] + AwsCredentials(#[from] CredentialsError), + #[error(transparent)] + AwsSigv4Build(#[from] BuildError), + #[error(transparent)] + AwsSigv4Singing(#[from] SigningError), + #[error(transparent)] + Http(#[from] http::Error), +} + /// Credentials provider for AWS elasticache authentication. 
/// /// Official documentation: @@ -92,7 +106,9 @@ impl CredentialsProvider { }) } - pub(crate) async fn provide_credentials(&self) -> anyhow::Result<(String, String)> { + pub(crate) async fn provide_credentials( + &self, + ) -> Result<(String, String), CredentialsProviderError> { let aws_credentials = self .credentials_provider .provide_credentials() diff --git a/proxy/src/redis/kv_ops.rs b/proxy/src/redis/kv_ops.rs index cfdbc21839..d1e97b6b09 100644 --- a/proxy/src/redis/kv_ops.rs +++ b/proxy/src/redis/kv_ops.rs @@ -2,9 +2,18 @@ use std::time::Duration; use futures::FutureExt; use redis::aio::ConnectionLike; -use redis::{Cmd, FromRedisValue, Pipeline, RedisResult}; +use redis::{Cmd, FromRedisValue, Pipeline, RedisError, RedisResult}; use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider; +use crate::redis::connection_with_credentials_provider::ConnectionProviderError; + +#[derive(thiserror::Error, Debug)] +pub enum RedisKVClientError { + #[error(transparent)] + Redis(#[from] RedisError), + #[error(transparent)] + ConnectionProvider(#[from] ConnectionProviderError), +} pub struct RedisKVClient { client: ConnectionWithCredentialsProvider, @@ -32,12 +41,13 @@ impl RedisKVClient { Self { client } } - pub async fn try_connect(&mut self) -> anyhow::Result<()> { + pub async fn try_connect(&mut self) -> Result<(), RedisKVClientError> { self.client .connect() .boxed() .await .inspect_err(|e| tracing::error!("failed to connect to redis: {e}")) + .map_err(Into::into) } pub(crate) fn credentials_refreshed(&self) -> bool { @@ -47,7 +57,7 @@ impl RedisKVClient { pub(crate) async fn query( &mut self, q: &impl Queryable, - ) -> anyhow::Result { + ) -> Result { let e = match q.query(&mut self.client).await { Ok(t) => return Ok(t), Err(e) => e, From 9bba31bf6805e1c179b75fbb5bcab96c96980c75 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 11 Jul 2025 20:39:08 +0100 Subject: [PATCH 11/14] proxy: encode json as we parse rows (#11992) Serialize query row responses directly into JSON. Some of this code should be using the `json::value_as_object/list` macros, but I've avoided it for now to minimize the size of the diff. 
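Roughly, the change replaces "build a `serde_json::Value`, then serialize it" with "write each row into the output buffer as it is parsed". A small sketch of that shape, using the in-tree `json` helper macros this patch adopts (the row data and the helper function name here are illustrative, not part of the patch):

```rust
// Sketch only: `rows` stands in for the postgres RowStream in the real code.
fn rows_to_json(rows: &[(&str, i64)]) -> String {
    json::value_to_string!(|out| json::value_as_object!(|out| {
        let list = out.key("rows");
        json::value_as_list!(|list| {
            for (name, value) in rows {
                // Each row is appended to the output buffer immediately; nothing is kept
                // around as an intermediate serde_json::Value.
                let row = list.entry();
                json::value_as_object!(|row| {
                    row.entry(*name, *value);
                });
            }
        });
    }))
}
```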
--- Cargo.lock | 1 + proxy/Cargo.toml | 1 + proxy/src/serverless/json.rs | 95 +++++++--------- proxy/src/serverless/sql_over_http.rs | 154 +++++++++++++------------- 4 files changed, 122 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4323254f0a..14b460005a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5329,6 +5329,7 @@ dependencies = [ "itoa", "jose-jwa", "jose-jwk", + "json", "lasso", "measured", "metrics", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 0a406d1ca8..82fe6818e3 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -49,6 +49,7 @@ indexmap = { workspace = true, features = ["serde"] } ipnet.workspace = true itertools.workspace = true itoa.workspace = true +json = { path = "../libs/proxy/json" } lasso = { workspace = true, features = ["multi-threaded"] } measured = { workspace = true, features = ["lasso"] } metrics.workspace = true diff --git a/proxy/src/serverless/json.rs b/proxy/src/serverless/json.rs index 2e67d07079..ef7c8a4d82 100644 --- a/proxy/src/serverless/json.rs +++ b/proxy/src/serverless/json.rs @@ -1,6 +1,7 @@ +use json::{ListSer, ObjectSer, ValueSer}; use postgres_client::Row; use postgres_client::types::{Kind, Type}; -use serde_json::{Map, Value}; +use serde_json::Value; // // Convert json non-string types to strings, so that they can be passed to Postgres @@ -74,44 +75,40 @@ pub(crate) enum JsonConversionError { UnbalancedString, } -enum OutputMode { - Array(Vec), - Object(Map), +enum OutputMode<'a> { + Array(ListSer<'a>), + Object(ObjectSer<'a>), } -impl OutputMode { - fn key(&mut self, key: &str) -> &mut Value { +impl OutputMode<'_> { + fn key(&mut self, key: &str) -> ValueSer<'_> { match self { - OutputMode::Array(values) => push_entry(values, Value::Null), - OutputMode::Object(map) => map.entry(key.to_string()).or_insert(Value::Null), + OutputMode::Array(values) => values.entry(), + OutputMode::Object(map) => map.key(key), } } - fn finish(self) -> Value { + fn finish(self) { match self { - OutputMode::Array(values) => Value::Array(values), - OutputMode::Object(map) => Value::Object(map), + OutputMode::Array(values) => values.finish(), + OutputMode::Object(map) => map.finish(), } } } -fn push_entry(arr: &mut Vec, t: T) -> &mut T { - arr.push(t); - arr.last_mut().expect("a value was just inserted") -} - // // Convert postgres row with text-encoded values to JSON object // pub(crate) fn pg_text_row_to_json( + output: ValueSer, row: &Row, raw_output: bool, array_mode: bool, -) -> Result { +) -> Result<(), JsonConversionError> { let mut entries = if array_mode { - OutputMode::Array(Vec::with_capacity(row.columns().len())) + OutputMode::Array(output.list()) } else { - OutputMode::Object(Map::with_capacity(row.columns().len())) + OutputMode::Object(output.object()) }; for (i, column) in row.columns().iter().enumerate() { @@ -120,53 +117,48 @@ pub(crate) fn pg_text_row_to_json( let value = entries.key(column.name()); match pg_value { - Some(v) if raw_output => *value = Value::String(v.to_string()), + Some(v) if raw_output => value.value(v), Some(v) => pg_text_to_json(value, v, column.type_())?, - None => *value = Value::Null, + None => value.value(json::Null), } } - Ok(entries.finish()) + entries.finish(); + Ok(()) } // // Convert postgres text-encoded value to JSON value // -fn pg_text_to_json( - output: &mut Value, - val: &str, - pg_type: &Type, -) -> Result<(), JsonConversionError> { +fn pg_text_to_json(output: ValueSer, val: &str, pg_type: &Type) -> Result<(), JsonConversionError> { if let Kind::Array(elem_type) = 
pg_type.kind() { // todo: we should fetch this from postgres. let delimiter = ','; - let mut array = vec![]; - pg_array_parse(&mut array, val, elem_type, delimiter)?; - *output = Value::Array(array); + json::value_as_list!(|output| pg_array_parse(output, val, elem_type, delimiter)?); return Ok(()); } match *pg_type { - Type::BOOL => *output = Value::Bool(val == "t"), + Type::BOOL => output.value(val == "t"), Type::INT2 | Type::INT4 => { let val = val.parse::()?; - *output = Value::Number(serde_json::Number::from(val)); + output.value(val); } Type::FLOAT4 | Type::FLOAT8 => { let fval = val.parse::()?; - let num = serde_json::Number::from_f64(fval); - if let Some(num) = num { - *output = Value::Number(num); + if fval.is_finite() { + output.value(fval); } else { // Pass Nan, Inf, -Inf as strings // JS JSON.stringify() does converts them to null, but we // want to preserve them, so we pass them as strings - *output = Value::String(val.to_string()); + output.value(val); } } - Type::JSON | Type::JSONB => *output = serde_json::from_str(val)?, - _ => *output = Value::String(val.to_string()), + // we assume that the string value is valid json. + Type::JSON | Type::JSONB => output.write_raw_json(val.as_bytes()), + _ => output.value(val), } Ok(()) @@ -192,7 +184,7 @@ fn pg_text_to_json( /// gets its own level of curly braces, and delimiters must be written between adjacent /// curly-braced entities of the same level. fn pg_array_parse( - elements: &mut Vec, + elements: &mut ListSer, mut pg_array: &str, elem: &Type, delim: char, @@ -221,7 +213,7 @@ fn pg_array_parse( /// reads a single array from the `pg_array` string and pushes each values to `elements`. /// returns the rest of the `pg_array` string that was not read. fn pg_array_parse_inner<'a>( - elements: &mut Vec, + elements: &mut ListSer, mut pg_array: &'a str, elem: &Type, delim: char, @@ -234,7 +226,7 @@ fn pg_array_parse_inner<'a>( let mut q = String::new(); loop { - let value = push_entry(elements, Value::Null); + let value = elements.entry(); pg_array = pg_array_parse_item(value, &mut q, pg_array, elem, delim)?; // check for separator. @@ -260,7 +252,7 @@ fn pg_array_parse_inner<'a>( /// /// `quoted` is a scratch allocation that has no defined output. fn pg_array_parse_item<'a>( - output: &mut Value, + output: ValueSer, quoted: &mut String, mut pg_array: &'a str, elem: &Type, @@ -276,9 +268,8 @@ fn pg_array_parse_item<'a>( if pg_array.starts_with('{') { // nested array. 
- let mut nested = vec![]; - pg_array = pg_array_parse_inner(&mut nested, pg_array, elem, delim)?; - *output = Value::Array(nested); + pg_array = + json::value_as_list!(|output| pg_array_parse_inner(output, pg_array, elem, delim))?; return Ok(pg_array); } @@ -306,7 +297,7 @@ fn pg_array_parse_item<'a>( // we might have an item string: // check for null if item == "NULL" { - *output = Value::Null; + output.value(json::Null); } else { pg_text_to_json(output, item, elem)?; } @@ -440,15 +431,15 @@ mod tests { } fn pg_text_to_json(val: &str, pg_type: &Type) -> Value { - let mut v = Value::Null; - super::pg_text_to_json(&mut v, val, pg_type).unwrap(); - v + let output = json::value_to_string!(|v| super::pg_text_to_json(v, val, pg_type).unwrap()); + serde_json::from_str(&output).unwrap() } fn pg_array_parse(pg_array: &str, pg_type: &Type) -> Value { - let mut array = vec![]; - super::pg_array_parse(&mut array, pg_array, pg_type, ',').unwrap(); - Value::Array(array) + let output = json::value_to_string!(|v| json::value_as_list!(|v| { + super::pg_array_parse(v, pg_array, pg_type, ',').unwrap(); + })); + serde_json::from_str(&output).unwrap() } #[test] diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 7a718d0280..8a14f804b6 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -14,10 +14,7 @@ use hyper::http::{HeaderName, HeaderValue}; use hyper::{Request, Response, StatusCode, header}; use indexmap::IndexMap; use postgres_client::error::{DbError, ErrorPosition, SqlState}; -use postgres_client::{ - GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, RowStream, Transaction, -}; -use serde::Serialize; +use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction}; use serde_json::Value; use serde_json::value::RawValue; use tokio::time::{self, Instant}; @@ -687,32 +684,21 @@ impl QueryData { let (inner, mut discard) = client.inner(); let cancel_token = inner.cancel_token(); - match select( + let mut json_buf = vec![]; + + let batch_result = match select( pin!(query_to_json( config, &mut *inner, self, - &mut 0, + json::ValueSer::new(&mut json_buf), parsed_headers )), pin!(cancel.cancelled()), ) .await { - // The query successfully completed. - Either::Left((Ok((status, results)), __not_yet_cancelled)) => { - discard.check_idle(status); - - let json_output = - serde_json::to_string(&results).expect("json serialization should not fail"); - Ok(json_output) - } - // The query failed with an error - Either::Left((Err(e), __not_yet_cancelled)) => { - discard.discard(); - Err(e) - } - // The query was cancelled. + Either::Left((res, __not_yet_cancelled)) => res, Either::Right((_cancelled, query)) => { tracing::info!("cancelling query"); if let Err(err) = cancel_token.cancel_query(NoTls).await { @@ -721,13 +707,7 @@ impl QueryData { // wait for the query cancellation match time::timeout(time::Duration::from_millis(100), query).await { // query successed before it was cancelled. - Ok(Ok((status, results))) => { - discard.check_idle(status); - - let json_output = serde_json::to_string(&results) - .expect("json serialization should not fail"); - Ok(json_output) - } + Ok(Ok(status)) => Ok(status), // query failed or was cancelled. 
Ok(Err(error)) => { let db_error = match &error { @@ -743,14 +723,29 @@ impl QueryData { discard.discard(); } - Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres)) + return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres)); } Err(_timeout) => { discard.discard(); - Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres)) + return Err(SqlOverHttpError::Cancelled(SqlOverHttpCancel::Postgres)); } } } + }; + + match batch_result { + // The query successfully completed. + Ok(status) => { + discard.check_idle(status); + + let json_output = String::from_utf8(json_buf).expect("json should be valid utf8"); + Ok(json_output) + } + // The query failed with an error + Err(e) => { + discard.discard(); + Err(e) + } } } } @@ -787,7 +782,7 @@ impl BatchQueryData { }) .map_err(SqlOverHttpError::Postgres)?; - let json_output = match query_batch( + let json_output = match query_batch_to_json( config, cancel.child_token(), &mut transaction, @@ -845,24 +840,21 @@ async fn query_batch( transaction: &mut Transaction<'_>, queries: BatchQueryData, parsed_headers: HttpHeaders, -) -> Result { - let mut results = Vec::with_capacity(queries.queries.len()); - let mut current_size = 0; + results: &mut json::ListSer<'_>, +) -> Result<(), SqlOverHttpError> { for stmt in queries.queries { let query = pin!(query_to_json( config, transaction, stmt, - &mut current_size, + results.entry(), parsed_headers, )); let cancelled = pin!(cancel.cancelled()); let res = select(query, cancelled).await; match res { // TODO: maybe we should check that the transaction bit is set here - Either::Left((Ok((_, values)), _cancelled)) => { - results.push(values); - } + Either::Left((Ok(_), _cancelled)) => {} Either::Left((Err(e), _cancelled)) => { return Err(e); } @@ -872,8 +864,22 @@ async fn query_batch( } } - let results = json!({ "results": results }); - let json_output = serde_json::to_string(&results).expect("json serialization should not fail"); + Ok(()) +} + +async fn query_batch_to_json( + config: &'static HttpConfig, + cancel: CancellationToken, + tx: &mut Transaction<'_>, + queries: BatchQueryData, + headers: HttpHeaders, +) -> Result { + let json_output = json::value_to_string!(|obj| json::value_as_object!(|obj| { + let results = obj.key("results"); + json::value_as_list!(|results| { + query_batch(config, cancel, tx, queries, headers, results).await?; + }); + })); Ok(json_output) } @@ -882,54 +888,54 @@ async fn query_to_json( config: &'static HttpConfig, client: &mut T, data: QueryData, - current_size: &mut usize, + output: json::ValueSer<'_>, parsed_headers: HttpHeaders, -) -> Result<(ReadyForQueryStatus, impl Serialize + use), SqlOverHttpError> { +) -> Result { let query_start = Instant::now(); - let query_params = data.params; + let mut output = json::ObjectSer::new(output); let mut row_stream = client - .query_raw_txt(&data.query, query_params) + .query_raw_txt(&data.query, data.params) .await .map_err(SqlOverHttpError::Postgres)?; let query_acknowledged = Instant::now(); - let columns_len = row_stream.statement.columns().len(); - let mut fields = Vec::with_capacity(columns_len); - + let mut json_fields = output.key("fields").list(); for c in row_stream.statement.columns() { - fields.push(json!({ - "name": c.name().to_owned(), - "dataTypeID": c.type_().oid(), - "tableID": c.table_oid(), - "columnID": c.column_id(), - "dataTypeSize": c.type_size(), - "dataTypeModifier": c.type_modifier(), - "format": "text", - })); + let json_field = json_fields.entry(); + json::value_as_object!(|json_field| { + 
json_field.entry("name", c.name()); + json_field.entry("dataTypeID", c.type_().oid()); + json_field.entry("tableID", c.table_oid()); + json_field.entry("columnID", c.column_id()); + json_field.entry("dataTypeSize", c.type_size()); + json_field.entry("dataTypeModifier", c.type_modifier()); + json_field.entry("format", "text"); + }); } + json_fields.finish(); - let raw_output = parsed_headers.raw_output; let array_mode = data.array_mode.unwrap_or(parsed_headers.default_array_mode); + let raw_output = parsed_headers.raw_output; // Manually drain the stream into a vector to leave row_stream hanging // around to get a command tag. Also check that the response is not too // big. - let mut rows = Vec::new(); + let mut rows = 0; + let mut json_rows = output.key("rows").list(); while let Some(row) = row_stream.next().await { let row = row.map_err(SqlOverHttpError::Postgres)?; - *current_size += row.body_len(); // we don't have a streaming response support yet so this is to prevent OOM // from a malicious query (eg a cross join) - if *current_size > config.max_response_size_bytes { + if json_rows.as_buffer().len() > config.max_response_size_bytes { return Err(SqlOverHttpError::ResponseTooLarge( config.max_response_size_bytes, )); } - let row = pg_text_row_to_json(&row, raw_output, array_mode)?; - rows.push(row); + pg_text_row_to_json(json_rows.entry(), &row, raw_output, array_mode)?; + rows += 1; // assumption: parsing pg text and converting to json takes CPU time. // let's assume it is slightly expensive, so we should consume some cooperative budget. @@ -937,16 +943,14 @@ async fn query_to_json( // of rows and never hit the tokio mpsc for a long time (although unlikely). tokio::task::consume_budget().await; } + json_rows.finish(); let query_resp_end = Instant::now(); - let RowStream { - command_tag, - status: ready, - .. - } = row_stream; + + let ready = row_stream.status; // grab the command tag and number of rows affected - let command_tag = command_tag.unwrap_or_default(); + let command_tag = row_stream.command_tag.unwrap_or_default(); let mut command_tag_split = command_tag.split(' '); let command_tag_name = command_tag_split.next().unwrap_or_default(); let command_tag_count = if command_tag_name == "INSERT" { @@ -959,7 +963,7 @@ async fn query_to_json( .and_then(|s| s.parse::().ok()); info!( - rows = rows.len(), + rows, ?ready, command_tag, acknowledgement = ?(query_acknowledged - query_start), @@ -967,16 +971,12 @@ async fn query_to_json( "finished executing query" ); - // Resulting JSON format is based on the format of node-postgres result. - let results = json!({ - "command": command_tag_name.to_string(), - "rowCount": command_tag_count, - "rows": rows, - "fields": fields, - "rowAsArray": array_mode, - }); + output.entry("command", command_tag_name); + output.entry("rowCount", command_tag_count); + output.entry("rowAsArray", array_mode); - Ok((ready, results)) + output.finish(); + Ok(ready) } enum Client { From ee7bb1a66746e4bbbf1213792b8169e00ce08334 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Sat, 12 Jul 2025 08:57:04 +0400 Subject: [PATCH 12/14] storcon: validate new_sk_set before starting safekeeper migration (#12546) ## Problem We don't validate the validity of the `new_sk_set` before starting the migration. It is validated later, so the migration to an invalid safekeeper set will fail anyway. 
But at this point we might already commited an invalid `new_sk_set` to the database and there is no `abort` command yet (I ran into this issue in neon_local and ruined the timeline :) - Part of https://github.com/neondatabase/neon/issues/11669 ## Summary of changes - Add safekeeper count and safekeeper duplication checks before starting the migration - Test that we validate the `new_sk_set` before starting the migration - Add `force` option to the `TimelineSafekeeperMigrateRequest` to disable not-mandatory checks --- .../src/service/safekeeper_service.rs | 45 +++++++++++++++---- .../regress/test_safekeeper_migration.py | 38 ++++++++++++++++ 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index 42ddf81e3e..7521d7bd86 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -39,13 +39,13 @@ use utils::lsn::Lsn; use super::Service; impl Service { - fn make_member_set(safekeepers: &[Safekeeper]) -> Result { + fn make_member_set(safekeepers: &[Safekeeper]) -> Result { let members = safekeepers .iter() .map(|sk| sk.get_safekeeper_id()) .collect::>(); - MemberSet::new(members).map_err(ApiError::InternalServerError) + MemberSet::new(members) } fn get_safekeepers(&self, ids: &[i64]) -> Result, ApiError> { @@ -80,7 +80,7 @@ impl Service { ) -> Result, ApiError> { let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?; - let mset = Self::make_member_set(&safekeepers)?; + let mset = Self::make_member_set(&safekeepers).map_err(ApiError::InternalServerError)?; let mconf = safekeeper_api::membership::Configuration::new(mset); let req = safekeeper_api::models::TimelineCreateRequest { @@ -1105,6 +1105,26 @@ impl Service { } } + if new_sk_set.is_empty() { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "new safekeeper set is empty" + ))); + } + + if new_sk_set.len() < self.config.timeline_safekeeper_count { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "new safekeeper set must have at least {} safekeepers", + self.config.timeline_safekeeper_count + ))); + } + + let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::>(); + let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?; + // Construct new member set in advance to validate it. + // E.g. validates that there is no duplicate safekeepers. + let new_sk_member_set = + Self::make_member_set(&new_safekeepers).map_err(ApiError::BadRequest)?; + // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks. let _tenant_lock = trace_shared_lock( &self.tenant_op_locks, @@ -1135,6 +1155,18 @@ impl Service { .map(|&id| NodeId(id as u64)) .collect::>(); + // Validate that we are not migrating to a decomissioned safekeeper. 
+ for sk in new_safekeepers.iter() { + if !cur_sk_set.contains(&sk.get_id()) + && sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned + { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "safekeeper {} is decomissioned", + sk.get_id() + ))); + } + } + tracing::info!( ?cur_sk_set, ?new_sk_set, @@ -1177,11 +1209,8 @@ impl Service { } let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?; - let cur_sk_member_set = Self::make_member_set(&cur_safekeepers)?; - - let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::>(); - let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?; - let new_sk_member_set = Self::make_member_set(&new_safekeepers)?; + let cur_sk_member_set = + Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?; let joint_config = membership::Configuration { generation, diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py index b82d7b9bb0..170c1a3650 100644 --- a/test_runner/regress/test_safekeeper_migration.py +++ b/test_runner/regress/test_safekeeper_migration.py @@ -2,6 +2,9 @@ from __future__ import annotations from typing import TYPE_CHECKING +import pytest +from fixtures.neon_fixtures import StorageControllerApiException + if TYPE_CHECKING: from fixtures.neon_fixtures import NeonEnvBuilder @@ -75,3 +78,38 @@ def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): ep.start(safekeeper_generation=1, safekeepers=[3]) assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)] + + +def test_new_sk_set_validation(neon_env_builder: NeonEnvBuilder): + """ + Test that safekeeper_migrate validates the new_sk_set before starting the migration. + """ + neon_env_builder.num_safekeepers = 3 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 2, + } + env = neon_env_builder.init_start() + + def expect_fail(sk_set: list[int], match: str): + with pytest.raises(StorageControllerApiException, match=match): + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, sk_set + ) + # Check that we failed before commiting to the database. + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + assert mconf["generation"] == 1 + + expect_fail([], "safekeeper set is empty") + expect_fail([1], "must have at least 2 safekeepers") + expect_fail([1, 1], "duplicate safekeeper") + expect_fail([1, 100500], "does not exist") + + mconf = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline) + sk_set = mconf["sk_set"] + assert len(sk_set) == 2 + + decom_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0] + env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned") + + expect_fail([sk_set[0], decom_sk], "decomissioned") From a5fe67f3616b55135fa3a58c2db89bf30a9eb955 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Sun, 13 Jul 2025 19:27:39 +0200 Subject: [PATCH 13/14] proxy: cancel maintain_cancel_key task immediately (#12586) ## Problem When a connection terminates its maintain_cancel_key task keeps running until the CANCEL_KEY_REFRESH sleep finishes and then it triggers another cancel key TTL refresh before exiting. ## Summary of changes * Check for cancellation while sleeping and interrupt sleep. * If cancelled, break the loop, don't send a refresh cmd. 
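To make the semantics of the new helper concrete, here is a small self-contained sketch; the helper body is repeated only so the example compiles on its own, and tokio plus futures are assumed to be available.

```rust
use std::future::Future;
use std::pin::pin;
use std::time::Duration;

use futures::future::{Either, select};

/// Runs `f` unless `condition` completes first (same shape as the helper in proxy/src/util.rs).
async fn run_until<F1: Future, F2: Future>(
    f: F1,
    condition: F2,
) -> Result<F1::Output, F2::Output> {
    match select(pin!(f), pin!(condition)).await {
        Either::Left((out, _)) => Ok(out),
        Either::Right((out, _)) => Err(out),
    }
}

#[tokio::main]
async fn main() {
    // The 60s sleep stands in for the CANCEL_KEY_REFRESH wait; the 10ms sleep stands in
    // for the cancellation signal. The long wait is interrupted almost immediately.
    let res = run_until(
        tokio::time::sleep(Duration::from_secs(60)),
        tokio::time::sleep(Duration::from_millis(10)),
    )
    .await;
    assert!(res.is_err()); // Err(..) means "interrupted", so the caller breaks out of its loop.
}
```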
--- proxy/src/cancellation.rs | 10 ++++++++-- proxy/src/util.rs | 14 +++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 4ea4c4ea54..03be9dd4cf 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -28,6 +28,7 @@ use crate::pqproto::CancelKeyData; use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::keys::KeyPrefix; use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError}; +use crate::util::run_until; type IpSubnetKey = IpNet; @@ -498,8 +499,13 @@ impl Session { "registered cancellation key" ); - // wait before continuing. - tokio::time::sleep(CANCEL_KEY_REFRESH).await; + // wait before continuing. break immediately if cancelled. + if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) + .await + .is_err() + { + break; + } } // retry immediately. Err(BatchQueueError::Result(error)) => { diff --git a/proxy/src/util.rs b/proxy/src/util.rs index 7fc2d9fbdb..0291216d94 100644 --- a/proxy/src/util.rs +++ b/proxy/src/util.rs @@ -7,8 +7,16 @@ pub async fn run_until_cancelled( f: F, cancellation_token: &CancellationToken, ) -> Option { - match select(pin!(f), pin!(cancellation_token.cancelled())).await { - Either::Left((f, _)) => Some(f), - Either::Right(((), _)) => None, + run_until(f, cancellation_token.cancelled()).await.ok() +} + +/// Runs the future `f` unless interrupted by future `condition`. +pub async fn run_until( + f: F1, + condition: F2, +) -> Result { + match select(pin!(f), pin!(condition)).await { + Either::Left((f1, _)) => Ok(f1), + Either::Right((f2, _)) => Err(f2), } } From 296c9190b2f6e12c571a2b71f070b1c5597738e8 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Mon, 14 Jul 2025 00:49:23 +0200 Subject: [PATCH 14/14] proxy: Use EXPIRE command to refresh cancel entries (#12580) ## Problem When refreshing cancellation data we resend the entire value again just to reset the TTL, which causes unnecessary load in proxy, on network and possibly on redis side. ## Summary of changes * Switch from using SET with full value to using EXPIRE to reset TTL. * Add a tiny delay between retries to prevent busy loop. * Shorten CancelKeyOp variants: drop redundant suffix. * Retry SET when EXPIRE failed. 
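The refresh cycle therefore becomes roughly the following sequence (a sketch of the commands only; the key and payload are placeholders, while the 600s/570s values mirror CANCEL_KEY_TTL and CANCEL_KEY_REFRESH):

```rust
use redis::{Cmd, SetExpiry, SetOptions};

fn main() {
    let key = "cancel-12345"; // placeholder key
    let value = r#"{"cancel_token":"..."}"#; // placeholder payload

    // 1. On registration: SET key value EX 600. The expected reply is OK.
    let _store = Cmd::set_options(
        key,
        value,
        SetOptions::default().with_expiration(SetExpiry::EX(600)),
    );

    // 2. Every 570 seconds: EXPIRE key 600. The expected reply is 1 while the key exists,
    //    so only the TTL is refreshed and the payload is not re-sent.
    let _refresh = Cmd::expire(key, 600);

    // 3. Any other EXPIRE reply (e.g. 0, meaning the key already expired) falls back to
    //    step 1 and re-sends the full SET .. EX.
}
```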
--- proxy/src/cancellation.rs | 130 +++++++++++++++++++++++++++----------- proxy/src/metrics.rs | 1 + 2 files changed, 95 insertions(+), 36 deletions(-) diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 03be9dd4cf..77062d3bb4 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -32,20 +32,24 @@ use crate::util::run_until; type IpSubnetKey = IpNet; -const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600); -const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570); +const CANCEL_KEY_TTL: Duration = Duration::from_secs(600); +const CANCEL_KEY_REFRESH: Duration = Duration::from_secs(570); // Message types for sending through mpsc channel pub enum CancelKeyOp { - StoreCancelKey { + Store { key: CancelKeyData, value: Box, - expire: std::time::Duration, + expire: Duration, }, - GetCancelData { + Refresh { + key: CancelKeyData, + expire: Duration, + }, + Get { key: CancelKeyData, }, - GetCancelDataOld { + GetOld { key: CancelKeyData, }, } @@ -108,7 +112,7 @@ impl Pipeline { impl CancelKeyOp { fn register(&self, pipe: &mut Pipeline) { match self { - CancelKeyOp::StoreCancelKey { key, value, expire } => { + CancelKeyOp::Store { key, value, expire } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::set_options( &key, @@ -116,11 +120,15 @@ impl CancelKeyOp { SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())), )); } - CancelKeyOp::GetCancelDataOld { key } => { + CancelKeyOp::Refresh { key, expire } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command(Cmd::expire(&key, expire.as_secs() as i64)); + } + CancelKeyOp::GetOld { key } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::hget(key, "data")); } - CancelKeyOp::GetCancelData { key } => { + CancelKeyOp::Get { key } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::get(key)); } @@ -264,7 +272,7 @@ impl CancellationHandler { .proxy .cancel_channel_size .guard(RedisMsgKind::Get); - let op = CancelKeyOp::GetCancelData { key }; + let op = CancelKeyOp::Get { key }; let result = timeout( TIMEOUT, tx.call((guard, op), std::future::pending::()), @@ -289,7 +297,7 @@ impl CancellationHandler { .proxy .cancel_channel_size .guard(RedisMsgKind::HGet); - let op = CancelKeyOp::GetCancelDataOld { key }; + let op = CancelKeyOp::GetOld { key }; timeout( TIMEOUT, tx.call((guard, op), std::future::pending::()), @@ -474,45 +482,95 @@ impl Session { let mut cancel = pin!(cancel); + enum State { + Set, + Refresh, + } + let mut state = State::Set; + loop { - let guard = Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::Set); - let op = CancelKeyOp::StoreCancelKey { - key: self.key, - value: closure_json.clone(), - expire: CANCEL_KEY_TTL, + let guard_op = match state { + State::Set => { + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::Set); + let op = CancelKeyOp::Store { + key: self.key, + value: closure_json.clone(), + expire: CANCEL_KEY_TTL, + }; + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "registering cancellation key" + ); + (guard, op) + } + + State::Refresh => { + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::Expire); + let op = CancelKeyOp::Refresh { + key: self.key, + expire: CANCEL_KEY_TTL, + }; + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "refreshing cancellation key" + ); + (guard, op) + } 
}; - tracing::debug!( - src=%self.key, - dest=?cancel_closure.cancel_token, - "registering cancellation key" - ); - - match tx.call((guard, op), cancel.as_mut()).await { - Ok(_) => { + match tx.call(guard_op, cancel.as_mut()).await { + // SET returns OK + Ok(Value::Okay) => { tracing::debug!( src=%self.key, dest=?cancel_closure.cancel_token, "registered cancellation key" ); - - // wait before continuing. break immediately if cancelled. - if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) - .await - .is_err() - { - break; - } + state = State::Refresh; } + + // EXPIRE returns 1 + Ok(Value::Int(1)) => { + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "refreshed cancellation key" + ); + } + + Ok(_) => { + // Any other response likely means the key expired. + tracing::warn!(src=%self.key, "refreshing cancellation key failed"); + // Re-enter the SET loop to repush full data. + state = State::Set; + } + // retry immediately. Err(BatchQueueError::Result(error)) => { - tracing::warn!(?error, "error registering cancellation key"); + tracing::warn!(?error, "error refreshing cancellation key"); + // Small delay to prevent busy loop with high cpu and logging. + tokio::time::sleep(Duration::from_millis(10)).await; + continue; } + Err(BatchQueueError::Cancelled(Err(_cancelled))) => break, } + + // wait before continuing. break immediately if cancelled. + if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) + .await + .is_err() + { + break; + } } if let Err(err) = cancel_closure diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 8439082498..bf4d5a11eb 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -376,6 +376,7 @@ pub enum Waiting { pub enum RedisMsgKind { Set, Get, + Expire, HGet, }