Implement shard map live-update

Heikki Linnakangas
2025-07-10 12:25:15 +03:00
parent ed4652b65b
commit b5b1db29bb
5 changed files with 91 additions and 95 deletions

View File

@@ -29,17 +29,31 @@ use tracing::{error, info, info_span, trace};
use utils::lsn::Lsn;
pub struct CommunicatorWorkerProcessStruct<'a> {
neon_request_slots: &'a [NeonIOHandle],
/// Tokio runtime that the main loop and any other related tasks run in.
runtime: tokio::runtime::Handle,
/// Client to communicate with the pageserver
client: PageserverClient,
pub(crate) cache: IntegratedCacheWriteAccess<'a>,
/// Request slots that backends use to send IO requests to the communicator.
neon_request_slots: &'a [NeonIOHandle],
/// Notification pipe. Backends use this to notify the communicator that a request is waiting to
/// be processed in one of the request slots.
submission_pipe_read_fd: OwnedFd,
/// Locking table for all in-progress IO requests.
in_progress_table: RequestInProgressTable,
// Metrics
/// Local File Cache, relation size tracking, last-written LSN tracking
pub(crate) cache: IntegratedCacheWriteAccess<'a>,
/*** Static configuration ***/
/// Stripe size doesn't change after startup. (The shard map is not stored here, it's passed
/// directly to the client)
stripe_size: Option<ShardStripeSize>,
/*** Metrics ***/
request_counters: IntCounterVec,
request_rel_exists_counter: IntCounter,
request_rel_size_counter: IntCounter,
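
Note: the request slots and notification pipe above form a simple doorbell protocol: a backend fills a free entry in neon_request_slots, then writes that slot's index into the submission pipe; the worker wakes up, reads the index, and services the slot (the 4-byte idxbuf in run() below matches this). A minimal sketch of the reader side, assuming the index is written in native byte order (the slot handling itself is stubbed out):

use std::fs::File;
use std::io::Read;
use std::os::fd::OwnedFd;

/// Drain 4-byte slot indexes from the notification pipe and dispatch them.
/// `handle_slot` stands in for looking up the request slot and processing it.
fn drain_submissions(pipe: OwnedFd, mut handle_slot: impl FnMut(u32)) -> std::io::Result<()> {
    let mut pipe = File::from(pipe);
    let mut idxbuf = [0u8; 4];
    loop {
        // Blocks until some backend rings the doorbell.
        pipe.read_exact(&mut idxbuf)?;
        // Assumption: the index is encoded in native byte order, since
        // writer and reader live on the same machine.
        handle_slot(u32::from_ne_bytes(idxbuf));
    }
}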
@@ -146,6 +160,8 @@ pub(super) async fn init(
request_nblocks_counters.with_label_values(&["rel_zero_extend"]);
CommunicatorWorkerProcessStruct {
runtime: tokio::runtime::Handle::current(),
stripe_size,
neon_request_slots: cis.neon_request_slots,
client,
cache,
@@ -179,6 +195,22 @@ pub(super) async fn init(
}
impl<'t> CommunicatorWorkerProcessStruct<'t> {
/// Update the shard map configuration.
pub(super) fn update_shard_map(
&self,
new_shard_map: HashMap<utils::shard::ShardIndex, String>,
) {
let shard_spec =
ShardSpec::new(new_shard_map, self.stripe_size.clone()).expect("invalid shard spec");
{
let _in_runtime = self.runtime.enter();
if let Err(err) = self.client.update_shards(shard_spec) {
tracing::error!("could not update shard map: {err:?}");
}
}
}
/// Main loop of the worker process. Receive requests from the backends and process them.
pub(super) async fn run(&'static self) {
let mut idxbuf: [u8; 4] = [0; 4];
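
Note: update_shard_map above runs on a plain C-callback thread, not inside the Tokio runtime, which is why it wraps the client call in runtime.enter(). A minimal sketch of that pattern, using only the real Tokio APIs Handle::current()/enter() and tokio::spawn (the Worker type and do_update are illustrative):

use tokio::runtime::{Handle, Runtime};

struct Worker {
    /// Captured inside the runtime at startup, as in init() above.
    runtime: Handle,
}

impl Worker {
    /// Called from a non-async (FFI) context.
    fn do_update(&self) {
        // Entering the runtime makes runtime-dependent calls such as
        // tokio::spawn() valid on this otherwise plain OS thread.
        let _guard = self.runtime.enter();
        tokio::spawn(async {
            // ... perform the actual update asynchronously ...
        });
    }
}

fn main() {
    let rt = Runtime::new().unwrap();
    let worker = Worker { runtime: rt.handle().clone() };
    worker.do_update();
    // Sketch only: a real caller would keep the runtime alive until
    // the spawned task completes.
}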

View File

@@ -51,7 +51,7 @@ pub extern "C" fn communicator_worker_process_launch(
Some(PathBuf::from(c_str.to_str().unwrap()))
}
};
let shard_map = parse_shard_map(nshards, shard_map);
let shard_map = shard_map_to_hash(nshards, shard_map);
// start main loop
let runtime = tokio::runtime::Builder::new_multi_thread()
@@ -92,7 +92,7 @@ pub extern "C" fn communicator_worker_process_launch(
}
/// Convert the "shard map" from an array of C strings, indexed by shard number, to a Rust HashMap
fn parse_shard_map(
fn shard_map_to_hash(
nshards: u32,
shard_map: *mut *mut c_char,
) -> HashMap<utils::shard::ShardIndex, String> {
@@ -124,6 +124,11 @@ fn parse_shard_map(
pub extern "C" fn communicator_worker_config_reload(
proc_handle: &'static CommunicatorWorkerProcessStruct<'static>,
file_cache_size: u64,
shard_map: *mut *mut c_char,
nshards: u32,
) {
proc_handle.cache.resize_file_cache(file_cache_size as u32);
let shard_map = shard_map_to_hash(nshards, shard_map);
proc_handle.update_shard_map(shard_map);
}
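
Note: the renamed shard_map_to_hash is a conventional walk over a C array of NUL-terminated strings. A self-contained sketch of the same conversion, using a plain u32 key in place of utils::shard::ShardIndex (an assumption to keep the example standalone):

use std::collections::HashMap;
use std::ffi::{c_char, CStr};

/// Convert `nshards` C strings, indexed by shard number, into a map.
/// Safety: the caller must pass a valid array of `nshards` readable pointers.
unsafe fn shard_map_to_hash(nshards: u32, shard_map: *const *const c_char) -> HashMap<u32, String> {
    let mut result = HashMap::new();
    for i in 0..nshards {
        let p = *shard_map.add(i as usize);
        // Assumption: a null entry means the slot is unset and can be skipped.
        if p.is_null() {
            continue;
        }
        let url = CStr::from_ptr(p)
            .to_str()
            .expect("connection string is not valid UTF-8")
            .to_owned();
        result.insert(i, url);
    }
    result
}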

View File

@@ -292,8 +292,8 @@ communicator_new_shmem_startup(void)
void
communicator_new_bgworker_main(Datum main_arg)
{
char **connstrs;
shardno_t num_shards;
char **connstrings;
ShardMap shard_map;
struct LoggingState *logging;
char errbuf[1000];
int elevel;
@@ -325,7 +325,14 @@ communicator_new_bgworker_main(Datum main_arg)
BackgroundWorkerUnblockSignals();
get_shard_map(&connstrs, &num_shards);
if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
{
/* shouldn't happen, as the GUC was verified already */
elog(FATAL, "could not parse neon.pageserver_grpc_urls");
}
connstrings = palloc(shard_map.num_shards * sizeof(char *));
for (int i = 0; i < shard_map.num_shards; i++)
connstrings[i] = shard_map.connstring[i];
logging = configure_logging();
@@ -334,11 +341,12 @@ communicator_new_bgworker_main(Datum main_arg)
neon_tenant,
neon_timeline,
neon_auth_token,
connstrs,
num_shards,
connstrings,
shard_map.num_shards,
neon_stripe_size,
lfc_path,
file_cache_size);
pfree(connstrings);
cis = NULL;
elog(LOG, "communicator threads started");
@@ -357,7 +365,22 @@ communicator_new_bgworker_main(Datum main_arg)
file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
if (file_cache_size < 100)
file_cache_size = 100;
communicator_worker_config_reload(proc_handle, file_cache_size);
/* Reload pageserver URLs */
if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
{
/* shouldn't happen, as the GUC was verified already */
elog(FATAL, "could not parse neon.pageserver_grpc_urls");
}
connstrings = palloc(shard_map.num_shards * sizeof(char *));
for (int i = 0; i < shard_map.num_shards; i++)
connstrings[i] = shard_map.connstring[i];
communicator_worker_config_reload(proc_handle,
file_cache_size,
connstrings,
shard_map.num_shards);
pfree(connstrings);
}
for (;;)

View File

@@ -87,12 +87,6 @@ static int pageserver_response_log_timeout = 10000;
/* 2.5 minutes. A bit higher than the highest default TCP retransmission timeout */
static int pageserver_response_disconnect_timeout = 150000;
typedef struct
{
char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
size_t num_shards;
} ShardMap;
/*
* PagestoreShmemState is kept in shared memory. It contains the connection
* strings for each shard.
@@ -193,8 +187,8 @@ PagestoreShmemIsValid(void)
* not valid, returns false. The contents of *result are undefined in
* that case, and must not be relied on.
*/
static bool
ParseShardMap(const char *connstr, ShardMap *result)
bool
parse_shard_map(const char *connstr, ShardMap *result)
{
const char *p;
int nshards = 0;
@@ -248,7 +242,7 @@ CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
char *p = *newval;
return ParseShardMap(p, NULL);
return parse_shard_map(p, NULL);
}
static void
@@ -257,11 +251,17 @@ AssignPageserverConnstring(const char *newval, void *extra)
/*
* 'neon.pageserver_connstring' is ignored if the new communicator is used.
* In that case, the shard map is loaded from 'neon.pageserver_grpc_urls'
* instead.
* instead, and that happens in the communicator process only.
*/
if (neon_enable_new_communicator)
return;
/*
* Only postmaster updates the copy in shared memory.
*/
if (!PagestoreShmemIsValid() || IsUnderPostmaster)
return;
AssignShardMap(newval);
}
@@ -272,36 +272,15 @@ CheckPageserverGrpcUrls(char **newval, void **extra, GucSource source)
{
char *p = *newval;
return ParseShardMap(p, NULL);
return parse_shard_map(p, NULL);
}
static void
AssignPageserverGrpcUrls(const char *newval, void *extra)
{
/*
* 'neon.pageserver_grpc_urls' is ignored if the new communicator is not
* used. In that case, the shard map is loaded from 'neon.pageserver_connstring'
* instead.
*/
if (!neon_enable_new_communicator)
return;
AssignShardMap(newval);
}
static void
AssignShardMap(const char *newval)
{
ShardMap shard_map;
/*
* Only postmaster updates the copy in shared memory.
*/
if (!PagestoreShmemIsValid() || IsUnderPostmaster)
return;
if (!ParseShardMap(newval, &shard_map))
if (!parse_shard_map(newval, &shard_map))
{
/*
* shouldn't happen, because we already checked the value in
@@ -324,54 +303,6 @@ AssignShardMap(const char *newval)
}
}
/* Return a copy of the whole shard map from shared memory */
void
get_shard_map(char ***connstrs_p, shardno_t *num_shards_p)
{
uint64 begin_update_counter;
uint64 end_update_counter;
ShardMap *shard_map = &pagestore_shared->shard_map;
shardno_t num_shards;
char *buf;
char **connstrs;
buf = palloc(MAX_SHARDS * MAX_PAGESERVER_CONNSTRING_SIZE);
connstrs = palloc(sizeof(char *) * MAX_SHARDS);
/*
* Postmaster can update the shared memory values concurrently, in which
* case we would copy a garbled mix of the old and new values. We will
* detect it because the counters won't match, and retry. But it's
* important that we don't do anything within the retry-loop that would
* depend on the string having valid contents.
*/
do
{
char *p;
begin_update_counter = pg_atomic_read_u64(&pagestore_shared->begin_update_counter);
end_update_counter = pg_atomic_read_u64(&pagestore_shared->end_update_counter);
num_shards = shard_map->num_shards;
p = buf;
for (int i = 0; i < Min(num_shards, MAX_SHARDS); i++)
{
strlcpy(p, shard_map->connstring[i], MAX_PAGESERVER_CONNSTRING_SIZE);
connstrs[i] = p;
p += MAX_PAGESERVER_CONNSTRING_SIZE;
}
pg_memory_barrier();
}
while (begin_update_counter != end_update_counter
|| begin_update_counter != pg_atomic_read_u64(&pagestore_shared->begin_update_counter)
|| end_update_counter != pg_atomic_read_u64(&pagestore_shared->end_update_counter));
*connstrs_p = connstrs;
*num_shards_p = num_shards;
}
/*
* Get the current number of shards, and/or the connection string for a
* particular shard from the shard map in shared memory.
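
Note: the get_shard_map function deleted above read the shared-memory shard map with a seqlock-style protocol: the writer bumps begin_update_counter before and end_update_counter after an update, so a reader that sees unequal or changed counters knows its copy may be torn and retries. A minimal sketch of that counter protocol (single-threaded borrow rules here for simplicity; a real shared-memory version would need UnsafeCell or raw pointers for the payload):

use std::sync::atomic::{fence, AtomicU64, Ordering};

struct SharedMap {
    begin_update_counter: AtomicU64,
    end_update_counter: AtomicU64,
    payload: [u8; 64], // stand-in for the connection-string array
}

impl SharedMap {
    /// Writer (postmaster): bump begin, mutate, bump end.
    fn update(&mut self, new: [u8; 64]) {
        self.begin_update_counter.fetch_add(1, Ordering::Release);
        self.payload = new;
        self.end_update_counter.fetch_add(1, Ordering::Release);
    }

    /// Reader (any backend): copy optimistically, retry on a torn read.
    fn read(&self) -> [u8; 64] {
        loop {
            let begin = self.begin_update_counter.load(Ordering::Acquire);
            let end = self.end_update_counter.load(Ordering::Acquire);
            let copy = self.payload; // possibly torn; don't interpret yet
            fence(Ordering::Acquire); // order the copy before the re-checks
            if begin == end
                && begin == self.begin_update_counter.load(Ordering::Relaxed)
                && end == self.end_update_counter.load(Ordering::Relaxed)
            {
                return copy; // counters stable across the copy: it's consistent
            }
        }
    }
}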
@@ -1396,7 +1327,6 @@ PagestoreShmemInit(void)
pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0);
memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
AssignPageserverConnstring(pageserver_connstring, NULL);
AssignPageserverGrpcUrls(pageserver_grpc_urls, NULL);
}
NeonPerfCountersShmemInit();
@@ -1462,7 +1392,7 @@ pg_init_libpagestore(void)
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverGrpcUrls, AssignPageserverGrpcUrls, NULL);
CheckPageserverGrpcUrls, NULL, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",

View File

@@ -246,7 +246,13 @@ extern int32 max_cluster_size;
extern int neon_protocol_version;
extern int neon_stripe_size;
extern void get_shard_map(char ***connstrs_p, shardno_t *num_shards_p);
typedef struct
{
char connstring[MAX_SHARDS][MAX_PAGESERVER_CONNSTRING_SIZE];
size_t num_shards;
} ShardMap;
extern bool parse_shard_map(const char *connstr, ShardMap *result);
extern shardno_t get_shard_number(BufferTag* tag);
extern const f_smgr *smgr_neon(ProcNumber backend, NRelFileInfo rinfo);