mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
Move Tenant- and TimelineInfo structs to models.rs.
They are part of the management API response structs. Let's try to concentrate everything that's part of the API in models.rs.
This commit is contained in:
committed by
Heikki Linnakangas
parent
d903dd61bd
commit
02afa2762c
@@ -12,9 +12,9 @@ use anyhow::{bail, Context};
|
||||
use nix::errno::Errno;
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
use nix::unistd::Pid;
|
||||
use pageserver::http::models::{TenantConfigRequest, TenantCreateRequest, TimelineCreateRequest};
|
||||
use pageserver::tenant_mgr::TenantInfo;
|
||||
use pageserver::timelines::TimelineInfo;
|
||||
use pageserver::http::models::{
|
||||
TenantConfigRequest, TenantCreateRequest, TenantInfo, TimelineCreateRequest, TimelineInfo,
|
||||
};
|
||||
use postgres::{Config, NoTls};
|
||||
use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
|
||||
@@ -9,6 +9,7 @@ use pageserver::config::defaults::{
|
||||
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
|
||||
DEFAULT_PG_LISTEN_ADDR as DEFAULT_PAGESERVER_PG_ADDR,
|
||||
};
|
||||
use pageserver::http::models::TimelineInfo;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
@@ -25,8 +26,6 @@ use utils::{
|
||||
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
|
||||
};
|
||||
|
||||
use pageserver::timelines::TimelineInfo;
|
||||
|
||||
// Default id of a safekeeper node, if not specified on the command line.
|
||||
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
|
||||
const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
|
||||
|
||||
@@ -7,6 +7,10 @@ use utils::{
|
||||
zid::{NodeId, ZTenantId, ZTimelineId},
|
||||
};
|
||||
|
||||
// These enums are used in the API response fields.
|
||||
use crate::repository::LocalTimelineState;
|
||||
use crate::tenant_mgr::TenantState;
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TimelineCreateRequest {
|
||||
@@ -108,3 +112,54 @@ pub struct WalReceiverEntry {
|
||||
/// the timestamp (in microseconds) of the last received message
|
||||
pub last_received_msg_ts: Option<u128>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TenantInfo {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub id: ZTenantId,
|
||||
pub state: Option<TenantState>,
|
||||
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
|
||||
pub has_in_progress_downloads: Option<bool>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct LocalTimelineInfo {
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub ancestor_timeline_id: Option<ZTimelineId>,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub ancestor_lsn: Option<Lsn>,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub last_record_lsn: Lsn,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub prev_record_lsn: Option<Lsn>,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub latest_gc_cutoff_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
|
||||
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
|
||||
pub current_logical_size_non_incremental: Option<usize>,
|
||||
pub current_physical_size_non_incremental: Option<u64>,
|
||||
pub timeline_state: LocalTimelineState,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct RemoteTimelineInfo {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
pub awaits_download: bool,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineInfo {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub tenant_id: ZTenantId,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub timeline_id: ZTimelineId,
|
||||
pub local: Option<LocalTimelineInfo>,
|
||||
pub remote: Option<RemoteTimelineInfo>,
|
||||
}
|
||||
|
||||
@@ -6,16 +6,19 @@ use hyper::{Body, Request, Response, Uri};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tracing::*;
|
||||
|
||||
use super::models::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
|
||||
use super::models::{
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse,
|
||||
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
|
||||
TimelineCreateRequest,
|
||||
};
|
||||
use crate::repository::Repository;
|
||||
use crate::layered_repository::metadata::TimelineMetadata;
|
||||
use crate::pgdatadir_mapping::DatadirTimeline;
|
||||
use crate::repository::{LocalTimelineState, RepositoryTimeline};
|
||||
use crate::repository::{Repository, Timeline};
|
||||
use crate::storage_sync;
|
||||
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use crate::tenant_mgr::TenantInfo;
|
||||
use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo};
|
||||
use crate::TimelineImpl;
|
||||
use crate::{config::PageServerConf, tenant_mgr, timelines};
|
||||
use utils::{
|
||||
auth::JwtAuth,
|
||||
@@ -26,6 +29,7 @@ use utils::{
|
||||
request::parse_request_param,
|
||||
RequestExt, RouterBuilder,
|
||||
},
|
||||
lsn::Lsn,
|
||||
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
|
||||
};
|
||||
|
||||
@@ -79,6 +83,104 @@ fn get_config(request: &Request<Body>) -> &'static PageServerConf {
|
||||
get_state(request).conf
|
||||
}
|
||||
|
||||
// Helper functions to construct a LocalTimelineInfo struct for a timeline
|
||||
|
||||
fn local_timeline_info_from_loaded_timeline(
|
||||
timeline: &TimelineImpl,
|
||||
include_non_incremental_logical_size: bool,
|
||||
include_non_incremental_physical_size: bool,
|
||||
) -> anyhow::Result<LocalTimelineInfo> {
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
let info = LocalTimelineInfo {
|
||||
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
|
||||
ancestor_lsn: {
|
||||
match timeline.get_ancestor_lsn() {
|
||||
Lsn(0) => None,
|
||||
lsn @ Lsn(_) => Some(lsn),
|
||||
}
|
||||
},
|
||||
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
timeline_state: LocalTimelineState::Loaded,
|
||||
current_logical_size: Some(timeline.get_current_logical_size()),
|
||||
current_physical_size: Some(timeline.get_physical_size()),
|
||||
current_logical_size_non_incremental: if include_non_incremental_logical_size {
|
||||
Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
current_physical_size_non_incremental: if include_non_incremental_physical_size {
|
||||
Some(timeline.get_physical_size_non_incremental()?)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
};
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
fn local_timeline_info_from_unloaded_timeline(metadata: &TimelineMetadata) -> LocalTimelineInfo {
|
||||
LocalTimelineInfo {
|
||||
ancestor_timeline_id: metadata.ancestor_timeline(),
|
||||
ancestor_lsn: {
|
||||
match metadata.ancestor_lsn() {
|
||||
Lsn(0) => None,
|
||||
lsn @ Lsn(_) => Some(lsn),
|
||||
}
|
||||
},
|
||||
disk_consistent_lsn: metadata.disk_consistent_lsn(),
|
||||
last_record_lsn: metadata.disk_consistent_lsn(),
|
||||
prev_record_lsn: metadata.prev_record_lsn(),
|
||||
latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
|
||||
timeline_state: LocalTimelineState::Unloaded,
|
||||
current_logical_size: None,
|
||||
current_physical_size: None,
|
||||
current_logical_size_non_incremental: None,
|
||||
current_physical_size_non_incremental: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn local_timeline_info_from_repo_timeline(
|
||||
repo_timeline: &RepositoryTimeline<TimelineImpl>,
|
||||
include_non_incremental_logical_size: bool,
|
||||
include_non_incremental_physical_size: bool,
|
||||
) -> anyhow::Result<LocalTimelineInfo> {
|
||||
match repo_timeline {
|
||||
RepositoryTimeline::Loaded(timeline) => local_timeline_info_from_loaded_timeline(
|
||||
&*timeline,
|
||||
include_non_incremental_logical_size,
|
||||
include_non_incremental_physical_size,
|
||||
),
|
||||
RepositoryTimeline::Unloaded { metadata } => {
|
||||
Ok(local_timeline_info_from_unloaded_timeline(metadata))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn list_local_timelines(
|
||||
tenant_id: ZTenantId,
|
||||
include_non_incremental_logical_size: bool,
|
||||
include_non_incremental_physical_size: bool,
|
||||
) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
|
||||
.with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;
|
||||
let repo_timelines = repo.list_timelines();
|
||||
|
||||
let mut local_timeline_info = Vec::with_capacity(repo_timelines.len());
|
||||
for (timeline_id, repository_timeline) in repo_timelines {
|
||||
local_timeline_info.push((
|
||||
timeline_id,
|
||||
local_timeline_info_from_repo_timeline(
|
||||
&repository_timeline,
|
||||
include_non_incremental_logical_size,
|
||||
include_non_incremental_physical_size,
|
||||
)?,
|
||||
))
|
||||
}
|
||||
Ok(local_timeline_info)
|
||||
}
|
||||
|
||||
// healthcheck handler
|
||||
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let config = get_config(&request);
|
||||
@@ -93,16 +195,30 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
|
||||
let new_timeline_info = tokio::task::spawn_blocking(move || {
|
||||
let _enter = info_span!("/timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, lsn=?request_data.ancestor_start_lsn).entered();
|
||||
timelines::create_timeline(
|
||||
|
||||
match timelines::create_timeline(
|
||||
get_config(&request),
|
||||
tenant_id,
|
||||
request_data.new_timeline_id.map(ZTimelineId::from),
|
||||
request_data.ancestor_timeline_id.map(ZTimelineId::from),
|
||||
request_data.ancestor_start_lsn,
|
||||
)
|
||||
) {
|
||||
Ok(Some((new_timeline_id, new_timeline))) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let local_info = local_timeline_info_from_loaded_timeline(new_timeline.as_ref(), false, false)?;
|
||||
Ok(Some(TimelineInfo {
|
||||
tenant_id,
|
||||
timeline_id: new_timeline_id,
|
||||
local: Some(local_info),
|
||||
remote: None,
|
||||
}))
|
||||
}
|
||||
Ok(None) => Ok(None), // timeline already exists
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
.map_err(ApiError::from_err)??;
|
||||
|
||||
Ok(match new_timeline_info {
|
||||
Some(info) => json_response(StatusCode::CREATED, info)?,
|
||||
@@ -119,7 +235,7 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
|
||||
query_param_present(&request, "include-non-incremental-physical-size");
|
||||
let local_timeline_infos = tokio::task::spawn_blocking(move || {
|
||||
let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
|
||||
crate::timelines::get_local_timelines(
|
||||
list_local_timelines(
|
||||
tenant_id,
|
||||
include_non_incremental_logical_size,
|
||||
include_non_incremental_physical_size,
|
||||
@@ -184,9 +300,7 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
|
||||
repo.get_timeline(timeline_id)
|
||||
.as_ref()
|
||||
.map(|timeline| {
|
||||
LocalTimelineInfo::from_repo_timeline(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
local_timeline_info_from_repo_timeline(
|
||||
timeline,
|
||||
include_non_incremental_logical_size,
|
||||
include_non_incremental_physical_size,
|
||||
@@ -444,24 +558,23 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
false
|
||||
});
|
||||
|
||||
let current_physical_size = match tokio::task::spawn_blocking(move || {
|
||||
crate::timelines::get_local_timelines(tenant_id, false, false)
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)?
|
||||
{
|
||||
Err(err) => {
|
||||
// Getting local timelines can fail when no local repo is on disk (e.g, when tenant data is being downloaded).
|
||||
// In that case, put a warning message into log and operate normally.
|
||||
warn!("Failed to get local timelines for tenant {tenant_id}: {err}");
|
||||
None
|
||||
}
|
||||
Ok(local_timeline_infos) => Some(
|
||||
local_timeline_infos
|
||||
.into_iter()
|
||||
.fold(0, |acc, x| acc + x.1.current_physical_size.unwrap()),
|
||||
),
|
||||
};
|
||||
let current_physical_size =
|
||||
match tokio::task::spawn_blocking(move || list_local_timelines(tenant_id, false, false))
|
||||
.await
|
||||
.map_err(ApiError::from_err)?
|
||||
{
|
||||
Err(err) => {
|
||||
// Getting local timelines can fail when no local repo is on disk (e.g, when tenant data is being downloaded).
|
||||
// In that case, put a warning message into log and operate normally.
|
||||
warn!("Failed to get local timelines for tenant {tenant_id}: {err}");
|
||||
None
|
||||
}
|
||||
Ok(local_timeline_infos) => Some(
|
||||
local_timeline_infos
|
||||
.into_iter()
|
||||
.fold(0, |acc, x| acc + x.1.current_physical_size.unwrap()),
|
||||
),
|
||||
};
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
|
||||
@@ -277,15 +277,6 @@ pub enum LocalTimelineState {
|
||||
Unloaded,
|
||||
}
|
||||
|
||||
impl<'a, T> From<&'a RepositoryTimeline<T>> for LocalTimelineState {
|
||||
fn from(local_timeline_entry: &'a RepositoryTimeline<T>) -> Self {
|
||||
match local_timeline_entry {
|
||||
RepositoryTimeline::Loaded(_) => LocalTimelineState::Loaded,
|
||||
RepositoryTimeline::Unloaded { .. } => LocalTimelineState::Unloaded,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Result of performing GC
|
||||
///
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
//! page server.
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::http::models::TenantInfo;
|
||||
use crate::layered_repository::{load_metadata, LayeredRepository};
|
||||
use crate::repository::Repository;
|
||||
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
|
||||
@@ -14,7 +15,6 @@ use crate::{thread_mgr, timelines, walreceiver};
|
||||
use crate::{RepositoryImpl, TimelineImpl};
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fmt;
|
||||
@@ -502,16 +502,9 @@ fn load_local_timeline(
|
||||
Ok(inmem_timeline)
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TenantInfo {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub id: ZTenantId,
|
||||
pub state: Option<TenantState>,
|
||||
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
|
||||
pub has_in_progress_downloads: Option<bool>,
|
||||
}
|
||||
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
|
||||
tenants_state::read_tenants()
|
||||
.iter()
|
||||
|
||||
@@ -4,8 +4,6 @@
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::{
|
||||
fs,
|
||||
path::Path,
|
||||
@@ -20,138 +18,15 @@ use utils::{
|
||||
zid::{ZTenantId, ZTimelineId},
|
||||
};
|
||||
|
||||
use crate::tenant_mgr;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
layered_repository::metadata::TimelineMetadata,
|
||||
repository::{LocalTimelineState, Repository},
|
||||
storage_sync::index::RemoteIndex,
|
||||
tenant_config::TenantConfOpt,
|
||||
DatadirTimeline, RepositoryImpl, TimelineImpl,
|
||||
config::PageServerConf, repository::Repository, storage_sync::index::RemoteIndex,
|
||||
tenant_config::TenantConfOpt, RepositoryImpl, TimelineImpl,
|
||||
};
|
||||
use crate::{import_datadir, LOG_FILE_NAME};
|
||||
use crate::{layered_repository::LayeredRepository, walredo::WalRedoManager};
|
||||
use crate::{repository::RepositoryTimeline, tenant_mgr};
|
||||
use crate::{repository::Timeline, CheckpointConfig};
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct LocalTimelineInfo {
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub ancestor_timeline_id: Option<ZTimelineId>,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub ancestor_lsn: Option<Lsn>,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub last_record_lsn: Lsn,
|
||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||
pub prev_record_lsn: Option<Lsn>,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub latest_gc_cutoff_lsn: Lsn,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub disk_consistent_lsn: Lsn,
|
||||
pub current_logical_size: Option<usize>, // is None when timeline is Unloaded
|
||||
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
|
||||
pub current_logical_size_non_incremental: Option<usize>,
|
||||
pub current_physical_size_non_incremental: Option<u64>,
|
||||
pub timeline_state: LocalTimelineState,
|
||||
}
|
||||
|
||||
impl LocalTimelineInfo {
|
||||
pub fn from_loaded_timeline(
|
||||
timeline: &TimelineImpl,
|
||||
include_non_incremental_logical_size: bool,
|
||||
include_non_incremental_physical_size: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
let info = LocalTimelineInfo {
|
||||
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
|
||||
ancestor_lsn: {
|
||||
match timeline.get_ancestor_lsn() {
|
||||
Lsn(0) => None,
|
||||
lsn @ Lsn(_) => Some(lsn),
|
||||
}
|
||||
},
|
||||
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
|
||||
last_record_lsn,
|
||||
prev_record_lsn: Some(timeline.get_prev_record_lsn()),
|
||||
latest_gc_cutoff_lsn: *timeline.get_latest_gc_cutoff_lsn(),
|
||||
timeline_state: LocalTimelineState::Loaded,
|
||||
current_physical_size: Some(timeline.get_physical_size()),
|
||||
current_logical_size: Some(timeline.get_current_logical_size()),
|
||||
current_logical_size_non_incremental: if include_non_incremental_logical_size {
|
||||
Some(timeline.get_current_logical_size_non_incremental(last_record_lsn)?)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
current_physical_size_non_incremental: if include_non_incremental_physical_size {
|
||||
Some(timeline.get_physical_size_non_incremental()?)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
};
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
pub fn from_unloaded_timeline(metadata: &TimelineMetadata) -> Self {
|
||||
LocalTimelineInfo {
|
||||
ancestor_timeline_id: metadata.ancestor_timeline(),
|
||||
ancestor_lsn: {
|
||||
match metadata.ancestor_lsn() {
|
||||
Lsn(0) => None,
|
||||
lsn @ Lsn(_) => Some(lsn),
|
||||
}
|
||||
},
|
||||
disk_consistent_lsn: metadata.disk_consistent_lsn(),
|
||||
last_record_lsn: metadata.disk_consistent_lsn(),
|
||||
prev_record_lsn: metadata.prev_record_lsn(),
|
||||
latest_gc_cutoff_lsn: metadata.latest_gc_cutoff_lsn(),
|
||||
timeline_state: LocalTimelineState::Unloaded,
|
||||
current_logical_size: None,
|
||||
current_physical_size: None,
|
||||
current_logical_size_non_incremental: None,
|
||||
current_physical_size_non_incremental: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_repo_timeline<T>(
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
repo_timeline: &RepositoryTimeline<T>,
|
||||
include_non_incremental_logical_size: bool,
|
||||
include_non_incremental_physical_size: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
match repo_timeline {
|
||||
RepositoryTimeline::Loaded(_) => {
|
||||
let timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
|
||||
Self::from_loaded_timeline(
|
||||
&*timeline,
|
||||
include_non_incremental_logical_size,
|
||||
include_non_incremental_physical_size,
|
||||
)
|
||||
}
|
||||
RepositoryTimeline::Unloaded { metadata } => Ok(Self::from_unloaded_timeline(metadata)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct RemoteTimelineInfo {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
pub awaits_download: bool,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineInfo {
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub tenant_id: ZTenantId,
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
pub timeline_id: ZTimelineId,
|
||||
pub local: Option<LocalTimelineInfo>,
|
||||
pub remote: Option<RemoteTimelineInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PointInTime {
|
||||
pub timeline_id: ZTimelineId,
|
||||
@@ -333,38 +208,22 @@ fn bootstrap_timeline<R: Repository>(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_local_timelines(
|
||||
tenant_id: ZTenantId,
|
||||
include_non_incremental_logical_size: bool,
|
||||
include_non_incremental_physical_size: bool,
|
||||
) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
|
||||
.with_context(|| format!("Failed to get repo for tenant {}", tenant_id))?;
|
||||
let repo_timelines = repo.list_timelines();
|
||||
|
||||
let mut local_timeline_info = Vec::with_capacity(repo_timelines.len());
|
||||
for (timeline_id, repository_timeline) in repo_timelines {
|
||||
local_timeline_info.push((
|
||||
timeline_id,
|
||||
LocalTimelineInfo::from_repo_timeline(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&repository_timeline,
|
||||
include_non_incremental_logical_size,
|
||||
include_non_incremental_physical_size,
|
||||
)?,
|
||||
))
|
||||
}
|
||||
Ok(local_timeline_info)
|
||||
}
|
||||
|
||||
///
|
||||
/// Create a new timeline.
|
||||
///
|
||||
/// Returns the new timeline ID and reference to its Timeline object.
|
||||
///
|
||||
/// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
|
||||
/// the same timeline ID already exists, returns None. If `new_timeline_id` is not given,
|
||||
/// a new unique ID is generated.
|
||||
///
|
||||
pub(crate) fn create_timeline(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: ZTenantId,
|
||||
new_timeline_id: Option<ZTimelineId>,
|
||||
ancestor_timeline_id: Option<ZTimelineId>,
|
||||
mut ancestor_start_lsn: Option<Lsn>,
|
||||
) -> Result<Option<TimelineInfo>> {
|
||||
) -> Result<Option<(ZTimelineId, Arc<TimelineImpl>)>> {
|
||||
let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate);
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
|
||||
|
||||
@@ -373,7 +232,7 @@ pub(crate) fn create_timeline(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let new_timeline_info = match ancestor_timeline_id {
|
||||
let _new_timeline = match ancestor_timeline_id {
|
||||
Some(ancestor_timeline_id) => {
|
||||
let ancestor_timeline = repo
|
||||
.get_timeline_load(ancestor_timeline_id)
|
||||
@@ -401,26 +260,13 @@ pub(crate) fn create_timeline(
|
||||
}
|
||||
}
|
||||
|
||||
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?;
|
||||
// load the timeline into memory
|
||||
let loaded_timeline =
|
||||
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
|
||||
LocalTimelineInfo::from_loaded_timeline(&*loaded_timeline, false, false)
|
||||
.context("cannot fill timeline info")?
|
||||
}
|
||||
None => {
|
||||
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?;
|
||||
// load the timeline into memory
|
||||
let new_timeline =
|
||||
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
|
||||
LocalTimelineInfo::from_loaded_timeline(&*new_timeline, false, false)
|
||||
.context("cannot fill timeline info")?
|
||||
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
|
||||
}
|
||||
None => bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?,
|
||||
};
|
||||
Ok(Some(TimelineInfo {
|
||||
tenant_id,
|
||||
timeline_id: new_timeline_id,
|
||||
local: Some(new_timeline_info),
|
||||
remote: None,
|
||||
}))
|
||||
|
||||
// load the timeline into memory
|
||||
let loaded_timeline = tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
|
||||
|
||||
Ok(Some((new_timeline_id, loaded_timeline)))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user