diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs
index 31c205b3a8..aee31f14a7 100644
--- a/pageserver/src/http/models.rs
+++ b/pageserver/src/http/models.rs
@@ -101,18 +101,6 @@ impl TenantConfigRequest {
     }
 }
 
-/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`.
-/// We keep one WAL receiver active per timeline.
-#[serde_as]
-#[derive(Debug, Serialize, Deserialize, Clone)]
-pub struct WalReceiverEntry {
-    pub wal_source_connstr: Option<String>,
-    #[serde_as(as = "Option<DisplayFromStr>")]
-    pub last_received_msg_lsn: Option<Lsn>,
-    /// the timestamp (in microseconds) of the last received message
-    pub last_received_msg_ts: Option<u128>,
-}
-
 #[serde_as]
 #[derive(Serialize, Deserialize, Clone)]
 pub struct TenantInfo {
@@ -143,6 +131,12 @@ pub struct LocalTimelineInfo {
     pub current_logical_size_non_incremental: Option<usize>,
     pub current_physical_size_non_incremental: Option<u64>,
     pub timeline_state: LocalTimelineState,
+
+    pub wal_source_connstr: Option<String>,
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub last_received_msg_lsn: Option<Lsn>,
+    /// the timestamp (in microseconds) of the last received message
+    pub last_received_msg_ts: Option<u128>,
 }
 
 #[serde_as]
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index 2109fcbe5b..106c14fbc8 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -207,54 +207,6 @@ paths:
               schema:
                 $ref: "#/components/schemas/Error"
 
-  /v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver:
-    parameters:
-      - name: tenant_id
-        in: path
-        required: true
-        schema:
-          type: string
-          format: hex
-      - name: timeline_id
-        in: path
-        required: true
-        schema:
-          type: string
-          format: hex
-    get:
-      description: Get wal receiver's data attached to the timeline
-      responses:
-        "200":
-          description: WalReceiverEntry
-          content:
-            application/json:
-              schema:
-                $ref: "#/components/schemas/WalReceiverEntry"
-        "401":
-          description: Unauthorized Error
-          content:
-            application/json:
-              schema:
-                $ref: "#/components/schemas/UnauthorizedError"
-        "403":
-          description: Forbidden Error
-          content:
-            application/json:
-              schema:
-                $ref: "#/components/schemas/ForbiddenError"
-        "404":
-          description: Error when no wal receiver is running or found
-          content:
-            application/json:
-              schema:
-                $ref: "#/components/schemas/NotFoundError"
-        "500":
-          description: Generic operation error
-          content:
-            application/json:
-              schema:
-                $ref: "#/components/schemas/Error"
-
   /v1/tenant/{tenant_id}/attach:
     parameters:
       - name: tenant_id
@@ -689,15 +641,6 @@ components:
           type: integer
         current_physical_size_non_incremental:
           type: integer
-
-    WalReceiverEntry:
-      type: object
-      required:
-        - thread_id
-        - wal_source_connstr
-      properties:
-        thread_id:
-          type: integer
         wal_source_connstr:
           type: string
         last_received_msg_lsn:
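For API consumers, the upshot of the two changes above is that the WAL receiver fields are now served by the regular timeline detail endpoint instead of a dedicated `/wal_receiver` route. A rough client-side sketch (not part of the patch; the helper name is made up, and the JSON nesting under `local` follows the tests further down):

```python
import requests


def get_wal_receiver_status(pageserver_port: int, tenant_id: str, timeline_id: str):
    """Hypothetical helper: read the WAL receiver fields from the timeline detail response."""
    res = requests.get(
        f"http://localhost:{pageserver_port}/v1/tenant/{tenant_id}/timeline/{timeline_id}")
    res.raise_for_status()
    local = res.json().get("local") or {}
    # All three fields stay None until a compute node connects and WAL streaming starts.
    return (
        local.get("wal_source_connstr"),
        local.get("last_received_msg_lsn"),  # LSN serialized as a hex string
        local.get("last_received_msg_ts"),   # microseconds since the UNIX epoch
    )
```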
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 9d284405ec..fa598de402 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -91,6 +91,19 @@ fn local_timeline_info_from_loaded_timeline(
     include_non_incremental_physical_size: bool,
 ) -> anyhow::Result<LocalTimelineInfo> {
     let last_record_lsn = timeline.get_last_record_lsn();
+    let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
+        let guard = timeline.last_received_wal.lock().unwrap();
+        if let Some(info) = guard.as_ref() {
+            (
+                Some(info.wal_source_connstr.clone()),
+                Some(info.last_received_msg_lsn),
+                Some(info.last_received_msg_ts),
+            )
+        } else {
+            (None, None, None)
+        }
+    };
+
     let info = LocalTimelineInfo {
         ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
         ancestor_lsn: {
@@ -116,6 +129,9 @@ fn local_timeline_info_from_loaded_timeline(
         } else {
             None
         },
+        wal_source_connstr,
+        last_received_msg_lsn,
+        last_received_msg_ts,
     };
     Ok(info)
 }
@@ -138,6 +154,9 @@ fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> Lo
         current_physical_size: None,
         current_logical_size_non_incremental: None,
         current_physical_size_non_incremental: None,
+        wal_source_connstr: None,
+        last_received_msg_lsn: None,
+        last_received_msg_ts: None,
     }
 }
 
@@ -348,23 +367,6 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
     json_response(StatusCode::OK, &timeline_info)
 }
 
-async fn wal_receiver_get_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
-    let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
-    check_permission(&request, Some(tenant_id))?;
-
-    let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
-    let wal_receiver_entry = crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id)
-        .instrument(info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id))
-        .await
-        .ok_or_else(|| {
-            ApiError::NotFound(format!(
-                "WAL receiver data not found for tenant {tenant_id} and timeline {timeline_id}"
-            ))
-        })?;
-
-    json_response(StatusCode::OK, &wal_receiver_entry)
-}
-
 // TODO makes sense to provide tenant config right away the same way as it handled in tenant_create
 async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
@@ -751,9 +753,5 @@ pub fn make_router(
             "/v1/tenant/:tenant_id/timeline/:timeline_id/detach",
             timeline_delete_handler,
         )
-        .get(
-            "/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver",
-            wal_receiver_get_handler,
-        )
         .any(handler_404))
 }
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index c500b05e66..79a180c4cf 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -67,6 +67,9 @@ pub use crate::layered_repository::ephemeral_file::writeback as writeback_epheme
 // re-export for use in storage_sync.rs
 pub use crate::layered_repository::timeline::save_metadata;
 
+// re-export for use in walreceiver
+pub use crate::layered_repository::timeline::WalReceiverInfo;
+
 /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
 pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
 
diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs
index 703e1993e5..6ed1efd3d1 100644
--- a/pageserver/src/layered_repository/timeline.rs
+++ b/pageserver/src/layered_repository/timeline.rs
@@ -290,6 +290,17 @@ pub struct LayeredTimeline {
 
     /// Current logical size of the "datadir", at the last LSN.
     current_logical_size: AtomicIsize,
+
+    /// Information about the last processed message by the WAL receiver,
+    /// or None if WAL receiver has not received anything for this timeline
+    /// yet.
+    pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
+}
+
+pub struct WalReceiverInfo {
+    pub wal_source_connstr: String,
+    pub last_received_msg_lsn: Lsn,
+    pub last_received_msg_ts: u128,
 }
 
 /// Inherit all the functions from DatadirTimeline, to provide the
@@ -605,6 +616,8 @@ impl LayeredTimeline {
             current_logical_size: AtomicIsize::new(0),
             partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
             repartition_threshold: 0,
+
+            last_received_wal: Mutex::new(None),
         };
         result.repartition_threshold = result.get_checkpoint_distance() / 10;
         result
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 946230b0d3..43bb3fa971 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -26,7 +26,6 @@ mod walreceiver_connection;
 use anyhow::{ensure, Context};
 use etcd_broker::Client;
 use itertools::Itertools;
-use once_cell::sync::Lazy;
 use std::cell::Cell;
 use std::collections::{hash_map, HashMap, HashSet};
 use std::future::Future;
@@ -36,14 +35,13 @@ use std::thread_local;
 use std::time::Duration;
 use tokio::{
     select,
-    sync::{mpsc, watch, RwLock},
+    sync::{mpsc, watch},
     task::JoinHandle,
 };
 use tracing::*;
 use url::Url;
 
 use crate::config::PageServerConf;
-use crate::http::models::WalReceiverEntry;
 use crate::tenant_mgr::{self, LocalTimelineUpdate, TenantState};
 use crate::thread_mgr::{self, ThreadKind};
 use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -55,23 +53,6 @@ thread_local! {
     pub(crate) static IS_WAL_RECEIVER: Cell<bool> = Cell::new(false);
 }
 
-/// WAL receiver state for sharing with the outside world.
-/// Only entries for timelines currently available in pageserver are stored.
-static WAL_RECEIVER_ENTRIES: Lazy<RwLock<HashMap<ZTenantTimelineId, WalReceiverEntry>>> =
-    Lazy::new(|| RwLock::new(HashMap::new()));
-
-/// Gets the public WAL streaming entry for a certain timeline.
-pub async fn get_wal_receiver_entry(
-    tenant_id: ZTenantId,
-    timeline_id: ZTimelineId,
-) -> Option<WalReceiverEntry> {
-    WAL_RECEIVER_ENTRIES
-        .read()
-        .await
-        .get(&ZTenantTimelineId::new(tenant_id, timeline_id))
-        .cloned()
-}
-
 /// Sets up the main WAL receiver thread that manages the rest of the subtasks inside of it, per timeline.
 /// See comments in [`wal_receiver_main_thread_loop_step`] for more details on per timeline activities.
 pub fn init_wal_receiver_main_thread(
@@ -281,13 +262,10 @@ async fn wal_receiver_main_thread_loop_step<'a>(
                 }
                 None => warn!("Timeline {id} does not have a tenant entry in wal receiver main thread"),
             };
-            {
-                WAL_RECEIVER_ENTRIES.write().await.remove(&id);
-                if let Err(e) = join_confirmation_sender.send(()) {
-                    warn!("cannot send wal_receiver shutdown confirmation {e}")
-                } else {
-                    info!("confirm walreceiver shutdown for {id}");
-                }
+            if let Err(e) = join_confirmation_sender.send(()) {
+                warn!("cannot send wal_receiver shutdown confirmation {e}")
+            } else {
+                info!("confirm walreceiver shutdown for {id}");
             }
         }
         // Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly.
@@ -322,17 +300,6 @@ async fn wal_receiver_main_thread_loop_step<'a>(
             }
         };
 
-        {
-            WAL_RECEIVER_ENTRIES.write().await.insert(
-                id,
-                WalReceiverEntry {
-                    wal_source_connstr: None,
-                    last_received_msg_lsn: None,
-                    last_received_msg_ts: None,
-                },
-            );
-        }
-
         vacant_connection_manager_entry.insert(
             connection_manager::spawn_connection_manager_task(
                 id,
diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs
index 8846e27256..fbd9ccd3c5 100644
--- a/pageserver/src/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/walreceiver/walreceiver_connection.rs
@@ -19,7 +19,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};
 
 use super::TaskEvent;
 use crate::{
-    http::models::WalReceiverEntry,
+    layered_repository::WalReceiverInfo,
     pgdatadir_mapping::DatadirTimeline,
     repository::{Repository, Timeline},
     tenant_mgr,
@@ -232,21 +232,16 @@ pub async fn handle_walreceiver_connection(
         let apply_lsn = u64::from(timeline_remote_consistent_lsn);
         let ts = SystemTime::now();
 
-        // Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS`
-        {
-            super::WAL_RECEIVER_ENTRIES.write().await.insert(
-                id,
-                WalReceiverEntry {
-                    wal_source_connstr: Some(wal_source_connstr.to_owned()),
-                    last_received_msg_lsn: Some(last_lsn),
-                    last_received_msg_ts: Some(
-                        ts.duration_since(SystemTime::UNIX_EPOCH)
-                            .expect("Received message time should be before UNIX EPOCH!")
-                            .as_micros(),
-                    ),
-                },
-            );
-        }
+        // Update the status about what we just received. This is shown in the mgmt API.
+        let last_received_wal = WalReceiverInfo {
+            wal_source_connstr: wal_source_connstr.to_owned(),
+            last_received_msg_lsn: last_lsn,
+            last_received_msg_ts: ts
+                .duration_since(SystemTime::UNIX_EPOCH)
+                .expect("Received message time should be before UNIX EPOCH!")
+                .as_micros(),
+        };
+        *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
 
         // Send zenith feedback message.
         // Regular standby_status_update fields are put into this message.
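Because these fields are only populated after the WAL receiver has processed at least one message, callers generally have to poll, which is what the test changes below do with `wait_until`. A hedged sketch of that polling pattern (assumed helper, not part of the patch; `fetch_local_detail` stands for any function that returns the `local` part of the timeline detail response):

```python
import time
from typing import Callable, Dict, Optional


def wait_for_new_wal(fetch_local_detail: Callable[[], Dict],
                     prev_lsn_hex: Optional[str] = None,
                     attempts: int = 5,
                     interval: float = 1.0) -> str:
    """Poll until last_received_msg_lsn is set and differs from the previous value."""
    for _ in range(attempts):
        lsn_hex = fetch_local_detail().get("last_received_msg_lsn")
        if lsn_hex is not None and lsn_hex != prev_lsn_hex:
            return lsn_hex
        time.sleep(interval)
    raise TimeoutError("WAL receiver did not report a newer last_received_msg_lsn")
```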
diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py
index a298f1d701..95791888a5 100644
--- a/test_runner/batch_others/test_pageserver_api.py
+++ b/test_runner/batch_others/test_pageserver_api.py
@@ -47,7 +47,8 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID):
     for timeline in timelines:
         timeline_id_str = str(timeline['timeline_id'])
         timeline_details = client.timeline_detail(tenant_id=tenant_id,
-                                                  timeline_id=UUID(timeline_id_str))
+                                                  timeline_id=UUID(timeline_id_str),
+                                                  include_non_incremental_logical_size=True)
 
         assert timeline_details['tenant_id'] == tenant_id.hex
         assert timeline_details['timeline_id'] == timeline_id_str
@@ -63,13 +64,19 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
 
     tenant_id, timeline_id = env.neon_cli.create_tenant()
 
-    empty_response = client.wal_receiver_get(tenant_id, timeline_id)
+    timeline_details = client.timeline_detail(tenant_id=tenant_id,
+                                              timeline_id=timeline_id,
+                                              include_non_incremental_logical_size=True)
 
-    assert empty_response.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
-    assert empty_response.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
-    assert empty_response.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
+    assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
+    assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
+    assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
 
 
+# Test the WAL-receiver related fields in the response to `timeline_details` API call
+#
+# These fields used to be returned by a separate API call, but they're part of
+# `timeline_details` now.
 def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
     env = neon_simple_env
     client = env.pageserver.http_client()
@@ -78,18 +85,17 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
     pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
 
     def expect_updated_msg_lsn(prev_msg_lsn: Optional[int]) -> int:
-        res = client.wal_receiver_get(tenant_id, timeline_id)
+        timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id)
 
-        # a successful `wal_receiver_get` response must contain the below fields
-        assert list(res.keys()) == [
-            "wal_source_connstr",
-            "last_received_msg_lsn",
-            "last_received_msg_ts",
-        ]
+        # a successful `timeline_details` response must contain the below fields
+        local_timeline_details = timeline_details['local']
+        assert "wal_source_connstr" in local_timeline_details.keys()
+        assert "last_received_msg_lsn" in local_timeline_details.keys()
+        assert "last_received_msg_ts" in local_timeline_details.keys()
 
-        assert res["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
+        assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
 
-        last_msg_lsn = lsn_from_hex(res["last_received_msg_lsn"])
+        last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"])
         assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \
             f"the last received message's LSN {last_msg_lsn} hasn't been updated \
             compared to the previous message's LSN {prev_msg_lsn}"
@@ -98,7 +104,7 @@ def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
 
     # Wait to make sure that we get a latest WAL receiver data.
     # We need to wait here because it's possible that we don't have access to
-    # the latest WAL during the time the `wal_receiver_get` API is called.
+    # the latest WAL yet, when the `timeline_detail` API is first called.
     # See: https://github.com/neondatabase/neon/issues/1768.
     lsn = wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(None))
 
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 397b932ec9..8fa9e4a2ea 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -865,10 +865,24 @@ class NeonPageserverHttpClient(requests.Session):
         assert isinstance(res_json, dict)
         return res_json
 
-    def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
+    def timeline_detail(self,
+                        tenant_id: uuid.UUID,
+                        timeline_id: uuid.UUID,
+                        include_non_incremental_logical_size: bool = False,
+                        include_non_incremental_physical_size: bool = False) -> Dict[Any, Any]:
+
+        include_non_incremental_logical_size_str = "0"
+        if include_non_incremental_logical_size:
+            include_non_incremental_logical_size_str = "1"
+
+        include_non_incremental_physical_size_str = "0"
+        if include_non_incremental_physical_size:
+            include_non_incremental_physical_size_str = "1"
+
         res = self.get(
             f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}" +
-            "?include-non-incremental-logical-size=1&include-non-incremental-physical-size=1")
+            f"?include-non-incremental-logical-size={include_non_incremental_logical_size_str}" +
+            f"&include-non-incremental-physical-size={include_non_incremental_physical_size_str}")
         self.verbose_error(res)
         res_json = res.json()
         assert isinstance(res_json, dict)
@@ -882,15 +896,6 @@ class NeonPageserverHttpClient(requests.Session):
         assert res_json is None
         return res_json
 
-    def wal_receiver_get(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
-        res = self.get(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/wal_receiver"
-        )
-        self.verbose_error(res)
-        res_json = res.json()
-        assert isinstance(res_json, dict)
-        return res_json
-
     def get_metrics(self) -> str:
         res = self.get(f"http://localhost:{self.port}/metrics")
         self.verbose_error(res)
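One behavioral note on the fixture change above: `timeline_detail` used to hard-code both `include-non-incremental-*` query parameters to 1, and they now default to False, so tests that need those sizes must opt in explicitly. A small usage sketch (hypothetical test body, assuming the fixtures above):

```python
def read_timeline_sizes(env, tenant_id, timeline_id):
    # Hypothetical example, not part of the patch.
    client = env.pageserver.http_client()
    detail = client.timeline_detail(tenant_id=tenant_id,
                                    timeline_id=timeline_id,
                                    include_non_incremental_logical_size=True,
                                    include_non_incremental_physical_size=True)
    local = detail['local']
    # The WAL receiver fields travel in the same response now.
    return (local.get('current_logical_size_non_incremental'),
            local.get('current_physical_size_non_incremental'),
            local.get('wal_source_connstr'))
```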