Merge 'local' and 'remote' parts of TimelineInfo into one struct.

The 'local' part was always filled in, so that was easy to merge into
into the TimelineInfo itself. 'remote' only contained two fields,
'remote_consistent_lsn' and 'awaits_download'. I made
'remote_consistent_lsn' an optional field, and 'awaits_download' is now
false if the timeline is not present remotely.

However, I kept stub versions of the 'local' and 'remote' structs for
backwards-compatibility, with a few fields that are actively used by
the control plane. They just duplicate the fields from TimelineInfo
now. They can be removed later, once the control plane has been
updated to use the new fields.
This commit is contained in:
Heikki Linnakangas
2022-10-14 17:31:43 +03:00
committed by Heikki Linnakangas
parent 500239176c
commit 538876650a
14 changed files with 252 additions and 276 deletions

View File

@@ -358,7 +358,7 @@ fn print_timelines_tree(
// Memorize all direct children of each timeline.
for timeline in timelines.iter() {
if let Some(ancestor_timeline_id) = timeline.local.ancestor_timeline_id {
if let Some(ancestor_timeline_id) = timeline.ancestor_timeline_id {
timelines_hash
.get_mut(&ancestor_timeline_id)
.context("missing timeline info in the HashMap")?
@@ -369,7 +369,7 @@ fn print_timelines_tree(
for timeline in timelines_hash.values() {
// Start with root local timelines (no ancestors) first.
if timeline.info.local.ancestor_timeline_id.is_none() {
if timeline.info.ancestor_timeline_id.is_none() {
print_timeline(0, &Vec::from([true]), timeline, &timelines_hash)?;
}
}
@@ -386,17 +386,8 @@ fn print_timeline(
timeline: &TimelineTreeEl,
timelines: &HashMap<TimelineId, TimelineTreeEl>,
) -> Result<()> {
let local_remote = if timeline.info.remote.is_some() {
"(L)"
} else {
"(L+R)"
};
// Draw main padding
print!("{} ", local_remote);
if nesting_level > 0 {
let ancestor_lsn = match timeline.info.local.ancestor_lsn {
let ancestor_lsn = match timeline.info.ancestor_lsn {
Some(lsn) => lsn.to_string(),
None => "Unknown Lsn".to_string(),
};
@@ -589,7 +580,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
Some(pg_version),
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.local.last_record_lsn;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(
DEFAULT_BRANCH_NAME.to_string(),
@@ -644,7 +635,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.local.last_record_lsn;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
println!(
@@ -724,7 +715,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info.local.last_record_lsn;
let last_record_lsn = timeline_info.last_record_lsn;
env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?;
@@ -784,7 +775,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// Use the LSN at the end of the timeline.
timeline_infos
.get(&node.timeline_id)
.map(|bi| bi.local.last_record_lsn.to_string())
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {

View File

@@ -123,9 +123,15 @@ pub struct TenantInfo {
pub has_in_progress_downloads: Option<bool>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LocalTimelineInfo {
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
@@ -149,28 +155,33 @@ pub struct LocalTimelineInfo {
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
pub pg_version: u32,
#[serde_as(as = "Option<DisplayFromStr>")]
pub remote_consistent_lsn: Option<Lsn>,
pub awaits_download: bool,
// Some of the above fields are duplicated in 'local' and 'remote', for backwards-
// compatility with older clients.
pub local: LocalTimelineInfo,
pub remote: RemoteTimelineInfo,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LocalTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_timeline_id: Option<TimelineId>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_lsn: Option<Lsn>,
pub current_logical_size: Option<u64>, // is None when timeline is Unloaded
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RemoteTimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub remote_consistent_lsn: Lsn,
pub awaits_download: bool,
}
///
/// This represents the output of the "timeline_detail" API call.
///
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TimelineInfo {
#[serde_as(as = "DisplayFromStr")]
pub tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
pub timeline_id: TimelineId,
pub local: LocalTimelineInfo,
pub remote: Option<RemoteTimelineInfo>,
#[serde_as(as = "Option<DisplayFromStr>")]
pub remote_consistent_lsn: Option<Lsn>,
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

View File

@@ -207,7 +207,6 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp:
parameters:
- name: tenant_id
@@ -612,6 +611,9 @@ components:
required:
- timeline_id
- tenant_id
- last_record_lsn
- disk_consistent_lsn
- awaits_download
properties:
timeline_id:
type: string
@@ -619,33 +621,15 @@ components:
tenant_id:
type: string
format: hex
local:
$ref: "#/components/schemas/LocalTimelineInfo"
remote:
$ref: "#/components/schemas/RemoteTimelineInfo"
RemoteTimelineInfo:
type: object
required:
- awaits_download
- remote_consistent_lsn
properties:
awaits_download:
type: boolean
remote_consistent_lsn:
type: string
format: hex
LocalTimelineInfo:
type: object
required:
- last_record_lsn
- disk_consistent_lsn
properties:
last_record_lsn:
type: string
format: hex
disk_consistent_lsn:
type: string
format: hex
remote_consistent_lsn:
type: string
format: hex
ancestor_timeline_id:
type: string
format: hex
@@ -670,7 +654,39 @@ components:
format: hex
last_received_msg_ts:
type: integer
awaits_download:
type: boolean
# These 'local' and 'remote' fields just duplicate some of the fields
# above. They are kept for backwards-compatibility. They can be removed,
# when the control plane has been updated to look at the above fields
# directly.
local:
$ref: "#/components/schemas/LocalTimelineInfo"
remote:
$ref: "#/components/schemas/RemoteTimelineInfo"
LocalTimelineInfo:
type: object
properties:
ancestor_timeline_id:
type: string
format: hex
ancestor_lsn:
type: string
format: hex
current_logical_size:
type: integer
current_physical_size:
type: integer
RemoteTimelineInfo:
type: object
required:
- remote_consistent_lsn
properties:
remote_consistent_lsn:
type: string
format: hex
Error:
type: object
required:

View File

@@ -79,13 +79,13 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
get_state(request).conf
}
// Helper functions to construct a LocalTimelineInfo struct for a timeline
fn local_timeline_info_from_timeline(
// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
state: &State,
timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> anyhow::Result<LocalTimelineInfo> {
) -> anyhow::Result<TimelineInfo> {
let last_record_lsn = timeline.get_last_record_lsn();
let (wal_source_connstr, last_received_msg_lsn, last_received_msg_ts) = {
let guard = timeline.last_received_wal.lock().unwrap();
@@ -100,24 +100,47 @@ fn local_timeline_info_from_timeline(
}
};
let info = LocalTimelineInfo {
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
ancestor_lsn: {
match timeline.get_ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
}
},
let (remote_consistent_lsn, awaits_download) = if let Some(remote_entry) = state
.remote_index
.read()
.await
.timeline_entry(&TenantTimelineId {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
}) {
(
Some(remote_entry.metadata.disk_consistent_lsn()),
remote_entry.awaits_download,
)
} else {
(None, false)
};
let ancestor_timeline_id = timeline.get_ancestor_timeline_id();
let ancestor_lsn = match timeline.get_ancestor_lsn() {
Lsn(0) => None,
lsn @ Lsn(_) => Some(lsn),
};
let current_logical_size = match timeline.get_current_logical_size() {
Ok(size) => Some(size),
Err(err) => {
error!("Timeline info creation failed to get current logical size: {err:?}");
None
}
};
let current_physical_size = Some(timeline.get_physical_size());
let info = TimelineInfo {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
ancestor_timeline_id,
ancestor_lsn,
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
last_record_lsn,
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
current_logical_size: Some(
timeline
.get_current_logical_size()
.context("Timeline info creation failed to get current logical size")?,
),
current_physical_size: Some(timeline.get_physical_size()),
current_logical_size,
current_physical_size,
current_logical_size_non_incremental: if include_non_incremental_logical_size {
Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
} else {
@@ -132,32 +155,25 @@ fn local_timeline_info_from_timeline(
last_received_msg_lsn,
last_received_msg_ts,
pg_version: timeline.pg_version,
remote_consistent_lsn,
awaits_download,
// Duplicate some fields in 'local' and 'remote' fields, for backwards-compatility
// with the control plane.
local: LocalTimelineInfo {
ancestor_timeline_id,
ancestor_lsn,
current_logical_size,
current_physical_size,
},
remote: RemoteTimelineInfo {
remote_consistent_lsn,
},
};
Ok(info)
}
fn list_local_timelines(
tenant_id: TenantId,
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> Result<Vec<(TimelineId, LocalTimelineInfo)>> {
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let timelines = tenant.list_timelines();
let mut local_timeline_info = Vec::with_capacity(timelines.len());
for repository_timeline in timelines {
local_timeline_info.push((
repository_timeline.timeline_id,
local_timeline_info_from_timeline(
&repository_timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)?,
))
}
Ok(local_timeline_info)
}
// healthcheck handler
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let config = get_config(&request);
@@ -169,6 +185,8 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
let state = get_state(&request);
let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
let new_timeline_info = async {
match tenant.create_timeline(
@@ -179,14 +197,10 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
).await {
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let local_info = local_timeline_info_from_timeline(&new_timeline, false, false)
let timeline_info = build_timeline_info(state, &new_timeline, false, false)
.await
.map_err(ApiError::InternalServerError)?;
Ok(Some(TimelineInfo {
tenant_id,
timeline_id: new_timeline.timeline_id,
local: local_info,
remote: None,
}))
Ok(Some(timeline_info))
}
Ok(None) => Ok(None), // timeline already exists
Err(err) => Err(ApiError::InternalServerError(err)),
@@ -209,6 +223,8 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
query_param_present(&request, "include-non-incremental-physical-size");
check_permission(&request, Some(tenant_id))?;
let state = get_state(&request);
let timelines = tokio::task::spawn_blocking(move || {
let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
let tenant = tenant_mgr::get_tenant(tenant_id, true).map_err(ApiError::NotFound)?;
@@ -219,32 +235,17 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
let mut response_data = Vec::with_capacity(timelines.len());
for timeline in timelines {
let timeline_id = timeline.timeline_id;
let local = local_timeline_info_from_timeline(
let timeline_info = build_timeline_info(
state,
&timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.map_err(ApiError::InternalServerError)?;
.await
.context("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}")
.map_err(ApiError::InternalServerError)?;
response_data.push(TimelineInfo {
tenant_id,
timeline_id,
local,
remote: get_state(&request)
.remote_index
.read()
.await
.timeline_entry(&TenantTimelineId {
tenant_id,
timeline_id,
})
.map(|remote_entry| RemoteTimelineInfo {
remote_consistent_lsn: remote_entry.metadata.disk_consistent_lsn(),
awaits_download: remote_entry.awaits_download,
}),
})
response_data.push(timeline_info);
}
json_response(StatusCode::OK, response_data)
@@ -289,7 +290,9 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
query_param_present(&request, "include-non-incremental-physical-size");
check_permission(&request, Some(tenant_id))?;
let (local_timeline_info, remote_timeline_info) = async {
let state = get_state(&request);
let timeline_info = async {
let timeline = tokio::task::spawn_blocking(move || {
tenant_mgr::get_tenant(tenant_id, true)?.get_timeline(timeline_id)
})
@@ -298,40 +301,22 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
let timeline = timeline.map_err(ApiError::NotFound)?;
let local_timeline_info = local_timeline_info_from_timeline(
let timeline_info = build_timeline_info(
state,
&timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)
.context("Failed to get local timeline info: {e:#}")
.map_err(ApiError::InternalServerError)?;
.await
.context("Failed to get local timeline info: {e:#}")
.map_err(ApiError::InternalServerError)?;
let remote_timeline_info = {
let remote_index_read = get_state(&request).remote_index.read().await;
remote_index_read
.timeline_entry(&TenantTimelineId {
tenant_id,
timeline_id,
})
.map(|remote_entry| RemoteTimelineInfo {
remote_consistent_lsn: remote_entry.metadata.disk_consistent_lsn(),
awaits_download: remote_entry.awaits_download,
})
};
Ok::<_, ApiError>((local_timeline_info, remote_timeline_info))
Ok::<_, ApiError>(timeline_info)
}
.instrument(info_span!("timeline_detail", tenant = %tenant_id, timeline = %timeline_id))
.await?;
json_response(
StatusCode::OK,
TimelineInfo {
tenant_id,
timeline_id,
local: local_timeline_info,
remote: remote_timeline_info,
},
)
json_response(StatusCode::OK, timeline_info)
}
async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -553,36 +538,27 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
false
});
let tenant_state = match tenant {
Ok(tenant) => tenant.current_state(),
let (tenant_state, current_physical_size) = match tenant {
Ok(tenant) => {
let timelines = tenant.list_timelines();
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
for timeline in timelines {
current_physical_size += timeline.get_physical_size();
}
(tenant.current_state(), Some(current_physical_size))
}
Err(e) => {
error!("Failed to get local tenant state: {e:#}");
if has_in_progress_downloads {
TenantState::Paused
(TenantState::Paused, None)
} else {
TenantState::Broken
(TenantState::Broken, None)
}
}
};
let current_physical_size =
match tokio::task::spawn_blocking(move || list_local_timelines(tenant_id, false, false))
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))?
{
Err(err) => {
// Getting local timelines can fail when no local tenant directory is on disk (e.g, when tenant data is being downloaded).
// In that case, put a warning message into log and operate normally.
warn!("Failed to get local timelines for tenant {tenant_id}: {err}");
None
}
Ok(local_timeline_infos) => Some(
local_timeline_infos
.into_iter()
.fold(0, |acc, x| acc + x.1.current_physical_size.unwrap()),
),
};
json_response(
StatusCode::OK,
TenantInfo {

View File

@@ -317,13 +317,13 @@ def remote_consistent_lsn(
) -> int:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
if detail["remote"] is None:
lsn_str = detail["remote_consistent_lsn"]
if lsn_str is None:
# No remote information at all. This happens right after creating
# a timeline, before any part of it has been uploaded to remote
# storage yet.
return 0
else:
lsn_str = detail["remote"]["remote_consistent_lsn"]
assert isinstance(lsn_str, str)
return lsn_from_hex(lsn_str)
@@ -577,7 +577,7 @@ def main(args: argparse.Namespace):
args.work_dir, f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar"
)
pg_version = timeline["local"]["pg_version"]
pg_version = timeline["pg_version"]
# Export timeline from old pageserver
if args.only_import is False:

View File

@@ -1179,7 +1179,7 @@ CREATE_TIMELINE_ID_EXTRACTOR = re.compile(
r"^Created timeline '(?P<timeline_id>[^']+)'", re.MULTILINE
)
TIMELINE_DATA_EXTRACTOR = re.compile(
r"\s(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
)
@@ -1430,8 +1430,8 @@ class NeonCli(AbstractNeonCli):
Returns a list of (branch_name, timeline_id) tuples out of parsed `neon timeline list` CLI output.
"""
# (L) main [b49f7954224a0ad25cc0013ea107b54b]
# (L) ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
# main [b49f7954224a0ad25cc0013ea107b54b]
# ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
res = self.raw_cli(
["timeline", "list", "--tenant-id", str(tenant_id or self.env.initial_tenant)]
)
@@ -2702,19 +2702,6 @@ def wait_until(number_of_iterations: int, interval: float, func):
raise Exception("timed out while waiting for %s" % func) from last_exception
def assert_timeline_local(
pageserver_http_client: NeonPageserverHttpClient, tenant: TenantId, timeline: TimelineId
):
timeline_detail = pageserver_http_client.timeline_detail(
tenant,
timeline,
include_non_incremental_logical_size=True,
include_non_incremental_physical_size=True,
)
assert timeline_detail.get("local", {}).get("disk_consistent_lsn"), timeline_detail
return timeline_detail
def assert_no_in_progress_downloads_for_tenant(
pageserver_http_client: NeonPageserverHttpClient,
tenant: TenantId,
@@ -2728,15 +2715,14 @@ def remote_consistent_lsn(
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
if detail["remote"] is None:
lsn_str = detail["remote_consistent_lsn"]
if lsn_str is None:
# No remote information at all. This happens right after creating
# a timeline, before any part of it has been uploaded to remote
# storage yet.
return Lsn(0)
else:
lsn_str = detail["remote"]["remote_consistent_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)
assert isinstance(lsn_str, str)
return Lsn(lsn_str)
def wait_for_upload(
@@ -2768,7 +2754,7 @@ def last_record_lsn(
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail["local"]["last_record_lsn"]
lsn_str = detail["last_record_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)

View File

@@ -155,8 +155,8 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne
lsn = _generate_data(num_rows, pg)
logical_size = env.pageserver.http_client().timeline_detail(env.initial_tenant, timeline)[
"local"
]["current_logical_size"]
"current_logical_size"
]
log.info(f"timeline logical size = {logical_size / (1024 ** 2)}MB")
assert logical_size > 1024**3 # = 1GB

View File

@@ -93,7 +93,6 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: TenantId):
assert TenantId(timeline_details["tenant_id"]) == tenant_id
assert TimelineId(timeline_details["timeline_id"]) == timeline_id
assert timeline_details.get("local") is not None
def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
@@ -125,16 +124,15 @@ def expect_updated_msg_lsn(
timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id)
# a successful `timeline_details` response must contain the below fields
local_timeline_details = timeline_details["local"]
assert "wal_source_connstr" in local_timeline_details.keys()
assert "last_received_msg_lsn" in local_timeline_details.keys()
assert "last_received_msg_ts" in local_timeline_details.keys()
assert "wal_source_connstr" in timeline_details.keys()
assert "last_received_msg_lsn" in timeline_details.keys()
assert "last_received_msg_ts" in timeline_details.keys()
assert (
local_timeline_details["last_received_msg_lsn"] is not None
timeline_details["last_received_msg_lsn"] is not None
), "the last received message's LSN is empty"
last_msg_lsn = Lsn(local_timeline_details["last_received_msg_lsn"])
last_msg_lsn = Lsn(timeline_details["last_received_msg_lsn"])
assert (
prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn
), f"the last received message's LSN {last_msg_lsn} hasn't been updated \

View File

@@ -10,8 +10,8 @@ import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
NeonPageserverHttpClient,
RemoteStorageKind,
assert_timeline_local,
available_remote_storages,
wait_for_last_record_lsn,
wait_for_upload,
@@ -125,16 +125,15 @@ def test_remote_storage_backup_and_restore(
wait_until(
number_of_iterations=20,
interval=1,
func=lambda: assert_timeline_local(client, tenant_id, timeline_id),
func=lambda: expect_tenant_to_download_timeline(client, tenant_id),
)
detail = client.timeline_detail(tenant_id, timeline_id)
assert detail["local"] is not None
log.info("Timeline detail after attach completed: %s", detail)
assert (
Lsn(detail["local"]["last_record_lsn"]) >= current_lsn
Lsn(detail["last_record_lsn"]) >= current_lsn
), "current db Lsn should should not be less than the one stored on remote storage"
assert not detail["remote"]["awaits_download"]
assert not detail["awaits_download"]
pg = env.postgres.create_start("main")
with pg.cursor() as cur:
@@ -143,3 +142,16 @@ def test_remote_storage_backup_and_restore(
query_scalar(cur, f"SELECT secret FROM t{checkpoint_number} WHERE id = {data_id};")
== f"{data_secret}|{checkpoint_number}"
)
def expect_tenant_to_download_timeline(
client: NeonPageserverHttpClient,
tenant_id: TenantId,
):
for tenant in client.tenant_list():
if tenant["id"] == str(tenant_id):
assert not tenant.get(
"has_in_progress_downloads", True
), f"Tenant {tenant_id} should have no downloads in progress"
return
assert False, f"Tenant {tenant_id} is missing on pageserver"

View File

@@ -16,7 +16,6 @@ from fixtures.neon_fixtures import (
PortDistributor,
Postgres,
assert_no_in_progress_downloads_for_tenant,
assert_timeline_local,
base_dir,
neon_binpath,
pg_distrib_dir,
@@ -167,18 +166,18 @@ def check_timeline_attached(
old_current_lsn: Lsn,
):
# new pageserver should be in sync (modulo wal tail or vacuum activity) with the old one because there was no new writes since checkpoint
new_timeline_detail = assert_timeline_local(new_pageserver_http_client, tenant_id, timeline_id)
new_timeline_detail = new_pageserver_http_client.timeline_detail(tenant_id, timeline_id)
# when load is active these checks can break because lsns are not static
# so let's check with some margin
assert_abs_margin_ratio(
int(Lsn(new_timeline_detail["local"]["disk_consistent_lsn"])),
int(Lsn(old_timeline_detail["local"]["disk_consistent_lsn"])),
int(Lsn(new_timeline_detail["disk_consistent_lsn"])),
int(Lsn(old_timeline_detail["disk_consistent_lsn"])),
0.03,
)
assert_abs_margin_ratio(
int(Lsn(new_timeline_detail["local"]["disk_consistent_lsn"])), int(old_current_lsn), 0.03
int(Lsn(new_timeline_detail["disk_consistent_lsn"])), int(old_current_lsn), 0.03
)
@@ -301,10 +300,10 @@ def test_tenant_relocation(
# wait until pageserver receives that data
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id_main, current_lsn_main)
timeline_detail_main = assert_timeline_local(pageserver_http, tenant_id, timeline_id_main)
timeline_detail_main = pageserver_http.timeline_detail(tenant_id, timeline_id_main)
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id_second, current_lsn_second)
timeline_detail_second = assert_timeline_local(pageserver_http, tenant_id, timeline_id_second)
timeline_detail_second = pageserver_http.timeline_detail(tenant_id, timeline_id_second)
if with_load == "with_load":
# create load table

View File

@@ -65,7 +65,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
# check 404
with pytest.raises(
NeonPageserverApiException,
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} is not found neither locally nor remotely",
match=f"Timeline {leaf_timeline_id} was not found for tenant {env.initial_tenant}",
):
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)

View File

@@ -16,7 +16,6 @@ from fixtures.neon_fixtures import (
PortDistributor,
Postgres,
VanillaPostgres,
assert_timeline_local,
wait_for_last_flush_lsn,
)
from fixtures.types import TenantId, TimelineId
@@ -44,20 +43,16 @@ def test_timeline_size(neon_simple_env: NeonEnv):
"""
)
res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
local_details = res["local"]
assert (
local_details["current_logical_size"]
== local_details["current_logical_size_non_incremental"]
res = client.timeline_detail(
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
cur.execute("TRUNCATE foo")
res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
local_details = res["local"]
assert (
local_details["current_logical_size"]
== local_details["current_logical_size_non_incremental"]
res = client.timeline_detail(
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
@@ -66,22 +61,22 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
client = env.pageserver.http_client()
wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)
timeline_details = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
timeline_details = client.timeline_detail(
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
)
pgmain = env.postgres.create_start("test_timeline_size_createdropdb")
log.info("postgres is running on 'test_timeline_size_createdropdb' branch")
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
local_details = res["local"]
assert (
local_details["current_logical_size"]
== local_details["current_logical_size_non_incremental"]
res = client.timeline_detail(
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
assert (
timeline_details["local"]["current_logical_size_non_incremental"]
== local_details["current_logical_size_non_incremental"]
timeline_details["current_logical_size_non_incremental"]
== res["current_logical_size_non_incremental"]
), "no writes should not change the incremental logical size"
cur.execute("CREATE DATABASE foodb")
@@ -97,21 +92,21 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
"""
)
res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
local_details = res["local"]
res = client.timeline_detail(
env.initial_tenant,
new_timeline_id,
include_non_incremental_logical_size=True,
)
assert (
local_details["current_logical_size"]
== local_details["current_logical_size_non_incremental"]
res["current_logical_size"] == res["current_logical_size_non_incremental"]
)
cur.execute("DROP DATABASE foodb")
res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
local_details = res["local"]
assert (
local_details["current_logical_size"]
== local_details["current_logical_size_non_incremental"]
res = client.timeline_detail(
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
)
assert res["current_logical_size"] == res["current_logical_size_non_incremental"]
# wait until received_lsn_lag is 0
@@ -210,10 +205,11 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
pg_cluster_size = cur.fetchone()
log.info(f"pg_cluster_size = {pg_cluster_size}")
new_res = assert_timeline_local(client, env.initial_tenant, new_timeline_id)
new_res = client.timeline_detail(
env.initial_tenant, new_timeline_id, include_non_incremental_logical_size=True
)
assert (
new_res["local"]["current_logical_size"]
== new_res["local"]["current_logical_size_non_incremental"]
new_res["current_logical_size"] == new_res["current_logical_size_non_incremental"]
), "after the WAL is streamed, current_logical_size is expected to be calculated and to be equal its non-incremental value"
@@ -419,7 +415,7 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
def get_timeline_physical_size(timeline: TimelineId):
res = client.timeline_detail(tenant, timeline, include_non_incremental_physical_size=True)
return res["local"]["current_physical_size_non_incremental"]
return res["current_physical_size_non_incremental"]
timeline_total_size = get_timeline_physical_size(timeline)
for i in range(10):
@@ -450,13 +446,10 @@ def assert_physical_size(env: NeonEnv, tenant_id: TenantId, timeline_id: Timelin
"""Check the current physical size returned from timeline API
matches the total physical size of the timeline on disk"""
client = env.pageserver.http_client()
res = assert_timeline_local(client, tenant_id, timeline_id)
res = client.timeline_detail(tenant_id, timeline_id, include_non_incremental_physical_size=True)
timeline_path = env.timeline_dir(tenant_id, timeline_id)
assert (
res["local"]["current_physical_size"]
== res["local"]["current_physical_size_non_incremental"]
)
assert res["local"]["current_physical_size"] == get_timeline_dir_size(timeline_path)
assert res["current_physical_size"] == res["current_physical_size_non_incremental"]
assert res["current_physical_size"] == get_timeline_dir_size(timeline_path)
# Timeline logical size initialization is an asynchronous background task that runs once,
@@ -465,13 +458,16 @@ def wait_for_timeline_size_init(
client: NeonPageserverHttpClient, tenant: TenantId, timeline: TimelineId
):
for i in range(10):
timeline_details = assert_timeline_local(client, tenant, timeline)
if (
timeline_details["local"]["current_logical_size"]
== timeline_details["local"]["current_logical_size_non_incremental"]
):
timeline_details = client.timeline_detail(
tenant, timeline, include_non_incremental_logical_size=True
)
current_logical_size = timeline_details["current_logical_size"]
non_incremental = timeline_details["current_logical_size_non_incremental"]
if current_logical_size == non_incremental:
return
log.info(f"waiting for current_logical_size of a timeline to be calculated, iteration {i}")
log.info(
f"waiting for current_logical_size of a timeline to be calculated, iteration {i}: {current_logical_size} vs {non_incremental}"
)
time.sleep(1)
raise Exception(
f"timed out while waiting for current_logical_size of a timeline to reach its non-incremental value, details: {timeline_details}"

View File

@@ -127,14 +127,9 @@ def test_many_timelines(neon_env_builder: NeonEnvBuilder):
for timeline_detail in timeline_details:
timeline_id = TimelineId(timeline_detail["timeline_id"])
local_timeline_detail = timeline_detail.get("local")
if local_timeline_detail is None:
log.debug(f"Timeline {timeline_id} is not present locally, skipping")
continue
m = TimelineMetrics(
timeline_id=timeline_id,
last_record_lsn=Lsn(local_timeline_detail["last_record_lsn"]),
last_record_lsn=Lsn(timeline_detail["last_record_lsn"]),
)
for sk_m in sk_metrics:
m.flush_lsns.append(Lsn(sk_m.flush_lsn_inexact[(tenant_id, timeline_id)]))
@@ -536,7 +531,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
)
ps_cli = env.pageserver.http_client()
pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["local"]["last_record_lsn"])
pageserver_lsn = Lsn(ps_cli.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
lag = last_lsn - pageserver_lsn
log.info(
f"Pageserver last_record_lsn={pageserver_lsn}; flush_lsn={last_lsn}; lag before replay is {lag / 1024}kb"
@@ -580,9 +575,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
raise RuntimeError("Timed out waiting for WAL redo")
pageserver_lsn = Lsn(
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)["local"][
"last_record_lsn"
]
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)["last_record_lsn"]
)
lag = last_lsn - pageserver_lsn

View File

@@ -179,9 +179,7 @@ async def run_restarts_under_load(
log.info(f"Postgres flush_lsn {flush_lsn}")
pageserver_lsn = Lsn(
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)["local"][
"last_record_lsn"
]
env.pageserver.http_client().timeline_detail(tenant_id, timeline_id)["last_record_lsn"]
)
sk_ps_lag = flush_lsn - pageserver_lsn
log.info(f"Pageserver last_record_lsn={pageserver_lsn} lag={sk_ps_lag / 1024}kb")