add incremental tracking of logical timeline size

In order to avoid problems with keeping disk and memory in sync, the logical
size is not stored in metadata on disk. It is calculated on timeline
"start" by scanning the contents of the layered repo, and the size is then
maintained via an atomic variable.

This patch also adds a new endpoint to the pageserver HTTP API: branch detail.
It allows retrieval of a particular branch's info by its name. Size info
is also added to the endpoint's response and used in tests.
This commit is contained in:
Dmitry Rodionov
2021-08-27 19:58:52 +03:00
committed by Dmitry
parent 1b9e49eb60
commit b4ecae33e4
10 changed files with 415 additions and 105 deletions

View File

@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::net::TcpStream;
use std::path::PathBuf;
use std::process::Command;
@@ -230,7 +229,7 @@ impl PageServerNode {
tenantid: &ZTenantId,
) -> Result<BranchInfo> {
Ok(self
.http_request(Method::POST, format!("{}/{}", self.http_base_url, "branch"))
.http_request(Method::POST, format!("{}/branch", self.http_base_url))
.json(&BranchCreateRequest {
tenant_id: tenantid.to_owned(),
name: branch_name.to_owned(),
@@ -241,24 +240,19 @@ impl PageServerNode {
.json()?)
}
// TODO: make this a separate request type and avoid loading all the branches
pub fn branch_get_by_name(
&self,
tenantid: &ZTenantId,
branch_name: &str,
) -> Result<BranchInfo> {
let branch_infos = self.branch_list(tenantid)?;
let branch_by_name: Result<HashMap<String, BranchInfo>> = branch_infos
.into_iter()
.map(|branch_info| Ok((branch_info.name.clone(), branch_info)))
.collect();
let branch_by_name = branch_by_name?;
let branch = branch_by_name
.get(branch_name)
.ok_or_else(|| anyhow!("Branch {} not found", branch_name))?;
Ok(branch.clone())
Ok(self
.http_request(
Method::GET,
format!("{}/branch/{}/{}", self.http_base_url, tenantid, branch_name),
)
.send()?
.error_for_status()?
.json()?)
}
}

View File

@@ -30,9 +30,64 @@ pub struct BranchInfo {
pub name: String,
#[serde(with = "hex")]
pub timeline_id: ZTimelineId,
pub latest_valid_lsn: Option<Lsn>,
pub latest_valid_lsn: Lsn,
pub ancestor_id: Option<String>,
pub ancestor_lsn: Option<String>,
pub current_logical_size: usize,
pub current_logical_size_non_incremental: usize,
}
impl BranchInfo {
    /// Build a `BranchInfo` from a branch file on disk.
    ///
    /// `path` points at a branch file whose file name is the branch name and
    /// whose contents are the timeline id. Ancestor information, if present,
    /// is read from the tenant's ancestor file ("<timeline_id>@<lsn>" format).
    /// Both the incremental and the freshly computed logical sizes are
    /// included so callers (and tests) can cross-check them.
    pub fn from_path<T: AsRef<Path>>(
        path: T,
        conf: &PageServerConf,
        tenantid: &ZTenantId,
        repo: &Arc<dyn Repository>,
    ) -> Result<Self> {
        let path = path.as_ref();

        // Robustness fix: the original unwrap()s would panic on a path with no
        // file name or a non-UTF-8 name; surface a proper error instead.
        let name = path
            .file_name()
            .and_then(|n| n.to_str())
            .with_context(|| format!("invalid branch file path {:?}", path))?
            .to_string();
        let timeline_id = std::fs::read_to_string(path)?.parse::<ZTimelineId>()?;
        let timeline = repo.get_timeline(timeline_id)?;

        let ancestor_path = conf.ancestor_path(&timeline_id, tenantid);
        let mut ancestor_id: Option<String> = None;
        let mut ancestor_lsn: Option<String> = None;
        if ancestor_path.exists() {
            // Ancestor file format: "<ancestor_timeline_id>@<lsn>".
            let ancestor = std::fs::read_to_string(ancestor_path)?;
            let mut strings = ancestor.split('@');
            ancestor_id = Some(
                strings
                    .next()
                    .with_context(|| "wrong branch ancestor point in time format")?
                    .to_owned(),
            );
            ancestor_lsn = Some(
                strings
                    .next()
                    .with_context(|| "wrong branch ancestor point in time format")?
                    .to_owned(),
            );
        }

        // Hoist the repeated get_last_record_lsn() call so both the reported
        // LSN and the non-incremental size are computed at the same point.
        let last_record_lsn = timeline.get_last_record_lsn();
        Ok(BranchInfo {
            name,
            timeline_id,
            latest_valid_lsn: last_record_lsn,
            ancestor_id,
            ancestor_lsn,
            current_logical_size: timeline.get_current_logical_size(),
            current_logical_size_non_incremental: timeline
                .get_current_logical_size_non_incremental(last_record_lsn)?,
        })
    }
}
#[derive(Debug, Clone, Copy)]
@@ -210,43 +265,12 @@ pub(crate) fn get_branches(conf: &PageServerConf, tenantid: &ZTenantId) -> Resul
std::fs::read_dir(&branches_dir)?
.map(|dir_entry_res| {
let dir_entry = dir_entry_res?;
let name = dir_entry.file_name().to_str().unwrap().to_string();
let timeline_id = std::fs::read_to_string(dir_entry.path())?.parse::<ZTimelineId>()?;
let latest_valid_lsn = repo
.get_timeline(timeline_id)
.map(|timeline| timeline.get_last_record_lsn())
.ok();
let ancestor_path = conf.ancestor_path(&timeline_id, tenantid);
let mut ancestor_id: Option<String> = None;
let mut ancestor_lsn: Option<String> = None;
if ancestor_path.exists() {
let ancestor = std::fs::read_to_string(ancestor_path)?;
let mut strings = ancestor.split('@');
ancestor_id = Some(
strings
.next()
.with_context(|| "wrong branch ancestor point in time format")?
.to_owned(),
);
ancestor_lsn = Some(
strings
.next()
.with_context(|| "wrong branch ancestor point in time format")?
.to_owned(),
);
}
Ok(BranchInfo {
name,
timeline_id,
latest_valid_lsn,
ancestor_id,
ancestor_lsn,
})
Ok(BranchInfo::from_path(
dir_entry.path(),
conf,
tenantid,
&repo,
)?)
})
.collect()
}
@@ -296,9 +320,11 @@ pub(crate) fn create_branch(
Ok(BranchInfo {
name: branchname.to_string(),
timeline_id: newtli,
latest_valid_lsn: Some(startpoint.lsn),
latest_valid_lsn: startpoint.lsn,
ancestor_id: None,
ancestor_lsn: None,
current_logical_size: 0,
current_logical_size_non_incremental: 0,
})
}

View File

@@ -54,7 +54,52 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/branch/{tenant_id}/{branch_name}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
format: hex
- name: branch_name
in: path
required: true
schema:
type: string
get:
description: Get detail of a single branch by name for the tenant
responses:
"200":
description: BranchInfo
content:
application/json:
schema:
$ref: "#/components/schemas/BranchInfo"
"400":
description: Error when no tenant id found in path or no branch name
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
@@ -203,6 +248,9 @@ components:
required:
- name
- timeline_id
- latest_valid_lsn
- current_logical_size
- current_logical_size_non_incremental
properties:
name:
type: string
@@ -213,6 +261,10 @@ components:
type: string
ancestor_lsn:
type: string
current_logical_size:
type: integer
current_logical_size_non_incremental:
type: integer
Error:
type: object
required:

View File

@@ -1,3 +1,4 @@
use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
@@ -18,10 +19,8 @@ use zenith_utils::http::{
use super::models::BranchCreateRequest;
use super::models::TenantCreateRequest;
use crate::{
branches::{self},
tenant_mgr, PageServerConf, ZTenantId,
};
use crate::branches::BranchInfo;
use crate::{branches, tenant_mgr, PageServerConf, ZTenantId};
#[derive(Debug)]
struct State {
@@ -57,6 +56,35 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
get_state(request).conf
}
/// Extract a required path parameter from the request,
/// failing with 400 Bad Request when it is absent.
fn get_request_param<'a>(
    request: &'a Request<Body>,
    param_name: &str,
) -> Result<&'a str, ApiError> {
    request.param(param_name).ok_or_else(|| {
        ApiError::BadRequest(format!("no {} specified in path param", param_name))
    })
}
/// Extract a required path parameter and parse it into `T`.
///
/// Returns 400 Bad Request when the parameter is missing or fails to parse.
fn parse_request_param<T: FromStr>(
    request: &Request<Body>,
    param_name: &str,
) -> Result<T, ApiError> {
    // Bug fix: the original error message always said "failed to parse tenant
    // id" regardless of which parameter failed; report the actual name.
    get_request_param(request, param_name)?
        .parse()
        .map_err(|_| ApiError::BadRequest(format!("failed to parse {}", param_name)))
}
// healthcheck handler
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
Ok(Response::builder()
@@ -85,16 +113,7 @@ async fn branch_create_handler(mut request: Request<Body>) -> Result<Response<Bo
}
async fn branch_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenantid: ZTenantId = match request.param("tenant_id") {
Some(arg) => arg
.parse()
.map_err(|_| ApiError::BadRequest("failed to parse tenant id".to_string()))?,
None => {
return Err(ApiError::BadRequest(
"no tenant id specified in path param".to_string(),
))
}
};
let tenantid: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenantid))?;
@@ -106,6 +125,23 @@ async fn branch_list_handler(request: Request<Body>) -> Result<Response<Body>, A
Ok(json_response(StatusCode::OK, response_data)?)
}
// TODO add to swagger
async fn branch_detail_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenantid: ZTenantId = parse_request_param(&request, "tenant_id")?;
let branch_name: &str = get_request_param(&request, "branch_name")?;
let conf = get_state(&request).conf;
let path = conf.branch_path(&branch_name, &tenantid);
let response_data = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(&tenantid)?;
BranchInfo::from_path(path, conf, &tenantid, &repo)
})
.await
.map_err(ApiError::from_err)??;
Ok(json_response(StatusCode::OK, response_data)?)
}
async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
// check for management permission
check_permission(&request, None)?;
@@ -159,6 +195,7 @@ pub fn make_router(
.data(Arc::new(State::new(conf, auth)))
.get("/v1/status", status_handler)
.get("/v1/branch/:tenant_id", branch_list_handler)
.get("/v1/branch/:tenant_id/:branch_name", branch_detail_handler)
.post("/v1/branch", branch_create_handler)
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)

View File

@@ -15,18 +15,20 @@ use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use lazy_static::lazy_static;
use log::*;
use postgres_ffi::pg_constants::BLCKSZ;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{BTreeSet, HashSet};
use std::fs;
use std::fs::File;
use std::io::Write;
use std::ops::Bound::Included;
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::{fs, thread};
use crate::relish::*;
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
@@ -35,7 +37,7 @@ use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
use zenith_metrics::{register_histogram, Histogram};
use zenith_metrics::{register_histogram, register_int_gauge_vec, Histogram, IntGaugeVec};
use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn};
@@ -92,6 +94,16 @@ lazy_static! {
.expect("failed to define a metric");
}
lazy_static! {
// NOTE: can be zero if the pageserver was restarted and no activity has happened yet
static ref LOGICAL_TIMELINE_SIZE: IntGaugeVec = register_int_gauge_vec!(
"pageserver_logical_timeline_size",
"Logical timeline size (bytes)",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric");
}
///
/// Repository consists of multiple timelines. Keep them in a hash table.
///
@@ -138,6 +150,7 @@ impl Repository for LayeredRepository {
timelineid,
self.tenantid,
self.walredo_mgr.clone(),
0,
)?;
let timeline_rc = Arc::new(timeline);
@@ -221,18 +234,24 @@ impl LayeredRepository {
None
};
let timeline = LayeredTimeline::new(
let mut timeline = LayeredTimeline::new(
self.conf,
metadata,
ancestor,
timelineid,
self.tenantid,
self.walredo_mgr.clone(),
0, // init with 0 and update after layers are loaded
)?;
// List the layers on disk, and load them into the layer map
timeline.load_layer_map()?;
// needs to be after load_layer_map
timeline.init_current_logical_size()?;
let timeline = Arc::new(timeline);
// Load any new WAL after the last checkpoint into memory.
info!(
"Loading WAL for timeline {} starting at {}",
@@ -243,11 +262,15 @@ impl LayeredRepository {
.conf
.timeline_path(&timelineid, &self.tenantid)
.join("wal");
import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?;
import_timeline_wal(&wal_dir, timeline.as_ref(), timeline.get_last_record_lsn())?;
let timeline_rc = Arc::new(timeline);
timelines.insert(timelineid, timeline_rc.clone());
Ok(timeline_rc)
if cfg!(debug_assertions) {
// check again after wal loading
Self::assert_size_calculation_matches_offloaded(Arc::clone(&timeline));
}
timelines.insert(timelineid, timeline.clone());
Ok(timeline.clone())
}
}
}
@@ -445,6 +468,24 @@ impl LayeredRepository {
totals.elapsed = now.elapsed();
Ok(totals)
}
fn assert_size_calculation_matches(incremental: usize, timeline: &LayeredTimeline) {
match timeline.get_current_logical_size_non_incremental(timeline.get_last_record_lsn()) {
Ok(non_incremental) => {
if incremental != non_incremental {
error!("timeline size calculation diverged, incremental doesn't match non incremental. incremental={} non_incremental={}", incremental, non_incremental);
}
}
Err(e) => error!("failed to calculate non incremental timeline size: {}", e),
}
}
/// Debug-build consistency check: capture the current incremental size and
/// verify it against a full recalculation on a background thread, so timeline
/// startup is not blocked by the expensive non-incremental walk.
fn assert_size_calculation_matches_offloaded(timeline: Arc<LayeredTimeline>) {
    // Snapshot the counter before spawning: the comparison is against the
    // size as of this call, not as of when the thread actually runs.
    let incremental = timeline.get_current_logical_size();
    // The spawned thread owns its Arc clone; the handle is deliberately
    // dropped, so the check runs detached and only logs on mismatch.
    thread::spawn(move || {
        Self::assert_size_calculation_matches(incremental, &timeline);
    });
}
}
/// Metadata stored on disk for each timeline
@@ -509,6 +550,16 @@ pub struct LayeredTimeline {
// of the branch point.
ancestor_timeline: Option<Arc<LayeredTimeline>>,
ancestor_lsn: Lsn,
// this variable indicates how much space is used from user's point of view,
// e.g. we do not account here for multiple versions of data and so on.
// this is counted incrementally based on physical relishes (excluding FileNodeMap)
// current_logical_size is not stored on disk and is initialized on timeline creation using
// get_current_logical_size_non_incremental in init_current_logical_size
// this is needed because when we save it in metadata it can become out of sync
// because current_logical_size is consistent on last_record_lsn, not ondisk_consistent_lsn
// NOTE: current_logical_size also includes size of the ancestor
current_logical_size: AtomicUsize, // bytes
}
/// Public interface functions
@@ -654,9 +705,9 @@ impl Timeline for LayeredTimeline {
}
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.get_layer_for_write(seg, rec.lsn)?;
layer.put_wal_record(blknum, rec)
self.increase_current_logical_size(layer.put_wal_record(blknum, rec)? * BLCKSZ as u32);
Ok(())
}
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> anyhow::Result<()> {
@@ -706,7 +757,7 @@ impl Timeline for LayeredTimeline {
let layer = self.get_layer_for_write(seg, lsn)?;
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)?;
}
self.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
Ok(())
}
@@ -714,8 +765,7 @@ impl Timeline for LayeredTimeline {
trace!("drop_segment: {} at {}", rel, lsn);
if rel.is_blocky() {
let oldsize_opt = self.get_relish_size(rel, self.get_last_record_lsn())?;
if let Some(oldsize) = oldsize_opt {
if let Some(oldsize) = self.get_relish_size(rel, self.get_last_record_lsn())? {
let old_last_seg = if oldsize == 0 {
0
} else {
@@ -731,6 +781,7 @@ impl Timeline for LayeredTimeline {
let layer = self.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn)?;
}
self.decrease_current_logical_size(oldsize * BLCKSZ as u32);
} else {
warn!(
"drop_segment called on non-existent relish {} at {}",
@@ -738,6 +789,7 @@ impl Timeline for LayeredTimeline {
);
}
} else {
// TODO handle TwoPhase relishes
let seg = SegmentTag::from_blknum(rel, 0);
let layer = self.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn)?;
@@ -758,7 +810,8 @@ impl Timeline for LayeredTimeline {
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.get_layer_for_write(seg, lsn)?;
layer.put_page_image(blknum, lsn, img)
self.increase_current_logical_size(layer.put_page_image(blknum, lsn, img)? * BLCKSZ as u32);
Ok(())
}
/// Public entry point for checkpoint(). All the logic is in the private
@@ -802,6 +855,35 @@ impl Timeline for LayeredTimeline {
self.ancestor_lsn
}
}
/// Current logical timeline size in bytes, as maintained incrementally by
/// increase_/decrease_current_logical_size.
fn get_current_logical_size(&self) -> usize {
    // Idiom fix: load() on an AtomicUsize already yields usize, so the
    // original `as usize` cast was a no-op.
    self.current_logical_size.load(Ordering::Acquire)
}
/// Compute the logical size from scratch by walking every relish visible at
/// `lsn` (ancestor timelines included). Used to seed and cross-check the
/// incremental counter.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
    let mut total_blocks: usize = 0;

    // Every relation on this timeline, including those from ancestors.
    for rel in self.list_rels(0, 0, lsn)? {
        if let Some(size) = self.get_relish_size(RelishTag::Relation(rel), lsn)? {
            total_blocks += size as usize;
        }
    }

    // Of the non-relation relishes, only SLRU segments are counted.
    // TODO support TwoPhase
    for non_rel in self.list_nonrels(lsn)? {
        if let RelishTag::Slru { slru: _, segno: _ } = non_rel {
            if let Some(size) = self.get_relish_size(non_rel, lsn)? {
                total_blocks += size as usize;
            }
        }
    }

    Ok(total_blocks * BLCKSZ as usize)
}
}
impl LayeredTimeline {
@@ -815,6 +897,7 @@ impl LayeredTimeline {
timelineid: ZTimelineId,
tenantid: ZTenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
current_logical_size: usize,
) -> Result<LayeredTimeline> {
let timeline = LayeredTimeline {
conf,
@@ -833,6 +916,7 @@ impl LayeredTimeline {
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn,
current_logical_size: AtomicUsize::new(current_logical_size),
};
Ok(timeline)
}
@@ -896,6 +980,23 @@ impl LayeredTimeline {
Ok(())
}
///
/// Used to init current logical size on startup
///
fn init_current_logical_size(&mut self) -> Result<()> {
if self.current_logical_size.load(Ordering::Relaxed) != 0 {
bail!("cannot init already initialized current logical size")
};
let lsn = self.get_last_record_lsn();
self.current_logical_size =
AtomicUsize::new(self.get_current_logical_size_non_incremental(lsn)?);
trace!(
"current_logical_size initialized to {}",
self.current_logical_size.load(Ordering::Relaxed)
);
Ok(())
}
///
/// Get a handle to a Layer for reading.
///
@@ -1455,6 +1556,42 @@ impl LayeredTimeline {
}
}
}
///
/// This is a helper function to increase current_logical_size
///
fn increase_current_logical_size(&self, diff: u32) {
    // fetch_add returns the PREVIOUS value, so `val + diff` is the new size.
    let val = self
        .current_logical_size
        .fetch_add(diff as usize, Ordering::SeqCst);
    trace!(
        "increase_current_logical_size: {} + {} = {}",
        val,
        diff,
        val + diff as usize,
    );
    // NOTE(review): the gauge is set from this thread's view of the counter;
    // concurrent increase/decrease calls may interleave and leave the gauge
    // one update behind — presumably acceptable for a metric. Confirm.
    LOGICAL_TIMELINE_SIZE
        .with_label_values(&[&self.tenantid.to_string(), &self.timelineid.to_string()])
        .set(val as i64 + diff as i64)
}
///
/// This is a helper function to decrease current_logical_size
///
fn decrease_current_logical_size(&self, diff: u32) {
    // fetch_sub returns the PREVIOUS value, so `val - diff` is the new size.
    let val = self
        .current_logical_size
        .fetch_sub(diff as usize, Ordering::SeqCst);
    // NOTE(review): if diff ever exceeds the counter, `val - diff as usize`
    // underflows (panic in debug builds) — assumes callers never decrement
    // below zero; confirm.
    trace!(
        "decrease_current_logical_size: {} - {} = {}",
        val,
        diff,
        val - diff as usize,
    );
    // Same caveat as increase_current_logical_size: the gauge update is not
    // atomic with the counter update.
    LOGICAL_TIMELINE_SIZE
        .with_label_values(&[&self.tenantid.to_string(), &self.timelineid.to_string()])
        .set(val as i64 - diff as i64)
}
}
/// Dump contents of a layer file to stdout.

View File

@@ -310,7 +310,7 @@ impl InMemoryLayer {
// Write operations
/// Remember new page version, as a WAL record over previous version
pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<()> {
pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<u32> {
self.put_page_version(
blknum,
rec.lsn,
@@ -322,7 +322,7 @@ impl InMemoryLayer {
}
/// Remember new page version, as a full page image
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<u32> {
self.put_page_version(
blknum,
lsn,
@@ -335,7 +335,7 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()> {
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<u32> {
assert!(self.seg.blknum_in_seg(blknum));
trace!(
@@ -403,15 +403,20 @@ impl InMemoryLayer {
}
inner.segsizes.insert(lsn, newsize);
return Ok(newsize - oldsize);
}
}
Ok(())
Ok(0)
}
/// Remember that the relation was truncated at given LSN
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> {
let mut inner = self.inner.lock().unwrap();
// check that this we truncate to a smaller size than segment was before the truncation
let oldsize = inner.get_seg_size(lsn);
assert!(segsize < oldsize);
let old = inner.segsizes.insert(lsn, segsize);
if old.is_some() {

View File

@@ -45,9 +45,6 @@ pub trait Repository: Send + Sync {
horizon: u64,
compact: bool,
) -> Result<GcResult>;
// TODO get timelines?
//fn get_stats(&self) -> RepositoryStats;
}
///
@@ -133,7 +130,7 @@ pub trait Timeline: Send + Sync {
/// Truncate relation
fn put_truncation(&self, rel: RelishTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// This method is used for marking dropped relations and truncated SLRU files
/// This method is used for marking dropped relations and truncated SLRU files and aborted two phase records
fn drop_relish(&self, tag: RelishTag, lsn: Lsn) -> Result<()>;
/// Track end of the latest digested WAL record.
@@ -154,14 +151,16 @@ pub trait Timeline: Send + Sync {
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint(&self) -> Result<()>;
}
#[derive(Clone)]
pub struct RepositoryStats {
pub num_entries: Lsn,
pub num_page_images: Lsn,
pub num_wal_records: Lsn,
pub num_getpage_requests: Lsn,
/// Retrieve current logical size of the timeline
///
/// NOTE: counted incrementally, includes ancestors,
/// doesn't support TwoPhase relishes yet
fn get_current_logical_size(&self) -> usize;
/// Does the same as get_current_logical_size but counted on demand.
/// Used in tests to ensure that incremental and non incremental variants match.
fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize>;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@@ -239,6 +238,14 @@ mod tests {
buf.freeze()
}
/// Test helper: the incremental size counter must equal a from-scratch
/// recalculation at `lsn`.
fn assert_current_logical_size(timeline: &Arc<dyn Timeline>, lsn: Lsn) {
    let non_incremental = timeline
        .get_current_logical_size_non_incremental(lsn)
        .unwrap();
    assert_eq!(timeline.get_current_logical_size(), non_incremental);
}
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
fn get_test_repo(test_name: &str) -> Result<Box<dyn Repository>> {
@@ -294,6 +301,8 @@ mod tests {
tline.advance_last_record_lsn(Lsn(0x50));
assert_current_logical_size(&tline, Lsn(0x50));
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
assert!(tline.get_relish_size(TESTREL_A, Lsn(0x10))?.is_none());
@@ -338,6 +347,7 @@ mod tests {
// Truncate last block
tline.put_truncation(TESTREL_A, Lsn(0x60), 2)?;
tline.advance_last_record_lsn(Lsn(0x60));
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
assert_eq!(tline.get_relish_size(TESTREL_A, Lsn(0x60))?.unwrap(), 2);
@@ -407,6 +417,8 @@ mod tests {
}
tline.advance_last_record_lsn(Lsn(lsn));
assert_current_logical_size(&tline, Lsn(lsn));
assert_eq!(
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
pg_constants::RELSEG_SIZE + 1
@@ -420,6 +432,7 @@ mod tests {
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
pg_constants::RELSEG_SIZE
);
assert_current_logical_size(&tline, Lsn(lsn));
// Truncate another block
lsn += 0x10;
@@ -429,6 +442,7 @@ mod tests {
tline.get_relish_size(TESTREL_A, Lsn(lsn))?.unwrap(),
pg_constants::RELSEG_SIZE - 1
);
assert_current_logical_size(&tline, Lsn(lsn));
// Truncate to 1500, and then truncate all the way down to 0, one block at a time
// This tests the behavior at segment boundaries
@@ -444,6 +458,7 @@ mod tests {
size -= 1;
}
assert_current_logical_size(&tline, Lsn(lsn));
Ok(())
}
@@ -470,6 +485,7 @@ mod tests {
tline.put_page_image(TESTREL_B, 0, Lsn(0x20), TEST_IMG("foobar blk 0 at 2"))?;
tline.advance_last_record_lsn(Lsn(0x40));
assert_current_logical_size(&tline, Lsn(0x40));
// Branch the history, modify relation differently on the new timeline
let newtimelineid = ZTimelineId::from_str("AA223344556677881122334455667788").unwrap();
@@ -497,6 +513,8 @@ mod tests {
assert_eq!(newtline.get_relish_size(TESTREL_B, Lsn(0x40))?.unwrap(), 1);
assert_current_logical_size(&tline, Lsn(0x40));
Ok(())
}

View File

@@ -0,0 +1,39 @@
from contextlib import closing
from uuid import UUID
import psycopg2.extras
from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver
def test_timeline_size(
    zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin
):
    """Check that the incrementally-maintained logical timeline size always
    matches a from-scratch (non-incremental) recalculation, across branch
    creation, inserts, and truncation."""
    # Create a branch from "empty"; even with no data the two size
    # calculations must agree.
    zenith_cli.run(["branch", "test_timeline_size", "empty"])

    client = pageserver.http_client()
    res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size")
    assert res["current_logical_size"] == res["current_logical_size_non_incremental"]

    pgmain = postgres.create_start("test_timeline_size")
    print("postgres is running on 'test_timeline_size' branch")

    with closing(pgmain.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("SHOW zenith.zenith_timeline")

            # Create a table and insert 10 rows (the original comment claimed
            # 100), then re-check the sizes after the writes are digested.
            cur.execute("CREATE TABLE foo (t text)")
            cur.execute(
                """
                INSERT INTO foo
                SELECT 'long string to consume some space' || g
                FROM generate_series(1, 10) g
                """
            )

            res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size")
            assert res["current_logical_size"] == res["current_logical_size_non_incremental"]

            # TRUNCATE exercises the decrement path of the counter.
            cur.execute("TRUNCATE foo")

            res = client.branch_detail(UUID(pageserver.initial_tenant), "test_timeline_size")
            assert res["current_logical_size"] == res["current_logical_size_non_incremental"]

View File

@@ -224,6 +224,13 @@ class ZenithPageserverHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def branch_detail(self, tenant_id: uuid.UUID, name: str) -> Dict:
    """Fetch detailed info for a single branch via the pageserver HTTP API."""
    url = f"http://localhost:{self.port}/v1/branch/{tenant_id.hex}/{name}"
    res = self.get(url)
    res.raise_for_status()
    return res.json()
def tenant_list(self) -> List[str]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
res.raise_for_status()

View File

@@ -13,7 +13,6 @@ use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use pageserver::branches::BranchInfo;
use zenith_utils::lsn::Lsn;
///
/// Branches tree element used as a value in the HashMap.
@@ -393,9 +392,7 @@ fn handle_branch(branch_match: &ArgMatches, env: &local_env::LocalEnv) -> Result
let branch = pageserver.branch_create(branchname, startpoint_str, &tenantid)?;
println!(
"Created branch '{}' at {:?} for tenant: {}",
branch.name,
branch.latest_valid_lsn.unwrap_or(Lsn(0)),
tenantid,
branch.name, branch.latest_valid_lsn, tenantid,
);
} else {
let tenantid: ZTenantId = branch_match
@@ -434,9 +431,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
node.address,
branch_infos
.get(&node.timelineid)
.map(|bi| bi
.latest_valid_lsn
.map_or("?".to_string(), |lsn| lsn.to_string()))
.map(|bi| bi.latest_valid_lsn.to_string())
.unwrap_or_else(|| "?".to_string()),
node.status(),
);