Provide single list timelines HTTP API handle

This commit is contained in:
Kirill Bulatov
2022-03-20 21:13:23 +02:00
committed by Kirill Bulatov
parent 77ed2a0fa0
commit bd6bef468c
8 changed files with 83 additions and 220 deletions

View File

@@ -5,8 +5,6 @@ use zenith_utils::{
zid::{ZNodeId, ZTenantId, ZTimelineId},
};
use crate::timelines::{LocalTimelineInfo, TimelineInfo};
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
@@ -34,154 +32,6 @@ pub struct TenantCreateRequest {
#[serde(transparent)]
pub struct TenantCreateResponse(#[serde_as(as = "DisplayFromStr")] pub ZTenantId);
#[derive(Clone)]
pub enum TimelineInfoV1 {
Local {
timeline_id: ZTimelineId,
tenant_id: ZTenantId,
last_record_lsn: Lsn,
prev_record_lsn: Option<Lsn>,
ancestor_timeline_id: Option<ZTimelineId>,
ancestor_lsn: Option<Lsn>,
disk_consistent_lsn: Lsn,
current_logical_size: Option<usize>,
current_logical_size_non_incremental: Option<usize>,
},
Remote {
timeline_id: ZTimelineId,
tenant_id: ZTenantId,
disk_consistent_lsn: Lsn,
},
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct TimelineInfoResponseV1 {
pub kind: String,
#[serde_as(as = "DisplayFromStr")]
timeline_id: ZTimelineId,
#[serde_as(as = "DisplayFromStr")]
tenant_id: ZTenantId,
#[serde_as(as = "DisplayFromStr")]
disk_consistent_lsn: Lsn,
#[serde_as(as = "Option<DisplayFromStr>")]
last_record_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
prev_record_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
ancestor_timeline_id: Option<ZTimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
ancestor_lsn: Option<Lsn>,
current_logical_size: Option<usize>,
current_logical_size_non_incremental: Option<usize>,
}
impl From<TimelineInfoV1> for TimelineInfoResponseV1 {
fn from(other: TimelineInfoV1) -> Self {
match other {
TimelineInfoV1::Local {
timeline_id,
tenant_id,
last_record_lsn,
prev_record_lsn,
ancestor_timeline_id,
ancestor_lsn,
disk_consistent_lsn,
current_logical_size,
current_logical_size_non_incremental,
} => TimelineInfoResponseV1 {
kind: "Local".to_owned(),
timeline_id,
tenant_id,
disk_consistent_lsn,
last_record_lsn: Some(last_record_lsn),
prev_record_lsn,
ancestor_timeline_id,
ancestor_lsn,
current_logical_size,
current_logical_size_non_incremental,
},
TimelineInfoV1::Remote {
timeline_id,
tenant_id,
disk_consistent_lsn,
} => TimelineInfoResponseV1 {
kind: "Remote".to_owned(),
timeline_id,
tenant_id,
disk_consistent_lsn,
last_record_lsn: None,
prev_record_lsn: None,
ancestor_timeline_id: None,
ancestor_lsn: None,
current_logical_size: None,
current_logical_size_non_incremental: None,
},
}
}
}
impl TryFrom<TimelineInfoResponseV1> for TimelineInfoV1 {
type Error = anyhow::Error;
fn try_from(other: TimelineInfoResponseV1) -> anyhow::Result<Self> {
Ok(match other.kind.as_str() {
"Local" => TimelineInfoV1::Local {
timeline_id: other.timeline_id,
tenant_id: other.tenant_id,
last_record_lsn: other.last_record_lsn.ok_or(anyhow::anyhow!(
"Local timeline should have last_record_lsn"
))?,
prev_record_lsn: other.prev_record_lsn,
ancestor_timeline_id: other.ancestor_timeline_id.map(ZTimelineId::from),
ancestor_lsn: other.ancestor_lsn,
disk_consistent_lsn: other.disk_consistent_lsn,
current_logical_size: other.current_logical_size,
current_logical_size_non_incremental: other.current_logical_size_non_incremental,
},
"Remote" => TimelineInfoV1::Remote {
timeline_id: other.timeline_id,
tenant_id: other.tenant_id,
disk_consistent_lsn: other.disk_consistent_lsn,
},
unknown => anyhow::bail!("Unknown timeline kind: {}", unknown),
})
}
}
fn from_local(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
local: &LocalTimelineInfo,
) -> TimelineInfoV1 {
TimelineInfoV1::Local {
timeline_id,
tenant_id,
last_record_lsn: local.last_record_lsn,
prev_record_lsn: local.prev_record_lsn,
ancestor_timeline_id: local.ancestor_timeline_id.map(ZTimelineId::from),
ancestor_lsn: local.ancestor_lsn,
disk_consistent_lsn: local.disk_consistent_lsn,
current_logical_size: local.current_logical_size,
current_logical_size_non_incremental: local.current_logical_size_non_incremental,
}
}
impl From<TimelineInfo> for TimelineInfoV1 {
fn from(t: TimelineInfo) -> Self {
match (t.local.as_ref(), t.remote.as_ref()) {
(None, None) => unreachable!(),
(None, Some(remote)) => TimelineInfoV1::Remote {
timeline_id: t.timeline_id,
tenant_id: t.tenant_id,
disk_consistent_lsn: remote.remote_consistent_lsn.unwrap_or(Lsn(0)),
},
(Some(local), None) => from_local(t.tenant_id, t.timeline_id, local),
(Some(local), Some(_)) => from_local(t.tenant_id, t.timeline_id, local),
}
}
}
#[derive(Serialize)]
pub struct StatusResponse {
pub id: ZNodeId,

View File

@@ -148,6 +148,7 @@ paths:
format: hex
ancestor_start_lsn:
type: string
format: hex
responses:
"201":
description: TimelineInfo
@@ -289,7 +290,6 @@ components:
required:
- timeline_id
- tenant_id
- disk_consistent_lsn
properties:
timeline_id:
type: string
@@ -297,17 +297,44 @@ components:
tenant_id:
type: string
format: hex
local:
$ref: "#/components/schemas/LocalTimelineInfo"
remote:
$ref: "#/components/schemas/RemoteTimelineInfo"
RemoteTimelineInfo:
type: object
required:
- awaits_download
properties:
awaits_download:
type: boolean
remote_consistent_lsn:
type: string
format: hex
LocalTimelineInfo:
type: object
required:
- last_record_lsn
- disk_consistent_lsn
- timeline_state
properties:
last_record_lsn:
type: string
prev_record_lsn:
format: hex
disk_consistent_lsn:
type: string
format: hex
timeline_state:
type: string
ancestor_timeline_id:
type: string
format: hex
ancestor_lsn:
type: string
disk_consistent_lsn:
format: hex
prev_record_lsn:
type: string
format: hex
current_logical_size:
type: integer
current_logical_size_non_incremental:

View File

@@ -21,7 +21,6 @@ use zenith_utils::zid::{ZTenantTimelineId, ZTimelineId};
use super::models::{
StatusResponse, TenantCreateRequest, TenantCreateResponse, TimelineCreateRequest,
TimelineInfoResponseV1, TimelineInfoV1,
};
use crate::remote_storage::{schedule_timeline_download, RemoteTimelineIndex};
use crate::timelines::{
@@ -143,8 +142,7 @@ fn get_include_non_incremental_logical_size(request: &Request<Body>) -> bool {
.unwrap_or(false)
}
// common part for v1 and v2 handlers
async fn timeline_detail_common(request: Request<Body>) -> Result<TimelineInfo, ApiError> {
async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
@@ -192,25 +190,12 @@ async fn timeline_detail_common(request: Request<Body>) -> Result<TimelineInfo,
));
}
Ok(TimelineInfo {
let timeline_info = TimelineInfo {
tenant_id,
timeline_id,
local: local_timeline_info,
remote: remote_timeline_info,
})
}
// TODO remove when console adopts v2
async fn timeline_detail_handler_v1(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let timeline_info = timeline_detail_common(request).await?;
Ok(json_response(
StatusCode::OK,
TimelineInfoResponseV1::from(TimelineInfoV1::from(timeline_info)),
)?)
}
async fn timeline_detail_handler_v2(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let timeline_info = timeline_detail_common(request).await?;
};
Ok(json_response(StatusCode::OK, timeline_info)?)
}
@@ -347,11 +332,7 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler_v1,
)
.get(
"/v2/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler_v2,
timeline_detail_handler,
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/attach",

View File

@@ -39,10 +39,14 @@ def check_client(client: ZenithPageserverHttpClient, initial_tenant: UUID):
timeline_id_str = str(timeline['timeline_id'])
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=UUID(timeline_id_str))
assert timeline_details['kind'] == 'Local'
assert timeline_details['tenant_id'] == tenant_id.hex
assert timeline_details['timeline_id'] == timeline_id_str
local_timeline_details = timeline_details.get('local')
assert local_timeline_details is not None
assert local_timeline_details['timeline_state'] == 'Loaded'
def test_pageserver_http_api_client(zenith_simple_env: ZenithEnv):
env = zenith_simple_env

View File

@@ -141,7 +141,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder,
# wait until pageserver receives that data
wait_for_last_record_lsn(pageserver_http, tenant, timeline, current_lsn)
timeline_detail = pageserver_http.timeline_detail_v2(tenant, timeline)
timeline_detail = assert_local(pageserver_http, tenant, timeline)
if with_load == 'with_load':
# create load table

View File

@@ -2,7 +2,7 @@ from contextlib import closing
from uuid import UUID
import psycopg2.extras
import psycopg2.errors
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres
from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres, assert_local
from fixtures.log_helper import log
import time
@@ -13,8 +13,9 @@ def test_timeline_size(zenith_simple_env: ZenithEnv):
new_timeline_id = env.zenith_cli.create_branch('test_timeline_size', 'empty')
client = env.pageserver.http_client()
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
timeline_details = assert_local(client, env.initial_tenant, new_timeline_id)
assert timeline_details['local']['current_logical_size'] == timeline_details['local'][
'current_logical_size_non_incremental']
pgmain = env.postgres.create_start("test_timeline_size")
log.info("postgres is running on 'test_timeline_size' branch")
@@ -31,12 +32,16 @@ def test_timeline_size(zenith_simple_env: ZenithEnv):
FROM generate_series(1, 10) g
""")
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
cur.execute("TRUNCATE foo")
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
res = assert_local(client, env.initial_tenant, new_timeline_id)
local_details = res['local']
assert local_details["current_logical_size"] == local_details[
"current_logical_size_non_incremental"]
# wait until received_lsn_lag is 0
@@ -71,8 +76,9 @@ def test_timeline_size_quota(zenith_env_builder: ZenithEnvBuilder):
new_timeline_id = env.zenith_cli.create_branch('test_timeline_size_quota')
client = env.pageserver.http_client()
res = client.timeline_detail(tenant_id=env.initial_tenant, timeline_id=new_timeline_id)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
res = assert_local(client, env.initial_tenant, new_timeline_id)
assert res['local']["current_logical_size"] == res['local'][
"current_logical_size_non_incremental"]
pgmain = env.postgres.create_start(
"test_timeline_size_quota",

View File

@@ -89,29 +89,33 @@ def test_many_timelines(zenith_env_builder: ZenithEnvBuilder):
sk_metrics = [sk.http_client().get_metrics() for sk in env.safekeepers]
timeline_metrics = []
with env.pageserver.http_client() as pageserver_http:
for timeline_detail in timeline_details:
timeline_id: str = timeline_detail["timeline_id"]
for timeline_detail in timeline_details:
timeline_id: str = timeline_detail["timeline_id"]
m = TimelineMetrics(
timeline_id=timeline_id,
last_record_lsn=lsn_from_hex(timeline_detail["last_record_lsn"]),
)
for sk_m in sk_metrics:
m.flush_lsns.append(sk_m.flush_lsn_inexact[(tenant_id.hex, timeline_id)])
m.commit_lsns.append(sk_m.commit_lsn_inexact[(tenant_id.hex, timeline_id)])
local_timeline_detail = timeline_detail.get('local')
if local_timeline_detail is None:
log.debug(f"Timeline {timeline_id} is not present locally, skipping")
continue
for flush_lsn, commit_lsn in zip(m.flush_lsns, m.commit_lsns):
# Invariant. May be < when transaction is in progress.
assert commit_lsn <= flush_lsn
# We only call collect_metrics() after a transaction is confirmed by
# the compute node, which only happens after a consensus of safekeepers
# has confirmed the transaction. We assume majority consensus here.
assert (2 * sum(m.last_record_lsn <= lsn
for lsn in m.flush_lsns) > zenith_env_builder.num_safekeepers)
assert (2 * sum(m.last_record_lsn <= lsn
for lsn in m.commit_lsns) > zenith_env_builder.num_safekeepers)
timeline_metrics.append(m)
m = TimelineMetrics(
timeline_id=timeline_id,
last_record_lsn=lsn_from_hex(local_timeline_detail['last_record_lsn']),
)
for sk_m in sk_metrics:
m.flush_lsns.append(sk_m.flush_lsn_inexact[(tenant_id.hex, timeline_id)])
m.commit_lsns.append(sk_m.commit_lsn_inexact[(tenant_id.hex, timeline_id)])
for flush_lsn, commit_lsn in zip(m.flush_lsns, m.commit_lsns):
# Invariant. May be < when transaction is in progress.
assert commit_lsn <= flush_lsn
# We only call collect_metrics() after a transaction is confirmed by
# the compute node, which only happens after a consensus of safekeepers
# has confirmed the transaction. We assume majority consensus here.
assert (2 * sum(m.last_record_lsn <= lsn
for lsn in m.flush_lsns) > zenith_env_builder.num_safekeepers)
assert (2 * sum(m.last_record_lsn <= lsn
for lsn in m.commit_lsns) > zenith_env_builder.num_safekeepers)
timeline_metrics.append(m)
log.info(f"{message}: {timeline_metrics}")
return timeline_metrics

View File

@@ -783,15 +783,6 @@ class ZenithPageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def timeline_detail_v2(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v2/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1"
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def get_metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
@@ -1891,7 +1882,7 @@ def wait_for(number_of_iterations: int, interval: int, func):
def assert_local(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID):
timeline_detail = pageserver_http_client.timeline_detail_v2(tenant, timeline)
timeline_detail = pageserver_http_client.timeline_detail(tenant, timeline)
assert timeline_detail.get('local', {}).get("disk_consistent_lsn"), timeline_detail
return timeline_detail
@@ -1899,7 +1890,7 @@ def assert_local(pageserver_http_client: ZenithPageserverHttpClient,
def remote_consistent_lsn(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID) -> int:
detail = pageserver_http_client.timeline_detail_v2(tenant, timeline)
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail['remote']['remote_consistent_lsn']
assert isinstance(lsn_str, str)
@@ -1918,7 +1909,7 @@ def wait_for_upload(pageserver_http_client: ZenithPageserverHttpClient,
def last_record_lsn(pageserver_http_client: ZenithPageserverHttpClient,
tenant: uuid.UUID,
timeline: uuid.UUID) -> int:
detail = pageserver_http_client.timeline_detail_v2(tenant, timeline)
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail['local']['last_record_lsn']
assert isinstance(lsn_str, str)