Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Bayandin
c1e6132975 Downgrade allure-pytest to 2.13.1 from ^2.13.2 2023-06-12 11:18:21 +01:00
30 changed files with 1021 additions and 1334 deletions

7
Cargo.lock generated
View File

@@ -110,12 +110,6 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "archery"
version = "0.5.0"
@@ -2548,7 +2542,6 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-stream",
"async-trait",
"byteorder",

View File

@@ -32,7 +32,6 @@ license = "Apache-2.0"
## All dependency versions, used in the project
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-stream = "0.3"
async-trait = "0.1"
atty = "0.2.14"

View File

@@ -370,6 +370,11 @@ impl ComputeNode {
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
spec.cluster.cluster_id.as_deref().unwrap_or("None")
);
Ok(())
}
@@ -422,22 +427,22 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
pspec.tenant_id,
pspec.timeline_id,
spec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
spec.spec.operation_uuid.as_deref().unwrap_or("None"),
spec.tenant_id,
spec.timeline_id,
);
self.prepare_pgdata(&compute_state)?;
let start_time = Utc::now();
let pg = self.start_postgres(pspec.storage_auth_token.clone())?;
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
if spec.spec.mode == ComputeMode::Primary {
self.apply_config(&compute_state)?;
}
@@ -457,11 +462,6 @@ impl ComputeNode {
}
self.set_status(ComputeStatus::Running);
info!(
"finished configuration of compute for project {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None")
);
Ok(pg)
}

View File

@@ -450,7 +450,6 @@ impl Endpoint {
// Create spec file
let spec = ComputeSpec {
skip_pg_catalog_updates: false,
format_version: 1.0,
operation_uuid: None,
cluster: Cluster {

View File

@@ -27,12 +27,6 @@ pub struct ComputeSpec {
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
/// An optinal hint that can be passed to speed up startup time if we know
/// that no pg catalog mutations (like role creation, database creation,
/// extension creation) need to be done on the actual database to start.
#[serde(default)] // Default false
pub skip_pg_catalog_updates: bool,
// Information needed to connect to the storage layer.
//
// `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.

View File

@@ -12,7 +12,6 @@ testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
async-stream.workspace = true
async-trait.workspace = true
byteorder.workspace = true

View File

@@ -1,23 +1,22 @@
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
let mut layer_map = LayerMap::default();
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
let mut layer_map = LayerMap::<LayerDescriptor>::default();
let mut min_lsn = Lsn(u64::MAX);
let mut max_lsn = Lsn(0);
@@ -34,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(layer.layer_desc().clone());
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -44,7 +43,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
}
/// Construct a layer map query pattern for benchmarks
fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
// For each image layer we query one of the pages contained, at LSN right
// before the image layer was created. This gives us a somewhat uniform
// coverage of both the lsn and key space because image layers have
@@ -70,7 +69,7 @@ fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
// Construct a partitioning for testing get_difficulty map when we
// don't have an exact result of `collect_keyspace` to work with.
fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
let mut parts = Vec::new();
// We add a partition boundary at the start of each image layer,
@@ -210,15 +209,13 @@ fn bench_sequential(c: &mut Criterion) {
for i in 0..100_000 {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
TenantId::generate(),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
false,
0,
));
updates.insert_historic(layer.layer_desc().clone());
let layer = LayerDescriptor {
key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
lsn: Lsn(i)..Lsn(i + 1),
is_incremental: false,
short_id: format!("Layer {}", i),
};
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -516,7 +516,7 @@ async fn collect_eviction_candidates(
if !tl.is_active() {
continue;
}
let info = tl.get_local_layers_for_disk_usage_eviction().await;
let info = tl.get_local_layers_for_disk_usage_eviction();
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers

View File

@@ -215,7 +215,7 @@ async fn build_timeline_info(
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx).await?;
let mut info = build_timeline_info_common(timeline, ctx)?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
@@ -233,7 +233,7 @@ async fn build_timeline_info(
Ok(info)
}
async fn build_timeline_info_common(
fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
@@ -264,7 +264,7 @@ async fn build_timeline_info_common(
None
}
};
let current_physical_size = Some(timeline.layer_size_sum().await);
let current_physical_size = Some(timeline.layer_size_sum());
let state = timeline.current_state();
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
@@ -330,7 +330,6 @@ async fn timeline_create_handler(
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -592,7 +591,7 @@ async fn tenant_status(
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
for timeline in tenant.list_timelines().iter() {
current_physical_size += timeline.layer_size_sum().await;
current_physical_size += timeline.layer_size_sum();
}
let state = tenant.current_state();
@@ -702,7 +701,7 @@ async fn layer_map_info_handler(
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let layer_map_info = timeline.layer_map_info(reset).await;
let layer_map_info = timeline.layer_map_info(reset);
json_response(StatusCode::OK, layer_map_info)
}

View File

@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
{
pg_control = Some(control_file);
}
modification.flush().await?;
modification.flush()?;
}
}
// We're done importing all the data files.
modification.commit().await?;
modification.commit()?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush().await?;
modification.flush()?;
}
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit().await?;
modification.commit()?;
Ok(())
}
@@ -594,7 +594,7 @@ async fn import_file(
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer().await;
let writer = modification.tline.writer();
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);

View File

@@ -1122,7 +1122,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub async fn flush(&mut self) -> anyhow::Result<()> {
pub fn flush(&mut self) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1130,20 +1130,19 @@ impl<'a> DatadirModification<'a> {
return Ok(());
}
let writer = self.tline.writer().await;
let writer = self.tline.writer();
// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::new();
for (key, value) in self.pending_updates.drain() {
if is_rel_block_key(key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, self.lsn, &value).await?;
let mut result: anyhow::Result<()> = Ok(());
self.pending_updates.retain(|&key, value| {
if result.is_ok() && (is_rel_block_key(key) || is_slru_block_key(key)) {
result = writer.put(key, self.lsn, value);
false
} else {
retained_pending_updates.insert(key, value);
true
}
}
self.pending_updates.extend(retained_pending_updates);
});
result?;
if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
@@ -1158,17 +1157,17 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub async fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
pub fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer();
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value).await?;
writer.put(key, lsn, &value)?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn).await?;
writer.delete(key_range, lsn)?;
}
writer.finish_write(lsn);

View File

@@ -11,7 +11,7 @@
//! parent timeline, and the last LSN that has been written to disk.
//!
use anyhow::{bail, ensure, Context};
use anyhow::{bail, Context};
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
@@ -87,7 +87,6 @@ pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
pub mod manifest;
pub mod layer_map_mgr;
pub mod metadata;
mod par_fsync;
@@ -187,13 +186,18 @@ struct TimelineUninitMark {
}
impl UninitializedTimeline<'_> {
/// Finish timeline creation: insert it into the Tenant's timelines map and remove the
/// uninit mark file.
/// Ensures timeline data is valid, loads it into pageserver's memory and removes
/// uninit mark file on success.
///
/// This function launches the flush loop if not already done.
///
/// The caller is responsible for activating the timeline (function `.activate()`).
fn finish_creation(mut self) -> anyhow::Result<Arc<Timeline>> {
fn initialize_with_lock(
mut self,
_ctx: &RequestContext,
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
load_layer_map: bool,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_id = self.timeline_id;
let tenant_id = self.owning_tenant.tenant_id;
@@ -201,19 +205,25 @@ impl UninitializedTimeline<'_> {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
// Check that the caller initialized disk_consistent_lsn
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
ensure!(
new_disk_consistent_lsn.is_valid(),
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
);
// TODO it would be good to ensure that, but apparently a lot of our testing is dependend on that at least
// ensure!(new_disk_consistent_lsn.is_valid(),
// "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn and cannot be initialized");
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
match timelines.entry(timeline_id) {
Entry::Occupied(_) => anyhow::bail!(
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
),
Entry::Vacant(v) => {
if load_layer_map {
new_timeline
.load_layer_map(new_disk_consistent_lsn)
.with_context(|| {
format!(
"Failed to load layermap for timeline {tenant_id}/{timeline_id}"
)
})?;
}
uninit_mark.remove_uninit_mark().with_context(|| {
format!(
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
@@ -242,10 +252,9 @@ impl UninitializedTimeline<'_> {
.await
.context("Failed to import basebackup")?;
// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
// We want to run proper checkpoint before we mark timeline as available to outside world
// Thus spawning flush loop manually and skipping flush_loop setup in initialize_with_lock
raw_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -257,9 +266,10 @@ impl UninitializedTimeline<'_> {
.await
.context("Failed to flush after basebackup import")?;
// All the data has been imported. Insert the Timeline into the tenant's timelines
// map and remove the uninit mark file.
let tl = self.finish_creation()?;
// Initialize without loading the layer map. We started with an empty layer map, and already
// updated it for the layers that we created during the import.
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
tl.activate(broker_client, None, ctx);
Ok(tl)
}
@@ -302,6 +312,15 @@ fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
}
impl TimelineUninitMark {
/// Useful for initializing timelines, existing on disk after the restart.
pub fn dummy() -> Self {
Self {
uninit_mark_deleted: true,
uninit_mark_path: PathBuf::new(),
timeline_path: PathBuf::new(),
}
}
fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self {
Self {
uninit_mark_deleted: false,
@@ -495,7 +514,7 @@ impl Tenant {
ancestor: Option<Arc<Timeline>>,
first_save: bool,
init_order: Option<&InitializationOrder>,
_ctx: &RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_id;
@@ -506,39 +525,54 @@ impl Tenant {
.context("merge_local_remote_metadata")?
.to_owned();
let timeline = self.create_timeline_struct(
timeline_id,
up_to_date_metadata,
ancestor.clone(),
remote_client,
init_order,
)?;
let new_disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
new_disk_consistent_lsn.is_valid(),
"Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
);
timeline
.load_layer_map(new_disk_consistent_lsn)
.await
.with_context(|| {
format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
})?;
{
let timeline = {
// avoiding holding it across awaits
let mut timelines_accessor = self.timelines.lock().unwrap();
match timelines_accessor.entry(timeline_id) {
Entry::Occupied(_) => {
// The uninit mark file acts as a lock that prevents another task from
// initializing the timeline at the same time.
unreachable!(
"Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
);
}
Entry::Vacant(v) => {
v.insert(Arc::clone(&timeline));
timeline.maybe_spawn_flush_loop();
if timelines_accessor.contains_key(&timeline_id) {
anyhow::bail!(
"Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
);
}
let dummy_timeline = self.create_timeline_data(
timeline_id,
up_to_date_metadata,
ancestor.clone(),
remote_client,
init_order,
)?;
let timeline = UninitializedTimeline {
owning_tenant: self,
timeline_id,
raw_timeline: Some((dummy_timeline, TimelineUninitMark::dummy())),
};
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
// But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
// will ingest data which may require looking at the layers which are not yet available locally
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) {
Ok(new_timeline) => new_timeline,
Err(e) => {
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
// FIXME using None is a hack, it wont hurt, just ugly.
// Ideally initialize_with_lock error should return timeline in the error
// Or return ownership of itself completely so somethin like into_broken
// can be called directly on Uninitielized timeline
// also leades to redundant .clone
let broken_timeline = self
.create_timeline_data(
timeline_id,
up_to_date_metadata,
ancestor.clone(),
None,
None,
)
.with_context(|| {
format!("creating broken timeline data for {tenant_id}/{timeline_id}")
})?;
broken_timeline.set_broken(e.to_string());
timelines_accessor.insert(timeline_id, broken_timeline);
return Err(e);
}
}
};
@@ -562,8 +596,7 @@ impl Tenant {
|| timeline
.layers
.read()
.await
.0
.unwrap()
.iter_historic_layers()
.next()
.is_some(),
@@ -1131,14 +1164,14 @@ impl Tenant {
.init_upload_queue_stopped_to_continue_deletion(&index_part)?;
let timeline = self
.create_timeline_struct(
.create_timeline_data(
timeline_id,
&local_metadata,
ancestor,
Some(remote_client),
init_order,
)
.context("create_timeline_struct")?;
.context("create_timeline_data")?;
let guard = Arc::clone(&timeline.delete_lock).lock_owned().await;
@@ -1264,8 +1297,6 @@ impl Tenant {
drop(timelines);
let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to
// make it valid, before calling finish_creation()
Lsn(0),
None,
None,
@@ -1274,11 +1305,11 @@ impl Tenant {
initdb_lsn,
pg_version,
);
self.prepare_new_timeline(
self.prepare_timeline(
new_timeline_id,
&new_metadata,
timeline_uninit_mark,
initdb_lsn,
true,
None,
)
}
@@ -1289,7 +1320,7 @@ impl Tenant {
// This makes the various functions which anyhow::ensure! for Active state work in tests.
// Our current tests don't need the background loops.
#[cfg(test)]
pub async fn create_test_timeline(
pub fn create_test_timeline(
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
@@ -1307,14 +1338,12 @@ impl Tenant {
.context("init_empty_test_timeline")?;
modification
.commit()
.await
.context("commit init_empty_test_timeline modification")?;
// Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
tline.maybe_spawn_flush_loop();
tline.freeze_and_flush().await.context("freeze_and_flush")?;
let tl = uninit_tl.finish_creation()?;
let mut timelines = self.timelines.lock().unwrap();
// load_layers=false because create_empty_timeline already did that what's necessary (set next_open_layer)
// and modification.init_empty() already created layers.
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, false)?;
// The non-test code would call tl.activate() here.
tl.set_state(TimelineState::Active);
Ok(tl)
@@ -2246,12 +2275,7 @@ impl Tenant {
}
}
/// Helper function to create a new Timeline struct.
///
/// The returned Timeline is in Loading state. The caller is responsible for
/// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
/// map.
fn create_timeline_struct(
fn create_timeline_data(
&self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
@@ -2671,7 +2695,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
_ctx: &RequestContext,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let src_id = src_timeline.timeline_id;
@@ -2756,15 +2780,17 @@ impl Tenant {
src_timeline.pg_version,
);
let uninitialized_timeline = self.prepare_new_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
start_lsn + 1,
Some(Arc::clone(src_timeline)),
)?;
let new_timeline = uninitialized_timeline.finish_creation()?;
let new_timeline = {
let mut timelines = self.timelines.lock().unwrap();
self.prepare_timeline(
dst_id,
&metadata,
timeline_uninit_mark,
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(ctx, &mut timelines, true)?
};
// Root timeline gets its layers during creation and uploads them along with the metadata.
// A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
@@ -2840,13 +2866,8 @@ impl Tenant {
pgdata_lsn,
pg_version,
);
let raw_timeline = self.prepare_new_timeline(
timeline_id,
&new_metadata,
timeline_uninit_mark,
pgdata_lsn,
None,
)?;
let raw_timeline =
self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?;
let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2862,10 +2883,10 @@ impl Tenant {
format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
})?;
// Flush the new layer files to disk, before we make the timeline as available to
// Flush the new layer files to disk, before we mark the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
// Thus spawn flush loop manually and skip flush_loop setup in initialize_with_lock
unfinished_timeline.maybe_spawn_flush_loop();
fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -2881,8 +2902,12 @@ impl Tenant {
)
})?;
// All done!
let timeline = raw_timeline.finish_creation()?;
// Initialize the timeline without loading the layer map, because we already updated the layer
// map above, when we imported the datadir.
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(ctx, &mut timelines, false)?
};
info!(
"created root timeline {} timeline.lsn {}",
@@ -2893,18 +2918,14 @@ impl Tenant {
Ok(timeline)
}
/// Creates intermediate timeline structure and its files.
///
/// An empty layer map is initialized, and new data and WAL can be imported starting
/// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file.
fn prepare_new_timeline(
/// Creates intermediate timeline structure and its files, without loading it into memory.
/// It's up to the caller to import the necesary data and import the timeline into memory.
fn prepare_timeline(
&self,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
uninit_mark: TimelineUninitMark,
start_lsn: Lsn,
init_layers: bool,
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<UninitializedTimeline> {
let tenant_id = self.tenant_id;
@@ -2922,27 +2943,33 @@ impl Tenant {
None
};
let timeline_struct = self
.create_timeline_struct(new_timeline_id, new_metadata, ancestor, remote_client, None)
.context("Failed to create timeline data structure")?;
timeline_struct.init_empty_layer_map(start_lsn);
if let Err(e) =
self.create_timeline_files(&uninit_mark.timeline_path, new_timeline_id, new_metadata)
{
error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark);
return Err(e);
match self.create_timeline_files(
&uninit_mark.timeline_path,
new_timeline_id,
new_metadata,
ancestor,
remote_client,
) {
Ok(new_timeline) => {
if init_layers {
new_timeline.layers.write().unwrap().next_open_layer_at =
Some(new_timeline.initdb_lsn);
}
debug!(
"Successfully created initial files for timeline {tenant_id}/{new_timeline_id}"
);
Ok(UninitializedTimeline {
owning_tenant: self,
timeline_id: new_timeline_id,
raw_timeline: Some((new_timeline, uninit_mark)),
})
}
Err(e) => {
error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark);
Err(e)
}
}
debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
Ok(UninitializedTimeline {
owning_tenant: self,
timeline_id: new_timeline_id,
raw_timeline: Some((timeline_struct, uninit_mark)),
})
}
fn create_timeline_files(
@@ -2950,8 +2977,13 @@ impl Tenant {
timeline_path: &Path,
new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata,
) -> anyhow::Result<()> {
crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
ancestor: Option<Arc<Timeline>>,
remote_client: Option<RemoteTimelineClient>,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_data = self
.create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client, None)
.context("Failed to create timeline data structure")?;
crashsafe::create_dir_all(timeline_path).context("Failed to create timeline directory")?;
fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
@@ -2965,7 +2997,8 @@ impl Tenant {
true,
)
.context("Failed to create timeline metadata")?;
Ok(())
Ok(timeline_data)
}
/// Attempts to create an uninit mark file for the timeline initialization.
@@ -3580,21 +3613,16 @@ mod tests {
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer().await;
writer
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer.finish_write(Lsn(0x10));
drop(writer);
let writer = tline.writer().await;
writer
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -3619,9 +3647,7 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
.load()
.await;
let _ = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) {
Ok(_) => panic!("duplicate timeline creation should fail"),
@@ -3652,10 +3678,9 @@ mod tests {
use std::str::from_utf8;
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let writer = tline.writer().await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer();
#[allow(non_snake_case)]
let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
@@ -3663,21 +3688,13 @@ mod tests {
let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
// Insert a value on the timeline
writer
.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
.await?;
writer
.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
.await?;
writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?;
writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?;
writer.finish_write(Lsn(0x20));
writer
.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
.await?;
writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?;
writer.finish_write(Lsn(0x30));
writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
.await?;
writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?;
writer.finish_write(Lsn(0x40));
//assert_current_logical_size(&tline, Lsn(0x40));
@@ -3689,10 +3706,8 @@ mod tests {
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
let new_writer = newtline.writer().await;
new_writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
.await?;
let new_writer = newtline.writer();
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
new_writer.finish_write(Lsn(0x40));
// Check page contents on both branches
@@ -3718,46 +3733,38 @@ mod tests {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
let writer = tline.writer().await;
let writer = tline.writer();
// Create a relation on the timeline
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer.finish_write(lsn);
lsn += 0x10;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer.finish_write(lsn);
lsn += 0x10;
}
tline.freeze_and_flush().await?;
{
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
let writer = tline.writer();
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer.finish_write(lsn);
lsn += 0x10;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer.finish_write(lsn);
}
tline.freeze_and_flush().await
@@ -3769,9 +3776,8 @@ mod tests {
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -3808,9 +3814,8 @@ mod tests {
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
match tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
@@ -3859,9 +3864,8 @@ mod tests {
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
@@ -3909,9 +3913,8 @@ mod tests {
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
@@ -3934,9 +3937,8 @@ mod tests {
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
@@ -3968,9 +3970,8 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
}
@@ -3989,9 +3990,8 @@ mod tests {
// create two timelines
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
@@ -4028,9 +4028,8 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
drop(tline);
drop(tenant);
@@ -4068,44 +4067,35 @@ mod tests {
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
let writer = tline.writer().await;
writer
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer.finish_write(Lsn(0x10));
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer.finish_write(Lsn(0x20));
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer
.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
.await?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
writer.finish_write(Lsn(0x30));
drop(writer);
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer
.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
.await?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
writer.finish_write(Lsn(0x40));
drop(writer);
@@ -4143,9 +4133,8 @@ mod tests {
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)?;
let mut lsn = Lsn(0x10);
@@ -4156,14 +4145,12 @@ mod tests {
for _ in 0..50 {
for _ in 0..10000 {
test_key.field6 = blknum;
let writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
let writer = tline.writer();
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer.finish_write(lsn);
drop(writer);
@@ -4189,9 +4176,8 @@ mod tests {
#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 1000;
@@ -4208,14 +4194,12 @@ mod tests {
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
let writer = tline.writer();
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer.finish_write(lsn);
updated[blknum] = lsn;
drop(writer);
@@ -4228,14 +4212,12 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
let writer = tline.writer();
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer.finish_write(lsn);
drop(writer);
updated[blknum] = lsn;
@@ -4268,9 +4250,8 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
.load()
.await;
let mut tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let mut tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 1000;
@@ -4287,14 +4268,12 @@ mod tests {
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
let writer = tline.writer();
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer.finish_write(lsn);
updated[blknum] = lsn;
drop(writer);
@@ -4315,14 +4294,12 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
let writer = tline.writer();
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
println!("updating {} at {}", blknum, lsn);
writer.finish_write(lsn);
drop(writer);
@@ -4356,9 +4333,8 @@ mod tests {
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
.load()
.await;
let mut tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let mut tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)?;
const NUM_KEYS: usize = 100;
const NUM_TLINES: usize = 50;
@@ -4383,14 +4359,12 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
)
.await?;
let writer = tline.writer();
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
)?;
println!("updating [{}][{}] at {}", idx, blknum, lsn);
writer.finish_write(lsn);
drop(writer);
@@ -4461,7 +4435,6 @@ mod tests {
.context("init_empty_test_timeline")?;
modification
.commit()
.await
.context("commit init_empty_test_timeline modification")?;
// Do the flush. The flush code will check the expectations that we set above.

View File

@@ -51,23 +51,25 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Context;
use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::{LayerKey, Replacement};
pub use historic_layer_coverage::Replacement;
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::PersistentLayerKey;
///
/// LayerMap tracks what layers exist on a timeline.
///
#[derive(Default, Clone)]
pub struct LayerMap {
pub struct LayerMap<L: ?Sized> {
//
// 'open_layer' holds the current InMemoryLayer that is accepting new
// records. If it is None, 'next_open_layer_at' will be set instead, indicating
@@ -93,6 +95,24 @@ pub struct LayerMap {
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
/// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
/// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
/// RemoteLayer will be removed.
mapping: HashMap<PersistentLayerKey, Arc<L>>,
}
impl<L: ?Sized> Default for LayerMap<L> {
fn default() -> Self {
Self {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
mapping: HashMap::default(),
}
}
}
/// The primary update API for the layer map.
@@ -100,21 +120,24 @@ pub struct LayerMap {
/// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly.
#[must_use]
pub struct BatchedUpdates<'a> {
pub struct BatchedUpdates<'a, L: ?Sized + Layer> {
// While we hold this exclusive reference to the layer map the type checker
// will prevent us from accidentally reading any unflushed updates.
layer_map: &'a mut LayerMap,
layer_map: &'a mut LayerMap<L>,
}
/// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing.
impl BatchedUpdates<'_> {
impl<L> BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
///
/// Insert an on-disk layer.
///
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.insert_historic_noflush(layer_desc)
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.layer_map.insert_historic_noflush(layer_desc, layer)
}
///
@@ -122,8 +145,31 @@ impl BatchedUpdates<'_> {
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.remove_historic_noflush(layer_desc)
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.layer_map.remove_historic_noflush(layer_desc, layer)
}
/// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.replace_historic_noflush(expected_desc, expected, new_desc, new)
}
// We will flush on drop anyway, but this method makes it
@@ -139,19 +185,25 @@ impl BatchedUpdates<'_> {
// than panic later or read without flushing.
//
// TODO maybe warn if flush hasn't explicitly been called
impl Drop for BatchedUpdates<'_> {
impl<L> Drop for BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
fn drop(&mut self) {
self.layer_map.flush_updates();
}
}
/// Return value of LayerMap::search
pub struct SearchResult {
pub layer: Arc<PersistentLayerDesc>,
pub struct SearchResult<L: ?Sized> {
pub layer: Arc<L>,
pub lsn_floor: Lsn,
}
impl LayerMap {
impl<L> LayerMap<L>
where
L: ?Sized + Layer,
{
///
/// Find the latest layer (by lsn.end) that covers the given
/// 'key', with lsn.start < 'end_lsn'.
@@ -183,7 +235,7 @@ impl LayerMap {
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128());
@@ -192,6 +244,7 @@ impl LayerMap {
(None, None) => None,
(None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start;
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult {
layer: image,
lsn_floor,
@@ -199,6 +252,7 @@ impl LayerMap {
}
(Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start;
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult {
layer: delta,
lsn_floor,
@@ -209,6 +263,7 @@ impl LayerMap {
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match {
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult {
layer: image,
lsn_floor: img_lsn,
@@ -216,6 +271,7 @@ impl LayerMap {
} else {
let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult {
layer: delta,
lsn_floor,
@@ -226,7 +282,7 @@ impl LayerMap {
}
/// Start a batch of updates, applied on drop
pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> {
BatchedUpdates { layer_map: self }
}
@@ -236,32 +292,48 @@ impl LayerMap {
/// Helper function for BatchedUpdates::insert_historic
///
/// TODO(chi): remove L generic so that we do not need to pass layer object.
pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
pub(self) fn insert_historic_noflush(
&mut self,
layer_desc: PersistentLayerDesc,
layer: Arc<L>,
) {
self.mapping.insert(layer_desc.key(), layer.clone());
// TODO: See #3869, resulting #4088, attempted fix and repro #4094
if Self::is_l0(&layer_desc) {
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer_desc.clone().into());
}
self.historic.insert(
historic_layer_coverage::LayerKey::from(&layer_desc),
historic_layer_coverage::LayerKey::from(&*layer),
layer_desc.into(),
);
}
fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
let layer = self
.mapping
.get(key)
.with_context(|| format!("{key:?}"))
.expect("inconsistent layer mapping");
layer
}
///
/// Remove an on-disk layer from the map.
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(&layer_desc));
let layer_key = layer_desc.key();
if Self::is_l0(&layer_desc) {
.remove(historic_layer_coverage::LayerKey::from(&*layer));
if Self::is_l0(&layer) {
let len_before = self.l0_delta_layers.len();
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| other.key() != layer_key);
l0_delta_layers.retain(|other| {
!Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
});
self.l0_delta_layers = l0_delta_layers;
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
@@ -272,6 +344,69 @@ impl LayerMap {
"failed to locate removed historic layer from l0_delta_layers"
);
}
self.mapping.remove(&layer_desc.key());
}
pub(self) fn replace_historic_noflush(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = Self::is_l0(&new);
anyhow::ensure!(
key == other,
"expected and new must have equal LayerKeys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
);
let l0_index = if expected_l0 {
// find the index in case replace worked, we need to replace that as well
let pos = self.l0_delta_layers.iter().position(|slot| {
Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
});
if pos.is_none() {
return Ok(Replacement::NotFound);
}
pos
} else {
None
};
let new_desc = Arc::new(new_desc);
let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
**existing == expected_desc
});
if let Replacement::Replaced { .. } = &replaced {
self.mapping.remove(&expected_desc.key());
self.mapping.insert(new_desc.key(), new);
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new_desc;
}
}
let replaced = match replaced {
Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
Replacement::NotFound => Replacement::NotFound,
Replacement::RemovalBuffered => Replacement::RemovalBuffered,
Replacement::Unexpected(x) => {
Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
}
};
Ok(replaced)
}
/// Helper function for BatchedUpdates::drop.
@@ -319,8 +454,10 @@ impl LayerMap {
Ok(true)
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
self.historic.iter()
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
}
///
@@ -335,7 +472,7 @@ impl LayerMap {
&self,
key_range: &Range<Key>,
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
let version = match self.historic.get().unwrap().get_version(lsn.0) {
Some(v) => v,
None => return Ok(vec![]),
@@ -345,26 +482,36 @@ impl LayerMap {
let end = key_range.end.to_i128();
// Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut current_key = start;
let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take()));
coverage.push((
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
current_key = change_key;
current_val = change_val.clone();
}
// Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push((kr, current_val.take()));
coverage.push((
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
Ok(coverage)
}
pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
pub fn is_l0(layer: &L) -> bool {
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
}
@@ -390,7 +537,7 @@ impl LayerMap {
/// TODO The optimal number should probably be slightly higher than 1, but to
/// implement that we need to plumb a lot more context into this function
/// than just the current partition_range.
pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool {
// Case 1
if !Self::is_l0(layer) {
return true;
@@ -448,7 +595,9 @@ impl LayerMap {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let base_count =
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?;
@@ -471,7 +620,9 @@ impl LayerMap {
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let base_count =
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max(
@@ -621,8 +772,12 @@ impl LayerMap {
}
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
Ok(self.l0_delta_layers.to_vec())
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
Ok(self
.l0_delta_layers
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
.collect())
}
/// debugging function to print out the contents of the layer map
@@ -647,79 +802,72 @@ impl LayerMap {
println!("End dump LayerMap");
Ok(())
}
}
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
///
/// If comparing persistent layers, ALWAYS compare the layer descriptor key.
#[inline(always)]
pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
// "dyn Trait" objects are "fat pointers" in that they have two components:
// - pointer to the object
// - pointer to the vtable
//
// rust does not provide a guarantee that these vtables are unique, but however
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
// pointer and the vtable need to be equal.
//
// See: https://github.com/rust-lang/rust/issues/103763
//
// A future version of rust will most likely use this form below, where we cast each
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
// not affect the comparison.
//
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
#[inline(always)]
pub fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
// "dyn Trait" objects are "fat pointers" in that they have two components:
// - pointer to the object
// - pointer to the vtable
//
// rust does not provide a guarantee that these vtables are unique, but however
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
// pointer and the vtable need to be equal.
//
// See: https://github.com/rust-lang/rust/issues/103763
//
// A future version of rust will most likely use this form below, where we cast each
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
// not affect the comparison.
//
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
left == right
left == right
}
}
#[cfg(test)]
mod tests {
use super::LayerMap;
use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
use super::{LayerMap, Replacement};
use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
use std::str::FromStr;
use std::sync::Arc;
mod l0_delta_layers_updated {
use crate::tenant::{
storage_layer::{PersistentLayer, PersistentLayerDesc},
timeline::LayerMapping,
};
use super::*;
#[test]
fn for_full_range_delta() {
// l0_delta_layers are used by compaction, and should observe all buffered updates
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true
)
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true
)
}
#[test]
fn for_non_full_range_delta() {
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range
false
)
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range
false
)
}
#[test]
fn for_image() {
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images
false
)
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images
false
)
}
#[test]
@@ -735,16 +883,16 @@ mod tests {
let not_found = Arc::new(layer.clone());
let new_version = Arc::new(layer);
// after the immutable storage state refactor, the replace operation
// will not use layer map any more. We keep it here for consistency in test cases
// and can remove it in the future.
let _map = LayerMap::default();
let mut map = LayerMap::default();
let mut mapping = LayerMapping::new();
let res = map.batch_update().replace_historic(
not_found.get_persistent_layer_desc(),
&not_found,
new_version.get_persistent_layer_desc(),
new_version,
);
mapping
.replace_and_verify(not_found, new_version)
.unwrap_err();
assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
}
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
@@ -755,44 +903,49 @@ mod tests {
let downloaded = Arc::new(skeleton);
let mut map = LayerMap::default();
let mut mapping = LayerMapping::new();
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
assert_eq!(remote.layer_desc(), downloaded.layer_desc());
assert!(!LayerMap::compare_arced_layers(&remote, &downloaded));
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update()
.insert_historic(remote.layer_desc().clone());
mapping.insert(remote.clone());
assert_eq!(
count_layer_in(&map, remote.layer_desc()),
expected_in_counts
);
.insert_historic(remote.get_persistent_layer_desc(), remote.clone());
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
mapping
.replace_and_verify(remote, downloaded.clone())
let replaced = map
.batch_update()
.replace_historic(
remote.get_persistent_layer_desc(),
&remote,
downloaded.get_persistent_layer_desc(),
downloaded.clone(),
)
.expect("name derived attributes are the same");
assert_eq!(
count_layer_in(&map, downloaded.layer_desc()),
expected_in_counts
assert!(
matches!(replaced, Replacement::Replaced { .. }),
"{replaced:?}"
);
assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);
map.batch_update()
.remove_historic(downloaded.layer_desc().clone());
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
.remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone());
assert_eq!(count_layer_in(&map, &downloaded), (0, 0));
}
fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) {
let historic = map
.iter_historic_layers()
.filter(|x| x.key() == layer.key())
.filter(|x| LayerMap::compare_arced_layers(x, layer))
.count();
let l0s = map
.get_level0_deltas()
.expect("why does this return a result");
let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
let l0 = l0s
.iter()
.filter(|x| LayerMap::compare_arced_layers(x, layer))
.count();
(historic, l0)
}

View File

@@ -3,8 +3,6 @@ use std::ops::Range;
use tracing::info;
use crate::tenant::storage_layer::PersistentLayerDesc;
use super::layer_coverage::LayerCoverageTuple;
/// Layers in this module are identified and indexed by this data.
@@ -55,24 +53,11 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK
}
}
impl From<&PersistentLayerDesc> for LayerKey {
fn from(layer: &PersistentLayerDesc) -> Self {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
}
}
}
/// Efficiently queryable layer coverage for each LSN.
///
/// Allows answering layer map queries very efficiently,
/// but doesn't allow retroactive insertion, which is
/// sometimes necessary. See BufferedHistoricLayerCoverage.
#[derive(Clone)]
pub struct HistoricLayerCoverage<Value> {
/// The latest state
head: LayerCoverageTuple<Value>,
@@ -426,7 +411,6 @@ fn test_persistent_overlapping() {
///
/// See this for more on persistent and retroactive techniques:
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
#[derive(Clone)]
pub struct BufferedHistoricLayerCoverage<Value> {
/// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>,
@@ -483,11 +467,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
///
/// Returns a `Replacement` value describing the outcome; only the case of
/// `Replacement::Replaced` modifies the map and requires a rebuild.
///
/// This function is unlikely to be used in the future because LayerMap now only records the
/// layer descriptors. Therefore, anything added to the layer map will only be removed or
/// added, and never replaced.
#[allow(dead_code)]
pub fn replace<F>(
&mut self,
layer_key: &LayerKey,

View File

@@ -15,7 +15,6 @@ use rpds::RedBlackTreeMapSync;
///
/// NOTE The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
#[derive(Clone)]
pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value).
@@ -140,7 +139,6 @@ impl<Value: Clone> LayerCoverage<Value> {
}
/// Image and delta coverage at a specific LSN.
#[derive(Clone)]
pub struct LayerCoverageTuple<Value> {
pub image_coverage: LayerCoverage<Value>,
pub delta_coverage: LayerCoverage<Value>,

View File

@@ -1,146 +0,0 @@
//! This module implements `LayerMapMgr`, which manages a layer map object and provides lock-free access to the state.
//!
//! A common usage pattern is as follows:
//!
//! ```ignore
//! async fn compaction(&self) {
//! // Get the current state.
//! let state = self.layer_map_mgr.read();
//! // No lock held at this point. Do compaction based on the state. This part usually incurs I/O operations and may
//! // take a long time.
//! let compaction_result = self.do_compaction(&state).await?;
//! // Update the state.
//! self.layer_map_mgr.update(|mut state| async move {
//! // do updates to the state, return it.
//! Ok(state)
//! }).await?;
//! }
//! ```
use anyhow::Result;
use arc_swap::ArcSwap;
use futures::Future;
use std::sync::Arc;
use super::layer_map::LayerMap;
/// Manages the storage state. Provide utility functions to modify the layer map and get an immutable reference to the
/// layer map.
pub struct LayerMapMgr {
layer_map: ArcSwap<LayerMap>,
state_lock: tokio::sync::Mutex<()>,
}
impl LayerMapMgr {
/// Get the current state of the layer map.
pub fn read(&self) -> Arc<LayerMap> {
// TODO: it is possible to use `load` to reduce the overhead of cloning the Arc, but read path usually involves
// disk reads and layer mapping fetching, and therefore it's not a big deal to use a more optimized version
// here.
self.layer_map.load_full()
}
/// Clone the layer map for modification.
fn clone_for_write(&self, _state_lock_witness: &tokio::sync::MutexGuard<'_, ()>) -> LayerMap {
(**self.layer_map.load()).clone()
}
pub fn new(layer_map: LayerMap) -> Self {
Self {
layer_map: ArcSwap::new(Arc::new(layer_map)),
state_lock: tokio::sync::Mutex::new(()),
}
}
/// Update the layer map.
pub async fn update<O, F>(&self, operation: O) -> Result<()>
where
O: FnOnce(LayerMap) -> F,
F: Future<Output = Result<LayerMap>>,
{
let state_lock = self.state_lock.lock().await;
let state = self.clone_for_write(&state_lock);
let new_state = operation(state).await?;
self.layer_map.store(Arc::new(new_state));
Ok(())
}
}
#[cfg(test)]
mod tests {
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::{repository::Key, tenant::storage_layer::PersistentLayerDesc};
use super::*;
#[tokio::test]
async fn test_layer_map_manage() -> Result<()> {
let mgr = LayerMapMgr::new(Default::default());
mgr.update(|mut map| async move {
let mut updates = map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_img(
TenantId::generate(),
TimelineId::generate(),
Key::from_i128(0)..Key::from_i128(1),
Lsn(0),
false,
0,
));
updates.flush();
Ok(map)
})
.await?;
let ref_1 = mgr.read();
mgr.update(|mut map| async move {
let mut updates = map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_img(
TenantId::generate(),
TimelineId::generate(),
Key::from_i128(1)..Key::from_i128(2),
Lsn(0),
false,
0,
));
updates.flush();
Ok(map)
})
.await?;
let ref_2 = mgr.read();
// Modification should not be visible to the old reference.
assert_eq!(
ref_1
.search(Key::from_i128(0), Lsn(1))
.unwrap()
.layer
.key_range,
Key::from_i128(0)..Key::from_i128(1)
);
assert!(ref_1.search(Key::from_i128(1), Lsn(1)).is_none());
// Modification should be visible to the new reference.
assert_eq!(
ref_2
.search(Key::from_i128(0), Lsn(1))
.unwrap()
.layer
.key_range,
Key::from_i128(0)..Key::from_i128(1)
);
assert_eq!(
ref_2
.search(Key::from_i128(1), Lsn(1))
.unwrap()
.layer
.key_range,
Key::from_i128(1)..Key::from_i128(2)
);
Ok(())
}
}

View File

@@ -1392,12 +1392,7 @@ mod tests {
let harness = TenantHarness::create(test_name)?;
let (tenant, ctx) = runtime.block_on(harness.load());
// create an empty timeline directory
let _ = runtime.block_on(tenant.create_test_timeline(
TIMELINE_ID,
Lsn(8),
DEFAULT_PG_VERSION,
&ctx,
))?;
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;

View File

@@ -176,10 +176,13 @@ impl LayerAccessStats {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer(
layer_map_lock_held_witness: &BatchedUpdates<'_>,
pub(crate) fn for_loading_layer<L>(
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
) -> Self {
) -> Self
where
L: ?Sized + Layer,
{
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
layer_map_lock_held_witness,
@@ -194,11 +197,14 @@ impl LayerAccessStats {
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change(
pub(crate) fn clone_for_residence_change<L>(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_>,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
) -> LayerAccessStats
where
L: ?Sized + Layer,
{
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
@@ -226,12 +232,14 @@ impl LayerAccessStats {
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
///
pub(crate) fn record_residence_event(
pub(crate) fn record_residence_event<L>(
&self,
_layer_map_lock_held_witness: &BatchedUpdates<'_>,
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {
) where
L: ?Sized + Layer,
{
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner
@@ -381,10 +389,10 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
}
/// Returned by [`Layer::iter`]
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i + Send>;
pub type LayerIter<'i> = Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + 'i>;
/// Returned by [`Layer::key_iter`]
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i + Send>;
pub type LayerKeyIter<'i> = Box<dyn Iterator<Item = (Key, Lsn, u64)> + 'i>;
/// A Layer contains all data in a "rectangle" consisting of a range of keys and
/// range of LSNs.
@@ -465,125 +473,94 @@ pub fn downcast_remote_layer(
}
}
pub mod tests {
use super::*;
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
pub key: Range<Key>,
pub lsn: Range<Lsn>,
pub is_incremental: bool,
pub short_id: String,
}
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
base: PersistentLayerDesc,
impl LayerDescriptor {
/// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta,
/// and the tenant / timeline id does not matter.
pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc {
PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
self.key.clone(),
self.lsn.clone(),
233,
)
}
}
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
}
impl From<PersistentLayerDesc> for LayerDescriptor {
fn from(base: PersistentLayerDesc) -> Self {
Self { base }
}
fn get_lsn_range(&self) -> Range<Lsn> {
self.lsn.clone()
}
impl Layer for LayerDescriptor {
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_key_range(&self) -> Range<Key> {
self.layer_desc().key_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_lsn_range(&self) -> Range<Lsn> {
self.layer_desc().lsn_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
}
fn is_incremental(&self) -> bool {
self.is_incremental
}
impl PersistentLayer for LayerDescriptor {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.base
}
fn local_path(&self) -> Option<PathBuf> {
unimplemented!()
}
fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
unimplemented!()
}
fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
unimplemented!()
}
fn delete_resident_layer_file(&self) -> Result<()> {
unimplemented!()
}
fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
unimplemented!()
}
fn access_stats(&self) -> &LayerAccessStats {
unimplemented!()
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn_range,
233,
),
}
}
fn short_id(&self) -> String {
self.short_id.clone()
}
impl From<ImageFileName> for LayerDescriptor {
fn from(value: ImageFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_img(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn,
false,
233,
),
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
let short_id = value.to_string();
LayerDescriptor {
key: value.key_range,
lsn: value.lsn_range,
is_incremental: true,
short_id,
}
}
}
impl From<LayerFileName> for LayerDescriptor {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),
LayerFileName::Image(i) => Self::from(i),
}
impl From<ImageFileName> for LayerDescriptor {
fn from(value: ImageFileName) -> Self {
let short_id = value.to_string();
let lsn = value.lsn_as_range();
LayerDescriptor {
key: value.key_range,
lsn,
is_incremental: false,
short_id,
}
}
}
impl From<LayerFileName> for LayerDescriptor {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),
LayerFileName::Image(i) => Self::from(i),
}
}
}

View File

@@ -37,7 +37,6 @@ use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use once_cell::sync::OnceCell;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
@@ -47,6 +46,7 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::*;
use utils::{
@@ -184,7 +184,7 @@ pub struct DeltaLayer {
access_stats: LayerAccessStats,
inner: OnceCell<DeltaLayerInner>,
inner: RwLock<DeltaLayerInner>,
}
impl std::fmt::Debug for DeltaLayer {
@@ -201,17 +201,21 @@ impl std::fmt::Debug for DeltaLayer {
}
pub struct DeltaLayerInner {
/// If false, the fields below have not been loaded into memory yet.
loaded: bool,
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
/// Reader object for reading blocks from the file.
file: FileBlockReader<VirtualFile>,
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
}
impl std::fmt::Debug for DeltaLayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DeltaLayerInner")
.field("loaded", &self.loaded)
.field("index_start_blk", &self.index_start_blk)
.field("index_root_blk", &self.index_root_blk)
.finish()
@@ -242,7 +246,7 @@ impl Layer for DeltaLayer {
inner.index_start_blk, inner.index_root_blk
);
let file = &inner.file;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
@@ -311,7 +315,7 @@ impl Layer for DeltaLayer {
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = &inner.file;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
@@ -496,22 +500,51 @@ impl DeltaLayer {
/// Open the underlying file and read the metadata into memory, if it's
/// not loaded already.
///
fn load(&self, access_kind: LayerAccessKind, ctx: &RequestContext) -> Result<&DeltaLayerInner> {
fn load(
&self,
access_kind: LayerAccessKind,
ctx: &RequestContext,
) -> Result<RwLockReadGuard<DeltaLayerInner>> {
self.access_stats
.record_access(access_kind, ctx.task_kind());
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner())
.with_context(|| format!("Failed to load delta layer {}", self.path().display()))
loop {
// Quick exit if already loaded
let inner = self.inner.read().unwrap();
if inner.loaded {
return Ok(inner);
}
// Need to open the file and load the metadata. Upgrade our lock to
// a write lock. (Or rather, release and re-lock in write mode.)
drop(inner);
let inner = self.inner.write().unwrap();
if !inner.loaded {
self.load_inner(inner).with_context(|| {
format!("Failed to load delta layer {}", self.path().display())
})?;
} else {
// Another thread loaded it while we were not holding the lock.
}
// We now have the file open and loaded. There's no function to do
// that in the std library RwLock, so we have to release and re-lock
// in read mode. (To be precise, the lock guard was moved in the
// above call to `load_inner`, so it's already been released). And
// while we do that, another thread could unload again, so we have
// to re-check and retry if that happens.
}
}
fn load_inner(&self) -> Result<DeltaLayerInner> {
fn load_inner(&self, mut inner: RwLockWriteGuard<DeltaLayerInner>) -> Result<()> {
let path = self.path();
let file = VirtualFile::open(&path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
// Open the file if it's not open already.
if inner.file.is_none() {
let file = VirtualFile::open(&path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
inner.file = Some(FileBlockReader::new(file));
}
let file = inner.file.as_mut().unwrap();
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
@@ -538,13 +571,13 @@ impl DeltaLayer {
}
}
inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk;
debug!("loaded from {}", &path.display());
Ok(DeltaLayerInner {
file,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
})
inner.loaded = true;
Ok(())
}
/// Create a DeltaLayer struct representing an existing file on disk.
@@ -566,7 +599,12 @@ impl DeltaLayer {
file_size,
),
access_stats,
inner: once_cell::sync::OnceCell::new(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
index_start_blk: 0,
index_root_blk: 0,
}),
}
}
@@ -593,7 +631,12 @@ impl DeltaLayer {
metadata.len(),
),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: once_cell::sync::OnceCell::new(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
index_start_blk: 0,
index_root_blk: 0,
}),
})
}
@@ -757,7 +800,12 @@ impl DeltaLayerWriterInner {
metadata.len(),
),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: once_cell::sync::OnceCell::new(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
index_start_blk,
index_root_blk,
}),
};
// fsync the file
@@ -892,13 +940,13 @@ struct DeltaValueIter<'a> {
reader: BlockCursor<Adapter<'a>>,
}
struct Adapter<'a>(&'a DeltaLayerInner);
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);
impl<'a> BlockReader for Adapter<'a> {
type BlockLease = PageReadGuard<'static>;
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
self.0.file.read_blk(blknum)
self.0.file.as_ref().unwrap().read_blk(blknum)
}
}
@@ -911,8 +959,8 @@ impl<'a> Iterator for DeltaValueIter<'a> {
}
impl<'a> DeltaValueIter<'a> {
fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
let file = &inner.file;
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
@@ -985,8 +1033,8 @@ impl Iterator for DeltaKeyIter {
}
impl<'a> DeltaKeyIter {
fn new(inner: &'a DeltaLayerInner) -> Result<Self> {
let file = &inner.file;
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
@@ -1026,21 +1074,3 @@ impl<'a> DeltaKeyIter {
Ok(iter)
}
}
#[cfg(test)]
mod test {
use super::DeltaKeyIter;
use super::DeltaLayer;
use super::DeltaValueIter;
// We will soon need the iters to be send in the compaction code.
// Cf https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883
// Cf https://github.com/neondatabase/neon/issues/4471
#[test]
fn is_send() {
fn assert_send<T: Send>() {}
assert_send::<DeltaLayer>();
assert_send::<DeltaValueIter>();
assert_send::<DeltaKeyIter>();
}
}

View File

@@ -304,7 +304,7 @@ impl InMemoryLayer {
Ok(())
}
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
// TODO: Currently, we just leak the storage for any deleted keys
Ok(())

View File

@@ -218,12 +218,15 @@ impl RemoteLayer {
}
/// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer(
pub fn create_downloaded_layer<L>(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_>,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer> {
) -> Arc<dyn PersistentLayer>
where
L: ?Sized + Layer,
{
if self.desc.is_delta {
let fname = self.desc.delta_file_name();
Arc::new(DeltaLayer::new(

File diff suppressed because it is too large Load Diff

View File

@@ -197,11 +197,9 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let layers = self.layers.read().unwrap();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = mapping.get_from_desc(&hist_layer);
if hist_layer.is_remote_layer() {
continue;
}

View File

@@ -1325,7 +1325,6 @@ mod tests {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
.await
.expect("Failed to create an empty timeline for dummy wal connection manager");
ConnectionManagerState {

View File

@@ -304,15 +304,12 @@ pub(super) async fn handle_walreceiver_connection(
}
}
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
timeline.check_checkpoint_distance().with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
if let Some(last_lsn) = status_update {
let timeline_remote_consistent_lsn =

View File

@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit().await?;
modification.commit()?;
Ok(())
}
@@ -1199,7 +1199,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit().await?;
m.commit()?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
Ok(walingest)
@@ -1208,9 +1208,7 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1218,22 +1216,22 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit().await?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
.await?;
m.commit().await?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
.await?;
m.commit().await?;
m.commit()?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
.await?;
m.commit().await?;
m.commit()?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1319,7 +1317,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
.await?;
m.commit().await?;
m.commit()?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1361,7 +1359,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
.await?;
m.commit().await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
@@ -1374,7 +1372,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
.await?;
m.commit().await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
@@ -1399,7 +1397,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
.await?;
m.commit().await?;
m.commit()?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
@@ -1429,16 +1427,14 @@ mod tests {
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut m = tline.begin_modification(Lsn(0x20));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit().await?;
m.commit()?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1457,7 +1453,7 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
m.commit().await?;
m.commit()?;
// Check that rel is not visible anymore
assert_eq!(
@@ -1475,7 +1471,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
.await?;
m.commit().await?;
m.commit()?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1500,9 +1496,7 @@ mod tests {
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
// Create a 20 MB relation (the size is arbitrary)
@@ -1514,7 +1508,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit().await?;
m.commit()?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
@@ -1559,7 +1553,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
.await?;
m.commit().await?;
m.commit()?;
// Check reported size and contents after truncation
assert_eq!(
@@ -1608,7 +1602,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit().await?;
m.commit()?;
assert_eq!(
tline
@@ -1642,9 +1636,7 @@ mod tests {
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)?;
let mut walingest = init_walingest_test(&tline, &ctx).await?;
let mut lsn = 0x10;
@@ -1655,7 +1647,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
.await?;
m.commit().await?;
m.commit()?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1671,7 +1663,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
.await?;
m.commit().await?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE
@@ -1684,7 +1676,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
.await?;
m.commit().await?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE - 1
@@ -1700,7 +1692,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
.await?;
m.commit().await?;
m.commit()?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
size as BlockNumber

124
poetry.lock generated
View File

@@ -1,10 +1,9 @@
# This file is automatically @generated by Poetry and should not be changed by hand.
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
[[package]]
name = "aiohttp"
version = "3.7.4"
description = "Async http client/server framework (asyncio)"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -62,7 +61,6 @@ speedups = ["aiodns", "brotlipy", "cchardet"]
name = "aiopg"
version = "1.3.4"
description = "Postgres integration with asyncio."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -79,30 +77,28 @@ sa = ["sqlalchemy[postgresql-psycopg2binary] (>=1.3,<1.5)"]
[[package]]
name = "allure-pytest"
version = "2.13.2"
version = "2.13.1"
description = "Allure pytest integration"
category = "main"
optional = false
python-versions = "*"
files = [
{file = "allure-pytest-2.13.2.tar.gz", hash = "sha256:22243159e8ec81ce2b5254b4013802198821b1b42f118f69d4a289396607c7b3"},
{file = "allure_pytest-2.13.2-py3-none-any.whl", hash = "sha256:17de9dbee7f61c8e66a5b5e818b00e419dbcea44cb55c24319401ba813220690"},
{file = "allure-pytest-2.13.1.tar.gz", hash = "sha256:68d69456eeb65af4061ec06a80bc941163b0616e8216554d36b070a6bf070e08"},
{file = "allure_pytest-2.13.1-py3-none-any.whl", hash = "sha256:a8de2fc3b3effe2d8f98801646920de3f055b779710f4c806dbee7c613c24633"},
]
[package.dependencies]
allure-python-commons = "2.13.2"
allure-python-commons = "2.13.1"
pytest = ">=4.5.0"
[[package]]
name = "allure-python-commons"
version = "2.13.2"
version = "2.13.1"
description = "Common module for integrate allure with python-based frameworks"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
{file = "allure-python-commons-2.13.2.tar.gz", hash = "sha256:8a03681330231b1deadd86b97ff68841c6591320114ae638570f1ed60d7a2033"},
{file = "allure_python_commons-2.13.2-py3-none-any.whl", hash = "sha256:2bb3646ec3fbf5b36d178a5e735002bc130ae9f9ba80f080af97d368ba375051"},
{file = "allure-python-commons-2.13.1.tar.gz", hash = "sha256:3fc13e1da8ebb23f9ab5c9c72ad04595023cdd5078dbb8604939997faebed5cb"},
{file = "allure_python_commons-2.13.1-py3-none-any.whl", hash = "sha256:d08e04867bddf44fef55def3d67f4bc25af58a1bf9fcffcf4ec3331f7f2ef0d0"},
]
[package.dependencies]
@@ -113,7 +109,6 @@ pluggy = ">=0.4.0"
name = "async-timeout"
version = "3.0.1"
description = "Timeout context manager for asyncio programs"
category = "main"
optional = false
python-versions = ">=3.5.3"
files = [
@@ -125,7 +120,6 @@ files = [
name = "asyncpg"
version = "0.27.0"
description = "An asyncio PostgreSQL driver"
category = "main"
optional = false
python-versions = ">=3.7.0"
files = [
@@ -176,7 +170,6 @@ test = ["flake8 (>=5.0.4,<5.1.0)", "uvloop (>=0.15.3)"]
name = "attrs"
version = "21.4.0"
description = "Classes Without Boilerplate"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -194,7 +187,6 @@ tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy"
name = "aws-sam-translator"
version = "1.48.0"
description = "AWS SAM Translator is a library that transform SAM templates into AWS CloudFormation templates"
category = "main"
optional = false
python-versions = ">=3.7, <=4.0, !=4.0"
files = [
@@ -204,7 +196,7 @@ files = [
]
[package.dependencies]
boto3 = ">=1.19.5,<2.0.0"
boto3 = ">=1.19.5,<2.dev0"
jsonschema = ">=3.2,<4.0"
[package.extras]
@@ -214,7 +206,6 @@ dev = ["black (==20.8b1)", "boto3 (>=1.23,<2)", "click (>=7.1,<8.0)", "coverage
name = "aws-xray-sdk"
version = "2.10.0"
description = "The AWS X-Ray SDK for Python (the SDK) enables Python developers to record and emit information from within their applications to the AWS X-Ray service."
category = "main"
optional = false
python-versions = "*"
files = [
@@ -230,7 +221,6 @@ wrapt = "*"
name = "backoff"
version = "2.2.1"
description = "Function decoration for backoff and retry"
category = "main"
optional = false
python-versions = ">=3.7,<4.0"
files = [
@@ -242,7 +232,6 @@ files = [
name = "black"
version = "23.3.0"
description = "The uncompromising code formatter."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -292,7 +281,6 @@ uvloop = ["uvloop (>=0.15.2)"]
name = "boto3"
version = "1.26.16"
description = "The AWS SDK for Python"
category = "main"
optional = false
python-versions = ">= 3.7"
files = [
@@ -312,7 +300,6 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
name = "boto3-stubs"
version = "1.26.16"
description = "Type annotations for boto3 1.26.16 generated with mypy-boto3-builder 7.11.11"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -657,7 +644,6 @@ xray = ["mypy-boto3-xray (>=1.26.0,<1.27.0)"]
name = "botocore"
version = "1.29.16"
description = "Low-level, data-driven core of boto 3."
category = "main"
optional = false
python-versions = ">= 3.7"
files = [
@@ -677,7 +663,6 @@ crt = ["awscrt (==0.14.0)"]
name = "botocore-stubs"
version = "1.27.38"
description = "Type annotations for botocore 1.27.38 generated with mypy-boto3-builder 7.10.1"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -692,7 +677,6 @@ typing-extensions = ">=4.1.0"
name = "certifi"
version = "2022.12.7"
description = "Python package for providing Mozilla's CA Bundle."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -704,7 +688,6 @@ files = [
name = "cffi"
version = "1.15.1"
description = "Foreign Function Interface for Python calling C code."
category = "main"
optional = false
python-versions = "*"
files = [
@@ -781,7 +764,6 @@ pycparser = "*"
name = "cfn-lint"
version = "0.61.3"
description = "Checks CloudFormation templates for practices and behaviour that could potentially be improved"
category = "main"
optional = false
python-versions = ">=3.6, <=4.0, !=4.0"
files = [
@@ -803,7 +785,6 @@ sarif-om = ">=1.0.4,<1.1.0"
name = "chardet"
version = "3.0.4"
description = "Universal encoding detector for Python 2 and 3"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -815,7 +796,6 @@ files = [
name = "charset-normalizer"
version = "2.1.0"
description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
category = "main"
optional = false
python-versions = ">=3.6.0"
files = [
@@ -830,7 +810,6 @@ unicode-backport = ["unicodedata2"]
name = "click"
version = "8.1.3"
description = "Composable command line interface toolkit"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -845,7 +824,6 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""}
name = "colorama"
version = "0.4.5"
description = "Cross-platform colored terminal text."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -857,7 +835,6 @@ files = [
name = "cryptography"
version = "41.0.0"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -899,7 +876,6 @@ test-randomorder = ["pytest-randomly"]
name = "docker"
version = "4.2.2"
description = "A Python library for the Docker Engine API."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -921,7 +897,6 @@ tls = ["cryptography (>=1.3.4)", "idna (>=2.0.0)", "pyOpenSSL (>=17.5.0)"]
name = "ecdsa"
version = "0.18.0"
description = "ECDSA cryptographic signature library (pure python)"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
files = [
@@ -940,7 +915,6 @@ gmpy2 = ["gmpy2"]
name = "exceptiongroup"
version = "1.1.1"
description = "Backport of PEP 654 (exception groups)"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -955,7 +929,6 @@ test = ["pytest (>=6)"]
name = "execnet"
version = "1.9.0"
description = "execnet: rapid multi-Python deployment"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -970,7 +943,6 @@ testing = ["pre-commit"]
name = "flask"
version = "2.2.5"
description = "A simple framework for building complex web applications."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -993,7 +965,6 @@ dotenv = ["python-dotenv"]
name = "flask-cors"
version = "3.0.10"
description = "A Flask extension adding a decorator for CORS support"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1009,7 +980,6 @@ Six = "*"
name = "graphql-core"
version = "3.2.1"
description = "GraphQL implementation for Python, a port of GraphQL.js, the JavaScript reference implementation for GraphQL."
category = "main"
optional = false
python-versions = ">=3.6,<4"
files = [
@@ -1021,7 +991,6 @@ files = [
name = "idna"
version = "3.3"
description = "Internationalized Domain Names in Applications (IDNA)"
category = "main"
optional = false
python-versions = ">=3.5"
files = [
@@ -1033,7 +1002,6 @@ files = [
name = "importlib-metadata"
version = "4.12.0"
description = "Read metadata from Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1053,7 +1021,6 @@ testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs
name = "iniconfig"
version = "1.1.1"
description = "iniconfig: brain-dead simple config-ini parsing"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1065,7 +1032,6 @@ files = [
name = "itsdangerous"
version = "2.1.2"
description = "Safely pass data to untrusted environments and back."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1077,7 +1043,6 @@ files = [
name = "jinja2"
version = "3.1.2"
description = "A very fast and expressive template engine."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1095,7 +1060,6 @@ i18n = ["Babel (>=2.7)"]
name = "jmespath"
version = "1.0.1"
description = "JSON Matching Expressions"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1107,7 +1071,6 @@ files = [
name = "jschema-to-python"
version = "1.2.3"
description = "Generate source code for Python classes from a JSON schema."
category = "main"
optional = false
python-versions = ">= 2.7"
files = [
@@ -1124,7 +1087,6 @@ pbr = "*"
name = "jsondiff"
version = "2.0.0"
description = "Diff JSON and JSON-like structures in Python"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1136,7 +1098,6 @@ files = [
name = "jsonpatch"
version = "1.32"
description = "Apply JSON-Patches (RFC 6902)"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
files = [
@@ -1151,7 +1112,6 @@ jsonpointer = ">=1.9"
name = "jsonpickle"
version = "2.2.0"
description = "Python library for serializing any arbitrary object graph into JSON"
category = "main"
optional = false
python-versions = ">=2.7"
files = [
@@ -1168,7 +1128,6 @@ testing-libs = ["simplejson", "ujson", "yajl"]
name = "jsonpointer"
version = "2.3"
description = "Identify specific nodes in a JSON document (RFC 6901)"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
@@ -1180,7 +1139,6 @@ files = [
name = "jsonschema"
version = "3.2.0"
description = "An implementation of JSON Schema validation for Python"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1202,7 +1160,6 @@ format-nongpl = ["idna", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-va
name = "junit-xml"
version = "1.9"
description = "Creates JUnit XML test result documents that can be read by tools such as Jenkins"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1217,7 +1174,6 @@ six = "*"
name = "markupsafe"
version = "2.1.1"
description = "Safely add untrusted strings to HTML/XML markup."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1267,7 +1223,6 @@ files = [
name = "moto"
version = "4.1.2"
description = ""
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1328,7 +1283,6 @@ xray = ["aws-xray-sdk (>=0.93,!=0.96)", "setuptools"]
name = "multidict"
version = "6.0.4"
description = "multidict implementation"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1412,7 +1366,6 @@ files = [
name = "mypy"
version = "1.3.0"
description = "Optional static typing for Python"
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -1459,7 +1412,6 @@ reports = ["lxml"]
name = "mypy-boto3-s3"
version = "1.26.0.post1"
description = "Type annotations for boto3.S3 1.26.0 service generated with mypy-boto3-builder 7.11.10"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1474,7 +1426,6 @@ typing-extensions = ">=4.1.0"
name = "mypy-extensions"
version = "1.0.0"
description = "Type system extensions for programs checked with the mypy type checker."
category = "dev"
optional = false
python-versions = ">=3.5"
files = [
@@ -1486,7 +1437,6 @@ files = [
name = "networkx"
version = "2.8.5"
description = "Python package for creating and manipulating graphs and networks"
category = "main"
optional = false
python-versions = ">=3.8"
files = [
@@ -1505,7 +1455,6 @@ test = ["codecov (>=2.1)", "pytest (>=7.1)", "pytest-cov (>=3.0)"]
name = "openapi-schema-validator"
version = "0.2.3"
description = "OpenAPI schema validation for Python"
category = "main"
optional = false
python-versions = ">=3.7.0,<4.0.0"
files = [
@@ -1525,7 +1474,6 @@ strict-rfc3339 = ["strict-rfc3339"]
name = "openapi-spec-validator"
version = "0.4.0"
description = "OpenAPI 2.0 (aka Swagger) and OpenAPI 3.0 spec validator"
category = "main"
optional = false
python-versions = ">=3.7.0,<4.0.0"
files = [
@@ -1546,7 +1494,6 @@ requests = ["requests"]
name = "packaging"
version = "23.0"
description = "Core utilities for Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1558,7 +1505,6 @@ files = [
name = "pathspec"
version = "0.9.0"
description = "Utility library for gitignore style pattern matching of file paths."
category = "dev"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
files = [
@@ -1570,7 +1516,6 @@ files = [
name = "pbr"
version = "5.9.0"
description = "Python Build Reasonableness"
category = "main"
optional = false
python-versions = ">=2.6"
files = [
@@ -1582,7 +1527,6 @@ files = [
name = "platformdirs"
version = "2.5.2"
description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -1598,7 +1542,6 @@ test = ["appdirs (==1.4.4)", "pytest (>=6)", "pytest-cov (>=2.7)", "pytest-mock
name = "pluggy"
version = "1.0.0"
description = "plugin and hook calling mechanisms for python"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1614,7 +1557,6 @@ testing = ["pytest", "pytest-benchmark"]
name = "prometheus-client"
version = "0.14.1"
description = "Python client for the Prometheus monitoring system."
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1629,7 +1571,6 @@ twisted = ["twisted"]
name = "psutil"
version = "5.9.4"
description = "Cross-platform lib for process and system monitoring in Python."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
@@ -1656,7 +1597,6 @@ test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
name = "psycopg2-binary"
version = "2.9.3"
description = "psycopg2 - Python-PostgreSQL Database Adapter"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1725,7 +1665,6 @@ files = [
name = "pyasn1"
version = "0.4.8"
description = "ASN.1 types and codecs"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1737,7 +1676,6 @@ files = [
name = "pycparser"
version = "2.21"
description = "C parser in Python"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
files = [
@@ -1749,7 +1687,6 @@ files = [
name = "pyjwt"
version = "2.4.0"
description = "JSON Web Token implementation in Python"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1770,7 +1707,6 @@ tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"]
name = "pyparsing"
version = "3.0.9"
description = "pyparsing module - Classes and methods to define and execute parsing grammars"
category = "main"
optional = false
python-versions = ">=3.6.8"
files = [
@@ -1785,7 +1721,6 @@ diagrams = ["jinja2", "railroad-diagrams"]
name = "pypiwin32"
version = "223"
description = ""
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1800,7 +1735,6 @@ pywin32 = ">=223"
name = "pyrsistent"
version = "0.18.1"
description = "Persistent/Functional/Immutable data structures"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1831,7 +1765,6 @@ files = [
name = "pytest"
version = "7.3.1"
description = "pytest: simple powerful testing with Python"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1854,7 +1787,6 @@ testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "no
name = "pytest-asyncio"
version = "0.21.0"
description = "Pytest support for asyncio"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1873,7 +1805,6 @@ testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy
name = "pytest-httpserver"
version = "1.0.8"
description = "pytest-httpserver is a httpserver for pytest"
category = "main"
optional = false
python-versions = ">=3.8,<4.0"
files = [
@@ -1888,7 +1819,6 @@ Werkzeug = ">=2.0.0"
name = "pytest-lazy-fixture"
version = "0.6.3"
description = "It helps to use fixtures in pytest.mark.parametrize"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -1903,7 +1833,6 @@ pytest = ">=3.2.5"
name = "pytest-order"
version = "1.1.0"
description = "pytest plugin to run your tests in a specific order"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1921,7 +1850,6 @@ pytest = [
name = "pytest-rerunfailures"
version = "11.1.2"
description = "pytest plugin to re-run tests to eliminate flaky failures"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1937,7 +1865,6 @@ pytest = ">=5.3"
name = "pytest-timeout"
version = "2.1.0"
description = "pytest plugin to abort hanging tests"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -1952,7 +1879,6 @@ pytest = ">=5.0.0"
name = "pytest-xdist"
version = "3.3.1"
description = "pytest xdist plugin for distributed testing, most importantly across multiple CPUs"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -1973,7 +1899,6 @@ testing = ["filelock"]
name = "python-dateutil"
version = "2.8.2"
description = "Extensions to the standard Python datetime module"
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
files = [
@@ -1988,7 +1913,6 @@ six = ">=1.5"
name = "python-jose"
version = "3.3.0"
description = "JOSE implementation in Python"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2011,7 +1935,6 @@ pycryptodome = ["pyasn1", "pycryptodome (>=3.3.1,<4.0.0)"]
name = "pywin32"
version = "301"
description = "Python for Window Extensions"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2031,7 +1954,6 @@ files = [
name = "pyyaml"
version = "6.0"
description = "YAML parser and emitter for Python"
category = "main"
optional = false
python-versions = ">=3.6"
files = [
@@ -2081,7 +2003,6 @@ files = [
name = "requests"
version = "2.31.0"
description = "Python HTTP for Humans."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2103,7 +2024,6 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
name = "responses"
version = "0.21.0"
description = "A utility library for mocking out the `requests` Python library."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2122,7 +2042,6 @@ tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asy
name = "rsa"
version = "4.9"
description = "Pure-Python RSA implementation"
category = "main"
optional = false
python-versions = ">=3.6,<4"
files = [
@@ -2137,7 +2056,6 @@ pyasn1 = ">=0.1.3"
name = "ruff"
version = "0.0.269"
description = "An extremely fast Python linter, written in Rust."
category = "dev"
optional = false
python-versions = ">=3.7"
files = [
@@ -2164,7 +2082,6 @@ files = [
name = "s3transfer"
version = "0.6.0"
description = "An Amazon S3 Transfer Manager"
category = "main"
optional = false
python-versions = ">= 3.7"
files = [
@@ -2182,7 +2099,6 @@ crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"]
name = "sarif-om"
version = "1.0.4"
description = "Classes implementing the SARIF 2.1.0 object model."
category = "main"
optional = false
python-versions = ">= 2.7"
files = [
@@ -2198,7 +2114,6 @@ pbr = "*"
name = "setuptools"
version = "65.5.1"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2215,7 +2130,6 @@ testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (
name = "six"
version = "1.16.0"
description = "Python 2 and 3 compatibility utilities"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*"
files = [
@@ -2227,7 +2141,6 @@ files = [
name = "sshpubkeys"
version = "3.3.1"
description = "SSH public key parser"
category = "main"
optional = false
python-versions = ">=3"
files = [
@@ -2246,7 +2159,6 @@ dev = ["twine", "wheel", "yapf"]
name = "toml"
version = "0.10.2"
description = "Python Library for Tom's Obvious, Minimal Language"
category = "main"
optional = false
python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
files = [
@@ -2258,7 +2170,6 @@ files = [
name = "tomli"
version = "2.0.1"
description = "A lil' TOML parser"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2270,7 +2181,6 @@ files = [
name = "types-psutil"
version = "5.9.5.12"
description = "Typing stubs for psutil"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2282,7 +2192,6 @@ files = [
name = "types-psycopg2"
version = "2.9.21.10"
description = "Typing stubs for psycopg2"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2294,7 +2203,6 @@ files = [
name = "types-pytest-lazy-fixture"
version = "0.6.3.3"
description = "Typing stubs for pytest-lazy-fixture"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2306,7 +2214,6 @@ files = [
name = "types-requests"
version = "2.31.0.0"
description = "Typing stubs for requests"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2321,7 +2228,6 @@ types-urllib3 = "*"
name = "types-s3transfer"
version = "0.6.0.post3"
description = "Type annotations and code completion for s3transfer"
category = "main"
optional = false
python-versions = ">=3.7,<4.0"
files = [
@@ -2333,7 +2239,6 @@ files = [
name = "types-toml"
version = "0.10.8.6"
description = "Typing stubs for toml"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2345,7 +2250,6 @@ files = [
name = "types-urllib3"
version = "1.26.17"
description = "Typing stubs for urllib3"
category = "main"
optional = false
python-versions = "*"
files = [
@@ -2357,7 +2261,6 @@ files = [
name = "typing-extensions"
version = "4.6.1"
description = "Backported and Experimental Type Hints for Python 3.7+"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2369,7 +2272,6 @@ files = [
name = "urllib3"
version = "1.26.11"
description = "HTTP library with thread-safe connection pooling, file post, and more."
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, <4"
files = [
@@ -2386,7 +2288,6 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
name = "websocket-client"
version = "1.3.3"
description = "WebSocket client for Python with low level API options"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2403,7 +2304,6 @@ test = ["websockets"]
name = "werkzeug"
version = "2.2.3"
description = "The comprehensive WSGI web application library."
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2421,7 +2321,6 @@ watchdog = ["watchdog"]
name = "wrapt"
version = "1.14.1"
description = "Module for decorators, wrappers and monkey patching."
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
files = [
@@ -2495,7 +2394,6 @@ files = [
name = "xmltodict"
version = "0.13.0"
description = "Makes working with XML feel like you are working with JSON"
category = "main"
optional = false
python-versions = ">=3.4"
files = [
@@ -2507,7 +2405,6 @@ files = [
name = "yarl"
version = "1.8.2"
description = "Yet another URL library"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2595,7 +2492,6 @@ multidict = ">=4.0"
name = "zipp"
version = "3.8.1"
description = "Backport of pathlib-compatible object wrapper for zip files"
category = "main"
optional = false
python-versions = ">=3.7"
files = [
@@ -2610,4 +2506,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "c6c217033f50430c31b0979b74db222e6bab2301abd8b9f0cce5a9d5bccc578f"
content-hash = "305be93b4987509365a86e3726e22632655a3d205e47c47bcb46f8fcb7a0d284"

View File

@@ -26,7 +26,7 @@ prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^2.2.3"
pytest-order = "^1.1.0"
allure-pytest = "^2.13.2"
allure-pytest = "2.13.1"
pytest-asyncio = "^0.21.0"
toml = "^0.10.2"
psutil = "^5.9.4"

View File

@@ -98,22 +98,6 @@ pub struct PhysicalStorage {
/// - points to write_lsn, so no seek is needed for writing
/// - doesn't point to the end of the segment
file: Option<File>,
/// When false, we have just initialized storage using the LSN from find_end_of_wal().
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
/// there can be a case with unexpected .partial file.
///
/// Imagine the following:
/// - 000000010000000000000001
/// - it was fully written, but the last record is split between 2 segments
/// - after restart, find_end_of_wal() returned 0/1FFFFF0, which is in the end of this segment
/// - write_lsn, write_record_lsn and flush_record_lsn were initialized to 0/1FFFFF0
/// - 000000010000000000000002.partial
/// - it has only 1 byte written, which is not enough to make a full WAL record
///
/// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
/// This flag will be set to true after the first truncate_wal() call.
is_truncated_after_restart: bool,
}
impl PhysicalStorage {
@@ -173,7 +157,6 @@ impl PhysicalStorage {
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
file: None,
is_truncated_after_restart: false,
})
}
@@ -398,10 +381,7 @@ impl Storage for PhysicalStorage {
// Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
// disk (this happens on each connect).
if self.is_truncated_after_restart
&& end_pos == self.write_lsn
&& end_pos == self.flush_record_lsn
{
if end_pos == self.write_lsn {
return Ok(());
}
@@ -434,7 +414,6 @@ impl Storage for PhysicalStorage {
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.flush_record_lsn = end_pos;
self.is_truncated_after_restart = true;
Ok(())
}

View File

@@ -163,6 +163,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy):
assert conn.get_parameter_status(name) == value
@pytest.mark.timeout(5)
def test_close_on_connections_exit(static_proxy: NeonProxy):
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
# until after connections close.