From b5b1db29bb3b2c11665ae4f891b235fc9d5d5b31 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Thu, 10 Jul 2025 12:25:15 +0300
Subject: [PATCH] Implement shard map live-update

---
 .../src/worker_process/main_loop.rs        | 38 +++++++-
 .../src/worker_process/worker_interface.rs |  9 +-
 pgxn/neon/communicator_new.c               | 35 +++++--
 pgxn/neon/libpagestore.c                   | 96 +++----------------
 pgxn/neon/pagestore_client.h               |  8 +-
 5 files changed, 91 insertions(+), 95 deletions(-)

diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 171bb8fbf4..3ae187ac16 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -29,17 +29,31 @@ use tracing::{error, info, info_span, trace};
 use utils::lsn::Lsn;
 
 pub struct CommunicatorWorkerProcessStruct<'a> {
-    neon_request_slots: &'a [NeonIOHandle],
+    /// Tokio runtime that the main loop and any other related tasks runs in.
+    runtime: tokio::runtime::Handle,
 
+    /// Client to communicate with the pageserver
     client: PageserverClient,
 
-    pub(crate) cache: IntegratedCacheWriteAccess<'a>,
+    /// Request slots that backends use to send IO requests to the communicator.
+    neon_request_slots: &'a [NeonIOHandle],
 
+    /// Notification pipe. Backends use this to notify the communicator that a request is waiting to
+    /// be processed in one of the request slots.
     submission_pipe_read_fd: OwnedFd,
 
+    /// Locking table for all in-progress IO requests.
     in_progress_table: RequestInProgressTable,
 
-    // Metrics
+    /// Local File Cache, relation size tracking, last-written LSN tracking
+    pub(crate) cache: IntegratedCacheWriteAccess<'a>,
+
+    /*** Static configuration ***/
+    /// Stripe size doesn't change after startup. (The shard map is not stored here, it's passed
+    /// directly to the client)
+    stripe_size: Option,
+
+    /*** Metrics ***/
     request_counters: IntCounterVec,
     request_rel_exists_counter: IntCounter,
     request_rel_size_counter: IntCounter,
@@ -146,6 +160,8 @@ pub(super) async fn init(
         request_nblocks_counters.with_label_values(&["rel_zero_extend"]);
 
     CommunicatorWorkerProcessStruct {
+        runtime: tokio::runtime::Handle::current(),
+        stripe_size,
         neon_request_slots: cis.neon_request_slots,
         client,
         cache,
@@ -179,6 +195,22 @@ pub(super) async fn init(
 }
 
 impl<'t> CommunicatorWorkerProcessStruct<'t> {
+    /// Update the configuration
+    pub(super) fn update_shard_map(
+        &self,
+        new_shard_map: HashMap,
+    ) {
+        let shard_spec =
+            ShardSpec::new(new_shard_map, self.stripe_size.clone()).expect("invalid shard spec");
+
+        {
+            let _in_runtime = self.runtime.enter();
+            if let Err(err) = self.client.update_shards(shard_spec) {
+                tracing::error!("could not update shard map: {err:?}");
+            }
+        }
+    }
+
     /// Main loop of the worker process. Receive requests from the backends and process them.
     pub(super) async fn run(&'static self) {
         let mut idxbuf: [u8; 4] = [0; 4];
diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs
index e873555daa..a7bd79fa83 100644
--- a/pgxn/neon/communicator/src/worker_process/worker_interface.rs
+++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs
@@ -51,7 +51,7 @@ pub extern "C" fn communicator_worker_process_launch(
             Some(PathBuf::from(c_str.to_str().unwrap()))
         }
     };
-    let shard_map = parse_shard_map(nshards, shard_map);
+    let shard_map = shard_map_to_hash(nshards, shard_map);
 
     // start main loop
     let runtime = tokio::runtime::Builder::new_multi_thread()
@@ -92,7 +92,7 @@ pub extern "C" fn communicator_worker_process_launch(
 }
 
 /// Convert the "shard map" from an array of C strings, indexed by shard no to a rust HashMap
-fn parse_shard_map(
+fn shard_map_to_hash(
     nshards: u32,
     shard_map: *mut *mut c_char,
 ) -> HashMap {
@@ -124,6 +124,11 @@ pub extern "C" fn communicator_worker_config_reload(
     proc_handle: &'static CommunicatorWorkerProcessStruct<'static>,
     file_cache_size: u64,
+    shard_map: *mut *mut c_char,
+    nshards: u32,
 ) {
     proc_handle.cache.resize_file_cache(file_cache_size as u32);
+
+    let shard_map = shard_map_to_hash(nshards, shard_map);
+    proc_handle.update_shard_map(shard_map);
 }
diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c
index f2cb23cd4e..cc0a1634a7 100644
--- a/pgxn/neon/communicator_new.c
+++ b/pgxn/neon/communicator_new.c
@@ -292,8 +292,8 @@ communicator_new_shmem_startup(void)
 void
 communicator_new_bgworker_main(Datum main_arg)
 {
-    char      **connstrs;
-    shardno_t   num_shards;
+    char      **connstrings;
+    ShardMap    shard_map;
     struct LoggingState *logging;
     char        errbuf[1000];
     int         elevel;
@@ -325,7 +325,14 @@ communicator_new_bgworker_main(Datum main_arg)
 
     BackgroundWorkerUnblockSignals();
 
-    get_shard_map(&connstrs, &num_shards);
+    if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
+    {
+        /* shouldn't happen, as the GUC was verified already */
+        elog(FATAL, "could not parse neon.pageserver_grpc_urls");
+    }
+    connstrings = palloc(shard_map.num_shards * sizeof(char *));
+    for (int i = 0; i < shard_map.num_shards; i++)
+        connstrings[i] = shard_map.connstring[i];
 
     logging = configure_logging();
 
@@ -334,11 +341,12 @@
                                               neon_tenant,
                                               neon_timeline,
                                               neon_auth_token,
-                                              connstrs,
-                                              num_shards,
+                                              connstrings,
+                                              shard_map.num_shards,
                                               neon_stripe_size,
                                               lfc_path,
                                               file_cache_size);
+    pfree(connstrings);
 
     cis = NULL;
     elog(LOG, "communicator threads started");
@@ -357,7 +365,22 @@
         file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
         if (file_cache_size < 100)
             file_cache_size = 100;
-        communicator_worker_config_reload(proc_handle, file_cache_size);
+
+        /* Reload pageserver URLs */
+        if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
+        {
+            /* shouldn't happen, as the GUC was verified already */
+            elog(FATAL, "could not parse neon.pageserver_grpc_urls");
+        }
+        connstrings = palloc(shard_map.num_shards * sizeof(char *));
+        for (int i = 0; i < shard_map.num_shards; i++)
+            connstrings[i] = shard_map.connstring[i];
+
+        communicator_worker_config_reload(proc_handle,
+                                          file_cache_size,
+                                          connstrings,
+                                          shard_map.num_shards);
+        pfree(connstrings);
     }
 
     for (;;)
diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index f64e6ee233..f99084633a 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -87,12 +87,6 @@ static int pageserver_response_log_timeout = 10000;
 /* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
 static int  pageserver_response_disconnect_timeout = 150000;
 
-typedef struct
-{
-    char        connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
-    size_t      num_shards;
-} ShardMap;
-
 /*
  * PagestoreShmemState is kept in shared memory. It contains the connection
  * strings for each shard.
@@ -193,8 +187,8 @@ PagestoreShmemIsValid(void)
  * not valid, returns false. The contents of *result are undefined in
  * that case, and must not be relied on.
  */
-static bool
-ParseShardMap(const char *connstr, ShardMap *result)
+bool
+parse_shard_map(const char *connstr, ShardMap *result)
 {
     const char *p;
     int         nshards = 0;
@@ -248,7 +242,7 @@ CheckPageserverConnstring(char **newval, void **extra, GucSource source)
 {
     char       *p = *newval;
 
-    return ParseShardMap(p, NULL);
+    return parse_shard_map(p, NULL);
 }
 
 static void
@@ -257,11 +251,17 @@ AssignPageserverConnstring(const char *newval, void *extra)
 {
     /*
      * 'neon.pageserver_connstring' is ignored if the new communicator is used.
      * In that case, the shard map is loaded from 'neon.pageserver_grpc_urls'
-     * instead.
+     * instead, and that happens in the communicator process only.
      */
     if (neon_enable_new_communicator)
         return;
 
+    /*
+     * Only postmaster updates the copy in shared memory.
+     */
+    if (!PagestoreShmemIsValid() || IsUnderPostmaster)
+        return;
+
     AssignShardMap(newval);
 }
@@ -272,36 +272,15 @@ CheckPageserverGrpcUrls(char **newval, void **extra, GucSource source)
 {
     char       *p = *newval;
 
-    return ParseShardMap(p, NULL);
+    return parse_shard_map(p, NULL);
 }
 
-static void
-AssignPageserverGrpcUrls(const char *newval, void *extra)
-{
-    /*
-     * 'neon.pageserver_grpc-urls' is ignored if the new communicator is not
-     * used. In that case, the shard map is loaded from 'neon.pageserver_connstring'
-     * instead.
-     */
-    if (!neon_enable_new_communicator)
-        return;
-
-    AssignShardMap(newval);
-}
-
-
 static void
 AssignShardMap(const char *newval)
 {
     ShardMap    shard_map;
 
-    /*
-     * Only postmaster updates the copy in shared memory.
-     */
-    if (!PagestoreShmemIsValid() || IsUnderPostmaster)
-        return;
-
-    if (!ParseShardMap(newval, &shard_map))
+    if (!parse_shard_map(newval, &shard_map))
     {
         /*
          * shouldn't happen, because we already checked the value in
@@ -324,54 +303,6 @@ AssignShardMap(const char *newval)
-/* Return a copy of the whole shard map from shared memory */
-void
-get_shard_map(char ***connstrs_p, shardno_t *num_shards_p)
-{
-    uint64      begin_update_counter;
-    uint64      end_update_counter;
-    ShardMap   *shard_map = &pagestore_shared->shard_map;
-    shardno_t   num_shards;
-    char       *buf;
-    char      **connstrs;
-
-    buf = palloc(MAX_SHARDS*MAX_PAGESERVER_CONNSTRING_SIZE);
-    connstrs = palloc(sizeof(char *) * MAX_SHARDS);
-
-    /*
-     * Postmaster can update the shared memory values concurrently, in which
-     * case we would copy a garbled mix of the old and new values. We will
-     * detect it because the counter's won't match, and retry. But it's
-     * important that we don't do anything within the retry-loop that would
-     * depend on the string having valid contents.
-     */
-    do
-    {
-        char       *p;
-        begin_update_counter = pg_atomic_read_u64(&pagestore_shared->begin_update_counter);
-        end_update_counter = pg_atomic_read_u64(&pagestore_shared->end_update_counter);
-        num_shards = shard_map->num_shards;
-        p = buf;
-        for (int i = 0; i < Min(num_shards, MAX_SHARDS); i++)
-        {
-            strlcpy(p, shard_map->connstring[i], MAX_PAGESERVER_CONNSTRING_SIZE);
-            connstrs[i] = p;
-            p += MAX_PAGESERVER_CONNSTRING_SIZE;
-        }
-        pg_memory_barrier();
-    }
-    while (begin_update_counter != end_update_counter
-           || begin_update_counter != pg_atomic_read_u64(&pagestore_shared->begin_update_counter)
-           || end_update_counter != pg_atomic_read_u64(&pagestore_shared->end_update_counter));
-
-    *connstrs_p = connstrs;
-    *num_shards_p = num_shards;
-}
-
 /*
  * Get the current number of shards, and/or the connection string for a
  * particular shard from the shard map in shared memory.
  */
@@ -1396,7 +1327,6 @@ PagestoreShmemInit(void)
         pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0);
         memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
         AssignPageserverConnstring(pageserver_connstring, NULL);
-        AssignPageserverGrpcUrls(pageserver_grpc_urls, NULL);
     }
 
     NeonPerfCountersShmemInit();
@@ -1462,7 +1392,7 @@ pg_init_libpagestore(void)
                                "",
                                PGC_SIGHUP,
                                0,    /* no flags required */
-                               CheckPageserverGrpcUrls, AssignPageserverGrpcUrls, NULL);
+                               CheckPageserverGrpcUrls, NULL, NULL);
 
     DefineCustomStringVariable("neon.timeline_id",
                                "Neon timeline_id the server is running on",
diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h
index b3c074c9ee..8ec8ce5408 100644
--- a/pgxn/neon/pagestore_client.h
+++ b/pgxn/neon/pagestore_client.h
@@ -246,7 +246,13 @@ extern int32 max_cluster_size;
 extern int  neon_protocol_version;
 extern int  neon_stripe_size;
 
-extern void get_shard_map(char ***connstrs_p, shardno_t *num_shards_p);
+typedef struct
+{
+    char        connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
+    size_t      num_shards;
+} ShardMap;
+
+extern bool parse_shard_map(const char *connstr, ShardMap *result);
 extern shardno_t get_shard_number(BufferTag* tag);
 
 extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);
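
Review note: the parse-and-copy sequence for neon.pageserver_grpc_urls now appears twice in communicator_new_bgworker_main(), once before launching the communicator threads and once in the SIGHUP reload path. A possible follow-up would be to factor it into a small static helper. The sketch below is illustrative only; the helper name load_grpc_connstrings is hypothetical and not part of this patch.

/*
 * Hypothetical helper (not in this patch): parse neon.pageserver_grpc_urls
 * into a palloc'd array of per-shard connection strings. The caller pfrees
 * the returned array; the strings point into *shard_map, which must stay in
 * scope for as long as the array is used.
 */
static char **
load_grpc_connstrings(ShardMap *shard_map)
{
    char      **connstrings;

    if (!parse_shard_map(pageserver_grpc_urls, shard_map))
    {
        /* shouldn't happen, as the GUC was verified already */
        elog(FATAL, "could not parse neon.pageserver_grpc_urls");
    }

    connstrings = palloc(shard_map->num_shards * sizeof(char *));
    for (int i = 0; i < shard_map->num_shards; i++)
        connstrings[i] = shard_map->connstring[i];

    return connstrings;
}

Both call sites would then reduce to "connstrings = load_grpc_connstrings(&shard_map);" followed by the existing launch or reload call and pfree(connstrings).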