diff --git a/Cargo.lock b/Cargo.lock
index 6ea1fd5593..5f36f48966 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2326,6 +2326,7 @@ dependencies = [
"fs2",
"hex",
"humantime",
+ "hyper",
"lazy_static",
"log",
"pageserver",
@@ -2333,6 +2334,7 @@ dependencies = [
"postgres-protocol",
"postgres_ffi",
"regex",
+ "routerify",
"rust-s3",
"serde",
"serde_json",
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 8b15e438da..cacb98ec84 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1,4 +1,3 @@
-use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
@@ -16,6 +15,8 @@ use zenith_utils::http::{
    endpoint,
    error::HttpErrorBody,
    json::{json_request, json_response},
+    request::get_request_param,
+    request::parse_request_param,
};
use super::models::BranchCreateRequest;
@@ -57,33 +58,6 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
    get_state(request).conf
}
-fn get_request_param<'a>(
-    request: &'a Request<Body>,
-    param_name: &str,
-) -> Result<&'a str, ApiError> {
-    match request.param(param_name) {
-        Some(arg) => Ok(arg),
-        None => {
-            return Err(ApiError::BadRequest(format!(
-                "no {} specified in path param",
-                param_name
-            )))
-        }
-    }
-}
-
-fn parse_request_param<T: FromStr>(
-    request: &Request<Body>,
-    param_name: &str,
-) -> Result<T, ApiError> {
-    match get_request_param(request, param_name)?.parse() {
-        Ok(v) => Ok(v),
-        Err(_) => Err(ApiError::BadRequest(
-            "failed to parse tenant id".to_string(),
-        )),
-    }
-}
-
// healthcheck handler
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
    Ok(Response::builder()
diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py
index 6f5e877714..263757e2e7 100644
--- a/test_runner/batch_others/test_wal_acceptor.py
+++ b/test_runner/batch_others/test_wal_acceptor.py
@@ -204,6 +204,7 @@ def test_race_conditions(zenith_cli, pageserver: ZenithPageserver, postgres: Pos
    stop_value.value = 1
    proc.join()
+
class ProposerPostgres:
"""Object for running safekeepers sync with walproposer"""
def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str):
@@ -292,3 +293,31 @@ def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorF
log.info(f"lsn after sync = {lsn_after_sync}")
assert all(lsn_after_sync == lsn for lsn in lsn_after_append)
+
+
+def test_timeline_status(zenith_cli, pageserver, postgres, wa_factory: WalAcceptorFactory):
+    wa_factory.start_n_new(1)
+
+    zenith_cli.run(["branch", "test_timeline_status", "empty"])
+    pg = postgres.create_start('test_timeline_status', wal_acceptors=wa_factory.get_connstrs())
+
+    wa = wa_factory.instances[0]
+    wa_http_cli = wa.http_client()
+    wa_http_cli.check_status()
+
+    # learn zenith timeline from compute
+    tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
+    timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
+
+    # fetch something sensible from status
+    epoch = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
+
+    pg.safe_psql("create table t(i int)")
+
+    # ensure epoch goes up after reboot
+    pg.stop().start()
+    pg.safe_psql("insert into t values(10)")
+
+    epoch_after_reboot = wa_http_cli.timeline_status(tenant_id,
+                                                     timeline_id).acceptor_epoch
+    assert epoch_after_reboot > epoch
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index efdd966f02..80246193fa 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -834,14 +834,21 @@ class WalAcceptor:
        env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None
        subprocess.run(cmd, check=True, env=env)
-        # wait for wal acceptor start by checkking that pid is readable
-        for _ in range(3):
-            pid = self.get_pid()
-            if pid is not None:
-                return self
-            time.sleep(0.5)
-
-        raise RuntimeError("cannot get wal acceptor pid")
+        # wait for wal acceptor start by checking its status
+        started_at = time.time()
+        while True:
+            try:
+                http_cli = self.http_client()
+                http_cli.check_status()
+            except Exception as e:
+                elapsed = time.time() - started_at
+                if elapsed > 3:
+                    raise RuntimeError(
+                        f"timed out waiting {elapsed:.0f}s for wal acceptor start: {e}")
+                time.sleep(0.5)
+            else:
+                break  # success
+        return self
    @property
    def pidfile(self) -> Path:
@@ -894,6 +901,10 @@ class WalAcceptor:
log.info(f"JSON_CTRL response: {all[0][0]}")
return json.loads(all[0][0])
+ def http_client(self):
+ return WalAcceptorHttpClient(port=self.port.http)
+
+
class WalAcceptorFactory:
""" An object representing multiple running wal acceptors. """
def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: PortDistributor):
@@ -955,6 +966,24 @@ def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPo
    log.info('Starting wal acceptors cleanup')
    wafactory.stop_all()
+@dataclass
+class PageserverTimelineStatus:
+    acceptor_epoch: int
+
+class WalAcceptorHttpClient(requests.Session):
+    def __init__(self, port: int) -> None:
+        super().__init__()
+        self.port = port
+
+    def check_status(self):
+        self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
+
+    def timeline_status(self, tenant_id: str, timeline_id: str) -> PageserverTimelineStatus:
+        res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
+        res.raise_for_status()
+        resj = res.json()
+        return PageserverTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'])
+
@zenfixture
def base_dir() -> str:
diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml
index f31126eb05..2e2e435236 100644
--- a/walkeeper/Cargo.toml
+++ b/walkeeper/Cargo.toml
@@ -11,6 +11,8 @@ regex = "1.4.5"
bincode = "1.3"
bytes = "1.0.1"
byteorder = "1.4.3"
+hyper = "0.14"
+routerify = "2"
fs2 = "0.4.3"
lazy_static = "1.4.0"
serde_json = "1"
diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs
index 0a6448ffd3..7ce8765789 100644
--- a/walkeeper/src/bin/safekeeper.rs
+++ b/walkeeper/src/bin/safekeeper.rs
@@ -14,6 +14,7 @@ use zenith_utils::http::endpoint;
use zenith_utils::logging;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
+use walkeeper::http;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
@@ -164,11 +165,12 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
    let mut threads = Vec::new();
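+    // make_router takes the config by value and the closure below moves it,
+    // so hand the http thread its own clone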
+    let conf_cloned = conf.clone();
    let http_endpoint_thread = thread::Builder::new()
        .name("http_endpoint_thread".into())
        .spawn(|| {
-            // No authentication at all: read-only metrics only, early stage.
-            let router = endpoint::make_router();
+            // TODO authentication
+            let router = http::make_router(conf_cloned);
            endpoint::serve_thread_main(router, http_listener).unwrap();
        })
        .unwrap();
diff --git a/walkeeper/src/http/mod.rs b/walkeeper/src/http/mod.rs
new file mode 100644
index 0000000000..c82d1c0362
--- /dev/null
+++ b/walkeeper/src/http/mod.rs
@@ -0,0 +1,2 @@
+pub mod routes;
+pub use routes::make_router;
diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs
new file mode 100644
index 0000000000..8ab405508e
--- /dev/null
+++ b/walkeeper/src/http/routes.rs
@@ -0,0 +1,88 @@
+use hyper::{Body, Request, Response, StatusCode};
+use routerify::ext::RequestExt;
+use routerify::RouterBuilder;
+use serde::Serialize;
+use serde::Serializer;
+use std::fmt::Display;
+use std::sync::Arc;
+use zenith_utils::lsn::Lsn;
+
+use crate::safekeeper::AcceptorState;
+use crate::timeline::CreateControlFile;
+use crate::timeline::GlobalTimelines;
+use crate::WalAcceptorConf;
+use zenith_utils::http::endpoint;
+use zenith_utils::http::error::ApiError;
+use zenith_utils::http::json::json_response;
+use zenith_utils::http::request::parse_request_param;
+use zenith_utils::zid::{ZTenantId, ZTimelineId};
+
+/// Healthcheck handler.
+async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
+    Ok(json_response(StatusCode::OK, "")?)
+}
+
+fn get_conf(request: &Request<Body>) -> &WalAcceptorConf {
+    request
+        .data::<Arc<WalAcceptorConf>>()
+        .expect("unknown state type")
+        .as_ref()
+}
+
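+/// Serde helper: render a Display-able field (Lsn, ZTenantId, ZTimelineId) as its
+/// string form so the JSON output stays human-readable.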
+fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+    F: Display,
+{
+    s.serialize_str(&format!("{}", z))
+}
+
+/// Info about timeline on safekeeper ready for reporting.
+#[derive(Debug, Serialize)]
+struct TimelineStatus {
+    #[serde(serialize_with = "display_serialize")]
+    tenant_id: ZTenantId,
+    #[serde(serialize_with = "display_serialize")]
+    timeline_id: ZTimelineId,
+    acceptor_state: AcceptorState,
+    #[serde(serialize_with = "display_serialize")]
+    commit_lsn: Lsn,
+    #[serde(serialize_with = "display_serialize")]
+    truncate_lsn: Lsn,
+}
+}
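+
+// Roughly the JSON this produces (values are made up for illustration; the exact
+// contents of "acceptor_state" are whatever serde derives for AcceptorState, but they
+// include the "epoch" field the python test in this patch reads):
+//   {"tenant_id": "<hex>", "timeline_id": "<hex>",
+//    "acceptor_state": {"epoch": 1, ...},
+//    "commit_lsn": "0/169C3C8", "truncate_lsn": "0/169C3C8"}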
+
+/// Report info about timeline.
+async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
+    let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
+
+    let tli = GlobalTimelines::get(
+        get_conf(&request),
+        tenant_id,
+        timeline_id,
+        CreateControlFile::False,
+    )
+    .map_err(ApiError::from_err)?;
+    let sk_state = tli.get_info();
+
+    let status = TimelineStatus {
+        tenant_id,
+        timeline_id,
+        acceptor_state: sk_state.acceptor_state,
+        commit_lsn: sk_state.commit_lsn,
+        truncate_lsn: sk_state.truncate_lsn,
+    };
+    Ok(json_response(StatusCode::OK, status)?)
+}
+
+/// Safekeeper http router.
+pub fn make_router(conf: WalAcceptorConf) -> RouterBuilder<Body, ApiError> {
+    let router = endpoint::make_router();
+    router
+        .data(Arc::new(conf))
+        .get("/v1/status", status_handler)
+        .get(
+            "/v1/timeline/:tenant_id/:timeline_id",
+            timeline_status_handler,
+        )
+}
diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs
index 6c1f70efa2..4406823076 100644
--- a/walkeeper/src/lib.rs
+++ b/walkeeper/src/lib.rs
@@ -2,6 +2,7 @@
use std::path::PathBuf;
use std::time::Duration;
+pub mod http;
pub mod json_ctrl;
pub mod receive_wal;
pub mod replication;
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index 5ee97a8a28..82aa6d6d36 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -289,7 +289,7 @@ lazy_static! {
}
/// A zero-sized struct used to manage access to the global timelines map.
-struct GlobalTimelines;
+pub struct GlobalTimelines;
impl GlobalTimelines {
/// Get a timeline with control file loaded from the global TIMELINES map.
diff --git a/zenith_utils/src/http/mod.rs b/zenith_utils/src/http/mod.rs
index b6740ad543..16b7e87721 100644
--- a/zenith_utils/src/http/mod.rs
+++ b/zenith_utils/src/http/mod.rs
@@ -1,3 +1,4 @@
pub mod endpoint;
pub mod error;
pub mod json;
+pub mod request;
diff --git a/zenith_utils/src/http/request.rs b/zenith_utils/src/http/request.rs
new file mode 100644
index 0000000000..3bc8993c26
--- /dev/null
+++ b/zenith_utils/src/http/request.rs
@@ -0,0 +1,33 @@
+use std::str::FromStr;
+
+use super::error::ApiError;
+use hyper::{Body, Request};
+use routerify::ext::RequestExt;
+
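+/// Extract the named path parameter from a routerify request, failing with
+/// ApiError::BadRequest when it is missing.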
+pub fn get_request_param<'a>(
+    request: &'a Request<Body>,
+    param_name: &str,
+) -> Result<&'a str, ApiError> {
+    match request.param(param_name) {
+        Some(arg) => Ok(arg),
+        None => {
+            return Err(ApiError::BadRequest(format!(
+                "no {} specified in path param",
+                param_name
+            )))
+        }
+    }
+}
+
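+/// Extract the named path parameter and parse it with FromStr, turning parse
+/// failures into ApiError::BadRequest.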
+pub fn parse_request_param<T: FromStr>(
+    request: &Request<Body>,
+    param_name: &str,
+) -> Result<T, ApiError> {
+    match get_request_param(request, param_name)?.parse() {
+        Ok(v) => Ok(v),
+        Err(_) => Err(ApiError::BadRequest(format!(
+            "failed to parse {}",
+            param_name
+        ))),
+    }
+}