Serialize Lsn as strings in http api

This commit is contained in:
Kirill Bulatov
2022-03-04 11:11:42 +02:00
committed by Kirill Bulatov
parent fe6fccfdae
commit c51d545fd9
9 changed files with 175 additions and 67 deletions

View File

@@ -12,9 +12,9 @@ use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::ZTenantTimelineId;
use zenith_utils::zid::ZTimelineId;
use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId};
use zenith_utils::zid::{
HexZTenantId, HexZTimelineId, ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId,
};
use crate::safekeeper::SafekeeperNode;
@@ -61,12 +61,12 @@ pub struct LocalEnv {
#[serde(default)]
pub safekeepers: Vec<SafekeeperConf>,
/// Keep human-readable aliases in memory (and persist them to config), to hind ZId hex strings from the user.
/// Keep human-readable aliases in memory (and persist them to config), to hide ZId hex strings from the user.
#[serde(default)]
// A `HashMap<String, HashMap<ZTenantId, ZTimelineId>>` would be more appropriate here,
// but deserialization into a generic toml object as `toml::Value::try_from` fails with an error.
// https://toml.io/en/v1.0.0 does not contain a concept of "a table inside another table".
branch_name_mappings: HashMap<String, Vec<(ZTenantId, ZTimelineId)>>,
branch_name_mappings: HashMap<String, Vec<(HexZTenantId, HexZTimelineId)>>,
}
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -164,6 +164,9 @@ impl LocalEnv {
.entry(branch_name.clone())
.or_default();
let tenant_id = HexZTenantId::from(tenant_id);
let timeline_id = HexZTimelineId::from(timeline_id);
let existing_ids = existing_values
.iter()
.find(|(existing_tenant_id, _)| existing_tenant_id == &tenant_id);
@@ -190,22 +193,29 @@ impl LocalEnv {
branch_name: &str,
tenant_id: ZTenantId,
) -> Option<ZTimelineId> {
let tenant_id = HexZTenantId::from(tenant_id);
self.branch_name_mappings
.get(branch_name)?
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
.map(|&(_, timeline_id)| timeline_id)
.map(ZTimelineId::from)
}
pub fn timeline_name_mappings(&self) -> HashMap<ZTenantTimelineId, String> {
self.branch_name_mappings
.iter()
.map(|(name, tenant_timelines)| {
.flat_map(|(name, tenant_timelines)| {
tenant_timelines.iter().map(|&(tenant_id, timeline_id)| {
(ZTenantTimelineId::new(tenant_id, timeline_id), name.clone())
(
ZTenantTimelineId::new(
ZTenantId::from(tenant_id),
ZTimelineId::from(timeline_id),
),
name.clone(),
)
})
})
.flatten()
.collect()
}

View File

@@ -1,3 +1,4 @@
use std::convert::TryFrom;
use std::io::Write;
use std::net::TcpStream;
use std::path::PathBuf;
@@ -9,7 +10,7 @@ use anyhow::{bail, Context};
use nix::errno::Errno;
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest};
use pageserver::http::models::{TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponse};
use pageserver::timelines::TimelineInfo;
use postgres::{Config, NoTls};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -18,7 +19,7 @@ use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::{HexZTenantId, HexZTimelineId, ZTenantId, ZTimelineId};
use crate::local_env::LocalEnv;
use crate::{fill_rust_env_vars, read_pidfile};
@@ -339,7 +340,9 @@ impl PageServerNode {
pub fn tenant_create(&self, new_tenant_id: Option<ZTenantId>) -> anyhow::Result<ZTenantId> {
let tenant_id_string = self
.http_request(Method::POST, format!("{}/tenant", self.http_base_url))
.json(&TenantCreateRequest { new_tenant_id })
.json(&TenantCreateRequest {
new_tenant_id: new_tenant_id.map(HexZTenantId::from),
})
.send()?
.error_from_body()?
.json::<String>()?;
@@ -351,15 +354,20 @@ impl PageServerNode {
})
}
pub fn timeline_list(&self, tenant_id: &ZTenantId) -> Result<Vec<TimelineInfo>> {
Ok(self
pub fn timeline_list(&self, tenant_id: &ZTenantId) -> anyhow::Result<Vec<TimelineInfo>> {
let timeline_infos: Vec<TimelineInfoResponse> = self
.http_request(
Method::GET,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
)
.send()?
.error_from_body()?
.json()?)
.json()?;
timeline_infos
.into_iter()
.map(TimelineInfo::try_from)
.collect()
}
pub fn timeline_create(
@@ -368,20 +376,22 @@ impl PageServerNode {
new_timeline_id: Option<ZTimelineId>,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<ZTimelineId>,
) -> Result<TimelineInfo> {
Ok(self
) -> anyhow::Result<TimelineInfo> {
let timeline_info_response = self
.http_request(
Method::POST,
format!("{}/tenant/{}/timeline", self.http_base_url, tenant_id),
)
.json(&TimelineCreateRequest {
new_timeline_id,
new_timeline_id: new_timeline_id.map(HexZTimelineId::from),
ancestor_start_lsn,
ancestor_timeline_id,
ancestor_timeline_id: ancestor_timeline_id.map(HexZTimelineId::from),
})
.send()?
.error_from_body()?
.json()?)
.json::<TimelineInfoResponse>()?;
TimelineInfo::try_from(timeline_info_response)
}
}

View File

@@ -1,26 +1,121 @@
use crate::timelines::TimelineInfo;
use anyhow::{anyhow, bail, Context};
use serde::{Deserialize, Serialize};
use zenith_utils::zid::ZNodeId;
use zenith_utils::{
lsn::Lsn,
zid::{opt_display_serde, ZTenantId, ZTimelineId},
zid::{HexZTenantId, HexZTimelineId, ZNodeId, ZTenantId, ZTimelineId},
};
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
#[serde(default)]
#[serde(with = "opt_display_serde")]
pub new_timeline_id: Option<ZTimelineId>,
#[serde(default)]
#[serde(with = "opt_display_serde")]
pub ancestor_timeline_id: Option<ZTimelineId>,
pub new_timeline_id: Option<HexZTimelineId>,
pub ancestor_timeline_id: Option<HexZTimelineId>,
pub ancestor_start_lsn: Option<Lsn>,
}
#[derive(Serialize, Deserialize)]
pub struct TenantCreateRequest {
#[serde(default)]
#[serde(with = "opt_display_serde")]
pub new_tenant_id: Option<ZTenantId>,
pub new_tenant_id: Option<HexZTenantId>,
}
#[derive(Serialize, Deserialize)]
pub struct TimelineInfoResponse {
pub kind: String,
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
disk_consistent_lsn: String,
last_record_lsn: Option<String>,
prev_record_lsn: Option<String>,
ancestor_timeline_id: Option<HexZTimelineId>,
ancestor_lsn: Option<String>,
current_logical_size: Option<usize>,
current_logical_size_non_incremental: Option<usize>,
}
impl From<TimelineInfo> for TimelineInfoResponse {
fn from(other: TimelineInfo) -> Self {
match other {
TimelineInfo::Local {
timeline_id,
tenant_id,
last_record_lsn,
prev_record_lsn,
ancestor_timeline_id,
ancestor_lsn,
disk_consistent_lsn,
current_logical_size,
current_logical_size_non_incremental,
} => TimelineInfoResponse {
kind: "Local".to_owned(),
timeline_id,
tenant_id,
disk_consistent_lsn: disk_consistent_lsn.to_string(),
last_record_lsn: Some(last_record_lsn.to_string()),
prev_record_lsn: Some(prev_record_lsn.to_string()),
ancestor_timeline_id: ancestor_timeline_id.map(HexZTimelineId::from),
ancestor_lsn: ancestor_lsn.map(|lsn| lsn.to_string()),
current_logical_size: Some(current_logical_size),
current_logical_size_non_incremental,
},
TimelineInfo::Remote {
timeline_id,
tenant_id,
disk_consistent_lsn,
} => TimelineInfoResponse {
kind: "Remote".to_owned(),
timeline_id,
tenant_id,
disk_consistent_lsn: disk_consistent_lsn.to_string(),
last_record_lsn: None,
prev_record_lsn: None,
ancestor_timeline_id: None,
ancestor_lsn: None,
current_logical_size: None,
current_logical_size_non_incremental: None,
},
}
}
}
impl TryFrom<TimelineInfoResponse> for TimelineInfo {
type Error = anyhow::Error;
fn try_from(other: TimelineInfoResponse) -> anyhow::Result<Self> {
let parse_lsn_hex_string = |lsn_string: String| {
lsn_string
.parse::<Lsn>()
.with_context(|| format!("Failed to parse Lsn as hex string from '{}'", lsn_string))
};
let disk_consistent_lsn = parse_lsn_hex_string(other.disk_consistent_lsn)?;
Ok(match other.kind.as_str() {
"Local" => TimelineInfo::Local {
timeline_id: other.timeline_id,
tenant_id: other.tenant_id,
last_record_lsn: other
.last_record_lsn
.ok_or(anyhow!("Local timeline should have last_record_lsn"))
.and_then(parse_lsn_hex_string)?,
prev_record_lsn: other
.prev_record_lsn
.ok_or(anyhow!("Local timeline should have prev_record_lsn"))
.and_then(parse_lsn_hex_string)?,
ancestor_timeline_id: other.ancestor_timeline_id.map(ZTimelineId::from),
ancestor_lsn: other.ancestor_lsn.map(parse_lsn_hex_string).transpose()?,
disk_consistent_lsn,
current_logical_size: other.current_logical_size.ok_or(anyhow!("No "))?,
current_logical_size_non_incremental: other.current_logical_size_non_incremental,
},
"Remote" => TimelineInfo::Remote {
timeline_id: other.timeline_id,
tenant_id: other.tenant_id,
disk_consistent_lsn,
},
unknown => bail!("Unknown timeline kind: {}", unknown),
})
}
}
#[derive(Serialize)]

View File

@@ -16,11 +16,11 @@ use zenith_utils::http::{
request::parse_request_param,
};
use zenith_utils::http::{RequestExt, RouterBuilder};
use zenith_utils::zid::{HexZTimelineId, ZTimelineId};
use zenith_utils::zid::{HexZTenantId, ZTimelineId};
use super::models::StatusResponse;
use super::models::TenantCreateRequest;
use super::models::TimelineCreateRequest;
use super::models::{
StatusResponse, TenantCreateRequest, TimelineCreateRequest, TimelineInfoResponse,
};
use crate::repository::RepositoryTimeline;
use crate::timelines::TimelineInfo;
use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId};
@@ -79,13 +79,13 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
timelines::create_timeline(
get_config(&request),
tenant_id,
request_data.new_timeline_id,
request_data.ancestor_timeline_id,
request_data.new_timeline_id.map(ZTimelineId::from),
request_data.ancestor_timeline_id.map(ZTimelineId::from),
request_data.ancestor_start_lsn,
)
})
.await
.map_err(ApiError::from_err)??;
.map_err(ApiError::from_err)?.map(TimelineInfoResponse::from)?;
Ok(json_response(StatusCode::CREATED, response_data)?)
}
@@ -93,12 +93,15 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
let response_data = tokio::task::spawn_blocking(move || {
let response_data: Vec<TimelineInfoResponse> = tokio::task::spawn_blocking(move || {
let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
crate::timelines::get_timelines(tenant_id, include_non_incremental_logical_size)
})
.await
.map_err(ApiError::from_err)??;
.map_err(ApiError::from_err)??
.into_iter()
.map(TimelineInfoResponse::from)
.collect();
Ok(json_response(StatusCode::OK, response_data)?)
}
@@ -137,7 +140,8 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
))
})
.await
.map_err(ApiError::from_err)??;
.map_err(ApiError::from_err)?
.map(TimelineInfoResponse::from)?;
Ok(json_response(StatusCode::OK, response_data)?)
}
@@ -216,14 +220,16 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
let new_tenant_id = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_create", tenant = ?request_data.new_tenant_id).entered();
// TODO kb this has to be changed to create tenant only
tenant_mgr::create_repository_for_tenant(get_config(&request), request_data.new_tenant_id)
tenant_mgr::create_repository_for_tenant(
get_config(&request),
request_data.new_tenant_id.map(ZTenantId::from),
)
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(
StatusCode::CREATED,
new_tenant_id.to_string(),
HexZTenantId::from(new_tenant_id),
)?)
}

View File

@@ -4,7 +4,6 @@
use anyhow::{anyhow, bail, Context, Result};
use postgres_ffi::ControlFileData;
use serde::{Deserialize, Serialize};
use std::{
fs,
path::Path,
@@ -14,7 +13,7 @@ use std::{
use tracing::*;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{opt_display_serde, ZTenantId, ZTimelineId};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::{crashsafe_dir, logging};
use crate::{config::PageServerConf, repository::Repository};
@@ -23,17 +22,13 @@ use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager};
use crate::{repository::RepositoryTimeline, tenant_mgr};
use crate::{repository::Timeline, CheckpointConfig};
#[derive(Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
#[derive(Clone)]
pub enum TimelineInfo {
Local {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
ancestor_lsn: Option<Lsn>,
disk_consistent_lsn: Lsn,
@@ -41,9 +36,7 @@ pub enum TimelineInfo {
current_logical_size_non_incremental: Option<usize>,
},
Remote {
#[serde(with = "hex")]
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
disk_consistent_lsn: Lsn,
},

View File

@@ -39,7 +39,7 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
timeline_id_str = str(timeline['timeline_id'])
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=UUID(timeline_id_str))
assert timeline_details['type'] == 'Local'
assert timeline_details['kind'] == 'Local'
assert timeline_details['tenant_id'] == tenant_id.hex
assert timeline_details['timeline_id'] == timeline_id_str

View File

@@ -85,7 +85,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
timeline_details = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
assert timeline_details['timeline_id'] == timeline_id
assert timeline_details['tenant_id'] == tenant_id
if timeline_details['type'] == 'Local':
if timeline_details['kind'] == 'Local':
log.info("timeline downloaded, checking its data")
break
attempts += 1

View File

@@ -13,7 +13,7 @@ from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import lsn_to_hex, mkdir_if_needed
from fixtures.utils import lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -91,7 +91,7 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
m = TimelineMetrics(
timeline_id=timeline_id,
last_record_lsn=timeline_detail["last_record_lsn"],
last_record_lsn=lsn_from_hex(timeline_detail["last_record_lsn"]),
)
for sk_m in sk_metrics:
m.flush_lsns.append(sk_m.flush_lsn_inexact[(tenant_id.hex, timeline_id)])

View File

@@ -81,9 +81,6 @@ fn main() -> Result<()> {
.required(false);
let pg_node_arg = Arg::new("node").help("Postgres node name").required(false);
let safekeeper_node_arg = Arg::new("node")
.help("Safekeeper node name")
.required(false);
let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
@@ -361,7 +358,7 @@ fn print_timeline(
print!("{} ", local_or_remote);
if nesting_level > 0 {
let lsn_string = match timeline.info {
let lsn_string = match &timeline.info {
TimelineInfo::Local { ancestor_lsn, .. } => ancestor_lsn
.map(|lsn| lsn.to_string())
.unwrap_or_else(|| "Unknown local Lsn".to_string()),
@@ -430,14 +427,11 @@ fn get_timeline_infos(
env: &local_env::LocalEnv,
tenant_id: &ZTenantId,
) -> Result<HashMap<ZTimelineId, TimelineInfo>> {
let page_server = PageServerNode::from_env(env);
let timeline_infos: Vec<TimelineInfo> = page_server.timeline_list(tenant_id)?;
let timeline_infos: HashMap<ZTimelineId, TimelineInfo> = timeline_infos
Ok(PageServerNode::from_env(env)
.timeline_list(tenant_id)?
.into_iter()
.map(|timeline_info| (timeline_info.timeline_id(), timeline_info))
.collect();
Ok(timeline_infos)
.collect())
}
// Helper function to parse --tenant_id option, or get the default from config file
@@ -486,7 +480,7 @@ fn handle_init(init_match: &ArgMatches) -> Result<LocalEnv> {
.context("Failed to initialize zenith repository")?;
// default_tenantid was generated by the `env.init()` call above
let initial_tenant_id = env.default_tenant_id.unwrap();
let initial_tenant_id = ZTenantId::from(env.default_tenant_id.unwrap());
// Call 'pageserver init'.
let pageserver = PageServerNode::from_env(&env);