Merge Repository and Tenant entities, rework tenant background jobs

Kirill Bulatov
2022-09-07 18:01:49 +03:00
committed by Kirill Bulatov
parent f44afbaf62
commit 1a8c8b04d7
43 changed files with 615 additions and 563 deletions

View File

@@ -543,13 +543,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
match tenant_match.subcommand() {
Some(("list", _)) => {
for t in pageserver.tenant_list()? {
println!(
"{} {}",
t.id,
t.state
.map(|s| s.to_string())
.unwrap_or_else(|| String::from(""))
);
println!("{} {:?}", t.id, t.state);
}
}
Some(("create", create_match)) => {

View File

@@ -22,8 +22,8 @@ use std::time::SystemTime;
use tar::{Builder, EntryType, Header};
use tracing::*;
use crate::layered_repository::Timeline;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName};

View File

@@ -3,8 +3,8 @@
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use pageserver::layered_repository::dump_layerfile_from_path;
use pageserver::page_cache;
use pageserver::tenant::dump_layerfile_from_path;
use pageserver::virtual_file;
use std::path::PathBuf;
use utils::project_git_version;

View File

@@ -182,7 +182,7 @@ fn initialize_config(
cfg_file_path.display()
);
} else {
// We're initializing the repo, so there's no config file yet
// We're initializing the tenant, so there's no config file yet
(
DEFAULT_CONFIG_FILE
.parse::<toml_edit::Document>()

View File

@@ -3,7 +3,7 @@
//! A handy tool for debugging, that's all.
use anyhow::Result;
use clap::{App, Arg};
use pageserver::layered_repository::metadata::TimelineMetadata;
use pageserver::tenant::metadata::TimelineMetadata;
use std::path::PathBuf;
use std::str::FromStr;
use utils::{lsn::Lsn, project_git_version};

View File

@@ -19,7 +19,7 @@ use utils::{
zid::{NodeId, ZTenantId, ZTimelineId},
};
use crate::layered_repository::TIMELINES_SEGMENT_NAME;
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::tenant_config::{TenantConf, TenantConfOpt};
pub mod defaults {

View File

@@ -7,8 +7,7 @@ use utils::{
zid::{NodeId, ZTenantId, ZTimelineId},
};
// These enums are used in the API response fields.
use crate::tenant_mgr::TenantState;
use crate::tenant::TenantState;
#[serde_as]
#[derive(Serialize, Deserialize)]
@@ -108,7 +107,7 @@ impl TenantConfigRequest {
pub struct TenantInfo {
#[serde_as(as = "DisplayFromStr")]
pub id: ZTenantId,
pub state: Option<TenantState>,
pub state: TenantState,
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub has_in_progress_downloads: Option<bool>,
}
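The `TenantInfo` change above makes `state` a required field of the management API response (the OpenAPI hunk below adds it to `required` as well). Here is a minimal standalone serde model of that shape, assuming a plain `String` id in place of `ZTenantId` and the trimmed-down `TenantState` introduced by this commit; it is not the pageserver's actual type.

```rust
// Minimal standalone model of the TenantInfo change; not the pageserver types.
// `id` is simplified to a String here (the real field is a ZTenantId behind DisplayFromStr).
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub enum TenantState {
    Active { background_jobs_running: bool },
    Paused,
    Broken,
}

#[derive(Serialize, Deserialize)]
pub struct TenantInfo {
    pub id: String,
    // Previously `Option<TenantState>`; now every listed tenant always reports a state.
    pub state: TenantState,
    pub current_physical_size: Option<u64>,
    pub has_in_progress_downloads: Option<bool>,
}

fn main() -> Result<(), serde_json::Error> {
    let info = TenantInfo {
        id: "0000000000000000000000000000002a".to_string(),
        state: TenantState::Active { background_jobs_running: true },
        current_physical_size: None,
        has_in_progress_downloads: Some(false),
    };
    println!("{}", serde_json::to_string_pretty(&info)?);
    Ok(())
}
```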

View File

@@ -489,6 +489,7 @@ components:
type: object
required:
- id
- state
properties:
id:
type: string
@@ -573,7 +574,6 @@ components:
required:
- last_record_lsn
- disk_consistent_lsn
- timeline_state
properties:
last_record_lsn:
type: string
@@ -581,8 +581,6 @@ components:
disk_consistent_lsn:
type: string
format: hex
timeline_state:
type: string
ancestor_timeline_id:
type: string
format: hex

View File

@@ -11,9 +11,9 @@ use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest,
};
use crate::layered_repository::Timeline;
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::tenant::{TenantState, Timeline};
use crate::tenant_config::TenantConfOpt;
use crate::{config::PageServerConf, tenant_mgr, timelines};
use utils::{
@@ -132,12 +132,11 @@ fn list_local_timelines(
include_non_incremental_logical_size: bool,
include_non_incremental_physical_size: bool,
) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("Failed to get repo for tenant {tenant_id}"))?;
let repo_timelines = repo.list_timelines();
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let timelines = tenant.list_timelines();
let mut local_timeline_info = Vec::with_capacity(repo_timelines.len());
for (timeline_id, repository_timeline) in repo_timelines {
let mut local_timeline_info = Vec::with_capacity(timelines.len());
for (timeline_id, repository_timeline) in timelines {
local_timeline_info.push((
timeline_id,
local_timeline_info_from_timeline(
@@ -201,23 +200,31 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
query_param_present(&request, "include-non-incremental-physical-size");
check_permission(&request, Some(tenant_id))?;
let local_timeline_infos = tokio::task::spawn_blocking(move || {
let timelines = tokio::task::spawn_blocking(move || {
let _enter = info_span!("timeline_list", tenant = %tenant_id).entered();
list_local_timelines(
tenant_id,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)
Ok::<_, anyhow::Error>(tenant_mgr::get_tenant(tenant_id, true)?.list_timelines())
})
.await
.map_err(ApiError::from_err)??;
let mut response_data = Vec::with_capacity(local_timeline_infos.len());
for (timeline_id, local_timeline_info) in local_timeline_infos {
let mut response_data = Vec::with_capacity(timelines.len());
for (timeline_id, timeline) in timelines {
let local = match local_timeline_info_from_timeline(
&timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
) {
Ok(local) => Some(local),
Err(e) => {
error!("Failed to convert tenant timeline {timeline_id} into the local one: {e:?}");
None
}
};
response_data.push(TimelineInfo {
tenant_id,
timeline_id,
local: Some(local_timeline_info),
local,
remote: get_state(&request)
.remote_index
.read()
@@ -259,28 +266,25 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
check_permission(&request, Some(tenant_id))?;
let (local_timeline_info, remote_timeline_info) = async {
// any error here will render local timeline as None
// XXX .in_current_span does not attach messages in spawn_blocking future to current future's span
let local_timeline_info = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let local_timeline = {
repo.get_timeline(timeline_id)
.as_ref()
.map(|timeline| {
local_timeline_info_from_timeline(
timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)
})
.transpose()?
};
Ok::<_, anyhow::Error>(local_timeline)
let timeline = tokio::task::spawn_blocking(move || {
tenant_mgr::get_tenant(tenant_id, true)?.get_timeline(timeline_id)
})
.await
.ok()
.and_then(|r| r.ok())
.flatten();
.map_err(ApiError::from_err)?;
let local_timeline_info = match timeline.and_then(|timeline| {
local_timeline_info_from_timeline(
&timeline,
include_non_incremental_logical_size,
include_non_incremental_physical_size,
)
}) {
Ok(local_info) => Some(local_info),
Err(e) => {
error!("Failed to get local timeline info: {e:#}");
None
}
};
let remote_timeline_info = {
let remote_index_read = get_state(&request).remote_index.read().await;
@@ -294,25 +298,26 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
awaits_download: remote_entry.awaits_download,
})
};
(local_timeline_info, remote_timeline_info)
Ok::<_, anyhow::Error>((local_timeline_info, remote_timeline_info))
}
.instrument(info_span!("timeline_detail", tenant = %tenant_id, timeline = %timeline_id))
.await;
.await?;
if local_timeline_info.is_none() && remote_timeline_info.is_none() {
return Err(ApiError::NotFound(format!(
Err(ApiError::NotFound(format!(
"Timeline {tenant_id}/{timeline_id} is not found neither locally nor remotely"
)));
)))
} else {
json_response(
StatusCode::OK,
TimelineInfo {
tenant_id,
timeline_id,
local: local_timeline_info,
remote: remote_timeline_info,
},
)
}
let timeline_info = TimelineInfo {
tenant_id,
timeline_id,
local: local_timeline_info,
remote: remote_timeline_info,
};
json_response(StatusCode::OK, timeline_info)
}
// TODO makes sense to provide tenant config right away the same way as it is handled in tenant_create
@@ -320,10 +325,10 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
info!("Handling tenant attach {}", tenant_id);
info!("Handling tenant attach {tenant_id}");
tokio::task::spawn_blocking(move || {
if tenant_mgr::get_tenant_state(tenant_id).is_some() {
if tenant_mgr::get_tenant(tenant_id, false).is_ok() {
anyhow::bail!("Tenant is already present locally")
};
Ok(())
@@ -426,7 +431,7 @@ async fn timeline_delete_handler(request: Request<Body>) -> Result<Response<Body
let state = get_state(&request);
tenant_mgr::delete_timeline(tenant_id, timeline_id)
.instrument(info_span!("timeline_delete", tenant = %tenant_id))
.instrument(info_span!("timeline_delete", tenant = %tenant_id, timeline = %timeline_id))
.await
.map_err(ApiError::from_err)?;
@@ -478,7 +483,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
check_permission(&request, Some(tenant_id))?;
// if the tenant is in the process of downloading, it can be absent from the global tenant map
let tenant_state = tokio::task::spawn_blocking(move || tenant_mgr::get_tenant_state(tenant_id))
let tenant = tokio::task::spawn_blocking(move || tenant_mgr::get_tenant(tenant_id, false))
.await
.map_err(ApiError::from_err)?;
@@ -494,13 +499,25 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
false
});
let tenant_state = match tenant {
Ok(tenant) => tenant.current_state(),
Err(e) => {
error!("Failed to get local tenant state: {e:#}");
if has_in_progress_downloads {
TenantState::Paused
} else {
TenantState::Broken
}
}
};
let current_physical_size =
match tokio::task::spawn_blocking(move || list_local_timelines(tenant_id, false, false))
.await
.map_err(ApiError::from_err)?
{
Err(err) => {
// Getting local timelines can fail when no local repo is on disk (e.g., when tenant data is being downloaded).
// Getting local timelines can fail when no local tenant directory is on disk (e.g., when tenant data is being downloaded).
// In that case, put a warning message into log and operate normally.
warn!("Failed to get local timelines for tenant {tenant_id}: {err}");
None
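A sketch of the handler pattern used in `tenant_status` above: run the blocking tenant lookup via `tokio::task::spawn_blocking`, and on failure fall back to `Paused` or `Broken` depending on whether downloads are still in progress. The `get_tenant_state_blocking` helper and integer tenant id below are hypothetical stand-ins, not the pageserver API.

```rust
// Illustrative sketch of the tenant_status fallback, with hypothetical helpers.
use anyhow::anyhow;

#[derive(Debug)]
enum TenantState {
    Active { background_jobs_running: bool },
    Paused,
    Broken,
}

// Stand-in for tenant_mgr::get_tenant(); here it always fails so the fallback path runs.
fn get_tenant_state_blocking(_tenant_id: u64) -> anyhow::Result<TenantState> {
    Err(anyhow!("tenant not found in the in-memory map"))
}

async fn tenant_status(
    tenant_id: u64,
    has_in_progress_downloads: bool,
) -> anyhow::Result<TenantState> {
    // The blocking map lookup is moved off the async executor threads.
    let lookup = tokio::task::spawn_blocking(move || get_tenant_state_blocking(tenant_id)).await?;
    Ok(match lookup {
        Ok(state) => state,
        Err(e) => {
            eprintln!("Failed to get local tenant state: {e:#}");
            // A tenant that is still downloading is reported as Paused, otherwise as Broken.
            if has_in_progress_downloads {
                TenantState::Paused
            } else {
                TenantState::Broken
            }
        }
    })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("{:?}", tenant_status(42, true).await?);  // Paused
    println!("{:?}", tenant_status(42, false).await?); // Broken
    Ok(())
}
```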

View File

@@ -11,9 +11,9 @@ use bytes::Bytes;
use tracing::*;
use walkdir::WalkDir;
use crate::layered_repository::Timeline;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::v14::relfile_utils::*;

View File

@@ -3,7 +3,6 @@ pub mod config;
pub mod http;
pub mod import_datadir;
pub mod keyspace;
pub mod layered_repository;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
@@ -13,6 +12,7 @@ pub mod reltag;
pub mod repository;
pub mod storage_sync;
pub mod task_mgr;
pub mod tenant;
pub mod tenant_config;
pub mod tenant_mgr;
pub mod tenant_tasks;
@@ -181,7 +181,7 @@ mod backoff_defaults_tests {
#[cfg(test)]
mod tests {
use crate::layered_repository::repo_harness::TIMELINE_ID;
use crate::tenant::harness::TIMELINE_ID;
use super::*;

View File

@@ -53,8 +53,8 @@ use utils::{
zid::{ZTenantId, ZTimelineId},
};
use crate::layered_repository::writeback_ephemeral_file;
use crate::repository::Key;
use crate::tenant::writeback_ephemeral_file;
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;

View File

@@ -34,13 +34,13 @@ use utils::{
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar};
use crate::layered_repository::Timeline;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::CheckpointConfig;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
@@ -477,8 +477,8 @@ impl PageServerHandler {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo.create_empty_timeline(timeline_id, base_lsn)?;
let timeline = tenant_mgr::get_tenant(tenant_id, true)?
.create_empty_timeline(timeline_id, base_lsn)?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -539,10 +539,7 @@ impl PageServerHandler {
) -> anyhow::Result<()> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let timeline = repo
.get_timeline(timeline_id)
.with_context(|| format!("Timeline {timeline_id} was not found"))?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
ensure!(timeline.get_last_record_lsn() == start_lsn);
// TODO leave clean state on error. For now you can use detach to clean
@@ -770,7 +767,7 @@ impl PageServerHandler {
// when accessing management api supply None as an argument
// when using to authorize tenant pass corresponding tenant id
fn check_permission(&self, tenantid: Option<ZTenantId>) -> Result<()> {
fn check_permission(&self, tenant_id: Option<ZTenantId>) -> Result<()> {
if self.auth.is_none() {
// auth is set to Trust, nothing to check so just return ok
return Ok(());
@@ -782,7 +779,7 @@ impl PageServerHandler {
.claims
.as_ref()
.expect("claims presence already checked");
auth::check_permission(claims, tenantid)
auth::check_permission(claims, tenant_id)
}
}
@@ -809,7 +806,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
}
info!(
"jwt auth succeeded for scope: {:#?} by tenantid: {:?}",
"jwt auth succeeded for scope: {:#?} by tenant id: {:?}",
data.claims.scope, data.claims.tenant_id,
);
@@ -1013,8 +1010,8 @@ impl postgres_backend_async::Handler for PageServerHandler {
let (_, params_raw) = query_string.split_at("show ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();
ensure!(params.len() == 1, "invalid param number for config command");
let tenantid = ZTenantId::from_str(params[0])?;
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
let tenant_id = ZTenantId::from_str(params[0])?;
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
pgb.write_message(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),
@@ -1027,25 +1024,27 @@ impl postgres_backend_async::Handler for PageServerHandler {
RowDescriptor::int8_col(b"pitr_interval"),
]))?
.write_message(&BeMessage::DataRow(&[
Some(repo.get_checkpoint_distance().to_string().as_bytes()),
Some(tenant.get_checkpoint_distance().to_string().as_bytes()),
Some(
repo.get_checkpoint_timeout()
tenant
.get_checkpoint_timeout()
.as_secs()
.to_string()
.as_bytes(),
),
Some(repo.get_compaction_target_size().to_string().as_bytes()),
Some(tenant.get_compaction_target_size().to_string().as_bytes()),
Some(
repo.get_compaction_period()
tenant
.get_compaction_period()
.as_secs()
.to_string()
.as_bytes(),
),
Some(repo.get_compaction_threshold().to_string().as_bytes()),
Some(repo.get_gc_horizon().to_string().as_bytes()),
Some(repo.get_gc_period().as_secs().to_string().as_bytes()),
Some(repo.get_image_creation_threshold().to_string().as_bytes()),
Some(repo.get_pitr_interval().as_secs().to_string().as_bytes()),
Some(tenant.get_compaction_threshold().to_string().as_bytes()),
Some(tenant.get_gc_horizon().to_string().as_bytes()),
Some(tenant.get_gc_period().as_secs().to_string().as_bytes()),
Some(tenant.get_image_creation_threshold().to_string().as_bytes()),
Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()),
]))?
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("do_gc ") {
@@ -1066,16 +1065,16 @@ impl postgres_backend_async::Handler for PageServerHandler {
let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let gc_horizon: u64 = caps
.get(4)
.map(|h| h.as_str().parse())
.unwrap_or_else(|| Ok(repo.get_gc_horizon()))?;
.unwrap_or_else(|| Ok(tenant.get_gc_horizon()))?;
// Use tenant's pitr setting
let pitr = repo.get_pitr_interval();
let result = repo.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)?;
let pitr = tenant.get_pitr_interval();
let result = tenant.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)?;
pgb.write_message(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"layers_total"),
RowDescriptor::int8_col(b"layers_needed_by_cutoff"),
@@ -1169,12 +1168,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
}
fn get_local_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Result<Arc<Timeline>> {
tenant_mgr::get_repository_for_tenant(tenant_id)
.and_then(|repo| {
repo.get_timeline(timeline_id)
.context("No timeline in tenant's repository")
})
.with_context(|| format!("Could not get timeline {timeline_id} in tenant {tenant_id}"))
tenant_mgr::get_tenant(tenant_id, true).and_then(|tenant| tenant.get_timeline(timeline_id))
}
///

View File

@@ -7,9 +7,9 @@
//! Clarify that)
//!
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::layered_repository::Timeline;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::*;
use crate::tenant::Timeline;
use crate::walrecord::ZenithWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
@@ -1398,16 +1398,12 @@ fn is_slru_block_key(key: Key) -> bool {
&& key.field6 != 0xffffffff // and not SlruSegSize
}
//
//-- Tests that should work the same with any Repository/Timeline implementation.
//
#[cfg(test)]
pub fn create_test_timeline(
repo: &crate::layered_repository::Repository,
tenant: &crate::tenant::Tenant,
timeline_id: utils::zid::ZTimelineId,
) -> Result<std::sync::Arc<Timeline>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = tenant.create_empty_timeline(timeline_id, Lsn(8))?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;

View File

@@ -46,10 +46,10 @@
//! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata.
//! If the storage sync loop was successfully started before, pageserver schedules the layer files and the updated metadata file for upload, every time a layer is flushed to disk.
//! The uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either).
//! See [`crate::layered_repository`] for the upload calls and the adjacent logic.
//! See [`crate::tenant`] for the upload calls and the adjacent logic.
//!
//! Synchronization logic is able to communicate back with updated timeline sync states, [`crate::repository::TimelineSyncStatusUpdate`],
//! submitted via [`crate::tenant_mgr::apply_timeline_sync_status_updates`] function. Tenant manager applies corresponding timeline updates in pageserver's in-memory state.
//! Synchronization logic is able to communicate back with updated timeline sync states, submitted via [`crate::tenant_mgr::attach_local_tenants`] function.
//! Tenant manager applies corresponding timeline updates in pageserver's in-memory state.
//! Such submissions happen in two cases:
//! * once after the sync loop startup, to signal pageserver which timelines will be synchronized in the near future
//! * after every loop step, in case a timeline needs to be reloaded or evicted from pageserver's memory
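The module comment above describes how the sync loop collects all downloaded timelines of a tenant and hands them to `attach_local_tenants` in a single call. Below is a standalone sketch of that batching idea only; the ID and metadata types are simplified stand-ins for `TenantTimelineValues` and the remote index entries.

```rust
// Standalone sketch of batching per-tenant timeline registrations; types are simplified stand-ins.
use std::collections::HashMap;

type TenantId = u64;
type TimelineId = u64;

struct TimelineMetadata {
    disk_consistent_lsn: u64,
}

/// Stand-in for tenant_mgr::attach_local_tenants(): receives one batch per sync-loop step.
fn attach_local_tenants(batch: &HashMap<TenantId, HashMap<TimelineId, TimelineMetadata>>) {
    for (tenant_id, timelines) in batch {
        for (timeline_id, metadata) in timelines {
            println!(
                "tenant {tenant_id}: attaching timeline {timeline_id} at disk_consistent_lsn {:#x}",
                metadata.disk_consistent_lsn
            );
        }
    }
}

fn main() {
    // Pretend these downloads finished during the current sync loop step.
    let finished = vec![
        (7, 1, TimelineMetadata { disk_consistent_lsn: 0x10 }),
        (7, 2, TimelineMetadata { disk_consistent_lsn: 0x20 }),
        (9, 5, TimelineMetadata { disk_consistent_lsn: 0x30 }),
    ];

    // Group everything by tenant so each tenant gets all of its timelines in one registration.
    let mut timelines_to_attach: HashMap<TenantId, HashMap<TimelineId, TimelineMetadata>> =
        HashMap::new();
    for (tenant_id, timeline_id, metadata) in finished {
        timelines_to_attach
            .entry(tenant_id)
            .or_default()
            .insert(timeline_id, metadata);
    }

    attach_local_tenants(&timelines_to_attach);
}
```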
@@ -171,11 +171,11 @@ use self::{
use crate::{
config::PageServerConf,
exponential_backoff,
layered_repository::metadata::{metadata_path, TimelineMetadata},
storage_sync::index::RemoteIndex,
task_mgr,
task_mgr::TaskKind,
task_mgr::BACKGROUND_RUNTIME,
tenant::metadata::{metadata_path, TimelineMetadata},
tenant_mgr::attach_local_tenants,
};
use crate::{
@@ -714,17 +714,17 @@ async fn storage_sync_loop(
};
if tenant_entry.has_in_progress_downloads() {
info!("Tenant {tenant_id} has pending timeline downloads, skipping repository registration");
info!("Tenant {tenant_id} has pending timeline downloads, skipping tenant registration");
continue;
} else {
info!(
"Tenant {tenant_id} download completed. Picking to register in repository"
"Tenant {tenant_id} download completed. Picking to register in tenant"
);
// Here we assume that if tenant has no in-progress downloads that
// means that it is the last completed timeline download that triggered
// sync status update. So we look at the index for available timelines
// and register them all at once in a repository for download
// to be submitted in a single operation to repository
// and register them all at once in a tenant for download
// to be submitted in a single operation to tenant
// so it can apply them at once to internal timeline map.
timelines_to_attach.0.insert(
tenant_id,
@@ -737,9 +737,7 @@ async fn storage_sync_loop(
}
drop(index_accessor);
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
if let Err(e) = attach_local_tenants(conf, &index, timelines_to_attach) {
error!("Failed to attach new timelines: {e:?}");
};
attach_local_tenants(conf, &index, timelines_to_attach);
}
}
ControlFlow::Break(()) => {
@@ -1038,13 +1036,7 @@ async fn update_local_metadata(
timeline_id,
} = sync_id;
tokio::task::spawn_blocking(move || {
crate::layered_repository::save_metadata(
conf,
timeline_id,
tenant_id,
&cloned_metadata,
true,
)
crate::tenant::save_metadata(conf, timeline_id, tenant_id, &cloned_metadata, true)
})
.await
.with_context(|| {
@@ -1411,12 +1403,12 @@ fn register_sync_status(
mod test_utils {
use utils::lsn::Lsn;
use crate::layered_repository::repo_harness::RepoHarness;
use crate::tenant::harness::TenantHarness;
use super::*;
pub(super) async fn create_local_timeline(
harness: &RepoHarness<'_>,
harness: &TenantHarness<'_>,
timeline_id: ZTimelineId,
filenames: &[&str],
metadata: TimelineMetadata,
@@ -1456,7 +1448,7 @@ mod test_utils {
#[cfg(test)]
mod tests {
use super::test_utils::dummy_metadata;
use crate::layered_repository::repo_harness::TIMELINE_ID;
use crate::tenant::harness::TIMELINE_ID;
use hex_literal::hex;
use utils::lsn::Lsn;

View File

@@ -112,8 +112,8 @@ mod tests {
use utils::lsn::Lsn;
use crate::{
layered_repository::repo_harness::{RepoHarness, TIMELINE_ID},
storage_sync::test_utils::{create_local_timeline, dummy_metadata},
tenant::harness::{TenantHarness, TIMELINE_ID},
};
use remote_storage::{LocalFs, RemoteStorage};
@@ -121,7 +121,7 @@ mod tests {
#[tokio::test]
async fn delete_timeline_negative() -> anyhow::Result<()> {
let harness = RepoHarness::create("delete_timeline_negative")?;
let harness = TenantHarness::create("delete_timeline_negative")?;
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = GenericRemoteStorage::new(LocalFs::new(
@@ -154,7 +154,7 @@ mod tests {
#[tokio::test]
async fn delete_timeline() -> anyhow::Result<()> {
let harness = RepoHarness::create("delete_timeline")?;
let harness = TenantHarness::create("delete_timeline")?;
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);

View File

@@ -17,7 +17,7 @@ use tokio::{
use tracing::{debug, error, info, warn};
use crate::{
config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask,
config::PageServerConf, storage_sync::SyncTask, tenant::metadata::metadata_path,
TEMP_FILE_SUFFIX,
};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -425,18 +425,18 @@ mod tests {
use utils::lsn::Lsn;
use crate::{
layered_repository::repo_harness::{RepoHarness, TIMELINE_ID},
storage_sync::{
index::RelativePath,
test_utils::{create_local_timeline, dummy_metadata},
},
tenant::harness::{TenantHarness, TIMELINE_ID},
};
use super::*;
#[tokio::test]
async fn download_timeline() -> anyhow::Result<()> {
let harness = RepoHarness::create("download_timeline")?;
let harness = TenantHarness::create("download_timeline")?;
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
@@ -537,7 +537,7 @@ mod tests {
#[tokio::test]
async fn download_timeline_negatives() -> anyhow::Result<()> {
let harness = RepoHarness::create("download_timeline_negatives")?;
let harness = TenantHarness::create("download_timeline_negatives")?;
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = GenericRemoteStorage::new(LocalFs::new(
@@ -596,7 +596,7 @@ mod tests {
#[tokio::test]
async fn test_download_index_part() -> anyhow::Result<()> {
let harness = RepoHarness::create("test_download_index_part")?;
let harness = TenantHarness::create("test_download_index_part")?;
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = GenericRemoteStorage::new(LocalFs::new(

View File

@@ -15,7 +15,7 @@ use serde_with::{serde_as, DisplayFromStr};
use tokio::sync::RwLock;
use tracing::log::warn;
use crate::{config::PageServerConf, layered_repository::metadata::TimelineMetadata};
use crate::{config::PageServerConf, tenant::metadata::TimelineMetadata};
use utils::{
lsn::Lsn,
zid::{ZTenantId, ZTenantTimelineId, ZTimelineId},
@@ -340,11 +340,11 @@ mod tests {
use std::collections::BTreeSet;
use super::*;
use crate::layered_repository::repo_harness::{RepoHarness, TIMELINE_ID};
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
#[test]
fn index_part_conversion() {
let harness = RepoHarness::create("index_part_conversion").unwrap();
let harness = TenantHarness::create("index_part_conversion").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata =
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1));
@@ -462,7 +462,7 @@ mod tests {
#[test]
fn index_part_conversion_negatives() {
let harness = RepoHarness::create("index_part_conversion_negatives").unwrap();
let harness = TenantHarness::create("index_part_conversion_negatives").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata =
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1));

View File

@@ -15,9 +15,7 @@ use super::{
LayersUpload, SyncData, SyncQueue,
};
use crate::metrics::NO_LAYERS_UPLOAD;
use crate::{
config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask,
};
use crate::{config::PageServerConf, storage_sync::SyncTask, tenant::metadata::metadata_path};
/// Serializes and uploads the given index part data to the remote storage.
pub(super) async fn upload_index_part(
@@ -202,18 +200,18 @@ mod tests {
use utils::lsn::Lsn;
use crate::{
layered_repository::repo_harness::{RepoHarness, TIMELINE_ID},
storage_sync::{
index::RelativePath,
test_utils::{create_local_timeline, dummy_metadata},
},
tenant::harness::{TenantHarness, TIMELINE_ID},
};
use super::{upload_index_part, *};
#[tokio::test]
async fn regular_layer_upload() -> anyhow::Result<()> {
let harness = RepoHarness::create("regular_layer_upload")?;
let harness = TenantHarness::create("regular_layer_upload")?;
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
@@ -301,7 +299,7 @@ mod tests {
// Currently, GC can run between upload retries, removing local layers scheduled for upload. Test this scenario.
#[tokio::test]
async fn layer_upload_after_local_fs_update() -> anyhow::Result<()> {
let harness = RepoHarness::create("layer_upload_after_local_fs_update")?;
let harness = TenantHarness::create("layer_upload_after_local_fs_update")?;
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
@@ -396,7 +394,7 @@ mod tests {
#[tokio::test]
async fn test_upload_index_part() -> anyhow::Result<()> {
let harness = RepoHarness::create("test_upload_index_part")?;
let harness = TenantHarness::create("test_upload_index_part")?;
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
let storage = GenericRemoteStorage::new(LocalFs::new(

View File

@@ -1,6 +1,6 @@
//!
//! Timeline repository implementation that keeps old data in files on disk, and
//! the recent changes in memory. See layered_repository/*_layer.rs files.
//! the recent changes in memory. See tenant/*_layer.rs files.
//! The functions here are responsible for locating the correct layer for the
//! get/put call, walking back the timeline branching history as needed.
//!
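A much-simplified, hypothetical model of the lookup the header comment describes: find the latest value for a key at or below the requested LSN, and if the timeline has nothing locally, walk back into its ancestor capped at the branch LSN. In-memory maps stand in for layer files; none of this is the real layer-map code.

```rust
// Highly simplified, hypothetical model of "find the value for (key, lsn),
// falling back to the ancestor timeline below the branch point".
use std::collections::BTreeMap;
use std::rc::Rc;

type Key = u32;
type Lsn = u64;

struct Timeline {
    // "Layers": value versions per key, ordered by the LSN they were written at.
    versions: BTreeMap<(Key, Lsn), String>,
    // Branch parent and the LSN this timeline was branched at.
    ancestor: Option<(Rc<Timeline>, Lsn)>,
}

impl Timeline {
    fn get(&self, key: Key, lsn: Lsn) -> Option<&String> {
        // Latest local version at or below the requested LSN.
        if let Some((_k, value)) = self.versions.range((key, 0u64)..=(key, lsn)).next_back() {
            return Some(value);
        }
        // Nothing local: walk back into the ancestor, capped at the branch LSN.
        match &self.ancestor {
            Some((parent, branch_lsn)) => parent.get(key, lsn.min(*branch_lsn)),
            None => None,
        }
    }
}

fn main() {
    let mut main_versions = BTreeMap::new();
    main_versions.insert((1, 0x10), "foo@0x10".to_string());
    main_versions.insert((1, 0x30), "foo@0x30".to_string());
    let main = Rc::new(Timeline { versions: main_versions, ancestor: None });

    // Branch created at LSN 0x20: it sees the ancestor's data up to 0x20 only.
    let branch = Timeline {
        versions: BTreeMap::new(),
        ancestor: Some((Rc::clone(&main), 0x20)),
    };

    assert_eq!(branch.get(1, 0x40), Some(&"foo@0x10".to_string()));
    assert_eq!(main.get(1, 0x40), Some(&"foo@0x30".to_string()));
    println!("ancestor walk ok");
}
```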
@@ -12,6 +12,7 @@
//!
use anyhow::{bail, ensure, Context, Result};
use tokio::sync::watch;
use tracing::*;
use std::cmp::min;
@@ -71,24 +72,26 @@ use storage_layer::Layer;
pub use timeline::Timeline;
// re-export this function so that page_cache.rs can use it.
pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file;
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
// re-export for use in storage_sync.rs
pub use crate::layered_repository::metadata::save_metadata;
pub use crate::tenant::metadata::save_metadata;
// re-export for use in walreceiver
pub use crate::layered_repository::timeline::WalReceiverInfo;
pub use crate::tenant::timeline::WalReceiverInfo;
/// Parts of the `.neon/tenants/<tenantid>/timelines/<timelineid>` directory prefix.
pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
///
/// Repository consists of multiple timelines. Keep them in a hash table.
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
pub struct Repository {
pub struct Tenant {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
state: watch::Sender<TenantState>,
// Overridden tenant-specific config parameters.
// We keep TenantConfOpt struct here to preserve the information
// about parameters that are not set.
@@ -114,17 +117,40 @@ pub struct Repository {
upload_layers: bool,
}
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
/// Tenant is fully operational, its background jobs might be running or not.
Active { background_jobs_running: bool },
/// A tenant is recognized by pageserver, but not yet ready to operate:
/// e.g. not present locally and being downloaded or being read into memory from the file system.
Paused,
/// A tenant is recognized by the pageserver, but no longer used for any operations, as it failed to get activated.
Broken,
}
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
impl Repository {
impl Tenant {
/// Get Timeline handle for given zenith timeline ID.
/// This function is idempotent. It doesn't change internal state in any way.
pub fn get_timeline(&self, timeline_id: ZTimelineId) -> Option<Arc<Timeline>> {
self.timelines.lock().unwrap().get(&timeline_id).cloned()
pub fn get_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<Arc<Timeline>> {
self.timelines
.lock()
.unwrap()
.get(&timeline_id)
.with_context(|| {
format!(
"Timeline {} was not found for tenant {}",
timeline_id,
self.tenant_id()
)
})
.map(Arc::clone)
}
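`get_timeline` now returns `anyhow::Result<Arc<Timeline>>` instead of an `Option`, attaching the tenant and timeline IDs as context. A tiny standalone sketch of that `Option::with_context` pattern from the `anyhow` crate, with a plain `HashMap` standing in for the timeline map:

```rust
// Tiny sketch of turning a map lookup Option into anyhow::Result with context.
use anyhow::Context;
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug)]
struct Timeline; // stand-in

fn get_timeline(
    timelines: &HashMap<u64, Arc<Timeline>>,
    tenant_id: u64,
    timeline_id: u64,
) -> anyhow::Result<Arc<Timeline>> {
    timelines
        .get(&timeline_id)
        .with_context(|| format!("Timeline {timeline_id} was not found for tenant {tenant_id}"))
        .map(Arc::clone)
}

fn main() {
    let timelines: HashMap<u64, Arc<Timeline>> = HashMap::new();
    let err = get_timeline(&timelines, 7, 1).unwrap_err();
    // Callers now get a descriptive error instead of a bare None.
    println!("{err:#}");
}
```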
/// Lists timelines the repository contains.
/// Up to repository's implementation to omit certain timelines that are not considered ready for use.
/// Lists timelines the tenant contains.
/// Up to tenant's implementation to omit certain timelines that are not considered ready for use.
pub fn list_timelines(&self) -> Vec<(ZTimelineId, Arc<Timeline>)> {
self.timelines
.lock()
@@ -425,6 +451,54 @@ impl Repository {
pub fn get_remote_index(&self) -> &RemoteIndex {
&self.remote_index
}
pub fn current_state(&self) -> TenantState {
*self.state.borrow()
}
pub fn is_active(&self) -> bool {
matches!(self.current_state(), TenantState::Active { .. })
}
pub fn should_run_tasks(&self) -> bool {
matches!(
self.current_state(),
TenantState::Active {
background_jobs_running: true
}
)
}
/// Changes tenant status to active, if it was not broken before.
/// Otherwise, ignores the state change, logging an error.
pub fn activate(&self, enable_background_jobs: bool) {
self.set_state(TenantState::Active {
background_jobs_running: enable_background_jobs,
});
}
pub fn set_state(&self, new_state: TenantState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
}
(TenantState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
}
(_, new_state) => {
self.state.send_replace(new_state);
if self.should_run_tasks() {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
}
}
}
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
self.state.subscribe()
}
}
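A standalone sketch of the new tenant state machine over `tokio::sync::watch`, mirroring `current_state`/`activate`/`set_state`/`subscribe_for_state_updates` from the hunk above; the gc/compaction spawn is replaced by a print, and the struct holds only the state channel.

```rust
// Standalone sketch of the tenant state machine over a tokio watch channel.
use tokio::sync::watch;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TenantState {
    Active { background_jobs_running: bool },
    Paused,
    Broken,
}

struct Tenant {
    state: watch::Sender<TenantState>,
}

impl Tenant {
    fn new() -> Self {
        // New tenants start Paused until their timelines are attached.
        let (state, _) = watch::channel(TenantState::Paused);
        Tenant { state }
    }

    fn current_state(&self) -> TenantState {
        *self.state.borrow()
    }

    fn should_run_tasks(&self) -> bool {
        matches!(
            self.current_state(),
            TenantState::Active { background_jobs_running: true }
        )
    }

    fn activate(&self, enable_background_jobs: bool) {
        self.set_state(TenantState::Active { background_jobs_running: enable_background_jobs });
    }

    fn set_state(&self, new_state: TenantState) {
        match (self.current_state(), new_state) {
            (old, new) if old == new => {}
            // A broken tenant never leaves that state.
            (TenantState::Broken, _) => eprintln!("ignoring {new_state:?} for broken tenant"),
            (_, new_state) => {
                self.state.send_replace(new_state);
                if self.should_run_tasks() {
                    // The real code spawns the gc and compaction loops here.
                    println!("would start background loops");
                }
            }
        }
    }

    fn subscribe(&self) -> watch::Receiver<TenantState> {
        self.state.subscribe()
    }
}

#[tokio::main]
async fn main() {
    let tenant = Tenant::new();
    let mut rx = tenant.subscribe();
    tenant.activate(true);
    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow(), TenantState::Active { background_jobs_running: true });
    assert!(tenant.should_run_tasks());
}
```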
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -471,7 +545,7 @@ fn tree_sort_timelines(
}
/// Private functions
impl Repository {
impl Tenant {
pub fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
@@ -609,8 +683,9 @@ impl Repository {
tenant_id: ZTenantId,
remote_index: RemoteIndex,
upload_layers: bool,
) -> Repository {
Repository {
) -> Tenant {
let (state, _) = watch::channel(TenantState::Paused);
Tenant {
tenant_id,
conf,
tenant_conf: Arc::new(RwLock::new(tenant_conf)),
@@ -619,6 +694,7 @@ impl Repository {
walredo_mgr,
remote_index,
upload_layers,
state,
}
}
@@ -848,7 +924,7 @@ impl Repository {
// compaction (both require `layer_removal_cs` lock),
// but the GC iteration can run concurrently with branch creation.
//
// See comments in [`Repository::branch_timeline`] for more information
// See comments in [`Tenant::branch_timeline`] for more information
// about why branch creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if task_mgr::is_shutdown_requested() {
@@ -881,7 +957,7 @@ impl Repository {
}
}
impl Drop for Repository {
impl Drop for Tenant {
fn drop(&mut self) {
remove_tenant_metrics(&self.tenant_id);
}
@@ -910,7 +986,7 @@ pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
}
#[cfg(test)]
pub mod repo_harness {
pub mod harness {
use bytes::{Bytes, BytesMut};
use once_cell::sync::Lazy;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -920,8 +996,8 @@ pub mod repo_harness {
use crate::storage_sync::index::RemoteIndex;
use crate::{
config::PageServerConf,
layered_repository::Repository,
repository::Key,
tenant::Tenant,
walrecord::ZenithWalRecord,
walredo::{WalRedoError, WalRedoManager},
};
@@ -968,7 +1044,7 @@ pub mod repo_harness {
}
}
pub struct RepoHarness<'a> {
pub struct TenantHarness<'a> {
pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
pub tenant_id: ZTenantId,
@@ -979,7 +1055,7 @@ pub mod repo_harness {
),
}
impl<'a> RepoHarness<'a> {
impl<'a> TenantHarness<'a> {
pub fn create(test_name: &'static str) -> Result<Self> {
Self::create_internal(test_name, false)
}
@@ -1016,14 +1092,14 @@ pub mod repo_harness {
})
}
pub fn load(&self) -> Repository {
self.try_load().expect("failed to load test repo")
pub fn load(&self) -> Tenant {
self.try_load().expect("failed to load test tenant")
}
pub fn try_load(&self) -> Result<Repository> {
pub fn try_load(&self) -> Result<Tenant> {
let walredo_mgr = Arc::new(TestRedoManager);
let repo = Repository::new(
let tenant = Tenant::new(
self.conf,
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,
@@ -1031,7 +1107,7 @@ pub mod repo_harness {
RemoteIndex::default(),
false,
);
// populate repo with locally available timelines
// populate tenant with locally available timelines
let mut timelines_to_load = HashMap::new();
for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id))
.expect("should be able to read timelines dir")
@@ -1043,12 +1119,13 @@ pub mod repo_harness {
.unwrap()
.to_string_lossy()
.parse()?;
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
timelines_to_load.insert(timeline_id, timeline_metadata);
}
repo.init_attach_timelines(timelines_to_load)?;
tenant.init_attach_timelines(timelines_to_load)?;
Ok(repo)
Ok(tenant)
}
pub fn timeline_path(&self, timeline_id: &ZTimelineId) -> PathBuf {
@@ -1110,8 +1187,8 @@ mod tests {
use super::metadata::METADATA_FILE_NAME;
use super::*;
use crate::keyspace::KeySpaceAccum;
use crate::layered_repository::repo_harness::*;
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use bytes::BytesMut;
use hex_literal::hex;
use once_cell::sync::Lazy;
@@ -1122,8 +1199,8 @@ mod tests {
#[test]
fn test_basic() -> Result<()> {
let repo = RepoHarness::create("test_basic")?.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = TenantHarness::create("test_basic")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -1144,10 +1221,10 @@ mod tests {
#[test]
fn no_duplicate_timelines() -> Result<()> {
let repo = RepoHarness::create("no_duplicate_timelines")?.load();
let _ = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(
e.to_string(),
@@ -1170,8 +1247,8 @@ mod tests {
///
#[test]
fn test_branch() -> Result<()> {
let repo = RepoHarness::create("test_branch")?.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = TenantHarness::create("test_branch")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let writer = tline.writer();
use std::str::from_utf8;
@@ -1193,8 +1270,8 @@ mod tests {
//assert_current_logical_size(&tline, Lsn(0x40));
// Branch the history, modify relation differently on the new timeline
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?;
let newtline = repo
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
let new_writer = newtline.writer();
@@ -1263,19 +1340,20 @@ mod tests {
#[test]
fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> {
let repo =
RepoHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
// FIXME: this doesn't actually remove any layer currently, given how the checkpointing
// and compaction works. But it does set the 'cutoff' point so that the cross check
// below should fail.
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// try to branch at lsn 25, should fail because we already garbage collected the data
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
Ok(_) => panic!("branching should have failed"),
Err(err) => {
assert!(err.to_string().contains("invalid branch start lsn"));
@@ -1292,11 +1370,12 @@ mod tests {
#[test]
fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> {
let repo = RepoHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
repo.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?;
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?;
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
Ok(_) => panic!("branching should have failed"),
Err(err) => {
assert!(&err.to_string().contains("invalid branch start lsn"));
@@ -1336,36 +1415,37 @@ mod tests {
#[test]
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
let repo =
RepoHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = repo
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok());
Ok(())
}
#[test]
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
let repo = RepoHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = repo
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60))?;
// run gc on parent
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
// Check that the data is still accessible on the branch.
assert_eq!(
@@ -1379,16 +1459,17 @@ mod tests {
#[test]
fn timeline_load() -> Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = RepoHarness::create(TEST_NAME)?;
let harness = TenantHarness::create(TEST_NAME)?;
{
let repo = harness.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
let tenant = harness.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
make_some_layers(tline.as_ref(), Lsn(0x8000))?;
tline.checkpoint(CheckpointConfig::Forced)?;
}
let repo = harness.load();
repo.get_timeline(TIMELINE_ID)
let tenant = harness.load();
tenant
.get_timeline(TIMELINE_ID)
.expect("cannot load timeline");
Ok(())
@@ -1397,18 +1478,18 @@ mod tests {
#[test]
fn timeline_load_with_ancestor() -> Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = RepoHarness::create(TEST_NAME)?;
let harness = TenantHarness::create(TEST_NAME)?;
// create two timelines
{
let repo = harness.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = harness.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tline.checkpoint(CheckpointConfig::Forced)?;
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
let newtline = repo
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID)
.expect("Should have a local timeline");
@@ -1417,14 +1498,14 @@ mod tests {
}
// check that both of them are initially unloaded
let repo = harness.load();
let tenant = harness.load();
// check that both, child and ancestor are loaded
let _child_tline = repo
let _child_tline = tenant
.get_timeline(NEW_TIMELINE_ID)
.expect("cannot get child timeline loaded");
let _ancestor_tline = repo
let _ancestor_tline = tenant
.get_timeline(TIMELINE_ID)
.expect("cannot get ancestor timeline loaded");
@@ -1434,11 +1515,11 @@ mod tests {
#[test]
fn corrupt_metadata() -> Result<()> {
const TEST_NAME: &str = "corrupt_metadata";
let harness = RepoHarness::create(TEST_NAME)?;
let repo = harness.load();
let harness = TenantHarness::create(TEST_NAME)?;
let tenant = harness.load();
repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
drop(repo);
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
drop(tenant);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
@@ -1473,8 +1554,8 @@ mod tests {
#[test]
fn test_images() -> Result<()> {
let repo = RepoHarness::create("test_images")?.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -1523,8 +1604,8 @@ mod tests {
//
#[test]
fn test_bulk_insert() -> Result<()> {
let repo = RepoHarness::create("test_bulk_insert")?.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let mut lsn = Lsn(0x10);
@@ -1563,8 +1644,8 @@ mod tests {
#[test]
fn test_random_updates() -> Result<()> {
let repo = RepoHarness::create("test_random_updates")?.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
const NUM_KEYS: usize = 1000;
@@ -1633,8 +1714,8 @@ mod tests {
#[test]
fn test_traverse_branches() -> Result<()> {
let repo = RepoHarness::create("test_traverse_branches")?.load();
let mut tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
const NUM_KEYS: usize = 1000;
@@ -1667,8 +1748,8 @@ mod tests {
let mut tline_id = TIMELINE_ID;
for _ in 0..50 {
let new_tline_id = ZTimelineId::generate();
repo.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tline = repo
tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tline = tenant
.get_timeline(new_tline_id)
.expect("Should have the branched timeline");
tline_id = new_tline_id;
@@ -1712,8 +1793,8 @@ mod tests {
#[test]
fn test_traverse_ancestors() -> Result<()> {
let repo = RepoHarness::create("test_traverse_ancestors")?.load();
let mut tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
const NUM_KEYS: usize = 100;
const NUM_TLINES: usize = 50;
@@ -1728,8 +1809,8 @@ mod tests {
#[allow(clippy::needless_range_loop)]
for idx in 0..NUM_TLINES {
let new_tline_id = ZTimelineId::generate();
repo.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tline = repo
tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?;
tline = tenant
.get_timeline(new_tline_id)
.expect("Should have the branched timeline");
tline_id = new_tline_id;

View File

@@ -11,8 +11,8 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use crate::layered_repository::block_io::{BlockCursor, BlockReader};
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::{BlockCursor, BlockReader};
use std::cmp::min;
use std::io::{Error, ErrorKind};
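A small standalone sketch of the length prefix described in the header comment: one byte with the high bit clear for lengths under 128, otherwise four bytes with the high bit set. Byte order is assumed big-endian here; this is not the crate's actual blob reader/writer code.

```rust
// Standalone sketch of the length prefix described above:
//   len < 128  -> 1 byte:  0XXXXXXX
//   len >= 128 -> 4 bytes: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX (assumed big-endian)
fn encode_len(len: u32) -> Vec<u8> {
    assert!(len < 0x8000_0000, "length must fit in 31 bits");
    if len < 0x80 {
        vec![len as u8]
    } else {
        (len | 0x8000_0000).to_be_bytes().to_vec()
    }
}

fn decode_len(buf: &[u8]) -> (u32, usize) {
    if buf[0] & 0x80 == 0 {
        (buf[0] as u32, 1)
    } else {
        let bytes = [buf[0] & 0x7f, buf[1], buf[2], buf[3]];
        (u32::from_be_bytes(bytes), 4)
    }
}

fn main() {
    for len in [0u32, 5, 127, 128, 300, 1_000_000] {
        let header = encode_len(len);
        let (decoded, consumed) = decode_len(&header);
        assert_eq!((decoded, consumed), (len, header.len()));
        println!("len {len} -> {} header byte(s)", header.len());
    }
}
```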

View File

@@ -60,7 +60,7 @@ where
/// the underlying BlockReader. For example:
///
/// ```no_run
/// # use pageserver::layered_repository::block_io::{BlockReader, FileBlockReader};
/// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
/// # let reader: FileBlockReader<std::fs::File> = todo!();
/// let cursor = reader.block_cursor();
/// let buf = cursor.read_blk(1);

View File

@@ -24,15 +24,13 @@
//! "values" part.
//!
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::layered_repository::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::page_cache::{PageReadGuard, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::filename::{DeltaFileName, PathOrConf};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};

View File

@@ -25,7 +25,7 @@ use std::{cmp::Ordering, io, result};
use thiserror::Error;
use tracing::error;
use crate::layered_repository::block_io::{BlockReader, BlockWriter};
use crate::tenant::block_io::{BlockReader, BlockWriter};
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;

View File

@@ -2,11 +2,11 @@
//! used to keep in-memory layers spilled on disk.
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::BlobWriter;
use crate::layered_repository::block_io::BlockReader;
use crate::page_cache;
use crate::page_cache::PAGE_SZ;
use crate::page_cache::{ReadBufResult, WriteBufResult};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::BlockReader;
use crate::virtual_file::VirtualFile;
use once_cell::sync::Lazy;
use std::cmp::min;
@@ -330,13 +330,13 @@ fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter};
use crate::layered_repository::block_io::BlockCursor;
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
use crate::tenant::block_io::BlockCursor;
use rand::{seq::SliceRandom, thread_rng, RngCore};
use std::fs;
use std::str::FromStr;
fn repo_harness(
fn harness(
test_name: &str,
) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId), io::Error> {
let repo_dir = PageServerConf::test_repo_dir(test_name);
@@ -368,7 +368,7 @@ mod tests {
#[test]
fn test_ephemeral_files() -> Result<(), io::Error> {
let (conf, tenantid, timelineid) = repo_harness("ephemeral_files")?;
let (conf, tenantid, timelineid) = harness("ephemeral_files")?;
let file_a = EphemeralFile::create(conf, tenantid, timelineid)?;
@@ -399,7 +399,7 @@ mod tests {
#[test]
fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenantid, timelineid) = repo_harness("ephemeral_blobs")?;
let (conf, tenantid, timelineid) = harness("ephemeral_blobs")?;
let mut file = EphemeralFile::create(conf, tenantid, timelineid)?;

View File

@@ -20,15 +20,13 @@
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::layered_repository::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::filename::{ImageFileName, PathOrConf};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};

View File

@@ -5,14 +5,12 @@
//! its position in the file, is kept in memory, though.
//!
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter};
use crate::layered_repository::block_io::BlockReader;
use crate::layered_repository::delta_layer::{DeltaLayer, DeltaLayerWriter};
use crate::layered_repository::ephemeral_file::EphemeralFile;
use crate::layered_repository::storage_layer::{
Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::repository::{Key, Value};
use crate::tenant::blob_io::{BlobCursor, BlobWriter};
use crate::tenant::block_io::BlockReader;
use crate::tenant::delta_layer::{DeltaLayer, DeltaLayerWriter};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::walrecord;
use anyhow::{bail, ensure, Result};
use std::cell::RefCell;

View File

@@ -10,11 +10,11 @@
//! corresponding files are written to disk.
//!
use crate::layered_repository::inmemory_layer::InMemoryLayer;
use crate::layered_repository::storage_layer::Layer;
use crate::layered_repository::storage_layer::{range_eq, range_overlaps};
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
use crate::tenant::inmemory_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use crate::tenant::storage_layer::{range_eq, range_overlaps};
use anyhow::Result;
use std::collections::VecDeque;
use std::ops::Range;

View File

@@ -1,4 +1,4 @@
//! Every image of a certain timeline from [`crate::layered_repository::Repository`]
//! Every image of a certain timeline from [`crate::tenant::Tenant`]
//! has a metadata that needs to be stored persistently.
//!
//! Later, the file gets used in [`crate::remote_storage::storage_sync`] as a part of
@@ -216,7 +216,7 @@ pub fn save_metadata(
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::repo_harness::TIMELINE_ID;
use crate::tenant::harness::TIMELINE_ID;
#[test]
fn metadata_serializes_correctly() {

View File

@@ -17,7 +17,7 @@ use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering}
use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError};
use std::time::{Duration, Instant, SystemTime};
use crate::layered_repository::{
use crate::tenant::{
delta_layer::{DeltaLayer, DeltaLayerWriter},
ephemeral_file::is_ephemeral_file,
filename::{DeltaFileName, ImageFileName},
@@ -118,7 +118,7 @@ pub struct Timeline {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Repository::delete_timeline`].
/// and [`Tenant::delete_timeline`].
layer_removal_cs: Mutex<()>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected

View File

@@ -1,26 +1,31 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
use crate::config::PageServerConf;
use crate::http::models::TenantInfo;
use crate::layered_repository::ephemeral_file::is_ephemeral_file;
use crate::layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME};
use crate::layered_repository::Repository;
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::task_mgr::{self, TaskKind};
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::walredo::{PostgresRedoManager, WalRedoManager};
use crate::{TenantTimelineValues, TEMP_FILE_SUFFIX};
use anyhow::Context;
use remote_storage::{path_with_suffix_extension, GenericRemoteStorage};
use std::collections::{hash_map, HashMap, HashSet};
use std::ffi::OsStr;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Context;
use tracing::*;
use remote_storage::{path_with_suffix_extension, GenericRemoteStorage};
use crate::config::PageServerConf;
use crate::http::models::TenantInfo;
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
metadata::{TimelineMetadata, METADATA_FILE_NAME},
Tenant, TenantState,
};
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::walredo::PostgresRedoManager;
use crate::{TenantTimelineValues, TEMP_FILE_SUFFIX};
use utils::crashsafe_dir;
use utils::zid::{ZTenantId, ZTimelineId};
@@ -28,64 +33,31 @@ mod tenants_state {
use once_cell::sync::Lazy;
use std::{
collections::HashMap,
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};
use utils::zid::ZTenantId;
use crate::tenant_mgr::Tenant;
use crate::tenant::Tenant;
static TENANTS: Lazy<RwLock<HashMap<ZTenantId, Tenant>>> =
static TENANTS: Lazy<RwLock<HashMap<ZTenantId, Arc<Tenant>>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Tenant>> {
pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Arc<Tenant>>> {
TENANTS
.read()
.expect("Failed to read() tenants lock, it got poisoned")
}
pub(super) fn write_tenants() -> RwLockWriteGuard<'static, HashMap<ZTenantId, Tenant>> {
pub(super) fn write_tenants() -> RwLockWriteGuard<'static, HashMap<ZTenantId, Arc<Tenant>>> {
TENANTS
.write()
.expect("Failed to write() tenants lock, it got poisoned")
}
}
struct Tenant {
state: TenantState,
/// Contains in-memory state, including the timeline that might not yet be flushed to disk or loaded from disk.
repo: Arc<Repository>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {
// This tenant exists on local disk, and the layer map has been loaded into memory.
// The local disk might have some newer files that don't exist in cloud storage yet.
Active,
// Tenant is active, but there is no walreceiver connection.
Idle,
// This tenant exists on local disk, and the layer map has been loaded into memory.
// The local disk might have some newer files that don't exist in cloud storage yet.
// The tenant cannot be accessed anymore for any reason except graceful shutdown.
Stopping,
// Something went wrong loading the tenant state
Broken,
}
impl std::fmt::Display for TenantState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Active => f.write_str("Active"),
Self::Idle => f.write_str("Idle"),
Self::Stopping => f.write_str("Stopping"),
Self::Broken => f.write_str("Broken"),
}
}
}
/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the repository once download is completed.
/// are scheduled for download and added to the tenant once download is completed.
pub fn init_tenant_mgr(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
@@ -128,7 +100,7 @@ pub fn init_tenant_mgr(
)
};
attach_local_tenants(conf, &remote_index, tenants_to_attach)?;
attach_local_tenants(conf, &remote_index, tenants_to_attach);
Ok(remote_index)
}
@@ -141,7 +113,7 @@ pub fn attach_local_tenants(
conf: &'static PageServerConf,
remote_index: &RemoteIndex,
tenants_to_attach: TenantTimelineValues<TimelineMetadata>,
) -> anyhow::Result<()> {
) {
let _entered = info_span!("attach_local_tenants").entered();
let number_of_tenants = tenants_to_attach.0.len();
@@ -152,104 +124,109 @@ pub fn attach_local_tenants(
);
debug!("Timelines to attach: {local_timelines:?}");
let repository = load_local_repo(conf, tenant_id, remote_index)
.context("Failed to load repository for tenant")?;
let repo = Arc::clone(&repository);
let tenant = load_local_tenant(conf, tenant_id, remote_index);
{
match tenants_state::write_tenants().entry(tenant_id) {
hash_map::Entry::Occupied(_) => {
anyhow::bail!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
continue;
}
hash_map::Entry::Vacant(v) => {
v.insert(Tenant {
state: TenantState::Idle,
repo,
});
v.insert(Arc::clone(&tenant));
}
}
}
if tenant.current_state() == TenantState::Broken {
warn!("Skipping timeline load for broken tenant {tenant_id}")
} else {
let has_timelines = !local_timelines.is_empty();
match tenant.init_attach_timelines(local_timelines) {
Ok(()) => {
info!("successfully loaded local timelines for tenant {tenant_id}");
tenant.activate(has_timelines);
}
Err(e) => {
error!("Failed to attach tenant timelines: {e:?}");
tenant.set_state(TenantState::Broken);
}
}
}
// XXX: current timeline init enables walreceiver that looks for tenant in the state, so insert the tenant entry before
repository
.init_attach_timelines(local_timelines)
.context("Failed to attach timelines for tenant")?;
}
info!("Processed {number_of_tenants} local tenants during attach");
Ok(())
info!("Processed {number_of_tenants} local tenants during attach")
}
fn load_local_repo(
fn load_local_tenant(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<Repository>> {
let repository = Repository::new(
) -> Arc<Tenant> {
let tenant = Arc::new(Tenant::new(
conf,
TenantConfOpt::default(),
Arc::new(PostgresRedoManager::new(conf, tenant_id)),
tenant_id,
remote_index.clone(),
conf.remote_storage_config.is_some(),
);
let tenant_conf = Repository::load_tenant_config(conf, tenant_id)?;
repository.update_tenant_config(tenant_conf);
Ok(Arc::new(repository))
));
match Tenant::load_tenant_config(conf, tenant_id) {
Ok(tenant_conf) => {
tenant.update_tenant_config(tenant_conf);
tenant.activate(false);
}
Err(e) => {
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
tenant.set_state(TenantState::Broken);
}
}
tenant
}
///
/// Shut down all tenants. This runs as part of pageserver shutdown.
///
pub async fn shutdown_all_tenants() {
let tenantids = {
let tenants_to_shut_down = {
let mut m = tenants_state::write_tenants();
let mut tenantids = Vec::new();
for (tenantid, tenant) in m.iter_mut() {
match tenant.state {
TenantState::Active | TenantState::Idle | TenantState::Stopping => {
tenant.state = TenantState::Stopping;
tenantids.push(*tenantid)
}
TenantState::Broken => {}
let mut tenants_to_shut_down = Vec::with_capacity(m.len());
for (_, tenant) in m.drain() {
if tenant.is_active() {
// updates tenant state, forbidding new GC and compaction iterations from starting
tenant.set_state(TenantState::Paused);
tenants_to_shut_down.push(tenant)
}
}
drop(m);
tenantids
tenants_to_shut_down
};
// Shut down all existing walreceiver connections and stop accepting the new ones.
task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
// Ok, no background tasks running anymore. Flush any remaining data in
// memory to disk.
//
// We assume that any incoming connections that might request pages from
// the repository have already been terminated by the caller, so there
// the tenant have already been terminated by the caller, so there
// should be no more activity in any of the repositories.
//
// On error, log it but continue with the shutdown for other tenants.
for tenant_id in tenantids {
for tenant in tenants_to_shut_down {
let tenant_id = tenant.tenant_id();
debug!("shutdown tenant {tenant_id}");
match get_repository_for_tenant(tenant_id) {
Ok(repo) => {
if let Err(err) = repo.checkpoint() {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}
Err(err) => {
error!("Could not get repository for tenant {tenant_id} during shutdown: {err:?}");
}
if let Err(err) = tenant.checkpoint() {
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
}
}
}
fn create_repo(
fn create_tenant_files(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: ZTenantId,
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
remote_index: RemoteIndex,
) -> anyhow::Result<Arc<Repository>> {
) -> anyhow::Result<()> {
let target_tenant_directory = conf.tenant_path(&tenant_id);
anyhow::ensure!(
!target_tenant_directory.exists(),
@@ -282,7 +259,7 @@ fn create_repo(
)
})?;
// first, create a config in the top-level temp directory, fsync the file
Repository::persist_tenant_config(&temporary_tenant_config_path, tenant_conf, true)?;
Tenant::persist_tenant_config(&temporary_tenant_config_path, tenant_conf, true)?;
// then, create a subdirectory in the top-level temp directory, fsynced
crashsafe_dir::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
@@ -312,18 +289,11 @@ fn create_repo(
fs::File::open(target_dir_parent)?.sync_all()?;
info!(
"created directory structure in {}",
"created tenant directory structure in {}",
target_tenant_directory.display()
);
Ok(Arc::new(Repository::new(
conf,
tenant_conf,
wal_redo_manager,
tenant_id,
remote_index,
conf.remote_storage_config.is_some(),
)))
Ok(())
}
fn rebase_directory(original_path: &Path, base: &Path, new_base: &Path) -> anyhow::Result<PathBuf> {
@@ -350,12 +320,17 @@ pub fn create_tenant(
}
hash_map::Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let repo = create_repo(conf, tenant_conf, tenant_id, wal_redo_manager, remote_index)?;
v.insert(Tenant {
state: TenantState::Active,
repo,
});
crate::tenant_tasks::start_background_loops(tenant_id);
create_tenant_files(conf, tenant_conf, tenant_id)?;
let tenant = Arc::new(Tenant::new(
conf,
tenant_conf,
wal_redo_manager,
tenant_id,
remote_index,
conf.remote_storage_config.is_some(),
));
tenant.activate(false);
v.insert(tenant);
Ok(Some(tenant_id))
}
}
@@ -367,70 +342,23 @@ pub fn update_tenant_config(
tenant_id: ZTenantId,
) -> anyhow::Result<()> {
info!("configuring tenant {tenant_id}");
get_repository_for_tenant(tenant_id)?.update_tenant_config(tenant_conf);
Repository::persist_tenant_config(&TenantConf::path(conf, tenant_id), tenant_conf, false)?;
get_tenant(tenant_id, true)?.update_tenant_config(tenant_conf);
Tenant::persist_tenant_config(&TenantConf::path(conf, tenant_id), tenant_conf, false)?;
Ok(())
}
pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
Some(tenants_state::read_tenants().get(&tenantid)?.state)
}
pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow::Result<()> {
let old_state = {
let mut m = tenants_state::write_tenants();
let tenant = m
.get_mut(&tenant_id)
.with_context(|| format!("Tenant not found for id {tenant_id}"))?;
let old_state = tenant.state;
tenant.state = new_state;
old_state
};
match (old_state, new_state) {
(TenantState::Broken, TenantState::Broken)
| (TenantState::Active, TenantState::Active)
| (TenantState::Idle, TenantState::Idle)
| (TenantState::Stopping, TenantState::Stopping) => {
debug!("tenant {tenant_id} already in state {new_state}");
}
(TenantState::Broken, ignored) => {
debug!("Ignoring {ignored} since tenant {tenant_id} is in broken state");
}
(_, TenantState::Broken) => {
debug!("Setting tenant {tenant_id} status to broken");
}
(TenantState::Stopping, ignored) => {
debug!("Ignoring {ignored} since tenant {tenant_id} is in stopping state");
}
(TenantState::Idle, TenantState::Active) => {
info!("activating tenant {tenant_id}");
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(tenant_id);
}
(TenantState::Idle, TenantState::Stopping) => {
info!("stopping idle tenant {tenant_id}");
}
(TenantState::Active, TenantState::Stopping | TenantState::Idle) => {
info!("stopping tenant {tenant_id} tasks due to new state {new_state}");
// Note: The caller is responsible for waiting for any tasks to finish.
}
}
Ok(())
}
pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<Repository>> {
/// Gets the tenant from the in-memory state, returning an error if it's absent or does not match the query.
/// `active_only = true` allows querying only tenants that are ready for operations, returning an error for all other tenants.
pub fn get_tenant(tenant_id: ZTenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
let m = tenants_state::read_tenants();
let tenant = m
.get(&tenant_id)
.with_context(|| format!("Tenant {tenant_id} not found"))?;
Ok(Arc::clone(&tenant.repo))
.with_context(|| format!("Tenant {tenant_id} not found in the local state"))?;
if active_only && !tenant.is_active() {
anyhow::bail!("Tenant {tenant_id} is not active")
} else {
Ok(Arc::clone(tenant))
}
}
pub async fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow::Result<()> {
@@ -455,9 +383,14 @@ pub async fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) ->
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
info!("timeline task shutdown completed");
match tenants_state::read_tenants().get(&tenant_id) {
Some(tenant) => tenant.repo.delete_timeline(timeline_id)?,
None => anyhow::bail!("Tenant {tenant_id} not found in local tenant state"),
match get_tenant(tenant_id, true) {
Ok(tenant) => {
tenant.delete_timeline(timeline_id)?;
if tenant.list_timelines().is_empty() {
tenant.activate(false);
}
}
Err(e) => anyhow::bail!("Cannot access tenant {tenant_id} in local tenant state: {e:?}"),
}
Ok(())
@@ -467,21 +400,24 @@ pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
) -> anyhow::Result<()> {
set_tenant_state(tenant_id, TenantState::Stopping)?;
let tenant = match {
let mut tenants_accessor = tenants_state::write_tenants();
tenants_accessor.remove(&tenant_id)
} {
Some(tenant) => tenant,
None => anyhow::bail!("Tenant not found for id {tenant_id}"),
};
tenant.set_state(TenantState::Paused);
// shut down all tenant and timeline tasks (gc, compaction, page service)
task_mgr::shutdown_tasks(None, Some(tenant_id), None).await;
{
let mut tenants_accessor = tenants_state::write_tenants();
tenants_accessor.remove(&tenant_id);
}
// If removal fails there will be no way to successfully retry detach,
// because the tenant no longer exists in the in-memory map. And it needs to be removed from it
// before we remove files, because it contains references to repository
// before we remove files, because it contains references to tenant
// which references ephemeral files which are deleted on drop. So if we keep these references,
// we will attempt to remove files which no longer exist. This can be fixed by having a shutdown
// mechanism for repository that will clean temporary data to avoid any references to ephemeral files
// mechanism for tenant that will clean temporary data to avoid any references to ephemeral files
let local_tenant_directory = conf.tenant_path(&tenant_id);
fs::remove_dir_all(&local_tenant_directory).with_context(|| {
format!(
@@ -512,7 +448,7 @@ pub fn list_tenant_info(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
TenantInfo {
id: *id,
state: Some(tenant.state),
state: tenant.current_state(),
current_physical_size: None,
has_in_progress_downloads,
}
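
For readers new to this module, here is a minimal, self-contained sketch of the switchboard pattern above; `TenantId`, `Tenant`, and the `active` flag are simplified stand-ins for the real types, and only the shape of `get_tenant` is mirrored, not the real implementation:

// Requires the `once_cell` and `anyhow` crates.
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type TenantId = u64;

struct Tenant {
    active: bool,
}

impl Tenant {
    fn is_active(&self) -> bool {
        self.active
    }
}

// Process-global map of tenant id -> Arc<Tenant>, guarded by an RwLock,
// mirroring the change above from `Tenant` to `Arc<Tenant>` in the map.
static TENANTS: Lazy<RwLock<HashMap<TenantId, Arc<Tenant>>>> =
    Lazy::new(|| RwLock::new(HashMap::new()));

fn get_tenant(tenant_id: TenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
    let m = TENANTS.read().expect("tenants lock poisoned");
    let tenant = m
        .get(&tenant_id)
        .ok_or_else(|| anyhow::anyhow!("Tenant {tenant_id} not found in the local state"))?;
    if active_only && !tenant.is_active() {
        anyhow::bail!("Tenant {tenant_id} is not active");
    }
    Ok(Arc::clone(tenant))
}

fn main() -> anyhow::Result<()> {
    TENANTS
        .write()
        .expect("tenants lock poisoned")
        .insert(1, Arc::new(Tenant { active: true }));
    let tenant = get_tenant(1, true)?;
    assert!(tenant.is_active());
    Ok(())
}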

View File

@@ -1,12 +1,14 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{Tenant, TenantState};
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use tracing::*;
use utils::zid::ZTenantId;
@@ -18,7 +20,10 @@ pub fn start_background_loops(tenant_id: ZTenantId) {
None,
&format!("compactor for tenant {tenant_id}"),
false,
compaction_loop(tenant_id),
async move {
compaction_loop(tenant_id).await;
Ok(())
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -27,43 +32,50 @@ pub fn start_background_loops(tenant_id: ZTenantId) {
None,
&format!("garbage collector for tenant {tenant_id}"),
false,
gc_loop(tenant_id),
async move {
gc_loop(tenant_id).await;
Ok(())
},
);
}
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant_id: ZTenantId) -> anyhow::Result<()> {
async fn compaction_loop(tenant_id: ZTenantId) {
let wait_duration = Duration::from_secs(2);
info!("starting compaction loop for {tenant_id}");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
let result = async {
async {
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("received compaction cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(tenant) => tenant,
},
};
// Run blocking part of the task
// Break if tenant is not active
if tenant_mgr::get_tenant_state(tenant_id) != Some(TenantState::Active) {
break Ok(());
}
// This should not fail. If someone started us, it means that the tenant exists.
// And before you remove a tenant, you have to wait until all the associated tasks
// exit.
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
// Run compaction
let mut sleep_duration = repo.get_compaction_period();
if let Err(e) = repo.compaction_iteration() {
error!("Compaction failed, retrying: {}", e);
sleep_duration = Duration::from_secs(2)
let mut sleep_duration = tenant.get_compaction_period();
if let Err(e) = tenant.compaction_iteration() {
error!("Compaction failed, retrying: {e:#}");
sleep_duration = wait_duration;
}
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
trace!("received cancellation request");
break Ok(());
info!("received compaction cancellation request during idling");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
@@ -72,49 +84,49 @@ async fn compaction_loop(tenant_id: ZTenantId) -> anyhow::Result<()> {
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
info!(
"compaction loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenant_id)
);
result
trace!("compaction loop stopped.");
}
///
/// GC task's main loop
///
async fn gc_loop(tenant_id: ZTenantId) -> anyhow::Result<()> {
async fn gc_loop(tenant_id: ZTenantId) {
let wait_duration = Duration::from_secs(2);
info!("starting gc loop for {tenant_id}");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
let result = async {
async {
loop {
trace!("waking up");
// Break if tenant is not active
if tenant_mgr::get_tenant_state(tenant_id) != Some(TenantState::Active) {
break Ok(());
}
// This should not fail. If someone started us, it means that the tenant exists.
// And before you remove a tenant, you have to wait until all the associated tasks
// exit.
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("received GC cancellation request");
return;
},
tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(tenant) => tenant,
},
};
// Run gc
let gc_period = repo.get_gc_period();
let gc_horizon = repo.get_gc_horizon();
let gc_period = tenant.get_gc_period();
let gc_horizon = tenant.get_gc_horizon();
let mut sleep_duration = gc_period;
if gc_horizon > 0 {
if let Err(e) = repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)
if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
{
error!("Gc failed, retrying: {}", e);
sleep_duration = Duration::from_secs(2)
error!("Gc failed, retrying: {e:#}");
sleep_duration = wait_duration;
}
}
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
trace!("received cancellation request");
break Ok(());
info!("received GC cancellation request during idling");
break;
},
_ = tokio::time::sleep(sleep_duration) => {},
}
@@ -122,9 +134,50 @@ async fn gc_loop(tenant_id: ZTenantId) -> anyhow::Result<()> {
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
info!(
"GC loop stopped. State is {:?}",
tenant_mgr::get_tenant_state(tenant_id)
);
result
trace!("GC loop stopped.");
}
async fn wait_for_active_tenant(
tenant_id: ZTenantId,
wait: Duration,
) -> ControlFlow<(), Arc<Tenant>> {
let tenant = loop {
match tenant_mgr::get_tenant(tenant_id, false) {
Ok(tenant) => break tenant,
Err(e) => {
error!("Failed to get a tenant {tenant_id}: {e:#}");
tokio::time::sleep(wait).await;
}
}
};
// If the tenant is already in a suitable state, there is no need to wait for anything
if tenant.should_run_tasks() {
ControlFlow::Continue(tenant)
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
loop {
match tenant_state_updates.changed().await {
Ok(()) => {
let new_state = *tenant_state_updates.borrow();
match new_state {
TenantState::Active {
background_jobs_running: true,
} => {
debug!("Tenant state changed to active with background jobs enabled, continuing the task loop");
return ControlFlow::Continue(tenant);
}
state => {
debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}");
tokio::time::sleep(wait).await;
}
}
}
Err(_sender_dropped_error) => {
info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
return ControlFlow::Break(());
}
}
}
}
}
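
As a rough, self-contained sketch of the idea behind `wait_for_active_tenant` above (waiting on a state-updates channel until background jobs are allowed), here is a version using a plain tokio watch channel with a simplified state type; the channel wiring and names are assumptions, not the real Tenant API. Assumes `tokio = { version = "1", features = ["full"] }`.

use std::time::Duration;
use tokio::sync::watch;

// Simplified stand-in for the tenant state; only what the waiter cares about.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    Active { background_jobs_running: bool },
    Paused,
}

// Wait until background jobs are allowed; None means the sender was dropped
// (the tenant went away), mirroring the ControlFlow::Break case above.
async fn wait_for_background_jobs(mut rx: watch::Receiver<State>) -> Option<()> {
    loop {
        if matches!(*rx.borrow(), State::Active { background_jobs_running: true }) {
            return Some(());
        }
        // `changed()` errors out when the sender is dropped.
        rx.changed().await.ok()?;
    }
}

#[tokio::main]
async fn main() {
    let (state_tx, state_rx) = watch::channel(State::Paused);
    let waiter = tokio::spawn(wait_for_background_jobs(state_rx));
    tokio::time::sleep(Duration::from_millis(10)).await;
    state_tx
        .send(State::Active { background_jobs_running: true })
        .unwrap();
    assert!(waiter.await.unwrap().is_some());
}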

View File

@@ -2,34 +2,28 @@
//! Timeline management code
//
use anyhow::{bail, Context, Result};
use remote_storage::path_with_suffix_extension;
use std::{
fs,
path::Path,
process::{Command, Stdio},
sync::Arc,
};
use anyhow::{bail, Context, Result};
use tracing::*;
use remote_storage::path_with_suffix_extension;
use utils::{
lsn::Lsn,
zid::{ZTenantId, ZTimelineId},
};
use crate::config::PageServerConf;
use crate::layered_repository::{Repository, Timeline};
use crate::tenant::{Tenant, Timeline};
use crate::tenant_mgr;
use crate::CheckpointConfig;
use crate::{import_datadir, TEMP_FILE_SUFFIX};
#[derive(Debug, Clone, Copy)]
pub struct PointInTime {
pub timeline_id: ZTimelineId,
pub lsn: Lsn,
}
// Create the cluster temporarily in 'initdbpath' directory inside the repository
// to get bootstrap data for timeline initialization.
//
@@ -69,7 +63,7 @@ fn bootstrap_timeline(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
repo: &Repository,
tenant: &Tenant,
) -> Result<Arc<Timeline>> {
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
@@ -89,7 +83,7 @@ fn bootstrap_timeline(
// LSN, and any WAL after that.
// Initdb lsn will be equal to last_record_lsn which will be set after import.
// Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline.
let timeline = repo.create_empty_timeline(timeline_id, lsn)?;
let timeline = tenant.create_empty_timeline(timeline_id, lsn)?;
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -127,16 +121,16 @@ pub(crate) async fn create_timeline(
mut ancestor_start_lsn: Option<Lsn>,
) -> Result<Option<Arc<Timeline>>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate);
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
if conf.timeline_path(&new_timeline_id, &tenant_id).exists() {
debug!("timeline {} already exists", new_timeline_id);
debug!("timeline {new_timeline_id} already exists");
return Ok(None);
}
let loaded_timeline = match ancestor_timeline_id {
Some(ancestor_timeline_id) => {
let ancestor_timeline = repo
let ancestor_timeline = tenant
.get_timeline(ancestor_timeline_id)
.context("Cannot branch off the timeline that's not present in pageserver")?;
@@ -162,10 +156,13 @@ pub(crate) async fn create_timeline(
}
}
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
tenant.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
}
None => bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?,
None => bootstrap_timeline(conf, tenant_id, new_timeline_id, &tenant)?,
};
// A new timeline has been added to the tenant, so its background tasks are now needed.
tenant.activate(true);
Ok(Some(loaded_timeline))
}
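
The bootstrap path near the top of this file runs initdb into a temporary directory to produce the initial data for a new timeline. Below is a standalone sketch of just that step; the initdb binary path, target directory, and flags are chosen purely for illustration and are not taken from the pageserver code.

use std::path::Path;
use std::process::{Command, Stdio};

// Run initdb to create a throwaway cluster whose files will seed a timeline.
fn run_initdb(initdb_bin: &Path, pgdata: &Path) -> anyhow::Result<()> {
    let status = Command::new(initdb_bin)
        .arg("-D")
        .arg(pgdata)
        .arg("--no-sync") // the sketch skips fsync; the real code handles durability itself
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status()?;
    anyhow::ensure!(status.success(), "initdb failed with status {status}");
    Ok(())
}

fn main() -> anyhow::Result<()> {
    // Hypothetical paths for illustration only.
    run_initdb(
        Path::new("/usr/lib/postgresql/14/bin/initdb"),
        Path::new("/tmp/neon-initdb-sketch"),
    )
}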

View File

@@ -30,9 +30,9 @@ use anyhow::Result;
use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::layered_repository::Timeline;
use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walrecord::*;
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::pg_constants;
@@ -1022,16 +1022,13 @@ impl<'a> WalIngest<'a> {
}
}
///
/// Tests that should work the same with any Repository/Timeline implementation.
///
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::repo_harness::*;
use crate::layered_repository::Timeline;
use crate::pgdatadir_mapping::create_test_timeline;
use crate::tenant::harness::*;
use crate::tenant::Timeline;
use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
use postgres_ffi::RELSEG_SIZE;
@@ -1061,8 +1058,8 @@ mod tests {
#[test]
fn test_relsize() -> Result<()> {
let repo = RepoHarness::create("test_relsize")?.load();
let tline = create_test_timeline(&repo, TIMELINE_ID)?;
let tenant = TenantHarness::create("test_relsize")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1189,8 +1186,8 @@ mod tests {
// and then created it again within the same layer.
#[test]
fn test_drop_extend() -> Result<()> {
let repo = RepoHarness::create("test_drop_extend")?.load();
let tline = create_test_timeline(&repo, TIMELINE_ID)?;
let tenant = TenantHarness::create("test_drop_extend")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1229,8 +1226,8 @@ mod tests {
// and then extended it again within the same layer.
#[test]
fn test_truncate_extend() -> Result<()> {
let repo = RepoHarness::create("test_truncate_extend")?.load();
let tline = create_test_timeline(&repo, TIMELINE_ID)?;
let tenant = TenantHarness::create("test_truncate_extend")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&*tline)?;
// Create a 20 MB relation (the size is arbitrary)
@@ -1317,8 +1314,8 @@ mod tests {
/// split into multiple 1 GB segments in Postgres.
#[test]
fn test_large_rel() -> Result<()> {
let repo = RepoHarness::create("test_large_rel")?.load();
let tline = create_test_timeline(&repo, TIMELINE_ID)?;
let tenant = TenantHarness::create("test_large_rel")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut lsn = 0x10;

View File

@@ -16,10 +16,10 @@ use std::{
time::Duration,
};
use crate::layered_repository::Timeline;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::tenant::Timeline;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{
@@ -767,11 +767,11 @@ fn wal_stream_connection_string(
#[cfg(test)]
mod tests {
use super::*;
use crate::layered_repository::repo_harness::{RepoHarness, TIMELINE_ID};
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
#[test]
fn no_connection_no_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("no_connection_no_candidate")?;
let harness = TenantHarness::create("no_connection_no_candidate")?;
let mut state = dummy_state(&harness);
let now = Utc::now().naive_utc();
@@ -857,7 +857,7 @@ mod tests {
#[tokio::test]
async fn connection_no_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("connection_no_candidate")?;
let harness = TenantHarness::create("connection_no_candidate")?;
let mut state = dummy_state(&harness);
let now = Utc::now().naive_utc();
@@ -948,7 +948,7 @@ mod tests {
#[test]
fn no_connection_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("no_connection_candidate")?;
let harness = TenantHarness::create("no_connection_candidate")?;
let mut state = dummy_state(&harness);
let now = Utc::now().naive_utc();
@@ -1053,7 +1053,7 @@ mod tests {
#[tokio::test]
async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
let harness = RepoHarness::create("candidate_with_many_connection_failures")?;
let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
let mut state = dummy_state(&harness);
let now = Utc::now().naive_utc();
@@ -1117,7 +1117,7 @@ mod tests {
#[tokio::test]
async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?;
let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
let mut state = dummy_state(&harness);
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1204,7 +1204,7 @@ mod tests {
#[tokio::test]
async fn timeout_connection_threshhold_current_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("timeout_connection_threshhold_current_candidate")?;
let harness = TenantHarness::create("timeout_connection_threshhold_current_candidate")?;
let mut state = dummy_state(&harness);
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1276,7 +1276,7 @@ mod tests {
#[tokio::test]
async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> {
let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?;
let harness = TenantHarness::create("timeout_wal_over_threshhold_current_candidate")?;
let mut state = dummy_state(&harness);
let current_lsn = Lsn(100_000).align();
let new_lsn = Lsn(100_100).align();
@@ -1353,7 +1353,7 @@ mod tests {
const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr";
fn dummy_state(harness: &RepoHarness) -> WalreceiverState {
fn dummy_state(harness: &TenantHarness) -> WalreceiverState {
WalreceiverState {
id: ZTenantTimelineId {
tenant_id: harness.tenant_id,

View File

@@ -21,10 +21,10 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use super::TaskEvent;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::{
layered_repository::{Timeline, WalReceiverInfo},
task_mgr,
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
tenant::{Timeline, WalReceiverInfo},
tenant_mgr,
walingest::WalIngest,
walrecord::DecodedWALRecord,
@@ -141,8 +141,7 @@ pub async fn handle_walreceiver_connection(
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
.with_context(|| format!("no repository found for tenant {tenant_id}"))?;
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
//
// Start streaming the WAL, from where we left off previously.
@@ -283,7 +282,7 @@ pub async fn handle_walreceiver_connection(
})?;
if let Some(last_lsn) = status_update {
let remote_index = repo.get_remote_index();
let remote_index = tenant.get_remote_index();
let timeline_remote_consistent_lsn = remote_index
.read()
.await

View File

@@ -71,7 +71,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# First timeline would not get loaded into pageserver due to corrupt metadata file
with pytest.raises(
Exception, match=f"Could not get timeline {timeline1} in tenant {tenant1}"
Exception, match=f"Timeline {timeline1} was not found for tenant {tenant1}"
) as err:
pg1.start()
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")
@@ -80,7 +80,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# We don't have the remote storage enabled, which means timeline is in an incorrect state,
# it's not loaded at all
with pytest.raises(
Exception, match=f"Could not get timeline {timeline2} in tenant {tenant2}"
Exception, match=f"Timeline {timeline2} was not found for tenant {tenant2}"
) as err:
pg2.start()
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")

View File

@@ -40,11 +40,16 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
for t in timelines:
client.timeline_delete(tenant, t)
def assert_active_without_jobs(tenant):
assert get_state(tenant) == {"Active": {"background_jobs_running": False}}
# Create tenant, start compute
tenant, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline(name, tenant_id=tenant)
pg = env.postgres.create_start(name, tenant_id=tenant)
assert get_state(tenant) == "Active"
assert get_state(tenant) == {
"Active": {"background_jobs_running": True}
}, "Pageserver should activate a tenant and start background jobs if timelines are loaded"
# Stop compute
pg.stop()
@@ -53,6 +58,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
for tenant_info in client.tenant_list():
tenant_id = ZTenantId(tenant_info["id"])
delete_all_timelines(tenant_id)
wait_until(10, 0.2, lambda: assert_active_without_jobs(tenant_id))
# Assert that all tasks finish quickly after tenant is detached
assert get_metric_value('pageserver_tenant_task_events{event="start"}') > 0
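
To connect the JSON shape asserted above with the Rust side, here is a minimal sketch of a serde-serialized tenant state enum; the exact variant set is partly an assumption (only Active, Paused, and Broken appear in this diff), but serde's default externally tagged representation yields exactly the nested object the test compares against. Assumes the `serde` (with derive), `serde_json`, and `anyhow` crates.

use serde::{Deserialize, Serialize};

// Paused and Broken are not constructed in this tiny example.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
enum TenantState {
    Active { background_jobs_running: bool },
    Paused,
    Broken,
}

fn main() -> anyhow::Result<()> {
    let state = TenantState::Active { background_jobs_running: false };
    let json = serde_json::to_string(&state)?;
    // Matches the dict the Python test compares against.
    assert_eq!(json, r#"{"Active":{"background_jobs_running":false}}"#);
    println!("{json}");
    Ok(())
}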

View File

@@ -18,7 +18,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
invalid_tenant_id = ZTenantId.generate()
with pytest.raises(
NeonPageserverApiException,
match=f"Tenant {invalid_tenant_id} not found in local tenant state",
match=f"Tenant {invalid_tenant_id} not found in the local state",
):
ps_http.timeline_delete(tenant_id=invalid_tenant_id, timeline_id=invalid_timeline_id)
@@ -64,7 +64,8 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
# check 404
with pytest.raises(
NeonPageserverApiException, match="is not found neither locally nor remotely"
NeonPageserverApiException,
match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} is not found neither locally nor remotely",
):
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)