Add /timeline http request to safekeeper returning its status.

Which is mainly generational state (terms) and useful LSNs.

Also add /status basic healthcheck request which is now used in tests to
determine the safekeeper is up; this fixes #726.

ref #115
This commit is contained in:
Arseny Sher
2021-10-12 22:04:17 +03:00
parent 0e026371ec
commit de744a44dd
12 changed files with 202 additions and 39 deletions

2
Cargo.lock generated
View File

@@ -2326,6 +2326,7 @@ dependencies = [
"fs2",
"hex",
"humantime",
"hyper",
"lazy_static",
"log",
"pageserver",
@@ -2333,6 +2334,7 @@ dependencies = [
"postgres-protocol",
"postgres_ffi",
"regex",
"routerify",
"rust-s3",
"serde",
"serde_json",

View File

@@ -1,4 +1,3 @@
use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
@@ -16,6 +15,8 @@ use zenith_utils::http::{
endpoint,
error::HttpErrorBody,
json::{json_request, json_response},
request::get_request_param,
request::parse_request_param,
};
use super::models::BranchCreateRequest;
@@ -57,33 +58,6 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
get_state(request).conf
}
fn get_request_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<&'a str, ApiError> {
match request.param(param_name) {
Some(arg) => Ok(arg),
None => {
return Err(ApiError::BadRequest(format!(
"no {} specified in path param",
param_name
)))
}
}
}
fn parse_request_param<T: FromStr>(
request: &Request<Body>,
param_name: &str,
) -> Result<T, ApiError> {
match get_request_param(request, param_name)?.parse() {
Ok(v) => Ok(v),
Err(_) => Err(ApiError::BadRequest(
"failed to parse tenant id".to_string(),
)),
}
}
// healthcheck handler
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(Response::builder()

View File

@@ -204,6 +204,7 @@ def test_race_conditions(zenith_cli, pageserver: ZenithPageserver, postgres: Pos
stop_value.value = 1
proc.join()
class ProposerPostgres:
"""Object for running safekeepers sync with walproposer"""
def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str):
@@ -292,3 +293,31 @@ def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorF
log.info(f"lsn after sync = {lsn_after_sync}")
assert all(lsn_after_sync == lsn for lsn in lsn_after_append)
def test_timeline_status(zenith_cli, pageserver, postgres, wa_factory: WalAcceptorFactory):
wa_factory.start_n_new(1)
zenith_cli.run(["branch", "test_timeline_status", "empty"])
pg = postgres.create_start('test_timeline_status', wal_acceptors=wa_factory.get_connstrs())
wa = wa_factory.instances[0]
wa_http_cli = wa.http_client()
wa_http_cli.check_status()
# learn zenith timeline from compute
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
# fetch something sensible from status
epoch = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
pg.safe_psql("create table t(i int)")
# ensure epoch goes up after reboot
pg.stop().start()
pg.safe_psql("insert into t values(10)")
epoch_after_reboot = wa_http_cli.timeline_status(tenant_id,
timeline_id).acceptor_epoch
assert epoch_after_reboot > epoch

View File

@@ -834,14 +834,21 @@ class WalAcceptor:
env = {'PAGESERVER_AUTH_TOKEN': self.auth_token} if self.auth_token else None
subprocess.run(cmd, check=True, env=env)
# wait for wal acceptor start by checkking that pid is readable
for _ in range(3):
pid = self.get_pid()
if pid is not None:
return self
time.sleep(0.5)
raise RuntimeError("cannot get wal acceptor pid")
# wait for wal acceptor start by checking its status
started_at = time.time()
while True:
try:
http_cli = self.http_client()
http_cli.check_status()
except Exception as e:
elapsed = time.time() - started_at
if elapsed > 3:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for wal acceptor start: {e}")
time.sleep(0.5)
else:
break # success
return self
@property
def pidfile(self) -> Path:
@@ -894,6 +901,10 @@ class WalAcceptor:
log.info(f"JSON_CTRL response: {all[0][0]}")
return json.loads(all[0][0])
def http_client(self):
return WalAcceptorHttpClient(port=self.port.http)
class WalAcceptorFactory:
""" An object representing multiple running wal acceptors. """
def __init__(self, zenith_binpath: Path, data_dir: Path, pageserver_port: int, port_distributor: PortDistributor):
@@ -955,6 +966,24 @@ def wa_factory(zenith_binpath: str, repo_dir: str, pageserver_port: PageserverPo
log.info('Starting wal acceptors cleanup')
wafactory.stop_all()
@dataclass
class PageserverTimelineStatus:
acceptor_epoch: int
class WalAcceptorHttpClient(requests.Session):
def __init__(self, port: int) -> None:
super().__init__()
self.port = port
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def timeline_status(self, tenant_id: str, timeline_id: str) -> PageserverTimelineStatus:
res = self.get(f"http://localhost:{self.port}/v1/timeline/{tenant_id}/{timeline_id}")
res.raise_for_status()
resj = res.json()
return PageserverTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'])
@zenfixture
def base_dir() -> str:

View File

@@ -11,6 +11,8 @@ regex = "1.4.5"
bincode = "1.3"
bytes = "1.0.1"
byteorder = "1.4.3"
hyper = "0.14"
routerify = "2"
fs2 = "0.4.3"
lazy_static = "1.4.0"
serde_json = "1"

View File

@@ -14,6 +14,7 @@ use zenith_utils::http::endpoint;
use zenith_utils::logging;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::http;
use walkeeper::s3_offload;
use walkeeper::wal_service;
use walkeeper::WalAcceptorConf;
@@ -164,11 +165,12 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<()> {
let mut threads = Vec::new();
let conf_cloned = conf.clone();
let http_endpoint_thread = thread::Builder::new()
.name("http_endpoint_thread".into())
.spawn(|| {
// No authentication at all: read-only metrics only, early stage.
let router = endpoint::make_router();
// TODO authentication
let router = http::make_router(conf_cloned);
endpoint::serve_thread_main(router, http_listener).unwrap();
})
.unwrap();

View File

@@ -0,0 +1,2 @@
pub mod routes;
pub use routes::make_router;

View File

@@ -0,0 +1,88 @@
use hyper::{Body, Request, Response, StatusCode};
use routerify::ext::RequestExt;
use routerify::RouterBuilder;
use serde::Serialize;
use serde::Serializer;
use std::fmt::Display;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
use crate::safekeeper::AcceptorState;
use crate::timeline::CreateControlFile;
use crate::timeline::GlobalTimelines;
use crate::WalAcceptorConf;
use zenith_utils::http::endpoint;
use zenith_utils::http::error::ApiError;
use zenith_utils::http::json::json_response;
use zenith_utils::http::request::parse_request_param;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
/// Healthcheck handler.
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(json_response(StatusCode::OK, "")?)
}
fn get_conf(request: &Request<Body>) -> &WalAcceptorConf {
request
.data::<Arc<WalAcceptorConf>>()
.expect("unknown state type")
.as_ref()
}
fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
F: Display,
{
s.serialize_str(&format!("{}", z))
}
/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize)]
struct TimelineStatus {
#[serde(serialize_with = "display_serialize")]
tenant_id: ZTenantId,
#[serde(serialize_with = "display_serialize")]
timeline_id: ZTimelineId,
acceptor_state: AcceptorState,
#[serde(serialize_with = "display_serialize")]
commit_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
truncate_lsn: Lsn,
}
/// Report info about timeline.
async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let tli = GlobalTimelines::get(
get_conf(&request),
tenant_id,
timeline_id,
CreateControlFile::False,
)
.map_err(ApiError::from_err)?;
let sk_state = tli.get_info();
let status = TimelineStatus {
tenant_id,
timeline_id,
acceptor_state: sk_state.acceptor_state,
commit_lsn: sk_state.commit_lsn,
truncate_lsn: sk_state.truncate_lsn,
};
Ok(json_response(StatusCode::OK, status)?)
}
/// Safekeeper http router.
pub fn make_router(conf: WalAcceptorConf) -> RouterBuilder<hyper::Body, ApiError> {
let router = endpoint::make_router();
router
.data(Arc::new(conf))
.get("/v1/status", status_handler)
.get(
"/v1/timeline/:tenant_id/:timeline_id",
timeline_status_handler,
)
}

View File

@@ -2,6 +2,7 @@
use std::path::PathBuf;
use std::time::Duration;
pub mod http;
pub mod json_ctrl;
pub mod receive_wal;
pub mod replication;

View File

@@ -289,7 +289,7 @@ lazy_static! {
}
/// A zero-sized struct used to manage access to the global timelines map.
struct GlobalTimelines;
pub struct GlobalTimelines;
impl GlobalTimelines {
/// Get a timeline with control file loaded from the global TIMELINES map.

View File

@@ -1,3 +1,4 @@
pub mod endpoint;
pub mod error;
pub mod json;
pub mod request;

View File

@@ -0,0 +1,33 @@
use std::str::FromStr;
use super::error::ApiError;
use hyper::{Body, Request};
use routerify::ext::RequestExt;
pub fn get_request_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<&'a str, ApiError> {
match request.param(param_name) {
Some(arg) => Ok(arg),
None => {
return Err(ApiError::BadRequest(format!(
"no {} specified in path param",
param_name
)))
}
}
}
pub fn parse_request_param<T: FromStr>(
request: &Request<Body>,
param_name: &str,
) -> Result<T, ApiError> {
match get_request_param(request, param_name)?.parse() {
Ok(v) => Ok(v),
Err(_) => Err(ApiError::BadRequest(format!(
"failed to parse {}",
param_name
))),
}
}