diff --git a/pgxn/neon/communicator/src/worker_process/control_socket.rs b/pgxn/neon/communicator/src/worker_process/control_socket.rs
index f1d052778b..00e9f9bf11 100644
--- a/pgxn/neon/communicator/src/worker_process/control_socket.rs
+++ b/pgxn/neon/communicator/src/worker_process/control_socket.rs
@@ -22,71 +22,87 @@
 use measured::MetricGroup;
 use measured::text::BufferedTextEncoder;
 use std::io::ErrorKind;
+use std::sync::Arc;
 use tokio::net::UnixListener;
 
 use crate::NEON_COMMUNICATOR_SOCKET_NAME;
 use crate::worker_process::main_loop::CommunicatorWorkerProcessStruct;
 
-impl<'a> CommunicatorWorkerProcessStruct<'a> {
-    /// Launch the listener
-    pub(crate) async fn launch_control_socket_listener(
-        &'static self,
-    ) -> Result<(), std::io::Error> {
-        use axum::routing::get;
-        let app = Router::new()
-            .route("/metrics", get(get_metrics))
-            .route("/autoscaling_metrics", get(get_autoscaling_metrics))
-            .route("/debug/panic", get(handle_debug_panic))
-            .route("/debug/dump_cache_map", get(dump_cache_map))
-            .with_state(self);
+enum ControlSocketState<'a> {
+    Full(&'a CommunicatorWorkerProcessStruct<'a>),
+    Legacy(LegacyControlSocketState),
+}
 
-        // If the server is restarted, there might be an old socket still
-        // lying around. Remove it first.
-        match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
-            Ok(()) => {
-                tracing::warn!("removed stale control socket");
-            }
-            Err(e) if e.kind() == ErrorKind::NotFound => {}
-            Err(e) => {
-                tracing::error!("could not remove stale control socket: {e:#}");
-                // Try to proceed anyway. It will likely fail below though.
-            }
-        };
+struct LegacyControlSocketState;
 
-        // Create the unix domain socket and start listening on it
-        let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
+/// Launch the listener
+pub(crate) async fn launch_listener(
+    worker: Option<&'static CommunicatorWorkerProcessStruct<'static>>,
+) -> Result<(), std::io::Error> {
+    use axum::routing::get;
 
-        tokio::spawn(async {
-            tracing::info!("control socket listener spawned");
-            axum::serve(listener, app)
-                .await
-                .expect("axum::serve never returns")
-        });
+    let state = match worker {
+        Some(worker) => ControlSocketState::Full(worker),
+        None => ControlSocketState::Legacy(LegacyControlSocketState),
+    };
 
-        Ok(())
-    }
+    let app = Router::new()
+        .route("/metrics", get(get_metrics))
+        .route("/autoscaling_metrics", get(get_autoscaling_metrics))
+        .route("/debug/panic", get(handle_debug_panic))
+        .route("/debug/dump_cache_map", get(dump_cache_map))
+        .with_state(Arc::new(state));
+
+    // If the server is restarted, there might be an old socket still
+    // lying around. Remove it first.
+    match std::fs::remove_file(NEON_COMMUNICATOR_SOCKET_NAME) {
+        Ok(()) => {
+            tracing::warn!("removed stale control socket");
+        }
+        Err(e) if e.kind() == ErrorKind::NotFound => {}
+        Err(e) => {
+            tracing::error!("could not remove stale control socket: {e:#}");
+            // Try to proceed anyway. It will likely fail below though.
+        }
+    };
+
+    // Create the unix domain socket and start listening on it
+    let listener = UnixListener::bind(NEON_COMMUNICATOR_SOCKET_NAME)?;
+
+    tokio::spawn(async {
+        tracing::info!("control socket listener spawned");
+        axum::serve(listener, app)
+            .await
+            .expect("axum::serve never returns")
+    });
+
+    Ok(())
 }
 
 /// Expose all Prometheus metrics.
-async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'_>>) -> Response {
-    tracing::trace!("/metrics requested");
-    metrics_to_response(&state).await
+async fn get_metrics(State(state): State<Arc<ControlSocketState<'static>>>) -> Response {
+    match state.as_ref() {
+        ControlSocketState::Full(worker) => metrics_to_response(&worker).await,
+        ControlSocketState::Legacy(_) => {
+            todo!()
+        }
+    }
 }
 
 /// Expose Prometheus metrics, for use by the autoscaling agent.
 ///
 /// This is a subset of all the metrics.
-async fn get_autoscaling_metrics(
-    State(state): State<&CommunicatorWorkerProcessStruct<'_>>,
-) -> Response {
-    tracing::trace!("/metrics requested");
-    metrics_to_response(&state.lfc_metrics).await
+async fn get_autoscaling_metrics(State(state): State<Arc<ControlSocketState<'static>>>) -> Response {
+    match state.as_ref() {
+        ControlSocketState::Full(worker) => metrics_to_response(&worker.lfc_metrics).await,
+        ControlSocketState::Legacy(_) => {
+            todo!()
+        }
+    }
 }
 
-async fn handle_debug_panic(
-    State(_state): State<&CommunicatorWorkerProcessStruct<'_>>,
-) -> Response {
+async fn handle_debug_panic(State(_state): State<Arc<ControlSocketState<'static>>>) -> Response {
     panic!("test HTTP handler task panic");
 }
 
@@ -104,15 +120,20 @@ async fn metrics_to_response(metrics: &(dyn MetricGroup + S
         .unwrap()
 }
 
-async fn dump_cache_map(
-    State(state): State<&CommunicatorWorkerProcessStruct<'static>>,
-) -> Response {
-    let mut buf: Vec<u8> = Vec::new();
-    state.cache.dump_map(&mut buf);
+async fn dump_cache_map(State(state): State<Arc<ControlSocketState<'static>>>) -> Response {
+    match state.as_ref() {
+        ControlSocketState::Full(worker) => {
+            let mut buf: Vec<u8> = Vec::new();
+            worker.cache.dump_map(&mut buf);
 
-    Response::builder()
-        .status(StatusCode::OK)
-        .header(CONTENT_TYPE, "application/text")
-        .body(Body::from(buf))
-        .unwrap()
+            Response::builder()
+                .status(StatusCode::OK)
+                .header(CONTENT_TYPE, "application/text")
+                .body(Body::from(buf))
+                .unwrap()
+        }
+        ControlSocketState::Legacy(_) => {
+            todo!()
+        }
+    }
 }
diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs
index 0f20af6ba2..cb07e6a592 100644
--- a/pgxn/neon/communicator/src/worker_process/main_loop.rs
+++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs
@@ -11,6 +11,7 @@ use crate::init::CommunicatorInitStruct;
 use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess};
 use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest};
 use crate::neon_request::{INVALID_BLOCK_NUMBER, NeonIORequest, NeonIOResult};
+use crate::worker_process::control_socket;
 use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable};
 use crate::worker_process::lfc_metrics::LfcMetricsCollector;
 use pageserver_client_grpc::{PageserverClient, ShardSpec, ShardStripeSize};
@@ -38,7 +39,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
     runtime: tokio::runtime::Runtime,
 
     /// Client to communicate with the pageserver
-    client: Option<PageserverClient>,
+    client: PageserverClient,
 
     /// Request slots that backends use to send IO requests to the communicator.
     neon_request_slots: &'a [NeonIORequestSlot],
@@ -88,12 +89,31 @@ impl RequestTypeLabelGroup {
     }
 }
 
+/// Launch the communicator process's Rust subsystems
+#[allow(clippy::too_many_arguments)]
+pub(super) fn init_legacy() -> Result<(), String> {
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .thread_name("communicator thread")
+        .build()
+        .unwrap();
+
+    // Start the listener on the control socket
+    runtime
+        .block_on(control_socket::launch_listener(None))
+        .map_err(|e| e.to_string())?;
+
+    Box::leak(Box::new(runtime));
+
+    Ok(())
+}
+
 /// Launch the communicator process's Rust subsystems
 #[allow(clippy::too_many_arguments)]
 pub(super) fn init(
     cis: CommunicatorInitStruct,
-    tenant_id: Option<&str>,
-    timeline_id: Option<&str>,
+    tenant_id: &str,
+    timeline_id: &str,
     auth_token: Option<&str>,
     shard_map: HashMap,
     stripe_size: Option,
@@ -101,13 +121,9 @@ pub(super) fn init(
     file_cache_path: Option,
 ) -> Result<&'static CommunicatorWorkerProcessStruct<'static>, String> {
     // The caller validated these already
-    let tenant_id = tenant_id
-        .map(TenantId::from_str)
-        .transpose()
+    let tenant_id = TenantId::from_str(tenant_id)
         .map_err(|e| format!("invalid tenant ID: {e}"))?;
-    let timeline_id = timeline_id
-        .map(TimelineId::from_str)
-        .transpose()
+    let timeline_id = TimelineId::from_str(timeline_id)
         .map_err(|e| format!("invalid timeline ID: {e}"))?;
     let shard_spec =
         ShardSpec::new(shard_map, stripe_size).map_err(|e| format!("invalid shard spec: {e}:"))?;
@@ -137,20 +153,16 @@ pub(super) fn init(
 
     debug!("Initialised integrated cache: {cache:?}");
 
-    let client = if let (Some(tenant_id), Some(timeline_id)) = (tenant_id, timeline_id) {
+    let client = {
         let _guard = runtime.enter();
-        Some(
-            PageserverClient::new(
-                tenant_id,
-                timeline_id,
-                shard_spec,
-                auth_token.map(|s| s.to_string()),
-                None,
-            )
-            .expect("could not create client"),
+        PageserverClient::new(
+            tenant_id,
+            timeline_id,
+            shard_spec,
+            auth_token.map(|s| s.to_string()),
+            None,
         )
-    } else {
-        None
+        .expect("could not create client")
     };
 
     let worker_struct = CommunicatorWorkerProcessStruct {
@@ -188,7 +200,7 @@ pub(super) fn init(
     // Start the listener on the control socket
     worker_struct
         .runtime
-        .block_on(worker_struct.launch_control_socket_listener())
+        .block_on(control_socket::launch_listener(Some(worker_struct)))
        .map_err(|e| e.to_string())?;
 
     Ok(worker_struct)
@@ -200,13 +212,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
         &self,
         new_shard_map: HashMap,
     ) {
-        let client = self.client.as_ref().unwrap();
         let shard_spec =
             ShardSpec::new(new_shard_map, self.stripe_size).expect("invalid shard spec");
 
         {
             let _in_runtime = self.runtime.enter();
-            if let Err(err) = client.update_shards(shard_spec) {
+            if let Err(err) = self.client.update_shards(shard_spec) {
                 tracing::error!("could not update shard map: {err:?}");
             }
         }
@@ -331,11 +342,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 
     /// Handle one IO request
     async fn handle_request(&'static self, request: &'_ NeonIORequest) -> NeonIOResult {
-        let client = self
-            .client
-            .as_ref()
-            .expect("cannot handle requests without client");
-
         self.request_counters
             .inc(RequestTypeLabelGroup::from_req(request));
         match request {
@@ -363,7 +369,7 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                 };
                 let read_lsn = self.request_lsns(not_modified_since);
 
-                match client
+                match self.client
                     .get_rel_size(page_api::GetRelSizeRequest {
                         read_lsn,
                         rel,
@@ -402,7 +408,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                 let lsn = Lsn(req.request_lsn);
                 let file_path = req.destination_file_path();
 
-                match client
+                match self
+                    .client
                     .get_slru_segment(page_api::GetSlruSegmentRequest {
                         read_lsn: self.request_lsns(lsn),
                         kind: req.slru_kind,
@@ -448,7 +455,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
                     CacheResult::NotFound(lsn) => lsn,
                 };
 
-                match client
+                match self
+                    .client
                     .get_db_size(page_api::GetDbSizeRequest {
                         read_lsn: self.request_lsns(not_modified_since),
                         db_oid: req.db_oid,
@@ -543,10 +551,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
     /// Subroutine to handle a GetPageV request, since it's a little more complicated than
     /// others.
     async fn handle_get_pagev_request(&'t self, req: &CGetPageVRequest) -> Result<(), i32> {
-        let client = self
-            .client
-            .as_ref()
-            .expect("cannot handle requests without client");
        let rel = req.reltag();
 
         // Check the cache first
@@ -605,7 +609,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
             "sending getpage request for blocks {:?} in rel {:?} lsns {}",
             block_numbers, rel, read_lsn
         );
-        match client
+        match self
+            .client
             .get_page(page_api::GetPageRequest {
                 request_id: req.request_id.into(),
                 request_class: page_api::GetPageClass::Normal,
@@ -665,10 +670,6 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
     ///
     /// This is very similar to a GetPageV request, but the results are only stored in the cache.
     async fn handle_prefetchv_request(&'static self, req: &CPrefetchVRequest) -> Result<(), i32> {
-        let client = self
-            .client
-            .as_ref()
-            .expect("cannot handle requests without client");
        let rel = req.reltag();
 
         // Check the cache first
@@ -709,7 +710,8 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
 
         // TODO: spawn separate tasks for these. Use the integrated cache to keep track of the
         // in-flight requests
-        match client
+        match self
+            .client
             .get_page(page_api::GetPageRequest {
                 request_id: req.request_id.into(),
                 request_class: page_api::GetPageClass::Prefetch,
diff --git a/pgxn/neon/communicator/src/worker_process/worker_interface.rs b/pgxn/neon/communicator/src/worker_process/worker_interface.rs
index c9c985c1b7..963f46a5d4 100644
--- a/pgxn/neon/communicator/src/worker_process/worker_interface.rs
+++ b/pgxn/neon/communicator/src/worker_process/worker_interface.rs
@@ -44,17 +44,13 @@ pub extern "C" fn communicator_worker_process_launch(
 ) -> Option<&'static CommunicatorWorkerProcessStruct<'static>> {
     tracing::warn!("starting threads in rust code");
     // Convert the arguments into more convenient Rust types
-    let tenant_id = if tenant_id.is_null() {
-        None
-    } else {
+    let tenant_id = {
         let cstr = unsafe { CStr::from_ptr(tenant_id) };
-        Some(cstr.to_str().expect("assume UTF-8"))
+        cstr.to_str().expect("assume UTF-8")
     };
-    let timeline_id = if timeline_id.is_null() {
-        None
-    } else {
+    let timeline_id = {
         let cstr = unsafe { CStr::from_ptr(timeline_id) };
-        Some(cstr.to_str().expect("assume UTF-8"))
+        cstr.to_str().expect("assume UTF-8")
     };
     let auth_token = if auth_token.is_null() {
         None
@@ -106,6 +102,25 @@ pub extern "C" fn communicator_worker_process_launch(
     }
 }
 
+#[unsafe(no_mangle)]
+pub extern "C" fn communicator_worker_process_launch_legacy(error_p: *mut *const c_char) -> bool {
+    // The `init` function does all the work.
+    let result = main_loop::init_legacy();
+
+    // On failure, return the error message to the C caller in *error_p.
+    match result {
+        Ok(()) => true,
+        Err(errmsg) => {
+            let errmsg = CString::new(errmsg).expect("no nuls within error message");
+            let errmsg = Box::leak(errmsg.into_boxed_c_str());
+            let p: *const c_char = errmsg.as_ptr();
+
+            unsafe { *error_p = p };
+            false
+        }
+    }
+}
+
 /// Convert the "shard map" from an array of C strings, indexed by shard no to a rust HashMap
 fn shard_map_to_hash(
     nshards: u32,
diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c
index e94d7bc36b..b951ecff74 100644
--- a/pgxn/neon/communicator_new.c
+++ b/pgxn/neon/communicator_new.c
@@ -201,6 +201,10 @@ communicator_new_shmem_size(void)
 void
 CommunicatorNewShmemRequest(void)
 {
+	if (!neon_use_communicator_worker)
+		return;
+	if (neon_tenant[0] == '\0' || neon_timeline[0] == '\0')
+		return;
 	RequestAddinShmemSpace(communicator_new_shmem_size());
 }
 
@@ -216,7 +220,14 @@ CommunicatorNewShmemInit(void)
 	uint64		initial_file_cache_size;
 	uint64		max_file_cache_size;
 
-	/* FIXME: much of this could be skipped if !neon_use_communicator_worker */
+	if (!neon_use_communicator_worker)
+		return;
+
+	if (neon_tenant[0] == '\0' || neon_timeline[0] == '\0')
+	{
+		elog(LOG, "disabling communicator worker because neon_tenant is empty");
+		return;
+	}
 
 	rc = pipe(pipefd);
 	if (rc != 0)
@@ -276,7 +287,6 @@ void
 notify_proc_unsafe(int procno)
 {
 	SetLatch(&communicator_shmem_ptr->backends[procno].io_completion_latch);
-
 }
 
 /**** Backend functions. These run in each backend ****/
diff --git a/pgxn/neon/communicator_process.c b/pgxn/neon/communicator_process.c
index 36bd548f42..e5cdf30ab7 100644
--- a/pgxn/neon/communicator_process.c
+++ b/pgxn/neon/communicator_process.c
@@ -82,6 +82,7 @@ communicator_new_bgworker_main(Datum main_arg)
 	struct LoggingReceiver *logging;
 	const char *errmsg = NULL;
 	const struct CommunicatorWorkerProcessStruct *proc_handle;
+	bool		success;
 
 	/*
 	 * Pretend that this process is a WAL sender. That affects the shutdown
@@ -103,20 +104,6 @@ communicator_new_bgworker_main(Datum main_arg)
 
 	BackgroundWorkerUnblockSignals();
 
-	/* lfc_size_limit is in MBs */
-	file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
-	if (file_cache_size < 100)
-		file_cache_size = 100;
-
-	if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
-	{
-		/* shouldn't happen, as the GUC was verified already */
-		elog(FATAL, "could not parse neon.pageserver_grpcs_urls");
-	}
-	connstrings = palloc(shard_map.num_shards * sizeof(char *));
-	for (int i = 0; i < shard_map.num_shards; i++)
-		connstrings[i] = shard_map.connstring[i];
-
 	/*
 	 * By default, INFO messages are not printed to the log. We want
 	 * `tracing::info!` messages emitted from the communicator to be printed,
@@ -131,21 +118,41 @@ communicator_new_bgworker_main(Datum main_arg)
 
 	logging = communicator_worker_configure_logging();
 
-	Assert(cis != NULL);
-	proc_handle = communicator_worker_process_launch(
-		cis,
-		neon_tenant[0] == '\0' ? NULL : neon_tenant,
-		neon_timeline[0] == '\0' ? NULL : neon_timeline,
-		neon_auth_token,
-		connstrings,
-		shard_map.num_shards,
-		neon_stripe_size,
-		lfc_path,
-		file_cache_size,
-		&errmsg);
-	pfree(connstrings);
-	cis = NULL;
-	if (proc_handle == NULL)
+	if (cis != NULL)
+	{
+		/* lfc_size_limit is in MBs */
+		file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
+		if (file_cache_size < 100)
+			file_cache_size = 100;
+
+		if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
+		{
+			/* shouldn't happen, as the GUC was verified already */
+			elog(FATAL, "could not parse neon.pageserver_grpcs_urls");
+		}
+		connstrings = palloc(shard_map.num_shards * sizeof(char *));
+		for (int i = 0; i < shard_map.num_shards; i++)
+			connstrings[i] = shard_map.connstring[i];
+		proc_handle = communicator_worker_process_launch(
+			cis,
+			neon_tenant,
+			neon_timeline,
+			neon_auth_token,
+			connstrings,
+			shard_map.num_shards,
+			neon_stripe_size,
+			lfc_path,
+			file_cache_size,
+			&errmsg);
+		pfree(connstrings);
+		cis = NULL;
+		success = proc_handle != NULL;
+	}
+	else
+	{
+		success = communicator_worker_process_launch_legacy(&errmsg);
+	}
+	if (!success)
 	{
 		/*
 		 * Something went wrong. Before exiting, forward any log messages that
@@ -206,26 +213,29 @@ communicator_new_bgworker_main(Datum main_arg)
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
 
-			/* lfc_size_limit is in MBs */
-			file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
-			if (file_cache_size < 100)
-				file_cache_size = 100;
-
-			/* Reload pageserver URLs */
-			if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
+			if (proc_handle)
 			{
-				/* shouldn't happen, as the GUC was verified already */
-				elog(FATAL, "could not parse neon.pageserver_grpcs_urls");
-			}
-			connstrings = palloc(shard_map.num_shards * sizeof(char *));
-			for (int i = 0; i < shard_map.num_shards; i++)
-				connstrings[i] = shard_map.connstring[i];
+				/* lfc_size_limit is in MBs */
+				file_cache_size = lfc_size_limit * (1024 * 1024 / BLCKSZ);
+				if (file_cache_size < 100)
+					file_cache_size = 100;
 
-			communicator_worker_config_reload(proc_handle,
-											  file_cache_size,
-											  connstrings,
-											  shard_map.num_shards);
-			pfree(connstrings);
+				/* Reload pageserver URLs */
+				if (!parse_shard_map(pageserver_grpc_urls, &shard_map))
+				{
+					/* shouldn't happen, as the GUC was verified already */
+					elog(FATAL, "could not parse neon.pageserver_grpcs_urls");
+				}
+				connstrings = palloc(shard_map.num_shards * sizeof(char *));
+				for (int i = 0; i < shard_map.num_shards; i++)
+					connstrings[i] = shard_map.connstring[i];
+
+				communicator_worker_config_reload(proc_handle,
+												  file_cache_size,
+												  connstrings,
+												  shard_map.num_shards);
+				pfree(connstrings);
+			}
 		}
 
 		duration = TimestampDifferenceMilliseconds(before, GetCurrentTimestamp());