Merge branch 'main' into yuchen/direct-io-for-read

This commit is contained in:
Yuchen Liang
2024-10-18 14:08:06 -04:00
committed by GitHub
276 changed files with 6269 additions and 4082 deletions

View File

@@ -16,7 +16,7 @@ use fail::fail_point;
use pageserver_api::key::Key;
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::SystemTime;
use std::time::{Instant, SystemTime};
use tokio::io;
use tokio::io::AsyncWrite;
use tracing::*;
@@ -352,12 +352,25 @@ where
}
}
for (path, content) in self
let start_time = Instant::now();
let aux_files = self
.timeline
.list_aux_files(self.lsn, self.ctx)
.await
.map_err(|e| BasebackupError::Server(e.into()))?
{
.map_err(|e| BasebackupError::Server(e.into()))?;
let aux_scan_time = start_time.elapsed();
let aux_estimated_size = aux_files
.values()
.map(|content| content.len())
.sum::<usize>();
info!(
"Scanned {} aux files in {}ms, aux file content size = {}",
aux_files.len(),
aux_scan_time.as_millis(),
aux_estimated_size
);
for (path, content) in aux_files {
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(

View File

@@ -164,6 +164,9 @@ pub struct PageServerConf {
pub image_compression: ImageCompressionAlgorithm,
/// Whether to offload archived timelines automatically
pub timeline_offloading: bool,
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
/// of ephemeral data.
@@ -321,6 +324,7 @@ impl PageServerConf {
ingest_batch_size,
max_vectored_read_bytes,
image_compression,
timeline_offloading,
ephemeral_bytes_per_memory_kb,
l0_flush,
virtual_file_io_mode,
@@ -364,6 +368,7 @@ impl PageServerConf {
ingest_batch_size,
max_vectored_read_bytes,
image_compression,
timeline_offloading,
ephemeral_bytes_per_memory_kb,
// ------------------------------------------------------------

View File

@@ -198,7 +198,7 @@ fn serialize_in_chunks<'a>(
}
}
impl<'a> ExactSizeIterator for Iter<'a> {}
impl ExactSizeIterator for Iter<'_> {}
let buffer = bytes::BytesMut::new();
let inner = input.chunks(chunk_size);

View File

@@ -654,7 +654,7 @@ impl std::fmt::Debug for EvictionCandidate {
let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
struct DisplayIsDebug<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
impl<T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
@@ -1218,16 +1218,7 @@ mod filesystem_level_usage {
let stat = Statvfs::get(tenants_dir, mock_config)
.context("statvfs failed, presumably directory got unlinked")?;
// https://unix.stackexchange.com/a/703650
let blocksize = if stat.fragment_size() > 0 {
stat.fragment_size()
} else {
stat.block_size()
};
// use blocks_available (b_avail) since, pageserver runs as unprivileged user
let avail_bytes = stat.blocks_available() * blocksize;
let total_bytes = stat.blocks() * blocksize;
let (avail_bytes, total_bytes) = stat.get_avail_total_bytes();
Ok(Usage {
config,

View File

@@ -18,7 +18,6 @@ use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::IngestAuxFilesRequest;
use pageserver_api::models::ListAuxFilesRequest;
@@ -77,6 +76,7 @@ use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::storage_layer::LayerName;
use crate::tenant::timeline::offload::offload_timeline;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
@@ -325,6 +325,7 @@ impl From<crate::tenant::TimelineArchivalError> for ApiError {
match value {
NotFound => ApiError::NotFound(anyhow::anyhow!("timeline not found").into()),
Timeout => ApiError::Timeout("hit pageserver internal timeout".into()),
Cancelled => ApiError::ShuttingDown,
e @ HasArchivedParent(_) => {
ApiError::PreconditionFailed(e.to_string().into_boxed_str())
}
@@ -472,8 +473,6 @@ async fn build_timeline_info_common(
is_archived: Some(is_archived),
walreceiver_status,
last_aux_file_policy: timeline.last_aux_file_policy.load(),
};
Ok(info)
}
@@ -715,8 +714,15 @@ async fn timeline_archival_config_handler(
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
tenant
.apply_timeline_archival_config(timeline_id, request_data.state, ctx)
.apply_timeline_archival_config(
timeline_id,
request_data.state,
state.broker_client.clone(),
ctx,
)
.await?;
Ok::<_, ApiError>(())
}
@@ -1783,6 +1789,49 @@ async fn timeline_compact_handler(
.await
}
// Run offload immediately on given timeline.
async fn timeline_offload_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
if tenant.get_offloaded_timeline(timeline_id).is_ok() {
return json_response(StatusCode::OK, ());
}
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
if !tenant.timeline_has_no_attached_children(timeline_id) {
return Err(ApiError::PreconditionFailed(
"timeline has attached children".into(),
));
}
if !timeline.can_offload() {
return Err(ApiError::PreconditionFailed(
"Timeline::can_offload() returned false".into(),
));
}
offload_timeline(&tenant, &timeline)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
.instrument(info_span!("manual_timeline_offload", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}
// Run checkpoint immediately on given timeline.
async fn timeline_checkpoint_handler(
request: Request<Body>,
@@ -2202,7 +2251,7 @@ async fn tenant_scan_remote_handler(
%timeline_id))
.await
{
Ok((index_part, index_generation)) => {
Ok((index_part, index_generation, _index_mtime)) => {
tracing::info!("Found timeline {tenant_shard_id}/{timeline_id} metadata (gen {index_generation:?}, {} layers, {} consistent LSN)",
index_part.layer_metadata.len(), index_part.metadata.disk_consistent_lsn());
generation = std::cmp::max(generation, index_generation);
@@ -2347,31 +2396,6 @@ async fn post_tracing_event_handler(
json_response(StatusCode::OK, ())
}
async fn force_aux_policy_switch_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&r, "timeline_id")?;
let policy: AuxFilePolicy = json_request(&mut r).await?;
let state = get_state(&r);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
timeline
.do_switch_aux_policy(policy)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn put_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
@@ -3006,6 +3030,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
|r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
|r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler),
@@ -3080,10 +3108,6 @@ pub fn make_router(
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/force_aux_policy_switch",
|r| api_handler(r, force_aux_policy_switch_handler),
)
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",

View File

@@ -1189,7 +1189,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
op: SmgrQueryType,
}
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
let ex_throttled = self
@@ -1560,7 +1560,7 @@ impl BasebackupQueryTime {
}
}
impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
impl BasebackupQueryTimeOngoingRecording<'_, '_> {
pub(crate) fn observe<T>(self, res: &Result<T, QueryError>) {
let elapsed = self.start.elapsed();
let ex_throttled = self

View File

@@ -26,8 +26,8 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, Instant};
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -1137,10 +1137,10 @@ impl PageServerHandler {
.await
.map_err(map_basebackup_error)?;
} else {
let mut writer = pgb.copyout_writer();
let mut writer = BufWriter::new(pgb.copyout_writer());
if gzip {
let mut encoder = GzipEncoder::with_quality(
writer,
&mut writer,
// NOTE using fast compression because it's on the critical path
// for compute startup. For an empty database, we get
// <100KB with this method. The Level::Best compression method
@@ -1175,6 +1175,10 @@ impl PageServerHandler {
.await
.map_err(map_basebackup_error)?;
}
writer
.flush()
.await
.map_err(|e| map_basebackup_error(BasebackupError::Client(e)))?;
}
pgb.write_message_noflush(&BeMessage::CopyDone)

View File

@@ -22,7 +22,6 @@ use pageserver_api::key::{
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
@@ -33,7 +32,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, trace, warn};
use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -677,21 +676,6 @@ impl Timeline {
self.get(CHECKPOINT_KEY, lsn, ctx).await
}
async fn list_aux_files_v1(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
match self.get(AUX_FILES_KEY, lsn, ctx).await {
Ok(buf) => Ok(AuxFilesDirectory::des(&buf)?.files),
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
Ok(HashMap::new())
}
}
}
async fn list_aux_files_v2(
&self,
lsn: Lsn,
@@ -722,10 +706,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
let current_policy = self.last_aux_file_policy.load();
if let Some(AuxFilePolicy::V2) | Some(AuxFilePolicy::CrossValidation) = current_policy {
self.list_aux_files_v2(lsn, ctx).await?;
}
self.list_aux_files_v2(lsn, ctx).await?;
Ok(())
}
@@ -734,51 +715,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashMap<String, Bytes>, PageReconstructError> {
let current_policy = self.last_aux_file_policy.load();
match current_policy {
Some(AuxFilePolicy::V1) => {
let res = self.list_aux_files_v1(lsn, ctx).await?;
let empty_str = if res.is_empty() { ", empty" } else { "" };
warn!(
"this timeline is using deprecated aux file policy V1 (policy=v1{empty_str})"
);
Ok(res)
}
None => {
let res = self.list_aux_files_v1(lsn, ctx).await?;
if !res.is_empty() {
warn!("this timeline is using deprecated aux file policy V1 (policy=None)");
}
Ok(res)
}
Some(AuxFilePolicy::V2) => self.list_aux_files_v2(lsn, ctx).await,
Some(AuxFilePolicy::CrossValidation) => {
let v1_result = self.list_aux_files_v1(lsn, ctx).await;
let v2_result = self.list_aux_files_v2(lsn, ctx).await;
match (v1_result, v2_result) {
(Ok(v1), Ok(v2)) => {
if v1 != v2 {
tracing::error!(
"unmatched aux file v1 v2 result:\nv1 {v1:?}\nv2 {v2:?}"
);
return Err(PageReconstructError::Other(anyhow::anyhow!(
"unmatched aux file v1 v2 result"
)));
}
Ok(v1)
}
(Ok(_), Err(v2)) => {
tracing::error!("aux file v1 returns Ok while aux file v2 returns an err");
Err(v2)
}
(Err(v1), Ok(_)) => {
tracing::error!("aux file v2 returns Ok while aux file v1 returns an err");
Err(v1)
}
(Err(_), Err(v2)) => Err(v2),
}
}
}
self.list_aux_files_v2(lsn, ctx).await
}
pub(crate) async fn get_replorigins(
@@ -954,9 +891,6 @@ impl Timeline {
result.add_key(CONTROLFILE_KEY);
result.add_key(CHECKPOINT_KEY);
if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
result.add_key(AUX_FILES_KEY);
}
// Add extra keyspaces in the test cases. Some test cases write keys into the storage without
// creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
@@ -1166,9 +1100,6 @@ impl<'a> DatadirModification<'a> {
self.pending_directory_entries.push((DirectoryKind::Db, 0));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory
self.init_aux_dir()?;
let buf = if self.tline.pg_version >= 17 {
TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
xids: HashSet::new(),
@@ -1347,9 +1278,6 @@ impl<'a> DatadirModification<'a> {
// 'true', now write the updated 'dbdirs' map back.
let buf = DbDirectory::ser(&dbdir)?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory as well
self.init_aux_dir()?;
}
if r.is_none() {
// Create RelDirectory
@@ -1545,9 +1473,6 @@ impl<'a> DatadirModification<'a> {
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
// Update logical database size.
self.pending_nblocks -= old_size as i64 - nblocks as i64;
}
@@ -1729,200 +1654,60 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
return Ok(());
}
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: HashMap::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, 0));
self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
Ok(())
}
pub async fn put_file(
&mut self,
path: &str,
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let switch_policy = self.tline.get_switch_aux_file_policy();
let policy = {
let current_policy = self.tline.last_aux_file_policy.load();
// Allowed switch path:
// * no aux files -> v1/v2/cross-validation
// * cross-validation->v2
let current_policy = if current_policy.is_none() {
// This path will only be hit once per tenant: we will decide the final policy in this code block.
// The next call to `put_file` will always have `last_aux_file_policy != None`.
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
let aux_files_key_v1 = self.tline.list_aux_files_v1(lsn, ctx).await?;
if aux_files_key_v1.is_empty() {
None
} else {
warn!("this timeline is using deprecated aux file policy V1 (detected existing v1 files)");
self.tline.do_switch_aux_policy(AuxFilePolicy::V1)?;
Some(AuxFilePolicy::V1)
}
} else {
current_policy
};
if AuxFilePolicy::is_valid_migration_path(current_policy, switch_policy) {
self.tline.do_switch_aux_policy(switch_policy)?;
info!(current=?current_policy, next=?switch_policy, "switching aux file policy");
switch_policy
} else {
// This branch handles non-valid migration path, and the case that switch_policy == current_policy.
// And actually, because the migration path always allow unspecified -> *, this unwrap_or will never be hit.
current_policy.unwrap_or(AuxFilePolicy::default_tenant_config())
}
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
Ok(val) => Some(val),
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
if let AuxFilePolicy::V2 | AuxFilePolicy::CrossValidation = policy {
let key = aux_file::encode_aux_file_key(path);
// retrieve the key from the engine
let old_val = match self.get(key, ctx).await {
Ok(val) => Some(val),
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
} else {
Vec::new()
};
let mut other_files = Vec::with_capacity(files.len());
let mut modifying_file = None;
for file @ (p, content) in files {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
);
modifying_file = Some(content);
} else {
Vec::new()
};
let mut other_files = Vec::with_capacity(files.len());
let mut modifying_file = None;
for file @ (p, content) in files {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
);
modifying_file = Some(content);
} else {
other_files.push(file);
}
other_files.push(file);
}
let mut new_files = other_files;
match (modifying_file, content.is_empty()) {
(Some(old_content), false) => {
self.tline
.aux_file_size_estimator
.on_update(old_content.len(), content.len());
new_files.push((path, content));
}
(Some(old_content), true) => {
self.tline
.aux_file_size_estimator
.on_remove(old_content.len());
// not adding the file key to the final `new_files` vec.
}
(None, false) => {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
}
if let AuxFilePolicy::V1 | AuxFilePolicy::CrossValidation = policy {
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let n_files;
let mut aux_files = self.tline.aux_files.lock().await;
if let Some(mut dir) = aux_files.dir.take() {
// We already updated aux files in `self`: emit a delta and update our latest value.
dir.upsert(file_path.clone(), content.clone());
n_files = dir.files.len();
if aux_files.n_deltas == MAX_AUX_FILE_DELTAS {
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
aux_files.n_deltas = 0;
} else {
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile { file_path, content }),
);
aux_files.n_deltas += 1;
}
aux_files.dir = Some(dir);
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
n_files = dir.files.len();
aux_files.dir = Some(dir);
}
Err(
e @ (PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// Note: we added missing key error variant in https://github.com/neondatabase/neon/pull/7393 but
// the original code assumes all other errors are missing keys. Therefore, we keep the code path
// the same for now, though in theory, we should only match the `MissingKey` variant.
Err(
e @ (PageReconstructError::Other(_)
| PageReconstructError::WalRedo(_)
| PageReconstructError::MissingKey(_)),
) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
if !matches!(e, PageReconstructError::MissingKey(_)) {
let e = utils::error::report_compact_sources(&e);
tracing::warn!("treating error as if it was a missing key: {}", e);
}
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
n_files = 1;
aux_files.dir = Some(dir);
}
}
let mut new_files = other_files;
match (modifying_file, content.is_empty()) {
(Some(old_content), false) => {
self.tline
.aux_file_size_estimator
.on_update(old_content.len(), content.len());
new_files.push((path, content));
}
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, n_files));
(Some(old_content), true) => {
self.tline
.aux_file_size_estimator
.on_remove(old_content.len());
// not adding the file key to the final `new_files` vec.
}
(None, false) => {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
Ok(())
}
@@ -2092,12 +1877,6 @@ impl<'a> DatadirModification<'a> {
self.tline.get(key, lsn, ctx).await
}
/// Only used during unit tests, force putting a key into the modification.
#[cfg(test)]
pub(crate) fn put_for_test(&mut self, key: Key, val: Value) {
self.put(key, val);
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)
@@ -2215,21 +1994,6 @@ struct RelDirectory {
rels: HashSet<(Oid, u8)>,
}
#[derive(Debug, Serialize, Deserialize, Default, PartialEq)]
pub(crate) struct AuxFilesDirectory {
pub(crate) files: HashMap<String, Bytes>,
}
impl AuxFilesDirectory {
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
if let Some(value) = value {
self.files.insert(key, value);
} else {
self.files.remove(&key);
}
}
}
#[derive(Debug, Serialize, Deserialize)]
struct RelSizeEntry {
nblocks: u32,

View File

@@ -53,6 +53,22 @@ impl Statvfs {
Statvfs::Mock(stat) => stat.block_size,
}
}
/// Get the available and total bytes on the filesystem.
pub fn get_avail_total_bytes(&self) -> (u64, u64) {
// https://unix.stackexchange.com/a/703650
let blocksize = if self.fragment_size() > 0 {
self.fragment_size()
} else {
self.block_size()
};
// use blocks_available (b_avail) since, pageserver runs as unprivileged user
let avail_bytes = self.blocks_available() * blocksize;
let total_bytes = self.blocks() * blocksize;
(avail_bytes, total_bytes)
}
}
pub mod mock {
@@ -74,7 +90,7 @@ pub mod mock {
let used_bytes = walk_dir_disk_usage(tenants_dir, name_filter.as_deref()).unwrap();
// round it up to the nearest block multiple
let used_blocks = (used_bytes + (blocksize - 1)) / blocksize;
let used_blocks = used_bytes.div_ceil(*blocksize);
if used_blocks > *total_blocks {
panic!(

View File

@@ -20,7 +20,6 @@ use enumset::EnumSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::models::LsnLease;
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::models::TimelineState;
@@ -67,7 +66,7 @@ use self::metadata::TimelineMetadata;
use self::mgr::GetActiveTenantError;
use self::mgr::GetTenantError;
use self::remote_timeline_client::upload::upload_index_part;
use self::remote_timeline_client::RemoteTimelineClient;
use self::remote_timeline_client::{RemoteTimelineClient, WaitCompletionError};
use self::timeline::uninit::TimelineCreateGuard;
use self::timeline::uninit::TimelineExclusionError;
use self::timeline::uninit::UninitializedTimeline;
@@ -493,6 +492,8 @@ pub struct OffloadedTimeline {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
pub ancestor_timeline_id: Option<TimelineId>,
/// Whether to retain the branch lsn at the ancestor or not
pub ancestor_retain_lsn: Option<Lsn>,
// TODO: once we persist offloaded state, make this lazily constructed
pub remote_client: Arc<RemoteTimelineClient>,
@@ -504,10 +505,14 @@ pub struct OffloadedTimeline {
impl OffloadedTimeline {
fn from_timeline(timeline: &Timeline) -> Self {
let ancestor_retain_lsn = timeline
.get_ancestor_timeline_id()
.map(|_timeline_id| timeline.get_ancestor_lsn());
Self {
tenant_shard_id: timeline.tenant_shard_id,
timeline_id: timeline.timeline_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
ancestor_retain_lsn,
remote_client: timeline.remote_client.clone(),
delete_progress: timeline.delete_progress.clone(),
@@ -515,6 +520,12 @@ impl OffloadedTimeline {
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum MaybeOffloaded {
Yes,
No,
}
#[derive(Clone)]
pub enum TimelineOrOffloaded {
Timeline(Arc<Timeline>),
@@ -607,6 +618,9 @@ pub enum TimelineArchivalError {
#[error("Timeout")]
Timeout,
#[error("Cancelled")]
Cancelled,
#[error("ancestor is archived: {}", .0)]
HasArchivedParent(TimelineId),
@@ -617,7 +631,7 @@ pub enum TimelineArchivalError {
AlreadyInProgress,
#[error(transparent)]
Other(#[from] anyhow::Error),
Other(anyhow::Error),
}
impl Debug for TimelineArchivalError {
@@ -625,6 +639,7 @@ impl Debug for TimelineArchivalError {
match self {
Self::NotFound => write!(f, "NotFound"),
Self::Timeout => write!(f, "Timeout"),
Self::Cancelled => write!(f, "Cancelled"),
Self::HasArchivedParent(p) => f.debug_tuple("HasArchivedParent").field(p).finish(),
Self::HasUnarchivedChildren(c) => {
f.debug_tuple("HasUnarchivedChildren").field(c).finish()
@@ -784,7 +799,6 @@ impl Tenant {
index_part: Option<IndexPart>,
metadata: TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_shard_id;
@@ -795,10 +809,6 @@ impl Tenant {
ancestor.clone(),
resources,
CreateTimelineCause::Load,
// This could be derived from ancestor branch + index part. Though the only caller of `timeline_init_and_sync` is `load_remote_timeline`,
// there will potentially be other caller of this function in the future, and we don't know whether `index_part` or `ancestor` takes precedence.
// Therefore, we pass this field explicitly for now, and remove it once we fully migrate to aux file v2.
last_aux_file_policy,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
@@ -813,10 +823,6 @@ impl Tenant {
if let Some(index_part) = index_part.as_ref() {
timeline.remote_client.init_upload_queue(index_part)?;
timeline
.last_aux_file_policy
.store(index_part.last_aux_file_policy());
} else {
// No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload.
@@ -1387,15 +1393,12 @@ impl Tenant {
None
};
let last_aux_file_policy = index_part.last_aux_file_policy();
self.timeline_init_and_sync(
timeline_id,
resources,
Some(index_part),
remote_metadata,
ancestor,
last_aux_file_policy,
ctx,
)
.await
@@ -1538,8 +1541,10 @@ impl Tenant {
async fn unoffload_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
broker_client: storage_broker::BrokerClientChannel,
ctx: RequestContext,
) -> Result<Arc<Timeline>, TimelineArchivalError> {
info!("unoffloading timeline");
let cancel = self.cancel.clone();
let timeline_preload = self
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel)
@@ -1554,6 +1559,7 @@ impl Tenant {
error!(%timeline_id, "index_part not found on remote");
return Err(TimelineArchivalError::NotFound);
}
Err(DownloadError::Cancelled) => return Err(TimelineArchivalError::Cancelled),
Err(e) => {
// Some (possibly ephemeral) error happened during index_part download.
warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
@@ -1584,26 +1590,40 @@ impl Tenant {
"failed to load remote timeline {} for tenant {}",
timeline_id, self.tenant_shard_id
)
})?;
})
.map_err(TimelineArchivalError::Other)?;
let timelines = self.timelines.lock().unwrap();
if let Some(timeline) = timelines.get(&timeline_id) {
let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
if offloaded_timelines.remove(&timeline_id).is_none() {
warn!("timeline already removed from offloaded timelines");
}
Ok(Arc::clone(timeline))
} else {
let Some(timeline) = timelines.get(&timeline_id) else {
warn!("timeline not available directly after attach");
Err(TimelineArchivalError::Other(anyhow::anyhow!(
return Err(TimelineArchivalError::Other(anyhow::anyhow!(
"timeline not available directly after attach"
)))
)));
};
let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
if offloaded_timelines.remove(&timeline_id).is_none() {
warn!("timeline already removed from offloaded timelines");
}
// Activate the timeline (if it makes sense)
if !(timeline.is_broken() || timeline.is_stopping()) {
let background_jobs_can_start = None;
timeline.activate(
self.clone(),
broker_client.clone(),
background_jobs_can_start,
&ctx,
);
}
info!("timeline unoffloading complete");
Ok(Arc::clone(timeline))
}
pub(crate) async fn apply_timeline_archival_config(
self: &Arc<Self>,
timeline_id: TimelineId,
new_state: TimelineArchivalState,
broker_client: storage_broker::BrokerClientChannel,
ctx: RequestContext,
) -> Result<(), TimelineArchivalError> {
info!("setting timeline archival config");
@@ -1644,18 +1664,29 @@ impl Tenant {
Some(Arc::clone(timeline))
};
// Second part: unarchive timeline (if needed)
// Second part: unoffload timeline (if needed)
let timeline = if let Some(timeline) = timeline_or_unarchive_offloaded {
timeline
} else {
// Turn offloaded timeline into a non-offloaded one
self.unoffload_timeline(timeline_id, ctx).await?
self.unoffload_timeline(timeline_id, broker_client, ctx)
.await?
};
// Third part: upload new timeline archival state and block until it is present in S3
let upload_needed = timeline
let upload_needed = match timeline
.remote_client
.schedule_index_upload_for_timeline_archival_state(new_state)?;
.schedule_index_upload_for_timeline_archival_state(new_state)
{
Ok(upload_needed) => upload_needed,
Err(e) => {
if timeline.cancel.is_cancelled() {
return Err(TimelineArchivalError::Cancelled);
} else {
return Err(TimelineArchivalError::Other(e));
}
}
};
if upload_needed {
info!("Uploading new state");
@@ -1666,11 +1697,33 @@ impl Tenant {
tracing::warn!("reached timeout for waiting on upload queue");
return Err(TimelineArchivalError::Timeout);
};
v.map_err(|e| TimelineArchivalError::Other(anyhow::anyhow!(e)))?;
v.map_err(|e| match e {
WaitCompletionError::NotInitialized(e) => {
TimelineArchivalError::Other(anyhow::anyhow!(e))
}
WaitCompletionError::UploadQueueShutDownOrStopped => {
TimelineArchivalError::Cancelled
}
})?;
}
Ok(())
}
pub fn get_offloaded_timeline(
&self,
timeline_id: TimelineId,
) -> Result<Arc<OffloadedTimeline>, GetTimelineError> {
self.timelines_offloaded
.lock()
.unwrap()
.get(&timeline_id)
.map(Arc::clone)
.ok_or(GetTimelineError::NotFound {
tenant_id: self.tenant_shard_id,
timeline_id,
})
}
pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
self.tenant_shard_id
}
@@ -1758,7 +1811,6 @@ impl Tenant {
create_guard,
initdb_lsn,
None,
None,
)
.await
}
@@ -2121,7 +2173,8 @@ impl Tenant {
.iter()
.any(|(_id, tl)| tl.get_ancestor_timeline_id() == Some(*timeline_id))
};
let can_offload = can_offload && has_no_unoffloaded_children;
let can_offload =
can_offload && has_no_unoffloaded_children && self.conf.timeline_offloading;
if (is_active, can_offload) == (false, false) {
None
} else {
@@ -2206,6 +2259,13 @@ impl Tenant {
}
}
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
let timelines = self.timelines.lock().unwrap();
!timelines
.iter()
.any(|(_id, tl)| tl.get_ancestor_timeline_id() == Some(timeline_id))
}
pub fn current_state(&self) -> TenantState {
self.state.borrow().clone()
}
@@ -2253,12 +2313,13 @@ impl Tenant {
if activating {
let timelines_accessor = self.timelines.lock().unwrap();
let timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap();
let timelines_to_activate = timelines_accessor
.values()
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor);
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
@@ -2957,7 +3018,6 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
resources: TimelineResources,
cause: CreateTimelineCause,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
CreateTimelineCause::Load => {
@@ -2986,7 +3046,6 @@ impl Tenant {
resources,
pg_version,
state,
last_aux_file_policy,
self.attach_wal_lag_cooldown.clone(),
self.cancel.child_token(),
);
@@ -3294,10 +3353,11 @@ impl Tenant {
/// Populate all Timelines' `GcInfo` with information about their children. We do not set the
/// PITR cutoffs here, because that requires I/O: this is done later, before GC, by [`Self::refresh_gc_info_internal`]
///
/// Subsequently, parent-child relationships are updated incrementally during timeline creation/deletion.
/// Subsequently, parent-child relationships are updated incrementally inside [`Timeline::new`] and [`Timeline::drop`].
fn initialize_gc_info(
&self,
timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
timelines_offloaded: &std::sync::MutexGuard<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
) {
// This function must be called before activation: after activation timeline create/delete operations
// might happen, and this function is not safe to run concurrently with those.
@@ -3305,20 +3365,37 @@ impl Tenant {
// Scan all timelines. For each timeline, remember the timeline ID and
// the branch point where it was created.
let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId)>> = BTreeMap::new();
let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId, MaybeOffloaded)>> =
BTreeMap::new();
timelines.iter().for_each(|(timeline_id, timeline_entry)| {
if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
ancestor_children.push((timeline_entry.get_ancestor_lsn(), *timeline_id));
ancestor_children.push((
timeline_entry.get_ancestor_lsn(),
*timeline_id,
MaybeOffloaded::No,
));
}
});
timelines_offloaded
.iter()
.for_each(|(timeline_id, timeline_entry)| {
let Some(ancestor_timeline_id) = &timeline_entry.ancestor_timeline_id else {
return;
};
let Some(retain_lsn) = timeline_entry.ancestor_retain_lsn else {
return;
};
let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
ancestor_children.push((retain_lsn, *timeline_id, MaybeOffloaded::Yes));
});
// The number of bytes we always keep, irrespective of PITR: this is a constant across timelines
let horizon = self.get_gc_horizon();
// Populate each timeline's GcInfo with information about its child branches
for timeline in timelines.values() {
let mut branchpoints: Vec<(Lsn, TimelineId)> = all_branchpoints
let mut branchpoints: Vec<(Lsn, TimelineId, MaybeOffloaded)> = all_branchpoints
.remove(&timeline.timeline_id)
.unwrap_or_default();
@@ -3627,7 +3704,6 @@ impl Tenant {
timeline_create_guard,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
src_timeline.last_aux_file_policy.load(),
)
.await?;
@@ -3821,7 +3897,6 @@ impl Tenant {
timeline_create_guard,
pgdata_lsn,
None,
None,
)
.await?;
@@ -3893,7 +3968,6 @@ impl Tenant {
create_guard: TimelineCreateGuard<'a>,
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<UninitializedTimeline<'a>> {
let tenant_shard_id = self.tenant_shard_id;
@@ -3909,7 +3983,6 @@ impl Tenant {
ancestor,
resources,
CreateTimelineCause::Load,
last_aux_file_policy,
)
.context("Failed to create timeline data structure")?;
@@ -4507,7 +4580,6 @@ mod tests {
use super::*;
use crate::keyspace::KeySpaceAccum;
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::tenant::timeline::CompactFlags;
@@ -4516,7 +4588,7 @@ mod tests {
use bytes::{Bytes, BytesMut};
use hex_literal::hex;
use itertools::Itertools;
use pageserver_api::key::{AUX_FILES_KEY, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::key::{AUX_KEY_PREFIX, NON_INHERITED_RANGE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
use rand::{thread_rng, Rng};
@@ -4525,7 +4597,6 @@ mod tests {
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
use timeline::{DeltaLayerTestDesc, GcInfo};
use utils::bin_ser::BeSer;
use utils::id::TenantId;
static TEST_KEY: Lazy<Key> =
@@ -4878,7 +4949,10 @@ mod tests {
{
let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
assert_eq!(branchpoints.len(), 1);
assert_eq!(branchpoints[0], (Lsn(0x40), NEW_TIMELINE_ID));
assert_eq!(
branchpoints[0],
(Lsn(0x40), NEW_TIMELINE_ID, MaybeOffloaded::No)
);
}
// You can read the key from the child branch even though the parent is
@@ -6326,16 +6400,9 @@ mod tests {
}
#[tokio::test]
async fn test_branch_copies_dirty_aux_file_flag() {
let harness = TenantHarness::create("test_branch_copies_dirty_aux_file_flag")
.await
.unwrap();
async fn test_aux_file_e2e() {
let harness = TenantHarness::create("test_aux_file_e2e").await.unwrap();
// the default aux file policy to switch is v2 if not set by the admins
assert_eq!(
harness.tenant_conf.switch_aux_file_policy,
AuxFilePolicy::default_tenant_config()
);
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
@@ -6345,9 +6412,6 @@ mod tests {
.await
.unwrap();
// no aux file is written at this point, so the persistent flag should be unset
assert_eq!(tline.last_aux_file_policy.load(), None);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
@@ -6358,30 +6422,6 @@ mod tests {
modification.commit(&ctx).await.unwrap();
}
// there is no tenant manager to pass the configuration through, so lets mimic it
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V2,
"wanted state has been updated"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"aux file is written with switch_aux_file_policy unset (which is v2), so we should use v2 there"
);
// we can read everything from the storage
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
@@ -6399,12 +6439,6 @@ mod tests {
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"keep v2 storage format when new files are written"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
@@ -6416,321 +6450,9 @@ mod tests {
.await
.unwrap();
// child copies the last flag even if that is not on remote storage yet
assert_eq!(child.get_switch_aux_file_policy(), AuxFilePolicy::V2);
assert_eq!(child.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
let files = child.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);
assert_eq!(files.get("pg_logical/mappings/test2"), None);
// even if we crash here without flushing parent timeline with it's new
// last_aux_file_policy we are safe, because child was never meant to access ancestor's
// files. the ancestor can even switch back to V1 because of a migration safely.
}
#[tokio::test]
async fn aux_file_policy_switch() {
let mut harness = TenantHarness::create("aux_file_policy_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::CrossValidation; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
// there is no tenant manager to pass the configuration through, so lets mimic it
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V2,
"wanted state has been updated"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::CrossValidation),
"dirty index_part.json reflected state is yet to be updated"
);
// we can still read the auxfile v1 before we ingest anything new
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"second", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"ingesting a file should apply the wanted switch state when applicable"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first")),
"cross validation writes to both v1 and v2 so this should be available in v2"
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
);
// mimic again by trying to flip it from V2 to V1 (not switched to while ingesting a file)
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V1),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"third", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.get_switch_aux_file_policy(),
AuxFilePolicy::V1,
"wanted state has been updated again, even if invalid request"
);
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"ingesting a file should apply the wanted switch state when applicable"
);
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"third"))
);
// mimic again by trying to flip it from from V1 to V2 (not switched to while ingesting a file)
tenant.set_new_location_config(
AttachedTenantConf::try_from(LocationConf::attached_single(
TenantConfOpt {
switch_aux_file_policy: Some(AuxFilePolicy::V2),
..Default::default()
},
tenant.generation,
&pageserver_api::models::ShardParameters::default(),
))
.unwrap(),
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test3", b"last", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(tline.get_switch_aux_file_policy(), AuxFilePolicy::V2);
assert_eq!(tline.last_aux_file_policy.load(), Some(AuxFilePolicy::V2));
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"third"))
);
assert_eq!(
files.get("pg_logical/mappings/test3"),
Some(&bytes::Bytes::from_static(b"last"))
);
}
#[tokio::test]
async fn aux_file_policy_force_switch() {
let mut harness = TenantHarness::create("aux_file_policy_force_switch")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V1;
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
tline.do_switch_aux_policy(AuxFilePolicy::V2).unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V2),
"dirty index_part.json reflected state is yet to be updated"
);
// lose all data from v1
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(files.get("pg_logical/mappings/test1"), None);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test2", b"second", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
// read data ingested in v2
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test2"),
Some(&bytes::Bytes::from_static(b"second"))
);
// lose all data from v1
assert_eq!(files.get("pg_logical/mappings/test1"), None);
}
#[tokio::test]
async fn aux_file_policy_auto_detect() {
let mut harness = TenantHarness::create("aux_file_policy_auto_detect")
.await
.unwrap();
harness.tenant_conf.switch_aux_file_policy = AuxFilePolicy::V2; // set to cross-validation mode
let (tenant, ctx) = harness.load().await;
let mut lsn = Lsn(0x08);
let tline: Arc<Timeline> = tenant
.create_test_timeline(TIMELINE_ID, lsn, DEFAULT_PG_VERSION, &ctx)
.await
.unwrap();
assert_eq!(
tline.last_aux_file_policy.load(),
None,
"no aux file is written so it should be unset"
);
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: vec![(
"test_file".to_string(),
Bytes::copy_from_slice(b"test_file"),
)]
.into_iter()
.collect(),
})
.unwrap();
modification.put_for_test(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
modification.commit(&ctx).await.unwrap();
}
{
lsn += 8;
let mut modification = tline.begin_modification(lsn);
modification
.put_file("pg_logical/mappings/test1", b"first", &ctx)
.await
.unwrap();
modification.commit(&ctx).await.unwrap();
}
assert_eq!(
tline.last_aux_file_policy.load(),
Some(AuxFilePolicy::V1),
"keep using v1 because there are aux files writting with v1"
);
// we can still read the auxfile v1
let files = tline.list_aux_files(lsn, &ctx).await.unwrap();
assert_eq!(
files.get("pg_logical/mappings/test1"),
Some(&bytes::Bytes::from_static(b"first"))
);
assert_eq!(
files.get("test_file"),
Some(&bytes::Bytes::from_static(b"test_file"))
);
}
#[tokio::test]
@@ -8261,8 +7983,8 @@ mod tests {
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![
(Lsn(0x10), tline.timeline_id),
(Lsn(0x20), tline.timeline_id),
(Lsn(0x10), tline.timeline_id, MaybeOffloaded::No),
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
@@ -8489,8 +8211,8 @@ mod tests {
let mut guard = tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![
(Lsn(0x10), tline.timeline_id),
(Lsn(0x20), tline.timeline_id),
(Lsn(0x10), tline.timeline_id, MaybeOffloaded::No),
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
@@ -8723,7 +8445,7 @@ mod tests {
// Update GC info
let mut guard = parent_tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id)],
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x10),
space: Lsn(0x10),
@@ -8737,7 +8459,7 @@ mod tests {
// Update GC info
let mut guard = branch_tline.gc_info.write().unwrap();
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id)],
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x50),
space: Lsn(0x50),

View File

@@ -52,13 +52,13 @@ impl From<PageReadGuard<'static>> for BlockLease<'static> {
}
#[cfg(test)]
impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
impl From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'_> {
fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Arc(value)
}
}
impl<'a> Deref for BlockLease<'a> {
impl Deref for BlockLease<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {

View File

@@ -131,7 +131,7 @@ struct OnDiskNode<'a, const L: usize> {
values: &'a [u8],
}
impl<'a, const L: usize> OnDiskNode<'a, L> {
impl<const L: usize> OnDiskNode<'_, L> {
///
/// Interpret a PAGE_SZ page as a node.
///

View File

@@ -11,6 +11,7 @@ use pageserver_api::shard::{
};
use pageserver_api::upcall_api::ReAttachResponseTenant;
use rand::{distributions::Alphanumeric, Rng};
use remote_storage::TimeoutOrCancel;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
@@ -1350,47 +1351,17 @@ impl TenantManager {
}
}
async fn delete_tenant_remote(
&self,
tenant_shard_id: TenantShardId,
) -> Result<(), DeleteTenantError> {
let remote_path = remote_tenant_path(&tenant_shard_id);
let mut keys_stream = self.resources.remote_storage.list_streaming(
Some(&remote_path),
remote_storage::ListingMode::NoDelimiter,
None,
&self.cancel,
);
while let Some(chunk) = keys_stream.next().await {
let keys = match chunk {
Ok(listing) => listing.keys,
Err(remote_storage::DownloadError::Cancelled) => {
return Err(DeleteTenantError::Cancelled)
}
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
};
if keys.is_empty() {
tracing::info!("Remote storage already deleted");
} else {
tracing::info!("Deleting {} keys from remote storage", keys.len());
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
self.resources
.remote_storage
.delete_objects(&keys, &self.cancel)
.await?;
}
}
Ok(())
}
/// If a tenant is attached, detach it. Then remove its data from remote storage.
///
/// A tenant is considered deleted once it is gone from remote storage. It is the caller's
/// responsibility to avoid trying to attach the tenant again or use it any way once deletion
/// has started: this operation is not atomic, and must be retried until it succeeds.
///
/// As a special case, if an unsharded tenant ID is given for a sharded tenant, it will remove
/// all tenant shards in remote storage (removing all paths with the tenant prefix). The storage
/// controller uses this to purge all remote tenant data, including any stale parent shards that
/// may remain after splits. Ideally, this special case would be handled elsewhere. See:
/// <https://github.com/neondatabase/neon/pull/9394>.
pub(crate) async fn delete_tenant(
&self,
tenant_shard_id: TenantShardId,
@@ -1442,25 +1413,29 @@ impl TenantManager {
// in 500 responses to delete requests.
// - We keep the `SlotGuard` during this I/O, so that if a concurrent delete request comes in, it will
// 503/retry, rather than kicking off a wasteful concurrent deletion.
match backoff::retry(
|| async move { self.delete_tenant_remote(tenant_shard_id).await },
|e| match e {
DeleteTenantError::Cancelled => true,
DeleteTenantError::SlotError(_) => {
unreachable!("Remote deletion doesn't touch slots")
}
_ => false,
// NB: this also deletes partial prefixes, i.e. a <tenant_id> path will delete all
// <tenant_id>_<shard_id>/* objects. See method comment for why.
backoff::retry(
|| async move {
self.resources
.remote_storage
.delete_prefix(&remote_tenant_path(&tenant_shard_id), &self.cancel)
.await
},
|_| false, // backoff::retry handles cancellation
1,
3,
&format!("delete_tenant[tenant_shard_id={tenant_shard_id}]"),
&self.cancel,
)
.await
{
Some(r) => r,
None => Err(DeleteTenantError::Cancelled),
}
.unwrap_or(Err(TimeoutOrCancel::Cancel.into()))
.map_err(|err| {
if TimeoutOrCancel::caused_by_cancel(&err) {
return DeleteTenantError::Cancelled;
}
DeleteTenantError::Other(err)
})
}
#[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]

View File

@@ -187,7 +187,7 @@ use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
pub(crate) use download::download_initdb_tar_zst;
use pageserver_api::models::{AuxFilePolicy, TimelineArchivalState};
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
@@ -505,7 +505,7 @@ impl RemoteTimelineClient {
},
);
let (index_part, _index_generation) = download::download_index_part(
let (index_part, index_generation, index_last_modified) = download::download_index_part(
&self.storage_impl,
&self.tenant_shard_id,
&self.timeline_id,
@@ -519,6 +519,49 @@ impl RemoteTimelineClient {
)
.await?;
// Defense in depth: monotonicity of generation numbers is an important correctness guarantee, so when we see a very
// old index, we do extra checks in case this is the result of backward time-travel of the generation number (e.g.
// in case of a bug in the service that issues generation numbers). Indices are allowed to be old, but we expect that
// when we load an old index we are loading the _latest_ index: if we are asked to load an old index and there is
// also a newer index available, that is surprising.
const INDEX_AGE_CHECKS_THRESHOLD: Duration = Duration::from_secs(14 * 24 * 3600);
let index_age = index_last_modified.elapsed().unwrap_or_else(|e| {
if e.duration() > Duration::from_secs(5) {
// We only warn if the S3 clock and our local clock are >5s out: because this is a low resolution
// timestamp, it is common to be out by at least 1 second.
tracing::warn!("Index has modification time in the future: {e}");
}
Duration::ZERO
});
if index_age > INDEX_AGE_CHECKS_THRESHOLD {
tracing::info!(
?index_generation,
age = index_age.as_secs_f64(),
"Loaded an old index, checking for other indices..."
);
// Find the highest-generation index
let (_latest_index_part, latest_index_generation, latest_index_mtime) =
download::download_index_part(
&self.storage_impl,
&self.tenant_shard_id,
&self.timeline_id,
Generation::MAX,
cancel,
)
.await?;
if latest_index_generation > index_generation {
// Unexpected! Why are we loading such an old index if a more recent one exists?
tracing::warn!(
?index_generation,
?latest_index_generation,
?latest_index_mtime,
"Found a newer index while loading an old one"
);
}
}
if index_part.deleted_at.is_some() {
Ok(MaybeDeletedIndexPart::Deleted(index_part))
} else {
@@ -628,18 +671,6 @@ impl RemoteTimelineClient {
Ok(())
}
/// Launch an index-file upload operation in the background, with only the `aux_file_policy` flag updated.
pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
self: &Arc<Self>,
last_aux_file_policy: Option<AuxFilePolicy>,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.dirty.last_aux_file_policy = last_aux_file_policy;
self.schedule_index_upload(upload_queue)?;
Ok(())
}
/// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
///
/// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded,
@@ -2151,7 +2182,7 @@ pub(crate) struct UploadQueueAccessor<'a> {
inner: std::sync::MutexGuard<'a, UploadQueue>,
}
impl<'a> UploadQueueAccessor<'a> {
impl UploadQueueAccessor<'_> {
pub(crate) fn latest_uploaded_index_part(&self) -> &IndexPart {
match &*self.inner {
UploadQueue::Initialized(x) => &x.clean.0,

View File

@@ -6,6 +6,7 @@
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::time::SystemTime;
use anyhow::{anyhow, Context};
use camino::{Utf8Path, Utf8PathBuf};
@@ -343,10 +344,10 @@ async fn do_download_index_part(
timeline_id: &TimelineId,
index_generation: Generation,
cancel: &CancellationToken,
) -> Result<(IndexPart, Generation), DownloadError> {
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
let remote_path = remote_index_path(tenant_shard_id, timeline_id, index_generation);
let index_part_bytes = download_retry_forever(
let (index_part_bytes, index_part_mtime) = download_retry_forever(
|| async {
let download = storage
.download(&remote_path, &DownloadOpts::default(), cancel)
@@ -359,7 +360,7 @@ async fn do_download_index_part(
tokio::io::copy_buf(&mut stream, &mut bytes).await?;
Ok(bytes)
Ok((bytes, download.last_modified))
},
&format!("download {remote_path:?}"),
cancel,
@@ -370,7 +371,7 @@ async fn do_download_index_part(
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
.map_err(DownloadError::Other)?;
Ok((index_part, index_generation))
Ok((index_part, index_generation, index_part_mtime))
}
/// index_part.json objects are suffixed with a generation number, so we cannot
@@ -385,7 +386,7 @@ pub(crate) async fn download_index_part(
timeline_id: &TimelineId,
my_generation: Generation,
cancel: &CancellationToken,
) -> Result<(IndexPart, Generation), DownloadError> {
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
if my_generation.is_none() {

View File

@@ -133,10 +133,6 @@ impl IndexPart {
pub(crate) fn example() -> Self {
Self::empty(TimelineMetadata::example())
}
pub(crate) fn last_aux_file_policy(&self) -> Option<AuxFilePolicy> {
self.last_aux_file_policy
}
}
/// Metadata gathered for each of the layer files.

View File

@@ -108,7 +108,6 @@ impl scheduler::Completion for WriteComplete {
/// when we last did a write. We only populate this after doing at least one
/// write for a tenant -- this avoids holding state for tenants that have
/// uploads disabled.
struct UploaderTenantState {
// This Weak only exists to enable culling idle instances of this type
// when the Tenant has been deallocated.

View File

@@ -12,7 +12,7 @@ use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{GcError, LogicalSizeCalculationCause, Tenant};
use crate::tenant::Timeline;
use crate::tenant::{MaybeOffloaded, Timeline};
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -264,10 +264,12 @@ pub(super) async fn gather_inputs(
let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
.retain_lsns
.iter()
.filter(|(lsn, _child_id)| lsn > &ancestor_lsn)
.filter(|(lsn, _child_id, is_offloaded)| {
lsn > &ancestor_lsn && *is_offloaded == MaybeOffloaded::No
})
.copied()
// this assumes there are no other retain_lsns than the branchpoints
.map(|(lsn, _child_id)| (lsn, LsnKind::BranchPoint))
.map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
.collect::<Vec<_>>();
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));

View File

@@ -705,7 +705,7 @@ pub mod tests {
/// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
impl<'a, T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'a, T> {
impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}..{}", self.0.start, self.0.end)
}

View File

@@ -529,8 +529,7 @@ impl DeltaLayerWriterInner {
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
let mut file = self.blob_writer.into_inner(ctx).await?;

View File

@@ -828,8 +828,7 @@ impl ImageLayerWriterInner {
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
// Calculate compression ratio
let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header

View File

@@ -341,6 +341,10 @@ impl Layer {
Ok(())
}
pub(crate) async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
self.0.needs_download().await
}
/// Assuming the layer is already downloaded, returns a guard which will prohibit eviction
/// while the guard exists.
///
@@ -974,7 +978,7 @@ impl LayerInner {
let timeline = self
.timeline
.upgrade()
.ok_or_else(|| DownloadError::TimelineShutdown)?;
.ok_or(DownloadError::TimelineShutdown)?;
// count cancellations, which currently remain largely unexpected
let init_cancelled = scopeguard::guard((), |_| LAYER_IMPL_METRICS.inc_init_cancelled());

View File

@@ -339,7 +339,7 @@ impl<'de> serde::Deserialize<'de> for LayerName {
struct LayerNameVisitor;
impl<'de> serde::de::Visitor<'de> for LayerNameVisitor {
impl serde::de::Visitor<'_> for LayerNameVisitor {
type Value = LayerName;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {

View File

@@ -99,21 +99,21 @@ impl<'a> PeekableLayerIterRef<'a> {
}
}
impl<'a> std::cmp::PartialEq for IteratorWrapper<'a> {
impl std::cmp::PartialEq for IteratorWrapper<'_> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<'a> std::cmp::Eq for IteratorWrapper<'a> {}
impl std::cmp::Eq for IteratorWrapper<'_> {}
impl<'a> std::cmp::PartialOrd for IteratorWrapper<'a> {
impl std::cmp::PartialOrd for IteratorWrapper<'_> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<'a> std::cmp::Ord for IteratorWrapper<'a> {
impl std::cmp::Ord for IteratorWrapper<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
use std::cmp::Ordering;
let a = self.peek_next_key_lsn_value();

View File

@@ -28,9 +28,9 @@ use pageserver_api::{
},
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
models::{
AtomicAuxFilePolicy, AuxFilePolicy, CompactionAlgorithm, CompactionAlgorithmSettings,
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
CompactionAlgorithm, CompactionAlgorithmSettings, DownloadRemoteLayersTaskInfo,
DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, InMemoryLayerInfo, LayerMapInfo,
LsnLease, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, ShardNumber, TenantShardId},
@@ -98,12 +98,12 @@ use crate::{
use crate::{
metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize,
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
use crate::{
pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind},
pgdatadir_mapping::DirectoryKind,
virtual_file::{MaybeFatalIo, VirtualFile},
};
use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
use crate::{pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS, tenant::storage_layer::PersistentLayerKey};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
use crate::config::PageServerConf;
@@ -139,8 +139,10 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{
config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
config::TenantConf,
storage_layer::{inmemory_layer, LayerVisibilityHint},
upload_queue::NotInitialized,
MaybeOffloaded,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
@@ -204,11 +206,6 @@ pub struct TimelineResources {
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
pub(crate) struct AuxFilesState {
pub(crate) dir: Option<AuxFilesDirectory>,
pub(crate) n_deltas: usize,
}
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
@@ -411,15 +408,9 @@ pub struct Timeline {
timeline_get_throttle:
Arc<crate::tenant::throttle::Throttle<crate::metrics::tenant_throttling::TimelineGet>>,
/// Keep aux directory cache to avoid it's reconstruction on each update
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
/// Size estimator for aux file v2
pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
/// Indicate whether aux file v2 storage is enabled.
pub(crate) last_aux_file_policy: AtomicAuxFilePolicy,
/// Some test cases directly place keys into the timeline without actually modifying the directory
/// keys (i.e., DB_DIR). The test cases creating such keys will put the keyspaces here, so that
/// these keys won't get garbage-collected during compaction/GC. This field only modifies the dense
@@ -450,7 +441,7 @@ pub(crate) struct GcInfo {
/// Currently, this includes all points where child branches have
/// been forked off from. In the future, could also include
/// explicit user-defined snapshot points.
pub(crate) retain_lsns: Vec<(Lsn, TimelineId)>,
pub(crate) retain_lsns: Vec<(Lsn, TimelineId, MaybeOffloaded)>,
/// The cutoff coordinates, which are combined by selecting the minimum.
pub(crate) cutoffs: GcCutoffs,
@@ -467,8 +458,13 @@ impl GcInfo {
self.cutoffs.select_min()
}
pub(super) fn insert_child(&mut self, child_id: TimelineId, child_lsn: Lsn) {
self.retain_lsns.push((child_lsn, child_id));
pub(super) fn insert_child(
&mut self,
child_id: TimelineId,
child_lsn: Lsn,
is_offloaded: MaybeOffloaded,
) {
self.retain_lsns.push((child_lsn, child_id, is_offloaded));
self.retain_lsns.sort_by_key(|i| i.0);
}
@@ -1558,6 +1554,7 @@ impl Timeline {
}
/// Checks if the internal state of the timeline is consistent with it being able to be offloaded.
///
/// This is neccessary but not sufficient for offloading of the timeline as it might have
/// child timelines that are not offloaded yet.
pub(crate) fn can_offload(&self) -> bool {
@@ -2004,14 +2001,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length_for_ts)
}
pub(crate) fn get_switch_aux_file_policy(&self) -> AuxFilePolicy {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.switch_aux_file_policy
.unwrap_or(self.conf.default_tenant_conf.switch_aux_file_policy)
}
pub(crate) fn get_lazy_slru_download(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2144,7 +2133,6 @@ impl Timeline {
resources: TimelineResources,
pg_version: u32,
state: TimelineState,
aux_file_policy: Option<AuxFilePolicy>,
attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
cancel: CancellationToken,
) -> Arc<Self> {
@@ -2164,7 +2152,9 @@ impl Timeline {
if let Some(ancestor) = &ancestor {
let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn());
// If we construct an explicit timeline object, it's obviously not offloaded
let is_offloaded = MaybeOffloaded::No;
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded);
}
Arc::new_cyclic(|myself| {
@@ -2272,15 +2262,8 @@ impl Timeline {
timeline_get_throttle: resources.timeline_get_throttle,
aux_files: tokio::sync::Mutex::new(AuxFilesState {
dir: None,
n_deltas: 0,
}),
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
last_aux_file_policy: AtomicAuxFilePolicy::new(aux_file_policy),
#[cfg(test)]
extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())),
@@ -2291,10 +2274,6 @@ impl Timeline {
attach_wal_lag_cooldown,
};
if aux_file_policy == Some(AuxFilePolicy::V1) {
warn!("this timeline is using deprecated aux file policy V1 (when loading the timeline)");
}
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -3083,7 +3062,6 @@ impl Timeline {
}
impl Timeline {
#[allow(unknown_lints)] // doc_lazy_continuation is still a new lint
#[allow(clippy::doc_lazy_continuation)]
/// Get the data needed to reconstruct all keys in the provided keyspace
///
@@ -4470,14 +4448,6 @@ impl Timeline {
) -> Result<(), detach_ancestor::Error> {
detach_ancestor::complete(self, tenant, attempt, ctx).await
}
/// Switch aux file policy and schedule upload to the index part.
pub(crate) fn do_switch_aux_policy(&self, policy: AuxFilePolicy) -> anyhow::Result<()> {
self.last_aux_file_policy.store(Some(policy));
self.remote_client
.schedule_index_upload_for_aux_file_policy_update(Some(policy))?;
Ok(())
}
}
impl Drop for Timeline {
@@ -4875,7 +4845,7 @@ impl Timeline {
let retain_lsns = gc_info
.retain_lsns
.iter()
.map(|(lsn, _child_id)| *lsn)
.map(|(lsn, _child_id, _is_offloaded)| *lsn)
.collect();
// Gets the maximum LSN that holds the valid lease.

View File

@@ -29,6 +29,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::filter_iterator::FilterIterator;
@@ -42,7 +43,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::timeline::ImageLayerCreationOutcome;
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Layer, ResidentLayer};
use crate::tenant::DeltaLayer;
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
use pageserver_api::config::tenant_conf_defaults::{
DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD,
@@ -639,7 +640,10 @@ impl Timeline {
let children = self.gc_info.read().unwrap().retain_lsns.clone();
let mut readable_points = Vec::with_capacity(children.len() + 1);
for (child_lsn, _child_timeline_id) in &children {
for (child_lsn, _child_timeline_id, is_offloaded) in &children {
if *is_offloaded == MaybeOffloaded::Yes {
continue;
}
readable_points.push(*child_lsn);
}
readable_points.push(head_lsn);
@@ -1688,6 +1692,45 @@ impl Timeline {
unreachable!("key retention is empty")
}
/// Check how much space is left on the disk
async fn check_available_space(self: &Arc<Self>) -> anyhow::Result<u64> {
let tenants_dir = self.conf.tenants_path();
let stat = Statvfs::get(&tenants_dir, None)
.context("statvfs failed, presumably directory got unlinked")?;
let (avail_bytes, _) = stat.get_avail_total_bytes();
Ok(avail_bytes)
}
/// Check if the compaction can proceed safely without running out of space. We assume the size
/// upper bound of the produced files of a compaction job is the same as all layers involved in
/// the compaction. Therefore, we need `2 * layers_to_be_compacted_size` at least to do a
/// compaction.
async fn check_compaction_space(
self: &Arc<Self>,
layer_selection: &[Layer],
) -> anyhow::Result<()> {
let available_space = self.check_available_space().await?;
let mut remote_layer_size = 0;
let mut all_layer_size = 0;
for layer in layer_selection {
let needs_download = layer.needs_download().await?;
if needs_download.is_some() {
remote_layer_size += layer.layer_desc().file_size;
}
all_layer_size += layer.layer_desc().file_size;
}
let allocated_space = (available_space as f64 * 0.8) as u64; /* reserve 20% space for other tasks */
if all_layer_size /* space needed for newly-generated file */ + remote_layer_size /* space for downloading layers */ > allocated_space
{
return Err(anyhow!("not enough space for compaction: available_space={}, allocated_space={}, all_layer_size={}, remote_layer_size={}, required_space={}",
available_space, allocated_space, all_layer_size, remote_layer_size, all_layer_size + remote_layer_size));
}
Ok(())
}
/// An experimental compaction building block that combines compaction with garbage collection.
///
/// The current implementation picks all delta + image layers that are below or intersecting with
@@ -1741,7 +1784,7 @@ impl Timeline {
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = gc_info.cutoffs.select_min();
for (lsn, _timeline_id) in &gc_info.retain_lsns {
for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
if lsn < &gc_cutoff {
retain_lsns_below_horizon.push(*lsn);
}
@@ -1803,6 +1846,8 @@ impl Timeline {
lowest_retain_lsn
);
self.check_compaction_space(&layer_selection).await?;
// Step 1: (In the future) construct a k-merge iterator over all layers. For now, simply collect all keys + LSNs.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)

View File

@@ -283,8 +283,6 @@ impl DeleteTimelineFlow {
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,
// Aux file policy is not needed for deletion, assuming deletion does not read aux keyspace
None,
)
.context("create_timeline_struct")?;

View File

@@ -19,6 +19,9 @@ pub(crate) async fn offload_timeline(
return Ok(());
};
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
timeline.shutdown(super::ShutdownMode::Hard).await;
// TODO extend guard mechanism above with method
// to make deletions possible while offloading is in progress

View File

@@ -74,7 +74,7 @@ impl<'a> BufView<'a> {
}
}
impl<'a> Deref for BufView<'a> {
impl Deref for BufView<'_> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
@@ -85,7 +85,7 @@ impl<'a> Deref for BufView<'a> {
}
}
impl<'a> AsRef<[u8]> for BufView<'a> {
impl AsRef<[u8]> for BufView<'_> {
fn as_ref(&self) -> &[u8] {
match self {
BufView::Slice(slice) => slice,
@@ -197,11 +197,6 @@ pub(crate) struct ChunkedVectoredReadBuilder {
max_read_size: Option<usize>,
}
/// Computes x / d rounded up.
fn div_round_up(x: usize, d: usize) -> usize {
(x + (d - 1)) / d
}
impl ChunkedVectoredReadBuilder {
const CHUNK_SIZE: usize = virtual_file::get_io_buffer_alignment();
/// Start building a new vectored read.
@@ -221,7 +216,7 @@ impl ChunkedVectoredReadBuilder {
.expect("First insertion always succeeds");
let start_blk_no = start_offset as usize / Self::CHUNK_SIZE;
let end_blk_no = div_round_up(end_offset as usize, Self::CHUNK_SIZE);
let end_blk_no = (end_offset as usize).div_ceil(Self::CHUNK_SIZE);
Self {
start_blk_no,
end_blk_no,
@@ -249,7 +244,7 @@ impl ChunkedVectoredReadBuilder {
pub(crate) fn extend(&mut self, start: u64, end: u64, meta: BlobMeta) -> VectoredReadExtended {
tracing::trace!(start, end, "trying to extend");
let start_blk_no = start as usize / Self::CHUNK_SIZE;
let end_blk_no = div_round_up(end as usize, Self::CHUNK_SIZE);
let end_blk_no = (end as usize).div_ceil(Self::CHUNK_SIZE);
let not_limited_by_max_read_size = {
if let Some(max_read_size) = self.max_read_size {
@@ -976,12 +971,4 @@ mod tests {
round_trip_test_compressed(&blobs, true).await?;
Ok(())
}
#[test]
fn test_div_round_up() {
const CHUNK_SIZE: usize = 512;
assert_eq!(1, div_round_up(200, CHUNK_SIZE));
assert_eq!(1, div_round_up(CHUNK_SIZE, CHUNK_SIZE));
assert_eq!(2, div_round_up(CHUNK_SIZE + 1, CHUNK_SIZE));
}
}

View File

@@ -729,9 +729,9 @@ impl VirtualFileInner {
*handle_guard = handle;
return Ok(FileGuard {
Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
})
}
pub fn remove(self) {

View File

@@ -1,8 +1,7 @@
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, BytesMut};
use bytes::BytesMut;
use pageserver_api::key::Key;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants;
@@ -13,7 +12,6 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
use tracing::*;
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
/// Can this request be served by neon redo functions
@@ -236,13 +234,9 @@ pub(crate) fn apply_in_neon(
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
}
}
NeonWalRecord::AuxFile { file_path, content } => {
let mut dir = AuxFilesDirectory::des(page)?;
dir.upsert(file_path.clone(), content.clone());
page.clear();
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
NeonWalRecord::AuxFile { .. } => {
// No-op: this record will never be created in aux v2.
warn!("AuxFile record should not be created in aux v2");
}
#[cfg(test)]
NeonWalRecord::Test {
@@ -250,6 +244,7 @@ pub(crate) fn apply_in_neon(
clear,
will_init,
} => {
use bytes::BufMut;
if *will_init {
assert!(*clear, "init record must be clear to ensure correctness");
}
@@ -261,59 +256,3 @@ pub(crate) fn apply_in_neon(
}
Ok(())
}
#[cfg(test)]
mod test {
use bytes::Bytes;
use pageserver_api::key::AUX_FILES_KEY;
use super::*;
use std::collections::HashMap;
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
#[test]
fn apply_aux_file_deltas() -> anyhow::Result<()> {
let base_dir = AuxFilesDirectory {
files: HashMap::from([
("two".to_string(), Bytes::from_static(b"content0")),
("three".to_string(), Bytes::from_static(b"contentX")),
]),
};
let base_image = AuxFilesDirectory::ser(&base_dir)?;
let deltas = vec![
// Insert
NeonWalRecord::AuxFile {
file_path: "one".to_string(),
content: Some(Bytes::from_static(b"content1")),
},
// Update
NeonWalRecord::AuxFile {
file_path: "two".to_string(),
content: Some(Bytes::from_static(b"content99")),
},
// Delete
NeonWalRecord::AuxFile {
file_path: "three".to_string(),
content: None,
},
];
let file_path = AUX_FILES_KEY;
let mut page = BytesMut::from_iter(base_image);
for record in deltas {
apply_in_neon(&record, Lsn(8), file_path, &mut page)?;
}
let reconstructed = AuxFilesDirectory::des(&page)?;
let expect = HashMap::from([
("one".to_string(), Bytes::from_static(b"content1")),
("two".to_string(), Bytes::from_static(b"content99")),
]);
assert_eq!(reconstructed.files, expect);
Ok(())
}
}