Evict layers on demand (#3486)

Closes https://github.com/neondatabase/neon/issues/3439

Adds a set of commands to manipulate the layer map:
* dump the layer map contents
* evict the layer form the layer map (remove the local file, put the
remote layer instead in the layer map)
* download the layer (operation, reversing the eviction)

The commands will change later, when the statistics is added on top, so
the swagger schema is not adjusted.

The commands might have issues with big amount of layers: no pagination
is done for the dump command, eviction and download commands look for
the layer to evict/download by iterating all layers sequentially and
comparing the layer names.
For now, that seems to be tolerable ("big" number of layers is ~2_000)
and further experiments are needed.

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
Kirill Bulatov
2023-02-02 12:14:44 +02:00
committed by GitHub
parent f474495ba0
commit 2759f1a22e
11 changed files with 576 additions and 70 deletions

View File

@@ -227,6 +227,52 @@ pub struct TimelineInfo {
pub state: TimelineState,
}
#[derive(Debug, Clone, Serialize)]
pub struct LayerMapInfo {
pub in_memory_layers: Vec<InMemoryLayerInfo>,
pub historic_layers: Vec<HistoricLayerInfo>,
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum InMemoryLayerInfo {
Open {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
},
Frozen {
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
},
}
#[serde_as]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind")]
pub enum HistoricLayerInfo {
Delta {
layer_file_name: String,
layer_file_size: Option<u64>,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
#[serde_as(as = "DisplayFromStr")]
lsn_end: Lsn,
remote: bool,
},
Image {
layer_file_name: String,
layer_file_size: Option<u64>,
#[serde_as(as = "DisplayFromStr")]
lsn_start: Lsn,
remote: bool,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadRemoteLayersTaskSpawnRequest {
pub max_concurrent_downloads: NonZeroUsize,

View File

@@ -7,7 +7,7 @@ use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::request::{must_get_query_param, parse_query_param};
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
@@ -317,10 +317,7 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = mgr::get_tenant(tenant_id, true)
.await
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
.map_err(ApiError::NotFound)?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let result = timeline
.find_lsn_for_timestamp(timestamp_pg, &ctx)
.await
@@ -528,6 +525,62 @@ async fn tenant_size_handler(request: Request<Body>) -> Result<Response<Body>, A
)
}
async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let layer_map_info = timeline.layer_map_info();
json_response(StatusCode::OK, layer_map_info)
}
async fn layer_download_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let layer_file_name = get_request_param(&request, "layer_file_name")?;
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let downloaded = timeline
.download_layer(layer_file_name)
.await
.map_err(ApiError::InternalServerError)?;
match downloaded {
Some(true) => json_response(StatusCode::OK, ()),
Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
None => json_response(
StatusCode::BAD_REQUEST,
format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
),
}
}
async fn evict_timeline_layer_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let layer_file_name = get_request_param(&request, "layer_file_name")?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let evicted = timeline
.evict_layer(layer_file_name)
.await
.map_err(ApiError::InternalServerError)?;
match evicted {
Some(true) => json_response(StatusCode::OK, ()),
Some(false) => json_response(StatusCode::NOT_MODIFIED, ()),
None => json_response(
StatusCode::BAD_REQUEST,
format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"),
),
}
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
@@ -804,12 +857,7 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
timeline
.freeze_and_flush()
.await
@@ -830,12 +878,7 @@ async fn timeline_download_remote_layers_handler_post(
let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
match timeline.spawn_download_all_remote_layers(body).await {
Ok(st) => json_response(StatusCode::ACCEPTED, st),
Err(st) => json_response(StatusCode::CONFLICT, st),
@@ -846,15 +889,10 @@ async fn timeline_download_remote_layers_handler_get(
request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
let timeline = tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let info = timeline
.get_download_all_remote_layers_task_info()
.context("task never started since last pageserver process start")
@@ -862,6 +900,18 @@ async fn timeline_download_remote_layers_handler_get(
json_response(StatusCode::OK, info)
}
async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {
let tenant = mgr::get_tenant(tenant_id, true)
.await
.map_err(ApiError::NotFound)?;
tenant
.get_timeline(timeline_id, true)
.map_err(ApiError::NotFound)
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
@@ -958,5 +1008,17 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer",
layer_map_info_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
layer_download_handler,
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
evict_timeline_layer_handler,
)
.any(handler_404))
}

View File

@@ -6,11 +6,13 @@ mod image_layer;
mod inmemory_layer;
mod remote_layer;
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
@@ -21,7 +23,7 @@ use utils::{
};
pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName, PathOrConf};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayer;
@@ -187,6 +189,8 @@ pub trait PersistentLayer: Layer {
/// Should not change over the lifetime of the layer object because
/// current_physical_size is computed as the som of this value.
fn file_size(&self) -> Option<u64>;
fn info(&self) -> HistoricLayerInfo;
}
pub fn downcast_remote_layer(
@@ -246,3 +250,16 @@ impl Layer for LayerDescriptor {
todo!()
}
}
/// Helper enum to hold a PageServerConf, or a path
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'pageserver_binutils' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
enum PathOrConf {
Path(PathBuf),
Conf(&'static PageServerConf),
}

View File

@@ -37,6 +37,7 @@ use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use pageserver_api::models::HistoricLayerInfo;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
@@ -417,6 +418,19 @@ impl PersistentLayer for DeltaLayer {
fn file_size(&self) -> Option<u64> {
Some(self.file_size)
}
fn info(&self) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: Some(self.file_size),
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: false,
}
}
}
impl DeltaLayer {

View File

@@ -1,12 +1,10 @@
//!
//! Helper functions for dealing with filenames of the image and delta layer files.
//!
use crate::config::PageServerConf;
use crate::repository::Key;
use std::cmp::Ordering;
use std::fmt;
use std::ops::Range;
use std::path::PathBuf;
use std::str::FromStr;
use utils::lsn::Lsn;
@@ -270,16 +268,3 @@ impl<'de> serde::de::Visitor<'de> for LayerFileNameVisitor {
v.parse().map_err(|e| E::custom(e))
}
}
/// Helper enum to hold a PageServerConf, or a path
///
/// This is used by DeltaLayer and ImageLayer. Normally, this holds a reference to the
/// global config, and paths to layer files are constructed using the tenant/timeline
/// path from the config. But in the 'pageserver_binutils' binary, we need to construct a Layer
/// struct for a file on disk, without having a page server running, so that we have no
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
pub enum PathOrConf {
Path(PathBuf),
Conf(&'static PageServerConf),
}

View File

@@ -34,6 +34,7 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use hex;
use pageserver_api::models::HistoricLayerInfo;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::fs::{self, File};
@@ -51,8 +52,8 @@ use utils::{
lsn::Lsn,
};
use super::filename::{ImageFileName, LayerFileName, PathOrConf};
use super::{Layer, LayerIter};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -235,6 +236,18 @@ impl PersistentLayer for ImageLayer {
fn file_size(&self) -> Option<u64> {
Some(self.file_size)
}
fn info(&self) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
HistoricLayerInfo::Image {
layer_file_name,
layer_file_size: Some(self.file_size),
lsn_start: lsn_range.start,
remote: false,
}
}
}
impl ImageLayer {

View File

@@ -13,6 +13,7 @@ use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
use crate::walrecord;
use anyhow::{ensure, Result};
use pageserver_api::models::InMemoryLayerInfo;
use std::cell::RefCell;
use std::collections::HashMap;
use tracing::*;
@@ -80,6 +81,16 @@ impl InMemoryLayer {
pub fn get_timeline_id(&self) -> TimelineId {
self.timeline_id
}
pub fn info(&self) -> InMemoryLayerInfo {
let lsn_start = self.start_lsn;
let lsn_end = self.inner.read().unwrap().end_lsn;
match lsn_end {
Some(lsn_end) => InMemoryLayerInfo::Frozen { lsn_start, lsn_end },
None => InMemoryLayerInfo::Open { lsn_start },
}
}
}
impl Layer for InMemoryLayer {

View File

@@ -7,6 +7,7 @@ use crate::repository::Key;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
@@ -136,6 +137,28 @@ impl PersistentLayer for RemoteLayer {
fn file_size(&self) -> Option<u64> {
self.layer_metadata.file_size()
}
fn info(&self) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
if self.is_delta {
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: self.layer_metadata.file_size(),
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: true,
}
} else {
HistoricLayerInfo::Image {
layer_file_name,
layer_file_size: self.layer_metadata.file_size(),
lsn_start: lsn_range.start,
remote: true,
}
}
}
}
impl RemoteLayer {

View File

@@ -10,7 +10,7 @@ use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{
DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
DownloadRemoteLayersTaskState, TimelineState,
DownloadRemoteLayersTaskState, LayerMapInfo, TimelineState,
};
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
@@ -69,6 +69,7 @@ use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use walreceiver::spawn_connection_manager_task;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{DeltaLayer, ImageLayer, Layer};
@@ -91,7 +92,7 @@ pub struct Timeline {
pub pg_version: u32,
pub layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
@@ -663,7 +664,7 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _layer_removal_cs = self.layer_removal_cs.lock().await;
let layer_removal_cs = self.layer_removal_cs.lock().await;
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
@@ -696,7 +697,8 @@ impl Timeline {
// 3. Compact
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size, ctx).await?;
self.compact_level0(&layer_removal_cs, target_file_size, ctx)
.await?;
timer.stop_and_record();
// If `create_image_layers' or `compact_level0` scheduled any
@@ -832,6 +834,85 @@ impl Timeline {
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
self.state.subscribe()
}
pub fn layer_map_info(&self) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
}
for frozen_layer in &layer_map.frozen_layers {
in_memory_layers.push(frozen_layer.info());
}
let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() {
historic_layers.push(historic_layer.info());
}
LayerMapInfo {
in_memory_layers,
historic_layers,
}
}
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
}
self.download_remote_layer(remote_layer).await?;
Ok(Some(true))
}
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
if local_layer.is_remote_layer() {
return Ok(Some(false));
}
let Some(remote_client) = &self.remote_client else { return Ok(Some(false)) };
// ensure the current layer is uploaded for sure
remote_client
.wait_completion()
.await
.context("wait for layer upload ops to complete")?;
let layer_metadata = LayerFileMetadata::new(
local_layer
.file_size()
.expect("Local layer should have a file size"),
);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
&image_name,
&layer_metadata,
),
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
&layer_metadata,
),
#[cfg(test)]
LayerFileName::Test(_) => unreachable!(),
});
let gc_lock = self.layer_removal_cs.lock().await;
let mut layers = self.layers.write().unwrap();
let mut updates = layers.batch_update();
self.delete_historic_layer(&gc_lock, local_layer, &mut updates)?;
updates.insert_historic(new_remote_layer);
updates.flush();
drop(layers);
drop(gc_lock);
Ok(Some(true))
}
}
// Private functions
@@ -1621,6 +1702,43 @@ impl Timeline {
Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
}
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
}
}
None
}
/// Removes the layer from local FS (if present) and from memory.
/// Remote storage is not affected by this operation.
fn delete_historic_layer(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<()> {
let layer_size = layer.file_size();
layer.delete()?;
if let Some(layer_size) = layer_size {
self.metrics.resident_physical_size_gauge.sub(layer_size);
}
// TODO Removing from the bottom of the layer map is expensive.
// Maybe instead discard all layer map historic versions that
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(layer);
Ok(())
}
}
type TraversalId = String;
@@ -2727,6 +2845,7 @@ impl Timeline {
///
async fn compact_level0(
&self,
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
target_file_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -2780,14 +2899,8 @@ impl Timeline {
// delete the old ones
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
if let Some(path) = l.local_path() {
self.metrics
.resident_physical_size_gauge
.sub(path.metadata()?.len());
}
layer_names_to_delete.push(l.filename());
l.delete()?;
updates.remove_historic(l);
self.delete_historic_layer(layer_removal_cs, l, &mut updates)?;
}
updates.flush();
drop(layers);
@@ -2907,7 +3020,7 @@ impl Timeline {
fail_point!("before-timeline-gc");
let _layer_removal_cs = self.layer_removal_cs.lock().await;
let layer_removal_cs = self.layer_removal_cs.lock().await;
// Is the timeline being deleted?
let state = *self.state.borrow();
if state == TimelineState::Stopping {
@@ -2926,7 +3039,13 @@ impl Timeline {
let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
let res = self
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
.gc_timeline(
&layer_removal_cs,
horizon_cutoff,
pitr_cutoff,
retain_lsns,
new_gc_cutoff,
)
.instrument(
info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff),
)
@@ -2940,6 +3059,7 @@ impl Timeline {
async fn gc_timeline(
&self,
layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
@@ -3095,22 +3215,12 @@ impl Timeline {
// (couldn't do this in the loop above, because you cannot modify a collection
// while iterating it. BTreeMap::retain() would be another option)
let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
for doomed_layer in layers_to_remove {
if let Some(path) = doomed_layer.local_path() {
self.metrics
.resident_physical_size_gauge
.sub(path.metadata()?.len());
{
for doomed_layer in layers_to_remove {
layer_names_to_delete.push(doomed_layer.filename());
self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning?
result.layers_removed += 1;
}
layer_names_to_delete.push(doomed_layer.filename());
doomed_layer.delete()?; // FIXME: schedule succeeded deletions before returning?
// TODO Removing from the bottom of the layer map is expensive.
// Maybe instead discard all layer map historic versions that
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(doomed_layer);
result.layers_removed += 1;
}
if result.layers_removed != 0 {

View File

@@ -1472,6 +1472,91 @@ class PageserverHttpClient(requests.Session):
assert len(relevant) == 1
return relevant[0].lstrip(name).strip()
def layer_map_info(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> LayerMapInfo:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/",
)
self.verbose_error(res)
return LayerMapInfo.from_json(res.json())
def download_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
)
self.verbose_error(res)
assert res.status_code == 200
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
)
self.verbose_error(res)
assert res.status_code == 200
@dataclass
class LayerMapInfo:
in_memory_layers: List[InMemoryLayerInfo]
historic_layers: List[HistoricLayerInfo]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> LayerMapInfo:
info = LayerMapInfo(in_memory_layers=[], historic_layers=[])
json_in_memory_layers = d["in_memory_layers"]
assert isinstance(json_in_memory_layers, List)
for json_in_memory_layer in json_in_memory_layers:
info.in_memory_layers.append(InMemoryLayerInfo.from_json(json_in_memory_layer))
json_historic_layers = d["historic_layers"]
assert isinstance(json_historic_layers, List)
for json_historic_layer in json_historic_layers:
info.historic_layers.append(HistoricLayerInfo.from_json(json_historic_layer))
return info
@dataclass
class InMemoryLayerInfo:
kind: str
lsn_start: str
lsn_end: Optional[str]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> InMemoryLayerInfo:
return InMemoryLayerInfo(
kind=d["kind"],
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
)
@dataclass
class HistoricLayerInfo:
kind: str
layer_file_name: str
layer_file_size: Optional[int]
lsn_start: str
lsn_end: Optional[str]
remote: bool
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
return HistoricLayerInfo(
kind=d["kind"],
layer_file_name=d["layer_file_name"],
layer_file_size=d.get("layer_file_size"),
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
remote=d["remote"],
)
@dataclass
class PageserverPort:

View File

@@ -0,0 +1,140 @@
import pytest
from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
# Crates a few layers, ensures that we can evict them (removing locally but keeping track of them anyway)
# and then download them back.
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_basic_eviction(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_download_remote_layers_api",
)
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
pg = env.postgres.create_start("main")
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
# Create a number of layers in the tenant
with pg.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 5000000) g
"""
)
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
client.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
timeline_path = env.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
)
assert (
len(initial_local_layers) > 1
), f"Should create multiple layers for timeline, but got {initial_local_layers}"
# Compare layer map dump with the local layers, ensure everything's present locally and matches
initial_layer_map_info = client.layer_map_info(tenant_id=tenant_id, timeline_id=timeline_id)
assert (
not initial_layer_map_info.in_memory_layers
), "Should have no in memory layers after flushing"
assert len(initial_local_layers) == len(
initial_layer_map_info.historic_layers
), "Should have the same layers in memory and on disk"
for returned_layer in initial_layer_map_info.historic_layers:
assert (
returned_layer.kind == "Delta"
), f"Did not create and expect image layers, but got {returned_layer}"
assert (
not returned_layer.remote
), f"All created layers should be present locally, but got {returned_layer}"
local_layers = list(
filter(lambda layer: layer.name == returned_layer.layer_file_name, initial_local_layers)
)
assert (
len(local_layers) == 1
), f"Did not find returned layer {returned_layer} in local layers {initial_local_layers}"
local_layer = local_layers[0]
assert (
returned_layer.layer_file_size == local_layer.stat().st_size
), f"Returned layer {returned_layer} has a different file size than local layer {local_layer}"
# Detach all layers, ensre they are not in the local FS, but are still dumped as part of the layer map
for local_layer in initial_local_layers:
client.evict_layer(
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=local_layer.name
)
assert not any(
new_local_layer.name == local_layer.name for new_local_layer in timeline_path.glob("*")
), f"Did not expect to find {local_layer} layer after evicting"
empty_layers = list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
assert (
not empty_layers
), f"After evicting all layers, timeline {tenant_id}/{timeline_id} should have no layers locally, but got: {empty_layers}"
evicted_layer_map_info = client.layer_map_info(tenant_id=tenant_id, timeline_id=timeline_id)
assert (
not evicted_layer_map_info.in_memory_layers
), "Should have no in memory layers after flushing and evicting"
assert len(initial_local_layers) == len(
evicted_layer_map_info.historic_layers
), "Should have the same layers in memory and on disk initially"
for returned_layer in evicted_layer_map_info.historic_layers:
assert (
returned_layer.kind == "Delta"
), f"Did not create and expect image layers, but got {returned_layer}"
assert (
returned_layer.remote
), f"All layers should be evicted and not present locally, but got {returned_layer}"
assert any(
local_layer.name == returned_layer.layer_file_name
for local_layer in initial_local_layers
), f"Did not find returned layer {returned_layer} in local layers {initial_local_layers}"
# redownload all evicted layers and ensure the initial state is restored
for local_layer in initial_local_layers:
client.download_layer(
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=local_layer.name
)
client.timeline_download_remote_layers(
tenant_id,
timeline_id,
# allow some concurrency to unveil potential concurrency bugs
max_concurrent_downloads=10,
errors_ok=False,
at_least_one_download=False,
)
redownloaded_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
)
assert (
redownloaded_layers == initial_local_layers
), "Should have the same layers locally after redownloading the evicted layers"
redownloaded_layer_map_info = client.layer_map_info(
tenant_id=tenant_id, timeline_id=timeline_id
)
assert (
redownloaded_layer_map_info == initial_layer_map_info
), "Should have the same layer map after redownloading the evicted layers"