From e4a70faa08a480caa648a533c9ca579db8709fad Mon Sep 17 00:00:00 2001 From: Thang Pham Date: Mon, 16 May 2022 11:05:43 -0400 Subject: [PATCH] Add more information to timeline-related APIs (#1673) Resolves #1488. - implemented `GET tenant/:tenant_id/timeline/:timeline_id/wal_receiver` endpoint - returned `thread_id` in `thread_mgr::spawn` - added `latest_gc_cutoff_lsn` field to `LocalTimelineInfo` struct --- pageserver/src/http/openapi_spec.yml | 62 ++++++++++++++++ pageserver/src/http/routes.rs | 28 ++++++++ pageserver/src/tenant_mgr.rs | 1 + pageserver/src/thread_mgr.rs | 4 +- pageserver/src/timelines.rs | 4 ++ pageserver/src/walreceiver.rs | 72 +++++++++++++++---- .../batch_others/test_pageserver_api.py | 41 ++++++++++- test_runner/fixtures/zenith_fixtures.py | 9 +++ 8 files changed, 204 insertions(+), 17 deletions(-) diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 9932a2d08d..55f7b3c5a7 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -123,6 +123,53 @@ paths: schema: $ref: "#/components/schemas/Error" + /v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + format: hex + - name: timeline_id + in: path + required: true + schema: + type: string + format: hex + get: + description: Get wal receiver's data attached to the timeline + responses: + "200": + description: WalReceiverEntry + content: + application/json: + schema: + $ref: "#/components/schemas/WalReceiverEntry" + "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "404": + description: Error when no wal receiver is running or found + content: + application/json: + schema: + $ref: "#/components/schemas/NotFoundError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" /v1/tenant/{tenant_id}/timeline/{timeline_id}/attach: parameters: @@ -520,6 +567,21 @@ components: type: integer current_logical_size_non_incremental: type: integer + WalReceiverEntry: + type: object + required: + - thread_id + - wal_producer_connstr + properties: + thread_id: + type: integer + wal_producer_connstr: + type: string + last_received_msg_lsn: + type: string + format: hex + last_received_msg_ts: + type: integer Error: type: object diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 0104df826e..bb650a34ed 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -224,6 +224,30 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result, ApiError> { + let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; + check_permission(&request, Some(tenant_id))?; + + let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?; + + let wal_receiver = tokio::task::spawn_blocking(move || { + let _enter = + info_span!("wal_receiver_get", tenant = %tenant_id, timeline = %timeline_id).entered(); + + crate::walreceiver::get_wal_receiver_entry(tenant_id, timeline_id) + }) + .await + .map_err(ApiError::from_err)? + .ok_or_else(|| { + ApiError::NotFound(format!( + "WAL receiver not found for tenant {} and timeline {}", + tenant_id, timeline_id + )) + })?; + + json_response(StatusCode::OK, wal_receiver) +} + async fn timeline_attach_handler(request: Request) -> Result, ApiError> { let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; @@ -485,6 +509,10 @@ pub fn make_router( "/v1/tenant/:tenant_id/timeline/:timeline_id", timeline_detail_handler, ) + .get( + "/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver", + wal_receiver_get_handler, + ) .post( "/v1/tenant/:tenant_id/timeline/:timeline_id/attach", timeline_attach_handler, diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 9bde9a5c4a..bbe66d7f80 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -281,6 +281,7 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> { false, move || crate::tenant_threads::gc_loop(tenant_id), ) + .map(|_thread_id| ()) // update the `Result::Ok` type to match the outer function's return signature .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}")); if let Err(e) = &gc_spawn_result { diff --git a/pageserver/src/thread_mgr.rs b/pageserver/src/thread_mgr.rs index b908f220ee..473cddda58 100644 --- a/pageserver/src/thread_mgr.rs +++ b/pageserver/src/thread_mgr.rs @@ -139,7 +139,7 @@ pub fn spawn( name: &str, shutdown_process_on_error: bool, f: F, -) -> std::io::Result<()> +) -> std::io::Result where F: FnOnce() -> anyhow::Result<()> + Send + 'static, { @@ -193,7 +193,7 @@ where drop(jh_guard); // The thread is now running. Nothing more to do here - Ok(()) + Ok(thread_id) } /// This wrapper function runs in a newly-spawned thread. It initializes the diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 7cfd33c40b..eadf5bf4e0 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -45,6 +45,8 @@ pub struct LocalTimelineInfo { #[serde_as(as = "Option")] pub prev_record_lsn: Option, #[serde_as(as = "DisplayFromStr")] + pub latest_gc_cutoff_lsn: Lsn, + #[serde_as(as = "DisplayFromStr")] pub disk_consistent_lsn: Lsn, pub current_logical_size: Option, // is None when timeline is Unloaded pub current_logical_size_non_incremental: Option, @@ -68,6 +70,7 @@ impl LocalTimelineInfo { disk_consistent_lsn: datadir_tline.tline.get_disk_consistent_lsn(), last_record_lsn, prev_record_lsn: Some(datadir_tline.tline.get_prev_record_lsn()), + latest_gc_cutoff_lsn: *datadir_tline.tline.get_latest_gc_cutoff_lsn(), timeline_state: LocalTimelineState::Loaded, current_logical_size: Some(datadir_tline.get_current_logical_size()), current_logical_size_non_incremental: if include_non_incremental_logical_size { @@ -91,6 +94,7 @@ impl LocalTimelineInfo { disk_consistent_lsn: metadata.disk_consistent_lsn(), last_record_lsn: metadata.disk_consistent_lsn(), prev_record_lsn: metadata.prev_record_lsn(), + latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(), timeline_state: LocalTimelineState::Unloaded, current_logical_size: None, current_logical_size_non_incremental: None, diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index b7a33364c9..b8f349af8f 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -18,6 +18,8 @@ use lazy_static::lazy_static; use postgres_ffi::waldecoder::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DisplayFromStr}; use std::cell::Cell; use std::collections::HashMap; use std::str::FromStr; @@ -35,11 +37,19 @@ use utils::{ zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}, }; -// -// We keep one WAL Receiver active per timeline. -// -struct WalReceiverEntry { +/// +/// A WAL receiver's data stored inside the global `WAL_RECEIVERS`. +/// We keep one WAL receiver active per timeline. +/// +#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct WalReceiverEntry { + thread_id: u64, wal_producer_connstr: String, + #[serde_as(as = "Option")] + last_received_msg_lsn: Option, + /// the timestamp (in microseconds) of the last received message + last_received_msg_ts: Option, } lazy_static! { @@ -74,7 +84,7 @@ pub fn launch_wal_receiver( receiver.wal_producer_connstr = wal_producer_connstr.into(); } None => { - thread_mgr::spawn( + let thread_id = thread_mgr::spawn( ThreadKind::WalReceiver, Some(tenantid), Some(timelineid), @@ -88,7 +98,10 @@ pub fn launch_wal_receiver( )?; let receiver = WalReceiverEntry { + thread_id, wal_producer_connstr: wal_producer_connstr.into(), + last_received_msg_lsn: None, + last_received_msg_ts: None, }; receivers.insert((tenantid, timelineid), receiver); @@ -99,15 +112,13 @@ pub fn launch_wal_receiver( Ok(()) } -// Look up current WAL producer connection string in the hash table -fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> String { +/// Look up a WAL receiver's data in the global `WAL_RECEIVERS` +pub fn get_wal_receiver_entry( + tenant_id: ZTenantId, + timeline_id: ZTimelineId, +) -> Option { let receivers = WAL_RECEIVERS.lock().unwrap(); - - receivers - .get(&(tenantid, timelineid)) - .unwrap() - .wal_producer_connstr - .clone() + receivers.get(&(tenant_id, timeline_id)).cloned() } // @@ -118,7 +129,18 @@ fn thread_main(conf: &'static PageServerConf, tenant_id: ZTenantId, timeline_id: info!("WAL receiver thread started"); // Look up the current WAL producer address - let wal_producer_connstr = get_wal_producer_connstr(tenant_id, timeline_id); + let wal_producer_connstr = { + match get_wal_receiver_entry(tenant_id, timeline_id) { + Some(e) => e.wal_producer_connstr, + None => { + info!( + "Unable to create the WAL receiver thread: no WAL receiver entry found for tenant {} and timeline {}", + tenant_id, timeline_id + ); + return; + } + } + }; // Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server, // and start streaming WAL from it. @@ -318,6 +340,28 @@ fn walreceiver_main( let apply_lsn = u64::from(timeline_remote_consistent_lsn); let ts = SystemTime::now(); + // Update the current WAL receiver's data stored inside the global hash table `WAL_RECEIVERS` + { + let mut receivers = WAL_RECEIVERS.lock().unwrap(); + let entry = match receivers.get_mut(&(tenant_id, timeline_id)) { + Some(e) => e, + None => { + anyhow::bail!( + "no WAL receiver entry found for tenant {} and timeline {}", + tenant_id, + timeline_id + ); + } + }; + + entry.last_received_msg_lsn = Some(last_lsn); + entry.last_received_msg_ts = Some( + ts.duration_since(SystemTime::UNIX_EPOCH) + .expect("Received message time should be before UNIX EPOCH!") + .as_micros(), + ); + } + // Send zenith feedback message. // Regular standby_status_update fields are put into this message. let zenith_status_update = ZenithFeedback { diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 13f6ef358e..7fe3b4dff5 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -1,6 +1,12 @@ from uuid import uuid4, UUID import pytest -from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient +from fixtures.zenith_fixtures import ( + DEFAULT_BRANCH_NAME, + ZenithEnv, + ZenithEnvBuilder, + ZenithPageserverHttpClient, + ZenithPageserverApiException, +) # test that we cannot override node id @@ -48,6 +54,39 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID): assert local_timeline_details['timeline_state'] == 'Loaded' +def test_pageserver_http_get_wal_receiver_not_found(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + client = env.pageserver.http_client() + + tenant_id, timeline_id = env.zenith_cli.create_tenant() + + # no PG compute node is running, so no WAL receiver is running + with pytest.raises(ZenithPageserverApiException) as e: + _ = client.wal_receiver_get(tenant_id, timeline_id) + assert "Not Found" in str(e.value) + + +def test_pageserver_http_get_wal_receiver_success(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + client = env.pageserver.http_client() + + tenant_id, timeline_id = env.zenith_cli.create_tenant() + pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id) + + res = client.wal_receiver_get(tenant_id, timeline_id) + assert list(res.keys()) == [ + "thread_id", + "wal_producer_connstr", + "last_received_msg_lsn", + "last_received_msg_ts", + ] + + # make a DB modification then expect getting a new WAL receiver's data + pg.safe_psql("CREATE TABLE t(key int primary key, value text)") + res2 = client.wal_receiver_get(tenant_id, timeline_id) + assert res2["last_received_msg_lsn"] > res["last_received_msg_lsn"] + + def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv): env = zenith_simple_env client = env.pageserver.http_client() diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 50b7ef6dbb..14eae60248 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -786,6 +786,15 @@ class ZenithPageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json + def wal_receiver_get(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]: + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/wal_receiver" + ) + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + def get_metrics(self) -> str: res = self.get(f"http://localhost:{self.port}/metrics") self.verbose_error(res)