From c51d545fd974385c104799b9e18d67d6a8047afa Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 4 Mar 2022 11:11:42 +0200 Subject: [PATCH] Serialize Lsn as strings in http api --- control_plane/src/local_env.rs | 26 ++-- control_plane/src/storage.rs | 32 +++-- pageserver/src/http/models.rs | 117 ++++++++++++++++-- pageserver/src/http/routes.rs | 32 +++-- pageserver/src/timelines.rs | 11 +- .../batch_others/test_pageserver_api.py | 2 +- .../batch_others/test_remote_storage.py | 2 +- test_runner/batch_others/test_wal_acceptor.py | 4 +- zenith/src/main.rs | 16 +-- 9 files changed, 175 insertions(+), 67 deletions(-) diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 9278a9df5a..2a1d51fe08 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -12,9 +12,9 @@ use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use zenith_utils::auth::{encode_from_key_file, Claims, Scope}; use zenith_utils::postgres_backend::AuthType; -use zenith_utils::zid::ZTenantTimelineId; -use zenith_utils::zid::ZTimelineId; -use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId}; +use zenith_utils::zid::{ + HexZTenantId, HexZTimelineId, ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId, +}; use crate::safekeeper::SafekeeperNode; @@ -61,12 +61,12 @@ pub struct LocalEnv { #[serde(default)] pub safekeepers: Vec, - /// Keep human-readable aliases in memory (and persist them to config), to hind ZId hex strings from the user. + /// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user. #[serde(default)] // A `HashMap>` would be more appropriate here, // but deserialization into a generic toml object as `toml::Value::try_from` fails with an error. // https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table". - branch_name_mappings: HashMap>, + branch_name_mappings: HashMap>, } #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] @@ -164,6 +164,9 @@ impl LocalEnv { .entry(branch_name.clone()) .or_default(); + let tenant_id = HexZTenantId::from(tenant_id); + let timeline_id = HexZTimelineId::from(timeline_id); + let existing_ids = existing_values .iter() .find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id); @@ -190,22 +193,29 @@ impl LocalEnv { branch_name: &str, tenant_id: ZTenantId, ) -> Option { + let tenant_id = HexZTenantId::from(tenant_id); self.branch_name_mappings .get(branch_name)? .iter() .find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id) .map(|&(_, timeline_id)| timeline_id) + .map(ZTimelineId::from) } pub fn timeline_name_mappings(&self) -> HashMap { self.branch_name_mappings .iter() - .map(|(name, tenant_timelines)| { + .flat_map(|(name, tenant_timelines)| { tenant_timelines.iter().map(|&(tenant_id, timeline_id)| { - (ZTenantTimelineId::new(tenant_id, timeline_id), name.clone()) + ( + ZTenantTimelineId::new( + ZTenantId::from(tenant_id), + ZTimelineId::from(timeline_id), + ), + name.clone(), + ) }) }) - .flatten() .collect() } diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 3c68823760..259fc79708 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,3 +1,4 @@ +use std::convert::TryFrom; use std::io::Write; use std::net::TcpStream; use std::path::PathBuf; @@ -9,7 +10,7 @@ use anyhow::{bail, Context}; use nix::errno::Errno; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; -use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest}; +use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponse}; use pageserver::timelines::TimelineInfo; use postgres::{Config, NoTls}; use reqwest::blocking::{Client, RequestBuilder, Response}; @@ -18,7 +19,7 @@ use thiserror::Error; use zenith_utils::http::error::HttpErrorBody; use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend::AuthType; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::zid::{HexZTenantId, HexZTimelineId, ZTenantId, ZTimelineId}; use crate::local_env::LocalEnv; use crate::{fill_rust_env_vars, read_pidfile}; @@ -339,7 +340,9 @@ impl PageServerNode { pub fn tenant_create(&self, new_tenant_id: Option) -> anyhow::Result { let tenant_id_string = self .http_request(Method::POST, format!("{}/tenant", self.http_base_url)) - .json(&TenantCreateRequest { new_tenant_id }) + .json(&TenantCreateRequest { + new_tenant_id: new_tenant_id.map(HexZTenantId::from), + }) .send()? .error_from_body()? .json::()?; @@ -351,15 +354,20 @@ impl PageServerNode { }) } - pub fn timeline_list(&self, tenant_id: &ZTenantId) -> Result> { - Ok(self + pub fn timeline_list(&self, tenant_id: &ZTenantId) -> anyhow::Result> { + let timeline_infos: Vec = self .http_request( Method::GET, format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id), ) .send()? .error_from_body()? - .json()?) + .json()?; + + timeline_infos + .into_iter() + .map(TimelineInfo::try_from) + .collect() } pub fn timeline_create( @@ -368,20 +376,22 @@ impl PageServerNode { new_timeline_id: Option, ancestor_start_lsn: Option, ancestor_timeline_id: Option, - ) -> Result { - Ok(self + ) -> anyhow::Result { + let timeline_info_response = self .http_request( Method::POST, format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id), ) .json(&TimelineCreateRequest { - new_timeline_id, + new_timeline_id: new_timeline_id.map(HexZTimelineId::from), ancestor_start_lsn, - ancestor_timeline_id, + ancestor_timeline_id: ancestor_timeline_id.map(HexZTimelineId::from), }) .send()? .error_from_body()? - .json()?) + .json::()?; + + TimelineInfo::try_from(timeline_info_response) } } diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 28d9791438..9844e7ea82 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -1,26 +1,121 @@ +use crate::timelines::TimelineInfo; +use anyhow::{anyhow, bail, Context}; use serde::{Deserialize, Serialize}; -use zenith_utils::zid::ZNodeId; use zenith_utils::{ lsn::Lsn, - zid::{opt_display_serde, ZTenantId, ZTimelineId}, + zid::{HexZTenantId, HexZTimelineId, ZNodeId, ZTenantId, ZTimelineId}, }; #[derive(Serialize, Deserialize)] pub struct TimelineCreateRequest { - #[serde(default)] - #[serde(with = "opt_display_serde")] - pub new_timeline_id: Option, - #[serde(default)] - #[serde(with = "opt_display_serde")] - pub ancestor_timeline_id: Option, + pub new_timeline_id: Option, + pub ancestor_timeline_id: Option, pub ancestor_start_lsn: Option, } #[derive(Serialize, Deserialize)] pub struct TenantCreateRequest { - #[serde(default)] - #[serde(with = "opt_display_serde")] - pub new_tenant_id: Option, + pub new_tenant_id: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct TimelineInfoResponse { + pub kind: String, + #[serde(with = "hex")] + timeline_id: ZTimelineId, + #[serde(with = "hex")] + tenant_id: ZTenantId, + disk_consistent_lsn: String, + last_record_lsn: Option, + prev_record_lsn: Option, + ancestor_timeline_id: Option, + ancestor_lsn: Option, + current_logical_size: Option, + current_logical_size_non_incremental: Option, +} + +impl From for TimelineInfoResponse { + fn from(other: TimelineInfo) -> Self { + match other { + TimelineInfo::Local { + timeline_id, + tenant_id, + last_record_lsn, + prev_record_lsn, + ancestor_timeline_id, + ancestor_lsn, + disk_consistent_lsn, + current_logical_size, + current_logical_size_non_incremental, + } => TimelineInfoResponse { + kind: "Local".to_owned(), + timeline_id, + tenant_id, + disk_consistent_lsn: disk_consistent_lsn.to_string(), + last_record_lsn: Some(last_record_lsn.to_string()), + prev_record_lsn: Some(prev_record_lsn.to_string()), + ancestor_timeline_id: ancestor_timeline_id.map(HexZTimelineId::from), + ancestor_lsn: ancestor_lsn.map(|lsn| lsn.to_string()), + current_logical_size: Some(current_logical_size), + current_logical_size_non_incremental, + }, + TimelineInfo::Remote { + timeline_id, + tenant_id, + disk_consistent_lsn, + } => TimelineInfoResponse { + kind: "Remote".to_owned(), + timeline_id, + tenant_id, + disk_consistent_lsn: disk_consistent_lsn.to_string(), + last_record_lsn: None, + prev_record_lsn: None, + ancestor_timeline_id: None, + ancestor_lsn: None, + current_logical_size: None, + current_logical_size_non_incremental: None, + }, + } + } +} + +impl TryFrom for TimelineInfo { + type Error = anyhow::Error; + + fn try_from(other: TimelineInfoResponse) -> anyhow::Result { + let parse_lsn_hex_string = |lsn_string: String| { + lsn_string + .parse::() + .with_context(|| format!("Failed to parse Lsn as hex string from '{}'", lsn_string)) + }; + + let disk_consistent_lsn = parse_lsn_hex_string(other.disk_consistent_lsn)?; + Ok(match other.kind.as_str() { + "Local" => TimelineInfo::Local { + timeline_id: other.timeline_id, + tenant_id: other.tenant_id, + last_record_lsn: other + .last_record_lsn + .ok_or(anyhow!("Local timeline should have last_record_lsn")) + .and_then(parse_lsn_hex_string)?, + prev_record_lsn: other + .prev_record_lsn + .ok_or(anyhow!("Local timeline should have prev_record_lsn")) + .and_then(parse_lsn_hex_string)?, + ancestor_timeline_id: other.ancestor_timeline_id.map(ZTimelineId::from), + ancestor_lsn: other.ancestor_lsn.map(parse_lsn_hex_string).transpose()?, + disk_consistent_lsn, + current_logical_size: other.current_logical_size.ok_or(anyhow!("No "))?, + current_logical_size_non_incremental: other.current_logical_size_non_incremental, + }, + "Remote" => TimelineInfo::Remote { + timeline_id: other.timeline_id, + tenant_id: other.tenant_id, + disk_consistent_lsn, + }, + unknown => bail!("Unknown timeline kind: {}", unknown), + }) + } } #[derive(Serialize)] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index efcc7ae2f3..abc4043bdd 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -16,11 +16,11 @@ use zenith_utils::http::{ request::parse_request_param, }; use zenith_utils::http::{RequestExt, RouterBuilder}; -use zenith_utils::zid::{HexZTimelineId, ZTimelineId}; +use zenith_utils::zid::{HexZTenantId, ZTimelineId}; -use super::models::StatusResponse; -use super::models::TenantCreateRequest; -use super::models::TimelineCreateRequest; +use super::models::{ + StatusResponse, TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponse, +}; use crate::repository::RepositoryTimeline; use crate::timelines::TimelineInfo; use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId}; @@ -79,13 +79,13 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result, let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request); - let response_data = tokio::task::spawn_blocking(move || { + let response_data: Vec = tokio::task::spawn_blocking(move || { let _enter = info_span!("timeline_list", tenant = %tenant_id).entered(); crate::timelines::get_timelines(tenant_id, include_non_incremental_logical_size) }) .await - .map_err(ApiError::from_err)??; + .map_err(ApiError::from_err)?? + .into_iter() + .map(TimelineInfoResponse::from) + .collect(); Ok(json_response(StatusCode::OK, response_data)?) } @@ -137,7 +140,8 @@ async fn timeline_detail_handler(request: Request) -> Result) -> Result, ancestor_lsn: Option, disk_consistent_lsn: Lsn, @@ -41,9 +36,7 @@ pub enum TimelineInfo { current_logical_size_non_incremental: Option, }, Remote { - #[serde(with = "hex")] timeline_id: ZTimelineId, - #[serde(with = "hex")] tenant_id: ZTenantId, disk_consistent_lsn: Lsn, }, diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 7d2c0800a2..41b1899882 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -39,7 +39,7 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID): timeline_id_str = str(timeline['timeline_id']) timeline_details = client.timeline_detail(tenant_id=tenant_id, timeline_id=UUID(timeline_id_str)) - assert timeline_details['type'] == 'Local' + assert timeline_details['kind'] == 'Local' assert timeline_details['tenant_id'] == tenant_id.hex assert timeline_details['timeline_id'] == timeline_id_str diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py index abd06bf5e9..edcc768819 100644 --- a/test_runner/batch_others/test_remote_storage.py +++ b/test_runner/batch_others/test_remote_storage.py @@ -85,7 +85,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, timeline_details = client.timeline_detail(UUID(tenant_id), UUID(timeline_id)) assert timeline_details['timeline_id'] == timeline_id assert timeline_details['tenant_id'] == tenant_id - if timeline_details['type'] == 'Local': + if timeline_details['kind'] == 'Local': log.info("timeline downloaded, checking its data") break attempts += 1 diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 9518a14b75..02da7ee749 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -13,7 +13,7 @@ from dataclasses import dataclass, field from multiprocessing import Process, Value from pathlib import Path from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol -from fixtures.utils import lsn_to_hex, mkdir_if_needed +from fixtures.utils import lsn_to_hex, mkdir_if_needed, lsn_from_hex from fixtures.log_helper import log from typing import List, Optional, Any @@ -91,7 +91,7 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder): m = TimelineMetrics( timeline_id=timeline_id, - last_record_lsn=timeline_detail["last_record_lsn"], + last_record_lsn=lsn_from_hex(timeline_detail["last_record_lsn"]), ) for sk_m in sk_metrics: m.flush_lsns.append(sk_m.flush_lsn_inexact[(tenant_id.hex, timeline_id)]) diff --git a/zenith/src/main.rs b/zenith/src/main.rs index c4636fa1a6..165a7d7950 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -81,9 +81,6 @@ fn main() -> Result<()> { .required(false); let pg_node_arg = Arg::new("node").help("Postgres node name").required(false); - let safekeeper_node_arg = Arg::new("node") - .help("Safekeeper node name") - .required(false); let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false); @@ -361,7 +358,7 @@ fn print_timeline( print!("{} ", local_or_remote); if nesting_level > 0 { - let lsn_string = match timeline.info { + let lsn_string = match &timeline.info { TimelineInfo::Local { ancestor_lsn, .. } => ancestor_lsn .map(|lsn| lsn.to_string()) .unwrap_or_else(|| "Unknown local Lsn".to_string()), @@ -430,14 +427,11 @@ fn get_timeline_infos( env: &local_env::LocalEnv, tenant_id: &ZTenantId, ) -> Result> { - let page_server = PageServerNode::from_env(env); - let timeline_infos: Vec = page_server.timeline_list(tenant_id)?; - let timeline_infos: HashMap = timeline_infos + Ok(PageServerNode::from_env(env) + .timeline_list(tenant_id)? .into_iter() .map(|timeline_info| (timeline_info.timeline_id(), timeline_info)) - .collect(); - - Ok(timeline_infos) + .collect()) } // Helper function to parse --tenant_id option, or get the default from config file @@ -486,7 +480,7 @@ fn handle_init(init_match: &ArgMatches) -> Result { .context("Failed to initialize zenith repository")?; // default_tenantid was generated by the `env.init()` call above - let initial_tenant_id = env.default_tenant_id.unwrap(); + let initial_tenant_id = ZTenantId::from(env.default_tenant_id.unwrap()); // Call 'pageserver init'. let pageserver = PageServerNode::from_env(&env);