From de744a44dd63d22cff56ceb8d200897e54414a0c Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Tue, 12 Oct 2021 22:04:17 +0300 Subject: [PATCH] Add /timeline http request to safekeeper returning its status. Which is mainly generational state (terms) and useful LSNs. Also add /status basic healthcheck request which is now used in tests to determine the safekeeper is up; this fixes #726. ref #115 --- Cargo.lock | 2 + pageserver/src/http/routes.rs | 30 +------ test_runner/batch_others/test_wal_acceptor.py | 29 ++++++ test_runner/fixtures/zenith_fixtures.py | 45 ++++++++-- walkeeper/Cargo.toml | 2 + walkeeper/src/bin/safekeeper.rs | 6 +- walkeeper/src/http/mod.rs | 2 + walkeeper/src/http/routes.rs | 88 +++++++++++++++++++ walkeeper/src/lib.rs | 1 + walkeeper/src/timeline.rs | 2 +- zenith_utils/src/http/mod.rs | 1 + zenith_utils/src/http/request.rs | 33 +++++++ 12 files changed, 202 insertions(+), 39 deletions(-) create mode 100644 walkeeper/src/http/mod.rs create mode 100644 walkeeper/src/http/routes.rs create mode 100644 zenith_utils/src/http/request.rs diff --git a/Cargo.lock b/Cargo.lock index 6ea1fd5593..5f36f48966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2326,6 +2326,7 @@ dependencies = [ "fs2", "hex", "humantime", + "hyper", "lazy_static", "log", "pageserver", @@ -2333,6 +2334,7 @@ dependencies = [ "postgres-protocol", "postgres_ffi", "regex", + "routerify", "rust-s3", "serde", "serde_json", diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8b15e438da..cacb98ec84 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1,4 +1,3 @@ -use std::str::FromStr; use std::sync::Arc; use anyhow::Result; @@ -16,6 +15,8 @@ use zenith_utils::http::{ endpoint, error::HttpErrorBody, json::{json_request, json_response}, + request::get_request_param, + request::parse_request_param, }; use super::models::BranchCreateRequest; @@ -57,33 +58,6 @@ fn get_config(request: &Request) -> &'static PageServerConf { get_state(request).conf } -fn get_request_param<'a>( - request: &'a Request, - param_name: &str, -) -> Result<&'a str, ApiError> { - match request.param(param_name) { - Some(arg) => Ok(arg), - None => { - return Err(ApiError::BadRequest(format!( - "no {} specified in path param", - param_name - ))) - } - } -} - -fn parse_request_param( - request: &Request, - param_name: &str, -) -> Result { - match get_request_param(request, param_name)?.parse() { - Ok(v) => Ok(v), - Err(_) => Err(ApiError::BadRequest( - "failed to parse tenant id".to_string(), - )), - } -} - // healthcheck handler async fn status_handler(_: Request) -> Result, ApiError> { Ok(Response::builder() diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 6f5e877714..263757e2e7 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -204,6 +204,7 @@ def test_race_conditions(zenith_cli, pageserver: ZenithPageserver, postgres: Pos stop_value.value = 1 proc.join() + class ProposerPostgres: """Object for running safekeepers sync with walproposer""" def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str): @@ -292,3 +293,31 @@ def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorF log.info(f"lsn after sync = {lsn_after_sync}") assert all(lsn_after_sync == lsn for lsn in lsn_after_append) + + +def test_timeline_status(zenith_cli, pageserver, postgres, wa_factory: WalAcceptorFactory): + wa_factory.start_n_new(1) + + zenith_cli.run(["branch", "test_timeline_status", "empty"]) + pg = postgres.create_start('test_timeline_status', wal_acceptors=wa_factory.get_connstrs()) + + wa = wa_factory.instances[0] + wa_http_cli = wa.http_client() + wa_http_cli.check_status() + + # learn zenith timeline from compute + tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0] + timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0] + + # fetch something sensible from status + epoch = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch + + pg.safe_psql("create table t(i int)") + + # ensure epoch goes up after reboot + pg.stop().start() + pg.safe_psql("insert into t values(10)") + + epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, + timeline_id).acceptor_epoch + assert epoch_after_reboot > epoch diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index efdd966f02..80246193fa 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -834,14 +834,21 @@ class WalAcceptor: env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None subprocess.run(cmd, check=True, env=env) - # wait for wal acceptor start by checkking that pid is readable - for _ in range(3): - pid = self.get_pid() - if pid is not None: - return self - time.sleep(0.5) - - raise RuntimeError("cannot get wal acceptor pid") + # wait for wal acceptor start by checking its status + started_at = time.time() + while True: + try: + http_cli = self.http_client() + http_cli.check_status() + except Exception as e: + elapsed = time.time() - started_at + if elapsed > 3: + raise RuntimeError( + f"timed out waiting {elapsed:.0f}s for wal acceptor start: {e}") + time.sleep(0.5) + else: + break # success + return self @property def pidfile(self) -> Path: @@ -894,6 +901,10 @@ class WalAcceptor: log.info(f"JSON_CTRL response: {all[0][0]}") return json.loads(all[0][0]) + def http_client(self): + return WalAcceptorHttpClient(port=self.port.http) + + class WalAcceptorFactory: """ An object representing multiple running wal acceptors. """ def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: PortDistributor): @@ -955,6 +966,24 @@ def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPo log.info('Starting wal acceptors cleanup') wafactory.stop_all() +@dataclass +class PageserverTimelineStatus: + acceptor_epoch: int + +class WalAcceptorHttpClient(requests.Session): + def __init__(self, port: int) -> None: + super().__init__() + self.port = port + + def check_status(self): + self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() + + def timeline_status(self, tenant_id: str, timeline_id: str) -> PageserverTimelineStatus: + res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}") + res.raise_for_status() + resj = res.json() + return PageserverTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch']) + @zenfixture def base_dir() -> str: diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index f31126eb05..2e2e435236 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -11,6 +11,8 @@ regex = "1.4.5" bincode = "1.3" bytes = "1.0.1" byteorder = "1.4.3" +hyper = "0.14" +routerify = "2" fs2 = "0.4.3" lazy_static = "1.4.0" serde_json = "1" diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 0a6448ffd3..7ce8765789 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -14,6 +14,7 @@ use zenith_utils::http::endpoint; use zenith_utils::logging; use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; +use walkeeper::http; use walkeeper::s3_offload; use walkeeper::wal_service; use walkeeper::WalAcceptorConf; @@ -164,11 +165,12 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> { let mut threads = Vec::new(); + let conf_cloned = conf.clone(); let http_endpoint_thread = thread::Builder::new() .name("http_endpoint_thread".into()) .spawn(|| { - // No authentication at all: read-only metrics only, early stage. - let router = endpoint::make_router(); + // TODO authentication + let router = http::make_router(conf_cloned); endpoint::serve_thread_main(router, http_listener).unwrap(); }) .unwrap(); diff --git a/walkeeper/src/http/mod.rs b/walkeeper/src/http/mod.rs new file mode 100644 index 0000000000..c82d1c0362 --- /dev/null +++ b/walkeeper/src/http/mod.rs @@ -0,0 +1,2 @@ +pub mod routes; +pub use routes::make_router; diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs new file mode 100644 index 0000000000..8ab405508e --- /dev/null +++ b/walkeeper/src/http/routes.rs @@ -0,0 +1,88 @@ +use hyper::{Body, Request, Response, StatusCode}; +use routerify::ext::RequestExt; +use routerify::RouterBuilder; +use serde::Serialize; +use serde::Serializer; +use std::fmt::Display; +use std::sync::Arc; +use zenith_utils::lsn::Lsn; + +use crate::safekeeper::AcceptorState; +use crate::timeline::CreateControlFile; +use crate::timeline::GlobalTimelines; +use crate::WalAcceptorConf; +use zenith_utils::http::endpoint; +use zenith_utils::http::error::ApiError; +use zenith_utils::http::json::json_response; +use zenith_utils::http::request::parse_request_param; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; + +/// Healthcheck handler. +async fn status_handler(_: Request) -> Result, ApiError> { + Ok(json_response(StatusCode::OK, "")?) +} + +fn get_conf(request: &Request) -> &WalAcceptorConf { + request + .data::>() + .expect("unknown state type") + .as_ref() +} + +fn display_serialize(z: &F, s: S) -> Result +where + S: Serializer, + F: Display, +{ + s.serialize_str(&format!("{}", z)) +} + +/// Info about timeline on safekeeper ready for reporting. +#[derive(Debug, Serialize)] +struct TimelineStatus { + #[serde(serialize_with = "display_serialize")] + tenant_id: ZTenantId, + #[serde(serialize_with = "display_serialize")] + timeline_id: ZTimelineId, + acceptor_state: AcceptorState, + #[serde(serialize_with = "display_serialize")] + commit_lsn: Lsn, + #[serde(serialize_with = "display_serialize")] + truncate_lsn: Lsn, +} + +/// Report info about timeline. +async fn timeline_status_handler(request: Request) -> Result, ApiError> { + let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; + let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?; + + let tli = GlobalTimelines::get( + get_conf(&request), + tenant_id, + timeline_id, + CreateControlFile::False, + ) + .map_err(ApiError::from_err)?; + let sk_state = tli.get_info(); + + let status = TimelineStatus { + tenant_id, + timeline_id, + acceptor_state: sk_state.acceptor_state, + commit_lsn: sk_state.commit_lsn, + truncate_lsn: sk_state.truncate_lsn, + }; + Ok(json_response(StatusCode::OK, status)?) +} + +/// Safekeeper http router. +pub fn make_router(conf: WalAcceptorConf) -> RouterBuilder { + let router = endpoint::make_router(); + router + .data(Arc::new(conf)) + .get("/v1/status", status_handler) + .get( + "/v1/timeline/:tenant_id/:timeline_id", + timeline_status_handler, + ) +} diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 6c1f70efa2..4406823076 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use std::time::Duration; +pub mod http; pub mod json_ctrl; pub mod receive_wal; pub mod replication; diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 5ee97a8a28..82aa6d6d36 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -289,7 +289,7 @@ lazy_static! { } /// A zero-sized struct used to manage access to the global timelines map. -struct GlobalTimelines; +pub struct GlobalTimelines; impl GlobalTimelines { /// Get a timeline with control file loaded from the global TIMELINES map. diff --git a/zenith_utils/src/http/mod.rs b/zenith_utils/src/http/mod.rs index b6740ad543..16b7e87721 100644 --- a/zenith_utils/src/http/mod.rs +++ b/zenith_utils/src/http/mod.rs @@ -1,3 +1,4 @@ pub mod endpoint; pub mod error; pub mod json; +pub mod request; diff --git a/zenith_utils/src/http/request.rs b/zenith_utils/src/http/request.rs new file mode 100644 index 0000000000..3bc8993c26 --- /dev/null +++ b/zenith_utils/src/http/request.rs @@ -0,0 +1,33 @@ +use std::str::FromStr; + +use super::error::ApiError; +use hyper::{Body, Request}; +use routerify::ext::RequestExt; + +pub fn get_request_param<'a>( + request: &'a Request, + param_name: &str, +) -> Result<&'a str, ApiError> { + match request.param(param_name) { + Some(arg) => Ok(arg), + None => { + return Err(ApiError::BadRequest(format!( + "no {} specified in path param", + param_name + ))) + } + } +} + +pub fn parse_request_param( + request: &Request, + param_name: &str, +) -> Result { + match get_request_param(request, param_name)?.parse() { + Ok(v) => Ok(v), + Err(_) => Err(ApiError::BadRequest(format!( + "failed to parse {}", + param_name + ))), + } +}