Refactor communicator process initialization when new communicator is not used

This should fix the 'cargo test' failures on xlog_utils tests, which
launch Postgres in stand-alone mode, i.e. without setting 'neon_tenant'
This commit is contained in:
Heikki Linnakangas
2025-07-23 12:57:49 +03:00
parent eaec6e2fb4
commit 6cd1295d9f
5 changed files with 212 additions and 154 deletions

View File

@@ -22,71 +22,87 @@ use measured::MetricGroup;
use measured::text::BufferedTextEncoder;
use std::io::ErrorKind;
use std::sync::Arc;
use tokio::net::UnixListener;
use crate::NEON_COMMUNICATOR_SOCKET_NAME;
use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
impl<'a> CommunicatorWorkerProcessStruct<'a> {
/// Launch the listener
pub(crate) async fn launch_control_socket_listener(
&'static self,
) -> Result<(), std::io::Error> {
use axum::routing::get;
let app = Router::new()
.route("/metrics", get(get_metrics))
.route("/autoscaling_metrics", get(get_autoscaling_metrics))
.route("/debug/panic", get(handle_debug_panic))
.route("/debug/dump_cache_map", get(dump_cache_map))
.with_state(self);
enum ControlSocketState<'a> {
Full(&'a CommunicatorWorkerProcessStruct<'a>),
Legacy(LegacyControlSocketState),
}
// If the server is restarted, there might be an old socket still
// lying around. Remove it first.
match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
Ok(()) => {
tracing::warn!("removed stale control socket");
}
Err(e) if e.kind() == ErrorKind::NotFound => {}
Err(e) => {
tracing::error!("could not remove stale control socket: {e:#}");
// Try to proceed anyway. It will likely fail below though.
}
};
struct LegacyControlSocketState;
// Create the unix domain socket and start listening on it
let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
/// Launch the listener
pub(crate) async fn launch_listener(
worker: Option<&'static CommunicatorWorkerProcessStruct<'static>>,
) -> Result<(), std::io::Error> {
use axum::routing::get;
tokio::spawn(async {
tracing::info!("control socket listener spawned");
axum::serve(listener, app)
.await
.expect("axum::serve never returns")
});
let state = match worker {
Some(worker) => ControlSocketState::Full(worker),
None => ControlSocketState::Legacy(LegacyControlSocketState),
};
Ok(())
}
let app = Router::new()
.route("/metrics", get(get_metrics))
.route("/autoscaling_metrics", get(get_autoscaling_metrics))
.route("/debug/panic", get(handle_debug_panic))
.route("/debug/dump_cache_map", get(dump_cache_map))
.with_state(Arc::new(state));
// If the server is restarted, there might be an old socket still
// lying around. Remove it first.
match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
Ok(()) => {
tracing::warn!("removed stale control socket");
}
Err(e) if e.kind() == ErrorKind::NotFound => {}
Err(e) => {
tracing::error!("could not remove stale control socket: {e:#}");
// Try to proceed anyway. It will likely fail below though.
}
};
// Create the unix domain socket and start listening on it
let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
tokio::spawn(async {
tracing::info!("control socket listener spawned");
axum::serve(listener, app)
.await
.expect("axum::serve never returns")
});
Ok(())
}
/// Expose all Prometheus metrics.
async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'_>>) -> Response {
tracing::trace!("/metrics requested");
metrics_to_response(&state).await
async fn get_metrics(State(state): State<Arc<ControlSocketState<'_>>>) -> Response {
match state.as_ref() {
ControlSocketState::Full(worker) => metrics_to_response(&worker).await,
ControlSocketState::Legacy(_) => {
todo!()
}
}
}
/// Expose Prometheus metrics, for use by the autoscaling agent.
///
/// This is a subset of all the metrics.
async fn get_autoscaling_metrics(
State(state): State<&CommunicatorWorkerProcessStruct<'_>>,
) -> Response {
tracing::trace!("/metrics requested");
metrics_to_response(&state.lfc_metrics).await
async fn get_autoscaling_metrics(State(state): State<Arc<ControlSocketState<'_>>>) -> Response {
match state.as_ref() {
ControlSocketState::Full(worker) => metrics_to_response(&worker.lfc_metrics).await,
ControlSocketState::Legacy(_) => {
todo!()
}
}
}
async fn handle_debug_panic(
State(_state): State<&CommunicatorWorkerProcessStruct<'_>>,
) -> Response {
async fn handle_debug_panic(State(_state): State<Arc<ControlSocketState<'_>>>) -> Response {
panic!("test HTTP handler task panic");
}
@@ -104,15 +120,20 @@ async fn metrics_to_response(metrics: &(dyn MetricGroup<BufferedTextEncoder> + S
.unwrap()
}
async fn dump_cache_map(
State(state): State<&CommunicatorWorkerProcessStruct<'static>>,
) -> Response {
let mut buf: Vec<u8> = Vec::new();
state.cache.dump_map(&mut buf);
async fn dump_cache_map(State(state): State<Arc<ControlSocketState<'_>>>) -> Response {
match state.as_ref() {
ControlSocketState::Full(worker) => {
let mut buf: Vec<u8> = Vec::new();
worker.cache.dump_map(&mut buf);
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/text")
.body(Body::from(buf))
.unwrap()
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/text")
.body(Body::from(buf))
.unwrap()
}
ControlSocketState::Legacy(_) => {
todo!()
}
}
}

View File

@@ -11,6 +11,7 @@ use crate::init::CommunicatorInitStruct;
use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess};
use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest};
use crate::neon_request::{INVALID_BLOCK_NUMBER, NeonIORequest, NeonIOResult};
use crate::worker_process::control_socket;
use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable};
use crate::worker_process::lfc_metrics::LfcMetricsCollector;
use pageserver_client_grpc::{PageserverClient, ShardSpec, ShardStripeSize};
@@ -38,7 +39,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
runtime: tokio::runtime::Runtime,
/// Client to communicate with the pageserver
client: Option<PageserverClient>,
client: PageserverClient,
/// Request slots that backends use to send IO requests to the communicator.
neon_request_slots: &'a [NeonIORequestSlot],
@@ -88,12 +89,31 @@ impl RequestTypeLabelGroup {
}
}
/// Launch the communicator process's Rust subsystems
#[allow(clippy::too_many_arguments)]
pub(super) fn init_legacy() -> Result<(), String> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("communicator thread")
.build()
.unwrap();
// Start the listener on the control socket
runtime
.block_on(control_socket::launch_listener(None))
.map_err(|e| e.to_string())?;
Box::leak(Box::new(runtime));
Ok(())
}
/// Launch the communicator process's Rust subsystems
#[allow(clippy::too_many_arguments)]
pub(super) fn init(
cis: CommunicatorInitStruct,
tenant_id: Option<&str>,
timeline_id: Option<&str>,
tenant_id: &str,
timeline_id: &str,
auth_token: Option<&str>,
shard_map: HashMap<utils::shard::ShardIndex, String>,
stripe_size: Option<ShardStripeSize>,
@@ -101,13 +121,9 @@ pub(super) fn init(
file_cache_path: Option<PathBuf>,
) -> Result<&'static CommunicatorWorkerProcessStruct<'static>, String> {
// The caller validated these already
let tenant_id = tenant_id
.map(TenantId::from_str)
.transpose()
let tenant_id = TenantId::from_str(tenant_id)
.map_err(|e| format!("invalid tenant ID: {e}"))?;
let timeline_id = timeline_id
.map(TimelineId::from_str)
.transpose()
let timeline_id = TimelineId::from_str(timeline_id)
.map_err(|e| format!("invalid timeline ID: {e}"))?;
let shard_spec =
ShardSpec::new(shard_map, stripe_size).map_err(|e| format!("invalid shard spec: {e}:"))?;
@@ -137,20 +153,16 @@ pub(super) fn init(
debug!("Initialised integrated cache: {cache:?}");
let client = if let (Some(tenant_id), Some(timeline_id)) = (tenant_id, timeline_id) {
let client = {
let _guard = runtime.enter();
Some(
PageserverClient::new(
tenant_id,
timeline_id,
shard_spec,
auth_token.map(|s| s.to_string()),
None,
)
.expect("could not create client"),
PageserverClient::new(
tenant_id,
timeline_id,
shard_spec,
auth_token.map(|s| s.to_string()),
None,
)
} else {
None
.expect("could not create client")
};
let worker_struct = CommunicatorWorkerProcessStruct {
@@ -188,7 +200,7 @@ pub(super) fn init(
// Start the listener on the control socket
worker_struct
.runtime
.block_on(worker_struct.launch_control_socket_listener())
.block_on(control_socket::launch_listener(Some(worker_struct)))
.map_err(|e| e.to_string())?;
Ok(worker_struct)
@@ -200,13 +212,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
&self,
new_shard_map: HashMap<utils::shard::ShardIndex, String>,
) {
let client = self.client.as_ref().unwrap();
let shard_spec =
ShardSpec::new(new_shard_map, self.stripe_size).expect("invalid shard spec");
{
let _in_runtime = self.runtime.enter();
if let Err(err) = client.update_shards(shard_spec) {
if let Err(err) = self.client.update_shards(shard_spec) {
tracing::error!("could not update shard map: {err:?}");
}
}
@@ -331,11 +342,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
/// Handle one IO request
async fn handle_request(&'static self, request: &'_ NeonIORequest) -> NeonIOResult {
let client = self
.client
.as_ref()
.expect("cannot handle requests without client");
self.request_counters
.inc(RequestTypeLabelGroup::from_req(request));
match request {
@@ -363,7 +369,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
};
let read_lsn = self.request_lsns(not_modified_since);
match client
match self.client
.get_rel_size(page_api::GetRelSizeRequest {
read_lsn,
rel,
@@ -402,7 +408,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
let lsn = Lsn(req.request_lsn);
let file_path = req.destination_file_path();
match client
match self
.client
.get_slru_segment(page_api::GetSlruSegmentRequest {
read_lsn: self.request_lsns(lsn),
kind: req.slru_kind,
@@ -448,7 +455,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
CacheResult::NotFound(lsn) => lsn,
};
match client
match self
.client
.get_db_size(page_api::GetDbSizeRequest {
read_lsn: self.request_lsns(not_modified_since),
db_oid: req.db_oid,
@@ -543,10 +551,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
/// Subroutine to handle a GetPageV request, since it's a little more complicated than
/// others.
async fn handle_get_pagev_request(&'t self, req: &CGetPageVRequest) -> Result<(), i32> {
let client = self
.client
.as_ref()
.expect("cannot handle requests without client");
let rel = req.reltag();
// Check the cache first
@@ -605,7 +609,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
"sending getpage request for blocks {:?} in rel {:?} lsns {}",
block_numbers, rel, read_lsn
);
match client
match self
.client
.get_page(page_api::GetPageRequest {
request_id: req.request_id.into(),
request_class: page_api::GetPageClass::Normal,
@@ -665,10 +670,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
///
/// This is very similar to a GetPageV request, but the results are only stored in the cache.
async fn handle_prefetchv_request(&'static self, req: &CPrefetchVRequest) -> Result<(), i32> {
let client = self
.client
.as_ref()
.expect("cannot handle requests without client");
let rel = req.reltag();
// Check the cache first
@@ -709,7 +710,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
// TODO: spawn separate tasks for these. Use the integrated cache to keep track of the
// in-flight requests
match client
match self
.client
.get_page(page_api::GetPageRequest {
request_id: req.request_id.into(),
request_class: page_api::GetPageClass::Prefetch,

View File

@@ -44,17 +44,13 @@ pub extern "C" fn communicator_worker_process_launch(
) -> Option<&'static CommunicatorWorkerProcessStruct<'static>> {
tracing::warn!("starting threads in rust code");
// Convert the arguments into more convenient Rust types
let tenant_id = if tenant_id.is_null() {
None
} else {
let tenant_id = {
let cstr = unsafe { CStr::from_ptr(tenant_id) };
Some(cstr.to_str().expect("assume UTF-8"))
cstr.to_str().expect("assume UTF-8")
};
let timeline_id = if timeline_id.is_null() {
None
} else {
let timeline_id = {
let cstr = unsafe { CStr::from_ptr(timeline_id) };
Some(cstr.to_str().expect("assume UTF-8"))
cstr.to_str().expect("assume UTF-8")
};
let auth_token = if auth_token.is_null() {
None
@@ -106,6 +102,25 @@ pub extern "C" fn communicator_worker_process_launch(
}
}
#[unsafe(no_mangle)]
pub extern "C" fn communicator_worker_process_launch_legacy(error_p: *mut *const c_char) -> bool {
// The `init` function does all the work.
let result = main_loop::init_legacy();
// On failure, return the error message to the C caller in *error_p.
match result {
Ok(()) => true,
Err(errmsg) => {
let errmsg = CString::new(errmsg).expect("no nuls within error message");
let errmsg = Box::leak(errmsg.into_boxed_c_str());
let p: *const c_char = errmsg.as_ptr();
unsafe { *error_p = p };
false
}
}
}
/// Convert the "shard map" from an array of C strings, indexed by shard no to a rust HashMap
fn shard_map_to_hash(
nshards: u32,

View File

@@ -201,6 +201,10 @@ communicator_new_shmem_size(void)
void
CommunicatorNewShmemRequest(void)
{
if (!neon_use_communicator_worker)
return;
if (neon_tenant[0] == '\0' || neon_timeline[0] == '\0')
return;
RequestAddinShmemSpace(communicator_new_shmem_size());
}
@@ -216,7 +220,14 @@ CommunicatorNewShmemInit(void)
uint64 initial_file_cache_size;
uint64 max_file_cache_size;
/* FIXME: much of this could be skipped if !neon_use_communicator_worker */
if (!neon_use_communicator_worker)
return;
if (neon_tenant[0] == '\0' || neon_timeline[0] == '\0')
{
elog(LOG, "disabling communicator worker because neon_tenant is empty");
return;
}
rc = pipe(pipefd);
if (rc != 0)
@@ -276,7 +287,6 @@ void
notify_proc_unsafe(int procno)
{
SetLatch(&communicator_shmem_ptr->backends[procno].io_completion_latch);
}
/**** Backend functions. These run in each backend ****/

View File

@@ -82,6 +82,7 @@ communicator_new_bgworker_main(Datum main_arg)
struct LoggingReceiver *logging;
const char *errmsg = NULL;
const struct CommunicatorWorkerProcessStruct *proc_handle;
bool success;
/*
* Pretend that this process is a WAL sender. That affects the shutdown
@@ -103,20 +104,6 @@ communicator_new_bgworker_main(Datum main_arg)
BackgroundWorkerUnblockSignals();
/* lfc_size_limit is in MBs */
file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
if (file_cache_size < 100)
file_cache_size = 100;
if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
{
/* shouldn't happen, as the GUC was verified already */
elog(FATAL, "could not parse neon.pageserver_grpcs_urls");
}
connstrings = palloc(shard_map.num_shards * sizeof(char *));
for (int i = 0; i < shard_map.num_shards; i++)
connstrings[i] = shard_map.connstring[i];
/*
* By default, INFO messages are not printed to the log. We want
* `tracing::info!` messages emitted from the communicator to be printed,
@@ -131,21 +118,41 @@ communicator_new_bgworker_main(Datum main_arg)
logging = communicator_worker_configure_logging();
Assert(cis != NULL);
proc_handle = communicator_worker_process_launch(
cis,
neon_tenant[0] == '\0' ? NULL : neon_tenant,
neon_timeline[0] == '\0' ? NULL : neon_timeline,
neon_auth_token,
connstrings,
shard_map.num_shards,
neon_stripe_size,
lfc_path,
file_cache_size,
&errmsg);
pfree(connstrings);
cis = NULL;
if (proc_handle == NULL)
if (cis != NULL)
{
/* lfc_size_limit is in MBs */
file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
if (file_cache_size < 100)
file_cache_size = 100;
if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
{
/* shouldn't happen, as the GUC was verified already */
elog(FATAL, "could not parse neon.pageserver_grpcs_urls");
}
connstrings = palloc(shard_map.num_shards * sizeof(char *));
for (int i = 0; i < shard_map.num_shards; i++)
connstrings[i] = shard_map.connstring[i];
proc_handle = communicator_worker_process_launch(
cis,
neon_tenant,
neon_timeline,
neon_auth_token,
connstrings,
shard_map.num_shards,
neon_stripe_size,
lfc_path,
file_cache_size,
&errmsg);
pfree(connstrings);
cis = NULL;
success = proc_handle != NULL;
}
else
{
success = communicator_worker_process_launch_legacy(&errmsg);
}
if (!success)
{
/*
* Something went wrong. Before exiting, forward any log messages that
@@ -206,26 +213,29 @@ communicator_new_bgworker_main(Datum main_arg)
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
/* lfc_size_limit is in MBs */
file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
if (file_cache_size < 100)
file_cache_size = 100;
/* Reload pageserver URLs */
if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
if (proc_handle)
{
/* shouldn't happen, as the GUC was verified already */
elog(FATAL, "could not parse neon.pageserver_grpcs_urls");
}
connstrings = palloc(shard_map.num_shards * sizeof(char *));
for (int i = 0; i < shard_map.num_shards; i++)
connstrings[i] = shard_map.connstring[i];
/* lfc_size_limit is in MBs */
file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
if (file_cache_size < 100)
file_cache_size = 100;
communicator_worker_config_reload(proc_handle,
file_cache_size,
connstrings,
shard_map.num_shards);
pfree(connstrings);
/* Reload pageserver URLs */
if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
{
/* shouldn't happen, as the GUC was verified already */
elog(FATAL, "could not parse neon.pageserver_grpcs_urls");
}
connstrings = palloc(shard_map.num_shards * sizeof(char *));
for (int i = 0; i < shard_map.num_shards; i++)
connstrings[i] = shard_map.connstring[i];
communicator_worker_config_reload(proc_handle,
file_cache_size,
connstrings,
shard_map.num_shards);
pfree(connstrings);
}
}
duration = TimestampDifferenceMilliseconds(before, GetCurrentTimestamp());